This how-to tutorial explains how you can use pio eval to persist the predicted results for a batch of queries. Please read the Evaluation documentation first to understand the usage of DataSource's readEval() and the Evaluation component.
1. Modify DataSource
Modify DataSource's readEval() to generate the batch of Queries for which you want to run batch prediction.
```scala
override def readEval(sc: SparkContext)
: Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = {
  // This function only returns one evaluation data set.

  // Create your own queries here. Below are provided as examples.
  // For example, you may get all distinct user ids from the trainingData
  // to create the Queries.
  val batchQueries: RDD[Query] = sc.parallelize(
    Seq(
      Query(user = "1", num = 10),
      Query(user = "3", num = 15),
      Query(user = "5", num = 20)
    )
  )

  val queryAndActual: RDD[(Query, ActualResult)] = batchQueries.map (q =>
    // The ActualResult contains a dummy empty rating array because we are
    // not interested in the actual result for batch predict purposes.
    (q, ActualResult(Array()))
  )

  val evalDataSet = (
    readTraining(sc),
    new EmptyEvaluationInfo(),
    queryAndActual
  )

  Seq(evalDataSet)
}
```
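The three queries above are hard-coded for illustration. In practice you would more likely derive the batch of queries from the training data itself. Below is a minimal sketch of that idea, assuming (as in the standard recommendation template) that TrainingData exposes a ratings: RDD[Rating] field and that Rating has a user field; adjust the names to match your own DataSource.

```scala
// Sketch only: build one Query per distinct user found in the training data.
// Assumes TrainingData.ratings: RDD[Rating] and Rating has a `user: String`
// field, as in the recommendation template; adapt to your own DataSource.
val trainingData = readTraining(sc)
val batchQueries: RDD[Query] = trainingData.ratings
  .map(_.user)                                // the user id of each rating
  .distinct()                                 // one entry per user
  .map(user => Query(user = user, num = 10))  // recommend 10 items per user
```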
2. Add a new Evaluator
Create a new file BatchPersistableEvaluator.scala. Unlike the MetricEvaluator, this Evaluator simply writes the Queries and the corresponding PredictedResults to the output directory without performing any metric calculation. Note that the output directory is specified by the variable outputDir.
```scala
package org.template.recommendation

import org.apache.predictionio.controller.EmptyEvaluationInfo
import org.apache.predictionio.controller.Engine
import org.apache.predictionio.controller.EngineParams
import org.apache.predictionio.controller.EngineParamsGenerator
import org.apache.predictionio.controller.Evaluation
import org.apache.predictionio.controller.Params
import org.apache.predictionio.core.BaseEvaluator
import org.apache.predictionio.core.BaseEvaluatorResult
import org.apache.predictionio.workflow.WorkflowParams

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

import org.json4s.DefaultFormats
import org.json4s.Formats
import org.json4s.native.Serialization

import grizzled.slf4j.Logger

class BatchPersistableEvaluatorResult extends BaseEvaluatorResult {}

class BatchPersistableEvaluator extends BaseEvaluator[
  EmptyEvaluationInfo,
  Query,
  PredictedResult,
  ActualResult,
  BatchPersistableEvaluatorResult] {
  @transient lazy val logger = Logger[this.type]

  // A helper class for the json4s serialization
  case class Row(query: Query, predictedResult: PredictedResult)
    extends Serializable

  def evaluateBase(
    sc: SparkContext,
    evaluation: Evaluation,
    engineEvalDataSet: Seq[(
      EngineParams,
      Seq[(EmptyEvaluationInfo, RDD[(Query, PredictedResult, ActualResult)])])],
    params: WorkflowParams): BatchPersistableEvaluatorResult = {

    /** Extract the first data set, as we are only interested in the first
      * evaluation. It is possible to relax this restriction and have the
      * output logic below write to a different directory for each engine
      * params.
      */
    require(
      engineEvalDataSet.size == 1, "There should be only one engine params")

    val evalDataSet = engineEvalDataSet.head._2

    require(evalDataSet.size == 1, "There should be only one RDD[(Q, P, A)]")

    val qpaRDD = evalDataSet.head._2

    // qpaRDD contains the 3 queries we specified in readEval, the
    // corresponding predictedResults, and the dummy actual results.

    /** The output directory. It is better to use an absolute path if you run
      * on a cluster. If your database has a Hadoop interface, you can also
      * convert the following to write to your database in parallel as well.
      */
    val outputDir = "batch_result"

    logger.info("Writing result to disk")
    qpaRDD
      .map { case (q, p, a) => Row(q, p) }
      .map { row =>
        // Convert into a json
        implicit val formats: Formats = DefaultFormats
        Serialization.write(row)
      }
      .saveAsTextFile(outputDir)

    logger.info(s"Result can be found in $outputDir")

    new BatchPersistableEvaluatorResult()
  }
}
```
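One practical caveat not handled above: Spark's saveAsTextFile fails if the output directory already exists, so re-running the evaluation with the same outputDir will throw an exception. A minimal sketch of one way to deal with this, placed just before the saveAsTextFile call, is shown below; it is an optional addition, not part of the original evaluator.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Optional sketch: remove a previous run's output before writing, because
// saveAsTextFile refuses to overwrite an existing directory.
val fs = FileSystem.get(sc.hadoopConfiguration)
val out = new Path(outputDir)
if (fs.exists(out)) {
  fs.delete(out, true)  // recursively delete the stale results
}
```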
3. Define Evaluation and EngineParamsGenerator object
Create a new file BatchEvaluation.scala. Note that the new BatchPersistableEvaluator is used. The BatchEngineParamsList specifies the parameters of the engine. Modify the appName parameter in DataSourceParams to match your app name.
```scala
package org.template.recommendation

import org.apache.predictionio.controller.EngineParamsGenerator
import org.apache.predictionio.controller.EngineParams
import org.apache.predictionio.controller.Evaluation

object BatchEvaluation extends Evaluation {
  // Define the Engine and Evaluator used in this Evaluation.
  // Specify the new BatchPersistableEvaluator.
  engineEvaluator =
    (RecommendationEngine(), new BatchPersistableEvaluator())
}

object BatchEngineParamsList extends EngineParamsGenerator {
  // We are only interested in a single engine params.
  engineParamsList = Seq(
    EngineParams(
      dataSourceParams =
        DataSourceParams(appName = "INVALID_APP_NAME", evalParams = None),
      algorithmParamsList = Seq(("als", ALSAlgorithmParams(
        rank = 10,
        numIterations = 20,
        lambda = 0.01,
        seed = Some(3L))))))
}
```
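For example, if you created your app under the (hypothetical) name MyApp with pio app new MyApp, the data source line would become:

```scala
// Hypothetical app name; use the name you registered with `pio app new`.
dataSourceParams = DataSourceParams(appName = "MyApp", evalParams = None),
```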
4. Build and run
Run the following command to build:
```
$ pio build
```
After the build succeeds, you should see the following output:
```
[INFO] [Console$] Your engine is ready for training.
```
To run the BatchEvaluation with BatchEngineParamsList, run the following command:
```
$ pio eval org.template.recommendation.BatchEvaluation org.template.recommendation.BatchEngineParamsList
```
You should see output like the following:
```
[INFO] [BatchPersistableEvaluator] Writing result to disk
[INFO] [BatchPersistableEvaluator] Result can be found in batch_result
[INFO] [CoreWorkflow$] Updating evaluation instance with result: org.template.recommendation.BatchPersistableEvaluatorResult@2f886889
[INFO] [CoreWorkflow$] runEvaluation completed
```
You should find the batch queries and the predicted results in the output directory batch_result/.
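Because each line under batch_result/ is a JSON-serialized Row, you can load the results back for downstream processing. The following is only an illustrative sketch, assuming Spark and json4s-native are on the classpath; the stand-in case classes (BatchQuery, BatchItemScore, BatchPredictedResult, BatchRow) are hypothetical names whose fields mirror the serialized structure of the recommendation template's Query and PredictedResult.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.json4s.DefaultFormats
import org.json4s.native.JsonMethods

// Hypothetical stand-ins whose fields mirror the JSON written by the evaluator.
case class BatchQuery(user: String, num: Int)
case class BatchItemScore(item: String, score: Double)
case class BatchPredictedResult(itemScores: Array[BatchItemScore])
case class BatchRow(query: BatchQuery, predictedResult: BatchPredictedResult)

object BatchResultReader {
  // Parse every JSON line under `dir` back into a BatchRow.
  def read(sc: SparkContext, dir: String = "batch_result"): RDD[BatchRow] = {
    sc.textFile(dir).map { line =>
      implicit val formats = DefaultFormats
      JsonMethods.parse(line).extract[BatchRow]
    }
  }
}
```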