std.parallelism implements high-level primitives for SMP parallelism. These include parallel foreach, parallel reduce, parallel eager map, pipelining and future/promise parallelism. std.parallelism is recommended when the same operation is to be executed in parallel on different data, or when a function is to be executed in a background thread and its result returned to a well-defined main thread. For communication between arbitrary threads, see std.concurrency.

std.parallelism is based on the concept of a Task. A Task is an object that represents the fundamental unit of work in this library and may be executed in parallel with any other Task. Using Task directly allows programming with a future/promise paradigm. All other supported parallelism paradigms (parallel foreach, map, reduce, pipelining) represent an additional level of abstraction over Task. They automatically create one or more Task objects, or closely related types that are conceptually identical but not part of the public API.

After creation, a Task may be executed in a new thread, or submitted to a TaskPool for execution. A TaskPool encapsulates a task queue and its worker threads. Its purpose is to efficiently map a large number of Tasks onto a smaller number of threads. A task queue is a FIFO queue of Task objects that have been submitted to the TaskPool and are awaiting execution. A worker thread is a thread that is associated with exactly one task queue. It executes the Task at the front of its queue when the queue has work available, or sleeps when no work is available. Each task queue is associated with zero or more worker threads. If the result of a Task is needed before execution by a worker thread has begun, the Task can be removed from the task queue and executed immediately in the thread where the result is needed.

Warning: Unless marked as @trusted or @safe, artifacts in this module allow implicit data sharing between threads and cannot guarantee that client code is free from low-level data races.
Source: std/parallelism.d
Author: David Simcha
Task(alias fun, Args...);

Task represents the fundamental unit of work. A Task may be executed in parallel with any other Task. Using this struct directly allows future/promise parallelism. In this paradigm, a function (or delegate or other callable) is executed in a thread other than the one it was called from. The calling thread does not block while the function is being executed. A call to workForce, yieldForce, or spinForce is used to ensure that the Task has finished executing and to obtain the return value, if any. These functions and done also act as full memory barriers, meaning that any memory writes made in the thread that executed the Task are guaranteed to be visible in the calling thread after one of these functions returns.

Members:
    args = _args[1 .. __dollar];
    ReturnType = typeof(fun(_args));
    spinForce();
    yieldForce();
    workForce();
    done();
    executeInNewThread();
    executeInNewThread(int priority); — executes the Task in a new thread with the given priority. See std.parallelism.task for a usage example.

task(alias fun, Args...)(Args args);

Example
// Read two files into memory at the same time.
import std.file;

void main()
{
    // Create and execute a Task for reading foo.txt.
    auto file1Task = task!read("foo.txt");
    file1Task.executeInNewThread();

    // Read bar.txt in parallel.
    auto file2Data = read("bar.txt");

    // Get the results of reading foo.txt.
    auto file1Data = file1Task.yieldForce;
}
// Sorts an array using a parallel quick sort algorithm.
// The first partition is done serially. Both recursion
// branches are then executed in parallel.
//
// Timings for sorting an array of 1,000,000 doubles on
// an Athlon 64 X2 dual core machine:
//
// This implementation:              176 milliseconds
// Equivalent serial implementation: 280 milliseconds
void parallelSort(T)(T[] data)
{
    // Sort small subarrays serially.
    if (data.length < 100)
    {
        std.algorithm.sort(data);
        return;
    }

    // Partition the array.
    swap(data[$ / 2], data[$ - 1]);
    auto pivot = data[$ - 1];
    bool lessThanPivot(T elem) { return elem < pivot; }

    auto greaterEqual = partition!lessThanPivot(data[0 .. $ - 1]);
    swap(data[$ - greaterEqual.length - 1], data[$ - 1]);

    auto less = data[0 .. $ - greaterEqual.length - 1];
    greaterEqual = data[$ - greaterEqual.length .. $];

    // Execute both recursion branches in parallel.
    auto recurseTask = task!parallelSort(greaterEqual);
    taskPool.put(recurseTask);
    parallelSort(less);
    recurseTask.yieldForce;
}
task(F, Args...)(F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F);

Example
// Read two files in at the same time again,
// but this time use a function pointer instead
// of an alias to represent std.file.read.
import std.file;

void main()
{
    // Create and execute a Task for reading foo.txt.
    auto file1Task = task(&read!string, "foo.txt", size_t.max);
    file1Task.executeInNewThread();

    // Read bar.txt in parallel.
    auto file2Data = read("bar.txt");

    // Get the results of reading foo.txt.
    auto file1Data = file1Task.yieldForce;
}
Notes: This function takes a non-scope delegate, meaning it can be used with closures. If you can't allocate a closure due to objects on the stack that have scoped destruction, see scopedTask, which takes a scope delegate.
task(F, Args...)(F fun, Args args)
if (is(typeof(fun(args))) && isSafeTask!F);

Version of task usable from @safe code. Usage mechanics are identical to the non-@safe case, but safety introduces some restrictions:

- fun must be @safe or @trusted.
- fun must not return by reference.
- The return type must not have unshared aliasing unless fun is pure or the Task is executed via executeInNewThread instead of using a TaskPool.

scopedTask(alias fun, Args...)(Args args);

scopedTask(F, Args...)(scope F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F);

scopedTask(F, Args...)(F fun, Args args)
if (is(typeof(fun(args))) && isSafeTask!F);

The lifetime of a Task created by scopedTask cannot exceed the lifetime of the scope it was created in. scopedTask might be preferred over task when a closure cannot be allocated, because the delegate overload of scopedTask takes a scope delegate.

Notes: Task objects created using scopedTask will automatically call Task.yieldForce in their destructor if necessary to ensure the Task is complete before the stack frame they reside on is destroyed.
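Since no example accompanies scopedTask here, a minimal sketch of the scope-delegate use case might look like the following (the nested function and values are illustrative only):

```d
import std.parallelism;

void main()
{
    // Stack data captured by a nested function; using scopedTask
    // with a scope delegate avoids allocating a closure for it.
    int[4] data = [1, 2, 3, 4];

    int sumData()
    {
        int total = 0;
        foreach (e; data) total += e;
        return total;
    }

    // The Task itself lives on this stack frame. Its destructor
    // calls yieldForce if the Task has not yet completed.
    auto t = scopedTask(&sumData);
    taskPool.put(t);

    assert(t.yieldForce == 10);
}
```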
immutable uint totalCPUs; — the total number of CPU cores available on the current machine (lazily initialized on first use).

TaskPool;

This class encapsulates a task queue and its worker threads. A task queue is a FIFO queue of Task objects that have been submitted to the TaskPool and are awaiting execution. A worker thread is a thread that executes the Task at the front of the queue when one is available and sleeps when the queue is empty.

Occasionally it is useful to explicitly instantiate a TaskPool, for example to create TaskPool instances with multiple priorities, such as a low priority pool and a high priority pool.

Note: The worker threads in this pool will not stop until stop or finish is called, even if the main thread has finished already. This may lead to programs that never end. If you do not want this behaviour, you can set isDaemon to true.
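The multiple-priority use case mentioned above can be sketched as follows; the thread counts and priority values here are arbitrary illustrative choices:

```d
import std.parallelism;
import core.thread : Thread;

void main()
{
    // Two explicitly instantiated pools instead of the global taskPool.
    auto lowPool  = new TaskPool(2);  // 2 worker threads each
    auto highPool = new TaskPool(2);

    // TaskPool.priority forwards to core.thread.Thread.priority.
    lowPool.priority  = Thread.PRIORITY_MIN;
    highPool.priority = Thread.PRIORITY_MAX;

    // ... submit Tasks to each pool with put() ...

    // Wait for all submitted work, then terminate the workers.
    lowPool.finish(true);
    highPool.finish(true);
}
```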
Note: On single-core machines, the primitives provided by TaskPool operate transparently in single-threaded mode.
this();
this(size_t nWorkers);

parallel(R)(R range, size_t workUnitSize);
parallel(R)(R range);

Implements a parallel foreach loop over range. A work unit is a set of consecutive elements of range to be processed by a worker thread between communication with any other thread. The number of elements processed per work unit is controlled by the workUnitSize parameter. Smaller work units provide better load balancing, but larger work units avoid the overhead of communicating with other threads frequently to fetch the next work unit. Large work units also avoid false sharing in cases where the range is being modified. The less time a single iteration of the loop takes, the larger workUnitSize should be. For very expensive loop bodies, workUnitSize should be 1. An overload that chooses a default work unit size is also available.

Example
// Find the logarithm of every number from 1 to
// 10_000_000 in parallel.
auto logs = new double[10_000_000];

// Parallel foreach works with or without an index
// variable. It can iterate by ref if range.front
// returns by ref.

// Iterate over logs using work units of size 100.
foreach (i, ref elem; taskPool.parallel(logs, 100))
{
    elem = log(i + 1.0);
}

// Same thing, but use the default work unit size.
//
// Timings on an Athlon 64 X2 dual core machine:
//
// Parallel foreach: 388 milliseconds
// Regular foreach:  619 milliseconds
foreach (i, ref elem; taskPool.parallel(logs))
{
    elem = log(i + 1.0);
}
Notes: The memory usage of this implementation is guaranteed to be constant in range.length.
In the case of non-random access ranges, parallel foreach buffers lazily to an array of size workUnitSize before executing the parallel portion of the loop. The exception is that, if a parallel foreach is executed over a range returned by asyncBuf or map, the copying is elided and the buffers are simply swapped. In this case workUnitSize is ignored and the work unit size is set to the buffer size of range.

A memory barrier is guaranteed to be executed on exit from the loop, so that results produced by all threads are visible in the calling thread.

Exception Handling: When at least one exception is thrown from inside a parallel foreach loop, the submission of additional Task objects is terminated as soon as possible, in a non-deterministic manner. All executing or enqueued work units are allowed to complete. Then, all exceptions that were thrown by any work unit are chained using Throwable.next and rethrown. The order of the exception chaining is non-deterministic.

amap(functions...)
amap(Args...)(Args args);

Eager parallel map.

auto numbers = iota(100_000_000.0);

// Find the square roots of numbers.
//
// Timings on an Athlon 64 X2 dual core machine:
//
// Parallel eager map:               0.802 s
// Equivalent serial implementation: 1.768 s
auto squareRoots = taskPool.amap!sqrt(numbers);

Immediately after the range argument, an optional work unit size argument may be provided. Work units as used by amap are identical to those defined for parallel foreach. If no work unit size is provided, the default work unit size is used.

// Same thing, but make the work unit size 100.
auto squareRoots = taskPool.amap!sqrt(numbers, 100);

An output range for returning the results may be provided as the last argument. If one is not provided, an array of the proper type will be allocated on the garbage collected heap. If one is provided, it must be a random access range with assignable elements, must have reference semantics with respect to assignment to its elements, and must have the same length as the input range. Writing to adjacent elements from different threads must be safe.
// Same thing, but explicitly allocate an array
// to return the results in. The element type
// of the array may be either the exact type
// returned by functions or an implicit conversion
// target.
auto squareRoots = new float[numbers.length];
taskPool.amap!sqrt(numbers, squareRoots);

// Multiple functions, explicit output range, and
// explicit work unit size.
auto results = new Tuple!(float, real)[numbers.length];
taskPool.amap!(sqrt, log)(numbers, 100, results);
Note: A memory barrier is guaranteed to be executed after all results are written but before returning, so that results produced by all threads are visible in the calling thread.
Tips: To perform the mapping operation in place, provide the same range for the input and output range.
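For instance, the in-place tip could be applied like this (a minimal sketch; the input values are illustrative only):

```d
import std.math : sqrt;
import std.parallelism;

void main()
{
    auto nums = [1.0, 4.0, 9.0, 16.0];

    // Input and output are the same array, so each element is
    // overwritten with its square root in place, in parallel.
    taskPool.amap!sqrt(nums, nums);

    assert(nums == [1.0, 2.0, 3.0, 4.0]);
}
```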
To parallelize the copying of a range with expensive to evaluate elements to an array, pass an identity function (a function that just returns whatever argument is provided to it) to amap.

Exception Handling: When at least one exception is thrown from inside the map functions, the submission of additional Task objects is terminated as soon as possible, in a non-deterministic manner. All currently executing or enqueued work units are allowed to complete. Then, all exceptions that were thrown from any work unit are chained using Throwable.next and rethrown. The order of the exception chaining is non-deterministic.

map(functions...)
map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max)

A semi-lazy parallel map. The map functions are evaluated for the first bufSize elements and stored in a buffer and made available to popFront. Meanwhile, in the background a second buffer of the same size is filled. When the first buffer is exhausted, it is swapped with the second buffer and filled while the values from what was originally the second buffer are read. This implementation allows for elements to be written to the buffer without the need for atomic operations or synchronization for each write, and enables the mapping function to be evaluated efficiently in parallel. map has more overhead than the simpler procedure used by amap but avoids the need to keep all results in memory simultaneously and works with non-random access ranges.

S source | The input range to be mapped. If source is not random access it will be lazily buffered to an array of size bufSize before the map function is evaluated. (For an exception to this rule, see Notes.) |
size_t bufSize | The size of the buffer to store the evaluated elements. |
size_t workUnitSize | The number of elements to evaluate in a single Task. Must be less than or equal to bufSize, and should be a fraction of bufSize such that all worker threads can be used. If the default of size_t.max is used, workUnitSize will be set to the pool-wide default. |
Returns: An input range representing the results of the map. This range has a length iff source has a length.

Notes: If a range returned by map or asyncBuf is used as an input to map, then as an optimization the copying from the output buffer of the first range to the input buffer of the second range is elided, even though the ranges returned by map and asyncBuf are non-random access ranges. This means that the bufSize parameter passed to the current call to map will be ignored and the size of the buffer will be the buffer size of source.
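To make the buffering parameters concrete, a hypothetical call with an explicit bufSize and workUnitSize might look like this (the sizes chosen are arbitrary):

```d
import std.math : log;
import std.parallelism;
import std.range : iota;

void main()
{
    // Buffer 1_000 evaluated elements at a time; each Task
    // evaluates 100 of them. workUnitSize must be <= bufSize.
    auto logs = taskPool.map!log(iota(1.0, 1_000_000.0), 1_000, 100);

    double sum = 0;
    foreach (l; logs)
    {
        sum += l;
    }
}
```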
Example
// Pipeline reading a file, converting each line
// to a number, taking the logarithms of the numbers,
// and performing the additions necessary to find
// the sum of the logarithms.
auto lineRange = File("numberList.txt").byLine();
auto dupedLines = std.algorithm.map!"a.idup"(lineRange);
auto nums = taskPool.map!(to!double)(dupedLines);
auto logs = taskPool.map!log10(nums);

double sum = 0;
foreach (elem; logs)
{
    sum += elem;
}

Exception Handling: Any exceptions thrown while iterating over source or computing the map function are re-thrown on a call to popFront or, if thrown during construction, are simply allowed to propagate to the caller. In the case of exceptions thrown while computing the map function, the exceptions are chained as in TaskPool.amap.

asyncBuf(S)(S source, size_t bufSize = 100)

Given a source range that is expensive to iterate over, returns an input range that asynchronously buffers the contents of source into a buffer of bufSize elements in a worker thread, while making previously buffered elements from a second buffer, also of size bufSize, available via the range interface of the returned object. The returned range has a length iff hasLength!S. asyncBuf is useful, for example, when performing expensive operations on the elements of ranges that represent data on a disk or network.

Example
import std.conv, std.stdio;

void main()
{
    // Fetch lines of a file in a background thread
    // while processing previously fetched lines,
    // dealing with byLine's buffer recycling by
    // eagerly duplicating every line.
    auto lines = File("foo.txt").byLine();
    auto duped = std.algorithm.map!"a.idup"(lines);

    // Fetch more lines in the background while we
    // process the lines already read into memory
    // into a matrix of doubles.
    double[][] matrix;
    auto asyncReader = taskPool.asyncBuf(duped);

    foreach (line; asyncReader)
    {
        auto ls = line.split("\t");
        matrix ~= to!(double[])(ls);
    }
}

Exception Handling: Any exceptions thrown while iterating over source are re-thrown on a call to popFront or, if thrown during construction, simply allowed to propagate to the caller.

asyncBuf(C1, C2)(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100)

Given a callable object next that writes to a user-provided buffer and a second callable object empty that determines whether more data is available to write via next, returns an input range that asynchronously calls next with a set of size nBuffers of buffers and makes the results available in the order they were obtained via the input range interface of the returned object. Similarly to the input range overload of asyncBuf, the first half of the buffers are made available via the range interface while the second half are filled and vice-versa.

C1 next | A callable object that takes a single argument that must be an array with mutable elements. When called, next writes data to the array provided by the caller. |
C2 empty | A callable object that takes no arguments and returns a type implicitly convertible to bool. This is used to signify that no more data is available to be obtained by calling next. |
size_t initialBufSize | The initial size of each buffer. If next takes its array by reference, it may resize the buffers. |
size_t nBuffers | The number of buffers to cycle through when calling next. |
Example
// Fetch lines of a file in a background
// thread while processing previously fetched
// lines, without duplicating any lines.
auto file = File("foo.txt");

void next(ref char[] buf)
{
    file.readln(buf);
}

// Fetch more lines in the background while we
// process the lines already read into memory
// into a matrix of doubles.
double[][] matrix;
auto asyncReader = taskPool.asyncBuf(&next, &file.eof);

foreach (line; asyncReader)
{
    auto ls = line.split("\t");
    matrix ~= to!(double[])(ls);
}

Exception Handling: Any exceptions thrown while iterating over range are re-thrown on a call to popFront.
Warning: Using the range returned by this function in a parallel foreach loop will not work because buffers may be overwritten while the task that processes them is in queue. This is checked for at compile time and will result in a static assertion failure.
reduce(functions...)
reduce(Args...)(Args args);

Parallel reduce on a random access range.

// Find the sum of squares of a range in parallel, using
// an explicit seed.
//
// Timings on an Athlon 64 X2 dual core machine:
//
// Parallel reduce:                     72 milliseconds
// Using std.algorithm.reduce instead: 181 milliseconds
auto nums = iota(10_000_000.0f);
auto sumSquares = taskPool.reduce!"a + b"(
    0.0, std.algorithm.map!"a * a"(nums)
);

If no explicit seed is provided, the first element of each work unit is used as a seed. For the final reduction, the result from the first work unit is used as the seed.
// Find the sum of a range in parallel, using the first
// element of each work unit as the seed.
auto sum = taskPool.reduce!"a + b"(nums);

An explicit work unit size may be specified as the last argument. Specifying too small a work unit size will effectively serialize the reduction, as the final reduction of the result of each work unit will dominate computation time. If TaskPool.size for this instance is zero, this parameter is ignored and one work unit is used.
// Use a work unit size of 100.
auto sum2 = taskPool.reduce!"a + b"(nums, 100);

// Work unit size of 100 and explicit seed.
auto sum3 = taskPool.reduce!"a + b"(0.0, nums, 100);

Parallel reduce supports multiple functions, like std.algorithm.reduce.

// Find both the min and max of nums.
auto minMax = taskPool.reduce!(min, max)(nums);
assert(minMax[0] == reduce!min(nums));
assert(minMax[1] == reduce!max(nums));

Exception Handling: After this function is finished executing, any exceptions thrown are chained together via Throwable.next and rethrown. The chaining order is non-deterministic.
fold(functions...)
fold(Args...)(Args args);

fold is functionally equivalent to reduce except that the range parameter comes first and there is no need to use tuple for multiple seeds. There may be one or more callable entities (the functions argument) to apply.

Args args | Just the range to fold over; or the range and one seed per function; or the range, one seed per function, and the work unit size. |
Example
static int adder(int a, int b)
{
    return a + b;
}
static int multiplier(int a, int b)
{
    return a * b;
}

// Just the range.
auto x = taskPool.fold!adder([1, 2, 3, 4]);
assert(x == 10);

// The range and the seeds (0 and 1 below; also note multiple
// functions in this example).
auto y = taskPool.fold!(adder, multiplier)([1, 2, 3, 4], 0, 1);
assert(y[0] == 10);
assert(y[1] == 24);

// The range, the seed (0), and the work unit size (20).
auto z = taskPool.fold!adder([1, 2, 3, 4], 0, 20);
assert(z == 10);
workerIndex() const;

Example
// Execute a loop that computes the greatest common
// divisor of every number from 0 through 999 with
// 42 in parallel. Write the results out to
// a set of files, one for each thread. This allows
// results to be written out without any synchronization.
import std.conv, std.range, std.numeric, std.stdio;

void main()
{
    auto fileHandles = new File[taskPool.size + 1];
    scope(exit)
    {
        foreach (ref handle; fileHandles)
        {
            handle.close();
        }
    }

    foreach (i, ref handle; fileHandles)
    {
        // Open each file for writing.
        handle = File("workerResults" ~ to!string(i) ~ ".txt", "w");
    }

    foreach (num; parallel(iota(1_000)))
    {
        auto outHandle = fileHandles[taskPool.workerIndex];
        outHandle.writeln(num, '\t', gcd(num, 42));
    }
}
WorkerLocalStorage(T);

Struct for creating worker-local storage. The main use cases for WorkerLocalStorage are parallel reductions written in an imperative style, where it is useful to treat WorkerLocalStorage as local to each thread for only the parallel portion of an algorithm.

Example
// Calculate pi as in our synopsis example, but
// use an imperative instead of a functional style.
immutable n = 1_000_000_000;
immutable delta = 1.0L / n;

auto sums = taskPool.workerLocalStorage(0.0L);
foreach (i; parallel(iota(n)))
{
    immutable x = (i - 0.5L) * delta;
    immutable toAdd = delta / (1.0 + x * x);
    sums.get += toAdd;
}

// Add up the results from each worker thread.
real pi = 0;
foreach (threadResult; sums.toRange)
{
    pi += 4.0L * threadResult;
}
get(this Qualified)();

Calling get from any thread outside the TaskPool that created this instance will return the same reference, so an instance of worker-local storage should only be accessed from one thread outside the pool that created it. If this rule is violated, undefined behavior will result.

get(T val);
toRange();
WorkerLocalStorageRange(T);
workerLocalStorage(T)(lazy T initialVal = T.init);
stop();
finish(bool blocking = false);

Warning: Calling this function with blocking = true from a worker thread that is a member of the same TaskPool that finish is being called on will result in a deadlock.
size() const;

put(alias fun, Args...)(ref Task!(fun, Args) task)
if (!isSafeReturn!(typeof(task)));

put(alias fun, Args...)(Task!(fun, Args)* task)
if (!isSafeReturn!(typeof(task)));

Example
import std.file;

// Create a task.
auto t = task!read("foo.txt");

// Add it to the queue to be executed.
taskPool.put(t);
Notes: @trusted overloads of this function are called for Tasks if std.traits.hasUnsharedAliasing is false for the Task's return type or the function the Task executes is pure. Task objects that meet all other requirements specified in the @trusted overloads of task and scopedTask may be created and executed from @safe code via Task.executeInNewThread but not via TaskPool.
isDaemon();
isDaemon(bool newVal);

Note: For a size zero pool, the getter arbitrarily returns true and the setter has no effect.
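A short sketch of the daemon setting on the global pool, so the program can exit without an explicit stop or finish call:

```d
import std.parallelism;
import std.range : iota;

void main()
{
    // With isDaemon = true, the worker threads will not keep the
    // program alive after main returns; otherwise stop() or
    // finish() must be called on the pool.
    taskPool.isDaemon = true;

    foreach (i; taskPool.parallel(iota(100)))
    {
        // ... per-element work ...
    }
}
```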
priority();
priority(int newPriority);

These properties forward to core.thread.Thread.priority, so a given priority value here means the same thing as an identical priority value in core.thread.

Note: For a size zero pool, the getter arbitrarily returns core.thread.Thread.PRIORITY_MIN and the setter has no effect.
taskPool();
defaultPoolThreads();
defaultPoolThreads(uint newVal);

parallel(R)(R range);
parallel(R)(R range, size_t workUnitSize);

Convenience functions that forward to taskPool.parallel. The purpose of these is to make parallel foreach less verbose and more readable.

Example
// Find the logarithm of every number from
// 1 to 1_000_000 in parallel, using the
// default TaskPool instance.
auto logs = new double[1_000_000];

foreach (i, ref elem; parallel(logs))
{
    elem = log(i + 1.0);
}