Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Use UnixFileStream's ReadAsync implementation on Windows when !IsAsync#56682

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
adamsitnik merged 1 commit intodotnet:mainfromstephentoub:consolidatefslogic
Aug 2, 2021
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Use UnixFileStream's ReadAsync implementation on Windows as well
UnixFileStream's ReadAsync implementation uses a reusable IValueTaskSource implementation to avoid allocating a new work item on every read.  We can push that implementation down to OSFileStreamStrategy, and then use it for the Windows implementation of ReadAsync as well when IsAsync==false, rather than delegating to the base Stream implementation.This PR almost entirely just moves code around.  The only change to logic is in RandomAccess.Windows.cs, to only set an offset into the NativeOverlapped if the SafeFileHandle is seekable; otherwise, it fails when used with pipes.
  • Loading branch information
@stephentoub
stephentoub committedAug 1, 2021
commit9fd4cdaf74b5a47fb61dff6404f18ce8aa8c5d4a
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -695,8 +695,11 @@ private static NativeOverlapped GetNativeOverlappedForSyncHandle(SafeFileHandle
Debug.Assert(!handle.IsAsync);

NativeOverlapped result = default;
result.OffsetLow = unchecked((int)fileOffset);
result.OffsetHigh = (int)(fileOffset >> 32);
if (handle.CanSeek)
{
result.OffsetLow = unchecked((int)fileOffset);
result.OffsetHigh = (int)(fileOffset >> 32);
}
return result;
}

Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -21,13 +21,7 @@ internal AsyncWindowsFileStreamStrategy(string path, FileMode mode, FileAccess a

internal override bool IsAsync => true;

public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> ReadAsyncInternal(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();

public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
=> ReadAsyncInternal(destination, cancellationToken);

private ValueTask<int> ReadAsyncInternal(Memory<byte> destination, CancellationToken cancellationToken)
{
if (!CanSeek)
{
Expand All@@ -52,17 +46,11 @@ private ValueTask<int> ReadAsyncInternal(Memory<byte> destination, CancellationT
: (errorCode == 0) ? ValueTask.FromResult(0) : ValueTask.FromException<int>(HandleIOError(readOffset, errorCode));
}

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> WriteAsyncInternal(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken).AsTask();

public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
=> WriteAsyncInternal(buffer, cancellationToken);

private ValueTask WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
{
long writeOffset = CanSeek ? Interlocked.Add(ref _filePosition,source.Length) -source.Length : -1;
long writeOffset = CanSeek ? Interlocked.Add(ref _filePosition,buffer.Length) -buffer.Length : -1;

(SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncWriteFile(_fileHandle,source, writeOffset, cancellationToken);
(SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncWriteFile(_fileHandle,buffer, writeOffset, cancellationToken);
return vts != null
? new ValueTask(vts, vts.Version)
: (errorCode == 0) ? ValueTask.CompletedTask : ValueTask.FromException(HandleIOError(writeOffset, errorCode));
Expand DownExpand Up@@ -120,15 +108,5 @@ await FileStreamHelpers
}
}
}

public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state);

public override int EndRead(IAsyncResult asyncResult) => TaskToApm.End<int>(asyncResult);

public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state);

public override void EndWrite(IAsyncResult asyncResult) => TaskToApm.End(asyncResult);
}
}
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -4,6 +4,7 @@
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
using Microsoft.Win32.SafeHandles;

namespace System.IO.Strategies
Expand All@@ -13,6 +14,7 @@ internal abstract class OSFileStreamStrategy : FileStreamStrategy
{
protected readonly SafeFileHandle _fileHandle; // only ever null if ctor throws
private readonly FileAccess _access; // What file was opened for.
private ReadAsyncTaskSource? _readAsyncTaskSource; // Cached IValueTaskSource used for async-over-sync reads

protected long _filePosition;
protected long _length = -1; // negative means that hasn't been fetched.
Expand DownExpand Up@@ -69,6 +71,8 @@ internal OSFileStreamStrategy(string path, FileMode mode, FileAccess access, Fil
}
}

internal override bool IsAsync => _fileHandle.IsAsync;

public sealed override bool CanSeek => _fileHandle.CanSeek;

public sealed override bool CanRead => !_fileHandle.IsClosed && (_access & FileAccess.Read) != 0;
Expand DownExpand Up@@ -278,5 +282,145 @@ public sealed override void Write(ReadOnlySpan<byte> buffer)
RandomAccess.WriteAtOffset(_fileHandle, buffer, _filePosition);
_filePosition += buffer.Length;
}

public sealed override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state);

public sealed override void EndWrite(IAsyncResult asyncResult) =>
TaskToApm.End(asyncResult);

public sealed override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
WriteAsync(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken).AsTask();

public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
{
long writeOffset = CanSeek ? Interlocked.Add(ref _filePosition, source.Length) - source.Length : -1;
return RandomAccess.WriteAtOffsetAsync(_fileHandle, source, writeOffset, cancellationToken);
}

public sealed override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state);

public sealed override int EndRead(IAsyncResult asyncResult) =>
TaskToApm.End<int>(asyncResult);

public sealed override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
ReadAsync(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();

public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken)
{
if (!CanSeek)
{
return RandomAccess.ReadAtOffsetAsync(_fileHandle, destination, fileOffset: -1, cancellationToken);
}

// This implementation updates the file position before the operation starts and updates it after incomplete read.
// Also, unlike the Net5CompatFileStreamStrategy implementation, this implementation doesn't serialize operations.
long readOffset = Interlocked.Add(ref _filePosition, destination.Length) - destination.Length;
ReadAsyncTaskSource rats = Interlocked.Exchange(ref _readAsyncTaskSource, null) ?? new ReadAsyncTaskSource(this);
return rats.QueueRead(destination, readOffset, cancellationToken);
}

/// <summary>Provides a reusable ValueTask-backing object for implementing ReadAsync.</summary>
private sealed class ReadAsyncTaskSource : IValueTaskSource<int>, IThreadPoolWorkItem
{
private readonly OSFileStreamStrategy _stream;
private ManualResetValueTaskSourceCore<int> _source;

private Memory<byte> _destination;
private long _readOffset;
private ExecutionContext? _context;
private CancellationToken _cancellationToken;

public ReadAsyncTaskSource(OSFileStreamStrategy stream) => _stream = stream;

public ValueTask<int> QueueRead(Memory<byte> destination, long readOffset, CancellationToken cancellationToken)
{
_destination = destination;
_readOffset = readOffset;
_cancellationToken = cancellationToken;
_context = ExecutionContext.Capture();

ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: true);
return new ValueTask<int>(this, _source.Version);
}

void IThreadPoolWorkItem.Execute()
{
if (_context is null || _context.IsDefault)
{
Read();
}
else
{
ExecutionContext.RunForThreadPoolUnsafe(_context, static x => x.Read(), this);
}
}

private void Read()
{
Exception? error = null;
int result = 0;

try
{
if (_cancellationToken.IsCancellationRequested)
{
error = new OperationCanceledException(_cancellationToken);
}
else
{
result = RandomAccess.ReadAtOffset(_stream._fileHandle, _destination.Span, _readOffset);
}
}
catch (Exception e)
{
error = e;
}
finally
{
// if the read was incomplete, we need to update the file position:
if (result != _destination.Length)
{
_stream.OnIncompleteRead(_destination.Length, result);
}

_destination = default;
_readOffset = -1;
_cancellationToken = default;
_context = null;
}

if (error is not null)
{
_source.SetException(error);
}
else
{
_source.SetResult(result);
}
}

int IValueTaskSource<int>.GetResult(short token)
{
try
{
return _source.GetResult(token);
}
finally
{
_source.Reset();
#pragma warning disable CS0197
Volatile.Write(ref _stream._readAsyncTaskSource, this);
#pragma warning restore CS0197
}
}

ValueTaskSourceStatus IValueTaskSource<int>.GetStatus(short token) =>
_source.GetStatus(token);

void IValueTaskSource<int>.OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) =>
_source.OnCompleted(continuation, state, token, flags);
}
}
}
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

usingSystem.Runtime.InteropServices;
usingSystem.Threading;
usingSystem.Threading.Tasks;
usingMicrosoft.Win32.SafeHandles;

namespaceSystem.IO.Strategies
Expand All@@ -20,45 +17,5 @@ internal SyncWindowsFileStreamStrategy(string path, FileMode mode, FileAccess ac
}

internaloverrideboolIsAsync=>false;

publicoverrideTask<int>ReadAsync(byte[]buffer,intoffset,intcount,CancellationTokencancellationToken)
{
// If we weren't opened for asynchronous I/O, we still call to the base implementation so that
// Read is invoked asynchronously. But we can do so using the base Stream's internal helper
// that bypasses delegating to BeginRead, since we already know this is FileStream rather
// than something derived from it and what our BeginRead implementation is going to do.
returnBeginReadInternal(buffer,offset,count,null,null,serializeAsynchronously:true,apm:false);
}

publicoverrideValueTask<int>ReadAsync(Memory<byte>buffer,CancellationTokencancellationToken=default)
{
// If we weren't opened for asynchronous I/O, we still call to the base implementation so that
// Read is invoked asynchronously. But if we have a byte[], we can do so using the base Stream's
// internal helper that bypasses delegating to BeginRead, since we already know this is FileStream
// rather than something derived from it and what our BeginRead implementation is going to do.
returnMemoryMarshal.TryGetArray(buffer,outArraySegment<byte>segment)?
newValueTask<int>(BeginReadInternal(segment.Array!,segment.Offset,segment.Count,null,null,serializeAsynchronously:true,apm:false)):
base.ReadAsync(buffer,cancellationToken);
}

publicoverrideTaskWriteAsync(byte[]buffer,intoffset,intcount,CancellationTokencancellationToken)
{
// If we weren't opened for asynchronous I/O, we still call to the base implementation so that
// Write is invoked asynchronously. But we can do so using the base Stream's internal helper
// that bypasses delegating to BeginWrite, since we already know this is FileStream rather
// than something derived from it and what our BeginWrite implementation is going to do.
returnBeginWriteInternal(buffer,offset,count,null,null,serializeAsynchronously:true,apm:false);
}

publicoverrideValueTaskWriteAsync(ReadOnlyMemory<byte>buffer,CancellationTokencancellationToken=default)
{
// If we weren't opened for asynchronous I/O, we still call to the base implementation so that
// Write is invoked asynchronously. But if we have a byte[], we can do so using the base Stream's
// internal helper that bypasses delegating to BeginWrite, since we already know this is FileStream
// rather than something derived from it and what our BeginWrite implementation is going to do.
returnMemoryMarshal.TryGetArray(buffer,outArraySegment<byte>segment)?
newValueTask(BeginWriteInternal(segment.Array!,segment.Offset,segment.Count,null,null,serializeAsynchronously:true,apm:false)):
base.WriteAsync(buffer,cancellationToken);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

after this code removal,SyncWindowsFileStreamStrategy andUnixFileStreamStrategy become de facto classes that do not change or extend the behaviour ofOSFileStreamStrategy.

Have you considered the removal ofSyncWindowsFileStreamStrategy andUnixFileStreamStrategy and usingOSFileStreamStrategy directly?

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

No strong opinion. If you'd like to consolidate, go for it. If not, I'm ok keeping the derived types.

}
}
Loading

[8]ページ先頭

©2009-2025 Movatter.jp