複数の値とC# 5.0 Async再び、或いはAsyncEnumerableへの渇望とRx

以前にReactive Extensions + asyncによるC#5.0の非同期処理では、単体の値であったらasync、複数の値であったらIObservable<T>が使い分け、とかかんとかと言ってましたが、本当にそうなの?もしくは、そもそも複数の値のシチュエーションって分かるような分からないようななのだけど?などなどと思ったりする昨今を如何様にお過ごしでしょうか。というわけで、今回はグッとこの部分に深く迫ってみましょう。

同期的なシチュエーション

さて、例、なのですけれど、データベースで行きましょう。生DataReaderを転がしてます。

// 接続文字列に Asynchronous Processing=true は非同期でやるなら欠かさずに
const string ConnectionString = @"Data Source=.;Initial Catalog=AdventureWorks2012;Integrated Security=True;Asynchronous Processing=true;";

// こういうreader.Readが尽きるまで列挙するだけのヘルパーがあるだけで
static IEnumerable<IDataRecord> EachReader(IDbConnection connection, string query)
{
    using (var command = connection.CreateCommand())
    {
        command.CommandText = query;
        if (connection.State != ConnectionState.Open) connection.Open();

        using (var reader = command.ExecuteReader())
        {
            while (!reader.IsClosed && reader.Read()) yield return reader;
        }
    }
}

static void Main(string[] args)
{
    using (var conn = new SqlConnection(ConnectionString))
    {
        // LINQでSelectとかいろいろ出来る!便利!抱いて!
        var result = EachReader(conn, "select * from Sales.Customer")
            .Select(x => new
            {
                CustomerID = x.GetInt32(0),
                PersonID = !x.IsDBNull(1) ? (int?)x.GetValue(1) : null,
                StoreID = !x.IsDBNull(2) ? (int?)x.GetValue(2) : null,
                TerritoryID = !x.IsDBNull(3) ? (int?)x.GetValue(3) : null,
                AccountNumber = !x.IsDBNull(4) ? x.GetString(4) : null
            })
            .ToArray();

        // PKでとりたければFirstOrDefualtとかでいいわけです
        var customer = EachReader(conn, "select * from Sales.Customer where CustomerID = 100")
            .Select(x => new
            {
                CustomerID = x.GetInt32(0),
                PersonID = !x.IsDBNull(1) ? (int?)x.GetValue(1) : null,
                StoreID = !x.IsDBNull(2) ? (int?)x.GetValue(2) : null,
                TerritoryID = !x.IsDBNull(3) ? (int?)x.GetValue(3) : null,
                AccountNumber = !x.IsDBNull(4) ? x.GetString(4) : null
            })
            .FirstOrDefault();
    }
}

ExecuteReaderの結果のIDataRederは、yield returnで列挙してやると、LINQで加工できるようになるので、SelectしてToArrayとか、SelectしてFirstOrDefaultとか、非常にやりやすくて便利なわけです。

ここまでは、いいと思います。じゃあ、非同期でやると、どうするの、と。

内部イテレータ的に考える

.NET Framework 4.5からは主要な非同期メソッドに全てXxxAsyncという名前のものがつきました。ADO.NETにおいては、OpenAsyncやExecuteReaderAyncなどがあります。というわけで、試してみましょう。

// 非同期だったこうしたい、でもこれはコンパイル通らない!
static async Task<IEnumerable<DbDataReader>> EachReaderAsync(DbConnection connection, string query)
{
    using (var command = connection.CreateCommand())
    {
        command.CommandText = query;
        if (connection.State != ConnectionState.Open) await connection.OpenAsync();

        using (var reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess)) // 基本的にはSequentialAccessにしておきたい
        {
            while (!reader.IsClosed && reader.Read()) yield return reader;
        }
    }
}

残念ながら、asyncとyield returnを共存させることはできないので、どうにもなりません。……はい。しかし出来ない、では困る。さすがに、非同期実行するところに全部生のままで、こんなCreateCommand..., ExecuteReaderAsync, .... なんて書いてられないし。

じゃあどうするか、というと、内部イテレータ的にしましょう。つまりList<T>にあるようなForEachです。yield returnが外部イテレータ的であり、それが無理なら、内部イテレータ的にすればいいぢゃない。

// ループを回してる最中に実行するaction引数をつけた(FuncじゃなくてAction<DbDataReader>のオーバーロードも作るとベター)
static async Task ForEachAsync(DbConnection connection, string query, Func<DbDataReader, Task> action)
{
    using (var command = connection.CreateCommand())
    {
        command.CommandText = query;
        if (connection.State != ConnectionState.Open) await connection.OpenAsync();

        using (var reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess)) // 基本的にはSequentialAccessにしておきたい
        {
            while (!reader.IsClosed && reader.Read()) await action(reader);
        }
    }
}

static async Task<List<Customer>> GetCustomers()
{
    using (var conn = new SqlConnection(ConnectionString))
    {
        // これで、Selectっぽくできてないこともないと言えなくもない
        var list = new List<Customer>();

        await ForEachAsync(conn, "select * from Sales.Customer", async x =>
        {
            var customer = new Customer
            {
                CustomerID = await x.GetFieldValueAsync<int>(0),
                PersonID = !x.IsDBNull(1) ? await x.GetFieldValueAsync<int?>(1) : null,
                StoreID = !x.IsDBNull(2) ? await x.GetFieldValueAsync<int?>(2) : null,
                TerritoryID = !x.IsDBNull(3) ? await x.GetFieldValueAsync<int?>(3) : null,
                AccountNumber = !x.IsDBNull(4) ? await x.GetFieldValueAsync<string>(4) : null
            };
            list.Add(customer);
        });

        return list;
    }
}

// LINQじゃないので匿名型は使えないのね
public class Customer
{
    public int CustomerID { get; set; }
    public int? PersonID { get; set; }
    public int? StoreID { get; set; }
    public int? TerritoryID { get; set; }
    public string AccountNumber { get; set; }
}

ForEachAsync!というわけで、Actionを渡してやって、そこでグルグルッとすることにより制限を回避もどき。ToArrayしたい?古き良きListにAddすればいいぢゃない、といったものですよ、ははは。LINQじゃないので匿名型は使えないがね!

さて、匿名型が使えないのはいいとしても、問題は、LINQの特徴である合成可能性を欠いているところです。一番困るのは、これ、FirstOrDefaultできないね、って。全件取るんですか?まあ、PKだったら一件なのが保証されてるし、そうでないならtop 1とでも書いておけよ、と言えなくもないですが、しかしどうなのよこれ、と。

そんなわけでForEachAsyncと、もう一つ、ExecuteSingleAsyncという名前で、一件のみを列挙するようなものを別途作る必要があります。とはいえ、それでも対応できているのは一件と全件だけ。例えばTakeWhileみたいなのがやりたい、SkipWhileみたいなのがやりたいとなったらどうするの、と。答えは、どうにもなりません。諦めるしかない。

AsyncEnumerableで救われる

どうしても諦められないのならば、AsyncEnumerableを授けましょう。NuGetからIx_Experimental-Asyncを引っ張ってきます。Ix、そう、みんな大好きReactive Extensionsの兄弟なわけですが、しかし紹介しておいてアレですが、このIx_Experimental-Asyncはお薦めはしません!完全に実験的に、できるから、というだけで実装例を掲示してみせてくれたというだけなノリがぷんぷんしているからです。実際、最初のAsync CTPが出た時に公開されて、それから更新されてませんしね……。

ともあれ、どんなコンセプトの代物なのかは見ておきましょう。

// こういうDB列挙用のIAsyncEnumerable/Enumeratorを作る。
// yield returnのようなコンパイラサポートはないので手書きするんだよ!
public class AsyncDbEnumerable : IAsyncEnumerable<DbDataReader>, IAsyncEnumerator<DbDataReader>
{
    DbConnection connection;
    DbCommand command;
    DbDataReader reader;
    string query;

    public AsyncDbEnumerable(DbConnection connection, string query)
    {
        this.connection = connection;
        this.query = query;
    }

    public IAsyncEnumerator<DbDataReader> GetEnumerator()
    {
        return this;
    }

    public DbDataReader Current
    {
        get { return reader; }
    }

    public async Task<bool> MoveNext(System.Threading.CancellationToken cancellationToken)
    {
        if (command == null)
        {
            if (connection.State != ConnectionState.Open) await connection.OpenAsync();

            command = connection.CreateCommand();
            command.CommandText = query;
            reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess);
        }

        if (await reader.ReadAsync())
        {
            return true;
        }
        return false;
    }

    public void Dispose()
    {
        reader.Dispose();
        command.Dispose();
    }
}

static IAsyncEnumerable<DbDataReader> EachReaderAsync(SqlConnection connection, string query)
{
    return new AsyncDbEnumerable(connection, query);
}

static async Task Test()
{
    using (var conn = new SqlConnection(ConnectionString))
    {
        // 完全にLINQなのでSelectしてToArrayで匿名型もOK
        var result = await EachReaderAsync(new SqlConnection(ConnectionString), "select * from Sales.Customer")
            .Select(x => new
            {
                CustomerID = x.GetFieldValue<int>(0),
                PersonID = !x.IsDBNull(1) ? x.GetFieldValue<int?>(1) : null,
                StoreID = !x.IsDBNull(2) ? x.GetFieldValue<int?>(2) : null,
                TerritoryID = !x.IsDBNull(3) ? x.GetFieldValue<int?>(3) : null,
                AccountNumber = !x.IsDBNull(4) ? x.GetFieldValue<string>(4) : null
            })
            .ToArray();

        // 勿論FirstOrDefaultもできる
        var customer = await EachReaderAsync(new SqlConnection(ConnectionString), "select * from Sales.Customer where CustomerID = 100")
            .Select(x => new
            {
                CustomerID = x.GetFieldValue<int>(0),
                PersonID = !x.IsDBNull(1) ? x.GetFieldValue<int?>(1) : null,
                StoreID = !x.IsDBNull(2) ? x.GetFieldValue<int?>(2) : null,
                TerritoryID = !x.IsDBNull(3) ? x.GetFieldValue<int?>(3) : null,
                AccountNumber = !x.IsDBNull(4) ? x.GetFieldValue<string>(4) : null
            })
            .FirstOrDefault();
    }
}

IAsyncEnumerableの実装は完全に手作りです!真面目に使うなら、AnonymousAsyncEnumerableとかを作って、それを使って構築しますが、今回はてきとーな感じに実装しておきました。てきとーと言っても、ちゃんと動きますよ、はい。

さて、これによってIAsyncEnumerableに変換されたシロモノは、LINQのメソッドが全て使えます。おー、やったね、これなら完璧。

と、言いたいのですが、よーくSelectの中のラムダ式を見ると、ForEachAsyncの時のものと違うのが分かるでしょうか?ForEachAsyncの時はGetFieldValueAsyncといった、値取得まで非同期のものを使いました。でも、今回はそれは使ってない。何故かというと、そこでasyncにしてしまうと戻り値がIAsyncEnumerable<Task<T>>になってしまうから。

何がいけないのか、というと、例えばToArrayした結果で見るとresult[0]はTaskなんですよ。実態はresult[0].Resultとしなきゃあいけません。おまけに、全部Taskということは、実行中かもしれないわけです。じゃあ全部待てばいいのか、 await Task.WhenAll(result) とすればいいか、というと、そうなると、一つのコネクションで複数実行が走る、この場合Connectionは複数実行は許容されていないので、まあ例外が飛んできてしまうでしょう。

じゃあasyncは諦めるの?というと、幾つか手はある。ひとつはAwaitとかいうような拡張メソッドを作って、IAsyncEnumerable<Task<T>> から IAsyncEnumerable<T> に戻すようなものを作ればいい。作るのは、まあ、難しくはないのだけど結局yield returnがないので完全手書きしなきゃならないので面倒なので例は割愛。

もしくはSelectなどのselectorに、ラムダ式がasyncで書かれている場合(戻り値がTaskになっている場合)はawaitするようなオーバーロードがあっても良かったと思うのですがねえ。んー、まあ、それはさすがに大きなお世話っぽいからダメか……。

もしくは、ToArrayなどで外に出すのではなく、ForEachAsyncというメソッドが用意されているので、それを使ってawaitするか。こうなると内部イテレータとやってること同じになってきますが。

とはいえともかく、例に出したAwaitメソッドのような、そういうのがないと、実用性に欠けると言わざるを得ません。また、これはあくまでもAsyncEnumerableであって、IEnumerableへのLINQ to Objectsとは別物です。Select, Where, Firstなどは挙動が同じというだけで、中身は全然違います。で、当然ながらコンセプト実装なので、内部的にもあまりこなれてませんね。なので、Ix_Experimental-Asyncは所詮はコンセプト実装であり、面白いとは思いますし満喫はしましたが、実世的なものとは言いがたいと結論づけます。

IObservable<T>の中へ

と、ここまで見てきたので、最後はRxで〆ましょう。ちなみにRxはバージョンが幾つかありますが、私としてはRx 2.0-RCしか使う気はありません。正直、1.0系とは雲泥の差ですからねえ。

static IObservable<DbDataReader> EachReaderAsync(DbConnection connection, string query)
{
    // CreateAsyncで作る。OnErrorなどは手書きですが、十分に書きやすい
    return Observable.CreateAsync<DbDataReader>(async observer =>
    {
        try
        {
            using (var command = connection.CreateCommand())
            {
                command.CommandText = query;
                if (connection.State != ConnectionState.Open) await connection.OpenAsync();

                using (var reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
                {
                    while (!reader.IsClosed && reader.Read()) observer.OnNext(reader);
                }
            }
        }
        catch (Exception ex)
        {
            observer.OnError(ex);
            return;
        }
        observer.OnCompleted();
    });
}

static async Task Test()
{
    using (var conn = new SqlConnection(ConnectionString))
    {
        // Rxなので完全にLINQ、awaitableなのでToArrayをawaitしたりも出来るのでノリは完全に一緒
        var result = await EachReaderAsync(new SqlConnection(ConnectionString), "select * from Sales.Customer")
            .Select(x => new
            {
                CustomerID = x.GetFieldValue<int>(0),
                PersonID = !x.IsDBNull(1) ? x.GetFieldValue<int?>(1) : null,
                StoreID = !x.IsDBNull(2) ? x.GetFieldValue<int?>(2) : null,
                TerritoryID = !x.IsDBNull(3) ? x.GetFieldValue<int?>(3) : null,
                AccountNumber = !x.IsDBNull(4) ? x.GetFieldValue<string>(4) : null
            })
            .ToArray();

        // もちろん、そのままSubscribeしたり、FirstOrDefaultAsyncしたりしてもいい
    }
}

というわけでRxでも生成はしやすいし、出力結果も相当扱いやすい。素晴らしい!けれど、これもIAsyncEnumerableと同様にSelectの中でasyncしちゃうと厄介なことになるので、基本的にはできない。

Await拡張メソッドを作るのは簡単だけど、Rxの場合はスレッドセーフの問題が結構難しくて、うまく決めるのは難しい……。

それに加えて、ほとんど決まりきっている、ただたんに列挙したいだけ、のものにRxを使うというのはやり過ぎ感があり、性能面でちょっとね、と。うーん、言うほど悪くはないし、今まで散々持ち上げといてなんだよそれって感じですが、目の前にTaskが転がっていて、そこまで利点が大きくない中で選ぶか?と迫られたら、選びにくいなあ、って。思ってしまうのです。

おまけ:Entity Framework 6について

Entity Frameworkはロードマップによりバージョン6から非同期対応すると明言しています。また、デザインノートTask-based Asynchronous Pattern support in EF.も公開されていたり、それとEFはソースコードが公開されているのですが、CodePlexから最新版を落としてくれば、そこには既にTaskによる実装が存在しています。

軽く見たところ、EF内部で使う用のAsyncEnumerableを定義してありました。それを使って、非同期系を動かしてましたね。ただ、完全に内部用なので外からは使えないし、色々限定的ですけれど。また、最終的に出力する場合は、やはり内部イテレータ的に、await ForEachAsyncしてListに変換するなりしていました。ふんふん、なるほどねー、と、結構眺めてて面白いのでお薦めです。

まとめ

C# 5.0で非同期は簡単になった!そして、同様に非同期はやはり難しい!こうして案を見ていったわけですが、結局どれを選ぶの?というと、まあ、内部イテレータ案が一番無難で良いと思います。

それにしても、いやあもう頭ぱっつんぱっつんです。で、何でDBがネタになっているかというと、新Micro-ORMライブラリを作成中で(DbExecutor v.Next)、今から作るなら非同期対応しないとかありえないよねー、ということで色々考えてはいるんですが、これが中々にビシッ!としっくり決めるのが大変で。結構良い感じにはなってきてつもりではあるんですが、全然まだまだで。あんま人に見られたくない段階なんですが諸事情で見れる人には見れるようになってしまってて恥ずかちい。

そんなわけでVS2012登場まで、もうすぐです(MSDN会員は一週間切ってる、そうでない人も一ヶ月後)。C# 5.0への備え、できてますか?さあ、カオスな非同期時代に突入しましょう!

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive