Use logger in Scala pipeline

I have the code below. It comes from this blog post:

http://www.sparktutorials.net/Spark+MLLib+-+Predict+Store+Sales+with+ML+Pipelines

I'm trying to forecast store sales with linear regression. I do some data cleansing and then build an ML pipeline. I'm a total Scala newbie. I've included the piece of the spark-shell output that prints to the screen when I run the script, starting where it begins throwing errors. Can someone please let me know what the issue is and how to fix it? All tips are greatly appreciated.



Code:



// loading packages
// from example: http://www.sparktutorials.net/Spark+MLLib+-+Predict+Store+Sales+with+ML+Pipelines

import org.apache.log4j.Logger
//core and SparkSQL
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
// ML Feature Creation, Tuning, Models, and Model Evaluation
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.evaluation.RegressionMetrics


// preprocessing & preparing Pipelines

// Indexers & Encoders

val stateHolidayIndexer = new StringIndexer()
  .setInputCol("StateHoliday")
  .setOutputCol("StateHolidayIndex")
val schoolHolidayIndexer = new StringIndexer()
  .setInputCol("SchoolHoliday")
  .setOutputCol("SchoolHolidayIndex")
val stateHolidayEncoder = new OneHotEncoder()
  .setInputCol("StateHolidayIndex")
  .setOutputCol("StateHolidayVec")
val schoolHolidayEncoder = new OneHotEncoder()
  .setInputCol("SchoolHolidayIndex")
  .setOutputCol("SchoolHolidayVec")
val dayOfMonthEncoder = new OneHotEncoder()
  .setInputCol("DayOfMonth")
  .setOutputCol("DayOfMonthVec")
val dayOfWeekEncoder = new OneHotEncoder()
  .setInputCol("DayOfWeek")
  .setOutputCol("DayOfWeekVec")
val storeEncoder = new OneHotEncoder()
  .setInputCol("Store")
  .setOutputCol("StoreVec")


// assemble all feature vectors into one vector to feed the model

val assembler = new VectorAssembler()
  .setInputCols(Array("StoreVec", "DayOfWeekVec", "Open", "DayOfMonthVec", "StateHolidayVec", "SchoolHolidayVec"))
  .setOutputCol("features")

// Pipeline

def preppedLRPipeline(): TrainValidationSplit = {
  val lr = new LinearRegression()

  val paramGrid = new ParamGridBuilder()
    .addGrid(lr.regParam, Array(0.1, 0.01))
    .addGrid(lr.fitIntercept)
    .addGrid(lr.elasticNetParam, Array(0.0, 0.25, 0.5, 0.75, 1.0))
    .build()

  val pipeline = new Pipeline()
    .setStages(Array(stateHolidayIndexer, schoolHolidayIndexer,
      stateHolidayEncoder, schoolHolidayEncoder, storeEncoder,
      dayOfWeekEncoder, dayOfMonthEncoder, assembler, lr))

  val tvs = new TrainValidationSplit()
    .setEstimator(pipeline)
    .setEvaluator(new RegressionEvaluator)
    .setEstimatorParamMaps(paramGrid)
    .setTrainRatio(0.75)
  tvs
}
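
(Note: .addGrid(lr.fitIntercept) with no explicit values expands the boolean param over both true and false, so this grid tries 2 × 2 × 5 = 20 parameter combinations, each fit once on the 75/25 train/validation split.)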


// bringing in data and removing null values

def loadTrainingData(sqlContext: HiveContext): DataFrame = {
  val trainRaw = sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    // path to training data
    // .load("../mlproject/rossman/train.csv")
    .load("/Users/username/Desktop/stuff/comp/clint/store_forecast/input/train.csv")
    .repartition(6)
  trainRaw.registerTempTable("raw_training_data")

  sqlContext.sql("""SELECT
    double(Sales) label, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek,
    StateHoliday, SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
    FROM raw_training_data
  """).na.drop()
}

def loadKaggleTestData(sqlContext: HiveContext) = {
  val testRaw = sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    // load test data
    // .load("../mlproject/rossman/test.csv")
    .load("/Users/username/Desktop/stuff/comp/clint/store_forecast/input/test.csv")
    .repartition(6)
  testRaw.registerTempTable("raw_test_data")

  val testData = sqlContext.sql("""SELECT
    Id, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek, StateHoliday,
    SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
    FROM raw_test_data
    WHERE !(ISNULL(Id) OR ISNULL(Store) OR ISNULL(Open) OR ISNULL(DayOfWeek)
    OR ISNULL(StateHoliday) OR ISNULL(SchoolHoliday))
  """).na.drop() // weird things happen if you don't filter out the null values manually

  Array(testRaw, testData) // hold onto testRaw so we keep all the prediction IDs to submit to Kaggle
}
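
(Note: regexp_extract(Date, '\d+-\d+-(\d+)', 1) grabs the third dash-separated number, i.e. the day component of a yyyy-MM-dd date string, so e.g. '2015-07-31' becomes 31.0 after the cast to double.)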


// save predictions

def savePredictions(predictions: DataFrame, testRaw: DataFrame) = {
  val tdOut = testRaw
    .select("Id")
    .distinct()
    .join(predictions, testRaw("Id") === predictions("PredId"), "outer")
    .select("Id", "Sales")
    .na.fill(0: Double) // some of our inputs were null, so fill those with something
  tdOut
    .coalesce(1)
    .write.format("com.databricks.spark.csv")
    .option("header", "true")
    // save predictions
    .save("linear_regression_predictions.csv")
    //.save("/Users/username/Desktop/stuff/comp/clint/store_forecast/Output/linear_regression_predictions.csv")
}

// fitting and testing

def fitModel(tvs: TrainValidationSplit, data: DataFrame) = {
  val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 12345)
  logger.info("Fitting data")
  val model = tvs.fit(training)
  logger.info("Now performing test on hold out set")
  val holdout = model.transform(test).select("prediction", "label")

  // have to do a type conversion for RegressionMetrics
  val rm = new RegressionMetrics(holdout.rdd.map(x => (x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))

  logger.info("Test Metrics")
  logger.info("Test Explained Variance:")
  logger.info(rm.explainedVariance)
  logger.info("Test R^2 Coef:")
  logger.info(rm.r2)
  logger.info("Test MSE:")
  logger.info(rm.meanSquaredError)
  logger.info("Test RMSE:")
  logger.info(rm.rootMeanSquaredError)

  model
}


// linear Regression

val data = loadTrainingData(sqlContext)
val Array(testRaw, testData) = loadKaggleTestData(sqlContext)

// the linear regression pipeline
val linearTvs = preppedLRPipeline()
logger.info("evaluating linear regression")
val lrModel = fitModel(linearTvs, data)
logger.info("Generating kaggle predictions")
val lrOut = lrModel.transform(testData)
  .withColumnRenamed("prediction", "Sales")
  .withColumnRenamed("Id", "PredId")
  .select("PredId", "Sales")
savePredictions(lrOut, testRaw)


spark-shell output:



warning: there were two deprecation warnings; re-run with -deprecation for details
loadKaggleTestData: (sqlContext: org.apache.spark.sql.hive.HiveContext)Array[org.apache.spark.sql.DataFrame]
savePredictions: (predictions: org.apache.spark.sql.DataFrame, testRaw: org.apache.spark.sql.DataFrame)Unit
<console>:146: error: not found: value logger
logger.info("Fitting data")
^
<console>:148: error: not found: value logger
logger.info("Now performing test on hold out set")


Update:



//package net.sparktutorials.examples

import org.apache.log4j.Logger
//core and SparkSQL
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
// ML Feature Creation, Tuning, Models, and Model Evaluation
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.{RandomForestRegressor, LinearRegression}
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.evaluation.RegressionMetrics


object RossmannRegression extends Serializable {
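  // @transient + lazy: log4j Loggers are not serializable, so this keeps the field
  // out of serialized closures and re-creates it on first use in each JVM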
  @transient lazy val logger = Logger.getLogger(getClass.getName)

  val stateHolidayIndexer = new StringIndexer()
    .setInputCol("StateHoliday")
    .setOutputCol("StateHolidayIndex")
  val schoolHolidayIndexer = new StringIndexer()
    .setInputCol("SchoolHoliday")
    .setOutputCol("SchoolHolidayIndex")
  val stateHolidayEncoder = new OneHotEncoder()
    .setInputCol("StateHolidayIndex")
    .setOutputCol("StateHolidayVec")
  val schoolHolidayEncoder = new OneHotEncoder()
    .setInputCol("SchoolHolidayIndex")
    .setOutputCol("SchoolHolidayVec")
  val dayOfMonthEncoder = new OneHotEncoder()
    .setInputCol("DayOfMonth")
    .setOutputCol("DayOfMonthVec")
  val dayOfWeekEncoder = new OneHotEncoder()
    .setInputCol("DayOfWeek")
    .setOutputCol("DayOfWeekVec")
  val storeEncoder = new OneHotEncoder()
    .setInputCol("Store")
    .setOutputCol("StoreVec")

  val assembler = new VectorAssembler()
    .setInputCols(Array("StoreVec", "DayOfWeekVec", "Open",
      "DayOfMonthVec", "StateHolidayVec", "SchoolHolidayVec"))
    .setOutputCol("features")

  def preppedLRPipeline(): TrainValidationSplit = {
    val lr = new LinearRegression()

    val paramGrid = new ParamGridBuilder()
      .addGrid(lr.regParam, Array(0.1, 0.01))
      .addGrid(lr.fitIntercept)
      .addGrid(lr.elasticNetParam, Array(0.0, 0.25, 0.5, 0.75, 1.0))
      .build()

    val pipeline = new Pipeline()
      .setStages(Array(stateHolidayIndexer, schoolHolidayIndexer,
        stateHolidayEncoder, schoolHolidayEncoder, storeEncoder,
        dayOfWeekEncoder, dayOfMonthEncoder,
        assembler, lr))

    val tvs = new TrainValidationSplit()
      .setEstimator(pipeline)
      .setEvaluator(new RegressionEvaluator)
      .setEstimatorParamMaps(paramGrid)
      .setTrainRatio(0.75)
    tvs
  }

  def preppedRFPipeline(): TrainValidationSplit = {
    val dfr = new RandomForestRegressor()

    val paramGrid = new ParamGridBuilder()
      .addGrid(dfr.minInstancesPerNode, Array(1, 5, 15))
      .addGrid(dfr.maxDepth, Array(2, 4, 8))
      .addGrid(dfr.numTrees, Array(20, 50, 100))
      .build()

    val pipeline = new Pipeline()
      .setStages(Array(stateHolidayIndexer, schoolHolidayIndexer,
        stateHolidayEncoder, schoolHolidayEncoder, storeEncoder,
        dayOfWeekEncoder, dayOfMonthEncoder,
        assembler, dfr))

    val tvs = new TrainValidationSplit()
      .setEstimator(pipeline)
      .setEvaluator(new RegressionEvaluator)
      .setEstimatorParamMaps(paramGrid)
      .setTrainRatio(0.55)
    tvs
  }

  def fitModel(tvs: TrainValidationSplit, data: DataFrame) = {
    val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 12345)
    logger.info("Fitting data")
    val model = tvs.fit(training)
    logger.info("Now performing test on hold out set")
    val holdout = model.transform(test).select("prediction", "label")

    // have to do a type conversion for RegressionMetrics
    val rm = new RegressionMetrics(holdout.rdd.map(x =>
      (x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))

    logger.info("Test Metrics")
    logger.info("Test Explained Variance:")
    logger.info(rm.explainedVariance)
    logger.info("Test R^2 Coef:")
    logger.info(rm.r2)
    logger.info("Test MSE:")
    logger.info(rm.meanSquaredError)
    logger.info("Test RMSE:")
    logger.info(rm.rootMeanSquaredError)

    model
  }

  def savePredictions(predictions: DataFrame, testRaw: DataFrame, filePath: String) = {
    val tdOut = testRaw
      .select("Id")
      .distinct()
      .join(predictions, testRaw("Id") === predictions("PredId"), "outer")
      .select("Id", "Sales")
      .na.fill(0: Double) // some of our inputs were null, so fill those with something
    tdOut
      .coalesce(1)
      .write.format("com.databricks.spark.csv")
      .option("header", "true")
      .save(filePath)
  }

  def loadTrainingData(sqlContext: HiveContext, filePath: String): DataFrame = {
    val trainRaw = sqlContext
      .read.format("com.databricks.spark.csv")
      .option("header", "true")
      //.load(filePath)
      .load("/Users/username/Desktop/stuff/comp/clnt/store_forecast/input/train.csv")
      .repartition(30)
    trainRaw.registerTempTable("raw_training_data")

    sqlContext.sql("""SELECT
      double(Sales) label, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek,
      StateHoliday, SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
      FROM raw_training_data
    """).na.drop()
  }

  def loadKaggleTestData(sqlContext: HiveContext, filePath: String) = {
    val testRaw = sqlContext
      .read.format("com.databricks.spark.csv")
      .option("header", "true")
      //.load(filePath)
      .load("/Users/username/Desktop/stuff/comp/clnt/store_forecast/input/test.csv")
      .repartition(30)
    testRaw.registerTempTable("raw_test_data")

    val testData = sqlContext.sql("""SELECT
      Id, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek, StateHoliday,
      SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
      FROM raw_test_data
      WHERE !(ISNULL(Id) OR ISNULL(Store) OR ISNULL(Open) OR ISNULL(DayOfWeek)
      OR ISNULL(StateHoliday) OR ISNULL(SchoolHoliday))
    """).na.drop() // weird things happen if you don't filter out the null values manually

    Array(testRaw, testData) // hold onto testRaw so we keep all the prediction IDs to submit to Kaggle
  }

  def main(args: Array[String]) = {
    val name = "Linear Regression Application"
    logger.info(s"Starting up $name")

    val conf = new SparkConf().setAppName(name)
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    // sc.setLogLevel("INFO")

    logger.info("Set Up Complete")
    val data = loadTrainingData(sqlContext, args(0))
    val Array(testRaw, testData) = loadKaggleTestData(sqlContext, args(1))

    // the linear regression pipeline
    val linearTvs = preppedLRPipeline()
    logger.info("evaluating linear regression")
    val lrModel = fitModel(linearTvs, data)
    logger.info("Generating kaggle predictions")
    val lrOut = lrModel.transform(testData)
      .withColumnRenamed("prediction", "Sales")
      .withColumnRenamed("Id", "PredId")
      .select("PredId", "Sales")
    //savePredictions(lrOut, testRaw, "linear_predictions.csv")
    savePredictions(lrOut, testRaw, "/Users/username/Desktop/stuff/comp/clnt/store_forecast/Output/linear_predictions.csv")
  }
}


Code in spark-shell:



:load /Users/username/Desktop/stuff/comp/clnt/simple_spark_regression


output:



Loading /Users/username/Desktop/stuff/comp/clnt/simple_spark_regression...
import org.apache.log4j.Logger
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.{RandomForestRegressor, LinearRegression}
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.evaluation.RegressionMetrics
warning: there were 10 deprecation warnings; re-run with -deprecation for details
defined object RossmannRegression
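
Note: in the Scala REPL, :load only compiles and defines what is in the file, which is why the output stops at "defined object RossmannRegression": nothing ever invokes main, so no logging happens and no csv gets written. A sketch of kicking it off by hand (the argument paths are placeholders, and note that as written loadTrainingData/loadKaggleTestData ignore them in favour of the hard-coded ones):

RossmannRegression.main(Array(
  "/path/to/train.csv",
  "/path/to/test.csv"))

Run inside spark-shell, though, this main will try to construct a second SparkContext next to the shell's own sc, which Spark will typically refuse, so the more usual route is to package the object with sbt and run it through spark-submit.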









  • You are missing some code, e.g. the whole main method. There is a link to GitHub on the page that has everything though (github.com/anabranch/simple-spark-regression/blob/master/src/…).

    – Shaido
    Nov 21 '18 at 1:39

  • @Shaido thank you for the tip. I've added an update to my original post with the complete code. I've also tried running the script in spark-shell with ":load"; I get several warnings, but it doesn't output the final csv or print any of the logger steps to the screen. Is there something I'm missing when I try to run the script?

    – user3476463
    Nov 21 '18 at 21:26















I have the code below. It comes from the blog post:



http://www.sparktutorials.net/Spark+MLLib+-+Predict+Store+Sales+with+ML+Pipelines



I'm trying to forecast store sales with linear regression. I'm doing some data cleansing and then creating a ml pipeline. I'm a total scala newbee. I have a piece of the spark-shel messages that print to the screen when I run the script. I post the piece where it starts throwing errors. Can someone please let me know what the issue is and how to fix it? All tips are greatly appreciated.



Code:



// loading packages
// from example: http://www.sparktutorials.net/Spark+MLLib+-+Predict+Store+Sales+with+ML+Pipelines

import org.apache.log4j.{Logger}
//core and SparkSQL
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
// ML Feature Creation, Tuning, Models, and Model Evaluation
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.{RegressionEvaluator}
import org.apache.spark.ml.regression.{LinearRegression}
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.evaluation.RegressionMetrics


// preprocessing & preparing Pipelines

// Indexers & Encoders

val stateHolidayIndexer = new StringIndexer()
.setInputCol("StateHoliday")
.setOutputCol("StateHolidayIndex")


val schoolHolidayIndexer = new StringIndexer()
.setInputCol("SchoolHoliday")
.setOutputCol("SchoolHolidayIndex")
val stateHolidayEncoder = new OneHotEncoder()
.setInputCol("StateHolidayIndex")
.setOutputCol("StateHolidayVec")
val schoolHolidayEncoder = new OneHotEncoder()
.setInputCol("SchoolHolidayIndex")
.setOutputCol("SchoolHolidayVec")
val dayOfMonthEncoder = new OneHotEncoder()
.setInputCol("DayOfMonth")
.setOutputCol("DayOfMonthVec")
val dayOfWeekEncoder = new OneHotEncoder()
.setInputCol("DayOfWeek")
.setOutputCol("DayOfWeekVec")
val storeEncoder = new OneHotEncoder()
.setInputCol("Store")
.setOutputCol("StoreVec")


// assemble all vectors in to one vector to input to Model

val assembler = new VectorAssembler()
.setInputCols(Array("StoreVec", "DayOfWeekVec", "Open", "DayOfMonthVec", "StateHolidayVec", "SchoolHolidayVec"))
.setOutputCol("features")

// Pipeline

def preppedLRPipeline():TrainValidationSplit = {
val lr = new LinearRegression()

val paramGrid = new ParamGridBuilder()
.addGrid(lr.regParam, Array(0.1, 0.01))
.addGrid(lr.fitIntercept)
.addGrid(lr.elasticNetParam, Array(0.0, 0.25, 0.5, 0.75, 1.0))
.build()

val pipeline = new Pipeline()
.setStages(Array(stateHolidayIndexer, schoolHolidayIndexer,stateHolidayEncoder, schoolHolidayEncoder, storeEncoder,dayOfWeekEncoder, dayOfMonthEncoder, assembler, lr))

val tvs = new TrainValidationSplit()
.setEstimator(pipeline)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.75)
tvs
}


// bringing in data and removing null values

def loadTrainingData(sqlContext:HiveContext):DataFrame = {
val trainRaw = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
// path to training data
// .load("../mlproject/rossman/train.csv")
.load("/Users/username/Desktop/stuff/comp/clint/store_forecast/input/train.csv")
.repartition(6)
trainRaw.registerTempTable("raw_training_data")

sqlContext.sql("""SELECT
double(Sales) label, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek,
StateHoliday, SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
FROM raw_training_data
""").na.drop()
}

def loadKaggleTestData(sqlContext:HiveContext) = {
val testRaw = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
// load test data
// .load("../mlproject/rossman/test.csv")
.load("/Users/username/Desktop/stuff/comp/clint/store_forecast/input/test.csv")
.repartition(6)
testRaw.registerTempTable("raw_test_data")

val testData = sqlContext.sql("""SELECT
Id, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek, StateHoliday,
SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
FROM raw_test_data
WHERE !(ISNULL(Id) OR ISNULL(Store) OR ISNULL(Open) OR ISNULL(DayOfWeek)
OR ISNULL(StateHoliday) OR ISNULL(SchoolHoliday))
""").na.drop() // weird things happen if you don't filter out the null values manually

Array(testRaw, testData) // got to hold onto testRaw so we can make sure
// to have all the prediction IDs to submit to kaggle
}


// save predictions

def savePredictions(predictions:DataFrame, testRaw:DataFrame) = {
val tdOut = testRaw
.select("Id")
.distinct()
.join(predictions, testRaw("Id") === predictions("PredId"), "outer")
.select("Id", "Sales")
.na.fill(0:Double) // some of our inputs were null so we have to
// fill these with something
tdOut
.coalesce(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
// save predictions
.save("linear_regression_predictions.csv")
//.save("/Users/username/Desktop/stuff/comp/clint/store_forecast/Output/linear_regression_predictions.csv")
}

// fitting and testing

def fitModel(tvs:TrainValidationSplit, data:DataFrame) = {
val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 12345)
logger.info("Fitting data")
val model = tvs.fit(training)
logger.info("Now performing test on hold out set")
val holdout = model.transform(test).select("prediction","label")

// have to do a type conversion for RegressionMetrics
val rm = new RegressionMetrics(holdout.rdd.map(x =>(x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))

logger.info("Test Metrics")
logger.info("Test Explained Variance:")
logger.info(rm.explainedVariance)
logger.info("Test R^2 Coef:")
logger.info(rm.r2)
logger.info("Test MSE:")
logger.info(rm.meanSquaredError)
logger.info("Test RMSE:")
logger.info(rm.rootMeanSquaredError)

model
}


// linear Regression

val data = loadTrainingData(sqlContext)
val Array(testRaw, testData) = loadKaggleTestData(sqlContext)

// The linear Regression Pipeline
val linearTvs = preppedLRPipeline()
logger.info("evaluating linear regression")
val lrModel = fitModel(linearTvs, data)
logger.info("Generating kaggle predictions")
val lrOut = lrModel.transform(testData)
.withColumnRenamed("prediction","Sales")
.withColumnRenamed("Id","PredId")
.select("PredId", "Sales")
savePredictions(lrOut, testRaw)


spark-shell output:



warning: there were two deprecation warnings; re-run with -deprecation for details
loadKaggleTestData: (sqlContext: org.apache.spark.sql.hive.HiveContext)Array[org.apache.spark.sql.DataFrame]
savePredictions: (predictions: org.apache.spark.sql.DataFrame, testRaw: org.apache.spark.sql.DataFrame)Unit
<console>:146: error: not found: value logger
logger.info("Fitting data")
^
<console>:148: error: not found: value logger
logger.info("Now performing test on hold out set")


Update:



//package net.sparktutorials.examples

import org.apache.log4j.{Logger}
//core and SparkSQL
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
// ML Feature Creation, Tuning, Models, and Model Evaluation
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.{RegressionEvaluator}
import org.apache.spark.ml.regression.{RandomForestRegressor, LinearRegression}
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.evaluation.RegressionMetrics


object RossmannRegression extends Serializable {
@transient lazy val logger = Logger.getLogger(getClass.getName)

val stateHolidayIndexer = new StringIndexer()
.setInputCol("StateHoliday")
.setOutputCol("StateHolidayIndex")
val schoolHolidayIndexer = new StringIndexer()
.setInputCol("SchoolHoliday")
.setOutputCol("SchoolHolidayIndex")
val stateHolidayEncoder = new OneHotEncoder()
.setInputCol("StateHolidayIndex")
.setOutputCol("StateHolidayVec")
val schoolHolidayEncoder = new OneHotEncoder()
.setInputCol("SchoolHolidayIndex")
.setOutputCol("SchoolHolidayVec")
val dayOfMonthEncoder = new OneHotEncoder()
.setInputCol("DayOfMonth")
.setOutputCol("DayOfMonthVec")
val dayOfWeekEncoder = new OneHotEncoder()
.setInputCol("DayOfWeek")
.setOutputCol("DayOfWeekVec")
val storeEncoder = new OneHotEncoder()
.setInputCol("Store")
.setOutputCol("StoreVec")

val assembler = new VectorAssembler()
.setInputCols(Array("StoreVec", "DayOfWeekVec", "Open",
"DayOfMonthVec", "StateHolidayVec", "SchoolHolidayVec"))
.setOutputCol("features")

def preppedLRPipeline():TrainValidationSplit = {
val lr = new LinearRegression()

val paramGrid = new ParamGridBuilder()
.addGrid(lr.regParam, Array(0.1, 0.01))
.addGrid(lr.fitIntercept)
.addGrid(lr.elasticNetParam, Array(0.0, 0.25, 0.5, 0.75, 1.0))
.build()

val pipeline = new Pipeline()
.setStages(Array(stateHolidayIndexer, schoolHolidayIndexer,
stateHolidayEncoder, schoolHolidayEncoder, storeEncoder,
dayOfWeekEncoder, dayOfMonthEncoder,
assembler, lr))

val tvs = new TrainValidationSplit()
.setEstimator(pipeline)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.75)
tvs
}

def preppedRFPipeline():TrainValidationSplit = {
val dfr = new RandomForestRegressor()

val paramGrid = new ParamGridBuilder()
.addGrid(dfr.minInstancesPerNode, Array(1, 5, 15))
.addGrid(dfr.maxDepth, Array(2, 4, 8))
.addGrid(dfr.numTrees, Array(20, 50, 100))
.build()

val pipeline = new Pipeline()
.setStages(Array(stateHolidayIndexer, schoolHolidayIndexer,
stateHolidayEncoder, schoolHolidayEncoder, storeEncoder,
dayOfWeekEncoder, dayOfMonthEncoder,
assembler, dfr))

val tvs = new TrainValidationSplit()
.setEstimator(pipeline)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.55)
tvs
}

def fitModel(tvs:TrainValidationSplit, data:DataFrame) = {
val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 12345)
logger.info("Fitting data")
val model = tvs.fit(training)
logger.info("Now performing test on hold out set")
val holdout = model.transform(test).select("prediction","label")

// have to do a type conversion for RegressionMetrics
val rm = new RegressionMetrics(holdout.rdd.map(x =>
(x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))

logger.info("Test Metrics")
logger.info("Test Explained Variance:")
logger.info(rm.explainedVariance)
logger.info("Test R^2 Coef:")
logger.info(rm.r2)
logger.info("Test MSE:")
logger.info(rm.meanSquaredError)
logger.info("Test RMSE:")
logger.info(rm.rootMeanSquaredError)

model
}

def savePredictions(predictions:DataFrame, testRaw:DataFrame, filePath:String) = {
val tdOut = testRaw
.select("Id")
.distinct()
.join(predictions, testRaw("Id") === predictions("PredId"), "outer")
.select("Id", "Sales")
.na.fill(0:Double) // some of our inputs were null so we have to
// fill these with something
tdOut
.coalesce(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save(filePath)
}

def loadTrainingData(sqlContext:HiveContext, filePath:String):DataFrame = {
val trainRaw = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
//.load(filePath)
.load("/Users/username/Desktop/stuff/comp/clnt/store_forecast/input/train.csv")
.repartition(30)
trainRaw.registerTempTable("raw_training_data")

sqlContext.sql("""SELECT
double(Sales) label, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek,
StateHoliday, SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
FROM raw_training_data
""").na.drop()
}

def loadKaggleTestData(sqlContext:HiveContext, filePath:String) = {
val testRaw = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
//.load(filePath)
.load("/Users/username/Desktop/stuff/comp/clnt/store_forecast/input/test.csv")
.repartition(30)
testRaw.registerTempTable("raw_test_data")

val testData = sqlContext.sql("""SELECT
Id, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek, StateHoliday,
SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
FROM raw_test_data
WHERE !(ISNULL(Id) OR ISNULL(Store) OR ISNULL(Open) OR ISNULL(DayOfWeek)
OR ISNULL(StateHoliday) OR ISNULL(SchoolHoliday))
""").na.drop() // weird things happen if you don't filter out the null values manually

Array(testRaw, testData) // got to hold onto testRaw so we can make sure
// to have all the prediction IDs to submit to kaggle
}

def main(args:Array[String]) = {
val name = "Linear Regression Application"
logger.info(s"Starting up $name")

val conf = new SparkConf().setAppName(name)
val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)
// sc.setLogLevel("INFO")

logger.info("Set Up Complete")
val data = loadTrainingData(sqlContext, args(0))
val Array(testRaw, testData) = loadKaggleTestData(sqlContext, args(1))

// The linear Regression Pipeline
val linearTvs = preppedLRPipeline()
logger.info("evaluating linear regression")
val lrModel = fitModel(linearTvs, data)
logger.info("Generating kaggle predictions")
val lrOut = lrModel.transform(testData)
.withColumnRenamed("prediction","Sales")
.withColumnRenamed("Id","PredId")
.select("PredId", "Sales")
//savePredictions(lrOut, testRaw, "linear_predictions.csv")
savePredictions(lrOut, testRaw, "/Users/username/Desktop/stuff/comp/clnt/store_forecast/Output/linear_predictions.csv")
}
}


Code in spark-shell:



:load /Users/username/Desktop/stuff/comp/clnt/simple_spark_regression


output:



Loading /Users/username/Desktop/stuff/comp/clnt/simple_spark_regression...
import org.apache.log4j.Logger
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.{RandomForestRegressor, LinearRegression}
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.evaluation.RegressionMetrics
warning: there were 10 deprecation warnings; re-run with -deprecation for details
defined object RossmannRegression









share|improve this question

























  • You are missing some code, e.g. the whole main method. There is a link to github on the page that has everything though (github.com/anabranch/simple-spark-regression/blob/master/src/…).

    – Shaido
    Nov 21 '18 at 1:39











  • @Shaido thank you for the tip. I've added an update to my original post with the complete code. Also I've tried running the script in spark-shell with ": load" I get several warnings but it doesn't output the final csv or print any of the logger steps to the screen. Is there something I'm missing when I try to run the script?

    – user3476463
    Nov 21 '18 at 21:26














0












0








0








I have the code below. It comes from the blog post:



http://www.sparktutorials.net/Spark+MLLib+-+Predict+Store+Sales+with+ML+Pipelines



I'm trying to forecast store sales with linear regression. I'm doing some data cleansing and then creating a ml pipeline. I'm a total scala newbee. I have a piece of the spark-shel messages that print to the screen when I run the script. I post the piece where it starts throwing errors. Can someone please let me know what the issue is and how to fix it? All tips are greatly appreciated.



Code:



// loading packages
// from example: http://www.sparktutorials.net/Spark+MLLib+-+Predict+Store+Sales+with+ML+Pipelines

import org.apache.log4j.{Logger}
//core and SparkSQL
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
// ML Feature Creation, Tuning, Models, and Model Evaluation
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.{RegressionEvaluator}
import org.apache.spark.ml.regression.{LinearRegression}
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.evaluation.RegressionMetrics


// preprocessing & preparing Pipelines

// Indexers & Encoders

val stateHolidayIndexer = new StringIndexer()
.setInputCol("StateHoliday")
.setOutputCol("StateHolidayIndex")


val schoolHolidayIndexer = new StringIndexer()
.setInputCol("SchoolHoliday")
.setOutputCol("SchoolHolidayIndex")
val stateHolidayEncoder = new OneHotEncoder()
.setInputCol("StateHolidayIndex")
.setOutputCol("StateHolidayVec")
val schoolHolidayEncoder = new OneHotEncoder()
.setInputCol("SchoolHolidayIndex")
.setOutputCol("SchoolHolidayVec")
val dayOfMonthEncoder = new OneHotEncoder()
.setInputCol("DayOfMonth")
.setOutputCol("DayOfMonthVec")
val dayOfWeekEncoder = new OneHotEncoder()
.setInputCol("DayOfWeek")
.setOutputCol("DayOfWeekVec")
val storeEncoder = new OneHotEncoder()
.setInputCol("Store")
.setOutputCol("StoreVec")


// assemble all vectors in to one vector to input to Model

val assembler = new VectorAssembler()
.setInputCols(Array("StoreVec", "DayOfWeekVec", "Open", "DayOfMonthVec", "StateHolidayVec", "SchoolHolidayVec"))
.setOutputCol("features")

// Pipeline

def preppedLRPipeline():TrainValidationSplit = {
val lr = new LinearRegression()

val paramGrid = new ParamGridBuilder()
.addGrid(lr.regParam, Array(0.1, 0.01))
.addGrid(lr.fitIntercept)
.addGrid(lr.elasticNetParam, Array(0.0, 0.25, 0.5, 0.75, 1.0))
.build()

val pipeline = new Pipeline()
.setStages(Array(stateHolidayIndexer, schoolHolidayIndexer,stateHolidayEncoder, schoolHolidayEncoder, storeEncoder,dayOfWeekEncoder, dayOfMonthEncoder, assembler, lr))

val tvs = new TrainValidationSplit()
.setEstimator(pipeline)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.75)
tvs
}


// bringing in data and removing null values

def loadTrainingData(sqlContext:HiveContext):DataFrame = {
val trainRaw = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
// path to training data
// .load("../mlproject/rossman/train.csv")
.load("/Users/username/Desktop/stuff/comp/clint/store_forecast/input/train.csv")
.repartition(6)
trainRaw.registerTempTable("raw_training_data")

sqlContext.sql("""SELECT
double(Sales) label, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek,
StateHoliday, SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
FROM raw_training_data
""").na.drop()
}

def loadKaggleTestData(sqlContext:HiveContext) = {
val testRaw = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
// load test data
// .load("../mlproject/rossman/test.csv")
.load("/Users/username/Desktop/stuff/comp/clint/store_forecast/input/test.csv")
.repartition(6)
testRaw.registerTempTable("raw_test_data")

val testData = sqlContext.sql("""SELECT
Id, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek, StateHoliday,
SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
FROM raw_test_data
WHERE !(ISNULL(Id) OR ISNULL(Store) OR ISNULL(Open) OR ISNULL(DayOfWeek)
OR ISNULL(StateHoliday) OR ISNULL(SchoolHoliday))
""").na.drop() // weird things happen if you don't filter out the null values manually

Array(testRaw, testData) // got to hold onto testRaw so we can make sure
// to have all the prediction IDs to submit to kaggle
}


// save predictions

def savePredictions(predictions:DataFrame, testRaw:DataFrame) = {
val tdOut = testRaw
.select("Id")
.distinct()
.join(predictions, testRaw("Id") === predictions("PredId"), "outer")
.select("Id", "Sales")
.na.fill(0:Double) // some of our inputs were null so we have to
// fill these with something
tdOut
.coalesce(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
// save predictions
.save("linear_regression_predictions.csv")
//.save("/Users/username/Desktop/stuff/comp/clint/store_forecast/Output/linear_regression_predictions.csv")
}

// fitting and testing

def fitModel(tvs:TrainValidationSplit, data:DataFrame) = {
val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 12345)
logger.info("Fitting data")
val model = tvs.fit(training)
logger.info("Now performing test on hold out set")
val holdout = model.transform(test).select("prediction","label")

// have to do a type conversion for RegressionMetrics
val rm = new RegressionMetrics(holdout.rdd.map(x =>(x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))

logger.info("Test Metrics")
logger.info("Test Explained Variance:")
logger.info(rm.explainedVariance)
logger.info("Test R^2 Coef:")
logger.info(rm.r2)
logger.info("Test MSE:")
logger.info(rm.meanSquaredError)
logger.info("Test RMSE:")
logger.info(rm.rootMeanSquaredError)

model
}


// linear Regression

val data = loadTrainingData(sqlContext)
val Array(testRaw, testData) = loadKaggleTestData(sqlContext)

// The linear Regression Pipeline
val linearTvs = preppedLRPipeline()
logger.info("evaluating linear regression")
val lrModel = fitModel(linearTvs, data)
logger.info("Generating kaggle predictions")
val lrOut = lrModel.transform(testData)
.withColumnRenamed("prediction","Sales")
.withColumnRenamed("Id","PredId")
.select("PredId", "Sales")
savePredictions(lrOut, testRaw)


spark-shell output:



warning: there were two deprecation warnings; re-run with -deprecation for details
loadKaggleTestData: (sqlContext: org.apache.spark.sql.hive.HiveContext)Array[org.apache.spark.sql.DataFrame]
savePredictions: (predictions: org.apache.spark.sql.DataFrame, testRaw: org.apache.spark.sql.DataFrame)Unit
<console>:146: error: not found: value logger
logger.info("Fitting data")
^
<console>:148: error: not found: value logger
logger.info("Now performing test on hold out set")


Update:



//package net.sparktutorials.examples

import org.apache.log4j.{Logger}
//core and SparkSQL
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
// ML Feature Creation, Tuning, Models, and Model Evaluation
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.{RegressionEvaluator}
import org.apache.spark.ml.regression.{RandomForestRegressor, LinearRegression}
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.evaluation.RegressionMetrics


object RossmannRegression extends Serializable {
@transient lazy val logger = Logger.getLogger(getClass.getName)

val stateHolidayIndexer = new StringIndexer()
.setInputCol("StateHoliday")
.setOutputCol("StateHolidayIndex")
val schoolHolidayIndexer = new StringIndexer()
.setInputCol("SchoolHoliday")
.setOutputCol("SchoolHolidayIndex")
val stateHolidayEncoder = new OneHotEncoder()
.setInputCol("StateHolidayIndex")
.setOutputCol("StateHolidayVec")
val schoolHolidayEncoder = new OneHotEncoder()
.setInputCol("SchoolHolidayIndex")
.setOutputCol("SchoolHolidayVec")
val dayOfMonthEncoder = new OneHotEncoder()
.setInputCol("DayOfMonth")
.setOutputCol("DayOfMonthVec")
val dayOfWeekEncoder = new OneHotEncoder()
.setInputCol("DayOfWeek")
.setOutputCol("DayOfWeekVec")
val storeEncoder = new OneHotEncoder()
.setInputCol("Store")
.setOutputCol("StoreVec")

val assembler = new VectorAssembler()
.setInputCols(Array("StoreVec", "DayOfWeekVec", "Open",
"DayOfMonthVec", "StateHolidayVec", "SchoolHolidayVec"))
.setOutputCol("features")

def preppedLRPipeline():TrainValidationSplit = {
val lr = new LinearRegression()

val paramGrid = new ParamGridBuilder()
.addGrid(lr.regParam, Array(0.1, 0.01))
.addGrid(lr.fitIntercept)
.addGrid(lr.elasticNetParam, Array(0.0, 0.25, 0.5, 0.75, 1.0))
.build()

val pipeline = new Pipeline()
.setStages(Array(stateHolidayIndexer, schoolHolidayIndexer,
stateHolidayEncoder, schoolHolidayEncoder, storeEncoder,
dayOfWeekEncoder, dayOfMonthEncoder,
assembler, lr))

val tvs = new TrainValidationSplit()
.setEstimator(pipeline)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.75)
tvs
}

def preppedRFPipeline():TrainValidationSplit = {
val dfr = new RandomForestRegressor()

val paramGrid = new ParamGridBuilder()
.addGrid(dfr.minInstancesPerNode, Array(1, 5, 15))
.addGrid(dfr.maxDepth, Array(2, 4, 8))
.addGrid(dfr.numTrees, Array(20, 50, 100))
.build()

val pipeline = new Pipeline()
.setStages(Array(stateHolidayIndexer, schoolHolidayIndexer,
stateHolidayEncoder, schoolHolidayEncoder, storeEncoder,
dayOfWeekEncoder, dayOfMonthEncoder,
assembler, dfr))

val tvs = new TrainValidationSplit()
.setEstimator(pipeline)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.55)
tvs
}

def fitModel(tvs:TrainValidationSplit, data:DataFrame) = {
val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 12345)
logger.info("Fitting data")
val model = tvs.fit(training)
logger.info("Now performing test on hold out set")
val holdout = model.transform(test).select("prediction","label")

// have to do a type conversion for RegressionMetrics
val rm = new RegressionMetrics(holdout.rdd.map(x =>
(x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))

logger.info("Test Metrics")
logger.info("Test Explained Variance:")
logger.info(rm.explainedVariance)
logger.info("Test R^2 Coef:")
logger.info(rm.r2)
logger.info("Test MSE:")
logger.info(rm.meanSquaredError)
logger.info("Test RMSE:")
logger.info(rm.rootMeanSquaredError)

model
}

def savePredictions(predictions:DataFrame, testRaw:DataFrame, filePath:String) = {
val tdOut = testRaw
.select("Id")
.distinct()
.join(predictions, testRaw("Id") === predictions("PredId"), "outer")
.select("Id", "Sales")
.na.fill(0:Double) // some of our inputs were null so we have to
// fill these with something
tdOut
.coalesce(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save(filePath)
}

def loadTrainingData(sqlContext:HiveContext, filePath:String):DataFrame = {
val trainRaw = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
//.load(filePath)
.load("/Users/username/Desktop/stuff/comp/clnt/store_forecast/input/train.csv")
.repartition(30)
trainRaw.registerTempTable("raw_training_data")

sqlContext.sql("""SELECT
double(Sales) label, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek,
StateHoliday, SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
FROM raw_training_data
""").na.drop()
}

def loadKaggleTestData(sqlContext:HiveContext, filePath:String) = {
val testRaw = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
//.load(filePath)
.load("/Users/username/Desktop/stuff/comp/clnt/store_forecast/input/test.csv")
.repartition(30)
testRaw.registerTempTable("raw_test_data")

val testData = sqlContext.sql("""SELECT
Id, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek, StateHoliday,
SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
FROM raw_test_data
WHERE !(ISNULL(Id) OR ISNULL(Store) OR ISNULL(Open) OR ISNULL(DayOfWeek)
OR ISNULL(StateHoliday) OR ISNULL(SchoolHoliday))
""").na.drop() // weird things happen if you don't filter out the null values manually

Array(testRaw, testData) // got to hold onto testRaw so we can make sure
// to have all the prediction IDs to submit to kaggle
}

def main(args:Array[String]) = {
val name = "Linear Regression Application"
logger.info(s"Starting up $name")

val conf = new SparkConf().setAppName(name)
val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)
// sc.setLogLevel("INFO")

logger.info("Set Up Complete")
val data = loadTrainingData(sqlContext, args(0))
val Array(testRaw, testData) = loadKaggleTestData(sqlContext, args(1))

// The linear Regression Pipeline
val linearTvs = preppedLRPipeline()
logger.info("evaluating linear regression")
val lrModel = fitModel(linearTvs, data)
logger.info("Generating kaggle predictions")
val lrOut = lrModel.transform(testData)
.withColumnRenamed("prediction","Sales")
.withColumnRenamed("Id","PredId")
.select("PredId", "Sales")
//savePredictions(lrOut, testRaw, "linear_predictions.csv")
savePredictions(lrOut, testRaw, "/Users/username/Desktop/stuff/comp/clnt/store_forecast/Output/linear_predictions.csv")
}
}


Code in spark-shell:



:load /Users/username/Desktop/stuff/comp/clnt/simple_spark_regression


output:



Loading /Users/username/Desktop/stuff/comp/clnt/simple_spark_regression...
import org.apache.log4j.Logger
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.{RandomForestRegressor, LinearRegression}
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.evaluation.RegressionMetrics
warning: there were 10 deprecation warnings; re-run with -deprecation for details
defined object RossmannRegression









share|improve this question
















I have the code below. It comes from the blog post:



http://www.sparktutorials.net/Spark+MLLib+-+Predict+Store+Sales+with+ML+Pipelines



I'm trying to forecast store sales with linear regression. I'm doing some data cleansing and then creating a ml pipeline. I'm a total scala newbee. I have a piece of the spark-shel messages that print to the screen when I run the script. I post the piece where it starts throwing errors. Can someone please let me know what the issue is and how to fix it? All tips are greatly appreciated.



Code:



// loading packages
// from example: http://www.sparktutorials.net/Spark+MLLib+-+Predict+Store+Sales+with+ML+Pipelines

import org.apache.log4j.{Logger}
//core and SparkSQL
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
// ML Feature Creation, Tuning, Models, and Model Evaluation
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.{RegressionEvaluator}
import org.apache.spark.ml.regression.{LinearRegression}
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.evaluation.RegressionMetrics


// preprocessing & preparing Pipelines

// Indexers & Encoders

val stateHolidayIndexer = new StringIndexer()
.setInputCol("StateHoliday")
.setOutputCol("StateHolidayIndex")


val schoolHolidayIndexer = new StringIndexer()
.setInputCol("SchoolHoliday")
.setOutputCol("SchoolHolidayIndex")
val stateHolidayEncoder = new OneHotEncoder()
.setInputCol("StateHolidayIndex")
.setOutputCol("StateHolidayVec")
val schoolHolidayEncoder = new OneHotEncoder()
.setInputCol("SchoolHolidayIndex")
.setOutputCol("SchoolHolidayVec")
val dayOfMonthEncoder = new OneHotEncoder()
.setInputCol("DayOfMonth")
.setOutputCol("DayOfMonthVec")
val dayOfWeekEncoder = new OneHotEncoder()
.setInputCol("DayOfWeek")
.setOutputCol("DayOfWeekVec")
val storeEncoder = new OneHotEncoder()
.setInputCol("Store")
.setOutputCol("StoreVec")


// assemble all vectors in to one vector to input to Model

val assembler = new VectorAssembler()
.setInputCols(Array("StoreVec", "DayOfWeekVec", "Open", "DayOfMonthVec", "StateHolidayVec", "SchoolHolidayVec"))
.setOutputCol("features")

// Pipeline

def preppedLRPipeline():TrainValidationSplit = {
val lr = new LinearRegression()

val paramGrid = new ParamGridBuilder()
.addGrid(lr.regParam, Array(0.1, 0.01))
.addGrid(lr.fitIntercept)
.addGrid(lr.elasticNetParam, Array(0.0, 0.25, 0.5, 0.75, 1.0))
.build()

val pipeline = new Pipeline()
.setStages(Array(stateHolidayIndexer, schoolHolidayIndexer,stateHolidayEncoder, schoolHolidayEncoder, storeEncoder,dayOfWeekEncoder, dayOfMonthEncoder, assembler, lr))

val tvs = new TrainValidationSplit()
.setEstimator(pipeline)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.75)
tvs
}


// bringing in data and removing null values

def loadTrainingData(sqlContext:HiveContext):DataFrame = {
val trainRaw = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
// path to training data
// .load("../mlproject/rossman/train.csv")
.load("/Users/username/Desktop/stuff/comp/clint/store_forecast/input/train.csv")
.repartition(6)
trainRaw.registerTempTable("raw_training_data")

sqlContext.sql("""SELECT
double(Sales) label, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek,
StateHoliday, SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
FROM raw_training_data
""").na.drop()
}

def loadKaggleTestData(sqlContext:HiveContext) = {
val testRaw = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
// load test data
// .load("../mlproject/rossman/test.csv")
.load("/Users/username/Desktop/stuff/comp/clint/store_forecast/input/test.csv")
.repartition(6)
testRaw.registerTempTable("raw_test_data")

val testData = sqlContext.sql("""SELECT
Id, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek, StateHoliday,
SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
FROM raw_test_data
WHERE !(ISNULL(Id) OR ISNULL(Store) OR ISNULL(Open) OR ISNULL(DayOfWeek)
OR ISNULL(StateHoliday) OR ISNULL(SchoolHoliday))
""").na.drop() // weird things happen if you don't filter out the null values manually

Array(testRaw, testData) // got to hold onto testRaw so we can make sure
// to have all the prediction IDs to submit to kaggle
}


// save predictions

def savePredictions(predictions:DataFrame, testRaw:DataFrame) = {
val tdOut = testRaw
.select("Id")
.distinct()
.join(predictions, testRaw("Id") === predictions("PredId"), "outer")
.select("Id", "Sales")
.na.fill(0:Double) // some of our inputs were null so we have to
// fill these with something
tdOut
.coalesce(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
// save predictions
.save("linear_regression_predictions.csv")
//.save("/Users/username/Desktop/stuff/comp/clint/store_forecast/Output/linear_regression_predictions.csv")
}

// fitting and testing

def fitModel(tvs:TrainValidationSplit, data:DataFrame) = {
val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 12345)
logger.info("Fitting data")
val model = tvs.fit(training)
logger.info("Now performing test on hold out set")
val holdout = model.transform(test).select("prediction","label")

// have to do a type conversion for RegressionMetrics
val rm = new RegressionMetrics(holdout.rdd.map(x =>(x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))

logger.info("Test Metrics")
logger.info("Test Explained Variance:")
logger.info(rm.explainedVariance)
logger.info("Test R^2 Coef:")
logger.info(rm.r2)
logger.info("Test MSE:")
logger.info(rm.meanSquaredError)
logger.info("Test RMSE:")
logger.info(rm.rootMeanSquaredError)

model
}


// linear Regression

val data = loadTrainingData(sqlContext)
val Array(testRaw, testData) = loadKaggleTestData(sqlContext)

// The linear Regression Pipeline
val linearTvs = preppedLRPipeline()
logger.info("evaluating linear regression")
val lrModel = fitModel(linearTvs, data)
logger.info("Generating kaggle predictions")
val lrOut = lrModel.transform(testData)
.withColumnRenamed("prediction","Sales")
.withColumnRenamed("Id","PredId")
.select("PredId", "Sales")
savePredictions(lrOut, testRaw)


spark-shell output:



warning: there were two deprecation warnings; re-run with -deprecation for details
loadKaggleTestData: (sqlContext: org.apache.spark.sql.hive.HiveContext)Array[org.apache.spark.sql.DataFrame]
savePredictions: (predictions: org.apache.spark.sql.DataFrame, testRaw: org.apache.spark.sql.DataFrame)Unit
<console>:146: error: not found: value logger
logger.info("Fitting data")
^
<console>:148: error: not found: value logger
logger.info("Now performing test on hold out set")


Update:



//package net.sparktutorials.examples

import org.apache.log4j.{Logger}
//core and SparkSQL
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
// ML Feature Creation, Tuning, Models, and Model Evaluation
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.{RegressionEvaluator}
import org.apache.spark.ml.regression.{RandomForestRegressor, LinearRegression}
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.evaluation.RegressionMetrics


object RossmannRegression extends Serializable {
@transient lazy val logger = Logger.getLogger(getClass.getName)

val stateHolidayIndexer = new StringIndexer()
.setInputCol("StateHoliday")
.setOutputCol("StateHolidayIndex")
val schoolHolidayIndexer = new StringIndexer()
.setInputCol("SchoolHoliday")
.setOutputCol("SchoolHolidayIndex")
val stateHolidayEncoder = new OneHotEncoder()
.setInputCol("StateHolidayIndex")
.setOutputCol("StateHolidayVec")
val schoolHolidayEncoder = new OneHotEncoder()
.setInputCol("SchoolHolidayIndex")
.setOutputCol("SchoolHolidayVec")
val dayOfMonthEncoder = new OneHotEncoder()
.setInputCol("DayOfMonth")
.setOutputCol("DayOfMonthVec")
val dayOfWeekEncoder = new OneHotEncoder()
.setInputCol("DayOfWeek")
.setOutputCol("DayOfWeekVec")
val storeEncoder = new OneHotEncoder()
.setInputCol("Store")
.setOutputCol("StoreVec")

val assembler = new VectorAssembler()
.setInputCols(Array("StoreVec", "DayOfWeekVec", "Open",
"DayOfMonthVec", "StateHolidayVec", "SchoolHolidayVec"))
.setOutputCol("features")

def preppedLRPipeline():TrainValidationSplit = {
val lr = new LinearRegression()

val paramGrid = new ParamGridBuilder()
.addGrid(lr.regParam, Array(0.1, 0.01))
.addGrid(lr.fitIntercept)
.addGrid(lr.elasticNetParam, Array(0.0, 0.25, 0.5, 0.75, 1.0))
.build()

val pipeline = new Pipeline()
.setStages(Array(stateHolidayIndexer, schoolHolidayIndexer,
stateHolidayEncoder, schoolHolidayEncoder, storeEncoder,
dayOfWeekEncoder, dayOfMonthEncoder,
assembler, lr))

val tvs = new TrainValidationSplit()
.setEstimator(pipeline)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.75)
tvs
}

def preppedRFPipeline():TrainValidationSplit = {
val dfr = new RandomForestRegressor()

val paramGrid = new ParamGridBuilder()
.addGrid(dfr.minInstancesPerNode, Array(1, 5, 15))
.addGrid(dfr.maxDepth, Array(2, 4, 8))
.addGrid(dfr.numTrees, Array(20, 50, 100))
.build()

val pipeline = new Pipeline()
.setStages(Array(stateHolidayIndexer, schoolHolidayIndexer,
stateHolidayEncoder, schoolHolidayEncoder, storeEncoder,
dayOfWeekEncoder, dayOfMonthEncoder,
assembler, dfr))

val tvs = new TrainValidationSplit()
.setEstimator(pipeline)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.55)
tvs
}

def fitModel(tvs:TrainValidationSplit, data:DataFrame) = {
val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 12345)
logger.info("Fitting data")
val model = tvs.fit(training)
logger.info("Now performing test on hold out set")
val holdout = model.transform(test).select("prediction","label")

// have to do a type conversion for RegressionMetrics
val rm = new RegressionMetrics(holdout.rdd.map(x =>
(x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))

logger.info("Test Metrics")
logger.info("Test Explained Variance:")
logger.info(rm.explainedVariance)
logger.info("Test R^2 Coef:")
logger.info(rm.r2)
logger.info("Test MSE:")
logger.info(rm.meanSquaredError)
logger.info("Test RMSE:")
logger.info(rm.rootMeanSquaredError)

model
}

def savePredictions(predictions:DataFrame, testRaw:DataFrame, filePath:String) = {
val tdOut = testRaw
.select("Id")
.distinct()
.join(predictions, testRaw("Id") === predictions("PredId"), "outer")
.select("Id", "Sales")
.na.fill(0:Double) // some of our inputs were null so we have to
// fill these with something
tdOut
.coalesce(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save(filePath)
}

def loadTrainingData(sqlContext:HiveContext, filePath:String):DataFrame = {
val trainRaw = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
//.load(filePath)
.load("/Users/username/Desktop/stuff/comp/clnt/store_forecast/input/train.csv")
.repartition(30)
trainRaw.registerTempTable("raw_training_data")

sqlContext.sql("""SELECT
double(Sales) label, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek,
StateHoliday, SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
FROM raw_training_data
""").na.drop()
}

def loadKaggleTestData(sqlContext:HiveContext, filePath:String) = {
val testRaw = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
//.load(filePath)
.load("/Users/username/Desktop/stuff/comp/clnt/store_forecast/input/test.csv")
.repartition(30)
testRaw.registerTempTable("raw_test_data")

val testData = sqlContext.sql("""SELECT
Id, double(Store) Store, int(Open) Open, double(DayOfWeek) DayOfWeek, StateHoliday,
SchoolHoliday, (double(regexp_extract(Date, '\d+-\d+-(\d+)', 1))) DayOfMonth
FROM raw_test_data
WHERE !(ISNULL(Id) OR ISNULL(Store) OR ISNULL(Open) OR ISNULL(DayOfWeek)
OR ISNULL(StateHoliday) OR ISNULL(SchoolHoliday))
""").na.drop() // weird things happen if you don't filter out the null values manually

Array(testRaw, testData) // got to hold onto testRaw so we can make sure
// to have all the prediction IDs to submit to kaggle
}

  def main(args: Array[String]) = {
    val name = "Linear Regression Application"
    logger.info(s"Starting up $name")

    val conf = new SparkConf().setAppName(name)
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    // sc.setLogLevel("INFO")

    logger.info("Set Up Complete")
    val data = loadTrainingData(sqlContext, args(0))
    val Array(testRaw, testData) = loadKaggleTestData(sqlContext, args(1))

    // the linear regression pipeline
    val linearTvs = preppedLRPipeline()
    logger.info("evaluating linear regression")
    val lrModel = fitModel(linearTvs, data)
    logger.info("Generating kaggle predictions")
    val lrOut = lrModel.transform(testData)
      .withColumnRenamed("prediction", "Sales")
      .withColumnRenamed("Id", "PredId")
      .select("PredId", "Sales")
    // savePredictions(lrOut, testRaw, "linear_predictions.csv")
    savePredictions(lrOut, testRaw, "/Users/username/Desktop/stuff/comp/clnt/store_forecast/Output/linear_predictions.csv")
  }
}


Code in spark-shell:



:load /Users/username/Desktop/stuff/comp/clnt/simple_spark_regression


output:



Loading /Users/username/Desktop/stuff/comp/clnt/simple_spark_regression...
import org.apache.log4j.Logger
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.{RandomForestRegressor, LinearRegression}
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.evaluation.RegressionMetrics
warning: there were 10 deprecation warnings; re-run with -deprecation for details
defined object RossmannRegression
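
A note on this output: :load only compiles the script, so "defined object RossmannRegression" is all that happens. main is never invoked, which would explain why no CSV is produced and none of the logger lines print. A hedged sketch of actually running it from the shell (the argument paths are placeholders, and main as written calls new SparkContext, which will clash with the shell's existing sc unless that line is adapted to reuse it):

// paths are placeholders for the real train/test CSV locations
RossmannRegression.main(Array("/path/to/train.csv", "/path/to/test.csv"))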






scala apache-spark

asked Nov 20 '18 at 23:39 by user3476463 (edited Nov 21 '18 at 21:21)

  • You are missing some code, e.g. the whole main method. There is a link to github on the page that has everything though (github.com/anabranch/simple-spark-regression/blob/master/src/…).

    – Shaido, Nov 21 '18 at 1:39

  • @Shaido thank you for the tip. I've added an update to my original post with the complete code. I've also tried running the script in spark-shell with ":load"; I get several warnings, but it doesn't output the final csv or print any of the logger steps to the screen. Is there something I'm missing when I try to run the script?

    – user3476463, Nov 21 '18 at 21:26