Rx Christmas Release 2010によるPublishの変化解説

メリークルシミマス。Happyなことに、Reactive Extensions for .NET (Rx)が更新されました。なんと、WP7に標準搭載されたことだし安定性・互換性はある程度保証されてきたよねー、とか思った側から完全に互換を崩壊させる素晴らしいBreaking Changesをかましてきました!常識では計り知れない大胆な所業。そこにしびれるあこがれれぅ。

さて、今回の変化はJoin/GroupJoin/Windowの新規搭載とPublish系の変更です。Joinについてはまた後日ということで、今回はPublishの変更について解説します。で、ですねえ、これはもう100%変わってます。昨日ちょうどPublishによる分配、とか書いたわけですが、出したコードは全部動かないです!素晴らしいタイミング!Ouch!

端的に言えばPublishは引数にISubjectを受け取るよう変化。Prune/ReplayはPublishに統合されたことにより消滅。何でかというと、元々、分配ソースにSubjectを使うのがPublish、BehaviorSubjectを使うのが初期値付きPublish、AsyncSubjectを使うのがPrune、ReplaySubjectを使うのがReplayでした。分配ソースを任意で受け取るようになったため、メソッドが分かれる意味がなくなったというわけですね。

ナンタラSubjectの解説は、まあまた後日にでも。簡単に言えばSubjectは普通のイベントと同じで素直な挙動、つまりOnNextしたら値が流れる。AsyncSubjectは非同期を模していて、OnNextされたら値を一つだけキャッシュし、OnCompletedされたらキャッシュした値を流す。以降はSubscribeされるたびにキャッシュした値を即座に流し、OnCompletedも発行。ReplaySubjectはOnNextされる度に値をキャッシュし、それを流す。Subscribeされるとキャッシュした値を全て即座に流す。BehaviorSubjectはOnNextされる度に最新の値一つのみをキャッシュし、それを流す。Subscribeされるとキャッシュした値を即座に流す。

イマイチ分かりづらいですね:) というわけで本当に詳しいことは後日で。AsyncSubjectだけは、今までにも、そういう挙動であることの意味とかしつこく書いてきましたが。

そして、IConnectableObservableが戻り値となるものはMulticastというメソッド名になりました。引数は勿論、ISubjectを受け取るという形に。では、昨日のコードで例を。新旧比較ということで。

Observable.Range(0, 10)
    .Select(_ => int.Parse(Console.ReadLine()))
    .Publish(new Subject<int>(), xs => xs.Min().Zip(xs.Max(), (min, max) => new { min, max }))
    .Subscribe(Console.WriteLine);

Publishの第一引数にSubjectを突っ込みました。第二引数にはそれで分配されたIObservableが渡ってくるという塩梅です。

var input = Observable.Range(0, 10)
    .Select(_ => int.Parse(Console.ReadLine()))
    .Multicast(new Subject<int>());

input.Min().Zip(input.Max(), (min, max) => new { min, max })
    .Subscribe(Console.WriteLine);

input.Connect();

こちらがMulticastです。まあ、普通にISubjectを受け取るようになったというだけで、今までのPublishでIConnectableObservableが返ってくるのと同じです。さて、ISubjectを受け取るということは、外部のISubjectを渡してもOKです。新しくこんな分配が可能になりました。

var input = new ReplaySubject<int>();

Observable.Range(0, 10)
    .Select(_ => int.Parse(Console.ReadLine()))
    .Publish(input) // 流れてくる値をReplaySubjectに保存する
    .Max()
    .Subscribe(i => Console.WriteLine("Max:" + i));

var inputArray = input.ToEnumerable().ToArray(); // 入力値を配列に変換

といっても、この例は全く意味がなくて、Max()で止めておいて、その戻り値をinputとして受ければいいだけの話なのですが。上手い利用例が浮かばなかったのです!まあ、色々と応用しどころというのは生まれてくるのではないかと思います。

まとめ

この破壊的変更ですが、私としては好意的に捉えたいです。如何せんPruneというメソッド名は意味不明でしたし、オーバーロードが8つもあるという状況も良くなかった。今回整理されたことで、使いやすくなったと思います。が、しかし、WP7版との互換性が切れてしまったのは相当痛い。今までusingをプリプロセッサディレクティブで#if WINDOWS_PHONE using Microsoft.Phone.Reactive という感じに切り替えてWPF/SL/WP7で互換を取っていたのですが、ここまで派手に互換性なくなるとなあ。

ちなみにWP7標準搭載"ではない"DevLabs版のRx for WP7というのも用意されているので、そちらを使えばいいわけなのですが、それはそれでどうかなー、どうかなー、困った話で。

それと、この変更はまだ追随されていませんが、そのうち System.Interactive(Ix.NET/EnumerableEx) や RxJS にも派生してくるような気がするので、再びAPI安定していない状態に戻った感がありますねー。要チェックで要注意で。Rxチームは大変なクリスマスプレゼントを贈ってきました。そして、Rxが.NET 4 SP1に入るの?入らないの?的な希望観測もあったわけですが、私個人の印象としては、SP1入りな可能性はなくなったな、という気がしてます。まだまだ作り替える気満々だもの、これ。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive