Scala Jet Streams *

Sometimes, it’s usual that obtaining a method’s result which is, in fact, a collection; this collection has a huge size and besides, the process of computing it is really harsh or slow. On the other side, if we’re not going to process all its composing elements or if the collection is not finite (numerical series), we cannot use conventional structures to hold these functionality.

For example, let’s supose we’re trying to implement a near-real-time weather system . We can picture the query process like the typical infinite loop:


case class Weather(
  tempCelsius: Float, 
  humidity: Double)

def checkWeather(): Weather = {
  /* Some request to a web service 
   * or access some weather station device.
   * We'll mock it.
   */
   import scala.util.Random
   Weather(
     Random.nextFloat() * 100 % 40,
     Random.nextDouble() % 1)
}

type WeatherHandler[T] = Weather => T

val handler: WeatherHandler[Unit] = println

while(true){
  handler.apply(checkWeather())
}

Or we can assume that weather data is just an infinite collection of precise discrete data elements.


val weather: TraversableOnce[Weather] = ???

for (value <- weather) handler.apply(value)

This way, we could also change the nature of that collection by using a simple map for getting only Celsius temperatures, for example:

val extractTemperature: WeatherHandler[Float] = 
  _.tempCelsius

val onlyTemperatures: TraversableOnce[Float] = 
  weather.map(extractTemperature)

Let’s see which other ways we can use to achieve such behavior.

Iterators

A first approach could be implementing Iterator[+A]:

object weather extends Iterator[Weather]{

  def hasNext() = true

  def next(): Weather = checkWeather()

}

Given that our iterator will not certainly finish, the value of hasNext() is always true. Every time we invoke method next(), we will obtain a new value Weather-typed (this may involve invoking some web service or a request to a weather measure device).

Easy peasy. However, we can think about another kind of Scala collections, known as Streams…

Streams

We can think of Streams as sorted collections whose values are lazy evaluated. For example, if we describe the numerical serie that define integer numbers we will have that:

val integers: Stream[Int] = {

  def loop(v: Int): Stream[Int] = 
    v #:: loop(v + 1)

  loop(0)

}

As you can see, method #:: is used for adding an initial element to an existing Stream. If we run:

integers.head
integers.take(2).toArray

we will get both first stream’s element and two first elements.

We can invoke from this Stream every single method that apply to Scala collections (Traversable):

integers
  .filter(_ % 2 == 0)
  .take(5)
  .toArray //Array(0, 2, 4, 6, 8)

integers
  .takeWhile(_ < 5)
  .toArray //Array(0, 1, 2, 3, 4)

Look out! Until we invoke method toArray, streams that result of applying transformations to initial stream(filter, take, …) are not evaluated. Only we retrieving concrete values, transformation methods will be applied.

Now we have it, let’s use it with our weather service:

val weather: Stream[Weather] = {

  def loop(): Stream[Weather] = 
    checkWeather() #:: loop

  loop

}

We will get a Stream that will have an inner method loop() that defines the way of evaluating the Stream (invoking method checkWeather()).

And that’s all folks 🙂
Fear us, Al Roker!

Anuncios

Responder

Introduce tus datos o haz clic en un icono para iniciar sesión:

Logo de WordPress.com

Estás comentando usando tu cuenta de WordPress.com. Cerrar sesión / Cambiar )

Imagen de Twitter

Estás comentando usando tu cuenta de Twitter. Cerrar sesión / Cambiar )

Foto de Facebook

Estás comentando usando tu cuenta de Facebook. Cerrar sesión / Cambiar )

Google+ photo

Estás comentando usando tu cuenta de Google+. Cerrar sesión / Cambiar )

Conectando a %s