Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit347b061

Browse files
committed
Rework and test read of buffered segments
1 parent88cea42 commit347b061

File tree

2 files changed

+128
-63
lines changed

2 files changed

+128
-63
lines changed

‎src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs‎

Lines changed: 64 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -271,53 +271,54 @@ public override async Task CopyToAsync(PipeWriter destination, CancellationToken
271271
ThrowHelper.ThrowOperationCanceledException_ReadCanceled();
272272
}
273273

274-
if(_bufferedBytes>0&&(!_examinedEverything||_isStreamCompleted))
274+
CancellationTokenRegistrationreg=default;
275+
if(cancellationToken.CanBeCanceled)
276+
{
277+
reg=cancellationToken.UnsafeRegister(state=>((StreamPipeReader)state!).Cancel(),this);
278+
}
279+
280+
using(reg)
275281
{
276-
BufferSegment?segment=_readHead;
277282
try
278283
{
279-
while(segment!=null)
284+
if(_bufferedBytes>0)
280285
{
281-
FlushResultflushResult=awaitdestination.WriteAsync(segment.AvailableMemory,cancellationToken).ConfigureAwait(false);
282-
if(flushResult.IsCanceled)
286+
BufferSegment?segment=_readHead;
287+
try
283288
{
284-
ThrowHelper.ThrowOperationCanceledException_FlushCanceled();
289+
while(segment!=null)
290+
{
291+
FlushResultflushResult=awaitdestination.WriteAsync(segment.Memory,tokenSource.Token).ConfigureAwait(false);
292+
293+
if(flushResult.IsCanceled)
294+
{
295+
ThrowHelper.ThrowOperationCanceledException_FlushCanceled();
296+
}
297+
298+
segment=segment.NextSegment;
299+
300+
if(flushResult.IsCompleted)
301+
{
302+
return;
303+
}
304+
}
285305
}
286-
287-
segment=segment.NextSegment;
288-
289-
if(flushResult.IsCompleted)
306+
finally
290307
{
291-
return;
308+
// Advance even if WriteAsync throws so the PipeReader is not left in the
309+
// currently reading state
310+
if(segment!=null)
311+
{
312+
AdvanceTo(segment,segment.End,segment,segment.End);
313+
}
292314
}
293315
}
294-
}
295-
finally
296-
{
297-
// Advance even if WriteAsync throws so the PipeReader is not left in the
298-
// currently reading state
299-
if(segment!=null)
316+
317+
if(_isStreamCompleted)
300318
{
301-
AdvanceTo(segment,segment.End,segment,segment.End);
319+
return;
302320
}
303-
}
304-
}
305321

306-
if(_isStreamCompleted)
307-
{
308-
return;
309-
}
310-
311-
CancellationTokenRegistrationreg=default;
312-
if(cancellationToken.CanBeCanceled)
313-
{
314-
reg=cancellationToken.UnsafeRegister(state=>((StreamPipeReader)state!).Cancel(),this);
315-
}
316-
317-
using(reg)
318-
{
319-
try
320-
{
321322
awaitInnerStream.CopyToAsync(destination,tokenSource.Token).ConfigureAwait(false);
322323
}
323324
catch(OperationCanceledException)
@@ -341,34 +342,6 @@ public override async Task CopyToAsync(Stream destination, CancellationToken can
341342
ThrowHelper.ThrowOperationCanceledException_ReadCanceled();
342343
}
343344

344-
if(_bufferedBytes>0&&(!_examinedEverything||_isStreamCompleted))
345-
{
346-
BufferSegment?segment=_readHead;
347-
try
348-
{
349-
while(segment!=null)
350-
{
351-
awaitdestination.WriteAsync(segment.AvailableMemory,cancellationToken).ConfigureAwait(false);
352-
353-
segment=segment.NextSegment;
354-
}
355-
}
356-
finally
357-
{
358-
// Advance even if WriteAsync throws so the PipeReader is not left in the
359-
// currently reading state
360-
if(segment!=null)
361-
{
362-
AdvanceTo(segment,segment.End,segment,segment.End);
363-
}
364-
}
365-
}
366-
367-
if(_isStreamCompleted)
368-
{
369-
return;
370-
}
371-
372345
CancellationTokenRegistrationreg=default;
373346
if(cancellationToken.CanBeCanceled)
374347
{
@@ -379,6 +352,34 @@ public override async Task CopyToAsync(Stream destination, CancellationToken can
379352
{
380353
try
381354
{
355+
if(_bufferedBytes>0)
356+
{
357+
BufferSegment?segment=_readHead;
358+
try
359+
{
360+
while(segment!=null)
361+
{
362+
awaitdestination.WriteAsync(segment.Memory,tokenSource.Token).ConfigureAwait(false);
363+
364+
segment=segment.NextSegment;
365+
}
366+
}
367+
finally
368+
{
369+
// Advance even if WriteAsync throws so the PipeReader is not left in the
370+
// currently reading state
371+
if(segment!=null)
372+
{
373+
AdvanceTo(segment,segment.End,segment,segment.End);
374+
}
375+
}
376+
}
377+
378+
if(_isStreamCompleted)
379+
{
380+
return;
381+
}
382+
382383
#if(!NETSTANDARD2_0&&!NETFRAMEWORK)
383384
awaitInnerStream.CopyToAsync(destination,tokenSource.Token).ConfigureAwait(false);
384385
#else

‎src/libraries/System.IO.Pipelines/tests/StreamPipeReaderCopyToAsyncTests.cs‎

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,70 @@ public async Task CopyToAsyncPipeWriterWorks()
9090
targetPipe.Writer.Complete();
9191
}
9292

93+
[ConditionalFact(typeof(PlatformDetection),nameof(PlatformDetection.IsThreadingSupported))]
94+
publicasyncTaskCopyToAsyncStreamWorksWithBufferedSegments()
95+
{
96+
varmessages=newList<byte[]>()
97+
{
98+
Encoding.UTF8.GetBytes("Hello World1"),
99+
Encoding.UTF8.GetBytes("Hello World2"),
100+
Encoding.UTF8.GetBytes("Hello World3"),
101+
};
102+
103+
varpipe=newPipe(s_testPipeOptions);
104+
varpipeReader=PipeReader.Create(pipe.Reader.AsStream(),s_testOptions);
105+
106+
foreach(varmsginmessages)
107+
{
108+
awaitpipe.Writer.WriteAsync(msg);
109+
}
110+
pipe.Writer.Complete();
111+
112+
byte[]expected=messages.SelectMany(msg=>msg).ToArray();
113+
114+
varreadResult=awaitpipeReader.ReadAsync();
115+
Assert.Equal(expected,readResult.Buffer.ToArray());
116+
117+
varstream=newMemoryStream();
118+
awaitpipeReader.CopyToAsync(stream);
119+
Assert.Equal(expected,stream.ToArray());
120+
}
121+
122+
[Fact]
123+
publicasyncTaskCopyToAsyncPipeWriterWorksWithBufferedSegments()
124+
{
125+
varmessages=newList<byte[]>()
126+
{
127+
Encoding.UTF8.GetBytes("Hello World1"),
128+
Encoding.UTF8.GetBytes("Hello World2"),
129+
Encoding.UTF8.GetBytes("Hello World3"),
130+
};
131+
132+
varpipe=newPipe(s_testPipeOptions);
133+
varpipeReader=PipeReader.Create(pipe.Reader.AsStream(),s_testOptions);
134+
vartargetPipe=newPipe(s_testPipeOptions);
135+
136+
foreach(varmsginmessages)
137+
{
138+
awaitpipe.Writer.WriteAsync(msg);
139+
}
140+
pipe.Writer.Complete();
141+
142+
byte[]expected=messages.SelectMany(msg=>msg).ToArray();
143+
144+
varreadResult=awaitpipeReader.ReadAsync();
145+
Assert.Equal(expected,readResult.Buffer.ToArray());
146+
147+
awaitpipeReader.CopyToAsync(targetPipe.Writer);
148+
149+
readResult=awaittargetPipe.Reader.ReadAsync();
150+
Assert.Equal(expected,readResult.Buffer.ToArray());
151+
152+
targetPipe.Reader.AdvanceTo(readResult.Buffer.End);
153+
targetPipe.Reader.Complete();
154+
targetPipe.Writer.Complete();
155+
}
156+
93157
[Fact]
94158
publicasyncTaskMultiSegmentWritesWorks()
95159
{

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp