この広告は、90日以上更新していないブログに表示しています。
今日は、フロントのプログラミングスタイルに、にまた一つ大きな変化をもたらすであろう Stream というAPI についてです。
この仕様は現時点でまだ策定中であるため、API は変更される恐れがある点にご注意ください。
以前 「Node.js の Stream API で「データの流れ」を扱う方法」 という記事を書きましたが、簡単に言うとあれがブラウザにもやってくるという話です。
もう何度も書いた話なので駆け足で。
JS はシングルスレッドでイベント駆動な世界なので、何をするにも非同期であり、コールバックを登録することで完了した結果を受け取るAPI が基本です。これは、ブラウザの DOM のAPI でも、 Node.js でも共通しています。
概念を疑似コードで書くと以下のような感じです。
console.log('1');file.open('path', (err, data) =>{// 非同期なデータの読み出し終わってから実行される console.log(data);});console.log('2');// 1 -> data -> 2
実行順序についてはもういいでしょう。
そして、最近ではこれを抽象化(あるいは部品化)する仕組みとして、 Promise の導入が進んでいます。ざっくり言うとこんな感じ。
var p = file.open('path');p.then((data) =>{ console.log(data);}).catch((err) =>{ console.error(err);});
非同期に取得/生成される結果自体をオブジェクトにし、コールバックの適応方法を切り離したことにより、例えば部品化やエラー処理の集約ができるようになりました。API が Promise を返さないものは、自分で Promsie オブジェクトにくるんで、同様のインタフェースに寄せることができます。
統一したインタフェース(thenable)に則っていることにより、他の部品との組み合わせも以下のようにできます。
promise .then(b) .then(c) .catch(console.error.bind(console));しかし、どちらもコールバックは、処理の完了後に一回実行される実装で考えるのが普通です。つまり上の例はいずれも、「ファイルを読み終わったら、その結果をまとめ、一回だけコールバックを実行する」となります。
上記の例は、一度だけその時のファイル内容をまるっと表示しますが、大きなファイルを読む場合は、読み出せたところから表示できる方が適した場面があります。
そうした連続したイベントを扱うのが StreamAPI です。
Node では Stream が以前からあり、今 v1, v2, v3 と来てちょっと移行段階なので、これもざっくり概念疑似コードで書くと以下のようなイメージ。
filestream = file.createReadStream(path);filestream.on('data', (data) =>{ console.log(data);}).on('error', (err) =>{ console.error(err);});
例えば Node.js では、 Stream の実装が EventEmitter になっており、ファイルを開くとその中身を Chunk ごとに読み出して、その結果を引数にして逐一data というイベントを発火します。
これにより、 Promise の例とは違い大きなファイルも読み出した端から表示されます。
すばらしい点は、 Stream も他の Stream と組み合わせられるところです。Node.js ではpipe() というメソッドに stream を渡すと、それらを連結されることができます。これはUnix のパイプ (|) と同じです。
Node.js の Stream には 4 つの種類があります。
Transfer, Duplex は Readable と Writable の組み合わせでもあります。
こんな感じ。
// readable.pipe(transform).pipe(writable);getStream.pipe(transferStream).pipe(fileWritableStream);データがイベントループの導きによって、流れるように処理されて行く様が見て取れますね。
「Stream を制す者は Node.js を制す」
は決して大げさではないのです。
せっかくサーバサイドで流れるように処理されたデータを、せっかく WebSocket のようなステートフルな接続で送っても、送った先がコールバックや Promise の世界に戻ってしまうと、片手落ちです。
「フロントでも Stream を使いたい。」
こうして人類はブラウザでも動く Stream の実装を吐いて捨てるほど作ってきました。ちなみに俺は「全く同じものを!」ということで、 Node.js の Stream をコツコツ移植しています。(その副作用で移植したAssert の方が使われてるというのは、また別のお話)
そして、同様に「この仕組みを標準にしよう」という話が進んでおり、もうすぐフロントも Stream ベースな時代が来る!というのが今日のお話。
ちなみにこままで駆け足で説明してきたことを、 Node.js のコミッタが熱く語る話はこちら。mozaic.fm #10 node.js sideshow
本題です。ずばりWHATWG がメンテするドラフトにStream API が追加されました!!
現時点での仕様はこちらです。
Streams Living Standard:https://streams.spec.whatwg.org/
この仕様では、 readable, writable, transform の三種類が定義されています。(duplex はありません)
これらの仕様は fetch, service worker, source extension, video, audio などで、流れるデータを表現するのに使われていくことになります。
例えば、 video や audio のデータを readable stream として受け取り、それを transform stream に繋ぐ(pipe)ことで、エフェクトをかけたりすることができるし、圧縮されたデータを xhr で取得し、その readable stream を、解凍する transform stream に通してから、ファイルに書き込むための writable stream に渡すなんて使い方が想定されます。
完全に Node.js と同じですね。
全体の構成はこんな感じになります。詳細は以降順次解説していきます。

実装は ES6 の class ベースになっており、継承による拡張についても仕様に言及されています。
ReadableStream は以下のような E@6 のクラスとして定義されています。実際にデータが生成される生成源は underlying source と呼ばれます。そこから取り出したデータを chunk ごとに内部に管理されている queue に追加して行きます。
class ReadableStream{ constructor({ start = (enqueue, close, error) =>{}, pull = (enqueue, close, error) =>{}, cancel = (reason) =>{}, strategy = %DefaultReadableStreamStrategy%} ={}) get closed() get state() cancel(reason) pipeThrough({ writable, readable}, options) pipeTo(dest,{ preventClose, preventAbort, preventCancel} ={}) read() wait()}
動く環境はまだ無いですが、以下のようになる予定です。WebSocket をラップした ReadableStream です。
function makeReadableWebSocketStream(url, protocols){const ws =new WebSocket(url, protocols); ws.binaryType ="arraybuffer";returnnew ReadableStream({ start(enqueue, close, error){ ws.onmessage =event => enqueue(event.data); ws.onend = close; ws.onerror = error;}, cancel(){ ws.close();}});}var webSocketStream = makeReadableWebSocketStream("http://example.com", 80);webSocketStream.pipeTo(writableStream) .then(() => console.log("All data successfully written!")) .catch(e => console.error("Something went wrong!", e));
同様に WritableStream の定義と実装例です。write() で渡されてきた chunk を処理します。
class WritableStream{ constructor({ start = (error) =>{}, write = (chunk) =>{}, close = () =>{}, abort = (reason) => close(), strategy = %DefaultWritableStreamStrategy%} ={}) get closed() get state() abort(reason) close() wait() write(chunk)}
start() の部分は Promise を許容する仕様になているので、解決するように橋渡しします。
function makeWritableWebSocketStream(url, protocols){const ws =new WebSocket(url, protocols);returnnew WritableStream({ start(error){ ws.onerror = error;returnnew Promise(resolve => ws.onopen = resolve);}, write(chunk){ ws.send(chunk);}, close(){returnnew Promise((resolve, reject) =>{ ws.onclose = resolve; ws.close();});}});}var webSocketStream = makeWritableWebSocketStream("http://example.com", 80);readableStream.pipeTo(webSocketStream) .then(() => console.log("All data successfully written!")) .catch(e => console.error("Something went wrong!", e));
TODO: WIP
Stream はデータの流れを表現し、pipeTo でそれらを組み合わせる訳ですが、source や sink もしくは transfer が行う処理などによっては、データの流れる早さにギャップが生じることがあります。
例えば、ローカルのファイルからデータを読み出す ReadableStream から、それをネットワークに流す WritableStream と繋ぐような場合は、前者のデータ生成が、後者のデータ処理よりも早くなる可能性があります。
すると、ReadableStream の内部 Queue に処理待ちの chunk が溜まり続け、そのままでは溢れてしまうため、この場合はデータ生成源であるファイルからの読み出しを止める必要があります。
こうした処理は Back Pressure と呼ばれ、各 Stream の中でそれぞれの Queue は適切に管理する必要があります。
各クラスにあるstrategy というプロパティは、この内部 Queue の管理戦略です。
実装としてはこんな感じらしい。
両方定義としてはこんな感じ。コンストラクタで HWM (HighWaterMark) を決める。
class Strategy{ constructor({ highWaterMark}) shouldApplyBackpressure(queueSize) size(chunk)}
Node.js の StreamAPI でもちょくちょく話題になる、 Backpressure の話や、ソースが Pull/Push ベースなのかどうかに関する話題も、きちんと議論されているようです。
これでサーバからクライアントまで、全て奇麗にデータが流れる Stream の列が出来ると、非常に奇麗にリアルタイムな表現が実装できるようになりそうでうs。
すでにChromium への実装が始まっており、polyfill の実装 も公開されています(ES6 ですよ時代は)。
@yosuke_furukawa ESの人はDemenicだけどストリームの人はDominicで (yusukeとyosuke並にw) スペルが違うんだよこれ豆
間違いでした。 domenic と dominic さんです。
引用をストックしました
引用するにはまずログインしてください
引用をストックできませんでした。再度お試しください
限定公開記事のため引用できません。