Transformando el futuro

Hace ya unas cuantas semanas estuvimos hablando sobre el tipo Future para crear llamadas asíncronas.
Vimos como trabajar con llamadas bloqueantes para obtener el valor del futuro. También utilizamos callbacks para obtener el resultado del futuro de forma asíncrona. Sin embargo se quedaron algunos puntos en el tintero. Me refiero a transformar los Future sin bloquear la ejecución.

Transformaciones de Futuros

Para transformar los futuros, al igual que con otros tipos básicos de Scala, se usa principalmente los métodos map y flatmap.

El método map

El método map nos permite cambiar el contenido de un futuro aplicando una función. Por ejemplo, si tenemos un método que nos permite obtener el primer millón de números primos, pero queremos transformarlo para que solo nos devuelva los cien primeros, podemos aplicar el método map de la siguiente manera:

def getFirstMillionOfPrimes(): Future[List[Int]] = ???

getFirstMillionOfPrimes().map(
  (list: List[Int]) => list.take(100)
)

De esta forma estamos transformando el interior del Futuro sin romper la asincronía.

pi2band2bi

El método flatMap

Por otro lado, el método flatMap nos permite aplicar una función al contenido del futuro, que devuelve un Futuro a su vez. Después, aplica una operación de flatten para convertir el Future[Future[A]] en un simple Future[A]. What the f…? Se entiende mejor con un ejemplo.

Imaginemos que queremos concatenar en una cadena de texto el primer millón de números primos. Para ello utilizamos un nuevo método:

def concatenate(l: List[Int]): Future[String] = ???

y ahora realizamos un flatMap

getFirstMillionOfPrimes().flatMap(
  (list: List[Int]) => concatenate(list)
) //Future[String]

¿Y como podemos componer hacer todo esto de una forma más sencilla?

Pues muy sencillo. ¡For comprehenssion al rescate! Aplicando un poco de syntactic sugar podemos tener un código mucho más legible.
Basta con hacer lo siguiente:

for {
  primes <- getFirstMillionOfPrimes()
  primesString <- concatenate(primes)
} yield primesString

De esta manera, no se aplicará la operación de concatenación hasta que no se hayan obtenido los números primos mediante el método getFirstMillionPrimes.
Esta permite guardar un cierto orden a la hora de hacer composición de llamadas asíncronas. Además, si la primera llamada asíncrona falla, no se efectuará la segunda.

Y esto es todo por hoy. Ahora ya sabes como cambiar el Futuro. Una lástima no poder cambiar el pasado 😦

doesnt-go-into-girls-shower

¡Hasta la próxima!

Anuncios

Scalera tips: Referential transparency

We have already talked in other posts about several details of pure functional programming and the importance of avoiding side effects wherever possible.

Should we investigate a bit further about it, we would likely run into the concept of referential transparency. Today it’s time to see what this concept is about.

giphy

Referential transparency is strongly linked to the substitution model. If we want to use Scala and exploit its full potential as a functional programming language, we must keep this concept in mind. Despite its weird name, it is pretty easy to understand. Referential transparency means that a function of type I => O ought to be replaceable by a value of type O, without that entailing any loss of functionality. This way, we can be sure that the function has no side effects.

Side effects: a function has no side effects if it only transforms the input arguments into output arguments and does nothing else. Connecting to a database, printing on screen, writing logs or modifying a variable outside the scope of the function are considered to be side effects.

Let’s see an example with code. In this example we are going to create a function that will allow us to book a hotel room. The function will update the hotel’s reservations list and will return an identifier for the user:

var reservations: List[Reservation] = List.empy[Reservation]

def reserveRoom(roomNumber: Int, from: String, to: String): Int = {
  val id = generateId()
  reservations = Reservation(roomNumber, from, to, id) :: reservations
  id
}

val myReservation: Int = reserveRoom(1, "1/8/16", "15/8/16")

In this case, we are modifying a list which is outside the scope of the function. This is a side effect. Therefore, if we performed a substitution by any value belonging to the function domain, we would lose functionality as nothing would be added to the reservations list:

var reservations: List[Reservation] = List.empy[Reservation]

val myReservation: Int = "1" //Result of reserveRoom method

reservations.isEmpty //true....where is my reservation???

How can referential transparency be achieved? A simple option would be to return both the new updated list and the reservation identifier:

var reservations: List[Reservation] = List.empy[Reservation]

def reserveRoom(
  roomNumber: Int,
  from: Date,
  to: Date
): (Int, List[Reservation]) = {
  val id = generateId()
  (id, Reservation(roomNumber, from, to, id) :: reservations)
}

val (myReservation, reservationsUpdated) = reserveRoom(1, "1/8/16", "15/8/16")
reservations = reservationsUpdated

This way, if we repeated the exercise of substituting by a value, we wouldn’t be losing any information at all. And that’s all for today! See you soon 😉

Scalera tips: Transparencia referencial

Ya hemos hablado en otros post sobre algunos detalles de la programación funcional pura o sobre la importancia de evitar los efectos de lado siempre que sea posible.

Si investigamos un poco en el tema, es posible que nos encontremos con el concepto de transparencia referencial. Hoy vamos a ver a qué se refiere este concepto.

giphy

La transparencia referencial está fuertemente ligada con el modelo de sustitución. Si queremos utilizar Scala utilizando toda su potencia como lenguaje de programación funcional, es necesario que tengamos en mente este concepto. A pesar de tener un nombre algo raruno, es bastante fácil de entender. La transparencia referencial indica que una función de tipo E => S debería poder ser sustituda por un valor de tipo S sin que eso supusiera una pérdida de funcionalidad. De esta forma, podemos estar seguros de que no la función no tiene efectos de lado.

Efectos de lado: una función no tiene efectos de lado si solamente transforma los argumentos de entrada en los de salida, y no realiza absolutamente nada más. Conectarse con una base de datos, imprimir por pantalla, crear logs o modificar una variable fuera del scope de la función, se consideran efectos de lado.

Vamos a ver un ejemplo con código. En este ejemplo vamos a crear una función que permita reservar una habitación de hotel. La función actualizará la lista de reservas del hotel y además devolverá un identificador para el usuario:

var reservations: List[Reservation] = List.empy[Reservation]

def reserveRoom(roomNumber: Int, from: String, to: String): Int = {
  val id = generateId()
  reservations = Reservation(roomNumber, from, to, id) :: reservations
  id
}

val myReservation: Int = reserveRoom(1, "1/8/16", "15/8/16")

En este caso, estamos modificando una lista que está fuera del ámbito de la función. Esto es un efecto de lado. Por tanto, si realizamos una sustitución por un valor cualquiera perteneciente al dominio de la función realmente estamos perdiendo funcionalidad porque ya no se está añadiendo nada a la lista de reservas:

var reservations: List[Reservation] = List.empy[Reservation]

val myReservation: Int = "1" //Result of reserveRoom method

reservations.isEmpty //true....where is my reservation???

¿Cómo podríamos cumplir la transparencia referencial? Una opción sencilla sería devolver, tanto una nueva lista actualizada, como el identificador de la reserva:

var reservations: List[Reservation] = List.empy[Reservation]

def reserveRoom(
  roomNumber: Int,
  from: Date,
  to: Date
): (Int, List[Reservation]) = {
  val id = generateId()
  (id, Reservation(roomNumber, from, to, id) :: reservations)
}

val (myReservation, reservationsUpdated) = reserveRoom(1, "1/8/16", "15/8/16")
reservations = reservationsUpdated

De esta manera, si repetimos el ejercicio de sustituir por un valor, no perdemos información ¡Y ya hemos acabado! Hasta la próxima 😉

Spark: RDD basic operations

After more than a year publishing Scalera posts, I think the time for scratching one of the most important tools in Scala ecosystem has arrived. Of course, I’m talking about Spark.

Spark is a framework that allows parallelizing collections and their process. This process might be batch map-reduce tasks over different data sources (HDFS, Cassandra, ElasticSearch, …), streaming process of certain data flows (Kafka, Flume, …); or perform different data sources queries in some unified way with a query language like SQL. But that’s too much rock&roll. We’ll cover today the basic data type that is used in Spark: RDD.

What’s an RDD?

The RDD type (Resilient Distributed Dataset) looks like many other Scala collections. However, it’s important to get to know some of its main features:

  • It’s a distributed collection. This means that it’s partitioned among the different Spark nodes (known as workers).
  • They’re immutable: when you apply a transformation over an RDD, we’re actually creating a new one.
  • It’s lazily evaluated. With RDD’s, we’re just defining the information flow, but it won’t be evaluated at its definition moment, but at the moment when you apply an action over the RDD.funny-meme-super-lazy1

Besides, it’s good to know that you can perform two different type of operations on an RDD: transformations and actions.

Great, but how do I create them?

There are several ways to do so:

  • Parallelizing an in-memory collection, like a list of values. For doing so, we’ll use the parallelize method of the SparkContext.
    val newRDD: RDD[Int] = 
      sc.parallelize(List(1, 2, 3, 4))
    
  • From some data source using, for example, the textFile function of the SparkContext.
    val newRDD: RDD[Int] = 
      sc.textFile("myValues.txt")
    
  • Transforming an RDD by applying a transformation in order to create a new RDD from another one.
    val newRDD: RDD[String] = 
      intValues.map(_.toString)
    

What about that transformations stuff …?

Transformations define how the information flow will change by generating a new RDD. Some of these transformations are:

  • map: applies a function for transforming each collection element:
    intValues.map(_.toString) // RDD[String]
    
  • filter: select the subset of elements that match certaing boolean expression:
    intValues.filter(_.isOdd)// RDD[Int]
    
  • flatMap: apart from applying the map function, it flattens the returning collection:
    textFile.map(_.split(" ")) //RDD[Array[String]] but ...
    textFile.flatMap(_.split(" ")) //RDD[String]
    

You spoke about actions, didn’t you?

Actions will allow us to evaluate the RDD and return some result. This way, the whole defined data flow that represents the RDD is evaluated. Any example? Some of them:

  • count: it returns the total amount of elements:
    sc.parallelize(List(1, 2, 3, 4)).count //4
    
  • collect: it returns the WHOLE collection inside an in-memory array:
    sc.parallelize(List(1, 2, 3, 4)).collect // Array(1, 2, 3, 4)
    

    So, beware! If the RDD size doesn’t fit into the driver’s assigned memory, the program will crash.

  • saveAsTextFile: it pipes the collection to some text file:
    intValues.saveAsTextFile("results.txt")
    

 

So how can I apply all of these with Spark? We’ll find out the way in a week, with a practical case that will join both Twitter and Spark Streaming functionalities for performing some basic analytics in an easy, simple, g-rated way. So we’ll be able to make profit of it and feel both powerful and geek at the same time.

aad69873-6bcc-4a0d-84eb-abe375f34c6c

Spark: Operaciones básicas con RDD’s

Después de más de un año publicando entradas en Scalera creo que ya ha llegado el momento de tocar un poco una de las herramientas más importantes del ecosistema Scala. Por supuesto, me refiero a Spark.

Spark es un framework que permite paralelizar colecciones y su procesamiento. Este procesamiento pueden ser tareas batch tipo Map-Reduce sobre multitud de orígenes de datos (HDFS, Cassandra, ElasticSearch, …), procesamiento en streaming de ciertos flujos de datos (Kafka, Flume, …); o realizar consultas con distintas fuentes de datos de manera unificada con un lenguaje de consulta tipo SQL.
Pero hoy no vamos a abarcar tanto. Hoy vamos a conocer el tipo de dato básico que se utiliza en Spark: el tipo RDD.

¿Qué es un RDD?

El tipo RDD (Resilient Distributed Dataset) se parece mucho a otras colecciones de Scala. Sin embargo es importante conocer alguna de sus características principales:

  • Es una colección distribuida. Esto quiere decir que está particionada entre los distintos workers de Spark.
  • Son inmutables: cuando transformamos un nuevo RDD realmente estamos creando uno nuevo.
  • Su evaluación es perezosa. Con los RDD’s estamos definiendo un flujo de información, pero no se ejecuta en el momento de definición, sino en el momento en el que se evalúe aplicando una acción sobre el RDD.funny-meme-super-lazy1

Además, es importate saber que sobre los RDD’s se pueden realizar dos tipos de operaciones: transformaciones y acciones.

Genial, ¿pero cómo los creo?

Existen varias formas para crear un RDD:

  • Paralelizando una colección en memoria, por ejemplo, una lista de valores. Para ello utilizamos el método parallelize del SparkContext.
    val newRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4))
    
  • A partir de una fuente de almacenamiento utilizando, por ejemplo, la función textFile del SparkContext.
    val newRDD: RDD[Int] = sc.textFile("myValues.txt")
    
  • Transformando un RDD aplicando una transformación para crear un nuevo RDD a partir de otro.
    val newRDD: RDD[String] = intValues.map(_.toString)
    

¿Y eso de las transformaciones …?

Las transformaciones definirán como cambiará el flujo de información generando un nuevo RDD. Con estas transformaciones no se está evaluando el RDD, sino creando uno nuevo. Algunas de las transformaciones son:

  • map: aplica una función a cada elemento de la colección:
    intValues.map(_.toString) // RDD[String]
    
  • filter: selecciona el subconjunto de elementos que cumplen una determinada expresión booleana:
    intValues.filter(_.isOdd)// RDD[Int]
    
  • flatMap: además de realizar una función map, aplica un método flatten:
    textFile.map(_.split(" ")) //RDD[Array[String]] but ...
    textFile.flatMap(_.split(" ")) //RDD[String]
    

Habías dicho acciones, ¿no?

Las acciones nos permitirán evaluar un RDD y devolver un resultado. De esta forma se ejecuta todo el flujo de datos definido. ¿Algún ejemplo? Aquí van algunos:

  • count: nos devuelve el número total de elementos:
    sc.parallelize(List(1, 2, 3, 4)).count //4
    
  • collect: nos vuelca toda la colección distribuida en un array en memoria:
    sc.parallelize(List(1, 2, 3, 4)).collect // Array(1, 2, 3, 4)
    

    Ojo, cuidao. Si el RDD es muy grande, podemos tener problemas al volcar toda la colección en memoria.

  • saveAsTextFile: nos vuelca la información en un fichero de texto:
    intValues.saveAsTextFile("results.txt")
    

 

¿Y esto como lo puedo usar con Spark? Dentro de una semana lo veremos con un caso práctico que aunará Twitter con Spark Streaming para hacer analitycs de una forma fácil, sencilla y para toda la familia. De esta forma podremos sacarle todo el partido que queramos y sentirnos poderosos y geeks al mismo tiempo.

aad69873-6bcc-4a0d-84eb-abe375f34c6c

Scalera tips: default parameters and overloading

It’s time today for a short post which hopefully will help some of you in discovering a new world. Today we are talking about the problems that may arise due to overloading when we have default parameters.

Let’s start defining the basic concepts:

– Overloading: …really?
– Default parameters: in Scala, it is possible to define default parameters in methods. These parameters can be obviated in the method call. With an example:

def getUri(host: String = "localhost", port: Int = 8080): String =
"$host:$port"

getUri("127.0.0.1", 8081) //"127.0.0.1:8081"
getUri(port = 8081) //"localhost:8081"
getUri("127.0.0.1") //"127.0.0.1:8080"
getUri() //"localhost:8080"

Great… and what’s the problem with it?

Let’s create the following trait:

trait A {
  def a(a: Int, p: Boolean = false)
  def a(b: Boolean = false)
}

We now compile and…

giphy2

error: in trait A, multiple overloaded alternatives of method a define default arguments.

However, if we take away the default parameter in the first function:

trait A {
  def a(a: Int, p: Boolean)
  def a(b: Boolean = false)
}

and we compile …..

giphy1

everything works.

What’s the reason for all this mess?

The problem is that the compiler uses default parameters to generate the names of some auxiliary functions that will help in dealing with the methods with default values (we shouldn’t forget that we are still in the JVM). Let’s see the example that is shown in the documentation:

def f(a: Int = 1, b: String)
// generates a method: def f$default$1 = 1
f(b = "3")
// transformed to: f(b = "3", a = f$default$1)

As can be observed, a new function is generated and the call to the original method is overloaded by making use of this function.

After this, we can say that in our messy example, the two auxiliary functions that are created will have this name:

def a$default$1 = false

and this is why a name conflict will arise.

In the second case, given that there is no default parameter in the first method, only one new function will be generated and thus, no conflict will appear.

And this is how our post ends 🙂

Scalera tips : parámetros por defecto y sobrecarga

Hoy toca un post breve pero espero que descubra algo nuevo para algunos. Hoy hablaremos de los problemas que puede dar la sobrecarga al tener parámetros por defecto.

Vamos a empezar definiendo los conceptos básicos:

– Sobrecarga: …really?
– Parámetros por defecto: en Scala, es posible definir parámetros por defecto en los métodos. Estos parámetros nos permitirán obviar algunos parámetros en su llamada. Vamos a ver un ejemplo:

def getUri(host: String = "localhost", port: Int = 8080): String = "$host:$port"

getUri("127.0.0.1", 8081) //"127.0.0.1:8081"
getUri(port = 8081) //"localhost:8081"
getUri("127.0.0.1") //"127.0.0.1:8080"
getUri() //"localhost:8080"

Genial…y ¿cuál es el problema?

Vamos a crear el siguiente trait:

trait A {
  def a(a: Int, p: Boolean = false)
  def a(b: Boolean = false)
}

y ahora compilamos y…

giphy2

error: in trait A, multiple overloaded alternatives of method a define default arguments.

Sin embargo, si quitamos el parámetro por defecto de la primera función:

trait A {
  def a(a: Int, p: Boolean)
  def a(b: Boolean = false)
}

y compilamos …..

giphy1

todo va bien.

¿Cuál es la causa de este kilombo?

El problema es que el compilador utilizará los parámetros por defecto para generar los nombres de unas funciones auxiliares que utilizará para tratar los método con valores por defecto (no olvidemos que seguimos en la JVM). Veamos el ejemplo que aparece en la documentación:

def f(a: Int = 1, b: String)
// generates a method: def f$default$1 = 1
f(b = "3")
// transformed to: f(b = "3", a = f$default$1)

Como se puede observar, se genera una nueva función y se sobreescribe la llamada al método original haciendo uso de dicha función.

Después de esto, podemos decir que en el primer caso de nuestro kilombo, las dos funciones auxiliares creadas tendrán este nombre:

def a$default$1 = false

y por ello se producirá un conflicto de nombres.

En el segundo caso, al no existir un parámetro por defecto en el primer método, solo se generará una nueva función y, por tanto, no se producirá ningún conflicto.

Y hasta aquí el post 🙂