Redis互換の超高速インメモリデータストア「Garnet」にC# CustomCommandを実装してコマンドを拡張する

MicrosoftからIntroducing Garnet – an open-source, next-generation, faster cache-store for accelerating applications and servicesという記事が今日公開されて、Garnetという新しいインメモリデータストアがOSSとして公開されました。Microsoft ResearchでFASTERを手掛けていたチームによるもので、FASTERはC#実装の高速なキーバリューストアでした。今回のGarnetはその発展形のようなもので、FASTERベースのストレージと、Redis互換のプロトコルによる、インメモリデータストアになっています。詳しくはGarnetのほうのブログA Brief History of Garnetで。GarnetもC#で作られています。

ベンチマークによると、Redisはもちろんのこと、DragonflyというRedis互換の世界最速のインメモリデータストア(を公式で謳ってる)Dragonflyよりも高速、だそうで。

image

このグラフ、そこまで大きな差がないように見えますが対数グラフになっていて、Redisが1,000.00 kops/sec に対して、100,000.00 kops/secって言ってます。100倍です!えー。

そもそもRedisの速度に関していうと、シングルスレッドベースであることなどから、たまによくそこまで速くはないというのは言われてきていて、先述のDragonflyはRedis互換で25倍高速とする「Dragonfly」が登場。2022年の最新技術でインメモリデータストアを実装などというリリースとともに、現代の技術で作り直せばもっともっと速くなる、とはされてきました。とはいえ、単純なGET/SETだけのメモリキャッシュとは比較にならない豊富なデータ型など利便性がとても高く、いうて別にそこまで遅いというわけでもないので、特に気にすることなく使われ続けているのではないでしょうか。

GarnetはC#で作られていますが、当然ながらC#専用ではなく、汎用的なRedisサーバーとして動作するため、既存のRedisクライアントで直接繋げることができます。RedisはそのプロトコルRedis serialization protocol(RESP)の仕様を公開しているため、互換サーバーが作りやすいというわけですね、素晴らしい……!

C#から使う場合はStackExchange.Redisと、Garnet同梱のGarent Clientのどちらかが使えます。パッとGarnet Clientを見た限り、現状現実的に使うならStackExchange.Redisですね。最低限は用意されているけれど、Redisクライアントとして使うには、しんどみがありそうです。ただ、性能面ではGarnet Clientのほうが良さそうです。StackExchange.Redisも、前身のBookSleeveから数えると初期設計が10年以上前のものになっているので、現代の観点から見ると設計は古く、パフォーマンス的にも、この実装は悪そうだな、と思えるところがかなりあります。なのでロマンを追いかけるならGarnet Clientを使うのも面白くはあります……!

C#でカスタムコマンドを実装する

普通にRedis互換サーバーとして立てて使うのもいいのですが、C#使いなら面白い点があって、Garnetをライブラリとして参照して(NuGet: Microsoft.Garnet)、アプリケーションに組み込んでのセルフホストができます。例えばロガーとしてZLoggerを差し込んでVerboseでログを出してみたりとか、ちょっと使いやすくていい感じです。ローカル開発とかだったらDockerでRedis動かして、などではなく、ソリューションにGarnetをそのまま組み込んで.NET Aspireで同時起動させるとかもいい感じでしょう。RedisはWindowsでは動かないので(大昔にMicrosoftがForkして動かせるようにしたプロジェクトがありましたが!)、ちゃんと動く互換サーバーが出てきたこと自体がとても嬉しかったりもします。

using Garnet;
using Microsoft.Extensions.Logging;
using ZLogger;

try
{
    var loggerFactory = LoggerFactory.Create(x =>
    {
        x.ClearProviders();
        x.SetMinimumLevel(LogLevel.Trace);
        x.AddZLoggerConsole(options =>
        {
            options.UsePlainTextFormatter(formatter =>
            {
                formatter.SetPrefixFormatter($"[{0}]", (in MessageTemplate template, in LogInfo info) => template.Format(info.Category));
            });
        });
    });

    using var server = new GarnetServer(args, loggerFactory);

    // Optional: register custom extensions
    RegisterExtensions(server);

    // Start the server
    server.Start();
    Thread.Sleep(Timeout.Infinite);
}
catch (Exception ex)
{
    Console.WriteLine($"Unable to initialize server due to exception: {ex.Message}");
}

もう一つは、カスタムコマンドを実装できることです……!C#で……!

Redis上でちょっと複雑な実行をしたいことはよくあり、Redisの場合はLua Scriptで処理していましたが、GarnetではC#でカスタムコマンドを実装して組み込むことができます。LUAだとパフォーマンス上どうか、あるいはLUAではできないかなり複雑なことをしたい、といった場合に、パフォーマンス上のデメリットなく使えます。もっとさらに嬉しい点としては、サーバー側で用意した拡張コマンドは、RESPに従っているので、クライアントはC#専用ではなく、PHPからでもGoからでも呼べます。

というわけで、サンプルということで単純な、「SETLCLAMP」というSET時にclampするカスタムコマンドを早速作っていきましょう。作る前に、先に↑のコードで欠けてるRegisterExtensionsの部分を。

static void RegisterExtensions(GarnetServer server)
{
    // ClampLongCustomCommandというカスタムコマンドをSETLCLAMPというコマンド名で登録する。
    // これはMath.Clampを呼び出すので、パラメーター数は3(long value, long min, long max)
    server.Register.NewCommand("SETLCLAMP", 3, CommandType.ReadModifyWrite, new ClampLongCustomCommand());
}

カスタムコマンドの登録自体は非常に簡単で、CustomRawStringFunctions, CustomTransactionProcedure または CustomObjectFactory を実装したクラスをコマンド名と共に追加するだけです。

カスタムコマンドの実装も簡単……?まぁ、理解すればそれなりぐらいに。

using Garnet.server;
using System.Buffers;
using System.Buffers.Binary;
using Tsavorite.core;

sealed class ClampLongCustomCommand : CustomRawStringFunctions
{
    // trueの場合はKeyが空の時の動作(GetInitialLength, InitilUpdate)を呼びに行く
    public override bool NeedInitialUpdate(ReadOnlySpan<byte> key, ReadOnlySpan<byte> input, ref (IMemoryOwner<byte>, int) output) => true;

    // UpdaterのSpan<byte> value(書き込みたいメモリデータ)の長さを決める
    public override int GetInitialLength(ReadOnlySpan<byte> input)
    {
        // 今回はlongだけなので決め打ち8
        return 8;
    }

    public override bool InitialUpdater(ReadOnlySpan<byte> key, ReadOnlySpan<byte> input, Span<byte> value, ref (IMemoryOwner<byte>, int) output, ref RMWInfo rmwInfo)
    {
        // inputに対してGetNextArgを連続して呼ぶとパラメーターの取得。これは定型句。
        int offset = 0;
        var arg1 = GetNextArg(input, ref offset);
        var arg2 = GetNextArg(input, ref offset);
        var arg3 = GetNextArg(input, ref offset);

        // ClientはWriteInt64LittleEndianでシリアライズしてきてるので、Readでデシリアライズ
        var v = BinaryPrimitives.ReadInt64LittleEndian(arg1);
        var min = BinaryPrimitives.ReadInt64LittleEndian(arg2);
        var max = BinaryPrimitives.ReadInt64LittleEndian(arg3);

        var result = Math.Clamp(v, min, max);

        // valueに対して値を書くことで値のセットになる
        BinaryPrimitives.WriteInt64LittleEndian(value, result);

        // 戻り値とかエラーを書きたい場合はoutputを使う(RespWriteUtilsに色々Utilityが揃ってる)
        // WriteIntegerAsBulkStringなどを使うと"String"としての結果になることに注意
        // 今回はlongをバイナリとして出力する
        unsafe
        {
            var len = 8 + 6; // $8\r\n{value}\r\n
            var pool = MemoryPool.Rent(len);
            using var memory = pool.Memory.Pin();
            var begin = (byte*)memory.Pointer;
            var end = begin + len;
            RespWriteUtils.WriteBulkString(value, ref begin, end);
            output = (pool, len);
        }

        return true;
    }

    // 同じメモリ領域を再利用する(置換する値の長さが同値なら再利用可能)かどうかを決める
    public override bool NeedCopyUpdate(ReadOnlySpan<byte> key, ReadOnlySpan<byte> input, ReadOnlySpan<byte> oldValue, ref (IMemoryOwner<byte>, int) output) => false;

    // 置換時に再利用する場合
    public override bool InPlaceUpdater(ReadOnlySpan<byte> key, ReadOnlySpan<byte> input, Span<byte> value, ref int valueLength, ref (IMemoryOwner<byte>, int) output, ref RMWInfo rmwInfo)
    {
        // 置換するvalueの長さが一緒(あるいは小さい)の場合は
        // valueにはoldValueが入ってきてる。
        // 今回は特に考慮しないのでそのまんま書く。

        int offset = 0;
        var v = BinaryPrimitives.ReadInt64LittleEndian(GetNextArg(input, ref offset));
        var min = BinaryPrimitives.ReadInt64LittleEndian(GetNextArg(input, ref offset));
        var max = BinaryPrimitives.ReadInt64LittleEndian(GetNextArg(input, ref offset));

        var result = Math.Clamp(v, min, max);

        BinaryPrimitives.WriteInt64LittleEndian(value, result);
        unsafe
        {
            var len = 8 + 6; // $8\r\n{value}\r\n
            var pool = MemoryPool.Rent(len);
            using var memory = pool.Memory.Pin();
            var begin = (byte*)memory.Pointer;
            var end = begin + len;
            RespWriteUtils.WriteBulkString(value, ref begin, end);
            output = (pool, len);
        }

        return true;
    }

    // 置換時に別のメモリ領域を確保する場合

    public override int GetLength(ReadOnlySpan<byte> value, ReadOnlySpan<byte> input) => 8;

    public override bool CopyUpdater(ReadOnlySpan<byte> key, ReadOnlySpan<byte> input, ReadOnlySpan<byte> oldValue, Span<byte> newValue, ref (IMemoryOwner<byte>, int) output, ref RMWInfo rmwInfo) => throw new NotImplementedException();


    // 読み込み処理用
    public override bool Reader(ReadOnlySpan<byte> key, ReadOnlySpan<byte> input, ReadOnlySpan<byte> value, ref (IMemoryOwner<byte>, int) output, ref ReadInfo readInfo) => throw new NotImplementedException();
}

今回はRedisでいうところのStringベースで作るので CustomRawStringFunctions を使います。RedisのStringは文字列型じゃなくて、どちらかというとバイナリ型で、バイナリシリアライズできるものなら、なんでも突っ込めるイメージです。私もゲームサーバーを作っていたときはMessagePackのバイナリを突っ込みまくってましたし、開発時には雑に画像データのバイナリを投げ込んで画像DB代わりに使ったりとかもありました。

オーバーライドするメソッドの数が多いことと、パラメーターがSpan<byte>だらけで一瞬圧倒されちゃうんですが、冷静に追ってみるとそこまで難しいことは言ってないことに気づきます。追加時(Add)・置換時(Replace)が、最適化のため同じサイズか違うサイズかで2択、それとRead時用。といった別れ方をしています。

key, input, valueが全てReadOnlySpan<byte>なのは、まぁそりゃそうでしょう(ここでstringとか出てきたら逆に良くない!)

inputをパラメーターに分解するのはGetNextArgというヘルパーメソッドを使います。当然それも出てくるのはReadOnlySpan<byte>なので、あとは適当に、もしJSONとかMessagePackとかMemoryPackでシリアライズしたデータだったらシリアライザを使って戻すのもいいし、プリミティブの値だったらBinaryPrimitivesが恐らく適役です。MemoryPackでValueTupleにまとめちゃうのがArgumentが分かれないので最速かつ簡単かもしれません。

結果はSpan<byte> valueに書きます。この出力先のSpanの長さは事前にGetLengthまたはGetInitialLengthで求めておく必要があります。outputはクライアント側に戻すときの値で、RESPに則った形式で出力する必要があるので色々注意がいります。まずはRESPの仕様を簡単にでも頭に入れたほうがつまずかないで済むかもしれません、ここを分かってないとイマイチ書きづらいと思います。

と、いうわけで、バイナリ操作がそこそこ混ざることを除けば、それなりに素直に書けるのではないでしょうか。雰囲気は理解しました!ある程度なんでもは出来ますが(CustomTransactionProcedureCustomObjectFactory でもまた色々出来る)、同期メソッドしかないように、DB呼んだりHTTP通信したりはご法度です。当たり前ですが。当たり前ですが。計算量もGarnetサーバーのCPUにストレートに影響を与えるので、そんなに無茶なことを書くことはないと思いますがお気をつけを。それでも、LUAを走らせるよりもずっと軽いんじゃないかなという予感はさせてくれます。実際これただのC#のメソッドそのものですしね。

クライアントから呼び出す場合は、こんなメソッドを用意してみます。

public static class GarnetClientExtensions
{
    // RESPプロトコルにのっとってOpCodeを用意する
    // RESPのBlukStringの仕様: https://redis.io/docs/reference/protocol-spec/#bulk-strings
    // $<length>\r\n<data>\r\n
    readonly static Memory<byte> OpCode_SETLCLAMP = Encoding.ASCII.GetBytes("$9\r\nSETLCLAMP\r\n");

    public static async Task<long> ClampAsync(this GarnetClient client, Memory<byte> key, long value, long min, long max, CancellationToken cancellationToken = default)
    {
        var parameters = new byte[24];

        var valSpan = parameters[0..8];
        var minSpan = parameters[8..16];
        var maxSpan = parameters[16..24];

        BinaryPrimitives.WriteInt64LittleEndian(valSpan, value);
        BinaryPrimitives.WriteInt64LittleEndian(minSpan, min);
        BinaryPrimitives.WriteInt64LittleEndian(maxSpan, max);

        // key + (value, min, max)
        // 戻り値のMemoryResultはArrayPoolから借りてる状態なのでDisposeでReturnする
        using var result = await client.ExecuteForMemoryResultWithCancellationAsync(OpCode_SETLCLAMP, new Memory<byte>[] { key, valSpan, minSpan, maxSpan }, cancellationToken);
        
        return BinaryPrimitives.ReadInt64LittleEndian(result.Span);
    }
}

サーバー側で用意した拡張コマンドは、ちゃんとRESPに従っているので、クライアントはC#専用ではありませんし、Garnet Client専用でもありません。StackExchange.Redisであれば、db.Execute("SETLCLAMP", ...) で呼べます。

実際に動かしてみるとこんな感じです。

static async Task RunClientAsync(ILoggerFactory loggerFactory)
{
    var logger = loggerFactory.CreateLogger("Client");

    var client = new GarnetClient("localhost", 3278, logger: logger);

    logger.ZLogInformation($"Client Connecting.");
    await client.ConnectAsync();
    logger.ZLogInformation($"Success Connect.");

    var key = Encoding.UTF8.GetBytes("foo");

    var v1 = await client.ClampAsync(key, 12345, min: 0, max: 100);
    Console.WriteLine(v1); // 100

    // String系のGET/SET/DELなどは普通に呼べる
    using var v2 = await client.StringGetAsMemoryAsync(key);
    Console.WriteLine(BinaryPrimitives.ReadInt64LittleEndian(v2.Span)); // 100

    var isDelete = await client.KeyDeleteAsync(key);
    Console.WriteLine(isDelete); // True
}

いいですね!

まとめ

さすがに公開されてまだ10時間経ってないぐらいなのでザックリとした理解なのですが、かなりいいんじゃないかと!

どうしてもMemachedとかRedisとかは、クラウドのマネージドサービスが用意されてないと嫌だー、という思考に陥りがちなのですが、C#でガリガリ拡張できるとなれば、まぁマネージドがなくてもしょうがないな!という気持ちになれ、る、でしょうかね……?

まぁそうじゃなくても、あまりマネージド指向になりすぎるのも良くないかな、とは思っています。私は最近はPubSubにNATSをお薦めしてクライアントも作ったりしてたわけですが、もちろんマネージドサービスはありません。で、だから、諦めます、というのは違うかな、と。もったいないと思うんですよね。

なので、必要あれば、いや、必要じゃなくても(?)気持ちがあるなら、自前に立てるというのも否定しちゃあいけないと思ってます。特にC#アプリケーションを作ったことがある人なら、C#で組み込んでホスティングすること自体は別に難しくもない、なんだったらいつもやってることの延長線上でいけますし。もちろん、そこからインフラ安定させるとかデータどうするなとかリカバリどうするとか、そういうのは別問題の話ではありますが……!

ともあれかなり面白いし使える予感があるので、やっていきましょう!

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(C#)
April 2011
|
July 2024

Twitter:@neuecc GitHub:neuecc

Archive