PredictionIO's DASE architecture brings the separation-of-concerns design principle to predictive engine development. DASE stands for the following components of an engine:
- Data - includes Data Source and Data Preparator
- Algorithm(s)
- Serving
- Evaluator
Let's look at the code and see how you can customize the engine you built from the Classification Engine Template.
The Engine Design
As you can see from the Quick Start, MyClassification takes a JSON prediction query, e.g. `{ "attr0":4, "attr1":3, "attr2":8 }`, and returns a JSON predicted result.
In MyClassification/src/main/scala/Engine.scala, the `Query` case class defines the format of a query, such as `{ "attr0":4, "attr1":3, "attr2":8 }`:
```scala
case class Query(
  attr0 : Double,
  attr1 : Double,
  attr2 : Double
)
```
The `PredictedResult` case class defines the format of the predicted result, such as `{"label":2.0}`:
```scala
case class PredictedResult(
  val label: Double
)
```
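To make the round trip concrete, the Quick Start's query and result correspond to these Scala values (illustrative only):

```scala
// The JSON query { "attr0":4, "attr1":3, "attr2":8 } deserializes to:
val query = Query(attr0 = 4.0, attr1 = 3.0, attr2 = 8.0)

// and a result like { "label":2.0 } is serialized from:
val result = PredictedResult(label = 2.0)
```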
Finally, `ClassificationEngine` is the Engine Factory that defines the components this engine will use: Data Source, Data Preparator, Algorithm(s), and Serving.
```scala
object ClassificationEngine extends IEngineFactory {
  def apply() = {
    new Engine(
      classOf[DataSource],
      classOf[Preparator],
      Map("naive" -> classOf[NaiveBayesAlgorithm]),
      classOf[Serving])
  }
}
```
Spark MLlib
Spark MLlib's NaiveBayes algorithm takes training data of RDD type, i.e. `RDD[LabeledPoint]`, and trains a model, which is a `NaiveBayesModel` object.
PredictionIO's MLlib Classification engine template, on which MyClassification is based, integrates this algorithm under the DASE architecture. We will take a closer look at the DASE code below.
See the Spark MLlib documentation to learn more about its NaiveBayes algorithm.
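To see what the template wraps, here is a minimal, self-contained sketch of the raw MLlib API with toy data (it assumes an existing SparkContext `sc`; the data values are made up):

```scala
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Toy training set: three labeled points with three features each.
val training = sc.parallelize(Seq(
  LabeledPoint(0.0, Vectors.dense(1.0, 0.0, 0.0)),
  LabeledPoint(1.0, Vectors.dense(0.0, 1.0, 0.0)),
  LabeledPoint(2.0, Vectors.dense(0.0, 0.0, 1.0))
))

// Train with additive smoothing parameter lambda = 1.0.
val model = NaiveBayes.train(training, lambda = 1.0)

model.predict(Vectors.dense(0.0, 1.0, 0.0))  // => 1.0
```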
Data
In the DASE architecture, data is prepared by two components sequentially: Data Source and Data Preparator. Together they take data from the data store and prepare `RDD[LabeledPoint]` for the NaiveBayes algorithm.
Data Source
In MyClassification/src/main/scala/DataSource.scala, the `readTraining` method of the class `DataSource` reads and selects data from the Event Server's datastore and returns `TrainingData`.
```scala
case class DataSourceParams(appName: String) extends Params

class DataSource(val dsp: DataSourceParams)
  extends PDataSource[TrainingData,
      EmptyEvaluationInfo, Query, EmptyActualResult] {

  @transient lazy val logger = Logger[this.type]

  override
  def readTraining(sc: SparkContext): TrainingData = {

    val labeledPoints: RDD[LabeledPoint] = PEventStore.aggregateProperties(
      appName = dsp.appName,
      entityType = "user",
      // only keep entities with these required properties defined
      required = Some(List("plan", "attr0", "attr1", "attr2")))(sc)
      // aggregateProperties() returns RDD pair of
      // entity ID and its aggregated properties
      .map { case (entityId, properties) =>
        try {
          LabeledPoint(properties.get[Double]("plan"),
            Vectors.dense(Array(
              properties.get[Double]("attr0"),
              properties.get[Double]("attr1"),
              properties.get[Double]("attr2")
            ))
          )
        } catch {
          case e: Exception => {
            logger.error(s"Failed to get properties ${properties} of" +
              s" ${entityId}. Exception: ${e}.")
            throw e
          }
        }
      }.cache()

    new TrainingData(labeledPoints)
  }
}
```
`PEventStore` is an object that provides functions to access data collected through the Event Server, and `PEventStore.aggregateProperties` aggregates the event records of the four properties (attr0, attr1, attr2 and plan) for each user.
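To make that mapping concrete, here is a sketch (with made-up property values) of the same per-entity transformation `readTraining` applies, using a plain Scala Map in place of the aggregated properties:

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Hypothetical aggregated properties for one user:
val properties = Map("plan" -> 2.0, "attr0" -> 4.0, "attr1" -> 3.0, "attr2" -> 8.0)

// The transformation readTraining applies per entity:
val point = LabeledPoint(
  properties("plan"),  // the label
  Vectors.dense(properties("attr0"), properties("attr1"), properties("attr2"))
)
// point == LabeledPoint(2.0, [4.0, 3.0, 8.0])
```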
PredictionIO automatically loads the parameters of the Data Source specified in MyClassification/engine.json, including appName, into `dsp`.
In engine.json:
```json
{
  ...
  "datasource": {
    "params": {
      "appName": "MyApp1"
    }
  },
  ...
}
```
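This `params` object is deserialized into the `DataSourceParams` case class shown in DataSource.scala above, so the configuration is equivalent to constructing:

```scala
// What PredictionIO effectively constructs from the "params" object above:
val dsp = DataSourceParams(appName = "MyApp1")
```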
In the sample text data file, columns are delimited by a comma (,): the first column contains the labels and the second column contains the features.
The class definition of `TrainingData` is:
```scala
class TrainingData(
  val labeledPoints: RDD[LabeledPoint]
) extends Serializable
```
and PredictionIO passes the returned `TrainingData` object to the Data Preparator.
Data Preparator
In MyClassification/src/main/scala/Preparator.scala, the `prepare` method of the class `Preparator` takes `TrainingData`. It then conducts any necessary feature selection and data processing tasks. At the end, it returns `PreparedData`, which should contain the data the Algorithm needs. For MLlib NaiveBayes, it is `RDD[LabeledPoint]`.
By default, `prepare` simply copies the unprocessed `TrainingData` to `PreparedData`:
```scala
class PreparedData(
  val labeledPoints: RDD[LabeledPoint]
) extends Serializable

class Preparator extends PPreparator[TrainingData, PreparedData] {

  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
    new PreparedData(trainingData.labeledPoints)
  }
}
```
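If you want `prepare` to do real work, one possibility is to rescale features with MLlib's `StandardScaler`. The variant below is a sketch, not part of the template; note that MLlib's NaiveBayes requires non-negative feature values, which is why the mean is not subtracted here:

```scala
import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.regression.LabeledPoint

class Preparator extends PPreparator[TrainingData, PreparedData] {

  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
    // Scale each feature to unit standard deviation. withMean = false keeps
    // values non-negative, which MLlib's NaiveBayes requires.
    val scaler = new StandardScaler(withMean = false, withStd = true)
      .fit(trainingData.labeledPoints.map(_.features))
    val scaled = trainingData.labeledPoints.map { lp =>
      LabeledPoint(lp.label, scaler.transform(lp.features))
    }
    new PreparedData(scaled)
  }
}
```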
PredictionIO passes the returned `PreparedData` object to the Algorithm's `train` function.
Algorithm
In MyClassification/src/main/scala/NaiveBayesAlgorithm.scala, the two methods of the algorithm class are `train` and `predict`. `train` is responsible for training a predictive model; PredictionIO will store this model. `predict` is responsible for using the model to make predictions.
train(...)
`train` is called when you run `pio train`. This is where MLlib's NaiveBayes algorithm, i.e. `NaiveBayes.train`, is used to train a predictive model.
```scala
def train(sc: SparkContext, data: PreparedData): NaiveBayesModel = {
  NaiveBayes.train(data.labeledPoints, ap.lambda)
}
```
In addition to `RDD[LabeledPoint]` (i.e. `data.labeledPoints`), `NaiveBayes.train` takes one parameter: lambda.
The value of this parameter is specified in the algorithms section of MyClassification/engine.json:
```json
{
  ...
  "algorithms": [
    {
      "name": "naive",
      "params": {
        "lambda": 1.0
      }
    }
  ]
  ...
}
```
PredictionIO automatically loads these values into the constructor argument `ap`, which has a corresponding case class `AlgorithmParams`:
```scala
case class AlgorithmParams(
  lambda: Double
) extends Params
```
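For context, the algorithm class receives these parameters through its constructor. A sketch of the declaration, abridged from the template (treat the exact type parameters as illustrative):

```scala
// `ap` carries the deserialized parameters, so "lambda": 1.0 in engine.json
// arrives as ap.lambda inside train(). train() and predict(), shown in this
// section, are the methods defined in the class body (elided here).
class NaiveBayesAlgorithm(val ap: AlgorithmParams)
  extends P2LAlgorithm[PreparedData, NaiveBayesModel, Query, PredictedResult] {
  // ...
}
```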
`NaiveBayes.train` then returns a `NaiveBayesModel`. PredictionIO will automatically store the returned model.
predict(...)
The `predict` method is called when you send a JSON query to http://localhost:8000/queries.json. PredictionIO converts the query, such as `{ "attr0":4, "attr1":3, "attr2":8 }`, to the `Query` class you defined previously.
The predictive model `NaiveBayesModel` of MLlib NaiveBayes offers a method called `predict`. It takes a dense vector of features and predicts the label of the item represented by that feature vector.
```scala
def predict(model: NaiveBayesModel, query: Query): PredictedResult = {
  val label = model.predict(Vectors.dense(
    query.attr0, query.attr1, query.attr2
  ))
  PredictedResult(label)
}
```
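As a side note, `NaiveBayesModel` also exposes `predictProbabilities` (available in Spark 1.5 and later), in case you want the class posteriors rather than only the winning label. A hypothetical helper, not part of the template:

```scala
import org.apache.spark.mllib.classification.NaiveBayesModel
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Hypothetical helper (not in the template): return the posterior
// probability of each class instead of only the most likely label.
// Requires Spark 1.5+, where predictProbabilities was introduced.
def predictWithProbabilities(model: NaiveBayesModel, query: Query): Vector =
  model.predictProbabilities(
    Vectors.dense(query.attr0, query.attr1, query.attr2))
```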
You have defined the class `PredictedResult` earlier on this page.
PredictionIO passes the returned `PredictedResult` object to Serving.
Serving
The `serve` method of the class `Serving` processes the predicted result. It is also responsible for combining multiple predicted results into one if you have more than one predictive model. Serving then returns the final predicted result; PredictionIO will convert it to a JSON response automatically.
In MyClassification/src/main/scala/Serving.scala,
```scala
class Serving extends LServing[Query, PredictedResult] {

  override
  def serve(query: Query,
    predictedResults: Seq[PredictedResult]): PredictedResult = {
    predictedResults.head
  }
}
```
When you send a JSON query to http://localhost:8000/queries.json, the `PredictedResult` from each model will be passed to `serve` as a sequence, i.e. `Seq[PredictedResult]`.
An engine can train multiple models if you specify more than one Algorithm component in `object ClassificationEngine` inside Engine.scala. Since only one `NaiveBayesAlgorithm` is implemented by default, this `Seq` contains only one element.
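For instance, a hypothetical second algorithm could be registered in the engine factory like this (the name `LogisticRegressionAlgorithm` below is made up; you would implement such a class yourself):

```scala
// Hypothetical: a second entry in the engine factory's algorithm Map.
// The name "lr" must then also appear under "algorithms" in engine.json.
new Engine(
  classOf[DataSource],
  classOf[Preparator],
  Map(
    "naive" -> classOf[NaiveBayesAlgorithm],
    "lr" -> classOf[LogisticRegressionAlgorithm]  // hypothetical class
  ),
  classOf[Serving])
```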
In the default single-algorithm setup, `serve` simply returns the predicted result of the first, and only, algorithm, i.e. `predictedResults.head`.
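If you did register multiple classifiers, `serve` is where you would combine their outputs. A minimal sketch (assuming every model returns a `PredictedResult` with a Double label) that takes a majority vote:

```scala
class Serving extends LServing[Query, PredictedResult] {

  override
  def serve(query: Query,
    predictedResults: Seq[PredictedResult]): PredictedResult = {
    // Pick the label predicted by the most models; ties are broken arbitrarily.
    val majorityLabel = predictedResults.groupBy(_.label).maxBy(_._2.size)._1
    PredictedResult(majorityLabel)
  }
}
```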
Congratulations! You have just learned how to customize and build a production-ready engine. Have fun!