Spark Streaming + Twitter: Analytics to one’s taste

Some days you feel like a handyman, and some days you don’t. Today is one of the former.
In this post we’ll create a small application with Spark Streaming that allows us to perform analytics over Twitter in a very basic way. Filtering is just the beginning: the limit is your imagination 🙂


Step 1: Create your OAuth token at Twitter

Before we start coding like crazy, we have to get an OAuth token in order to use Twitter’s API. It’s really easy: just follow the steps described in Twitter’s official documentation.

Once created, log in at apps.twitter.com and click on the application you have just created: inside the Keys and Access Tokens tab you’ll find all the data our application requires (we’ll sketch the matching configuration right after this list):

  • Consumer Key (API Key)
  • Consumer Secret (API Secret)
  • Access Token
  • Access Token Secret
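
These values end up in the project’s configuration file. As a minimal sketch, src/main/resources/app.conf might look like the following (the key names here are assumptions for illustration; check the actual file in the repo):

# Hedged sketch of app.conf (HOCON); the real key names may differ.
app-name = "twitter-stream"
spark-master = "local[*]"
window-seconds = 20

twitter {
  consumer-key        = "secret"
  consumer-secret     = "secret"
  access-token        = "secret"
  access-token-secret = "secret"
}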

Step 2: Defining our DSL

For that purpose, you can fork or just clone the twitter-stream project from Scalera’s Github account. In this section, we’ll quickly go over all the components that make up the project.

The core: Source

The core of this PoC is deploying a Spark Streaming application that reads from some source (in this case Twitter). It’s worth mentioning that Spark Streaming is NRT (Near Real Time): given a window size of N seconds, every N seconds a new batch is created with all the elements that arrived from the source during that time window.

In Spark, that data stream is called a DStream, while each of those batches is an RDD. For those not familiar with Spark, an RDD (Resilient Distributed Dataset) represents a collection that is distributed across the Spark cluster.

If we take a look at the Source trait (later on we’ll see what functionality Actions and Filters provide):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

trait Source extends Actions with Filters {
  _: util.Config =>

  type Content = twitter4j.Status

  // Spark configuration, built from values in the configuration file.
  val conf = new SparkConf()
    .setAppName(config.getString(Config.AppName))
    .setMaster(config.getString(Config.SparkMaster))

  // Batch window size: a new RDD is emitted every 'windowTime' seconds.
  val windowTime = Seconds(config.getInt(Config.WindowSeconds))

  val ssc = new StreamingContext(conf, windowTime)

  // Lazily created Twitter stream, filtered by the keywords in 'filters'.
  lazy val stream = TwitterUtils.createStream(ssc, None, filters)

}

As you can see, we define some Spark configuration (SparkConf) from the parameters defined in the configuration file.

We also define the time window size and instantiate a StreamingContext that will hold the stream we declare next. TwitterUtils (imported above) provides the way to instantiate the stream, which requires the streaming context and the keywords we want to filter by.

It’s good to know that creating the stream doesn’t make it start listening for new items: it is evaluated lazily. The reason is that all the actions we want to apply to each new batch must be defined before the stream actually starts.
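
To illustrate the general Spark Streaming pattern (a generic sketch, not code from the project): output operations are declared first, and nothing runs until the context is started.

// Generic Spark Streaming pattern: declare outputs, then start.
// The project wraps this sequence in listen(), shown below.
stream.foreachRDD(rdd => println(s"batch size: ${rdd.count()}"))
ssc.start()            // only now does the stream begin receiving data
ssc.awaitTermination() // block while the streaming job runs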

Defining filters

No gimmicks: just a private synchronized variable that allows adding filters as a sequence of Strings. As we saw in Source, these filters are passed as a parameter when the stream is created.

trait Filters {

  // Keyword filters, guarded by 'synchronized' for thread-safe access.
  private var _filters: Seq[String] = Seq()

  def filters = synchronized(_filters)

  // Appends new keywords to the current filter list.
  def filter(filters: String*): Unit =
    synchronized(_filters = _filters ++ filters.toSeq)

}
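
For example, successive calls simply accumulate keywords:

filter("scala", "spark")
filter("streaming")
// filters == Seq("scala", "spark", "streaming")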

How to define stream’s behavior: Actions

The Actions trait is in charge of adding actions/handlers/callbacks to execute each time a new batch is received. These actions are stored in the actions variable. To add a new action (Action is just a type alias for a function from a batch, RDD[Content], to Unit), the when method is invoked.

Once we have defined all the actions to perform, we start receiving items in the stream by invoking the listen method. It registers every function stored in actions to be applied to each batch of the stream, and then starts listening.

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream

trait Actions {
  _: util.Config =>

  type Content

  // An action is a callback applied to each new batch (RDD) of the stream.
  type Action = RDD[Content] => Unit

  private var actions: List[Action] = List()

  val ssc: StreamingContext

  val stream: ReceiverInputDStream[Content]

  // Registers every accumulated action on the stream and starts the context.
  def listen(): Unit = {
    actions.foreach(f =>
      stream.foreachRDD(rdd => f(rdd)))
    ssc.start()
  }

  // Appends a new action to be applied to every incoming batch.
  def when(action: Action): Unit = {
    actions = actions :+ action
  }

}

…the rest of it: Credentials and Analytics

Credentials has the purpose of reading all the parameters related to the security token from the configuration file and writing them as JVM properties.
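
For reference, here is a minimal sketch of what such a trait could look like, assuming the config keys from the earlier app.conf sketch (twitter4j reads its OAuth credentials from exactly these system properties):

trait Credentials {
  _: util.Config =>

  // Hypothetical config keys; the real names live in the repo's app.conf.
  // twitter4j picks up its OAuth credentials from these JVM properties.
  System.setProperty("twitter4j.oauth.consumerKey",
    config.getString("twitter.consumer-key"))
  System.setProperty("twitter4j.oauth.consumerSecret",
    config.getString("twitter.consumer-secret"))
  System.setProperty("twitter4j.oauth.accessToken",
    config.getString("twitter.access-token"))
  System.setProperty("twitter4j.oauth.accessTokenSecret",
    config.getString("twitter.access-token-secret"))

}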

The Analytics trait mixes in all the previously defined components, so using it is as easy as

object Boot extends Analytics
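
A plausible shape for Analytics (a hypothetical sketch; Source already mixes in Actions and Filters, so the repo's exact definition may differ):

trait Analytics extends Source with Credentials with util.Config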

Example

First, we modify the configuration file to use the token we have just created: replace the word ‘secret’ in src/main/resources/app.conf with the real values.

Once this is done, we add all the filters we want (to deal only with tweets that contain certain keywords):

  filter(
    "dance",
    "music"
  )

We then indicate, with one (or several) when clauses, the actions to perform when a new batch of tweets arrives. For example, we count them and print them out:

when { tweets =>
  logger.info(s"Received tweets [${tweets.count()}]")
  tweets.foreach {
    tweet => logger.info(s"\n$tweet")
  }
}
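
As a slightly richer sketch of what these actions can do (illustrative, not part of the project), we could count hashtag occurrences per batch using ordinary RDD operations:

when { tweets =>
  tweets
    .flatMap(_.getText.split("\\s+")) // tokenize each tweet's text
    .filter(_.startsWith("#"))        // keep only hashtags
    .map(tag => (tag.toLowerCase, 1))
    .reduceByKey(_ + _)               // count occurrences per hashtag
    .collect()
    .foreach { case (tag, count) => logger.info(s"$tag -> $count") }
}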

To make our example work, you only have to execute (from the project directory):

sbt run

And tweets should start appearing among your messy log output!


Easy peasy 🙂
