Reactive Extensionsによる非同期クエリ

InfoQ: Future、性能、依存性の低減など多くの改善がされたAkka 1.1リリースから「- Futureは完全にモナドになった。したがってfor内包表記を利用できる。」。リスト内包表記は、ようするところLINQなわけなので、では、とりあえずC# + Reactive Extensionsで。

// 非同期に対するLINQ(map, filter, etc...)
var asyncQuery =
    from a in Observable.Start(() => 10 / 2)
    from b in Observable.Start(() => a + 1)
    from c in Observable.Start(() => a - 1)
    select b * c;

// 非同期のまま実行したいならSubscribe
var canceler = asyncQuery.Subscribe(Console.WriteLine);

// 実行をキャンセルする場合はSubscribe時の戻り値をDispose
canceler.Dispose();

// 同期的に待って値取得したいならFirst
var result = asyncQuery.First();

Rx抜きで、TaskのContinueWithで↑を書くのはカッタルイ。また、Rxでもメソッド構文でSelectManyを連鎖でも辛い。何故か、というと、aの値をcの部分で使えないから。メソッドチェーンの形だと、どうしても一つ手前の値しか持ち越せない。そこで、クエリ構文が活きます。また、LINQであるが故にクエリ構文が使えるRxの嬉しさ。

じゃあ、調子にのってFutures (Scala) — Akka DocumentationをRxで書き換えてみようかしら。同じような内容としては以前にRxを使って非同期プログラミングを簡単にという記事でTaskと比較していたのでそちらも参照を。

// directly
var f1 = Observable.Return("Hello World");

// LINQが使える
var f2 = f1.Select(x => x.Length);

// Subscribeまで実行が遅延されるのでSleepしないよ
var f3 = Observable.Defer(() =>
{
    Thread.Sleep(1000);
    return Observable.Return("Hello World");
});
var f4 = f3.Select(x => x.Length); // まだ実行されないよ
var result = f4.First(); // ここで実行

// Observableの連鎖はSelectManyで
var f5 = f1.SelectMany(x => f3);

// SelectManyはクエリ構文のfrom連打でも置き換えられる
var f6 = from a in Observable.Return(10 / 2)
         from b in Observable.Return(a + 1)
         from c in Observable.Return(a - 1)
         select b * c;

と、この辺まではいいんですが、Composing Futuresが何やってるのかよくわからないので(Scala知識ゼロですみません)、眺めながらAsync CTPでも持ち出します。

async void HomuHomu()
{
    var f1 = Observable.Return(100); // IObservable<int>
    var f2 = TaskEx.FromResult(200); // Task<int>

    var a = await f1; // 何気にIObservable<T>はawait出来る
    var b = await f2; // 当然ですがTask<T>もawait出来る
    var result = a + b; // 300

    // じゃあObservableが幾つも値持ってる場合は?
    var f3 = new[] { "homu", "mado" }.ToObservable();
    var c = await f3;
    Console.WriteLine(c); // "mado"

    // つまり、完了まで待って(OnCompleted)、最後の値が取得される

    // ところでObservable.Return = TaskEx.FromResultなわけですが
    // 以下の3つも同じと捉えていいです
    Task.Factory.StartNew(() => 100);
    TaskEx.Run(() => 200);
    Observable.Start(() => 300); // つまりfunc自体は即時実行
    
    // こちらも等しい(実行が遅延される)
    var t = new Task<int>(() => 100);
    var o = Observable.ToAsync(() => 100);
    // 実行するには
    t.Start();
    o.Invoke(); // もしくは o() ←ただのデリゲートなので

    // ToAsyncでは実行時に引数を渡すことも可能
    var o2 = Observable.ToAsync((int arg) => arg * 2);
    o2(1000).Select(x => x).Subscribe(Console.WriteLine);
}

雰囲気で何となくそうなのだろうと思いつつ、よくわからないので、適当に解釈しながら次。

// IObservable<T>はそのものがリスト状態とも言えるので、
// 複数値を持てるし、LINQなのでSelectしてAggregateも出来る
var futureSum = Observable.Range(1, 1000)
    .Select(x => x * 2)
    .Sum();
    
var sum = futureSum.First();

とりあえずこの辺で(特に言いたいことはない)。全く読めないとこういう時辛い。ActorとReactiveの関係とは、とか、見えそうな見えないような気持ち悪さが脳に渦巻いていて、勉強したいところです。F#で。

上の話とは関係なく告知

二件ほどお話を頂いたので、セッションします。まず、2011/05/21(Sat)にすまべん特別編でRxについて。内容はRx全般になるので、WP7ではなくても適用出来る話になります。

Rxの多くの機能のうち非同期に絞って、かつ、初心者向けに説明しますので、Rxって何それ食べれるのという感じでも全然大丈夫です。また、既に触っている人も、割とためになるTipsが得られるのではないかなと思いながら資料作成中。そうなるよう頑張ります。なので、是非聞きに来てください。セッション資料は、通信環境があればセッション終了後即座に上げるつもりです。なければまた後日で。Ustreamとかもあるのかな?あれば、そちらでも。

もう一件、5月23日(月)にC#ユーザー会でCode Contractsについて。

背景であるDbCなどについてはufcppさんが説明してくださるので、私はCode Contractsとして実装されていることを、Reflectorでこうリライトされるんですねー、とか見ながらデモ中心に、「一から使ってみよう」といった内容にしようと思っています。Code Contracts…… 名前だけなら聞いたことがある、いや、名前も聞いたことない何それ、ぐらいからが対象なのぜ、是非どうぞ。もう使っている、という人には物足りないかも(むしろそこは私が教えて欲しいもがもがもがもが)。

どちらも、まだ参加申し込み出来るようなので是非聞きにきてください。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive