Scalera tip: Mantén estado en tu actor sin usar un solo VAR

Es de todos sabido que usar VARs es algo que, aparte de mal visto, está mal, es el infierno en vida, hace que mueran gatitos y muchas otras perlitas que probablemente ya habréis oído antes y que por poco os ha causado una muerte lenta y dolorosa en el cadalso.

La esencia en programación funcional es, por tanto, la inmutabilidad: cada vez que muto un elemento, genero uno nuevo.

What about Akka actors?

Cuando hablamos de actores, podemos definirlos como unidades con estado que procesan de manera secuencial una cola de mensajes, asociando (o no) a cada uno de estos mensajes una cierta lógica computacional.

Siempre se ha dicho que para mantener dicho estado dentro de la lógica de un actor, no pasaba nada si usabas un var:

Es imposible que hayan problemas de concurrencia: solo el propio actor tiene acceso a dicho VAR y procesará un solo mensaje al mismo tiempo.

Pero quizás podamos renunciar a esta premisa si buscamos una manera de redefinir el comportamiento del actor en base a un nuevo estado.

Mortal approach

Siguiendo la filosofía antes descrita, la primera (y más directa) aproximación para mantener el estado en un actor se parecería bastante a lo siguiente:

class Foo extends Actor{
  var state: Int = 0
  override def receive = {
    case Increase => state += 1
  }
}

Cada vez que llega un mensaje Increase, modificamos el valor de state, sumando 1.
Hasta aquí nada complicado, ¿no?

Immutable approach

Sin embargo, podríamos definir una función receive que estuviera parametrizada por un cierto estado, de manera que cuando llegue un mensaje, el estado a tener en cuenta sea este parámetro.

Si se diera la circunstancia de tener que actualizar el valor de dicho estado, bastaría con invocar al método become que modifica el comportamiento del actor. En nuestro caso, dicha modificación del comportamiento consistiría en cambiar el valor del estado.

Si usamos el mismo ejemplo que antes:

class Foo extends Actor{
  def receive(state: Int): Receive = {
    case Increase =>
      context.become(
        receive(state + 1),
        discardOld = true)
  }
  override def receive = receive(0)
}

vemos que la función que define el receive en base al estado recibe un parámetro denominado state. Cuando llega un mensaje de tipo Increase, lo que hacemos es invocar a become para modificar el comportamiento del actor, pasando como argumento el nuevo estado a tener en cuenta.

Si queremos mejorar un poco la legibilidad, podríamos incluso abstraer todo actor con estado actualizable:

trait SActor[State] extends Actor {
  val initialState: State
  def receive(state: State): Receive
  def update(state: State): Receive =
    context.become(
      receive(state),
      discardold = true)
  override def receive =
    receive(initialState)
}

de manera que se especifique el estado inicial del actor, una nueva función de receive que queda parametrizada por el nuevo estado a gestionar, y una nueva función de update que se encarga de realizar la llamada a become como antes explicábamos.
Con todo ello nos queda un nuevo actor Foo mucho más curioso:

class Foo extends SActor[Int] {
  val initialState = 0
  def receive(state: Int): Receive = {
    case Increase => update(state +1)
  }
}

Potential hazardous issues

Nótese que en el ejemplo de antes hemos pasado un segundo argumento: discardOld = true. Este argumento indica si el comportamiento nuevo debe apilarse sobre el anterior o si por el contrario debe sustituirlo por completo.

Supongamos que usáramos un discardOld = false. Si cada vez que llegase un mensaje de incremento, apilásemos un nuevo comportamiento, podríamos llegar a tener un problema de desbordamiento.

Hasta el próximo briconsejo.

Agur de limón 🙂

Anuncios

Scalera tips: How NOT to change your actor’s state

A common key when working with Akka, is modifying properly our actor’s state. If we jog our memory, this framework paradigm allows us to program modeling the concurrency based on actors and message passing. From that base, we could define an actor as a computing unit that may have state and perform tasks based on messages that it will receive through its mailbox and that will be processed sequentially .

That means that, in order to avoid side effects, the actor’s state modification has to take place when processing a message. So senseful so far. However, it’s a pretty common mistake to do something similar to this:

class MyActor extends Actor {

  var state: Int = 0

  def receive = {

    case "command" => 
      Future(state = 1)

    case "someOtherCommand" => 
      state = 2

  }

}

In this case, we have no more warranty that the state change (whose only responsible of keeping it consistent and thread safe is the actor) might cause side efects given that, in the precise moment where the Future modifies the var, it’s possible that the state is being modified by the actor itself (probably as a reaction to some other received message).

This Future[Unit] might not be a block like that. It could be the result of having asked to some other actor:

class MyActor extends Actor {

  type State = Int

  var state: State = 0

  def receive = {

    case "command" => 
      (service ? "giveMeMyNewState").map{
        case newState: State => state = newState
      }

    case "someOtherCommand" => 
      state = 2
  }

}

Something that probably none of us has ever tried.

giphy

The proper way

If we want to modify the actor’s state as result of having previously asked to some other actor and without breaking the concurrency control of the actor, it could be achieved like this:

class MyActor extends Actor {

  type State = Int

  var state: State = 0

  def receive = {

    case "command" => 
      (service ? "giveMeMyNewState") pipeTo self

    case "someOtherCommand" => 
      state = 2

    case newState: State => 
      state = newState
  }

}

With pipeTo we specify to send to certain actor the result of having evaluated some future when its resolved. This way we’re indicating that, when we get the response of the other actor, it will be sent to our mailbox, so it will be processed like a normal message, sequentially.

bill_murray_gif_1

Easy peasy 🙂

Scalera tips: Como NO modificar el estado de tu actor

Una cuestión habitual a la hora de trabajar con Akka, es modificar de manera correcta el estado de nuestro actor. Si recordamos la base del paradigma de este framework que nos permite programar modelando la concurrencia en base a actores y el paso de mensajes, es que un actor puede definirse como una unidad computacional que puede tener estado y realiza tareas en base a mensajes que recibe en su mailbox y que procesará de manera secuencial.

Esto significa que, para no tener efectos de lado, es necesario que la modificación del estado del actor se haga al procesar un mensaje. Hasta aquí todo tiene sentido. No obstante, es un fallo bastante común el hacer algo del siguiente estilo:

class MyActor extends Actor {

  var state: Int = 0

  def receive = {

    case "command" => 
      Future(state = 1)

    case "someOtherCommand" => 
      state = 2

  }

}

En ese caso, ya no tenemos garantía de que el cambio de estado (cuyo único responsable de mantenerlo consistente y thread-safe es el actor) puede generar efectos de lado dado que en el momento en que futuro modifica el var es posible que el estado esté siendo modificado por el propio actor, desencadenado por el procesamiento de otro mensaje.

Este Future[Unit] puede no ser un bloque como tal, sino el resultado de haber preguntado a otro actor:

class MyActor extends Actor {

  type State = Int

  var state: State = 0

  def receive = {

    case "command" => 
      (service ? "giveMeMyNewState").map{
        case newState: State => state = newState
      }

    case "someOtherCommand" => 
      state = 2
  }

}

Algo que probablemente nadie de nosotros haya intentado jamás.

giphy

La forma correcta

En caso de querer modificar el estado del actor como resultado de dicha consulta a otro actor sin romper el control de concurrencia sobre el estado, se podría hacer como sigue:

class MyActor extends Actor {

  type State = Int

  var state: State = 0

  def receive = {

    case "command" => 
      (service ? "giveMeMyNewState") pipeTo self

    case "someOtherCommand" => 
      state = 2

    case newState: State => 
      state = newState
  }

}

Con pipeTo lo que hacemos es mandar a cierto actor el resultado de evaluar un futuro cuando este se resuelva. De esta manera estamos indicando que, cuando tengamos la respuesta del otro actor, se envie a nuestro mailbox, de manera que se procesará como otro mensaje más, de manera secuencial.

bill_murray_gif_1

Easy peasy 🙂

Distributed key-value registry with Akka Clustering

Almost a year ago, I wanted to see how good Akka was, and how easy it would be to create a distributed hash with Akka Clustering…

So I created a sample project to play with it a little bit (https://github.com/roclas/akka-distributed-hash – I actually changed a couple of things just before writing this post).

It consist of a distributed key-value registry supported by different akka instances with the same code launched on different nodes.

39a0aab0c5b10c7204bc38788dfac55d02140c91f8b07ea4f036d673eec97379

The idea behind it is very straight forward: you run the application a few times simultaneously (passing a different port number as a parameter) and once the first node (if it is in the seeds list) has started, you can connect as many nodes to the cluster as you want. Nodes (in this case, actors) have a hash that is modified through HTTP requests, and that hash’s changes are propagated across the cluster, so that every node ends up having the same information.

The seed nodes are on src/main/resources/application.conf

When the application starts, one of the first thing it does is trying to join the cluster. In order to do that, it goes through the list of seeds and tries to connect to the first available node.

A very interesting fact is that if the node is the first element of the seeds list, and it is not the first node we start, we will most likely end up having an island in the cluster (when it finds itself as the first element of the seeds list, it will connect to itself and will create an island). The previously started nodes will be in their own sub-cluster, and the subsequently created nodes will get connected to this node, creating an isolated second sub-cluster.

akka clustering graph

I have avoided this problem by doing a trick (maybe you could find a more elegant way?):

val selfAddress = Cluster(system).selfAddress
val seeds= system.settings
  .config.getList("akka.cluster.seed-nodes")
System.setProperty("akka.cluster.seed-nodes",
seeds filter(!_.toString.contains(selfAddress.toString)) toString)

As a general rule, we could say it is not a bad idea to remove your own address from the seeds list before joining the cluster (by doing that, you would not have any problem; but I did not want to keep a different config file for each node I create and that’s why each of them programmatically removes itself from the seeds list).

Once the actor is connected to the cluster, I can receive cluster messages such as:

MemberUp (when a new node joins the cluster)
MemberRemoved (when a node leaves the cluster)

Each actor will have a hash object and a tiny HTTP server (that will respond to get, put and delete requests). Put requests will add elements to the actor’s internal hash, and remove requests will delete them.

And from this point on, you can let your imagination run wild:

  • Every time we modify an actor’s hash, it will send a message  notifying its peers of the change, so that they can change their hash too
  • After actualizing the hash, by another actors request, it sends a message back with its hash’s md5 so that the original actor can check if the changes were successfully made
  • When a new member joins the cluster, it synchronizes its state (its hash) with the other nodes

Please download the zip or clone (git@github.com:roclas/akka-distributed-hash.git) the project, and try it yourselves; it is so easy to try and very fun to play with. It’s a very simple example that you can use as a base for more complex projects. Everything is explained on the README.md file.

Also, don’t forget to tell us of useful things you think you could do with Akka Clustering. Fork, use and improve the project, and suggest us things we could do with this awesome technology.

 

 

Registro clave-valor distribuido con Akka Clustering

Hace casi un año, quise poner a prueba cómo de bueno era Akka y lo sencillo que sería crear un registro distribuido con Akka Clustering…

De modo que creé un proyecto de prueba para cacharrear un poco con ello (https://github.com/roclas/akka-distributed-hash – de hecho, he cambiado un par de cosas para dar forma a este post).

Se trata de un registro clave-valor distribuido quese apoya en distintas instancias de Akka con el mismo código, lanzadas en diferentes nodos.

39a0aab0c5b10c7204bc38788dfac55d02140c91f8b07ea4f036d673eec97379

La idea, en esencia, es muy sencilla: lanzas varias instancias de la aplicación al mismo tiempo (especificando puertos distintos como parámetros) y, una vez el primer nodo (se se encuentra en la lista de nodos semilla) ha empezado, puedes conectar tantos nodos del cluster como quieras. Los nodos (en este caso, actores) tienen una clave o hash que se modifica a través de peticiones HTTP, y dichos cambios se propagan por el cluster, de manera que todo nodo acaba por tener la misma información replicada.

Los nodos semilla se especifican en src/main/resources/application.conf

Cuando la aplicación ha arrancado, una de las primeras cosas que hace es intentar unirse al cluster. Para hacer esto, comprueba la lista de nodos semilla e intenta conectarse al primer nodo disponible.

Un dato muy interesante es que si el nodo se encuentra en la primera posición de la lista de semillas, si no es el primer nodo que arrancamos, es muy probable que acabe por generar una isla en el cluster (cuando se encuentra a si mismo como el primer nodo disponible en la lista de semillas sin ser el primero de dicha lista, se conecta a sí mismo creando así una isla). Los nodos que hayan arrancado previamente se encontrarán en su propio subcluster, y los nodos creados a partir de entonces se conectaran a este último, creando así un segundo sub-cluster aislado.

akka clustering graph

He evitado este problema usando un pequeño truco (es probable que haya una forma más elegante de hacerlo):

val selfAddress = Cluster(system).selfAddress
val seeds= system.settings
  .config.getList("akka.cluster.seed-nodes")
System.setProperty("akka.cluster.seed-nodes",
seeds filter(!_.toString.contains(selfAddress.toString)) toString)

Como normal general, podríamos decir que no es una mala idea eliminar la propia dirección de la lista antes de unirse al cluster (así evitamos el problema, pero sin tener que mantener un fichero de configuración distinto por cada nodo que cree, eliminándose a sí mismo de manera programática de la lista).

Una vez que el actor se ha conectado al cluster, pueden recibirse mensajes como:

MemberUp (when a new node joins the cluster)
MemberRemoved (when a node leaves the cluster)

Cada actor tiene un objeto registro y un servidor HTTP ligero (que responderá a peticiones de GET, PUT y DELETE. Las peticiones PUT añadirán elementos al registro interno del actor y las peticiones DELETE los eliminarán.

Y a partir de este punto, podéis hacer tanto como os imaginéis:

  • Cada vez que se modifique el registro de un actor, enviar un mensaje a los compañeros para notificarles el cambio y que así modifiquen su registro interno
  • Después de actualizar el registro, mediante otra petición de actor, mandar un mensaje de vuelta con su md5 para que el actor original compruebe si los cambios se han realizado correctamente.
  • Cuando un nuevo miembro se una al cluster, sincronizar su estado (su registro) con el resto de nodos.

Podéis descargaros el zip  o clonar (git@github.com:roclas/akka-distributed-hash.git) el proyecto, y probarlo por vosotros mismo; es muy sencillo cacharrear con ello y se puede usar como código base para proyectos más complejos (se explica todo en el fichero README.md.

Por último, contadnos que cosas útiles se podrían hacer con Akka Clustering. Forkead, usad y mejorad el proyectoFork, use and improve the project, y sugeridnos que cosas se podrían hacer con esta tecnología tan alucinante.

Hello World in Akka (Part I)

Today we are going on a different matter. Since the reactive programming course is currently being given in Coursera, we are taking the first steps to start working with Akka.

Akka is a toolkit developed by the Typesafe ecosystem. It allows the implementation of reactive systems on an actor-based model. There are two versions of Akka, one for Java and one for Scala. We will focus, obviously, on the second.

What’s an actor?

An actor in an entity to which messages can be sent. The actor will react to the received messages by performing an action. Besides, it has two main characteristics:

  • The inside of the actor can’t be accessed from outside. The interaction with the actor can only be achieved by sending messages to it.
  • Messages that arrive to the actor will be processed one at a time.

To create an actor in Akka – please make an extra comprehension effort – the only thing that is required is extending the trait Actor.

If we want the actor to be fully defined, its behaviour needs to be specified. In order to do so, the receive method of the actor needs to be implemented. This method will define the reaction of the actor to the different messages that it might receive.

But before going on with actors, tool time!

timallen

Tool time: Partial functions

Partial functions are functions that are only defined for a limited set of arguments. From a mathematical point of view, we could say that its domain is bounded. This concept may become clearer with a simple example:

def getCountry: PartialFunction[String, String] = {
  case "Madrid" => "Spain"
  case "Paris" => "France"
  case "Roma" => "Italy"
}

In this case, the getCountry function will return the country only for the capital cities defined in the partial function. If any other capital city is used as input to this function, it will throw a MatchError.

getCountry("Madrid") //returns "Spain";
getCountry("Berlin") //throws MatchError

Moreover, partial functions have a method called isDefinedAt, which allows us to know if a certain input parameter is considered in its body. In our example:

getCountry.isDefinedAt("Madrid") //returns true
getCountry.isDefinedAt("Berlin") //returns false

Another important trick is that partial functions can be combined to create a new one. To do so, orElse method is used.

def getCountryEurope: PartialFunction[String, String] = {
  case "Madrid" => "Spain"
  case "Paris" => "France"
  case "Roma" => "Italy"
}

def getCountryAmerica: PartialFunction[String, String] = {
  case "Brasilia" => "Brazil"
  case "Buenos Aires" => "Argentina"
}

def getCountry =
  getCountryEurope orElse getCountryAmerica

getCountry.isDefinedAt("Madrid") //returns true
getCountry.isDefinedAt("Brasilia") //returns true

To end up with the PartialFunction masterclass, it is important to know the method applyOrElse, with which we can evaluate a partial function with an input value but also define an auxiliary function to use in case the input value does not belong to the domain. Sorry, what? Again, an example might save us:

def unhandled(capital: String) = s"$capital is not defined"
getCountry.applyOrElse("Foo", unhandled) //Foo is not defined

Hello World Actor

Good! Now we know what partial functions are, we can implement the receive method of the actor, which will define the actor’s behaviour. That receive method, as you all might have guessed, is a partial function actually.

type Receive = PartialFunction[Any, Unit]

def receive: Receive

This partial function expects any type as input parameter and returns Unit. Therefore, thanks to the argument type Any, anything can be sent as a message to the actor: a string, an integer, an object…

def receive {
  case num: Int => println(s"Num $num received")
  case words: String => println(s"String $words received")
  case _ => println("I've received other thing")
}

In addition, we must know that when a message arrives to an actor, at first the partial function defined at the receive method tries to be applied and, in case the message is not defined in the domain, the unhandled method is executed:

def unhandled(msg: Any): Unit

By default, this method will print the unhandled messages in a log. This way, if a message cannot be handled by receive method, instead of causing a MatchError and blowing up our program, a debug message will be printed in Akka’s log. Using our high-level knowledge in partial functions, we can guess that the behaviour of an actor for a given message called msg will be defined as follows:

receive.applyOrElse(msg, unhandled)

All that remains is to define the actor that, in our case, no matter what the received message is, will print a lovely and passionate Hello World:

class HelloWorldActor extends Actor {

  def receive {
    case _ => println("Hello World!")
  }
}

Voila! The next post will teach us how we can use this actor and put it into operation.