Scalera tip: Keep your actor’s state with no VAR at all!

It’s pretty well known that using VARs is, apart from unethical, the evil itself, some kind of hell, it make kitties die and many other stuff you might probably heard before and that could eventually be the cause of a painfull slowly dead.

The essence of functional programming is therefore the immutability: every time I mutate an element, I actually generate a new one.

What about Akka actors?

When we talk about actors, we can define them as stateful computation units that sequentially process a message queue by reacting(or not) to each of these messages.

It’s always been said that, in order to keep state within some actor’s logic, it was ok to use VARs:

It’s impossible that concurrency problems happen: it’s the actor itself and nobody else who access that var and will only process one message at a time.

But maybe, we could renounce to this premise if we look for some way to redefine the actor’s behavior based on a new state.

Mortal approach

If we follow the previously described philosophy, the very first (and more straight forward) approach for keeping some actor’s state would be pretty similar to the following:

class Foo extends Actor{
  var state: Int = 0
  override def receive = {
    case Increase => state += 1
  }
}

Every time an Increase arrives, we modify the state value by adding 1.
So easy so far, right?

Immutable approach

Nevertheless, we could define a receive function parameterized by certain state, so when a message arrives, this parameter is the state to take into account.

If the circumstances to mutate the state took place, we would just invoke the become method that would modify the actor’s behavior. In our case, that behavior mutation would consist on changing the state value.

If we use the previously defined example:

class Foo extends Actor{
  def receive(state: Int): Receive = {
    case Increase =>
      context.become(
        receive(state + 1),
        discardOld = true)
  }
  override def receive = receive(0)
}

we can notice that the function defined by receive is parameterized by some state argument. When some Increase message arrives, what we perform is an invocation to become for modifying the actor’s behavior, passing as an argument the new state to handle.

If we wanted some extra legibility, we could even get abstract from every updatabe-state actor:

trait SActor[State] extends Actor {
  val initialState: State
  def receive(state: State): Receive
  def update(state: State): Receive =
    context.become(
      receive(state),
      discardold = true)
  override def receive =
    receive(initialState)
}

this way, we just have to define the initial state of some actor, a new parameterized receive function and a new update function that takes care of performing the proper become call as explained before.
With all these in mind, we now have some cuter brand new Foo actor:

class Foo extends SActor[Int] {
  val initialState = 0
  def receive(state: Int): Receive = {
    case Increase => update(state +1)
  }
}

Potential hazardous issues

Please, do notice that in the featuring example, we’ve used a second argument for become: discardOld = true. This argument settles whether the new behavior should be stashed on the top of the older one, or ‘au contraire’ it should completely substitute the previous behavior.

Let’s suppose we used discardOld = false. If every single time a new Increase message arrived we had to stash a new behavior, we could obtain a wonderful overflow issue.

See you in the next tip.

Peace out 🙂

Scalera tip: Mantén estado en tu actor sin usar un solo VAR

Es de todos sabido que usar VARs es algo que, aparte de mal visto, está mal, es el infierno en vida, hace que mueran gatitos y muchas otras perlitas que probablemente ya habréis oído antes y que por poco os ha causado una muerte lenta y dolorosa en el cadalso.

La esencia en programación funcional es, por tanto, la inmutabilidad: cada vez que muto un elemento, genero uno nuevo.

What about Akka actors?

Cuando hablamos de actores, podemos definirlos como unidades con estado que procesan de manera secuencial una cola de mensajes, asociando (o no) a cada uno de estos mensajes una cierta lógica computacional.

Siempre se ha dicho que para mantener dicho estado dentro de la lógica de un actor, no pasaba nada si usabas un var:

Es imposible que hayan problemas de concurrencia: solo el propio actor tiene acceso a dicho VAR y procesará un solo mensaje al mismo tiempo.

Pero quizás podamos renunciar a esta premisa si buscamos una manera de redefinir el comportamiento del actor en base a un nuevo estado.

Mortal approach

Siguiendo la filosofía antes descrita, la primera (y más directa) aproximación para mantener el estado en un actor se parecería bastante a lo siguiente:

class Foo extends Actor{
  var state: Int = 0
  override def receive = {
    case Increase => state += 1
  }
}

Cada vez que llega un mensaje Increase, modificamos el valor de state, sumando 1.
Hasta aquí nada complicado, ¿no?

Immutable approach

Sin embargo, podríamos definir una función receive que estuviera parametrizada por un cierto estado, de manera que cuando llegue un mensaje, el estado a tener en cuenta sea este parámetro.

Si se diera la circunstancia de tener que actualizar el valor de dicho estado, bastaría con invocar al método become que modifica el comportamiento del actor. En nuestro caso, dicha modificación del comportamiento consistiría en cambiar el valor del estado.

Si usamos el mismo ejemplo que antes:

class Foo extends Actor{
  def receive(state: Int): Receive = {
    case Increase =>
      context.become(
        receive(state + 1),
        discardOld = true)
  }
  override def receive = receive(0)
}

vemos que la función que define el receive en base al estado recibe un parámetro denominado state. Cuando llega un mensaje de tipo Increase, lo que hacemos es invocar a become para modificar el comportamiento del actor, pasando como argumento el nuevo estado a tener en cuenta.

Si queremos mejorar un poco la legibilidad, podríamos incluso abstraer todo actor con estado actualizable:

trait SActor[State] extends Actor {
  val initialState: State
  def receive(state: State): Receive
  def update(state: State): Receive =
    context.become(
      receive(state),
      discardold = true)
  override def receive =
    receive(initialState)
}

de manera que se especifique el estado inicial del actor, una nueva función de receive que queda parametrizada por el nuevo estado a gestionar, y una nueva función de update que se encarga de realizar la llamada a become como antes explicábamos.
Con todo ello nos queda un nuevo actor Foo mucho más curioso:

class Foo extends SActor[Int] {
  val initialState = 0
  def receive(state: Int): Receive = {
    case Increase => update(state +1)
  }
}

Potential hazardous issues

Nótese que en el ejemplo de antes hemos pasado un segundo argumento: discardOld = true. Este argumento indica si el comportamiento nuevo debe apilarse sobre el anterior o si por el contrario debe sustituirlo por completo.

Supongamos que usáramos un discardOld = false. Si cada vez que llegase un mensaje de incremento, apilásemos un nuevo comportamiento, podríamos llegar a tener un problema de desbordamiento.

Hasta el próximo briconsejo.

Agur de limón 🙂

Scalera tips: How NOT to change your actor’s state

A common key when working with Akka, is modifying properly our actor’s state. If we jog our memory, this framework paradigm allows us to program modeling the concurrency based on actors and message passing. From that base, we could define an actor as a computing unit that may have state and perform tasks based on messages that it will receive through its mailbox and that will be processed sequentially .

That means that, in order to avoid side effects, the actor’s state modification has to take place when processing a message. So senseful so far. However, it’s a pretty common mistake to do something similar to this:

class MyActor extends Actor {

  var state: Int = 0

  def receive = {

    case "command" => 
      Future(state = 1)

    case "someOtherCommand" => 
      state = 2

  }

}

In this case, we have no more warranty that the state change (whose only responsible of keeping it consistent and thread safe is the actor) might cause side efects given that, in the precise moment where the Future modifies the var, it’s possible that the state is being modified by the actor itself (probably as a reaction to some other received message).

This Future[Unit] might not be a block like that. It could be the result of having asked to some other actor:

class MyActor extends Actor {

  type State = Int

  var state: State = 0

  def receive = {

    case "command" => 
      (service ? "giveMeMyNewState").map{
        case newState: State => state = newState
      }

    case "someOtherCommand" => 
      state = 2
  }

}

Something that probably none of us has ever tried.

giphy

The proper way

If we want to modify the actor’s state as result of having previously asked to some other actor and without breaking the concurrency control of the actor, it could be achieved like this:

class MyActor extends Actor {

  type State = Int

  var state: State = 0

  def receive = {

    case "command" => 
      (service ? "giveMeMyNewState") pipeTo self

    case "someOtherCommand" => 
      state = 2

    case newState: State => 
      state = newState
  }

}

With pipeTo we specify to send to certain actor the result of having evaluated some future when its resolved. This way we’re indicating that, when we get the response of the other actor, it will be sent to our mailbox, so it will be processed like a normal message, sequentially.

bill_murray_gif_1

Easy peasy 🙂

Scalera tips: Como NO modificar el estado de tu actor

Una cuestión habitual a la hora de trabajar con Akka, es modificar de manera correcta el estado de nuestro actor. Si recordamos la base del paradigma de este framework que nos permite programar modelando la concurrencia en base a actores y el paso de mensajes, es que un actor puede definirse como una unidad computacional que puede tener estado y realiza tareas en base a mensajes que recibe en su mailbox y que procesará de manera secuencial.

Esto significa que, para no tener efectos de lado, es necesario que la modificación del estado del actor se haga al procesar un mensaje. Hasta aquí todo tiene sentido. No obstante, es un fallo bastante común el hacer algo del siguiente estilo:

class MyActor extends Actor {

  var state: Int = 0

  def receive = {

    case "command" => 
      Future(state = 1)

    case "someOtherCommand" => 
      state = 2

  }

}

En ese caso, ya no tenemos garantía de que el cambio de estado (cuyo único responsable de mantenerlo consistente y thread-safe es el actor) puede generar efectos de lado dado que en el momento en que futuro modifica el var es posible que el estado esté siendo modificado por el propio actor, desencadenado por el procesamiento de otro mensaje.

Este Future[Unit] puede no ser un bloque como tal, sino el resultado de haber preguntado a otro actor:

class MyActor extends Actor {

  type State = Int

  var state: State = 0

  def receive = {

    case "command" => 
      (service ? "giveMeMyNewState").map{
        case newState: State => state = newState
      }

    case "someOtherCommand" => 
      state = 2
  }

}

Algo que probablemente nadie de nosotros haya intentado jamás.

giphy

La forma correcta

En caso de querer modificar el estado del actor como resultado de dicha consulta a otro actor sin romper el control de concurrencia sobre el estado, se podría hacer como sigue:

class MyActor extends Actor {

  type State = Int

  var state: State = 0

  def receive = {

    case "command" => 
      (service ? "giveMeMyNewState") pipeTo self

    case "someOtherCommand" => 
      state = 2

    case newState: State => 
      state = newState
  }

}

Con pipeTo lo que hacemos es mandar a cierto actor el resultado de evaluar un futuro cuando este se resuelva. De esta manera estamos indicando que, cuando tengamos la respuesta del otro actor, se envie a nuestro mailbox, de manera que se procesará como otro mensaje más, de manera secuencial.

bill_murray_gif_1

Easy peasy 🙂

Graffiti Rules: Hello World in Spray

Today we are going to get started with a tool to develop REST API: Spray. In order to create those APIs, Spray defines a DSL (Domain Specific Language) to define the routes and responses to the HTTP requests we receive. There are other options to create this type of APIs such as scalatra, Play, Lift… However, Spray serves us well as we will later speak about its successor: Akka-HTTP. But not so fast, let’s go bit by bit.

computers20graffiti20error20technology20explorer201440x90020wallpaper_www-wallpaperfo-com_98

The aim of this post is to make an introductory incursion. We can count ourselves lucky if we come to implement a Hello World with Spray by the end of it. To do that, it is required to know which are the modules with which the framework is built. These are some of main ones:
  • spray-can: HTTP server that will allow us to run our API.
  • spray-routing: DSL to describe the routes used to define the desired behaviour. Unlike other frameworks (such as Play), the routes are defined by code, instead of using a text file or a configuration file.
  • spray-http: module in charge of modelling and managing the HTTP protocol.
  • spray-json: module used for the serialization and deserialization of messages in JSON format. It can be replaced with other JSON serialization modules such as play-json, jackson…
  • spray-testkit: small DSL to test the endpoints in a really simple way.

This first example, due to its simplicity, won’t need all of them. We’ll be using the rest of them further on. So, the first thing we need to do is to build our SBT file with the needed dependencies. In our case, we will only be using spray-can,spray-routing and akka. Thus, we add those dependencies to the build.sbt file:

val akkaV = "2.3.9"
val sprayV = "1.3.3"

libraryDependencies ++= Seq(
    "io.spray" %% "spray-can" % sprayV,
    "io.spray" %% "spray-routing" % sprayV,
    "com.typesafe.akka" %% "akka-actor" % akkaV
  )

Once the configuration is ready, we shall start writing the Scala code. It is important to know that Spray is built on top of Akka. As we’ll see, much of its functionality is based on its use of actors.

First, we define our route. In such a route we will simply mark that we a GET is received, a Hello World will be sent as response:

import spray.routing._
trait ApiRoute extends HttpService {

 val myApiRoute =
   get {
     complete("Hello World!")
   }
}

As can be appreciated, a mixin with HttpService is required. Furthermore, if a response needs to be returned, it is also mandatory for it to be encapsulated in a complete method. The part of spray-routing that we’re using here is the get directive, which defines the behaviour towards GET requests (quite logical, as it should).

Now, we have to define an actor that uses that route. This is really simple as it is completely based in Akka. The only rare (or new) thing is that we’ll have to define its behaviour with a method called runRoute, which requires a route. And which route is this? Well, naturally, it’ll be the one we’ve just defined in the ApiRoute trait:

import akka.actor.Actor
import spray.routing._

class ApiActor extends Actor with ApiRoute {

  def actorRefFactory = context

  def receive = runRoute(myApiRoute)
}

Now, all that remains is to run the API and bind it to a port. For this, the first thing we need to do is to create an ActorSystem. Once it is created, we create an actor of the type ApiActor (the actor we’ve just defined in the previous paragraph). Finally, we send a Bind message to it with the IP and port to which it should be binded:

import akka.actor.{ ActorSystem, Props }
import akka.io.IO
import akka.util.Timeout
import akka.pattern.ask

import spray.can.Http

import scala.concurrent.duration._

object Main extends App {

  implicit val system = ActorSystem("My-System")
  val apiActor = system.actorOf(Props[ApiActor])

  implicit val timeout = Timeout(3 seconds)

  IO(Http) ? Http.Bind(apiActor, interface = "localhost", port = 8080)

}

And this is it. Now, we just have to execute the application, open our favourite browser and go to localhost:8080 to see a wonderful yet surprising ‘HELLO WORLD!’ 🙂

tumblr_mamiy1kp641qcp26yo1_400

Graffiti Rules: Hello Word con Spray

Hoy toca empezar con una herramienta para realizar API REST: Spray. Para construir dichas API’s, Spray define un DSL (Lenguaje Específico de Dominio) con el que definir cuales son las rutas y las reacciones a las llamadas HTTP que recibamos. Existen otras alternativas para realizar este tipo de API’s: scalatra, Play, Lift … Sin embargo, este viene perfecto para luego hablar de su evolución: Akka-HTTP. Pero no vayamos tan rápido, vamos poco a poco.

computers20graffiti20error20technology20explorer201440x90020wallpaper_www-wallpaperfo-com_98

El objetivo de este POST es hacer una incursión de iniciación. Nos podemos ir contentos si conseguimos realizar un Hello World con Spray. Para ello, lo principal es conocer cuales son los módulos en los que se compone el framework. Aquí van algunos de los principales:

  • spray-can: servidor HTTP que nos permitirá levantar nuestra API.
  • spray-routing: DSL de definición de rutas para definir el comportamiento deseado. A diferencia de otros frameworks (como Play), las rutas están definidas en código en vez de en un fichero de texto o de configuración.
  • spray-http: módulo que se encarga de modelar y manejar el protocolo HTTP.
  • spray-json: módulo para serialización y deserialización de mensajes en formato JSON. Es reemplazable por otros módulos de serialización a JSON como play-json, jackson…
  • spray-testkit: pequeño DSL para realizar testing de los endpoints de una manera muy sencilla.

En este primer ejemplo, por su sencillez, no necesitaremos todos ellos. Ya iremos utilizando otros módulos más adelante. Por ello, lo primero que tenemos que hacer es construir nuestro fichero SBT con las dependencias necesarias. En nuestro caso nos basta con spray-can, spray-routing y akka. Por tanto, añadimos dichas dependencias al fichero build.sbt:

val akkaV = "2.3.9"
val sprayV = "1.3.3"

libraryDependencies ++= Seq(
    "io.spray" %% "spray-can" % sprayV,
    "io.spray" %% "spray-routing" % sprayV,
    "com.typesafe.akka" %% "akka-actor" % akkaV
  )

Una vez que tenemos la configuración lista, toca ponernos a escribir el código Scala. Es importante saber que Spray está construido sobre Akka. Como vamos a ver, mucha de la funcionalidad se basa en utilizar actores.

En primer lugar vamos a definir nuestra ruta. En dicha ruta simplemente marcaremos que si recibimos un GET responderemos con un Hello World:

import spray.routing._
trait ApiRoute extends HttpService {

 val myApiRoute =
   get {
     complete("Hello World!")
   }
}

Como se puede ver, es necesario hacer mixin de HttpService. Además, siempre que queramos devolver alguna respuesta será necesario encapsularlo en un método complete. La parte de spray-routing que utilizamos es la directiva get, que define el comportamiento para las peticiones de tipo GET (muy lógico todo).

Ahora, tenemos que definir un actor que utilice esta ruta. Esto es muy sencillo ya que se basa totalmente en Akka. Lo único raro (o nuevo) es que tendremos que definir su comportamiento basándonos en un método llamado runRoute que define una ruta. ¿Y cuál será dicha ruta? Pues, por supuesto, será la que acabamos de definir en el trait ApiRoute:

import akka.actor.Actor
import spray.routing._

class ApiActor extends Actor with ApiRoute {

  def actorRefFactory = context

  def receive = runRoute(myApiRoute)
}

Ahora queda lanzar la API y atarla a un puerto. Lo primero que necesitaremos realizar es crear un ActorSystem. Una vez creado el ActorSystem, creamos un actor de tipo ApiActor (el actor que acabamos definir en el paso anterior). Finalmente le enviamos un mensaje de tipo Bind indicando la IP y el puerto donde se debe conectar:

import akka.actor.{ ActorSystem, Props }
import akka.io.IO
import akka.util.Timeout
import akka.pattern.ask

import spray.can.Http

import scala.concurrent.duration._

object Main extends App {

  implicit val system = ActorSystem("My-System")
  val apiActor = system.actorOf(Props[ApiActor])

  implicit val timeout = Timeout(3 seconds)

  IO(Http) ? Http.Bind(apiActor, interface = "localhost", port = 8080)

}

Y ya está. Ahora solo queda arrancar la aplicación, abrir nuestro navegador favorito e ir a localhost:8080 para poder ver un maravilloso a la par que sorprendente “HELLO WORLD!” 🙂

tumblr_mamiy1kp641qcp26yo1_400

Distributed key-value registry with Akka Clustering

Almost a year ago, I wanted to see how good Akka was, and how easy it would be to create a distributed hash with Akka Clustering…

So I created a sample project to play with it a little bit (https://github.com/roclas/akka-distributed-hash – I actually changed a couple of things just before writing this post).

It consist of a distributed key-value registry supported by different akka instances with the same code launched on different nodes.

39a0aab0c5b10c7204bc38788dfac55d02140c91f8b07ea4f036d673eec97379

The idea behind it is very straight forward: you run the application a few times simultaneously (passing a different port number as a parameter) and once the first node (if it is in the seeds list) has started, you can connect as many nodes to the cluster as you want. Nodes (in this case, actors) have a hash that is modified through HTTP requests, and that hash’s changes are propagated across the cluster, so that every node ends up having the same information.

The seed nodes are on src/main/resources/application.conf

When the application starts, one of the first thing it does is trying to join the cluster. In order to do that, it goes through the list of seeds and tries to connect to the first available node.

A very interesting fact is that if the node is the first element of the seeds list, and it is not the first node we start, we will most likely end up having an island in the cluster (when it finds itself as the first element of the seeds list, it will connect to itself and will create an island). The previously started nodes will be in their own sub-cluster, and the subsequently created nodes will get connected to this node, creating an isolated second sub-cluster.

akka clustering graph

I have avoided this problem by doing a trick (maybe you could find a more elegant way?):

val selfAddress = Cluster(system).selfAddress
val seeds= system.settings
  .config.getList("akka.cluster.seed-nodes")
System.setProperty("akka.cluster.seed-nodes",
seeds filter(!_.toString.contains(selfAddress.toString)) toString)

As a general rule, we could say it is not a bad idea to remove your own address from the seeds list before joining the cluster (by doing that, you would not have any problem; but I did not want to keep a different config file for each node I create and that’s why each of them programmatically removes itself from the seeds list).

Once the actor is connected to the cluster, I can receive cluster messages such as:

MemberUp (when a new node joins the cluster)
MemberRemoved (when a node leaves the cluster)

Each actor will have a hash object and a tiny HTTP server (that will respond to get, put and delete requests). Put requests will add elements to the actor’s internal hash, and remove requests will delete them.

And from this point on, you can let your imagination run wild:

  • Every time we modify an actor’s hash, it will send a message  notifying its peers of the change, so that they can change their hash too
  • After actualizing the hash, by another actors request, it sends a message back with its hash’s md5 so that the original actor can check if the changes were successfully made
  • When a new member joins the cluster, it synchronizes its state (its hash) with the other nodes

Please download the zip or clone (git@github.com:roclas/akka-distributed-hash.git) the project, and try it yourselves; it is so easy to try and very fun to play with. It’s a very simple example that you can use as a base for more complex projects. Everything is explained on the README.md file.

Also, don’t forget to tell us of useful things you think you could do with Akka Clustering. Fork, use and improve the project, and suggest us things we could do with this awesome technology.