Delightful multicore programming in Node.js


streamich/node-multicore


Parallel programming for Node.js made easy. Make any CommonJs or ESM module run in a thread pool.

  • Global thread pool: designed to be a shared thread pool for all NPM packages.
  • Custom thread pools: create a custom thread pool, if you need to.
  • Instant start: starts with 0 threads and scales as the load increases.
  • Instant module loading: load modules to the thread pool dynamically and instantly; a module is loaded in more threads as its concurrency increases.
  • Channels: each function invocation creates a bi-directional data channel, which allows you to stream data to a worker thread and back to the main thread.
  • Pin work to a thread: ability to pin a module to a single thread. Say your thread holds state: you can pin execution to a single thread, making subsequent method calls hit the same thread.
  • Single function module: quickly create a single-function module by just defining the function in your code.
  • Dynamic: pool size grows as the concurrency rises; dead threads are replaced by new ones.
  • Fast: in the sample benchmark below, Node Multicore is roughly on par with piscina and worker-nodes, and ahead of both at high concurrency.


Getting started

Install the package

npm install node-multicore

Create a module.ts file that should be executed in the thread pool

import {WorkerFn} from 'node-multicore';

export const add: WorkerFn<[number, number], number> = ([a, b]) => {
  return a + b;
};

Load your module from the main thread

import {resolve} from 'path';
import {pool} from 'node-multicore';

const specifier = resolve(__dirname, 'module');
type Methods = typeof import('./module');
const math = pool.module(specifier).typed<Methods>();

Now call your methods from the main thread

const result = await math.exec('add', [1, 2]); // 3

The thread pool

The global thread pool

The node-multicore thread pool is designed to be a single shared global thread pool for all compute intensive NPM packages. You can import it as follows:

import {pool} from 'node-multicore';

The global thread pool starts with 0 threads and scales up to the number of CPUs less 1 as the load increases. This is a deliberate design decision: it keeps the global thread pool from overloading the CPU with threads. You can customize the minimum and maximum number of threads in the thread pool using the MC_MIN_THREAD_POOL_SIZE and MC_MAX_THREAD_POOL_SIZE environment variables.

Creating a custom thread pool

The thread pool is designed to be a shared resource, so it is not recommended to create your own pool. However, if you need to create a separate one, you can:

import {WorkerPool} from 'node-multicore';

const dedicatedPool = new WorkerPool({});

// Instantiate the minimum number of threads
await dedicatedPool.init();

When creating a thread pool, you can pass the following options:

  • min: minimum number of threads in the pool. Defaults to 0, or to process.env.MC_MIN_THREAD_POOL_SIZE when that environment variable is set.
  • max: maximum number of threads in the pool. Defaults to the number of CPUs less 1, or to process.env.MC_MAX_THREAD_POOL_SIZE when that environment variable is set.
  • trackUnmanagedFds: whether to track unmanaged file descriptors in worker threads and close them when the thread is terminated. Defaults to false.
  • name: name of the thread pool, used for debugging purposes. Defaults to multicore.
  • resourceLimits: resource limits for worker threads.
  • env: environment variables for worker threads. Defaults to process.env.
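The default sizing rules above can be sketched as a small pure function. This is only an illustration of the documented defaults, not the library's own code: resolvePoolBounds is a hypothetical helper, and clamping the maximum to at least 1 on a single-CPU machine is an assumption the documentation does not state.

```typescript
interface PoolBounds {
  min: number;
  max: number;
}

// Hypothetical helper: derives pool bounds from environment settings and CPU count.
function resolvePoolBounds(
  env: Record<string, string | undefined>,
  cpuCount: number,
): PoolBounds {
  // Use the environment value only if it is a non-negative integer.
  const parse = (raw: string | undefined, fallback: number): number => {
    const n = Number.parseInt(raw ?? '', 10);
    return Number.isInteger(n) && n >= 0 ? n : fallback;
  };
  const min = parse(env['MC_MIN_THREAD_POOL_SIZE'], 0);
  // Assumption: never fall below one thread, even on a single-CPU machine.
  const max = parse(env['MC_MAX_THREAD_POOL_SIZE'], Math.max(1, cpuCount - 1));
  // Keep the bounds consistent if min was configured above max.
  return {min, max: Math.max(min, max)};
}

// With no overrides on an 8-core machine: {min: 0, max: 7}
const defaults = resolvePoolBounds({}, 8);

// With both variables set: {min: 2, max: 4}
const custom = resolvePoolBounds(
  {MC_MIN_THREAD_POOL_SIZE: '2', MC_MAX_THREAD_POOL_SIZE: '4'},
  8,
);
```

The 8-core case matches the sample benchmark output further down, which reports Cores = 8 and Max threads = 7.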

Modules

A unit of parallelism in JavaScript is a module. You can load a module in the thread pool and call its exported functions.

Similar to the thread pool, each module is designed to be "lazy" as well. A module is not loaded in any of the threads initially, but as the module concurrency rises, the module is gradually loaded in more worker threads.

Static modules

This is the preferred way to use this library. Calling pool.module(specifier) loads a module into the thread pool by its global "specifier", after which you can call its exported functions.

To begin, create the module you want to load into the thread pool and put it in a module.ts file:

export const add = ([a, b]) => a + b;

Now add your module to the thread pool:

import {resolve} from 'path';
import {pool} from 'node-multicore';

const specifier = resolve(__dirname, 'module');
const module = pool.module(specifier);

To add TypeScript support, you can use the typed() method:

const typed = module.typed<typeof import('./module')>();

This will create a type-safe wrapper, which knows the types of the exported functions. You can now call the exported functions from the module in one of the following ways:

Using the .exec() method

This will execute the function in one of the threads in the thread pool and return the result as a promise.

const result = await typed.exec('add', [1, 2]); // 3

Using the .ch() method

Every function call creates a channel, which is a duplex stream (more on that later). By calling the .ch() method, you can get a reference to the channel.

You can get the final result of the function call from the .result promise:

const result = await typed.ch('add', [1, 2]).result; // 3

Using the .api() builder

You can construct an "API" object of your module using the .api() method.

const api = typed.api();

This returns an object of all the exported functions, which you can call:

const result = await api.add(1, 2).result; // 3

Using the .fn() closure

To use this method you need to make sure that your module is loaded in at least one thread. You can achieve that by calling the module.init() method.

await module.init();

Now you can create a closure for your function

const add = typed.fn('add');

and run it as a function (it returns a channel)

const result = await add(1, 2).result; // 3

Single function modules

The fun() method will create a module out of a single function and load it in the main thread pool.

import {fun} from 'node-multicore';

const fn = fun((a: number, b: number) => a + b);

const result = await fn(1, 2); // 3

Note, when using the fun() method you do not get access to the underlying channel, and you pass the function arguments directly in the call, fn(1, 2), instead of as an array, fn([1, 2]).

Under the hood, the fun() method creates a module with a single function. You can achieve that manually as well:

const module = pool.fun((a: number, b: number) => a + b);

Now the module object is just like any other module; the single function is exported as default.

Note, your function cannot access any variables outside of its scope.
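The scope restriction is easier to see with a sketch. It assumes the function reaches the worker as source text and is rebuilt there, which is a common approach but is not confirmed here as this library's mechanism. rebuildFromSource and scopeDemo are hypothetical names, and everything runs in the current thread.

```typescript
type NumericFn = (...args: number[]) => number;

// Hypothetical helper: rebuilds a function from its source text alone, so the
// result has no access to the scope in which the original was defined.
function rebuildFromSource(fn: NumericFn): NumericFn {
  return new Function(`return (${fn.toString()});`)() as NumericFn;
}

function demo(factor: number): {sum: number; errorName: string} {
  // Uses only its own parameters, so it survives the round trip.
  const selfContained = rebuildFromSource((a: number, b: number) => a + b);

  // Refers to `factor` from the enclosing scope, which is lost when rebuilt.
  const usesOuterScope = rebuildFromSource((a: number) => a * factor);

  let errorName = 'none';
  try {
    usesOuterScope(2);
  } catch (error) {
    errorName = (error as Error).name;
  }
  return {sum: selfContained(1, 2), errorName};
}

const scopeDemo = demo(10);
console.log(scopeDemo);
```

The self-contained function still works after being rebuilt, while the one that depends on an outer variable fails with a ReferenceError. Pass everything the function needs as arguments.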

Dynamic CommonJs modules

You can load a CommonJs module from a string. This is useful if you want to load a module dynamically. It is loaded into threads progressively, as the module concurrency rises. After you are done with the module, you can unload it.

Create a CommonJs text module:

// '..' resolves to the library inside this repository's demo;
// import from 'node-multicore' in your own code.
import {pool} from '..';

const text = /* js */ `
let state = 0;

exports.add = ([a, b]) => {
  return a + b;
}

exports.set = (value) => state = value;
exports.get = () => state;
`;

Load it using the pool.cjs() method:

const module = pool.cjs(text);

Now you can use it as any other module:

// Execute a function exported by the module
const result = await module.exec('add', [1, 2]);
console.log(result); // 3

// Pin module to a single random thread, so multiple calls access the same state
const pinned = module.pinned();
await pinned.ch('set', 123).result;
const get = await pinned.ch('get', void 0).result;
console.log(get); // 123

Once you don't need this module, you can unload it:

// Unload the module, once it's no longer needed
await module.unload();

// await module.exec will throw an error now

Run a demo with the following command:

node -r ts-node/register src/demo/cjs-text.ts

Module Expressions

The ECMAScript Module Expressions proposal will allow creating anonymous modules at runtime, which can then be copied to other threads. This library will support the proposal once it is implemented in Node.js.

Module exports

Modules are loaded in worker threads and their exports become available in the main thread. Below we describe how different types of exports are handled.

Functions

The most common export is a function, which receives a single "payload" argument. The function can be async as well as synchronous. The return value of the function is sent back to the main thread.

import {WorkerFn} from 'node-multicore';

export const add: WorkerFn<[a: number, b: number], number> = ([a, b]) => {
  return a + b;
};

Channels

Channels are functions, which accept 2 or 3 arguments. The first argument is a "payload" argument, which is the same as for regular functions. The next two arguments are "send" and "receive" methods, which can be used to send and receive data from the main thread.

import {WorkerCh, taker} from 'node-multicore';

export const addThreeNumbers: WorkerCh<number, number, number, void> = async (one, send, recv) => {
  const take = taker(recv);
  const two = await take();
  const three = await take();
  return one + two + three;
};

The channel is open until the function returns. You can use the taker() helper to create a function, which will wait for the next value from the channel.
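To make the "wait for the next value" idea concrete, here is a minimal sketch of how such a helper could work. It is not the library's taker() implementation, and the real recv signature may differ. makeTaker, Listener, and the simulated channel are hypothetical, and everything runs in a single thread.

```typescript
type Listener<T> = (value: T) => void;

// Hypothetical helper: turns a push-style subscription into a pull-style
// function that returns a promise for the next incoming value.
function makeTaker<T>(subscribe: (listener: Listener<T>) => void): () => Promise<T> {
  const buffered: T[] = []; // values that arrived before anyone asked
  const waiting: Listener<T>[] = []; // callers waiting for a value

  subscribe((value) => {
    const resolve = waiting.shift();
    if (resolve) resolve(value);
    else buffered.push(value);
  });

  return () =>
    new Promise<T>((resolve) => {
      if (buffered.length > 0) resolve(buffered.shift() as T);
      else waiting.push(resolve);
    });
}

// Simulated receive side of a channel: `deliver` plays the role of the main
// thread sending values to the worker.
let deliver: Listener<number> = () => {};
const take = makeTaker<number>((listener) => {
  deliver = listener;
});

// Same shape as the addThreeNumbers export above, minus the real channel.
async function addThreeNumbers(one: number): Promise<number> {
  const two = await take();
  const three = await take();
  return one + two + three;
}

const sumPromise = addThreeNumbers(1);
deliver(2);
deliver(3);
sumPromise.then((sum) => console.log(sum)); // 6
```

Buffering values that arrive before take() is called means a sender that is faster than the receiver does not lose data.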

Promises

If a module exports a promise, when called from the main thread the promise will be resolved first and then: (1) if the promise resolves to a function, the function will be called with the payload argument, (2) if the promise resolves to anything else, the value will be returned.
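The two cases can be written as a short dispatch function. This is a sketch of the rule as described, not code from the library. callExport is a hypothetical name, and the example calls it directly instead of going through a worker thread.

```typescript
// Hypothetical helper: resolves an export first, then either calls it with the
// payload (case 1) or returns the resolved value unchanged (case 2).
async function callExport(exported: unknown, payload: unknown): Promise<unknown> {
  const resolved = await exported;
  if (typeof resolved === 'function') {
    return resolved(payload);
  }
  return resolved;
}

// Case 1: the promise resolves to a function, which receives the payload.
const asyncFunction = Promise.resolve(([a, b]: [number, number]) => a + b);

// Case 2: the promise resolves to a plain value, which is returned as is.
const asyncValue = Promise.resolve({version: 1});

const results = Promise.all([
  callExport(asyncFunction, [1, 2]),
  callExport(asyncValue, undefined),
]);

results.then(([sum, value]) => console.log(sum, value));
```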

Other exports

All other exports are returned to the main thread as is, using the postMessage copy algorithm (structured clone).

Advanced concepts

Pinning a thread

Sometimes a module keeps state inside a worker thread, and later calls need to see that state. In that case you may want to pin a series of module calls to the same thread. You can do that by calling the pinned() method on a module.

const pinned = module.pinned();

Then use the pinned object to call the module functions:

const result = await pinned.ch('add', [1, 2]).result;

All calls through the pinned instance will be executed on the same thread.
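The following simulation shows why pinning matters when a module holds state. It uses plain objects instead of real worker threads, and the round-robin dispatch is an assumption made for illustration, not a description of how the pool schedules calls. SimulatedThread and pickAny are hypothetical names.

```typescript
// Each simulated "thread" has its own private copy of the module state,
// just as each worker thread has its own copy of a loaded module.
class SimulatedThread {
  private state = 0;

  set(value: number): void {
    this.state = value;
  }

  get(): number {
    return this.state;
  }
}

const threads = [new SimulatedThread(), new SimulatedThread(), new SimulatedThread()];

// Unpinned calls: assume consecutive calls may land on different threads.
let next = 0;
const pickAny = (): SimulatedThread => threads[next++ % threads.length];

pickAny().set(123); // lands on thread 0
const unpinnedRead = pickAny().get(); // lands on thread 1, which never saw the write

// Pinned calls: every call goes to the same thread.
const pinnedThread = threads[2];
pinnedThread.set(456);
const pinnedRead = pinnedThread.get();

console.log({unpinnedRead, pinnedRead}); // { unpinnedRead: 0, pinnedRead: 456 }
```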

Transferring data by ownership

When you are sending data between threads, the most efficient way is to transfer ownership of the data. You can do that with ArrayBuffer objects. This way the data is not copied; instead, the buffers are detached in the current thread (their length becomes 0) and become available in the new thread.
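The effect of a transfer can be observed with Node's built-in worker_threads module alone. The sketch below does not use node-multicore. It sends a buffer across a MessageChannel within one thread, only to show what happens to the sender's copy.

```typescript
import {MessageChannel, receiveMessageOnPort} from 'worker_threads';

const {port1, port2} = new MessageChannel();

const buffer = new ArrayBuffer(8);
new Uint8Array(buffer)[0] = 42;

// The second argument is the transfer list: ownership of `buffer` moves
// to the receiving side instead of the bytes being copied.
port1.postMessage({buffer}, [buffer]);

// The sender's buffer is now detached and reports a length of 0.
const lengthAfterTransfer = buffer.byteLength;

// Read the message synchronously on the other end of the channel.
const received = receiveMessageOnPort(port2);
if (!received) throw new Error('no message received');
const receivedBuffer: ArrayBuffer = received.message.buffer;
const firstByte = new Uint8Array(receivedBuffer)[0];

console.log(lengthAfterTransfer, receivedBuffer.byteLength, firstByte); // 0 8 42

port1.close();
port2.close();
```

Because the sender's buffer is unusable after the transfer, only transfer buffers that the current thread no longer needs.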

Transfer buffers when executing a function:

module.exec('fn', params, [buffer1, buffer2, buffer3]);

Transfer buffers when writing to a channel from the main thread:

const channel = module.ch('fn', params, [buffer1, buffer2, buffer3]);
channel.send(123, [buffer4, buffer5, buffer6]);

Transfer buffers when returning a value using the msg helper:

import {msg} from 'node-multicore';

export const add = ([a, b]: [number, number]) => {
  // buffer1..buffer3 stand for ArrayBuffer objects owned by this thread
  return msg(a + b, [buffer1, buffer2, buffer3]);
};

Transfer buffers when writing to a channel from a worker thread:

export const method = (params, send, recv) => {
  send(123, [buffer1, buffer2, buffer3]);
  send(456, [buffer4, buffer5, buffer6]);
  return 123;
};

Multicore packages

Use this shared thread pool to improve performance of compute intensive NPM packages. Say, there is a package foo which performs some heavy computations. Create a new package foo.multicore and use this library to improve performance of the foo package.

module.ts:

import {foo as fooNative} from 'foo';

export const foo = (params) => fooNative(...params);

index.ts:

import {pool} from 'node-multicore';

const typed = pool.module(__dirname + '/module').typed<typeof import('./module')>();

export const foo = async (...params) => {
  return await typed.exec('foo', params);
};

Demo / Benchmark

Run a demo with the following commands:

yarn
yarn demo

Sample output:

CPU = Apple M1, Cores = 8, Max threads = 7, Node = v18.15.0, Arch = arm64, OS = darwin
Warmup ...
Thread pool: node-multicore (concurrency = 2): 5.280s
Thread pool: piscina (concurrency = 2): 5.214s
Thread pool: worker-nodes (concurrency = 2): 5.255s
Thread pool: node-multicore (concurrency = 4): 3.510s
Thread pool: piscina (concurrency = 4): 2.734s
Thread pool: worker-nodes (concurrency = 4): 2.747s
Thread pool: node-multicore (concurrency = 8): 2.598s
Thread pool: piscina (concurrency = 8): 2.178s
Thread pool: worker-nodes (concurrency = 8): 2.070s
Thread pool: node-multicore (concurrency = 16): 2.144s
Thread pool: piscina (concurrency = 16): 2.158s
Thread pool: worker-nodes (concurrency = 16): 2.045s
Thread pool: node-multicore (concurrency = 32): 1.919s
Thread pool: piscina (concurrency = 32): 2.153s
Thread pool: worker-nodes (concurrency = 32): 2.043s
Thread pool: node-multicore (concurrency = 64): 1.835s
Thread pool: piscina (concurrency = 64): 2.177s
Thread pool: worker-nodes (concurrency = 64): 2.044s
Thread pool: node-multicore (concurrency = 128): 1.843s
Thread pool: piscina (concurrency = 128): 2.145s
Thread pool: worker-nodes (concurrency = 128): 2.046s
Thread pool: node-multicore (concurrency = 256): 1.820s
Thread pool: piscina (concurrency = 256): 2.116s
Thread pool: worker-nodes (concurrency = 256): 2.020s
Thread pool: node-multicore (concurrency = 512): 1.797s
Thread pool: piscina (concurrency = 512): 2.088s
Thread pool: worker-nodes (concurrency = 512): 1.995s
Thread pool: node-multicore (concurrency = 1024): 1.787s
Thread pool: piscina (concurrency = 1024): 2.058s
Thread pool: worker-nodes (concurrency = 1024): 2.003s
Thread pool: node-multicore (concurrency = 1): 9.968s
Thread pool: piscina (concurrency = 1): 9.995s
Thread pool: worker-nodes (concurrency = 1): 10.043s
On main thread (concurrency = 1): 9.616s
On main thread (concurrency = 10): 9.489s
