|
| 1 | +#Parallel Linq |
| 2 | + |
| 3 | +*Parallel Linq* is an extension to process your query in a parallel manner and Linq style. |
| 4 | +Just like`Parallel` class,*Parallel Linq* is based on`Task` and are all blocking operations too. |
| 5 | + |
| 6 | +-`ParallelEnumerable.AsParallel`: to convert a enumerable to a`ParallelQuery<T>` |
| 7 | +- Parallel counterpart of linq operators such as`Select`,`Sum`... |
| 8 | +-`ParallelEnumerable.AsSequential`: to convert a`ParallelQuery<T>` back to normal`IEnumerable<T>` |
| 9 | +-`ParallelEnumerable.WithCancellation`: pass cancellation token |
| 10 | +-`ParallelEnumerable.WithMergeOptions`: control how to buffer the scheduled iterations |
| 11 | + |
| 12 | +##`ParallelQuery` is`IEnumerable` |
| 13 | + |
| 14 | +`ParallelQuery<T>` is a simple wrapper on`IEnumerable<T>`, some of the implementations were overridden in the class itself, the rest were implemented as extension on`ParallelEnumerable`. |
| 15 | +The compiler would choose another version of linq operator from`ParallelEnumerable` extension when the compile-time type is a`ParallelQuery<T>`. |
| 16 | + |
| 17 | +```cs |
| 18 | +publicclassParallelQuery<TSource> :ParallelQuery,IEnumerable<TSource> {/* ...*/ } |
| 19 | +``` |
| 20 | + |
| 21 | +###`AsParallel` &`AsSequential` &`AsEnumerable` |
| 22 | + |
| 23 | +-`ParallelEnumerable.AsParallel()` is for converting a enumerable to`ParallelQuery` |
| 24 | +-`ParallelEnumerable.AsSequential()` is an extension dedicated for`ParallelQuery<T>` |
| 25 | +- does not change runtime type of source |
| 26 | +- but would notify the compiler to force the source to pick general implementations from`Enumerable` extension |
| 27 | +- all subsequent operations would became sequential which is not in parallel |
| 28 | +-`ParallelEnumerable.AsEnumerable()` is a common extension on`IEnumerable<T>`, however`ParallelEnumerable.AsEnumerable` exists to unwrap the backing enumerable when working with`ParallelQuery<T>` |
| 29 | +- identical to`ParallelEnumerable.AsSequential()` |
| 30 | + |
| 31 | +>[!NOTE] |
| 32 | +>They're all deferred execution |
| 33 | +
|
| 34 | +##Preserve Ordering |
| 35 | + |
| 36 | +`AsOrdered` makes sure the subsequent operation in parallel would preseve order of the original enumerable. |
| 37 | + |
| 38 | +```cs |
| 39 | +varseq=Enumerable.Range(1,100); |
| 40 | +varordered=Enumerable.Range(1,100) |
| 41 | + .AsParallel() |
| 42 | + .AsOrdered()// [!code highlight] |
| 43 | + .Select(x=>x); |
| 44 | + |
| 45 | +Console.WriteLine(seq.SequenceEqual(ordered));// always true // [!code highlight] |
| 46 | +``` |
| 47 | + |
| 48 | +However, preseving ordering would be consuming anyway, so you could disable it when ordering does not matters anymore using`ParallelEnumerable.AsUnordered` |
| 49 | + |
| 50 | +```cs |
| 51 | +varordered=Enumerable.Range(1,100) |
| 52 | + .AsParallel() |
| 53 | + .AsOrdered()// [!code highlight] |
| 54 | + .Select(x=>x) |
| 55 | + .AsUnordered();// cancel the ordering preservation // [!code highlight] |
| 56 | +``` |
| 57 | + |
| 58 | +>[!NOTE] |
| 59 | +>See[Order Preservation](https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/order-preservation-in-plinq#query-operators-and-ordering) |
| 60 | +
|
| 61 | +##Cancellation & Exception |
| 62 | + |
| 63 | +It's basically a functional version of parallel loop, so exception handling and cancellation is just as the same as`Parallel` class. |
| 64 | + |
| 65 | +- use`ParallelEnumerable.WithCancellation` to specify a cancellation token |
| 66 | +- cancellation is special so please catch it as`OperationCanceledException` |
| 67 | +- other exceptions from all iterations are just wrapped in`AggregateException` |
| 68 | +-**remember to evaluate the query** otherwise such cancellation or exception would never be triggered |
| 69 | + |
| 70 | +```cs |
| 71 | +varparallelSeq=ParallelEnumerable.Range(1,100); |
| 72 | +varcts=newCancellationTokenSource(2000); |
| 73 | + |
| 74 | +varquery=parallelSeq |
| 75 | + .WithCancellation(cts.Token) |
| 76 | + .Select(x=> { |
| 77 | +Thread.Sleep(2000); |
| 78 | +cts.Token.ThrowIfCancellationRequested();// [!code highlight] |
| 79 | +if (int.IsOddInteger(x))thrownewException("a normal exception was thrown"); |
| 80 | +returnx*x; |
| 81 | + }); |
| 82 | + |
| 83 | +try { |
| 84 | +// you must consume query // [!code highlight] |
| 85 | +_=query.ToList();// [!code highlight] |
| 86 | +}catch (AggregateExceptionex) { |
| 87 | +ex.Handle(iex=> { |
| 88 | +switch (iex) { |
| 89 | +caseException: |
| 90 | +Console.WriteLine(ex.Message); |
| 91 | +returntrue; |
| 92 | +default: |
| 93 | +returnfalse; |
| 94 | + } |
| 95 | + }); |
| 96 | +}catch (OperationCanceledException) { |
| 97 | +Console.WriteLine($"{nameof(OperationCanceledException)} was thrown"); |
| 98 | +} |
| 99 | +``` |
| 100 | + |
| 101 | +##Merge Option |
| 102 | + |
| 103 | +Parallel iterations were scheduled as groups, so buffering is enabled by default and the size of group is dependent on the system. |
| 104 | + |
| 105 | +-`ParallelMergeOptions.AutoBuffered` |
| 106 | +-`ParallelMergeOptions.Default`: alias to`AutoBuffered` |
| 107 | +-`ParallelMergeOptions.FullyBuffered`: source not available until all iteration were finished |
| 108 | +-`ParallelMergeOptions.NotBuffered`: yield item immediately whenever available |
| 109 | + |
| 110 | +:::code-group |
| 111 | + |
| 112 | +```cs[FullyBuffered] |
| 113 | +var query = ParallelEnumerable.Range(1, 100); |
| 114 | + .WithMergeOptions(ParallelMergeOptions.FullyBuffered) // [!code highlight] |
| 115 | + .Select(x => { |
| 116 | + Thread.Sleep(Random.Shared.Next(100)); |
| 117 | + Console.WriteLine("produced"); |
| 118 | + return x; |
| 119 | + }); |
| 120 | +
|
| 121 | +foreach (var _ in query) { |
| 122 | + Console.WriteLine("consumed"); |
| 123 | +} |
| 124 | +
|
| 125 | +// consume only happens after all were produced when FullyBuffered is specified |
| 126 | +// produced |
| 127 | +// produced |
| 128 | +// produced |
| 129 | +// ... |
| 130 | +// consumed |
| 131 | +// consumed |
| 132 | +// consumed |
| 133 | +``` |
| 134 | + |
| 135 | +```cs[NotBuffered] |
| 136 | +var query = ParallelEnumerable.Range(1, 100); |
| 137 | + .WithMergeOptions(ParallelMergeOptions.NotBuffered) // [!code highlight] |
| 138 | + .Select(x => { |
| 139 | + Thread.Sleep(Random.Shared.Next(100)); |
| 140 | + Console.WriteLine("produced"); |
| 141 | + return x; |
| 142 | + }); |
| 143 | +
|
| 144 | +foreach (var _ in query) { |
| 145 | + Console.WriteLine("consumed"); |
| 146 | +} |
| 147 | +
|
| 148 | +// consuming happens as long as one was available |
| 149 | +// produced |
| 150 | +// consumed |
| 151 | +// produced |
| 152 | +// consumed |
| 153 | +// produced |
| 154 | +// ... |
| 155 | +// consumed |
| 156 | +``` |
| 157 | + |
| 158 | +##Performance Enhancement |
| 159 | + |
| 160 | +###Local Storage |
| 161 | + |
| 162 | +`ParallelEnumerable.Aggregate` is exactly the role the perform local storage, the following example uses one of its most flexible overload. |
| 163 | + |
| 164 | +```cs |
| 165 | +varsize=Directory.EnumerateFiles(@"c:/Users//User/Projects/nix-config","*",SearchOption.AllDirectories) |
| 166 | + .AsParallel() |
| 167 | + .Aggregate(// [!code highlight] |
| 168 | +seed:0L,// [!code highlight] |
| 169 | +updateAccumulatorFunc: (localSum,curr)=>localSum+newFileInfo(curr).Length,// iteration // [!code highlight] |
| 170 | +combineAccumulatorsFunc: (sum,localSum)=>sum+localSum,// add up when each group was finished // [!code highlight] |
| 171 | +resultSelector:i=>i/1024D// post action to transform the result // [!code highlight] |
| 172 | + );// [!code highlight] |
| 173 | +
|
| 174 | +Console.WriteLine($"size in kb: {size}");// [!code highlight] |
| 175 | +``` |