イベントベースの非同期処理をReactive Extensionsで扱う方法

Reactive Extensionsを使用して複数のサービス非同期コールを連続して呼べるか試してみた。その2 - y_maeyamaの日記 という記事にて、Rxを使ってWCFを綺麗に呼んでました。なるほど!Silverlightで!WCFで!非同期コールを連続して呼ぶ!とっても実践的なテーマ。XboxInfoほげほげがどうのこうの、とかやってたどこかの私と違います、げふんげふん。とても素晴らしいテーマとコードだと思ったので、拝借して私も少し書いてみました。

// 実行部分(実行後10秒後ぐらいにWCFサービスの連鎖を経てメッセージボックスに結果を表示)
var client = new ServiceReference1.Service1Client();

var asyncQuery = from a in Observable.Defer(() => client.GetAAsObservable("てすと"))
                 from b in client.GetBAsObservable(a.EventArgs.Result)
                 from c in client.GetCAsObservable(b.EventArgs.Result)
                 select c.EventArgs.Result;

asyncQuery.Subscribe(s => MessageBox.Show("チェーン終了 - " + s));

「前に呼んだサービスの引数にアクセス」に関しては、SelectManyで匿名型を作る、が解だと思います。で、幾つも連鎖する時はクエリ構文を使うのが大変お手軽。ただしクエリ構文は独自定義のメソッドと繋がりが悪くなるという欠点があります。RxではDoとかObserveOnとか結構使いますから、クエリ構文、便利な用で使いどころに困る代物。その辺は実際に書く内容によって判断つけるところですねー。

XxxAsObservable

突然出てきているGetAAsObservableって何だ、といったら、勿論、拡張メソッドです。FromEventやFromAsyncPatternなどはコード本体に書いてあると鬱陶しいので、基本は拡張メソッドで隔離してしまいましょう。命名規則に特に決まりはありませんが、私はXxxAsObservableという規則で書いています。今回はWCFのサービス参照で自動生成されたイベントベースの非同期処理のラップなので、FromEventをベースに、ただし少し小細工を。

public static class IObservableExtensions
{
    // イベントベースの非同期はRxと今一つ相性が悪いので変換する
    public static IObservable<IEvent<T>> ToAsynchronousObservable<T>(this IObservable<IEvent<T>> source) where T : EventArgs
    {
        var connectable = source
            .Take(1) // 実行は一回のみ(これしとかないとイベントハンドラの解除がされないし、そもそもPruneが動かなくなる)
            .Prune(); // AsyncSubjectに変換
        var detacher = connectable.Connect(); // コネクト(イベント登録実行)
        return connectable.Finally(() => detacher.Dispose()); // 任意のDispose時にイベントのデタッチ(なくても構いません)
    }
}

public static class Service1Extensions
{
    // 量産なので多いようならT4テンプレートで生成してしまうと良いでしょう
    public static IObservable<IEvent<GetACompletedEventArgs>> GetAAsObservable(this Service1Client client, string key)
    {
        var o = Observable.FromEvent<GetACompletedEventArgs>(
                h => client.GetACompleted += h, h => client.GetACompleted -= h)
            .ToAsynchronousObservable();
        client.GetAAsync(key); // 非同期実行開始
        return o;
    }
    
    // あと二つほど
}

ふつーにFromEventをラップして終わり、ではなくて少々細工を仕込んでいます。理由は、イベントベースの非同期処理はそのままだとRxでは扱いづらいから。

イベントベース非同期処理とReactive Extensions

.NET Frameworkには基本的に2つの非同期処理の方法があります。一つはBeginXxx-EndXxxによるAPM(非同期プログラミングモデル)。もう一つはXxxAsyncとXxxCompletedによるイベントベースの非同期処理。イベントベースは素で扱う分にはAPMよりもずっと書きやすかった。だからWebClient、BackgroundWorkerなど、手軽に使える系のものに採用された(というのが理由かは知りませんが)。そしてサービス参照の自動生成のものもイベントベース。

しかし、ラッピングして使う場合はとにかく使いづらい!一度の登録で何度も何度も実行されてしまうことは合成時に都合が悪い。また、ラップしたメソッドと、引数を渡す処理の実行箇所が離れてしまうことは(FromEvent.Subscribe で登録して client.GetAsync で実行)書くのが二度手間になり面倒、という他に、Subscribeよりも前に非同期実行して、更にSubscribeよりも前に非同期実行が完了してしまった場合は何も起こらなくなる。といった実行タイミングの面倒くさい問題まで絡んでくる。

というわけで、イベントベースの非同期処理をReactive Extensionsに載せる場合は、ただたんにFromEventで包むだけではなく、一工夫することをお薦めします。

それが上で定義したToAsynchronousObservable拡張メソッド。これはRxを使って非同期プログラミングを簡単にの時に少し出しました。 source.Take(1).Prune() ですって! ふむ、よくわからん。一行ずつ見ていくと、sourceは、この場合はFromEvent後のものを想定しています(なので拡張メソッドの対象はIObservable<IEvent>)。それをTake(1)。これは、ふつーの非同期処理では実行完了は1回だけだから、それを模しています。イベントベースのラップなので何もしないと無限回になっていますから。

続けてPrune。これは内部処理をAsyncSubjectに変換します。AsyncSubjectに関してはReactive Extensionsの非同期周りの解説と自前実装で簡易実装しましたが、処理が完了(OnCompleted)まで待機して、完了後はキャッシュした値を流すという、非同期処理で不都合(実行タイミングの問題など)が起こらなくするようにしたもの。対になるのはSubjectでイベント処理を模したもの。FromEventを使うと内部ではSubjectが使われることになるので、それを非同期で都合が良い形であるAsyncSubjectに変換する、ためにPruneを使いました。そしてここでConnectすることで即座にイベントにアタッチします(実行タイミングの前後の問題をなくすため)。

awaitは救いの手?

Reactive Extensionsは準備さえ済んでしまえば、非同期が恐ろしく簡単に書ける、のですが、如何せん準備が決して簡単とは言いません。さて、それがC# 5.0のasync/awaitは魔法のように解決してくれるのか、というと、そんなことはありません。現在のAsync CTPではAsyncCtpLibrary.dllにAsyncCtpExtensionsというクラスが用意されていて、その中には既存クラスに対して涙ぐましいまでにAsyncで使うのに最適なようにとXxxAsync拡張メソッドが大量に定義されています。例えばWebClientのDownloadStringにはDownloadStringTaskAsyncが。少しその中身を見てみれば、上で書いたものとやっていることは同じようなもの。イベントを登録して、完了時にはイベントハンドラを解除して。

既存クラスは用意してもらったのがあるからいいけど、サービス参照のような自動生成されるクラスにたいしてはどうすれば?答えは勿論、自分で書こうね!orz。まだまだ、そんな受難な時代は続きそうです。でも、Rxは一度分かってしまえばTake(1).Prune()で済むし、T4で自動生成もそんな難しくない雰囲気なので、まあ悪くはないんじゃないでしょーか。悪いのはむしろイベントベースの非同期処理なのでは、という気がしてきた昨今。

まとめ

今月はまだまだ非同期周りの記事を書くよ!いっぱい書くよ!

そういえばで、今回のソースをBitbucketに上げました。今後も何か書くときは合わせてBitbucketに上げていきたいと思っています。

neuecc / EventToAsync / overview – Bitbucket

RSSなどのアイコンが並んでいる部分の一番右のget sourceからzipで落とせます(Bitbucketはナビゲーションが少しわかりづらいのよねえ、Mercurialベースで非常に快適なソースホスティングサービスだとは思うのですが)。また、RxのライブラリはNuGetを使って参照しています(なのでRx本体のインストールがしてなくても試せます、かわりにNuGetが必要?不必要?その辺まだ仕組みがよく分かってないのですが……)。以前はNuPackっていう名前でしたが、名前が被るとかで変更されたそうです。VS2010の拡張機能マネージャのオンラインギャラリーからもVS拡張がインストール出来るので、とりあえずお薦め。色々なライブラリが簡単にインストール出来ます。

私もlinq.js登録しちゃおうかな、かな……。まあ、Fixしなきゃいけないことがかなり溜まってるので、まずはそれやれよって話なのですががが。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(C#)
April 2011
|
July 2024

Twitter:@neuecc GitHub:neuecc

Archive