Spark Streaming: stateful streams (Android vs iOS?)

If you remember the post where we talked about how to connect Spark Streaming and Twitter, we said that the only limit for performing analytics was your imagination.

In this post we’re going to propose an example that illustrates the idea of keeping a state that is updated by the stream feed. Put plainly, we’ll see how popular one Twitter topic is compared to another: Android vs iOS.


Stateful

The main idea is to have an entity S whose state is fed by the V-typed elements that are received and processed in each batch of the stream.


The state at a given instant Tn is computed by applying a user-defined function that takes as parameters both the state at instant Tn-1 and the set of values provided by the batch received at instant Tn:

$$S_{T_n} = \mathrm{updateState}\left(S_{T_{n-1}},\ V_{T_n}\right)$$

A more familiar notation for a regular Scala developer would be:

type StateUpdate[S, V] =
  (Option[S], Seq[V]) => Option[S]

Why Option[S]? For a simple reason: when we first start listening to the stream, we don’t have any S state yet, so a (S, Seq[V]) => S function wouldn’t make sense.
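To make the signature concrete, here is a minimal, Spark-free sketch: a running counter written as a StateUpdate and replayed over a few in-memory batches. The batches and the `runningCount` name are hypothetical; the point is that the very first call receives None, which is exactly why the state is wrapped in Option.

```scala
object StateUpdateSketch {
  // Same shape as the notation above: (previous state, new values) => new state
  type StateUpdate[S, V] = (Option[S], Seq[V]) => Option[S]

  // Hypothetical update function: the state is the number of values seen so far
  val runningCount: StateUpdate[Long, String] =
    (previous, newValues) => Some(previous.getOrElse(0L) + newValues.size)

  // Hypothetical batches received at T1, T2 and T3
  val batches: Seq[Seq[String]] = Seq(
    Seq("a", "b"),
    Seq.empty,
    Seq("c")
  )

  // Start with no state (None) and apply the update once per batch,
  // keeping every intermediate state
  val states: Seq[Option[Long]] =
    batches.scanLeft(Option.empty[Long])((state, batch) => runningCount(state, batch))

  def main(args: Array[String]): Unit =
    println(states.mkString(", ")) // None, Some(2), Some(2), Some(3)
}
```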

And in practice?

Spark’s API for pair DStreams (PairDStreamFunctions) provides the following method for doing so:

def updateStateByKey[S]
  (updateFunc: (Seq[V], Option[S]) ⇒ Option[S])
  (implicit arg0: ClassTag[S]): DStream[(K, S)]

So, for a DStream of key-value pairs (that is, one whose elements have been classified by a function that provides a key; we’ll see an example later on), we can apply this method and get an S state for each key. In most cases, this state will be some aggregation over the values.

The method is overloaded, so you can specify a partitioner other than the default HashPartitioner, set a custom number of partitions, or provide an initial state as an RDD[(K, S)]. Remember that, otherwise, the initial state for every key K will be None.
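As a rough mental model of what updateStateByKey does on each batch, here is a hypothetical, Spark-free sketch over plain Scala maps (the `updateStateByKeyLocal` helper is ours, not part of Spark, and it ignores partitioning, checkpointing and distribution). As in Spark, the update function is called for every key that either already has state or appears in the batch (with an empty Seq if the key received nothing new), an optional initial map plays the role of the initial RDD[(K, S)], and returning None drops the key from the state.

```scala
object UpdateStateByKeySketch {
  // Hypothetical local stand-in for one updateStateByKey step:
  // previous state per key + one batch of (key, value) pairs => new state per key
  def updateStateByKeyLocal[K, V, S](
      previous: Map[K, S],
      batch: Seq[(K, V)],
      updateFunc: (Seq[V], Option[S]) => Option[S]): Map[K, S] = {
    // Group the batch values by key
    val newValues: Map[K, Seq[V]] =
      batch.groupBy(_._1).map { case (k, pairs) => k -> pairs.map(_._2) }
    // Every key with previous state or new values gets updated
    val keys: Set[K] = previous.keySet ++ newValues.keySet
    keys.flatMap { k =>
      // A None result removes the key from the state
      updateFunc(newValues.getOrElse(k, Seq.empty[V]), previous.get(k)).map(k -> _)
    }.toMap
  }

  // Same argument order as Spark's updateFunc: (Seq[V], Option[S])
  val count: (Seq[String], Option[Long]) => Option[Long] =
    (values, state) => Some(state.getOrElse(0L) + values.size)

  // Hypothetical initial state, playing the role of the initial RDD[(K, S)]
  val initial: Map[String, Long] = Map("android" -> 10L)

  // Hypothetical batches of (key, tweet text) pairs
  val batches: Seq[Seq[(String, String)]] = Seq(
    Seq("android" -> "tweet 1", "ios" -> "tweet 2"),
    Seq("ios" -> "tweet 3")
  )

  // Replay the batches one after another
  val finalState: Map[String, Long] =
    batches.foldLeft(initial)((state, batch) => updateStateByKeyLocal(state, batch, count))

  def main(args: Array[String]): Unit =
    finalState.toSeq.sorted.foreach { case (k, v) => println(s"$k : $v") }
}
```

Note that "android" keeps its count in the second batch even though no new tweets arrived for it: the update function was still called, with an empty Seq.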

The example

Let’s suppose we want to measure how strong the rivalry between Android and iOS is, and we want to know which of the two is the top trending topic on Twitter (in this example we won’t distinguish between mentions for and against).

Using the same project we proposed for the previous Spark + Twitter example, we’ll change the Boot.scala file so it looks more like the following gist contents.

First of all, we have to set the checkpoint directory (stateful transformations such as updateStateByKey require checkpointing to be enabled) and the tweet filters we’re interested in:

  //  Set checkpoint dir

  ssc.checkpoint("/tmp")

  //  Add filters ...

  val Android = "android"
  val IOS = "ios"

  filter(
    Android,
    IOS
  )

Next, we’ll group our tweets depending on whether they contain the ‘android’ or ‘ios’ filter keyword (if a tweet contains both, it will be counted on both sides). The result is another DStream, but one that contains key-value pairs (filter, tweet):

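val groupedTweets = stream.flatMap { content =>
  // Twitter matches filters case-insensitively, so lower-case the text
  // before looking for "android" / "ios"
  val text = content.getText.toLowerCase
  List(Android, IOS).flatMap(key =>
    if (text.contains(key))
      Option(key -> content)
    else None)
}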
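The keying step can be tried out without Spark or Twitter. In this hypothetical sketch, plain strings stand in for the tweet objects (the post works on `content.getText`), and the `keyTweet` helper and sample texts are ours. The text is lower-cased before matching because Twitter’s keyword filter is case-insensitive. Keep in mind that plain substring matching also counts words such as "studios" as iOS mentions, which is fine for a rough comparison but worth knowing.

```scala
object GroupingSketch {
  val Android = "android"
  val IOS = "ios"

  // Hypothetical helper: the (key, tweet) pairs a single tweet text produces.
  // A tweet mentioning both keywords yields one pair per keyword.
  def keyTweet(text: String): List[(String, String)] = {
    val lowered = text.toLowerCase
    List(Android, IOS).filter(lowered.contains(_)).map(key => key -> text)
  }

  // Hypothetical tweet texts standing in for one batch
  val batch: List[String] = List(
    "New Android release today",
    "iOS or Android? Tough call",
    "Nothing to see here"
  )

  // flatMap plays the role of stream.flatMap in the post
  val grouped: List[(String, String)] = batch.flatMap(keyTweet)

  def main(args: Array[String]): Unit =
    grouped.foreach { case (key, text) => println(s"$key -> $text") }
}
```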

Once we have grouped the tweets, we create a new DStream from the previous one with updateStateByKey, the method we described at the beginning of this post. The S state we were talking about will be the number of tweets for each keyword:

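val aggregatedTweets: DStream[(String, Long)] =
  // Explicit types: updateStateByKey is overloaded, so Scala can't infer
  // the lambda parameter types on its own. Only the count matters here,
  // so the tweet type is left as Seq[_]
  groupedTweets.updateStateByKey[Long] {
    (newTweets: Seq[_], previousState: Option[Long]) =>
      val newTweetsAmount = newTweets.size.toLong
      previousState.fold(
        Some(newTweetsAmount))(previousSize =>
        Some(previousSize + newTweetsAmount))
  }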

The only ‘tricky’ part could be the fold call, but it’s simple: if there is a previous amount (previous state), we just add the new amount to it; otherwise, we use the new amount as the initial state.
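Here is how that fold behaves on its own, as a quick Spark-free check. `nextState` is a hypothetical name that simply wraps the same expression used inside updateStateByKey; `nextStateAlt` shows an equivalent getOrElse formulation that some readers may find easier to scan.

```scala
object FoldSketch {
  // Same expression as inside updateStateByKey, pulled out for testing
  def nextState(previousState: Option[Long], newTweetsAmount: Long): Option[Long] =
    previousState.fold(
      Some(newTweetsAmount))(previousSize =>
      Some(previousSize + newTweetsAmount))

  // Equivalent formulation: treat "no previous state" as zero
  def nextStateAlt(previousState: Option[Long], newTweetsAmount: Long): Option[Long] =
    Some(previousState.getOrElse(0L) + newTweetsAmount)

  def main(args: Array[String]): Unit = {
    println(nextState(None, 3L))     // Some(3): no previous state, use the new amount
    println(nextState(Some(5L), 3L)) // Some(8): add the new amount to the previous one
  }
}
```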

Finally, we register an output action that prints these figures (without at least one output operation, Spark Streaming has nothing to execute) and we start listening to the stream:

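//  And add actions to perform (like printing the aggregatedTweets) ...

aggregatedTweets.foreachRDD { results =>
  // Only two keys, so collecting to the driver is cheap; this way the log
  // lines go to the driver log instead of being written on the executors
  results.collect().foreach {
    case (team, amount) =>
      logger.info(s">>> $team : $amount")
  }
}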

// ... and begin listening

listen()

Can you think of another way to make the stream more interesting? Playing with tweet geolocation on a heat map, for example? 😉

Easy peasy!
