Archive - Rx

RxとパフォーマンスとユニットテストとMoles再び

C# Advent Calendar 2011、順調に進んでいますね。どのエントリも力作で大変素晴らしいです。私はこないだModern C# Programming Style Guideというものを書きました。はてブ数は現段階で45、うーん、あまり振るわない……。私の力不足はともかくとしても、他の言語だったらもっと伸びてるだろうに、と思うと、日本のC#の現状はそんなものかなあ、はぁ、という感じではあります。はてブが全てではない(むしろ斜陽?)とはいえ、Twitterでの言及数などを見ても、やっぱまだまだまだまだまだまだ厳しいかなあ、といったところ。Unityなどもあって、見ている限りだと人口自体は着実に増えている感じではありますけれど、もっともっと、関心持ってくれる人が増えるといいな。私も微力ながら尽力したいところです。

ところで、id:ZOETROPEさんのAdvent Calendarの記事、Reactive Extensionsでセンサプログラミングが大変素晴らしい!センサー、というと私だとWindows Phone 7から引っ張ってくるぐらいしか浮かばないのですが(最近だとKinectもHotですか、私は全然触れてませんが……)おお、USB接続のレンジセンサ!完全に門外漢な私としては、そういうのもあるのか!といったぐらいなわけですが、こうしてコード見させていただくと、実践的に使うRxといった感じでとてもいいです。

記事中で扱われているトピックも幅広いわけですが、まず、パフォーマンスに関しては少し補足を。@okazukiさんの見せてもらおうじゃないかReactive Extensionsの性能とやらを! その2のコメント欄でもちょっと言及したのですが、この測り方の場合、Observable.Rangeに引っ張られているので、ベンチマークの値はちょっと不正確かな、と思います。

// 1000回イベントが発火(発火の度に長さ3000のbyte配列が得られる)を模写
static IObservable<byte[]> DummyEventsRaised()
{
    return Observable.Repeat(new byte[3000], 1000, Scheduler.Immediate);
}
 
// 配列をバラす処理にObservable.Rangeを用いた場合
static IObservable<byte> TestObservableRange()
{
    return Observable.Create<byte>(observer =>
    {
        return DummyEventsRaised()
            .Subscribe(xs =>
            {
                Observable.Range(0, xs.Length, Scheduler.Immediate).ForEach(x => observer.OnNext(xs[x]));
            });
    });
}
 
// 配列をバラす処理にEnumerable.Rangeを用いた場合(ForEachはIxのもの)
static IObservable<byte> TestEnumerableRange()
{
    return Observable.Create<byte>(observer =>
    {
        return DummyEventsRaised()
            .Subscribe(xs =>
            {
                Enumerable.Range(0, xs.Length).ForEach(x => observer.OnNext(xs[x]));
            });
    });
}
 
// SelectManyでバラす場合
static IObservable<byte> TestSelectMany()
{
    return DummyEventsRaised().SelectMany(xs => xs);
}
 
static void Main(string[] args)
{
    // ベンチマーク補助関数
    Action<Action, string> bench = (action, label) =>
    {
        var sw = Stopwatch.StartNew();
        action();
        Console.WriteLine("{0,-12}{1}", label, sw.Elapsed);
    };
 
    // 配列をばらすケースは再度連結する(ToList)
    bench(() => TestObservableRange().ToList().Subscribe(), "Ob.Range");
    bench(() => TestEnumerableRange().ToList().Subscribe(), "En.Range");
    bench(() => TestSelectMany().ToList().Subscribe(), "SelectMany");
    // 配列をばらして連結せず直接処理する場合
    bench(() => TestSelectMany().Subscribe(), "DirectRx");
    // byte[]をばらさず直接処理する場合
    bench(() => DummyEventsRaised().Subscribe(xs => { foreach (var x in xs);}), "DirectLoop");
 
    // 実行結果
    // Ob.Range    00:00:02.2619670
    // En.Range    00:00:00.2600460
    // SelectMany  00:00:00.2701137
    // DirectRx    00:00:00.0852836
    // DirectLoop  00:00:00.0152816
}

得られる配列をダイレクトに処理するとして、Observable.Rangeで配列のループを回すと論外なほど遅い。のですが、しかし、この場合ですとEnumerable.Rangeで十分なわけで、そうすれば速度は全然変わってきます(もっと言えば、ここではEnumerable.Rangeではなくforeachを使えば更に若干速くなります)。更に、これは配列を平坦化している処理とみなすことができるので、observerを直に触らず、SelectManyを使うこともできますね。そうすれば速度はほとんど変わらず、コードはよりすっきり仕上がります。

と、いうわけで、遅さの原因はObservable.Rangeです。Rangeが遅いということはRepeatやGenerateなども同様に遅いです。遅い理由は、値の一つ一つをISchedulerを通して流しているから。スケジューラ経由であることは大きな柔軟性をもたらしていますが、直にforeachするよりもずっとずっと遅くなる。なので、Enumerableで処理出来る局面ならば、Enumerableを使わなければなりません。これは、使うほうがいい、とかではなくて、圧倒的な速度差となるので、絶対に、Enumerableのほうを使いましょう。

また、一旦配列をバラして、再度連結というのは、無駄極まりなく、大きな速度差にも現れてきます。もし再度連結しないでそのまま利用(ベンチ結果:DirectRx)すれば直接ループを回す(ベンチ結果:DirectLoop)よりも5倍程度の遅さで済んでいます。このぐらいなら許容範囲と言えないでしょうか?とはいえ、それでも、遅さには違いないわけで、避けれるのならば避けたほうがよいでしょう。

ZOETROPEさんの記事にあるように、ここはばらさないほうが良い、というのが結論かなあ、と思います。正しくは上流ではばらさない。一旦バラしたものは復元不可能です。LINQで、パイプラインで処理を接続することが可能という性質を活かすのならば、なるべく後続で自由の効く形で流してあげたほうがいい。アプリケーション側でバラす必要があるなら、それこそSelectMany一発でばらせるのだから。

例えばWebRequestで配列状態のXMLを取ってくるとします。要素は20個あるとしましょう。最初の文字列状態だけを送られてもあまり意味はないので、XElement.Parseして、実際のクラスへのマッピングまではやります。例えばここではPersonにマッピングするとして、長さ1のIObservable<Person[]>です。しかし、それをSelectManyして長さ20のIObservable<Person>にはしないほうがいい。ここでバラしてしまうと長さという情報は消滅してしまうし、一回のリクエスト単位ではなくなるのも不都合が生じやすい。もしアプリケーション的にフラットになっていたほうが都合が良いのなら、それはまたそれで別のメソッドとして切り分けましょう。

成功と失敗の一本化

ZOETROPEさんの記事の素晴らしいのは、通常のルート(DataReceived)と失敗のルート(ErrorReceived)を混ぜあわせているところ!これもまたイベントの合成の一つの形なわけなんですねー。こういう事例はWebClientのDownloadStringAsyncのような、EAP(Eventbased Asynchronous Programming)をTaskCompletionSourceでラップしてTaskに変換する 方法: タスクに EAP パターンをラップする←なんかゴチャゴチャしていますが、TrySetCanceled, TrySetException, TrySetResultで結果を包んでいます、というのと似た話だと見なせます。

WebClientではEventArgsがCancelledやErrorといったステータスを持っているのでずっと単純ですが、SerialPortではエラーは別のイベントでやってくるのですね。というわけで、私もラップしてみました。

public static class SerialPortExtensions
{
    // 面倒くさいけれど単純なFromEventでのイベントのRx化
    public static IObservable<SerialDataReceivedEventArgs> DataReceivedAsObservable(this SerialPort serialPort)
    {
        return Observable.FromEvent<SerialDataReceivedEventHandler, SerialDataReceivedEventArgs>(
            h => (sender, e) => h(e), h => serialPort.DataReceived += h, h => serialPort.DataReceived -= h);
    }
 
    public static IObservable<SerialErrorReceivedEventArgs> ErrorReceivedAsObservable(this SerialPort serialPort)
    {
        return Observable.FromEvent<SerialErrorReceivedEventHandler, SerialErrorReceivedEventArgs>(
            h => (sender, e) => h(e), h => serialPort.ErrorReceived += h, h => serialPort.ErrorReceived -= h);
    }
 
    // DataReceived(プラスbyte[]化)とErrorReceivedを合成する
    public static IObservable<byte[]> ObserveReceiveBytes(this SerialPort serialPort)
    {
        var received = serialPort.DataReceivedAsObservable()
            .TakeWhile(e => e.EventType != SerialData.Eof) // これでOnCompletedを出す
            .Select(e =>
            {
                var buf = new byte[serialPort.BytesToRead];
                serialPort.Read(buf, 0, buf.Length);
                return buf;
            });
 
        var error = serialPort.ErrorReceivedAsObservable()
            .Take(1) // 届いたらすぐに例外だすので長さ1として扱う(どうせthrowするなら関係ないけど一応)
            .Do(x => { throw new Exception(x.EventType.ToString()); });
 
        return received.TakeUntil(error); // receivedが完了した時に同時にerrorをデタッチする必要があるのでMergeではダメ
    }
}

成功例と失敗例を合成して一本のストリーム化。また、DataReceivedはそのままじゃデータすっからかんなので、Selectでbyte[]に変換してあげています。これで、ObserveReceiveBytes拡張メソッドを呼び出すだけで、かなり扱いやすい形になっている、と言えるでしょう。パフォーマンスも、これなら全く問題ありません。

MolesとRx

と、ドヤ顔しながら書いていたのですが、とーぜんセンサーの実物なんて持ってませんので動作確認しようにもできないし。ま、まあ、そういう時はモックとか用意して、ってSerialDataReceivedEventArgsはパブリックなコンストラクタないし、ああもうどうすればー。と、そこで出てくるのがMoles - Isolation framework。以前にRx + MolesによるC#での次世代非同期モックテスト考察という記事で紹介したのですが、めちゃくちゃ強力なモックライブラリです。パブリックなコンストラクタがないとか関係なくダミーのインスタンスを生成可能だし、センサーのイベントだから作り出せないし、なんてこともなく自由にダミーのイベントを発行しまくれます。

[TestClass]
public class SerialPortExtensionsTest : ReactiveTest
{
    [TestMethod, HostType("Moles")]
    public void ObserveReceiveBytesOnCompleted()
    {
        // EventArgsを捏造!
        var chars = new MSerialDataReceivedEventArgs() { EventTypeGet = () => SerialData.Chars };
        var eof = new MSerialDataReceivedEventArgs() { EventTypeGet = () => SerialData.Eof };
        // SerialPort::BytesToRead/SerialPort::Readで何もしない
        MSerialPort.AllInstances.BytesToReadGet = (self) => 0;
        MSerialPort.AllInstances.ReadByteArrayInt32Int32 = (self, buffer, offset, count) => 0;
 
        var scheduler = new TestScheduler();
 
        // 時間10, 20, 30, 40でSerialData.Charsのイベントを、時間50でEofのイベントを発行
        MSerialPortExtensions.DataReceivedAsObservableSerialPort = _ => scheduler.CreateHotObservable(
                OnNext(10, chars),
                OnNext(20, chars),
                OnNext(30, chars),
                OnNext(40, chars),
                OnNext(50, eof))
            .Select(x => (SerialDataReceivedEventArgs)x);
 
        // 走らせる(戻り値のbyte[]はどうでもいいので無視するためUnitに変換)
        var result = scheduler.Start(() => new SerialPort().ObserveReceiveBytes().Select(_ => Unit.Default), 0, 0, 100);
 
        result.Messages.Is(
            OnNext(10, Unit.Default),
            OnNext(20, Unit.Default),
            OnNext(30, Unit.Default),
            OnNext(40, Unit.Default),
            OnCompleted<Unit>(50));
    }
 
    [TestMethod, HostType("Moles")]
    public void ObserveReceiveBytesOnError()
    {
        // EventArgsを捏造!
        var chars = new MSerialDataReceivedEventArgs() { EventTypeGet = () => SerialData.Chars };
        var eof = new MSerialDataReceivedEventArgs() { EventTypeGet = () => SerialData.Eof };
        // SerialPort::BytesToRead/SerialPort::Readで何もしない
        MSerialPort.AllInstances.BytesToReadGet = (self) => 0;
        MSerialPort.AllInstances.ReadByteArrayInt32Int32 = (self, buffer, offset, count) => 0;
 
        var scheduler = new TestScheduler();
 
        // 時間10, 20, 30, 40でSerialData.Charsのイベントを、時間50でEofのイベントを発行
        MSerialPortExtensions.DataReceivedAsObservableSerialPort = _ => scheduler.CreateHotObservable(
                OnNext(10, chars),
                OnNext(20, chars),
                OnNext(30, chars),
                OnNext(40, chars),
                OnNext(50, eof))
            .Select(x => (SerialDataReceivedEventArgs)x);
 
        /* ↑までOnCompletedのものと共通 */
 
        // 時間35でErrorのイベントを発行
        MSerialPortExtensions.ErrorReceivedAsObservableSerialPort = _ => scheduler.CreateHotObservable(
            OnNext<SerialErrorReceivedEventArgs>(35, new MSerialErrorReceivedEventArgs()));
 
        // 走らせる(戻り値のbyte[]はどうでもいいので無視するためUnitに変換)
        var result = scheduler.Start(() => new SerialPort().ObserveReceiveBytes().Select(_ => Unit.Default), 0, 0, 100);
 
        // Exceptionの等値比較ができないので、バラしてAssertする
        result.Messages.Count.Is(4);
 
        result.Messages[0].Is(OnNext(10, Unit.Default));
        result.Messages[1].Is(OnNext(20, Unit.Default));
        result.Messages[2].Is(OnNext(30, Unit.Default));
 
        result.Messages[3].Value.Kind.Is(NotificationKind.OnError);
        result.Messages[3].Time.Is(35);
    }
}

アサーションに使っているIsメソッドは、いつも通りChaining Assertionです。

Molesがいくら強力だとは言っても、イベントをそのまま乗っ取るのはデリゲートの差し替えなどで、割と面倒だったりします。しかし、FromEventでラップしただけのIObservable<T>を用意しておくと…… それを差し替えるだけで済むので超簡単になります。イベント発行については、TestScheduler(Rx-Testingを参照しておく)で、仮想時間で発行する値を作ってしまうと楽です。こういう、任意の時間で任意の値、というダミーの用意もFromEventでラップしただけのIObservable<T>があると、非常に簡単になります。

あとは、scheduler.Startで走らせると(3つの引数はそれぞれcreated, subscribed, disposedの仮想時間、何も指定しないと…… 実は0始まり「ではない」ことに注意。100,200,1000がデフォなので、0はすっ飛ばされています)、その戻り値で結果を受け取って、Messagesに記録されているので、それにたいしてアサートメソッドをしかける。

実に簡単ですね!Molesの力とRxの力が組み合わさると、イベントのテストが恐ろしく簡単になります。素敵じゃないでしょうか?

まとめ

テストなしで書いてたコードは、Molesでテスト走らせたら間違ってました。TakeWhileの条件が==だったのと、Mergeで結合していたり……。はっはっは、ちゃんとユニットテストは書かないとダメですね!そして、Molesのお陰でちゃんと動作するコードが書けたので恥を欠かなくてすみました、やったね。

Reactive Extensionsとスレッドのlock

ぱられるぱられる。もしパラレルにイベントが飛んできたら、どうする?

public class TestParallel
{
    public event Action<int> Log = _ => { }; // nullチェック面倒ぃので
 
    public void Raise()
    {
        // デュアルコア以上のマシンで試してね!
        Parallel.For(0, 10000000, x =>
        {
            Log(x);
        });
    }
}
 
class Program
{
    static void Main(string[] args)
    {
        var list = new List<int>();
        var tes = new TestParallel();
 
        // イベント登録して
        tes.Log += x => list.Add(x);
 
        // 実行
        tes.Raise();
    }
}

これは、十中八九、例外が出ます。list.Addはスレッドセーフじゃないので、まあそうだよね、と。では、Rxを使ってみるとどうなるでしょうか。

var list = new List<int>();
var tes = new TestParallel();
 
// イベント登録して
Observable.FromEvent<int>(h => tes.Log += h, h => tes.Log -= h)
    .Subscribe(list.Add);
 
// 実行
tes.Raise();

やはり変わりません。例外出ます。FromEventを中継しているだけですから……。さて、しかし一々Addの手前でlockするのは面倒だ、と、そこでSynchronizeメソッドが使えます。

Observable.FromEvent<int>(h => tes.Log += h, h => tes.Log -= h)
    .Synchronize()
    .Subscribe(list.Add);
 
// ようするにこんな感じになってる
 
var gate = new Object();
//....
lock(gate)
{
    OnNext();
}

これで、list.Addを問題なく動作させられます。Listとか適度にデリケートなので適当に注意してあげましょう。

Subjectの場合

さて、上のはイベントでしたが、ではSubjectの場合はどうなるでしょう。

public class TestParallel
{
    Subject<int> logMessenger = new Subject<int>();
    public IObservable<int> Log { get { return logMessenger.AsObservable(); } }
 
    public void Raise()
    {
        // デュアルコア以上のマシンで試してね!
        Parallel.For(0, 10000000, x =>
        {
            logMessenger.OnNext(x);
        });
    }
}
 
class Program
{
    static void Main(string[] args)
    {
        var list = new List<int>();
        var tes = new TestParallel();
 
        // イベント登録して
        tes.Log.Subscribe(list.Add);
 
        // 実行
        tes.Raise();
    }
}

たまーに例外起こらず処理できることもあるんですが、まあ大体は例外起こるんじゃないかと思います。初期のRxのSubjectは割とガチガチにlockされてたのですが、現在はパフォーマンスが優先されているため挙動が変更され、ゆるゆるです。回避策は同様にSynchronizeを足すことです。

tes.Log.Synchronize().Subscribe(list.Add);

これで問題なし。

余談

手元に残っていた大昔のRxを使って実行してみたら、死ぬほど遅かったり。確実に現在のものはパフォーマンス上がっていますねえ。あと、なんかもう最近面倒でeventだからってEventArgs使わなきゃならないなんて誰が言ったー、とActionばかり使うという手抜きをしてます。だってsenderいらないもん、大抵のばやい。

ReactiveProperty ver 0.3.0.0 - MとVMのバインディングという捉え方

今回の更新よりアイコンが付きました。専用のアイコンがあると、とっても本格的な感じがしますねー。色はRxにあわせて紫-赤紫。デザインは私の好みな幾何学的な感じです。@ocazucoさんに作って頂きました、ありがとうございます!色々ワガママ言ってお手数かけました。

ReactiveProperty - MVVM Extensions for Rx - ver 0.3.0.0

Rxとは何か、というとIObservable<T>と「見なせる」ものを合成するためのライブラリです。だから、見なせるものさえ見つかれば、活躍の幅は広がっていく。ReactivePropertyは色々なものを、そのように「見なして」いくことで、RxでOrchestrateできる幅をドラスティックに広げます。土台にさえ乗せてしまえば、あとはRxにお任せ。その場合に大切なのは、土台に乗せられるよう、閉じないことです。しかし、もし閉じているのなら、開くための鍵を提供します。

デフォルトモード変更

ReactivePropertyのデフォルトモードが DistinctUntilChanged|RaiseLatestValueOnSubscribe になりました。今まではRaise…が入ってなかったのですが、思うところあって変わりました。例えばCombineLatestは、全てが一度は発火していないと動き出しません。ReactiveCommandの条件に使うなどの場合にRaiseしてくれないと不都合極まりなく、かつ、Subscribeと同時にRaiseすることによる不都合なシーンは逆に少ない。ことを考えると、必然的にデフォルトをどちらに振るべきかは、分かりきった話でした。

そのことは0.1の時、サンプル作りながら思ってたんですが悩んだ末に、省いちゃったんですねえ。RaiseLatestValueOnSubscribeが入ると不便なシーンもある(initialValueを設定しないとまず最初にnullが飛んでいくとか)ので、どちらを取るかは悩ましいところではあるんですが、シチュエーションに応じて最適なほうを選んでください、としか言いようがないところです。

ToReactivePropertyAsSynchronized

長い。メソッド名が。

これは何かというとINotifyPropertyChanged->ReactiveProperty変換です。今までもObservePropertyメソッド経由で変換できましたが、それは一度IObservable<T>に変換するため、Model→ReactivePropertyという一方向のPushでしかありませんでした。Two-wayでのバインドで値の同期を取りたい場合は、今回から搭載されたToReactivePropertyAsSynchronizedを使ってください。

// こんな通知付きモデルがあるとして
public class ObservableObject : INotifyPropertyChanged
{
    private string name;
    public string Name
    {
        get { return name; }
        set
        {
            name = value;
            PropertyChanged(this, new PropertyChangedEventArgs("Name"));
        }
    }
 
    public event PropertyChangedEventHandler PropertyChanged = (_, __) => { };
}
 
// それを使ったViewModelを作るなら
public class TwoWayViewModel
{
    public ReactiveProperty<string> OneWay { get; private set; }
    public ReactiveProperty<string> TwoWay { get; private set; }
 
    public TwoWayViewModel()
    {
        var inpc = new ObservableObject { Name = "ヤマダ" };
 
        // ObservePropertyを使うとIObservable<T>に変換できます
        // ラムダ式でプロパティを指定するので、完全にタイプセーフです
        // それをToReactivePropertyすればOneWayで同期したReactivePropertyになります
        OneWay = inpc.ObserveProperty(x => x.Name).ToReactiveProperty();
 
        // ToReactivePropertyAsSynchronizedで双方向に同期することができます
        TwoWay = inpc.ToReactivePropertyAsSynchronized(x => x.Name);
    }
}

INotifyProeprtyChangedなModelをReactivePropertyなViewModelに持っていきたい時などに、使いやすいのではと思います。また、同期する型が異なっていても対応することができます。コンバーターのようにconvertとconvertBackを指定してください。

ReactiveProperty.FromObject

こちらもToReactivePropertyの亜種ですが、ReactiveProperty→Modelというソース方向への片方向の同期を取ります。ModelはINotifyPropertyChangedである必要はありません。

// こんなただのクラスがあったとして
public class PlainObject
{
    public string Name { get; set; }
}
 
// それと同期させたいとき
public class OneWayToSourceViewModel
{
    public ReactiveProperty<string> OneWayToSource { get; private set; }
 
    public OneWayToSourceViewModel()
    {
        var poco = new PlainObject { Name = "ヤマダ" };
 
        // ReactiveProperty.FromObjectで変換することができます
        // この場合、ReactiveProperty -> Objectの方向のみ値が流れます
        OneWayToSource = ReactiveProperty.FromObject(poco, x => x.Name);
    }
}

片方向の同期が定型的な局面、例えば設定クラスなんかは通知は必要ないと思うのですが、それをUIから一方向で値を投影したい場合に、これを使うことで楽になると思います。

また、Sampleにこれら3つの解説を追加しましたので、実際にどう反映されるのか、動きを確認したい場合はそちらを見てください。

CombineLatestValuesAreAllTrue

長い。メソッド名が。これはReactive Extensionsお題 - かずきのBlog@Hatenaに書かれているもので、使うシーンよくありそうな頻出パターンになりそうだと思ったので、お借りすることにしました。ありがとうございます。使い方を見てもらったほうが速いので、まず例を。

Microsoft Silverlight を入手

<StackPanel>
    <StackPanel Orientation="Horizontal">
        <CheckBox IsChecked="{Binding IsCheckedA.Value, Mode=TwoWay}">Check A</CheckBox>
        <CheckBox IsChecked="{Binding IsCheckedB.Value, Mode=TwoWay}">Check B</CheckBox>
        <CheckBox IsChecked="{Binding IsCheckedC.Value, Mode=TwoWay}">Check C</CheckBox>
    </StackPanel>
    <Button Command="{Binding ExecCommand}">全部チェックで押せる</Button>
</StackPanel>
// using Codeplex.Reactive.Extensions; (これを忘れないように)
 
public class MainPageViewModel
{
    public ReactiveProperty<bool> IsCheckedA { get; private set; }
    public ReactiveProperty<bool> IsCheckedB { get; private set; }
    public ReactiveProperty<bool> IsCheckedC { get; private set; }
    public ReactiveCommand ExecCommand { get; private set; }
 
    public MainPageViewModel()
    {
        IsCheckedA = new ReactiveProperty<bool>();
        IsCheckedB = new ReactiveProperty<bool>();
        IsCheckedC = new ReactiveProperty<bool>();
 
        ExecCommand = new[] { IsCheckedA, IsCheckedB, IsCheckedC }
            .CombineLatestValuesAreAllTrue()
            .ToReactiveCommand();
 
        ExecCommand.Subscribe(_ => MessageBox.Show("しんぷる!"));
    }
}

3つのチェックボックスが全てONなら実行可能なコマンドを作る、です。こんな風に、全てがtrueの時、といった集約をしたい場合に便利に使うことができます。プレゼンテーションロジック、に該当する部分だと思いますが、ここでもRxは十分以上に活躍できます。また、外部からCanExecuteChangedをぶっ叩くようなカオティックなこともしません、ReactiveCommandならね。

ReactiveTimer

Timerです。.NETはTimerが山のようにあります。Threading.Timer, Timers.Timer, Forms.Timer, DispatcherTimer, Observable.Timer。ここにまたReactiveTimerという新たなるTimerが誕生し、人類を混乱の淵に陥れようとしていた……。まさにカオス。

ちょっと整理しましょう。まず、Threading.Timerは一番ネイティブなTimerと捉えられます。そのままだと少しつかいづらいので、軽くラップしてイベントベースにしたのがTimers.Timer。Forms.TimerとDispatcherTimerは、それぞれのアプリケーション基盤で時間を計って伝達してくれるというもの、UI系でのInvokeが不要になるので便利。と、それなりに役割の違いはあります。微妙な差ですが。

最後のObservable.TimerはIObservableで通達してくれるのでRxと非常に相性が良いタイマー。また、タイマーを行う場所もISchedulerで任意に指定できるので、ThreadPoolでもDispatcherでもCurrentThread(この場合はSleepで止まるので固まりますけどね)でも、もしくは仮想スケジューラ(任意に時間を動かせるのでテストが簡単になる)でも良いという柔軟さが素敵で、Rx以降のプログラミングではタイマーなんてObseravble.Timer一択だろ常識的に考えて。という勢い。(精度は若干落ちるので、よほど精度を求める時はThreading.Timerを使いましょう)。だと思っていた時もありました。

一時停止出来ないんですよ、Observable.Timer。発動したらしっぱなし。Stopはできる(Disposeする)けど、そうしたら再開は出来ない。それじゃあ困る場合があります!はい。結構あります。そういう場合はTimers.TimerをFromEventでラップする。それはそれで良いのですが、Observable.TimerのISchedulerを指定可能という柔軟さを捨てるのは勿体無いなあ、と思ったのでした。

そこで、今回ReactiveTimerを作りました。機能は、Observable.TimerのStop/Start出来る版です。

[TestClass]
public class ReactiveTimerTest : ReactiveTest
{
    [TestMethod]
    public void TimerTest()
    {
        // テスト用の自由に時間を動かせるスケジューラ
        var testScheduler = new TestScheduler();
        var recorder = testScheduler.CreateObserver<long>();
 
        // 作成時点では動き出さない
        var timer = new ReactiveTimer(TimeSpan.FromSeconds(1), testScheduler);
        timer.Subscribe(recorder); // Subscribeしても動き出さない
 
        timer.Start(TimeSpan.FromSeconds(3)); // ここで開始。初期値を与えるとその時間後にスタート
 
        // 時間を絶対時間10秒のポイントまで進める(AdvanceTo)
        testScheduler.AdvanceTo(TimeSpan.FromSeconds(5).Ticks);
 
        // MessagesにSubscribeに届いた時間と値が記録されているので、Assertする
        recorder.Messages.Is(
            OnNext(TimeSpan.FromSeconds(3).Ticks, 0L),
            OnNext(TimeSpan.FromSeconds(4).Ticks, 1L),
            OnNext(TimeSpan.FromSeconds(5).Ticks, 2L));
 
        timer.Stop(); // timerを止める
        recorder.Messages.Clear(); // 記録をクリア
 
        // 時間を現在時間から5秒だけ進める(AdvanceBy)
        testScheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks);
 
        // timerは止まっているので値は届いてないことが確認できる
        recorder.Messages.Count.Is(0);
    }
}

そう、単体テストしたい場合は、TestSchedulerに差し替えれば、AdvancedBy/Toによって、時間を自由に進めることが可能になります。Assertに使っているIs拡張メソッドはChaining Assertionです。Testing周りの詳しい解説はRx-Testingの使い方 - ZOETROPEの日記に書かれています。

CountNotifier/BooleanNotifier

SignalNotifierという名前はよく分からないので、今回よりCountNotifierに変更しました。また、名前空間をNotifiersに変更しました。更に、二値での通知を行うBooleanNotifierを新規追加しました。どちらも、IObservable経由での通知を行うフラグです。

// using Codeplex.Reactive.Notifiers;
 
// 通知可能(IObservable)なboolean flag
var boolFlag = new BooleanNotifier(initialValue: false);
boolFlag.Subscribe(b => Console.WriteLine(b));
 
boolFlag.TurnOn(); // trueにする, trueの状態だったら何もしない
boolFlag.Value = false; // .Valueで変更、既にfalseの状態でも通知する
boolFlag.SwitchValue(); // 値を反転させる
 
// 通知可能(IObservable)なcount flag
var countFlag = new CountNotifier();
countFlag.Subscribe(x => Console.WriteLine(x));
 
countFlag.Increment(); // incしたり
countFlag.Decrement(); // decしたりの状態が通知される
 
// Empty(0になった状態)という判定でフィルタして状態監視したりできる
countFlag.Where(x => x == CountChangedStatus.Empty);

例えば非同期処理を行う際などの、状態の管理に使うことができます。

Pairwise

neue cc - Reactive Extensionsで前後の値を利用するで書いた、前後の値をまとめる拡張メソッドです。

// { Old = 1, New = 2 }
// { Old = 2, New = 3 }
// { Old = 3, New = 4 }
// { Old = 4, New = 5 }
Observable.Range(1, 5)
    .Pairwise()
    .Subscribe(Console.WriteLine);

古い値と新しい値を使って何かしたい場合などにどうぞ。

CatchIgnore

例外処理用に、OnErrorRetryというものを用意していましたが、今回それ以外にCatchIgnoreを追加しました。

// 1, 2
Observable.Range(1, 5)
    .Do(x => { if (x == 3) throw new Exception(); })
    .CatchIgnore()
    .Subscribe(Console.WriteLine);

ようするに、CatchしてEmptyを返す手間を省くためのものです。onErrorにe => {}と書くのと似てますが、シーケンスの途中で捕まえれるので、メソッドチェーンの繋ぎ方によっては全然異なる役割を持つ可能性があります。

その他の削除やバグ修正や見送ったものなど

RxのExperimental版が更新されてたので、それに合わせました。Rxの更新内容はZipとCombineLatestに大量のオーバーロード+配列を受け入れるようになったので、何でも結合できるようになりました。それにともないReactivePropertyでは独自拡張としてCombineLatestのオーバーロードを用意していたのですが、Experimental版のみ削除しました。パフォーマンスもExperimentalのもののほうがずっと良いので、早くStableにも降りてきて欲しいです。

WebRequestのUploadValuesで、値が&で連結されていないという致命的なバグがあったので修正しました。本当にすみません……。また、Silverlightでデザイン画面がプレビューできなくなる不具合を修正しました。デザインモード怖い。

バリデーション周りは、ちょっと大きめに(といっても内部だけの話で外部的には変わらない予定)変更入れようと思ってたのですが、それは次回で。あと、同期系メソッドもバリデーションの成否によって同期するかしないかを決定しようかなあ、とか思うんですが、ちょっと大変なので後になりそう。

まとめ

今回はデータリンクを主眼に置きました。デフォルトモードの変更もその一環です。直接的に意味を見るのなら、厚めのMをスマートにVMとシンクロナイズさせる、ということになります。冒頭の台詞、閉じた世界を開けるための道具です。ObserveProperty(OneWay)、ToReactivePropertyAsSynchronized(TwoWay)、ReactiveProperty.FromObject(OneWayToSource)。

OneWayとかTwoWayとかOneWayToSourceというとおり、VMとMの間のバインディングエンジンだと見ることができます。VとVMの間をWPFなりのフレームワークが担い吸収するように、ReactivePropertyはVMとMの間を吸収します。手書きでバインディングだと、ボイラープレートでは手間だし見通しも悪くなる。このほうが、ずっと、楽だし自然に書けます。

ReactivePropertyはV-VM間の接続も担うため、結果として全てがV-VM-M-VM-Vとして一つに繋がる。何をどう組もうと自然に一つに繋がっていく。わくわくしませんか?むしろカオスの予感がする?けれど、カオスの先に本当の光がある、……かもしれない。

ちなみに同期系のものはみんなプロパティ指定だけでGetとかSetとか自動でやっていますが、動的コード生成(&キャッシュ)によりハイパー高速化されているので、パフォーマンス上の問題はありません。そこは安心してください。というと何か凄そうなことやってる気がしますが、勿論そんなことはなくて、偉大なるExpressionTreeに全面的にお任せしているだけだったり。

Reactive Extensionsで前後の値を利用する

@Toya256tweetさんの作成されたDependency Variable Libを見て、ReactivePropertyでも大体再現できるかなあ、でもOldValueとNewValueのところが少し面倒なのよね、というところで一例。ReactivePropertyの値の変更時に、古い値と新しい値を同時に得られるようにします。

var p = new ReactiveProperty<int>(1, mode: ReactivePropertyMode.RaiseLatestValueOnSubscribe);
 
p.Zip(p.Skip(1), (Old, New) => new { Old, New })
    .Subscribe(a => Console.WriteLine(a.Old + " -> " + a.New));
 
p.Value = 10; // 1 -> 10
p.Value = 100; // 10 -> 100

挙動は@okazukiさんの解説されている通りです。残念ながら、頭やわらかい、というわけではなくて頻出パターンのイディオムなだけなので、ただたんに覚えているから、というだけです、がくり。まあ、LINQにせよRxにせよ、メソッドの組み合わせで成り立っているということは、パターン化しやすいということなのですね。イディオムを知っていればいるほど、更にそのイディオムを組み合わせて、と、手法は無限に広がっていきます。

私は非同期をvoidにしてモデル作り込むっての好きくないです。IObservableなりTaskなりを返してくれれば、先があるのですが、そうでないとやりようがないですから。例えばデータモデル考え中 - 急がば回れ、選ぶなら近道で示される「2」のパターンが、Silverlightなどでの従来のやり方だったと思われます。実行のトリガーだけを外から渡して、モデルの中で結果は閉じる。変更はINotifyPropertyChanged経由で通知。正直言って、私はこのやり方はナシだと思っています。スパゲティになりがちだから。Rxは「3」のパターンに近いと思います。順序の制御は、まさにミドルウェア足るReactive Extensionsが保証する。柔軟性は見ての通りで、無限の広がりがあります。

今まではコールバックしかなかったので必然的に2に収まらざるを得なかったですが、今はRxもあるし、C#5.0からはawaitもあるし、なので、モデルの作り方も「変わっていく」と思います。Viewの機能の強さによってViewModelのありようが変わるように、言語やフレームワークの機能の強さによってModelのありようが変わるのは当然でしょう。

ScanとPairwise

さて、自分自身と結合というのは、結局のところ二つ購読しているということなので、これはIObservableがHotでないと成り立ちません(ReactivePropertyはHotです)。というわけで、ColdなIObservableでも対応したい時はScanを使うといいでしょう。HotとかColdとか何言ってるのか分からないという場合はReactive Extensions再入門 その5「HotとCold」 - かずきのBlog@Hatenaを読むといいでしょう。最近、自分で解説してるのを放棄しだしてる気がするよくない傾向、ではなくて、次回のReactive Extensions(Rx)入門 - @ITではまさにObservable.TimerとフツーのTimerを使ってColdとHotの解説しようと思ってたのですよ!ネタ被った、けれど気にせず書きます:)

var p = new ReactiveProperty<int>(1, mode: ReactivePropertyMode.RaiseLatestValueOnSubscribe);
 
var oldNewPair = p.Scan(Tuple.Create(0, 0), (t, x) => Tuple.Create(t.Item2, x)).Skip(1);
 
oldNewPair.Subscribe(Console.WriteLine);
 
p.Value = 10; // (1, 10)
p.Value = 100; // (10, 100)

Scanは自分自身の前の値を参照できるので、色々と応用が効きます。値の入れ物のための初期値は不要なのでSkip(1)で除去してやるのがポイント。

もう一つ、メソッドの組み合わせでのパターン化、というのは、つまりパーツ化しやすいということでもあります。拡張メソッドに分離してやりましょう。

public static class ObservablePairwiseExtensions
{
    // OldNewPair<T>はReactivePropertyに入っています
    // using Codeplex.Reactive.Extensions;
 
    public static IObservable<OldNewPair<T>> Pairwise<T>(this IObservable<T> source)
    {
        return source.Scan(
                new OldNewPair<T>(default(T), default(T)),
                (pair, newValue) => new OldNewPair<T>(pair.NewItem, newValue))
            .Skip(1);
    }
 
    public static IObservable<TR> Pairwise<T, TR>(this IObservable<T> source, Func<T, T, TR> selector)
    {
        return source.Pairwise().Select(x => selector(x.OldItem, x.NewItem));
    }
}
 
// ↑というような拡張メソッドを作ってやったとして
var p = new ReactiveProperty<int>(1, mode: ReactivePropertyMode.RaiseLatestValueOnSubscribe);
 
p.Pairwise().Subscribe(x => Console.WriteLine(x.OldItem + " -> " + x.NewItem));
 
p.Value = 10; // 1 -> 10
p.Value = 100; // 10 -> 100

OldNewPairを使ったのは、TupleがSL/WP7にないから、というのと、OldItemとNewItemというプロパティ名に意味があって、分かりやすいから、です。基本的にC#でTupleを使うことはあんまないですね。LINQのパイプライン内でならば匿名型、それを超えるなら面倒くさくてもクラスを立ててあげたほうがいいと、私は思っています。勿論、今後Tupleのための構文やパターンマッチが入るとしたら別ですけど。というか、つまるところ専用構文がない状態ではTupleを使うメリットはそんなにないのです。匿名型かわいいよ匿名型。言語比較の際に、C#はTupleがこんな腐ってるぜー、とかやられるのはちょっと勘弁願いたいところ(まぁでも普通に敵いませんのは認めます、けれど言語・IDE・フレームワークは三位一体だとも思っています。引き離して単独で評価することには、あまり価値を感じません。IDEでうまく機能することを優先した言語、それを前提にしたフレームワーク。どの要素も引き離せませんから。はいはい、C#がお好きなんですね、という感じですが、でも例えばHTML/ブラウザというGUIフレームワークの上だったらJavaScriptがベストだ、といった捉え方でもありますね)

それはともかくとして、Pairwiseは多用しそうなので、次のReactiveProperty(ver.0.3)で入れたいと思います(あとOldNewPairのToStringのオーバーライド)。ちなみにlinq.js - LINQ for JavaScriptにはPairwise、入ってます。そう、Rxでの頻出パターンということは、それはIx(Enumerable)にも存在するパターンなのです。この辺がRxの面白いところです!私にとって、こういった書き方の初出は前後の値も利用したシーケンス処理 - NyaRuRuの日記でした。

ObserveChanged

突然出てきたOldNewPairですが、これが既にReactiveProperty内で定義されているのは、ObservableCollectionの拡張メソッド群で使用しているからです。今まで紹介していなかったと思うので、ここで紹介しましょう。

// using Codeplex.Reactive.Extensionsとすると
// ObservableCollection<T>に(ReactiveColelctionとか継承したものでも可)
// ObserveXxxChangedという拡張メソッドが利用できる
var collection = new ObservableCollection<int>();
 
// 追加されたのを監視できる、IObservable<T>
collection.ObserveAddChanged()
    .Subscribe(x => Console.WriteLine("Add:" + x));
// 削除されたのを監視できる、IObservable<T>
collection.ObserveRemoveChanged()
    .Subscribe(x => Console.WriteLine("Remove:" + x));
// 置換を監視できる、IObservable<OldNewPair<T>>
collection.ObserveReplaceChanged()
    .Subscribe(p => Console.WriteLine(p.OldItem + "→" + p.NewItem));
// リセットを監視できる、IObservable<Unit>
collection.ObserveResetChanged()
    .Subscribe(_ => Console.WriteLine("Clear"));
 
collection.Add(100); // Add:100
collection.Add(1000); // Add:1000
collection[1] = 300; // 1000→300
collection.Remove(100); // Remove:100
collection.Clear(); // Clear

この手の監視では、通常CollectionChangedイベント経由でNotifyCollectionChangedEventArgsを使って値を取り出すわけですが、型がobject[]なので一々キャストしたりなど、非常に使いにくいと思っていました。ObserveXxxChangedを使えば、完全にタイプセーフで、値も取り出しやすい形に整形してくれています。是非是非どうぞ。

まとめ

@Toya256tweetさんにも示唆頂いたのですが、ReactivePropertyはMVVMに限定されない、汎用的なものだと考えています。値の導出ルールを宣言的に書く、というのは色々なところで使える、気がします。でもやはり、Functional Reactive Programmingが全然流行ってないことを考えても、ルールによって自動的に変動する値って、基本的にGUI向けなのだろうなあ、って。そして、GUIで強いのはやっぱJavaとか.NETといったFRP不毛地帯なので、流行るなんて考えられないことでした。しかし、今は違う。C#にはRxが来た。C#で実現できるのならば、強力なGUIプラットフォームが目の前にあるわけなので、かなり可能性はあるんじゃないかな!と思いたいところです。

d.y.d. - ReaJ / Reactive JavaScriptの例は

// RaiseLatestValueOnSubscribeはv0.3ではデフォルトに変更する予定
var mode = ReactivePropertyMode.RaiseLatestValueOnSubscribe;
 
var x = new ReactiveProperty<int>(10, mode);
var y = x.Select(n => n + 100).ToReactiveProperty(mode: mode);
x.Value = 20;
x.Value = 30;
Console.WriteLine(y.Value); // 130

まあ、不格好です。ReactiveProperty用の専用構文でも用意してくれないとね、rp x = 10; rp y = x + 100; とかで上記の形に整形されたら素敵なのですが。というのはともかくとして、一応、実現できています。GUI環境への反映はWPFのバインディング機構に投げて解決ですし。JavaScriptにおいても、ReactivePropertyを移植して、ベースとしてKnockout.js辺りを採用すればいい感じに実用的になりそうです。その辺は追々やっていきたいところ。

勿論、Rx自体の可能性はGUI(や非同期)だけに閉じているわけではないので、全く別なところでの可能性、使い道というのも追い求めていきたいです。

ともあれともかく、ReactiveProperty、試してみてくださいな。

Rx連載開始とRx本感想とZenbook買ったという話

まーたブログを放置気味な昨今は大変よろしくなく、だらだらTwitterを眺めているだけで一日が終わる症にかかっています。さて、そんなわけですが、@ITにてRx入門の連載を開始しました。

導入なので細かいことは言わず、なんか凄そう!と思ってもらえればいいなー、という構成にしました。用語もそんな並べず、でも、ところどころ引っかかるワードがある、言い方を悪くするとハッタリ気味に、印象に残ってくれればいいなあ、と。初回とはいえ、導入だけであっさり終わってしまったのはちょっと反省。どんな形になるのかイマイチ掴めなくて、もう少し書けばよかったな、と思ってます。あと、図をもう少し入れるべきだったな、と……。そんなこんなな反省を生かし、次回はボリューム増でお送りします。

@okazukiさんがReactive Extensiions再入門を始めたり、@zoetroさんがRx-Testingについて詳しい記事を書かれていたり(これは素晴らしい!)、Rxも盛り上がってきた感じがしますね!それは気のせいです。というだけで終わらせないよう、ガンガン行きましょうー。

Rx本

(やっと)発売されました。まず印象ですが、薄いです。私は電子書籍で買っちゃってるのでリアルな厚さは分からないんですが、180ページです。それでこの値段かよ、という不満を最初は持ってしまうかもしれません。あと、LINQ and Rxです。どういうことかというと中身の半分はフツーのLINQの話です。それを差っ引くとRxは90ページしかありません。更にRxJSやReactiveUIの話もあります。そこを差っ引くと50ページぐらいしかないじゃないかゴルァ。というわけで、Rx本として考えると、分量には不満が残ると思われます。全てのメソッドをカバーする、という内容でもないのでリファレンスとしても使えません。

とはいえ、要素要素は満遍なくカバーできているのと、現在唯一のRx本ではあるので、本で学びたいなあ、と思うならこれしか選択肢はありません。WindowやJoin、Testingなんかは(私の怠慢により)このブログでは少しも紹介していないので、それらを知りたい方や、 Web上から断片的な情報を拾って組み上げるのは手間なので、購入するのは十分アリだとは思います。まあ、私の@IT連載が完了したら、第一の選択肢はそれを見ること、になります(キリッ。となれるように、頑張ります。

ASUS Zenbook UX31

どうでもいいんですが、UX31を買いました。Intelの推奨するUltrabookの第一弾の中では大本命の一品です。さて、Ultrabookとは何か、というと、ようするところWindows版Macbook Airです。薄く軽く速い。宗教上の理由で林檎はお断りだ!な人にとっては救いの手なわけです。内心羨ましいとか思ってたりしたんだからね!しかしですよ、Win系の勉強会ではそうでもないですが、それ以外の勉強会でのMac率の高さといったら!会場の9割がMacだよ、とかドヤ顔でツイートされた日には!多様性は善、はどこに行ったんだよという話です。

側面は、実用的な意味ではフルフラットのほうが良いのでしょうし、特に日本メーカーはそこに拘っている印象があるのですが、審美的にはこうした処理をしたほうがいいですね、視覚上のトリックとはいえ、圧倒的に薄く見えるので。ちなみに側面から見ると本当にMacbook Airソックリでパク……と口から出てしまうのも已むを得ないかな、とは思いますが、それ以外の部分はそんなに似てるわけでもないですよ。

ZenbookはSSDがSATA 3.0(6Gbps)ということもあって、滅茶苦茶速いですね。今後はこの速度がスタンダードになっていくのかと思うと、いい時代です、ほんと。

その他の印象ですが、キーボードはまぁまぁ、タッチパッドはサイテー。タッチパッドはキー入力中の誤動作率の高さ(位置とか大きさが悪いのだろうなあ)も酷くてストレスフル。基本はマウス使いますけれど、いつもマウス持ち歩くのもねえ。ああ、あと、UX31はUltrabookの中で唯一解像度が1600×900と高い(他は1366×768)のがポイントです。フルHDじゃないのかよ!とVAIO Zのオーダーメイドな人が言ってくるかもしれませんが、まぁVAIO Zは店頭モデルはともかくフルHDでオーダーすると高いですからね。こっちは10万円なのでコストパフォーマンスが違うわけです、はい。あと、13インチで1600×900は程良いですよ。フルHDだとちょっと文字が細かすぎになる感も。

総じて満足度は高くお薦めなので、是非買ってください(上のリンク先から!)

悲しいことに私はいきなりACアダプタのコネクタを破壊してしまって充電不能に陥りました、オゥノゥ。地震で物が降ってきてですね。脆いものです。

追記:ASUSのサポートセンターに連絡し、交換してもらいました。非常に対応もよかったので、全然問題ないです。ネガティブな方向でURLがばら蒔かれてしまって想定外だったのですが、全然大丈夫ですよ、とは書いておきます。

ReactiveProperty ver.0.2.0.0

ver.0.2!ご意見ご感想は随時募集中で、コメントなりTwitterで私に@を投げてくれるなり、ただたんにTwitterでReactivePropertyと含めてつぶやいてくれるなり(検索経由で拾えるので)、ブログで記事を書いてくださるついでにクエスチョンしてみたりなどなど、ちょっとした疑問でも要望でも、何でもどうぞ。特に、細かな使用感の向上というのはリクエストがあってこそですので!斜め上からやってきた結果として世界最先端(但し逆向き)を体感出来るのは今だけです!斜め上なのでReactivePropertyのうまい使い方は今のところ誰にも分かりません、私もわかりません(えー)。というわけで、みんなで模索できたらいいな、と思います。

国内はもとよりReactiveUIの作者からも言及頂いて結構褒めてもらったりなどなど、RxのForumで宣伝したかいがあったね!というわけで、私自身かなり真剣に取り組んでますので、付き合って頂ければ幸いです。 /* 現在ReactiveOAuthをほっぽりだしてるという信頼感のなさがアレなので、そちらも早めに何とかします…… */

今回は、0.1では中途半端な存在だったReactiveCollectionを徹底的に考察して再デザインしました。他に細かい追加が幾つか。まずは小さな追加から。

追加したり変わったりしたもの

ObserverPropertyが、最初のSubscribe時に値をPushするようになりました(引数でfalseを指定するとオフにも出来る、そうすると、普通にFromEventしたのと同じ)

public class ToaranaiViewModel
{
    ToaruModel model;
    public ReactiveProperty<string> Name { get; private set; }
 
    public ToaranaiViewModel()
    {
        // こんなINotifyPropertyChangedなModelがあるとして
        model = new ToaruModel { Name = "Anders" };
 
        // 初期値として現在値(この場合"Anders")を持つ
        Name = model.ObserveProperty(x => x.Name).ToReactiveProperty();
    }
}
 
public class ToaruModel : INotifyPropertyChanged
{
    private string name;
    public string Name
    {
        get { return name; }
        set { name = value; PropertyChanged(this, new PropertyChangedEventArgs("Name")); }
    }
 
    public event PropertyChangedEventHandler PropertyChanged = (_, __) => { };
}

これにより、既存のModelからToObservablePropertyしてViewModelにする際などに、デフォルトで値が同期されるので多くのシチュエーションで、より便利になったと思います。という提案を@okazukiさんにリクエスト貰ったので実装しました:) @okazukiさんはReactivePropertyを使ってみた感想 イケテル!気持ちいい!ハードルは高い? - かずきのBlog@Hatenaという記事も書いてくれました、わーい。

ObserverProeprtyはINotifyPropertyChangedへの拡張メソッドです。また、今回よりINotifyPropertyChangingにObservePropertyChanging拡張メソッドを追加しました。ObserverProeprtyと同様な感覚で使えます。

それとReactiveCommand(無印)のExecuteが引数なしでnullをぶん投げるようになりました。なお、これがあるのは無印のほうのみで<T>のほうにはありません。だって、ジェネリックするということはパラメータが欲しい前提ですものね。ジェネリックのほうはExecute(T parmeter)を受け入れるうようにオーバーロードを隠蔽。こういう細かいところの使いやすさの向上ってのは随時取り組みたいところです。

また、ReactiveCommand(無印・ジェネリック共に)をDisposeすると、SubscribeしてたものにOnCompletedを投げるように変更しました。なお、ReactiveCommandをDisposeすると、CanExecuteもfalseになります。永久的にfalseにする、という意味合いで使えるかと思いますが、使うシチュエーションは分かりません。

ReactiveCollectionの再デザイン

ReactiveCollectionに大きめの変更を入れました。今まで通知をIScheduler上で行なっていましたが、これを廃止しました。かわりにToReactiveCollectionなどIObservableからの変換時は、Addと通知、両方をIScheduler上にしました。また、IScheduler上で各種操作(Add, Clear, Remove)を行うメソッド AddOnScheduler などを追加しました。この変更のデザイン上のポリシーは以下になります。

ObservableCollectionとスレッドセーフ・ディスパッチャーセーフというのは非常に難しい。まず、ObservableColectionは変更と通知がワンセットだと考えられる。コレクションが変更され通知を出し、通知され側(主にUI)がコレクションを読みに来る。これは全部ひとまとまりでなければならない。通知され側がコレクションを読みに行く際に、ズレがあってはならない。よって、通知をUIスレッドで行うなら、変更もUIスレッドで行われる必要がある。

しかし、全ての操作を内部で片っ端からDispatcherにBeginInvokeするアプローチを取ると、それはそれで都合が悪い。例えば別スレッドでAddしたりRemoveしたりClearしても、そのコード上では変更はすぐには反映されない。ClearしてもCountは変わらない。AddしてもCountは変わらない。そんな気味の悪いコレクションクラスは使えません。WPFではDispatcher.Invokeがあるので、変更と通知を強制的にUIスレッド上で行う、ということが可能でしたが、SilverlightにはBeginInvokeしかないので、操作をUIスレッドで行うことを保証するコレクションクラスの作成は不可能。(Caliburn MicroのBindableCollectionは全部UIスレッド上で行うようにしているみたいですね、まあBindableにのみ焦点を当てるなら現実的なので、それはそれでいいと思います)

だから、コレクションを触る時は利用側がDispatcher.BeginInvokeして、明示的にDispatcherの中へ入ろう。というのが、整合性が取れて一番良いのだと思います。今まで、ReactiveCollectionは通知だけIScheduler上で行うようになっていました。でも、これはあまり良いデザインではない、操作と通知は同一スレッド上で行うべきなのだから、これでは乖離する可能性がある。単純なAddだけのようなケースでは問題になることは少ないし、利便性としては、その方が簡単にバインドで出来て良いよね、ではあるのだけど、決して良いデザインではない。いずれ発覚する破綻への気づきを遅らせているという点で、むしろ限りなく悪い。

よって、内部で片っ端からDispatcherにBeginInvokeする代わりに、AddOnSchedulerなど、(ReactiveCollection生成時に指定した/デフォルトはUIDispatcher)スケジューラ上で操作を行うと利用側が明示するアプローチを取ってみました。Rxには使い勝手の良いISchedulerが存在する。だからこそアリなやり方かな、と思います。この辺はまだまだ考えどころだと思いますので、ご意見ありましたらお願いします。

<Grid>
    <ListBox ItemsSource="{Binding TimeItems}" />
</Grid>
public class ToaruViewModel
{
    public ReactiveCollection<string> TimeItems { get; private set; }
 
    public ToaruViewModel()
    {
        // 1秒毎に現在時刻表示が追加されるコレクション
        TimeItems = Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(_ => DateTime.Now.ToString())
            .ToReactiveCollection();
 
        // 5秒間隔で上記コレクションをクリアする
        Observable.Interval(TimeSpan.FromSeconds(5))
            .Subscribe(_ => TimeItems.ClearOnScheduler());
    }
}

今回考えるにあたっては青柳 臣一 ブログ(技術系): [.NET] スレッドセーフな ObservableCollection<T> が欲しいをとっても参考にさせて頂きました。

ReactiveProperty, ReactiveCommandは確固たる意思のもとに作ったんですが、ReactiveCollectionは非常に中途半端でした。が、今回ようやく理念が立てれたのではかと思います。なお、ObservableCollection/ReactiveCollectionにはObserveAddChangedなど、変更通知をIObservableで受け取ることのできる拡張メソッドを足してあるので、そちらも便利に使うことが可能です(素のNotifyCollectionChangedEventArgsはIList(ジェネリックじゃない!)であったりして非常に触りにくいので、その辺をきっちり整理してあります)。

とか言ってますが、コードにしたらたかが十数行なのですよね。それを決めるのに、ここ一週間ずっと考えてました。つまり私の生産性は一日一行です(キリッ

まだ追加してないもの

Validation周りをValidationSummaryやDescriptionViewerに対応させる。とか、OnErrorRetryが値を返せるようにする。などは次に載せるつもりです。これらを加えてから、と思ったんですがReactiveCollectionの変更が大きいので、先に出したくて見送りました。

ReactiveProperty-Experimental

今回からExperimental版のRxにも対応しました。NuGetではReactiveProperty-Experimentalです。しかし、 .NET 4.5やWinRTへの対応は、まだしていません。せっかくRx(Experimental)がWinRT対応したので、それに合わせたいと思ったのですが断念。理由としてはVS11がCode Contractsに対応していないから、です。Code Contractsのバイナリリライトかけないと動かないので、どうにもなりません……。それがなければ今すぐにでも対応させたいのですけれどねえ。こういう時に機敏に動けないのはとても悲しいので、次回からはCode Contractsの採用は見送りたいと思ってしまいます……。

ところで、それを意識してではありますが、.NET 4.0版のDLL名をReactiveProperty.NET40.dllに変えました。複数プラットフォームに対応する場合、全てのDLLを同じ名前にする(JSON.NETなどはそうですね)か、全てのDLLにプラットフォームの識別子をつける(MVVMLightなどはそうです)か。前者のほうがスマートではあるのですが、分かりやすさを考え、後者を選びました。Stable版とExperimental版の区別もありますし、DLL名から判定出来たほうがいいかな、と。

今後

okazukiさんの記事にもあるように「MVVMライブラリにも精通しつつReactive Extensionsのことも知っててReactivePropertyの概要を把握してないといけない上に必要に応じてMVVMライブラリとReactivePropertyを繋ぐような機能を作りこまないといけない」きゃー、難しそう!でも事実だ!

私としてはReactivePropertyを通してReactive Extensionsを学習してもらえればいいかなあ、と思っています。Rxはイベントが合成出来る!というけれど、合成しようにもイベントのソースがないと始まらない。ReactivePropertyを使うと、手軽に合成のためのソースが手に入るので、イベント周りのRxでのこね方の学習に最適なのではかと思います。……多分。

既存MVVMライブラリとの使い分けなどに関しては、この類の「選択肢が増えます系」の永遠の課題ですねえ。結局、どう使い分けるかの判断をユーザーに丸投げしているわけですもの。ガイドなどを掲示できればベストなのですが、そもそも私がMVVMに全然詳しくないのであった。そもそも私自身がどう使えばいいのか分かってないぐらいなので(えー)、触ってみて、ついでに足りなかったり、これがこうなってたらいい、とかいう思いがあったら、私がそういうのに全然気づいてない確率100%なので、是非言ってやってください。

Reactive Extensions v1.1.11011.11リリースに見る.NET 4.5からの非同期処理

Reactive Extensionsv1.1.11011 (Experimental Release)がリリースされました。リリース対象はExperimental(実験)版のみです。Stable(安定)版のほうは変更ありません。別件で少しコメントで質問したところ、近いうちにStable版の更新もあるかも、とのことでしたので、Stableはそちらを待ちましょう。リリース内容の詳細な解説はフォーラムにあります

今回の大きな追加は.NET Framework 4.5 Developer PreviewWinRTへの対応です。というわけで、WinRT関連では、WinRT用のスケジューラであったりイベントであったりへの対応とまぁまぁ想像つく普通のもの。あと非同期処理を他言語と結びつけるIAsyncOperationへの書き出し、などなどもサポートされるようですね。

そして.NET 4.5周りでは、C#5.0のAsyncサポート・クラスライブラリがTask中心に書き換わることが念頭に置かれ、大規模に変更が入っています。今後のRxの方針がよく見えますので、Experimentalではありますが注意深く観察してみる必要がありそうです。というわけで、しっかり紹介します。なお、以下の話は.NET 4.5のRxの話なので、.NET 4.0以前の場合では直接は関係なく、Obsoleteにもなっていません。が、将来フレームワークのバージョン上げたらObsolete祭りでモニョるのは覚悟が必要かしらん。もう一つ注意としては、あくまでExperimentalなので、将来的にもこのままかどうかは保証されません。現時点での話です。

FromAsyncPatternがObsolete

はい、Obsoleteです。理由としては、.NET 4.5では多くのメソッドがBegin-Endパターンの代わりにTaskを返すXxxAsyncメソッドを持っています。そしてTaskとIObservableは相互に変換可能だから、Rxで扱いたいならXxxAsync().ToObservableすればいいでしょ、ということでした。Begin-EndパターンなのにXxxAsyncを持っていないメソッドにはどうするんだ!という場合は、TaskFactory.FromAsyncがあるので、それ使えばいい、とのこと。まあ、それはレアケースなので滅多にないかな。

毎回ToObservableなんて面倒くさい、という人のためにSelectManyに限っては、Taskを受け取るオーバーロードが用意されているので、ToObservableは不要です。内部では予想つく通り、ただ単にToObservableしているだけですね。その他の合成系メソッド(MergeやSwitch)などは残念ながらというか当然というか、ToObservableしてください。

IObservableはAwaitable

IObservable<T>も非同期を扱うものなので、awaitできます。正確に言えばGetAwaiterが定義された、といったところでしょうか。

var req = WebRequest.Create("http://google.com/");
var response = await Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse)();

基本的にRxの非同期はFromAsyncPatternを初めとして長さが1のものを扱っていますが、複数の値が流れる場合はどうなるかというと、挙動は「最後の値」です。正確に言えばAsyncSubjectが利用されているので、OnCompletedの直前のOnNextの値。ではEmpty(OnCompletedのみ)やNever(何もなし)ではどうなるのか、というと……

// 10(最後の値)
var a = await Observable.Range(1, 10);
 
// InvalidOperationException(シーケンスに値がない)
var b = await Observable.Empty<int>();
 
// ある意味フリーズ、ここで永遠に止まる
var c = await Observable.Never<int>();

となります。これだけ見るとNeverって使い道がイミフですが、何かとMergeする必要があるときに、マージ対象がないときはNeverを渡すなどなど、ライブラリに近い部分では結構使う場所あります。私の書いているReactivePropertyというライブラリでもそうして利用しています。

FirstなどがObsolete、かわりにFirstAsyncなどとWaitが追加

え?という感じですがObsoleteです。対象はFirst,Last,Single、それとForEachも。FirstOrDefaultなど、XxxDefault系も同様です。つまり同期的にブロックするタイプのものが全てObsolete行きになりました。代わりにFirstAsyncなどXxxAsyncが用意されています。それとawaitを組み合わせてください。

var source = Observable.Return("async?");
 
var value0 = await source; // LastAsyncと挙動は同じ
var value1 = await source.FirstAsync();
var value2 = await source.LastAsync();

長さ1と分かっている状況なら、何もなくそのままawaitでも良いのではかしらん。LastAsyncがそれに相当しますね。さて、しかしawaitのお陰で同期的「のように」書けるには違いないけれど、FirstやLastなどはブロックして「同期」な挙動を取っていたわけなので、単純にObsolete行きにされたら困ってしまいます。そこで、同期的に待機して最後の値を取り出すWaitメソッドが用意されました(これは.NET 4.0やSilverlightなどでも使える、新たに追加されたメソッドです)。

var source = Observable.Return("async?");
 
var value0 = source.Wait();
var value1 = source.FirstAsync().Wait();
var value2 = source.LastAsync().Wait();

ちなみにWaitの中身はLastです。Lastという名前から離して、同期的に待機して値を取り出す、と明示させたのですね。それはいいと思います。Waitはいい。Waitはいいんですが、FirstをObsoleteにしてFirstAsyncを追加するのは、正直気にいりません。私は反対です。

ターゲットフレームワーク間でコードが共有出来なくなる、IQbservableプロバイダに影響が出る、そもそも標準クエリ演算子から離れるのはどうよ。などなど。だいたい、AllやAny、Maxなどは長さ1のIObservable<T>を返すようになっていました。Firstなどだけです、同期的に待機して値を取り出す、という別の意味が与えられていたのは。だから、ここはFirstはObsoleteにせず、FirstAsyncの挙動、つまりIObservable<T>を返すように変更すればいいのです。同期待ちについては、Waitが搭載されたので心配無用です。これで、全ての挙動に統一が取れる。

唯一問題点を挙げれば、本当に本当に「破壊的変更」になるんですよね。それも、Stableとか銘打ったものへの影響も出る。メソッド名同じで戻り値が変わる。そういう変更を許せるものか。私は、許してしまってもいいと思うのですけれど。Firstを廃止してFirstAsyncを追加、などという歪な形を将来に残すよりかは、ずっといい。

なので、Forumにもそうコメント入れましたところ返答貰えました。「Stableリリースが存在する以上、XxxAsyncのままでいるしかない。これが不幸なことは同意しますけれど、暫くはこのままでいるしかない。」とのことでした。というわけで、Stableと銘打つのが早まったな…… としか言いようがなく。あの段階でここまで読めなかったのはしょうがないところ、と思いつつ、やはり手痛いミスかなあ。うーん、将来に渡っての完璧なAPIを作り上げるというのは実に難しい。

まあ、Firstを多用する(といっても単体テストや動作確認時ぐらいですけど)のは、値を一つ取り出したい、ということなので、await sourceかsource.Wait() で済む。FirstAsyncやLastAsyncを直接使うことは恐らく少なくて、ならば実際上の問題というのはそこまでないかもしれません。

長さ1の非同期処理の戻り値はTaskを選ぶべきか、Rxを選ぶべきか

メソッドを作るときの非同期処理の戻り値。これは、Taskを選ぶべきです。それは.NET標準と合わせるべきという理由からもそうですし、この.NET 4.5向けのRxの指針からしてそうなっています。長さ1のIObservableで非同期を表現する、というのは特殊だったと言わざるを得ないので、メソッドを作るとき、非同期処理の戻り値はTaskにしたほうが間違いなく良いでしょう。

ただ、アプリケーショに全体でRxによる合成を中心に置く場合は、ToObservableが面倒くさい、というだけじゃなく逆にオーバーヘッドになる可能性もあるので、IObservable中心にしたほうが良いでしょう。この辺は一概には言えずケースバイケースでしょうか。どちらにせよToObservable<->ToTaskで相互変換が可能なわけなので、あまりガチガチに捉える必要もないですけれど。

あと、あくまで.NET 4.5の話でasync/awaitが入るからTaskのほうが良いと言ってるのであって、.NET 4.0以前ならまた違う話です。というかその場合だとRx一択です。

ねぇ、Rxってもう要らない子?

時代の徒花でしたね、短い命だった……。

って、ちょっと待ったー。それはYESでもあり、NOでもあります。YESなのは、単純な形での非同期処理ならば、遥かに楽になりますし、それに何よりもRxを通すよりもパフォーマンスは良いと思われるので、むしろasync/awaitを使うべきです。具体的には、SelectManyしてSubscribeするだけ、あとCatchで少し例外処理、みたいなコードなら、もう全面的にRxさようならでいいでしょう。

でも往々にしてそういう処理だけじゃないよね?以前にも紹介しましたがSwitch(新しい処理が入ったら以前の非同期処理はキャンセルして新しい処理のみを後続に流す)などを手書きせず演算子一つにパッケージ化できることや、全体的にTaskのメソッド群よりも合成や待ち合わせが容易に記述できる、などなど。そして、「複数の戻り値のある非同期処理、例えばStreamのBeginReadは細切れにbyte[]が得られますが、それを複数回分の非同期処理をまとめてシーケンスとして、IObservable<byte[]>としてまとめることは現状ではRxしかできません。

// 複数回のBeginReadをRx+Asyncで一つにまとめる例
static IObservable<byte[]> ReadMultipleAsync(Stream stream, int bufferSize)
{
    return Observable.Create<byte[]>(async observer =>
    {
        try
        {
            while (true)
            {
                var buffer = new byte[bufferSize];
                var readCount = await stream.ReadAsync(buffer, 0, bufferSize);
 
                if (readCount == 0) break;
                if (readCount != bufferSize)
                {
                    var newBuffer = new byte[readCount];
                    Array.Copy(buffer, newBuffer, readCount);
                    buffer = newBuffer;
                }
 
                observer.OnNext(buffer); // yield returnのノリで書く
            }
            observer.OnCompleted(); // 完了合図と
        }
        catch (Exception ex)
        {
            observer.OnError(ex); // 例外は自前で明示的に
        }
    });
}

