.NET Reactive Framework メソッド探訪第三回:Subscribe

メソッド探訪とか言いながら、ちっとも探訪してません。今回までが基礎知識で、Linq to Objectsで言ったらイテレータがどうこう、という段階。次回以降はメソッドを見ていきたいと思います。では、最後の予習ということで、Subscribe。SubscribeはIObservable<T>連鎖の終点となるもので(但し、別にSubscribeだけが終点というわけではない、FirstとかCountとか、終点になるものは他にもあります)ForEachのようなもの。戻り値はIDisposableで、イベント発生の監視を止めたい時はDisposeを呼ぶ。と、第一回の時に書きましたので、今回は別の方向から見ていきます。

static void Main(string[] args)
{
    Observable.Range(1, 10).Subscribe(Observer.Create(
        (int i) => Console.WriteLine(i), // OnNext
        e => { throw e; }, // OnError
        () => Console.WriteLine("Completed") // OnCompleted
    ));

    Console.ReadLine();
}

今回はコンソールアプリで。結果は想像つくとおり、1から10、最後にCompletedを表示。Observable.RangeはEnumerable.Rangeと同じです。というか中身的にはEnumerable.Range.ToObservableで変換されているだけです。RepeatとEmptyも用意されているので、ちょっとしたメソッド確認用に便利に使えると思います。それにしてもコード、ゴチャゴチャしてますねえ、わけわかんない書き方してわざと難解にやってるんじゃないだろうかって感じですが、その通りです、ので、普通の書き方も下の方でちゃんと書きます。で、SubscribeはIObservable<T>のメソッドとして定義されています。IEnumerable<T>がGetEnumeratorを持つように、IObservableはSubscribeを持つ。そして、原則Subscribeの引数はIObserver<T>です。

public interface IObservable<T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

public interface IObserver<T>
{
    void OnCompleted();
    void OnError(Exception exception);
    void OnNext(T value);
}

インターフェイスなので直接生成することは出来ない。よってファクトリメソッドObserver.Createを用いて生成します。生成されるのは前回見たAnonymousEnumeratorと同じくinternalのジェネリッククラス「AnonymousObserver<T>」で、インターフェイスの各メソッドにデリゲートを直接放り込むだけの単純明快なものです。勿論、IObservable<T>を継承して自前の専用のものを用意しても構いませんが、クロージャを活かせば、自前で定義する意味など少しもありません。コンパイラの自動生成クラス任せでOK。

これの動作はメソッド名通りで、OnNextは値が来るたびに実行されるメソッド。OnCompletedは全て完了した時(FromEvent経由のものなど、実質無限リピート状態の場合は、Disposeが呼ばれた時)に実行されるメソッド。OnErrorは例外が発生した時に実行されるメソッド。発生した例外は原則catchされるので、再スローしたい時は、そのまんまですがthrow eの明示が必要です。

……それにしても面倒くさい。Observer.Createは。onNextだけが書ければいいんだよ!ってシーンにわざわざ空の式を書けとでも?(ちなみに空は()=>{}です) 大体がして、推論出来ないから(int i)だとか、型を書かなければならないのもかったるい。というわけで、拡張メソッドが用意されています。

public static IDisposable Subscribe<TSource>(this IObservable<TSource> source);
public static IDisposable Subscribe<TSource>(this IObservable<TSource> source, Action onNext);
public static IDisposable Subscribe<TSource>(this IObservable<TSource> source, Action<TSource> onNext);
public static IDisposable Subscribe<TSource>(this IObservable<TSource> source, Action onNext, Action<Exception> onError);
public static IDisposable Subscribe<TSource>(this IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError);
public static IDisposable Subscribe<TSource>(this IObservable<TSource> source, Action onNext, Action<Exception> onError, Action onCompleted);
public static IDisposable Subscribe<TSource>(this IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted);

いっぱいありますけど、ようするに無視した部分は空のメソッドが代わりに埋められる、というだけの話。これを使えば、最初の例は、onNextだけにすると

Observable.Range(1, 10).Subscribe(i => Console.WriteLine(i));

と、簡潔明快に記述出来るわけです。メデタシメデタシ。

過去記事

.NET Reactive Framework メソッド探訪第一回:FromEvent
.NET Reactive Framework メソッド探訪第二回:AnonymousEnumerable

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive