# Parallel Loop

The `Parallel` static class provides `Task`-based utilities to perform parallel enumerations; every parallel operation is backed by tasks.

- `Parallel.For`: range-based parallel enumeration, a parallel analog of the `for` statement
- `Parallel.ForEach`: parallel enumeration over an `IEnumerable` (`ForEachAsync` also covers `IAsyncEnumerable`)
- async counterparts of `For` and `ForEach`
- optionally run with a `ParallelOptions`: to specify the cancellation token, degree of parallelism and task scheduler
- access the state of the entire loop through a `ParallelLoopState` parameter in the callback

Additionally, an `Invoke` method exists to run actions in parallel.

- `Parallel.Invoke`: invoke multiple actions in parallel

> [!NOTE]
> Each non-async method of `Parallel` is a blocking operation that blocks the calling thread until all tasks have terminated.
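A quick sketch of the async counterpart and `ParallelOptions` mentioned above (the delay and the degree of parallelism are arbitrary): `Parallel.ForEachAsync` returns a `Task` instead of blocking, and `MaxDegreeOfParallelism` caps how many iterations run concurrently.

```cs
ParallelOptions options = new() {
    MaxDegreeOfParallelism = 4, // at most 4 iterations run concurrently
};

long total = 0;

// awaitable instead of blocking, unlike the synchronous methods
await Parallel.ForEachAsync(Enumerable.Range(1, 100), options, async (n, token) => {
    await Task.Delay(1, token); // simulate async work
    Interlocked.Add(ref total, n);
});

Console.WriteLine(total); // 5050
```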
## For

```cs
var files = Directory.GetFiles(@"C:/Users/User/Projects/nix-config", "*", SearchOption.AllDirectories);

long totalSize = 0;

Parallel.For(0, files.Length, idx => {
    FileInfo info = new(files[idx]);
    Interlocked.Add(ref totalSize, info.Length); // [!code highlight]
});

Console.WriteLine(totalSize);
```
## ForEach

```cs
string[] files = Directory.GetFiles(@"~/projects/", "*", SearchOption.AllDirectories);

long totalSize = 0;

Parallel.ForEach(files, f => {
    FileInfo info = new(f);
    Interlocked.Add(ref totalSize, info.Length); // [!code highlight]
});

Console.WriteLine(totalSize);
```
### Enumerate by Step

`Parallel.For` does not provide an overload that advances by a custom step on each iteration, but the same effect can be achieved with an iterator method.

```cs
int[] numbers = [.. Enumerable.Range(1, 10)];

Parallel.ForEach(Range(1, numbers.Length, 2), idx => {
    _ = numbers[idx]; // [!code highlight]
});

static IEnumerable<int> Range(int start, int end, int step) { // [!code highlight]
    for (int i = start; i < end; i += step) { // [!code highlight]
        yield return i; // [!code highlight]
    } // [!code highlight]
} // [!code highlight]
```
## Break Parallel Loop

Parallel loop methods provide overloads whose callback takes an extra `ParallelLoopState` parameter describing the state of the iterations.
The state can request termination of the loop, but in a different manner than a sequential `break`, since the iterations are parallel:
an iteration starts only when the scheduler has room to activate its task, while the remaining ones keep waiting.

- `ParallelLoopState.Stop()`:
    - Any iteration that hasn't started yet will not be scheduled.
    - Any iteration that is already running will continue to completion.
    - **Does not terminate the current iteration.**

- `ParallelLoopState.Break()`:
    - Any iteration that hasn't started yet (*except those with an index less than the current index*) will not be scheduled.
    - Any iteration that is already running will continue to completion.
    - **Does not terminate the current iteration.**
```cs
Parallel.ForEach(
    Enumerable.Range(1, 2_000_000), // would all that many iterations be started? // [!code highlight]
    (n, state) => {
        Console.WriteLine(n);
        // let's break on a condition that hits really quickly
        // so you can see that only a few iterations were started
        if (int.IsOddInteger(n)) { // [!code highlight]
            state.Stop(); // [!code highlight]
        } // [!code highlight]
    }
);
```
What `Break` does is harder to demonstrate in a concurrent context.
```cs
Parallel.ForEach(
    Enumerable.Range(1, 2_000_000), // would all that many iterations be started? // [!code highlight]
    (n, state) => {
        // let's break on a condition that hits really quickly
        // so you can see that only a few iterations were started
        if (n == 123) {
            state.Break(); // [!code highlight]
        }

        Console.WriteLine(n); // still prints 123 after Break() // [!code highlight]
    }
);
```

You can verify that `Break` does not terminate the current iteration with:

```ps1
dotnet run | sls \b123\b
```
> [!NOTE]
> `ShouldExitCurrentIteration` becomes true after `Stop()`, `Break()`, or any exception being thrown.

> [!TIP]
> Additionally, you can use `IsStopped` and `IsExceptional` in other running iterations to coordinate when `Stop()` was called or an exception was thrown from any iteration.
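For instance, a long-running iteration body can poll `ShouldExitCurrentIteration` to bail out cooperatively; a minimal sketch (the loop bounds and the stop condition are arbitrary):

```cs
var result = Parallel.For(0, 1_000, (i, state) => {
    if (i > 100) state.Stop(); // request that unstarted iterations not be scheduled

    for (int step = 0; step < 1_000; step++) {
        // a running iteration exits early once Stop() was called anywhere
        if (state.ShouldExitCurrentIteration) return; // [!code highlight]
    }
});

// ParallelLoopResult.IsCompleted is false when the loop was stopped early
Console.WriteLine(result.IsCompleted); // False
```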
## Exception Handling

An exception thrown from any iteration prevents iterations that haven't started from being scheduled, and terminates the loop **as soon as all currently running iterations finish.**

Since the `Parallel` utilities are synchronous and blocking, an `AggregateException` can be caught around them. Each faulted iteration may contribute an exception to `AggregateException.InnerExceptions`.
```cs
try {
    Parallel.For(1, 10_000_000, (n, state) => {
        Console.WriteLine(n);

        if (int.IsOddInteger(n))
            throw new Exception(); // multiple threads would throw this
    });
} catch (AggregateException ex) {
    ex.Handle(iex => {
        Console.WriteLine(iex.Message); // written multiple times, once per throwing thread
        return true;
    });
}

// 9166664
// 833334
// 9999997
// 4166666
// Exception of type 'System.Exception' was thrown.
// Exception of type 'System.Exception' was thrown.
// Exception of type 'System.Exception' was thrown.
// Exception of type 'System.Exception' was thrown.
// Exception of type 'System.Exception' was thrown.
```
### Cancellation is Unique

Cancellation of a parallel loop is unique because it is dedicated to canceling the entire loop, not a specific running thread.
The cancellation is expected to be triggered once and to **terminate all iterations, no matter whether they're running or not**.
For this reason the runtime propagates the `OperationCanceledException` thrown by `token.ThrowIfCancellationRequested` **directly**, instead of wrapping it inside an `AggregateException`, when the **cancellation succeeds**.

> [!NOTE]
> Only a successful cancellation propagates `OperationCanceledException` directly; otherwise it is wrapped inside an `AggregateException`.
```cs
CancellationTokenSource cts = new(millisecondsDelay: 2000);

try {
    Parallel.For(
        0,
        10,
        new ParallelOptions() { CancellationToken = cts.Token },
        _ => { // [!code highlight]
            while (true) // [!code highlight]
                cts.Token.ThrowIfCancellationRequested(); // [!code highlight]
        }
    ); // [!code highlight]
} catch (AggregateException ex) {
    ex.Handle(iex => {
        if (iex is OperationCanceledException) {
            // not reachable
            Console.WriteLine($"{nameof(OperationCanceledException)} was caught by {nameof(AggregateException)}");
            return true;
        }
        return false;
    });
} catch (OperationCanceledException) { // [!code highlight]
    // would hit here since the cancellation should succeed // [!code highlight]
    Console.WriteLine($"{nameof(OperationCanceledException)} was propagated directly"); // [!code highlight]
} // [!code highlight]
```
## Performance Enhancement

### Thread-Local Storage

If each **worker thread** (a thread managing a batch of iterations) could accumulate a partial result, and all partial results were added to the target variable only at the end, that would be much more efficient than having every iteration contend for a single shared variable.
This approach is called **Thread-Local Storage**: a dedicated storage slot for each worker thread.
The design is quite similar to `Enumerable.Aggregate`, which folds a calculation over a given initial value on each iteration.
```cs
string[] files = Directory.GetFiles(@"C:/Users/User/Projects/nix-config", "*", SearchOption.AllDirectories);

long size = 0L;
// calculate file size using thread-local storage
// to be more efficient
Parallel.ForEach(
    source: files,
    localInit: () => 0L, // initial value for the thread-local storage // [!code highlight]
    body: (f, state, sum) => { // just like Aggregate but with extra state // [!code highlight]
        return sum + new FileInfo(f).Length; // [!code highlight]
    }, // [!code highlight]
    // add up to the target variable when all iterations of a worker thread have finished
    localFinally: sum => Interlocked.Add(ref size, sum) // [!code highlight]
);

Console.WriteLine(size);
```
### Partitioning

Partitioning is a trade-off solution for when **invoking the callback delegate in a parallel loop is too expensive** relative to **the operation inside the delegate body, which is comparatively fast**.
The items from the source are partitioned into **ranges**, and each range is processed **within the same thread** (affordable because each operation is fast); this reduces the cost of invoking the delegate callback by reducing the number of tasks started by the loop.

> [!NOTE]
> `Partitioner` requires a collection **with an indexer** to work with; it's the only way to represent a range.
```cs
// calculating the sum of a large array is a good example for partitioning
// since the operation (adding up) is simple
// and we avoid a callback on each iteration
// optionally you could avoid resource contention with thread-local storage

int[] source = Enumerable.Range(1, 1000 * 1000).ToArray();

var partition = Partitioner.Create(0, source.Length); // auto slice ranges from source // [!code highlight]

long sumOfArray = 0L;

Parallel.ForEach(
    partition, // iterate over ranges instead // [!code highlight]
    () => 0L,
    (range, _, sum) => {
        var (start, end) = range; // unpack the tuple // [!code highlight]
        for (int i = start; i < end; i++) {
            sum = checked(sum + source[i]);
        }
        return sum;
    },
    sum => Interlocked.Add(ref sumOfArray, sum)
);

Console.WriteLine(sumOfArray);

// you can't sum this directly using linq // [!code error]
// because Sum() returns int, which overflows for such a large total // [!code error]
Console.WriteLine(source.Sum() is int); // System.OverflowException // [!code error]
```
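`Partitioner.Create` also has an overload taking an explicit range size instead of letting the runtime pick one; a sketch over the same kind of source array (the range size here is arbitrary):

```cs
int[] source = Enumerable.Range(1, 1000 * 1000).ToArray();
long sum = 0L;

// each range spans exactly 100_000 indices (the last one may be shorter)
var partition = Partitioner.Create(0, source.Length, rangeSize: 100_000);

Parallel.ForEach(partition, range => {
    long local = 0L;
    for (int i = range.Item1; i < range.Item2; i++)
        local += source[i];
    Interlocked.Add(ref sum, local); // one contention per range, not per item
});

Console.WriteLine(sum); // 500000500000
```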
## Invoke

`Parallel.Invoke` is not really a loop, but I can't find a more appropriate place to introduce it.
It simply runs multiple actions in parallel as a blocking operation; no async counterpart exists.

```cs
// blocking operation
Parallel.Invoke(
    () => Console.WriteLine(1),
    () => Console.WriteLine(2),
    () => Console.WriteLine(3),
    () => Console.WriteLine(4)
); // order is not guaranteed
```