Archive - Rx

Immutable CollectionsとSubject(Rx)の高速化について

最近はUniRxというUnity向けのReactive Extensionsの実装を書いているので、そこにImmutableなCollectionのちょーどよく分かりやすい使い道の実例があるので紹介しようかと思います。Rx自体はImmutable Collections使ってるわけではありませんが、同様の(簡易的)実装を内部で持っています。UniRxも同様に簡易実装を中で持つ形です。

Immutable Collectionsを知らにゃい?詳しくはNET Framework Blog - Immutable collections ready for prime timeを。または、以前に私がセッションで発表した資料もありますので、それも見てください。neue cc - .NETのコレクション概要とImmutable Collectionsについて。1.0リリースからもベータ版のリリースは続いていて、今回はそのベータのほうを使います(ダウンロードはNuGetでプリリリースのものを有効にするだけです)。何故かと言うと、今回使うImmutableArrayはベータのほうにしか入っていないからです。

素朴なSubject

最も素朴なSubjectを作ってみましょう。SubjectはEventのRx的な表現で+=とInvokeが出来るもの、とでも思ってもらえれば。

public class MySubject<T> : IObservable<T>, IObserver<T>
{
    List<IObserver<T>> observers = new List<IObserver<T>>();
 
    // Subscribeするとリストに貯めて
    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return null; // 本来は戻り値をDisposeするとRemoveだけど省略
    }
 
    // OnNextで配信
    public void OnNext(T value)
    {
        foreach (var item in observers)
        {
            item.OnNext(value);
        }
    }
 
    // OnErrorとOnCompletedは中略
}

こんなもんですね、簡単簡単。実際使う場合は

// とりあえずこういうの用意しとかないとメンドーなので。
// Rxを参照してるならSubescribe(x => { })でいいよ!
public class ActionObserver<T> : IObserver<T>
{
    readonly Action<T> onNext;
 
    public ActionObserver(Action<T> onNext)
    {
        this.onNext = onNext;
    }
 
    public void OnNext(T value)
    {
        onNext(value);
    }
 
    // OnErrorとOnCompletedは中略
}
 
// で、こんなかんぢ
var subject = new MySubject<int>();
 
subject.Subscribe(new ActionObserver<int>(x => Console.WriteLine(x)));
subject.Subscribe(new ActionObserver<int>(x => Console.WriteLine(x * 2)));
 
subject.OnNext(500); // 500, 1000

概ね見たまんまな単純な話ですねー、さて、この実装は素朴すぎるので簡単に死にます。マルチスレッドで、とかそういうことじゃなく、例えば……

// 呼ばれるとイベント登録しに走るような場合
subject.Subscribe(new ActionObserver<int>(x => subject.Subscribe(new ActionObserver<int>(_ => Console.WriteLine(x)))));
 
//ハンドルされていない例外: System.InvalidOperationException: コレクションが変更されました。列挙操作は実行されない可能性があります。
subject.OnNext(10000);

foreachの最中にList本体にAddやRemoveといった操作は許可されていないのですねー。そんなのしねーよ、と突っぱねることはRxの使い方の場合は実際できないので、対処が必要です。一番簡単なのはまるっとコピーすること。

// MySubject<T>.OnNext
public void OnNext(T value)
{
    foreach (var item in observers.ToArray()) // 列挙はコピー
    {
        item.OnNext(value);
    }
}

こういう対処はLINQ to XMLのドキュメント宣言型コードと命令型コードの混在のバグ (LINQ to XML)でも薦められている、特別でもない一般的なテクニックということで、場合によっては普通に使っても構わない話だと思います。スレッドセーフにするのもlock仕込むだけ。

public IDisposable Subscribe(IObserver<T> observer)
{
    lock (observers)
    {
        observers.Add(observer);
    }
    return null;
}
 
public void OnNext(T value)
{
    IObserver<T>[] array;
    lock (observers)
    {
        array = observers.ToArray();
    }
    foreach (var item in array)
    {
        item.OnNext(value);
    }
}

高速化する

素朴な実装の問題は、まぁパフォーマンス。コピーだから一概に悪いとは言わなくて、場合によっては全然普通に使って構わないというのは頭に入れて欲しいのですけれど、さすがにOnNextのような、イベントが叩かれるような、頻度の高いもので毎回コピーが走るのは些か厳しい。じゃあどうしよう?そうだConcurrent Collectionだ!ふむ……。でもConcurrentQueueとかだと(今回省いてますが)Removeするのがむつかしい。ConcurrentDictionaryで代替だ!でも列挙の具合が不透明(並列コレクションの列挙の挙動は結構色々なのでそれなりに注意が必要です)、パフォーマンス的にもただのforeachよりは劣るよねえ、せっかくやるならエクストリームな性能を追い求めたい気もする。

と、そこで出てくるのが(?)Immutable Collections、の、ImmutableArray。

注意しなきゃいけないのは別にImmutable Collections使ったからって必ずしも早いとかってわけじゃないです。むしろ多くの場合でImmutable Collectionsは不適でしょう。コレクションには特性があって、それにうまく合致しなければむしろ遅いです。今回のシチュエーションではImmutableArrayが割と最適にハマります(同じAPIを持ったリスト的なものにImmutableListがありますが、今回だとArrayのほうが良い)。とりあえず見てみましょう、か。

class MySubject<T> : IObservable<T>, IObserver<T>
{
    ImmutableArray<IObserver<T>> observers = ImmutableArray.Create<IObserver<T>>();
 
    public IDisposable Subscribe(IObserver<T> observer)
    {
        // スレッドセーフな入れ替え
        while (true)
        {
            var oldCollection = observers;
            var newCollection = oldCollection.Add(observer);
            var comparedCollection = ImmutableInterlocked.InterlockedCompareExchange(ref observers, newCollection, oldCollection);
 
            if (comparedCollection == oldCollection) return null; // 変更対象がAddしている間に変わってなければ成功
        };
    }
 
    public void OnNext(T value)
    {
        // 普通にぐるぐる回しても安全
        foreach (var item in observers)
        {
            item.OnNext(value);
        }
    }
 
    // OnErrorとOnCompletedは中略
}

Listの宣言をImmutableArrayに変えて、あとは、フィールドの代入が全然変わってる!そう、ImmutableArrayの差し替えはちょっと面倒くさいのです。所謂CAS(Compare And Swap)という奴で、「大抵の場合は衝突しないけど原理的にたまに衝突する」という場合のために、グルグル回って比較して置き換えるという手段を取ります。ImmutableCollectionsにはそういった処理のためのヘルパーメソッドがImmutableInterlockedクラスに幾つか用意されています。ここではImmutableArrayで使えるImmutableInterlocked.InterlockedCompareExchangeを使いました。

特性

ImmutableArrayの中身は、配列です。Addは中で内部の配列をまるっとコピーして、新しい配列を作っています。foreachはその内部の配列に対して列挙かけるだけなので、普通の配列を回すのと性能はまるっきり変わらない。なので、Addのコストは非常に高いけれど、他は通常の配列と変わらないぐらい高速というのが特性です。

なんでSubjectの実装にImmutableArrayが適切かというと、「追加や削除よりも圧倒的に多く列挙が呼ばれる」からですね。そもそも普通にOnNext書けば毎回コピーが走るので、だったら追加の時のコピー一発で済ませられるなら遥かに高効率と思われるのではないでしょーか。

これにより、イベント的な使用でのSubjectのパフォーマンスは、ノーロック・ノーコピーで、配列とほぼ同等の性能が出ます。完璧!

まとめ

Immutable Collectionsは、まぁ、実際のとこガチッと使えるシーンがはまるケースはぶっちけあんまないと思います!コレクションとしての重要度は 普通のジェネリックコレクション>コンカレントコレクション>超えられない壁>イミュータブルコレクション でしょうし、使うコレクションを探す場合も、そこから順番で考えたほうが良いでしょふ。neue cc - .NETのコレクション概要とImmutable Collectionsについてでも書いたのですけれど、別にイミューラブルコレクション=速い、というのは大間違いです。むしろかなりピーキーで、性能特性をしっかり考えないと全く使いこなせません。

それでも今回のように使えるかもしれない!?ような局面というもの自体は存在するので、覚えておいて損はないと思います。次の.NET Frameworkに標準で入るのかどうかは今のところ分かりませんが、多分入るんじゃないかなー、Roslynで使いたいようだしー、って感じなので先取りしちゃりましょう!

A Beginners Guide to Reactive Extensions with UniRx

どうも始めましての人は始めまして、@neueccといいます。この記事はUnity アセット真夏のアドベントカレンダー 2014 Summer!というイベントの23日目です。クリスマスのアレ!真夏に……!しかしクリスマスのアレは比較的脱落も少なくのないのですが、これは見事ーに続いてます。しかも日付が変わった瞬間に公開されることの多いこと多いこと。〆切というのは23:59:59だと思っている私には辛い話です……。さて、前日はnaichiさんの【うに部屋】Unityのゲーム投稿サイトにアセット検索機能を付けてみたでした。便利でいいですねー、UniRxも使ったアセットとして沢山並ぶ日が来ると、いいなぁ。

Reactive Programming

とは。と、ここで7/30に行われた【第1回】UnityアセットまみれのLT大会で使ったスライドが!

Reactive Programming by UniRx for Asynchronous & Event Processing from Yoshifumi Kawai

LTということで制限時間5分だったんですが当然終わるわけなくて凄まじく早口でまくしたてて強引に終わらせたせいで、全くワカラン!という感想を頂きましたありがとうございますごめんなさい。簡単にかいつまみますと、

Reactive Programmingはガートナーのハイプサイクル(記事では2013ですがこないだ出た2014年版のApplication DevelopmentでもOn the Riseに入っています)やThought Works Technology Raderといった有名な技術指標にもラインナップされるほど、注目を浴びている技術です。Scala周辺からもThe Reactive Manifestoといった文章が出ていますし、JavaでReactive Programmingを実現するRxJavaはGitHubのStarが2995、Objective-C用のReactiveCocoaは5341、JavaScriptでもRxJSが1792、bacon.jsが2864と、知名度・注目度、使われている度は非常に大きくなっています。

Reactive Programming自体は別に近年始まったわけでもなく、昔からたまに盛り上がっては消え、って感じなので、「へぇ~この技術2年前くらいに流行ってたよね 2年前くらい前に見たわ」って思う人もいるかもですが、大事なのは、ちゃんと実用に乗った、ということです。実験的なライブラリの段階はとうに超えて、RxJavaやReactiveCocoaの知名度が示す通り、完全に実用レベルに乗りました。

UniRxはReactive Extensions(Rx)というMicrosoftの開発した.NET用のライブラリ(現在はOSS化)を私がUnity用に移植したものです。現在のReactive(Rx)Hogeの源流は、この.NETのRxにあります。ほとんどのライブラリから言及され、原理原則や用語はRx.NETに従っていることが多い。というわけでRx.NETは革命的に素晴らしいわけなのです、が、しかし、Unityでは動きません。それはRx.NETが本来のC#の機能を全面的に使いすぎてUnityのC#では動かせないから……。少なくともiOSのAOT問題を全く突破できない……。

が、どうしてもUnityで使いたいので移植(といってもソースコードレベルではほとんど自前で書いてるのでインターフェイスと挙動を合わせているという感じで割と書き下ろしです、一部は純粋に移植してますが)+UnityはUnityで.NETとは異なるところもあるので、Unityで使って自然になるような改良を施したのがUniRxになります。

ムリョーですよ、ムリョー。FREE!。私はLINQやRxの大ファンなんで、とにかく使ってもらいたい欲求のほうが強くて。そして勿論、自分で使えない状態にも耐えられなくて!気になったらGitHubでStarつけてください(Star乞食)、勿論AssetStoreのほうでもいいですよ:)

UniRxは非同期やイベント処理をReactive Programmingの概念を元に大きく簡単にします。Unityにも非同期ライブラリ、イベントライブラリは沢山あります。それらと比べたUniRxの強みは「Rxであるということ」です。Reactive Programmingは現状かなりブームになっているとおりに、その手法の正しさ、威力に関しては実証済みです。また、手法やメソッド名などが同一であるということは、既に普及しているRx系のライブラリのドキュメントがまんま使えます。そしてUniRxで学んだやり方は他のプラットフォームに移っても同じように使えるでしょう。ネイティブAndroid(RxJava)でもネイティブiOS(ReactiveCocoa)でも.NET(Rx.NET)でもJavaScript(RxJS)でも、そういった先々への応用性もまた、選ぶべき理由になると思います。

Introduction

前置きが長い!さて、この記事を出すちょっと前に【翻訳】あなたが求めていたリアクティブプログラミング入門という素晴らしい記事が翻訳されました!はてブでも500以上集まってましたし、実際めっちゃ良い記事です。この記事はRxJSを用いて具体的な説明を行っていますが、勿論UniRxでも同様のことができます(というわけで、結論んとしては↑の記事読んでもらえればいいんでさー、とか投げてみたい)。ちょっとやってみましょう。最初の例はダブルクリックの検出です。

using System;
using UniRx;
using UnityEngine;
 
public class Intro : MonoBehaviour
{
    void Start()
    {
        // 左クリックのストリーム
        var clickStream = Observable.EveryUpdate()
            .Where(_ => Input.GetMouseButtonDown(0));
 
        // Buffer:250ミリ秒以内に連続してクリックされたものをまとめる
        clickStream.Buffer(clickStream.Throttle(TimeSpan.FromMilliseconds(250)))
            .Select(xs => xs.Count) // 250ミリ秒以内検出したクリック数
            .Where(x => x >= 2) // 2個以上のみにフィルタ
            .Subscribe(_ => Debug.Log("ダブルクリックされた!")); // foreachみたいな
    }
}

このスクリプト(Intro.cs)をMainCameraでもなんでも適当な何かに貼り付けてもらえれば、画面のどこでもダブルクリックされればログに流れます。

この例は6行しかない単純なものですが、多くの要素が詰め込まれています!そして、実際、詰め込まれすぎていてかなり難しい!わからん!というわけで真面目に分解していきます。

Rx is LINQ

Rxについてまとめると「時間軸に乗るストリームに対するLINQ」です。時魔法です。先の記事でも「FRPは非同期データストリームを用いるプログラミングである」といってました。(ドヤァするわけですが、私は遥か昔、2011年の時点で同じこと言ってましたからね!)。このことさえピンと来れば割としっくり来るんですが、同時にこれがしっくり来るというところまでが敷居となる。ところが、RxはこのことをLINQとして表現することによりグッと敷居を下げました。なんだ、LINQと一緒じゃん!って。え、LINQがワカラナイ?それは、普通にC#の必須技術なので是非学んでください!(色々種類ありますがLINQ to Objectsだけでいいです。以前に@ITでLINQの仕組み&遅延評価の正しい基礎知識を書いたりAn Internal of LINQ to Objectsというスライドで発表したりしてるんで読んでください)

簡単に同じように見れることを説明すると、LINQ to Objectsでは

new[] { 1, 2, 3, 4, 5 }
    .Where(x => x % 2 == 0)
    .Select(x => x * x);

のように、配列をフィルタリングして別の形に射影できます。

配列、int[]の横軸は当然ながら長さです。Rxは時間を横軸に取ることができます。どういうことか、というと、例えば何かをタップするというイベントは図にしたらこういう表現ができます。

ということは、同様にWhereしたりSelectしたりできる。

少しピンと来ました?さて、配列はnew[]{}やGetComponentsなどで手に入れることができますが、Rxで扱うためのイベントストリーム(IObservable<T>、ちなみにLINQ to Objectsは配列をIEnumerable<T>として扱う)はどこに転がっているのか。UniRxにおいて一番お手軽なのはObservable.EveryUpdateです。ゲームループは、ループというぐらいに1フレーム毎にUpdateが毎回呼ばれるサイクルなわけですが(参考:Script Lifecycle Flowchart)、つまり60fpsなら1/60秒毎に発生する時間軸にのったイベントとみなせられます。一度、時間軸上に乗るイベントとみなせられれば、それは全て無条件にRxで取り扱えます。

実際のところ、イベントに限らずあらゆるものがRxに見せかけることができます。非同期だってx秒後に一度だけ発生するイベントと考えれば?配列は0秒で沢山の値が発行されるイベントと考えれば?Unityのコルーチン(IEnumerator)だって乗せられる。

全てのものをRx化(IObservable化)すれば、あとは好きなようにLINQのメソッドで合成してしまえる。あらゆる素材(イベント・非同期・配列・コルーチン)を鍋(IObservable)に突っ込んで、料理(Where, Select, etc…)して食べる(Subscribe)。というのがRxの基本の基です。料理方法は色々あるので、そこが次のキモですね。

Composable

というわけでclickStreamがマウスの左クリックのストリームになったということは分かったでしょうか!?

var clickStream = Observable.EveryUpdate()
    .Where(_ => Input.GetMouseButtonDown(0));

左クリックがあったフレームだけにフィルタリングしているということですねー。では次は?

clickStream.Buffer(clickStream.Throttle(TimeSpan.FromMilliseconds(250)))

Rxの利点として、イベントが変数として扱えることです。どういうことか、戻り値にすることもできるし、フィールドやプロパティとして公開することもできるし、自分自身と結合することもできる。というわけで、clickStreamが二個出てるのは、自分との合成なんですね(わっかりづらい!)

そこまではいいとしてThrottleが分かりづらい!Throttleは一定時間毎(この場合は250ミリ秒)に値が観測できなかったら値を流す、という挙動です。

スロットル、流れてくる値を絞り込んでいるんですね。Rxは時間軸上に乗っているので、こうした時間関係を扱うメソッドが豊富です。Sample(一定時間毎のもののみを流す)、Delay(一定時間後に流す)、Timeout(一定時間たっても値がなければエラーにする)、Buffer(一定時間の間値を溜めてから流す)などなど。一見分かりづらいですが、そもそも普通にも書きづらい、そういったものがメソッド一発で書けるというのもRxの魅力です。

じゃあどういうことかというと、もうまだるっこしいので全部のせますが

ThrottleしたものをBufferしているのは、値が流れてくるまで値を溜める(そして配列にして後続に流す)、ということです。あとはSelectで配列の個数、つまり250ミリ秒の間にクリックされた回数に変形して、Whereでフィルタリング(ダブルクリック、つまり2個以上ならばOK)。なるほどねー?

ちなみにコレは(250ミリ秒以内に)クリックされ続けてるとずっと実行されません。最後にクリックされてから250ミリ秒後に、その間に2回以上クリックされてると実行される。が正しい表現でしょうか。この動作が望ましい場合もあれば望ましくない場合もある、ダブルクリックと一口でいっても定義は案外複雑なので、その辺はチューンしてみてください。その辺もclickStream.Timestamp()や.TimeInterval()でその時刻や前との差分なんかが簡単に取れます。

Subscribeって?

Subscribeはforeachです。配列をforeachで消費するように、RxではSubscribeで消費する。イベントストリームなのだからeventのSubscribe(購読)。foreachしなければ配列は動かないように、RxもSubscribeしなければ始まりません!(というのは正確には語弊があり、ObservableにはHotとColdという性質があり、HotはSubscribeしなくても動いている、とかかんとか、とかがありますが今は無視します)。RxにおけるSubscribeは、値・エラー・完了を一手に受け取ります。

Observable.EveryUpdate() // 0, 1, 2, 3, 4, 5, 6, 7, 8,....
    .Take(5)
    // .Do(x => { if(x == 3) throw new Exception(); } )
    .Subscribe(
        x => Debug.Log(x + "Frame"), // OnNext
        ex => Debug.Log("Exception!" + ex), // OnError
        () => Debug.Log("Complete!")); // OnCompleted

通常、イベントに終わりはありませんが、Rxのイベントストリームは終わりを持たせることができます。Takeは値をx個取得したら強制的に終点ということにするもの。長さ5のイベントストリーム。というわけで↑のコードはOnNextが5回呼ばれた後にOnCompletedが呼ばれます。もしDo(値が通った時にメソッドを実行する)のコメントアウトを外すと、OnNextが3回呼ばれた後にOnErrorが呼ばれます。この場合はOnCompletedは呼ばれません。なお、OnNext/OnError/OnCompletedはどれも書いても書かなくてもいいです(その場合はOnErrorはグローバルに例外をそのままthrow、OnNextとOnCompletedは何もしない、ということになる)

Rxのイベントストリームは以下の原則に必ず従います。これは他のRx系列のReactive Programmingライブラリも同じものを採用しています。

OnNext* (OnError | Oncompleted)?

OnNextが0回以上呼ばれた後に、OnErrorもしくはOnCompletedが1回または0回よばれます。

購読ということは解除(Unsubscribe)はあるのか、というと、あります!Subscribeの戻り値は必ずIDisposeableで、それをDisposeすることが解除になります。

var subscription = Observable.Interval(TimeSpan.FromSeconds(1))
    .Subscribe(_ => Debug.Log("hogehoge!"));
 
Observable.EveryUpdate()
    .Where(_ => Input.GetMouseButtonDown(0))
    .Take(1)
    .Subscribe(_ => subscription.Dispose());

Intervalはx秒毎に値を発行するというもの、↑の例では1秒おきにhogehoge!と表示されます。もし左クリックがあったら、その戻り値をDisposeしているので、これで値の発行は止まります。Take(1)なので、左クリックを監視するストリームもTake(1)が終わったら自動的にEveryUpdateの監視を解除しています。

var subscription = Observable.Interval(TimeSpan.FromSeconds(1))
    .Subscribe(_ => Debug.Log("hogehoge!"));
 
Observable.EveryUpdate()
    .Where(_ => Input.GetMouseButtonDown(0))
    .Take(1)
    .Subscribe(_ => subscription.Dispose());

この形式の何がいいか、というと、入れ物に入れてまとめて購読解除できます。

CompositeDisposable eventResources = new CompositeDisposable();
 
void Start()
{
    Observable.Interval(TimeSpan.FromSeconds(1))
        .Subscribe(_ => Debug.Log("hogehoge!"))
        .AddTo(eventResources);
 
    Observable.EveryUpdate()
        .Where(_ => Input.GetMouseButtonDown(0))
        .Subscribe(_ => Debug.Log("click!"))
        .AddTo(eventResources);
}
 
void OnDestroy()
{
    eventResources.Dispose();
}

CompositeDisposableはIList[IDisposable]みたいなもので、IDisposableをまとめて突っ込めます。AddToはメソッドチェーンのまま突っ込めるようにするUniRxの定義している拡張メソッド。こうしてためておいて、Destroyでまとめて解除、ができます。こうした変数で扱えるという性質により、イベントの管理がかなり容易になっています。勿論、文字列で止めて、などもない完全なタイプセーフですしね。

余談:片方のストリームが発動したら止める、という処理は割と定形なので明示的にsubscriptionをDisposeするようなコードを書かなくても、TakeUntilが使える。

Observable.Interval(TimeSpan.FromSeconds(1))
    .TakeUntil(Observable.EveryUpdate().Where(_ => Input.GetMouseButtonDown(0)))
    .Subscribe(_ => Debug.Log("hogehoge!"));

色々あるってことです!メソッド一覧を眺めて使い道を考えよう!

