Immutable CollectionsとSubject(Rx)の高速化について

最近はUniRxというUnity向けのReactive Extensionsの実装を書いているので、そこにImmutableなCollectionのちょーどよく分かりやすい使い道の実例があるので紹介しようかと思います。Rx自体はImmutable Collections使ってるわけではありませんが、同様の(簡易的)実装を内部で持っています。UniRxも同様に簡易実装を中で持つ形です。

Immutable Collectionsを知らにゃい?詳しくはNET Framework Blog - Immutable collections ready for prime timeを。または、以前に私がセッションで発表した資料もありますので、それも見てください。neue cc - .NETのコレクション概要とImmutable Collectionsについて。1.0リリースからもベータ版のリリースは続いていて、今回はそのベータのほうを使います(ダウンロードはNuGetでプリリリースのものを有効にするだけです)。何故かと言うと、今回使うImmutableArrayはベータのほうにしか入っていないからです。

素朴なSubject

最も素朴なSubjectを作ってみましょう。SubjectはEventのRx的な表現で+=とInvokeが出来るもの、とでも思ってもらえれば。

public class MySubject<T> : IObservable<T>, IObserver<T>
{
    List<IObserver<T>> observers = new List<IObserver<T>>();

    // Subscribeするとリストに貯めて
    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return null; // 本来は戻り値をDisposeするとRemoveだけど省略
    }

    // OnNextで配信
    public void OnNext(T value)
    {
        foreach (var item in observers)
        {
            item.OnNext(value);
        }
    }

    // OnErrorとOnCompletedは中略
}

こんなもんですね、簡単簡単。実際使う場合は

// とりあえずこういうの用意しとかないとメンドーなので。
// Rxを参照してるならSubescribe(x => { })でいいよ!
public class ActionObserver<T> : IObserver<T>
{
    readonly Action<T> onNext;

    public ActionObserver(Action<T> onNext)
    {
        this.onNext = onNext;
    }

    public void OnNext(T value)
    {
        onNext(value);
    }

    // OnErrorとOnCompletedは中略
}

// で、こんなかんぢ
var subject = new MySubject<int>();

subject.Subscribe(new ActionObserver<int>(x => Console.WriteLine(x)));
subject.Subscribe(new ActionObserver<int>(x => Console.WriteLine(x * 2)));

subject.OnNext(500); // 500, 1000

概ね見たまんまな単純な話ですねー、さて、この実装は素朴すぎるので簡単に死にます。マルチスレッドで、とかそういうことじゃなく、例えば……

// 呼ばれるとイベント登録しに走るような場合
subject.Subscribe(new ActionObserver<int>(x => subject.Subscribe(new ActionObserver<int>(_ => Console.WriteLine(x)))));

//ハンドルされていない例外: System.InvalidOperationException: コレクションが変更されました。列挙操作は実行されない可能性があります。
subject.OnNext(10000);

foreachの最中にList本体にAddやRemoveといった操作は許可されていないのですねー。そんなのしねーよ、と突っぱねることはRxの使い方の場合は実際できないので、対処が必要です。一番簡単なのはまるっとコピーすること。

// MySubject<T>.OnNext
public void OnNext(T value)
{
    foreach (var item in observers.ToArray()) // 列挙はコピー
    {
        item.OnNext(value);
    }
}

こういう対処はLINQ to XMLのドキュメント宣言型コードと命令型コードの混在のバグ (LINQ to XML)でも薦められている、特別でもない一般的なテクニックということで、場合によっては普通に使っても構わない話だと思います。スレッドセーフにするのもlock仕込むだけ。

public IDisposable Subscribe(IObserver<T> observer)
{
    lock (observers)
    {
        observers.Add(observer);
    }
    return null;
}

public void OnNext(T value)
{
    IObserver<T>[] array;
    lock (observers)
    {
        array = observers.ToArray();
    }
    foreach (var item in array)
    {
        item.OnNext(value);
    }
}

高速化する

素朴な実装の問題は、まぁパフォーマンス。コピーだから一概に悪いとは言わなくて、場合によっては全然普通に使って構わないというのは頭に入れて欲しいのですけれど、さすがにOnNextのような、イベントが叩かれるような、頻度の高いもので毎回コピーが走るのは些か厳しい。じゃあどうしよう?そうだConcurrent Collectionだ!ふむ……。でもConcurrentQueueとかだと(今回省いてますが)Removeするのがむつかしい。ConcurrentDictionaryで代替だ!でも列挙の具合が不透明(並列コレクションの列挙の挙動は結構色々なのでそれなりに注意が必要です)、パフォーマンス的にもただのforeachよりは劣るよねえ、せっかくやるならエクストリームな性能を追い求めたい気もする。

と、そこで出てくるのが(?)Immutable Collections、の、ImmutableArray。

注意しなきゃいけないのは別にImmutable Collections使ったからって必ずしも早いとかってわけじゃないです。むしろ多くの場合でImmutable Collectionsは不適でしょう。コレクションには特性があって、それにうまく合致しなければむしろ遅いです。今回のシチュエーションではImmutableArrayが割と最適にハマります(同じAPIを持ったリスト的なものにImmutableListがありますが、今回だとArrayのほうが良い)。とりあえず見てみましょう、か。

class MySubject<T> : IObservable<T>, IObserver<T>
{
    ImmutableArray<IObserver<T>> observers = ImmutableArray.Create<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        // スレッドセーフな入れ替え
        while (true)
        {
            var oldCollection = observers;
            var newCollection = oldCollection.Add(observer);
            var comparedCollection = ImmutableInterlocked.InterlockedCompareExchange(ref observers, newCollection, oldCollection);

            if (comparedCollection == oldCollection) return null; // 変更対象がAddしている間に変わってなければ成功
        };
    }

    public void OnNext(T value)
    {
        // 普通にぐるぐる回しても安全
        foreach (var item in observers)
        {
            item.OnNext(value);
        }
    }

    // OnErrorとOnCompletedは中略
}

Listの宣言をImmutableArrayに変えて、あとは、フィールドの代入が全然変わってる!そう、ImmutableArrayの差し替えはちょっと面倒くさいのです。所謂CAS(Compare And Swap)という奴で、「大抵の場合は衝突しないけど原理的にたまに衝突する」という場合のために、グルグル回って比較して置き換えるという手段を取ります。ImmutableCollectionsにはそういった処理のためのヘルパーメソッドがImmutableInterlockedクラスに幾つか用意されています。ここではImmutableArrayで使えるImmutableInterlocked.InterlockedCompareExchangeを使いました。

特性

ImmutableArrayの中身は、配列です。Addは中で内部の配列をまるっとコピーして、新しい配列を作っています。foreachはその内部の配列に対して列挙かけるだけなので、普通の配列を回すのと性能はまるっきり変わらない。なので、Addのコストは非常に高いけれど、他は通常の配列と変わらないぐらい高速というのが特性です。

なんでSubjectの実装にImmutableArrayが適切かというと、「追加や削除よりも圧倒的に多く列挙が呼ばれる」からですね。そもそも普通にOnNext書けば毎回コピーが走るので、だったら追加の時のコピー一発で済ませられるなら遥かに高効率と思われるのではないでしょーか。

これにより、イベント的な使用でのSubjectのパフォーマンスは、ノーロック・ノーコピーで、配列とほぼ同等の性能が出ます。完璧!

まとめ

Immutable Collectionsは、まぁ、実際のとこガチッと使えるシーンがはまるケースはぶっちけあんまないと思います!コレクションとしての重要度は 普通のジェネリックコレクション>コンカレントコレクション>超えられない壁>イミュータブルコレクション でしょうし、使うコレクションを探す場合も、そこから順番で考えたほうが良いでしょふ。neue cc - .NETのコレクション概要とImmutable Collectionsについてでも書いたのですけれど、別にイミューラブルコレクション=速い、というのは大間違いです。むしろかなりピーキーで、性能特性をしっかり考えないと全く使いこなせません。

それでも今回のように使えるかもしれない!?ような局面というもの自体は存在するので、覚えておいて損はないと思います。次の.NET Frameworkに標準で入るのかどうかは今のところ分かりませんが、多分入るんじゃないかなー、Roslynで使いたいようだしー、って感じなので先取りしちゃりましょう!

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive