Tokio と Future のチュートリアルとかのまとめ+α

tokio, futures についてだらだら調べたときのメモを放置してたので, なんとか読めるレベルに整理した. Tokioバージョンは 0.1.6 とする. TokioDesigning futures for Rust · Aaron Turon はFutureの実装の仕方などは丁寧に書かれているとは思うが, ランタイムを含めた処理の流れなどの説明があまりされてない気がしたので, 気になってそれに関連する内部実装も少し読んだ. Tokio 単体のコード自体はそんなに巨大ではないが, Tokio がべったり依存している futures もあるし, おまけに各モジュールに同名・類似名の構造体, 関数, マクロが多数存在し, しかもそれらが同時に使われたりもしてて意外と大変だった.

結局 Tokio とはなんなのか

futures::{Future, Stream} で実装された非同期かつゼロコストなグリーンスレッドを使ってネットワークプログラミングするためのフレームワークであり, 最近マルチスレッドに対応した. その前から futures-cpupool を使って自前でマルチスレッドなタスクキューを実装することはできたが、tokio::runtime で使われてる tokio-threadpool の方が10倍速いらしい. グリーンスレッド自体は Rust のコアに昔はあったらしいがもう取り除かれてて, 今は futures と tokio をみんな使ってるらしい. tokio 自体はOSからのイベントの感知のために mio (epoll) を内部で使っており, また, hyper とか actix みたいな web のライブラリは大体 tokio の上に乗っかっているっぽいので, 多分勉強する価値がある. 多分 goroutine とかと同じ土俵にいると思う (go書いたことないので怪しい).

既存の非同期イベント処理モデルの問題点

冒頭にリンクした記事からの引用である. 例えばfの完了時にコールバックf_done, 同様にg完了時にg_done, さらにf_donef_doneの両方が呼ばれたらboth_doneを実行したいとする. 普通の future ランタイムとかの場合, fg が別スレッドで実行される可能性があり, 最後に完了した方だけにboth_doneを呼ぶ責任があるため, both_done の実行は synchronized にする必要がある. これはf_doneg_doneで共有される状態を管理することに相当し, ヒープアロケーションを要する (RustだとArcを使う) ため遅い. し, 手で実装する場合は面倒.

また, single-threaded なイベントループを作ってそこにワーカースレッドが イベント(コールバック)を投げていき, メインスレッドがイベントループのキューを処理するというモデルもある. しかしそれはそれでさっき 書いたように各コールバックを抽象型 (要ヒープアロケーション) に統一しないと単一のイベントループでは扱えないという問題がある (node.jsとかAndroidとか?).

ゼロコストfuture?

