Reactive Extensionsとエラーハンドリング

例外処理は非常に大事だけれど、非常に難しい。非同期となると、なおのこと難しくなる!一体どう処理したらいいものか。勿論、放置するわけにもいかない避けては通れない話。そのため、Reactive Extensionsには豊富な例外処理手段が用意されています。決してOnErrorだけではありません。想像を遥かに越える、恐るべき柔軟さがそこにはあります。そして、これもまた、何故Rxを使うべきなのか、の強い理由の一つになるでしょう。

OnError

まずは基本の例から。なお、DownloadStringAsyncはReactive Extensions用のWebRequest拡張メソッドからです。BeginGetResponseしてStreamをReadToEndしたもの、と思ってください。この辺も書いてると長くなるし本質的には関係ないので。「非同期で文字列(HTML)をダウンロード」程度の意味で。

// 存在しないアドレスにアクセスしようとする(当然例外が起こる!)
// 出力結果は「リモート名が解決出来ませんでした」など。Timeoutを待つ必要はあります。
WebRequest.Create("http://goooooogllle.co.jp/")
    .DownloadStringAsync()
    .Subscribe(
        s => Console.WriteLine(s), // OnNext
        e => Console.WriteLine(e.Message), // OnError
        () => Console.WriteLine("completed!")); // OnCompleted

// OnNextのみの場合はcatchされずに例外が外にthrowされる
WebRequest.Create("http://goooooogllle.co.jp/")
    .DownloadStringAsync()
    .Subscribe(Console.WriteLine);

存在しないアドレスにアクセスしたため、WebExceptionが発生します。Rxにおける基本の例外処理は、Subscribe時にOnErrorにエラー時処理を書くこと。ここにシーケンス内で発生した例外が集められ、一括で処理出来ます。

ところで、Subscribeのオーバーロードは多数用意されている、ように見えて実のところ一つしかありません。IObserver<T>を受け取るものただ一つ。では、普段やっているAction<T>を一つだけ渡しているのは何なの?というと、OnNextだけ定義されたIObserver<T>を作るショートカットにすぎません。挙動としては、OnErrorを省略した場合は例外をcatchせずそのままthrowされていきます。OnErrorを定義した場合は、ここでExceptionを丸ごとcatch。

OnErrorで丸ごとキャッチというのは、try-catch(Exception e)のようかもしれません。例外は何でもキャッチはダメ、なるべく上の階層で処理すべき、というセオリーから考えると違和感も?ただ、同期的に書いた場合は下位層でcatchせず、最上位でcatchしよう、という話が成立しますが、非同期には最上位などというものはなく、OnErrorで掴まなければ集約例外ハンドラ行きとなるので、最上位の呼び出しメソッドなどというものはなく、そんな当てはまるものでもないかもです。というか、つまりはOnErrorが最上位のcatchの役割を担っているわけですね。

Catch

出てくる例外によって処理内容を変えたかったり、例外の種類によってはCatchしないで欲しかったりするシチュエーションはいっぱいあります。その場合OnErrorでとりあえずExceptionを取ってe is HogeException... などと分岐、というのは格好悪い。というわけで、Catchメソッド。

// TwitterのUserTimeLineは認証(OAuth)が必要なので例外が発生する!
// 出力結果は、以下のレスポンス
// {"error":"This method requires authentication.","request":"\/statuses\/user_timeline.json"}
WebRequest.Create("http://twitter.com/statuses/user_timeline.json")
    .DownloadStringAsync()
    .Catch((WebException e) => e.Response.DownloadStringAsync())
    .Subscribe(Console.WriteLine);

Catchに渡すメソッドは型が指定出来て、型が一致しない例外はCatchしません。そして、ここで少し面白いのが渡すメソッドの戻り値はIObservable<T>でなければならないということ。例外が発生した場合は、後続に代わりのシーケンスを渡すことが出来ます。

例えばWebRequestではエラー原因を知るため、WebExceptionからResponseを取ってデータを取得したいわけです。そこで、そのままWebException中のResponseStreamから非同期で読み取ってそのまま流す。という例が上のコードです。WebAPIを試してる間はこうやってエラーメッセージが簡単に読み取れると非常に楽。OAuthとかややこしくて中々認証通せなくて泣きますからね……。

Empty/Never

Catchしたら例外処理する(ログに書くなどなど)だけで、別に後続に渡したいものなどない。という場合はEmptyを渡してやりましょう。

// 5が出たら例外出してみる
// 出力結果は1,2,3,4,completed
Observable.Range(1, 10)
    .Do(i => { if (i == 5) throw new Exception(); })
    .Catch((Exception e) => Observable.Empty<int>())
    .Subscribe(i => Console.WriteLine(i), e => { }, () => Console.WriteLine("completed"));

他に同種のものとして、Neverがあります。

// Neverは何も返さない物
// Emptyとの違いは、OnCompletedすら発生しない
// 余談ですが、FromEventの第一引数は最新情報によると h => h.Invoke が一番短く書けてお薦めです!
var collection = new ObservableCollection<int>();

Observable.FromEvent<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
        h => h.Invoke, h => collection.CollectionChanged += h, h => collection.CollectionChanged -= h)
    .Select(e => (int)e.EventArgs.NewItems[0])
    .Do(i => { if (i == -1)  throw new Exception(); })
    .Catch((Exception e) => Observable.Never<int>())
    .Subscribe(Console.WriteLine, e => { }, () => Console.WriteLine("終了"));

// 出力結果は 300
collection.Add(300);  // 300が出力される
collection.Add(-1);   // 例外発生
collection.Add(1000); // デタッチ済みなので何も発生しない

NeverはOnCompletedを発生させないという性質が、Emptyとの使い分けとして面白いかもしれません。

OnCompletedとFinallyの違いについて

Catchがあるなら、当然Finallyもあります!そこでふと思うFinallyとOnCompletedってどう違うの?というと、OnErrorとの絡みで違います。

// 出力結果は 例外発生, ふぁいなりー
Observable.Throw<int>(new Exception()) // Throwは例外のみを出すというもの
    .Finally(() => Console.WriteLine("ふぁいなりー"))
    .Subscribe(
        i => Console.WriteLine(i),
        e => Console.WriteLine("例外発生"),
        () => Console.WriteLine("こんぷりーてっど"));

Reactive Extensionsにおいて守られている、また、もし自分で拡張メソッドを書く場合などに守らなければならない原則があります。それは「OnErrorとOnCompletedはどちらか一つのみが発生する」「OnError/OnCompleted後はOnNextは発生しない」という点。というわけで、FinallyとOnCompletedの違いですが、例外発生時にも実行されるのがFinally、そうでないのがOnCompletedといったところです。

また、Finallyは途中で挟むのでメソッドチェーンの並びによっては実行されるタイミングを調整出来るのもポイントです。例えば

// 1, 2, 3, ふぁいなりー1, 100, 101, 102, ふぁいなりー2
Observable.Range(1, 3)
    .Do(Console.WriteLine)
    .Finally(() => Console.WriteLine("ふぁいなりー1"))
    .TakeLast(1)
    .SelectMany(i => Observable.Range(100, 3))
    .Finally(() => Console.WriteLine("ふぁいなりー2"))
    .Subscribe(Console.WriteLine);

TakeLast(1)は最後の1つのみを取得する、そのためにそれ以前のものは「完了」していなければならない。完了したということはFinallyが発動する。というわけで、SelectrMany後の、SubscribeされているOnNextに届く前に一つ目のFinallyが実行されます。二つ目のFinallyに関しては、全てのシーケンスが列挙された最後となります。

この性質は、Streamなど適切にClose/Disposeしなければならないものの実行タイミングの調整に使えます。

OnErrorResumeNext

Catchと同じような性質を持つメソッドとして、OnErrorResumeNextがあります。両者の違いは、例外非発生時に現れます。

// 実行結果は「1, 2, 3」Catchは例外非発生時は後続を渡さない
Observable.Range(1, 3)
    .Catch(Observable.Range(100, 3))
    .Subscribe(Console.WriteLine);

// 実行結果は「1, 2, 3, 100, 101, 102」OnErrorResumeNextは例外非発生時も後続に繋ぐ
Observable.Range(1, 3)
    .OnErrorResumeNext(Observable.Range(100, 3))
    .Subscribe(Console.WriteLine);

代わりに渡す後続が、例外非発生時にも渡されるのがOnErrorResumeNext、渡さないのがCatch。つまりOnErrorResumeNextは必ず後続が繋がれるので、Catch().Concat()、で表現できます。

Retry/Timeout

Webにアクセスする時って失敗したらリトライしたいですよねー、特にTwitterなんて普通にサーバー不調でエラー返してきやがりますからね!という時はRetry。

// Access Start x3回のあとに401例外発生
Observable.Defer(() =>
    {
        Console.WriteLine("Access Start:");
        return WebRequest.Create("http://twitter.com/statuses/user_timeline.json").DownloadStringAsync();
    })
    .Retry(3)
    .Subscribe(Console.WriteLine);

再アクセスしているのが分かるようコードが少しゴチャついてしまいましたが……。この例では認証が必要なものにアクセスしているため、100% WebExceptionが発生してます。んが、Retry(3)ということで、3回リトライしています。リトライ処理は必須ではあるものの、面倒くさいものの筆頭でしたが、恐ろしく簡単に書けてしまいました。同様に、タイムアウトもあります。

// Timeoutは指定時間以内に値が通過しなければTimeoutExceptionが発生します
// カウントのタイミングはSubscribeされたらスタート
// この例では恐らく例外発生します(100ミリ秒で結果を返せる、ことは恐らくないでしょふ)
var wc = new WebClient();
Observable.FromEvent<DownloadStringCompletedEventHandler, DownloadStringCompletedEventArgs>(
        h => h.Invoke, h => wc.DownloadStringCompleted += h, h => wc.DownloadStringCompleted -= h)
    .Timeout(TimeSpan.FromMilliseconds(100)) // 100ミリ秒
    .Take(1) // これつけないとTimeoutのチェックが働き続けます←WebClientもWebRequest感覚になるようにしたほうがRx的にはお薦め
    .Subscribe(e => Console.WriteLine(e.EventArgs.Result));

wc.DownloadStringAsync(new Uri("http://bing.com/"));

Web関連だとWebRequestであればTimeoutがありますが、ないもの(WebClient)もありますし、また、こちらのほうがお手軽なので、それなりに重宝すると思います。勿論、Web関連以外のメソッドでも使えますので、例えば一つのボタンが押されて、指定時間内にもう一つのボタンが押されなければTimeoutExceptionを出す、などなども考えられますね。更にそれをTimeoutExceptionをCatchで、指定時間内で二つのボタンが押されなかった場合の処理を追加、などなど、従来面倒くさかったことが恐ろしくスッキリ記述出来ます。

まとめ

ReactiveOAuthが機能的にスッカラカンなのは手抜きじゃなくて、(Retryとか)Rxに丸投げ出来るからです。ひたすら機能は分解、モジュールは小さく。というのは大変関数型的だと思うのですが、まさにそれです。(ただ、ちょっと、というかかなり手直ししないとマズいところがあるので近いうちに更新します……)

といったわけで、Rxはかなり柔軟にエラーを処理することが出来るため、もう同期とか非同期とか関係なく、Rx挟まないでネットワーク処理書くのは面倒くさくて嫌です、という勢いだったり。大変素晴らしい。というかもう全ての処理をRx上に載せてしまってもいいんじゃね?ぐらいの勢いだったりしますがそれはやりすぎである。

なお、この記事はC# Advent Calendar jp: 2010の12/4分として書きました。リレーが途切れると、寂しいです。最後まで完走させたいですね。JavaScriptなど割とすぐに埋まったのに、こんなところで言語の(日本のネット上での)人気不人気が視覚化されるとしたら、寂しい話です。

あと、私はJavaScript Advent Calendar 2010のほうでも12/20に書くので、20日もよろしくどうも。linq.jsとRxJSについて書きます。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(C#)
April 2011
|
July 2024

Twitter:@neuecc GitHub:neuecc

Archive