Reactive ExtensionsとPublishによるシーケンスの分配

Twitter上で見たお題、10個コンソールから整数の入力を受けて最少と最大を出す。Linqで書くならこうでしょうか。通常は0-10でfor文を回すところはRangeで。あとはMinとMax。int.Parseなので数字以外は例外飛んでしまいますが、その辺は無視で。

var input = Enumerable.Range(0, 10)
    .Select(_ => int.Parse(Console.ReadLine()));

var max = input.Max();
var min = input.Min();

はい。これだと20回入力させることになってしまいます。Maxで列挙され、Minでも列挙され。ダメですね。というわけで、配列に一度保存しますか。

var input = Enumerable.Range(0, 10)
    .Select(_ => int.Parse(Console.ReadLine()))
    .ToArray();

var max = input.Max();
var min = input.Min();

よろしい。しかしこれの列挙回数は3回です。ToArrayで一度、Maxで一度、Minで一度。しかもMaxとMinを出したいだけなのに、わざわざ配列に保存してしまっています。もし配列が巨大になる場合はメモリ的な意味でもちょっと嫌ですねえ。さて、そこで出番なのが、プッシュ型シーケンスを持つReactive Extensions。プッシュ型ならば分配が可能です。やってみましょう?

Observable.Range(0, 10)
    .Select(_ => int.Parse(Console.ReadLine()))
    .Publish(xs => xs.Min().Zip(xs.Max(), (min, max) => new { min, max }))
    .Subscribe(Console.WriteLine);

ふむ、これはクールだ……。Rxの格好良さは異常。

Publishは今までにも何度も出してきましたが、シーケンスを分配するメソッドです。今までは引数なしにして、IConnectableObservableを変数に受けて、それに対してポコポコ足して、最後にConnect。としていましたが、Publishには引数有り版も用意されています。引数有りの場合は分配されたIObservableが変数として渡ってくるわけです。つまり上のコードは

var input = Observable.Range(0, 10)
    .Select(_ => int.Parse(Console.ReadLine()))
    .Publish();

input.Min().Zip(input.Max(), (min, max) => new { min, max })
    .Subscribe(Console.WriteLine);

input.Connect();

と等しいということになります。ちなみにPublish内でSubscribeしたって構わないわけなので

Observable.Range(0, 10)
    .Select(_ => int.Parse(Console.ReadLine()))
    .Publish(xs =>
    {
        xs.Min().Subscribe(x => Console.WriteLine("min:" + x));
        xs.Max().Subscribe(x => Console.WriteLine("max:" + x));
        xs.Sum().Subscribe(x => Console.WriteLine("sum:" + x));
        xs.Average().Subscribe(x => Console.WriteLine("avg:" + x));
        return xs;
    })
    .Subscribe();

といったような書き方も可能ですね。では、min,max,sum,avgと4つを返したい場合はどうしよう。Join-And-Thenを使います。

Observable.Range(0, 10)
    .Select(_ => int.Parse(Console.ReadLine()))
    .Publish(xs => Observable.Join(xs
        .Min().And(xs.Max()).And(xs.Sum()).And(xs.Average())
        .Then((min, max, sum, avg) => new { min, max, sum, avg })))
    .Subscribe(Console.WriteLine);

この例だと単純に直列に繋いでいるだけなので、Zipと同じようにAndで繋いでThen。というだけですね。ちょいとゴチャッとしていますが。Joinの強力なところはAnd-Thenで作るPlan(Thenの戻り値)を複数受けられるという点にあります。これにより複雑な結合を生成する事ができます。ZipとMergeが混ざったような、でももう少し複雑な何か。といった雰囲気。

Publish

Publishはオーバーロードが8つもあって最初戸惑ってしまいますが、分類としては二つに分けられます(そしてそれぞれFuncを受け取る/スケジューラを受け取るの組み合わせで4つに分かれて2x4=8のオーバーロード)。一つは引数として初期値を受け取らないもの。もう一つは受け取るもの。今までは全部、受け取らないものを見てきました。内部的にはSubjectを使って分配しています。一方、初期値を受け取るものは内部的にBehaviorSubjectを使って分配します。というわけで、どんな挙動を取るのかよくわからない、という場合はBehaviorSubjectを試してみると良いでしょう!(何だこの解説する気ゼロな投げっぱなしな説明は)

ところでな余談

注意しないといけないのは、ふつーのシーケンスがあって、それをRxで分配やるとはっきし言って遅いです。列挙数が多くなると、ただ配列舐めるのに比べて数万倍のパフォーマンス差が出たりします。ほとんどのシチュエーションでRxで分配するぐらいなら二度三度舐めたほうが速いです。まあ、分配に限らずRxが普通に遅いということなのですがー。ご利用は計画的に。今回の例のConsole.ReadLineで、というのは、当然Rxのほうが速いですが。入力とリアルタイムに応答していますからね。Reactive!ようするところ、Rxが活きるのってそういうところ、かしらん。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive