rayonの真価は分割統治にアリ

κeenです。やや釣りっぽいタイトルですがRustのデータ並列ライブラリのrayonについて。イテレータを並列に計算できるだけでなくjoinで自分で並列処理を書くこともできるんだよという記事です。

rayonとpar_iter

rayonはRustのデータ並列ライブラリです。あの化学繊維のレーヨンです。Thread(糸)にちなんだ名前なんですかね。

rayonで一番良く使われるのはpar_iterでしょう。このように使えます。

use rayon::prelude::*;
let vec = vec![1, 2, 3, 4, 5, 6];
// _par_iter でデータ並列計算
let max = vec.par_iter().max();
assert_eq!(max, Some(6));

このようにほぼRustのイテレータを置き換える形で使えます。

その反面、イテレータらしい処理でないとpar_iterは使えません。 たとえば次のminとmaxを同時に求めるアルゴリズムはどうでしょう。 要素を同時に2つ取って、大小の区別をつけてからmin、maxと比較するのでminとmaxを個別に求めるのに比べて比較回数が3/4くらいになります。 しかし同時に2つ取るのでidiomaticなイテレータからは外れてしまいます。

fn main() {
    let vec = vec![1, 2, 3, 4, 5, 6];
    let (min, max) = min_max(&vec);
    assert_eq!(min, Some(&1));
    assert_eq!(max, Some(&6));
}

fn min_max<T: Ord>(v: &[T]) -> (Option<&T>, Option<&T>) {
    if v.is_empty() {
        return (None, None);
    } else {
        let (min, max) = min_max_(v);
        (Some(min), Some(max))
    }
}

fn min_max_<T: Ord>(v: &[T]) -> (&T, &T) {
    use std::cmp;
    debug_assert!(0 < v.len());
    let mut iter = v.iter();
    let mut min;
    let mut max;
    if v.len() % 2 == 0 {
        min = iter.next().unwrap();
        max = iter.next().unwrap();
    } else {
        min = iter.next().unwrap();
        max = min;
    }
    while let Some(a) = iter.next() {
        let b = iter.next().unwrap();
        let (small, large) = if a < b { (a, b) } else { (a, b) };
        min = cmp::min(min, small);
        max = cmp::max(max, large);
    }
    (min, max)
}

このアルゴリズムはrayonのイテレータでは記述出来ないでしょう。 こういうときにデータ並列操作を自分で書けるのがjoinです。

join

par_iterが高レベルAPIなのに対してjoinはカスタムジョブを書くためのAPIとされています。以下のような型シグネチャを持つ関数です。

pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB) where
    A: FnOnce() -> RA + Send,
    B: FnOnce() -> RB + Send,
    RA: Send,
    RB: Send,

少しややこしいですが、普通のスレッドのspawnのように新たなタスクを始める関数です。ただし引数に処理関数を2つ取ります。このシグネチャから「タスクを半分に分割しなさい。そうすればそれぞれを並列に解くことが出来ます。」というメッセージが伝わりますね。

今回のmin_maxの例でいくとまずスライスを半分に分割し、それぞれでmin_maxを求めたあとでmin同士、max同士を比べるとよさそうです。こうなるでしょうか。

fn main() {
    let vec = vec![1, 2, 3, 4, 5, 6];
    let (min, max) = min_max_rayon(&vec);
    assert_eq!(min, Some(&1));
    assert_eq!(max, Some(&6));
}

fn min_max_rayon<T: Ord + Send + Sync>(v: &[T]) -> (Option<&T>, Option<&T>) {
    match v.len() {
        0 => (None, None),
        1 => (Some(&v[0]), Some(&v[0])),
        _ => {
            let (min, max) = min_max_rayon_(v);
            (Some(min), Some(max))
        }
    }
}
fn min_max_rayon_<T: Ord + Send + Sync>(v: &[T]) -> (&T, &T) {
    use std::cmp;
    debug_assert!(1 < v.len());

    match v.len() {
        2 => {
            let a = &v[0];
            let b = &v[1];
            if a < b {
                (a, b)
            } else {
                (b, a)
            }
        }
        _ => {
            fn doit<T: Ord + Send + Sync>(v: &[T]) -> (&T, &T) {
                let mid = match v.len() % 4 {
                    0 => v.len() / 2,
                    2 => v.len() / 2 + 2,
                    _ => unreachable!(),
                };
                let ((min1, max1), (min2, max2)) =
                    // ここで `rayon::join` を用いている
                    rayon::join(|| min_max_rayon_(&v[..mid]), || min_max_rayon_(&v[mid..]));
                (cmp::min(min1, min2), cmp::max(max1, max2))
            }
            if v.len() % 2 == 1 {
                let t = &v[0];
                let v = &v[1..];
                let (min, max) = doit(v);
                (cmp::min(t, min), cmp::max(t, max))
            } else {
                doit(v)
            }
        }
    }
}

コメントに書きましたが rayon::join(|| min_max_rayon_(&v[..mid]), || min_max_rayon_(&v[mid..])); が今回の核心です。ほとんど並列を意識させずにコードを書けていますね。 事実 (min_max_rayon_(&v[..mid]), min_max_rayon_(&v[mid..])) のようにただのタプルにしても結果は変わりません。

joinと再帰とWork Stealing

rayon::join(|| min_max_rayon_(&v[..mid]), || min_max_rayon_(&v[mid..]));をよく見ると再帰呼び出ししています。すると再帰先でもまたjoinを呼び、今回の基底ケースはv.len() == 2なので 100万要素のスライスに対して50万個のタスクができることになります。 素直に50万個のスレッドを作るわけにもいきませんしシンプルなジョブキューを作っても50万回の排他ロックはとてつもなく遅いでしょう

// 1タスクスレッドは重すぎる
[Thread][Thread][Thread][Thread][Thread]
[Thread][Thread][Thread][Thread][Thread]
[Thread][Thread][Thread][Thread][Thread]
[Thread][Thread][Thread][Thread][Thread]
....
// キューでもロックが遅い

                 |---|',
                 |   |  |
                 |---|  |
                 | . |  50
                 | . |  |
                 |---|  |
                 |   |,'
                 +---+
                 >lock<
                   .
               .   .   .
            .    . .  .   .
         .     .   .   .     .
      .      .     .     .      .
   .       .       .       .       .
[Thread][Thread][Thread][Thread][Thread]

rayonが採用しているのはWork Stealingです。他の言語でも使われているのでご存知の方も多いでしょう。 私は元論文は読んだことはないのですが興味のある方は目を通してみて下さい。ここではスピリチュアルに説明します。 まず、各スレッドが各自のDequeを持っています。

 |   |   |   |   |   |   |   |
 +---+   +---+   +---+   +---+
 |   |   |   |   |   |   |   |
 +---+   +---+   +---+   +---+
 |   |   |   |   |   |   |   |
 +---+   +---+   +---+   +---+
   |       |       |       |
[Thread][Thread][Thread][Thread]

このうちの1つのスレッドの動きに着目しましょう。 最初は大きなJobがいます。

 +---+
 |100|
 +---+
   |
[Thread]

このタスクを実行すると先程のmin_maxのようにjoinを使うと半分に分割されますね。 それは1つは即座に実行、もう一つはDequeにpush_frontされます。

 +---+
 |50 |
 +---+
   |
 +---+
 |50 |
 +---+
[Thread]

これを何度か繰り返します。再帰しているので小さなタスクが手前にきます。

 +---+
 |50 |
 +---+
 |26 |
 +---+
 |12 |
 +---+
 |6  |
 +---+
   |
 +---+
 |6  |
 +---+
[Thread]

ここまでは全て自分のスレッド内での作業なのでロックは必要ありません。 ここで何か他のスレッドが全てのタスクを終えて空いているとしましょう

 +---+
 |50 |
 +---+
 |26 |
 +---+
 |12 |
 +---+
 |6  |
 +---+
   |
 +---+     |   |
 |6  |     +---+
 +---+       |
[Thread1] [Thread2]

するとThread2はThread1からJobを奪います。奪うのは一番後ろ、50です。これは他のたとえばThread3も同じように奪いに来る可能性もあるのでロックを取ります。

 >lock< -----+
 +---+       |
 |26 |       |
 +---+       |
 |12 |       |
 +---+       |
 |6  |       |
 +---+       v
   |       +---+
 +---+     |50 |
 |6  |     +---+
 +---+       |
[Thread1] [Thread2]

これでロックを出来る限り少なく、一番大きなタスクを他のスレッドに渡すことができました。 このようにrayonはrayon::joinと再帰呼出しに向いたスケジューリングを採用しています。

joinscope

rayonにはカスタムjobのためのAPIとしてjoinの他に scopeも用意されています。こちらは2つとは限らずに好き勝手タスクをspawnできるので自由度が高いです。なんならscopeを用いてjoinを実装することも可能です。

しかし可能な限りjoinの方を使えとあります。joinの方がWork Stealingに合わせたAPIなのでこっちの方が内部実装が高速だとのこと。 上記の通り恐らくrayonはWork Stealingを前提に作られているのでjoinが本体でscopeは副産物なのでしょう。

ベンチマーク

さて、並列というものはby definitionで速くないといけません。ベンチマークをとりましょう。

nightlyの機能のベンチマークを使います。また、適当な配列を用意するのにrandを使い、このような関数で実装します。

extern crate rand;
fn random_vec(n: usize) -> Vec<i32> {
    let mut rng = rand::thread_rng();
    (0..n).into_iter().map(|_| rng.gen::<i32>()).collect()
}

これを用いて以下のようにベンチマークをとります。実行するのは4コア8スレッドのIntel(R) Core(TM) i7-4910MQ CPU @ 2.90GHzです。

#[bench]
fn bench_min_max(b: &mut Bencher) {
    let v = random_vec(1024 * 1024 * 32);
    b.iter(|| min_max(&v))
}

#[bench]
fn bench_min_max_rayon(b: &mut Bencher) {
    let v = random_vec(1024 * 1024 * 32);
    b.iter(|| min_max_rayon(&v))
}

結果は

$ cargo +nightly bench
test bench_min_max           ... bench:   2,001,679 ns/iter (+/- 236,508)
test bench_min_max_rayon     ... bench:   5,608,823 ns/iter (+/- 1,225,619)

なんとrayonを使ったほうが遅いです。

詳細なグラフはこちら。

これは第一にrayon化するだけでどうやらオーバーヘッドが乗るから。もう一つに、いくらwork stealingとはいえ毎度jobを作っているとバカにならないコストが発生するからです。 なのである程度のサイズ以下になったらmin_max_rayonにスイッチするとよいでしょう。

min_max_rayon_

    match v.len() {
        2 => {
            let a = &v[0];
            let b = &v[1];
            if a < b {
                (a, b)
            } else {
                (b, a)
            }
        }
        _ => {

の部分を

    if v.len() <= 8192 {
        min_max_(v)
    } else {

に書換えます。この8192というのは私の手元のベンチマークに合わせてチューニングした結果です。 どうしてもrayonに載せるだけでそれなりにオーバーヘッドがかかるので並列化がペイするためにはそれなりに大きくないといけなくなります。

こうするとぐっと速くなって

test bench_min_max           ... bench:   1,982,283 ns/iter (+/- 124,381)
test bench_min_max_rayon     ... bench:     412,036 ns/iter (+/- 89,327)

と大体(/ 1982283.0 412036.0) = 4.9倍速くなってますね。

join と分割統治

joinの素晴らしい点は思考のフレームワークにもなっている点です。 小さな問題に分割して解いて、その解を組み合わせて全体の問題を解く。分割統治って言うらしいです。 クイックソートやマージソートがよく知られる例ですね。私はアルゴリズムに詳しいわけでは無いので細かな点は近日中にネットに出没するらしい詳しい記事に譲るとします。

この例の他にはたとえば最大上位者数問題、つまり全ての要素の中での「自身より右側にあって自身より大きい要素の数(上位者数)」の最大値を求める問題は素朴にはこう書けます。

fn msc<T: PartialOrd + Send + Sync>(v: &[T]) -> usize {
    v.iter()
        .enumerate()
        .map(|(n, t)| scount(t, &v[n..]))
        .max()
        .unwrap()
}

fn scount<T: PartialOrd + Send + Sync>(t: &T, v: &[T]) -> usize {
    v.iter().filter(|s| t < s).count()
}

これは $O(n^2)$ の計算量を持ちますがこれをうまく分割統治統治すると $O(n \log n)$ で求められます。

fn msc_divide<T: PartialOrd + Send + Sync>(v: &[T]) -> usize {
    table(v).into_iter().map(|(_, n)| n).max().unwrap()
}

fn table<T: PartialOrd + Send + Sync>(v: &[T]) -> Vec<(&T, usize)> {
    if v.len() == 1 {
        vec![(&v[0], 0)]
    } else {
        let mid = v.len() / 2;
        let xs = &v[0..mid];
        let ys = &v[mid..];
        let xs = table(xs);
        let ys = table(ys);
        join(&xs, &ys)
    }
}
fn join<'a, 'v, T: PartialOrd + Send + Sync>(
    mut xs: &'a [(&'v T, usize)],
    mut ys: &'a [(&'v T, usize)],
) -> Vec<(&'v T, usize)> {
    let mut v = Vec::new();
    loop {
        if ys.is_empty() {
            v.extend(xs);
            return v;
        } else if xs.is_empty() {
            v.extend(ys);
            return v;
        } else {
            let xt = &xs[0];
            let yt = &ys[0];
            if xt.0 < yt.0 {
                v.push((xt.0, xt.1 + ys.len()));
                xs = &xs[1..];
            } else {
                v.push(yt.clone());
                ys = &ys[1..];
            }
        }
    }
}

