|
8 | 8 | usingSystem.Threading; |
9 | 9 | usingSystem.Threading.Tasks; |
10 | 10 | usingMicrosoft.Win32.SafeHandles; |
| 11 | +usingSystem.Runtime.CompilerServices; |
11 | 12 |
|
12 | 13 | /* |
13 | 14 | * Win32FileStream supports different modes of accessing the disk - async mode |
@@ -1684,6 +1685,299 @@ private int GetLastWin32ErrorAndDisposeHandleIfInvalid(bool throwIfInvalidHandle |
1684 | 1685 | returnerrorCode; |
1685 | 1686 | } |
1686 | 1687 |
|
| 1688 | +publicoverrideTaskCopyToAsync(Streamdestination,intbufferSize,CancellationTokencancellationToken) |
| 1689 | +{ |
| 1690 | +// Validate arguments as would the base implementation |
| 1691 | +if(destination==null)thrownewArgumentNullException(nameof(destination)); |
| 1692 | +if(bufferSize<=0)thrownewArgumentOutOfRangeException(nameof(bufferSize),SR.ArgumentOutOfRange_NeedPosNum); |
| 1693 | +boolparentCanRead=_parent.CanRead; |
| 1694 | +if(!parentCanRead&&!_parent.CanWrite)thrownewObjectDisposedException(null,SR.ObjectDisposed_StreamClosed); |
| 1695 | +booldestinationCanWrite=destination.CanWrite; |
| 1696 | +if(!destination.CanRead&&!destinationCanWrite)thrownewObjectDisposedException(nameof(destination),SR.ObjectDisposed_StreamClosed); |
| 1697 | +if(!parentCanRead)thrownewNotSupportedException(SR.NotSupported_UnreadableStream); |
| 1698 | +if(!destinationCanWrite)thrownewNotSupportedException(SR.NotSupported_UnwritableStream); |
| 1699 | +if(_handle.IsClosed)throwError.GetFileNotOpen(); |
| 1700 | + |
| 1701 | +// Bail early for cancellation if cancellation has been requested |
| 1702 | +if(cancellationToken.IsCancellationRequested) |
| 1703 | +{ |
| 1704 | +returnTask.FromCanceled<int>(cancellationToken); |
| 1705 | +} |
| 1706 | + |
| 1707 | +// Do the async copy, with differing implementations based on whether the FileStream was opened as async or sync |
| 1708 | +Debug.Assert((_readPos==0&&_readLen==0&&_writePos>=0)||(_writePos==0&&_readPos<=_readLen),"We're either reading or writing, but not both."); |
| 1709 | +return_isAsync? |
| 1710 | +AsyncModeCopyToAsync(destination,bufferSize,cancellationToken): |
| 1711 | +base.CopyToAsync(destination,bufferSize,cancellationToken); |
| 1712 | +} |
| 1713 | + |
| 1714 | +privateasyncTaskAsyncModeCopyToAsync(Streamdestination,intbufferSize,CancellationTokencancellationToken) |
| 1715 | +{ |
| 1716 | +Debug.Assert(_isAsync,"This implementation is for async mode only"); |
| 1717 | +Debug.Assert(!_handle.IsClosed,"!_handle.IsClosed"); |
| 1718 | +Debug.Assert(_parent.CanRead,"_parent.CanRead"); |
| 1719 | + |
| 1720 | +// Make sure any pending writes have been flushed before we do a read. |
| 1721 | +if(_writePos>0) |
| 1722 | +{ |
| 1723 | +awaitFlushWriteAsync(cancellationToken).ConfigureAwait(false); |
| 1724 | +} |
| 1725 | + |
| 1726 | +// Typically CopyToAsync would be invoked as the only "read" on the stream, but it's possible some reading is |
| 1727 | +// done and then the CopyToAsync is issued. For that case, see if we have any data available in the buffer. |
| 1728 | +if(_buffer!=null) |
| 1729 | +{ |
| 1730 | +intbufferedBytes=_readLen-_readPos; |
| 1731 | +if(bufferedBytes>0) |
| 1732 | +{ |
| 1733 | +awaitdestination.WriteAsync(_buffer,_readPos,bufferedBytes,cancellationToken).ConfigureAwait(false); |
| 1734 | +_readPos=_readLen=0; |
| 1735 | +} |
| 1736 | +} |
| 1737 | + |
| 1738 | +// For efficiency, we avoid creating a new task and associated state for each asynchronous read. |
| 1739 | +// Instead, we create a single reusable awaitable object that will be triggered when an await completes |
| 1740 | +// and reset before going again. |
| 1741 | +varreadAwaitable=newAsyncCopyToAwaitable(this); |
| 1742 | + |
| 1743 | +// Make sure we are reading from the position that we think we are. |
| 1744 | +// Only set the position in the awaitable if we can seek (e.g. not for pipes). |
| 1745 | +boolcanSeek=_parent.CanSeek; |
| 1746 | +if(canSeek) |
| 1747 | +{ |
| 1748 | +if(_exposedHandle)VerifyOSHandlePosition(); |
| 1749 | +readAwaitable._position=_pos; |
| 1750 | +} |
| 1751 | + |
| 1752 | +// Create the buffer to use for the copy operation, as the base CopyToAsync does. We don't try to use |
| 1753 | +// _buffer here, even if it's not null, as concurrent operations are allowed, and another operation may |
| 1754 | +// actually be using the buffer already. Plus, it'll be rare for _buffer to be non-null, as typically |
| 1755 | +// CopyToAsync is used as the only operation performed on the stream, and the buffer is lazily initialized. |
| 1756 | +// Further, typically the CopyToAsync buffer size will be larger than that used by the FileStream, such that |
| 1757 | +// we'd likely be unable to use it anyway. A better option than using _buffer would be a future pooling solution. |
| 1758 | +byte[]copyBuffer=newbyte[bufferSize]; |
| 1759 | + |
| 1760 | +// Allocate an Overlapped we can use repeatedly for all operations |
| 1761 | +varawaitableOverlapped=newPreAllocatedOverlapped(AsyncCopyToAwaitable.s_callback,readAwaitable,copyBuffer); |
| 1762 | +varcancellationReg=default(CancellationTokenRegistration); |
| 1763 | +try |
| 1764 | +{ |
| 1765 | +// Register for cancellation. We do this once for the whole copy operation, and just try to cancel |
| 1766 | +// whatever read operation may currently be in progress, if there is one. It's possible the cancellation |
| 1767 | +// request could come in between operations, in which case we flag that with explicit calls to ThrowIfCancellationRequested |
| 1768 | +// in the read/write copy loop. |
| 1769 | +if(cancellationToken.CanBeCanceled) |
| 1770 | +{ |
| 1771 | +cancellationReg=cancellationToken.Register(s=> |
| 1772 | +{ |
| 1773 | +varinnerAwaitable=(AsyncCopyToAwaitable)s; |
| 1774 | + unsafe |
| 1775 | +{ |
| 1776 | +lock(innerAwaitable.CancellationLock)// synchronize with cleanup of the overlapped |
| 1777 | +{ |
| 1778 | +if(innerAwaitable._nativeOverlapped!=null) |
| 1779 | +{ |
| 1780 | +// Try to cancel the I/O. We ignore the return value, as cancellation is opportunistic and we |
| 1781 | +// don't want to fail the operation because we couldn't cancel it. |
| 1782 | +Interop.mincore.CancelIoEx(innerAwaitable._fileStream._handle,innerAwaitable._nativeOverlapped); |
| 1783 | +} |
| 1784 | +} |
| 1785 | +} |
| 1786 | +},readAwaitable); |
| 1787 | +} |
| 1788 | + |
| 1789 | +// Repeatedly read from this FileStream and write the results to the destination stream. |
| 1790 | +while(true) |
| 1791 | +{ |
| 1792 | +cancellationToken.ThrowIfCancellationRequested(); |
| 1793 | +readAwaitable.ResetForNextOperation(); |
| 1794 | + |
| 1795 | +try |
| 1796 | +{ |
| 1797 | +boolsynchronousSuccess; |
| 1798 | +interrorCode; |
| 1799 | + unsafe |
| 1800 | +{ |
| 1801 | +// Allocate a native overlapped for our reusable overlapped, and set position to read based on the next |
| 1802 | +// desired address stored in the awaitable. (This position may be 0, if either we're at the beginning or |
| 1803 | +// if the stream isn't seekable.) |
| 1804 | +readAwaitable._nativeOverlapped=_handle.ThreadPoolBinding.AllocateNativeOverlapped(awaitableOverlapped); |
| 1805 | +if(canSeek) |
| 1806 | +{ |
| 1807 | +readAwaitable._nativeOverlapped->OffsetLow=unchecked((int)readAwaitable._position); |
| 1808 | +readAwaitable._nativeOverlapped->OffsetHigh=(int)(readAwaitable._position>>32); |
| 1809 | +} |
| 1810 | + |
| 1811 | +// Kick off the read. |
| 1812 | +synchronousSuccess=ReadFileNative(_handle,copyBuffer,0,copyBuffer.Length,readAwaitable._nativeOverlapped,outerrorCode)>=0; |
| 1813 | +} |
| 1814 | + |
| 1815 | +// If the operation did not synchronously succeed, it either failed or initiated the asynchronous operation. |
| 1816 | +if(!synchronousSuccess) |
| 1817 | +{ |
| 1818 | +switch(errorCode) |
| 1819 | +{ |
| 1820 | +caseERROR_IO_PENDING: |
| 1821 | +// Async operation in progress. |
| 1822 | +break; |
| 1823 | +caseERROR_BROKEN_PIPE: |
| 1824 | +caseERROR_HANDLE_EOF: |
| 1825 | +// We're at or past the end of the file, and the overlapped callback |
| 1826 | +// won't be raised in these cases. Mark it as completed so that the await |
| 1827 | +// below will see it as such. |
| 1828 | +readAwaitable.MarkCompleted(); |
| 1829 | +break; |
| 1830 | +default: |
| 1831 | +// Everything else is an error (and there won't be a callback). |
| 1832 | +throwWin32Marshal.GetExceptionForWin32Error(errorCode); |
| 1833 | +} |
| 1834 | +} |
| 1835 | + |
| 1836 | +// Wait for the async operation (which may or may not have already completed), then throw if it failed. |
| 1837 | +awaitreadAwaitable; |
| 1838 | +switch(readAwaitable._errorCode) |
| 1839 | +{ |
| 1840 | +case0:// success |
| 1841 | +Debug.Assert(readAwaitable._numBytes>=0,$"Expected non-negative numBytes, got{readAwaitable._numBytes}"); |
| 1842 | +break; |
| 1843 | +caseERROR_BROKEN_PIPE:// logically success with 0 bytes read (write end of pipe closed) |
| 1844 | +caseERROR_HANDLE_EOF:// logically success with 0 bytes read (read at end of file) |
| 1845 | +Debug.Assert(readAwaitable._numBytes==0,$"Expected 0 bytes read, got{readAwaitable._numBytes}"); |
| 1846 | +break; |
| 1847 | +caseInterop.mincore.Errors.ERROR_OPERATION_ABORTED:// canceled |
| 1848 | +thrownewOperationCanceledException(cancellationToken.IsCancellationRequested?cancellationToken:newCancellationToken(true)); |
| 1849 | +default:// error |
| 1850 | +throwWin32Marshal.GetExceptionForWin32Error((int)readAwaitable._errorCode); |
| 1851 | +} |
| 1852 | + |
| 1853 | +// Successful operation. If we got zero bytes, we're done: exit the read/write loop. |
| 1854 | +// Otherwise, update the read position for next time accordingly. |
| 1855 | +if(readAwaitable._numBytes==0) |
| 1856 | +{ |
| 1857 | +break; |
| 1858 | +} |
| 1859 | +elseif(canSeek) |
| 1860 | +{ |
| 1861 | +readAwaitable._position+=(int)readAwaitable._numBytes; |
| 1862 | +} |
| 1863 | +} |
| 1864 | +finally |
| 1865 | +{ |
| 1866 | +// Free the resources for this read operation |
| 1867 | + unsafe |
| 1868 | +{ |
| 1869 | +NativeOverlapped*overlapped; |
| 1870 | +lock(readAwaitable.CancellationLock)// just an Exchange, but we need this to be synchronized with cancellation, so using the same lock |
| 1871 | +{ |
| 1872 | +overlapped=readAwaitable._nativeOverlapped; |
| 1873 | +readAwaitable._nativeOverlapped=null; |
| 1874 | +} |
| 1875 | +if(overlapped!=null) |
| 1876 | +{ |
| 1877 | +_handle.ThreadPoolBinding.FreeNativeOverlapped(overlapped); |
| 1878 | +} |
| 1879 | +} |
| 1880 | +} |
| 1881 | + |
| 1882 | +// Write out the read data. |
| 1883 | +awaitdestination.WriteAsync(copyBuffer,0,(int)readAwaitable._numBytes,cancellationToken).ConfigureAwait(false); |
| 1884 | +} |
| 1885 | +} |
| 1886 | +finally |
| 1887 | +{ |
| 1888 | +// Cleanup from the whole copy operation |
| 1889 | +cancellationReg.Dispose(); |
| 1890 | +awaitableOverlapped.Dispose(); |
| 1891 | + |
| 1892 | +// Make sure the stream's current position reflects where we ended up |
| 1893 | +if(!_handle.IsClosed&&_parent.CanSeek) |
| 1894 | +{ |
| 1895 | +SeekCore(0,SeekOrigin.End); |
| 1896 | +} |
| 1897 | +} |
| 1898 | +} |
| 1899 | + |
| 1900 | +/// <summary>Used by CopyToAsync to enable awaiting the result of an overlapped I/O operation with minimal overhead.</summary> |
| 1901 | +privatesealedunsafeclassAsyncCopyToAwaitable:ICriticalNotifyCompletion |
| 1902 | +{ |
| 1903 | +/// <summary>Sentinel object used to indicate that the I/O operation has completed before being awaited.</summary> |
| 1904 | +privatereadonlystaticActions_sentinel=()=>{}; |
| 1905 | +/// <summary>Cached delegate to IOCallback.</summary> |
| 1906 | +internalstaticreadonlyIOCompletionCallbacks_callback=IOCallback; |
| 1907 | + |
| 1908 | +/// <summary>The FileStream that owns this instance.</summary> |
| 1909 | +internalreadonlyWin32FileStream_fileStream; |
| 1910 | + |
| 1911 | +/// <summary>Tracked position representing the next location from which to read.</summary> |
| 1912 | +internallong_position; |
| 1913 | +/// <summary>The current native overlapped pointer. This changes for each operation.</summary> |
| 1914 | +internalNativeOverlapped*_nativeOverlapped; |
| 1915 | +/// <summary> |
| 1916 | +/// null if the operation is still in progress, |
| 1917 | +/// s_sentinel if the I/O operation completed before the await, |
| 1918 | +/// s_callback if it completed after the await yielded. |
| 1919 | +/// </summary> |
| 1920 | +internalAction_continuation; |
| 1921 | +/// <summary>Last error code from completed operation.</summary> |
| 1922 | +internaluint_errorCode; |
| 1923 | +/// <summary>Last number of read bytes from completed operation.</summary> |
| 1924 | +internaluint_numBytes; |
| 1925 | + |
| 1926 | +/// <summary>Lock object used to protect cancellation-related access to _nativeOverlapped.</summary> |
| 1927 | +internalobjectCancellationLock=>this; |
| 1928 | + |
| 1929 | +/// <summary>Initialize the awaitable.</summary> |
| 1930 | +internalunsafeAsyncCopyToAwaitable(Win32FileStreamfileStream) |
| 1931 | +{ |
| 1932 | +_fileStream=fileStream; |
| 1933 | +} |
| 1934 | + |
| 1935 | +/// <summary>Reset state to prepare for the next read operation.</summary> |
| 1936 | +internalvoidResetForNextOperation() |
| 1937 | +{ |
| 1938 | +Debug.Assert(_position>=0,$"Expected non-negative position, got{_position}"); |
| 1939 | +_continuation=null; |
| 1940 | +_errorCode=0; |
| 1941 | +_numBytes=0; |
| 1942 | +} |
| 1943 | + |
| 1944 | +/// <summary>Overlapped callback: store the results, then invoke the continuation delegate.</summary> |
| 1945 | +internalunsafestaticvoidIOCallback(uinterrorCode,uintnumBytes,NativeOverlapped*pOVERLAP) |
| 1946 | +{ |
| 1947 | +varawaitable=(AsyncCopyToAwaitable)ThreadPoolBoundHandle.GetNativeOverlappedState(pOVERLAP); |
| 1948 | + |
| 1949 | +Debug.Assert(awaitable._continuation!=s_sentinel,"Sentinel must not have already been set as the continuation"); |
| 1950 | +awaitable._errorCode=errorCode; |
| 1951 | +awaitable._numBytes=numBytes; |
| 1952 | + |
| 1953 | +(awaitable._continuation??Interlocked.CompareExchange(refawaitable._continuation,s_sentinel,null))?.Invoke(); |
| 1954 | +} |
| 1955 | + |
| 1956 | +/// <summary> |
| 1957 | +/// Called when it's known that the I/O callback for an operation will not be invoked but we'll |
| 1958 | +/// still be awaiting the awaitable. |
| 1959 | +/// </summary> |
| 1960 | +internalvoidMarkCompleted() |
| 1961 | +{ |
| 1962 | +Debug.Assert(_continuation==null,"Expected null continuation"); |
| 1963 | +_continuation=s_sentinel; |
| 1964 | +} |
| 1965 | + |
| 1966 | +publicAsyncCopyToAwaitableGetAwaiter()=>this; |
| 1967 | +publicboolIsCompleted=>_continuation==s_sentinel; |
| 1968 | +publicvoidGetResult(){} |
| 1969 | +publicvoidOnCompleted(Actioncontinuation)=>UnsafeOnCompleted(continuation); |
| 1970 | +publicvoidUnsafeOnCompleted(Actioncontinuation) |
| 1971 | +{ |
| 1972 | +if(_continuation==s_sentinel|| |
| 1973 | +Interlocked.CompareExchange(ref_continuation,continuation,null)!=null) |
| 1974 | +{ |
| 1975 | +Debug.Assert(_continuation==s_sentinel,$"Expected continuation set to s_sentinel, got ${_continuation}"); |
| 1976 | +Task.Run(continuation); |
| 1977 | +} |
| 1978 | +} |
| 1979 | +} |
| 1980 | + |
1687 | 1981 | [System.Security.SecuritySafeCritical] |
1688 | 1982 | publicoverrideTask<int>ReadAsync(Byte[]buffer,intoffset,intcount,CancellationTokencancellationToken) |
1689 | 1983 | { |
|