Spark Streaming + Twitter : Analytics al gusto

Hay días en que te sientes manitas y días en los que no. Hoy es de los primeros.
En este post vamos a crear una pequeña aplicación con Spark Streaming que nos permita realizar analytics de manera muy básica sobre Twitter. Filtrar es solo el principio: el límite lo que marque tu imaginación 🙂

giphy (1)

Paso 1: Crear token de OAuth en Twitter

Antes de ponernos a codificar como locos, debemos hacernos con un token de OAuth para poder usar la API de Twitter. Es realmente sencillo y podeis seguir los pasos tal y como se describe en la documentación oficial de Twitter.

Una vez creado, si os logais en apps.twitter.com y pulsáis sobre la aplicación que habéis creado, en la pestaña Keys and Access Tokens podréis obtener los datos que nos van a hacer falta en nuestra aplicación:

  • Consumer Key (API Key)
  • Consumer Secret (API Secret)
  • Access Token
  • Access Token Secret

Paso 2: Definiendo nuestro DSL

Para ello haced fork o simplemente descargaros el proyecto twitter-stream que se encuentra en el Github de Scalera. En esta sección daremos un repaso rápido a los componentes que forman el proyecto.

El core: Source

El core de esta prueba de concepto es desplegar una aplicación Spark Streaming que lee de una determinada fuente (en este caso de Twitter). Cabe destacar que el streaming de Spark es NRT (Near Real Time) ya que, en base a un tamaño de ventana de tiempo N, se creará un batch cada N segundos con todos los elementos que hayan entrado en dicha ventana.

Al stream de datos, en Spark, se le denomina DStream, mientras que cada uno de esos batches se llama RDD, que para la gente que no esté familiarizada con Spark core, representan una colección distribuida en el cluster de Spark (Resilient Distributed Dataset).

Si echamos un vistazo al trait Source (Más adelante veremos qué funcionalidad aportan Actions y Filters):

trait Source extends Actions with Filters {
  _: util.Config =>

  type Content = twitter4j.Status

  val conf = new SparkConf()
    .setAppName(config.getString(Config.AppName))
    .setMaster(config.getString(Config.SparkMaster))

  val windowTime = Seconds(config.getInt(Config.WindowSeconds))

  val ssc = new StreamingContext(conf, windowTime)

  lazy val stream = TwitterUtils.createStream(ssc, None, filters)

}

vemos que definimos una configuración de Spark (SparkConf) a partir de los parámetros definidos en el fichero de configuración.

También definimos el tamaño de ventana e instanciamos un StreamingContext sobre el que declarar nuestro stream stream. La forma de instanciarlo la aporta TwitterUtils (que importamos), y necesita el Streaming context y las palabras claves sobre las que filtraremos los tweets.

Es importante mencionar que, no por crear el stream, este se pone a escuchar, sino que se evaluará de manera perezosa. Esto es así para poder definir las acciones que queremos aplicar con cada nuevo batch en nuestro stream.

Definiendo filtros

Sin trampa ni cartón: solo una variable privada con un synchronized que permite añadir filtros como Strings a una secuencia. A la hora de crear el stream, como hemos visto en Source se pasan como argumento dichos filtros.

trait Filters {

  private var _filters: Seq[String] = Seq()

  def filters = synchronized(_filters)

  def filter(filters: String*): Unit =
    synchronized(_filters = _filters ++ filters.toSeq)

}

Cómo definir el comportamiento del stream: Actions

El trait Actions se encarga de añadir acciones / handlers/ callbacks a ejecutar cada vez que se reciba un nuevo batch. Estas acciones se guardan en actions. Para añadir una nueva acción (Action no es más que un type alias para una función de batch – RDD[Content] – a Unit), se invoca al método when.

Una vez que hayamos definido todas las acciones a realizar, comenzaremos a recibir contenido en el stream invocando a listen. Este aplica las funciones definidas en actions sobre el stream y luego comienza a escuchar del mismo.

trait Actions {
  _: util.Config =>

  type Content

  type Action = RDD[Content] => Unit

  private var actions: List[Action] = List()

  val ssc: StreamingContext

  val stream: ReceiverInputDStream[Content]

  def listen(): Unit = {
    actions.foreach(f => 
      stream.foreachRDD(rdd => f(rdd)))
    ssc.start()
  }

  def when(action: Action): Unit = {
    actions = actions :+ action
  }

}

…y el resto: Credentials y Analytics

Credentials se encarga de leer del fichero de configuración los parámetros relativos al token de seguridad y escribe dichas propiedades como propiedades en la JVM.

El trait Analytics, extiende de todos los componentes antes definidos, de manera que para usarlo sea tan sencillo como hacer

object Boot extends Analytics

Ejemplo

En primer lugar vamos a modificar el fichero de configuración para que utilice el token que acabamos de generar: sustituimos el comodín secret en src/main/resources/app.conf por los valores reales.

Una vez hecho esto, añadimos los filtros que queramos (para trabajar solo con los tweets que contengan ciertas palabras clave):

  filter(
    "dance",
    "music"
  )

Y posteriormente indicamos con una (o varias) sentencias when la acción a realizar cuando llegue un nuevo grupo de tweets. Por ejemplo, los contamos e imprimimos por pantalla:

when { tweets =>
  logger.info(s"Received tweets [${tweets.count()}}]")
  tweets.foreach {
    tweet => logger.info(s"\n$tweet")
  }
}

Para hacer funcionar nuestro ejemplo bastará con ejecutar (sobre el directorio del proyecto):

sbt run

¡Y los tweets deberían ir apareciendo en tu abultada salida de log!

idwzlyrarjaylstrg8q7

Easy peasy 🙂

Anuncios

One thought on “Spark Streaming + Twitter : Analytics al gusto

Responder

Introduce tus datos o haz clic en un icono para iniciar sesión:

Logo de WordPress.com

Estás comentando usando tu cuenta de WordPress.com. Cerrar sesión / Cambiar )

Imagen de Twitter

Estás comentando usando tu cuenta de Twitter. Cerrar sesión / Cambiar )

Foto de Facebook

Estás comentando usando tu cuenta de Facebook. Cerrar sesión / Cambiar )

Google+ photo

Estás comentando usando tu cuenta de Google+. Cerrar sesión / Cambiar )

Conectando a %s