非同期について

Unityにある素敵な素敵なコルーチンはあまり素敵ではない。WWWでyieldできて非同期扱えてサイコー、ではない。なんで?C#のyieldが非同期を扱うためのものじゃないから。try-catchできないからyield return StartCoroutineしたら例外はあの世に飛んでいく。それを避けるためにWWWのようにwww.textとwww.errorを持つようにあらゆる非同期はどうでもいいコンテナを持たなければならなくて?そしてそもそもIEnumeratorは戻り値を持てない。だから戻り値を持たせたかったらコールバックの形に返るしかない。酷い、酷い、醜悪な話だ。

// こんなこるーちんを用意して
IEnumerator GetGoogle(Action<string> onCompleted, Action<Exception> onError)
{
    var www = new WWW("http://google.com/");
    yield return www;
 
    if (!www.error) onError(new Exception(www.error));
    else onCompleted(www.text);
}
 
// なんかダラダラしてる
IEnumerator OnMouseDown()
{
    string result;
    Exception error;
    yield return StartCoroutine(GetGoogle(x => result = x, x => error = x));
    if(error != null) { /* なんかする */ }
 
    string result2;
    Exception error2;
    yield return StartCoroutine(GetGoogle(x => result2 = x, x => error2 = x));
    if(error2 != null) { /* なんかする */ }
}

基本的に破綻している。別に分離しないで単純に単純なWWWを延々と連鎖している限りは、少しはまともに綺麗になるかもしれないけれど、それは処理を分離できないという問題を産む。一つの巨大な無駄にデカいCoroutineと重複コードを避けられない。何れにせよコルーチンは非同期を扱うためのベストソリューションでは全くない(ところでNode.jsはyieldで立派に上手く非同期を扱っているじゃないか!と思う方もいるかもしれませんが、アレはyieldが戻り値を返しているから可能であって、どちらかといえばC# 5.0のasync/awaitに近い。C#のyield returnとは少し別物)

Rxならどう書くか?全てをObservableの連鎖フローに変換する。

// xが完了したらそれでy、完了したらzのダウンロードの連鎖のフローをLINQクエリ式で
var query = from x in ObservableWWW.Get("http://google.co.jp/")
            from y in ObservableWWW.Get(x)
            from z in ObservableWWW.Get(y)
            select new { x, y, z };
 
// Subscribe = "最後に全部まとまったあとの"コールバック(ネストしないから処理が楽)
query.Subscribe(x => Debug.Log(x), ex => Debug.LogException(ex));

メソッドチェーン(もしくはクエリ式)で、コールバックのネスト数を最小に抑えてフロー化、一体となって処理する。

また、並列ダウンロードなども簡単に行えるのは大きな利点かもしれない。

var parallel = Observable.WhenAll(
    ObservableWWW.Get("http://google.com/"),
    ObservableWWW.Get("http://bing.com/"),
    ObservableWWW.Get("http://unity3d.com/"));
 
var cancel = parallel.Subscribe(xs =>
{
    Debug.Log(xs[0]); // google
    Debug.Log(xs[1]); // bing
    Debug.Log(xs[2]); // unity
});
 
// これでキャンセルできる
cancel.Dispose();

こんな風に並べて簡単に並列処理できます。また、全てのRxのSubscribeは戻り値をDisposeすることでキャンセルできる。文字列でStopCoroutineする時代はさようなら。

コルーチンについて

と、コルーチンを腐しましたが、しかし実際コルーチンは素晴らしいツールだと思っています。フレームワークネイティブの機構であり、それはやはり強いのです。無理にRxで全部書くことはまったくもって完全に”可能”なのですが、それよりは素直にコルーチンで書いて、Rxはそれをまとめることに徹してみればいい。コルーチンもイベントと同じく一つの素材。と考えればイイ。

public IObservable<string> AsyncA(string msg)
{
    return Observable.FromCoroutine(observer => AsyncA(msg, observer));
}
 
// 戻り値のあるコルーチン(IObserverがコールバックを扱うコンテナとして考える、値通知とエラー通知、両方を内包する)
private IEnumerator AsyncACore(string msg, IObserver<string> observer)
{
    Debug.Log("a start");
    yield return new WaitForSeconds(1);
    observer.OnNext(msg); // 値を通知
    observer.OnCompleted();
    Debug.Log("a end");
}
 
public IObservable<string> AsyncB()
{
    return Observable.FromCoroutine(AsyncBCore);
}
 
// 戻り値のないコルーチン
private IEnumerator AsyncBCore()
{
    Debug.Log("b start");
    yield return new WaitForEndOfFrame();
    Debug.Log("b end");
}
 
// こんな使い方
var cancel = AsyncA()
    .SelectMany(_ => AsyncB())
    .Subscribe();
 
// 例によってコルーチンを止めたければ戻り値をDisposeする
cancel.Dispose();

Observable.FromCoroutineによって変換できて、あとは好きなように合成できる。FromCoroutineは戻り値があってもなくてもOK。作る時にお薦めなのはIEnumeratorはprivateにして、変換後のIObservableのほうだけをpublicにすること。

uGUI

iOS用のReactiveCocoaが非常に受け入れられているように、リアクティブプログラミングはGUIとの相性が非常に良いです。そこでUniRx ver.4.5では既にUnityの新GUIシステムに対応!.AsObservable()と書くだけで変換できます。例えば

GetComponentInChildren<Button>().AsObservable()
    .Subscribe(x => Debug.Log("クリックされた!"));

のように書けます。詳しい話はいつか!

そのうち書く何か(予告!)

「Pull vs Push」RxはPush型。Pull型のアーキテクチャはFindGameObjectsで探してきて何かする、もしくはなんとかManagerでオブジェクトを維持して、何れにせよそれらで蓄えて配列上のものに対して処理をグルッと書く。RxはPush型、何かして欲しい何かを各GameObject自身が通知する。ようするにイベント。イベントと違うのはRxならばイベントの集合体を圧倒的にコントロールしやすいこと(沢山のイベントストリームを処理するためのメソッドがある!)。本来、必要な時に必要な情報だけを送ってくるPush型のほうが、必要かどうかを取得してから考える必要のあるPull型よりもパフォーマンスも良くなる可能性が高いし。しかし生のイベントはコントロールが難しすぎて中々使えなかった。それをRxが解き放つ。とかうんたらかんたら。

学習リソース

UniRxの特徴として、Reactive Extensions系の学習リソースを流用できることが上げられますが、実際RxJavaのWikiは非常にお薦めです。沢山あるメソッドの説明が図入りで説明されていて非常に分かりやすい、例えばFiltering Observablesとか。

また、Introduction to Rxは非常に充実したチュートリアルを提供し、RxMarbles: Interactive diagrams of Rx Observables ではインタラクティブにメソッドの挙動を確認できます。

こうしたリソースにひたすらタダ乗り出来るのが強い!

更新履歴。

さて、いまさらですが、UniRxの最初の発表日は2014/04/19に開催されたすまべん特別編「Xamarin 2.0であそぼう!」@関東での発表でした。

UniRx - Reactive Extensions for Unity from Yoshifumi Kawai

この時にGitHubにリポジトリを公開して、アセットストアに審査出し。そこから審査に3回ほどこけて、一月後に2014年05月28日に公開されました、わーぱちぱち。ってしかしブログに解説書く書く詐欺で解説を書かないでいました、ほげえ。だからこの記事が最初の解説記事なんですねー、えー……。なんとなく書かないでいたのは、次のバージョンではもっと良くなってるから!を延々と繰り返してたから説。特に大きく変わったのがver.4.3(ちなみにUniRxのバージョンが4始まりなのは、審査にこける度に間違ってメジャーバージョンを上げちゃってたからです、気付いた時には戻せず……)

ver 4.3 - 2014/7/2
 
Fix iOS AOT Safe totally
MainThreadSchedule's schedule(dueTime) acquired time accuracy
MainThreadDispatcher avoid deadlock at recursive call
Add Observable.Buffer(count, skip)
Change OfType, Cast definition
Change IScheduler definition
Add AotSafe Utilities(AsSafeEnumerable, WrapValueToClass)
Change Unit, TimeInterval and Timestamped to class(for iOS AOT)
Add Examples/Sample7_OrchestratIEnumerator.cs

Unity + iOSのAOTでの例外の発生パターンと対処法という記事を書いて、そこそこ反響あったのですが、その成果を突っ込んでます。というわけで、このバージョンでiOSのAOT問題を大きく解決しました。そしてver4.4。

ver 4.4 - 2014/7/30
 
Add : Observable.FromEvent
Add : Observable.Merge Overload(params IObservable[TSource][] / IEnumerable[IObserable[TSource]])
Add : Observable.Buffer Overload(timeSpan, timeShift)
Add : IDisposable.AddTo
Add : ObservableLogger(UniRx.Diagnostics)
Add : Observable.StartAsCoroutine
Add : MainThreadDispatcher.RegisterUnhandledExceptionCallback
Add : Examples/Sample08, Sample09, Sample10, Sample11
Performance Improvment : Subject[T], OnNext avoids copy and lock
Performance Improvment : MainThreadDispatcher, avoids copy on every update
Change : Observable.ToCoroutine -> ToAwaitableEnumerator
Fix : ObservableMonoBehaviour's OnTriggerStay2D doesn't pass Collider2D

機能的にはObservableLoggerを入れたのとIDisposable.AddToでリソース管理のガイドを示したのが大きいんですが、一番大きいのはパフォーマンス改善かなあ、と。ガチで使うと大量に出てくるSubject[T]や、絶対経由することになるMainThreadScheduler/Disptacherの性能を限界まで向上させたので、あまりネックになることはないのではかな、と。で、公開まだなver4.5。

ver 4.5 - 2014/8/19(アセットストアへは審査中)
 
Add : ObservableWWW Overload(byte[] postData)
Add : Observable.Buffer Overload(windowBoundaries)
Add : LazyTask - yieldable value container like Task
Add : Observable.StartWith
Add : Observable.Distinct
Add : Observable.DelaySubscription
Add : UnityEvent.AsObservable - only for Unity 4.6 uGUI
Add : UniRx.UI.ObserveEveryValueChanged(Extension Method)
Add : RefCountDisposable
Add : Scheduler.MainThreadIgnoreTimeScale - difference with MainThreadScheduler, not follow Unity Timescale
Add : Scheduler.DefaultSchedulers - can configure default scheduler for any operation
Fix : DistinctUntilChanged iOS AOT issue.
Fix : Remove IObservable/IObserver/ISubject's covariance/contravariance(Unity is not support)
Fix : UnityDebugSink throws exception when called from other thread
Fix : Remove compiler error for Windows Phone 8/Windows Store App
Breaking Change : MainThreadSchduler follow Unity Timescale
Breaking Change : All Timebased operator's default scheduler changed to MainThreadScheduler
Breaking Change : Remove TypedMonoBehaviour.OnGUI for performance improvment
Performance Improvment : AsyncSubject[T]
Performance Improvment : CurrentThreadScheduler
Performance Improvment : MainThreadScheduler

本当はこの記事と同時にアセットストアでも公開!と行きたかったんですが審査がまだー……。uGUI対応のイベントハンドリングを足したり、UIで使うの見越したUniRx.UI.ObserveEveryValueChangedの追加とか、uGUIへの対応を見越した基礎部分を足してっていってる感じですね。こうしたUIでの利用法はuGUIのノウハウと共に貯めていきたい/公開していきたいと思っています。

あと凄く大きいのが時間ベースのメソッドで使われるデフォルトのスケジューラをScheduler.MainThreadに変えたことで、ふつーに使う分には全てがUnityのTimescaleの影響下にあるシングルスレッドで動く状態になるので、違和感というかハマりどころ(ObserverOnMainThreadしなかったから死んだ!オマジナイにObserveOnMainThreadって書きまくったせいで性能が!)を消せたのかなー、と思います。ここは本家Rxとは当然デフォルトが違うことになりますが、Unityネイティブに寄せるべきだろう、という判断です。

あと地味にWindows Phone 8やWindows Store Appにも対応しました。いや、最初のバージョンでは対応してたんですが機能足してるうちに、どうやらコンパイル通らなくなってしまっていて……。Platform切り替えないと気づけないのが辛いですねえ、Unity Cloud BuildのWindows Phone対応はよ!いちおう「We’ll be adding more platforms as the service matures.」ってあるので、適当に待ちましょう。そもそもBetaの現状は重すぎてそういう次元ですらないですしね。

次回更新では、現状ExecuteInEditModeでは動かないので、それに対応したものを出す予定です。

最後に

と、いうわけでどうでしょう?使ってみたくなってもらえれば幸いです。怒涛の更新のとおりにやる気はかなりあります、というか私はグラニという会社のCTOをしているんですが(CM放送などをした「神獄のヴァルハラゲート」が代表作です)、開発中の次のプロダクトに投下していて、実プロダクトで使う気満々というかドッグフーディングというか、ともあれ現状「枯れてない」というのは否定出来ないのですが、基本的なバグは既に概ね殺せているのではかなぁ、と、そこは信頼してくれると嬉しいですね。今回はUniRxの初めての記事だったので基礎の基礎的な話になりましたが(そうか?)、今後は応用的な記事などもどんどん出していきます。

Reactive Extensions自体は、私は2011年には@ITに連載:Reactive Extensions(Rx)入門という記事を書いていたり、そもそも2009年に最初のベータ版が出た時から追っかけて記事を書き続けていたりとneue.cc/category/programming/rx、5年間延々とRxを触っているので、さすがにかなり詳しいのではないかと自負するところです(ちなみに最初の記事は.NET Reactive Framework メソッド探訪第一回:FromEventでした。そう、当初はReactive Frameworkって名前だったんですね、更にもっと源流はMicrosoft Live Labs Voltaになります)。

繰り返しますが日本ではReactive Programmingのブームは定期的に起こっては消え(最初のほうでバズったのは2010年のやさしいFunctional reactive programming(概要編)でしょうか)、って感じですが、Microsoftは理論やプロトタイプに留まらず延々と改良を続け、完全に実用ベースに載せ、本物のブームを作り上げたことには本当に感嘆します。勿論、ブーム自体はMicrosoftよりは、そこから波及していったRxJavaやReactiveCocoaの貢献が非常に大きいです。私もUniRxで、Reactive Programingの強力さをUnityでも示し、大きなブームが巻き起こせればなあ、なんて野望は抱いていますね!

UniRxへの質問があれば、GitHubのIssuesUnityのForumに立ててあるスレッドでもぜひぜひですが、そこでは英語でお願いしたいのでちょっと敷居がー、ということであれば、普通にTwitterの@neuecc宛てに気楽に言ってください。またはTwitterで「UniRx」で常時検索してますんで独り言みたいな感じでポストしてもらえればチェックします。

あと会社単位でも、会社間交流ということで共催の社内勉強会などできれば嬉しいかなー、と思ってますので、是非グラニと勉強会やりたいという方いらっしゃいましたらお声がけください。今までもKLabさん - 2014年3月度ALMレポート(株式会社グラニ様との合同開催)やドリコムさん等と行ってきています。グラニのUnity以外の技術、というか現状はそちらがメインなのですが、というのはgihyo.jpのグラニがC#にこだわる理由という記事を見て頂ければなのですが、サーバーサイドをPHPやPython、RubyじゃなくてC#(Windows Server + ASP.NET)でやる、というのが強みです。勿論、今後クライアントサイドもC#(Unity)でやっていきます。

さて、明日は野生の男さんの「無料アセットで簡単IBL!」です。楽しみ楽しみー、ではでは!

Amazon Kinesis + Reactive Extensionsによる簡易CEP

AWSのAmazon Kinesis!大規模なストリーミングデータをリアルタイムで処理する完全マネージド型サービス。うーん、いかにもわくわくしそうなキーワードが並んでいます。そしてついに先日、東京リージョンでも利用可能になったということでAWS Summitの最中もwktkして、どうやって利用したもんかと考えてました。だって、リアルタイムにイベントデータが流れてくる→オブザーバブルシーケンス→Reactive Extensions(Rx)、という連想になるのは自然なことですよね?

Kinesisとは

Rx、の前にKinesisとは。【AWS発表】 Amazon Kinesis – ストリームデータのリアルタイム処理を見れば事足りますが、表現するなら土管、ですかね。イベントデータの。以下ぽんち絵

Streamの中はShardという単位で分かれていて、データを放り込む時はPartitionKeyを元に、どのShardに突っ込まれるか決まる。読み書き性能自体は完全にShardの数で決まっていて、1シャード毎にWriteは1MB/sec - 1000Req/sec, Readは2MB/sec - 5Req/secとなってます。事前に負荷状況を予測していくのと、随時、Split(Shardの分割)とMerge(Shardの統合)してスケーリングしていく、って感じですかねえ。API自体は単純で、あんま数もないので簡単に理解できるかと。

APIが単純なのはやれることが少ないから。土管。情報を左から右に流すだけのパイプ。その代わり入力は限りなく無限にスケールしていく(Shardを増やしまくれば)。では出力は?というと、Kinesis Applicationとよばれる、といっても実体は、別にAPIをほぼほぼポーリングで叩いてデータ取り出して何か処理するものをそう呼んでるだけ。で、取り出すのはAPI叩いて保存されたデータを読むだけ。

そう、ポーリング。Kinesis自体は一時保管所であって、本当のリアルタイムでPubSub配信するわけじゃあない(用途としては問題ないレベルで低遅延にはなるけれど)。保存時間は24時間で、その間はStream中のどこから(最初からでも最新からでも任意の位置から)でも取り出すことができる。一時保管所がわりにS3を使ったりすると、ゴミは貯まるしどこまで取ったかとか煩わしくなるけれど、Kinesisの場合はStreamの形状になっているのでとてもやりやすい。ただしKinesisは制限として1レコード辺り50KBまで。更にHTTPで投げる際にBase64になってブヨっと膨らむ。

ObservableKinesisClient

C#でKinesisを使うには、AWS SDK for .NETを使えばAmazonKinesisClient入ってます。ソースコードも公開されてるしNuGetでも入れられるし、APIはとりあえずAsyncに対応してるし、APIデザインもちょっと奇妙なところもあるけれど、一応全て統一されたモデルでデザインされてるので、割と結構良いと思ってます。

Kinesis、データの登録はPutRecordでバイナリ投げるだけなので単純なのですが、取り出しの方はいささか面倒で、DescribeStreamによるStream内のShard情報の取得、GetShardIteratorによるShardIterator(どの位置から取得開始するか、の情報)の取得、それを元にGetRecord、そして延々とポーリングのためのループ。と、繰り返す必要があります。

というわけかで、まずは利用例の方を。

// とりあえずAWSのキーと、ストリーム名で生成する感じ
var client = new ObservableKinesisClient("awsAccessId", "awsSecretAccessKey", RegionEndpoint.APNortheast1, streamName: "KinesisTest");
 
// データの登録。オブジェクトを投げ込むとJSONシリアライズしたのを叩き込む。
await client.PutRecordAsync(new { Date = DateTime.Now, Value = "ほげほげほげほげ" });
 
// ObserveRecordDynamicでJSONのストリームとして購読できる
client.ObserveRecordDynamic()
    .Where(x => x.Value != "ほげ") // xはdynamicなのでどんなSchemaのJSONも自由に辿れる
    .Select(x => x.Date + ":" + x.Value)
    .Subscribe(Console.WriteLine);

はい。ObserveRecordDynamicで、リアルタイムに流れてくるデータを簡単に購読できます。IObservableなので、Rxによって自由にクエリを書くことが可能。また、何のデータが流れてくるか分からないストリームのために、JSONはdynamicの形でデシリアライズされています。(IntelliSenseの補助は効きませんが)スキーマレスに、あらゆるデータをRxで処理できます。もちろん、型付けされたものが欲しければObserverRecord<T>を、今は実装してないですが、まあ簡単につくれます:)

以下ObservableKinesisClient本体。

// JSON.NET, AWSSDK, Rx-Mainの参照が必要
public class ObservableKinesisClient
{
    readonly UTF8Encoding encoding = new UTF8Encoding(false);
    readonly JsonSerializer serializer = new JsonSerializer() { Formatting = Newtonsoft.Json.Formatting.None }; // ThreadSafeだよ
    readonly string streamName;
    readonly AmazonKinesisClient kinesis; // ThreadSafeなのかは知らない(ぉぃ
 
    // コンストラクタはもっとまぢめにやりましょう
    public ObservableKinesisClient(string awsAccessId, string awsSecretAccessKey, RegionEndpoint endPoint, string streamName)
    {
        this.kinesis = new AmazonKinesisClient(awsAccessId, awsSecretAccessKey, endPoint);
        this.streamName = streamName;
    }
 
    // ようするにObjectを1レコードずつJSONで突っ込むもの
    public async Task<PutRecordResponse> PutRecordAsync(object value)
    {
        using (var ms = new MemoryStream())
        using (var sw = new StreamWriter(ms, encoding))
        using (var jw = new JsonTextWriter(sw) { Formatting = Formatting.None })
        {
            serializer.Serialize(jw, value);
            jw.Flush();
            ms.Position = 0;
 
            var request = new PutRecordRequest
            {
                StreamName = streamName,
                Data = ms,
                PartitionKey = Guid.NewGuid().ToString() // PartitionKeyは適当にランダム
            };
 
            // つまり1レコード1HTTP POSTということになる。
            // 大量に投げる際は素朴すぎてアレゲ感があるので、実際にやるときはまとめてから放り込んで
            // 取り出す側も↑の構造を前提にして取り出すよーな感じにしたほうがいーかもデスネー
            return await kinesis.PutRecordAsync(request).ConfigureAwait(false);
        }
    }
 
    // Dynamicが嫌な場合はSerialize<T>でおk。とりあえずこの例ではdynamicでやります。
    // Client内部で分配しちゃったほうがきっと自然にやさしい(Publish().RefCount())
    public IObservable<dynamic> ObserveRecordDynamic()
    {
        return Observable.Create<dynamic>(async (observer, cancellationToken) =>
        {
            var isRunningNextPipeline = false;
            try
            {
                // まずShard一覧を取得する
                // TODO:これを使いまわしちゃうとShardsの増減には対応してないよ!
                // 毎回DescribeStream読むのもアレだしたまに問い合わせとかがいいの?
                var describeStreamResponse = await kinesis.DescribeStreamAsync(new DescribeStreamRequest { StreamName = streamName }).ConfigureAwait(false);
                var shards = describeStreamResponse.StreamDescription.Shards;
 
                var nextIterators = new List<string>();
                foreach (var shard in shards)
                {
                    if (cancellationToken.IsCancellationRequested) return; // CancellationTokenの監視だいぢだいぢ
 
                    // ShardIteratorTypeは実際は取り出した位置を記録しておいてAFTER_SEQUENCE_NUMBERでやるか、LATESTでやるかがいーんじゃないでしょーか?
                    var shardIterator = await kinesis.GetShardIteratorAsync(new GetShardIteratorRequest
                    {
                        StreamName = streamName,
                        ShardId = shard.ShardId,
                        ShardIteratorType = ShardIteratorType.TRIM_HORIZON, // TRIM_HORIZON = 最初から, LATEST = 最新, AT_SEQUENCE_NUMBER = そこから, AFTER_SEQUENCE_NUMBER = 次から
                    }).ConfigureAwait(false);
 
                    var record = await kinesis.GetRecordsAsync(new GetRecordsRequest { ShardIterator = shardIterator.ShardIterator }).ConfigureAwait(false);
 
                    // Shardの順番で回してるので、このPushの順番は必ずしも「時系列ではない」ことにチューイ!
                    foreach (var item in record.Records)
                    {
                        PushRecord(item, observer, ref isRunningNextPipeline); // ObserverでPush!Push!Push!
                    }
 
                    nextIterators.Add(record.NextShardIterator);
                }
 
                // NextShardIteratorがある状態で無限ぐるぐる
                do
                {
                    if (cancellationToken.IsCancellationRequested) return; // ところどころCancellationTokenの監視 Part2
 
                    for (int i = 0; i < nextIterators.Count; i++)
                    {
                        if (cancellationToken.IsCancellationRequested) return; // ところどころCancellationTokenの監視 Part3
 
                        var shardIterator = nextIterators[i];
 
                        var record = await kinesis.GetRecordsAsync(new GetRecordsRequest { ShardIterator = shardIterator }).ConfigureAwait(false);
 
                        // こちらでも、やはりShardの順番で回してるので、状況によって必ずしも時系列にはならないことにチューイ!
                        foreach (var item in record.Records)
                        {
                            PushRecord(item, observer, ref isRunningNextPipeline); // ObserverでPush!Push!Push!
                        }
 
                        nextIterators[i] = record.NextShardIterator;
                    }
 
                    await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false); // 実質ポーリングなのでなんとなくDelayをちょっと入れてみる
 
                    nextIterators = nextIterators.Where(x => x != null).ToList(); // 明らかに非効率なこの実装はテキトーなんで真面目にやるなら真面目に書いてください:)
                } while (nextIterators.Any());
            }
            catch (Exception ex)
            {
                if (isRunningNextPipeline)
                {
                    throw;
                }
                else
                {
                    observer.OnError(ex);
                }
 
                return;
            }
 
            observer.OnCompleted();
        });
    }
 
    void PushRecord(Record record, IObserver<dynamic> observer, ref bool isRunningNextPipeline)
    {
        using (var sr = new StreamReader(record.Data, encoding)) // item.DataにMemoryStreamの形で1レコードが受け取れる
        using (var jr = new JsonTextReader(sr))
        {
            var obj = serializer.Deserialize(jr);
            isRunningNextPipeline = true;
            observer.OnNext(obj); // 1レコードをPush
            isRunningNextPipeline = false;
        }
    }
}

PutRecordAsyncはまんま、JSONにシリアライズしたデータを投げ込んでるだけです。ObserverRecordDynamicのほうはちょっと複雑っぽいですが、やってることは順に、DescribeStreamAsyncでShard一覧を取得→それぞれのShardでGetShardIteratorAsyncで始点の取得・GetRecordsAsyncで最初のデータを取得しobserverに配信→取得できたNextShardIteratorを元にデータ取得と配信の無限ループ。です。

コメントで色々書いてありますが、Shard単位で処理していくのでレコードのSequenceNumberの順にPushされているわけではないことと、ShardがSplitやMergeで変動することへの対応は必要よね、とか考えることは色々ありますね。あと、Readの制限が5Req/secとかなり少ないので、複数処理する必要があるなら、できればリクエストは分配してやりたいところ。RxならPublishで分配、ついでにRefCountでSubscriberが0になったら購読解除というのが自然に書けるので、その辺も入れてやるといいかなー、なんて思います。とはいえ、基本的にはデータ取ってOnNextで垂れ流すという、それだけに収まってはいます(ほんとだよ!)。

従来はこの手のコードはyield returnで処理するはずですが、それがOnNextに変わっているという事実が面白い!勿論、同期API + yield returnにすることも可能ですが、AWS SDKの同期APIは非同期のものを.Resultで取ってるだけで非同期のほうがネイティブになるので、同期API使うのはお薦めしません。非同期時代のLINQ、非同期時代のイテレータ。中々面白くありません?UniRx - Reactive Extensions for UnityのFromCoroutineでも、IObserverをyielderとして渡して、非同期のイテレータを作れる(コンバートできる)ようにしています。こういうのも一つのデザイン。

like CEP(with LINQPad)

CEP(Complex Event Processing)は最近良く聞くようになりましたねー、MicrosoftにもStreamInsightというかなり立派なプロダクトがあるのですが、あんまり話を聞かないし将来性もビミョーそうなので見なかったことにしましょう。ちなみにStreamInsightは2.1からRxと統合されたりして、この手のイベントストリームとRxとが相性良いこと自体は証明済みです。

そんなわけでMicrosoft周辺では全然聞きませんが、日本だとLINEでのEsper CEPの活用例とかNorikra:Schema-less Stream Processing with SQLで盛んに聞いて、まーたMicrosoft周辺によくある、一歩先を行ったと思ったら周回遅れ現象か!とか思ったり思わなかったり。

というわけで、Norikraの紹介スライドのクエリ5つをRxで書いてみましょう。また、動作確認はLINQPadのDumpでリアルタイムに表示が可能です(asynchronousにクエリが走ってる最中はResultsのところにリアルタイムにグリッドが追加されていく!)

// Queries:(1)
client.ObserveRecordDynamic()
    .Select(x => new{ x.Name, x.Age })
    .Dump();
 
// Queries:(2)
client.ObserveRecordDynamic()
    .Where(x => x.Current == "Shibuya")
    .Select(x => new{ x.Name, x.Age })
    .Dump();
 
// Queries:(3)
client.ObserveRecordDynamic()
    .Buffer(TimeSpan.FromMinutes(5))
    .Select(xs => xs.GroupBy(x => x.Age).Select(x => new { Age = x.Key, Count = x.Count() }))
    .Dump();
 
// Queries:(4)
client.ObserveRecordDynamic()
    .Buffer(TimeSpan.FromMinutes(5))
    .Select(xs => xs.Max(x => x.Age))
    .Dump();
 
// Queries:(5)
client.ObserveRecordDynamic()
    .Where(x => x.Current == "Kyoto" && x.Attend[0] && x.Attend[1])
    .Buffer(TimeSpan.FromMinutes(5))
    .Select(xs => xs.GroupBy(x => x.User.Age).Select(x => new { Age = x.Key, Count = x.Count() }))
    .Dump();

5分間だったらBufferもしくはWindowが使えます(量が少なそうならBufferのほうが、後続クエリにLINQ to Objectsが使えて分かりやすい、量が多いならWindowで、同様にRxで集計クエリが書ける)。他に何ができるかはRxJavaのWikiのOperator一覧でもどうぞ。めちゃくちゃ何でもできます。

SQL vs Rx

SQLである必要は、あるようで、ない。テキストベースのDSLを作るならSQLが共通知識として期待できるので、SQLに寄せる必要性はかなり高い。けれど、Rxならば、LINQとしての共通知識と、C#そのものであるというコンパイルセーフな点と何でもできること、メソッドチェーン(+IntelliSense)による書きやすさ。SQLライクなものを使いたい理由は全くない。

(とはいえ勿論いちだいのRxがぶんさんごりごりのに勝てるとは思ってないんで、そこはまぁかじゅあるなはなしです)

TODO

というわけで見てきたわけですが、まあ所詮まだ単純なコードによるコンセプトレベルの話ですね!本格的にこれからやるとしたら

  • ObservableKinesisClientをもっとしっかりしたものに
  • Kinesis ApplicationをホストするためのServiceとプラグイン機構
  • ログ転送側としてSLABのKinesis用Sink

ですかねえ。まぁ、これらはJavaですでに用意されているamazon-kinesis-clientamazon-kinesis-connectorsを.NET環境で代替するために必要だ、といったところですね。素直にJava書けば?っていうのは一理あるけれど、どーなんですかね、C#でやりたいんですよ(笑)

Semantic Logging Application Block(SLAB)というのは構造化ロガー(正確にはロガーは含まれないけれど)と収集サービスがセットになったライブラリです。面白いのはOut-Of-Processでの動作が選べて、その場合はWindowsネイティブのEvent Tracing for Windows (ETW)経由でログが運ばれるので、非常に高速に動作する、というところ。Sinkというのは出力用プラグインみたいなものです。なので、アプリケーション→EventSourceロガー→SLAB Service(+ KinesisSink)→Kinesis という構造を作ることで、データをリアルタイムに投下するところまでは行ける。あとはRedShiftに送って解析(amazon-kinesis-connectorsには既にありますね)するなり、他のKinesis Application作るなりよしなに出来るかなぁ、できればいいかなぁ、と。ラムダアーキテクチャ、というホドデハ・モチロンナイ。

AWS + Windows(C#)

先週の木・金に開催されたAWS Summit Tokyo 2014にて、AWS + Windows(C#)で構築する.NET最先端技術によるハイパフォーマンスウェブアプリケーション開発実践と題して、セッションを行いました。

AWS + Windows(C#)で構築する.NET最先端技術によるハイパフォーマンスウェブアプリケーション開発実践 from Yoshifumi Kawai

まとめで書きましたが、C#+AWSは現実解、だと思ってます。そしてAWSだからといって特別なこともなく、そしてC#だからといって特別なこともない。Kinesisもちゃんと使えるわけだし、結構面白いことがまだまだ出来るんじゃないかな、って思ってます。なんでAzure使わないんですか?というのには、よく聞かれるのでお茶を濁して答えないとして(!)、AzureにもKinesisのようなAzure Event Hubsというものが先週プレビューリリースされました。C#からの活用という点では、こちらにも注目していきたいところです。Event Hubs Developer Guideなんか見ると普通に色々参考になるし、機能的にはHTTP以外にAMQP使えたり、ちょっと強そうではある。

非同期時代のLINQ

この記事はC# Advent Calendar 2013の4日目となります。2012年はMemcachedTranscoder - C#のMemcached用シリアライザライブラリというクソニッチな記事で誰得でした(しかもその後、私自身もMemcached使ってないし)。その前、2011年はModern C# Programming Style Guide、うーん、もう2年前ですかぁ、Modernじゃないですねえ。2011年の時点ではC# 5.0はCTPでしたが、もう2013年、当然のようにC# 5.0 async/awaitを使いまくる時代です。変化は非常に大きくプログラミングスタイルも大きく変わりますが、特にコレクションの、LINQの取り扱いに癖があります。今回は、非同期時代においてLINQをどう使いこなしていくかを見ていきましょう。

Selectは非同期時代のForEach

これ超大事。これさえ掴んでもらえれば十二分です。さて、まず単純に、Selectで値を取り出す場合。

// こんな同期版と非同期版のメソッドがあるとする
static string GetName(int id)
{
    return "HogeHoge:" + id;
}
 
static async Task<string> GetNameAsync(int id)
{
    await Task.Delay(TimeSpan.FromMilliseconds(100)); // 適当に待機
    return "HogeHoge:" + id;
}
 
// 以後idsと出てきたらこれのこと指してるとします
var ids = Enumerable.Range(1, 10);
 
// 同期バージョン
var names1 = ids.Select(x => new { Id = x, Name = GetName(x) }).ToArray();
 
// 非同期バージョン
var names2 = await Task.WhenAll(ids.Select(async x => new { Id = x, Name = await GetNameAsync(x) }));

ラムダ内でasyncを書き、結果はIEnumerable<Task<T>>となるので、配列に戻してやるためにTask.WhenAllとセットで使っていくのが基本となります。Task.WhenAllで包むのはあまりにも頻出なので、以下の様な拡張メソッドを定義するといいでしょう。

// こういう拡張メソッドを定義しておけば
public static class TaskEnumerableExtensions
{
    public static Task WhenAll(this IEnumerable<Task> tasks)
    {
        return Task.WhenAll(tasks);
    }
 
    public static Task<T[]> WhenAll<T>(this IEnumerable<Task<T>> tasks)
    {
        return Task.WhenAll(tasks);
    }
}
 
// スッキリ書ける
var names2 = await ids.Select(async x => new { Id = x, Name = await GetNameAsync(x) }).WhenAll();

では、foreachは?

// 同期
foreach (var id in ids)
{
    Console.WriteLine(GetName(id));
}
 
// 非同期
foreach (var id in ids)
{
    Console.WriteLine(await GetNameAsync(id));
}

そりゃそーだ。……。おっと、しかしせっかく非同期なのに毎回待機してループしてたらMottaiなくない?GetNameAsyncは一回100ミリ秒かかっているから、100*10で1秒もかかってしまうんだ!ではどうするか、そこでSelectです。

// 同期(idsがList<int>だとする)
ids.ForEach(id =>
{
    Console.WriteLine(GetName(id));
});
 
// 非同期
await ids.Select(async id =>
{
    Console.WriteLine(await GetNameAsync(id));
})
.WhenAll();

ForEachの位置にSelect。ラムダ式中では戻り値を返していませんが、asyncなので、Taskを返していることになります(Task<T>ではなく)。同期ではvoidとなりLINQで扱えませんが、非同期におけるvoidのTaskは、Selectを通ります。あとはWhenAllで待機してやれば出来上がり。これは全て同時に走るので100msで完了します。10倍の高速化!

ただし、この場合処理順序は保証されません、同時に走っているので。例えばとある時はこうなりました。

HogeHoge:1
HogeHoge:10
HogeHoge:8
HogeHoge:7
HogeHoge:4
HogeHoge:2
HogeHoge:6
HogeHoge:3
HogeHoge:9
HogeHoge:5

処理順序を保証したいなら?WhenAll後に処理ループを回せばいいぢゃない。

// こうすれば全て並列でデータを取得したあと、取得順のままループを回せる
var data = await ids.Select(async id => new { Id = id, Name = await GetNameAsync(id) }).WhenAll();
foreach (var item in data)
{
    Console.WriteLine(item.Name);
}

一旦、一気に詰めた(100ms)後に、再度回す(0ms)。これはアリです。そんなわけで、非同期時代のデータの処理方法は三択です。逐次await, ForEach代わりのSelect, 一気に配列に詰める。どれがイイということはないです、場合によって選べばいいでしょう。

ただ言えるのは、超大事なのは、Selectがキーであるということ、ForEachのような役割を担うこと。しっかり覚えてください。

非同期とLINQ、そしてプリロードについて

さて、SelectだけではただのForEachでLINQじゃない。LINQといったらWhereしてGroupByして、ほげ、もげ……。そんなわけでWhereしてみましょう?

// 非同期の ラムダ式 をデリゲート型 'System.Func<int,int,bool>' に変換できません。
// 非同期の ラムダ式 は void、Task、または Task<T> を返しますが、
// いずれも 'System.Func<int,int,bool>' に変換することができません。
ids.Where(async x =>
{
    var name = await GetNameAsync(x);
    return name.StartsWith("Hoge");
});

おお、コンパイルエラー!無慈悲なんでなんで?というのも、asyncを使うと何をどうやってもTask<bool>しか返せなくて、つまりFunc<T,Task<bool>>となってしまい、Whereの求めるFunc<T,bool>に合致させることは、できま、せん。

Whereだけじゃありません。ラムダ式を求めるものは、みんな詰みます。また、Selectで一度Task<T>が流れると、以降のパイプラインは全てasyncが強いられ、結果として……

// asyncでSelect後はTask<T>になるので以降ラムダ式は全てasyncが強いられる
// これはコンパイル通ってしまいますがkeySelectorにTaskを渡していることになるので
// 実行時エラーで死にます
ids.Select(async id => new { Id = id, Name = await GetNameAsync(id) })
   .OrderBy(async x => (await x).Id)
   .ToArray();

Selectがパイプラインにならず、むしろ出口(ForEach)になっている。自由はない。

ではどうするか。ここは、一度、配列に詰めましょう。

// とある非同期メソッドのあるClassがあるとして
var models = Enumerable.Range(1, 10).Select(x => new ToaruClass());
 
// 以降の処理で使う非同期系のメソッドなり何かを、全てawaitで実体化して匿名型に詰める
var preload = await models
    .Select(async model => new
    {
        model,
        a = await model.GetAsyncA(),
        b = await model.GetAsyncB(),
        c = await model.GetAsyncC()
    })
    .WhenAll();
 
// そうして読み取ったもので処理して、(必要なら)最後に戻す
preload.Where(x => x.a == 100 && x.b == 20).Select(x => x.model);

概念的にはプリロード。というのが近いと思います。最初に非同期なデータを全て取得しまえば、扱えるし、ちゃんと並列でデータ取ってこれる。LINQの美徳である無限リストが取り扱えるような遅延実行の性質は消えてしまいますが、それはshoganai。それに、LINQにも完全な遅延実行と、非ストリーミングな遅延実行の二種類があります。非ストリーミングとは、例えばOrderBy。これは並び替えのために、実行された瞬間に全要素を一度蓄えます。例えばGroupBy。これもグルーピングのために、実行された瞬間に全要素を舐めます。非同期LINQもまた、それらと同種だと思えば、少しは納得いきませんか?現実的な妥協としては、このラインはアリだと私は思っています。分かりやすいしパフォーマンスもいい。

AsyncEnumerableの幻想、或いはRxとの邂逅

それでも妥協したくないならば、次へ行きましょう。まだ手はあります、良いかどうかは別としてね。注:ここから先は上級トピックなので適当に読み飛ばしていいです

そう、例えばWhereAsyncのようにして、Func<T,bool>じゃなくFunc<T,Task<bool>>を受け入れてくれるオーバーロードがあれば、いいんじゃない?って思ってみたり。こんな風な?

public static class AsyncEnumerable
{
    // エラー:asyncとyield returnは併用できないよ
    public static async IEnumerable<T> WhereAsync<T>(this IEnumerable<T> source, Func<T, Task<bool>> predicate)
    {
        using (var e = source.GetEnumerator())
        {
            while (e.MoveNext())
            {
                if (await predicate(e.Current))
                {
                    yield return e.Current;
                }
            }
        }
    }
}

ただ、問題の本質はそんなことじゃあない。別にyield returnが使えなければ手書きで作ればいいわけで。そして作ってみれば、本質的な問題がどこにあるのか気づくことができます。

class WhereAsyncEnumerable<T> : IEnumerable<T>, IEnumerator<T>
{
    IEnumerable<T> source;
    Func<T, Task<bool>> predicate;
    T current = default(T);
    IEnumerator<T> enumerator;
 
    public WhereAsyncEnumerable(IEnumerable<T> source, Func<T, Task<bool>> predicate)
    {
        this.source = source;
        this.predicate = predicate;
    }
 
    public IEnumerator<T> GetEnumerator()
    {
        return this;
    }
 
    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
 
    public T Current
    {
        get { return current; }
    }
 
    object System.Collections.IEnumerator.Current
    {
        get { return Current; }
    }
 
    public void Reset()
    {
        throw new NotSupportedException();
    }
 
    public void Dispose()
    {
 
    }
 
    // ↑まではdoudemoii
    // MoveNextが本題
 
    public bool MoveNext()
    {
        if (enumerator == null) enumerator = source.GetEnumerator();
 
        while (enumerator.MoveNext())
        {
            // MoveNextはasyncじゃないのでawaitできないからコンパイルエラー
            if (await predicate(enumerator.Current))
            {
                current = enumerator.Current;
                return true;
            }
        }
        return false;
    }
}

MoveNextだけ見てもらえればいいのですが、predicateを使うのはMoveNextなわけです。ここがasyncじゃないと、AsyncなLINQは成立しません。さて、もしMoveNextがasyncだと?

public async Task<bool> MoveNext()
{
    // ここで取得するenumeratorのMoveNextも
    // 全て同一のインターフェイスであることが前提条件なのでTask<bool>とする
    if (enumerator == null) enumerator = source.GetEnumerator();
 
    while (await enumerator.MoveNext())
    {
        if (await predicate(enumerator.Current))
        {
            current = enumerator.Current;
            return true;
        }
    }
    return false;
}

これは機能します。MoveNextをasyncにするということは連鎖的に全てのMoveNextがasync。それが上から下まで統一されれば、このLINQは機能します。ただ、それってつまり、IEnumerator<T>を捨てるということ。MoveNextがasyncなのは、似て非なるものにすぎない。当然LINQっぽい何かもまた、全て、このasyncなMoveNextを前提にしたものが別途用意されなければならない。そして、それが、Ix-Async

Ix-Asyncのインターフェイスは、上で出したasyncなMoveNextを持ちます。

public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetEnumerator();
}
 
public interface IAsyncEnumerator<out T> : IDisposable
{
    T Current { get; }
    Task<bool> MoveNext(CancellationToken cancellationToken);
}

そして当然、各演算子はIAsyncEnumerableを求めます。

public static IAsyncEnumerable<TSource> Where<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate);

これの何が便利?IEnumerable<T>からIAsyncEnumerable<T>へはToAsyncEnumerableで変換できはするけれど……、求めているのはIEnumerable<Task<T>>の取り扱いであったりpredicateにTaskを投げ込めたりすることであり、何だかどうにもなく、これじゃない感が否めない。

そもそも、LINQ to Objectsから完全に逸脱した新しいものなら、既にあるじゃない?非同期をLINQで扱うなら、Reactive Extensionsが。

Reactive Extensionsと非同期LINQ

ではRxで扱ってみましょう。の前に、まず、predicateにTaskは投げ込めません。なのでその前処理でロードするのは変わりません。ただ、そのまま続けてLINQ的に処理可能なのが違うところです。

await ids.ToObservable()
    .SelectMany(async x => new
    {
        Id = x,
        Name = await GetNameAsync(x)
    })
    .Where(x => x.Name.StartsWith("Hoge"))
    .ForEachAsync(x =>
    {
        Console.WriteLine(x);
    });

おお、LINQだ?勿論、Where以外にも何でもアリです。RxならLINQ to Objects以上の山のようなメソッドを繋げまわることが可能です。ところで、ここで出てきているのはSelectMany。LINQ to ObjectsでのSelectの役割を、Rxの場合はSelectManyが担っています。asyncにおいてForEachはSelectでRxでSelectはSelectMany……。混乱してきました?

なお、これの結果は順不同です。もしシーケンスの順序どおりにしたい場合はSelect + Concatを代わりに使います。

await ids.ToObservable()
    .Select(async x => new
    {
        Id = x,
        Name = await GetNameAsync(x)
    })
    .Concat()
    .Where(x => x.Name.StartsWith("Hoge"))
    .ForEachAsync(x =>
    {
        Console.WriteLine(x);
    });

ソーナンダー?ちなみにSelectManyはSelect + Mergeに等しい。

await ids.ToObservable()
    .Select(async x => new
    {
        Id = x,
        Name = await GetNameAsync(x)
    })
    .Merge()
    .Where(x => x.Name.StartsWith("Hoge"))
    .ForEachAsync(x =>
    {
        Console.WriteLine(x);
    });

この辺のことがしっくりくればRxマスター。つまり、やっぱRxムズカシイデスネ。とはいえ、見たとおり、Rx(2.0)からは、asyncとかなり統合されて、シームレスに取り扱うことが可能になっています。対立じゃなくて協調。自然に共存できます。ただし、単品でもわけわからないものが合わさって更なるカオス!強烈強力!

まとめ

後半のAsyncEnumerableだのIx-AsyncだのRxだのは、割とdoudemoii話です、覚えなくていいです。特にIx-Asyncはただの思考実験なだけで実用性ゼロなので本気でdoudemoiiです。Rxは便利なので覚えてくれてもいいのですが……。

大事なのは、async + Selectです。SelectはForEachなんだー、というのがティンとくれば、勝ったも同然。そして、プリロード的な使い方。そこさえ覚えれば非同期でシーケンス処理も大丈夫。

asyncって新しいので、今まで出来たことが意外と出来なくてはまったりします。でも、それも、どういう障壁があって、どう対処すればいいのか分かっていればなんてことはない話です。乗り越えた先には、間違いなく素晴らしい未来が待っているので、是非C# 5.0の非同期、使いこなしてください。

RxとRedisを用いたリモートPub/Sub通信

今日から始まったBuild Insiderで、RedisとBookSleeveの記事を書きました - C#のRedisライブラリ「BookSleeve」の利用法。Redis、面白いし、Windowsで試すのも想像以上に簡単なので、是非是非試してみて欲しいです。そして何よりもBookSleeve!使うと、強制的に全てがasyncになるので、C# 5.0でのコーディングの仕方のトレーニングになる(笑)。にちじょー的にasync/awaitを使い倒すと、そこから色々アイディアが沸き上がっきます。ただたんにsyncがasyncになった、などというだけじゃなく、アプリケーションの造りが変わります。そういう意味では、Taskは結構過小評価されてるのかもしれないな、なんて最近は思っています。

さて、RedisにはPub/Sub機能がついているわけですが、Pub/Sub→オブザーバーパターン→Rx!これはティンと来た!と、いうわけで、これ、Rxに乗せられました、とても自然に。というわけで、□□を○○と見做すシリーズ、なCloudStructuresにRx対応を載せました。

追加したクラスはRedisSubject<T>。これはSubjectやAsyncSubjectなどと同じく、ISubject<T>になっています。IObservableであり、IObserverでもある代物。とりあえず、コード例を見てください。

var settings = new RedisSettings("127.0.0.1");
 
var subject = new RedisSubject<string>(settings, "PubSubTest");
 
// SubscribeはIObservable<T>なのでRxなLINQで書ける
var a = subject
    .Select(x => DateTime.Now.Ticks + " " + x)
    .Subscribe(x => Console.WriteLine(x));
 
var b = subject
    .Where(x => !x.StartsWith("A"))
    .Subscribe(x => Console.WriteLine(x), () => Console.WriteLine("completed!"));
 
// IObserverなのでOnNext/OnError/OnCompletedでメッセージ配信
subject.OnNext("ABCDEFGHIJKLM");
subject.OnNext("あいうえお");
subject.OnNext("なにぬねの");
 
Thread.Sleep(200); // 結果表示を待つ...
 
a.Dispose(); // UnsubscribeはDisposeで
subject.OnCompleted(); // OnCompletedを受信したSubscriberもUnsubscribeされる

はい、別になんてこともない極々フツーのRxのSubjectです。が、しかし、これはネットワークを通って、Redisを通して、全てのSubscriberへとメッセージを配信しています。おお~。いやまあ、コードの見た目からじゃあそういうの分からないので何も感動するところもないのですが、とにかく、本当に極々自然に普通に、しかし、ネットワークを超えます。この、見た目何も変わらずに、というところがいいところなわけです。

Subscribeする側はIObservableなので別にフツーに合成していけますし、Publishする側もIObserverなので、Subscribeにしれっと潜り込ませたりしても構わない。もう、全然、普通にインメモリなRxでやる時と同じことが、できます。

オワリ。地味だ。

う、うーん。ほ、ほらほら!!

// ネットワーク経由
var settings = new RedisSettings("xx.xxx.xxx.xxx");
 
var subject = new RedisSubject<DateTime>(settings, "PubSubTest");
 
// publisherはこちらのコード
while (true)
{
    Console.ReadLine();
    var now = DateTime.Now;
    Console.WriteLine(now.Ticks);
    subject.OnNext(now);
}
 
// subscriberはこちらのコードを動かす
subject.Subscribe(x =>
{
    var now = DateTime.Now;
    Console.Write(x.Ticks + " => " + now.Ticks);
    Console.WriteLine(" | " + (now - x));
});

というわけで、実際にネットワーク経由(AWS上に立てたRedisサーバーを通してる)で動かしてみた結果がこんな感じです。ネットワークを超えたことで用法は幾らでもある!夢膨らみまくり!

で、↑のコードは遅延時間のチェックを兼ねてるのですが、概ね、0.03秒ぐらい。たまにひっかかって0.5秒超えてるのがあって、ぐぬぬですが。実際のとこRedis/Linuxの設定で結構変わってくるところがあるので、その辺は煮詰めてくださいといったところでしょうか。

ともあれチャットとかなら全然問題なし、ゲームでもアクションとかタイミングにシビアじゃないものなら余裕ですね、ボードゲームぐらいなら全く問題ない。ちょっとしたMMOぐらいならいけるかも。これからはネットワーク対戦はRedisで、Rxで!!!

余談

CloudStructuresですが、.configからの設定読み込み機能も地味につけました。

<configSections>
    <section name="cloudStructures" type="CloudStructures.Redis.CloudStructuresConfigurationSection, CloudStructures" />
</configSections>
 
<cloudStructures>
    <redis>
        <group name="cache">
            <add host="127.0.0.1" />
            <add host="127.0.0.2" port="1000" />
        </group>
        <group name="session">
            <add host="127.0.0.1" db="2" valueConverter="CloudStructures.Redis.ProtoBufRedisValueConverter, CloudStructures" />
        </group>
    </redis>
</cloudStructures>
// これで設定を読み込める
var groups = CloudStructuresConfigurationSection.GetSection().ToRedisGroups();

色々使いやすくなってきて良い感じじゃあないでしょーか。

コールバック撲滅

そうそう、最後に実装の話を。元々BookSleeveのPub/Sub購読はコールバック形式です。「public Task Subscribe(string key, Action<string, byte[]> handler)」handlerはstringがキー, byte[]が送られてくるオブジェクトを指します。コールバック is ダサい。コールバック is 扱いにくい。ので、コールバックを見かけたらObservableかTaskに変換することを考えましょう!それがC# 5.0世代の常識です!

というわけで、以下のようにして変換しました。

public IDisposable Subscribe(IObserver<T> observer)
{
    var channel = Connection.GetOpenSubscriberChannel();
 
    var disposable = System.Reactive.Disposables.Disposable.Create(() =>
    {
        channel.Unsubscribe(Key).Wait();
    });
 
    // ここが元からあるコールバック
    channel.Subscribe(Key, (_, xs) =>
    {
        using (var ms = new MemoryStream(xs))
        {
            var value = RemotableNotification<T>.ReadFrom(ms, valueConverter);
            value.Accept(observer); // この中でobserverのOnNext/OnError/OnCompletedが叩かれる
            if (value.Kind == NotificationKind.OnError || value.Kind == NotificationKind.OnCompleted)
            {
                disposable.Dispose(); // ErrorかCompletedでもUnsubscribeしますん
            }
        }
    }).Wait();
 
    return disposable; // もしDisposableが呼ばれたらUnsubscribeしますん
}

こんなふぅーにRxで包むことで、相当使いやすさがアップします。感動した。

複数の値とC# 5.0 Async再び、或いはAsyncEnumerableへの渇望とRx

以前にReactive Extensions + asyncによるC#5.0の非同期処理では、単体の値であったらasync、複数の値であったらIObservable<T>が使い分け、とかかんとかと言ってましたが、本当にそうなの?もしくは、そもそも複数の値のシチュエーションって分かるような分からないようななのだけど?などなどと思ったりする昨今を如何様にお過ごしでしょうか。というわけで、今回はグッとこの部分に深く迫ってみましょう。

同期的なシチュエーション

さて、例、なのですけれど、データベースで行きましょう。生DataReaderを転がしてます。

// 接続文字列に Asynchronous Processing=true は非同期でやるなら欠かさずに
const string ConnectionString = @"Data Source=.;Initial Catalog=AdventureWorks2012;Integrated Security=True;Asynchronous Processing=true;";
 
// こういうreader.Readが尽きるまで列挙するだけのヘルパーがあるだけで
static IEnumerable<IDataRecord> EachReader(IDbConnection connection, string query)
{
    using (var command = connection.CreateCommand())
    {
        command.CommandText = query;
        if (connection.State != ConnectionState.Open) connection.Open();
 
        using (var reader = command.ExecuteReader())
        {
            while (!reader.IsClosed && reader.Read()) yield return reader;
        }
    }
}
 
static void Main(string[] args)
{
    using (var conn = new SqlConnection(ConnectionString))
    {
        // LINQでSelectとかいろいろ出来る!便利!抱いて!
        var result = EachReader(conn, "select * from Sales.Customer")
            .Select(x => new
            {
                CustomerID = x.GetInt32(0),
                PersonID = !x.IsDBNull(1) ? (int?)x.GetValue(1) : null,
                StoreID = !x.IsDBNull(2) ? (int?)x.GetValue(2) : null,
                TerritoryID = !x.IsDBNull(3) ? (int?)x.GetValue(3) : null,
                AccountNumber = !x.IsDBNull(4) ? x.GetString(4) : null
            })
            .ToArray();
 
        // PKでとりたければFirstOrDefualtとかでいいわけです
        var customer = EachReader(conn, "select * from Sales.Customer where CustomerID = 100")
            .Select(x => new
            {
                CustomerID = x.GetInt32(0),
                PersonID = !x.IsDBNull(1) ? (int?)x.GetValue(1) : null,
                StoreID = !x.IsDBNull(2) ? (int?)x.GetValue(2) : null,
                TerritoryID = !x.IsDBNull(3) ? (int?)x.GetValue(3) : null,
                AccountNumber = !x.IsDBNull(4) ? x.GetString(4) : null
            })
            .FirstOrDefault();
    }
}

ExecuteReaderの結果のIDataRederは、yield returnで列挙してやると、LINQで加工できるようになるので、SelectしてToArrayとか、SelectしてFirstOrDefaultとか、非常にやりやすくて便利なわけです。

ここまでは、いいと思います。じゃあ、非同期でやると、どうするの、と。

内部イテレータ的に考える

.NET Framework 4.5からは主要な非同期メソッドに全てXxxAsyncという名前のものがつきました。ADO.NETにおいては、OpenAsyncやExecuteReaderAyncなどがあります。というわけで、試してみましょう。

// 非同期だったこうしたい、でもこれはコンパイル通らない!
static async Task<IEnumerable<DbDataReader>> EachReaderAsync(DbConnection connection, string query)
{
    using (var command = connection.CreateCommand())
    {
        command.CommandText = query;
        if (connection.State != ConnectionState.Open) await connection.OpenAsync();
 
        using (var reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess)) // 基本的にはSequentialAccessにしておきたい
        {
            while (!reader.IsClosed && reader.Read()) yield return reader;
        }
    }
}

残念ながら、asyncとyield returnを共存させることはできないので、どうにもなりません。……はい。しかし出来ない、では困る。さすがに、非同期実行するところに全部生のままで、こんなCreateCommand…, ExecuteReaderAsync, …. なんて書いてられないし。

じゃあどうするか、というと、内部イテレータ的にしましょう。つまりList<T>にあるようなForEachです。yield returnが外部イテレータ的であり、それが無理なら、内部イテレータ的にすればいいぢゃない。

// ループを回してる最中に実行するaction引数をつけた(FuncじゃなくてAction<DbDataReader>のオーバーロードも作るとベター)
static async Task ForEachAsync(DbConnection connection, string query, Func<DbDataReader, Task> action)
{
    using (var command = connection.CreateCommand())
    {
        command.CommandText = query;
        if (connection.State != ConnectionState.Open) await connection.OpenAsync();
 
        using (var reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess)) // 基本的にはSequentialAccessにしておきたい
        {
            while (!reader.IsClosed && reader.Read()) await action(reader);
        }
    }
}
 
static async Task<List<Customer>> GetCustomers()
{
    using (var conn = new SqlConnection(ConnectionString))
    {
        // これで、Selectっぽくできてないこともないと言えなくもない
        var list = new List<Customer>();
 
        await ForEachAsync(conn, "select * from Sales.Customer", async x =>
        {
            var customer = new Customer
            {
                CustomerID = await x.GetFieldValueAsync<int>(0),
                PersonID = !x.IsDBNull(1) ? await x.GetFieldValueAsync<int?>(1) : null,
                StoreID = !x.IsDBNull(2) ? await x.GetFieldValueAsync<int?>(2) : null,
                TerritoryID = !x.IsDBNull(3) ? await x.GetFieldValueAsync<int?>(3) : null,
                AccountNumber = !x.IsDBNull(4) ? await x.GetFieldValueAsync<string>(4) : null
            };
            list.Add(customer);
        });
 
        return list;
    }
}
 
// LINQじゃないので匿名型は使えないのね
public class Customer
{
    public int CustomerID { get; set; }
    public int? PersonID { get; set; }
    public int? StoreID { get; set; }
    public int? TerritoryID { get; set; }
    public string AccountNumber { get; set; }
}

ForEachAsync!というわけで、Actionを渡してやって、そこでグルグルッとすることにより制限を回避もどき。ToArrayしたい?古き良きListにAddすればいいぢゃない、といったものですよ、ははは。LINQじゃないので匿名型は使えないがね!

さて、匿名型が使えないのはいいとしても、問題は、LINQの特徴である合成可能性を欠いているところです。一番困るのは、これ、FirstOrDefaultできないね、って。全件取るんですか?まあ、PKだったら一件なのが保証されてるし、そうでないならtop 1とでも書いておけよ、と言えなくもないですが、しかしどうなのよこれ、と。

そんなわけでForEachAsyncと、もう一つ、ExecuteSingleAsyncという名前で、一件のみを列挙するようなものを別途作る必要があります。とはいえ、それでも対応できているのは一件と全件だけ。例えばTakeWhileみたいなのがやりたい、SkipWhileみたいなのがやりたいとなったらどうするの、と。答えは、どうにもなりません。諦めるしかない。

AsyncEnumerableで救われる

どうしても諦められないのならば、AsyncEnumerableを授けましょう。NuGetからIx_Experimental-Asyncを引っ張ってきます。Ix、そう、みんな大好きReactive Extensionsの兄弟なわけですが、しかし紹介しておいてアレですが、このIx_Experimental-Asyncはお薦めはしません!完全に実験的に、できるから、というだけで実装例を掲示してみせてくれたというだけなノリがぷんぷんしているからです。実際、最初のAsync CTPが出た時に公開されて、それから更新されてませんしね……。

ともあれ、どんなコンセプトの代物なのかは見ておきましょう。

// こういうDB列挙用のIAsyncEnumerable/Enumeratorを作る。
// yield returnのようなコンパイラサポートはないので手書きするんだよ!
public class AsyncDbEnumerable : IAsyncEnumerable<DbDataReader>, IAsyncEnumerator<DbDataReader>
{
    DbConnection connection;
    DbCommand command;
    DbDataReader reader;
    string query;
 
    public AsyncDbEnumerable(DbConnection connection, string query)
    {
        this.connection = connection;
        this.query = query;
    }
 
    public IAsyncEnumerator<DbDataReader> GetEnumerator()
    {
        return this;
    }
 
    public DbDataReader Current
    {
        get { return reader; }
    }
 
    public async Task<bool> MoveNext(System.Threading.CancellationToken cancellationToken)
    {
        if (command == null)
        {
            if (connection.State != ConnectionState.Open) await connection.OpenAsync();
 
            command = connection.CreateCommand();
            command.CommandText = query;
            reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess);
        }
 
        if (await reader.ReadAsync())
        {
            return true;
        }
        return false;
    }
 
    public void Dispose()
    {
        reader.Dispose();
        command.Dispose();
    }
}
 
static IAsyncEnumerable<DbDataReader> EachReaderAsync(SqlConnection connection, string query)
{
    return new AsyncDbEnumerable(connection, query);
}
 
static async Task Test()
{
    using (var conn = new SqlConnection(ConnectionString))
    {
        // 完全にLINQなのでSelectしてToArrayで匿名型もOK
        var result = await EachReaderAsync(new SqlConnection(ConnectionString), "select * from Sales.Customer")
            .Select(x => new
            {
                CustomerID = x.GetFieldValue<int>(0),
                PersonID = !x.IsDBNull(1) ? x.GetFieldValue<int?>(1) : null,
                StoreID = !x.IsDBNull(2) ? x.GetFieldValue<int?>(2) : null,
                TerritoryID = !x.IsDBNull(3) ? x.GetFieldValue<int?>(3) : null,
                AccountNumber = !x.IsDBNull(4) ? x.GetFieldValue<string>(4) : null
            })
            .ToArray();
 
        // 勿論FirstOrDefaultもできる
        var customer = await EachReaderAsync(new SqlConnection(ConnectionString), "select * from Sales.Customer where CustomerID = 100")
            .Select(x => new
            {
                CustomerID = x.GetFieldValue<int>(0),
                PersonID = !x.IsDBNull(1) ? x.GetFieldValue<int?>(1) : null,
                StoreID = !x.IsDBNull(2) ? x.GetFieldValue<int?>(2) : null,
                TerritoryID = !x.IsDBNull(3) ? x.GetFieldValue<int?>(3) : null,
                AccountNumber = !x.IsDBNull(4) ? x.GetFieldValue<string>(4) : null
            })
            .FirstOrDefault();
    }
}

IAsyncEnumerableの実装は完全に手作りです!真面目に使うなら、AnonymousAsyncEnumerableとかを作って、それを使って構築しますが、今回はてきとーな感じに実装しておきました。てきとーと言っても、ちゃんと動きますよ、はい。

さて、これによってIAsyncEnumerableに変換されたシロモノは、LINQのメソッドが全て使えます。おー、やったね、これなら完璧。

と、言いたいのですが、よーくSelectの中のラムダ式を見ると、ForEachAsyncの時のものと違うのが分かるでしょうか?ForEachAsyncの時はGetFieldValueAsyncといった、値取得まで非同期のものを使いました。でも、今回はそれは使ってない。何故かというと、そこでasyncにしてしまうと戻り値がIAsyncEnumerable<Task<T>>になってしまうから。

何がいけないのか、というと、例えばToArrayした結果で見るとresult[0]はTaskなんですよ。実態はresult[0].Resultとしなきゃあいけません。おまけに、全部Taskということは、実行中かもしれないわけです。じゃあ全部待てばいいのか、 await Task.WhenAll(result) とすればいいか、というと、そうなると、一つのコネクションで複数実行が走る、この場合Connectionは複数実行は許容されていないので、まあ例外が飛んできてしまうでしょう。

じゃあasyncは諦めるの?というと、幾つか手はある。ひとつはAwaitとかいうような拡張メソッドを作って、IAsyncEnumerable<Task<T>> から IAsyncEnumerable<T> に戻すようなものを作ればいい。作るのは、まあ、難しくはないのだけど結局yield returnがないので完全手書きしなきゃならないので面倒なので例は割愛。

もしくはSelectなどのselectorに、ラムダ式がasyncで書かれている場合(戻り値がTaskになっている場合)はawaitするようなオーバーロードがあっても良かったと思うのですがねえ。んー、まあ、それはさすがに大きなお世話っぽいからダメか……。

もしくは、ToArrayなどで外に出すのではなく、ForEachAsyncというメソッドが用意されているので、それを使ってawaitするか。こうなると内部イテレータとやってること同じになってきますが。

とはいえともかく、例に出したAwaitメソッドのような、そういうのがないと、実用性に欠けると言わざるを得ません。また、これはあくまでもAsyncEnumerableであって、IEnumerableへのLINQ to Objectsとは別物です。Select, Where, Firstなどは挙動が同じというだけで、中身は全然違います。で、当然ながらコンセプト実装なので、内部的にもあまりこなれてませんね。なので、Ix_Experimental-Asyncは所詮はコンセプト実装であり、面白いとは思いますし満喫はしましたが、実世的なものとは言いがたいと結論づけます。

IObservable<T>の中へ

と、ここまで見てきたので、最後はRxで〆ましょう。ちなみにRxはバージョンが幾つかありますが、私としてはRx 2.0-RCしか使う気はありません。正直、1.0系とは雲泥の差ですからねえ。

static IObservable<DbDataReader> EachReaderAsync(DbConnection connection, string query)
{
    // CreateAsyncで作る。OnErrorなどは手書きですが、十分に書きやすい
    return Observable.CreateAsync<DbDataReader>(async observer =>
    {
        try
        {
            using (var command = connection.CreateCommand())
            {
                command.CommandText = query;
                if (connection.State != ConnectionState.Open) await connection.OpenAsync();
 
                using (var reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
                {
                    while (!reader.IsClosed && reader.Read()) observer.OnNext(reader);
                }
            }
        }
        catch (Exception ex)
        {
            observer.OnError(ex);
            return;
        }
        observer.OnCompleted();
    });
}
 
static async Task Test()
{
    using (var conn = new SqlConnection(ConnectionString))
    {
        // Rxなので完全にLINQ、awaitableなのでToArrayをawaitしたりも出来るのでノリは完全に一緒
        var result = await EachReaderAsync(new SqlConnection(ConnectionString), "select * from Sales.Customer")
            .Select(x => new
            {
                CustomerID = x.GetFieldValue<int>(0),
                PersonID = !x.IsDBNull(1) ? x.GetFieldValue<int?>(1) : null,
                StoreID = !x.IsDBNull(2) ? x.GetFieldValue<int?>(2) : null,
                TerritoryID = !x.IsDBNull(3) ? x.GetFieldValue<int?>(3) : null,
                AccountNumber = !x.IsDBNull(4) ? x.GetFieldValue<string>(4) : null
            })
            .ToArray();
 
        // もちろん、そのままSubscribeしたり、FirstOrDefaultAsyncしたりしてもいい
    }
}

というわけでRxでも生成はしやすいし、出力結果も相当扱いやすい。素晴らしい!けれど、これもIAsyncEnumerableと同様にSelectの中でasyncしちゃうと厄介なことになるので、基本的にはできない。

Await拡張メソッドを作るのは簡単だけど、Rxの場合はスレッドセーフの問題が結構難しくて、うまく決めるのは難しい……。

それに加えて、ほとんど決まりきっている、ただたんに列挙したいだけ、のものにRxを使うというのはやり過ぎ感があり、性能面でちょっとね、と。うーん、言うほど悪くはないし、今まで散々持ち上げといてなんだよそれって感じですが、目の前にTaskが転がっていて、そこまで利点が大きくない中で選ぶか?と迫られたら、選びにくいなあ、って。思ってしまうのです。

おまけ:Entity Framework 6について

Entity Frameworkはロードマップによりバージョン6から非同期対応すると明言しています。また、デザインノートTask-based Asynchronous Pattern support in EF.も公開されていたり、それとEFはソースコードが公開されているのですが、CodePlexから最新版を落としてくれば、そこには既にTaskによる実装が存在しています。

軽く見たところ、EF内部で使う用のAsyncEnumerableを定義してありました。それを使って、非同期系を動かしてましたね。ただ、完全に内部用なので外からは使えないし、色々限定的ですけれど。また、最終的に出力する場合は、やはり内部イテレータ的に、await ForEachAsyncしてListに変換するなりしていました。ふんふん、なるほどねー、と、結構眺めてて面白いのでお薦めです。

まとめ

C# 5.0で非同期は簡単になった!そして、同様に非同期はやはり難しい!こうして案を見ていったわけですが、結局どれを選ぶの?というと、まあ、内部イテレータ案が一番無難で良いと思います。

それにしても、いやあもう頭ぱっつんぱっつんです。で、何でDBがネタになっているかというと、新Micro-ORMライブラリを作成中で(DbExecutor v.Next)、今から作るなら非同期対応しないとかありえないよねー、ということで色々考えてはいるんですが、これが中々にビシッ!としっくり決めるのが大変で。結構良い感じにはなってきてつもりではあるんですが、全然まだまだで。あんま人に見られたくない段階なんですが諸事情で見れる人には見れるようになってしまってて恥ずかちい。

そんなわけでVS2012登場まで、もうすぐです(MSDN会員は一週間切ってる、そうでない人も一ヶ月後)。C# 5.0への備え、できてますか?さあ、カオスな非同期時代に突入しましょう!

Reactive Extensions + asyncによるC#5.0の非同期処理

Reactive Extensions(Rx)の利点ってなんですかー、というと、合成可能なんです!ということです。合成可能って何?というと、LINQが使えるということなんです!です。じゃあ他には、ということで…… 詳しくはこの動画/スライド見るといいです。 Curing Your Event Processing Blues with Reactive Extensions (Rx) | TechEd Europe 2012 | Channel 9。最初のほうの例が非常に分かりやすいので、とりあえずその部分だけ引っ張ってきますと

// sender, argsの型がふわふわ
exchange.StockTick += (sender, args) => // senderの型が消えてる
{
    if (args.Quote.Symbol == "MSFT")
    {
        // 合成できないからイベントの中でベタ書きしかない
    }
};
 
exchange.StockTick -= /* ラムダ式でイベント登録すると解除不能 */

これが通常のイベントの弱点です。Rxにすると

// <Quote>という型が生きてる
IObservable<Quote> stockQuotes = ...; // 変数に渡せる
 
// LINQクエリ演算子が使える
var msft = stockQuotes
    .Where(quote => quote.Symbol == "MSFT");
 
var subscription = msft.Subscribe(quote => /* */);
 
// イベント解除が容易
subscription.Dispose();

といった感じで、実に素晴らしい!じゃあEventはもうObsoleteでいいぢゃん、というと、まあいいと思うのですがそれはそれとして、C#ネイティブだからこそデザイナがイベントが大前提で考慮されていたり、軽くて実行速度が良かったり、といったところは勿論あります。あとRxだとイベントのIObservable化が面倒だとかもね。この辺は最初から言語サポートの効いてるF#のほうが強いんですよねー。

非同期のリトライ

Visual Studio 2012も、もうRCということで間近感が相当にあります。一方でReactive Extensions v2.0 Release Candidate available now!ということで、こちらも間近感があります。一度2.0使うと1.0には戻れないよ!(NuGetではRx-Main -Preで入れられます)

じゃあRx 2.0の紹介でもしますかー、というと、しません!(ぉぃ)。その前に、asyncとRxの関係性にケリをつけておきましょう。

で、asyncの非同期とRxの非同期はやっぱり使い分けフワフワッという感じ。複数の値が来るときはRxでー、とか言われても、そもそも複数っていうのがそんなにー、とか。あと、それ以外にないの?というと、Rxの合成の強さが非同期にも発揮してRetry処理とか柔軟でー、とか。確かにそれっぽい。けれど、どうもフワッとしてピンと来ないかもしれない。

ので、例を出していきましょう。まず、リトライ処理。リトライ処理を素の非同期で書くと泣きたくなりますが、C# 5.0を使えばasync/awaitで何も悩むことなくスッキリと!

static async Task<string> DownloadStringAsyncWithRetry(string url, int retryCount)
{
    var count = 0;
RETRY:
    try
    {
        count++;
        var req = WebRequest.CreateHttp(url);
 
        using (var res = await req.GetResponseAsync())
        using (var stream = res.GetResponseStream())
        using (var sr = new StreamReader(stream))
        {
            return await sr.ReadToEndAsync();
        }
 
    }
    catch
    {
        if (count >= retryCount) throw;
    }
    goto RETRY;
}
 
static void Main(string[] args)
{
    var google = DownloadStringAsyncWithRetry("http://google.com/404", 3);
    Console.WriteLine(google.Result);
}

簡単です。さて、ではこれをRxで書くと……

static async Task<string> DownloadStringAsyncWithRetry(string url, int retryCount)
{
    var req = WebRequest.CreateHttp(url);
 
    // retry処理は.Retryで済む
    using (var res = await req.GetResponseAsync().ToObservable().Retry(retryCount))
    using (var stream = res.GetResponseStream())
    using (var sr = new StreamReader(stream))
    {
        return await sr.ReadToEndAsync();
    }
}

はい。別にRxとasyncは排他じゃありません。使って効果のあるところに差し込んで、Mixしてやれば、ただでさえ強力なasyncが更に強力になります。TaskとIObservableは変換可能なので、ToObservableして、あとはRetryメソッドを繋げるだけ。そしてIObservableはawait可能(LastAsyncと同じ効果で、最後の値を取る。非同期処理の場合は値が一つなので問題なし)なので、まんまawaitしてasyncと繋げてやればいい。素敵ですねー。

が、上のコードはちょっと間違ってます。どこが間違っているか分かりますか?

エラーの帰ってくるページ(google/404などは404エラーを返してくれるのでテストに楽←別に500にすれば500を返してくれるわけじゃなくて、ただたんに存在しないページだから404なだけで、別にどこでもいいです)を指定してFiddlerなどで観察すれば分かりますが、一回しかリクエスト飛ばしません。Retry(3)としても一回しか飛んでいません。ちゃんとRetryは3回しているのに。

どういうことかというと、GetResponseAsync()の時点でリクエストに失敗しているからです。失敗済みのリクエストに対しては、何回Retryしても失敗しか返しません。ここは本当にはまりやすくて注意所なので、よく気を付けてください!

解決策は、Retryで生成を毎回やり直すこと。Deferで包めばいいです。

static async Task<string> DownloadStringAsyncWithRetry(string url, int retryCount)
{
    // Retry時に毎回WebRequestを作り直す
    var asyncQuery = Observable.Defer(() => WebRequest.CreateHttp(url).GetResponseAsync().ToObservable())
        .Retry(retryCount);
 
    // retry処理は.Retryで済む
    using (var res = await asyncQuery)
    using (var stream = res.GetResponseStream())
    using (var sr = new StreamReader(stream))
    {
        return await sr.ReadToEndAsync();
    }
}

ちょっと罠があるしコードも増えてしまったけれど、それでも、まあ、まだ割といいかな、って感じでしょうか?

さて、リトライは即時じゃなくて一定間隔置いた後にリトライして欲しいってことが多いと思います。同期処理だとThread.Sleepで待っちゃうところですが、それはちょっとスレッド勿体ない。C# 5.0からはawait Task.Delayを使いましょう。

static async Task<string> DownloadStringAsyncWithRetry(string url, int retryCount, TimeSpan retryDelay)
{
    var count = 0;
RETRY:
    try
    {
        count++;
        var req = WebRequest.CreateHttp(url);
 
        using (var res = await req.GetResponseAsync())
        using (var stream = res.GetResponseStream())
        using (var sr = new StreamReader(stream))
        {
            return await sr.ReadToEndAsync();
        }
 
    }
    catch
    {
        if (count >= retryCount) throw;
    }
 
    if (retryDelay > TimeSpan.Zero)
    {
        await Task.Delay(retryDelay); // これで待つ
    }
 
    goto RETRY;
}

以前のものにTask.Delayを足しただけで簡単です。わーお、素晴らしい、なかなか強力強烈です。ではRxは、というと、同じように遅延する演算子を足すだけ。Delay、ではダメでDelaySubscription(Rx 2.0から追加)を使います。

static async Task<string> DownloadStringAsyncWithRetry(string url, int retryCount, TimeSpan retryDelay)
{
    // DelaySubscriptionで遅延させる
    var asyncQuery = Observable.Defer(() => WebRequest.CreateHttp(url).GetResponseAsync().ToObservable())
        .DelaySubscription(retryDelay)
        .Retry(retryCount);
 
    using (var res = await asyncQuery)
    using (var stream = res.GetResponseStream())
    using (var sr = new StreamReader(stream))
    {
        return await sr.ReadToEndAsync();
    }
}

できました!できました?いや、これだと初回リクエスト時にも遅延されちゃってて、ちょっとイケてない。修正しましょう。

// 外部変数用意するのがダサい
var count = 0;
var asyncQuery = Observable.Defer(() => WebRequest.CreateHttp(url).GetResponseAsync().ToObservable())
    .Let(xs => count++ == 0 ? xs : xs.DelaySubscription(retryDelay))
    .Retry(retryCount);

はい、ダサいし、なんだか何やってるのかさっぱりになってきました、サイテー。LetもRx 1.0にはなくて(それ以前にはあったのですが削られた)2.0から復活になります。Letは、一時変数を置かなくて済むというチェーン病にかかった人がお世話になる処方薬です。んなもん読みにくくさせるだけじゃねーか、という感じですが、もしLetがないと変数を置いて var xs = ToObservable(); xs = () ? xs : xs.Delay..; xs = xs.Retry(); としなければならなくて、非常に面倒くさいのです。だから、使いどころを守って乱用しなければ、割とイケてます。結構大事。

が、しかし、これも間違っています!(えー)。というかLetではなくて変数に展開してみるとオカシイとはっきり分かるのですが、Letの内部はRetryとか関係なく一回しか評価されないので、これだと必ずDelaySubscriptionなしのほうしか通りません。この路線で行くなら、更にやけくそでDeferを追加しましょうか。

// Deferだらけとかダサすぎるにも程がある
var count = 0;
var asyncQuery = Observable.Defer(() => WebRequest.CreateHttp(url).GetResponseAsync().ToObservable())
    .Let(xs => Observable.Defer(() => count++ == 0 ? xs : xs.DelaySubscription(retryDelay)))
    .Retry(retryCount);

ダサすぎて話にならない。Defer連打ダサい。Deferまみれになったら、ちょっと根本から方針を疑いましょうか。ついでに外部変数を使うというのがそもそもダサい。もう少し頑張りましょう!クエリ演算子をこねくり回して、と。

// DelayなしのDeferとDelayありのDeferを連結して、DelayありのみをRetryさせている
var asyncQuery = Observable.Defer(() => WebRequest.CreateHttp(url).GetResponseAsync().ToObservable())
    .Let(xs => xs.Catch(xs.DelaySubscription(retryDelay).Retry(retryCount - 1)));

どうでしょう。他にもやりようは色々とあるかもですが、正直ワケガワカラナイのでこの辺でよしておいたほうがマシです。実際のところ、以下のような拡張メソッドを作るのがベストだと思っています。

// 結局これが一番なのではかという結論に至る
public static async Task<string> DownloadStringAsyncWithRetry(this WebClient client, string url, int retryCount, TimeSpan retryDelay)
{
    var count = 0;
RETRY:
    try
    {
        count++;
        return await client.DownloadStringTaskAsync(url);
 
    }
    catch
    {
        if (count >= retryCount) throw;
    }
 
    if (retryDelay > TimeSpan.Zero)
    {
        await Task.Delay(retryDelay);
    }
 
    goto RETRY;
}

new WebClient().DownloadStringAsyncWithRetry(”hogehoge”); だけですからねー。拡張メソッド万歳。Rx最終形のような短さとか魔術っぽさはゼロで面白くも何ともない、というか正直クソつまらないコードなわけですが、そこがC#のC#たる所以ですな、ということで。私はRxのようなクールさも勿論大好きなのですが、こういうイモさもまた、C#らしさであって、現実をより良くするための、目的を忘れない素敵な側面だと思っています。

ちなみにWebRequestの場合はそれ自体の作り直しが必要なので(一度エラーを受けたら何度GetResponseを繰り返してもダメぽ)、拡張メソッドダメです。WebClientはイベントベースなのでTask系と相性がアレで今一つなわけですが、WebRequestはWebRequestで、これベースに拡張メソッドだけで整えるのは無理があるのですね……。

.NET Framework 4.5からはHttpClientというクラスが入るので、それを使うとちょっとだけモダンっぽい雰囲気。

// モダンなドトネト的にはHttpClientかしら
public static async Task<string> GetStringAsyncWithRetry(this HttpClient client, string url, int retryCount, TimeSpan retryDelay)
{
    var count = 0;
RETRY:
    try
    {
        count++;
        return await client.GetStringAsync(url);
    }
    catch
    {
        if (count >= retryCount) throw;
    }
 
    if (retryDelay > TimeSpan.Zero)
    {
        await Task.Delay(retryDelay);
    }
 
    goto RETRY;
}

別途System.Net.Httpの参照が必要なのが面倒ですが。

非同期のタイムアウト

Rxが色々できるのは分かったけれど、結局そういう部分って拡張メソッドとかに隔離してアプリケーションコードからは離れるので、やっぱそんなでもないんじゃないの!?というと、あー、まあそうかもねえ、とか思いつつ、複雑になればなるほど効果は加速しますが、そうなるとRxでも(見た目はスッキリしたとしても)やっぱ複雑ですからね。さておき、このままだとアレなのでもう少しまともな例を、一番最初に挙げたCuring Your Event Processing Blues with Reactive Extensions (Rx) | TechEd Europe 2012から引っ張って来ましょうか。

タイムアウトを追加する例です。WebRequestだとTimeout設定すればいいぢゃーん、ではあるものの、そうではないシチュエーションも沢山ありますから、対策を知っていて損はないです。まず、asyncの例を。

static async Task<string> GetHtmlAsync(Uri url)
{
    var client = new WebClient();
 
    var download = client.DownloadStringTaskAsync(url);
    var timeout = Task.Delay(TimeSpan.FromSeconds(30));
 
    // これ結構トリッキーですよね
    if (await Task.WhenAny(download, timeout) == timeout)
    {
        throw new TimeoutException();
    }
 
    var html = await download;
    return html;
}

WhenAnyが中々トリッキーですね。慣用句として覚えてしまえばどうってことないのですが……。asyncもただawaitするだけじゃなくて、ちょっと慣れてきたらTask.WaitAll/Any, Task.WhenAll/Anyを使いこなすと、性能的な意味でも表現力的な意味でもグッと広がりますので、探究するのがお薦め。

さて、それをRxでやると……

static async Task<string> GetHtmlAsync(Uri url)
{
    var client = new WebClient();
 
    var download = client.DownloadStringTaskAsync(url)
        .ToObservable()
        .Timeout(TimeSpan.FromSeconds(30));
 
    var html = await download;
    return html;
}

ToObservableして、Retryの時のようにTimeoutを足すだけ。非常に直観的で、楽ちん、分かりやすい。演算子が豊富なのはRxの強みです。だからRetryにTimeoutがつけられるオーバーロードが最初から用意されていれば、Letとかで複雑になってしまった例もスッキリ仕上がって、ドヤァと言えたんですけどね(笑)

// こういうのを作っておけば!
static IObservable<T> Retry<T>(this IObservable<T> source, int retryCount, TimeSpan retryDelay)
{
    return source.Catch(source.DelaySubscription(retryDelay).Retry(retryCount - 1));
}
 
// 神がかってシンプルに!Rx最強!
public static async Task<string> GetStringAsyncWithRetry(this HttpClient client, string url, int retryCount, TimeSpan retryDelay)
{
    return await Observable.Defer(() => client.GetStringAsync(url).ToObservable()).Retry(retryCount, retryDelay);
}

標準で足りない演算子は自分で作ればいいので、また、asyncが出来たことで、今まで自作が大変だった演算子も作るのが大分容易になりました!ので、ガンガン作ってしまうといいです。汎用的に使える演算子が集まれば集まるほど、Rxの合成可能という性質が価値を発揮します。

リトライやタイムアウトをC# 4.0でRxなしで書くと

死ぬほど面倒なので書きません。いや無理でしょ常識的に考えて。

まとめ

というわけで、Rxとasyncは手を取り合って仲良く高みを目指せばいいわけです。使いこなしが必要なのはどっちも変わらない!

さて、@ITの連載、Reactive Extensions(Rx)入門 - @IT の次回は非同期のはずですが(聞こえなーい)、ええと、はい、すみません……。ええと、あと次はRx 2.0の強化事項を、ええと、まあそのうちいつか……。はい、すみません。

諸事情あって今色々詰まってて本気でヤバいんですが、それはそれとして、現在全力で一年以上やるやる詐欺だったlinq.jsの改修を進めていまして、これは本当に本当に絶対近日中にベータを出すのでお楽しみに。相当にイイ出来で、割と革命的に凄い内容になってます。いやほんと。かなり自信ありますよ。

他の積みタスクは、ReactiveOAuth(バグ修正のPull Requestを放置中というサイテーな有様、OAuth 2.0対応しないの?とか)、ReactiveProperty(WinRT対応まだー?)、Utakotoha(現在動いてない模様なので要改修)、DbExecutor(全面再構築まだー?DataSet殺すんでしょー?)とかでしょうか、って結構ありますね、うわぉぅ。というかReactive系は2.0対応とWinRT対応をやらなきゃならないので作業量的に面倒くさくて、ついつい手が遠ざかってしまいですね。はい、でも、やります。

にゃー、という感じでブログも結構アレなWebFormsとDataSetディスもそろそろさようならして、通常営業に戻ってきませう。

RxとパフォーマンスとユニットテストとMoles再び

C# Advent Calendar 2011、順調に進んでいますね。どのエントリも力作で大変素晴らしいです。私はこないだModern C# Programming Style Guideというものを書きました。はてブ数は現段階で45、うーん、あまり振るわない……。私の力不足はともかくとしても、他の言語だったらもっと伸びてるだろうに、と思うと、日本のC#の現状はそんなものかなあ、はぁ、という感じではあります。はてブが全てではない(むしろ斜陽?)とはいえ、Twitterでの言及数などを見ても、やっぱまだまだまだまだまだまだ厳しいかなあ、といったところ。Unityなどもあって、見ている限りだと人口自体は着実に増えている感じではありますけれど、もっともっと、関心持ってくれる人が増えるといいな。私も微力ながら尽力したいところです。

ところで、id:ZOETROPEさんのAdvent Calendarの記事、Reactive Extensionsでセンサプログラミングが大変素晴らしい!センサー、というと私だとWindows Phone 7から引っ張ってくるぐらいしか浮かばないのですが(最近だとKinectもHotですか、私は全然触れてませんが……)おお、USB接続のレンジセンサ!完全に門外漢な私としては、そういうのもあるのか!といったぐらいなわけですが、こうしてコード見させていただくと、実践的に使うRxといった感じでとてもいいです。

記事中で扱われているトピックも幅広いわけですが、まず、パフォーマンスに関しては少し補足を。@okazukiさんの見せてもらおうじゃないかReactive Extensionsの性能とやらを! その2のコメント欄でもちょっと言及したのですが、この測り方の場合、Observable.Rangeに引っ張られているので、ベンチマークの値はちょっと不正確かな、と思います。

// 1000回イベントが発火(発火の度に長さ3000のbyte配列が得られる)を模写
static IObservable<byte[]> DummyEventsRaised()
{
    return Observable.Repeat(new byte[3000], 1000, Scheduler.Immediate);
}
 
// 配列をバラす処理にObservable.Rangeを用いた場合
static IObservable<byte> TestObservableRange()
{
    return Observable.Create<byte>(observer =>
    {
        return DummyEventsRaised()
            .Subscribe(xs =>
            {
                Observable.Range(0, xs.Length, Scheduler.Immediate).ForEach(x => observer.OnNext(xs[x]));
            });
    });
}
 
// 配列をバラす処理にEnumerable.Rangeを用いた場合(ForEachはIxのもの)
static IObservable<byte> TestEnumerableRange()
{
    return Observable.Create<byte>(observer =>
    {
        return DummyEventsRaised()
            .Subscribe(xs =>
            {
                Enumerable.Range(0, xs.Length).ForEach(x => observer.OnNext(xs[x]));
            });
    });
}
 
// SelectManyでバラす場合
static IObservable<byte> TestSelectMany()
{
    return DummyEventsRaised().SelectMany(xs => xs);
}
 
static void Main(string[] args)
{
    // ベンチマーク補助関数
    Action<Action, string> bench = (action, label) =>
    {
        var sw = Stopwatch.StartNew();
        action();
        Console.WriteLine("{0,-12}{1}", label, sw.Elapsed);
    };
 
    // 配列をばらすケースは再度連結する(ToList)
    bench(() => TestObservableRange().ToList().Subscribe(), "Ob.Range");
    bench(() => TestEnumerableRange().ToList().Subscribe(), "En.Range");
    bench(() => TestSelectMany().ToList().Subscribe(), "SelectMany");
    // 配列をばらして連結せず直接処理する場合
    bench(() => TestSelectMany().Subscribe(), "DirectRx");
    // byte[]をばらさず直接処理する場合
    bench(() => DummyEventsRaised().Subscribe(xs => { foreach (var x in xs);}), "DirectLoop");
 
    // 実行結果
    // Ob.Range    00:00:02.2619670
    // En.Range    00:00:00.2600460
    // SelectMany  00:00:00.2701137
    // DirectRx    00:00:00.0852836
    // DirectLoop  00:00:00.0152816
}

得られる配列をダイレクトに処理するとして、Observable.Rangeで配列のループを回すと論外なほど遅い。のですが、しかし、この場合ですとEnumerable.Rangeで十分なわけで、そうすれば速度は全然変わってきます(もっと言えば、ここではEnumerable.Rangeではなくforeachを使えば更に若干速くなります)。更に、これは配列を平坦化している処理とみなすことができるので、observerを直に触らず、SelectManyを使うこともできますね。そうすれば速度はほとんど変わらず、コードはよりすっきり仕上がります。

と、いうわけで、遅さの原因はObservable.Rangeです。Rangeが遅いということはRepeatやGenerateなども同様に遅いです。遅い理由は、値の一つ一つをISchedulerを通して流しているから。スケジューラ経由であることは大きな柔軟性をもたらしていますが、直にforeachするよりもずっとずっと遅くなる。なので、Enumerableで処理出来る局面ならば、Enumerableを使わなければなりません。これは、使うほうがいい、とかではなくて、圧倒的な速度差となるので、絶対に、Enumerableのほうを使いましょう。

また、一旦配列をバラして、再度連結というのは、無駄極まりなく、大きな速度差にも現れてきます。もし再度連結しないでそのまま利用(ベンチ結果:DirectRx)すれば直接ループを回す(ベンチ結果:DirectLoop)よりも5倍程度の遅さで済んでいます。このぐらいなら許容範囲と言えないでしょうか?とはいえ、それでも、遅さには違いないわけで、避けれるのならば避けたほうがよいでしょう。

ZOETROPEさんの記事にあるように、ここはばらさないほうが良い、というのが結論かなあ、と思います。正しくは上流ではばらさない。一旦バラしたものは復元不可能です。LINQで、パイプラインで処理を接続することが可能という性質を活かすのならば、なるべく後続で自由の効く形で流してあげたほうがいい。アプリケーション側でバラす必要があるなら、それこそSelectMany一発でばらせるのだから。

例えばWebRequestで配列状態のXMLを取ってくるとします。要素は20個あるとしましょう。最初の文字列状態だけを送られてもあまり意味はないので、XElement.Parseして、実際のクラスへのマッピングまではやります。例えばここではPersonにマッピングするとして、長さ1のIObservable<Person[]>です。しかし、それをSelectManyして長さ20のIObservable<Person>にはしないほうがいい。ここでバラしてしまうと長さという情報は消滅してしまうし、一回のリクエスト単位ではなくなるのも不都合が生じやすい。もしアプリケーション的にフラットになっていたほうが都合が良いのなら、それはまたそれで別のメソッドとして切り分けましょう。

成功と失敗の一本化

ZOETROPEさんの記事の素晴らしいのは、通常のルート(DataReceived)と失敗のルート(ErrorReceived)を混ぜあわせているところ!これもまたイベントの合成の一つの形なわけなんですねー。こういう事例はWebClientのDownloadStringAsyncのような、EAP(Eventbased Asynchronous Programming)をTaskCompletionSourceでラップしてTaskに変換する 方法: タスクに EAP パターンをラップする←なんかゴチャゴチャしていますが、TrySetCanceled, TrySetException, TrySetResultで結果を包んでいます、というのと似た話だと見なせます。

WebClientではEventArgsがCancelledやErrorといったステータスを持っているのでずっと単純ですが、SerialPortではエラーは別のイベントでやってくるのですね。というわけで、私もラップしてみました。

public static class SerialPortExtensions
{
    // 面倒くさいけれど単純なFromEventでのイベントのRx化
    public static IObservable<SerialDataReceivedEventArgs> DataReceivedAsObservable(this SerialPort serialPort)
    {
        return Observable.FromEvent<SerialDataReceivedEventHandler, SerialDataReceivedEventArgs>(
            h => (sender, e) => h(e), h => serialPort.DataReceived += h, h => serialPort.DataReceived -= h);
    }
 
    public static IObservable<SerialErrorReceivedEventArgs> ErrorReceivedAsObservable(this SerialPort serialPort)
    {
        return Observable.FromEvent<SerialErrorReceivedEventHandler, SerialErrorReceivedEventArgs>(
            h => (sender, e) => h(e), h => serialPort.ErrorReceived += h, h => serialPort.ErrorReceived -= h);
    }
 
    // DataReceived(プラスbyte[]化)とErrorReceivedを合成する
    public static IObservable<byte[]> ObserveReceiveBytes(this SerialPort serialPort)
    {
        var received = serialPort.DataReceivedAsObservable()
            .TakeWhile(e => e.EventType != SerialData.Eof) // これでOnCompletedを出す
            .Select(e =>
            {
                var buf = new byte[serialPort.BytesToRead];
                serialPort.Read(buf, 0, buf.Length);
                return buf;
            });
 
        var error = serialPort.ErrorReceivedAsObservable()
            .Take(1) // 届いたらすぐに例外だすので長さ1として扱う(どうせthrowするなら関係ないけど一応)
            .Do(x => { throw new Exception(x.EventType.ToString()); });
 
        return received.TakeUntil(error); // receivedが完了した時に同時にerrorをデタッチする必要があるのでMergeではダメ
    }
}

成功例と失敗例を合成して一本のストリーム化。また、DataReceivedはそのままじゃデータすっからかんなので、Selectでbyte[]に変換してあげています。これで、ObserveReceiveBytes拡張メソッドを呼び出すだけで、かなり扱いやすい形になっている、と言えるでしょう。パフォーマンスも、これなら全く問題ありません。

MolesとRx

と、ドヤ顔しながら書いていたのですが、とーぜんセンサーの実物なんて持ってませんので動作確認しようにもできないし。ま、まあ、そういう時はモックとか用意して、ってSerialDataReceivedEventArgsはパブリックなコンストラクタないし、ああもうどうすればー。と、そこで出てくるのがMoles - Isolation framework。以前にRx + MolesによるC#での次世代非同期モックテスト考察という記事で紹介したのですが、めちゃくちゃ強力なモックライブラリです。パブリックなコンストラクタがないとか関係なくダミーのインスタンスを生成可能だし、センサーのイベントだから作り出せないし、なんてこともなく自由にダミーのイベントを発行しまくれます。

[TestClass]
public class SerialPortExtensionsTest : ReactiveTest
{
    [TestMethod, HostType("Moles")]
    public void ObserveReceiveBytesOnCompleted()
    {
        // EventArgsを捏造!
        var chars = new MSerialDataReceivedEventArgs() { EventTypeGet = () => SerialData.Chars };
        var eof = new MSerialDataReceivedEventArgs() { EventTypeGet = () => SerialData.Eof };
        // SerialPort::BytesToRead/SerialPort::Readで何もしない
        MSerialPort.AllInstances.BytesToReadGet = (self) => 0;
        MSerialPort.AllInstances.ReadByteArrayInt32Int32 = (self, buffer, offset, count) => 0;
 
        var scheduler = new TestScheduler();
 
        // 時間10, 20, 30, 40でSerialData.Charsのイベントを、時間50でEofのイベントを発行
        MSerialPortExtensions.DataReceivedAsObservableSerialPort = _ => scheduler.CreateHotObservable(
                OnNext(10, chars),
                OnNext(20, chars),
                OnNext(30, chars),
                OnNext(40, chars),
                OnNext(50, eof))
            .Select(x => (SerialDataReceivedEventArgs)x);
 
        // 走らせる(戻り値のbyte[]はどうでもいいので無視するためUnitに変換)
        var result = scheduler.Start(() => new SerialPort().ObserveReceiveBytes().Select(_ => Unit.Default), 0, 0, 100);
 
        result.Messages.Is(
            OnNext(10, Unit.Default),
            OnNext(20, Unit.Default),
            OnNext(30, Unit.Default),
            OnNext(40, Unit.Default),
            OnCompleted<Unit>(50));
    }
 
    [TestMethod, HostType("Moles")]
    public void ObserveReceiveBytesOnError()
    {
        // EventArgsを捏造!
        var chars = new MSerialDataReceivedEventArgs() { EventTypeGet = () => SerialData.Chars };
        var eof = new MSerialDataReceivedEventArgs() { EventTypeGet = () => SerialData.Eof };
        // SerialPort::BytesToRead/SerialPort::Readで何もしない
        MSerialPort.AllInstances.BytesToReadGet = (self) => 0;
        MSerialPort.AllInstances.ReadByteArrayInt32Int32 = (self, buffer, offset, count) => 0;
 
        var scheduler = new TestScheduler();
 
        // 時間10, 20, 30, 40でSerialData.Charsのイベントを、時間50でEofのイベントを発行
        MSerialPortExtensions.DataReceivedAsObservableSerialPort = _ => scheduler.CreateHotObservable(
                OnNext(10, chars),
                OnNext(20, chars),
                OnNext(30, chars),
                OnNext(40, chars),
                OnNext(50, eof))
            .Select(x => (SerialDataReceivedEventArgs)x);
 
        /* ↑までOnCompletedのものと共通 */
 
        // 時間35でErrorのイベントを発行
        MSerialPortExtensions.ErrorReceivedAsObservableSerialPort = _ => scheduler.CreateHotObservable(
            OnNext<SerialErrorReceivedEventArgs>(35, new MSerialErrorReceivedEventArgs()));
 
        // 走らせる(戻り値のbyte[]はどうでもいいので無視するためUnitに変換)
        var result = scheduler.Start(() => new SerialPort().ObserveReceiveBytes().Select(_ => Unit.Default), 0, 0, 100);
 
        // Exceptionの等値比較ができないので、バラしてAssertする
        result.Messages.Count.Is(4);
 
        result.Messages[0].Is(OnNext(10, Unit.Default));
        result.Messages[1].Is(OnNext(20, Unit.Default));
        result.Messages[2].Is(OnNext(30, Unit.Default));
 
        result.Messages[3].Value.Kind.Is(NotificationKind.OnError);
        result.Messages[3].Time.Is(35);
    }
}

アサーションに使っているIsメソッドは、いつも通りChaining Assertionです。

Molesがいくら強力だとは言っても、イベントをそのまま乗っ取るのはデリゲートの差し替えなどで、割と面倒だったりします。しかし、FromEventでラップしただけのIObservable<T>を用意しておくと…… それを差し替えるだけで済むので超簡単になります。イベント発行については、TestScheduler(Rx-Testingを参照しておく)で、仮想時間で発行する値を作ってしまうと楽です。こういう、任意の時間で任意の値、というダミーの用意もFromEventでラップしただけのIObservable<T>があると、非常に簡単になります。

あとは、scheduler.Startで走らせると(3つの引数はそれぞれcreated, subscribed, disposedの仮想時間、何も指定しないと…… 実は0始まり「ではない」ことに注意。100,200,1000がデフォなので、0はすっ飛ばされています)、その戻り値で結果を受け取って、Messagesに記録されているので、それにたいしてアサートメソッドをしかける。

実に簡単ですね!Molesの力とRxの力が組み合わさると、イベントのテストが恐ろしく簡単になります。素敵じゃないでしょうか?

まとめ

テストなしで書いてたコードは、Molesでテスト走らせたら間違ってました。TakeWhileの条件が==だったのと、Mergeで結合していたり……。はっはっは、ちゃんとユニットテストは書かないとダメですね!そして、Molesのお陰でちゃんと動作するコードが書けたので恥を欠かなくてすみました、やったね。

Reactive Extensionsとスレッドのlock

ぱられるぱられる。もしパラレルにイベントが飛んできたら、どうする?

public class TestParallel
{
    public event Action<int> Log = _ => { }; // nullチェック面倒ぃので
 
    public void Raise()
    {
        // デュアルコア以上のマシンで試してね!
        Parallel.For(0, 10000000, x =>
        {
            Log(x);
        });
    }
}
 
class Program
{
    static void Main(string[] args)
    {
        var list = new List<int>();
        var tes = new TestParallel();
 
        // イベント登録して
        tes.Log += x => list.Add(x);
 
        // 実行
        tes.Raise();
    }
}

これは、十中八九、例外が出ます。list.Addはスレッドセーフじゃないので、まあそうだよね、と。では、Rxを使ってみるとどうなるでしょうか。

var list = new List<int>();
var tes = new TestParallel();
 
// イベント登録して
Observable.FromEvent<int>(h => tes.Log += h, h => tes.Log -= h)
    .Subscribe(list.Add);
 
// 実行
tes.Raise();

やはり変わりません。例外出ます。FromEventを中継しているだけですから……。さて、しかし一々Addの手前でlockするのは面倒だ、と、そこでSynchronizeメソッドが使えます。

Observable.FromEvent<int>(h => tes.Log += h, h => tes.Log -= h)
    .Synchronize()
    .Subscribe(list.Add);
 
// ようするにこんな感じになってる
 
var gate = new Object();
//....
lock(gate)
{
    OnNext();
}

これで、list.Addを問題なく動作させられます。Listとか適度にデリケートなので適当に注意してあげましょう。

Subjectの場合

さて、上のはイベントでしたが、ではSubjectの場合はどうなるでしょう。

public class TestParallel
{
    Subject<int> logMessenger = new Subject<int>();
    public IObservable<int> Log { get { return logMessenger.AsObservable(); } }
 
    public void Raise()
    {
        // デュアルコア以上のマシンで試してね!
        Parallel.For(0, 10000000, x =>
        {
            logMessenger.OnNext(x);
        });
    }
}
 
class Program
{
    static void Main(string[] args)
    {
        var list = new List<int>();
        var tes = new TestParallel();
 
        // イベント登録して
        tes.Log.Subscribe(list.Add);
 
        // 実行
        tes.Raise();
    }
}

たまーに例外起こらず処理できることもあるんですが、まあ大体は例外起こるんじゃないかと思います。初期のRxのSubjectは割とガチガチにlockされてたのですが、現在はパフォーマンスが優先されているため挙動が変更され、ゆるゆるです。回避策は同様にSynchronizeを足すことです。

tes.Log.Synchronize().Subscribe(list.Add);

これで問題なし。

余談

手元に残っていた大昔のRxを使って実行してみたら、死ぬほど遅かったり。確実に現在のものはパフォーマンス上がっていますねえ。あと、なんかもう最近面倒でeventだからってEventArgs使わなきゃならないなんて誰が言ったー、とActionばかり使うという手抜きをしてます。だってsenderいらないもん、大抵のばやい。

ReactiveProperty ver 0.3.0.0 - MとVMのバインディングという捉え方

今回の更新よりアイコンが付きました。専用のアイコンがあると、とっても本格的な感じがしますねー。色はRxにあわせて紫-赤紫。デザインは私の好みな幾何学的な感じです。@ocazucoさんに作って頂きました、ありがとうございます!色々ワガママ言ってお手数かけました。

ReactiveProperty - MVVM Extensions for Rx - ver 0.3.0.0

Rxとは何か、というとIObservable<T>と「見なせる」ものを合成するためのライブラリです。だから、見なせるものさえ見つかれば、活躍の幅は広がっていく。ReactivePropertyは色々なものを、そのように「見なして」いくことで、RxでOrchestrateできる幅をドラスティックに広げます。土台にさえ乗せてしまえば、あとはRxにお任せ。その場合に大切なのは、土台に乗せられるよう、閉じないことです。しかし、もし閉じているのなら、開くための鍵を提供します。

デフォルトモード変更

ReactivePropertyのデフォルトモードが DistinctUntilChanged|RaiseLatestValueOnSubscribe になりました。今まではRaise…が入ってなかったのですが、思うところあって変わりました。例えばCombineLatestは、全てが一度は発火していないと動き出しません。ReactiveCommandの条件に使うなどの場合にRaiseしてくれないと不都合極まりなく、かつ、Subscribeと同時にRaiseすることによる不都合なシーンは逆に少ない。ことを考えると、必然的にデフォルトをどちらに振るべきかは、分かりきった話でした。

そのことは0.1の時、サンプル作りながら思ってたんですが悩んだ末に、省いちゃったんですねえ。RaiseLatestValueOnSubscribeが入ると不便なシーンもある(initialValueを設定しないとまず最初にnullが飛んでいくとか)ので、どちらを取るかは悩ましいところではあるんですが、シチュエーションに応じて最適なほうを選んでください、としか言いようがないところです。

ToReactivePropertyAsSynchronized

長い。メソッド名が。

これは何かというとINotifyPropertyChanged->ReactiveProperty変換です。今までもObservePropertyメソッド経由で変換できましたが、それは一度IObservable<T>に変換するため、Model→ReactivePropertyという一方向のPushでしかありませんでした。Two-wayでのバインドで値の同期を取りたい場合は、今回から搭載されたToReactivePropertyAsSynchronizedを使ってください。

// こんな通知付きモデルがあるとして
public class ObservableObject : INotifyPropertyChanged
{
    private string name;
    public string Name
    {
        get { return name; }
        set
        {
            name = value;
            PropertyChanged(this, new PropertyChangedEventArgs("Name"));
        }
    }
 
    public event PropertyChangedEventHandler PropertyChanged = (_, __) => { };
}
 
// それを使ったViewModelを作るなら
public class TwoWayViewModel
{
    public ReactiveProperty<string> OneWay { get; private set; }
    public ReactiveProperty<string> TwoWay { get; private set; }
 
    public TwoWayViewModel()
    {
        var inpc = new ObservableObject { Name = "ヤマダ" };
 
        // ObservePropertyを使うとIObservable<T>に変換できます
        // ラムダ式でプロパティを指定するので、完全にタイプセーフです
        // それをToReactivePropertyすればOneWayで同期したReactivePropertyになります
        OneWay = inpc.ObserveProperty(x => x.Name).ToReactiveProperty();
 
        // ToReactivePropertyAsSynchronizedで双方向に同期することができます
        TwoWay = inpc.ToReactivePropertyAsSynchronized(x => x.Name);
    }
}

INotifyProeprtyChangedなModelをReactivePropertyなViewModelに持っていきたい時などに、使いやすいのではと思います。また、同期する型が異なっていても対応することができます。コンバーターのようにconvertとconvertBackを指定してください。

ReactiveProperty.FromObject

こちらもToReactivePropertyの亜種ですが、ReactiveProperty→Modelというソース方向への片方向の同期を取ります。ModelはINotifyPropertyChangedである必要はありません。

// こんなただのクラスがあったとして
public class PlainObject
{
    public string Name { get; set; }
}
 
// それと同期させたいとき
public class OneWayToSourceViewModel
{
    public ReactiveProperty<string> OneWayToSource { get; private set; }
 
    public OneWayToSourceViewModel()
    {
        var poco = new PlainObject { Name = "ヤマダ" };
 
        // ReactiveProperty.FromObjectで変換することができます
        // この場合、ReactiveProperty -> Objectの方向のみ値が流れます
        OneWayToSource = ReactiveProperty.FromObject(poco, x => x.Name);
    }
}

片方向の同期が定型的な局面、例えば設定クラスなんかは通知は必要ないと思うのですが、それをUIから一方向で値を投影したい場合に、これを使うことで楽になると思います。

また、Sampleにこれら3つの解説を追加しましたので、実際にどう反映されるのか、動きを確認したい場合はそちらを見てください。

CombineLatestValuesAreAllTrue

長い。メソッド名が。これはReactive Extensionsお題 - かずきのBlog@Hatenaに書かれているもので、使うシーンよくありそうな頻出パターンになりそうだと思ったので、お借りすることにしました。ありがとうございます。使い方を見てもらったほうが速いので、まず例を。

Microsoft Silverlight を入手

<StackPanel>
    <StackPanel Orientation="Horizontal">
        <CheckBox IsChecked="{Binding IsCheckedA.Value, Mode=TwoWay}">Check A</CheckBox>
        <CheckBox IsChecked="{Binding IsCheckedB.Value, Mode=TwoWay}">Check B</CheckBox>
        <CheckBox IsChecked="{Binding IsCheckedC.Value, Mode=TwoWay}">Check C</CheckBox>
    </StackPanel>
    <Button Command="{Binding ExecCommand}">全部チェックで押せる</Button>
</StackPanel>
// using Codeplex.Reactive.Extensions; (これを忘れないように)
 
public class MainPageViewModel
{
    public ReactiveProperty<bool> IsCheckedA { get; private set; }
    public ReactiveProperty<bool> IsCheckedB { get; private set; }
    public ReactiveProperty<bool> IsCheckedC { get; private set; }
    public ReactiveCommand ExecCommand { get; private set; }
 
    public MainPageViewModel()
    {
        IsCheckedA = new ReactiveProperty<bool>();
        IsCheckedB = new ReactiveProperty<bool>();
        IsCheckedC = new ReactiveProperty<bool>();
 
        ExecCommand = new[] { IsCheckedA, IsCheckedB, IsCheckedC }
            .CombineLatestValuesAreAllTrue()
            .ToReactiveCommand();
 
        ExecCommand.Subscribe(_ => MessageBox.Show("しんぷる!"));
    }
}

3つのチェックボックスが全てONなら実行可能なコマンドを作る、です。こんな風に、全てがtrueの時、といった集約をしたい場合に便利に使うことができます。プレゼンテーションロジック、に該当する部分だと思いますが、ここでもRxは十分以上に活躍できます。また、外部からCanExecuteChangedをぶっ叩くようなカオティックなこともしません、ReactiveCommandならね。

ReactiveTimer

Timerです。.NETはTimerが山のようにあります。Threading.Timer, Timers.Timer, Forms.Timer, DispatcherTimer, Observable.Timer。ここにまたReactiveTimerという新たなるTimerが誕生し、人類を混乱の淵に陥れようとしていた……。まさにカオス。

ちょっと整理しましょう。まず、Threading.Timerは一番ネイティブなTimerと捉えられます。そのままだと少しつかいづらいので、軽くラップしてイベントベースにしたのがTimers.Timer。Forms.TimerとDispatcherTimerは、それぞれのアプリケーション基盤で時間を計って伝達してくれるというもの、UI系でのInvokeが不要になるので便利。と、それなりに役割の違いはあります。微妙な差ですが。

最後のObservable.TimerはIObservableで通達してくれるのでRxと非常に相性が良いタイマー。また、タイマーを行う場所もISchedulerで任意に指定できるので、ThreadPoolでもDispatcherでもCurrentThread(この場合はSleepで止まるので固まりますけどね)でも、もしくは仮想スケジューラ(任意に時間を動かせるのでテストが簡単になる)でも良いという柔軟さが素敵で、Rx以降のプログラミングではタイマーなんてObseravble.Timer一択だろ常識的に考えて。という勢い。(精度は若干落ちるので、よほど精度を求める時はThreading.Timerを使いましょう)。だと思っていた時もありました。

一時停止出来ないんですよ、Observable.Timer。発動したらしっぱなし。Stopはできる(Disposeする)けど、そうしたら再開は出来ない。それじゃあ困る場合があります!はい。結構あります。そういう場合はTimers.TimerをFromEventでラップする。それはそれで良いのですが、Observable.TimerのISchedulerを指定可能という柔軟さを捨てるのは勿体無いなあ、と思ったのでした。

そこで、今回ReactiveTimerを作りました。機能は、Observable.TimerのStop/Start出来る版です。

[TestClass]
public class ReactiveTimerTest : ReactiveTest
{
    [TestMethod]
    public void TimerTest()
    {
        // テスト用の自由に時間を動かせるスケジューラ
        var testScheduler = new TestScheduler();
        var recorder = testScheduler.CreateObserver<long>();
 
        // 作成時点では動き出さない
        var timer = new ReactiveTimer(TimeSpan.FromSeconds(1), testScheduler);
        timer.Subscribe(recorder); // Subscribeしても動き出さない
 
        timer.Start(TimeSpan.FromSeconds(3)); // ここで開始。初期値を与えるとその時間後にスタート
 
        // 時間を絶対時間10秒のポイントまで進める(AdvanceTo)
        testScheduler.AdvanceTo(TimeSpan.FromSeconds(5).Ticks);
 
        // MessagesにSubscribeに届いた時間と値が記録されているので、Assertする
        recorder.Messages.Is(
            OnNext(TimeSpan.FromSeconds(3).Ticks, 0L),
            OnNext(TimeSpan.FromSeconds(4).Ticks, 1L),
            OnNext(TimeSpan.FromSeconds(5).Ticks, 2L));
 
        timer.Stop(); // timerを止める
        recorder.Messages.Clear(); // 記録をクリア
 
        // 時間を現在時間から5秒だけ進める(AdvanceBy)
        testScheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks);
 
        // timerは止まっているので値は届いてないことが確認できる
        recorder.Messages.Count.Is(0);
    }
}

そう、単体テストしたい場合は、TestSchedulerに差し替えれば、AdvancedBy/Toによって、時間を自由に進めることが可能になります。Assertに使っているIs拡張メソッドはChaining Assertionです。Testing周りの詳しい解説はRx-Testingの使い方 - ZOETROPEの日記に書かれています。

CountNotifier/BooleanNotifier

SignalNotifierという名前はよく分からないので、今回よりCountNotifierに変更しました。また、名前空間をNotifiersに変更しました。更に、二値での通知を行うBooleanNotifierを新規追加しました。どちらも、IObservable経由での通知を行うフラグです。

// using Codeplex.Reactive.Notifiers;
 
// 通知可能(IObservable)なboolean flag
var boolFlag = new BooleanNotifier(initialValue: false);
boolFlag.Subscribe(b => Console.WriteLine(b));
 
boolFlag.TurnOn(); // trueにする, trueの状態だったら何もしない
boolFlag.Value = false; // .Valueで変更、既にfalseの状態でも通知する
boolFlag.SwitchValue(); // 値を反転させる
 
// 通知可能(IObservable)なcount flag
var countFlag = new CountNotifier();
countFlag.Subscribe(x => Console.WriteLine(x));
 
countFlag.Increment(); // incしたり
countFlag.Decrement(); // decしたりの状態が通知される
 
// Empty(0になった状態)という判定でフィルタして状態監視したりできる
countFlag.Where(x => x == CountChangedStatus.Empty);

例えば非同期処理を行う際などの、状態の管理に使うことができます。

Pairwise

neue cc - Reactive Extensionsで前後の値を利用するで書いた、前後の値をまとめる拡張メソッドです。

// { Old = 1, New = 2 }
// { Old = 2, New = 3 }
// { Old = 3, New = 4 }
// { Old = 4, New = 5 }
Observable.Range(1, 5)
    .Pairwise()
    .Subscribe(Console.WriteLine);

古い値と新しい値を使って何かしたい場合などにどうぞ。

CatchIgnore

例外処理用に、OnErrorRetryというものを用意していましたが、今回それ以外にCatchIgnoreを追加しました。

// 1, 2
Observable.Range(1, 5)
    .Do(x => { if (x == 3) throw new Exception(); })
    .CatchIgnore()
    .Subscribe(Console.WriteLine);

ようするに、CatchしてEmptyを返す手間を省くためのものです。onErrorにe => {}と書くのと似てますが、シーケンスの途中で捕まえれるので、メソッドチェーンの繋ぎ方によっては全然異なる役割を持つ可能性があります。

その他の削除やバグ修正や見送ったものなど

RxのExperimental版が更新されてたので、それに合わせました。Rxの更新内容はZipとCombineLatestに大量のオーバーロード+配列を受け入れるようになったので、何でも結合できるようになりました。それにともないReactivePropertyでは独自拡張としてCombineLatestのオーバーロードを用意していたのですが、Experimental版のみ削除しました。パフォーマンスもExperimentalのもののほうがずっと良いので、早くStableにも降りてきて欲しいです。

WebRequestのUploadValuesで、値が&で連結されていないという致命的なバグがあったので修正しました。本当にすみません……。また、Silverlightでデザイン画面がプレビューできなくなる不具合を修正しました。デザインモード怖い。

バリデーション周りは、ちょっと大きめに(といっても内部だけの話で外部的には変わらない予定)変更入れようと思ってたのですが、それは次回で。あと、同期系メソッドもバリデーションの成否によって同期するかしないかを決定しようかなあ、とか思うんですが、ちょっと大変なので後になりそう。

まとめ

今回はデータリンクを主眼に置きました。デフォルトモードの変更もその一環です。直接的に意味を見るのなら、厚めのMをスマートにVMとシンクロナイズさせる、ということになります。冒頭の台詞、閉じた世界を開けるための道具です。ObserveProperty(OneWay)、ToReactivePropertyAsSynchronized(TwoWay)、ReactiveProperty.FromObject(OneWayToSource)。

OneWayとかTwoWayとかOneWayToSourceというとおり、VMとMの間のバインディングエンジンだと見ることができます。VとVMの間をWPFなりのフレームワークが担い吸収するように、ReactivePropertyはVMとMの間を吸収します。手書きでバインディングだと、ボイラープレートでは手間だし見通しも悪くなる。このほうが、ずっと、楽だし自然に書けます。

ReactivePropertyはV-VM間の接続も担うため、結果として全てがV-VM-M-VM-Vとして一つに繋がる。何をどう組もうと自然に一つに繋がっていく。わくわくしませんか?むしろカオスの予感がする?けれど、カオスの先に本当の光がある、……かもしれない。

ちなみに同期系のものはみんなプロパティ指定だけでGetとかSetとか自動でやっていますが、動的コード生成(&キャッシュ)によりハイパー高速化されているので、パフォーマンス上の問題はありません。そこは安心してください。というと何か凄そうなことやってる気がしますが、勿論そんなことはなくて、偉大なるExpressionTreeに全面的にお任せしているだけだったり。

Reactive Extensionsで前後の値を利用する

@Toya256tweetさんの作成されたDependency Variable Libを見て、ReactivePropertyでも大体再現できるかなあ、でもOldValueとNewValueのところが少し面倒なのよね、というところで一例。ReactivePropertyの値の変更時に、古い値と新しい値を同時に得られるようにします。

var p = new ReactiveProperty<int>(1, mode: ReactivePropertyMode.RaiseLatestValueOnSubscribe);
 
p.Zip(p.Skip(1), (Old, New) => new { Old, New })
    .Subscribe(a => Console.WriteLine(a.Old + " -> " + a.New));
 
p.Value = 10; // 1 -> 10
p.Value = 100; // 10 -> 100

挙動は@okazukiさんの解説されている通りです。残念ながら、頭やわらかい、というわけではなくて頻出パターンのイディオムなだけなので、ただたんに覚えているから、というだけです、がくり。まあ、LINQにせよRxにせよ、メソッドの組み合わせで成り立っているということは、パターン化しやすいということなのですね。イディオムを知っていればいるほど、更にそのイディオムを組み合わせて、と、手法は無限に広がっていきます。

私は非同期をvoidにしてモデル作り込むっての好きくないです。IObservableなりTaskなりを返してくれれば、先があるのですが、そうでないとやりようがないですから。例えばデータモデル考え中 - 急がば回れ、選ぶなら近道で示される「2」のパターンが、Silverlightなどでの従来のやり方だったと思われます。実行のトリガーだけを外から渡して、モデルの中で結果は閉じる。変更はINotifyPropertyChanged経由で通知。正直言って、私はこのやり方はナシだと思っています。スパゲティになりがちだから。Rxは「3」のパターンに近いと思います。順序の制御は、まさにミドルウェア足るReactive Extensionsが保証する。柔軟性は見ての通りで、無限の広がりがあります。

今まではコールバックしかなかったので必然的に2に収まらざるを得なかったですが、今はRxもあるし、C#5.0からはawaitもあるし、なので、モデルの作り方も「変わっていく」と思います。Viewの機能の強さによってViewModelのありようが変わるように、言語やフレームワークの機能の強さによってModelのありようが変わるのは当然でしょう。

ScanとPairwise

さて、自分自身と結合というのは、結局のところ二つ購読しているということなので、これはIObservableがHotでないと成り立ちません(ReactivePropertyはHotです)。というわけで、ColdなIObservableでも対応したい時はScanを使うといいでしょう。HotとかColdとか何言ってるのか分からないという場合はReactive Extensions再入門 その5「HotとCold」 - かずきのBlog@Hatenaを読むといいでしょう。最近、自分で解説してるのを放棄しだしてる気がするよくない傾向、ではなくて、次回のReactive Extensions(Rx)入門 - @ITではまさにObservable.TimerとフツーのTimerを使ってColdとHotの解説しようと思ってたのですよ!ネタ被った、けれど気にせず書きます:)

var p = new ReactiveProperty<int>(1, mode: ReactivePropertyMode.RaiseLatestValueOnSubscribe);
 
var oldNewPair = p.Scan(Tuple.Create(0, 0), (t, x) => Tuple.Create(t.Item2, x)).Skip(1);
 
oldNewPair.Subscribe(Console.WriteLine);
 
p.Value = 10; // (1, 10)
p.Value = 100; // (10, 100)

Scanは自分自身の前の値を参照できるので、色々と応用が効きます。値の入れ物のための初期値は不要なのでSkip(1)で除去してやるのがポイント。

もう一つ、メソッドの組み合わせでのパターン化、というのは、つまりパーツ化しやすいということでもあります。拡張メソッドに分離してやりましょう。

public static class ObservablePairwiseExtensions
{
    // OldNewPair<T>はReactivePropertyに入っています
    // using Codeplex.Reactive.Extensions;
 
    public static IObservable<OldNewPair<T>> Pairwise<T>(this IObservable<T> source)
    {
        return source.Scan(
                new OldNewPair<T>(default(T), default(T)),
                (pair, newValue) => new OldNewPair<T>(pair.NewItem, newValue))
            .Skip(1);
    }
 
    public static IObservable<TR> Pairwise<T, TR>(this IObservable<T> source, Func<T, T, TR> selector)
    {
        return source.Pairwise().Select(x => selector(x.OldItem, x.NewItem));
    }
}
 
// ↑というような拡張メソッドを作ってやったとして
var p = new ReactiveProperty<int>(1, mode: ReactivePropertyMode.RaiseLatestValueOnSubscribe);
 
p.Pairwise().Subscribe(x => Console.WriteLine(x.OldItem + " -> " + x.NewItem));
 
p.Value = 10; // 1 -> 10
p.Value = 100; // 10 -> 100

OldNewPairを使ったのは、TupleがSL/WP7にないから、というのと、OldItemとNewItemというプロパティ名に意味があって、分かりやすいから、です。基本的にC#でTupleを使うことはあんまないですね。LINQのパイプライン内でならば匿名型、それを超えるなら面倒くさくてもクラスを立ててあげたほうがいいと、私は思っています。勿論、今後Tupleのための構文やパターンマッチが入るとしたら別ですけど。というか、つまるところ専用構文がない状態ではTupleを使うメリットはそんなにないのです。匿名型かわいいよ匿名型。言語比較の際に、C#はTupleがこんな腐ってるぜー、とかやられるのはちょっと勘弁願いたいところ(まぁでも普通に敵いませんのは認めます、けれど言語・IDE・フレームワークは三位一体だとも思っています。引き離して単独で評価することには、あまり価値を感じません。IDEでうまく機能することを優先した言語、それを前提にしたフレームワーク。どの要素も引き離せませんから。はいはい、C#がお好きなんですね、という感じですが、でも例えばHTML/ブラウザというGUIフレームワークの上だったらJavaScriptがベストだ、といった捉え方でもありますね)

それはともかくとして、Pairwiseは多用しそうなので、次のReactiveProperty(ver.0.3)で入れたいと思います(あとOldNewPairのToStringのオーバーライド)。ちなみにlinq.js - LINQ for JavaScriptにはPairwise、入ってます。そう、Rxでの頻出パターンということは、それはIx(Enumerable)にも存在するパターンなのです。この辺がRxの面白いところです!私にとって、こういった書き方の初出は前後の値も利用したシーケンス処理 - NyaRuRuの日記でした。

ObserveChanged

突然出てきたOldNewPairですが、これが既にReactiveProperty内で定義されているのは、ObservableCollectionの拡張メソッド群で使用しているからです。今まで紹介していなかったと思うので、ここで紹介しましょう。

// using Codeplex.Reactive.Extensionsとすると
// ObservableCollection<T>に(ReactiveColelctionとか継承したものでも可)
// ObserveXxxChangedという拡張メソッドが利用できる
var collection = new ObservableCollection<int>();
 
// 追加されたのを監視できる、IObservable<T>
collection.ObserveAddChanged()
    .Subscribe(x => Console.WriteLine("Add:" + x));
// 削除されたのを監視できる、IObservable<T>
collection.ObserveRemoveChanged()
    .Subscribe(x => Console.WriteLine("Remove:" + x));
// 置換を監視できる、IObservable<OldNewPair<T>>
collection.ObserveReplaceChanged()
    .Subscribe(p => Console.WriteLine(p.OldItem + "→" + p.NewItem));
// リセットを監視できる、IObservable<Unit>
collection.ObserveResetChanged()
    .Subscribe(_ => Console.WriteLine("Clear"));
 
collection.Add(100); // Add:100
collection.Add(1000); // Add:1000
collection[1] = 300; // 1000→300
collection.Remove(100); // Remove:100
collection.Clear(); // Clear

この手の監視では、通常CollectionChangedイベント経由でNotifyCollectionChangedEventArgsを使って値を取り出すわけですが、型がobject[]なので一々キャストしたりなど、非常に使いにくいと思っていました。ObserveXxxChangedを使えば、完全にタイプセーフで、値も取り出しやすい形に整形してくれています。是非是非どうぞ。

まとめ

@Toya256tweetさんにも示唆頂いたのですが、ReactivePropertyはMVVMに限定されない、汎用的なものだと考えています。値の導出ルールを宣言的に書く、というのは色々なところで使える、気がします。でもやはり、Functional Reactive Programmingが全然流行ってないことを考えても、ルールによって自動的に変動する値って、基本的にGUI向けなのだろうなあ、って。そして、GUIで強いのはやっぱJavaとか.NETといったFRP不毛地帯なので、流行るなんて考えられないことでした。しかし、今は違う。C#にはRxが来た。C#で実現できるのならば、強力なGUIプラットフォームが目の前にあるわけなので、かなり可能性はあるんじゃないかな!と思いたいところです。

d.y.d. - ReaJ / Reactive JavaScriptの例は

// RaiseLatestValueOnSubscribeはv0.3ではデフォルトに変更する予定
var mode = ReactivePropertyMode.RaiseLatestValueOnSubscribe;
 
var x = new ReactiveProperty<int>(10, mode);
var y = x.Select(n => n + 100).ToReactiveProperty(mode: mode);
x.Value = 20;
x.Value = 30;
Console.WriteLine(y.Value); // 130

まあ、不格好です。ReactiveProperty用の専用構文でも用意してくれないとね、rp x = 10; rp y = x + 100; とかで上記の形に整形されたら素敵なのですが。というのはともかくとして、一応、実現できています。GUI環境への反映はWPFのバインディング機構に投げて解決ですし。JavaScriptにおいても、ReactivePropertyを移植して、ベースとしてKnockout.js辺りを採用すればいい感じに実用的になりそうです。その辺は追々やっていきたいところ。

勿論、Rx自体の可能性はGUI(や非同期)だけに閉じているわけではないので、全く別なところでの可能性、使い道というのも追い求めていきたいです。

ともあれともかく、ReactiveProperty、試してみてくださいな。

Rx連載開始とRx本感想とZenbook買ったという話

まーたブログを放置気味な昨今は大変よろしくなく、だらだらTwitterを眺めているだけで一日が終わる症にかかっています。さて、そんなわけですが、@ITにてRx入門の連載を開始しました。

導入なので細かいことは言わず、なんか凄そう!と思ってもらえればいいなー、という構成にしました。用語もそんな並べず、でも、ところどころ引っかかるワードがある、言い方を悪くするとハッタリ気味に、印象に残ってくれればいいなあ、と。初回とはいえ、導入だけであっさり終わってしまったのはちょっと反省。どんな形になるのかイマイチ掴めなくて、もう少し書けばよかったな、と思ってます。あと、図をもう少し入れるべきだったな、と……。そんなこんなな反省を生かし、次回はボリューム増でお送りします。

@okazukiさんがReactive Extensiions再入門を始めたり、@zoetroさんがRx-Testingについて詳しい記事を書かれていたり(これは素晴らしい!)、Rxも盛り上がってきた感じがしますね!それは気のせいです。というだけで終わらせないよう、ガンガン行きましょうー。

Rx本

(やっと)発売されました。まず印象ですが、薄いです。私は電子書籍で買っちゃってるのでリアルな厚さは分からないんですが、180ページです。それでこの値段かよ、という不満を最初は持ってしまうかもしれません。あと、LINQ and Rxです。どういうことかというと中身の半分はフツーのLINQの話です。それを差っ引くとRxは90ページしかありません。更にRxJSやReactiveUIの話もあります。そこを差っ引くと50ページぐらいしかないじゃないかゴルァ。というわけで、Rx本として考えると、分量には不満が残ると思われます。全てのメソッドをカバーする、という内容でもないのでリファレンスとしても使えません。

とはいえ、要素要素は満遍なくカバーできているのと、現在唯一のRx本ではあるので、本で学びたいなあ、と思うならこれしか選択肢はありません。WindowやJoin、Testingなんかは(私の怠慢により)このブログでは少しも紹介していないので、それらを知りたい方や、 Web上から断片的な情報を拾って組み上げるのは手間なので、購入するのは十分アリだとは思います。まあ、私の@IT連載が完了したら、第一の選択肢はそれを見ること、になります(キリッ。となれるように、頑張ります。

ASUS Zenbook UX31

どうでもいいんですが、UX31を買いました。Intelの推奨するUltrabookの第一弾の中では大本命の一品です。さて、Ultrabookとは何か、というと、ようするところWindows版Macbook Airです。薄く軽く速い。宗教上の理由で林檎はお断りだ!な人にとっては救いの手なわけです。内心羨ましいとか思ってたりしたんだからね!しかしですよ、Win系の勉強会ではそうでもないですが、それ以外の勉強会でのMac率の高さといったら!会場の9割がMacだよ、とかドヤ顔でツイートされた日には!多様性は善、はどこに行ったんだよという話です。

側面は、実用的な意味ではフルフラットのほうが良いのでしょうし、特に日本メーカーはそこに拘っている印象があるのですが、審美的にはこうした処理をしたほうがいいですね、視覚上のトリックとはいえ、圧倒的に薄く見えるので。ちなみに側面から見ると本当にMacbook Airソックリでパク……と口から出てしまうのも已むを得ないかな、とは思いますが、それ以外の部分はそんなに似てるわけでもないですよ。

ZenbookはSSDがSATA 3.0(6Gbps)ということもあって、滅茶苦茶速いですね。今後はこの速度がスタンダードになっていくのかと思うと、いい時代です、ほんと。

その他の印象ですが、キーボードはまぁまぁ、タッチパッドはサイテー。タッチパッドはキー入力中の誤動作率の高さ(位置とか大きさが悪いのだろうなあ)も酷くてストレスフル。基本はマウス使いますけれど、いつもマウス持ち歩くのもねえ。ああ、あと、UX31はUltrabookの中で唯一解像度が1600×900と高い(他は1366×768)のがポイントです。フルHDじゃないのかよ!とVAIO Zのオーダーメイドな人が言ってくるかもしれませんが、まぁVAIO Zは店頭モデルはともかくフルHDでオーダーすると高いですからね。こっちは10万円なのでコストパフォーマンスが違うわけです、はい。あと、13インチで1600×900は程良いですよ。フルHDだとちょっと文字が細かすぎになる感も。

総じて満足度は高くお薦めなので、是非買ってください(上のリンク先から!)

悲しいことに私はいきなりACアダプタのコネクタを破壊してしまって充電不能に陥りました、オゥノゥ。地震で物が降ってきてですね。脆いものです。

追記:ASUSのサポートセンターに連絡し、交換してもらいました。非常に対応もよかったので、全然問題ないです。ネガティブな方向でURLがばら蒔かれてしまって想定外だったのですが、全然大丈夫ですよ、とは書いておきます。

ReactiveProperty ver.0.2.0.0

ver.0.2!ご意見ご感想は随時募集中で、コメントなりTwitterで私に@を投げてくれるなり、ただたんにTwitterでReactivePropertyと含めてつぶやいてくれるなり(検索経由で拾えるので)、ブログで記事を書いてくださるついでにクエスチョンしてみたりなどなど、ちょっとした疑問でも要望でも、何でもどうぞ。特に、細かな使用感の向上というのはリクエストがあってこそですので!斜め上からやってきた結果として世界最先端(但し逆向き)を体感出来るのは今だけです!斜め上なのでReactivePropertyのうまい使い方は今のところ誰にも分かりません、私もわかりません(えー)。というわけで、みんなで模索できたらいいな、と思います。

国内はもとよりReactiveUIの作者からも言及頂いて結構褒めてもらったりなどなど、RxのForumで宣伝したかいがあったね!というわけで、私自身かなり真剣に取り組んでますので、付き合って頂ければ幸いです。 /* 現在ReactiveOAuthをほっぽりだしてるという信頼感のなさがアレなので、そちらも早めに何とかします…… */

今回は、0.1では中途半端な存在だったReactiveCollectionを徹底的に考察して再デザインしました。他に細かい追加が幾つか。まずは小さな追加から。

追加したり変わったりしたもの

ObserverPropertyが、最初のSubscribe時に値をPushするようになりました(引数でfalseを指定するとオフにも出来る、そうすると、普通にFromEventしたのと同じ)

public class ToaranaiViewModel
{
    ToaruModel model;
    public ReactiveProperty<string> Name { get; private set; }
 
    public ToaranaiViewModel()
    {
        // こんなINotifyPropertyChangedなModelがあるとして
        model = new ToaruModel { Name = "Anders" };
 
        // 初期値として現在値(この場合"Anders")を持つ
        Name = model.ObserveProperty(x => x.Name).ToReactiveProperty();
    }
}
 
public class ToaruModel : INotifyPropertyChanged
{
    private string name;
    public string Name
    {
        get { return name; }
        set { name = value; PropertyChanged(this, new PropertyChangedEventArgs("Name")); }
    }
 
    public event PropertyChangedEventHandler PropertyChanged = (_, __) => { };
}

これにより、既存のModelからToObservablePropertyしてViewModelにする際などに、デフォルトで値が同期されるので多くのシチュエーションで、より便利になったと思います。という提案を@okazukiさんにリクエスト貰ったので実装しました:) @okazukiさんはReactivePropertyを使ってみた感想 イケテル!気持ちいい!ハードルは高い? - かずきのBlog@Hatenaという記事も書いてくれました、わーい。

ObserverProeprtyはINotifyPropertyChangedへの拡張メソッドです。また、今回よりINotifyPropertyChangingにObservePropertyChanging拡張メソッドを追加しました。ObserverProeprtyと同様な感覚で使えます。

それとReactiveCommand(無印)のExecuteが引数なしでnullをぶん投げるようになりました。なお、これがあるのは無印のほうのみで<T>のほうにはありません。だって、ジェネリックするということはパラメータが欲しい前提ですものね。ジェネリックのほうはExecute(T parmeter)を受け入れるうようにオーバーロードを隠蔽。こういう細かいところの使いやすさの向上ってのは随時取り組みたいところです。

また、ReactiveCommand(無印・ジェネリック共に)をDisposeすると、SubscribeしてたものにOnCompletedを投げるように変更しました。なお、ReactiveCommandをDisposeすると、CanExecuteもfalseになります。永久的にfalseにする、という意味合いで使えるかと思いますが、使うシチュエーションは分かりません。

ReactiveCollectionの再デザイン

ReactiveCollectionに大きめの変更を入れました。今まで通知をIScheduler上で行なっていましたが、これを廃止しました。かわりにToReactiveCollectionなどIObservableからの変換時は、Addと通知、両方をIScheduler上にしました。また、IScheduler上で各種操作(Add, Clear, Remove)を行うメソッド AddOnScheduler などを追加しました。この変更のデザイン上のポリシーは以下になります。

ObservableCollectionとスレッドセーフ・ディスパッチャーセーフというのは非常に難しい。まず、ObservableColectionは変更と通知がワンセットだと考えられる。コレクションが変更され通知を出し、通知され側(主にUI)がコレクションを読みに来る。これは全部ひとまとまりでなければならない。通知され側がコレクションを読みに行く際に、ズレがあってはならない。よって、通知をUIスレッドで行うなら、変更もUIスレッドで行われる必要がある。

しかし、全ての操作を内部で片っ端からDispatcherにBeginInvokeするアプローチを取ると、それはそれで都合が悪い。例えば別スレッドでAddしたりRemoveしたりClearしても、そのコード上では変更はすぐには反映されない。ClearしてもCountは変わらない。AddしてもCountは変わらない。そんな気味の悪いコレクションクラスは使えません。WPFではDispatcher.Invokeがあるので、変更と通知を強制的にUIスレッド上で行う、ということが可能でしたが、SilverlightにはBeginInvokeしかないので、操作をUIスレッドで行うことを保証するコレクションクラスの作成は不可能。(Caliburn MicroのBindableCollectionは全部UIスレッド上で行うようにしているみたいですね、まあBindableにのみ焦点を当てるなら現実的なので、それはそれでいいと思います)

だから、コレクションを触る時は利用側がDispatcher.BeginInvokeして、明示的にDispatcherの中へ入ろう。というのが、整合性が取れて一番良いのだと思います。今まで、ReactiveCollectionは通知だけIScheduler上で行うようになっていました。でも、これはあまり良いデザインではない、操作と通知は同一スレッド上で行うべきなのだから、これでは乖離する可能性がある。単純なAddだけのようなケースでは問題になることは少ないし、利便性としては、その方が簡単にバインドで出来て良いよね、ではあるのだけど、決して良いデザインではない。いずれ発覚する破綻への気づきを遅らせているという点で、むしろ限りなく悪い。

よって、内部で片っ端からDispatcherにBeginInvokeする代わりに、AddOnSchedulerなど、(ReactiveCollection生成時に指定した/デフォルトはUIDispatcher)スケジューラ上で操作を行うと利用側が明示するアプローチを取ってみました。Rxには使い勝手の良いISchedulerが存在する。だからこそアリなやり方かな、と思います。この辺はまだまだ考えどころだと思いますので、ご意見ありましたらお願いします。

<Grid>
    <ListBox ItemsSource="{Binding TimeItems}" />
</Grid>
public class ToaruViewModel
{
    public ReactiveCollection<string> TimeItems { get; private set; }
 
    public ToaruViewModel()
    {
        // 1秒毎に現在時刻表示が追加されるコレクション
        TimeItems = Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(_ => DateTime.Now.ToString())
            .ToReactiveCollection();
 
        // 5秒間隔で上記コレクションをクリアする
        Observable.Interval(TimeSpan.FromSeconds(5))
            .Subscribe(_ => TimeItems.ClearOnScheduler());
    }
}

今回考えるにあたっては青柳 臣一 ブログ(技術系): [.NET] スレッドセーフな ObservableCollection<T> が欲しいをとっても参考にさせて頂きました。

ReactiveProperty, ReactiveCommandは確固たる意思のもとに作ったんですが、ReactiveCollectionは非常に中途半端でした。が、今回ようやく理念が立てれたのではかと思います。なお、ObservableCollection/ReactiveCollectionにはObserveAddChangedなど、変更通知をIObservableで受け取ることのできる拡張メソッドを足してあるので、そちらも便利に使うことが可能です(素のNotifyCollectionChangedEventArgsはIList(ジェネリックじゃない!)であったりして非常に触りにくいので、その辺をきっちり整理してあります)。

とか言ってますが、コードにしたらたかが十数行なのですよね。それを決めるのに、ここ一週間ずっと考えてました。つまり私の生産性は一日一行です(キリッ

まだ追加してないもの

Validation周りをValidationSummaryやDescriptionViewerに対応させる。とか、OnErrorRetryが値を返せるようにする。などは次に載せるつもりです。これらを加えてから、と思ったんですがReactiveCollectionの変更が大きいので、先に出したくて見送りました。

ReactiveProperty-Experimental

今回からExperimental版のRxにも対応しました。NuGetではReactiveProperty-Experimentalです。しかし、 .NET 4.5やWinRTへの対応は、まだしていません。せっかくRx(Experimental)がWinRT対応したので、それに合わせたいと思ったのですが断念。理由としてはVS11がCode Contractsに対応していないから、です。Code Contractsのバイナリリライトかけないと動かないので、どうにもなりません……。それがなければ今すぐにでも対応させたいのですけれどねえ。こういう時に機敏に動けないのはとても悲しいので、次回からはCode Contractsの採用は見送りたいと思ってしまいます……。

ところで、それを意識してではありますが、.NET 4.0版のDLL名をReactiveProperty.NET40.dllに変えました。複数プラットフォームに対応する場合、全てのDLLを同じ名前にする(JSON.NETなどはそうですね)か、全てのDLLにプラットフォームの識別子をつける(MVVMLightなどはそうです)か。前者のほうがスマートではあるのですが、分かりやすさを考え、後者を選びました。Stable版とExperimental版の区別もありますし、DLL名から判定出来たほうがいいかな、と。

今後

okazukiさんの記事にもあるように「MVVMライブラリにも精通しつつReactive Extensionsのことも知っててReactivePropertyの概要を把握してないといけない上に必要に応じてMVVMライブラリとReactivePropertyを繋ぐような機能を作りこまないといけない」きゃー、難しそう!でも事実だ!

私としてはReactivePropertyを通してReactive Extensionsを学習してもらえればいいかなあ、と思っています。Rxはイベントが合成出来る!というけれど、合成しようにもイベントのソースがないと始まらない。ReactivePropertyを使うと、手軽に合成のためのソースが手に入るので、イベント周りのRxでのこね方の学習に最適なのではかと思います。……多分。

既存MVVMライブラリとの使い分けなどに関しては、この類の「選択肢が増えます系」の永遠の課題ですねえ。結局、どう使い分けるかの判断をユーザーに丸投げしているわけですもの。ガイドなどを掲示できればベストなのですが、そもそも私がMVVMに全然詳しくないのであった。そもそも私自身がどう使えばいいのか分かってないぐらいなので(えー)、触ってみて、ついでに足りなかったり、これがこうなってたらいい、とかいう思いがあったら、私がそういうのに全然気づいてない確率100%なので、是非言ってやってください。

Reactive Extensions v1.1.11011.11リリースに見る.NET 4.5からの非同期処理

Reactive Extensionsv1.1.11011 (Experimental Release)がリリースされました。リリース対象はExperimental(実験)版のみです。Stable(安定)版のほうは変更ありません。別件で少しコメントで質問したところ、近いうちにStable版の更新もあるかも、とのことでしたので、Stableはそちらを待ちましょう。リリース内容の詳細な解説はフォーラムにあります

今回の大きな追加は.NET Framework 4.5 Developer PreviewWinRTへの対応です。というわけで、WinRT関連では、WinRT用のスケジューラであったりイベントであったりへの対応とまぁまぁ想像つく普通のもの。あと非同期処理を他言語と結びつけるIAsyncOperationへの書き出し、などなどもサポートされるようですね。

そして.NET 4.5周りでは、C#5.0のAsyncサポート・クラスライブラリがTask中心に書き換わることが念頭に置かれ、大規模に変更が入っています。今後のRxの方針がよく見えますので、Experimentalではありますが注意深く観察してみる必要がありそうです。というわけで、しっかり紹介します。なお、以下の話は.NET 4.5のRxの話なので、.NET 4.0以前の場合では直接は関係なく、Obsoleteにもなっていません。が、将来フレームワークのバージョン上げたらObsolete祭りでモニョるのは覚悟が必要かしらん。もう一つ注意としては、あくまでExperimentalなので、将来的にもこのままかどうかは保証されません。現時点での話です。

FromAsyncPatternがObsolete

はい、Obsoleteです。理由としては、.NET 4.5では多くのメソッドがBegin-Endパターンの代わりにTaskを返すXxxAsyncメソッドを持っています。そしてTaskとIObservableは相互に変換可能だから、Rxで扱いたいならXxxAsync().ToObservableすればいいでしょ、ということでした。Begin-EndパターンなのにXxxAsyncを持っていないメソッドにはどうするんだ!という場合は、TaskFactory.FromAsyncがあるので、それ使えばいい、とのこと。まあ、それはレアケースなので滅多にないかな。

毎回ToObservableなんて面倒くさい、という人のためにSelectManyに限っては、Taskを受け取るオーバーロードが用意されているので、ToObservableは不要です。内部では予想つく通り、ただ単にToObservableしているだけですね。その他の合成系メソッド(MergeやSwitch)などは残念ながらというか当然というか、ToObservableしてください。

IObservableはAwaitable

IObservable<T>も非同期を扱うものなので、awaitできます。正確に言えばGetAwaiterが定義された、といったところでしょうか。

var req = WebRequest.Create("http://google.com/");
var response = await Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse)();

基本的にRxの非同期はFromAsyncPatternを初めとして長さが1のものを扱っていますが、複数の値が流れる場合はどうなるかというと、挙動は「最後の値」です。正確に言えばAsyncSubjectが利用されているので、OnCompletedの直前のOnNextの値。ではEmpty(OnCompletedのみ)やNever(何もなし)ではどうなるのか、というと……

// 10(最後の値)
var a = await Observable.Range(1, 10);
 
// InvalidOperationException(シーケンスに値がない)
var b = await Observable.Empty<int>();
 
// ある意味フリーズ、ここで永遠に止まる
var c = await Observable.Never<int>();

となります。これだけ見るとNeverって使い道がイミフですが、何かとMergeする必要があるときに、マージ対象がないときはNeverを渡すなどなど、ライブラリに近い部分では結構使う場所あります。私の書いているReactivePropertyというライブラリでもそうして利用しています。

FirstなどがObsolete、かわりにFirstAsyncなどとWaitが追加

え?という感じですがObsoleteです。対象はFirst,Last,Single、それとForEachも。FirstOrDefaultなど、XxxDefault系も同様です。つまり同期的にブロックするタイプのものが全てObsolete行きになりました。代わりにFirstAsyncなどXxxAsyncが用意されています。それとawaitを組み合わせてください。

var source = Observable.Return("async?");
 
var value0 = await source; // LastAsyncと挙動は同じ
var value1 = await source.FirstAsync();
var value2 = await source.LastAsync();

長さ1と分かっている状況なら、何もなくそのままawaitでも良いのではかしらん。LastAsyncがそれに相当しますね。さて、しかしawaitのお陰で同期的「のように」書けるには違いないけれど、FirstやLastなどはブロックして「同期」な挙動を取っていたわけなので、単純にObsolete行きにされたら困ってしまいます。そこで、同期的に待機して最後の値を取り出すWaitメソッドが用意されました(これは.NET 4.0やSilverlightなどでも使える、新たに追加されたメソッドです)。

var source = Observable.Return("async?");
 
var value0 = source.Wait();
var value1 = source.FirstAsync().Wait();
var value2 = source.LastAsync().Wait();

ちなみにWaitの中身はLastです。Lastという名前から離して、同期的に待機して値を取り出す、と明示させたのですね。それはいいと思います。Waitはいい。Waitはいいんですが、FirstをObsoleteにしてFirstAsyncを追加するのは、正直気にいりません。私は反対です。

ターゲットフレームワーク間でコードが共有出来なくなる、IQbservableプロバイダに影響が出る、そもそも標準クエリ演算子から離れるのはどうよ。などなど。だいたい、AllやAny、Maxなどは長さ1のIObservable<T>を返すようになっていました。Firstなどだけです、同期的に待機して値を取り出す、という別の意味が与えられていたのは。だから、ここはFirstはObsoleteにせず、FirstAsyncの挙動、つまりIObservable<T>を返すように変更すればいいのです。同期待ちについては、Waitが搭載されたので心配無用です。これで、全ての挙動に統一が取れる。

唯一問題点を挙げれば、本当に本当に「破壊的変更」になるんですよね。それも、Stableとか銘打ったものへの影響も出る。メソッド名同じで戻り値が変わる。そういう変更を許せるものか。私は、許してしまってもいいと思うのですけれど。Firstを廃止してFirstAsyncを追加、などという歪な形を将来に残すよりかは、ずっといい。

なので、Forumにもそうコメント入れましたところ返答貰えました。「Stableリリースが存在する以上、XxxAsyncのままでいるしかない。これが不幸なことは同意しますけれど、暫くはこのままでいるしかない。」とのことでした。というわけで、Stableと銘打つのが早まったな…… としか言いようがなく。あの段階でここまで読めなかったのはしょうがないところ、と思いつつ、やはり手痛いミスかなあ。うーん、将来に渡っての完璧なAPIを作り上げるというのは実に難しい。

まあ、Firstを多用する(といっても単体テストや動作確認時ぐらいですけど)のは、値を一つ取り出したい、ということなので、await sourceかsource.Wait() で済む。FirstAsyncやLastAsyncを直接使うことは恐らく少なくて、ならば実際上の問題というのはそこまでないかもしれません。

長さ1の非同期処理の戻り値はTaskを選ぶべきか、Rxを選ぶべきか

メソッドを作るときの非同期処理の戻り値。これは、Taskを選ぶべきです。それは.NET標準と合わせるべきという理由からもそうですし、この.NET 4.5向けのRxの指針からしてそうなっています。長さ1のIObservableで非同期を表現する、というのは特殊だったと言わざるを得ないので、メソッドを作るとき、非同期処理の戻り値はTaskにしたほうが間違いなく良いでしょう。

ただ、アプリケーショに全体でRxによる合成を中心に置く場合は、ToObservableが面倒くさい、というだけじゃなく逆にオーバーヘッドになる可能性もあるので、IObservable中心にしたほうが良いでしょう。この辺は一概には言えずケースバイケースでしょうか。どちらにせよToObservable<->ToTaskで相互変換が可能なわけなので、あまりガチガチに捉える必要もないですけれど。

あと、あくまで.NET 4.5の話でasync/awaitが入るからTaskのほうが良いと言ってるのであって、.NET 4.0以前ならまた違う話です。というかその場合だとRx一択です。

ねぇ、Rxってもう要らない子?

時代の徒花でしたね、短い命だった……。

って、ちょっと待ったー。それはYESでもあり、NOでもあります。YESなのは、単純な形での非同期処理ならば、遥かに楽になりますし、それに何よりもRxを通すよりもパフォーマンスは良いと思われるので、むしろasync/awaitを使うべきです。具体的には、SelectManyしてSubscribeするだけ、あとCatchで少し例外処理、みたいなコードなら、もう全面的にRxさようならでいいでしょう。

でも往々にしてそういう処理だけじゃないよね?以前にも紹介しましたがSwitch(新しい処理が入ったら以前の非同期処理はキャンセルして新しい処理のみを後続に流す)などを手書きせず演算子一つにパッケージ化できることや、全体的にTaskのメソッド群よりも合成や待ち合わせが容易に記述できる、などなど。そして、「複数の戻り値のある非同期処理、例えばStreamのBeginReadは細切れにbyte[]が得られますが、それを複数回分の非同期処理をまとめてシーケンスとして、IObservable<byte[]>としてまとめることは現状ではRxしかできません。

// 複数回のBeginReadをRx+Asyncで一つにまとめる例
static IObservable<byte[]> ReadMultipleAsync(Stream stream, int bufferSize)
{
    return Observable.Create<byte[]>(async observer =>
    {
        try
        {
            while (true)
            {
                var buffer = new byte[bufferSize];
                var readCount = await stream.ReadAsync(buffer, 0, bufferSize);
 
                if (readCount == 0) break;
                if (readCount != bufferSize)
                {
                    var newBuffer = new byte[readCount];
                    Array.Copy(buffer, newBuffer, readCount);
                    buffer = newBuffer;
                }
 
                observer.OnNext(buffer); // yield returnのノリで書く
            }
            observer.OnCompleted(); // 完了合図と
        }
        catch (Exception ex)
        {
            observer.OnError(ex); // 例外は自前で明示的に
        }
    });
}

ExperimentalリリースではObservable.Createがasync/await対応しているので、擬似的なyield returnとして、非同期での列挙をそこそこ簡単に記述することができます。OnErrorとOnCompletedは自前で管理する必要がありますけれど。

なので、メソッド単体で分けた場合の戻り値は「長さ1」ならTask、複数ならIObservable。それらメソッドを使って非同期処理を組み上げる時は、単純ならawaitのみ、複雑ならRx。というのが使い分けの指針です。とはいえ、使い分けっていうのは幻想に近くて、実際はどっちか一つになりがちだとは思っています。そして、それならTask中心になるでしょうねえ、とも。非同期における大抵のシチュエーションでRxがサヨウナラ気味になるのはしかたのない話です。ぶっちゃけSelectManyしてSubscribeがほとんどだし、それ以外のことだって、同期的のように書けるのなら、いくらでもやりようはありますから。

まあ、未来を待たなくても今使える解としてなら十分ですし、それ自体がawait可能なので、今書いたコードは将来に渡っても無駄にはなりません。FromAsyncPatternがObsoleteというのは、まあ単純に.NET4.5本来のXxxAsyncに置き換えればいいというだけなので無駄になる、とは言わないでしょう。

けれど、それだけじゃあ、すごく後ろ向きで、寂しいよね。Rx自体の持つ力というのは、別に非同期に限らない。ただの幾つもある側面のうちの一つにすぎない。そこで出した私の答えがReactiveProperty : WPF/SL/WP7のためのRxとMVVMを繋ぐ拡張ライブラリです。ReactivePropertyはC#5.0によってプレゼンスが低下するReactive Extensionsに新たな価値をもたらしたいという危機感から作ったものだったりします(後付け、じゃなくてこれは本当の話です、次の一手を指すならRx自体に注目の集まっている今しかない、とも)

ReactivePropertyを通して見ると、非同期だけではない、Rxの持つポテンシャルがよく分かるのではないでしょうか?Rxの真の強みはイベント単独や非同期単独ではなくて、それらが、ただの配列も含めて、統一的に扱える、だから全て一本のストリームになって合成処理が自由自在。というOrchestrateな部分にある。ReactivePropertyはそれを全面的に押し出してます(半強制的に全てが一本に繋がるようになってる)

Rxは、この先も力強く存在し続けるので、学習する価値は間違いなくありますよ!

まとめ

相変わらずフリーダムな変更が続いているRxですが、あくまでExperimental版の話です。Stableでは、こんなバカバカと変わっていくことはないので、安心して使えばいいです。あと、Experimentalなのでまだまだ変動の余地はある、と思いますので、意見あればForumで直接言っておくとよさそう。私も、何やらかんやらと意見言っておきました(昔は書くのにビビッてたのに、今は平然と書いててアレです、慣れですね、ようするに)。

ところでRxは.NET 4.5に標準搭載されるのか否か、ですが、なんかまだまだ全然色々と変更や模索する気満々なようなので、この様子だと、仮にそういう話があったとしても間に合わなさそうという点で、標準搭載はなさそうですねー。ほぼ完成してるのに標準入りしない、とかだと悲しいんですが、そういう形で標準搭載見送り、ならばむしろ喜ばしい話なので、いいかなー、と思います。

それにしてもRx本は出すタイミング難しいですねえ。今回の変更は非同期周りの話がガラッと変わってきちゃうわけなので。また延期かしらねえ。さすがにそれはないか。ところで私もLINQ + Rx本をオライリーから出したいです(←ただたんに表紙をデンキウナギ(Rxのロゴはピンクのデンキウナギ)にしてウナギ本と呼ばれたいという一点だけの話なので間に受けないでください)

ReactivePropertyのデモをしました

Silverlightを囲む会in東京#4にて、先日公開したReactivePropertyについてお話しました。

本題のセッションの後の、お楽しみセッションということで、LT的に5分程度とか思っていたつもりなのですが、大幅に時間オーバーして17分も喋っていました。これは酷い。色々と寛容に見て頂き感謝です。さおさんありがとうー。IIJさんも本当にありがとうございます。時間オーバーを許してくれたというのと(笑)、それと、ネットワークが良好だったお陰でTwitterインクリメンタルサーチのデモが出来たので。毎度ながら凄まじい画質のSmooth Streamingといい、神会場すぎます。

いつまで残るか分かりませんが、会場で行ったセッションの録画です。Silverlight を囲む会 in 東京 #4 @ IIJ 神保町三井ビル。私のセッションは04:01:30 - 04:19:00です。ライブコーディングしているのは04:05:30-04:16:30ですね。

色々アレゲなのはいいとして、以前にスマベンで話をしたときにも反省事項だったのですがすっかり失念してた声の小ささはダメですねー。次は気をつけます。むしろ早口気味なのかと気にしてたんですが、録画を見るとそうでもないというか、このぐらいで調度良いぐらいですね。スライドはちゃっちゃと進めて欲しいし、本題のDemoは素早く進行して欲しいですから。ライブコーディングは好評だったようで何よりです。ちなみに、スムーズにプログラム書いていて凄い!と評価いただきましたが、やる内容が決まっているから書けたというだけで、例えばギターやピアノの演奏などと同じなわけで、普段は頭抱えながらゆったり書いてます。

ちなみに最後のTwitter検索のコードは若干アレだったので、修正したのをここに載せておきます。

<StackPanel>
    <TextBox Text="{Binding CurrentText.Value, UpdateSourceTrigger=PropertyChanged}" />
    <ListBox ItemsSource="{Binding SearchResults.Value}" />
</StackPanel>
public class MainWindowViewModel
{
    public ReactiveProperty<string> CurrentText { get; private set; }
    public ReactiveProperty<string[]> SearchResults { get; private set; }
 
    public MainWindowViewModel()
    {
        CurrentText = new ReactiveProperty<string>();
 
        SearchResults = CurrentText
            .Select(word => new WebClient()
                .DownloadStringObservableAsync("http://search.twitter.com/search.atom?q=" + Uri.EscapeUriString(word)))
            .Switch()
            .Select(s =>
            {
                var xml = XElement.Parse(s);
                var ns = xml.Name.Namespace;
                return xml.Descendants(ns + "title").Select(x => x.Value).ToArray();
            })
            .OnErrorRetry((WebException e) => Debug.WriteLine(e))
            .ToReactiveProperty();
    }
}

SelectManyよりもSelect->Switchのほうがいいのと、OnErrorRetryの書く場所は、WebClientの真下だと永遠にリクエストをリピートしちゃうのでダメでしたね。

Prev | | Next

Search/Archive

Category

Profile


Yoshifumi Kawai
Microsoft MVP for Visual Studio and Development Technologies(C#)

April 2011
|
July 2019

Twitter:@neuecc
GitHub:neuecc
ils@neue.cc