Rust から Intel MKL を使う

諸事情で、バッチのサイズ分だけ sgemm (単精度一般行列積) を呼ぶ必要があった。 最初は OpenBLAS の sgemm を FFI で呼び出すというのをバッチサイズ回やってみたりしてて、効率よくなさそうだなーと思ってたところ、こんなのを見つけた:

software.intel.com

つまり、この cblas_sgemm_batch を使うためには OpenBLAS から MKL に移行する必要がある。 しかし Rust には blasバインディング (-sys) は充実しているが MKL のものはない、が、辛うじてバインディングを作るための -src クレートは見つかった:

github.com

これを使って MKL の C-API を Rust から呼び出す。手順:

extern crate libc;
extern crate intel_mkl_src;

// libc クレートとかを使って頑張って cblas_sgemm_batch を extern "C" する

#[repr(C)]
enum CblasTranspose {
    CblasNoTrans = 111,
    CblasTrans = 112,
    CblasConjTrans = 113,
}

#[repr(C)]
enum CblasLayout {
    CblasRowMajor=101, 
    CblasColMajor=102
}

type MklInt = i64;

extern "C" {
    fn cblas_sgemm_batch(
        layout: CblasLayout,
        transa_array: *const CblasTranspose,
        transb_array: *const CblasTranspose,
        m_array: *const MklInt,
        n_array: *const MklInt,
        k_array: *const MklInt, 
        alpha_array: *const libc::c_float,
        a_array: *const *const libc::c_float,
        lda_array: *const MklInt,
        b_array: *const *const libc::c_float,
        ldb_array: *const MklInt,
        beta_array: *const libc::c_float,
        c_array: *mut *mut libc::c_float,
        ldc_array: *const MklInt,
        group_count: MklInt,
        group_size: *const MklInt,
    );
}

これで、cblas_sgemm_batch(.. が呼び出せる。 OpenBLAS は大抵リンク時に fortran コンパイラを要求してきて面倒なので、MKL で済むならその方がいいと思った。

Tokio と Future のチュートリアルとかのまとめ+α

tokio, futures についてだらだら調べたときのメモを放置してたので, なんとか読めるレベルに整理した. Tokioバージョンは 0.1.6 とする. TokioDesigning futures for Rust · Aaron Turon はFutureの実装の仕方などは丁寧に書かれているとは思うが, ランタイムを含めた処理の流れなどの説明があまりされてない気がしたので, 気になってそれに関連する内部実装も少し読んだ. Tokio 単体のコード自体はそんなに巨大ではないが, Tokio がべったり依存している futures もあるし, おまけに各モジュールに同名・類似名の構造体, 関数, マクロが多数存在し, しかもそれらが同時に使われたりもしてて意外と大変だった.

結局 Tokio とはなんなのか

futures::{Future, Stream} で実装された非同期かつゼロコストなグリーンスレッドを使ってネットワークプログラミングするためのフレームワークであり, 最近マルチスレッドに対応した. その前から futures-cpupool を使って自前でマルチスレッドなタスクキューを実装することはできたが、tokio::runtime で使われてる tokio-threadpool の方が10倍速いらしい. グリーンスレッド自体は Rust のコアに昔はあったらしいがもう取り除かれてて, 今は futures と tokio をみんな使ってるらしい. tokio 自体はOSからのイベントの感知のために mio (epoll) を内部で使っており, また, hyper とか actix みたいな web のライブラリは大体 tokio の上に乗っかっているっぽいので, 多分勉強する価値がある. 多分 goroutine とかと同じ土俵にいると思う (go書いたことないので怪しい).

既存の非同期イベント処理モデルの問題点

冒頭にリンクした記事からの引用である. 例えばfの完了時にコールバックf_done, 同様にg完了時にg_done, さらにf_donef_doneの両方が呼ばれたらboth_doneを実行したいとする. 普通の future ランタイムとかの場合, fg が別スレッドで実行される可能性があり, 最後に完了した方だけにboth_doneを呼ぶ責任があるため, both_done の実行は synchronized にする必要がある. これはf_doneg_doneで共有される状態を管理することに相当し, ヒープアロケーションを要する (RustだとArcを使う) ため遅い. し, 手で実装する場合は面倒.

また, single-threaded なイベントループを作ってそこにワーカースレッドが イベント(コールバック)を投げていき, メインスレッドがイベントループのキューを処理するというモデルもある. しかしそれはそれでさっき 書いたように各コールバックを抽象型 (要ヒープアロケーション) に統一しないと単一のイベントループでは扱えないという問題がある (node.jsとかAndroidとか?).

ゼロコストfuture?

Future はブロックするかもしれない計算の単位で, 遅延評価される. 複数の Future をメソッドチェーンで繋げたものもまた Future であり, その末端 (leaf)である Future<Item=(), Error=()> は Task と呼ばれる. Task はランタイムがあれば非同期なグリーンスレッドとして振る舞える. 最初は Future をDAG的にどんどんつなげて Task を1つ作り, それを tokio::run で Executor に投げればいい感じに回してくれるものかと思ったが若干違って, ある Task の実行中にも内部で新しい Task が作られ得る. 例えばあるFuture f の評価結果をキャプチャした新しいFuture g がその内部で作られ, tokio::spawn(g) すれば別スレッドで g.poll() するように Executor にお願いできる. ここで, おおよそ以下の3項目がゼロコストを実現する鍵らしい(だいたい合ってると思うが自信がない):

  • Future の値の計算を polling ベースにし, 関連型を使って実装すれば各 Future のメソッド呼び出しに仮想関数テーブルがいらない (https://tokio.rs/docs/getting-started/futures/に詳しい)
  • tokio::runtokio::spawnFuture<Item=(), Error=()> に unify された型しか受け取らないようになっている (つまりBoxing不要) ため, それをもとに内部で作られる Task オブジェクトのメソッド呼び出しに仮想関数テーブルがいらない.
  • Task はランタイムにとっては外部から与えられたものなのでライフタイムに関して心配する必要がない.

3つ目が誤解してる気もするが, ご多分に漏れず, とにかく静的ディスパッチにしてスタック上で処理するということらしい. 少なくとも従来のモデルが抱える問題点は克服しており, 実際速いらしい.

あと, Tokio はプリエンプション (ランタイムによる割り込み) をサポートしない & Task は基本的に1つのスレッド上で実行されるため, 1 Taskを小さくするのもポイントである. そうしないとそのTaskによるスレッド/CPUコアの占有時間が増えて回転率が落ちる.

素人なのでどういう処理単位を Task とすればいいかの決定が難しくね?と思ったが, ひとまずソケット待ちみたいな OS がノンブロッキングをサポートしてる処理を Future としとけば良さそう. 基本的には tokio::net に実装されてるソケットの listen, accept をする Future から map とか flatmap とかして発展させていく感じだと思う.

Future + Tokio runtime の実行時の流れ

本題. 以下の2項目を前提とする

  • Reactor (epoll インスタンスを内包するイベントループで, バックグラウンドで回っている)に関してはあらゆるワーカースレッドがそのハンドルを持っているが, Reactor の実体は1つとする. 実際は1つじゃないこともあるらしいが理解のためそうする.
  • Task は1つのスレッド上で実行され, Taskインスタンスはそのスレッドにローカルな変数に保存されるため子Future から見える (futures::Task::current で取得できる)

まず tokio::run(task) を実行すると Runtime が task.poll() し, task からどんどん Future::poll で子Futureを辿っていく. すると準備ができていない TcpListener などがブロックせずに NotReady を即座に返そうとするが, 返す前に, 自分が属している Task を futures::Task::current で取得し, それを Reactor の io_dispatch というルックアップテーブルに登録する. 登録時のキーは TcpListener 事前に付与されているトークン (正確には TcpListener の実体である mio::Evented オブジェクト, もっというとソケットディスクリプタに付与されているトークン)である:

// tokio/tokio-reactor/src/lib.rs

/// Registers interest in the I/O resource associated with `token`.
/// `t` はブロックした I/O resource が属する Task.
fn register(&self, token: usize, dir: Direction, t: Task) {
    debug!("scheduling direction for: {}", token);
    let io_dispatch = self.io_dispatch.read().unwrap();

    // io_dispatch (Slabというルックアップテーブル) から token に対応する ScheduledIO オブジェクトを取得.
    // これは前もって insert されてるので unwrap してもパニックしないぽい
    let sched = io_dispatch.get(token).unwrap();

    let (task, ready) = match dir {
        Direction::Read => (&sched.reader, !mio::Ready::writable()),
        Direction::Write => (&sched.writer, mio::Ready::writable()),
    };

    // それに t をセット
    task.register_task(t);

    if sched.readiness.load(SeqCst) & ready.as_usize() != 0 {
        task.notify();
    }
}

そして改めて return NotReady し, そのTask を強制終了させる. ポイントは return なのでブロックしないし, yield でないので制御はもう戻ってこないことである. ちなみにこのTask登録の流れは Tokio が内部でやってることなので実装者は Future::poll 内で return NotReady だけしとけばいい. もっというと TcpListener の accept くらいならtokio::net::Incomingとかが上手いことやってくれるのでそれすらしなくていい.

しばらくして, OS (epoll) からさっきブロックした TcpListener が NotReady => Ready になった旨の通知が Reactor に来る. この通知にはさっきの TcpListener の Token が含まれているため, io_dispatch[token] により対応する Task が取り出せる. そして Task::notify によってその Task をスレッドプールに submit し, 誰か知らないワーカーがre-pollする. re-poll というのは再びその Task を頭から子futureまで poll して回るということである.

このルーチンをひたすら繰り返す. Task はtokio::run(foo) で1つ生成されるが, Task 実行時に内部で tokio::spawn(bar) することでも生成されるため, 結果的にはかなりの数の Task がRuntime内に存在することになり, スケジューラは忙しく働き続けないといけない. このときポイントなのが, イベントループはいろんな種類のイベントを扱う必要があるが, Task が具象型なのでメモリをアロケートしなくていいということである.

ちなみに, return NotReady 時に Task を止めて後で通知されるようにすることは "park", 通知を受けて Executor にタスクスケジューリングを依頼することは "unpark" と呼ばれている.

スレッド間の通信

違うTask (Future) にデータを渡したい場合, 古典的には Arc<Mutex<T>> みたいなスレッド間の共有オブジェクトを作って 各々それに対して値を書いたり読んだりすればよさそうだが, まあ辛そうだし, それがブロックする処理だと困る. ブロックを回避するために読み書き処理を Future にしてしまうこともできるが, 例えば系列を舐めるループ内とかでそれをやってしまうと return NotReady で簡単にループを抜けられてしまい困る場合がある. そういうときは futures::mpsc::channel でメッセージパッシングをすればいいらしい. 非同期に, かつ Task を抜けずに次々とデータを他スレッドに渡せる. 要は tokio runtime と future と チャネルを組み合わせて使えば go のランタイムと似たようなことが実現できるっぽい. チャネルを利用するには

let (tx, rx): (Sender, Receiver) = mpsc::channel(size);

とやって SenderReceiver 作る. Sender からメッセージを送信するには tx.send(msg) すればいい. 一方 Receiverfutures::Stream を実装しているため, キューに溜まってるメッセージを rx.poll()で取得できる. これらの送受信は非同期に行われる. 例えば チャネルのバッファに読み込むデータがない状態で rx.poll するとOk(NotReady) を返そうとするが, その前に「新しいデータが届いたら後で re-poll されるようにスケジュール」してから返す. これも Future 同様ランタイムがやってくれる.

Slab (データ構造) について

slab という小さなクレートを見つけたので、メモっておく。 コード片は全て公式のExampleと実装から引っ張ってきたもの。

slab::Slab とは

"Slab" でググるLinuxカーネルで使われているメモリアロケータの一手法として有名らしい。 Rust では slab クレートとして提供されているデータ構造であり、データ構造としてはかなり単純である。個人的に、

  • 辞書データ構造を使いたいが、キーが正整数である

場合に"O(1) な辞書"として使えると思った。が、メモリ的に少し懸案事項がありそうなので後述する。

まず最初に書いてある公式サンプルは以下:

let mut slab = Slab::new();

let hello: usize = slab.insert("hello");
let world: usize = slab.insert("world");

assert_eq!(slab[hello], "hello");
assert_eq!(slab[world], "world");

slab[world] = "earth";
assert_eq!(slab[world], "earth");

要するに insert すると insert された場所のインデックスが返ってくるベクタであり、 これだけだとメリットが良くわからない。とりあえず Slab の定義を確認した:

pub struct Slab<T> {
    entries: Vec<Entry<T>>,
    len: usize,
    // 次に `insert(item)` するときに item が位置することになる場所
    next: usize,
}

enum Entry<T> {
    Vacant(usize),
    Occupied(T),
}

さらに、Slab::remove の内部実装を確認した:

pub fn remove(&mut self, key: usize) -> T {
        // Swap the entry at the provided value
        let prev = mem::replace(
            &mut self.entries[key],
            Entry::Vacant(self.next));

        ...
        }
}

つまり、普通のベクタとは異なり、中間に位置する要素を remove しても内部的には Occupied(T)Vacant(usize) に置き換えられるだけなので データの再アラインが起こらない。また、次に Slab::insert するときにはそこが再利用されるため再アロケーションが防げる。 逆に言えば remove してもサイズが減らないため、

  • 要素の追加に対して削除が著しく多い
  • どれくらいのキャパシティを用意すればいいか予想がつかず、場合によってはかなりの量になるかもしれない

場合にはヒープが無駄に Vacant に埋め尽くされてしまう可能性があるため、使わない方が良いかもしれない。

slab::VacantEntry

また、実際に Slab::insert する前に key が欲しいというせっかちな人のために、Slab::vacant_entry() というヘルパーが用意されている。

let mut slab = Slab::new();

let hello = {
    let entry = slab.vacant_entry();
    let key = entry.key();
    // 典型的には key を挿入するエントリに含めたい場合。
    entry.insert((key, "hello"));
    key
};

assert_eq!(hello, slab[hello].0);
assert_eq!("hello", slab[hello].1);

epoll のサンプルを読んだ

epoll について勉強した。nginx や node.js (libuv) の内部でも普通に使われているらしい (知らなかった)。 具体的にはシステムコールとして提供されている epoll_create, epoll_wait, epoll_ctl について、 epoll の man (https://linuxjm.osdn.jp/html/LDP_man-pages/man7/epoll.7.html) に載っているサンプルにコメントを書いて理解した。 因みに epoll は Linux に特有であり, BSD では kqueue というものが使われているらしい。

#define MAX_EVENTS 10

// listen_sock は listen socket を参照するfdであり、socket(2) の戻り値。たくさんのリクエストを受け付ける。
// 今は省いているが、事前にポートに bind(2) しておく必要がある。
int listen_sock;

// ready になった listen_sock を accept(2) して得られるfd。
// peer からのデータの読み込みや、サーバーからのデータ書き込みは `listen_sock` でなくこれを通して行う。
int conn_sock;

// ブロックが解除された時点での、ready な fd の数であり、後の epoll_wait(2) の戻り値がバインドされる。
// 同時点での `events` のサイズと同じだと思われる。
int nfds;

// `epoll_create1` で内部的に作成された epoll インスタンスを参照する fd。
int epfd = epoll_create1(0);

// listen_sock に対応づいたイベント。
// "listen_sock が読み込み可能になる" をイベントとする。
// また、listen_sock はブロッキングにする。つまり listen_sock が読み込み ready にならない間は、epoll_wait(2) で停止するようにする。
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = listen_sock;

// epoll_event 構造体の配列。listen_sock や conn_sock の Ready イベントがごちゃ混ぜになって入っていく。
// これは多分 epoll_wait(2) の内部だけで変更される。
struct epoll_event events[MAX_EVENTS];

// epoll_ctl は epfd が参照する epoll インスタンスをコントロールする関数。
// 今回は listen_sock を epoll の監視対象に"加える"。以降、listen_sock はずっと epoll の監視下にある
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_sock, &ev)

for (;;) {
    // ここでのみ、events が破壊的に変更される。
    // 具体的には、epoll の監視下にある fd の中で ready なものすべてがキューイングされているが、
    // それが events にダンプされる。(`events` 内の順番は多分保証されてない)
    nfds = epoll_wait(epfd, events, MAX_EVENTS, -1);

    // ready な fd を舐めるループ。
    for (n = 0; n < nfds; ++n) {
        if (events[n].data.fd == listen_sock) {  // listen_sock の ready イベント。
            conn_sock = accept(listen_sock, (struct sockaddr *) &local, &addrlen);
            // conn_sock はノンブロッキングにしないと、エッジトリガーの場合に後の epoll_wait(2) がハングする可能性がある
            //  (c.f. https://linuxjm.osdn.jp/html/LDP_man-pages/man7/epoll.7.html)。
            setnonblocking(conn_sock);
            // 読み込み可能をイベントとする。また、エッジトリガーにすることで、conn_sock が "ready になった瞬間" にのみ、
            // 後の epoll_wait(2) が解除される。
            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = conn_sock;
            // conn_sock を監視対象に加える
            epoll_ctl(epfd, EPOLL_CTL_ADD, conn_sock, &ev)
        } else {
            // do_use_fd() はコネクト済み、かつ読み込み ready になった fd を実際に使う関数。しかし、注意が必要。
            // 例えば fd から read(2) するとき、read(2) が EAGAIN (もうソケットのバッファが空で、次もう一回呼ぶとブロックするぞというエラー値) 
            // を返すまでは繰り返し read(2) すべき。つまり、イベントは確実に消化すべき。そうしないと、peer は自身がすでに送りつけた
            // データ (ソケットバッファに残っているものを含む) に基いて応答を期待している場合に、次の epoll_wait(2) でハングする。
            do_use_fd(events[n].data.fd);
        }
    }
}