実践例から見るReactive Extensinosを用いた非同期処理

私はXboxInfoTwitという、Xbox.comからデータをスクレイピングで取り出してTwitterに投稿するという、大変不届き者なアプリケーションを公開しています。お陰様で認証者数も2500人を超えて、割と活況。このアプリケーションでのデータ取得ですが、正規なルートでの情報取得ならばAPI叩けば一発、というケースも少なくないですが、いかんせんルートがアレなので一回の取得で取れる情報は断片。あちらこちらから値を受け渡し組み立てなければならないという、入り組んだ通信手順になっています。これを、もし全部非同期でやろうとしたら目眩がするなあ。ふむ。そこでReactive Extensions。むしろ良いネタがあるじゃないか!というわけでクローラーのコア部分を完全非同期 + Reactive Extensinosで全面的に書きなおしてみました。

neuecc / XboxInfoTwitCore / overview – Bitbucket

dllでライブラリという形体を取っていますが、基本的には誰にも使い道がないものかと思われます。というわけで、このXboxInfoTwitCoreの内容自体はどうでもいいとスルーして、この現実的な課題(?)にReactive Extensinosがどう簡単にしてくれたかを見ていきます。

非同期通信の連続

まずは、手順について図で。最近、パワーポイントでのポンチ絵を書く練習を兼ねて、しかしこんなヘタクソで大丈夫か??

プレイ状況の取得は、MyXbox/Profileから取得、実績内容の取得はそこからゲームのタイトルIDを取り出して、個別のゲーム画面のURLを作り取得。最終的には両者を結合してデータを作り出します。ただし、ログイン前にMyXbox/Profileにアクセスすると認証フォームにリダイレクトされるので、そこでごそごそと処理してlive.comの認証クッキーを取得する必要があります。この認証クッキー取得が若干ややこしい。一度分かってしまえばそんなでもないのですけれど、ちょびっと嫌らしい造りになっています(恐らくこの認証部分はWindows Liveで共通だと思うので、SkyDriveをモニョモニョしたい、など考えている人は参考にでもどうぞ)

ちなみに非同期のPOSTってのは、BeginGetRequestStreamで非同期でRequestStreamを呼び出して書き込む(POST)してから、BeginGetResponseで非同期でレスポンスを取得するので、ネスト量2倍。クッキー認証部分だけで、普通に書くと7つぐらいネストするわけです……。

Asynchronus Query

細部を見る前に、最終的にどうなったか、を。

var fetcher = new LiveTokenFetcher("userid", "password");
var locale = new Locale("ja-JP", "日本");

// 非同期クエリ組み立て
var asyncQuery =
    from crawler in
        from cookie in fetcher.GetLoginCookie(new XboxUrlBuilder(locale).Profile())
        select new XboxInfoCrawler(cookie, locale)
    from pi in crawler.GetPlayerInfo()
    from ai in pi.GameInfo.TitleId != 0
        ? crawler.GetAchievement(pi.GameInfo.TitleId)
        : Observable.Return<AchievementInfo>(null)
    select pi.Tap(x => x.GameInfo.AchievementInfo = ai); // Tapについては解説しな...い

// 実行
asyncQuery.Subscribe(pi => Console.WriteLine("{0} - {1}", pi.GamerTag, pi.PlayerState));

(私にしては珍しく)クエリ構文を使っていますが、こう見ると本当に、非同期処理がLinqに統合された形で問い合わせられる、ということが良く伝わるのではないでしょうか?処理の順序が一見イミフ、というのは慣れの問題です!解説します。

GetLoginCookie、GetPlayerInfo、GetAchievementがIObservable<T>を返す、非同期処理。GetLoginCookieはポンチ絵で言うところの、クッキー取得で、GetPlayerInfoが中央左、GetAchievementが中央右で、最後のselectがマージというわけです。ややこしくネストするはずの非同期が非常にあっさりと記述できました。

GETとPOSTの連続

一番面倒くさいログインクッキー取得の部分について。

