
PredictionIO's DASE architecture brings the separation-of-concerns design principle to predictive engine development. DASE stands for the following components of an engine:

  • Data - includes Data Source and Data Preparator
  • Algorithm(s)
  • Serving
  • Evaluator

Let's look at the code and see how you can customize the engine you built from the Complementary Purchase Engine Template.

Evaluator will not be covered in this tutorial.

The Engine Design

As you can see from the Quick Start, MyComplementaryPurchase takes a JSON prediction query, e.g. { "items" : ["s2i1"], "num" : 3 }, and returns a JSON predicted result. In MyComplementaryPurchase/src/main/scala/Engine.scala, the Query case class defines the format of such a query:

case class Query(items: Set[String], num: Int)
  extends Serializable
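
For illustration, the example JSON query above deserializes into a Query instance like this:

val query = Query(items = Set("s2i1"), num = 3)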

The PredictedResult case class defines the format of the predicted result, such as:

{
  "rules":[
    {
      "cond":["s2i1"],
      "itemScores":[
        {
          "item":"s2i2",
          "support":0.2,
          "confidence":0.9090909090909091,
          "lift":3.787878787878788
        },
        {
          "item":"s2i3",
          "support":0.14,
          "confidence":0.6363636363636364,
          "lift":3.535353535353535
        }
      ]
    }
  ]
}

with:

case class PredictedResult(rules: Array[Rule])
  extends Serializable

case class Rule(cond: Set[String], itemScores: Array[ItemScore])
  extends Serializable

case class ItemScore(
  item: String, support: Double, confidence: Double, lift: Double
) extends Serializable

Finally, ComplementaryPurchaseEngine is the Engine Factory that defines the components this engine will use: the Data Source, Data Preparator, Algorithm(s), and Serving components.

object ComplementaryPurchaseEngine extends IEngineFactory {
  def apply() = {
    new Engine(
      classOf[DataSource],
      classOf[Preparator],
      Map("algo" -> classOf[Algorithm]),
      classOf[Serving])
  }
}

Each DASE component of the ComplementaryPurchaseEngine will be explained below.

Data

In the DASE architecture, data is prepared by two components sequentially: DataSource and DataPreparator. They take data from the data store and prepare it for the Algorithm.

Data Source

In MyComplementaryPurchase/src/main/scala/DataSource.scala, the readTraining method of class DataSource reads and selects data from the Event Store (the data store of the Event Server). It returns TrainingData.

case class DataSourceParams(appName: String) extends Params

class DataSource(val dsp: DataSourceParams)
  extends PDataSource[TrainingData,
      EmptyEvaluationInfo, Query, EmptyActualResult] {

  @transient lazy val logger = Logger[this.type]

  override
  def readTraining(sc: SparkContext): TrainingData = {

    // get all "user" "buy" "item" events
    val buyEvents: RDD[BuyEvent] = PEventStore.find(
      appName = dsp.appName,
      entityType = Some("user"),
      eventNames = Some(List("buy")),
      targetEntityType = Some(Some("item")))(sc)
      .map { event =>
        try {
          new BuyEvent(
            user = event.entityId,
            item = event.targetEntityId.get,
            t = event.eventTime.getMillis
          )
        } catch {
          case e: Exception => {
            logger.error(s"Cannot convert ${event} to BuyEvent. ${e}")
            throw e
          }
        }
      }.cache()

    new TrainingData(buyEvents)
  }
}

PredictionIO automatically loads the parameters specified in the datasource section of MyComplementaryPurchase/engine.json, including appName, into dsp.

In engine.json:

{
  ...
  "datasource": {
    "params" : {
      "appName": "MyApp1"
    }
  },
  ...
}

In readTraining(), PEventStore is an object that provides functions to access the data collected by the PredictionIO Event Server.

This Complementary Purchase Engine Template requires "buy" events.

PEventStore.find(...) specifies the events that you want to read. In this case, "user buy item" events are read and then each is mapped to a BuyEvent object.

The BuyEvent case class is defined as:

case class BuyEvent(user: String, item: String, t: Long)

TrainingData contains an RDD of BuyEvent objects. The class definition of TrainingData is:

class TrainingData(
  val buyEvents: RDD[BuyEvent]
) extends Serializable { ... }

PredictionIO then passes the returned TrainingData object to Data Preparator.

You could modify the DataSource to read events other than the default buy event.
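
For example, here is a minimal sketch that reads hypothetical "view" events instead, assuming your app records them with the same user/item entity types (the event name "view" is an assumption, not part of this template):

// Hypothetical: read "view" events in place of the default "buy"
val viewEvents: RDD[BuyEvent] = PEventStore.find(
  appName = dsp.appName,
  entityType = Some("user"),
  eventNames = Some(List("view")), // changed from List("buy")
  targetEntityType = Some(Some("item")))(sc)
  .map { event =>
    new BuyEvent(
      user = event.entityId,
      item = event.targetEntityId.get,
      t = event.eventTime.getMillis
    )
  }.cache()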

Data Preparator

In MyComplementaryPurchase/src/main/scala/Preparator.scala, the prepare method of class Preparator takes TrainingData as its input and performs any necessary feature selection and data processing tasks. At the end, it returns PreparedData, which should contain the data the Algorithm needs.

By default, prepare simply copies the unprocessed TrainingData data to PreparedData:

class Preparator
  extends PPreparator[TrainingData, PreparedData] {

  @transient lazy val logger = Logger[this.type]

  def prepare(sc: SparkContext, td: TrainingData): PreparedData = {
    new PreparedData(buyEvents = td.buyEvents)
  }
}

class PreparedData(
  val buyEvents: RDD[BuyEvent]
) extends Serializable

PredictionIO passes the returned PreparedData object to Algorithm's train function.
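
If you want to customize this stage, prepare is the natural place to do it. For example, a minimal sketch that drops some items before training (excludeItems is a hypothetical setting, hard-coded here for brevity):

class Preparator
  extends PPreparator[TrainingData, PreparedData] {

  // Hypothetical: item IDs to ignore during training
  val excludeItems: Set[String] = Set("testitem")

  def prepare(sc: SparkContext, td: TrainingData): PreparedData = {
    // keep only buy events whose item is not excluded
    new PreparedData(
      buyEvents = td.buyEvents.filter(e => !excludeItems.contains(e.item)))
  }
}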

Algorithm

In MyComplementaryPurchase/src/main/scala/Algorithm.scala, the two methods of the algorithm class are train and predict. train is responsible for training the predictive model; predict is responsible for using this model to make predictions.

The default algorithm is based on the concept of Association Rule Learning: it finds interesting association rules (A implies B) indicating that an additional item (B) may be bought given a list of items (A). A is the condition and B is the consequence.

Algorithm parameters

The Algorithm takes the following parameters, as defined by the AlgorithmParams case class:

case class AlgorithmParams(
  basketWindow: Int, // in seconds
  maxRuleLength: Int,
  minSupport: Double,
  minConfidence: Double,
  minLift: Double,
  minBasketSize: Int,
  maxNumRulesPerCond: Int // max number of rules per condition
  ) extends Params

Parameter description:

  • basketWindow: A buy event is considered part of the same basket as the previous one if the time difference is within this window (in seconds). For example, if it's set to 120 and the user buys item B within 2 minutes of the previous purchase (item A), then the item set [A, B] is considered one basket. The purchase of this basket is referred to as one transaction.
  • maxRuleLength: The maximum length of an association rule. Must be at least 2. For example, the rule "A implies B" has a length of 2, while "A, B implies C" has a length of 3. Increasing this number increases training time significantly because more combinations are considered.
  • minSupport: The minimum support required for an item set to be considered as a rule (valid range is 0 to 1). It's the fraction of all transactions in which the item set appears, and is used to filter out infrequent item sets. For example, setting it to 0.1 means the item set must appear in 10% of all transactions.
  • minConfidence: The minimum confidence required for a rule (valid range is 0 to 1). The confidence indicates how likely the consequence is bought given that the condition is in the basket. For example, if A appears in 30 transactions and the item set [A, B] appears in 20 of them, then the rule "A implies B" has a confidence of 20/30 ≈ 0.67 (see the sketch after this list).
  • minLift: The minimum lift required for a rule: the confidence of the rule divided by the support of the consequence. It should be set to 1 to find high quality rules. It filters out rules whose consequence is bought frequently anyway, regardless of the condition.
  • minBasketSize: The minimum number of items in a basket for it to be considered by the algorithm. Must be at least 2.
  • maxNumRulesPerCond: The maximum number of rules generated per condition and stored in the model. By default, the top rules are sorted by lift score.
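
To make support, confidence, and lift concrete, here is the arithmetic on illustrative counts, extending the confidence example above (the count for B is an assumption):

// 100 transactions in total; the condition A appears in 30 of them,
// the consequence B in 25, and the item set [A, B] in 20
val totalTransactions = 100.0
val countA = 30.0
val countB = 25.0
val countAB = 20.0

val support = countAB / totalTransactions            // 0.2
val confidence = countAB / countA                    // ~0.67
val lift = confidence / (countB / totalTransactions) // ~2.67; above 1, so A makes B more likely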

If you import your own data and the engine doesn't return any results, it could be caused by the following:

  • The algorithm parameter constraints are too strict and the algorithm couldn't find rules that satisfy them. Try setting minSupport, minConfidence, and minLift to 0, check whether anything is returned (regardless of recommendation quality), and then adjust the parameters accordingly.
  • The Complementary Purchase engine requires buy events with a correct eventTime. If you import data without specifying eventTime, the SDK uses the current time because it assumes the event happens in real time (which is not the case when you import a batch offline). All buy events would then be treated as one big transaction when they should be treated as multiple transactions.

The values of these parameters can be specified in the algorithms section of MyComplementaryPurchase/engine.json:

{
  ...
  "algorithms": [
    {
      "name": "algo",
      "params": {
        "basketWindow" : 120,
        "maxRuleLength" : 2,
        "minSupport": 0.1,
        "minConfidence": 0.6,
        "minLift" : 1.0,
        "minBasketSize" : 2,
        "maxNumRulesPerCond": 5
      }
    }
  ]
  ...
}

PredictionIO automatically loads these values into the constructor of the Algorithm class.

class Algorithm(val ap: AlgorithmParams)
  extends P2LAlgorithm[PreparedData, Model, Query, PredictedResult] {
    ...
}

train(...)

train is called when you run pio train to train a predictive model. The algorithm first finds all basket transactions, then generates and filters the association rules based on the algorithm parameters:

  def train(sc: SparkContext, pd: PreparedData): Model = {
    val windowMillis = ap.basketWindow * 1000

    ...

    val transactions: RDD[Set[String]] = ...

    val totalTransaction = transactions.count()
    val minSupportCount = ap.minSupport * totalTransaction

    ...

    // generate item sets
    val itemSets: RDD[Set[String]] = transactions
      .flatMap { tran =>
        (1 to ap.maxRuleLength).flatMap(n => tran.subsets(n))
      }

    ...

    val itemSetCount: RDD[(Set[String], Int)] = ...

    ...

    val rules: RDD[(Set[String], RuleScore)] = ...

    val sortedRules = rules.groupByKey
      .mapValues(iter =>
        iter.toVector
          .sortBy(_.lift)(Ordering.Double.reverse)
          .take(ap.maxNumRulesPerCond)
        )
      .collectAsMap.toMap

    new Model(sortedRules)
  }
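
The first elided block builds the basket transactions: it segments each user's buy events into baskets using basketWindow. As a rough sketch of that idea (not the template's exact implementation), one user's time-sorted purchases could be split like this:

// Sketch: split one user's (item, timestampMillis) buys into baskets.
// A buy starts a new basket when it occurs more than windowMillis
// after the previous buy.
def toBaskets(buys: Seq[(String, Long)], windowMillis: Long): Seq[Set[String]] =
  buys.sortBy(_._2).foldLeft(List.empty[List[(String, Long)]]) {
    case (Nil, buy) => List(List(buy))
    case (current :: done, buy) =>
      if (buy._2 - current.head._2 <= windowMillis)
        (buy :: current) :: done      // within the window: same basket
      else
        List(buy) :: current :: done  // gap too large: start a new basket
  }.map(_.map(_._1).toSet)

// Baskets smaller than minBasketSize would then be dropped
// before counting item sets.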

PredictionIO will automatically store the returned model after training, i.e. the Model object.

The Model stores the top rules for each condition:

class Model(
  val rules: Map[Set[String], Vector[RuleScore]]
) extends Serializable {
  ...
}

predict(...)

predict is called when you send a JSON query to http://localhost:8000/queries.json. PredictionIO converts the query, such as { "items" : ["s2i1"], "num" : 3 } to the Query class you defined previously in Engine.scala.

The predict() function does the following:

  1. finds all possible subsets of the items in the query
  2. uses each subset as a condition to look up the model and returns the rules found for each condition.
  ...

  def predict(model: Model, query: Query): PredictedResult = {
    val conds = (1 to maxCondLength).flatMap(n => query.items.subsets(n))

    val rules = conds.map { cond =>
      model.rules.get(cond).map{ vec =>
        val itemScores = vec.take(query.num).map { rs =>
          new ItemScore(
            item = rs.conseq,
            support = rs.support,
            confidence = rs.confidence,
            lift = rs.lift
          )
        }.toArray
        Rule(cond = cond, itemScores = itemScores)
      }
    }.flatten.toArray

    new PredictedResult(rules)
  }

  ...
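
Step 1 relies on Scala's subsets method. For example, a query with illustrative items ["A", "B"] and a maximum condition length of 2 (maxCondLength comes from the elided part of the class) produces these conditions:

val conds = (1 to 2).flatMap(n => Set("A", "B").subsets(n))
// conds: Vector(Set("A"), Set("B"), Set("A", "B"))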

PredictionIO passes the returned PredictedResult object to Serving.

Serving

The serve method of class Serving processes the predicted result. It is also responsible for combining multiple predicted results into one if you have more than one predictive model. Serving then returns the final predicted result. PredictionIO will convert it to a JSON response automatically.

In MyComplementaryPurchase/src/main/scala/Serving.scala,

class Serving
  extends LServing[Query, PredictedResult] {

  @transient lazy val logger = Logger[this.type]

  override
  def serve(query: Query,
    predictedResults: Seq[PredictedResult]): PredictedResult = {
    predictedResults.head
  }
}

When you send a JSON query to http://localhost:8000/queries.json, PredictedResult from all models will be passed to serve as a sequence, i.e. Seq[PredictedResult].

An engine can train multiple models if you specify more than one Algorithm component in object ComplementaryPurchaseEngine inside Engine.scala and the corresponding parameters in engine.json. Since only one Algorithm is implemented by default, this Seq contains one element.
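
For example, a minimal sketch of registering a second algorithm, assuming a hypothetical Algorithm2 class implemented alongside the default one (engine.json would also need a matching second entry in its algorithms array):

object ComplementaryPurchaseEngine extends IEngineFactory {
  def apply() = {
    new Engine(
      classOf[DataSource],
      classOf[Preparator],
      Map(
        "algo" -> classOf[Algorithm],
        "algo2" -> classOf[Algorithm2]), // hypothetical second algorithm
      classOf[Serving])
  }
}

serve would then receive two elements in predictedResults and could merge their rules instead of simply returning predictedResults.head.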