This is a more advanced example; we recommend that you go through the DASE explanation first.
The default algorithm described in DASE uses user-to-item view events as training data. However, your application may have more than one type of event that you want to take into account, such as buy, rate, and like events. One way to incorporate other types of events and improve the system is to add another algorithm to process these events, build a separate model, and then combine the outputs of the multiple algorithms during Serving.
In this example, we will add another algorithm to process like/dislike events. The final PredictedResult will be the combined output of both algorithms.
This example will demonstrate the following:
- Read multiple types of events
- Use positive and negative implicit events such as like and dislike with MLlib ALS algorithm
- Integrate multiple algorithms into one engine
The complete source code of this example can be found here.
Step 1. Read "like" and "dislike" events as TrainingData
Modify the following in DataSource.scala:
- In addition to the original `ViewEvent` class, add a new class `LikeEvent`, which has a boolean `like` field indicating whether it is a like or a dislike event.
- Add a new field `likeEvents` to the `TrainingData` class to store the `RDD[LikeEvent]`.
- Modify DataSource's `readTraining()` function to read "like" and "dislike" events from the Event Store.
The modification is shown below:
```scala
class DataSource(val dsp: DataSourceParams)
  extends PDataSource[TrainingData,
      EmptyEvaluationInfo, Query, EmptyActualResult] {

  override
  def readTraining(sc: SparkContext): TrainingData = {
    ...

    // get all "user" "view" "item" events
    val viewEventsRDD: RDD[ViewEvent] = ...

    // ADDED
    // get all "user" "like" and "dislike" "item" events
    val likeEventsRDD: RDD[LikeEvent] = PEventStore.find(
      appName = dsp.appName,
      entityType = Some("user"),
      eventNames = Some(List("like", "dislike")),
      // targetEntityType is optional field of an event.
      targetEntityType = Some(Some("item")))(sc)
      // eventsDb.find() returns RDD[Event]
      .map { event =>
        val likeEvent = try {
          event.event match {
            case "like" | "dislike" => LikeEvent(
              user = event.entityId,
              item = event.targetEntityId.get,
              t = event.eventTime.getMillis,
              like = (event.event == "like"))
            case _ => throw new Exception(s"Unexpected event ${event} is read.")
          }
        } catch {
          case e: Exception => {
            logger.error(s"Cannot convert ${event} to LikeEvent." +
              s" Exception: ${e}.")
            throw e
          }
        }
        likeEvent
      }.cache()

    new TrainingData(
      users = usersRDD,
      items = itemsRDD,
      viewEvents = viewEventsRDD,
      likeEvents = likeEventsRDD // ADDED
    )
  }
}

...

case class LikeEvent( // ADDED
  user: String,
  item: String,
  t: Long,
  like: Boolean // true: like. false: dislike
)

class TrainingData(
  val users: RDD[(String, User)],
  val items: RDD[(String, Item)],
  val viewEvents: RDD[ViewEvent],
  val likeEvents: RDD[LikeEvent] // ADDED
) extends Serializable {
  override def toString = {
    s"users: [${users.count()} (${users.take(2).toList}...)]" +
    s"items: [${items.count()} (${items.take(2).toList}...)]" +
    s"viewEvents: [${viewEvents.count()}] (${viewEvents.take(2).toList}...)" +
    s"likeEvents: [${likeEvents.count()}] (${likeEvents.take(2).toList}...)" // ADDED
  }
}
```
Step 2. Modify Preparator and PreparedData
In Preparator.scala, simply pass the newly added `likeEvents` from `TrainingData` to `PreparedData`, as shown in the code below:
```scala
class Preparator
  extends PPreparator[TrainingData, PreparedData] {

  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
    new PreparedData(
      users = trainingData.users,
      items = trainingData.items,
      viewEvents = trainingData.viewEvents,
      likeEvents = trainingData.likeEvents) // ADDED
  }
}

class PreparedData(
  val users: RDD[(String, User)],
  val items: RDD[(String, Item)],
  val viewEvents: RDD[ViewEvent],
  val likeEvents: RDD[LikeEvent] // ADDED
) extends Serializable
```
Step 3. Add a new algorithm to train a model with likeEvents
For demonstration purposes, we also use MLlib ALS to train a model with `likeEvents`, so the new algorithm class shares much of the logic of the original algorithm. The only difference is the `train()` function: the original one trains the model with `viewEvents` while the new one uses `likeEvents`. In this case, we can simply create a new algorithm that extends the original `ALSAlgorithm` class and overrides the `train()` function, as sketched below.
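The skeleton of this change looks like the following. This is only the shape of the subclass, with the body elided; the full implementation appears later in this step:

```scala
// Reuse everything from ALSAlgorithm (parameters, predict(), the model)
// and replace only the training logic.
class LikeAlgorithm(ap: ALSAlgorithmParams) extends ALSAlgorithm(ap) {
  override def train(sc: SparkContext, data: PreparedData): ALSModel = {
    // build ratings from data.likeEvents instead of data.viewEvents,
    // then train with MLlib ALS as before
    ...
  }
}
```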
Using positive and negative implicit events (without explicit rating) with MLlib ALS
In the original `ALSAlgorithm`, the `train()` function counts the number of times a user has viewed the same item and maps that count to an `MLlibRating` object. A like/dislike event, however, is a boolean, one-time preference, so it doesn't make sense to aggregate the events when a user has multiple like/dislike events on the same item. That said, a user may like an item and later change her mind and dislike it, or vice versa. In this case, we can simply use the user's latest like or dislike event for the item.

In addition, MLlib ALS can handle negative preferences with `ALS.trainImplicit()`. Hence, we can map a dislike to a rating of -1 and a like to a rating of 1.
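To make these two ideas concrete, here is a minimal sketch using plain Scala collections instead of Spark RDDs; the user IDs, item IDs, and timestamps are made up for illustration:

```scala
// Hypothetical like/dislike events: ((user, item), (like, timestamp))
val events = Seq(
  (("u1", "i1"), (true, 1000L)),  // u1 liked i1 ...
  (("u1", "i1"), (false, 2000L)), // ... then disliked it later
  (("u2", "i1"), (true, 1500L))
)

val ratings = events
  .groupBy(_._1) // group events of the same (user, item) pair
  .map { case ((user, item), evs) =>
    val (like, _) = evs.map(_._2).maxBy(_._2) // keep only the latest event
    (user, item, if (like) 1 else -1)         // dislike -> -1, like -> 1
  }
// ratings contains (u1, i1, -1) and (u2, i1, 1)
```

The actual `train()` function shown below implements the same keep-the-latest logic with `reduceByKey` on an RDD.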
In summary, this new `LikeAlgorithm` does the following:

- Extends the original `ALSAlgorithm` class
- Overrides `train()` to process the `likeEvents` in `PreparedData`
- Uses the latest event if the user likes/dislikes the same item multiple times
- Maps a dislike to an `MLlibRating` object with a rating of -1 and a like to a rating of 1
- Uses the `MLlibRating` to train the `ALSModel` in the same way as the original `ALSAlgorithm`
- Keeps the `predict()` function the same as the original `ALSAlgorithm`
It is shown in the code below:
```scala
package org.apache.predictionio.examples.similarproduct

import org.apache.predictionio.data.storage.BiMap

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}

import grizzled.slf4j.Logger

// ADDED
// Extend original ALSAlgorithm and override train() function to handle
// like and dislike events
class LikeAlgorithm(ap: ALSAlgorithmParams) extends ALSAlgorithm(ap) {

  @transient lazy override val logger = Logger[this.type]

  override
  def train(sc: SparkContext, data: PreparedData): ALSModel = {
    require(!data.likeEvents.take(1).isEmpty,
      s"likeEvents in PreparedData cannot be empty." +
      " Please check if DataSource generates TrainingData" +
      " and Preparator generates PreparedData correctly.")
    require(!data.users.take(1).isEmpty,
      s"users in PreparedData cannot be empty." +
      " Please check if DataSource generates TrainingData" +
      " and Preparator generates PreparedData correctly.")
    require(!data.items.take(1).isEmpty,
      s"items in PreparedData cannot be empty." +
      " Please check if DataSource generates TrainingData" +
      " and Preparator generates PreparedData correctly.")
    // create user and item String-ID-to-integer-index BiMaps
    val userStringIntMap = BiMap.stringInt(data.users.keys)
    val itemStringIntMap = BiMap.stringInt(data.items.keys)

    // collect Item as Map and convert ID to Int index
    val items: Map[Int, Item] = data.items.map { case (id, item) =>
      (itemStringIntMap(id), item)
    }.collectAsMap.toMap

    val mllibRatings = data.likeEvents
      .map { r =>
        // Convert user and item String IDs to Int index for MLlib
        val uindex = userStringIntMap.getOrElse(r.user, -1)
        val iindex = itemStringIntMap.getOrElse(r.item, -1)

        if (uindex == -1)
          logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
            + " to Int index.")

        if (iindex == -1)
          logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
            + " to Int index.")

        // key is (uindex, iindex) tuple, value is (like, t) tuple
        ((uindex, iindex), (r.like, r.t))
      }.filter { case ((u, i), v) =>
        // keep events with valid user and item index
        (u != -1) && (i != -1)
      }.reduceByKey { case (v1, v2) => // MODIFIED
        // A user may like an item and change to dislike it later,
        // or vice versa. Use the latest value for this case.
        val (like1, t1) = v1
        val (like2, t2) = v2
        // keep the latest value
        if (t1 > t2) v1 else v2
      }.map { case ((u, i), (like, t)) => // MODIFIED
        // With ALS.trainImplicit(), we can use a negative value to indicate
        // a negative signal (i.e. dislike)
        val r = if (like) 1 else -1
        // MLlibRating requires integer index for user and item
        MLlibRating(u, i, r)
      }
      .cache()

    // MLlib ALS cannot handle empty training data.
    require(!mllibRatings.take(1).isEmpty,
      s"mllibRatings cannot be empty." +
      " Please check if your events contain valid user and item IDs.")

    // seed for MLlib ALS
    val seed = ap.seed.getOrElse(System.nanoTime)

    val m = ALS.trainImplicit(
      ratings = mllibRatings,
      rank = ap.rank,
      iterations = ap.numIterations,
      lambda = ap.lambda,
      blocks = -1,
      alpha = 1.0,
      seed = seed)

    new ALSModel(
      productFeatures = m.productFeatures.collectAsMap.toMap,
      itemStringIntMap = itemStringIntMap,
      items = items
    )
  }
}
```
Step 4. Modify Serving to combine multiple algorithms' outputs
When the engine is deployed, the Query is sent to all algorithms of the engine. The PredictedResults returned by all algorithms are passed to the Serving component for further processing, as you can see from the `predictedResults` argument of the `serve()` function, which has type `Seq[PredictedResult]`.
In this example, the `serve()` function first standardizes the PredictedResult of each algorithm so that we can combine the scores of multiple algorithms by adding the scores of the same item. Then we take the top N items as defined in the `query`.
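To see why standardization matters, here is a minimal sketch with made-up numbers; it computes the mean and standard deviation by hand rather than with the `meanAndVariance` helper used in the code below. Suppose one algorithm scores items on a roughly 0-to-1 scale while another uses raw counts:

```scala
// Hypothetical raw scores from two algorithms for the same three items
val alsScores  = Seq(("itemA", 0.9), ("itemB", 0.4), ("itemC", 0.1))
val coocScores = Seq(("itemA", 12.0), ("itemB", 30.0), ("itemC", 3.0))

// z-score: (x - mean) / stdDev; return 0 when all scores are equal
def standardize(scores: Seq[(String, Double)]): Seq[(String, Double)] = {
  val values = scores.map(_._2)
  val mean = values.sum / values.size
  val stdDev = math.sqrt(values.map(v => math.pow(v - mean, 2)).sum / values.size)
  scores.map { case (item, s) =>
    (item, if (stdDev == 0) 0.0 else (s - mean) / stdDev)
  }
}

// Without standardization, the second algorithm would dominate the sum
// simply because its magnitudes are larger; after it, both contribute
// on a comparable scale.
val combined = (standardize(alsScores) ++ standardize(coocScores))
  .groupBy(_._1)
  .mapValues(_.map(_._2).sum)
```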
Modify Serving.scala as shown below:
```scala
class Serving
  extends LServing[Query, PredictedResult] {

  override
  def serve(query: Query,
    predictedResults: Seq[PredictedResult]): PredictedResult = { // MODIFIED

    val standard: Seq[Array[ItemScore]] = if (query.num == 1) {
      // if query 1 item, don't standardize
      predictedResults.map(_.itemScores)
    } else {
      // Standardize the scores before combining
      val mvList: Seq[MeanAndVariance] = predictedResults.map { pr =>
        meanAndVariance(pr.itemScores.map(_.score))
      }

      predictedResults.zipWithIndex
        .map { case (pr, i) =>
          pr.itemScores.map { is =>
            // standardize score (z-score)
            // if standard deviation is 0 (when all items have the same score,
            // meaning all items are ranked equally), return 0.
            val score = if (mvList(i).stdDev == 0) {
              0
            } else {
              (is.score - mvList(i).mean) / mvList(i).stdDev
            }

            ItemScore(is.item, score)
          }
        }
    }

    // sum the standardized scores of the same item
    val combined = standard.flatten // Array of ItemScore
      .groupBy(_.item) // groupBy item id
      .mapValues(itemScores => itemScores.map(_.score).reduce(_ + _))
      .toArray // array of (item id, score)
      .sortBy(_._2)(Ordering.Double.reverse)
      .take(query.num)
      .map { case (k, v) => ItemScore(k, v) }

    PredictedResult(combined)
  }
}
```
Step 5. Modify Engine.scala and engine.json
Modify Engine.scala to include the new `LikeAlgorithm` class and give it the name `"likealgo"`, as shown below:
```scala
...
object SimilarProductEngine extends EngineFactory {
  def apply() = {
    new Engine(
      classOf[DataSource],
      classOf[Preparator],
      Map(
        "als" -> classOf[ALSAlgorithm],
        "cooccurrence" -> classOf[CooccurrenceAlgorithm],
        "likealgo" -> classOf[LikeAlgorithm]), // ADDED
      classOf[Serving])
  }
}
...
```
Next, in order to train and deploy the two algorithms for this engine, we also need to modify engine.json to include the new algorithm. The `"algorithms"` parameter is an array of each algorithm's name and parameters. By default, it contains the one algorithm `"als"` and its parameters. Add another JSON object with the new algorithm's name `"likealgo"` and its parameters to the `"algorithms"` array, as shown below:
```json
{
  ...
  "algorithms": [
    {
      "name": "als",
      "params": {
        "rank": 10,
        "numIterations": 20,
        "lambda": 0.01,
        "seed": 3
      }
    },
    {
      "name": "likealgo",
      "params": {
        "rank": 8,
        "numIterations": 15,
        "lambda": 0.01,
        "seed": 3
      }
    }
  ]
}
```
That's it! Now you have an engine configured with two algorithms.
Sample data with "like" and "dislike" events
For demonstration purposes, a sample import script is also provided for you to quickly test this engine. The script is modified from the original one used in the Quick Start, with the addition of importing like and dislike events.
You can find the import script in `data/import_eventserver.py`.
Make sure you are under the App directory. Execute the following to import the data (replace the value of the `--access_key` parameter with your Access Key):
```bash
$ python data/import_eventserver.py --access_key 3mZWDzci2D5YsqAnqNnXH9SB6Rg3dsTBs8iHkK6X2i54IQsIZI1eEeQQyMfs7b3F
```
You are ready to run `pio build`, `pio train` and `pio deploy` as described in the Quick Start.