PredictionIO's DASE architecture brings the separation-of-concerns design principle to predictive engine development. DASE stands for the following components of an engine:
- Data - includes Data Source and Data Preparator
- Algorithm(s)
- Serving
- Evaluator
Let's look at the code and see how you can customize the engine you built from the Complementary Purchase Engine Template.
The Engine Design
As you can see from the Quick Start, MyComplementaryPurchase takes a JSON prediction query, e.g. `{ "items" : ["s2i1"], "num" : 3 }`, and returns a JSON predicted result. In MyComplementaryPurchase/src/main/scala/Engine.scala, the `Query` case class defines the format of such a query:
```scala
case class Query(items: Set[String], num: Int) extends Serializable
```
The `PredictedResult` case class defines the format of the predicted result, such as:
```json
{
  "rules": [
    {
      "cond": ["s2i1"],
      "itemScores": [
        {
          "item": "s2i2",
          "support": 0.2,
          "confidence": 0.9090909090909091,
          "lift": 3.787878787878788
        },
        {
          "item": "s2i3",
          "support": 0.14,
          "confidence": 0.6363636363636364,
          "lift": 3.535353535353535
        }
      ]
    }
  ]
}
```
with:
```scala
case class PredictedResult(rules: Array[Rule]) extends Serializable

case class Rule(cond: Set[String], itemScores: Array[ItemScore])
  extends Serializable

case class ItemScore(
  item: String,
  support: Double,
  confidence: Double,
  lift: Double
) extends Serializable
```
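To see how the JSON result maps onto these case classes, here is a minimal sketch (values copied from the sample result above) that constructs the equivalent `PredictedResult` by hand:

```scala
// Sketch only: the sample predicted result above, built by hand.
val sampleResult = PredictedResult(
  rules = Array(
    Rule(
      cond = Set("s2i1"),
      itemScores = Array(
        ItemScore(
          item = "s2i2",
          support = 0.2,
          confidence = 0.9090909090909091,
          lift = 3.787878787878788),
        ItemScore(
          item = "s2i3",
          support = 0.14,
          confidence = 0.6363636363636364,
          lift = 3.535353535353535)))))
```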
Finally, `ComplementaryPurchaseEngine` is the Engine Factory that defines the components this engine will use: Data Source, Data Preparator, Algorithm(s) and Serving components.
```scala
object ComplementaryPurchaseEngine extends IEngineFactory {
  def apply() = {
    new Engine(
      classOf[DataSource],
      classOf[Preparator],
      Map("algo" -> classOf[Algorithm]),
      classOf[Serving])
  }
}
```
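The `Map` keys are algorithm names referenced by the `"name"` field in engine.json. If you wrote a second algorithm class, you could register it here; a hypothetical sketch (`MyOtherAlgorithm` is an assumed class, not part of the template):

```scala
// Hypothetical: registering a second algorithm under the name "algo2".
// MyOtherAlgorithm is an assumed class; engine.json would then need a
// matching entry in its "algorithms" array.
object ComplementaryPurchaseEngine extends IEngineFactory {
  def apply() = {
    new Engine(
      classOf[DataSource],
      classOf[Preparator],
      Map(
        "algo" -> classOf[Algorithm],
        "algo2" -> classOf[MyOtherAlgorithm]),
      classOf[Serving])
  }
}
```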
Each DASE component of the `ComplementaryPurchaseEngine` will be explained below.
Data
In the DASE architecture, data is prepared by two components sequentially: DataSource and DataPreparator. They take data from the data store and prepare it for the Algorithm.
Data Source
In MyComplementaryPurchase/src/main/scala/DataSource.scala, the `readTraining` method of class `DataSource` reads and selects data from the Event Store (the data store of the Event Server) and returns `TrainingData`.
```scala
case class DataSourceParams(appName: String) extends Params

class DataSource(val dsp: DataSourceParams)
  extends PDataSource[TrainingData,
      EmptyEvaluationInfo, Query, EmptyActualResult] {

  @transient lazy val logger = Logger[this.type]

  override
  def readTraining(sc: SparkContext): TrainingData = {

    // get all "user" "buy" "item" events
    val buyEvents: RDD[BuyEvent] = PEventStore.find(
      appName = dsp.appName,
      entityType = Some("user"),
      eventNames = Some(List("buy")),
      targetEntityType = Some(Some("item")))(sc)
      .map { event =>
        try {
          new BuyEvent(
            user = event.entityId,
            item = event.targetEntityId.get,
            t = event.eventTime.getMillis
          )
        } catch {
          case e: Exception => {
            logger.error(s"Cannot convert ${event} to BuyEvent. ${e}")
            throw e
          }
        }
      }.cache()

    new TrainingData(buyEvents)
  }
}
```
PredictionIO automatically loads the parameters of the datasource specified in MyComplementaryPurchase/engine.json, including appName, into `dsp`.
In engine.json:
```json
{
  ...
  "datasource": {
    "params" : {
      "appName": "MyApp1"
    }
  },
  ...
}
```
In `readTraining()`, `PEventStore` is an object that provides functions to access the data collected by the PredictionIO Event Server. This Complementary Purchase Engine Template requires "buy" events.

`PEventStore.find(...)` specifies the events that you want to read. In this case, "user buy item" events are read and each is mapped to a `BuyEvent` object.
The `BuyEvent` case class is defined as:
```scala
case class BuyEvent(user: String, item: String, t: Long)
```
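The same `PEventStore.find(...)` call can read other event types if your customized engine needs them. A hypothetical sketch (not part of the template; "view" is an assumed event name):

```scala
// Hypothetical: reading assumed "view" events with the same API as above.
// Event is the raw event class returned by PEventStore.find.
val viewEvents: RDD[Event] = PEventStore.find(
  appName = dsp.appName,
  entityType = Some("user"),
  eventNames = Some(List("view")), // assumed event name
  targetEntityType = Some(Some("item")))(sc)
```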
`TrainingData` contains an RDD of `BuyEvent` objects. The class definition of `TrainingData` is:
```scala
class TrainingData(
  val buyEvents: RDD[BuyEvent]
) extends Serializable { ... }
```
PredictionIO then passes the returned `TrainingData` object to the Data Preparator.
Data Preparator
In MyComplementaryPurchase/src/main/scala/Preparator.scala, the `prepare` method of class `Preparator` takes `TrainingData` as its input and performs any necessary feature selection and data processing tasks. At the end, it returns `PreparedData`, which should contain the data the Algorithm needs.
By default, `prepare` simply copies the unprocessed `TrainingData` to `PreparedData`:
```scala
class Preparator extends PPreparator[TrainingData, PreparedData] {

  @transient lazy val logger = Logger[this.type]

  def prepare(sc: SparkContext, td: TrainingData): PreparedData = {
    new PreparedData(buyEvents = td.buyEvents)
  }
}

class PreparedData(
  val buyEvents: RDD[BuyEvent]
) extends Serializable
```
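If you want to pre-process the data, this is the place to do it. For instance, a hypothetical sketch (not part of the template; the blacklist is an assumed value) that drops buy events for blacklisted items before they reach the Algorithm:

```scala
// Hypothetical: filter out assumed blacklisted items in prepare().
class Preparator extends PPreparator[TrainingData, PreparedData] {

  // assumed hard-coded blacklist; in practice it could come from params
  val blacklist: Set[String] = Set("testItem1", "testItem2")

  def prepare(sc: SparkContext, td: TrainingData): PreparedData = {
    new PreparedData(
      buyEvents = td.buyEvents.filter(e => !blacklist.contains(e.item)))
  }
}
```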
PredictionIO passes the returned `PreparedData` object to the Algorithm's `train` function.
Algorithm
In MyComplementaryPurchase/src/main/scala/Algorithm.scala, the two methods of the algorithm class are `train` and `predict`. `train` is responsible for training the predictive model; `predict` is responsible for using this model to make predictions.
The default algorithm is based on the concept of Association Rule Learning: it finds interesting association rules (A implies B) indicating that an additional item (B) may be bought given a list of items (A). A is the condition and B is the consequence.
Algorithm parameters
The Algorithm takes the following parameters, as defined by the `AlgorithmParams` case class:
```scala
case class AlgorithmParams(
  basketWindow: Int, // in seconds
  maxRuleLength: Int,
  minSupport: Double,
  minConfidence: Double,
  minLift: Double,
  minBasketSize: Int,
  maxNumRulesPerCond: Int // max number of rules per condition
) extends Params
```
Parameter description:
- basketWindow: A buy event is considered part of the same basket as the previous one if the time difference is within this window (in seconds). For example, if it is set to 120 and the user buys item B within 2 minutes of the previous purchase (item A), the item set [A, B] is considered one basket. The purchase of one basket is referred to as one transaction.
- maxRuleLength: The maximum length of an association rule. Must be at least 2. For example, the rule "A implies B" has length 2, while the rule "A, B implies C" has length 3. Increasing this number increases training time significantly because more combinations are considered.
- minSupport: The minimum support required for an item set to be considered for rules (valid range 0 to 1). Support is the percentage of all transactions in which the item set appears, and it is used to filter out infrequent item sets. For example, a setting of 0.1 means the item set must appear in 10% of all transactions.
- minConfidence: The minimum confidence required for a rule (valid range 0 to 1). Confidence indicates the probability that the consequence appears in a transaction given that the condition appears. For example, if A appears in 30 transactions and the item set [A, B] appears in 20 of them, the rule "A implies B" has a confidence of about 0.67 (see the worked example after this list).
- minLift: The minimum lift required for a rule; it should be set to 1 to find high-quality rules. Lift is the confidence of the rule divided by the support of the consequence, and it filters out rules whose consequence is frequent regardless of the condition.
- minBasketSize: The minimum number of items in a basket for the algorithm to consider it. Must be at least 2.
- maxNumRulesPerCond: The maximum number of rules generated per condition and stored in the model. The top rules are sorted by lift score.
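To make these metrics concrete, here is a minimal sketch that computes them for the example above; the total transaction count and the count for the consequence B are assumed for illustration:

```scala
// Worked example (assumed counts): 100 transactions in total,
// [A] appears in 30, [A, B] in 20, and [B] in 24.
val totalTransactions = 100.0
val countA = 30.0
val countAB = 20.0
val countB = 24.0 // assumed count for the consequence B

val support = countAB / totalTransactions // 0.2
val confidence = countAB / countA         // ~0.667
val supportB = countB / totalTransactions // 0.24
val lift = confidence / supportB          // ~2.78
```

A lift above 1 means the condition actually increases the chance of the consequence being bought, which is why minLift should be at least 1.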
The values of these parameters can be specified in the `algorithms` section of MyComplementaryPurchase/engine.json:
```json
{
  ...
  "algorithms": [
    {
      "name": "algo",
      "params": {
        "basketWindow" : 120,
        "maxRuleLength" : 2,
        "minSupport": 0.1,
        "minConfidence": 0.6,
        "minLift" : 1.0,
        "minBasketSize" : 2,
        "maxNumRulesPerCond": 5
      }
    }
  ]
  ...
}
```
PredictionIO automatically loads these values into the constructor of the `Algorithm` class. The class extends `P2LAlgorithm` because it trains on parallelized data but produces a local model, which PredictionIO can persist automatically:
```scala
class Algorithm(val ap: AlgorithmParams)
  extends P2LAlgorithm[PreparedData, Model, Query, PredictedResult] {
  ...
}
```
train(...)
`train` is called when you run `pio train` to train a predictive model. The algorithm first finds all basket transactions, then generates and filters the association rules based on the algorithm parameters:
```scala
def train(sc: SparkContext, pd: PreparedData): Model = {
  val windowMillis = ap.basketWindow * 1000
  ...
  val transactions: RDD[Set[String]] = ...

  val totalTransaction = transactions.count()
  val minSupportCount = ap.minSupport * totalTransaction
  ...
  // generate item sets
  val itemSets: RDD[Set[String]] = transactions
    .flatMap { tran =>
      (1 to ap.maxRuleLength).flatMap(n => tran.subsets(n))
    }
  ...
  val itemSetCount: RDD[(Set[String], Int)] = ...
  ...
  val rules: RDD[(Set[String], RuleScore)] = ...

  val sortedRules = rules.groupByKey
    .mapValues(iter =>
      iter.toVector
        .sortBy(_.lift)(Ordering.Double.reverse)
        .take(ap.maxNumRulesPerCond)
    )
    .collectAsMap.toMap

  new Model(sortedRules)
}
```
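The transaction-building step is elided above. As a rough illustration only (the template's actual implementation may differ), one way to cut each user's buy events into basket transactions using `basketWindow` could look like this:

```scala
// Illustrative sketch, not the template's code: group each user's buys by
// time and start a new basket whenever the gap between consecutive buys
// exceeds windowMillis.
val transactions: RDD[Set[String]] = pd.buyEvents
  .map(b => (b.user, (b.t, b.item)))
  .groupByKey()
  .flatMap { case (user, events) =>
    val sorted = events.toVector.sortBy(_._1)
    // fold over the sorted buys, cutting a new basket on large time gaps
    val baskets = sorted.foldLeft(Vector.empty[Vector[(Long, String)]]) {
      (acc, e) =>
        if (acc.nonEmpty && e._1 - acc.last.last._1 <= windowMillis)
          acc.init :+ (acc.last :+ e)
        else
          acc :+ Vector(e)
    }
    baskets
      .map(b => b.map(_._2).toSet)
      .filter(_.size >= ap.minBasketSize)
  }
```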
PredictionIO will automatically store the returned model after training, i.e. the `Model` object.
The `Model` stores the top rules for each condition:
```scala
class Model(
  val rules: Map[Set[String], Vector[RuleScore]]
) extends Serializable { ... }
```
predict(...)
`predict` is called when you send a JSON query to http://localhost:8000/queries.json. PredictionIO converts the query, such as `{ "items" : ["s2i1"], "num" : 3 }`, to the `Query` class you defined previously in Engine.scala.
The `predict()` function does the following:
- finds all possible subsets of the items in the query, up to `maxCondLength` items per subset (a field of the `Algorithm` class derived from the maximum rule length)
- uses each subset as a condition to look up the model and returns the rules found for each condition, as shown below.
```scala
...
def predict(model: Model, query: Query): PredictedResult = {
  val conds = (1 to maxCondLength).flatMap(n => query.items.subsets(n))

  val rules = conds.map { cond =>
    model.rules.get(cond).map { vec =>
      val itemScores = vec.take(query.num).map { rs =>
        new ItemScore(
          item = rs.conseq,
          support = rs.support,
          confidence = rs.confidence,
          lift = rs.lift
        )
      }.toArray
      Rule(cond = cond, itemScores = itemScores)
    }
  }.flatten.toArray

  new PredictedResult(rules)
}
...
```
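The `subsets(n)` call above enumerates the candidate conditions. A quick illustration of what it produces for a two-item query:

```scala
// Illustration: subsets of sizes 1 to 2 for a two-item query.
val items = Set("s2i1", "s2i2")
(1 to 2).flatMap(n => items.subsets(n)).foreach(println)
// Set(s2i1)
// Set(s2i2)
// Set(s2i1, s2i2)
```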
PredictionIO passes the returned `PredictedResult` object to Serving.
Serving
The `serve` method of class `Serving` processes the predicted result. It is also responsible for combining multiple predicted results into one if you have more than one predictive model. Serving then returns the final predicted result; PredictionIO will convert it to a JSON response automatically.
In MyComplementaryPurchase/src/main/scala/Serving.scala,
```scala
class Serving extends LServing[Query, PredictedResult] {

  @transient lazy val logger = Logger[this.type]

  override
  def serve(query: Query,
    predictedResults: Seq[PredictedResult]): PredictedResult = {
    predictedResults.head
  }
}
```
When you send a JSON query to http://localhost:8000/queries.json, the `PredictedResult` from all models will be passed to `serve` as a sequence, i.e. `Seq[PredictedResult]`.