scalaz streams and how to think about streams

Update: I've created an electronic book to collect together scalaz-stream user notes: My scalaz-streams User Notes

Working on data streaming (many problems can be cast as data streams) is hard. Controlling synchronous and asynchronous behavior easily and simply requires frameworks and code that are often unfamiliar to most programmers, and hence it's hard to write the code while still retaining simplicity.
Scalaz Streams (labelled sstreams in this article) helps you manage complexity by providing a few fundamental abstractions. But I found the abstractions hard to use at first because I was not used to thinking in the model that sstreams uses.
sstreams casts the problem as a state machine. There are 3 states and a "driver" that iterates through the states. Each state carries with it enough information to move to the next state. Each state is a "one step process" and so all states derive from the Process trait.
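To make the state-machine picture concrete, here is a minimal sketch in plain Scala. It is a toy model, not the library's actual definitions: the names Step, drive and doubler are made up for illustration, and inputs are fixed to Int to keep it short.

```scala
// Toy model only: the names mirror the article's vocabulary, not scalaz-stream's real code.
object ToyStateMachine {
  sealed trait Step[+O]
  // Emit carries the values to output plus the state to move to next.
  case class Emit[+O](values: Seq[O], next: Step[O]) extends Step[O]
  // Await carries a function that, given an input, produces the next state.
  case class Await[+O](receive: Int => Step[O]) extends Step[O]
  // Halt means there is nothing more to do.
  case object Halt extends Step[Nothing]

  // The "driver": walks the states, feeds inputs to Await and collects what Emit outputs.
  @annotation.tailrec
  def drive[O](step: Step[O], inputs: List[Int], acc: Vector[O] = Vector.empty): Vector[O] =
    step match {
      case Emit(values, next) => drive(next, inputs, acc ++ values)
      case Await(receive) =>
        inputs match {
          case head :: tail => drive(receive(head), tail, acc)
          case Nil          => acc // no more input, so the driver stops
        }
      case Halt => acc
    }

  // A machine that awaits an input, emits its double, then awaits again.
  def doubler: Step[Int] = Await[Int](i => Emit(Seq(i * 2), doubler))
}
```

Each state holds just enough to reach the next one, and only the driver actually moves anything. For example, `ToyStateMachine.drive(ToyStateMachine.doubler, List(1, 2, 3))` walks Await, Emit, Await, Emit and so on until the inputs run out.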
The level of abstraction is pretty high, which means that the framework should be applicable to a highly diverse set of problems. I have used Spring Integration. I found that framework hard to use as well because I was not used to an event-driven approach. sstreams ups the level of abstraction. If you are not a strong functional programmer, it will take time to learn because you will be learning different programming approaches and integrating them. For example, you'll be combining asynchronous/concurrent programming, event-driven programming and state machine thinking all at the same time. That's a lot of knowledge you need to be fluent in to program using sstreams or even Spring Integration.

Working with sstreams

You can create a small sbt file and open a console to play with sstreams and understand what it is doing.
Here's my build.sbt file:
scalaVersion := "2.10.4"

resolvers += "Scalaz Bintray Repo" at ""

libraryDependencies ++= Seq(
  "org.scalaz" %% "scalaz-core" % "7.0.1",
  "org.typelevel" %% "scodec-bits" % "1.0.0",
  "org.scalaz" %% "scalaz-concurrent" % "7.0.1",
  "" %% "scalaz-stream" % "0.3.1"
)
Then run sbt console to start a console. Then import the objects into your namespace so you can use them easily.
scala> import scalaz._, Scalaz._, stream._, Process._, process1._, scalaz.concurrent._
import scalaz._
import Scalaz._
import stream._
import Process._
import process1._
import scalaz.concurrent._

Now you're ready to use the objects.


Process is the superclass of the 3 core states: Await, Halt and Emit. A process needs to know the environment that the external world exists in as well as the type of object that is passing through the states, for example, string values from a list of customer names or a list of account balances from a monthly customer summary at a bank.
scala> Process.emit(10)
res2:[Nothing,Int] = Emit(List(10),Halt($End$))

scala> Process.emitSeq(Seq(10,11,12))
res11:[Nothing,Int] = Emit(List(10, 11, 12),Halt($End$))

scala> halt
res12:[Nothing,Nothing] = Halt($End$)
The simplest thing we can do is emit a series of values coming from an in-memory sequence. emit describes a state that would emit a single value and emitSeq describes a state that would emit a sequence of values, one at a time. The emit* functions come from the companion object of Process, and we see that the "state" created is the "Emit" state. This should be interpreted as: the stream is ready to emit a single value, and the next state that should be set in the state machine is the "Halt" state. Hence, once a driver is applied, it will emit a single value, then go into the halt state, and the streaming will be complete. We also create a "halt" state, which indicates that streaming should be halted. Unless you provide information otherwise, the emit functions assume that the next step after the emit is processed by a state machine driver is a halt, and hence it's the end of the stream.
However, we see that the F parameter (Process takes 2 types, an environment/protocol called F and the stream value, such as an Int in this example) is Nothing. By itself, the process instance (an instance of a subclass of the trait Process) cannot be run to the next step because a "driver" would not be able to know which protocol to use to run the process.
Also, if we did use a driver to take this current process to the next step, where would the output values go? How would we see them? In other words, where would the "emitted" values go? If there is no place to "collect" or "log" the values, we would not see any output, just the movement to the next step, which is halt. We should also ask how we should manage concurrency. What if the sequence of values was really coming from a network connection and values were only available infrequently, without any known schedule? These are the types of concerns that require us to compose this emit process instance with other processes to achieve a real-world effect.
First let's address the need to "collect" the emitted values. Let's say that we have a process that emits a value or sequence of values. If we want to "collect" the values, we would want to create another process that collects those values. However, to do that, we would need a data structure, like a vector, that waits for the input (an await state) from the upstream process, appends (adds) the newly emitted value to its internal vector data structure, then returns to the await state again. In other words, we would need a general data structure that has a binary associative operator called add/plus and that can be initialized as an empty data structure (called the zero value), so it has a default value when the process is used for the first time. A data structure that has a zero and a binary associative operation like "add" is called a monoid. So we would need a Process that can act on a monoid. A vector is a monoid, so a Process instance with a monoid inside would allow us to collect the values. So for one model, say logging the outputs from running the processes, we could use the monoid concept as the collection data structure. However, we have not specified the environment in which that monoid is being accessed. Is the use of the monoid async or sync?
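The monoid idea above can be sketched in a few lines of plain Scala. This is a hypothetical, hand-rolled Monoid trait for illustration only (scalaz ships its own Monoid type class, which is not used here), and collect is a made-up helper name.

```scala
object ToyMonoid {
  // A monoid: an empty ("zero") value plus an associative way to combine two values.
  trait Monoid[A] {
    def zero: A
    def append(a: A, b: A): A
  }

  // Vector[Int] forms a monoid: zero is the empty vector, append is concatenation.
  val intVectorMonoid: Monoid[Vector[Int]] = new Monoid[Vector[Int]] {
    def zero: Vector[Int] = Vector.empty
    def append(a: Vector[Int], b: Vector[Int]): Vector[Int] = a ++ b
  }

  // Int under addition is also a monoid, so the same collector can sum instead of log.
  val intSumMonoid: Monoid[Int] = new Monoid[Int] {
    def zero: Int = 0
    def append(a: Int, b: Int): Int = a + b
  }

  // "Collecting": start from zero and append each emitted value as it arrives.
  def collect[A, B](emitted: Seq[A])(wrap: A => B)(m: Monoid[B]): B =
    emitted.foldLeft(m.zero)((acc, a) => m.append(acc, wrap(a)))
}
```

Because the collector only relies on zero and append, swapping the monoid changes what "collecting" means without touching the collector itself.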
Let's dig deeper on the environment/protocol F[_] type parameter to Process.
If a value should be added to a vector in the "collect" process, should it be added asynchronously, synchronously, or should it use some other "protocol" or "environment"? So for the "collect" process, it's pretty clear that we need to specify an environment as well as the monoid to collect values into. What's needed for the environment/protocol?
If we wanted to add values using the monoid asynchronously, then we need the F environment to be some type of task/thread that performs a function inside that task, for example, a task that adds a single value using the monoid binary operation "add." In other words, we need to have a function operating in a task environment. In functional programming, this is referred to as the "map" function. "Map" lifts a function into an environment and runs it. This allows the programmer to not care about the environment itself, but just that the function you want to run (the "add" operation using the monoid) can be lifted using the map function. The use of the "map" concept is the protocol. And since we are talking about "map" and we most likely need to create an empty environment at times as well as perform other functions, the environment should be a monad. sstreams assumes that those environments are monads. So the environment could be a scalaz "Task" or some other monad that represents your environment, your protocol, for performing operations. The operations could be adding the emitted value to the monoid, or "getting" the value from the sequence and emitting it asynchronously.
So now think about emitting values like in the example above. If we want to emit values, those values need to be emitted in a certain way, say, asynchronously. So the environment could be something like a task: the task accesses the initial sequence you pass into the "emitSeq" function, rips off a value on each iteration through the process, and offers up that value as the next value to be emitted. To promote the "ripped" value from the sequence into the environment, it needs a "unit" function (also called "point" or "return"). In functional programming, a unit function takes a value and puts it into the environment correctly, while "bind" (flatMap in Scala) chains one step in the environment to the next. Just like map lifts a function to operate on the values in an environment, unit lifts a value into the environment. The need for a monad as the environment parameter makes a lot of sense.
So the F environment needs to be specified even if you are just emitting simple values, like the list of integers in the example. You need an environment, and that environment should be a monad.
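As a rough illustration of what such an environment has to provide, here is a minimal sketch of a monad-like environment in plain Scala. Env is a hypothetical stand-in invented for this example, not scalaz's Task: it only defers a computation and does nothing asynchronous.

```scala
object ToyEnvironment {
  // A hypothetical, minimal "environment": a deferred computation that yields an A when run.
  final class Env[A](thunk: () => A) {
    def run: A = thunk()
    // map lifts a plain function so it operates on the value inside the environment.
    def map[B](f: A => B): Env[B] = new Env[B](() => f(thunk()))
    // flatMap (bind) sequences a second environment-producing step after this one.
    def flatMap[B](f: A => Env[B]): Env[B] = new Env[B](() => f(thunk()).run)
  }

  object Env {
    // unit (also called point) lifts a plain value into the environment.
    def unit[A](a: => A): Env[A] = new Env[A](() => a)
  }

  // Lift two values into the environment, then combine them with an ordinary function.
  val collected: Env[Vector[Int]] =
    for {
      first <- Env.unit(10)
      rest  <- Env.unit(Vector(11, 12))
    } yield first +: rest
}
```

Nothing is computed until `ToyEnvironment.collected.run` is called. The for-comprehension is just flatMap followed by map, which is why an environment with these operations is enough to both fetch values and add them to a collection.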

Running Your Process

Our example above can now be modified a bit. Let's create an emit process that uses a Task environment. And then use a Process function, such as runLog that takes a process instance and creates some machinery to create a monoid to collect the output.
scala> val x: Process[Task, Int] = Process.emit(10)
x:[scalaz.concurrent.Task,Int] = Emit(List(10),Halt($End$))

scala> x.runLog
res29: scalaz.concurrent.Task[scala.collection.immutable.IndexedSeq[Int]] = scalaz.concurrent.Task@19a12f24

res30: scala.collection.immutable.IndexedSeq[Int] = Vector(10)
We define x to be a process with a Task environment. All we needed to do was specify the environment type. Since Task is a monad, the framework can create a default task instance as needed to perform the emitting. The "runLog" function takes the initial process and forces the emitted values to be collected into a monoid; in this case, the default monoid in the framework is an IndexedSeq (a Vector in Scala).
The "run*" functions, like "runLog", do not return a process. They return the environment/protocol that you have specified, and that monad needs to have a way for you to execute the process. In this case, runLog returns a scalaz Task. A scalaz Task is run using "run", which is what the last command does.
So having 2 "run*" functions in a row makes sense. The first one (or something similar) sets up the process to allow the processing to produce a side effect of some sort, such as filling up a vector with your emitted sequence values, and the last run is what your protocol needs in order to execute, in this case a scalaz Task with a standard run method.
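The split between "describe the work" and "execute the work" can be sketched without the library. In this toy example, Deferred and runLogLike are hypothetical names standing in for a task and for something runLog-like; the counter exists only to show when the work actually happens.

```scala
object ToyRun {
  // Hypothetical stand-in for a task: nothing happens until run is called.
  final class Deferred[A](thunk: () => A) {
    def run: A = thunk()
  }

  // Counts how many times the collecting work was actually performed.
  var executions: Int = 0

  // Stand-in for a runLog-like step: it describes collecting the values but does not do it yet.
  def runLogLike(values: Seq[Int]): Deferred[Vector[Int]] =
    new Deferred[Vector[Int]](() => {
      executions += 1
      values.toVector
    })
}
```

Calling `ToyRun.runLogLike(Seq(10))` only builds the description, and the counter stays where it was. The values are collected when `run` is called on the result, which mirrors the two steps in the REPL session above.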
As a side note, since Process takes an F[_] parameter, you can change your environment/protocol using flatMap, so as you build up your set of processes to create your flow, your value types can change as well as the environments.

You can read the entire post here: github
