Reactive Extensions用のWebRequest拡張メソッド

WebClientは楽ちんです。WebRequestはシンドイです。そのシンドさといったら、FromAsyncPatternでラップした程度じゃあまり意味がなかったりなわけです。いえ、単純なダウンロード程度ならいいのです。でも、アップロードとか!プログレスとか!そんなのに対応しようとすると、やっぱどうしょうもなく面倒臭い。とはいえ、面倒くさいのも一度書いてしまえば済むわけなので、一通り使いそうなものを書いてみました。Reactive Extensionsが動く環境(.NET 4 Client Profile, Silverlight4)と、Windows Phone 7環境では標準搭載のMicrosoft.Phone.Reactiveで確認取ってあります。

ソースコード、の前に利用例のほうを。

// DownloadStringAsyncメソッドで非同期読み込みも楽チン
var req1 = WebRequest.Create("http://www.twitter.com/statuses/public_timeline.json");
req1.DownloadStringAsync().Subscribe(Console.WriteLine);
 
// UploadValuesAsyncで非同期POSTも楽チン
// この例はgoo.gl短縮URLにPOSTして結果のJSONを取得するというもの
var req2 = WebRequest.Create("http://goo.gl/api/shorten");
req2.Method = "POST";
req2.ContentType = "application/x-www-form-urlencoded";
 
req2.UploadValuesAsync(new Dictionary<string, string>
    {
        {"url", "http://google.co.jp/"},
        {"security_token", "null" }
    })
    .SelectMany(res => res.DownloadStringAsync())
    .Subscribe(Console.WriteLine);

実に簡単に、「WebClientの同期のように」WebRequestで非同期が扱えます。WebClientと同じ感覚で扱える、というのを大事にするためにも、メソッド名は同じにしてあります(但しUpload系の戻り値はWebResponseとしました、それが最もRx的に制御しやすいと感じたので)。名前末尾にAsyncを付けていますが、この命名規則はAsyncCTPから。

WebRequestでポストする方法のおさらい。GetRequestStreamでRequestStreamを取り出し、それにPOST内容を書きこんでStreamを閉じたら、今度はGetResponseでWebResponseを取得、そのResponseから GetResponseStreamでResponseStreamを取り出し、結果を読み込む。つまりは request.BeginGetRequestStream -> stream.BeginWrite -> request.BeginGetResponse -> stream.BeginRead。ふつーに書いたら人間の扱えるネスト量ではありませんね!

というわけで、streamへのWrite/Readは隠蔽し、単純にアップロードしたら長さ1のReactiveシーケンスが、ダウンロードしたら長さ1のReactiveシーケンスが戻ってくる。という形で簡単に扱えるようにしました。非同期が難しいとか面倒くさいとか、過去の話でしたね!

プログレス表示

プログレス(進捗表示)は大事です。WebClientにもProgressChangedイベントがあるし、せっかく非同期でやってるのだからプログレスも扱いたい。というわけで、それ関連のメソッドも作りました。プログレスは***WithProgressというメソッドを使うと、戻り値が IObservable<Progress<T>>になっていて、このProgressクラスにはValue:値, CurrentLength:現在の長さ, TotalLength:全体の長さ, Percentage:パーセント、の読み取り専用プロパティが入ってます。

// DownloadDataAsyncWithProgressメソッドで進捗を出しながら非同期に画像をダウンロード(して最後に保存する)
var req1 = WebRequest.Create("http://www.microsoft.com/taiwan/silverlight/images/1920X1080_i.jpg")
    .DownloadDataAsyncWithProgress(10000) // 引数は分割サイズ指定、無指定時は64K
    .Do(p => Console.WriteLine("{0}/{1} - {2}%", p.CurrentLength, p.TotalLength, p.Percentage)) // 進捗表示
    .Aggregate(new List<byte>(), (list, p) => { list.AddRange(p.Value); return list; }) // 分割されて届くbyte[]を一つにまとめる
    .Select(l => (Image)new ImageConverter().ConvertFrom(l.ToArray())) // バイト配列をImageに変換
    .Subscribe(img => img.Save("C:\\test.jpg")); // 画像保存
 
// UploadValuesAsyncWithProgressで非同期アップロードの進捗表示もスムーズに
var req2 = WebRequest.Create("http://goo.gl/api/shorten");
req2.Method = "POST";
req2.ContentType = "application/x-www-form-urlencoded";
 
req2.UploadValuesAsyncWithProgress(new Dictionary<string, string> 
    {
        {"url", "http://google.co.jp/"},
        {"security_token", "null" }
    }, 10) // 分割サイズ、あまりに小さいのは良くない、本当は画像とか大きなファイルでやるべきなのですが適当なPOST先が見つからなくて...
    .Do(p => Console.WriteLine("Up: {0}/{1} - {2}%", p.CurrentLength, p.TotalLength, p.Percentage)) // アップロード進捗表示
    .TakeLast(1) // アップロード完了(=最後の1個が通過)までスルー
    .SelectMany(_ => req2.DownloadDataAsyncWithProgress(10)) // 戻り値がProgress<Unit>なので、WebRequestからレスポンス取得
    .Do(p => Console.WriteLine("Down: {0}/{1} - {2}%", p.CurrentLength, p.TotalLength, p.Percentage)) // ダウンロード進捗表示
    .Aggregate(new List<byte>(), (l, p) => { l.AddRange(p.Value); return l; }) // 分割されて届くbyte[]を一つにまとめる
    .Select(l => Encoding.UTF8.GetString(l.ToArray())) // 応答を文字列(JSON)に変換
    .Subscribe(Console.WriteLine);

基本的には非プログレスと同じなのですが、長さ1ではなくて進捗状況に応じた分だけ値が流れてくるという違いがあります。また、書き込み時はUnit、読み込み時はbyte[]が来るので、それを一本にまとめる必要があります。というわけで、若干の手間とクセがあるのですが、まあそれなりに平易に扱えるのではと思います。UploadProgressとDownloadProgressを連結する辺りには、Rxの合成のパワーが見えるのではないでしょうかどうでしょうか。パワーが見えるのはいいとして意味が伝わるかは微妙なところなので、以下図解。

まずはResponseStreamに細切れにWriteし、進捗状況を垂れ流します。Writeなので戻り値はない。ので、Unit(voidみたいなものだと思ってくだしあ)。そしてSelectManyでWriteからReadへ。そう、RxにおいてのSelectManyは、1対多とかフラットにするとかというよりも、イメージとしては「摩り替える」だと思っています。マウスダウンしたと思っていたらマウスムーブに摩り替わっていた、何を (ry。といったように。今回のコードだと、Writeしていたと思ったらいつのまにかReadになっていた、何を言っているのか以下略。

Writeの戻り値にそのままSelectManyを繋げてはいけません。Readを始めるのはWriteが「終わってから」でなければならない。そこで出番なのがTakeLast(1)。意味はそのまんまで、最後の1個を通すというもの。対としてTake(1) - 最初の1個だけ通す、もよく使いますね。

Readもまた細切れにbyte[]で値が読み込まれ送られてきますが、文字列に変換するにせよ何にせよ、 IObservable<byte[]>からbyte[]にしたい。ここでIEnumerable<byte[]>だったら SelectMany.ToArray って感じですが、Observableではそうもいかないので、ここはAggregateを使ってリストに値を詰めてみました。

最後は文字列に変換して、煮るなり焼くなり好きにどうぞ。ブロックはいっぱいありますが、Subscribeに届くのは最後の、集計が全て終わった一つだけというわけでした。なお、進捗はDoメソッドで随時、その箇所に値が流れるたびに画面に出力しています。一見メソッドチェーンだらけで複雑そうですが、順を追ってみてみれば、メソッド全てが、メソッド名通りの意味を明確に持った挙動を取るのと、それぞれのメソッド自体はお馴染みのLinqの挙動そのものなので、Rxの導入までの学習コストというのは存外低いかもしれません(導入を超えた後の敷居に関してはノーコメント)。

あと、このようにブロックが細切れになっているわけですが、これによりキャンセルが容易になるという性質を持っています。キャンセルは Subscribeの戻り値(IDisposable)のDisposeを呼ぶだけで済むのですが、図のとおりに分割されているため、簡単にブロック間に割って入って処理を止めることが可能です。この、処理単位がIObservableとして分割されていることによるキャンセルの容易さは、わざわざ CancellationTokenをチェックしたり(Task)、CancellationPendingをチェックしたり (BackgroundWorker)を処理の途中に挟みこむ必要がないという、Rxの大きな利点となっています。この辺の比較などは次回にでも。

なお、プログレスや細かい単位でのキャンセルが必要なければ、冒頭の例のようにWithProgerss抜きの方のメソッドを使えば、通常の非同期と同じく長さ1のReactiveシーケンスとして、Aggregateとかの処理も拡張メソッド側で全部やってくれますのでお手軽に使えます。

ソースコード

以下ソースコード。ご利用はご自由にのパブリックライセンスで。.NET 4 Client Profile, Silverlight4, Windows Phone 7環境下で動くのを確認してます。名前空間としてusing AsynchronousExtensions することで、拡張メソッド群が使えるようになります。そんなわけでお薦めのファイル名はAsynchronousExtensions.cs。ソースコード一つというわけで、気に入らない箇所がありましたら、勿論当然直接書き換えてしまえばOKです。バグがあれば教えてください。

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
#if WINDOWS_PHONE
using Microsoft.Phone.Reactive;
#else
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
#endif
 
namespace AsynchronousExtensions
{
    internal static class Progress
    {
        public static Progress<T> Create<T>(T value, double currentLength, double totalLength)
        {
            return new Progress<T>(value, currentLength, totalLength);
        }
    }
 
    internal class Progress<T>
    {
        public T Value { get; private set; }
        public double TotalLength { get; private set; }
        public double CurrentLength { get; private set; }
        public int Percentage
        {
            get
            {
                return (TotalLength <= 0 || CurrentLength <= 0)
                    ? 0
                    : (int)((CurrentLength / TotalLength) * 100);
            }
        }
 
        public Progress(T value, double currentLength, double totalLength)
        {
            Value = value;
            TotalLength = totalLength;
            CurrentLength = currentLength;
        }
    }
 
    internal static class WebRequestExtensions
    {
        public static IObservable<WebResponse> GetResponseAsObservable(this WebRequest request)
        {
            return ObservableForCompatible.Create<WebResponse>(observer =>
            {
                var disposable = new BooleanDisposable();
 
                Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, ar =>
                {
                    var res = request.EndGetResponse(ar);
                    if (disposable.IsDisposed) res.Close();
                    return res;
                })().Subscribe(observer);
 
                return disposable;
            });
        }
 
        public static IObservable<Stream> GetRequestStreamAsObservable(this WebRequest request)
        {
            return ObservableForCompatible.Create<Stream>(observer =>
            {
                var disposable = new BooleanDisposable();
 
                Observable.FromAsyncPattern<Stream>(request.BeginGetRequestStream, ar =>
                {
                    var res = request.EndGetRequestStream(ar);
                    if (disposable.IsDisposed) res.Close();
                    return res;
                })().Subscribe(observer);
 
                return disposable;
            });
        }
 
        public static IObservable<byte[]> DownloadDataAsync(this WebRequest request)
        {
            return Observable.Defer(() => request.GetResponseAsObservable()).SelectMany(r => r.DownloadDataAsync());
        }
 
        public static IObservable<Progress<byte[]>> DownloadDataAsyncWithProgress(this WebRequest request, int chunkSize = 65536)
        {
            return Observable.Defer(() => request.GetResponseAsObservable()).SelectMany(r => r.DownloadDataAsyncWithProgress(chunkSize));
        }
 
        public static IObservable<string> DownloadStringAsync(this WebRequest request)
        {
            return DownloadStringAsync(request, Encoding.UTF8);
        }
 
        public static IObservable<string> DownloadStringAsync(this WebRequest request, Encoding encoding)
        {
            return Observable.Defer(() => request.GetResponseAsObservable()).SelectMany(r => r.DownloadStringAsync(encoding));
        }
 
        public static IObservable<string> DownloadStringLineAsync(this WebRequest request)
        {
            return DownloadStringLineAsync(request, Encoding.UTF8);
        }
 
        public static IObservable<string> DownloadStringLineAsync(this WebRequest request, Encoding encoding)
        {
            return Observable.Defer(() => request.GetResponseAsObservable()).SelectMany(r => r.DownloadStringLineAsync(encoding));
        }
 
        public static IObservable<WebResponse> UploadStringAsync(this WebRequest request, string data)
        {
            var bytes = Encoding.UTF8.GetBytes(data);
            return request.UploadDataAsync(bytes);
        }
 
        public static IObservable<Progress<Unit>> UploadStringAsyncWithProgress(this WebRequest request, string data, int chunkSize = 65536)
        {
            var bytes = Encoding.UTF8.GetBytes(data);
            return request.UploadDataAsyncWithProgress(bytes, chunkSize);
        }
 
        public static IObservable<WebResponse> UploadValuesAsync(this WebRequest request, IDictionary<string, string> parameters)
        {
            var parameter = string.Join("&", parameters
                .Select(kvp => Uri.EscapeDataString(kvp.Key) + "=" + Uri.EscapeDataString(kvp.Value)).ToArray());
            var bytes = Encoding.UTF8.GetBytes(parameter);
 
            return request.UploadDataAsync(bytes);
        }
 
        public static IObservable<Progress<Unit>> UploadValuesAsyncWithProgress(this WebRequest request, IDictionary<string, string> parameters, int chunkSize = 65536)
        {
            var parameter = string.Join("&", parameters
                .Select(kvp => Uri.EscapeDataString(kvp.Key) + "=" + Uri.EscapeDataString(kvp.Value)).ToArray());
            var bytes = Encoding.UTF8.GetBytes(parameter);
 
            return request.UploadDataAsyncWithProgress(bytes, chunkSize);
        }
 
        public static IObservable<WebResponse> UploadDataAsync(this WebRequest request, byte[] data)
        {
            return Observable.Defer(() => request.GetRequestStreamAsObservable())
                .SelectMany(stream => stream.WriteAsObservable(data, 0, data.Length).Finally(() => stream.Close()))
                .TakeLast(1)
                .SelectMany(_ => request.GetResponseAsObservable());
        }
 
        public static IObservable<Progress<Unit>> UploadDataAsyncWithProgress(this WebRequest request, byte[] data, int chunkSize = 65536)
        {
            return Observable.Defer(() => request.GetRequestStreamAsObservable())
                .SelectMany(stream => stream.WriteAsync(data, chunkSize))
                .Scan(0, (i, _) => i + 1)
                .Select(i =>
                {
                    var currentLength = i * chunkSize;
                    if (currentLength > data.Length) currentLength = data.Length;
                    return Progress.Create(new Unit(), currentLength, data.Length);
                });
        }
    }
 
    internal static class WebResponseExtensions
    {
        public static IObservable<byte[]> DownloadDataAsync(this WebResponse response)
        {
            return Observable.Defer(() => response.GetResponseStream().ReadAsync())
                .Finally(() => response.Close())
                .Aggregate(new List<byte>(), (list, bytes) => { list.AddRange(bytes); return list; })
                .Select(x => x.ToArray());
        }
 
        public static IObservable<Progress<byte[]>> DownloadDataAsyncWithProgress(this WebResponse response, int chunkSize = 65536)
        {
            return Observable.Defer(() => response.GetResponseStream().ReadAsync(chunkSize))
                .Finally(() => response.Close())
                .Scan(Progress.Create(new byte[0], 0, 0),
                    (p, bytes) => Progress.Create(bytes, p.CurrentLength + bytes.Length, response.ContentLength));
        }
 
        public static IObservable<string> DownloadStringAsync(this WebResponse response)
        {
            return DownloadStringAsync(response, Encoding.UTF8);
        }
 
        public static IObservable<string> DownloadStringAsync(this WebResponse response, Encoding encoding)
        {
            return response.DownloadDataAsync().Select(x => encoding.GetString(x, 0, x.Length));
        }
 
        public static IObservable<string> DownloadStringLineAsync(this WebResponse response)
        {
            return DownloadStringLineAsync(response, Encoding.UTF8);
        }
 
        public static IObservable<string> DownloadStringLineAsync(this WebResponse response, Encoding encoding)
        {
            return Observable.Defer(() => response.GetResponseStream().ReadLineAsync(encoding))
                .Finally(() => response.Close());
        }
    }
 
    internal static class StreamExtensions
    {
        public static IObservable<Unit> WriteAsObservable(this Stream stream, byte[] buffer, int offset, int count)
        {
            return Observable.FromAsyncPattern((ac, o) => stream.BeginWrite(buffer, offset, count, ac, o), stream.EndWrite)();
        }
 
        public static IObservable<int> ReadAsObservable(this Stream stream, byte[] buffer, int offset, int count)
        {
            return Observable.FromAsyncPattern<int>((ac, o) => stream.BeginRead(buffer, offset, count, ac, o), stream.EndRead)();
        }
 
        public static IObservable<Unit> WriteAsync(this Stream stream, string data)
        {
            return WriteAsync(stream, data, Encoding.UTF8);
        }
 
        public static IObservable<Unit> WriteAsync(this Stream stream, string data, Encoding encoding)
        {
            return WriteAsync(stream, encoding.GetBytes(data));
        }
 
        public static IObservable<Unit> WriteAsync(this Stream stream, IEnumerable<byte> data, int chunkSize = 65536)
        {
            return WriteAsync(stream, data.ToObservable(), chunkSize);
        }
 
        public static IObservable<Unit> WriteAsync(this Stream stream, IObservable<byte> data, int chunkSize = 65536)
        {
            return Observable.Defer(() => data)
                .Buffer(chunkSize)
                .SelectMany(l => stream.WriteAsObservable(l.ToArray(), 0, l.Count))
                .Finally(() => stream.Close());
        }
 
        public static IObservable<Unit> WriteLineAsync(this Stream stream, string data)
        {
            return WriteLineAsync(stream, data, Encoding.UTF8);
        }
 
        public static IObservable<Unit> WriteLineAsync(this Stream stream, string data, Encoding encoding)
        {
            return WriteAsync(stream, data + Environment.NewLine, encoding);
        }
 
        public static IObservable<Unit> WriteLineAsync(this Stream stream, IEnumerable<string> data)
        {
            return WriteLineAsync(stream, data, Encoding.UTF8);
        }
 
        public static IObservable<Unit> WriteLineAsync(this Stream stream, IObservable<string> data)
        {
            return WriteLineAsync(stream, data, Encoding.UTF8);
        }
 
        public static IObservable<Unit> WriteLineAsync(this Stream stream, IEnumerable<string> data, Encoding encoding)
        {
            return WriteLineAsync(stream, data.ToObservable(), encoding);
        }
 
        public static IObservable<Unit> WriteLineAsync(this Stream stream, IObservable<string> data, Encoding encoding)
        {
            return WriteAsync(stream, data.SelectMany(s => encoding.GetBytes(s + Environment.NewLine)));
        }
 
        public static IObservable<byte[]> ReadAsync(this Stream stream, int chunkSize = 65536)
        {
            return Observable.Defer(() => Observable.Return(new byte[chunkSize], Scheduler.CurrentThread))
                .SelectMany(buffer => stream.ReadAsObservable(buffer, 0, chunkSize),
                    (buffer, readCount) => new { buffer, readCount })
                .Repeat()
                .TakeWhile(a => a.readCount != 0)
                .Select(a =>
                {
                    if (a.readCount == chunkSize) return a.buffer;
 
                    var newBuffer = new byte[a.readCount];
                    Array.Copy(a.buffer, newBuffer, a.readCount);
                    return newBuffer;
                })
                .Finally(() => stream.Close());
        }
 
        public static IObservable<string> ReadLineAsync(this Stream stream, int chunkSize = 65536)
        {
            return ReadLineAsync(stream, Encoding.UTF8, chunkSize);
        }
 
        public static IObservable<string> ReadLineAsync(this Stream stream, Encoding encoding, int chunkSize = 65536)
        {
            return ObservableForCompatible.Create<string>(observer =>
            {
                var decoder = encoding.GetDecoder();
                var bom = encoding.GetChars(encoding.GetPreamble()).FirstOrDefault();
                var sb = new StringBuilder();
                var prev = default(char);
 
                return stream.ReadAsync(chunkSize)
                    .SelectMany(bytes =>
                    {
                        var charBuffer = new char[encoding.GetMaxCharCount(bytes.Length)];
                        var count = decoder.GetChars(bytes, 0, bytes.Length, charBuffer, 0);
                        return charBuffer.Take(count);
                    })
                    .Subscribe(
                        c =>
                        {
                            if (c == bom) { } // skip bom
                            else if (prev == '\r' && c == '\n') { } // when \r\n do nothing
                            else if (c == '\r' || c == '\n')   // reach at EndOfLine
                            {
                                var str = sb.ToString();
                                sb.Length = 0;
                                observer.OnNext(str);
                            }
                            else sb.Append(c); // normally char
 
                            prev = c;
                        },
                        observer.OnError,
                        () =>
                        {
                            var str = sb.ToString();
                            if (str != "") observer.OnNext(str);
                            observer.OnCompleted();
                        });
            });
        }
    }
 
    internal static class ObservableForCompatible
    {
#if WINDOWS_PHONE
        public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, int count)
        {
            return source.BufferWithCount(count);
        }
#endif
 
        public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
        {
#if WINDOWS_PHONE
            return Observable.CreateWithDisposable(subscribe);
#else
            return Observable.Create(subscribe);
#endif
        }
    }
}

Progress用のクラスと、WebRequest, WebResponse, Streamに対する拡張メソッドです。各メソッドは大体数行でローカル変数もほとんどないコンパクトなものですが(Rxの強力さに全力で乗っかってるだけ)、コード自体はあまり追いやすくはないかもです。まあ、面倒くさい部分は拡張メソッド側で隠蔽してやれるならそれで良いと思ってます。利用側はWebClientを同期で使うように、簡単にWebRequestが非同期で扱えるんじゃないかなー、と思いますがどうでしょう。そしてついでなので多めに作ってしまった……。

StreamのReadAsync、WriteAsyncですが、それぞれ一度の読み込み/書き込みサイズ指定(デフォルトは64K)で、ストリーミングで読み書きをするようになっています。特にWriteAsyncで要求するbyteはIEunmerable/IObservableですから。つまり、「ストリーミングで読みながら」「ストリーミングで書きこむ」ことが出来るという、メモリに非常に優しいプログラミングが可能になっています。(まあ、読み込みが速くて書き込みが遅い場合は、読み込みだけどんどん進んで、どこかで溜め込まれてしまうわけですけどー、WriteされたらReadが始まる、みたいな仕組み作れないかなー、とは思いつつ方法分からない)

ところでしかし、Rxも素のままではなく、拡張補助ライブラリ的なのを用意するといいのかもねえ。Achiralのように。T4によるFromEventの自動生成などと一緒にまとめて、Rx Supplemental Library。うーん。Rx利用の俺々MVVMライブラリ(を、いつか作りたい、今はまだMVVM自体がヨクワカッテナイのですが)と一緒に、そのうちにでも。

まとめ

WPF, SL, WP7で全く同じコードが動くって素敵。Rx素晴らしい。んで、もしかしてRxって面倒くさいの……? と思ったとしたら、いえ、違います。面倒くさいのはWebRequestでプログレスやアップロード処理を作るのが面倒くさいだけです。上のコードがゲップでそうな感じであるとしたら、Rxが面倒なせいではなくて、WebRequestが面倒なせいなだけです。これRx抜きに同期で書いても面倒臭いです。それどころか、一層大変なことになっていました。むしろRxの上に乗っかっているからこそ、色々な演算子が使えて、コンパクトに書けたのではないかと思われます。また、プログレスや柔軟なキャンセルなど、「クライアントアプリならスレッド余ってるし同期的に書いてThreadPoolに突っ込んでも問題ないし楽っしょー」といった次元を超えた価値をReactive Extensionsは提供できているのではないでしょうか。

プログレスは大事。何が大事って、最近Windows Phone 7を輸入して電波法違反、じゃなくて、ええと、まあ、b-mobile回線で使っているのですけど、死ぬほど遅い。b-mobile遅い。MAX 300kbpsと謳っていて、それも遅いわけだけど実測だと100kbpsで大変遅い。なので、TwitterのXML引っ張ってくる程度であっても(モバイル回線は貧弱なのでXMLじゃなくてJSONがいいですねえ)、何%といった表示は欲しかったりなのです。さすがにそれはやり過ぎだとしても、Twitpicなどにカメラ画像を上げる時などではもう必須と言ってもいいぐらい。

でも、WebRequest使うとプログレスはご覧のように面倒臭いので、どうしてもサポート出来なかったりなのですよね。WebClient使えよってのは正論なのですが、諸事情あったりでWebRequest使いたいって場合もあるでしょう(OAuthとかあるしね)。もうひとつは、Reactive Extensionsに載せるならイベントベースになっているWebClientよりもWebRequestのほうが使いよいということもあり。

そんなジレンマの日々も今日でサヨナラです。Rx + WebRequestでConsoleでも、WPFでも、Silverlightでも、Windows Phone 7でも、幸せな非同期生活を送りましょう。ところで今回のプログレスの話はasync/awaitやBackgroundWorkerと絡めてお話したかったのですが、長くなるので断念。これは次回に(いや、次々回かもしれませんが)必ず書きます!もう既に半分ぐらいサンプルコードとかも書いてはあるので、絶対に近いうちには。

なお、今回の記事はRxTeamのJeffery Van GoghのブログシリーズRx on the serverが下敷きになってます。そして、Rx on the serverで紹介されているコードは、最新のRxのリリースに含まれているSystem.Reactive.ClientProfileに収録されています。が、このPart2で紹介されているAsyncReadLines(非同期でのStreamから一行毎にString取り出し)は簡易的すぎて使い物になりません。2バイト圏無視してるし、1バイトであっても挙動は怪しい。サンプルレベルなら良いと思うし記事は素晴らしく参考になったのですが、本体に収録/配布はやめて欲しかったなあ、少しがっかり。

でもAsyncReadLines自体は欲しいですねえ。今のところ一行毎に取り出すにはStreamReaderしかないのだけれど、非同期APIがないので。非同期読み込み可能なStreamReaderは、何故かSystem.DiagnosticsにAsyncStreamReaderとしてあったりするのですが、internalクラスなので外からでは使えません。というわけで、非同期で一行毎読み込みをやるには、自前実装しか無いようで。うーん、やだなー。私はゆとりゆるふわプログラマなので、バイトとかエンコーディングとかが絡むのはやりたくないなー。というか絶対ミスするでする。

※追記@12/03, やっぱり欲しいと思ったのでReadLineAsyncとして実装しました。↑ソースコードにも反映させてあります。

※追記@2011/8/29, .NET版とWP7同梱版との互換性をもたせました。また、リソースの扱いを正確にしました。

※追記@2011/10/15, ReactiveProperty : WPF/SL/WP7のためのRxとMVVMを繋ぐ拡張ライブラリの一部として、より機能向上させライブラリとしてまとめあげたものを公開しました。↑ソースコードをそのまま使うより、ReactivePropertyを参照するほうをお薦めします。

余談:Windows Phone 7

そういえば、WP7ですか?中々良いですよ、日本語さえまともならば。それを抜きにしても、正直なところ想像以上に自由の効かないガチガチ縛りなので、Androiderな方々は絶対気に入らないと思っています。まあでも、私は好きですねえ。初版なのでダメなところが目立つのはしょうがないし、徐々に改善される(といいなあ!)でしょう。良いところに目を向ければ、大変可能性を感じるOSに仕上がってると思います。

ちなみにWP7の開発環境に望むのはJSONサポート何とかしる!ってことですかね。標準だとシリアライザしか入ってないんですよ、ふざけんな死ね。System.Json入れるかJsonReaderWriterFactory入れるかしろって話ですよ本当に。あとdynamicのサポートが入ってくれればDynamicJsonを対応させるんだけどなあ。

今のところ実機にアプリケーションを転送できません。WP7は認証が必要なので(有料)。で、認証のための証明書の発行手続きはしたのですが、その発行を請け負っているGeoTrustという会社があまりにも仕事しないせいでうばばばば。しょうがないので、今やれることは基盤をがっつし固めることぐらいですはい。

Comment (11)

C#恐怖症 : (09/01 21:22)

Rxの情報が少なく、いつも楽しみに拝見させて頂いています。

WebRequest拡張メソッドですが、気になる部分があります。
キャンセル対応を行った為か、存在しないURL等を指定した場合に例外が発生しますが
これが上位にまで通知されずプログラムが落ちます。

neuecc : (09/01 22:55)

こちらこそ、ありがとうございます。
また、貴重な情報、大変助かります。

ただ、こちらで試したところ、問題なく伝達されている気がします。
例えばGoogleの404(WebExceptionを発生させる)では

var req = WebRequest.Create(”http://google.co.jp/404″);
req.Timeout = 1000;

req.GetResponseAsObservable()
.ObserveOn(new SynchronizationContext())
.Subscribe(x => Console.WriteLine(x)); // 通知されてくるので例外発生、onErrorを書けばCatchされる

Console.ReadLine(); // 非同期の待機のため

問題なく伝達が来たようです。
もしよければ、再現コードを頂けると嬉しいです。

一つ可能性があるのは、例外発生はEndGetResponse(ar)の際に発生しますが、
ここは非同期実行のためThreadPool上での発生となります。
そのため、これをUIスレッドにまで通達させるにはObserveOnが必要です(Catchするつもりなら不必要)

C#恐怖症 : (09/02 07:15)

さっそくの返信ありがとうございます。

こちらで記述したコードは拡張メソッドの例として書かれていたダウンロードとアップロードのURIや
ヘッダー等を書き換えただけになります。

>一つ可能性があるのは、例外発生はEndGetResponse(ar)の際に発生しますが、
>ここは非同期実行のためThreadPool上での発生となります。
>そのため、これをUIスレッドにまで通達させるにはObserveOnが必要です
>(Catchするつもりなら不必要)

まさしく、この状態だと思います。
ObserveOnを追加したところ、onErrorでCatchされました。
まだまだ勉強不足ですね。

重ね重ねの質問で申し訳ないのですが、TimeoutやキャンセルとしてDisposeした場合に
Subscribeでこの状態を取得できないものでしょうか?

C#恐怖症 : (09/02 09:33)

追記)
開発はWindowsPhone 7.1 RC(日本語)で行っていますが、あれから色々とやってみて分かった事を書かせて頂きます。

まず、WP7に付属しているMicrosoft.Phone.Reactive.dllではObserveOnを記述してもしなくても例外がキャッチできません。
先程の書き込みのようにObserveOnを記述して例外がキャッチできたのは、NuGetよりインストールしたRx-Main(System.Reactive)の方でした。

neuecc : (09/03 18:38)

ありがとうございます。
すみません、私の方でも試しましたが、問題ないように見えました。
私の例が悪かったのですが、WP7でも
.ObserveOn(new SynchronizationContext())
と記述しましたか?
ObserveOnの役割は指定したスレッドに値を渡すことですが、
この場合UIスレッドに渡したく、WPF/SLではUIスレッドはDispatcherが担うので
.ObserveOnDispatcher()
と記述していただければ、こちらで試した限りでは問題なく動きました。

もし、私の推測が的外れであれば、またお願いします。

> TimeoutやキャンセルとしてDisposeした場合にSubscribeでこの状態を取得できないものでしょうか?

伝達がキャンセルされたという形になるので、そのままでは伝えるのは難しいですね……。
少し汚く感じますが、外部変数で状態管理して、Finallyで捉えるとか、でしょうか。

var req = WebRequest.CreateHttp(”http://google.co.jp/”);

var isSuccess = false;
var subscription = req.DownloadStringAsync()
.Do(_ => isSuccess = true) // 正常に完了した場合はここを通る
.ObserveOnDispatcher()
.Finally(() =>
{
if (!isSuccess)
{
MessageBox.Show(”Cancel or Exception”);
}
})
.Subscribe(x => { }, ex => { }, () => { });

subscription.Dispose(); // 即座にキャンセル

上のサンプルはWP7のPhone.Reactiveで書きました。
.Dispose()の行を消して、URLをgoogle.co.jp/404に変えれば、
例外発生時でも伝達されること&処理を行えることが確認できると思います。

C#恐怖症 : (09/04 18:33)

何度も検証して頂いて申し訳ありません。
.ObserveOn(new SynchronizationContext())はもちろん記述致しました。

また、今回書いて頂きましたサンプルでもURLをgoogle.co.jp/404にした所、
やはり例外がキャッチできませんでした。
環境の問題なのかな?とも思いますので、一度環境をクリーンにしてみようかと思います。
ここ最近、頻繁にリリースされたPhone系のSDKを入れたり消したりしていたので・・・

neuecc : (09/04 19:58)

いえいえ、大丈夫です。
ちゃんと解決出来ればいいなあ、と思いますので是非最後まで!

> .ObserveOn(new SynchronizationContext())はもちろん記述致しました。
いや、むしろWP7ではそれはダメで、.ObserveOnDispatcher()にしましょう、ということですー。

> また、今回書いて頂きましたサンプルでもURLをgoogle.co.jp/404にした所、
> やはり例外がキャッチできませんでした。
あらら……。
細かい部分でコードにずれがあるかもしれませんので、もし良ければ
http://ideone.com/
にフルでコードを載せていただくと、こちらでも再現できるかもしれません。

一応、私の方では、
日本語の一番新しいSDK(7.1ですか)とMicrosoft.Phone.Reactiveで試しています。

mikiofuku : (10/19 19:02)

すみません。私も同じ現象で悩んでおります。プロジェクト一式をアップしましたので、時間がありましたら試して頂けないでしょうか。http://smart-pda.net/~mikio/misc/RxError.zip

neuecc : (10/20 00:21)

プロジェクトありがとうございます。
存在しないURLに送る(反応が帰ってこない)ものに対しては
規程のタイムアウト(10秒か20秒か、そのぐらいでしょうか)の時間待機している感じですね。
それだけ待てば、ちゃんと(?)エラーが伝達されてきているようには見えました。
なので、URLが例えば http://google.co.jp/404 なら即座に404を返すのですぐキャッチされますが
http://goooooooooooglglglge.co.jp/ のような存在しないものだと、タイムアウトの間うんともかんとも、となるのではかと。

WP7ではタイムアウトの時間の設定は不可能なようなので、規程のタイムアウトの時間だと長すぎる、という場合は
RxのTimeoutメソッドを使って、代替としてのタイムアウト例外を出すことが可能かと思います。

mikiofuku : (01/02 11:22)

あけましておめでとうございます。
お返事いただいていたのにもかかわらず遅くなってすみませんでした。
以下のコードでやはり例外をキャッチできませんでした。WP7.1(RTW日本語)です。
Subscribe 内部で例外キャッチできるかと思ったのですが、GetResponseAsObservable の EndGetResponse で例外が発生してしまいました。

string url = “http://localhost/”;
WebRequest.Create(url)
.GetResponseAsObservable()
.ObserveOnDispatcher()
.Subscribe(
s =>
{
System.Diagnostics.Debug.WriteLine(s);
},
e =>
{
System.Diagnostics.Debug.WriteLine(e.Message);
});

localhost には Web サーバはないので、The remote server returned an error: NotFound. が発生しています。
例外キャッチの方法間違ってますか?

mikiofuku : (01/04 16:45)

たびたび済みません。以下のようなコードにしたら例外をキャッチすることが出来ました。
とりあえずご報告まで。

string urlquery = “http://localhost/”;

var req = HttpWebRequest.CreateHttp(urlquery);
Observable.FromAsyncPattern(req.BeginGetResponse, req.EndGetResponse)()
.ObserveOnDispatcher()
.SelectMany(res =>
{
return res.DownloadDataAsyncWithProgress();
})
.Do(p =>
{
System.Diagnostics.Debug.WriteLine(”{0}/{1} - {2}%”, p.CurrentLength, p.TotalLength, p.Percentage);
})
.Subscribe(
s =>
{
System.Diagnostics.Debug.WriteLine(s);
},
e =>
{
System.Diagnostics.Debug.WriteLine(e.Message);
});

Name
WebSite(option)
Comment

Trackback(0) | http://neue.cc/2010/11/26_286.html/trackback

Search/Archive

Category

Profile


Yoshifumi Kawai
Microsoft MVP for Visual Studio and Development Technologies(C#)

April 2011
|
July 2018

Twitter:@neuecc
GitHub:neuecc
ils@neue.cc