Reactive Extensions + asyncによるC#5.0の非同期処理

Reactive Extensions(Rx)の利点ってなんですかー、というと、合成可能なんです!ということです。合成可能って何?というと、LINQが使えるということなんです!です。じゃあ他には、ということで…… 詳しくはこの動画/スライド見るといいです。 Curing Your Event Processing Blues with Reactive Extensions (Rx) | TechEd Europe 2012 | Channel 9。最初のほうの例が非常に分かりやすいので、とりあえずその部分だけ引っ張ってきますと

// sender, argsの型がふわふわ
exchange.StockTick += (sender, args) => // senderの型が消えてる
{
    if (args.Quote.Symbol == "MSFT")
    {
        // 合成できないからイベントの中でベタ書きしかない
    }
};
 
exchange.StockTick -= /* ラムダ式でイベント登録すると解除不能 */

これが通常のイベントの弱点です。Rxにすると

// <Quote>という型が生きてる
IObservable<Quote> stockQuotes = ...; // 変数に渡せる
 
// LINQクエリ演算子が使える
var msft = stockQuotes
    .Where(quote => quote.Symbol == "MSFT");
 
var subscription = msft.Subscribe(quote => /* */);
 
// イベント解除が容易
subscription.Dispose();

といった感じで、実に素晴らしい!じゃあEventはもうObsoleteでいいぢゃん、というと、まあいいと思うのですがそれはそれとして、C#ネイティブだからこそデザイナがイベントが大前提で考慮されていたり、軽くて実行速度が良かったり、といったところは勿論あります。あとRxだとイベントのIObservable化が面倒だとかもね。この辺は最初から言語サポートの効いてるF#のほうが強いんですよねー。

非同期のリトライ

Visual Studio 2012も、もうRCということで間近感が相当にあります。一方でReactive Extensions v2.0 Release Candidate available now!ということで、こちらも間近感があります。一度2.0使うと1.0には戻れないよ!(NuGetではRx-Main -Preで入れられます)

じゃあRx 2.0の紹介でもしますかー、というと、しません!(ぉぃ)。その前に、asyncとRxの関係性にケリをつけておきましょう。

で、asyncの非同期とRxの非同期はやっぱり使い分けフワフワッという感じ。複数の値が来るときはRxでー、とか言われても、そもそも複数っていうのがそんなにー、とか。あと、それ以外にないの?というと、Rxの合成の強さが非同期にも発揮してRetry処理とか柔軟でー、とか。確かにそれっぽい。けれど、どうもフワッとしてピンと来ないかもしれない。

ので、例を出していきましょう。まず、リトライ処理。リトライ処理を素の非同期で書くと泣きたくなりますが、C# 5.0を使えばasync/awaitで何も悩むことなくスッキリと!

static async Task<string> DownloadStringAsyncWithRetry(string url, int retryCount)
{
    var count = 0;
RETRY:
    try
    {
        count++;
        var req = WebRequest.CreateHttp(url);
 
        using (var res = await req.GetResponseAsync())
        using (var stream = res.GetResponseStream())
        using (var sr = new StreamReader(stream))
        {
            return await sr.ReadToEndAsync();
        }
 
    }
    catch
    {
        if (count >= retryCount) throw;
    }
    goto RETRY;
}
 
static void Main(string[] args)
{
    var google = DownloadStringAsyncWithRetry("http://google.com/404", 3);
    Console.WriteLine(google.Result);
}

簡単です。さて、ではこれをRxで書くと……

static async Task<string> DownloadStringAsyncWithRetry(string url, int retryCount)
{
    var req = WebRequest.CreateHttp(url);
 
    // retry処理は.Retryで済む
    using (var res = await req.GetResponseAsync().ToObservable().Retry(retryCount))
    using (var stream = res.GetResponseStream())
    using (var sr = new StreamReader(stream))
    {
        return await sr.ReadToEndAsync();
    }
}

はい。別にRxとasyncは排他じゃありません。使って効果のあるところに差し込んで、Mixしてやれば、ただでさえ強力なasyncが更に強力になります。TaskとIObservableは変換可能なので、ToObservableして、あとはRetryメソッドを繋げるだけ。そしてIObservableはawait可能(LastAsyncと同じ効果で、最後の値を取る。非同期処理の場合は値が一つなので問題なし)なので、まんまawaitしてasyncと繋げてやればいい。素敵ですねー。

が、上のコードはちょっと間違ってます。どこが間違っているか分かりますか?

エラーの帰ってくるページ(google/404などは404エラーを返してくれるのでテストに楽←別に500にすれば500を返してくれるわけじゃなくて、ただたんに存在しないページだから404なだけで、別にどこでもいいです)を指定してFiddlerなどで観察すれば分かりますが、一回しかリクエスト飛ばしません。Retry(3)としても一回しか飛んでいません。ちゃんとRetryは3回しているのに。

どういうことかというと、GetResponseAsync()の時点でリクエストに失敗しているからです。失敗済みのリクエストに対しては、何回Retryしても失敗しか返しません。ここは本当にはまりやすくて注意所なので、よく気を付けてください!

解決策は、Retryで生成を毎回やり直すこと。Deferで包めばいいです。

static async Task<string> DownloadStringAsyncWithRetry(string url, int retryCount)
{
    // Retry時に毎回WebRequestを作り直す
    var asyncQuery = Observable.Defer(() => WebRequest.CreateHttp(url).GetResponseAsync().ToObservable())
        .Retry(retryCount);
 
    // retry処理は.Retryで済む
    using (var res = await asyncQuery)
    using (var stream = res.GetResponseStream())
    using (var sr = new StreamReader(stream))
    {
        return await sr.ReadToEndAsync();
    }
}

ちょっと罠があるしコードも増えてしまったけれど、それでも、まあ、まだ割といいかな、って感じでしょうか?

さて、リトライは即時じゃなくて一定間隔置いた後にリトライして欲しいってことが多いと思います。同期処理だとThread.Sleepで待っちゃうところですが、それはちょっとスレッド勿体ない。C# 5.0からはawait Task.Delayを使いましょう。

static async Task<string> DownloadStringAsyncWithRetry(string url, int retryCount, TimeSpan retryDelay)
{
    var count = 0;
RETRY:
    try
    {
        count++;
        var req = WebRequest.CreateHttp(url);
 
        using (var res = await req.GetResponseAsync())
        using (var stream = res.GetResponseStream())
        using (var sr = new StreamReader(stream))
        {
            return await sr.ReadToEndAsync();
        }
 
    }
    catch
    {
        if (count >= retryCount) throw;
    }
 
    if (retryDelay > TimeSpan.Zero)
    {
        await Task.Delay(retryDelay); // これで待つ
    }
 
    goto RETRY;
}

以前のものにTask.Delayを足しただけで簡単です。わーお、素晴らしい、なかなか強力強烈です。ではRxは、というと、同じように遅延する演算子を足すだけ。Delay、ではダメでDelaySubscription(Rx 2.0から追加)を使います。

static async Task<string> DownloadStringAsyncWithRetry(string url, int retryCount, TimeSpan retryDelay)
{
    // DelaySubscriptionで遅延させる
    var asyncQuery = Observable.Defer(() => WebRequest.CreateHttp(url).GetResponseAsync().ToObservable())
        .DelaySubscription(retryDelay)
        .Retry(retryCount);
 
    using (var res = await asyncQuery)
    using (var stream = res.GetResponseStream())
    using (var sr = new StreamReader(stream))
    {
        return await sr.ReadToEndAsync();
    }
}

できました!できました?いや、これだと初回リクエスト時にも遅延されちゃってて、ちょっとイケてない。修正しましょう。

// 外部変数用意するのがダサい
var count = 0;
var asyncQuery = Observable.Defer(() => WebRequest.CreateHttp(url).GetResponseAsync().ToObservable())
    .Let(xs => count++ == 0 ? xs : xs.DelaySubscription(retryDelay))
    .Retry(retryCount);

はい、ダサいし、なんだか何やってるのかさっぱりになってきました、サイテー。LetもRx 1.0にはなくて(それ以前にはあったのですが削られた)2.0から復活になります。Letは、一時変数を置かなくて済むというチェーン病にかかった人がお世話になる処方薬です。んなもん読みにくくさせるだけじゃねーか、という感じですが、もしLetがないと変数を置いて var xs = ToObservable(); xs = () ? xs : xs.Delay..; xs = xs.Retry(); としなければならなくて、非常に面倒くさいのです。だから、使いどころを守って乱用しなければ、割とイケてます。結構大事。

が、しかし、これも間違っています!(えー)。というかLetではなくて変数に展開してみるとオカシイとはっきり分かるのですが、Letの内部はRetryとか関係なく一回しか評価されないので、これだと必ずDelaySubscriptionなしのほうしか通りません。この路線で行くなら、更にやけくそでDeferを追加しましょうか。

// Deferだらけとかダサすぎるにも程がある
var count = 0;
var asyncQuery = Observable.Defer(() => WebRequest.CreateHttp(url).GetResponseAsync().ToObservable())
    .Let(xs => Observable.Defer(() => count++ == 0 ? xs : xs.DelaySubscription(retryDelay)))
    .Retry(retryCount);

ダサすぎて話にならない。Defer連打ダサい。Deferまみれになったら、ちょっと根本から方針を疑いましょうか。ついでに外部変数を使うというのがそもそもダサい。もう少し頑張りましょう!クエリ演算子をこねくり回して、と。

// DelayなしのDeferとDelayありのDeferを連結して、DelayありのみをRetryさせている
var asyncQuery = Observable.Defer(() => WebRequest.CreateHttp(url).GetResponseAsync().ToObservable())
    .Let(xs => xs.Catch(xs.DelaySubscription(retryDelay).Retry(retryCount - 1)));

どうでしょう。他にもやりようは色々とあるかもですが、正直ワケガワカラナイのでこの辺でよしておいたほうがマシです。実際のところ、以下のような拡張メソッドを作るのがベストだと思っています。

// 結局これが一番なのではかという結論に至る
public static async Task<string> DownloadStringAsyncWithRetry(this WebClient client, string url, int retryCount, TimeSpan retryDelay)
{
    var count = 0;
RETRY:
    try
    {
        count++;
        return await client.DownloadStringTaskAsync(url);
 
    }
    catch
    {
        if (count >= retryCount) throw;
    }
 
    if (retryDelay > TimeSpan.Zero)
    {
        await Task.Delay(retryDelay);
    }
 
    goto RETRY;
}

new WebClient().DownloadStringAsyncWithRetry(”hogehoge”); だけですからねー。拡張メソッド万歳。Rx最終形のような短さとか魔術っぽさはゼロで面白くも何ともない、というか正直クソつまらないコードなわけですが、そこがC#のC#たる所以ですな、ということで。私はRxのようなクールさも勿論大好きなのですが、こういうイモさもまた、C#らしさであって、現実をより良くするための、目的を忘れない素敵な側面だと思っています。

ちなみにWebRequestの場合はそれ自体の作り直しが必要なので(一度エラーを受けたら何度GetResponseを繰り返してもダメぽ)、拡張メソッドダメです。WebClientはイベントベースなのでTask系と相性がアレで今一つなわけですが、WebRequestはWebRequestで、これベースに拡張メソッドだけで整えるのは無理があるのですね……。

.NET Framework 4.5からはHttpClientというクラスが入るので、それを使うとちょっとだけモダンっぽい雰囲気。

// モダンなドトネト的にはHttpClientかしら
public static async Task<string> GetStringAsyncWithRetry(this HttpClient client, string url, int retryCount, TimeSpan retryDelay)
{
    var count = 0;
RETRY:
    try
    {
        count++;
        return await client.GetStringAsync(url);
    }
    catch
    {
        if (count >= retryCount) throw;
    }
 
    if (retryDelay > TimeSpan.Zero)
    {
        await Task.Delay(retryDelay);
    }
 
    goto RETRY;
}

別途System.Net.Httpの参照が必要なのが面倒ですが。

非同期のタイムアウト

Rxが色々できるのは分かったけれど、結局そういう部分って拡張メソッドとかに隔離してアプリケーションコードからは離れるので、やっぱそんなでもないんじゃないの!?というと、あー、まあそうかもねえ、とか思いつつ、複雑になればなるほど効果は加速しますが、そうなるとRxでも(見た目はスッキリしたとしても)やっぱ複雑ですからね。さておき、このままだとアレなのでもう少しまともな例を、一番最初に挙げたCuring Your Event Processing Blues with Reactive Extensions (Rx) | TechEd Europe 2012から引っ張って来ましょうか。

タイムアウトを追加する例です。WebRequestだとTimeout設定すればいいぢゃーん、ではあるものの、そうではないシチュエーションも沢山ありますから、対策を知っていて損はないです。まず、asyncの例を。

static async Task<string> GetHtmlAsync(Uri url)
{
    var client = new WebClient();
 
    var download = client.DownloadStringTaskAsync(url);
    var timeout = Task.Delay(TimeSpan.FromSeconds(30));
 
    // これ結構トリッキーですよね
    if (await Task.WhenAny(download, timeout) == timeout)
    {
        throw new TimeoutException();
    }
 
    var html = await download;
    return html;
}

WhenAnyが中々トリッキーですね。慣用句として覚えてしまえばどうってことないのですが……。asyncもただawaitするだけじゃなくて、ちょっと慣れてきたらTask.WaitAll/Any, Task.WhenAll/Anyを使いこなすと、性能的な意味でも表現力的な意味でもグッと広がりますので、探究するのがお薦め。

さて、それをRxでやると……

static async Task<string> GetHtmlAsync(Uri url)
{
    var client = new WebClient();
 
    var download = client.DownloadStringTaskAsync(url)
        .ToObservable()
        .Timeout(TimeSpan.FromSeconds(30));
 
    var html = await download;
    return html;
}

ToObservableして、Retryの時のようにTimeoutを足すだけ。非常に直観的で、楽ちん、分かりやすい。演算子が豊富なのはRxの強みです。だからRetryにTimeoutがつけられるオーバーロードが最初から用意されていれば、Letとかで複雑になってしまった例もスッキリ仕上がって、ドヤァと言えたんですけどね(笑)

// こういうのを作っておけば!
static IObservable<T> Retry<T>(this IObservable<T> source, int retryCount, TimeSpan retryDelay)
{
    return source.Catch(source.DelaySubscription(retryDelay).Retry(retryCount - 1));
}
 
// 神がかってシンプルに!Rx最強!
public static async Task<string> GetStringAsyncWithRetry(this HttpClient client, string url, int retryCount, TimeSpan retryDelay)
{
    return await Observable.Defer(() => client.GetStringAsync(url).ToObservable()).Retry(retryCount, retryDelay);
}

標準で足りない演算子は自分で作ればいいので、また、asyncが出来たことで、今まで自作が大変だった演算子も作るのが大分容易になりました!ので、ガンガン作ってしまうといいです。汎用的に使える演算子が集まれば集まるほど、Rxの合成可能という性質が価値を発揮します。

リトライやタイムアウトをC# 4.0でRxなしで書くと

死ぬほど面倒なので書きません。いや無理でしょ常識的に考えて。

まとめ

というわけで、Rxとasyncは手を取り合って仲良く高みを目指せばいいわけです。使いこなしが必要なのはどっちも変わらない!

さて、@ITの連載、Reactive Extensions(Rx)入門 - @IT の次回は非同期のはずですが(聞こえなーい)、ええと、はい、すみません……。ええと、あと次はRx 2.0の強化事項を、ええと、まあそのうちいつか……。はい、すみません。

諸事情あって今色々詰まってて本気でヤバいんですが、それはそれとして、現在全力で一年以上やるやる詐欺だったlinq.jsの改修を進めていまして、これは本当に本当に絶対近日中にベータを出すのでお楽しみに。相当にイイ出来で、割と革命的に凄い内容になってます。いやほんと。かなり自信ありますよ。

他の積みタスクは、ReactiveOAuth(バグ修正のPull Requestを放置中というサイテーな有様、OAuth 2.0対応しないの?とか)、ReactiveProperty(WinRT対応まだー?)、Utakotoha(現在動いてない模様なので要改修)、DbExecutor(全面再構築まだー?DataSet殺すんでしょー?)とかでしょうか、って結構ありますね、うわぉぅ。というかReactive系は2.0対応とWinRT対応をやらなきゃならないので作業量的に面倒くさくて、ついつい手が遠ざかってしまいですね。はい、でも、やります。

にゃー、という感じでブログも結構アレなWebFormsとDataSetディスもそろそろさようならして、通常営業に戻ってきませう。

Comment (5)

abeq : (07/11 09:49)

こんにちわ、いつも拝見させていただいています。有益な情報のご提供を感謝します。
linq.jsの改版、パンツを脱いで待ってます。ハックション。楽しみで楽しみでなりません。

いたた : (07/13 18:39)

興味深い内容ですが、ここのサイトは目に痛い・・。

Siena : (07/25 16:48)

XboxInfoTwitの更新も是非……お願いします。

Tak : (07/30 11:20)

はじめまして。
どこにコメントつけようかとおもいましたがこちらに。
最近、DbExecutor使い始めさせてもらいました。
かなり素敵なライブラリだと思います。

それで不具合というわけでもないですが、DBExcutorでSelectDynamicをした結果を
IEnumratableで返すような関数を作ると、評価が遅延されて実際にアクセスした時には
Connectionが破棄されているという事がありました。
なお、ToListでリストを返すようにすると、評価が確定するので落ちませんでした。
一応、ご報告しておきます。
———————————————-
var data = DB.GetData(); //IEnumratableを返す関数
label.Text = data.Count(); //OK
foreach(var item in data) //ここで落ちる
{
}

neuecc : (07/30 21:53)

はい、ここでもOKですよー<DbExecutor。
そして、どういたしましてー。

なるほど、コネクションを閉じるか閉じないかは結構微妙な問題ですねー。
上の例だと二度重複したDBアクセスが走るので、
大抵はどこかで確立させることを望むはずで、とはいえそれはそれで不便な時もあり、で。
どーすべきかなあ、とは、ちょっと考えてみますー。

Name
WebSite(option)
Comment

Trackback(1) | http://neue.cc/2012/07/11_377.html/trackback

.NET Clips : (07/11 09:34)

neue cc - Reactive Extensions + asyncによるC#5.0の非同期処理…

素敵なエントリーの登録ありがとうございます - .NET Clipsからのトラックバック…

Search/Archive

Category

Profile


Yoshifumi Kawai
Microsoft MVP for Visual Studio and Development Technologies(C#)

April 2011
|
July 2018

Twitter:@neuecc
GitHub:neuecc
ils@neue.cc