A mutex for synchronizing async workflows in Javascript
DirtyHairy/async-mutex
This package implements primitives for synchronizing asynchronous operations in JavaScript.
The term "mutex" usually refers to a data structure used to synchronize concurrent processes running on different threads. For example, before accessing a non-threadsafe resource, a thread will lock the mutex. This is guaranteed to block the thread until no other thread holds a lock on the mutex and thus enforces exclusive access to the resource. Once the operation is complete, the thread releases the lock, allowing other threads to acquire a lock and access the resource.
While JavaScript is strictly single-threaded, the asynchronous nature of its execution model allows for race conditions that require similar synchronization primitives. Consider for example a library communicating with a web worker that needs to exchange several subsequent messages with the worker in order to achieve a task. As these messages are exchanged in an asynchronous manner, it is perfectly possible that the library is called again during this process. Depending on the way state is handled during the async process, this will lead to race conditions that are hard to fix and even harder to track down.
This library solves the problem by applying the concept of mutexes to JavaScript. Locking the mutex will return a promise that resolves once the mutex becomes available. Once the async process is complete (usually taking multiple spins of the event loop), a callback supplied to the caller should be called in order to release the mutex, allowing the next scheduled worker to execute.
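To make the race concrete, here is a minimal, self-contained sketch. It does not use this library: SketchMutex, unsafeIncrement, safeIncrement and demo are hypothetical names, and the small promise-chain lock only illustrates the idea of acquiring and releasing around an async critical section. It is not meant to mirror the library's implementation.

```javascript
// Hypothetical minimal lock for illustration only; not the library's implementation.
class SketchMutex {
  constructor() {
    // Promise that settles when the most recently queued holder releases.
    this._tail = Promise.resolve();
  }

  // Resolves with a release function once all earlier holders have released.
  acquire() {
    let release;
    const next = new Promise((resolve) => { release = resolve; });
    const ready = this._tail.then(() => release);
    this._tail = next;
    return ready;
  }
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// A read-modify-write that spans an await: interleaved calls lose updates.
async function unsafeIncrement(state) {
  const current = state.counter;
  await sleep(1);
  state.counter = current + 1;
}

// The same operation, guarded by the lock and released in all cases.
async function safeIncrement(state, mutex) {
  const release = await mutex.acquire();
  try {
    await unsafeIncrement(state);
  } finally {
    release();
  }
}

async function demo() {
  const racy = { counter: 0 };
  await Promise.all([1, 2, 3].map(() => unsafeIncrement(racy)));

  const guarded = { counter: 0 };
  const mutex = new SketchMutex();
  await Promise.all([1, 2, 3].map(() => safeIncrement(guarded, mutex)));

  return { racy: racy.counter, guarded: guarded.counter };
}

demo().then((result) => console.log(result)); // logs { racy: 1, guarded: 3 }
```

All three unguarded calls read the counter before any of them writes it, so two updates are lost. With the lock, each call waits for the previous one to release before it reads.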
Imagine a situation where you need to control access to several instances of a shared resource. For example, you might want to distribute images between several worker processes that perform transformations, or you might want to create a web crawler that performs a defined number of requests in parallel.
A semaphore is a data structure that is initialized with an arbitrary integer value and that can be locked multiple times. As long as the semaphore value is positive, locking it will return the current value and the locking process will continue execution immediately; the semaphore will be decremented upon locking. Releasing the lock will increment the semaphore again.
Once the semaphore has reached zero, the next process that attempts to acquire a lock will be suspended until another process releases its lock and thus increments the semaphore again.
This library provides a semaphore implementation for JavaScript that is similar to the mutex implementation described above.
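The counting behaviour described above can be sketched in a few lines. This is a self-contained illustration under simplifying assumptions (no weights, no priorities, no cancellation); SketchSemaphore and runLimited are hypothetical names and the code is not taken from the library.

```javascript
// Hypothetical counting semaphore for illustration only.
class SketchSemaphore {
  constructor(value) {
    this._value = value;
    this._waiters = []; // resolve functions of pending acquire() calls
  }

  // Resolves with [value observed at acquisition, release function].
  acquire() {
    return new Promise((resolve) => {
      this._waiters.push(resolve);
      this._dispatch();
    });
  }

  // Hand out slots while the value is positive and someone is waiting.
  _dispatch() {
    while (this._value > 0 && this._waiters.length > 0) {
      const resolve = this._waiters.shift();
      const value = this._value;
      this._value -= 1;
      let released = false;
      resolve([value, () => {
        if (released) return; // releasing twice has no further effect
        released = true;
        this._value += 1;
        this._dispatch();
      }]);
    }
  }
}

// Run the given async tasks with at most `limit` of them active at once.
async function runLimited(tasks, limit) {
  const semaphore = new SketchSemaphore(limit);
  let active = 0;
  let peak = 0;
  const results = await Promise.all(tasks.map(async (task) => {
    const [, release] = await semaphore.acquire();
    active += 1;
    peak = Math.max(peak, active);
    try {
      return await task();
    } finally {
      active -= 1;
      release();
    }
  }));
  return { results, peak };
}
```

Calling runLimited with five tasks and a limit of two keeps at most two tasks in flight, which is the web crawler scenario from the text in miniature.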
You can install the library into your project via npm:
    npm install async-mutex
The library is written in TypeScript and will work in any environment that supports ES5, ES6 promises and Array.isArray. On ancient browsers, a shim can be used (e.g. core-js). No external typings are required for using this library with TypeScript (version >= 2).
Starting with Node 12.16 and 13.7, native ES6 style imports are supported.
WARNING: Node 13 versions < 13.2.0 fail to import this package correctly. Node 12 and earlier are fine, as are newer versions of Node 13.
CommonJS:
    const { Mutex, Semaphore, withTimeout } = require('async-mutex');
ES6:
    import { Mutex, Semaphore, withTimeout } from 'async-mutex';
TypeScript:
    import { Mutex, MutexInterface, Semaphore, SemaphoreInterface, withTimeout } from 'async-mutex';
With the latest version of Node, native ES6 style imports are supported.
    const mutex = new Mutex();
Creates a new mutex.
Promise style:
    mutex
        .runExclusive(() => {
            // ...
        })
        .then((result) => {
            // ...
        });
async/await:
    await mutex.runExclusive(async () => {
        // ...
    });
runExclusive schedules the supplied callback to be run once the mutex is unlocked. The function may return a promise. Once the promise is resolved or rejected (or immediately after execution if an immediate value was returned), the mutex is released. runExclusive returns a promise that adopts the state of the function result.
The mutex is released and the result rejected if an exception occurs during execution of the callback.
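The two guarantees above (the returned promise adopts the callback's outcome, and a failing callback still frees the lock) can be illustrated with a short promise-chain sketch. createExclusiveRunner and exclusiveDemo are hypothetical names; this is one possible way to get the behaviour and is not a description of how the library does it.

```javascript
// Hypothetical helper: serializes callbacks on a promise chain.
function createExclusiveRunner() {
  let tail = Promise.resolve();
  return function runExclusive(callback) {
    // Start the callback only after the previous one has settled.
    const result = tail.then(() => callback());
    // Keep the chain usable even if the callback throws or rejects,
    // which corresponds to releasing the lock on failure.
    tail = result.catch(() => undefined);
    // The caller sees the callback's own value or error.
    return result;
  };
}

async function exclusiveDemo() {
  const runExclusive = createExclusiveRunner();
  const log = [];
  const first = runExclusive(async () => { log.push('first'); return 1; });
  const second = runExclusive(() => { log.push('second'); throw new Error('boom'); });
  const third = runExclusive(() => { log.push('third'); return 3; });
  const settled = await Promise.allSettled([first, second, third]);
  return { log, statuses: settled.map((entry) => entry.status) };
}
```

In exclusiveDemo the second callback throws, its promise rejects, and the third callback still runs afterwards.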
Promise style:
    mutex
        .acquire()
        .then(function (release) {
            // ...
            release();
        });
async/await:
    const release = await mutex.acquire();
    try {
        // ...
    } finally {
        release();
    }
acquire returns an (ES6) promise that will resolve as soon as the mutex is available. The promise resolves with a function release that must be called once the mutex should be released again. The release callback is idempotent.
IMPORTANT: Failure to call release will keep the mutex locked and will likely deadlock the application. Make sure to call release under all circumstances and handle exceptions accordingly.
As an alternative to calling the release callback returned by acquire, the mutex can be released by calling release directly on it:
    mutex.release();
    mutex.isLocked();
Pending locks can be cancelled by calling cancel() on the mutex. This will reject all pending locks with E_CANCELED:
Promise style:
    import { E_CANCELED } from 'async-mutex';

    mutex
        .runExclusive(() => {
            // ...
        })
        .then(() => {
            // ...
        })
        .catch((e) => {
            if (e === E_CANCELED) {
                // ...
            }
        });
async/await:
    import { E_CANCELED } from 'async-mutex';

    try {
        await mutex.runExclusive(() => {
            // ...
        });
    } catch (e) {
        if (e === E_CANCELED) {
            // ...
        }
    }
This works with acquire, too: if acquire is used for locking, the resulting promise will reject with E_CANCELED.
The error that is thrown can be customized by passing a different error to the Mutex constructor:
    const mutex = new Mutex(new Error('fancy custom error'));
Note that while all pending locks are cancelled, a currently held lock will not be revoked. In consequence, the mutex may not be available even after cancel() has been called.
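The distinction between pending and held locks is easy to miss, so here is a self-contained sketch of it. CancellableLockSketch and SKETCH_CANCELED are hypothetical stand-ins (the latter for E_CANCELED); the queue-based design is an assumption made for illustration and is not taken from the library.

```javascript
// Hypothetical stand-in for the library's E_CANCELED error.
const SKETCH_CANCELED = new Error('request for lock canceled');

class CancellableLockSketch {
  constructor() {
    this._locked = false;
    this._queue = []; // pending { resolve, reject } pairs
  }

  acquire() {
    return new Promise((resolve, reject) => {
      this._queue.push({ resolve, reject });
      this._dispatch();
    });
  }

  isLocked() {
    return this._locked;
  }

  // Reject everyone still waiting; the current holder keeps the lock.
  cancel() {
    const pending = this._queue;
    this._queue = [];
    pending.forEach(({ reject }) => reject(SKETCH_CANCELED));
  }

  // Give the lock to the next waiter if it is free.
  _dispatch() {
    if (this._locked || this._queue.length === 0) return;
    this._locked = true;
    const { resolve } = this._queue.shift();
    let released = false;
    resolve(() => {
      if (released) return;
      released = true;
      this._locked = false;
      this._dispatch();
    });
  }
}
```

If one caller holds the lock and a second is waiting, calling cancel() rejects the second caller's promise, while isLocked() keeps returning true until the first caller releases.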
You can wait until the mutex is available without locking it by calling waitForUnlock(). This will return a promise that resolves once the mutex can be acquired again. This operation will not lock the mutex, and there is no guarantee that the mutex will still be available once an async barrier has been encountered.
Promise style:
    mutex
        .waitForUnlock()
        .then(() => {
            // ...
        });
async/await:
    await mutex.waitForUnlock();
    // ...
    const semaphore = new Semaphore(initialValue);
Creates a new semaphore. initialValue is an arbitrary integer that defines the initial value of the semaphore.
Promise style:
    semaphore
        .runExclusive(function (value) {
            // ...
        })
        .then(function (result) {
            // ...
        });
async/await:
    await semaphore.runExclusive(async (value) => {
        // ...
    });
runExclusive schedules the supplied callback to be run once the semaphore is available. The callback will receive the current value of the semaphore as its argument. The function may return a promise. Once the promise is resolved or rejected (or immediately after execution if an immediate value was returned), the semaphore is released. runExclusive returns a promise that adopts the state of the function result.
The semaphore is released and the result rejected if an exception occurs during execution of the callback.
runExclusive accepts a first optional argument weight. Specifying a weight will decrement the semaphore by the specified value, and the callback will only be invoked once the semaphore's value is greater than or equal to weight.
runExclusive accepts a second optional argument priority. Specifying a greater value for priority tells the scheduler to run this task before other tasks. priority can be any real number. The default is zero.
Promise style:
    semaphore
        .acquire()
        .then(function ([value, release]) {
            // ...
            release();
        });
async/await:
    const [value, release] = await semaphore.acquire();
    try {
        // ...
    } finally {
        release();
    }
acquire returns an (ES6) promise that will resolve as soon as the semaphore is available. The promise resolves to an array with the first entry being the current value of the semaphore, and the second value a function that must be called to release the semaphore once the critical operation has completed. The release callback is idempotent.
IMPORTANT: Failure to call release will keep the semaphore locked and will likely deadlock the application. Make sure to call release under all circumstances and handle exceptions accordingly.
acquire accepts a first optional argument weight. Specifying a weight will decrement the semaphore by the specified value, and the semaphore will only be acquired once its value is greater than or equal to weight.
acquire accepts a second optional argument priority. Specifying a greater value for priority tells the scheduler to release the semaphore to the caller before other callers. priority can be any real number. The default is zero.
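As a rough illustration of "greater priority first", the sketch below keeps a waiter queue ordered by descending priority. enqueueByPriority is a hypothetical helper, and the first-come-first-served order among waiters of equal priority is an assumption of this sketch; the text above does not specify how ties are handled.

```javascript
// Insert a waiter so that greater priority comes first.
// Assumption of this sketch: equal priorities keep their arrival order.
function enqueueByPriority(queue, waiter) {
  // Find the first queued waiter with a strictly lower priority.
  const index = queue.findIndex((other) => other.priority < waiter.priority);
  if (index === -1) {
    queue.push(waiter);
  } else {
    queue.splice(index, 0, waiter);
  }
  return queue;
}

const queue = [];
enqueueByPriority(queue, { name: 'a', priority: 0 });
enqueueByPriority(queue, { name: 'b', priority: 5 });
enqueueByPriority(queue, { name: 'c', priority: 0 });
enqueueByPriority(queue, { name: 'd', priority: -1.5 });
enqueueByPriority(queue, { name: 'e', priority: 5 });

console.log(queue.map((waiter) => waiter.name).join(' ')); // logs "b e a c d"
```

Priorities can be any real number, including negative and fractional values, as with the waiter named d.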
As an alternative to calling the release callback returned by acquire, the semaphore can be released by calling release directly on it:
    semaphore.release();
release accepts an optional argument weight and increments the semaphore accordingly.
IMPORTANT: Releasing a previously acquired semaphore with the releaser that was returned by acquire will automatically increment the semaphore by the correct weight. If you release by calling the unscoped release you have to supply the correct weight yourself!
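The bookkeeping difference between the scoped releaser and the unscoped release can be shown with a synchronous toy model. WeightedCounterSketch and its take method are hypothetical, waiting is left out entirely, and the default weight of 1 is an assumption of this sketch.

```javascript
// Hypothetical toy model of weighted accounting; it never waits.
class WeightedCounterSketch {
  constructor(value) {
    this.value = value;
  }

  // Returns a releaser that remembers the weight, or null if capacity is short.
  take(weight = 1) {
    if (this.value < weight) return null;
    this.value -= weight;
    let released = false;
    return () => {
      if (released) return; // calling the releaser twice changes nothing
      released = true;
      this.value += weight;
    };
  }

  // Unscoped release: the caller has to pass the weight it acquired with.
  release(weight = 1) {
    this.value += weight;
  }
}

const counter = new WeightedCounterSketch(5);

const releaseThree = counter.take(3); // value is now 2
releaseThree();                       // value is back to 5
releaseThree();                       // still 5

counter.take(3);                      // value is now 2
counter.release();                    // value is 3: the weight was forgotten
counter.release(2);                   // value is 5 after supplying the rest
```

The scoped releaser restores exactly what was taken, while the unscoped call is only correct if the caller passes the matching weight.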
    semaphore.getValue();
    semaphore.isLocked();
The semaphore is considered to be locked if its value is either zero or negative.
The value of a semaphore can be set directly to a desired value. A positive value will cause the semaphore to schedule any pending waiters accordingly.
    semaphore.setValue(value);
Pending locks can be cancelled by calling cancel() on the semaphore. This will reject all pending locks with E_CANCELED:
Promise style:
    import { E_CANCELED } from 'async-mutex';

    semaphore
        .runExclusive(() => {
            // ...
        })
        .then(() => {
            // ...
        })
        .catch((e) => {
            if (e === E_CANCELED) {
                // ...
            }
        });
async/await:
    import { E_CANCELED } from 'async-mutex';

    try {
        await semaphore.runExclusive(() => {
            // ...
        });
    } catch (e) {
        if (e === E_CANCELED) {
            // ...
        }
    }
This works with acquire, too: if acquire is used for locking, the resulting promise will reject with E_CANCELED.
The error that is thrown can be customized by passing a different error to the Semaphore constructor:
    const semaphore = new Semaphore(2, new Error('fancy custom error'));
Note that while all pending locks are cancelled, any currently held locks will not be revoked. In consequence, the semaphore may not be available even after cancel() has been called.
You can wait until the semaphore is available without locking it by calling waitForUnlock(). This will return a promise that resolves once the semaphore can be acquired again. This operation will not lock the semaphore, and there is no guarantee that the semaphore will still be available once an async barrier has been encountered.
Promise style:
    semaphore
        .waitForUnlock()
        .then(() => {
            // ...
        });
async/await:
    await semaphore.waitForUnlock();
    // ...
waitForUnlock accepts optional arguments weight and priority. The promise will resolve as soon as it is possible to acquire the semaphore with the given weight and priority. Scheduled tasks with the greatest priority values execute first.
Sometimes it is desirable to limit the time a program waits for a mutex or semaphore to become available. The withTimeout decorator can be applied to both semaphores and mutexes and changes the behavior of acquire and runExclusive accordingly.
    import { Mutex, Semaphore, withTimeout, E_TIMEOUT } from 'async-mutex';

    const mutexWithTimeout = withTimeout(new Mutex(), 100);
    const semaphoreWithTimeout = withTimeout(new Semaphore(5), 100);
The API of the decorated mutex or semaphore is unchanged.
The second argument of withTimeout is the timeout in milliseconds. After the timeout is exceeded, the promise returned by acquire and runExclusive will reject with E_TIMEOUT. The latter will not run the provided callback in case of a timeout.
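One detail worth seeing in code is what has to happen when the lock becomes available after the timeout has already fired: the late lock should be handed back, otherwise it would stay held forever. The sketch below shows one way to do that. acquireWithTimeout and SKETCH_TIMEOUT are hypothetical (the latter stands in for E_TIMEOUT), and nothing here claims to match the library's internals.

```javascript
// Hypothetical stand-in for the library's E_TIMEOUT error.
const SKETCH_TIMEOUT = new Error('timeout while waiting for lock');

// `acquire` is any function returning a promise that resolves with a release function.
function acquireWithTimeout(acquire, timeoutMs) {
  return new Promise((resolve, reject) => {
    let timedOut = false;
    const timer = setTimeout(() => {
      timedOut = true;
      reject(SKETCH_TIMEOUT);
    }, timeoutMs);

    acquire().then(
      (release) => {
        if (timedOut) {
          // Too late: give the lock straight back so it is not leaked.
          release();
          return;
        }
        clearTimeout(timer);
        resolve(release);
      },
      (error) => {
        clearTimeout(timer);
        reject(error);
      }
    );
  });
}
```

A caller would use it like a plain acquire, for example `const release = await acquireWithTimeout(() => lock.acquire(), 100);` with `lock` being any object that offers such an acquire method, and handle SKETCH_TIMEOUT in a catch block.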
The third argument of withTimeout is optional and can be used to customize the error with which the promise is rejected.
    const mutexWithTimeout = withTimeout(new Mutex(), 100, new Error('new fancy error'));
    const semaphoreWithTimeout = withTimeout(new Semaphore(5), 100, new Error('new fancy error'));
A shortcut exists for the case where you do not want to wait for a lock to be available at all. The tryAcquire decorator can be applied to both mutexes and semaphores and changes the behavior of acquire and runExclusive to immediately throw E_ALREADY_LOCKED if the mutex or semaphore is not available.
Promise style:
    import { tryAcquire, E_ALREADY_LOCKED } from 'async-mutex';

    tryAcquire(semaphoreOrMutex)
        .runExclusive(() => {
            // ...
        })
        .then(() => {
            // ...
        })
        .catch((e) => {
            if (e === E_ALREADY_LOCKED) {
                // ...
            }
        });
async/await:
    import { tryAcquire, E_ALREADY_LOCKED } from 'async-mutex';

    try {
        await tryAcquire(semaphoreOrMutex).runExclusive(() => {
            // ...
        });
    } catch (e) {
        if (e === E_ALREADY_LOCKED) {
            // ...
        }
    }
Again, the error can be customized by providing a custom error as the second argument to tryAcquire.
    tryAcquire(semaphoreOrMutex, new Error('new fancy error')).runExclusive(() => {
        // ...
    });
Feel free to use this library under the conditions of the MIT license.