このコードはほぼストレートにtableをrayon化できるのは見て取れるでしょう。 良いアルゴリズムと良い並列化が両立するのは素晴らしいですね。

まとめ

  • rayonにはpar_iter以外にもjoinがあるよ
  • joinを使うといろんなアルゴリズムを並列化出来るよ
    • ただし無闇に並列化すると遅くなるから注意
  • joinと分割統治は相性がいいよ

ノート

  • このブログの大半の内容はrayonの作者による解説でカバーされています。
  • min_max の例はアルゴリズムイントロダクションから採りました
  • 最大上位者数問題は関数プログラミング 珠玉のアルゴリズムデザインから採りました
  • 今回用いたコードはこちらにあります。
  • 以下にチューニングの結果を示します。閾値を小さくするとオーバーヘッドが無視出来ず、大きくすると並列性能が出ないことが見て取れると思います。 データが多くて見づらいですがラベルをクリックするとデータ毎に表示/非表示を切り替えられるので色々比べてみて下さい。

余談

因みにですが個別にminmaxで求めたものと比べると

const N: usize = 1024 * 1024;

#[bench]
fn bench_min_and_max(b: &mut Bencher) {
    let v = random_vec(N);
    b.iter(|| (v.iter().min(), v.iter().max()))
}

#[bench]
fn bench_min_max(b: &mut Bencher) {
    let v = random_vec(N);
    b.iter(|| min_max(&v))
}

#[bench]
fn bench_min_and_max_rayon(b: &mut Bencher) {
    use rayon::prelude::*;
    let v = random_vec(N);
    b.iter(|| (v.par_iter().min(), v.par_iter().max()))
}

#[bench]
fn bench_min_max_rayon(b: &mut Bencher) {
    let v = random_vec(N);
    b.iter(|| min_max_rayon(&v))
}

なんと全体的にはminとmaxを個別に求めた方が速い結果になりました。

test bench_min_and_max       ... bench:     593,406 ns/iter (+/- 21,954)
test bench_min_and_max_rayon ... bench:     416,664 ns/iter (+/- 68,554)
test bench_min_max           ... bench:   1,982,283 ns/iter (+/- 124,381)
test bench_min_max_rayon     ... bench:     412,036 ns/iter (+/- 89,327)

min_max_rayonは辛勝…。

色々試すとどうやらループの中でifを使ってるのが遅いらしく、ループ内のifを取り除いたら、つまり比較演算の削減を諦めてmin, maxを個別に求めるのと変わらないループを書くと速くなりました。不思議…。 別にイテレータのminmaxがSIMDを生成していたとかではなくて、単に生成されるコードの品質の問題でした。

Written by κeen