30歳からのプログラミング

30歳無職から独学でプログラミングを開始した人間の記録。

継続渡しスタイルを使ってプログラムの見通しをよくする

この記事では、継続渡しスタイル(continuation passing style、以下 CPS)の概要と、CPS の活用例を書いていく。

この記事に出てくるコードの動作確認は TypeScript の4.7.4で行っている。

後続の処理を引数として渡す

関数が終わった後に実行される後続の処理をその関数の引数として渡すスタイル、そういったプログラムの書き方を、 CPS と呼ぶ。

例えば、以下のようなコードがあるとする。

const getLength = (str: string): number => str.length;

const n: number = getLength("hello");
console.log(n); // 5

getLength("hello")の結果をnに代入し、それを使ってconsole.logを実行している。

getLengthを CPS に書き換えると次のようになる。

const getLengthCps = <T>(cont: (x: number) => T, str: string): T =>
  cont(str.length);

getLengthstr.lengthを返していたが、getLengthCpsstr.lengthを「関数が終わった後に実行される後続の処理」であるcontに渡している。

numberを受け取る関数ならどんなものでも、contとして渡すことができる。

const getLengthCps = <T>(cont: (x: number) => T, str: string): T =>
  cont(str.length);

getLengthCps(console.log, "hello"); // 5
console.log(getLengthCps((length) => length * 3, "foo")); // 9

CPS を使ったリファクタリング

CPS の利用例のひとつとして、特定の条件を満たしたときにのみ後続の処理を実行する、というプログラムを書いてみる。

CPS を使っていない、以下のコードがあったとする。
少し長いが、getEmployeesPagePropsgetOfficesPagePropsの概要さえ理解できれば問題ない。
これらの関数の返り値をコンポーネントに渡して View を作ることを想定している。

type Employees = string[];

type Offices = string[];

type GetPageProps<T> = (
  sessionId: string
) => { ok: false; message: string } | { ok: true; data: T };

const CORRECT_SESSION_ID = "123";

const auth = (
  sessionId: string
): { ok: true; companyId: string } | { ok: false } => {
  if (sessionId === CORRECT_SESSION_ID) {
    return {
      ok: true,
      companyId: "1",
    };
  }
  return {
    ok: false,
  };
};

const getEmployeesPageProps: GetPageProps<{ employees: Employees | null }> = (
  sessionId
) => {
  const authResult = auth(sessionId);
  if (authResult.ok === false) {
    return {
      ok: false,
      message: "Unauthorized",
    };
  }

  const dummyDb = new Map([["1", ["Alice", "Bob"]]]);

  return {
    ok: true,
    data: { employees: dummyDb.get(authResult.companyId) ?? null },
  };
};

const getOfficesPageProps: GetPageProps<{ offices: Offices | null }> = (
  sessionId
) => {
  const authResult = auth(sessionId);
  if (authResult.ok === false) {
    return {
      ok: false,
      message: "Unauthorized",
    };
  }

  const dummyDb = new Map([["1", ["London", "Paris"]]]);

  return {
    ok: true,
    data: { offices: dummyDb.get(authResult.companyId) ?? null },
  };
};

console.log(getEmployeesPageProps("xyz")); // { ok: false, message: 'Unauthorized' }
console.log(getEmployeesPageProps("123")); // { ok: true, data: { employees: [ 'Alice', 'Bob' ] } }
console.log(getOfficesPageProps("xyz")); // { ok: false, message: 'Unauthorized' }
console.log(getOfficesPageProps("123")); // { ok: true, data: { offices: [ 'London', 'Paris' ] } }

getEmployeesPagePropsgetOfficesPagePropsはどちらも、以下の処理を行っている。

  1. 引数として渡されたsessionIdを使って認証を行う
  2. 認証に失敗した場合はその旨を返し、処理を終了する
  3. 認証に成功した場合は手に入れたcompanyIdを使ってEmployeesもしくはOfficesを取得し、それを含んだGetPagePropsを返す

このうち、12は全く同じコードなので、これを共通化したい。
CPS を使って「認証が成功した後の処理を引数として渡す」という書き方にすることで、これを実現できる。

以下のcontinueWithAuthは「認証が成功した後の処理」をcontとして受け取り、認証が成功したときにのみcontを呼び出している。
こうすることで、12の処理を共通化し、3として任意の処理を渡せるようになる。

const continueWithAuth = <T>(
  cont: (companyId: string) => { ok: true; data: T },
  sessionId: string
): ReturnType<GetPageProps<T>> => {
  const authResult = auth(sessionId);
  if (authResult.ok === false) {
    return {
      ok: false,
      message: "Unauthorized",
    };
  }
  return cont(authResult.companyId);
};

