Reactive ExtensionsとSQLの非同期実行

DbExecutorのパフォーマンスが十分トップクラスであることは、こないだの計測で分かりました。では次のステップはどこへ向かおう。IL生成を頑張っても、もうほんの少ししか稼げる余地は残ってない。ならば、もっと根本的なところから行こう。ええ、非同期IOを。DbExecutorは一応、拡張性を考慮してあるので継承して非同期対応しましょう。

……ところで、この記事はAsyncDbExecutorの作り方、みたいになっていますが、読み取ってほしいのは「Reactive Extensionsの使い方」です。AsyncDbExecutorは、あくまでRxの利用法のサンプルにすぎません。DbExecutorなんて使わないしー、とか思わず、その辺を念頭において眺めてみてください。

BeginExecuteReader/EndExecuteReader

SqlServerならばSqlCommand.BeginExecuteReaderメソッドが使えます。IDbCommandにはないので、対象はSqlServer特化ということになってしまいますが、まあ、それはしょうがない(非同期対応してないDBもあるわけだし)。対応させる方法ですが、まずはDbExecutorを継承してコンストラクタを弄ります。

public class AsyncDbExecutor : DbExecutor
{
    public AsyncDbExecutor(string connectionString)
        : base(new SqlConnection(connectionString))
    { }

    public AsyncDbExecutor(SqlConnection connection)
        : base(connection)
    { }

    public AsyncDbExecutor(string connectionString, IsolationLevel isolationLevel)
        : base(new SqlConnection(connectionString), isolationLevel)
    { }

    public AsyncDbExecutor(SqlConnection connection, IsolationLevel isolationLevel)
        : base(connection, isolationLevel)
    { }
}

通常のDbExecutorはIDbConnectionを受け入れるようにしていましたが、今回はSqlServer特化なのでSqlConnectionで。また、利便性を考えて生の接続文字列からも作れるようにしてやりました。IsolationLevelを受け入れるオーバーロードはTransaction処理する場合のためとなっています(別にTransactionScope使ってもいいですけどね)。あとは、非同期に対応するメソッドを作ってやれば良いだけ。

Reactive Extensions

BeginXxx-EndXxxがカッタルイのは分かりきっているので、当然のようにRxを持ち出します。

// 引数が横に長いですがほとんど省略可能なので……
public IObservable<SqlDataReader> ExecuteReaderAsyncRaw(string query, object parameter = null, CommandType commandType = CommandType.Text, CommandBehavior commandBehavior = CommandBehavior.Default)
{
    var cmd = (SqlCommand)this.PrepareExecute(query, commandType, parameter);
    return Observable.FromAsyncPattern<SqlDataReader>(
            (ac, o) => cmd.BeginExecuteReader(ac, o, commandBehavior), cmd.EndExecuteReader)
        .Invoke()
        .Finally(() => cmd.Dispose());
}

this.PrepareExecuteですが、これはDbExecutorのprotectedメソッドで、クエリ文字列や匿名型で渡すパラメータからIDbCommandを生成します。こんなこともあろうかとちゃんとprotectedにしておいて良かった。そしてキャストですが、今回はconnectionは必ずSqlConnectionで(そうなるようコンストラクタを調整してある)、戻り値がSqlCommandであることが保証されているためアップキャストしてやります。そうしないと非同期APIが使えないですから。

あとは、いつものように(?)FromAsyncPatternして、それとFinallyでcommandをDisposeしてやることを忘れないと気が効いてるかもり。利用する時は

// async=trueを忘れずに
var connstr = @"Data Source=.;async=true;Initial Catalog=master;Integrated Security=True;";

var executor = new AsyncDbExecutor(connstr);
executor.ExecuteReaderAsyncRaw("select * from sys.tables")
    .Finally(() => executor.Dispose()) // 接続のDisposeも忘れずに...
    .Subscribe(dr =>
    {
        while (dr.Read())
        {
            Console.WriteLine(dr.GetValue(0));
        }
    });

といった感じです。非同期APIはDisposeをどう仕込めばいいのかが悩ましいのですが、Rxなら簡単です、Finallyに突っ込めばいいだけ。同期APIでusingで囲むのと同じ感覚で、RxではFinallyに置いてください。非同期実行ということで、複数同時に走らせることも少なくないと思うのですが、その場合は全ての完了を自然に扱えるよう、結合して一本の流れにしてやる必要はあるでしょう。恐らくきっと。

そういえば忘れてはいけないのは、非同期APIを使う場合は接続文字列にasync=trueが必要です。これについてはADO.NET 2.0 における非同期コマンド実行によると

非同期コマンドを使用するためには、コマンドが実行される接続は、接続文字列を async=true と指定して初期化する必要があります。 非同期メソッドが接続文字列に async=true と指定されていない接続を使用するコマンドで呼び出されると、例外がスローされます。

所定の接続オブジェクトで同期コマンドのみを使用するとわかっている場合は、接続文字列に async キーワードを指定しないか、false に設定することをお勧めします。 非同期操作が有効になっている接続で同期操作を実行すると、リソースの利用率は著しく増大します。

同期 API と非同期 API の両方が必要な場合は、可能であれば別々の接続を使用することをお勧めします。 これが不可能であれば、async=true を指定して開かれた接続で同期メソッドを使用することもできます。この場合、通常どおりに動作しますが、パフォーマンスは若干劣化します。

とのことなので、少し気をつけたほうがいいかもしれません。まあ、Rx愛好家なら全てRxで非同期でやるに決まっているので(?)、別にasync=trueでも怖くありません。

SelectMany

しかし、生のDataReaderをwhileで回すとか、ダサ……。せっかくのRxなのだから値もPushで送ればいいぢゃない。

IEnumerable<IDataRecord> EnumerateSqlDataReader(SqlDataReader reader)
{
    using (reader)
    {
        // Closeされるタイミングがコントロール出来ないので、IsClosedのチェックは必須
        while (!reader.IsClosed && reader.Read())
        {
            yield return reader;
        }
    }
}

// 前に定義したAsyncRawを呼んでSelectManyするだけ
public IObservable<IDataRecord> ExecuteReaderAsync(string query, object parameter = null, CommandType commandType = CommandType.Text, CommandBehavior commandBehavior = CommandBehavior.Default)
{
    return ExecuteReaderAsyncRaw(query, parameter, commandType, commandBehavior)
        .SelectMany(dr => EnumerateSqlDataReader(dr));
}

// 使うときはこんな感じ
var executor = new AsyncDbExecutor(connstr);
executor.ExecuteReaderAsync("select * from sys.tables")
    .Select(dr => new
    {
        Name = dr.GetString(0),
        ObjectId = dr.GetInt32(1)
    })
    .Finally(() => executor.Dispose())
    .Subscribe(Console.WriteLine);

実にLINQっぽく自然になりました。yield returnを使ったのは、IObservableのSelectManyはIEnumerableも受け入れて平らにしてくれるからです。これ、Observable.UsingとかObservable.Generateを使って、Rxだけで頑張ることも可能ではあるのですが、面倒くさいしゴチャゴチャします。なので、yield returnが使えるなら、使ってしまったほうが楽。この辺はIEnumerable<T>を生成するためのyield returnがあるように、IObservable<T>を生成するためのコンパイラサポートが欲しいところ。awaitが乗ればTaskからIObservableへの変換(は基本的に容易)なので、ある程度可能になるのかなあ、と思いつつ、謂わばAsyncEnumerableになることの難しさもあるので今のところ何とも言えません。

ExecuteReaderAsyncがあればExecuteReaderAsyncRawいらないぢゃん!って感じですけれど、パフォーマンスのために非同期にするのに、SelectManyとかオーバーヘッドがあるのも嫌かなあ、と思う場合もあるかもなので、生のSqlDataReaderを返すものも残してあげるのもいいかな、とか思ったりはするところ。EnumerateSqlDataReaderのような拡張メソッドを別途定義してやれば、生のSqlDataReaderも、そう扱いが面倒というわけでもないですしね。いや、どうだろう……。

次バージョン

この調子でExecuteNonQueryやSelectなども書いていけば完成です。って、そういえばアクセサの動的生成&キャッシュの部分はinternalで外から触れない(せいぜいPrepareExecuteだけ)からSelectは書けないぢゃん。うげげ。うーん、publicにするのもどうかと思うので、InternalVisibleToで対処しようかなあ。そんなわけで、このAsyncDbExecutorは今はまだアイディア段階で内容を詰めてませんが、もっとブラッシュアップさせて、次のDbExecutorの更新時に含めたいと思っています。Rxが必要という都合もあるので、本体とは別DLLで。勿論、NuGet対応で依存解決でインストール楽々、です。

ところで本当に速いの?

うん、分かりません。ADO.NET 2.0 における非同期コマンド実行によれば

ADO.NET/SqlClient の非同期コマンド実行サポートは、実際に、本当の意味での非同期ネットワーク I/O (共有メモリの場合は非ブロッキングのシグナル通知) を基礎としています。 ご要望が多ければ、いずれ内部実装について文書にしたいと思います。 ここでは、"真の非同期" を行っており、特定の I/O 操作が終わるまで待機しているブロックされたバックグラウンドのスレッドは存在しない、と申し上げておきます。Windows 2000/XP/2003 オペレーティング システムのオーバーラップ I/O 機能と I/O 完了ポートの機能を利用し、単一スレッド (または少数スレッド) によって、所定のプロセスに対する未処理の要求をすべて処理することを可能にしています。

というわけで、まあ速いんじゃないかねえ、とは思うんですが、計らないことには分かりません。ベンチマークはAsyncDbExecutorのリリース時に、ちゃんと計ってみたいと思います。とりあえず非同期IOで万歳なNode.jsに負けてられませんからね(謎)。というのはともかく、この辺は以前にmono meetingでAzure Tableのパフォーマンスの話を聞いた時→資料:20110126 azure table in mono meeting全くサッパリだったこともあるので、ちゃんと調べたいとずっと思っていたのです(が、IOCPとかネイティブな話はさっぱり&データベースの挙動の中身も全然なので、あくまで.NETな上層のほうで…… いずれは何とかしたいのですけれど、手一杯でどうにも)。

やっぱ非同期が必須なのはサーバーサイドの話になるのですかねえ。IHttpAsyncHandlerとRx、とかそのうち書きたいのですが、そもそも私はASP.NETあんま分かりませんですよというところから始める必要があり。調べたいことがありすぎて積みタスクの山にうもれて完全死亡中。

デフォルトExecutor

最後に話は変わりますが、new DbExecutorするのに毎回コンストラクタにSqlConnection渡すのがダルいし不自然!という場合は、これまた継承して接続文字列が固定されたExecutorを用意すると良いです。

public class HogeExecutor : DbExecutor
{
    public HogeExecutor()
        : base(new SqlConnection("Data Source=hogehoge;"))
    { }

    public HogeExecutor(IsolationLevel isolationLevel)
        : base(new SqlConnection("Data Source=hogehoge;"), isolationLevel)
    { }
}

中々悪くないのではないでしょーか。残念ながらStaticメソッドのほうは使えませんが。Staticメソッドのほうは、どうも上手いやり方が考えつかなくて色々と保留中。考えてはいるのですが。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive