Reactive Extensionsとスレッドのlock
- 2011-11-30
ぱられるぱられる。もしパラレルにイベントが飛んできたら、どうする?
public class TestParallel
{
public event Action<int> Log = _ => { }; // nullチェック面倒ぃので
public void Raise()
{
// デュアルコア以上のマシンで試してね!
Parallel.For(0, 10000000, x =>
{
Log(x);
});
}
}
class Program
{
static void Main(string[] args)
{
var list = new List<int>();
var tes = new TestParallel();
// イベント登録して
tes.Log += x => list.Add(x);
// 実行
tes.Raise();
}
}
これは、十中八九、例外が出ます。list.Addはスレッドセーフじゃないので、まあそうだよね、と。では、Rxを使ってみるとどうなるでしょうか。
var list = new List<int>();
var tes = new TestParallel();
// イベント登録して
Observable.FromEvent<int>(h => tes.Log += h, h => tes.Log -= h)
.Subscribe(list.Add);
// 実行
tes.Raise();
やはり変わりません。例外出ます。FromEventを中継しているだけですから……。さて、しかし一々Addの手前でlockするのは面倒だ、と、そこでSynchronizeメソッドが使えます。
Observable.FromEvent<int>(h => tes.Log += h, h => tes.Log -= h)
.Synchronize()
.Subscribe(list.Add);
// ようするにこんな感じになってる
var gate = new Object();
//....
lock(gate)
{
OnNext();
}
これで、list.Addを問題なく動作させられます。Listとか適度にデリケートなので適当に注意してあげましょう。
Subjectの場合
さて、上のはイベントでしたが、ではSubjectの場合はどうなるでしょう。
public class TestParallel
{
Subject<int> logMessenger = new Subject<int>();
public IObservable<int> Log { get { return logMessenger.AsObservable(); } }
public void Raise()
{
// デュアルコア以上のマシンで試してね!
Parallel.For(0, 10000000, x =>
{
logMessenger.OnNext(x);
});
}
}
class Program
{
static void Main(string[] args)
{
var list = new List<int>();
var tes = new TestParallel();
// イベント登録して
tes.Log.Subscribe(list.Add);
// 実行
tes.Raise();
}
}
たまーに例外起こらず処理できることもあるんですが、まあ大体は例外起こるんじゃないかと思います。初期のRxのSubjectは割とガチガチにlockされてたのですが、現在はパフォーマンスが優先されているため挙動が変更され、ゆるゆるです。回避策は同様にSynchronizeを足すことです。
tes.Log.Synchronize().Subscribe(list.Add);
これで問題なし。
余談
手元に残っていた大昔のRxを使って実行してみたら、死ぬほど遅かったり。確実に現在のものはパフォーマンス上がっていますねえ。あと、なんかもう最近面倒でeventだからってEventArgs使わなきゃならないなんて誰が言ったー、とActionばかり使うという手抜きをしてます。だってsenderいらないもん、大抵のばやい。