ForEachAsync - 非同期の列挙の方法 Part2

Part2って、Part1はあったのかというと、うーん、非同期時代のLINQ、かな……?さて、今回はForEachがテーマです。といってもそれってSelect+WhenAllでしょ!「Selectは非同期時代のForEach」って言ってたじゃない、というと、はい、言ってました。まだ他に言うことあるの?というと、例えば以下のシチュエーション。

var httpClient = new HttpClient();
var tasks = Enumerable.Range(1, 100000)
    .Select(async x =>
    {
        var str = await httpClient.GetStringAsync("http://hogehoge?q=" + x);
        Console.WriteLine(str);
    });
await Task.WhenAll(tasks);

別に動きはしますが、制御不能に10万件、同時リクエスト走ります。これはまぁいくないですよね。もはや途中で死んだりしますので動くとも言えない……。というわけで、元シーケンスが巨大な時は、Select+WhenAllはForEachになりえないのです。

さて、この事態に手抜きで対抗すると?

var httpClient = new HttpClient();
Parallel.ForEach(Enumerable.Range(1, 100000), x =>
{
    var str = httpClient.GetStringAsync("http://hogehoge?q=" + x).Result;
    Console.WriteLine(str);
});

みんな大好きParallel.ForEachです。CPUバウンドとかI/Oバウンドとか面倒くさいんですよ、動きゃあいいんですよ(ホジホジ。という楽さ。実際これは普通に機能します。ので、バッチとかはこんなんでもいーんじゃないでしょうか、マジで。でも、これ、序盤はじわじわと並列数が上がってくので、初速がイマイチに感じるかもしれません。最初はコア数分しか並列にならず、待ちが多いことを検出してからじわじわ上がっていくので。あと終盤の挙動をアレゲに感じたりするかもしれません。待ち時間が長いと、際限なく並列数が上がってっちゃうんですよ。でも別に極端に上がっても速くなるわけじゃなくて、逆にむしろ余計遅くなる。

※これは別に作り話じゃなくて、私はプロダクション環境で実際に数十万リクエストを叩くコードを走らせていて、常に同時並列数やスレッド消費量のモニタ取って、調整いれてます。

どう調整入れるか、というと……

// 最小スレッドプール数を最初に適当に伸ばしてやると初速に効く
// 設定は一回でいいので、アプリケーションスタートアップのところにでも置いときましょう
ThreadPool.SetMinThreads(200, 200);

// 無尽蔵に伸び続けるのもいくないのでMaxDegreeOfParallelismを設定
var httpClient = new HttpClient();
Parallel.ForEach(Enumerable.Range(1, 100000), new ParallelOptions { MaxDegreeOfParallelism = 200 }, x =>
{
    var str = httpClient.GetStringAsync("http://hogehoge?q=" + x).Result;
    Console.WriteLine(str);
});

SetMinThreadsとMaxDegreeOfParallelism、この2つはふとぅーに影響大きくて大事。なので適当に、とか書きましたがあんまり適当にやるのはよくない。

ForEachAsync

とはいえ、非同期は非同期として扱いたい!そりゃそーだ。で、つまり、ようするに、同時実行数を抑えながら非同期を走らせられればいい。それにうってつけのクラスがSemaphoreSlim。「リソースまたはリソースのプールに同時にアクセスできるスレッドの数を制限する Semaphore の軽量版です。SemaphoreSlim は、Windows カーネルのセマフォを使用しない、軽量セマフォ クラスを提供します。」。です。.NET 4.0からの登場。使うメソッドはWaitAsync(これは.NET 4.5から)とReleaseがほとんどかな。.NET 4.0の場合はWaitAsyncのかわりにWaitで。

内部にCountを持っていて、それをWaitAsyncで減らし、Releaseで増やします。Countが0に達すると、WaitAsyncは待機するようになります。これを用いてForEachAsyncを作ってみると?

public static class EnumerableExtensions
{
    public static async Task ForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> action, int concurrency, CancellationToken cancellationToken = default(CancellationToken), bool configureAwait = false)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (action == null) throw new ArgumentNullException("action");
        if (concurrency <= 0) throw new ArgumentOutOfRangeException("concurrencyは1以上の必要があります");

        using (var semaphore = new SemaphoreSlim(initialCount: concurrency, maxCount: concurrency))
        {
            var exceptionCount = 0;
            var tasks = new List<Task>();

            foreach (var item in source)
            {
                if (exceptionCount > 0) break;
                cancellationToken.ThrowIfCancellationRequested();

                await semaphore.WaitAsync(cancellationToken).ConfigureAwait(configureAwait);
                var task = action(item).ContinueWith(t =>
                {
                    semaphore.Release();

                    if (t.IsFaulted)
                    {
                        Interlocked.Increment(ref exceptionCount);
                        throw t.Exception;
                    }
                });
                tasks.Add(task);
            }

            await Task.WhenAll(tasks.ToArray()).ConfigureAwait(configureAwait);
        }
    }
}

ほむ、わからん。ExceptionとかCancellationTokenとかでゴチャついてますが、よーわ、実行開始しようとするとWaitAsyncでカウントを減らして、実行完了したらReleaseでカウントを増やす。初期値の指定がそのまま並列実行数になる、って感じ。利用例を見ると

var httpClient = new HttpClient();
await Enumerable.Range(1, 100000)
    .ForEachAsync(async x =>
    {
        var str = await httpClient.GetStringAsync("http://hogehoge?q=" + x);
        Console.WriteLine(str);
    }, concurrency: 200);

実に簡単にひどぅーきなForEachができました。これは、Taskの実行開始はシーケンシャルです。これも何気に有難かったりしますねえ。実行完了のほうは順不同です。まあ、そりゃそうだ、って話ですね。

まとめ

SemaphoreSlimかわいい。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive