XboxInfoTwit - ver.2.3.0.1

Xbox.comがリニューアルされました!というわけで新Xbox.comに対応しました。それだけです!そして、Xbox.comの構造が変わったお陰で取得が軽量化されました(情報取得のための巡回ページ数が大幅削減されたため)。ヨカッタヨカッタ。あまり良い評判を聞かない新Xbox.comですが、こういうところで良くなってますよー、と。地味な変化としては、今回からフレンドが0な場合でも取得/投稿できるようになりました。たまにフレンドいないから使えない!という嘆きを見かけるので、ちゃんと使えるようになってヨカッタヨカッタ。

例によってテストが非常に不十分なため、ゲームタイトルによっては投稿できなかったりするかもしれません。もし怪しいところがあったら、Twitterで投稿文に「XboxInfoTwit」を含めてポスト、例えば「GoW2がXboxInfoTwitで動かないんだけど何これ」とか言ってもらえれば、検索経由で発見しますので気楽に文句書いてください。(GoW2は動きますけど)

AnonymousComparer - ver.1.3.0.0

ver 1.3というか、バグフィックスです。ToDictionaryとかだと拡張メソッドが本来あるのとオーバーロードが被ってて使おうとするとコンパイル通らなかったのです。気づいてたんだけど放置してました、すみませんすみません。というわけで幾つかのものを削りました。これでコンフリクトなし。

どんなものかというと、こんなものです。最初投稿した時のの流用で(こらこら)

class MyClass
{
    public int MyProperty { get; set; }
}

static void Main()
{
    // 例として、こんな配列があったとします
    var mc1 = new MyClass { MyProperty = 3 };
    var mc2 = new MyClass { MyProperty = 3 };
    var array = new[] { mc1, mc2 };
    // Distinctは重複を取り除く。でも結果として、これは、2です。
    var result = array.Distinct().Count();
    // 参照の比較なので当然です。では、MyPropertyの値で比較したかったら?
    // DistinctにはIEqualityComparerインスタンスを受け付けるオーバーロードもあります
    // しかしIEqualityComparerはわざわざ実装したクラスを作らないと使えない

    // そこで、キー比較のための匿名Comparerを作りました。
    // ラムダ式を渡すことで、その場だけで使うキー比較のIEqualityComparerが作れます。
    array.Distinct(AnonymousComparer.Create((MyClass mc) => mc.MyProperty));

    // でも、長いし、型推論が効かないから型を書く必要がある
    // Linqに流れているものが匿名型だったりしたら対応できないよ!
    // というわけで、本来のLinqメソッドのオーバーロードとして、記述出来るようにしました
    // ちゃんと全てのIEqualityComparerを実装しているLinq標準演算子に定義してあります
    array.Distinct(mc => mc.MyProperty);

    // 短いし、型推論もちゃんと効くしで素晴らしいー。
    // 匿名型でもいけます(VBの匿名型はC#(全ての値が一致)と違ってKey指定らしいですね)
    var anonymous = new[] 
    {
        new { Foo = "A", Key = 10 },
        new { Foo = "B", Key = 15 }
    };
    // true
    anonymous.Contains(new { Foo = "dummy", Key = 10 }, a => a.Key);
}

つまり、LinqのIEqualityComparerのオーバーロードうぜえ、何がインターフェースだよクソが、Linqならラムダ式だろ、インターフェースとかJava臭いんだよ。無名クラス(別に欲しくはないけど)がないから作るの面倒なんだよ。ということです。あるとそれなりに便利です。

Windows Phone 7で同期APIを実現するたった つの冴えないやり方

Windows Phone 7が発表されました。中々に素晴らしい仕上がりに見えます。米国では来月発売と非常に順調そうですが、日本では…… ローカライズが非常に難しそうに見えました。発売されること自体は全然疑っていませんが、問題は、米国で達成出来ているクオリティをどこまで落とさず持ってこれるか。日本語フォントや日本語入力、今一つなBing Map、但し日本は除くなZune Pass。本体だけではなく、周辺サービスも持ってきて初めてWindows Phone 7の世界が完成する。ということを考えると、大変難しそう。

その辺はMicrosoft株式会社に頑張ってもらうとして、一開発者的には淡々とアプリ作るだけでする。というわけで、標題のお話。WP7というかSilverlightと、そして例によっていつもの通り、Rxの話です。

問題です。以下のコードの出力結果(Debug.WriteLineの順序)はどうなるでしょうか。

// 何も変哲もないボタンをクリックしたとする
void Button_Click(object sender, RoutedEventArgs e)
{
    Debug.WriteLine("start");

    // 10秒以内にレスポンスが来るとする
    var req = WebRequest.Create("http://bing.com/");
    req.BeginGetResponse(ar => Debug.WriteLine("async"), null);

    Thread.Sleep(10000); // 10秒待機
    Debug.WriteLine("end");
}

答えは後で。

Dispatcher.BeginInvokeとPriority

Dispatcherとは何ぞやか。について説明するには余白が狭すぎる。ので軽くスルーしてコードを。Dispatcher.BeginInvokeは通常は別スレッドから単発呼び出しが多いですが、UIスレッド上でDispatcher.BeginInvokeを呼ぶとどうなるでしょう?

// (WPF)何も変哲もないボタンをクリックしたとする
void Button_Click(object sender, RoutedEventArgs e)
{
    Debug.WriteLine("start");

    Dispatcher.BeginInvoke(new Action(() => Debug.WriteLine("normal1")), DispatcherPriority.Normal);
    Dispatcher.BeginInvoke(new Action(() => Debug.WriteLine("background")), DispatcherPriority.Background);
    Dispatcher.BeginInvoke(new Action(() => Debug.WriteLine("normal2")), DispatcherPriority.Normal);
            
    Debug.WriteLine("end");
}

結果は、start->end->normal1->normal2->backgroundです。なおDispatcherPriorityはWPFでは設定可能ですが、Silverlightでは設定不可で、内部的には全てBackgroundになります。挙動は以下の図のようになっています。

一番上のブロックが現在実行中メソッド。下のがDispatcher。BeginInvokeで実行キューに優先度付きで突っ込まれて、現在実行中のメソッドが終了したら、キューの中のメソッドが順次、優先度順に実行されます。といったイメージ。

問題の答え

冒頭の問題の答えは、WPFではstart->async->endの順。Silverlight(WP7も含む)ではstart->end->asyncの順になります。ええ。WPFとSilverlightで挙動が違うのです!今更何をっていう識者も多そうですが(Silverlightももう4だしねえ)私ははぢめて知りました。はまった。BeginGetResponseはWPFでは(というか普通の.NET環境では)そのまま別スレッド送りで実行されますが、Silverlightでは一旦Dispatcherに突っ込まれた後に実行されるのですねー、といったような雰囲気(なので一つ前でDispatcher.BeginInvokeがどうのという話を挟みました)。

Silverlightでは、BeginGetResponseはすぐには実行されない。それを踏まえて次へ。

非同期 to 同期

非同期を同期に変換してみましょう。Reactive Extensions for .NET (Rx)で。

void Button_Click(object sender, RoutedEventArgs e)
{
    // 非同期を同期に変換!
    var req = WebRequest.Create("http://bing.com/");
    var response = Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse).Invoke()
        .First();
    Debug.WriteLine(response.ResponseUri);
}

Rxで非同期を包むと長さ1のReactiveシーケンスとなるので、Firstを使うと同期的に値を取り出せる、という話を前回の記事 Rxを使って非同期プログラミングを簡単に でしました。そして実際、上のコードはWPFでは上手く動きます。きっちりブロックして値を取り出せる。勿論、それならGetResponseを使えよという話ではありますが。

では、Silverlight(勿論WP7でも)では、というと…… 永久フリーズします。理由は、BeginGetResponseはDispatcherに積まれた状態なので、現在実行中のメソッドを抜けない限りは動き出さない。Firstは非同期実行が完了するまでは現在実行中のメソッドで待機し続けるので、結果として、待機しているので実行が始まらない=実行完了は来ない→永遠に待機。になります。

結論としては、UIスレッド上で同期的に待つことは不可能です。代替案としてはThreadPoolで丸々包んでしまうということもなくはない。

void Button_Click(object sender, RoutedEventArgs e)
{
    ThreadPool.QueueUserWorkItem(_ =>
    {
        var req = WebRequest.Create("http://bing.com/");
        var response = Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse)
            .Invoke()
            .First();
        Debug.WriteLine(response.ResponseUri);
    });
}

こうすれば、BeginGetResponseが発動するので問題なく待機して値を取り出せます。でも、これじゃあ全然嬉しくもない話で全く意味がない。Rxで包んでいる状態ならば、.Subscribeでいいぢゃん。ということだし。

// 非同期を同期に、そんなことは幻想なのでこう書くのがベストプラクティス
void Button_Click(object sender, RoutedEventArgs e)
{
    var req = WebRequest.Create("http://bing.com/");
    Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse)
        .Invoke()
        .Subscribe(res => Debug.WriteLine(res.ResponseUri));
}

素直に、普通にReactive Extensionsを使うのが、一番簡単に書けます。息を吸うように、ごく自然にそこにあるものとしてRxを使おう。

Delegate.BeginInvokeのこと

相違点はまだあります。普通の.NET環境ではDelegateのBeginInvokeで非同期実行できますが、Silverlightにはありません。やってみるとNotSupportedExceptionが出ます。じゃあFuncにラップしてみるとどうだろう?

void Button_Click(object sender, RoutedEventArgs e)
{
    Debug.WriteLine("start");

    Action action = () => Debug.WriteLine("action");
    Func<AsyncCallback, object, IAsyncResult> wrappedBeginInvoke = action.BeginInvoke;
    wrappedBeginInvoke.Invoke(ar => Debug.WriteLine("async"), null);

    Debug.WriteLine("end");
}

WPFではstart->end->action->async。Silverlightではstart->NotSupportedExceptionの例外。ここまではいいんです。Windows Phone 7でこのコードを試すと、例外出ません。何故か実行出来ます。SilverlightではBeginInvokeは出来ないはずなのに!そして、その実行結果はstart->action->end。つまり、非同期じゃない。BeginInvokeじゃない。Invokeとして実行されてる。意味がさっぱりわかりません。

不思議!不思議すぎたので、MSDNのWindows Phone 7 Forumで聞いてみましたが、良い返答は貰えず。とりあえず、怪しい挙動をしているのは間違いないので、これはやらないほうが無難です。勿論、通常こんなこと書きはしないと思うのですが、RxのFromAsyncPatternをDelegateに対して使おうとするとこうなりますので注意。Delegateの非同期実行したい場合はFromAsyncPetternじゃなくてToAsyncを使いましょう。

void Button_Click(object sender, RoutedEventArgs e)
{
    Debug.WriteLine("start");

    // Observable.ToAsync(()=>{})でもいいし、すぐにInvokeするならObservable.Start(()=>{})も有用
    Action action = () => Debug.WriteLine("action");
    action.ToAsync().Invoke().Subscribe(_ => Debug.WriteLine("async"));

    Debug.WriteLine("end");
}

こうすることで、Rxは内部でBeginInvokeではなくThreadPoolを使うので、問題は起こらずWPFと同じ結果が得られます。

まとめ

同期的に書くほうが分かりやすいには違いないし、また、非同期が苦痛なのもその通り。でも、非同期を同期に、なんて考えない方がいい。AutoResetEventなどを駆使して擬似的に再現出来たとしても、やっぱ無理ありますし、非同期のメリットを犠牲にしてまでやるものではない。確かに非同期をそのまま扱うのは苦痛だけれど、Rxを使えば緩和される。むしろ慣れれば同期的に書くよりも利点が見えてくるぐらい。無理に同期に変換しようとしないでRxを覚えよう。が、結論です。

でもドキュメント全然ないし日本語の話なんて皆無で難しいって? そうですねえ、そうかもですねえ……。このブログも全然順序立ってなくて、思い立ったところから書いてるだけで分かりづらいことこの上ないし。うむむ……。でも、Rxの機能のうち非同期周りの解説に関してはほとんど出せているはずなので、読みにくい文章ですが、目を通してもらえればと思います。

もしつまづくところがあれば、Twitterで「Reactive Extensions」を投稿文に含めてくれれば、Twitter検索経由で見つけて反応します。(「Rx」だと検索結果が膨大になるので反応出来ません……)。検索を見てるワードとしては、他に「Linq」なども高確率で反応しにいきます←逆に怖いって?すみませんすみません。

「C#」が検索キーワードに使えたらいいんですけどねえ。「Scala」とか「JavaScript」は常時見てるんですが、かなり活況に流れているんですよ。そういうの見てると、Twitter上のC#な話も漏らさず見たい・参加したいと思ってしまうわけで。

XboxInfoTwit - ver.2.2.0.4

未知のエラーが出まくっていたので、暫定というか適当な対処を取ってみました。コードがかなり古くて汚いので、根本的に手を入れたいところなんですが、作業量を考えると中々やる気が沸かないという微妙な状態。機能追加のリクエストも数点頂いているので申し訳ないんですけどね。

Rxを使って非同期プログラミングを簡単に

こないだ公開されたMSDNマガジンの記事、非同期タスク - タスクを使って非同期プログラミングを簡単に。おお、これは分かりやすく非同期周りについて網羅されてる!あと、私はTask全然知らないので初歩から入る導入はお役立ちです。いやまあ、実際のとこTask周りの導入記事っていっぱいあるのに、未だにお役立ち、とか言ってるのもどうかと思わなくもないところではあるんですが不勉強なもので。

同期処理でUIをブロックしてしまう、スレッドプールに投げればいいぢゃない、イベントベースのパターンとAPM(IAsyncResultを使うAsynchronous Programming Model)、そしてTask。おお、全部ですね。全部、全部?そう、何か欠けてます、ええ、Reactive Extensionsが欠けています。というわけで、Taskと対比させながらRxでのコードを見ていきましょう。

非同期実行、そして待機

MSDNマガジンでは真ん中辺りからのタスクのコードを、Rxでのコードと一緒に並べてみます。

// タスクパターン
Task<double> task = Task.Factory.StartNew(() =>
{
    double result = 0;
    for (int i = 0; i < 10000000; i++)
        result += Math.Sqrt(i);
    return result;
});

Console.WriteLine("The task is running asynchronously...");
task.Wait(); // 実行完了まで待機
Console.WriteLine("The task computed: {0}", task.Result);

// Reactive Extensions
var obs = Observable.Start(() =>
{
    double result = 0;
    for (int i = 0; i < 10000000; i++)
        result += Math.Sqrt(i);
    return result;
});

Console.WriteLine("Observable.Start非同期実行中");
var r = obs.First(); // 結果が返るまで待機
Console.WriteLine("完了 : {0}", r);

// 余談:タスクはIObservableに変換出来たりする
task.ToObservable().Run(Console.WriteLine);

どちらもデフォルトではThreadPoolで非同期を実行します。ThreadPoolと違うのは、待機するのも戻り値を取り出すのも簡単。Rxでは長さ1のReactiveシーケンスとして扱われるので、Firstを使うと同期的にブロックして値を取り出せます。ここだけを見ると、Wait() + task.ResultなTaskより扱いやすいのではないかと思います。また、両者ともに似ているので、TaskからIObservable<T>への変換も容易です。System.Reactive.dllを読みこめば、Taskに対してToObservableメソッドが追加され、簡単に変換することが出来ます。

自由な変換

汎用的に非同期処理をTaskに、Rxに変換しよう。TaskにはTaskCompletionSourceが、RxにはAsyncSubjectがあります。

// Construct a TaskCompletionSource and get its 
// associated Task
TaskCompletionSource<int> tcs = new TaskCompletionSource<int>();
Task<int> task = tcs.Task;

// Asynchronously, call SetResult on TaskCompletionSource
ThreadPool.QueueUserWorkItem(_ =>
{
    Thread.Sleep(1000); // Do something
    tcs.SetResult(123);
});

Console.WriteLine("The operation is executing asynchronously...");
task.Wait();

// And get the result that was placed into the task by 
// the TaskCompletionSource
Console.WriteLine("The task computed: {0}", task.Result);

// ---

// TaskCompletionSourceは、RxではAsyncSubjectと対比させられる
// AsyncSubjectはRxでの非同期表現を自前で実装する場合に使う(Rx内部でも当然使われている)
var async = new AsyncSubject<int>();

ThreadPool.QueueUserWorkItem(_ =>
{
    Thread.Sleep(1000); // 何か重い処理をしてたとする
    async.OnNext(123); // 値のセット
    async.OnCompleted(); // 値を確定し非同期実行完了
});

Console.WriteLine("重い処理を非同期で実行中...");
var r = async.First(); // 同期的に結果を待機し取得
Console.WriteLine("処理完了:{0}", r);

こちらもまた、両者ともに実によく似ています。何らかの任意の非同期処理は、使いやすいようにRxに包んでしまうと素敵な気分になれる。

IAsyncResultパターンの変換

IAsyncResultパターン。Rxの辺りでは、というか多分.NET周りで言う分には、Asynchronous Programming Model、略してAPMと呼ぶそうです。私がその言葉を見たのは、プログラミングMicrosoft .NET Framework 第2版 (マイクロソフト公式解説書)でした。Richterはリッチャーと呼ぶべきかリヒターと呼ぶべきなのか謎という話ががが。この本は.NET3種の良書のうちの一冊だと思うので(もう一冊は.NETのクラスライブラリ設計 開発チーム直伝の設計原則、コーディング標準、パターン (Microsoft.net Development Series)、もう一冊は未定というか将来のために取っておくというか)だと思うので未読の人は是非是非。と思ったら絶版じゃないですか!いや、amazonで偶然品切れなだけかもしれませんが、どうなんでしょう。これが手に入らないのは損失です!海外では既に.NET 4に対応したThird Editionが出ています。ということは、であり、風のウワサによると―― らしいですので、まあ、ですね!

今読み返したら20ページほど費やされて色々書いてありました。ぶっちゃけ面倒くさいと思って半分以上流し読みしてたという事実に気づいてしまったり。おおぉ。まあ、その辺はThird Editionの時に拾い直せば……。そんなわけで、その面倒くささを緩和するFromAsync/FromAsyncPatternをどうぞ。

// FromAsyncで包むとその場で実行
Task<IPAddress[]> task = Task<IPAddress[]>.Factory.FromAsync(
    Dns.BeginGetHostAddresses, Dns.EndGetHostAddresses, "www.microsoft.com", null);

task.Wait();
foreach (var item in task.Result) Console.WriteLine(item);

// RxのFromAsyncPatternの型指定は引数と戻り値の二つを指定する
// FromAsyncPatternで包んだら即実行ではなく、funcをInvokeするまでは開始されない
var obs = Observable.FromAsyncPattern<string, IPAddress[]>(Dns.BeginGetHostAddresses, Dns.EndGetHostAddresses)
    .Invoke("www.microsoft.com"); // 即実行なら変数に代入せずメソッドチェーン、実行を遅らせたい場合はfuncで持っておくと良いかも

var r = obs.First();
foreach (var item in r) Console.WriteLine(item);

イベントベースのパターンの変換

残念なことに、Taskには組み込みの変換パターンがないので、TaskCompletionSourceを使って自前で作る必要があるようです。RxではAsyncSubjectを使って自前で用意するまでもなく、そもそもイベントのLinq化として売り出されたので、イベントベースのパターンの変換はお手の物です。見てみましょう。

var client = new WebClient();

Observable.FromEvent<DownloadStringCompletedEventArgs>(client, "DownloadStringCompleted")
    .Select(e => e.EventArgs.Result)
    .Subscribe(s => Console.WriteLine(s));

client.DownloadStringAsync(new Uri("http://www.microsoft.com/"));
Console.ReadKey(); // 完了待ち

// こういう風に書くとリフレクションを使わないので軽くて望ましい、けど結構面倒くさい(ので事前に自動生成しておくといいよ)
var obs = Observable.FromEvent<DownloadStringCompletedEventHandler, DownloadStringCompletedEventArgs>(
    h => new DownloadStringCompletedEventHandler(h),
    h => client.DownloadStringCompleted += h,
    h => client.DownloadStringCompleted -= h);

client.DownloadStringAsync(new Uri("http://www.bing.com/"));
var result = obs.Select(e => e.EventArgs.Result).First(); // 同期で待機&受け取り
Console.WriteLine(result);

FromEvent。実に美しい!Linq!Linq!おっと、興奮してしまった。とはいえ、stringでイベント名を指定するのも、h=>hoge+=の連打も、どちらも悲しくダサい。そこでT4 TemplateでFromEventに包んだのを一気に自動生成してしまうのをお薦めします。そのためのT4は以前に書きましたので、Reactive ExtensionsのFromEventをT4 Templateで自動生成する 是非どうぞ。割と頑張ったし便利だと思ったけれど、はてブ数がああああ。まあ、そんなわけで、Rxをガリガリ使う分には必需品です。

しかし、それで大丈夫か?

FromEventの後者の書き方は問題を残しています。DownloadStringAsyncの後にFirst。これは、危険です。どう危険かというと……。

// T4自動生成を使うとFromEventがこんなスッキリ!
var client = new WebClient();
var obs = client.DownloadStringCompletedAsObservable();

// ところで、実行を開始した後にSubscribe(Firstもそうです)したら?
// それも、超速で非同期実行が完了したとしたら?
client.DownloadStringAsync(new Uri("http://www.bing.com/"));
Thread.Sleep(5000); // ダウンロードが完了した後にSubscribeする、をシミュレート

// 次の値は(完了済みなので)永遠にやってこない、つまり永久フリーズ
var result = obs.Select(e => e.EventArgs.Result).First(); 

Firstだと同期で延々と待つのでフリーズ。Subscribeならフリーズはありませんが、結果がこないので意図した結果ではないでしょう。これは大変マズい。そんな時はTake-Prune-Connectパターン、なんてものはなく今思いつきました。今思いついたのでこれがベストなやり方なのか、ちょっとよく分からないのであとでForumとか見て調べておきます。挙動的には全然問題ない。

// Take(1).Prune and ConnectでAsyncSubjectっぽい挙動に変換
var client = new WebClient();
var obs = client.DownloadStringCompletedAsObservable()
    .Select(e => e.EventArgs.Result) // Selectはどこに書いてもいいので自分がスッキリと思うところへ
    .Take(1)
    .Prune();
obs.Connect();

client.DownloadStringAsync(new Uri("http://www.bing.com/"));
Thread.Sleep(5000); // ダウンロードが完了した後にSubscribeする、をシミュレート

var result = obs.First(); // 大丈夫だ、問題ない
Console.WriteLine(result);

var result2 = obs.First(); // 何度でも取り出せる
Console.WriteLine(result2);

わけわかんなくなってきました?失望のため息が聞こえます。どうしたものかねえ、これ。PruneはキャッシュとしてAsyncSubjectを持って後続に渡します。また、値を流すタイミングを自由に調整出来ます(Connectしたら流す、それまでは値が来ていても堰止める)。今回はFromAsyncPatternをなぞらえるため、即座にConnectしました。やっていることは、上の方で出したAsyncSubjectのパターンのシミュレーションです。つまり、OnNextが一回来て、OnCompletedが来る。そうでないと、AsyncSubjectが完了しない。FromEventはそのままだと無限リスト状態で完了の状態がこないので、Take(1)で長さ1のReactiveシーケンスとする。こうすることで、後ろに非同期結果の値が流れ出します。

といったイミフな話はReactive Extensionsの非同期周りの解説と自前実装で少し、それと、それに関連してufcppさんが分かりやすいスライドにしてまとめてくれていますので、必見 => さて、WordPress になったところで再度、PowerPoint 貼り付けテスト « ++C++; // 未確認飛行 C ブログ。貼りつけテストという実に分かりにくいタイトルでサラッと流してしまうところが漢らしい(謎)

タスクの操作と構成

一つの非同期実行程度なら、APMだろうがイベントモデルだろうが、素のまま扱っても別にそこまで面倒なわけではない。Taskが、Rxが真価を発揮するのは複数の操作を行うとき。まずは、待機を。

Task<int> task1 = new Task<int>(() => ComputeSomething(0));
Task<int> task2 = new Task<int>(() => ComputeSomething(1));
Task<int> task3 = new Task<int>(() => ComputeSomething(2));

task1.Start(); task2.Start(); task3.Start(); // 実行しとかないと永遠待機しちゃうよ
task1.Wait();
Console.WriteLine("Task 1 is definitely done.");

Task.WaitAny(task2, task3); // どっちかが完了するまで待機
Console.WriteLine("Task 2 or task 3 is also done.");

Task.WaitAll(task1, task2, task3); // 全部完了するまで待機
Console.WriteLine("All tasks are done.");

// ---

// Observable.Startは即時実行だけど、ToAsyncはInvokeまで実行開始されない
var async1 = Observable.ToAsync(() => ComputeSomething(0));
var async2 = Observable.ToAsync(() => ComputeSomething(1));
var async3 = Observable.ToAsync(() => ComputeSomething(2));
var io1 = async1(); // Invokeってのはデリゲートのなので()でもおk
var io2 = async2();
var io3 = async3();

io1.Run(); // 引数なしRunで実行結果も受けずただの待機になる

// WaitAnyはどちらか先に完了したほうを1つだけ流す Merge().Take(1) して待機
io2.Merge(io3).Take(1).Run();

Observable.Concat(io1, io2, io3).Run(); // WaitAllは全部連結して待機
Observable.ForkJoin(io1, io2, io3).Run(); // こちらは並列実行で待機

複数を同時に走らせて待機がいとも簡単に。で、面白いのがRx。Rxは非同期特化というわけではないので直接的にアレとコレのどっちかが来るまで待ってね、なんていうメソッドはないのですが、豊富な結合系メソッドで余裕でシミュレート出来てしまいます。WaitAnyはMerge.Takeで。WaitAllはConcatで。素晴らしい。凄い。と同時に、若干パズル的な気がしなくもない。が、しかし、面白い。Reactiveモデルの何でもできるという底力を感じる。

継続・継続・継続

今までは待機してたという、おいおい、非同期でやってるのに同期かよ、って感じだったので本領発揮で非同期のままの流るような実行を。TaskではContinueWith、Rxでは、72通りあるから何を言えばいいのか。

// ほう、メソッドチェーンが生きたな
Task<IPAddress[]>.Factory.FromAsync(Dns.BeginGetHostAddresses, Dns.EndGetHostAddresses, "www.microsoft.com", null)
    .ContinueWith(t =>
    {
        foreach (var item in t.Result) // IPAddress[]なので。
        {
            Console.WriteLine(item);
        }
    });

// しかしRxはそれどころじゃない
Observable.FromAsyncPattern<string, IPAddress[]>(Dns.BeginGetHostAddresses, Dns.EndGetHostAddresses)
    .Invoke("www.microsoft.com")
    .SelectMany(xs => xs) // xsはIPAddress[]、つまりIEnumerableとIObservableを区別なくバラしているという狂気の融合!
    .Subscribe(Console.WriteLine);

ContinueWithは、まあごく普通に結果が流れてきてるんだなー、程度。しかしRxのほうはヤバい。この場合のContinueWithに該当するのはSubscribeで、まあそれは普通なのですが、それよりしかし流れてくるIPAddress[]の[]がウザいので、Linq的に扱うならフラットにしたいよね。というわけで、IObservable<IPAddress[]>をSelectManyでIObservable<IPAddress>に変換しています。SelectManyはIObservableだろうとIEnumerableだろうと、平等にバラします。これは実にヤバい。狂気すら感じるパワー。皆も是非Rxを使ってこのヤバさを知って欲しい。

実行・待機

同時実行して、その結果を一辺に受けたい場合ってありますよね。そんな場合はForkJoinで。ForkJoinよく出てくるなあ。

string[] urls = new[] { "www.microsoft.com", "www.msdn.com" };
Task<IPAddress[]>[] tasks = new Task<IPAddress[]>[urls.Length];

for (int i = 0; i < urls.Length; i++)
{
    tasks[i] = Task<IPAddress[]>.Factory.FromAsync(
        Dns.BeginGetHostAddresses,
        Dns.EndGetHostAddresses,
        urls[i], null);
}

Task.WaitAll(tasks);

Console.WriteLine(
    "microsoft.com resolves to {0} IP addresses. msdn.com resolves to {1}",
    tasks[0].Result.Length,
    tasks[1].Result.Length);


// WaitAll? ああ、ForkJoinで並行実行のことですか
Observable.ForkJoin(urls.Select(url =>
        Observable.FromAsyncPattern<string, IPAddress[]>(Dns.BeginGetHostAddresses, Dns.EndGetHostAddresses)(url)))
    .Run(xs => Console.WriteLine(
        "microsoft.com resolves to {0} IP addresses. msdn.com resolves to {1}",
        xs[0].Length, xs[1].Length));

そろそろマンネリ気味で疲れてきた。あ、最後にデッカイのがありますね。TaskではContinueWhenAllが初お目見え。でもRxでは別に変わらずForkJoinなんだよねえ。

// Task要の定義
static Task<string> DownloadStringAsTask(Uri address)
{
    TaskCompletionSource<string> tcs = new TaskCompletionSource<string>();
    WebClient client = new WebClient();
    client.DownloadStringCompleted += (sender, args) =>
    {
        if (args.Error != null) tcs.SetException(args.Error);
        else if (args.Cancelled) tcs.SetCanceled();
        else tcs.SetResult(args.Result);
    };
    client.DownloadStringAsync(address);
    return tcs.Task;
}

// Rx用の定義
public static IObservable<IEvent<DownloadStringCompletedEventArgs>> DownloadStringAsObservable(Uri address)
{
    var client = new WebClient();
    var con = Observable.FromEvent<DownloadStringCompletedEventHandler, DownloadStringCompletedEventArgs>(
            h => new System.Net.DownloadStringCompletedEventHandler(h),
            h => client.DownloadStringCompleted += h,
            h => client.DownloadStringCompleted -= h)
        .Take(1).Prune();
    con.Connect();
    client.DownloadStringAsync(address);
    return con;
}

static int CountParagraphs(string s)
{
    return Regex.Matches(s, "<p>").Count;
}

static void Main(string[] args)
{
    Task<string> page1Task = DownloadStringAsTask(new Uri("http://www.microsoft.com"));
    Task<string> page2Task = DownloadStringAsTask(new Uri("http://www.msdn.com"));

    Task<int> count1Task = page1Task.ContinueWith(t => CountParagraphs(t.Result));
    Task<int> count2Task = page2Task.ContinueWith(t => CountParagraphs(t.Result));

    /// 全てが完了したら、Actionを実行
    Task.Factory.ContinueWhenAll(new[] { count1Task, count2Task },
        tasks =>
        {
            // tasks引数使わないのね(笑)
            Console.WriteLine("<P> tags on microsoft.com: {0}", count1Task.Result);
            Console.WriteLine("<P> tags on msdn.com: {0}", count2Task.Result);
        });

    // Rxではこうなる
    Observable.ForkJoin(
            DownloadStringAsObservable(new Uri("http://www.microsoft.com")),
            DownloadStringAsObservable(new Uri("http://www.msdn.com")))
        .Select(xs => xs.Select(e => CountParagraphs(e.EventArgs.Result)).ToArray())
        .Subscribe(xs =>
        {
            Console.WriteLine("<P> tags on microsoft.com: {0}", xs[0]);
            Console.WriteLine("<P> tags on msdn.com: {0}", xs[1]);
        });

    Console.ReadKey(); // 非同期実行なので終了しないように
}

ふむ(何がふむだ)

非同期とUIスレッド

Dispatcher.BeginInvokeさようなら!ですね。ObserveOnはRxだけの専売特許じゃない。Taskにだってあるもん。

void Button_Click(object sender, RoutedEventArgs e)
{
    // ContinueWithの引数にTaskSchedulerを入れると非同期実行結果からUIを触れる
    TaskScheduler uiTaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();
    DownloadStringAsTask(new Uri("http://www.microsoft.com"))
        .ContinueWith(t => { textBox1.Text = t.Result; }, uiTaskScheduler);

    // Rxではすっかりお馴染みなObserveOnで(WPFの場合は省略形としてObserveOnDispatcherがある)
    DownloadStringAsObservable(new Uri("http://www.microsoft.com"))
        .ObserveOnDispatcher()
        .Subscribe(ev => { textBox1.Text = ev.EventArgs.Result; });
}

真面目に、このObserveOnDispatcherは死ぬほど役立ちです。

まとめ

機能的には十分被る。そして、より汎用的なはずのRxが専用のはずのTaskでの処理を十分に代替出来てしまうという事実に驚く。Rx始まりすぎてる。今更言うのもアレですが、これは確実にヤバい。ビッグウェーブはもう既に来てる。乗り遅れてもいいんで乗ろう!

それにしてもMSDNマガジン素晴らしいなあ。この記事は実に入門向けに満遍なくも濃密で、素敵な時間を過ごせた。いやあ、一時期は機械翻訳のみになってたけれど、再び翻訳に戻って大変ありがたい。そして、そろそろRxもMSDNマガジンに記事が来てもいいのでは?もしくはMSKKの人がですね。どうでしょう。どうなんでしょう。どうなってるんでしょう。

あ、そういえば私は10月期のMicrosoft MVPに応募してたんですがそれはやっぱりダメだったよ。言うことを聞かないからね。これを見てるそこの君、以下略。来年の1月にretryしようかなー。それまでには記事増量、ってやること変わってないんじゃ結果は同じじゃないのというのががが。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(C#)
April 2011
|
July 2024

Twitter:@neuecc GitHub:neuecc

Archive