public IObservable<CookieCollection> GetLoginCookie(string url)
{
    return Observable.Defer(() => CreateWebRequest(url).GetResponseAsObservable())
        .Select(res => res.TransformResponseHtml(xml => new
        {
            Cookie = ConstructCookie(res.Headers["Set-Cookie"]),
            PostUrl = GetPostUrl(xml),
            PPFT = GetInputValue(xml, "PPFT"),
            PPSX = GetInputValue(xml, "PPSX")
        }))
        .SelectMany(a => CreateWebRequest(a.PostUrl, MethodType.Post, a.Cookie)
            .UploadValues(new Dictionary<string, string>
            {
                {"LoginOptions", "3"},
                {"PPFT", a.PPFT},
                {"PPSX", a.PPSX},
                {"PwdPad", PwdPadBase.Substring(0, PwdPadBase.Length - password.Length)},
                {"login", loginId},
                {"passwd", password}
            }))
        .Select(res => res.TransformResponseHtml(xml => new
        {
            PostUrl = GetPostUrl(xml),
            Anon = GetInputValue(xml, "ANON"),
            T = GetInputValue(xml, "t")
        }))
        .SelectMany(a => CreateWebRequest(a.PostUrl, MethodType.Post)
            .Tap(r => r.AllowAutoRedirect = false)
            .UploadValues(new Dictionary<string, string>
            {
                {"ANON", a.Anon},
                {"t",Uri.EscapeDataString(a.T)}
            }))
        .Select(res => ConstructCookie(res.Headers["Set-Cookie"]));
}

となってます。長くて一瞬ウゲ、となりますが、基本的にはSelectとSelectManyしか使っていません。Selectは流れてくる値を変換。SelectManyは、次の非同期処理の実行。そう覚えて貰えれば、どうでしょう?

先頭から見るとurl(MyXbox/Profile)に対して非同期でのGetResponseを開始。認証フォームにリダイレクトされるので、HTMLをスクレイピングしてPOSTに必要なデータ(PPFT,PPSXなど)を集める。そして非同期でポスト(UploadValuesは、詳細はソースコードを参照してもらうとして非同期で投稿してIObservable<WebResponse>を返す自前定義の拡張メソッドです)。続いて、スクレイピングして次のPOSTに必要なデータ(ポスト先URL,Anon,T)を生成、そして再びUploadValuesで非同期ポスト。するとWebResponseに認証クッキーがあるので、ヘッダーの"Set-Cookie"からクッキーを取ってCookieCollectionを生成したものを返す。

となってます。流れるように!また、最後がSelectで終わっているように、流れは終わっていません。利用者が、ここから更に非同期での流れを繋いでいくわけです。

SelectとSelectMany、そしてクエリ構文

御大層なことを言いつつ、SelectとSelectManyしか使ってないじゃないか!というと、はい、その通りです。それでいいんです。ふつーのシチュエーションでの非同期処理って、そういうものでしょう?データを加工するSelectと、ネストをフラットにするSelectMany。これだけ覚えれば十分であり、そして、役立ちなのです。

ところで、冒頭の例で(メソッド構文信者な私が)何でクエリ構文を使ったかというと、そこそこ綺麗に見えるかなー、とか思ったのともう一つは、割と必要に迫られたから。書き方は色々あります。例えばクエリ構文をベースにするなら、他にも

// クエリ構文とメソッド構文を混ぜるのも時には分かりやすさに繋がる
from crawler in fetcher.GetLoginCookie(new XboxUrlBuilder(locale).Profile())
                       .Select(cookie => new XboxInfoCrawler(cookie, locale))
from pi in crawler.GetPlayerInfo() // 以下略

// select intoを使うことで完全フラットに流すことも可能です
from cookie in fetcher.GetLoginCookie(new XboxUrlBuilder(locale).Profile())
select new XboxInfoCrawler(cookie, locale) into crawler
from pi in crawler.GetPlayerInfo() // 以下略

と出来るでしょう。特に後者のselect intoを使った書き方は冒頭のものより良いかもしれません。ネストして順番が上下するのは複雑さの現れになりますから。では、メソッド構文で書くと?

var asyncQ = fetcher.GetLoginCookie(new XboxUrlBuilder(locale).Profile())
    .Select(cookie => new XboxInfoCrawler(cookie, locale)) // クッキーをラップしたもの
    .SelectMany(crawler => crawler.GetPlayerInfo())
    .SelectMany(playerInfo => crawler.GetAchievement(playerInfo.GameInfo.TitleId));
    // と、書きたいのですが、SelectManyでplayerInfoに変換されているのでcrawlerはスコープ外でこうは書けない!

SelectManyは、前方の値が使えません。GetLoginCookieのように前の値は一切使わないのならば何ら問題ないのですが、GetAchievement(実績取得)はクッキーとGetPlayerInfoで得られたタイトルIDの"両方"が必要。この両方、というシチュエーションの時に、上から流れて変形するだけのSelectManyだと非常に、書きづらい。ではメソッド構文ではどうするか、というと、クエリ構文のように考える。

var asyncQ = fetcher.GetLoginCookie(new XboxUrlBuilder(locale).Profile())
    .Select(cookie => new XboxInfoCrawler(cookie, locale))
    .SelectMany(crawler => crawler.GetPlayerInfo().Select(playerInfo =>
        new { crawler, playerInfo })
    .SelectMany(a => a.crawler.GetAchievement(a.playerInfo.GameInfo.TitleId)));

下側のフローに二つ以上の値を持ち越したいのならば、二つ以上の値を格納した匿名型を作ればいいぢゃない。ということですね。クエリ構文の実態もこうなっています。正直、メソッド構文でこれを書くのは二つだけならまだいいですが、幾つも繋いでいくとなると相当汚くなるので、これはもう素直にクエリ構文の出番だな、と私は思いました。

奇怪なるクエリ構文の実態については多重 from の展開結果 - ++C++; // 管理人の日記で書かれています。って、よくみるとTrackbackに昔の私が送ってますね、ふむふむ、Linq to ObjectsのJavaScript移植であるlinq.jsで書くとこうなるようですよ?

// クエリ
var apart = Enumerable.Range(1, 5);
var query = apart
   .SelectMany(function(baker){ return apart
   .SelectMany(function(cooper){ return apart
   .SelectMany(function(fletcher){ return apart
   .SelectMany(function(miller){ return apart
   .Select(function(smith){ return {
       baker: baker, cooper: cooper, fletcher: fletcher, miller: miller, smith: smith}})})})})})
   .Where("Enumerable.From($).Distinct('$.Value').Count() == 5")
   .Where("$.baker != 5")
   .Where("$.cooper != 1")
   .Where("$.fletcher != 1 && $.fletcher != 5")
   .Where("$.miller > $.cooper")
   .Where("Math.abs($.smith - $.fletcher) != 1")
   .Where("Math.abs($.fletcher - $.cooper) != 1");

// 出力
var result = Enumerable.From(query.Single()) // 答えは一つなのでSingle,そしてobjectをKeyValuePair[]に分解
   .OrderBy("$.Value")
   .ToString(", ", "$.Key + ':' + $.Value"); // シーケンスを文字列に結合

alert(result); // smith:1, cooper:2, baker:3, fletcher:4, miller:5

linq.jsは"完全な"移植なので、.NETで動くコードが動かない、ということはありません。JavaScriptにもLinqの革命を!と、宣伝はさておき、SelectManyで下方に値を持ち出しているのに、毎回匿名型を作っていません。SelectManyのresultSelectorの部分でネストさせることで、値を下の方まで持ち運ぶ事が可能です。これはJavaScriptだけでなくC#でのLinq/Rxでも有効です。一つのパターンというかテクニックというか。C#ではわざわざこんなことやるぐらいならクエリ構文使えという気がしますが、クエリ構文の使えないReactive Extensions for JavaScript(RxJS)では活きるかもしれません。

ただ、この手のインデントの細工による対策は、IDEの自動整形と相性悪いんですよね。上のSelectManyのものも、コードフォーマッタにかけるとボロボロに崩れ去ります。基本的には私は、インデントは機械任せでやるべき。人が調整するものではないし調整してはならない。と思っているので、残念ながら上記テクニックは使うことはないかもなー、いや、どうだろう、積極的にはやらないという程度かしら。

The Future of C#(async/await)

つい数日前に開催されたPDC2010で、C#設計者のAnders Hejlsbergが、The Future of C#と題してC#5.0(とは言ってませんが、恐らくそうなる)のFeatureを語りました。一つは今まで言われていたCompiler as a Service。もう一つが、asynchronusの言語統合。C# 2.0でのyield returnによるIEnumerable生成のようなコンパイル時生成で非同期をコードの見た目上、完全に同期のように扱うことが出来ます。

Asynchronous Programming for C# and Visual BasicにてVisual Studio Async CTPが出ていて既に試してみることが可能なので(要:英語版VS2010)、実際にXboxInfoTwitCoreをasync/awaitで書き直してみました。全体はbitbucketのリポジトリ上にあるので興味あればそちらもどうぞ。以下は、ログインクッキーを取得する部分。

private async static Task<WebResponse> UploadValuesAsync(WebRequest request, IDictionary<string, string> parameters)
{
    var bytes = parameters.ToQueryParameter().Pipe(Encoding.UTF8.GetBytes);
    using (var stream = await request.GetRequestStreamAsync())
    {
        await stream.WriteAsync(bytes, 0, bytes.Length);
    }
    return await request.GetResponseAsync();
}

public async Task<CookieCollection> GetLoginCookie(string url)
{
    var res = await CreateWebRequest(url).GetResponseAsync();
    var prepare = res.TransformResponseHtml(xml => new
    {
        Cookie = ConstructCookie(res.Headers["Set-Cookie"]),
        PostUrl = GetPostUrl(xml),
        PPFT = GetInputValue(xml, "PPFT"),
        PPSX = GetInputValue(xml, "PPSX")
    });

    var req2 = CreateWebRequest(prepare.PostUrl, MethodType.Post, prepare.Cookie);
    var res2 = await UploadValuesAsync(req2, new Dictionary<string, string>
    {
        {"LoginOptions", "3"},
        {"PPFT", prepare.PPFT},
        {"PPSX", prepare.PPSX},
        {"PwdPad", PwdPadBase.Substring(0, PwdPadBase.Length - password.Length)},
        {"login", loginId},
        {"passwd", password}
    });

    var prepare2 = res2.TransformResponseHtml(xml => new
    {
        PostUrl = GetPostUrl(xml),
        Anon = GetInputValue(xml, "ANON"),
        T = GetInputValue(xml, "t")
    });

    var req3 = CreateWebRequest(prepare2.PostUrl, MethodType.Post);
    req3.AllowAutoRedirect = false;

    var res3 = await UploadValuesAsync(req3, new Dictionary<string, string>
    {
        {"ANON",prepare2.Anon},
        {"t",Uri.EscapeDataString(prepare2.T)}
    });

    return ConstructCookie(res3.Headers["Set-Cookie"]);
}

まあ見事にベッタベタに書かれているのが良く見えますねえ。このasync/awaitですが、非同期として扱うものにasyncを宣言したメソッドを用意。あとは同期のように書くだけ。yield returnのかわりにawait + XxxAsync と書くだけ。それで、まるで同期のように書けてしまいます。あまりにもお手軽。魔法のように。Silverlightのネック(そして初心者キラー)なところは、分かりづらい非同期でしたが、ウルトラC的に解決……。

この発表を見た時は普通にショックで、更に一日経って冷静に考えると更にショックで寝こむ勢いでしたね(笑) 正直なところ、被ります。単純な非同期の簡易化という点だけで考えればめっちゃ被ります。それはTaskとRxが被るよねー、Rxのほうが書きやすいっすよー、とか少し前に言っていた程度にはモロ被ります。実際問題Taskはあまり使いやすいとは言えないのですが、コンパイラサポートがガッツシ来てしまったら、それは話は別ですな。

勿論、RxのメリットはIEnumerable<T>や、イベントやタイマーなどの他のIObservable<T>とのシームレスな連携にもあるので、非同期を利用するシーンにおいてasync/awaitが完全に置き換えるもの、とは言いませんけれど。そして、私はそういった統合っぷりに魅力を感じているのでasync/awaitはあまり使わないかなー、という気がしてなくはないのですが、ちょっとその辺、判断が難しい。

さて、ところで両者を置き換えるのは非常に簡単です。戻り値がTask<T>かIObservable<T>か。RxではSelectManyだった位置にawaitを置く。それだけです。これは覚えておいて損はないかもです。そういう意味でも普通にかぶってるよな、とか思いつつ。awaitが分かればRx-SelectManyも簡単だし、その逆もまた然り。

SilverlightとWindows Phone 7

Silverlightは同期APIがないので、Rxで組むことで互換性が狙えます。このXboxInfoTwitCoreはフル非同期・フルRxなので、Silverlight/Windows Phone 7にも対応させ、られませんでした!えー。CookieExceptionが発生したり取れる最終的なCookieが何か違ったりと、Cookieが鬼門すぎて無理でした、とほほほ。マジワケワカラン。ええと、一応はWPF-Silverlight-WP7で完全なコード共有を実現する、という点もRxの強力な武器であり、使う理由になるとは思っています。それを示したかったのですが、うーむ。完全に同一なファイルで非同期としての挙動は問題なく取れているのですが、実際にデータが出せないとねえ……。カッコワルイ。

SgmlReader for Silverlight/Windows Phone 7

XboxInfoTwitCoreではHtml to Xml変換にSgmlReaderを使用しています。以前に紹介を書きましたが、これは本当に重宝します。ストリームとXElement.Loadの間に置くことで、不正なXMLである(ためXElement.Loadに渡すと例外が発声する)ネット上のHTMLをLinq to Xmlで扱えるようになります。しかし、元のコードベースが古いのと開発が活況とは言い難い状況であるために、SilverlightやWindows Phone 7に対応していません。大変困った。困ったので、Silverlight/Windows Phone 7で動くように少々コード弄ったところ、問題なく動いた!ので、こちらもbitbucketにあげときました。

neuecc / SgmlReader.SL / overview – Bitbucket

基本的にはデスクトップ版と全く同じ感覚で使えます。

var req = WebRequest.Create("http://google.com/");
req.BeginGetResponse(ar =>
{
    var res = req.EndGetResponse(ar);
    using (var stream = res.GetResponseStream())
    using (var sr = new StreamReader(stream))
    using (var sgmlReader = new SgmlReader { DocType = "HTML", CaseFolding = CaseFolding.ToLower, InputStream = sr })
    {
        var xml = XElement.Load(sgmlReader); // これ。
        Dispatcher.BeginInvoke(() => MessageBox.Show(xml.ToString()));
    }
}, null);

プロパティにURLを渡すと例外で落ちます(同期通信は許可されていません!)。というわけで、WebRequestなどから非同期でウェブからStreamを取り出して、それをInputStreamプロパティに渡すという形を取ってください。この制限は、SilverlightではXElement.Load("url")が許可されていないのと同じことです。

XboxInfoTwitの今後

ここまで見てくれた人がXboxInfoTwit本体の利用者とは思えません!のですが、とりあえず書きますと、コア部分は書き換わったので、あとはGUIというか設定画面を複数言語に対応させてCodePlexで公開。を目指しています。が、いかんせんGUIは難敵です……。今のXboxInfoTwitも一応WPFなんですが見よう見まねで組んだXAMLで汚いので、きっちりと綺麗に書き換えたい、のですが、それをやるにはあまりにもWPF/XAMLの知識がなさすぎる。そのため、今日明日でフル改装で公開!というわけにもいきません。もう少し時間がかかりそうです。

Twitterへの投稿に関しては、以前書いたReactiveOAuth(完全なRxベースのOAuthライブラリ)があるので、シームレスに統合出来そう(こちらも幾つか課題が溜まっているので更新しないと……)。あとは、今使ってる自動アップデートプログラムがタコな出来なので、これも作り直したいなあ。なんて思っていたり。

まとめ

ふつーの非同期で書いてネストしまくりで、こんなの書いてられないだろほれみろバーカバーカ、とか言ってみたかったのですが気力がなくてそれは断念。ともあれ、今ある非同期の厳しさへの現実解として、Rxはアリ、です。非同期処理が、かなり綺麗にLinqクエリとして溶け込む様は分かるのではないかと思います。C#5.0のasync/awaitと比較してみても、awaitの位置にSelectMany、というだけの話ですしね。そして何よりも、async/awaitは今日明日に出るわけじゃないのです! Visual Studio 2005 -> 2008 -> 2010のペースで考えるならば2013年。まだ2年も先です。

async/awaitがないから「しょうがなく」使う、ってスタンスもアレゲ。いえいえ、喜んで使うのです。Rxならではのメリットが、当然あります。ただの非同期処理の一本化というだけに終わらず、イベントやタイマーなど他のソースとの融合は魅力です。そう、そもそも、Rxの持つ側面は3つ。「Asynchronus」「Event」「Pull to Push」。ここ最近はAsynchronusのことしか書いてなかったですが、他の2つのほうも見逃せないというか、むしろRxは最初はLinq to Eventsとして紹介されていたぐらいだしEventのほうがメインですよ?

ところで話は全く変わりますが、Bitbucket使い始めました。バージョン管理は当然Mercurialです。Mercurialは非常にいいです。もうSubversionは使う気になれない。Team Foundation Serverは使ったことないので知らないけれど(あれはオールインワンなところに価値がある気がしつつ、まあ、重量級ですよね)Mercurialのサクサク感は、バージョン管理はしっくりこないんです!とかほざいていた私にガツーンとバージョン管理がないともう生きていけない!ちょっとしたプロジェクトもhg initするし!な心構えに変えさせる威力がありました。あとちょっとした変更でもローカルでコミット、な素敵さとか。

TortoiseHgは、若干UIの導線がイケてないとは思いつつ、まあまあ使いやすく(日本語化もデフォルトでされています←要環境変数にLANG=ja)、Visual Studio拡張のVisualHGも素敵な使い勝手。というか、UI部分はTortoiseHgを呼び出しているだけというシンプルさがいい。私はEclipse下ではMercurial Eclipseを使ってますが、こちらは独自にがっつし作りこんであって、それが今一つな使い勝手で。私はVisual Hgの方向性を支持します。

最後に、毎回嘘っぱちな次回予告ですが(SelectManyの解説をすると言い続けて半年以上経過)、ポーリングをイベントとしてRx化してPushで共通化とか何とかかんとか、を予定。引き続き題材はXboxInfoほげほげを使うつもりです。乞うご期待しない。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(C#)
April 2011
|
July 2024

Twitter:@neuecc GitHub:neuecc

Archive