Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf1bafe6

Browse files
authored
Handle completed FlushResult on CopyToAsync (#51147)
* Handle completed FlushResult on CopyToAsync* Avoid the async state machine if write finishes synchronously* Add test case for resume after target pipe is completed
1 parent4d629d7 commitf1bafe6

File tree

2 files changed

+67
-16
lines changed

2 files changed

+67
-16
lines changed

‎src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReader.cs‎

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -130,16 +130,10 @@ public virtual Task CopyToAsync(PipeWriter destination, CancellationToken cancel
130130
returnTask.FromCanceled(cancellationToken);
131131
}
132132

133-
returnCopyToAsyncCore(destination,async(destination,memory,cancellationToken)=>
134-
{
135-
FlushResultresult=awaitdestination.WriteAsync(memory,cancellationToken).ConfigureAwait(false);
136-
137-
if(result.IsCanceled)
138-
{
139-
ThrowHelper.ThrowOperationCanceledException_FlushCanceled();
140-
}
141-
},
142-
cancellationToken);
133+
returnCopyToAsyncCore(
134+
destination,
135+
(destination,memory,cancellationToken)=>destination.WriteAsync(memory,cancellationToken),
136+
cancellationToken);
143137
}
144138

145139
/// <summary>Asynchronously reads the bytes from the <see cref="System.IO.Pipelines.PipeReader" /> and writes them to the specified stream, using a specified cancellation token.</summary>
@@ -158,13 +152,28 @@ public virtual Task CopyToAsync(Stream destination, CancellationToken cancellati
158152
returnTask.FromCanceled(cancellationToken);
159153
}
160154

161-
returnCopyToAsyncCore(
162-
destination,
163-
(destination,memory,cancellationToken)=>destination.WriteAsync(memory,cancellationToken),
164-
cancellationToken);
155+
returnCopyToAsyncCore(destination,(destination,memory,cancellationToken)=>
156+
{
157+
ValueTasktask=destination.WriteAsync(memory,cancellationToken);
158+
159+
if(task.IsCompletedSuccessfully)
160+
{
161+
task.GetAwaiter().GetResult();
162+
returnnewValueTask<FlushResult>(newFlushResult(isCanceled:false,isCompleted:false));
163+
}
164+
165+
staticasyncValueTask<FlushResult>Awaited(ValueTaskwriteTask)
166+
{
167+
awaitwriteTask.ConfigureAwait(false);
168+
returnnewFlushResult(isCanceled:false,isCompleted:false);
169+
}
170+
171+
returnAwaited(task);
172+
},
173+
cancellationToken);
165174
}
166175

167-
privateasyncTaskCopyToAsyncCore<TStream>(TStreamdestination,Func<TStream,ReadOnlyMemory<byte>,CancellationToken,ValueTask>writeAsync,CancellationTokencancellationToken)
176+
privateasyncTaskCopyToAsyncCore<TStream>(TStreamdestination,Func<TStream,ReadOnlyMemory<byte>,CancellationToken,ValueTask<FlushResult>>writeAsync,CancellationTokencancellationToken)
168177
{
169178
while(true)
170179
{
@@ -182,9 +191,19 @@ private async Task CopyToAsyncCore<TStream>(TStream destination, Func<TStream, R
182191

183192
while(buffer.TryGet(refposition,outReadOnlyMemory<byte>memory))
184193
{
185-
awaitwriteAsync(destination,memory,cancellationToken).ConfigureAwait(false);
194+
FlushResultflushResult=awaitwriteAsync(destination,memory,cancellationToken).ConfigureAwait(false);
195+
196+
if(flushResult.IsCanceled)
197+
{
198+
ThrowHelper.ThrowOperationCanceledException_FlushCanceled();
199+
}
186200

187201
consumed=position;
202+
203+
if(flushResult.IsCompleted)
204+
{
205+
return;
206+
}
188207
}
189208

190209
// The while loop completed succesfully, so we've consumed the entire buffer.

‎src/libraries/System.IO.Pipelines/tests/PipeReaderCopyToAsyncTests.cs‎

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,38 @@ public async Task CopyToAsyncPipeWriterWorks()
8686
targetPipe.Writer.Complete();
8787
}
8888

89+
[Fact]
90+
publicasyncTaskCopyToAsyncPipeWriterResume()
91+
{
92+
varmessages=newList<byte[]>()
93+
{
94+
Encoding.UTF8.GetBytes("Hello World1"),
95+
Encoding.UTF8.GetBytes("Hello World2"),
96+
Encoding.UTF8.GetBytes("Hello World3"),
97+
};
98+
99+
varpipe=newPipe(s_testOptions);
100+
vartargetPipe=newPipe(s_testOptions);
101+
targetPipe.Reader.Complete();
102+
Tasktask=pipe.Reader.CopyToAsync(targetPipe.Writer);
103+
foreach(varmsginmessages)
104+
{
105+
awaitpipe.Writer.WriteAsync(msg);
106+
}
107+
pipe.Writer.Complete();
108+
awaittask;
109+
110+
varresumePipe=newPipe(s_testOptions);
111+
awaitpipe.Reader.CopyToAsync(resumePipe.Writer);
112+
113+
ReadResultreadResult=awaitresumePipe.Reader.ReadAsync();
114+
Assert.Equal(messages.SelectMany(msg=>msg).ToArray(),readResult.Buffer.ToArray());
115+
116+
resumePipe.Reader.AdvanceTo(readResult.Buffer.End);
117+
resumePipe.Reader.Complete();
118+
resumePipe.Writer.Complete();
119+
}
120+
89121
[Fact]
90122
publicasyncTaskMultiSegmentWritesWorks()
91123
{

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp