This section gives you an overview of DASE components and how to implement them. You will find links to some engine templates for more concrete examples.

DataSource

DataSource reads and selects useful data from the Event Store (data store of the Event Server) and returns TrainingData.

readTraining()

You need to implement readTraining() of PDataSource, where you can use the PEventStore Engine API to read the events and create the TrainingData based on them.

The following code example reads user "view" and "buy" item events, filters events of a specific type for further processing, and returns TrainingData accordingly.

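import grizzled.slf4j.Logger
import org.apache.predictionio.controller.{EmptyActualResult, EmptyEvaluationInfo, PDataSource}
import org.apache.predictionio.data.storage.Event
import org.apache.predictionio.data.store.PEventStore
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// DataSourceParams, TrainingData, Query and ViewEvent are defined elsewhere in the engine.
class DataSource(val dsp: DataSourceParams)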
  extends PDataSource[TrainingData,
      EmptyEvaluationInfo, Query, EmptyActualResult] {

  @transient lazy val logger = Logger[this.type]

  override
  def readTraining(sc: SparkContext): TrainingData = {

    val eventsRDD: RDD[Event] = PEventStore.find(
      appName = dsp.appName,
      entityType = Some("user"),
      eventNames = Some(List("view", "buy")),
      // targetEntityType is optional field of an event.
      targetEntityType = Some(Some("item")))(sc)
      .cache()

    val viewEventsRDD: RDD[ViewEvent] = eventsRDD
      .filter { event => event.event == "view" }
      .map { ... }

    ...

    new TrainingData(...)
  }

}
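
The "..." placeholders above stand for your own mapping logic. As a rough, hypothetical illustration of that filter-and-map step, the sketch below splits events by name into typed records and assembles a TrainingData. It uses a plain Seq instead of an RDD so it runs without Spark, and SimpleEvent, ViewEvent, BuyEvent and this TrainingData are made-up types, not PredictionIO classes.

```scala
// Hypothetical, simplified stand-in for PredictionIO's Event
// (the real class has more fields, such as properties and eventTime).
case class SimpleEvent(event: String, entityId: String, targetEntityId: Option[String])

// Hypothetical domain types that a TrainingData might hold.
case class ViewEvent(user: String, item: String)
case class BuyEvent(user: String, item: String)
case class TrainingData(viewEvents: Seq[ViewEvent], buyEvents: Seq[BuyEvent])

def toTrainingData(events: Seq[SimpleEvent]): TrainingData = {
  // Keep only "view" events that have a target item; others are dropped.
  val views = events.collect {
    case SimpleEvent("view", user, Some(item)) => ViewEvent(user, item)
  }
  // Same for "buy" events.
  val buys = events.collect {
    case SimpleEvent("buy", user, Some(item)) => BuyEvent(user, item)
  }
  TrainingData(views, buys)
}

val sample = Seq(
  SimpleEvent("view", "u0", Some("i0")),
  SimpleEvent("view", "u1", Some("i1")),
  SimpleEvent("buy", "u0", Some("i0")),
  SimpleEvent("rate", "u1", Some("i1")), // not "view" or "buy": ignored
  SimpleEvent("view", "u2", None)        // no target item: ignored
)
val td = toTrainingData(sample)
println(td)
```

Spark's RDD also has a collect method that takes a partial function, so the same pattern should carry over to eventsRDD with little change.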

Using PEventStore Engine API

Please see Event Server Overview to understand the Event API and event modeling.

With the PEventStore Engine API, you can read different events in DataSource and extract the information you need.

For example, let's say you have events like the following:

{
  "event": "myEvent",
  "entityType": "user",
  "entityId": "u0",
  "targetEntityType": "item",
  "targetEntityId": "i0",
  "properties" : {
    "a" : 3,
    "b" : "some_string",
    "c" : ["a", "b", "c"],
    "d" : [1.2, 3.4, 5.6],
    "e" : 6
  }
}

Then the following code reads these events, extracts the properties field of each event, and converts it to a MyEvent object.

    val myEvents: RDD[MyEvent] = PEventStore.find(
      appName = dsp.appName,
      entityType = Some("user"),
      eventNames = Some(List("myEvent")),
      // targetEntityType is optional field of an event.
      targetEntityType = Some(Some("item")))(sc)
    .map { event =>
      try {
        MyEvent(
          entityId = event.entityId,
          targetEntityId = event.targetEntityId.get,
          a = event.properties.get[Int]("a"),
          b = event.properties.get[String]("b"),
          c = event.properties.get[List[String]]("c"),
          d = event.properties.get[List[Double]]("d"),
          e = event.properties.getOpt[Int]("e") // use getOpt for optional data
        )
      } catch {
        case e: Exception =>
          logger.error(s"Cannot convert ${event}. Exception: ${e}.")
          throw e
      }
    }
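
To see the difference between get and getOpt without a running event store, here is a minimal plain-Scala sketch under stated assumptions: a Map[String, Any] plays the role of event.properties, and MyEvent is a cut-down, hypothetical version of the case class used above. Required fields fail loudly when absent, while the optional field simply becomes None.

```scala
import scala.util.Try

// Hypothetical target type, a trimmed-down version of MyEvent above.
case class MyEvent(entityId: String, a: Int, b: String, e: Option[Int])

// A plain Map[String, Any] stands in for event.properties
// (hypothetical; the real property map is parsed from JSON).
def toMyEvent(entityId: String, props: Map[String, Any]): MyEvent =
  MyEvent(
    entityId = entityId,
    // Required fields: Map.apply throws NoSuchElementException if the key is missing.
    a = props("a").asInstanceOf[Int],
    b = props("b").asInstanceOf[String],
    // Optional field: None when the key is absent, like getOpt[Int]("e").
    e = props.get("e").map(_.asInstanceOf[Int])
  )

val complete = toMyEvent("u0", Map("a" -> 3, "b" -> "some_string", "e" -> 6))
val withoutE = toMyEvent("u1", Map("a" -> 4, "b" -> "other"))
// A missing required field makes the conversion fail; the code above logs and rethrows.
val missingA = Try(toMyEvent("u2", Map("b" -> "x")))
```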

If you have used the special events $set/$unset/$delete to set an entity's properties, you can retrieve the aggregated properties with PEventStore.aggregateProperties().

Please see event modeling to understand the usage of the special $set/$unset/$delete events.

For example, the following code shows how you could retrieve the properties of the "item" entities:

    // create an RDD of (entityId, Item) pairs
    val itemsRDD: RDD[(String, Item)] = PEventStore.aggregateProperties(
      appName = dsp.appName,
      entityType = "item"
    )(sc).map { case (entityId, properties) =>

      try {
        val item = Item(
          a = properties.get[Int]("a"),
          b = properties.get[String]("b"),
          c = properties.get[List[String]]("c"),
          d = properties.get[List[Double]]("d"),
          e = properties.getOpt[Int]("e") // use getOpt for optional data
        )

        (entityId, item)
      } catch {
        case e: Exception =>
          logger.error(s"Failed to get properties ${properties} of ${entityId}. Exception: ${e}.")
          throw e
      }

    }
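
The aggregation itself happens inside PEventStore.aggregateProperties(), but the rule it follows for the special events can be sketched as a fold over events in time order: $set merges properties (later values win), $unset removes the named properties, and $delete removes the entity. The sketch below is a simplified, hypothetical model of that behavior with string-valued properties; it is not PredictionIO's implementation.

```scala
// Hypothetical, simplified special event: name, entity, event time, properties.
case class SpecialEvent(
  event: String,      // "$set", "$unset" or "$delete"
  entityId: String,
  time: Long,         // event time, e.g. epoch millis
  properties: Map[String, String] = Map.empty)

// Fold events in time order into the current properties of each entity.
def aggregateSpecialEvents(events: Seq[SpecialEvent]): Map[String, Map[String, String]] =
  events.sortBy(_.time).foldLeft(Map.empty[String, Map[String, String]]) { (acc, ev) =>
    ev.event match {
      case "$set" =>
        // Merge: for the same key, the later value overrides the earlier one.
        val current = acc.getOrElse(ev.entityId, Map.empty[String, String])
        acc.updated(ev.entityId, current ++ ev.properties)
      case "$unset" =>
        // Remove the listed keys; only the keys matter in this sketch.
        acc.get(ev.entityId) match {
          case Some(current) => acc.updated(ev.entityId, current -- ev.properties.keys)
          case None          => acc
        }
      case "$delete" =>
        // Drop the entity entirely.
        acc - ev.entityId
      case _ =>
        acc // other events do not change properties
    }
  }

val evs = Seq(
  SpecialEvent("$set", "i0", 1L, Map("a" -> "3", "b" -> "x")),
  SpecialEvent("$set", "i1", 2L, Map("a" -> "1")),
  SpecialEvent("$set", "i0", 3L, Map("a" -> "5")),
  SpecialEvent("$unset", "i0", 4L, Map("b" -> "")),
  SpecialEvent("$delete", "i1", 5L)
)
val itemProps = aggregateSpecialEvents(evs)
println(itemProps)
```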


Preparator

Preparator is responsible for pre-processing TrainingData, performing any necessary feature selection and data processing, and generating PreparedData, which contains the data the Algorithm needs.

A few example usages of Preparator:

  • Feature extraction
  • Common pre-processing logic if you have multiple algorithms
  • For simple cases, the Preparator may simply pass the same TrainingData to the Algorithm as PreparedData.

prepare()

You need to implement the prepare() method of PPreparator to perform such tasks.
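
As a hypothetical example of feature extraction, the sketch below turns raw view events into the set of distinct items each user has viewed. The real prepare() of PPreparator also receives a SparkContext and typically works on RDDs; plain collections are used here so the sketch runs on its own, and all type names are made up.

```scala
// Hypothetical TrainingData and PreparedData types for illustration.
case class ViewEvent(user: String, item: String, time: Long)
case class TrainingData(views: Seq[ViewEvent])
// PreparedData holds one extracted feature: distinct items viewed per user.
case class PreparedData(itemsByUser: Map[String, Set[String]])

// Stand-in for prepare(): group views by user and deduplicate items.
def prepare(td: TrainingData): PreparedData =
  PreparedData(
    td.views
      .groupBy(_.user)
      .map { case (user, vs) => user -> vs.map(_.item).toSet }
  )

val td = TrainingData(Seq(
  ViewEvent("u0", "i0", 1L),
  ViewEvent("u0", "i1", 2L),
  ViewEvent("u0", "i0", 3L), // repeated view of i0 collapses into the set
  ViewEvent("u1", "i2", 4L)))
val pd = prepare(td)
println(pd)
```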


Algorithm

The two methods of the Algorithm class are train() and predict():

train()

train() is responsible for training a predictive model. It is called when you run pio train. Apache PredictionIO will store this model.

predict()

predict() is responsible for using this model to make predictions. It is called when you send a JSON query to the engine. Note that predict() is called in real time.
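
A minimal, hypothetical sketch of the train()/predict() split: train() counts purchases per item once, and predict() only sorts and slices the precomputed counts, which keeps the per-query work small. In a real engine these methods belong to an algorithm class (and train() also receives a SparkContext); here they live in a plain object, and every type name is invented for illustration.

```scala
// Hypothetical types for illustration only.
case class PreparedData(buys: Seq[(String, String)]) // (user, item) pairs
case class Query(num: Int)
case class PredictedResult(items: Seq[(String, Int)])
case class PopularityModel(counts: Map[String, Int])

object PopularityAlgorithm {
  // train(): build the model once, offline (what `pio train` triggers in a real engine).
  def train(pd: PreparedData): PopularityModel =
    PopularityModel(pd.buys.groupBy(_._2).map { case (item, bs) => item -> bs.size })

  // predict(): answer one query from the trained model; ties are broken by item id.
  def predict(model: PopularityModel, query: Query): PredictedResult =
    PredictedResult(model.counts.toSeq.sortBy { case (item, n) => (-n, item) }.take(query.num))
}

val pd = PreparedData(Seq(
  "u0" -> "i0", "u1" -> "i0", "u1" -> "i1",
  "u2" -> "i2", "u2" -> "i0", "u0" -> "i1"))
val model = PopularityAlgorithm.train(pd)
val result = PopularityAlgorithm.predict(model, Query(num = 2))
println(result)
```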

Apache PredictionIO supports two types of algorithms:

P2LAlgorithm

For P2LAlgorithm, the Model is automatically serialized and persisted by Apache PredictionIO after training.

Implementing IPersistentModel and IPersistentModelLoader is optional for P2LAlgorithm.


PAlgorithm

PAlgorithm should be used when your Model contains an RDD. The model produced by PAlgorithm is not persisted by default. To persist the model, you need to do the following:

  • The Model class should extend the IPersistentModel trait and implement the save() method for saving the model. The IPersistentModel trait requires a type parameter, which is the class of the algorithm parameters.
  • Implement a Model factory object that extends the IPersistentModelLoader trait and implements the apply() method for loading the model. The IPersistentModelLoader trait requires two type parameters: the type of the algorithm parameters and the type of the model produced by the algorithm.
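
The save()/apply() pairing described above can be sketched with simplified, hypothetical stand-ins for the two traits. The real IPersistentModel and IPersistentModelLoader also receive a SparkContext, and a model holding RDDs would typically save them with Spark; this sketch stores a small map as a text file only to show how save() on the model and apply() on its companion object fit together.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// Hypothetical, simplified versions of IPersistentModel / IPersistentModelLoader.
trait PersistentModel[AP] {
  def save(id: String, params: AP): Boolean
}
trait PersistentModelLoader[AP, M] {
  def apply(id: String, params: AP): M
}

// Hypothetical algorithm parameters: the directory where models are stored.
case class AlgorithmParams(modelDir: String)

case class CountModel(counts: Map[String, Int]) extends PersistentModel[AlgorithmParams] {
  // Save as "item<TAB>count" lines in a file named after the model id.
  def save(id: String, params: AlgorithmParams): Boolean = {
    val text = counts.map { case (item, n) => s"$item\t$n" }.mkString("\n")
    Files.write(Paths.get(params.modelDir, s"$id.tsv"), text.getBytes(StandardCharsets.UTF_8))
    true
  }
}

// The companion object is the model factory: apply(id, params) loads what save() wrote.
object CountModel extends PersistentModelLoader[AlgorithmParams, CountModel] {
  def apply(id: String, params: AlgorithmParams): CountModel = {
    val path = Paths.get(params.modelDir, s"$id.tsv")
    val text = new String(Files.readAllBytes(path), StandardCharsets.UTF_8)
    val counts = text.split("\n").filter(_.nonEmpty).map { line =>
      val parts = line.split("\t")
      parts(0) -> parts(1).toInt
    }.toMap
    CountModel(counts) // the case class's own apply(counts)
  }
}

val params = AlgorithmParams(Files.createTempDirectory("models").toString)
val original = CountModel(Map("i0" -> 3, "i1" -> 2))
val saved = original.save("instance-1", params)
val loaded = CountModel("instance-1", params)
println(loaded)
```

The model id is what links the two halves: whatever save() writes under an id, apply() must be able to read back under the same id.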


Using LEventStore Engine API in predict()

You may use LEventStore.findByEntity() to retrieve the events of a specific entity. For example, you can retrieve the recent events of the user specified in the query and use these events to make predictions in real time.

For example, the following code reads the 10 most recent view events of query.user:

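    import org.apache.predictionio.data.storage.Event
    import org.apache.predictionio.data.store.LEventStore
    import scala.concurrent.duration.Duration

    val recentEvents = try {
      LEventStore.findByEntity(
        appName = ap.appName,
        // entityType and entityId are specified for fast lookup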
        entityType = "user",
        entityId = query.user,
        eventNames = Some(List("view")),
        targetEntityType = Some(Some("item")),
        limit = Some(10),
        latest = true,
        // set time limit to avoid super long DB access
        timeout = Duration(200, "millis")
      )
    } catch {
      case e: scala.concurrent.TimeoutException =>
        logger.error(s"Timeout when reading recent events." +
          s" Empty list is used. ${e}")
        Iterator[Event]()
      case e: Exception =>
        logger.error(s"Error when reading recent events: ${e}")
        throw e
    }
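
The same time-limited lookup with a fallback can be tried out in plain Scala. In this hypothetical sketch, slowLookup simulates a slow database read with a Future, and Await.result with a Duration budget plays the role of the timeout parameter; on TimeoutException the code falls back to an empty list instead of failing the query.

```scala
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Hypothetical slow lookup standing in for a database read of recent events.
def slowLookup(user: String, delayMillis: Long): Future[List[String]] = Future {
  Thread.sleep(delayMillis)
  List(s"$user-view-1", s"$user-view-2")
}

// Wait at most `timeout`; on timeout, fall back to an empty list so that
// predict() can still answer (for example with non-personalized results).
def recentEventsOrEmpty(f: Future[List[String]], timeout: Duration): List[String] =
  try Await.result(f, timeout)
  catch {
    case e: TimeoutException =>
      println(s"Timeout when reading recent events. Empty list is used. $e")
      Nil
  }

val fast = recentEventsOrEmpty(slowLookup("u0", 0L), Duration(200, "millis"))
val slow = recentEventsOrEmpty(slowLookup("u1", 2000L), Duration(200, "millis"))
println(fast)
println(slow)
```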


Serving

serve()

You need to implement the serve() method of the LServing class. The serve() method processes the predicted results. It is also responsible for combining multiple predicted results into one if you have more than one predictive model.
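
When an engine has several algorithms, serve() receives one predicted result per algorithm. A minimal, hypothetical way to combine them is to add up each item's scores across results and keep the top entries, as sketched below. ItemScore, PredictedResult and Query are made-up types, and summing is only one possible strategy; averaging or weighting results per algorithm are common alternatives.

```scala
// Hypothetical result types for illustration.
case class ItemScore(item: String, score: Double)
case class PredictedResult(itemScores: Seq[ItemScore])
case class Query(num: Int)

// Stand-in for serve(): sum each item's scores across all algorithms'
// results, sort by total score (ties broken by item id) and keep query.num items.
def serve(query: Query, predictedResults: Seq[PredictedResult]): PredictedResult = {
  val combined = predictedResults
    .flatMap(_.itemScores)
    .groupBy(_.item)
    .map { case (item, scores) => ItemScore(item, scores.map(_.score).sum) }
    .toSeq
    .sortWith((a, b) => a.score > b.score || (a.score == b.score && a.item < b.item))
    .take(query.num)
  PredictedResult(combined)
}

val fromAlgorithms = Seq(
  PredictedResult(Seq(ItemScore("i0", 0.75), ItemScore("i1", 0.5))),
  PredictedResult(Seq(ItemScore("i1", 0.5), ItemScore("i2", 0.25))))
val top = serve(Query(num = 2), fromAlgorithms)
println(top)
```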
