Reactive Extensionsとスレッドのlock

ぱられるぱられる。もしパラレルにイベントが飛んできたら、どうする?

public class TestParallel
{
    public event Action<int> Log = _ => { }; // nullチェック面倒ぃので

    public void Raise()
    {
        // デュアルコア以上のマシンで試してね!
        Parallel.For(0, 10000000, x =>
        {
            Log(x);
        });
    }
}

class Program
{
    static void Main(string[] args)
    {
        var list = new List<int>();
        var tes = new TestParallel();

        // イベント登録して
        tes.Log += x => list.Add(x);

        // 実行
        tes.Raise();
    }
}

これは、十中八九、例外が出ます。list.Addはスレッドセーフじゃないので、まあそうだよね、と。では、Rxを使ってみるとどうなるでしょうか。

var list = new List<int>();
var tes = new TestParallel();

// イベント登録して
Observable.FromEvent<int>(h => tes.Log += h, h => tes.Log -= h)
    .Subscribe(list.Add);

// 実行
tes.Raise();

やはり変わりません。例外出ます。FromEventを中継しているだけですから……。さて、しかし一々Addの手前でlockするのは面倒だ、と、そこでSynchronizeメソッドが使えます。

Observable.FromEvent<int>(h => tes.Log += h, h => tes.Log -= h)
    .Synchronize()
    .Subscribe(list.Add);

// ようするにこんな感じになってる

var gate = new Object();
//....
lock(gate)
{
    OnNext();
}

これで、list.Addを問題なく動作させられます。Listとか適度にデリケートなので適当に注意してあげましょう。

Subjectの場合

さて、上のはイベントでしたが、ではSubjectの場合はどうなるでしょう。

public class TestParallel
{
    Subject<int> logMessenger = new Subject<int>();
    public IObservable<int> Log { get { return logMessenger.AsObservable(); } }

    public void Raise()
    {
        // デュアルコア以上のマシンで試してね!
        Parallel.For(0, 10000000, x =>
        {
            logMessenger.OnNext(x);
        });
    }
}

class Program
{
    static void Main(string[] args)
    {
        var list = new List<int>();
        var tes = new TestParallel();

        // イベント登録して
        tes.Log.Subscribe(list.Add);

        // 実行
        tes.Raise();
    }
}

たまーに例外起こらず処理できることもあるんですが、まあ大体は例外起こるんじゃないかと思います。初期のRxのSubjectは割とガチガチにlockされてたのですが、現在はパフォーマンスが優先されているため挙動が変更され、ゆるゆるです。回避策は同様にSynchronizeを足すことです。

tes.Log.Synchronize().Subscribe(list.Add);

これで問題なし。

余談

手元に残っていた大昔のRxを使って実行してみたら、死ぬほど遅かったり。確実に現在のものはパフォーマンス上がっていますねえ。あと、なんかもう最近面倒でeventだからってEventArgs使わなきゃならないなんて誰が言ったー、とActionばかり使うという手抜きをしてます。だってsenderいらないもん、大抵のばやい。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive