Rxにおける並行非同期実行とリソース処理の問題

非同期(Asynchronous)だの並列(Parallel)だの並行(Concurerrent)だの、よくわからない単語が並びます。ParallelがやりたければPLINQ使うべし、と思うわけですがそれはさておき、Rxを使うと、意図しても意図しなくても、並行な状態にはなります。そして、その意図していないという状態は危うい線を踏んでいるので、きちんと認識しておく必要があります。また、危ういと同時に、Rxはその並行をうまくコントロールするメソッドが揃っているので、覚えておくと世界が一気に広がります。

例えば、こういう非同期メソッドがあるとして。

IObservable<T> AsyncModoki<T>(T value, int second)
{
    // second秒後にvalueを返す非同期処理をシミュレート
    return Observable.Return(value)
        .Delay(TimeSpan.FromSeconds(second));
}

static void Main(string[] args)
{
    // 1,2,3,4という入力をすぐに送り込む
    new[] { 1, 2, 3, 4 }
        .ToObservable()
        .SelectMany(x => AsyncModoki(x, 3))
        .Subscribe(x => Console.Write(x + "->"));

    Console.ReadLine();
}

Microsoft Silverlight を入手

1~4は、全て同時にリクエストが開始されます。だから、3秒後に同時に結果が表示されます。

同時に実行が開始されているということは、非同期の結果が完了する時間にズレがある場合、結果が前後することがあります。実際、上のものも何度か実行すると毎回結果が変わると思います(Delayは(デフォルトだと)値をThreadPoolに投げて遅延させます。ThreadPoolに入った時点で、順序の保証が消滅する)。というわけで、基本的にSelectManyを使った場合1:1で渡していくわけではなければ、順序は壊れると考えてください。さて、それだと困る場合もあるのではと思いますので、結果の順序を制御する方法が幾つかあります。

Switch

Switchは実に有意義なメソッドで、分かると、SelectMany以上に多用することが多くなるのではと思います。

clickEventObservable // クリック毎に
    .Select(x => AsyncModoki(x, 1)) // 何らかの非同期処理をするとする
    .Switch() // IObservable<IObservable<T>>の状態なので、Switch
    .Subscribe(Console.WriteLine);

Microsoft Silverlight を入手

クリックすると1秒遅延(非同期処理でもしていると考えてください)して、値が表示されます。しかし、1秒以内に次の値がクリックされた場合はキャンセルされ、表示しません。

つまり、最新の値だけを返すことを保証します。それ以前のものはキャンセル(Disposeが呼ばれる)されます。どういう時に使うかというと、例えばインクリメンタルサーチ。L, LI, LIN, LINQと入力が変わる度に非同期リクエストを発生させますが、欲しい結果は最後の一件のみで、次のキー入力があった場合は以前のものはキャンセルして欲しい。キャンセルはともかく、非同期実行だと結果が前後してしまうことだってあります。LINQと入力したのにLIの結果一覧が表示されてしまったら困る。そんな場合に、まさに、うってつけです。そして存外、こういったシチュエーションは多いのではないかと思われます。例えば私の以前作ったUtakotohaというWP7用歌詞表示アプリケーションも、曲のスキップに応じて最新のものだけを表示するために、Switchを利用しました。(コードが激しく酷いのと機能貧弱っぷりなので、そろそろ書き直したい)

Merge/Concat

Switch以外にも色々あります。

new[] { 1, 2, 3, 4 }
    .ToObservable()
    .SelectMany(x => AsyncModoki(x, 1)) // 全て並行実行(最初の例です)
    .Subscribe(x => Console.Write(x + "->"));

new[] { 1, 2, 3, 4 }
    .ToObservable()
    .Select(x => AsyncModoki(x, 1)) // IO<IO<T>>
    .Merge() // こちらも全て並行実行、SelectMany(xs => xs)と同じ
    .Subscribe(x => Console.Write(x + "->"));

new[] { 1, 2, 3, 4 }
    .ToObservable()
    .Select(x => AsyncModoki(x, 1))
    .Merge(2) // 2件ずつ並行実行する(並行実行数の指定が可能)
    .Subscribe(x => Console.Write(x + "->"));


new[] { 1, 2, 3, 4 }
    .ToObservable()
    .Select(x => AsyncModoki(x, 1))
    .Concat() // 1件ずつ実行する(Merge(1)と同じ)
    .Subscribe(x => Console.Write(x + "->"));

Microsoft Silverlight を入手

ネストはSelectManyで一気に崩してしまうケースが一般的でしょうけれど、IObservable<IObservable<T>>といったネストした状態にすると、選択肢がSwitchもそうですが、更に、MergeとConcatを選択することができます。ちなみに、このintで並行実行数が指定可能なMergeはWP7同梱版のRxには存在しません。残念。(もう一つ余談ですが、SelectManyはRx内部ではSelect(selector).Merge()という実装になっていたりします)

実行タイミングの問題

上のSilverlight、Merge2とMerge2Exの二つを用意しましたが、Merge2Exのほうは4つ同時に表示されるのが確認出来るはずです。コードはほぼ同一なのですが、AsyncModokiを似たようで別なものに差し替えました。

// Merge(2):Ex
new[] { 1, 2, 3, 4 }
    .ToObservable()
    .Select(x => AsyncModoki2(x, 1)) // これが差分
    .Merge(2)
    .Subscribe(x => Console.Write(x + "->"));

// スレッドプール上で非同期実行(結果は指定秒数後に返る)のシミュレート
// second秒後にネットワーク問い合わせが返る、的なものをイメージしてみてください
static IObservable<T> AsyncModoki2<T>(T value, int second)
{
    var subject = new AsyncSubject<T>();

    ThreadPool.QueueUserWorkItem(_ =>
    {
        Thread.Sleep(TimeSpan.FromSeconds(second)); // 指定秒数待機
        subject.OnNext(value);
        subject.OnCompleted(); // 完了(2つでワンセット)
    });

    return subject; // これ自体はすぐに返す(FromAsyncPatternの中身はこんな感じ)
}

このAsyncModoki2は、このメソッドを通ると即座にThreadPoolに送り込んで「実行」しています。Subscribeされるかどうかとは関係なく、Subscribeの「前に」。対してAsyncModokiはSubscribeされないと実行が開始されません。同じようで違う、この二つの状態をRxでは「Hot」と「Cold」と呼んで区別しています。HotはSubscribeとは関係なく動いているもの、イベントなんかはそうですね。ColdはSubscribeされて初めて動き出すもの、Observable.ReturnであったりRangeであったりと、Rxからの生成子の場合は、こちらのパターンが多いです。

実はFromAsyncPatternはHotなので、Subscribeとは関係なく即座に(といっても戻り値はFuncなのでInvokeしたら、ですが)非同期実行が開始されたりします。これは、あまり都合が良くなく(例えば上の例で見たように、MergeはSubscribeのタイミングによって実行数をコントロールしている)、Coldに変換したほうが扱いやすいです。そのためのメソッドがDefer。

static IObservable<WebResponse> AsyncModoki3<T>(WebRequest req)
{
    return Observable.Defer(()=>
        Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse,req.EndGetResponse)());
}

こちらのほうが、大抵の利用シーンにはマッチするかと思われます。

キャンセル時のリソース処理の問題

Switchは実に有意義なのですが、それの行っていることは、次の値を検知すると前の値をキャンセルする、ということです。普段はあまりキャンセルはしないと思うのですが、Switch内部では大量のキャンセルが発生しています。さて、どのような問題が発生するか、というと、例えば……。

using System;
using System.Net;
using System.Reactive.Linq;

class Program
{
    static void Main(string[] args)
    {
        // ネットワークの最大接続数。通常、デフォルトは2になっているはず。
        ServicePointManager.DefaultConnectionLimit = 2;

        // テキストボックスのTextChangedイベントをイメージした、インクリメンタルサーチで来る文字列群
        new[] { "w", "wi", "wik", "wiki", "wikip", "wikipe", "wikiped", "wikipedi", "wikipedia" }
            .ToObservable()
            .Select((word, id) =>
            {
                // wikipediaのAPIにリクエスト飛ばす
                var url = "http://en.wikipedia.org/w/api.php?action=opensearch&search=" + word + "&format=xml";
                var req = (HttpWebRequest)WebRequest.Create(url);
                req.UserAgent = "test";

                return Observable.FromAsyncPattern<WebResponse>((ac, state) =>
                    {
                        Console.WriteLine("ASYNC START:" + id);
                        return req.BeginGetResponse(ac, state);
                    }, ar =>
                    {
                        Console.WriteLine("ASYNC END:" + id);
                        return req.EndGetResponse(ar);
                    })()
                    .Select(res =>
                    {
                        using (res) // ここのセクションが呼ばれることはない
                        {
                            Console.WriteLine("CALLED NEXT:" + id);
                            return "response string:" + id;
                        }
                    });
            })
            .Switch()
            .ForEach(Console.WriteLine); // 終了を待機する形でのSubscribe
    }
}

// ConsoleApplication用のコードですが、是非実行してみてください。結果は以下のようになります。

ASYNC START:0
ASYNC START:1
ASYNC START:2
ASYNC START:3
ASYNC START:4
ASYNC START:5
ASYNC START:6
ASYNC START:7
ASYNC START:8
ASYNC END:0
ASYNC END:1
// そしてフリーズ...

これは、フリーズします。何故かというと、まず8件の非同期処理が一斉に開始されます(ASYNC STARTの表示)。一斉に開始はされますが、ネットワークの最大接続数は2なので、それ以外のものは内部的には待機されています。そして、Switchによる切り替えは最新のものだけを通すようにするため、7件はキャンセルされます。その後、最初の二件分のネットワークリクエストが終了し(ASYNC ENDの表示)、キャンセルされているためメソッドチェーンの続きであるSelectは呼ばれません。そして、フリーズ。

何故フリーズしてしまうかというと、EndGetResponseで取得した最初の二件のWebResponseが解放されていないためです。キャンセルが呼ばれなければ、Selectを通り、そこでusingにより利用+解放されるのですが、そのセクションを通らなければ何の意味がありません。使われることなく虚空に放り出されたWebResponseが、永遠にネットワーク接続を握ったままになってしまっています。

当然、大問題。

Switchを諦めてSelectMany(全件キャンセルせずに並行実行、どうせネットワーク自体の最大接続数で制限かかっているし)というのも手ではあります。大体の場合は結果は問題ないでしょう。けれど、Switchの利点は何でしたっけ、と。結果が前後しないことです。LINQを検索しようとしていたのに、検索結果が前後したせいでLINQ→LINの順番に結果が得られた結果、表示されるのがLINの結果では困ってしまいます。Switchなら、後に実行したものが必ず最後に来ると保証されるので、そのようなことにはなりません。反面、SelectManyは並行実行のため、前後する可能性が出てきます。Switchはこの例で挙げたような、インクリメンタルサーチのようなものと相性がとても良いんですね。

ではどうするか?

WebResponseのDispose(Close)を呼べれば解決するので、FromAsyncPatternのEnd部分に少し細工を加えてやる、ということが考えられます。

// こんなFromAsyncPatternを用意して
public static IObservable<TResult> SafeFromAsyncPattern<TResult>(Func<AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
    where TResult : IDisposable
{
    // WP7版ではCreateWithDisposableで(この辺の細かな差異が割とウザい)
    return Observable.Create<TResult>(observer =>
    {
        var disposable = new BooleanDisposable();

        Observable.FromAsyncPattern<TResult>(begin, ar =>
        {
            var result = end(ar);
            if (disposable.IsDisposed) result.Dispose(); // キャンセルされてたらDispose
            return result;
        })().Subscribe(observer);

        return disposable; // Disposeが呼ばれるとIsDisposedがtrueになる
    });
}

// こんな風に使うとか
public static IObservable<WebResponse> GetResponseAsObservable(this WebRequest req)
{
    return ObservableEx.SafeFromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse);
}

これにより、キャンセルされたかどうかをEnd部分で判定することが出来ます。よってEnd時にキャンセルされていたらリソースをDisposeしてしまう(ここでreturnしたオブジェクトは、チェーンは切れているので別に使われることなく虚空を彷徨うだけ)。これにより、FromAsyncPatternがリソースを返し、かつ、いつキャンセルされても問題なくなります。

他にも色々なアプローチが考えられます。CompositeDisposable/MutableDisposable/SingleAssignDisposableなどを使い、Disposeが呼ばれたら同時に管理下のリソースをDisposeしてしまう、といった手法。これは、リソースのDisposeされる瞬間が逆にコントロールしにくくなって、例えばWebResponseですと、その後のStreamを呼んでる最中にWebResponseがDisposeされてしまうなどの自体も起こりうるので、少し厄介に思えました。。リソースを後続に渡すまでは責任を持つ。それ以降はノータッチなので好きにやらせる、利用も解放も後続側が責任を。その方が自然だし、素直な動きになるので、いいかな。

他には、キャンセルを伝搬しないようなメソッドを作り、Disposeが呼ばれてもリソースを受け取れるようにし、後続でリソースをDisposeする、などの手段も考えられます。そうすればSafeFromAsyncPatternなどといった、独自のFromAsyncPatternを作る必要はなく、全てに適用できて汎用性は高いのですが、チェーンでの保証が途切れてしまうのが若干微妙かな、と……。この辺は悩ましいところです。

そもそもWebRequestなら、DisposeでAbortしてしまったほうが、キャンセルらしくていいかもしれない。

public static IObservable<WebResponse> GetResponseAsObservable(this WebRequest request)
{
    return Observable.Create<WebResponse>(observer =>
    {
        Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse,
            ar =>
            {
                try
                {
                    return request.EndGetResponse(ar); // Abort後の場合は例外発生
                }
                catch (WebException ex)
                {
                    if (ex.Status == WebExceptionStatus.RequestCanceled) return null;
                    throw; // キャンセル時以外は再スロー
                }
            })()
            .Subscribe(observer);
        return () => request.Abort(); // Dispose時にこのActionが呼ばれる
    });
}

Disposeが呼ばれるとwebRequest.Abortが呼ばれます。その後にEndGetResponseを呼ぶとRequestCanceledなWebExceptionが発生するので、キャンセルされていたならnullを(どちらにせよ、Dispose済みなので、ここでreturnしたものは次のメソッドチェーンで使われることはない)、そうでない例外ならば再スローを、という方針です。悪くなさそうですが、どうでしょうか。私的にはこれを採用するのがベストかなー、と考え中です。

まとめ

SwitchやMergeなどで、従来扱いにくかった並行処理時の非同期のコントロールが簡単になりました。単純に一本の非同期をSelectManyで摩り替えるだけもアリですけれど、せっかくの多機能なのだから、並行にリクエストなどを飛ばして、より速いアプリケーション作りを目指してもいいかもしれません。同期リクエストをTask.Factory.StartNewで包んで振り回すよりかは、ずっと楽です。また、現在行われているMSのイベントBUILDで発表されたWinRTなどは、完全に非同期主体です。C#5.0でasync/awaitが入り、非同期がより扱いやすくなることで、それに併せてModelの有り様も、同期から非同期へと変わっていき、それにあわせてVMなどの書き方も変わってくるのではかと思われます。

ただ、リソースの問題にだけは気をつけて!上で挙げた問題は、本質的にはFromAsyncPatternに限らず、リソース処理が引き離されている場合の全てで該当します。リソースを扱うのは難しい。とはいえ、全面的に問題になるのは、このFromAsyncPatternぐらいな気はします。Observable.Usingなども用意されているので、不用意にリソースをチェーン間で渡したりしなければ原則的には起こらない。けれど、そのFromAsyncPatternこそがリソースを扱うシチュエーションで最も使われるものなんですよね、とほほほ。

キャンセル(Dispose)を不用意に呼ばなければ問題は起こらないといえば起こらないんですが(そのため、不適切に書いてしまっていても、多くのケースで問題が表面化することはないでしょう)、Switchのようなアプローチが取れなくなるのがどうにも。現状だと、とりあえず気をつけましょう、としか言いようがないので、気をつけましょう。もし何かうまい具合に動かないなあ、と思ったら、この辺を疑ってみると良いかもしれません。

その辺難しいなあ、という場合は、近いうちに私の出すRx拡張ライブラリを使いましょう。特に考えなくても済むよう、色々配慮してあります。いつ出るの?というと、はい、最近ゴリゴリと書いてますんで(ブログがちょっと放置気味だった程度には)、必ず近いうちに出します。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive