並列実行とSqlConnection
- 2013-03-09
どうも、ParallelやThreadな処理が苦痛度100なペチパーです。嘘です。空前のThreadLocalブームが来てたり来てなかったりする昨今です。あ、謎社の宣伝しますとグリーとグラニ、「GREE」におけるソーシャルゲームの提供などについて戦略的業務提携に合意というわけで、ぐりとぐら、としかいいようがない昨今でもあります。その日に開催されていたGREEプラットフォームカンファレンスでは、謎社はC#企業になる!と大宣言したので、ちゃんと実現させていきたいところです、いや、むしろそのためにフル回転しています。
そんな宣伝はおいておいて本題なのですけれど、SQL。データベース。大量にクエリ発行したい時など、パラレル実行したいの!インサートだったら当然BulkInsertが一番早いんですが、Updateとかね。シンドイんだよね。あとUpsert(Merge/ON DUPLICATE KEY UPDATE)とかも使っちゃったりしてね。そんなわけで、お手軽お気楽な手法としてはParallelがありますねー、.NETはこの辺本当に楽なんだよねー、ぴーHPはシラネ。
で、実際パラレールにこんな感じに書くと……
using (var connection = new SqlConnection("接続文字列"))
{
connection.Open();
Parallel.For(1, 1000, x =>
{
var _ = connection.Query<DateTime>("select current_timestamp").First(); // Dapper
});
}
落ちます。理由は単純明快でSqlConnectionはスレッドセーフじゃないから。というわけで、やるなら
Parallel.For(1, 1000, x =>
{
using (var connection = new SqlConnection("接続文字列"))
{
connection.Open();
var _ = connection.Query<DateTime>("select current_timestamp").First(); // Dapper
}
});
となります、これなら絶対安全。でも、スレッドって基本的にコアの数とちょびっとしか立てられないわけだし、連続的に実行しているのだから、たとえコネクションプール行きだとかなんだりであっても、一々コネクションを開いて閉じてをするよりも、開きっぱで行きたいよね。
ようするにSqlConnectionがスレッドセーフじゃないからいけない。これはどこかで聞いたような話です。先日C#とランダムで出したThreadLocalの出番ではないでしょうか!
ThreadLocal
というわけでスレッドセーフなSqlConnectionを作りましょう、ThreadLocalを使って。
using (var connection = new ThreadLocal<SqlConnection>(() => { var conn = new SqlConnection("接続文字列"); conn.Open(); return conn; }))
{
Parallel.For(1, 1000, x =>
{
var _ = connection.Value.Query<DateTime>("select current_timestamp").First(); // Dapper
});
}
new SqlConnectionがThreadLocalに変わっただけのお手軽さ。これで、安全なんですって!本当に?本当に。で、実際こうして速度はどうなのかというと、私の環境で実行したところ、シングルスレッドで16秒、毎回new SqlConnectionするParallelで5秒、ThreadLocalなParallelで2秒でした。これは圧勝。幸せになれそう。
Disposeを忘れない、或いは忘れた
でも↑のコードはダメです。ダメな理由は、コネクションをDisposeしてないからです。ThreadLocalのDisposeは、あくまでThreadLocalのDisposeなのであって、中身のDisposeはしてくれてないのです。ここ忘れると悲劇が待ってます。でもFactoryで作ってる上にThreadで一意なValue、どうやってまとめてDisposeすればいいの!というと、trackAllValuesというオプションを有効にすると簡単に実現できます。
using (var connection = new ThreadLocal<SqlConnection>(() => { var conn = new SqlConnection("接続文字列"); conn.Open(); return conn; }
, trackAllValues: true)) // ThreadLocalの.Valuesプロパティの参照を有効化する
{
Parallel.For(1, 1000, x =>
{
var _ = connection.Value.Query<DateTime>("select current_timestamp").First(); // Dapper
});
// 生成された全てのConnectionを一括Dispose
foreach (var item in connection.Values.OfType<IDisposable>()) item.Dispose();
}
このtrackAllValuesが可能なThreadLocalは.NET 4.5からです。それ以前の人は、残念でした……。謎社は遠慮なく.NET 4.5を使いますので全然問題ありません(
もう一つまとめて
とはいえ、なんか面倒くさいので、ちょっとラップしませう、以下のようなクラスを用意します。
public static class DisposableThreadLocal
{
public static DisposableThreadLocal<T> Create<T>(Func<T> valueFactory)
where T : IDisposable
{
return new DisposableThreadLocal<T>(valueFactory);
}
}
public class DisposableThreadLocal<T> : ThreadLocal<T>
where T : IDisposable
{
public DisposableThreadLocal(Func<T> valueFactory)
: base(valueFactory, trackAllValues: true)
{
}
protected override void Dispose(bool disposing)
{
var exceptions = new List<Exception>();
foreach (var item in this.Values.OfType<IDisposable>())
{
try
{
item.Dispose();
}
catch (Exception e)
{
exceptions.Add(e);
}
}
base.Dispose(disposing);
if (exceptions.Any()) throw new AggregateException(exceptions);
}
}
これを使うと
using (var connection = DisposableThreadLocal.Create(() => { var conn = new SqlConnection("接続文字列"); conn.Open(); return conn; }))
{
Parallel.For(1, 1000, x =>
{
var _ = connection.Value.Query<DateTime>("select current_timestamp").First(); // Dapper
});
}
といったように、超シンプルに書けます。うん。いいね。
それAsync?
Asyncでドバッと発行してTask.WhenAll的なやり方も、接続が非スレッドセーフなのは変わらなくて、結構やりづらいんですよ……。それで、なんか色々細かくawaitしまくりで逆に遅くなったら意味ないし。それならドストレートに行ったほうがいいのでは感が若干ある。どうせThreadなんてそこそこ余ってるんだから(←そうか?)局所的にParallelってもいいぢゃないと思いたい、とかなんとかかんとか。
非.NET 4.5の場合
Parallel.For, ForEachに関しては、localInit, localFinallyというタスク内で一意になる変数を利用したオーバーロードを利用して、似たような雰囲気で書けます。正確には同じ挙動ではないですが、まぁまぁ悪くない結果が得られます。
Parallel.For(1, 1000,
() =>
{
// local init
var conn = new SqlConnection("接続文字列");
conn.Open();
return conn;
},
(x, state, connection) =>
{
var _ = connection.Query<DateTime>("select current_timestamp").First(); // Dapper
return connection;
},
(connection) =>
{
// local finally
connection.Dispose();
});
オーバーロードが結構地獄でシンドイですね!ここも簡単にラップしたものを作りましょう。
public static class ParallelEx
{
public static ParallelLoopResult DisposableFor<TDisposable>(long fromInclusive, long toExclusive, Func<TDisposable> resourceFactory, Action<long, ParallelLoopState, TDisposable> body)
where TDisposable : IDisposable
{
return Parallel.For(fromInclusive, toExclusive, resourceFactory, (item, state, resource) => { body(item, state, resource); return resource; }, disp => disp.Dispose());
}
public static ParallelLoopResult DisposableForEach<T, TDisposable>(IEnumerable<T> source, Func<TDisposable> resourceFactory, Action<T, ParallelLoopState, TDisposable> body)
where TDisposable : IDisposable
{
return Parallel.ForEach(source, resourceFactory, (item, state, resource) => { body(item, state, resource); return resource; }, disp => disp.Dispose());
}
}
こうしたものを作れば、
ParallelEx.DisposableFor(1, 1000,
() =>
{
var conn = new var conn = new SqlConnection("接続文字列");
conn.Open();
return conn;
},
(x, state, connection) =>
{
var _ = connection.Query<DateTime>("select current_timestamp").First(); // Dapper
});
まぁまぁ許せる、かな?