Archive - Rx

Amazon Kinesis + Reactive Extensionsによる簡易CEP

AWSのAmazon Kinesis!大規模なストリーミングデータをリアルタイムで処理する完全マネージド型サービス。うーん、いかにもわくわくしそうなキーワードが並んでいます。そしてついに先日、東京リージョンでも利用可能になったということでAWS Summitの最中もwktkして、どうやって利用したもんかと考えてました。だって、リアルタイムにイベントデータが流れてくる→オブザーバブルシーケンス→Reactive Extensions(Rx)、という連想になるのは自然なことですよね?

Kinesisとは

Rx、の前にKinesisとは。【AWS発表】 Amazon Kinesis – ストリームデータのリアルタイム処理を見れば事足りますが、表現するなら土管、ですかね。イベントデータの。以下ぽんち絵

Streamの中はShardという単位で分かれていて、データを放り込む時はPartitionKeyを元に、どのShardに突っ込まれるか決まる。読み書き性能自体は完全にShardの数で決まっていて、1シャード毎にWriteは1MB/sec - 1000Req/sec, Readは2MB/sec - 5Req/secとなってます。事前に負荷状況を予測していくのと、随時、Split(Shardの分割)とMerge(Shardの統合)してスケーリングしていく、って感じですかねえ。API自体は単純で、あんま数もないので簡単に理解できるかと。

APIが単純なのはやれることが少ないから。土管。情報を左から右に流すだけのパイプ。その代わり入力は限りなく無限にスケールしていく(Shardを増やしまくれば)。では出力は?というと、Kinesis Applicationとよばれる、といっても実体は、別にAPIをほぼほぼポーリングで叩いてデータ取り出して何か処理するものをそう呼んでるだけ。で、取り出すのはAPI叩いて保存されたデータを読むだけ。

そう、ポーリング。Kinesis自体は一時保管所であって、本当のリアルタイムでPubSub配信するわけじゃあない(用途としては問題ないレベルで低遅延にはなるけれど)。保存時間は24時間で、その間はStream中のどこから(最初からでも最新からでも任意の位置から)でも取り出すことができる。一時保管所がわりにS3を使ったりすると、ゴミは貯まるしどこまで取ったかとか煩わしくなるけれど、Kinesisの場合はStreamの形状になっているのでとてもやりやすい。ただしKinesisは制限として1レコード辺り50KBまで。更にHTTPで投げる際にBase64になってブヨっと膨らむ。

ObservableKinesisClient

C#でKinesisを使うには、AWS SDK for .NETを使えばAmazonKinesisClient入ってます。ソースコードも公開されてるしNuGetでも入れられるし、APIはとりあえずAsyncに対応してるし、APIデザインもちょっと奇妙なところもあるけれど、一応全て統一されたモデルでデザインされてるので、割と結構良いと思ってます。

Kinesis、データの登録はPutRecordでバイナリ投げるだけなので単純なのですが、取り出しの方はいささか面倒で、DescribeStreamによるStream内のShard情報の取得、GetShardIteratorによるShardIterator(どの位置から取得開始するか、の情報)の取得、それを元にGetRecord、そして延々とポーリングのためのループ。と、繰り返す必要があります。

というわけかで、まずは利用例の方を。

// とりあえずAWSのキーと、ストリーム名で生成する感じ
var client = new ObservableKinesisClient("awsAccessId", "awsSecretAccessKey", RegionEndpoint.APNortheast1, streamName: "KinesisTest");
 
// データの登録。オブジェクトを投げ込むとJSONシリアライズしたのを叩き込む。
await client.PutRecordAsync(new { Date = DateTime.Now, Value = "ほげほげほげほげ" });
 
// ObserveRecordDynamicでJSONのストリームとして購読できる
client.ObserveRecordDynamic()
    .Where(x => x.Value != "ほげ") // xはdynamicなのでどんなSchemaのJSONも自由に辿れる
    .Select(x => x.Date + ":" + x.Value)
    .Subscribe(Console.WriteLine);

はい。ObserveRecordDynamicで、リアルタイムに流れてくるデータを簡単に購読できます。IObservableなので、Rxによって自由にクエリを書くことが可能。また、何のデータが流れてくるか分からないストリームのために、JSONはdynamicの形でデシリアライズされています。(IntelliSenseの補助は効きませんが)スキーマレスに、あらゆるデータをRxで処理できます。もちろん、型付けされたものが欲しければObserverRecord<T>を、今は実装してないですが、まあ簡単につくれます:)

以下ObservableKinesisClient本体。

// JSON.NET, AWSSDK, Rx-Mainの参照が必要
public class ObservableKinesisClient
{
    readonly UTF8Encoding encoding = new UTF8Encoding(false);
    readonly JsonSerializer serializer = new JsonSerializer() { Formatting = Newtonsoft.Json.Formatting.None }; // ThreadSafeだよ
    readonly string streamName;
    readonly AmazonKinesisClient kinesis; // ThreadSafeなのかは知らない(ぉぃ
 
    // コンストラクタはもっとまぢめにやりましょう
    public ObservableKinesisClient(string awsAccessId, string awsSecretAccessKey, RegionEndpoint endPoint, string streamName)
    {
        this.kinesis = new AmazonKinesisClient(awsAccessId, awsSecretAccessKey, endPoint);
        this.streamName = streamName;
    }
 
    // ようするにObjectを1レコードずつJSONで突っ込むもの
    public async Task<PutRecordResponse> PutRecordAsync(object value)
    {
        using (var ms = new MemoryStream())
        using (var sw = new StreamWriter(ms, encoding))
        using (var jw = new JsonTextWriter(sw) { Formatting = Formatting.None })
        {
            serializer.Serialize(jw, value);
            jw.Flush();
            ms.Position = 0;
 
            var request = new PutRecordRequest
            {
                StreamName = streamName,
                Data = ms,
                PartitionKey = Guid.NewGuid().ToString() // PartitionKeyは適当にランダム
            };
 
            // つまり1レコード1HTTP POSTということになる。
            // 大量に投げる際は素朴すぎてアレゲ感があるので、実際にやるときはまとめてから放り込んで
            // 取り出す側も↑の構造を前提にして取り出すよーな感じにしたほうがいーかもデスネー
            return await kinesis.PutRecordAsync(request).ConfigureAwait(false);
        }
    }
 
    // Dynamicが嫌な場合はSerialize<T>でおk。とりあえずこの例ではdynamicでやります。
    // Client内部で分配しちゃったほうがきっと自然にやさしい(Publish().RefCount())
    public IObservable<dynamic> ObserveRecordDynamic()
    {
        return Observable.Create<dynamic>(async (observer, cancellationToken) =>
        {
            var isRunningNextPipeline = false;
            try
            {
                // まずShard一覧を取得する
                // TODO:これを使いまわしちゃうとShardsの増減には対応してないよ!
                // 毎回DescribeStream読むのもアレだしたまに問い合わせとかがいいの?
                var describeStreamResponse = await kinesis.DescribeStreamAsync(new DescribeStreamRequest { StreamName = streamName }).ConfigureAwait(false);
                var shards = describeStreamResponse.StreamDescription.Shards;
 
                var nextIterators = new List<string>();
                foreach (var shard in shards)
                {
                    if (cancellationToken.IsCancellationRequested) return; // CancellationTokenの監視だいぢだいぢ
 
                    // ShardIteratorTypeは実際は取り出した位置を記録しておいてAFTER_SEQUENCE_NUMBERでやるか、LATESTでやるかがいーんじゃないでしょーか?
                    var shardIterator = await kinesis.GetShardIteratorAsync(new GetShardIteratorRequest
                    {
                        StreamName = streamName,
                        ShardId = shard.ShardId,
                        ShardIteratorType = ShardIteratorType.TRIM_HORIZON, // TRIM_HORIZON = 最初から, LATEST = 最新, AT_SEQUENCE_NUMBER = そこから, AFTER_SEQUENCE_NUMBER = 次から
                    }).ConfigureAwait(false);
 
                    var record = await kinesis.GetRecordsAsync(new GetRecordsRequest { ShardIterator = shardIterator.ShardIterator }).ConfigureAwait(false);
 
                    // Shardの順番で回してるので、このPushの順番は必ずしも「時系列ではない」ことにチューイ!
                    foreach (var item in record.Records)
                    {
                        PushRecord(item, observer, ref isRunningNextPipeline); // ObserverでPush!Push!Push!
                    }
 
                    nextIterators.Add(record.NextShardIterator);
                }
 
                // NextShardIteratorがある状態で無限ぐるぐる
                do
                {
                    if (cancellationToken.IsCancellationRequested) return; // ところどころCancellationTokenの監視 Part2
 
                    for (int i = 0; i < nextIterators.Count; i++)
                    {
                        if (cancellationToken.IsCancellationRequested) return; // ところどころCancellationTokenの監視 Part3
 
                        var shardIterator = nextIterators[i];
 
                        var record = await kinesis.GetRecordsAsync(new GetRecordsRequest { ShardIterator = shardIterator }).ConfigureAwait(false);
 
                        // こちらでも、やはりShardの順番で回してるので、状況によって必ずしも時系列にはならないことにチューイ!
                        foreach (var item in record.Records)
                        {
                            PushRecord(item, observer, ref isRunningNextPipeline); // ObserverでPush!Push!Push!
                        }
 
                        nextIterators[i] = record.NextShardIterator;
                    }
 
                    await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false); // 実質ポーリングなのでなんとなくDelayをちょっと入れてみる
 
                    nextIterators = nextIterators.Where(x => x != null).ToList(); // 明らかに非効率なこの実装はテキトーなんで真面目にやるなら真面目に書いてください:)
                } while (nextIterators.Any());
            }
            catch (Exception ex)
            {
                if (isRunningNextPipeline)
                {
                    throw;
                }
                else
                {
                    observer.OnError(ex);
                }
 
                return;
            }
 
            observer.OnCompleted();
        });
    }
 
    void PushRecord(Record record, IObserver<dynamic> observer, ref bool isRunningNextPipeline)
    {
        using (var sr = new StreamReader(record.Data, encoding)) // item.DataにMemoryStreamの形で1レコードが受け取れる
        using (var jr = new JsonTextReader(sr))
        {
            var obj = serializer.Deserialize(jr);
            isRunningNextPipeline = true;
            observer.OnNext(obj); // 1レコードをPush
            isRunningNextPipeline = false;
        }
    }
}

PutRecordAsyncはまんま、JSONにシリアライズしたデータを投げ込んでるだけです。ObserverRecordDynamicのほうはちょっと複雑っぽいですが、やってることは順に、DescribeStreamAsyncでShard一覧を取得→それぞれのShardでGetShardIteratorAsyncで始点の取得・GetRecordsAsyncで最初のデータを取得しobserverに配信→取得できたNextShardIteratorを元にデータ取得と配信の無限ループ。です。

コメントで色々書いてありますが、Shard単位で処理していくのでレコードのSequenceNumberの順にPushされているわけではないことと、ShardがSplitやMergeで変動することへの対応は必要よね、とか考えることは色々ありますね。あと、Readの制限が5Req/secとかなり少ないので、複数処理する必要があるなら、できればリクエストは分配してやりたいところ。RxならPublishで分配、ついでにRefCountでSubscriberが0になったら購読解除というのが自然に書けるので、その辺も入れてやるといいかなー、なんて思います。とはいえ、基本的にはデータ取ってOnNextで垂れ流すという、それだけに収まってはいます(ほんとだよ!)。

従来はこの手のコードはyield returnで処理するはずですが、それがOnNextに変わっているという事実が面白い!勿論、同期API + yield returnにすることも可能ですが、AWS SDKの同期APIは非同期のものを.Resultで取ってるだけで非同期のほうがネイティブになるので、同期API使うのはお薦めしません。非同期時代のLINQ、非同期時代のイテレータ。中々面白くありません?UniRx - Reactive Extensions for UnityのFromCoroutineでも、IObserverをyielderとして渡して、非同期のイテレータを作れる(コンバートできる)ようにしています。こういうのも一つのデザイン。

like CEP(with LINQPad)

CEP(Complex Event Processing)は最近良く聞くようになりましたねー、MicrosoftにもStreamInsightというかなり立派なプロダクトがあるのですが、あんまり話を聞かないし将来性もビミョーそうなので見なかったことにしましょう。ちなみにStreamInsightは2.1からRxと統合されたりして、この手のイベントストリームとRxとが相性良いこと自体は証明済みです。

そんなわけでMicrosoft周辺では全然聞きませんが、日本だとLINEでのEsper CEPの活用例とかNorikra:Schema-less Stream Processing with SQLで盛んに聞いて、まーたMicrosoft周辺によくある、一歩先を行ったと思ったら周回遅れ現象か!とか思ったり思わなかったり。

というわけで、Norikraの紹介スライドのクエリ5つをRxで書いてみましょう。また、動作確認はLINQPadのDumpでリアルタイムに表示が可能です(asynchronousにクエリが走ってる最中はResultsのところにリアルタイムにグリッドが追加されていく!)

// Queries:(1)
client.ObserveRecordDynamic()
    .Select(x => new{ x.Name, x.Age })
    .Dump();
 
// Queries:(2)
client.ObserveRecordDynamic()
    .Where(x => x.Current == "Shibuya")
    .Select(x => new{ x.Name, x.Age })
    .Dump();
 
// Queries:(3)
client.ObserveRecordDynamic()
    .Buffer(TimeSpan.FromMinutes(5))
    .Select(xs => xs.GroupBy(x => x.Age).Select(x => new { Age = x.Key, Count = x.Count() }))
    .Dump();
 
// Queries:(4)
client.ObserveRecordDynamic()
    .Buffer(TimeSpan.FromMinutes(5))
    .Select(xs => xs.Max(x => x.Age))
    .Dump();
 
// Queries:(5)
client.ObserveRecordDynamic()
    .Where(x => x.Current == "Kyoto" && x.Attend[0] && x.Attend[1])
    .Buffer(TimeSpan.FromMinutes(5))
    .Select(xs => xs.GroupBy(x => x.User.Age).Select(x => new { Age = x.Key, Count = x.Count() }))
    .Dump();

5分間だったらBufferもしくはWindowが使えます(量が少なそうならBufferのほうが、後続クエリにLINQ to Objectsが使えて分かりやすい、量が多いならWindowで、同様にRxで集計クエリが書ける)。他に何ができるかはRxJavaのWikiのOperator一覧でもどうぞ。めちゃくちゃ何でもできます。

SQL vs Rx

SQLである必要は、あるようで、ない。テキストベースのDSLを作るならSQLが共通知識として期待できるので、SQLに寄せる必要性はかなり高い。けれど、Rxならば、LINQとしての共通知識と、C#そのものであるというコンパイルセーフな点と何でもできること、メソッドチェーン(+IntelliSense)による書きやすさ。SQLライクなものを使いたい理由は全くない。

(とはいえ勿論いちだいのRxがぶんさんごりごりのに勝てるとは思ってないんで、そこはまぁかじゅあるなはなしです)

TODO

というわけで見てきたわけですが、まあ所詮まだ単純なコードによるコンセプトレベルの話ですね!本格的にこれからやるとしたら

  • ObservableKinesisClientをもっとしっかりしたものに
  • Kinesis ApplicationをホストするためのServiceとプラグイン機構
  • ログ転送側としてSLABのKinesis用Sink

ですかねえ。まぁ、これらはJavaですでに用意されているamazon-kinesis-clientamazon-kinesis-connectorsを.NET環境で代替するために必要だ、といったところですね。素直にJava書けば?っていうのは一理あるけれど、どーなんですかね、C#でやりたいんですよ(笑)

Semantic Logging Application Block(SLAB)というのは構造化ロガー(正確にはロガーは含まれないけれど)と収集サービスがセットになったライブラリです。面白いのはOut-Of-Processでの動作が選べて、その場合はWindowsネイティブのEvent Tracing for Windows (ETW)経由でログが運ばれるので、非常に高速に動作する、というところ。Sinkというのは出力用プラグインみたいなものです。なので、アプリケーション→EventSourceロガー→SLAB Service(+ KinesisSink)→Kinesis という構造を作ることで、データをリアルタイムに投下するところまでは行ける。あとはRedShiftに送って解析(amazon-kinesis-connectorsには既にありますね)するなり、他のKinesis Application作るなりよしなに出来るかなぁ、できればいいかなぁ、と。ラムダアーキテクチャ、というホドデハ・モチロンナイ。

AWS + Windows(C#)

先週の木・金に開催されたAWS Summit Tokyo 2014にて、AWS + Windows(C#)で構築する.NET最先端技術によるハイパフォーマンスウェブアプリケーション開発実践と題して、セッションを行いました。

AWS + Windows(C#)で構築する.NET最先端技術によるハイパフォーマンスウェブアプリケーション開発実践 from Yoshifumi Kawai

まとめで書きましたが、C#+AWSは現実解、だと思ってます。そしてAWSだからといって特別なこともなく、そしてC#だからといって特別なこともない。Kinesisもちゃんと使えるわけだし、結構面白いことがまだまだ出来るんじゃないかな、って思ってます。なんでAzure使わないんですか?というのには、よく聞かれるのでお茶を濁して答えないとして(!)、AzureにもKinesisのようなAzure Event Hubsというものが先週プレビューリリースされました。C#からの活用という点では、こちらにも注目していきたいところです。Event Hubs Developer Guideなんか見ると普通に色々参考になるし、機能的にはHTTP以外にAMQP使えたり、ちょっと強そうではある。

非同期時代のLINQ

この記事はC# Advent Calendar 2013の4日目となります。2012年はMemcachedTranscoder - C#のMemcached用シリアライザライブラリというクソニッチな記事で誰得でした(しかもその後、私自身もMemcached使ってないし)。その前、2011年はModern C# Programming Style Guide、うーん、もう2年前ですかぁ、Modernじゃないですねえ。2011年の時点ではC# 5.0はCTPでしたが、もう2013年、当然のようにC# 5.0 async/awaitを使いまくる時代です。変化は非常に大きくプログラミングスタイルも大きく変わりますが、特にコレクションの、LINQの取り扱いに癖があります。今回は、非同期時代においてLINQをどう使いこなしていくかを見ていきましょう。

Selectは非同期時代のForEach

これ超大事。これさえ掴んでもらえれば十二分です。さて、まず単純に、Selectで値を取り出す場合。

// こんな同期版と非同期版のメソッドがあるとする
static string GetName(int id)
{
    return "HogeHoge:" + id;
}
 
static async Task<string> GetNameAsync(int id)
{
    await Task.Delay(TimeSpan.FromMilliseconds(100)); // 適当に待機
    return "HogeHoge:" + id;
}
 
// 以後idsと出てきたらこれのこと指してるとします
var ids = Enumerable.Range(1, 10);
 
// 同期バージョン
var names1 = ids.Select(x => new { Id = x, Name = GetName(x) }).ToArray();
 
// 非同期バージョン
var names2 = await Task.WhenAll(ids.Select(async x => new { Id = x, Name = await GetNameAsync(x) }));

ラムダ内でasyncを書き、結果はIEnumerable<Task<T>>となるので、配列に戻してやるためにTask.WhenAllとセットで使っていくのが基本となります。Task.WhenAllで包むのはあまりにも頻出なので、以下の様な拡張メソッドを定義するといいでしょう。

// こういう拡張メソッドを定義しておけば
public static class TaskEnumerableExtensions
{
    public static Task WhenAll(this IEnumerable<Task> tasks)
    {
        return Task.WhenAll(tasks);
    }
 
    public static Task<T[]> WhenAll<T>(this IEnumerable<Task<T>> tasks)
    {
        return Task.WhenAll(tasks);
    }
}
 
// スッキリ書ける
var names2 = await ids.Select(async x => new { Id = x, Name = await GetNameAsync(x) }).WhenAll();

では、foreachは?

// 同期
foreach (var id in ids)
{
    Console.WriteLine(GetName(id));
}
 
// 非同期
foreach (var id in ids)
{
    Console.WriteLine(await GetNameAsync(id));
}

そりゃそーだ。……。おっと、しかしせっかく非同期なのに毎回待機してループしてたらMottaiなくない?GetNameAsyncは一回100ミリ秒かかっているから、100*10で1秒もかかってしまうんだ!ではどうするか、そこでSelectです。

// 同期(idsがList<int>だとする)
ids.ForEach(id =>
{
    Console.WriteLine(GetName(id));
});
 
// 非同期
await ids.Select(async id =>
{
    Console.WriteLine(await GetNameAsync(id));
})
.WhenAll();

ForEachの位置にSelect。ラムダ式中では戻り値を返していませんが、asyncなので、Taskを返していることになります(Task<T>ではなく)。同期ではvoidとなりLINQで扱えませんが、非同期におけるvoidのTaskは、Selectを通ります。あとはWhenAllで待機してやれば出来上がり。これは全て同時に走るので100msで完了します。10倍の高速化!

ただし、この場合処理順序は保証されません、同時に走っているので。例えばとある時はこうなりました。

HogeHoge:1
HogeHoge:10
HogeHoge:8
HogeHoge:7
HogeHoge:4
HogeHoge:2
HogeHoge:6
HogeHoge:3
HogeHoge:9
HogeHoge:5

処理順序を保証したいなら?WhenAll後に処理ループを回せばいいぢゃない。

// こうすれば全て並列でデータを取得したあと、取得順のままループを回せる
var data = await ids.Select(async id => new { Id = id, Name = await GetNameAsync(id) }).WhenAll();
foreach (var item in data)
{
    Console.WriteLine(item.Name);
}

一旦、一気に詰めた(100ms)後に、再度回す(0ms)。これはアリです。そんなわけで、非同期時代のデータの処理方法は三択です。逐次await, ForEach代わりのSelect, 一気に配列に詰める。どれがイイということはないです、場合によって選べばいいでしょう。

ただ言えるのは、超大事なのは、Selectがキーであるということ、ForEachのような役割を担うこと。しっかり覚えてください。

非同期とLINQ、そしてプリロードについて

さて、SelectだけではただのForEachでLINQじゃない。LINQといったらWhereしてGroupByして、ほげ、もげ……。そんなわけでWhereしてみましょう?

// 非同期の ラムダ式 をデリゲート型 'System.Func<int,int,bool>' に変換できません。
// 非同期の ラムダ式 は void、Task、または Task<T> を返しますが、
// いずれも 'System.Func<int,int,bool>' に変換することができません。
ids.Where(async x =>
{
    var name = await GetNameAsync(x);
    return name.StartsWith("Hoge");
});

おお、コンパイルエラー!無慈悲なんでなんで?というのも、asyncを使うと何をどうやってもTask<bool>しか返せなくて、つまりFunc<T,Task<bool>>となってしまい、Whereの求めるFunc<T,bool>に合致させることは、できま、せん。

Whereだけじゃありません。ラムダ式を求めるものは、みんな詰みます。また、Selectで一度Task<T>が流れると、以降のパイプラインは全てasyncが強いられ、結果として……

// asyncでSelect後はTask<T>になるので以降ラムダ式は全てasyncが強いられる
// これはコンパイル通ってしまいますがkeySelectorにTaskを渡していることになるので
// 実行時エラーで死にます
ids.Select(async id => new { Id = id, Name = await GetNameAsync(id) })
   .OrderBy(async x => (await x).Id)
   .ToArray();

Selectがパイプラインにならず、むしろ出口(ForEach)になっている。自由はない。

ではどうするか。ここは、一度、配列に詰めましょう。

// とある非同期メソッドのあるClassがあるとして
var models = Enumerable.Range(1, 10).Select(x => new ToaruClass());
 
// 以降の処理で使う非同期系のメソッドなり何かを、全てawaitで実体化して匿名型に詰める
var preload = await models
    .Select(async model => new
    {
        model,
        a = await model.GetAsyncA(),
        b = await model.GetAsyncB(),
        c = await model.GetAsyncC()
    })
    .WhenAll();
 
// そうして読み取ったもので処理して、(必要なら)最後に戻す
preload.Where(x => x.a == 100 && x.b == 20).Select(x => x.model);

概念的にはプリロード。というのが近いと思います。最初に非同期なデータを全て取得しまえば、扱えるし、ちゃんと並列でデータ取ってこれる。LINQの美徳である無限リストが取り扱えるような遅延実行の性質は消えてしまいますが、それはshoganai。それに、LINQにも完全な遅延実行と、非ストリーミングな遅延実行の二種類があります。非ストリーミングとは、例えばOrderBy。これは並び替えのために、実行された瞬間に全要素を一度蓄えます。例えばGroupBy。これもグルーピングのために、実行された瞬間に全要素を舐めます。非同期LINQもまた、それらと同種だと思えば、少しは納得いきませんか?現実的な妥協としては、このラインはアリだと私は思っています。分かりやすいしパフォーマンスもいい。

AsyncEnumerableの幻想、或いはRxとの邂逅

それでも妥協したくないならば、次へ行きましょう。まだ手はあります、良いかどうかは別としてね。注:ここから先は上級トピックなので適当に読み飛ばしていいです

そう、例えばWhereAsyncのようにして、Func<T,bool>じゃなくFunc<T,Task<bool>>を受け入れてくれるオーバーロードがあれば、いいんじゃない?って思ってみたり。こんな風な?

public static class AsyncEnumerable
{
    // エラー:asyncとyield returnは併用できないよ
    public static async IEnumerable<T> WhereAsync<T>(this IEnumerable<T> source, Func<T, Task<bool>> predicate)
    {
        using (var e = source.GetEnumerator())
        {
            while (e.MoveNext())
            {
                if (await predicate(e.Current))
                {
                    yield return e.Current;
                }
            }
        }
    }
}

ただ、問題の本質はそんなことじゃあない。別にyield returnが使えなければ手書きで作ればいいわけで。そして作ってみれば、本質的な問題がどこにあるのか気づくことができます。

class WhereAsyncEnumerable<T> : IEnumerable<T>, IEnumerator<T>
{
    IEnumerable<T> source;
    Func<T, Task<bool>> predicate;
    T current = default(T);
    IEnumerator<T> enumerator;
 
    public WhereAsyncEnumerable(IEnumerable<T> source, Func<T, Task<bool>> predicate)
    {
        this.source = source;
        this.predicate = predicate;
    }
 
    public IEnumerator<T> GetEnumerator()
    {
        return this;
    }
 
    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
 
    public T Current
    {
        get { return current; }
    }
 
    object System.Collections.IEnumerator.Current
    {
        get { return Current; }
    }
 
    public void Reset()
    {
        throw new NotSupportedException();
    }
 
    public void Dispose()
    {
 
    }
 
    // ↑まではdoudemoii
    // MoveNextが本題
 
    public bool MoveNext()
    {
        if (enumerator == null) enumerator = source.GetEnumerator();
 
        while (enumerator.MoveNext())
        {
            // MoveNextはasyncじゃないのでawaitできないからコンパイルエラー
            if (await predicate(enumerator.Current))
            {
                current = enumerator.Current;
                return true;
            }
        }
        return false;
    }
}

MoveNextだけ見てもらえればいいのですが、predicateを使うのはMoveNextなわけです。ここがasyncじゃないと、AsyncなLINQは成立しません。さて、もしMoveNextがasyncだと?

public async Task<bool> MoveNext()
{
    // ここで取得するenumeratorのMoveNextも
    // 全て同一のインターフェイスであることが前提条件なのでTask<bool>とする
    if (enumerator == null) enumerator = source.GetEnumerator();
 
    while (await enumerator.MoveNext())
    {
        if (await predicate(enumerator.Current))
        {
            current = enumerator.Current;
            return true;
        }
    }
    return false;
}

これは機能します。MoveNextをasyncにするということは連鎖的に全てのMoveNextがasync。それが上から下まで統一されれば、このLINQは機能します。ただ、それってつまり、IEnumerator<T>を捨てるということ。MoveNextがasyncなのは、似て非なるものにすぎない。当然LINQっぽい何かもまた、全て、このasyncなMoveNextを前提にしたものが別途用意されなければならない。そして、それが、Ix-Async

Ix-Asyncのインターフェイスは、上で出したasyncなMoveNextを持ちます。

public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetEnumerator();
}
 
public interface IAsyncEnumerator<out T> : IDisposable
{
    T Current { get; }
    Task<bool> MoveNext(CancellationToken cancellationToken);
}

そして当然、各演算子はIAsyncEnumerableを求めます。

public static IAsyncEnumerable<TSource> Where<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate);

これの何が便利?IEnumerable<T>からIAsyncEnumerable<T>へはToAsyncEnumerableで変換できはするけれど……、求めているのはIEnumerable<Task<T>>の取り扱いであったりpredicateにTaskを投げ込めたりすることであり、何だかどうにもなく、これじゃない感が否めない。

そもそも、LINQ to Objectsから完全に逸脱した新しいものなら、既にあるじゃない?非同期をLINQで扱うなら、Reactive Extensionsが。

Reactive Extensionsと非同期LINQ

ではRxで扱ってみましょう。の前に、まず、predicateにTaskは投げ込めません。なのでその前処理でロードするのは変わりません。ただ、そのまま続けてLINQ的に処理可能なのが違うところです。

await ids.ToObservable()
    .SelectMany(async x => new
    {
        Id = x,
        Name = await GetNameAsync(x)
    })
    .Where(x => x.Name.StartsWith("Hoge"))
    .ForEachAsync(x =>
    {
        Console.WriteLine(x);
    });

おお、LINQだ?勿論、Where以外にも何でもアリです。RxならLINQ to Objects以上の山のようなメソッドを繋げまわることが可能です。ところで、ここで出てきているのはSelectMany。LINQ to ObjectsでのSelectの役割を、Rxの場合はSelectManyが担っています。asyncにおいてForEachはSelectでRxでSelectはSelectMany……。混乱してきました?

なお、これの結果は順不同です。もしシーケンスの順序どおりにしたい場合はSelect + Concatを代わりに使います。

await ids.ToObservable()
    .Select(async x => new
    {
        Id = x,
        Name = await GetNameAsync(x)
    })
    .Concat()
    .Where(x => x.Name.StartsWith("Hoge"))
    .ForEachAsync(x =>
    {
        Console.WriteLine(x);
    });

ソーナンダー?ちなみにSelectManyはSelect + Mergeに等しい。

await ids.ToObservable()
    .Select(async x => new
    {
        Id = x,
        Name = await GetNameAsync(x)
    })
    .Merge()
    .Where(x => x.Name.StartsWith("Hoge"))
    .ForEachAsync(x =>
    {
        Console.WriteLine(x);
    });

この辺のことがしっくりくればRxマスター。つまり、やっぱRxムズカシイデスネ。とはいえ、見たとおり、Rx(2.0)からは、asyncとかなり統合されて、シームレスに取り扱うことが可能になっています。対立じゃなくて協調。自然に共存できます。ただし、単品でもわけわからないものが合わさって更なるカオス!強烈強力!

まとめ

後半のAsyncEnumerableだのIx-AsyncだのRxだのは、割とdoudemoii話です、覚えなくていいです。特にIx-Asyncはただの思考実験なだけで実用性ゼロなので本気でdoudemoiiです。Rxは便利なので覚えてくれてもいいのですが……。

大事なのは、async + Selectです。SelectはForEachなんだー、というのがティンとくれば、勝ったも同然。そして、プリロード的な使い方。そこさえ覚えれば非同期でシーケンス処理も大丈夫。

asyncって新しいので、今まで出来たことが意外と出来なくてはまったりします。でも、それも、どういう障壁があって、どう対処すればいいのか分かっていればなんてことはない話です。乗り越えた先には、間違いなく素晴らしい未来が待っているので、是非C# 5.0の非同期、使いこなしてください。

RxとRedisを用いたリモートPub/Sub通信

今日から始まったBuild Insiderで、RedisとBookSleeveの記事を書きました - C#のRedisライブラリ「BookSleeve」の利用法。Redis、面白いし、Windowsで試すのも想像以上に簡単なので、是非是非試してみて欲しいです。そして何よりもBookSleeve!使うと、強制的に全てがasyncになるので、C# 5.0でのコーディングの仕方のトレーニングになる(笑)。にちじょー的にasync/awaitを使い倒すと、そこから色々アイディアが沸き上がっきます。ただたんにsyncがasyncになった、などというだけじゃなく、アプリケーションの造りが変わります。そういう意味では、Taskは結構過小評価されてるのかもしれないな、なんて最近は思っています。

さて、RedisにはPub/Sub機能がついているわけですが、Pub/Sub→オブザーバーパターン→Rx!これはティンと来た!と、いうわけで、これ、Rxに乗せられました、とても自然に。というわけで、□□を○○と見做すシリーズ、なCloudStructuresにRx対応を載せました。

追加したクラスはRedisSubject<T>。これはSubjectやAsyncSubjectなどと同じく、ISubject<T>になっています。IObservableであり、IObserverでもある代物。とりあえず、コード例を見てください。

var settings = new RedisSettings("127.0.0.1");
 
var subject = new RedisSubject<string>(settings, "PubSubTest");
 
// SubscribeはIObservable<T>なのでRxなLINQで書ける
var a = subject
    .Select(x => DateTime.Now.Ticks + " " + x)
    .Subscribe(x => Console.WriteLine(x));
 
var b = subject
    .Where(x => !x.StartsWith("A"))
    .Subscribe(x => Console.WriteLine(x), () => Console.WriteLine("completed!"));
 
// IObserverなのでOnNext/OnError/OnCompletedでメッセージ配信
subject.OnNext("ABCDEFGHIJKLM");
subject.OnNext("あいうえお");
subject.OnNext("なにぬねの");
 
Thread.Sleep(200); // 結果表示を待つ...
 
a.Dispose(); // UnsubscribeはDisposeで
subject.OnCompleted(); // OnCompletedを受信したSubscriberもUnsubscribeされる

はい、別になんてこともない極々フツーのRxのSubjectです。が、しかし、これはネットワークを通って、Redisを通して、全てのSubscriberへとメッセージを配信しています。おお~。いやまあ、コードの見た目からじゃあそういうの分からないので何も感動するところもないのですが、とにかく、本当に極々自然に普通に、しかし、ネットワークを超えます。この、見た目何も変わらずに、というところがいいところなわけです。

Subscribeする側はIObservableなので別にフツーに合成していけますし、Publishする側もIObserverなので、Subscribeにしれっと潜り込ませたりしても構わない。もう、全然、普通にインメモリなRxでやる時と同じことが、できます。

オワリ。地味だ。

う、うーん。ほ、ほらほら!!

// ネットワーク経由
var settings = new RedisSettings("xx.xxx.xxx.xxx");
 
var subject = new RedisSubject<DateTime>(settings, "PubSubTest");
 
// publisherはこちらのコード
while (true)
{
    Console.ReadLine();
    var now = DateTime.Now;
    Console.WriteLine(now.Ticks);
    subject.OnNext(now);
}
 
// subscriberはこちらのコードを動かす
subject.Subscribe(x =>
{
    var now = DateTime.Now;
    Console.Write(x.Ticks + " => " + now.Ticks);
    Console.WriteLine(" | " + (now - x));
});

というわけで、実際にネットワーク経由(AWS上に立てたRedisサーバーを通してる)で動かしてみた結果がこんな感じです。ネットワークを超えたことで用法は幾らでもある!夢膨らみまくり!

で、↑のコードは遅延時間のチェックを兼ねてるのですが、概ね、0.03秒ぐらい。たまにひっかかって0.5秒超えてるのがあって、ぐぬぬですが。実際のとこRedis/Linuxの設定で結構変わってくるところがあるので、その辺は煮詰めてくださいといったところでしょうか。

ともあれチャットとかなら全然問題なし、ゲームでもアクションとかタイミングにシビアじゃないものなら余裕ですね、ボードゲームぐらいなら全く問題ない。ちょっとしたMMOぐらいならいけるかも。これからはネットワーク対戦はRedisで、Rxで!!!

余談

CloudStructuresですが、.configからの設定読み込み機能も地味につけました。

<configSections>
    <section name="cloudStructures" type="CloudStructures.Redis.CloudStructuresConfigurationSection, CloudStructures" />
</configSections>
 
<cloudStructures>
    <redis>
        <group name="cache">
            <add host="127.0.0.1" />
            <add host="127.0.0.2" port="1000" />
        </group>
        <group name="session">
            <add host="127.0.0.1" db="2" valueConverter="CloudStructures.Redis.ProtoBufRedisValueConverter, CloudStructures" />
        </group>
    </redis>
</cloudStructures>
// これで設定を読み込める
var groups = CloudStructuresConfigurationSection.GetSection().ToRedisGroups();

色々使いやすくなってきて良い感じじゃあないでしょーか。

コールバック撲滅

そうそう、最後に実装の話を。元々BookSleeveのPub/Sub購読はコールバック形式です。「public Task Subscribe(string key, Action<string, byte[]> handler)」handlerはstringがキー, byte[]が送られてくるオブジェクトを指します。コールバック is ダサい。コールバック is 扱いにくい。ので、コールバックを見かけたらObservableかTaskに変換することを考えましょう!それがC# 5.0世代の常識です!

というわけで、以下のようにして変換しました。

public IDisposable Subscribe(IObserver<T> observer)
{
    var channel = Connection.GetOpenSubscriberChannel();
 
    var disposable = System.Reactive.Disposables.Disposable.Create(() =>
    {
        channel.Unsubscribe(Key).Wait();
    });
 
    // ここが元からあるコールバック
    channel.Subscribe(Key, (_, xs) =>
    {
        using (var ms = new MemoryStream(xs))
        {
            var value = RemotableNotification<T>.ReadFrom(ms, valueConverter);
            value.Accept(observer); // この中でobserverのOnNext/OnError/OnCompletedが叩かれる
            if (value.Kind == NotificationKind.OnError || value.Kind == NotificationKind.OnCompleted)
            {
                disposable.Dispose(); // ErrorかCompletedでもUnsubscribeしますん
            }
        }
    }).Wait();
 
    return disposable; // もしDisposableが呼ばれたらUnsubscribeしますん
}

こんなふぅーにRxで包むことで、相当使いやすさがアップします。感動した。

複数の値とC# 5.0 Async再び、或いはAsyncEnumerableへの渇望とRx

以前にReactive Extensions + asyncによるC#5.0の非同期処理では、単体の値であったらasync、複数の値であったらIObservable<T>が使い分け、とかかんとかと言ってましたが、本当にそうなの?もしくは、そもそも複数の値のシチュエーションって分かるような分からないようななのだけど?などなどと思ったりする昨今を如何様にお過ごしでしょうか。というわけで、今回はグッとこの部分に深く迫ってみましょう。

同期的なシチュエーション

さて、例、なのですけれど、データベースで行きましょう。生DataReaderを転がしてます。

// 接続文字列に Asynchronous Processing=true は非同期でやるなら欠かさずに
const string ConnectionString = @"Data Source=.;Initial Catalog=AdventureWorks2012;Integrated Security=True;Asynchronous Processing=true;";
 
// こういうreader.Readが尽きるまで列挙するだけのヘルパーがあるだけで
static IEnumerable<IDataRecord> EachReader(IDbConnection connection, string query)
{
    using (var command = connection.CreateCommand())
    {
        command.CommandText = query;
        if (connection.State != ConnectionState.Open) connection.Open();
 
        using (var reader = command.ExecuteReader())
        {
            while (!reader.IsClosed && reader.Read()) yield return reader;
        }
    }
}
 
static void Main(string[] args)
{
    using (var conn = new SqlConnection(ConnectionString))
    {
        // LINQでSelectとかいろいろ出来る!便利!抱いて!
        var result = EachReader(conn, "select * from Sales.Customer")
            .Select(x => new
            {
                CustomerID = x.GetInt32(0),
                PersonID = !x.IsDBNull(1) ? (int?)x.GetValue(1) : null,
                StoreID = !x.IsDBNull(2) ? (int?)x.GetValue(2) : null,
                TerritoryID = !x.IsDBNull(3) ? (int?)x.GetValue(3) : null,
                AccountNumber = !x.IsDBNull(4) ? x.GetString(4) : null
            })
            .ToArray();
 
        // PKでとりたければFirstOrDefualtとかでいいわけです
        var customer = EachReader(conn, "select * from Sales.Customer where CustomerID = 100")
            .Select(x => new
            {
                CustomerID = x.GetInt32(0),
                PersonID = !x.IsDBNull(1) ? (int?)x.GetValue(1) : null,
                StoreID = !x.IsDBNull(2) ? (int?)x.GetValue(2) : null,
                TerritoryID = !x.IsDBNull(3) ? (int?)x.GetValue(3) : null,
                AccountNumber = !x.IsDBNull(4) ? x.GetString(4) : null
            })
            .FirstOrDefault();
    }
}

ExecuteReaderの結果のIDataRederは、yield returnで列挙してやると、LINQで加工できるようになるので、SelectしてToArrayとか、SelectしてFirstOrDefaultとか、非常にやりやすくて便利なわけです。

ここまでは、いいと思います。じゃあ、非同期でやると、どうするの、と。

内部イテレータ的に考える

.NET Framework 4.5からは主要な非同期メソッドに全てXxxAsyncという名前のものがつきました。ADO.NETにおいては、OpenAsyncやExecuteReaderAyncなどがあります。というわけで、試してみましょう。

// 非同期だったこうしたい、でもこれはコンパイル通らない!
static async Task<IEnumerable<DbDataReader>> EachReaderAsync(DbConnection connection, string query)
{
    using (var command = connection.CreateCommand())
    {
        command.CommandText = query;
        if (connection.State != ConnectionState.Open) await connection.OpenAsync();
 
        using (var reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess)) // 基本的にはSequentialAccessにしておきたい
        {
            while (!reader.IsClosed && reader.Read()) yield return reader;
        }
    }
}

残念ながら、asyncとyield returnを共存させることはできないので、どうにもなりません。……はい。しかし出来ない、では困る。さすがに、非同期実行するところに全部生のままで、こんなCreateCommand…, ExecuteReaderAsync, …. なんて書いてられないし。

じゃあどうするか、というと、内部イテレータ的にしましょう。つまりList<T>にあるようなForEachです。yield returnが外部イテレータ的であり、それが無理なら、内部イテレータ的にすればいいぢゃない。

// ループを回してる最中に実行するaction引数をつけた(FuncじゃなくてAction<DbDataReader>のオーバーロードも作るとベター)
static async Task ForEachAsync(DbConnection connection, string query, Func<DbDataReader, Task> action)
{
    using (var command = connection.CreateCommand())
    {
        command.CommandText = query;
        if (connection.State != ConnectionState.Open) await connection.OpenAsync();
 
        using (var reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess)) // 基本的にはSequentialAccessにしておきたい
        {
            while (!reader.IsClosed && reader.Read()) await action(reader);
        }
    }
}
 
static async Task<List<Customer>> GetCustomers()
{
    using (var conn = new SqlConnection(ConnectionString))
    {
        // これで、Selectっぽくできてないこともないと言えなくもない
        var list = new List<Customer>();
 
        await ForEachAsync(conn, "select * from Sales.Customer", async x =>
        {
            var customer = new Customer
            {
                CustomerID = await x.GetFieldValueAsync<int>(0),
                PersonID = !x.IsDBNull(1) ? await x.GetFieldValueAsync<int?>(1) : null,
                StoreID = !x.IsDBNull(2) ? await x.GetFieldValueAsync<int?>(2) : null,
                TerritoryID = !x.IsDBNull(3) ? await x.GetFieldValueAsync<int?>(3) : null,
                AccountNumber = !x.IsDBNull(4) ? await x.GetFieldValueAsync<string>(4) : null
            };
            list.Add(customer);
        });
 
        return list;
    }
}
 
// LINQじゃないので匿名型は使えないのね
public class Customer
{
    public int CustomerID { get; set; }
    public int? PersonID { get; set; }
    public int? StoreID { get; set; }
    public int? TerritoryID { get; set; }
    public string AccountNumber { get; set; }
}

ForEachAsync!というわけで、Actionを渡してやって、そこでグルグルッとすることにより制限を回避もどき。ToArrayしたい?古き良きListにAddすればいいぢゃない、といったものですよ、ははは。LINQじゃないので匿名型は使えないがね!

さて、匿名型が使えないのはいいとしても、問題は、LINQの特徴である合成可能性を欠いているところです。一番困るのは、これ、FirstOrDefaultできないね、って。全件取るんですか?まあ、PKだったら一件なのが保証されてるし、そうでないならtop 1とでも書いておけよ、と言えなくもないですが、しかしどうなのよこれ、と。

そんなわけでForEachAsyncと、もう一つ、ExecuteSingleAsyncという名前で、一件のみを列挙するようなものを別途作る必要があります。とはいえ、それでも対応できているのは一件と全件だけ。例えばTakeWhileみたいなのがやりたい、SkipWhileみたいなのがやりたいとなったらどうするの、と。答えは、どうにもなりません。諦めるしかない。

AsyncEnumerableで救われる

どうしても諦められないのならば、AsyncEnumerableを授けましょう。NuGetからIx_Experimental-Asyncを引っ張ってきます。Ix、そう、みんな大好きReactive Extensionsの兄弟なわけですが、しかし紹介しておいてアレですが、このIx_Experimental-Asyncはお薦めはしません!完全に実験的に、できるから、というだけで実装例を掲示してみせてくれたというだけなノリがぷんぷんしているからです。実際、最初のAsync CTPが出た時に公開されて、それから更新されてませんしね……。

ともあれ、どんなコンセプトの代物なのかは見ておきましょう。

// こういうDB列挙用のIAsyncEnumerable/Enumeratorを作る。
// yield returnのようなコンパイラサポートはないので手書きするんだよ!
public class AsyncDbEnumerable : IAsyncEnumerable<DbDataReader>, IAsyncEnumerator<DbDataReader>
{
    DbConnection connection;
    DbCommand command;
    DbDataReader reader;
    string query;
 
    public AsyncDbEnumerable(DbConnection connection, string query)
    {
        this.connection = connection;
        this.query = query;
    }
 
    public IAsyncEnumerator<DbDataReader> GetEnumerator()
    {
        return this;
    }
 
    public DbDataReader Current
    {
        get { return reader; }
    }
 
    public async Task<bool> MoveNext(System.Threading.CancellationToken cancellationToken)
    {
        if (command == null)
        {
            if (connection.State != ConnectionState.Open) await connection.OpenAsync();
 
            command = connection.CreateCommand();
            command.CommandText = query;
            reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess);
        }
 
        if (await reader.ReadAsync())
        {
            return true;
        }
        return false;
    }
 
    public void Dispose()
    {
        reader.Dispose();
        command.Dispose();
    }
}
 
static IAsyncEnumerable<DbDataReader> EachReaderAsync(SqlConnection connection, string query)
{
    return new AsyncDbEnumerable(connection, query);
}
 
static async Task Test()
{
    using (var conn = new SqlConnection(ConnectionString))
    {
        // 完全にLINQなのでSelectしてToArrayで匿名型もOK
        var result = await EachReaderAsync(new SqlConnection(ConnectionString), "select * from Sales.Customer")
            .Select(x => new
            {
                CustomerID = x.GetFieldValue<int>(0),
                PersonID = !x.IsDBNull(1) ? x.GetFieldValue<int?>(1) : null,
                StoreID = !x.IsDBNull(2) ? x.GetFieldValue<int?>(2) : null,
                TerritoryID = !x.IsDBNull(3) ? x.GetFieldValue<int?>(3) : null,
                AccountNumber = !x.IsDBNull(4) ? x.GetFieldValue<string>(4) : null
            })
            .ToArray();
 
        // 勿論FirstOrDefaultもできる
        var customer = await EachReaderAsync(new SqlConnection(ConnectionString), "select * from Sales.Customer where CustomerID = 100")
            .Select(x => new
            {
                CustomerID = x.GetFieldValue<int>(0),
                PersonID = !x.IsDBNull(1) ? x.GetFieldValue<int?>(1) : null,
                StoreID = !x.IsDBNull(2) ? x.GetFieldValue<int?>(2) : null,
                TerritoryID = !x.IsDBNull(3) ? x.GetFieldValue<int?>(3) : null,
                AccountNumber = !x.IsDBNull(4) ? x.GetFieldValue<string>(4) : null
            })
            .FirstOrDefault();
    }
}

IAsyncEnumerableの実装は完全に手作りです!真面目に使うなら、AnonymousAsyncEnumerableとかを作って、それを使って構築しますが、今回はてきとーな感じに実装しておきました。てきとーと言っても、ちゃんと動きますよ、はい。

さて、これによってIAsyncEnumerableに変換されたシロモノは、LINQのメソッドが全て使えます。おー、やったね、これなら完璧。

と、言いたいのですが、よーくSelectの中のラムダ式を見ると、ForEachAsyncの時のものと違うのが分かるでしょうか?ForEachAsyncの時はGetFieldValueAsyncといった、値取得まで非同期のものを使いました。でも、今回はそれは使ってない。何故かというと、そこでasyncにしてしまうと戻り値がIAsyncEnumerable<Task<T>>になってしまうから。

何がいけないのか、というと、例えばToArrayした結果で見るとresult[0]はTaskなんですよ。実態はresult[0].Resultとしなきゃあいけません。おまけに、全部Taskということは、実行中かもしれないわけです。じゃあ全部待てばいいのか、 await Task.WhenAll(result) とすればいいか、というと、そうなると、一つのコネクションで複数実行が走る、この場合Connectionは複数実行は許容されていないので、まあ例外が飛んできてしまうでしょう。

じゃあasyncは諦めるの?というと、幾つか手はある。ひとつはAwaitとかいうような拡張メソッドを作って、IAsyncEnumerable<Task<T>> から IAsyncEnumerable<T> に戻すようなものを作ればいい。作るのは、まあ、難しくはないのだけど結局yield returnがないので完全手書きしなきゃならないので面倒なので例は割愛。

もしくはSelectなどのselectorに、ラムダ式がasyncで書かれている場合(戻り値がTaskになっている場合)はawaitするようなオーバーロードがあっても良かったと思うのですがねえ。んー、まあ、それはさすがに大きなお世話っぽいからダメか……。

もしくは、ToArrayなどで外に出すのではなく、ForEachAsyncというメソッドが用意されているので、それを使ってawaitするか。こうなると内部イテレータとやってること同じになってきますが。

とはいえともかく、例に出したAwaitメソッドのような、そういうのがないと、実用性に欠けると言わざるを得ません。また、これはあくまでもAsyncEnumerableであって、IEnumerableへのLINQ to Objectsとは別物です。Select, Where, Firstなどは挙動が同じというだけで、中身は全然違います。で、当然ながらコンセプト実装なので、内部的にもあまりこなれてませんね。なので、Ix_Experimental-Asyncは所詮はコンセプト実装であり、面白いとは思いますし満喫はしましたが、実世的なものとは言いがたいと結論づけます。

IObservable<T>の中へ

と、ここまで見てきたので、最後はRxで〆ましょう。ちなみにRxはバージョンが幾つかありますが、私としてはRx 2.0-RCしか使う気はありません。正直、1.0系とは雲泥の差ですからねえ。

static IObservable<DbDataReader> EachReaderAsync(DbConnection connection, string query)
{
    // CreateAsyncで作る。OnErrorなどは手書きですが、十分に書きやすい
    return Observable.CreateAsync<DbDataReader>(async observer =>
    {
        try
        {
            using (var command = connection.CreateCommand())
            {
                command.CommandText = query;
                if (connection.State != ConnectionState.Open) await connection.OpenAsync();
 
                using (var reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
                {
                    while (!reader.IsClosed && reader.Read()) observer.OnNext(reader);
                }
            }
        }
        catch (Exception ex)
        {
            observer.OnError(ex);
            return;
        }
        observer.OnCompleted();
    });
}
 
static async Task Test()
{
    using (var conn = new SqlConnection(ConnectionString))
    {
        // Rxなので完全にLINQ、awaitableなのでToArrayをawaitしたりも出来るのでノリは完全に一緒
        var result = await EachReaderAsync(new SqlConnection(ConnectionString), "select * from Sales.Customer")
            .Select(x => new
            {
                CustomerID = x.GetFieldValue<int>(0),
                PersonID = !x.IsDBNull(1) ? x.GetFieldValue<int?>(1) : null,
                StoreID = !x.IsDBNull(2) ? x.GetFieldValue<int?>(2) : null,
                TerritoryID = !x.IsDBNull(3) ? x.GetFieldValue<int?>(3) : null,
                AccountNumber = !x.IsDBNull(4) ? x.GetFieldValue<string>(4) : null
            })
            .ToArray();
 
        // もちろん、そのままSubscribeしたり、FirstOrDefaultAsyncしたりしてもいい
    }
}

というわけでRxでも生成はしやすいし、出力結果も相当扱いやすい。素晴らしい!けれど、これもIAsyncEnumerableと同様にSelectの中でasyncしちゃうと厄介なことになるので、基本的にはできない。

Await拡張メソッドを作るのは簡単だけど、Rxの場合はスレッドセーフの問題が結構難しくて、うまく決めるのは難しい……。

それに加えて、ほとんど決まりきっている、ただたんに列挙したいだけ、のものにRxを使うというのはやり過ぎ感があり、性能面でちょっとね、と。うーん、言うほど悪くはないし、今まで散々持ち上げといてなんだよそれって感じですが、目の前にTaskが転がっていて、そこまで利点が大きくない中で選ぶか?と迫られたら、選びにくいなあ、って。思ってしまうのです。

おまけ:Entity Framework 6について

Entity Frameworkはロードマップによりバージョン6から非同期対応すると明言しています。また、デザインノートTask-based Asynchronous Pattern support in EF.も公開されていたり、それとEFはソースコードが公開されているのですが、CodePlexから最新版を落としてくれば、そこには既にTaskによる実装が存在しています。

軽く見たところ、EF内部で使う用のAsyncEnumerableを定義してありました。それを使って、非同期系を動かしてましたね。ただ、完全に内部用なので外からは使えないし、色々限定的ですけれど。また、最終的に出力する場合は、やはり内部イテレータ的に、await ForEachAsyncしてListに変換するなりしていました。ふんふん、なるほどねー、と、結構眺めてて面白いのでお薦めです。

まとめ

C# 5.0で非同期は簡単になった!そして、同様に非同期はやはり難しい!こうして案を見ていったわけですが、結局どれを選ぶの?というと、まあ、内部イテレータ案が一番無難で良いと思います。

それにしても、いやあもう頭ぱっつんぱっつんです。で、何でDBがネタになっているかというと、新Micro-ORMライブラリを作成中で(DbExecutor v.Next)、今から作るなら非同期対応しないとかありえないよねー、ということで色々考えてはいるんですが、これが中々にビシッ!としっくり決めるのが大変で。結構良い感じにはなってきてつもりではあるんですが、全然まだまだで。あんま人に見られたくない段階なんですが諸事情で見れる人には見れるようになってしまってて恥ずかちい。

そんなわけでVS2012登場まで、もうすぐです(MSDN会員は一週間切ってる、そうでない人も一ヶ月後)。C# 5.0への備え、できてますか?さあ、カオスな非同期時代に突入しましょう!

Reactive Extensions + asyncによるC#5.0の非同期処理

Reactive Extensions(Rx)の利点ってなんですかー、というと、合成可能なんです!ということです。合成可能って何?というと、LINQが使えるということなんです!です。じゃあ他には、ということで…… 詳しくはこの動画/スライド見るといいです。 Curing Your Event Processing Blues with Reactive Extensions (Rx) | TechEd Europe 2012 | Channel 9。最初のほうの例が非常に分かりやすいので、とりあえずその部分だけ引っ張ってきますと

// sender, argsの型がふわふわ
exchange.StockTick += (sender, args) => // senderの型が消えてる
{
    if (args.Quote.Symbol == "MSFT")
    {
        // 合成できないからイベントの中でベタ書きしかない
    }
};
 
exchange.StockTick -= /* ラムダ式でイベント登録すると解除不能 */

これが通常のイベントの弱点です。Rxにすると

// <Quote>という型が生きてる
IObservable<Quote> stockQuotes = ...; // 変数に渡せる
 
// LINQクエリ演算子が使える
var msft = stockQuotes
    .Where(quote => quote.Symbol == "MSFT");
 
var subscription = msft.Subscribe(quote => /* */);
 
// イベント解除が容易
subscription.Dispose();

といった感じで、実に素晴らしい!じゃあEventはもうObsoleteでいいぢゃん、というと、まあいいと思うのですがそれはそれとして、C#ネイティブだからこそデザイナがイベントが大前提で考慮されていたり、軽くて実行速度が良かったり、といったところは勿論あります。あとRxだとイベントのIObservable化が面倒だとかもね。この辺は最初から言語サポートの効いてるF#のほうが強いんですよねー。

非同期のリトライ

Visual Studio 2012も、もうRCということで間近感が相当にあります。一方でReactive Extensions v2.0 Release Candidate available now!ということで、こちらも間近感があります。一度2.0使うと1.0には戻れないよ!(NuGetではRx-Main -Preで入れられます)

じゃあRx 2.0の紹介でもしますかー、というと、しません!(ぉぃ)。その前に、asyncとRxの関係性にケリをつけておきましょう。

で、asyncの非同期とRxの非同期はやっぱり使い分けフワフワッという感じ。複数の値が来るときはRxでー、とか言われても、そもそも複数っていうのがそんなにー、とか。あと、それ以外にないの?というと、Rxの合成の強さが非同期にも発揮してRetry処理とか柔軟でー、とか。確かにそれっぽい。けれど、どうもフワッとしてピンと来ないかもしれない。

ので、例を出していきましょう。まず、リトライ処理。リトライ処理を素の非同期で書くと泣きたくなりますが、C# 5.0を使えばasync/awaitで何も悩むことなくスッキリと!

static async Task<string> DownloadStringAsyncWithRetry(string url, int retryCount)
{
    var count = 0;
RETRY:
    try
    {
        count++;
        var req = WebRequest.CreateHttp(url);
 
        using (var res = await req.GetResponseAsync())
        using (var stream = res.GetResponseStream())
        using (var sr = new StreamReader(stream))
        {
            return await sr.ReadToEndAsync();
        }
 
    }
    catch
    {
        if (count >= retryCount) throw;
    }
    goto RETRY;
}
 
static void Main(string[] args)
{
    var google = DownloadStringAsyncWithRetry("http://google.com/404", 3);
    Console.WriteLine(google.Result);
}

簡単です。さて、ではこれをRxで書くと……

static async Task<string> DownloadStringAsyncWithRetry(string url, int retryCount)
{
    var req = WebRequest.CreateHttp(url);
 
    // retry処理は.Retryで済む
    using (var res = await req.GetResponseAsync().ToObservable().Retry(retryCount))
    using (var stream = res.GetResponseStream())
    using (var sr = new StreamReader(stream))
    {
        return await sr.ReadToEndAsync();
    }
}

はい。別にRxとasyncは排他じゃありません。使って効果のあるところに差し込んで、Mixしてやれば、ただでさえ強力なasyncが更に強力になります。TaskとIObservableは変換可能なので、ToObservableして、あとはRetryメソッドを繋げるだけ。そしてIObservableはawait可能(LastAsyncと同じ効果で、最後の値を取る。非同期処理の場合は値が一つなので問題なし)なので、まんまawaitしてasyncと繋げてやればいい。素敵ですねー。

が、上のコードはちょっと間違ってます。どこが間違っているか分かりますか?

エラーの帰ってくるページ(google/404などは404エラーを返してくれるのでテストに楽←別に500にすれば500を返してくれるわけじゃなくて、ただたんに存在しないページだから404なだけで、別にどこでもいいです)を指定してFiddlerなどで観察すれば分かりますが、一回しかリクエスト飛ばしません。Retry(3)としても一回しか飛んでいません。ちゃんとRetryは3回しているのに。

どういうことかというと、GetResponseAsync()の時点でリクエストに失敗しているからです。失敗済みのリクエストに対しては、何回Retryしても失敗しか返しません。ここは本当にはまりやすくて注意所なので、よく気を付けてください!

解決策は、Retryで生成を毎回やり直すこと。Deferで包めばいいです。

static async Task<string> DownloadStringAsyncWithRetry(string url, int retryCount)
{
    // Retry時に毎回WebRequestを作り直す
    var asyncQuery = Observable.Defer(() => WebRequest.CreateHttp(url).GetResponseAsync().ToObservable())
        .Retry(retryCount);
 
    // retry処理は.Retryで済む
    using (var res = await asyncQuery)
    using (var stream = res.GetResponseStream())
    using (var sr = new StreamReader(stream))
    {
        return await sr.ReadToEndAsync();
    }
}

ちょっと罠があるしコードも増えてしまったけれど、それでも、まあ、まだ割といいかな、って感じでしょうか?

さて、リトライは即時じゃなくて一定間隔置いた後にリトライして欲しいってことが多いと思います。同期処理だとThread.Sleepで待っちゃうところですが、それはちょっとスレッド勿体ない。C# 5.0からはawait Task.Delayを使いましょう。

static async Task<string> DownloadStringAsyncWithRetry(string url, int retryCount, TimeSpan retryDelay)
{
    var count = 0;
RETRY:
    try
    {
        count++;
        var req = WebRequest.CreateHttp(url);
 
        using (var res = await req.GetResponseAsync())
        using (var stream = res.GetResponseStream())
        using (var sr = new StreamReader(stream))
        {
            return await sr.ReadToEndAsync();
        }
 
    }
    catch
    {
        if (count >= retryCount) throw;
    }
 
    if (retryDelay > TimeSpan.Zero)
    {
        await Task.Delay(retryDelay); // これで待つ
    }
 
    goto RETRY;
}

以前のものにTask.Delayを足しただけで簡単です。わーお、素晴らしい、なかなか強力強烈です。ではRxは、というと、同じように遅延する演算子を足すだけ。Delay、ではダメでDelaySubscription(Rx 2.0から追加)を使います。

static async Task<string> DownloadStringAsyncWithRetry(string url, int retryCount, TimeSpan retryDelay)
{
    // DelaySubscriptionで遅延させる
    var asyncQuery = Observable.Defer(() => WebRequest.CreateHttp(url).GetResponseAsync().ToObservable())
        .DelaySubscription(retryDelay)
        .Retry(retryCount);
 
    using (var res = await asyncQuery)
    using (var stream = res.GetResponseStream())
    using (var sr = new StreamReader(stream))
    {
        return await sr.ReadToEndAsync();
    }
}

できました!できました?いや、これだと初回リクエスト時にも遅延されちゃってて、ちょっとイケてない。修正しましょう。

// 外部変数用意するのがダサい
var count = 0;
var asyncQuery = Observable.Defer(() => WebRequest.CreateHttp(url).GetResponseAsync().ToObservable())
    .Let(xs => count++ == 0 ? xs : xs.DelaySubscription(retryDelay))
    .Retry(retryCount);

はい、ダサいし、なんだか何やってるのかさっぱりになってきました、サイテー。LetもRx 1.0にはなくて(それ以前にはあったのですが削られた)2.0から復活になります。Letは、一時変数を置かなくて済むというチェーン病にかかった人がお世話になる処方薬です。んなもん読みにくくさせるだけじゃねーか、という感じですが、もしLetがないと変数を置いて var xs = ToObservable(); xs = () ? xs : xs.Delay..; xs = xs.Retry(); としなければならなくて、非常に面倒くさいのです。だから、使いどころを守って乱用しなければ、割とイケてます。結構大事。

が、しかし、これも間違っています!(えー)。というかLetではなくて変数に展開してみるとオカシイとはっきり分かるのですが、Letの内部はRetryとか関係なく一回しか評価されないので、これだと必ずDelaySubscriptionなしのほうしか通りません。この路線で行くなら、更にやけくそでDeferを追加しましょうか。

// Deferだらけとかダサすぎるにも程がある
var count = 0;
var asyncQuery = Observable.Defer(() => WebRequest.CreateHttp(url).GetResponseAsync().ToObservable())
    .Let(xs => Observable.Defer(() => count++ == 0 ? xs : xs.DelaySubscription(retryDelay)))
    .Retry(retryCount);

ダサすぎて話にならない。Defer連打ダサい。Deferまみれになったら、ちょっと根本から方針を疑いましょうか。ついでに外部変数を使うというのがそもそもダサい。もう少し頑張りましょう!クエリ演算子をこねくり回して、と。

// DelayなしのDeferとDelayありのDeferを連結して、DelayありのみをRetryさせている
var asyncQuery = Observable.Defer(() => WebRequest.CreateHttp(url).GetResponseAsync().ToObservable())
    .Let(xs => xs.Catch(xs.DelaySubscription(retryDelay).Retry(retryCount - 1)));

どうでしょう。他にもやりようは色々とあるかもですが、正直ワケガワカラナイのでこの辺でよしておいたほうがマシです。実際のところ、以下のような拡張メソッドを作るのがベストだと思っています。

// 結局これが一番なのではかという結論に至る
public static async Task<string> DownloadStringAsyncWithRetry(this WebClient client, string url, int retryCount, TimeSpan retryDelay)
{
    var count = 0;
RETRY:
    try
    {
        count++;
        return await client.DownloadStringTaskAsync(url);
 
    }
    catch
    {
        if (count >= retryCount) throw;
    }
 
    if (retryDelay > TimeSpan.Zero)
    {
        await Task.Delay(retryDelay);
    }
 
    goto RETRY;
}

new WebClient().DownloadStringAsyncWithRetry(”hogehoge”); だけですからねー。拡張メソッド万歳。Rx最終形のような短さとか魔術っぽさはゼロで面白くも何ともない、というか正直クソつまらないコードなわけですが、そこがC#のC#たる所以ですな、ということで。私はRxのようなクールさも勿論大好きなのですが、こういうイモさもまた、C#らしさであって、現実をより良くするための、目的を忘れない素敵な側面だと思っています。

ちなみにWebRequestの場合はそれ自体の作り直しが必要なので(一度エラーを受けたら何度GetResponseを繰り返してもダメぽ)、拡張メソッドダメです。WebClientはイベントベースなのでTask系と相性がアレで今一つなわけですが、WebRequestはWebRequestで、これベースに拡張メソッドだけで整えるのは無理があるのですね……。

.NET Framework 4.5からはHttpClientというクラスが入るので、それを使うとちょっとだけモダンっぽい雰囲気。

// モダンなドトネト的にはHttpClientかしら
public static async Task<string> GetStringAsyncWithRetry(this HttpClient client, string url, int retryCount, TimeSpan retryDelay)
{
    var count = 0;
RETRY:
    try
    {
        count++;
        return await client.GetStringAsync(url);
    }
    catch
    {
        if (count >= retryCount) throw;
    }
 
    if (retryDelay > TimeSpan.Zero)
    {
        await Task.Delay(retryDelay);
    }
 
    goto RETRY;
}

別途System.Net.Httpの参照が必要なのが面倒ですが。

非同期のタイムアウト

Rxが色々できるのは分かったけれど、結局そういう部分って拡張メソッドとかに隔離してアプリケーションコードからは離れるので、やっぱそんなでもないんじゃないの!?というと、あー、まあそうかもねえ、とか思いつつ、複雑になればなるほど効果は加速しますが、そうなるとRxでも(見た目はスッキリしたとしても)やっぱ複雑ですからね。さておき、このままだとアレなのでもう少しまともな例を、一番最初に挙げたCuring Your Event Processing Blues with Reactive Extensions (Rx) | TechEd Europe 2012から引っ張って来ましょうか。

タイムアウトを追加する例です。WebRequestだとTimeout設定すればいいぢゃーん、ではあるものの、そうではないシチュエーションも沢山ありますから、対策を知っていて損はないです。まず、asyncの例を。

static async Task<string> GetHtmlAsync(Uri url)
{
    var client = new WebClient();
 
    var download = client.DownloadStringTaskAsync(url);
    var timeout = Task.Delay(TimeSpan.FromSeconds(30));
 
    // これ結構トリッキーですよね
    if (await Task.WhenAny(download, timeout) == timeout)
    {
        throw new TimeoutException();
    }
 
    var html = await download;
    return html;
}

WhenAnyが中々トリッキーですね。慣用句として覚えてしまえばどうってことないのですが……。asyncもただawaitするだけじゃなくて、ちょっと慣れてきたらTask.WaitAll/Any, Task.WhenAll/Anyを使いこなすと、性能的な意味でも表現力的な意味でもグッと広がりますので、探究するのがお薦め。

さて、それをRxでやると……

static async Task<string> GetHtmlAsync(Uri url)
{
    var client = new WebClient();
 
    var download = client.DownloadStringTaskAsync(url)
        .ToObservable()
        .Timeout(TimeSpan.FromSeconds(30));
 
    var html = await download;
    return html;
}

ToObservableして、Retryの時のようにTimeoutを足すだけ。非常に直観的で、楽ちん、分かりやすい。演算子が豊富なのはRxの強みです。だからRetryにTimeoutがつけられるオーバーロードが最初から用意されていれば、Letとかで複雑になってしまった例もスッキリ仕上がって、ドヤァと言えたんですけどね(笑)

// こういうのを作っておけば!
static IObservable<T> Retry<T>(this IObservable<T> source, int retryCount, TimeSpan retryDelay)
{
    return source.Catch(source.DelaySubscription(retryDelay).Retry(retryCount - 1));
}
 
// 神がかってシンプルに!Rx最強!
public static async Task<string> GetStringAsyncWithRetry(this HttpClient client, string url, int retryCount, TimeSpan retryDelay)
{
    return await Observable.Defer(() => client.GetStringAsync(url).ToObservable()).Retry(retryCount, retryDelay);
}

標準で足りない演算子は自分で作ればいいので、また、asyncが出来たことで、今まで自作が大変だった演算子も作るのが大分容易になりました!ので、ガンガン作ってしまうといいです。汎用的に使える演算子が集まれば集まるほど、Rxの合成可能という性質が価値を発揮します。

リトライやタイムアウトをC# 4.0でRxなしで書くと

死ぬほど面倒なので書きません。いや無理でしょ常識的に考えて。

まとめ

というわけで、Rxとasyncは手を取り合って仲良く高みを目指せばいいわけです。使いこなしが必要なのはどっちも変わらない!

さて、@ITの連載、Reactive Extensions(Rx)入門 - @IT の次回は非同期のはずですが(聞こえなーい)、ええと、はい、すみません……。ええと、あと次はRx 2.0の強化事項を、ええと、まあそのうちいつか……。はい、すみません。

諸事情あって今色々詰まってて本気でヤバいんですが、それはそれとして、現在全力で一年以上やるやる詐欺だったlinq.jsの改修を進めていまして、これは本当に本当に絶対近日中にベータを出すのでお楽しみに。相当にイイ出来で、割と革命的に凄い内容になってます。いやほんと。かなり自信ありますよ。

他の積みタスクは、ReactiveOAuth(バグ修正のPull Requestを放置中というサイテーな有様、OAuth 2.0対応しないの?とか)、ReactiveProperty(WinRT対応まだー?)、Utakotoha(現在動いてない模様なので要改修)、DbExecutor(全面再構築まだー?DataSet殺すんでしょー?)とかでしょうか、って結構ありますね、うわぉぅ。というかReactive系は2.0対応とWinRT対応をやらなきゃならないので作業量的に面倒くさくて、ついつい手が遠ざかってしまいですね。はい、でも、やります。

にゃー、という感じでブログも結構アレなWebFormsとDataSetディスもそろそろさようならして、通常営業に戻ってきませう。

RxとパフォーマンスとユニットテストとMoles再び

C# Advent Calendar 2011、順調に進んでいますね。どのエントリも力作で大変素晴らしいです。私はこないだModern C# Programming Style Guideというものを書きました。はてブ数は現段階で45、うーん、あまり振るわない……。私の力不足はともかくとしても、他の言語だったらもっと伸びてるだろうに、と思うと、日本のC#の現状はそんなものかなあ、はぁ、という感じではあります。はてブが全てではない(むしろ斜陽?)とはいえ、Twitterでの言及数などを見ても、やっぱまだまだまだまだまだまだ厳しいかなあ、といったところ。Unityなどもあって、見ている限りだと人口自体は着実に増えている感じではありますけれど、もっともっと、関心持ってくれる人が増えるといいな。私も微力ながら尽力したいところです。

ところで、id:ZOETROPEさんのAdvent Calendarの記事、Reactive Extensionsでセンサプログラミングが大変素晴らしい!センサー、というと私だとWindows Phone 7から引っ張ってくるぐらいしか浮かばないのですが(最近だとKinectもHotですか、私は全然触れてませんが……)おお、USB接続のレンジセンサ!完全に門外漢な私としては、そういうのもあるのか!といったぐらいなわけですが、こうしてコード見させていただくと、実践的に使うRxといった感じでとてもいいです。

記事中で扱われているトピックも幅広いわけですが、まず、パフォーマンスに関しては少し補足を。@okazukiさんの見せてもらおうじゃないかReactive Extensionsの性能とやらを! その2のコメント欄でもちょっと言及したのですが、この測り方の場合、Observable.Rangeに引っ張られているので、ベンチマークの値はちょっと不正確かな、と思います。

// 1000回イベントが発火(発火の度に長さ3000のbyte配列が得られる)を模写
static IObservable<byte[]> DummyEventsRaised()
{
    return Observable.Repeat(new byte[3000], 1000, Scheduler.Immediate);
}
 
// 配列をバラす処理にObservable.Rangeを用いた場合
static IObservable<byte> TestObservableRange()
{
    return Observable.Create<byte>(observer =>
    {
        return DummyEventsRaised()
            .Subscribe(xs =>
            {
                Observable.Range(0, xs.Length, Scheduler.Immediate).ForEach(x => observer.OnNext(xs[x]));
            });
    });
}
 
// 配列をバラす処理にEnumerable.Rangeを用いた場合(ForEachはIxのもの)
static IObservable<byte> TestEnumerableRange()
{
    return Observable.Create<byte>(observer =>
    {
        return DummyEventsRaised()
            .Subscribe(xs =>
            {
                Enumerable.Range(0, xs.Length).ForEach(x => observer.OnNext(xs[x]));
            });
    });
}
 
// SelectManyでバラす場合
static IObservable<byte> TestSelectMany()
{
    return DummyEventsRaised().SelectMany(xs => xs);
}
 
static void Main(string[] args)
{
    // ベンチマーク補助関数
    Action<Action, string> bench = (action, label) =>
    {
        var sw = Stopwatch.StartNew();
        action();
        Console.WriteLine("{0,-12}{1}", label, sw.Elapsed);
    };
 
    // 配列をばらすケースは再度連結する(ToList)
    bench(() => TestObservableRange().ToList().Subscribe(), "Ob.Range");
    bench(() => TestEnumerableRange().ToList().Subscribe(), "En.Range");
    bench(() => TestSelectMany().ToList().Subscribe(), "SelectMany");
    // 配列をばらして連結せず直接処理する場合
    bench(() => TestSelectMany().Subscribe(), "DirectRx");
    // byte[]をばらさず直接処理する場合
    bench(() => DummyEventsRaised().Subscribe(xs => { foreach (var x in xs);}), "DirectLoop");
 
    // 実行結果
    // Ob.Range    00:00:02.2619670
    // En.Range    00:00:00.2600460
    // SelectMany  00:00:00.2701137
    // DirectRx    00:00:00.0852836
    // DirectLoop  00:00:00.0152816
}

得られる配列をダイレクトに処理するとして、Observable.Rangeで配列のループを回すと論外なほど遅い。のですが、しかし、この場合ですとEnumerable.Rangeで十分なわけで、そうすれば速度は全然変わってきます(もっと言えば、ここではEnumerable.Rangeではなくforeachを使えば更に若干速くなります)。更に、これは配列を平坦化している処理とみなすことができるので、observerを直に触らず、SelectManyを使うこともできますね。そうすれば速度はほとんど変わらず、コードはよりすっきり仕上がります。

と、いうわけで、遅さの原因はObservable.Rangeです。Rangeが遅いということはRepeatやGenerateなども同様に遅いです。遅い理由は、値の一つ一つをISchedulerを通して流しているから。スケジューラ経由であることは大きな柔軟性をもたらしていますが、直にforeachするよりもずっとずっと遅くなる。なので、Enumerableで処理出来る局面ならば、Enumerableを使わなければなりません。これは、使うほうがいい、とかではなくて、圧倒的な速度差となるので、絶対に、Enumerableのほうを使いましょう。

また、一旦配列をバラして、再度連結というのは、無駄極まりなく、大きな速度差にも現れてきます。もし再度連結しないでそのまま利用(ベンチ結果:DirectRx)すれば直接ループを回す(ベンチ結果:DirectLoop)よりも5倍程度の遅さで済んでいます。このぐらいなら許容範囲と言えないでしょうか?とはいえ、それでも、遅さには違いないわけで、避けれるのならば避けたほうがよいでしょう。

ZOETROPEさんの記事にあるように、ここはばらさないほうが良い、というのが結論かなあ、と思います。正しくは上流ではばらさない。一旦バラしたものは復元不可能です。LINQで、パイプラインで処理を接続することが可能という性質を活かすのならば、なるべく後続で自由の効く形で流してあげたほうがいい。アプリケーション側でバラす必要があるなら、それこそSelectMany一発でばらせるのだから。

例えばWebRequestで配列状態のXMLを取ってくるとします。要素は20個あるとしましょう。最初の文字列状態だけを送られてもあまり意味はないので、XElement.Parseして、実際のクラスへのマッピングまではやります。例えばここではPersonにマッピングするとして、長さ1のIObservable<Person[]>です。しかし、それをSelectManyして長さ20のIObservable<Person>にはしないほうがいい。ここでバラしてしまうと長さという情報は消滅してしまうし、一回のリクエスト単位ではなくなるのも不都合が生じやすい。もしアプリケーション的にフラットになっていたほうが都合が良いのなら、それはまたそれで別のメソッドとして切り分けましょう。

成功と失敗の一本化

ZOETROPEさんの記事の素晴らしいのは、通常のルート(DataReceived)と失敗のルート(ErrorReceived)を混ぜあわせているところ!これもまたイベントの合成の一つの形なわけなんですねー。こういう事例はWebClientのDownloadStringAsyncのような、EAP(Eventbased Asynchronous Programming)をTaskCompletionSourceでラップしてTaskに変換する 方法: タスクに EAP パターンをラップする←なんかゴチャゴチャしていますが、TrySetCanceled, TrySetException, TrySetResultで結果を包んでいます、というのと似た話だと見なせます。

WebClientではEventArgsがCancelledやErrorといったステータスを持っているのでずっと単純ですが、SerialPortではエラーは別のイベントでやってくるのですね。というわけで、私もラップしてみました。

public static class SerialPortExtensions
{
    // 面倒くさいけれど単純なFromEventでのイベントのRx化
    public static IObservable<SerialDataReceivedEventArgs> DataReceivedAsObservable(this SerialPort serialPort)
    {
        return Observable.FromEvent<SerialDataReceivedEventHandler, SerialDataReceivedEventArgs>(
            h => (sender, e) => h(e), h => serialPort.DataReceived += h, h => serialPort.DataReceived -= h);
    }
 
    public static IObservable<SerialErrorReceivedEventArgs> ErrorReceivedAsObservable(this SerialPort serialPort)
    {
        return Observable.FromEvent<SerialErrorReceivedEventHandler, SerialErrorReceivedEventArgs>(
            h => (sender, e) => h(e), h => serialPort.ErrorReceived += h, h => serialPort.ErrorReceived -= h);
    }
 
    // DataReceived(プラスbyte[]化)とErrorReceivedを合成する
    public static IObservable<byte[]> ObserveReceiveBytes(this SerialPort serialPort)
    {
        var received = serialPort.DataReceivedAsObservable()
            .TakeWhile(e => e.EventType != SerialData.Eof) // これでOnCompletedを出す
            .Select(e =>
            {
                var buf = new byte[serialPort.BytesToRead];
                serialPort.Read(buf, 0, buf.Length);
                return buf;
            });
 
        var error = serialPort.ErrorReceivedAsObservable()
            .Take(1) // 届いたらすぐに例外だすので長さ1として扱う(どうせthrowするなら関係ないけど一応)
            .Do(x => { throw new Exception(x.EventType.ToString()); });
 
        return received.TakeUntil(error); // receivedが完了した時に同時にerrorをデタッチする必要があるのでMergeではダメ
    }
}

成功例と失敗例を合成して一本のストリーム化。また、DataReceivedはそのままじゃデータすっからかんなので、Selectでbyte[]に変換してあげています。これで、ObserveReceiveBytes拡張メソッドを呼び出すだけで、かなり扱いやすい形になっている、と言えるでしょう。パフォーマンスも、これなら全く問題ありません。

MolesとRx

と、ドヤ顔しながら書いていたのですが、とーぜんセンサーの実物なんて持ってませんので動作確認しようにもできないし。ま、まあ、そういう時はモックとか用意して、ってSerialDataReceivedEventArgsはパブリックなコンストラクタないし、ああもうどうすればー。と、そこで出てくるのがMoles - Isolation framework。以前にRx + MolesによるC#での次世代非同期モックテスト考察という記事で紹介したのですが、めちゃくちゃ強力なモックライブラリです。パブリックなコンストラクタがないとか関係なくダミーのインスタンスを生成可能だし、センサーのイベントだから作り出せないし、なんてこともなく自由にダミーのイベントを発行しまくれます。

[TestClass]
public class SerialPortExtensionsTest : ReactiveTest
{
    [TestMethod, HostType("Moles")]
    public void ObserveReceiveBytesOnCompleted()
    {
        // EventArgsを捏造!
        var chars = new MSerialDataReceivedEventArgs() { EventTypeGet = () => SerialData.Chars };
        var eof = new MSerialDataReceivedEventArgs() { EventTypeGet = () => SerialData.Eof };
        // SerialPort::BytesToRead/SerialPort::Readで何もしない
        MSerialPort.AllInstances.BytesToReadGet = (self) => 0;
        MSerialPort.AllInstances.ReadByteArrayInt32Int32 = (self, buffer, offset, count) => 0;
 
        var scheduler = new TestScheduler();
 
        // 時間10, 20, 30, 40でSerialData.Charsのイベントを、時間50でEofのイベントを発行
        MSerialPortExtensions.DataReceivedAsObservableSerialPort = _ => scheduler.CreateHotObservable(
                OnNext(10, chars),
                OnNext(20, chars),
                OnNext(30, chars),
                OnNext(40, chars),
                OnNext(50, eof))
            .Select(x => (SerialDataReceivedEventArgs)x);
 
        // 走らせる(戻り値のbyte[]はどうでもいいので無視するためUnitに変換)
        var result = scheduler.Start(() => new SerialPort().ObserveReceiveBytes().Select(_ => Unit.Default), 0, 0, 100);
 
        result.Messages.Is(
            OnNext(10, Unit.Default),
            OnNext(20, Unit.Default),
            OnNext(30, Unit.Default),
            OnNext(40, Unit.Default),
            OnCompleted<Unit>(50));
    }
 
    [TestMethod, HostType("Moles")]
    public void ObserveReceiveBytesOnError()
    {
        // EventArgsを捏造!
        var chars = new MSerialDataReceivedEventArgs() { EventTypeGet = () => SerialData.Chars };
        var eof = new MSerialDataReceivedEventArgs() { EventTypeGet = () => SerialData.Eof };
        // SerialPort::BytesToRead/SerialPort::Readで何もしない
        MSerialPort.AllInstances.BytesToReadGet = (self) => 0;
        MSerialPort.AllInstances.ReadByteArrayInt32Int32 = (self, buffer, offset, count) => 0;
 
        var scheduler = new TestScheduler();
 
        // 時間10, 20, 30, 40でSerialData.Charsのイベントを、時間50でEofのイベントを発行
        MSerialPortExtensions.DataReceivedAsObservableSerialPort = _ => scheduler.CreateHotObservable(
                OnNext(10, chars),
                OnNext(20, chars),
                OnNext(30, chars),
                OnNext(40, chars),
                OnNext(50, eof))
            .Select(x => (SerialDataReceivedEventArgs)x);
 
        /* ↑までOnCompletedのものと共通 */
 
        // 時間35でErrorのイベントを発行
        MSerialPortExtensions.ErrorReceivedAsObservableSerialPort = _ => scheduler.CreateHotObservable(
            OnNext<SerialErrorReceivedEventArgs>(35, new MSerialErrorReceivedEventArgs()));
 
        // 走らせる(戻り値のbyte[]はどうでもいいので無視するためUnitに変換)
        var result = scheduler.Start(() => new SerialPort().ObserveReceiveBytes().Select(_ => Unit.Default), 0, 0, 100);
 
        // Exceptionの等値比較ができないので、バラしてAssertする
        result.Messages.Count.Is(4);
 
        result.Messages[0].Is(OnNext(10, Unit.Default));
        result.Messages[1].Is(OnNext(20, Unit.Default));
        result.Messages[2].Is(OnNext(30, Unit.Default));
 
        result.Messages[3].Value.Kind.Is(NotificationKind.OnError);
        result.Messages[3].Time.Is(35);
    }
}

アサーションに使っているIsメソッドは、いつも通りChaining Assertionです。

Molesがいくら強力だとは言っても、イベントをそのまま乗っ取るのはデリゲートの差し替えなどで、割と面倒だったりします。しかし、FromEventでラップしただけのIObservable<T>を用意しておくと…… それを差し替えるだけで済むので超簡単になります。イベント発行については、TestScheduler(Rx-Testingを参照しておく)で、仮想時間で発行する値を作ってしまうと楽です。こういう、任意の時間で任意の値、というダミーの用意もFromEventでラップしただけのIObservable<T>があると、非常に簡単になります。

あとは、scheduler.Startで走らせると(3つの引数はそれぞれcreated, subscribed, disposedの仮想時間、何も指定しないと…… 実は0始まり「ではない」ことに注意。100,200,1000がデフォなので、0はすっ飛ばされています)、その戻り値で結果を受け取って、Messagesに記録されているので、それにたいしてアサートメソッドをしかける。

実に簡単ですね!Molesの力とRxの力が組み合わさると、イベントのテストが恐ろしく簡単になります。素敵じゃないでしょうか?

まとめ

テストなしで書いてたコードは、Molesでテスト走らせたら間違ってました。TakeWhileの条件が==だったのと、Mergeで結合していたり……。はっはっは、ちゃんとユニットテストは書かないとダメですね!そして、Molesのお陰でちゃんと動作するコードが書けたので恥を欠かなくてすみました、やったね。

Reactive Extensionsとスレッドのlock

ぱられるぱられる。もしパラレルにイベントが飛んできたら、どうする?

public class TestParallel
{
    public event Action<int> Log = _ => { }; // nullチェック面倒ぃので
 
    public void Raise()
    {
        // デュアルコア以上のマシンで試してね!
        Parallel.For(0, 10000000, x =>
        {
            Log(x);
        });
    }
}
 
class Program
{
    static void Main(string[] args)
    {
        var list = new List<int>();
        var tes = new TestParallel();
 
        // イベント登録して
        tes.Log += x => list.Add(x);
 
        // 実行
        tes.Raise();
    }
}

これは、十中八九、例外が出ます。list.Addはスレッドセーフじゃないので、まあそうだよね、と。では、Rxを使ってみるとどうなるでしょうか。

var list = new List<int>();
var tes = new TestParallel();
 
// イベント登録して
Observable.FromEvent<int>(h => tes.Log += h, h => tes.Log -= h)
    .Subscribe(list.Add);
 
// 実行
tes.Raise();

やはり変わりません。例外出ます。FromEventを中継しているだけですから……。さて、しかし一々Addの手前でlockするのは面倒だ、と、そこでSynchronizeメソッドが使えます。

Observable.FromEvent<int>(h => tes.Log += h, h => tes.Log -= h)
    .Synchronize()
    .Subscribe(list.Add);
 
// ようするにこんな感じになってる
 
var gate = new Object();
//....
lock(gate)
{
    OnNext();
}

これで、list.Addを問題なく動作させられます。Listとか適度にデリケートなので適当に注意してあげましょう。

Subjectの場合

さて、上のはイベントでしたが、ではSubjectの場合はどうなるでしょう。

public class TestParallel
{
    Subject<int> logMessenger = new Subject<int>();
    public IObservable<int> Log { get { return logMessenger.AsObservable(); } }
 
    public void Raise()
    {
        // デュアルコア以上のマシンで試してね!
        Parallel.For(0, 10000000, x =>
        {
            logMessenger.OnNext(x);
        });
    }
}
 
class Program
{
    static void Main(string[] args)
    {
        var list = new List<int>();
        var tes = new TestParallel();
 
        // イベント登録して
        tes.Log.Subscribe(list.Add);
 
        // 実行
        tes.Raise();
    }
}

たまーに例外起こらず処理できることもあるんですが、まあ大体は例外起こるんじゃないかと思います。初期のRxのSubjectは割とガチガチにlockされてたのですが、現在はパフォーマンスが優先されているため挙動が変更され、ゆるゆるです。回避策は同様にSynchronizeを足すことです。

tes.Log.Synchronize().Subscribe(list.Add);

これで問題なし。

余談

手元に残っていた大昔のRxを使って実行してみたら、死ぬほど遅かったり。確実に現在のものはパフォーマンス上がっていますねえ。あと、なんかもう最近面倒でeventだからってEventArgs使わなきゃならないなんて誰が言ったー、とActionばかり使うという手抜きをしてます。だってsenderいらないもん、大抵のばやい。

ReactiveProperty ver 0.3.0.0 - MとVMのバインディングという捉え方

今回の更新よりアイコンが付きました。専用のアイコンがあると、とっても本格的な感じがしますねー。色はRxにあわせて紫-赤紫。デザインは私の好みな幾何学的な感じです。@ocazucoさんに作って頂きました、ありがとうございます!色々ワガママ言ってお手数かけました。

ReactiveProperty - MVVM Extensions for Rx - ver 0.3.0.0

Rxとは何か、というとIObservable<T>と「見なせる」ものを合成するためのライブラリです。だから、見なせるものさえ見つかれば、活躍の幅は広がっていく。ReactivePropertyは色々なものを、そのように「見なして」いくことで、RxでOrchestrateできる幅をドラスティックに広げます。土台にさえ乗せてしまえば、あとはRxにお任せ。その場合に大切なのは、土台に乗せられるよう、閉じないことです。しかし、もし閉じているのなら、開くための鍵を提供します。

デフォルトモード変更

ReactivePropertyのデフォルトモードが DistinctUntilChanged|RaiseLatestValueOnSubscribe になりました。今まではRaise…が入ってなかったのですが、思うところあって変わりました。例えばCombineLatestは、全てが一度は発火していないと動き出しません。ReactiveCommandの条件に使うなどの場合にRaiseしてくれないと不都合極まりなく、かつ、Subscribeと同時にRaiseすることによる不都合なシーンは逆に少ない。ことを考えると、必然的にデフォルトをどちらに振るべきかは、分かりきった話でした。

そのことは0.1の時、サンプル作りながら思ってたんですが悩んだ末に、省いちゃったんですねえ。RaiseLatestValueOnSubscribeが入ると不便なシーンもある(initialValueを設定しないとまず最初にnullが飛んでいくとか)ので、どちらを取るかは悩ましいところではあるんですが、シチュエーションに応じて最適なほうを選んでください、としか言いようがないところです。

ToReactivePropertyAsSynchronized

長い。メソッド名が。

これは何かというとINotifyPropertyChanged->ReactiveProperty変換です。今までもObservePropertyメソッド経由で変換できましたが、それは一度IObservable<T>に変換するため、Model→ReactivePropertyという一方向のPushでしかありませんでした。Two-wayでのバインドで値の同期を取りたい場合は、今回から搭載されたToReactivePropertyAsSynchronizedを使ってください。

// こんな通知付きモデルがあるとして
public class ObservableObject : INotifyPropertyChanged
{
    private string name;
    public string Name
    {
        get { return name; }
        set
        {
            name = value;
            PropertyChanged(this, new PropertyChangedEventArgs("Name"));
        }
    }
 
    public event PropertyChangedEventHandler PropertyChanged = (_, __) => { };
}
 
// それを使ったViewModelを作るなら
public class TwoWayViewModel
{
    public ReactiveProperty<string> OneWay { get; private set; }
    public ReactiveProperty<string> TwoWay { get; private set; }
 
    public TwoWayViewModel()
    {
        var inpc = new ObservableObject { Name = "ヤマダ" };
 
        // ObservePropertyを使うとIObservable<T>に変換できます
        // ラムダ式でプロパティを指定するので、完全にタイプセーフです
        // それをToReactivePropertyすればOneWayで同期したReactivePropertyになります
        OneWay = inpc.ObserveProperty(x => x.Name).ToReactiveProperty();
 
        // ToReactivePropertyAsSynchronizedで双方向に同期することができます
        TwoWay = inpc.ToReactivePropertyAsSynchronized(x => x.Name);
    }
}

INotifyProeprtyChangedなModelをReactivePropertyなViewModelに持っていきたい時などに、使いやすいのではと思います。また、同期する型が異なっていても対応することができます。コンバーターのようにconvertとconvertBackを指定してください。

ReactiveProperty.FromObject

こちらもToReactivePropertyの亜種ですが、ReactiveProperty→Modelというソース方向への片方向の同期を取ります。ModelはINotifyPropertyChangedである必要はありません。

// こんなただのクラスがあったとして
public class PlainObject
{
    public string Name { get; set; }
}
 
// それと同期させたいとき
public class OneWayToSourceViewModel
{
    public ReactiveProperty<string> OneWayToSource { get; private set; }
 
    public OneWayToSourceViewModel()
    {
        var poco = new PlainObject { Name = "ヤマダ" };
 
        // ReactiveProperty.FromObjectで変換することができます
        // この場合、ReactiveProperty -> Objectの方向のみ値が流れます
        OneWayToSource = ReactiveProperty.FromObject(poco, x => x.Name);
    }
}

片方向の同期が定型的な局面、例えば設定クラスなんかは通知は必要ないと思うのですが、それをUIから一方向で値を投影したい場合に、これを使うことで楽になると思います。

また、Sampleにこれら3つの解説を追加しましたので、実際にどう反映されるのか、動きを確認したい場合はそちらを見てください。

CombineLatestValuesAreAllTrue

長い。メソッド名が。これはReactive Extensionsお題 - かずきのBlog@Hatenaに書かれているもので、使うシーンよくありそうな頻出パターンになりそうだと思ったので、お借りすることにしました。ありがとうございます。使い方を見てもらったほうが速いので、まず例を。

Microsoft Silverlight を入手

<StackPanel>
    <StackPanel Orientation="Horizontal">
        <CheckBox IsChecked="{Binding IsCheckedA.Value, Mode=TwoWay}">Check A</CheckBox>
        <CheckBox IsChecked="{Binding IsCheckedB.Value, Mode=TwoWay}">Check B</CheckBox>
        <CheckBox IsChecked="{Binding IsCheckedC.Value, Mode=TwoWay}">Check C</CheckBox>
    </StackPanel>
    <Button Command="{Binding ExecCommand}">全部チェックで押せる</Button>
</StackPanel>
// using Codeplex.Reactive.Extensions; (これを忘れないように)
 
public class MainPageViewModel
{
    public ReactiveProperty<bool> IsCheckedA { get; private set; }
    public ReactiveProperty<bool> IsCheckedB { get; private set; }
    public ReactiveProperty<bool> IsCheckedC { get; private set; }
    public ReactiveCommand ExecCommand { get; private set; }
 
    public MainPageViewModel()
    {
        IsCheckedA = new ReactiveProperty<bool>();
        IsCheckedB = new ReactiveProperty<bool>();
        IsCheckedC = new ReactiveProperty<bool>();
 
        ExecCommand = new[] { IsCheckedA, IsCheckedB, IsCheckedC }
            .CombineLatestValuesAreAllTrue()
            .ToReactiveCommand();
 
        ExecCommand.Subscribe(_ => MessageBox.Show("しんぷる!"));
    }
}

3つのチェックボックスが全てONなら実行可能なコマンドを作る、です。こんな風に、全てがtrueの時、といった集約をしたい場合に便利に使うことができます。プレゼンテーションロジック、に該当する部分だと思いますが、ここでもRxは十分以上に活躍できます。また、外部からCanExecuteChangedをぶっ叩くようなカオティックなこともしません、ReactiveCommandならね。

ReactiveTimer

Timerです。.NETはTimerが山のようにあります。Threading.Timer, Timers.Timer, Forms.Timer, DispatcherTimer, Observable.Timer。ここにまたReactiveTimerという新たなるTimerが誕生し、人類を混乱の淵に陥れようとしていた……。まさにカオス。

ちょっと整理しましょう。まず、Threading.Timerは一番ネイティブなTimerと捉えられます。そのままだと少しつかいづらいので、軽くラップしてイベントベースにしたのがTimers.Timer。Forms.TimerとDispatcherTimerは、それぞれのアプリケーション基盤で時間を計って伝達してくれるというもの、UI系でのInvokeが不要になるので便利。と、それなりに役割の違いはあります。微妙な差ですが。

最後のObservable.TimerはIObservableで通達してくれるのでRxと非常に相性が良いタイマー。また、タイマーを行う場所もISchedulerで任意に指定できるので、ThreadPoolでもDispatcherでもCurrentThread(この場合はSleepで止まるので固まりますけどね)でも、もしくは仮想スケジューラ(任意に時間を動かせるのでテストが簡単になる)でも良いという柔軟さが素敵で、Rx以降のプログラミングではタイマーなんてObseravble.Timer一択だろ常識的に考えて。という勢い。(精度は若干落ちるので、よほど精度を求める時はThreading.Timerを使いましょう)。だと思っていた時もありました。

一時停止出来ないんですよ、Observable.Timer。発動したらしっぱなし。Stopはできる(Disposeする)けど、そうしたら再開は出来ない。それじゃあ困る場合があります!はい。結構あります。そういう場合はTimers.TimerをFromEventでラップする。それはそれで良いのですが、Observable.TimerのISchedulerを指定可能という柔軟さを捨てるのは勿体無いなあ、と思ったのでした。

そこで、今回ReactiveTimerを作りました。機能は、Observable.TimerのStop/Start出来る版です。

[TestClass]
public class ReactiveTimerTest : ReactiveTest
{
    [TestMethod]
    public void TimerTest()
    {
        // テスト用の自由に時間を動かせるスケジューラ
        var testScheduler = new TestScheduler();
        var recorder = testScheduler.CreateObserver<long>();
 
        // 作成時点では動き出さない
        var timer = new ReactiveTimer(TimeSpan.FromSeconds(1), testScheduler);
        timer.Subscribe(recorder); // Subscribeしても動き出さない
 
        timer.Start(TimeSpan.FromSeconds(3)); // ここで開始。初期値を与えるとその時間後にスタート
 
        // 時間を絶対時間10秒のポイントまで進める(AdvanceTo)
        testScheduler.AdvanceTo(TimeSpan.FromSeconds(5).Ticks);
 
        // MessagesにSubscribeに届いた時間と値が記録されているので、Assertする
        recorder.Messages.Is(
            OnNext(TimeSpan.FromSeconds(3).Ticks, 0L),
            OnNext(TimeSpan.FromSeconds(4).Ticks, 1L),
            OnNext(TimeSpan.FromSeconds(5).Ticks, 2L));
 
        timer.Stop(); // timerを止める
        recorder.Messages.Clear(); // 記録をクリア
 
        // 時間を現在時間から5秒だけ進める(AdvanceBy)
        testScheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks);
 
        // timerは止まっているので値は届いてないことが確認できる
        recorder.Messages.Count.Is(0);
    }
}

そう、単体テストしたい場合は、TestSchedulerに差し替えれば、AdvancedBy/Toによって、時間を自由に進めることが可能になります。Assertに使っているIs拡張メソッドはChaining Assertionです。Testing周りの詳しい解説はRx-Testingの使い方 - ZOETROPEの日記に書かれています。

CountNotifier/BooleanNotifier

SignalNotifierという名前はよく分からないので、今回よりCountNotifierに変更しました。また、名前空間をNotifiersに変更しました。更に、二値での通知を行うBooleanNotifierを新規追加しました。どちらも、IObservable経由での通知を行うフラグです。

// using Codeplex.Reactive.Notifiers;
 
// 通知可能(IObservable)なboolean flag
var boolFlag = new BooleanNotifier(initialValue: false);
boolFlag.Subscribe(b => Console.WriteLine(b));
 
boolFlag.TurnOn(); // trueにする, trueの状態だったら何もしない
boolFlag.Value = false; // .Valueで変更、既にfalseの状態でも通知する
boolFlag.SwitchValue(); // 値を反転させる
 
// 通知可能(IObservable)なcount flag
var countFlag = new CountNotifier();
countFlag.Subscribe(x => Console.WriteLine(x));
 
countFlag.Increment(); // incしたり
countFlag.Decrement(); // decしたりの状態が通知される
 
// Empty(0になった状態)という判定でフィルタして状態監視したりできる
countFlag.Where(x => x == CountChangedStatus.Empty);

例えば非同期処理を行う際などの、状態の管理に使うことができます。

Pairwise

neue cc - Reactive Extensionsで前後の値を利用するで書いた、前後の値をまとめる拡張メソッドです。

// { Old = 1, New = 2 }
// { Old = 2, New = 3 }
// { Old = 3, New = 4 }
// { Old = 4, New = 5 }
Observable.Range(1, 5)
    .Pairwise()
    .Subscribe(Console.WriteLine);

古い値と新しい値を使って何かしたい場合などにどうぞ。

CatchIgnore

例外処理用に、OnErrorRetryというものを用意していましたが、今回それ以外にCatchIgnoreを追加しました。

// 1, 2
Observable.Range(1, 5)
    .Do(x => { if (x == 3) throw new Exception(); })
    .CatchIgnore()
    .Subscribe(Console.WriteLine);

ようするに、CatchしてEmptyを返す手間を省くためのものです。onErrorにe => {}と書くのと似てますが、シーケンスの途中で捕まえれるので、メソッドチェーンの繋ぎ方によっては全然異なる役割を持つ可能性があります。

その他の削除やバグ修正や見送ったものなど

RxのExperimental版が更新されてたので、それに合わせました。Rxの更新内容はZipとCombineLatestに大量のオーバーロード+配列を受け入れるようになったので、何でも結合できるようになりました。それにともないReactivePropertyでは独自拡張としてCombineLatestのオーバーロードを用意していたのですが、Experimental版のみ削除しました。パフォーマンスもExperimentalのもののほうがずっと良いので、早くStableにも降りてきて欲しいです。

WebRequestのUploadValuesで、値が&で連結されていないという致命的なバグがあったので修正しました。本当にすみません……。また、Silverlightでデザイン画面がプレビューできなくなる不具合を修正しました。デザインモード怖い。

バリデーション周りは、ちょっと大きめに(といっても内部だけの話で外部的には変わらない予定)変更入れようと思ってたのですが、それは次回で。あと、同期系メソッドもバリデーションの成否によって同期するかしないかを決定しようかなあ、とか思うんですが、ちょっと大変なので後になりそう。

まとめ

今回はデータリンクを主眼に置きました。デフォルトモードの変更もその一環です。直接的に意味を見るのなら、厚めのMをスマートにVMとシンクロナイズさせる、ということになります。冒頭の台詞、閉じた世界を開けるための道具です。ObserveProperty(OneWay)、ToReactivePropertyAsSynchronized(TwoWay)、ReactiveProperty.FromObject(OneWayToSource)。

OneWayとかTwoWayとかOneWayToSourceというとおり、VMとMの間のバインディングエンジンだと見ることができます。VとVMの間をWPFなりのフレームワークが担い吸収するように、ReactivePropertyはVMとMの間を吸収します。手書きでバインディングだと、ボイラープレートでは手間だし見通しも悪くなる。このほうが、ずっと、楽だし自然に書けます。

ReactivePropertyはV-VM間の接続も担うため、結果として全てがV-VM-M-VM-Vとして一つに繋がる。何をどう組もうと自然に一つに繋がっていく。わくわくしませんか?むしろカオスの予感がする?けれど、カオスの先に本当の光がある、……かもしれない。

ちなみに同期系のものはみんなプロパティ指定だけでGetとかSetとか自動でやっていますが、動的コード生成(&キャッシュ)によりハイパー高速化されているので、パフォーマンス上の問題はありません。そこは安心してください。というと何か凄そうなことやってる気がしますが、勿論そんなことはなくて、偉大なるExpressionTreeに全面的にお任せしているだけだったり。

Reactive Extensionsで前後の値を利用する

@Toya256tweetさんの作成されたDependency Variable Libを見て、ReactivePropertyでも大体再現できるかなあ、でもOldValueとNewValueのところが少し面倒なのよね、というところで一例。ReactivePropertyの値の変更時に、古い値と新しい値を同時に得られるようにします。

var p = new ReactiveProperty<int>(1, mode: ReactivePropertyMode.RaiseLatestValueOnSubscribe);
 
p.Zip(p.Skip(1), (Old, New) => new { Old, New })
    .Subscribe(a => Console.WriteLine(a.Old + " -> " + a.New));
 
p.Value = 10; // 1 -> 10
p.Value = 100; // 10 -> 100

挙動は@okazukiさんの解説されている通りです。残念ながら、頭やわらかい、というわけではなくて頻出パターンのイディオムなだけなので、ただたんに覚えているから、というだけです、がくり。まあ、LINQにせよRxにせよ、メソッドの組み合わせで成り立っているということは、パターン化しやすいということなのですね。イディオムを知っていればいるほど、更にそのイディオムを組み合わせて、と、手法は無限に広がっていきます。

私は非同期をvoidにしてモデル作り込むっての好きくないです。IObservableなりTaskなりを返してくれれば、先があるのですが、そうでないとやりようがないですから。例えばデータモデル考え中 - 急がば回れ、選ぶなら近道で示される「2」のパターンが、Silverlightなどでの従来のやり方だったと思われます。実行のトリガーだけを外から渡して、モデルの中で結果は閉じる。変更はINotifyPropertyChanged経由で通知。正直言って、私はこのやり方はナシだと思っています。スパゲティになりがちだから。Rxは「3」のパターンに近いと思います。順序の制御は、まさにミドルウェア足るReactive Extensionsが保証する。柔軟性は見ての通りで、無限の広がりがあります。

今まではコールバックしかなかったので必然的に2に収まらざるを得なかったですが、今はRxもあるし、C#5.0からはawaitもあるし、なので、モデルの作り方も「変わっていく」と思います。Viewの機能の強さによってViewModelのありようが変わるように、言語やフレームワークの機能の強さによってModelのありようが変わるのは当然でしょう。

ScanとPairwise

さて、自分自身と結合というのは、結局のところ二つ購読しているということなので、これはIObservableがHotでないと成り立ちません(ReactivePropertyはHotです)。というわけで、ColdなIObservableでも対応したい時はScanを使うといいでしょう。HotとかColdとか何言ってるのか分からないという場合はReactive Extensions再入門 その5「HotとCold」 - かずきのBlog@Hatenaを読むといいでしょう。最近、自分で解説してるのを放棄しだしてる気がするよくない傾向、ではなくて、次回のReactive Extensions(Rx)入門 - @ITではまさにObservable.TimerとフツーのTimerを使ってColdとHotの解説しようと思ってたのですよ!ネタ被った、けれど気にせず書きます:)

var p = new ReactiveProperty<int>(1, mode: ReactivePropertyMode.RaiseLatestValueOnSubscribe);
 
var oldNewPair = p.Scan(Tuple.Create(0, 0), (t, x) => Tuple.Create(t.Item2, x)).Skip(1);
 
oldNewPair.Subscribe(Console.WriteLine);
 
p.Value = 10; // (1, 10)
p.Value = 100; // (10, 100)

Scanは自分自身の前の値を参照できるので、色々と応用が効きます。値の入れ物のための初期値は不要なのでSkip(1)で除去してやるのがポイント。

もう一つ、メソッドの組み合わせでのパターン化、というのは、つまりパーツ化しやすいということでもあります。拡張メソッドに分離してやりましょう。

public static class ObservablePairwiseExtensions
{
    // OldNewPair<T>はReactivePropertyに入っています
    // using Codeplex.Reactive.Extensions;
 
    public static IObservable<OldNewPair<T>> Pairwise<T>(this IObservable<T> source)
    {
        return source.Scan(
                new OldNewPair<T>(default(T), default(T)),
                (pair, newValue) => new OldNewPair<T>(pair.NewItem, newValue))
            .Skip(1);
    }
 
    public static IObservable<TR> Pairwise<T, TR>(this IObservable<T> source, Func<T, T, TR> selector)
    {
        return source.Pairwise().Select(x => selector(x.OldItem, x.NewItem));
    }
}
 
// ↑というような拡張メソッドを作ってやったとして
var p = new ReactiveProperty<int>(1, mode: ReactivePropertyMode.RaiseLatestValueOnSubscribe);
 
p.Pairwise().Subscribe(x => Console.WriteLine(x.OldItem + " -> " + x.NewItem));
 
p.Value = 10; // 1 -> 10
p.Value = 100; // 10 -> 100

OldNewPairを使ったのは、TupleがSL/WP7にないから、というのと、OldItemとNewItemというプロパティ名に意味があって、分かりやすいから、です。基本的にC#でTupleを使うことはあんまないですね。LINQのパイプライン内でならば匿名型、それを超えるなら面倒くさくてもクラスを立ててあげたほうがいいと、私は思っています。勿論、今後Tupleのための構文やパターンマッチが入るとしたら別ですけど。というか、つまるところ専用構文がない状態ではTupleを使うメリットはそんなにないのです。匿名型かわいいよ匿名型。言語比較の際に、C#はTupleがこんな腐ってるぜー、とかやられるのはちょっと勘弁願いたいところ(まぁでも普通に敵いませんのは認めます、けれど言語・IDE・フレームワークは三位一体だとも思っています。引き離して単独で評価することには、あまり価値を感じません。IDEでうまく機能することを優先した言語、それを前提にしたフレームワーク。どの要素も引き離せませんから。はいはい、C#がお好きなんですね、という感じですが、でも例えばHTML/ブラウザというGUIフレームワークの上だったらJavaScriptがベストだ、といった捉え方でもありますね)

それはともかくとして、Pairwiseは多用しそうなので、次のReactiveProperty(ver.0.3)で入れたいと思います(あとOldNewPairのToStringのオーバーライド)。ちなみにlinq.js - LINQ for JavaScriptにはPairwise、入ってます。そう、Rxでの頻出パターンということは、それはIx(Enumerable)にも存在するパターンなのです。この辺がRxの面白いところです!私にとって、こういった書き方の初出は前後の値も利用したシーケンス処理 - NyaRuRuの日記でした。

ObserveChanged

突然出てきたOldNewPairですが、これが既にReactiveProperty内で定義されているのは、ObservableCollectionの拡張メソッド群で使用しているからです。今まで紹介していなかったと思うので、ここで紹介しましょう。

// using Codeplex.Reactive.Extensionsとすると
// ObservableCollection<T>に(ReactiveColelctionとか継承したものでも可)
// ObserveXxxChangedという拡張メソッドが利用できる
var collection = new ObservableCollection<int>();
 
// 追加されたのを監視できる、IObservable<T>
collection.ObserveAddChanged()
    .Subscribe(x => Console.WriteLine("Add:" + x));
// 削除されたのを監視できる、IObservable<T>
collection.ObserveRemoveChanged()
    .Subscribe(x => Console.WriteLine("Remove:" + x));
// 置換を監視できる、IObservable<OldNewPair<T>>
collection.ObserveReplaceChanged()
    .Subscribe(p => Console.WriteLine(p.OldItem + "→" + p.NewItem));
// リセットを監視できる、IObservable<Unit>
collection.ObserveResetChanged()
    .Subscribe(_ => Console.WriteLine("Clear"));
 
collection.Add(100); // Add:100
collection.Add(1000); // Add:1000
collection[1] = 300; // 1000→300
collection.Remove(100); // Remove:100
collection.Clear(); // Clear

この手の監視では、通常CollectionChangedイベント経由でNotifyCollectionChangedEventArgsを使って値を取り出すわけですが、型がobject[]なので一々キャストしたりなど、非常に使いにくいと思っていました。ObserveXxxChangedを使えば、完全にタイプセーフで、値も取り出しやすい形に整形してくれています。是非是非どうぞ。

まとめ

@Toya256tweetさんにも示唆頂いたのですが、ReactivePropertyはMVVMに限定されない、汎用的なものだと考えています。値の導出ルールを宣言的に書く、というのは色々なところで使える、気がします。でもやはり、Functional Reactive Programmingが全然流行ってないことを考えても、ルールによって自動的に変動する値って、基本的にGUI向けなのだろうなあ、って。そして、GUIで強いのはやっぱJavaとか.NETといったFRP不毛地帯なので、流行るなんて考えられないことでした。しかし、今は違う。C#にはRxが来た。C#で実現できるのならば、強力なGUIプラットフォームが目の前にあるわけなので、かなり可能性はあるんじゃないかな!と思いたいところです。

d.y.d. - ReaJ / Reactive JavaScriptの例は

// RaiseLatestValueOnSubscribeはv0.3ではデフォルトに変更する予定
var mode = ReactivePropertyMode.RaiseLatestValueOnSubscribe;
 
var x = new ReactiveProperty<int>(10, mode);
var y = x.Select(n => n + 100).ToReactiveProperty(mode: mode);
x.Value = 20;
x.Value = 30;
Console.WriteLine(y.Value); // 130

まあ、不格好です。ReactiveProperty用の専用構文でも用意してくれないとね、rp x = 10; rp y = x + 100; とかで上記の形に整形されたら素敵なのですが。というのはともかくとして、一応、実現できています。GUI環境への反映はWPFのバインディング機構に投げて解決ですし。JavaScriptにおいても、ReactivePropertyを移植して、ベースとしてKnockout.js辺りを採用すればいい感じに実用的になりそうです。その辺は追々やっていきたいところ。

勿論、Rx自体の可能性はGUI(や非同期)だけに閉じているわけではないので、全く別なところでの可能性、使い道というのも追い求めていきたいです。

ともあれともかく、ReactiveProperty、試してみてくださいな。

Rx連載開始とRx本感想とZenbook買ったという話

まーたブログを放置気味な昨今は大変よろしくなく、だらだらTwitterを眺めているだけで一日が終わる症にかかっています。さて、そんなわけですが、@ITにてRx入門の連載を開始しました。

導入なので細かいことは言わず、なんか凄そう!と思ってもらえればいいなー、という構成にしました。用語もそんな並べず、でも、ところどころ引っかかるワードがある、言い方を悪くするとハッタリ気味に、印象に残ってくれればいいなあ、と。初回とはいえ、導入だけであっさり終わってしまったのはちょっと反省。どんな形になるのかイマイチ掴めなくて、もう少し書けばよかったな、と思ってます。あと、図をもう少し入れるべきだったな、と……。そんなこんなな反省を生かし、次回はボリューム増でお送りします。

@okazukiさんがReactive Extensiions再入門を始めたり、@zoetroさんがRx-Testingについて詳しい記事を書かれていたり(これは素晴らしい!)、Rxも盛り上がってきた感じがしますね!それは気のせいです。というだけで終わらせないよう、ガンガン行きましょうー。

Rx本

(やっと)発売されました。まず印象ですが、薄いです。私は電子書籍で買っちゃってるのでリアルな厚さは分からないんですが、180ページです。それでこの値段かよ、という不満を最初は持ってしまうかもしれません。あと、LINQ and Rxです。どういうことかというと中身の半分はフツーのLINQの話です。それを差っ引くとRxは90ページしかありません。更にRxJSやReactiveUIの話もあります。そこを差っ引くと50ページぐらいしかないじゃないかゴルァ。というわけで、Rx本として考えると、分量には不満が残ると思われます。全てのメソッドをカバーする、という内容でもないのでリファレンスとしても使えません。

とはいえ、要素要素は満遍なくカバーできているのと、現在唯一のRx本ではあるので、本で学びたいなあ、と思うならこれしか選択肢はありません。WindowやJoin、Testingなんかは(私の怠慢により)このブログでは少しも紹介していないので、それらを知りたい方や、 Web上から断片的な情報を拾って組み上げるのは手間なので、購入するのは十分アリだとは思います。まあ、私の@IT連載が完了したら、第一の選択肢はそれを見ること、になります(キリッ。となれるように、頑張ります。

ASUS Zenbook UX31

どうでもいいんですが、UX31を買いました。Intelの推奨するUltrabookの第一弾の中では大本命の一品です。さて、Ultrabookとは何か、というと、ようするところWindows版Macbook Airです。薄く軽く速い。宗教上の理由で林檎はお断りだ!な人にとっては救いの手なわけです。内心羨ましいとか思ってたりしたんだからね!しかしですよ、Win系の勉強会ではそうでもないですが、それ以外の勉強会でのMac率の高さといったら!会場の9割がMacだよ、とかドヤ顔でツイートされた日には!多様性は善、はどこに行ったんだよという話です。

側面は、実用的な意味ではフルフラットのほうが良いのでしょうし、特に日本メーカーはそこに拘っている印象があるのですが、審美的にはこうした処理をしたほうがいいですね、視覚上のトリックとはいえ、圧倒的に薄く見えるので。ちなみに側面から見ると本当にMacbook Airソックリでパク……と口から出てしまうのも已むを得ないかな、とは思いますが、それ以外の部分はそんなに似てるわけでもないですよ。

ZenbookはSSDがSATA 3.0(6Gbps)ということもあって、滅茶苦茶速いですね。今後はこの速度がスタンダードになっていくのかと思うと、いい時代です、ほんと。

その他の印象ですが、キーボードはまぁまぁ、タッチパッドはサイテー。タッチパッドはキー入力中の誤動作率の高さ(位置とか大きさが悪いのだろうなあ)も酷くてストレスフル。基本はマウス使いますけれど、いつもマウス持ち歩くのもねえ。ああ、あと、UX31はUltrabookの中で唯一解像度が1600×900と高い(他は1366×768)のがポイントです。フルHDじゃないのかよ!とVAIO Zのオーダーメイドな人が言ってくるかもしれませんが、まぁVAIO Zは店頭モデルはともかくフルHDでオーダーすると高いですからね。こっちは10万円なのでコストパフォーマンスが違うわけです、はい。あと、13インチで1600×900は程良いですよ。フルHDだとちょっと文字が細かすぎになる感も。

総じて満足度は高くお薦めなので、是非買ってください(上のリンク先から!)

悲しいことに私はいきなりACアダプタのコネクタを破壊してしまって充電不能に陥りました、オゥノゥ。地震で物が降ってきてですね。脆いものです。

追記:ASUSのサポートセンターに連絡し、交換してもらいました。非常に対応もよかったので、全然問題ないです。ネガティブな方向でURLがばら蒔かれてしまって想定外だったのですが、全然大丈夫ですよ、とは書いておきます。

ReactiveProperty ver.0.2.0.0

ver.0.2!ご意見ご感想は随時募集中で、コメントなりTwitterで私に@を投げてくれるなり、ただたんにTwitterでReactivePropertyと含めてつぶやいてくれるなり(検索経由で拾えるので)、ブログで記事を書いてくださるついでにクエスチョンしてみたりなどなど、ちょっとした疑問でも要望でも、何でもどうぞ。特に、細かな使用感の向上というのはリクエストがあってこそですので!斜め上からやってきた結果として世界最先端(但し逆向き)を体感出来るのは今だけです!斜め上なのでReactivePropertyのうまい使い方は今のところ誰にも分かりません、私もわかりません(えー)。というわけで、みんなで模索できたらいいな、と思います。

国内はもとよりReactiveUIの作者からも言及頂いて結構褒めてもらったりなどなど、RxのForumで宣伝したかいがあったね!というわけで、私自身かなり真剣に取り組んでますので、付き合って頂ければ幸いです。 /* 現在ReactiveOAuthをほっぽりだしてるという信頼感のなさがアレなので、そちらも早めに何とかします…… */

今回は、0.1では中途半端な存在だったReactiveCollectionを徹底的に考察して再デザインしました。他に細かい追加が幾つか。まずは小さな追加から。

追加したり変わったりしたもの

ObserverPropertyが、最初のSubscribe時に値をPushするようになりました(引数でfalseを指定するとオフにも出来る、そうすると、普通にFromEventしたのと同じ)

public class ToaranaiViewModel
{
    ToaruModel model;
    public ReactiveProperty<string> Name { get; private set; }
 
    public ToaranaiViewModel()
    {
        // こんなINotifyPropertyChangedなModelがあるとして
        model = new ToaruModel { Name = "Anders" };
 
        // 初期値として現在値(この場合"Anders")を持つ
        Name = model.ObserveProperty(x => x.Name).ToReactiveProperty();
    }
}
 
public class ToaruModel : INotifyPropertyChanged
{
    private string name;
    public string Name
    {
        get { return name; }
        set { name = value; PropertyChanged(this, new PropertyChangedEventArgs("Name")); }
    }
 
    public event PropertyChangedEventHandler PropertyChanged = (_, __) => { };
}

これにより、既存のModelからToObservablePropertyしてViewModelにする際などに、デフォルトで値が同期されるので多くのシチュエーションで、より便利になったと思います。という提案を@okazukiさんにリクエスト貰ったので実装しました:) @okazukiさんはReactivePropertyを使ってみた感想 イケテル!気持ちいい!ハードルは高い? - かずきのBlog@Hatenaという記事も書いてくれました、わーい。

ObserverProeprtyはINotifyPropertyChangedへの拡張メソッドです。また、今回よりINotifyPropertyChangingにObservePropertyChanging拡張メソッドを追加しました。ObserverProeprtyと同様な感覚で使えます。

それとReactiveCommand(無印)のExecuteが引数なしでnullをぶん投げるようになりました。なお、これがあるのは無印のほうのみで<T>のほうにはありません。だって、ジェネリックするということはパラメータが欲しい前提ですものね。ジェネリックのほうはExecute(T parmeter)を受け入れるうようにオーバーロードを隠蔽。こういう細かいところの使いやすさの向上ってのは随時取り組みたいところです。

また、ReactiveCommand(無印・ジェネリック共に)をDisposeすると、SubscribeしてたものにOnCompletedを投げるように変更しました。なお、ReactiveCommandをDisposeすると、CanExecuteもfalseになります。永久的にfalseにする、という意味合いで使えるかと思いますが、使うシチュエーションは分かりません。

ReactiveCollectionの再デザイン

ReactiveCollectionに大きめの変更を入れました。今まで通知をIScheduler上で行なっていましたが、これを廃止しました。かわりにToReactiveCollectionなどIObservableからの変換時は、Addと通知、両方をIScheduler上にしました。また、IScheduler上で各種操作(Add, Clear, Remove)を行うメソッド AddOnScheduler などを追加しました。この変更のデザイン上のポリシーは以下になります。

ObservableCollectionとスレッドセーフ・ディスパッチャーセーフというのは非常に難しい。まず、ObservableColectionは変更と通知がワンセットだと考えられる。コレクションが変更され通知を出し、通知され側(主にUI)がコレクションを読みに来る。これは全部ひとまとまりでなければならない。通知され側がコレクションを読みに行く際に、ズレがあってはならない。よって、通知をUIスレッドで行うなら、変更もUIスレッドで行われる必要がある。

しかし、全ての操作を内部で片っ端からDispatcherにBeginInvokeするアプローチを取ると、それはそれで都合が悪い。例えば別スレッドでAddしたりRemoveしたりClearしても、そのコード上では変更はすぐには反映されない。ClearしてもCountは変わらない。AddしてもCountは変わらない。そんな気味の悪いコレクションクラスは使えません。WPFではDispatcher.Invokeがあるので、変更と通知を強制的にUIスレッド上で行う、ということが可能でしたが、SilverlightにはBeginInvokeしかないので、操作をUIスレッドで行うことを保証するコレクションクラスの作成は不可能。(Caliburn MicroのBindableCollectionは全部UIスレッド上で行うようにしているみたいですね、まあBindableにのみ焦点を当てるなら現実的なので、それはそれでいいと思います)

だから、コレクションを触る時は利用側がDispatcher.BeginInvokeして、明示的にDispatcherの中へ入ろう。というのが、整合性が取れて一番良いのだと思います。今まで、ReactiveCollectionは通知だけIScheduler上で行うようになっていました。でも、これはあまり良いデザインではない、操作と通知は同一スレッド上で行うべきなのだから、これでは乖離する可能性がある。単純なAddだけのようなケースでは問題になることは少ないし、利便性としては、その方が簡単にバインドで出来て良いよね、ではあるのだけど、決して良いデザインではない。いずれ発覚する破綻への気づきを遅らせているという点で、むしろ限りなく悪い。

よって、内部で片っ端からDispatcherにBeginInvokeする代わりに、AddOnSchedulerなど、(ReactiveCollection生成時に指定した/デフォルトはUIDispatcher)スケジューラ上で操作を行うと利用側が明示するアプローチを取ってみました。Rxには使い勝手の良いISchedulerが存在する。だからこそアリなやり方かな、と思います。この辺はまだまだ考えどころだと思いますので、ご意見ありましたらお願いします。

<Grid>
    <ListBox ItemsSource="{Binding TimeItems}" />
</Grid>
public class ToaruViewModel
{
    public ReactiveCollection<string> TimeItems { get; private set; }
 
    public ToaruViewModel()
    {
        // 1秒毎に現在時刻表示が追加されるコレクション
        TimeItems = Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(_ => DateTime.Now.ToString())
            .ToReactiveCollection();
 
        // 5秒間隔で上記コレクションをクリアする
        Observable.Interval(TimeSpan.FromSeconds(5))
            .Subscribe(_ => TimeItems.ClearOnScheduler());
    }
}

今回考えるにあたっては青柳 臣一 ブログ(技術系): [.NET] スレッドセーフな ObservableCollection<T> が欲しいをとっても参考にさせて頂きました。

ReactiveProperty, ReactiveCommandは確固たる意思のもとに作ったんですが、ReactiveCollectionは非常に中途半端でした。が、今回ようやく理念が立てれたのではかと思います。なお、ObservableCollection/ReactiveCollectionにはObserveAddChangedなど、変更通知をIObservableで受け取ることのできる拡張メソッドを足してあるので、そちらも便利に使うことが可能です(素のNotifyCollectionChangedEventArgsはIList(ジェネリックじゃない!)であったりして非常に触りにくいので、その辺をきっちり整理してあります)。

とか言ってますが、コードにしたらたかが十数行なのですよね。それを決めるのに、ここ一週間ずっと考えてました。つまり私の生産性は一日一行です(キリッ

まだ追加してないもの

Validation周りをValidationSummaryやDescriptionViewerに対応させる。とか、OnErrorRetryが値を返せるようにする。などは次に載せるつもりです。これらを加えてから、と思ったんですがReactiveCollectionの変更が大きいので、先に出したくて見送りました。

ReactiveProperty-Experimental

今回からExperimental版のRxにも対応しました。NuGetではReactiveProperty-Experimentalです。しかし、 .NET 4.5やWinRTへの対応は、まだしていません。せっかくRx(Experimental)がWinRT対応したので、それに合わせたいと思ったのですが断念。理由としてはVS11がCode Contractsに対応していないから、です。Code Contractsのバイナリリライトかけないと動かないので、どうにもなりません……。それがなければ今すぐにでも対応させたいのですけれどねえ。こういう時に機敏に動けないのはとても悲しいので、次回からはCode Contractsの採用は見送りたいと思ってしまいます……。

ところで、それを意識してではありますが、.NET 4.0版のDLL名をReactiveProperty.NET40.dllに変えました。複数プラットフォームに対応する場合、全てのDLLを同じ名前にする(JSON.NETなどはそうですね)か、全てのDLLにプラットフォームの識別子をつける(MVVMLightなどはそうです)か。前者のほうがスマートではあるのですが、分かりやすさを考え、後者を選びました。Stable版とExperimental版の区別もありますし、DLL名から判定出来たほうがいいかな、と。

今後

okazukiさんの記事にもあるように「MVVMライブラリにも精通しつつReactive Extensionsのことも知っててReactivePropertyの概要を把握してないといけない上に必要に応じてMVVMライブラリとReactivePropertyを繋ぐような機能を作りこまないといけない」きゃー、難しそう!でも事実だ!

私としてはReactivePropertyを通してReactive Extensionsを学習してもらえればいいかなあ、と思っています。Rxはイベントが合成出来る!というけれど、合成しようにもイベントのソースがないと始まらない。ReactivePropertyを使うと、手軽に合成のためのソースが手に入るので、イベント周りのRxでのこね方の学習に最適なのではかと思います。……多分。

既存MVVMライブラリとの使い分けなどに関しては、この類の「選択肢が増えます系」の永遠の課題ですねえ。結局、どう使い分けるかの判断をユーザーに丸投げしているわけですもの。ガイドなどを掲示できればベストなのですが、そもそも私がMVVMに全然詳しくないのであった。そもそも私自身がどう使えばいいのか分かってないぐらいなので(えー)、触ってみて、ついでに足りなかったり、これがこうなってたらいい、とかいう思いがあったら、私がそういうのに全然気づいてない確率100%なので、是非言ってやってください。

Reactive Extensions v1.1.11011.11リリースに見る.NET 4.5からの非同期処理

Reactive Extensionsv1.1.11011 (Experimental Release)がリリースされました。リリース対象はExperimental(実験)版のみです。Stable(安定)版のほうは変更ありません。別件で少しコメントで質問したところ、近いうちにStable版の更新もあるかも、とのことでしたので、Stableはそちらを待ちましょう。リリース内容の詳細な解説はフォーラムにあります

今回の大きな追加は.NET Framework 4.5 Developer PreviewWinRTへの対応です。というわけで、WinRT関連では、WinRT用のスケジューラであったりイベントであったりへの対応とまぁまぁ想像つく普通のもの。あと非同期処理を他言語と結びつけるIAsyncOperationへの書き出し、などなどもサポートされるようですね。

そして.NET 4.5周りでは、C#5.0のAsyncサポート・クラスライブラリがTask中心に書き換わることが念頭に置かれ、大規模に変更が入っています。今後のRxの方針がよく見えますので、Experimentalではありますが注意深く観察してみる必要がありそうです。というわけで、しっかり紹介します。なお、以下の話は.NET 4.5のRxの話なので、.NET 4.0以前の場合では直接は関係なく、Obsoleteにもなっていません。が、将来フレームワークのバージョン上げたらObsolete祭りでモニョるのは覚悟が必要かしらん。もう一つ注意としては、あくまでExperimentalなので、将来的にもこのままかどうかは保証されません。現時点での話です。

FromAsyncPatternがObsolete

はい、Obsoleteです。理由としては、.NET 4.5では多くのメソッドがBegin-Endパターンの代わりにTaskを返すXxxAsyncメソッドを持っています。そしてTaskとIObservableは相互に変換可能だから、Rxで扱いたいならXxxAsync().ToObservableすればいいでしょ、ということでした。Begin-EndパターンなのにXxxAsyncを持っていないメソッドにはどうするんだ!という場合は、TaskFactory.FromAsyncがあるので、それ使えばいい、とのこと。まあ、それはレアケースなので滅多にないかな。

毎回ToObservableなんて面倒くさい、という人のためにSelectManyに限っては、Taskを受け取るオーバーロードが用意されているので、ToObservableは不要です。内部では予想つく通り、ただ単にToObservableしているだけですね。その他の合成系メソッド(MergeやSwitch)などは残念ながらというか当然というか、ToObservableしてください。

IObservableはAwaitable

IObservable<T>も非同期を扱うものなので、awaitできます。正確に言えばGetAwaiterが定義された、といったところでしょうか。

var req = WebRequest.Create("http://google.com/");
var response = await Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse)();

基本的にRxの非同期はFromAsyncPatternを初めとして長さが1のものを扱っていますが、複数の値が流れる場合はどうなるかというと、挙動は「最後の値」です。正確に言えばAsyncSubjectが利用されているので、OnCompletedの直前のOnNextの値。ではEmpty(OnCompletedのみ)やNever(何もなし)ではどうなるのか、というと……

// 10(最後の値)
var a = await Observable.Range(1, 10);
 
// InvalidOperationException(シーケンスに値がない)
var b = await Observable.Empty<int>();
 
// ある意味フリーズ、ここで永遠に止まる
var c = await Observable.Never<int>();

となります。これだけ見るとNeverって使い道がイミフですが、何かとMergeする必要があるときに、マージ対象がないときはNeverを渡すなどなど、ライブラリに近い部分では結構使う場所あります。私の書いているReactivePropertyというライブラリでもそうして利用しています。

FirstなどがObsolete、かわりにFirstAsyncなどとWaitが追加

え?という感じですがObsoleteです。対象はFirst,Last,Single、それとForEachも。FirstOrDefaultなど、XxxDefault系も同様です。つまり同期的にブロックするタイプのものが全てObsolete行きになりました。代わりにFirstAsyncなどXxxAsyncが用意されています。それとawaitを組み合わせてください。

var source = Observable.Return("async?");
 
var value0 = await source; // LastAsyncと挙動は同じ
var value1 = await source.FirstAsync();
var value2 = await source.LastAsync();

長さ1と分かっている状況なら、何もなくそのままawaitでも良いのではかしらん。LastAsyncがそれに相当しますね。さて、しかしawaitのお陰で同期的「のように」書けるには違いないけれど、FirstやLastなどはブロックして「同期」な挙動を取っていたわけなので、単純にObsolete行きにされたら困ってしまいます。そこで、同期的に待機して最後の値を取り出すWaitメソッドが用意されました(これは.NET 4.0やSilverlightなどでも使える、新たに追加されたメソッドです)。

var source = Observable.Return("async?");
 
var value0 = source.Wait();
var value1 = source.FirstAsync().Wait();
var value2 = source.LastAsync().Wait();

ちなみにWaitの中身はLastです。Lastという名前から離して、同期的に待機して値を取り出す、と明示させたのですね。それはいいと思います。Waitはいい。Waitはいいんですが、FirstをObsoleteにしてFirstAsyncを追加するのは、正直気にいりません。私は反対です。

ターゲットフレームワーク間でコードが共有出来なくなる、IQbservableプロバイダに影響が出る、そもそも標準クエリ演算子から離れるのはどうよ。などなど。だいたい、AllやAny、Maxなどは長さ1のIObservable<T>を返すようになっていました。Firstなどだけです、同期的に待機して値を取り出す、という別の意味が与えられていたのは。だから、ここはFirstはObsoleteにせず、FirstAsyncの挙動、つまりIObservable<T>を返すように変更すればいいのです。同期待ちについては、Waitが搭載されたので心配無用です。これで、全ての挙動に統一が取れる。

唯一問題点を挙げれば、本当に本当に「破壊的変更」になるんですよね。それも、Stableとか銘打ったものへの影響も出る。メソッド名同じで戻り値が変わる。そういう変更を許せるものか。私は、許してしまってもいいと思うのですけれど。Firstを廃止してFirstAsyncを追加、などという歪な形を将来に残すよりかは、ずっといい。

なので、Forumにもそうコメント入れましたところ返答貰えました。「Stableリリースが存在する以上、XxxAsyncのままでいるしかない。これが不幸なことは同意しますけれど、暫くはこのままでいるしかない。」とのことでした。というわけで、Stableと銘打つのが早まったな…… としか言いようがなく。あの段階でここまで読めなかったのはしょうがないところ、と思いつつ、やはり手痛いミスかなあ。うーん、将来に渡っての完璧なAPIを作り上げるというのは実に難しい。

まあ、Firstを多用する(といっても単体テストや動作確認時ぐらいですけど)のは、値を一つ取り出したい、ということなので、await sourceかsource.Wait() で済む。FirstAsyncやLastAsyncを直接使うことは恐らく少なくて、ならば実際上の問題というのはそこまでないかもしれません。

長さ1の非同期処理の戻り値はTaskを選ぶべきか、Rxを選ぶべきか

メソッドを作るときの非同期処理の戻り値。これは、Taskを選ぶべきです。それは.NET標準と合わせるべきという理由からもそうですし、この.NET 4.5向けのRxの指針からしてそうなっています。長さ1のIObservableで非同期を表現する、というのは特殊だったと言わざるを得ないので、メソッドを作るとき、非同期処理の戻り値はTaskにしたほうが間違いなく良いでしょう。

ただ、アプリケーショに全体でRxによる合成を中心に置く場合は、ToObservableが面倒くさい、というだけじゃなく逆にオーバーヘッドになる可能性もあるので、IObservable中心にしたほうが良いでしょう。この辺は一概には言えずケースバイケースでしょうか。どちらにせよToObservable<->ToTaskで相互変換が可能なわけなので、あまりガチガチに捉える必要もないですけれど。

あと、あくまで.NET 4.5の話でasync/awaitが入るからTaskのほうが良いと言ってるのであって、.NET 4.0以前ならまた違う話です。というかその場合だとRx一択です。

ねぇ、Rxってもう要らない子?

時代の徒花でしたね、短い命だった……。

って、ちょっと待ったー。それはYESでもあり、NOでもあります。YESなのは、単純な形での非同期処理ならば、遥かに楽になりますし、それに何よりもRxを通すよりもパフォーマンスは良いと思われるので、むしろasync/awaitを使うべきです。具体的には、SelectManyしてSubscribeするだけ、あとCatchで少し例外処理、みたいなコードなら、もう全面的にRxさようならでいいでしょう。

でも往々にしてそういう処理だけじゃないよね?以前にも紹介しましたがSwitch(新しい処理が入ったら以前の非同期処理はキャンセルして新しい処理のみを後続に流す)などを手書きせず演算子一つにパッケージ化できることや、全体的にTaskのメソッド群よりも合成や待ち合わせが容易に記述できる、などなど。そして、「複数の戻り値のある非同期処理、例えばStreamのBeginReadは細切れにbyte[]が得られますが、それを複数回分の非同期処理をまとめてシーケンスとして、IObservable<byte[]>としてまとめることは現状ではRxしかできません。

// 複数回のBeginReadをRx+Asyncで一つにまとめる例
static IObservable<byte[]> ReadMultipleAsync(Stream stream, int bufferSize)
{
    return Observable.Create<byte[]>(async observer =>
    {
        try
        {
            while (true)
            {
                var buffer = new byte[bufferSize];
                var readCount = await stream.ReadAsync(buffer, 0, bufferSize);
 
                if (readCount == 0) break;
                if (readCount != bufferSize)
                {
                    var newBuffer = new byte[readCount];
                    Array.Copy(buffer, newBuffer, readCount);
                    buffer = newBuffer;
                }
 
                observer.OnNext(buffer); // yield returnのノリで書く
            }
            observer.OnCompleted(); // 完了合図と
        }
        catch (Exception ex)
        {
            observer.OnError(ex); // 例外は自前で明示的に
        }
    });
}

ExperimentalリリースではObservable.Createがasync/await対応しているので、擬似的なyield returnとして、非同期での列挙をそこそこ簡単に記述することができます。OnErrorとOnCompletedは自前で管理する必要がありますけれど。

なので、メソッド単体で分けた場合の戻り値は「長さ1」ならTask、複数ならIObservable。それらメソッドを使って非同期処理を組み上げる時は、単純ならawaitのみ、複雑ならRx。というのが使い分けの指針です。とはいえ、使い分けっていうのは幻想に近くて、実際はどっちか一つになりがちだとは思っています。そして、それならTask中心になるでしょうねえ、とも。非同期における大抵のシチュエーションでRxがサヨウナラ気味になるのはしかたのない話です。ぶっちゃけSelectManyしてSubscribeがほとんどだし、それ以外のことだって、同期的のように書けるのなら、いくらでもやりようはありますから。

まあ、未来を待たなくても今使える解としてなら十分ですし、それ自体がawait可能なので、今書いたコードは将来に渡っても無駄にはなりません。FromAsyncPatternがObsoleteというのは、まあ単純に.NET4.5本来のXxxAsyncに置き換えればいいというだけなので無駄になる、とは言わないでしょう。

けれど、それだけじゃあ、すごく後ろ向きで、寂しいよね。Rx自体の持つ力というのは、別に非同期に限らない。ただの幾つもある側面のうちの一つにすぎない。そこで出した私の答えがReactiveProperty : WPF/SL/WP7のためのRxとMVVMを繋ぐ拡張ライブラリです。ReactivePropertyはC#5.0によってプレゼンスが低下するReactive Extensionsに新たな価値をもたらしたいという危機感から作ったものだったりします(後付け、じゃなくてこれは本当の話です、次の一手を指すならRx自体に注目の集まっている今しかない、とも)

ReactivePropertyを通して見ると、非同期だけではない、Rxの持つポテンシャルがよく分かるのではないでしょうか?Rxの真の強みはイベント単独や非同期単独ではなくて、それらが、ただの配列も含めて、統一的に扱える、だから全て一本のストリームになって合成処理が自由自在。というOrchestrateな部分にある。ReactivePropertyはそれを全面的に押し出してます(半強制的に全てが一本に繋がるようになってる)

Rxは、この先も力強く存在し続けるので、学習する価値は間違いなくありますよ!

まとめ

相変わらずフリーダムな変更が続いているRxですが、あくまでExperimental版の話です。Stableでは、こんなバカバカと変わっていくことはないので、安心して使えばいいです。あと、Experimentalなのでまだまだ変動の余地はある、と思いますので、意見あればForumで直接言っておくとよさそう。私も、何やらかんやらと意見言っておきました(昔は書くのにビビッてたのに、今は平然と書いててアレです、慣れですね、ようするに)。

ところでRxは.NET 4.5に標準搭載されるのか否か、ですが、なんかまだまだ全然色々と変更や模索する気満々なようなので、この様子だと、仮にそういう話があったとしても間に合わなさそうという点で、標準搭載はなさそうですねー。ほぼ完成してるのに標準入りしない、とかだと悲しいんですが、そういう形で標準搭載見送り、ならばむしろ喜ばしい話なので、いいかなー、と思います。

それにしてもRx本は出すタイミング難しいですねえ。今回の変更は非同期周りの話がガラッと変わってきちゃうわけなので。また延期かしらねえ。さすがにそれはないか。ところで私もLINQ + Rx本をオライリーから出したいです(←ただたんに表紙をデンキウナギ(Rxのロゴはピンクのデンキウナギ)にしてウナギ本と呼ばれたいという一点だけの話なので間に受けないでください)

ReactivePropertyのデモをしました

Silverlightを囲む会in東京#4にて、先日公開したReactivePropertyについてお話しました。

本題のセッションの後の、お楽しみセッションということで、LT的に5分程度とか思っていたつもりなのですが、大幅に時間オーバーして17分も喋っていました。これは酷い。色々と寛容に見て頂き感謝です。さおさんありがとうー。IIJさんも本当にありがとうございます。時間オーバーを許してくれたというのと(笑)、それと、ネットワークが良好だったお陰でTwitterインクリメンタルサーチのデモが出来たので。毎度ながら凄まじい画質のSmooth Streamingといい、神会場すぎます。

いつまで残るか分かりませんが、会場で行ったセッションの録画です。Silverlight を囲む会 in 東京 #4 @ IIJ 神保町三井ビル。私のセッションは04:01:30 - 04:19:00です。ライブコーディングしているのは04:05:30-04:16:30ですね。

色々アレゲなのはいいとして、以前にスマベンで話をしたときにも反省事項だったのですがすっかり失念してた声の小ささはダメですねー。次は気をつけます。むしろ早口気味なのかと気にしてたんですが、録画を見るとそうでもないというか、このぐらいで調度良いぐらいですね。スライドはちゃっちゃと進めて欲しいし、本題のDemoは素早く進行して欲しいですから。ライブコーディングは好評だったようで何よりです。ちなみに、スムーズにプログラム書いていて凄い!と評価いただきましたが、やる内容が決まっているから書けたというだけで、例えばギターやピアノの演奏などと同じなわけで、普段は頭抱えながらゆったり書いてます。

ちなみに最後のTwitter検索のコードは若干アレだったので、修正したのをここに載せておきます。

<StackPanel>
    <TextBox Text="{Binding CurrentText.Value, UpdateSourceTrigger=PropertyChanged}" />
    <ListBox ItemsSource="{Binding SearchResults.Value}" />
</StackPanel>
public class MainWindowViewModel
{
    public ReactiveProperty<string> CurrentText { get; private set; }
    public ReactiveProperty<string[]> SearchResults { get; private set; }
 
    public MainWindowViewModel()
    {
        CurrentText = new ReactiveProperty<string>();
 
        SearchResults = CurrentText
            .Select(word => new WebClient()
                .DownloadStringObservableAsync("http://search.twitter.com/search.atom?q=" + Uri.EscapeUriString(word)))
            .Switch()
            .Select(s =>
            {
                var xml = XElement.Parse(s);
                var ns = xml.Name.Namespace;
                return xml.Descendants(ns + "title").Select(x => x.Value).ToArray();
            })
            .OnErrorRetry((WebException e) => Debug.WriteLine(e))
            .ToReactiveProperty();
    }
}

SelectManyよりもSelect->Switchのほうがいいのと、OnErrorRetryの書く場所は、WebClientの真下だと永遠にリクエストをリピートしちゃうのでダメでしたね。

ReactiveProperty : WPF/SL/WP7のためのRxとMVVMを繋ぐ拡張ライブラリ

MVVM拡張、という言い方が適切かは不明ですが、ともあれ、RxでXAMLによるUIシステムとの親和性を高めるライブラリを作成し、リリースしました。

中身は大きく分けて二つで、一つはReactivePropertyというXAMLと双方向にバインド可能なIObservable<T>、ReactiveCommandというIObservable<bool>からCanExecuteの条件を宣言的に生成するコマンドなど、MVVM的なUI絡みのクラス群。もう一つはWebClientやWebRequestなど、非同期処理のための拡張メソッド群になります。

名前はUI中心に見えますが、UI絡みはいらないよ、という人は非同期周りだけを使ってくれても問題ありません。それと、機能紹介の前に一つ。決して既存のMVVMフレームワークを置き換えたり、同等の機能を提供するものではありません。ViewModelBaseやMessengerなどに相当するものはないので、その辺は適宜、既存のMVVMフレームワークを使えばいいと思います。というか、併用することを推奨します。だから「拡張ライブラリ」と名乗っています。

UIへのバインディング

ReactivePropertyとは何か。というと、双方向にバインド可能なIObservable<T>です。まず、ViewModel(Model)->Viewという片方向のバインドを見てみましょう。時計のようなものを作ります。

<Grid>
    <TextBlock Text="{Binding DisplayText.Value}"  HorizontalAlignment="Center" VerticalAlignment="Center" />
</Grid>
public class SimpleClockViewModel
{
    // 双方向にバインド可能なIObservable<T>
    public ReactiveProperty<string> DisplayText { get; private set; }
 
    public SimpleClockViewModel()
    {
        // 1秒毎に値を発行、Selectで現在時刻に変換してToReactivePropertyでバインド可能にする
        DisplayText = Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(_ => DateTime.Now.ToString())
            .ToReactiveProperty();
    }
}

Microsoft Silverlight を入手

XAML側ではDisplayText.Valueというように、.Valueまで指定してバインドします。実に簡単にIObservableがバインドできると分かるのではないでしょうか?

IObservableとは、時間軸に沿って値が変わるものです。Intervalはx秒置きに等間隔で変わるので、まさに「時間」といったものですが、それ以外のものも全て時間軸に乗っていると考えることが可能です。例えばイベント、クリックやマウスムーブ、ジェスチャーやセンサーイベントで考えると、タッチした、x秒後にまたタッチした、x秒後にまたタッチした…… 発行される時間が不定期なだけで、時間軸に沿って次の値が出力されるという図式は同じです。

非同期処理もそうで、x秒後に一回だけ値が来る。Rangeや配列のToObservableは0.0001秒刻みに値が来る。Rxで、IObservableで表現することが出来る値というのは、時間軸に乗って変わる/発行する値ということになります。そして、見渡してみると、IObservableになる、時間によって変わるという表現がマッチするものは意外と多い。特にリッチクライアントでは。UI自身の値の変化(バインディング/イベントによる通知)もそうだし、ModelのINotifyPropertyChangedもそう。INotifyPropertyChangedとは、或るプロパティの値が変化したという通知を行うオブジェクト。そのプロパティだけに着目してみれば、時間軸上で連続的に変化する値とみなせる、つまりIObservableで表現できます。

UIからのバインディング

では、UIからのバインディングもしてみましょう。これは、空のReactivePropertyを作成してバインディングします。これにより、UIからの入力をIObservableとして他へと中継することができます。

<StackPanel>
    <!-- このTriggerは入力と同時に発火させるために(SL4では)必要なもの -->
    <TextBox Text="{Binding CurrentText.Value, Mode=TwoWay}">
        <i:Interaction.Behaviors>
            <prism:UpdateTextBindingOnPropertyChanged />
        </i:Interaction.Behaviors>
    </TextBox>
    <TextBlock Text="{Binding DisplayText.Value}" />
</StackPanel>
public class FromUIViewModel
{
    public ReactiveProperty<string> CurrentText { get; private set; }
    public ReactiveProperty<string> DisplayText { get; private set; }
 
    public FromUIViewModel()
    {
        // UIからのテキスト入力の受け口
        CurrentText = new ReactiveProperty<string>();
 
        // そして、それを元にして加工してUIへ返してみたり
        DisplayText = CurrentText
            .Select(x => x.ToUpper()) // 全て大文字にして
            .Delay(TimeSpan.FromSeconds(3)) // 3秒後に値を流す
            .ToReactiveProperty();
    }
}

Microsoft Silverlight を入手

Interaction.Behaviorは本題とは関係なくて、値の更新通知のタイミングを、値の変更と同時にするためのものです(デフォルトだとフォーカスが移ったときかな)。WPFでは、こういった小細工がなくてもいいのですが、SL4,WP7では必要なので已むを得ず。詳しくは Silverlight 4のTextBoxのTextプロパティの変更のタイミングでBindingのSourceを更新したい - MSDN Samples Gallery に。というわけで、このUpdateTextBindingOnPropertyChangedはPrismからコピペってきたものです。

さて、入力の受け付けをベースにするものは、newで空の物を作ります。出力中心のものはToReactivePropertyなわけですね。あとは、文字が非連続的に、(同じスレッド上の)非同期でやってくるので、LINQで加工します。Selectで大文字にして、そして、Rxなので時間系のものも使えるので、Delayを使ってみたりしながら、UIに戻しました。なお、Delayの時点で値の実行スレッドはUIスレッドからスレッドプールに移りますが、ReactivePropertyを使う限りは、ReactiveProperty内部でスレッド間の値の通知を解決するため、Dispatcher.BeginInvokeも、ObserveOnDispatcherも不必要です。実行スレッドを意識するなんて原始的ですよね、ReactivePropertyなら、全く意識する必要がなくなります。非同期は自然のまま非同期で扱える。だって、そもそも全てが非同期なのだもの。

ReactiveCommand

ReactivePropertyのもう一つの大事な機構が、ReactiveCommandです。これは、IObservable<bool>という、実行可否の変化のストリームからICommandを生成します。一般的なMVVMフレームワークで使われるRelayCommand, DelegateCommandとは発想が異なるのですが、私はこのReactiveCommandのアプローチこそがベストだと考えます。まずは例を。

<StackPanel>
    <StackPanel Orientation="Horizontal">
        <CheckBox IsChecked="{Binding IsChecked1.Value, Mode=TwoWay}">CheckBox1</CheckBox>
        <CheckBox IsChecked="{Binding IsChecked2.Value, Mode=TwoWay}">CheckBox2</CheckBox>
        <CheckBox IsChecked="{Binding IsChecked3.Value, Mode=TwoWay}">CheckBox3</CheckBox>
        <CheckBox IsChecked="{Binding IsChecked4.Value, Mode=TwoWay}">CheckBox4</CheckBox>
    </StackPanel>
    <TextBox Text="{Binding CurrentText.Value, Mode=TwoWay}" />
    <Button Command="{Binding ExecCommand}">Execute?</Button>
</StackPanel>
using Codeplex.Reactive.Extensions; // 拡張メソッドを使う場合はこれを忘れず。
 
public class CommmandDemoViewModel
{
    public ReactiveProperty<bool> IsChecked1 { get; private set; }
    public ReactiveProperty<bool> IsChecked2 { get; private set; }
    public ReactiveProperty<bool> IsChecked3 { get; private set; }
    public ReactiveProperty<bool> IsChecked4 { get; private set; }
    public ReactiveProperty<string> CurrentText { get; private set; }
    public ReactiveCommand ExecCommand { get; private set; }
 
    public CommmandDemoViewModel()
    {
            var mode = ReactivePropertyMode.RaiseLatestValueOnSubscribe | ReactivePropertyMode.DistinctUntilChanged;
 
            IsChecked1 = new ReactiveProperty<bool>(mode: mode);
            IsChecked2 = new ReactiveProperty<bool>(mode: mode);
            IsChecked3 = new ReactiveProperty<bool>(mode: mode);
            IsChecked4 = new ReactiveProperty<bool>(mode: mode);
            CurrentText = new ReactiveProperty<string>(
                initialValue: "テキストが空の時はボタン押せないよ",
                mode: mode);
 
            ExecCommand = IsChecked1.CombineLatest(IsChecked2, IsChecked3, IsChecked4, CurrentText,
                    (a, b, c, d, txt) => a && b && c && d && txt != "")
                .ToReactiveCommand();
 
            ExecCommand.Subscribe(_ => MessageBox.Show("Execute!"));
    }
}

Microsoft Silverlight を入手

全てのチェックがONで、かつ、テキストボックスに文字が含まれていないとボタンを押せません(テキストボックスの値の判定はフォーカスが外れてからになります)。CombineLatestというのは、二つの値のうち、どちらか一つが更新されると、片方は新しい値、片方はキャッシュから値を返して、イベントを合成するものです。もし片方にキャッシュがない場合はイベントは起こしません。「二つの値」というように、標準では二つの合成しか出来ないのですが、ReactivePropertyではこれを拡張して7つの値まで同時に合成できるようにしました(それ以上合成したい場合は、匿名型にまとめて、再度CombineLatestを繋げればよいでしょう)。この拡張はCodeplex.Reactive.Extensionsをusingすることで使えるようになります。Extensions名前空間には、他にも有益な拡張メソッドが大量に定義されていますので、是非覗いてみてください。

さて、つまりCombineLatestの結果というのは、ボタンが押せるか否かの、条件のストリームです。どういうことかというと、連続的なCanExecuteです。なので、そのままICommandに変換してしまいましょう、というのがToReactiveCommandになります。CanExecuteに変更があることを、条件のほうからPushして伝えるので、従来使われてきたコマンドを集中管理するCommandManager.RequerySuggestedや、(イベントなので)本来外から叩けないCanExecuteChangedを外から叩けるようにする、などの手立ては不要です。

常にtrueのコマンドならば、new ReactiveCommand()で生成できます。

ところで、mode: ReactivePropertyMode.RaiseLatestValueOnSubscribe というのは、Subscribeされる時に(ToReactivePropertyやToReactiveCommandは内部でSubscribeしています)、同時に最新の値を返します(最新の値がない場合は初期値を返します。初期値は指定することもできますし、もし指定していない場合はdefault(T)になります)。どういうことかというと、判定のタイミングの問題があります。CombineLatestは全てに一度は値の通知が来ている(キャッシュが存在する)状態じゃないと、イベントを起こしてくれません。なので、CanExecuteを初回時から判定させるために、これの指定が必要です。なお、ReactivePropertyModeのデフォルトはDistinctUntilChangedのみで、RaiseLatestValueOnSubscribeは明示的に指定しなければなりません。一件便利そうに見えるRaiseLatestValueOnSubscribeですが、問題も抱えていまして、後でも詳しく説明しますがバリデーションの時。バリデーションの場合は初回実行はして欲しくない(画面を表示したら、いきなり真っ赤っかだと嫌でしょう?)ケースがほとんどのはずです。RaiseLatestValueOnSubscribeを指定すると、初回実行してしまうので、そういう場合にとても都合が悪いのです。これは、良し悪しなので、適宜判断して、最適な方をお選びください。

宣言的であるということ

ReactiveCommandは、条件を宣言的に記述しました。そして、外部から叩くことは禁じられているので、その宣言以外のことが絡む可能性はありません。また、状態を外部変数から取得する(RelayCommandなどはそうなりますね)わけではないので、CanExecuteの変化のスコープはToReactiveCommandをする一連のシーケンスを読むだけですみます。変数を返す場合は、変数を使う範囲、つまりオブジェクト全体から変更可能性が混ざるという、大きなスコープの観察を余儀なくされます。読む場合だけでなく、書く場合でも、集中的に変化の条件を記述することができるので、ずっと楽でしょう。

ReactivePropertyもまた、同じです。値の変化の条件を宣言的に記述しました(こちらはReactiveCommandと違い、(Two-wayでバインド可能にするという都合上外部から叩くことが可能なのでスコープは閉じていませんが)。大事なのは、スコープを小さくすること。大きなスコープは往々に管理しきれないものです。リッチクライアントはステートを持つ。その通りだ。プロパティが、オブジェクトが、絡みあう。それは実に複雑なのは間違いない。でも複雑だから難しくて当然、複雑だからテストできない、複雑さを複雑なまま放っておいたら、それはただの新世代のスパゲティにすぎない。

ステートを捨てようじゃあなくて、宣言的にやろう。それがReactivePropertyの解決策、提案です。

INotifyPropertyChangedと一緒に。

全てReactivePropertyで相互作用を記述する、というのは理想的ですが過激派です。それに、既存のModelや自動生成のModelなど、様々なところにINotifyPropertyChangedはあります。理想だけじゃ生きていけません。それに、私もプレーンなModelはPOCO(+INotifyPropertyChanged)のほうが嬉しい。でも、IObservableになっていないと、関係の合成が不可能で困るので、INotifyPropertyChanged -> ReactivePropery変換を可能にしました。ここでは説明しませんが、その逆のReactiveProperty -> INotifyPropertyChanged変換も可能です。

using Codeplex.Reactive.Extensions; // ObservePropertyもこれをusingで。
 
public class ObserveViewModel
{
    public ReactiveProperty<string> ModelText { get; private set; }
 
    public ObserveViewModel()
    {
        ModelText = new ToaruModel()
            .ObserveProperty(x => x.Text) // タイプセーフにIObservable<T>に変換
            .ToReactiveProperty();
    }
}
 
// WCFからだったりEntity Frameworkだったり既存のModelだったり他のViewModelだったり
// ともかく、INotifyPropertyChangedは至る所に存在します
public class ToaruModel : INotifyPropertyChanged
{
    private string text;
    public string Text
    {
        get { return text; }
        set { text = value; PropertyChanged(this, new PropertyChangedEventArgs("Text")); }
    }
 
    public event PropertyChangedEventHandler PropertyChanged = (_, __) => { };
}

INotifyPropertyChangedの仕組みって、とあるプロパティが変更された、と名前でPushして、変更通知を受けた方はその名前を元にPullで取り出す。描画フレームワークが面倒を見ているなら、それでいいのですが、通常使うには、とてもまどろっこしい。だから、そうしたオブジェクトという大きな土台から名前ベースのPush & Pull通知を、プロパティ単位の小さなPush通知に変換してやりました。指定はExpressionで行うのでタイプセーフですしね。

ReactivePropertyのほうがINotifyPropertyChangedよりも細かいハンドリングが効くのは当たり前の話で、単位が小さいから。逆に、だから、INotifyPropertyChangedという大きい単位で関係を作り込んでいくのは、非常に複雑でスパゲティの元だと言わざるを得ない。勿論、Reactive Extensionsという、プロパティ単位でのPushを自在に扱う仕組みが背後にあってこそのやり方ではあるのですが。

MとVMの境界

が、曖昧にみえるのはその通りかもしれません。けれど、処理がVMに偏りすぎるように見えるのなら、それは素直にMに移せばいい。細かいMを束ねるMを導入すればいい。名前は、サービスでもファサードでもプロキシーでもアプリケーションでもコントローラーでも、なんでもいい(わけではないけれど)。移せばいいなんて簡単にいいますが、それは簡単にできるからです。VM-M-VMが一気通貫してループを描いているなら、ローカル変数への依存もなくメソッドチェーンを切った貼ったするだけなので、どこに置くのも移すのは楽です。

そもそも、最初から明確に分けようとしたってどうせうまくいかないもの。インターフェイスだって、具象型から抽象を見出したほうが簡単だし、ずっとうまくいくでしょう?ネーミングだってリファクタリングで徐々に洗練させる。そもそもVMがヘヴィになりがちなのは、目で見える境界がないから、なせいでしょう。VはXAMLで線引きされるけれど、それ以外はコードで地続き。理想論だけで線を引こうとしたって空疎だし、そもそも、無理な話。境界を見出すには具体的に積み重なった後じゃないと無理でしょう(勿論、境界の敷き方を常日頃考える、研究することは有意義だと思います。そもそも考えていないと、いざ境界を見出そうとしても見えませんから)

そもそもMVVMなのか

UIに対するReactive Programmingなのは間違いないと思ってます。Reactive ProgrammingはUI向きだ。よく聞くその話は、実際その通りだと思うのですが、しかし同時にUI(というか、WPF/SL/WP7などXAML)とRxってどうもイマイチフィットしないなぁ、と悩んでいました。その理由は、最終的に描画を司るフレームワーク(XAML)とミスマッチなせいにあるのだと、気づきました。フレームワークの要求(INotifyPropertyChangedなオブジェクトであったりICommandであったり)と異なるものを、そのまま使おうとしたところで、良い結果は得られない。ゴリ押ししてもXAMLの旨みが生かせないし、Reactive Programmingを大前提に置いた描画フレームワークを構築すれば、もっと違う形になるでしょうが、そんなものは非現実的な話です。膨大な投資のされた、現在のXAML中心のシステムより良いもの……。やはり、絵空事にしか見えません。それに、XAMLは何だかんだ言って、良いものです。

それを認識したならば、必要なのは、境界を繋ぐシステムだと導ける。そのことを念頭においてReactivePropertyとReactiveCommandをデザインしました。MVVMライクなのは描画フレームワークに合わせた結果です、だから、MVVMでもあり、そうでもないようでもある。ただ、それによってパラダイムがミックスされてどちらの長所も活かせるし、世界最高峰のシステムであるXAMLアプリケーションに乗っかれるので今すぐ実用的という面もあるわけなので、これでいいと思うんです。いや、これがいいんです。マルチパラダイムは悪いことではない。あとは、ミックス故に生まれる新しい悩みをどう解消していくか、です。

マルチパラダイムといえば、ReactivePropertyは描画フレームワークからの言語への要求が変化(吸収)しているので、F#でも美味しくXAMLアプリケーションを書くことが可能になるでしょう。多分。

非同期拡張メソッド群

Rxは非同期の苦痛を癒す。とはいっても、実のところ素の状態だと罠が多くて、意外と使いづらかったりします。WebClientは、実行順序の問題があり、そのままではRxで扱いにくい。WebRequestはWebRequestでプリミティブすぎて機能が乏しいし、そのままではリソース処理に問題を抱えたりします。どちらも、ただFromEvent, FromAsyncするだけでは足りなくて、もう一手間かけたRx化が必要です。そのため、WebClient, WebRequestに対して拡張メソッドを用意し、簡単に実行出来るようにしました。

ReactivePropertyと合わせてのインクリメンタルサーチの例を。これは、ReactivePropertyのダウンロードファイルに含む非同期サンプルですので、是非ダウンロードして、サンプルを実際に実行してみてください。

using Codeplex.Reactive.Asynchronous; // 非同期系の拡張メソッド群を格納
using Codeplex.Reactive.Extensions; // OnErrorRetryはこちら
 
public class AsynchronousViewModel
{
    public ReactiveProperty<string> SearchTerm { get; private set; }
    public ReactiveProperty<string> SearchingStatus { get; private set; }
    public ReactiveProperty<string> ProgressStatus { get; private set; }
    public ReactiveProperty<string[]> SearchResults { get; private set; }
 
    public AsynchronousViewModel()
    {
        // IncrementしたりDecrementしたりすることでイベント(Empty ,Inc, Dec, Max)が発生する
        // それはネットワークの状態を管理するのに都合が良い(IObservable<SignalChangedStatus>)
        var connect = new SignalNotifier();
        // 指定したスケジューラ(デフォルトはUIDispatcherScheduler)上で任意にイベントを起こせる
        // 主にProgressと併用して進捗報告に利用する
        var progress = new ScheduledNotifier<DownloadProgressChangedEventArgs>();
 
        SearchTerm = new ReactiveProperty<string>();
 
        // 検索は当然非同期で行い、それをダイレクトにバインドしてしまう
        SearchResults = SearchTerm
            .Select(term =>
            {
                connect.Increment(); // 非同期なのでリクエストは一つじゃなく並列になるので、これで管理
                return WikipediaModel.SearchTermAsync(term, progress)
                    .Finally(() => connect.Decrement()); // リクエストが終了したら、確実にカウントを下げる
            })
            .Switch() 
            .OnErrorRetry((WebException ex) => ProgressStatus.Value = "error occured")
            .Select(w => w.Select(x => x.ToString()).ToArray())
            .ToReactiveProperty();
 
        // SignalChangedStatus : Increment(network open), Decrement(network close), Empty(all complete)
        SearchingStatus = connect
            .Select(x => (x != SignalChangedStatus.Empty) ? "loading..." : "complete")
            .ToReactiveProperty();
 
        ProgressStatus = progress
            .Select(x => string.Format("{0}/{1} {2}%", x.BytesReceived, x.TotalBytesToReceive, x.ProgressPercentage))
            .ToReactiveProperty();
    }
}
 
// 非同期リクエストとデータ。単純ですが、Modelということで。
public class WikipediaModel
{
    const string ApiFormat = "http://en.wikipedia.org/w/api.php?action=opensearch&search={0}&format=xml";
 
    public string Text { get; set; }
    public string Description { get; set; }
 
    public WikipediaModel(XElement item)
    {
        var ns = item.Name.Namespace;
        Text = (string)item.Element(ns + "Text");
        Description = (string)item.Element(ns + "Description");
    }
 
    // WebClientの他に、WebRequestやWebResponseへの非同期拡張メソッドも多数用意されています
    // また、ほとんど全ての非同期拡張メソッドにはプログレス通知を受け付けるオーバーロードがあります
    public static IObservable<WikipediaModel[]> SearchTermAsync(string term, IProgress<DownloadProgressChangedEventArgs> progress)
    {
        var clinet = new WebClient();
        return clinet.DownloadStringObservableAsync(new Uri(string.Format(ApiFormat, term)), progress)
            .Select(Parse);
    }
 
    static WikipediaModel[] Parse(string rawXmlText)
    {
        var xml = XElement.Parse(rawXmlText);
        var ns = xml.Name.Namespace;
        return xml.Descendants(ns + "Item")
            .Select(x => new WikipediaModel(x))
            .ToArray();
    }
 
    public override string ToString()
    {
        return Text + ":" + Description;
    }
}

色々な機能を一度に説明しようとしているので、些か複雑かもしれません。まず、非同期リクエストは並列になります。例えばボタンを、通信中はDisabledにするのに、単純にbooleanで管理してもうまくいきません。どれか一つのアクセスが始まったらDisabledにしてどれか一つのアクセスが終わったらEnabledにする。それではダメです。どれか一つのアクセスが終わったところで、他のリクエストが通信中かもしれないケースに対応できませんから。

そこで、ReactivePropertyではSignalNotifierというものを用意しました。これは、IncrementかDecrementの操作によって、「ゼロになった」「インクリメントされた」「デクリメントされた」「Max(初期値で指定した場合)になった」というイベントを発行します。イベントといっても、自身がIObservable<SignalStatus>になっているので、直接Rxで扱えます。これのネットワークリクエストへの適用はシンプルで、通信開始されたらインクリメント。通信終了したらデクリメントする。そして、ゼロになったか否かを見れば、それが通信中か否かの判定になります。

非同期拡張メソッドはキャンセルに対しても強く考慮されています。WebClient(のSubscribeの戻り値)にDisposeするとCancelAsyncを、WebRequest(のSubscribeの戻り値)にDisposeするとAbortを呼ぶようになっています。このような挙動は、単純にFromEvent, FromAsyncしただけでは実現できないので、大きくて間を省けることでしょう。ネットワークリクエストを自身でキャンセルすることは少ないかもしれませんが、上の例であげたSwitchは内部でDisposeを呼びまくる仕組みになっていますので、しっかり対応している、というのは実行上、大きなアドバンテージとなります。

Switchは複数の非同期リクエストが確認された場合に、前のリクエストをキャンセル+キャンセルが遅れた場合でも遮断して結果を後続に返さないことで、最新のリクエストの結果のみを返します。そのため、非同期リクエストが抱える結果が前後してしまう可能性、例えばインクリメンタルサーチではLINQと検索したのに、LINQの結果よりも後にLIの結果が返ってきたために、表示されるのがLIの結果になってしまう。などという自体が防げます。

また、OnErroRetryに注目してください。これはReactivePropertyが独自に定義している拡張メソッドで、例外発生時の処理をすると同時に、Retry(ここでいうとSearchTermの再購読なので、つまりチェーンの状態が維持される、ということになる)します。ToReactivePropertyを使い、ダイレクトに結び付けている場合は、例外が発生するとチェーンが終了して困るのですが、例外処理にこのOnErrorRetryを使うことで、そのような悩みは不要になります。なお、このOnErrorRetryは勿論ReactiveProperty専用というわけでもなく汎用的に使えます。例えば、もしネットワークからのダウンロードに失敗したら、一定間隔を置いて再度ダウンロードをする、但しリトライの挑戦は指定回数まで。というよくありそうな処理が、引数で回数とTimeSpanが指定できるので、簡単に記述できます。

進捗レポートも非同期処理では欠かせませんが、これは非同期拡張メソッドとScheduledNotifierを組み合わせることで、簡単に実現出来ます。これら非同期周りのサポートはReactivePropertyの重要な柱だと考えているので、UI周りの機能は必要ない、という人も、是非試してみて欲しいです。

同期 vs 非同期

SLやWP7はともかく、WPFでこのように強烈に非同期サポートする意味はあるのでしょうか。というと、あります(ただたんにコード共有しているから、というだけではなく)。まず、WinRTがそうなように、時間のかかる処理は時間のかかる処理なわけなので、強制的に非同期になっていたほうが、ViewModelなり束ねるModelなりで、そこら中に、明示的にスレッド管理(ただたんにTaskに投げるのも含む)をしないで済みます。本質的に非同期(CPU依存ではない形で時間がかかる)なものは非同期として扱ったほうが易しいのです。

もう一つは、Switchのような、キャンセルを多用した処理が書きやすいこと。それに、自然な形でプログレス処理もサポートできます。更には、ReactivePropertyを全面に使うのなら、全てがReactiveに通知しあう世界、つまり全てが非同期で回っているので、非同期のほうが圧倒的に相性が良いです。同期プログラミングさようなら。大丈夫です、何も問題ありません。

C#5.0 Async vs Rx

従来通りに書く。シンプルに同期のように。のならば、async/awaitのほうがずっと良い。そういう使い方をする場合は、Rxを非同期に使う必要性というのは、今後はなくなるでしょう。ではRxでの非同期に価値はなくなってしまうのか?というと、それに関しては明確にNOと答えます。

Rxの場合はLINQということで、宣言的なスタイル、演算子という形に汎用的な処理を閉じ込められる高いモジュール性。というのがあります。上で見たきたように、Switchのようなこと、OnErrorRetryのようなこと、これらを演算子という形で定義して、メソッド一発で適用出来るのはRxならではの利点です。もし自分でそれらの処理を書くとしたら…… あまり考えたくはないし、もしメソッドの形でまとめあげるとしても、Rxのように綺麗に適用させるのは不可能でしょう。どこか歪んだシグネチャを抱えることになります。

ReactivePropertyと親和性が高いのでViewModelへの伝達に使いやすいというのもポイントですね(TaskとIObservableは相互に変換可能なので、ToTaskしたりToObservableしたりするだけなので、別段問題でもないですけど)

使い分けというのは実際のところ幻想みたいなことなので、人によりどちらか主体のスタイルには落ち着くでしょう。私は、とりあえずRx主体で行きたいかなあと思ってますが、ライブラリ的な部分ではasync/awaitを使って書くでしょう(演算子の組み合わせでやろうとすると書くのも難しいし、パフォーマンスも出ないので)。現在のシーケンスに対する、yield returnで汎用的な演算子を作って、通常使うシーンではLINQ to Objectsで、定義した演算子を含めて使っていく。というのと同じスタイルが良さそうだと想像します。async/awaitの書きやすさ・パフォーマンスと、Rxのモジュール性の両立はその辺かなあ、って。

あと、連続的な非同期処理を一纏めにするというのが(今のところ)async/awaitだと出来ない(Task<T>の戻り値は一つだけだから)ので、その辺をやりたい場合にもRx(IObservable<T>は元より複数の戻り値を内包する)頼みになります。ここは将来的にどういう形になるのか、まだ不明瞭なところなので断言はしませんが。

Validation

ReactivePropertyでは、三種類のバリデーションに対応しています。DataAnnotationsによる属性ベース、IDataErrorInfoによるPull型のエラー確認、INotifyDataErrorInfoによるPush型/非同期のエラー確認。ただし、WPFではINotifyDataErrorInfoは使えなく(.NET4.5からは入るようですが)、WP7ではDataAnnotationsが使えません。これはクラスライブラリの問題なので私の方では如何ともしがたくで。

// XAMLは省略。詳しくはSample/Validationを見てください
 
public class ValidationViewModel
{
    [Required]
    [Range(0, 100)]
    public ReactiveProperty<string> ValidationAttr { get; private set; }
    public ReactiveProperty<string> ValidationData { get; private set; }
    [StringLength(5)]
    public ReactiveProperty<string> ValidationBoth { get; private set; }
    public ReactiveProperty<string> ValidationNotify { get; private set; }
    public ReactiveProperty<string> ErrorInfo { get; private set; }
    public ReactiveCommand NextCommand { get; private set; }
 
    public ValidationViewModel()
    {
        // 属性ベースのバリデーションは、自身のプロパティをExpressionで指定することで適用できます
        // 通常属性ベースの場合、例外経由ですが、ReactivePropertyではIDataErrroInfo経由になります
        // そのため、XAML側ではValidatesOnDataErrors=Trueにしてください
        ValidationAttr = new ReactiveProperty<string>()
            .SetValidateAttribute(() => ValidationAttr);
 
        // IDataErrorInfoではエラー時のメッセージを渡します、nullの場合は成功の判定になります
        ValidationData = new ReactiveProperty<string>()
            .SetValidateError(s => s.All(Char.IsUpper) ? null : "not all uppercase");
 
        // 三種類の指定は、重ねることが可能です(但し同じ種類のものを複数指定するのは不可能)
        ValidationBoth = new ReactiveProperty<string>()
            .SetValidateAttribute(() => ValidationBoth)
            .SetValidateError(s => s.All(Char.IsLower) ? null : "not all lowercase");
 
        // INotifyDataErrorInfoの場合は、IObservable<IEnumerable>を返してください
        // 第一引数はself、つまりIObservable<T>になっていて、最終的にSelectでIEnumerableに変換します
        // IObservableということで、非同期での検証が可能になっているのがポイントです
        // これもIDataErrorInfoと同じく、nullの場合は成功という判定になります
        ValidationNotify = new ReactiveProperty<string>("foo!", ReactivePropertyMode.RaiseLatestValueOnSubscribe)
            .SetValidateNotifyError(self => self
                .Delay(TimeSpan.FromSeconds(3)) // DB問い合わせなど非同期なバリデーション(が可能)
                .Select(s => string.IsNullOrEmpty(s) ? null : new[] { "not empty string" }));
 
        // バリデーションの結果は、三種類全てまとめられてObserveErrorChangedから購読できます
        var errors = Observable.Merge(
            ValidationAttr.ObserveErrorChanged,
            ValidationData.ObserveErrorChanged,
            ValidationBoth.ObserveErrorChanged,
            ValidationNotify.ObserveErrorChanged);
 
        // もし、それらを分類したいときは、OfTypeを使うといいでしょう
        ErrorInfo = Observable.Merge(
                errors.Where(o => o == null).Select(_ => ""), // 成功はnull
                errors.OfType<Exception>().Select(e => e.Message), // 属性からはException
                errors.OfType<string>(), // IDataErrorInfoからはstring
                errors.OfType<string[]>().Select(xs => xs[0]))  // INotifyDataErrorInfoからは、IEnumerableの何か
            .ToReactiveProperty();
 
        // 検証が全部通ったら実行可能にするコマンド、などもこうやって書けますね!
        NextCommand = errors.Select(x => x == null).ToReactiveCommand(initialValue: false);
        NextCommand.Subscribe(_ => MessageBox.Show("Can go to next!"));
    }
}

Microsoft Silverlight を入手

一点だけ通常と異なるのは、属性ベースのものを例外ではなくてIDataErrorInfoとして扱います(この辺はRxのパイプラインを通す都合上、例外を出すという形での実現が不可能だったので)

Event to Observable

イベントをXAML側で指定して、ReactivePropetyにバインドすることが可能です。

<Grid>
    <i:Interaction.Triggers>
        <i:EventTrigger EventName="MouseMove">
            <r:EventToReactive ReactiveProperty="{Binding MouseMove}" />
        </i:EventTrigger>
    </i:Interaction.Triggers>
    <TextBlock Text="{Binding CurrentPoint.Value}" />
</Grid>
public class EventToReactiveViewModel
{
    public ReactiveProperty<MouseEventArgs> MouseMove { get; private set; }
    public ReactiveProperty<string> CurrentPoint { get; private set; }
 
    public EventToReactiveViewModel()
    {
        // UIからのイベントストリームを受信
        MouseMove = new ReactiveProperty<MouseEventArgs>();
 
        // とりあえず座標を表示する、というもの
        CurrentPoint = MouseMove
            .Select(m => m.GetPosition(null))
            .Select(p => string.Format("X:{0} Y:{1}", p.X, p.Y))
            .ToReactiveProperty("MouseDown and drag move");
    }
}

Microsoft Silverlight を入手

お手軽なので、結構便利だと思います。あの長大なFromEventPatternを書くよりかは(笑)

シリアライズ

特にWP7では頻繁な休止と復帰で、シリアライズ/デシリアライズによる状態の回復が重要です。そこで、値の回復を可能にするシリアライズ用のヘルパーを用意しました。

// こんなビューモデルがあるとして
public class SerializationViewModel
{
    // なにもつけてないと普通にシリアライズ対象
    public ReactiveProperty<bool> IsChecked { get; private set; }
    [IgnoreDataMember] // Ignoreつけたら無視
    public ReactiveProperty<int> SelectedIndex { get; private set; }
    [DataMember(Order = 3)] // Orderつけたら、デシリアライズの順序を規程
    public ReactiveProperty<int> SliderPosition { get; private set; }
}
 
// 例えばWindows Phone 7のトゥームストーンなシチュエーションを考えてみると
private SerializationViewModel viewmodel = new SerializationViewModel();
private string viewmodelData = null;
 
protected override void OnNavigatingFrom(System.Windows.Navigation.NavigationEventArgs e)
{
    viewmodelData = SerializeHelper.PackReactivePropertyValue(viewmodel);
}
 
protected override void OnNavigatedTo(System.Windows.Navigation.NavigationEventArgs e)
{
    SerializeHelper.UnpackReactivePropertyValue(viewmodel, viewmodelData);
}

デシリアライズの順序はDataMember属性のOrderに従います。詳しくはデータ メンバーの順序を参照のこと。Pushしあう関係の都合上、デシリアライズの順序によって正確な復元ができないこともあるでしょうから、その場合は、Orderをつけると、ある程度制御できます。また、IgnoreDataMember属性をつけておくと、シリアライズ対象から除外することが可能です。

スニペットとサンプル

NuGetから入れてもらってもいいのですが、ダウンロードしてもらえるとコードスニペットとサンプルがついてきますので、最初はダウンロードのほうがいいかもです。コードスニペットはrpropでReactiveProperty<T> PropertyName{ get; private set; }という頻繁に書くことになる宣言が展開されます。他にはrcomm(ReactiveCommand)など。

サンプルはWPF/SL4/WP7全てで用意しました。サンプルを割としっかり用意した最大の理由は、ただ渡されても、もしかしなくてもどう書けばいいのかさっぱり分からないのでは、と思ったのがあります。決して複雑ではなく、むしろシンプルだし記述量は大幅に減るわけです、が、従来のやり方からするとあまりにも突飛なのは否めないので、いきなりスイスイ書くというのは無理ですよねぇ、と。

その他紹介していない、サンプルに載ってない機能は、まだいっぱい。こんなに記事がなくなっちゃってもまだ全然足りない。でも、いきなりてんこ盛りだと引いてしまうので、基本的にはReactivePropertyとReactiveCommandが主体で、慣れたら徐々に周囲を見てもらえばな、ぐらいに思っています。

まとめ

仕上がりはかなり良いと、興奮しています。この長い記事!興奮を伝えたいという気持ちでいっぱいだからです。今後も、利用シーンの模索と合わせて、どんどん進化させていくつもりです。初回リリースですし、というのもありますが、コアコンセプトの実現と、使い勝手としてのAPIの錬成に力を注いだので、それ以外の部分の研究が疎かになっているというのは否めませんので、そこのところの強化も行なっていきます。また、JavaScriptへの移植もノリ気なので、まずKnockout.jsを試して、その上に構築させたいなあ、とか考えています。

ところで、10/8土曜日、明日のSilverlightを囲む会in東京#4の一番最後に、少し、デモ中心でお話をするつもりなので(最後のオマケなのでほんの少しだけですけどね)良ければ見に来てください。ギリギリではありますが、まだ申し込みも出来ると思います。また、もしよければ会場/懇親会でつかまえて聞いてくれたりすると泣いて喜びます。会場に来れなくてもIIJさんのSmooth Streamingで超高画質な配信が行われると思われますので、そちらでも是非是非。

SL/WP7のSilverlight Unit Test Frameworkについて少し深く

の、前に少し。DynamicJsonAnonymousComparerをNuGetに登録しました。どちらも.csファイル一個のお手軽クラスですが、NuGetからインストール可能になったことで、より気楽に使えるのではかと思います。機能説明は省略。

そして、昨日の今日ですがChaining AssertionSilverlight Unit Test Frameworkに対応させました。リリースのバージョンは1.6.0.1ということで。NuGetではChainingAssertion-SLChainingAssertion-WP7になります。

Silverlight Unit Test Framework

Silverlightで使う場合は(WP7じゃなくてね、という意味です)、一応Silverlight Toolkitに同梱という話ではあるのですが、テンプレートなどの用意が面倒くさいので、NuGet経由で入れるのが最も楽のようです。Install-Package Silverlight.UnitTestで。

まず、Silverlightアプリケーションを新規作成。Webサイトでのホストはなしでいいです。それとブラウザで実行させる必要もないので、プロジェクトのプロパティからOut of Browserに変更してしまいましょう。次に、NuGetからInstall-Package Silverlight.UnitTest。これでライブラリの参照と、ApplicationExtensions.cs(イニシャライズ用拡張メソッド)、UnitTest.cs(テスト用テンプレ)が追加されているはずです。次にApp.xaml.csのStartupを以下のように書き換えます。

private void Application_Startup(object sender, StartupEventArgs e)
{
    // this.StartTestRunnerDelayed();
    this.StartTestRunnerImmediate();
}

StartTestRunnerDelayedはテストランナー起動時に実行オプション(指定属性のもののみ実行するなど)を選択可能にするもの、Immediateはすぐに全テストを実行する、というものです。どちらかを選択すればOK。それで、とりあえず実行(Ctrl+F5)してみれば、テストランナーが立ち上がって、デフォテンプレに含まれるUnitTest.csのものが実行されているんじゃないかしらん。あとは、それを適宜書き換えていけばよし。なお、テンプレのテストクラスはSilverlightTestを継承していますが、これは必ずしも継承する必要はありません。後述しますが、Asynchronousのテストを行いたいときは必須ですが、そうでないならば、普通にMSTestでの場合と同じように、[TestClass]と[TestMethod]属性がついているものがテスト対象になっています。

なお、MainPage.xaml/.xaml.csは不要なので削除してしまってOK。StartTestRunnerによって、参照DLLのほうに含まれるxamlが呼ばれているためです。

WP7の場合。

一応NuGetにも用意されてるっぽい(silverlight.unittest.wp7)んですが、動きませんでした。ので、今のところ手動で色々用意する必要があります。詳しくはWindows Phone 7用の単体テストツール? その2「使ってみた」 - かずきのBlog@Hatenaに全部書いてあるのでそちらを参照のことということで。参照するためのDLLを拾ってくる→App.xaml.cs、ではなくてMainPage.xaml.csを書き換える、という、Silverlight版とやることは一緒なのですけどね。こういう状況なのはMangoのSDKがベータだったからとかなんとかのせいだとは思うので、近いうちに解決するのではかと、楽観視したいところです。

Chaining Assertionを使ってみる

Chaining Assertion ver 1.6.0.0の解説で紹介した失敗結果が丁寧に表示されるよー、をチェックしてみませう。

// こんなクラスがあるとして
public class Person
{
    public int Age { get; set; }
    public string FamilyName { get; set; }
    public string GivenName { get; set; }
}
 
[TestClass]
public class ToaruTest
{
    [TestMethod]
    public void PersonTest()
    {
        // こんなPersonがあるとすると
        var person = new Person { Age = 50, FamilyName = "Yamamoto", GivenName = "Tasuke" };
        // こんな風にメソッドチェーンで書ける(10歳以下でYamadaTarouであることをチェックしてます)    
        // 実際の値は50歳でYamamotoTasukeなので、このアサーションは失敗するでしょう
        person.Is(p => p.Age <= 10 && p.FamilyName == "Yamada" && p.GivenName == "Tarou");
    }
}

はい、ちゃんと表示されます。Chaining Assertionを使うと、メソッドチェーンスタイルで、実際の値.Is(期待値の条件)というように、 簡潔な記述でテストを書くことが出来るのがうりです。また、失敗時には、この場合personの値を詳細に出力してくれるので、何故失敗したのかが大変分かりやすい。もし、普通に書くと以下のようになりますが、

// もし普通に書く場合
var person = new Person { Age = 50, FamilyName = "Yamamoto", GivenName = "Tasuke" };
Assert.IsTrue(person.Age <= 10);
Assert.AreEqual("Yamada", person.FamilyName);
Assert.AreEqual("Tarou", person.GivenName);

まず、Assert.IsTrueでは失敗時にperson.Ageの値を出してくれないので、確認が面倒です。また、この場合、Personが正しいかをチェックしたいわけなので、FamilyNameやGivenNameも同時に判定して欲しいところですが、Ageを判定した時点で失敗のため、そこでテストは終了してしまうため、FamilyNameやGivienNameの実際の値を知ることは出来ません。

などなどの利点があるので、Chaining Assertionはお薦めです!この記事はSilverlight Unit Test Frameworkの紹介の体をとっていますが、実態はChaining Assertionの宣伝記事ですからね(キリッ

非同期テストをしてみる

Silverlightといったら非同期は避けて通れない。というわけで、Silverlight Unit Test Frameworkには非同期をテストできる機構が備わっています。[Asynchronous]というように、Asynchronous属性をつければそれだけでOK。と、思っていた時もありました。実際に試してみると全然違って、独特なシステムのうえにのっかっていて、かなり面倒くさかった……。

準備。まず、非同期テストをしたいクラスはSilverlightTestクラスを継承します。そしてAsynchronous属性をつけます。すると、そのテストメソッドはTestCompleteが呼ばれるか例外を検知するまでは、終了しなくなります。というわけで、こんな感じ。

[TestClass]
public class ToaruTest : SilverlightTest
{
    [TestMethod]
    [Asynchronous]
    public void AsyncTest()
    {
        var req = WebRequest.Create("http://www.google.co.jp/");
        req.BeginGetResponse(ar =>
        {
            try
            {
                req.EndGetResponse(ar)
                    .ResponseUri.ToString()
                    .Is("http://www.google.co.jp/");
            }
            catch (Exception ex)
            {
                EnqueueCallback(() => { throw ex; }); // 例外はテスト用スレッドに投げる必要がある
                return;
            }
 
            // ↓は定型句なので、EnqueueTestComplete(); という単純化されたのが用意されている
            EnqueueCallback(() => TestComplete()); // 何事もなければ終了でマーク
        }, null);
    }
}

このUnitTestの非同期は、独自のスレッドモデル(のようなもの)で動いていて、Dispatcherのようなキューにたいしてアクションを放り投げてあげる必要があります。別スレッドからUIスレッドは触れないように、「成功(TestComplete)」か「失敗(例外発生)」を伝えるには、EnqueueCallbackを経由しなければなりません。この辺はDispatcher.BeginInvokeするようなもの、と考えるといいかもしれません。

上のは少し原理に忠実にやりすぎた。まるごとEnqueueCallbackしてしまえばスレッドを意識する必要性は少しだけ減ります。

[TestMethod, Asynchronous]
public void AsyncTest()
{
    var req = WebRequest.Create("http://www.google.co.jp/404"); //404なので例外出してくれる
    req.BeginGetResponse(ar =>
    {
        EnqueueCallback(() => req.EndGetResponse(ar)
            .ResponseUri.ToString()
            .Is("http://www.google.co.jp/"));
 
        EnqueueTestComplete();
    }, null);
}

といっても、これは非常に単純なケースなだけであって、複雑なケースを書くとどんどん泣きたくなっていくでしょう……。一応、Enqueueには他にEnqueueConditionalという、条件式がtrueになるまで待機し続けるというものが用意されているので、若干制御はできなくもないんですが、あんまりできるとは言い難い仕組みがあります。詳しくは述べませんというか、別に使いやすいシステムじゃないのでどうでもいいです。

Rxを使ってみる

結果・もしくは例外を別のスレッドシステムに投げる。どこかで聞いたことあるような。ここでティンと来るのはReactive ExtensionsのObserveOnDispatcherです。Dispatcher.BeginInvokeのかわりにEnqueueCallback。丸っきりそっくり。なので、ObserveOnTestQueueのようなメソッドが作れれば、非常に使い勝手がいいんじゃないか。と思い浮かぶわけです。

と、浮かんだ人は実に素敵な発想力を持っていますね。浮かんだのは私じゃなくて海外の人です。はい。Writing asynchronous unit tests with Rx and the Silverlight Unit Testing Framework | Richard Szalayに、実装が書かれています。

そのRxによるScheduler実装を使うと(WP7版なのでSystem.ObservableとMicrosoft.Phone.Reactiveも参照してください)

[TestMethod, Asynchronous]
public void AsyncTest()
{
    var req = WebRequest.Create("http://www.google.co.jp/");
    Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse,req.EndGetResponse)()
        .ObserveOnTest(this)
        .Subscribe(r => 
            r.ResponseUri.ToString().Is("http://www.google.co.jp/"),
            () => TestComplete());
}

EnqueueCallbackの管理がなくなり、非常に簡単に記述できました。Rxのスケジューラのシステムの柔軟さの賜物ですね。これはRxの素晴らしい応用例だと本当に感動しました。Richard Szalayさんに乾杯。それと、私がこの記事を知ったのはInfoQ: Rx と Silverlight で非同期テストを記述するからなので、紹介したInfoQと、そして翻訳した勇 大地さんにも大変感謝します。

Silverlightの場合

Richard SzalayさんのコードはWP7のMicrosoft.Phone.Reactiveのためのものなので、Silverlight用Rxの場合はそのままでは動きません。はい。残念ながら、WP7版RxとDataCenter版Rxとでは、互換性がかなり崩壊しているので、そのまま動くことなんてないんです。悲しいですねえ……。これに関しては銀の光と藍い空: 「Rx と Silverlight で非同期テストを記述する」をWeb版にも使えるようにしたい!に書かれていますが、Silverlight用に移植してあげればよいようです。

既に、上記記事で田中さんが移植されているのですが、二番煎じに書いてみました(と、※欄で書いたものを流用です、毎回、流用させてもらっていてすみません……)

public static class TestHarnessSchedulerObservableExtensions
{
    public static IObservable<T> ObserveOnTestHarness<T>(this IObservable<T> source, WorkItemTest workItemTest)
    {
        return source.ObserveOn(new TestHarnessScheduler(workItemTest));
    }
 
    public static IDisposable RunAsyncTest<T>(this IObservable<T> source, WorkItemTest workItemTest, Action<T> assertion)
    {
        return source.ObserveOnTestHarness(workItemTest).Subscribe(assertion, () => workItemTest.TestComplete());
    }
}
 
public class TestHarnessScheduler : IScheduler, IDisposable
{
    readonly WorkItemTest workItemTest;
    readonly CompositeDisposable subscriptions;
 
    public TestHarnessScheduler(WorkItemTest workItemTest)
    {
        var completionSubscription =
            Observable.FromEventPattern<TestMethodCompletedEventArgs>(
                h => workItemTest.UnitTestHarness.TestMethodCompleted += h,
                h => workItemTest.UnitTestHarness.TestMethodCompleted -= h)
            .Take(1)
            .Subscribe(_ => Dispose());
 
        this.subscriptions = new CompositeDisposable(completionSubscription);
        this.workItemTest = workItemTest;
    }
 
    public void Dispose()
    {
        subscriptions.Dispose();
    }
 
    public DateTimeOffset Now
    {
        get { return DateTimeOffset.Now; }
    }
 
    public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
    {
        return Schedule(state, dueTime - Now, action);
    }
 
    public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
    {
        if (subscriptions.IsDisposed) return Disposable.Empty;
 
        workItemTest.EnqueueDelay(dueTime);
        return Schedule(state, action);
    }
 
    public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
    {
        if (subscriptions.IsDisposed) return Disposable.Empty;
 
        var cancelToken = new BooleanDisposable();
 
        workItemTest.EnqueueCallback(() =>
        {
            if (!cancelToken.IsDisposed) action(this, state);
        });
 
        subscriptions.Add(cancelToken);
        return Disposable.Create(() => subscriptions.Remove(cancelToken));
    }
}

Richard Szalayさんのコードが非常に素晴らしく、あらゆるケースへのキャンセルに対して完全に考慮されているという感じなので、そのまま持ってきました。実際のところ、テスト用なので「例外発生/TestCompleteが呼ばれる」で実行自体が終了してしまうわけなので、こうもギチギチに考えなくてもいいのではかなー、とか緩いことを思ってしまいますが、まあ、よく出来ているならよく出来ているままに使わさせてもらいます。

メソッド名は、ObserveOnTestHarnessに変更しました。ObserveOnTestだけだと何かイマイチかなー、と思いまして。それと、時間のスケジューリングは、NotSupportedではなくて、EnqueueDelayというのものがあるので、それを使うことにしてみました。それと、ObserveOn -> Subscribe -> onCompletedにTestCompleteが定形文句なので、それらをひとまとめにしたRunAsyncTestを追加。こんな風に書けます。

var req = WebRequest.Create("http://www.google.co.jp/444");
Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse)()
    .RunAsyncTest(this, res => 
        res.ResponseUri.ToString().Is("http://www.google.co.jp/"));

定形文句が減る、つまりうっかりミスで書き忘れて死亡というのがなくなる、というのはいいことです。

通常のMSTestの場合

ところで、もしSilverlight/WP7固有の機能は使っていなくて、WPFでも利用出来るようなコードならば、コードをリンク共有の形でWPF側に持っていってしまって、そこでテスト実行してしまうと非常に楽です。まず第一に、MSTestやNUnitなどの通常のテストフレームワークが使えるため、Visual Studio統合やCIが簡単に行えます。第二に、非同期のテストが(Rxを使った場合)更に簡単になります。

[TestMethod]
public void AsyncTest()
{
    var req = WebRequest.Create("http://www.google.co.jp/");
    var result = Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse)()
        .First(); // First()で同期的に待機して値が取れる。複数の場合はToEnumerable().ToArray()で。
 
    result.ResponseUri.ToString().Is("http://www.google.co.jp/");
}

FirstやToEnumerable.ToArrayにより、同期的に待機することが出来るので、簡単にテストすることができます。通常のコードは同期的待機はすべきではないのですが、こうしたユニットテストの場合は便利に使えます。

じゃあSilverlightのユニットテストでも待機できるのはないか?というと、それはできません。理由はWindows Phone 7で同期APIを実現するたった つの冴えないやり方で書いたのですが、WebRequestなどのネットワーク問い合わせは、一度Dispatcherに積まれて、現在のメソッドを抜けた後に実行開始されるので、テスト実行スレッドで同期的に待って値を取り出すことは不可能なのです。

こういった細部の違いもあるので、コード共有してMSTestでチェックするのは楽でいいのですが、やはりSilverlight/WP7の実際の環境で動かされるユニットテストのほうも必要不可欠かなー、と。どこまでやるか、にもよりますが。

まとめ

Chaining Assertionは便利なので是非試してみてね!

なお、Rxを使うとTestScheduler(時間を好きなように進められる)やITestableObserver(通知の時間と値を記録できる)といった、イベント/非同期のテストを強力に支援する仕組みが備わっているので、それらと併用することで、より簡単に、もしくは今までは不可能だったことを記述できるようになります。それはまた後日そのうち。

SL/WP7のテストは、本当はIDE統合されてるといいんですけどねー。まあ、エミュレータ動かさなければならないので、しょうがないかな、というところもありますけれど。その辺も次期VisualStudioでは改善されるのかされないのか、怪しいところです。現在DeveloperPreviewで出ているVS11は、特に何も手をつけられてる感じがしないので、そのままな可能性はなきにしもあらず。どうなるかしらん。async/awaitが入ることだし、色々変わってくるとは思うんですけれど。

Prev | | Next

Search/Archive

Category

Profile


Yoshifumi Kawai
Microsoft MVP for Visual Studio and Development Technologies(C#)

April 2011
|
July 2018

Twitter:@neuecc
GitHub:neuecc
ils@neue.cc