Movatterモバイル変換


[0]ホーム

URL:


Node.js (1)

Node.js v5.12.0 Documentation

Index |View on single page |View as JSON


Table of Contents

Stream#

Stability: 2 - Stable

A stream is an abstract interface implemented by various objects inNode.js. For example arequest to an HTTP server is astream, as isprocess.stdout. Streams are readable, writable, or both. Allstreams are instances ofEventEmitter.

You can load the Stream base classes by doingrequire('stream').There are base classes provided forReadable streams,Writablestreams,Duplex streams, andTransform streams.

This document is split up into 3 sections:

  1. The first section explains the parts of the API that you need to beaware of to use streams in your programs.
  2. The second section explains the parts of the API that you need touse if you implement your own custom streams yourself. The API is designed tomake this easy for you to do.
  3. The third section goes into more depth about how streams work,including some of the internal mechanisms and functions that youshould probably not modify unless you definitely know what you aredoing.

API for Stream Consumers#

Streams can be eitherReadable,Writable, or both (Duplex).

All streams are EventEmitters, but they also have other custom methodsand properties depending on whether they are Readable, Writable, orDuplex.

If a stream is both Readable and Writable, then it implements all ofthe methods and events. So, aDuplex orTransform stream isfully described by this API, though their implementation may besomewhat different.

It is not necessary to implement Stream interfaces in order to consumestreams in your programs. If youare implementing streaminginterfaces in your own program, please also refer toAPI for Stream Implementors.

Almost all Node.js programs, no matter how simple, use Streams in someway. Here is an example of using Streams in an Node.js program:

const http = require('http');var server = http.createServer( (req, res) => {  // req is an http.IncomingMessage, which is a Readable Stream  // res is an http.ServerResponse, which is a Writable Stream  var body = '';  // we want to get the data as utf8 strings  // If you don't set an encoding, then you'll get Buffer objects  req.setEncoding('utf8');  // Readable streams emit 'data' events once a listener is added  req.on('data', (chunk) => {    body += chunk;  });  // the end event tells you that you have entire body  req.on('end', () => {    try {      var data = JSON.parse(body);    } catch (er) {      // uh oh!  bad json!      res.statusCode = 400;      return res.end(`error: ${er.message}`);    }    // write back something interesting to the user:    res.write(typeof data);    res.end();  });});server.listen(1337);// $ curl localhost:1337 -d '{}'// object// $ curl localhost:1337 -d '"foo"'// string// $ curl localhost:1337 -d 'not json'// error: Unexpected token o

Class: stream.Duplex#

Duplex streams are streams that implement both theReadable andWritable interfaces.

Examples of Duplex streams include:

Class: stream.Readable#

The Readable stream interface is the abstraction for asource ofdata that you are reading from. In other words, data comesout of aReadable stream.

A Readable stream will not start emitting data until you indicate thatyou are ready to receive it.

Readable streams have two "modes": aflowing mode and apausedmode. When in flowing mode, data is read from the underlying systemand provided to your program as fast as possible. In paused mode, youmust explicitly callstream.read() to get chunks of data out.Streams start out in paused mode.

Note: If no data event handlers are attached, and there are nostream.pipe() destinations, and the stream is switched into flowingmode, then data will be lost.

You can switch to flowing mode by doing any of the following:

You can switch back to paused mode by doing either of the following:

  • If there are no pipe destinations, by calling thestream.pause() method.
  • If there are pipe destinations, by removing any'data' eventhandlers, and removing all pipe destinations by calling thestream.unpipe() method.

Note that, for backwards compatibility reasons, removing'data'event handlers willnot automatically pause the stream. Also, ifthere are piped destinations, then callingstream.pause() willnot guarantee that the stream willremain paused once thosedestinations drain and ask for more data.

Examples of readable streams include:

Event: 'close'#

Emitted when the stream and any of its underlying resources (a filedescriptor, for example) have been closed. The event indicates thatno more events will be emitted, and no further computation will occur.

Not all streams will emit the'close' event.

Event: 'data'#

Attaching a'data' event listener to a stream that has not beenexplicitly paused will switch the stream into flowing mode. Data willthen be passed as soon as it is available.

If you just want to get all the data out of the stream as fast aspossible, this is the best way to do so.

var readable = getReadableStreamSomehow();readable.on('data', (chunk) => {  console.log('got %d bytes of data', chunk.length);});

Event: 'end'#

This event fires when there will be no more data to read.

Note that the'end' eventwill not fire unless the data iscompletely consumed. This can be done by switching into flowing mode,or by callingstream.read() repeatedly until you get to theend.

var readable = getReadableStreamSomehow();readable.on('data', (chunk) => {  console.log('got %d bytes of data', chunk.length);});readable.on('end', () => {  console.log('there will be no more data.');});

Event: 'error'#

Emitted if there was an error receiving data.

Event: 'readable'#

When a chunk of data can be read from the stream, it will emit a'readable' event.

In some cases, listening for a'readable' event will cause some datato be read into the internal buffer from the underlying system, if ithadn't already.

var readable = getReadableStreamSomehow();readable.on('readable', () => {  // there is some data to read now});

Once the internal buffer is drained, a'readable' event will fireagain when more data is available.

The'readable' event is not emitted in the "flowing" mode with thesole exception of the last one, on end-of-stream.

The'readable' event indicates that the stream has new information:either new data is available or the end of the stream has been reached.In the former case,stream.read() will return that data. In thelatter case,stream.read() will return null. For instance, inthe following example,foo.txt is an empty file:

const fs = require('fs');var rr = fs.createReadStream('foo.txt');rr.on('readable', () => {  console.log('readable:', rr.read());});rr.on('end', () => {  console.log('end');});

The output of running this script is:

$ node test.jsreadable: nullend

readable.isPaused()#

This method returns whether or not thereadable has beenexplicitlypaused by client code (usingstream.pause() without acorrespondingstream.resume()).

var readable = new stream.Readablereadable.isPaused() // === falsereadable.pause()readable.isPaused() // === truereadable.resume()readable.isPaused() // === false

readable.pause()#

  • Return:this

This method will cause a stream in flowing mode to stop emitting'data' events, switching out of flowing mode. Any data that becomesavailable will remain in the internal buffer.

var readable = getReadableStreamSomehow();readable.on('data', (chunk) => {  console.log('got %d bytes of data', chunk.length);  readable.pause();  console.log('there will be no more data for 1 second');  setTimeout(() => {    console.log('now data will start flowing again');    readable.resume();  }, 1000);});

readable.pipe(destination[, options])#

This method pulls all the data out of a readable stream, and writes itto the supplied destination, automatically managing the flow so thatthe destination is not overwhelmed by a fast readable stream.

Multiple destinations can be piped to safely.

var readable = getReadableStreamSomehow();var writable = fs.createWriteStream('file.txt');// All the data from readable goes into 'file.txt'readable.pipe(writable);

This function returns the destination stream, so you can set up pipechains like so:

var r = fs.createReadStream('file.txt');var z = zlib.createGzip();var w = fs.createWriteStream('file.txt.gz');r.pipe(z).pipe(w);

For example, emulating the Unixcat command:

process.stdin.pipe(process.stdout);

By defaultstream.end() is called on the destination when thesource stream emits'end', so thatdestination is no longer writable.Pass{ end: false } asoptions to keep the destination stream open.

This keepswriter open so that "Goodbye" can be written at theend.

reader.pipe(writer, { end: false });reader.on('end', () => {  writer.end('Goodbye\n');});

Note thatprocess.stderr andprocess.stdout are never closed untilthe process exits, regardless of the specified options.

readable.read([size])#

Theread() method pulls some data out of the internal buffer andreturns it. If there is no data available, then it will returnnull.

If you pass in asize argument, then it will return that manybytes. Ifsize bytes are not available, then it will returnnull,unless we've ended, in which case it will return the data remainingin the buffer.

If you do not specify asize argument, then it will return all thedata in the internal buffer.

This method should only be called in paused mode. In flowing mode,this method is called automatically until the internal buffer isdrained.

var readable = getReadableStreamSomehow();readable.on('readable', () => {  var chunk;  while (null !== (chunk = readable.read())) {    console.log('got %d bytes of data', chunk.length);  }});

If this method returns a data chunk, then it will also trigger theemission of a'data' event.

Note that callingstream.read([size]) after the'end'event has been triggered will returnnull. No runtime error will be raised.

readable.resume()#

  • Return:this

This method will cause the readable stream to resume emitting'data'events.

This method will switch the stream into flowing mode. If you donotwant to consume the data from a stream, but youdo want to get toits'end' event, you can callstream.resume() to openthe flow of data.

var readable = getReadableStreamSomehow();readable.resume();readable.on('end', () => {  console.log('got to the end, but did not read anything');});

readable.setEncoding(encoding)#

  • encoding<String> The encoding to use.
  • Return:this

Call this function to cause the stream to return strings of the specifiedencoding instead of Buffer objects. For example, if you doreadable.setEncoding('utf8'), then the output data will be interpreted asUTF-8 data, and returned as strings. If you doreadable.setEncoding('hex'),then the data will be encoded in hexadecimal string format.

This properly handles multi-byte characters that would otherwise bepotentially mangled if you simply pulled the Buffers directly andcalledbuf.toString(encoding) on them. If you want to read the dataas strings, always use this method.

Also you can disable any encoding at all withreadable.setEncoding(null).This approach is very useful if you deal with binary data or with largemulti-byte strings spread out over multiple chunks.

var readable = getReadableStreamSomehow();readable.setEncoding('utf8');readable.on('data', (chunk) => {  assert.equal(typeof chunk, 'string');  console.log('got %d characters of string data', chunk.length);});

readable.unpipe([destination])#

This method will remove the hooks set up for a previousstream.pipe()call.

If the destination is not specified, then all pipes are removed.

If the destination is specified, but no pipe is set up for it, thenthis is a no-op.

var readable = getReadableStreamSomehow();var writable = fs.createWriteStream('file.txt');// All the data from readable goes into 'file.txt',// but only for the first secondreadable.pipe(writable);setTimeout(() => {  console.log('stop writing to file.txt');  readable.unpipe(writable);  console.log('manually close the file stream');  writable.end();}, 1000);

readable.unshift(chunk)#

This is useful in certain cases where a stream is being consumed by aparser, which needs to "un-consume" some data that it hasoptimistically pulled out of the source, so that the stream can bepassed on to some other party.

Note thatstream.unshift(chunk) cannot be called after the'end' eventhas been triggered; a runtime error will be raised.

If you find that you must often callstream.unshift(chunk) in yourprograms, consider implementing aTransform stream instead. (SeeAPIfor Stream Implementors.)

// Pull off a header delimited by \n\n// use unshift() if we get too much// Call the callback with (error, header, stream)const StringDecoder = require('string_decoder').StringDecoder;function parseHeader(stream, callback) {  stream.on('error', callback);  stream.on('readable', onReadable);  var decoder = new StringDecoder('utf8');  var header = '';  function onReadable() {    var chunk;    while (null !== (chunk = stream.read())) {      var str = decoder.write(chunk);      if (str.match(/\n\n/)) {        // found the header boundary        var split = str.split(/\n\n/);        header += split.shift();        var remaining = split.join('\n\n');        var buf = new Buffer(remaining, 'utf8');        if (buf.length)          stream.unshift(buf);        stream.removeListener('error', callback);        stream.removeListener('readable', onReadable);        // now the body of the message can be read from the stream.        callback(null, header, stream);      } else {        // still reading the header.        header += str;      }    }  }}

Note that, unlikestream.push(chunk),stream.unshift(chunk)will not end the reading process by resetting the internal reading state of thestream. This can cause unexpected results ifunshift() is called during aread (i.e. from within astream._read() implementation on acustom stream). Following the call tounshift() with an immediatestream.push('') will reset the reading state appropriately,however it is best to simply avoid callingunshift() while in the process ofperforming a read.

readable.wrap(stream)#

  • stream<Stream> An "old style" readable stream

Versions of Node.js prior to v0.10 had streams that did not implement theentire Streams API as it is today. (SeeCompatibility formore information.)

If you are using an older Node.js library that emits'data' events andhas astream.pause() method that is advisory only, then youcan use thewrap() method to create aReadable stream that uses the oldstream as its data source.

You will very rarely ever need to call this function, but it existsas a convenience for interacting with old Node.js programs and libraries.

For example:

const OldReader = require('./old-api-module.js').OldReader;const Readable = require('stream').Readable;const oreader = new OldReader;const myReader = new Readable().wrap(oreader);myReader.on('readable', () => {  myReader.read(); // etc.});

Class: stream.Transform#

Transform streams areDuplex streams where the output is in some waycomputed from the input. They implement both theReadable andWritable interfaces.

Examples of Transform streams include:

Class: stream.Writable#

The Writable stream interface is an abstraction for adestinationthat you are writing datato.

Examples of writable streams include:

Event: 'drain'#

If astream.write(chunk) call returnsfalse, then the'drain' event will indicate when it is appropriate to begin writing more datato the stream.

// Write the data to the supplied writable stream one million times.// Be attentive to back-pressure.function writeOneMillionTimes(writer, data, encoding, callback) {  var i = 1000000;  write();  function write() {    var ok = true;    do {      i -= 1;      if (i === 0) {        // last time!        writer.write(data, encoding, callback);      } else {        // see if we should continue, or wait        // don't pass the callback, because we're not done yet.        ok = writer.write(data, encoding);      }    } while (i > 0 && ok);    if (i > 0) {      // had to stop early!      // write some more once it drains      writer.once('drain', write);    }  }}

Event: 'error'#

Emitted if there was an error when writing or piping data.

Event: 'finish'#

When thestream.end() method has been called, and all data hasbeen flushed to the underlying system, this event is emitted.

var writer = getWritableStreamSomehow();for (var i = 0; i < 100; i ++) {  writer.write('hello, #${i}!\n');}writer.end('this is the end\n');writer.on('finish', () => {  console.error('all writes are now complete.');});

Event: 'pipe'#

This is emitted whenever thestream.pipe() method is called on a readablestream, adding this writable to its set of destinations.

var writer = getWritableStreamSomehow();var reader = getReadableStreamSomehow();writer.on('pipe', (src) => {  console.error('something is piping into the writer');  assert.equal(src, reader);});reader.pipe(writer);

Event: 'unpipe'#

This is emitted whenever thestream.unpipe() method is called on areadable stream, removing this writable from its set of destinations.

var writer = getWritableStreamSomehow();var reader = getReadableStreamSomehow();writer.on('unpipe', (src) => {  console.error('something has stopped piping into the writer');  assert.equal(src, reader);});reader.pipe(writer);reader.unpipe(writer);

writable.cork()#

Forces buffering of all writes.

Buffered data will be flushed either atstream.uncork() or atstream.end() call.

writable.end([chunk][, encoding][, callback])#

Call this method when no more data will be written to the stream. If supplied,the callback is attached as a listener on the'finish' event.

Callingstream.write() after callingstream.end() will raise an error.

// write 'hello, ' and then end with 'world!'var file = fs.createWriteStream('example.txt');file.write('hello, ');file.end('world!');// writing more now is not allowed!

writable.setDefaultEncoding(encoding)#

  • encoding<String> The new default encoding

Sets the default encoding for a writable stream.

writable.uncork()#

Flush all data, buffered sincestream.cork() call.

writable.write(chunk[, encoding][, callback])#

This method writes some data to the underlying system, and calls thesupplied callback once the data has been fully handled. If an erroroccurs, the callback may or may not be called with the error as itsfirst argument. To detect write errors, listen for the'error' event.

The return value indicates if you should continue writing right now.If the data had to be buffered internally, then it will returnfalse. Otherwise, it will returntrue.

This return value is strictly advisory. You MAY continue to write,even if it returnsfalse. However, writes will be buffered inmemory, so it is best not to do this excessively. Instead, wait forthe'drain' event before writing more data.

API for Stream Implementors#

To implement any sort of stream, the pattern is the same:

  1. Extend the appropriate parent class in your own subclass. (Theutil.inherits() method is particularly helpful for this.)
  2. Call the appropriate parent class constructor in your constructor,to be sure that the internal mechanisms are set up properly.
  3. Implement one or more specific methods, as detailed below.

The class to extend and the method(s) to implement depend on the sortof stream class you are writing:

Use-case

Class

Method(s) to implement

Reading only

Readable

_read

Writing only

Writable

_write,_writev

Reading and writing

Duplex

_read,_write,_writev

Operate on written data, then read the result

Transform

_transform,_flush

In your implementation code, it is very important to never call the methodsdescribed inAPI for Stream Consumers. Otherwise, you can potentially causeadverse side effects in programs that consume your streaming interfaces.

Class: stream.Duplex#

A "duplex" stream is one that is both Readable and Writable, such as a TCPsocket connection.

Note thatstream.Duplex is an abstract class designed to be extendedwith an underlying implementation of thestream._read(size)andstream._write(chunk, encoding, callback) methods as youwould with a Readable or Writable stream class.

Since JavaScript doesn't have multiple prototypal inheritance, this classprototypally inherits from Readable, and then parasitically from Writable. It isthus up to the user to implement both the low-levelstream._read(n) method as well as the low-levelstream._write(chunk, encoding, callback) method on extensionduplex classes.

new stream.Duplex(options)#

  • options<Object> Passed to both Writable and Readableconstructors. Also has the following fields:
    • allowHalfOpen<Boolean> Default =true. If set tofalse, thenthe stream will automatically end the readable side when thewritable side ends and vice versa.
    • readableObjectMode<Boolean> Default =false. SetsobjectModefor readable side of the stream. Has no effect ifobjectModeistrue.
    • writableObjectMode<Boolean> Default =false. SetsobjectModefor writable side of the stream. Has no effect ifobjectModeistrue.

In classes that extend the Duplex class, make sure to call theconstructor so that the buffering settings can be properlyinitialized.

Class: stream.PassThrough#

This is a trivial implementation of aTransform stream that simplypasses the input bytes across to the output. Its purpose is mainlyfor examples and testing, but there are occasionally use cases whereit can come in handy as a building block for novel sorts of streams.

Class: stream.Readable#

stream.Readable is an abstract class designed to be extended with anunderlying implementation of thestream._read(size) method.

Please seeAPI for Stream Consumers for how to consumestreams in your programs. What follows is an explanation of how toimplement Readable streams in your programs.

new stream.Readable([options])#

  • options<Object>
    • highWaterMark<Number> The maximum number of bytes to store inthe internal buffer before ceasing to read from the underlyingresource. Default =16384 (16kb), or16 forobjectMode streams
    • encoding<String> If specified, then buffers will be decoded tostrings using the specified encoding. Default =null
    • objectMode<Boolean> Whether this stream should behaveas a stream of objects. Meaning thatstream.read(n) returnsa single value instead of a Buffer of size n. Default =false
    • read<Function> Implementation for thestream._read()method.

In classes that extend the Readable class, make sure to call theReadable constructor so that the buffering settings can be properlyinitialized.

readable._read(size)#

  • size<Number> Number of bytes to read asynchronously

Note:Implement this method, but do NOT call it directly.

This method is prefixed with an underscore because it is internal to theclass that defines it and should only be called by the internal Readableclass methods. All Readable stream implementations must provide a _readmethod to fetch data from the underlying resource.

When_read() is called, if data is available from the resource, the_read()implementation should start pushing that data into the read queue by callingthis.push(dataChunk)._read() should continue reading fromthe resource and pushing data until push returnsfalse, at which point itshould stop reading from the resource. Only when_read() is called again afterit has stopped should it start reading more data from the resource and pushingthat data onto the queue.

Note: once the_read() method is called, it will not be called again untilthestream.push() method is called.

Thesize argument is advisory. Implementations where a "read" is asingle call that returns data can use this to know how much data tofetch. Implementations where that is not relevant, such as TCP orTLS, may ignore this argument, and simply provide data whenever itbecomes available. There is no need, for example to "wait" untilsize bytes are available before callingstream.push(chunk).

readable.push(chunk[, encoding])#

  • chunk<Buffer> |<Null> |<String> Chunk of data to push into the read queue
  • encoding<String> Encoding of String chunks. Must be a validBuffer encoding, such as'utf8' or'ascii'
  • return<Boolean> Whether or not more pushes should be performed

Note:This method should be called by Readable implementors, NOTby consumers of Readable streams.

If a value other than null is passed, Thepush() method adds a chunk of datainto the queue for subsequent stream processors to consume. Ifnull ispassed, it signals the end of the stream (EOF), after which no more datacan be written.

The data added withpush() can be pulled out by calling thestream.read() method when the'readable' event fires.

This API is designed to be as flexible as possible. For example,you may be wrapping a lower-level source which has some sort ofpause/resume mechanism, and a data callback. In those cases, youcould wrap the low-level source object by doing something like this:

// source is an object with readStop() and readStart() methods,// and an `ondata` member that gets called when it has data, and// an `onend` member that gets called when the data is over.util.inherits(SourceWrapper, Readable);function SourceWrapper(options) {  Readable.call(this, options);  this._source = getLowlevelSourceObject();  // Every time there's data, we push it into the internal buffer.  this._source.ondata = (chunk) => {    // if push() returns false, then we need to stop reading from source    if (!this.push(chunk))      this._source.readStop();  };  // When the source ends, we push the EOF-signaling `null` chunk  this._source.onend = () => {    this.push(null);  };}// _read will be called when the stream wants to pull more data in// the advisory size argument is ignored in this case.SourceWrapper.prototype._read = function(size) {  this._source.readStart();};

Example: A Counting Stream#

This is a basic example of a Readable stream. It emits the numeralsfrom 1 to 1,000,000 in ascending order, and then ends.

const Readable = require('stream').Readable;const util = require('util');util.inherits(Counter, Readable);function Counter(opt) {  Readable.call(this, opt);  this._max = 1000000;  this._index = 1;}Counter.prototype._read = function() {  var i = this._index++;  if (i > this._max)    this.push(null);  else {    var str = '' + i;    var buf = new Buffer(str, 'ascii');    this.push(buf);  }};

Example: SimpleProtocol v1 (Sub-optimal)#

This is similar to theparseHeader function describedhere, but implemented as a custom stream.Also, note that this implementation does not convert the incoming data to astring.

However, this would be better implemented as aTransform stream. SeeSimpleProtocol v2 for a better implementation.

// A parser for a simple data protocol.// The "header" is a JSON object, followed by 2 \n characters, and// then a message body.//// NOTE: This can be done more simply as a Transform stream!// Using Readable directly for this is sub-optimal. See the// alternative example below under the Transform section.const Readable = require('stream').Readable;const util = require('util');util.inherits(SimpleProtocol, Readable);function SimpleProtocol(source, options) {  if (!(this instanceof SimpleProtocol))    return new SimpleProtocol(source, options);  Readable.call(this, options);  this._inBody = false;  this._sawFirstCr = false;  // source is a readable stream, such as a socket or file  this._source = source;  source.on('end', () => {    this.push(null);  });  // give it a kick whenever the source is readable  // read(0) will not consume any bytes  source.on('readable', () => {    this.read(0);  });  this._rawHeader = [];  this.header = null;}SimpleProtocol.prototype._read = function(n) {  if (!this._inBody) {    var chunk = this._source.read();    // if the source doesn't have data, we don't have data yet.    if (chunk === null)      return this.push('');    // check if the chunk has a \n\n    var split = -1;    for (var i = 0; i < chunk.length; i++) {      if (chunk[i] === 10) { // '\n'        if (this._sawFirstCr) {          split = i;          break;        } else {          this._sawFirstCr = true;        }      } else {        this._sawFirstCr = false;      }    }    if (split === -1) {      // still waiting for the \n\n      // stash the chunk, and try again.      this._rawHeader.push(chunk);      this.push('');    } else {      this._inBody = true;      var h = chunk.slice(0, split);      this._rawHeader.push(h);      var header = Buffer.concat(this._rawHeader).toString();      try {        this.header = JSON.parse(header);      } catch (er) {        this.emit('error', new Error('invalid simple protocol data'));        return;      }      // now, because we got some extra data, unshift the rest      // back into the read queue so that our consumer will see it.      var b = chunk.slice(split);      this.unshift(b);      // calling unshift by itself does not reset the reading state      // of the stream; since we're inside _read, doing an additional      // push('') will reset the state appropriately.      this.push('');      // and let them know that we are done parsing the header.      this.emit('header', this.header);    }  } else {    // from there on, just provide the data to our consumer.    // careful not to push(null), since that would indicate EOF.    var chunk = this._source.read();    if (chunk) this.push(chunk);  }};// Usage:// var parser = new SimpleProtocol(source);// Now parser is a readable stream that will emit 'header'// with the parsed header data.

Class: stream.Transform#

A "transform" stream is a duplex stream where the output is causallyconnected in some way to the input, such as azlib stream or acrypto stream.

There is no requirement that the output be the same size as the input,the same number of chunks, or arrive at the same time. For example, aHash stream will only ever have a single chunk of output which isprovided when the input is ended. A zlib stream will produce outputthat is either much smaller or much larger than its input.

Rather than implement thestream._read() andstream._write() methods, Transform classes must implement thestream._transform() method, and may optionallyalso implement thestream._flush() method. (See below.)

new stream.Transform([options])#

In classes that extend the Transform class, make sure to call theconstructor so that the buffering settings can be properlyinitialized.

Events: 'finish' and 'end'#

The'finish' and'end' events are from the parent Writableand Readable classes respectively. The'finish' event is fired afterstream.end() is called and all chunks have been processed bystream._transform(),'end' is fired after all data hasbeen output which is after the callback instream._flush()has been called.

transform._flush(callback)#

  • callback<Function> Call this function (optionally with an errorargument) when you are done flushing any remaining data.

Note:This function MUST NOT be called directly. It MAY be implementedby child classes, and if so, will be called by the internal Transformclass methods only.

In some cases, your transform operation may need to emit a bit moredata at the end of the stream. For example, aZlib compressionstream will store up some internal state so that it can optimallycompress the output. At the end, however, it needs to do the best itcan with what is left, so that the data will be complete.

In those cases, you can implement a_flush() method, which will becalled at the very end, after all the written data is consumed, butbefore emitting'end' to signal the end of the readable side. Justlike withstream._transform(), calltransform.push(chunk) zero or more times, as appropriate, and callcallbackwhen the flush operation is complete.

This method is prefixed with an underscore because it is internal tothe class that defines it, and should not be called directly by userprograms. However, youare expected to override this method inyour own extension classes.

transform._transform(chunk, encoding, callback)#

  • chunk<Buffer> |<String> The chunk to be transformed. Willalwaysbe a buffer unless thedecodeStrings option was set tofalse.
  • encoding<String> If the chunk is a string, then this is theencoding type. If chunk is a buffer, then this is the specialvalue - 'buffer', ignore it in this case.
  • callback<Function> Call this function (optionally with an errorargument and data) when you are done processing the supplied chunk.

Note:This function MUST NOT be called directly. It should beimplemented by child classes, and called by the internal Transformclass methods only.

All Transform stream implementations must provide a_transform()method to accept input and produce output.

_transform() should do whatever has to be done in this specificTransform class, to handle the bytes being written, and pass them offto the readable portion of the interface. Do asynchronous I/O,process things, and so on.

Calltransform.push(outputChunk) 0 or more times to generate outputfrom this input chunk, depending on how much data you want to outputas a result of this chunk.

Call the callback function only when the current chunk is completelyconsumed. Note that there may or may not be output as a result of anyparticular input chunk. If you supply a second argument to the callbackit will be passed to the push method. In other words the following areequivalent:

transform.prototype._transform = function (data, encoding, callback) {  this.push(data);  callback();};transform.prototype._transform = function (data, encoding, callback) {  callback(null, data);};

This method is prefixed with an underscore because it is internal tothe class that defines it, and should not be called directly by userprograms. However, youare expected to override this method inyour own extension classes.

Example:SimpleProtocol parser v2#

The examplehere of a simpleprotocol parser can be implemented simply by using the higher levelTransform stream class, similar to theparseHeader andSimpleProtocolv1 examples.

In this example, rather than providing the input as an argument, itwould be piped into the parser, which is a more idiomatic Node.js streamapproach.

const util = require('util');const Transform = require('stream').Transform;util.inherits(SimpleProtocol, Transform);function SimpleProtocol(options) {  if (!(this instanceof SimpleProtocol))    return new SimpleProtocol(options);  Transform.call(this, options);  this._inBody = false;  this._sawFirstCr = false;  this._rawHeader = [];  this.header = null;}SimpleProtocol.prototype._transform = function(chunk, encoding, done) {  if (!this._inBody) {    // check if the chunk has a \n\n    var split = -1;    for (var i = 0; i < chunk.length; i++) {      if (chunk[i] === 10) { // '\n'        if (this._sawFirstCr) {          split = i;          break;        } else {          this._sawFirstCr = true;        }      } else {        this._sawFirstCr = false;      }    }    if (split === -1) {      // still waiting for the \n\n      // stash the chunk, and try again.      this._rawHeader.push(chunk);    } else {      this._inBody = true;      var h = chunk.slice(0, split);      this._rawHeader.push(h);      var header = Buffer.concat(this._rawHeader).toString();      try {        this.header = JSON.parse(header);      } catch (er) {        this.emit('error', new Error('invalid simple protocol data'));        return;      }      // and let them know that we are done parsing the header.      this.emit('header', this.header);      // now, because we got some extra data, emit this first.      this.push(chunk.slice(split));    }  } else {    // from there on, just provide the data to our consumer as-is.    this.push(chunk);  }  done();};// Usage:// var parser = new SimpleProtocol();// source.pipe(parser)// Now parser is a readable stream that will emit 'header'// with the parsed header data.

Class: stream.Writable#

stream.Writable is an abstract class designed to be extended with anunderlying implementation of thestream._write(chunk, encoding, callback) method.

Please seeAPI for Stream Consumers for how to consumewritable streams in your programs. What follows is an explanation ofhow to implement Writable streams in your programs.

new stream.Writable([options])#

In classes that extend the Writable class, make sure to call theconstructor so that the buffering settings can be properlyinitialized.

writable._write(chunk, encoding, callback)#

  • chunk<Buffer> |<String> The chunk to be written. Willalwaysbe a buffer unless thedecodeStrings option was set tofalse.
  • encoding<String> If the chunk is a string, then this is theencoding type. If chunk is a buffer, then this is the specialvalue - 'buffer', ignore it in this case.
  • callback<Function> Call this function (optionally with an errorargument) when you are done processing the supplied chunk.

All Writable stream implementations must provide astream._write() method to send data to the underlyingresource.

Note:This function MUST NOT be called directly. It should beimplemented by child classes, and called by the internal Writableclass methods only.

Call the callback using the standardcallback(error) pattern tosignal that the write completed successfully or with an error.

If thedecodeStrings flag is set in the constructor options, thenchunk may be a string rather than a Buffer, andencoding willindicate the sort of string that it is. This is to supportimplementations that have an optimized handling for certain stringdata encodings. If you do not explicitly set thedecodeStringsoption tofalse, then you can safely ignore theencoding argument,and assume thatchunk will always be a Buffer.

This method is prefixed with an underscore because it is internal tothe class that defines it, and should not be called directly by userprograms. However, youare expected to override this method inyour own extension classes.

writable._writev(chunks, callback)#

  • chunks<Array> The chunks to be written. Each chunk has followingformat:{ chunk: ..., encoding: ... }.
  • callback<Function> Call this function (optionally with an errorargument) when you are done processing the supplied chunks.

Note:This function MUST NOT be called directly. It may beimplemented by child classes, and called by the internal Writableclass methods only.

This function is completely optional to implement. In most cases it isunnecessary. If implemented, it will be called with all the chunksthat are buffered in the write queue.

Simplified Constructor API#

In simple cases there is now the added benefit of being able to construct astream without inheritance.

This can be done by passing the appropriate methods as constructor options:

Examples:

Duplex#

var duplex = new stream.Duplex({  read: function(n) {    // sets this._read under the hood    // push data onto the read queue, passing null    // will signal the end of the stream (EOF)    this.push(chunk);  },  write: function(chunk, encoding, next) {    // sets this._write under the hood    // An optional error can be passed as the first argument    next()  }});// orvar duplex = new stream.Duplex({  read: function(n) {    // sets this._read under the hood    // push data onto the read queue, passing null    // will signal the end of the stream (EOF)    this.push(chunk);  },  writev: function(chunks, next) {    // sets this._writev under the hood    // An optional error can be passed as the first argument    next()  }});

Readable#

var readable = new stream.Readable({  read: function(n) {    // sets this._read under the hood    // push data onto the read queue, passing null    // will signal the end of the stream (EOF)    this.push(chunk);  }});

Transform#

var transform = new stream.Transform({  transform: function(chunk, encoding, next) {    // sets this._transform under the hood    // generate output as many times as needed    // this.push(chunk);    // call when the current chunk is consumed    next();  },  flush: function(done) {    // sets this._flush under the hood    // generate output as many times as needed    // this.push(chunk);    done();  }});

Writable#

var writable = new stream.Writable({  write: function(chunk, encoding, next) {    // sets this._write under the hood    // An optional error can be passed as the first argument    next()  }});// orvar writable = new stream.Writable({  writev: function(chunks, next) {    // sets this._writev under the hood    // An optional error can be passed as the first argument    next()  }});

Streams: Under the Hood#

Buffering#

Both Writable and Readable streams will buffer data on an internalobject which can be retrieved from_writableState.getBuffer() or_readableState.buffer, respectively.

The amount of data that will potentially be buffered depends on thehighWaterMark option which is passed into the constructor.

Buffering in Readable streams happens when the implementation callsstream.push(chunk). If the consumer of the Stream does notcallstream.read(), then the data will sit in the internalqueue until it is consumed.

Buffering in Writable streams happens when the user callsstream.write(chunk) repeatedly, even when it returnsfalse.

The purpose of streams, especially with thestream.pipe() method, is tolimit the buffering of data to acceptable levels, so that sources anddestinations of varying speed will not overwhelm the available memory.

Compatibility with Older Node.js Versions#

In versions of Node.js prior to v0.10, the Readable stream interface wassimpler, but also less powerful and less useful.

  • Rather than waiting for you to call thestream.read() method,'data' events would start emitting immediately. If you needed to dosome I/O to decide how to handle data, then you had to store the chunksin some kind of buffer so that they would not be lost.
  • Thestream.pause() method was advisory, rather thanguaranteed. This meant that you still had to be prepared to receive'data' events even when the stream was in a paused state.

In Node.js v0.10, theReadable class was added.For backwards compatibility with older Node.js programs, Readable streamsswitch into "flowing mode" when a'data' event handler is added, orwhen thestream.resume() method is called. The effect isthat, even if you are not using the newstream.read() methodand'readable' event, you no longer have to worry about losing'data' chunks.

Most programs will continue to function normally. However, thisintroduces an edge case in the following conditions:

  • No'data' event handler is added.
  • Thestream.resume() method is never called.
  • The stream is not piped to any writable destination.

For example, consider the following code:

// WARNING!  BROKEN!net.createServer((socket) => {  // we add an 'end' method, but never consume the data  socket.on('end', () => {    // It will never get here.    socket.end('I got your message (but didnt read it)\n');  });}).listen(1337);

In versions of Node.js prior to v0.10, the incoming message data would besimply discarded. However, in Node.js v0.10 and beyond,the socket will remain paused forever.

The workaround in this situation is to call thestream.resume() method to start the flow of data:

// Workaroundnet.createServer((socket) => {  socket.on('end', () => {    socket.end('I got your message (but didnt read it)\n');  });  // start the flow of data, discarding it.  socket.resume();}).listen(1337);

In addition to new Readable streams switching into flowing mode,pre-v0.10 style streams can be wrapped in a Readable class using thestream.wrap() method.

Object Mode#

Normally, Streams operate on Strings and Buffers exclusively.

Streams that are inobject mode can emit generic JavaScript valuesother than Buffers and Strings.

A Readable stream in object mode will always return a single item froma call tostream.read(size), regardless of what the sizeargument is.

A Writable stream in object mode will always ignore theencodingargument tostream.write(data, encoding).

The special valuenull still retains its special value for objectmode streams. That is, for object mode readable streams,null as areturn value fromstream.read() indicates that there is no moredata, andstream.push(null) will signal the end of stream data(EOF).

No streams in Node.js core are object mode streams. This pattern is onlyused by userland streaming libraries.

You should setobjectMode in your stream child class constructor onthe options object. SettingobjectMode mid-stream is not safe.

For Duplex streamsobjectMode can be set exclusively for readable orwritable side withreadableObjectMode andwritableObjectModerespectively. These options can be used to implement parsers andserializers with Transform streams.

const util = require('util');const StringDecoder = require('string_decoder').StringDecoder;const Transform = require('stream').Transform;util.inherits(JSONParseStream, Transform);// Gets \n-delimited JSON string data, and emits the parsed objectsfunction JSONParseStream() {  if (!(this instanceof JSONParseStream))    return new JSONParseStream();  Transform.call(this, { readableObjectMode : true });  this._buffer = '';  this._decoder = new StringDecoder('utf8');}JSONParseStream.prototype._transform = function(chunk, encoding, cb) {  this._buffer += this._decoder.write(chunk);  // split on newlines  var lines = this._buffer.split(/\r?\n/);  // keep the last partial line buffered  this._buffer = lines.pop();  for (var l = 0; l < lines.length; l++) {    var line = lines[l];    try {      var obj = JSON.parse(line);    } catch (er) {      this.emit('error', er);      return;    }    // push the parsed object out to the readable consumer    this.push(obj);  }  cb();};JSONParseStream.prototype._flush = function(cb) {  // Just handle any leftover  var rem = this._buffer.trim();  if (rem) {    try {      var obj = JSON.parse(rem);    } catch (er) {      this.emit('error', er);      return;    }    // push the parsed object out to the readable consumer    this.push(obj);  }  cb();};

stream.read(0)#

There are some cases where you want to trigger a refresh of theunderlying readable stream mechanisms, without actually consuming anydata. In that case, you can callstream.read(0), which will alwaysreturn null.

If the internal read buffer is below thehighWaterMark, and thestream is not currently reading, then callingstream.read(0) will triggera low-levelstream._read() call.

There is almost never a need to do this. However, you will see somecases in Node.js's internals where this is done, particularly in theReadable stream class internals.

stream.push('')#

Pushing a zero-byte string or Buffer (when not inObject mode) has aninteresting side effect. Because itis a call tostream.push(), it will end thereading process. However, itdoesnot add any data to the readable buffer, so there's nothing fora user to consume.

Very rarely, there are cases where you have no data to provide now,but the consumer of your stream (or, perhaps, another bit of your owncode) will know when to check again, by callingstream.read(0).In those cases, youmay callstream.push('').

So far, the only use case for this functionality is in thetls.CryptoStream class, which is deprecated in Node.js/io.js v1.0. If youfind that you have to usestream.push(''), please consider anotherapproach, because it almost certainly indicates that something ishorribly wrong.


[8]ページ先頭

©2009-2025 Movatter.jp