kworker: Library to use Workers on Multiplatform Kotlin 1.3

01 Jan 2019

I have spent a couple of days validating an idea I had: a portable Workers implementation that works on all Kotlin targets.

You can find the preview here: https://github.com/korlibs/kworker

I provide a couple of mechanisms to use it. In both cases you have to put some code in your main entry point, so that the workers do not execute the rest of your application.

The RAW API:

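suspend fun main(args: Array<String>) {
    WorkerFork({
        // Worker side: runs inside each worker and sends one reply per received message.
        while (true) {
            val message = recv()
            println("IN WORKER ${getWorkerId()} $message")
            send(WorkerMessage("reply", "demo"))
        }
    }, {
        // Main side: creates the workers and exchanges messages with them.
        val worker1 = Worker()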
        val worker2 = Worker()
        println("Sending messages")
        worker1.send(WorkerMessage("hello", "world"))
        worker1.send(WorkerMessage("hello", "world"))
        worker1.send(WorkerMessage("hello", "world"))

        val workerId = getWorkerId()
        println("IN MAIN $workerId ${worker1.recv()}")
        println("IN MAIN $workerId ${worker1.recv()}")
        println("IN MAIN $workerId ${worker1.recv()}")
        worker2.send(WorkerMessage("hello", "world"))
        worker2.send(WorkerMessage("hello", "world"))
        println("IN MAIN $workerId ${worker2.recv()}")
        println("IN MAIN $workerId ${worker2.recv()}")
    })
}

The Jobs API:

fun main(args: Array<String>) {
    JobsMainSync({
        registerXorJob()
    }, {
        val jobs = Jobs()
        val array = byteArrayOf(1, 2, 3)
        println(array.toList())
        println(array.xor(jobs).toList())
    })
}

fun JobsRegister.registerXorJob() = register(DemoXorJob)
suspend fun ByteArray.xor(jobs: Jobs): ByteArray = jobs.execute(DemoXorJob, arrayOf(this))[0] as ByteArray

object DemoXorJob : JobDescriptor {
    override suspend fun execute(args: Array<Any?>): Array<Any?> {
        val arg = args[0] as ByteArray
        val out = ByteArray(arg.size)
        for (n in out.indices) out[n] = (arg[n].toInt() xor 0x77).toByte()
        return arrayOf(out)
    }
}
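To make it easier to check the output of the Jobs example, here is a minimal sketch of the same transformation without any worker machinery. The helper name xor77 is made up for this illustration and is not part of kworker.

```kotlin
// Same byte transformation as DemoXorJob.execute, applied directly on the caller's thread.
// xor77 is a hypothetical helper name used only for this illustration.
fun xor77(input: ByteArray): ByteArray =
    ByteArray(input.size) { n -> (input[n].toInt() xor 0x77).toByte() }

fun main() {
    val array = byteArrayOf(1, 2, 3)
    val once = xor77(array)
    println(once.toList())        // [118, 117, 116]
    println(xor77(once).toList()) // [1, 2, 3], because XOR with the same key undoes itself
}
```

If the job runs correctly in a worker, the second println of the Jobs example should show the same [118, 117, 116].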

JVM

On the JVM one can just create a Thread and execute code there. But I wanted to go further and completely prevent data-sharing issues caused by statics and other global state.

To do this, you can fork a ClassLoader that shares only the platform classes, and then relaunch the application through it with some switches.

fun forkClassLoader(): ClassLoader {
    val originalClassLoader = Thread.currentThread().contextClassLoader
    return object : ClassLoader() {
        // Classes already resolved by this forked loader.
        private val classes = LinkedHashMap<String, Class<*>>()
        override fun loadClass(name: String): Class<*> {
            classes[name]?.let { return it }
            val clazz = try {
                // Platform (JDK) classes are shared. getPlatformClassLoader() needs Java 9+.
                ClassLoader.getPlatformClassLoader().loadClass(name)
            } catch (e: ClassNotFoundException) {
                // Everything else is redefined from its bytecode, so statics are not shared.
                val stream = originalClassLoader.getResourceAsStream(name.replace('.', '/') + ".class")
                    ?: throw ClassNotFoundException(name)
                val bytes = stream.use { it.readBytes() }
                defineClass(name, bytes, 0, bytes.size)
            }
            classes[name] = clazz
            return clazz
        }
    }
}
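To show what the fork buys you, here is a small self-contained sketch. It is not the code kworker uses: SharedCounter, IsolatingClassLoader and countersAfterFork are names made up for the demo, and the loader is a compact variant that sets the platform loader as parent and overrides findClass. It assumes Java 9+ and that the compiled classes are reachable as resources.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical holder of static state, used only for this demonstration.
object SharedCounter {
    @JvmField val hits = AtomicInteger(0)
}

// Compact variant of the forked loader: the platform loader is the parent, so JDK
// classes are shared, and every other class is redefined from its bytecode.
class IsolatingClassLoader(private val original: ClassLoader) :
    ClassLoader(ClassLoader.getPlatformClassLoader()) {
    override fun findClass(name: String): Class<*> {
        val stream = original.getResourceAsStream(name.replace('.', '/') + ".class")
            ?: throw ClassNotFoundException(name)
        val bytes = stream.use { it.readBytes() }
        return defineClass(name, bytes, 0, bytes.size)
    }
}

// Returns the counter as seen by the main copy and by a freshly forked copy of the class.
fun countersAfterFork(): Pair<Int, Int> {
    SharedCounter.hits.set(1)
    val forked = IsolatingClassLoader(SharedCounter::class.java.classLoader)
    val isolatedClass = forked.loadClass(SharedCounter::class.java.name)
    // AtomicInteger is a platform class, so the cast works across both loaders.
    val isolatedHits = isolatedClass.getField("hits").get(null) as AtomicInteger
    return SharedCounter.hits.get() to isolatedHits.get()
}

fun main() {
    val (mainCopy, forkedCopy) = countersAfterFork()
    println("main copy: $mainCopy, forked copy: $forkedCopy")
}
```

This should print "main copy: 1, forked copy: 0": the forked copy of the class has its own statics. Relaunching the application would then presumably be a matter of loading the main class through the forked loader and invoking its main method by reflection on a new Thread.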

JS

On JS there are two cases, Node.js and the browser, and each has a different API for this.

Node.js:

It has a cluster module, and a more recent but still unstable worker_threads module.

To play it safe I used the cluster module. It forks the process and provides an IPC mechanism, so the main process and the workers can communicate. The same code and entry point are executed in every process, so you have to check with an if whether you are in a worker, and then execute either the worker code or the main code.
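The branching described above can be sketched in Kotlin/JS roughly like this. It is only an illustration of the cluster pattern, not the code inside kworker, and it assumes the program runs on Node.js with CommonJS modules so that require is available. Everything goes through dynamic, so nothing is type checked.

```kotlin
fun main(args: Array<String>) {
    // Assumption: Node.js with CommonJS, so `require` and `process` exist as globals.
    val cluster: dynamic = js("require('cluster')")
    val nodeProcess: dynamic = js("process")

    if (cluster.isMaster == true) {
        // Main process: fork a worker that runs this same entry point again.
        val worker = cluster.fork()
        // Wait until the worker is running before sending the first message.
        worker.on("online", { worker.send("hello") })
        worker.on("message", { message: dynamic ->
            println("main got: $message")
            worker.kill()
        })
    } else {
        // Worker process: answer every message received over the IPC channel.
        nodeProcess.on("message", { message: dynamic ->
            nodeProcess.send("reply to $message")
        })
    }
}
```

Both branches live in the same main function, which is why the library needs a hook in your entry point.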

Browser:

On the browser there is the Worker API. You give it a JS file/URL to load, and it establishes an IPC mechanism for communication. Data is copied, so nothing is shared. (SharedWorker is not widely supported yet, and it is not implemented in this first PoC.)

I’m using URL.createObjectURL + Blob to load an entry point. Then I check for require.js and a data-main attribute to determine what I have to load in the Worker. If they are not available, I simply load in the worker all the scripts that have a src.
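The Blob trick can be sketched in Kotlin/JS as follows. This is a simplified illustration, not the actual kworker bootstrap: the worker source here is a hard-coded echo script made up for the example, whereas the library has to load the application scripts in its place.

```kotlin
import org.w3c.dom.Worker
import org.w3c.dom.url.URL
import org.w3c.files.Blob
import org.w3c.files.BlobPropertyBag

fun main(args: Array<String>) {
    // Hypothetical worker entry point: echoes every message back to the page.
    val source = "self.onmessage = function (e) { self.postMessage('reply to ' + e.data); };"

    // Wrap the source in a Blob and expose it through a temporary object URL.
    val blob = Blob(arrayOf<dynamic>(source), BlobPropertyBag(type = "text/javascript"))
    val url = URL.createObjectURL(blob)

    val worker = Worker(url)
    worker.addEventListener("message", { event ->
        // The payload arrives as a copy in the `data` property of the event.
        println("main got: ${event.asDynamic().data}")
        worker.terminate()
        URL.revokeObjectURL(url)
    })
    worker.postMessage("hello")
}
```

Going through a Blob means no separate worker file has to be deployed next to the page.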

Native

On Native I was not able to get it working because of several mutability issues. For now I provide a fallback that runs on the same thread. I will update it once I figure out how to do it properly.