|
8 | 8 | usingSystem.Threading; |
9 | 9 | usingSystem.Threading.Tasks; |
10 | 10 | usingMicrosoft.Win32.SafeHandles; |
| 11 | +usingSystem.Runtime.CompilerServices; |
11 | 12 |
|
12 | 13 | /* |
13 | 14 | * Win32FileStream supports different modes of accessing the disk - async mode |
@@ -1684,6 +1685,325 @@ private int GetLastWin32ErrorAndDisposeHandleIfInvalid(bool throwIfInvalidHandle |
1684 | 1685 | returnerrorCode; |
1685 | 1686 | } |
1686 | 1687 |
|
| 1688 | +publicoverrideTaskCopyToAsync(Streamdestination,intbufferSize,CancellationTokencancellationToken) |
| 1689 | +{ |
| 1690 | +// Validate arguments as would the base implementation |
| 1691 | +if(destination==null) |
| 1692 | +{ |
| 1693 | +thrownewArgumentNullException(nameof(destination)); |
| 1694 | +} |
| 1695 | +if(bufferSize<=0) |
| 1696 | +{ |
| 1697 | +thrownewArgumentOutOfRangeException(nameof(bufferSize),SR.ArgumentOutOfRange_NeedPosNum); |
| 1698 | +} |
| 1699 | +boolparentCanRead=_parent.CanRead; |
| 1700 | +if(!parentCanRead&&!_parent.CanWrite) |
| 1701 | +{ |
| 1702 | +thrownewObjectDisposedException(null,SR.ObjectDisposed_StreamClosed); |
| 1703 | +} |
| 1704 | +booldestinationCanWrite=destination.CanWrite; |
| 1705 | +if(!destination.CanRead&&!destinationCanWrite) |
| 1706 | +{ |
| 1707 | +thrownewObjectDisposedException(nameof(destination),SR.ObjectDisposed_StreamClosed); |
| 1708 | +} |
| 1709 | +if(!parentCanRead) |
| 1710 | +{ |
| 1711 | +thrownewNotSupportedException(SR.NotSupported_UnreadableStream); |
| 1712 | +} |
| 1713 | +if(!destinationCanWrite) |
| 1714 | +{ |
| 1715 | +thrownewNotSupportedException(SR.NotSupported_UnwritableStream); |
| 1716 | +} |
| 1717 | + |
| 1718 | +// Bail early for cancellation if cancellation has been requested |
| 1719 | +if(cancellationToken.IsCancellationRequested) |
| 1720 | +{ |
| 1721 | +returnTask.FromCanceled<int>(cancellationToken); |
| 1722 | +} |
| 1723 | + |
| 1724 | +// Fail if the file was closed |
| 1725 | +if(_handle.IsClosed) |
| 1726 | +{ |
| 1727 | +throwError.GetFileNotOpen(); |
| 1728 | +} |
| 1729 | + |
| 1730 | +// Do the async copy, with differing implementations based on whether the FileStream was opened as async or sync |
| 1731 | +Debug.Assert((_readPos==0&&_readLen==0&&_writePos>=0)||(_writePos==0&&_readPos<=_readLen),"We're either reading or writing, but not both."); |
| 1732 | +return_isAsync? |
| 1733 | +AsyncModeCopyToAsync(destination,bufferSize,cancellationToken): |
| 1734 | +base.CopyToAsync(destination,bufferSize,cancellationToken); |
| 1735 | +} |
| 1736 | + |
| 1737 | +privateasyncTaskAsyncModeCopyToAsync(Streamdestination,intbufferSize,CancellationTokencancellationToken) |
| 1738 | +{ |
| 1739 | +Debug.Assert(_isAsync,"This implementation is for async mode only"); |
| 1740 | +Debug.Assert(!_handle.IsClosed,"!_handle.IsClosed"); |
| 1741 | +Debug.Assert(_parent.CanRead,"_parent.CanRead"); |
| 1742 | + |
| 1743 | +// Make sure any pending writes have been flushed before we do a read. |
| 1744 | +if(_writePos>0) |
| 1745 | +{ |
| 1746 | +awaitFlushWriteAsync(cancellationToken).ConfigureAwait(false); |
| 1747 | +} |
| 1748 | + |
| 1749 | +// Typically CopyToAsync would be invoked as the only "read" on the stream, but it's possible some reading is |
| 1750 | +// done and then the CopyToAsync is issued. For that case, see if we have any data available in the buffer. |
| 1751 | +if(_buffer!=null) |
| 1752 | +{ |
| 1753 | +intbufferedBytes=_readLen-_readPos; |
| 1754 | +if(bufferedBytes>0) |
| 1755 | +{ |
| 1756 | +awaitdestination.WriteAsync(_buffer,_readPos,bufferedBytes,cancellationToken).ConfigureAwait(false); |
| 1757 | +_readPos=_readLen=0; |
| 1758 | +} |
| 1759 | +} |
| 1760 | + |
| 1761 | +// For efficiency, we avoid creating a new task and associated state for each asynchronous read. |
| 1762 | +// Instead, we create a single reusable awaitable object that will be triggered when an await completes |
| 1763 | +// and reset before going again. |
| 1764 | +varreadAwaitable=newAsyncCopyToAwaitable(this); |
| 1765 | + |
| 1766 | +// Make sure we are reading from the position that we think we are. |
| 1767 | +// Only set the position in the awaitable if we can seek (e.g. not for pipes). |
| 1768 | +boolcanSeek=_parent.CanSeek; |
| 1769 | +if(canSeek) |
| 1770 | +{ |
| 1771 | +if(_exposedHandle) |
| 1772 | +{ |
| 1773 | +VerifyOSHandlePosition(); |
| 1774 | +} |
| 1775 | +readAwaitable._position=_pos; |
| 1776 | +} |
| 1777 | + |
| 1778 | +// Create the buffer to use for the copy operation, as the base CopyToAsync does. We don't try to use |
| 1779 | +// _buffer here, even if it's not null, as concurrent operations are allowed, and another operation may |
| 1780 | +// actually be using the buffer already. Plus, it'll be rare for _buffer to be non-null, as typically |
| 1781 | +// CopyToAsync is used as the only operation performed on the stream, and the buffer is lazily initialized. |
| 1782 | +// Further, typically the CopyToAsync buffer size will be larger than that used by the FileStream, such that |
| 1783 | +// we'd likely be unable to use it anyway. A better option than using _buffer would be a future pooling solution. |
| 1784 | +byte[]copyBuffer=newbyte[bufferSize]; |
| 1785 | + |
| 1786 | +// Allocate an Overlapped we can use repeatedly for all operations |
| 1787 | +varawaitableOverlapped=newPreAllocatedOverlapped(AsyncCopyToAwaitable.s_callback,readAwaitable,copyBuffer); |
| 1788 | +varcancellationReg=default(CancellationTokenRegistration); |
| 1789 | +try |
| 1790 | +{ |
| 1791 | +// Register for cancellation. We do this once for the whole copy operation, and just try to cancel |
| 1792 | +// whatever read operation may currently be in progress, if there is one. It's possible the cancellation |
| 1793 | +// request could come in between operations, in which case we flag that with explicit calls to ThrowIfCancellationRequested |
| 1794 | +// in the read/write copy loop. |
| 1795 | +if(cancellationToken.CanBeCanceled) |
| 1796 | +{ |
| 1797 | +cancellationReg=cancellationToken.Register(s=> |
| 1798 | +{ |
| 1799 | +varinnerAwaitable=(AsyncCopyToAwaitable)s; |
| 1800 | + unsafe |
| 1801 | +{ |
| 1802 | +lock(innerAwaitable.CancellationLock)// synchronize with cleanup of the overlapped |
| 1803 | +{ |
| 1804 | +if(innerAwaitable._nativeOverlapped!=null) |
| 1805 | +{ |
| 1806 | +// Try to cancel the I/O. We ignore the return value, as cancellation is opportunistic and we |
| 1807 | +// don't want to fail the operation because we couldn't cancel it. |
| 1808 | +Interop.mincore.CancelIoEx(innerAwaitable._fileStream._handle,innerAwaitable._nativeOverlapped); |
| 1809 | +} |
| 1810 | +} |
| 1811 | +} |
| 1812 | +},readAwaitable); |
| 1813 | +} |
| 1814 | + |
| 1815 | +// Repeatedly read from this FileStream and write the results to the destination stream. |
| 1816 | +while(true) |
| 1817 | +{ |
| 1818 | +cancellationToken.ThrowIfCancellationRequested(); |
| 1819 | +readAwaitable.ResetForNextOperation(); |
| 1820 | + |
| 1821 | +try |
| 1822 | +{ |
| 1823 | +boolsynchronousSuccess; |
| 1824 | +interrorCode; |
| 1825 | + unsafe |
| 1826 | +{ |
| 1827 | +// Allocate a native overlapped for our reusable overlapped, and set position to read based on the next |
| 1828 | +// desired address stored in the awaitable. (This position may be 0, if either we're at the beginning or |
| 1829 | +// if the stream isn't seekable.) |
| 1830 | +readAwaitable._nativeOverlapped=_handle.ThreadPoolBinding.AllocateNativeOverlapped(awaitableOverlapped); |
| 1831 | +if(canSeek) |
| 1832 | +{ |
| 1833 | +readAwaitable._nativeOverlapped->OffsetLow=unchecked((int)readAwaitable._position); |
| 1834 | +readAwaitable._nativeOverlapped->OffsetHigh=(int)(readAwaitable._position>>32); |
| 1835 | +} |
| 1836 | + |
| 1837 | +// Kick off the read. |
| 1838 | +synchronousSuccess=ReadFileNative(_handle,copyBuffer,0,copyBuffer.Length,readAwaitable._nativeOverlapped,outerrorCode)>=0; |
| 1839 | +} |
| 1840 | + |
| 1841 | +// If the operation did not synchronously succeed, it either failed or initiated the asynchronous operation. |
| 1842 | +if(!synchronousSuccess) |
| 1843 | +{ |
| 1844 | +switch(errorCode) |
| 1845 | +{ |
| 1846 | +caseERROR_IO_PENDING: |
| 1847 | +// Async operation in progress. |
| 1848 | +break; |
| 1849 | +caseERROR_BROKEN_PIPE: |
| 1850 | +caseERROR_HANDLE_EOF: |
| 1851 | +// We're at or past the end of the file, and the overlapped callback |
| 1852 | +// won't be raised in these cases. Mark it as completed so that the await |
| 1853 | +// below will see it as such. |
| 1854 | +readAwaitable.MarkCompleted(); |
| 1855 | +break; |
| 1856 | +default: |
| 1857 | +// Everything else is an error (and there won't be a callback). |
| 1858 | +throwWin32Marshal.GetExceptionForWin32Error(errorCode); |
| 1859 | +} |
| 1860 | +} |
| 1861 | + |
| 1862 | +// Wait for the async operation (which may or may not have already completed), then throw if it failed. |
| 1863 | +awaitreadAwaitable; |
| 1864 | +switch(readAwaitable._errorCode) |
| 1865 | +{ |
| 1866 | +case0:// success |
| 1867 | +Debug.Assert(readAwaitable._numBytes>=0,$"Expected non-negative numBytes, got{readAwaitable._numBytes}"); |
| 1868 | +break; |
| 1869 | +caseERROR_BROKEN_PIPE:// logically success with 0 bytes read (write end of pipe closed) |
| 1870 | +caseERROR_HANDLE_EOF:// logically success with 0 bytes read (read at end of file) |
| 1871 | +Debug.Assert(readAwaitable._numBytes==0,$"Expected 0 bytes read, got{readAwaitable._numBytes}"); |
| 1872 | +break; |
| 1873 | +caseInterop.mincore.Errors.ERROR_OPERATION_ABORTED:// canceled |
| 1874 | +thrownewOperationCanceledException(cancellationToken.IsCancellationRequested?cancellationToken:newCancellationToken(true)); |
| 1875 | +default:// error |
| 1876 | +throwWin32Marshal.GetExceptionForWin32Error((int)readAwaitable._errorCode); |
| 1877 | +} |
| 1878 | + |
| 1879 | +// Successful operation. If we got zero bytes, we're done: exit the read/write loop. |
| 1880 | +// Otherwise, update the read position for next time accordingly. |
| 1881 | +if(readAwaitable._numBytes==0) |
| 1882 | +{ |
| 1883 | +break; |
| 1884 | +} |
| 1885 | +elseif(canSeek) |
| 1886 | +{ |
| 1887 | +readAwaitable._position+=(int)readAwaitable._numBytes; |
| 1888 | +} |
| 1889 | +} |
| 1890 | +finally |
| 1891 | +{ |
| 1892 | +// Free the resources for this read operation |
| 1893 | + unsafe |
| 1894 | +{ |
| 1895 | +NativeOverlapped*overlapped; |
| 1896 | +lock(readAwaitable.CancellationLock)// just an Exchange, but we need this to be synchronized with cancellation, so using the same lock |
| 1897 | +{ |
| 1898 | +overlapped=readAwaitable._nativeOverlapped; |
| 1899 | +readAwaitable._nativeOverlapped=null; |
| 1900 | +} |
| 1901 | +if(overlapped!=null) |
| 1902 | +{ |
| 1903 | +_handle.ThreadPoolBinding.FreeNativeOverlapped(overlapped); |
| 1904 | +} |
| 1905 | +} |
| 1906 | +} |
| 1907 | + |
| 1908 | +// Write out the read data. |
| 1909 | +awaitdestination.WriteAsync(copyBuffer,0,(int)readAwaitable._numBytes,cancellationToken).ConfigureAwait(false); |
| 1910 | +} |
| 1911 | +} |
| 1912 | +finally |
| 1913 | +{ |
| 1914 | +// Cleanup from the whole copy operation |
| 1915 | +cancellationReg.Dispose(); |
| 1916 | +awaitableOverlapped.Dispose(); |
| 1917 | + |
| 1918 | +// Make sure the stream's current position reflects where we ended up |
| 1919 | +if(!_handle.IsClosed&&_parent.CanSeek) |
| 1920 | +{ |
| 1921 | +SeekCore(0,SeekOrigin.End); |
| 1922 | +} |
| 1923 | +} |
| 1924 | +} |
| 1925 | + |
| 1926 | +/// <summary>Used by CopyToAsync to enable awaiting the result of an overlapped I/O operation with minimal overhead.</summary> |
| 1927 | +privatesealedunsafeclassAsyncCopyToAwaitable:ICriticalNotifyCompletion |
| 1928 | +{ |
| 1929 | +/// <summary>Sentinel object used to indicate that the I/O operation has completed before being awaited.</summary> |
| 1930 | +privatereadonlystaticActions_sentinel=()=>{}; |
| 1931 | +/// <summary>Cached delegate to IOCallback.</summary> |
| 1932 | +internalstaticreadonlyIOCompletionCallbacks_callback=IOCallback; |
| 1933 | + |
| 1934 | +/// <summary>The FileStream that owns this instance.</summary> |
| 1935 | +internalreadonlyWin32FileStream_fileStream; |
| 1936 | + |
| 1937 | +/// <summary>Tracked position representing the next location from which to read.</summary> |
| 1938 | +internallong_position; |
| 1939 | +/// <summary>The current native overlapped pointer. This changes for each operation.</summary> |
| 1940 | +internalNativeOverlapped*_nativeOverlapped; |
| 1941 | +/// <summary> |
| 1942 | +/// null if the operation is still in progress, |
| 1943 | +/// s_sentinel if the I/O operation completed before the await, |
| 1944 | +/// s_callback if it completed after the await yielded. |
| 1945 | +/// </summary> |
| 1946 | +internalAction_continuation; |
| 1947 | +/// <summary>Last error code from completed operation.</summary> |
| 1948 | +internaluint_errorCode; |
| 1949 | +/// <summary>Last number of read bytes from completed operation.</summary> |
| 1950 | +internaluint_numBytes; |
| 1951 | + |
| 1952 | +/// <summary>Lock object used to protect cancellation-related access to _nativeOverlapped.</summary> |
| 1953 | +internalobjectCancellationLock=>this; |
| 1954 | + |
| 1955 | +/// <summary>Initialize the awaitable.</summary> |
| 1956 | +internalunsafeAsyncCopyToAwaitable(Win32FileStreamfileStream) |
| 1957 | +{ |
| 1958 | +_fileStream=fileStream; |
| 1959 | +} |
| 1960 | + |
| 1961 | +/// <summary>Reset state to prepare for the next read operation.</summary> |
| 1962 | +internalvoidResetForNextOperation() |
| 1963 | +{ |
| 1964 | +Debug.Assert(_position>=0,$"Expected non-negative position, got{_position}"); |
| 1965 | +_continuation=null; |
| 1966 | +_errorCode=0; |
| 1967 | +_numBytes=0; |
| 1968 | +} |
| 1969 | + |
| 1970 | +/// <summary>Overlapped callback: store the results, then invoke the continuation delegate.</summary> |
| 1971 | +internalunsafestaticvoidIOCallback(uinterrorCode,uintnumBytes,NativeOverlapped*pOVERLAP) |
| 1972 | +{ |
| 1973 | +varawaitable=(AsyncCopyToAwaitable)ThreadPoolBoundHandle.GetNativeOverlappedState(pOVERLAP); |
| 1974 | + |
| 1975 | +Debug.Assert(awaitable._continuation!=s_sentinel,"Sentinel must not have already been set as the continuation"); |
| 1976 | +awaitable._errorCode=errorCode; |
| 1977 | +awaitable._numBytes=numBytes; |
| 1978 | + |
| 1979 | +(awaitable._continuation??Interlocked.CompareExchange(refawaitable._continuation,s_sentinel,null))?.Invoke(); |
| 1980 | +} |
| 1981 | + |
| 1982 | +/// <summary> |
| 1983 | +/// Called when it's known that the I/O callback for an operation will not be invoked but we'll |
| 1984 | +/// still be awaiting the awaitable. |
| 1985 | +/// </summary> |
| 1986 | +internalvoidMarkCompleted() |
| 1987 | +{ |
| 1988 | +Debug.Assert(_continuation==null,"Expected null continuation"); |
| 1989 | +_continuation=s_sentinel; |
| 1990 | +} |
| 1991 | + |
| 1992 | +publicAsyncCopyToAwaitableGetAwaiter()=>this; |
| 1993 | +publicboolIsCompleted=>_continuation==s_sentinel; |
| 1994 | +publicvoidGetResult(){} |
| 1995 | +publicvoidOnCompleted(Actioncontinuation)=>UnsafeOnCompleted(continuation); |
| 1996 | +publicvoidUnsafeOnCompleted(Actioncontinuation) |
| 1997 | +{ |
| 1998 | +if(_continuation==s_sentinel|| |
| 1999 | +Interlocked.CompareExchange(ref_continuation,continuation,null)!=null) |
| 2000 | +{ |
| 2001 | +Debug.Assert(_continuation==s_sentinel,$"Expected continuation set to s_sentinel, got ${_continuation}"); |
| 2002 | +Task.Run(continuation); |
| 2003 | +} |
| 2004 | +} |
| 2005 | +} |
| 2006 | + |
1687 | 2007 | [System.Security.SecuritySafeCritical] |
1688 | 2008 | publicoverrideTask<int>ReadAsync(Byte[]buffer,intoffset,intcount,CancellationTokencancellationToken) |
1689 | 2009 | { |
|