ForEachAsync - 非同期の列挙の方法 Part2
- 2014-03-14
Part2って、Part1はあったのかというと、うーん、非同期時代のLINQ、かな……?さて、今回はForEachがテーマです。といってもそれってSelect+WhenAllでしょ!「Selectは非同期時代のForEach」って言ってたじゃない、というと、はい、言ってました。まだ他に言うことあるの?というと、例えば以下のシチュエーション。
var httpClient = new HttpClient();
var tasks = Enumerable.Range(1, 100000)
.Select(async x =>
{
var str = await httpClient.GetStringAsync("http://hogehoge?q=" + x);
Console.WriteLine(str);
});
await Task.WhenAll(tasks);
別に動きはしますが、制御不能に10万件、同時リクエスト走ります。これはまぁいくないですよね。もはや途中で死んだりしますので動くとも言えない……。というわけで、元シーケンスが巨大な時は、Select+WhenAllはForEachになりえないのです。
さて、この事態に手抜きで対抗すると?
var httpClient = new HttpClient();
Parallel.ForEach(Enumerable.Range(1, 100000), x =>
{
var str = httpClient.GetStringAsync("http://hogehoge?q=" + x).Result;
Console.WriteLine(str);
});
みんな大好きParallel.ForEachです。CPUバウンドとかI/Oバウンドとか面倒くさいんですよ、動きゃあいいんですよ(ホジホジ。という楽さ。実際これは普通に機能します。ので、バッチとかはこんなんでもいーんじゃないでしょうか、マジで。でも、これ、序盤はじわじわと並列数が上がってくので、初速がイマイチに感じるかもしれません。最初はコア数分しか並列にならず、待ちが多いことを検出してからじわじわ上がっていくので。あと終盤の挙動をアレゲに感じたりするかもしれません。待ち時間が長いと、際限なく並列数が上がってっちゃうんですよ。でも別に極端に上がっても速くなるわけじゃなくて、逆にむしろ余計遅くなる。
※これは別に作り話じゃなくて、私はプロダクション環境で実際に数十万リクエストを叩くコードを走らせていて、常に同時並列数やスレッド消費量のモニタ取って、調整いれてます。
どう調整入れるか、というと……
// 最小スレッドプール数を最初に適当に伸ばしてやると初速に効く
// 設定は一回でいいので、アプリケーションスタートアップのところにでも置いときましょう
ThreadPool.SetMinThreads(200, 200);
// 無尽蔵に伸び続けるのもいくないのでMaxDegreeOfParallelismを設定
var httpClient = new HttpClient();
Parallel.ForEach(Enumerable.Range(1, 100000), new ParallelOptions { MaxDegreeOfParallelism = 200 }, x =>
{
var str = httpClient.GetStringAsync("http://hogehoge?q=" + x).Result;
Console.WriteLine(str);
});
SetMinThreadsとMaxDegreeOfParallelism、この2つはふとぅーに影響大きくて大事。なので適当に、とか書きましたがあんまり適当にやるのはよくない。
ForEachAsync
とはいえ、非同期は非同期として扱いたい!そりゃそーだ。で、つまり、ようするに、同時実行数を抑えながら非同期を走らせられればいい。それにうってつけのクラスがSemaphoreSlim。「リソースまたはリソースのプールに同時にアクセスできるスレッドの数を制限する Semaphore の軽量版です。SemaphoreSlim は、Windows カーネルのセマフォを使用しない、軽量セマフォ クラスを提供します。」。です。.NET 4.0からの登場。使うメソッドはWaitAsync(これは.NET 4.5から)とReleaseがほとんどかな。.NET 4.0の場合はWaitAsyncのかわりにWaitで。
内部にCountを持っていて、それをWaitAsyncで減らし、Releaseで増やします。Countが0に達すると、WaitAsyncは待機するようになります。これを用いてForEachAsyncを作ってみると?
public static class EnumerableExtensions
{
public static async Task ForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> action, int concurrency, CancellationToken cancellationToken = default(CancellationToken), bool configureAwait = false)
{
if (source == null) throw new ArgumentNullException("source");
if (action == null) throw new ArgumentNullException("action");
if (concurrency <= 0) throw new ArgumentOutOfRangeException("concurrencyは1以上の必要があります");
using (var semaphore = new SemaphoreSlim(initialCount: concurrency, maxCount: concurrency))
{
var exceptionCount = 0;
var tasks = new List<Task>();
foreach (var item in source)
{
if (exceptionCount > 0) break;
cancellationToken.ThrowIfCancellationRequested();
await semaphore.WaitAsync(cancellationToken).ConfigureAwait(configureAwait);
var task = action(item).ContinueWith(t =>
{
semaphore.Release();
if (t.IsFaulted)
{
Interlocked.Increment(ref exceptionCount);
throw t.Exception;
}
});
tasks.Add(task);
}
await Task.WhenAll(tasks.ToArray()).ConfigureAwait(configureAwait);
}
}
}
ほむ、わからん。ExceptionとかCancellationTokenとかでゴチャついてますが、よーわ、実行開始しようとするとWaitAsyncでカウントを減らして、実行完了したらReleaseでカウントを増やす。初期値の指定がそのまま並列実行数になる、って感じ。利用例を見ると
var httpClient = new HttpClient();
await Enumerable.Range(1, 100000)
.ForEachAsync(async x =>
{
var str = await httpClient.GetStringAsync("http://hogehoge?q=" + x);
Console.WriteLine(str);
}, concurrency: 200);
実に簡単にひどぅーきなForEachができました。これは、Taskの実行開始はシーケンシャルです。これも何気に有難かったりしますねえ。実行完了のほうは順不同です。まあ、そりゃそうだ、って話ですね。
まとめ
SemaphoreSlimかわいい。