非同期の再帰的な辿り方、或いはRxとC# 5.0 Asyncの連携について

例えば、ページを辿る。何度もアクセスを繰り返して、辿る。非同期で。単純なようで、やってみると何気にこれが結構難しい。例としてコードレシピのReactive Extensionsを使用してTwitterから非同期にデータを取得し表示するがありました。MVVMも絡めて、素晴らしいサンプルですね!

というわけで、お題を拝借して、Twitter ApiのGET statuses/friendsを使わせて頂きます。んが、今回は、手を抜いてフォロワーのscreen_name(@hogehogeのhogehogeの部分)だけを取れれば良い、ということにします。JSON解析やデシリアライズも面倒だし話の本題でもないので省略するため、DynamicJsonを使って、JSONを生のまんまっぽく扱うことにします。DynamicJsonは便利だなあ(棒)

さて、まずTwitter APIのcursorですが、大体こんな風になっています。目的はカーソルを辿って全てのuser(に含まれるscreen_name)を集めること。

JSON取得毎にnext_cursor_strという、次のページへのIDが取れるので、それを辿っていって、0が出たらページ末尾。といった具合です。next_cursor_strの値は一見ランダムに見える整数(2121409421とかそんな値になっている)であり、next_cursorという数字のものもあるのに、_strという文字列として得られるほうを使っています。何故かというと、TwitterのステータスIDが53bitを越えたお話 - tmytのらくがきを参照ください。DynamicJsonでは数字(Number)はdoubleとして扱うので、_strのほうを使わないと、危ういわけです。

まあ、ただのお題で本題な話ではないので、その辺は深く考えずそういうものなのだなあ、ぐらいで。

同期とyield returnと非同期

コードレシピのサンプルを見させて頂いたのですが、ネットワークアクセス部分がOpenReadなので、非同期"ではない"です。でも挙動は非同期だよ?というのは、Scheduler.ThreadPoolを使っているからなわけですが、つまるところ挙動的にはBackgroundWorkerを使って非同期にするのと同じことです。その場合ですと、Generateも確かに良いのですが、APIへのアクセスがそもそも同期であるならば、難しく考える必要はなく、yield returnを使ったほうが簡単です。単純なものは演算子の組み合わせで、複雑なものは素直に偉大なるコンパイラ生成(yield return)に頼る。そういう切り分けがLINQ的には大事かなって。

static IEnumerable<string> EnumerateFriends(string screenName)
{
    var cursor = "-1"; // 初期値は-1から
    while (cursor != "0") // 0が出たら終了
    {
        var url = string.Format("http://api.twitter.com/1/statuses/friends.json?screen_name={0}&cursor={1}",
            screenName, cursor);
        using (var stream = new WebClient().OpenRead(url))
        {
            var json = DynamicJson.Parse(stream); // 面倒くさいんでDynamicJson使いますよ:)
            foreach (var item in json.users)
            {
                yield return item.screen_name; // screen_nameを列挙
            }
            cursor = json.next_cursor_str; // 次のカーソルにセット
        }
    }
}

static void Main()
{
    var friends = EnumerateFriends("neuecc").ToArray();
}

すっきりと書けるのが分かると思います。え、これだとブロックしてしまって良くない?その通り。じゃあ非同期にしましょう。いえ、Reactive Extensionsで簡単にできてしまいます。yield returnで生成されたEnumerableをObservableに変換するのは、ToObservableです。

static void Main()
{
    EnumerateFriends("neuecc")
        .ToObservable(Scheduler.ThreadPool) // ThreadPoolで実行!
        .Subscribe(Console.WriteLine);
        
    Console.ReadLine(); // 終了してしまうからね
}

ToObservableはデフォルトでは Scheduler.CurrentThread 上で実行されるため、同期的にブロックしますが(※Push型シーケンスだからといって必ずしも非同期とは限らない)、任意のものに変更することも可能です。今回はScheduler.ThreadPoolを指定したので、ThreadPool上で動くようになっています。そのため、ブロックされません。

こういった書き方のほうが、コードがクリアになるし、IEnumerable<T>とIObservable<T>に両対応できてる、という柔軟性の点でも良いかと思います。また、BackgroundWorkerを使うよりも遥かに簡単ですよね。プログレス通知もなく、ただ処理をバックグラウンドでやりたい、というだけならば、Rxを使ったほうが楽チンです。プログレスが必要な場合は、Rxだとその辺の処理を作りこまなければならないので、素直にBackgroundWorkerを用いるのもいいかもしれません。私だったらRxをちょっと拡張してプログレス処理を作り込むほうを選ぶかな?その辺の話はReactive ExtensionsとAsync CTPでの非同期のキャンセル・プログレス処理を参照ください。

また、Observable化するとPublishによる分配 - C#とLinq to JsonとTwitterのChirpUserStreamsとReactive Extensionsなど、色々と応用な使い方が広がるのもメリットの一つと言えるでしょう。

Async CTP

今回は例がWPFなため、WebClientで同期的(OpenRead)に取ってしまいましたし、それでも全然問題ないわけですが、SilverlightとかWP7だったらこの手(同期でJSON取ってきてyield returnで返す)は使えません。同期のOpenReadがそもそもなくて、非同期のOpenReadAsyncしかないからね。どうしましょう?それだとyield returnが使えないのはモチロンのこと、Generateでもうまく動きません。もしページ番号がカーソルのように不定ではなく1,2,3...といった形で辿れたとしても、Observable.Rangeでやると、うまくいきません。非同期なので結果が帰ってくる時間が不定だからです。結果を取得してから次の結果を取得する、という形式にしないとダメなのです。

ところでそもそも、同期的に書いたとしても、本来は書くのは大変なはずなのです。それが、yield returnというコンパイラ生成があるから簡単に書ける。ということは、そうです、非同期もコンパイラ生成してしまえばいいのです、ということでAsync CTPで書きましょう。Async CTPはC# 5.0で入る、かもしれない、async/await構文を使えるようにするためのものです。コミュニティテクノロジープレビュー。ようするにベータ版ですね。

// このコードはAsync CTP (SP1 Refresh)によるもので、将来的にも同じコードで動作することは保証しません
async static Task<List<string>> EnumerateFriends(string screenName)
{
    var list = new List<string>();

    var cursor = "-1";
    while (cursor != "0")
    {
        var url = string.Format("http://api.twitter.com/1/statuses/friends.json?screen_name={0}&cursor={1}",
            screenName, cursor);
        using (var stream = await new WebClient().OpenReadTaskAsync(url)) // await!
        {
            var json = DynamicJson.Parse(stream);
            foreach (var item in json.users)
            {
                list.Add(item.screen_name); // yield returnの代わりに……
            }
            cursor = json.next_cursor_str;
        }
    }

    return list;
}

Async CTPは簡単に解説すると、awaitキーワードを使うと、本来非同期のものが同期のように書けるようになります。詳しくは非同期処理 (C# によるプログラミング入門)を参照のこと。コード的にも見たように、差異はWebClientのOpenReadの部分を、await OpenReadTaskAsyncに変更しただけで、あとはまるっきり一緒です。非同期なんて簡単なものだね。と、言いたかったのですが、全部読み込んでListで返してるぢゃん……。これじゃEnumerateじゃないよ、yield returnじゃないの?これだと結果取得に時間かかるし、Takeなどを用いて、途中で止めることもできないし。あまりよくない。

結論としては今のところどうやら無理ということで。asyncの返すものはTaskもしくはTask<T>でなければならない。いや、Task<IEnumerable<T>>を返してくれればいいぢゃん、await yield returnとか出来たら素敵ぢゃないのん?と思わなくもないというか、普通にそういうリクエストも上がっているのですが、それにはIAsyncEnumerable<T>のようなものと、それに対するコンパイラサポートが必要だよね、という返しでした。

Rx + Async

IAsyncEnumerable<T>、それってIObservable<T>で代替出来る話だよね。IObservable<T>は連続的な非同期を内包しているから。(※IObservable<T>は一つのインターフェイスであまりにも多くのものを表現出来てしまい、内部の状態が読みづらく(同期なのか非同期なのか、遅延なのか即時なのか)混乱を生みがちという問題もありますが……)。なので、Rxでやってみましょう。といっても、Rxで完全に自前でやるのは相当大変なので、Async CTPのサポートも併用します。これにより非同期の待機が同期的に書けるようになり、yield returnであったりlist.Addであったりの部分を、OnNextに置き換えるだけになります。

Stable版のRxにはAsync CTP連携は入っていないのですが、Experimental(実験的)版には、awaitで待機出来る、というだけはなく、幾つかAsync CTPと連携できるメソッドが入っています。

// ExperimentalのRxのため、将来的にもこのコードが動作し続けることは保証しません
static IObservable<string> EnumerateFriends(string screenName)
{
    // ラムダ式の中でasync書けることがポイント
    return Observable.Create<string>(async (observer, cancelToken) =>
    {
        try
        {
            var cursor = "-1";
            while (cursor != "0")
            {
                if (cancelToken.IsCancellationRequested) return; // cancelをチェック

                var url = string.Format("http://api.twitter.com/1/statuses/friends.json?screen_name={0}&cursor={1}",
                    screenName, cursor);
                using (var stream = await new WebClient().OpenReadTaskAsync(url)) // await!
                {
                    var json = DynamicJson.Parse(stream);
                    foreach (var item in json.users)
                    {
                        observer.OnNext(item.screen_name); // yield returnのかわりに
                    }
                    cursor = json.next_cursor_str;
                }
            }
        }
        catch (Exception ex)
        {
            observer.OnError(ex);
            return; // 例外発生時はOnErrorを呼んで終了
        }

        observer.OnCompleted(); // 例外発生もキャンセルもなく完了したなら、OnCompletedを呼ぶ
    });
}

static void Main()
{
    EnumerateFriends("neuecc")
        .Take(350) // 350件後にDisposeされてtokenがcancelになる
        .Subscribe(
            s => Console.WriteLine(s),
            e => Console.WriteLine("error:" + e),
            () => Console.WriteLine("完了")); // Takeのほうから呼び出されるので、cancel扱いになっても表示される

    Console.ReadLine();
}

Observable.Create(RangeやGenerateなどの生成子、WhereやSelectなどの演算子の全てが使っている、本当のプリミティブの生成子)を使って、生のobserverでOnNext, OnError, OnCompletedの3つを制御してやります。Createやtry-catchの分、ネストが深くなってしまっていますが、コード自体は同期的に、yield returnを使って書いていたものとほとんど変わってないのが分かると思います。yield returnの部分にOnNextを置いた、それだけでそのまま置き換えられています。

これならIObservable<T>でも十分に自動生成のサポートが効いていると言えなくもないですね。やってみて、結構満足できてしまった。パフォーマンス的にも、演算子をベタベタ組み合わせるのはあまり良くはならないので、こうしてasync/awaitと連携させて作れると、素直に書けるうえに、パフォーマンス向上も狙えるのが嬉しい。ただ、OnErrorやOnCompleted、キャンセル(Dispose)をどうするか。考慮する事項が多いので、ある程度分かっていないと大変かもしれません。全て考えておかないと、正しく動作しません。既存演算子の組み合わせだけで済ませられるなら、そういった考慮事項は演算子が受け持ってくれるので、考えなくて済むのですが……。どうしても演算子の組み合わせじゃうまく出来ない、逆に複雑になりすぎる、そういった時の奥の手、ぐらいに考えておくと良さそう。

ところでasyncはメソッドの宣言だけでなく、ラムダ式の部分でも宣言できてawaitすることが出てきてしまうんですよね、ならば、同じようなコンパイラ生成であるyield returnも、現状は外部メソッドでしか使えないわけですが、以下のようにインラインでも使えるようになってくれると嬉しいなって。思ってしまうのです。

// 妄想なので、現状はこれは出来ませんが!
var infinity = Enumerable.Create(()=>
{
    var num = 0;
    while(true)
    {
        yield return num++;
    }
});

そんなもの散々突っ込み受けたですって?Iterator Blocks Part Seven: Why no anonymous iterators? - Fabulous Adventures In Coding。ええ、知ってます。しかし、awaitなどで必要さの要請を受けて、コストとベネフィットが逆転するときが来た、と、思うのです。それに、VBでも、いや、なんでもない。

Expand

さて、ともかくAsync CTPは未来の話であり、現状手元にあるもので何とかする方法はないのだろうかというと、あります。ようするところ、再帰的に辿ってるわけですよね、cursorを。じゃあ、Expandです。ExpandはReactive Extensions v1.0安定版リリースでEnumerableバージョンのものを説明しましたが、Observableバージョンもあります。

// 補助メソッド、Async CTPにはOpenReadTaskAsyncとか、そういうのがデフォで用意されてますが、
// Rxにはないので、自前で用意しなきゃあいけないという、それだけの話です(ダウンロードしてストリングを返すだけのもの)
public static class WebRequestExtensions
{
    public static IObservable<string> DownloadStringAsync(this WebRequest request)
    {
        return Observable.Defer(() => Observable.FromAsyncPattern<WebResponse>(
                request.BeginGetResponse, request.EndGetResponse)())
            .Select(res =>
            {
                using (var stream = res.GetResponseStream())
                using (var sr = new StreamReader(stream))
                {
                    return sr.ReadToEnd();
                }
            });
    }
}

// ExpandはStable版にはまだ搭載されていないので、Experimental版を使ってください
static IObservable<string> EnumerateFriends(string screenName)
{
    Func<string, IObservable<dynamic>> downloadJson = cursor =>
    {
        var url = string.Format("http://api.twitter.com/1/statuses/friends.json?screen_name={0}&cursor={1}",
            screenName, cursor);
        return WebRequest.Create(url).DownloadStringAsync().Select(DynamicJson.Parse);
    };

    return downloadJson("-1")
        .Expand(d => (d.next_cursor_str == "0")
            ? Observable.Empty<dynamic>() // TakeWhileで判定すると最後の一つを取りこぼすので
            : downloadJson(d.next_cursor_str))
        .SelectMany(d => (dynamic[])d.users)
        .Select(d => (string)d.screen_name);
}

そこそこ直感的ではないでしょうか?最初 Expand().TakeWhile(next_cursor_str != "0") と書いたのですが、それだと最後のページを取りこぼしてしまうのに気づいて、Emptyを投げる方針に変更しました。その辺、境界については注意を払わなきゃですね。

そして、残念ながら、ExpandはRxのStable版にはまだない。ということはWP7にもないわけで。

再帰とRx

Stable版でもやりましょう。awaitなし、Expandなし。では、どうやって作りましょうか。うーん、再帰的というのなら、本当に再帰させてしまえばいいのではないか?

static IObservable<string> EnumerateFriends(string screenName)
{
    Func<string, IObservable<dynamic>> downloadJson = null; // 再帰するにはこーして最初にnull代入
    downloadJson = cursor =>
    {
        var url = string.Format("http://api.twitter.com/1/statuses/friends.json?screen_name={0}&cursor={1}",
            screenName, cursor);
        return WebRequest.Create(url)
            .DownloadStringAsync()
            .Select(DynamicJson.Parse) // ここまでExpandと共通
            .SelectMany(json =>
            {
                // Expandメソッドの中でやってることを大幅簡易化、ということです、つまるところ。
                var next = (json.next_cursor_str == "0")
                    ? Observable.Empty<dynamic>()
                    : downloadJson((string)json.next_cursor_str);
                return (IObservable<dynamic>)Observable.StartWith(next, json);
            });
    };

    return downloadJson("-1")
        .SelectMany(d => (dynamic[])d.users) // ここからもExpandと共通
        .Select(d => (string)d.screen_name);
}

これを再帰というには、あまり再帰してないのですが、まあ雰囲気雰囲気。Expandと大体共通です。つまるところ、Expandを自前で作る、のは結構大変なので、Expandのように汎用的ではなく、特化したものをその場で作る、といった程度の代物。そうすれば、少しは簡単に用意できます。

まとめ

同期でListに格納するだけなら簡単。遅延でやるのもyield returnのお陰で簡単。非同期で辿るのは難しい。awaitで複数の値をyield的に列挙するのは現状難しい。Rxとawaitの連携は大変素晴らしい。Expandは便利。なければないで何とかなる。でもやっぱ大変。

一見簡単なことが存外難しいってのはいくないですね。一見簡単なら、簡単なままでできないと。Expandも悪くはないんですけど、中々どうして慣れてないと分かりづらい。しかし、将来のC#には十分期待できそう。と、思いました、まる。あとRxとAsyncは全然仲良しなんですよ~、というところです。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive