Reactive Extensions v1.0安定版リリース

Reactive Extensions v1.0 Stable and v1.1 Experimental available now! ということで、今までも安定版だの正式だの何なり言っていましたが、今回こそ、本当に本当に正式リリース、v1.0だそうです。整理されたドキュメント、多くのチュートリアルビデオ、大幅なパフォーマンス改善、そして、よくテストされた(かどうかは不明)安定版としてのライブラリ本体。全てが整いました。さあ、使いましょう!実際のプロダクトに!

また、NuGetではRx-でStable版を、Rx_experimental-で実験版を、他にIx_experimental-でIxをインストールすることが可能です。

Ix復活

Ix(Interactive Extensions、Reactiveの反対ということでEnumerableEx、Linq to Objectsを拡張する拡張メソッド群)が復活しました。復活前に、今後についての意見を募集していたのですが、素敵なことに私の意見が全部反映されていました!XxxEnumerableはIntelliSenseの邪魔だから廃止してね、とかForEachにはindex付きのオーバーロード入れてよ、とかTreeを走査するメソッド入れてよ、とか。これは嬉しい。いやあ、やっぱり言っておくものですね。

そんなわけで、もうForEachを自作する必要はありません:)

標準クエリ演算子で何が「出来ない」のかを知っておくことは大切です。標準では複数回列挙なしで前後の値を使うことは出来ないし、再帰的に辿るような、複数段のSelectManyも出来ない。MaxByなどキーを使った最大値の取得もない。

Ixは、そこを補完してくれます。

Expand

Ixの中から、Expandを紹介します。なお、Rx(Observable)にはExperimental Releaseのほうにはあるのですが、まだStableには入っていません。

これは何かというと幅優先探索でツリーを解きほぐします。イミワカリマセンネ。ええと、ツリー状のオブジェクトを辿る場合は、通常は再帰を使って書くと思います。でも、そうして再帰で辿るのって、各枝の値が欲しいわけなんですよね?もしそうなら、ツリーは一直線上に出来る。IEnumerable<T>に出来る。LINQが適用できる。

ええ。熟練した C# 使いは再帰を書かないのです。で、ツリーです。ツリーを辿るのは頻出の再帰構造でパターン化できて、うんたらかんたら。ああ、もう!与えられた木から,子→親への対応を作る,を C# で - NyaRuRuの日記を見るといいです、それで全部解決です!

ExpandはBreadthFirstにあたります。DepthFirstはないの?というと、今のところないですねー。ないものはないです、しょうがない。それはさておき、このツリー的なものを辿るのというのは割とあるシチュエーションで、そしてExpandというメソッドは実に強力なので、是非使い方をマスターして欲しい。ので、例を出します。例えばWinFormsのコントロール。

Panelの下にButtonや、子Panelが並んでいます。さて、この中から全てのButtonを取り出したいのですが、どうしましょうか?予め配列にButtonを持っておく、のもまあ答えですが、ルート階層から辿るようにしましょう。Formに並ぶコントロールは、階層に別れたツリー構造をしています。Expandを使ってみましょう。

Expandはセレクターの結果がEmptyになるまで、辿り続けます。辿る順番は階層順(幅優先)。今回はContorolsを辿って全てのControlを列挙、Buttonが欲しいのでOfTypeでフィルタリング。というわけです。WPFやSilverlightでも、同様に辿ることが出来ます(ちょっとWPFのコントロール階層が面倒くさくて、コードがゴチャゴチャするので、今回はWinFormsを例とさせていただきました)

こういった走査メソッドはlinq.jsにもあります(CascadeBreadthFirst、メソッド名のとおり、NyaRuRuさんの作成されたAchiralから大きな影響を受けています)。JavaScriptの場合、ツリーの代表的なものはDOMです、というわけで勿論DOMに適用できるのですが、DOMの列挙はjQueryでやったほうがいいですよー、なので、JSONやオブジェクト(これもまたツリーになっている)の走査に使うのがいいかもしれません。何にせよ、使いどころというのは存外あるものです。

気になった点

そんな素敵なIxですが、触っていて幾つか気になった点があったので、またForumに投げておきました。リクエストが反映されたことで、調子に乗って味を占めているのかもしれません。ではなくて、フィードバックは積極的に出してあげたほうがいいでしょう常識的に考えて。英語?全部機械翻訳です、はは、まあ、コードがあれば伝わるはず。伝わりました。返答が15分で来た。

まず、Scanの挙動。seedなしの場合に、Rxは最初の値も列挙しますが、Ixは最初の値をスルーするという、挙動の違い。で、これ、最初の値がスルーされるのは都合が悪いので(スルーしたきゃあSkip(1)すればいい)、修正かなあ、と思います。返答では、この点については何も言ってませんでしたが。

Scanにはもう一つ、seed有りの際に、Rx, Ixともにseedをスルーしますが、F#などはseedも列挙します。これは以前はScan0というseed含めて列挙する別のメソッドがあった(WP7版にはある)のですが、今はScan0は廃止されたので、それならScanの挙動をseed込みでの列挙に変更すべき。と、思ったのですが、返答はStartWith(seed)を使えばいいとのこと。それでも確かにいいのですが、基本seed有りにしてseed飛ばしたい時はSkip(1)のほうが使いやすいと思うのだけど。まあ、これはWP7版との互換性の問題もありますし、そもそもRxはStableと言った以上、もう挙動は変えられないので、しょうがないところかもしれません。

他にはRepeat拡張メソッド(無限リピートする)のソースがEmptyの場合の挙動。例えば Range(1,1).Repeat().Take(3) と1,1,1になるわけですが、 Empty().Repeat().Take(3) の結果はどうでしょう?答えは、無限ループを彷徨って止まらなくなります。Emptyに対するRepeatをどう解釈するか、は正直微妙なところですけれど、元ソースが空だと死ぬというのは、結構リスキーなのではないかな、と考えてしまうのです。また、Takeをつけるのは止まることを期待するという点もあるわけですが、この場合はTakeをつけようが何しようが無駄、というのも怖い。Rxチームからの回答は、この挙動は仕様とのことでした。

最後にちょっとリクエストしてみた。今回のIxではDistinctにオーバーロードが足されています。通常だとIEqualityComparer<T>というダルいものをクラス定義して(ここが最悪!)渡さなければいけないのですが、ラムダ式でキーを指定するだけで済みます。言うならばDistinctByといったところ。これは、実に大変有益です。このオーバーロードは、AnonymousComparerという私が以前に作ったものにも載っているのですが(ちなみに拡張メソッドがかち合ってしまうため、Ixと同時使用は不可能になってしまった!まあ、.csファイル一個のライブラリなので、かち合う部分はコメントアウトすればいいのですが)、大変重宝しています。しょっちゅう使ってます。特にExceptとかで多用しているんですが……、今回IxではDistinctにしかオーバーロード足されていません。他の集合系メソッドであるIntersect, Except, Unionでも使えると便利なのになー、って思ってしまうのです。なのでリクエストしておきました。回答は、考えておく、とのことなのでもしかしたら次のバージョンでは乗っかっているかもしれません。

日本語だと言えるけれど、バッサリ切って機械翻訳した英語だと伝えたいニュアンスは吹っ飛んでしまうなあ、ううみぅ。必ずしもコードがあれば伝えられる、わけでもないか、当たり前だけど。

QueryableEx(笑)

ネタ。NuGetではIx_Experimental-Providerで入るんですが、まあ、ネタ。中身はEnumerableExのIQueryable版です。何がネタなのかというと、IQueryableはそれだけでは何の意味もなくて、解釈するエンジン(Linq to SqlなりLinq to Twitterなり)が大事なわけです。そうでなければ、not supportedをぶん投げるだけです。さて、そして、標準クエリ演算子ですらnot supported率が少なくないのに、QueryableExに対応するクエリプロバイダ……。ありえない、です。

そんなわけで、使う機会はないでしょう。んまあ、気合でQueryableExすらもフルサポートするクエリプロバイダを自作すれば、活用出来ますが、やはりそんな機会はないでしょう。

RxJSは?

ドキュメント書いたりとか、QueryableEx作ったり(笑)とか、色々忙しかったのでしょふ。今回「も」全く音沙汰なしですが、次こそは更新されるんじゃないですかねー、分かりませんけど。jQueryにはDeferred乗りましたが、アレは正直かなり使いづらいのでその点でも私はRxJSにしたいなあ。それと、MS支援でのNode.jsのWindows対応も発表されましたし、JavaScriptのAsyncを何とかするためのRx、はかなり価値があると思うので、もう少し頑張って欲しいな、と思います。

まとめ

しっかりとMSDN入りしている、ドキュメントもある、正式にv1 Stableと告知されている、など、もう採用できない理由はなくなりました。日本語リソースはないですが、それは気合で乗り切ればいいぢゃない(とか言ってるうちはダメなのでしょうがー)。

Jesse Liberty(オライリーから出ているプログラミングC#の著者)による解説本も今秋に出るようだし、確実に、順調にメインストリームに乗るテクノロジとしての道を歩んでいますので、安心して追えるのではないかと思います。

ReactiveOAuth ver.0.4 - Twitpic(OAuth Echo)対応

ver.0.4になりました。少し前に0.3.0.1をこっそり出していたので、それを含めて0.3からの差分は、「対象Rxのバージョンが現在最新の1.0.10605(Stable)」に、というのと「Realmが含まれていると認証が正しく生成出来なかったバグの修正」と、「TwitpicClientサンプルの追加」になります。バグのほうは本当にすみません……。Twitterでしかテストしてない&TwitterはRealm使わないため、全然気づいていなくて。ダメですねホント。

OAuth Echo

TwitpicはOAuth Echoという仕組みでTwitterと連携した認証をして、画像を投稿できます。詳しくはUsing OAuth Echo | dev.twitter.comTwitPic Developers - API Documentation - API v2 » uploadにありますが、よくわかりませんね!Twitpicに画像を投稿、というわけでTwitpicのAPIにアクセスするわけですが、その際のヘッダにTwitterに認証するためのOAuthのヘッダを付けておくと、Twitpic側がTwitterに問い合せて認証を行う。という仕組みです、大雑把に言って。

ただのOAuthとはちょっと違うので、今までのReactiveOAuthのOAuthClientクラスは使えない。けれど、認証用ヘッダの生成は同じように作る。というわけで、ここはReactiveOAuthにひっそり用意されているOAuthBaseクラスを継承して、Twitpic専用のTwitpicClientクラスを作りましょう。

が、作るのもまた少し面倒なので Sample/TwitpicClient/TwitpicClient.cs に作成したのを置いておきました。ファイルごとコピペってご自由にお使いください。.NET 4 Client Profile, Silverlight 4, Windows Phone 7の全てに対応しています。

Windows Phone 7でのカメラ撮影+投稿のサンプル

TwitpicClient.cs の解説は後でやりますが、その前に利用例を。WP7でカメラ撮影+投稿をしてみます。CameraCaptureTaskの利用法に関しては CameraCaptureTaskを使ってカメラで静止画撮影を行う – CH3COOH(酢酸)の実験室 を参考にさせて頂きました。TwitterのAccessTokenの取得に関しては、ここでは解説しませんので neue cc - ReactiveOAuth - Windows Phone 7対応のOAuthライブラリ を参照ください。

// CameraCaptureTaskのCompletedイベント
void camera_Completed(object sender, PhotoResult e)
{
    if (e.TaskResult == TaskResult.OK)
    {
        // 撮影画像(Stream)をバイト配列に格納
        var stream = e.ChosenPhoto;
        var buffer = new byte[stream.Length];
        stream.Read(buffer, 0, buffer.Length);

        // key, secret, tokenは別に設定・取得しておいてね
        new TwitpicClient(ConsumerKey, ConsumerSecret, accessToken)
            .UploadPicture(e.OriginalFileName, "from WP7!", buffer)
            .ObserveOnDispatcher()
            .Catch((WebException ex) =>
            {
                MessageBox.Show(new StreamReader(ex.Response.GetResponseStream()).ReadToEnd());
                return Observable.Empty<string>();
            })
            .Subscribe(s => MessageBox.Show(s), ex => MessageBox.Show(ex.ToString()));
    }
}

new TwitpicClient(キー, シークレット, アクセストークン).UploadPicture(ファイル名, メッセージ, 画像) といった風に使います。戻り値はIObservable<string>で結果(投稿後のURLとか)が返ってくるので、あとは好きなように。投稿に失敗した場合は、WebExceptionが投げられるので、それを捉えてエラーメッセージを読み取ると開発には楽になれそうです。

TwitpicClient.cs

以下ソース。Sample/TwitpicClient/TwitpicClient.cs と同じですが、自由にコピペって使ってください。大事なことなので2回言いました。このコード自体はTwitpicに特化してありますが、認証部分のヘッダを少しと画像アップロードを変更する部分を弄れば、他のOAuth Echoサービスにも対応させることができると思います。

using System;
using System.Linq;
using System.Text;
using System.Net;
using System.IO;

#if WINDOWS_PHONE
using Microsoft.Phone.Reactive;
#else
using System.Reactive.Linq;
#endif

namespace Codeplex.OAuth
{
    public class TwitpicClient : OAuthBase
    {
        const string ApiKey = ""; // set your apikey

        readonly AccessToken accessToken;

        public TwitpicClient(string consumerKey, string consumerSecret, AccessToken accessToken)
            : base(consumerKey, consumerSecret)
        {
            this.accessToken = accessToken;
        }

        private WebRequest CreateRequest(string url)
        {
            const string ServiceProvider = "https://api.twitter.com/1/account/verify_credentials.json";
            const string Realm = "http://api.twitter.com/";

            var req = WebRequest.Create(url);

            // generate oauth signature and parameters
            var parameters = ConstructBasicParameters(ServiceProvider, MethodType.Get, accessToken);
            // make auth header string
            var authHeader = BuildAuthorizationHeader(new[] { new Parameter("Realm", Realm) }.Concat(parameters));

            // set authenticate headers
            req.Headers["X-Verify-Credentials-Authorization"] = authHeader;
            req.Headers["X-Auth-Service-Provider"] = ServiceProvider;

            return req;
        }

        public IObservable<string> UploadPicture(string filename, string message, byte[] file)
        {
            var req = CreateRequest("http://api.twitpic.com/2/upload.xml"); // choose xml or json
            req.Method = "POST";

            var boundaryKey = Guid.NewGuid().ToString();
            var boundary = "--" + boundaryKey;
            req.ContentType = "multipart/form-data; boundary=" + boundaryKey;

            return Observable.Defer(() =>
                    Observable.FromAsyncPattern<Stream>(req.BeginGetRequestStream, req.EndGetRequestStream)())
                .Do(stream =>
                {
                    using (stream)
                    using (var sw = new StreamWriter(stream, new UTF8Encoding(false)))
                    {
                        sw.WriteLine(boundary);
                        sw.WriteLine("Content-Disposition: form-data; name=\"key\"");
                        sw.WriteLine();
                        sw.WriteLine(ApiKey);

                        sw.WriteLine(boundary);
                        sw.WriteLine("Content-Disposition: form-data; name=\"message\"");
                        sw.WriteLine();
                        sw.WriteLine(message);

                        sw.WriteLine(boundary);
                        sw.WriteLine("Content-Disposition: form-data; name=\"media\"; filename=\"" + filename + "\"");
                        sw.WriteLine("Content-Type: application/octet-stream");
                        sw.WriteLine("Content-Transfer-Encoding: binary");
                        sw.WriteLine();
                        sw.Flush();

                        stream.Write(file, 0, file.Length);
                        stream.Flush();

                        sw.WriteLine();
                        sw.WriteLine("--" + boundaryKey + "--");
                        sw.Flush();
                    }
                })
                .SelectMany(_ => Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse)())
                .Select(res =>
                {
                    using (res)
                    using (var stream = res.GetResponseStream())
                    using (var sr = new StreamReader(stream, Encoding.UTF8))
                    {
                        return sr.ReadToEnd();
                    }
                });
        }
    }
}

認証ヘッダ作成はConstructBasicParametersとBuildAuthorizationHeaderというprotectedメソッドで行います。わけわかんないよね…気持ち悪いよね…。使いにくいメソッドです、すみません、私もそう思います。そういうものだと思って、見ないふりしてもらえれば幸いです。

コードの大半を占めているのは画像を投稿するためのmultipart/form-dataのもので、これはもうOAuth Echo関係ない話、で、面倒ぃ。特にWP7での非同期だと涙が出る。POSTはBeginGetRequestStreamとBeginGetResponseの二つの非同期メソッドをセットで使う必要があるため、コードがごちゃごちゃするのです。

しかしReactive Extensionsを使えばあら不思議!でもないですが、ネストがなくなって完全に平らなので、結構普通に読めるのではないでしょうか?(ストリーム書き込みのコード量が多いのは、これは同期でやっても同じ話なので)。例外処理も利用例のところで見たように、Catchメソッドをくっつけるだけ。実に色々とスッキリします。

Rxがあれば非同期POSTも怖くない。

やっていることは単純で、FromAsyncPatternでBegin-Endを変換。StreamへのWriteは後続への射影はなく、対象(Stream)に対しての副作用(書き込み)のみなのでDo、RequestStream->Responseへの切り替えはSelectMany、Responseから結果のStringへの変換はSelect、と、お決まりの定型メソッドに置き換えていっただけです。この辺はパターンみたいなものなので、これやるにはこのメソッドね、というのを覚えてしまえばそれでお終いです。

Stream読み書きは非同期にしないの?

StreamにもBeginReadとかBeginWriteとかありますものね。しかし、しません(キリッ。理由は死ぬほど面倒だからです。やってみると分かりますが想像以上に大変で、おまけに何とか実現するためにはRxでのチェーンを大量に重ねる必要がありオーバーヘッドがバカにならない……。なので、わざわざやるメリットも全くありません。

一応、ReactiveOAuthのOAuthClientは、そこも非同期でやってますが、わざわざ頑張った意味があったかは、かなり微妙なところ。実装は Internal/AsynchronousExtensions.cs にあるので参照ください。それと、この AsynchronousExtensions.cs はReactive Extensionsで非同期処理を簡単にで言った「拡張メソッドのすゝめ」を実践したものでもあります。WebRequestはプリミティブすぎて扱い難いので、Rxに特化したうえで簡単に扱えるようにDownloadStringやUploadValueなどといったメソッドを拡張してあります。便利だと思いますので、こちらも TwitpicClient.cs と同様に、ファイルごと自由にコピペって使ってやってください。

まとめ

ReactiveOAuthを公開する目的に、「これが入り口になってRxの世界を知ってもらえると嬉しい」というのもあったのですが、WP7開発で利用してもらったりと、その目的は少しは達成出来たかもで、良かった良かった。ちょっと練りたりなかったり、未だにバグがあったり(本当にごめんなさい!)と至らない点も多いですが、今後も改善していきますのでよろしくお願いします。

MSDNの上のReactive Extensions

DevLabsを卒業し、晴れて正式にData Developer Center入りしたReactive Extensionsですが、徐々に正式リリースへ向かう準備として、ついにドキュメントがMSDN入りを果たしました。

まだPre-releaseということで工事中の部分が多いですが、これはドキドキしますね。Getting Started with Rxはどんなところで使えるかの説明とダウンロード先について、Using Rxは詳細なドキュメント(まだ工事中項目も幾つか、でもかなり充実している感)、Reactive Extensions Class Libraryはリファレンス。必要十分には揃っていますね、あとは大きめのサンプルも欲しいところだけど、追加されるかな?

ところで、ツリー階層を見てください。

.NET Development直下です。何て素晴らしい位置にあるのか!Entity FrameworkやSilverlight などと同列に並べられているのを見ると、ついに始まったな、と感無量です。

その他のRx

Rx本体が順風満帆なのに比べると、RxJSとIx(EnumerableEx)はどうしちゃったのでしょう……。まず、RxJSは、やる気は一応あるみたいです。ダウンロードも可能ですしね。APIは随分更新されていなくて、.NET版Rxとかなり差が開いてしまいましたが(バグも幾つか見つかっているのですが)。今はRxチーム自体がドキュメント周りで忙しいということで手を付けてられないっぽく、ゆっくり待つしかないです。プロジェクト自体は死んでいないそうなので、そこだけは安心してもいいかと。(私は内心不安ですが!)

Ixのほうは、一旦死亡です。ダウンロードセンターはもとより、NuGetからも、もう手に入らなくなりました。が、しかし、完全終了ではなく、一旦引っ込めて練りなおして、Rxとは別枠となるかもですが提供する意思はあるそうで。「Don't worry. Ix is not disappearing forever.」とのこと。そこで、フィードバックが欲しいとのことなので、答えてあげるといいんじゃないかしら。Asking your input - Interactive Extensions functionality。私も好きなメソッドと嫌いなもの(XxxEnumerableがIntelliSenseを汚染して嫌いだった)、それとドサクサに紛れて再帰/ツリー探索系のメソッドを入れてよ!とリクエストしておきました。

NuGet

6月頭にバージョンが少し上がって、現在はStableがv1.0.10605、Experimentalがv1.1.10605が最新バージョンとなっています。そしてNuGetのパッケージ名も少し変化して、Stable版はRx-MainなどプリフィックスがRx-のもの、ExperimentalはRx_Experimental-MainなどプリフィックスがRx_Experimental-と完全に別れて提供されるようになりました。

と、まあ

そんなところです。最近は私もこうして、更新されたよー、と言うだけで、技術的な情報提供を怠っていてすみませんすみませんなので、徐々に再開していきたいと思います。Schedulerの話とか非常に重要なのに、このブログでは一度も書いていないですしね、全くもってイクない。こないだのセッションReactive Extensionsで非同期処理を簡単には非同期中心だったので、イベントサイドについての話もしっかりまとめたいなあ、とかも思いつつ。

linq.js入門記事を書きました

お話をいただき、@ITの.NET TIPSにlinq.jsの入門記事を、二週に渡り書きました。

このサイトでやると、アレもコレもとダラダラと書いてしまって分かりづらくなっていたのですが、記事では文字数制限などのお陰で構成がすっきり、校正してもらったお陰で文章の揺れもなく。つまるところ、ほとんど編集で助けてもらったというだけで、本当にありがとうございました。サンプルコードは、コードを見ただけで伝わるよう単純に、でもlinq.jsの威力を伝えなければならないので多少の複雑さは持たなければならない。などと思い結構悩んで作りました。

お陰様で反響も結構良かったみたいでなによりです。実績も良く分からない外部ライブラリは導入できない…… という方も、@ITに載ってるから大丈夫だよ!を説得材料(?)にできるのではないでしょうか。これを機に、是非試してみてください。

ところで幾つかの話。

メソッド名大文字

失敗した。かな……。特にTojQuery()でjQueryと見かけ上(あくまで見かけだけなんですが)シームレスに繋がっていると違和感が結構あります。以前に、jQueryとLINQの世界が視覚上切れて見えるから、むしろイイぐらいなんです、とか言ってましたが勿論ただの強がりです。この辺は今更変えにくいところで、どうしたものかな、と悩んでいるところです。

パフォーマンスについて

気になりますよね?ベンチマーク的に言えば、遅い。遅延評価の実現や豊富な機能は、速度を相当犠牲にしています。では、その遅さが許容できるほどか無視できないか。これは、卑怯な逃げ口になってしまいますが、状況次第。少なくとも普通のサイトでは問題ないレベルだと思いますし、また、ブラウザのJSはどんどん速くなっていってます。Chrome/Fx4/IE9の速さ!「許容できる」の範囲はどんどん広がっていってるのでは、と。

こういった話はC#でもそうです。LINQはベタforループより確実に遅い、が、じゃあベタforループで書くかといったら、よほどエクストリームに速度を求める場合以外は書きはしません。

Pentium3でWindows XPでIE6な奴らにも配慮する!というのも確かに美学なのですが、そうではない方向も見ないと、素敵な未来はやってこないのではないかな、って。21世紀にもなるのにループアンローリングで高速化!とかいう記事ばかりが踊る世界なんて悲しいじゃないですか。

モバイル機器のことも考えなければならないし、HTML5も控え、JavaScriptで高負荷な処理をすることも少なくないので、まだまだ時代は追いついていないけれど。それでも、私はもう少し先の未来を見ていたい……。JavaScriptがこれから先、本当に一級の言語となっていくのなら尚の事です。

Reactive ExtensionsとSQLの非同期実行

DbExecutorのパフォーマンスが十分トップクラスであることは、こないだの計測で分かりました。では次のステップはどこへ向かおう。IL生成を頑張っても、もうほんの少ししか稼げる余地は残ってない。ならば、もっと根本的なところから行こう。ええ、非同期IOを。DbExecutorは一応、拡張性を考慮してあるので継承して非同期対応しましょう。

……ところで、この記事はAsyncDbExecutorの作り方、みたいになっていますが、読み取ってほしいのは「Reactive Extensionsの使い方」です。AsyncDbExecutorは、あくまでRxの利用法のサンプルにすぎません。DbExecutorなんて使わないしー、とか思わず、その辺を念頭において眺めてみてください。

BeginExecuteReader/EndExecuteReader

SqlServerならばSqlCommand.BeginExecuteReaderメソッドが使えます。IDbCommandにはないので、対象はSqlServer特化ということになってしまいますが、まあ、それはしょうがない(非同期対応してないDBもあるわけだし)。対応させる方法ですが、まずはDbExecutorを継承してコンストラクタを弄ります。

public class AsyncDbExecutor : DbExecutor
{
    public AsyncDbExecutor(string connectionString)
        : base(new SqlConnection(connectionString))
    { }

    public AsyncDbExecutor(SqlConnection connection)
        : base(connection)
    { }

    public AsyncDbExecutor(string connectionString, IsolationLevel isolationLevel)
        : base(new SqlConnection(connectionString), isolationLevel)
    { }

    public AsyncDbExecutor(SqlConnection connection, IsolationLevel isolationLevel)
        : base(connection, isolationLevel)
    { }
}

通常のDbExecutorはIDbConnectionを受け入れるようにしていましたが、今回はSqlServer特化なのでSqlConnectionで。また、利便性を考えて生の接続文字列からも作れるようにしてやりました。IsolationLevelを受け入れるオーバーロードはTransaction処理する場合のためとなっています(別にTransactionScope使ってもいいですけどね)。あとは、非同期に対応するメソッドを作ってやれば良いだけ。

Reactive Extensions

BeginXxx-EndXxxがカッタルイのは分かりきっているので、当然のようにRxを持ち出します。

// 引数が横に長いですがほとんど省略可能なので……
public IObservable<SqlDataReader> ExecuteReaderAsyncRaw(string query, object parameter = null, CommandType commandType = CommandType.Text, CommandBehavior commandBehavior = CommandBehavior.Default)
{
    var cmd = (SqlCommand)this.PrepareExecute(query, commandType, parameter);
    return Observable.FromAsyncPattern<SqlDataReader>(
            (ac, o) => cmd.BeginExecuteReader(ac, o, commandBehavior), cmd.EndExecuteReader)
        .Invoke()
        .Finally(() => cmd.Dispose());
}

this.PrepareExecuteですが、これはDbExecutorのprotectedメソッドで、クエリ文字列や匿名型で渡すパラメータからIDbCommandを生成します。こんなこともあろうかとちゃんとprotectedにしておいて良かった。そしてキャストですが、今回はconnectionは必ずSqlConnectionで(そうなるようコンストラクタを調整してある)、戻り値がSqlCommandであることが保証されているためアップキャストしてやります。そうしないと非同期APIが使えないですから。

あとは、いつものように(?)FromAsyncPatternして、それとFinallyでcommandをDisposeしてやることを忘れないと気が効いてるかもり。利用する時は

// async=trueを忘れずに
var connstr = @"Data Source=.;async=true;Initial Catalog=master;Integrated Security=True;";

var executor = new AsyncDbExecutor(connstr);
executor.ExecuteReaderAsyncRaw("select * from sys.tables")
    .Finally(() => executor.Dispose()) // 接続のDisposeも忘れずに...
    .Subscribe(dr =>
    {
        while (dr.Read())
        {
            Console.WriteLine(dr.GetValue(0));
        }
    });

といった感じです。非同期APIはDisposeをどう仕込めばいいのかが悩ましいのですが、Rxなら簡単です、Finallyに突っ込めばいいだけ。同期APIでusingで囲むのと同じ感覚で、RxではFinallyに置いてください。非同期実行ということで、複数同時に走らせることも少なくないと思うのですが、その場合は全ての完了を自然に扱えるよう、結合して一本の流れにしてやる必要はあるでしょう。恐らくきっと。

そういえば忘れてはいけないのは、非同期APIを使う場合は接続文字列にasync=trueが必要です。これについてはADO.NET 2.0 における非同期コマンド実行によると

非同期コマンドを使用するためには、コマンドが実行される接続は、接続文字列を async=true と指定して初期化する必要があります。 非同期メソッドが接続文字列に async=true と指定されていない接続を使用するコマンドで呼び出されると、例外がスローされます。

所定の接続オブジェクトで同期コマンドのみを使用するとわかっている場合は、接続文字列に async キーワードを指定しないか、false に設定することをお勧めします。 非同期操作が有効になっている接続で同期操作を実行すると、リソースの利用率は著しく増大します。

同期 API と非同期 API の両方が必要な場合は、可能であれば別々の接続を使用することをお勧めします。 これが不可能であれば、async=true を指定して開かれた接続で同期メソッドを使用することもできます。この場合、通常どおりに動作しますが、パフォーマンスは若干劣化します。

とのことなので、少し気をつけたほうがいいかもしれません。まあ、Rx愛好家なら全てRxで非同期でやるに決まっているので(?)、別にasync=trueでも怖くありません。

SelectMany

しかし、生のDataReaderをwhileで回すとか、ダサ……。せっかくのRxなのだから値もPushで送ればいいぢゃない。

IEnumerable<IDataRecord> EnumerateSqlDataReader(SqlDataReader reader)
{
    using (reader)
    {
        // Closeされるタイミングがコントロール出来ないので、IsClosedのチェックは必須
        while (!reader.IsClosed && reader.Read())
        {
            yield return reader;
        }
    }
}

// 前に定義したAsyncRawを呼んでSelectManyするだけ
public IObservable<IDataRecord> ExecuteReaderAsync(string query, object parameter = null, CommandType commandType = CommandType.Text, CommandBehavior commandBehavior = CommandBehavior.Default)
{
    return ExecuteReaderAsyncRaw(query, parameter, commandType, commandBehavior)
        .SelectMany(dr => EnumerateSqlDataReader(dr));
}

// 使うときはこんな感じ
var executor = new AsyncDbExecutor(connstr);
executor.ExecuteReaderAsync("select * from sys.tables")
    .Select(dr => new
    {
        Name = dr.GetString(0),
        ObjectId = dr.GetInt32(1)
    })
    .Finally(() => executor.Dispose())
    .Subscribe(Console.WriteLine);

実にLINQっぽく自然になりました。yield returnを使ったのは、IObservableのSelectManyはIEnumerableも受け入れて平らにしてくれるからです。これ、Observable.UsingとかObservable.Generateを使って、Rxだけで頑張ることも可能ではあるのですが、面倒くさいしゴチャゴチャします。なので、yield returnが使えるなら、使ってしまったほうが楽。この辺はIEnumerable<T>を生成するためのyield returnがあるように、IObservable<T>を生成するためのコンパイラサポートが欲しいところ。awaitが乗ればTaskからIObservableへの変換(は基本的に容易)なので、ある程度可能になるのかなあ、と思いつつ、謂わばAsyncEnumerableになることの難しさもあるので今のところ何とも言えません。

ExecuteReaderAsyncがあればExecuteReaderAsyncRawいらないぢゃん!って感じですけれど、パフォーマンスのために非同期にするのに、SelectManyとかオーバーヘッドがあるのも嫌かなあ、と思う場合もあるかもなので、生のSqlDataReaderを返すものも残してあげるのもいいかな、とか思ったりはするところ。EnumerateSqlDataReaderのような拡張メソッドを別途定義してやれば、生のSqlDataReaderも、そう扱いが面倒というわけでもないですしね。いや、どうだろう……。

次バージョン

この調子でExecuteNonQueryやSelectなども書いていけば完成です。って、そういえばアクセサの動的生成&キャッシュの部分はinternalで外から触れない(せいぜいPrepareExecuteだけ)からSelectは書けないぢゃん。うげげ。うーん、publicにするのもどうかと思うので、InternalVisibleToで対処しようかなあ。そんなわけで、このAsyncDbExecutorは今はまだアイディア段階で内容を詰めてませんが、もっとブラッシュアップさせて、次のDbExecutorの更新時に含めたいと思っています。Rxが必要という都合もあるので、本体とは別DLLで。勿論、NuGet対応で依存解決でインストール楽々、です。

ところで本当に速いの?

うん、分かりません。ADO.NET 2.0 における非同期コマンド実行によれば

ADO.NET/SqlClient の非同期コマンド実行サポートは、実際に、本当の意味での非同期ネットワーク I/O (共有メモリの場合は非ブロッキングのシグナル通知) を基礎としています。 ご要望が多ければ、いずれ内部実装について文書にしたいと思います。 ここでは、"真の非同期" を行っており、特定の I/O 操作が終わるまで待機しているブロックされたバックグラウンドのスレッドは存在しない、と申し上げておきます。Windows 2000/XP/2003 オペレーティング システムのオーバーラップ I/O 機能と I/O 完了ポートの機能を利用し、単一スレッド (または少数スレッド) によって、所定のプロセスに対する未処理の要求をすべて処理することを可能にしています。

というわけで、まあ速いんじゃないかねえ、とは思うんですが、計らないことには分かりません。ベンチマークはAsyncDbExecutorのリリース時に、ちゃんと計ってみたいと思います。とりあえず非同期IOで万歳なNode.jsに負けてられませんからね(謎)。というのはともかく、この辺は以前にmono meetingでAzure Tableのパフォーマンスの話を聞いた時→資料:20110126 azure table in mono meeting全くサッパリだったこともあるので、ちゃんと調べたいとずっと思っていたのです(が、IOCPとかネイティブな話はさっぱり&データベースの挙動の中身も全然なので、あくまで.NETな上層のほうで…… いずれは何とかしたいのですけれど、手一杯でどうにも)。

やっぱ非同期が必須なのはサーバーサイドの話になるのですかねえ。IHttpAsyncHandlerとRx、とかそのうち書きたいのですが、そもそも私はASP.NETあんま分かりませんですよというところから始める必要があり。調べたいことがありすぎて積みタスクの山にうもれて完全死亡中。

デフォルトExecutor

最後に話は変わりますが、new DbExecutorするのに毎回コンストラクタにSqlConnection渡すのがダルいし不自然!という場合は、これまた継承して接続文字列が固定されたExecutorを用意すると良いです。

public class HogeExecutor : DbExecutor
{
    public HogeExecutor()
        : base(new SqlConnection("Data Source=hogehoge;"))
    { }

    public HogeExecutor(IsolationLevel isolationLevel)
        : base(new SqlConnection("Data Source=hogehoge;"), isolationLevel)
    { }
}

中々悪くないのではないでしょーか。残念ながらStaticメソッドのほうは使えませんが。Staticメソッドのほうは、どうも上手いやり方が考えつかなくて色々と保留中。考えてはいるのですが。

C#のMicro-ORM(Dapper, Massive, PetaPoco)について

最近、巷という名の極一部で話題になっているMicro-ORMという分野。何それ?うーん、MicroなORマッパーです。まんまですが。Entity FrameworkがUltra Hugeだとしたら、その対極にあるような。定義としては「SQL本文は手書き、マッピングは自動」だけの機能を持つもの、といったところでしょうか。Microであるために「1ファイルのみ」を入れても良さそう。少なくとも、用語の発生元っぽいDapper, Massive, PetaPocoは1ファイルのみです。

他に、シンプルめなORマッパーを志している感じなのはSimple.Dataや、そしてORマッパーじゃなくてただの実行機です!と言い張る私の作っているDbExecutorなどもMicro-ORMに入れちゃってもいいかしら?Simple.Dataは名前に反してあまりシンプルではない(MEF使ってる……)感じですが。DbExecutorは十分Microです!id:taediumさんの作られているSomaも比較的シンプルめなものに入るかしら……?

この中で注目株はDapperで、Stack Overflowのエンジニアがパフォーマンスの問題を解消するために作成したもの、ということで、超巨大サイトでの利用で鍛えられているというのは何よりも信頼への担保がある。そして、この手のO/Rマッパーにはかかせない動的コード生成によるパフォーマンス向上に関しては、パフォーマンスのエキスパート中のエキスパートであるprotobuf-netの作者 Marc Gravell氏がIL生成部分を担当しているという、凄いコンビでそりゃ色々と叶わないね、という感。

パフォーマンステスト

Dapperは親切にもベンチマークプログラムも公開しているので、リポジトリにあるもの+DbExecutor、それとDataTableを追加して計測してみました。

hand coded took 53ms
Mapper Query (buffered) took 55ms
Dynamic Mapper Query (non-buffered) took 55ms
Mapper Query (non-buffered) took 56ms
Dynamic Mapper Query (buffered) took 56ms
Dapper.Cotrib took 56ms
PetaPoco (Fast) took 58ms
DbExecutor Select took 58ms
DbExecutor ExecuteReader(Hand Coded) took 58ms Dynamic Massive ORM Query took 62ms
PetaPoco (Normal) took 62ms
DbExecutor SelectDynamic took 62ms
DbExecutor ExecuteReaderDynamic(Hand Coded) 65ms
DataTable took 83ms
BLToolkit took 85ms
Simple.Data took 90ms
Linq 2 SQL Compiled took 100ms
SubSonic Coding Horror took 114ms
Entity framework CompiledQuery took 119ms
NHibernate SQL took 127ms
NHibernate HQL took 149ms
Soma took 168ms
NHibernate Criteria took 191ms
Linq 2 SQL ExecuteQuery took 215ms
Linq 2 SQL took 671ms
NHibernate LINQ took 708ms
Entity framework ExecuteStoreQuery took 726ms
Entity framework ESQL took 728ms
Entity framework No Tracking took 966ms
Entity framework took 969ms
SubSonic ActiveRecord.SingleOrDefault took 4259ms

hand codedがExecuteReaderを手で回した手書き、「Mapper Query」はDapperのことです。複数種類があるのはオプション違い。DbExecutor(太字にしています)も同様に4種類で測っています。

結果ですが、勿論Dapperは速いんですが、DbExecutorのSelectも悪くない位置にある。というか、これは普通に高速と名乗っていいレベルの速度は出てる。というか上位陣はほとんど誤差でいいんじゃないですかというところですね、実際何回か測ると若干入れ替わったりしますし。500回のループで3ms遅くて低速とか言われたら怒ります(笑)

ところでDbExecutorのDynamic類が十分すぎるほど速いのは少し驚いたり(dynamic経由だからもっとずっと遅くなるのかと思ってた……)。この計測結果を前にすると、手動マッピングするならExecuteReaderはダルいからExecuteReaderDynamic使いますねー、型変換とか不要でずっとシンプルに書けますから。Selectが使えるシーンではSelectで、柔軟なマッピングをする必要があるシーンではExecuteReaderDynamicで、というのがDbExecutorを使う場合の幸せシナリオになりそう。また、ExpandoObjectをDataTableのRowの代わりとして使うSelectDynamicも十分な速度が出ていて、これぐらい速度出るなら普通に使っちゃえますねえ。非常に良い感じ。

何故速いのか、あるいは何故遅いのか

動的コード生成しているから速い。といっても、真ん中ぐらいより上のコードはみんなやっているのではないかしら。勿論、DbExecutorも生成しています。では何でDbExecutorは大体の場合において僅差とはいえDapperより遅いのか。これはコード生成する範囲の問題です。Dapperはクエリ用に最適化してガッツシ固めて生成・キャッシュしてますが、DbExecutorは汎用的に、Typeに対して行っているので、そこで若干の差が出ています。

最初Dapperのコード見たときは、うわあ、これは凄い差がついちゃってるのでは?とか思ったんですが、蓋を開けてみると、誤差みたいに小さな差でたいしたことなかったので、このまんまで行きます(それと言い訳がましいですがCode Contractとかのハンデも若干あるので、どの程度響いているかは不明瞭ですが)。どちらにせよ、ようするところ、DBへのアクセス速度に比べれば、その程度のチューニングは大して差が出ないということでしょう。

動的コード生成も、ILでゴリゴリじゃなくてExpressionTreeを使ったゆとり全開の生成なので、それで上位にガッツリ肉薄しているのだから、十分以上です。

まとめ

DbExecutorは普通に速い。他のと比べると気が効いていて、かつ洗練されたAPIを持っていて、非常に使いやすいので、Dapperなどと比べても負けてない。Micro-ORMの最前線で全然戦えます(但し1ファイルではないけれど!)。次のアップデートでは、クエリ時の戻りが複数になる場合の対応と、ストアドプロシージャのOUTPUTの対応を予定してますので是非是非お試しを。

と、宣伝がましくなってて申し訳ないですね……。Micro-ORM自体については、まず、Entity Frameworkなど普通のORMを使わないようなら、必需品だと思います。完全手作業でデータベース触るのは馬鹿げてる。何らかの薄いラッパーぐらいは作っているだろうけれど、Dapperよりも優れていると確信持てなければ(そして多くの場合は持てるわけがない!)、そんなものは捨ててDapperを使ったほうがいいのではないかしら。いや、DbExecutorでもいいですけどね、というかDbExecutorは良いですよ。

Entity Frameworkなどを使っている場合はどうか、というと、重量級フレームワークの欠けた部分を補ってやる感じで、良い感じに使えそうです。その辺の小回りの良さ、大きめなものと一緒に使っても上手く馴染むのはMicro-ORMならではなのではかと思います。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(C#)
April 2011
|
July 2024

Twitter:@neuecc GitHub:neuecc

Archive