C#のラムダ式でyieldっぽい何かをawaitで代用する方法

C#がインラインでyield書けないならawait使えばいいじゃない。と、偉い人は言いました。というわけで、こそこそっと開発がされているIxに、面白い機能が入りました(開発リポジトリ上だけなのでNuGetからダウンロードしても、まだ入ってません)。こんなのです。

var hoge = "あいうえお";

var seq = EnumerableEx.Create<int>(async Yield =>
{
    await Yield.Return(10);
    await Yield.Return(100);

    hoge = "ふがふが"; // インラインで書けるのでお外への副作用が可能

    await Yield.Return(1000);
});


foreach (var item in seq)
{
    Console.WriteLine(item); // 10, 100, 1000
}
Console.WriteLine(hoge); // ふがふが

そう、yield return(っぽい何か)がラムダ式で、メソッド外部に出すことなく書けてしまうのです!これは素敵ですね?い、いや、なんか何やってるのか分からなすぎて黒魔術怖いって雰囲気も漂ってますね!しかし面白いものは面白いので、実装見ましょう。

add types iyielder, iawaitable, and iawait; add support for creating ienumerable from action.ということで、まあ、ローカルで動かしたいんでコピペってきましょう。

public static class EnumerableEx
{
    public static IEnumerable<T> Create<T>(Action<IYielder<T>> create)
    {
        if (create == null) throw new ArgumentNullException("create");

        foreach (var x in new Yielder<T>(create))
        {
            yield return x;
        }
    }
}

public interface IYielder<in T>
{
    IAwaitable Return(T value);
    IAwaitable Break();
}

public interface IAwaitable
{
    IAwaiter GetAwaiter();
}

public interface IAwaiter : ICriticalNotifyCompletion
{
    bool IsCompleted { get; }
    void GetResult();
}

public class Yielder<T> : IYielder<T>, IAwaitable, IAwaiter, ICriticalNotifyCompletion
{
    private readonly Action<Yielder<T>> _create;
    private bool _running;
    private bool _hasValue;
    private T _value;
    private bool _stopped;
    private Action _continuation;

    public Yielder(Action<Yielder<T>> create)
    {
        _create = create;
    }

    public IAwaitable Return(T value)
    {
        _hasValue = true;
        _value = value;
        return this;
    }

    public IAwaitable Break()
    {
        _stopped = true;
        return this;
    }

    public Yielder<T> GetEnumerator()
    {
        return this;
    }

    public bool MoveNext()
    {
        if (!_running)
        {
            _running = true;
            _create(this);
        }
        else
        {
            _hasValue = false;
            _continuation();
        }

        return !_stopped && _hasValue;
    }

    public T Current
    {
        get
        {
            return _value;
        }
    }

    public IAwaiter GetAwaiter()
    {
        return this;
    }

    public bool IsCompleted
    {
        get { return false; }
    }

    public void GetResult() { }


    public void OnCompleted(Action continuation)
    {
        _continuation = continuation;
    }

    public void UnsafeOnCompleted(Action continuation)
    {
        _continuation = continuation;
    }
}

ほぅ、わけわからん?若干トリッキーなので、順を追っていきますか。asyncについて考える前に、まず、基本的なforeachのルール。実はIEnumerableを実装している必要はなくて、GetEnumeratorという名前のメソッドがあればいい。同様にMoveNextとCurrentというメソッドがあればIEnumerator扱いされる。なので、foreach (var x in new Yielder(create)) されているYielderはIEnumerableじゃないし、GetEnumeratorでreturn thisされていますが、YielderはIEnumeratorでもない。でも、foreachでグルグル回せている、というわけです。挙動は通常のforeachと同じで、MoveNext→Current、といった形です。

あと、インターフェイスが、IAwaitableとかいっぱい再定義されてて、ワケワカランのですけれど、そこまで意味あるわけじゃないです。これはラムダ式にYielderを渡すわけですが、そこで内部の諸々が呼べちゃうのはイクナイので隠ぺいする、程度の意味合いでしかないので、これを実装するのにインターフェイスの再定義が必要!というわけは全然ないです。

で、コアになるのはMoveNext。

public bool MoveNext()
{
    if (!_running)
    {
        _running = true;
        _create(this);
    }
    else
    {
        _hasValue = false;
        _continuation();
    }

    return !_stopped && _hasValue;
}

そもそもyield returnで生成されたメソッドが最初に実行されるのは、GetEnumeratorのタイミングではなく、GetEnumeratorされて最初のMoveNextが走った時、なので、ここが本体になっているのはセマンティクス的に問題なし。

!_runnningは初回実行時の意味で、ここで_create(this)、によってラムダ式で書いた本体が走ります。

var seq = EnumerableEx.Create<int>(async Yield =>
{
    await Yield.Return(10);
    // ↑のとこがまず実行され始める
    await Yield.Return(100);
    await Yield.Return(1000);
});

public IAwaitable Return(T value)
{
    _hasValue = true;
    _value = value;
    return this;
}

まずはメソッド実行なのでReturn。これは値をセットして回っているだけ。そしてIAwaitableを返し、await。ここで流れは別のところに行きます。

public bool IsCompleted
{
    get { return false; }
}

public void GetResult() { }


public void OnCompleted(Action continuation)
{
    _continuation = continuation;
}

public void UnsafeOnCompleted(Action continuation)
{
    _continuation = continuation;
}

まず完了しているかどうかの確認(IsCompleted)が走りますが、この場合は常にfalseで(そうしないと終了ということになってラムダ式のほうに戻ってこなくなっちゃう)。これによってUnsafeOnCompleted(ICriticalNotifyCompletionが実装されている場合はこっちが走る)でcontinuation(メソッド本体)が走る。で、「次回用」に変数保存して、MoveNext(create(this)したとこの位置)に戻ってくる。あとはMoveNextがtrueを返すのでCurrentで値取得して、それがyield returnされる。

二度目のMoveNextでは

public bool MoveNext()
{
    if (!_running)
    {
        _running = true;
        _create(this);
    }
    else
    {
        _hasValue = false;
        _continuation(); // ここが呼び出されて
    }

    return !_stopped && _hasValue;
}

var seq = EnumerableEx.Create<int>(async Yield =>
{
    await Yield.Return(10);
    // ここから再度走り出す
    await Yield.Return(100);
    await Yield.Return(1000);
});

といった感じになって、以下繰り返し。良く出来てますね!ていうか、asyncなのに非同期全く関係ないのが素敵。そう、asyncは別に非同期関係なく使えちゃうわけです。ここ大事なので繰り返しましょう。asyncは別に非同期関係なく使うことができます。

まとめ

async、フツーに使うのもそろそろ飽きてき頃だと思うので、弄って遊ぶのは大正義。実際に投下しだすかどうかは判断次第。あと、↑のはまだ大事な要素ができていないので絶対使いませんけれど。大事な要素はIDisposableであること。foreachで大事だと思ってるのはDisposeしてくれるとこ!だとも思っているので、それが実現できてないのはナイナー、と。

そういえばAsyncについてですが、3/30の土曜にRoom metro #15でHttpClient(非同期の塊!)について話すので、まだ残席ありますので良ければお越しくだしあー。

並列実行とSqlConnection

どうも、ParallelやThreadな処理が苦痛度100なペチパーです。嘘です。空前のThreadLocalブームが来てたり来てなかったりする昨今です。あ、謎社の宣伝しますとグリーとグラニ、「GREE」におけるソーシャルゲームの提供などについて戦略的業務提携に合意というわけで、ぐりとぐら、としかいいようがない昨今でもあります。その日に開催されていたGREEプラットフォームカンファレンスでは、謎社はC#企業になる!と大宣言したので、ちゃんと実現させていきたいところです、いや、むしろそのためにフル回転しています。

そんな宣伝はおいておいて本題なのですけれど、SQL。データベース。大量にクエリ発行したい時など、パラレル実行したいの!インサートだったら当然BulkInsertが一番早いんですが、Updateとかね。シンドイんだよね。あとUpsert(Merge/ON DUPLICATE KEY UPDATE)とかも使っちゃったりしてね。そんなわけで、お手軽お気楽な手法としてはParallelがありますねー、.NETはこの辺本当に楽なんだよねー、ぴーHPはシラネ。

で、実際パラレールにこんな感じに書くと……

using (var connection = new SqlConnection("接続文字列"))
{
    connection.Open();
    Parallel.For(1, 1000, x =>
    {
        var _ = connection.Query<DateTime>("select current_timestamp").First(); // Dapper
    });
}

落ちます。理由は単純明快でSqlConnectionはスレッドセーフじゃないから。というわけで、やるなら

Parallel.For(1, 1000, x =>
{
    using (var connection = new SqlConnection("接続文字列"))
    {
        connection.Open();
        var _ = connection.Query<DateTime>("select current_timestamp").First(); // Dapper
    }
});

となります、これなら絶対安全。でも、スレッドって基本的にコアの数とちょびっとしか立てられないわけだし、連続的に実行しているのだから、たとえコネクションプール行きだとかなんだりであっても、一々コネクションを開いて閉じてをするよりも、開きっぱで行きたいよね。

ようするにSqlConnectionがスレッドセーフじゃないからいけない。これはどこかで聞いたような話です。先日C#とランダムで出したThreadLocalの出番ではないでしょうか!

ThreadLocal

というわけでスレッドセーフなSqlConnectionを作りましょう、ThreadLocalを使って。

using (var connection = new ThreadLocal<SqlConnection>(() => { var conn = new SqlConnection("接続文字列"); conn.Open(); return conn; }))
{
    Parallel.For(1, 1000, x =>
    {
        var _ = connection.Value.Query<DateTime>("select current_timestamp").First(); // Dapper
    });
}

new SqlConnectionがThreadLocalに変わっただけのお手軽さ。これで、安全なんですって!本当に?本当に。で、実際こうして速度はどうなのかというと、私の環境で実行したところ、シングルスレッドで16秒、毎回new SqlConnectionするParallelで5秒、ThreadLocalなParallelで2秒でした。これは圧勝。幸せになれそう。

Disposeを忘れない、或いは忘れた

でも↑のコードはダメです。ダメな理由は、コネクションをDisposeしてないからです。ThreadLocalのDisposeは、あくまでThreadLocalのDisposeなのであって、中身のDisposeはしてくれてないのです。ここ忘れると悲劇が待ってます。でもFactoryで作ってる上にThreadで一意なValue、どうやってまとめてDisposeすればいいの!というと、trackAllValuesというオプションを有効にすると簡単に実現できます。

using (var connection = new ThreadLocal<SqlConnection>(() => { var conn = new SqlConnection("接続文字列"); conn.Open(); return conn; }
    , trackAllValues: true)) // ThreadLocalの.Valuesプロパティの参照を有効化する
{
    Parallel.For(1, 1000, x =>
    {
        var _ = connection.Value.Query<DateTime>("select current_timestamp").First(); // Dapper
    });

    // 生成された全てのConnectionを一括Dispose
    foreach (var item in connection.Values.OfType<IDisposable>()) item.Dispose();
}

このtrackAllValuesが可能なThreadLocalは.NET 4.5からです。それ以前の人は、残念でした……。謎社は遠慮なく.NET 4.5を使いますので全然問題ありません(

もう一つまとめて

とはいえ、なんか面倒くさいので、ちょっとラップしませう、以下のようなクラスを用意します。

public static class DisposableThreadLocal
{
    public static DisposableThreadLocal<T> Create<T>(Func<T> valueFactory)
        where T : IDisposable
    {
        return new DisposableThreadLocal<T>(valueFactory);
    }
}

public class DisposableThreadLocal<T> : ThreadLocal<T>
    where T : IDisposable
{
    public DisposableThreadLocal(Func<T> valueFactory)
        : base(valueFactory, trackAllValues: true)
    {

    }

    protected override void Dispose(bool disposing)
    {
        var exceptions = new List<Exception>();
        foreach (var item in this.Values.OfType<IDisposable>())
        {
            try
            {
                item.Dispose();
            }
            catch (Exception e)
            {
                exceptions.Add(e);
            }
        }

        base.Dispose(disposing);

        if (exceptions.Any()) throw new AggregateException(exceptions);
    }
}

これを使うと

using (var connection = DisposableThreadLocal.Create(() => { var conn = new SqlConnection("接続文字列"); conn.Open(); return conn; }))
{
    Parallel.For(1, 1000, x =>
    {
        var _ = connection.Value.Query<DateTime>("select current_timestamp").First(); // Dapper
    });
}

といったように、超シンプルに書けます。うん。いいね。

それAsync?

Asyncでドバッと発行してTask.WhenAll的なやり方も、接続が非スレッドセーフなのは変わらなくて、結構やりづらいんですよ……。それで、なんか色々細かくawaitしまくりで逆に遅くなったら意味ないし。それならドストレートに行ったほうがいいのでは感が若干ある。どうせThreadなんてそこそこ余ってるんだから(←そうか?)局所的にParallelってもいいぢゃないと思いたい、とかなんとかかんとか。

非.NET 4.5の場合

Parallel.For, ForEachに関しては、localInit, localFinallyというタスク内で一意になる変数を利用したオーバーロードを利用して、似たような雰囲気で書けます。正確には同じ挙動ではないですが、まぁまぁ悪くない結果が得られます。

Parallel.For(1, 1000,
    () =>
    {
        // local init
        var conn = new SqlConnection("接続文字列");
        conn.Open();
        return conn;
    },
    (x, state, connection) =>
    {
        var _ = connection.Query<DateTime>("select current_timestamp").First(); // Dapper
        return connection;
    },
    (connection) =>
    {
        // local finally
        connection.Dispose();
    });

オーバーロードが結構地獄でシンドイですね!ここも簡単にラップしたものを作りましょう。

public static class ParallelEx
{
    public static ParallelLoopResult DisposableFor<TDisposable>(long fromInclusive, long toExclusive, Func<TDisposable> resourceFactory, Action<long, ParallelLoopState, TDisposable> body)
        where TDisposable : IDisposable
    {
        return Parallel.For(fromInclusive, toExclusive, resourceFactory, (item, state, resource) => { body(item, state, resource); return resource; }, disp => disp.Dispose());
    }

    public static ParallelLoopResult DisposableForEach<T, TDisposable>(IEnumerable<T> source, Func<TDisposable> resourceFactory, Action<T, ParallelLoopState, TDisposable> body)
        where TDisposable : IDisposable
    {
        return Parallel.ForEach(source, resourceFactory, (item, state, resource) => { body(item, state, resource); return resource; }, disp => disp.Dispose());
    }
}

こうしたものを作れば、

ParallelEx.DisposableFor(1, 1000,
    () =>
    {
        var conn = new var conn = new SqlConnection("接続文字列");
        conn.Open();
        return conn;
    },
    (x, state, connection) =>
    {
        var _ = connection.Query<DateTime>("select current_timestamp").First(); // Dapper
    });

まぁまぁ許せる、かな?

C#とランダム

古くて新しいわけはない昔ながらのSystem.Randomのお話。Randomのコンストラクタは二種類あって、seed引数アリの場合は必ず同じ順序で数値を返すようになります。

// 何度実行しても同じ結果
var rand = new Random(0);
Console.WriteLine(rand.Next()); // 1559595546
Console.WriteLine(rand.Next()); // 1755192844
Console.WriteLine(rand.Next()); // 1649316166

例えばゲームのリプレイなどは、ランダムだけど同一の結果が得られることを期待したいわけなので、大事大事ですね。(とはいえ、Windows-CLIとLinux-monoでは結果が違ったりするので、マルチプラットフォームでの共有などという場合は、別策を取ったほうがよさそうです)。何も渡さない場合はseedとしてEnvironment.TickCountが渡されます。精度はミリ秒。ということは、ですね、例えばループの中でRandomをnewするとですよ、

for (int i = 0; i < 100; i++)
{
    var rand = new Random();
    Console.WriteLine(rand.Next());
}

マシンスペックにもよりますが、私の環境では30個ぐらい同じ数値が出た後に、別の、また30個ぐらい同じ数値が続き……となりました。何故か、というと、seedがEnvironment.TickCountだからで、ループ内といったようなミリ秒を超える超高速の状態で生成されている時は、seed値が同じとなってしまうから。なので、正しくは

var rand = new Random();
for (int i = 0; i < 100; i++)
{
    Console.WriteLine(rand.Next());
}

といったように、ループの外に出す必要性があります。

ランダムなランダム

では、ランダムなランダムが欲しい場合は。例えばマルチスレッド。そうでなくても、例えばループの外に出す(直接的でなくてもメソッドの中身がそうなっていて、意図せず使われてしまう可能性がある)のを忘れてしまうのを強制的に避ける場合。もしくは、別にマルチスレッドは気を付けるよー、といっても、ASP.NETとか複数リクエストが同時に走るわけで、同タイミングでのRandom生成になってしまう可能性は十分にある。そういう時は、RandomNumberGeneratorを使います。

using (var rng = new RNGCryptoServiceProvider())
{
    // 厳密にランダムなInt32を作る
    var buffer = new byte[sizeof(int)];
    rng.GetBytes(buffer);
    var seed = BitConverter.ToInt32(buffer, 0);
    // そのseedを基にRandomを作る
    var rand = new Random(seed);
}

これでマルチスレッドでも安全安心だ!勿論、RNGCryptoServiceProviderはちょっとコスト高。でも、全然我慢できる範囲ではある。お終い。

ThreadLocal

でも、これって別にスレッドセーフなランダムが欲しいってだけなわけだよね、それなのにちょっとした、とはいえ、コスト高を背負うのって馬鹿げてない?そこで出てくるのがThreadLocal<T>、.NET 4.0以降ですが、スレッド単位で一意な変数を宣言できます。それを使った、Jon Skeet氏(ゆーめーじん)の実装

public static class RandomProvider
{    
    private static int seed = Environment.TickCount;
    
    private static ThreadLocal<Random> randomWrapper = new ThreadLocal<Random>(() =>
        new Random(Interlocked.Increment(ref seed))
    );

    public static Random GetThreadRandom()
    {
        return randomWrapper.Value;
    }
}

なるほどねー!これなら軽量だし、とってもセーフで安心できるしイイね!もし複数スレッドで同時タイミングで初期化が走った時のために、Interlocked.Incrementで、必ず違う値がseedになるようになってるので、これなら色々大丈夫。

マルチスレッド→マルチサーバー

けれど、大丈夫なのは、一台のコンピューターで完結する時だけの時の話。クラウドでしょ!サーバー山盛りでしょ!な時代では、サーバーをまたいで同時タイミングなEnvironment.TickCountで初期化されてしまう可能性が微レ存。というわけで、Environment.TickCountに頼るのは完全に安全ではない。じゃあ、そう、合わせ技で行けばいいじゃない、seedは完全ランダムで行きましょう。

public static class RandomProvider
{
    private static ThreadLocal<Random> randomWrapper = new ThreadLocal<Random>(() =>
    {
        using (var rng = new RNGCryptoServiceProvider())
        {
            var buffer = new byte[sizeof(int)];
            rng.GetBytes(buffer);
            var seed = BitConverter.ToInt32(buffer, 0);
            return new Random(seed);
        }
    });

    public static Random GetThreadRandom()
    {
        return randomWrapper.Value;
    }
}

これで、軽量かつ安全安泰なRandomが手に入りました。めでたしめでたし。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(C#)
April 2011
|
July 2024

Twitter:@neuecc GitHub:neuecc

Archive