Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings
This repository was archived by the owner on Jan 23, 2023. It is now read-only.
/corefxPublic archive

Commitf3a2e16

Browse files
authored
Merge pull request#11569 from stephentoub/filestream_copytoasync
Optimize overlapped I/O FileStream.CopyToAsync implementation on Windows
2 parents0d965c4 +5a8285f commitf3a2e16

File tree

2 files changed

+323
-0
lines changed

2 files changed

+323
-0
lines changed

‎src/System.IO.FileSystem/src/Resources/Strings.resx‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,4 +291,7 @@
291291
<dataname="UnknownError_Num"xml:space="preserve">
292292
<value>Unknown error '{0}'.</value>
293293
</data>
294+
<dataname="ObjectDisposed_StreamClosed"xml:space="preserve">
295+
<value>Cannot access a closed Stream.</value>
296+
</data>
294297
</root>

‎src/System.IO.FileSystem/src/System/IO/Win32FileStream.cs‎

Lines changed: 320 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
usingSystem.Threading;
99
usingSystem.Threading.Tasks;
1010
usingMicrosoft.Win32.SafeHandles;
11+
usingSystem.Runtime.CompilerServices;
1112

1213
/*
1314
* Win32FileStream supports different modes of accessing the disk - async mode
@@ -1684,6 +1685,325 @@ private int GetLastWin32ErrorAndDisposeHandleIfInvalid(bool throwIfInvalidHandle
16841685
returnerrorCode;
16851686
}
16861687

1688+
publicoverrideTaskCopyToAsync(Streamdestination,intbufferSize,CancellationTokencancellationToken)
1689+
{
1690+
// Validate arguments as would the base implementation
1691+
if(destination==null)
1692+
{
1693+
thrownewArgumentNullException(nameof(destination));
1694+
}
1695+
if(bufferSize<=0)
1696+
{
1697+
thrownewArgumentOutOfRangeException(nameof(bufferSize),SR.ArgumentOutOfRange_NeedPosNum);
1698+
}
1699+
boolparentCanRead=_parent.CanRead;
1700+
if(!parentCanRead&&!_parent.CanWrite)
1701+
{
1702+
thrownewObjectDisposedException(null,SR.ObjectDisposed_StreamClosed);
1703+
}
1704+
booldestinationCanWrite=destination.CanWrite;
1705+
if(!destination.CanRead&&!destinationCanWrite)
1706+
{
1707+
thrownewObjectDisposedException(nameof(destination),SR.ObjectDisposed_StreamClosed);
1708+
}
1709+
if(!parentCanRead)
1710+
{
1711+
thrownewNotSupportedException(SR.NotSupported_UnreadableStream);
1712+
}
1713+
if(!destinationCanWrite)
1714+
{
1715+
thrownewNotSupportedException(SR.NotSupported_UnwritableStream);
1716+
}
1717+
1718+
// Bail early for cancellation if cancellation has been requested
1719+
if(cancellationToken.IsCancellationRequested)
1720+
{
1721+
returnTask.FromCanceled<int>(cancellationToken);
1722+
}
1723+
1724+
// Fail if the file was closed
1725+
if(_handle.IsClosed)
1726+
{
1727+
throwError.GetFileNotOpen();
1728+
}
1729+
1730+
// Do the async copy, with differing implementations based on whether the FileStream was opened as async or sync
1731+
Debug.Assert((_readPos==0&&_readLen==0&&_writePos>=0)||(_writePos==0&&_readPos<=_readLen),"We're either reading or writing, but not both.");
1732+
return_isAsync?
1733+
AsyncModeCopyToAsync(destination,bufferSize,cancellationToken):
1734+
base.CopyToAsync(destination,bufferSize,cancellationToken);
1735+
}
1736+
1737+
privateasyncTaskAsyncModeCopyToAsync(Streamdestination,intbufferSize,CancellationTokencancellationToken)
1738+
{
1739+
Debug.Assert(_isAsync,"This implementation is for async mode only");
1740+
Debug.Assert(!_handle.IsClosed,"!_handle.IsClosed");
1741+
Debug.Assert(_parent.CanRead,"_parent.CanRead");
1742+
1743+
// Make sure any pending writes have been flushed before we do a read.
1744+
if(_writePos>0)
1745+
{
1746+
awaitFlushWriteAsync(cancellationToken).ConfigureAwait(false);
1747+
}
1748+
1749+
// Typically CopyToAsync would be invoked as the only "read" on the stream, but it's possible some reading is
1750+
// done and then the CopyToAsync is issued. For that case, see if we have any data available in the buffer.
1751+
if(_buffer!=null)
1752+
{
1753+
intbufferedBytes=_readLen-_readPos;
1754+
if(bufferedBytes>0)
1755+
{
1756+
awaitdestination.WriteAsync(_buffer,_readPos,bufferedBytes,cancellationToken).ConfigureAwait(false);
1757+
_readPos=_readLen=0;
1758+
}
1759+
}
1760+
1761+
// For efficiency, we avoid creating a new task and associated state for each asynchronous read.
1762+
// Instead, we create a single reusable awaitable object that will be triggered when an await completes
1763+
// and reset before going again.
1764+
varreadAwaitable=newAsyncCopyToAwaitable(this);
1765+
1766+
// Make sure we are reading from the position that we think we are.
1767+
// Only set the position in the awaitable if we can seek (e.g. not for pipes).
1768+
boolcanSeek=_parent.CanSeek;
1769+
if(canSeek)
1770+
{
1771+
if(_exposedHandle)
1772+
{
1773+
VerifyOSHandlePosition();
1774+
}
1775+
readAwaitable._position=_pos;
1776+
}
1777+
1778+
// Create the buffer to use for the copy operation, as the base CopyToAsync does. We don't try to use
1779+
// _buffer here, even if it's not null, as concurrent operations are allowed, and another operation may
1780+
// actually be using the buffer already. Plus, it'll be rare for _buffer to be non-null, as typically
1781+
// CopyToAsync is used as the only operation performed on the stream, and the buffer is lazily initialized.
1782+
// Further, typically the CopyToAsync buffer size will be larger than that used by the FileStream, such that
1783+
// we'd likely be unable to use it anyway. A better option than using _buffer would be a future pooling solution.
1784+
byte[]copyBuffer=newbyte[bufferSize];
1785+
1786+
// Allocate an Overlapped we can use repeatedly for all operations
1787+
varawaitableOverlapped=newPreAllocatedOverlapped(AsyncCopyToAwaitable.s_callback,readAwaitable,copyBuffer);
1788+
varcancellationReg=default(CancellationTokenRegistration);
1789+
try
1790+
{
1791+
// Register for cancellation. We do this once for the whole copy operation, and just try to cancel
1792+
// whatever read operation may currently be in progress, if there is one. It's possible the cancellation
1793+
// request could come in between operations, in which case we flag that with explicit calls to ThrowIfCancellationRequested
1794+
// in the read/write copy loop.
1795+
if(cancellationToken.CanBeCanceled)
1796+
{
1797+
cancellationReg=cancellationToken.Register(s=>
1798+
{
1799+
varinnerAwaitable=(AsyncCopyToAwaitable)s;
1800+
unsafe
1801+
{
1802+
lock(innerAwaitable.CancellationLock)// synchronize with cleanup of the overlapped
1803+
{
1804+
if(innerAwaitable._nativeOverlapped!=null)
1805+
{
1806+
// Try to cancel the I/O. We ignore the return value, as cancellation is opportunistic and we
1807+
// don't want to fail the operation because we couldn't cancel it.
1808+
Interop.mincore.CancelIoEx(innerAwaitable._fileStream._handle,innerAwaitable._nativeOverlapped);
1809+
}
1810+
}
1811+
}
1812+
},readAwaitable);
1813+
}
1814+
1815+
// Repeatedly read from this FileStream and write the results to the destination stream.
1816+
while(true)
1817+
{
1818+
cancellationToken.ThrowIfCancellationRequested();
1819+
readAwaitable.ResetForNextOperation();
1820+
1821+
try
1822+
{
1823+
boolsynchronousSuccess;
1824+
interrorCode;
1825+
unsafe
1826+
{
1827+
// Allocate a native overlapped for our reusable overlapped, and set position to read based on the next
1828+
// desired address stored in the awaitable. (This position may be 0, if either we're at the beginning or
1829+
// if the stream isn't seekable.)
1830+
readAwaitable._nativeOverlapped=_handle.ThreadPoolBinding.AllocateNativeOverlapped(awaitableOverlapped);
1831+
if(canSeek)
1832+
{
1833+
readAwaitable._nativeOverlapped->OffsetLow=unchecked((int)readAwaitable._position);
1834+
readAwaitable._nativeOverlapped->OffsetHigh=(int)(readAwaitable._position>>32);
1835+
}
1836+
1837+
// Kick off the read.
1838+
synchronousSuccess=ReadFileNative(_handle,copyBuffer,0,copyBuffer.Length,readAwaitable._nativeOverlapped,outerrorCode)>=0;
1839+
}
1840+
1841+
// If the operation did not synchronously succeed, it either failed or initiated the asynchronous operation.
1842+
if(!synchronousSuccess)
1843+
{
1844+
switch(errorCode)
1845+
{
1846+
caseERROR_IO_PENDING:
1847+
// Async operation in progress.
1848+
break;
1849+
caseERROR_BROKEN_PIPE:
1850+
caseERROR_HANDLE_EOF:
1851+
// We're at or past the end of the file, and the overlapped callback
1852+
// won't be raised in these cases. Mark it as completed so that the await
1853+
// below will see it as such.
1854+
readAwaitable.MarkCompleted();
1855+
break;
1856+
default:
1857+
// Everything else is an error (and there won't be a callback).
1858+
throwWin32Marshal.GetExceptionForWin32Error(errorCode);
1859+
}
1860+
}
1861+
1862+
// Wait for the async operation (which may or may not have already completed), then throw if it failed.
1863+
awaitreadAwaitable;
1864+
switch(readAwaitable._errorCode)
1865+
{
1866+
case0:// success
1867+
Debug.Assert(readAwaitable._numBytes>=0,$"Expected non-negative numBytes, got{readAwaitable._numBytes}");
1868+
break;
1869+
caseERROR_BROKEN_PIPE:// logically success with 0 bytes read (write end of pipe closed)
1870+
caseERROR_HANDLE_EOF:// logically success with 0 bytes read (read at end of file)
1871+
Debug.Assert(readAwaitable._numBytes==0,$"Expected 0 bytes read, got{readAwaitable._numBytes}");
1872+
break;
1873+
caseInterop.mincore.Errors.ERROR_OPERATION_ABORTED:// canceled
1874+
thrownewOperationCanceledException(cancellationToken.IsCancellationRequested?cancellationToken:newCancellationToken(true));
1875+
default:// error
1876+
throwWin32Marshal.GetExceptionForWin32Error((int)readAwaitable._errorCode);
1877+
}
1878+
1879+
// Successful operation. If we got zero bytes, we're done: exit the read/write loop.
1880+
// Otherwise, update the read position for next time accordingly.
1881+
if(readAwaitable._numBytes==0)
1882+
{
1883+
break;
1884+
}
1885+
elseif(canSeek)
1886+
{
1887+
readAwaitable._position+=(int)readAwaitable._numBytes;
1888+
}
1889+
}
1890+
finally
1891+
{
1892+
// Free the resources for this read operation
1893+
unsafe
1894+
{
1895+
NativeOverlapped*overlapped;
1896+
lock(readAwaitable.CancellationLock)// just an Exchange, but we need this to be synchronized with cancellation, so using the same lock
1897+
{
1898+
overlapped=readAwaitable._nativeOverlapped;
1899+
readAwaitable._nativeOverlapped=null;
1900+
}
1901+
if(overlapped!=null)
1902+
{
1903+
_handle.ThreadPoolBinding.FreeNativeOverlapped(overlapped);
1904+
}
1905+
}
1906+
}
1907+
1908+
// Write out the read data.
1909+
awaitdestination.WriteAsync(copyBuffer,0,(int)readAwaitable._numBytes,cancellationToken).ConfigureAwait(false);
1910+
}
1911+
}
1912+
finally
1913+
{
1914+
// Cleanup from the whole copy operation
1915+
cancellationReg.Dispose();
1916+
awaitableOverlapped.Dispose();
1917+
1918+
// Make sure the stream's current position reflects where we ended up
1919+
if(!_handle.IsClosed&&_parent.CanSeek)
1920+
{
1921+
SeekCore(0,SeekOrigin.End);
1922+
}
1923+
}
1924+
}
1925+
1926+
/// <summary>Used by CopyToAsync to enable awaiting the result of an overlapped I/O operation with minimal overhead.</summary>
1927+
privatesealedunsafeclassAsyncCopyToAwaitable:ICriticalNotifyCompletion
1928+
{
1929+
/// <summary>Sentinel object used to indicate that the I/O operation has completed before being awaited.</summary>
1930+
privatereadonlystaticActions_sentinel=()=>{};
1931+
/// <summary>Cached delegate to IOCallback.</summary>
1932+
internalstaticreadonlyIOCompletionCallbacks_callback=IOCallback;
1933+
1934+
/// <summary>The FileStream that owns this instance.</summary>
1935+
internalreadonlyWin32FileStream_fileStream;
1936+
1937+
/// <summary>Tracked position representing the next location from which to read.</summary>
1938+
internallong_position;
1939+
/// <summary>The current native overlapped pointer. This changes for each operation.</summary>
1940+
internalNativeOverlapped*_nativeOverlapped;
1941+
/// <summary>
1942+
/// null if the operation is still in progress,
1943+
/// s_sentinel if the I/O operation completed before the await,
1944+
/// s_callback if it completed after the await yielded.
1945+
/// </summary>
1946+
internalAction_continuation;
1947+
/// <summary>Last error code from completed operation.</summary>
1948+
internaluint_errorCode;
1949+
/// <summary>Last number of read bytes from completed operation.</summary>
1950+
internaluint_numBytes;
1951+
1952+
/// <summary>Lock object used to protect cancellation-related access to _nativeOverlapped.</summary>
1953+
internalobjectCancellationLock=>this;
1954+
1955+
/// <summary>Initialize the awaitable.</summary>
1956+
internalunsafeAsyncCopyToAwaitable(Win32FileStreamfileStream)
1957+
{
1958+
_fileStream=fileStream;
1959+
}
1960+
1961+
/// <summary>Reset state to prepare for the next read operation.</summary>
1962+
internalvoidResetForNextOperation()
1963+
{
1964+
Debug.Assert(_position>=0,$"Expected non-negative position, got{_position}");
1965+
_continuation=null;
1966+
_errorCode=0;
1967+
_numBytes=0;
1968+
}
1969+
1970+
/// <summary>Overlapped callback: store the results, then invoke the continuation delegate.</summary>
1971+
internalunsafestaticvoidIOCallback(uinterrorCode,uintnumBytes,NativeOverlapped*pOVERLAP)
1972+
{
1973+
varawaitable=(AsyncCopyToAwaitable)ThreadPoolBoundHandle.GetNativeOverlappedState(pOVERLAP);
1974+
1975+
Debug.Assert(awaitable._continuation!=s_sentinel,"Sentinel must not have already been set as the continuation");
1976+
awaitable._errorCode=errorCode;
1977+
awaitable._numBytes=numBytes;
1978+
1979+
(awaitable._continuation??Interlocked.CompareExchange(refawaitable._continuation,s_sentinel,null))?.Invoke();
1980+
}
1981+
1982+
/// <summary>
1983+
/// Called when it's known that the I/O callback for an operation will not be invoked but we'll
1984+
/// still be awaiting the awaitable.
1985+
/// </summary>
1986+
internalvoidMarkCompleted()
1987+
{
1988+
Debug.Assert(_continuation==null,"Expected null continuation");
1989+
_continuation=s_sentinel;
1990+
}
1991+
1992+
publicAsyncCopyToAwaitableGetAwaiter()=>this;
1993+
publicboolIsCompleted=>_continuation==s_sentinel;
1994+
publicvoidGetResult(){}
1995+
publicvoidOnCompleted(Actioncontinuation)=>UnsafeOnCompleted(continuation);
1996+
publicvoidUnsafeOnCompleted(Actioncontinuation)
1997+
{
1998+
if(_continuation==s_sentinel||
1999+
Interlocked.CompareExchange(ref_continuation,continuation,null)!=null)
2000+
{
2001+
Debug.Assert(_continuation==s_sentinel,$"Expected continuation set to s_sentinel, got ${_continuation}");
2002+
Task.Run(continuation);
2003+
}
2004+
}
2005+
}
2006+
16872007
[System.Security.SecuritySafeCritical]
16882008
publicoverrideTask<int>ReadAsync(Byte[]buffer,intoffset,intcount,CancellationTokencancellationToken)
16892009
{

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp