Optimize DeflateStream.CopyToAsync #13184
Conversation
Today, CopyToAsync on DeflateStream is the basic read/write loop. However, all CopyToAsync really needs to do on DeflateStream is transform the source data (via a synchronous operation) and pass it along to the destination. This means that we can improve its implementation by effectively having the source stream CopyToAsync to the destination stream, albeit with that transformation added in the middle. Doing this has three key benefits:

- It eliminates the read operations and the associated copy, allowing the source to effectively push directly to the inflater inside DeflateStream.
- It takes advantage of any optimizations the source stream has in its CopyToAsync implementation, e.g. MemoryStream simply passing its whole internal buffer, NetworkStream using a SocketAsyncEventArgs, FileStream reusing its async state, etc.
- CopyToAsync is typically used when no other reading has been done from the stream. Since this change causes CopyToAsync to bypass the internal buffer used by DeflateStream before the inflater itself, we can avoid allocating that buffer at all if CopyToAsync is used by itself. This avoids an 8K allocation per DeflateStream instance, and even if we switch to pulling this buffer from ArrayPool, it still avoids needing to rent/return with the ArrayPool in these cases.
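To make the push-based shape concrete, here is a minimal, hypothetical sketch of the pattern (all names here are illustrative, not the actual corefx code): a write-only Stream whose WriteAsync transforms each chunk pushed into it and forwards the result, so that `source.CopyToAsync(wrapper)` drives the whole copy with the source's own optimized CopyToAsync. A toy byte-doubling delegate stands in for the inflater.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Illustrative wrapper: the source's CopyToAsync pushes chunks into WriteAsync,
// which transforms them and forwards the output to the real destination.
sealed class TransformToDestinationStream : Stream
{
    private readonly Stream _destination;
    private readonly Func<byte[], int, int, byte[]> _transform; // stand-in for the inflater

    public TransformToDestinationStream(Stream destination, Func<byte[], int, int, byte[]> transform)
    {
        _destination = destination;
        _transform = transform;
    }

    // Called by the source stream's CopyToAsync for every chunk it reads.
    public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken ct)
    {
        byte[] transformed = _transform(buffer, offset, count);
        await _destination.WriteAsync(transformed, 0, transformed.Length, ct).ConfigureAwait(false);
    }

    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override long Length => throw new NotSupportedException();
    public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
    public override void Flush() { }
    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) =>
        WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
}

class Demo
{
    public static async Task<byte[]> RunAsync()
    {
        var source = new MemoryStream(new byte[] { 1, 2, 3 });
        var destination = new MemoryStream();
        var wrapper = new TransformToDestinationStream(destination, (buf, off, cnt) =>
        {
            var result = new byte[cnt];
            for (int i = 0; i < cnt; i++) result[i] = (byte)(buf[off + i] * 2);
            return result;
        });
        // MemoryStream's optimized CopyToAsync can hand over its whole internal
        // buffer in a single WriteAsync call; no intermediate copy buffer needed.
        await source.CopyToAsync(wrapper);
        return destination.ToArray();
    }

    static async Task Main() =>
        Console.WriteLine(string.Join(",", await RunAsync())); // prints 2,4,6
}
```

Because the wrapper is the destination argument, any source stream that specializes CopyToAsync (MemoryStream, FileStream, NetworkStream) keeps its fast path for free.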
```csharp
    return new CopyToAsyncStream(this, destination, ArrayPool<byte>.Shared.Rent(bufferSize), cancellationToken).CopyFromSourceToDestination();
}

private sealed class CopyToAsyncStream : Stream
```
What is the purpose of having this be a stream and not just some helper function? Is this CopyToAsyncStream getting returned somewhere that I don't see?
stephentoub commented Oct 31, 2016 • edited
It's getting passed as the destination argument to the wrapped Stream's CopyToAsync:
https://github.com/dotnet/corefx/pull/13184/files#diff-35ce8d1b7a1f37dccabcc9a644648242R738
```csharp
    }
}

public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
```
Where is this being called? The only write I see is on the destination stream on line 732.
This instance gets passed as the destination argument to the wrapped stream's CopyToAsync, so that wrapped stream's CopyToAsync pushes data to this instance via this WriteAsync. That's why this is a Stream ;)
nvm, copyTo on 738
Beat me by 8 seconds on the response :)
😄
```csharp
        if (bytesRead > _arrayPoolBufferHighWaterMark) _arrayPoolBufferHighWaterMark = bytesRead;
        await _destination.WriteAsync(_arrayPoolBuffer, 0, bytesRead, cancellationToken).ConfigureAwait(false);
    }
    else break;
```
Instead of breaking here, we should first check if `_deflateStream._inflater.Finished == false`, like we do in ReadAsync, to handle the case where bytesRead == 0 but the inflation is not yet complete. This can happen in the rare case where the header is exactly the size of the inflation buffer.
Interesting. Ok, I'll fix that. Thanks.
Spoke about this offline with @ianhays; we think it's fine as-is, but he's putting a test case together.
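The edge case discussed above can be modeled with a toy stand-in for the inflater (everything here is illustrative, not the PR's code): a call that produces zero output bytes because it only consumed a header, even though more data remains. A loop that breaks on zero output loses the payload; checking a `Finished` flag first recovers it.

```csharp
using System;

class Demo
{
    // Toy "inflater": the first Inflate call consumes a header and produces
    // 0 output bytes even though real output is still pending.
    sealed class ToyInflater
    {
        private int _calls;
        public bool Finished { get; private set; }
        public int Inflate(byte[] output)
        {
            if (Finished) return 0;
            _calls++;
            if (_calls == 1) return 0;   // header-only call: no output yet
            Finished = true;
            output[0] = 42;              // the actual payload
            return 1;
        }
    }

    public static int Drain(bool checkFinished)
    {
        var inflater = new ToyInflater();
        var buf = new byte[8];
        int total = 0;
        while (true)
        {
            int bytesRead = inflater.Inflate(buf);
            if (bytesRead > 0) total += bytesRead;
            else if (!checkFinished) break;        // buggy: stops on the header-only call
            else if (inflater.Finished) break;     // fixed: only stop once truly finished
        }
        return total;
    }

    static void Main()
    {
        Console.WriteLine(Drain(checkFinished: false)); // prints 0 — payload lost
        Console.WriteLine(Drain(checkFinished: true));  // prints 1 — payload recovered
    }
}
```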
```csharp
// a buffer for storing the inflated output and passing along to the destination stream, and whatever
// buffer the source stream creates in its CopyToAsync implementation. As such, we halve the buffer size,
// to split it across the two.
bufferSize = Math.Max(1, bufferSize / 2);
```
I understand the reasoning here but I'm not sure it's what a user expects.
I guess the question is, if I specify a buffer of size N, do I expect that N is a bound on the memory used, or do I expect that the copy happens in chunks of N (when possible), regardless of the memory this would use?
Another thing to note is that in the CopyTo ArrayPool helper we were using before this PR, we did not do any halving of the bufferSize, so we did a ReadAsync(..bufferSize) followed by a WriteAsync(...bufferSize).
Personally I would expect a CopyToAsync(bufferSize: 10) to complete in one Read/Write call when there are only 10 bytes in the source stream.
Fair question. My primary concern is around the case where the buffer pool is exhausted. If there are plenty of available buffers in the pool, then using 2 instead of 1 at the requested size isn't that big a deal. But if there aren't any buffers in the pool, we're going to now end up allocating 2 buffers where we previously would have only allocated 1. I figured we could at least allocate no more in total size than the user requested in such cases.
I don't have enough data to really argue one way or the other, though. If you think I should remove this and just go with the original bufferSize, I can.
> Personally I would expect a copyToAsync(bufferSize 10) to complete in one Read/Write call when there is only 10 bytes in the source stream.
Given that DeflateStream changes the amount of data as part of the transformation, you generally can't expect that 10 bytes of input with a 10-byte buffer will result in a single write to the output stream. But I get the general point.
I'll remove the halving. We can add it later if this looks like an issue.
ianhays left a comment
These changes look great, @stephentoub! Minus the one edge case around the Finished check, this LGTM.
```csharp
bufferSize = Math.Max(1, bufferSize / 2);

// Do the copy
return new CopyToAsyncStream(this, destination, ArrayPool<byte>.Shared.Rent(bufferSize), cancellationToken).CopyFromSourceToDestination();
```
Very minor nit: it seems like it would be better to rent from the ArrayPool in CopyToAsyncStream; that way it's more cleanly encapsulated if we ever wanted to change to a different buffer pool implementation.
Ok. We can rent it in the CopyToAsyncStream ctor, so I'll change it to do that. If we instead rent it in CopyFromSourceToDestination, which would lead to the best symmetry/encapsulation, we'd unfortunately pay a bit more for it, as we'd end up spending a field for each of the bufferSize and the rented byte[].
Couldn't you pass bufferSize to CopyFromSourceToDestination?
> Couldn't you pass bufferSize to CopyFromSourceToDestination?
CopyFromSourceToDestination is an async method. Any arguments to that method will end up on its state machine object.
Ah, good point.
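The point about async parameters can be seen directly (a hypothetical illustration; `CopySketch` is not the PR's code): the C# compiler lifts an async method's parameters onto the generated state machine type as fields, which is why passing bufferSize to an async CopyFromSourceToDestination would still cost a field's worth of state. Reflection over the compiler-generated nested type shows the lifted parameter. Note that the exact set of generated fields is compiler-dependent.

```csharp
using System;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;

public class Demo
{
    // Any async method with a parameter used across an await: the parameter
    // must survive the suspension, so it is stored on the state machine.
    static async Task<int> CopySketch(int bufferSize)
    {
        await Task.Yield(); // force a genuine suspension point
        return bufferSize;
    }

    // Finds the compiler-generated state machine type for CopySketch and
    // returns the names of its instance fields.
    public static string[] StateMachineFields() =>
        typeof(Demo).GetNestedTypes(BindingFlags.NonPublic)
                    .First(t => t.Name.Contains("CopySketch"))
                    .GetFields(BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public)
                    .Select(f => f.Name)
                    .ToArray();

    static void Main() =>
        // The listing includes a field named "bufferSize" alongside the
        // state and builder fields.
        Console.WriteLine(string.Join(", ", StateMachineFields()));
}
```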
```csharp
private void IncrementAsyncOperations()
{
    int newCount = Interlocked.Increment(ref _asyncOperations);
    Debug.Assert(newCount == 1, $"DeflateStream was misused, with multiple pending async operations");
```
I do wonder why this isn't an exception, if we're going to specifically check for it. (Or if we only want to check for this in debug, then the count should be debug-only too.)
I wonder that as well, but I didn't want to mess with that as part of this change; I just encapsulated the check into its own method so that we can more easily do such things in the future (it also helps work around a compiler warning regarding passing a ref on a MarshalByRefObject). There are earlier checks for whether _asyncOperations != 0, throwing an exception in such cases... I don't know why those checks aren't just based on these atomic operations so as to avoid race conditions, but such a change now would be a small breaking change, as it could impact which exceptions get thrown in very niche corner cases. Probably not something we should care too much about, but definitely something to be left for later.
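For reference, the throwing alternative raised in this thread could look like the following sketch (`ConcurrencyGuard` and its members are illustrative names, not corefx code): use the Interlocked result itself to detect overlap and throw, decrementing again on failure so the count stays consistent.

```csharp
using System;
using System.Threading;

// Illustrative guard: allows at most one pending async operation, throwing
// (rather than Debug.Assert-ing) on detected misuse.
public class ConcurrencyGuard
{
    private int _asyncOperations;

    public void Enter()
    {
        if (Interlocked.Increment(ref _asyncOperations) != 1)
        {
            // Undo our increment so the legitimate operation's Exit balances.
            Interlocked.Decrement(ref _asyncOperations);
            throw new InvalidOperationException("Only one async operation may be pending at a time.");
        }
    }

    public void Exit() => Interlocked.Decrement(ref _asyncOperations);
}

class Demo
{
    static void Main()
    {
        var guard = new ConcurrencyGuard();
        guard.Enter();                      // first operation: ok
        try { guard.Enter(); }              // overlapping operation: throws
        catch (InvalidOperationException) { Console.WriteLine("detected overlapping operation"); }
        guard.Exit();                       // first operation completes
        guard.Enter();                      // a new operation is fine again
        guard.Exit();
    }
}
```

Because the atomic increment is also the check, there is no window between "check _asyncOperations != 0" and the increment for a race to slip through, which is the race-condition concern mentioned above.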
geoffkizer commented Oct 31, 2016

Overall this looks great; some minor nits above.
stephentoub commented Oct 31, 2016

@dotnet-bot Test Innerloop OSX Release Build and Test please (#12753)
stephentoub commented Oct 31, 2016

Thanks for the reviews, @ianhays and @geoffkizer.
davidfowl commented Nov 1, 2016

👏
(I also noticed an unnecessary allocation in Inflater, which I removed.)
The net effect of this change is a sizeable boost on throughput for many scenarios (I didn't find any scenarios where it regressed). Here are a few examples. In this table, "MS" is a MemoryStream, so "MS to MS" is using a DeflateStream created around a compressed MemoryStream and using CopyToAsync on the DeflateStream to decompress it to another MemoryStream. "CMS" is a "CustomMemoryStream", which is simply a type that derives from MemoryStream and doesn't add or override any logic; this derivation defeats some of the CopyToAsync optimization in MemoryStream, so this is essentially the worst case for this DeflateStream optimization. And "FS" is FileStream, where a "(true)" suffix means useAsync==true (in which case FileStream has its own CopyToAsync implementation) and a "(false)" suffix means useAsync==false (in which case it essentially delegates back to the base Stream's CopyToAsync). "Len" is the length of the original random input in bytes, and "Iters" is the number of iterations (the number of times positions were reset and CopyToAsync was called).
Fixes https://github.com/dotnet/corefx/issues/11571
Contributes to https://github.com/dotnet/corefx/issues/12549

cc: @ianhays, @JeremyKuhne, @geoffkizer, @benaadams, @davidfowl
(@ianhays, please pay careful attention to my use of the inflater. I essentially copied what was in ReadAsync and refactored it, and I want to make sure I'm using it correctly / didn't misunderstand anything about its behavior.)