# promise-blocking-queue
Memory optimized promise blocking queue with concurrency control, specially designed to handle large data sets that must be consumed using streams.

Useful for rate-limiting async (or sync) operations that consume large data sets. For example, when interacting with a REST API or when doing CPU/memory intensive tasks.

If we use `Bluebird.map()`, for example, we are forced to load all the data into memory before being able to consume it - an Out Of Memory exception is right around the corner.

If we use `p-queue` (by the amazing sindresorhus), for example, we can utilize streams to avoid memory bloat, but we have no (easy) way to control the stream flow without hitting that Out Of Memory exception.

The solution - a blocking queue that returns a promise that is resolved when the added item gains an available slot in the queue. This allows us to pause the stream consumption until there is a real need to consume the next item, keeping us memory smart while maintaining the desired concurrency level of data handling.
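The core pattern is easy to see in isolation. Below is a minimal, hypothetical sketch of the blocking-enqueue idea (illustration only, not the library's actual implementation): `enqueue` returns a promise that resolves only once the task has been admitted into one of the limited slots, so a producer can `await` it and pause itself while the queue is full.

```typescript
// Minimal sketch of the blocking-enqueue pattern.
// Illustration only - not the promise-blocking-queue implementation.
class MiniBlockingQueue {
  private active = 0;
  private waiters: Array<() => void> = [];

  constructor(private concurrency: number) {}

  // Resolves once a slot is free and the task has started running.
  enqueue<T>(fn: () => Promise<T>): Promise<void> {
    return new Promise<void>((admitted) => {
      const start = () => {
        this.active++;
        admitted(); // unblock the producer - the task now occupies a slot
        fn().finally(() => {
          this.active--;
          this.waiters.shift()?.(); // admit the next waiting task, if any
        });
      };
      if (this.active < this.concurrency) {
        start();
      } else {
        this.waiters.push(start);
      }
    });
  }
}

const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

async function main(): Promise<number> {
  const queue = new MiniBlockingQueue(2);
  let running = 0;
  let peak = 0;
  for (let i = 0; i < 4; i++) {
    // The producer (our "stream") pauses here whenever both slots are busy.
    await queue.enqueue(async () => {
      running++;
      peak = Math.max(peak, running);
      await sleep(20);
      running--;
    });
  }
  return peak;
}

const result = main();
result.then((peak) => console.log(`peak concurrency: ${peak}`)); // peak concurrency: 2
```

The real library generalizes this idea with events, counters, and per-task result promises, as described in the API section below.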
## Install

```sh
npm install promise-blocking-queue
```
## Usage

Let's assume we have a very large (a couple of GBs) file called `users.json`, which contains a long list of users we want to add to our DB.

Also, let's assume that our DB instance is very cheap, and as such we don't want to load it too much, so we only want to handle 2 concurrent DB insert operations.

We can achieve a short, scalable solution like so:
```typescript
import * as JSONStream from 'JSONStream';
import * as fs from 'fs';
import * as es from 'event-stream';
import * as sleep from 'sleep-promise';
import { BlockingQueue } from 'promise-blocking-queue';

const queue = new BlockingQueue({ concurrency: 2 });
let handled = 0;
let failed = 0;
let awaitDrain: Promise<void> | undefined;
const readStream = fs.createReadStream('./users.json', { flags: 'r', encoding: 'utf-8' });
const jsonReadStream = JSONStream.parse('*');
const jsonWriteStream = JSONStream.stringify();
const writeStream = fs.createWriteStream('./results.json');

const addUserToDB = async (user) => {
  try {
    console.log(`adding ${user.username}`);
    // Simulate long running task
    await sleep((handled + 1) * 100);
    console.log(`added ${user.username} #${++handled}`);
    const writePaused = !jsonWriteStream.write(user.username);
    if (writePaused && !awaitDrain) {
      // Down stream asked to pause the writes for now
      awaitDrain = new Promise((resolve) => {
        jsonWriteStream.once('drain', resolve);
      });
    }
  } catch (err) {
    console.log(`failed ${++failed}`, err);
  }
};

const handleUser = async (user) => {
  // Wait until the down stream is ready to receive more data without increasing the memory footprint
  if (awaitDrain) {
    await awaitDrain;
    awaitDrain = undefined;
  }
  return queue.enqueue(addUserToDB, user).enqueuePromise;
};

// Do not use async!
const mapper = (user, cb) => {
  console.log(`streamed ${user.username}`);
  handleUser(user).then(() => {
    cb();
  });
  // Pause the read stream until we are ready to handle more data
  return false;
};

const onReadEnd = () => {
  console.log('done read streaming');
  // If nothing was written, the idle event will not be fired
  if (queue.pendingCount === 0 && queue.activeCount === 0) {
    jsonWriteStream.end();
  } else {
    // Wait until all work is done
    queue.on('idle', () => {
      jsonWriteStream.end();
    });
  }
};

const onWriteEnd = () => {
  console.log(`done processing - ${handled} handled, ${failed} failed`);
  process.exit(0);
};

jsonWriteStream
  .pipe(writeStream)
  .on('error', (err) => {
    console.log('error write streaming', err);
    process.exit(1);
  })
  .on('end', onWriteEnd)
  .on('finish', onWriteEnd);

readStream
  .pipe(jsonReadStream)
  .pipe(es.map(mapper))
  .on('data', () => {
    // Do nothing
  })
  .on('error', (err) => {
    console.log('error read streaming', err);
    process.exit(1);
  })
  .on('finish', onReadEnd)
  .on('end', onReadEnd);
```
If `users.json` is:

```json
[
  { "username": "a" },
  { "username": "b" },
  { "username": "c" },
  { "username": "d" }
]
```

the output will be:
```
streamed a
adding a
streamed b
adding b
streamed c // c now waits in line to start and streaming is paused until then
added a #1
adding c // c only gets handled after a is done
streamed d // d only gets streamed after c has a spot in the queue
added b #2
adding d // d only gets handled after b is done
done read streaming
added c #3
added d #4
done processing - 4 handled, 0 failed
```
and `results.json` will be:

```json
["a","b","c","d"]
```
## API

### BlockingQueue(options)

Returns a new `queue` instance, which is an `EventEmitter` subclass.

#### options

Type: `object`

##### concurrency

Type: `number`

Default: `Infinity`

Minimum: `1`

Concurrency limit.
### queue

`BlockingQueue` instance.

#### .enqueue(fn, ...args)

Adds a sync or async task to the queue.

Returns an `object` with the following properties:
##### enqueuePromise

Type: `Promise<void>`

A promise that will be resolved when the queue has an available slot to run the task. Useful for knowing when it is a good time to add another task to the queue.
##### fnPromise

Type: `Promise<T>`

A promise that will be resolved with the result of `fn`.
The returned object also exposes a `boolean` flag that indicates if the task has already started to run.
##### fn

Type: `Function`

Promise/value returning function.

##### args

Type: `any[]`

The arguments to pass to the function.
#### .activeCount

The number of promises that are currently running.

#### .pendingCount

The number of promises that are waiting to run.
### Events

#### empty

Emitted when the queue becomes empty. Useful if, for example, you add additional items at a later time.
#### idle

Emitted when the queue becomes empty and all promises have completed: `queue.activeCount === 0 && queue.pendingCount === 0`.

The difference with `empty` is that `idle` guarantees that all work from the queue has finished. `empty` merely signals that the queue is empty, but some promises may not have completed yet.
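The ordering guarantee can be demonstrated with a small self-contained sketch. The `MiniQueue` below is a hypothetical stand-in (not the library's code) that fires the two events with the same semantics: `empty` whenever the backlog of pending tasks drains, `idle` only when all running tasks have also completed.

```typescript
import { EventEmitter } from 'events';

// Hypothetical mini-queue mimicking the documented event semantics.
// Illustration only - not the promise-blocking-queue implementation.
class MiniQueue extends EventEmitter {
  activeCount = 0;
  pendingCount = 0;
  private backlog: Array<() => Promise<void>> = [];

  constructor(private concurrency: number) {
    super();
  }

  add(fn: () => Promise<void>): void {
    this.pendingCount++;
    this.backlog.push(fn);
    this.drain();
  }

  private drain(): void {
    while (this.activeCount < this.concurrency && this.backlog.length > 0) {
      const fn = this.backlog.shift()!;
      this.pendingCount--;
      this.activeCount++;
      if (this.pendingCount === 0) {
        this.emit('empty'); // backlog is empty, but tasks may still be running
      }
      fn().finally(() => {
        this.activeCount--;
        if (this.pendingCount === 0 && this.activeCount === 0) {
          this.emit('idle'); // all work has truly finished
        } else {
          this.drain();
        }
      });
    }
  }
}

const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

const events: string[] = [];
const q = new MiniQueue(1);
q.on('empty', () => events.push('empty'));
q.on('idle', () => events.push('idle'));
q.add(() => sleep(10));
q.add(() => sleep(10));

const finished = new Promise<void>((resolve) => q.once('idle', resolve));
finished.then(() => console.log(events.join(' -> '))); // empty -> empty -> idle
```

Note that `empty` can fire more than once (here, each time the single-slot queue picks up its last pending task), while `idle` fires exactly once, at the very end.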
The library is based on `p-limit` and `p-queue` (by the amazing sindresorhus).
Promise Blocking Queue supports Node 6 LTS and higher.
All contributions are happily welcomed!
Please make all pull requests to the `master` branch from your fork and ensure tests pass locally.