Reactive Extensionsによる非同期クエリ
- 2011-05-17
InfoQ: Future、性能、依存性の低減など多くの改善がされたAkka 1.1リリースから「- Futureは完全にモナドになった。したがってfor内包表記を利用できる。」。リスト内包表記は、ようするところLINQなわけなので、では、とりあえずC# + Reactive Extensionsで。
// 非同期に対するLINQ(map, filter, etc...)
var asyncQuery =
from a in Observable.Start(() => 10 / 2)
from b in Observable.Start(() => a + 1)
from c in Observable.Start(() => a - 1)
select b * c;
// 非同期のまま実行したいならSubscribe
var canceler = asyncQuery.Subscribe(Console.WriteLine);
// 実行をキャンセルする場合はSubscribe時の戻り値をDispose
canceler.Dispose();
// 同期的に待って値取得したいならFirst
var result = asyncQuery.First();
Rx抜きで、TaskのContinueWithで↑を書くのはカッタルイ。また、Rxでもメソッド構文でSelectManyを連鎖でも辛い。何故か、というと、aの値をcの部分で使えないから。メソッドチェーンの形だと、どうしても一つ手前の値しか持ち越せない。そこで、クエリ構文が活きます。また、LINQであるが故にクエリ構文が使えるRxの嬉しさ。
じゃあ、調子にのってFutures (Scala) — Akka DocumentationをRxで書き換えてみようかしら。同じような内容としては以前にRxを使って非同期プログラミングを簡単にという記事でTaskと比較していたのでそちらも参照を。
// directly
var f1 = Observable.Return("Hello World");
// LINQが使える
var f2 = f1.Select(x => x.Length);
// Subscribeまで実行が遅延されるのでSleepしないよ
var f3 = Observable.Defer(() =>
{
Thread.Sleep(1000);
return Observable.Return("Hello World");
});
var f4 = f3.Select(x => x.Length); // まだ実行されないよ
var result = f4.First(); // ここで実行
// Observableの連鎖はSelectManyで
var f5 = f1.SelectMany(x => f3);
// SelectManyはクエリ構文のfrom連打でも置き換えられる
var f6 = from a in Observable.Return(10 / 2)
from b in Observable.Return(a + 1)
from c in Observable.Return(a - 1)
select b * c;
と、この辺まではいいんですが、Composing Futuresが何やってるのかよくわからないので(Scala知識ゼロですみません)、眺めながらAsync CTPでも持ち出します。
async void HomuHomu()
{
var f1 = Observable.Return(100); // IObservable<int>
var f2 = TaskEx.FromResult(200); // Task<int>
var a = await f1; // 何気にIObservable<T>はawait出来る
var b = await f2; // 当然ですがTask<T>もawait出来る
var result = a + b; // 300
// じゃあObservableが幾つも値持ってる場合は?
var f3 = new[] { "homu", "mado" }.ToObservable();
var c = await f3;
Console.WriteLine(c); // "mado"
// つまり、完了まで待って(OnCompleted)、最後の値が取得される
// ところでObservable.Return = TaskEx.FromResultなわけですが
// 以下の3つも同じと捉えていいです
Task.Factory.StartNew(() => 100);
TaskEx.Run(() => 200);
Observable.Start(() => 300); // つまりfunc自体は即時実行
// こちらも等しい(実行が遅延される)
var t = new Task<int>(() => 100);
var o = Observable.ToAsync(() => 100);
// 実行するには
t.Start();
o.Invoke(); // もしくは o() ←ただのデリゲートなので
// ToAsyncでは実行時に引数を渡すことも可能
var o2 = Observable.ToAsync((int arg) => arg * 2);
o2(1000).Select(x => x).Subscribe(Console.WriteLine);
}
雰囲気で何となくそうなのだろうと思いつつ、よくわからないので、適当に解釈しながら次。
// IObservable<T>はそのものがリスト状態とも言えるので、
// 複数値を持てるし、LINQなのでSelectしてAggregateも出来る
var futureSum = Observable.Range(1, 1000)
.Select(x => x * 2)
.Sum();
var sum = futureSum.First();
とりあえずこの辺で(特に言いたいことはない)。全く読めないとこういう時辛い。ActorとReactiveの関係とは、とか、見えそうな見えないような気持ち悪さが脳に渦巻いていて、勉強したいところです。F#で。
上の話とは関係なく告知
二件ほどお話を頂いたので、セッションします。まず、2011/05/21(Sat)にすまべん特別編でRxについて。内容はRx全般になるので、WP7ではなくても適用出来る話になります。
Rxの多くの機能のうち非同期に絞って、かつ、初心者向けに説明しますので、Rxって何それ食べれるのという感じでも全然大丈夫です。また、既に触っている人も、割とためになるTipsが得られるのではないかなと思いながら資料作成中。そうなるよう頑張ります。なので、是非聞きに来てください。セッション資料は、通信環境があればセッション終了後即座に上げるつもりです。なければまた後日で。Ustreamとかもあるのかな?あれば、そちらでも。
もう一件、5月23日(月)にC#ユーザー会でCode Contractsについて。
背景であるDbCなどについてはufcppさんが説明してくださるので、私はCode Contractsとして実装されていることを、Reflectorでこうリライトされるんですねー、とか見ながらデモ中心に、「一から使ってみよう」といった内容にしようと思っています。Code Contracts…… 名前だけなら聞いたことがある、いや、名前も聞いたことない何それ、ぐらいからが対象なのぜ、是非どうぞ。もう使っている、という人には物足りないかも(むしろそこは私が教えて欲しいもがもがもがもが)。
どちらも、まだ参加申し込み出来るようなので是非聞きにきてください。