vertically scaling nodejs: dotty + graaljs + zio

You can scale horizontally or vertically. Horizontal scaling usually means running multiple separate processes, either on the same server or on different servers. It is relatively complex to scale horizontally if you need to share state. Engineers often scale horizontally and decouple different data “domains” into micro-services, both of which add engineering complexity. Cloud compute services and frameworks like Kubernetes help you scale horizontally, but potentially at a large increase in complexity.

You can also scale vertically, that is, use more of the cores on a single machine, which usually means multiple threads. Multi-threaded programming is harder than single-threaded programming. Python and nodejs are loved by developers because they make the compute environment simpler and hence the programming simpler. nodejs supports asynchronous programming via ES async/await syntax or js Promises, both of which are harder to use than straight procedural code. Still, most developers have found asynchronous programming to be within easy grasp.

If we are running in a simple environment like nodejs, we could use workers to improve performance. However, workers have some limitations that reduce their usefulness. Communication between the main nodejs thread and workers uses a shared-nothing design based on “messages.” There is nothing wrong with that, but if we wish to vertically scale nodejs processing, we can do a bit better.
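
To make the shared-nothing point concrete, here is a minimal sketch that runs on stock nodejs (no graaljs needed). It uses a MessageChannel from worker_threads, which follows the same message-passing rules as a worker: values are copied when posted, and functions cannot be posted at all. The variable names are only for illustration.

```javascript
// Runs on stock Node.js 12.3+; no GraalVM required.
const { MessageChannel, receiveMessageOnPort } = require('worker_threads');

const { port1, port2 } = new MessageChannel();

// 1. Messages are copied (structured clone), not shared.
const original = { count: 1 };
port1.postMessage(original);
original.count = 2; // mutate the sender's copy after posting

// receiveMessageOnPort reads one queued message synchronously.
const received = receiveMessageOnPort(port2).message;
console.log(received.count); // the receiver sees the value as it was when posted

// 2. Functions cannot be cloned, so a js callback cannot be posted directly.
let cloneError = null;
try {
	port1.postMessage(() => console.log('hello'));
} catch (e) {
	cloneError = e;
}
console.log(cloneError !== null); // posting a function throws

port1.close();
```

This is why sharing live state between plain nodejs workers takes extra machinery, and it is the gap the jvm-based approach below tries to close.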

We can use zio running on the jvm to scale a nodejs process quite easily using graal’s nodejs replacement, graaljs.

Design

The design is:

  • nodejs setup:
    • Create a jvm queue to communicate between different threads and the nodejs thread.
      • Pass the queue to each jvm interop call or set it into the polyglot bindings.
    • Create a “worker” from the main nodejs thread. The worker listens to a jvm queue and blocks waiting for messages.
    • When a message comes on the jvm queue, the worker posts the message to the main nodejs thread.
  • Call the jvm function through the graaljs interop mechanism.
  • Start a separate zio runner thread for running the zio effect via the zio runtime.
  • If access to nodejs variables is needed, fork the zio effect onto an ExecutionContext that posts the work to the jvm queue, so it runs on the nodejs thread.
  • Send the result of the interop call, computed in zio, to nodejs via a js Promise.
  • Nodejs awaits the result and continues processing.
  • nodejs exit:
    • Ensure that you stop the worker through termination or unref’ing.

This approach lets you run a nodejs process, such as a web server, and scale it vertically on the jvm with little engineering effort. You don’t have to create a micro-service or rewrite the server.

Nodejs

Nodejs needs a jvm queue and a worker.

/** Returns objects related to the queue and worker. */
function startChannel() {
	// queue.take() blocks the calling thread, so it must run in a worker thread.
	const javaToJSQueue = new java.util.concurrent.LinkedBlockingDeque();
	const { Worker } = require('worker_threads');
	// code running in the worker
	const worker = new Worker(`
		const { workerData, parentPort } = require('worker_threads');
		while (true) {
			// Blocks the worker thread taking 1 element. We could enhance this
			// to take N callbacks at a time.
			const runnable = workerData.take();
			// Send to the listener on the main thread to run the runnable.
			// Since it is a "message", it is inserted into the main event loop.
			// You can change the message type to add different "run" protocols
			// specific to your application e.g. { type: "STOP" } or { type: "RUN", callback }.
			parentPort.postMessage(runnable);
		}
	`, { eval: true, workerData: javaToJSQueue });
	// callbacks run on the main nodejs thread
	worker.on('message', (callback) => {
		try {
			callback();
		} catch (e) {
			console.error("Error running callback from JVM");
			console.error(e);
		}
	});
	Polyglot.export("javaToJSQueue", javaToJSQueue);
	Polyglot.export("javaToJSQueueWorker", worker);
	return {
		queue: javaToJSQueue,
		worker
	};
}

const channel = startChannel();

// main program logic...
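
The comment in the worker code mentions richer message types such as { type: "STOP" } or { type: "RUN", callback }. A minimal sketch of what the main-thread listener could look like under that protocol follows. The names makeDispatcher and dispatch, the stop function, and the exact message shapes are hypothetical; the stand-in objects let the sketch run on stock nodejs without a jvm queue.

```javascript
// Hypothetical message shapes: { type: 'RUN', callback } and { type: 'STOP' }.
// `stop` is whatever should happen on STOP, e.g. () => worker.unref().
function makeDispatcher(stop) {
	return function dispatch(message) {
		switch (message.type) {
			case 'RUN':
				// Same error handling as the plain listener above.
				try {
					message.callback();
				} catch (e) {
					console.error('Error running callback from JVM');
					console.error(e);
				}
				break;
			case 'STOP':
				stop();
				break;
			default:
				console.error(`Unknown message type: ${message.type}`);
		}
	};
}

// Usage with stand-ins instead of a real worker and jvm queue.
const log = [];
const dispatch = makeDispatcher(() => log.push('stopped'));
dispatch({ type: 'RUN', callback: () => log.push('ran') });
dispatch({ type: 'STOP' });
console.log(log);
```

In the real setup the dispatcher would be registered with worker.on('message', dispatch), and the jvm side would have to enqueue objects of the matching shape.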

Now we can call our jvm functions. Because we used dotty and defined the functions at the top level of a package, dotty places them in a synthetic object (example6$package), so we have some funky names to navigate through to call a function:

const arg1 = { a: 1 }
const arg2 = "arg2"
const arg3 = 10

const result2 = Packages.example6.example6$package.run2(arg3, arg1)
console.log("Result2: ", result2)

const result3P = Packages.example6.example6$package.run3(arg3, arg1, channel.queue)
// Normally we would unref only at the end of the program; here we sequence it
// after this one result.
result3P
	.then(result => console.log("Result3: ", result))
	.catch(err => console.log("error", err))
	.then(() => channel.worker.unref())
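
The same sequencing can be written with async/await so that the worker is released whether the call succeeds or fails. This is only a sketch: withChannel is a hypothetical helper, and the fake channel and the async function standing in for run3 are stand-ins so the snippet runs on stock nodejs.

```javascript
// Hypothetical helper: run `work` with the channel, then always unref the worker.
async function withChannel(channel, work) {
	try {
		return await work(channel);
	} finally {
		// Runs on success and on failure, like the trailing .then() above.
		channel.worker.unref();
	}
}

// Stand-ins: a fake worker that records unref calls and a fake jvm call.
const released = [];
const fakeChannel = { queue: [], worker: { unref: () => released.push('unref') } };
const done = withChannel(fakeChannel, async () => 'completed')
	.then(result => console.log('Result3: ', result))
	.catch(err => console.log('error', err));
```

With the real channel, the second argument would be something like (ch) => Packages.example6.example6$package.run3(arg3, arg1, ch.queue).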

dotty

On the jvm side we can use dotty and zio.

Let’s first create an effect that accesses a nodejs variable and, because it runs on a separate thread, fails by design. graaljs interop automatically converts js values when you declare a jvm type for the parameter and the conversion can succeed; otherwise, you can keep the value as a “passthrough” by typing it as Value. Since a converted value is a jvm value, it can be freely accessed. Primitive values are easy to pass this way.

We also show the main zio runtime running on the nodejs thread! That can’t be good!

// Imports used by the jvm-side snippets in this post (zio 1.x API).
import zio._
import zio.console._
import scala.concurrent.{ExecutionContext, Future}
import org.graalvm.polyglot.{Context, Value}

def run2(primitive: Int, value: Value) =
	println("run2")
	println(s"args before effect, on same thread: $primitive, $value")
	val program: ZIO[Console, Throwable, Unit] =
		for {
			_ <- putStrLn("starting effect")
			// fork will cause this effect to potentially run on a different thread
			fiber <- IO.effect {
				println(s"Primitive value: $primitive")
				// accessing value here will cause a thread access exception
				println(s"Value: $value")
			}.fork
			_ <- fiber.join
		} yield ()
	// Running on the same thread as nodejs!
	Runtime.default.unsafeRunSync(program) match {
		case Exit.Success(r) => println("never called")
		case Exit.Failure(e) =>
			// IllegalStateException about multi-threaded access
			val err = e.failureOption
			println(s"Failed: ${err}")
	}
	// the main effect value is not returned to the caller!
	"completed"

Ouch!

run2
args before effect, on same thread: 10, {a: 1}
starting effect
Primitive value: 10
Failed: Some(java.lang.IllegalStateException: 
Multi threaded access requested by thread 
Thread[zio-default-async-1-2097989776,5,zio-default-async]
but is not allowed for language(s) js.)
Result2:  completed

Now let’s add some infrastructure:

/** Convert a scala Future to a Value which is a js Promise.
* We could drop the using Context parameter and call `Context.getCurrent()` directly.
* However, we keep the given parameters in order to force the recognition of the
* need for these two parameters. The execution context should execute on the same
* thread the Context was created on.
*/
def [A](f: Future[A]).toJSPromise(using ec: ExecutionContext, ctx: Context) =
	val global = ctx.getBindings("js")
	val p = global.getMember("Promise")
	def executor(resolve: Value, reject: Value) =
		f.onComplete:
			case scala.util.Success(v) => resolve.execute(ctx.asValue(v))
			// err is throwable, which polyglot can translate automatically
			case scala.util.Failure(e) => reject.execute(e)
	// PromiseExecutor is not shown in this post; it is assumed to be a
	// functional interface taking (resolve, reject), defined elsewhere.
	val n = p.newInstance(executor: PromiseExecutor)
	n

/** Should only be called on the nodejs thread. Typically this is the
* thread you are on upon entering a JVM function called from nodejs.
*
* @param queue Queue used by a js worker to run callbacks in the nodejs thread.
* @param nodeJSThread The nodejs thread. Uses the current thread by default.
*/
def makeEC(
	queue: java.util.concurrent.LinkedBlockingDeque[Runnable],
	nodeJSThread: Thread = Thread.currentThread
) =
	import java.util.concurrent._
	ExecutionContext.fromExecutor(
		new Executor:
			def execute(command: Runnable) =
				// Already on the nodejs thread: run inline. Otherwise hand the
				// command to the queue so the js worker posts it to the nodejs thread.
				if Thread.currentThread == nodeJSThread then command.run()
				else queue.add(command)
	)

Now we can do something more proper and scalable:

/** Succeeds as the fork forks onto the nodejs thread. */
def run3(
	primitive: Int,
	value: Value,
	queue: java.util.concurrent.LinkedBlockingDeque[Runnable]
) =
	println("run3")
	implicit val ec2 = makeEC(queue)
	implicit val context = Context.getCurrent

	println(s"args before effect, on same thread: $primitive, $value")
	queue.add(new Runnable { def run(): Unit = println("Run from JVM but on nodejs thread.") })
	// effect is an immutable value and can be created on any thread
	val effect: ZIO[Console, Throwable, Unit] =
		for {
			_ <- putStrLn("starting effect")
			fiber <- IO.effect {
				println(s"Value accessed inside zio effect but forked on nodejs thread: $value")
			}.forkOn(ec2)
			_ <- fiber.join
		} yield ()
	val program = effect

	// We can get the value back to nodejs different ways, one way
	// is to use scala concurrent machinery.
	val scalaPromise = scala.concurrent.Promise[String]()

	// We don't want the main zio processing to run on the nodejs thread and
	// block it, so run it inside another thread. We could also create
	// a separate java Executor and submit it that way, etc. Or even run
	// it inside a scala Future :-).
	Thread{() =>
		queue.add(new Runnable {
			def run(): Unit = println("Called from inside the new 'main' zio runner thread.")
		})
		Runtime.default.unsafeRunSync(program) match {
			case Exit.Success(r) => scalaPromise.success("completed")
			case Exit.Failure(e) =>
				val err = e.failureOption
				println(s"Failed: ${err}")
				scalaPromise.failure(new RuntimeException("failed"))
		}
		// needed for "java" type inference
		()
	}.start()

	// and convert the scala Promise "future" to a js Promise
	scalaPromise.future.toJSPromise

Yeah, it works!

run3
args before effect, on same thread: 10, {a: 1}
starting effect
Run from JVM but on nodejs thread.
Called from inside the new 'main' zio runner thread.
Value accessed inside zio effect but forked on nodejs thread: {a: 1}
Result3:  completed
