Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit4a3ecbf

Browse files
MattiasBuelensmarco-ippolito
authored andcommitted
stream: implementmin option forReadableStreamBYOBReader.read
PR-URL:#50888Backport-PR-URL:#54044Reviewed-By: Matteo Collina <matteo.collina@gmail.com>Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>Reviewed-By: Debadree Chatterjee <debadree333@gmail.com>Reviewed-By: Rafael Gonzaga <rafael.nunu@hotmail.com>
1 parent7625dc4 commit4a3ecbf

File tree

15 files changed

+968
-114
lines changed

15 files changed

+968
-114
lines changed

‎doc/api/webstreams.md

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,7 @@ added: v16.5.0
488488
-->
489489

490490
* Returns: A promise fulfilled with an object:
491-
*`value` {ArrayBuffer}
491+
*`value` {any}
492492
*`done` {boolean}
493493

494494
Requests the next chunk of data from the underlying {ReadableStream}
@@ -613,15 +613,24 @@ added: v16.5.0
613613
{ReadableStream} is closed or rejected if the stream errors or the reader's
614614
lock is released before the stream finishes closing.
615615
616-
####`readableStreamBYOBReader.read(view)`
616+
####`readableStreamBYOBReader.read(view[, options])`
617617
618618
<!-- YAML
619619
added: v16.5.0
620+
changes:
621+
- version: REPLACEME
622+
pr-url: https://github.com/nodejs/node/pull/54044
623+
description: Added`min` option.
620624
-->
621625
622626
*`view` {Buffer|TypedArray|DataView}
627+
*`options` {Object}
628+
*`min` {number} When set, the returned promise will only be
629+
fulfilled as soon as`min` number of elements are available.
630+
When not set, the promise fulfills when at least one element
631+
is available.
623632
* Returns: A promise fulfilled with an object:
624-
*`value` {ArrayBuffer}
633+
*`value` {TypedArray|DataView}
625634
*`done` {boolean}
626635
627636
Requests the next chunk of data from the underlying {ReadableStream}

‎lib/internal/encoding.js

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,7 @@ const {
4747
const{
4848
validateString,
4949
validateObject,
50-
kValidateObjectAllowNullable,
51-
kValidateObjectAllowArray,
52-
kValidateObjectAllowFunction,
50+
kValidateObjectAllowObjectsAndNull,
5351
}=require('internal/validators');
5452
constbinding=internalBinding('encoding_binding');
5553
const{
@@ -393,10 +391,6 @@ const TextDecoder =
393391
makeTextDecoderICU() :
394392
makeTextDecoderJS();
395393

396-
constkValidateObjectAllowObjectsAndNull=kValidateObjectAllowNullable|
397-
kValidateObjectAllowArray|
398-
kValidateObjectAllowFunction;
399-
400394
functionmakeTextDecoderICU(){
401395
const{
402396
decode:_decode,

‎lib/internal/validators.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,11 @@ const kValidateObjectNone = 0;
222222
constkValidateObjectAllowNullable=1<<0;
223223
constkValidateObjectAllowArray=1<<1;
224224
constkValidateObjectAllowFunction=1<<2;
225+
constkValidateObjectAllowObjects=kValidateObjectAllowArray|
226+
kValidateObjectAllowFunction;
227+
constkValidateObjectAllowObjectsAndNull=kValidateObjectAllowNullable|
228+
kValidateObjectAllowArray|
229+
kValidateObjectAllowFunction;
225230

226231
/**
227232
*@callback validateObject
@@ -583,6 +588,8 @@ module.exports = {
583588
kValidateObjectAllowNullable,
584589
kValidateObjectAllowArray,
585590
kValidateObjectAllowFunction,
591+
kValidateObjectAllowObjects,
592+
kValidateObjectAllowObjectsAndNull,
586593
validateOneOf,
587594
validatePlainFunction,
588595
validatePort,

‎lib/internal/webstreams/readablestream.js

Lines changed: 67 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ const {
2323
SymbolAsyncIterator,
2424
SymbolDispose,
2525
SymbolToStringTag,
26+
TypedArrayPrototypeGetLength,
2627
Uint8Array,
2728
}=primordials;
2829

@@ -34,6 +35,7 @@ const {
3435
ERR_INVALID_ARG_TYPE,
3536
ERR_INVALID_STATE,
3637
ERR_INVALID_THIS,
38+
ERR_OUT_OF_RANGE,
3739
},
3840
}=require('internal/errors');
3941

@@ -59,8 +61,8 @@ const {
5961
validateAbortSignal,
6062
validateBuffer,
6163
validateObject,
62-
kValidateObjectAllowNullable,
63-
kValidateObjectAllowFunction,
64+
kValidateObjectAllowObjects,
65+
kValidateObjectAllowObjectsAndNull,
6466
}=require('internal/validators');
6567

6668
const{
@@ -247,9 +249,9 @@ class ReadableStream {
247249
*@param {UnderlyingSource} [source]
248250
*@param {QueuingStrategy} [strategy]
249251
*/
250-
constructor(source={},strategy=kEmptyObject){
251-
if(source===null)
252-
thrownewERR_INVALID_ARG_VALUE('source','Object',source);
252+
constructor(source=kEmptyObject,strategy=kEmptyObject){
253+
validateObject(source,'source',kValidateObjectAllowObjects);
254+
validateObject(strategy,'strategy',kValidateObjectAllowObjectsAndNull);
253255
this[kState]=createReadableStreamState();
254256

255257
this[kIsClosedPromise]=createDeferredPromise();
@@ -335,7 +337,7 @@ class ReadableStream {
335337
getReader(options=kEmptyObject){
336338
if(!isReadableStream(this))
337339
thrownewERR_INVALID_THIS('ReadableStream');
338-
validateObject(options,'options',kValidateObjectAllowNullable|kValidateObjectAllowFunction);
340+
validateObject(options,'options',kValidateObjectAllowObjectsAndNull);
339341
constmode=options?.mode;
340342

341343
if(mode===undefined)
@@ -373,6 +375,7 @@ class ReadableStream {
373375

374376
// The web platform tests require that these be handled one at a
375377
// time and in a specific order. options can be null or undefined.
378+
validateObject(options,'options',kValidateObjectAllowObjectsAndNull);
376379
constpreventAbort=options?.preventAbort;
377380
constpreventCancel=options?.preventCancel;
378381
constpreventClose=options?.preventClose;
@@ -415,6 +418,7 @@ class ReadableStream {
415418
destination);
416419
}
417420

421+
validateObject(options,'options',kValidateObjectAllowObjectsAndNull);
418422
constpreventAbort=options?.preventAbort;
419423
constpreventCancel=options?.preventCancel;
420424
constpreventClose=options?.preventClose;
@@ -459,10 +463,8 @@ class ReadableStream {
459463
values(options=kEmptyObject){
460464
if(!isReadableStream(this))
461465
thrownewERR_INVALID_THIS('ReadableStream');
462-
validateObject(options,'options');
463-
const{
464-
preventCancel=false,
465-
}=options;
466+
validateObject(options,'options',kValidateObjectAllowObjectsAndNull);
467+
constpreventCancel=!!(options?.preventCancel);
466468

467469
// eslint-disable-next-line no-use-before-define
468470
constreader=newReadableStreamDefaultReader(this);
@@ -926,47 +928,62 @@ class ReadableStreamBYOBReader {
926928

927929
/**
928930
*@param {ArrayBufferView} view
931+
*@param {{
932+
* min? : number
933+
* }} [options]
929934
*@returns {Promise<{
930-
*view : ArrayBufferView,
935+
*value : ArrayBufferView,
931936
* done : boolean,
932937
* }>}
933938
*/
934-
read(view){
939+
asyncread(view,options=kEmptyObject){
935940
if(!isReadableStreamBYOBReader(this))
936-
returnPromiseReject(newERR_INVALID_THIS('ReadableStreamBYOBReader'));
941+
thrownewERR_INVALID_THIS('ReadableStreamBYOBReader');
937942
if(!isArrayBufferView(view)){
938-
returnPromiseReject(
939-
newERR_INVALID_ARG_TYPE(
940-
'view',
941-
[
942-
'Buffer',
943-
'TypedArray',
944-
'DataView',
945-
],
946-
view));
943+
thrownewERR_INVALID_ARG_TYPE(
944+
'view',
945+
[
946+
'Buffer',
947+
'TypedArray',
948+
'DataView',
949+
],
950+
view,
951+
);
947952
}
953+
validateObject(options,'options',kValidateObjectAllowObjectsAndNull);
948954

949955
constviewByteLength=ArrayBufferViewGetByteLength(view);
950956
constviewBuffer=ArrayBufferViewGetBuffer(view);
951957
constviewBufferByteLength=ArrayBufferPrototypeGetByteLength(viewBuffer);
952958

953959
if(viewByteLength===0||viewBufferByteLength===0){
954-
returnPromiseReject(
955-
newERR_INVALID_STATE.TypeError(
956-
'View or Viewed ArrayBuffer is zero-length or detached',
957-
),
958-
);
960+
thrownewERR_INVALID_STATE.TypeError(
961+
'View or Viewed ArrayBuffer is zero-length or detached');
959962
}
960963

961964
// Supposed to assert here that the view's buffer is not
962965
// detached, but there's no API available to use to check that.
966+
967+
constmin=options?.min??1;
968+
if(typeofmin!=='number')
969+
thrownewERR_INVALID_ARG_TYPE('options.min','number',min);
970+
if(!NumberIsInteger(min))
971+
thrownewERR_INVALID_ARG_VALUE('options.min',min,'must be an integer');
972+
if(min<=0)
973+
thrownewERR_INVALID_ARG_VALUE('options.min',min,'must be greater than 0');
974+
if(!isDataView(view)){
975+
if(min>TypedArrayPrototypeGetLength(view)){
976+
thrownewERR_OUT_OF_RANGE('options.min','<= view.length',min);
977+
}
978+
}elseif(min>viewByteLength){
979+
thrownewERR_OUT_OF_RANGE('options.min','<= view.byteLength',min);
980+
}
981+
963982
if(this[kState].stream===undefined){
964-
returnPromiseReject(
965-
newERR_INVALID_STATE.TypeError(
966-
'The reader is not attached to a stream'));
983+
thrownewERR_INVALID_STATE.TypeError('The reader is not attached to a stream');
967984
}
968985
constreadIntoRequest=newReadIntoRequest();
969-
readableStreamBYOBReaderRead(this,view,readIntoRequest);
986+
readableStreamBYOBReaderRead(this,view,min,readIntoRequest);
970987
returnreadIntoRequest.promise;
971988
}
972989

@@ -1880,7 +1897,7 @@ function readableByteStreamTee(stream) {
18801897
reading=false;
18811898
},
18821899
};
1883-
readableStreamBYOBReaderRead(reader,view,readIntoRequest);
1900+
readableStreamBYOBReaderRead(reader,view,1,readIntoRequest);
18841901
}
18851902

18861903
functionpull1Algorithm(){
@@ -2207,7 +2224,7 @@ function readableStreamReaderGenericRelease(reader) {
22072224
reader[kState].stream=undefined;
22082225
}
22092226

2210-
functionreadableStreamBYOBReaderRead(reader,view,readIntoRequest){
2227+
functionreadableStreamBYOBReaderRead(reader,view,min,readIntoRequest){
22112228
const{
22122229
stream,
22132230
}=reader[kState];
@@ -2220,6 +2237,7 @@ function readableStreamBYOBReaderRead(reader, view, readIntoRequest) {
22202237
readableByteStreamControllerPullInto(
22212238
stream[kState].controller,
22222239
view,
2240+
min,
22232241
readIntoRequest);
22242242
}
22252243

@@ -2492,7 +2510,7 @@ function readableByteStreamControllerClose(controller) {
24922510

24932511
if(pendingPullIntos.length){
24942512
constfirstPendingPullInto=pendingPullIntos[0];
2495-
if(firstPendingPullInto.bytesFilled>0){
2513+
if(firstPendingPullInto.bytesFilled%firstPendingPullInto.elementSize!==0){
24962514
consterror=newERR_INVALID_STATE.TypeError('Partial read');
24972515
readableByteStreamControllerError(controller,error);
24982516
throwerror;
@@ -2509,7 +2527,7 @@ function readableByteStreamControllerCommitPullIntoDescriptor(stream, desc) {
25092527

25102528
letdone=false;
25112529
if(stream[kState].state==='closed'){
2512-
desc.bytesFilled=0;
2530+
assert(desc.bytesFilled%desc.elementSize===0);
25132531
done=true;
25142532
}
25152533

@@ -2598,6 +2616,7 @@ function readableByteStreamControllerHandleQueueDrain(controller) {
25982616
functionreadableByteStreamControllerPullInto(
25992617
controller,
26002618
view,
2619+
min,
26012620
readIntoRequest){
26022621
const{
26032622
closeRequested,
@@ -2610,6 +2629,11 @@ function readableByteStreamControllerPullInto(
26102629
elementSize=view.constructor.BYTES_PER_ELEMENT;
26112630
ctor=view.constructor;
26122631
}
2632+
2633+
constminimumFill=min*elementSize;
2634+
assert(minimumFill>=elementSize&&minimumFill<=view.byteLength);
2635+
assert(minimumFill%elementSize===0);
2636+
26132637
constbuffer=ArrayBufferViewGetBuffer(view);
26142638
constbyteOffset=ArrayBufferViewGetByteOffset(view);
26152639
constbyteLength=ArrayBufferViewGetByteLength(view);
@@ -2628,6 +2652,7 @@ function readableByteStreamControllerPullInto(
26282652
byteOffset,
26292653
byteLength,
26302654
bytesFilled:0,
2655+
minimumFill,
26312656
elementSize,
26322657
ctor,
26332658
type:'byob',
@@ -2715,7 +2740,7 @@ function readableByteStreamControllerRespond(controller, bytesWritten) {
27152740
}
27162741

27172742
functionreadableByteStreamControllerRespondInClosedState(controller,desc){
2718-
assert(!desc.bytesFilled);
2743+
assert(desc.bytesFilled%desc.elementSize===0);
27192744
if(desc.type==='none'){
27202745
readableByteStreamControllerShiftPendingPullInto(controller);
27212746
}
@@ -2892,17 +2917,18 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
28922917
byteLength,
28932918
byteOffset,
28942919
bytesFilled,
2920+
minimumFill,
28952921
elementSize,
28962922
}=desc;
2897-
constcurrentAlignedBytes=bytesFilled-(bytesFilled%elementSize);
28982923
constmaxBytesToCopy=MathMin(
28992924
controller[kState].queueTotalSize,
29002925
byteLength-bytesFilled);
29012926
constmaxBytesFilled=bytesFilled+maxBytesToCopy;
29022927
constmaxAlignedBytes=maxBytesFilled-(maxBytesFilled%elementSize);
29032928
lettotalBytesToCopyRemaining=maxBytesToCopy;
29042929
letready=false;
2905-
if(maxAlignedBytes>currentAlignedBytes){
2930+
assert(bytesFilled<minimumFill);
2931+
if(maxAlignedBytes>=minimumFill){
29062932
totalBytesToCopyRemaining=maxAlignedBytes-bytesFilled;
29072933
ready=true;
29082934
}
@@ -2945,7 +2971,7 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
29452971
if(!ready){
29462972
assert(!controller[kState].queueTotalSize);
29472973
assert(desc.bytesFilled>0);
2948-
assert(desc.bytesFilled<elementSize);
2974+
assert(desc.bytesFilled<minimumFill);
29492975
}
29502976
returnready;
29512977
}
@@ -3001,7 +3027,7 @@ function readableByteStreamControllerRespondInReadableState(
30013027
return;
30023028
}
30033029

3004-
if(desc.bytesFilled<desc.elementSize)
3030+
if(desc.bytesFilled<desc.minimumFill)
30053031
return;
30063032

30073033
readableByteStreamControllerShiftPendingPullInto(controller);
@@ -3186,6 +3212,7 @@ function readableByteStreamControllerPullSteps(controller, readRequest) {
31863212
byteOffset:0,
31873213
byteLength:autoAllocateChunkSize,
31883214
bytesFilled:0,
3215+
minimumFill:1,
31893216
elementSize:1,
31903217
ctor:Uint8Array,
31913218
type:'default',

‎lib/internal/webstreams/transformstream.js

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,12 @@ const {
2929
kEnumerableProperty,
3030
}=require('internal/util');
3131

32+
const{
33+
validateObject,
34+
kValidateObjectAllowObjects,
35+
kValidateObjectAllowObjectsAndNull,
36+
}=require('internal/validators');
37+
3238
const{
3339
kDeserialize,
3440
kTransfer,
@@ -119,9 +125,12 @@ class TransformStream {
119125
*@param {QueuingStrategy} [readableStrategy]
120126
*/
121127
constructor(
122-
transformer=null,
128+
transformer=kEmptyObject,
123129
writableStrategy=kEmptyObject,
124130
readableStrategy=kEmptyObject){
131+
validateObject(transformer,'transformer',kValidateObjectAllowObjects);
132+
validateObject(writableStrategy,'writableStrategy',kValidateObjectAllowObjectsAndNull);
133+
validateObject(readableStrategy,'readableStrategy',kValidateObjectAllowObjectsAndNull);
125134
constreadableType=transformer?.readableType;
126135
constwritableType=transformer?.writableType;
127136
conststart=transformer?.start;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp