RxとRedisを用いたリモートPub/Sub通信

今日から始まったBuild Insiderで、RedisとBookSleeveの記事を書きました - C#のRedisライブラリ「BookSleeve」の利用法。Redis、面白いし、Windowsで試すのも想像以上に簡単なので、是非是非試してみて欲しいです。そして何よりもBookSleeve!使うと、強制的に全てがasyncになるので、C# 5.0でのコーディングの仕方のトレーニングになる(笑)。にちじょー的にasync/awaitを使い倒すと、そこから色々アイディアが沸き上がっきます。ただたんにsyncがasyncになった、などというだけじゃなく、アプリケーションの造りが変わります。そういう意味では、Taskは結構過小評価されてるのかもしれないな、なんて最近は思っています。

さて、RedisにはPub/Sub機能がついているわけですが、Pub/Sub→オブザーバーパターン→Rx!これはティンと来た!と、いうわけで、これ、Rxに乗せられました、とても自然に。というわけで、□□を○○と見做すシリーズ、なCloudStructuresにRx対応を載せました。

追加したクラスはRedisSubject<T>。これはSubjectやAsyncSubjectなどと同じく、ISubject<T>になっています。IObservableであり、IObserverでもある代物。とりあえず、コード例を見てください。

var settings = new RedisSettings("127.0.0.1");

var subject = new RedisSubject<string>(settings, "PubSubTest");

// SubscribeはIObservable<T>なのでRxなLINQで書ける
var a = subject
    .Select(x => DateTime.Now.Ticks + " " + x)
    .Subscribe(x => Console.WriteLine(x));

var b = subject
    .Where(x => !x.StartsWith("A"))
    .Subscribe(x => Console.WriteLine(x), () => Console.WriteLine("completed!"));

// IObserverなのでOnNext/OnError/OnCompletedでメッセージ配信
subject.OnNext("ABCDEFGHIJKLM");
subject.OnNext("あいうえお");
subject.OnNext("なにぬねの");

Thread.Sleep(200); // 結果表示を待つ...

a.Dispose(); // UnsubscribeはDisposeで
subject.OnCompleted(); // OnCompletedを受信したSubscriberもUnsubscribeされる

はい、別になんてこともない極々フツーのRxのSubjectです。が、しかし、これはネットワークを通って、Redisを通して、全てのSubscriberへとメッセージを配信しています。おお~。いやまあ、コードの見た目からじゃあそういうの分からないので何も感動するところもないのですが、とにかく、本当に極々自然に普通に、しかし、ネットワークを超えます。この、見た目何も変わらずに、というところがいいところなわけです。

Subscribeする側はIObservableなので別にフツーに合成していけますし、Publishする側もIObserverなので、Subscribeにしれっと潜り込ませたりしても構わない。もう、全然、普通にインメモリなRxでやる時と同じことが、できます。

オワリ。地味だ。

う、うーん。ほ、ほらほら!!

// ネットワーク経由
var settings = new RedisSettings("xx.xxx.xxx.xxx");

var subject = new RedisSubject<DateTime>(settings, "PubSubTest");

// publisherはこちらのコード
while (true)
{
    Console.ReadLine();
    var now = DateTime.Now;
    Console.WriteLine(now.Ticks);
    subject.OnNext(now);
}

// subscriberはこちらのコードを動かす
subject.Subscribe(x =>
{
    var now = DateTime.Now;
    Console.Write(x.Ticks + " => " + now.Ticks);
    Console.WriteLine(" | " + (now - x));
});

というわけで、実際にネットワーク経由(AWS上に立てたRedisサーバーを通してる)で動かしてみた結果がこんな感じです。ネットワークを超えたことで用法は幾らでもある!夢膨らみまくり!

で、↑のコードは遅延時間のチェックを兼ねてるのですが、概ね、0.03秒ぐらい。たまにひっかかって0.5秒超えてるのがあって、ぐぬぬですが。実際のとこRedis/Linuxの設定で結構変わってくるところがあるので、その辺は煮詰めてくださいといったところでしょうか。

ともあれチャットとかなら全然問題なし、ゲームでもアクションとかタイミングにシビアじゃないものなら余裕ですね、ボードゲームぐらいなら全く問題ない。ちょっとしたMMOぐらいならいけるかも。これからはネットワーク対戦はRedisで、Rxで!!!

余談

CloudStructuresですが、.configからの設定読み込み機能も地味につけました。

<configSections>
    <section name="cloudStructures" type="CloudStructures.Redis.CloudStructuresConfigurationSection, CloudStructures" />
</configSections>

<cloudStructures>
    <redis>
        <group name="cache">
            <add host="127.0.0.1" />
            <add host="127.0.0.2" port="1000" />
        </group>
        <group name="session">
            <add host="127.0.0.1" db="2" valueConverter="CloudStructures.Redis.ProtoBufRedisValueConverter, CloudStructures" />
        </group>
    </redis>
</cloudStructures>
// これで設定を読み込める
var groups = CloudStructuresConfigurationSection.GetSection().ToRedisGroups();

色々使いやすくなってきて良い感じじゃあないでしょーか。

コールバック撲滅

そうそう、最後に実装の話を。元々BookSleeveのPub/Sub購読はコールバック形式です。「public Task Subscribe(string key, Action<string, byte[]> handler)」handlerはstringがキー, byte[]が送られてくるオブジェクトを指します。コールバック is ダサい。コールバック is 扱いにくい。ので、コールバックを見かけたらObservableかTaskに変換することを考えましょう!それがC# 5.0世代の常識です!

というわけで、以下のようにして変換しました。

public IDisposable Subscribe(IObserver<T> observer)
{
    var channel = Connection.GetOpenSubscriberChannel();

    var disposable = System.Reactive.Disposables.Disposable.Create(() =>
    {
        channel.Unsubscribe(Key).Wait();
    });

    // ここが元からあるコールバック
    channel.Subscribe(Key, (_, xs) =>
    {
        using (var ms = new MemoryStream(xs))
        {
            var value = RemotableNotification<T>.ReadFrom(ms, valueConverter);
            value.Accept(observer); // この中でobserverのOnNext/OnError/OnCompletedが叩かれる
            if (value.Kind == NotificationKind.OnError || value.Kind == NotificationKind.OnCompleted)
            {
                disposable.Dispose(); // ErrorかCompletedでもUnsubscribeしますん
            }
        }
    }).Wait();

    return disposable; // もしDisposableが呼ばれたらUnsubscribeしますん
}

こんなふぅーにRxで包むことで、相当使いやすさがアップします。感動した。

CloudStructures - ローカルとクラウドのデータ構造を透過的に表現するC# + Redisライブラリ

というものを作りました。インストールはNuGetから。

何を言ってるのかヨクワカラナイので、まずはコード例を。

// こんなクラスがあるとして
public class Person
{
    public string Name { get; private set; }
    public List<Person> Friends { get; private set; }

    public Person(string name)
    {
        Name = name;
        Friends = new List<Person>();
    }
}

// こんなのがいるとして
var sato = new Person("さとう");

// 人を足す
sato.Friends.Add(new Person("やまだ"));
sato.Friends.Add(new Person("いとう"));

// 件数数える
var friendCount = sato.Friends.Count;

これは普通にローカルで表現する場合です。実に普通です。では、次。

// RedisServerの設定の表現
public static class RedisServer
{
    public static readonly RedisSettings Default = new RedisSettings("127.0.0.1");
}

// こんなクラスがあるとして
public class Person
{
    public string Name { get; private set; }
    public RedisList<Person> Friends { get; private set; }

    public Person(string name)
    {
        Name = name;
        Friends = new RedisList<Person>(RedisServer.Default, "Person-" + Name);
    }
}

// こんなのがいるとして
var sato = new Person("さとう");

// 人を足す
await sato.Friends.AddLast(new Person("やまだ"));
await sato.Friends.AddLast(new Person("いとう"));

// 件数数える
var friendCount = await sato.Friends.GetLength();

この場合、Redisを通してサーバー上にデータは保存されています。ですが、操作感覚はローカルにあるものとほぼほぼ同じです。違いは全ての操作が非同期なので、awaitするぐらい。

IAsyncList

これは、Actor Framework for Windows AzureのDistributed Collectionsに影響を受けています。ActorFxのそれは、SOURCE CODEを落としてdocsフォルダの Distributed Collections using the ActorFx.docx に色々書いてあって面白いので必読です。

そして、ActorFxではSystem.Cloud.Collectionsとして(System名前空間!)、現状、以下のようなインターフェイスが定義されています(まだ変更の可能性大いにあり)。

namespace System.Cloud.Collections
{
    public interface IAsyncCollection<T> : IObservable<T>
    {
        Task<int> CountAsync { get; }
        Task<bool> IsReadOnlyAsync { get; }

        Task AddAsync(T item);
        Task ClearAsync();
        Task<bool> ContainsAsync(T item);
        Task CopyToAsync(T[] array, int arrayIndex);
        Task<bool> RemoveAsync(T item);
    }

    public interface IAsyncList<T> : IAsyncCollection<T>
    {
        Task<T> GetItemAsync(int index);
        Task SetItemAsync(int index, T value);
        Task<int> IndexOfAsync(T item);
        Task InsertAsync(int index, T item);
        Task RemoveAtAsync(int index);

        // Less chatty versions
        Task AddAsync(IEnumerable<T> items);
        Task RemoveRangeAsync(int index, int count);
    }

    public interface IAsyncDictionary<TKey, TValue> : IAsyncCollection<KeyValuePair<TKey, TValue>>
    {
        Task<TValue> GetValueAsync(TKey key);
        Task SetValueAsync(TKey key, TValue value);
        Task<Tuple<bool, TValue>> TryGetValueAsync(TKey key);

        // No AddAsync - use SetValueAsync instead.  We have no atomic operation to add iff a value is not in the dictionary.
        Task<bool> ContainsKeyAsync(TKey key);
        Task<bool> RemoveAsync(TKey key);

        // Bulk operations
        Task<ICollection<TValue>> GetValuesAsync(IEnumerable<TKey> keys);
        Task SetValuesAsync(IEnumerable<TKey> keys, IEnumerable<TValue> values);
        Task RemoveAsync(IEnumerable<TKey> keys);

        ICollection<TKey> Keys { get; }
        ICollection<TValue> Values { get; }
    }
}

わくわくしてきません?私はこの定義を見た瞬間に衝撃を受けました。RxのIObservable<T>を見た時と同程度の衝撃かもわからない。Ax(ActorFx)の実装としてはCloudList, CloudDictionary, CloudStringDictionaryがありますが(基盤としてAzure Table)、見てすぐにRedisと結びついた。Redisの持つデータ構造、List, Hash, Set, SortedSetってこれじゃないか!って。こういう風に表現されたらどれだけ素敵な見た目になるか……!

Strings, Set, SortedSet, List, Hash, その他

というわけで、最初の例ではRedisListだけ出しましたが、StringsもSetもSortedSetもHashもあります。また、HashClassやMemoizedRedisStringといった特殊なものも幾つか用意してあります。

// フィールドに持たなくても、ふつーにRedisClient的に使ってもいいよ
var client = new RedisString<string>(RedisServer.Default, "toaru-key");
await client.Set("あいうえお!", expirySeconds: TimeSpan.FromMinutes(60).TotalSeconds);

// RedisClassはRedisのHash構造をクラスにマッピングするもの
var hito = new RedisClass<Hito>(RedisServer.Default, "hito-1");
await hito.SetField("Name", "やまもと");
await hito.Increment("Money", 100);

var localHito = await hito.GetValue(); // Cloud -> Localに落とす、的ないめーぢ

実際色々あるので見て回ってください!

ConnectionManagement

基盤的な機能として、BookSleeveの接続管理を兼ねています。

// Redisの設定を表す
var settings = new RedisSettings(host: "127.0.0.1", port: 6379, db: 0);

// BookSleeveはスレッドセーフで単一のコネクションを扱う
// コネクションを一つに保ったり切断されていた場合の再接続などをしてくれる
var conn = settings.GetConnection();


// 複数接続はRedisGroupで管理できる
var group = new RedisGroup(groupName: "Cache", settings: new[]
{
    new RedisSettings(host: "100.0.0.1", port: 6379, db: 0),
    new RedisSettings(host: "105.0.0.1", port: 6379, db: 0),
});

// keyを元に分散先のサーバーを決める(デフォルトはMD5をサーバー台数で割って決めるだけの単純な分散)
var conn = group.GetSettings("hogehoge-100").GetConnection();

// シリアライザはデフォルトではJSONとProtoBufを用意(未指定の場合はJSON)
new RedisSettings("127.0.0.1", converter: new JsonRedisValueConverter());
new RedisSettings("127.0.0.1", converter: new ProtoBufRedisValueConverter());

って、ここまでBookSleeveの説明がなかった!BookSleeveはRedisのライブラリで、非同期の操作のみを提供しています。CloudStructuresのRedis操作はBookSleeveに全部委ねてます。というかぶっちゃけ、かなり単純なラップがほとんどだったりします(!)。見せ方を変えただけ、です、よーするところ。

んで、BookSleeveは斬新で非常に良いライブラリなのですけれど、操作が本当にプリミティブなものしかないので(全てのGetとSetがstringとbyte[]しかない、とかね)、ある程度、自分で作りこんでやらないと全く使えません。なので、この部分だけでも、結構使えるかなって思います。

Next

個人的にはすっごく面白いと思ってます。見せ方の違いでしかないわけですが、しかし、その見せ方の違いというのが非常に大事なのです。直感的、ですが、ある種奇抜なデザインなので、戸惑うとは思います。異色度合いで言ったら、以前に私の作ったReactivePropertyと同程度に異色かな、と。だからこそ、凄く大きな可能性を感じませんか?

ちなみに、これは(いつものように)コンセプト止まりじゃなくて、実際に使う予定アリなので、しっかり育ててく気満々です。是非、試してみてもらえると嬉しいですね。

Microsoft MVP for Visual C#を再再受賞しました

今年も再受賞することができました。去年は某g社のフロントに立って、というのもあって、オフライン活動が割と非常に活発でした。ブログは量は減りましたし、中身も職業柄ウェブ系の要素が濃くなった感。とはいえ、内容自体は結構面白げなC#の内容になってたのではかしらん。基本的には、他で得られない情報や、ディープに踏み込んだ先の話、新しい考え。そういったのを提供し続けたいです。

今年はもっともっと面白くしたい。去年に、私は

C#を世間(といっても、技術系の人に、ですね)に強くアピールするにはどうすればいいのか、といったら、一番大事なのはそれで書かれたメジャーなアプリケーションなのです。PerlではてなやMixi、livedoorなどを思い浮かべる、RubyでCookpadを、ScalaでFoursquareやTwitterを、そういった憧れにも似た気持ちを浮かべさせるようなアプリケーションがなければいけなくて。

Stack OverflowはC#, ASP.NET MVCだし、TIOBEのプログラミング言語ランキングでは三位など、海外でのC#の地位は十分に高い。のですが、国内ではそれと比べれば全くもってない。日本で誰もが知る会社の誰もが知るアプリケーション、それがC#で書かれている。そういう状態にならなければ、日本で強く普及は無理だな、と。

と言いました。今年はそれを実現するため、謎社(もう謎ではない)のCTOとして、人の憧れるような、日本を代表するC#の企業にします。C#による圧倒的な成果、C#だからこその強さ、というのを現実に示していく。幸い、私は今、それができる場所にいると思っています。

そんなわけで、今年もよろしくお願いします。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive