Reactive Extensions用のWebRequest拡張メソッド

WebClientは楽ちんです。WebRequestはシンドイです。そのシンドさといったら、FromAsyncPatternでラップした程度じゃあまり意味がなかったりなわけです。いえ、単純なダウンロード程度ならいいのです。でも、アップロードとか!プログレスとか!そんなのに対応しようとすると、やっぱどうしょうもなく面倒臭い。とはいえ、面倒くさいのも一度書いてしまえば済むわけなので、一通り使いそうなものを書いてみました。Reactive Extensionsが動く環境(.NET 4 Client Profile, Silverlight4)と、Windows Phone 7環境では標準搭載のMicrosoft.Phone.Reactiveで確認取ってあります。

ソースコード、の前に利用例のほうを。

// DownloadStringAsyncメソッドで非同期読み込みも楽チン
var req1 = WebRequest.Create("http://www.twitter.com/statuses/public_timeline.json");
req1.DownloadStringAsync().Subscribe(Console.WriteLine);
 
// UploadValuesAsyncで非同期POSTも楽チン
// この例はgoo.gl短縮URLにPOSTして結果のJSONを取得するというもの
var req2 = WebRequest.Create("http://goo.gl/api/shorten");
req2.Method = "POST";
req2.ContentType = "application/x-www-form-urlencoded";
 
req2.UploadValuesAsync(new Dictionary<string, string>
    {
        {"url", "http://google.co.jp/"},
        {"security_token", "null" }
    })
    .SelectMany(res => res.DownloadStringAsync())
    .Subscribe(Console.WriteLine);

実に簡単に、「WebClientの同期のように」WebRequestで非同期が扱えます。WebClientと同じ感覚で扱える、というのを大事にするためにも、メソッド名は同じにしてあります(但しUpload系の戻り値はWebResponseとしました、それが最もRx的に制御しやすいと感じたので)。名前末尾にAsyncを付けていますが、この命名規則はAsyncCTPから。

WebRequestでポストする方法のおさらい。GetRequestStreamでRequestStreamを取り出し、それにPOST内容を書きこんでStreamを閉じたら、今度はGetResponseでWebResponseを取得、そのResponseから GetResponseStreamでResponseStreamを取り出し、結果を読み込む。つまりは request.BeginGetRequestStream -> stream.BeginWrite -> request.BeginGetResponse -> stream.BeginRead。ふつーに書いたら人間の扱えるネスト量ではありませんね!

というわけで、streamへのWrite/Readは隠蔽し、単純にアップロードしたら長さ1のReactiveシーケンスが、ダウンロードしたら長さ1のReactiveシーケンスが戻ってくる。という形で簡単に扱えるようにしました。非同期が難しいとか面倒くさいとか、過去の話でしたね!

プログレス表示

プログレス(進捗表示)は大事です。WebClientにもProgressChangedイベントがあるし、せっかく非同期でやってるのだからプログレスも扱いたい。というわけで、それ関連のメソッドも作りました。プログレスは***WithProgressというメソッドを使うと、戻り値が IObservable<Progress<T>>になっていて、このProgressクラスにはValue:値, CurrentLength:現在の長さ, TotalLength:全体の長さ, Percentage:パーセント、の読み取り専用プロパティが入ってます。

// DownloadDataAsyncWithProgressメソッドで進捗を出しながら非同期に画像をダウンロード(して最後に保存する)
var req1 = WebRequest.Create("http://www.microsoft.com/taiwan/silverlight/images/1920X1080_i.jpg")
    .DownloadDataAsyncWithProgress(10000) // 引数は分割サイズ指定、無指定時は64K
    .Do(p => Console.WriteLine("{0}/{1} - {2}%", p.CurrentLength, p.TotalLength, p.Percentage)) // 進捗表示
    .Aggregate(new List<byte>(), (list, p) => { list.AddRange(p.Value); return list; }) // 分割されて届くbyte[]を一つにまとめる
    .Select(l => (Image)new ImageConverter().ConvertFrom(l.ToArray())) // バイト配列をImageに変換
    .Subscribe(img => img.Save("C:\\test.jpg")); // 画像保存
 
// UploadValuesAsyncWithProgressで非同期アップロードの進捗表示もスムーズに
var req2 = WebRequest.Create("http://goo.gl/api/shorten");
req2.Method = "POST";
req2.ContentType = "application/x-www-form-urlencoded";
 
req2.UploadValuesAsyncWithProgress(new Dictionary<string, string> 
    {
        {"url", "http://google.co.jp/"},
        {"security_token", "null" }
    }, 10) // 分割サイズ、あまりに小さいのは良くない、本当は画像とか大きなファイルでやるべきなのですが適当なPOST先が見つからなくて...
    .Do(p => Console.WriteLine("Up: {0}/{1} - {2}%", p.CurrentLength, p.TotalLength, p.Percentage)) // アップロード進捗表示
    .TakeLast(1) // アップロード完了(=最後の1個が通過)までスルー
    .SelectMany(_ => req2.DownloadDataAsyncWithProgress(10)) // 戻り値がProgress<Unit>なので、WebRequestからレスポンス取得
    .Do(p => Console.WriteLine("Down: {0}/{1} - {2}%", p.CurrentLength, p.TotalLength, p.Percentage)) // ダウンロード進捗表示
    .Aggregate(new List<byte>(), (l, p) => { l.AddRange(p.Value); return l; }) // 分割されて届くbyte[]を一つにまとめる
    .Select(l => Encoding.UTF8.GetString(l.ToArray())) // 応答を文字列(JSON)に変換
    .Subscribe(Console.WriteLine);

基本的には非プログレスと同じなのですが、長さ1ではなくて進捗状況に応じた分だけ値が流れてくるという違いがあります。また、書き込み時はUnit、読み込み時はbyte[]が来るので、それを一本にまとめる必要があります。というわけで、若干の手間とクセがあるのですが、まあそれなりに平易に扱えるのではと思います。UploadProgressとDownloadProgressを連結する辺りには、Rxの合成のパワーが見えるのではないでしょうかどうでしょうか。パワーが見えるのはいいとして意味が伝わるかは微妙なところなので、以下図解。

まずはResponseStreamに細切れにWriteし、進捗状況を垂れ流します。Writeなので戻り値はない。ので、Unit(voidみたいなものだと思ってくだしあ)。そしてSelectManyでWriteからReadへ。そう、RxにおいてのSelectManyは、1対多とかフラットにするとかというよりも、イメージとしては「摩り替える」だと思っています。マウスダウンしたと思っていたらマウスムーブに摩り替わっていた、何を (ry。といったように。今回のコードだと、Writeしていたと思ったらいつのまにかReadになっていた、何を言っているのか以下略。

Writeの戻り値にそのままSelectManyを繋げてはいけません。Readを始めるのはWriteが「終わってから」でなければならない。そこで出番なのがTakeLast(1)。意味はそのまんまで、最後の1個を通すというもの。対としてTake(1) - 最初の1個だけ通す、もよく使いますね。

Readもまた細切れにbyte[]で値が読み込まれ送られてきますが、文字列に変換するにせよ何にせよ、 IObservable<byte[]>からbyte[]にしたい。ここでIEnumerable<byte[]>だったら SelectMany.ToArray って感じですが、Observableではそうもいかないので、ここはAggregateを使ってリストに値を詰めてみました。

最後は文字列に変換して、煮るなり焼くなり好きにどうぞ。ブロックはいっぱいありますが、Subscribeに届くのは最後の、集計が全て終わった一つだけというわけでした。なお、進捗はDoメソッドで随時、その箇所に値が流れるたびに画面に出力しています。一見メソッドチェーンだらけで複雑そうですが、順を追ってみてみれば、メソッド全てが、メソッド名通りの意味を明確に持った挙動を取るのと、それぞれのメソッド自体はお馴染みのLinqの挙動そのものなので、Rxの導入までの学習コストというのは存外低いかもしれません(導入を超えた後の敷居に関してはノーコメント)。

あと、このようにブロックが細切れになっているわけですが、これによりキャンセルが容易になるという性質を持っています。キャンセルは Subscribeの戻り値(IDisposable)のDisposeを呼ぶだけで済むのですが、図のとおりに分割されているため、簡単にブロック間に割って入って処理を止めることが可能です。この、処理単位がIObservableとして分割されていることによるキャンセルの容易さは、わざわざ CancellationTokenをチェックしたり(Task)、CancellationPendingをチェックしたり (BackgroundWorker)を処理の途中に挟みこむ必要がないという、Rxの大きな利点となっています。この辺の比較などは次回にでも。

なお、プログレスや細かい単位でのキャンセルが必要なければ、冒頭の例のようにWithProgerss抜きの方のメソッドを使えば、通常の非同期と同じく長さ1のReactiveシーケンスとして、Aggregateとかの処理も拡張メソッド側で全部やってくれますのでお手軽に使えます。

ソースコード

以下ソースコード。ご利用はご自由にのパブリックライセンスで。.NET 4 Client Profile, Silverlight4, Windows Phone 7環境下で動くのを確認してます。名前空間としてusing AsynchronousExtensions することで、拡張メソッド群が使えるようになります。そんなわけでお薦めのファイル名はAsynchronousExtensions.cs。ソースコード一つというわけで、気に入らない箇所がありましたら、勿論当然直接書き換えてしまえばOKです。バグがあれば教えてください。

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
#if WINDOWS_PHONE
using Microsoft.Phone.Reactive;
#else
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
#endif

namespace AsynchronousExtensions
{
    internal static class Progress
    {
        public static Progress<T> Create<T>(T value, double currentLength, double totalLength)
        {
            return new Progress<T>(value, currentLength, totalLength);
        }
    }

    internal class Progress<T>
    {
        public T Value { get; private set; }
        public double TotalLength { get; private set; }
        public double CurrentLength { get; private set; }
        public int Percentage
        {
            get
            {
                return (TotalLength <= 0 || CurrentLength <= 0)
                    ? 0
                    : (int)((CurrentLength / TotalLength) * 100);
            }
        }

        public Progress(T value, double currentLength, double totalLength)
        {
            Value = value;
            TotalLength = totalLength;
            CurrentLength = currentLength;
        }
    }

    internal static class WebRequestExtensions
    {
        public static IObservable<WebResponse> GetResponseAsObservable(this WebRequest request)
        {
            return ObservableForCompatible.Create<WebResponse>(observer =>
            {
                var disposable = new BooleanDisposable();

                Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, ar =>
                {
                    var res = request.EndGetResponse(ar);
                    if (disposable.IsDisposed) res.Close();
                    return res;
                })().Subscribe(observer);

                return disposable;
            });
        }

        public static IObservable<Stream> GetRequestStreamAsObservable(this WebRequest request)
        {
            return ObservableForCompatible.Create<Stream>(observer =>
            {
                var disposable = new BooleanDisposable();

                Observable.FromAsyncPattern<Stream>(request.BeginGetRequestStream, ar =>
                {
                    var res = request.EndGetRequestStream(ar);
                    if (disposable.IsDisposed) res.Close();
                    return res;
                })().Subscribe(observer);

                return disposable;
            });
        }

        public static IObservable<byte[]> DownloadDataAsync(this WebRequest request)
        {
            return Observable.Defer(() => request.GetResponseAsObservable()).SelectMany(r => r.DownloadDataAsync());
        }

        public static IObservable<Progress<byte[]>> DownloadDataAsyncWithProgress(this WebRequest request, int chunkSize = 65536)
        {
            return Observable.Defer(() => request.GetResponseAsObservable()).SelectMany(r => r.DownloadDataAsyncWithProgress(chunkSize));
        }

        public static IObservable<string> DownloadStringAsync(this WebRequest request)
        {
            return DownloadStringAsync(request, Encoding.UTF8);
        }

        public static IObservable<string> DownloadStringAsync(this WebRequest request, Encoding encoding)
        {
            return Observable.Defer(() => request.GetResponseAsObservable()).SelectMany(r => r.DownloadStringAsync(encoding));
        }

        public static IObservable<string> DownloadStringLineAsync(this WebRequest request)
        {
            return DownloadStringLineAsync(request, Encoding.UTF8);
        }

        public static IObservable<string> DownloadStringLineAsync(this WebRequest request, Encoding encoding)
        {
            return Observable.Defer(() => request.GetResponseAsObservable()).SelectMany(r => r.DownloadStringLineAsync(encoding));
        }

        public static IObservable<WebResponse> UploadStringAsync(this WebRequest request, string data)
        {
            var bytes = Encoding.UTF8.GetBytes(data);
            return request.UploadDataAsync(bytes);
        }

        public static IObservable<Progress<Unit>> UploadStringAsyncWithProgress(this WebRequest request, string data, int chunkSize = 65536)
        {
            var bytes = Encoding.UTF8.GetBytes(data);
            return request.UploadDataAsyncWithProgress(bytes, chunkSize);
        }

        public static IObservable<WebResponse> UploadValuesAsync(this WebRequest request, IDictionary<string, string> parameters)
        {
            var parameter = string.Join("&", parameters
                .Select(kvp => Uri.EscapeDataString(kvp.Key) + "=" + Uri.EscapeDataString(kvp.Value)).ToArray());
            var bytes = Encoding.UTF8.GetBytes(parameter);

            return request.UploadDataAsync(bytes);
        }

        public static IObservable<Progress<Unit>> UploadValuesAsyncWithProgress(this WebRequest request, IDictionary<string, string> parameters, int chunkSize = 65536)
        {
            var parameter = string.Join("&", parameters
                .Select(kvp => Uri.EscapeDataString(kvp.Key) + "=" + Uri.EscapeDataString(kvp.Value)).ToArray());
            var bytes = Encoding.UTF8.GetBytes(parameter);

            return request.UploadDataAsyncWithProgress(bytes, chunkSize);
        }

        public static IObservable<WebResponse> UploadDataAsync(this WebRequest request, byte[] data)
        {
            return Observable.Defer(() => request.GetRequestStreamAsObservable())
                .SelectMany(stream => stream.WriteAsObservable(data, 0, data.Length).Finally(() => stream.Close()))
                .TakeLast(1)
                .SelectMany(_ => request.GetResponseAsObservable());
        }

        public static IObservable<Progress<Unit>> UploadDataAsyncWithProgress(this WebRequest request, byte[] data, int chunkSize = 65536)
        {
            return Observable.Defer(() => request.GetRequestStreamAsObservable())
                .SelectMany(stream => stream.WriteAsync(data, chunkSize))
                .Scan(0, (i, _) => i + 1)
                .Select(i =>
                {
                    var currentLength = i * chunkSize;
                    if (currentLength > data.Length) currentLength = data.Length;
                    return Progress.Create(new Unit(), currentLength, data.Length);
                });
        }
    }

    internal static class WebResponseExtensions
    {
        public static IObservable<byte[]> DownloadDataAsync(this WebResponse response)
        {
            return Observable.Defer(() => response.GetResponseStream().ReadAsync())
                .Finally(() => response.Close())
                .Aggregate(new List<byte>(), (list, bytes) => { list.AddRange(bytes); return list; })
                .Select(x => x.ToArray());
        }

        public static IObservable<Progress<byte[]>> DownloadDataAsyncWithProgress(this WebResponse response, int chunkSize = 65536)
        {
            return Observable.Defer(() => response.GetResponseStream().ReadAsync(chunkSize))
                .Finally(() => response.Close())
                .Scan(Progress.Create(new byte[0], 0, 0),
                    (p, bytes) => Progress.Create(bytes, p.CurrentLength + bytes.Length, response.ContentLength));
        }

        public static IObservable<string> DownloadStringAsync(this WebResponse response)
        {
            return DownloadStringAsync(response, Encoding.UTF8);
        }

        public static IObservable<string> DownloadStringAsync(this WebResponse response, Encoding encoding)
        {
            return response.DownloadDataAsync().Select(x => encoding.GetString(x, 0, x.Length));
        }

        public static IObservable<string> DownloadStringLineAsync(this WebResponse response)
        {
            return DownloadStringLineAsync(response, Encoding.UTF8);
        }

        public static IObservable<string> DownloadStringLineAsync(this WebResponse response, Encoding encoding)
        {
            return Observable.Defer(() => response.GetResponseStream().ReadLineAsync(encoding))
                .Finally(() => response.Close());
        }
    }

    internal static class StreamExtensions
    {
        public static IObservable<Unit> WriteAsObservable(this Stream stream, byte[] buffer, int offset, int count)
        {
            return Observable.FromAsyncPattern((ac, o) => stream.BeginWrite(buffer, offset, count, ac, o), stream.EndWrite)();
        }

        public static IObservable<int> ReadAsObservable(this Stream stream, byte[] buffer, int offset, int count)
        {
            return Observable.FromAsyncPattern<int>((ac, o) => stream.BeginRead(buffer, offset, count, ac, o), stream.EndRead)();
        }

        public static IObservable<Unit> WriteAsync(this Stream stream, string data)
        {
            return WriteAsync(stream, data, Encoding.UTF8);
        }

        public static IObservable<Unit> WriteAsync(this Stream stream, string data, Encoding encoding)
        {
            return WriteAsync(stream, encoding.GetBytes(data));
        }

        public static IObservable<Unit> WriteAsync(this Stream stream, IEnumerable<byte> data, int chunkSize = 65536)
        {
            return WriteAsync(stream, data.ToObservable(), chunkSize);
        }

        public static IObservable<Unit> WriteAsync(this Stream stream, IObservable<byte> data, int chunkSize = 65536)
        {
            return Observable.Defer(() => data)
                .Buffer(chunkSize)
                .SelectMany(l => stream.WriteAsObservable(l.ToArray(), 0, l.Count))
                .Finally(() => stream.Close());
        }

        public static IObservable<Unit> WriteLineAsync(this Stream stream, string data)
        {
            return WriteLineAsync(stream, data, Encoding.UTF8);
        }

        public static IObservable<Unit> WriteLineAsync(this Stream stream, string data, Encoding encoding)
        {
            return WriteAsync(stream, data + Environment.NewLine, encoding);
        }

        public static IObservable<Unit> WriteLineAsync(this Stream stream, IEnumerable<string> data)
        {
            return WriteLineAsync(stream, data, Encoding.UTF8);
        }

        public static IObservable<Unit> WriteLineAsync(this Stream stream, IObservable<string> data)
        {
            return WriteLineAsync(stream, data, Encoding.UTF8);
        }

        public static IObservable<Unit> WriteLineAsync(this Stream stream, IEnumerable<string> data, Encoding encoding)
        {
            return WriteLineAsync(stream, data.ToObservable(), encoding);
        }

        public static IObservable<Unit> WriteLineAsync(this Stream stream, IObservable<string> data, Encoding encoding)
        {
            return WriteAsync(stream, data.SelectMany(s => encoding.GetBytes(s + Environment.NewLine)));
        }

        public static IObservable<byte[]> ReadAsync(this Stream stream, int chunkSize = 65536)
        {
            return Observable.Defer(() => Observable.Return(new byte[chunkSize], Scheduler.CurrentThread))
                .SelectMany(buffer => stream.ReadAsObservable(buffer, 0, chunkSize),
                    (buffer, readCount) => new { buffer, readCount })
                .Repeat()
                .TakeWhile(a => a.readCount != 0)
                .Select(a =>
                {
                    if (a.readCount == chunkSize) return a.buffer;

                    var newBuffer = new byte[a.readCount];
                    Array.Copy(a.buffer, newBuffer, a.readCount);
                    return newBuffer;
                })
                .Finally(() => stream.Close());
        }

        public static IObservable<string> ReadLineAsync(this Stream stream, int chunkSize = 65536)
        {
            return ReadLineAsync(stream, Encoding.UTF8, chunkSize);
        }

        public static IObservable<string> ReadLineAsync(this Stream stream, Encoding encoding, int chunkSize = 65536)
        {
            return ObservableForCompatible.Create<string>(observer =>
            {
                var decoder = encoding.GetDecoder();
                var bom = encoding.GetChars(encoding.GetPreamble()).FirstOrDefault();
                var sb = new StringBuilder();
                var prev = default(char);

                return stream.ReadAsync(chunkSize)
                    .SelectMany(bytes =>
                    {
                        var charBuffer = new char[encoding.GetMaxCharCount(bytes.Length)];
                        var count = decoder.GetChars(bytes, 0, bytes.Length, charBuffer, 0);
                        return charBuffer.Take(count);
                    })
                    .Subscribe(
                        c =>
                        {
                            if (c == bom) { } // skip bom
                            else if (prev == '\r' && c == '\n') { } // when \r\n do nothing
                            else if (c == '\r' || c == '\n')   // reach at EndOfLine
                            {
                                var str = sb.ToString();
                                sb.Length = 0;
                                observer.OnNext(str);
                            }
                            else sb.Append(c); // normally char

                            prev = c;
                        },
                        observer.OnError,
                        () =>
                        {
                            var str = sb.ToString();
                            if (str != "") observer.OnNext(str);
                            observer.OnCompleted();
                        });
            });
        }
    }

    internal static class ObservableForCompatible
    {
#if WINDOWS_PHONE
        public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, int count)
        {
            return source.BufferWithCount(count);
        }
#endif

        public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
        {
#if WINDOWS_PHONE
            return Observable.CreateWithDisposable(subscribe);
#else
            return Observable.Create(subscribe);
#endif
        }
    }
}

Progress用のクラスと、WebRequest, WebResponse, Streamに対する拡張メソッドです。各メソッドは大体数行でローカル変数もほとんどないコンパクトなものですが(Rxの強力さに全力で乗っかってるだけ)、コード自体はあまり追いやすくはないかもです。まあ、面倒くさい部分は拡張メソッド側で隠蔽してやれるならそれで良いと思ってます。利用側はWebClientを同期で使うように、簡単にWebRequestが非同期で扱えるんじゃないかなー、と思いますがどうでしょう。そしてついでなので多めに作ってしまった……。

StreamのReadAsync、WriteAsyncですが、それぞれ一度の読み込み/書き込みサイズ指定(デフォルトは64K)で、ストリーミングで読み書きをするようになっています。特にWriteAsyncで要求するbyteはIEunmerable/IObservableですから。つまり、「ストリーミングで読みながら」「ストリーミングで書きこむ」ことが出来るという、メモリに非常に優しいプログラミングが可能になっています。(まあ、読み込みが速くて書き込みが遅い場合は、読み込みだけどんどん進んで、どこかで溜め込まれてしまうわけですけどー、WriteされたらReadが始まる、みたいな仕組み作れないかなー、とは思いつつ方法分からない)

ところでしかし、Rxも素のままではなく、拡張補助ライブラリ的なのを用意するといいのかもねえ。Achiralのように。T4によるFromEventの自動生成などと一緒にまとめて、Rx Supplemental Library。うーん。Rx利用の俺々MVVMライブラリ(を、いつか作りたい、今はまだMVVM自体がヨクワカッテナイのですが)と一緒に、そのうちにでも。

まとめ

WPF, SL, WP7で全く同じコードが動くって素敵。Rx素晴らしい。んで、もしかしてRxって面倒くさいの……? と思ったとしたら、いえ、違います。面倒くさいのはWebRequestでプログレスやアップロード処理を作るのが面倒くさいだけです。上のコードがゲップでそうな感じであるとしたら、Rxが面倒なせいではなくて、WebRequestが面倒なせいなだけです。これRx抜きに同期で書いても面倒臭いです。それどころか、一層大変なことになっていました。むしろRxの上に乗っかっているからこそ、色々な演算子が使えて、コンパクトに書けたのではないかと思われます。また、プログレスや柔軟なキャンセルなど、「クライアントアプリならスレッド余ってるし同期的に書いてThreadPoolに突っ込んでも問題ないし楽っしょー」といった次元を超えた価値をReactive Extensionsは提供できているのではないでしょうか。

プログレスは大事。何が大事って、最近Windows Phone 7を輸入して電波法違反、じゃなくて、ええと、まあ、b-mobile回線で使っているのですけど、死ぬほど遅い。b-mobile遅い。MAX 300kbpsと謳っていて、それも遅いわけだけど実測だと100kbpsで大変遅い。なので、TwitterのXML引っ張ってくる程度であっても(モバイル回線は貧弱なのでXMLじゃなくてJSONがいいですねえ)、何%といった表示は欲しかったりなのです。さすがにそれはやり過ぎだとしても、Twitpicなどにカメラ画像を上げる時などではもう必須と言ってもいいぐらい。

でも、WebRequest使うとプログレスはご覧のように面倒臭いので、どうしてもサポート出来なかったりなのですよね。WebClient使えよってのは正論なのですが、諸事情あったりでWebRequest使いたいって場合もあるでしょう(OAuthとかあるしね)。もうひとつは、Reactive Extensionsに載せるならイベントベースになっているWebClientよりもWebRequestのほうが使いよいということもあり。

そんなジレンマの日々も今日でサヨナラです。Rx + WebRequestでConsoleでも、WPFでも、Silverlightでも、Windows Phone 7でも、幸せな非同期生活を送りましょう。ところで今回のプログレスの話はasync/awaitやBackgroundWorkerと絡めてお話したかったのですが、長くなるので断念。これは次回に(いや、次々回かもしれませんが)必ず書きます!もう既に半分ぐらいサンプルコードとかも書いてはあるので、絶対に近いうちには。

なお、今回の記事はRxTeamのJeffery Van GoghのブログシリーズRx on the serverが下敷きになってます。そして、Rx on the serverで紹介されているコードは、最新のRxのリリースに含まれているSystem.Reactive.ClientProfileに収録されています。が、このPart2で紹介されているAsyncReadLines(非同期でのStreamから一行毎にString取り出し)は簡易的すぎて使い物になりません。2バイト圏無視してるし、1バイトであっても挙動は怪しい。サンプルレベルなら良いと思うし記事は素晴らしく参考になったのですが、本体に収録/配布はやめて欲しかったなあ、少しがっかり。

でもAsyncReadLines自体は欲しいですねえ。今のところ一行毎に取り出すにはStreamReaderしかないのだけれど、非同期APIがないので。非同期読み込み可能なStreamReaderは、何故かSystem.DiagnosticsにAsyncStreamReaderとしてあったりするのですが、internalクラスなので外からでは使えません。というわけで、非同期で一行毎読み込みをやるには、自前実装しか無いようで。うーん、やだなー。私はゆとりゆるふわプログラマなので、バイトとかエンコーディングとかが絡むのはやりたくないなー。というか絶対ミスするでする。

※追記@12/03, やっぱり欲しいと思ったのでReadLineAsyncとして実装しました。↑ソースコードにも反映させてあります。

※追記@2011/8/29, .NET版とWP7同梱版との互換性をもたせました。また、リソースの扱いを正確にしました。

※追記@2011/10/15, ReactiveProperty : WPF/SL/WP7のためのRxとMVVMを繋ぐ拡張ライブラリの一部として、より機能向上させライブラリとしてまとめあげたものを公開しました。↑ソースコードをそのまま使うより、ReactivePropertyを参照するほうをお薦めします。

余談:Windows Phone 7

そういえば、WP7ですか?中々良いですよ、日本語さえまともならば。それを抜きにしても、正直なところ想像以上に自由の効かないガチガチ縛りなので、Androiderな方々は絶対気に入らないと思っています。まあでも、私は好きですねえ。初版なのでダメなところが目立つのはしょうがないし、徐々に改善される(といいなあ!)でしょう。良いところに目を向ければ、大変可能性を感じるOSに仕上がってると思います。

ちなみにWP7の開発環境に望むのはJSONサポート何とかしる!ってことですかね。標準だとシリアライザしか入ってないんですよ、ふざけんな死ね。System.Json入れるかJsonReaderWriterFactory入れるかしろって話ですよ本当に。あとdynamicのサポートが入ってくれればDynamicJsonを対応させるんだけどなあ。

今のところ実機にアプリケーションを転送できません。WP7は認証が必要なので(有料)。で、認証のための証明書の発行手続きはしたのですが、その発行を請け負っているGeoTrustという会社があまりにも仕事しないせいでうばばばば。しょうがないので、今やれることは基盤をがっつし固めることぐらいですはい。

C#のEnumを(Javaのように)別の値を持たせるなど拡張する

Enumに文字列を与えたいというのは少なくなくよくあると思います。例えばFruits.Appleには.ToString()したら「リンゴ」と出て欲しいなー、とか。それならFruits.リンゴと、日本語名つければ?というのはごもっとも。でも、同時に「林檎」とも付けたいなー、とかも思ってしまったりするわけです。しません?Java→C#な人が一番不満に思うのはEnumのようですし(JavaのEnumは高機能!)。

例えばこんな風にかけたらいいな、って。

// こうやって属性定義するだけ!
public enum Color
{
    [Japanese("黒"), Hex("000000"), Rgb(0, 0, 0)]
    Black,
    [Japanese("白"), Hex("FFFFFF"), Rgb(255, 255, 255)]
    White,
    [Japanese("赤"), Hex("FF0000"), Rgb(255, 0, 0)]
    Red
}

class Program
{
    static void Main(string[] args)
    {
        var red = Color.Red;
        Console.WriteLine(red.ToHex()); // FF0000
        Console.WriteLine(red.ToJpnName()); // 赤
        Console.WriteLine(red.ToRgb()); // 255000000
    }
}

んね、非常にすっきり定義出来て幸せ度高い。これをもし普通に書くならば

interface IColor
{
    string EngName { get; }
    string JpnName { get; }
    string Rgb { get; }
    string Hex { get; }
}

public class Red : IColor
{
    public string EngName { get { return "Red"; } }
    public string JpnName { get { return "赤"; } }
    public string Rgb { get { return "255000000"; } }
    public string Hex { get { return "FF0000"; } }
}

といった感じになって面倒くさいことこの上ない(いや別にこれクラスの意味あんまなくてnew Color("Red", "赤",..)といった感じにインスタンスでいいやん、という感じではありますががが。Enumはswitch要因なとこがメインなので、そもそもColorという例が良くないかしら...) まあともかく、Enumは素晴らしい。属性もまた素晴らしい。んで、Enumへの別名などの定義の仕組みは非常に単純で、個別のEnumへ拡張メソッドを定義しているだけです。Enumと拡張メソッドは相性が良い。

public static class ColorExtensions
{
    private static Dictionary<Color, RgbAttribute> rgbCache;
    private static Dictionary<Color, HexAttribute> hexCache;
    private static Dictionary<Color, JapaneseAttribute> jpnCache;

    static ColorExtensions()
    {
        // Enumから属性と値を取り出す。
        // この部分は汎用的に使えるようユーティリティクラスに隔離してもいいかもですね。
        var type = typeof(Color);
        var lookup = type.GetFields()
            .Where(fi => fi.FieldType == type)
            .SelectMany(fi => fi.GetCustomAttributes(false),
                (fi, Attribute) => new { Color = (Color)fi.GetValue(null), Attribute })
            .ToLookup(a => a.Attribute.GetType());

        // キャッシュに突っ込む
        jpnCache = lookup[typeof(JapaneseAttribute)].ToDictionary(a => a.Color, a => (JapaneseAttribute)a.Attribute);
        hexCache = lookup[typeof(HexAttribute)].ToDictionary(a => a.Color, a => (HexAttribute)a.Attribute);
        rgbCache = lookup[typeof(RgbAttribute)].ToDictionary(a => a.Color, a => (RgbAttribute)a.Attribute);
    }

    public static string ToJpnName(this Color color)
    {
        return jpnCache[color].Value;
    }

    public static string ToHex(this Color color)
    {
        return hexCache[color].Value;
    }

    public static string ToRgb(this Color color)
    {
        var rgb = rgbCache[color];
        return string.Format("{0:D3}{1:D3}{2:D3}", rgb.R, rgb.G, rgb.B);
    }
}
// 属性などり
[AttributeUsage(AttributeTargets.Field, Inherited = false, AllowMultiple = false)]
public sealed class JapaneseAttribute : Attribute
{
    public string Value { get; private set; }

    public JapaneseAttribute(string value)
    {
        Value = value;
    }
}

[AttributeUsage(AttributeTargets.Field, Inherited = false, AllowMultiple = false)]
public sealed class RgbAttribute : Attribute
{
    public int R { get; private set; }
    public int G { get; private set; }
    public int B { get; private set; }

    public RgbAttribute(int r, int g, int b)
    {
        R = r;
        G = g;
        B = b;
    }
}

[AttributeUsage(AttributeTargets.Field, Inherited = false, AllowMultiple = false)]
public sealed class HexAttribute : Attribute
{
    public string Value { get; private set; }

    public HexAttribute(string value)
    {
        Value = value;
    }
}

少し、処理が多いですかね。属性定義の部分はスルーでいいので、本質的にはToJpnNameなどの拡張メソッド定義の部分と、静的コンストラクタでEnumから属性と値を取り出してる部分だけなので、そんなでもないです。静的コンストラクタを使っているのは、ここでDictionaryに突っ込むことで、毎回リフレクションがゴニョゴニョと走り出すのを避けています。

属性は固定で複数を付加するなら名前付き引数でもいいですね。

まとめ

まとめというか、このネタは以前にも enumの日本語別名とか三項演算子ネストとか という記事で書いてたりはしたのですが、もう少し実用的になるような形にしました。

C#のEnumは、私は好きです。素朴というか、シンプルで使いやすいですよね。Visual Studioでswitch使うと一気に列挙してくれるのもいいし、勿論ビットフラグも。機能だけならクラス立てれば代替出来ないこともないけれど、色々と遥かに面倒くさくなる。Enumはシンプルに、すっきり書けるのが素敵。Javaは高機能(コンパイル時にクラス作るわけですしね)なのはいいとしても、EnumSetとか、なんだか大仰で、その辺があまり好きではない。

この拡張メソッドは、隙間を少し埋めてくれるかな、と思います。

イベントベースの非同期処理をReactive Extensionsで扱う方法

Reactive Extensionsを使用して複数のサービス非同期コールを連続して呼べるか試してみた。その2 - y_maeyamaの日記 という記事にて、Rxを使ってWCFを綺麗に呼んでました。なるほど!Silverlightで!WCFで!非同期コールを連続して呼ぶ!とっても実践的なテーマ。XboxInfoほげほげがどうのこうの、とかやってたどこかの私と違います、げふんげふん。とても素晴らしいテーマとコードだと思ったので、拝借して私も少し書いてみました。

// 実行部分(実行後10秒後ぐらいにWCFサービスの連鎖を経てメッセージボックスに結果を表示)
var client = new ServiceReference1.Service1Client();

var asyncQuery = from a in Observable.Defer(() => client.GetAAsObservable("てすと"))
                 from b in client.GetBAsObservable(a.EventArgs.Result)
                 from c in client.GetCAsObservable(b.EventArgs.Result)
                 select c.EventArgs.Result;

asyncQuery.Subscribe(s => MessageBox.Show("チェーン終了 - " + s));

「前に呼んだサービスの引数にアクセス」に関しては、SelectManyで匿名型を作る、が解だと思います。で、幾つも連鎖する時はクエリ構文を使うのが大変お手軽。ただしクエリ構文は独自定義のメソッドと繋がりが悪くなるという欠点があります。RxではDoとかObserveOnとか結構使いますから、クエリ構文、便利な用で使いどころに困る代物。その辺は実際に書く内容によって判断つけるところですねー。

XxxAsObservable

突然出てきているGetAAsObservableって何だ、といったら、勿論、拡張メソッドです。FromEventやFromAsyncPatternなどはコード本体に書いてあると鬱陶しいので、基本は拡張メソッドで隔離してしまいましょう。命名規則に特に決まりはありませんが、私はXxxAsObservableという規則で書いています。今回はWCFのサービス参照で自動生成されたイベントベースの非同期処理のラップなので、FromEventをベースに、ただし少し小細工を。

public static class IObservableExtensions
{
    // イベントベースの非同期はRxと今一つ相性が悪いので変換する
    public static IObservable<IEvent<T>> ToAsynchronousObservable<T>(this IObservable<IEvent<T>> source) where T : EventArgs
    {
        var connectable = source
            .Take(1) // 実行は一回のみ(これしとかないとイベントハンドラの解除がされないし、そもそもPruneが動かなくなる)
            .Prune(); // AsyncSubjectに変換
        var detacher = connectable.Connect(); // コネクト(イベント登録実行)
        return connectable.Finally(() => detacher.Dispose()); // 任意のDispose時にイベントのデタッチ(なくても構いません)
    }
}

public static class Service1Extensions
{
    // 量産なので多いようならT4テンプレートで生成してしまうと良いでしょう
    public static IObservable<IEvent<GetACompletedEventArgs>> GetAAsObservable(this Service1Client client, string key)
    {
        var o = Observable.FromEvent<GetACompletedEventArgs>(
                h => client.GetACompleted += h, h => client.GetACompleted -= h)
            .ToAsynchronousObservable();
        client.GetAAsync(key); // 非同期実行開始
        return o;
    }
    
    // あと二つほど
}

ふつーにFromEventをラップして終わり、ではなくて少々細工を仕込んでいます。理由は、イベントベースの非同期処理はそのままだとRxでは扱いづらいから。

イベントベース非同期処理とReactive Extensions

.NET Frameworkには基本的に2つの非同期処理の方法があります。一つはBeginXxx-EndXxxによるAPM(非同期プログラミングモデル)。もう一つはXxxAsyncとXxxCompletedによるイベントベースの非同期処理。イベントベースは素で扱う分にはAPMよりもずっと書きやすかった。だからWebClient、BackgroundWorkerなど、手軽に使える系のものに採用された(というのが理由かは知りませんが)。そしてサービス参照の自動生成のものもイベントベース。

しかし、ラッピングして使う場合はとにかく使いづらい!一度の登録で何度も何度も実行されてしまうことは合成時に都合が悪い。また、ラップしたメソッドと、引数を渡す処理の実行箇所が離れてしまうことは(FromEvent.Subscribe で登録して client.GetAsync で実行)書くのが二度手間になり面倒、という他に、Subscribeよりも前に非同期実行して、更にSubscribeよりも前に非同期実行が完了してしまった場合は何も起こらなくなる。といった実行タイミングの面倒くさい問題まで絡んでくる。

というわけで、イベントベースの非同期処理をReactive Extensionsに載せる場合は、ただたんにFromEventで包むだけではなく、一工夫することをお薦めします。

それが上で定義したToAsynchronousObservable拡張メソッド。これはRxを使って非同期プログラミングを簡単にの時に少し出しました。 source.Take(1).Prune() ですって! ふむ、よくわからん。一行ずつ見ていくと、sourceは、この場合はFromEvent後のものを想定しています(なので拡張メソッドの対象はIObservable<IEvent>)。それをTake(1)。これは、ふつーの非同期処理では実行完了は1回だけだから、それを模しています。イベントベースのラップなので何もしないと無限回になっていますから。

続けてPrune。これは内部処理をAsyncSubjectに変換します。AsyncSubjectに関してはReactive Extensionsの非同期周りの解説と自前実装で簡易実装しましたが、処理が完了(OnCompleted)まで待機して、完了後はキャッシュした値を流すという、非同期処理で不都合(実行タイミングの問題など)が起こらなくするようにしたもの。対になるのはSubjectでイベント処理を模したもの。FromEventを使うと内部ではSubjectが使われることになるので、それを非同期で都合が良い形であるAsyncSubjectに変換する、ためにPruneを使いました。そしてここでConnectすることで即座にイベントにアタッチします(実行タイミングの前後の問題をなくすため)。

awaitは救いの手?

Reactive Extensionsは準備さえ済んでしまえば、非同期が恐ろしく簡単に書ける、のですが、如何せん準備が決して簡単とは言いません。さて、それがC# 5.0のasync/awaitは魔法のように解決してくれるのか、というと、そんなことはありません。現在のAsync CTPではAsyncCtpLibrary.dllにAsyncCtpExtensionsというクラスが用意されていて、その中には既存クラスに対して涙ぐましいまでにAsyncで使うのに最適なようにとXxxAsync拡張メソッドが大量に定義されています。例えばWebClientのDownloadStringにはDownloadStringTaskAsyncが。少しその中身を見てみれば、上で書いたものとやっていることは同じようなもの。イベントを登録して、完了時にはイベントハンドラを解除して。

既存クラスは用意してもらったのがあるからいいけど、サービス参照のような自動生成されるクラスにたいしてはどうすれば?答えは勿論、自分で書こうね!orz。まだまだ、そんな受難な時代は続きそうです。でも、Rxは一度分かってしまえばTake(1).Prune()で済むし、T4で自動生成もそんな難しくない雰囲気なので、まあ悪くはないんじゃないでしょーか。悪いのはむしろイベントベースの非同期処理なのでは、という気がしてきた昨今。

まとめ

今月はまだまだ非同期周りの記事を書くよ!いっぱい書くよ!

そういえばで、今回のソースをBitbucketに上げました。今後も何か書くときは合わせてBitbucketに上げていきたいと思っています。

neuecc / EventToAsync / overview – Bitbucket

RSSなどのアイコンが並んでいる部分の一番右のget sourceからzipで落とせます(Bitbucketはナビゲーションが少しわかりづらいのよねえ、Mercurialベースで非常に快適なソースホスティングサービスだとは思うのですが)。また、RxのライブラリはNuGetを使って参照しています(なのでRx本体のインストールがしてなくても試せます、かわりにNuGetが必要?不必要?その辺まだ仕組みがよく分かってないのですが……)。以前はNuPackっていう名前でしたが、名前が被るとかで変更されたそうです。VS2010の拡張機能マネージャのオンラインギャラリーからもVS拡張がインストール出来るので、とりあえずお薦め。色々なライブラリが簡単にインストール出来ます。

私もlinq.js登録しちゃおうかな、かな……。まあ、Fixしなきゃいけないことがかなり溜まってるので、まずはそれやれよって話なのですががが。

実践例から見るReactive Extensinosを用いた非同期処理

私はXboxInfoTwitという、Xbox.comからデータをスクレイピングで取り出してTwitterに投稿するという、大変不届き者なアプリケーションを公開しています。お陰様で認証者数も2500人を超えて、割と活況。このアプリケーションでのデータ取得ですが、正規なルートでの情報取得ならばAPI叩けば一発、というケースも少なくないですが、いかんせんルートがアレなので一回の取得で取れる情報は断片。あちらこちらから値を受け渡し組み立てなければならないという、入り組んだ通信手順になっています。これを、もし全部非同期でやろうとしたら目眩がするなあ。ふむ。そこでReactive Extensions。むしろ良いネタがあるじゃないか!というわけでクローラーのコア部分を完全非同期 + Reactive Extensinosで全面的に書きなおしてみました。

neuecc / XboxInfoTwitCore / overview – Bitbucket

dllでライブラリという形体を取っていますが、基本的には誰にも使い道がないものかと思われます。というわけで、このXboxInfoTwitCoreの内容自体はどうでもいいとスルーして、この現実的な課題(?)にReactive Extensinosがどう簡単にしてくれたかを見ていきます。

非同期通信の連続

まずは、手順について図で。最近、パワーポイントでのポンチ絵を書く練習を兼ねて、しかしこんなヘタクソで大丈夫か??

プレイ状況の取得は、MyXbox/Profileから取得、実績内容の取得はそこからゲームのタイトルIDを取り出して、個別のゲーム画面のURLを作り取得。最終的には両者を結合してデータを作り出します。ただし、ログイン前にMyXbox/Profileにアクセスすると認証フォームにリダイレクトされるので、そこでごそごそと処理してlive.comの認証クッキーを取得する必要があります。この認証クッキー取得が若干ややこしい。一度分かってしまえばそんなでもないのですけれど、ちょびっと嫌らしい造りになっています(恐らくこの認証部分はWindows Liveで共通だと思うので、SkyDriveをモニョモニョしたい、など考えている人は参考にでもどうぞ)

ちなみに非同期のPOSTってのは、BeginGetRequestStreamで非同期でRequestStreamを呼び出して書き込む(POST)してから、BeginGetResponseで非同期でレスポンスを取得するので、ネスト量2倍。クッキー認証部分だけで、普通に書くと7つぐらいネストするわけです……。

Asynchronus Query

細部を見る前に、最終的にどうなったか、を。

var fetcher = new LiveTokenFetcher("userid", "password");
var locale = new Locale("ja-JP", "日本");

// 非同期クエリ組み立て
var asyncQuery =
    from crawler in
        from cookie in fetcher.GetLoginCookie(new XboxUrlBuilder(locale).Profile())
        select new XboxInfoCrawler(cookie, locale)
    from pi in crawler.GetPlayerInfo()
    from ai in pi.GameInfo.TitleId != 0
        ? crawler.GetAchievement(pi.GameInfo.TitleId)
        : Observable.Return<AchievementInfo>(null)
    select pi.Tap(x => x.GameInfo.AchievementInfo = ai); // Tapについては解説しな...い

// 実行
asyncQuery.Subscribe(pi => Console.WriteLine("{0} - {1}", pi.GamerTag, pi.PlayerState));

(私にしては珍しく)クエリ構文を使っていますが、こう見ると本当に、非同期処理がLinqに統合された形で問い合わせられる、ということが良く伝わるのではないでしょうか?処理の順序が一見イミフ、というのは慣れの問題です!解説します。

GetLoginCookie、GetPlayerInfo、GetAchievementがIObservable<T>を返す、非同期処理。GetLoginCookieはポンチ絵で言うところの、クッキー取得で、GetPlayerInfoが中央左、GetAchievementが中央右で、最後のselectがマージというわけです。ややこしくネストするはずの非同期が非常にあっさりと記述できました。

GETとPOSTの連続

一番面倒くさいログインクッキー取得の部分について。

public IObservable<CookieCollection> GetLoginCookie(string url)
{
    return Observable.Defer(() => CreateWebRequest(url).GetResponseAsObservable())
        .Select(res => res.TransformResponseHtml(xml => new
        {
            Cookie = ConstructCookie(res.Headers["Set-Cookie"]),
            PostUrl = GetPostUrl(xml),
            PPFT = GetInputValue(xml, "PPFT"),
            PPSX = GetInputValue(xml, "PPSX")
        }))
        .SelectMany(a => CreateWebRequest(a.PostUrl, MethodType.Post, a.Cookie)
            .UploadValues(new Dictionary<string, string>
            {
                {"LoginOptions", "3"},
                {"PPFT", a.PPFT},
                {"PPSX", a.PPSX},
                {"PwdPad", PwdPadBase.Substring(0, PwdPadBase.Length - password.Length)},
                {"login", loginId},
                {"passwd", password}
            }))
        .Select(res => res.TransformResponseHtml(xml => new
        {
            PostUrl = GetPostUrl(xml),
            Anon = GetInputValue(xml, "ANON"),
            T = GetInputValue(xml, "t")
        }))
        .SelectMany(a => CreateWebRequest(a.PostUrl, MethodType.Post)
            .Tap(r => r.AllowAutoRedirect = false)
            .UploadValues(new Dictionary<string, string>
            {
                {"ANON", a.Anon},
                {"t",Uri.EscapeDataString(a.T)}
            }))
        .Select(res => ConstructCookie(res.Headers["Set-Cookie"]));
}

となってます。長くて一瞬ウゲ、となりますが、基本的にはSelectとSelectManyしか使っていません。Selectは流れてくる値を変換。SelectManyは、次の非同期処理の実行。そう覚えて貰えれば、どうでしょう?

先頭から見るとurl(MyXbox/Profile)に対して非同期でのGetResponseを開始。認証フォームにリダイレクトされるので、HTMLをスクレイピングしてPOSTに必要なデータ(PPFT,PPSXなど)を集める。そして非同期でポスト(UploadValuesは、詳細はソースコードを参照してもらうとして非同期で投稿してIObservable<WebResponse>を返す自前定義の拡張メソッドです)。続いて、スクレイピングして次のPOSTに必要なデータ(ポスト先URL,Anon,T)を生成、そして再びUploadValuesで非同期ポスト。するとWebResponseに認証クッキーがあるので、ヘッダーの"Set-Cookie"からクッキーを取ってCookieCollectionを生成したものを返す。

となってます。流れるように!また、最後がSelectで終わっているように、流れは終わっていません。利用者が、ここから更に非同期での流れを繋いでいくわけです。

SelectとSelectMany、そしてクエリ構文

御大層なことを言いつつ、SelectとSelectManyしか使ってないじゃないか!というと、はい、その通りです。それでいいんです。ふつーのシチュエーションでの非同期処理って、そういうものでしょう?データを加工するSelectと、ネストをフラットにするSelectMany。これだけ覚えれば十分であり、そして、役立ちなのです。

ところで、冒頭の例で(メソッド構文信者な私が)何でクエリ構文を使ったかというと、そこそこ綺麗に見えるかなー、とか思ったのともう一つは、割と必要に迫られたから。書き方は色々あります。例えばクエリ構文をベースにするなら、他にも

// クエリ構文とメソッド構文を混ぜるのも時には分かりやすさに繋がる
from crawler in fetcher.GetLoginCookie(new XboxUrlBuilder(locale).Profile())
                       .Select(cookie => new XboxInfoCrawler(cookie, locale))
from pi in crawler.GetPlayerInfo() // 以下略

// select intoを使うことで完全フラットに流すことも可能です
from cookie in fetcher.GetLoginCookie(new XboxUrlBuilder(locale).Profile())
select new XboxInfoCrawler(cookie, locale) into crawler
from pi in crawler.GetPlayerInfo() // 以下略

と出来るでしょう。特に後者のselect intoを使った書き方は冒頭のものより良いかもしれません。ネストして順番が上下するのは複雑さの現れになりますから。では、メソッド構文で書くと?

var asyncQ = fetcher.GetLoginCookie(new XboxUrlBuilder(locale).Profile())
    .Select(cookie => new XboxInfoCrawler(cookie, locale)) // クッキーをラップしたもの
    .SelectMany(crawler => crawler.GetPlayerInfo())
    .SelectMany(playerInfo => crawler.GetAchievement(playerInfo.GameInfo.TitleId));
    // と、書きたいのですが、SelectManyでplayerInfoに変換されているのでcrawlerはスコープ外でこうは書けない!

SelectManyは、前方の値が使えません。GetLoginCookieのように前の値は一切使わないのならば何ら問題ないのですが、GetAchievement(実績取得)はクッキーとGetPlayerInfoで得られたタイトルIDの"両方"が必要。この両方、というシチュエーションの時に、上から流れて変形するだけのSelectManyだと非常に、書きづらい。ではメソッド構文ではどうするか、というと、クエリ構文のように考える。

var asyncQ = fetcher.GetLoginCookie(new XboxUrlBuilder(locale).Profile())
    .Select(cookie => new XboxInfoCrawler(cookie, locale))
    .SelectMany(crawler => crawler.GetPlayerInfo().Select(playerInfo =>
        new { crawler, playerInfo })
    .SelectMany(a => a.crawler.GetAchievement(a.playerInfo.GameInfo.TitleId)));

下側のフローに二つ以上の値を持ち越したいのならば、二つ以上の値を格納した匿名型を作ればいいぢゃない。ということですね。クエリ構文の実態もこうなっています。正直、メソッド構文でこれを書くのは二つだけならまだいいですが、幾つも繋いでいくとなると相当汚くなるので、これはもう素直にクエリ構文の出番だな、と私は思いました。

奇怪なるクエリ構文の実態については多重 from の展開結果 - ++C++; // 管理人の日記で書かれています。って、よくみるとTrackbackに昔の私が送ってますね、ふむふむ、Linq to ObjectsのJavaScript移植であるlinq.jsで書くとこうなるようですよ?

// クエリ
var apart = Enumerable.Range(1, 5);
var query = apart
   .SelectMany(function(baker){ return apart
   .SelectMany(function(cooper){ return apart
   .SelectMany(function(fletcher){ return apart
   .SelectMany(function(miller){ return apart
   .Select(function(smith){ return {
       baker: baker, cooper: cooper, fletcher: fletcher, miller: miller, smith: smith}})})})})})
   .Where("Enumerable.From($).Distinct('$.Value').Count() == 5")
   .Where("$.baker != 5")
   .Where("$.cooper != 1")
   .Where("$.fletcher != 1 && $.fletcher != 5")
   .Where("$.miller > $.cooper")
   .Where("Math.abs($.smith - $.fletcher) != 1")
   .Where("Math.abs($.fletcher - $.cooper) != 1");

// 出力
var result = Enumerable.From(query.Single()) // 答えは一つなのでSingle,そしてobjectをKeyValuePair[]に分解
   .OrderBy("$.Value")
   .ToString(", ", "$.Key + ':' + $.Value"); // シーケンスを文字列に結合

alert(result); // smith:1, cooper:2, baker:3, fletcher:4, miller:5

linq.jsは"完全な"移植なので、.NETで動くコードが動かない、ということはありません。JavaScriptにもLinqの革命を!と、宣伝はさておき、SelectManyで下方に値を持ち出しているのに、毎回匿名型を作っていません。SelectManyのresultSelectorの部分でネストさせることで、値を下の方まで持ち運ぶ事が可能です。これはJavaScriptだけでなくC#でのLinq/Rxでも有効です。一つのパターンというかテクニックというか。C#ではわざわざこんなことやるぐらいならクエリ構文使えという気がしますが、クエリ構文の使えないReactive Extensions for JavaScript(RxJS)では活きるかもしれません。

ただ、この手のインデントの細工による対策は、IDEの自動整形と相性悪いんですよね。上のSelectManyのものも、コードフォーマッタにかけるとボロボロに崩れ去ります。基本的には私は、インデントは機械任せでやるべき。人が調整するものではないし調整してはならない。と思っているので、残念ながら上記テクニックは使うことはないかもなー、いや、どうだろう、積極的にはやらないという程度かしら。

The Future of C#(async/await)

つい数日前に開催されたPDC2010で、C#設計者のAnders Hejlsbergが、The Future of C#と題してC#5.0(とは言ってませんが、恐らくそうなる)のFeatureを語りました。一つは今まで言われていたCompiler as a Service。もう一つが、asynchronusの言語統合。C# 2.0でのyield returnによるIEnumerable生成のようなコンパイル時生成で非同期をコードの見た目上、完全に同期のように扱うことが出来ます。

Asynchronous Programming for C# and Visual BasicにてVisual Studio Async CTPが出ていて既に試してみることが可能なので(要:英語版VS2010)、実際にXboxInfoTwitCoreをasync/awaitで書き直してみました。全体はbitbucketのリポジトリ上にあるので興味あればそちらもどうぞ。以下は、ログインクッキーを取得する部分。

private async static Task<WebResponse> UploadValuesAsync(WebRequest request, IDictionary<string, string> parameters)
{
    var bytes = parameters.ToQueryParameter().Pipe(Encoding.UTF8.GetBytes);
    using (var stream = await request.GetRequestStreamAsync())
    {
        await stream.WriteAsync(bytes, 0, bytes.Length);
    }
    return await request.GetResponseAsync();
}

public async Task<CookieCollection> GetLoginCookie(string url)
{
    var res = await CreateWebRequest(url).GetResponseAsync();
    var prepare = res.TransformResponseHtml(xml => new
    {
        Cookie = ConstructCookie(res.Headers["Set-Cookie"]),
        PostUrl = GetPostUrl(xml),
        PPFT = GetInputValue(xml, "PPFT"),
        PPSX = GetInputValue(xml, "PPSX")
    });

    var req2 = CreateWebRequest(prepare.PostUrl, MethodType.Post, prepare.Cookie);
    var res2 = await UploadValuesAsync(req2, new Dictionary<string, string>
    {
        {"LoginOptions", "3"},
        {"PPFT", prepare.PPFT},
        {"PPSX", prepare.PPSX},
        {"PwdPad", PwdPadBase.Substring(0, PwdPadBase.Length - password.Length)},
        {"login", loginId},
        {"passwd", password}
    });

    var prepare2 = res2.TransformResponseHtml(xml => new
    {
        PostUrl = GetPostUrl(xml),
        Anon = GetInputValue(xml, "ANON"),
        T = GetInputValue(xml, "t")
    });

    var req3 = CreateWebRequest(prepare2.PostUrl, MethodType.Post);
    req3.AllowAutoRedirect = false;

    var res3 = await UploadValuesAsync(req3, new Dictionary<string, string>
    {
        {"ANON",prepare2.Anon},
        {"t",Uri.EscapeDataString(prepare2.T)}
    });

    return ConstructCookie(res3.Headers["Set-Cookie"]);
}

まあ見事にベッタベタに書かれているのが良く見えますねえ。このasync/awaitですが、非同期として扱うものにasyncを宣言したメソッドを用意。あとは同期のように書くだけ。yield returnのかわりにawait + XxxAsync と書くだけ。それで、まるで同期のように書けてしまいます。あまりにもお手軽。魔法のように。Silverlightのネック(そして初心者キラー)なところは、分かりづらい非同期でしたが、ウルトラC的に解決……。

この発表を見た時は普通にショックで、更に一日経って冷静に考えると更にショックで寝こむ勢いでしたね(笑) 正直なところ、被ります。単純な非同期の簡易化という点だけで考えればめっちゃ被ります。それはTaskとRxが被るよねー、Rxのほうが書きやすいっすよー、とか少し前に言っていた程度にはモロ被ります。実際問題Taskはあまり使いやすいとは言えないのですが、コンパイラサポートがガッツシ来てしまったら、それは話は別ですな。

勿論、RxのメリットはIEnumerable<T>や、イベントやタイマーなどの他のIObservable<T>とのシームレスな連携にもあるので、非同期を利用するシーンにおいてasync/awaitが完全に置き換えるもの、とは言いませんけれど。そして、私はそういった統合っぷりに魅力を感じているのでasync/awaitはあまり使わないかなー、という気がしてなくはないのですが、ちょっとその辺、判断が難しい。

さて、ところで両者を置き換えるのは非常に簡単です。戻り値がTask<T>かIObservable<T>か。RxではSelectManyだった位置にawaitを置く。それだけです。これは覚えておいて損はないかもです。そういう意味でも普通にかぶってるよな、とか思いつつ。awaitが分かればRx-SelectManyも簡単だし、その逆もまた然り。

SilverlightとWindows Phone 7

Silverlightは同期APIがないので、Rxで組むことで互換性が狙えます。このXboxInfoTwitCoreはフル非同期・フルRxなので、Silverlight/Windows Phone 7にも対応させ、られませんでした!えー。CookieExceptionが発生したり取れる最終的なCookieが何か違ったりと、Cookieが鬼門すぎて無理でした、とほほほ。マジワケワカラン。ええと、一応はWPF-Silverlight-WP7で完全なコード共有を実現する、という点もRxの強力な武器であり、使う理由になるとは思っています。それを示したかったのですが、うーむ。完全に同一なファイルで非同期としての挙動は問題なく取れているのですが、実際にデータが出せないとねえ……。カッコワルイ。

SgmlReader for Silverlight/Windows Phone 7

XboxInfoTwitCoreではHtml to Xml変換にSgmlReaderを使用しています。以前に紹介を書きましたが、これは本当に重宝します。ストリームとXElement.Loadの間に置くことで、不正なXMLである(ためXElement.Loadに渡すと例外が発声する)ネット上のHTMLをLinq to Xmlで扱えるようになります。しかし、元のコードベースが古いのと開発が活況とは言い難い状況であるために、SilverlightやWindows Phone 7に対応していません。大変困った。困ったので、Silverlight/Windows Phone 7で動くように少々コード弄ったところ、問題なく動いた!ので、こちらもbitbucketにあげときました。

neuecc / SgmlReader.SL / overview – Bitbucket

基本的にはデスクトップ版と全く同じ感覚で使えます。

var req = WebRequest.Create("http://google.com/");
req.BeginGetResponse(ar =>
{
    var res = req.EndGetResponse(ar);
    using (var stream = res.GetResponseStream())
    using (var sr = new StreamReader(stream))
    using (var sgmlReader = new SgmlReader { DocType = "HTML", CaseFolding = CaseFolding.ToLower, InputStream = sr })
    {
        var xml = XElement.Load(sgmlReader); // これ。
        Dispatcher.BeginInvoke(() => MessageBox.Show(xml.ToString()));
    }
}, null);

プロパティにURLを渡すと例外で落ちます(同期通信は許可されていません!)。というわけで、WebRequestなどから非同期でウェブからStreamを取り出して、それをInputStreamプロパティに渡すという形を取ってください。この制限は、SilverlightではXElement.Load("url")が許可されていないのと同じことです。

XboxInfoTwitの今後

ここまで見てくれた人がXboxInfoTwit本体の利用者とは思えません!のですが、とりあえず書きますと、コア部分は書き換わったので、あとはGUIというか設定画面を複数言語に対応させてCodePlexで公開。を目指しています。が、いかんせんGUIは難敵です……。今のXboxInfoTwitも一応WPFなんですが見よう見まねで組んだXAMLで汚いので、きっちりと綺麗に書き換えたい、のですが、それをやるにはあまりにもWPF/XAMLの知識がなさすぎる。そのため、今日明日でフル改装で公開!というわけにもいきません。もう少し時間がかかりそうです。

Twitterへの投稿に関しては、以前書いたReactiveOAuth(完全なRxベースのOAuthライブラリ)があるので、シームレスに統合出来そう(こちらも幾つか課題が溜まっているので更新しないと……)。あとは、今使ってる自動アップデートプログラムがタコな出来なので、これも作り直したいなあ。なんて思っていたり。

まとめ

ふつーの非同期で書いてネストしまくりで、こんなの書いてられないだろほれみろバーカバーカ、とか言ってみたかったのですが気力がなくてそれは断念。ともあれ、今ある非同期の厳しさへの現実解として、Rxはアリ、です。非同期処理が、かなり綺麗にLinqクエリとして溶け込む様は分かるのではないかと思います。C#5.0のasync/awaitと比較してみても、awaitの位置にSelectMany、というだけの話ですしね。そして何よりも、async/awaitは今日明日に出るわけじゃないのです! Visual Studio 2005 -> 2008 -> 2010のペースで考えるならば2013年。まだ2年も先です。

async/awaitがないから「しょうがなく」使う、ってスタンスもアレゲ。いえいえ、喜んで使うのです。Rxならではのメリットが、当然あります。ただの非同期処理の一本化というだけに終わらず、イベントやタイマーなど他のソースとの融合は魅力です。そう、そもそも、Rxの持つ側面は3つ。「Asynchronus」「Event」「Pull to Push」。ここ最近はAsynchronusのことしか書いてなかったですが、他の2つのほうも見逃せないというか、むしろRxは最初はLinq to Eventsとして紹介されていたぐらいだしEventのほうがメインですよ?

ところで話は全く変わりますが、Bitbucket使い始めました。バージョン管理は当然Mercurialです。Mercurialは非常にいいです。もうSubversionは使う気になれない。Team Foundation Serverは使ったことないので知らないけれど(あれはオールインワンなところに価値がある気がしつつ、まあ、重量級ですよね)Mercurialのサクサク感は、バージョン管理はしっくりこないんです!とかほざいていた私にガツーンとバージョン管理がないともう生きていけない!ちょっとしたプロジェクトもhg initするし!な心構えに変えさせる威力がありました。あとちょっとした変更でもローカルでコミット、な素敵さとか。

TortoiseHgは、若干UIの導線がイケてないとは思いつつ、まあまあ使いやすく(日本語化もデフォルトでされています←要環境変数にLANG=ja)、Visual Studio拡張のVisualHGも素敵な使い勝手。というか、UI部分はTortoiseHgを呼び出しているだけというシンプルさがいい。私はEclipse下ではMercurial Eclipseを使ってますが、こちらは独自にがっつし作りこんであって、それが今一つな使い勝手で。私はVisual Hgの方向性を支持します。

最後に、毎回嘘っぱちな次回予告ですが(SelectManyの解説をすると言い続けて半年以上経過)、ポーリングをイベントとしてRx化してPushで共通化とか何とかかんとか、を予定。引き続き題材はXboxInfoほげほげを使うつもりです。乞うご期待しない。

XboxInfoTwit - ver.2.3.0.1

Xbox.comがリニューアルされました!というわけで新Xbox.comに対応しました。それだけです!そして、Xbox.comの構造が変わったお陰で取得が軽量化されました(情報取得のための巡回ページ数が大幅削減されたため)。ヨカッタヨカッタ。あまり良い評判を聞かない新Xbox.comですが、こういうところで良くなってますよー、と。地味な変化としては、今回からフレンドが0な場合でも取得/投稿できるようになりました。たまにフレンドいないから使えない!という嘆きを見かけるので、ちゃんと使えるようになってヨカッタヨカッタ。

例によってテストが非常に不十分なため、ゲームタイトルによっては投稿できなかったりするかもしれません。もし怪しいところがあったら、Twitterで投稿文に「XboxInfoTwit」を含めてポスト、例えば「GoW2がXboxInfoTwitで動かないんだけど何これ」とか言ってもらえれば、検索経由で発見しますので気楽に文句書いてください。(GoW2は動きますけど)

AnonymousComparer - ver.1.3.0.0

ver 1.3というか、バグフィックスです。ToDictionaryとかだと拡張メソッドが本来あるのとオーバーロードが被ってて使おうとするとコンパイル通らなかったのです。気づいてたんだけど放置してました、すみませんすみません。というわけで幾つかのものを削りました。これでコンフリクトなし。

どんなものかというと、こんなものです。最初投稿した時のの流用で(こらこら)

class MyClass
{
    public int MyProperty { get; set; }
}

static void Main()
{
    // 例として、こんな配列があったとします
    var mc1 = new MyClass { MyProperty = 3 };
    var mc2 = new MyClass { MyProperty = 3 };
    var array = new[] { mc1, mc2 };
    // Distinctは重複を取り除く。でも結果として、これは、2です。
    var result = array.Distinct().Count();
    // 参照の比較なので当然です。では、MyPropertyの値で比較したかったら?
    // DistinctにはIEqualityComparerインスタンスを受け付けるオーバーロードもあります
    // しかしIEqualityComparerはわざわざ実装したクラスを作らないと使えない

    // そこで、キー比較のための匿名Comparerを作りました。
    // ラムダ式を渡すことで、その場だけで使うキー比較のIEqualityComparerが作れます。
    array.Distinct(AnonymousComparer.Create((MyClass mc) => mc.MyProperty));

    // でも、長いし、型推論が効かないから型を書く必要がある
    // Linqに流れているものが匿名型だったりしたら対応できないよ!
    // というわけで、本来のLinqメソッドのオーバーロードとして、記述出来るようにしました
    // ちゃんと全てのIEqualityComparerを実装しているLinq標準演算子に定義してあります
    array.Distinct(mc => mc.MyProperty);

    // 短いし、型推論もちゃんと効くしで素晴らしいー。
    // 匿名型でもいけます(VBの匿名型はC#(全ての値が一致)と違ってKey指定らしいですね)
    var anonymous = new[] 
    {
        new { Foo = "A", Key = 10 },
        new { Foo = "B", Key = 15 }
    };
    // true
    anonymous.Contains(new { Foo = "dummy", Key = 10 }, a => a.Key);
}

つまり、LinqのIEqualityComparerのオーバーロードうぜえ、何がインターフェースだよクソが、Linqならラムダ式だろ、インターフェースとかJava臭いんだよ。無名クラス(別に欲しくはないけど)がないから作るの面倒なんだよ。ということです。あるとそれなりに便利です。

Windows Phone 7で同期APIを実現するたった つの冴えないやり方

Windows Phone 7が発表されました。中々に素晴らしい仕上がりに見えます。米国では来月発売と非常に順調そうですが、日本では…… ローカライズが非常に難しそうに見えました。発売されること自体は全然疑っていませんが、問題は、米国で達成出来ているクオリティをどこまで落とさず持ってこれるか。日本語フォントや日本語入力、今一つなBing Map、但し日本は除くなZune Pass。本体だけではなく、周辺サービスも持ってきて初めてWindows Phone 7の世界が完成する。ということを考えると、大変難しそう。

その辺はMicrosoft株式会社に頑張ってもらうとして、一開発者的には淡々とアプリ作るだけでする。というわけで、標題のお話。WP7というかSilverlightと、そして例によっていつもの通り、Rxの話です。

問題です。以下のコードの出力結果(Debug.WriteLineの順序)はどうなるでしょうか。

// 何も変哲もないボタンをクリックしたとする
void Button_Click(object sender, RoutedEventArgs e)
{
    Debug.WriteLine("start");

    // 10秒以内にレスポンスが来るとする
    var req = WebRequest.Create("http://bing.com/");
    req.BeginGetResponse(ar => Debug.WriteLine("async"), null);

    Thread.Sleep(10000); // 10秒待機
    Debug.WriteLine("end");
}

答えは後で。

Dispatcher.BeginInvokeとPriority

Dispatcherとは何ぞやか。について説明するには余白が狭すぎる。ので軽くスルーしてコードを。Dispatcher.BeginInvokeは通常は別スレッドから単発呼び出しが多いですが、UIスレッド上でDispatcher.BeginInvokeを呼ぶとどうなるでしょう?

// (WPF)何も変哲もないボタンをクリックしたとする
void Button_Click(object sender, RoutedEventArgs e)
{
    Debug.WriteLine("start");

    Dispatcher.BeginInvoke(new Action(() => Debug.WriteLine("normal1")), DispatcherPriority.Normal);
    Dispatcher.BeginInvoke(new Action(() => Debug.WriteLine("background")), DispatcherPriority.Background);
    Dispatcher.BeginInvoke(new Action(() => Debug.WriteLine("normal2")), DispatcherPriority.Normal);
            
    Debug.WriteLine("end");
}

結果は、start->end->normal1->normal2->backgroundです。なおDispatcherPriorityはWPFでは設定可能ですが、Silverlightでは設定不可で、内部的には全てBackgroundになります。挙動は以下の図のようになっています。

一番上のブロックが現在実行中メソッド。下のがDispatcher。BeginInvokeで実行キューに優先度付きで突っ込まれて、現在実行中のメソッドが終了したら、キューの中のメソッドが順次、優先度順に実行されます。といったイメージ。

問題の答え

冒頭の問題の答えは、WPFではstart->async->endの順。Silverlight(WP7も含む)ではstart->end->asyncの順になります。ええ。WPFとSilverlightで挙動が違うのです!今更何をっていう識者も多そうですが(Silverlightももう4だしねえ)私ははぢめて知りました。はまった。BeginGetResponseはWPFでは(というか普通の.NET環境では)そのまま別スレッド送りで実行されますが、Silverlightでは一旦Dispatcherに突っ込まれた後に実行されるのですねー、といったような雰囲気(なので一つ前でDispatcher.BeginInvokeがどうのという話を挟みました)。

Silverlightでは、BeginGetResponseはすぐには実行されない。それを踏まえて次へ。

非同期 to 同期

非同期を同期に変換してみましょう。Reactive Extensions for .NET (Rx)で。

void Button_Click(object sender, RoutedEventArgs e)
{
    // 非同期を同期に変換!
    var req = WebRequest.Create("http://bing.com/");
    var response = Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse).Invoke()
        .First();
    Debug.WriteLine(response.ResponseUri);
}

Rxで非同期を包むと長さ1のReactiveシーケンスとなるので、Firstを使うと同期的に値を取り出せる、という話を前回の記事 Rxを使って非同期プログラミングを簡単に でしました。そして実際、上のコードはWPFでは上手く動きます。きっちりブロックして値を取り出せる。勿論、それならGetResponseを使えよという話ではありますが。

では、Silverlight(勿論WP7でも)では、というと…… 永久フリーズします。理由は、BeginGetResponseはDispatcherに積まれた状態なので、現在実行中のメソッドを抜けない限りは動き出さない。Firstは非同期実行が完了するまでは現在実行中のメソッドで待機し続けるので、結果として、待機しているので実行が始まらない=実行完了は来ない→永遠に待機。になります。

結論としては、UIスレッド上で同期的に待つことは不可能です。代替案としてはThreadPoolで丸々包んでしまうということもなくはない。

void Button_Click(object sender, RoutedEventArgs e)
{
    ThreadPool.QueueUserWorkItem(_ =>
    {
        var req = WebRequest.Create("http://bing.com/");
        var response = Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse)
            .Invoke()
            .First();
        Debug.WriteLine(response.ResponseUri);
    });
}

こうすれば、BeginGetResponseが発動するので問題なく待機して値を取り出せます。でも、これじゃあ全然嬉しくもない話で全く意味がない。Rxで包んでいる状態ならば、.Subscribeでいいぢゃん。ということだし。

// 非同期を同期に、そんなことは幻想なのでこう書くのがベストプラクティス
void Button_Click(object sender, RoutedEventArgs e)
{
    var req = WebRequest.Create("http://bing.com/");
    Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse)
        .Invoke()
        .Subscribe(res => Debug.WriteLine(res.ResponseUri));
}

素直に、普通にReactive Extensionsを使うのが、一番簡単に書けます。息を吸うように、ごく自然にそこにあるものとしてRxを使おう。

Delegate.BeginInvokeのこと

相違点はまだあります。普通の.NET環境ではDelegateのBeginInvokeで非同期実行できますが、Silverlightにはありません。やってみるとNotSupportedExceptionが出ます。じゃあFuncにラップしてみるとどうだろう?

void Button_Click(object sender, RoutedEventArgs e)
{
    Debug.WriteLine("start");

    Action action = () => Debug.WriteLine("action");
    Func<AsyncCallback, object, IAsyncResult> wrappedBeginInvoke = action.BeginInvoke;
    wrappedBeginInvoke.Invoke(ar => Debug.WriteLine("async"), null);

    Debug.WriteLine("end");
}

WPFではstart->end->action->async。Silverlightではstart->NotSupportedExceptionの例外。ここまではいいんです。Windows Phone 7でこのコードを試すと、例外出ません。何故か実行出来ます。SilverlightではBeginInvokeは出来ないはずなのに!そして、その実行結果はstart->action->end。つまり、非同期じゃない。BeginInvokeじゃない。Invokeとして実行されてる。意味がさっぱりわかりません。

不思議!不思議すぎたので、MSDNのWindows Phone 7 Forumで聞いてみましたが、良い返答は貰えず。とりあえず、怪しい挙動をしているのは間違いないので、これはやらないほうが無難です。勿論、通常こんなこと書きはしないと思うのですが、RxのFromAsyncPatternをDelegateに対して使おうとするとこうなりますので注意。Delegateの非同期実行したい場合はFromAsyncPetternじゃなくてToAsyncを使いましょう。

void Button_Click(object sender, RoutedEventArgs e)
{
    Debug.WriteLine("start");

    // Observable.ToAsync(()=>{})でもいいし、すぐにInvokeするならObservable.Start(()=>{})も有用
    Action action = () => Debug.WriteLine("action");
    action.ToAsync().Invoke().Subscribe(_ => Debug.WriteLine("async"));

    Debug.WriteLine("end");
}

こうすることで、Rxは内部でBeginInvokeではなくThreadPoolを使うので、問題は起こらずWPFと同じ結果が得られます。

まとめ

同期的に書くほうが分かりやすいには違いないし、また、非同期が苦痛なのもその通り。でも、非同期を同期に、なんて考えない方がいい。AutoResetEventなどを駆使して擬似的に再現出来たとしても、やっぱ無理ありますし、非同期のメリットを犠牲にしてまでやるものではない。確かに非同期をそのまま扱うのは苦痛だけれど、Rxを使えば緩和される。むしろ慣れれば同期的に書くよりも利点が見えてくるぐらい。無理に同期に変換しようとしないでRxを覚えよう。が、結論です。

でもドキュメント全然ないし日本語の話なんて皆無で難しいって? そうですねえ、そうかもですねえ……。このブログも全然順序立ってなくて、思い立ったところから書いてるだけで分かりづらいことこの上ないし。うむむ……。でも、Rxの機能のうち非同期周りの解説に関してはほとんど出せているはずなので、読みにくい文章ですが、目を通してもらえればと思います。

もしつまづくところがあれば、Twitterで「Reactive Extensions」を投稿文に含めてくれれば、Twitter検索経由で見つけて反応します。(「Rx」だと検索結果が膨大になるので反応出来ません……)。検索を見てるワードとしては、他に「Linq」なども高確率で反応しにいきます←逆に怖いって?すみませんすみません。

「C#」が検索キーワードに使えたらいいんですけどねえ。「Scala」とか「JavaScript」は常時見てるんですが、かなり活況に流れているんですよ。そういうの見てると、Twitter上のC#な話も漏らさず見たい・参加したいと思ってしまうわけで。

XboxInfoTwit - ver.2.2.0.4

未知のエラーが出まくっていたので、暫定というか適当な対処を取ってみました。コードがかなり古くて汚いので、根本的に手を入れたいところなんですが、作業量を考えると中々やる気が沸かないという微妙な状態。機能追加のリクエストも数点頂いているので申し訳ないんですけどね。

Rxを使って非同期プログラミングを簡単に

こないだ公開されたMSDNマガジンの記事、非同期タスク - タスクを使って非同期プログラミングを簡単に。おお、これは分かりやすく非同期周りについて網羅されてる!あと、私はTask全然知らないので初歩から入る導入はお役立ちです。いやまあ、実際のとこTask周りの導入記事っていっぱいあるのに、未だにお役立ち、とか言ってるのもどうかと思わなくもないところではあるんですが不勉強なもので。

同期処理でUIをブロックしてしまう、スレッドプールに投げればいいぢゃない、イベントベースのパターンとAPM(IAsyncResultを使うAsynchronous Programming Model)、そしてTask。おお、全部ですね。全部、全部?そう、何か欠けてます、ええ、Reactive Extensionsが欠けています。というわけで、Taskと対比させながらRxでのコードを見ていきましょう。

非同期実行、そして待機

MSDNマガジンでは真ん中辺りからのタスクのコードを、Rxでのコードと一緒に並べてみます。

// タスクパターン
Task<double> task = Task.Factory.StartNew(() =>
{
    double result = 0;
    for (int i = 0; i < 10000000; i++)
        result += Math.Sqrt(i);
    return result;
});

Console.WriteLine("The task is running asynchronously...");
task.Wait(); // 実行完了まで待機
Console.WriteLine("The task computed: {0}", task.Result);

// Reactive Extensions
var obs = Observable.Start(() =>
{
    double result = 0;
    for (int i = 0; i < 10000000; i++)
        result += Math.Sqrt(i);
    return result;
});

Console.WriteLine("Observable.Start非同期実行中");
var r = obs.First(); // 結果が返るまで待機
Console.WriteLine("完了 : {0}", r);

// 余談:タスクはIObservableに変換出来たりする
task.ToObservable().Run(Console.WriteLine);

どちらもデフォルトではThreadPoolで非同期を実行します。ThreadPoolと違うのは、待機するのも戻り値を取り出すのも簡単。Rxでは長さ1のReactiveシーケンスとして扱われるので、Firstを使うと同期的にブロックして値を取り出せます。ここだけを見ると、Wait() + task.ResultなTaskより扱いやすいのではないかと思います。また、両者ともに似ているので、TaskからIObservable<T>への変換も容易です。System.Reactive.dllを読みこめば、Taskに対してToObservableメソッドが追加され、簡単に変換することが出来ます。

自由な変換

汎用的に非同期処理をTaskに、Rxに変換しよう。TaskにはTaskCompletionSourceが、RxにはAsyncSubjectがあります。

// Construct a TaskCompletionSource and get its 
// associated Task
TaskCompletionSource<int> tcs = new TaskCompletionSource<int>();
Task<int> task = tcs.Task;

// Asynchronously, call SetResult on TaskCompletionSource
ThreadPool.QueueUserWorkItem(_ =>
{
    Thread.Sleep(1000); // Do something
    tcs.SetResult(123);
});

Console.WriteLine("The operation is executing asynchronously...");
task.Wait();

// And get the result that was placed into the task by 
// the TaskCompletionSource
Console.WriteLine("The task computed: {0}", task.Result);

// ---

// TaskCompletionSourceは、RxではAsyncSubjectと対比させられる
// AsyncSubjectはRxでの非同期表現を自前で実装する場合に使う(Rx内部でも当然使われている)
var async = new AsyncSubject<int>();

ThreadPool.QueueUserWorkItem(_ =>
{
    Thread.Sleep(1000); // 何か重い処理をしてたとする
    async.OnNext(123); // 値のセット
    async.OnCompleted(); // 値を確定し非同期実行完了
});

Console.WriteLine("重い処理を非同期で実行中...");
var r = async.First(); // 同期的に結果を待機し取得
Console.WriteLine("処理完了:{0}", r);

こちらもまた、両者ともに実によく似ています。何らかの任意の非同期処理は、使いやすいようにRxに包んでしまうと素敵な気分になれる。

IAsyncResultパターンの変換

IAsyncResultパターン。Rxの辺りでは、というか多分.NET周りで言う分には、Asynchronous Programming Model、略してAPMと呼ぶそうです。私がその言葉を見たのは、プログラミングMicrosoft .NET Framework 第2版 (マイクロソフト公式解説書)でした。Richterはリッチャーと呼ぶべきかリヒターと呼ぶべきなのか謎という話ががが。この本は.NET3種の良書のうちの一冊だと思うので(もう一冊は.NETのクラスライブラリ設計 開発チーム直伝の設計原則、コーディング標準、パターン (Microsoft.net Development Series)、もう一冊は未定というか将来のために取っておくというか)だと思うので未読の人は是非是非。と思ったら絶版じゃないですか!いや、amazonで偶然品切れなだけかもしれませんが、どうなんでしょう。これが手に入らないのは損失です!海外では既に.NET 4に対応したThird Editionが出ています。ということは、であり、風のウワサによると―― らしいですので、まあ、ですね!

今読み返したら20ページほど費やされて色々書いてありました。ぶっちゃけ面倒くさいと思って半分以上流し読みしてたという事実に気づいてしまったり。おおぉ。まあ、その辺はThird Editionの時に拾い直せば……。そんなわけで、その面倒くささを緩和するFromAsync/FromAsyncPatternをどうぞ。

// FromAsyncで包むとその場で実行
Task<IPAddress[]> task = Task<IPAddress[]>.Factory.FromAsync(
    Dns.BeginGetHostAddresses, Dns.EndGetHostAddresses, "www.microsoft.com", null);

task.Wait();
foreach (var item in task.Result) Console.WriteLine(item);

// RxのFromAsyncPatternの型指定は引数と戻り値の二つを指定する
// FromAsyncPatternで包んだら即実行ではなく、funcをInvokeするまでは開始されない
var obs = Observable.FromAsyncPattern<string, IPAddress[]>(Dns.BeginGetHostAddresses, Dns.EndGetHostAddresses)
    .Invoke("www.microsoft.com"); // 即実行なら変数に代入せずメソッドチェーン、実行を遅らせたい場合はfuncで持っておくと良いかも

var r = obs.First();
foreach (var item in r) Console.WriteLine(item);

イベントベースのパターンの変換

残念なことに、Taskには組み込みの変換パターンがないので、TaskCompletionSourceを使って自前で作る必要があるようです。RxではAsyncSubjectを使って自前で用意するまでもなく、そもそもイベントのLinq化として売り出されたので、イベントベースのパターンの変換はお手の物です。見てみましょう。

var client = new WebClient();

Observable.FromEvent<DownloadStringCompletedEventArgs>(client, "DownloadStringCompleted")
    .Select(e => e.EventArgs.Result)
    .Subscribe(s => Console.WriteLine(s));

client.DownloadStringAsync(new Uri("http://www.microsoft.com/"));
Console.ReadKey(); // 完了待ち

// こういう風に書くとリフレクションを使わないので軽くて望ましい、けど結構面倒くさい(ので事前に自動生成しておくといいよ)
var obs = Observable.FromEvent<DownloadStringCompletedEventHandler, DownloadStringCompletedEventArgs>(
    h => new DownloadStringCompletedEventHandler(h),
    h => client.DownloadStringCompleted += h,
    h => client.DownloadStringCompleted -= h);

client.DownloadStringAsync(new Uri("http://www.bing.com/"));
var result = obs.Select(e => e.EventArgs.Result).First(); // 同期で待機&受け取り
Console.WriteLine(result);

FromEvent。実に美しい!Linq!Linq!おっと、興奮してしまった。とはいえ、stringでイベント名を指定するのも、h=>hoge+=の連打も、どちらも悲しくダサい。そこでT4 TemplateでFromEventに包んだのを一気に自動生成してしまうのをお薦めします。そのためのT4は以前に書きましたので、Reactive ExtensionsのFromEventをT4 Templateで自動生成する 是非どうぞ。割と頑張ったし便利だと思ったけれど、はてブ数がああああ。まあ、そんなわけで、Rxをガリガリ使う分には必需品です。

しかし、それで大丈夫か?

FromEventの後者の書き方は問題を残しています。DownloadStringAsyncの後にFirst。これは、危険です。どう危険かというと……。

// T4自動生成を使うとFromEventがこんなスッキリ!
var client = new WebClient();
var obs = client.DownloadStringCompletedAsObservable();

// ところで、実行を開始した後にSubscribe(Firstもそうです)したら?
// それも、超速で非同期実行が完了したとしたら?
client.DownloadStringAsync(new Uri("http://www.bing.com/"));
Thread.Sleep(5000); // ダウンロードが完了した後にSubscribeする、をシミュレート

// 次の値は(完了済みなので)永遠にやってこない、つまり永久フリーズ
var result = obs.Select(e => e.EventArgs.Result).First(); 

Firstだと同期で延々と待つのでフリーズ。Subscribeならフリーズはありませんが、結果がこないので意図した結果ではないでしょう。これは大変マズい。そんな時はTake-Prune-Connectパターン、なんてものはなく今思いつきました。今思いついたのでこれがベストなやり方なのか、ちょっとよく分からないのであとでForumとか見て調べておきます。挙動的には全然問題ない。

// Take(1).Prune and ConnectでAsyncSubjectっぽい挙動に変換
var client = new WebClient();
var obs = client.DownloadStringCompletedAsObservable()
    .Select(e => e.EventArgs.Result) // Selectはどこに書いてもいいので自分がスッキリと思うところへ
    .Take(1)
    .Prune();
obs.Connect();

client.DownloadStringAsync(new Uri("http://www.bing.com/"));
Thread.Sleep(5000); // ダウンロードが完了した後にSubscribeする、をシミュレート

var result = obs.First(); // 大丈夫だ、問題ない
Console.WriteLine(result);

var result2 = obs.First(); // 何度でも取り出せる
Console.WriteLine(result2);

わけわかんなくなってきました?失望のため息が聞こえます。どうしたものかねえ、これ。PruneはキャッシュとしてAsyncSubjectを持って後続に渡します。また、値を流すタイミングを自由に調整出来ます(Connectしたら流す、それまでは値が来ていても堰止める)。今回はFromAsyncPatternをなぞらえるため、即座にConnectしました。やっていることは、上の方で出したAsyncSubjectのパターンのシミュレーションです。つまり、OnNextが一回来て、OnCompletedが来る。そうでないと、AsyncSubjectが完了しない。FromEventはそのままだと無限リスト状態で完了の状態がこないので、Take(1)で長さ1のReactiveシーケンスとする。こうすることで、後ろに非同期結果の値が流れ出します。

といったイミフな話はReactive Extensionsの非同期周りの解説と自前実装で少し、それと、それに関連してufcppさんが分かりやすいスライドにしてまとめてくれていますので、必見 => さて、WordPress になったところで再度、PowerPoint 貼り付けテスト « ++C++; // 未確認飛行 C ブログ。貼りつけテストという実に分かりにくいタイトルでサラッと流してしまうところが漢らしい(謎)

タスクの操作と構成

一つの非同期実行程度なら、APMだろうがイベントモデルだろうが、素のまま扱っても別にそこまで面倒なわけではない。Taskが、Rxが真価を発揮するのは複数の操作を行うとき。まずは、待機を。

Task<int> task1 = new Task<int>(() => ComputeSomething(0));
Task<int> task2 = new Task<int>(() => ComputeSomething(1));
Task<int> task3 = new Task<int>(() => ComputeSomething(2));

task1.Start(); task2.Start(); task3.Start(); // 実行しとかないと永遠待機しちゃうよ
task1.Wait();
Console.WriteLine("Task 1 is definitely done.");

Task.WaitAny(task2, task3); // どっちかが完了するまで待機
Console.WriteLine("Task 2 or task 3 is also done.");

Task.WaitAll(task1, task2, task3); // 全部完了するまで待機
Console.WriteLine("All tasks are done.");

// ---

// Observable.Startは即時実行だけど、ToAsyncはInvokeまで実行開始されない
var async1 = Observable.ToAsync(() => ComputeSomething(0));
var async2 = Observable.ToAsync(() => ComputeSomething(1));
var async3 = Observable.ToAsync(() => ComputeSomething(2));
var io1 = async1(); // Invokeってのはデリゲートのなので()でもおk
var io2 = async2();
var io3 = async3();

io1.Run(); // 引数なしRunで実行結果も受けずただの待機になる

// WaitAnyはどちらか先に完了したほうを1つだけ流す Merge().Take(1) して待機
io2.Merge(io3).Take(1).Run();

Observable.Concat(io1, io2, io3).Run(); // WaitAllは全部連結して待機
Observable.ForkJoin(io1, io2, io3).Run(); // こちらは並列実行で待機

複数を同時に走らせて待機がいとも簡単に。で、面白いのがRx。Rxは非同期特化というわけではないので直接的にアレとコレのどっちかが来るまで待ってね、なんていうメソッドはないのですが、豊富な結合系メソッドで余裕でシミュレート出来てしまいます。WaitAnyはMerge.Takeで。WaitAllはConcatで。素晴らしい。凄い。と同時に、若干パズル的な気がしなくもない。が、しかし、面白い。Reactiveモデルの何でもできるという底力を感じる。

継続・継続・継続

今までは待機してたという、おいおい、非同期でやってるのに同期かよ、って感じだったので本領発揮で非同期のままの流るような実行を。TaskではContinueWith、Rxでは、72通りあるから何を言えばいいのか。

// ほう、メソッドチェーンが生きたな
Task<IPAddress[]>.Factory.FromAsync(Dns.BeginGetHostAddresses, Dns.EndGetHostAddresses, "www.microsoft.com", null)
    .ContinueWith(t =>
    {
        foreach (var item in t.Result) // IPAddress[]なので。
        {
            Console.WriteLine(item);
        }
    });

// しかしRxはそれどころじゃない
Observable.FromAsyncPattern<string, IPAddress[]>(Dns.BeginGetHostAddresses, Dns.EndGetHostAddresses)
    .Invoke("www.microsoft.com")
    .SelectMany(xs => xs) // xsはIPAddress[]、つまりIEnumerableとIObservableを区別なくバラしているという狂気の融合!
    .Subscribe(Console.WriteLine);

ContinueWithは、まあごく普通に結果が流れてきてるんだなー、程度。しかしRxのほうはヤバい。この場合のContinueWithに該当するのはSubscribeで、まあそれは普通なのですが、それよりしかし流れてくるIPAddress[]の[]がウザいので、Linq的に扱うならフラットにしたいよね。というわけで、IObservable<IPAddress[]>をSelectManyでIObservable<IPAddress>に変換しています。SelectManyはIObservableだろうとIEnumerableだろうと、平等にバラします。これは実にヤバい。狂気すら感じるパワー。皆も是非Rxを使ってこのヤバさを知って欲しい。

実行・待機

同時実行して、その結果を一辺に受けたい場合ってありますよね。そんな場合はForkJoinで。ForkJoinよく出てくるなあ。

string[] urls = new[] { "www.microsoft.com", "www.msdn.com" };
Task<IPAddress[]>[] tasks = new Task<IPAddress[]>[urls.Length];

for (int i = 0; i < urls.Length; i++)
{
    tasks[i] = Task<IPAddress[]>.Factory.FromAsync(
        Dns.BeginGetHostAddresses,
        Dns.EndGetHostAddresses,
        urls[i], null);
}

Task.WaitAll(tasks);

Console.WriteLine(
    "microsoft.com resolves to {0} IP addresses. msdn.com resolves to {1}",
    tasks[0].Result.Length,
    tasks[1].Result.Length);


// WaitAll? ああ、ForkJoinで並行実行のことですか
Observable.ForkJoin(urls.Select(url =>
        Observable.FromAsyncPattern<string, IPAddress[]>(Dns.BeginGetHostAddresses, Dns.EndGetHostAddresses)(url)))
    .Run(xs => Console.WriteLine(
        "microsoft.com resolves to {0} IP addresses. msdn.com resolves to {1}",
        xs[0].Length, xs[1].Length));

そろそろマンネリ気味で疲れてきた。あ、最後にデッカイのがありますね。TaskではContinueWhenAllが初お目見え。でもRxでは別に変わらずForkJoinなんだよねえ。

// Task要の定義
static Task<string> DownloadStringAsTask(Uri address)
{
    TaskCompletionSource<string> tcs = new TaskCompletionSource<string>();
    WebClient client = new WebClient();
    client.DownloadStringCompleted += (sender, args) =>
    {
        if (args.Error != null) tcs.SetException(args.Error);
        else if (args.Cancelled) tcs.SetCanceled();
        else tcs.SetResult(args.Result);
    };
    client.DownloadStringAsync(address);
    return tcs.Task;
}

// Rx用の定義
public static IObservable<IEvent<DownloadStringCompletedEventArgs>> DownloadStringAsObservable(Uri address)
{
    var client = new WebClient();
    var con = Observable.FromEvent<DownloadStringCompletedEventHandler, DownloadStringCompletedEventArgs>(
            h => new System.Net.DownloadStringCompletedEventHandler(h),
            h => client.DownloadStringCompleted += h,
            h => client.DownloadStringCompleted -= h)
        .Take(1).Prune();
    con.Connect();
    client.DownloadStringAsync(address);
    return con;
}

static int CountParagraphs(string s)
{
    return Regex.Matches(s, "<p>").Count;
}

static void Main(string[] args)
{
    Task<string> page1Task = DownloadStringAsTask(new Uri("http://www.microsoft.com"));
    Task<string> page2Task = DownloadStringAsTask(new Uri("http://www.msdn.com"));

    Task<int> count1Task = page1Task.ContinueWith(t => CountParagraphs(t.Result));
    Task<int> count2Task = page2Task.ContinueWith(t => CountParagraphs(t.Result));

    /// 全てが完了したら、Actionを実行
    Task.Factory.ContinueWhenAll(new[] { count1Task, count2Task },
        tasks =>
        {
            // tasks引数使わないのね(笑)
            Console.WriteLine("<P> tags on microsoft.com: {0}", count1Task.Result);
            Console.WriteLine("<P> tags on msdn.com: {0}", count2Task.Result);
        });

    // Rxではこうなる
    Observable.ForkJoin(
            DownloadStringAsObservable(new Uri("http://www.microsoft.com")),
            DownloadStringAsObservable(new Uri("http://www.msdn.com")))
        .Select(xs => xs.Select(e => CountParagraphs(e.EventArgs.Result)).ToArray())
        .Subscribe(xs =>
        {
            Console.WriteLine("<P> tags on microsoft.com: {0}", xs[0]);
            Console.WriteLine("<P> tags on msdn.com: {0}", xs[1]);
        });

    Console.ReadKey(); // 非同期実行なので終了しないように
}

ふむ(何がふむだ)

非同期とUIスレッド

Dispatcher.BeginInvokeさようなら!ですね。ObserveOnはRxだけの専売特許じゃない。Taskにだってあるもん。

void Button_Click(object sender, RoutedEventArgs e)
{
    // ContinueWithの引数にTaskSchedulerを入れると非同期実行結果からUIを触れる
    TaskScheduler uiTaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();
    DownloadStringAsTask(new Uri("http://www.microsoft.com"))
        .ContinueWith(t => { textBox1.Text = t.Result; }, uiTaskScheduler);

    // Rxではすっかりお馴染みなObserveOnで(WPFの場合は省略形としてObserveOnDispatcherがある)
    DownloadStringAsObservable(new Uri("http://www.microsoft.com"))
        .ObserveOnDispatcher()
        .Subscribe(ev => { textBox1.Text = ev.EventArgs.Result; });
}

真面目に、このObserveOnDispatcherは死ぬほど役立ちです。

まとめ

機能的には十分被る。そして、より汎用的なはずのRxが専用のはずのTaskでの処理を十分に代替出来てしまうという事実に驚く。Rx始まりすぎてる。今更言うのもアレですが、これは確実にヤバい。ビッグウェーブはもう既に来てる。乗り遅れてもいいんで乗ろう!

それにしてもMSDNマガジン素晴らしいなあ。この記事は実に入門向けに満遍なくも濃密で、素敵な時間を過ごせた。いやあ、一時期は機械翻訳のみになってたけれど、再び翻訳に戻って大変ありがたい。そして、そろそろRxもMSDNマガジンに記事が来てもいいのでは?もしくはMSKKの人がですね。どうでしょう。どうなんでしょう。どうなってるんでしょう。

あ、そういえば私は10月期のMicrosoft MVPに応募してたんですがそれはやっぱりダメだったよ。言うことを聞かないからね。これを見てるそこの君、以下略。来年の1月にretryしようかなー。それまでには記事増量、ってやること変わってないんじゃ結果は同じじゃないのというのががが。

Reactive Extensionsの非同期周りの解説と自前実装

最近id:okazukiさんが凄い勢いでRx解説を書いていて凄い!そして、刺激を受けますねー。というわけで、今回はRxの非同期周りの解説をします。今日の昼に非同期処理を行うメソッドの戻り値は全てIObservable<T>にしてしまえばいいんじゃないんだろうかを読んで、Twitterで少しAsyncSubjectについて書いたのでそれをまとめて……、と思ったらReactive Extensions入門 11「非同期処理用のSubject」ですって!早!私なんて一年かけて何も書いていやしなかったりするのに、この早さは本当に見習いたい、ごほごほ。

そんなわけかで、色々と被ってしまっているのですが、Rxの非同期実行についてまとめます。まず、Rxの非同期の起点はStart, ToAsync, FromAsyncPatternの3つ。Start,ToAsyncはFromAsyncPatternの簡易化みたいなものなので、実質は1つです。これらを使うと長さ1のIObservable<T>として非同期処理が扱えるわけです。ところで非同期処理が完了するまでの間に複数Subscribeしたらどうなるのだろう、または非同期処理が完了してしまった後にSubscribeしたらどうなるのだろう? Silverlightで実際に触って試してみてください。

Microsoft Silverlight の取得

ボタンだらけでイミフ? 解説します……。Rxの非同期系処理は全てAsyncSubjectを通ります。AsyncSubjectのOnNextが非同期で実行された値の戻り値を、OnCompletedが非同期処理の完了を示します。通常はOnNextとOnCompletedはワンセットで非同期処理です。というわけで、例えばSubscribeを二回押す→OnNextを一回押す→OnCompletedを一回押すで、非同期のIObservable<int>に対して二つSubscribe(Console.WriteLine)したということになり、1が二つ右側のログに表示されたはずです。続けてSubscribeを押す(非同期処理が完了した後にSubscribeした場合)とどうなるか?というと、1が追加されたはずです。

以前にHot vs ColdとしてIObservableの性質を少し取り上げました。ColdはSubscribeするとすぐに実行される。HotはSubscribeしても値が流れてくるまで待機する。非同期におけるIObservable<T> = AsyncSubjectは、つまり、両方の性質を持ちます。OnCompleted以前はHotで、値が流れてくる(非同期処理が完了する)まで待機される。OnCompleted以後はColdとなって、Subscribeするとすぐに値を流す。

何となくAsyncSubjectの中身が想像付いてきました?そう、非同期の実行結果である値をキャッシュしています。もし複数回OnNextされたらどうなるか、というと、これは最後の値だけが残ります。(一度Resetしてから)OnNextを連打してからOnCompletedを押して、Subscribeしてみてください。

非同期というとキャンセルはどうするの?というと、ありますよ!Subscribeの戻り値はIDisposable。イベントのRx化、FromEventの場合はイベントのデタッチでしたが、非同期で使う場合はキャンセルになります。例えばSubscribeを押してから、Disposeを押して(キャンセル!)、OnNext→OnCompleted(非同期処理完了)を押してみてください。ログに何も出てきません。非同期処理が完了する前にDisposeしたということで、Subscribeがキャンセルされた、ということです。では、続けて(OnCompleted完了後)Subscribeを押すと……、ログに値が表示されます。Disposeは処理自体をキャンセルするわけではなく、 Subscribeのキャンセルということですね。

そうそう、ResetボタンはSubscribeやDispose、AsyncSubjectの状態をリセットして初期化します。Clearボタンはログの消去になります。

AsyncSubjectの簡易実装

AsyncSubjectの実態が大体分かったので、次は自分で実装してみましょう!その前にSubjectって何ぞや、というと、IObservable<T>かつIObserver<T>。これはRxネイティブのイベントだと思ってください。詳しくはReactive Extensions入門 + メソッド早見解説表をどうぞ。

とりあえず実装したコードを。

public class MyAsyncSubject<T> : IObservable<T>, IObserver<T>
{
    bool isCompleted = false;
    T lastValue = default(T);
    readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    // OnCompleted済みなら即座にOnNext呼び出し、そうでないならListへAdd
    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (isCompleted)
        {
            observer.OnNext(lastValue);
            observer.OnCompleted();
            return Disposable.Empty;
        }
        else
        {
            observers.Add(observer);
            // 正しくはキャンセルが可能なように、Disposeが呼ばれた際にListからremoveされるよう
            // ラップした特別なIDisposableを返す必要があるけれど、簡略化した例ということでその辺は省きます
            return Disposable.Empty;
        }

    }

    // 初回呼び出しの場合は最後の値で全てのobserverのOnNextとOnCompletedを呼ぶ
    public void OnCompleted()
    {
        if (isCompleted) return;
        isCompleted = true;
        observers.ForEach(o =>
        {
            o.OnNext(lastValue);
            o.OnCompleted();
        });
        observers.Clear();
    }

    // OnCompletedと同じ。これも呼ばれたらCompleted済みとなる
    public void OnError(Exception error)
    {
        if (isCompleted) return;
        isCompleted = true;
        observers.ForEach(o =>
        {
            o.OnError(error);
        });
        observers.Clear();
    }

    // Completed済みでなければキャッシュを置き換える
    public void OnNext(T value)
    {
        if (isCompleted) return;
        lastValue = value;
    }
}

あくまでこれは簡易化したものです。実際はこれより、もう少し複雑です。あとlockを省いているのにも注意。まあ、ちゃんとしたのが知りたい場合はリフレクタとキャッキャウフフしてくださいということで。コードの中身ですが、Silverlightのデモで見てきた通りの、比較的単純な作りになっています。OnCompleted前後のSubscribeの挙動なんて、コードで見たほうが一目瞭然で早いですね。

これの挙動がRxの非同期系の挙動です。もしAsyncSubjectではなくSubject(値のキャッシュなし)を使った場合は、非同期開始からSubscribeまでの間に完了してしまった場合、何も起きないことになってしまいます。キャッシュを取るというのは、非同期に最適な理に叶った、というか、少なくとも不都合は起きないような挙動になります。もし自前で非同期でIObservable<T>を返すようなメソッドを実装する場合は、必ずAsyncSubjectを使いましょう。ユーザーの期待する挙動を取らなければならないという、義務として。

FromAsyncPatternの簡易実装

ここまで来たら、ついでなのでFromAsyncPatternも実装してしまいましょう!AsyncSubjectがあれば簡単です。あ、こちらでも断っておくと、あくまで簡易実装であって実際のものとは若干異なります。

static class Program
{
    static Func<T, IObservable<TR>> MyFromAsyncPattern<T, TR>(Func<T, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TR> end)
    {
        return arg =>
        {
            var asyncSubject = new AsyncSubject<TR>(Scheduler.ThreadPool); // おや、引数に……

            // 引数を渡されたら、Subscribeを待たずBeginInvokeで非同期実行が始まります
            begin.Invoke(arg, ar =>
            {
                TR result;
                try
                {
                    result = end.Invoke(ar); // EndInvokeで結果を得て
                }
                catch (Exception error)
                {
                    asyncSubject.OnError(error);
                    return;
                }
                asyncSubject.OnNext(result); // OnNext!
                asyncSubject.OnCompleted();
            }, null);

            return asyncSubject.AsObservable(); // SubjectのOnNextなどを隠す(なので昔はHideというメソッド名でした)
        };
    }

    // ToAsyncはFunc/ActionのBeginInvoke,EndInvoke簡略化版です
    // というのは少し嘘です、実際はSchedulerを使うのであってBeginInvokeは使いません、詳しくはまたそのうち
    static Func<T, IObservable<TR>> MyToAsync<T, TR>(this Func<T, TR> func)
    {
        return MyFromAsyncPattern<T, TR>(func.BeginInvoke, func.EndInvoke);
    }

    // StartはToAsyncの引数無しのものを即時実行というものです
    // Func<T>のみのFromAsyncPatternを作っていないので、Rx本来のToAsyncで書きます
    static IObservable<T> MyStart<T>(Func<T> func)
    {
        return Observable.ToAsync(func).Invoke(); // ここで即座にInvoke
    }

    static void Main(string[] args)
    {
        // *が引数の分だけ並ぶという関数があったとする
        Func<int, string> repeat = i => new String('*', i);

        var obs = MyFromAsyncPattern<int, string>(repeat.BeginInvoke, repeat.EndInvoke)
            .Invoke(3); // Subscribe時ではなく、Invokeした瞬間に非同期の実行は開始されていることに注意

        obs.Subscribe(Console.WriteLine); // ***
        Console.ReadKey();
    }
}

FromAsyncPatternは引数の多さや引数のイミフさにビビりますが、デリゲートのBeginInvoke, EndInvokeに合わせてやるだけです。こんなの私もソラでは書けませんよー。さて、今回は引数を一つ取るFromAsyncPatternを定義してみました。戻り値がFunc<T, IObservable<TR>>なのは、引数を一つ取ってIObservable<T>にする、ということです。

中身は割と簡単で、とりあえずBeginInvokeして、EndInvokeを待たずに(そりゃ非同期なので当然だ)とりあえずAsyncSubjectを戻します。つまり、これでIObservableとしてはHotの状態。そして何らかの処理が終えたらEndInvokeで戻り値を得て、AsyncSubjectのOnNext, OnCompletedを呼んでやる。これでSubscribeされていたものが実行されて、ついでにIObservableとしてColdになる。これだけ。意外とシンプル。

ついでのついでなのでObservable.ToAsyncとObservable.Startも定義してみました。ToAsyncはFunc/Actionを簡単にRx化するもの。FromAsyncPatternだとジェネリックの引数を書いたりと、何かと面倒くさいですから。StartはToAsyncを更に簡略化したもので、引数なしのものならInvoke不要で即時実行するというもの。

実行開始タイミングについて

FromAsyncPatternは(ToAsync/Startも)Invokeした瞬間に非同期実行が始まります。 FromAsyncPattern().Invoke().Subscribe() といったように、直接繋げる場合は気にする必要も特にないかもですが、一時変数などに置いたりする場合などは、Subscribeされたときに初めて非同期実行が開始されて欲しい、と思ったりあるかもですね。そんな場合はDeferを使います。

Microsoft Silverlight の取得

// 100を返すだけのどうでもいい関数
Func<int> func = () =>
{
    Console.WriteLine("fire");
    return 100;
};

var start = Observable.Start(func);
var defer = Observable.Defer(() => Observable.Start(func));
var prune= Observable.Defer(() => Observable.Start(func)).Prune();

StartButton.Click += (sender, e) =>
{
    start.Subscribe(Console.WriteLine);
};

DeferButton.Click += (sender, e) =>
{
    defer.Subscribe(Console.WriteLine);
};

var isConnected = false;
ReplayButton.Click += (sender, e) =>
{
    if (!isConnected) { prune.Connect(); isConnected = true; }
    prune.Subscribe(Console.WriteLine);
};

何もボタンを押さなくてもログにfireと出てしまっています。Observable.Startのせいなわけですががが。そんなわけで、Deferを使うとSubscribeまで実行を遅延することができます。ところで注意なのが、どちらもIObservable<T>なのですが、StartはColdとしてキャッシュされた値を返し続けるのに対して、Deferは中の関数を再度実行します。もしキャッシュしたい場合は、Pruneを使うといいでしょう。Pruneはこのブログでも延々と出してきた値の分配のためのPublishの親戚で、Connect後は最後の値をキャッシュして返し続けるという、AsyncSubjectと同じ動作をします(というか中身がAsyncSubjectなのですが)。この辺の使い分けというのもヤヤコシイところですねえ、そもそもPruneっていうメソッド名がイミフ……。

まとめ

Linq to Objectsで、最初分からなかったんですよ、yield returnなどで返される遅延評価としてのIEnumerable<T>と、配列(これもIEnumerable<T>ではある)の違いが。初めてLinqを知った後、半年ぐらいは分からないままだった(C#歴も半年でしたが)。同じインターフェイスなのに、状態としては、ちょっと違う。こういうのって結構分かりづらくて、躓いてしまうところです。

Rxの厄介なところは、IObservable<T>が一つで色々な状態を持ちすぎ。HotとColdの違いもある上に、更には混じり合った状態まである、Deferと非Deferも、外からだけだと全く区別がつかない。もう分かりづらいったらない。これに関しては一個一個丁寧に見ていくしかない、かな。今回はRxにおける非同期を徹底的に解剖してみました。一つ一つ、丁寧に。Rxも徐々に盛り上がりつつあるようなので、これからも、私なりに出来る限りに情報を発信していけたらと思います。

ところで、凄くシンプルなんですよね、Rxって。何を言ってるんだ?って話ですが、ええと、ほら、あくまでも「簡易」だからってのもありますが、実装のコード行数も少ないし全然難しいことやってないという。Linq to Objectsもそれ自体は凄くシンプルで、シンプルなのにとんでもなく強力という。それと同じで。Rx触ってて、内部をリフレクタでちょろちょろと見てて、本当に驚く。シンプルなのだけど、自分じゃ絶対書けないし思いつけないしっていう。悔しいし、そして、憧れるところですねえ。

特に魔法もなく、素直に中間層厚めな実装は、パフォーマンスは(もがもがもがもが)。何か言いました?

ReactiveOAuth - Windows Phone 7対応のOAuthライブラリ

Windows Phone 7用のOAuth認証ライブラリを作成し、公開しました。他のライブラリに比べての特徴は、非同期APIしか用意されていないWindows Phone 7での利用を念頭に置き、Reactive Extensions(Rx)をフル活用しているという点です。そもそもWindows Phone 7対応のOAuthライブラリが少ないので、特にWindows Phone 7開発者は是非どうぞ。

Windows Phone 7専用というわけでもないですが(Console/WPFで使えるよう、DLLとサンプルコードを用意してあります)、Windows Phone 7以外では別途Rxのインストールが必要です。Windows Phone 7環境では最初から入っているのでRxのインストール不要。Silverlight用は、コードコピペで別プロジェクト立てるだけで動くと思うんですが、確認取るのが面倒だった(クロスドメインがー)ので、そのうちに。

ところでそもそもRxって何、という人は Reactive Extensions入門 + メソッド早見解説表 をどうぞ。

何故Rxを使うのか

ReactiveOAuthの説明に入る前に、何故Rxを使うのかということを少し。理由は簡単で、非同期プログラミングは大変だから。論より証拠で、WebRequestのPOSTを全てBegin-Endパターンで構築してみましょう。

var req = (HttpWebRequest)WebRequest.Create("http://google.co.jp/"); // dummy
req.Method = "POST";
req.BeginGetRequestStream(ar =>
{
    var stream = req.EndGetRequestStream(ar);
    stream.BeginWrite(new byte[10], 0, 10, _ar =>
    {
        stream.EndWrite(_ar);
        req.BeginGetResponse(__ar =>
        {
            var res = req.EndGetResponse(__ar);
            var resStream = res.GetResponseStream();
            var s = new StreamReader(resStream).ReadToEnd();
            Console.WriteLine(s);
        }, null);
    }, null);
}, null);

コールバックの連鎖とはこういうことであり、大変酷い。冗談のようだ。こんなに面倒なら、こんなに苦しいのなら、非同期などいらぬ!しかし現実問題、Silverlightには、Windows Phone 7には、非同期APIしか搭載されていません。Begin-Endが強要される世界。理由は分かる。ユーザーエクスペリエンスの為でしょう。モバイル機器は性能が貧弱だから重い処理はいけない、と言うけれど、大事なのは処理が重いか否かではなく、体感。UIを止めさえしなければ、不快感を与えることはない。だから、強制的に非同期操作のみとした。

けれど、それで開発難しくなったり面倒になってしまってはいけない。非同期処理が簡単に出来れば……。その答えが、Rx。「簡単に出来るから」「開発者が幸せで」「全てが非同期になり」「ユーザーも幸せになる」。楽しい開発って大事だよね。本当にそう思っていて。開発者が不幸せで、コードに愛がなければ良いものなんて生まれやしないんだって、本当に思っていて。

// Rxならこう書ける(...AsObservableは拡張メソッドとして別途定義)
req.GetRequestStreamAsObservable()
    .SelectMany(stream => stream.WriteAsObservable(new byte[10], 0, 10))
    .SelectMany(_ => req.GetResponseAsObservable())
    .Select(res => new StreamReader(res.GetResponseStream()).ReadToEnd())
    .Subscribe(Console.WriteLine);

// SelectManyが苦手ならばクエリ構文という手もあります
var query = from stream in req.GetRequestStreamAsObservable()
            from _ in stream.WriteAsObservable(new byte[10], 0, 10)
            from res in req.GetResponseAsObservable()
            select new StreamReader(res.GetResponseStream()).ReadToEnd();
query.Subscribe(Console.WriteLine);

ネストが消滅して、メソッドチェーンの形をとって非同期が同期的のように書けるようになります。利点は他にもあって、様々な操作(合成・射影・抽出・待機などなど)が可能になる、ということもありますが、それはまたそのうち。

ところで、最初のコールバックの連鎖って何だか見覚えのあるような雰囲気ありませんか?JavaScriptで。そう、XmlHttpRequestであったりsetTimeoutであったりの連鎖と同じです。RxのJavaScript版、RxJSではJavaScriptでのそれらのネストを殺害することが可能です。興味があれば、そちらも是非試してみてくださいな。

ReactiveOAuthとは?

OAuthであっても、ようするにWebRequestなわけです。GET/POSTしてResponseを取ってくるということにかわりはない。そんなわけで、ネットワーク通信してResponseを取ってくる部分を片っ端から全てIObservableにしたのがReactiveOAuthです。Rxにべったり依存したことで、良くも悪くも、他のOAuthライブラリとは全く毛色の違う仕上がりになっています。

とはいっても、とにかく「簡単に書けること」にこだわりを持ってデザインしたので、利用自体は簡単ですし、Rxの知識もそんなに必要ありません。普通に取得するなら、SelectとSubscribeしか使わないので、全然安心です!まあ、できれば、これが入り口になってRxの世界を知ってもらえると嬉しいなあ、という皮算用もあったりですが。

使いかた1. AccessToken取得

デスクトップアプリケーション上でのOAuthの仕組みを簡単に説明すると、RequestToken取得→認証URL表示→PINコード入力→PINコード+RequestTokenを使ってAccessToken取得。という形になっています。まず、ユーザー名・パスワードの代わりとなるものはAccessTokenです。これを取得して、API呼び出しの時に使い、また、パスワード代わりに保存するわけです。そのRequestTokenやPINコードはAccessTokenを取得するための一時的な認証用キーということです。

AccessToken取得までにはOAuthAuthorizerクラスを使います。

// グローバル変数ということで。
const string ConsumerKey = "consumerkey";
const string ConsumerSecret = "consumersecret";
RequestToken requestToken;
AccessToken accessToken;

private void GetRequestTokenButton_Click(object sender, RoutedEventArgs e)
{
    var authorizer = new OAuthAuthorizer(ConsumerKey, ConsumerSecret);
    authorizer.GetRequestToken("http://twitter.com/oauth/request_token")
        .Select(res => res.Token)
        .ObserveOnDispatcher()
        .Subscribe(token =>
        {
            requestToken = token;
            var url = authorizer.BuildAuthorizeUrl("http://twitter.com/oauth/authorize", token);
            webBrowser1.Navigate(new Uri(url)); // navigate browser
        });
} 

GetRequestTokenとBuildAuthorizeUrlを使い、RequestTokenの取得と、内蔵ブラウザに認証用URLを表示させました。

private void GetAccessTokenButton_Click(object sender, RoutedEventArgs e)
{
    var pincode = PinCodeTextBox.Text; // ユーザーの入力したピンコード

    var authorizer = new OAuthAuthorizer(ConsumerKey, ConsumerSecret);
    authorizer.GetAccessToken("http://twitter.com/oauth/access_token", requestToken, pincode)
        .ObserveOnDispatcher()
        .Subscribe(res =>
        {
            // Token取得時のレスポンスには、Token以外に幾つかのデータが含まれています
            // Twitterの場合はuser_idとscreeen_nameがついてきます
            // ILookup<string,string>なので、First()で取り出してください
            UserIdTextBlock.Text = res.ExtraData["user_id"].First();
            ScreenNameTextBlock.Text = res.ExtraData["screen_name"].First();
            accessToken = res.Token; // AccessToken
        });
}

画像は認証が全て終わった時の図になっています。RequestTokenとPinCodeをGetAccessTokenメソッドに渡すだけです。これでAccessTokenが取得できたので、全ての認証が必要なAPIにアクセス出来るようになりました。

使いかた2. APIへのGet/Post

ここからはConsoleApplicationのサンプルコードで説明。

var client = new OAuthClient(ConsumerKey, ConsumerSecret, accessToken)
{
    Url = "http://api.twitter.com/1/statuses/home_timeline.xml",
    Parameters = { { "count", 20 }, { "page", 1 } },
    ApplyBeforeRequest = req => { req.Timeout = 1000; req.UserAgent = "ReactiveOAuth"; }
};
client.GetResponseText()
    .Select(s => XElement.Parse(s))
    .Run(x => Console.WriteLine(x.ToString()));

IObservable<T>連鎖の最後のメソッドとしてRunを使うと、同期的になります。通常はSubscribeで非同期にすると良いですが、コンソールアプリケーションなどでは、同期的な動作のほうが都合が良いでしょう。

OAuthClientを作成し、オブジェクト初期化子でURL、パラメータ(コレクション初期化子が使えます)を設定したら、GetResponseTextを呼ぶだけ。あとはIObservable<string>になっているので、Linqと同じように操作していけます。

ApplyBeforeRequestではリクエストが発行される前に、生のHttpWebRequestが渡されるので(!)、TimeoutやUserAgentなど細かい設定がしたい場合は、ここにラムダ式を埋めてください。

では、POSTは?

new OAuthClient(ConsumerKey, ConsumerSecret, accessToken)
{
    MethodType = MethodType.Post,
    Url = "http://api.twitter.com/1/statuses/update.xml",
    Parameters = { { "status", "PostTest from ReactiveOAuth" } }
}.GetResponseText()
    .Select(s => XElement.Parse(s))
    .Run(x => Console.WriteLine("Post Success:" + x.Element("text")));

POSTの場合はMethodTypeにMethodType.Postを指定します(デフォルトがGETなので、GETの場合は指定の省略が可)。それ以外はGETと同じです。Urlとパラメータ指定して、GetResponse。

ストリーミングもいけます

OAuthClientには3つのメソッドがあります。GetResponseは生のWebResponseを返すもので細かい制御をしたい時にどうぞ。GetResponseTextはStreamReaderのReadToEndで応答をテキストに変えたものを返してくれるもので、お手軽です。そのままXElement.Parseとかに流すと楽ちん。そして、GetResponseLinesはReadLineで一行ずつ返してくれるもの、となっています。GetResponseTextとGetResponseLinesは型で見ると両方共IObservable<string>なため戸惑ってしまうかもですが、前者は流れてくるのは一つだけ、後者は行数分だけ、となります。

GetResponseLinesはStreamingAPIで使うことを想定しています。とりあえず、WPF用のサンプルを見てください。

var client = new OAuthClient(ConsumerKey, ConsumerSecret, accessToken)
{
    Url = "http://chirpstream.twitter.com/2b/user.json"
};
// streamingHandleはIDisposableで、これのDisposeを呼べばストリーミング停止
streamingHandle = client.GetResponseLines()
    .Where(s => !string.IsNullOrWhiteSpace(s)) // filter invalid data
    .Select(s => DynamicJson.Parse(s))
    .Where(d => d.text()) // has text is status
    .ObserveOnDispatcher()
    .Subscribe(
        d => StreamingViewListBox.Items.Add(d.user.screen_name + ":" + d.text),
        ex => MessageBox.Show(ReadWebException(ex))); // エラー処理

ストリーミングAPIを使うにあたっても、何の面倒くささもなく、至って自然に扱えてしまいます!Dynamicを使ったJsonへの変換にはDynamicJson - C# 4.0のdynamicでスムーズにJSONを扱うライブラリを、また、RxとStreamingAPIとの相性については過去記事C#とLinq to JsonとTwitterのChirpUserStreamsとReactive Extensionsを見てください。

そうそう、それとネットワーク通信で起こったエラーのハンドリングが、SubscribeのOnErrorに書くだけで済むというのもRxを使って嬉しいことの一つです。

実装について

構造は全体的に@ugaya40さんのOAuthAccessがベースになっています(パク……)。そもそもにTwitterTL to HTMLのOAuth対応した時に、OAuthAccessを使って、あー、OAuth周りってRxに乗せると快適になるなー、とか思ったのが作ろうとした最初の動機だったりもして。非常に感謝。

WebClient風の、認証がOAuthなだけのベタなWebRequestラッパーという感じなので、特別なところはありません。インターフェイスとかなくて、本当にただのベタ書き。特に奇をてらってるところはないんですが、ストリーミングAPIで使うために用意したGetResponseLinesは個人的には笑えたり。

var req = WebRequest.Create(Url);
return Observable.Defer(() => req.GetRequestStreamAsObservable())
    .SelectMany(stream => stream.WriteAsObservable(postData, 0, postData.Length))
    .SelectMany(_ => req.GetResponseAsObservable());
    .Select(res => res.GetResponseStream())
    .SelectMany(s => Observable.Using(() => new StreamReader(s), sr => Observable.Repeat(sr)))
    .TakeWhile(sr => !sr.EndOfStream)
    .Select(sr => sr.ReadLine());

うわー……。利用者としては、ただのIObservable<string>としか見えないので、前段階でこんなにチェーンが繋がってるだなんてこと、気にする必要は全くないんですけどねー。これがベストな書き方だとは全然思えないので、誰かアドバイス欲すぃです。

まとめ

RxというとLinq to Events、イベントのLinq化という方向が目につきますが、今回は非同期のLinq化のほうにフォーカスしました。何というか、実に、素晴らしい!asynchronus programming is hard、と、思っていた時もありました。今や私達にはRxがある。恐れることはなにもない。

今回WPFとWindows Phone 7(Silverlight)でサンプルを作ったのですが、コードがコピペで、完全な互換性もって動いちゃうんですね。WPFで書いてSilverlightに持っていく時に、ああ、BeginGetResponseに書き換えなきゃ…… みたいなことが起こらない。最初から非同期で統一することで、全部ライブラリがネットワーク周りを吸収してくれる。非同期→同期にするのも簡単だし(RunやToEnumerableを使えばいい)、そもそも、Rxの土台に乗っている方が、普通に同期的に書くよりもむしろ楽だったりします。

個人的には、Windows Phone 7のローンチに粗製乱造Twitterアプリを送り込むという野望があるんですがねえー。いや、粗製乱造にするつもりはないんですが、機能をザックリと削って、一般性を無視して「私が使うシチュエーションで私が使いやすいような」アプリを出したいなーと思ってます。構想はあって、そこそこ尖った個性ある内容になる予定なので一部の人がフィットしてくれればいいな、と。多くの人に目に触れては欲しいのでローンチのタイミングは外したくない。問題はWorldのローンチタイミングに合わせてもJapanだと実機がないってことですね!開発機欲しい(お金は払いますからどうかー)。

と、思ってたのですがサンプル作りで、あきらかーなXAML知識のなさ(ていうか何も知りません)が露呈したので、ローンチに間にあわせるとか寝言すぎるのですが。せめてJapanのローンチまでにはそれなりな技量を身につけたいところです。

C#から使うMicrosoft Ajax MinifierでのJavaScriptコード圧縮

いつのまにか独立してCodePlex入りしているMicrosoft Ajax Minifier。その名の通り、Microsoft謹製のJavaScript/CSSの圧縮/整形ツールです。発表からすぐに、GoogleのClosure Compilerが出てしまったのですっかり影も薄く、ていうか名前悪いよね、Ajax関係ないじゃん……。CodePlexのプロジェクトページも何だか活気ない寂れた感じで、あーあ、といった趣。良いツールなんですけどねえ。

コマンドライン版とDLL版が用意されていて、コマンドラインの、単体で実行可能なexeの解説は【ハウツー】Microsoft Ajax MinifierでJavaScriptを縮小化しようで解説されているので、DLL版の使い方を簡単に解説します。

C#が使える人ならばDLL版のほうが遥かに使いやすかったりして。設定をダラダラと引数を連ねるのではなく、オブジェクト初期化子とenumで出来るので、そう、IntelliSenseが効くわけです。ヘルプ要らずで書けるのは快適。Visual Studioは偉大だなぁ。コマンドライン, PowerShellを捨て、VSのConsoleApplicationを常に立ち上げよう。実際、ちょっとしたテキスト処理とかC#で書いちゃうんですよねえ、私。Linqが楽だから。

Minifierオブジェクトを作ってMinifyJavaScript(CSSの場合はMinifyStyleSheet)メソッドを実行するだけなので、コード見たほうが早いかな。例としてlinq.jsをlinq.min.jsに圧縮します。そして、解析されたWarningをコンソールに表示します。ただの圧縮だけでなく、このコード分析機能が実にありがたい。

using System;
using System.IO;
using Microsoft.Ajax.Utilities;

class Program
{
    static void Main(string[] args)
    {
        var minifier = new Minifier
        {
            WarningLevel = 3, // 最大4。4はウザいのが多いので3がいいね。
            FileName = "linq.js" // エラー表示用に使うだけなので、なくてもいい
        };

        var settings = new CodeSettings // CSSの場合はCSSSettings
        {
            LocalRenaming = LocalRenaming.CrunchAll,
            OutputMode = OutputMode.SingleLine, // MultiLineにすると整形
            IndentSize = 4, // SingleLineの時は意味なし
            CollapseToLiteral = true,
            CombineDuplicateLiterals = true
            // その他いろいろ(それぞれの意味はIntelliSenseのsummaryを読めば分かるね!)
        };

        var load = System.IO.File.ReadAllText(@"linq.js"); // 読み込み

        // 最後の引数はグローバル領域に何の変数が定義されてるか指定するもの
        // 別になくても構わないんだけど、warningに出るので多少は指定しておく
        var result = minifier.MinifyJavaScript(load, settings, "Enumerable", "Enumerator", "JSON", "console", "$", "jQuery");

        // Warningで発見されたエラーはErrorsに格納されてる
        foreach (var item in minifier.Errors)
        {
            Console.WriteLine(item);
        }

        File.WriteAllText("linq.min.js", result); // 書き出し
        Console.WriteLine();
        Console.WriteLine("Original:" + new FileInfo("linq.js").Length / 1024 + "KB");
        Console.WriteLine("Minified:" + new FileInfo("linq.min.js").Length / 1024 + "KB");
    }
}

二つエラー出てますね。変数がvarで宣言されてないそうです。つまり、コードがその行通ると変数がグローバルに置かれてしまいます。確認したら、OrderBy実行したらsortContextという変数がグローバルに飛んでしまってました。ええ、つまりバグと言っていいです。……。あうあう、すみません、直します。これからは必ずAjax Minifierの分析を通してから公開しよう。どうでもよくどうでもよくないんですがlinq.jsもアップデートすべきことが結構溜まってるので近いうちには……。

というわけで、Ajax Minifierの偉大さが分かりました。素晴らしい!みんな使おう!DLLの形になっているので、T4 Templateと混ぜたりなどもしやすく、夢が膨らむ。minifyだけでなく、JSのパーサーとして抽象構文木を取り出したりなども出来ます。

WebでGUIでサクッと実行できる環境とかあるといいと思うんですよね。それこそDLLになっているので、Silverlightならすぐ作れるわけですし。誰か作ればいいのに。いや、お前が作れよって話なんですが。Ajax Minifierが出てすぐの頃に作ろうとしてお蔵入りしちゃったんだよね、ふーみぅ。そうそう、あと、VSと統合されて欲しい!このWarningは大変強力なので、その場でVSのエラー一覧に表示して欲しいぐらい。今のVSのエラー表示はWarningLevelで言うところの1ぐらいで警告出してくれて、それはそれで良い感じなんですが、やっぱこう、もっとビシッと言って欲しいわけですよ。

ワンクリックで整形/圧縮してくれるのとエラー一覧表示が出来るようなVisual Studio拡張を誰か作って欲しいなあ。いやー、本気で欲しいので、とりあえず私も自分で作ってみようと思い、VS2010 SDKは入れてみた。気力が折れてなければ来月ぐらいには公開、したい、です、予定は未定のやるやる詐欺ですががが。

TwitterTLtoHTML ver.0.2.0.0

TwitterのBasic認証の有効期限が8月いっぱいだったのですよね。それは知っていて動かなくなるのも知っておきながら、実際に動かなくなるまで放置していたという酷い有様。結果的に自分で困ってました(普通に今も使っていたので)。というわけで、TwitterTL to HTMLをOAuth対応にしました。認証時のアプリケーション名がTL to HTMLなのですが、これは「Twitter」という単語名をアプリケーション名に入れられないためです。面倒くさいねえ。

OAuth認証は@ugaya40さんのOAuthAccessを利用しています。XboxInfoTwitでは自前のものを使っていたのですが、どうしょうもなく酷いので自分で作り直すかライブラリを使うか、でズルズル悩んで締切りを迎えたのでライブラリ利用に決定ー。使いやすくて良いと思います。

T4による不変オブジェクト生成のためのテンプレート

不変欲しい!const欲しい!readonlyをローカル変数にもつけたい!という要望をたまに見かけるこの頃。もし、そういった再代入不可というマークがローカル変数に導入されるとしたら、readonlyの使い回しだけは勘弁です。何故って、ローカル変数なんて大抵は再代入しないので、readonly推奨ということになるでしょう、そのうちreadonly付けろreadonly付けろというreadonly厨が出てくるのは目に見えています。

良いことなら付ければいいじゃない、というのはもっともですが、Uglyですよ、視覚的に。readonly var hoge = 3 だなんて、見たくはない。頻繁に使うほうがオプションで醜く面倒くさいってのは、良くないことです。let hoge = 3 といったように、let、もしくはその他のキーワード(valとか?)を導入するならば、いいかな、とは思いますが。

それに、ただ単にマークしただけじゃあ不変を保証するわけでもない……。例えばListなんてClearしてAddRangeしたのと再代入とは、どう違うの?的な。難しいねえ。そんなimmutableの分類に関してはufcppさんのimmutableという記事が、コメント欄含め参考になりました。

そうはいっても、そんなにガチに捉えなくても、不変にしたいシチュエーションはいっぱいあります。実はオブジェクト指向ってしっくりきすぎるんです! 不変オブジェクトのすゝめ。 - Bug Catharsis。おお、すすめられたい。ところでしかし、こういう時にいつも疑問に思っているのは、生成どうすればいいのだろう、ということ。今のところ現実解としてあるのはreadonly、つまり、コンストラクタに渡すしかないのですが……

public Hoge(int a, int b, int c, string d, string e, DateTime f, .....

破綻してる。こんなクソ長いコンストラクタ見かけたら殺していいと思う。全くもって酷い。さて、どうしましょう。こういう場合はビルダーを使いましょう、とはEffective Javaが言ってますので(私、この本あんま好きじゃないんだよねー、とかはどうでもいいんですがー)とりあえずストレートに従ってみます。

// あまり行数使うのもアレなので短くしますが、実際は10行ぐらいあると思ってください
Hoge hoge = new HogeBuilder()
  .Age(10)
  .Name("hogehoge")
  .Build();

まあ、悪くない、ですって?いえいえ、これはBuilder作るの面倒くさいし、第一Java臭い。メソッドチェーンだからモダンで素敵、と脳が直結してる人は考えが一歩足らない。むしろ古臭い。最近は流れるようなインターフェイスとかも割と懐疑的で、私は。頂くのはアイディアだけであって、書き方に関しては、各言語にきっちり馴染ませるべき。先頭の大文字小文字だけ整えて移植だとか、愚かな話。というわけで、C#ならオブジェクト初期化子を使おう。

var hoge = new HogeBuilder
{
    Age = 10,
    Name = "hogehoge"
}.Build();

// 暗黙的な型変換を使えばBuildメソッドも不要になる(私はvarのほうが好みですが)
Hoge hoge = new HogeBuilder
{
    Age = 10,
    Name = "hogehoge"
};

ええ、これなら悪くない。オブジェクト初期化子は大変素晴らしい(本当にそろそろModern C# Designをですね……)。ビルダーを作る手間もJava方式に比べ大幅に軽減されます(set専用の自動プロパティを用意するだけ)。それにIntelliSenseのサポートも効きます。

未代入のもののみリストアップしてくれる(Ctrl+Space押しだと全部出てきたりする、バグですかね、困った困った)。そういえばで、これは、不変である匿名型の記法とも似ています。余分なのは.Build()だけで、書く手間的にはそんな変わらない。

前説が長ったらしくなりました。本題は「匿名型のような楽な記法で不変型を生成したい」が目標です。C#の現在の記法では、それは無い。欲しいなあ。名前付き引数使えば似たような雰囲気になると言えばなるんですが、アレ使うと「省略可」な雰囲気が出てダメ。ビルダーで作りたいのは、原則「省略不可」なので。

なければ作ればいいじゃない、オブジェクト初期化子を使って.Buildで生成させるビルダーを作れば似たような感じになる。あとは、手動でそれ定義するの非常に面倒なので、そう、T4で自動生成しちゃえばいいぢゃない。

以下コード。例によってパブリックドメインで。別にブログにベタ貼りなコードは自明でいいんじゃないかって気もするんですが、宣言は一応しておいたほうがいいのかなー、と。

<#@ assembly Name="System.Core.dll" #>
<#@ import namespace="System.Linq" #>
<#@ import namespace="System.Collections.Generic" #>
<#@ import namespace="System.Text.RegularExpressions" #>
<#@ output extension="Generated.cs" #>
<#
    // 設定:クラス名はそのまま文字列で入力
    // クラスの持つ変数は、コンストラクタに書くみたいにdeffinesに
    // "string hoge","int huga" といった形で並べてください
    // usingとnamespaceは、直下の出力部を直に弄ってください
    // partial classなので、これをベースにメソッドを足す場合は別ファイルにpartialで定義することを推奨します
    // Code Contractsに関わる部分は(ContractVerification属性とContract.EndContractBlock())は、
    // 対象がWindows Phone 7などContractが入っていない環境下では削除してください(通常の.NET 4環境では放置で大丈夫)

    var className = "Person";
    var deffines = new DeffineList {
        "string name",
        "DateTime birth",
        "string address"
    };
#>
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Contracts;

namespace Neue.Test
{
    [DebuggerDisplay(@"<#= deffines.DebuggerDisplay #>", Type = "<#= className #>")]
    public partial class <#= className #> : IEquatable<<#= className #>>
    {
<# foreach(var x in deffines) {#>
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private readonly <#= x.TypeName #> <#= x.FieldName #>;
        public <#= x.TypeName #> <#= x.PropName #> { get { return <#= x.FieldName #>; } }
<# } #>

        private <#= className #>(<#= deffines.Constructor #>)
        {
<# foreach(var x in deffines) {#>
            this.<#= x.FieldName #> = <#= x.FieldName #>;
<# } #>
        }
        
        [ContractVerification(false)]
        public static implicit operator Person(Builder builder)
        {
            return builder.Build();
        }

        public bool Equals(<#= className #> other)
        {
            if (other == null || GetType() != other.GetType()) return false;
            if (ReferenceEquals(this, other)) return true;
            return EqualityComparer<<#= deffines.First().TypeName #>>.Default.Equals(<#= deffines.First().FieldName #>, other.<#= deffines.First().FieldName #>)
<# foreach(var x in deffines.Skip(1)) {#>
                && EqualityComparer<<#= x.TypeName #>>.Default.Equals(<#= x.FieldName #>, other.<#= x.FieldName #>)
<# } #>
                ;
        }

        public override bool Equals(object obj)
        {
            var other = obj as <#= className #>;
            return (other != null) ? Equals(other) : false;
        }

        public override int GetHashCode()
        {
            var hash = 0xf937b6f;
<# foreach(var x in deffines) {#>
            hash = (-1521134295 * hash) + EqualityComparer<<#= x.TypeName #>>.Default.GetHashCode(<#= x.FieldName #>);
<# } #>
            return hash;
        }

        public override string ToString()
        {
            return "{ " + "<#= deffines.First().PropName #> = " + <#= deffines.First().FieldName #> +
<# foreach(var x in deffines.Skip(1)) {#>
                ", <#= x.PropName #> = " + <#= x.FieldName #> +
<# } #>
                " }";
        }

        public class Builder
        {
<# foreach(var x in deffines) {#>
            public <#= x.TypeName #> <#= x.PropName #> { private get; set; }
<# } #>

            public <#= className #> Build()
            {
<# foreach(var x in deffines) {#>
                if ((object)<#= x.PropName #> == null) throw new ArgumentNullException("<#= x.PropName #>");
<# } #>
                Contract.EndContractBlock();
                return new <#= className #>(<#= string.Join(", ", deffines.Select(d => d.PropName)) #>);
            }
        }
    }
}
<#+
    class Deffine
    {
        public string TypeName, FieldName, PropName;

        public Deffine(string constructorParam)
        {
            var split = constructorParam.Split(' ');
            this.TypeName = split.First();
            this.FieldName = Regex.Replace(split.Last(),  "^(.)", m => m.Groups[1].Value.ToLower());
            this.PropName = Regex.Replace(FieldName, "^(.)", m => m.Groups[1].Value.ToUpper());
        }
    }

    class DeffineList : IEnumerable<Deffine>
    {
        private List<Deffine> list = new List<Deffine>();

        public void Add(string constructorParam)
        {
            list.Add(new Deffine(constructorParam));
        }        
        
        public string DebuggerDisplay
        {
            get
            {
                return "\\{ " + string.Join(", ", list.Select(d =>
                    string.Format("{0} = {{{1}}}", d.PropName, d.FieldName))) + " }";
            }
        }

        public string Constructor
        {
            get { return string.Join(", ", list.Select(d => d.TypeName + " " + d.FieldName)); }
        }

        public IEnumerator<Deffine> GetEnumerator()
        {
            return list.GetEnumerator();
        }

        System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }
    }
#>

以下のようなのが出力されます(長いねー)

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Contracts;

namespace Neue.Test
{
    [DebuggerDisplay(@"\{ Name = {name}, Birth = {birth}, Address = {address} }", Type = "Person")]
    public partial class Person : IEquatable<Person>
    {
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private readonly string name;
        public string Name { get { return name; } }
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private readonly DateTime birth;
        public DateTime Birth { get { return birth; } }
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private readonly string address;
        public string Address { get { return address; } }

        private Person(string name, DateTime birth, string address)
        {
            this.name = name;
            this.birth = birth;
            this.address = address;
        }
        
        [ContractVerification(false)]
        public static implicit operator Person(Builder builder)
        {
            return builder.Build();
        }

        public bool Equals(Person other)
        {
            if (other == null || GetType() != other.GetType()) return false;
            if (ReferenceEquals(this, other)) return true;
            return EqualityComparer<string>.Default.Equals(name, other.name)
                && EqualityComparer<DateTime>.Default.Equals(birth, other.birth)
                && EqualityComparer<string>.Default.Equals(address, other.address)
                ;
        }

        public override bool Equals(object obj)
        {
            var other = obj as Person;
            return (other != null) ? Equals(other) : false;
        }

        public override int GetHashCode()
        {
            var hash = 0xf937b6f;
            hash = (-1521134295 * hash) + EqualityComparer<string>.Default.GetHashCode(name);
            hash = (-1521134295 * hash) + EqualityComparer<DateTime>.Default.GetHashCode(birth);
            hash = (-1521134295 * hash) + EqualityComparer<string>.Default.GetHashCode(address);
            return hash;
        }

        public override string ToString()
        {
            return "{ " + "Name = " + name +
                ", Birth = " + birth +
                ", Address = " + address +
                " }";
        }

        public class Builder
        {
            public string Name { private get; set; }
            public DateTime Birth { private get; set; }
            public string Address { private get; set; }

            public Person Build()
            {
                if ((object)Name == null) throw new ArgumentNullException("Name");
                if ((object)Birth == null) throw new ArgumentNullException("Birth");
                if ((object)Address == null) throw new ArgumentNullException("Address");
                Contract.EndContractBlock();
                return new Person(Name, Birth, Address);
            }
        }
    }
}

これで、どれだけ引数の多いクラスであろうとも、簡単な記述でイミュータブルオブジェクトを生成させることが出来ます。しかも、普通にクラス作るよりも楽なぐらいです、.ttをコピペって、先頭の方に、コンストラクタに並べる型を書くだけ。後は全部自動生成任せ。もし積極的に使うなら、Generated.csのほうを消して.ttのみにした状態で、ファイル→テンプレートのエクスポートで項目のエクスポートをすると使い回しやすくて素敵と思われます、項目名はImmutableObjectとかで。

// 書くときはこんな風にやります
var person1 = new Person.Builder
{
    Name = "hoge",
    Birth = new DateTime(1999, 12, 12),
    Address = "Tokyo"
}.Build();

// 暗黙的な型変換も実装されているので、.Buildメソッドの省略も可
Person person2 = new Person.Builder
{
    Name = "hoge",
    Birth = new DateTime(1999, 12, 12),
    Address = "Tokyo"
};

// 参照ではなく、全てのフィールドの値の同値性で比較される
Console.WriteLine(person1.Equals(person2)); // true

匿名型の再現なので、EqualsやGetHashCodeもオーバーライドされて、フィールドの値で比較を行うようになっています。この辺はもう手動だと書いてられないですよね。ReSharperなどを入れて生成をお任せする、という手はありますが。

==はオーバーライドされていません。これもまた匿名型の再現なので……。Tupleもされてないですしね。これは、フィールドをreadonlyで統一しようと「変更可能」な可能性が含まれるので==は不適切、というガイドライン的なもの(と解釈しました)に従った結果です。変更可能云々は、下の方で解説します。

Code Contracts

更に、Code Contractsを入れれば、値の未代入に対するコンパイラからの静的チェックまで得られます!下記画像のは、Addressが未入力で、通常は実行時に例外が飛ぶことで検出するしかないですが、Code Contractsが静的チェックで実行前にnullだと警告してくれています。

ビルダーの欠点は未代入の検出が実行時まで出来なかったりすること。インターフェイスで細工することで、順番を規定したり、必ず代入しなければならないものを定義し終えるまでは.BuildメソッドがIntelliSenseに出てこないようにする。などが出来ますが、手間がかかりすぎて理想論に留まっている気がします。

簡単であることってのはとても大事で、過剰な手間暇や複雑な設計だったりってのは、必ず無理が生じます。手間がかかること、複雑であることは、それ自体が良くない性質の一つであり、メリットがよほど上回らない限りは机上の空論にすぎない。

今回、Code Contractsのパワーにより、シンプルなオブジェクト初期化子を使ったビルダーでも未代入の静的チェックをかませる、という素敵機構が実現しました。残念ながらCode Contractsは要求環境が厳しいです。アドインを入れてない/入れられない(Express)場合はどうなるのか、というと、.NET 4にクラス群は入っているので、コンパイル通らないということはありません。普通にArgumentNullExceptionがthrowされるという形になります。

私が考えるに、.NET 4でクラスが入ったのって、Code Contractsのインストールの有無に関係なくコードが共有出来るように、という配慮でしかない予感。ExpressでContractクラスを使う意味は、あまりなさそうですね。Windows Phone 7環境など、Contractクラスそのものがないような場合では、T4のBuilderクラスBuildメソッドのContract.EndContractBlock();の一行とimplict operatorのContractVerification属性を削除してください。自分で好きに簡単に書き換えられるのもT4の良さです。

今回はnullチェックしかしていないので、つまり値型の未代入には無効です。何とかして入れたいとは思ったんですが、例えば対策として値型をNullableにするにせよType情報が必要で、そのためにはAssembly参照が必要で、と設定への手間が増えてしまうので今回は止めました(このT4はただ文字列を展開しているだけで、完全にリフレクション未使用)

Code Contractsに関しては、事前条件のnullチェックにしか使っていなくて真価の1%も発揮されていないので、詳しくはとある契約の備忘目録。契約による設計(Design by Contract)で信頼性の高いソフトウェアを構築しよう。 - Bug Catharsisなどなどを。不変オブジェクトに関してもそうだけれど、zeclさんの記事は素晴らしいです。

Code Contracts自体は、メソッド本体の上の方で、コントラクトの記述が膨れ上がるのは好きでないかも。従来型の、if-then-throwでの引数チェックも、5行を超えるぐらいになるとウンザリしますね。ご丁寧に{で改行して、if-then-throwの一個のチェックに4行も使って、それが5個ぐらい連なって20行も使いやがったりするコードを見ると発狂します。そういう場合に限ってメソッド本体は一行で他のメソッド呼んでるだけで、更にその、他のメソッドの行頭にも大量の引数チェックがあったりすると、死ねと言いたくなる。コードは視覚的に、横領域の節約も少しは大事だけど、縦も大事なんだよ、分かってよね……。メソッド本体が1000行とか書く人じゃなく、100行超えたら罰金(キリッ とか言ってる人だけど、それならガード句が10行超えたら罰金だよこっちとしては。

話が脱線した。つまるところ、コントラクトはライブラリレベルで頑張るよりも、言語側でのサポートが必要な概念ですね、ということで。実際rewriterとか、ライブラリレベル超えて無茶しやがって、の領域に踏み込んでいますし<Code Contracts。

プラスアルファ

partial classで生成されるので(デフォルトではクラス名.Generated.cs)、別ファイルにクラスを作ることで、フィールドの増減などでT4を後で修正しても、影響を受けることなくメソッドを追加することができます。

// Person.csという形で別ファイルで追加
using System;

namespace Neue.Test
{
    public partial class Person
    {
        public int GetAge(DateTime target)
        {
            return (target.Year - birth.Year);
        }
    }
}

それと、nullチェックだけじゃなくきっちりBuildに前提条件入れたい(もしくはnullを許容したい)場合は、T4のBuildメソッドの部分に直に条件を書いてしまうか、それも何だか不自然に感じる場合は生成後のファイルをT4と切り離してしまうのも良いかもですね。自由なので好きにどうぞですます。

で、本当に不変なの?

何をもってどこまでを不変というのかはむつかしいところですが、Equalsが、GetHashCodeが変化するなら、可変かしら? 単純に全ての含まれる型のゲッターが常に同一の値を返さなければ不変ではない、でも良いですが。冒頭でも言いましたが、そう見るとreadonlyだけでは不変を厳密には保証しきれていません。匿名型で例を出すと

class MyClass
{
   public int i;

   public override int GetHashCode()
   {
       return i;
   }
}

static void Main(string[] args)
{
   var anon = new { MC = new MyClass { i = 100 } };
   var hashCode1 = anon.GetHashCode();

   anon.MC.i = 1000; // 変更
   var hashCode2 = anon.GetHashCode();

   Console.WriteLine(hashCode1 == hashCode2); // false
   Console.WriteLine(hashCode1);
   Console.WriteLine(hashCode2);
}

参照しているMyClassのインスタンスの中身が変化可能で、それが変化してしまえば、違う値になってしまいます。厳密に不変であるためには、中のクラス全てが不変でなければなりません。これは今の言語仕様的には制限かけるのは無理かなー、といったところ。T4なのでリフレクションで全部バラして、参照している型が本当の意味で不変なのかどうか検証して、可変の型を含む場合はジェネレートしない、という形でチェックかけるのは原理的には可能かもしれません、が、やはり色々無理があるかなあ。

まとめ

プログラミングの楽しさの源は、書きやすく見た目が美しいことです。私はLinq to Objects/Linq to Xmlでプログラミングを学んだようなものなので、Linqの成し遂げたこと(究極のIntelliSenseフレンドリーなモデル・使いづらいDOMの大破壊)というのが、設計の理想と思っているところが相当あります。C#は言語そのものが素晴らしいお手本。匿名型素晴らしいよ(一年ぐらい前は匿名型も可変ならいいのに、とか口走っていた時期があった気がしますが忘れた、いやまあ、可変だと楽なシチュエーションってのもそれなりにいっぱいあるんですよね)。

T4の標準搭載はC#にとって非常に大きい。T4標準搭載によって、出来る事の幅がもう一段階広がった気がします。partial class素晴らしい。自動生成って素敵。T4はただのテキストテンプレートじゃなくて「VSと密接に結びついていて」「なおかつ標準搭載」「もはやC#の一部といってもいい」ことが、全く違った価値をもたらしていると思います。自動生成前提のパターンを作っても/使ってもいいんだよ、と。言語的に足らない部分の迂回策が、また一つ加わった。

見た目上若干Uglyになっても自動生成でなんとかする、というのはJava + Eclipseもそうですが、それと違うのはpartialでUglyな部分を隔離出来る(隔離によって自動生成の修正が容易になることも見逃せない)ことと、自動生成部分をユーザーが簡単に書けること、ですね。Eclipseの自動生成のプラグインを書くのは敷居が高すぎですが、T4を書く、書くまではしなくても修正する、というのは相当容易でしょう。

最近本当にT4好きですねー。色々と弄ってしまいます。こーどじぇねれーと素晴らしい。あとは、T4自体のUglyさが少し軽減されればな、といったところでしょうか。テンプレートエンジンとしてRazorに切り替えられたりを望みたいなあ。

Reactive ExtensionsのFromEventをT4 Templateで自動生成する

Rxで面倒くさいのが、毎回書かなければならないFromEvent。F#ならイベントがファーストクラスで、そのままストリーム処理に流せるという素敵仕様なのですが、残念ながらC#のeventはかなり雁字搦めな感があります。しかし、そこは豊富な周辺環境で何とか出来てしまうのがC#というものです。F#では form.MouseMove |> Event.filter と書けますが、 form.MouseMoveAsObservable().Where と書けるならば、似たようなものですよね?

というわけで、T4です。FromEventを自動生成しましょう!と、いうネタは散々既出で海外のサイトにも幾つかあるし、日本にもid:kettlerさんがFromEventが面倒なので自動生成させてみた2として既に書かれているのですが、私も書いてみました。書くにあたってid:kettlerさんのコードを大変参考にさせていただきました、ありがとうございます。

私の書いたもののメリットですが、リフレクションを使用しないFromEventで生成しているため、実行コストが最小に抑えられています。リフレクションを使わないFromEventは書くのが面倒でダルいのですが、その辺自動生成の威力発揮ということで。それと、命名規則をGetEventではなくEventAsObservableという形にしています。これは、サフィックスのほうがIntelliSenseに優しいため。

んね?この命名規則は、RxJSのほうで公式に採用されているものなので(例えばrx.jQuery.jsのanimateAsObservable)、俺々規則というわけじゃないので普通に従っていいと思われます。

以下コード。利用改変その他ご自由にどうぞ、パブリックドメインで。

<#@ assembly Name="System.Core.dll" #>
<#@ assembly Name="System.Windows.Forms.dll" #>
<#@ assembly Name="C:\Program Files (x86)\Reference Assemblies\Microsoft\Framework\.NETFramework\v4.0\Profile\Client\System.Xaml.dll" #>
<#@ assembly Name="C:\Program Files (x86)\Reference Assemblies\Microsoft\Framework\.NETFramework\v4.0\Profile\Client\PresentationCore.dll" #>
<#@ assembly Name="C:\Program Files (x86)\Reference Assemblies\Microsoft\Framework\.NETFramework\v4.0\Profile\Client\PresentationFramework.dll" #>
<#@ import namespace="System.Linq" #>
<#@ import namespace="System.Collections.Generic" #>
<#@ import namespace="System.Text.RegularExpressions" #>
<#@ import namespace="System.Reflection" #>
<#
    // 設定:ここに生成したいクラス(のTypeをFullNameで)を足してください(以下の4つは例)
    // クラスによってはassemblyの増減が必要です、WPF/Silverlightなどはフルパス直書きしてください
    var types = new[] {
        typeof(System.Collections.ObjectModel.ObservableCollection<>),
        typeof(System.Windows.Forms.Button),
        typeof(System.Windows.Controls.Primitives.TextBoxBase),
        typeof(System.Windows.Controls.Primitives.ButtonBase)
    };
#>
using System.Linq;
using System.Collections.Generic;

<# foreach(var x in GenerateTemplates(types)) {#>

namespace <#= x.Namespace #>
{
    <# foreach(var ct in x.ClassTemplates) {#>

    internal static class <#= ct.Classname #>EventExtensions
    {
        <# foreach(var ev in ct.EventTemplates) {#>
		
        public static IObservable<IEvent<<#= ev.Args #>>> <#= ev.Name #>AsObservable<#= ct.GenericArgs #>(this <#= ct.Classname #><#= ct.GenericArgs #> source)
        {
            return Observable.FromEvent<<#= ev.Handler + (ev.IsGeneric ? "<" + ev.Args + ">" : "") #>, <#= ev.Args #>>(
                h => <#= ev.IsGeneric ? "h" : "new " + ev.Handler + "(h)" #>,
                h => source.<#= ev.Name #> += h,
                h => source.<#= ev.Name #> -= h);
        }
        <# } #>
    }
    <# }#>
}
<# }#>
<#+
    IEnumerable<T> TraverseNode<T>(T root, Func<T, T> selector)
    {
        var current = root;
        while (current != null)
        {
            yield return current;
            current = selector(current);
        }
    }

    IEnumerable<ObservableTemplate> GenerateTemplates(Type[] types)
    {
        return types.SelectMany(t => TraverseNode(t, x => x.BaseType))
            .Distinct()
            .GroupBy(t => t.Namespace)
            .Select(g => new ObservableTemplate
            {
                Namespace = g.Key,
                ClassTemplates = g.Select(t => new ClassTemplate(t))
                    .Where(t => t.EventTemplates.Any())
                    .ToArray()
            })
            .Where(a => a.ClassTemplates.Any())
            .OrderBy(a => a.Namespace);
    }

    class ObservableTemplate
    {
        public string Namespace;
        public ClassTemplate[] ClassTemplates;
    }

    class ClassTemplate
    {
        public string Classname, GenericArgs;
        public EventTemplate[] EventTemplates;

        public ClassTemplate(Type type)
        {
            Classname = Regex.Replace(type.Name, "`.*$", "");
            GenericArgs = type.IsGenericType
                ? "<" + string.Join(",", type.GetGenericArguments().Select((_, i) => "T" + (i + 1))) + ">"
                : "";
            EventTemplates = type.GetEvents(BindingFlags.Public | BindingFlags.InvokeMethod | BindingFlags.DeclaredOnly | BindingFlags.Instance)
                .Select(ei => new { EventInfo = ei, Args = ei.EventHandlerType.GetMethod("Invoke").GetParameters().Last().ParameterType })
                .Where(a => a.Args == typeof(EventArgs) || a.Args.IsSubclassOf(typeof(EventArgs)))
                .Select(a => new EventTemplate
                {
                    Name = a.EventInfo.Name,
                    Handler = Regex.Replace(a.EventInfo.EventHandlerType.FullName, "`.*$", ""),
                    Args = a.Args.FullName,
                    IsGeneric = a.EventInfo.EventHandlerType.IsGenericType
                })
                .ToArray();
        }
    }

    class EventTemplate
    {
        public string Name, Args, Handler;
        public bool IsGeneric;
    }
#>
// こんなのが生成されます

namespace System.Collections.ObjectModel
{
    internal static class ObservableCollectionEventExtensions
    {
        public static IObservable<IEvent<System.Collections.Specialized.NotifyCollectionChangedEventArgs>> CollectionChangedAsObservable<T1>(this ObservableCollection<T1> source)
        {
            return Observable.FromEvent<System.Collections.Specialized.NotifyCollectionChangedEventHandler, System.Collections.Specialized.NotifyCollectionChangedEventArgs>(
                h => new System.Collections.Specialized.NotifyCollectionChangedEventHandler(h),
                h => source.CollectionChanged += h,
                h => source.CollectionChanged -= h);
        }
    }
}

namespace System.ComponentModel
{
    internal static class ComponentEventExtensions
    {
        public static IObservable<IEvent<System.EventArgs>> DisposedAsObservable(this Component source)
        {
            return Observable.FromEvent<System.EventHandler, System.EventArgs>(
                h => new System.EventHandler(h),
                h => source.Disposed += h,
                h => source.Disposed -= h);
        }
    }

    // 以下略

使い方ですが、RxGenerator.ttとか、名前はなんでもいいのですがコピペって、上の方のvar typesに設定したい型を並べてください。一緒に並べたものの場合は、全て継承関係を見て重複を省くようになっています。WPFとかSilverlightのクラスから生成する場合は、assembly Nameに直にDLLのパスを書いてやってくださいな。コード的には、例によってLinq大活躍というかLinqなかったら死ぬというか。リフレクションxLINQxT4は鉄板すぎる。

一つ難点があって、名前空間をそのクラスの属している空間にきっちりと分けたせいで、例えばWPFのbutton.ClickAsObservableはSystem.Windows.Controls.Primitivesをusingしないと出てこないという、微妙に分かりづらいことになっちゃっています……。これ普通にHogeHogeExtensionsとかいう任意の名前空間にフラットに配置したほうが良かったのかなあ。ちょっと悩ましいところ。

T4の書き方

漠然と書いてると汚いんですよね、T4。読みにくくてダメだし読みにくいということは書きにくいということでダメだ。というわけで、今回からは書き方を変えました。ASP.NETのRepeater的というかデータバインド的にというかで、入れ物クラスを作って、パブリックフィールド(自動プロパティじゃないのって?そんな大袈裟なものは要りません)を参照させるという形にしました。foreachや閉じカッコ("}")は一行にする。<% %>で囲まれる範囲を最小限に抑えることで、ある程度の可読性が確保出来ているんじゃないかと思います。

といったようなアイディアは

よく訓練されたT4使いは 「何を元に作るか」 「何を作るか」 だけを考える。
何を元に作るかはきっと from ... select になるでしょう。 何を作るかの中では <#=o.Property#> で値を出力する事ができます。
csproj.user を作るための T4 テンプレート

からです。「何を元に作るか」 「何を作るか」 。聞いてみれば当たり前のようだけれど、本当にコロンブスの卵というか(前も同じこと書いた気がする)、脳みそガツーンと叩かれた感じで、うぉぉぉぉぉ、と叫んで納得でした。はい。それと、T4は書きやすいと言っても書きにくい(?)ので、囲む範囲を最小にするってことは、普通のコードでじっくり書いてからT4に移植しやすいってことでもあるんですね。

まとめ

最近F#勉強中なのです。Expert F# 2.0買ったので。と思ったらプログラミングF#が翻訳されて発売されるだとー!もうすぐ。あと一週間後。くぉ、英語にひいこらしながら読んでいるというのにー。

F#すげーなー、と知れば知るほど確かに思うわけですが、しかし何故か同時に、C#への期待感もまた高まっていきます。必ずや「良さ」を吟味して取り込んでくれるという信頼感があります、C#には。そしてまた、ライブラリレベルで強烈に何とか出来る地力がある、例えばイベントをストリームに見立てた処理には、Reactive Extensionsが登場してC#でも実現出来ちゃったり。Scalaと対比され緩やかに死んでいくJavaと比べると、F#と対比しても元気に対抗していくC#の頼もしさといったらない。

といっても、F#も全然まだ表面ぐらいしか見えてないし、突っつけば突っつくほど応えてくれる奥の深い言語な感じなので、今の程度の知識で比較してどうこうってのはないです。Java7のクロージャにたいし、Javaにそんなものはいらない、とか頑な態度を取っている人を見るとみっともないな、と思うわけですが、いつか私もC#に拘泥してC#にそんなものはいらない、的なことを言い出すようだと嫌だなー、とかってのは思ってます。進化を受け入れられなくなったら、終わり。

マルチパラダイム言語の勝利→C++/CLI大勝利ですか?→いやそれは多分違う。的なこともあるので何もかもを受け入れろ、ひたすら取り込んで鈍重な恐竜になれ(最後に絶滅する)、とは言いません。この辺のバランス感覚が、きっと言語設計にとって難しいことであり、そして今のC#は外から見れば恐竜のようにラムダ式だのdynamicだのを取り入れてるように見えるでしょうが、決してそうではなく、素晴らしいバランスに立っています。機能の追加が恐竜への道になっていない。むしろ追加によって過去の機能を互換性を保ちつつ捨てているんですよね、例えば、もうdelegateというキーワードは書くどころか目にすることもほとんどない←なのでC#を学習する場合、C#1.0->2.0->3.0->4.0という順番を辿るのは良くなくて、最新のものから降りていったほうがいい。

何が言いたいかっていったらC#愛してるってことですな。うはは。5.0にも当然期待していますし、Anders Hejlsbergの手腕には絶対的に信頼を寄せています。4.0は言語的な飛躍はあまりなかっただけに、5.0は凄いことになるに違いない。

Prev | | Next

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2026

X:@neuecc GitHub:neuecc

Archive