R3 - C#用のReactive Extensionsの新しい現代的再実装

先日、新しいC#用のReactive Extensionsの実装としてR3を正式公開しました!R3はRx for .NETを第一世代、UniRxを第二世代とした場合の、第三世代のRxという意味で命名しています。Rxとしてのコア部分(ほぼdotnet/reactiveと同様)は.NET共通のライブラリとして提供し、各プラットフォーム特化のカスタムスケジューラーやオペレーターは別ライブラリに分けるという形により、全ての.NETプラットフォーム向けのコアライブラリと、各種フレームワーク Unity, Godot, Avalonia, WPF, WinForms, WinUI3, Stride, LogicLooper, MAUI, MonoGame 向けの拡張ライブラリを提供しています。

幾つかの破壊的変更を含むため、ドロップインリプレースメントではないですが、dotnet/reactiveやUniRxからの移行も現実的に可能な範囲に収めてあります。この辺は語彙や操作がLINQ的に共通化されているというRxの良いところで、そこのところは大きく変わりはありません。思ったよりも何も変わっていない、といったような印象すら抱けるかもしれませんが、そう思っていただければ、それはそれでR3の設計としては大成功ということになります。

なので基本的なところはRxですし、使えるところも変わりないです。よって、押さえておくべきことは、なぜ今R3という新たな実装が必要になったかということと、Rx for .NET, UniRxとの違いはどこかということです。(新規の人は何も考えず使ってください……!)

機能とか移行とかの話は、toRisouPさんにより既に優れた記事が上がっているので、今回は概念的なところを中心に紹介します……!

Rxの歴史と vs async/await

Rx使ってますか?という問いに、使ってません、と答える人も増えてきました。別にこれは.NETやUnityだけの話ではなく、JavaでもSwiftでもKotlinでも。明らかにプレゼンスが低下しています。なぜか?というと、それはもう簡単です。async/awaitが登場したから。.NETのReactive Extensionsが初登場したのは2009年。C# 3.0, .NET Framework 3.5の頃であり、対応プラットフォームもSilverlightやWindows Phoneといった、今はもう消滅したプラットフォームも並んでくるような時代。もちろん、async/await(初登場はC# 5.0, 2012年)も存在していません。まだTaskすら導入されていなかった頃です。余談ですがReactive Extensionsの"Extensions"は、先行して開発されていたParallel Extensions(Parallel LINQやTask Parallel Library, .NET Framework 4.0で追加された)から名前が取られたとされています。

Rxは、まず、言語サポートのない場合の非同期処理の決定版として、あらゆる言語に普及し一世を風靡しました。単機能なTaskやPromiseよりも、豊富なオペレーターを備えたRxのほうが使いやすいし遥かに強力!私も当時はTPLいらね、とRxに夢中になったものです。しかしasync/awaitが言語に追加されて以降の結果はご存じの通り。async/awaitこそが非同期処理の決定版として、これまたC#からあらゆる言語に普及し、非同期処理におけるスタンダードとなりました。(ちなみにF#こそが発祥だって言う人もいますが、国内海外問わず当時のF#コミュニティのC# async/awaitへの反発と難癖の数々はよーく覚えているので、あ、そうですか、ぐらいの感じです。awaitないしね)

async/awaitが普及したことにより、とりあえず非同期処理のためにRxを入れるという需要はなくなり、Rxの採用率は下がっていったのであった。UnityにおいてのRxのスタンダードであったUniRxの開発者である私も、別にそれに固執することはなく、むしろゲームエンジン(Unity)に特化したasync/awaitランタイムが必要であると素早く認知し、Unityにおいて必要な条件(C# 7.0)が揃ったタイミングで即座にUniTaskを開発し、今ではUniTaskは絶対に入れるけどUniRxは入れない、といった開発者も増えてきました。そしてそれは悪いことではなく、むしろ正しい感覚であると思います。

Rxの価値の再発見

そもそもRxって別に非同期処理のためだけのシステムではないですよね?LINQ to Everythingではあったけれど、むしろEverythingというのはノイズで、分離するものは分離したほうがいい、最適なものはそれを使ったほうがいい。Rxを非同期処理のために使うべきではないし、長さ1のObservableはTaskで表現したほうが、分かりやすさにおいてもパフォーマンスにおいても利点がある。そうなるとRxにはasync/awaitと統合されたAPIが必要で、それはObservableはモナドだからSelectManyにTaskを渡せることもできるだとか、そんなどうでもいいことではない。真剣にasync/awaitと共存するRxを考えてみると、手を加えなければならないAPIは多数ある。

単純にawaitできるだけでは現実のアプリケーション開発には少し足りない。そこで非同期/並列処理に関しては様々なライブラリが考案されてきました、RxだけではなくTPL Dataflowなど色々ありましたが、それらを好んで今から使おうとする人もいないでしょう。そして今は2024年、勝者は決まりました。言語サポートのIAsyncEnumerableSystem.Threading.Channelsがベストです。また、これらはバックプレッシャーの性質も内包しているため、RxJavaなどにあるバックプレッシャーに関するオペレーターは.NETには不要でしょう。もう少し具体的なI/Oに関する処理が必要ならSystem.IO.Pipelinesを選べば、最大のパフォーマンスを発揮できます。

非同期LINQはあってもいいけれど、実際の非同期ストリームのシナリオからするとLINQ to Objectsと違い利用頻度も少ないので、別に積極的に導入したいというほどの代物ではない(なお、これは私はUniTaskにUniTaskAsyncEnumerableとLINQを自分で実装して提供している上での発言です)。Rxの夢の一つとして分散クエリ(IQbservable)がありましたが、それも、現代での勝者はGraphQLになるでしょう。分散システムという点ではKubernetesが普及し、RPCとしてはgRPCがスタンダードとして君臨し、Orleans, Akka.NET, SignalR, MagicOnionといったような選択肢のバリエーションもあります。

今は様々なテクノロジーが覇権を争った2009年ではない。現代でService Fabricを選ぶ人などいないように、今からそこに乗り出して勝ち筋を見出すのは難しい。そうした分散処理に進むことはRxの未来ではない。と、私は考えています。Rxを生み出したのがCloud Programmability Teamであるからといって、Cloudで活用できるようにすることが原点で正しいなどということもないだろう。もちろん、未来は複数あってもいいので、私が示すRxの未来の選択肢の一つがR3だと思ってもらえればよいです。

ではRxの価値はどこにあるのか、というと、原点に立ち返ってインメモリのメッセージングをLINQで処理するLINQ to Eventsにあると考えます。特にクライアントサイド、UIに対する処理は、現代でもRxが評価されているポイントであり、Rx Likeな、しかしより言語に寄り添い最適化されているKotlin FlowSwift Combineといった選択肢が現役で存在しています。UIだけではなく、複雑で大量のイベントが飛び交うゲームアプリケーションにおいても、ゲームエンジン(Unity)で使われているUniRxの開発者として、非常に有益であることを実感しています。オブザーバーパターンやeventの有意義さは疑う余地のないところですし、そこでRxがbetter event、オブザーパーパターンの決定版として使えることもまた変わらないわけです。

R3での再構築

最初に、Rxとしてのインターフェイスを100%維持しながらレガシーAPIの削除や新APIの追加をすべきか、それとも根本から変更すべきかを悩みました。しかし(私が問題だと考えている)すべての問題を解決するには抜本的な変更が必要だし、Kotlin FlowやSwift Combineの成功事例もあるので、旧来のRxとの互換性に囚われず、.NET 8, C# 12という現代のC#環境に合わせて再構築された、完全に新しいRxであるべきという路線に決めました。

といっても、最終的にはインターフェイスにそこまで大きな違いはありません。

public abstract class Observable<T>
{
    public IDisposable Subscribe(Observer<T> observer);
}

public abstract class Observer<T> : IDisposable
{
    public void OnNext(T value);
    public void OnErrorResume(Exception error);
    public void OnCompleted(Result result); // Result is (Success | Failure)
}

パッと見だとOnErrorがOnErrorResumeになったことと、interfaceではなくてabstract classになったこと、ぐらいでしょうか。どうしても変更したかった点の一つがOnErrorで、パイプライン上で例外が起きると購読解除されるという挙動はRxにおけるbillion-dollar mistakeだと思っています。R3では例外はOnErrorResumeに流れて、購読解除されません。かわりにOnCompletedに、SuccessまたはFailureを表すResultが渡ってくるようになっていて、こちらでパイプラインの終了が表されています。

IObservable<T>/IObserver<T>の定義はIEnumerble<T>/IEnumerator<T>と密接に関わっていて、数学的双対であると称しているのですが、実用上不便なところがあり、その最たるものがOnErrorで停止することです。なぜ不便かというと、IEnumerable<T>のforeachの例外発生とIObservable<T>の例外発生では、ライフタイムが異なることに起因します。foreachの例外発生はそこでイテレーターの消化が終わり、必要があればtry-catchで処理して、大抵はリトライすることもないですが、ObservableのSubscribeは違います。イベントの購読の寿命は長く、例外発生でも停止しないで欲しいと思うことは不自然ではありません。通常のeventで例外が発生したとて停止することはないですが、Rxの場合はオペレーターチェーンの都合上、パイプライン中に例外が発生する可能性が常にあります(SelectやWhereすればFuncが例外を出す可能性がある)。イベントの代替、あるいは上位互換として考えると、例外で停止するほうが不自然になってしまいます。

そして、必要があればCatchしてRetryすればいい、というものではない!Rxにおいて停止したイベントを再購読するというのは非常に難しい!Observableにはeventと異なり、完了するという概念があります。完了したIObservableを購読すると即座にOnError | OnCompletedが呼ばれる、それにより自動的な再購読は、完了済みのシーケンスを再購読しにかかる危険性があります。もちろんそうなれば無限ループであり、それを判定し正しくハンドリングする術もない。Stack OverflowにはRx/Combine/FlowのUI購読で再購読するにはどうすればいいですか?のような質問が多数あり、そしてその回答は非常に複雑なコードの記述を要求していたりします。現実はRepeat/Retryだけで解決していない!

そこで、そもそも例外で停止しないように変更しました。OnErrorという命名のままでは従来の停止する動作と混同する可能性があるため、かわりにOnErrorResumeという名前に変えています。これで再購読に関する問題は全て解決します。更にこの変更には利点があり、停止する→停止しないの挙動変更は不可能ですが(Disposeチェーンが走ってしまうので状態を復元できないので全体の再購読以外に手段がない)、停止しない→停止するへの挙動変更は非常に簡単でパフォーマンスもよく実装できます。OnErrorResumeが来たらOnCompleted(Result.Failure)に変換するオペレーターを用意するだけですから(標準でOnErrorResumeAsFailureというオペレーターを追加してあります)。

Rx自体が複雑なコントラクトを持つ(OnErrorかOnCompletedはどちらか一つしか発行されない、など)わりに、インターフェースは実装上の保証がないので、従来のRxは正しく実装するのが難しいという問題がありました。SourceのSubscribeが遅延される場合は、先行して返却されるDisposableを正しくハンドリングする必要がある(SingleAssignmentDisposableを使う)などといったことも、正しく理解することは難しいでしょう。SubscribeのonNextで発生した例外はどこに行くのか、onErrorに行ってDisposeされるのか継続されるのか。その動作は特に規定されていないため実装次第で挙動はバラバラの場合もあります。R3ではasbtract class化することにより大部分のコントラクトを保証し、挙動の統一と、独自実装を容易にしました。

そしてabstract classにした最大の理由は、全ての購読を中央管理できるようにしたことです。全てのSubscribeは必ず基底クラスのSubscribe実装を通ります。これにより、購読のトラッキングが可能になりました。例えば以下のような形で表示できます。

image

これはUnity向けの拡張Windowですが、Godot用にも存在するほか、APIとして提供しているためログに出したり任意のタイミングで取得したり、独自の可視化を作ることも可能です

TaskにはParallel Debuggerがありますが(これもTaskが基底クラス側でs_asyncDebuggingEnabledの時に中央管理している)、Rxの購読の可視化は、それよりも遥かに重要でしょう。イベントの購読リークはつきもので、開発終盤に必死に探し回る羽目になりますが、R3ならもう不要です!圧倒的開発効率アップ!

R3ではこうした購読の管理、リーク防止については最重要視していて、Observable Trackerによる全ての購読の追跡の他に、概念として「全てのObservableは完了することができる」ようにしました。

Rxにおける購読の管理の基本はIDisposableをDisposeすることです。が、購読を解除する方法は実はそれだけではなく、OnError | OnCompletedが流れることでも解除されるようになっています(IObservableのコントラクトが保証しているわけではないですが実装上そうなっている、R3では必ずそうなるように基底クラス側で保証するようにした)。つまりシーケンスの上流(OnError | OnCompletedの発行)と下流(Dispose)、両面からハンドリングすることでリークをより確実に防ぐことができます。

対応として過剰に思うかもしれませんが、実際のアプリケーションを開発してきた経験からいうと、購読管理は過剰なぐらいがちょうどいい。そうした思想から、R3では、今までOnCompletedを発行する手段のなかったObservable.FromEventやObservable.Timer、EveryUpdateなども、OnCompletedを発行可能にしました。なお、発行方法はCancellationTokenを渡すことで、これもasync/await以降に多用(あるいは濫用)されるようになったCancellationTokenを活用する現代的なAPI設計です。また、こうした全てのObservableは完了する、という思想があるため、SubjectのDisposeも標準でOnCompletedを発行するように変更しました。

ISchedulerを再考する

Rxの時空を移動するマジックを実現する機構がISchedulerです。TimerやObserveOnに渡すことで、任意の場所(ThreadやDispatcher、PlayerLoopなど)・時間に値を移動させることができます。

public interface IScheduler
{
    DateTimeOffset Now { get; }

    IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action);
    IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action);
    IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action);
}

そして、実は破綻しています。Rxのソースコードを見たことがあるなら気づいているかもしれませんが、初期のうちから追加の別の定義が用意されています。例えばThreadPoolSchedulerは以下のようなインターフェイスを実装しています。

public interface ISchedulerLongRunning
{
    IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action);
}

public interface ISchedulerPeriodic
{
    IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action);
}

public interface IStopwatchProvider
{
    IStopwatch StartStopwatch();
}

public abstract partial class LocalScheduler : IScheduler, IStopwatchProvider, IServiceProvider
{
}

public sealed class ThreadPoolScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic
{
}

そして、以下のような呼び出しがなされています。

public static IStopwatch StartStopwatch(this IScheduler scheduler)
{
    var swp = scheduler.AsStopwatchProvider();
    if (swp != null)
    {
        return swp.StartStopwatch();
    }

    return new EmulatedStopwatch(scheduler);
}

private static IDisposable SchedulePeriodic_<TState>(IScheduler scheduler, TState state, TimeSpan period, Func<TState, TState> action)
{
    var periodic = scheduler.AsPeriodic();
    if (periodic != null)
    {
        return periodic.SchedulePeriodic(state, period, action);
    }

    var swp = scheduler.AsStopwatchProvider();
    if (swp != null)
    {
        var spr = new SchedulePeriodicStopwatch<TState>(scheduler, state, period, action, swp);
        return spr.Start();
    }
    else
    {
        var spr = new SchedulePeriodicRecursive<TState>(scheduler, state, period, action);
        return spr.Start();
    }
}

ようは生のISchedulerを使わないケースがそれなりにあります。なぜ使われないのか、というと、パフォーマンス上の問題で、IScheduler.Scheduleは単発の実行しか定義されていなくて、複数回の呼び出しは再帰的にScheduleを呼べばいいじゃんという発想なわけですが、都度IDisposableを生成するなどパフォーマンス的に問題がある。ので、それを回避するためにISchedulerPeriodicなどが用意されたのでした。

それなら、もうISchedulerではなく、実態をまともに反映されたものを使ったほうがいいんじゃないか?と思ったときに出てきたのが.NET 8で追加されたTimeProviderで、これならISchedulerが行っていたことをより効率的にできることを発見しました。

public abstract class TimeProvider
{
    // use these.
    public virtual ITimer CreateTimer(TimerCallback callback, object? state, TimeSpan dueTime, TimeSpan period);
    public virtual long GetTimestamp();
}

CreateTimerで生成されるITimerはISchedulerPeriodicで行える機能を十分持っているほか、ワンタイムの実行を繰り返す(Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action))のシナリオにおいても、ITimerを使いまわせるため、dotnet/reactiveのThreadPoolSchedulerよりも効率的です(ThreadPoolSchedulerは都度new Timer()している)。

現在時間の取得に関しては、DateTimeOffset IScheduler.NowのようにTimeProviderもDateTimeOffset TimeProvider.GetUtcNow()がありますが、使っているのはlong GetTimestampだけです。というのも、オペレーターの実装に必要なのはTicksだけなので、わざわざDateTimeOffsetに包むようなオーバーヘッドはないほうが良いので、生のTicksを扱って時間を計算します。

DateTimeOffset.UtcNowはOSのシステム時刻の変更の影響を受ける可能性もあるので、そういう点でもDateTimeOffsetを介さないGetTimestamp(標準ではStopwatch.GetTimestamp()からの高解像度タイマーが利用される)経由が良いでしょう。

ISchedulerのもう一つの問題として、同期的な処理を行うImmediateSchedulerCurrentSchedulerがいます。これらにTimerやDelayなど時間系の処理を任せるとThread.Sleepするという、使うべきではない非同期コードのエミュレーションをするので、つまり、同期的なSchedulerは存在が悪なのでないほうがいいでしょう。R3では完全に消し、TimeProviderを指定するということは必ず非同期的な呼び出しであるということを徹底しました。

ImmediateSchedulerCurrentSchedulerの問題はそれだけじゃなくて、そもそもパフォーマンスが致命的に悪いという問題があります。

image

Observable.Range(1, 10000).Subscribe() の結果

CurrentSchedulerはともかく、ImmediateSchedulerの結果が悪いのは直観に反するかもしれません。dotnet/reactiveのImmediateSchedulerは、Scheduleされるたびにnew AsyncLockScheduler()し、AsyncLockSchedulerが呼び出す基底クラスLocalSchedulerのコンストラクターがSystemClock.Registerし、それはlocknew WeakReference<LocalScheduler>(scheduler)し、HashSet.Addします。パフォーマンスが悪いのも当然です(ただし再帰的な呼び出し時には都度SingleAssignmentDisposableを生成するだけに抑えられてはいます、それでも多いですが)

Rangeなんてめったに使わないから大丈夫と思いきや、実は意外なところでImmediateSchedulerはちょくちょく使われています。代表的なのがMergeで、これはISchedulerが無指定の場合はImmediateSchedulerを使うため、頻繁な購読を繰り返す作りになっていると、かなりの呼び出す回数になる可能性があります。実際、dotnet/reactiveをサーバーアプリケーションで使用した際に、MergeとImmediateSchedulerが原因でサーバーのメモリ使用量のかなりを占めたことがありました。その時はカスタムの軽量なスケジューラーを作成し、直接指定することで徹底的にImmediateSchedulerを避けることで何とかしました。Next dotnet/reactiveがあるなら、ImmediateSchedulerのパフォーマンスの改善は真っ先に行う必要があります。

SystemClock.Registerをしている理由としては、DateTimeOffset.UtcNowとシステム時刻の変更の監視のためのようです。つまり、最初からDateTimeOffsetではなくlongを使えば、このような致命的なパフォーマンス低下も招きませんでした。これもまたISchedulerのインターフェイス定義の失敗理由の一つです。

ところで、TimeProviderの採用によって、Microsoft.Extensions.Time.Testing.FakeTimeProviderを使い、標準的な手法でユニットテストが容易になったことも嬉しいところでしょう。

FrameProvider

他のRxでは見かけないがUniRxで絶大な効果を発揮したものとして、フレームベースのオペレーター郡があります。一定フレーム後に実行するDelayFrameや次フレームで実行するNextFrame、毎フレーム発行するファクトリーであるEveryUpdateや、毎フレーム値を監視するEveryValueChangedなど、ゲームエンジンで利用するにあたって便利なオペレーターが揃っています。

そこで気づいたのが、時間とフレームは概念的には似たものであり、ゲームエンジンだけでなく、UI処理ではメッセージループやレンダリングループという形で、様々なフレームワークに存在している。そこで、R3では新しくTimerProviderと対になるFrameProviderという形でフレームベースの処理を抽象化しました。これによってUnityだけに提供されていたフレームベースのオペレーターが、C#が動作するあらゆるフレームワーク(WinForms, WPF, WinUI3, MAUI, Godot, Avalonia, Stride, etc...)で動作せることができるようになりました。

public abstract class FrameProvider
{
    public abstract long GetFrameCount();
    public abstract void Register(IFrameRunnerWorkItem callback);
}

public interface IFrameRunnerWorkItem
{
    // true, continue
    bool MoveNext(long frameCount);
}

R3ではTimeProviderを要求するオペレーターがある場合、全てに対となる***Frameオペレーターを実装しました。

  • Return <-> ReturnFrame
  • Yield <-> YieldFrame
  • Interval <-> IntervalFrame
  • Timer <-> TimerFrame
  • Chunk <-> ChunkFrame
  • Debounce <-> DebounceFrame
  • Delay <-> DelayFrame
  • DelaySubscription <-> DelaySubscriptionFrame
  • ObserveOn(TimeProvider) <-> ObserveOn(FrameProvider)
  • Replay <-> ReplayFrame
  • Skip <-> SkipFrame
  • SkipLast <-> SkipLastFrame
  • SubscribeOn(TimeProvider) <-> SubscribeOn(FrameProvider)
  • Take <-> TakeFrame
  • TakeLast <-> TakeLastFrame
  • ThrottleFirst <-> ThrottleFirstFrame
  • ThrottleFirstLast <-> ThrottleFirstLastFrame
  • ThrottleLast <-> ThrottleLastFrame
  • Timeout <-> TimeoutFrame

async/await Integration

まず、既存のRxにおいて良くない点である単一の値を返すObservableを徹底的に排除しました。これらはasync/awaitを使うべきで、単一の値を返したり、単一の値を期待して合成するようなオペレーターはバッドプラクティスに誘うノイズです。FirstはFirstAsyncになり、Task<T>を返します。AsyncSubjectはなくなり、TaskCompletionSourceを使ってください。

そのうえで、現在のC#コードは日常的に非同期のコードが返ってきます、が、基本的にはRxは同期コードしか受け取りません。うっかりすればFireAndForget状態になるし、SelectManyに混ぜるだけでは十分とはいえません。そこで、Where/Select/Subscribeに特殊なメソッド群を用意しました。

  • SelectAwait(this Observable<T> source, Func<T, CancellationToken, ValueTask<TResult>> selector, AwaitOperation awaitOperation = Sequential, ...)
  • WhereAwait(this Observable<T> source, Func<T, CancellationToken, ValueTask<Boolean>> predicate, AwaitOperation awaitOperation = Sequential, ...)
  • SubscribeAwait(this Observable<T> source, Func<T, CancellationToken, ValueTask> onNextAsync, AwaitOperation awaitOperation = Sequential, ...)
  • SubscribeAwait(this Observable<T> source, Func<T, CancellationToken, ValueTask> onNextAsync, Action<Result> onCompleted, AwaitOperation awaitOperation = Sequential, ...)
  • SubscribeAwait(this Observable<T> source, Func<T, CancellationToken, ValueTask> onNextAsync, Action<Exception> onErrorResume, Action<Result> onCompleted, AwaitOperation awaitOperation = Sequential, ...)
public enum AwaitOperation
{
    /// <summary>All values are queued, and the next value waits for the completion of the asynchronous method.</summary>
    Sequential,
    /// <summary>Drop new value when async operation is running.</summary>
    Drop,
    /// <summary>If the previous asynchronous method is running, it is cancelled and the next asynchronous method is executed.</summary>
    Switch,
    /// <summary>All values are sent immediately to the asynchronous method.</summary>
    Parallel,
    /// <summary>All values are sent immediately to the asynchronous method, but the results are queued and passed to the next operator in order.</summary>
    SequentialParallel,
    /// <summary>Send the first value and the last value while the asynchronous method is running.</summary>
    ThrottleFirstLast
}

SelectAwait, WhereAwait, SubscribeAwaitは非同期メソッドを受け取り、その非同期メソッドが実行されている間に届く値に対する処理のパターンを6パターン用意しました。Sequentialはいったんキューにためて非同期メソッドが完了したら新しい値を送ります。Dropは実行中に届いた値は全て捨てます、これはイベントハンドリングで多重Submit防止などに使えます。SwitchはObservable<Observable>.Switchと同様、Parallelは並列実行するものでObservable<Observable>.Mergeと同様、ですがわかりやすいでしょう。並列実行数も指定できます。SequentialParallelは並列実行しつつ、後続に流す値は届いた順序で保証します。ThrottleFirstLastは非同期メソッド実行中の最初の値と最後の値を送ります。

更に、以下の時間系のフィルタリングメソッドなども非同期メソッドを受け取るようになっています。

  • Debounce(this Observable<T> source, Func<T, CancellationToken, ValueTask> throttleDurationSelector, ...)
  • ThrottleFirst(this Observable<T> source, Func<T, CancellationToken, ValueTask> sampler, ...)
  • ThrottleLast(this Observable<T> source, Func<T, CancellationToken, ValueTask> sampler, ...)
  • ThrottleFirstLast(this Observable<T> source, Func<T, CancellationToken, ValueTask> sampler, ...)

また、Chunkも同様に非同期メソッドを受け取るほか、SkipUntilには非同期メソッドと、Task, CancellationTokenを受け取れるようになっています。

  • SkipUntil(this Observable<T> source, CancellationToken cancellationToken)
  • SkipUntil(this Observable<T> source, Task task)
  • SkipUntil(this Observable<T> source, Func<T, CancellationToken, ValueTask> asyncFunc, ...)
  • TakeUntil(this Observable<T> source, CancellationToken cancellationToken)
  • TakeUntil(this Observable<T> source, Task task)
  • TakeUntil(this Observable<T> source, Func<T, CancellationToken, ValueTask> asyncFunc, ...)
  • Chunk(this Observable source, Func<T, CancellationToken, ValueTask> asyncWindow, ...)

例えばChunkの非同期関数版を使えば、固定時間ではなくてランダム時間でチャンクを生成するといった複雑な処理を、自然に簡単に書けるようになります。

Observable.Interval(TimeSpan.FromSeconds(1))
    .Index()
    .Chunk(async (_, ct) =>
    {
        await Task.Delay(TimeSpan.FromSeconds(Random.Shared.Next(0, 5)), ct);
    })
    .Subscribe(xs =>
    {
        Console.WriteLine(string.Join(", ", xs));
    });

async/awaitは現代のC#に欠かせないコードですが、可能な限りスムーズにRxと統合されるように腐心しました。

Retry関連もasync/awaitを活用することで、よりベターなハンドリングができます。まず、以前のRxはパイプライン丸ごとのリトライしか出来ませんでしたが、async/awaitを受け入れられるR3なら、非同期メソッド実行単位でのリトライができます。

button.OnClickAsObservable()
    .SelectAwait(async (_, ct) =>
    {
        var retry = 0;
    AGAIN:
        try
        {            
            var req = await UnityWebRequest.Get("https://google.com/").SendWebRequest().WithCancellation(ct);
            return req.downloadHandler.text;
        }
        catch
        {
            if (retry++ < 3) goto AGAIN;
            throw;
        }
    }, AwaitOperation.Drop)
    .SubscribeToText(text);

Repeatもasync/awaitと組み合わせることで実装できます。この場合、Repeatの条件に関する複雑なハンドリングがRxだけで完結させるよりも、容易にできるでしょう。

while (!ct.IsCancellationRequested)
{
    await button.OnClickAsObservable()
        .Take(1)
        .ForEachAsync(_ =>
        {
            // do something
        });
}

手続き的なコードは決して悪いことではないですし、場合によりRxのオペレーターだけで完結させるよりも可読性が高くなります。コーディングにおいて優先すべきは可読性の高さ(とパフォーマンス)です。より良いコードのためにも、Rxとasync/awaitをうまく連携させていきましょう。

CreateやCreateFromなどで、非同期メソッドからObservableを生成することもできます。ここから生成することで、オペレーターを無理やりこねくり回すよりも簡潔に記述することが可能かもしれません。

  • Create(Func<Observer<T>, CancellationToken, ValueTask> subscribe, ...)
  • CreateFrom(Func<CancellationToken, IAsyncEnumerable<T>> factory)

名前付けのルール

R3では幾つかのメソッドの名前がdotnet/rectiveやUniRxから変更されています。例えば以下のものです。

  • Buffer -> Chunk
  • StartWith -> Prepend
  • Distinct(selector) -> DistinctBy
  • Throttle -> Debounce
  • Sample -> ThrottleLast

この変更の理由について説明しましょう。

まず、.NETにおいてLINQスタイルのライブラリを作成する場合に最優先すべき名前はLINQ to Objects(Enumerable)に実装されているメソッド名です。BufferがなぜChunkに変更されたかというと、.NET 6からEnumerable.Chunkが追加され、その機能がBufferと同じだからです。RxのほうがChunkの登場より遥か前なので、名前が違うのはどうにもならないのですが、何のしがらみもないのなら名称はLINQ to Objectsに合わせなければならない。よって、Chunk一択です。PrependやDistinctByも同様です。

ThrottleDebounceに変更されたことには抵抗があるかもしれません。これは、そもそも世の中のスタンダードはDebounceだからです。Rx系でDebounceThrottleという名前でやってるのはdotnet/reactiveだけです。世の中のRxの始祖はRxNetなのだから変えなきゃいけない謂われはない、と突っぱねることも正義ではあるんですが、もはや多勢に無勢の少数派なので、長いものに巻かれることもまた正しい。

Debounceに変えた理由はそれだけではなく、ThrottleFirst / ThrottleLastの存在もあります。これらはサンプリング期間の最初の値を採用する、または最後の値を採用する、というもので対になっています。で、(dotnet/reactiveの)Throttleは全然違う挙動なわけです、なのにThrottleという名前は混乱するでしょう。そももそもdotnet/reactiveにはThrottleFirstが存在せず、ThrottleLastに相当するSampleのみが存在するので大丈夫なのですが、ThrottleFirst/ThrottleLastを採用するなら、必然的に名前はDebounceにせざるを得ません。どちらかというとdotnet/reactiveの機能不足が悪い。

Sampleに関してはFirst/Lastという名前と機能の対称性からThrottleLastという名前に変更しました。dotnet/reactiveではFirstが存在しないのでSampleでも良かったのですが、ThrottleFirstを採用するなら、必然的に名前はThrottleLastになります。

Sampleの名前は残してThrottleLastのエイリアスにするという折衷案もあるのですが(RxJavaなどはそうなっています)、同じ機能の別名があるとユーザーは混乱します。世の中にはsamplethrottleLastの違いってなんですか?みたいな質問がそれなりにあります。ただでさえ複雑なRx、無用な混乱を避けるためにもエイリアスは絶対にやめるべき。SelectをMap、WhereをFilterにマッピングするみたいなエイリアスは愚かの極みです。

プラットフォーム向けデフォルトスケジューラー

dotnet/reactiveにおいてデフォルトのスケジューラーはほとんど固定です。正確にはIPlatformEnlightenmentProviderIConcurrencyAbstractionLayerというのものを適切に実装すれば、ある程度挙動を差し替えることも可能なのですが、無駄に複雑なうえに[EditorBrowsable(EditorBrowsableState.Never)]で隠されているしで、まともに使うことはほとんど想定されていないように見えます。

しかし、TimerやDelayなどはWPFであればDispatcherTimerで、UnityではPlayerLoop上のTimerで動くと、自動的にメインスレッドにディスパッチしてくれるので、ほとんどの場合でObserveOnが不要になるので便利ですしパフォーマンス上も有利に働きます。

R3ではシンプルにデフォルトのTimeProvider/FrameProviderを差し替えられるようにしました。

public static class ObservableSystem
{
    public static TimeProvider DefaultTimeProvider { get; set; } = TimeProvider.System;
    public static FrameProvider DefaultFrameProvider { get; set; } = new NotSupportedFrameProvider();
}

アプリケーション起動時に差し替えれば、そのアプリケーション上でベストなスケジューラーがデフォルト利用されます。

// 例えばWPFの場合はDispatcher系がセットされるので自動的にUIスレッドに戻ってくる
public static class WpfProviderInitializer
{
    public static void SetDefaultObservableSystem(Action<Exception> unhandledExceptionHandler)
    {
        ObservableSystem.RegisterUnhandledExceptionHandler(unhandledExceptionHandler);
        ObservableSystem.DefaultTimeProvider = new WpfDispatcherTimerProvider();
        ObservableSystem.DefaultFrameProvider = new WpfRenderingFrameProvider();
    }
}

// Unityの場合はPlayerLoopベースのものが使用されるのでThreadPoolを避けれる
public static class UnityProviderInitializer
{
    [RuntimeInitializeOnLoadMethod(RuntimeInitializeLoadType.AfterAssembliesLoaded)]
    public static void SetDefaultObservableSystem()
    {
        SetDefaultObservableSystem(static ex => UnityEngine.Debug.LogException(ex));
    }

    public static void SetDefaultObservableSystem(Action<Exception> unhandledExceptionHandler)
    {
        ObservableSystem.RegisterUnhandledExceptionHandler(unhandledExceptionHandler);
        ObservableSystem.DefaultTimeProvider = UnityTimeProvider.Update;
        ObservableSystem.DefaultFrameProvider = UnityFrameProvider.Update;
    }
}

dotnet/reactiveがデフォルトスケジューラーを変更できないのは、あまり、多種のプラットフォームをサポートしているとは言い難いでしょう。

internal static class SchedulerDefaults
{
    internal static IScheduler ConstantTimeOperations => ImmediateScheduler.Instance;
    internal static IScheduler TailRecursion => ImmediateScheduler.Instance;
    internal static IScheduler Iteration => CurrentThreadScheduler.Instance;
    internal static IScheduler TimeBasedOperations => DefaultScheduler.Instance;
    internal static IScheduler AsyncConversions => DefaultScheduler.Instance;
}

特にAOTのシナリオやWeb向けパブリッシュ(WASM)では、ThreadPoolが使えなくて絶対に避けたいという状況もあります。そこでSchedulerDefaults.TimeBasedOperationsが実質ThreadPoolSchedulerに固定されているのは厳しいと言わざるを得ません。

Pull IAsyncEnumerable vs Push Observable

IAsyncEnumerable(またはUniTaskのIUniTaskAsyncEnumerable)は、Pullベースの非同期シーケンス。RxはPushベースの非同期シーケンス。似てます。LINQ的なことができるのも似てます。どちらを使うべきかがケースバイケースなのは当然だとして、じゃあそのケースってのはなんなのか、いつどちらを使えばいいのか。という判断基準は欲しいところです。

基本的には裏にバッファー(キュー)があるものはPullベースが向いていると思うので、ネットワーク系のシナリオなんかはIAsyncEnumerableを使っていくといいんじゃないでしょーか。で、実際、System.IO.PipelinesSystem.Threading.Channelsによって自然と使う機会が出てきます。

Rxを使うべきところは、やはりイベント関連です。

どちらを使うべきかの判断の決め手は、源流のソースにとって自然な表現を選ぶべき、ということです。生のイベント、OnMoveであったりOnClickであったりなどは、完全にPushで、そこにバッファーはありません。ということは、Rxで扱うほうが自然です。間にキューを挟んでIAsyncEnumerableで扱うこともできますが、不自然ですよね。あるいはキューを介さないことにより意図的に値をDropするという表現をすることもできますが、やはりそれも不自然です。不自然ということはたいていはパフォーマンスも良くないし、分かりやすくもない。つまり、良くない。だから、イベント関連はRxで扱いましょう。R3ならasync/awaitとの統合によって、非同期処理中のバッファリングや値のドロップなどは明示的にオペレーターで指定することができます。それは、分かりやすく、パフォーマンスも良い。R3を使っていきましょう。

C#パフォーマンス勉強会

ところで4/27にC#パフォーマンス勉強会という勉強会が大阪で(大阪で!)開催されます。私は「R3のコードから見る実践LINQ実装最適化・コンカレントプログラミング実例」というタイトルで、R3の!実装の!パフォーマンス上の工夫を!徹底的に解説しようと思っているので、参加できる方はぜひぜひです。関西へは滅多に行かないので貴重な機会ということなのでよろしくお願いします!

まとめ

色々言いましたが、オリジナルのRx.NETの作者達には感謝しかありません。改めて、やはりRxのアイディアの素晴らしさや、各種オペレーターの整理された機能には目を見張るものがあります。幾つかの部分の実装は古くなってしまっていますが、実装クオリティも高いと思います。私自身も最初期から使ってきたし、熱狂してきました。そして、現在のメンテナーにも感謝します。常に変わっていく環境の中で、多く使われているライブラリを維持することはとても大変なことです。

しかし、だからこそ、Rxの価値を復活させたかった。そして、再構築するならば、できるのは私しかいないと思った。最初期からのRxの歴史と実装を知っていて、自分でRxそのものの実装(UniRx)を行い、それが世の中に広く使われることで多くのユースケースや問題点を知り、自分自身もゲームタイトルの実装で大規模に使われるRxのアプリケーション側にも関わり、Rxと対となるasync/awaitの独自ランタイム(UniTask)を実装し、それも世の中に広く使われていることで、この領域に関してのあらゆる知見がある。

上のほうでも言いましたが、未来は複数あってもいいので、私が示すRxの未来の一つがR3だと思ってもらえればよいです。dotnet/reactiveにもまた別の進化と未来がある。かもしれません。

そのうえでR3は置き換えられるだけのポテンシャルと、可能性を見せることができたと思っています。実装には自信あり、です。今回UniRxの実績があったからというのもあり、プレビュー公開時から多くのフィードバックがもらえたことは嬉しかったです(UniTask初公開時は、Unityのコンパイラを実験的コンパイラに差し替える必要があるとかいうエクストリーム仕様だったせいか、しばらくの間は誰も使ってくれなかったというか意義を分かってくれなかったので……)。

移行に関するシナリオも最大限配慮したつもりではあるので、是非使ってみてください……!

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(C#)
April 2011
|
July 2024

Twitter:@neuecc GitHub:neuecc

Archive