RxとRedisを用いたリモートPub/Sub通信

今日から始まったBuild Insiderで、RedisとBookSleeveの記事を書きました - C#のRedisライブラリ「BookSleeve」の利用法。Redis、面白いし、Windowsで試すのも想像以上に簡単なので、是非是非試してみて欲しいです。そして何よりもBookSleeve!使うと、強制的に全てがasyncになるので、C# 5.0でのコーディングの仕方のトレーニングになる(笑)。にちじょー的にasync/awaitを使い倒すと、そこから色々アイディアが沸き上がっきます。ただたんにsyncがasyncになった、などというだけじゃなく、アプリケーションの造りが変わります。そういう意味では、Taskは結構過小評価されてるのかもしれないな、なんて最近は思っています。

さて、RedisにはPub/Sub機能がついているわけですが、Pub/Sub→オブザーバーパターン→Rx!これはティンと来た!と、いうわけで、これ、Rxに乗せられました、とても自然に。というわけで、□□を○○と見做すシリーズ、なCloudStructuresにRx対応を載せました。

追加したクラスはRedisSubject<T>。これはSubjectやAsyncSubjectなどと同じく、ISubject<T>になっています。IObservableであり、IObserverでもある代物。とりあえず、コード例を見てください。

var settings = new RedisSettings("127.0.0.1");

var subject = new RedisSubject<string>(settings, "PubSubTest");

// SubscribeはIObservable<T>なのでRxなLINQで書ける
var a = subject
    .Select(x => DateTime.Now.Ticks + " " + x)
    .Subscribe(x => Console.WriteLine(x));

var b = subject
    .Where(x => !x.StartsWith("A"))
    .Subscribe(x => Console.WriteLine(x), () => Console.WriteLine("completed!"));

// IObserverなのでOnNext/OnError/OnCompletedでメッセージ配信
subject.OnNext("ABCDEFGHIJKLM");
subject.OnNext("あいうえお");
subject.OnNext("なにぬねの");

Thread.Sleep(200); // 結果表示を待つ...

a.Dispose(); // UnsubscribeはDisposeで
subject.OnCompleted(); // OnCompletedを受信したSubscriberもUnsubscribeされる

はい、別になんてこともない極々フツーのRxのSubjectです。が、しかし、これはネットワークを通って、Redisを通して、全てのSubscriberへとメッセージを配信しています。おお~。いやまあ、コードの見た目からじゃあそういうの分からないので何も感動するところもないのですが、とにかく、本当に極々自然に普通に、しかし、ネットワークを超えます。この、見た目何も変わらずに、というところがいいところなわけです。

Subscribeする側はIObservableなので別にフツーに合成していけますし、Publishする側もIObserverなので、Subscribeにしれっと潜り込ませたりしても構わない。もう、全然、普通にインメモリなRxでやる時と同じことが、できます。

オワリ。地味だ。

う、うーん。ほ、ほらほら!!

// ネットワーク経由
var settings = new RedisSettings("xx.xxx.xxx.xxx");

var subject = new RedisSubject<DateTime>(settings, "PubSubTest");

// publisherはこちらのコード
while (true)
{
    Console.ReadLine();
    var now = DateTime.Now;
    Console.WriteLine(now.Ticks);
    subject.OnNext(now);
}

// subscriberはこちらのコードを動かす
subject.Subscribe(x =>
{
    var now = DateTime.Now;
    Console.Write(x.Ticks + " => " + now.Ticks);
    Console.WriteLine(" | " + (now - x));
});

というわけで、実際にネットワーク経由(AWS上に立てたRedisサーバーを通してる)で動かしてみた結果がこんな感じです。ネットワークを超えたことで用法は幾らでもある!夢膨らみまくり!

で、↑のコードは遅延時間のチェックを兼ねてるのですが、概ね、0.03秒ぐらい。たまにひっかかって0.5秒超えてるのがあって、ぐぬぬですが。実際のとこRedis/Linuxの設定で結構変わってくるところがあるので、その辺は煮詰めてくださいといったところでしょうか。

ともあれチャットとかなら全然問題なし、ゲームでもアクションとかタイミングにシビアじゃないものなら余裕ですね、ボードゲームぐらいなら全く問題ない。ちょっとしたMMOぐらいならいけるかも。これからはネットワーク対戦はRedisで、Rxで!!!

余談

CloudStructuresですが、.configからの設定読み込み機能も地味につけました。

<configSections>
    <section name="cloudStructures" type="CloudStructures.Redis.CloudStructuresConfigurationSection, CloudStructures" />
</configSections>

<cloudStructures>
    <redis>
        <group name="cache">
            <add host="127.0.0.1" />
            <add host="127.0.0.2" port="1000" />
        </group>
        <group name="session">
            <add host="127.0.0.1" db="2" valueConverter="CloudStructures.Redis.ProtoBufRedisValueConverter, CloudStructures" />
        </group>
    </redis>
</cloudStructures>
// これで設定を読み込める
var groups = CloudStructuresConfigurationSection.GetSection().ToRedisGroups();

色々使いやすくなってきて良い感じじゃあないでしょーか。

コールバック撲滅

そうそう、最後に実装の話を。元々BookSleeveのPub/Sub購読はコールバック形式です。「public Task Subscribe(string key, Action<string, byte[]> handler)」handlerはstringがキー, byte[]が送られてくるオブジェクトを指します。コールバック is ダサい。コールバック is 扱いにくい。ので、コールバックを見かけたらObservableかTaskに変換することを考えましょう!それがC# 5.0世代の常識です!

というわけで、以下のようにして変換しました。

public IDisposable Subscribe(IObserver<T> observer)
{
    var channel = Connection.GetOpenSubscriberChannel();

    var disposable = System.Reactive.Disposables.Disposable.Create(() =>
    {
        channel.Unsubscribe(Key).Wait();
    });

    // ここが元からあるコールバック
    channel.Subscribe(Key, (_, xs) =>
    {
        using (var ms = new MemoryStream(xs))
        {
            var value = RemotableNotification<T>.ReadFrom(ms, valueConverter);
            value.Accept(observer); // この中でobserverのOnNext/OnError/OnCompletedが叩かれる
            if (value.Kind == NotificationKind.OnError || value.Kind == NotificationKind.OnCompleted)
            {
                disposable.Dispose(); // ErrorかCompletedでもUnsubscribeしますん
            }
        }
    }).Wait();

    return disposable; // もしDisposableが呼ばれたらUnsubscribeしますん
}

こんなふぅーにRxで包むことで、相当使いやすさがアップします。感動した。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive