This project has retired. For details please refer to its Attic page.

In some cases, if you don't need to track user IDs being created or deleted, or user property changes, through events, you can simplify the template as described in this example and drop the '$set' events for users.

You can find the complete modified source code here.

Modification

DataSource.scala

Modify the TrainingData class to remove the RDD of users.

class TrainingData(
  val items: RDD[(String, Item)],
  val viewEvents: RDD[ViewEvent]
) extends Serializable {
  override def toString = {
    s"items: [${items.count()} (${items.take(2).toList}...)]" +
    s"viewEvents: [${viewEvents.count()}] (${viewEvents.take(2).toList}...)"
  }
}

Modify the readTraining() function of DataSource to remove the RDD of (entityID, User).

  override
  def readTraining(sc: SparkContext): TrainingData = {

    // REMOVED: delete the block below, which
    // creates an RDD of (entityID, User)
    val usersRDD: RDD[(String, User)] = PEventStore.aggregateProperties(
      appName = dsp.appName,
      entityType = "user"
    )(sc).map { case (entityId, properties) =>
      val user = try {
        User()
      } catch {
        case e: Exception => {
          logger.error(s"Failed to get properties ${properties} of" +
            s" user ${entityId}. Exception: ${e}.")
          throw e
        }
      }
      (entityId, user)
    }.cache()

    ...

    new TrainingData(
      items = itemsRDD,
      viewEvents = viewEventsRDD
    )
  }

Preparator.scala

Modify Preparator to remove the RDD of users.

Modify Preparator's prepare() method:

  ...

  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
    new PreparedData(
      items = trainingData.items,
      viewEvents = trainingData.viewEvents)
  }

Modify the PreparedData class:

class PreparedData(
  val items: RDD[(String, Item)],
  val viewEvents: RDD[ViewEvent]
) extends Serializable

ALSAlgorithm.scala

Modify the train() method:

  • remove the check of users in PreparedData
  • modify user index BiMap to extract the user ID from the viewEvents
  def train(sc: SparkContext, data: PreparedData): ECommModel = {
    ...
    // REMOVED: delete this check; PreparedData no longer has users
    require(!data.users.take(1).isEmpty,
      s"users in PreparedData cannot be empty." +
      " Please check if DataSource generates TrainingData" +
      " and Preprator generates PreparedData correctly.")
    ...
    // create User and item's String ID to integer index BiMap
    val userStringIntMap = BiMap.stringInt(data.viewEvents.map(_.user)) // MODIFIED
    val itemStringIntMap = BiMap.stringInt(data.items.keys)

    ...

  }
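
To illustrate the MODIFIED line, here is a minimal sketch of deriving the user index from view events. It uses plain Scala collections in place of RDDs so it runs without Spark. The `userIndex` helper is hypothetical: it only mimics the role of `BiMap.stringInt` and is not PredictionIO's implementation. The `ViewEvent` fields are assumed for illustration.

```scala
// Assumed shape of a view event; only the user field matters here.
case class ViewEvent(user: String, item: String, t: Long)

object UserIndexSketch {
  // Hypothetical stand-in for BiMap.stringInt:
  // give each distinct user ID a dense integer index.
  def userIndex(viewEvents: Seq[ViewEvent]): Map[String, Int] =
    viewEvents.map(_.user).distinct.zipWithIndex.toMap

  def main(args: Array[String]): Unit = {
    val events = Seq(
      ViewEvent("u1", "i1", 1L),
      ViewEvent("u2", "i1", 2L),
      ViewEvent("u1", "i2", 3L)
    )
    val index = userIndex(events)
    println(index.size)      // prints 2: two distinct users viewed items
    println(index.get("u3")) // prints None: u3 has no view events
  }
}
```

One consequence of this design is that a user only becomes known to the index after at least one view event has been recorded for them.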

You are now ready to run pio build, train and deploy as described in the Quick Start. Then send the same queries as in the Quick Start; the results will be the same.
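
One way to see why the results can stay the same is sketched below with plain Scala collections. It assumes that the unmodified template built its user index from the '$set' users and ignored view events from users outside that index; that behavior is an assumption about code not shown on this page. The `usablePairs` helper and the sample IDs are hypothetical.

```scala
object SameResultSketch {
  // Keep only the (user, item) pairs whose user is present in the index.
  def usablePairs(
      views: Seq[(String, String)],
      users: Set[String]): Set[(String, String)] =
    views.filter { case (u, _) => users.contains(u) }.toSet

  def main(args: Array[String]): Unit = {
    // Users created with $set; u3 never viewed anything.
    val setUsers = Set("u1", "u2", "u3")
    val views = Seq(("u1", "i1"), ("u2", "i1"), ("u1", "i2"))
    // Users derived from the view events themselves.
    val viewUsers = views.map(_._1).toSet

    val before = usablePairs(views, setUsers)
    val after = usablePairs(views, viewUsers)
    println(before == after) // prints true
  }
}
```

Under that assumption, the two approaches use the same training pairs whenever every viewing user had also been created with '$set', as is the case when the same data is imported.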

That's it! Your engine no longer needs '$set' events for users.