Project PredictionIO has retired. For details, please refer to its Attic page.

PredictionIO's DASE architecture brings the separation-of-concerns design principle to predictive engine development. DASE stands for the following components of an engine:

  • Data - includes Data Source and Data Preparator
  • Algorithm(s)
  • Serving
  • Evaluator

Let's look at the code and see how you can customize the engine you built from the Lead Scoring Engine Template.

Evaluator will not be covered in this tutorial.

The Engine Design

As you can see from the Quick Start, MyLeadScoring takes a JSON prediction query, e.g. '{ "landingPageId" : "example.com/page9", "referrerId" : "referrer10.com", "browser": "Firefox" }', and returns a JSON predicted result. In MyLeadScoring/src/main/scala/Engine.scala, the Query case class defines the format of such a query:

case class Query(
  landingPageId: String,
  referrerId: String,
  browser: String
) extends Serializable

The PredictedResult case class defines the format of the predicted result, such as

{"score":0.7466666666666667}

with:

case class PredictedResult(
  score: Double
) extends Serializable

Finally, LeadScoringEngine is the Engine Factory that defines the components this engine will use: Data Source, Data Preparator, Algorithm(s) and Serving components.

object LeadScoringEngine extends IEngineFactory {
  def apply() = {
    new Engine(
      classOf[DataSource],
      classOf[Preparator],
      Map("randomforest" -> classOf[RFAlgorithm]),
      classOf[Serving])
  }
}

Each DASE component of the LeadScoringEngine will be explained below.

By default, Spark's MLlib RandomForest algorithm is used.

Data

In the DASE architecture, data is prepared by two components in sequence: Data Source and Data Preparator. They read data from the data store and prepare it for the Algorithm.

Data Source

In MyLeadScoring/src/main/scala/DataSource.scala, the readTraining method of class DataSource reads and selects data from the Event Store (data store of the Event Server). It returns TrainingData.

class DataSource(val dsp: DataSourceParams)
  extends PDataSource[TrainingData,
      EmptyEvaluationInfo, Query, EmptyActualResult] {

  @transient lazy val logger = Logger[this.type]

  override
  def readTraining(sc: SparkContext): TrainingData = {

    val viewPage: RDD[(String, Event)] = PEventStore.find(
      appName = dsp.appName,
      entityType = Some("user"),
      eventNames = Some(Seq("view")),
      // targetEntityType is optional field of an event.
      targetEntityType = Some(Some("page")))(sc)
      // PEventStore.find() returns RDD[Event]
      .map { event =>
        val sessionId = try {
          event.properties.get[String]("sessionId")
        } catch {
          case e: Exception => {
            logger.error(s"Cannot get sessionId from event ${event}. ${e}.")
            throw e
          }
        }
        (sessionId, event)
      }

    val buyItem: RDD[(String, Event)] = PEventStore.find(
      appName = dsp.appName,
      entityType = Some("user"),
      eventNames = Some(Seq("buy")),
      // targetEntityType is optional field of an event.
      targetEntityType = Some(Some("item")))(sc)
      // PEventStore.find() returns RDD[Event]
      .map { event =>
        val sessionId = try {
          event.properties.get[String]("sessionId")
        } catch {
          case e: Exception => {
            logger.error(s"Cannot get sessionId from event ${event}. ${e}.")
            throw e
          }
        }
        (sessionId, event)
      }

    val session: RDD[Session] = viewPage.cogroup(buyItem)
      .map { case (sessionId, (viewIter, buyIter)) =>
        // the first view event of the session is the landing event
        val landing = viewIter.reduce{ (a, b) =>
          if (a.eventTime.isBefore(b.eventTime)) a else b
        }
        // any buy after landing
        val buy = buyIter.filter( b => b.eventTime.isAfter(landing.eventTime))
          .nonEmpty

        try {
          new Session(
            landingPageId = landing.targetEntityId.get,
            referrerId = landing.properties.getOrElse[String]("referrerId", ""),
            browser = landing.properties.getOrElse[String]("browser", ""),
            buy = buy
          )
        } catch {
          case e: Exception => {
            logger.error(s"Cannot create session data from ${landing}. ${e}.")
            throw e
          }
        }
      }.cache()

    new TrainingData(session)
  }
}

PredictionIO automatically loads the datasource parameters specified in MyLeadScoring/engine.json, including appName, into dsp.

In engine.json:

{
  ...
  "datasource": {
    "params" : {
      "appName": "MyApp1"
    }
  },
  ...
}

In readTraining(), PEventStore is an object that provides functions to access the data collected by the PredictionIO Event Server.

This Lead Scoring Engine Template requires "view" and "buy" events, each with a sessionId in its event properties.

PEventStore.find(...) specifies the events that you want to read. In this case, "user view page" and "user buy item" events are read, and each is mapped to a tuple of (sessionId, event). The events are then cogrouped by sessionId to extract per-session information, such as the first page view (the landing page view) and whether the user converted (a buy event after landing), and to create an RDD of Session as TrainingData:

case class Session(
  landingPageId: String,
  referrerId: String,
  browser: String,
  buy: Boolean // buy or not
) extends Serializable

class TrainingData(
  val session: RDD[Session]
) extends Serializable

PredictionIO then passes the returned TrainingData object to Data Preparator.

If your definition of conversion is not a "buy item" event, you can modify the DataSource to read an event other than the default buy.
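The session-building logic described above can be sketched on plain Scala collections, without Spark or PredictionIO. SimpleEvent, SessionRow, SessionSketch, and the conversionEvent parameter are hypothetical names introduced for this illustration; timestamps are simplified to epoch milliseconds, and sessions without any view event are skipped as a design choice of the sketch.

```scala
// Simplified stand-in for a PredictionIO Event (hypothetical; the real Event
// class carries more fields and a date-time object for eventTime).
final case class SimpleEvent(
  name: String,            // e.g. "view" or "buy"
  sessionId: String,
  targetEntityId: String,  // page ID for views, item ID for conversions
  eventTimeMillis: Long,
  referrerId: String = "",
  browser: String = ""
)

// Stand-in for the template's Session class.
final case class SessionRow(
  landingPageId: String,
  referrerId: String,
  browser: String,
  converted: Boolean
)

object SessionSketch {
  // Group events by session, take the earliest view as the landing event,
  // and mark the session as converted if any conversion event follows it.
  def buildSessions(
    events: Seq[SimpleEvent],
    conversionEvent: String = "buy"
  ): Map[String, SessionRow] = {
    val rows = for {
      (sessionId, evs) <- events.groupBy(_.sessionId).toSeq
      views = evs.filter(_.name == "view")
      if views.nonEmpty // assumption of this sketch: skip sessions with no view
    } yield {
      val landing = views.minBy(_.eventTimeMillis)
      val converted = evs.exists { e =>
        e.name == conversionEvent && e.eventTimeMillis > landing.eventTimeMillis
      }
      sessionId -> SessionRow(
        landing.targetEntityId, landing.referrerId, landing.browser, converted)
    }
    rows.toMap
  }
}
```

Passing a different conversionEvent, for example a hypothetical "signup" event, changes what counts as a conversion without touching the rest of the logic.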

Data Preparator

In MyLeadScoring/src/main/scala/Preparator.scala, the prepare method of class Preparator takes TrainingData as its input and performs any necessary feature selection and data processing tasks. At the end, it returns PreparedData, which should contain the data the Algorithm needs.

In this template, prepare will select the features from the Session object and convert them to the data required by the MLlib's RandomForest algorithm.

The PreparedData is defined as:

class PreparedData(
  val labeledPoints: RDD[LabeledPoint],
  val featureIndex: Map[String, Int],
  val featureCategoricalIntMap: Map[String, Map[String, Int]]
) extends Serializable

The LabeledPoint class is defined in Spark MLlib and is required by the RandomForest algorithm. featureIndex is a Map from each feature name to its position index in the feature vector. featureCategoricalIntMap is a Map from each categorical feature name to that feature's own Map of categorical values to integers.

By default, the features used for prediction are "landingPage", "referrer" and "browser". Since these features contain categorical values, we need to create a map from categorical values to integer values for the algorithm to use.

You can customize the template to use other features.

For example, if the feature "landingPage" can take any of the values "page1", "page2", "page3" and "page4", we can create a categorical Int value Map such as:

Map(
  "page1" -> 0,
  "page2" -> 1,
  "page3" -> 2,
  "page4" -> 3
)

Instead of creating such a Map manually, you can use the helper method createCategoricalIntMap(), which is defined in Preparator.scala for this purpose.
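As a minimal sketch of what such a helper might do (the template's actual implementation may differ), the function below assigns each distinct value an integer code and makes sure the default value also has one. The object name CategoricalMapSketch is hypothetical; the (values, default) parameters mirror the way the helper is called in prepare(), where the template passes "" as the default.

```scala
object CategoricalMapSketch {
  // Assign each distinct categorical value an integer code, and make sure the
  // default value (used for missing or unseen values) also has a code.
  def createCategoricalIntMap(
    values: Seq[String],
    default: String
  ): Map[String, Int] = {
    val codes = values.distinct.zipWithIndex.toMap
    if (codes.contains(default)) codes
    else codes + (default -> codes.size) // append the default as the last code
  }
}
```

With the values "page1" to "page4" and the default "", this sketch yields the Map shown above plus one extra entry for the default value.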

Each LabeledPoint consists of a label and a feature vector. The position of each feature in the vector is defined by the featureIndex Map. By default, it is defined as:

val featureIndex = Map(
  "landingPage" -> 0,
  "referrer" -> 1,
  "browser" -> 2
)

which means that index 0 of the feature vector is the "landingPage" feature, index 1 is the "referrer" feature, and so on.
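To make the role of featureIndex concrete, here is a small sketch that encodes the raw categorical values of one session into a feature array. FeatureVectorSketch and encode are hypothetical names for this illustration, and the category codes in the usage note are made up.

```scala
object FeatureVectorSketch {
  // Position of each feature in the vector, as in the default featureIndex.
  val featureIndex: Map[String, Int] = Map(
    "landingPage" -> 0,
    "referrer" -> 1,
    "browser" -> 2
  )

  // rawValues: feature name -> raw categorical value for one session.
  // featureCategoricalIntMap: feature name -> (categorical value -> Int code).
  def encode(
    rawValues: Map[String, String],
    featureCategoricalIntMap: Map[String, Map[String, Int]]
  ): Array[Double] = {
    val feature = new Array[Double](featureIndex.size)
    featureIndex.foreach { case (name, position) =>
      // Look up the integer code of the raw value and store it at the
      // position assigned to this feature.
      feature(position) = featureCategoricalIntMap(name)(rawValues(name)).toDouble
    }
    feature
  }
}
```

For instance, if "page2" has code 1, "ref3" has code 2 and "Firefox" has code 0, a session with those values is encoded as the array (1.0, 2.0, 0.0).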

The prepare() method of the Preparator class first finds all possible categorical values of each feature and creates a categorical Int map. It then converts each Session object to a LabeledPoint by creating the feature vector and the label. In this case, the label is 1 if there is a conversion and 0 if there is none:

class Preparator extends PPreparator[TrainingData, PreparedData] {

  ...

  def prepare(sc: SparkContext, td: TrainingData): PreparedData = {

    // find all distinct values of each feature
    val landingValues = td.session.map(_.landingPageId).distinct.collect
    val referrerValues = td.session.map(_.referrerId).distinct.collect
    val browserValues = td.session.map(_.browser).distinct.collect

    // map feature value to integer for each categorical feature
    val featureCategoricalIntMap = Map(
      "landingPage" -> createCategoricalIntMap(landingValues, ""),
      "referrer" -> createCategoricalIntMap(referrerValues, ""),
      "browser" -> createCategoricalIntMap(browserValues, "")
    )
    // index position of each feature in the vector
    val featureIndex = Map(
      "landingPage" -> 0,
      "referrer" -> 1,
      "browser" -> 2
    )

    // inject some default to cover default cases
    val defaults = Seq(
      new Session(
        landingPageId = "",
        referrerId = "",
        browser = "",
        buy = false
      ),
      new Session(
        landingPageId = "",
        referrerId = "",
        browser = "",
        buy = true
      ))

    val defaultRDD = sc.parallelize(defaults)
    val sessionRDD = td.session.union(defaultRDD)

    val labeledPoints: RDD[LabeledPoint] = sessionRDD.map { session =>
      logger.debug(s"${session}")
      val label = if (session.buy) 1.0 else 0.0

      val feature = new Array[Double](featureIndex.size)
      feature(featureIndex("landingPage")) =
        featureCategoricalIntMap("landingPage")(session.landingPageId).toDouble
      feature(featureIndex("referrer")) =
        featureCategoricalIntMap("referrer")(session.referrerId).toDouble
      feature(featureIndex("browser")) =
        featureCategoricalIntMap("browser")(session.browser).toDouble

      LabeledPoint(label, Vectors.dense(feature))
    }.cache()

    logger.debug(s"labeledPoints count: ${labeledPoints.count()}")
    new PreparedData(
      labeledPoints = labeledPoints,
      featureIndex = featureIndex,
      featureCategoricalIntMap = featureCategoricalIntMap)
  }
}

PredictionIO passes the returned PreparedData object to Algorithm's train function.

Algorithm

In MyLeadScoring/src/main/scala/RFAlgorithm.scala, the two methods of the algorithm class are train and predict. train is responsible for training the predictive model; predict is responsible for using this model to make predictions.

The default algorithm is Spark's MLlib RandomForest algorithm.

Algorithm parameters

The Algorithm takes the following parameters, as defined by the RFAlgorithmParams case class:

case class RFAlgorithmParams(
  numTrees: Int,
  featureSubsetStrategy: String,
  impurity: String,
  maxDepth: Int,
  maxBins: Int,
  seed: Option[Int]
) extends Params

You can find more detailed descriptions of these parameters in MLlib's RandomForest documentation and Decision Tree documentation.

The values of these parameters can be specified in the algorithms section of MyLeadScoring/engine.json:

{
  ...
  "algorithms": [
    {
      "name": "randomforest",
      "params": {
        "numClasses": 3,
        "numTrees": 5,
        "featureSubsetStrategy": "auto",
        "impurity": "variance",
        "maxDepth": 4,
        "maxBins": 100,
        "seed" : 12345
      }
    }
  ]
  ...
}

PredictionIO automatically loads these values into the constructor of the RFAlgorithm class.

class RFAlgorithm(val ap: RFAlgorithmParams)
  extends P2LAlgorithm[PreparedData, RFModel, Query, PredictedResult] {
    ...
}
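As a rough illustration of how the engine.json values line up with the case class fields, the sketch below builds the equivalent parameter object by hand. RFParamsSketch is a hypothetical stand-in that omits the PredictionIO Params trait so that it compiles on its own; it does not show how PredictionIO itself parses the JSON.

```scala
// Stand-in for RFAlgorithmParams without the PredictionIO Params trait
// (hypothetical name, for illustration only).
final case class RFParamsSketch(
  numTrees: Int,
  featureSubsetStrategy: String,
  impurity: String,
  maxDepth: Int,
  maxBins: Int,
  seed: Option[Int]
)

object RFParamsSketch {
  // Field values copied from the engine.json fragment above.
  val fromEngineJson: RFParamsSketch = RFParamsSketch(
    numTrees = 5,
    featureSubsetStrategy = "auto",
    impurity = "variance",
    maxDepth = 4,
    maxBins = 100,
    seed = Some(12345)
  )

  // Mirrors the seed handling in train(): use the configured seed if there is
  // one, otherwise fall back to a random seed.
  def effectiveSeed(params: RFParamsSketch): Int =
    params.seed.getOrElse(scala.util.Random.nextInt())
}
```

Keeping a fixed seed makes training runs repeatable, while a None seed gives a different forest on each run.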

train(...)

train is called when you run pio train to train a predictive model.

The algorithm first generates categoricalFeaturesInfo, which MLlib requires. It maps the index of each categorical feature to the number of categorical values that feature can take. Then it calls RandomForest.trainRegressor() to train a RandomForestModel that predicts the probability that the user converts. Because the training labels are 0.0 and 1.0, the regression output can be read as a score between 0 and 1.

  def train(sc: SparkContext, pd: PreparedData): RFModel = {

    val categoricalFeaturesInfo = pd.featureCategoricalIntMap
      .map { case (f, m) =>
        (pd.featureIndex(f), m.size)
      }

    logger.info(s"categoricalFeaturesInfo: ${categoricalFeaturesInfo}")

    // use random seed if seed is not specified
    val seed = ap.seed.getOrElse(scala.util.Random.nextInt())

    val forestModel: RandomForestModel = RandomForest.trainRegressor(
      input = pd.labeledPoints,
      categoricalFeaturesInfo = categoricalFeaturesInfo,
      numTrees = ap.numTrees,
      featureSubsetStrategy = ap.featureSubsetStrategy,
      impurity = ap.impurity,
      maxDepth = ap.maxDepth,
      maxBins = ap.maxBins,
      seed = seed)

    new RFModel(
      forest = forestModel,
      featureIndex = pd.featureIndex,
      featureCategoricalIntMap = pd.featureCategoricalIntMap
    )
  }
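The categoricalFeaturesInfo step in train() can be reproduced on plain Scala Maps. The sketch below uses the hypothetical names CategoricalInfoSketch and maxBinsIsLargeEnough; the second function reflects the note in MLlib's decision tree documentation that maxBins must be at least the largest number of categories of any categorical feature.

```scala
object CategoricalInfoSketch {
  // Build a Map of feature position -> number of distinct categories,
  // following the same transformation used in train().
  def categoricalFeaturesInfo(
    featureIndex: Map[String, Int],
    featureCategoricalIntMap: Map[String, Map[String, Int]]
  ): Map[Int, Int] =
    featureCategoricalIntMap.map { case (name, codes) =>
      featureIndex(name) -> codes.size
    }

  // Check that maxBins covers the categorical feature with the most values.
  def maxBinsIsLargeEnough(info: Map[Int, Int], maxBins: Int): Boolean =
    info.values.forall(_ <= maxBins)
}
```

A check like this is worth running if your landing pages or referrers have many distinct values, since the default maxBins in engine.json is 100.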

PredictionIO will automatically store the returned model after training.

The RFModel stores the RandomForestModel together with featureIndex and featureCategoricalIntMap, which predict needs in order to encode queries:

class RFModel(
  val forest: RandomForestModel,
  val featureIndex: Map[String, Int],
  val featureCategoricalIntMap: Map[String, Map[String, Int]]
) extends Serializable {
  ...
}

predict(...)

predict is called when you send a JSON query to http://localhost:8000/queries.json. PredictionIO converts the query, such as '{ "landingPageId" : "example.com/page9", "referrerId" : "referrer10.com", "browser": "Firefox" }', to the Query class you defined previously in Engine.scala.

The predict() function does the following:

  1. Convert the Query to the required feature vector input.
  2. Use the RandomForestModel to predict the probability of conversion given this feature vector.

  ...

  def predict(model: RFModel, query: Query): PredictedResult = {

    val featureIndex = model.featureIndex
    val featureCategoricalIntMap = model.featureCategoricalIntMap

    val landingPageId = query.landingPageId
    val referrerId = query.referrerId
    val browser = query.browser

    // look up categorical feature Int for landingPageId
    val landingFeature = lookupCategoricalInt(
      featureCategoricalIntMap = featureCategoricalIntMap,
      feature = "landingPage",
      value = landingPageId,
      default = ""
    ).toDouble


    // look up categorical feature Int for referrerId
    val referrerFeature = lookupCategoricalInt(
      featureCategoricalIntMap = featureCategoricalIntMap,
      feature = "referrer",
      value = referrerId,
      default = ""
    ).toDouble

    // look up categorical feature Int for browser
    val browserFeature = lookupCategoricalInt(
      featureCategoricalIntMap = featureCategoricalIntMap,
      feature = "browser",
      value = browser,
      default = ""
    ).toDouble

    // create feature Array
    val feature = new Array[Double](model.featureIndex.size)
    feature(featureIndex("landingPage")) = landingFeature
    feature(featureIndex("referrer")) = referrerFeature
    feature(featureIndex("browser")) = browserFeature

    val score = model.forest.predict(Vectors.dense(feature))
    new PredictedResult(score)
  }

  ...

PredictionIO passes the returned PredictedResult object to Serving.
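The lookupCategoricalInt helper used in predict() is not shown in this tutorial. A minimal sketch of what such a lookup might look like, assuming it falls back to the code of the default value when the query contains a value that was not seen during training (the template's actual implementation may differ), is:

```scala
object LookupSketch {
  // Return the integer code of `value` for the given feature, or the code of
  // `default` if the value is not in the map for that feature.
  def lookupCategoricalInt(
    featureCategoricalIntMap: Map[String, Map[String, Int]],
    feature: String,
    value: String,
    default: String
  ): Int = {
    val codes = featureCategoricalIntMap(feature)
    // getOrElse evaluates the fallback only when `value` is missing
    codes.getOrElse(value, codes(default))
  }
}
```

This fallback only works if the default value has a code, which is presumably why prepare() passes "" as the default when creating each categorical map.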

Serving

The serve method of class Serving processes the predicted results. It is also responsible for combining multiple predicted results into one if you have more than one predictive model. Serving then returns the final predicted result, which PredictionIO automatically converts to a JSON response.

In MyLeadScoring/src/main/scala/Serving.scala:

class Serving extends LServing[Query, PredictedResult] {

  @transient lazy val logger = Logger[this.type]

  override
  def serve(query: Query,
    predictedResults: Seq[PredictedResult]): PredictedResult = {
    predictedResults.head
  }
}

When you send a JSON query to http://localhost:8000/queries.json, the PredictedResult from each model is passed to serve as a sequence, i.e. Seq[PredictedResult].

An engine can train multiple models if you specify more than one Algorithm component in object LeadScoringEngine inside Engine.scala and the corresponding parameters in engine.json. Since only one algorithm is implemented by default, this Seq contains one element.
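If you do add more algorithms, serve needs a rule for combining their results. One possible rule, shown here only as a sketch, is to average the scores. ScoreResult is a hypothetical stand-in for PredictedResult so that the example runs without PredictionIO, and averaging is an assumption of this example, not the template's behavior (the default serve simply returns the first result).

```scala
// Stand-in for the template's PredictedResult (hypothetical name).
final case class ScoreResult(score: Double)

object ServingSketch {
  // Combine the results of several models by taking the mean score.
  def averageScores(predictedResults: Seq[ScoreResult]): ScoreResult = {
    require(predictedResults.nonEmpty, "at least one predicted result is expected")
    ScoreResult(predictedResults.map(_.score).sum / predictedResults.size)
  }
}
```

With a single algorithm, this returns the same score as predictedResults.head, so the rule stays compatible with the default setup.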