Spark Streaming: streams con estado (¿Android vs IOS?)

Si os acordais del post en el que hablabamos sobre como conectar Spark Streaming con Twitter, dijimos que el límite para hacer analíticas lo ponía vuestra imaginación.

En este post vamos a proponer algún ejemplo que nos sirva a su vez para ilustrar la idea de mantener un estado en base a la alimentación del stream que definimos. En cristiano, vamos a ver cómo de popular es un topic de twitter frente a otro: Android vs IOS.

appleandroid

Stateful

La idea básica es tener una entidad S cuyo estado se alimenta en base a los batches con elementos de tipo V que procesa el stream.

stateful stream

La manera en que dicho estado se actualiza para un instante discreto Tn es aplicando una función definida por el usuario que toma como argumentos el estado que había en el instante Tn-1 y el conjunto de valores que trae el batch recibido por el stream en Tn:

updateState function (2)

Que en una notación más familiar para el scalero de a pie sería

type StateUpdate[S] = 
  (Option[S],Seq[V]) => Option[S]

¿Por qué Option[S]? Por una razón principal: inicialmente, cuando arrancamos el stream, no disponemos de ningún estado S, de manera que la función no podía ser (S,Seq[V]) => S.

¿Y todo esto en la práctica?

La API de Spark para funciones de DStreams sobre pares (PairDStreamFunctions) provee el siguiente método para ello:

def updateStateByKey[S]
  (updateFunc: (Seq[V], Option[S]) ⇒ Option[S])
  (implicit arg0: ClassTag[S]): DStream[(K, S)]

De manera que para un DStream en el que podamos discriminar los elementos por una función que nos provea una clave (veremos un ejemplo más adelante), podremos aplicar este método, obteniendo un estado S (en la mayoría de los casos se podrá corresponder con una agregación) por cada clave.

El método se encuentra sobrecargado, de manera que se puede especificar un particionador disinto del HashPartitioner, indicar un número de particiones particular o bien pasar un estado inicial RDD[(K,S)] (recordad que, de otra manera, inicialmente nuestro estado para todas las claves K tiene que ser None).

El ejemplo

Supongamos que tenemos intención de medir la rivalidad entre Android e IOS y queremos saber de quién se está hablando más en Twitter (en este ejemplo no distinguiremos entre si las menciones son positivas o negativas).

Utilizando el mismo proyecto que propusimos para el anterior ejemplo de Spark + Twitter, modificaremos el fichero Boot.scala para que tenga el contenido del siguiente gist.

En primer lugar, tenemos que habilitar el directorio de checkpointing y los filtros de los tweets que nos interesan:

  //  Set checkpoint dir

  ssc.checkpoint("/tmp")

  //  Add filters ...

  val Android = "android"
  val IOS = "ios"

  filter(
    Android,
    IOS
  )

A continuación agruparemos nuestros tweets en base a si estos contienen el filtro Android o IOS (en caso de que el tweet contenga ambos, se contabilizará en ambas claves). El resultado obtenido es otro DStream pero que contiene pares de clave-valor (filtro-tweet):

val groupedTweets = stream.flatMap{content =>
  List(Android, IOS).flatMap(key =>
    if (content.getText.contains(key)) 
      Option(key -> content)
    else None)
  }

Una vez agrupados los tweets, creamos un nuevo DStream a partir de este, usando la función que definimos al principio del post updateStateByKey, de manera que el estado S del que hablábamos sería la suma de tweets para cada palabra clave:

val aggregatedTweets: DStream[(String,Long)] =
  groupedTweets.updateStateByKey{
    (newTweets, previousState) =>
      val newTweetsAmount = newTweets.size.toLong
      previousState.fold(
        Some(newTweetsAmount))(previousSize =>
        Some(previousSize + newTweetsAmount))
  }

La única parte ‘complicada’ de entender del fragmento de código podría ser el fold, pero es sencillo ya que en realidad indica que, en caso de tener una cantidad anterior (estado previo) sumamos la nueva cantidad a la anterior. En caso contrario, dejamos la nueva.

Aparte de esto, terminamos de hacer funcionar el snippet imprimiendo estas cantidades por pantalla y comenzando la escucha del stream:

//  And add actions to perform (like printing the aggregatedTweets) ...

aggregatedTweets.foreachRDD{ results =>
  results.foreach{
    case (team, amount) => 
      logger.info(s">>> $team : $amount")
  }
}

// ... and begin listening

listen()

¿Se te ocurre otra forma de hacer el stream más interesante? ¿jugar con la geolocalización de los tweets en un mapa de calor, por ejemplo? 😉

¡Agur de limón!

Anuncios

One thought on “Spark Streaming: streams con estado (¿Android vs IOS?)

Responder

Introduce tus datos o haz clic en un icono para iniciar sesión:

Logo de WordPress.com

Estás comentando usando tu cuenta de WordPress.com. Cerrar sesión / Cambiar )

Imagen de Twitter

Estás comentando usando tu cuenta de Twitter. Cerrar sesión / Cambiar )

Foto de Facebook

Estás comentando usando tu cuenta de Facebook. Cerrar sesión / Cambiar )

Google+ photo

Estás comentando usando tu cuenta de Google+. Cerrar sesión / Cambiar )

Conectando a %s