Movatterモバイル変換


[0]ホーム

URL:


D Logo
Menu
Search

Library Reference

version 2.112.0

overview

Report a bug
If you spot a problem with this page, click here to create a Bugzilla issue.
Improve this page
Quickly fork, edit online, and submit a pull request for this page.Requires a signed-in GitHub account. This works well for small changes.If you'd like to make larger changes you may want to consider usinga local clone.

std.parallelism

std.parallelism implements high-level primitives for SMP parallelism.These include parallel foreach, parallel reduce, parallel eager map, pipeliningand future/promise parallelism.std.parallelism is recommended when thesame operation is to be executed in parallel on different data, or when afunction is to be executed in a background thread and its result returned to awell-defined main thread. For communication between arbitrary threads, seestd.concurrency.
std.parallelism is based on the concept of aTask. ATask is anobject that represents the fundamental unit of work in this library and may beexecuted in parallel with any otherTask. UsingTaskdirectly allows programming with a future/promise paradigm. All othersupported parallelism paradigms (parallel foreach, map, reduce, pipelining)represent an additional level of abstraction overTask. Theyautomatically create one or moreTask objects, or closely related typesthat are conceptually identical but not part of the public API.
After creation, aTask may be executed in a new thread, or submittedto aTaskPool for execution. ATaskPool encapsulates a task queueand its worker threads. Its purpose is to efficiently map a largenumber ofTasks onto a smaller number of threads. A task queue is aFIFO queue ofTask objects that have been submitted to theTaskPool and are awaiting execution. A worker thread is a thread thatis associated with exactly one task queue. It executes theTask at thefront of its queue when the queue has work available, or sleeps whenno work is available. Each task queue is associated with zero ormore worker threads. If the result of aTask is needed before executionby a worker thread has begun, theTask can be removed from the task queueand executed immediately in the thread where the result is needed.

WarningUnless marked as@trusted or@safe, artifacts in this module allow implicit data sharing between threads and cannot guarantee that client code is free from low level data races.

Sourcestd/parallelism.d

AuthorDavid Simcha

License:
Boost License 1.0
structTask(alias fun, Args...);
Task represents the fundamental unit of work. ATask may beexecuted in parallel with any otherTask. Using this struct directlyallows future/promise parallelism. In this paradigm, a function (or delegateor other callable) is executed in a thread other than the one it was calledfrom. The calling thread does not block while the function is being executed.A call toworkForce,yieldForce, orspinForce is used toensure that theTask has finished executing and to obtain the returnvalue, if any. These functions anddone also act as full memory barriers,meaning that any memory writes made in the thread that executed theTaskare guaranteed to be visible in the calling thread after one of these functionsreturns.
Thestd.parallelism.task andstd.parallelism.scopedTask functions canbe used to create an instance of this struct. Seetask for usage examples.
Function results are returned fromyieldForce,spinForce andworkForce by ref. Iffun returns by ref, the reference will pointto the returned reference offun. Otherwise it will point to afield in this struct.
Copying of this struct is disabled, since it would provide no useful semantics.If you want to pass this struct around, you should do so by reference orpointer.
Bugs:
Changes toref andout arguments are not propagated to the call site, only toargs in this struct.
aliasargs = _args[1 .. __dollar];
The arguments the function was called with. Changes toout andref arguments will be visible here.
aliasReturnType = typeof(fun(_args));
The return type of the function called by thisTask. This can bevoid.
@property ref @trusted ReturnTypespinForce();
If theTask isn't started yet, execute it in the current thread. If it's done, return its return value, if any. If it's in progress, busy spin until it's done, then return the return value. If it threw an exception, rethrow that exception.
This function should be used when you expect the result of theTask to be available on a timescale shorter than that of an OS context switch.
@property ref @trusted ReturnTypeyieldForce();
If theTask isn't started yet, execute it in the current thread. If it's done, return its return value, if any. If it's in progress, wait on a condition variable. If it threw an exception, rethrow that exception.
This function should be used for expensive functions, as waiting on a condition variable introduces latency, but avoids wasted CPU cycles.
@property ref @trusted ReturnTypeworkForce();
If thisTask was not started yet, execute it in the current thread. If it is finished, return its result. If it is in progress, execute any otherTask from theTaskPool instance that thisTask was submitted to until this one is finished. If it threw an exception, rethrow that exception. If no other tasks are available or thisTask was executed usingexecuteInNewThread, wait on a condition variable.
@property @trusted booldone();
Returnstrue if theTask is finished executing.
Throws:
Rethrows any exception thrown during the execution of theTask.
@trusted voidexecuteInNewThread();

@trusted voidexecuteInNewThread(intpriority);
Create a new thread for executing thisTask, execute it in the newly created thread, then terminate the thread. This can be used for future/promise parallelism. An explicit priority may be given to theTask. If one is provided, its value is forwarded tocore.thread.Thread.priority. Seestd.parallelism.task for usage example.
autotask(alias fun, Args...)(Argsargs);
Creates aTask on the GC heap that calls an alias. This may be executedviaTask.executeInNewThread or by submitting to astd.parallelism.TaskPool. A globally accessible instance ofTaskPool is provided bystd.parallelism.taskPool.
Returns:
A pointer to theTask.

Example

// Read two files into memory at the same time.import std.file;void main(){// Create and execute a Task for reading// foo.txt.auto file1Task =task!read("foo.txt");    file1Task.executeInNewThread();// Read bar.txt in parallel.auto file2Data = read("bar.txt");// Get the results of reading foo.txt.auto file1Data = file1Task.yieldForce;}
// Sorts an array using a parallel quick sort algorithm.// The first partition is done serially.  Both recursion// branches are then executed in parallel.//// Timings for sorting an array of 1,000,000 doubles on// an Athlon 64 X2 dual core machine://// This implementation:               176 milliseconds.// Equivalent serial implementation:  280 millisecondsvoid parallelSort(T)(T[] data){// Sort small subarrays serially.if (data.length < 100)    {         std.algorithm.sort(data);return;    }// Partition the array.    swap(data[$ / 2], data[$ - 1]);auto pivot = data[$ - 1];bool lessThanPivot(T elem) {return elem < pivot; }auto greaterEqual = partition!lessThanPivot(data[0..$ - 1]);    swap(data[$ - greaterEqual.length - 1], data[$ - 1]);auto less = data[0..$ - greaterEqual.length - 1];    greaterEqual = data[$ - greaterEqual.length..$];// Execute both recursion branches in parallel.auto recurseTask =task!parallelSort(greaterEqual);    taskPool.put(recurseTask);    parallelSort(less);    recurseTask.yieldForce;}

autotask(F, Args...)(FdelegateOrFp, Argsargs)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F);
Creates aTask on the GC heap that calls a function pointer, delegate, orclass/struct with overloaded opCall.

Example

// Read two files in at the same time again,// but this time use a function pointer instead// of an alias to represent std.file.read.import std.file;void main(){// Create and execute a Task for reading// foo.txt.auto file1Task =task(&read!string,"foo.txt", size_t.max);    file1Task.executeInNewThread();// Read bar.txt in parallel.auto file2Data = read("bar.txt");// Get the results of reading foo.txt.auto file1Data = file1Task.yieldForce;}

NotesThis function takes a non-scope delegate, meaning it can be used with closures. If you can't allocate a closure due to objects on the stack that have scoped destruction, seescopedTask, which takes a scope delegate.

@trusted autotask(F, Args...)(Ffun, Argsargs)
if (__traits(compiles, () @safe =>fun(args)) && isSafeTask!F);
Version oftask usable from@safe code. Usage mechanics areidentical to the non-@safe case, but safety introduces some restrictions:
  1. fun must be @safe or @trusted.
  2. F must not have any unshared aliasing as defined bystd.traits.hasUnsharedAliasing. This means it may not be an unshared delegate or a non-shared class or struct with overloadedopCall. This also precludes accepting template alias parameters.
  3. Args must not have unshared aliasing.
  4. fun must not return by reference.
  5. The return type must not have unshared aliasing unlessfun ispure or theTask is executed viaexecuteInNewThread instead of using aTaskPool.
autoscopedTask(alias fun, Args...)(Argsargs);

autoscopedTask(F, Args...)(scope FdelegateOrFp, Argsargs)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F);

@trusted autoscopedTask(F, Args...)(Ffun, Argsargs)
if (__traits(compiles, () @safe =>fun(args)) && isSafeTask!F);
These functions allow the creation ofTask objects on the stack ratherthan the GC heap. The lifetime of aTask created byscopedTaskcannot exceed the lifetime of the scope it was created in.
scopedTask might be preferred overtask:
  1. When aTask that calls a delegate is being created and a closure cannot be allocated due to objects on the stack that have scoped destruction. The delegate overload ofscopedTask takes ascope delegate.
  2. As a micro-optimization, to avoid the heap allocation associated withtask or with the creation of a closure.
Usage is otherwise identical totask.

NotesTask objects created usingscopedTask will automaticallycallTask.yieldForce in their destructor if necessary to ensuretheTask is complete before the stack frame they reside on is destroyed.

aliastotalCPUs = __lazilyInitializedConstant!(immutable(uint), 4294967295u, totalCPUsImpl).__lazilyInitializedConstant;
The total number of CPU cores available on the current machine, as reported bythe operating system.
classTaskPool;
This class encapsulates a task queue and a set of worker threads. Its purposeis to efficiently map a large number ofTasks onto a smaller number ofthreads. A task queue is a FIFO queue ofTask objects that have beensubmitted to theTaskPool and are awaiting execution. A worker thread is athread that executes theTask at the front of the queue when one isavailable and sleeps when the queue is empty.
This class should usually be used via the global instantiationavailable via thestd.parallelism.taskPool property.Occasionally it is useful to explicitly instantiate aTaskPool:
  1. When you wantTaskPool instances with multiple priorities, for example a low priority pool and a high priority pool.
  2. When the threads in the global task pool are waiting on a synchronization primitive (for example a mutex), and you want to parallelize the code that needs to run before these threads can be resumed.

NoteThe worker threads in this pool will not stop untilstop orfinish is called, even if the main thread has finished already. This may lead to programs that never end. If you do not want this behaviour, you can setisDaemon to true.

@trusted this();
Default constructor that initializes aTaskPool withtotalCPUs - 1 worker threads. The minus 1 is included because the main thread will also be available to do work.

NoteOn single-core machines, the primitives provided byTaskPool operate transparently in single-threaded mode.

@trusted this(size_tnWorkers);
Allows for custom number of worker threads.
ParallelForeach!Rparallel(R)(Rrange, size_tworkUnitSize);

ParallelForeach!Rparallel(R)(Rrange);
Implements a parallel foreach loop over a range. This works by implicitly creating and submitting oneTask to theTaskPool for each worker thread. A work unit is a set of consecutive elements ofrange to be processed by a worker thread between communication with any other thread. The number of elements processed per work unit is controlled by theworkUnitSize parameter. Smaller work units provide better load balancing, but larger work units avoid the overhead of communicating with other threads frequently to fetch the next work unit. Large work units also avoid false sharing in cases where the range is being modified. The less time a single iteration of the loop takes, the largerworkUnitSize should be. For very expensive loop bodies,workUnitSize should be 1. An overload that chooses a default work unit size is also available.

Example

// Find the logarithm of every number from 1 to// 10_000_000 in parallel.auto logs =newdouble[10_000_000];// Parallel foreach works with or without an index// variable.  It can iterate by ref if range.front// returns by ref.// Iterate over logs using work units of size 100.foreach (i,ref elem; taskPool.parallel(logs, 100)){    elem = log(i + 1.0);}// Same thing, but use the default work unit size.//// Timings on an Athlon 64 X2 dual core machine://// Parallel foreach:  388 milliseconds// Regular foreach:   619 millisecondsforeach (i,ref elem; taskPool.parallel(logs)){    elem = log(i + 1.0);}

NotesThe memory usage of this implementation is guaranteed to be constant inrange.length.

Breaking from a parallel foreach loop via a break, labeled break, labeled continue, return or goto statement throws aParallelForeachError.
In the case of non-random access ranges, parallel foreach buffers lazily to an array of sizeworkUnitSize before executing the parallel portion of the loop. The exception is that, if a parallel foreach is executed over a range returned byasyncBuf ormap, the copying is elided and the buffers are simply swapped. In this caseworkUnitSize is ignored and the work unit size is set to the buffer size ofrange.
A memory barrier is guaranteed to be executed on exit from the loop, so that results produced by all threads are visible in the calling thread.
Exception Handling:
When at least one exception is thrown from inside a parallel foreach loop, the submission of additionalTask objects is terminated as soon as possible, in a non-deterministic manner. All executing or enqueued work units are allowed to complete. Then, all exceptions that were thrown by any work unit are chained usingThrowable.next and rethrown. The order of the exception chaining is non-deterministic.

templateamap(functions...)
autoamap(Args...)(Argsargs)
if (isRandomAccessRange!(Args[0]));
Eager parallel map. The eagerness of this function means it has less overhead than the lazily evaluatedTaskPool.map and should be preferred where the memory requirements of eagerness are acceptable.functions are the functions to be evaluated, passed as template alias parameters in a style similar tostd.algorithm.iteration.map. The first argument must be a random access range. For performance reasons, amap will assume the range elements have not yet been initialized. Elements will be overwritten without calling a destructor nor doing an assignment. As such, the range must not contain meaningful data: either un-initialized objects, or objects in their.init state.
auto numbers = iota(100_000_000.0);// Find the square roots of numbers.//// Timings on an Athlon 64 X2 dual core machine://// Parallel eager map:                   0.802 s// Equivalent serial implementation:     1.768 sauto squareRoots = taskPool.amap!sqrt(numbers);
Immediately after the range argument, an optional work unit size argument may be provided. Work units as used byamap are identical to those defined for parallel foreach. If no work unit size is provided, the default work unit size is used.
// Same thing, but make work unit size 100.auto squareRoots = taskPool.amap!sqrt(numbers, 100);
An output range for returning the results may be provided as the last argument. If one is not provided, an array of the proper type will be allocated on the garbage collected heap. If one is provided, it must be a random access range with assignable elements, must have reference semantics with respect to assignment to its elements, and must have the same length as the input range. Writing to adjacent elements from different threads must be safe.
// Same thing, but explicitly allocate an array// to return the results in.  The element type// of the array may be either the exact type// returned by functions or an implicit conversion// target.auto squareRoots =newfloat[numbers.length];taskPool.amap!sqrt(numbers, squareRoots);// Multiple functions, explicit output range, and// explicit work unit size.auto results =new Tuple!(float,real)[numbers.length];taskPool.amap!(sqrt, log)(numbers, 100, results);

NoteA memory barrier is guaranteed to be executed after all results are written but before returning so that results produced by all threads are visible in the calling thread.

TipsTo perform the mapping operation in place, provide the same range for the input and output range.

To parallelize the copying of a range with expensive to evaluate elements to an array, pass an identity function (a function that just returns whatever argument is provided to it) toamap.
Exception Handling:
When at least one exception is thrown from inside the map functions, the submission of additionalTask objects is terminated as soon as possible, in a non-deterministic manner. All currently executing or enqueued work units are allowed to complete. Then, all exceptions that were thrown from any work unit are chained usingThrowable.next and rethrown. The order of the exception chaining is non-deterministic.

templatemap(functions...)
automap(S)(Ssource, size_tbufSize = 100, size_tworkUnitSize = size_t.max)
if (isInputRange!S);
A semi-lazy parallel map that can be used for pipelining. The map functions are evaluated for the firstbufSize elements and stored in a buffer and made available topopFront. Meanwhile, in the background a second buffer of the same size is filled. When the first buffer is exhausted, it is swapped with the second buffer and filled while the values from what was originally the second buffer are read. This implementation allows for elements to be written to the buffer without the need for atomic operations or synchronization for each write, and enables the mapping function to be evaluated efficiently in parallel.
map has more overhead than the simpler procedure used byamap but avoids the need to keep all results in memory simultaneously and works with non-random access ranges.
Parameters:
SsourceTheinput range to be mapped. Ifsource is not random access it will be lazily buffered to an array of sizebufSize before the map function is evaluated. (For an exception to this rule, see Notes.)
size_tbufSizeThe size of the buffer to store the evaluated elements.
size_tworkUnitSizeThe number of elements to evaluate in a singleTask. Must be less than or equal tobufSize, and should be a fraction ofbufSize such that all worker threads can be used. If the default of size_t.max is used, workUnitSize will be set to the pool-wide default.
Returns:
An input range representing the results of the map. This range has a length iffsource has a length.

NotesIf a range returned bymap orasyncBuf is used as an input tomap, then as an optimization the copying from the output buffer of the first range to the input buffer of the second range is elided, even though the ranges returned bymap andasyncBuf are non-random access ranges. This means that thebufSize parameter passed to the current call tomap will be ignored and the size of the buffer will be the buffer size ofsource.

Example

// Pipeline reading a file, converting each line// to a number, taking the logarithms of the numbers,// and performing the additions necessary to find// the sum of the logarithms.auto lineRange = File("numberList.txt").byLine();auto dupedLines = std.algorithm.map!"a.idup"(lineRange);auto nums = taskPool.map!(to!double)(dupedLines);auto logs = taskPool.map!log10(nums);double sum = 0;foreach (elem; logs){    sum += elem;}
Exception Handling:
Any exceptions thrown while iterating oversource or computing the map function are re-thrown on a call topopFront or, if thrown during construction, are simply allowed to propagate to the caller. In the case of exceptions thrown while computing the map function, the exceptions are chained as inTaskPool.amap.

autoasyncBuf(S)(Ssource, size_tbufSize = 100)
if (isInputRange!S);
Given asource range that is expensive to iterate over, returns aninput range that asynchronously buffers the contents ofsource into a buffer ofbufSize elements in a worker thread, while making previously buffered elements from a second buffer, also of sizebufSize, available via the range interface of the returned object. The returned range has a length iffhasLength!S.asyncBuf is useful, for example, when performing expensive operations on the elements of ranges that represent data on a disk or network.

Example

import std.conv, std.stdio;void main(){// Fetch lines of a file in a background thread// while processing previously fetched lines,// dealing with byLine's buffer recycling by// eagerly duplicating every line.auto lines = File("foo.txt").byLine();auto duped = std.algorithm.map!"a.idup"(lines);// Fetch more lines in the background while we// process the lines already read into memory// into a matrix of doubles.double[][] matrix;auto asyncReader = taskPool.asyncBuf(duped);foreach (line; asyncReader)    {auto ls = line.split("\t");        matrix ~= to!(double[])(ls);    }}
Exception Handling:
Any exceptions thrown while iterating oversource are re-thrown on a call topopFront or, if thrown during construction, simply allowed to propagate to the caller.

autoasyncBuf(C1, C2)(C1next, C2empty, size_tinitialBufSize = 0, size_tnBuffers = 100)
if (is(typeof(C2.init()) : bool) && (Parameters!C1.length == 1) && (Parameters!C2.length == 0) && isArray!(Parameters!C1[0]));
Given a callable objectnext that writes to a user-provided buffer and a second callable objectempty that determines whether more data is available to write vianext, returns an input range that asynchronously callsnext with a set of sizenBuffers of buffers and makes the results available in the order they were obtained via the input range interface of the returned object. Similarly to the input range overload ofasyncBuf, the first half of the buffers are made available via the range interface while the second half are filled and vice-versa.
Parameters:
C1nextA callable object that takes a single argument that must be an array with mutable elements. When called,next writes data to the array provided by the caller.
C2emptyA callable object that takes no arguments and returns a type implicitly convertible tobool. This is used to signify that no more data is available to be obtained by callingnext.
size_tinitialBufSizeThe initial size of each buffer. Ifnext takes its array by reference, it may resize the buffers.
size_tnBuffersThe number of buffers to cycle through when callingnext.

Example

// Fetch lines of a file in a background// thread while processing previously fetched// lines, without duplicating any lines.auto file = File("foo.txt");voidnext(refchar[] buf){    file.readln(buf);}// Fetch more lines in the background while we// process the lines already read into memory// into a matrix of doubles.double[][] matrix;auto asyncReader = taskPool.asyncBuf(&next, &file.eof);foreach (line; asyncReader){auto ls = line.split("\t");    matrix ~= to!(double[])(ls);}
Exception Handling:
Any exceptions thrown while iterating overrange are re-thrown on a call topopFront.

WarningUsing the range returned by this function in a parallel foreach loop will not work because buffers may be overwritten while the task that processes them is in queue. This is checked for at compile time and will result in a static assertion failure.

templatereduce(functions...)
autoreduce(Args...)(Argsargs);
Parallel reduce on a random access range. Except as otherwise noted, usage is similar tostd.algorithm.iteration.reduce. There is alsofold which does the same thing with a different parameter order.
This function works by splitting the range to be reduced into work units, which are slices to be reduced in parallel. Once the results from all work units are computed, a final serial reduction is performed on these results to compute the final answer. Therefore, care must be taken to choose the seed value appropriately.
Because the reduction is being performed in parallel,functions must be associative. For notational simplicity, let # be an infix operator representingfunctions. Then, (a # b) # c must equal a # (b # c). Floating point addition is not associative even though addition in exact arithmetic is. Summing floating point numbers using this function may give different results than summing serially. However, for many practical purposes floating point addition can be treated as associative.
Note that, sincefunctions are assumed to be associative, additional optimizations are made to the serial portion of the reduction algorithm. These take advantage of the instruction level parallelism of modern CPUs, in addition to the thread-level parallelism that the rest of this module exploits. This can lead to better than linear speedups relative tostd.algorithm.iteration.reduce, especially for fine-grained benchmarks like dot products.
An explicit seed may be provided as the first argument. If provided, it is used as the seed for all work units and for the final reduction of results from all work units. Therefore, if it is not the identity value for the operation being performed, results may differ from those generated bystd.algorithm.iteration.reduce or depending on how many work units are used. The next argument must be the range to be reduced.
// Find the sum of squares of a range in parallel, using// an explicit seed.//// Timings on an Athlon 64 X2 dual core machine://// Parallel reduce:                     72 milliseconds// Using std.algorithm.reduce instead:  181 millisecondsauto nums = iota(10_000_000.0f);auto sumSquares = taskPool.reduce!"a + b"(    0.0, std.algorithm.map!"a * a"(nums));
If no explicit seed is provided, the first element of each work unit is used as a seed. For the final reduction, the result from the first work unit is used as the seed.
// Find the sum of a range in parallel, using the first// element of each work unit as the seed.auto sum = taskPool.reduce!"a + b"(nums);
An explicit work unit size may be specified as the last argument. Specifying too small a work unit size will effectively serialize the reduction, as the final reduction of the result of each work unit will dominate computation time. IfTaskPool.size for this instance is zero, this parameter is ignored and one work unit is used.
// Use a work unit size of 100.auto sum2 = taskPool.reduce!"a + b"(nums, 100);// Work unit size of 100 and explicit seed.auto sum3 = taskPool.reduce!"a + b"(0.0, nums, 100);
Parallel reduce supports multiple functions, likestd.algorithm.reduce.
// Find both the min and max of nums.auto minMax = taskPool.reduce!(min, max)(nums);assert(minMax[0] ==reduce!min(nums));assert(minMax[1] ==reduce!max(nums));
Exception Handling:
After this function is finished executing, any exceptions thrown are chained together viaThrowable.next and rethrown. The chaining order is non-deterministic.
See Also:
fold is functionally equivalent toreduce except the range parameter comes first and there is no need to usetuple for multiple seeds.
templatefold(functions...)
autofold(Args...)(Argsargs);
Implements the homonym function (also known asaccumulate,compress,inject, orfoldl) present in various programming languages of functional flavor.
fold is functionally equivalent toreduce except the range parameter comes first and there is no need to usetuple for multiple seeds.
There may be one or more callable entities (functions argument) to apply.
Parameters:
ArgsargsJust the range to fold over; or the range and one seed per function; or the range, one seed per function, and the work unit size
Returns:
The accumulated result as a single value for single function and as a tuple of values for multiple functions
See Also:
Similar tostd.algorithm.iteration.fold,fold is a wrapper aroundreduce.

Example

staticint adder(int a,int b){return a + b;}staticint multiplier(int a,int b){return a * b;}// Just the rangeauto x = taskPool.fold!adder([1, 2, 3, 4]);assert(x == 10);// The range and the seeds (0 and 1 below; also note multiple// functions in this example)auto y = taskPool.fold!(adder, multiplier)([1, 2, 3, 4], 0, 1);assert(y[0] == 10);assert(y[1] == 24);// The range, the seed (0), and the work unit size (20)auto z = taskPool.fold!adder([1, 2, 3, 4], 0, 20);assert(z == 10);

nothrow @property @safe size_tworkerIndex() const;
Gets the index of the current thread relative to thisTaskPool. Any thread not in this pool will receive an index of 0. The worker threads in this pool receive unique indices of 1 throughthis.size.
This function is useful for maintaining worker-local resources.

Example

// Execute a loop that computes the greatest common// divisor of every number from 0 through 999 with// 42 in parallel.  Write the results out to// a set of files, one for each thread.  This allows// results to be written out without any synchronization.import std.conv, std.range, std.numeric, std.stdio;void main(){auto filesHandles =new File[taskPool.size + 1];scope(exit) {foreach (ref handle; fileHandles)        {            handle.close();        }    }foreach (i,ref handle; fileHandles)    {        handle = File("workerResults" ~ to!string(i) ~".txt");    }foreach (num; parallel(iota(1_000)))    {auto outHandle = fileHandles[taskPool.workerIndex];        outHandle.writeln(num, '\t', gcd(num, 42));    }}

structWorkerLocalStorage(T);
Struct for creating worker-local storage. Worker-local storage is thread-local storage that exists only for worker threads in a givenTaskPool plus a single thread outside the pool. It is allocated on the garbage collected heap in a way that avoids false sharing, and doesn't necessarily have global scope within any thread. It can be accessed from any worker thread in theTaskPool that created it, and one thread outside thisTaskPool. All threads outside the pool that created a given instance of worker-local storage share a single slot.
Since the underlying data for this struct is heap-allocated, this struct has reference semantics when passed between functions.
The main uses cases forWorkerLocalStorage are:
  1. Performing parallel reductions with an imperative, as opposed to functional, programming style. In this case, it's useful to treatWorkerLocalStorage as local to each thread for only the parallel portion of an algorithm.
  2. Recycling temporary buffers across iterations of a parallel foreach loop.

Example

// Calculate pi as in our synopsis example, but// use an imperative instead of a functional style.immutable n = 1_000_000_000;immutable delta = 1.0L / n;auto sums = taskPool.workerLocalStorage(0.0L);foreach (i; parallel(iota(n))){immutable x = ( i - 0.5L ) * delta;immutable toAdd = delta / ( 1.0 + x * x );    sums.get += toAdd;}// Add up the results from each worker thread.real pi = 0;foreach (threadResult; sums.toRange){    pi += 4.0L * threadResult;}

@property ref autoget(this Qualified)();
Get the current thread's instance. Returns by ref. Note that callingget from any thread outside theTaskPool that created this instance will return the same reference, so an instance of worker-local storage should only be accessed from one thread outside the pool that created it. If this rule is violated, undefined behavior will result.
If assertions are enabled andtoRange has been called, then this WorkerLocalStorage instance is no longer worker-local and an assertion failure will result when calling this method. This is not checked when assertions are disabled for performance reasons.
@property voidget(Tval);
Assign a value to the current thread's instance. This function has the same caveats as its overload.
@property WorkerLocalStorageRange!TtoRange();
Returns a range view of the values for all threads, which can be used to further process the results of each thread after running the parallel part of your algorithm. Do not use this method in the parallel portion of your algorithm.
Calling this function sets a flag indicating that this struct is no longer worker-local, and attempting to use theget method again will result in an assertion failure if assertions are enabled.
structWorkerLocalStorageRange(T);
Range primitives for worker-local storage. The purpose of this is to access results produced by each worker thread from a single thread once you are no longer using the worker-local storage from multiple threads. Do not use this struct in the parallel portion of your algorithm.
The proper way to instantiate this object is to callWorkerLocalStorage.toRange. Once instantiated, this object behaves as a finite random-access range with assignable, lvalue elements and a length equal to the number of worker threads in theTaskPool that created it plus 1.
WorkerLocalStorage!TworkerLocalStorage(T)(lazy TinitialVal = T.init);
Creates an instance of worker-local storage, initialized with a given value. The value islazy so that you can, for example, easily create one instance of a class for each worker. For usage example, see theWorkerLocalStorage struct.
@trusted voidstop();
Signals to all worker threads to terminate as soon as they are finished with their currentTask, or immediately if they are not executing aTask.Tasks that were in queue will not be executed unless a call toTask.workForce,Task.yieldForce orTask.spinForce causes them to be executed.
Use only if you have waited on everyTask and therefore know the queue is empty, or if you speculatively executed some tasks and no longer need the results.
@trusted voidfinish(boolblocking = false);
Signals worker threads to terminate when the queue becomes empty.
If blocking argument is true, wait for all worker threads to terminate before returning. This option might be used in applications where task results are never consumed-- e.g. whenTaskPool is employed as a rudimentary scheduler for tasks which communicate by means other than return values.

WarningCalling this function withblocking = true from a worker thread that is a member of the sameTaskPool thatfinish is being called on will result in a deadlock.

pure nothrow @property @safe size_tsize() const;
Returns the number of worker threads in the pool.
voidput(alias fun, Args...)(ref Task!(fun, Args)task)
if (!isSafeReturn!(typeof(task)));

voidput(alias fun, Args...)(Task!(fun, Args)*task)
if (!isSafeReturn!(typeof(*task)));
Put aTask object on the back of the task queue. TheTask object may be passed by pointer or reference.

Example

import std.file;// Create a task.auto t =task!read("foo.txt");// Add it to the queue to be executed.taskPool.put(t);

Notes@trusted overloads of this function are called forTasks ifstd.traits.hasUnsharedAliasing is false for theTask's return type or the function theTask executes ispure.Task objects that meet all other requirements specified in the@trusted overloads oftask andscopedTask may be created and executed from@safe code viaTask.executeInNewThread but not viaTaskPool.

While this function takes the address of variables that may be on the stack, some overloads are marked as @trusted.Task includes a destructor that waits for the task to complete before destroying the stack frame it is allocated on. Therefore, it is impossible for the stack frame to be destroyed before the task is complete and no longer referenced by aTaskPool.

@property @trusted boolisDaemon();

@property @trusted voidisDaemon(boolnewVal);
These properties control whether the worker threads are daemon threads. A daemon thread is automatically terminated when all non-daemon threads have terminated. A non-daemon thread will prevent a program from terminating as long as it has not terminated.
If anyTaskPool with non-daemon threads is active, eitherstop orfinish must be called on it before the program can terminate.
The worker treads in theTaskPool instance returned by thetaskPool property are daemon by default. The worker threads of manually instantiated task pools are non-daemon by default.

NoteFor a size zero pool, the getter arbitrarily returns true and the setter has no effect.

@property @trusted intpriority();

@property @trusted voidpriority(intnewPriority);
These functions allow getting and setting the OS scheduling priority of the worker threads in thisTaskPool. They forward tocore.thread.Thread.priority, so a given priority value here means the same thing as an identical priority value incore.thread.

NoteFor a size zero pool, the getter arbitrarily returnscore.thread.Thread.PRIORITY_MIN and the setter has no effect.

@property @trusted TaskPooltaskPool();
Returns a lazily initialized global instantiation ofTaskPool.This function can safely be called concurrently from multiple non-workerthreads. The worker threads in this pool are daemon threads, meaning that itis not necessary to callTaskPool.stop orTaskPool.finish beforeterminating the main thread.
@property @trusted uintdefaultPoolThreads();

@property @trusted voiddefaultPoolThreads(uintnewVal);
These properties get and set the number of worker threads in theTaskPoolinstance returned bytaskPool. The default value istotalCPUs - 1.Calling the setter after the first call totaskPool does not changesnumber of worker threads in the instance returned bytaskPool.
ParallelForeach!Rparallel(R)(Rrange);

ParallelForeach!Rparallel(R)(Rrange, size_tworkUnitSize);
Convenience functions that forwards totaskPool.parallel. Thepurpose of these is to make parallel foreach less verbose and morereadable.

Example

// Find the logarithm of every number from// 1 to 1_000_000 in parallel, using the// default TaskPool instance.auto logs =newdouble[1_000_000];foreach (i,ref elem;parallel(logs)){    elem = log(i + 1.0);}

Copyright © 1999-2026 by theD Language Foundation | Page generated byDdoc on Fri Feb 20 17:58:46 2026

[8]ページ先頭

©2009-2026 Movatter.jp