Node.js には Stream というインターフェイスが用意されており、これを使うことでデータをストリーミングできる。
Stream を使うことで、データの全てをメモリに保持するのではなく、少しずつ順番にデータを処理していくことが可能になる。
この記事では、Stream の基本的な使い方について説明していく。
WHATWG で定義している Stream はまた別の概念なので、注意する。この記事で扱っている Stream は、それとは別に以前から Node.js に実装されている Stream である。
以下の環境で動作確認している。
- Node.js のバージョン
- 16.15.1
- 使っている npm ライブラリ
- @types/node@16.11.43
- ts-node-dev@2.0.0
- typescript@4.7.4
環境構築
まず最初に、手元で実際にコードを動かすための環境を構築する。
TypeScript で書くための準備作業なので、JavaScript で書く場合はこの工程は不要。
適当なディレクトリを作り、以下のコマンドを実行する。
% yarn init -y % yarn add -D typescript ts-node-dev @types/node@16 % touch index.ts
最後に、以下の内容のtsconfig.json
を作る。
{ "compilerOptions": { "strict": true, "lib": ["esnext", "DOM"], "module": "NodeNext", "target": "ESNext" } }
この状態でyarn ts-node-dev --respawn index.ts
を実行すると、index.ts
に書いた内容を実行してくれるようになる。
Stream はインターフェイス
Stream はインターフェイスであり、そのインターフェイスを満たしているものなら何であれ Stream として扱うことができる。
自分で Stream を実装してもいいが、予め Node.js が用意しているものを利用することが多い。
例えば、fs
モジュールはファイルを Stream として扱う仕組みを用意しており、これを使うことでファイルの読み書きをストリーミングで行うことができる。
データを読み込むための Stream である Readable Stream や、書き込むための Stream である Writable Stream など、Stream にはいくつかの種類があり、目的に応じて使い分ける。
先程のファイルの例でいえば、ファイルの読み込みは Readable Stream を使って行い、書き込みは Writable Stream を使って行う。
早速、Stream でファイルを読み込んでみる。あくまでも雰囲気を掴むためのものなので、この時点でコードの内容を理解する必要はない。
import { createReadStream } from "fs"; import { Readable } from "stream"; const rs: Readable = createReadStream(__filename, { encoding: "utf-8", highWaterMark: 8, }); async function main() { for await (const chunk of rs) { console.log(chunk); console.log("\n==delimit==\n"); } } main();
上記のコードを実行すると、以下のログが流れるはず。
import { ==delimit== createR ==delimit== eadStrea ==delimit== // 以下省略
__filename
はindex.ts
自身を指しているので、このファイルの内容を8
バイトずつ読み込み、その内容をconsole.log
に渡している。
このように、データを少しずつ扱う、ということが Stream によって可能になる。
Readable Stream
まずは、fs.createReadStream
を題材にして、Readable Stream の基本的な使い方を見ていく。
createReadStream
にファイルのパスを渡すことで、Readable Stream を作ることができる。
第二引数はオプションで、ここでencoding
を指定すると、読み込んだデータをその形式でデコードするようになる。
import { createReadStream } from "fs"; import { Readable } from "stream"; const rs = createReadStream(__filename, { encoding: "utf-8", }); console.log(rs instanceof Readable); // true
Readable Stream のdata
イベントにリスナーを設定すると、そのリスナーにデータが渡される。
そして全てのデータが消費されると、end
イベントが発生する。
そのため下記の例では、このコードの内容とend!
がログに流れる。
import { createReadStream } from "fs"; import { Readable } from "stream"; const rs = createReadStream(__filename, { encoding: "utf-8", }); rs.on("data", (chunk) => { console.log(chunk); }); rs.on("end", () => { console.log("end!"); });
また、Readable Stream は AsyncIterable でもあるので、for await...of
構文を使ってデータを取り出すこともできる。
import { createReadStream } from "fs"; const rs: AsyncIterable<string> = createReadStream(__filename, { encoding: "utf-8", }); async function receive() { for await (const chunk of rs) { console.log(chunk); } console.log("end!"); } receive();
消費されたデータは消えてしまうので、下記の例ではreceive
を 3 回実行しているがデータの読み取りは 1 回しか発生しない。
そのため、ファイルの内容が表示されたあと、end!
が 3 つ並んで表示される。
import { createReadStream } from "fs"; const rs: AsyncIterable<string> = createReadStream(__filename, { encoding: "utf-8", }); async function receive() { for await (const chunk of rs) { console.log(chunk); } console.log("end!"); } receive(); receive(); receive();
highWaterMark
Stream は、その内部にバッファとしてデータを保持する。そして各 Stream は、バッファにどの程度のデータまで保持するのかを示す、highWaterMark という名前の閾値を持っている。
この閾値を過度に超えないように上手く制御することで、「データを少しずつ処理する」ことが可能になる。
Readable Stream の場合、readableHighWaterMark
に highWaterMark の値が格納されている。
createReadStream で作られた Readable Stream の場合、デフォルト値は65536
バイト(64 * 1024 = 65536
なので64
キロバイト)だが、highWaterMark
オプションで設定することもできる。
import { createReadStream } from "fs"; const rs1 = createReadStream(__filename); const rs2 = createReadStream(__filename, { highWaterMark: 16 }); console.log(rs1.readableHighWaterMark); // 65536 console.log(rs2.readableHighWaterMark); // 16
下記の例だと highWaterMark を16
に設定しているので、まず16
バイト分のデータだけ内部バッファに格納し、それをconsole.log
に渡す。
そしてそのデータは捨てられ、また次の16
バイトが読み込まれる。そのため、内部バッファには常に16
バイト以下のデータしか格納されないようになっている。
import { createReadStream } from "fs"; const rs = createReadStream(__filename, { encoding: "utf-8", highWaterMark: 16, }); async function receive() { for await (const chunk of rs) { console.log(chunk); } } receive();
Writable Stream
データの書き込みには、Writable Stream を使う。
ファイルの場合はcreateWriteStream
で Writable Stream を作れる。
以下のように書くと、dest.txt
というファイルへの書き込みを行う Writable Stream が作られる。
createWriteStream
の場合、encoding
はデフォルトでutf8
なので、この記事では特に指定していない。
import { createWriteStream } from "fs"; import { Writable } from "stream"; const ws: Writable = createWriteStream("dest.txt");
Readable Stream はwrite
メソッドを持っており、それを使ってデータの書き込みができる。
書き込みが終わったことを Writeable Stream に伝えるにはend
メソッドを使う。そして書き込みが終わったことを知った Writable Stream は、finish
イベントを発火する。
以下のコードの実行すると、このコードの内容を16
バイト毎に改行しながらdest.txt
に書き込んでいき、それが完了するとログにfinish!
が流れる。
import { createReadStream, createWriteStream } from "fs"; const rs = createReadStream(__filename, { encoding: "utf-8", highWaterMark: 16, }); const ws = createWriteStream("dest.txt"); ws.on("finish", () => { console.log("finish!"); }); rs.on("data", (chunk) => { ws.write(`${chunk}\n`); });
highWaterMark を頼りに「水位」を調節する
Writable Stream も内部バッファを持っており、そこにデータが蓄積されていく。
そしてやはり highWaterMark を設定することができる。
import { createWriteStream } from "fs"; const ws1 = createWriteStream("dest.txt"); const ws2 = createWriteStream("dest.txt", { highWaterMark: 8 }); console.log(ws1.writableHighWaterMark); // 16384 console.log(ws2.writableHighWaterMark); // 8
そしてこの値に応じて内部バッファに格納されるデータ量が自動的に調節される、というわけではない。
highWaterMark はただ閾値を表現しているだけであり、それを超えないことを保証するようなものではない。
ではどうすればいいのかというと、データ量を監視しながら上手く調節する必要がある。
データ量の状況を知るための手段がいくつか用意されているので、それを見ながら、手動で調節を行う。
まずは、状況を知るための手段について。
「write
メソッドの返り値」と「drain
イベント」を利用することで、highWaterMark を超えているか確認できる。
write
メソッドは、内部バッファに格納されているデータ量が highWaterMark を超えていなければtrue
を、超えてしまっていればfalse
を返す。
以降のコードの実行結果は、筆者の手元の環境によるもの。
データの処理速度は常に一定というわけではないので、結果が変わることもある。
以下のコードだと 5 回書き込みが行われるが、write
メソッドはいずれもtrue
を返す。
import { createReadStream, createWriteStream } from "fs"; const rs = createReadStream(__filename, { encoding: "utf-8", highWaterMark: 64, }); const ws = createWriteStream("dest.txt", { highWaterMark: 256, }); rs.on("data", (chunk) => { console.log(ws.write(chunk)); });
同じ内容で Writable Stream の highWaterMark だけ16
に変更する。
そうすると、5 回ともfalse
を返す。highWaterMark を小さくしたことで、内部バッファに格納されているデータ量が常にそれを超えるようになってしまった。
import { createReadStream, createWriteStream } from "fs"; const rs = createReadStream(__filename, { encoding: "utf-8", highWaterMark: 64, }); const ws = createWriteStream("dest.txt", { highWaterMark: 16, }); rs.on("data", (chunk) => { console.log(ws.write(chunk)); });
drain
イベントは、highWaterMark を超えてしまったあと、内部バッファに格納されている全てのデータが排出されたときに発生する。
Node.js Stream では、drain や highWaterMark のように、「水」に関係した単語でデータ量の状況を表現している。
下記のコードだと、drain!
がログに流れる。
import { createReadStream, createWriteStream } from "fs"; const rs = createReadStream(__filename, { encoding: "utf-8", highWaterMark: 64, }); const ws = createWriteStream("dest.txt", { highWaterMark: 16, }); ws.on("drain", () => { console.log("drain!"); }); rs.on("data", (chunk) => { ws.write(chunk); });
だが以下のようにすると、そもそも一度も highWaterMark を超えないので、drain
イベントも発生しないままプログラムが終了する。
import { createReadStream, createWriteStream } from "fs"; const rs = createReadStream(__filename, { encoding: "utf-8", highWaterMark: 64, }); const ws = createWriteStream("dest.txt", { highWaterMark: 256, }); ws.on("drain", () => { console.log("drain!"); }); rs.on("data", (chunk) => { ws.write(chunk); });
これら 2 つの指標を使い、流れ込んでくるデータ量を制御することで、内部バッファに格納されているデータ量が highWaterMark を過度に超えないようにすることができる。
具体的には、write
がfalse
を返したらデータの読み込みを一時停止し、drain
が発生したら再開する。
それを実装したのが、以下のコード。
import { createReadStream, createWriteStream } from "fs"; const rs = createReadStream(__filename, { encoding: "utf-8", highWaterMark: 64, }); const ws = createWriteStream("dest.txt", { highWaterMark: 16, }); ws.on("drain", () => { console.log("drain!"); rs.resume(); // 読み込みを再開する }); rs.on("data", (chunk) => { const ok = ws.write(chunk); if (!ok) { rs.pause(); // 読み込みを一時停止する } });
このコードを実行するとdrain!
が何度もログに流れる。
このことから、「highWaterMark を超えたのでデータの読み込みを一時停止する → drain イベントが発生したので再開する → 再び highWaterMark を超える」というサイクルが何度も繰り返されていることが分かる。
pipe による自動調節
Readable Stream はpipe
というメソッドを持っており、引数に Writable Stream を渡すことで、読み込んだデータを Writable Stream に流し込むことができる。
このメソッドを使えば、write
メソッドやdata
イベントを使わずにデータを書き込める。
import { createReadStream, createWriteStream } from "fs"; const rs = createReadStream(__filename, { encoding: "utf-8", highWaterMark: 64, }); const ws = createWriteStream("dest.txt", { highWaterMark: 16, }); rs.pipe(ws);
さらにpipe
メソッドは、データ量の調節も自動的に行なってくれる。
そのため実は、pipe
メソッドを使えば、先程書いたような調節処理を自分で実装する必要はない。
下記のコードを実行するとpause!
が複数回表示されるので、pipe
メソッドを実行すると内部的にpause
メソッドが呼ばれていることが分かる。
import { createReadStream, createWriteStream } from "fs"; const rs = createReadStream(__filename, { encoding: "utf-8", highWaterMark: 64, }); const ws = createWriteStream("dest.txt", { highWaterMark: 16, }); rs.on("pause", () => { console.log("pause!"); }); rs.pipe(ws);