Reactive Extensions入門 + メソッド早見解説表

Silverlight Toolkitに密かに隠された宝石"System.Reactive.dll"が発見されてから1年。Reactive FrameworkからReactive Extensionsに名前が変わりDevLabsでプロジェクトサイトが公開され、何度となく派手にAPIが消滅したり追加されたりしながら、JavaScript版まで登場して、ここまで来ました。IObservable<T>とIObserver<T>インターフェイスは.NET Framework 4に搭載されたことで、将来的なSP1での標準搭載は間違いなさそう。Windows Phone 7にはベータ版の開発キットに搭載されているように、間違いなく標準搭載されそう。

現在はAPIもかなり安定したし、Windows Phone 7の登場も迫っている。学ぶならまさに今こそベスト!そんなわけで、Rxの機能の簡単な紹介と導入コード、重要そうなエッセンス紹介、そして(ほぼ)全メソッド一行紹介をします。明日から、いや、今日からRxを使いましょう。

その前にRxとは何ぞや?ですが、Linq to EventsもしくはLinq to Asynchronus。イベントや非同期処理をLinqっぽく扱えます。

Rxの出来る事

まずReactive Extensions for .NET (Rx)からインストール。そして、System.CoreEx、System.Reactiveを参照に加え(Rxにはもう一つ、System.Interactiveが含まれていて、これはEnumerableの拡張メソッド群になります)れば準備は終了。

// Rxの出来る事その1. イベントのLinq化 
var button = new Button(); // WPFのButton
Observable.FromEvent<RoutedEventArgs>(button, "Click")
   .Subscribe(ev => Debug.WriteLine(ev.EventArgs));

// Rxの出来る事その2. 非同期のLinq化
Func<int, int> func = i => i * 100; // intを100倍する関数
Observable.FromAsyncPattern<int, int>(func.BeginInvoke, func.EndInvoke)
   .Invoke(5) // Invokeで非同期関数実行開始(Invokeは任意のタイミングで可)
   .Subscribe(i => Debug.WriteLine(i)); // 500

// Rxの出来る事その3. 時間のLinq化
Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(5))
   .Subscribe(l => Debug.WriteLine(l)); // 5秒毎に発火

// Rxの出来る事その4. Pull型のPush型への変換
var source = new[] { 1, 10, 100, 1000 };
source.ToObservable()
   .Subscribe(i => Debug.WriteLine(i));

それぞれ一行でIObservable<T>に変換出来ます。あとは、LinqなのでSelectやWhereなどお馴染みのメソッドが、お馴染みなように使えます。そして最後にSubscribe。これは、まあ、foreachのようなものとでも捉えてもらえれば(今はね!)。

イベントをLinq化して何が嬉しいの?

合成出来るのが嬉しいよ!クリックしてクリックイベントが発動する、程度なら別にうまみはありません。でも、イベントは切り目をつけられないものも多数あります。例えばドラッグアンドドロップは「マウスダウン→マウスムーブ→マウスアップ」の連続的なイベント。従来は各関数の「外」で状態管理する変数を持ってやりくりしていましたが、Rxならば、スムーズにこれらを結合して一本の「ドラッグアンドドロップ」ストリームとして作り上げることが出来ます。逆に言えば、ただたんにイベントをLinq化しても嬉しいことはあまりありません。合成して初めて真価を発揮します。そのためには合成系のメソッド(SelectMany, Merge, Zip, CombineLatest, Join)を知る必要がある、のですがまだ当サイトのブログでは記事書いてません。予定は未定じゃなくて近日中には必ず紹介します……。

非同期をLinq化して何が嬉しいの?

それはもう自明で、単純にBeginInvoke/EndInvokeで待ち合わせるのは面倒くさいから。たった一行でラッピング出来る事の素晴らしさ!でも、同期的に書いてBackgroundWorkerで動かせばいいぢゃない。というのは、一面としては正しい。正しくないのは、Silverlightや、JavaScriptは非同期APIしか用意されていません。なので、クラウド時代のモダンなウェブアプリケーションでは、非同期と付き合うより道はないのです。

RxではBeginXxx/EndXxxという形で.NETの各メソッドにある非同期のパターンが簡単にラップ出来るようになっています。ジェネリクスの型として、引数と戻り値の型を指定して、あとはBeginInvokeとEndInvokeを渡すだけ。あの面倒くさい非同期処理がこんなにも簡単に!それだけで嬉しくありませんか?

Pull型をPush型に変えると何が嬉しいの?

分配出来るようになります。え?具体的には、C#とLinq to JsonとTwitterのChirpUserStreamsとReactive Extensionsという記事で紹介しました。そもそもPullとPushって何?という場合はメソッド探訪第7回:IEnumerable vs IObservableをどうぞ。

Rxを使うのに覚えておきたい大切な3つのこと

あまり深く考えなくても使えるけれど、少しポイントを押さえると、驚くほど簡単に見えてくる。「HotとColdの概念を掴むこと」「Schedulerが実行スレッドを決定すること」「Subjectでテストする」。この3つ。まあ、後の二つは実際のとここじつけみたいなもので、本当に大事なのはHotとColdです。あまりにも大事なのだけど、それに関して書くには余白が狭すぎる。ではなくて、以前にメソッド探訪第7回:IEnumerable vs IObservableとして書いたのでそちらで。とりあえず、ColdはSubscribeしたら即座に実行される、HotはSubscribeしても何もしないでイベント待ち。ぐらいの感覚でOKです。

Scheduler

Schedulerを使うと「いつ」「どこで」実行するかを決定することが出来ます。Rx内部でのメソッド実行は大抵このSchedulerの上に乗っかっています。

// 大抵の生成メソッドはISchedulerを受けるオーバーロードを持つ
// それに対してSchedulerを渡すと、実行スレッドを決定出来る
Observable.Range(1, 10, Scheduler.CurrentThread);
Observable.Interval(TimeSpan.FromSeconds(1), Scheduler.ThreadPool);

基本的には引数に渡すだけ。「いつ」「どこで」ですが、「いつ」に関してはRxの各メソッドが受け持つので、基本的には「どのスレッドで」実行するかを決めることになります。なお、当然デフォルト値もあるわけですが、RangeはCurrentThreadでTimerはThreadPoolだったりと、各メソッドによって若干違ったりすることに注意(但しTimerでCurrentThreadを選ぶと完全にブロックされてTimerというかSleepになるので、挙動として当然といえば当然のこと)

生成メソッドに渡す以外に、まだ使う場所があります。

// WPFでbutton1というボタンとtextBlock1というtextBlockがあるとする
Observable.FromEvent<RoutedEventArgs>(button1, "Click")
    .ObserveOn(Scheduler.ThreadPool) // 重い処理をUIスレッドでするのを避けるためThreadPoolへ対比
    .Do(_ => Thread.Sleep(3000)) // 猛烈に重い処理をすることをシミュレート
    .ObserveOnDispatcher() // Dispatcherに戻す
    .Subscribe(_ => textBlock1.Text = "clickした"); // UIスレッドのものを触るのでThreadPool上からだと例外

UIスレッドのコントロールに他のスレッドから触れると例外が発生します。でも、重たい処理をUIスレッド上でやろうものなら、フリーズしてしまいます。なので、重たい処理は別スレッドに退避し、コントロールの部品を弄る時だけUIスレッドに戻したい。という場合に、ObserveOnを使うことで簡単に実行スレッドのコントロールが可能になります。もうDispatcher.BeginInvokeとはサヨナラ!

Subjectって何?

SubjectはIObservableでありIObserverでもあるもの。というだけではさっぱり分かりません。これは、イベントのRxネイティブ表現です。なので、C#におけるeventと対比させてみると理解しやすいはず。eventはそのクラス内部からはデリゲートとして実行出来ますが、外からだと追加/削除しか出来ませんよね?Subjectはこれを再現するために、外側へはIObservableとして登録のみ出来るようにし、内部からのみ登録されたものへ実行(OnNext/OnError/OnCompleted)して値を渡します。なお、ただキャストしただけでは、外からダウンキャストすればイベントを直接実行出来るということになってしまうので、Subjectを外に公開する時は AsObservableメソッド(IObservableでラップする)を使って隠蔽します。

どんな時に使うかというとRx専用のクラスを作るとき、もしくはObservableの拡張メソッドを作る時、に有効活用出来るはずです。もしくは、メソッドを試すときの擬似的なイベント代わりに使うと非常に便利です。

// Buttonのイベントをイメージ
var buttonA = new Subject<int>();
var buttonB = new Subject<int>();

