30歳からのプログラミング

30歳無職から独学でプログラミングを開始した人間の記録。

I/O 多重化で TCP サーバの並行処理能力を改善する

I/O とはデータの入出力のことであり、ネットワークプログラミングの文脈では「ネットワーク経由でのデータの送受信」を指すことが多い。そして、サーバが複数のクライアントの対応を同時に行う場合、複数の I/O を上手く処理する必要がある。これがまずいと、スループットやリソース効率などに問題があるサーバになってしまう。
複数の I/O を上手く処理するための戦略のひとつが、 I/O 多重化。これは、ひとつのスレッドで複数の I/O を同時に監視・管理する手法。epoll などのシステムコールを使って実装できる。
この記事では、 epoll による I/O 多重化を行った TCP サーバの例を示し、それによってどのような課題を解決できるのか見ていく。

epoll は Linux の API であり macOS などでは使えないので、 Linux でコンパイルや動作確認を行う。
この記事の内容は以下の環境で動作確認を行った。

$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 24.04.3 LTS
Release:    24.04
Codename:   noble
$ rustc --version
rustc 1.91.0 (f8297e351 2025-10-28)
$ cargo --version
cargo 1.91.0 (ea2d97820 2025-10-10)

Rust の Edition は2024

反復サーバの問題点

まず、 I/O 多重化を行っておらず、接続要求のあったクライアントをひとつずつ順番に処理していく TCP サーバを用意する。
このような仕組みのサーバは反復サーバと呼ばれる。

use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};

fn handle_client(mut stream: TcpStream) -> std::io::Result<()> {
    let mut buffer = [0u8; 4096];
    loop {
        let n = stream.read(&mut buffer)?;
        if n == 0 {
            break;
        }
        stream.write_all(&buffer[..n])?;
    }
    Ok(())
}

fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("0.0.0.0:9090")?;
    println!("ブロッキングエコーサーバー起動: 0.0.0.0:9090");
    loop {
        match listener.accept() {
            Ok((stream, addr)) => {
                println!(
                    "クライアント {} から接続要求を受け取ったので許可しエコー処理を開始",
                    addr
                );
                let _ = handle_client(stream);
                println!("クライアント {} の対応が完了", addr)
            }
            Err(e) => eprintln!("エラー: {}", e),
        }
    }
}

クライアントから送信されてきたデータを読み込み、その内容をそのままクライアントに返すだけの、シンプルなサーバ。

次に、この反復サーバに接続するクライアントを用意する。

use std::io::{Read, Write};
use std::net::TcpStream;

fn main() -> std::io::Result<()> {
    let mut stream = TcpStream::connect("127.0.0.1:9090")?;
    println!("接続しました");

    stream.write_all(b"Hello, World!\n")?;
    println!("送信完了");

    let mut buffer = [0u8; 1024];
    let n = stream.read(&mut buffer)?;
    println!("受信: {}", String::from_utf8_lossy(&buffer[..n]));

    Ok(())
}

サーバに接続しHello, World!\nという文字列を送る。
送信後、サーバからのレスポンスを読み込み標準出力に出力する。

先程のサーバを起動した状態でクライアントを実行してみる。

$ ./target/debug/client-normal
接続しました
送信完了
受信: Hello, World!

今回用意したサーバはクライアントから送られてきた内容を返すだけのサーバなので、Hello, World!\nが返ってくる。

次に、クライアントをもうひとつ用意する。

use std::io::{Read, Write};
use std::net::TcpStream;
use std::thread::sleep;
use std::time::Duration;

fn main() -> std::io::Result<()> {
    let mut stream = TcpStream::connect("127.0.0.1:9090")?;
    println!("接続しました");

    let message = b"Hello, World!\n";
    for byte in message {
        stream.write_all(&[*byte])?;
        stream.flush()?;
        println!("送信: {}", *byte as char);
        sleep(Duration::from_secs(1));
    }
    println!("送信完了");

    let mut buffer = [0u8; 1024];
    let n = stream.read(&mut buffer)?;
    println!("受信: {}", String::from_utf8_lossy(&buffer[..n]));

    Ok(())
}

データ送信の部分のみが先程のクライアントと異なっており、 1 秒ごとに 1 バイトを TCP サーバに送信している。そうすることで、処理速度が遅いクライアントを擬似的に再現している。

1 バイト毎に送信していき、全てを送信し終わったあとにサーバからのレスポンスを読み込み、その内容を標準出力に出力している。

では、処理速度が遅いクライアントがサーバに接続している最中に他のクライアント(最初に示したクライアント)が接続するとどうなるのか。

処理速度が遅いクライアントの対応が終わるまで、後続のクライアントにレスポンスが返ってこない。
この反復サーバはクライアントをひとつずつしか処理できず、現在対応中のクライアントの処理が終わるまで(クライアントが接続を閉じるまで)後続のクライアントは待たされるため、このような挙動になってしまう。

サーバの出力も同時に見てみると分かりやすい。

まず最初に処理速度が遅いクライアント(127.0.0.1:51990)が接続要求をしてきたので、その対応が始まる。すると、クライアントが接続を閉じるまで、具体的にはread()0を返すまで、それ以外のクライアントの対応を行うことができない。127.0.0.1:52000は待つしかなく、127.0.0.1:51990の対応が完了してようやく127.0.0.1:52000の対応が始まる。

この問題を解決するアプローチは複数考えられるが、I/O 多重化もそのひとつである。

epoll で I/O 多重化を行う

以下が、 I/O 多重化を行った TCP サーバ。

use nix::sys::epoll::{Epoll, EpollCreateFlags, EpollEvent, EpollFlags, EpollTimeout};
use nix::sys::socket::{SockaddrStorage, accept, getpeername};
use nix::unistd::close;
use std::collections::HashMap;
use std::net::TcpListener;
use std::os::fd::{AsRawFd, BorrowedFd, RawFd};

const BUFFER_SIZE: usize = 4096;
const MAX_EVENTS: usize = 1024;

struct EchoContext {
    buffer: Vec<u8>,
}

impl EchoContext {
    fn new() -> Self {
        Self {
            buffer: vec![0; BUFFER_SIZE],
        }
    }
}

struct EventLoop {
    epoll: Epoll,
    contexts: HashMap<RawFd, EchoContext>,
}

impl EventLoop {
    fn new() -> std::io::Result<Self> {
        let epoll = Epoll::new(EpollCreateFlags::empty())?;
        Ok(Self {
            epoll,
            contexts: HashMap::new(),
        })
    }

    fn add_fd(&mut self, fd: RawFd) -> nix::Result<()> {
        let event = EpollEvent::new(EpollFlags::EPOLLIN, fd as u64);
        let borrowed_fd = unsafe { BorrowedFd::borrow_raw(fd) };
        self.epoll.add(borrowed_fd, event)?;
        self.contexts.insert(fd, EchoContext::new());
        Ok(())
    }

    fn remove_fd(&mut self, fd: RawFd) {
        let borrowed_fd = unsafe { BorrowedFd::borrow_raw(fd) };
        let _ = self.epoll.delete(borrowed_fd);
        self.contexts.remove(&fd);
        match getpeername::<SockaddrStorage>(fd) {
            Ok(addr) => println!("クライアント {} の対応が完了", addr),
            Err(e) => println!("fd: {}, addr取得失敗: {}", fd, e),
        }
        let _ = close(fd);
    }

    fn handle_echo(&mut self, fd: RawFd) -> bool {
        let ctx = match self.contexts.get_mut(&fd) {
            Some(c) => c,
            None => return false,
        };

        let borrowed_fd = unsafe { BorrowedFd::borrow_raw(fd) };

        let bytes = match nix::unistd::read(fd, &mut ctx.buffer) {
            Ok(0) => return false,
            Ok(n) => n,
            Err(nix::errno::Errno::EAGAIN) | Err(nix::errno::Errno::EINTR) => return true,
            Err(_) => return false,
        };

        let mut sent = 0;
        while sent < bytes {
            match nix::unistd::write(borrowed_fd, &ctx.buffer[sent..bytes]) {
                Ok(n) => {
                    println!("ファイルディスクリプタ {} に対してデータ送信", fd);
                    sent += n;
                }
                Err(nix::errno::Errno::EAGAIN) | Err(nix::errno::Errno::EINTR) => continue,
                Err(_) => return false,
            }
        }

        true
    }

    fn handle_accept(&mut self, lfd: RawFd) -> bool {
        match accept(lfd) {
            Ok(fd) => {
                match getpeername::<SockaddrStorage>(fd) {
                    Ok(addr) => println!(
                        "クライアント {} から接続要求を受け取ったので、これを許可。このクライアントと接続しているソケットのファイルディスクリプタは {} 。",
                        addr, fd
                    ),
                    Err(e) => println!("fd: {}, addr取得失敗: {}", fd, e),
                }
                if self.add_fd(fd).is_err() {
                    let _ = close(fd);
                }
                true
            }
            Err(nix::errno::Errno::EAGAIN) | Err(nix::errno::Errno::EINTR) => true,
            Err(_) => false,
        }
    }

    fn run(&mut self, lfd: RawFd) -> nix::Result<()> {
        let mut events = vec![EpollEvent::empty(); MAX_EVENTS];

        loop {
            let num = self.epoll.wait(&mut events, EpollTimeout::NONE)?;

            for event in &events[..num] {
                let fd = event.data() as RawFd;

                let success = if fd == lfd {
                    self.handle_accept(lfd)
                } else {
                    self.handle_echo(fd)
                };

                if !success && fd != lfd {
                    self.remove_fd(fd);
                }
            }
        }
    }
}

fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("0.0.0.0:9090")?;
    let lfd = listener.as_raw_fd();

    println!("epollエコーサーバー起動: 0.0.0.0:9090");

    let mut event_loop = EventLoop::new()?;
    event_loop.add_fd(lfd)?;
    event_loop.run(lfd)?;

    Ok(())
}

nixクレートを使っているので、Cargo.tomldependenciesを以下のように記述しておく。

[dependencies]
nix = { version = "0.29", features = ["net", "event"] }

epoll を使ったイベントループによって I/O 多重化を実現しているが、その詳細についてはこの記事では扱わない。
epoll の使い方や挙動、イベントループの仕組みは、以下の記事を参照のこと。

numb86-tech.hatenablog.com

このサーバを起動し、先程と同様に、処理速度が遅いクライアントがサーバに接続している最中に他のクライアントから接続してみる。

処理速度が遅いクライアントの対応中でも、他のクライアントの対応ができている。

反復サーバではクライアントをひとつずつ順番に処理していたが、このサーバでは同時に複数のクライアントと接続することができる。接続要求がある度にそれを許可し、そのクライアント(正確には、クライアントと接続しているソケット、を指し示すファイルディスクリプタ)を epoll による監視対象に追加していく。
そして監視対象のクライアントからデータが届く度に、それを読み取りそれと同じ内容をクライアントに返す、という処理を行っている。データが到着したときにのみ処理を行うことで、データ送信が遅いクライアントの対応によって他のクライアントの対応が遅れる、という事態が発生することを防いでいる。

これについても、サーバの出力を同時に見ると分かりやすい。

まず、処理速度が遅いクライアント127.0.0.1:39498の接続要求を許可し、このクライアントとやり取りをするためのソケットを作成している。そのソケットのファイルディスクリプタが5
このクライアント(ファイルディスクリプタ5)は 1 秒に 1 バイトずつデータを送信してくるので、その都度、同じ内容のデータをクライアントに送信している。
その途中で他のクライアント127.0.0.1:39502が接続要求をしてきたので、先ほどと同様にソケットを作成。ファイルディスクリプタは6
これで、このサーバには 2 つのクライアントが接続されている状態になった。そして引き続き、クライアントからデータが到着する度に処理を行う。
このように I/O が多重化されていることで、後続のクライアントが待たされずに済むのである。

参考資料