zio streams + scala.js: example of designing a streaming db API

nodejs has a streams API. However, if you are using zio with scala.js, you may want to use zstreams instead, which means adapting your APIs. The content below was inspired by a recent zio streams training video by Itamar: https://www.youtube.com/watch?v=XIIX2YSg7M0

While scala.js is used below on nodejs, scala.js conforms to scala so well that, as always, you cannot really tell it’s javascript except for the use of javascript APIs.

The example below adapts a javascript streaming API to scala.js and zio’s zstreams. It makes certain tradeoffs to reduce implementation time to a few minutes (the time it takes to write a blog).

Generally, since it’s javascript, we don’t need to worry about parallelism. Ideally, we would write something whose pattern could be moved over to the jvm or another platform where parallelism is possible.

Here are some good resources on zio streams:

mssql

The mssql database connector allows a nodejs application to access SQL Server. The mssql API can run off callbacks (CPS style) or js.Promises (monadic style). With modern ECMAScript, you can await the promises so the syntax is a bit less heavy. It’s tricky to ensure you have caught all errors with this type of API: some js APIs that return a promise can also throw an exception instead of returning a failed js.Promise because the function was incorrectly written. Ugh! I had to put in two different types of error management just for that one function call.

The mssql streaming interface is described at https://www.npmjs.com/package/mssql#streaming. Essentially, it looks like:

const sql = require('mssql')

sql.connect(config, err => {
  // ... error checks
  const request = new sql.Request()
  request.stream = true // You can set streaming differently for each request

  // ******** this returns a js.Promise[] that completes when "done" encountered *********
  request.query('select * from verylargetable') // or request.execute(procedure)

  request.on('recordset', columns => {
    // Emitted once for each recordset in a query
  })
  request.on('row', row => {
    // Emitted for each row in a recordset
  })
  request.on('error', err => {
    // May be emitted multiple times
  })
  request.on('done', result => {
    // Always emitted as the last one
  })
})

sql.on('error', err => {
  // ... error handler
})

The streaming API calls a function based on the event type. We need to adapt this library to scala.js, which I’ve done in a facade library available at https://github.com/aappddeevv/scalajs-reaction. We now need to use that facade to integrate with zio zstreams.

“Error” and “recordset” can be issued multiple times because you can return more than one recordset per “request”, for example, by including two select queries in “query” or calling query multiple times after the prior query has completed to run queries in succession.

Each select query could return a differently shaped row. You would normally expect errors and doneness to be rare events and the “row” event to be the most frequent event to manage. An “error” or “done” can be signalled while you are still processing rows if your downstream processing is slower than upstream insertion. If the stream is interrupted as soon as it gets a done or error event, you could miss processing a few data elements. We need to let the downstream process as much as is available.

Some events are not documented in the mssql docs page. For example, info and rollback events can also be emitted.

A consumer can pause streaming using an imperative style:

let rowsToProcess = [];

request.on('row', row => {
  rowsToProcess.push(row);
  if (rowsToProcess.length >= 15) {
    request.pause();
    processRows();
  }
});
request.on('done', () => {
  processRows();
});
function processRows() {
  // process rows
  rowsToProcess = [];
  request.resume();
}

Backpressure appears to be handled imperatively (don’t forget to call resume()!) and you can only get 1 row at a time; you can’t ask for n rows at a time. But appearances are deceiving. I address backpressure in a later section.

We are going to make a big assumption around the API in order to avoid building a full-blown state machine inside a zstream. We will assume that any query issued returns the same value type regardless of recordset, perhaps a complex sum-type, and event types are limited to those listed above excluding “info” and “rollback”. Streaming will be supported for queries or executing stored procedures. Again, to keep it simple, our API won’t enforce these restrictions in the types themselves since they cannot be enforced in any practical, ergonomic way.

zstreams

zio zstreams are effects that emit multiple values instead of a single value.

From the javascript example above it’s clear we need to use some type of queue on the scala side to buffer results, since downstream processing could be slower than upstream fetching. The pause API helps with a single-threaded flavor of chunking and caching, both of which zio queues support. If we have a zio queue, we can easily create a zio stream. However, it’s not so obvious how to handle different types of events.

  • If events can happen prior to previous results being fully processed, do we want to stop (interrupt) all processing immediately if there is an error? Or should we let processing on data that is available continue as much as possible?
  • Should both domain data and “control instructions” (such as “error”) be put into a single queue with some type of uber sum-type, that is, a type that can represent all three different types of events/values?
  • If we allocate any resources processing this stream, how do we free them?
  • How might we get access to the “schema” from the recordset event type prior to processing the data?

These are important design questions and greatly affect the exposed API. We’ll just pick one way to address these questions in this blog but you may use a different design when dealing with another API.

First, let’s think about linearizing the control and data information, which is essentially what the mssql API does. Let the following symbols represent:

  • A or B = Domain data
  • D = Done control (data on processing of all recordsets, e.g. recordcounts)
  • R = Recordset control (schema information)
  • E = Error control

Let’s say our stream is 5 elements (“A”) long.

Single recordset streams could look like:

  • R A A A A A D
  • R A A A A A E
  • R E
  • E (very common if your sql string is ill-formed)

The net result of this analysis is that an error is not necessarily a function of the data stream itself. Although some errors can occur during a data stream, other errors occur before the data starts streaming. The mssql error type, MSSQLError, is the supertype of all mssql errors.

Multi-recordset streams could look like:

  • R A A A A A D
  • R A A A A A E
  • R A A A A A D R B B B B B D
  • R A A A A A D E

It’s a bit tougher when we issue multiple queries and obtain multiple recordsets back.

Practically speaking, when designing a solution with zio streams, or really any streaming API, you need to specify:

  1. How to handle errors and the different places where errors can occur.
    • There could be errors from the “source” itself (in this case mssql), and there could be other internal errors, which should be considered defects.
  2. How to stop the stream once upstream inputs complete either explicitly or implicitly.
  3. How to handle backpressure when the downstream says “gimme a break!”

The tricky part about using FP streams like ZIO streams or fs2 is that a stream is a general-purpose data structure that we can map or flatMap into. For example, using FP thinking, we start a stream and flatMap into it to produce another stream with the domain value, or a failed Stream, etc. It might seem strange to immediately flatMap into a new stream when the outer stream only produces one value, but that’s the type of mental model you need.

In FP land, a stream type is just a value. Similar to an Option or a List, we can flatMap into a stream type efficiently. In this way, a complete stream can be created by adding together or layering individual little streams, and the library takes care of flattening the layered streams and outputting the data as a sequence of values. Thinking of streams this way is very different from the way java or other libraries present streams but it allows libraries to create designs that are much easier to reason about.
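To make the layering idea concrete, here is a minimal sketch that uses the scala standard library’s LazyList (Scala 2.13+) as a stand-in for a stream type. It is not zio code, and the names queries and rows are hypothetical; the only point is how flatMap flattens little streams into one sequence.

```scala
// Plain scala standard library (2.13+), no zio. All names are illustrative.
object FlatMapStreams {
  // Hypothetical "outer" stream: one element per query we want to run.
  val queries: LazyList[String] = LazyList("q1", "q2")

  // Hypothetical "inner" stream: the rows that a single query produces.
  def rows(query: String): LazyList[String] =
    LazyList(s"$query-row1", s"$query-row2")

  // flatMap layers one little stream per outer element and flattens them.
  val all: LazyList[String] = queries.flatMap(rows)

  def main(args: Array[String]): Unit =
    println(all.toList) // List(q1-row1, q1-row2, q2-row1, q2-row2)
}
```

The same shape applies when the outer stream only ever produces one value: the flatMap still hands back a single flattened stream of the inner values.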

In our case, we also want to process as many prior results as possible so we need to inline “control” and “data” similar to what is done with zio streams and treat the mssql stream as an ordered, linear sequence of “tokens” some of which are data elements and some of which are “control” instructions. Our API implementation can use this to “imperatively” capture specific elements, like the schema or the results object, and provide an easy-to-use API.

Part of being “easy to use” is that the stream emitting data only contains our domain objects and not the “control” instructions. Otherwise, the stream consumers must perform their own “async/imperative” programming to sort out the details. Our approach pulls a downstream “match” statement inside the API. The consumer only needs to worry about data and how to compose streams to use various mssql artifacts if they so want. For example, for the API consumer, if a stream fails, it is because the zio ZStream fails through its E (error channel), not because they received an Either with a left error as a data element.
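A minimal sketch of pulling the “match” inside the API, written with plain scala collections rather than zio. The Token type, its cases, Outcome and drain are all hypothetical names invented for illustration. They mirror the R / A / E / D symbols above and assume the stream ends at the first error or done token and that the first schema seen wins.

```scala
// Plain scala, no zio. Every name here is hypothetical.
object TokenDemo {
  sealed trait Token[+A]
  final case class Recordset(schema: String) extends Token[Nothing] // R
  final case class Row[A](value: A)          extends Token[A]       // A or B
  final case class Failed(message: String)   extends Token[Nothing] // E
  final case class Done(rowCount: Int)       extends Token[Nothing] // D

  // What a consumer sees: data only, with control information on the side.
  final case class Outcome[A](
    schema: Option[String],
    rows: Vector[A],
    result: Either[String, Option[Int]] // Left = error, Right(Some(n)) = done
  )

  // Walk the linear token sequence, keeping every row seen before E or D.
  def drain[A](tokens: List[Token[A]]): Outcome[A] = {
    @annotation.tailrec
    def loop(rest: List[Token[A]], acc: Outcome[A]): Outcome[A] = rest match {
      case Nil                  => acc
      case Recordset(s) :: tail => loop(tail, acc.copy(schema = acc.schema.orElse(Some(s))))
      case Row(a) :: tail       => loop(tail, acc.copy(rows = acc.rows :+ a))
      case Failed(m) :: _       => acc.copy(result = Left(m))
      case Done(n) :: _         => acc.copy(result = Right(Some(n)))
    }
    loop(tokens, Outcome[A](None, Vector.empty, Right(None)))
  }

  def main(args: Array[String]): Unit = {
    // The "R A A E" shape from the single recordset list above.
    val tokens: List[Token[String]] =
      List(Recordset("LastName, FirstName"), Row("a"), Row("b"), Failed("boom"))
    println(drain(tokens))
  }
}
```

The rows that arrived before the error are still delivered, while the error itself is reported on the side instead of as a data element.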

Quick Version

Using the simplifying assumptions above, we can add some, not perfect, protection around a mssql Request. The only key issue is that since we need to adapt Request, we have to move up the mssql object chain and start with a ConnectionPool.

While ZStream.effectAsync might be useful here (not available in RC21 for scala.js but it’s coming), we will roll our own queue and event handling. Also, effectAsync has extra machinery in it to handle the ability to push chunks into the stream from the stream source. We can’t do that; we only get 1 element at a time. So let’s keep it simple and perhaps reduce processing overhead slightly.

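import scala.scalajs.js
import zio._
import zio.stream._
// Request, ConnectionPool and SQLType come from the mssql facade (import not shown).

@js.native
trait BaseResult extends js.Object {
  val recordCounts: js.Array[Int] = js.native
}

@js.native
trait Schema extends js.Object {
  // schema details for a recordset
}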

/** Control sequencing of when streaming starts using this Request helper. */
case class ProducerQuery(val build: Seq[Request => Request]=Seq(identity)) { self =>
  def input(name: String, t: SQLType, v: Any) = ProducerQuery(build :+ (r => r.input(name, t, v)))
  def sql(qstr: String) = new ProducerQueryCompleted(build, qstr)
}

class ProducerQueryCompleted(val build: Seq[Request => Request], val qstr: String) { 
  def apply(request: Request) = Function.chain(build)(request).query(qstr)
}

/** mssql results promise and stream */
class DataInfo[S,R,E,A](
  val schema: Promise[Nothing, S],
  val results: Promise[Nothing, R],
  val stream: Stream[E, A]
)

case class StreamingOptions(
  qsize: Int = 1024,
)

case class Producer(cp: ConnectionPool, options: StreamingOptions = StreamingOptions()) {
  type R = BaseResult
  type E = js.Error
  type S = Schema
  def query[A](f: ProducerQuery => ProducerQueryCompleted): Stream[Throwable, DataInfo[S,R,E,A]] = {
    type D = Either[Option[E], A]
    for {
      rts <- ZStream.fromEffect(IO.runtime)
      q <- ZStream.managed(Queue.bounded[D](options.qsize).toManaged(_.shutdown))
      pq <- ZStream.succeed(f(ProducerQuery(Seq(identity))))
      // install https://github.com/oleg-py/better-monadic-for to allow tuple on the left
      (results, schema) <- ZStream.fromEffect(Promise.make[Nothing, R] &&& Promise.make[Nothing, S])
      request <- ZStream {
        val request = cp.request()
        request.stream = true
        // these request calls can all throw
        request.on("error", (e: E) => rts.unsafeRunAsync_(q.offer(Left(Option(e)))))
        request.on("row", (a: A) => rts.unsafeRunAsync_(q.offer(Right(a))))
        request.on("done", (d: R) => rts.unsafeRunAsync_(results.succeed(d) *> q.offer(Left(None))))
        request.on("recordset", (s: S) => rts.unsafeRunAsync_(schema.succeed(s)))
        request
      }
      stream = ZStream.fromQueue(q)
        .takeWhile(_ != Left(None))
        .flatMap {
          case Right(a)      => Stream.succeed(a)
          case Left(Some(e)) => Stream.fail(e)
          case _             => Stream.empty
        }
      // start streaming by setting the query
      _ <- ZStream(pq(request))
    } yield new DataInfo[S,R,E,A](schema, results, stream)
  }
}

Use like:

for {
  dbPool <- dbPoolInit
  producer = Producer(dbPool)
  _ <- producer.query[js.Dynamic](
         _.sql("select LastName, FirstName from contacts")
       ).flatMap(_.stream)
        .foreach(r => debug("db row: %O", r)) // debug is an effect
        .mapError(jserr => new RuntimeException(jserr.message))
} yield ()

The outer stream and managed resources can fail with a Throwable, which reflects standard scala exception handling. The actual data stream can fail with a js.Error since it’s mssql generating the data and its API throws subclasses of js.Error. That means our for-comprehension, in this case, has a js.Error error channel. Depending on your final effect’s error channel, you may have to adapt it via .mapError. When you flip between error channel types (can’t wait for dotty to help with this), you tend to see a lot of explicit error handling, which of course is part of the point of static typing.

Multiple Recordsets

We’ll have to save returning multiple typed recordsets for another day. At the moment our element type could be a sum-type, although that’s less than ideal and adds extra processing overhead to each element to split them out. Since there is nothing that can be done to force a query string to contain only 1 statement, it’s probably not worth the effort of trying to handle multiple recordsets. One could write an entire SQL DSL to force only one statement, but that’s too much work for the benefit. Let’s leave this issue for another day.

Back Pressure Woes In a Pull Model

Let’s look at the original backpressure mechanism again:

let rowsToProcess = [];

request.on('row', row => {
  rowsToProcess.push(row);
  if (rowsToProcess.length >= 15) {
    request.pause();
    processRows();
  }
});
request.on('done', () => {
  processRows();
});
function processRows() {
  // process rows
  rowsToProcess = [];
  request.resume();
}

The original Request.pause() and Request.resume() methods appear to address back pressure. Calling pause pauses lower level driver processing. Of course, this is javascript so if a particular item requires significant synchronous processing, e.g. calculating a sum or processing a string, the driver is not running anyway.

If an item is being processed asynchronously, then effectively we are doing cooperative concurrency as the driver will eventually get a processing slice. zio’s ZQueue.offer returns a boolean indicating whether the item was added or not. A “dropping” queue may drop new items if the queue is full and return false. If we use a bounded queue, the fiber is “suspended” if your queue is full but the offer eventually succeeds so it always returns true.

But how does “offer” always return true with a bounded queue? The “bounded queue” backpressure “strategy” has an unbounded queue inside it that holds an ordered list of “putters.” Hence, our implementation above has 2 queues: 1 queue that roughly helps smooth out downstream processing and another “putter” queue to handle puts from the source. The reality is that we cannot use “pause” and “resume” based on information from the “offer” method using a bounded queue.
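The two-queue idea can be sketched with a toy, single-threaded model. This is not zio’s implementation; ToyBoundedQueue and its members are hypothetical and only illustrate why the source never learns that the buffer is full.

```scala
import scala.collection.mutable

// Toy model only; it does not reflect how zio implements its queues.
object PutterQueueDemo {
  final class ToyBoundedQueue[A](capacity: Int) {
    private val items   = mutable.Queue.empty[A] // the bounded buffer
    private val putters = mutable.Queue.empty[A] // unbounded list of parked offers

    /** Never rejects: overflow is parked in `putters`, so the caller sees no "full" signal. */
    def offer(a: A): Unit =
      if (items.size < capacity) items.enqueue(a) else putters.enqueue(a)

    /** Taking an item frees a slot, which the oldest parked putter then fills. */
    def take(): Option[A] =
      if (items.isEmpty) None
      else {
        val a = items.dequeue()
        if (putters.nonEmpty) items.enqueue(putters.dequeue())
        Some(a)
      }

    def buffered: Int = items.size
    def waiting: Int  = putters.size
  }

  def main(args: Array[String]): Unit = {
    val q = new ToyBoundedQueue[Int](2)
    (1 to 5).foreach(q.offer)
    println(s"buffered=${q.buffered} waiting=${q.waiting}") // buffered=2 waiting=3
  }
}
```

In this model memory use is bounded only in the first queue. A fast source can still grow the putter list without limit, which is why “offer” alone gives us nothing to hang pause and resume on.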

Let’s think about using a “dropping” queue internally, call pause when something would be dropped and keep trying to add the item using some schedule. When the offer succeeds, resume would be called. We would need to use a “schedule” to retry the offer. This also requires the API consumer to guarantee that calling pause means that no more “row” events will be issued.

But it is not clear that this can be guaranteed in a parallel environment. Hence, this method will not work across both the jvm and js platforms without a lot more work that may be wasted engineering time.

Why does this seem so hard?

If we look at the above example code again we see that there is a “processRows()” call. In other words, “processRows()” is “pushing” the information that there is data to process. But zio is a “pull” stream model. That’s also why ZStream.effectAsync will not help us much, as it will have the same problem.

Let’s try something!

To keep the problem easy and to allow us to control memory usage, which is a real problem for large datasets, we’ll just check the queue size when we make an offer on a bounded queue, pause if our queue is at capacity and resume if it’s not. We’ll use a Ref to keep track of pause state. That means the internal, unbounded backpressure queue inside the bounded queue will be used very little. That’s good enough for what we need. We’ll add a little capacity check so that this approach is not always burning CPU cycles at the edge of the queue size.

Something like the following implements a pseudo “stop-and-wait” protocol:

  request.on("row", (a: A) => rts.unsafeRunAsync_(
    for {
      qsize <- q.size
      qcap = q.capacity
      p <- paused.get
      _ <- if (!p && qsize > 0.9 * qcap)
             UIO.succeed(request.pause()) *> paused.set(true) *> q.offer(Right(a))
           else if (p)
             UIO.succeed(request.resume()) *> paused.set(false) *> q.offer(Right(a))
           else q.offer(Right(a))
    } yield ()
  ))

However, this won’t work either because if we pause when a “row” event is issued, then we never get another row event to “resume”. Since we cannot attach an “event handler” to see when data is pulled from the queue, we have no mechanism to signal to resume source processing other than to schedule a “resume” on the clock, which is not a very precise, resource-smart, back-pressury way to do things. The mssql API is not helpful in a pull-based model.
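The stall can be reproduced with a small simulation in plain scala. ToySource is a hypothetical stand-in for a push source such as the mssql Request, assuming it emits no rows while paused; it is not the real API.

```scala
import scala.collection.mutable

// Simulation only: ToySource is an invented stand-in, not the mssql Request.
object PauseDeadlockDemo {
  final class ToySource(total: Int) {
    private var next   = 0
    private var paused = false
    var onRow: Int => Unit = _ => ()

    def pause(): Unit  = paused = true
    def resume(): Unit = paused = false

    /** Push rows until paused or exhausted; returns how many were emitted in this call. */
    def run(): Int = {
      var emitted = 0
      while (!paused && next < total) {
        val row = next
        next += 1
        emitted += 1
        onRow(row)
      }
      emitted
    }
  }

  /** Same handler logic as the text: pause near capacity, resume on the next row event. */
  def simulate(total: Int, capacity: Int): (Int, Int) = {
    val source = new ToySource(total)
    val queue  = mutable.Queue.empty[Int]
    var paused = false
    source.onRow = { row =>
      if (!paused && queue.size > 0.9 * capacity) { source.pause(); paused = true }
      else if (paused) { source.resume(); paused = false }
      queue.enqueue(row)
    }
    source.run()
    // Downstream drains everything, but nothing tells the source to resume.
    val consumed = queue.dequeueAll(_ => true).size
    val emittedAfterDrain = source.run()
    (consumed, emittedAfterDrain)
  }

  def main(args: Array[String]): Unit =
    println(simulate(total = 100, capacity = 10)) // (11,0)
}
```

With 100 rows and a capacity of 10, the source emits 11 rows, pauses, and then emits nothing more even after the queue is empty, because the only code that could call resume lives in a handler that no longer fires.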

In fact, even if we set the request to stream and then unsubscribe/resubscribe to the row event, we cannot do backpressure without dropping or introducing a schedule (which then is highly data-specific).

These issues remind me of something on the reactive streams site:

https://www.reactive-streams.org/:

The benefits of asynchronous processing would be 
negated if the backpressure signals were synchronous

Ouch!

Maybe I have this all wrong.

So while there are some things we could do, it’s not clear it’s worth the engineering cost.

Life’s not perfect!
