同期(風)コードと対比させたUnity+UniRxで非同期を扱う場合のパターン集

UniRxのGitHubのStar数が500行きました!

image

今のところGitHub上でのUnity + C#でスター順の検索だと、世界5位です。おおー。更に上を狙いたいところですね。最近はちょっと更新が滞っていますが、ネタはあるのでより完成度を高めたい。(滞った理由は、PhotonWireとか色々他のところに手を出していたため……)

さて、本題。イベント結合に使う際はあてはまりませんが、Rx(UniRx)を非同期(長さ1のIOservableシーケンス)として扱う場合、それなりに癖があります。とはいえ、基本的には同期(或いはyield return)で書いていた際と、1:1で対比できるパターン化した形で概ね対応できるので、そのためのチートシートを考えてみました。コード例はC# 5.0のasync/awaitで出しますが、同期コード or IEnumeratorと同じように思ってもらえればいいです。例えば

public void Sync()
{
    /* before action */
    Method();
    /* after action */
}

public IEnumerator IEnumerator()
{
    /* before action */
    yield return StartCoroutine(Method());
    /* after action */
}

public async Task Task()
{
    /* before action */
    await MethodAsync();
    /* after action */
}

みたいな感じです、awaitに馴染みのない人も、なんとなくイメージしながら眺めてみてもらえると嬉しいです。

非同期汚染

コード例の前に非同期汚染、或いは非同期の伝搬について。まぁ、あんまし汚染という言い方は好きじゃないのですが、基本的に非同期、つまりTaskでもFutureでもPromiseでもIObservableでも、は、下層から上層まで伝搬していきます。メソッドが非同期であるなら戻り値はIObservableであり、そのIObservableを呼ぶメソッドもまた自然と非同期でなければならないので、IObservableになる、と。何故非同期の連鎖でなければならないのか。消費(Subscribe)してしまうと、その瞬間Fire and Forgetになってしまい、戻りを待ったりキャンセルしたりなどの別の操作が行えなくなってしまうからです。別にFire and Forgetしたければ、呼び元がそれを選択(Subscribeして放置)すればいいわけで、呼ばれる側が決定することではない。

もちろん、最終的にはどこかの層で消費(Subscribe)しなければならないので、そこで伝搬は止まるのですけれど、それは、基本的には上層であればあるほどよいということですね。どこが上層やねんって話はあるかもしれませんが、ユーザーインタラクションに近かったり、MonoBehaviourのイベント層に近かったり、あたりがそうですかねー。あとは、ごく一部でしか使わないんだ!という確固たる思いがあれば、早い段階でSubscribeして伝搬を止めるのも策ではあります、その辺はケースバイケースで。

非同期の伝搬に都合の良いメソッドが現状のUniRxには足りてません。実は!というわけで、次期バージョンではForEachAsyncというものを足したいのですが、それまでは以下のものをコピペって代用してください。挙動的にはシーケンスを消費して長さ1のIObservable[Unit]を返すもので、元シーケンスが非同期(長さ1)ならDoやSelectと、概ね一緒です。

// 次期バージョンに入るので、それまでの代用ということで。
// 元シーケンスが非同期なら .Select(x => { /* action(); */ return Unit.Default; }) とほぼ同様
namespace UniRx
{
    public static class UniRxExtensions
    {
        public static IObservable<Unit> ForEachAsync<T>(this IObservable<T> source, Action<T> onNext)
        {
            return Observable.Create<Unit>(observer =>
            {
                return source.Subscribe(x =>
                {
                    try
                    {
                        onNext(x);
                    }
                    catch (Exception ex)
                    {
                        observer.OnError(ex);
                        return;
                    }
                }, observer.OnError, () =>
                {
                    observer.OnNext(Unit.Default);
                    observer.OnCompleted();
                });
            });
        }
    }
}

また、副作用(外の変数への代入など)に関しては、あまり気にしないほうが吉です。いや、Rxのパイプラインに押し込めたほうが美しくはあるんですが、それがオブジェクトであるなら、副作用かけてフィールド変数を変えたり、ReactivePropertyに結果を伝えたりとかは、あって然りかな、と。考える際には「もしこれが同期コードだったらどうなのか」を意識したほうがいいかもしれません、同期コードで自然なら、別にRxでそれを行っても、構わないのです。とはいえ、以下に紹介するコードは全部、副作用大前提みたいな説明なので、それはそれで若干の狂気でもありますが、その辺は慣れてきてからでよいかと。

戻り値のない場合

public async Task Demo1_TaskAsync()
{
    /* before action */
    var x = await Task.Factory.StartNew(() => 100);
    /* after action */
}

public IObservable<Unit> Demo1_IOAsync()
{
    /* before action */
    return Observable.Start(() => 100)
        .ForEachAsync(_ =>
        {
            /* after action */
        });
}

メソッドに戻り値がない場合は、awaitの位置にForEachAsyncで、その中にactionを書く形になります。RxにおいてはIObservable[Unit]を戻り値のないことの表明として使います。

内部に複数の非同期がある場合

public async Task Demo2_TaskAsync()
{
    /* before action */
    var x = await Task.Factory.StartNew(() => 100);
    /* after action 1 */
    var y = await Task.Factory.StartNew(() => 200);
    /* after action 2 */
}

public IObservable<Unit> Demo2_IO_1Async()
{
    /* before action */
    return Observable.Start(() => 100)
        .SelectMany(x =>
        {
            /* after action 1 */

            return Observable.Start(() => 200);
        })
        .ForEachAsync(y =>
        {
            /* after action 2 */
        });
}

awaitの位置にSelectManyを置くことで繋げることができます。最後の消費だけForEachAsyncで。

パイプライン中に複数の値を伝搬したい場合

public IObservable<Unit> Demo2_IO_2Async()
{
    /* before action */
    return Observable.Start(() => 100)
        .SelectMany(x =>
        {
            /* after action 1 */
            return Observable.Start(() => 200);
        }, (x, y) => new { x, y }) // transport argument to next chain
        .ForEachAsync(o =>
        {
            /* after action 2 */
            // { o.x, o,y } 
        });
}

public IObservable<Unit> Demo2_IO_2_2Async()
{
    /* before action */
    return Observable.Start(() => 100)
        .SelectMany(x =>
        {
            /* after action 1 */
            var z = SyncMethod();
            return Observable.Start(() => 200).Select(y => new { x, y, z });
        })
        .ForEachAsync(o =>
        {
            /* after action 2 */
            // { o.x, o,y, o.z } 
        });
}

同期コードでは、そのスコープ中の全ての値が使えるわけですが、Rxのメソッドチェーンでは次のパイプラインに送り込める値は一つしかありません。というわけで、匿名型(もしくはUniRx.Tuple)を使って、次のパイプラインへは値をまとめて上げる必要があります。SelectManyには第二引数があり、それにより前の値と次の値をまとめることができます。また、SelectMany内部で作った値を送り込みたい場合は、戻り値のところでSelectを使ってスコープ内でキャプチャして返してあげればいいでしょう。(匿名型、Tupleともにclassなので、気になる場合はstructの入れ物を用意してもいいかもしれない、何か箱を作って運搬しなきゃいけないのは残念ながら仕様です)

非同期が連鎖する場合

public IObservable<Unit> Demo2_IO_2_MoreChainAsync()
{
    /* before action */
    return Observable.Start(() => 100)
        .SelectMany(x =>
        {
            /* after action 1 */
            return Observable.Start(() => 200);
        }, (x, y) => new { x, y })
        .SelectMany(o =>
        {
            /* after action 2 */
            return Observable.Start(() => 300);
        }, (o, z) => new { o.x, o.y, z }) // re-construct self
        .ForEachAsync(o =>
        {
            /* after action 3 */
            // { o.x, o,y, o.z } 
        });
}

SelectManyの連打になります。また、伝搬する値は自分で分解して付け直してあげる必要があります、これは面倒くさいですね!この辺はクエリ構文を使った場合、Transparent Identifierという仕組みで自動的にコンパイラが行うのですが(An Internal of LINQ to Objectsの35P、Rxでクエリ構文は結構頻繁にクエリ構文の範疇を逸脱するのと、副作用をパイプライン途中に書けないためあまり使い勝手は良くないので、面倒くさいながら手作業再構築を薦めます。

戻り値を返す場合

public async Task<int> Demo3_TaskAsync()
{
    /* before action */
    var x = await Task.Factory.StartNew(() => 100);
    /* after action */
    return x; // return value
}

public IObservable<int> Demo3_IOAsync()
{
    /* before action */
    return Observable.Start(() => 100)
        .Select(x =>
        {
            /* after action */
            return x; // return value
        });
}

ForEachAsyncではなく、Selectを使っていきましょう。戻り値の型が同一で副作用だけ起こしたいならDoでも構わないのですが、まぁどっちでもいいです。また、awaitが複数になる場合は、SelectManyになります。そのうえでSelectManyのままreturnするか、最後に再びSelect(もしくはDo)を使うかどうかは、状況次第、かな。

例外をキャッチ

public async Task Demo4_TaskAsync()
{
    /* before action */
    try
    {
        var x = await Task.Factory.StartNew(() => 100);
    }
    catch (Exception ex)
    {
        /* onerror action */
        throw;
    }

    /* after action */
}

public IObservable<Unit> Demo4_IOAsync()
{
    /* before action */
    return Observable.Start(() => 100)
        .Catch((Exception ex) =>
        {
            /* onerror action */
            return Observable.Throw<int>(ex);
        })
        .ForEachAsync(x =>
        {
            /* after action */
        });
}

これはCatchで賄えます。なお、Catchメソッドを使う際は、Catch<T>で例外の型を指定するよりも、ラムダ式の引数側で例外の型を書いたほうが書きやすいです(そうしたほうが型推論の関係上、ソースシーケンスの型を書かなくて済むため)。Catchの戻り値では再スローをObservable.Throw、握りつぶしをObservable.Return/Emptyで表現可能です。

Finally

public async Task Demo5_TaskAsync()
{
    /* before action(1) */
    try
    {
        var x = await Task.Factory.StartNew(() => 100);
    }
    finally
    {
        /* finally action(2) */
    }

    /* after action(3) */
}

// not equivant try-finally
public IObservable<Unit> Demo5_IO_PseudoAsync()
{
    /* before action(1) */
    return Observable.Start(() => 100)
        .Finally(() =>
        {
            /* finally action(3) */
        })
        .ForEachAsync(x =>
        {
            /* after action(2) */
        });
}

public IObservable<Unit> Demo5_IO_CorrectLightweightButIsNotDryAsync()
{
    /* before action(1) */
    return Observable.Start(() => 100)
        .Do(_ => { /* finally action(2) */}, _ => {/* same finally action(2) */})
        .ForEachAsync(x =>
        {
            /* after action(3) */
        });
}

Finallyに関しては、実は同じに扱える表現がありません!RxのFinallyはパイプラインの終了時の実行なので、実行順序がベタtry-finallyで書いた時と異なるんですよねえ。いちおう、DoでOnNextとOnErrorのところに同じコードを書くことでそれっぽい表現は可能ではありますが……。

並列処理

public async Task ParallelAsync()
{
    var a = Task.Factory.StartNew(() => 100);
    var b = Task.Factory.StartNew(() => 200);
    var c = Task.Factory.StartNew(() => 300);
    
    var xs = await Task.WhenAll(a, b, c);
    /* after action */
}


public IObservable<Unit> ParallelIO()
{
    var a = Observable.Start(() => 100);
    var b = Observable.Start(() => 200);
    var c = Observable.Start(() => 300);
    
    return Observable.WhenAll(a, b, c)
        .ForEachAsync(xs =>
        {
            /* after action */
        });
}

並列処理は非同期固有の実行ですが、WhenAllでドバッとまとめるというのが基本方針。

タイムアウト

public async Task TimeoutAsync(TimeSpan timeout)
{
    var task = Task.Factory.StartNew(() => 100);    
    var delay = Task.Delay(timeout);
    if (await Task.WhenAny(task, delay) == delay)
    {
        /* timeout action */
        throw new TimeoutException();
    }
    /* after action */
}


public IObservable<Unit> TimeoutIO(TimeSpan timeout)
{
    return Observable.Start(() => 100)
        .Timeout(timeout)
        .Catch((TimeoutException ex) =>
        {
            /* timeout action */
            return Observable.Throw<int>(ex);
        })
        .ForEachAsync(x =>
        {
            /* after action */
        });
}

タイマウトも非同期固有の処理。async/awaitの場合、特有のイディオムがあります。UniRxの場合はTimeoutだけでOK。特に例外時に処理するものもないなら、Catchは不要です。

IEnumeratorに戻す

public IObservable<Unit> Demo6_IE()
{
    /* before action(1) */
    return Observable.FromCoroutine(() => Demo6_IECore());
}

IEnumerator Demo6_IECore()
{
    // 戻り値の不要な場合
    yield return Observable.Start(() => 100).StartAsCoroutine();
    
    int ret;
    yield return Observable.Start(() => 100).StartAsCoroutine(x => ret = x);
}

SelectManyの連打が辛い場合、ふつーのコルーチンに戻して、更にIObservableでラップするという手段も取れます。まあ、この辺は複雑さ度合いで自由に!

だったらもはや最初から全部コルーチンでええやん!Rxでメソッドチェーン複雑だし見た目だけならコルーチン最強にスッキリじゃん!というのは正しい。正しいんですが、例外処理・戻り値・合成可能性・並列処理・マルチスレッド、などといった要素が欠落してるので、コルーチンはコルーチンで苦しいところが多いというか実際のところシンプルなケース以外では相当苦しいので、基本的にはRxのほうが有利です。

async/awaitは必要?

みたとーり、必要です。どう考えても。さすがにSelectManyの連打を同期コードほどスッキリと言い張るのは無理があるでしょう。とはいえまぁ、書いて書けないこともないので、今あるツールの中でベストを尽くすのまた良きかな、とは思いますねー。というわけで良き非同期生活を!UniRxでイベントを扱う際のパターン集は、またそのうちにでも!

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive