Amazon Kinesis + Reactive Extensionsによる簡易CEP

AWSのAmazon Kinesis!大規模なストリーミングデータをリアルタイムで処理する完全マネージド型サービス。うーん、いかにもわくわくしそうなキーワードが並んでいます。そしてついに先日、東京リージョンでも利用可能になったということでAWS Summitの最中もwktkして、どうやって利用したもんかと考えてました。だって、リアルタイムにイベントデータが流れてくる→オブザーバブルシーケンス→Reactive Extensions(Rx)、という連想になるのは自然なことですよね?

Kinesisとは

Rx、の前にKinesisとは。【AWS発表】 Amazon Kinesis – ストリームデータのリアルタイム処理を見れば事足りますが、表現するなら土管、ですかね。イベントデータの。以下ぽんち絵

Streamの中はShardという単位で分かれていて、データを放り込む時はPartitionKeyを元に、どのShardに突っ込まれるか決まる。読み書き性能自体は完全にShardの数で決まっていて、1シャード毎にWriteは1MB/sec - 1000Req/sec, Readは2MB/sec - 5Req/secとなってます。事前に負荷状況を予測していくのと、随時、Split(Shardの分割)とMerge(Shardの統合)してスケーリングしていく、って感じですかねえ。API自体は単純で、あんま数もないので簡単に理解できるかと。

APIが単純なのはやれることが少ないから。土管。情報を左から右に流すだけのパイプ。その代わり入力は限りなく無限にスケールしていく(Shardを増やしまくれば)。では出力は?というと、Kinesis Applicationとよばれる、といっても実体は、別にAPIをほぼほぼポーリングで叩いてデータ取り出して何か処理するものをそう呼んでるだけ。で、取り出すのはAPI叩いて保存されたデータを読むだけ。

そう、ポーリング。Kinesis自体は一時保管所であって、本当のリアルタイムでPubSub配信するわけじゃあない(用途としては問題ないレベルで低遅延にはなるけれど)。保存時間は24時間で、その間はStream中のどこから(最初からでも最新からでも任意の位置から)でも取り出すことができる。一時保管所がわりにS3を使ったりすると、ゴミは貯まるしどこまで取ったかとか煩わしくなるけれど、Kinesisの場合はStreamの形状になっているのでとてもやりやすい。ただしKinesisは制限として1レコード辺り50KBまで。更にHTTPで投げる際にBase64になってブヨっと膨らむ。

ObservableKinesisClient

C#でKinesisを使うには、AWS SDK for .NETを使えばAmazonKinesisClient入ってます。ソースコードも公開されてるしNuGetでも入れられるし、APIはとりあえずAsyncに対応してるし、APIデザインもちょっと奇妙なところもあるけれど、一応全て統一されたモデルでデザインされてるので、割と結構良いと思ってます。

Kinesis、データの登録はPutRecordでバイナリ投げるだけなので単純なのですが、取り出しの方はいささか面倒で、DescribeStreamによるStream内のShard情報の取得、GetShardIteratorによるShardIterator(どの位置から取得開始するか、の情報)の取得、それを元にGetRecord、そして延々とポーリングのためのループ。と、繰り返す必要があります。

というわけかで、まずは利用例の方を。

// とりあえずAWSのキーと、ストリーム名で生成する感じ
var client = new ObservableKinesisClient("awsAccessId", "awsSecretAccessKey", RegionEndpoint.APNortheast1, streamName: "KinesisTest");

// データの登録。オブジェクトを投げ込むとJSONシリアライズしたのを叩き込む。
await client.PutRecordAsync(new { Date = DateTime.Now, Value = "ほげほげほげほげ" });

// ObserveRecordDynamicでJSONのストリームとして購読できる
client.ObserveRecordDynamic()
    .Where(x => x.Value != "ほげ") // xはdynamicなのでどんなSchemaのJSONも自由に辿れる
    .Select(x => x.Date + ":" + x.Value)
    .Subscribe(Console.WriteLine);

はい。ObserveRecordDynamicで、リアルタイムに流れてくるデータを簡単に購読できます。IObservableなので、Rxによって自由にクエリを書くことが可能。また、何のデータが流れてくるか分からないストリームのために、JSONはdynamicの形でデシリアライズされています。(IntelliSenseの補助は効きませんが)スキーマレスに、あらゆるデータをRxで処理できます。もちろん、型付けされたものが欲しければObserverRecord<T>を、今は実装してないですが、まあ簡単につくれます:)

以下ObservableKinesisClient本体。

// JSON.NET, AWSSDK, Rx-Mainの参照が必要
public class ObservableKinesisClient
{
    readonly UTF8Encoding encoding = new UTF8Encoding(false);
    readonly JsonSerializer serializer = new JsonSerializer() { Formatting = Newtonsoft.Json.Formatting.None }; // ThreadSafeだよ
    readonly string streamName;
    readonly AmazonKinesisClient kinesis; // ThreadSafeなのかは知らない(ぉぃ

    // コンストラクタはもっとまぢめにやりましょう
    public ObservableKinesisClient(string awsAccessId, string awsSecretAccessKey, RegionEndpoint endPoint, string streamName)
    {
        this.kinesis = new AmazonKinesisClient(awsAccessId, awsSecretAccessKey, endPoint);
        this.streamName = streamName;
    }

    // ようするにObjectを1レコードずつJSONで突っ込むもの
    public async Task<PutRecordResponse> PutRecordAsync(object value)
    {
        using (var ms = new MemoryStream())
        using (var sw = new StreamWriter(ms, encoding))
        using (var jw = new JsonTextWriter(sw) { Formatting = Formatting.None })
        {
            serializer.Serialize(jw, value);
            jw.Flush();
            ms.Position = 0;

            var request = new PutRecordRequest
            {
                StreamName = streamName,
                Data = ms,
                PartitionKey = Guid.NewGuid().ToString() // PartitionKeyは適当にランダム
            };

            // つまり1レコード1HTTP POSTということになる。
            // 大量に投げる際は素朴すぎてアレゲ感があるので、実際にやるときはまとめてから放り込んで
            // 取り出す側も↑の構造を前提にして取り出すよーな感じにしたほうがいーかもデスネー
            return await kinesis.PutRecordAsync(request).ConfigureAwait(false);
        }
    }

    // Dynamicが嫌な場合はSerialize<T>でおk。とりあえずこの例ではdynamicでやります。
    // Client内部で分配しちゃったほうがきっと自然にやさしい(Publish().RefCount())
    public IObservable<dynamic> ObserveRecordDynamic()
    {
        return Observable.Create<dynamic>(async (observer, cancellationToken) =>
        {
            var isRunningNextPipeline = false;
            try
            {
                // まずShard一覧を取得する
                // TODO:これを使いまわしちゃうとShardsの増減には対応してないよ!
                // 毎回DescribeStream読むのもアレだしたまに問い合わせとかがいいの?
                var describeStreamResponse = await kinesis.DescribeStreamAsync(new DescribeStreamRequest { StreamName = streamName }).ConfigureAwait(false);
                var shards = describeStreamResponse.StreamDescription.Shards;

                var nextIterators = new List<string>();
                foreach (var shard in shards)
                {
                    if (cancellationToken.IsCancellationRequested) return; // CancellationTokenの監視だいぢだいぢ

                    // ShardIteratorTypeは実際は取り出した位置を記録しておいてAFTER_SEQUENCE_NUMBERでやるか、LATESTでやるかがいーんじゃないでしょーか?
                    var shardIterator = await kinesis.GetShardIteratorAsync(new GetShardIteratorRequest
                    {
                        StreamName = streamName,
                        ShardId = shard.ShardId,
                        ShardIteratorType = ShardIteratorType.TRIM_HORIZON, // TRIM_HORIZON = 最初から, LATEST = 最新, AT_SEQUENCE_NUMBER = そこから, AFTER_SEQUENCE_NUMBER = 次から
                    }).ConfigureAwait(false);

                    var record = await kinesis.GetRecordsAsync(new GetRecordsRequest { ShardIterator = shardIterator.ShardIterator }).ConfigureAwait(false);

                    // Shardの順番で回してるので、このPushの順番は必ずしも「時系列ではない」ことにチューイ!
                    foreach (var item in record.Records)
                    {
                        PushRecord(item, observer, ref isRunningNextPipeline); // ObserverでPush!Push!Push!
                    }

                    nextIterators.Add(record.NextShardIterator);
                }

                // NextShardIteratorがある状態で無限ぐるぐる
                do
                {
                    if (cancellationToken.IsCancellationRequested) return; // ところどころCancellationTokenの監視 Part2

                    for (int i = 0; i < nextIterators.Count; i++)
                    {
                        if (cancellationToken.IsCancellationRequested) return; // ところどころCancellationTokenの監視 Part3

                        var shardIterator = nextIterators[i];

                        var record = await kinesis.GetRecordsAsync(new GetRecordsRequest { ShardIterator = shardIterator }).ConfigureAwait(false);

                        // こちらでも、やはりShardの順番で回してるので、状況によって必ずしも時系列にはならないことにチューイ!
                        foreach (var item in record.Records)
                        {
                            PushRecord(item, observer, ref isRunningNextPipeline); // ObserverでPush!Push!Push!
                        }

                        nextIterators[i] = record.NextShardIterator;
                    }

                    await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false); // 実質ポーリングなのでなんとなくDelayをちょっと入れてみる

                    nextIterators = nextIterators.Where(x => x != null).ToList(); // 明らかに非効率なこの実装はテキトーなんで真面目にやるなら真面目に書いてください:)
                } while (nextIterators.Any());
            }
            catch (Exception ex)
            {
                if (isRunningNextPipeline)
                {
                    throw;
                }
                else
                {
                    observer.OnError(ex);
                }

                return;
            }

            observer.OnCompleted();
        });
    }

    void PushRecord(Record record, IObserver<dynamic> observer, ref bool isRunningNextPipeline)
    {
        using (var sr = new StreamReader(record.Data, encoding)) // item.DataにMemoryStreamの形で1レコードが受け取れる
        using (var jr = new JsonTextReader(sr))
        {
            var obj = serializer.Deserialize(jr);
            isRunningNextPipeline = true;
            observer.OnNext(obj); // 1レコードをPush
            isRunningNextPipeline = false;
        }
    }
}

PutRecordAsyncはまんま、JSONにシリアライズしたデータを投げ込んでるだけです。ObserverRecordDynamicのほうはちょっと複雑っぽいですが、やってることは順に、DescribeStreamAsyncでShard一覧を取得→それぞれのShardでGetShardIteratorAsyncで始点の取得・GetRecordsAsyncで最初のデータを取得しobserverに配信→取得できたNextShardIteratorを元にデータ取得と配信の無限ループ。です。

コメントで色々書いてありますが、Shard単位で処理していくのでレコードのSequenceNumberの順にPushされているわけではないことと、ShardがSplitやMergeで変動することへの対応は必要よね、とか考えることは色々ありますね。あと、Readの制限が5Req/secとかなり少ないので、複数処理する必要があるなら、できればリクエストは分配してやりたいところ。RxならPublishで分配、ついでにRefCountでSubscriberが0になったら購読解除というのが自然に書けるので、その辺も入れてやるといいかなー、なんて思います。とはいえ、基本的にはデータ取ってOnNextで垂れ流すという、それだけに収まってはいます(ほんとだよ!)。

従来はこの手のコードはyield returnで処理するはずですが、それがOnNextに変わっているという事実が面白い!勿論、同期API + yield returnにすることも可能ですが、AWS SDKの同期APIは非同期のものを.Resultで取ってるだけで非同期のほうがネイティブになるので、同期API使うのはお薦めしません。非同期時代のLINQ、非同期時代のイテレータ。中々面白くありません?UniRx - Reactive Extensions for UnityのFromCoroutineでも、IObserverをyielderとして渡して、非同期のイテレータを作れる(コンバートできる)ようにしています。こういうのも一つのデザイン。

like CEP(with LINQPad)

CEP(Complex Event Processing)は最近良く聞くようになりましたねー、MicrosoftにもStreamInsightというかなり立派なプロダクトがあるのですが、あんまり話を聞かないし将来性もビミョーそうなので見なかったことにしましょう。ちなみにStreamInsightは2.1からRxと統合されたりして、この手のイベントストリームとRxとが相性良いこと自体は証明済みです。

そんなわけでMicrosoft周辺では全然聞きませんが、日本だとLINEでのEsper CEPの活用例とかNorikra:Schema-less Stream Processing with SQLで盛んに聞いて、まーたMicrosoft周辺によくある、一歩先を行ったと思ったら周回遅れ現象か!とか思ったり思わなかったり。

というわけで、Norikraの紹介スライドのクエリ5つをRxで書いてみましょう。また、動作確認はLINQPadのDumpでリアルタイムに表示が可能です(asynchronousにクエリが走ってる最中はResultsのところにリアルタイムにグリッドが追加されていく!)

// Queries:(1)
client.ObserveRecordDynamic()
    .Select(x => new{ x.Name, x.Age })
    .Dump();

// Queries:(2)
client.ObserveRecordDynamic()
    .Where(x => x.Current == "Shibuya")
    .Select(x => new{ x.Name, x.Age })
    .Dump();

// Queries:(3)
client.ObserveRecordDynamic()
    .Buffer(TimeSpan.FromMinutes(5))
    .Select(xs => xs.GroupBy(x => x.Age).Select(x => new { Age = x.Key, Count = x.Count() }))
    .Dump();

// Queries:(4)
client.ObserveRecordDynamic()
    .Buffer(TimeSpan.FromMinutes(5))
    .Select(xs => xs.Max(x => x.Age))
    .Dump();

// Queries:(5)
client.ObserveRecordDynamic()
    .Where(x => x.Current == "Kyoto" && x.Attend[0] && x.Attend[1])
    .Buffer(TimeSpan.FromMinutes(5))
    .Select(xs => xs.GroupBy(x => x.User.Age).Select(x => new { Age = x.Key, Count = x.Count() }))
    .Dump();

5分間だったらBufferもしくはWindowが使えます(量が少なそうならBufferのほうが、後続クエリにLINQ to Objectsが使えて分かりやすい、量が多いならWindowで、同様にRxで集計クエリが書ける)。他に何ができるかはRxJavaのWikiのOperator一覧でもどうぞ。めちゃくちゃ何でもできます。

SQL vs Rx

SQLである必要は、あるようで、ない。テキストベースのDSLを作るならSQLが共通知識として期待できるので、SQLに寄せる必要性はかなり高い。けれど、Rxならば、LINQとしての共通知識と、C#そのものであるというコンパイルセーフな点と何でもできること、メソッドチェーン(+IntelliSense)による書きやすさ。SQLライクなものを使いたい理由は全くない。

(とはいえ勿論いちだいのRxがぶんさんごりごりのに勝てるとは思ってないんで、そこはまぁかじゅあるなはなしです)

TODO

というわけで見てきたわけですが、まあ所詮まだ単純なコードによるコンセプトレベルの話ですね!本格的にこれからやるとしたら

  • ObservableKinesisClientをもっとしっかりしたものに
  • Kinesis ApplicationをホストするためのServiceとプラグイン機構
  • ログ転送側としてSLABのKinesis用Sink

ですかねえ。まぁ、これらはJavaですでに用意されているamazon-kinesis-clientamazon-kinesis-connectorsを.NET環境で代替するために必要だ、といったところですね。素直にJava書けば?っていうのは一理あるけれど、どーなんですかね、C#でやりたいんですよ(笑)

Semantic Logging Application Block(SLAB)というのは構造化ロガー(正確にはロガーは含まれないけれど)と収集サービスがセットになったライブラリです。面白いのはOut-Of-Processでの動作が選べて、その場合はWindowsネイティブのEvent Tracing for Windows (ETW)経由でログが運ばれるので、非常に高速に動作する、というところ。Sinkというのは出力用プラグインみたいなものです。なので、アプリケーション→EventSourceロガー→SLAB Service(+ KinesisSink)→Kinesis という構造を作ることで、データをリアルタイムに投下するところまでは行ける。あとはRedShiftに送って解析(amazon-kinesis-connectorsには既にありますね)するなり、他のKinesis Application作るなりよしなに出来るかなぁ、できればいいかなぁ、と。ラムダアーキテクチャ、というホドデハ・モチロンナイ。

AWS + Windows(C#)

先週の木・金に開催されたAWS Summit Tokyo 2014にて、AWS + Windows(C#)で構築する.NET最先端技術によるハイパフォーマンスウェブアプリケーション開発実践と題して、セッションを行いました。

AWS + Windows(C#)で構築する.NET最先端技術によるハイパフォーマンスウェブアプリケーション開発実践 from Yoshifumi Kawai

まとめで書きましたが、C#+AWSは現実解、だと思ってます。そしてAWSだからといって特別なこともなく、そしてC#だからといって特別なこともない。Kinesisもちゃんと使えるわけだし、結構面白いことがまだまだ出来るんじゃないかな、って思ってます。なんでAzure使わないんですか?というのには、よく聞かれるのでお茶を濁して答えないとして(!)、AzureにもKinesisのようなAzure Event Hubsというものが先週プレビューリリースされました。C#からの活用という点では、こちらにも注目していきたいところです。Event Hubs Developer Guideなんか見ると普通に色々参考になるし、機能的にはHTTP以外にAMQP使えたり、ちょっと強そうではある。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive