非同期時代のLINQ

この記事はC# Advent Calendar 2013の4日目となります。2012年はMemcachedTranscoder - C#のMemcached用シリアライザライブラリというクソニッチな記事で誰得でした(しかもその後、私自身もMemcached使ってないし)。その前、2011年はModern C# Programming Style Guide、うーん、もう2年前ですかぁ、Modernじゃないですねえ。2011年の時点ではC# 5.0はCTPでしたが、もう2013年、当然のようにC# 5.0 async/awaitを使いまくる時代です。変化は非常に大きくプログラミングスタイルも大きく変わりますが、特にコレクションの、LINQの取り扱いに癖があります。今回は、非同期時代においてLINQをどう使いこなしていくかを見ていきましょう。

Selectは非同期時代のForEach

これ超大事。これさえ掴んでもらえれば十二分です。さて、まず単純に、Selectで値を取り出す場合。

// こんな同期版と非同期版のメソッドがあるとする
static string GetName(int id)
{
    return "HogeHoge:" + id;
}

static async Task<string> GetNameAsync(int id)
{
    await Task.Delay(TimeSpan.FromMilliseconds(100)); // 適当に待機
    return "HogeHoge:" + id;
}

// 以後idsと出てきたらこれのこと指してるとします
var ids = Enumerable.Range(1, 10);

// 同期バージョン
var names1 = ids.Select(x => new { Id = x, Name = GetName(x) }).ToArray();

// 非同期バージョン
var names2 = await Task.WhenAll(ids.Select(async x => new { Id = x, Name = await GetNameAsync(x) }));

ラムダ内でasyncを書き、結果はIEnumerable<Task<T>>となるので、配列に戻してやるためにTask.WhenAllとセットで使っていくのが基本となります。Task.WhenAllで包むのはあまりにも頻出なので、以下の様な拡張メソッドを定義するといいでしょう。

// こういう拡張メソッドを定義しておけば
public static class TaskEnumerableExtensions
{
    public static Task WhenAll(this IEnumerable<Task> tasks)
    {
        return Task.WhenAll(tasks);
    }

    public static Task<T[]> WhenAll<T>(this IEnumerable<Task<T>> tasks)
    {
        return Task.WhenAll(tasks);
    }
}

// スッキリ書ける
var names2 = await ids.Select(async x => new { Id = x, Name = await GetNameAsync(x) }).WhenAll();

では、foreachは?

// 同期
foreach (var id in ids)
{
    Console.WriteLine(GetName(id));
}

// 非同期
foreach (var id in ids)
{
    Console.WriteLine(await GetNameAsync(id));
}

そりゃそーだ。……。おっと、しかしせっかく非同期なのに毎回待機してループしてたらMottaiなくない?GetNameAsyncは一回100ミリ秒かかっているから、100*10で1秒もかかってしまうんだ!ではどうするか、そこでSelectです。

// 同期(idsがList<int>だとする)
ids.ForEach(id =>
{
    Console.WriteLine(GetName(id));
});

// 非同期
await ids.Select(async id =>
{
    Console.WriteLine(await GetNameAsync(id));
})
.WhenAll();

ForEachの位置にSelect。ラムダ式中では戻り値を返していませんが、asyncなので、Taskを返していることになります(Task<T>ではなく)。同期ではvoidとなりLINQで扱えませんが、非同期におけるvoidのTaskは、Selectを通ります。あとはWhenAllで待機してやれば出来上がり。これは全て同時に走るので100msで完了します。10倍の高速化!

ただし、この場合処理順序は保証されません、同時に走っているので。例えばとある時はこうなりました。

HogeHoge:1
HogeHoge:10
HogeHoge:8
HogeHoge:7
HogeHoge:4
HogeHoge:2
HogeHoge:6
HogeHoge:3
HogeHoge:9
HogeHoge:5

処理順序を保証したいなら?WhenAll後に処理ループを回せばいいぢゃない。

// こうすれば全て並列でデータを取得したあと、取得順のままループを回せる
var data = await ids.Select(async id => new { Id = id, Name = await GetNameAsync(id) }).WhenAll();
foreach (var item in data)
{
    Console.WriteLine(item.Name);
}

一旦、一気に詰めた(100ms)後に、再度回す(0ms)。これはアリです。そんなわけで、非同期時代のデータの処理方法は三択です。逐次await, ForEach代わりのSelect, 一気に配列に詰める。どれがイイということはないです、場合によって選べばいいでしょう。

ただ言えるのは、超大事なのは、Selectがキーであるということ、ForEachのような役割を担うこと。しっかり覚えてください。

非同期とLINQ、そしてプリロードについて

さて、SelectだけではただのForEachでLINQじゃない。LINQといったらWhereしてGroupByして、ほげ、もげ……。そんなわけでWhereしてみましょう?

// 非同期の ラムダ式 をデリゲート型 'System.Func<int,int,bool>' に変換できません。
// 非同期の ラムダ式 は void、Task、または Task<T> を返しますが、
// いずれも 'System.Func<int,int,bool>' に変換することができません。
ids.Where(async x =>
{
    var name = await GetNameAsync(x);
    return name.StartsWith("Hoge");
});

おお、コンパイルエラー!無慈悲なんでなんで?というのも、asyncを使うと何をどうやってもTask<bool>しか返せなくて、つまりFunc<T,Task<bool>>となってしまい、Whereの求めるFunc<T,bool>に合致させることは、できま、せん。

Whereだけじゃありません。ラムダ式を求めるものは、みんな詰みます。また、Selectで一度Task<T>が流れると、以降のパイプラインは全てasyncが強いられ、結果として……

// asyncでSelect後はTask<T>になるので以降ラムダ式は全てasyncが強いられる
// これはコンパイル通ってしまいますがkeySelectorにTaskを渡していることになるので
// 実行時エラーで死にます
ids.Select(async id => new { Id = id, Name = await GetNameAsync(id) })
   .OrderBy(async x => (await x).Id)
   .ToArray();

Selectがパイプラインにならず、むしろ出口(ForEach)になっている。自由はない。

ではどうするか。ここは、一度、配列に詰めましょう。

// とある非同期メソッドのあるClassがあるとして
var models = Enumerable.Range(1, 10).Select(x => new ToaruClass());

// 以降の処理で使う非同期系のメソッドなり何かを、全てawaitで実体化して匿名型に詰める
var preload = await models
    .Select(async model => new
    {
        model,
        a = await model.GetAsyncA(),
        b = await model.GetAsyncB(),
        c = await model.GetAsyncC()
    })
    .WhenAll();

// そうして読み取ったもので処理して、(必要なら)最後に戻す
preload.Where(x => x.a == 100 && x.b == 20).Select(x => x.model);

概念的にはプリロード。というのが近いと思います。最初に非同期なデータを全て取得しまえば、扱えるし、ちゃんと並列でデータ取ってこれる。LINQの美徳である無限リストが取り扱えるような遅延実行の性質は消えてしまいますが、それはshoganai。それに、LINQにも完全な遅延実行と、非ストリーミングな遅延実行の二種類があります。非ストリーミングとは、例えばOrderBy。これは並び替えのために、実行された瞬間に全要素を一度蓄えます。例えばGroupBy。これもグルーピングのために、実行された瞬間に全要素を舐めます。非同期LINQもまた、それらと同種だと思えば、少しは納得いきませんか?現実的な妥協としては、このラインはアリだと私は思っています。分かりやすいしパフォーマンスもいい。

AsyncEnumerableの幻想、或いはRxとの邂逅

それでも妥協したくないならば、次へ行きましょう。まだ手はあります、良いかどうかは別としてね。注:ここから先は上級トピックなので適当に読み飛ばしていいです

そう、例えばWhereAsyncのようにして、Func<T,bool>じゃなくFunc<T,Task<bool>>を受け入れてくれるオーバーロードがあれば、いいんじゃない?って思ってみたり。こんな風な?

public static class AsyncEnumerable
{
    // エラー:asyncとyield returnは併用できないよ
    public static async IEnumerable<T> WhereAsync<T>(this IEnumerable<T> source, Func<T, Task<bool>> predicate)
    {
        using (var e = source.GetEnumerator())
        {
            while (e.MoveNext())
            {
                if (await predicate(e.Current))
                {
                    yield return e.Current;
                }
            }
        }
    }
}

ただ、問題の本質はそんなことじゃあない。別にyield returnが使えなければ手書きで作ればいいわけで。そして作ってみれば、本質的な問題がどこにあるのか気づくことができます。

class WhereAsyncEnumerable<T> : IEnumerable<T>, IEnumerator<T>
{
    IEnumerable<T> source;
    Func<T, Task<bool>> predicate;
    T current = default(T);
    IEnumerator<T> enumerator;

    public WhereAsyncEnumerable(IEnumerable<T> source, Func<T, Task<bool>> predicate)
    {
        this.source = source;
        this.predicate = predicate;
    }

    public IEnumerator<T> GetEnumerator()
    {
        return this;
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    public T Current
    {
        get { return current; }
    }

    object System.Collections.IEnumerator.Current
    {
        get { return Current; }
    }

    public void Reset()
    {
        throw new NotSupportedException();
    }

    public void Dispose()
    {

    }

    // ↑まではdoudemoii
    // MoveNextが本題

    public bool MoveNext()
    {
        if (enumerator == null) enumerator = source.GetEnumerator();

        while (enumerator.MoveNext())
        {
            // MoveNextはasyncじゃないのでawaitできないからコンパイルエラー
            if (await predicate(enumerator.Current))
            {
                current = enumerator.Current;
                return true;
            }
        }
        return false;
    }
}

MoveNextだけ見てもらえればいいのですが、predicateを使うのはMoveNextなわけです。ここがasyncじゃないと、AsyncなLINQは成立しません。さて、もしMoveNextがasyncだと?

public async Task<bool> MoveNext()
{
    // ここで取得するenumeratorのMoveNextも
    // 全て同一のインターフェイスであることが前提条件なのでTask<bool>とする
    if (enumerator == null) enumerator = source.GetEnumerator();

    while (await enumerator.MoveNext())
    {
        if (await predicate(enumerator.Current))
        {
            current = enumerator.Current;
            return true;
        }
    }
    return false;
}

これは機能します。MoveNextをasyncにするということは連鎖的に全てのMoveNextがasync。それが上から下まで統一されれば、このLINQは機能します。ただ、それってつまり、IEnumerator<T>を捨てるということ。MoveNextがasyncなのは、似て非なるものにすぎない。当然LINQっぽい何かもまた、全て、このasyncなMoveNextを前提にしたものが別途用意されなければならない。そして、それが、Ix-Async

Ix-Asyncのインターフェイスは、上で出したasyncなMoveNextを持ちます。

public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetEnumerator();
}

public interface IAsyncEnumerator<out T> : IDisposable
{
    T Current { get; }
    Task<bool> MoveNext(CancellationToken cancellationToken);
}

そして当然、各演算子はIAsyncEnumerableを求めます。

public static IAsyncEnumerable<TSource> Where<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate);

これの何が便利?IEnumerable<T>からIAsyncEnumerable<T>へはToAsyncEnumerableで変換できはするけれど……、求めているのはIEnumerable<Task<T>>の取り扱いであったりpredicateにTaskを投げ込めたりすることであり、何だかどうにもなく、これじゃない感が否めない。

そもそも、LINQ to Objectsから完全に逸脱した新しいものなら、既にあるじゃない?非同期をLINQで扱うなら、Reactive Extensionsが。

Reactive Extensionsと非同期LINQ

ではRxで扱ってみましょう。の前に、まず、predicateにTaskは投げ込めません。なのでその前処理でロードするのは変わりません。ただ、そのまま続けてLINQ的に処理可能なのが違うところです。

await ids.ToObservable()
    .SelectMany(async x => new
    {
        Id = x,
        Name = await GetNameAsync(x)
    })
    .Where(x => x.Name.StartsWith("Hoge"))
    .ForEachAsync(x =>
    {
        Console.WriteLine(x);
    });

おお、LINQだ?勿論、Where以外にも何でもアリです。RxならLINQ to Objects以上の山のようなメソッドを繋げまわることが可能です。ところで、ここで出てきているのはSelectMany。LINQ to ObjectsでのSelectの役割を、Rxの場合はSelectManyが担っています。asyncにおいてForEachはSelectでRxでSelectはSelectMany……。混乱してきました?

なお、これの結果は順不同です。もしシーケンスの順序どおりにしたい場合はSelect + Concatを代わりに使います。

await ids.ToObservable()
    .Select(async x => new
    {
        Id = x,
        Name = await GetNameAsync(x)
    })
    .Concat()
    .Where(x => x.Name.StartsWith("Hoge"))
    .ForEachAsync(x =>
    {
        Console.WriteLine(x);
    });

ソーナンダー?ちなみにSelectManyはSelect + Mergeに等しい。

await ids.ToObservable()
    .Select(async x => new
    {
        Id = x,
        Name = await GetNameAsync(x)
    })
    .Merge()
    .Where(x => x.Name.StartsWith("Hoge"))
    .ForEachAsync(x =>
    {
        Console.WriteLine(x);
    });

この辺のことがしっくりくればRxマスター。つまり、やっぱRxムズカシイデスネ。とはいえ、見たとおり、Rx(2.0)からは、asyncとかなり統合されて、シームレスに取り扱うことが可能になっています。対立じゃなくて協調。自然に共存できます。ただし、単品でもわけわからないものが合わさって更なるカオス!強烈強力!

まとめ

後半のAsyncEnumerableだのIx-AsyncだのRxだのは、割とdoudemoii話です、覚えなくていいです。特にIx-Asyncはただの思考実験なだけで実用性ゼロなので本気でdoudemoiiです。Rxは便利なので覚えてくれてもいいのですが……。

大事なのは、async + Selectです。SelectはForEachなんだー、というのがティンとくれば、勝ったも同然。そして、プリロード的な使い方。そこさえ覚えれば非同期でシーケンス処理も大丈夫。

asyncって新しいので、今まで出来たことが意外と出来なくてはまったりします。でも、それも、どういう障壁があって、どう対処すればいいのか分かっていればなんてことはない話です。乗り越えた先には、間違いなく素晴らしい未来が待っているので、是非C# 5.0の非同期、使いこなしてください。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive