RxとパフォーマンスとユニットテストとMoles再び
- 2011-12-21
C# Advent Calendar 2011、順調に進んでいますね。どのエントリも力作で大変素晴らしいです。私はこないだModern C# Programming Style Guideというものを書きました。はてブ数は現段階で45、うーん、あまり振るわない……。私の力不足はともかくとしても、他の言語だったらもっと伸びてるだろうに、と思うと、日本のC#の現状はそんなものかなあ、はぁ、という感じではあります。はてブが全てではない(むしろ斜陽?)とはいえ、Twitterでの言及数などを見ても、やっぱまだまだまだまだまだまだ厳しいかなあ、といったところ。Unityなどもあって、見ている限りだと人口自体は着実に増えている感じではありますけれど、もっともっと、関心持ってくれる人が増えるといいな。私も微力ながら尽力したいところです。
ところで、id:ZOETROPEさんのAdvent Calendarの記事、Reactive Extensionsでセンサプログラミングが大変素晴らしい!センサー、というと私だとWindows Phone 7から引っ張ってくるぐらいしか浮かばないのですが(最近だとKinectもHotですか、私は全然触れてませんが……)おお、USB接続のレンジセンサ!完全に門外漢な私としては、そういうのもあるのか!といったぐらいなわけですが、こうしてコード見させていただくと、実践的に使うRxといった感じでとてもいいです。
記事中で扱われているトピックも幅広いわけですが、まず、パフォーマンスに関しては少し補足を。@okazukiさんの見せてもらおうじゃないかReactive Extensionsの性能とやらを! その2のコメント欄でもちょっと言及したのですが、この測り方の場合、Observable.Rangeに引っ張られているので、ベンチマークの値はちょっと不正確かな、と思います。
// 1000回イベントが発火(発火の度に長さ3000のbyte配列が得られる)を模写
static IObservable<byte[]> DummyEventsRaised()
{
return Observable.Repeat(new byte[3000], 1000, Scheduler.Immediate);
}
// 配列をバラす処理にObservable.Rangeを用いた場合
static IObservable<byte> TestObservableRange()
{
return Observable.Create<byte>(observer =>
{
return DummyEventsRaised()
.Subscribe(xs =>
{
Observable.Range(0, xs.Length, Scheduler.Immediate).ForEach(x => observer.OnNext(xs[x]));
});
});
}
// 配列をバラす処理にEnumerable.Rangeを用いた場合(ForEachはIxのもの)
static IObservable<byte> TestEnumerableRange()
{
return Observable.Create<byte>(observer =>
{
return DummyEventsRaised()
.Subscribe(xs =>
{
Enumerable.Range(0, xs.Length).ForEach(x => observer.OnNext(xs[x]));
});
});
}
// SelectManyでバラす場合
static IObservable<byte> TestSelectMany()
{
return DummyEventsRaised().SelectMany(xs => xs);
}
static void Main(string[] args)
{
// ベンチマーク補助関数
Action<Action, string> bench = (action, label) =>
{
var sw = Stopwatch.StartNew();
action();
Console.WriteLine("{0,-12}{1}", label, sw.Elapsed);
};
// 配列をばらすケースは再度連結する(ToList)
bench(() => TestObservableRange().ToList().Subscribe(), "Ob.Range");
bench(() => TestEnumerableRange().ToList().Subscribe(), "En.Range");
bench(() => TestSelectMany().ToList().Subscribe(), "SelectMany");
// 配列をばらして連結せず直接処理する場合
bench(() => TestSelectMany().Subscribe(), "DirectRx");
// byte[]をばらさず直接処理する場合
bench(() => DummyEventsRaised().Subscribe(xs => { foreach (var x in xs);}), "DirectLoop");
// 実行結果
// Ob.Range 00:00:02.2619670
// En.Range 00:00:00.2600460
// SelectMany 00:00:00.2701137
// DirectRx 00:00:00.0852836
// DirectLoop 00:00:00.0152816
}
得られる配列をダイレクトに処理するとして、Observable.Rangeで配列のループを回すと論外なほど遅い。のですが、しかし、この場合ですとEnumerable.Rangeで十分なわけで、そうすれば速度は全然変わってきます(もっと言えば、ここではEnumerable.Rangeではなくforeachを使えば更に若干速くなります)。更に、これは配列を平坦化している処理とみなすことができるので、observerを直に触らず、SelectManyを使うこともできますね。そうすれば速度はほとんど変わらず、コードはよりすっきり仕上がります。
と、いうわけで、遅さの原因はObservable.Rangeです。Rangeが遅いということはRepeatやGenerateなども同様に遅いです。遅い理由は、値の一つ一つをISchedulerを通して流しているから。スケジューラ経由であることは大きな柔軟性をもたらしていますが、直にforeachするよりもずっとずっと遅くなる。なので、Enumerableで処理出来る局面ならば、Enumerableを使わなければなりません。これは、使うほうがいい、とかではなくて、圧倒的な速度差となるので、絶対に、Enumerableのほうを使いましょう。
また、一旦配列をバラして、再度連結というのは、無駄極まりなく、大きな速度差にも現れてきます。もし再度連結しないでそのまま利用(ベンチ結果:DirectRx)すれば直接ループを回す(ベンチ結果:DirectLoop)よりも5倍程度の遅さで済んでいます。このぐらいなら許容範囲と言えないでしょうか?とはいえ、それでも、遅さには違いないわけで、避けれるのならば避けたほうがよいでしょう。
ZOETROPEさんの記事にあるように、ここはばらさないほうが良い、というのが結論かなあ、と思います。正しくは上流ではばらさない。一旦バラしたものは復元不可能です。LINQで、パイプラインで処理を接続することが可能という性質を活かすのならば、なるべく後続で自由の効く形で流してあげたほうがいい。アプリケーション側でバラす必要があるなら、それこそSelectMany一発でばらせるのだから。
例えばWebRequestで配列状態のXMLを取ってくるとします。要素は20個あるとしましょう。最初の文字列状態だけを送られてもあまり意味はないので、XElement.Parseして、実際のクラスへのマッピングまではやります。例えばここではPersonにマッピングするとして、長さ1のIObservable<Person[]>です。しかし、それをSelectManyして長さ20のIObservable<Person>にはしないほうがいい。ここでバラしてしまうと長さという情報は消滅してしまうし、一回のリクエスト単位ではなくなるのも不都合が生じやすい。もしアプリケーション的にフラットになっていたほうが都合が良いのなら、それはまたそれで別のメソッドとして切り分けましょう。
成功と失敗の一本化
ZOETROPEさんの記事の素晴らしいのは、通常のルート(DataReceived)と失敗のルート(ErrorReceived)を混ぜあわせているところ!これもまたイベントの合成の一つの形なわけなんですねー。こういう事例はWebClientのDownloadStringAsyncのような、EAP(Eventbased Asynchronous Programming)をTaskCompletionSourceでラップしてTaskに変換する 方法: タスクに EAP パターンをラップする←なんかゴチャゴチャしていますが、TrySetCanceled, TrySetException, TrySetResultで結果を包んでいます、というのと似た話だと見なせます。
WebClientではEventArgsがCancelledやErrorといったステータスを持っているのでずっと単純ですが、SerialPortではエラーは別のイベントでやってくるのですね。というわけで、私もラップしてみました。
public static class SerialPortExtensions
{
// 面倒くさいけれど単純なFromEventでのイベントのRx化
public static IObservable<SerialDataReceivedEventArgs> DataReceivedAsObservable(this SerialPort serialPort)
{
return Observable.FromEvent<SerialDataReceivedEventHandler, SerialDataReceivedEventArgs>(
h => (sender, e) => h(e), h => serialPort.DataReceived += h, h => serialPort.DataReceived -= h);
}
public static IObservable<SerialErrorReceivedEventArgs> ErrorReceivedAsObservable(this SerialPort serialPort)
{
return Observable.FromEvent<SerialErrorReceivedEventHandler, SerialErrorReceivedEventArgs>(
h => (sender, e) => h(e), h => serialPort.ErrorReceived += h, h => serialPort.ErrorReceived -= h);
}
// DataReceived(プラスbyte[]化)とErrorReceivedを合成する
public static IObservable<byte[]> ObserveReceiveBytes(this SerialPort serialPort)
{
var received = serialPort.DataReceivedAsObservable()
.TakeWhile(e => e.EventType != SerialData.Eof) // これでOnCompletedを出す
.Select(e =>
{
var buf = new byte[serialPort.BytesToRead];
serialPort.Read(buf, 0, buf.Length);
return buf;
});
var error = serialPort.ErrorReceivedAsObservable()
.Take(1) // 届いたらすぐに例外だすので長さ1として扱う(どうせthrowするなら関係ないけど一応)
.Do(x => { throw new Exception(x.EventType.ToString()); });
return received.TakeUntil(error); // receivedが完了した時に同時にerrorをデタッチする必要があるのでMergeではダメ
}
}
成功例と失敗例を合成して一本のストリーム化。また、DataReceivedはそのままじゃデータすっからかんなので、Selectでbyte[]に変換してあげています。これで、ObserveReceiveBytes拡張メソッドを呼び出すだけで、かなり扱いやすい形になっている、と言えるでしょう。パフォーマンスも、これなら全く問題ありません。
MolesとRx
と、ドヤ顔しながら書いていたのですが、とーぜんセンサーの実物なんて持ってませんので動作確認しようにもできないし。ま、まあ、そういう時はモックとか用意して、ってSerialDataReceivedEventArgsはパブリックなコンストラクタないし、ああもうどうすればー。と、そこで出てくるのがMoles - Isolation framework。以前にRx + MolesによるC#での次世代非同期モックテスト考察という記事で紹介したのですが、めちゃくちゃ強力なモックライブラリです。パブリックなコンストラクタがないとか関係なくダミーのインスタンスを生成可能だし、センサーのイベントだから作り出せないし、なんてこともなく自由にダミーのイベントを発行しまくれます。
[TestClass]
public class SerialPortExtensionsTest : ReactiveTest
{
[TestMethod, HostType("Moles")]
public void ObserveReceiveBytesOnCompleted()
{
// EventArgsを捏造!
var chars = new MSerialDataReceivedEventArgs() { EventTypeGet = () => SerialData.Chars };
var eof = new MSerialDataReceivedEventArgs() { EventTypeGet = () => SerialData.Eof };
// SerialPort::BytesToRead/SerialPort::Readで何もしない
MSerialPort.AllInstances.BytesToReadGet = (self) => 0;
MSerialPort.AllInstances.ReadByteArrayInt32Int32 = (self, buffer, offset, count) => 0;
var scheduler = new TestScheduler();
// 時間10, 20, 30, 40でSerialData.Charsのイベントを、時間50でEofのイベントを発行
MSerialPortExtensions.DataReceivedAsObservableSerialPort = _ => scheduler.CreateHotObservable(
OnNext(10, chars),
OnNext(20, chars),
OnNext(30, chars),
OnNext(40, chars),
OnNext(50, eof))
.Select(x => (SerialDataReceivedEventArgs)x);
// 走らせる(戻り値のbyte[]はどうでもいいので無視するためUnitに変換)
var result = scheduler.Start(() => new SerialPort().ObserveReceiveBytes().Select(_ => Unit.Default), 0, 0, 100);
result.Messages.Is(
OnNext(10, Unit.Default),
OnNext(20, Unit.Default),
OnNext(30, Unit.Default),
OnNext(40, Unit.Default),
OnCompleted<Unit>(50));
}
[TestMethod, HostType("Moles")]
public void ObserveReceiveBytesOnError()
{
// EventArgsを捏造!
var chars = new MSerialDataReceivedEventArgs() { EventTypeGet = () => SerialData.Chars };
var eof = new MSerialDataReceivedEventArgs() { EventTypeGet = () => SerialData.Eof };
// SerialPort::BytesToRead/SerialPort::Readで何もしない
MSerialPort.AllInstances.BytesToReadGet = (self) => 0;
MSerialPort.AllInstances.ReadByteArrayInt32Int32 = (self, buffer, offset, count) => 0;
var scheduler = new TestScheduler();
// 時間10, 20, 30, 40でSerialData.Charsのイベントを、時間50でEofのイベントを発行
MSerialPortExtensions.DataReceivedAsObservableSerialPort = _ => scheduler.CreateHotObservable(
OnNext(10, chars),
OnNext(20, chars),
OnNext(30, chars),
OnNext(40, chars),
OnNext(50, eof))
.Select(x => (SerialDataReceivedEventArgs)x);
/* ↑までOnCompletedのものと共通 */
// 時間35でErrorのイベントを発行
MSerialPortExtensions.ErrorReceivedAsObservableSerialPort = _ => scheduler.CreateHotObservable(
OnNext<SerialErrorReceivedEventArgs>(35, new MSerialErrorReceivedEventArgs()));
// 走らせる(戻り値のbyte[]はどうでもいいので無視するためUnitに変換)
var result = scheduler.Start(() => new SerialPort().ObserveReceiveBytes().Select(_ => Unit.Default), 0, 0, 100);
// Exceptionの等値比較ができないので、バラしてAssertする
result.Messages.Count.Is(4);
result.Messages[0].Is(OnNext(10, Unit.Default));
result.Messages[1].Is(OnNext(20, Unit.Default));
result.Messages[2].Is(OnNext(30, Unit.Default));
result.Messages[3].Value.Kind.Is(NotificationKind.OnError);
result.Messages[3].Time.Is(35);
}
}
アサーションに使っているIsメソッドは、いつも通りChaining Assertionです。
Molesがいくら強力だとは言っても、イベントをそのまま乗っ取るのはデリゲートの差し替えなどで、割と面倒だったりします。しかし、FromEventでラップしただけのIObservable<T>を用意しておくと…… それを差し替えるだけで済むので超簡単になります。イベント発行については、TestScheduler(Rx-Testingを参照しておく)で、仮想時間で発行する値を作ってしまうと楽です。こういう、任意の時間で任意の値、というダミーの用意もFromEventでラップしただけのIObservable<T>があると、非常に簡単になります。
あとは、scheduler.Startで走らせると(3つの引数はそれぞれcreated, subscribed, disposedの仮想時間、何も指定しないと…… 実は0始まり「ではない」ことに注意。100,200,1000がデフォなので、0はすっ飛ばされています)、その戻り値で結果を受け取って、Messagesに記録されているので、それにたいしてアサートメソッドをしかける。
実に簡単ですね!Molesの力とRxの力が組み合わさると、イベントのテストが恐ろしく簡単になります。素敵じゃないでしょうか?
まとめ
テストなしで書いてたコードは、Molesでテスト走らせたら間違ってました。TakeWhileの条件が==だったのと、Mergeで結合していたり……。はっはっは、ちゃんとユニットテストは書かないとダメですね!そして、Molesのお陰でちゃんと動作するコードが書けたので恥を欠かなくてすみました、やったね。