# node-multicore

Delightful multicore programming in Node.js.
Parallel programming for Node.js made easy. Make any CommonJs or ESM module run in a thread pool.
- Global thread pool: designed to be a shared thread pool for all NPM packages.
- Custom thread pools: create a custom thread pool if you need to.
- Instant start: starts with 0 threads and scales as the load increases.
- Instant module loading: load modules into the thread pool dynamically and instantly; a module is loaded into more threads as its concurrency increases.
- Channels: each function invocation creates a bi-directional data channel, which allows you to stream data to a worker thread and back to the main thread.
- Pin work to a thread: ability to pin a module to a single thread. Say your thread holds state: you can pin execution to a single thread, making subsequent method calls hit the same thread.
- Single function module: quickly create a single-function module by just defining the function in your code.
- Dynamic: the pool size grows as concurrency rises, and dead threads are replaced by new ones.
- Fast: Node Multicore is as fast as comparable thread pools; see the benchmarks below.
- Getting started
- The thread pool
- Modules
- Module exports
- Advanced concepts
- Multicore packages
- Demo / Benchmark
## Getting started

Install the package:

```
npm install node-multicore
```

Create a `module.ts` that should be executed in the thread pool:
```ts
import {WorkerFn} from 'node-multicore';

export const add: WorkerFn<[number, number], number> = ([a, b]) => {
  return a + b;
};
```
Load your module from the main thread:

```ts
import {resolve} from 'path';
import {pool} from 'node-multicore';

const specifier = resolve(__dirname, 'module');

type Methods = typeof import('./module');

const math = pool.module(specifier).typed<Methods>();
```
Now call your methods from the main thread:

```ts
const result = await math.exec('add', [1, 2]); // 3
```
## The thread pool

The `node-multicore` thread pool is designed to be a single shared global thread pool for all compute-intensive NPM packages. You can import it as follows:
```ts
import {pool} from 'node-multicore';
```
The global thread pool starts with 0 threads and scales up to the number of CPUs less 1 as the load increases. This is a design decision: this way the global thread pool avoids overloading the CPU with threads. You can customize the minimum and maximum number of threads in the thread pool using the `MC_MIN_THREAD_POOL_SIZE` and `MC_MAX_THREAD_POOL_SIZE` environment variables.
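For example, to keep at least 2 threads alive and cap the global pool at 4 threads (a sketch; `app.js` stands in for your entry point):

```
MC_MIN_THREAD_POOL_SIZE=2 MC_MAX_THREAD_POOL_SIZE=4 node app.js
```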
The thread pool is designed to be a shared resource, so it is not recommended to create your own pool. However, if you need to create a separate one, you can:
```ts
import {WorkerPool} from 'node-multicore';

const dedicatedPool = new WorkerPool({});

// Instantiate the minimum number of threads
await dedicatedPool.init();
```
When creating a thread pool, you can pass the following options:
- `min` — minimum number of threads in the pool, defaults to `0` or the `process.env.MC_MIN_THREAD_POOL_SIZE` environment setting.
- `max` — maximum number of threads in the pool, defaults to the number of CPUs less 1 or the `process.env.MC_MAX_THREAD_POOL_SIZE` environment setting.
- `trackUnmanagedFds` — whether to track unmanaged file descriptors in worker threads and close them when the thread is terminated. Defaults to `false`.
- `name` — name of the thread pool, used for debugging purposes. Defaults to `multicore`.
- `resourceLimits` — resource limits for worker threads.
- `env` — environment variables for worker threads. Defaults to `process.env`.
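A sketch of passing these options when creating a dedicated pool (the values here are arbitrary examples):

```ts
import {WorkerPool} from 'node-multicore';

const customPool = new WorkerPool({
  min: 2,                   // keep at least 2 threads alive
  max: 4,                   // never grow beyond 4 threads
  name: 'image-processing', // used for debugging purposes
  trackUnmanagedFds: false,
  env: process.env,
});

await customPool.init();
```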
## Modules

A unit of parallelism in JavaScript is a module. You can load a module in the thread pool and call its exported functions.
Similar to the thread pool, each module is designed to be "lazy" as well. A module is not loaded in any of the threads initially, but as the module concurrency rises, the module is gradually loaded in more worker threads.
This is the preferred way to use this library: `pool.module(specifier)` loads a module in the thread pool by a global "specifier", and you can then call its exported functions.
To begin, create a module you want to be loaded in the thread pool and put it in a `module.ts` file:
```ts
export const add = ([a, b]) => a + b;
```
Now add your module to the thread pool:
```ts
import {resolve} from 'path';
import {pool} from 'node-multicore';

const specifier = resolve(__dirname, 'module');
const module = pool.module(specifier);
```
To add TypeScript support, you can use the `typed()` method:
```ts
const typed = module.typed<typeof import('./module')>();
```
This will create a type-safe wrapper, which knows the types of the exported functions. You can now call the exported functions from the module in one of the following ways:
The `.exec()` method executes the function in one of the threads in the thread pool and returns the result as a promise.
```ts
const result = await typed.exec('add', [1, 2]); // 3
```
Every function call creates a channel, which is a duplex stream (more on that later). By calling the `.ch()` method, you can get a reference to the channel.

You can get the final result of the function call from the `.result` promise:
```ts
const result = await typed.ch('add', [1, 2]).result; // 3
```
You can construct an "API" object of your module using the `.api()` method.
```ts
const api = typed.api();
```
This returns an object of all the exported functions, which you can call:
```ts
const result = await api.add(1, 2).result; // 3
```
To use this method, you need to make sure that your module is loaded in at least one thread. You can achieve that by calling the `module.init()` method.
```ts
await module.init();
```
Now you can create a closure for your function:
```ts
const add = typed.fn('add');
```
and run it as a function (it returns a channel):
```ts
const result = await add(1, 2).result; // 3
```
The `fun()` method will create a module out of a single function and load it in the main thread pool.
```ts
import {fun} from 'node-multicore';

const fn = fun((a: number, b: number) => a + b);

const result = await fn(1, 2); // 3
```
Note that when using the `fun()` method you do not get access to the underlying channel, and you can specify all function arguments in the function call, `fn(1, 2)`, instead of as an array, `fn([1, 2])`.
Under the hood, the `fun()` method creates a module with a single function. You can achieve that manually as well:
```ts
const module = pool.fun((a: number, b: number) => a + b);
```
Now the `module` object is just like any other module; the single function is exported as `default`.
Note that your function cannot access any variables outside of its scope.
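For illustration (a hedged sketch: the exact failure mode depends on how the function is shipped to the worker, but an outer binding will not be available there):

```ts
import {fun} from 'node-multicore';

const factor = 2; // lives only in the main thread

// Broken: `factor` is not defined inside the worker thread, because only the
// function itself is sent to the pool, not its surrounding scope.
const scaleBroken = fun((x: number) => x * factor);

// Works: pass everything the function needs through its arguments instead.
const scale = fun((x: number, factor: number) => x * factor);
```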
You can load a CommonJs module from a string. This is useful if you want to load a module dynamically. It is loaded into threads progressively, as the module concurrency rises. After you are done with the module, you can unload it.
Create a CommonJs text module:
```ts
import {pool} from 'node-multicore';

const text = /* js */ `
let state = 0;
exports.add = ([a, b]) => {
  return a + b;
};
exports.set = (value) => state = value;
exports.get = () => state;
`;
```
Load it using the `pool.cjs()` method:
```ts
const module = pool.cjs(text);
```
Now you can use it as any other module:
```ts
// Execute a function exported by the module
const result = await module.exec('add', [1, 2]);
console.log(result); // 3

// Pin the module to a single random thread, so multiple calls access the same state
const pinned = module.pinned();
await pinned.ch('set', 123).result;
const get = await pinned.ch('get', void 0).result;
console.log(get); // 123
```
Once you no longer need the module, you can unload it:
```ts
// Unload the module, once it's no longer needed
await module.unload();

// `await module.exec(...)` will throw an error now
```
Run a demo with the following command:
```
node -r ts-node/register src/demo/cjs-text.ts
```
The ECMAScript Module Expressions proposal will allow creating anonymous modules at runtime, which can then be copied to other threads. This library will support this proposal once it is implemented in Node.js.
## Module exports

Modules are loaded in worker threads and their exports become available in the main thread. Below we describe how different types of exports are handled.
The most common export is a function, which receives a single "payload" argument. The function can be async as well as synchronous. The return value of the function is sent back to the main thread.
```ts
import {WorkerFn} from 'node-multicore';

export const add: WorkerFn<[a: number, b: number], number> = ([a, b]) => {
  return a + b;
};
```
Channels are functions which accept 2 or 3 arguments. The first argument is a "payload" argument, which is the same as for regular functions. The next two arguments are "send" and "receive" methods, which can be used to send and receive data from the main thread.
```ts
import {WorkerCh, taker} from 'node-multicore';

export const addThreeNumbers: WorkerCh<number, number, number, void> =
  async (one, send, recv) => {
    const take = taker(recv);
    const two = await take();
    const three = await take();
    return one + two + three;
  };
```
The channel is open until the function returns. You can use the `taker()` helper to create a function which will wait for the next value from the channel.
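For reference, a minimal sketch of driving `addThreeNumbers` from the main thread, assuming the module is loaded and typed as `typed` (the channel's `send()` method and `result` promise are the same ones shown elsewhere in this README):

```ts
// Open a channel; the first number travels as the payload.
const channel = typed.ch('addThreeNumbers', 1);

// Stream the remaining numbers to the worker thread.
channel.send(2);
channel.send(3);

// The worker returns once it has received both values.
const sum = await channel.result; // 6
```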
If a module exports a promise, the promise is resolved first when called from the main thread, and then: (1) if the promise resolves to a function, the function is called with the payload argument; (2) if the promise resolves to anything else, that value is returned.
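For example, a hypothetical module whose export is a promise that resolves to a function; each call from the main thread then invokes that function with the payload:

```ts
// module.ts
export const lookup: Promise<(key: string) => number | undefined> = (async () => {
  // Imagine an expensive, asynchronous initialization step here.
  const table = new Map<string, number>([['a', 1], ['b', 2]]);
  return (key: string) => table.get(key);
})();
```

Calling `typed.exec('lookup', 'a')` from the main thread would then resolve to `1` (an assumption about how the typed wrapper surfaces this export).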
All other exports are returned to the main thread as is, using the `postMessage` copy algorithm.
## Advanced concepts

Sometimes your threads need to share state. In that case you may want to pin a series of module calls to the same thread. You can do that by calling the `pinned()` method on a module.
```ts
const pinned = module.pinned();
```
Then use the `pinned` object to call the module functions:
```ts
const result = await pinned.ch('add', [1, 2]).result;
```
All calls through the `pinned` instance will be executed on the same thread.
When you are sending data between threads, the most efficient way is to transfer ownership of the data. You can do that using `ArrayBuffer` objects. This way the data will not be copied; instead the buffers will be truncated in the current thread and become available in the new thread.
Transfer buffers when executing a function:
```ts
module.exec('fn', params, [buffer1, buffer2, buffer3]);
```
Transfer buffers when writing to a channel from the main thread:
```ts
const channel = module.ch('fn', params, [buffer1, buffer2, buffer3]);

channel.send(123, [buffer4, buffer5, buffer6]);
```
Transfer buffers when returning a value using the `msg` helper:
```ts
import {msg} from 'node-multicore';

export const add = ([a, b]: [number, number]) => {
  return msg(a + b, [buffer1, buffer2, buffer3]);
};
```
Transfer buffers when writing to a channel from a worker thread:
```ts
export const method = (params, send, recv) => {
  send(123, [buffer1, buffer2, buffer3]);
  send(456, [buffer4, buffer5, buffer6]);
  return 123;
};
```
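As an end-to-end illustration (a hedged sketch; `sum` is a hypothetical exported method), transferring a typed array's underlying buffer moves it to the worker instead of copying it:

```ts
const bytes = new Uint8Array([1, 2, 3]);

// Transfer ownership of the underlying ArrayBuffer along with the call.
const result = await module.exec('sum', bytes, [bytes.buffer]);

// The buffer is now detached in the main thread.
console.log(bytes.byteLength); // 0
```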
## Multicore packages

Use this shared thread pool to improve the performance of compute-intensive NPM packages. Say there is a package `foo` which performs some heavy computations. Create a new package `foo.multicore` and use this library to improve the performance of the `foo` package.
`module.ts`:

```ts
import {foo as fooNative} from 'foo';

export const foo = (params) => fooNative(...params);
```
`index.ts`:

```ts
import {pool} from 'node-multicore';

const typed = pool.module(__dirname + '/module').typed<typeof import('./module')>();

export const foo = async (...params) => {
  return await typed.exec('foo', params);
};
```
## Demo / Benchmark

Run a demo with the following commands:
```
yarn
yarn demo
```
Sample output:
```
CPU = Apple M1, Cores = 8, Max threads = 7, Node = v18.15.0, Arch = arm64, OS = darwin
Warmup ...
Thread pool: node-multicore (concurrency = 2): 5.280s
Thread pool: piscina (concurrency = 2): 5.214s
Thread pool: worker-nodes (concurrency = 2): 5.255s
Thread pool: node-multicore (concurrency = 4): 3.510s
Thread pool: piscina (concurrency = 4): 2.734s
Thread pool: worker-nodes (concurrency = 4): 2.747s
Thread pool: node-multicore (concurrency = 8): 2.598s
Thread pool: piscina (concurrency = 8): 2.178s
Thread pool: worker-nodes (concurrency = 8): 2.070s
Thread pool: node-multicore (concurrency = 16): 2.144s
Thread pool: piscina (concurrency = 16): 2.158s
Thread pool: worker-nodes (concurrency = 16): 2.045s
Thread pool: node-multicore (concurrency = 32): 1.919s
Thread pool: piscina (concurrency = 32): 2.153s
Thread pool: worker-nodes (concurrency = 32): 2.043s
Thread pool: node-multicore (concurrency = 64): 1.835s
Thread pool: piscina (concurrency = 64): 2.177s
Thread pool: worker-nodes (concurrency = 64): 2.044s
Thread pool: node-multicore (concurrency = 128): 1.843s
Thread pool: piscina (concurrency = 128): 2.145s
Thread pool: worker-nodes (concurrency = 128): 2.046s
Thread pool: node-multicore (concurrency = 256): 1.820s
Thread pool: piscina (concurrency = 256): 2.116s
Thread pool: worker-nodes (concurrency = 256): 2.020s
Thread pool: node-multicore (concurrency = 512): 1.797s
Thread pool: piscina (concurrency = 512): 2.088s
Thread pool: worker-nodes (concurrency = 512): 1.995s
Thread pool: node-multicore (concurrency = 1024): 1.787s
Thread pool: piscina (concurrency = 1024): 2.058s
Thread pool: worker-nodes (concurrency = 1024): 2.003s
Thread pool: node-multicore (concurrency = 1): 9.968s
Thread pool: piscina (concurrency = 1): 9.995s
Thread pool: worker-nodes (concurrency = 1): 10.043s
On main thread (concurrency = 1): 9.616s
On main thread (concurrency = 10): 9.489s
```