ForEachAsync - 非同期の列挙の方法 Part2
- 2014-03-14
Part2って、Part1はあったのかというと、うーん、非同期時代のLINQ、かな……?さて、今回はForEachがテーマです。といってもそれってSelect+WhenAllでしょ!「Selectは非同期時代のForEach」って言ってたじゃない、というと、はい、言ってました。まだ他に言うことあるの?というと、例えば以下のシチュエーション。
var httpClient = new HttpClient();
var tasks = Enumerable.Range(1, 100000)
.Select(async x =>
{
var str = await httpClient.GetStringAsync("http://hogehoge?q=" + x);
Console.WriteLine(str);
});
await Task.WhenAll(tasks);
別に動きはしますが、制御不能に10万件、同時リクエスト走ります。これはまぁいくないですよね。もはや途中で死んだりしますので動くとも言えない……。というわけで、元シーケンスが巨大な時は、Select+WhenAllはForEachになりえないのです。
さて、この事態に手抜きで対抗すると?
var httpClient = new HttpClient();
Parallel.ForEach(Enumerable.Range(1, 100000), x =>
{
var str = httpClient.GetStringAsync("http://hogehoge?q=" + x).Result;
Console.WriteLine(str);
});
みんな大好きParallel.ForEachです。CPUバウンドとかI/Oバウンドとか面倒くさいんですよ、動きゃあいいんですよ(ホジホジ。という楽さ。実際これは普通に機能します。ので、バッチとかはこんなんでもいーんじゃないでしょうか、マジで。でも、これ、序盤はじわじわと並列数が上がってくので、初速がイマイチに感じるかもしれません。最初はコア数分しか並列にならず、待ちが多いことを検出してからじわじわ上がっていくので。あと終盤の挙動をアレゲに感じたりするかもしれません。待ち時間が長いと、際限なく並列数が上がってっちゃうんですよ。でも別に極端に上がっても速くなるわけじゃなくて、逆にむしろ余計遅くなる。
※これは別に作り話じゃなくて、私はプロダクション環境で実際に数十万リクエストを叩くコードを走らせていて、常に同時並列数やスレッド消費量のモニタ取って、調整いれてます。
どう調整入れるか、というと……
// 最小スレッドプール数を最初に適当に伸ばしてやると初速に効く
// 設定は一回でいいので、アプリケーションスタートアップのところにでも置いときましょう
ThreadPool.SetMinThreads(200, 200);
// 無尽蔵に伸び続けるのもいくないのでMaxDegreeOfParallelismを設定
var httpClient = new HttpClient();
Parallel.ForEach(Enumerable.Range(1, 100000), new ParallelOptions { MaxDegreeOfParallelism = 200 }, x =>
{
var str = httpClient.GetStringAsync("http://hogehoge?q=" + x).Result;
Console.WriteLine(str);
});
SetMinThreadsとMaxDegreeOfParallelism、この2つはふとぅーに影響大きくて大事。なので適当に、とか書きましたがあんまり適当にやるのはよくない。
ForEachAsync
とはいえ、非同期は非同期として扱いたい!そりゃそーだ。で、つまり、ようするに、同時実行数を抑えながら非同期を走らせられればいい。それにうってつけのクラスがSemaphoreSlim。「リソースまたはリソースのプールに同時にアクセスできるスレッドの数を制限する Semaphore の軽量版です。SemaphoreSlim は、Windows カーネルのセマフォを使用しない、軽量セマフォ クラスを提供します。」。です。.NET 4.0からの登場。使うメソッドはWaitAsync(これは.NET 4.5から)とReleaseがほとんどかな。.NET 4.0の場合はWaitAsyncのかわりにWaitで。
内部にCountを持っていて、それをWaitAsyncで減らし、Releaseで増やします。Countが0に達すると、WaitAsyncは待機するようになります。これを用いてForEachAsyncを作ってみると?
public static class EnumerableExtensions
{
public static async Task ForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> action, int concurrency, CancellationToken cancellationToken = default(CancellationToken), bool configureAwait = false)
{
if (source == null) throw new ArgumentNullException("source");
if (action == null) throw new ArgumentNullException("action");
if (concurrency <= 0) throw new ArgumentOutOfRangeException("concurrencyは1以上の必要があります");
using (var semaphore = new SemaphoreSlim(initialCount: concurrency, maxCount: concurrency))
{
var exceptionCount = 0;
var tasks = new List<Task>();
foreach (var item in source)
{
if (exceptionCount > 0) break;
cancellationToken.ThrowIfCancellationRequested();
await semaphore.WaitAsync(cancellationToken).ConfigureAwait(configureAwait);
var task = action(item).ContinueWith(t =>
{
semaphore.Release();
if (t.IsFaulted)
{
Interlocked.Increment(ref exceptionCount);
throw t.Exception;
}
});
tasks.Add(task);
}
await Task.WhenAll(tasks.ToArray()).ConfigureAwait(configureAwait);
}
}
}
ほむ、わからん。ExceptionとかCancellationTokenとかでゴチャついてますが、よーわ、実行開始しようとするとWaitAsyncでカウントを減らして、実行完了したらReleaseでカウントを増やす。初期値の指定がそのまま並列実行数になる、って感じ。利用例を見ると
var httpClient = new HttpClient();
await Enumerable.Range(1, 100000)
.ForEachAsync(async x =>
{
var str = await httpClient.GetStringAsync("http://hogehoge?q=" + x);
Console.WriteLine(str);
}, concurrency: 200);
実に簡単にひどぅーきなForEachができました。これは、Taskの実行開始はシーケンシャルです。これも何気に有難かったりしますねえ。実行完了のほうは順不同です。まあ、そりゃそうだ、って話ですね。
まとめ
SemaphoreSlimかわいい。
RespClient - PowerShell向けのRedisクライアント/コマンドレット
- 2014-03-11
というものを作りました。
- GitHub - RespClient
- PM> Install-Package RespClient
例によってインストールはNuGetで。PowerShellのコマンドレットを含んでいるのでSystem.Management.Automationがないと動きません(多分、よく知らない)。
RESPって?
REdis Serialization Protocolです。RespClientは、何かのRedisClientのラッパーではなくて、自前でプロトコルを解釈してSocket経由で叩いてます。といっても、RESPは非常にシンプルなプロトコロでして、そんなに難しくはありません。作ろうと思った発端は、プロトコルの定義を見てて、先頭の識別子がEnumで
public enum RespType : byte
{
SimpleStrings = (byte)'+',
Erorrs = (byte)'-',
Integers = (byte)':',
BulkStrings = (byte)'$',
Arrays = (byte)'*'
}
こんな風に定義できて面白いなー、という、それがきっかけなのでした。ただ、せめて実用的なものを作りたいと思ったので、特にPowerShellに強くフォーカスするようにしています。
既存のクライアント、私が作ってるCloudStructuresなり、その元のBookSleeveなり、ServiceStack.Redisというのは、やはりC#から使うのが前提で、結構ヘヴィーだと思うのです、PowerShell的なコマンドラインインターフェイスで使うには。なので、Redis-Cli的な感覚で使える、Windowsネイティブのクライアントは、隙間産業的に、ちょうどなかったので良いかな、と。なので私にしては珍しくというか初めてコマンドレット作りましたし!
PowerShellコマンドレット
こんなふーに使います。
# モジュールはdllで提供されています。
Import-Module RespClient.dll
# RedisServerへのコネクションは、一度コネクションを張るとセッション中、ずっと維持されます。
# 他のパラメータは -Host, -Port, -Timeout があります。
Connect-RedisServer 127.0.0.1
# コマンドを送るのはSend-RedisCommandで。戻り値はUTF8Stringでデコードされます。
Send-RedisCommand "set test abcde"
# パイプラインモードもサポートしています。
Begin-RedisPipeline
Send-RedisCommand "set test fghijk"
Send-RedisCommand "incr testb"
Send-RedisCommand "incr testc"
Send-RedisCommand "get test"
Execute-RedisPipeline
# 明示的にコネクションを切るときはDisconnectしてください。
Disconnect-RedisServer
RespClient(.NET)
生の.NETクライアントのほうが、よりコマンドレット経由よりも高機能です。場合によってはPowerShellで使う場合も、コマンドレットではなくて、こちらを使ったほうがいいこともあるかもしれません。具体的な差は、バイナリセーフな値を投げることができるのと、戻り値のバイナリのデコード形式を自由に選べます。
using (var client = new Redis.Protocol.RespClient())
{
// string command
client.SendCommand("set a 1", Encoding.UTF8.GetString);
// binary safe command
client.SendCommand("set", new[] { Encoding.UTF8.GetBytes("test"), Encoding.UTF8.GetBytes("abcde") }, Encoding.UTF8.GetString);
// use pipeline
var results = client.UsePipeline()
.QueueCommand("incr a")
.QueueCommand("incrby b 10")
.QueueCommand("get a", Encoding.UTF8.GetString)
.Execute();
} // disconnect on dispose
まとめ
弊社でぎたぱそさんがたまに使ってます。(私は……そもそもあんま生で触る機会がなく、かな!?)