Concurrent Unions
18th May 2021
For the last few years I had been constantly looking for a way to consume async values from multiple producers without needing to correlate how these values were produced.
Throughout this journey I came across multiple ways to utilise these async values in JavaScript.
Pushing one by one
I had initially implemented a class that provided a push(value) function. Each individual call to push resulted in a new iteration for consumers.
This seemed okay, but because of the new iteration per push I could never seem to group sets of iterations together from a consumer's point of view.
I tried to implement a union based on this code, but was unsuccessful.
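For illustration, a push-based source along the lines described above might look like the following. This is only a minimal sketch: the class name PushSource, its close() method, and its internal queues are hypothetical and are not the original implementation.

```javascript
// Hypothetical sketch: every push(value) becomes exactly one iteration.
class PushSource {
  constructor() {
    this.queue = [];    // values pushed before any consumer asked for them
    this.waiting = [];  // resolvers of consumers waiting for the next value
    this.closed = false;
  }

  push(value) {
    const resolve = this.waiting.shift();
    if (resolve) {
      // A consumer is already waiting, hand the value over directly.
      resolve({ value, done: false });
    } else {
      this.queue.push(value);
    }
  }

  close() {
    this.closed = true;
    // Release every consumer that is still waiting.
    for (const resolve of this.waiting.splice(0)) {
      resolve({ value: undefined, done: true });
    }
  }

  [Symbol.asyncIterator]() {
    return {
      next: () => {
        if (this.queue.length) {
          return Promise.resolve({ value: this.queue.shift(), done: false });
        }
        if (this.closed) {
          return Promise.resolve({ value: undefined, done: true });
        }
        return new Promise(resolve => this.waiting.push(resolve));
      },
    };
  }
}
```

Because each value arrives as its own iteration, a consumer of such a source has no signal telling it which values were pushed together, which is the grouping problem mentioned above.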
Batching by microtask
After refocusing on other things I found myself back at implementing a similar kind of class. This time I was looking to watch what changes happened to a sync source over some period of time.
I found that I could collect each value in an array across the microtask. Once the callback for the next microtask was received, I would take a copy of that array, reset the working array, and yield the copy.
This gave me a clean and consistent way to batch by microtask. I hadn't yet realised that I could also use this as part of a union.
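The batching steps above can be sketched roughly as follows. To keep the sketch short it hands each batch to a callback instead of yielding it from an async iterator, and the name createMicrotaskCollector is hypothetical; it is not the API of the published module.

```javascript
// Hypothetical sketch: group all values added within one microtask into a batch.
function createMicrotaskCollector(onBatch) {
  let working = [];      // values collected during the current microtask
  let scheduled = false; // whether a flush is already queued

  return function add(value) {
    working.push(value);
    if (scheduled) return;
    scheduled = true;
    queueMicrotask(() => {
      scheduled = false;
      const batch = [...working]; // take a copy of the working array
      working = [];               // reset the working array
      onBatch(batch);             // hand over the copy
    });
  };
}
```

Values added in the same synchronous run end up in the same batch, while a value added from a later task (for example inside a setTimeout callback) starts a new one.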
At a later point, after publishing the microtask-collector module, I found that I could start many async producers, collect everything within the same batch, and yield non-overlapping groups of results.
This, however, led to issues because the execution of these async iterators is now detached completely from the consumer. In some cases this may be what's wanted, but the implementation I was looking for suited a just-in-time model better.
All was not lost though, because this led me to the pattern I now utilise extensively.
The union
We can boil the core of the union problem down to the solution code below. I will break down each concept bit by bit.
    async function wait() {
      const promises = knownIterators.map(next);
      currentMicrotaskPromise = currentMicrotaskPromise || new Promise(microtask).then(() => NextMicrotask);
      const reason = await Promise.any<unknown>([
        currentMicrotaskPromise,
        Promise.all(promises),
        errorOccurred.promise,
        iteratorAvailable.promise,
      ]);
      if (reason === NextMicrotask) {
        currentMicrotaskPromise = undefined;
      }
      if (!results.length) {
        await Promise.any([
          Promise.any(promises),
          errorOccurred.promise,
          iteratorAvailable.promise,
        ]);
      }
      if (errors.length) {
        return [];
      }
      if (!results.length) {
        return [];
      }
      const cloned = [...results];
      results = [];
      return cloned;
    }
Setup
First, within our function we can assume that we already have our known in-flight iterators, named knownIterators here.
By using const promises = knownIterators.map(next); we map each iterator to either its next pending promise, or an existing promise if the iterator didn't resolve within the previous microtask.
The variable currentMicrotaskPromise lets us reuse the same microtask if all of our previous iterator promises resolved before the end of that microtask.
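The definition of next is not shown here, so the following is only a sketch of how such a helper could behave, assuming a hypothetical pending map keyed by iterator. It returns the existing promise while an earlier call is still in flight, and only asks the iterator for a new result once the previous one has settled.

```javascript
// Hypothetical sketch of a `next` helper; `pending` is an assumed name.
const pending = new WeakMap();

function next(iterator) {
  const existing = pending.get(iterator);
  if (existing) {
    // The previous call has not settled yet, so reuse its promise.
    return existing;
  }
  // Promise.resolve lets this work for both sync and async iterators.
  const promise = Promise.resolve(iterator.next()).finally(() => {
    // Once settled, the following call may request a fresh result.
    pending.delete(iterator);
  });
  pending.set(iterator, promise);
  return promise;
}
```

With a helper like this, calling knownIterators.map(next) repeatedly never requests more than one outstanding result per iterator.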
The first wait
Now that we have all the information required about our context, we need to wait until something happens. For the first wait this is one of four events:
- The microtask has finished
- All the promises have finished
- We found an error occurred somewhere within our union
- A new iterator is known and should be added to our promise set
We use the microtask as a common target because it is the smallest unit of async timing shared by all JavaScript code.
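The four-way wait can be tried in isolation with the sketch below. The definitions of microtask, NextMicrotask, and the deferred helper are assumptions, since only their usage appears in the solution code, and firstWait is a hypothetical name for the first half of wait. The two deferred promises are assumed to resolve rather than reject, because Promise.any ignores rejections unless every input rejects.

```javascript
// Assumed marker value identifying that the microtask finished first.
const NextMicrotask = Symbol("NextMicrotask");

// Assumed executor: `new Promise(microtask)` passes `resolve` as the first argument.
function microtask(resolve) {
  queueMicrotask(resolve);
}

// Hypothetical helper giving external access to a promise's resolve and reject.
function deferred() {
  let resolve, reject;
  const promise = new Promise((res, rej) => {
    resolve = res;
    reject = rej;
  });
  return { promise, resolve, reject };
}

// Resolves with whichever of the four events happens first.
function firstWait(promises, errorOccurred, iteratorAvailable) {
  const microtaskPromise = new Promise(microtask).then(() => NextMicrotask);
  return Promise.any([
    microtaskPromise,
    Promise.all(promises),
    errorOccurred.promise,
    iteratorAvailable.promise,
  ]);
}
```

If every iterator promise needs longer than the current microtask, for example because it waits on a timer, the returned reason is NextMicrotask.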
If we have at least one result produced by promises, and errors.length is zero, then we have the next set of updated values. We clone these before returning and reset our working results array:
    const cloned = [...results];
    results = [];
    return cloned;
The second wait
If we didn't have at least one result in the previous step, then before we move on to returning our results we want to wait until:
- At least one promise has finished
- We found an error occurred somewhere within our union
- A new iterator is known and should be added to our promise set
We are now in a complete holding state until at least one thing changes within our context. This allows tasks to take longer than a microtask, for example when using setTimeout.
Yielding a result
Now that we have a set of values representing the next state, outside the wait function above we can freely store a copy of the latest state for all iterators and update it with every iteration, yielding the new snapshot to consumers.
In the end the implementation uses:
    const latestSnapshot = knownIterators.map(read);
    if (onlyDone) {
      continue;
    }
    if (!valuesDone) {
      yield latestSnapshot.map(result => result?.value);
    }
For the consumer this means:
- The snapshot follows the same order as the initial input
- A source iteration result may appear more than once
- Only snapshots with new results will yield
The second point appears to a consumer as stuttering steps. If the source values are unique, the consumer can freely ignore values it already knows, which lets this implementation favour maximum consistency.
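As a sketch of how a consumer might ignore values it already knows, assuming the source values are unique, the hypothetical helper below wraps any async iterable of snapshots and yields only the values not seen before. The name uniqueValues is an assumption and is not part of the union implementation.

```javascript
// Hypothetical consumer-side helper: drop values that earlier snapshots already showed.
async function* uniqueValues(snapshots) {
  const seen = new Set();
  for await (const snapshot of snapshots) {
    const fresh = [];
    for (const value of snapshot) {
      // undefined marks a source that has not produced a result yet.
      if (value === undefined || seen.has(value)) continue;
      seen.add(value);
      fresh.push(value);
    }
    if (fresh.length) {
      yield fresh;
    }
  }
}
```

A consumer could then iterate over uniqueValues(union([...])) and react only to what changed in each step.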
Consuming
The resulting union can be consumed using for await.
Say we had two generator functions producing a value from an array at different rates, defined by the below code:
    async function *timedGenerator(values, interval) {
      const clone = values.slice();
      while(clone.length) {
        yield clone.shift();
        await new Promise(resolve => setTimeout(resolve, interval));
      }
    }

    for await(const [left, right] of union([
      timedGenerator([1, 2, 3], 500),
      timedGenerator([5, 6, 7], 1000)
    ])) {
      console.log({ left, right });
    }
When the above code finishes, the last console log will be { "left": 3, "right": 7 }.
Summary
This union function provides a way for async and sync iterators (and, by extension, generators) to be synchronised across many sources, allowing a consumer to focus on the set as a whole.
After minor performance testing I found this code to perform well under pressure. Utilising it as a core pillar of the benchmarked code, I was able to push a single Node.js process to utilise over three billion promises, resolving each before safely exiting.
A live demo can be found on CodeSandbox; alternatively, check out the source code or the npm package.
This code produced consistent promise and microtask counts across many executions, showing it allows for deterministic just in time execution within JavaScript.