Use async to manage a rust program inside a web worker

I would really appreciate any discussion that helps me learn to code better in rust/wasm, or in general. If you have suggestions, please point them out. Thanks.

Intro

I have always been fascinated by how rust/wasm and web workers can be used together. For people with a short memory span like me, it’s also nice that async/await keeps the context of an asynchronous task readable in one place.

After some experiments, I’ve started to think that it might not be a bad idea to offload blocking/network tasks to a web worker. By offload I mean instantiating the rust/wasm module inside the worker script.

It would be even nicer if we could do some task management inside the wasm module, so that the main thread can decide whether a task should keep going, be suspended or resumed, or whether new tasks should be spawned.

Let’s make this happen.

Goal

Since not all workflows are born equal, before we start, let’s create some limitations so we can focus on a smaller scope.

  1. Create a running service inside the rust/wasm module, instantiated in the worker script.
  2. Be able to suspend/resume a task from the main thread.
  3. Be able to cancel a task from the main thread.

Implementation

This example is built with the following toolchain and crates:

  1. Nightly rust
  2. futures-preview(0.3) with features “async-await”, “nightly” enabled
  3. wasm-bindgen-futures with feature “futures_0_3” enabled
  4. web-sys with features “Window”, “Worker”, “WorkerGlobalScope” enabled

Send msg to rust/wasm

After instantiating our rust program inside the worker script, the first thing we need to consider is how to pass messages from the main thread to the rust program. In js programs, messages are typically handled in the onmessage handler, so we can start from there.

This can be done by creating a receiving method in the rust program.

use wasm_bindgen::prelude::*; // brings in #[wasm_bindgen] and JsValue

#[wasm_bindgen]
pub fn recv_msg(msg: JsValue) {
  // handle msg here.
}

Then import the module and listen for incoming messages:

// The import path depends on the loader/bundler you use.
import { module } from "Cargo.toml";

self.onmessage = e=> {
  module.recv_msg(e.data);
}

Create a non blocking rust service.

Although we’re now able to send messages to wasm/rust, what we actually wanted in the first place is to let the rust program work on its own.

This can be done by creating a future that embeds a never-ending loop and yields every so often, so it won’t block the whole thread. This approach is very similar to what a python programmer would do:

while True:
  await asyncio.sleep(0)
  # ...

This polling is somewhat necessary, because condvar or similar primitives are currently not implemented in rust/wasm, so we cannot resume a future externally. To make a task reactive anyway, we can use an inner flag that marks it as runnable; when the flag is off, the task simply skips its work and yields again, so it won’t block other executions.

pub async fn looper() {
  loop {
    // yield to the event loop so other js/rust tasks can run
    timer(0).await.unwrap();
    // ...
  }
}

The timer here is actually a setTimeout promise converted to a rust future, so that we can await it inside our rust program.

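use js_sys::Promise;
use wasm_bindgen::{JsCast, JsValue};
use wasm_bindgen_futures::futures_0_3::JsFuture;
use web_sys::{window, WorkerGlobalScope};

/// Regular timer: setTimeout is provided by the window object (main thread only).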
pub async fn timer(ms: i32) -> Result<(), JsValue> {

    // create a js promise using js_sys::Promise
    let promise = Promise::new(&mut |yes, _| {
        let win = window().unwrap();
        win.set_timeout_with_callback_and_timeout_and_arguments_0(&yes, ms)
            .unwrap();
    });

    // create a rust future from the promise
    let js_fut = JsFuture::from(promise);

    // now it can be awaited like other futures, nice!
    js_fut.await?;
    Ok(())
}

/// Worker timer: setTimeout is provided by WorkerGlobalScope.
/// This is necessary because a worker has no access to window.
pub async fn worker_timer(ms: i32) -> Result<(), JsValue> {
    let promise = Promise::new(&mut |yes, _| {
        let global = js_sys::global();
        let scope = global.dyn_into::<WorkerGlobalScope>().unwrap();
        scope
            .set_timeout_with_callback_and_timeout_and_arguments_0(&yes, ms)
            .unwrap();
    });
    let js_fut = JsFuture::from(promise);
    js_fut.await?;
    Ok(())
}

So now we can instantiate our rust program; it will not block, and the rest of the js can run.

#[wasm_bindgen] // exported, so the worker script can call it
pub fn spawn_service() {
  spawn_local(looper());
}

Receiving messages won’t be blocked by rust.

import { module } from "Cargo.toml"

// spawning the task as a promise, so we can proceed with the following
module.spawn_service();
self.onmessage = e=> {
  // ...
}
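
If you want to poke at the yield-every-iteration idea without a browser, here is a minimal std-only sketch. Everything in it is hypothetical scaffolding and not part of the worker code above: `YieldNow` stands in for `timer(0)`, `block_on_counting` is a toy executor, and this `looper` is bounded so that it terminates.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `timer(0)`: pending on the first poll, ready on the second.
struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            return Poll::Ready(());
        }
        self.yielded = true;
        // Ask to be polled again, then hand control back to the executor.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

fn yield_now() -> YieldNow {
    YieldNow { yielded: false }
}

/// A waker that does nothing; the toy executor below simply re-polls in a loop.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: polls until the future is ready and reports how many polls it took.
fn block_on_counting<F: Future>(fut: F) -> (F::Output, usize) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (out, polls);
        }
    }
}

/// Bounded version of `looper`: yields before each of `rounds` units of work.
async fn looper(rounds: u32) -> u32 {
    let mut done = 0;
    for _ in 0..rounds {
        yield_now().await;
        done += 1;
    }
    done
}
```

Driving `looper(3)` with `block_on_counting` takes four polls: one per yield, plus the final one that completes. Every return to the executor is a point where something else could have run, which is the property the timer-based loop relies on in the worker.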

Non blocking producer

A dummy loop does no one any good, so let’s replace it with something better. We can pass the worker instance to the rust program directly, so that rust can decide when to call back to the main thread via post_message.

pub async fn timely_call(worker: web_sys::Worker) {
  let worker = worker.clone();
  loop {
    // use the worker timer here: window() is not available inside a worker
    worker_timer(1000).await.unwrap();
    worker.post_message(&JsValue::from("anything convertable")).unwrap();
  }
}

#[wasm_bindgen] // exported, so the worker script can call it
pub fn spawn_service(worker: web_sys::Worker) {
  let worker = worker.clone();
  spawn_local(timely_call(worker));
}

Now the rust program can send messages to the main thread periodically.

Can sending/receiving be non-blocking too?

In the previous part, we enabled the program to send without blocking, but it still lacks the ability to receive messages. We can improve on this with futures channels, so that all messages are stored in a queue.

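use futures::{channel::mpsc, lock::Mutex}; // async-aware Mutex, so lock() can be awaited
use std::rc::Rc;

#[wasm_bindgen]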
pub struct TaskHolder {
    input_rx: Rc<Mutex<mpsc::Receiver<JsValue>>>,
    input_tx: mpsc::Sender<JsValue>,
    output_rx: Rc<Mutex<mpsc::Receiver<JsValue>>>,
    output_tx: mpsc::Sender<JsValue>,
}

#[wasm_bindgen]
pub fn run() -> TaskHolder {

  // make sure you impl that method under TaskHolder
  TaskHolder::new()
}

If we want to let js issue commands, we have to expose our rust program as a js object, so that js can mutate rust data via method calls.

// let's make method call accessible under both rust/js
#[wasm_bindgen]
impl TaskHolder {
  pub fn catch_msg(&self, msg: JsValue) {
    let mut sender = self.input_tx.clone();
    spawn_local(async move {
        sender.send(msg).await.unwrap();
    });
  }
}

And now we can call the method from js too.

// now messages are queued in the struct's channel, hence non-blocking
let holder = module.run()
self.addEventListener("message", e => {
  holder.catch_msg(e.data)
})

This only sends data to the queue; our rust program hasn’t started processing or producing any results just yet.

Implementing the consumer/producer loop

use futures::{sink::SinkExt, stream::StreamExt};
// this is needed, so we can use next for stream and send to sink as an awaitable future.

impl TaskHolder {
  pub fn init_service(&self, worker: web_sys::Worker) {

    // This future will listen to all incoming msgs in the incoming queue.
    let input_rx = self.input_rx.clone();
    // clone the sender half (not the receiver) so we can forward to the output queue
    let mut output_tx = self.output_tx.clone();
    let input_loop = async move {
        loop {
            // only lock it when we want to listen to it.
            let mut unlocked_rx = input_rx.lock().await;
            if let Some(msg) = unlocked_rx.next().await {
                info!("{:?}", &msg);
                output_tx.send(msg).await.unwrap();
            }
        }
    };

    // This future will produce all results to the output queue
    let output_rx = self.output_rx.clone();
    let output_loop = async move {
        loop {
            let mut unlocked_rx = output_rx.lock().await;
            if let Some(msg) = unlocked_rx.next().await {
                info!("{:?}", &msg);
                // do something about the data
                worker.post_message(&msg).unwrap();
            }

        }
    };

    // run them concurrently.
    let combined = futures::future::join(input_loop, output_loop);

    // run the combined future
    spawn_local(async {
        combined.await;
    });
  }
}

At this point it becomes clear that, by using queues internally, we are able to create a running service whose input and output are both non-blocking.
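
To see just the data flow of the two queues without wasm, here is a rough synchronous stand-in. It assumes std’s `sync_channel` in place of the futures channel and plain strings in place of JsValue; the `Pipeline` type, `pump_input` and the "echo" transformation are made up for illustration.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

/// Hypothetical, synchronous stand-in for TaskHolder: two bounded queues.
struct Pipeline {
    input_tx: SyncSender<String>,
    input_rx: Receiver<String>,
    output_tx: SyncSender<String>,
    output_rx: Receiver<String>,
}

impl Pipeline {
    fn new(capacity: usize) -> Self {
        let (input_tx, input_rx) = sync_channel(capacity);
        let (output_tx, output_rx) = sync_channel(capacity);
        Pipeline { input_tx, input_rx, output_tx, output_rx }
    }

    /// Like catch_msg: enqueue without blocking; returns false if the queue is full.
    fn catch_msg(&self, msg: &str) -> bool {
        self.input_tx.try_send(msg.to_string()).is_ok()
    }

    /// One non-blocking pass of the input loop: forward queued messages to the output queue.
    /// Note: if the output queue is full, the message in hand is dropped and the pass stops.
    fn pump_input(&self) -> usize {
        let mut moved = 0;
        while let Ok(msg) = self.input_rx.try_recv() {
            if self.output_tx.try_send(format!("echo: {msg}")).is_err() {
                break;
            }
            moved += 1;
        }
        moved
    }

    /// One non-blocking pass of the output loop: collect everything ready to be posted.
    fn drain_output(&self) -> Vec<String> {
        self.output_rx.try_iter().collect()
    }
}
```

With a capacity of 2, a third `catch_msg` is rejected until `pump_input` has made room. In the async version the sender awaits instead of failing, which is why `catch_msg` there wraps the send in `spawn_local`.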

Offload tasks to another self-controlled inner loop.

There are many ways to extend this. For example, we can create another task loop that reacts to the incoming messages, creates some sub-tasks, and eventually submits the results to the outgoing loop. The version below adds a running flag to TaskHolder for this purpose.

#[wasm_bindgen]
impl TaskHolder {
    pub fn new() -> Self {
        let (input_tx, input_rx) = mpsc::channel::<JsValue>(100);
        let (output_tx, output_rx) = mpsc::channel::<JsValue>(100);
        TaskHolder {
            running: Rc::new(Mutex::new(true)),
            input_rx: Rc::new(Mutex::new(input_rx)),
            input_tx,
            output_rx: Rc::new(Mutex::new(output_rx)),
            output_tx,
        }
    }

    pub fn catch_msg(&self, msg: JsValue) {
        let mut sender = self.input_tx.clone();
        spawn_local(async move {
            sender.send(msg).await.unwrap();
        });
    }

    pub fn init_holder(&self, worker: web_sys::Worker) {
        let fut = {
            let running = self.running.clone();
            let input_rx = self.input_rx.clone();
            let input_loop = async move {
                loop {
                    let mut unlocked_rx = input_rx.lock().await;
                    if let Some(msg) = unlocked_rx.next().await {
                        info!("{:?}", &msg);
                        let mut running = running.lock().await;
                        *running = !*running;
                    }
                }
            };

            let worker = worker.clone();
            let output_rx = self.output_rx.clone();
            let output_loop = async move {
                loop {
                    let mut unlocked_rx = output_rx.lock().await;
                    if let Some(msg) = unlocked_rx.next().await {
                        worker.post_message(&msg).unwrap();
                    }
                }
            };

            let running = self.running.clone();
            let mut output_tx = self.output_tx.clone();
            let task_loop = async move {
                loop {
                    utils::worker_timer(0).await.unwrap();
                    if *running.lock().await {
                        output_tx.send(JsValue::from("hello")).await.unwrap();
                        utils::worker_timer(1000).await.unwrap();
                    }
                }
            };
            let combined = futures::future::join3(input_loop, output_loop, task_loop);
            async {
                combined.await;
            }
        };

        spawn_local(fut);
    }
}

#[wasm_bindgen]
pub fn run() -> TaskHolder {

    set_panic_hook();
    console_log::init().unwrap();
    TaskHolder::new()
}

The example above starts another interval loop that only does its work while the running flag is set, while input and output stay non-blocking too. You can implement your own suspend/resume mechanics by matching on the incoming msg.
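
As a small sketch of what matching on the incoming msg could look like, assume the main thread posts plain strings; inside input_loop you would first turn the JsValue into a string, for example with `msg.as_string()`. The `Command` enum and the message names "pause", "resume" and "toggle" are made up for this example.

```rust
/// Hypothetical commands the main thread may send to the worker.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Command {
    Pause,
    Resume,
    Toggle,
}

/// Parse the text of an incoming message; unknown messages yield None.
fn parse_command(text: &str) -> Option<Command> {
    match text.trim() {
        "pause" => Some(Command::Pause),
        "resume" => Some(Command::Resume),
        "toggle" => Some(Command::Toggle),
        _ => None,
    }
}

/// Compute the next value of the running flag from the current one.
fn apply_command(running: bool, cmd: Option<Command>) -> bool {
    match cmd {
        Some(Command::Pause) => false,
        Some(Command::Resume) => true,
        Some(Command::Toggle) => !running,
        // Unknown messages leave the task state alone.
        None => running,
    }
}
```

Inside input_loop, the line `*running = !*running;` would then become something like `*running = apply_command(*running, cmd);`, so that an unexpected message no longer flips the flag.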

Potential use cases.

While I haven’t tried any of these, I think you can use this setup to offload other tasks too.

For example, when the user scrolls past a certain region of your page, spawn a fetch future (converted from the fetch promise) to get new posts, and cancel it by invoking an AbortHandle kept inside TaskHolder if the user is no longer interested in that direction.

Or maybe a pathfinder in a game that tries to compute the paths a user might take while the user is moving (hence sending msgs to the worker), and suspends the search when the user reaches places where no interaction is possible.
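
To illustrate the cancellation idea from the fetch example, here is a stripped-down, std-only imitation of the mechanism behind `futures::future::abortable`. It is not the futures crate’s implementation: the names `Cancellable`, `CancelHandle` and `poll_once` are hypothetical, and unlike the real AbortHandle this sketch does not wake the task on cancel; the flag is only noticed on the next poll.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Returned instead of the inner output when the task was cancelled.
#[derive(Debug, PartialEq, Eq)]
struct Aborted;

/// Hypothetical handle that TaskHolder could keep around for a spawned task.
#[derive(Clone)]
struct CancelHandle {
    flag: Arc<AtomicBool>,
}

impl CancelHandle {
    fn cancel(&self) {
        self.flag.store(true, Ordering::SeqCst);
    }
}

/// Wraps a future and checks the shared flag before every poll.
struct Cancellable<T> {
    inner: Pin<Box<dyn Future<Output = T>>>,
    flag: Arc<AtomicBool>,
}

fn cancellable<T>(fut: impl Future<Output = T> + 'static) -> (Cancellable<T>, CancelHandle) {
    let flag = Arc::new(AtomicBool::new(false));
    let task = Cancellable { inner: Box::pin(fut), flag: flag.clone() };
    (task, CancelHandle { flag })
}

impl<T> Future for Cancellable<T> {
    type Output = Result<T, Aborted>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.flag.load(Ordering::SeqCst) {
            // Stop polling the inner future; it is dropped together with the wrapper.
            return Poll::Ready(Err(Aborted));
        }
        self.inner.as_mut().poll(cx).map(Ok)
    }
}

/// A waker that does nothing, just enough to poll by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Poll a future exactly once, the way an executor would on one wake-up.
fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}
```

A never-finishing future wrapped this way stays pending until `cancel()` is called, and the next poll then resolves to `Err(Aborted)`. In the worker you would more likely reach for the futures crate’s own `abortable`, which also takes care of waking the task.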

Silly Demo

The demo below lets you pause the inner task I mentioned earlier and resume it again by flipping the running flag.

demo

Findings

What I liked so far

What I like most about rust’s async story is that we have so much more control over how tasks are handled. We can suspend them, cancel them with an AbortHandle, and chain them all together.

Awaiting a brighter future with synchronization primitives

A big caveat of the last example is that it requires the inner task to have short distances between await points, because the only thing keeping it non-blocking is that it sleeps every so often.

While yielding every so often may not cost much, it would still be better if we could use a CondVar or Event for task notification, so we could manage tasks with more confidence and schedule fewer callbacks on the JS event loop.
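
To make the wish a bit more concrete, here is a rough std-only sketch of what an Event-style notification boils down to at the future level: the waiting task stores its waker, and whoever sets the event wakes it, so nothing needs to be re-polled on a timer. `Event`, `WaitForEvent` and `CountingWaker` are hypothetical names; this only shows the waker mechanics and says nothing about how such a primitive would be provided for wasm.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

#[derive(Default)]
struct EventState {
    set: bool,
    waiter: Option<Waker>,
}

/// Hypothetical single-threaded event: a flag plus the waker of whoever waits on it.
#[derive(Clone, Default)]
struct Event {
    state: Rc<RefCell<EventState>>,
}

impl Event {
    /// Mark the event as set and wake the waiting task, if there is one.
    fn set(&self) {
        let waiter = {
            let mut state = self.state.borrow_mut();
            state.set = true;
            state.waiter.take()
        };
        if let Some(waker) = waiter {
            waker.wake();
        }
    }

    fn wait(&self) -> WaitForEvent {
        WaitForEvent { event: self.clone() }
    }
}

/// Future that stays pending, without being re-polled, until the event is set.
struct WaitForEvent {
    event: Event,
}

impl Future for WaitForEvent {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut state = self.event.state.borrow_mut();
        if state.set {
            Poll::Ready(())
        } else {
            // Remember who to notify; no timer and no busy polling needed.
            state.waiter = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Test helper: counts how often the executor would have been woken.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}
```

Compared with the `worker_timer(0)` loop, a task waiting on such an event costs nothing while it is paused: it is polled once, parks its waker, and is polled again only after `set()` is called.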

unexplored: Inter Process Communication

Right now, we are only controlling tasks in a single worker, so it won’t magically make your program go fully multi-threaded. However, it might be possible to orchestrate IPC through a central node, spawning services inside different workers and letting them communicate with each other.

Surprisingly, there’s similarity between wasm-worker and python that I recognized in the process of learning this.

Python’s GIL prevents speed gains from spreading computational work across threads, so one has to use multiprocessing and exchange data between processes.

WASM (wasm32-unknown-unknown) currently lacks threading. So if someone wants to gain speed, using workers and exchanging data between them, without touching the WASM blob directly, seems very similar to how python’s multiprocessing works.

When I was writing multiprocessing code in python, the three biggest drawbacks were 1. memory usage, 2. latency, and 3. data duplication. While I haven’t benchmarked it, I think the wasm version might be limited in the same ways by this kind of IPC approach.

I think the right direction would be to use SharedArrayBuffer or atomic instructions to improve the situation later, so we can share more data between workers without cloning it.

The learning

I’m by no means a multi-core master, let alone a rust veteran. While I believe it is possible to orchestrate tasks between workers in a clean manner, I have yet to learn how.

This post was created to accompany my journey of learning to use rust/wasm/async/worker all together; it is by no means a professional tutorial.
