Reactive Extensions for .NET (Rx) メソッド探訪第7回:IEnumerable vs IObservable

物凄く期間を開けてしまいましたが、Reactive Extensions for .NET (Rx)紹介を再開していきます。もはやRxってなんだっけ?という感じなので、今回は最も基本である、IObservableについて扱います。ボケーッとしている間にIQbservable(IQueryableのデュアル)とか出てきてて置いてかれちゃってるし。

そんなこんなで、IObservableはIEnumerableのデュアルなんだよ、とか言われてもぶっちゃけさっぱり分かりません。なので、その辺のことはスルーして普通にコードで対比させながら見ていくことにします。

Microsoft Silverlight の取得

// IEnumerable (RunはForEachです、ようするに)
Enumerable.Range(1, 10)
    .Where(i => i % 2 == 0)
    .Select(i => i * 2)
    .Run(Console.WriteLine, () => Console.WriteLine("completed!"));

// IObservable
Observable.Range(1, 10)
    .Where(i => i % 2 == 0)
    .Select(i => i * 2)
    .Subscribe(Console.WriteLine, () => Console.WriteLine("completed!"));

ボタンを押して確認する、までもなく同じ結果です。1から10までを偶数だけ通して二倍して出力。見た目は同じですが、中身は丸っきり違います。見た目が一緒すぎて言葉で表現出来ないので図に表してみました。

何という下手っぴな図、さっぱり伝わらん。……。というのはおいておいて、矢印の向きに注目。IEnumerableの連鎖は、列挙を消費する時にIEnumeratorの伝搬に変わります。Run->Select->Where->RangeとMoveNextが駆け上がったら、今度はRange->Where->Select->RunとCurrentが降りていきます。末尾(Run)が値を要求(MoveNext)して値(Current)を取り出すという連鎖。末端から根元の値を引っ張ってくる(Pull)ようなイメージ。

IObservableは、根元自体が値を押し出していく(Push)ようなイメージ。こちらはIObserverの連鎖になっていて、根元からOnNextで値を伝えていきます。

Pushのメリット

Observable.Rangeのような、もしくはEunmerableに対してToObservableした時のような、普通のPull型シーケンスをPush型に変換することのメリットは?イベントや非同期など、他の形式から生成されたIObservableと連携出来る、というのは当然一番の話ですが、もう一つ、要素を分配出来るようになります。

このイミフな図の言わんとしていることが伝わる、わけはないので説明。Pull型はソースと1対1の関係である必要があるため、複数の列挙の消費者(RunだったりCountだったりSumだったりLastだったり)がいる場合、接続した回数だけ列挙が最初から回ることになります。かたやPush型は、1対多の関係を持つことが出来るため、一度の列挙で全ての消費者に値を配分することが可能です。

Hot vs Cold

同じように見えるIObservableにも、HotとColdという性質があります。それはyield returnで作る遅延評価のIEnumerableと、配列のように既に値が生成済みのIEnumerableとの違い、のようなものかもしれません。

Microsoft Silverlight の取得

var seq = Observable.Range(1, 5)
    .Do(i => Console.WriteLine("source -> " + i));

button1.Click += (sender, e) =>
    seq.Subscribe(i => Console.WriteLine("button1 -> " + i));

button2.Click += (sender, e) =>
    seq.Subscribe(i => Console.WriteLine("button2 -> " + i));

Doは、列挙に通ったものを取り出しつつも素通しします。つまり、 Select(i => { action(i); return i; }) です。今回は列挙がその箇所を通ったかどうかを書き出しています。余談ですが、IEnumerableならNyaRuRuさんの作成されたAchiralにはHookというメソッドがあって、細かい列挙中のモニタリングが出来るようになっています。

実行結果を見てみると、ボタンを押す=Subsribeを繋げると、即座に列挙が開始されていて、これだとIEnumerableのforeachと何も変わません。よって、このIObservableはColdです。もう値は生成され終わっているので。Subscribeの度に即座に全ての値をPushします。

ではHotは?

Microsoft Silverlight の取得

// FromEvent(canvas,"MouseMove")は手軽ですが、丁寧にこう書くほうが理想的かしら
Func<IObservable<Point>> GetMouseMovePosition = () =>
    Observable.FromEvent<MouseEventHandler, MouseEventArgs>(
            h => (sender, e) => h(sender, e),
            h => canvas.MouseMove += h,
            h => canvas.MouseMove -= h)
        .Select(e => e.EventArgs.GetPosition(canvas));

// ICollection<IDisposable>です。
var disposables = new CompositeDisposable();

evenButton.Click += (sender, e) =>
{
    disposables.Add(
        GetMouseMovePosition()
            .Where(p => p.X % 2 == 0 && p.Y % 2 == 0)
            .Subscribe(p => Console.WriteLine("Even -> " + p.X + ":" + p.Y)));
};

oddButton.Click += (sender, e) =>
{
    disposables.Add(
        GetMouseMovePosition()
            .Where(p => p.X % 2 != 0 && p.Y % 2 != 0)
            .Subscribe(p => Console.WriteLine("Odd -> " + p.X + ":" + p.Y)));
};

disposeButton.Click += (sender, e) =>
{
    // Disposeでイベントのデタッチ + 再登録不可
    // Clearでイベントのデタッチ + 再登録可
    disposables.Clear();
};

例えばマウスイベント。クリックの度にOnNextに値を送る、ムーブの度に値を送るといったイベントをIObservable化するFromEventはHot。無限リスト状態になっているものは、接続しただけでは値が送られてこないとも言えるので、幾つでもSubscribeすることが出来ます。サンプルでは、ボタンをクリックすればしただけ、右側のログ表示に同内容のものが連続して表示されるのが確認出来ます。

両者が混ざったような挙動をするIObservableもあります(例えばReplaySubject)ので、HotなのかColdなのか両方なのか。というのを意識してみると理解が深まるかもしれません。また、メソッドの動作確認などの際にHotとColdを区別せずにいると、思わぬ挙動で混乱するかもしれないので注意。というか、私はよくやります……。Observable.Rangeばかりで確認していてイミフ!と思ったら、FromEventでチェックしたら何て分かりやすいこと!というのが何度も。

CompositeDisposable

本題と離れますがTips。イベントのデタッチが簡単なのもRxのメリットの一つです。さて、複数イベントをデタッチする場合はどうしましょうか?List<IDisposable>に格納してforeachで列挙してDispose、というのも悪くないですが、そういう用途で使うためのCompositeDisposableというICollection<IDisposable>なクラスが用意されているので、そちらを使ったほうがよりスマートに書けます。

上のSilverlightのHotのサンプルコードでは、ボタンを押す(=Subscribeする=イベントを登録する)度にCompositeDisposableにAdd。そしてDisposeAllボタンでまとめてデタッチしています。

var subject = new Subject<int>();
var d1 = subject.Subscribe(i => Console.WriteLine(i));
var d2 = subject.Subscribe(i => Console.WriteLine(i * i));
using (new CompositeDisposable(d1, d2))
{
    subject.OnNext(2); // 2, 4
    subject.OnNext(3); // 3, 9
}
subject.OnNext(2); // usingを抜けデタッチ済みなので何も起こらない

List<IDisposable>に対するCompositeDisposableのメリットは、Disposeで解除出来るということ。つまり、using構文に放りこむことが可能です。多段Usingよりも綺麗に見えるのでお薦め。

上の例にコソッと出したSubjectクラスはPush型シーケンスの大本で、OnNextやOnCompletedを後続に送ることが出来ます。イベントのラップじゃなく、Rxネイティブなクラスを作る場合に使います。Subjectはちゃんと詳しく書かなきゃいけない大事なクラスの一つなので、また次にでもきっちり紹介する予定は未定。

列挙の分配

Pushのメリットとして分配可能なことを挙げたのに、Coldなので分配出来ません。以上終了。で終わるわけは当然ないわけで、Cold to Hot変換メソッドが使えます。Publishです。Publishの戻り値はIConnectableObservable。

public interface IConnectableObservable<out T> : IObservable<T>
{
    IDisposable Connect();
}

IObservableなのでメソッドチェインを繋げることが出来ます。そして、Subscribeしても列挙は始まりません。Connectを呼んだ時に、一度だけ列挙することが出来ます(二度以降Connectを呼んでも何もしない)

私はダムの堰止をイメージしています。何もしないとドバドバと水が流れてしまうのでPublishで一時的に止めて、Connectで放水。放水後は空っぽ。みたいな。

Max/Sumなど集計系

インターフェイスを挙げただけじゃよく分からないので実例を。SumやMaxといった集計系メソッドと合わせて使ってみます。そこら中にモニタリング用のDoが入っていてコードが若干分かりづらいですが、実行結果で、どのタイミングで値が通過するのかを確認してみてください。

Microsoft Silverlight の取得

var source = Enumerable.Range(1, 5)
    .Do(i => Console.WriteLine("Source -> " + i));

enumerableButton.Click += (sender, e) =>
{
    var sum = source.Sum();
    var max = source.Max();
    var all = source.All(i => i < 3);
    Console.WriteLine("sum = " + sum);
    Console.WriteLine("max = " + max);
    Console.WriteLine("all = " + all);
};

observableButton.Click += (sender, e) =>
{
    var connectable = source.ToObservable().Publish();

    connectable.Subscribe(_ => { }, () => Console.WriteLine("OnCompleted"));

    var sum = default(int);
    connectable
        .Do(i => Console.WriteLine("BeforeSum -> " + i))
        .Sum()
        .Do(i => Console.WriteLine("AfterSum -> " + i))
        .Subscribe(i => sum = i);

    var max = default(int);
    connectable
        .Do(i => Console.WriteLine("BeforeMax -> " + i))
        .Max()
        .Do(i => Console.WriteLine("AfterMax -> " + i))
        .Subscribe(i => max = i);

    var all = default(bool);
    connectable
        .Do(i => Console.WriteLine("BeforeAll -> " + i))
        .All(i => i < 3)
        .Do(b => Console.WriteLine("AfterAll -> " + b))
        .Subscribe(b => all = b);

    connectable.Connect();
    Console.WriteLine("sum = " + sum);
    Console.WriteLine("max = " + max);
    Console.WriteLine("all = " + all);
};

値が確定した時、Allならば全ての列挙が完了した(OnCompletedを受信する)か、条件がfalseのものが見つかったときに、1つだけSubscribeに値が届きます。SumやMaxは、全ての列挙が完了しないと算出出来ないので、全て完了したとき。こういった結果の確定するタイミングは、Enumerableでの場合と変わりません。

このような動作(戻り値が長さ1のIObservable)をするものには、 Aggreagte, Count, Any... 、ようするにIEnumerableにもあって戻り値がIEnumerableじゃないメソッドは全てそうです。全部似たりよったりなので具体的な紹介は省きます。

Pushのデメリット

IObservable便利すぎてIEnumerableいらなくネ? と、言いたいところですが、例えばこれら集計系メソッドは全て長さ1のIObservableになります。Sumの場合、欲しいのはintであってIObservableではありません。長さ1のIObservableは、いつConnectされるか分からないのでSubscribeで外の値に受け渡してやらなければならないわけですが、見た目が美しくなく宣言も冗長になる。

また、集計するのに複数回列挙は確かに格好悪いな!よし、そういう場合はRx使おう。と思った場合はとりあえず待った。ただの配列からの列挙程度の場合は、ふつーに複数回列挙したほうがPublishで分岐させるよりも遥かに速かったりします。元ソースが複雑にLinqで繋いであって重たかったり、ファイルやネットワーク経由だったりで複数呼び出しを避けたい、副作用があって複数回呼びだすと内容が変化している、という場合はRxです。が、一旦ToListしてキャッシュすれば済むシーンならば、キャッシュした方が分かりやすく速い場合が多かったりします。

Publishの具体的な使い処としては、以前に、TwitterのStreamAPIをRxを使って分配するという記事で紹介しました。

まとめ

PullとPushは、むしろ動作的にはPushのほうが素直で分かりやすい雰囲気。難解だと思って避けていたそこのアナタ、さあ、Rxを使おう! しかしColdとHotは大いなる罠。初見ではきっとつまづく。この区別は本当に大事。Rxが難解っぽいとしたら、Cold/Hotのせい。挙動がまるっと変わるんだもの。でも、ゆっくり紐解けば全然大丈夫。さあ、Rxを使おう!Publishや集計系はそんなには使わないかもですが、覚えておくと便利な時も割とある。さあ、Rxを(ry

個人的にRxの特色・使いどころは「イベントの合成」「タイマー・ネットワーク・スレッドなど非同期処理の一元化」「シーケンスの分配」の3つだと思っているのですが、このブログでは、延々とシーケンス分配という、3つの中で一番どうでもいい機能しか紹介していない!という酷い事実に気がつきました。そんなんじゃRxのポテンシャルを全然伝えられない。

というわけで、次回はタイマー辺りを紹介したいと思います予定は未定。というか計画ではObservableの合流周りとMarble Diagramについてを書く予定。Rxの知名度も徐々に上がってきているようなので、しっかり紹介していきたいですし、他の人も書いて欲すぃ。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive