This section gives you an overview of DASE components and how to implement them. You will find links to some engine templates for more concrete examples.
DataSource
DataSource reads and selects useful data from the Event Store (data store of the Event Server) and returns TrainingData.
readTraining()
You need to implement readTraining() of PDataSource, where you can use the PEventStore Engine API to read the events and create the TrainingData based on the events.
The following code example reads users' "view" and "buy" item events, filters events of a specific type for further processing, and returns TrainingData accordingly.
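```scala
import org.apache.predictionio.controller.{EmptyActualResult, EmptyEvaluationInfo, PDataSource}
import org.apache.predictionio.data.storage.Event
import org.apache.predictionio.data.store.PEventStore
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import grizzled.slf4j.Logger

class DataSource(val dsp: DataSourceParams)
  extends PDataSource[TrainingData,
      EmptyEvaluationInfo, Query, EmptyActualResult] {

  @transient lazy val logger = Logger[this.type]

  override
  def readTraining(sc: SparkContext): TrainingData = {

    // read all "view" and "buy" events of users on items
    val eventsRDD: RDD[Event] = PEventStore.find(
      appName = dsp.appName,
      entityType = Some("user"),
      eventNames = Some(List("view", "buy")),
      // targetEntityType is an optional field of an event.
      targetEntityType = Some(Some("item")))(sc)
      .cache() // reused below for each event type

    // keep only the "view" events and convert them to ViewEvent
    val viewEventsRDD: RDD[ViewEvent] = eventsRDD
      .filter { event => event.event == "view" }
      .map { ... }

    ...

    new TrainingData(...)
  }
}
```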
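The readTraining() code above needs a SparkContext and a running Event Server. To show the filter-then-map pattern on its own, here is a minimal plain-Scala sketch in which an in-memory Seq stands in for RDD[Event]. The Event, ViewEvent, BuyEvent, and TrainingData case classes are simplified, hypothetical stand-ins, not PredictionIO's actual classes.

```scala
// Hypothetical, simplified stand-ins for PredictionIO's Event and a
// template's TrainingData; a Seq plays the role of RDD[Event].
case class Event(event: String, entityId: String, targetEntityId: Option[String])

case class ViewEvent(user: String, item: String)
case class BuyEvent(user: String, item: String)

case class TrainingData(viewEvents: Seq[ViewEvent], buyEvents: Seq[BuyEvent])

def readTrainingLocal(events: Seq[Event]): TrainingData = {
  // keep only "view" events and convert them to ViewEvent
  val viewEvents = events
    .filter(_.event == "view")
    .map(e => ViewEvent(user = e.entityId, item = e.targetEntityId.get))

  // same pattern for "buy" events
  val buyEvents = events
    .filter(_.event == "buy")
    .map(e => BuyEvent(user = e.entityId, item = e.targetEntityId.get))

  TrainingData(viewEvents, buyEvents)
}

val sample = Seq(
  Event("view", "u0", Some("i0")),
  Event("buy", "u0", Some("i0")),
  Event("view", "u1", Some("i1")))

val td = readTrainingLocal(sample)
println(td.viewEvents.size) // 2
println(td.buyEvents)       // List(BuyEvent(u0,i0))
```

In the real DataSource, calling cache() on the shared RDD plays the role that the in-memory Seq plays here: each filter pass reuses the same events instead of reading them from the Event Store again.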
Using PEventStore Engine API
Please see Event Server Overview to understand EventAPI and event modeling.
With the PEventStore Engine API, you can read different events in DataSource and extract the information you need.
For example, let's say you have events like the following:
```json
{
  "event": "myEvent",
  "entityType": "user",
  "entityId": "u0",
  "targetEntityType": "item",
  "targetEntityId": "i0",
  "properties" : {
    "a" : 3,
    "b" : "some_string",
    "c" : ["a", "b", "c"],
    "d" : [1.2, 3.4, 5.6],
    "e" : 6
  }
}
```
The following code reads these events, extracts the properties field of each event, and converts it to a MyEvent object.
```scala
// one field per property of "myEvent"; e is optional
case class MyEvent(
  entityId: String,
  targetEntityId: String,
  a: Int,
  b: String,
  c: List[String],
  d: List[Double],
  e: Option[Int]
)

val myEvents: RDD[MyEvent] = PEventStore.find(
  appName = dsp.appName,
  entityType = Some("user"),
  eventNames = Some(List("myEvent")),
  // targetEntityType is an optional field of an event.
  targetEntityType = Some(Some("item")))(sc)
  // PEventStore.find() returns RDD[Event]
  .map { event =>
    try {
      MyEvent(
        entityId = event.entityId,
        targetEntityId = event.targetEntityId.get,
        a = event.properties.get[Int]("a"),
        b = event.properties.get[String]("b"),
        c = event.properties.get[List[String]]("c"),
        d = event.properties.get[List[Double]]("d"),
        e = event.properties.getOpt[Int]("e") // use getOpt for optional data
      )
    } catch {
      case e: Exception =>
        logger.error(s"Cannot convert ${event}. Exception: ${e}.")
        throw e
    }
  }
```
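The difference between properties.get[T]() for required fields and properties.getOpt[T]() for optional ones can be sketched without PredictionIO. In the minimal example below, a Map[String, Any] stands in for the event's properties (the real properties object is backed by JSON), and MyEvent is trimmed down to a few fields; both are assumptions for illustration only. A missing required field raises an exception, while a missing optional field becomes None.

```scala
import scala.util.Try

// Hypothetical, trimmed-down MyEvent; Map[String, Any] stands in for the
// event's properties.
case class MyEvent(entityId: String, a: Int, b: String, e: Option[Int])

def toMyEvent(entityId: String, props: Map[String, Any]): MyEvent = {
  // required fields behave like get[T]: fail if absent or of the wrong type
  def required[T](name: String)(pf: PartialFunction[Any, T]): T =
    props.get(name).collect(pf).getOrElse(
      throw new NoSuchElementException(s"missing or mistyped property '$name'"))

  MyEvent(
    entityId = entityId,
    a = required[Int]("a") { case v: Int => v },
    b = required[String]("b") { case v: String => v },
    // optional field behaves like getOpt[T]: None if absent
    e = props.get("e").collect { case v: Int => v })
}

val ok = toMyEvent("u0", Map("a" -> 3, "b" -> "some_string"))
println(ok) // MyEvent(u0,3,some_string,None)

val bad = Try(toMyEvent("u1", Map("b" -> "x")))
println(bad.isFailure) // true
```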
If you have used the special events $set/$unset/$delete to set an entity's properties, you can retrieve the aggregated properties with PEventStore.aggregateProperties().
Please see event modeling to understand the usage of the special $set/$unset/$delete events.
For example, the following code shows how to retrieve the properties of "item" entities:
```scala
case class Item(a: Int, b: String, c: List[String], d: List[Double], e: Option[Int])

// create an RDD of (entityId, Item)
val itemsRDD: RDD[(String, Item)] = PEventStore.aggregateProperties(
  appName = dsp.appName,
  entityType = "item"
)(sc).map { case (entityId, properties) =>
  try {
    val item = Item(
      a = properties.get[Int]("a"),
      b = properties.get[String]("b"),
      c = properties.get[List[String]]("c"),
      d = properties.get[List[Double]]("d"),
      e = properties.getOpt[Int]("e") // use getOpt for optional data
    )
    (entityId, item)
  } catch {
    case e: Exception =>
      logger.error(s"Failed to get properties ${properties} of" +
        s" ${entityId}. Exception: ${e}.")
      throw e
  }
}
```
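To make the aggregation behind PEventStore.aggregateProperties() more concrete, the sketch below folds $set, $unset, and $delete events into per-entity properties. It is a simplified model, not PredictionIO's implementation: SpecialEvent is a hypothetical class, events are assumed to be already sorted by event time, and this sketch looks only at the property names of an $unset event.

```scala
// Hypothetical, simplified special event (no entityType, eventTime, etc.).
case class SpecialEvent(event: String, entityId: String, properties: Map[String, Any])

// Fold the events, in order, into the current properties of each entity.
def aggregate(events: Seq[SpecialEvent]): Map[String, Map[String, Any]] =
  events.foldLeft(Map.empty[String, Map[String, Any]]) { (acc, ev) =>
    ev.event match {
      case "$set" =>
        val current = acc.getOrElse(ev.entityId, Map.empty[String, Any])
        acc.updated(ev.entityId, current ++ ev.properties) // later values win
      case "$unset" =>
        acc.get(ev.entityId) match {
          case Some(current) => acc.updated(ev.entityId, current -- ev.properties.keys)
          case None          => acc
        }
      case "$delete" =>
        acc - ev.entityId // the whole entity disappears
      case _ =>
        acc // other events do not change properties
    }
  }

val events = Seq(
  SpecialEvent("$set", "i0", Map("a" -> 3, "b" -> "x")),
  SpecialEvent("$set", "i0", Map("a" -> 5)),
  SpecialEvent("$unset", "i0", Map("b" -> "")), // only the key "b" matters here
  SpecialEvent("$set", "i1", Map("a" -> 1)),
  SpecialEvent("$delete", "i1", Map.empty))

println(aggregate(events)) // Map(i0 -> Map(a -> 5))
```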
Example:
Preparator
The Preparator is responsible for pre-processing TrainingData for any necessary feature selection and data processing tasks, and for generating PreparedData, which contains the data the Algorithm needs.
A few example usages of Preparator:
- Feature extraction
- Common pre-processing logic if you have multiple algorithms
- For simple cases, the Preparator may simply pass the same TrainingData as PreparedData to the Algorithm.
prepare()
You need to implement the prepare() method of PPreparator to perform such tasks.
Example:
- Preparator of Lead Scoring Template: it pre-processes the TrainingData and generates the feature vectors needed for the algorithm.
- Preparator of Similar Product Template: it simply passes the TrainingData as PreparedData for the algorithm.
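As a minimal illustration of feature extraction in prepare(), the plain-Scala sketch below turns categorical session fields into numeric feature vectors with one-hot encoding. The Session, TrainingData, and PreparedData classes are hypothetical, and one-hot encoding is just one possible choice, not necessarily what any particular template does. A pass-through Preparator would instead return the training data unchanged.

```scala
// Hypothetical training record and data containers.
case class Session(landingPage: String, referrer: String, converted: Boolean)
case class TrainingData(sessions: Seq[Session])

// One label and one feature vector per session.
case class PreparedData(labels: Vector[Double], features: Vector[Vector[Double]])

def prepare(td: TrainingData): PreparedData = {
  // vocabularies built from the training data itself, sorted for a stable order
  val pages = td.sessions.map(_.landingPage).distinct.sorted.toVector
  val referrers = td.sessions.map(_.referrer).distinct.sorted.toVector

  // 1.0 at the position of `value` in `vocab`, 0.0 elsewhere
  def oneHot(value: String, vocab: Vector[String]): Vector[Double] =
    vocab.map(v => if (v == value) 1.0 else 0.0)

  val features = td.sessions.map { s =>
    oneHot(s.landingPage, pages) ++ oneHot(s.referrer, referrers)
  }.toVector
  val labels = td.sessions.map(s => if (s.converted) 1.0 else 0.0).toVector
  PreparedData(labels, features)
}

val td = TrainingData(Seq(
  Session("/home", "google", converted = true),
  Session("/pricing", "direct", converted = false)))
val pd = prepare(td)
println(pd.features) // Vector(Vector(1.0, 0.0, 0.0, 1.0), Vector(0.0, 1.0, 1.0, 0.0))
```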
Algorithm
The two methods of the Algorithm class are train() and predict():
train()
train() is responsible for training a predictive model. It is called when you run pio train. Apache PredictionIO will store this model.
predict()
predict() is responsible for using this model to make predictions. It is called when you send a JSON query to the engine. Note that predict() is called in real time.
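The split between train() and predict() can be illustrated with a deliberately trivial popularity model. In this hypothetical plain-Scala sketch (no Spark, no PredictionIO classes), train() does the aggregation over all training data once, and predict() only reads the resulting model, which keeps the per-query work small enough for real-time serving.

```scala
// Hypothetical model: number of views per item.
case class PopularityModel(counts: Map[String, Int])
case class Query(num: Int)
case class PredictedResult(items: Seq[String])

// train(): runs offline over all (user, item) view pairs.
def train(views: Seq[(String, String)]): PopularityModel =
  PopularityModel(views.groupBy(_._2).map { case (item, vs) => item -> vs.size })

// predict(): runs per query and only reads the trained model.
def predict(model: PopularityModel, query: Query): PredictedResult =
  PredictedResult(
    model.counts.toSeq
      .sortBy { case (item, count) => (-count, item) } // most viewed first, ties by id
      .take(query.num)
      .map(_._1))

val model = train(Seq(
  "u0" -> "i0", "u1" -> "i0", "u1" -> "i1",
  "u2" -> "i2", "u2" -> "i1", "u3" -> "i1"))
println(predict(model, Query(2)).items.mkString(", ")) // i1, i0
```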
Apache PredictionIO supports two types of algorithms:
- P2LAlgorithm: trains a Model which does not contain an RDD
- PAlgorithm: trains a Model which contains an RDD
P2LAlgorithm
For P2LAlgorithm, the Model is automatically serialized and persisted by Apache PredictionIO after training.
Implementing IPersistentModel and IPersistentModelLoader is optional for P2LAlgorithm.
Example:
PAlgorithm
PAlgorithm should be used when your Model contains an RDD. The model produced by PAlgorithm is not persisted by default. To persist the model, you need to do the following:
- The Model class should extend the IPersistentModel trait and implement the save() method for saving the model. The trait IPersistentModel requires a type parameter, which is the class type of the algorithm parameter.
- Implement a Model factory object which extends the IPersistentModelLoader trait and implements the apply() method for loading the model. The trait IPersistentModelLoader requires two type parameters, which are the types of the algorithm parameter and of the model produced by the algorithm.
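The save()/apply() pairing described above can be sketched with plain Scala and local files. SimplePersistentModel and SimplePersistentModelLoader are hypothetical traits that only mirror the shape of IPersistentModel and IPersistentModelLoader; the real traits also receive Spark context arguments, and a real PAlgorithm model would typically save its RDDs rather than a small Map. AlgorithmParams and its modelDir field are likewise assumptions.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// Hypothetical algorithm parameters: where to store models.
case class AlgorithmParams(modelDir: String)

// Hypothetical traits mirroring the shape of the PredictionIO ones.
trait SimplePersistentModel[AP] {
  def save(id: String, params: AP): Boolean
}
trait SimplePersistentModelLoader[AP, M] {
  def apply(id: String, params: AP): M
}

case class ItemScores(scores: Map[String, Double])
    extends SimplePersistentModel[AlgorithmParams] {
  // write one "item<TAB>score" line per entry into a file named after the id
  def save(id: String, params: AlgorithmParams): Boolean = {
    val lines = scores.map { case (item, s) => s"$item\t$s" }.mkString("\n")
    Files.write(Paths.get(params.modelDir, id), lines.getBytes(StandardCharsets.UTF_8))
    true
  }
}

// Factory object that loads the model back (in PredictionIO templates the
// loader is usually the model's companion object).
object ItemScoresLoader extends SimplePersistentModelLoader[AlgorithmParams, ItemScores] {
  def apply(id: String, params: AlgorithmParams): ItemScores = {
    val bytes = Files.readAllBytes(Paths.get(params.modelDir, id))
    val scores = new String(bytes, StandardCharsets.UTF_8)
      .split("\n").filter(_.nonEmpty)
      .map { line =>
        val parts = line.split("\t")
        parts(0) -> parts(1).toDouble
      }.toMap
    ItemScores(scores)
  }
}

val params = AlgorithmParams(Files.createTempDirectory("model-sketch").toString)
val model = ItemScores(Map("i0" -> 0.5, "i1" -> 1.5))
model.save("engine-instance-1", params)
val loaded = ItemScoresLoader("engine-instance-1", params)
println(loaded == model) // true
```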
Example:
- Algorithm of Recommendation Template: it implements PAlgorithm and the IPersistentModel and IPersistentModelLoader.
- Algorithm of Vanilla Template: it walks through examples of both P2LAlgorithm and PAlgorithm.
Using LEventStore Engine API in predict()
You may use LEventStore.findByEntity() to retrieve events of a specific entity. For example, you can retrieve recent events of the user specified in the query and use these recent events to make predictions in real time.
For example, the following code reads the 10 most recent view events of query.user:
```scala
import org.apache.predictionio.data.storage.Event
import org.apache.predictionio.data.store.LEventStore
import scala.concurrent.duration.Duration

val recentEvents = try {
  LEventStore.findByEntity(
    appName = ap.appName,
    // entityType and entityId are specified for fast lookup
    entityType = "user",
    entityId = query.user,
    eventNames = Some(List("view")),
    targetEntityType = Some(Some("item")),
    limit = Some(10),
    latest = true,
    // set a time limit to avoid very long DB access
    timeout = Duration(200, "millis")
  )
} catch {
  case e: scala.concurrent.TimeoutException =>
    logger.error(s"Timeout when reading recent events." +
      s" Empty list is used. ${e}")
    Iterator[Event]()
  case e: Exception =>
    logger.error(s"Error when reading recent events: ${e}")
    throw e
}
```
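The timeout-with-fallback pattern in the code above is not specific to LEventStore. The sketch below reproduces it with scala.concurrent: Await.result() bounds the wait, and a TimeoutException is turned into an empty result so that predict() can still answer. slowLookup() is a hypothetical placeholder for an event-store query.

```scala
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Hypothetical slow lookup standing in for an event-store query.
def slowLookup(user: String, delayMs: Long): Iterator[String] = {
  Thread.sleep(delayMs)
  Iterator(s"$user-item-a", s"$user-item-b")
}

// Wait at most `timeout`; on timeout, log and fall back to an empty result.
def recentItems(user: String, delayMs: Long, timeout: Duration): Seq[String] =
  try {
    Await.result(Future(slowLookup(user, delayMs).toList), timeout)
  } catch {
    case e: TimeoutException =>
      println(s"Timeout when reading recent events. Empty list is used. $e")
      Seq.empty
  }

println(recentItems("u0", delayMs = 10, timeout = Duration(2000, "millis")))
println(recentItems("u1", delayMs = 500, timeout = Duration(50, "millis")))
```

Note that the abandoned Future keeps running after a timeout; the sketch only stops waiting for it.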
Example:
- Algorithm of E-Commerce Recommendation Template: LEventStore.findByEntity() is used in predict() to retrieve all items seen by the user and filter them out of the recommendations.
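Filtering seen items out of the recommendations can be sketched as a plain post-processing step on the model's scores. The scores map and the seen set are hypothetical inputs; in the template, the seen items would come from LEventStore.findByEntity().

```scala
// Drop items the user has already seen, then keep the `num` best-scored ones.
def recommend(scores: Map[String, Double], seen: Set[String], num: Int): Seq[(String, Double)] =
  scores.toSeq
    .filterNot { case (item, _) => seen.contains(item) } // remove seen items
    .sortBy { case (item, score) => (-score, item) }     // highest score first
    .take(num)

val scores = Map("i0" -> 0.9, "i1" -> 0.8, "i2" -> 0.5, "i3" -> 0.7)
println(recommend(scores, seen = Set("i0"), num = 2)) // List((i1,0.8), (i3,0.7))
```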
Serving
serve()
You need to implement the serve() method of the class LServing. The serve() method processes the predicted results. It is also responsible for combining multiple predicted results into one if you have more than one predictive model.
Example:
- Serving of Similar Product Template: it simply returns the predicted result.
- Serving of multi-algorithm examples of Similar Product Template: it combines the results of multiple algorithms and returns the combined result.
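One simple way serve() could combine the results of several algorithms is to sum each item's scores and keep the top items. The sketch below does that in plain Scala. ItemScore, PredictedResult, and Query are hypothetical classes, and summing raw scores is only one possible rule (it assumes the algorithms' scores are on comparable scales), not necessarily the rule used in the Similar Product examples. As with LServing's serve(), it receives the query together with the sequence of predicted results.

```scala
// Hypothetical query and prediction types.
case class Query(num: Int)
case class ItemScore(item: String, score: Double)
case class PredictedResult(itemScores: Seq[ItemScore])

// Sum each item's scores across all algorithms, then keep the top query.num.
def serve(query: Query, predictedResults: Seq[PredictedResult]): PredictedResult = {
  val combined = predictedResults
    .flatMap(_.itemScores)
    .groupBy(_.item)
    .map { case (item, scores) => ItemScore(item, scores.map(_.score).sum) }
    .toSeq
    .sortBy(is => (-is.score, is.item)) // highest combined score first
    .take(query.num)
  PredictedResult(combined)
}

val fromAlg1 = PredictedResult(Seq(ItemScore("i0", 1.0), ItemScore("i1", 0.5)))
val fromAlg2 = PredictedResult(Seq(ItemScore("i1", 0.75), ItemScore("i2", 0.25)))
println(serve(Query(2), Seq(fromAlg1, fromAlg2)).itemScores.map(_.item)) // List(i1, i0)
```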