By default, the similar product template uses implicit preference data, such as "view" events.
To handle explicit preference data, such as a "rating" that users give to items, you can customize the template. A higher "rating" is a stronger indication that the user likes the item.
This example demonstrates how to modify the similar product template to use "rate" events as training data. You can find the complete modified source code here.
Modification
DataSource.scala
In DataSource, rename the ViewEvent case class to RateEvent and add a rating: Double field to it.
Change

```scala
case class ViewEvent(user: String, item: String, t: Long)
```

to

```scala
// MODIFIED
case class RateEvent(user: String, item: String, rating: Double, t: Long)
```
Modify the TrainingData class to use rateEvents:
```scala
class TrainingData(
  val users: RDD[(String, User)],
  val items: RDD[(String, Item)],
  val rateEvents: RDD[RateEvent] // MODIFIED
) extends Serializable {
  override def toString = {
    s"users: [${users.count()} (${users.take(2).toList}...)]" +
    s"items: [${items.count()} (${items.take(2).toList}...)]" +
    // MODIFIED
    s"rateEvents: [${rateEvents.count()}] (${rateEvents.take(2).toList}...)"
  }
}
```
Modify the readTraining() function of DataSource to read "rate" events (changes are marked with "// MODIFIED"). Replace all ViewEvent with RateEvent and all viewEventsRDD with rateEventsRDD. Retrieve the rating value from the event properties:
```scala
override def readTraining(sc: SparkContext): TrainingData = {
  ...

  // get all "user" "rate" "item" events
  val rateEventsRDD: RDD[RateEvent] = PEventStore.find( // MODIFIED
    appName = dsp.appName,
    entityType = Some("user"),
    eventNames = Some(List("rate")), // MODIFIED
    // targetEntityType is an optional field of an event.
    targetEntityType = Some(Some("item")))(sc)
    // PEventStore.find() returns RDD[Event]
    .map { event =>
      val rateEvent = try { // MODIFIED
        event.event match {
          case "rate" => RateEvent( // MODIFIED
            user = event.entityId,
            item = event.targetEntityId.get,
            rating = event.properties.get[Double]("rating"), // ADDED
            t = event.eventTime.getMillis)
          case _ => throw new Exception(s"Unexpected event ${event} is read.")
        }
      } catch {
        case e: Exception => {
          logger.error(s"Cannot convert ${event} to RateEvent." + // MODIFIED
            s" Exception: ${e}.")
          throw e
        }
      }
      rateEvent // MODIFIED
    }.cache()

  new TrainingData(
    users = usersRDD,
    items = itemsRDD,
    rateEvents = rateEventsRDD // MODIFIED
  )
}
```
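The conversion step inside readTraining() can be tried out without Spark or an event store. The following is only a minimal sketch under stated assumptions: RawEvent is a hypothetical stand-in for a stored event (it is not PredictionIO's Event class), its properties are simplified to a Map[String, Double], and failures are returned as a Try rather than logged and rethrown as the template does.

```scala
import scala.util.Try

// Hypothetical stand-in for a stored event; NOT PredictionIO's Event class.
final case class RawEvent(
  event: String,
  entityId: String,
  targetEntityId: Option[String],
  properties: Map[String, Double],
  eventTimeMillis: Long)

final case class RateEvent(user: String, item: String, rating: Double, t: Long)

object RateEventConversion {
  // Convert one raw event into a RateEvent.
  // A missing target item, a missing "rating" property, or an unexpected
  // event name is reported as a Failure instead of being silently dropped.
  def toRateEvent(e: RawEvent): Try[RateEvent] = Try {
    e.event match {
      case "rate" =>
        RateEvent(
          user = e.entityId,
          item = e.targetEntityId.get,     // throws if there is no target item
          rating = e.properties("rating"), // throws if "rating" is missing
          t = e.eventTimeMillis)
      case other =>
        throw new IllegalArgumentException(s"Unexpected event $other is read.")
    }
  }
}
```

A well-formed "rate" event converts successfully, while an event without a "rating" property or with a different event name yields a Failure that the caller can inspect or rethrow.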
Preparator.scala
Modify Preparator to pass rateEvents to the algorithm as PreparedData. Replace all ViewEvent with RateEvent and all viewEvents with rateEvents.
Modify Preparator's prepare() method:
```scala
...

def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
  new PreparedData(
    users = trainingData.users,
    items = trainingData.items,
    rateEvents = trainingData.rateEvents) // MODIFIED
}
```
Modify the PreparedData class:
```scala
class PreparedData(
  val users: RDD[(String, User)],
  val items: RDD[(String, Item)],
  val rateEvents: RDD[RateEvent] // MODIFIED
) extends Serializable
```
ALSAlgorithm.scala
Modify the train() method to train with rate events:
```scala
def train(sc: SparkContext, data: PreparedData): ECommModel = {
  require(!data.rateEvents.take(1).isEmpty, // MODIFIED
    s"rateEvents in PreparedData cannot be empty." + // MODIFIED
    " Please check if DataSource generates TrainingData" +
    " and Preparator generates PreparedData correctly.")

  ...

  val mllibRatings = data.rateEvents // MODIFIED
    .map { r =>
      ...
      ((uindex, iindex), (r.rating, r.t)) // MODIFIED
    }.filter { case ((u, i), v) =>
      // keep events with valid user and item index
      (u != -1) && (i != -1)
    }
    .reduceByKey { case (v1, v2) => // MODIFIED
      // A user may rate the same item with different values at different times;
      // use the latest value in that case.
      // This reduceByKey() can be removed if there is no need to support this case.
      val (rating1, t1) = v1
      val (rating2, t2) = v2
      // keep the latest value
      if (t1 > t2) v1 else v2
    }
    .map { case ((u, i), (rating, t)) => // MODIFIED
      // MLlibRating requires integer index for user and item
      MLlibRating(u, i, rating) // MODIFIED
    }
    .cache()

  ...
}
```
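The "keep the latest rating" rule used in reduceByKey() can be checked on plain Scala collections. This is only an illustrative sketch, not part of the template: it keys ratings by the (user, item) string pair instead of the integer indexes used in train(), and LatestRating is a hypothetical helper name.

```scala
final case class RateEvent(user: String, item: String, rating: Double, t: Long)

// Hypothetical helper, for illustration only.
object LatestRating {
  // Keep one rating per (user, item) pair: the one with the largest timestamp.
  def latest(events: Seq[RateEvent]): Map[(String, String), Double] =
    events
      .map(r => ((r.user, r.item), (r.rating, r.t)))
      .groupBy { case (key, _) => key }
      .map { case (key, pairs) =>
        val (rating, _) = pairs
          .map { case (_, v) => v }
          // same comparison as in the template's reduceByKey()
          .reduce { (v1, v2) => if (v1._2 > v2._2) v1 else v2 }
        key -> rating
      }
}
```

For example, if the same user rates the same item 2.0 at t = 100 and 5.0 at t = 200, only the rating 5.0 is kept for that pair.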
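Modify the train() method to call ALS.train() instead of ALS.trainImplicit(), since the ratings are now explicit. The alpha parameter applies only to implicit feedback, so it is removed.
Change the following from: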
```scala
...

val m = ALS.trainImplicit(
  ratings = mllibRatings,
  rank = ap.rank,
  iterations = ap.numIterations,
  lambda = ap.lambda,
  blocks = -1,
  alpha = 1.0,
  seed = seed)

...
```

to:

```scala
...

val m = ALS.train( // MODIFIED
  ratings = mllibRatings,
  rank = ap.rank,
  iterations = ap.numIterations,
  lambda = ap.lambda,
  blocks = -1,
  seed = seed)

...
```
That's it! Your engine can now train a model with rate events.