ExperimentalリリースではObservable.Createがasync/await対応しているので、擬似的なyield returnとして、非同期での列挙をそこそこ簡単に記述することができます。OnErrorとOnCompletedは自前で管理する必要がありますけれど。

なので、メソッド単体で分けた場合の戻り値は「長さ1」ならTask、複数ならIObservable。それらメソッドを使って非同期処理を組み上げる時は、単純ならawaitのみ、複雑ならRx。というのが使い分けの指針です。とはいえ、使い分けっていうのは幻想に近くて、実際はどっちか一つになりがちだとは思っています。そして、それならTask中心になるでしょうねえ、とも。非同期における大抵のシチュエーションでRxがサヨウナラ気味になるのはしかたのない話です。ぶっちゃけSelectManyしてSubscribeがほとんどだし、それ以外のことだって、同期的のように書けるのなら、いくらでもやりようはありますから。

まあ、未来を待たなくても今使える解としてなら十分ですし、それ自体がawait可能なので、今書いたコードは将来に渡っても無駄にはなりません。FromAsyncPatternがObsoleteというのは、まあ単純に.NET4.5本来のXxxAsyncに置き換えればいいというだけなので無駄になる、とは言わないでしょう。

けれど、それだけじゃあ、すごく後ろ向きで、寂しいよね。Rx自体の持つ力というのは、別に非同期に限らない。ただの幾つもある側面のうちの一つにすぎない。そこで出した私の答えがReactiveProperty : WPF/SL/WP7のためのRxとMVVMを繋ぐ拡張ライブラリです。ReactivePropertyはC#5.0によってプレゼンスが低下するReactive Extensionsに新たな価値をもたらしたいという危機感から作ったものだったりします(後付け、じゃなくてこれは本当の話です、次の一手を指すならRx自体に注目の集まっている今しかない、とも)

ReactivePropertyを通して見ると、非同期だけではない、Rxの持つポテンシャルがよく分かるのではないでしょうか?Rxの真の強みはイベント単独や非同期単独ではなくて、それらが、ただの配列も含めて、統一的に扱える、だから全て一本のストリームになって合成処理が自由自在。というOrchestrateな部分にある。ReactivePropertyはそれを全面的に押し出してます(半強制的に全てが一本に繋がるようになってる)

Rxは、この先も力強く存在し続けるので、学習する価値は間違いなくありますよ!

まとめ

相変わらずフリーダムな変更が続いているRxですが、あくまでExperimental版の話です。Stableでは、こんなバカバカと変わっていくことはないので、安心して使えばいいです。あと、Experimentalなのでまだまだ変動の余地はある、と思いますので、意見あればForumで直接言っておくとよさそう。私も、何やらかんやらと意見言っておきました(昔は書くのにビビッてたのに、今は平然と書いててアレです、慣れですね、ようするに)。

ところでRxは.NET 4.5に標準搭載されるのか否か、ですが、なんかまだまだ全然色々と変更や模索する気満々なようなので、この様子だと、仮にそういう話があったとしても間に合わなさそうという点で、標準搭載はなさそうですねー。ほぼ完成してるのに標準入りしない、とかだと悲しいんですが、そういう形で標準搭載見送り、ならばむしろ喜ばしい話なので、いいかなー、と思います。

それにしてもRx本は出すタイミング難しいですねえ。今回の変更は非同期周りの話がガラッと変わってきちゃうわけなので。また延期かしらねえ。さすがにそれはないか。ところで私もLINQ + Rx本をオライリーから出したいです(←ただたんに表紙をデンキウナギ(Rxのロゴはピンクのデンキウナギ)にしてウナギ本と呼ばれたいという一点だけの話なので間に受けないでください)

ReactivePropertyのデモをしました

Silverlightを囲む会in東京#4にて、先日公開したReactivePropertyについてお話しました。

本題のセッションの後の、お楽しみセッションということで、LT的に5分程度とか思っていたつもりなのですが、大幅に時間オーバーして17分も喋っていました。これは酷い。色々と寛容に見て頂き感謝です。さおさんありがとうー。IIJさんも本当にありがとうございます。時間オーバーを許してくれたというのと(笑)、それと、ネットワークが良好だったお陰でTwitterインクリメンタルサーチのデモが出来たので。毎度ながら凄まじい画質のSmooth Streamingといい、神会場すぎます。

いつまで残るか分かりませんが、会場で行ったセッションの録画です。Silverlight を囲む会 in 東京 #4 @ IIJ 神保町三井ビル。私のセッションは04:01:30 - 04:19:00です。ライブコーディングしているのは04:05:30-04:16:30ですね。

色々アレゲなのはいいとして、以前にスマベンで話をしたときにも反省事項だったのですがすっかり失念してた声の小ささはダメですねー。次は気をつけます。むしろ早口気味なのかと気にしてたんですが、録画を見るとそうでもないというか、このぐらいで調度良いぐらいですね。スライドはちゃっちゃと進めて欲しいし、本題のDemoは素早く進行して欲しいですから。ライブコーディングは好評だったようで何よりです。ちなみに、スムーズにプログラム書いていて凄い!と評価いただきましたが、やる内容が決まっているから書けたというだけで、例えばギターやピアノの演奏などと同じなわけで、普段は頭抱えながらゆったり書いてます。

ちなみに最後のTwitter検索のコードは若干アレだったので、修正したのをここに載せておきます。

<StackPanel>
    <TextBox Text="{Binding CurrentText.Value, UpdateSourceTrigger=PropertyChanged}" />
    <ListBox ItemsSource="{Binding SearchResults.Value}" />
</StackPanel>
public class MainWindowViewModel
{
    public ReactiveProperty<string> CurrentText { get; private set; }
    public ReactiveProperty<string[]> SearchResults { get; private set; }
 
    public MainWindowViewModel()
    {
        CurrentText = new ReactiveProperty<string>();
 
        SearchResults = CurrentText
            .Select(word => new WebClient()
                .DownloadStringObservableAsync("http://search.twitter.com/search.atom?q=" + Uri.EscapeUriString(word)))
            .Switch()
            .Select(s =>
            {
                var xml = XElement.Parse(s);
                var ns = xml.Name.Namespace;
                return xml.Descendants(ns + "title").Select(x => x.Value).ToArray();
            })
            .OnErrorRetry((WebException e) => Debug.WriteLine(e))
            .ToReactiveProperty();
    }
}

SelectManyよりもSelect->Switchのほうがいいのと、OnErrorRetryの書く場所は、WebClientの真下だと永遠にリクエストをリピートしちゃうのでダメでしたね。

ReactiveProperty : WPF/SL/WP7のためのRxとMVVMを繋ぐ拡張ライブラリ

MVVM拡張、という言い方が適切かは不明ですが、ともあれ、RxでXAMLによるUIシステムとの親和性を高めるライブラリを作成し、リリースしました。

中身は大きく分けて二つで、一つはReactivePropertyというXAMLと双方向にバインド可能なIObservable<T>、ReactiveCommandというIObservable<bool>からCanExecuteの条件を宣言的に生成するコマンドなど、MVVM的なUI絡みのクラス群。もう一つはWebClientやWebRequestなど、非同期処理のための拡張メソッド群になります。

名前はUI中心に見えますが、UI絡みはいらないよ、という人は非同期周りだけを使ってくれても問題ありません。それと、機能紹介の前に一つ。決して既存のMVVMフレームワークを置き換えたり、同等の機能を提供するものではありません。ViewModelBaseやMessengerなどに相当するものはないので、その辺は適宜、既存のMVVMフレームワークを使えばいいと思います。というか、併用することを推奨します。だから「拡張ライブラリ」と名乗っています。

UIへのバインディング

ReactivePropertyとは何か。というと、双方向にバインド可能なIObservable<T>です。まず、ViewModel(Model)->Viewという片方向のバインドを見てみましょう。時計のようなものを作ります。

<Grid>
    <TextBlock Text="{Binding DisplayText.Value}"  HorizontalAlignment="Center" VerticalAlignment="Center" />
</Grid>
public class SimpleClockViewModel
{
    // 双方向にバインド可能なIObservable<T>
    public ReactiveProperty<string> DisplayText { get; private set; }
 
    public SimpleClockViewModel()
    {
        // 1秒毎に値を発行、Selectで現在時刻に変換してToReactivePropertyでバインド可能にする
        DisplayText = Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(_ => DateTime.Now.ToString())
            .ToReactiveProperty();
    }
}

Microsoft Silverlight を入手

XAML側ではDisplayText.Valueというように、.Valueまで指定してバインドします。実に簡単にIObservableがバインドできると分かるのではないでしょうか?

IObservableとは、時間軸に沿って値が変わるものです。Intervalはx秒置きに等間隔で変わるので、まさに「時間」といったものですが、それ以外のものも全て時間軸に乗っていると考えることが可能です。例えばイベント、クリックやマウスムーブ、ジェスチャーやセンサーイベントで考えると、タッチした、x秒後にまたタッチした、x秒後にまたタッチした…… 発行される時間が不定期なだけで、時間軸に沿って次の値が出力されるという図式は同じです。

非同期処理もそうで、x秒後に一回だけ値が来る。Rangeや配列のToObservableは0.0001秒刻みに値が来る。Rxで、IObservableで表現することが出来る値というのは、時間軸に乗って変わる/発行する値ということになります。そして、見渡してみると、IObservableになる、時間によって変わるという表現がマッチするものは意外と多い。特にリッチクライアントでは。UI自身の値の変化(バインディング/イベントによる通知)もそうだし、ModelのINotifyPropertyChangedもそう。INotifyPropertyChangedとは、或るプロパティの値が変化したという通知を行うオブジェクト。そのプロパティだけに着目してみれば、時間軸上で連続的に変化する値とみなせる、つまりIObservableで表現できます。

UIからのバインディング

では、UIからのバインディングもしてみましょう。これは、空のReactivePropertyを作成してバインディングします。これにより、UIからの入力をIObservableとして他へと中継することができます。

<StackPanel>
    <!-- このTriggerは入力と同時に発火させるために(SL4では)必要なもの -->
    <TextBox Text="{Binding CurrentText.Value, Mode=TwoWay}">
        <i:Interaction.Behaviors>
            <prism:UpdateTextBindingOnPropertyChanged />
        </i:Interaction.Behaviors>
    </TextBox>
    <TextBlock Text="{Binding DisplayText.Value}" />
</StackPanel>
public class FromUIViewModel
{
    public ReactiveProperty<string> CurrentText { get; private set; }
    public ReactiveProperty<string> DisplayText { get; private set; }
 
    public FromUIViewModel()
    {
        // UIからのテキスト入力の受け口
        CurrentText = new ReactiveProperty<string>();
 
        // そして、それを元にして加工してUIへ返してみたり
        DisplayText = CurrentText
            .Select(x => x.ToUpper()) // 全て大文字にして
            .Delay(TimeSpan.FromSeconds(3)) // 3秒後に値を流す
            .ToReactiveProperty();
    }
}

Microsoft Silverlight を入手

Interaction.Behaviorは本題とは関係なくて、値の更新通知のタイミングを、値の変更と同時にするためのものです(デフォルトだとフォーカスが移ったときかな)。WPFでは、こういった小細工がなくてもいいのですが、SL4,WP7では必要なので已むを得ず。詳しくは Silverlight 4のTextBoxのTextプロパティの変更のタイミングでBindingのSourceを更新したい - MSDN Samples Gallery に。というわけで、このUpdateTextBindingOnPropertyChangedはPrismからコピペってきたものです。

さて、入力の受け付けをベースにするものは、newで空の物を作ります。出力中心のものはToReactivePropertyなわけですね。あとは、文字が非連続的に、(同じスレッド上の)非同期でやってくるので、LINQで加工します。Selectで大文字にして、そして、Rxなので時間系のものも使えるので、Delayを使ってみたりしながら、UIに戻しました。なお、Delayの時点で値の実行スレッドはUIスレッドからスレッドプールに移りますが、ReactivePropertyを使う限りは、ReactiveProperty内部でスレッド間の値の通知を解決するため、Dispatcher.BeginInvokeも、ObserveOnDispatcherも不必要です。実行スレッドを意識するなんて原始的ですよね、ReactivePropertyなら、全く意識する必要がなくなります。非同期は自然のまま非同期で扱える。だって、そもそも全てが非同期なのだもの。

ReactiveCommand

ReactivePropertyのもう一つの大事な機構が、ReactiveCommandです。これは、IObservable<bool>という、実行可否の変化のストリームからICommandを生成します。一般的なMVVMフレームワークで使われるRelayCommand, DelegateCommandとは発想が異なるのですが、私はこのReactiveCommandのアプローチこそがベストだと考えます。まずは例を。

<StackPanel>
    <StackPanel Orientation="Horizontal">
        <CheckBox IsChecked="{Binding IsChecked1.Value, Mode=TwoWay}">CheckBox1</CheckBox>
        <CheckBox IsChecked="{Binding IsChecked2.Value, Mode=TwoWay}">CheckBox2</CheckBox>
        <CheckBox IsChecked="{Binding IsChecked3.Value, Mode=TwoWay}">CheckBox3</CheckBox>
        <CheckBox IsChecked="{Binding IsChecked4.Value, Mode=TwoWay}">CheckBox4</CheckBox>
    </StackPanel>
    <TextBox Text="{Binding CurrentText.Value, Mode=TwoWay}" />
    <Button Command="{Binding ExecCommand}">Execute?</Button>
</StackPanel>
using Codeplex.Reactive.Extensions; // 拡張メソッドを使う場合はこれを忘れず。
 
public class CommmandDemoViewModel
{
    public ReactiveProperty<bool> IsChecked1 { get; private set; }
    public ReactiveProperty<bool> IsChecked2 { get; private set; }
    public ReactiveProperty<bool> IsChecked3 { get; private set; }
    public ReactiveProperty<bool> IsChecked4 { get; private set; }
    public ReactiveProperty<string> CurrentText { get; private set; }
    public ReactiveCommand ExecCommand { get; private set; }
 
    public CommmandDemoViewModel()
    {
            var mode = ReactivePropertyMode.RaiseLatestValueOnSubscribe | ReactivePropertyMode.DistinctUntilChanged;
 
            IsChecked1 = new ReactiveProperty<bool>(mode: mode);
            IsChecked2 = new ReactiveProperty<bool>(mode: mode);
            IsChecked3 = new ReactiveProperty<bool>(mode: mode);
            IsChecked4 = new ReactiveProperty<bool>(mode: mode);
            CurrentText = new ReactiveProperty<string>(
                initialValue: "テキストが空の時はボタン押せないよ",
                mode: mode);
 
            ExecCommand = IsChecked1.CombineLatest(IsChecked2, IsChecked3, IsChecked4, CurrentText,
                    (a, b, c, d, txt) => a && b && c && d && txt != "")
                .ToReactiveCommand();
 
            ExecCommand.Subscribe(_ => MessageBox.Show("Execute!"));
    }
}

Microsoft Silverlight を入手

全てのチェックがONで、かつ、テキストボックスに文字が含まれていないとボタンを押せません(テキストボックスの値の判定はフォーカスが外れてからになります)。CombineLatestというのは、二つの値のうち、どちらか一つが更新されると、片方は新しい値、片方はキャッシュから値を返して、イベントを合成するものです。もし片方にキャッシュがない場合はイベントは起こしません。「二つの値」というように、標準では二つの合成しか出来ないのですが、ReactivePropertyではこれを拡張して7つの値まで同時に合成できるようにしました(それ以上合成したい場合は、匿名型にまとめて、再度CombineLatestを繋げればよいでしょう)。この拡張はCodeplex.Reactive.Extensionsをusingすることで使えるようになります。Extensions名前空間には、他にも有益な拡張メソッドが大量に定義されていますので、是非覗いてみてください。

さて、つまりCombineLatestの結果というのは、ボタンが押せるか否かの、条件のストリームです。どういうことかというと、連続的なCanExecuteです。なので、そのままICommandに変換してしまいましょう、というのがToReactiveCommandになります。CanExecuteに変更があることを、条件のほうからPushして伝えるので、従来使われてきたコマンドを集中管理するCommandManager.RequerySuggestedや、(イベントなので)本来外から叩けないCanExecuteChangedを外から叩けるようにする、などの手立ては不要です。

常にtrueのコマンドならば、new ReactiveCommand()で生成できます。

ところで、mode: ReactivePropertyMode.RaiseLatestValueOnSubscribe というのは、Subscribeされる時に(ToReactivePropertyやToReactiveCommandは内部でSubscribeしています)、同時に最新の値を返します(最新の値がない場合は初期値を返します。初期値は指定することもできますし、もし指定していない場合はdefault(T)になります)。どういうことかというと、判定のタイミングの問題があります。CombineLatestは全てに一度は値の通知が来ている(キャッシュが存在する)状態じゃないと、イベントを起こしてくれません。なので、CanExecuteを初回時から判定させるために、これの指定が必要です。なお、ReactivePropertyModeのデフォルトはDistinctUntilChangedのみで、RaiseLatestValueOnSubscribeは明示的に指定しなければなりません。一件便利そうに見えるRaiseLatestValueOnSubscribeですが、問題も抱えていまして、後でも詳しく説明しますがバリデーションの時。バリデーションの場合は初回実行はして欲しくない(画面を表示したら、いきなり真っ赤っかだと嫌でしょう?)ケースがほとんどのはずです。RaiseLatestValueOnSubscribeを指定すると、初回実行してしまうので、そういう場合にとても都合が悪いのです。これは、良し悪しなので、適宜判断して、最適な方をお選びください。

宣言的であるということ

ReactiveCommandは、条件を宣言的に記述しました。そして、外部から叩くことは禁じられているので、その宣言以外のことが絡む可能性はありません。また、状態を外部変数から取得する(RelayCommandなどはそうなりますね)わけではないので、CanExecuteの変化のスコープはToReactiveCommandをする一連のシーケンスを読むだけですみます。変数を返す場合は、変数を使う範囲、つまりオブジェクト全体から変更可能性が混ざるという、大きなスコープの観察を余儀なくされます。読む場合だけでなく、書く場合でも、集中的に変化の条件を記述することができるので、ずっと楽でしょう。

ReactivePropertyもまた、同じです。値の変化の条件を宣言的に記述しました(こちらはReactiveCommandと違い、(Two-wayでバインド可能にするという都合上外部から叩くことが可能なのでスコープは閉じていませんが)。大事なのは、スコープを小さくすること。大きなスコープは往々に管理しきれないものです。リッチクライアントはステートを持つ。その通りだ。プロパティが、オブジェクトが、絡みあう。それは実に複雑なのは間違いない。でも複雑だから難しくて当然、複雑だからテストできない、複雑さを複雑なまま放っておいたら、それはただの新世代のスパゲティにすぎない。

ステートを捨てようじゃあなくて、宣言的にやろう。それがReactivePropertyの解決策、提案です。

INotifyPropertyChangedと一緒に。

全てReactivePropertyで相互作用を記述する、というのは理想的ですが過激派です。それに、既存のModelや自動生成のModelなど、様々なところにINotifyPropertyChangedはあります。理想だけじゃ生きていけません。それに、私もプレーンなModelはPOCO(+INotifyPropertyChanged)のほうが嬉しい。でも、IObservableになっていないと、関係の合成が不可能で困るので、INotifyPropertyChanged -> ReactivePropery変換を可能にしました。ここでは説明しませんが、その逆のReactiveProperty -> INotifyPropertyChanged変換も可能です。

using Codeplex.Reactive.Extensions; // ObservePropertyもこれをusingで。
 
public class ObserveViewModel
{
    public ReactiveProperty<string> ModelText { get; private set; }
 
    public ObserveViewModel()
    {
        ModelText = new ToaruModel()
            .ObserveProperty(x => x.Text) // タイプセーフにIObservable<T>に変換
            .ToReactiveProperty();
    }
}
 
// WCFからだったりEntity Frameworkだったり既存のModelだったり他のViewModelだったり
// ともかく、INotifyPropertyChangedは至る所に存在します
public class ToaruModel : INotifyPropertyChanged
{
    private string text;
    public string Text
    {
        get { return text; }
        set { text = value; PropertyChanged(this, new PropertyChangedEventArgs("Text")); }
    }
 
    public event PropertyChangedEventHandler PropertyChanged = (_, __) => { };
}

INotifyPropertyChangedの仕組みって、とあるプロパティが変更された、と名前でPushして、変更通知を受けた方はその名前を元にPullで取り出す。描画フレームワークが面倒を見ているなら、それでいいのですが、通常使うには、とてもまどろっこしい。だから、そうしたオブジェクトという大きな土台から名前ベースのPush & Pull通知を、プロパティ単位の小さなPush通知に変換してやりました。指定はExpressionで行うのでタイプセーフですしね。

ReactivePropertyのほうがINotifyPropertyChangedよりも細かいハンドリングが効くのは当たり前の話で、単位が小さいから。逆に、だから、INotifyPropertyChangedという大きい単位で関係を作り込んでいくのは、非常に複雑でスパゲティの元だと言わざるを得ない。勿論、Reactive Extensionsという、プロパティ単位でのPushを自在に扱う仕組みが背後にあってこそのやり方ではあるのですが。

MとVMの境界

が、曖昧にみえるのはその通りかもしれません。けれど、処理がVMに偏りすぎるように見えるのなら、それは素直にMに移せばいい。細かいMを束ねるMを導入すればいい。名前は、サービスでもファサードでもプロキシーでもアプリケーションでもコントローラーでも、なんでもいい(わけではないけれど)。移せばいいなんて簡単にいいますが、それは簡単にできるからです。VM-M-VMが一気通貫してループを描いているなら、ローカル変数への依存もなくメソッドチェーンを切った貼ったするだけなので、どこに置くのも移すのは楽です。

そもそも、最初から明確に分けようとしたってどうせうまくいかないもの。インターフェイスだって、具象型から抽象を見出したほうが簡単だし、ずっとうまくいくでしょう?ネーミングだってリファクタリングで徐々に洗練させる。そもそもVMがヘヴィになりがちなのは、目で見える境界がないから、なせいでしょう。VはXAMLで線引きされるけれど、それ以外はコードで地続き。理想論だけで線を引こうとしたって空疎だし、そもそも、無理な話。境界を見出すには具体的に積み重なった後じゃないと無理でしょう(勿論、境界の敷き方を常日頃考える、研究することは有意義だと思います。そもそも考えていないと、いざ境界を見出そうとしても見えませんから)

そもそもMVVMなのか

UIに対するReactive Programmingなのは間違いないと思ってます。Reactive ProgrammingはUI向きだ。よく聞くその話は、実際その通りだと思うのですが、しかし同時にUI(というか、WPF/SL/WP7などXAML)とRxってどうもイマイチフィットしないなぁ、と悩んでいました。その理由は、最終的に描画を司るフレームワーク(XAML)とミスマッチなせいにあるのだと、気づきました。フレームワークの要求(INotifyPropertyChangedなオブジェクトであったりICommandであったり)と異なるものを、そのまま使おうとしたところで、良い結果は得られない。ゴリ押ししてもXAMLの旨みが生かせないし、Reactive Programmingを大前提に置いた描画フレームワークを構築すれば、もっと違う形になるでしょうが、そんなものは非現実的な話です。膨大な投資のされた、現在のXAML中心のシステムより良いもの……。やはり、絵空事にしか見えません。それに、XAMLは何だかんだ言って、良いものです。

それを認識したならば、必要なのは、境界を繋ぐシステムだと導ける。そのことを念頭においてReactivePropertyとReactiveCommandをデザインしました。MVVMライクなのは描画フレームワークに合わせた結果です、だから、MVVMでもあり、そうでもないようでもある。ただ、それによってパラダイムがミックスされてどちらの長所も活かせるし、世界最高峰のシステムであるXAMLアプリケーションに乗っかれるので今すぐ実用的という面もあるわけなので、これでいいと思うんです。いや、これがいいんです。マルチパラダイムは悪いことではない。あとは、ミックス故に生まれる新しい悩みをどう解消していくか、です。

マルチパラダイムといえば、ReactivePropertyは描画フレームワークからの言語への要求が変化(吸収)しているので、F#でも美味しくXAMLアプリケーションを書くことが可能になるでしょう。多分。

非同期拡張メソッド群

Rxは非同期の苦痛を癒す。とはいっても、実のところ素の状態だと罠が多くて、意外と使いづらかったりします。WebClientは、実行順序の問題があり、そのままではRxで扱いにくい。WebRequestはWebRequestでプリミティブすぎて機能が乏しいし、そのままではリソース処理に問題を抱えたりします。どちらも、ただFromEvent, FromAsyncするだけでは足りなくて、もう一手間かけたRx化が必要です。そのため、WebClient, WebRequestに対して拡張メソッドを用意し、簡単に実行出来るようにしました。

ReactivePropertyと合わせてのインクリメンタルサーチの例を。これは、ReactivePropertyのダウンロードファイルに含む非同期サンプルですので、是非ダウンロードして、サンプルを実際に実行してみてください。

using Codeplex.Reactive.Asynchronous; // 非同期系の拡張メソッド群を格納
using Codeplex.Reactive.Extensions; // OnErrorRetryはこちら
 
public class AsynchronousViewModel
{
    public ReactiveProperty<string> SearchTerm { get; private set; }
    public ReactiveProperty<string> SearchingStatus { get; private set; }
    public ReactiveProperty<string> ProgressStatus { get; private set; }
    public ReactiveProperty<string[]> SearchResults { get; private set; }
 
    public AsynchronousViewModel()
    {
        // IncrementしたりDecrementしたりすることでイベント(Empty ,Inc, Dec, Max)が発生する
        // それはネットワークの状態を管理するのに都合が良い(IObservable<SignalChangedStatus>)
        var connect = new SignalNotifier();
        // 指定したスケジューラ(デフォルトはUIDispatcherScheduler)上で任意にイベントを起こせる
        // 主にProgressと併用して進捗報告に利用する
        var progress = new ScheduledNotifier<DownloadProgressChangedEventArgs>();
 
        SearchTerm = new ReactiveProperty<string>();
 
        // 検索は当然非同期で行い、それをダイレクトにバインドしてしまう
        SearchResults = SearchTerm
            .Select(term =>
            {
                connect.Increment(); // 非同期なのでリクエストは一つじゃなく並列になるので、これで管理
                return WikipediaModel.SearchTermAsync(term, progress)
                    .Finally(() => connect.Decrement()); // リクエストが終了したら、確実にカウントを下げる
            })
            .Switch() 
            .OnErrorRetry((WebException ex) => ProgressStatus.Value = "error occured")
            .Select(w => w.Select(x => x.ToString()).ToArray())
            .ToReactiveProperty();
 
        // SignalChangedStatus : Increment(network open), Decrement(network close), Empty(all complete)
        SearchingStatus = connect
            .Select(x => (x != SignalChangedStatus.Empty) ? "loading..." : "complete")
            .ToReactiveProperty();
 
        ProgressStatus = progress
            .Select(x => string.Format("{0}/{1} {2}%", x.BytesReceived, x.TotalBytesToReceive, x.ProgressPercentage))
            .ToReactiveProperty();
    }
}
 
// 非同期リクエストとデータ。単純ですが、Modelということで。
public class WikipediaModel
{
    const string ApiFormat = "http://en.wikipedia.org/w/api.php?action=opensearch&search={0}&format=xml";
 
    public string Text { get; set; }
    public string Description { get; set; }
 
    public WikipediaModel(XElement item)
    {
        var ns = item.Name.Namespace;
        Text = (string)item.Element(ns + "Text");
        Description = (string)item.Element(ns + "Description");
    }
 
    // WebClientの他に、WebRequestやWebResponseへの非同期拡張メソッドも多数用意されています
    // また、ほとんど全ての非同期拡張メソッドにはプログレス通知を受け付けるオーバーロードがあります
    public static IObservable<WikipediaModel[]> SearchTermAsync(string term, IProgress<DownloadProgressChangedEventArgs> progress)
    {
        var clinet = new WebClient();
        return clinet.DownloadStringObservableAsync(new Uri(string.Format(ApiFormat, term)), progress)
            .Select(Parse);
    }
 
    static WikipediaModel[] Parse(string rawXmlText)
    {
        var xml = XElement.Parse(rawXmlText);
        var ns = xml.Name.Namespace;
        return xml.Descendants(ns + "Item")
            .Select(x => new WikipediaModel(x))
            .ToArray();
    }
 
    public override string ToString()
    {
        return Text + ":" + Description;
    }
}

色々な機能を一度に説明しようとしているので、些か複雑かもしれません。まず、非同期リクエストは並列になります。例えばボタンを、通信中はDisabledにするのに、単純にbooleanで管理してもうまくいきません。どれか一つのアクセスが始まったらDisabledにしてどれか一つのアクセスが終わったらEnabledにする。それではダメです。どれか一つのアクセスが終わったところで、他のリクエストが通信中かもしれないケースに対応できませんから。

そこで、ReactivePropertyではSignalNotifierというものを用意しました。これは、IncrementかDecrementの操作によって、「ゼロになった」「インクリメントされた」「デクリメントされた」「Max(初期値で指定した場合)になった」というイベントを発行します。イベントといっても、自身がIObservable<SignalStatus>になっているので、直接Rxで扱えます。これのネットワークリクエストへの適用はシンプルで、通信開始されたらインクリメント。通信終了したらデクリメントする。そして、ゼロになったか否かを見れば、それが通信中か否かの判定になります。

非同期拡張メソッドはキャンセルに対しても強く考慮されています。WebClient(のSubscribeの戻り値)にDisposeするとCancelAsyncを、WebRequest(のSubscribeの戻り値)にDisposeするとAbortを呼ぶようになっています。このような挙動は、単純にFromEvent, FromAsyncしただけでは実現できないので、大きくて間を省けることでしょう。ネットワークリクエストを自身でキャンセルすることは少ないかもしれませんが、上の例であげたSwitchは内部でDisposeを呼びまくる仕組みになっていますので、しっかり対応している、というのは実行上、大きなアドバンテージとなります。

Switchは複数の非同期リクエストが確認された場合に、前のリクエストをキャンセル+キャンセルが遅れた場合でも遮断して結果を後続に返さないことで、最新のリクエストの結果のみを返します。そのため、非同期リクエストが抱える結果が前後してしまう可能性、例えばインクリメンタルサーチではLINQと検索したのに、LINQの結果よりも後にLIの結果が返ってきたために、表示されるのがLIの結果になってしまう。などという自体が防げます。

また、OnErroRetryに注目してください。これはReactivePropertyが独自に定義している拡張メソッドで、例外発生時の処理をすると同時に、Retry(ここでいうとSearchTermの再購読なので、つまりチェーンの状態が維持される、ということになる)します。ToReactivePropertyを使い、ダイレクトに結び付けている場合は、例外が発生するとチェーンが終了して困るのですが、例外処理にこのOnErrorRetryを使うことで、そのような悩みは不要になります。なお、このOnErrorRetryは勿論ReactiveProperty専用というわけでもなく汎用的に使えます。例えば、もしネットワークからのダウンロードに失敗したら、一定間隔を置いて再度ダウンロードをする、但しリトライの挑戦は指定回数まで。というよくありそうな処理が、引数で回数とTimeSpanが指定できるので、簡単に記述できます。

進捗レポートも非同期処理では欠かせませんが、これは非同期拡張メソッドとScheduledNotifierを組み合わせることで、簡単に実現出来ます。これら非同期周りのサポートはReactivePropertyの重要な柱だと考えているので、UI周りの機能は必要ない、という人も、是非試してみて欲しいです。

同期 vs 非同期

SLやWP7はともかく、WPFでこのように強烈に非同期サポートする意味はあるのでしょうか。というと、あります(ただたんにコード共有しているから、というだけではなく)。まず、WinRTがそうなように、時間のかかる処理は時間のかかる処理なわけなので、強制的に非同期になっていたほうが、ViewModelなり束ねるModelなりで、そこら中に、明示的にスレッド管理(ただたんにTaskに投げるのも含む)をしないで済みます。本質的に非同期(CPU依存ではない形で時間がかかる)なものは非同期として扱ったほうが易しいのです。

もう一つは、Switchのような、キャンセルを多用した処理が書きやすいこと。それに、自然な形でプログレス処理もサポートできます。更には、ReactivePropertyを全面に使うのなら、全てがReactiveに通知しあう世界、つまり全てが非同期で回っているので、非同期のほうが圧倒的に相性が良いです。同期プログラミングさようなら。大丈夫です、何も問題ありません。

C#5.0 Async vs Rx

従来通りに書く。シンプルに同期のように。のならば、async/awaitのほうがずっと良い。そういう使い方をする場合は、Rxを非同期に使う必要性というのは、今後はなくなるでしょう。ではRxでの非同期に価値はなくなってしまうのか?というと、それに関しては明確にNOと答えます。

Rxの場合はLINQということで、宣言的なスタイル、演算子という形に汎用的な処理を閉じ込められる高いモジュール性。というのがあります。上で見たきたように、Switchのようなこと、OnErrorRetryのようなこと、これらを演算子という形で定義して、メソッド一発で適用出来るのはRxならではの利点です。もし自分でそれらの処理を書くとしたら…… あまり考えたくはないし、もしメソッドの形でまとめあげるとしても、Rxのように綺麗に適用させるのは不可能でしょう。どこか歪んだシグネチャを抱えることになります。

ReactivePropertyと親和性が高いのでViewModelへの伝達に使いやすいというのもポイントですね(TaskとIObservableは相互に変換可能なので、ToTaskしたりToObservableしたりするだけなので、別段問題でもないですけど)

使い分けというのは実際のところ幻想みたいなことなので、人によりどちらか主体のスタイルには落ち着くでしょう。私は、とりあえずRx主体で行きたいかなあと思ってますが、ライブラリ的な部分ではasync/awaitを使って書くでしょう(演算子の組み合わせでやろうとすると書くのも難しいし、パフォーマンスも出ないので)。現在のシーケンスに対する、yield returnで汎用的な演算子を作って、通常使うシーンではLINQ to Objectsで、定義した演算子を含めて使っていく。というのと同じスタイルが良さそうだと想像します。async/awaitの書きやすさ・パフォーマンスと、Rxのモジュール性の両立はその辺かなあ、って。

あと、連続的な非同期処理を一纏めにするというのが(今のところ)async/awaitだと出来ない(Task<T>の戻り値は一つだけだから)ので、その辺をやりたい場合にもRx(IObservable<T>は元より複数の戻り値を内包する)頼みになります。ここは将来的にどういう形になるのか、まだ不明瞭なところなので断言はしませんが。

Validation

ReactivePropertyでは、三種類のバリデーションに対応しています。DataAnnotationsによる属性ベース、IDataErrorInfoによるPull型のエラー確認、INotifyDataErrorInfoによるPush型/非同期のエラー確認。ただし、WPFではINotifyDataErrorInfoは使えなく(.NET4.5からは入るようですが)、WP7ではDataAnnotationsが使えません。これはクラスライブラリの問題なので私の方では如何ともしがたくで。

// XAMLは省略。詳しくはSample/Validationを見てください
 
public class ValidationViewModel
{
    [Required]
    [Range(0, 100)]
    public ReactiveProperty<string> ValidationAttr { get; private set; }
    public ReactiveProperty<string> ValidationData { get; private set; }
    [StringLength(5)]
    public ReactiveProperty<string> ValidationBoth { get; private set; }
    public ReactiveProperty<string> ValidationNotify { get; private set; }
    public ReactiveProperty<string> ErrorInfo { get; private set; }
    public ReactiveCommand NextCommand { get; private set; }
 
    public ValidationViewModel()
    {
        // 属性ベースのバリデーションは、自身のプロパティをExpressionで指定することで適用できます
        // 通常属性ベースの場合、例外経由ですが、ReactivePropertyではIDataErrroInfo経由になります
        // そのため、XAML側ではValidatesOnDataErrors=Trueにしてください
        ValidationAttr = new ReactiveProperty<string>()
            .SetValidateAttribute(() => ValidationAttr);
 
        // IDataErrorInfoではエラー時のメッセージを渡します、nullの場合は成功の判定になります
        ValidationData = new ReactiveProperty<string>()
            .SetValidateError(s => s.All(Char.IsUpper) ? null : "not all uppercase");
 
        // 三種類の指定は、重ねることが可能です(但し同じ種類のものを複数指定するのは不可能)
        ValidationBoth = new ReactiveProperty<string>()
            .SetValidateAttribute(() => ValidationBoth)
            .SetValidateError(s => s.All(Char.IsLower) ? null : "not all lowercase");
 
        // INotifyDataErrorInfoの場合は、IObservable<IEnumerable>を返してください
        // 第一引数はself、つまりIObservable<T>になっていて、最終的にSelectでIEnumerableに変換します
        // IObservableということで、非同期での検証が可能になっているのがポイントです
        // これもIDataErrorInfoと同じく、nullの場合は成功という判定になります
        ValidationNotify = new ReactiveProperty<string>("foo!", ReactivePropertyMode.RaiseLatestValueOnSubscribe)
            .SetValidateNotifyError(self => self
                .Delay(TimeSpan.FromSeconds(3)) // DB問い合わせなど非同期なバリデーション(が可能)
                .Select(s => string.IsNullOrEmpty(s) ? null : new[] { "not empty string" }));
 
        // バリデーションの結果は、三種類全てまとめられてObserveErrorChangedから購読できます
        var errors = Observable.Merge(
            ValidationAttr.ObserveErrorChanged,
            ValidationData.ObserveErrorChanged,
            ValidationBoth.ObserveErrorChanged,
            ValidationNotify.ObserveErrorChanged);
 
        // もし、それらを分類したいときは、OfTypeを使うといいでしょう
        ErrorInfo = Observable.Merge(
                errors.Where(o => o == null).Select(_ => ""), // 成功はnull
                errors.OfType<Exception>().Select(e => e.Message), // 属性からはException
                errors.OfType<string>(), // IDataErrorInfoからはstring
                errors.OfType<string[]>().Select(xs => xs[0]))  // INotifyDataErrorInfoからは、IEnumerableの何か
            .ToReactiveProperty();
 
        // 検証が全部通ったら実行可能にするコマンド、などもこうやって書けますね!
        NextCommand = errors.Select(x => x == null).ToReactiveCommand(initialValue: false);
        NextCommand.Subscribe(_ => MessageBox.Show("Can go to next!"));
    }
}

Microsoft Silverlight を入手

一点だけ通常と異なるのは、属性ベースのものを例外ではなくてIDataErrorInfoとして扱います(この辺はRxのパイプラインを通す都合上、例外を出すという形での実現が不可能だったので)

Event to Observable

イベントをXAML側で指定して、ReactivePropetyにバインドすることが可能です。

<Grid>
    <i:Interaction.Triggers>
        <i:EventTrigger EventName="MouseMove">
            <r:EventToReactive ReactiveProperty="{Binding MouseMove}" />
        </i:EventTrigger>
    </i:Interaction.Triggers>
    <TextBlock Text="{Binding CurrentPoint.Value}" />
</Grid>
public class EventToReactiveViewModel
{
    public ReactiveProperty<MouseEventArgs> MouseMove { get; private set; }
    public ReactiveProperty<string> CurrentPoint { get; private set; }
 
    public EventToReactiveViewModel()
    {
        // UIからのイベントストリームを受信
        MouseMove = new ReactiveProperty<MouseEventArgs>();
 
        // とりあえず座標を表示する、というもの
        CurrentPoint = MouseMove
            .Select(m => m.GetPosition(null))
            .Select(p => string.Format("X:{0} Y:{1}", p.X, p.Y))
            .ToReactiveProperty("MouseDown and drag move");
    }
}

Microsoft Silverlight を入手

お手軽なので、結構便利だと思います。あの長大なFromEventPatternを書くよりかは(笑)

シリアライズ

特にWP7では頻繁な休止と復帰で、シリアライズ/デシリアライズによる状態の回復が重要です。そこで、値の回復を可能にするシリアライズ用のヘルパーを用意しました。

// こんなビューモデルがあるとして
public class SerializationViewModel
{
    // なにもつけてないと普通にシリアライズ対象
    public ReactiveProperty<bool> IsChecked { get; private set; }
    [IgnoreDataMember] // Ignoreつけたら無視
    public ReactiveProperty<int> SelectedIndex { get; private set; }
    [DataMember(Order = 3)] // Orderつけたら、デシリアライズの順序を規程
    public ReactiveProperty<int> SliderPosition { get; private set; }
}
 
// 例えばWindows Phone 7のトゥームストーンなシチュエーションを考えてみると
private SerializationViewModel viewmodel = new SerializationViewModel();
private string viewmodelData = null;
 
protected override void OnNavigatingFrom(System.Windows.Navigation.NavigationEventArgs e)
{
    viewmodelData = SerializeHelper.PackReactivePropertyValue(viewmodel);
}
 
protected override void OnNavigatedTo(System.Windows.Navigation.NavigationEventArgs e)
{
    SerializeHelper.UnpackReactivePropertyValue(viewmodel, viewmodelData);
}

デシリアライズの順序はDataMember属性のOrderに従います。詳しくはデータ メンバーの順序を参照のこと。Pushしあう関係の都合上、デシリアライズの順序によって正確な復元ができないこともあるでしょうから、その場合は、Orderをつけると、ある程度制御できます。また、IgnoreDataMember属性をつけておくと、シリアライズ対象から除外することが可能です。

スニペットとサンプル

NuGetから入れてもらってもいいのですが、ダウンロードしてもらえるとコードスニペットとサンプルがついてきますので、最初はダウンロードのほうがいいかもです。コードスニペットはrpropでReactiveProperty<T> PropertyName{ get; private set; }という頻繁に書くことになる宣言が展開されます。他にはrcomm(ReactiveCommand)など。

サンプルはWPF/SL4/WP7全てで用意しました。サンプルを割としっかり用意した最大の理由は、ただ渡されても、もしかしなくてもどう書けばいいのかさっぱり分からないのでは、と思ったのがあります。決して複雑ではなく、むしろシンプルだし記述量は大幅に減るわけです、が、従来のやり方からするとあまりにも突飛なのは否めないので、いきなりスイスイ書くというのは無理ですよねぇ、と。

その他紹介していない、サンプルに載ってない機能は、まだいっぱい。こんなに記事がなくなっちゃってもまだ全然足りない。でも、いきなりてんこ盛りだと引いてしまうので、基本的にはReactivePropertyとReactiveCommandが主体で、慣れたら徐々に周囲を見てもらえばな、ぐらいに思っています。

まとめ

仕上がりはかなり良いと、興奮しています。この長い記事!興奮を伝えたいという気持ちでいっぱいだからです。今後も、利用シーンの模索と合わせて、どんどん進化させていくつもりです。初回リリースですし、というのもありますが、コアコンセプトの実現と、使い勝手としてのAPIの錬成に力を注いだので、それ以外の部分の研究が疎かになっているというのは否めませんので、そこのところの強化も行なっていきます。また、JavaScriptへの移植もノリ気なので、まずKnockout.jsを試して、その上に構築させたいなあ、とか考えています。

ところで、10/8土曜日、明日のSilverlightを囲む会in東京#4の一番最後に、少し、デモ中心でお話をするつもりなので(最後のオマケなのでほんの少しだけですけどね)良ければ見に来てください。ギリギリではありますが、まだ申し込みも出来ると思います。また、もしよければ会場/懇親会でつかまえて聞いてくれたりすると泣いて喜びます。会場に来れなくてもIIJさんのSmooth Streamingで超高画質な配信が行われると思われますので、そちらでも是非是非。

SL/WP7のSilverlight Unit Test Frameworkについて少し深く

の、前に少し。DynamicJsonAnonymousComparerをNuGetに登録しました。どちらも.csファイル一個のお手軽クラスですが、NuGetからインストール可能になったことで、より気楽に使えるのではかと思います。機能説明は省略。

そして、昨日の今日ですがChaining AssertionSilverlight Unit Test Frameworkに対応させました。リリースのバージョンは1.6.0.1ということで。NuGetではChainingAssertion-SLChainingAssertion-WP7になります。

Silverlight Unit Test Framework

Silverlightで使う場合は(WP7じゃなくてね、という意味です)、一応Silverlight Toolkitに同梱という話ではあるのですが、テンプレートなどの用意が面倒くさいので、NuGet経由で入れるのが最も楽のようです。Install-Package Silverlight.UnitTestで。

まず、Silverlightアプリケーションを新規作成。Webサイトでのホストはなしでいいです。それとブラウザで実行させる必要もないので、プロジェクトのプロパティからOut of Browserに変更してしまいましょう。次に、NuGetからInstall-Package Silverlight.UnitTest。これでライブラリの参照と、ApplicationExtensions.cs(イニシャライズ用拡張メソッド)、UnitTest.cs(テスト用テンプレ)が追加されているはずです。次にApp.xaml.csのStartupを以下のように書き換えます。

private void Application_Startup(object sender, StartupEventArgs e)
{
    // this.StartTestRunnerDelayed();
    this.StartTestRunnerImmediate();
}

StartTestRunnerDelayedはテストランナー起動時に実行オプション(指定属性のもののみ実行するなど)を選択可能にするもの、Immediateはすぐに全テストを実行する、というものです。どちらかを選択すればOK。それで、とりあえず実行(Ctrl+F5)してみれば、テストランナーが立ち上がって、デフォテンプレに含まれるUnitTest.csのものが実行されているんじゃないかしらん。あとは、それを適宜書き換えていけばよし。なお、テンプレのテストクラスはSilverlightTestを継承していますが、これは必ずしも継承する必要はありません。後述しますが、Asynchronousのテストを行いたいときは必須ですが、そうでないならば、普通にMSTestでの場合と同じように、[TestClass]と[TestMethod]属性がついているものがテスト対象になっています。

なお、MainPage.xaml/.xaml.csは不要なので削除してしまってOK。StartTestRunnerによって、参照DLLのほうに含まれるxamlが呼ばれているためです。

WP7の場合。

一応NuGetにも用意されてるっぽい(silverlight.unittest.wp7)んですが、動きませんでした。ので、今のところ手動で色々用意する必要があります。詳しくはWindows Phone 7用の単体テストツール? その2「使ってみた」 - かずきのBlog@Hatenaに全部書いてあるのでそちらを参照のことということで。参照するためのDLLを拾ってくる→App.xaml.cs、ではなくてMainPage.xaml.csを書き換える、という、Silverlight版とやることは一緒なのですけどね。こういう状況なのはMangoのSDKがベータだったからとかなんとかのせいだとは思うので、近いうちに解決するのではかと、楽観視したいところです。

Chaining Assertionを使ってみる

Chaining Assertion ver 1.6.0.0の解説で紹介した失敗結果が丁寧に表示されるよー、をチェックしてみませう。

// こんなクラスがあるとして
public class Person
{
    public int Age { get; set; }
    public string FamilyName { get; set; }
    public string GivenName { get; set; }
}
 
[TestClass]
public class ToaruTest
{
    [TestMethod]
    public void PersonTest()
    {
        // こんなPersonがあるとすると
        var person = new Person { Age = 50, FamilyName = "Yamamoto", GivenName = "Tasuke" };
        // こんな風にメソッドチェーンで書ける(10歳以下でYamadaTarouであることをチェックしてます)    
        // 実際の値は50歳でYamamotoTasukeなので、このアサーションは失敗するでしょう
        person.Is(p => p.Age <= 10 && p.FamilyName == "Yamada" && p.GivenName == "Tarou");
    }
}

はい、ちゃんと表示されます。Chaining Assertionを使うと、メソッドチェーンスタイルで、実際の値.Is(期待値の条件)というように、 簡潔な記述でテストを書くことが出来るのがうりです。また、失敗時には、この場合personの値を詳細に出力してくれるので、何故失敗したのかが大変分かりやすい。もし、普通に書くと以下のようになりますが、

// もし普通に書く場合
var person = new Person { Age = 50, FamilyName = "Yamamoto", GivenName = "Tasuke" };
Assert.IsTrue(person.Age <= 10);
Assert.AreEqual("Yamada", person.FamilyName);
Assert.AreEqual("Tarou", person.GivenName);

まず、Assert.IsTrueでは失敗時にperson.Ageの値を出してくれないので、確認が面倒です。また、この場合、Personが正しいかをチェックしたいわけなので、FamilyNameやGivenNameも同時に判定して欲しいところですが、Ageを判定した時点で失敗のため、そこでテストは終了してしまうため、FamilyNameやGivienNameの実際の値を知ることは出来ません。

などなどの利点があるので、Chaining Assertionはお薦めです!この記事はSilverlight Unit Test Frameworkの紹介の体をとっていますが、実態はChaining Assertionの宣伝記事ですからね(キリッ

非同期テストをしてみる

Silverlightといったら非同期は避けて通れない。というわけで、Silverlight Unit Test Frameworkには非同期をテストできる機構が備わっています。[Asynchronous]というように、Asynchronous属性をつければそれだけでOK。と、思っていた時もありました。実際に試してみると全然違って、独特なシステムのうえにのっかっていて、かなり面倒くさかった……。

準備。まず、非同期テストをしたいクラスはSilverlightTestクラスを継承します。そしてAsynchronous属性をつけます。すると、そのテストメソッドはTestCompleteが呼ばれるか例外を検知するまでは、終了しなくなります。というわけで、こんな感じ。

[TestClass]
public class ToaruTest : SilverlightTest
{
    [TestMethod]
    [Asynchronous]
    public void AsyncTest()
    {
        var req = WebRequest.Create("http://www.google.co.jp/");
        req.BeginGetResponse(ar =>
        {
            try
            {
                req.EndGetResponse(ar)
                    .ResponseUri.ToString()
                    .Is("http://www.google.co.jp/");
            }
            catch (Exception ex)
            {
                EnqueueCallback(() => { throw ex; }); // 例外はテスト用スレッドに投げる必要がある
                return;
            }
 
            // ↓は定型句なので、EnqueueTestComplete(); という単純化されたのが用意されている
            EnqueueCallback(() => TestComplete()); // 何事もなければ終了でマーク
        }, null);
    }
}

このUnitTestの非同期は、独自のスレッドモデル(のようなもの)で動いていて、Dispatcherのようなキューにたいしてアクションを放り投げてあげる必要があります。別スレッドからUIスレッドは触れないように、「成功(TestComplete)」か「失敗(例外発生)」を伝えるには、EnqueueCallbackを経由しなければなりません。この辺はDispatcher.BeginInvokeするようなもの、と考えるといいかもしれません。

上のは少し原理に忠実にやりすぎた。まるごとEnqueueCallbackしてしまえばスレッドを意識する必要性は少しだけ減ります。

[TestMethod, Asynchronous]
public void AsyncTest()
{
    var req = WebRequest.Create("http://www.google.co.jp/404"); //404なので例外出してくれる
    req.BeginGetResponse(ar =>
    {
        EnqueueCallback(() => req.EndGetResponse(ar)
            .ResponseUri.ToString()
            .Is("http://www.google.co.jp/"));
 
        EnqueueTestComplete();
    }, null);
}

といっても、これは非常に単純なケースなだけであって、複雑なケースを書くとどんどん泣きたくなっていくでしょう……。一応、Enqueueには他にEnqueueConditionalという、条件式がtrueになるまで待機し続けるというものが用意されているので、若干制御はできなくもないんですが、あんまりできるとは言い難い仕組みがあります。詳しくは述べませんというか、別に使いやすいシステムじゃないのでどうでもいいです。

Rxを使ってみる

結果・もしくは例外を別のスレッドシステムに投げる。どこかで聞いたことあるような。ここでティンと来るのはReactive ExtensionsのObserveOnDispatcherです。Dispatcher.BeginInvokeのかわりにEnqueueCallback。丸っきりそっくり。なので、ObserveOnTestQueueのようなメソッドが作れれば、非常に使い勝手がいいんじゃないか。と思い浮かぶわけです。

と、浮かんだ人は実に素敵な発想力を持っていますね。浮かんだのは私じゃなくて海外の人です。はい。Writing asynchronous unit tests with Rx and the Silverlight Unit Testing Framework | Richard Szalayに、実装が書かれています。

そのRxによるScheduler実装を使うと(WP7版なのでSystem.ObservableとMicrosoft.Phone.Reactiveも参照してください)

[TestMethod, Asynchronous]
public void AsyncTest()
{
    var req = WebRequest.Create("http://www.google.co.jp/");
    Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse,req.EndGetResponse)()
        .ObserveOnTest(this)
        .Subscribe(r => 
            r.ResponseUri.ToString().Is("http://www.google.co.jp/"),
            () => TestComplete());
}

EnqueueCallbackの管理がなくなり、非常に簡単に記述できました。Rxのスケジューラのシステムの柔軟さの賜物ですね。これはRxの素晴らしい応用例だと本当に感動しました。Richard Szalayさんに乾杯。それと、私がこの記事を知ったのはInfoQ: Rx と Silverlight で非同期テストを記述するからなので、紹介したInfoQと、そして翻訳した勇 大地さんにも大変感謝します。

Silverlightの場合

Richard SzalayさんのコードはWP7のMicrosoft.Phone.Reactiveのためのものなので、Silverlight用Rxの場合はそのままでは動きません。はい。残念ながら、WP7版RxとDataCenter版Rxとでは、互換性がかなり崩壊しているので、そのまま動くことなんてないんです。悲しいですねえ……。これに関しては銀の光と藍い空: 「Rx と Silverlight で非同期テストを記述する」をWeb版にも使えるようにしたい!に書かれていますが、Silverlight用に移植してあげればよいようです。

既に、上記記事で田中さんが移植されているのですが、二番煎じに書いてみました(と、※欄で書いたものを流用です、毎回、流用させてもらっていてすみません……)

public static class TestHarnessSchedulerObservableExtensions
{
    public static IObservable<T> ObserveOnTestHarness<T>(this IObservable<T> source, WorkItemTest workItemTest)
    {
        return source.ObserveOn(new TestHarnessScheduler(workItemTest));
    }
 
    public static IDisposable RunAsyncTest<T>(this IObservable<T> source, WorkItemTest workItemTest, Action<T> assertion)
    {
        return source.ObserveOnTestHarness(workItemTest).Subscribe(assertion, () => workItemTest.TestComplete());
    }
}
 
public class TestHarnessScheduler : IScheduler, IDisposable
{
    readonly WorkItemTest workItemTest;
    readonly CompositeDisposable subscriptions;
 
    public TestHarnessScheduler(WorkItemTest workItemTest)
    {
        var completionSubscription =
            Observable.FromEventPattern<TestMethodCompletedEventArgs>(
                h => workItemTest.UnitTestHarness.TestMethodCompleted += h,
                h => workItemTest.UnitTestHarness.TestMethodCompleted -= h)
            .Take(1)
            .Subscribe(_ => Dispose());
 
        this.subscriptions = new CompositeDisposable(completionSubscription);
        this.workItemTest = workItemTest;
    }
 
    public void Dispose()
    {
        subscriptions.Dispose();
    }
 
    public DateTimeOffset Now
    {
        get { return DateTimeOffset.Now; }
    }
 
    public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
    {
        return Schedule(state, dueTime - Now, action);
    }
 
    public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
    {
        if (subscriptions.IsDisposed) return Disposable.Empty;
 
        workItemTest.EnqueueDelay(dueTime);
        return Schedule(state, action);
    }
 
    public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
    {
        if (subscriptions.IsDisposed) return Disposable.Empty;
 
        var cancelToken = new BooleanDisposable();
 
        workItemTest.EnqueueCallback(() =>
        {
            if (!cancelToken.IsDisposed) action(this, state);
        });
 
        subscriptions.Add(cancelToken);
        return Disposable.Create(() => subscriptions.Remove(cancelToken));
    }
}

Richard Szalayさんのコードが非常に素晴らしく、あらゆるケースへのキャンセルに対して完全に考慮されているという感じなので、そのまま持ってきました。実際のところ、テスト用なので「例外発生/TestCompleteが呼ばれる」で実行自体が終了してしまうわけなので、こうもギチギチに考えなくてもいいのではかなー、とか緩いことを思ってしまいますが、まあ、よく出来ているならよく出来ているままに使わさせてもらいます。

メソッド名は、ObserveOnTestHarnessに変更しました。ObserveOnTestだけだと何かイマイチかなー、と思いまして。それと、時間のスケジューリングは、NotSupportedではなくて、EnqueueDelayというのものがあるので、それを使うことにしてみました。それと、ObserveOn -> Subscribe -> onCompletedにTestCompleteが定形文句なので、それらをひとまとめにしたRunAsyncTestを追加。こんな風に書けます。

var req = WebRequest.Create("http://www.google.co.jp/444");
Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse)()
    .RunAsyncTest(this, res => 
        res.ResponseUri.ToString().Is("http://www.google.co.jp/"));

定形文句が減る、つまりうっかりミスで書き忘れて死亡というのがなくなる、というのはいいことです。

通常のMSTestの場合

ところで、もしSilverlight/WP7固有の機能は使っていなくて、WPFでも利用出来るようなコードならば、コードをリンク共有の形でWPF側に持っていってしまって、そこでテスト実行してしまうと非常に楽です。まず第一に、MSTestやNUnitなどの通常のテストフレームワークが使えるため、Visual Studio統合やCIが簡単に行えます。第二に、非同期のテストが(Rxを使った場合)更に簡単になります。

[TestMethod]
public void AsyncTest()
{
    var req = WebRequest.Create("http://www.google.co.jp/");
    var result = Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse)()
        .First(); // First()で同期的に待機して値が取れる。複数の場合はToEnumerable().ToArray()で。
 
    result.ResponseUri.ToString().Is("http://www.google.co.jp/");
}

FirstやToEnumerable.ToArrayにより、同期的に待機することが出来るので、簡単にテストすることができます。通常のコードは同期的待機はすべきではないのですが、こうしたユニットテストの場合は便利に使えます。

じゃあSilverlightのユニットテストでも待機できるのはないか?というと、それはできません。理由はWindows Phone 7で同期APIを実現するたった つの冴えないやり方で書いたのですが、WebRequestなどのネットワーク問い合わせは、一度Dispatcherに積まれて、現在のメソッドを抜けた後に実行開始されるので、テスト実行スレッドで同期的に待って値を取り出すことは不可能なのです。

こういった細部の違いもあるので、コード共有してMSTestでチェックするのは楽でいいのですが、やはりSilverlight/WP7の実際の環境で動かされるユニットテストのほうも必要不可欠かなー、と。どこまでやるか、にもよりますが。

まとめ

Chaining Assertionは便利なので是非試してみてね!

なお、Rxを使うとTestScheduler(時間を好きなように進められる)やITestableObserver(通知の時間と値を記録できる)といった、イベント/非同期のテストを強力に支援する仕組みが備わっているので、それらと併用することで、より簡単に、もしくは今までは不可能だったことを記述できるようになります。それはまた後日そのうち。

SL/WP7のテストは、本当はIDE統合されてるといいんですけどねー。まあ、エミュレータ動かさなければならないので、しょうがないかな、というところもありますけれど。その辺も次期VisualStudioでは改善されるのかされないのか、怪しいところです。現在DeveloperPreviewで出ているVS11は、特に何も手をつけられてる感じがしないので、そのままな可能性はなきにしもあらず。どうなるかしらん。async/awaitが入ることだし、色々変わってくるとは思うんですけれど。

Rxにおける並行非同期実行とリソース処理の問題

非同期(Asynchronous)だの並列(Parallel)だの並行(Concurerrent)だの、よくわからない単語が並びます。ParallelがやりたければPLINQ使うべし、と思うわけですがそれはさておき、Rxを使うと、意図しても意図しなくても、並行な状態にはなります。そして、その意図していないという状態は危うい線を踏んでいるので、きちんと認識しておく必要があります。また、危ういと同時に、Rxはその並行をうまくコントロールするメソッドが揃っているので、覚えておくと世界が一気に広がります。

例えば、こういう非同期メソッドがあるとして。

IObservable<T> AsyncModoki<T>(T value, int second)
{
    // second秒後にvalueを返す非同期処理をシミュレート
    return Observable.Return(value)
        .Delay(TimeSpan.FromSeconds(second));
}
 
static void Main(string[] args)
{
    // 1,2,3,4という入力をすぐに送り込む
    new[] { 1, 2, 3, 4 }
        .ToObservable()
        .SelectMany(x => AsyncModoki(x, 3))
        .Subscribe(x => Console.Write(x + "->"));
 
    Console.ReadLine();
}

Microsoft Silverlight を入手

1~4は、全て同時にリクエストが開始されます。だから、3秒後に同時に結果が表示されます。

同時に実行が開始されているということは、非同期の結果が完了する時間にズレがある場合、結果が前後することがあります。実際、上のものも何度か実行すると毎回結果が変わると思います(Delayは(デフォルトだと)値をThreadPoolに投げて遅延させます。ThreadPoolに入った時点で、順序の保証が消滅する)。というわけで、基本的にSelectManyを使った場合1:1で渡していくわけではなければ、順序は壊れると考えてください。さて、それだと困る場合もあるのではと思いますので、結果の順序を制御する方法が幾つかあります。

Switch

Switchは実に有意義なメソッドで、分かると、SelectMany以上に多用することが多くなるのではと思います。

clickEventObservable // クリック毎に
    .Select(x => AsyncModoki(x, 1)) // 何らかの非同期処理をするとする
    .Switch() // IObservable<IObservable<T>>の状態なので、Switch
    .Subscribe(Console.WriteLine);

Microsoft Silverlight を入手

クリックすると1秒遅延(非同期処理でもしていると考えてください)して、値が表示されます。しかし、1秒以内に次の値がクリックされた場合はキャンセルされ、表示しません。

つまり、最新の値だけを返すことを保証します。それ以前のものはキャンセル(Disposeが呼ばれる)されます。どういう時に使うかというと、例えばインクリメンタルサーチ。L, LI, LIN, LINQと入力が変わる度に非同期リクエストを発生させますが、欲しい結果は最後の一件のみで、次のキー入力があった場合は以前のものはキャンセルして欲しい。キャンセルはともかく、非同期実行だと結果が前後してしまうことだってあります。LINQと入力したのにLIの結果一覧が表示されてしまったら困る。そんな場合に、まさに、うってつけです。そして存外、こういったシチュエーションは多いのではないかと思われます。例えば私の以前作ったUtakotohaというWP7用歌詞表示アプリケーションも、曲のスキップに応じて最新のものだけを表示するために、Switchを利用しました。(コードが激しく酷いのと機能貧弱っぷりなので、そろそろ書き直したい)

Merge/Concat

Switch以外にも色々あります。

new[] { 1, 2, 3, 4 }
    .ToObservable()
    .SelectMany(x => AsyncModoki(x, 1)) // 全て並行実行(最初の例です)
    .Subscribe(x => Console.Write(x + "->"));
 
new[] { 1, 2, 3, 4 }
    .ToObservable()
    .Select(x => AsyncModoki(x, 1)) // IO<IO<T>>
    .Merge() // こちらも全て並行実行、SelectMany(xs => xs)と同じ
    .Subscribe(x => Console.Write(x + "->"));
 
new[] { 1, 2, 3, 4 }
    .ToObservable()
    .Select(x => AsyncModoki(x, 1))
    .Merge(2) // 2件ずつ並行実行する(並行実行数の指定が可能)
    .Subscribe(x => Console.Write(x + "->"));
 
 
new[] { 1, 2, 3, 4 }
    .ToObservable()
    .Select(x => AsyncModoki(x, 1))
    .Concat() // 1件ずつ実行する(Merge(1)と同じ)
    .Subscribe(x => Console.Write(x + "->"));

Microsoft Silverlight を入手

ネストはSelectManyで一気に崩してしまうケースが一般的でしょうけれど、IObservable<IObservable<T>>といったネストした状態にすると、選択肢がSwitchもそうですが、更に、MergeとConcatを選択することができます。ちなみに、このintで並行実行数が指定可能なMergeはWP7同梱版のRxには存在しません。残念。(もう一つ余談ですが、SelectManyはRx内部ではSelect(selector).Merge()という実装になっていたりします)

実行タイミングの問題

上のSilverlight、Merge2とMerge2Exの二つを用意しましたが、Merge2Exのほうは4つ同時に表示されるのが確認出来るはずです。コードはほぼ同一なのですが、AsyncModokiを似たようで別なものに差し替えました。

// Merge(2):Ex
new[] { 1, 2, 3, 4 }
    .ToObservable()
    .Select(x => AsyncModoki2(x, 1)) // これが差分
    .Merge(2)
    .Subscribe(x => Console.Write(x + "->"));
 
// スレッドプール上で非同期実行(結果は指定秒数後に返る)のシミュレート
// second秒後にネットワーク問い合わせが返る、的なものをイメージしてみてください
static IObservable<T> AsyncModoki2<T>(T value, int second)
{
    var subject = new AsyncSubject<T>();
 
    ThreadPool.QueueUserWorkItem(_ =>
    {
        Thread.Sleep(TimeSpan.FromSeconds(second)); // 指定秒数待機
        subject.OnNext(value);
        subject.OnCompleted(); // 完了(2つでワンセット)
    });
 
    return subject; // これ自体はすぐに返す(FromAsyncPatternの中身はこんな感じ)
}

このAsyncModoki2は、このメソッドを通ると即座にThreadPoolに送り込んで「実行」しています。Subscribeされるかどうかとは関係なく、Subscribeの「前に」。対してAsyncModokiはSubscribeされないと実行が開始されません。同じようで違う、この二つの状態をRxでは「Hot」と「Cold」と呼んで区別しています。HotはSubscribeとは関係なく動いているもの、イベントなんかはそうですね。ColdはSubscribeされて初めて動き出すもの、Observable.ReturnであったりRangeであったりと、Rxからの生成子の場合は、こちらのパターンが多いです。

実はFromAsyncPatternはHotなので、Subscribeとは関係なく即座に(といっても戻り値はFuncなのでInvokeしたら、ですが)非同期実行が開始されたりします。これは、あまり都合が良くなく(例えば上の例で見たように、MergeはSubscribeのタイミングによって実行数をコントロールしている)、Coldに変換したほうが扱いやすいです。そのためのメソッドがDefer。

static IObservable<WebResponse> AsyncModoki3<T>(WebRequest req)
{
    return Observable.Defer(()=>
        Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse,req.EndGetResponse)());
}

こちらのほうが、大抵の利用シーンにはマッチするかと思われます。

キャンセル時のリソース処理の問題

Switchは実に有意義なのですが、それの行っていることは、次の値を検知すると前の値をキャンセルする、ということです。普段はあまりキャンセルはしないと思うのですが、Switch内部では大量のキャンセルが発生しています。さて、どのような問題が発生するか、というと、例えば……。

using System;
using System.Net;
using System.Reactive.Linq;
 
class Program
{
    static void Main(string[] args)
    {
        // ネットワークの最大接続数。通常、デフォルトは2になっているはず。
        ServicePointManager.DefaultConnectionLimit = 2;
 
        // テキストボックスのTextChangedイベントをイメージした、インクリメンタルサーチで来る文字列群
        new[] { "w", "wi", "wik", "wiki", "wikip", "wikipe", "wikiped", "wikipedi", "wikipedia" }
            .ToObservable()
            .Select((word, id) =>
            {
                // wikipediaのAPIにリクエスト飛ばす
                var url = "http://en.wikipedia.org/w/api.php?action=opensearch&search=" + word + "&format=xml";
                var req = (HttpWebRequest)WebRequest.Create(url);
                req.UserAgent = "test";
 
                return Observable.FromAsyncPattern<WebResponse>((ac, state) =>
                    {
                        Console.WriteLine("ASYNC START:" + id);
                        return req.BeginGetResponse(ac, state);
                    }, ar =>
                    {
                        Console.WriteLine("ASYNC END:" + id);
                        return req.EndGetResponse(ar);
                    })()
                    .Select(res =>
                    {
                        using (res) // ここのセクションが呼ばれることはない
                        {
                            Console.WriteLine("CALLED NEXT:" + id);
                            return "response string:" + id;
                        }
                    });
            })
            .Switch()
            .ForEach(Console.WriteLine); // 終了を待機する形でのSubscribe
    }
}
 
// ConsoleApplication用のコードですが、是非実行してみてください。結果は以下のようになります。
 
ASYNC START:0
ASYNC START:1
ASYNC START:2
ASYNC START:3
ASYNC START:4
ASYNC START:5
ASYNC START:6
ASYNC START:7
ASYNC START:8
ASYNC END:0
ASYNC END:1
// そしてフリーズ...

これは、フリーズします。何故かというと、まず8件の非同期処理が一斉に開始されます(ASYNC STARTの表示)。一斉に開始はされますが、ネットワークの最大接続数は2なので、それ以外のものは内部的には待機されています。そして、Switchによる切り替えは最新のものだけを通すようにするため、7件はキャンセルされます。その後、最初の二件分のネットワークリクエストが終了し(ASYNC ENDの表示)、キャンセルされているためメソッドチェーンの続きであるSelectは呼ばれません。そして、フリーズ。

何故フリーズしてしまうかというと、EndGetResponseで取得した最初の二件のWebResponseが解放されていないためです。キャンセルが呼ばれなければ、Selectを通り、そこでusingにより利用+解放されるのですが、そのセクションを通らなければ何の意味がありません。使われることなく虚空に放り出されたWebResponseが、永遠にネットワーク接続を握ったままになってしまっています。

当然、大問題。

Switchを諦めてSelectMany(全件キャンセルせずに並行実行、どうせネットワーク自体の最大接続数で制限かかっているし)というのも手ではあります。大体の場合は結果は問題ないでしょう。けれど、Switchの利点は何でしたっけ、と。結果が前後しないことです。LINQを検索しようとしていたのに、検索結果が前後したせいでLINQ→LINの順番に結果が得られた結果、表示されるのがLINの結果では困ってしまいます。Switchなら、後に実行したものが必ず最後に来ると保証されるので、そのようなことにはなりません。反面、SelectManyは並行実行のため、前後する可能性が出てきます。Switchはこの例で挙げたような、インクリメンタルサーチのようなものと相性がとても良いんですね。

ではどうするか?

WebResponseのDispose(Close)を呼べれば解決するので、FromAsyncPatternのEnd部分に少し細工を加えてやる、ということが考えられます。

// こんなFromAsyncPatternを用意して
public static IObservable<TResult> SafeFromAsyncPattern<TResult>(Func<AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
    where TResult : IDisposable
{
    // WP7版ではCreateWithDisposableで(この辺の細かな差異が割とウザい)
    return Observable.Create<TResult>(observer =>
    {
        var disposable = new BooleanDisposable();
 
        Observable.FromAsyncPattern<TResult>(begin, ar =>
        {
            var result = end(ar);
            if (disposable.IsDisposed) result.Dispose(); // キャンセルされてたらDispose
            return result;
        })().Subscribe(observer);
 
        return disposable; // Disposeが呼ばれるとIsDisposedがtrueになる
    });
}
 
// こんな風に使うとか
public static IObservable<WebResponse> GetResponseAsObservable(this WebRequest req)
{
    return ObservableEx.SafeFromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse);
}

これにより、キャンセルされたかどうかをEnd部分で判定することが出来ます。よってEnd時にキャンセルされていたらリソースをDisposeしてしまう(ここでreturnしたオブジェクトは、チェーンは切れているので別に使われることなく虚空を彷徨うだけ)。これにより、FromAsyncPatternがリソースを返し、かつ、いつキャンセルされても問題なくなります。

他にも色々なアプローチが考えられます。CompositeDisposable/MutableDisposable/SingleAssignDisposableなどを使い、Disposeが呼ばれたら同時に管理下のリソースをDisposeしてしまう、といった手法。これは、リソースのDisposeされる瞬間が逆にコントロールしにくくなって、例えばWebResponseですと、その後のStreamを呼んでる最中にWebResponseがDisposeされてしまうなどの自体も起こりうるので、少し厄介に思えました。。リソースを後続に渡すまでは責任を持つ。それ以降はノータッチなので好きにやらせる、利用も解放も後続側が責任を。その方が自然だし、素直な動きになるので、いいかな。

他には、キャンセルを伝搬しないようなメソッドを作り、Disposeが呼ばれてもリソースを受け取れるようにし、後続でリソースをDisposeする、などの手段も考えられます。そうすればSafeFromAsyncPatternなどといった、独自のFromAsyncPatternを作る必要はなく、全てに適用できて汎用性は高いのですが、チェーンでの保証が途切れてしまうのが若干微妙かな、と……。この辺は悩ましいところです。

そもそもWebRequestなら、DisposeでAbortしてしまったほうが、キャンセルらしくていいかもしれない。

public static IObservable<WebResponse> GetResponseAsObservable(this WebRequest request)
{
    return Observable.Create<WebResponse>(observer =>
    {
        Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse,
            ar =>
            {
                try
                {
                    return request.EndGetResponse(ar); // Abort後の場合は例外発生
                }
                catch (WebException ex)
                {
                    if (ex.Status == WebExceptionStatus.RequestCanceled) return null;
                    throw; // キャンセル時以外は再スロー
                }
            })()
            .Subscribe(observer);
        return () => request.Abort(); // Dispose時にこのActionが呼ばれる
    });
}

Disposeが呼ばれるとwebRequest.Abortが呼ばれます。その後にEndGetResponseを呼ぶとRequestCanceledなWebExceptionが発生するので、キャンセルされていたならnullを(どちらにせよ、Dispose済みなので、ここでreturnしたものは次のメソッドチェーンで使われることはない)、そうでない例外ならば再スローを、という方針です。悪くなさそうですが、どうでしょうか。私的にはこれを採用するのがベストかなー、と考え中です。

まとめ

SwitchやMergeなどで、従来扱いにくかった並行処理時の非同期のコントロールが簡単になりました。単純に一本の非同期をSelectManyで摩り替えるだけもアリですけれど、せっかくの多機能なのだから、並行にリクエストなどを飛ばして、より速いアプリケーション作りを目指してもいいかもしれません。同期リクエストをTask.Factory.StartNewで包んで振り回すよりかは、ずっと楽です。また、現在行われているMSのイベントBUILDで発表されたWinRTなどは、完全に非同期主体です。C#5.0でasync/awaitが入り、非同期がより扱いやすくなることで、それに併せてModelの有り様も、同期から非同期へと変わっていき、それにあわせてVMなどの書き方も変わってくるのではかと思われます。

ただ、リソースの問題にだけは気をつけて!上で挙げた問題は、本質的にはFromAsyncPatternに限らず、リソース処理が引き離されている場合の全てで該当します。リソースを扱うのは難しい。とはいえ、全面的に問題になるのは、このFromAsyncPatternぐらいな気はします。Observable.Usingなども用意されているので、不用意にリソースをチェーン間で渡したりしなければ原則的には起こらない。けれど、そのFromAsyncPatternこそがリソースを扱うシチュエーションで最も使われるものなんですよね、とほほほ。

キャンセル(Dispose)を不用意に呼ばなければ問題は起こらないといえば起こらないんですが(そのため、不適切に書いてしまっていても、多くのケースで問題が表面化することはないでしょう)、Switchのようなアプローチが取れなくなるのがどうにも。現状だと、とりあえず気をつけましょう、としか言いようがないので、気をつけましょう。もし何かうまい具合に動かないなあ、と思ったら、この辺を疑ってみると良いかもしれません。

その辺難しいなあ、という場合は、近いうちに私の出すRx拡張ライブラリを使いましょう。特に考えなくても済むよう、色々配慮してあります。いつ出るの?というと、はい、最近ゴリゴリと書いてますんで(ブログがちょっと放置気味だった程度には)、必ず近いうちに出します。

Re:FromEvent vs FromEventPattern

現在のRxで最大の分かりにくいポイントになっているFromEventとFromEventPattern。以前にRxでのイベント変換まとめ - FromEvent vs FromEventPatternとして軽くまとめましたが、改めて、詳細に考えてみたいと思います。なお、ここでいうFromEventPatternはWP7版のRxではFromEventを指しています(ここも分かりにくいポイントです)。そして、ここでいうFromEventはWP7版のRxには未搭載です、あらあらかしこ。

ネタ元ですが、銀の光と藍い空: Silverlight 5 の新機能その3 番外編 DoubleClickTrigger をRxっぽくしてみたを拝見して、FromEventに関して実行時に例外が出るとのことなので、その部分の説明を書こうかと。最初コメント欄に書いたのですが、少しコメントにトチってしまったので、自分のブログに書かせていただきます、どうもすみません……。

FromEventとFromEventPatternの最大の違いは、FromEventがAction<EventArgs>を対象にしていて、FromEventPatternはAction<object, EventArgs>を対象にしているということです。Action<object, EventArgs>は、つまりEventHandler。ではAction<EventArgs>って何なんだよ、というと、通常は存在しません。というのもeventはデリゲートなら何でもアリということに仕様上はなっていますが、慣例としてobject sender, EventArgs eを引数に持つデリゲートを選択しているはずですから。

さて、デリゲート間には同じ引数の型・同じ戻り値の型を持っていても、型自体に互換性がないので(例えばEventHandlerとEventHandler<T>)、FromEventもFromEventPatternも、引数を3つ持つオーバーロードの第一引数は conversionという、デリゲートの型を変換するラムダ式を受け入れるようになっています。よって

Observable.FromEventPattern<MouseButtonEventHandler, MouseButtonEventArgs>(
    h => h.Invoke,
    h => AssociatedObject.MouseLeftButtonDown += h,
    h => AssociatedObject.MouseLeftButtonDown -= h);

これが、リフレクションなしで変換出来る形式になります。conversionが不要なオーバーロードもあるのですが(+=と-=を書くだけ)、それはリフレクションを使ってデリゲートの変換をしていて今一つ効率が悪いので、わざわざ+=, -=を書いているのだから、もう一手間かけて、 h => h.Invoke を書いておいたほうがお得です。(もし対象がEventHandler<T>の場合は事情が違ってconversionが不要なので+=と-=だけで済みます、この辺の事情の違いが面倒臭く混乱を招きがちなんですよね…… Rxチームには「便利そうだから」機能を追加する、とかやる前に、もう少し深く考えてくださいと苦情を言いたい)

h => h.Invoke だけで何故変換出来ているのかを詳しく説明します。これは正しく書くのならば

(EventHandler<MouseButtonEventArgs> h) => new MouseButtonEventHandler(h)

が正解です。(左側の型に関しては省略可能ですが、説明用には型を書いていたほうが分かりやすいので明記しておきます、また、この最初からEventHandler<T>であることが、対象がEventHandler<T>である場合はconversionが不要な理由になっています)。ただし、関数を直接渡すとコンパイラがデリゲートの型を変換してくれるため、h => h.Invokeを渡した場合は

h => new MouseButtonEventHandler(h.Invoke)

という風に内部的には自動で整形してくれます。そのため、new MouseButtonEventHandlerを書く手間が省けるということになっています。h と h.Invoke はやってることは完全一緒なのですけど、この辺はコンパイラの仕組みの都合に合わせるという感じで。むしろ仕組みの隙間をついたやり方といいましょうか。

では、FromEventなのですが、まず正しく変換出来る形を見ると

Observable.FromEvent<MouseButtonEventHandler, MouseButtonEventArgs>(
    h => (sender, e) => h(e),
    h => AssociatedObject.MouseLeftButtonDown += h,
    h => AssociatedObject.MouseLeftButtonDown -= h);

です。もし第一引数を省いた場合はAction<EventArgs>を探してリフレクションをかけるようになっていて、そして、通常はそんなイベントを使うことはないので、十中八九例外が出るのではかと思われます(だからこういう混乱を招くだけのオーバーロードを入れるなとRxチームには苦情を言いたい)。

型まで明記すれば

Action<MouseButtonEventArgs> h => (object sender, MouseEventArgs e) => h(e)

となっているわけで、senderを捨ててMouseEventArgsだけを引数に渡す独自のconversionを渡しています。これですが、FromEventPatternであっても

h => (sender, e) => h(sender, e)

とも書けるので(つまるところ h.Invoke って何かといえば (sender, e) => h(sender,e) なのです)、それのsender抜きバージョンを渡しているということになります。

わかりづらい?例えば

.Subscribe(Console.WriteLine)
.Subscribe(x => Console.WriteLine(x))

この二つはやってること一緒なんですよ、ということですね。ラムダ式が入れ子になるとワケガワカラナイ度が加速されるので、私は関数型言語erにはなれないな、と思ったり思わなかったり。

まとめ

ただたんに使うにあたっては、こんなことは知ってる必要はなくh => h.Invoke と h => (sender, e) => h(e) を定型句だと思って暗記してもらうだけで十分です。はい。本来は、こういう部分はちゃんと隠蔽されてたほうがいいんですけれど、まあ、C#の限界としてはそうはいかないというとこですね(F#だとイベントがもう少し扱いやすいんですが)。

また、FromEventにせよFromEventPatternにせよFromAsyncPatternにせよ、実際に使うコードに直接書いてくにはノイズが多すぎるので、Rxでのイベント変換まとめ - FromEvent vs FromEventPatternで書いたように、拡張メソッドに隔離するのを私はお薦めしています。そうこうして裏側で地道に努力することでF#とC#の壁を縮める!とかなんとかかんとか。

ReactiveProperty : Rx + MVVMへの試み

Reactive Extensionsといったら非同期、じゃなくて、その前にイベントですよ!イベント!というわけで、随分手薄になっていたイベント周りの話を増強したいこの頃です。イベントと一口に言っても色々あります。UI(クリックやマウスムーブ)、センサー、変更通知(INotifyPropertyChanged)などなど。中でも一番よく使うのは、UI周りのイベントでしょう。

しかし、UIの持つTextChangedイベントだのから直接FromEventPatternで変換してしまったら、Viewと密接に結びついてしまってよろしくない。ここはMVVM的にやりましょう。でも、どうやって?

View(UI)が持つネイティブなイベントを、ViewModelの持つ更新通知付きのプロパティに変換します。これはバインディングにより可能です。そこはWPF/SLの仕組みに任せましょう。ということで、RxでUIに対してプログラミングするというのは、ViewModelの通知に対してプログラミングするという形になります。

テキストボックスの変更に反応して、1秒ディレイをかけた後に表示する、という簡単な例を(何の面白みもありません、すみません)

Microsoft Silverlight を入手

public class ToaruViewModel : INotifyPropertyChanged
{
    private string input;
    public string Input
    {
        get { return input; }
        set { input = value; RaiseEvent("Input"); }
    }
 
    private string output;
    public string Output
    {
        get { return output; }
        set { output = value; RaiseEvent("Output"); }
    }
 
    public ToaruViewModel()
    {
        Observable.FromEvent<PropertyChangedEventHandler, PropertyChangedEventArgs>(
                h => (sender, e) => h(e),
                h => this.PropertyChanged += h, h => this.PropertyChanged -= h)
            .Where(e => e.PropertyName == "Input") // Inputが更新されたら
            .Select(_ => Input) // Inputの値を
            .Delay(TimeSpan.FromSeconds(1)) // 1秒遅らせて
            .ObserveOnDispatcher() // Dispatcherで(Silverlightではこれ必要・WPFでは不要)
            .Subscribe(s => Output = "入力が1秒後に表示される:" + s); // Outputへ代入
    }
 
    // この辺は別途、ライブラリを使って持ってくるほうが良いかも
    public void RaiseEvent(string propertyName)
    {
        var handler = PropertyChanged;
        if (handler != null) handler(this, new PropertyChangedEventArgs(propertyName));
    }
 
    public event PropertyChangedEventHandler PropertyChanged;
}
 
// xaml.csはInitializeだけ、xamlのバインディングは各プロパティへ当てるだけ。
// ただしSL/WP7はUpdateSourceTrigger=PropertyChangedに対応してないので別途Behaviorの適用が必要
// 詳しくは、最後にソース配布(WPF/SL/WP7全て含む)URLを置いているのでそちらを見てください

……実にダサい。はい。全くいけてないです。バインディング可能なのはプロパティなので、そういった中間レイヤへの中継が発生していて、冗長だし、美味しさがかなり損なわれています。わかりきったINotifyPropertyChangedのWhere, Selectは無駄そのもので。勿論、簡単にDelayを混ぜられるといった時間の扱いの容易さはRxならでは、ではあるのですけれど。

ReactiveProperty

中継が手間ならば、中間レイヤだけを抜き出してやればいい。通知処理を内包したIObservable<T>があれば解決する。というわけで、ReactivePropertyと名付けたものを作りました。それを使うと、こうなります。

public class SampleViewModel : INotifyPropertyChanged
{
    public ReactiveProperty<string> ReactiveIn { get; private set; }
    public ReactiveProperty<string> ReactiveOut { get; private set; }
 
    public SampleViewModel()
    {
        // UIから入力されるものはnewで作成、デフォルト値も同時に指定出来る。
        ReactiveIn = new ReactiveProperty<string>(_ => RaiseEvent("ReactiveIn"), "でふぉると");
 
        // UIへ出力するIO<T>はToReactivePropertyで、初期値での発火も自動的にされます。
        ReactiveOut = ReactiveIn
            .Delay(TimeSpan.FromSeconds(1))
            .Select(s => "入力が1秒後に表示される:" + s)
            .ToReactiveProperty(_ => RaiseEvent("ReactiveOut"));
    }
 
    // 通常は、他のMVVMフレームワークなりを使い、それの更新通知システムを利用するといいでしょう
    // Rxを使ったからって、決してMVVMフレームワークと競合するわけではなく、むしろ協調すると考えてください
    public void RaiseEvent(string propertyName)
    {
        var handler = PropertyChanged;
        if (handler != null) handler(this, new PropertyChangedEventArgs(propertyName));
    }
 
    public event PropertyChangedEventHandler PropertyChanged;
}
// これはWPF版のもの
<Window x:Class="ReactiveProperty.MainWindow"
        xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
        xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
        xmlns:l="clr-namespace:ReactiveProperty"
        Title="MainWindow" Height="350" Width="525">
    <Window.DataContext>
        <l:SampleViewModel />
    </Window.DataContext>
    <StackPanel>
        <TextBox Text="{Binding ReactiveIn.Value, UpdateSourceTrigger=PropertyChanged}" />
        <TextBlock Text="{Binding ReactiveOut.Value}" />
    </StackPanel>
</Window>

XAMLではPathに必ず.Valueまで指定します。これによりGetが求められれば最新の値を返し、値をSetされればPushするようになります。

今回はUI->ReactiveProperty->クエリ演算->ReactiveProperty->UIという風に戻してやりましたが、勿論、UIからの入力をModelに流してそれで止めてもいいし、Modelからの値をUIに流すだけでもいいし、トリガーはタイマーであってもいいし、その辺は完全に自由です。普通の通知プロパティと何も変わりません。また普通のプロパティとして使いたい時は.Valueで値を取り出す/セットできます。

かなりシンプルに仕上がります。通知付きプロパティは、本質的に値の変更毎に通知される無限長のIObservable<T>と見なせるので、そのことにより表現がより自然になっています。書き味も、リアクティブプログラミング(といわれてパッと浮かばれる値が自動更新されるという奴)にかなり近い感じの風合い。XAMLでのバインドも簡単ですし、VMの実装も自動実装プロパティだけで書けるので記述が楽チン。

そして、Rxを使うことによる最大の利点である、他のイベント(他の変更通知プロパティ)と合成しやすかったり、時間が扱いやすくなったり、非同期と混ぜても同じように扱えたり、スレッドの切り替えが簡単であったり、などを最大限に甘受できます。VMとして独立している、かつ全てがRxに乗っているため、単体テストも非常に作成しやすい状態です(時間軸を扱う処理のテストは通常難しいのですが、Rxの場合は自分で時間をコントロール可能なSchedulerを中間に挟むと、好きなように時間を進められるようになります、イベントのテストも、この状態ならばプロパティを変更するだけで生成されますし)。また、決して他のMVVMフレームワークと競合が起こるわけではない(多分……)のも見逃せない利点です。

単純な例なのでModelがありませんが、まあこんな感じ?(それと今はコマンドがないので単純なデータバインドのみの図です)。Modelへのアクセスは通常恐らくRx:Query内で行い、Modelの形態は色々だと思いますが、通信してデータを処理して返す、みたいなものはRxになっているとVMのReactiveProperty側での合成処理が容易なので、非同期にしてIObservable<T>で返すと良いのではかと思います。自身が通知を持つReactivePropertyになっていてもいいですね。そうなると、コードのほとんどがLINQになるという素敵な夢が見れる気がしますが気のせいです。

実装

ReactivePropertyの実装はこんな感じです。ご自由にコピペって使ってみてください。

using System;
#if WINDOWS_PHONE
using Microsoft.Phone.Reactive;
#else
using System.Reactive.Linq;
using System.Reactive.Subjects;
#endif
 
public class ReactiveProperty<T> : IObservable<T>, IDisposable
{
    T latestValue;
    IObservable<T> source;
    Subject<T> anotherTrigger = new Subject<T>();
    IDisposable sourceDisposable;
 
    public ReactiveProperty(Action<T> propertyChanged, T initialValue = default(T))
        : this(Observable.Never<T>(), propertyChanged, initialValue)
    { }
 
    public ReactiveProperty(IObservable<T> source, Action<T> propertyChanged, T initialValue = default(T))
    {
        this.latestValue = initialValue;
 
        var merge = source.Merge(anotherTrigger)
            .DistinctUntilChanged()
            .Publish(initialValue);
        this.sourceDisposable = merge.Connect();
 
        // PropertyChangedの発火はUIスレッドで行うことにする
        // UIへの反映の際に、WPFでは問題ないが、SL/WP7ではUIスレッドから発行しないと例外が出るため
        merge.ObserveOnDispatcher().Subscribe(x =>
        {
            latestValue = x;
            propertyChanged(x);
        });
 
        this.source = merge;
    }
 
    public T Value
    {
        get
        {
            return latestValue;
        }
        set
        {
            latestValue = value;
            anotherTrigger.OnNext(value);
        }
    }
 
    public IDisposable Subscribe(IObserver<T> observer)
    {
        return source.Subscribe(observer);
    }
 
    public void Dispose()
    {
        sourceDisposable.Dispose();
    }
}
 
// 拡張メソッド
public static class ObservableExtensions
{
    public static ReactiveProperty<T> ToReactiveProperty<T>(this IObservable<T> source, Action<T> propertyChanged, T initialValue = default(T))
    {
        return new ReactiveProperty<T>(source, propertyChanged, initialValue);
    }
}

Valueで値の中継をしているという、それだけです。Publish(value)はBehaviorSubjectというものを使った分配で、必ず最新の値一つをキャッシュとして持っていて、Subscribeされると同時に、まずその値で通知してくれます。これにより「初期値での自動発火」が自然に行える、という仕組みになっています。また、プロパティの変更時に同値の場合は変更通知をしない、というよくあるほぼ必須処理も、ここでDistinctUntilChangedを挟んで行っています(オプションで選択制にしてもいいかもしれない)。

それReactiveUI?

ReactiveUIというRxを前提にしたMVVMフレームワークがあって、それに用意されているObservableAsPropertyHelperと、ReactivePropertyはかなり近いです(ということにプロトタイプ作ってから気づいた、ReactiveUIはこれまで名前は知ってたけど中身完全ノーチェックだったので)。ただ、機能的にはOAPHは双方向バインディングに対応していないので、ReactivePropertyのほうが上です。また、OAPHは使い勝手もあまり良くないし、名前がダサい(ObservableAsPropertyHelperは長すぎるし型名として宣言させるにはイマイチに思える……)などなどで、あまり気に入るものではなかったです。

ReactiveUIは全体的には軽く眺めた程度なのですが、今ひとつ私には合わない。ちょっと、いや、かなり気にいらない。なので、私としてはそのうち他のMVVMライブラリをベースに置いた上での拡張として、Rx用のUI周りライブラリを作りたい。独自に上から下まで面倒を見るフレームワーク、という指針は今一つに思えるので、Rxならではの特異な部分だけを、最初から他のMVVMフレームワークの拡張として用意していく、という方向性のほうが良いものが作れると思っています。素のままのRxでは辛いので、何かしらの中間層が必要なのは間違いないので。

次は、ReactiveCommandを!あー、あとReactiveCollectionも必要かしら。Validationとかも……。まあ、そういうところは普通に書けばいいんですよ、何も全部Rxでやる必要はないですからね。

まとめ

WPFのバインディングの美味しさをRxで更に美味しくする、ということでした。世の中的には弱参照が~などなどというお話もありますが、それには全然追いついてませんので、おいおいちかぢかそのうち。

今回のコードの全体(WPF/SL/WP7)はneuecc / ReactiveProperty /Bitbucketに置いてありますので、好きに見てください。例が単純すぎると美味しさもよくわからないので、もう少し複雑な例で、サンプル準備中なのでしばしお待ちを。

ところで9/15にいよいよRx本が出ます。

オライリーで出ているProgramming C#の著者と、ReactiveUIの作者(元Microsoft Office Labs、つい最近Githubに転職した模様)の共著です。私も買いますので、うーん、読書会とかやったら来てくれる方います?

文字列を先頭から見て同じところまで除去をlinq.jsとC#で解いてみた

JavaScript で「文字列を先頭から見て同じところまで除去」をやってみました。という記事を見て、「linq.js を使いたかったのですが使いどころがパッと思い浮かびませんでした」とのことなので、linq.js - LINQ for JavaScriptで答えてみます。お題の元はお題:文字列を先頭から見て同じところまで除去からです。解き方も色々あると思いますが、最長の一致する文字を見つけて、それを元に文字列を削除していく、という方法を取ることにしました。

function dropStartsSame(array)
{
    var seq = Enumerable.From(array);
    return Enumerable.From(seq.First())
        .Scan("$+$$")
        .TakeWhile(function (x) { return seq.All(function (y) { return y.indexOf(x) == 0 }) })
        .Insert(0, [""]) // 一つもマッチしなかった場合のため
        .TakeFromLast(1)
        .SelectMany(function (x) { return seq.Select(function (y) { return y.substring(x.length) }) });
}
 
dropStartsSame(["abcdef", "abc123"]).WriteLine();
dropStartsSame(["あいうえお", "あいさんさん", "あいどる"]).WriteLine();
dropStartsSame(["12345", "67890", "12abc"]).WriteLine();

はい、ワンライナーで書けました、って何だか意味不明ですね!まず、例えば”abcdef”から[”a”,”ab”,”abc”,”abcd”,”abcde”,”abcdef”]を作ります。これはものすごく簡単で、Scanを使うだけです。

// ["a","ab","abc","abcd","abcde","abcdef"]
Enumerable.From("abcdef").Scan("$+$$")

素晴らしい!そうして比較のタネができたら、あとは全てのindexOfが0(先頭に一致する)の間だけ取得(TakeWhile)します。[”abcdef”,”abc123″]だとシーケンスは[”a”,”ab”,”abc”]に絞られます。必要なのは最長のもの一つだけなのでTakeFromLast(1)で最後のものだけを取得。もし一つもマッチしなかった場合は代わりに”"が通るようにInsertで事前に先頭にさしてやってます。あとは、その”abc”を元にして文字列を置換したシーケンスを返してやるようにすればいい、というわけです、はい。

少し修正

SelectManyで繋げるのは悪趣味なので、ちょっと変えましょう。

function dropStartsSame(array)
{
    var seq = Enumerable.From(array);
    var pre = Enumerable.From(seq.First())
        .Scan("$+$$")
        .TakeWhile(function (x) { return seq.All(function (y) { return y.indexOf(x) == 0 }) })
        .LastOrDefault("");
 
    return seq.Select(function (x) { return x.substring(pre.length) });
}

変数を一つ置いてやるだけで随分とすっきり。無理に全部繋げるのはよくないね、という当たり前の話でした。

C# + Ix

C#とIxで書くとこうなるかな?基本的には同じです。(Ixって何?という人はneue cc - LINQ to Objects & Interactive Extensions & linq.js 全メソッド概説を参照ください)

static IEnumerable<string> DropStartsSame(params string[] args)
{
    var pre = args.First()
        .Scan("", (x, y) => x + y)
        .TakeWhile(x => args.All(y => y.StartsWith(x)))
        .LastOrDefault() ?? "";
    return args.Select(x => x.Substring(pre.Length));
}
 
static void Main()
{
    var x = DropStartsSame("abcdef", "abc123").SequenceEqual(new[] { "def", "123" });
    var y = DropStartsSame("あいうえお", "あいさんさん", "あいどる").SequenceEqual(new[] { "うえお", "さんさん", "どる" });
    var z = DropStartsSame("12345", "67890", "12abc").SequenceEqual(new[] { "12345", "67890", "12abc" });
 
    Console.WriteLine(x == y == z == true);
}

Ixで使ってるのはScanだけですけれど。

Deferの使い道

ところで、上のコードは遅延評価なのか遅延評価でないのか、微妙な感じです。preの計算までは即時で、その後は遅延されています。まるごと遅延したい場合はIxのDeferというメソッドが使えます。

// Deferで生成を遅延する
static IEnumerable<string> DropStartsSame2(params string[] args)
{
    return EnumerableEx.Defer(() =>
    {
        var pre = args.First()
            .Scan("", (x, y) => x + y)
            .TakeWhile(x => args.All(y => y.StartsWith(x)))
            .LastOrDefault() ?? "";
        return args.Select(x => x.Substring(pre.Length));
    });
}
 
// もしくはyield returnを使ってしまうという手も私はよく使っていました
static IEnumerable<string> DropStartsSame3(params string[] args)
{
    var pre = args.First()
        .Scan("", (x, y) => x + y)
        .TakeWhile(x => args.All(y => y.StartsWith(x)))
        .LastOrDefault() ?? "";
    var query = args.Select(x => x.Substring(pre.Length));
 
    foreach (var item in query) yield return item;
}
 
// 勿論、全部LINQで組んでしまってもOK
static IEnumerable<string> DropStartsSame4(params string[] args)
{
    return args.First()
        .Scan("", (x, y) => x + y)
        .TakeWhile(x => args.All(y => y.StartsWith(x)))
        .StartWith("") // linq.jsではInsert(0, [])でした
        .TakeLast(1) // linq.jsではTakeFromLastでした
        .SelectMany(x => args.Select(y => y.Substring(x.Length)));
}

私はIx以前はyield returnを結構よく使ってました。今は、Deferのほうが、例えば if(args == null) throw new ArgumentNullException(); とかがそのまま書けるのでDeferを選びたいかも。この辺の評価タイミングの話は前回、詳説Ix Share/Memoize/Publish編(もしくはyield returnの注意点)で書きました。

まとめ

というわけで、Scanの使い方でした。Scan可愛いよScan。ようするにAggregateの計算途中も列挙する版なわけなので、これ、標準クエリ演算子にも入って欲しかったなあ。結構使えるシーン多いです。

ああ、あとJavaScriptでもforなんて使いません(キリッ。linq.jsは真面目に普通に多機能なので遅い、じゃなくて、いや、それはまあ事実なんですが、便利には違いないです。他の普通のコレクションライブラリじゃ出来ないことも平然と出来ます。でもかわりに(ry

詳説Ix Share/Memoize/Publish編(もしくはyield returnの注意点)

LINQ to Objects & Interactive Extensions & linq.js 全メソッド概説でのIxのPublishの説明がおざなりだったので、一族について詳しく説明したいと思います(Ixの詳細・入手方法などは先の全メソッド概説のほうを参照ください)。ええ、これらはIBuffer<T>を返すという一族郎党だという共通点があります。なので、並べてみれば分かりやすい、かも?挙動を検証するためのコードは後で出しますので、ひとまず先に図をどうぞ。星がGetEnumerator、丸がMoveNext(+Current)、矢印線が二つ目の列挙子を取得したタイミングを表しています。

これだけじゃよく分かりません。と、いうわけで、少しだけ上の図を頭に入れて、以下の解説をどうぞ。

Memoize

Memoizeはメモ化。メモイズ。メモライズじゃないっぽいです。二度目三度目の列挙時は値をキャッシュから返すので、前段がリソースから取得したり複雑な計算処理をしていたりで重たい場合に有効、という使い道が考えられます。とりあえずEnumeratorを取ってきて、それでじっくり動きを観察してみましょう。

// ShareやMemoizeやPublishの戻り値の型はIBuffer<T>で、これはIDisposableです。
// もしリソースを抱えるものに適用するなら、usingしたほうがいいかも?
using (var m = Enumerable.Range(1, int.MaxValue)
    .Do(x => Console.WriteLine("spy:" + x)) // 元シーケンスが列挙される確認
    .Memoize())
{
    var e1 = m.GetEnumerator();
    e1.MoveNext(); e1.MoveNext(); e1.MoveNext(); // spy:1,spy:2,spy:3
    Console.WriteLine(e1.Current); // 3
 
    var e2 = m.GetEnumerator();
    e2.MoveNext(); // キャッシュから返されるので元シーケンスは不動
    Console.WriteLine(e2.Current); // 1
 
    e2.MoveNext(); e2.MoveNext(); e2.MoveNext(); // spy:4
    Console.WriteLine(e2.Current); // 4
 
    e1.MoveNext(); // 今度はe2が先行したので、こちらもキャッシュから
    Console.WriteLine(e1.Current); // 4
}

特に考える必要もなく分かりやすい内容です。オーバーロードにはreaderCountを受けるものがありますが、それの挙動は以下のようになります。

// Memoizeの第二引数はreaderCount(省略した場合は無限個)
using (var m = Enumerable.Range(1, int.MaxValue).Memoize(2))
{
    // readerCountはEnumeratorの取得可能数を表す
    var e1 = m.GetEnumerator();
    e1.MoveNext(); // MoveNextするとMemoizeに登録される
    var e2 = m.GetEnumerator();
 
    var e3 = m.GetEnumerator();
    e3.MoveNext();
 
    e2.MoveNext(); // 3つめはNGなので例外が飛んで来る
}

これを使うと、キャッシュから値が随時削除されていくので、メモ化しつつ巨大なシーケンスを取り扱いたい場合には、メモリ節約で有効なこともあるかもしれません。とはいっても、普段はあまり考えなくてもいいかな?毎回削除処理が入るので、実行時間は当然ながら遅くなります。

Share

Shareは列挙子を共有します。

using (var s = Enumerable.Range(1, int.MaxValue).Share())
{
    var e1 = s.GetEnumerator();
    e1.MoveNext(); e1.MoveNext(); e1.MoveNext();
    Console.WriteLine(e1.Current); // 3
 
    var e2 = s.GetEnumerator(); // 列挙子が共有されているので3からスタート
    Console.WriteLine(e2.Current); // 0。共有されるといっても、MoveNext前の値は不定です
    e2.MoveNext();
    Console.WriteLine(e2.Current); // 4。正しく初期値といえるのはここ
 
    Console.WriteLine(e1.Current); // 3。e1とe2は同じものを共有していますが、Currentの値はMoveNextしない限りは変わらない
 
    e1.MoveNext();
    Console.WriteLine(e1.Current); // 5。列挙子を共有しているので、e2の続き
}

といった形です。これの何が美味しいの?というと、例えば自分自身と結合させると、隣り合った値とくっつけられます。

// Share, Memoize, Publishにはselectorを受けるオーバーロードがある
// このselectorには var xs = source.Share() の xsが渡される
// つまり、一度外部変数に置かなくてもよいという仕組み、Zipなどと相性が良い
 
// 結果は {x = 1, y = 2}, { x = 3, y = 4}, { x = 5, y = 6}
// 列挙子を共有して自分自身と結合するので、隣り合った値とくっつく
Enumerable.Range(1, 6)
    .Share(xs => xs.Zip(xs, (x, y) => new { x, y }))
    .ForEach(x => Console.WriteLine(x));

なんだか、へぇー、という感じの動き。このShareを使うとstringの配列からDictionary<string, string>への変換 まとめ - かずきのBlog@Hatenaのコードは

// {"1":"one"}, {"2":"two"}
var array = new[] { "1", "one", "2", "two" };
var dict = array.Share(xs => xs.Zip(xs, Tuple.Create))
    .ToDictionary(t => t.Item1, t => t.Item2);

物凄くシンプルになります。ループを回すなんて、やはり原始人のやることでしたね!

Publish

PublishはRxでは値を分散させましたが、Ixでも分散です。ただ、挙動にはかなりクセがあり、あまりお薦め出来ないというか……。動きとしては、取得時には共有された列挙子から流れてくるのでShareのようであり、列挙子取得後は全て必ず同じ値が返ってくることからMemoizeのようでもある。

using (var p = Enumerable.Range(1, int.MaxValue).Publish())
{
    var e1 = p.GetEnumerator();
    e1.MoveNext(); e1.MoveNext(); e1.MoveNext();
    Console.WriteLine(e1.Current); // 3
 
    var e2 = p.GetEnumerator(); // 取得時は列挙子の状態が共有されているので3からスタート
    Console.WriteLine(e2.Current); // 0。 共有されるといっても、MoveNext前の値はやはり不定
    e2.MoveNext();
    Console.WriteLine(e2.Current); // 4。正しく初期値といえるのはここ
 
    e1.MoveNext(); e1.MoveNext(); e1.MoveNext(); e1.MoveNext(); e1.MoveNext();
    Console.WriteLine(e1.Current); // 8
 
    e2.MoveNext(); // 取得後の状態はそれぞれ別、またキャッシュから返される
    Console.WriteLine(e2.Current); // 5
}

このPublish、こうして生イテレータを操作している分には理解できますが、普通に使うように演算子を組み合わせると予測不能の挙動になります。例えば

// 自分自身と結合、GetEnumeratorのタイミングが同じなので同値で結合される
// {1,1},{2,2},{3,3},{4,4},{5,5}
Enumerable.Range(1, 5)
    .Publish(xs => xs.Zip(xs, (x, y) => new { x, y }))
    .ForEach(a => Console.WriteLine(a));
 
// もし後者のほうをSkip(1)したらこうなります
// {1,3},{2,4},{3,5}
Enumerable.Range(1, 5)
    .Publish(xs => xs.Zip(xs.Skip(1), (x, y) => new { x, y }))
    .ForEach(a => Console.WriteLine(a));

Skip(1)すると {1,2},{2,3}… ではなくて {1,3},{2,4}… になる理由、すぐにティンと来ますか?正直私はわけがわかりませんでした。Zipの実装を見ながら考えると、少し分かりやすくなります。

static IEnumerable<TR> Zip<T1, T2, TR>(this IEnumerable<T1> source1, IEnumerable<T2> source2, Func<T1, T2, TR> selector)
{
    using (var e1 = source1.GetEnumerator())
    using (var e2 = source2.GetEnumerator())
    {
        while (e1.MoveNext() && e2.MoveNext())
        {
            yield return selector(e1.Current, e2.Current);
        }
    }
}

Skip(1)のない、そのままZipで結合したものはEnumeratorを取得するタイミングは同じなので、 {1,1},{2,2}… になるのは妥当です。では、source2がSkip(1)である場合は、というと、source2.GetEnumeratorの時点で取得されるのはSkip(1)のEnumeratorであり、Publishで分配されているEnumeratorはまだ取得開始されていません。では、いつPublishされているEnumeratorを取得するか、というと、これは最初にe2.MoveNextが呼ばれたときになります。なので、e1.MoveNextにより一回列挙されているから、e2の(MoveNext済みでの)初期値は2。更にSkip(1)するので、{1,3},{2,4}… という結果が導かれます。

ZipやSkipなど、他のメソッドを組み合わせるなら、それらの内部をきっちり知らなければ挙動が導けないという、ものすごく危うさを抱えているので、Publishを上手く活用するのは難しい印象です、今のところ、私には。もともとPublishはRxに分配のためのメソッドとして存在して、その鏡としてIxにも移植されているという出自なのですが、どうしてもPull型で実現するには不向きなため、不自然な挙動となってしまっています。分配はPull(Ix)じゃなくてPush(Rx)のほうが向いている、というわけで、分配したいのならToObservableして、Observable側のPublishを使ったほうが、素直な動きをして良いと思います。

yield returnを突っつく

MemoizeのreaderCountの例でもそうでしたが、Publish/Memoizeされている列挙子を取得するのがGetEnumerator時ではなくて最初のMoveNextの時、になるのはyield returnを使うとそういう挙動で実装されるからです。例えば

static IEnumerable<T> Hide<T>(this IEnumerable<T> source)
{
    Console.WriteLine("列挙前");
    using (var e = source.GetEnumerator()) // 通常は、foreachを使いますが。
    {
        while (e.MoveNext()) yield return e.Current;
    }
    Console.WriteLine("Dispose済み");
} // yield break
 
static void Main(string[] args)
{
    var e = Enumerable.Repeat("hoge", 1).Hide().GetEnumerator(); // ここではまだ何も起こらない
    e.MoveNext(); // 列挙前 ← ここでメソッド冒頭からyield return e.Currentのところまで移動
    Console.WriteLine(e.Current); // hoge
    e.MoveNext(); // Dispose済み ← 最終行まで到達して終了
}

イテレータの自動実装でメソッド本文を動きだすのは、最初のMoveNextから、というわけです。また、イテレータ内でusingなどリソースを掴む実装をしている場合は、普通にブロックを(using,lock,try-finally)超えた時に解放されます。ただし、ちゃんとブロックを超えるまでMoveNextを呼びきれる保証なんてない(例外が発生したり)ので、GetEnumeratorする時はusingも忘れずに、は大原則です。using、つまりEnumeratorをDisposeすると、using,lock,finallyがメソッド本文中で呼ばれていなかった場合は、呼ぶようになってます。

ところで、本文が動き出すのは最初のMoveNextから、であることが困る場合もあります。例えば引数チェック。

public static IEnumerable<string> Hoge(string arg)
{
    if (arg == null) throw new ArgumentNullException();
 
    yield return arg;
}
 
void Main(string[] args)
{
    var hoge = Hoge(null); // ここでは何も起こらない!
    hoge.GetEnumerator().MoveNext(); // ArgumentNullException発生
}

nullチェックはメソッドに渡したその時にチェックして欲しいわけで、これではタイミングが違って良くない。これを回避するにはどうすればいいか、というと

// 先にnullチェックを済ませて普通にreturn
public static IEnumerable<string> Hoge(string arg)
{
    if (arg == null) throw new ArgumentNullException();
 
    return HogeCore(arg);
}
 
// privateなメソッドで、こちらにyield returnで本体を書く
private static IEnumerable<string> HogeCore(string arg)
{
    yield return arg;
}
 
static void Main(string[] args)
{
    var hoge = Hoge(null); // 例外発生!
}

こうすれば、完璧な引数チェックの完成。実際に、LINQ to Objectsの実装はそうなっています。この時のprivateメソッドの命名には若干困りますが、私は今のところXxxCoreという形で書くようにしてます。MicrosoftのEnumerable.csではXxxIteratorという命名のようですね。また、Ixを覗くとXxx_という名前を使っている感じ。みんなバラバラなので好きな命名でいいのではかと。

なお、こんなことのためだけにメソッドを分割しなければならないというのは、無駄だしバッドノウハウ的な話なので、かなり嫌いです。インラインにラムダ式でyield returnが使えればこんなことにはならないんだけれどなー、チラチラ(次期C#に期待)

まとめ

再度、冒頭の図を眺め直してもらうと、ああ、なるほどそういうことね、と分かりますでしょうか?

とはいえ、ShareもMemoizeもPublishもあんま使うことはないかと思いますぶっちゃけ!Memoizeは、使いたいシチュエーションは確かに多いのですけれど、しかし事前にToArrayしちゃってたりしちゃうことのほうが多いかなー、と。Shareは面白いんだけど使いどころを選ぶ。Publishは挙動が読みきれなくなりがちなので、避けたほうがいいと思います。

Prev |

Search/Archive

Category

Profile


Yoshifumi Kawai
Microsoft MVP for Visual C#

April 2011
|
March 2012

Twitter:@neuecc