Reactive Extensions v1.1.11011.11リリースに見る.NET 4.5からの非同期処理

Reactive Extensionsv1.1.11011 (Experimental Release)がリリースされました。リリース対象はExperimental(実験)版のみです。Stable(安定)版のほうは変更ありません。別件で少しコメントで質問したところ、近いうちにStable版の更新もあるかも、とのことでしたので、Stableはそちらを待ちましょう。リリース内容の詳細な解説はフォーラムにあります

今回の大きな追加は.NET Framework 4.5 Developer PreviewWinRTへの対応です。というわけで、WinRT関連では、WinRT用のスケジューラであったりイベントであったりへの対応とまぁまぁ想像つく普通のもの。あと非同期処理を他言語と結びつけるIAsyncOperationへの書き出し、などなどもサポートされるようですね。

そして.NET 4.5周りでは、C#5.0のAsyncサポート・クラスライブラリがTask中心に書き換わることが念頭に置かれ、大規模に変更が入っています。今後のRxの方針がよく見えますので、Experimentalではありますが注意深く観察してみる必要がありそうです。というわけで、しっかり紹介します。なお、以下の話は.NET 4.5のRxの話なので、.NET 4.0以前の場合では直接は関係なく、Obsoleteにもなっていません。が、将来フレームワークのバージョン上げたらObsolete祭りでモニョるのは覚悟が必要かしらん。もう一つ注意としては、あくまでExperimentalなので、将来的にもこのままかどうかは保証されません。現時点での話です。

FromAsyncPatternがObsolete

はい、Obsoleteです。理由としては、.NET 4.5では多くのメソッドがBegin-Endパターンの代わりにTaskを返すXxxAsyncメソッドを持っています。そしてTaskとIObservableは相互に変換可能だから、Rxで扱いたいならXxxAsync().ToObservableすればいいでしょ、ということでした。Begin-EndパターンなのにXxxAsyncを持っていないメソッドにはどうするんだ!という場合は、TaskFactory.FromAsyncがあるので、それ使えばいい、とのこと。まあ、それはレアケースなので滅多にないかな。

毎回ToObservableなんて面倒くさい、という人のためにSelectManyに限っては、Taskを受け取るオーバーロードが用意されているので、ToObservableは不要です。内部では予想つく通り、ただ単にToObservableしているだけですね。その他の合成系メソッド(MergeやSwitch)などは残念ながらというか当然というか、ToObservableしてください。

IObservableはAwaitable

IObservable<T>も非同期を扱うものなので、awaitできます。正確に言えばGetAwaiterが定義された、といったところでしょうか。

var req = WebRequest.Create("http://google.com/");
var response = await Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse)();

基本的にRxの非同期はFromAsyncPatternを初めとして長さが1のものを扱っていますが、複数の値が流れる場合はどうなるかというと、挙動は「最後の値」です。正確に言えばAsyncSubjectが利用されているので、OnCompletedの直前のOnNextの値。ではEmpty(OnCompletedのみ)やNever(何もなし)ではどうなるのか、というと……

// 10(最後の値)
var a = await Observable.Range(1, 10);
            
// InvalidOperationException(シーケンスに値がない)
var b = await Observable.Empty<int>();
            
// ある意味フリーズ、ここで永遠に止まる
var c = await Observable.Never<int>();

となります。これだけ見るとNeverって使い道がイミフですが、何かとMergeする必要があるときに、マージ対象がないときはNeverを渡すなどなど、ライブラリに近い部分では結構使う場所あります。私の書いているReactivePropertyというライブラリでもそうして利用しています。

FirstなどがObsolete、かわりにFirstAsyncなどとWaitが追加

え?という感じですがObsoleteです。対象はFirst,Last,Single、それとForEachも。FirstOrDefaultなど、XxxDefault系も同様です。つまり同期的にブロックするタイプのものが全てObsolete行きになりました。代わりにFirstAsyncなどXxxAsyncが用意されています。それとawaitを組み合わせてください。

var source = Observable.Return("async?");

var value0 = await source; // LastAsyncと挙動は同じ
var value1 = await source.FirstAsync();
var value2 = await source.LastAsync();

長さ1と分かっている状況なら、何もなくそのままawaitでも良いのではかしらん。LastAsyncがそれに相当しますね。さて、しかしawaitのお陰で同期的「のように」書けるには違いないけれど、FirstやLastなどはブロックして「同期」な挙動を取っていたわけなので、単純にObsolete行きにされたら困ってしまいます。そこで、同期的に待機して最後の値を取り出すWaitメソッドが用意されました(これは.NET 4.0やSilverlightなどでも使える、新たに追加されたメソッドです)。

var source = Observable.Return("async?");

var value0 = source.Wait();
var value1 = source.FirstAsync().Wait();
var value2 = source.LastAsync().Wait();

ちなみにWaitの中身はLastです。Lastという名前から離して、同期的に待機して値を取り出す、と明示させたのですね。それはいいと思います。Waitはいい。Waitはいいんですが、FirstをObsoleteにしてFirstAsyncを追加するのは、正直気にいりません。私は反対です。

ターゲットフレームワーク間でコードが共有出来なくなる、IQbservableプロバイダに影響が出る、そもそも標準クエリ演算子から離れるのはどうよ。などなど。だいたい、AllやAny、Maxなどは長さ1のIObservable<T>を返すようになっていました。Firstなどだけです、同期的に待機して値を取り出す、という別の意味が与えられていたのは。だから、ここはFirstはObsoleteにせず、FirstAsyncの挙動、つまりIObservable<T>を返すように変更すればいいのです。同期待ちについては、Waitが搭載されたので心配無用です。これで、全ての挙動に統一が取れる。

唯一問題点を挙げれば、本当に本当に「破壊的変更」になるんですよね。それも、Stableとか銘打ったものへの影響も出る。メソッド名同じで戻り値が変わる。そういう変更を許せるものか。私は、許してしまってもいいと思うのですけれど。Firstを廃止してFirstAsyncを追加、などという歪な形を将来に残すよりかは、ずっといい。

なので、Forumにもそうコメント入れましたところ返答貰えました。「Stableリリースが存在する以上、XxxAsyncのままでいるしかない。これが不幸なことは同意しますけれど、暫くはこのままでいるしかない。」とのことでした。というわけで、Stableと銘打つのが早まったな…… としか言いようがなく。あの段階でここまで読めなかったのはしょうがないところ、と思いつつ、やはり手痛いミスかなあ。うーん、将来に渡っての完璧なAPIを作り上げるというのは実に難しい。

まあ、Firstを多用する(といっても単体テストや動作確認時ぐらいですけど)のは、値を一つ取り出したい、ということなので、await sourceかsource.Wait() で済む。FirstAsyncやLastAsyncを直接使うことは恐らく少なくて、ならば実際上の問題というのはそこまでないかもしれません。

長さ1の非同期処理の戻り値はTaskを選ぶべきか、Rxを選ぶべきか

メソッドを作るときの非同期処理の戻り値。これは、Taskを選ぶべきです。それは.NET標準と合わせるべきという理由からもそうですし、この.NET 4.5向けのRxの指針からしてそうなっています。長さ1のIObservableで非同期を表現する、というのは特殊だったと言わざるを得ないので、メソッドを作るとき、非同期処理の戻り値はTaskにしたほうが間違いなく良いでしょう。

ただ、アプリケーショに全体でRxによる合成を中心に置く場合は、ToObservableが面倒くさい、というだけじゃなく逆にオーバーヘッドになる可能性もあるので、IObservable中心にしたほうが良いでしょう。この辺は一概には言えずケースバイケースでしょうか。どちらにせよToObservable<->ToTaskで相互変換が可能なわけなので、あまりガチガチに捉える必要もないですけれど。

あと、あくまで.NET 4.5の話でasync/awaitが入るからTaskのほうが良いと言ってるのであって、.NET 4.0以前ならまた違う話です。というかその場合だとRx一択です。

ねぇ、Rxってもう要らない子?

時代の徒花でしたね、短い命だった……。

って、ちょっと待ったー。それはYESでもあり、NOでもあります。YESなのは、単純な形での非同期処理ならば、遥かに楽になりますし、それに何よりもRxを通すよりもパフォーマンスは良いと思われるので、むしろasync/awaitを使うべきです。具体的には、SelectManyしてSubscribeするだけ、あとCatchで少し例外処理、みたいなコードなら、もう全面的にRxさようならでいいでしょう。

でも往々にしてそういう処理だけじゃないよね?以前にも紹介しましたがSwitch(新しい処理が入ったら以前の非同期処理はキャンセルして新しい処理のみを後続に流す)などを手書きせず演算子一つにパッケージ化できることや、全体的にTaskのメソッド群よりも合成や待ち合わせが容易に記述できる、などなど。そして、「複数の戻り値のある非同期処理、例えばStreamのBeginReadは細切れにbyte[]が得られますが、それを複数回分の非同期処理をまとめてシーケンスとして、IObservable<byte[]>としてまとめることは現状ではRxしかできません。

// 複数回のBeginReadをRx+Asyncで一つにまとめる例
static IObservable<byte[]> ReadMultipleAsync(Stream stream, int bufferSize)
{
    return Observable.Create<byte[]>(async observer =>
    {
        try
        {
            while (true)
            {
                var buffer = new byte[bufferSize];
                var readCount = await stream.ReadAsync(buffer, 0, bufferSize);

                if (readCount == 0) break;
                if (readCount != bufferSize)
                {
                    var newBuffer = new byte[readCount];
                    Array.Copy(buffer, newBuffer, readCount);
                    buffer = newBuffer;
                }

                observer.OnNext(buffer); // yield returnのノリで書く
            }
            observer.OnCompleted(); // 完了合図と
        }
        catch (Exception ex)
        {
            observer.OnError(ex); // 例外は自前で明示的に
        }
    });
}

ExperimentalリリースではObservable.Createがasync/await対応しているので、擬似的なyield returnとして、非同期での列挙をそこそこ簡単に記述することができます。OnErrorとOnCompletedは自前で管理する必要がありますけれど。

なので、メソッド単体で分けた場合の戻り値は「長さ1」ならTask、複数ならIObservable。それらメソッドを使って非同期処理を組み上げる時は、単純ならawaitのみ、複雑ならRx。というのが使い分けの指針です。とはいえ、使い分けっていうのは幻想に近くて、実際はどっちか一つになりがちだとは思っています。そして、それならTask中心になるでしょうねえ、とも。非同期における大抵のシチュエーションでRxがサヨウナラ気味になるのはしかたのない話です。ぶっちゃけSelectManyしてSubscribeがほとんどだし、それ以外のことだって、同期的のように書けるのなら、いくらでもやりようはありますから。

まあ、未来を待たなくても今使える解としてなら十分ですし、それ自体がawait可能なので、今書いたコードは将来に渡っても無駄にはなりません。FromAsyncPatternがObsoleteというのは、まあ単純に.NET4.5本来のXxxAsyncに置き換えればいいというだけなので無駄になる、とは言わないでしょう。

けれど、それだけじゃあ、すごく後ろ向きで、寂しいよね。Rx自体の持つ力というのは、別に非同期に限らない。ただの幾つもある側面のうちの一つにすぎない。そこで出した私の答えがReactiveProperty : WPF/SL/WP7のためのRxとMVVMを繋ぐ拡張ライブラリです。ReactivePropertyはC#5.0によってプレゼンスが低下するReactive Extensionsに新たな価値をもたらしたいという危機感から作ったものだったりします(後付け、じゃなくてこれは本当の話です、次の一手を指すならRx自体に注目の集まっている今しかない、とも)

ReactivePropertyを通して見ると、非同期だけではない、Rxの持つポテンシャルがよく分かるのではないでしょうか?Rxの真の強みはイベント単独や非同期単独ではなくて、それらが、ただの配列も含めて、統一的に扱える、だから全て一本のストリームになって合成処理が自由自在。というOrchestrateな部分にある。ReactivePropertyはそれを全面的に押し出してます(半強制的に全てが一本に繋がるようになってる)

Rxは、この先も力強く存在し続けるので、学習する価値は間違いなくありますよ!

まとめ

相変わらずフリーダムな変更が続いているRxですが、あくまでExperimental版の話です。Stableでは、こんなバカバカと変わっていくことはないので、安心して使えばいいです。あと、Experimentalなのでまだまだ変動の余地はある、と思いますので、意見あればForumで直接言っておくとよさそう。私も、何やらかんやらと意見言っておきました(昔は書くのにビビッてたのに、今は平然と書いててアレです、慣れですね、ようするに)。

ところでRxは.NET 4.5に標準搭載されるのか否か、ですが、なんかまだまだ全然色々と変更や模索する気満々なようなので、この様子だと、仮にそういう話があったとしても間に合わなさそうという点で、標準搭載はなさそうですねー。ほぼ完成してるのに標準入りしない、とかだと悲しいんですが、そういう形で標準搭載見送り、ならばむしろ喜ばしい話なので、いいかなー、と思います。

それにしてもRx本は出すタイミング難しいですねえ。今回の変更は非同期周りの話がガラッと変わってきちゃうわけなので。また延期かしらねえ。さすがにそれはないか。ところで私もLINQ + Rx本をオライリーから出したいです(←ただたんに表紙をデンキウナギ(Rxのロゴはピンクのデンキウナギ)にしてウナギ本と呼ばれたいという一点だけの話なので間に受けないでください)

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(C#)
April 2011
|
July 2024

Twitter:@neuecc GitHub:neuecc

Archive