const getEmployeesPageProps: GetPageProps<{ employees: Employees | null }> = (
  sessionId
) =>
  continueWithAuth((companyId) => {
    const dummyDb = new Map([["1", ["Alice", "Bob"]]]);
    return {
      ok: true,
      data: {
        employees: dummyDb.get(companyId) ?? null,
      },
    };
  }, sessionId);

const getOfficesPageProps: GetPageProps<{ offices: Offices | null }> = (
  sessionId
) =>
  continueWithAuth((companyId) => {
    const dummyDb = new Map([["1", ["London", "Paris"]]]);
    return {
      ok: true,
      data: {
        offices: dummyDb.get(companyId) ?? null,
      },
    };
  }, sessionId);

参考資料

Node.js Stream の初歩

Node.js には Stream というインターフェイスが用意されており、これを使うことでデータをストリーミングできる。
Stream を使うことで、データの全てをメモリに保持するのではなく、少しずつ順番にデータを処理していくことが可能になる。

この記事では、Stream の基本的な使い方について説明していく。

WHATWG で定義している Stream はまた別の概念なので、注意する。この記事で扱っている Stream は、それとは別に以前から Node.js に実装されている Stream である。

以下の環境で動作確認している。

  • Node.js のバージョン
    • 16.15.1
  • 使っている npm ライブラリ
    • @types/node@16.11.43
    • ts-node-dev@2.0.0
    • typescript@4.7.4

環境構築

まず最初に、手元で実際にコードを動かすための環境を構築する。
TypeScript で書くための準備作業なので、JavaScript で書く場合はこの工程は不要。

適当なディレクトリを作り、以下のコマンドを実行する。

% yarn init -y
% yarn add -D typescript ts-node-dev @types/node@16
% touch index.ts

最後に、以下の内容のtsconfig.jsonを作る。

{
  "compilerOptions": {
    "strict": true,
    "lib": ["esnext", "DOM"],
    "module": "NodeNext",
    "target": "ESNext"
  }
}

この状態でyarn ts-node-dev --respawn index.tsを実行すると、index.tsに書いた内容を実行してくれるようになる。

Stream はインターフェイス

Stream はインターフェイスであり、そのインターフェイスを満たしているものなら何であれ Stream として扱うことができる。
自分で Stream を実装してもいいが、予め Node.js が用意しているものを利用することが多い。
例えば、fsモジュールはファイルを Stream として扱う仕組みを用意しており、これを使うことでファイルの読み書きをストリーミングで行うことができる。

データを読み込むための Stream である Readable Stream や、書き込むための Stream である Writable Stream など、Stream にはいくつかの種類があり、目的に応じて使い分ける。
先程のファイルの例でいえば、ファイルの読み込みは Readable Stream を使って行い、書き込みは Writable Stream を使って行う。

早速、Stream でファイルを読み込んでみる。あくまでも雰囲気を掴むためのものなので、この時点でコードの内容を理解する必要はない。

import { createReadStream } from "fs";
import { Readable } from "stream";

const rs: Readable = createReadStream(__filename, {
  encoding: "utf-8",
  highWaterMark: 8,
});

async function main() {
  for await (const chunk of rs) {
    console.log(chunk);
    console.log("\n==delimit==\n");
  }
}

main();

上記のコードを実行すると、以下のログが流れるはず。

import {

==delimit==

 createR

==delimit==

eadStrea

==delimit==

// 以下省略

__filenameindex.ts自身を指しているので、このファイルの内容を8バイトずつ読み込み、その内容をconsole.logに渡している。
このように、データを少しずつ扱う、ということが Stream によって可能になる。

Readable Stream

まずは、fs.createReadStreamを題材にして、Readable Stream の基本的な使い方を見ていく。

createReadStreamにファイルのパスを渡すことで、Readable Stream を作ることができる。
第二引数はオプションで、ここでencodingを指定すると、読み込んだデータをその形式でデコードするようになる。

import { createReadStream } from "fs";
import { Readable } from "stream";

const rs = createReadStream(__filename, {
  encoding: "utf-8",
});

console.log(rs instanceof Readable); // true

Readable Stream のdataイベントにリスナーを設定すると、そのリスナーにデータが渡される。
そして全てのデータが消費されると、endイベントが発生する。
そのため下記の例では、このコードの内容とend!がログに流れる。

import { createReadStream } from "fs";
import { Readable } from "stream";

const rs = createReadStream(__filename, {
  encoding: "utf-8",
});

rs.on("data", (chunk) => {
  console.log(chunk);
});

rs.on("end", () => {
  console.log("end!");
});

また、Readable Stream は AsyncIterable でもあるので、for await...of構文を使ってデータを取り出すこともできる。

import { createReadStream } from "fs";

const rs: AsyncIterable<string> = createReadStream(__filename, {
  encoding: "utf-8",
});

async function receive() {
  for await (const chunk of rs) {
    console.log(chunk);
  }
  console.log("end!");
}

receive();

消費されたデータは消えてしまうので、下記の例ではreceiveを 3 回実行しているがデータの読み取りは 1 回しか発生しない。
そのため、ファイルの内容が表示されたあと、end!が 3 つ並んで表示される。

import { createReadStream } from "fs";

const rs: AsyncIterable<string> = createReadStream(__filename, {
  encoding: "utf-8",
});

async function receive() {
  for await (const chunk of rs) {
    console.log(chunk);
  }
  console.log("end!");
}

receive();
receive();
receive();

highWaterMark

Stream は、その内部にバッファとしてデータを保持する。そして各 Stream は、バッファにどの程度のデータまで保持するのかを示す、highWaterMark という名前の閾値を持っている。
この閾値を過度に超えないように上手く制御することで、「データを少しずつ処理する」ことが可能になる。

Readable Stream の場合、readableHighWaterMarkに highWaterMark の値が格納されている。
createReadStream で作られた Readable Stream の場合、デフォルト値は65536バイト(64 * 1024 = 65536なので64キロバイト)だが、highWaterMarkオプションで設定することもできる。

import { createReadStream } from "fs";

const rs1 = createReadStream(__filename);
const rs2 = createReadStream(__filename, { highWaterMark: 16 });

console.log(rs1.readableHighWaterMark); // 65536
console.log(rs2.readableHighWaterMark); // 16

下記の例だと highWaterMark を16に設定しているので、まず16バイト分のデータだけ内部バッファに格納し、それをconsole.logに渡す。
そしてそのデータは捨てられ、また次の16バイトが読み込まれる。そのため、内部バッファには常に16バイト以下のデータしか格納されないようになっている。

import { createReadStream } from "fs";

const rs = createReadStream(__filename, {
  encoding: "utf-8",
  highWaterMark: 16,
});

async function receive() {
  for await (const chunk of rs) {
    console.log(chunk);
  }
}

receive();

Writable Stream

データの書き込みには、Writable Stream を使う。

ファイルの場合はcreateWriteStreamで Writable Stream を作れる。
以下のように書くと、dest.txtというファイルへの書き込みを行う Writable Stream が作られる。
createWriteStreamの場合、encodingはデフォルトでutf8なので、この記事では特に指定していない。

import { createWriteStream } from "fs";
import { Writable } from "stream";

const ws: Writable = createWriteStream("dest.txt");

Readable Stream はwriteメソッドを持っており、それを使ってデータの書き込みができる。
書き込みが終わったことを Writeable Stream に伝えるにはendメソッドを使う。そして書き込みが終わったことを知った Writable Stream は、finishイベントを発火する。

以下のコードの実行すると、このコードの内容を16バイト毎に改行しながらdest.txtに書き込んでいき、それが完了するとログにfinish!が流れる。

import { createReadStream, createWriteStream } from "fs";

const rs = createReadStream(__filename, {
  encoding: "utf-8",
  highWaterMark: 16,
});

const ws = createWriteStream("dest.txt");

ws.on("finish", () => {
  console.log("finish!");
});

rs.on("data", (chunk) => {
  ws.write(`${chunk}\n`);
});

highWaterMark を頼りに「水位」を調節する

Writable Stream も内部バッファを持っており、そこにデータが蓄積されていく。
そしてやはり highWaterMark を設定することができる。

import { createWriteStream } from "fs";

const ws1 = createWriteStream("dest.txt");
const ws2 = createWriteStream("dest.txt", { highWaterMark: 8 });

console.log(ws1.writableHighWaterMark); // 16384
console.log(ws2.writableHighWaterMark); // 8

そしてこの値に応じて内部バッファに格納されるデータ量が自動的に調節される、というわけではない。
highWaterMark はただ閾値を表現しているだけであり、それを超えないことを保証するようなものではない。

ではどうすればいいのかというと、データ量を監視しながら上手く調節する必要がある。
データ量の状況を知るための手段がいくつか用意されているので、それを見ながら、手動で調節を行う。

まずは、状況を知るための手段について。
writeメソッドの返り値」と「drainイベント」を利用することで、highWaterMark を超えているか確認できる。

writeメソッドは、内部バッファに格納されているデータ量が highWaterMark を超えていなければtrueを、超えてしまっていればfalseを返す。

以降のコードの実行結果は、筆者の手元の環境によるもの。
データの処理速度は常に一定というわけではないので、結果が変わることもある。

以下のコードだと 5 回書き込みが行われるが、writeメソッドはいずれもtrueを返す。

import { createReadStream, createWriteStream } from "fs";

const rs = createReadStream(__filename, {
  encoding: "utf-8",
  highWaterMark: 64,
});

const ws = createWriteStream("dest.txt", {
  highWaterMark: 256,
});

rs.on("data", (chunk) => {
  console.log(ws.write(chunk));
});

同じ内容で Writable Stream の highWaterMark だけ16に変更する。
そうすると、5 回ともfalseを返す。highWaterMark を小さくしたことで、内部バッファに格納されているデータ量が常にそれを超えるようになってしまった。

import { createReadStream, createWriteStream } from "fs";

const rs = createReadStream(__filename, {
  encoding: "utf-8",
  highWaterMark: 64,
});

const ws = createWriteStream("dest.txt", {
  highWaterMark: 16,
});

rs.on("data", (chunk) => {
  console.log(ws.write(chunk));
});

drainイベントは、highWaterMark を超えてしまったあと、内部バッファに格納されている全てのデータが排出されたときに発生する。
Node.js Stream では、drain や highWaterMark のように、「水」に関係した単語でデータ量の状況を表現している。

下記のコードだと、drain!がログに流れる。

import { createReadStream, createWriteStream } from "fs";

const rs = createReadStream(__filename, {
  encoding: "utf-8",
  highWaterMark: 64,
});

const ws = createWriteStream("dest.txt", {
  highWaterMark: 16,
});

ws.on("drain", () => {
  console.log("drain!");
});

rs.on("data", (chunk) => {
  ws.write(chunk);
});

だが以下のようにすると、そもそも一度も highWaterMark を超えないので、drainイベントも発生しないままプログラムが終了する。

import { createReadStream, createWriteStream } from "fs";

const rs = createReadStream(__filename, {
  encoding: "utf-8",
  highWaterMark: 64,
});

const ws = createWriteStream("dest.txt", {
  highWaterMark: 256,
});

ws.on("drain", () => {
  console.log("drain!");
});

rs.on("data", (chunk) => {
  ws.write(chunk);
});

これら 2 つの指標を使い、流れ込んでくるデータ量を制御することで、内部バッファに格納されているデータ量が highWaterMark を過度に超えないようにすることができる。
具体的には、writefalseを返したらデータの読み込みを一時停止し、drainが発生したら再開する。

それを実装したのが、以下のコード。

import { createReadStream, createWriteStream } from "fs";

const rs = createReadStream(__filename, {
  encoding: "utf-8",
  highWaterMark: 64,
});

const ws = createWriteStream("dest.txt", {
  highWaterMark: 16,
});

ws.on("drain", () => {
  console.log("drain!");
  rs.resume(); // 読み込みを再開する
});

rs.on("data", (chunk) => {
  const ok = ws.write(chunk);
  if (!ok) {
    rs.pause(); // 読み込みを一時停止する
  }
});

このコードを実行するとdrain!が何度もログに流れる。
このことから、「highWaterMark を超えたのでデータの読み込みを一時停止する → drain イベントが発生したので再開する → 再び highWaterMark を超える」というサイクルが何度も繰り返されていることが分かる。

pipe による自動調節

Readable Stream はpipeというメソッドを持っており、引数に Writable Stream を渡すことで、読み込んだデータを Writable Stream に流し込むことができる。
このメソッドを使えば、writeメソッドやdataイベントを使わずにデータを書き込める。

import { createReadStream, createWriteStream } from "fs";

const rs = createReadStream(__filename, {
  encoding: "utf-8",
  highWaterMark: 64,
});

const ws = createWriteStream("dest.txt", {
  highWaterMark: 16,
});

rs.pipe(ws);

さらにpipeメソッドは、データ量の調節も自動的に行なってくれる。
そのため実は、pipeメソッドを使えば、先程書いたような調節処理を自分で実装する必要はない。

下記のコードを実行するとpause!が複数回表示されるので、pipeメソッドを実行すると内部的にpauseメソッドが呼ばれていることが分かる。

import { createReadStream, createWriteStream } from "fs";

const rs = createReadStream(__filename, {
  encoding: "utf-8",
  highWaterMark: 64,
});

const ws = createWriteStream("dest.txt", {
  highWaterMark: 16,
});

rs.on("pause", () => {
  console.log("pause!");
});

rs.pipe(ws);

参考資料