Future はブロックするかもしれない計算の単位で, 遅延評価される. 複数の Future をメソッドチェーンで繋げたものもまた Future であり, その末端 (leaf)である Future<Item=(), Error=()> は Task と呼ばれる. Task はランタイムがあれば非同期なグリーンスレッドとして振る舞える. 最初は Future をDAG的にどんどんつなげて Task を1つ作り, それを tokio::run で Executor に投げればいい感じに回してくれるものかと思ったが若干違って, ある Task の実行中にも内部で新しい Task が作られ得る. 例えばあるFuture f の評価結果をキャプチャした新しいFuture g がその内部で作られ, tokio::spawn(g) すれば別スレッドで g.poll() するように Executor にお願いできる. ここで, おおよそ以下の3項目がゼロコストを実現する鍵らしい(だいたい合ってると思うが自信がない):

  • Future の値の計算を polling ベースにし, 関連型を使って実装すれば各 Future のメソッド呼び出しに仮想関数テーブルがいらない (https://tokio.rs/docs/getting-started/futures/に詳しい)
  • tokio::runtokio::spawnFuture<Item=(), Error=()> に unify された型しか受け取らないようになっている (つまりBoxing不要) ため, それをもとに内部で作られる Task オブジェクトのメソッド呼び出しに仮想関数テーブルがいらない.
  • Task はランタイムにとっては外部から与えられたものなのでライフタイムに関して心配する必要がない.

3つ目が誤解してる気もするが, ご多分に漏れず, とにかく静的ディスパッチにしてスタック上で処理するということらしい. 少なくとも従来のモデルが抱える問題点は克服しており, 実際速いらしい.

あと, Tokio はプリエンプション (ランタイムによる割り込み) をサポートしない & Task は基本的に1つのスレッド上で実行されるため, 1 Taskを小さくするのもポイントである. そうしないとそのTaskによるスレッド/CPUコアの占有時間が増えて回転率が落ちる.

素人なのでどういう処理単位を Task とすればいいかの決定が難しくね?と思ったが, ひとまずソケット待ちみたいな OS がノンブロッキングをサポートしてる処理を Future としとけば良さそう. 基本的には tokio::net に実装されてるソケットの listen, accept をする Future から map とか flatmap とかして発展させていく感じだと思う.

Future + Tokio runtime の実行時の流れ

本題. 以下の2項目を前提とする

  • Reactor (epoll インスタンスを内包するイベントループで, バックグラウンドで回っている)に関してはあらゆるワーカースレッドがそのハンドルを持っているが, Reactor の実体は1つとする. 実際は1つじゃないこともあるらしいが理解のためそうする.
  • Task は1つのスレッド上で実行され, Taskインスタンスはそのスレッドにローカルな変数に保存されるため子Future から見える (futures::Task::current で取得できる)

まず tokio::run(task) を実行すると Runtime が task.poll() し, task からどんどん Future::poll で子Futureを辿っていく. すると準備ができていない TcpListener などがブロックせずに NotReady を即座に返そうとするが, 返す前に, 自分が属している Task を futures::Task::current で取得し, それを Reactor の io_dispatch というルックアップテーブルに登録する. 登録時のキーは TcpListener 事前に付与されているトークン (正確には TcpListener の実体である mio::Evented オブジェクト, もっというとソケットディスクリプタに付与されているトークン)である:

// tokio/tokio-reactor/src/lib.rs

/// Registers interest in the I/O resource associated with `token`.
/// `t` はブロックした I/O resource が属する Task.
fn register(&self, token: usize, dir: Direction, t: Task) {
    debug!("scheduling direction for: {}", token);
    let io_dispatch = self.io_dispatch.read().unwrap();

    // io_dispatch (Slabというルックアップテーブル) から token に対応する ScheduledIO オブジェクトを取得.
    // これは前もって insert されてるので unwrap してもパニックしないぽい
    let sched = io_dispatch.get(token).unwrap();

    let (task, ready) = match dir {
        Direction::Read => (&sched.reader, !mio::Ready::writable()),
        Direction::Write => (&sched.writer, mio::Ready::writable()),
    };

    // それに t をセット
    task.register_task(t);

    if sched.readiness.load(SeqCst) & ready.as_usize() != 0 {
        task.notify();
    }
}

そして改めて return NotReady し, そのTask を強制終了させる. ポイントは return なのでブロックしないし, yield でないので制御はもう戻ってこないことである. ちなみにこのTask登録の流れは Tokio が内部でやってることなので実装者は Future::poll 内で return NotReady だけしとけばいい. もっというと TcpListener の accept くらいならtokio::net::Incomingとかが上手いことやってくれるのでそれすらしなくていい.

しばらくして, OS (epoll) からさっきブロックした TcpListener が NotReady => Ready になった旨の通知が Reactor に来る. この通知にはさっきの TcpListener の Token が含まれているため, io_dispatch[token] により対応する Task が取り出せる. そして Task::notify によってその Task をスレッドプールに submit し, 誰か知らないワーカーがre-pollする. re-poll というのは再びその Task を頭から子futureまで poll して回るということである.

このルーチンをひたすら繰り返す. Task はtokio::run(foo) で1つ生成されるが, Task 実行時に内部で tokio::spawn(bar) することでも生成されるため, 結果的にはかなりの数の Task がRuntime内に存在することになり, スケジューラは忙しく働き続けないといけない. このときポイントなのが, イベントループはいろんな種類のイベントを扱う必要があるが, Task が具象型なのでメモリをアロケートしなくていいということである.

ちなみに, return NotReady 時に Task を止めて後で通知されるようにすることは "park", 通知を受けて Executor にタスクスケジューリングを依頼することは "unpark" と呼ばれている.

スレッド間の通信

違うTask (Future) にデータを渡したい場合, 古典的には Arc<Mutex<T>> みたいなスレッド間の共有オブジェクトを作って 各々それに対して値を書いたり読んだりすればよさそうだが, まあ辛そうだし, それがブロックする処理だと困る. ブロックを回避するために読み書き処理を Future にしてしまうこともできるが, 例えば系列を舐めるループ内とかでそれをやってしまうと return NotReady で簡単にループを抜けられてしまい困る場合がある. そういうときは futures::mpsc::channel でメッセージパッシングをすればいいらしい. 非同期に, かつ Task を抜けずに次々とデータを他スレッドに渡せる. 要は tokio runtime と future と チャネルを組み合わせて使えば go のランタイムと似たようなことが実現できるっぽい. チャネルを利用するには

let (tx, rx): (Sender, Receiver) = mpsc::channel(size);

とやって SenderReceiver 作る. Sender からメッセージを送信するには tx.send(msg) すればいい. 一方 Receiverfutures::Stream を実装しているため, キューに溜まってるメッセージを rx.poll()で取得できる. これらの送受信は非同期に行われる. 例えば チャネルのバッファに読み込むデータがない状態で rx.poll するとOk(NotReady) を返そうとするが, その前に「新しいデータが届いたら後で re-poll されるようにスケジュール」してから返す. これも Future 同様ランタイムがやってくれる.