Reactive Extensionsの非同期周りの解説と自前実装

最近id:okazukiさんが凄い勢いでRx解説を書いていて凄い!そして、刺激を受けますねー。というわけで、今回はRxの非同期周りの解説をします。今日の昼に非同期処理を行うメソッドの戻り値は全てIObservable<T>にしてしまえばいいんじゃないんだろうかを読んで、Twitterで少しAsyncSubjectについて書いたのでそれをまとめて……、と思ったらReactive Extensions入門 11「非同期処理用のSubject」ですって!早!私なんて一年かけて何も書いていやしなかったりするのに、この早さは本当に見習いたい、ごほごほ。

そんなわけかで、色々と被ってしまっているのですが、Rxの非同期実行についてまとめます。まず、Rxの非同期の起点はStart, ToAsync, FromAsyncPatternの3つ。Start,ToAsyncはFromAsyncPatternの簡易化みたいなものなので、実質は1つです。これらを使うと長さ1のIObservable<T>として非同期処理が扱えるわけです。ところで非同期処理が完了するまでの間に複数Subscribeしたらどうなるのだろう、または非同期処理が完了してしまった後にSubscribeしたらどうなるのだろう? Silverlightで実際に触って試してみてください。

Microsoft Silverlight の取得

ボタンだらけでイミフ? 解説します……。Rxの非同期系処理は全てAsyncSubjectを通ります。AsyncSubjectのOnNextが非同期で実行された値の戻り値を、OnCompletedが非同期処理の完了を示します。通常はOnNextとOnCompletedはワンセットで非同期処理です。というわけで、例えばSubscribeを二回押す→OnNextを一回押す→OnCompletedを一回押すで、非同期のIObservable<int>に対して二つSubscribe(Console.WriteLine)したということになり、1が二つ右側のログに表示されたはずです。続けてSubscribeを押す(非同期処理が完了した後にSubscribeした場合)とどうなるか?というと、1が追加されたはずです。

以前にHot vs ColdとしてIObservableの性質を少し取り上げました。ColdはSubscribeするとすぐに実行される。HotはSubscribeしても値が流れてくるまで待機する。非同期におけるIObservable<T> = AsyncSubjectは、つまり、両方の性質を持ちます。OnCompleted以前はHotで、値が流れてくる(非同期処理が完了する)まで待機される。OnCompleted以後はColdとなって、Subscribeするとすぐに値を流す。

何となくAsyncSubjectの中身が想像付いてきました?そう、非同期の実行結果である値をキャッシュしています。もし複数回OnNextされたらどうなるか、というと、これは最後の値だけが残ります。(一度Resetしてから)OnNextを連打してからOnCompletedを押して、Subscribeしてみてください。

非同期というとキャンセルはどうするの?というと、ありますよ!Subscribeの戻り値はIDisposable。イベントのRx化、FromEventの場合はイベントのデタッチでしたが、非同期で使う場合はキャンセルになります。例えばSubscribeを押してから、Disposeを押して(キャンセル!)、OnNext→OnCompleted(非同期処理完了)を押してみてください。ログに何も出てきません。非同期処理が完了する前にDisposeしたということで、Subscribeがキャンセルされた、ということです。では、続けて(OnCompleted完了後)Subscribeを押すと……、ログに値が表示されます。Disposeは処理自体をキャンセルするわけではなく、 Subscribeのキャンセルということですね。

そうそう、ResetボタンはSubscribeやDispose、AsyncSubjectの状態をリセットして初期化します。Clearボタンはログの消去になります。

AsyncSubjectの簡易実装

AsyncSubjectの実態が大体分かったので、次は自分で実装してみましょう!その前にSubjectって何ぞや、というと、IObservable<T>かつIObserver<T>。これはRxネイティブのイベントだと思ってください。詳しくはReactive Extensions入門 + メソッド早見解説表をどうぞ。

とりあえず実装したコードを。

public class MyAsyncSubject<T> : IObservable<T>, IObserver<T>
{
    bool isCompleted = false;
    T lastValue = default(T);
    readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    // OnCompleted済みなら即座にOnNext呼び出し、そうでないならListへAdd
    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (isCompleted)
        {
            observer.OnNext(lastValue);
            observer.OnCompleted();
            return Disposable.Empty;
        }
        else
        {
            observers.Add(observer);
            // 正しくはキャンセルが可能なように、Disposeが呼ばれた際にListからremoveされるよう
            // ラップした特別なIDisposableを返す必要があるけれど、簡略化した例ということでその辺は省きます
            return Disposable.Empty;
        }

    }

    // 初回呼び出しの場合は最後の値で全てのobserverのOnNextとOnCompletedを呼ぶ
    public void OnCompleted()
    {
        if (isCompleted) return;
        isCompleted = true;
        observers.ForEach(o =>
        {
            o.OnNext(lastValue);
            o.OnCompleted();
        });
        observers.Clear();
    }

    // OnCompletedと同じ。これも呼ばれたらCompleted済みとなる
    public void OnError(Exception error)
    {
        if (isCompleted) return;
        isCompleted = true;
        observers.ForEach(o =>
        {
            o.OnError(error);
        });
        observers.Clear();
    }

    // Completed済みでなければキャッシュを置き換える
    public void OnNext(T value)
    {
        if (isCompleted) return;
        lastValue = value;
    }
}

あくまでこれは簡易化したものです。実際はこれより、もう少し複雑です。あとlockを省いているのにも注意。まあ、ちゃんとしたのが知りたい場合はリフレクタとキャッキャウフフしてくださいということで。コードの中身ですが、Silverlightのデモで見てきた通りの、比較的単純な作りになっています。OnCompleted前後のSubscribeの挙動なんて、コードで見たほうが一目瞭然で早いですね。

これの挙動がRxの非同期系の挙動です。もしAsyncSubjectではなくSubject(値のキャッシュなし)を使った場合は、非同期開始からSubscribeまでの間に完了してしまった場合、何も起きないことになってしまいます。キャッシュを取るというのは、非同期に最適な理に叶った、というか、少なくとも不都合は起きないような挙動になります。もし自前で非同期でIObservable<T>を返すようなメソッドを実装する場合は、必ずAsyncSubjectを使いましょう。ユーザーの期待する挙動を取らなければならないという、義務として。

FromAsyncPatternの簡易実装

ここまで来たら、ついでなのでFromAsyncPatternも実装してしまいましょう!AsyncSubjectがあれば簡単です。あ、こちらでも断っておくと、あくまで簡易実装であって実際のものとは若干異なります。

static class Program
{
    static Func<T, IObservable<TR>> MyFromAsyncPattern<T, TR>(Func<T, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TR> end)
    {
        return arg =>
        {
            var asyncSubject = new AsyncSubject<TR>(Scheduler.ThreadPool); // おや、引数に……

            // 引数を渡されたら、Subscribeを待たずBeginInvokeで非同期実行が始まります
            begin.Invoke(arg, ar =>
            {
                TR result;
                try
                {
                    result = end.Invoke(ar); // EndInvokeで結果を得て
                }
                catch (Exception error)
                {
                    asyncSubject.OnError(error);
                    return;
                }
                asyncSubject.OnNext(result); // OnNext!
                asyncSubject.OnCompleted();
            }, null);

            return asyncSubject.AsObservable(); // SubjectのOnNextなどを隠す(なので昔はHideというメソッド名でした)
        };
    }

    // ToAsyncはFunc/ActionのBeginInvoke,EndInvoke簡略化版です
    // というのは少し嘘です、実際はSchedulerを使うのであってBeginInvokeは使いません、詳しくはまたそのうち
    static Func<T, IObservable<TR>> MyToAsync<T, TR>(this Func<T, TR> func)
    {
        return MyFromAsyncPattern<T, TR>(func.BeginInvoke, func.EndInvoke);
    }

    // StartはToAsyncの引数無しのものを即時実行というものです
    // Func<T>のみのFromAsyncPatternを作っていないので、Rx本来のToAsyncで書きます
    static IObservable<T> MyStart<T>(Func<T> func)
    {
        return Observable.ToAsync(func).Invoke(); // ここで即座にInvoke
    }

    static void Main(string[] args)
    {
        // *が引数の分だけ並ぶという関数があったとする
        Func<int, string> repeat = i => new String('*', i);

        var obs = MyFromAsyncPattern<int, string>(repeat.BeginInvoke, repeat.EndInvoke)
            .Invoke(3); // Subscribe時ではなく、Invokeした瞬間に非同期の実行は開始されていることに注意

        obs.Subscribe(Console.WriteLine); // ***
        Console.ReadKey();
    }
}

FromAsyncPatternは引数の多さや引数のイミフさにビビりますが、デリゲートのBeginInvoke, EndInvokeに合わせてやるだけです。こんなの私もソラでは書けませんよー。さて、今回は引数を一つ取るFromAsyncPatternを定義してみました。戻り値がFunc<T, IObservable<TR>>なのは、引数を一つ取ってIObservable<T>にする、ということです。

中身は割と簡単で、とりあえずBeginInvokeして、EndInvokeを待たずに(そりゃ非同期なので当然だ)とりあえずAsyncSubjectを戻します。つまり、これでIObservableとしてはHotの状態。そして何らかの処理が終えたらEndInvokeで戻り値を得て、AsyncSubjectのOnNext, OnCompletedを呼んでやる。これでSubscribeされていたものが実行されて、ついでにIObservableとしてColdになる。これだけ。意外とシンプル。

ついでのついでなのでObservable.ToAsyncとObservable.Startも定義してみました。ToAsyncはFunc/Actionを簡単にRx化するもの。FromAsyncPatternだとジェネリックの引数を書いたりと、何かと面倒くさいですから。StartはToAsyncを更に簡略化したもので、引数なしのものならInvoke不要で即時実行するというもの。

実行開始タイミングについて

FromAsyncPatternは(ToAsync/Startも)Invokeした瞬間に非同期実行が始まります。 FromAsyncPattern().Invoke().Subscribe() といったように、直接繋げる場合は気にする必要も特にないかもですが、一時変数などに置いたりする場合などは、Subscribeされたときに初めて非同期実行が開始されて欲しい、と思ったりあるかもですね。そんな場合はDeferを使います。

Microsoft Silverlight の取得

// 100を返すだけのどうでもいい関数
Func<int> func = () =>
{
    Console.WriteLine("fire");
    return 100;
};

var start = Observable.Start(func);
var defer = Observable.Defer(() => Observable.Start(func));
var prune= Observable.Defer(() => Observable.Start(func)).Prune();

StartButton.Click += (sender, e) =>
{
    start.Subscribe(Console.WriteLine);
};

DeferButton.Click += (sender, e) =>
{
    defer.Subscribe(Console.WriteLine);
};

var isConnected = false;
ReplayButton.Click += (sender, e) =>
{
    if (!isConnected) { prune.Connect(); isConnected = true; }
    prune.Subscribe(Console.WriteLine);
};

何もボタンを押さなくてもログにfireと出てしまっています。Observable.Startのせいなわけですががが。そんなわけで、Deferを使うとSubscribeまで実行を遅延することができます。ところで注意なのが、どちらもIObservable<T>なのですが、StartはColdとしてキャッシュされた値を返し続けるのに対して、Deferは中の関数を再度実行します。もしキャッシュしたい場合は、Pruneを使うといいでしょう。Pruneはこのブログでも延々と出してきた値の分配のためのPublishの親戚で、Connect後は最後の値をキャッシュして返し続けるという、AsyncSubjectと同じ動作をします(というか中身がAsyncSubjectなのですが)。この辺の使い分けというのもヤヤコシイところですねえ、そもそもPruneっていうメソッド名がイミフ……。

まとめ

Linq to Objectsで、最初分からなかったんですよ、yield returnなどで返される遅延評価としてのIEnumerable<T>と、配列(これもIEnumerable<T>ではある)の違いが。初めてLinqを知った後、半年ぐらいは分からないままだった(C#歴も半年でしたが)。同じインターフェイスなのに、状態としては、ちょっと違う。こういうのって結構分かりづらくて、躓いてしまうところです。

Rxの厄介なところは、IObservable<T>が一つで色々な状態を持ちすぎ。HotとColdの違いもある上に、更には混じり合った状態まである、Deferと非Deferも、外からだけだと全く区別がつかない。もう分かりづらいったらない。これに関しては一個一個丁寧に見ていくしかない、かな。今回はRxにおける非同期を徹底的に解剖してみました。一つ一つ、丁寧に。Rxも徐々に盛り上がりつつあるようなので、これからも、私なりに出来る限りに情報を発信していけたらと思います。

ところで、凄くシンプルなんですよね、Rxって。何を言ってるんだ?って話ですが、ええと、ほら、あくまでも「簡易」だからってのもありますが、実装のコード行数も少ないし全然難しいことやってないという。Linq to Objectsもそれ自体は凄くシンプルで、シンプルなのにとんでもなく強力という。それと同じで。Rx触ってて、内部をリフレクタでちょろちょろと見てて、本当に驚く。シンプルなのだけど、自分じゃ絶対書けないし思いつけないしっていう。悔しいし、そして、憧れるところですねえ。

特に魔法もなく、素直に中間層厚めな実装は、パフォーマンスは(もがもがもがもが)。何か言いました?

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive