Reactive Extensions入門 + メソッド早見解説表

Silverlight Toolkitに密かに隠された宝石"System.Reactive.dll"が発見されてから1年。Reactive FrameworkからReactive Extensionsに名前が変わりDevLabsでプロジェクトサイトが公開され、何度となく派手にAPIが消滅したり追加されたりしながら、JavaScript版まで登場して、ここまで来ました。IObservable<T>とIObserver<T>インターフェイスは.NET Framework 4に搭載されたことで、将来的なSP1での標準搭載は間違いなさそう。Windows Phone 7にはベータ版の開発キットに搭載されているように、間違いなく標準搭載されそう。

現在はAPIもかなり安定したし、Windows Phone 7の登場も迫っている。学ぶならまさに今こそベスト!そんなわけで、Rxの機能の簡単な紹介と導入コード、重要そうなエッセンス紹介、そして(ほぼ)全メソッド一行紹介をします。明日から、いや、今日からRxを使いましょう。

その前にRxとは何ぞや?ですが、Linq to EventsもしくはLinq to Asynchronus。イベントや非同期処理をLinqっぽく扱えます。

Rxの出来る事

まずReactive Extensions for .NET (Rx)からインストール。そして、System.CoreEx、System.Reactiveを参照に加え(Rxにはもう一つ、System.Interactiveが含まれていて、これはEnumerableの拡張メソッド群になります)れば準備は終了。

// Rxの出来る事その1. イベントのLinq化 
var button = new Button(); // WPFのButton
Observable.FromEvent<RoutedEventArgs>(button, "Click")
   .Subscribe(ev => Debug.WriteLine(ev.EventArgs));

// Rxの出来る事その2. 非同期のLinq化
Func<int, int> func = i => i * 100; // intを100倍する関数
Observable.FromAsyncPattern<int, int>(func.BeginInvoke, func.EndInvoke)
   .Invoke(5) // Invokeで非同期関数実行開始(Invokeは任意のタイミングで可)
   .Subscribe(i => Debug.WriteLine(i)); // 500

// Rxの出来る事その3. 時間のLinq化
Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(5))
   .Subscribe(l => Debug.WriteLine(l)); // 5秒毎に発火

// Rxの出来る事その4. Pull型のPush型への変換
var source = new[] { 1, 10, 100, 1000 };
source.ToObservable()
   .Subscribe(i => Debug.WriteLine(i));

それぞれ一行でIObservable<T>に変換出来ます。あとは、LinqなのでSelectやWhereなどお馴染みのメソッドが、お馴染みなように使えます。そして最後にSubscribe。これは、まあ、foreachのようなものとでも捉えてもらえれば(今はね!)。

イベントをLinq化して何が嬉しいの?

合成出来るのが嬉しいよ!クリックしてクリックイベントが発動する、程度なら別にうまみはありません。でも、イベントは切り目をつけられないものも多数あります。例えばドラッグアンドドロップは「マウスダウン→マウスムーブ→マウスアップ」の連続的なイベント。従来は各関数の「外」で状態管理する変数を持ってやりくりしていましたが、Rxならば、スムーズにこれらを結合して一本の「ドラッグアンドドロップ」ストリームとして作り上げることが出来ます。逆に言えば、ただたんにイベントをLinq化しても嬉しいことはあまりありません。合成して初めて真価を発揮します。そのためには合成系のメソッド(SelectMany, Merge, Zip, CombineLatest, Join)を知る必要がある、のですがまだ当サイトのブログでは記事書いてません。予定は未定じゃなくて近日中には必ず紹介します……。

非同期をLinq化して何が嬉しいの?

それはもう自明で、単純にBeginInvoke/EndInvokeで待ち合わせるのは面倒くさいから。たった一行でラッピング出来る事の素晴らしさ!でも、同期的に書いてBackgroundWorkerで動かせばいいぢゃない。というのは、一面としては正しい。正しくないのは、Silverlightや、JavaScriptは非同期APIしか用意されていません。なので、クラウド時代のモダンなウェブアプリケーションでは、非同期と付き合うより道はないのです。

RxではBeginXxx/EndXxxという形で.NETの各メソッドにある非同期のパターンが簡単にラップ出来るようになっています。ジェネリクスの型として、引数と戻り値の型を指定して、あとはBeginInvokeとEndInvokeを渡すだけ。あの面倒くさい非同期処理がこんなにも簡単に!それだけで嬉しくありませんか?

Pull型をPush型に変えると何が嬉しいの?

分配出来るようになります。え?具体的には、C#とLinq to JsonとTwitterのChirpUserStreamsとReactive Extensionsという記事で紹介しました。そもそもPullとPushって何?という場合はメソッド探訪第7回:IEnumerable vs IObservableをどうぞ。

Rxを使うのに覚えておきたい大切な3つのこと

あまり深く考えなくても使えるけれど、少しポイントを押さえると、驚くほど簡単に見えてくる。「HotとColdの概念を掴むこと」「Schedulerが実行スレッドを決定すること」「Subjectでテストする」。この3つ。まあ、後の二つは実際のとここじつけみたいなもので、本当に大事なのはHotとColdです。あまりにも大事なのだけど、それに関して書くには余白が狭すぎる。ではなくて、以前にメソッド探訪第7回:IEnumerable vs IObservableとして書いたのでそちらで。とりあえず、ColdはSubscribeしたら即座に実行される、HotはSubscribeしても何もしないでイベント待ち。ぐらいの感覚でOKです。

Scheduler

Schedulerを使うと「いつ」「どこで」実行するかを決定することが出来ます。Rx内部でのメソッド実行は大抵このSchedulerの上に乗っかっています。

// 大抵の生成メソッドはISchedulerを受けるオーバーロードを持つ
// それに対してSchedulerを渡すと、実行スレッドを決定出来る
Observable.Range(1, 10, Scheduler.CurrentThread);
Observable.Interval(TimeSpan.FromSeconds(1), Scheduler.ThreadPool);

基本的には引数に渡すだけ。「いつ」「どこで」ですが、「いつ」に関してはRxの各メソッドが受け持つので、基本的には「どのスレッドで」実行するかを決めることになります。なお、当然デフォルト値もあるわけですが、RangeはCurrentThreadでTimerはThreadPoolだったりと、各メソッドによって若干違ったりすることに注意(但しTimerでCurrentThreadを選ぶと完全にブロックされてTimerというかSleepになるので、挙動として当然といえば当然のこと)

生成メソッドに渡す以外に、まだ使う場所があります。

// WPFでbutton1というボタンとtextBlock1というtextBlockがあるとする
Observable.FromEvent<RoutedEventArgs>(button1, "Click")
    .ObserveOn(Scheduler.ThreadPool) // 重い処理をUIスレッドでするのを避けるためThreadPoolへ対比
    .Do(_ => Thread.Sleep(3000)) // 猛烈に重い処理をすることをシミュレート
    .ObserveOnDispatcher() // Dispatcherに戻す
    .Subscribe(_ => textBlock1.Text = "clickした"); // UIスレッドのものを触るのでThreadPool上からだと例外

UIスレッドのコントロールに他のスレッドから触れると例外が発生します。でも、重たい処理をUIスレッド上でやろうものなら、フリーズしてしまいます。なので、重たい処理は別スレッドに退避し、コントロールの部品を弄る時だけUIスレッドに戻したい。という場合に、ObserveOnを使うことで簡単に実行スレッドのコントロールが可能になります。もうDispatcher.BeginInvokeとはサヨナラ!

Subjectって何?

SubjectはIObservableでありIObserverでもあるもの。というだけではさっぱり分かりません。これは、イベントのRxネイティブ表現です。なので、C#におけるeventと対比させてみると理解しやすいはず。eventはそのクラス内部からはデリゲートとして実行出来ますが、外からだと追加/削除しか出来ませんよね?Subjectはこれを再現するために、外側へはIObservableとして登録のみ出来るようにし、内部からのみ登録されたものへ実行(OnNext/OnError/OnCompleted)して値を渡します。なお、ただキャストしただけでは、外からダウンキャストすればイベントを直接実行出来るということになってしまうので、Subjectを外に公開する時は AsObservableメソッド(IObservableでラップする)を使って隠蔽します。

どんな時に使うかというとRx専用のクラスを作るとき、もしくはObservableの拡張メソッドを作る時、に有効活用出来るはずです。もしくは、メソッドを試すときの擬似的なイベント代わりに使うと非常に便利です。

// Buttonのイベントをイメージ
var buttonA = new Subject<int>();
var buttonB = new Subject<int>();

// Zipの動きを確認してみる……
buttonA.Zip(buttonB, (a, b) => new { a, b })
   .Subscribe(a => Console.WriteLine(a));

buttonA.OnNext(1); // ボタンClickをイメージ
buttonA.OnNext(2); // Subscribeへ値が届くのはいつ?
buttonB.OnNext(10); // デバッグのステップ実行で一行ずつ確認

buttonA.OnCompleted(); // 片方が完了したら
buttonB.OnNext(3); // もう片方にClickが入ったときZipはどういう挙動する?

動きがよくわからないメソッドも、この方法で大体何とか分かります。Subjectには他に非同期実行を表現したAsyncSubjectなど、幾つか亜種があるのでそちらも見ると、Rxのメソッドの動きがよりイメージしやすくなります。例えばFromAsyncPatternは中ではAsyncSubjectを使っているので、AsyncSubjectの動き(OnCompletedの前後でHotとColdが切り替わる、OnNextはOnCompletedが来るまで配信されず、OnCompleted後に最後の値をキャッシュしてColdとして配信し続ける)を丁寧に確認することで、FromAsyncPatternの挙動の理解が簡単になります。

メソッド分類早見表

決して全部というわけではなく、幾つか飛ばしていますが簡単に各メソッドを分類して紹介。

生成系メソッド雑多分類

イベント(hot)
   FromEvent - 文字列で与える以外のやり方もありますよ

非同期系(hot/cold)
   Start - ToAsync().Invoke()の省略形
   ToAsync - 拡張メソッドとしてじゃなくそのまま使うのが型推論効いて素敵
   FromAsyncPattern - ToAsyncも結局これの省略形なだけだったりする
   ForkJoin - 非同期処理が全て完了するのを待ち合わせて結果をまとめて返す

Enumerableっぽいの系(cold)
   Range - いつもの
   Return - ようするにRepeat(element, 1)
   Repeat - 無限リピートもあるよ
   ToObservable - pull to push
   Generate - ようするにUnfold(と言われても困る?)
   Using - 無限リピートリソース管理付き

Timer系(cold)
   Timer - 実はcold
   Interval - Timer(period, period)の省略形なだけだったり
   GenerateWithTime - 引数地獄

空っぽ系(cold)
   Empty - OnCompletedだけ発動
   Throw - OnErrorだけ発動
   Never - 本当に何もしない

その他
   Defer - 生成の遅延
   Create - 自作したい場合に(戻り値はDispose時の実行関数を返す)
   CreateWithDisposable - 同じく、ただし戻り値はIDisposableを返す

こうしてみるとColdばかりで、Hotなのってイベントだけ?的だったりしますねー。では、IObservableの拡張メソッドも。

合成系
   SelectMany - Enumerableと同じ感じですが、Rxでは合成のように機能する
   Zip - 左右のイベントが揃ったらイベント発行(揃うまでQueueでキャッシュしてる)
   CombineLatest - 最新のキャッシュと結合することで毎回イベント発行
   Merge - Y字みたいなイメージで、左右の線を一本に連結
   Join(Plan/And/Then) - Joinパターンとかいう奴らしいですが、Zipの強化版みたいな
   Concat - 末尾に接続
   StartWith - 最初に接続

時間系
   Delay - 値を一定時間遅延させる、coldに使うと微妙なことになるので注意
   Sample - 一定時間毎に、通過していた最新の値だけを出す
   Throttle - 一定時間、値が通過していなければ、その時の最新の値を出す
   TimeInterval - 値と前回の時間との差分を包んだオブジェクトを返す
   RemoveTimeInterval - 包んだオブジェクトを削除して値のみに戻す
   Timestamp - 値と通過した時間で包んだオブジェクトを返す
   RemoveTimestamp - 包んだオブジェクトを削除して値のみに戻す
   Timeout - 一定時間値が来なければTimeoutExceptionを出す

Connectable系(ColdをHotに変換する、細部挙動はSubjectでイメージするといい)
   Publish - Subjectを使ったもの(引数によってはBehaviorSubject)
   Prune - AsyncSubjectを使ったもの
   Replay  - ReplaySubjectを使ったもの

Enumerableに変換系(Push to Pull、使い道わかりません)
   Next - MoveNext後に同期的にブロックして値が来るまで待機
   Latest - 値を一つキャッシュするNext(キャッシュが切れると待機)
   MostRecent - ブロックなしでキャッシュを返し続ける

例外ハンドリング系
   OnErrorResumeNext - 例外来たら握りつぶして予備シーケンスへ移す
   Catch - 対象例外が来たら握りつぶして次へ
   Finally - 例外などで止まっても最後に必ず実行するのがOnCompletedとの違い

実行スレッド変更系
   SubscribeOn - メソッド全体の実行スレッドを変える
   ObserveOn - 以降に流れる値の実行スレッドを変える

クエリ系
   Select - 射影(SelectManyはこっちじゃないのって話ももも)
   Where - フィルタリング
   Scan - Aggregateの経過も列挙するバージョン、一つ過去の値を持てるというのが重要
   Scan0 - seed含む
   GroupBy - グルーピング、なのだけどIGroupedObservableは扱いが少し面倒かなあ
   BufferWithCount - 個数分だけListにまとめる
   BufferWithTime - 一定時間内の分だけListにまとめる
   BufferWithTimeOrCount - そのまんま、上二つが合わさったの
   DistinctUntilChanged - 連続して同じ値が来た場合は流さない

すっとばす系
   Skip - N個飛ばす
   SkipWhile - 条件に引っかかる間は飛ばす
   SkipLast - 最後N個を飛ばす(Lastを除いたTakeという趣向)
   SkipUntil - 右辺のOnNextを察知する「まで」は飛ばす
   Take - N個取る
   TakeWhile - 条件に引っかかる間は取る
   TakeLast - 最後N個だけを取る
   TakeUntil - 右辺のOnNextを察知する「まで」は取る

Aggregate系
   AggregateとかAllとかSumとかEnumerableにもある色々 - 値が確定したとき一つだけ流れてくる

変換系
   ToEnumerable - 同期的にブロックしてIEnumerableに変換する、Hotだと一生戻ってこない
   ToQbservable - IQueryableのデュアルらしい、完全にイミフすぎてヤバい
   Start - ListなんだけどObservableという微妙な状態のものに変換する

その他
   Materiallize - OnNext,OnError,OnCompletedをNotificationにマテリア化
   Dematerialize - マテリア化解除
   Repeat - OnCompletedが来ると最初から繰り返し
   Let - 一時変数いらず
   Switch - SelectMany書かなくていいよ的なの
   AsObservable - IObservableにラップ、Subjectと合わせてどうぞ

疲れた。間違ってるとかこれが足りない(いやまあ、実際幾つか出してないです)とか突っ込み希望。

JavaScript版もあります

RxJSというJavaScript版のReactive Extensionsもあったりします。ダウンロードは.NET版と同じところから。何が出来るかというと、若干、というかかなりメソッドが少なくなってはいるものの、大体.NETと同じことが出来ます。SchedulerにTimeout(JavaScriptにはスレッドはないけどsetTimeoutがあるので、それ使って実行を遅らせるというもの)があったりと、相違も面白い。

JavaScriptは、まずAjaxが非同期だし、イベントだらけなのでRxが大変効果を発揮する。強力なはず、なのですが注目度はそんなに高くない。うむむ?jQueryと融合出来るバインディングも用意されていたりと、かなりイケてると思うのですがー。日本だとJSDeferredがあるね、アレの高機能だけど重い版とかとでも思ってもらえれば。

ところでObservableがあるということはEnumerableもありますか?というと、もちろんありますよ!linq.js - LINQ for JavaScriptとかいうライブラリを使えばいいらしいです!最近Twitterの英語圏でも話題沸騰(で、ちょっと浮かれすぎて頭がフワフワタイムだった)。RxJSと相互に接続できるようになっていたり、jQueryプラグインになるバージョンもあったりと、jQuery - linq.js - RxJSでJavaScriptとは思えない素敵コードが書けます。

JavaScriptはIEnumerableとIObservableが両方そなわり最強に見える。

Over the Language

Linqとは何ぞや。というと、一般にはLinq=クエリ構文=SQLみたいなの、という解釈が依然として主流のようで幾分か残念。これに対する異論は何度か唱えているけれど、では実際何をLinqと指すのだろう。公式の見解はともあれ勝手に考えると、対象をデータソースとみなし、Whereでフィルタリングし、Selectで射影するスタイルならば、それはLinqではないだろうか。

Linq to ObjectsはIEnumerableが、Linq to XmlではXElementが、Linq to SqlではExpression Treeが、Reactive ExtensionsにはIObservableの実装が必要であり、それぞれ中身は全く違う。昔はExpression Treeを弄ること、QueryProviderを実装することがLinq to Hogeの条件だと考えていたところがあったのだけど、今は、Linqの世界(共通のAPIでの操作)に乗っていれば、それはLinqなのだと思っている。

だからLinqは言語にも.NET Frameworkにも依存していない。Linqとは考え方にすぎない。例えば、Linq to Objectsはクロージャさえあればどの言語でも成り立つ(そう……JavaScriptでもね?)。むしろ重要なのは「Language INtegrated」なことであり、表面的なスタイル(SQLライクなシンタックス!)は全く重要ではない。言語に統合されていれば、異物感なく自然に扱え、IDEやデバッガなど言語に用意されているツールのサポートが得られる。(例えば……JavaScriptでガリガリと入力補完効かせてみたりね?)

言語を超えて共有される、より高い次元の抽象化としてのLinq。私はそんな世界に魅せられています。RxはLinqにおけるデータソースの概念をイベントや非同期にまで拡張(まさにExtension)して、更なる可能性を見せてくれました。次なる世界はDryad? まだまだLinqは熱い!

まとめ

ていうか改めてHaskellは偉大。でも、取っ付きやすさは大事。難しげなことを簡単なものとして甘く包んで掲示したLinqは、凄い。Rxも、取っ付きづらいFunctional Reactive Programmingを、Linqというお馴染みの土台に乗せたことで理解までの敷居を相当緩和させた。素晴らしい仕事です。

難しいことが簡単に出来る、というのがLinqのキモで、Rxも同じ。難しかったこと(イベントの合成/非同期)が簡単にできる。それが大事だし、その事をちゃんと伝えていきたいなあ。そして、Realworldでの実践例も。そのためにはアプリケーション書かなければ。アプリケーション、書きたいです……。書きます。

そういえばついでに、Rx一周年ということで、大体一年分の記事が溜まった(そしてロクに書いてないことが判明した)のと、少し前にlinq.jsのRT祭りがあった熱に浮かされて、応募するだけならタダ理論により10月期のMicrosoft MVPに応募しちゃったりなんかしました。恥ずかしぃー。分野にLinqがあれば!とか意味不明なことを思ったのですが、当然無いのでC#です、応募文句は、linq.js作った(DL数累計たった1000)と、Rx紹介書いてる、の二つだけって無理ですね明らかに。これから割と詳細に活動内容を書いて、送らなきゃいけないのですが、オール空白状態。応募したことに泣きたくなってきたよ、とほほ。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive