A stream implementation that does more by doing less


isaacs/minipass


A very minimal implementation of a PassThrough stream

It's very fast for objects, strings, and buffers.

Supports pipe()ing (including multi-pipe() and backpressure transmission), buffering data until either a data event handler or pipe() is added (so you don't lose the first chunk), and most other cases where PassThrough is a good idea.

There is a read() method, but it's much more efficient to consume data from this stream via 'data' events or by calling pipe() into some other stream. Calling read() requires the buffer to be flattened in some cases, which requires copying memory.

If you set objectMode: true in the options, then whatever is written will be emitted. Otherwise, it'll do a minimal amount of Buffer copying to ensure proper Streams semantics when read(n) is called.

objectMode can only be set at instantiation. Attempting to write something other than a String or Buffer without having set objectMode in the options will throw an error.

This is not a through or through2 stream. It doesn't transform the data, it just passes it right through. If you want to transform the data, extend the class, and override the write() method. Once you're done transforming the data however you want, call super.write() with the transform output.

For some examples of streams that extend Minipass in various ways, check out:

Usage in TypeScript

The Minipass class takes three type template definitions:

  • RType - the type being read, which defaults to Buffer. If RType is string, then the constructor must get an options object specifying either an encoding or objectMode: true. If it's anything other than string or Buffer, then it must get an options object specifying objectMode: true.
  • WType - the type being written. If RType is Buffer or string, then this defaults to ContiguousData (Buffer, string, ArrayBuffer, or ArrayBufferView). Otherwise, it defaults to RType.
  • Events - type mapping event names to the arguments emitted with that event, which extends Minipass.Events.

To declare types for custom events in subclasses, extend the third parameter with your own event signatures. For example:

    import { Minipass } from 'minipass'

    // a NDJSON stream that emits 'jsonError' when it can't stringify
    export interface Events extends Minipass.Events {
      jsonError: [e: Error]
    }

    export class NDJSONStream extends Minipass<string, any, Events> {
      constructor() {
        super({ objectMode: true })
      }

      // data is type `any` because that's WType
      write(data, encoding, cb) {
        try {
          const json = JSON.stringify(data)
          return super.write(json + '\n', encoding, cb)
        } catch (er) {
          // parentheses matter: `!er instanceof Error` is always false
          if (!(er instanceof Error)) {
            er = Object.assign(new Error('json stringify failed'), {
              cause: er,
            })
          }
          // trying to emit with something OTHER than an error will
          // fail, because we declared the event arguments type.
          this.emit('jsonError', er)
        }
      }
    }

    const s = new NDJSONStream()
    s.on('jsonError', e => {
      // here, TS knows that e is an Error
    })

Emitting/handling events that aren't declared in this way is fine, but the arguments will be typed as unknown.

Differences from Node.js Streams

There are several things that make Minipass streams different from (and in some ways superior to) Node.js core streams.

Please read these caveats if you are familiar with node-core streams and intend to use Minipass streams in your programs.

You can avoid most of these differences entirely (for a very small performance penalty) by setting {async: true} in the constructor options.

Timing

Minipass streams are designed to support synchronous use-cases. Thus, data is emitted as soon as it is available, always. It is buffered until read, but no longer. Another way to look at it is that Minipass streams are exactly as synchronous as the logic that writes into them.

This can be surprising if your code relies on PassThrough.write() always providing data on the next tick rather than the current one, or being able to call resume() and not have the entire buffer disappear immediately.

However, without this synchronicity guarantee, there would be no way for Minipass to achieve the speeds it does, or support the synchronous use cases that it does. Simply put, waiting takes time.

This non-deferring approach makes Minipass streams much easier to reason about, especially in the context of Promises and other flow-control mechanisms.

Example:

    // hybrid module, either works
    import { Minipass } from 'minipass'
    // or (use one form or the other, not both in the same file):
    // const { Minipass } = require('minipass')

    const stream = new Minipass()
    stream.on('data', () => console.log('data event'))
    console.log('before write')
    stream.write('hello')
    console.log('after write')
    // output:
    // before write
    // data event
    // after write

Exception: Async Opt-In

If you wish to have a Minipass stream with behavior that more closely mimics Node.js core streams, you can set the stream in async mode either by setting async: true in the constructor options, or by setting stream.async = true later on.

    // hybrid module, either works
    import { Minipass } from 'minipass'
    // or:
    // const { Minipass } = require('minipass')

    const asyncStream = new Minipass({ async: true })
    asyncStream.on('data', () => console.log('data event'))
    console.log('before write')
    asyncStream.write('hello')
    console.log('after write')
    // output:
    // before write
    // after write
    // data event <-- this is deferred until the next tick

Switching out of async mode is unsafe, as it could cause data corruption, and so is not enabled. Example:

    import { Minipass } from 'minipass'
    const stream = new Minipass({ encoding: 'utf8' })
    stream.on('data', chunk => console.log(chunk))
    stream.async = true
    console.log('before writes')
    stream.write('hello')
    setStreamSyncAgainSomehow(stream) // <-- this doesn't actually exist!
    stream.write('world')
    console.log('after writes')
    // hypothetical output would be:
    // before writes
    // world
    // after writes
    // hello
    // NOT GOOD!

To avoid this problem, once set into async mode, any attempt to make the stream sync again will be ignored.

    const { Minipass } = require('minipass')
    const stream = new Minipass({ encoding: 'utf8' })
    stream.on('data', chunk => console.log(chunk))
    stream.async = true
    console.log('before writes')
    stream.write('hello')
    stream.async = false // <-- no-op, stream already async
    stream.write('world')
    console.log('after writes')
    // actual output:
    // before writes
    // after writes
    // hello
    // world

No High/Low Water Marks

Node.js core streams will optimistically fill up a buffer, returning true on all writes until the limit is hit, even if the data has nowhere to go. Then, they will not attempt to draw more data in until the buffer size dips below a minimum value.

Minipass streams are much simpler. The write() method will return true if the data has somewhere to go (which is to say, given the timing guarantees, that the data is already there by the time write() returns).

If the data has nowhere to go, then write() returns false, and the data sits in a buffer, to be drained out immediately as soon as anyone consumes it.

Since nothing is ever buffered unnecessarily, there is much less copying data, and less bookkeeping about buffer capacity levels.
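A producer can honor this contract with the usual wait-for-'drain' loop. The sketch below is an illustration under assumptions, not part of Minipass: writeAll and demo are hypothetical helpers, and the demo runs against a core PassThrough so that it needs no dependencies. The same loop should apply to any stream that returns false from write() and later emits 'drain'.

```javascript
const { once } = require('events')
const { PassThrough } = require('stream')

// Hypothetical helper: write every chunk, and whenever write() returns
// false, wait for 'drain' before writing the next one.
async function writeAll(dest, chunks) {
  let waits = 0
  for (const chunk of chunks) {
    if (dest.write(chunk) === false) {
      waits++
      await once(dest, 'drain')
    }
  }
  dest.end()
  return waits // how many times the producer had to pause
}

// Demo against a core stream with a deliberately tiny buffer.
async function demo() {
  const dest = new PassThrough({ highWaterMark: 4 })
  const received = []
  dest.on('data', chunk => received.push(chunk.toString()))
  const ended = once(dest, 'end')
  const waits = await writeAll(dest, ['aaaa', 'bbbb', 'cc'])
  await ended
  return { waits, text: received.join('') }
}

demo().then(result => console.log(result))
```

Note that if nothing ever consumes the destination, 'drain' never fires and the loop waits forever, which is the intended effect of backpressure.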

Hazards of Buffering (or: Why Minipass Is So Fast)

Since data written to a Minipass stream is immediately written all the way through the pipeline, and write() always returns true/false based on whether the data was fully flushed, backpressure is communicated immediately to the upstream caller. This minimizes buffering.

Consider this case:

    const { PassThrough } = require('stream')
    const p1 = new PassThrough({ highWaterMark: 1024 })
    const p2 = new PassThrough({ highWaterMark: 1024 })
    const p3 = new PassThrough({ highWaterMark: 1024 })
    const p4 = new PassThrough({ highWaterMark: 1024 })

    p1.pipe(p2).pipe(p3).pipe(p4)
    p4.on('data', () => console.log('made it through'))

    // this returns false and buffers, then writes to p2 on next tick (1)
    // p2 returns false and buffers, pausing p1, then writes to p3 on next tick (2)
    // p3 returns false and buffers, pausing p2, then writes to p4 on next tick (3)
    // p4 returns false and buffers, pausing p3, then emits 'data' and 'drain'
    // on next tick (4)
    // p3 sees p4's 'drain' event, and calls resume(), emitting 'resume' and
    // 'drain' on next tick (5)
    // p2 sees p3's 'drain', calls resume(), emits 'resume' and 'drain' on next tick (6)
    // p1 sees p2's 'drain', calls resume(), emits 'resume' and 'drain' on next
    // tick (7)
    p1.write(Buffer.alloc(2048)) // returns false

Along the way, the data was buffered and deferred at each stage, and multiple event deferrals happened, for an unblocked pipeline where it was perfectly safe to write all the way through!

Furthermore, setting a highWaterMark of 1024 might lead someone reading the code to think an advisory maximum of 1KiB is being set for the pipeline. However, the actual advisory buffering level is the sum of highWaterMark values, since each one has its own bucket.
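As a rough sketch of that arithmetic, the snippet below builds the same four-stage core pipeline and adds up the per-stage limits. The variable names are illustrative only, and the count looks solely at the writable-side limit of each stage.

```javascript
const { PassThrough } = require('stream')

// Four core PassThrough stages, each configured with a 1 KiB limit.
const stages = [1, 2, 3, 4].map(
  () => new PassThrough({ highWaterMark: 1024 })
)

// Wire them up: stages[0] -> stages[1] -> stages[2] -> stages[3]
stages.reduce((src, dest) => src.pipe(dest))

// Each stage keeps its own bucket, so the advisory limits add up.
// (Each stage also has a readable-side buffer with its own
// readableHighWaterMark, which this count ignores.)
const advisoryTotal = stages.reduce(
  (sum, stage) => sum + stage.writableHighWaterMark,
  0
)

// A single 2 KiB write already exceeds the first stage's limit.
const accepted = stages[0].write(Buffer.alloc(2048))

console.log(advisoryTotal) // 4096
console.log(accepted) // false
```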

Consider the Minipass case:

    const m1 = new Minipass()
    const m2 = new Minipass()
    const m3 = new Minipass()
    const m4 = new Minipass()

    m1.pipe(m2).pipe(m3).pipe(m4)
    m4.on('data', () => console.log('made it through'))

    // m1 is flowing, so it writes the data to m2 immediately
    // m2 is flowing, so it writes the data to m3 immediately
    // m3 is flowing, so it writes the data to m4 immediately
    // m4 is flowing, so it fires the 'data' event immediately, returns true
    // m4's write returned true, so m3 is still flowing, returns true
    // m3's write returned true, so m2 is still flowing, returns true
    // m2's write returned true, so m1 is still flowing, returns true
    // No event deferrals or buffering along the way!
    m1.write(Buffer.alloc(2048)) // returns true

It is extremely unlikely that you don't want to buffer any data written, or that you ever want to buffer data that can be flushed all the way through. Neither node-core streams nor Minipass ever fail to buffer written data, but node-core streams do a lot of unnecessary buffering and pausing.

As always, the faster implementation is the one that does less stuff and waits less time to do it.

Immediately emit end for empty streams (when not paused)

If a stream is not paused, and end() is called before writing any data into it, then it will emit end immediately.

If you have logic that occurs on the end event which you don't want to potentially happen immediately (for example, closing file descriptors, moving on to the next entry in an archive parse stream, etc.) then be sure to call stream.pause() on creation, and then stream.resume() once you are ready to respond to the end event.

However, this is usually not a problem because:

Emit end When Asked

One hazard of immediately emitting 'end' is that you may not yet have had a chance to add a listener. In order to avoid this hazard, Minipass streams safely re-emit the 'end' event if a new listener is added after 'end' has been emitted.

That is, if you do stream.on('end', someFunction), and the stream has already emitted end, then it will call the handler right away. (You can think of this somewhat like attaching a new .then(fn) to a previously-resolved Promise.)

To avoid calling handlers multiple times when they would not expect multiple end events, all listeners are removed from the 'end' event whenever it is emitted.
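The idea can be sketched with a plain EventEmitter. This is not Minipass's implementation: ReplayEnd is a hypothetical class written only to illustrate the two rules described here (late 'end' listeners are called right away, and 'end' listeners are dropped once the event has fired).

```javascript
const { EventEmitter } = require('events')

// Hypothetical illustration of "emit end when asked".
class ReplayEnd extends EventEmitter {
  constructor() {
    super()
    this.ended = false
  }

  emit(ev, ...args) {
    const ret = super.emit(ev, ...args)
    if (ev === 'end') {
      // remember that 'end' happened, and drop the handlers so that
      // none of them can be called a second time
      this.ended = true
      this.removeAllListeners('end')
    }
    return ret
  }

  on(ev, fn) {
    if (ev === 'end' && this.ended) {
      // already ended: call the handler right away, like attaching
      // .then(fn) to a previously-resolved Promise
      fn.call(this)
      return this
    }
    return super.on(ev, fn)
  }
}

const calls = []
const s = new ReplayEnd()
s.on('end', () => calls.push('early'))
s.emit('end')
s.on('end', () => calls.push('late')) // runs immediately
s.emit('end') // no listeners remain, so nothing is called again
console.log(calls) // [ 'early', 'late' ]
```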

Emit error When Asked

The most recent error object passed to the 'error' event is stored on the stream. If a new 'error' event handler is added, and an error was previously emitted, then the event handler will be called immediately (or on process.nextTick in the case of async streams).

This makes it much more difficult to end up trying to interact with a broken stream, if the error handler is added after an error was previously emitted.

Impact of "immediate flow" on Tee-streams

A "tee stream" is a stream piping to multiple destinations:

    const tee = new Minipass()
    tee.pipe(dest1)
    tee.pipe(dest2)
    tee.write('foo') // goes to both destinations

Since Minipass streams immediately process any pending data through the pipeline when a new pipe destination is added, this can have surprising effects, especially when a stream comes in from some other function and may or may not have data in its buffer.

    // WARNING! WILL LOSE DATA!
    const src = new Minipass()
    src.write('foo')
    src.pipe(dest1) // 'foo' chunk flows to dest1 immediately, and is gone
    src.pipe(dest2) // gets nothing!

One solution is to create a dedicated tee-stream junction that pipes to both locations, and then pipe to that instead.

    // Safe example: tee to both places
    const src = new Minipass()
    src.write('foo')
    const tee = new Minipass()
    tee.pipe(dest1)
    tee.pipe(dest2)
    src.pipe(tee) // tee gets 'foo', pipes to both locations

The same caveat applies to on('data') event listeners. The first one added will immediately receive all of the data, leaving nothing for the second:

    // WARNING! WILL LOSE DATA!
    const src = new Minipass()
    src.write('foo')
    src.on('data', handler1) // receives 'foo' right away
    src.on('data', handler2) // nothing to see here!

A dedicated tee-stream can be used in this case as well:

    // Safe example: tee to both data handlers
    const src = new Minipass()
    src.write('foo')
    const tee = new Minipass()
    tee.on('data', handler1)
    tee.on('data', handler2)
    src.pipe(tee)

All of the hazards in this section are avoided by setting { async: true } in the Minipass constructor, or by setting stream.async = true afterwards. Note that this does add some overhead, so should only be done in cases where you are willing to lose a bit of performance in order to avoid having to refactor program logic.

USAGE

It's a stream! Use it like a stream and it'll most likely do what you want.

    import { Minipass } from 'minipass'
    const mp = new Minipass(options) // options is optional
    mp.write('foo')
    mp.pipe(someOtherStream)
    mp.end('bar')

OPTIONS

  • encoding - How would you like the data coming out of the stream to be encoded? Accepts any values that can be passed to Buffer.toString().
  • objectMode - Emit data exactly as it comes in. This can only be set at instantiation; writing something other than a string or Buffer without it will throw an error. Setting objectMode: true will prevent setting any encoding value.
  • async - Defaults to false. Set to true to defer data emission until next tick. This reduces performance slightly, but makes Minipass streams use timing behavior closer to Node core streams. See Timing for more details.
  • signal - An AbortSignal that will cause the stream to unhook itself from everything and become as inert as possible. Note that providing a signal parameter will make 'error' events no longer throw if they are unhandled, but they will still be emitted to handlers if any are attached.

API

Implements the user-facing portions of Node.js's Readable and Writable streams.

Methods

  • write(chunk, [encoding], [callback]) - Put data in. (Note that, in the base Minipass class, the same data will come out.) Returns false if the stream will buffer the next write, or true if it's still in "flowing" mode.
  • end([chunk, [encoding]], [callback]) - Signal that you have no more data to write. This will queue an end event to be fired when all the data has been consumed.
  • pause() - No more data for a while, please. This also prevents end from being emitted for empty streams until the stream is resumed.
  • resume() - Resume the stream. If there's data in the buffer, it is all discarded. Any buffered events are immediately emitted.
  • pipe(dest, [options]) - Send all output to the stream provided. When data is emitted, it is immediately written to any and all pipe destinations. (Or written on next tick in async mode.)
    • options.end - Boolean, end the destination stream when the source stream ends. Default true.
    • options.proxyErrors - Boolean, proxy error events from the source stream to the destination stream. Note that errors are not proxied after the pipeline terminates, either due to the source emitting 'end' or manually unpiping with src.unpipe(dest). Default false.
  • unpipe(dest) - Stop piping to the destination stream. This is immediate, meaning that any asynchronously queued data will not make it to the destination when running in async mode.
  • on(ev, fn), emit(ev, ...args) - Minipass streams are EventEmitters. Some events are given special treatment, however. (See below under "events".)
  • promise() - Returns a Promise that resolves when the stream emits end, or rejects if the stream emits error.
  • collect() - Return a Promise that resolves on end with an array containing each chunk of data that was emitted, or rejects if the stream emits error. Note that this consumes the stream data.
  • concat() - Same as collect(), but concatenates the data into a single Buffer object. Will reject the returned promise if the stream is in objectMode, or if it goes into objectMode by the end of the data.
  • read(n) - Consume n bytes of data out of the buffer. If n is not provided, then consume all of it. If n bytes are not available, then it returns null. Note that consuming streams in this way is less efficient, and can lead to unnecessary Buffer copying.
  • destroy([er]) - Destroy the stream. If an error is provided, then an 'error' event is emitted. If the stream has a close() method, and has not emitted a 'close' event yet, then stream.close() will be called. Any Promises returned by .promise(), .collect() or .concat() will be rejected. After being destroyed, writing to the stream will emit an error. No more data will be emitted if the stream is destroyed, even if it was previously buffered.

Properties

  • bufferLength - Read-only. Total number of bytes buffered, or in the case of objectMode, the total number of objects.
  • encoding - Read-only. The encoding that has been set.
  • flowing - Read-only. Boolean indicating whether a chunk written to the stream will be immediately emitted.
  • emittedEnd - Read-only. Boolean indicating whether the end-ish events (that is, end, prefinish, finish) have been emitted. Note that listening on any end-ish event will immediately re-emit it if it has already been emitted.
  • writable - Whether the stream is writable. Default true. Set to false when end() is called.
  • readable - Whether the stream is readable. Default true.
  • pipes - An array of Pipe objects referencing streams that this stream is piping into.
  • destroyed - A getter that indicates whether the stream was destroyed.
  • paused - True if the stream has been explicitly paused, otherwise false.
  • objectMode - Indicates whether the stream is in objectMode.
  • aborted - Read-only property set when the AbortSignal dispatches an abort event.

Events

  • data - Emitted when there's data to read. Argument is the data to read. This is never emitted while not flowing. If a listener is attached, that will resume the stream.
  • end - Emitted when there's no more data to read. This will be emitted immediately for empty streams when end() is called. If a listener is attached, and end was already emitted, then it will be emitted again. All listeners are removed when end is emitted.
  • prefinish - An end-ish event that follows the same logic as end and is emitted in the same conditions where end is emitted. Emitted after 'end'.
  • finish - An end-ish event that follows the same logic as end and is emitted in the same conditions where end is emitted. Emitted after 'prefinish'.
  • close - An indication that an underlying resource has been released. Minipass does not emit this event, but will defer it until after end has been emitted, since it throws off some stream libraries otherwise.
  • drain - Emitted when the internal buffer empties, and it is again suitable to write() into the stream.
  • readable - Emitted when data is buffered and ready to be read by a consumer.
  • resume - Emitted when stream changes state from buffering to flowing mode. (That is, when resume is called, pipe is called, or a data event listener is added.)

Static Methods

  • Minipass.isStream(stream) - Returns true if the argument is a stream, and false otherwise. To be considered a stream, the object must be either an instance of Minipass, or an EventEmitter that has either a pipe() method, or both write() and end() methods. (Pretty much any stream in node-land will return true for this.)
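The rule described above can be restated as a small duck-typing check. The function below is a hypothetical stand-in written with core modules only; it is not the library's code, and it leaves out the "instance of Minipass" branch so that it runs without the package installed.

```javascript
const { EventEmitter } = require('events')
const { PassThrough } = require('stream')

// Hypothetical restatement of the rule: an EventEmitter that has either
// a pipe() method, or both write() and end() methods.
const looksLikeStream = s =>
  s instanceof EventEmitter &&
  (typeof s.pipe === 'function' ||
    (typeof s.write === 'function' && typeof s.end === 'function'))

console.log(looksLikeStream(new PassThrough())) // true
console.log(looksLikeStream(new EventEmitter())) // false, no stream methods
console.log(looksLikeStream({ pipe() {} })) // false, not an EventEmitter
console.log(looksLikeStream(null)) // false
```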

EXAMPLES

Here are some examples of things you can do with Minipass streams.

simple "are you done yet" promise

    mp.promise().then(
      () => {
        // stream is finished
      },
      er => {
        // stream emitted an error
      }
    )

collecting

    mp.collect().then(all => {
      // all is an array of all the data emitted
      // encoding is supported in this case,
      // so the result will be a collection of strings if
      // an encoding is specified, or buffers/objects if not.
      //
      // In an async function, you may do
      // const data = await stream.collect()
    })

collecting into a single blob

This is a bit slower because it concatenates the data into one chunk for you, but if you're going to do it yourself anyway, it's convenient this way:

    mp.concat().then(onebigchunk => {
      // onebigchunk is a string if the stream
      // had an encoding set, or a buffer otherwise.
    })

iteration

You can iterate over streams synchronously or asynchronously in platforms that support it.

Synchronous iteration will end when the currently available data is consumed, even if the end event has not been reached. In string and buffer mode, the data is concatenated, so unless multiple writes are occurring in the same tick as the read(), sync iteration loops will generally only have a single iteration.

To consume chunks in this way exactly as they have been written, with no flattening, create the stream with the { objectMode: true } option.

    const mp = new Minipass({ objectMode: true })
    mp.write('a')
    mp.write('b')
    for (let letter of mp) {
      console.log(letter) // a, b
    }
    mp.write('c')
    mp.write('d')
    for (let letter of mp) {
      console.log(letter) // c, d
    }
    mp.write('e')
    mp.end()
    for (let letter of mp) {
      console.log(letter) // e
    }
    for (let letter of mp) {
      console.log(letter) // nothing
    }

Asynchronous iteration will continue until the end event is reached, consuming all of the data.

    const mp = new Minipass({ encoding: 'utf8' })

    // some source of some data
    let i = 5
    const inter = setInterval(() => {
      if (i-- > 0) mp.write(Buffer.from('foo\n', 'utf8'))
      else {
        mp.end()
        clearInterval(inter)
      }
    }, 100)

    // consume the data with asynchronous iteration
    async function consume() {
      for await (let chunk of mp) {
        console.log(chunk)
      }
      return 'ok'
    }

    consume().then(res => console.log(res))
    // logs `foo\n` 5 times, and then `ok`

subclass that console.log()s everything written into it

    class Logger extends Minipass {
      write(chunk, encoding, callback) {
        console.log('WRITE', chunk, encoding)
        return super.write(chunk, encoding, callback)
      }
      end(chunk, encoding, callback) {
        console.log('END', chunk, encoding)
        return super.end(chunk, encoding, callback)
      }
    }

    someSource.pipe(new Logger()).pipe(someDest)

same thing, but using an inline anonymous class

    // js classes are fun
    someSource
      .pipe(
        new (class extends Minipass {
          emit(ev, ...data) {
            // let's also log events, because debugging some weird thing
            console.log('EMIT', ev)
            return super.emit(ev, ...data)
          }
          write(chunk, encoding, callback) {
            console.log('WRITE', chunk, encoding)
            return super.write(chunk, encoding, callback)
          }
          end(chunk, encoding, callback) {
            console.log('END', chunk, encoding)
            return super.end(chunk, encoding, callback)
          }
        })()
      )
      .pipe(someDest)

subclass that defers 'end' for some reason

    class SlowEnd extends Minipass {
      emit(ev, ...args) {
        if (ev === 'end') {
          console.log('going to end, hold on a sec')
          setTimeout(() => {
            console.log('ok, ready to end now')
            super.emit('end', ...args)
          }, 100)
          return true
        } else {
          return super.emit(ev, ...args)
        }
      }
    }

transform that creates newline-delimited JSON

    class NDJSONEncode extends Minipass {
      write(obj, cb) {
        try {
          // JSON.stringify can throw, emit an error on that
          return super.write(JSON.stringify(obj) + '\n', 'utf8', cb)
        } catch (er) {
          this.emit('error', er)
        }
      }
      end(obj, cb) {
        if (typeof obj === 'function') {
          cb = obj
          obj = undefined
        }
        if (obj !== undefined) {
          this.write(obj)
        }
        return super.end(cb)
      }
    }

transform that parses newline-delimited JSON

    class NDJSONDecode extends Minipass {
      constructor(options) {
        // always be in object mode, as far as Minipass is concerned
        super({ objectMode: true })
        this._jsonBuffer = ''
      }
      write(chunk, encoding, cb) {
        if (
          typeof chunk === 'string' &&
          typeof encoding === 'string' &&
          encoding !== 'utf8'
        ) {
          chunk = Buffer.from(chunk, encoding).toString()
        } else if (Buffer.isBuffer(chunk)) {
          chunk = chunk.toString()
        }
        if (typeof encoding === 'function') {
          cb = encoding
        }
        const jsonData = (this._jsonBuffer + chunk).split('\n')
        this._jsonBuffer = jsonData.pop()
        for (let i = 0; i < jsonData.length; i++) {
          try {
            // JSON.parse can throw, emit an error on that
            super.write(JSON.parse(jsonData[i]))
          } catch (er) {
            this.emit('error', er)
            continue
          }
        }
        if (cb) cb()
      }
    }
