30歳からのプログラミング

30歳無職から独学でプログラミングを開始した人間の記録。

Node.js Stream の初歩

Node.js には Stream というインターフェイスが用意されており、これを使うことでデータをストリーミングできる。
Stream を使うことで、データの全てをメモリに保持するのではなく、少しずつ順番にデータを処理していくことが可能になる。

この記事では、Stream の基本的な使い方について説明していく。

WHATWG で定義している Stream はまた別の概念なので、注意する。この記事で扱っている Stream は、それとは別に以前から Node.js に実装されている Stream である。

以下の環境で動作確認している。

  • Node.js のバージョン
    • 16.15.1
  • 使っている npm ライブラリ
    • @types/node@16.11.43
    • ts-node-dev@2.0.0
    • typescript@4.7.4

環境構築

まず最初に、手元で実際にコードを動かすための環境を構築する。
TypeScript で書くための準備作業なので、JavaScript で書く場合はこの工程は不要。

適当なディレクトリを作り、以下のコマンドを実行する。

% yarn init -y
% yarn add -D typescript ts-node-dev @types/node@16
% touch index.ts

最後に、以下の内容のtsconfig.jsonを作る。

{
  "compilerOptions": {
    "strict": true,
    "lib": ["esnext", "DOM"],
    "module": "NodeNext",
    "target": "ESNext"
  }
}

この状態でyarn ts-node-dev --respawn index.tsを実行すると、index.tsに書いた内容を実行してくれるようになる。

Stream はインターフェイス

Stream はインターフェイスであり、そのインターフェイスを満たしているものなら何であれ Stream として扱うことができる。
自分で Stream を実装してもいいが、予め Node.js が用意しているものを利用することが多い。
例えば、fsモジュールはファイルを Stream として扱う仕組みを用意しており、これを使うことでファイルの読み書きをストリーミングで行うことができる。

データを読み込むための Stream である Readable Stream や、書き込むための Stream である Writable Stream など、Stream にはいくつかの種類があり、目的に応じて使い分ける。
先程のファイルの例でいえば、ファイルの読み込みは Readable Stream を使って行い、書き込みは Writable Stream を使って行う。

早速、Stream でファイルを読み込んでみる。あくまでも雰囲気を掴むためのものなので、この時点でコードの内容を理解する必要はない。

import { createReadStream } from "fs";
import { Readable } from "stream";

const rs: Readable = createReadStream(__filename, {
  encoding: "utf-8",
  highWaterMark: 8,
});

async function main() {
  for await (const chunk of rs) {
    console.log(chunk);
    console.log("\n==delimit==\n");
  }
}

main();

上記のコードを実行すると、以下のログが流れるはず。

import {

==delimit==

 createR

==delimit==

eadStrea

==delimit==

// 以下省略

__filenameindex.ts自身を指しているので、このファイルの内容を8バイトずつ読み込み、その内容をconsole.logに渡している。
このように、データを少しずつ扱う、ということが Stream によって可能になる。

Readable Stream

まずは、fs.createReadStreamを題材にして、Readable Stream の基本的な使い方を見ていく。

createReadStreamにファイルのパスを渡すことで、Readable Stream を作ることができる。
第二引数はオプションで、ここでencodingを指定すると、読み込んだデータをその形式でデコードするようになる。

import { createReadStream } from "fs";
import { Readable } from "stream";

const rs = createReadStream(__filename, {
  encoding: "utf-8",
});

console.log(rs instanceof Readable); // true

Readable Stream のdataイベントにリスナーを設定すると、そのリスナーにデータが渡される。
そして全てのデータが消費されると、endイベントが発生する。
そのため下記の例では、このコードの内容とend!がログに流れる。

import { createReadStream } from "fs";
import { Readable } from "stream";

const rs = createReadStream(__filename, {
  encoding: "utf-8",
});

rs.on("data", (chunk) => {
  console.log(chunk);
});

rs.on("end", () => {
  console.log("end!");
});

また、Readable Stream は AsyncIterable でもあるので、for await...of構文を使ってデータを取り出すこともできる。

import { createReadStream } from "fs";

const rs: AsyncIterable<string> = createReadStream(__filename, {
  encoding: "utf-8",
});

async function receive() {
  for await (const chunk of rs) {
    console.log(chunk);
  }
  console.log("end!");
}

receive();

消費されたデータは消えてしまうので、下記の例ではreceiveを 3 回実行しているがデータの読み取りは 1 回しか発生しない。
そのため、ファイルの内容が表示されたあと、end!が 3 つ並んで表示される。

import { createReadStream } from "fs";

const rs: AsyncIterable<string> = createReadStream(__filename, {
  encoding: "utf-8",
});

async function receive() {
  for await (const chunk of rs) {
    console.log(chunk);
  }
  console.log("end!");
}

receive();
receive();
receive();

highWaterMark

Stream は、その内部にバッファとしてデータを保持する。そして各 Stream は、バッファにどの程度のデータまで保持するのかを示す、highWaterMark という名前の閾値を持っている。
この閾値を過度に超えないように上手く制御することで、「データを少しずつ処理する」ことが可能になる。

Readable Stream の場合、readableHighWaterMarkに highWaterMark の値が格納されている。
createReadStream で作られた Readable Stream の場合、デフォルト値は65536バイト(64 * 1024 = 65536なので64キロバイト)だが、highWaterMarkオプションで設定することもできる。

import { createReadStream } from "fs";

const rs1 = createReadStream(__filename);
const rs2 = createReadStream(__filename, { highWaterMark: 16 });

console.log(rs1.readableHighWaterMark); // 65536
console.log(rs2.readableHighWaterMark); // 16

下記の例だと highWaterMark を16に設定しているので、まず16バイト分のデータだけ内部バッファに格納し、それをconsole.logに渡す。
そしてそのデータは捨てられ、また次の16バイトが読み込まれる。そのため、内部バッファには常に16バイト以下のデータしか格納されないようになっている。

import { createReadStream } from "fs";

const rs = createReadStream(__filename, {
  encoding: "utf-8",
  highWaterMark: 16,
});

async function receive() {
  for await (const chunk of rs) {
    console.log(chunk);
  }
}

receive();

Writable Stream

データの書き込みには、Writable Stream を使う。

ファイルの場合はcreateWriteStreamで Writable Stream を作れる。
以下のように書くと、dest.txtというファイルへの書き込みを行う Writable Stream が作られる。
createWriteStreamの場合、encodingはデフォルトでutf8なので、この記事では特に指定していない。

import { createWriteStream } from "fs";
import { Writable } from "stream";

const ws: Writable = createWriteStream("dest.txt");

Readable Stream はwriteメソッドを持っており、それを使ってデータの書き込みができる。
書き込みが終わったことを Writeable Stream に伝えるにはendメソッドを使う。そして書き込みが終わったことを知った Writable Stream は、finishイベントを発火する。

以下のコードの実行すると、このコードの内容を16バイト毎に改行しながらdest.txtに書き込んでいき、それが完了するとログにfinish!が流れる。

import { createReadStream, createWriteStream } from "fs";

const rs = createReadStream(__filename, {
  encoding: "utf-8",
  highWaterMark: 16,
});

const ws = createWriteStream("dest.txt");

ws.on("finish", () => {
  console.log("finish!");
});

rs.on("data", (chunk) => {
  ws.write(`${chunk}\n`);
});

highWaterMark を頼りに「水位」を調節する

Writable Stream も内部バッファを持っており、そこにデータが蓄積されていく。
そしてやはり highWaterMark を設定することができる。

import { createWriteStream } from "fs";

const ws1 = createWriteStream("dest.txt");
const ws2 = createWriteStream("dest.txt", { highWaterMark: 8 });

console.log(ws1.writableHighWaterMark); // 16384
console.log(ws2.writableHighWaterMark); // 8

そしてこの値に応じて内部バッファに格納されるデータ量が自動的に調節される、というわけではない。
highWaterMark はただ閾値を表現しているだけであり、それを超えないことを保証するようなものではない。

ではどうすればいいのかというと、データ量を監視しながら上手く調節する必要がある。
データ量の状況を知るための手段がいくつか用意されているので、それを見ながら、手動で調節を行う。

まずは、状況を知るための手段について。
writeメソッドの返り値」と「drainイベント」を利用することで、highWaterMark を超えているか確認できる。

writeメソッドは、内部バッファに格納されているデータ量が highWaterMark を超えていなければtrueを、超えてしまっていればfalseを返す。

以降のコードの実行結果は、筆者の手元の環境によるもの。
データの処理速度は常に一定というわけではないので、結果が変わることもある。

以下のコードだと 5 回書き込みが行われるが、writeメソッドはいずれもtrueを返す。

import { createReadStream, createWriteStream } from "fs";

const rs = createReadStream(__filename, {
  encoding: "utf-8",
  highWaterMark: 64,
});

const ws = createWriteStream("dest.txt", {
  highWaterMark: 256,
});

rs.on("data", (chunk) => {
  console.log(ws.write(chunk));
});

同じ内容で Writable Stream の highWaterMark だけ16に変更する。
そうすると、5 回ともfalseを返す。highWaterMark を小さくしたことで、内部バッファに格納されているデータ量が常にそれを超えるようになってしまった。

import { createReadStream, createWriteStream } from "fs";

const rs = createReadStream(__filename, {
  encoding: "utf-8",
  highWaterMark: 64,
});

const ws = createWriteStream("dest.txt", {
  highWaterMark: 16,
});

rs.on("data", (chunk) => {
  console.log(ws.write(chunk));
});

drainイベントは、highWaterMark を超えてしまったあと、内部バッファに格納されている全てのデータが排出されたときに発生する。
Node.js Stream では、drain や highWaterMark のように、「水」に関係した単語でデータ量の状況を表現している。

下記のコードだと、drain!がログに流れる。

import { createReadStream, createWriteStream } from "fs";

const rs = createReadStream(__filename, {
  encoding: "utf-8",
  highWaterMark: 64,
});

const ws = createWriteStream("dest.txt", {
  highWaterMark: 16,
});

ws.on("drain", () => {
  console.log("drain!");
});

rs.on("data", (chunk) => {
  ws.write(chunk);
});

だが以下のようにすると、そもそも一度も highWaterMark を超えないので、drainイベントも発生しないままプログラムが終了する。

import { createReadStream, createWriteStream } from "fs";

const rs = createReadStream(__filename, {
  encoding: "utf-8",
  highWaterMark: 64,
});

const ws = createWriteStream("dest.txt", {
  highWaterMark: 256,
});

ws.on("drain", () => {
  console.log("drain!");
});

rs.on("data", (chunk) => {
  ws.write(chunk);
});

これら 2 つの指標を使い、流れ込んでくるデータ量を制御することで、内部バッファに格納されているデータ量が highWaterMark を過度に超えないようにすることができる。
具体的には、writefalseを返したらデータの読み込みを一時停止し、drainが発生したら再開する。

それを実装したのが、以下のコード。

import { createReadStream, createWriteStream } from "fs";

const rs = createReadStream(__filename, {
  encoding: "utf-8",
  highWaterMark: 64,
});

const ws = createWriteStream("dest.txt", {
  highWaterMark: 16,
});

ws.on("drain", () => {
  console.log("drain!");
  rs.resume(); // 読み込みを再開する
});

rs.on("data", (chunk) => {
  const ok = ws.write(chunk);
  if (!ok) {
    rs.pause(); // 読み込みを一時停止する
  }
});

このコードを実行するとdrain!が何度もログに流れる。
このことから、「highWaterMark を超えたのでデータの読み込みを一時停止する → drain イベントが発生したので再開する → 再び highWaterMark を超える」というサイクルが何度も繰り返されていることが分かる。

pipe による自動調節

Readable Stream はpipeというメソッドを持っており、引数に Writable Stream を渡すことで、読み込んだデータを Writable Stream に流し込むことができる。
このメソッドを使えば、writeメソッドやdataイベントを使わずにデータを書き込める。

import { createReadStream, createWriteStream } from "fs";

const rs = createReadStream(__filename, {
  encoding: "utf-8",
  highWaterMark: 64,
});

const ws = createWriteStream("dest.txt", {
  highWaterMark: 16,
});

rs.pipe(ws);

さらにpipeメソッドは、データ量の調節も自動的に行なってくれる。
そのため実は、pipeメソッドを使えば、先程書いたような調節処理を自分で実装する必要はない。

下記のコードを実行するとpause!が複数回表示されるので、pipeメソッドを実行すると内部的にpauseメソッドが呼ばれていることが分かる。

import { createReadStream, createWriteStream } from "fs";

const rs = createReadStream(__filename, {
  encoding: "utf-8",
  highWaterMark: 64,
});

const ws = createWriteStream("dest.txt", {
  highWaterMark: 16,
});

rs.on("pause", () => {
  console.log("pause!");
});

rs.pipe(ws);

参考資料