Rxを使って非同期プログラミングを簡単に

こないだ公開されたMSDNマガジンの記事、非同期タスク - タスクを使って非同期プログラミングを簡単に。おお、これは分かりやすく非同期周りについて網羅されてる!あと、私はTask全然知らないので初歩から入る導入はお役立ちです。いやまあ、実際のとこTask周りの導入記事っていっぱいあるのに、未だにお役立ち、とか言ってるのもどうかと思わなくもないところではあるんですが不勉強なもので。

同期処理でUIをブロックしてしまう、スレッドプールに投げればいいぢゃない、イベントベースのパターンとAPM(IAsyncResultを使うAsynchronous Programming Model)、そしてTask。おお、全部ですね。全部、全部?そう、何か欠けてます、ええ、Reactive Extensionsが欠けています。というわけで、Taskと対比させながらRxでのコードを見ていきましょう。

非同期実行、そして待機

MSDNマガジンでは真ん中辺りからのタスクのコードを、Rxでのコードと一緒に並べてみます。

// タスクパターン
Task<double> task = Task.Factory.StartNew(() =>
{
    double result = 0;
    for (int i = 0; i < 10000000; i++)
        result += Math.Sqrt(i);
    return result;
});

Console.WriteLine("The task is running asynchronously...");
task.Wait(); // 実行完了まで待機
Console.WriteLine("The task computed: {0}", task.Result);

// Reactive Extensions
var obs = Observable.Start(() =>
{
    double result = 0;
    for (int i = 0; i < 10000000; i++)
        result += Math.Sqrt(i);
    return result;
});

Console.WriteLine("Observable.Start非同期実行中");
var r = obs.First(); // 結果が返るまで待機
Console.WriteLine("完了 : {0}", r);

// 余談:タスクはIObservableに変換出来たりする
task.ToObservable().Run(Console.WriteLine);

どちらもデフォルトではThreadPoolで非同期を実行します。ThreadPoolと違うのは、待機するのも戻り値を取り出すのも簡単。Rxでは長さ1のReactiveシーケンスとして扱われるので、Firstを使うと同期的にブロックして値を取り出せます。ここだけを見ると、Wait() + task.ResultなTaskより扱いやすいのではないかと思います。また、両者ともに似ているので、TaskからIObservable<T>への変換も容易です。System.Reactive.dllを読みこめば、Taskに対してToObservableメソッドが追加され、簡単に変換することが出来ます。

自由な変換

汎用的に非同期処理をTaskに、Rxに変換しよう。TaskにはTaskCompletionSourceが、RxにはAsyncSubjectがあります。

// Construct a TaskCompletionSource and get its 
// associated Task
TaskCompletionSource<int> tcs = new TaskCompletionSource<int>();
Task<int> task = tcs.Task;

// Asynchronously, call SetResult on TaskCompletionSource
ThreadPool.QueueUserWorkItem(_ =>
{
    Thread.Sleep(1000); // Do something
    tcs.SetResult(123);
});

Console.WriteLine("The operation is executing asynchronously...");
task.Wait();

// And get the result that was placed into the task by 
// the TaskCompletionSource
Console.WriteLine("The task computed: {0}", task.Result);

// ---

// TaskCompletionSourceは、RxではAsyncSubjectと対比させられる
// AsyncSubjectはRxでの非同期表現を自前で実装する場合に使う(Rx内部でも当然使われている)
var async = new AsyncSubject<int>();

ThreadPool.QueueUserWorkItem(_ =>
{
    Thread.Sleep(1000); // 何か重い処理をしてたとする
    async.OnNext(123); // 値のセット
    async.OnCompleted(); // 値を確定し非同期実行完了
});

Console.WriteLine("重い処理を非同期で実行中...");
var r = async.First(); // 同期的に結果を待機し取得
Console.WriteLine("処理完了:{0}", r);

こちらもまた、両者ともに実によく似ています。何らかの任意の非同期処理は、使いやすいようにRxに包んでしまうと素敵な気分になれる。

IAsyncResultパターンの変換

IAsyncResultパターン。Rxの辺りでは、というか多分.NET周りで言う分には、Asynchronous Programming Model、略してAPMと呼ぶそうです。私がその言葉を見たのは、プログラミングMicrosoft .NET Framework 第2版 (マイクロソフト公式解説書)でした。Richterはリッチャーと呼ぶべきかリヒターと呼ぶべきなのか謎という話ががが。この本は.NET3種の良書のうちの一冊だと思うので(もう一冊は.NETのクラスライブラリ設計 開発チーム直伝の設計原則、コーディング標準、パターン (Microsoft.net Development Series)、もう一冊は未定というか将来のために取っておくというか)だと思うので未読の人は是非是非。と思ったら絶版じゃないですか!いや、amazonで偶然品切れなだけかもしれませんが、どうなんでしょう。これが手に入らないのは損失です!海外では既に.NET 4に対応したThird Editionが出ています。ということは、であり、風のウワサによると―― らしいですので、まあ、ですね!

今読み返したら20ページほど費やされて色々書いてありました。ぶっちゃけ面倒くさいと思って半分以上流し読みしてたという事実に気づいてしまったり。おおぉ。まあ、その辺はThird Editionの時に拾い直せば……。そんなわけで、その面倒くささを緩和するFromAsync/FromAsyncPatternをどうぞ。

// FromAsyncで包むとその場で実行
Task<IPAddress[]> task = Task<IPAddress[]>.Factory.FromAsync(
    Dns.BeginGetHostAddresses, Dns.EndGetHostAddresses, "www.microsoft.com", null);

task.Wait();
foreach (var item in task.Result) Console.WriteLine(item);

// RxのFromAsyncPatternの型指定は引数と戻り値の二つを指定する
// FromAsyncPatternで包んだら即実行ではなく、funcをInvokeするまでは開始されない
var obs = Observable.FromAsyncPattern<string, IPAddress[]>(Dns.BeginGetHostAddresses, Dns.EndGetHostAddresses)
    .Invoke("www.microsoft.com"); // 即実行なら変数に代入せずメソッドチェーン、実行を遅らせたい場合はfuncで持っておくと良いかも

var r = obs.First();
foreach (var item in r) Console.WriteLine(item);

イベントベースのパターンの変換

残念なことに、Taskには組み込みの変換パターンがないので、TaskCompletionSourceを使って自前で作る必要があるようです。RxではAsyncSubjectを使って自前で用意するまでもなく、そもそもイベントのLinq化として売り出されたので、イベントベースのパターンの変換はお手の物です。見てみましょう。

var client = new WebClient();

Observable.FromEvent<DownloadStringCompletedEventArgs>(client, "DownloadStringCompleted")
    .Select(e => e.EventArgs.Result)
    .Subscribe(s => Console.WriteLine(s));

client.DownloadStringAsync(new Uri("http://www.microsoft.com/"));
Console.ReadKey(); // 完了待ち

// こういう風に書くとリフレクションを使わないので軽くて望ましい、けど結構面倒くさい(ので事前に自動生成しておくといいよ)
var obs = Observable.FromEvent<DownloadStringCompletedEventHandler, DownloadStringCompletedEventArgs>(
    h => new DownloadStringCompletedEventHandler(h),
    h => client.DownloadStringCompleted += h,
    h => client.DownloadStringCompleted -= h);

client.DownloadStringAsync(new Uri("http://www.bing.com/"));
var result = obs.Select(e => e.EventArgs.Result).First(); // 同期で待機&受け取り
Console.WriteLine(result);

FromEvent。実に美しい!Linq!Linq!おっと、興奮してしまった。とはいえ、stringでイベント名を指定するのも、h=>hoge+=の連打も、どちらも悲しくダサい。そこでT4 TemplateでFromEventに包んだのを一気に自動生成してしまうのをお薦めします。そのためのT4は以前に書きましたので、Reactive ExtensionsのFromEventをT4 Templateで自動生成する 是非どうぞ。割と頑張ったし便利だと思ったけれど、はてブ数がああああ。まあ、そんなわけで、Rxをガリガリ使う分には必需品です。

しかし、それで大丈夫か?

FromEventの後者の書き方は問題を残しています。DownloadStringAsyncの後にFirst。これは、危険です。どう危険かというと……。

// T4自動生成を使うとFromEventがこんなスッキリ!
var client = new WebClient();
var obs = client.DownloadStringCompletedAsObservable();

// ところで、実行を開始した後にSubscribe(Firstもそうです)したら?
// それも、超速で非同期実行が完了したとしたら?
client.DownloadStringAsync(new Uri("http://www.bing.com/"));
Thread.Sleep(5000); // ダウンロードが完了した後にSubscribeする、をシミュレート

// 次の値は(完了済みなので)永遠にやってこない、つまり永久フリーズ
var result = obs.Select(e => e.EventArgs.Result).First(); 

Firstだと同期で延々と待つのでフリーズ。Subscribeならフリーズはありませんが、結果がこないので意図した結果ではないでしょう。これは大変マズい。そんな時はTake-Prune-Connectパターン、なんてものはなく今思いつきました。今思いついたのでこれがベストなやり方なのか、ちょっとよく分からないのであとでForumとか見て調べておきます。挙動的には全然問題ない。

// Take(1).Prune and ConnectでAsyncSubjectっぽい挙動に変換
var client = new WebClient();
var obs = client.DownloadStringCompletedAsObservable()
    .Select(e => e.EventArgs.Result) // Selectはどこに書いてもいいので自分がスッキリと思うところへ
    .Take(1)
    .Prune();
obs.Connect();

client.DownloadStringAsync(new Uri("http://www.bing.com/"));
Thread.Sleep(5000); // ダウンロードが完了した後にSubscribeする、をシミュレート

var result = obs.First(); // 大丈夫だ、問題ない
Console.WriteLine(result);

var result2 = obs.First(); // 何度でも取り出せる
Console.WriteLine(result2);

わけわかんなくなってきました?失望のため息が聞こえます。どうしたものかねえ、これ。PruneはキャッシュとしてAsyncSubjectを持って後続に渡します。また、値を流すタイミングを自由に調整出来ます(Connectしたら流す、それまでは値が来ていても堰止める)。今回はFromAsyncPatternをなぞらえるため、即座にConnectしました。やっていることは、上の方で出したAsyncSubjectのパターンのシミュレーションです。つまり、OnNextが一回来て、OnCompletedが来る。そうでないと、AsyncSubjectが完了しない。FromEventはそのままだと無限リスト状態で完了の状態がこないので、Take(1)で長さ1のReactiveシーケンスとする。こうすることで、後ろに非同期結果の値が流れ出します。

といったイミフな話はReactive Extensionsの非同期周りの解説と自前実装で少し、それと、それに関連してufcppさんが分かりやすいスライドにしてまとめてくれていますので、必見 => さて、WordPress になったところで再度、PowerPoint 貼り付けテスト « ++C++; // 未確認飛行 C ブログ。貼りつけテストという実に分かりにくいタイトルでサラッと流してしまうところが漢らしい(謎)

タスクの操作と構成

一つの非同期実行程度なら、APMだろうがイベントモデルだろうが、素のまま扱っても別にそこまで面倒なわけではない。Taskが、Rxが真価を発揮するのは複数の操作を行うとき。まずは、待機を。

Task<int> task1 = new Task<int>(() => ComputeSomething(0));
Task<int> task2 = new Task<int>(() => ComputeSomething(1));
Task<int> task3 = new Task<int>(() => ComputeSomething(2));

task1.Start(); task2.Start(); task3.Start(); // 実行しとかないと永遠待機しちゃうよ
task1.Wait();
Console.WriteLine("Task 1 is definitely done.");

Task.WaitAny(task2, task3); // どっちかが完了するまで待機
Console.WriteLine("Task 2 or task 3 is also done.");

Task.WaitAll(task1, task2, task3); // 全部完了するまで待機
Console.WriteLine("All tasks are done.");

// ---

// Observable.Startは即時実行だけど、ToAsyncはInvokeまで実行開始されない
var async1 = Observable.ToAsync(() => ComputeSomething(0));
var async2 = Observable.ToAsync(() => ComputeSomething(1));
var async3 = Observable.ToAsync(() => ComputeSomething(2));
var io1 = async1(); // Invokeってのはデリゲートのなので()でもおk
var io2 = async2();
var io3 = async3();

io1.Run(); // 引数なしRunで実行結果も受けずただの待機になる

// WaitAnyはどちらか先に完了したほうを1つだけ流す Merge().Take(1) して待機
io2.Merge(io3).Take(1).Run();

Observable.Concat(io1, io2, io3).Run(); // WaitAllは全部連結して待機
Observable.ForkJoin(io1, io2, io3).Run(); // こちらは並列実行で待機

複数を同時に走らせて待機がいとも簡単に。で、面白いのがRx。Rxは非同期特化というわけではないので直接的にアレとコレのどっちかが来るまで待ってね、なんていうメソッドはないのですが、豊富な結合系メソッドで余裕でシミュレート出来てしまいます。WaitAnyはMerge.Takeで。WaitAllはConcatで。素晴らしい。凄い。と同時に、若干パズル的な気がしなくもない。が、しかし、面白い。Reactiveモデルの何でもできるという底力を感じる。

継続・継続・継続

今までは待機してたという、おいおい、非同期でやってるのに同期かよ、って感じだったので本領発揮で非同期のままの流るような実行を。TaskではContinueWith、Rxでは、72通りあるから何を言えばいいのか。

// ほう、メソッドチェーンが生きたな
Task<IPAddress[]>.Factory.FromAsync(Dns.BeginGetHostAddresses, Dns.EndGetHostAddresses, "www.microsoft.com", null)
    .ContinueWith(t =>
    {
        foreach (var item in t.Result) // IPAddress[]なので。
        {
            Console.WriteLine(item);
        }
    });

// しかしRxはそれどころじゃない
Observable.FromAsyncPattern<string, IPAddress[]>(Dns.BeginGetHostAddresses, Dns.EndGetHostAddresses)
    .Invoke("www.microsoft.com")
    .SelectMany(xs => xs) // xsはIPAddress[]、つまりIEnumerableとIObservableを区別なくバラしているという狂気の融合!
    .Subscribe(Console.WriteLine);

ContinueWithは、まあごく普通に結果が流れてきてるんだなー、程度。しかしRxのほうはヤバい。この場合のContinueWithに該当するのはSubscribeで、まあそれは普通なのですが、それよりしかし流れてくるIPAddress[]の[]がウザいので、Linq的に扱うならフラットにしたいよね。というわけで、IObservable<IPAddress[]>をSelectManyでIObservable<IPAddress>に変換しています。SelectManyはIObservableだろうとIEnumerableだろうと、平等にバラします。これは実にヤバい。狂気すら感じるパワー。皆も是非Rxを使ってこのヤバさを知って欲しい。

実行・待機

同時実行して、その結果を一辺に受けたい場合ってありますよね。そんな場合はForkJoinで。ForkJoinよく出てくるなあ。

string[] urls = new[] { "www.microsoft.com", "www.msdn.com" };
Task<IPAddress[]>[] tasks = new Task<IPAddress[]>[urls.Length];

for (int i = 0; i < urls.Length; i++)
{
    tasks[i] = Task<IPAddress[]>.Factory.FromAsync(
        Dns.BeginGetHostAddresses,
        Dns.EndGetHostAddresses,
        urls[i], null);
}

Task.WaitAll(tasks);

Console.WriteLine(
    "microsoft.com resolves to {0} IP addresses. msdn.com resolves to {1}",
    tasks[0].Result.Length,
    tasks[1].Result.Length);


// WaitAll? ああ、ForkJoinで並行実行のことですか
Observable.ForkJoin(urls.Select(url =>
        Observable.FromAsyncPattern<string, IPAddress[]>(Dns.BeginGetHostAddresses, Dns.EndGetHostAddresses)(url)))
    .Run(xs => Console.WriteLine(
        "microsoft.com resolves to {0} IP addresses. msdn.com resolves to {1}",
        xs[0].Length, xs[1].Length));

そろそろマンネリ気味で疲れてきた。あ、最後にデッカイのがありますね。TaskではContinueWhenAllが初お目見え。でもRxでは別に変わらずForkJoinなんだよねえ。

// Task要の定義
static Task<string> DownloadStringAsTask(Uri address)
{
    TaskCompletionSource<string> tcs = new TaskCompletionSource<string>();
    WebClient client = new WebClient();
    client.DownloadStringCompleted += (sender, args) =>
    {
        if (args.Error != null) tcs.SetException(args.Error);
        else if (args.Cancelled) tcs.SetCanceled();
        else tcs.SetResult(args.Result);
    };
    client.DownloadStringAsync(address);
    return tcs.Task;
}

// Rx用の定義
public static IObservable<IEvent<DownloadStringCompletedEventArgs>> DownloadStringAsObservable(Uri address)
{
    var client = new WebClient();
    var con = Observable.FromEvent<DownloadStringCompletedEventHandler, DownloadStringCompletedEventArgs>(
            h => new System.Net.DownloadStringCompletedEventHandler(h),
            h => client.DownloadStringCompleted += h,
            h => client.DownloadStringCompleted -= h)
        .Take(1).Prune();
    con.Connect();
    client.DownloadStringAsync(address);
    return con;
}

static int CountParagraphs(string s)
{
    return Regex.Matches(s, "<p>").Count;
}

static void Main(string[] args)
{
    Task<string> page1Task = DownloadStringAsTask(new Uri("http://www.microsoft.com"));
    Task<string> page2Task = DownloadStringAsTask(new Uri("http://www.msdn.com"));

    Task<int> count1Task = page1Task.ContinueWith(t => CountParagraphs(t.Result));
    Task<int> count2Task = page2Task.ContinueWith(t => CountParagraphs(t.Result));

    /// 全てが完了したら、Actionを実行
    Task.Factory.ContinueWhenAll(new[] { count1Task, count2Task },
        tasks =>
        {
            // tasks引数使わないのね(笑)
            Console.WriteLine("<P> tags on microsoft.com: {0}", count1Task.Result);
            Console.WriteLine("<P> tags on msdn.com: {0}", count2Task.Result);
        });

    // Rxではこうなる
    Observable.ForkJoin(
            DownloadStringAsObservable(new Uri("http://www.microsoft.com")),
            DownloadStringAsObservable(new Uri("http://www.msdn.com")))
        .Select(xs => xs.Select(e => CountParagraphs(e.EventArgs.Result)).ToArray())
        .Subscribe(xs =>
        {
            Console.WriteLine("<P> tags on microsoft.com: {0}", xs[0]);
            Console.WriteLine("<P> tags on msdn.com: {0}", xs[1]);
        });

    Console.ReadKey(); // 非同期実行なので終了しないように
}

ふむ(何がふむだ)

非同期とUIスレッド

Dispatcher.BeginInvokeさようなら!ですね。ObserveOnはRxだけの専売特許じゃない。Taskにだってあるもん。

void Button_Click(object sender, RoutedEventArgs e)
{
    // ContinueWithの引数にTaskSchedulerを入れると非同期実行結果からUIを触れる
    TaskScheduler uiTaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();
    DownloadStringAsTask(new Uri("http://www.microsoft.com"))
        .ContinueWith(t => { textBox1.Text = t.Result; }, uiTaskScheduler);

    // Rxではすっかりお馴染みなObserveOnで(WPFの場合は省略形としてObserveOnDispatcherがある)
    DownloadStringAsObservable(new Uri("http://www.microsoft.com"))
        .ObserveOnDispatcher()
        .Subscribe(ev => { textBox1.Text = ev.EventArgs.Result; });
}

真面目に、このObserveOnDispatcherは死ぬほど役立ちです。

まとめ

機能的には十分被る。そして、より汎用的なはずのRxが専用のはずのTaskでの処理を十分に代替出来てしまうという事実に驚く。Rx始まりすぎてる。今更言うのもアレですが、これは確実にヤバい。ビッグウェーブはもう既に来てる。乗り遅れてもいいんで乗ろう!

それにしてもMSDNマガジン素晴らしいなあ。この記事は実に入門向けに満遍なくも濃密で、素敵な時間を過ごせた。いやあ、一時期は機械翻訳のみになってたけれど、再び翻訳に戻って大変ありがたい。そして、そろそろRxもMSDNマガジンに記事が来てもいいのでは?もしくはMSKKの人がですね。どうでしょう。どうなんでしょう。どうなってるんでしょう。

あ、そういえば私は10月期のMicrosoft MVPに応募してたんですがそれはやっぱりダメだったよ。言うことを聞かないからね。これを見てるそこの君、以下略。来年の1月にretryしようかなー。それまでには記事増量、ってやること変わってないんじゃ結果は同じじゃないのというのががが。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive