Use logger in Scala pipeline
I have the code below. It comes from the blog post:
http://www.sparktutorials.net/Spark+MLLib+-+Predict+Store+Sales+with+ML+Pipelines
I'm trying to forecast store sales with linear regression. I'm doing some data cleansing and then creating an ML pipeline. I'm a total Scala newbie. I have a piece of the spark-shell messages that print to the screen when I run the script; I'm posting the part where it starts throwing errors. Can someone please let me know what the issue is and how to fix it? All tips are greatly appreciated.
Code:
// loading packages
// from example: http://www.sparktutorials.net/Spark+MLLib+-+Predict+Store+Sales+with+ML+Pipelines
import org.apache.log4j.{Logger}
//core and SparkSQL
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
// ML Feature Creation, Tuning, Models, and Model Evaluation
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.{RegressionEvaluator}
import org.apache.spark.ml.regression.{LinearRegression}
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.evaluation.RegressionMetrics
// preprocessing & preparing Pipelines
// Indexers & Encoders
val stateHolidayIndexer = new StringIndexer()
.setInputCol("StateHoliday")
.setOutputCol("StateHolidayIndex")
val schoolHolidayIndexer = new StringIndexer()
.setInputCol("SchoolHoliday")
.setOutputCol("SchoolHolidayIndex")
val stateHolidayEncoder = new OneHotEncoder()
.setInputCol("StateHolidayIndex")
.setOutputCol("StateHolidayVec")
val schoolHolidayEncoder = new OneHotEncoder()
.setInputCol("SchoolHolidayIndex")
.setOutputCol("SchoolHolidayVec")
val dayOfMonthEncoder = new OneHotEncoder()
.setInputCol("DayOfMonth")
.setOutputCol("DayOfMonthVec")
val dayOfWeekEncoder = new OneHotEncoder()
.setInputCol("DayOfWeek")
.setOutputCol("DayOfWeekVec")
val storeEncoder = new OneHotEncoder()
.setInputCol("Store")
.setOutputCol("StoreVec")
// assemble all vectors into one vector to input to the model
val assembler = new VectorAssembler()
.setInputCols(Array("StoreVec", "DayOfWeekVec", "Open", "DayOfMonthVec", "StateHolidayVec", "SchoolHolidayVec"))
.setOutputCol("features")
// Pipeline
def preppedLRPipeline():TrainValidationSplit = {
val lr = new LinearRegression()
val paramGrid = new ParamGridBuilder()
.addGrid(lr.regParam, Array(0.1, 0.01))
.addGrid(lr.fitIntercept)
.addGrid(lr.elasticNetParam, Array(0.0, 0.25, 0.5, 0.75, 1.0))
.build()
val pipeline = new Pipeline()
.setStages(Array(stateHolidayIndexer, schoolHolidayIndexer,stateHolidayEncoder, schoolHolidayEncoder, storeEncoder,dayOfWeekEncoder, dayOfMonthEncoder, assembler, lr))
val tvs = new TrainValidationSplit()
.setEstimator(pipeline)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.75)
tvs
}
// bringing in data and removing null values
def loadTrainingData(sqlContext:HiveContext):DataFrame = {
val trainRaw = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
// path to training data
// .load("../mlproject/rossman/train.csv")
.load("/Users/username/Desktop/stuff/comp/clint/store_forecast/input/train.csv")
.repartition(6)
trainRaw.registerTempTable("raw_training_data")
sqlContext.sql("""SELECT
double(Sales) label, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek,
StateHoliday, SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
FROM raw_training_data
""").na.drop()
}
def loadKaggleTestData(sqlContext:HiveContext) = {
val testRaw = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
// load test data
// .load("../mlproject/rossman/test.csv")
.load("/Users/username/Desktop/stuff/comp/clint/store_forecast/input/test.csv")
.repartition(6)
testRaw.registerTempTable("raw_test_data")
val testData = sqlContext.sql("""SELECT
Id, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek, StateHoliday,
SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
FROM raw_test_data
WHERE !(ISNULL(Id) OR ISNULL(Store) OR ISNULL(Open) OR ISNULL(DayOfWeek)
OR ISNULL(StateHoliday) OR ISNULL(SchoolHoliday))
""").na.drop() // weird things happen if you don't filter out the null values manually
Array(testRaw, testData) // got to hold onto testRaw so we can make sure
// to have all the prediction IDs to submit to kaggle
}
// save predictions
def savePredictions(predictions:DataFrame, testRaw:DataFrame) = {
val tdOut = testRaw
.select("Id")
.distinct()
.join(predictions, testRaw("Id") === predictions("PredId"), "outer")
.select("Id", "Sales")
.na.fill(0:Double) // some of our inputs were null so we have to
// fill these with something
tdOut
.coalesce(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
// save predictions
.save("linear_regression_predictions.csv")
//.save("/Users/username/Desktop/stuff/comp/clint/store_forecast/Output/linear_regression_predictions.csv")
}
// fitting and testing
def fitModel(tvs:TrainValidationSplit, data:DataFrame) = {
val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 12345)
logger.info("Fitting data")
val model = tvs.fit(training)
logger.info("Now performing test on hold out set")
val holdout = model.transform(test).select("prediction","label")
// have to do a type conversion for RegressionMetrics
val rm = new RegressionMetrics(holdout.rdd.map(x =>(x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))
logger.info("Test Metrics")
logger.info("Test Explained Variance:")
logger.info(rm.explainedVariance)
logger.info("Test R^2 Coef:")
logger.info(rm.r2)
logger.info("Test MSE:")
logger.info(rm.meanSquaredError)
logger.info("Test RMSE:")
logger.info(rm.rootMeanSquaredError)
model
}
// linear Regression
val data = loadTrainingData(sqlContext)
val Array(testRaw, testData) = loadKaggleTestData(sqlContext)
// The linear Regression Pipeline
val linearTvs = preppedLRPipeline()
logger.info("evaluating linear regression")
val lrModel = fitModel(linearTvs, data)
logger.info("Generating kaggle predictions")
val lrOut = lrModel.transform(testData)
.withColumnRenamed("prediction","Sales")
.withColumnRenamed("Id","PredId")
.select("PredId", "Sales")
savePredictions(lrOut, testRaw)
spark-shell output:
warning: there were two deprecation warnings; re-run with -deprecation for details
loadKaggleTestData: (sqlContext: org.apache.spark.sql.hive.HiveContext)Array[org.apache.spark.sql.DataFrame]
savePredictions: (predictions: org.apache.spark.sql.DataFrame, testRaw: org.apache.spark.sql.DataFrame)Unit
<console>:146: error: not found: value logger
logger.info("Fitting data")
^
<console>:148: error: not found: value logger
logger.info("Now performing test on hold out set")
Update:
//package net.sparktutorials.examples
import org.apache.log4j.{Logger}
//core and SparkSQL
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
// ML Feature Creation, Tuning, Models, and Model Evaluation
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.{RegressionEvaluator}
import org.apache.spark.ml.regression.{RandomForestRegressor, LinearRegression}
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.evaluation.RegressionMetrics
object RossmannRegression extends Serializable {
@transient lazy val logger = Logger.getLogger(getClass.getName)
val stateHolidayIndexer = new StringIndexer()
.setInputCol("StateHoliday")
.setOutputCol("StateHolidayIndex")
val schoolHolidayIndexer = new StringIndexer()
.setInputCol("SchoolHoliday")
.setOutputCol("SchoolHolidayIndex")
val stateHolidayEncoder = new OneHotEncoder()
.setInputCol("StateHolidayIndex")
.setOutputCol("StateHolidayVec")
val schoolHolidayEncoder = new OneHotEncoder()
.setInputCol("SchoolHolidayIndex")
.setOutputCol("SchoolHolidayVec")
val dayOfMonthEncoder = new OneHotEncoder()
.setInputCol("DayOfMonth")
.setOutputCol("DayOfMonthVec")
val dayOfWeekEncoder = new OneHotEncoder()
.setInputCol("DayOfWeek")
.setOutputCol("DayOfWeekVec")
val storeEncoder = new OneHotEncoder()
.setInputCol("Store")
.setOutputCol("StoreVec")
val assembler = new VectorAssembler()
.setInputCols(Array("StoreVec", "DayOfWeekVec", "Open",
"DayOfMonthVec", "StateHolidayVec", "SchoolHolidayVec"))
.setOutputCol("features")
def preppedLRPipeline():TrainValidationSplit = {
val lr = new LinearRegression()
val paramGrid = new ParamGridBuilder()
.addGrid(lr.regParam, Array(0.1, 0.01))
.addGrid(lr.fitIntercept)
.addGrid(lr.elasticNetParam, Array(0.0, 0.25, 0.5, 0.75, 1.0))
.build()
val pipeline = new Pipeline()
.setStages(Array(stateHolidayIndexer, schoolHolidayIndexer,
stateHolidayEncoder, schoolHolidayEncoder, storeEncoder,
dayOfWeekEncoder, dayOfMonthEncoder,
assembler, lr))
val tvs = new TrainValidationSplit()
.setEstimator(pipeline)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.75)
tvs
}
def preppedRFPipeline():TrainValidationSplit = {
val dfr = new RandomForestRegressor()
val paramGrid = new ParamGridBuilder()
.addGrid(dfr.minInstancesPerNode, Array(1, 5, 15))
.addGrid(dfr.maxDepth, Array(2, 4, 8))
.addGrid(dfr.numTrees, Array(20, 50, 100))
.build()
val pipeline = new Pipeline()
.setStages(Array(stateHolidayIndexer, schoolHolidayIndexer,
stateHolidayEncoder, schoolHolidayEncoder, storeEncoder,
dayOfWeekEncoder, dayOfMonthEncoder,
assembler, dfr))
val tvs = new TrainValidationSplit()
.setEstimator(pipeline)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.55)
tvs
}
def fitModel(tvs:TrainValidationSplit, data:DataFrame) = {
val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 12345)
logger.info("Fitting data")
val model = tvs.fit(training)
logger.info("Now performing test on hold out set")
val holdout = model.transform(test).select("prediction","label")
// have to do a type conversion for RegressionMetrics
val rm = new RegressionMetrics(holdout.rdd.map(x =>
(x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))
logger.info("Test Metrics")
logger.info("Test Explained Variance:")
logger.info(rm.explainedVariance)
logger.info("Test R^2 Coef:")
logger.info(rm.r2)
logger.info("Test MSE:")
logger.info(rm.meanSquaredError)
logger.info("Test RMSE:")
logger.info(rm.rootMeanSquaredError)
model
}
def savePredictions(predictions:DataFrame, testRaw:DataFrame, filePath:String) = {
val tdOut = testRaw
.select("Id")
.distinct()
.join(predictions, testRaw("Id") === predictions("PredId"), "outer")
.select("Id", "Sales")
.na.fill(0:Double) // some of our inputs were null so we have to
// fill these with something
tdOut
.coalesce(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save(filePath)
}
def loadTrainingData(sqlContext:HiveContext, filePath:String):DataFrame = {
val trainRaw = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
//.load(filePath)
.load("/Users/username/Desktop/stuff/comp/clnt/store_forecast/input/train.csv")
.repartition(30)
trainRaw.registerTempTable("raw_training_data")
sqlContext.sql("""SELECT
double(Sales) label, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek,
StateHoliday, SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
FROM raw_training_data
""").na.drop()
}
def loadKaggleTestData(sqlContext:HiveContext, filePath:String) = {
val testRaw = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
//.load(filePath)
.load("/Users/username/Desktop/stuff/comp/clnt/store_forecast/input/test.csv")
.repartition(30)
testRaw.registerTempTable("raw_test_data")
val testData = sqlContext.sql("""SELECT
Id, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek, StateHoliday,
SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
FROM raw_test_data
WHERE !(ISNULL(Id) OR ISNULL(Store) OR ISNULL(Open) OR ISNULL(DayOfWeek)
OR ISNULL(StateHoliday) OR ISNULL(SchoolHoliday))
""").na.drop() // weird things happen if you don't filter out the null values manually
Array(testRaw, testData) // got to hold onto testRaw so we can make sure
// to have all the prediction IDs to submit to kaggle
}
def main(args:Array[String]) = {
val name = "Linear Regression Application"
logger.info(s"Starting up $name")
val conf = new SparkConf().setAppName(name)
val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)
// sc.setLogLevel("INFO")
logger.info("Set Up Complete")
val data = loadTrainingData(sqlContext, args(0))
val Array(testRaw, testData) = loadKaggleTestData(sqlContext, args(1))
// The linear Regression Pipeline
val linearTvs = preppedLRPipeline()
logger.info("evaluating linear regression")
val lrModel = fitModel(linearTvs, data)
logger.info("Generating kaggle predictions")
val lrOut = lrModel.transform(testData)
.withColumnRenamed("prediction","Sales")
.withColumnRenamed("Id","PredId")
.select("PredId", "Sales")
//savePredictions(lrOut, testRaw, "linear_predictions.csv")
savePredictions(lrOut, testRaw, "/Users/username/Desktop/stuff/comp/clnt/store_forecast/Output/linear_predictions.csv")
}
}
Code in spark-shell:
:load /Users/username/Desktop/stuff/comp/clnt/simple_spark_regression
output:
Loading /Users/username/Desktop/stuff/comp/clnt/simple_spark_regression...
import org.apache.log4j.Logger
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.{RandomForestRegressor, LinearRegression}
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.evaluation.RegressionMetrics
warning: there were 10 deprecation warnings; re-run with -deprecation for details
defined object RossmannRegression
Tags: scala apache-spark
Comments:
– Shaido (Nov 21 '18 at 1:39): You are missing some code, e.g. the whole main method. There is a link to github on the page that has everything though (github.com/anabranch/simple-spark-regression/blob/master/src/…).
– user3476463 (Nov 21 '18 at 21:26): @Shaido thank you for the tip. I've added an update to my original post with the complete code. Also, I've tried running the script in spark-shell with ":load"; I get several warnings, but it doesn't output the final CSV or print any of the logger steps to the screen. Is there something I'm missing when I try to run the script?
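A likely explanation for the last comment: in spark-shell, :load only compiles the file, so it defines RossmannRegression but never invokes its main method, which would explain why no CSV is written and no logger output appears. A sketch of calling it manually after the :load, assuming args(0) and args(1) are the train and test CSV paths main expects; note this is only a sketch, since main also builds its own SparkContext, which can clash with the one spark-shell has already created:
// :load only defines the object; main must be invoked explicitly.
// Caveat: new SparkContext(conf) inside main may fail because
// spark-shell already runs a SparkContext, so main may need to be
// refactored to reuse the shell's existing sc and sqlContext.
RossmannRegression.main(Array(
  "/Users/username/Desktop/stuff/comp/clnt/store_forecast/input/train.csv",
  "/Users/username/Desktop/stuff/comp/clnt/store_forecast/input/test.csv"))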
add a comment |
I have the code below. It comes from the blog post:
http://www.sparktutorials.net/Spark+MLLib+-+Predict+Store+Sales+with+ML+Pipelines
I'm trying to forecast store sales with linear regression. I'm doing some data cleansing and then creating a ml pipeline. I'm a total scala newbee. I have a piece of the spark-shel messages that print to the screen when I run the script. I post the piece where it starts throwing errors. Can someone please let me know what the issue is and how to fix it? All tips are greatly appreciated.
Code:
// loading packages
// from example: http://www.sparktutorials.net/Spark+MLLib+-+Predict+Store+Sales+with+ML+Pipelines
import org.apache.log4j.{Logger}
//core and SparkSQL
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
// ML Feature Creation, Tuning, Models, and Model Evaluation
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.{RegressionEvaluator}
import org.apache.spark.ml.regression.{LinearRegression}
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.evaluation.RegressionMetrics
// preprocessing & preparing Pipelines
// Indexers & Encoders
val stateHolidayIndexer = new StringIndexer()
.setInputCol("StateHoliday")
.setOutputCol("StateHolidayIndex")
val schoolHolidayIndexer = new StringIndexer()
.setInputCol("SchoolHoliday")
.setOutputCol("SchoolHolidayIndex")
val stateHolidayEncoder = new OneHotEncoder()
.setInputCol("StateHolidayIndex")
.setOutputCol("StateHolidayVec")
val schoolHolidayEncoder = new OneHotEncoder()
.setInputCol("SchoolHolidayIndex")
.setOutputCol("SchoolHolidayVec")
val dayOfMonthEncoder = new OneHotEncoder()
.setInputCol("DayOfMonth")
.setOutputCol("DayOfMonthVec")
val dayOfWeekEncoder = new OneHotEncoder()
.setInputCol("DayOfWeek")
.setOutputCol("DayOfWeekVec")
val storeEncoder = new OneHotEncoder()
.setInputCol("Store")
.setOutputCol("StoreVec")
// assemble all vectors in to one vector to input to Model
val assembler = new VectorAssembler()
.setInputCols(Array("StoreVec", "DayOfWeekVec", "Open", "DayOfMonthVec", "StateHolidayVec", "SchoolHolidayVec"))
.setOutputCol("features")
// Pipeline
def preppedLRPipeline():TrainValidationSplit = {
val lr = new LinearRegression()
val paramGrid = new ParamGridBuilder()
.addGrid(lr.regParam, Array(0.1, 0.01))
.addGrid(lr.fitIntercept)
.addGrid(lr.elasticNetParam, Array(0.0, 0.25, 0.5, 0.75, 1.0))
.build()
val pipeline = new Pipeline()
.setStages(Array(stateHolidayIndexer, schoolHolidayIndexer,stateHolidayEncoder, schoolHolidayEncoder, storeEncoder,dayOfWeekEncoder, dayOfMonthEncoder, assembler, lr))
val tvs = new TrainValidationSplit()
.setEstimator(pipeline)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.75)
tvs
}
// bringing in data and removing null values
def loadTrainingData(sqlContext:HiveContext):DataFrame = {
val trainRaw = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
// path to training data
// .load("../mlproject/rossman/train.csv")
.load("/Users/username/Desktop/stuff/comp/clint/store_forecast/input/train.csv")
.repartition(6)
trainRaw.registerTempTable("raw_training_data")
sqlContext.sql("""SELECT
double(Sales) label, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek,
StateHoliday, SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
FROM raw_training_data
""").na.drop()
}
def loadKaggleTestData(sqlContext:HiveContext) = {
val testRaw = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
// load test data
// .load("../mlproject/rossman/test.csv")
.load("/Users/username/Desktop/stuff/comp/clint/store_forecast/input/test.csv")
.repartition(6)
testRaw.registerTempTable("raw_test_data")
val testData = sqlContext.sql("""SELECT
Id, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek, StateHoliday,
SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
FROM raw_test_data
WHERE !(ISNULL(Id) OR ISNULL(Store) OR ISNULL(Open) OR ISNULL(DayOfWeek)
OR ISNULL(StateHoliday) OR ISNULL(SchoolHoliday))
""").na.drop() // weird things happen if you don't filter out the null values manually
Array(testRaw, testData) // got to hold onto testRaw so we can make sure
// to have all the prediction IDs to submit to kaggle
}
// save predictions
def savePredictions(predictions:DataFrame, testRaw:DataFrame) = {
val tdOut = testRaw
.select("Id")
.distinct()
.join(predictions, testRaw("Id") === predictions("PredId"), "outer")
.select("Id", "Sales")
.na.fill(0:Double) // some of our inputs were null so we have to
// fill these with something
tdOut
.coalesce(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
// save predictions
.save("linear_regression_predictions.csv")
//.save("/Users/username/Desktop/stuff/comp/clint/store_forecast/Output/linear_regression_predictions.csv")
}
// fitting and testing
def fitModel(tvs:TrainValidationSplit, data:DataFrame) = {
val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 12345)
logger.info("Fitting data")
val model = tvs.fit(training)
logger.info("Now performing test on hold out set")
val holdout = model.transform(test).select("prediction","label")
// have to do a type conversion for RegressionMetrics
val rm = new RegressionMetrics(holdout.rdd.map(x =>(x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))
logger.info("Test Metrics")
logger.info("Test Explained Variance:")
logger.info(rm.explainedVariance)
logger.info("Test R^2 Coef:")
logger.info(rm.r2)
logger.info("Test MSE:")
logger.info(rm.meanSquaredError)
logger.info("Test RMSE:")
logger.info(rm.rootMeanSquaredError)
model
}
// linear Regression
val data = loadTrainingData(sqlContext)
val Array(testRaw, testData) = loadKaggleTestData(sqlContext)
// The linear Regression Pipeline
val linearTvs = preppedLRPipeline()
logger.info("evaluating linear regression")
val lrModel = fitModel(linearTvs, data)
logger.info("Generating kaggle predictions")
val lrOut = lrModel.transform(testData)
.withColumnRenamed("prediction","Sales")
.withColumnRenamed("Id","PredId")
.select("PredId", "Sales")
savePredictions(lrOut, testRaw)
spark-shell output:
warning: there were two deprecation warnings; re-run with -deprecation for details
loadKaggleTestData: (sqlContext: org.apache.spark.sql.hive.HiveContext)Array[org.apache.spark.sql.DataFrame]
savePredictions: (predictions: org.apache.spark.sql.DataFrame, testRaw: org.apache.spark.sql.DataFrame)Unit
<console>:146: error: not found: value logger
logger.info("Fitting data")
^
<console>:148: error: not found: value logger
logger.info("Now performing test on hold out set")
Update:
//package net.sparktutorials.examples
import org.apache.log4j.{Logger}
//core and SparkSQL
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
// ML Feature Creation, Tuning, Models, and Model Evaluation
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.{RegressionEvaluator}
import org.apache.spark.ml.regression.{RandomForestRegressor, LinearRegression}
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.evaluation.RegressionMetrics
object RossmannRegression extends Serializable {
@transient lazy val logger = Logger.getLogger(getClass.getName)
val stateHolidayIndexer = new StringIndexer()
.setInputCol("StateHoliday")
.setOutputCol("StateHolidayIndex")
val schoolHolidayIndexer = new StringIndexer()
.setInputCol("SchoolHoliday")
.setOutputCol("SchoolHolidayIndex")
val stateHolidayEncoder = new OneHotEncoder()
.setInputCol("StateHolidayIndex")
.setOutputCol("StateHolidayVec")
val schoolHolidayEncoder = new OneHotEncoder()
.setInputCol("SchoolHolidayIndex")
.setOutputCol("SchoolHolidayVec")
val dayOfMonthEncoder = new OneHotEncoder()
.setInputCol("DayOfMonth")
.setOutputCol("DayOfMonthVec")
val dayOfWeekEncoder = new OneHotEncoder()
.setInputCol("DayOfWeek")
.setOutputCol("DayOfWeekVec")
val storeEncoder = new OneHotEncoder()
.setInputCol("Store")
.setOutputCol("StoreVec")
val assembler = new VectorAssembler()
.setInputCols(Array("StoreVec", "DayOfWeekVec", "Open",
"DayOfMonthVec", "StateHolidayVec", "SchoolHolidayVec"))
.setOutputCol("features")
def preppedLRPipeline():TrainValidationSplit = {
val lr = new LinearRegression()
val paramGrid = new ParamGridBuilder()
.addGrid(lr.regParam, Array(0.1, 0.01))
.addGrid(lr.fitIntercept)
.addGrid(lr.elasticNetParam, Array(0.0, 0.25, 0.5, 0.75, 1.0))
.build()
val pipeline = new Pipeline()
.setStages(Array(stateHolidayIndexer, schoolHolidayIndexer,
stateHolidayEncoder, schoolHolidayEncoder, storeEncoder,
dayOfWeekEncoder, dayOfMonthEncoder,
assembler, lr))
val tvs = new TrainValidationSplit()
.setEstimator(pipeline)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.75)
tvs
}
def preppedRFPipeline():TrainValidationSplit = {
val dfr = new RandomForestRegressor()
val paramGrid = new ParamGridBuilder()
.addGrid(dfr.minInstancesPerNode, Array(1, 5, 15))
.addGrid(dfr.maxDepth, Array(2, 4, 8))
.addGrid(dfr.numTrees, Array(20, 50, 100))
.build()
val pipeline = new Pipeline()
.setStages(Array(stateHolidayIndexer, schoolHolidayIndexer,
stateHolidayEncoder, schoolHolidayEncoder, storeEncoder,
dayOfWeekEncoder, dayOfMonthEncoder,
assembler, dfr))
val tvs = new TrainValidationSplit()
.setEstimator(pipeline)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.55)
tvs
}
def fitModel(tvs:TrainValidationSplit, data:DataFrame) = {
val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 12345)
logger.info("Fitting data")
val model = tvs.fit(training)
logger.info("Now performing test on hold out set")
val holdout = model.transform(test).select("prediction","label")
// have to do a type conversion for RegressionMetrics
val rm = new RegressionMetrics(holdout.rdd.map(x =>
(x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))
logger.info("Test Metrics")
logger.info("Test Explained Variance:")
logger.info(rm.explainedVariance)
logger.info("Test R^2 Coef:")
logger.info(rm.r2)
logger.info("Test MSE:")
logger.info(rm.meanSquaredError)
logger.info("Test RMSE:")
logger.info(rm.rootMeanSquaredError)
model
}
def savePredictions(predictions:DataFrame, testRaw:DataFrame, filePath:String) = {
val tdOut = testRaw
.select("Id")
.distinct()
.join(predictions, testRaw("Id") === predictions("PredId"), "outer")
.select("Id", "Sales")
.na.fill(0:Double) // some of our inputs were null so we have to
// fill these with something
tdOut
.coalesce(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save(filePath)
}
def loadTrainingData(sqlContext:HiveContext, filePath:String):DataFrame = {
val trainRaw = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
//.load(filePath)
.load("/Users/username/Desktop/stuff/comp/clnt/store_forecast/input/train.csv")
.repartition(30)
trainRaw.registerTempTable("raw_training_data")
sqlContext.sql("""SELECT
double(Sales) label, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek,
StateHoliday, SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
FROM raw_training_data
""").na.drop()
}
def loadKaggleTestData(sqlContext:HiveContext, filePath:String) = {
val testRaw = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
//.load(filePath)
.load("/Users/username/Desktop/stuff/comp/clnt/store_forecast/input/test.csv")
.repartition(30)
testRaw.registerTempTable("raw_test_data")
val testData = sqlContext.sql("""SELECT
Id, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek, StateHoliday,
SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
FROM raw_test_data
WHERE !(ISNULL(Id) OR ISNULL(Store) OR ISNULL(Open) OR ISNULL(DayOfWeek)
OR ISNULL(StateHoliday) OR ISNULL(SchoolHoliday))
""").na.drop() // weird things happen if you don't filter out the null values manually
Array(testRaw, testData) // got to hold onto testRaw so we can make sure
// to have all the prediction IDs to submit to kaggle
}
def main(args:Array[String]) = {
val name = "Linear Regression Application"
logger.info(s"Starting up $name")
val conf = new SparkConf().setAppName(name)
val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)
// sc.setLogLevel("INFO")
logger.info("Set Up Complete")
val data = loadTrainingData(sqlContext, args(0))
val Array(testRaw, testData) = loadKaggleTestData(sqlContext, args(1))
// The linear Regression Pipeline
val linearTvs = preppedLRPipeline()
logger.info("evaluating linear regression")
val lrModel = fitModel(linearTvs, data)
logger.info("Generating kaggle predictions")
val lrOut = lrModel.transform(testData)
.withColumnRenamed("prediction","Sales")
.withColumnRenamed("Id","PredId")
.select("PredId", "Sales")
//savePredictions(lrOut, testRaw, "linear_predictions.csv")
savePredictions(lrOut, testRaw, "/Users/username/Desktop/stuff/comp/clnt/store_forecast/Output/linear_predictions.csv")
}
}
Code in spark-shell:
:load /Users/username/Desktop/stuff/comp/clnt/simple_spark_regression
output:
Loading /Users/username/Desktop/stuff/comp/clnt/simple_spark_regression...
import org.apache.log4j.Logger
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.{RandomForestRegressor, LinearRegression}
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.evaluation.RegressionMetrics
warning: there were 10 deprecation warnings; re-run with -deprecation for details
defined object RossmannRegression
scala apache-spark
I have the code below. It comes from the blog post:
http://www.sparktutorials.net/Spark+MLLib+-+Predict+Store+Sales+with+ML+Pipelines
I'm trying to forecast store sales with linear regression. I'm doing some data cleansing and then creating a ml pipeline. I'm a total scala newbee. I have a piece of the spark-shel messages that print to the screen when I run the script. I post the piece where it starts throwing errors. Can someone please let me know what the issue is and how to fix it? All tips are greatly appreciated.
Code:
// loading packages
// from example: http://www.sparktutorials.net/Spark+MLLib+-+Predict+Store+Sales+with+ML+Pipelines
import org.apache.log4j.{Logger}
//core and SparkSQL
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
// ML Feature Creation, Tuning, Models, and Model Evaluation
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.{RegressionEvaluator}
import org.apache.spark.ml.regression.{LinearRegression}
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.evaluation.RegressionMetrics
// preprocessing & preparing Pipelines
// Indexers & Encoders
val stateHolidayIndexer = new StringIndexer()
.setInputCol("StateHoliday")
.setOutputCol("StateHolidayIndex")
val schoolHolidayIndexer = new StringIndexer()
.setInputCol("SchoolHoliday")
.setOutputCol("SchoolHolidayIndex")
val stateHolidayEncoder = new OneHotEncoder()
.setInputCol("StateHolidayIndex")
.setOutputCol("StateHolidayVec")
val schoolHolidayEncoder = new OneHotEncoder()
.setInputCol("SchoolHolidayIndex")
.setOutputCol("SchoolHolidayVec")
val dayOfMonthEncoder = new OneHotEncoder()
.setInputCol("DayOfMonth")
.setOutputCol("DayOfMonthVec")
val dayOfWeekEncoder = new OneHotEncoder()
.setInputCol("DayOfWeek")
.setOutputCol("DayOfWeekVec")
val storeEncoder = new OneHotEncoder()
.setInputCol("Store")
.setOutputCol("StoreVec")
// assemble all vectors in to one vector to input to Model
val assembler = new VectorAssembler()
.setInputCols(Array("StoreVec", "DayOfWeekVec", "Open", "DayOfMonthVec", "StateHolidayVec", "SchoolHolidayVec"))
.setOutputCol("features")
// Pipeline
def preppedLRPipeline():TrainValidationSplit = {
val lr = new LinearRegression()
val paramGrid = new ParamGridBuilder()
.addGrid(lr.regParam, Array(0.1, 0.01))
.addGrid(lr.fitIntercept)
.addGrid(lr.elasticNetParam, Array(0.0, 0.25, 0.5, 0.75, 1.0))
.build()
val pipeline = new Pipeline()
.setStages(Array(stateHolidayIndexer, schoolHolidayIndexer,stateHolidayEncoder, schoolHolidayEncoder, storeEncoder,dayOfWeekEncoder, dayOfMonthEncoder, assembler, lr))
val tvs = new TrainValidationSplit()
.setEstimator(pipeline)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.75)
tvs
}
// bringing in data and removing null values
def loadTrainingData(sqlContext:HiveContext):DataFrame = {
val trainRaw = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
// path to training data
// .load("../mlproject/rossman/train.csv")
.load("/Users/username/Desktop/stuff/comp/clint/store_forecast/input/train.csv")
.repartition(6)
trainRaw.registerTempTable("raw_training_data")
sqlContext.sql("""SELECT
double(Sales) label, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek,
StateHoliday, SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
FROM raw_training_data
""").na.drop()
}
def loadKaggleTestData(sqlContext:HiveContext) = {
val testRaw = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
// load test data
// .load("../mlproject/rossman/test.csv")
.load("/Users/username/Desktop/stuff/comp/clint/store_forecast/input/test.csv")
.repartition(6)
testRaw.registerTempTable("raw_test_data")
val testData = sqlContext.sql("""SELECT
Id, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek, StateHoliday,
SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
FROM raw_test_data
WHERE !(ISNULL(Id) OR ISNULL(Store) OR ISNULL(Open) OR ISNULL(DayOfWeek)
OR ISNULL(StateHoliday) OR ISNULL(SchoolHoliday))
""").na.drop() // weird things happen if you don't filter out the null values manually
Array(testRaw, testData) // got to hold onto testRaw so we can make sure
// to have all the prediction IDs to submit to kaggle
}
// save predictions
def savePredictions(predictions:DataFrame, testRaw:DataFrame) = {
val tdOut = testRaw
.select("Id")
.distinct()
.join(predictions, testRaw("Id") === predictions("PredId"), "outer")
.select("Id", "Sales")
.na.fill(0:Double) // some of our inputs were null so we have to
// fill these with something
tdOut
.coalesce(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
// save predictions
.save("linear_regression_predictions.csv")
//.save("/Users/username/Desktop/stuff/comp/clint/store_forecast/Output/linear_regression_predictions.csv")
}
// fitting and testing
def fitModel(tvs:TrainValidationSplit, data:DataFrame) = {
val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 12345)
logger.info("Fitting data")
val model = tvs.fit(training)
logger.info("Now performing test on hold out set")
val holdout = model.transform(test).select("prediction","label")
// have to do a type conversion for RegressionMetrics
val rm = new RegressionMetrics(holdout.rdd.map(x =>(x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))
logger.info("Test Metrics")
logger.info("Test Explained Variance:")
logger.info(rm.explainedVariance)
logger.info("Test R^2 Coef:")
logger.info(rm.r2)
logger.info("Test MSE:")
logger.info(rm.meanSquaredError)
logger.info("Test RMSE:")
logger.info(rm.rootMeanSquaredError)
model
}
// linear Regression
val data = loadTrainingData(sqlContext)
val Array(testRaw, testData) = loadKaggleTestData(sqlContext)
// The linear Regression Pipeline
val linearTvs = preppedLRPipeline()
logger.info("evaluating linear regression")
val lrModel = fitModel(linearTvs, data)
logger.info("Generating kaggle predictions")
val lrOut = lrModel.transform(testData)
.withColumnRenamed("prediction","Sales")
.withColumnRenamed("Id","PredId")
.select("PredId", "Sales")
savePredictions(lrOut, testRaw)
spark-shell output:
warning: there were two deprecation warnings; re-run with -deprecation for details
loadKaggleTestData: (sqlContext: org.apache.spark.sql.hive.HiveContext)Array[org.apache.spark.sql.DataFrame]
savePredictions: (predictions: org.apache.spark.sql.DataFrame, testRaw: org.apache.spark.sql.DataFrame)Unit
<console>:146: error: not found: value logger
logger.info("Fitting data")
^
<console>:148: error: not found: value logger
logger.info("Now performing test on hold out set")
Update:
//package net.sparktutorials.examples
import org.apache.log4j.{Logger}
//core and SparkSQL
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
// ML Feature Creation, Tuning, Models, and Model Evaluation
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.{RegressionEvaluator}
import org.apache.spark.ml.regression.{RandomForestRegressor, LinearRegression}
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.evaluation.RegressionMetrics
object RossmannRegression extends Serializable {
@transient lazy val logger = Logger.getLogger(getClass.getName)
val stateHolidayIndexer = new StringIndexer()
.setInputCol("StateHoliday")
.setOutputCol("StateHolidayIndex")
val schoolHolidayIndexer = new StringIndexer()
.setInputCol("SchoolHoliday")
.setOutputCol("SchoolHolidayIndex")
val stateHolidayEncoder = new OneHotEncoder()
.setInputCol("StateHolidayIndex")
.setOutputCol("StateHolidayVec")
val schoolHolidayEncoder = new OneHotEncoder()
.setInputCol("SchoolHolidayIndex")
.setOutputCol("SchoolHolidayVec")
val dayOfMonthEncoder = new OneHotEncoder()
.setInputCol("DayOfMonth")
.setOutputCol("DayOfMonthVec")
val dayOfWeekEncoder = new OneHotEncoder()
.setInputCol("DayOfWeek")
.setOutputCol("DayOfWeekVec")
val storeEncoder = new OneHotEncoder()
.setInputCol("Store")
.setOutputCol("StoreVec")
val assembler = new VectorAssembler()
.setInputCols(Array("StoreVec", "DayOfWeekVec", "Open",
"DayOfMonthVec", "StateHolidayVec", "SchoolHolidayVec"))
.setOutputCol("features")
def preppedLRPipeline():TrainValidationSplit = {
val lr = new LinearRegression()
val paramGrid = new ParamGridBuilder()
.addGrid(lr.regParam, Array(0.1, 0.01))
.addGrid(lr.fitIntercept)
.addGrid(lr.elasticNetParam, Array(0.0, 0.25, 0.5, 0.75, 1.0))
.build()
val pipeline = new Pipeline()
.setStages(Array(stateHolidayIndexer, schoolHolidayIndexer,
stateHolidayEncoder, schoolHolidayEncoder, storeEncoder,
dayOfWeekEncoder, dayOfMonthEncoder,
assembler, lr))
val tvs = new TrainValidationSplit()
.setEstimator(pipeline)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.75)
tvs
}
def preppedRFPipeline():TrainValidationSplit = {
val dfr = new RandomForestRegressor()
val paramGrid = new ParamGridBuilder()
.addGrid(dfr.minInstancesPerNode, Array(1, 5, 15))
.addGrid(dfr.maxDepth, Array(2, 4, 8))
.addGrid(dfr.numTrees, Array(20, 50, 100))
.build()
val pipeline = new Pipeline()
.setStages(Array(stateHolidayIndexer, schoolHolidayIndexer,
stateHolidayEncoder, schoolHolidayEncoder, storeEncoder,
dayOfWeekEncoder, dayOfMonthEncoder,
assembler, dfr))
val tvs = new TrainValidationSplit()
.setEstimator(pipeline)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.55)
tvs
}
def fitModel(tvs:TrainValidationSplit, data:DataFrame) = {
val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 12345)
logger.info("Fitting data")
val model = tvs.fit(training)
logger.info("Now performing test on hold out set")
val holdout = model.transform(test).select("prediction","label")
// have to do a type conversion for RegressionMetrics
val rm = new RegressionMetrics(holdout.rdd.map(x =>
(x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))
logger.info("Test Metrics")
logger.info("Test Explained Variance:")
logger.info(rm.explainedVariance)
logger.info("Test R^2 Coef:")
logger.info(rm.r2)
logger.info("Test MSE:")
logger.info(rm.meanSquaredError)
logger.info("Test RMSE:")
logger.info(rm.rootMeanSquaredError)
model
}
def savePredictions(predictions:DataFrame, testRaw:DataFrame, filePath:String) = {
val tdOut = testRaw
.select("Id")
.distinct()
.join(predictions, testRaw("Id") === predictions("PredId"), "outer")
.select("Id", "Sales")
.na.fill(0:Double) // some of our inputs were null so we have to
// fill these with something
tdOut
.coalesce(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save(filePath)
}
def loadTrainingData(sqlContext:HiveContext, filePath:String):DataFrame = {
val trainRaw = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
//.load(filePath)
.load("/Users/username/Desktop/stuff/comp/clnt/store_forecast/input/train.csv")
.repartition(30)
trainRaw.registerTempTable("raw_training_data")
sqlContext.sql("""SELECT
double(Sales) label, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek,
StateHoliday, SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
FROM raw_training_data
""").na.drop()
}
def loadKaggleTestData(sqlContext:HiveContext, filePath:String) = {
val testRaw = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
//.load(filePath)
.load("/Users/username/Desktop/stuff/comp/clnt/store_forecast/input/test.csv")
.repartition(30)
testRaw.registerTempTable("raw_test_data")
val testData = sqlContext.sql("""SELECT
Id, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek, StateHoliday,
SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
FROM raw_test_data
WHERE !(ISNULL(Id) OR ISNULL(Store) OR ISNULL(Open) OR ISNULL(DayOfWeek)
OR ISNULL(StateHoliday) OR ISNULL(SchoolHoliday))
""").na.drop() // weird things happen if you don't filter out the null values manually
Array(testRaw, testData) // got to hold onto testRaw so we can make sure
// to have all the prediction IDs to submit to kaggle
}
def main(args:Array[String]) = {
val name = "Linear Regression Application"
logger.info(s"Starting up $name")
val conf = new SparkConf().setAppName(name)
val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)
// sc.setLogLevel("INFO")
logger.info("Set Up Complete")
val data = loadTrainingData(sqlContext, args(0))
val Array(testRaw, testData) = loadKaggleTestData(sqlContext, args(1))
// The linear Regression Pipeline
val linearTvs = preppedLRPipeline()
logger.info("evaluating linear regression")
val lrModel = fitModel(linearTvs, data)
logger.info("Generating kaggle predictions")
val lrOut = lrModel.transform(testData)
.withColumnRenamed("prediction","Sales")
.withColumnRenamed("Id","PredId")
.select("PredId", "Sales")
//savePredictions(lrOut, testRaw, "linear_predictions.csv")
savePredictions(lrOut, testRaw, "/Users/username/Desktop/stuff/comp/clnt/store_forecast/Output/linear_predictions.csv")
}
}
Code in spark-shell:
:load /Users/username/Desktop/stuff/comp/clnt/simple_spark_regression
output:
Loading /Users/username/Desktop/stuff/comp/clnt/simple_spark_regression...
import org.apache.log4j.Logger
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.{RandomForestRegressor, LinearRegression}
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.evaluation.RegressionMetrics
warning: there were 10 deprecation warnings; re-run with -deprecation for details
defined object RossmannRegression
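Note that the output above ends at "defined object RossmannRegression": in spark-shell, :load compiles and defines the object but does not invoke main, which would explain why no CSV is written and no logger lines appear. One way to actually run the job from the shell would be a call like the following (the two paths are placeholders, not the actual files):
// args(0) is the training CSV path and args(1) the test CSV path, per main above.
// Caveat: main constructs its own SparkContext, which can conflict with the
// shell's existing sc; packaging the object and running it via spark-submit
// avoids that.
RossmannRegression.main(Array("/path/to/train.csv", "/path/to/test.csv"))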
scala apache-spark
edited Nov 21 '18 at 21:21
asked Nov 20 '18 at 23:39 by user3476463
You are missing some code, e.g. the whole main method. There is a link to github on the page that has everything though (github.com/anabranch/simple-spark-regression/blob/master/src/…).
– Shaido
Nov 21 '18 at 1:39
@Shaido thank you for the tip. I've added an update to my original post with the complete code. I've also tried running the script in spark-shell with :load; I get several warnings, but it doesn't output the final CSV or print any of the logger steps to the screen. Is there something I'm missing when I try to run the script?
– user3476463
Nov 21 '18 at 21:26
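One further consideration, offered as a guess rather than a confirmed diagnosis: even after main is invoked, INFO messages can be filtered out if the effective log4j level is WARN or higher. Raising the level explicitly, either through the sc.setLogLevel call that is commented out in main or directly on the root logger, would rule that out:
// Both calls use standard APIs: SparkContext.setLogLevel (available since
// Spark 1.4) and log4j 1.x's Logger.getRootLogger.
import org.apache.log4j.{Level, Logger}
sc.setLogLevel("INFO")
Logger.getRootLogger.setLevel(Level.INFO)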