Reactive Extensions用のWebRequest拡張メソッド

WebClientは楽ちんです。WebRequestはシンドイです。そのシンドさといったら、FromAsyncPatternでラップした程度じゃあまり意味がなかったりなわけです。いえ、単純なダウンロード程度ならいいのです。でも、アップロードとか!プログレスとか!そんなのに対応しようとすると、やっぱどうしょうもなく面倒臭い。とはいえ、面倒くさいのも一度書いてしまえば済むわけなので、一通り使いそうなものを書いてみました。Reactive Extensionsが動く環境(.NET 4 Client Profile, Silverlight4)と、Windows Phone 7環境では標準搭載のMicrosoft.Phone.Reactiveで確認取ってあります。

ソースコード、の前に利用例のほうを。

// DownloadStringAsyncメソッドで非同期読み込みも楽チン
var req1 = WebRequest.Create("http://www.twitter.com/statuses/public_timeline.json");
req1.DownloadStringAsync().Subscribe(Console.WriteLine);
 
// UploadValuesAsyncで非同期POSTも楽チン
// この例はgoo.gl短縮URLにPOSTして結果のJSONを取得するというもの
var req2 = WebRequest.Create("http://goo.gl/api/shorten");
req2.Method = "POST";
req2.ContentType = "application/x-www-form-urlencoded";
 
req2.UploadValuesAsync(new Dictionary<string, string>
    {
        {"url", "http://google.co.jp/"},
        {"security_token", "null" }
    })
    .SelectMany(res => res.DownloadStringAsync())
    .Subscribe(Console.WriteLine);

実に簡単に、「WebClientの同期のように」WebRequestで非同期が扱えます。WebClientと同じ感覚で扱える、というのを大事にするためにも、メソッド名は同じにしてあります(但しUpload系の戻り値はWebResponseとしました、それが最もRx的に制御しやすいと感じたので)。名前末尾にAsyncを付けていますが、この命名規則はAsyncCTPから。

WebRequestでポストする方法のおさらい。GetRequestStreamでRequestStreamを取り出し、それにPOST内容を書きこんでStreamを閉じたら、今度はGetResponseでWebResponseを取得、そのResponseから GetResponseStreamでResponseStreamを取り出し、結果を読み込む。つまりは request.BeginGetRequestStream -> stream.BeginWrite -> request.BeginGetResponse -> stream.BeginRead。ふつーに書いたら人間の扱えるネスト量ではありませんね!

というわけで、streamへのWrite/Readは隠蔽し、単純にアップロードしたら長さ1のReactiveシーケンスが、ダウンロードしたら長さ1のReactiveシーケンスが戻ってくる。という形で簡単に扱えるようにしました。非同期が難しいとか面倒くさいとか、過去の話でしたね!

プログレス表示

プログレス(進捗表示)は大事です。WebClientにもProgressChangedイベントがあるし、せっかく非同期でやってるのだからプログレスも扱いたい。というわけで、それ関連のメソッドも作りました。プログレスは***WithProgressというメソッドを使うと、戻り値が IObservable<Progress<T>>になっていて、このProgressクラスにはValue:値, CurrentLength:現在の長さ, TotalLength:全体の長さ, Percentage:パーセント、の読み取り専用プロパティが入ってます。

// DownloadDataAsyncWithProgressメソッドで進捗を出しながら非同期に画像をダウンロード(して最後に保存する)
var req1 = WebRequest.Create("http://www.microsoft.com/taiwan/silverlight/images/1920X1080_i.jpg")
    .DownloadDataAsyncWithProgress(10000) // 引数は分割サイズ指定、無指定時は64K
    .Do(p => Console.WriteLine("{0}/{1} - {2}%", p.CurrentLength, p.TotalLength, p.Percentage)) // 進捗表示
    .Aggregate(new List<byte>(), (list, p) => { list.AddRange(p.Value); return list; }) // 分割されて届くbyte[]を一つにまとめる
    .Select(l => (Image)new ImageConverter().ConvertFrom(l.ToArray())) // バイト配列をImageに変換
    .Subscribe(img => img.Save("C:\\test.jpg")); // 画像保存
 
// UploadValuesAsyncWithProgressで非同期アップロードの進捗表示もスムーズに
var req2 = WebRequest.Create("http://goo.gl/api/shorten");
req2.Method = "POST";
req2.ContentType = "application/x-www-form-urlencoded";
 
req2.UploadValuesAsyncWithProgress(new Dictionary<string, string> 
    {
        {"url", "http://google.co.jp/"},
        {"security_token", "null" }
    }, 10) // 分割サイズ、あまりに小さいのは良くない、本当は画像とか大きなファイルでやるべきなのですが適当なPOST先が見つからなくて...
    .Do(p => Console.WriteLine("Up: {0}/{1} - {2}%", p.CurrentLength, p.TotalLength, p.Percentage)) // アップロード進捗表示
    .TakeLast(1) // アップロード完了(=最後の1個が通過)までスルー
    .SelectMany(_ => req2.DownloadDataAsyncWithProgress(10)) // 戻り値がProgress<Unit>なので、WebRequestからレスポンス取得
    .Do(p => Console.WriteLine("Down: {0}/{1} - {2}%", p.CurrentLength, p.TotalLength, p.Percentage)) // ダウンロード進捗表示
    .Aggregate(new List<byte>(), (l, p) => { l.AddRange(p.Value); return l; }) // 分割されて届くbyte[]を一つにまとめる
    .Select(l => Encoding.UTF8.GetString(l.ToArray())) // 応答を文字列(JSON)に変換
    .Subscribe(Console.WriteLine);

基本的には非プログレスと同じなのですが、長さ1ではなくて進捗状況に応じた分だけ値が流れてくるという違いがあります。また、書き込み時はUnit、読み込み時はbyte[]が来るので、それを一本にまとめる必要があります。というわけで、若干の手間とクセがあるのですが、まあそれなりに平易に扱えるのではと思います。UploadProgressとDownloadProgressを連結する辺りには、Rxの合成のパワーが見えるのではないでしょうかどうでしょうか。パワーが見えるのはいいとして意味が伝わるかは微妙なところなので、以下図解。

まずはResponseStreamに細切れにWriteし、進捗状況を垂れ流します。Writeなので戻り値はない。ので、Unit(voidみたいなものだと思ってくだしあ)。そしてSelectManyでWriteからReadへ。そう、RxにおいてのSelectManyは、1対多とかフラットにするとかというよりも、イメージとしては「摩り替える」だと思っています。マウスダウンしたと思っていたらマウスムーブに摩り替わっていた、何を (ry。といったように。今回のコードだと、Writeしていたと思ったらいつのまにかReadになっていた、何を言っているのか以下略。

Writeの戻り値にそのままSelectManyを繋げてはいけません。Readを始めるのはWriteが「終わってから」でなければならない。そこで出番なのがTakeLast(1)。意味はそのまんまで、最後の1個を通すというもの。対としてTake(1) - 最初の1個だけ通す、もよく使いますね。

Readもまた細切れにbyte[]で値が読み込まれ送られてきますが、文字列に変換するにせよ何にせよ、 IObservable<byte[]>からbyte[]にしたい。ここでIEnumerable<byte[]>だったら SelectMany.ToArray って感じですが、Observableではそうもいかないので、ここはAggregateを使ってリストに値を詰めてみました。

最後は文字列に変換して、煮るなり焼くなり好きにどうぞ。ブロックはいっぱいありますが、Subscribeに届くのは最後の、集計が全て終わった一つだけというわけでした。なお、進捗はDoメソッドで随時、その箇所に値が流れるたびに画面に出力しています。一見メソッドチェーンだらけで複雑そうですが、順を追ってみてみれば、メソッド全てが、メソッド名通りの意味を明確に持った挙動を取るのと、それぞれのメソッド自体はお馴染みのLinqの挙動そのものなので、Rxの導入までの学習コストというのは存外低いかもしれません(導入を超えた後の敷居に関してはノーコメント)。

あと、このようにブロックが細切れになっているわけですが、これによりキャンセルが容易になるという性質を持っています。キャンセルは Subscribeの戻り値(IDisposable)のDisposeを呼ぶだけで済むのですが、図のとおりに分割されているため、簡単にブロック間に割って入って処理を止めることが可能です。この、処理単位がIObservableとして分割されていることによるキャンセルの容易さは、わざわざ CancellationTokenをチェックしたり(Task)、CancellationPendingをチェックしたり (BackgroundWorker)を処理の途中に挟みこむ必要がないという、Rxの大きな利点となっています。この辺の比較などは次回にでも。

なお、プログレスや細かい単位でのキャンセルが必要なければ、冒頭の例のようにWithProgerss抜きの方のメソッドを使えば、通常の非同期と同じく長さ1のReactiveシーケンスとして、Aggregateとかの処理も拡張メソッド側で全部やってくれますのでお手軽に使えます。

ソースコード

以下ソースコード。ご利用はご自由にのパブリックライセンスで。.NET 4 Client Profile, Silverlight4, Windows Phone 7環境下で動くのを確認してます。名前空間としてusing AsynchronousExtensions することで、拡張メソッド群が使えるようになります。そんなわけでお薦めのファイル名はAsynchronousExtensions.cs。ソースコード一つというわけで、気に入らない箇所がありましたら、勿論当然直接書き換えてしまえばOKです。バグがあれば教えてください。

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
#if WINDOWS_PHONE
using Microsoft.Phone.Reactive;
#else
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
#endif

namespace AsynchronousExtensions
{
    internal static class Progress
    {
        public static Progress<T> Create<T>(T value, double currentLength, double totalLength)
        {
            return new Progress<T>(value, currentLength, totalLength);
        }
    }

    internal class Progress<T>
    {
        public T Value { get; private set; }
        public double TotalLength { get; private set; }
        public double CurrentLength { get; private set; }
        public int Percentage
        {
            get
            {
                return (TotalLength <= 0 || CurrentLength <= 0)
                    ? 0
                    : (int)((CurrentLength / TotalLength) * 100);
            }
        }

        public Progress(T value, double currentLength, double totalLength)
        {
            Value = value;
            TotalLength = totalLength;
            CurrentLength = currentLength;
        }
    }

    internal static class WebRequestExtensions
    {
        public static IObservable<WebResponse> GetResponseAsObservable(this WebRequest request)
        {
            return ObservableForCompatible.Create<WebResponse>(observer =>
            {
                var disposable = new BooleanDisposable();

                Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, ar =>
                {
                    var res = request.EndGetResponse(ar);
                    if (disposable.IsDisposed) res.Close();
                    return res;
                })().Subscribe(observer);

                return disposable;
            });
        }

        public static IObservable<Stream> GetRequestStreamAsObservable(this WebRequest request)
        {
            return ObservableForCompatible.Create<Stream>(observer =>
            {
                var disposable = new BooleanDisposable();

                Observable.FromAsyncPattern<Stream>(request.BeginGetRequestStream, ar =>
                {
                    var res = request.EndGetRequestStream(ar);
                    if (disposable.IsDisposed) res.Close();
                    return res;
                })().Subscribe(observer);

                return disposable;
            });
        }

        public static IObservable<byte[]> DownloadDataAsync(this WebRequest request)
        {
            return Observable.Defer(() => request.GetResponseAsObservable()).SelectMany(r => r.DownloadDataAsync());
        }

        public static IObservable<Progress<byte[]>> DownloadDataAsyncWithProgress(this WebRequest request, int chunkSize = 65536)
        {
            return Observable.Defer(() => request.GetResponseAsObservable()).SelectMany(r => r.DownloadDataAsyncWithProgress(chunkSize));
        }

        public static IObservable<string> DownloadStringAsync(this WebRequest request)
        {
            return DownloadStringAsync(request, Encoding.UTF8);
        }

        public static IObservable<string> DownloadStringAsync(this WebRequest request, Encoding encoding)
        {
            return Observable.Defer(() => request.GetResponseAsObservable()).SelectMany(r => r.DownloadStringAsync(encoding));
        }

        public static IObservable<string> DownloadStringLineAsync(this WebRequest request)
        {
            return DownloadStringLineAsync(request, Encoding.UTF8);
        }

        public static IObservable<string> DownloadStringLineAsync(this WebRequest request, Encoding encoding)
        {
            return Observable.Defer(() => request.GetResponseAsObservable()).SelectMany(r => r.DownloadStringLineAsync(encoding));
        }

        public static IObservable<WebResponse> UploadStringAsync(this WebRequest request, string data)
        {
            var bytes = Encoding.UTF8.GetBytes(data);
            return request.UploadDataAsync(bytes);
        }

        public static IObservable<Progress<Unit>> UploadStringAsyncWithProgress(this WebRequest request, string data, int chunkSize = 65536)
        {
            var bytes = Encoding.UTF8.GetBytes(data);
            return request.UploadDataAsyncWithProgress(bytes, chunkSize);
        }

        public static IObservable<WebResponse> UploadValuesAsync(this WebRequest request, IDictionary<string, string> parameters)
        {
            var parameter = string.Join("&", parameters
                .Select(kvp => Uri.EscapeDataString(kvp.Key) + "=" + Uri.EscapeDataString(kvp.Value)).ToArray());
            var bytes = Encoding.UTF8.GetBytes(parameter);

            return request.UploadDataAsync(bytes);
        }

        public static IObservable<Progress<Unit>> UploadValuesAsyncWithProgress(this WebRequest request, IDictionary<string, string> parameters, int chunkSize = 65536)
        {
            var parameter = string.Join("&", parameters
                .Select(kvp => Uri.EscapeDataString(kvp.Key) + "=" + Uri.EscapeDataString(kvp.Value)).ToArray());
            var bytes = Encoding.UTF8.GetBytes(parameter);

            return request.UploadDataAsyncWithProgress(bytes, chunkSize);
        }

        public static IObservable<WebResponse> UploadDataAsync(this WebRequest request, byte[] data)
        {
            return Observable.Defer(() => request.GetRequestStreamAsObservable())
                .SelectMany(stream => stream.WriteAsObservable(data, 0, data.Length).Finally(() => stream.Close()))
                .TakeLast(1)
                .SelectMany(_ => request.GetResponseAsObservable());
        }

        public static IObservable<Progress<Unit>> UploadDataAsyncWithProgress(this WebRequest request, byte[] data, int chunkSize = 65536)
        {
            return Observable.Defer(() => request.GetRequestStreamAsObservable())
                .SelectMany(stream => stream.WriteAsync(data, chunkSize))
                .Scan(0, (i, _) => i + 1)
                .Select(i =>
                {
                    var currentLength = i * chunkSize;
                    if (currentLength > data.Length) currentLength = data.Length;
                    return Progress.Create(new Unit(), currentLength, data.Length);
                });
        }
    }

    internal static class WebResponseExtensions
    {
        public static IObservable<byte[]> DownloadDataAsync(this WebResponse response)
        {
            return Observable.Defer(() => response.GetResponseStream().ReadAsync())
                .Finally(() => response.Close())
                .Aggregate(new List<byte>(), (list, bytes) => { list.AddRange(bytes); return list; })
                .Select(x => x.ToArray());
        }

        public static IObservable<Progress<byte[]>> DownloadDataAsyncWithProgress(this WebResponse response, int chunkSize = 65536)
        {
            return Observable.Defer(() => response.GetResponseStream().ReadAsync(chunkSize))
                .Finally(() => response.Close())
                .Scan(Progress.Create(new byte[0], 0, 0),
                    (p, bytes) => Progress.Create(bytes, p.CurrentLength + bytes.Length, response.ContentLength));
        }

        public static IObservable<string> DownloadStringAsync(this WebResponse response)
        {
            return DownloadStringAsync(response, Encoding.UTF8);
        }

        public static IObservable<string> DownloadStringAsync(this WebResponse response, Encoding encoding)
        {
            return response.DownloadDataAsync().Select(x => encoding.GetString(x, 0, x.Length));
        }

        public static IObservable<string> DownloadStringLineAsync(this WebResponse response)
        {
            return DownloadStringLineAsync(response, Encoding.UTF8);
        }

        public static IObservable<string> DownloadStringLineAsync(this WebResponse response, Encoding encoding)
        {
            return Observable.Defer(() => response.GetResponseStream().ReadLineAsync(encoding))
                .Finally(() => response.Close());
        }
    }

    internal static class StreamExtensions
    {
        public static IObservable<Unit> WriteAsObservable(this Stream stream, byte[] buffer, int offset, int count)
        {
            return Observable.FromAsyncPattern((ac, o) => stream.BeginWrite(buffer, offset, count, ac, o), stream.EndWrite)();
        }

        public static IObservable<int> ReadAsObservable(this Stream stream, byte[] buffer, int offset, int count)
        {
            return Observable.FromAsyncPattern<int>((ac, o) => stream.BeginRead(buffer, offset, count, ac, o), stream.EndRead)();
        }

        public static IObservable<Unit> WriteAsync(this Stream stream, string data)
        {
            return WriteAsync(stream, data, Encoding.UTF8);
        }

        public static IObservable<Unit> WriteAsync(this Stream stream, string data, Encoding encoding)
        {
            return WriteAsync(stream, encoding.GetBytes(data));
        }

        public static IObservable<Unit> WriteAsync(this Stream stream, IEnumerable<byte> data, int chunkSize = 65536)
        {
            return WriteAsync(stream, data.ToObservable(), chunkSize);
        }

        public static IObservable<Unit> WriteAsync(this Stream stream, IObservable<byte> data, int chunkSize = 65536)
        {
            return Observable.Defer(() => data)
                .Buffer(chunkSize)
                .SelectMany(l => stream.WriteAsObservable(l.ToArray(), 0, l.Count))
                .Finally(() => stream.Close());
        }

        public static IObservable<Unit> WriteLineAsync(this Stream stream, string data)
        {
            return WriteLineAsync(stream, data, Encoding.UTF8);
        }

        public static IObservable<Unit> WriteLineAsync(this Stream stream, string data, Encoding encoding)
        {
            return WriteAsync(stream, data + Environment.NewLine, encoding);
        }

        public static IObservable<Unit> WriteLineAsync(this Stream stream, IEnumerable<string> data)
        {
            return WriteLineAsync(stream, data, Encoding.UTF8);
        }

        public static IObservable<Unit> WriteLineAsync(this Stream stream, IObservable<string> data)
        {
            return WriteLineAsync(stream, data, Encoding.UTF8);
        }

        public static IObservable<Unit> WriteLineAsync(this Stream stream, IEnumerable<string> data, Encoding encoding)
        {
            return WriteLineAsync(stream, data.ToObservable(), encoding);
        }

        public static IObservable<Unit> WriteLineAsync(this Stream stream, IObservable<string> data, Encoding encoding)
        {
            return WriteAsync(stream, data.SelectMany(s => encoding.GetBytes(s + Environment.NewLine)));
        }

        public static IObservable<byte[]> ReadAsync(this Stream stream, int chunkSize = 65536)
        {
            return Observable.Defer(() => Observable.Return(new byte[chunkSize], Scheduler.CurrentThread))
                .SelectMany(buffer => stream.ReadAsObservable(buffer, 0, chunkSize),
                    (buffer, readCount) => new { buffer, readCount })
                .Repeat()
                .TakeWhile(a => a.readCount != 0)
                .Select(a =>
                {
                    if (a.readCount == chunkSize) return a.buffer;

                    var newBuffer = new byte[a.readCount];
                    Array.Copy(a.buffer, newBuffer, a.readCount);
                    return newBuffer;
                })
                .Finally(() => stream.Close());
        }

        public static IObservable<string> ReadLineAsync(this Stream stream, int chunkSize = 65536)
        {
            return ReadLineAsync(stream, Encoding.UTF8, chunkSize);
        }

        public static IObservable<string> ReadLineAsync(this Stream stream, Encoding encoding, int chunkSize = 65536)
        {
            return ObservableForCompatible.Create<string>(observer =>
            {
                var decoder = encoding.GetDecoder();
                var bom = encoding.GetChars(encoding.GetPreamble()).FirstOrDefault();
                var sb = new StringBuilder();
                var prev = default(char);

                return stream.ReadAsync(chunkSize)
                    .SelectMany(bytes =>
                    {
                        var charBuffer = new char[encoding.GetMaxCharCount(bytes.Length)];
                        var count = decoder.GetChars(bytes, 0, bytes.Length, charBuffer, 0);
                        return charBuffer.Take(count);
                    })
                    .Subscribe(
                        c =>
                        {
                            if (c == bom) { } // skip bom
                            else if (prev == '\r' && c == '\n') { } // when \r\n do nothing
                            else if (c == '\r' || c == '\n')   // reach at EndOfLine
                            {
                                var str = sb.ToString();
                                sb.Length = 0;
                                observer.OnNext(str);
                            }
                            else sb.Append(c); // normally char

                            prev = c;
                        },
                        observer.OnError,
                        () =>
                        {
                            var str = sb.ToString();
                            if (str != "") observer.OnNext(str);
                            observer.OnCompleted();
                        });
            });
        }
    }

    internal static class ObservableForCompatible
    {
#if WINDOWS_PHONE
        public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, int count)
        {
            return source.BufferWithCount(count);
        }
#endif

        public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
        {
#if WINDOWS_PHONE
            return Observable.CreateWithDisposable(subscribe);
#else
            return Observable.Create(subscribe);
#endif
        }
    }
}

Progress用のクラスと、WebRequest, WebResponse, Streamに対する拡張メソッドです。各メソッドは大体数行でローカル変数もほとんどないコンパクトなものですが(Rxの強力さに全力で乗っかってるだけ)、コード自体はあまり追いやすくはないかもです。まあ、面倒くさい部分は拡張メソッド側で隠蔽してやれるならそれで良いと思ってます。利用側はWebClientを同期で使うように、簡単にWebRequestが非同期で扱えるんじゃないかなー、と思いますがどうでしょう。そしてついでなので多めに作ってしまった……。

StreamのReadAsync、WriteAsyncですが、それぞれ一度の読み込み/書き込みサイズ指定(デフォルトは64K)で、ストリーミングで読み書きをするようになっています。特にWriteAsyncで要求するbyteはIEunmerable/IObservableですから。つまり、「ストリーミングで読みながら」「ストリーミングで書きこむ」ことが出来るという、メモリに非常に優しいプログラミングが可能になっています。(まあ、読み込みが速くて書き込みが遅い場合は、読み込みだけどんどん進んで、どこかで溜め込まれてしまうわけですけどー、WriteされたらReadが始まる、みたいな仕組み作れないかなー、とは思いつつ方法分からない)

ところでしかし、Rxも素のままではなく、拡張補助ライブラリ的なのを用意するといいのかもねえ。Achiralのように。T4によるFromEventの自動生成などと一緒にまとめて、Rx Supplemental Library。うーん。Rx利用の俺々MVVMライブラリ(を、いつか作りたい、今はまだMVVM自体がヨクワカッテナイのですが)と一緒に、そのうちにでも。

まとめ

WPF, SL, WP7で全く同じコードが動くって素敵。Rx素晴らしい。んで、もしかしてRxって面倒くさいの……? と思ったとしたら、いえ、違います。面倒くさいのはWebRequestでプログレスやアップロード処理を作るのが面倒くさいだけです。上のコードがゲップでそうな感じであるとしたら、Rxが面倒なせいではなくて、WebRequestが面倒なせいなだけです。これRx抜きに同期で書いても面倒臭いです。それどころか、一層大変なことになっていました。むしろRxの上に乗っかっているからこそ、色々な演算子が使えて、コンパクトに書けたのではないかと思われます。また、プログレスや柔軟なキャンセルなど、「クライアントアプリならスレッド余ってるし同期的に書いてThreadPoolに突っ込んでも問題ないし楽っしょー」といった次元を超えた価値をReactive Extensionsは提供できているのではないでしょうか。

プログレスは大事。何が大事って、最近Windows Phone 7を輸入して電波法違反、じゃなくて、ええと、まあ、b-mobile回線で使っているのですけど、死ぬほど遅い。b-mobile遅い。MAX 300kbpsと謳っていて、それも遅いわけだけど実測だと100kbpsで大変遅い。なので、TwitterのXML引っ張ってくる程度であっても(モバイル回線は貧弱なのでXMLじゃなくてJSONがいいですねえ)、何%といった表示は欲しかったりなのです。さすがにそれはやり過ぎだとしても、Twitpicなどにカメラ画像を上げる時などではもう必須と言ってもいいぐらい。

でも、WebRequest使うとプログレスはご覧のように面倒臭いので、どうしてもサポート出来なかったりなのですよね。WebClient使えよってのは正論なのですが、諸事情あったりでWebRequest使いたいって場合もあるでしょう(OAuthとかあるしね)。もうひとつは、Reactive Extensionsに載せるならイベントベースになっているWebClientよりもWebRequestのほうが使いよいということもあり。

そんなジレンマの日々も今日でサヨナラです。Rx + WebRequestでConsoleでも、WPFでも、Silverlightでも、Windows Phone 7でも、幸せな非同期生活を送りましょう。ところで今回のプログレスの話はasync/awaitやBackgroundWorkerと絡めてお話したかったのですが、長くなるので断念。これは次回に(いや、次々回かもしれませんが)必ず書きます!もう既に半分ぐらいサンプルコードとかも書いてはあるので、絶対に近いうちには。

なお、今回の記事はRxTeamのJeffery Van GoghのブログシリーズRx on the serverが下敷きになってます。そして、Rx on the serverで紹介されているコードは、最新のRxのリリースに含まれているSystem.Reactive.ClientProfileに収録されています。が、このPart2で紹介されているAsyncReadLines(非同期でのStreamから一行毎にString取り出し)は簡易的すぎて使い物になりません。2バイト圏無視してるし、1バイトであっても挙動は怪しい。サンプルレベルなら良いと思うし記事は素晴らしく参考になったのですが、本体に収録/配布はやめて欲しかったなあ、少しがっかり。

でもAsyncReadLines自体は欲しいですねえ。今のところ一行毎に取り出すにはStreamReaderしかないのだけれど、非同期APIがないので。非同期読み込み可能なStreamReaderは、何故かSystem.DiagnosticsにAsyncStreamReaderとしてあったりするのですが、internalクラスなので外からでは使えません。というわけで、非同期で一行毎読み込みをやるには、自前実装しか無いようで。うーん、やだなー。私はゆとりゆるふわプログラマなので、バイトとかエンコーディングとかが絡むのはやりたくないなー。というか絶対ミスするでする。

※追記@12/03, やっぱり欲しいと思ったのでReadLineAsyncとして実装しました。↑ソースコードにも反映させてあります。

※追記@2011/8/29, .NET版とWP7同梱版との互換性をもたせました。また、リソースの扱いを正確にしました。

※追記@2011/10/15, ReactiveProperty : WPF/SL/WP7のためのRxとMVVMを繋ぐ拡張ライブラリの一部として、より機能向上させライブラリとしてまとめあげたものを公開しました。↑ソースコードをそのまま使うより、ReactivePropertyを参照するほうをお薦めします。

余談:Windows Phone 7

そういえば、WP7ですか?中々良いですよ、日本語さえまともならば。それを抜きにしても、正直なところ想像以上に自由の効かないガチガチ縛りなので、Androiderな方々は絶対気に入らないと思っています。まあでも、私は好きですねえ。初版なのでダメなところが目立つのはしょうがないし、徐々に改善される(といいなあ!)でしょう。良いところに目を向ければ、大変可能性を感じるOSに仕上がってると思います。

ちなみにWP7の開発環境に望むのはJSONサポート何とかしる!ってことですかね。標準だとシリアライザしか入ってないんですよ、ふざけんな死ね。System.Json入れるかJsonReaderWriterFactory入れるかしろって話ですよ本当に。あとdynamicのサポートが入ってくれればDynamicJsonを対応させるんだけどなあ。

今のところ実機にアプリケーションを転送できません。WP7は認証が必要なので(有料)。で、認証のための証明書の発行手続きはしたのですが、その発行を請け負っているGeoTrustという会社があまりにも仕事しないせいでうばばばば。しょうがないので、今やれることは基盤をがっつし固めることぐらいですはい。

C#のEnumを(Javaのように)別の値を持たせるなど拡張する

Enumに文字列を与えたいというのは少なくなくよくあると思います。例えばFruits.Appleには.ToString()したら「リンゴ」と出て欲しいなー、とか。それならFruits.リンゴと、日本語名つければ?というのはごもっとも。でも、同時に「林檎」とも付けたいなー、とかも思ってしまったりするわけです。しません?Java→C#な人が一番不満に思うのはEnumのようですし(JavaのEnumは高機能!)。

例えばこんな風にかけたらいいな、って。

// こうやって属性定義するだけ!
public enum Color
{
    [Japanese("黒"), Hex("000000"), Rgb(0, 0, 0)]
    Black,
    [Japanese("白"), Hex("FFFFFF"), Rgb(255, 255, 255)]
    White,
    [Japanese("赤"), Hex("FF0000"), Rgb(255, 0, 0)]
    Red
}

class Program
{
    static void Main(string[] args)
    {
        var red = Color.Red;
        Console.WriteLine(red.ToHex()); // FF0000
        Console.WriteLine(red.ToJpnName()); // 赤
        Console.WriteLine(red.ToRgb()); // 255000000
    }
}

んね、非常にすっきり定義出来て幸せ度高い。これをもし普通に書くならば

interface IColor
{
    string EngName { get; }
    string JpnName { get; }
    string Rgb { get; }
    string Hex { get; }
}

public class Red : IColor
{
    public string EngName { get { return "Red"; } }
    public string JpnName { get { return "赤"; } }
    public string Rgb { get { return "255000000"; } }
    public string Hex { get { return "FF0000"; } }
}

といった感じになって面倒くさいことこの上ない(いや別にこれクラスの意味あんまなくてnew Color("Red", "赤",..)といった感じにインスタンスでいいやん、という感じではありますががが。Enumはswitch要因なとこがメインなので、そもそもColorという例が良くないかしら...) まあともかく、Enumは素晴らしい。属性もまた素晴らしい。んで、Enumへの別名などの定義の仕組みは非常に単純で、個別のEnumへ拡張メソッドを定義しているだけです。Enumと拡張メソッドは相性が良い。

public static class ColorExtensions
{
    private static Dictionary<Color, RgbAttribute> rgbCache;
    private static Dictionary<Color, HexAttribute> hexCache;
    private static Dictionary<Color, JapaneseAttribute> jpnCache;

    static ColorExtensions()
    {
        // Enumから属性と値を取り出す。
        // この部分は汎用的に使えるようユーティリティクラスに隔離してもいいかもですね。
        var type = typeof(Color);
        var lookup = type.GetFields()
            .Where(fi => fi.FieldType == type)
            .SelectMany(fi => fi.GetCustomAttributes(false),
                (fi, Attribute) => new { Color = (Color)fi.GetValue(null), Attribute })
            .ToLookup(a => a.Attribute.GetType());

        // キャッシュに突っ込む
        jpnCache = lookup[typeof(JapaneseAttribute)].ToDictionary(a => a.Color, a => (JapaneseAttribute)a.Attribute);
        hexCache = lookup[typeof(HexAttribute)].ToDictionary(a => a.Color, a => (HexAttribute)a.Attribute);
        rgbCache = lookup[typeof(RgbAttribute)].ToDictionary(a => a.Color, a => (RgbAttribute)a.Attribute);
    }

    public static string ToJpnName(this Color color)
    {
        return jpnCache[color].Value;
    }

    public static string ToHex(this Color color)
    {
        return hexCache[color].Value;
    }

    public static string ToRgb(this Color color)
    {
        var rgb = rgbCache[color];
        return string.Format("{0:D3}{1:D3}{2:D3}", rgb.R, rgb.G, rgb.B);
    }
}
// 属性などり
[AttributeUsage(AttributeTargets.Field, Inherited = false, AllowMultiple = false)]
public sealed class JapaneseAttribute : Attribute
{
    public string Value { get; private set; }

    public JapaneseAttribute(string value)
    {
        Value = value;
    }
}

[AttributeUsage(AttributeTargets.Field, Inherited = false, AllowMultiple = false)]
public sealed class RgbAttribute : Attribute
{
    public int R { get; private set; }
    public int G { get; private set; }
    public int B { get; private set; }

    public RgbAttribute(int r, int g, int b)
    {
        R = r;
        G = g;
        B = b;
    }
}

[AttributeUsage(AttributeTargets.Field, Inherited = false, AllowMultiple = false)]
public sealed class HexAttribute : Attribute
{
    public string Value { get; private set; }

    public HexAttribute(string value)
    {
        Value = value;
    }
}

少し、処理が多いですかね。属性定義の部分はスルーでいいので、本質的にはToJpnNameなどの拡張メソッド定義の部分と、静的コンストラクタでEnumから属性と値を取り出してる部分だけなので、そんなでもないです。静的コンストラクタを使っているのは、ここでDictionaryに突っ込むことで、毎回リフレクションがゴニョゴニョと走り出すのを避けています。

属性は固定で複数を付加するなら名前付き引数でもいいですね。

まとめ

まとめというか、このネタは以前にも enumの日本語別名とか三項演算子ネストとか という記事で書いてたりはしたのですが、もう少し実用的になるような形にしました。

C#のEnumは、私は好きです。素朴というか、シンプルで使いやすいですよね。Visual Studioでswitch使うと一気に列挙してくれるのもいいし、勿論ビットフラグも。機能だけならクラス立てれば代替出来ないこともないけれど、色々と遥かに面倒くさくなる。Enumはシンプルに、すっきり書けるのが素敵。Javaは高機能(コンパイル時にクラス作るわけですしね)なのはいいとしても、EnumSetとか、なんだか大仰で、その辺があまり好きではない。

この拡張メソッドは、隙間を少し埋めてくれるかな、と思います。

イベントベースの非同期処理をReactive Extensionsで扱う方法

Reactive Extensionsを使用して複数のサービス非同期コールを連続して呼べるか試してみた。その2 - y_maeyamaの日記 という記事にて、Rxを使ってWCFを綺麗に呼んでました。なるほど!Silverlightで!WCFで!非同期コールを連続して呼ぶ!とっても実践的なテーマ。XboxInfoほげほげがどうのこうの、とかやってたどこかの私と違います、げふんげふん。とても素晴らしいテーマとコードだと思ったので、拝借して私も少し書いてみました。

// 実行部分(実行後10秒後ぐらいにWCFサービスの連鎖を経てメッセージボックスに結果を表示)
var client = new ServiceReference1.Service1Client();

var asyncQuery = from a in Observable.Defer(() => client.GetAAsObservable("てすと"))
                 from b in client.GetBAsObservable(a.EventArgs.Result)
                 from c in client.GetCAsObservable(b.EventArgs.Result)
                 select c.EventArgs.Result;

asyncQuery.Subscribe(s => MessageBox.Show("チェーン終了 - " + s));

「前に呼んだサービスの引数にアクセス」に関しては、SelectManyで匿名型を作る、が解だと思います。で、幾つも連鎖する時はクエリ構文を使うのが大変お手軽。ただしクエリ構文は独自定義のメソッドと繋がりが悪くなるという欠点があります。RxではDoとかObserveOnとか結構使いますから、クエリ構文、便利な用で使いどころに困る代物。その辺は実際に書く内容によって判断つけるところですねー。

XxxAsObservable

突然出てきているGetAAsObservableって何だ、といったら、勿論、拡張メソッドです。FromEventやFromAsyncPatternなどはコード本体に書いてあると鬱陶しいので、基本は拡張メソッドで隔離してしまいましょう。命名規則に特に決まりはありませんが、私はXxxAsObservableという規則で書いています。今回はWCFのサービス参照で自動生成されたイベントベースの非同期処理のラップなので、FromEventをベースに、ただし少し小細工を。

public static class IObservableExtensions
{
    // イベントベースの非同期はRxと今一つ相性が悪いので変換する
    public static IObservable<IEvent<T>> ToAsynchronousObservable<T>(this IObservable<IEvent<T>> source) where T : EventArgs
    {
        var connectable = source
            .Take(1) // 実行は一回のみ(これしとかないとイベントハンドラの解除がされないし、そもそもPruneが動かなくなる)
            .Prune(); // AsyncSubjectに変換
        var detacher = connectable.Connect(); // コネクト(イベント登録実行)
        return connectable.Finally(() => detacher.Dispose()); // 任意のDispose時にイベントのデタッチ(なくても構いません)
    }
}

public static class Service1Extensions
{
    // 量産なので多いようならT4テンプレートで生成してしまうと良いでしょう
    public static IObservable<IEvent<GetACompletedEventArgs>> GetAAsObservable(this Service1Client client, string key)
    {
        var o = Observable.FromEvent<GetACompletedEventArgs>(
                h => client.GetACompleted += h, h => client.GetACompleted -= h)
            .ToAsynchronousObservable();
        client.GetAAsync(key); // 非同期実行開始
        return o;
    }
    
    // あと二つほど
}

ふつーにFromEventをラップして終わり、ではなくて少々細工を仕込んでいます。理由は、イベントベースの非同期処理はそのままだとRxでは扱いづらいから。

イベントベース非同期処理とReactive Extensions

.NET Frameworkには基本的に2つの非同期処理の方法があります。一つはBeginXxx-EndXxxによるAPM(非同期プログラミングモデル)。もう一つはXxxAsyncとXxxCompletedによるイベントベースの非同期処理。イベントベースは素で扱う分にはAPMよりもずっと書きやすかった。だからWebClient、BackgroundWorkerなど、手軽に使える系のものに採用された(というのが理由かは知りませんが)。そしてサービス参照の自動生成のものもイベントベース。

しかし、ラッピングして使う場合はとにかく使いづらい!一度の登録で何度も何度も実行されてしまうことは合成時に都合が悪い。また、ラップしたメソッドと、引数を渡す処理の実行箇所が離れてしまうことは(FromEvent.Subscribe で登録して client.GetAsync で実行)書くのが二度手間になり面倒、という他に、Subscribeよりも前に非同期実行して、更にSubscribeよりも前に非同期実行が完了してしまった場合は何も起こらなくなる。といった実行タイミングの面倒くさい問題まで絡んでくる。

というわけで、イベントベースの非同期処理をReactive Extensionsに載せる場合は、ただたんにFromEventで包むだけではなく、一工夫することをお薦めします。

それが上で定義したToAsynchronousObservable拡張メソッド。これはRxを使って非同期プログラミングを簡単にの時に少し出しました。 source.Take(1).Prune() ですって! ふむ、よくわからん。一行ずつ見ていくと、sourceは、この場合はFromEvent後のものを想定しています(なので拡張メソッドの対象はIObservable<IEvent>)。それをTake(1)。これは、ふつーの非同期処理では実行完了は1回だけだから、それを模しています。イベントベースのラップなので何もしないと無限回になっていますから。

続けてPrune。これは内部処理をAsyncSubjectに変換します。AsyncSubjectに関してはReactive Extensionsの非同期周りの解説と自前実装で簡易実装しましたが、処理が完了(OnCompleted)まで待機して、完了後はキャッシュした値を流すという、非同期処理で不都合(実行タイミングの問題など)が起こらなくするようにしたもの。対になるのはSubjectでイベント処理を模したもの。FromEventを使うと内部ではSubjectが使われることになるので、それを非同期で都合が良い形であるAsyncSubjectに変換する、ためにPruneを使いました。そしてここでConnectすることで即座にイベントにアタッチします(実行タイミングの前後の問題をなくすため)。

awaitは救いの手?

Reactive Extensionsは準備さえ済んでしまえば、非同期が恐ろしく簡単に書ける、のですが、如何せん準備が決して簡単とは言いません。さて、それがC# 5.0のasync/awaitは魔法のように解決してくれるのか、というと、そんなことはありません。現在のAsync CTPではAsyncCtpLibrary.dllにAsyncCtpExtensionsというクラスが用意されていて、その中には既存クラスに対して涙ぐましいまでにAsyncで使うのに最適なようにとXxxAsync拡張メソッドが大量に定義されています。例えばWebClientのDownloadStringにはDownloadStringTaskAsyncが。少しその中身を見てみれば、上で書いたものとやっていることは同じようなもの。イベントを登録して、完了時にはイベントハンドラを解除して。

既存クラスは用意してもらったのがあるからいいけど、サービス参照のような自動生成されるクラスにたいしてはどうすれば?答えは勿論、自分で書こうね!orz。まだまだ、そんな受難な時代は続きそうです。でも、Rxは一度分かってしまえばTake(1).Prune()で済むし、T4で自動生成もそんな難しくない雰囲気なので、まあ悪くはないんじゃないでしょーか。悪いのはむしろイベントベースの非同期処理なのでは、という気がしてきた昨今。

まとめ

今月はまだまだ非同期周りの記事を書くよ!いっぱい書くよ!

そういえばで、今回のソースをBitbucketに上げました。今後も何か書くときは合わせてBitbucketに上げていきたいと思っています。

neuecc / EventToAsync / overview – Bitbucket

RSSなどのアイコンが並んでいる部分の一番右のget sourceからzipで落とせます(Bitbucketはナビゲーションが少しわかりづらいのよねえ、Mercurialベースで非常に快適なソースホスティングサービスだとは思うのですが)。また、RxのライブラリはNuGetを使って参照しています(なのでRx本体のインストールがしてなくても試せます、かわりにNuGetが必要?不必要?その辺まだ仕組みがよく分かってないのですが……)。以前はNuPackっていう名前でしたが、名前が被るとかで変更されたそうです。VS2010の拡張機能マネージャのオンラインギャラリーからもVS拡張がインストール出来るので、とりあえずお薦め。色々なライブラリが簡単にインストール出来ます。

私もlinq.js登録しちゃおうかな、かな……。まあ、Fixしなきゃいけないことがかなり溜まってるので、まずはそれやれよって話なのですががが。

実践例から見るReactive Extensinosを用いた非同期処理

私はXboxInfoTwitという、Xbox.comからデータをスクレイピングで取り出してTwitterに投稿するという、大変不届き者なアプリケーションを公開しています。お陰様で認証者数も2500人を超えて、割と活況。このアプリケーションでのデータ取得ですが、正規なルートでの情報取得ならばAPI叩けば一発、というケースも少なくないですが、いかんせんルートがアレなので一回の取得で取れる情報は断片。あちらこちらから値を受け渡し組み立てなければならないという、入り組んだ通信手順になっています。これを、もし全部非同期でやろうとしたら目眩がするなあ。ふむ。そこでReactive Extensions。むしろ良いネタがあるじゃないか!というわけでクローラーのコア部分を完全非同期 + Reactive Extensinosで全面的に書きなおしてみました。

neuecc / XboxInfoTwitCore / overview – Bitbucket

dllでライブラリという形体を取っていますが、基本的には誰にも使い道がないものかと思われます。というわけで、このXboxInfoTwitCoreの内容自体はどうでもいいとスルーして、この現実的な課題(?)にReactive Extensinosがどう簡単にしてくれたかを見ていきます。

非同期通信の連続

まずは、手順について図で。最近、パワーポイントでのポンチ絵を書く練習を兼ねて、しかしこんなヘタクソで大丈夫か??

プレイ状況の取得は、MyXbox/Profileから取得、実績内容の取得はそこからゲームのタイトルIDを取り出して、個別のゲーム画面のURLを作り取得。最終的には両者を結合してデータを作り出します。ただし、ログイン前にMyXbox/Profileにアクセスすると認証フォームにリダイレクトされるので、そこでごそごそと処理してlive.comの認証クッキーを取得する必要があります。この認証クッキー取得が若干ややこしい。一度分かってしまえばそんなでもないのですけれど、ちょびっと嫌らしい造りになっています(恐らくこの認証部分はWindows Liveで共通だと思うので、SkyDriveをモニョモニョしたい、など考えている人は参考にでもどうぞ)

ちなみに非同期のPOSTってのは、BeginGetRequestStreamで非同期でRequestStreamを呼び出して書き込む(POST)してから、BeginGetResponseで非同期でレスポンスを取得するので、ネスト量2倍。クッキー認証部分だけで、普通に書くと7つぐらいネストするわけです……。

Asynchronus Query

細部を見る前に、最終的にどうなったか、を。

var fetcher = new LiveTokenFetcher("userid", "password");
var locale = new Locale("ja-JP", "日本");

// 非同期クエリ組み立て
var asyncQuery =
    from crawler in
        from cookie in fetcher.GetLoginCookie(new XboxUrlBuilder(locale).Profile())
        select new XboxInfoCrawler(cookie, locale)
    from pi in crawler.GetPlayerInfo()
    from ai in pi.GameInfo.TitleId != 0
        ? crawler.GetAchievement(pi.GameInfo.TitleId)
        : Observable.Return<AchievementInfo>(null)
    select pi.Tap(x => x.GameInfo.AchievementInfo = ai); // Tapについては解説しな...い

// 実行
asyncQuery.Subscribe(pi => Console.WriteLine("{0} - {1}", pi.GamerTag, pi.PlayerState));

(私にしては珍しく)クエリ構文を使っていますが、こう見ると本当に、非同期処理がLinqに統合された形で問い合わせられる、ということが良く伝わるのではないでしょうか?処理の順序が一見イミフ、というのは慣れの問題です!解説します。

GetLoginCookie、GetPlayerInfo、GetAchievementがIObservable<T>を返す、非同期処理。GetLoginCookieはポンチ絵で言うところの、クッキー取得で、GetPlayerInfoが中央左、GetAchievementが中央右で、最後のselectがマージというわけです。ややこしくネストするはずの非同期が非常にあっさりと記述できました。

GETとPOSTの連続

一番面倒くさいログインクッキー取得の部分について。

public IObservable<CookieCollection> GetLoginCookie(string url)
{
    return Observable.Defer(() => CreateWebRequest(url).GetResponseAsObservable())
        .Select(res => res.TransformResponseHtml(xml => new
        {
            Cookie = ConstructCookie(res.Headers["Set-Cookie"]),
            PostUrl = GetPostUrl(xml),
            PPFT = GetInputValue(xml, "PPFT"),
            PPSX = GetInputValue(xml, "PPSX")
        }))
        .SelectMany(a => CreateWebRequest(a.PostUrl, MethodType.Post, a.Cookie)
            .UploadValues(new Dictionary<string, string>
            {
                {"LoginOptions", "3"},
                {"PPFT", a.PPFT},
                {"PPSX", a.PPSX},
                {"PwdPad", PwdPadBase.Substring(0, PwdPadBase.Length - password.Length)},
                {"login", loginId},
                {"passwd", password}
            }))
        .Select(res => res.TransformResponseHtml(xml => new
        {
            PostUrl = GetPostUrl(xml),
            Anon = GetInputValue(xml, "ANON"),
            T = GetInputValue(xml, "t")
        }))
        .SelectMany(a => CreateWebRequest(a.PostUrl, MethodType.Post)
            .Tap(r => r.AllowAutoRedirect = false)
            .UploadValues(new Dictionary<string, string>
            {
                {"ANON", a.Anon},
                {"t",Uri.EscapeDataString(a.T)}
            }))
        .Select(res => ConstructCookie(res.Headers["Set-Cookie"]));
}

となってます。長くて一瞬ウゲ、となりますが、基本的にはSelectとSelectManyしか使っていません。Selectは流れてくる値を変換。SelectManyは、次の非同期処理の実行。そう覚えて貰えれば、どうでしょう?

先頭から見るとurl(MyXbox/Profile)に対して非同期でのGetResponseを開始。認証フォームにリダイレクトされるので、HTMLをスクレイピングしてPOSTに必要なデータ(PPFT,PPSXなど)を集める。そして非同期でポスト(UploadValuesは、詳細はソースコードを参照してもらうとして非同期で投稿してIObservable<WebResponse>を返す自前定義の拡張メソッドです)。続いて、スクレイピングして次のPOSTに必要なデータ(ポスト先URL,Anon,T)を生成、そして再びUploadValuesで非同期ポスト。するとWebResponseに認証クッキーがあるので、ヘッダーの"Set-Cookie"からクッキーを取ってCookieCollectionを生成したものを返す。

となってます。流れるように!また、最後がSelectで終わっているように、流れは終わっていません。利用者が、ここから更に非同期での流れを繋いでいくわけです。

SelectとSelectMany、そしてクエリ構文

御大層なことを言いつつ、SelectとSelectManyしか使ってないじゃないか!というと、はい、その通りです。それでいいんです。ふつーのシチュエーションでの非同期処理って、そういうものでしょう?データを加工するSelectと、ネストをフラットにするSelectMany。これだけ覚えれば十分であり、そして、役立ちなのです。

ところで、冒頭の例で(メソッド構文信者な私が)何でクエリ構文を使ったかというと、そこそこ綺麗に見えるかなー、とか思ったのともう一つは、割と必要に迫られたから。書き方は色々あります。例えばクエリ構文をベースにするなら、他にも

// クエリ構文とメソッド構文を混ぜるのも時には分かりやすさに繋がる
from crawler in fetcher.GetLoginCookie(new XboxUrlBuilder(locale).Profile())
                       .Select(cookie => new XboxInfoCrawler(cookie, locale))
from pi in crawler.GetPlayerInfo() // 以下略

// select intoを使うことで完全フラットに流すことも可能です
from cookie in fetcher.GetLoginCookie(new XboxUrlBuilder(locale).Profile())
select new XboxInfoCrawler(cookie, locale) into crawler
from pi in crawler.GetPlayerInfo() // 以下略

と出来るでしょう。特に後者のselect intoを使った書き方は冒頭のものより良いかもしれません。ネストして順番が上下するのは複雑さの現れになりますから。では、メソッド構文で書くと?

var asyncQ = fetcher.GetLoginCookie(new XboxUrlBuilder(locale).Profile())
    .Select(cookie => new XboxInfoCrawler(cookie, locale)) // クッキーをラップしたもの
    .SelectMany(crawler => crawler.GetPlayerInfo())
    .SelectMany(playerInfo => crawler.GetAchievement(playerInfo.GameInfo.TitleId));
    // と、書きたいのですが、SelectManyでplayerInfoに変換されているのでcrawlerはスコープ外でこうは書けない!

SelectManyは、前方の値が使えません。GetLoginCookieのように前の値は一切使わないのならば何ら問題ないのですが、GetAchievement(実績取得)はクッキーとGetPlayerInfoで得られたタイトルIDの"両方"が必要。この両方、というシチュエーションの時に、上から流れて変形するだけのSelectManyだと非常に、書きづらい。ではメソッド構文ではどうするか、というと、クエリ構文のように考える。

var asyncQ = fetcher.GetLoginCookie(new XboxUrlBuilder(locale).Profile())
    .Select(cookie => new XboxInfoCrawler(cookie, locale))
    .SelectMany(crawler => crawler.GetPlayerInfo().Select(playerInfo =>
        new { crawler, playerInfo })
    .SelectMany(a => a.crawler.GetAchievement(a.playerInfo.GameInfo.TitleId)));

下側のフローに二つ以上の値を持ち越したいのならば、二つ以上の値を格納した匿名型を作ればいいぢゃない。ということですね。クエリ構文の実態もこうなっています。正直、メソッド構文でこれを書くのは二つだけならまだいいですが、幾つも繋いでいくとなると相当汚くなるので、これはもう素直にクエリ構文の出番だな、と私は思いました。

奇怪なるクエリ構文の実態については多重 from の展開結果 - ++C++; // 管理人の日記で書かれています。って、よくみるとTrackbackに昔の私が送ってますね、ふむふむ、Linq to ObjectsのJavaScript移植であるlinq.jsで書くとこうなるようですよ?

// クエリ
var apart = Enumerable.Range(1, 5);
var query = apart
   .SelectMany(function(baker){ return apart
   .SelectMany(function(cooper){ return apart
   .SelectMany(function(fletcher){ return apart
   .SelectMany(function(miller){ return apart
   .Select(function(smith){ return {
       baker: baker, cooper: cooper, fletcher: fletcher, miller: miller, smith: smith}})})})})})
   .Where("Enumerable.From($).Distinct('$.Value').Count() == 5")
   .Where("$.baker != 5")
   .Where("$.cooper != 1")
   .Where("$.fletcher != 1 && $.fletcher != 5")
   .Where("$.miller > $.cooper")
   .Where("Math.abs($.smith - $.fletcher) != 1")
   .Where("Math.abs($.fletcher - $.cooper) != 1");

// 出力
var result = Enumerable.From(query.Single()) // 答えは一つなのでSingle,そしてobjectをKeyValuePair[]に分解
   .OrderBy("$.Value")
   .ToString(", ", "$.Key + ':' + $.Value"); // シーケンスを文字列に結合

alert(result); // smith:1, cooper:2, baker:3, fletcher:4, miller:5

linq.jsは"完全な"移植なので、.NETで動くコードが動かない、ということはありません。JavaScriptにもLinqの革命を!と、宣伝はさておき、SelectManyで下方に値を持ち出しているのに、毎回匿名型を作っていません。SelectManyのresultSelectorの部分でネストさせることで、値を下の方まで持ち運ぶ事が可能です。これはJavaScriptだけでなくC#でのLinq/Rxでも有効です。一つのパターンというかテクニックというか。C#ではわざわざこんなことやるぐらいならクエリ構文使えという気がしますが、クエリ構文の使えないReactive Extensions for JavaScript(RxJS)では活きるかもしれません。

ただ、この手のインデントの細工による対策は、IDEの自動整形と相性悪いんですよね。上のSelectManyのものも、コードフォーマッタにかけるとボロボロに崩れ去ります。基本的には私は、インデントは機械任せでやるべき。人が調整するものではないし調整してはならない。と思っているので、残念ながら上記テクニックは使うことはないかもなー、いや、どうだろう、積極的にはやらないという程度かしら。

The Future of C#(async/await)

つい数日前に開催されたPDC2010で、C#設計者のAnders Hejlsbergが、The Future of C#と題してC#5.0(とは言ってませんが、恐らくそうなる)のFeatureを語りました。一つは今まで言われていたCompiler as a Service。もう一つが、asynchronusの言語統合。C# 2.0でのyield returnによるIEnumerable生成のようなコンパイル時生成で非同期をコードの見た目上、完全に同期のように扱うことが出来ます。

Asynchronous Programming for C# and Visual BasicにてVisual Studio Async CTPが出ていて既に試してみることが可能なので(要:英語版VS2010)、実際にXboxInfoTwitCoreをasync/awaitで書き直してみました。全体はbitbucketのリポジトリ上にあるので興味あればそちらもどうぞ。以下は、ログインクッキーを取得する部分。

private async static Task<WebResponse> UploadValuesAsync(WebRequest request, IDictionary<string, string> parameters)
{
    var bytes = parameters.ToQueryParameter().Pipe(Encoding.UTF8.GetBytes);
    using (var stream = await request.GetRequestStreamAsync())
    {
        await stream.WriteAsync(bytes, 0, bytes.Length);
    }
    return await request.GetResponseAsync();
}

public async Task<CookieCollection> GetLoginCookie(string url)
{
    var res = await CreateWebRequest(url).GetResponseAsync();
    var prepare = res.TransformResponseHtml(xml => new
    {
        Cookie = ConstructCookie(res.Headers["Set-Cookie"]),
        PostUrl = GetPostUrl(xml),
        PPFT = GetInputValue(xml, "PPFT"),
        PPSX = GetInputValue(xml, "PPSX")
    });

    var req2 = CreateWebRequest(prepare.PostUrl, MethodType.Post, prepare.Cookie);
    var res2 = await UploadValuesAsync(req2, new Dictionary<string, string>
    {
        {"LoginOptions", "3"},
        {"PPFT", prepare.PPFT},
        {"PPSX", prepare.PPSX},
        {"PwdPad", PwdPadBase.Substring(0, PwdPadBase.Length - password.Length)},
        {"login", loginId},
        {"passwd", password}
    });

    var prepare2 = res2.TransformResponseHtml(xml => new
    {
        PostUrl = GetPostUrl(xml),
        Anon = GetInputValue(xml, "ANON"),
        T = GetInputValue(xml, "t")
    });

    var req3 = CreateWebRequest(prepare2.PostUrl, MethodType.Post);
    req3.AllowAutoRedirect = false;

    var res3 = await UploadValuesAsync(req3, new Dictionary<string, string>
    {
        {"ANON",prepare2.Anon},
        {"t",Uri.EscapeDataString(prepare2.T)}
    });

    return ConstructCookie(res3.Headers["Set-Cookie"]);
}

まあ見事にベッタベタに書かれているのが良く見えますねえ。このasync/awaitですが、非同期として扱うものにasyncを宣言したメソッドを用意。あとは同期のように書くだけ。yield returnのかわりにawait + XxxAsync と書くだけ。それで、まるで同期のように書けてしまいます。あまりにもお手軽。魔法のように。Silverlightのネック(そして初心者キラー)なところは、分かりづらい非同期でしたが、ウルトラC的に解決……。

この発表を見た時は普通にショックで、更に一日経って冷静に考えると更にショックで寝こむ勢いでしたね(笑) 正直なところ、被ります。単純な非同期の簡易化という点だけで考えればめっちゃ被ります。それはTaskとRxが被るよねー、Rxのほうが書きやすいっすよー、とか少し前に言っていた程度にはモロ被ります。実際問題Taskはあまり使いやすいとは言えないのですが、コンパイラサポートがガッツシ来てしまったら、それは話は別ですな。

勿論、RxのメリットはIEnumerable<T>や、イベントやタイマーなどの他のIObservable<T>とのシームレスな連携にもあるので、非同期を利用するシーンにおいてasync/awaitが完全に置き換えるもの、とは言いませんけれど。そして、私はそういった統合っぷりに魅力を感じているのでasync/awaitはあまり使わないかなー、という気がしてなくはないのですが、ちょっとその辺、判断が難しい。

さて、ところで両者を置き換えるのは非常に簡単です。戻り値がTask<T>かIObservable<T>か。RxではSelectManyだった位置にawaitを置く。それだけです。これは覚えておいて損はないかもです。そういう意味でも普通にかぶってるよな、とか思いつつ。awaitが分かればRx-SelectManyも簡単だし、その逆もまた然り。

SilverlightとWindows Phone 7

Silverlightは同期APIがないので、Rxで組むことで互換性が狙えます。このXboxInfoTwitCoreはフル非同期・フルRxなので、Silverlight/Windows Phone 7にも対応させ、られませんでした!えー。CookieExceptionが発生したり取れる最終的なCookieが何か違ったりと、Cookieが鬼門すぎて無理でした、とほほほ。マジワケワカラン。ええと、一応はWPF-Silverlight-WP7で完全なコード共有を実現する、という点もRxの強力な武器であり、使う理由になるとは思っています。それを示したかったのですが、うーむ。完全に同一なファイルで非同期としての挙動は問題なく取れているのですが、実際にデータが出せないとねえ……。カッコワルイ。

SgmlReader for Silverlight/Windows Phone 7

XboxInfoTwitCoreではHtml to Xml変換にSgmlReaderを使用しています。以前に紹介を書きましたが、これは本当に重宝します。ストリームとXElement.Loadの間に置くことで、不正なXMLである(ためXElement.Loadに渡すと例外が発声する)ネット上のHTMLをLinq to Xmlで扱えるようになります。しかし、元のコードベースが古いのと開発が活況とは言い難い状況であるために、SilverlightやWindows Phone 7に対応していません。大変困った。困ったので、Silverlight/Windows Phone 7で動くように少々コード弄ったところ、問題なく動いた!ので、こちらもbitbucketにあげときました。

neuecc / SgmlReader.SL / overview – Bitbucket

基本的にはデスクトップ版と全く同じ感覚で使えます。

var req = WebRequest.Create("http://google.com/");
req.BeginGetResponse(ar =>
{
    var res = req.EndGetResponse(ar);
    using (var stream = res.GetResponseStream())
    using (var sr = new StreamReader(stream))
    using (var sgmlReader = new SgmlReader { DocType = "HTML", CaseFolding = CaseFolding.ToLower, InputStream = sr })
    {
        var xml = XElement.Load(sgmlReader); // これ。
        Dispatcher.BeginInvoke(() => MessageBox.Show(xml.ToString()));
    }
}, null);

プロパティにURLを渡すと例外で落ちます(同期通信は許可されていません!)。というわけで、WebRequestなどから非同期でウェブからStreamを取り出して、それをInputStreamプロパティに渡すという形を取ってください。この制限は、SilverlightではXElement.Load("url")が許可されていないのと同じことです。

XboxInfoTwitの今後

ここまで見てくれた人がXboxInfoTwit本体の利用者とは思えません!のですが、とりあえず書きますと、コア部分は書き換わったので、あとはGUIというか設定画面を複数言語に対応させてCodePlexで公開。を目指しています。が、いかんせんGUIは難敵です……。今のXboxInfoTwitも一応WPFなんですが見よう見まねで組んだXAMLで汚いので、きっちりと綺麗に書き換えたい、のですが、それをやるにはあまりにもWPF/XAMLの知識がなさすぎる。そのため、今日明日でフル改装で公開!というわけにもいきません。もう少し時間がかかりそうです。

Twitterへの投稿に関しては、以前書いたReactiveOAuth(完全なRxベースのOAuthライブラリ)があるので、シームレスに統合出来そう(こちらも幾つか課題が溜まっているので更新しないと……)。あとは、今使ってる自動アップデートプログラムがタコな出来なので、これも作り直したいなあ。なんて思っていたり。

まとめ

ふつーの非同期で書いてネストしまくりで、こんなの書いてられないだろほれみろバーカバーカ、とか言ってみたかったのですが気力がなくてそれは断念。ともあれ、今ある非同期の厳しさへの現実解として、Rxはアリ、です。非同期処理が、かなり綺麗にLinqクエリとして溶け込む様は分かるのではないかと思います。C#5.0のasync/awaitと比較してみても、awaitの位置にSelectMany、というだけの話ですしね。そして何よりも、async/awaitは今日明日に出るわけじゃないのです! Visual Studio 2005 -> 2008 -> 2010のペースで考えるならば2013年。まだ2年も先です。

async/awaitがないから「しょうがなく」使う、ってスタンスもアレゲ。いえいえ、喜んで使うのです。Rxならではのメリットが、当然あります。ただの非同期処理の一本化というだけに終わらず、イベントやタイマーなど他のソースとの融合は魅力です。そう、そもそも、Rxの持つ側面は3つ。「Asynchronus」「Event」「Pull to Push」。ここ最近はAsynchronusのことしか書いてなかったですが、他の2つのほうも見逃せないというか、むしろRxは最初はLinq to Eventsとして紹介されていたぐらいだしEventのほうがメインですよ?

ところで話は全く変わりますが、Bitbucket使い始めました。バージョン管理は当然Mercurialです。Mercurialは非常にいいです。もうSubversionは使う気になれない。Team Foundation Serverは使ったことないので知らないけれど(あれはオールインワンなところに価値がある気がしつつ、まあ、重量級ですよね)Mercurialのサクサク感は、バージョン管理はしっくりこないんです!とかほざいていた私にガツーンとバージョン管理がないともう生きていけない!ちょっとしたプロジェクトもhg initするし!な心構えに変えさせる威力がありました。あとちょっとした変更でもローカルでコミット、な素敵さとか。

TortoiseHgは、若干UIの導線がイケてないとは思いつつ、まあまあ使いやすく(日本語化もデフォルトでされています←要環境変数にLANG=ja)、Visual Studio拡張のVisualHGも素敵な使い勝手。というか、UI部分はTortoiseHgを呼び出しているだけというシンプルさがいい。私はEclipse下ではMercurial Eclipseを使ってますが、こちらは独自にがっつし作りこんであって、それが今一つな使い勝手で。私はVisual Hgの方向性を支持します。

最後に、毎回嘘っぱちな次回予告ですが(SelectManyの解説をすると言い続けて半年以上経過)、ポーリングをイベントとしてRx化してPushで共通化とか何とかかんとか、を予定。引き続き題材はXboxInfoほげほげを使うつもりです。乞うご期待しない。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(C#)
April 2011
|
July 2024

Twitter:@neuecc GitHub:neuecc

Archive