|
| 1 | +// Licensed to the .NET Foundation under one or more agreements. |
| 2 | +// The .NET Foundation licenses this file to you under the MIT license. |
| 3 | + |
| 4 | +usingSystem.Buffers; |
| 5 | +usingSystem.Diagnostics; |
| 6 | +usingSystem.Runtime.InteropServices; |
| 7 | +usingSystem.Threading; |
| 8 | +usingSystem.Threading.Tasks.Sources; |
| 9 | +usingTaskSourceCodes=System.IO.Strategies.FileStreamHelpers.TaskSourceCodes; |
| 10 | + |
| 11 | +namespaceSystem.IO.Strategies |
| 12 | +{ |
| 13 | +internalsealedpartialclassAsyncWindowsFileStreamStrategy:WindowsFileStreamStrategy |
| 14 | +{ |
| 15 | +/// <summary> |
| 16 | +/// Type that helps reduce allocations for FileStream.ReadAsync and FileStream.WriteAsync. |
| 17 | +/// </summary> |
| 18 | +privateunsafeclassValueTaskSource:IValueTaskSource<int>,IValueTaskSource |
| 19 | +{ |
| 20 | +internalstaticreadonlyIOCompletionCallbacks_ioCallback=IOCallback; |
| 21 | + |
| 22 | +privatereadonlyAsyncWindowsFileStreamStrategy_strategy; |
| 23 | + |
| 24 | +privateManualResetValueTaskSourceCore<int>_source;// mutable struct; do not make this readonly |
| 25 | +privateNativeOverlapped*_overlapped; |
| 26 | +privateCancellationTokenRegistration_cancellationRegistration; |
| 27 | +privatelong_result;// Using long since this needs to be used in Interlocked APIs |
| 28 | +#ifDEBUG |
| 29 | +privatebool_cancellationHasBeenRegistered; |
| 30 | +#endif |
| 31 | + |
| 32 | +publicstaticValueTaskSourceCreate( |
| 33 | +AsyncWindowsFileStreamStrategystrategy, |
| 34 | +PreAllocatedOverlapped?preallocatedOverlapped, |
| 35 | +ReadOnlyMemory<byte>memory) |
| 36 | +{ |
| 37 | +// If the memory passed in is the strategy's internal buffer, we can use the base AwaitableProvider, |
| 38 | +// which has a PreAllocatedOverlapped with the memory already pinned. Otherwise, we use the derived |
| 39 | +// MemoryAwaitableProvider, which Retains the memory, which will result in less pinning in the case |
| 40 | +// where the underlying memory is backed by pre-pinned buffers. |
| 41 | +returnpreallocatedOverlapped!=null&& |
| 42 | +MemoryMarshal.TryGetArray(memory,outArraySegment<byte>buffer)&& |
| 43 | +preallocatedOverlapped.IsUserObject(buffer.Array)? |
| 44 | +newValueTaskSource(strategy,preallocatedOverlapped,buffer.Array): |
| 45 | +newMemoryValueTaskSource(strategy,memory); |
| 46 | +} |
| 47 | + |
| 48 | +protectedValueTaskSource( |
| 49 | +AsyncWindowsFileStreamStrategystrategy, |
| 50 | +PreAllocatedOverlapped?preallocatedOverlapped, |
| 51 | +byte[]?bytes) |
| 52 | +{ |
| 53 | +_strategy=strategy; |
| 54 | +_result=TaskSourceCodes.NoResult; |
| 55 | + |
| 56 | +_source=default; |
| 57 | +_source.RunContinuationsAsynchronously=true; |
| 58 | + |
| 59 | +_overlapped=bytes!=null&& |
| 60 | +_strategy.CompareExchangeCurrentOverlappedOwner(this,null)==null? |
| 61 | +_strategy._fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(preallocatedOverlapped!):// allocated when buffer was created, and buffer is non-null |
| 62 | +_strategy._fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(s_ioCallback,this,bytes); |
| 63 | + |
| 64 | +Debug.Assert(_overlapped!=null,"AllocateNativeOverlapped returned null"); |
| 65 | +} |
| 66 | + |
| 67 | +internalNativeOverlapped*Overlapped=>_overlapped; |
| 68 | +publicValueTaskSourceStatusGetStatus(shorttoken)=>_source.GetStatus(token); |
| 69 | +publicvoidOnCompleted(Action<object?>continuation,object?state,shorttoken,ValueTaskSourceOnCompletedFlagsflags)=>_source.OnCompleted(continuation,state,token,flags); |
| 70 | +voidIValueTaskSource.GetResult(shorttoken)=>_source.GetResult(token); |
| 71 | +intIValueTaskSource<int>.GetResult(shorttoken)=>_source.GetResult(token); |
| 72 | +internalshortVersion=>_source.Version; |
| 73 | + |
| 74 | +internalvoidRegisterForCancellation(CancellationTokencancellationToken) |
| 75 | +{ |
| 76 | +#ifDEBUG |
| 77 | +Debug.Assert(cancellationToken.CanBeCanceled); |
| 78 | +Debug.Assert(!_cancellationHasBeenRegistered,"Cannot register for cancellation twice"); |
| 79 | +_cancellationHasBeenRegistered=true; |
| 80 | +#endif |
| 81 | + |
| 82 | +// Quick check to make sure the IO hasn't completed |
| 83 | +if(_overlapped!=null) |
| 84 | +{ |
| 85 | +// Register the cancellation only if the IO hasn't completed |
| 86 | +longpackedResult=Interlocked.CompareExchange(ref_result,TaskSourceCodes.RegisteringCancellation,TaskSourceCodes.NoResult); |
| 87 | +if(packedResult==TaskSourceCodes.NoResult) |
| 88 | +{ |
| 89 | +_cancellationRegistration=cancellationToken.UnsafeRegister((s,token)=>Cancel(token),this); |
| 90 | + |
| 91 | +// Switch the result, just in case IO completed while we were setting the registration |
| 92 | +packedResult=Interlocked.Exchange(ref_result,TaskSourceCodes.NoResult); |
| 93 | +} |
| 94 | +elseif(packedResult!=TaskSourceCodes.CompletedCallback) |
| 95 | +{ |
| 96 | +// Failed to set the result, IO is in the process of completing |
| 97 | +// Attempt to take the packed result |
| 98 | +packedResult=Interlocked.Exchange(ref_result,TaskSourceCodes.NoResult); |
| 99 | +} |
| 100 | + |
| 101 | +// If we have a callback that needs to be completed |
| 102 | +if((packedResult!=TaskSourceCodes.NoResult)&&(packedResult!=TaskSourceCodes.CompletedCallback)&&(packedResult!=TaskSourceCodes.RegisteringCancellation)) |
| 103 | +{ |
| 104 | +CompleteCallback((ulong)packedResult); |
| 105 | +} |
| 106 | +} |
| 107 | +} |
| 108 | + |
| 109 | +internalvirtualvoidReleaseNativeResource() |
| 110 | +{ |
| 111 | +// Ensure that cancellation has been completed and cleaned up. |
| 112 | +_cancellationRegistration.Dispose(); |
| 113 | + |
| 114 | +// Free the overlapped. |
| 115 | +// NOTE: The cancellation must *NOT* be running at this point, or it may observe freed memory |
| 116 | +// (this is why we disposed the registration above). |
| 117 | +if(_overlapped!=null) |
| 118 | +{ |
| 119 | +_strategy._fileHandle.ThreadPoolBinding!.FreeNativeOverlapped(_overlapped); |
| 120 | +_overlapped=null; |
| 121 | +} |
| 122 | + |
| 123 | +// Ensure we're no longer set as the current AwaitableProvider (we may not have been to begin with). |
| 124 | +// Only one operation at a time is eligible to use the preallocated overlapped |
| 125 | +_strategy.CompareExchangeCurrentOverlappedOwner(null,this); |
| 126 | +} |
| 127 | + |
| 128 | +privatestaticvoidIOCallback(uinterrorCode,uintnumBytes,NativeOverlapped*pOverlapped) |
| 129 | +{ |
| 130 | +// Extract the AwaitableProvider from the overlapped. The state in the overlapped |
| 131 | +// will either be a AsyncWindowsFileStreamStrategy (in the case where the preallocated overlapped was used), |
| 132 | +// in which case the operation being completed is its _currentOverlappedOwner, or it'll |
| 133 | +// be directly the AwaitableProvider that's completing (in the case where the preallocated |
| 134 | +// overlapped was already in use by another operation). |
| 135 | +object?state=ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped); |
| 136 | +Debug.Assert(stateisAsyncWindowsFileStreamStrategy orValueTaskSource); |
| 137 | +ValueTaskSourcevalueTaskSource=stateswitch |
| 138 | +{ |
| 139 | +AsyncWindowsFileStreamStrategystrategy=>strategy._currentOverlappedOwner!,// must be owned |
| 140 | + _=>(ValueTaskSource)state |
| 141 | +}; |
| 142 | +Debug.Assert(valueTaskSource!=null); |
| 143 | +Debug.Assert(valueTaskSource._overlapped==pOverlapped,"Overlaps don't match"); |
| 144 | + |
| 145 | +// Handle reading from & writing to closed pipes. While I'm not sure |
| 146 | +// this is entirely necessary anymore, maybe it's possible for |
| 147 | +// an async read on a pipe to be issued and then the pipe is closed, |
| 148 | +// returning this error. This may very well be necessary. |
| 149 | +ulongpackedResult; |
| 150 | +if(errorCode!=0&&errorCode!=Interop.Errors.ERROR_BROKEN_PIPE&&errorCode!=Interop.Errors.ERROR_NO_DATA) |
| 151 | +{ |
| 152 | +packedResult=((ulong)TaskSourceCodes.ResultError|errorCode); |
| 153 | +} |
| 154 | +else |
| 155 | +{ |
| 156 | +packedResult=((ulong)TaskSourceCodes.ResultSuccess|numBytes); |
| 157 | +} |
| 158 | + |
| 159 | +// Stow the result so that other threads can observe it |
| 160 | +// And, if no other thread is registering cancellation, continue |
| 161 | +if(Interlocked.Exchange(refvalueTaskSource._result,(long)packedResult)==TaskSourceCodes.NoResult) |
| 162 | +{ |
| 163 | +// Successfully set the state, attempt to take back the callback |
| 164 | +if(Interlocked.Exchange(refvalueTaskSource._result,TaskSourceCodes.CompletedCallback)!=TaskSourceCodes.NoResult) |
| 165 | +{ |
| 166 | +// Successfully got the callback, finish the callback |
| 167 | +valueTaskSource.CompleteCallback(packedResult); |
| 168 | +} |
| 169 | +// else: Some other thread stole the result, so now it is responsible to finish the callback |
| 170 | +} |
| 171 | +// else: Some other thread is registering a cancellation, so it *must* finish the callback |
| 172 | +} |
| 173 | + |
| 174 | +privatevoidCompleteCallback(ulongpackedResult) |
| 175 | +{ |
| 176 | +CancellationTokencancellationToken=_cancellationRegistration.Token; |
| 177 | + |
| 178 | +ReleaseNativeResource(); |
| 179 | + |
| 180 | +// Unpack the result and send it to the user |
| 181 | +longresult=(long)(packedResult&TaskSourceCodes.ResultMask); |
| 182 | +if(result==TaskSourceCodes.ResultError) |
| 183 | +{ |
| 184 | +interrorCode=unchecked((int)(packedResult&uint.MaxValue)); |
| 185 | +Exceptione; |
| 186 | +if(errorCode==Interop.Errors.ERROR_OPERATION_ABORTED) |
| 187 | +{ |
| 188 | +CancellationTokenct=cancellationToken.IsCancellationRequested?cancellationToken:newCancellationToken(canceled:true); |
| 189 | +e=newOperationCanceledException(ct); |
| 190 | +} |
| 191 | +else |
| 192 | +{ |
| 193 | +e=Win32Marshal.GetExceptionForWin32Error(errorCode); |
| 194 | +} |
| 195 | +e.SetCurrentStackTrace(); |
| 196 | +_source.SetException(e); |
| 197 | +} |
| 198 | +else |
| 199 | +{ |
| 200 | +Debug.Assert(result==TaskSourceCodes.ResultSuccess,"Unknown result"); |
| 201 | +_source.SetResult((int)(packedResult&uint.MaxValue)); |
| 202 | +} |
| 203 | +} |
| 204 | + |
| 205 | +privatevoidCancel(CancellationTokentoken) |
| 206 | +{ |
| 207 | +// WARNING: This may potentially be called under a lock (during cancellation registration) |
| 208 | +Debug.Assert(_overlapped!=null&&GetStatus(Version)!=ValueTaskSourceStatus.Succeeded,"IO should not have completed yet"); |
| 209 | + |
| 210 | +// If the handle is still valid, attempt to cancel the IO |
| 211 | +if(!_strategy._fileHandle.IsInvalid&& |
| 212 | +!Interop.Kernel32.CancelIoEx(_strategy._fileHandle,_overlapped)) |
| 213 | +{ |
| 214 | +interrorCode=Marshal.GetLastWin32Error(); |
| 215 | + |
| 216 | +// ERROR_NOT_FOUND is returned if CancelIoEx cannot find the request to cancel. |
| 217 | +// This probably means that the IO operation has completed. |
| 218 | +if(errorCode!=Interop.Errors.ERROR_NOT_FOUND) |
| 219 | +{ |
| 220 | +Exceptione=newOperationCanceledException(SR.OperationCanceled,Win32Marshal.GetExceptionForWin32Error(errorCode),token); |
| 221 | +e.SetCurrentStackTrace(); |
| 222 | +_source.SetException(e); |
| 223 | +} |
| 224 | +} |
| 225 | +} |
| 226 | +} |
| 227 | + |
| 228 | +/// <summary> |
| 229 | +/// Extends <see cref="ValueTaskSource"/> with to support disposing of a |
| 230 | +/// <see cref="MemoryHandle"/> when the operation has completed. This should only be used |
| 231 | +/// when memory doesn't wrap a byte[]. |
| 232 | +/// </summary> |
| 233 | +privatesealedclassMemoryValueTaskSource:ValueTaskSource |
| 234 | +{ |
| 235 | +privateMemoryHandle_handle;// mutable struct; do not make this readonly |
| 236 | + |
| 237 | +// this type handles the pinning, so bytes are null |
| 238 | +internalunsafeMemoryValueTaskSource(AsyncWindowsFileStreamStrategystrategy,ReadOnlyMemory<byte>memory) |
| 239 | +:base(strategy,null,null)// this type handles the pinning, so null is passed for bytes to the base |
| 240 | +{ |
| 241 | +_handle=memory.Pin(); |
| 242 | +} |
| 243 | + |
| 244 | +internaloverridevoidReleaseNativeResource() |
| 245 | +{ |
| 246 | +_handle.Dispose(); |
| 247 | +base.ReleaseNativeResource(); |
| 248 | +} |
| 249 | +} |
| 250 | +} |
| 251 | +} |