Abortable async function primitives and combinators
deeplay-io/abort-controller-x

Install with yarn:

    yarn add abort-controller-x

See the AbortController MDN page. `AbortController` is available in NodeJS since 15.0.0; NodeJS 14.17+ requires the `--experimental-abortcontroller` flag. A polyfill is available for older NodeJS versions and browsers.

We define an abortable function as a function that obeys the following rules:

- It must accept an `AbortSignal` in its arguments.
- It must return a `Promise`.
- It must add an `abort` event listener to the `AbortSignal`. Once the `AbortSignal` is aborted, the returned `Promise` must reject with `AbortError` either immediately or after doing any async cleanup. It is also possible to reject with other errors that happen during cleanup.
- Once the returned `Promise` is fulfilled or rejected, it must remove the `abort` event listener.
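
To make the rules above concrete, here is a minimal hand-written sketch of an abortable timer that uses only the standard `AbortSignal` API. The names `sleepSketch` and `makeAbortError` are hypothetical and are not part of this library; the sketch only illustrates the listener bookkeeping that the rules require.

```typescript
// Hypothetical local helper standing in for the library's AbortError.
function makeAbortError(): Error {
  const error = new Error('The operation has been aborted');
  error.name = 'AbortError';
  return error;
}

// Accepts an AbortSignal and returns a Promise, as the first two rules ask.
function sleepSketch(signal: AbortSignal, ms: number): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (signal.aborted) {
      reject(makeAbortError());
      return;
    }

    const timer = setTimeout(() => {
      // The promise is about to settle: remove the abort listener.
      signal.removeEventListener('abort', onAbort);
      resolve();
    }, ms);

    function onAbort(): void {
      // On abort: clean up, remove the listener, reject with AbortError.
      clearTimeout(timer);
      signal.removeEventListener('abort', onAbort);
      reject(makeAbortError());
    }

    signal.addEventListener('abort', onAbort);
  });
}
```

Writing this bookkeeping by hand for every operation is the burden that the helpers described in the rest of this document are meant to remove.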

An example of an abortable function is the standard `fetch` function.

This library provides a way to build complex abortable functions using standard `async`/`await` syntax, without the burden of manually managing `abort` event listeners. You can reuse a single `AbortSignal` between many operations inside a parent function:

    import {delay, catchAbortError} from 'abort-controller-x';

    /**
     * Make requests repeatedly with a delay between consecutive requests
     */
    async function makeRequests(signal: AbortSignal): Promise<never> {
      while (true) {
        await fetch('...', {signal});
        await delay(signal, 1000);
      }
    }

    const abortController = new AbortController();

    makeRequests(abortController.signal).catch(catchAbortError);

    process.on('SIGTERM', () => {
      abortController.abort();
    });

The above example can be rewritten in a more ergonomic way using the `run` helper.

Usually you should only create an `AbortController` somewhere on the top level. In regular code, use `async`/`await` and pass the `AbortSignal` to abortable functions provided by this library or to custom ones composed of other abortable functions.

Companion packages:

- `abort-controller-x-rxjs`: Abortable helpers for RxJS.
- `abort-controller-x-reactive-store`: Reactive store primitive and helpers.

    function all<T>(
      signal: AbortSignal,
      executor: (innerSignal: AbortSignal) => readonly PromiseLike<T>[],
    ): Promise<T[]>;

Abortable version of `Promise.all`.

Creates a new inner `AbortSignal` and passes it to `executor`. That signal is aborted when `signal` is aborted or any of the promises returned from `executor` is rejected.

Returns a promise that fulfills with an array of results when all of the promises returned from `executor` fulfill, rejects when any of the promises returned from `executor` is rejected, and rejects with `AbortError` when `signal` is aborted.

The promises returned from `executor` must be abortable, i.e. once `innerSignal` is aborted, they must reject with `AbortError` either immediately or after doing any async cleanup.

Example:

    const [result1, result2] = await all(signal, signal => [
      makeRequest(signal, params1),
      makeRequest(signal, params2),
    ]);

    function race<T>(
      signal: AbortSignal,
      executor: (innerSignal: AbortSignal) => readonly PromiseLike<T>[],
    ): Promise<T>;

Abortable version of `Promise.race`.

Creates a new inner `AbortSignal` and passes it to `executor`. That signal is aborted when `signal` is aborted or any of the promises returned from `executor` is fulfilled or rejected.

Returns a promise that fulfills or rejects when any of the promises returned from `executor` is fulfilled or rejected, and rejects with `AbortError` when `signal` is aborted.

The promises returned from `executor` must be abortable, i.e. once `innerSignal` is aborted, they must reject with `AbortError` either immediately or after doing any async cleanup.

Example:

    const result = await race(signal, signal => [
      delay(signal, 1000).then(() => ({status: 'timeout'})),
      makeRequest(signal, params).then(value => ({status: 'success', value})),
    ]);

    if (result.status === 'timeout') {
      // request timed out
    } else {
      const response = result.value;
    }

    function delay(signal: AbortSignal, dueTime: number | Date): Promise<void>;

Returns a promise that fulfills after the delay and rejects with `AbortError` once `signal` is aborted.

The delay time is specified as a `Date` object or as an integer denoting milliseconds to wait.

Example:

    // Make a request repeatedly with a delay between consecutive requests
    while (true) {
      await makeRequest(signal, params);
      await delay(signal, 1000);
    }

Example:

    // Make a request repeatedly with a fixed interval
    import {addMilliseconds} from 'date-fns';

    let date = new Date();

    while (true) {
      await makeRequest(signal, params);

      date = addMilliseconds(date, 1000);
      await delay(signal, date);
    }

    function waitForEvent<T>(
      signal: AbortSignal,
      target: EventTargetLike<T>,
      eventName: string,
      options?: EventListenerOptions,
    ): Promise<T>;

Returns a promise that fulfills when an event of the given type is emitted from the given event target and rejects with `AbortError` once `signal` is aborted.

Example:

    // Create a WebSocket and wait for connection
    const webSocket = new WebSocket(url);

    const openEvent = await race(signal, signal => [
      waitForEvent<WebSocketEventMap['open']>(signal, webSocket, 'open'),
      waitForEvent<WebSocketEventMap['close']>(signal, webSocket, 'close').then(
        event => {
          throw new Error(`Failed to connect to ${url}: ${event.reason}`);
        },
      ),
    ]);

    function forever(signal: AbortSignal): Promise<never>;

Returns a promise that never fulfills and only rejects with `AbortError` once `signal` is aborted.

    function spawn<T>(
      signal: AbortSignal,
      fn: (signal: AbortSignal, effects: SpawnEffects) => Promise<T>,
    ): Promise<T>;

    type SpawnEffects = {
      defer(fn: () => void | Promise<void>): void;
      fork<T>(fn: (signal: AbortSignal) => Promise<T>): ForkTask<T>;
    };

    type ForkTask<T> = {
      abort(): void;
      join(): Promise<T>;
    };

Runs an abortable function with `fork` and `defer` effects attached to it.

`spawn` makes it possible to write Go-style coroutines.

- `SpawnEffects.defer`

  Schedules a function to run after the spawned function finishes.

  Deferred functions run serially in last-in-first-out order.

  The promise returned from `spawn` resolves or rejects only after all deferred functions finish.

- `SpawnEffects.fork`

  Executes an abortable function in the background.

  If a forked function throws an exception, the spawned function and the other forks are aborted, and the promise returned from `spawn` rejects with that exception.

  When the spawned function finishes, all forks are aborted.

- `ForkTask.abort`

  Aborts a forked function.

- `ForkTask.join`

  Returns the promise returned from a forked function.
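
The last-in-first-out ordering of deferred functions described above can be illustrated with a heavily simplified stand-in. `spawnSketch` is hypothetical: it supports only `defer`, has no `fork`, and does not create an inner signal, so it should not be read as this library's implementation.

```typescript
type DeferFn = () => void | Promise<void>;

// Hypothetical, simplified stand-in for spawn that supports only defer.
async function spawnSketch<T>(
  signal: AbortSignal,
  fn: (
    signal: AbortSignal,
    effects: {defer(fn: DeferFn): void},
  ) => Promise<T>,
): Promise<T> {
  const deferred: DeferFn[] = [];
  try {
    return await fn(signal, {
      defer: d => {
        deferred.push(d);
      },
    });
  } finally {
    // Run deferred functions serially, last registered first.
    for (const d of deferred.reverse()) {
      await d();
    }
  }
}

const log: string[] = [];

const done = spawnSketch(
  new AbortController().signal,
  async (_signal, {defer}) => {
    defer(() => {
      log.push('close db');
    });
    defer(async () => {
      log.push('close server');
    });
    log.push('work');
  },
);
// Once `done` settles, log is ['work', 'close server', 'close db'].
```

The reverse order matters when later resources depend on earlier ones: the server registered second is closed before the database it uses.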

Example:

    // Connect to a database, then start a server, then block until abort.
    // On abort, gracefully shutdown the server, and once done, disconnect
    // from the database.
    spawn(signal, async (signal, {defer}) => {
      const db = await connectToDb();

      defer(async () => {
        await db.close();
      });

      const server = await startServer(db);

      defer(async () => {
        await server.close();
      });

      await forever(signal);
    });

Example:

    // Connect to a database, then start an infinite polling loop.
    // On abort, disconnect from the database.
    spawn(signal, async (signal, {defer}) => {
      const db = await connectToDb();

      defer(async () => {
        await db.close();
      });

      while (true) {
        await poll(signal, db);
        await delay(signal, 5000);
      }
    });

Example:

    // Acquire a lock and execute a function.
    // Extend the lock while the function is running.
    // Once the function finishes or the signal is aborted, stop extending
    // the lock and release it.
    import Redlock = require('redlock');
    import {spawn, delay} from 'abort-controller-x';

    const lockTtl = 30_000;

    function withLock<T>(
      signal: AbortSignal,
      redlock: Redlock,
      key: string,
      fn: (signal: AbortSignal) => Promise<T>,
    ): Promise<T> {
      return spawn(signal, async (signal, {fork, defer}) => {
        const lock = await redlock.lock(key, lockTtl);

        defer(() => lock.unlock());

        fork(async signal => {
          while (true) {
            await delay(signal, lockTtl / 10);
            await lock.extend(lockTtl);
          }
        });

        return await fn(signal);
      });
    }

    const redlock = new Redlock([redis], {
      retryCount: -1,
    });

    await withLock(signal, redlock, 'the-lock-key', async signal => {
      // ...
    });

    function retry<T>(
      signal: AbortSignal,
      fn: (signal: AbortSignal, attempt: number, reset: () => void) => Promise<T>,
      options?: RetryOptions,
    ): Promise<T>;

    type RetryOptions = {
      baseMs?: number;
      maxDelayMs?: number;
      maxAttempts?: number;
      onError?: (error: unknown, attempt: number, delayMs: number) => void;
    };

Retries a function with exponential backoff.

- `fn`

  A function that will be called and retried in case of error. It receives:

  - `signal`

    `AbortSignal` that is aborted when the signal passed to `retry` is aborted.

  - `attempt`

    Attempt number, starting with 0.

  - `reset`

    Function that sets the attempt number to -1 so that the next attempt will be made without delay.

- `RetryOptions.baseMs`

  Starting delay before the first retry attempt, in milliseconds.

  Defaults to 1000.

  Example: if `baseMs` is 100, then retries will be attempted in 100ms, 200ms, 400ms etc (not counting jitter).

- `RetryOptions.maxDelayMs`

  Maximum delay between attempts, in milliseconds.

  Defaults to 30 seconds.

  Example: if `baseMs` is 1000 and `maxDelayMs` is 3000, then retries will be attempted in 1000ms, 2000ms, 3000ms, 3000ms etc (not counting jitter).

- `RetryOptions.maxAttempts`

  Maximum for the total number of attempts.

  Defaults to `Infinity`.

- `RetryOptions.onError`

  Called after each failed attempt, before setting the delay timer.

  Rethrow the error from this callback to prevent further retries.
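
The two delay examples above are consistent with a doubling schedule capped at `maxDelayMs`. The helper below reproduces those numbers; `nominalDelayMs` is a hypothetical name, jitter is ignored, and the formula is inferred from the examples rather than taken from the library's source.

```typescript
// Hypothetical helper, not part of the library: nominal delay before the
// retry that follows a failed attempt (attempt numbers start at 0).
// Jitter is ignored. Defaults mirror the documented ones (1000 ms, 30 s).
function nominalDelayMs(
  attempt: number,
  baseMs = 1000,
  maxDelayMs = 30_000,
): number {
  return Math.min(maxDelayMs, baseMs * 2 ** attempt);
}

const uncapped = [0, 1, 2].map(attempt => nominalDelayMs(attempt, 100));
// [100, 200, 400]

const capped = [0, 1, 2, 3].map(attempt => nominalDelayMs(attempt, 1000, 3000));
// [1000, 2000, 3000, 3000]
```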

    function proactiveRetry<T>(
      signal: AbortSignal,
      fn: (signal: AbortSignal, attempt: number) => Promise<T>,
      options?: ProactiveRetryOptions,
    ): Promise<T>;

    type ProactiveRetryOptions = {
      baseMs?: number;
      maxAttempts?: number;
      onError?: (error: unknown, attempt: number) => void;
    };

Proactively retries a function with exponential backoff.

Also known as hedging.

The function will be called multiple times in parallel until it succeeds, in which case all the other calls will be aborted.

- `fn`

  A function that will be called multiple times in parallel until it succeeds. It receives:

  - `signal`

    `AbortSignal` that is aborted when the signal passed to `proactiveRetry` is aborted, or when the function succeeds.

  - `attempt`

    Attempt number, starting with 0.

- `ProactiveRetryOptions.baseMs`

  Base delay between attempts, in milliseconds.

  Defaults to 1000.

  Example: if `baseMs` is 100, then retries will be attempted in 100ms, 200ms, 400ms etc (not counting jitter).

- `ProactiveRetryOptions.maxAttempts`

  Maximum for the total number of attempts.

  Defaults to `Infinity`.

- `ProactiveRetryOptions.onError`

  Called after each failed attempt.

  Rethrow the error from this callback to prevent further retries.
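
The hedging behaviour described above can be sketched with standard APIs only. `hedgeSketch` and `makeAbortError` are hypothetical names. The sketch makes several simplifying assumptions: no jitter, no `onError` callback, a finite `maxAttempts`, and rejection with the last observed error once every attempt has failed. It illustrates the idea and is not this library's implementation.

```typescript
// Hypothetical local helper standing in for the library's AbortError.
function makeAbortError(): Error {
  const error = new Error('The operation has been aborted');
  error.name = 'AbortError';
  return error;
}

function hedgeSketch<T>(
  signal: AbortSignal,
  fn: (signal: AbortSignal, attempt: number) => Promise<T>,
  baseMs: number,
  maxAttempts: number,
): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const inner = new AbortController();
    let timer: ReturnType<typeof setTimeout> | undefined;
    let started = 0;
    let failed = 0;
    let settled = false;

    const finish = (settle: () => void): void => {
      if (settled) return;
      settled = true;
      clearTimeout(timer);
      signal.removeEventListener('abort', onAbort);
      inner.abort(); // stop every attempt that is still in flight
      settle();
    };

    function onAbort(): void {
      finish(() => reject(makeAbortError()));
    }

    const startAttempt = (): void => {
      const attempt = started++;
      fn(inner.signal, attempt).then(
        value => finish(() => resolve(value)),
        error => {
          failed++;
          if (failed === maxAttempts) finish(() => reject(error));
        },
      );
      // Start the next attempt in parallel after a doubling delay.
      if (started < maxAttempts) {
        timer = setTimeout(startAttempt, baseMs * 2 ** attempt);
      }
    };

    if (signal.aborted) {
      onAbort();
      return;
    }
    signal.addEventListener('abort', onAbort);
    startAttempt();
  });
}
```

With `baseMs` set to 100, a slow first call is joined by a second call after 100 ms and a third one 200 ms later; whichever fulfills first wins and the rest see their signal aborted.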

    function execute<T>(
      signal: AbortSignal,
      executor: (
        resolve: (value: T) => void,
        reject: (reason?: any) => void,
      ) => () => void | PromiseLike<void>,
    ): Promise<T>;

Similar to `new Promise(executor)`, but allows the executor to return an abort callback that is called once `signal` is aborted.

The returned promise rejects with `AbortError` once `signal` is aborted.

The callback can return a promise, e.g. for doing any async cleanup. In this case, the promise returned from `execute` rejects with `AbortError` after that promise fulfills.
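
As an illustration of the contract described above, the following sketch builds a simplified `execute`-like function from standard APIs and uses it to wrap a timer. `executeSketch` and `makeAbortError` are hypothetical names; the behaviour when the cleanup promise itself rejects (the sketch forwards that error) is an assumption.

```typescript
// Hypothetical local helper standing in for the library's AbortError.
function makeAbortError(): Error {
  const error = new Error('The operation has been aborted');
  error.name = 'AbortError';
  return error;
}

function executeSketch<T>(
  signal: AbortSignal,
  executor: (
    resolve: (value: T) => void,
    reject: (reason?: unknown) => void,
  ) => () => void | PromiseLike<void>,
): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    if (signal.aborted) {
      reject(makeAbortError());
      return;
    }

    let settled = false;
    const settle = (action: () => void): void => {
      if (settled) return;
      settled = true;
      signal.removeEventListener('abort', onAbort);
      action();
    };

    const abortCallback = executor(
      value => settle(() => resolve(value)),
      reason => settle(() => reject(reason)),
    );

    function onAbort(): void {
      settle(() => {
        // Wait for the (possibly async) cleanup, then reject with AbortError.
        Promise.resolve(abortCallback()).then(
          () => reject(makeAbortError()),
          reject,
        );
      });
    }

    if (!settled) signal.addEventListener('abort', onAbort);
  });
}

// Usage: an abortable timer whose abort callback clears the timeout.
const controller = new AbortController();
const timerPromise = executeSketch<string>(controller.signal, resolve => {
  const timer = setTimeout(() => resolve('done'), 10_000);
  return () => clearTimeout(timer);
});
const outcome = timerPromise.then(
  () => 'fulfilled',
  (error: Error) => error.name,
);
controller.abort();
// `outcome` resolves to 'AbortError'.
```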

    function abortable<T>(signal: AbortSignal, promise: PromiseLike<T>): Promise<T>;

Wraps a promise to reject with `AbortError` once `signal` is aborted.

Useful for wrapping non-abortable promises. Note that the underlying process will NOT be aborted.
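
The note above is easy to overlook, so here is a small sketch that shows it in action: the wrapper rejects as soon as the signal is aborted, while the wrapped work keeps running to completion. `abortableSketch` and `makeAbortError` are hypothetical stand-ins written with standard APIs, not this library's code.

```typescript
// Hypothetical local helper standing in for the library's AbortError.
function makeAbortError(): Error {
  const error = new Error('The operation has been aborted');
  error.name = 'AbortError';
  return error;
}

function abortableSketch<T>(
  signal: AbortSignal,
  promise: PromiseLike<T>,
): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    if (signal.aborted) {
      reject(makeAbortError());
      return;
    }
    const onAbort = (): void => reject(makeAbortError());
    signal.addEventListener('abort', onAbort, {once: true});
    promise.then(
      value => {
        signal.removeEventListener('abort', onAbort);
        resolve(value);
      },
      reason => {
        signal.removeEventListener('abort', onAbort);
        reject(reason);
      },
    );
  });
}

// A non-abortable operation: nothing can stop this timer from the outside.
let workFinished = false;
const work = new Promise<void>(resolve =>
  setTimeout(() => {
    workFinished = true;
    resolve();
  }, 20),
);

const controller = new AbortController();
const wrapped = abortableSketch(controller.signal, work).catch(
  (error: Error) => error.name,
);
controller.abort();
// `wrapped` resolves to 'AbortError' without waiting for the timer,
// but `work` still completes about 20 ms later.
```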

    function run(fn: (signal: AbortSignal) => Promise<void>): () => Promise<void>;

Invokes an abortable function with an implicitly created `AbortSignal`.

Returns a function that aborts that signal and waits until the passed function finishes.

Any error other than `AbortError` thrown from the passed function will result in an unhandled promise rejection.

Example:

    const stop = run(async signal => {
      try {
        while (true) {
          await delay(signal, 1000);
          console.log('tick');
        }
      } finally {
        await doCleanup();
      }
    });

    // abort and wait until cleanup is done
    await stop();

This function is also useful with the React `useEffect` hook:

    // make requests periodically while the component is mounted
    useEffect(
      () =>
        run(async signal => {
          while (true) {
            await makeRequest(signal);
            await delay(signal, 1000);
          }
        }),
      [],
    );

    class AbortError extends Error

Thrown when an abortable function was aborted.

Warning: do not use `instanceof` with this class. Instead, use the `isAbortError` function.

    function isAbortError(error: unknown): boolean;

Checks whether the given `error` is an `AbortError`.
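
A likely reason for the warning above is that an abort error does not always come from the class you imported: `fetch`, for example, rejects with a `DOMException` whose `name` is `AbortError`, and `instanceof` returns `false` for it. The sketch below assumes a check based on the error's `name`; `isAbortErrorSketch` and `LocalAbortError` are hypothetical names used only for illustration.

```typescript
// Hypothetical name-based check, written as an assumption about how such a
// test can work across error classes.
function isAbortErrorSketch(error: unknown): boolean {
  return (
    typeof error === 'object' &&
    error !== null &&
    (error as {name?: unknown}).name === 'AbortError'
  );
}

// Hypothetical local error class, standing in for an imported AbortError.
class LocalAbortError extends Error {
  constructor() {
    super('The operation has been aborted');
    this.name = 'AbortError';
  }
}

// An abort error created elsewhere, by a different class.
const foreign = Object.assign(new Error('aborted elsewhere'), {
  name: 'AbortError',
});

const checks = [
  foreign instanceof LocalAbortError, // false: different class
  isAbortErrorSketch(foreign), // true: the name matches
  isAbortErrorSketch(new LocalAbortError()), // true
  isAbortErrorSketch(new Error('boom')), // false
];
```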

    function throwIfAborted(signal: AbortSignal): void;

If `signal` is aborted, throws `AbortError`. Otherwise does nothing.
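
One place where such a check is handy is a long computation that has no abortable calls of its own. The sketch below processes data in chunks and checks the signal before each chunk. `throwIfAbortedSketch`, `makeAbortError` and `sumChunks` are hypothetical names; the first one is a local stand-in with the behaviour described above.

```typescript
// Hypothetical local helper standing in for the library's AbortError.
function makeAbortError(): Error {
  const error = new Error('The operation has been aborted');
  error.name = 'AbortError';
  return error;
}

// Local stand-in: throws if the signal is aborted, otherwise does nothing.
function throwIfAbortedSketch(signal: AbortSignal): void {
  if (signal.aborted) throw makeAbortError();
}

async function sumChunks(
  signal: AbortSignal,
  chunks: number[][],
): Promise<number> {
  let total = 0;
  for (const chunk of chunks) {
    // Stop before starting more work if an abort was requested.
    throwIfAbortedSketch(signal);
    for (const n of chunk) total += n;
    // Yield to the event loop so that an abort request can be delivered.
    await new Promise<void>(resolve => setTimeout(resolve, 0));
  }
  return total;
}

const total = sumChunks(new AbortController().signal, [[1, 2], [3, 4]]);
// `total` resolves to 10.
```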

    function rethrowAbortError(error: unknown): void;

If `error` is an `AbortError`, throws it. Otherwise does nothing.

Useful for `try`/`catch` blocks around abortable code:

    try {
      await somethingAbortable(signal);
    } catch (err) {
      rethrowAbortError(err);

      // do normal error handling
    }

    function catchAbortError(error: unknown): void;

If `error` is an `AbortError`, does nothing. Otherwise throws it.

Useful for invoking top-level abortable functions:

    somethingAbortable(signal).catch(catchAbortError);

Without `catchAbortError`, aborting would result in an unhandled promise rejection.
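
The effect on a promise chain can be seen with a local stand-in that behaves as described above. `catchAbortErrorSketch` and `makeAbortError` are hypothetical names, and the name-based check is an assumption.

```typescript
// Hypothetical local helper standing in for the library's AbortError.
function makeAbortError(): Error {
  const error = new Error('The operation has been aborted');
  error.name = 'AbortError';
  return error;
}

// Local stand-in: swallows abort errors and rethrows everything else.
function catchAbortErrorSketch(error: unknown): void {
  const isAbort =
    typeof error === 'object' &&
    error !== null &&
    (error as {name?: unknown}).name === 'AbortError';
  if (!isAbort) throw error;
}

// An abort is swallowed: the chain fulfills with undefined.
const afterAbort = Promise.reject(makeAbortError())
  .catch(catchAbortErrorSketch)
  .then(() => 'swallowed');

// Any other error still propagates to the next handler.
const afterFailure = Promise.reject(new Error('boom'))
  .catch(catchAbortErrorSketch)
  .then(
    () => 'swallowed',
    (error: Error) => error.message,
  );
// `afterAbort` resolves to 'swallowed', `afterFailure` resolves to 'boom'.
```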