A stream implementation that does more by doing less
isaacs/minipass
A very minimal implementation of a PassThrough stream.
It's very fast for objects, strings, and buffers.
Supports pipe()ing (including multi-pipe() and backpressure transmission), buffering data until either a data event handler or pipe() is added (so you don't lose the first chunk), and most other cases where PassThrough is a good idea.
There is a read() method, but it's much more efficient to consume data from this stream via 'data' events or by calling pipe() into some other stream. Calling read() requires the buffer to be flattened in some cases, which requires copying memory.
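As a rough illustration of where that copy comes from, the hypothetical readN helper below (not Minipass's actual read()) takes n bytes from a list of buffered chunks. When the bytes span several chunks, they first have to be joined into one new Buffer with Buffer.concat, which copies every byte:

```javascript
// Hypothetical helper, not Minipass's read(): take n bytes from a list of
// buffered chunks, or return null if fewer than n bytes are buffered.
function readN(chunks, n) {
  const total = chunks.reduce((sum, c) => sum + c.length, 0)
  if (n > total) return null
  // a single chunk can be sliced in place; several must be copied into one
  const flat = chunks.length === 1 ? chunks[0] : Buffer.concat(chunks, total)
  const out = flat.subarray(0, n)
  const rest = flat.subarray(n)
  chunks.length = 0 // replace the buffered list with whatever is left over
  if (rest.length > 0) chunks.push(rest)
  return out
}

const buffered = [Buffer.from('hel'), Buffer.from('lo '), Buffer.from('world')]
const first = readN(buffered, 5) // 'hello', after copying all 11 bytes
const tooMuch = readN(buffered, 100) // null: only ' world' (6 bytes) is left
```

Consuming through 'data' events avoids this, because each chunk is handed on exactly as it was written.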
If you set objectMode: true in the options, then whatever is written will be emitted. Otherwise, it'll do a minimal amount of Buffer copying to ensure proper Streams semantics when read(n) is called.
objectMode can only be set at instantiation. Attempting to write something other than a String or Buffer without having set objectMode in the options will throw an error.
This is not a through or through2 stream. It doesn't transform the data, it just passes it right through. If you want to transform the data, extend the class and override the write() method. Once you're done transforming the data however you want, call super.write() with the transform output.
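Here is a minimal sketch of that override pattern. To keep it runnable without the minipass package, it uses a stand-in base class, PassBase (hypothetical), that simply emits each written chunk the way a flowing stream would. In real code you would extend Minipass instead, and its write() also accepts encoding and callback arguments:

```javascript
const { EventEmitter } = require('node:events')

// Hypothetical stand-in for Minipass: passes each chunk straight through.
class PassBase extends EventEmitter {
  write(chunk) {
    this.emit('data', chunk)
    return true
  }
}

// Transform by overriding write() and handing the result to super.write()
class Upcase extends PassBase {
  write(chunk) {
    return super.write(String(chunk).toUpperCase())
  }
}

const out = []
const up = new Upcase()
up.on('data', chunk => out.push(chunk))
up.write('hello')
up.write(Buffer.from('world')) // String(buffer) decodes it as UTF-8 first
```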
For some examples of streams that extend Minipass in various ways, check out:
- minizlib
- fs-minipass
- tar
- minipass-collect
- minipass-flush
- minipass-pipeline
- tap
- tap-parser
- treport
- minipass-fetch
- pacote
- make-fetch-happen
- cacache
- ssri
- npm-registry-fetch
- minipass-json-stream
- minipass-sized
The Minipass class takes three type template definitions:
- RType - the type being read, which defaults to Buffer. If RType is string, then the constructor must get an options object specifying either an encoding or objectMode: true. If it's anything other than string or Buffer, then it must get an options object specifying objectMode: true.
- WType - the type being written. If RType is Buffer or string, then this defaults to ContiguousData (Buffer, string, ArrayBuffer, or ArrayBufferView). Otherwise, it defaults to RType.
- Events - a type mapping event names to the arguments emitted with that event, which extends Minipass.Events.
To declare types for custom events in subclasses, extend the third parameter with your own event signatures. For example:
import { Minipass } from 'minipass'
// a NDJSON stream that emits 'jsonError' when it can't stringify
export interface Events extends Minipass.Events {
  jsonError: [e: Error]
}
export class NDJSONStream extends Minipass<string, any, Events> {
  constructor() {
    super({ objectMode: true })
  }
  // data is type `any` because that's WType
  write(data, encoding, cb) {
    try {
      const json = JSON.stringify(data)
      return super.write(json + '\n', encoding, cb)
    } catch (er) {
      // wrap anything that isn't already an Error
      const err = er instanceof Error
        ? er
        : Object.assign(new Error('json stringify failed'), { cause: er })
      // trying to emit with something OTHER than an error will
      // fail, because we declared the event arguments type.
      this.emit('jsonError', err)
      return false
    }
  }
}
const s = new NDJSONStream()
s.on('jsonError', e => {
  // here, TS knows that e is an Error
})
Emitting or handling events that aren't declared in this way is fine, but the arguments will be typed as unknown.
There are several things that make Minipass streams different from (and in some ways superior to) Node.js core streams.
Please read these caveats if you are familiar with Node.js core streams and intend to use Minipass streams in your programs.
You can avoid most of these differences entirely (for a very small performance penalty) by setting {async: true} in the constructor options.
Minipass streams are designed to support synchronous use cases. Thus, data is emitted as soon as it is available, always. It is buffered until read, but no longer. Another way to look at it is that Minipass streams are exactly as synchronous as the logic that writes into them.
This can be surprising if your code relies on PassThrough.write() always providing data on the next tick rather than the current one, or being able to call resume() and not have the entire buffer disappear immediately.
However, without this synchronicity guarantee, there would be no way for Minipass to achieve the speeds it does, or support the synchronous use cases that it does. Simply put, waiting takes time.
This non-deferring approach makes Minipass streams much easier to reason about, especially in the context of Promises and other flow-control mechanisms.
Example:
// hybrid module, either works
import { Minipass } from 'minipass'
// or:
// const { Minipass } = require('minipass')
const stream = new Minipass()
stream.on('data', () => console.log('data event'))
console.log('before write')
stream.write('hello')
console.log('after write')
// output:
// before write
// data event
// after write
If you wish to have a Minipass stream with behavior that more closely mimics Node.js core streams, you can put the stream in async mode, either by setting async: true in the constructor options or by setting stream.async = true later on.
// hybrid module, either works
import { Minipass } from 'minipass'
// or:
// const { Minipass } = require('minipass')
const asyncStream = new Minipass({ async: true })
asyncStream.on('data', () => console.log('data event'))
console.log('before write')
asyncStream.write('hello')
console.log('after write')
// output:
// before write
// after write
// data event <-- this is deferred until the next tick
Switching out of async mode is unsafe, as it could cause data corruption, and so is not enabled. Example:
import { Minipass } from 'minipass'
const stream = new Minipass({ encoding: 'utf8' })
stream.on('data', chunk => console.log(chunk))
stream.async = true
console.log('before writes')
stream.write('hello')
setStreamSyncAgainSomehow(stream) // <-- this doesn't actually exist!
stream.write('world')
console.log('after writes')
// hypothetical output would be:
// before writes
// world
// after writes
// hello
// NOT GOOD!
To avoid this problem, once a stream is in async mode, any attempt to make it sync again is ignored.
const { Minipass } = require('minipass')
const stream = new Minipass({ encoding: 'utf8' })
stream.on('data', chunk => console.log(chunk))
stream.async = true
console.log('before writes')
stream.write('hello')
stream.async = false // <-- no-op, stream already async
stream.write('world')
console.log('after writes')
// actual output:
// before writes
// after writes
// hello
// world
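The one-way switch can be sketched with a setter that only honours false to true. OneWayAsync below is a hypothetical toy, not Minipass's source. It defers 'data' with process.nextTick when async and reproduces the ordering shown above:

```javascript
const { EventEmitter } = require('node:events')

// Hypothetical toy: an `async` flag that can be switched on but never off.
class OneWayAsync extends EventEmitter {
  #async = false

  get async() {
    return this.#async
  }

  set async(value) {
    // false -> true is honoured; any attempt to go back is ignored
    this.#async = this.#async || Boolean(value)
  }

  write(chunk) {
    if (this.#async) process.nextTick(() => this.emit('data', chunk))
    else this.emit('data', chunk)
    return true
  }
}

const log = []
const s = new OneWayAsync()
s.on('data', chunk => log.push(chunk))
s.async = true
s.write('hello')
s.async = false // ignored: the stream stays async
s.write('world')
log.push('after writes')
// right now log is ['after writes']; once the queued ticks run it becomes
// ['after writes', 'hello', 'world']
```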
Node.js core streams will optimistically fill up a buffer, returning true on all writes until the limit is hit, even if the data has nowhere to go. Then, they will not attempt to draw more data in until the buffer size dips below a minimum value.
Minipass streams are much simpler. The write() method will return true if the data has somewhere to go (which is to say, given the timing guarantees, that the data is already there by the time write() returns).
If the data has nowhere to go, then write() returns false, and the data sits in a buffer, to be drained out immediately as soon as anyone consumes it.
Since nothing is ever buffered unnecessarily, there is much less data copying, and less bookkeeping about buffer capacity levels.
Since data written to a Minipass stream is immediately written all the way through the pipeline, and write() always returns true/false based on whether the data was fully flushed, backpressure is communicated immediately to the upstream caller. This minimizes buffering.
Consider this case:
const { PassThrough } = require('stream')
const p1 = new PassThrough({ highWaterMark: 1024 })
const p2 = new PassThrough({ highWaterMark: 1024 })
const p3 = new PassThrough({ highWaterMark: 1024 })
const p4 = new PassThrough({ highWaterMark: 1024 })
p1.pipe(p2).pipe(p3).pipe(p4)
p4.on('data', () => console.log('made it through'))
// this returns false and buffers, then writes to p2 on next tick (1)
// p2 returns false and buffers, pausing p1, then writes to p3 on next tick (2)
// p3 returns false and buffers, pausing p2, then writes to p4 on next tick (3)
// p4 returns false and buffers, pausing p3, then emits 'data' and 'drain'
// on next tick (4)
// p3 sees p4's 'drain' event, and calls resume(), emitting 'resume' and
// 'drain' on next tick (5)
// p2 sees p3's 'drain', calls resume(), emits 'resume' and 'drain' on next tick (6)
// p1 sees p2's 'drain', calls resume(), emits 'resume' and 'drain' on next
// tick (7)
p1.write(Buffer.alloc(2048)) // returns false
Along the way, the data was buffered and deferred at each stage, and multiple event deferrals happened, for an unblocked pipeline where it was perfectly safe to write all the way through!
Furthermore, setting a highWaterMark of 1024 might lead someone reading the code to think an advisory maximum of 1KiB is being set for the pipeline. However, the actual advisory buffering level is the sum of the highWaterMark values, since each stream has its own bucket.
Consider the Minipass case:
const { Minipass } = require('minipass')
const m1 = new Minipass()
const m2 = new Minipass()
const m3 = new Minipass()
const m4 = new Minipass()
m1.pipe(m2).pipe(m3).pipe(m4)
m4.on('data', () => console.log('made it through'))
// m1 is flowing, so it writes the data to m2 immediately
// m2 is flowing, so it writes the data to m3 immediately
// m3 is flowing, so it writes the data to m4 immediately
// m4 is flowing, so it fires the 'data' event immediately, returns true
// m4's write returned true, so m3 is still flowing, returns true
// m3's write returned true, so m2 is still flowing, returns true
// m2's write returned true, so m1 is still flowing, returns true
// No event deferrals or buffering along the way!
m1.write(Buffer.alloc(2048)) // returns true
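To see how a true or false result can travel back up to the caller in a single synchronous call, here is a hypothetical toy Stage class. It is loosely modelled on the behaviour described above and is not Minipass's source. It has no pause, resume, or 'drain' logic, so a chunk that gets buffered simply stays put:

```javascript
const { EventEmitter } = require('node:events')

// Hypothetical toy stage: write() forwards synchronously and returns false
// as soon as any stage in the chain has nowhere to put the chunk.
class Stage extends EventEmitter {
  constructor() {
    super()
    this.dests = []
    this.buffer = []
  }

  pipe(dest) {
    this.dests.push(dest)
    return dest
  }

  write(chunk) {
    if (this.dests.length === 0 && this.listenerCount('data') === 0) {
      this.buffer.push(chunk) // nowhere to go: hold it and report backpressure
      return false
    }
    this.emit('data', chunk)
    // true only if every downstream stage accepted the chunk immediately
    let ok = true
    for (const dest of this.dests) ok = dest.write(chunk) && ok
    return ok
  }
}

const [m1, m2, m3, m4] = [new Stage(), new Stage(), new Stage(), new Stage()]
m1.pipe(m2).pipe(m3).pipe(m4)

const before = m1.write('x') // false: m4 has no consumer, so 'x' sits in m4.buffer
let arrived = 0
m4.on('data', () => { arrived++ })
const after = m1.write('y') // true: 'y' reached m4's listener before write() returned
```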
It is extremely unlikely that you don't want to buffer any data written, or that you ever want to buffer data that can be flushed all the way through. Neither node-core streams nor Minipass ever fail to buffer written data, but node-core streams do a lot of unnecessary buffering and pausing.
As always, the faster implementation is the one that does less stuff and waits less time to do it.
If a stream is not paused, and end() is called before writing any data into it, then it will emit end immediately.
If you have logic that occurs on the end event which you don't want to potentially happen immediately (for example, closing file descriptors, moving on to the next entry in an archive parse stream, etc.), then be sure to call stream.pause() on creation, and then stream.resume() once you are ready to respond to the end event.
However, this is usually not a problem, because Minipass streams emit end again when asked.
One hazard of immediately emitting 'end' is that you may not yet have had a chance to add a listener. To avoid this hazard, Minipass streams safely re-emit the 'end' event if a new listener is added after 'end' has been emitted.
I.e., if you do stream.on('end', someFunction), and the stream has already emitted end, then it will call the handler right away. (You can think of this somewhat like attaching a new .then(fn) to a previously-resolved Promise.)
To avoid calling handlers multiple times when they would not expect multiple ends to occur, all listeners are removed from the 'end' event whenever it is emitted.
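The two rules above can be sketched on a plain EventEmitter. LateEnd is a hypothetical toy, not Minipass's code. It runs a late 'end' handler immediately and clears 'end' listeners after firing, so no handler ever sees a second 'end':

```javascript
const { EventEmitter } = require('node:events')

// Hypothetical toy of the late-'end'-listener rule.
class LateEnd extends EventEmitter {
  constructor() {
    super()
    this.emittedEnd = false
  }

  end() {
    this.emittedEnd = true
    this.emit('end')
    this.removeAllListeners('end') // handlers never see a second 'end'
  }

  on(ev, fn) {
    if (ev === 'end' && this.emittedEnd) {
      fn.call(this) // like .then() on an already-resolved Promise
      return this
    }
    return super.on(ev, fn)
  }
}

const calls = []
const s = new LateEnd()
s.on('end', () => calls.push('early'))
s.end()
s.on('end', () => calls.push('late')) // runs immediately
```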
The most recent error object passed to the 'error' event is stored on the stream. If a new 'error' event handler is added, and an error was previously emitted, then the event handler will be called immediately (or on process.nextTick in the case of async streams).
This makes it much more difficult to end up trying to interact with a broken stream, if the error handler is added after an error was previously emitted.
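A minimal sketch of the synchronous case, using a hypothetical StickyError class (not Minipass's code): emit() remembers the last error, and on() replays it to any 'error' handler added afterwards:

```javascript
const { EventEmitter } = require('node:events')

// Hypothetical toy of error replay for late 'error' handlers.
class StickyError extends EventEmitter {
  emit(ev, ...args) {
    if (ev === 'error') this.lastError = args[0]
    return super.emit(ev, ...args)
  }

  on(ev, fn) {
    const ret = super.on(ev, fn)
    if (ev === 'error' && this.lastError !== undefined) fn.call(this, this.lastError)
    return ret
  }
}

const s = new StickyError()
s.on('error', () => {}) // a handler must exist, or emit('error') throws
s.emit('error', new Error('boom'))
let late
s.on('error', er => { late = er.message }) // called right away with 'boom'
```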
A "tee stream" is a stream piping to multiple destinations:
const tee = new Minipass()
tee.pipe(dest1)
tee.pipe(dest2)
tee.write('foo') // goes to both destinations
Since Minipass streams immediately process any pending data through the pipeline when a new pipe destination is added, this can have surprising effects, especially when a stream comes in from some other function and may or may not have data in its buffer.
// WARNING! WILL LOSE DATA!
const src = new Minipass()
src.write('foo')
src.pipe(dest1) // 'foo' chunk flows to dest1 immediately, and is gone
src.pipe(dest2) // gets nothing!
One solution is to create a dedicated tee-stream junction that pipes to both locations, and then pipe to that instead.
// Safe example: tee to both places
const src = new Minipass()
src.write('foo')
const tee = new Minipass()
tee.pipe(dest1)
tee.pipe(dest2)
src.pipe(tee) // tee gets 'foo', pipes to both locations
The same caveat applies to on('data') event listeners. The first one added will immediately receive all of the data, leaving nothing for the second:
// WARNING! WILL LOSE DATA!
const src = new Minipass()
src.write('foo')
src.on('data', handler1) // receives 'foo' right away
src.on('data', handler2) // nothing to see here!
A dedicated tee stream works in this case as well:
// Safe example: tee to both data handlers
const src = new Minipass()
src.write('foo')
const tee = new Minipass()
tee.on('data', handler1)
tee.on('data', handler2)
src.pipe(tee)
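Why the tee works can be seen with a hypothetical toy Junction class (not Minipass's code). Buffered chunks go to whichever 'data' listener appears first. So every handler has to be attached to the tee before anything is fed into it:

```javascript
const { EventEmitter } = require('node:events')

// Hypothetical toy: buffers until the first 'data' listener, then flushes to it.
class Junction extends EventEmitter {
  constructor() {
    super()
    this.buffer = []
  }

  write(chunk) {
    if (this.listenerCount('data') === 0) this.buffer.push(chunk)
    else this.emit('data', chunk)
  }

  on(ev, fn) {
    super.on(ev, fn)
    if (ev === 'data') for (const c of this.buffer.splice(0)) this.emit('data', c)
    return this
  }
}

// Unsafe: the second handler misses 'foo'
const unsafe = new Junction()
const a = []
const b = []
unsafe.write('foo')
unsafe.on('data', c => a.push(c))
unsafe.on('data', c => b.push(c))

// Safe: attach both handlers to a fresh tee, then feed it
const src = new Junction()
src.write('foo')
const tee = new Junction()
const c1 = []
const c2 = []
tee.on('data', c => c1.push(c))
tee.on('data', c => c2.push(c))
src.on('data', chunk => tee.write(chunk)) // stands in for src.pipe(tee)
```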
All of the hazards in this section are avoided by setting { async: true } in the Minipass constructor, or by setting stream.async = true afterwards. Note that this does add some overhead, so it should only be done in cases where you are willing to lose a bit of performance in order to avoid having to refactor program logic.
It's a stream! Use it like a stream and it'll most likely do what you want.
import { Minipass } from 'minipass'
const mp = new Minipass(options) // options is optional
mp.write('foo')
mp.pipe(someOtherStream)
mp.end('bar')
- encoding - How would you like the data coming out of the stream to be encoded? Accepts any values that can be passed to Buffer.toString().
- objectMode - Emit data exactly as it comes in. This can only be set at instantiation; writing something other than a string or Buffer to a stream created without objectMode throws an error. Setting objectMode: true will prevent setting any encoding value.
- async - Defaults to false. Set to true to defer data emission until next tick. This reduces performance slightly, but makes Minipass streams use timing behavior closer to Node core streams. See the timing notes above for more details.
- signal - An AbortSignal that will cause the stream to unhook itself from everything and become as inert as possible. Note that providing a signal parameter will make 'error' events no longer throw if they are unhandled, but they will still be emitted to handlers if any are attached.
Implements the user-facing portions of Node.js's Readable and Writable streams.
- write(chunk, [encoding], [callback]) - Put data in. (Note that, in the base Minipass class, the same data will come out.) Returns false if the stream will buffer the next write, or true if it's still in "flowing" mode.
- end([chunk, [encoding]], [callback]) - Signal that you have no more data to write. This will queue an end event to be fired when all the data has been consumed.
- pause() - No more data for a while, please. This also prevents end from being emitted for empty streams until the stream is resumed.
- resume() - Resume the stream. If there's data in the buffer, it is all discarded. Any buffered events are immediately emitted.
- pipe(dest, [options]) - Send all output to the stream provided. When data is emitted, it is immediately written to any and all pipe destinations. (Or written on next tick in async mode.) Options:
  - options.end - Boolean, end the destination stream when the source stream ends. Default true.
  - options.proxyErrors - Boolean, proxy error events from the source stream to the destination stream. Note that errors are not proxied after the pipeline terminates, either due to the source emitting 'end' or manually unpiping with src.unpipe(dest). Default false.
- unpipe(dest) - Stop piping to the destination stream. This is immediate, meaning that any asynchronously queued data will not make it to the destination when running in async mode.
- on(ev, fn), emit(ev, ...args) - Minipass streams are EventEmitters. Some events are given special treatment, however. (See the events list below.)
- promise() - Returns a Promise that resolves when the stream emits end, or rejects if the stream emits error.
- collect() - Returns a Promise that resolves on end with an array containing each chunk of data that was emitted, or rejects if the stream emits error. Note that this consumes the stream data.
- concat() - Same as collect(), but concatenates the data into a single Buffer object (or a string, if an encoding is set). Will reject the returned promise if the stream is in objectMode, or if it goes into objectMode by the end of the data.
- read(n) - Consume n bytes of data out of the buffer. If n is not provided, then consume all of it. If n bytes are not available, then it returns null. Note: consuming streams in this way is less efficient, and can lead to unnecessary Buffer copying.
- destroy([er]) - Destroy the stream. If an error is provided, then an 'error' event is emitted. If the stream has a close() method, and has not emitted a 'close' event yet, then stream.close() will be called. Any Promises returned by .promise(), .collect() or .concat() will be rejected. After being destroyed, writing to the stream will emit an error. No more data will be emitted if the stream is destroyed, even if it was previously buffered.
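promise(), collect() and concat() all share one contract: gather on 'data', resolve on 'end', reject on 'error'. Here is a rough approximation of collect() over a plain EventEmitter. collectFrom is a hypothetical name, and this is not Minipass's implementation:

```javascript
const { EventEmitter } = require('node:events')

// Hypothetical helper approximating the documented collect() contract.
function collectFrom(emitter) {
  return new Promise((resolve, reject) => {
    const chunks = []
    emitter.on('data', chunk => chunks.push(chunk))
    emitter.once('end', () => resolve(chunks))
    emitter.once('error', reject)
  })
}

const src = new EventEmitter()
const done = collectFrom(src)
src.emit('data', 'a')
src.emit('data', 'b')
src.emit('end') // done resolves with ['a', 'b']
```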
- bufferLength - Read-only. Total number of bytes buffered, or in the case of objectMode, the total number of objects.
- encoding - Read-only. The encoding that has been set.
- flowing - Read-only. Boolean indicating whether a chunk written to the stream will be immediately emitted.
- emittedEnd - Read-only. Boolean indicating whether the end-ish events (i.e., end, prefinish, finish) have been emitted. Note that listening on any end-ish event will immediately re-emit it if it has already been emitted.
- writable - Whether the stream is writable. Default true. Set to false when end() is called.
- readable - Whether the stream is readable. Default true.
- pipes - An array of Pipe objects referencing streams that this stream is piping into.
- destroyed - A getter that indicates whether the stream was destroyed.
- paused - True if the stream has been explicitly paused, otherwise false.
- objectMode - Indicates whether the stream is in objectMode.
- aborted - Read-only property set when the AbortSignal dispatches an abort event.
- data - Emitted when there's data to read. Argument is the data to read. This is never emitted while not flowing. If a listener is attached, that will resume the stream.
- end - Emitted when there's no more data to read. This will be emitted immediately for empty streams when end() is called. If a listener is attached, and end was already emitted, then it will be emitted again. All listeners are removed when end is emitted.
- prefinish - An end-ish event that follows the same logic as end and is emitted in the same conditions where end is emitted. Emitted after 'end'.
- finish - An end-ish event that follows the same logic as end and is emitted in the same conditions where end is emitted. Emitted after 'prefinish'.
- close - An indication that an underlying resource has been released. Minipass does not emit this event, but will defer it until after end has been emitted, since it throws off some stream libraries otherwise.
- drain - Emitted when the internal buffer empties, and it is again suitable to write() into the stream.
- readable - Emitted when data is buffered and ready to be read by a consumer.
- resume - Emitted when the stream changes state from buffering to flowing mode. (I.e., when resume is called, pipe is called, or a data event listener is added.)
- Minipass.isStream(stream) - Returns true if the argument is a stream, and false otherwise. To be considered a stream, the object must be either an instance of Minipass, or an EventEmitter that has either a pipe() method, or both write() and end() methods. (Pretty much any stream in node-land will return true for this.)
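In real code, call Minipass.isStream(). The hypothetical looksLikeStream below only restates the rule above, so you can see each branch of the check. Its optional second parameter stands in for the Minipass class, which lets the sketch run without the package installed. Minipass's actual implementation may differ in detail:

```javascript
const { EventEmitter } = require('node:events')
const { PassThrough } = require('node:stream')

// Hypothetical restatement of the documented isStream() rule.
function looksLikeStream(s, MinipassClass) {
  if (!s) return false
  if (MinipassClass && s instanceof MinipassClass) return true
  return s instanceof EventEmitter && (
    typeof s.pipe === 'function' ||
    (typeof s.write === 'function' && typeof s.end === 'function')
  )
}

const results = {
  core: looksLikeStream(new PassThrough()),
  writerOnly: looksLikeStream(Object.assign(new EventEmitter(), {
    write() {},
    end() {},
  })),
  plainEmitter: looksLikeStream(new EventEmitter()),
  duckNotEmitter: looksLikeStream({ pipe() {} }),
}
```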
Here are some examples of things you can do with Minipass streams.
mp.promise().then(
  () => {
    // stream is finished
  },
  er => {
    // stream emitted an error
  }
)
mp.collect().then(all => {
  // all is an array of all the data emitted
  // encoding is supported in this case, so
  // the result will be a collection of strings if
  // an encoding is specified, or buffers/objects if not.
  //
  // In an async function, you may do
  // const data = await stream.collect()
})
This is a bit slower because it concatenates the data into one chunk for you, but if you're going to do it yourself anyway, it's convenient this way:
mp.concat().then(onebigchunk => {
  // onebigchunk is a string if the stream
  // had an encoding set, or a buffer otherwise.
})
You can iterate over streams synchronously or asynchronously on platforms that support it.
Synchronous iteration will end when the currently available data is consumed, even if the end event has not been reached. In string and buffer mode, the data is concatenated, so unless multiple writes are occurring in the same tick as the read(), sync iteration loops will generally only have a single iteration.
To consume chunks in this way exactly as they have been written, with no flattening, create the stream with the { objectMode: true } option.
const mp = new Minipass({ objectMode: true })
mp.write('a')
mp.write('b')
for (let letter of mp) {
  console.log(letter) // a, b
}
mp.write('c')
mp.write('d')
for (let letter of mp) {
  console.log(letter) // c, d
}
mp.write('e')
mp.end()
for (let letter of mp) {
  console.log(letter) // e
}
for (let letter of mp) {
  console.log(letter) // nothing
}
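The sync-iteration rule can be shown in isolation with a hypothetical toy, SyncBuffer (an invented name, not part of minipass). Its iterator yields only what is buffered at that moment, so each loop stops as soon as the buffer is empty, whether or not end() has been called:

```javascript
// Hypothetical toy of synchronous iteration over an object-mode buffer.
class SyncBuffer {
  constructor() {
    this.buffer = []
    this.ended = false
  }

  write(obj) {
    this.buffer.push(obj)
  }

  end() {
    this.ended = true // recorded, but sync iteration does not wait for it
  }

  *[Symbol.iterator]() {
    while (this.buffer.length > 0) yield this.buffer.shift()
  }
}

const batches = []
const mp = new SyncBuffer()
mp.write('a')
mp.write('b')
batches.push([...mp]) // ['a', 'b']
mp.write('c')
mp.write('d')
batches.push([...mp]) // ['c', 'd']
mp.write('e')
mp.end()
batches.push([...mp]) // ['e']
batches.push([...mp]) // []
```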
Asynchronous iteration will continue until the end event is reached, consuming all of the data.
const mp = new Minipass({ encoding: 'utf8' })
// some source of some data
let i = 5
const inter = setInterval(() => {
  if (i-- > 0) mp.write(Buffer.from('foo\n', 'utf8'))
  else {
    mp.end()
    clearInterval(inter)
  }
}, 100)
// consume the data with asynchronous iteration
async function consume() {
  for await (let chunk of mp) {
    console.log(chunk)
  }
  return 'ok'
}
consume().then(res => console.log(res))
// logs `foo\n` 5 times, and then `ok`
class Logger extends Minipass {
  write(chunk, encoding, callback) {
    console.log('WRITE', chunk, encoding)
    return super.write(chunk, encoding, callback)
  }
  end(chunk, encoding, callback) {
    console.log('END', chunk, encoding)
    return super.end(chunk, encoding, callback)
  }
}
someSource.pipe(new Logger()).pipe(someDest)
// js classes are fun
someSource
  .pipe(
    new (class extends Minipass {
      emit(ev, ...data) {
        // let's also log events, because debugging some weird thing
        console.log('EMIT', ev)
        return super.emit(ev, ...data)
      }
      write(chunk, encoding, callback) {
        console.log('WRITE', chunk, encoding)
        return super.write(chunk, encoding, callback)
      }
      end(chunk, encoding, callback) {
        console.log('END', chunk, encoding)
        return super.end(chunk, encoding, callback)
      }
    })()
  )
  .pipe(someDest)
class SlowEnd extends Minipass {
  emit(ev, ...args) {
    if (ev === 'end') {
      console.log('going to end, hold on a sec')
      setTimeout(() => {
        console.log('ok, ready to end now')
        super.emit('end', ...args)
      }, 100)
      return true
    } else {
      return super.emit(ev, ...args)
    }
  }
}
class NDJSONEncode extends Minipass {
  write(obj, cb) {
    try {
      // JSON.stringify can throw, emit an error on that
      return super.write(JSON.stringify(obj) + '\n', 'utf8', cb)
    } catch (er) {
      this.emit('error', er)
      return false
    }
  }
  end(obj, cb) {
    if (typeof obj === 'function') {
      cb = obj
      obj = undefined
    }
    if (obj !== undefined) {
      this.write(obj)
    }
    return super.end(cb)
  }
}
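class NDJSONDecode extends Minipass {
  constructor(options) {
    // always be in object mode, as far as Minipass is concerned
    super({ objectMode: true })
    this._jsonBuffer = ''
  }
  write(chunk, encoding, cb) {
    if (typeof encoding === 'function') {
      cb = encoding
      encoding = undefined
    }
    if (typeof chunk === 'string' && typeof encoding === 'string' && encoding !== 'utf8') {
      chunk = Buffer.from(chunk, encoding).toString()
    } else if (Buffer.isBuffer(chunk)) {
      chunk = chunk.toString()
    }
    const jsonData = (this._jsonBuffer + chunk).split('\n')
    // keep the trailing partial line until the rest of it arrives
    this._jsonBuffer = jsonData.pop()
    for (let i = 0; i < jsonData.length; i++) {
      try {
        // JSON.parse can throw, emit an error on that
        super.write(JSON.parse(jsonData[i]))
      } catch (er) {
        this.emit('error', er)
        continue
      }
    }
    if (cb) cb()
    return this.flowing
  }
  end(chunk, encoding, cb) {
    if (typeof chunk === 'function') {
      cb = chunk
      chunk = undefined
    } else if (typeof encoding === 'function') {
      cb = encoding
      encoding = undefined
    }
    if (chunk !== undefined) this.write(chunk, encoding)
    // parse a final line that had no trailing newline
    const rest = this._jsonBuffer
    this._jsonBuffer = ''
    if (rest.trim() !== '') {
      try {
        super.write(JSON.parse(rest))
      } catch (er) {
        this.emit('error', er)
      }
    }
    return super.end(cb)
  }
}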
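The buffering trick inside NDJSONDecode is easier to see on its own. splitLines is a hypothetical helper written for this illustration. It splits on '\n', returns every complete line, and carries the trailing partial line over to the next chunk:

```javascript
// Hypothetical helper isolating the carry-over buffer used by NDJSONDecode.
function splitLines(carry, chunk) {
  const parts = (carry + chunk).split('\n')
  const rest = parts.pop() // '' if the input ended exactly on a newline
  return { lines: parts, rest }
}

let carry = ''
const parsed = []
for (const chunk of ['{"a":1}\n{"b"', ':2}\n', '{"c":3}']) {
  const { lines, rest } = splitLines(carry, chunk)
  carry = rest
  for (const line of lines) parsed.push(JSON.parse(line))
}
// parsed: [{ a: 1 }, { b: 2 }]; carry: '{"c":3}', still waiting for a newline
```

Note that a record split across chunks ('{"b"' plus ':2}') is only parsed once its newline arrives. A line that never gets one stays in carry, so a decoder also needs to flush carry when the stream ends.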