Scalera tip: Keep your actor’s state with no VAR at all!

It’s pretty well known that using VARs is, apart from unethical, evil itself: some kind of hell that makes kitties die, plus many other things you have probably heard before and that could eventually cause a slow and painful death.

The essence of functional programming is therefore immutability: every time I "mutate" an element, I actually generate a new one.

What about Akka actors?

When we talk about actors, we can define them as stateful computation units that sequentially process a message queue, reacting (or not) to each of these messages.

It’s always been said that, in order to keep state within an actor’s logic, it was OK to use VARs:

Concurrency problems cannot happen: only the actor itself accesses that var, and it processes only one message at a time.

But maybe we could give up this premise if we look for a way to redefine the actor’s behavior based on a new state.

Mortal approach

If we follow the philosophy described above, the very first (and most straightforward) approach to keeping an actor’s state would look pretty similar to the following:

class Foo extends Actor{
  var state: Int = 0
  override def receive = {
    case Increase => state += 1
  }
}

Every time an Increase message arrives, we modify the state value by adding 1.
Easy so far, right?

Immutable approach

Nevertheless, we could define a receive function parameterized by a certain state, so that when a message arrives, this parameter is the state to take into account.

Whenever the state has to change, we just invoke the become method, which modifies the actor’s behavior. In our case, that behavior change consists of changing the state value.

If we reuse the previous example:

class Foo extends Actor{
  def receive(state: Int): Receive = {
    case Increase =>
      context.become(
        receive(state + 1),
        discardOld = true)
  }
  // Explicit result type: receive is overloaded, so it cannot be inferred
  override def receive: Receive = receive(0)
}

Notice that the function defined by receive is parameterized by a state argument. When an Increase message arrives, we invoke become to modify the actor’s behavior, passing the new state to handle as an argument.
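The idea can also be seen without Akka at all. The following is a minimal sketch in plain Scala, where Message, Behavior and counting are hypothetical names made up for illustration (they are not part of Akka): handling a message mutates nothing, it just returns the behavior to use for the next message, much like become does.

```scala
// Plain-Scala model of "state as a behavior parameter" (hypothetical names, not Akka API).
sealed trait Message
case object Increase extends Message

// A behavior carries its state and knows which behavior follows each message.
final case class Behavior(state: Int, handle: Message => Behavior)

// Counterpart of receive(state): the state lives only in the parameter.
def counting(state: Int): Behavior =
  Behavior(state, { case Increase => counting(state + 1) })

// Processing a mailbox sequentially is just a fold over the messages.
val mailbox: List[Message] = List(Increase, Increase, Increase)
val finalBehavior = mailbox.foldLeft(counting(0))((behavior, msg) => behavior.handle(msg))
```

After the three Increase messages, finalBehavior.state is 3, and no var was involved.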

If we wanted some extra readability, we could even abstract over every actor with updatable state:

trait SActor[State] extends Actor {
  val initialState: State
  def receive(state: State): Receive
  // become returns Unit, so update does too
  def update(state: State): Unit =
    context.become(
      receive(state),
      discardOld = true)
  override def receive: Receive =
    receive(initialState)
}

This way, we just have to define the actor’s initial state and a parameterized receive function; the new update function takes care of performing the proper become call, as explained before.
With all this in mind, we now have a cuter, brand new Foo actor:

class Foo extends SActor[Int] {
  val initialState = 0
  def receive(state: Int): Receive = {
    case Increase => update(state +1)
  }
}

Potential hazardous issues

Please notice that in the example above we’ve used a second argument for become: discardOld = true. This argument determines whether the new behavior should be stacked on top of the old one or, ‘au contraire’, should completely replace it.

Let’s suppose we used discardOld = false. If we stacked a new behavior every single time an Increase message arrived, the behavior stack would keep growing and we could end up with a wonderful overflow issue.
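To get a feeling for the difference, here is a small sketch that models the behavior stack with an immutable list. BehaviorStack is a hypothetical, simplified stand-in written for this example, not Akka’s actual implementation; it only mimics the replace-versus-push semantics described above.

```scala
// Simplified, hypothetical model of a behavior stack (not Akka's actual implementation).
final case class BehaviorStack[B](behaviors: List[B]) {
  // discardOld = true replaces the top behavior; false pushes on top of it.
  def become(next: B, discardOld: Boolean): BehaviorStack[B] =
    if (discardOld) BehaviorStack(next :: behaviors.drop(1))
    else BehaviorStack(next :: behaviors)

  def depth: Int = behaviors.size
}

val initial = BehaviorStack(List("receive(0)"))

// One become call per Increase message, 1000 messages in total.
val replaced = (1 to 1000).foldLeft(initial) { (stack, n) =>
  stack.become(s"receive($n)", discardOld = true)
}
val stacked = (1 to 1000).foldLeft(initial) { (stack, n) =>
  stack.become(s"receive($n)", discardOld = false)
}
```

In this model, replaced.depth stays at 1 while stacked.depth grows to 1001: one extra entry per message, which is why pushing behaviors only makes sense when each become is eventually matched by an unbecome.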

See you in the next tip.

Peace out 🙂


Scalera tip: Keep state in your actor without using a single VAR

It’s well known that using VARs is, apart from frowned upon, just wrong: it is hell on earth, it makes kittens die, and many other gems you have probably heard before and that nearly led you to a slow and painful death on the scaffold.

The essence of functional programming is therefore immutability: every time I mutate an element, I generate a new one.

What about Akka actors?

When we talk about actors, we can define them as stateful units that sequentially process a message queue, associating (or not) some computational logic with each of these messages.

It has always been said that, to keep that state within an actor’s logic, it was fine to use a var:

Concurrency problems cannot happen: only the actor itself has access to that VAR, and it processes only one message at a time.

But maybe we can give up this premise if we look for a way to redefine the actor’s behavior based on a new state.

Mortal approach

Following the philosophy described above, the first (and most direct) approach to keeping state in an actor would look pretty much like this:

class Foo extends Actor{
  var state: Int = 0
  override def receive = {
    case Increase => state += 1
  }
}

Every time an Increase message arrives, we modify the value of state by adding 1.
Nothing complicated so far, right?

Immutable approach

However, we could define a receive function parameterized by a certain state, so that when a message arrives, the state to take into account is this parameter.

If the state value needs to be updated, it is enough to invoke the become method, which modifies the actor’s behavior. In our case, that behavior change consists of changing the state value.

Using the same example as before:

class Foo extends Actor{
  def receive(state: Int): Receive = {
    case Increase =>
      context.become(
        receive(state + 1),
        discardOld = true)
  }
  // Explicit result type: receive is overloaded, so it cannot be inferred
  override def receive: Receive = receive(0)
}

We can see that the function defining receive takes a parameter called state. When an Increase message arrives, we invoke become to modify the actor’s behavior, passing the new state to take into account as an argument.

If we want to improve readability a bit, we could even abstract over every actor with updatable state:

trait SActor[State] extends Actor {
  val initialState: State
  def receive(state: State): Receive
  // become returns Unit, so update does too
  def update(state: State): Unit =
    context.become(
      receive(state),
      discardOld = true)
  override def receive: Receive =
    receive(initialState)
}

This way we specify the actor’s initial state and a new receive function parameterized by the state to manage, while a new update function takes care of calling become as explained before.
With all this, we get a much neater new Foo actor:

class Foo extends SActor[Int] {
  val initialState = 0
  def receive(state: Int): Receive = {
    case Increase => update(state +1)
  }
}

Potential hazardous issues

Note that in the previous example we passed a second argument: discardOld = true. This argument indicates whether the new behavior should be stacked on top of the previous one or, on the contrary, should replace it completely.

Let’s suppose we used discardOld = false. If we stacked a new behavior every time an Increase message arrived, we could end up with an overflow problem.

See you in the next tip.

Peace out 🙂

More lazy values, the State monad and other stateful stuff

In the previous post, we talked about lazy evaluation in Scala. At the end of that post, we asked an interesting question: does a Lazy value hold a state?


In order to answer that question, we’ll try to define a type that could represent the Lazy values:

trait Lazy[T] {

  val evalF : () => T

  val value: Option[T] = None

}
object Lazy{
  def apply[T](f: => T): Lazy[T] =
    new Lazy[T]{ val evalF = () => f }
}

As you can see, our Lazy type is parameterized by some type T that represents the type of the actual value (Lazy[Int] would be the representation of a lazy integer).
Besides that, we can see that it’s composed of the two main features of a Lazy type:

  • evalF : zero-parameter function that, when its ‘apply’ method is invoked, evaluates the contained T expression.
  • value : the result of evaluating the evalF function. This part denotes the state of the Lazy type, and it only admits two possible values: None (not evaluated yet) or Some(t) (already evaluated, holding the result).

We’ve also added a companion object that defines the constructor for Lazy instances: it receives a by-name parameter, which is what the evalF function returns when invoked.


Now the question is: how do we join the evaluation function and the value it returns so that Lazy becomes a stateful type? We define the ‘eval’ function this way:

trait Lazy[T] { lzy =>

  val evalF : () => T

  val value: Option[T] = None

  def eval: (T, Lazy[T]) = {
    val evaluated = evalF.apply()
    evaluated -> new Lazy[T]{ mutated =>
      val evalF = lzy.evalF
      override val value = Some(evaluated)
      override def eval: (T, Lazy[T]) = 
        evaluated -> mutated
    }
  } 

}

The ‘eval’ function returns a two-element tuple:

  • the value that results from evaluating the expression that stands for the lazy value.
  • a new version of the Lazy value that contains the new state: the result of evaluating T.

If you take a closer look, the first thing the ‘eval’ method does is invoke the evalF function, so it can retrieve the T value that had remained unevaluated until that point.
Once done, we return it together with the new version of the Lazy value. This new version (let’s call it the mutated version) holds in its ‘value’ attribute the result of having invoked evalF. In the same way, we override its eval method, so that future invocations return the Lazy instance itself instead of creating new instances (its state won’t change anymore, just as with Scala’s lazy definitions).
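A short usage sketch may help here. It repeats the trait from above so that it is self-contained; the names counter, lazyTwo, evaluatedOnce and evaluatedTwice are made up for this example.

```scala
trait Lazy[T] { lzy =>
  val evalF: () => T
  val value: Option[T] = None

  def eval: (T, Lazy[T]) = {
    val evaluated = evalF()
    evaluated -> new Lazy[T] { mutated =>
      val evalF = lzy.evalF
      override val value: Option[T] = Some(evaluated)
      // Already evaluated: return the cached result and this same instance.
      override def eval: (T, Lazy[T]) = evaluated -> mutated
    }
  }
}

object Lazy {
  def apply[T](f: => T): Lazy[T] = new Lazy[T] { val evalF = () => f }
}

// Counter used only to observe how many times the block actually runs.
var counter = 0
val lazyTwo = Lazy { counter += 1; 2 }

val (first, evaluatedOnce) = lazyTwo.eval        // runs the block
val (second, evaluatedTwice) = evaluatedOnce.eval // reuses the stored value
val runsAfterThreading = counter
```

Note that the "evaluate once" guarantee only holds if we keep using the instance returned by eval: calling lazyTwo.eval again on the original, still unevaluated instance would run the block a second time.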

The interesting question that comes next is: is this an isolated case? Could anything else be defined as stateful? Let’s perform an abstraction exercise.

Looking for generics: stateful stuff

Let’s think about a simple stack:

sealed trait Stack[+T]
case object Empty extends Stack[Nothing]
case class NonEmpty[T](head: T, tail: Stack[T]) extends Stack[T]

The implementation is really simple. But let’s focus on the Stack trait and on a hypothetical pop method that pops an element from the stack and returns it together with the rest of the stack:

sealed trait Stack[+T]{
  def pop(): (Option[T], Stack[T])
}

Does it sound familiar to you? It is mysteriously similar to

trait Lazy[T]{
  def eval: (T, Lazy[T])
}

isn’t it?

If we try to refactor in order to extract a common trait from Lazy and Stack, we could define a much more abstract type called State:

trait State[S,T] {
  def apply(s: S): (T, S)
}

Simple but pretty: the State trait is parameterized by two types: S (the state type) and T (the info or additional element returned by the given state mutation). Though it’s simple, it’s also a very common pattern when designing Scala systems. There’s always something that holds a certain state. And everything that has a state mutates. And if something mutates in a fancy and smart way…oh man.
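As a quick illustration, the hypothetical pop operation from before can be written as an instance of this State trait. This is only a sketch under the definitions given in this post; the generic pop helper and the sample stack are names introduced here for the example.

```scala
trait State[S, T] {
  def apply(s: S): (T, S)
}

sealed trait Stack[+T]
case object Empty extends Stack[Nothing]
case class NonEmpty[T](head: T, tail: Stack[T]) extends Stack[T]

// pop as a state transition: S is the stack, T is the (optional) popped element.
def pop[T]: State[Stack[T], Option[T]] = new State[Stack[T], Option[T]] {
  def apply(s: Stack[T]): (Option[T], Stack[T]) = s match {
    case NonEmpty(head, tail) => (Some(head), tail)
    case Empty                => (None, Empty)
  }
}

val stack: Stack[Int] = NonEmpty(1, NonEmpty(2, Empty))
val (top, rest) = pop[Int].apply(stack)
val (nothing, stillEmpty) = pop[Int].apply(Empty)
```

Here top is Some(1) and rest is the stack that only contains 2, while popping the empty stack yields None and leaves it empty.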

That already exists…


This whole story, which seems to be taken from a post-modern essay, has already been a subject of study for people…who study stuff. Without going into greater detail, in the Scalaz library you can find the State monad which, apart from what was pointed out above, comes fully equipped with composability and everything that being a monad implies (functor, applicative, …).

If we define our Lazy type with the State monad, we’ll get something similar to:

import scalaz.State

type Lazy[T] = (() => T, Option[T])

def Lazy[T](f: => T) = (() => f, None)

def eval[T] = State[Lazy[T], T]{
  case ((f, None)) => {
    val evaluated = f.apply()
    ((f, Some(evaluated)), evaluated)
  }
  case s@((_, Some(evaluated))) => (s, evaluated) 
}

Deciphering the Egyptian hieroglyph: given the State[S,T] monad, our state S will be a tuple made of exactly what represents a lazy expression (as described earlier):

type Lazy[T] = (() => T, Option[T])
  • A Function0 that represents the lazy evaluation of T
  • The T value that might have been evaluated or not

To build a Lazy value, we generate a tuple with a function that wraps the expression received as the by-name parameter of the Lazy method, plus the None value (because the Lazy value hasn’t been evaluated yet):

def Lazy[T](f: => T) = (() => f, None)

Last, but not least (it’s actually the most important part), we define the only state transition that is possible in this type: the evaluation. This is the key when designing any State type builder: how to model what our S type stands for and the possible state transitions that we might consider.

In the case of the Lazy type, we have two possible situations: the expression hasn’t been evaluated yet (in that case, we’ll evaluate it and we’ll return the same function and the result) or the expression has been already evaluated (in that case we won’t change the state at all and we’ll return the evaluation result):

def eval[T] = State[Lazy[T], T]{
  case ((f, None)) => {
    val evaluated = f.apply()
    ((f, Some(evaluated)), evaluated)
  }
  case s@((_, Some(evaluated))) => (s, evaluated) 
}


In order to check that we can still count on the initial features we described for the Lazy type (it can only be evaluated once, only when necessary, …) we check the following assertions:

var sideEffectDetector: Int = 0

val two = Lazy {
  sideEffectDetector += 1
  2
}

require(sideEffectDetector==0)

val (_, (evaluated, evaluated2)) = (for {
  evaluated <- eval[Int]
  evaluated2 <- eval[Int]
} yield (evaluated, evaluated2)).apply(two)

require(sideEffectDetector == 1)
require(evaluated == 2)
require(evaluated2 == 2)

Please notice that, as we mentioned before, what is defined inside the for-comprehension are the transitions or steps that the state will go through. That is, we define the mutations that any state S will undergo. Once the recipe is defined, we apply it to the initial state we want.
In this particular case, the initial state is a lazy integer that will hold the value 2. To check how many times our Lazy value is evaluated, we just add a very dummy var that is used as a counter. After that, we define inside our recipe that the state must mutate twice by using the eval operation. Afterwards we check that the expression in the Lazy block has only been evaluated once and that the returned value is the expected one.
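To see why a for-comprehension behaves like a recipe, it may help to look at a tiny hand-written state type. MiniState below is a simplified stand-in written for this post, not Scalaz’s implementation; it only assumes the same (S, A) result ordering that the eval example above relies on.

```scala
// Simplified stand-in for a State monad (hypothetical, not Scalaz's implementation).
final case class MiniState[S, A](run: S => (S, A)) {
  def map[B](f: A => B): MiniState[S, B] =
    MiniState[S, B] { s =>
      val (next, a) = run(s)
      (next, f(a))
    }

  // flatMap threads the state produced by this step into the next one.
  def flatMap[B](f: A => MiniState[S, B]): MiniState[S, B] =
    MiniState[S, B] { s =>
      val (next, a) = run(s)
      f(a).run(next)
    }
}

// A transition that returns the current counter and increments the state.
val increment: MiniState[Int, Int] = MiniState(n => (n + 1, n))

// The recipe: nothing runs yet, we only describe two consecutive transitions.
val recipe: MiniState[Int, (Int, Int)] = for {
  before <- increment
  after  <- increment
} yield (before, after)

// Only now do we apply the recipe to an initial state.
val (finalState, seen) = recipe.run(10)
```

Applied to the initial state 10, the recipe yields the final state 12 and the values (10, 11): each step received the state left behind by the previous one.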

I wish you the best tea for digesting this whole crazy story 🙂
Please feel free to add comments/threats at the end of this post or even on our Gitter channel.

See you in the next post.
Peace out!

More lazy values, the State monad and other stateful stuff

In the previous post we talked about lazy evaluation in Scala. At the end of that post, we raised a question: does a Lazy value hold state?

To answer that question, let’s try to define a type that represents a Lazy value as follows:

trait Lazy[T] {

  val evalF : () => T

  val value: Option[T] = None

}
object Lazy{
  def apply[T](f: => T): Lazy[T] =
    new Lazy[T]{ val evalF = () => f }
}

As you can see, our Lazy type is parameterized by a type T that represents the type of the value in question (Lazy[Int] would be the representation of a lazy integer).
Besides, we can see that it’s composed of the two main elements that characterize a Lazy value:

  • evalF : zero-argument function that, when its apply method is invoked, evaluates the contained T expression.
  • value : the result of evaluating the evalF function. This is the part that denotes the state of the Lazy type, and it only admits two possible values: None (not evaluated) or Some(t) (already evaluated, holding the result).

We’ve also added a companion object that defines the constructor for Lazy instances: it receives a by-name argument, which is what the evalF function returns.

The question now is: how do we join the evaluation function with the value it returns so that Lazy keeps a state? By defining the eval function:

trait Lazy[T] { lzy =>

  val evalF : () => T

  val value: Option[T] = None

  def eval: (T, Lazy[T]) = {
    val evaluated = evalF.apply()
    evaluated -> new Lazy[T]{ mutated =>
      val evalF = lzy.evalF
      override val value = Some(evaluated)
      override def eval: (T, Lazy[T]) = 
        evaluated -> mutated
    }
  } 

}

The eval function returns a two-element tuple:

  • the value that results from evaluating the expression that represents the lazy value.
  • a new version of the Lazy value that contains the new state: the result of the evaluation.

If you look closely, the first thing the method does is invoke the evalF function to obtain the value of type T that was still unevaluated.
Once this is done, we return it together with the new version of the Lazy element. This new version (let’s call it mutated) holds in its value attribute the result of having invoked evalF. In the same way, we override its eval method, so that successive invocations return the instance itself and do not generate new instances whose state would not actually change.

The interesting question comes now: is this a unique case? Are there more ‘things’ that keep a state? Let’s do an abstraction exercise.

Looking for generics: stateful stuff

Let’s think about the case of a stack:

sealed trait Stack[+T]
case object Empty extends Stack[Nothing]
case class NonEmpty[T](head: T, tail: Stack[T]) extends Stack[T]

The implementation almost writes itself. But let’s focus on the Stack trait and on a hypothetical pop method that pops an element, which is returned together with the rest of the stack:

sealed trait Stack[+T]{
  def pop(): (Option[T], Stack[T])
}

Does it ring a bell? Doesn’t it look mysteriously similar to

trait Lazy[T]{
  def eval: (T, Lazy[T])
}

…?

If we try to factor out what Lazy and Stack have in common, we could define a much more abstract type called State:

trait State[S,T] {
  def apply(s: S): (T, S)
}

Simple but beautiful: the State trait is parameterized by two types: S (the state type) and T (the info or additional element returned every time we mutate the state). Unassuming as it looks, it’s a very recurrent pattern when designing systems in Scala. There’s always something that keeps a state. And everything that has a state mutates. And if that something mutates in a safe and elegant way…oh man.

That already exists…

This whole story, which seems to be taken from a post-modern essay, turns out to have already been studied by people who study stuff. Without going into much detail, in the Scalaz library you can find the State monad which, besides what was described above, comes fully equipped with composability and everything that being a monad implies (functor, applicative, etc.).

If we define our Lazy type with the State monad, we get something like:

import scalaz.State

type Lazy[T] = (() => T, Option[T])

def Lazy[T](f: => T) = (() => f, None)

def eval[T] = State[Lazy[T], T]{
  case ((f, None)) => {
    val evaluated = f.apply()
    ((f, Some(evaluated)), evaluated)
  }
  case s@((_, Some(evaluated))) => (s, evaluated) 
}

Breaking down the Egyptian hieroglyph shown above: given the State[S,T] monad, our state S is going to be a tuple of what ultimately represents a lazy evaluation:

type Lazy[T] = (() => T, Option[T])

which we described further above:

  • A Function0 that represents the deferred evaluation of T
  • The T value, which may or may not have been evaluated

To build a Lazy value, we generate a tuple with a function that wraps the expression given as the by-name argument of the Lazy method, plus the None value (because the Lazy value hasn’t been evaluated yet):

def Lazy[T](f: => T) = (() => f, None)

Finally (and this is the important part), we define the only state transition we can conceive when talking about lazy values: the evaluation. This is the key when designing any type constructor built on State: what matters is modeling what our type S is and which state transitions are possible.

For the Lazy type, we have two possible cases: the expression hasn’t been evaluated yet (in which case we evaluate it and return the same function and the result) or the expression has already been evaluated (in which case we leave the state as it is and also return the evaluation result):

def eval[T] = State[Lazy[T], T]{
  case ((f, None)) => {
    val evaluated = f.apply()
    ((f, Some(evaluated)), evaluated)
  }
  case s@((_, Some(evaluated))) => (s, evaluated) 
}

To check that we can still count on the initial features for which we defined the Lazy type (it is only evaluated once, it is only evaluated when necessary, …) we run the following assertions:

var sideEffectDetector: Int = 0

val two = Lazy {
  sideEffectDetector += 1
  2
}

require(sideEffectDetector==0)

val (_, (evaluated, evaluated2)) = (for {
  evaluated <- eval[Int]
  evaluated2 <- eval[Int]
} yield (evaluated, evaluated2)).apply(two)

require(sideEffectDetector == 1)
require(evaluated == 2)
require(evaluated2 == 2)

If you look closely, as we mentioned before, what is defined in the for-comprehension are the transitions or steps that the state will go through. That is, we define the mutations that any state S will undergo. Once the ‘recipe’ is defined, we apply it to the initial state we want.
In this case, the initial state is a lazy integer two. To check how many times our Lazy value is evaluated, we add a very dummy var that works as a counter. Then we define in our ‘recipe’ that the state must mutate twice through the eval operation. Afterwards we check that the expression in the Lazy block has been executed only once and that the resulting value is the expected one.

I wish you the best antacid for digesting all of this 🙂
Feel free to add comments/threats on the post or on our Gitter channel.

See you in the next post.
Peace out!

Lazy values

Just in case you lived in a hole for the last ten years and you didn’t know: Scala allows managing lazy values.


In Scala, we can define a value that won’t be evaluated until it is explicitly invoked. For example:

lazy val myLazyInt: Int = { println("hi"); 2 }

As you can see, using the lazy notation, we’ve lazily defined an integer that stands for the literal 2 and also prints ‘hi’ when it’s evaluated.
Apart from violating the biggest functional programming law (referential transparency) due to the insidious println (side effects, death, destruction, blah blah …), notice that, if we execute the code block, the println is not executed. The block is not evaluated until some other expression makes use of our lazy integer value:

val result = myLazyInt + 3
//woa! somebody printed 'hi' and I have a brand new 5 inside 'result'

Once myLazyInt is evaluated, its value won’t be calculated again, no matter how many times it’s invoked. Therefore, the mysterious printed greeting won’t show up anymore:

lazy val myLazyInt: Int = { println("hi"); 2 }
myLazyInt
//"hi"
myLazyInt //nothing special happened now ...
myLazyInt //no matter how many times you invoke it...
myLazyInt //seriously, let it go...

Curious. The question that could come up is, if I define a lazy value and I pass it as a method parameter, what happens? Is it evaluated at the very same moment that the method is invoked? Maybe inside the method? That’ll depend on the way you define your method’s parameters.

Call by name vs. call by value

When defining a method, people usually define its parameters ‘by-value’, which means that we expect the parameter to be already evaluated when it is passed to the method:

def myMethod(someInteger: Int): Int = {
  println("begin")
  val result = someInteger + 2
  println("end")
  result
}

If we invoke our method with any integer:

val n = 3
val result = myMethod(n)
//"begin"
//"end"
require(result == 5)

We just print both traces and that’s it. Nothing new so far.
What happens if we now pass our lazy value to the method? At which exact moment will it print the greeting? Before or after the method traces?
Let’s try:

myMethod(myLazyInt)
//"hi"
//"begin"
//"end"

It printed it before the method traces, which means that our lazy value was evaluated just before the method was invoked. Why does this happen? Because Scala needs the exact value of someInteger in order to be able to execute myMethod.
That’s a pity if we want to keep myLazyInt lazy until the very last moment. How do we fix that? We pass the argument ‘by-name’, that is, indicating how the value has to be resolved instead of explicitly passing the value:

def myMethod(someInteger: => Int): Int = {
  println("begin")
  val result = someInteger + 2
  println("end")
  result
}

This way (someInteger: => Int) we indicate that our method takes as its parameter an expression that eventually returns an integer, and not the integer itself. If we now execute the method passing our not-yet-evaluated lazy value:

myMethod(myLazyInt)
//"begin"
//"hi"
//"end"

Voilà! We made it. The ‘hi’ trace is not printed until the exact value of our lazy guy is required inside the method.
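One detail worth keeping in mind: a by-name parameter is not cached. The sketch below (twice, twiceCached and both counters are names invented for this example) shows that the argument expression runs every time the parameter is referenced, and that a local lazy val brings back the "evaluate at most once" behavior.

```scala
// Counters used only to observe how many times each argument block runs.
var byNameRuns = 0
var cachedRuns = 0

// By-name parameter: the argument expression runs every time `n` is referenced.
def twice(n: => Int): Int = n + n

// Caching the by-name argument in a local lazy val evaluates it at most once.
def twiceCached(n: => Int): Int = {
  lazy val cached = n
  cached + cached
}

val total = twice { byNameRuns += 1; 21 }
val cachedTotal = twiceCached { cachedRuns += 1; 21 }
```

Both calls return 42, but the first block is executed twice and the second one only once. In the myMethod example above this difference is invisible because the argument was itself a lazy val.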

Some other ways to express laziness

Another way to express a lazy evaluation, which could be extremely useful, is the Function0 type:

trait Function0[+R]{
  def apply(): R
}

It’s just a function that takes zero parameters and returns a single output type. It’s expressed as follows:

val f: () => Int =
  () => 2
f.apply() //2

And that’s pretty much everything… Now that we understand, in broad strokes, how laziness works in Scala, let’s move on to more interesting questions. Does a Lazy value represent something stateful?
The answer (or even more questions) will be available in the following post.

Peace out!

Lazy values

In case you had been living in a hole for the last 10 years and didn’t know, Scala allows managing lazily evaluated values.

In Scala, we can define a value that won’t be evaluated until it is explicitly called. For example:

lazy val myLazyInt: Int = { println("hi"); 2 }

As you can see, using the lazy notation we have lazily defined an integer whose value is 2 and that prints ‘hi’ when it is evaluated.
Apart from having violated the great law of functional programming (referential transparency) due to the infamous println (side effects, death, destruction, blah blah …), notice that if we execute the code fragment, the println is not executed.
The block is not executed until some other expression makes use of our lazy integer:

val result = myLazyInt + 3
//woa! somebody printed 'hi' and I have a brand new 5 inside 'result'

Once myLazyInt has been calculated, its value won’t be calculated again, regardless of how many times it is invoked. That is, the mysterious printed greeting won’t show up anymore:

lazy val myLazyInt: Int = { println("hi"); 2 }
myLazyInt
//"hi"
myLazyInt //nothing special happened now ...
myLazyInt //no matter how many times you invoke it...
myLazyInt //seriously, let it go...

Curious. The question is: if I define a lazy value and pass it to a method as an argument, what happens? Is it evaluated at the moment the function is invoked? Maybe inside the function body? That will depend on how you define your method’s arguments.

Call by name vs. call by value

When defining a method, we usually define its arguments ‘by-value’, that is, we expect the argument to be already evaluated when it is passed to the method:

def myMethod(someInteger: Int): Int = {
  println("begin")
  val result = someInteger + 2
  println("end")
  result
}

If we invoke our method with any integer:

val n = 3
val result = myMethod(n)
//"begin"
//"end"
require(result == 5)

We print our two traces and that’s it. Nothing new so far.
What happens now if we pass it our lazy value? At which moment will it print “hi”? Before or after the method traces?
Let’s try:

myMethod(myLazyInt)
//"hi"
//"begin"
//"end"

It printed it before, that is, our lazy value was evaluated before the method was invoked. Why does this happen? Because Scala, in order to execute myMethod, needs to know the value of someInteger.
That’s a nuisance if we want to keep the evaluation of myLazyInt lazy until the end. How do we solve it? By passing the argument ‘by-name’, that is, indicating how the value will be resolved in the future, without explicitly passing the value:

def myMethod(someInteger: => Int): Int = {
  println("begin")
  val result = someInteger + 2
  println("end")
  result
}

This way (someInteger: => Int) we indicate that we are going to pass our method an expression that will return an integer (and not an integer) as its argument. If we now execute the method passing our unevaluated lazy value:

myMethod(myLazyInt)
//"begin"
//"hi"
//"end"

Voilà! Our lazy integer is not evaluated until the very last moment, when its value is required inside the method.

Some other ways to express laziness

Another way to denote that an expression is lazily evaluated, which can turn out to be very useful, is the Function0 type:

trait Function0[+R]{
  def apply(): R
}

It’s a function that takes 0 arguments and returns a single output type. It’s usually written as follows:

val f: () => Int =
  () => 2
f.apply() //2

There isn’t much more mystery to it… Now that we understand, in broad strokes, how lazy evaluation works in Scala, let’s move on to more interesting questions… Is a Lazy value something stateful?
The answer (or more questions) in the next post.

Peace out!

Spark Streaming: stateful streams (Android vs IOS?)

If you remember the post where we talked about how to connect Spark Streaming and Twitter, we said that the only limit for performing analytics was your imagination.

In this post we’re going to propose an example that illustrates the idea of keeping a state based on the feed of a given stream. Speaking plainly, we’ll see how popular one Twitter topic is compared to another: Android vs IOS.


Stateful

The main idea is to have some entity S whose state is fed by V-typed elements that are received and processed for each batch in the stream.


The state at a given instant T_n is obtained by applying a user-defined function f that takes as parameters both the state at instant T_{n-1} and the set of values provided by the batch received at instant T_n:

S(T_n) = f(S(T_{n-1}), V(T_n))

A more casual notation for a common ‘scalero’ would be

// V (the type of the incoming values) must also be a type parameter
type StateUpdate[S, V] = 
  (Option[S],Seq[V]) => Option[S]

Why Option[S]? For one good reason: initially, when we first start listening to the stream, we don’t have any state S yet, so a (S,Seq[V]) => S function wouldn’t make sense.
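A small sketch in plain Scala (no Spark involved) may make this more tangible. The names countValues and batches are made up for the example, and the list of batches is just a stand-in for what a stream would deliver over time:

```scala
// State update function: previous state (if any) plus the values of one batch.
type StateUpdate[S, V] = (Option[S], Seq[V]) => Option[S]

// Keeps a running count of all the values seen so far.
val countValues: StateUpdate[Long, String] =
  (previous, batch) => Some(previous.getOrElse(0L) + batch.size)

// Three consecutive batches; the second one happens to be empty.
val batches: Seq[Seq[String]] = Seq(Seq("a", "b"), Seq.empty[String], Seq("c"))

// Before the first batch there is no state at all, hence the initial None.
val finalState: Option[Long] =
  batches.foldLeft(Option.empty[Long])(countValues)
```

Starting from None, the fold goes through Some(2), Some(2) and finally Some(3): the update function itself decides what the very first state looks like.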

And in practice?

Spark’s API for pair DStreams (PairDStreamFunctions) provides the following method for doing so:

def updateStateByKey[S]
  (updateFunc: (Seq[V], Option[S]) => Option[S])
  (implicit arg0: ClassTag[S]): DStream[(K, S)]

So, for a DStream whose elements can be classified by a key (we’ll see an example later on), we’ll be able to apply this method and get a state S for each key (in most cases, this state will be some aggregation over the values).

The method is overloaded, so you can specify a partitioner other than HashPartitioner, indicate a custom number of partitions or set an initial state (RDD[(K,S)]). Remember that, otherwise, the initial state for every key K will be None.

The example

Let’s suppose we want to measure how strong the rivalry between Android and IOS is, and we want to know which one is the hotter topic on Twitter (in this example we won’t distinguish between mentions against and in favour).

Using the same project we proposed for the previous Spark + Twitter example, we’ll change the Boot.scala file so that it looks more like the contents of the following gist.

First of all, we have to set the checkpoint directory and the tweet filters that we are interested in:

  //  Set checkpoint dir

  ssc.checkpoint("/tmp")

  //  Add filters ...

  val Android = "android"
  val IOS = "ios"

  filter(
    Android,
    IOS
  )

Next, we group our tweets based on whether they contain the ‘Android’ or the ‘IOS’ filter (if a tweet contains both, it is counted on both sides). The result is another DStream, this time containing key-value pairs (filter-tweet):

val groupedTweets = stream.flatMap{content =>
  List(Android, IOS).flatMap(key =>
    if (content.getText.contains(key)) 
      Option(key -> content)
    else None)
  }

Once we have grouped the tweets, we create a new DStream from the previous one by using the updateStateByKey method described earlier in this post, so that the state S we were talking about is the number of tweets seen for each keyword:

val aggregatedTweets: DStream[(String,Long)] =
  groupedTweets.updateStateByKey{
    (newTweets, previousState) =>
      val newTweetsAmount = newTweets.size.toLong
      previousState.fold(
        Some(newTweetsAmount))(previousSize =>
        Some(previousSize + newTweetsAmount))
  }

The only ‘tricky’ part to understand could be the fold, but it’s simple. It says that, if there is a previous amount (previous state), we just add the new amount to it. Otherwise, we use the new amount.

Apart from this, we make our snippet do something visible by printing these figures, and we start listening to the stream:
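If the fold still looks cryptic, the following sketch writes the same logic as a standalone function over plain collections, next to an equivalent pattern match. updateCount and updateCountMatch are names introduced here for illustration, and tweets are represented as plain strings instead of Twitter statuses:

```scala
// Same update logic as above, written for plain collections (no Spark needed).
def updateCount(newTweets: Seq[String], previousState: Option[Long]): Option[Long] = {
  val newTweetsAmount = newTweets.size.toLong
  previousState.fold(Some(newTweetsAmount))(previousSize =>
    Some(previousSize + newTweetsAmount))
}

// Equivalent version using pattern matching instead of fold.
def updateCountMatch(newTweets: Seq[String], previousState: Option[Long]): Option[Long] =
  previousState match {
    case Some(previousSize) => Some(previousSize + newTweets.size)
    case None               => Some(newTweets.size.toLong)
  }

val firstBatch = updateCount(Seq("android rocks", "android again"), None)
val secondBatch = updateCount(Seq("more android"), firstBatch)
```

The first argument list of fold is the result for the None case and the second one handles Some, so both functions return the same value for every input.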

//  And add actions to perform (like printing the aggregatedTweets) ...

aggregatedTweets.foreachRDD{ results =>
  results.foreach{
    case (team, amount) => 
      logger.info(s">>> $team : $amount")
  }
}

// ... and begin listening

listen()

Can you think of another way to make the stream more interesting? Playing with tweet geolocation on a heat map, for example? 😉

Easy peasy!