// Zipの動きを確認してみる……
buttonA.Zip(buttonB, (a, b) => new { a, b })
   .Subscribe(a => Console.WriteLine(a));

buttonA.OnNext(1); // ボタンClickをイメージ
buttonA.OnNext(2); // Subscribeへ値が届くのはいつ?
buttonB.OnNext(10); // デバッグのステップ実行で一行ずつ確認

buttonA.OnCompleted(); // 片方が完了したら
buttonB.OnNext(3); // もう片方にClickが入ったときZipはどういう挙動する?

動きがよくわからないメソッドも、この方法で大体何とか分かります。Subjectには他に非同期実行を表現したAsyncSubjectなど、幾つか亜種があるのでそちらも見ると、Rxのメソッドの動きがよりイメージしやすくなります。例えばFromAsyncPatternは中ではAsyncSubjectを使っているので、AsyncSubjectの動き(OnCompletedの前後でHotとColdが切り替わる、OnNextはOnCompletedが来るまで配信されず、OnCompleted後に最後の値をキャッシュしてColdとして配信し続ける)を丁寧に確認することで、FromAsyncPatternの挙動の理解が簡単になります。

メソッド分類早見表

決して全部というわけではなく、幾つか飛ばしていますが簡単に各メソッドを分類して紹介。

生成系メソッド雑多分類

イベント(hot)
   FromEvent - 文字列で与える以外のやり方もありますよ

非同期系(hot/cold)
   Start - ToAsync().Invoke()の省略形
   ToAsync - 拡張メソッドとしてじゃなくそのまま使うのが型推論効いて素敵
   FromAsyncPattern - ToAsyncも結局これの省略形なだけだったりする
   ForkJoin - 非同期処理が全て完了するのを待ち合わせて結果をまとめて返す

Enumerableっぽいの系(cold)
   Range - いつもの
   Return - ようするにRepeat(element, 1)
   Repeat - 無限リピートもあるよ
   ToObservable - pull to push
   Generate - ようするにUnfold(と言われても困る?)
   Using - 無限リピートリソース管理付き

Timer系(cold)
   Timer - 実はcold
   Interval - Timer(period, period)の省略形なだけだったり
   GenerateWithTime - 引数地獄

空っぽ系(cold)
   Empty - OnCompletedだけ発動
   Throw - OnErrorだけ発動
   Never - 本当に何もしない

その他
   Defer - 生成の遅延
   Create - 自作したい場合に(戻り値はDispose時の実行関数を返す)
   CreateWithDisposable - 同じく、ただし戻り値はIDisposableを返す

こうしてみるとColdばかりで、Hotなのってイベントだけ?的だったりしますねー。では、IObservableの拡張メソッドも。

合成系
   SelectMany - Enumerableと同じ感じですが、Rxでは合成のように機能する
   Zip - 左右のイベントが揃ったらイベント発行(揃うまでQueueでキャッシュしてる)
   CombineLatest - 最新のキャッシュと結合することで毎回イベント発行
   Merge - Y字みたいなイメージで、左右の線を一本に連結
   Join(Plan/And/Then) - Joinパターンとかいう奴らしいですが、Zipの強化版みたいな
   Concat - 末尾に接続
   StartWith - 最初に接続

時間系
   Delay - 値を一定時間遅延させる、coldに使うと微妙なことになるので注意
   Sample - 一定時間毎に、通過していた最新の値だけを出す
   Throttle - 一定時間、値が通過していなければ、その時の最新の値を出す
   TimeInterval - 値と前回の時間との差分を包んだオブジェクトを返す
   RemoveTimeInterval - 包んだオブジェクトを削除して値のみに戻す
   Timestamp - 値と通過した時間で包んだオブジェクトを返す
   RemoveTimestamp - 包んだオブジェクトを削除して値のみに戻す
   Timeout - 一定時間値が来なければTimeoutExceptionを出す

Connectable系(ColdをHotに変換する、細部挙動はSubjectでイメージするといい)
   Publish - Subjectを使ったもの(引数によってはBehaviorSubject)
   Prune - AsyncSubjectを使ったもの
   Replay  - ReplaySubjectを使ったもの

Enumerableに変換系(Push to Pull、使い道わかりません)
   Next - MoveNext後に同期的にブロックして値が来るまで待機
   Latest - 値を一つキャッシュするNext(キャッシュが切れると待機)
   MostRecent - ブロックなしでキャッシュを返し続ける

例外ハンドリング系
   OnErrorResumeNext - 例外来たら握りつぶして予備シーケンスへ移す
   Catch - 対象例外が来たら握りつぶして次へ
   Finally - 例外などで止まっても最後に必ず実行するのがOnCompletedとの違い

実行スレッド変更系
   SubscribeOn - メソッド全体の実行スレッドを変える
   ObserveOn - 以降に流れる値の実行スレッドを変える

クエリ系
   Select - 射影(SelectManyはこっちじゃないのって話ももも)
   Where - フィルタリング
   Scan - Aggregateの経過も列挙するバージョン、一つ過去の値を持てるというのが重要
   Scan0 - seed含む
   GroupBy - グルーピング、なのだけどIGroupedObservableは扱いが少し面倒かなあ
   BufferWithCount - 個数分だけListにまとめる
   BufferWithTime - 一定時間内の分だけListにまとめる
   BufferWithTimeOrCount - そのまんま、上二つが合わさったの
   DistinctUntilChanged - 連続して同じ値が来た場合は流さない

すっとばす系
   Skip - N個飛ばす
   SkipWhile - 条件に引っかかる間は飛ばす
   SkipLast - 最後N個を飛ばす(Lastを除いたTakeという趣向)
   SkipUntil - 右辺のOnNextを察知する「まで」は飛ばす
   Take - N個取る
   TakeWhile - 条件に引っかかる間は取る
   TakeLast - 最後N個だけを取る
   TakeUntil - 右辺のOnNextを察知する「まで」は取る

Aggregate系
   AggregateとかAllとかSumとかEnumerableにもある色々 - 値が確定したとき一つだけ流れてくる

変換系
   ToEnumerable - 同期的にブロックしてIEnumerableに変換する、Hotだと一生戻ってこない
   ToQbservable - IQueryableのデュアルらしい、完全にイミフすぎてヤバい
   Start - ListなんだけどObservableという微妙な状態のものに変換する

その他
   Materiallize - OnNext,OnError,OnCompletedをNotificationにマテリア化
   Dematerialize - マテリア化解除
   Repeat - OnCompletedが来ると最初から繰り返し
   Let - 一時変数いらず
   Switch - SelectMany書かなくていいよ的なの
   AsObservable - IObservableにラップ、Subjectと合わせてどうぞ

疲れた。間違ってるとかこれが足りない(いやまあ、実際幾つか出してないです)とか突っ込み希望。

JavaScript版もあります

RxJSというJavaScript版のReactive Extensionsもあったりします。ダウンロードは.NET版と同じところから。何が出来るかというと、若干、というかかなりメソッドが少なくなってはいるものの、大体.NETと同じことが出来ます。SchedulerにTimeout(JavaScriptにはスレッドはないけどsetTimeoutがあるので、それ使って実行を遅らせるというもの)があったりと、相違も面白い。

JavaScriptは、まずAjaxが非同期だし、イベントだらけなのでRxが大変効果を発揮する。強力なはず、なのですが注目度はそんなに高くない。うむむ?jQueryと融合出来るバインディングも用意されていたりと、かなりイケてると思うのですがー。日本だとJSDeferredがあるね、アレの高機能だけど重い版とかとでも思ってもらえれば。

ところでObservableがあるということはEnumerableもありますか?というと、もちろんありますよ!linq.js - LINQ for JavaScriptとかいうライブラリを使えばいいらしいです!最近Twitterの英語圏でも話題沸騰(で、ちょっと浮かれすぎて頭がフワフワタイムだった)。RxJSと相互に接続できるようになっていたり、jQueryプラグインになるバージョンもあったりと、jQuery - linq.js - RxJSでJavaScriptとは思えない素敵コードが書けます。

JavaScriptはIEnumerableとIObservableが両方そなわり最強に見える。

Over the Language

Linqとは何ぞや。というと、一般にはLinq=クエリ構文=SQLみたいなの、という解釈が依然として主流のようで幾分か残念。これに対する異論は何度か唱えているけれど、では実際何をLinqと指すのだろう。公式の見解はともあれ勝手に考えると、対象をデータソースとみなし、Whereでフィルタリングし、Selectで射影するスタイルならば、それはLinqではないだろうか。

Linq to ObjectsはIEnumerableが、Linq to XmlではXElementが、Linq to SqlではExpression Treeが、Reactive ExtensionsにはIObservableの実装が必要であり、それぞれ中身は全く違う。昔はExpression Treeを弄ること、QueryProviderを実装することがLinq to Hogeの条件だと考えていたところがあったのだけど、今は、Linqの世界(共通のAPIでの操作)に乗っていれば、それはLinqなのだと思っている。

だからLinqは言語にも.NET Frameworkにも依存していない。Linqとは考え方にすぎない。例えば、Linq to Objectsはクロージャさえあればどの言語でも成り立つ(そう……JavaScriptでもね?)。むしろ重要なのは「Language INtegrated」なことであり、表面的なスタイル(SQLライクなシンタックス!)は全く重要ではない。言語に統合されていれば、異物感なく自然に扱え、IDEやデバッガなど言語に用意されているツールのサポートが得られる。(例えば……JavaScriptでガリガリと入力補完効かせてみたりね?)

言語を超えて共有される、より高い次元の抽象化としてのLinq。私はそんな世界に魅せられています。RxはLinqにおけるデータソースの概念をイベントや非同期にまで拡張(まさにExtension)して、更なる可能性を見せてくれました。次なる世界はDryad? まだまだLinqは熱い!

まとめ

ていうか改めてHaskellは偉大。でも、取っ付きやすさは大事。難しげなことを簡単なものとして甘く包んで掲示したLinqは、凄い。Rxも、取っ付きづらいFunctional Reactive Programmingを、Linqというお馴染みの土台に乗せたことで理解までの敷居を相当緩和させた。素晴らしい仕事です。

難しいことが簡単に出来る、というのがLinqのキモで、Rxも同じ。難しかったこと(イベントの合成/非同期)が簡単にできる。それが大事だし、その事をちゃんと伝えていきたいなあ。そして、Realworldでの実践例も。そのためにはアプリケーション書かなければ。アプリケーション、書きたいです……。書きます。

そういえばついでに、Rx一周年ということで、大体一年分の記事が溜まった(そしてロクに書いてないことが判明した)のと、少し前にlinq.jsのRT祭りがあった熱に浮かされて、応募するだけならタダ理論により10月期のMicrosoft MVPに応募しちゃったりなんかしました。恥ずかしぃー。分野にLinqがあれば!とか意味不明なことを思ったのですが、当然無いのでC#です、応募文句は、linq.js作った(DL数累計たった1000)と、Rx紹介書いてる、の二つだけって無理ですね明らかに。これから割と詳細に活動内容を書いて、送らなきゃいけないのですが、オール空白状態。応募したことに泣きたくなってきたよ、とほほ。

Windows Phone 7 + Reactive ExtensionsによるXml取得

Windows Phone 7にはReactive Extensionsが標準搭載されていたりするのだよ!なんだってー!と、いうわけで、Real World Rx。じゃないですけれど、Rxを使って非同期処理をゴニョゴニョとしてみましょう。ネットワークからデータ取って何とかする、というと一昔前はRSSリーダーがサンプルの主役でしたが、最近だとTwitterリーダーなのでしょうね。というわけで、Twitterリーダーにします。といっても、ぶっちゃけただたんにデータ取ってリストボックスにバインドするだけです。そしてGUI部分はSilverlightを使用してWindows Phone 7でTwitterアプリケーションを構築 - @ITのものを丸ごと使います。手抜き!というわけで、差分としてはRxを使うか否かといったところしかありません。

なお、別に全然Windows Phone 7ならでは!なことはやらないので、WPFでもSilverlightでも同じように書けます。ちょっとしたRxのサンプルとしてどうぞ。今回は出たばかりのWindows Phone Developer Tools Betaを使います。Windows Phone用のBlendがついていたりと盛り沢山。

Xmlを読み込む

とりあえずLinq to XmlなのでXElement.Load(string uri)ですね。違います。そのオーバーロードはSilverlightでは使えないのであった。えー。なんでー。とはまあ、つまり、同期系APIの搭載はほとんどなくて、全部非同期系で操作するよう強要されているわけです。XElement.Loadは同期でネットワークからXMLを引っ張ってくる→ダウンロード時間中はUI固まる→許すまじ!ということのようで。みんな大好きBackgroundWorkerたん使えばいいぢゃない、みたいなのは通用しないそうだ。

MSDNにお聞きすれば方法 : LINQ to XML で任意の URI の場所から XML ファイルを読み込むとあります。ネットワークからデータを取ってくるときはWebClient/HttpWebRequest使えというお話。

では、とりあえず、MainPage.xamlにペタペタと書いて、MessageBox.Showで確認していくという原始人な手段を取っていきましょう。XElementの利用にはSystem.Xml.Linqの参照が別途必要です。

public MainPage()
{
    InitializeComponent();
    
    var wc = new WebClient();
    wc.OpenReadCompleted += (sender, e) =>
    {
        var elem = XElement.Load(e.Result); // e.ResultにStreamが入ってる
        MessageBox.Show(elem.ToString()); // 確認
    };
    wc.OpenReadAsync(new Uri("http://twitter.com/statuses/public_timeline.xml")); // 非同期読み込み呼び出し開始
}

別に難しいこともなくすんなりと表示されました。簡単なことが簡単に書けるって素晴らしい。で、WebClientのプロパティをマジマジと見ているとAllowReadStreamBufferingなんてものが。trueの場合はメモリにバッファリングされる。うーん、せっかくなので完全ストリーミングでやりたいなあ。これfalseならバッファリングなしってことですよね?じゃあ、バッファリング無しにしてみますか。

var wc = new WebClient();
wc.AllowReadStreamBuffering = false; // デフォはtrueでバッファリングあり、今回はfalseに変更
wc.OpenReadCompleted += (sender, e) =>
{
    try
    {
        var elem = XElement.Load(e.Result); // ここで例外出るよ!
    }
    catch (Exception ex)
    {
        // Read is not supporeted on the main thread when buffering is disabled.
        MessageBox.Show(ex.ToString());
    }
};

例外で死にました。徹底して同期的にネットワーク絡みの処理が入るのは許しません、というわけですね、なるほど。じゃあ別スレッドでやるよ、ということでとりあえずThreadPoolに突っ込んでみた。

wc.OpenReadCompleted += (sender, e) =>
{
    ThreadPool.QueueUserWorkItem(_ =>
    {
        try
        {
            var elem = XElement.Load(e.Result);
            MessageBox.Show(elem.ToString()); // 今度はここで例外!
        }
        catch(Exception ex)
        {
            // Invalid cross-thread access.
            Debug.WriteLine(ex.ToString());
        }
    });
};

読み込みは出来たけど、今度はMessageBox.Showのところで、Invalid Cross Thread Accessで死んだ。そっか、MessageBoxもUIスレッドなのか。うーむ、世の中難しいね!というわけで、とりあえずDispatcher.BeginInvokeしますか。

wc.OpenReadCompleted += (sender, e) =>
{
    ThreadPool.QueueUserWorkItem(_ =>
    {
        var elem = XElement.Load(e.Result);
        Dispatcher.BeginInvoke(() => MessageBox.Show(elem.ToString()));
    });
};

これで完全なストリームで非同期呼び出しでのXmlロードが出来たわけですね。これは面倒くさいし、Invoke系の入れ子が酷いことになってますよ、うわぁぁ。

Rxを使う

というわけで、非Rxでやると大変なのがよく分かりました。そこでRxの出番です。標準搭載されているので、参照設定を開きMicrosoft.Phone.ReactiveとSystem.Observableを加えるだけで準備完了。

var wc = new WebClient { AllowReadStreamBuffering = false };

Observable.FromEvent<OpenReadCompletedEventArgs>(wc, "OpenReadCompleted")
    .ObserveOn(Scheduler.ThreadPool) // ThreadPoolで動かすようにする
    .Select(e => XElement.Load(e.EventArgs.Result))
    .ObserveOnDispatcher() // UIスレッドに戻す
    .Subscribe(x => MessageBox.Show(x.ToString()));

wc.OpenReadAsync(new Uri("http://twitter.com/statuses/public_timeline.xml"));

非常にすっきり。Rxについて説明は、必要か否か若干悩むところですが説明しますと、イベントをLinq化します。今回はOpenReadCompletedイベントをLinqにしました。Linq化すると何が嬉しいって、ネストがなくなることです。非常に見やすい。更にRxの豊富なメソッド群を使えば普通ではやりにくいことがいとも簡単に出来ます。今回はObserveOnを使って、どのスレッドで実行するかを設定しました。驚くほど簡単に、分かりやすく。メソッドの流れそのままです。

FromAsyncPattern

WebClientだけじゃなく、ついでなのでHttpWebRequestでもやってみましょう。(HttpWebRequest)WebRequest.Create()死ね、といつも言ってる私ですが、SilverlightにはWebRequest.CreateHttpでHttpWebRequestが作れるじゃありませんか。何ともホッコリとします。微妙にこの辺、破綻した気がしますがむしろ見なかったことにしよう。

var req = WebRequest.CreateHttp("http://twitter.com/statuses/public_timeline.xml");
req.AllowReadStreamBuffering = false;
req.BeginGetResponse(ar =>
{
    using (var res = req.EndGetResponse(ar))
    using (var stream = res.GetResponseStream())
    {
        var x = XElement.Load(res.GetResponseStream());
        Dispatcher.BeginInvoke(() => MessageBox.Show(x.ToString()));
    }
}, null);

非同期しかないのでBeginXxx-EndXxxを使うのですが、まあ、結構面倒くさい。そこで、ここでもまたRxの出番。BeginXxx-EndXxx、つまりAPM(Asynchronus Programming Model:非同期プログラミングモデル)の形式の非同期メソッドをラップするFromAsyncPatternが使えます。

var req = HttpWebRequest.CreateHttp("http://twitter.com/statuses/public_timeline.xml");
req.AllowReadStreamBuffering = false;

Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse)
    .Invoke() // 非同期実行開始(Invoke()じゃなくて()でもOKです、ただのDelegateなので)
    .Select(res => XElement.Load(res.GetResponseStream()))
    .ObserveOnDispatcher()
    .Subscribe(x => MessageBox.Show(x.ToString()));

ラップは簡単で型として戻り値を指定してBeginXxxとEndXxxを渡すだけ。あとはそのまま流れるように書けてしまいます。普通だと面倒くさいはずのHttpWebRequestのほうがWebClientよりも素直に書けてしまう不思議!FromAsyncPatter、恐ろしい子。WebClient+FromEventは先にイベントを設定してURLで発動でしたが、こちらはURLを指定してから実行開始という、より「同期的」と同じように書ける感じがあって好き。WebClient使うのやめて、みんなHttpWebRequest使おうぜ!(ふつーのアプリのほうでは逆のこと言ってるのですががが)

ところで、非同期処理の実行開始タイミングはInvokeした瞬間であって、Subscribeした時ではありません。どーなってるかというと、ぶっちゃけRxは実行結果をキャッシュしてます。細かい話はまた後日ちゃんと紹介するときにでも。

バインドする

GUIはScottGu氏のサンプルを丸々頂いてしまいます。リロードボタンを押したらPublicTLを呼ぶだけ、みたいなのに簡略化してしまいました。

<Grid x:Name="LayoutRoot" Background="Transparent">
    <Grid.RowDefinitions>
        <RowDefinition Height="Auto"/>
        <RowDefinition Height="*"/>
    </Grid.RowDefinitions>

    <Button Grid.Row="0" Height="72" Width="200" Content="Reload" Name="Reload"></Button>
    <ListBox Grid.Row="1" Name="TweetList" DataContext="{Binding}">
        <ListBox.ItemTemplate>
            <DataTemplate>
                <StackPanel Orientation="Horizontal">
                    <Image Source="{Binding Image}" Height="73" Width="73" VerticalAlignment="Top" />
                    <StackPanel Width="350">
                        <TextBlock Text="{Binding Name}" Foreground="Red" />
                        <TextBlock Text="{Binding Text}" TextWrapping="Wrap" />
                    </StackPanel>
                </StackPanel>
            </DataTemplate>
        </ListBox.ItemTemplate>
    </ListBox>
</Grid>

あとは、ボタンへのイベント設定と、Twitterのクラスを作る必要があります。

public class TwitterStatus
{
    public long Id { get; set; }
    public string Text { get; set; }
    public string Name { get; set; }
    public string Image { get; set; }

    public TwitterStatus(XElement element)
    {
        Id = (long)element.Element("id");
        Text = (string)element.Element("text");
        Name = (string)element.Element("user").Element("screen_name");
        Image = (string)element.Element("user").Element("profile_image_url");
    }
}

public partial class MainPage : PhoneApplicationPage
{
    public MainPage()
    {
        InitializeComponent();
        Reload.Click += new RoutedEventHandler(Reload_Click); // XAMLに書いてもいいんですけど。
    }

    void Reload_Click(object sender, RoutedEventArgs e)
    {
        var req = HttpWebRequest.CreateHttp("http://twitter.com/statuses/public_timeline.xml");
        req.AllowReadStreamBuffering = false;

        Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse)
            .Invoke()
            .Select(res => XElement.Load(res.GetResponseStream()))
            .Select(x => x.Descendants("status").Select(xe => new TwitterStatus(xe)))
            .ObserveOnDispatcher()
            .Subscribe(ts => TweetList.ItemsSource = ts);
    }
}

実行するとこんな具合に表示されます。簡単ですねー。ただ、これだとリロードで20件しか表示されないので、リロードしたら継ぎ足されるように変更しましょう。

イベントを合成する

継ぎ足しの改善、のついでに、一定時間毎に更新も加えよう。基本は一定時間毎に更新だけど、リロードボタンしたら任意のタイミングでリロード。きっとよくあるパターン。Reload.Click+=でハンドラ足すのはやめて、その部分もFromEventでObservable化してしまいましょう。そして一定時間毎のイベント発動はObservable.Timerで。

// 30秒毎もしくはリロードボタンクリックでPublicTimeLineを更新
Observable.Merge(
        Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(30), Scheduler.NewThread).Select(_ => (object)_),
        Observable.FromEvent<RoutedEventArgs>(Reload, "Click").Select(_ => (object)_))
    .SelectMany(_ =>
    {
        var req = HttpWebRequest.CreateHttp("http://twitter.com/statuses/public_timeline.xml");
        req.AllowReadStreamBuffering = false;
        return Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse)();
    })
    .Select(res => XStreamingReader.Load(res.GetResponseStream()))
    .SelectMany(x => x
        .Descendants("status")
        .Select(xe => new TwitterStatus(xe))
        .Reverse()) // 古い順にする
    .Scan((before, current) => before.Id > current.Id ? before : current) // 最後に通した記事よりも古ければ通さない(で、同じ記事を返す)
    .DistinctUntilChanged(t => t.Id) // 同じ記事が連続して来た場合は何もしないでreturn
    .ObserveOnDispatcher()
    .Subscribe(t => TweetList.Items.Insert(0, t)); // Insertだって...

流れるようにメソッド足しまくるの楽しい!楽しすぎて色々足しすぎて悪ノリしている感が否めません、とほほ。解説しますと、まず一行目のMerge。これは複数本のイベントを一本に統一します。統一するためには型が同じでなければならないのですが、今回はTimer(long)と、Click(RoutedEventArgs)なのでそのままでは合成出来ません。どちらも発火タイミングが必要なだけでlongもRoutedEventArgsも不必要なため、Objectにキャストしてやって合流させました。

こういう場合、Linq to Objectsなら.Cast<object>()なんですよね。Castないんですか?というと、一応あるにはあるんですが、実質無いようなもので。というわけで、今のところキャストしたければ.Select(=>(object))を使うしかありません。多分。もっとマシなやり方がある場合は教えてください。

続いてSelectMany。TimerもしくはClickは発火のタイミングだけで、後ろに流すのはFromAsyncPatternのデータ。こういった、最初のイベントは発火タイミングにだけ使って、実際に流すものは他のイベントに摩り替える(例えばマウスクリックで発動させて、あとはマウスムーブを使うとか)というのは定型文に近い感じでよく使うことになるんじゃないかと思います。SelectMany大事。

XMLの読み込み部は、せっかくなので、こないだ作ったバッファに貯めこむことなくXmlを読み込めるXStreamingReaderを使います。こんな風に、XMLを読み取ってクラスに変換する程度ならXElement.Loadで丸々全体のツリーを作るのも勿体無い。XStreamingReaderなら完全ストリーミングでクラスに変換出来ますよー。という実例。

その下は更にもう一個SelectMany。こっちはLinq to Objectsのものと同じ意味で、IEnumerableを平たくしています。で、ScanしたDistinctUntilChangedして(解説が面倒になってきた)先頭にInsert(ちょっとダサい)。これで古いものから上に足される = 新しい順番に表示される、という形になりました。XAML側のListBoxを直に触ってInsertとか、明らかにダサい感じなのですが、まあ今回はただのサンプルなので見逃してください。

RxのMergeに関しては、後日他のイベント合流系メソッド(CombineLatest, Zip, And/Then/Plan/Join)と一緒に紹介したいと思っています。合流系大事。

まとめ

驚くほどSilverlightで開発簡単。っぽいような印象。C#書ける人ならすぐにとっかかれますねー。素晴らしい開発環境だと思います。そして私は同時に、Silverlight全然分かってないや、という現実を改めて突きつけられて参ってます。XAMLあんま書けない。Blend使えない。MVVM分からない。モバイル開発云々の前に、基本的な技量が全然欠けているということが良く分かったし、それはそれで良い収穫でした。この秋なのか冬なのかの発売までには、ある程度は技術を身につけておきたいところです。

そしてそれよりなにより開発機欲すぃです。エミュレータの起動も速いし悪くないのですが、やっぱ実機ですよ、実機!配ってくれぇー。

XStreamingReader - Linq to Xml形式でのストリーミングXml読み込み

CodePlex : XStreamingReader - Streaming for Linq to Xml

1クラスライブラリシリーズ。もしくはストリーミングをIEnumerableに変換していこうシリーズ。またはシンプルだけど小粒でピリッと隙間にぴったりはまるシリーズ(を、目指したい)。といったわけで、100行程度ではあるのですが、表題の機能を持つコードをCodePlexに公開しました。それとおまけとして、XMLファイルからC#クラス自動生成T4 Templateも同梱。

Linq to Xml風にXmlを読み込めるけれど、ツリーを構築せずストリームで、完全遅延評価で実行します。Linq to Xmlには、書き込み用にXStreamingElementというものがあるため、それと対比した読み込み用という位置付けのつもりです。メモリの厳しいモバイル機器や、巨大なXMLを読み込む際に使えるんじゃないかと思っています。

利用例

ぶっちゃけまるっきりXElementと同じです。例としてYahoo!天気情報のRSSから京都と東京を取り出し。

// XElement
var kyoto = XElement.Load(@"http://rss.weather.yahoo.co.jp/rss/days/6100.xml")
    .Descendants("item")
    .Select(x => new
    {
        Title = (string)x.Element("title"),
        Description = (string)x.Element("description"),
        PubDate = (DateTime)x.Element("pubDate")
    })
    .Where(a => !a.Title.StartsWith("[PR]")) // itemが広告の場合は除外
    .ToArray();

// XStreamingReader
var tokyo = XStreamingReader.Load(@"http://rss.weather.yahoo.co.jp/rss/days/4410.xml")
    .Descendants("item")
    .Select(x => new
    {
        Title = (string)x.Element("title"),
        Description = (string)x.Element("description"),
        PubDate = (DateTime)x.Element("pubDate")
    })
    .Where(a => !a.Title.StartsWith("[PR]")) // itemが広告の場合は除外
    .ToArray();

Load/Parseで生成し、ElementsやDescendantsで抽出。あとは、IEnumerable<XElement>となっているので、SelectしたりWhereしたり。完全にLinq to Xmlと同じAPIです。同じすぎてこれだけだと利点がさっぱり見えませんが、100%遅延評価+ストリーミング読み込みで逐次生成という違いがあります。詳しくは次のセクションで。

バックグラウンド

Androidでは性能のためにDOMじゃなくてSAXでXML扱うんだ。という話を良く聞いて、確かにただデータ取るためだけにDOM構築ってのは嫌だし、そりゃ避けたい。対象がDOMなら素直にそう思いますが、しかし、もしそれがLinq to Xmlならどうだろう?Windows Phone 7だったらLinq to Xml使うに決まってるよ、と言いたいのですが、これってDOMと同じく、すぐに(LoadなりParseなりした直後)ツリーを構築しています。Elements()なりDescendants()なりの戻り値がIEnumerableなため、遅延評価かと思ってしまうわけですが、遅延評価されるのはツリーの探索が、というだけであって、構築自体は即時でされています。

DOMに比べて軽量(という謳い文句)であることと、非常に軽々と書けるため抵抗感がないわけですが、考えてみれば Load.Descendants.Select みたいな、API叩いて何らかのクラスなり匿名型なりに変換するという程度の、しかしよくある定型作業は、わざわざツリー作る必要はなくストリーミングで取れるし、それならばストリーミングで取るべきではある。しかし、今時XmlReaderを直で触るなんて、時代への逆行のようなことはやりたくない。

ストリームはIEnumerableに変換するのがLinq以降のC#の常識。というのを日々連呼しているので、今回はXmlReaderをIEnumerable<T>に変換しなければなりません。しかし、困ったのが、<T>のTを何にすればいいのか、ということ。ファイル読み込みなら一行のString。データベースなら、IDataRecord(DbExecutorというライブラリとしてCodePlexに公開しています)を用いましたが、XmlReaderだと適当なのが見当たらない。XmlReaderを直接渡すのは危なっかしいし、そもそも渡したところで面倒くさいことにかわりなくてダメだ。何か適切なコンテナが……。

と、考えたり考えなかったりで、Twitterでもにょもにょと言っていたら

@neuecc Linq to Xml を使うにしても XmlReaderからReadSubtreeで切り出した断片に対してかなー、XML全体をオンメモリさせる必然性がなければStreamから読んで処理した端からGCに捨てて貰えるようにしておきたいだけだけど
http://twitter.com/kazuk/status/18193188205

うぉ!うぉぉぉぉぉ!なるほど、断片をXElementに変換してそれを渡せば、操作しやすいし感覚的にもXElement.Loadなどと変わらないしでベストだ!言われてみればそりゃそうだよねー、ですが全然頭になかった、まさにコロンブスの卵。こういうことがサラッと出てくることこそが、凄さだよね。

と、感嘆しつつ、それそのまま頂き、というわけで、TをXElementにするという形で解決しました。

public IEnumerable<XElement> Descendants(XName name)
{
    using (var reader = readerFactory())
    {
        while (reader.ReadToFollowing(name.LocalName, name.NamespaceName))
        {
            yield return XElement.Load(reader.ReadSubtree());
        }
    }
}

Descendantsの実装はこんな感じで、断片から都度XElement生成しているという、それだけの単純明快な代物です。そのため挙動はXElement.Load.Descendantsと完全同一というわけじゃありません。例えばサブツリー中に同名の要素がある場合、XElementでDescendantsの場合はサブツリー中の要素も列挙しますが、XStreamingReaderではトップ要素のものだけが拾われます。

他に注意点としては、それぞれのXElementは完全に独立しているため、ParentやNextNodeなどは全てnullです。よってAncestorsで先祖と比較しながらの複雑な抽出、などといったことも出来ません。TwitterのAPIのような、ウェブサービスとして用意されているXMLなら素直な構造なので問題はありませんが、SGMLReaderでLinq to HTMLなどといった場合は、結構複雑なクエリで抽出することになるため使えないでしょう。その場合は素直にXElement.Loadを使うのが良いと思います。

おまけ(Xml→自動クラス生成)

Xmlから人力でClass作るのって定型作業で面倒だよねー。ということで、自動生成するT4 Templateも同梱しました。プロパティ定義だけではなく(ちゃんとPascalCaseに直します)、コンストラクタにXElementを投げるとマッピングもしてくれます。つまりは、XStreamingReaderの仕様に合わせたものです。

.ttの上の方にある3つの項目を適当に書き換えると

string XmlString = new WebClient().DownloadString("http://twitter.com/statuses/public_timeline.xml");
const string DescendantsName = "status"; // select class root
const string Namespace = "Twitter"; // namespace
namespace Twitter
{
    public class Status
    {
        public string CreatedAt { get; set; }
        public string Id { get; set; }
        // snip...
        public User User { get; set; }
        public string Geo { get; set; }

        public Status(XElement element)
        {
            this.CreatedAt = (string)element.Element("created_at");
            this.Id = (string)element.Element("id");
            this.User = new User(element.Element("user"));
            this.Geo = (string)element.Element("geo");
        }
    }
    
    public class User
    {
        public string Id { get; set; }
        public string Name { get; set; }
        public string ScreenName { get; set; }
        // snip...
        public string FollowRequestSent { get; set; }

        public User(XElement element)
        {
            this.Id = (string)element.Element("id");
            this.Name = (string)element.Element("name");
            this.ScreenName = (string)element.Element("screen_name");
            this.FollowRequestSent = (string)element.Element("follow_request_sent");
        }
    }
}

こんなのが生成されます。型は全部stringになるので、手動で直してください。半自動生成。T4で生成→新しいクラスファイル作って生成結果をコピペ→型を直す。みたいな使い方をイメージしています。完全自動生成じゃないと変更に対する自動追随ってのが出来ないので、自動生成する意味が半減。しかし、型かあ、スキーマないと無理ですな。まあ、ウェブサービスのAPIなどは基本的には固定で変化がないでしょうから、ある程度は手間を省けるんじゃないかと思われます。

まとめ

断片とはいえ、XElement作るのは無駄じゃないの?というと、無駄ではあります。抽出したらすぐ用済みでポイなわけなので、純粋にパフォーマンスの観点から言えばXmlReaderを直で触ったほうが良いに決まっています。しかし、さすがにそこまで来ると無視して良いと思うわけです。例えばLinqで一時的な匿名型は使わないって?ああ、むしろLinqなんてやめて全部forループにでもします?言いだいたらキリがない。

今回で大事なのは、ストリーミング化しても、決して使いやすさは損なわれていないということです。ツリー構築型と全く同じように快適に書ける。それが何より大切。「性能のために書きやすさが犠牲になるぐらいなら性能なんていらない!」と、現実は言えなくても心では言ってしまいます。ユーザー視点だと逆ですが……。ただ、中長期的には、スパゲティコードは開発者を幸せにしない→機能追加速度低下/洗練が鈍る→ユーザーも不幸せになる、のループが回るので綺麗さは重要。勿論、そこが性能上本当にボトルネックになっているならば気合入れて叩く必要がありますが、気分的に、もしくはマイクロベンチマーク的にちょっと性能Downな程度でパフォーマンスチューニングとか言い出すのならシバいてよし。

といったわけかで、私なりにWindows Phone 7プログラミングへの準備を進めています。これで、準備になってる?……だと?ご冗談を。ですね、はい、すみません。開発キットのベータ版が出たので、次回はWindows Phone 7で何か作ろう紹介でも書く予定は未定。

IEnumerableのCastを抹殺するためのT4 Templateの使い方

.NET Framework 1.0の負の遺産、HogeCollection。それらの大半はIEnumerable<T>を実装していない、つまるところ一々Cast<T>しなければLinqが使えない。ほんとどうしょうもない。大抵のHogeCollectionは実質Obsoleteみたいなもので、滅多に使わないのだけれど、ただ一つ、RegexのMatchCollectionだけは頻繁に使うわけで、Castにイラつかされるので殺害したい。RegexにはMatchCollection、GroupCollection、CaptureCollectionという恐怖の連鎖が待っているので余計に殺したい。(ところで全く本題とは関係ないのですが、Captureは今ひとつ使い道がわからな……)

// わざとらしい例ですが
var q = Regex.Matches("aag0 hag5 zag2", @"(.)ag(\d)")
    .Cast<Match>()
    .SelectMany(m => m.Groups.Cast<Group>().Skip(1).Select(g => g.Value))
    .ToArray(); // a0h5z2

おお、何というCast地獄!つーか.NET 4でBCL書き直したとか言うんなら、その辺も少し融通聞かせてIEnumerable<T>にしてくれてもさー。あ、要望出さないのが悪いとかなのでしょうか……。それなら自己責任ですね、ちゃんと出していかないと。なのはともかく、自己責任ならば自己責任なりに、文句だけ言っててもしょうがないので自前で何とかしましょう。

ようするに.Cast<Hoge>()を自動で挟めばいいわけですよね。んー、ぴこーん!T4でジェネレートすればいいんじゃね?というわけで、T4 Templateを使ってみました。実際のところT4試してみたかったんだけどネタがなかったので、ネタが出てきて万歳!が本音だったりはします。

何もないところからテンプレートじゃあ作りようもないので、ひとまず完成系を書いてみる。

public static class MatchCollectionExtensions
{
    public static IEnumerable<TResult> Select<TResult>(this MatchCollection source, Func<Match, TResult> selector)
    {
        return source.Cast<Match>().Select(selector);
    }
    
    // Where, Aggregate, ....
}

こんな形。グッとイメージしやすくなります。型引数のTSourceを消して、Castを挟んで……。やるべき事が大体見えてきました。まずは、Enumerableの拡張メソッドの抽出を。

var extMethods = typeof(Enumerable)
    .GetMethods()
    .Where(mi => Attribute.IsDefined(mi, typeof(ExtensionAttribute)));

特にBindingFlagsは設定しませんが、ExtensionAttributeが指定されているものがあれば拡張メソッド、という判定で問題なく取り出すことが出来ます。続いて戻り値を抽出。

var returnType = extMethods
    .Select(mi => mi.ReturnType)
    .Select(mi => Regex.Replace(mi.Name, "`.*$", "")
        + (mi.IsGenericType ? ("<" + string.Join(", ", mi.GetGenericArguments().Select(t => t.Name)) + ">") : ""));

IEnumerable<T>のNameはIEnumerable1になっているので1を正規表現で削除。そして引数を並べる。ただまあ、これだけだとジェネリック引数がネストしたものに対応出来ていなかったりTSourceが除去できてなかったりダメなのですが、それはそれ(最終的なコードは下記の実例のほうを見てください)。

といったわけで、相変わらずリフレクション+Linqは鉄板ですね。というかLinqなしのリフレクションとかやりたくない……。こんな感じにポチポチと素材集めをしたら、T4化します。

<#@ template language="C#" #>
<#@ output extension="cs" #>
<#@ assembly Name="System.Core.dll" #>
<#@ import namespace="System" #>
<#@ import namespace="System.Collections.Generic" #>
<#@ import namespace="System.Linq" #>
<#@ import namespace="System.Text" #>
<#@ import namespace="System.Runtime.CompilerServices" #>
<#@ import namespace="System.Text.RegularExpressions" #>
<#@ import namespace="System.Reflection" #>
<#
    var target = new Dictionary<string, string>
    {
        {"MatchCollection", "Match"},
        {"GroupCollection", "Group"},
        {"CaptureCollection", "Capture"}
    };
#>
<#
    var ignoreMethods = new HashSet<string>
    {
        "Max", "Min", "Average", "Sum", "Zip", "OfType", "Cast",
        "Join", "GroupJoin", "ThenBy", "ThenByDescending", "LongCount"
    };
#>
using System;
using System.Collections.Generic;
using System.Linq;

namespace System.Text.RegularExpressions
{
<#
foreach (var kvp in target)
{
#>
    public static class <#= kvp.Key.Replace(".","") #>Extensions
    {
<#
foreach (var methodInfo in typeof(Enumerable).GetMethods().Where(mi => Attribute.IsDefined(mi, typeof(ExtensionAttribute))))
{
    if(ignoreMethods.Contains(methodInfo.Name)) continue;
#>
        public static <#= MakeReturnType(methodInfo, kvp.Value) #> <#= methodInfo.Name #><#= MakeGenericArguments(methodInfo) #>(this <#= kvp.Key #> source<#= MakeParameters(methodInfo, kvp.Value) #>)
        {
            return source.Cast<<#= kvp.Value #>>().<#= MakeMethodBody(methodInfo) #>;
        }

<#}#>
    }
<#}#>
}
<#+
    const string TSource = "TSource";

    static string ConstructTypeString(Type type, string castType)
    {
        var result = type.Name.Contains(TSource)
            ? type.Name.Replace(TSource, castType)
            : Regex.Replace(type.Name, "`.*$", "");
        
        if (type.IsGenericType)
        {
            result += string.Format("<{0}>", string.Join(", ", type.GetGenericArguments().Select(t => ConstructTypeString(t, castType))));
        }
        return result;
    }
    
    static string MakeReturnType(MethodInfo info, string castType)
    {
        return ConstructTypeString(info.ReturnType, castType);
    }
    
    static string MakeGenericArguments(MethodInfo info)
    {
        var types = info.GetGenericArguments().Select(t => t.Name).Where(s => s != TSource);
        return types.Any() ? string.Format("<{0}>", string.Join(", ", types)) : "";
    }
    
    static string MakeParameters(MethodInfo info, string castType)
    {
        var param = info.GetParameters()
            .Skip(1)
            .Select(pi => new { pi.Name, ParameterType = ConstructTypeString(pi.ParameterType, castType) });
        
        return param.Any()
            ? ", " + string.Join(", ", param.Select(a => a.ParameterType + " " + a.Name))
            : "";
    }
    
    static string MakeMethodBody(MethodInfo info)
    {
        var args = info.GetParameters().Skip(1).Select(pi => pi.Name);
        return string.Format("{0}({1})", info.Name, args.Any() ? string.Join(", ", args) : "");
    }
#>

上のほうの、ディクショナリ(target)の初期化子を弄ることで対象の型を増減できます。namespaceはテンプレートに埋め込みなので変える場合は適当に変えてください。ハッシュセット(ignoreMethods)はその名の通り、除外したい拡張メソッドを指定します。今回はMax,Minなどと、Zip,Join,GroupJoin(これらは若干弄らないと対応出来ないので見送り)を除外しています。あとLongCountも外してます、理由はRxのSystem.InteractiveがLongCountで競合するから(多分、Rxチームのミスだと思うのでそのうち直ると思います)。

どんなクラスにも対応出来る(はず)ので、もしキャストが必要なウザいHogeCollectionがあったら、このテンプレートを使ってみると良いかもしれません。WinFormsのControl.ControlCollectionとかWPFのUIElementCollectionとか(そういうのは、元よりごった煮で詰め込むの前提なので、UIElementでSelect出来ても嬉しくはないかなー)。ともあれ、利用はご自由にどうぞ。

こんな感じに、MatchCollection, GroupCollection, CaptureCollectionだと合計1100行ぐらいのコードが生成されます。これで、CastいらずにLinqが書けるようになりました。メデタシメデタシ。

T4 Template

T4 Templateはかなり良いですね。VisualStudioと密接に動作して、生成出来ないようならエラーですぐ知らせてくれるのが嬉しい。これ大事。超大事。それがないと書けません。C#もそうだけれど、とりあえず書く→コンパイラエラー→直す、をリアルタイムで繰り返せるのは素晴らしい。現代のプログラミング環境はこうでないと、な良さに溢れてます。アドインを入れれば入力補完やシンタックスハイライトも付いてくるので非常に快適。

T4 Templateは標準搭載の機能だし実に強力なので、積極的に使っていきたいものです。MSDNだとコード生成とテキスト テンプレート辺りかな。例によって、読んでもさっぱり意味がわかりません(笑) 今のところオフィシャルだとこんなドキュメントしかないのかなあ、少し厳しめ。いやまあ、T4自体は構文がシンプルなので、ただ書くだけならサンプル改変で何とかなる、というか、私もサンプル改変以上の機能は知らないのですががが。

Rx(Reactive Extensions)を自前簡易再実装する

という表題でUstreamやりました。Reactive Extensions for .NET (Rx)のSelect, Where, ToObservable, FromEventを実装することで、挙動を知ろうという企画。結果?酷いものです!

Shift+Alt+F10はお友達。それにしたってぐだぐだ。想像以上に頭が真っ白。セッションやライブコーディングしてる人は凄いね、と実感する。プレゼンどころか人と話すのも苦手です、な私には敷居が高かった。とにかく説明ができない。デバッガで動かせば分かりやすいよねー、なんてやる前は思ってたんですが、人がデバッガ動かしてるの見ててもさっぱり分かりやすくないよ!ということに途中で気づいて青ざめる。

まあ、こういうのも経験積まないとダメよね、と考えると、リスクゼロ(見てくれた人には申し訳ないですが)で練習出来るので、これからもネタがあればやっていきたいとは思います。反省は活かして。ネタはあまりないのでリクエストあればお願いします。Ustreamの高画質配信については、去年に書いた高画質配信するためのまとめ記事が自分で役に立ったぜ、経験が活きたな、的な。私自身の環境はちょっと、というかかなり変わったのですが、配信の基本的部分に関しては今も昔も(といっても1年前か)変わってなかったね。

さて、そんなUstreamはともかくとして、Rxの基本的な拡張メソッド「Select, Where」と、基本的な生成メソッド「ToObservable, FromEvent」を自前で実装してみる/デバッガで追ってみましょう。自分の手で動かして追うと理解しやすくなります。なので、以下に出すソースはコピペでもいいので、実際にVisualStudio上で動かしてもらえればと思います。

IEnumerableで考える

IObservableの拡張メソッド実装、の前に復習を兼ねてIEnumerableの拡張メソッドを実装してみましょう。

public static IEnumerable<TR> Select<T, TR>(IEnumerable<T> source, Func<T, TR> selector)
{
    foreach (var item in source)
    {
        yield return selector(item);
    }
}

恐ろしく簡単です。こんなにも簡単に書けるのは、yield returnのお陰。裏では、コンパイラが自動で対応するIEnumerable, IEnumeratorを生成してくれます。もしこれを教科書通りに自前で書くとしたら

public static IEnumerable<TR> Select<T, TR>(IEnumerable<T> source, Func<T, TR> selector)
{
    return new SelectEnumerable<TR>(); // 本当は引数も必要ですが省略
}

class SelectEnumerable<T> : IEnumerable<T>
{
    public IEnumerator<T> GetEnumerator()
    {
        return new SelectEnumerator<T>();
    }
    // 以下略
    // IEnumerator IEnumerable.GetEnumerator()
}

class SelectEnumerator<T> : IEnumerator<T>
{
    // Current, Dispose, MoveNextが必要ですが略
}

ああ、長い。やってられない。こんなものがオブジェクト指向だなどと言うならば、クソったれだと唾を吐きたくなる。そこで、AnonymousHogeパターンを用いれば……

public static IEnumerable<TR> Select<T, TR>(this IEnumerable<T> source, Func<T, TR> selector)
{
    return new AnonymousEnumerable<TR>(() =>
    {
        var enumerator = source.GetEnumerator();
        return new AnonymousEnumerator<TR>(
            () => enumerator.MoveNext(),
            () => selector(enumerator.Current),
            () => enumerator.Dispose()
        );
    });
}

驚くほどスッキリ。デザインパターンの本はC#でラムダ式全開でやり直すと、考え方はともかく、コードは全然違った内容になるんじゃないかなあ、とか思いつつ。この突然出てきたAnonymousEnumerableに関しては.NET Reactive Framework メソッド探訪第二回:AnonymousEnumerableを参照にどうぞ。去年の9月ですか……。AnonymousObservableも紹介する、といって10ヶ月後にようやく果たせている辺りが、やるやる詐欺すぎて本当にごめんなさい。

簡単に説明すれば、コンストラクタにラムダ式で各メソッドの本体を与えてあげることで、その場でクラスを作ることが出来るという代物です。クロージャによる変数キャプチャにより、引数を渡し回す必要もないため非常にすっきり書く事ができます。

これってようするにJavaの無名クラスでしょ?と言うと、その通り。おお、Java、大勝利。なんてこたぁーない。大は小を兼ねない、むしろこれは、小は大を兼ねる事の証明。

AnonymousObservable

IObservableはIEnumerableのようなコンパイラサポートはないので、自前で書かなければなりません。が、普通に書くと面倒なので、AnonymousObservableを使って書くことにしましょう。

public class AnonymousObservable<T> : IObservable<T>
{
    Func<IObserver<T>, IDisposable> subscribe;

    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        this.subscribe = subscribe;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return subscribe(observer);
    }
}

public class AnonymousObserver<T> : IObserver<T>
{
    Action<T> onNext;
    Action<Exception> onError;
    Action onCompleted;

    public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        this.onNext = onNext;
        this.onError = onError;
        this.onCompleted = onCompleted;
    }

    public void OnCompleted()
    {
        onCompleted();
    }

    public void OnError(Exception error)
    {
        onError(error);
    }

    public void OnNext(T value)
    {
        onNext(value);
    }
}

public class AnonymousDisposable : IDisposable
{
    Action dispose;
    bool isDisposed = false;

    public AnonymousDisposable(Action dispose)
    {
        this.dispose = dispose;
    }

    public void Dispose()
    {
        if (!isDisposed)
        {
            isDisposed = true;
            dispose();
        }
    }
}

そのまま書き出すだけなので、難しいことは何一つありませんが、面倒くさい……。なお、今回はRx抜きでの実装のためこうして自前で定義していますが、RxにはObservable.Create/CreateWithDisposable、Observer.Create、Disposable.Createというメソッドが用意されていて、それらは今回定義したAnonymousHogeと同一です。new ではなくCreateメソッドで生成するため型推論が効くのが嬉しい。

Observable.Select/Where

下準備が済んだので実装していきましょう。まずはSelect。

public static IObservable<R> Select<T, TR>(this IObservable<T> source, Func<T, TR> selector)
{
    return new AnonymousObservable<TR>(observer => source.Subscribe(
        new AnonymousObserver<T>(
            t => observer.OnNext(selector(t)),
            observer.OnError,
            observer.OnCompleted)));
}

Enumerableと似ているようで非常に分かりにくい。AnonymousObservableの引数のラムダ式は、Subscribeされた時に実行されるもの。というわけで、突然出てきているかのような引数のobserverは、Subscribeによって一つ後ろのメソッドチェーンから渡されるものとなります。

Observable.Range(1, 10) // これがsource
    .Select(i => i * i)
    .Subscribe(i => Console.WriteLine(i)); // これがobserver

こんな前後関係の図式になっています。ドットの一つ前のメソッドがsource、一つ後ろのメソッドがobserver。 最終的な目的としては元ソースからOnNext->OnNext->OnNextと値を伝搬させる必要があるわけですが、元ソースは末端どころか次に渡す先すら知りません。そのため、まず最初(Subscribeされた時)にsource.Subscribeの連鎖で元ソースまで遡ってやる必要がある、というわけです。非常に説明しづらいのでデバッガで追ってみてください。

public static IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> predicate)
{
    return new AnonymousObservable<T>(observer => source.Subscribe(
        new AnonymousObserver<T>(
            t => { if (predicate(t)) observer.OnNext(t); },
            observer.OnError,
            observer.OnCompleted)));
}

WhereはSelectのOnNext部分が違うだけのもの。コピペ量産体制。

ToObservable

Selectなどと同じくreturn new AnonymousObservableですが、もうSubscribeはしません(そもそもIObservable sourceがないので出来ないですが)。ここからは、末端から伝達されてきたobserverに対して値をPushしてやります。

public static IObservable<T> ToObservable<T>(this IEnumerable<T> source)
{
    return new AnonymousObservable<T>(observer =>
    {
        var isErrorOccured = false;
        try
        {
            foreach (var item in source)
            {
                observer.OnNext(item);
            }
        }
        catch (Exception e)
        {
            isErrorOccured = true;
            observer.OnError(e);
        }
        if (!isErrorOccured) observer.OnCompleted();

        return new AnonymousDisposable(() => { });
    });
}

Subscribeされると即座にforeachが回ってOnNext呼びまくる。ToObservableはHot or ColdのうちColdで、Subscribeされるとすぐに値が列挙されるわけです。Coldってのは、なんてことはなく、ようはすぐforeachされるからってだけの話でした。

戻り値のIDisposableは、FromEventではイベントのデタッチなどの処理がありますが、ToObservableでは何もする必要がないので何も無し。

FromEvent徹底解剖

Coldだけでは、別にEnumerbaleと全然変わらなくて全く面白くないので、Hot Observableも見てみます。Hotの代表格はFromEvent。そんなFromEventには4つのオーバーロードがあります。せっかくなので、細かく徹底的に見てみましょう。

public class EventSample
{
    public event EventHandler BlankEH;
    public event EventHandler<SampleEventArgs> GenericEH;
    public event SampleEventHandler SampleEH;
}

public class SampleEventArgs : EventArgs { }
public delegate void SampleEventHandler(object sender, SampleEventArgs e);

static void Main(string[] args)
{
    var sample = new EventSample();
    // 1. EventHandlerに対応するもの
    Observable.FromEvent(
        h => sample.BlankEH += h, h => sample.BlankEH -= h);
    // 2. EventHandler<EventArgs>に対応するもの
    Observable.FromEvent<SampleEventArgs>(
        h => sample.GenericEH += h, h => sample.GenericEH -= h);
    // 3. 独自EventHandlerに対応するもの
    Observable.FromEvent<SampleEventHandler, SampleEventArgs>(
        h => new SampleEventHandler(h),
        h => sample.SampleEH += h, h => sample.SampleEH -= h);
    // 4. リフレクション
    Observable.FromEvent<SampleEventArgs>(sample, "GenericEH");
    Observable.FromEvent<SampleEventArgs>(sample, "SampleEH");
}

FromEventと言ったら文字列で渡して―― という感じだったりですが、むしろそれのほうが例外的なショートカットで、基本はeventをadd/removeする関数を渡します。3つもありますが、基本的には三番目、conversionが必要なものが最も多く出番があるでしょうか。ただのEventHandlerなんて普通は使わないし、ジェネリクスのEventHandlerもほとんど見かけないしで、どうせみんな独自のEventHandlerなんでしょ、みたいな。もしEventHandler<T>で統一されていれば、こんな面倒くさいconversionなんて必要なかったのに!もしくは、みんなAction<object, TEventArgs>で良かった。名前付きデリゲートの氾濫の弊害がこんなところにも……。

実際のとこ文字列渡しで良いよねー、と思います。リフレクションのコストはどうせ最初の一回だけだし。リファクタリング効かないといっても、別にイベントの名前なんて変更しないっしょっていうか、フレームワークに用意されてるイベントは固定だし、って話ですし。

FromEventの作成

そんなわけで、今回は3引数のFromEventを作ります。FromEventの戻り値はIEventなので、IEventの定義も一緒に。

public interface IEvent<TEventArgs> where TEventArgs : EventArgs
{
    object Sender { get; }
    TEventArgs EventArgs { get; }
}

public class AnonymousEvent<TEventArgs> : IEvent<TEventArgs> where TEventArgs : EventArgs
{
    readonly object sender;
    readonly TEventArgs eventArgs;

    public AnonymousEvent(object sender, TEventArgs eventArgs)
    {
        this.sender = sender;
        this.eventArgs = eventArgs;
    }

    public object Sender
    {
        get { return sender; }
    }

    public TEventArgs EventArgs
    {
        get { return eventArgs; }
    }
}

public static IObservable<IEvent<TEventArgs>> FromEvent<TDelegate, TEventArgs>(
    Func<EventHandler<TEventArgs>, TDelegate> conversion,
    Action<TDelegate> addHandler,
    Action<TDelegate> removeHandler) where TEventArgs : EventArgs
{
    return new AnonymousObservable<IEvent<TEventArgs>>(observer =>
    {
        var handler = conversion((sender, e) =>
        {
            observer.OnNext(new AnonymousEvent<TEventArgs>(sender, e));
        });
        addHandler(handler);
        return new AnonymousDisposable(() => removeHandler(handler));
    });
}

感覚的にはToObservableの時と一緒。Subscribeされたら実行される関数を書く。Subscribe時に実際に実行されるのはaddHandlerだけ。つまりイベント登録。そしてイベントが発火した場合は、conversionのところのラムダ式に書いたものが呼び出される、つまり次のobserverに対してOnNextでIEventを送る。そして、DisposeされたらremoveHandlerの実行。

これが、Hotなわけですね。つまりSubscribeだけではOnNextが呼ばれず、もう一段階、奥から実行される。

// 実行例としてObservableCollectionなどを用意。
var collection = new ObservableCollection<int>();

var collectionChanged = Observable.FromEvent<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
            h => new NotifyCollectionChangedEventHandler(h),
            h => collection.CollectionChanged += h,
            h => collection.CollectionChanged -= h)
        .Select(e => (int)e.EventArgs.NewItems[0]);

// attach
collectionChanged.Subscribe(new AnonymousObserver<int>(i => Console.WriteLine(i), e => { }, () => { }));
collectionChanged.Subscribe(new AnonymousObserver<int>(i => Console.WriteLine(i * i), e => { }, () => { }));

collection.Add(100); // 100, 10000
collection.Add(200); // 200, 40000

利用時は大体こんな感じになります。いたって普通。

まとめ

というわけで実装を見ていきましたが、意外と簡単です。リフレクタでToObservable見たけどこんな簡単じゃなかったぞ!と言われると、そうですね、実際のRxはScheduler(カレントスレッドで実行するかスレッドプールで実行するか、などなどが選べる)が絡むので実装はもう少し、というかもうかなり複雑です。だからこそ惑わされてしまうというわけで、基本的な骨格部分にのみ絞ってみれば十二分にシンプル、というのを掴むのが肝要じゃないかと思います。

次回は前回予告の通りに、後回しにしちゃってるけれど結合周りを紹介できればいいなあ。あと、FromAsyncか、Timer周辺か、Schedulerか……。RxJSもちゃんと例を出したいし、例を出したいといえば、そう、メソッド紹介だけじゃなく実例も出していきたいなあ、だし。うーん。まあ、ボチボチとやっていきます。最近ほんとRxの知名度・注目度が高まってるような気がしてます。ぐぐる検索で私のへっぽこ記事が上位に出てしまうという現状なので、申し訳ない、じゃなくて、それ相応の責任を果たすという方向で頑張りたいと思います。つまりは記事をちゃんと充実させよう。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive