Amazon Kinesis + Reactive Extensionsによる簡易CEP

AWSのAmazon Kinesis!大規模なストリーミングデータをリアルタイムで処理する完全マネージド型サービス。うーん、いかにもわくわくしそうなキーワードが並んでいます。そしてついに先日、東京リージョンでも利用可能になったということでAWS Summitの最中もwktkして、どうやって利用したもんかと考えてました。だって、リアルタイムにイベントデータが流れてくる→オブザーバブルシーケンス→Reactive Extensions(Rx)、という連想になるのは自然なことですよね?

Kinesisとは

Rx、の前にKinesisとは。【AWS発表】 Amazon Kinesis – ストリームデータのリアルタイム処理を見れば事足りますが、表現するなら土管、ですかね。イベントデータの。以下ぽんち絵

Streamの中はShardという単位で分かれていて、データを放り込む時はPartitionKeyを元に、どのShardに突っ込まれるか決まる。読み書き性能自体は完全にShardの数で決まっていて、1シャード毎にWriteは1MB/sec - 1000Req/sec, Readは2MB/sec - 5Req/secとなってます。事前に負荷状況を予測していくのと、随時、Split(Shardの分割)とMerge(Shardの統合)してスケーリングしていく、って感じですかねえ。API自体は単純で、あんま数もないので簡単に理解できるかと。

APIが単純なのはやれることが少ないから。土管。情報を左から右に流すだけのパイプ。その代わり入力は限りなく無限にスケールしていく(Shardを増やしまくれば)。では出力は?というと、Kinesis Applicationとよばれる、といっても実体は、別にAPIをほぼほぼポーリングで叩いてデータ取り出して何か処理するものをそう呼んでるだけ。で、取り出すのはAPI叩いて保存されたデータを読むだけ。

そう、ポーリング。Kinesis自体は一時保管所であって、本当のリアルタイムでPubSub配信するわけじゃあない(用途としては問題ないレベルで低遅延にはなるけれど)。保存時間は24時間で、その間はStream中のどこから(最初からでも最新からでも任意の位置から)でも取り出すことができる。一時保管所がわりにS3を使ったりすると、ゴミは貯まるしどこまで取ったかとか煩わしくなるけれど、Kinesisの場合はStreamの形状になっているのでとてもやりやすい。ただしKinesisは制限として1レコード辺り50KBまで。更にHTTPで投げる際にBase64になってブヨっと膨らむ。

ObservableKinesisClient

C#でKinesisを使うには、AWS SDK for .NETを使えばAmazonKinesisClient入ってます。ソースコードも公開されてるしNuGetでも入れられるし、APIはとりあえずAsyncに対応してるし、APIデザインもちょっと奇妙なところもあるけれど、一応全て統一されたモデルでデザインされてるので、割と結構良いと思ってます。

Kinesis、データの登録はPutRecordでバイナリ投げるだけなので単純なのですが、取り出しの方はいささか面倒で、DescribeStreamによるStream内のShard情報の取得、GetShardIteratorによるShardIterator(どの位置から取得開始するか、の情報)の取得、それを元にGetRecord、そして延々とポーリングのためのループ。と、繰り返す必要があります。

というわけかで、まずは利用例の方を。

// とりあえずAWSのキーと、ストリーム名で生成する感じ
var client = new ObservableKinesisClient("awsAccessId", "awsSecretAccessKey", RegionEndpoint.APNortheast1, streamName: "KinesisTest");

// データの登録。オブジェクトを投げ込むとJSONシリアライズしたのを叩き込む。
await client.PutRecordAsync(new { Date = DateTime.Now, Value = "ほげほげほげほげ" });

// ObserveRecordDynamicでJSONのストリームとして購読できる
client.ObserveRecordDynamic()
    .Where(x => x.Value != "ほげ") // xはdynamicなのでどんなSchemaのJSONも自由に辿れる
    .Select(x => x.Date + ":" + x.Value)
    .Subscribe(Console.WriteLine);

はい。ObserveRecordDynamicで、リアルタイムに流れてくるデータを簡単に購読できます。IObservableなので、Rxによって自由にクエリを書くことが可能。また、何のデータが流れてくるか分からないストリームのために、JSONはdynamicの形でデシリアライズされています。(IntelliSenseの補助は効きませんが)スキーマレスに、あらゆるデータをRxで処理できます。もちろん、型付けされたものが欲しければObserverRecord<T>を、今は実装してないですが、まあ簡単につくれます:)

以下ObservableKinesisClient本体。

// JSON.NET, AWSSDK, Rx-Mainの参照が必要
public class ObservableKinesisClient
{
    readonly UTF8Encoding encoding = new UTF8Encoding(false);
    readonly JsonSerializer serializer = new JsonSerializer() { Formatting = Newtonsoft.Json.Formatting.None }; // ThreadSafeだよ
    readonly string streamName;
    readonly AmazonKinesisClient kinesis; // ThreadSafeなのかは知らない(ぉぃ

    // コンストラクタはもっとまぢめにやりましょう
    public ObservableKinesisClient(string awsAccessId, string awsSecretAccessKey, RegionEndpoint endPoint, string streamName)
    {
        this.kinesis = new AmazonKinesisClient(awsAccessId, awsSecretAccessKey, endPoint);
        this.streamName = streamName;
    }

    // ようするにObjectを1レコードずつJSONで突っ込むもの
    public async Task<PutRecordResponse> PutRecordAsync(object value)
    {
        using (var ms = new MemoryStream())
        using (var sw = new StreamWriter(ms, encoding))
        using (var jw = new JsonTextWriter(sw) { Formatting = Formatting.None })
        {
            serializer.Serialize(jw, value);
            jw.Flush();
            ms.Position = 0;

            var request = new PutRecordRequest
            {
                StreamName = streamName,
                Data = ms,
                PartitionKey = Guid.NewGuid().ToString() // PartitionKeyは適当にランダム
            };

            // つまり1レコード1HTTP POSTということになる。
            // 大量に投げる際は素朴すぎてアレゲ感があるので、実際にやるときはまとめてから放り込んで
            // 取り出す側も↑の構造を前提にして取り出すよーな感じにしたほうがいーかもデスネー
            return await kinesis.PutRecordAsync(request).ConfigureAwait(false);
        }
    }

    // Dynamicが嫌な場合はSerialize<T>でおk。とりあえずこの例ではdynamicでやります。
    // Client内部で分配しちゃったほうがきっと自然にやさしい(Publish().RefCount())
    public IObservable<dynamic> ObserveRecordDynamic()
    {
        return Observable.Create<dynamic>(async (observer, cancellationToken) =>
        {
            var isRunningNextPipeline = false;
            try
            {
                // まずShard一覧を取得する
                // TODO:これを使いまわしちゃうとShardsの増減には対応してないよ!
                // 毎回DescribeStream読むのもアレだしたまに問い合わせとかがいいの?
                var describeStreamResponse = await kinesis.DescribeStreamAsync(new DescribeStreamRequest { StreamName = streamName }).ConfigureAwait(false);
                var shards = describeStreamResponse.StreamDescription.Shards;

                var nextIterators = new List<string>();
                foreach (var shard in shards)
                {
                    if (cancellationToken.IsCancellationRequested) return; // CancellationTokenの監視だいぢだいぢ

                    // ShardIteratorTypeは実際は取り出した位置を記録しておいてAFTER_SEQUENCE_NUMBERでやるか、LATESTでやるかがいーんじゃないでしょーか?
                    var shardIterator = await kinesis.GetShardIteratorAsync(new GetShardIteratorRequest
                    {
                        StreamName = streamName,
                        ShardId = shard.ShardId,
                        ShardIteratorType = ShardIteratorType.TRIM_HORIZON, // TRIM_HORIZON = 最初から, LATEST = 最新, AT_SEQUENCE_NUMBER = そこから, AFTER_SEQUENCE_NUMBER = 次から
                    }).ConfigureAwait(false);

                    var record = await kinesis.GetRecordsAsync(new GetRecordsRequest { ShardIterator = shardIterator.ShardIterator }).ConfigureAwait(false);

                    // Shardの順番で回してるので、このPushの順番は必ずしも「時系列ではない」ことにチューイ!
                    foreach (var item in record.Records)
                    {
                        PushRecord(item, observer, ref isRunningNextPipeline); // ObserverでPush!Push!Push!
                    }

                    nextIterators.Add(record.NextShardIterator);
                }

                // NextShardIteratorがある状態で無限ぐるぐる
                do
                {
                    if (cancellationToken.IsCancellationRequested) return; // ところどころCancellationTokenの監視 Part2

                    for (int i = 0; i < nextIterators.Count; i++)
                    {
                        if (cancellationToken.IsCancellationRequested) return; // ところどころCancellationTokenの監視 Part3

                        var shardIterator = nextIterators[i];

                        var record = await kinesis.GetRecordsAsync(new GetRecordsRequest { ShardIterator = shardIterator }).ConfigureAwait(false);

                        // こちらでも、やはりShardの順番で回してるので、状況によって必ずしも時系列にはならないことにチューイ!
                        foreach (var item in record.Records)
                        {
                            PushRecord(item, observer, ref isRunningNextPipeline); // ObserverでPush!Push!Push!
                        }

                        nextIterators[i] = record.NextShardIterator;
                    }

                    await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false); // 実質ポーリングなのでなんとなくDelayをちょっと入れてみる

                    nextIterators = nextIterators.Where(x => x != null).ToList(); // 明らかに非効率なこの実装はテキトーなんで真面目にやるなら真面目に書いてください:)
                } while (nextIterators.Any());
            }
            catch (Exception ex)
            {
                if (isRunningNextPipeline)
                {
                    throw;
                }
                else
                {
                    observer.OnError(ex);
                }

                return;
            }

            observer.OnCompleted();
        });
    }

    void PushRecord(Record record, IObserver<dynamic> observer, ref bool isRunningNextPipeline)
    {
        using (var sr = new StreamReader(record.Data, encoding)) // item.DataにMemoryStreamの形で1レコードが受け取れる
        using (var jr = new JsonTextReader(sr))
        {
            var obj = serializer.Deserialize(jr);
            isRunningNextPipeline = true;
            observer.OnNext(obj); // 1レコードをPush
            isRunningNextPipeline = false;
        }
    }
}

PutRecordAsyncはまんま、JSONにシリアライズしたデータを投げ込んでるだけです。ObserverRecordDynamicのほうはちょっと複雑っぽいですが、やってることは順に、DescribeStreamAsyncでShard一覧を取得→それぞれのShardでGetShardIteratorAsyncで始点の取得・GetRecordsAsyncで最初のデータを取得しobserverに配信→取得できたNextShardIteratorを元にデータ取得と配信の無限ループ。です。

コメントで色々書いてありますが、Shard単位で処理していくのでレコードのSequenceNumberの順にPushされているわけではないことと、ShardがSplitやMergeで変動することへの対応は必要よね、とか考えることは色々ありますね。あと、Readの制限が5Req/secとかなり少ないので、複数処理する必要があるなら、できればリクエストは分配してやりたいところ。RxならPublishで分配、ついでにRefCountでSubscriberが0になったら購読解除というのが自然に書けるので、その辺も入れてやるといいかなー、なんて思います。とはいえ、基本的にはデータ取ってOnNextで垂れ流すという、それだけに収まってはいます(ほんとだよ!)。

従来はこの手のコードはyield returnで処理するはずですが、それがOnNextに変わっているという事実が面白い!勿論、同期API + yield returnにすることも可能ですが、AWS SDKの同期APIは非同期のものを.Resultで取ってるだけで非同期のほうがネイティブになるので、同期API使うのはお薦めしません。非同期時代のLINQ、非同期時代のイテレータ。中々面白くありません?UniRx - Reactive Extensions for UnityのFromCoroutineでも、IObserverをyielderとして渡して、非同期のイテレータを作れる(コンバートできる)ようにしています。こういうのも一つのデザイン。

like CEP(with LINQPad)

CEP(Complex Event Processing)は最近良く聞くようになりましたねー、MicrosoftにもStreamInsightというかなり立派なプロダクトがあるのですが、あんまり話を聞かないし将来性もビミョーそうなので見なかったことにしましょう。ちなみにStreamInsightは2.1からRxと統合されたりして、この手のイベントストリームとRxとが相性良いこと自体は証明済みです。

そんなわけでMicrosoft周辺では全然聞きませんが、日本だとLINEでのEsper CEPの活用例とかNorikra:Schema-less Stream Processing with SQLで盛んに聞いて、まーたMicrosoft周辺によくある、一歩先を行ったと思ったら周回遅れ現象か!とか思ったり思わなかったり。

というわけで、Norikraの紹介スライドのクエリ5つをRxで書いてみましょう。また、動作確認はLINQPadのDumpでリアルタイムに表示が可能です(asynchronousにクエリが走ってる最中はResultsのところにリアルタイムにグリッドが追加されていく!)

// Queries:(1)
client.ObserveRecordDynamic()
    .Select(x => new{ x.Name, x.Age })
    .Dump();

// Queries:(2)
client.ObserveRecordDynamic()
    .Where(x => x.Current == "Shibuya")
    .Select(x => new{ x.Name, x.Age })
    .Dump();

// Queries:(3)
client.ObserveRecordDynamic()
    .Buffer(TimeSpan.FromMinutes(5))
    .Select(xs => xs.GroupBy(x => x.Age).Select(x => new { Age = x.Key, Count = x.Count() }))
    .Dump();

// Queries:(4)
client.ObserveRecordDynamic()
    .Buffer(TimeSpan.FromMinutes(5))
    .Select(xs => xs.Max(x => x.Age))
    .Dump();

// Queries:(5)
client.ObserveRecordDynamic()
    .Where(x => x.Current == "Kyoto" && x.Attend[0] && x.Attend[1])
    .Buffer(TimeSpan.FromMinutes(5))
    .Select(xs => xs.GroupBy(x => x.User.Age).Select(x => new { Age = x.Key, Count = x.Count() }))
    .Dump();

5分間だったらBufferもしくはWindowが使えます(量が少なそうならBufferのほうが、後続クエリにLINQ to Objectsが使えて分かりやすい、量が多いならWindowで、同様にRxで集計クエリが書ける)。他に何ができるかはRxJavaのWikiのOperator一覧でもどうぞ。めちゃくちゃ何でもできます。

SQL vs Rx

SQLである必要は、あるようで、ない。テキストベースのDSLを作るならSQLが共通知識として期待できるので、SQLに寄せる必要性はかなり高い。けれど、Rxならば、LINQとしての共通知識と、C#そのものであるというコンパイルセーフな点と何でもできること、メソッドチェーン(+IntelliSense)による書きやすさ。SQLライクなものを使いたい理由は全くない。

(とはいえ勿論いちだいのRxがぶんさんごりごりのに勝てるとは思ってないんで、そこはまぁかじゅあるなはなしです)

TODO

というわけで見てきたわけですが、まあ所詮まだ単純なコードによるコンセプトレベルの話ですね!本格的にこれからやるとしたら

  • ObservableKinesisClientをもっとしっかりしたものに
  • Kinesis ApplicationをホストするためのServiceとプラグイン機構
  • ログ転送側としてSLABのKinesis用Sink

ですかねえ。まぁ、これらはJavaですでに用意されているamazon-kinesis-clientamazon-kinesis-connectorsを.NET環境で代替するために必要だ、といったところですね。素直にJava書けば?っていうのは一理あるけれど、どーなんですかね、C#でやりたいんですよ(笑)

Semantic Logging Application Block(SLAB)というのは構造化ロガー(正確にはロガーは含まれないけれど)と収集サービスがセットになったライブラリです。面白いのはOut-Of-Processでの動作が選べて、その場合はWindowsネイティブのEvent Tracing for Windows (ETW)経由でログが運ばれるので、非常に高速に動作する、というところ。Sinkというのは出力用プラグインみたいなものです。なので、アプリケーション→EventSourceロガー→SLAB Service(+ KinesisSink)→Kinesis という構造を作ることで、データをリアルタイムに投下するところまでは行ける。あとはRedShiftに送って解析(amazon-kinesis-connectorsには既にありますね)するなり、他のKinesis Application作るなりよしなに出来るかなぁ、できればいいかなぁ、と。ラムダアーキテクチャ、というホドデハ・モチロンナイ。

AWS + Windows(C#)

先週の木・金に開催されたAWS Summit Tokyo 2014にて、AWS + Windows(C#)で構築する.NET最先端技術によるハイパフォーマンスウェブアプリケーション開発実践と題して、セッションを行いました。

AWS + Windows(C#)で構築する.NET最先端技術によるハイパフォーマンスウェブアプリケーション開発実践 from Yoshifumi Kawai

まとめで書きましたが、C#+AWSは現実解、だと思ってます。そしてAWSだからといって特別なこともなく、そしてC#だからといって特別なこともない。Kinesisもちゃんと使えるわけだし、結構面白いことがまだまだ出来るんじゃないかな、って思ってます。なんでAzure使わないんですか?というのには、よく聞かれるのでお茶を濁して答えないとして(!)、AzureにもKinesisのようなAzure Event Hubsというものが先週プレビューリリースされました。C#からの活用という点では、こちらにも注目していきたいところです。Event Hubs Developer Guideなんか見ると普通に色々参考になるし、機能的にはHTTP以外にAMQP使えたり、ちょっと強そうではある。

Unity + iOSのAOTでの例外の発生パターンと対処法

Unity、はUnity3Dのほうの話ですが、それで開発していてiOS実機にデプロイして確認すると、以下の様なエラーに悩まされると思います!

System.ExecutionEngineException: Attempting to JIT compile method

ひぎぃ!怖い!これはiOSはネイティブコードしか許可していないので、MonoのAOT(Ahead-Of-Time)コンパイラ経由でネイティブコード変換されるんですが、それの関係で色々な制限があるからなのですね。さて、制限があるのはshoganaiんですが、引っかかるのは痛いです、めっちゃ痛いです、辛いです。

というわけで、どういうコードを書けば発生するのか、というのを並べてみました。どうすれば発生するのか分かれば、自然に避けられますからね。そのうえで、幾つかのものはちょっとしたハックで防げるので、それも述べます。あとは、一々実機で確認なんてやってられないので、効率のよい確認方法などなども紹介します。

Unity 4.5で少し改善されたとか言ってましたが別にあんま改善されてる気配なくて以下のコードは4.5.1で確認取って全部片っ端から死にますんで安心してください、悲しい。

Interlocked.CompareExchange

正確にはInterlocked.CompareExchange<T>が死にます。以下のコードは即死。

// ExecutionEngineException: Attempting to JIT compile method '(wrapper native-to-managed)' while running with --aot-only
var a = "hoge";
Interlocked.CompareExchange<string>(ref a, "hugahuga", "hoge");

ExecutionEngineExceptionの中でもnative-to-managedと出ているものは対処方法が明確で、そもそもUnityのトラブルシューティングのiOSのところにも書いてあります。デリゲートに[MonoPInvokeCallback]が必要だ、と。つまりそういうことで、mscorlib.dll内のメソッドなので手が出せないので、100%死ぬ運命にあります、南無。対処方法は使わないこと。(実際にはそれだけじゃなさそうですが、中のことで分からないのでとりあえずそういうことにしておこふ)

ただし、実はCompareExchangeにはintやdoubleなどを受け取るオーバーロードがあって、そちらは大丈夫です。問題なのは<T>のオーバーロードだけなのです。しかもCompareExchangeにはobjectを受け取るオーバーロードもあるので、そちらを使うことによりT的なものも一応回避することが可能。どうしても使いたい場合は安心してどうぞ。

// これは大丈夫!
object a = "hoge";
var v = Interlocked.CompareExchange(ref a, "hugahuga", "hoge");

ちなみにInterlocked.CompareExchange<T>は意外なところでも使われていて、というか、VS2010以降のコンパイラでeventをコンパイルすると、eventの実装がInterlocked.CompareExchange<T>を用いたものになっています。なのでプラグインとしてdllを作ってUnityに読み込ませると、これに引っかかって死にます。回避方法はなし。event使うのやめましょう、Actionで我慢しましょう。なお、Unity内だけで使う分には古いコードが吐かれるので問題ないです。(あとufcppさんからコメント貰いましたが、add/deleteといったカスタムイベントアクセサを定義すれば回避できるもよふ)

動的コード生成

Reflection.Emitとか、この辺は当たり前だ!ですね。

Expression<Func<string>> expr = () => "hoge";

// System.ExecutionEngineException: Attempting to JIT compile method '(wrapper dynamic-method) System.Runtime.CompilerServices.ExecutionScope:lambda_method (System.Runtime.CompilerServices.ExecutionScope)' while running with --aot-only.
expr.Compile();

Expressionも構築まではOKだけどCompileはNG。悩ましいのは一般的にC#で高速化を測る場合(特にシリアライザ)って動的コード生成+キャッシュをよく使います。neue cc - C#での動的なメソッド選択における定形高速化パターンとかneue cc - Expression Treeのこね方・入門編 - 動的にデリゲートを生成してリフレクションを高速化をミテネ。が、動的コード生成が使えないと低速なリフレクションのみかぁ、うーん、萎える。といったかんぢ。こういうのが積み重なってC#が遅いとか言われると心外だなぁ、UnityのC#は正直、うーん、ねぇ……。

PropertyのReflection

そんなわけでリフレクション。これがひじょーに悩ましくて、どこまでが死んでどこまで大丈夫なのかがひじょーーーーーに分かりづらい!さて、実は意外と行けますが、そして意外と死にます。

// こんなクラスがあるとして
public class MyClass
{
    public int MyInt { get; set; }
    public string MyStr { get; set; }
}

// ----

var mc = new MyClass() { MyStr = "hoge", MyInt = 100 };
var propInfo = typeof(MyClass).GetProperty("MyStr");

// SetValueは大丈夫
propInfo.SetValue(mc, "hugahuga", null);

// GetValueは死ぬ
// System.ExecutionEngineException: Attempting to JIT compile method '(wrapper delegate-invoke) System.Reflection.MonoProperty/Getter`2<>:invoke_string__this___MyClass ()' while running with --aot-only.
var v = propInfo.GetValue(mc, null);

(GetValueは死ぬ、って書きましたがUnity 4.5.1 + iOS7.1で試したら死ななかった、↑が死んだのはmono2.6.7でした)。なんだこの非対称ってところですが、実際そうだからshoganai。そしてGetValueは実は簡単に回避できます。GetGetMethodでメソッドを取得して、それをInvokeすればいい。

var mc = new MyClass() { MyStr = "hoge", MyInt = 100 };
var propInfo = typeof(MyClass).GetProperty("MyStr");

// こうすればGetもできる
var v = propInfo.GetGetMethod().Invoke(mc, null);
Debug.Log(v);

というわけで、これでシリアライザも作ることができます。例えばこんな感じの簡易シリアライザ。

public static void Run()
{
    var format = Serialize(new MyClass { MyInt = 100, MyStr = "hoge" });

    var v = Deserialize<MyClass>(format);
    Debug.Log(v.MyStr + ":" + v.MyInt);
}

public static string Serialize<T>(T obj)
{
    // JSON、ではない
    var sb = new StringBuilder();
    foreach (var item in typeof(T).GetProperties())
    {
        sb.Append(item.Name + ":" + item.GetGetMethod().Invoke(obj, null));
        sb.Append(",");
    }
    return sb.ToString();
}

public static T Deserialize<T>(string format)
{
    var obj = Activator.CreateInstance<T>();
    var type = typeof(T);

    foreach (var item in format.Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries).Select(x => x.Split(':')))
    {
        var key = item[0];
        var value = item[1];

        var propInfo = type.GetProperty(key);
        if (propInfo == null) continue;

        // 型の変換は超絶手抜き:)
        if (propInfo.PropertyType == typeof(int))
        {
            propInfo.SetValue(obj, int.Parse(value), null);
        }
        else
        {
            propInfo.SetValue(obj, value, null);
        }
    }

    return obj;
}

これはインチキなテキスト形式にシリアライズしてますが、例えばJSONにシリアライズ・デシリアライズとかできるようにすれば、ひじょーに有益でしょう。ベタリフレクションとかC#er的には萎えるんですが、まぁその辺はshoganaiということで諦めるぐらいはできる。諦めます。

InterfaceとGenericsとStruct

この3つが組み合わさることによって端的に言えば、死ぬ。

// こんなインターフェイスとメソッドがあるとして
public interface IMyInterface
{
    void MyMethod<T>(T x);
}

public class MyImpl : IMyInterface
{
    public void MyMethod<T>(T x)
    {
    }
}

IMyInterface intf = new MyImpl();
intf.MyMethod("hogehoge"); // 死なない

// System.ExecutionEngineException: Attempting to JIT compile method 'MyImpl:MyMethod<int> (int)' while running with --aot-only.
intf.MyMethod(100); // 死ぬ

ジェネリクスのメソッドをインターフェイスで受けて、構造体を渡すと死にます。死にます。クラスなら死なないんですけどねー。さて、しかしこの現象は回避する術があります。

// どこでもいいし呼び出さなくてもいいから、使う構造体の型を呼ぶコードをどっかに書いておく
static void _CompilerHint()
{
    new MyImpl().MyMethod(default(int));
}

void Awake()
{
    IMyInterface intf = new MyImpl();
    intf.MyMethod(100); // ↑により死なない
}

実体で実際に使う型を用いて呼び出してるコードを書いておくと死なずに済みます。実際に呼び出す必要はなくて、とにかく書いてあればいいです。イメージとしてはコンパイラにヒントを与えるような感じ。なのでまぁ、1. インターフェイスで受けないようにする 2.受けなきゃならないシチュエーションがあるなら(まぁそりゃあるよね)どっかに定義沢山書きだしておく。ことにより神回避。オマジナイのようでいて実際効果あるからshoganai。

あと、ジェネリクスはメソッドじゃなくてインターフェイスのほうがTなら死にません。IMyInterface<T>みたいなほう。

LambdaとGenericsとStruct

Genericsのラムダ作って構造体渡すと死にます、例によって渡すのがクラスなら死にません。

// こんなメソッドがあるとして
static void Death<T>()
{
    var act = new Action<T>(_ => { Debug.Log("hoge"); }); // ここではまだ死なない

    // System.ExecutionEngineException: Attempting to JIT compile method '<Death>b__0<int> (int)' while running with --aot-only.
    act(default(T)); // 呼び出すと死ぬ
}

// こんなコード呼び出しすると死ぬ
Death<int>();

こんな入り組んだコード書かないって?いや、案外このパターンに当てはまっちゃったりしたりするんですよ。特にライブラリ書いたりする人だとラムダ式の使いどころによっては、こういうパターンになりがちで頭抱えます。解決策はラムダ式使うのやめよう!じゃあなくて、簡単な解決策があります。

static void Death<T>()
{
    var _dummy = 0;
    var act = new Action<T>(_ => 
    {
        Debug.Log("hoge");

        _dummy.GetHashCode(); // なんでもいいから外側の変数をキャプチャする
    });

    act(default(T)); // 死なない
}

ラムダ式は外側の変数をキャプチャするかしないかによって、生成されるコードが変わってきます。そこがミソで、勿論キャプチャしないほうが本来は効率がいいんですが、AOTで死んでしまっては元も子もない。キャプチャすることによってAOTで死なないコードが生成されます、というわけで、入り組んだシチュエーションでラムダ式使いたい場合は意図的に外側の変数をキャプチャすることで回避できます。これは思いついた時は思わず叫んじゃいましたね!マジで!(そんだけこの問題に悩まされてたんですよ……)

参照型で死ぬ

型引数がクラスなら死ぬことはない、と思っていた時もありました。残念ながら、死ぬ時があるんですねぇー。いや、正確にはclass+structで死ぬ、なんですが、struct+structだと死なないのが癪。これは後述しますがLINQのSumがクラスで死ぬ理由が分からなくて再現コード作ってたらこうなったって感じです。よくわからないけど、こうなった。

public static void Run()
{
    // 参照型でメソッドを呼ぶ
    // System.ExecutionEngineException: Attempting to JIT compile method 'Method2<int, object> ()' while running with --aot-only.
    Method1<object>();
}

// 1型引数でメソッドを呼ぶ際に片方が値型
public static void Method1<T1>()
{
    Method2<int, T1>();
}

// 2型引数で戻り値がある(戻り値の型はなんでもいいけどvoidはダメ)
static string Method2<T1, T2>()
{
    return "";
}

ちなみにMethod1<int>みたいに、struct渡すんなら動くんですよね、逆にこれは。クラスだと死ぬ。どうしてこうなるのか、ちょっとこれはよくわからないですね、ともかくクラスでも油断すると死ぬということはよくわかりました、あべし。

LINQで死ぬ

UnityのiOSビルドで使うとエラーになるLINQ拡張メソッドのメモといった記事もありますが、実際死にます。これは濡れ衣みたいなものなんですけどねぇ、別にLINQが悪いわけじゃないし、それでLINQ使わない!LINQ禁止!とか絶対言って欲しくないです。LINQのないC#なんてC#じゃないです。C#の魅力の8割はLINQなのですから。と、それは置いておいて、実際幾つかのLINQのメソッドは死にます。

例えばAverage(selector)。

// System.ExecutionEngineException: Attempting to JIT compile method 'System.Linq.Enumerable:<Average`1>m__20<int> (long,int)' while running with --aot-only.
Enumerable.Range(1, 3).Average(x => x);

なんで死ぬのかというと、ソース見れば簡単に分かります。Unity-Technologies/monoからEnumerable.csの該当行を見ると

return source.Select (selector).Average<int, long, double> ((a, b) => a + b, (a, b) => (double) a / (double) b);

お分かりかな?そう、「LambdaとGenericsとStruct」のところで見たように、Genericsのメソッドの中で値型のラムダが放出されてます。そう、結構あるんですよ、Genericsのメソッドの中にラムダを埋めてしまうのって。さて、で、これは死にます。具体的に死んだ箇所は、エラー履歴の一番上のat...のとこ見れば

// at System.Linq.Enumerable.Average[Int32,Int64,Double] (IEnumerable`1 source, System.Func`3 func, System.Func`3 result) [0x00000] in <filename unknown>:0 

privateメソッドのAverage(↑でいうAverage[int, long, double]のとこ)で死んでるのが分かります。基本的に呼び出すタイミングで死ぬのでfunc(total, element)ってとこが死亡地点だと推測付きます。

これの対処方法は?ないよ!System.Core.dllの中のコードだから手が出せません。もはや使わないしか選択できません!もしくは、自前実装してAverageSafeとかって拡張メソッドを用意するとか、ですかねえ。それも悪くはないと思います、shoganaiし。

で、実はこの問題は当然mono本体は気付いていて、mono 2.8では改善されています。該当コミットを見れば、ラムダ使って共通化されてるコードがコピペに置き換えられてます:) これがAOTセーフだ!みたいな。はい、ライブラリには苦労してもらいましょう、使う側が快適ならそれで、それがいいのです。

残念ながら現在のUnity(4.5.1)のmonoは2.6で、しかも2.8へのUpgradeは蹴られてます。3.0(そう、monoの最新はもう遠いところにある、Unity置いてかれすぎ)へのアップデートは、もしやる気があるとしても大仕事になるだろうから、当面は来そうにないですねえ。でもmono 2.8で改善されたのって4年前なんですよね、4年前から変わらずLINQ(の一部)が使えないUnity……、残念です。とりあえずダメ元でEnumerable.csだけでもバージョンあげてくれ!ってFeedbackを出したので、よければVoteしてください。Upgrade Enumerable.cs for avoid AOT Problem of LINQ(Average etc...)

XamarinのほうはAOTに関しても先を行っているようで、詳しくはXamarinの中の人である榎本さんのインサイドXamarin(6)の真ん中辺りに書いてあります。最新のXamarinと昔のXamarinと、そしてUnityとではAOTの制限がそれぞれ微妙に違っててなんとも。しかしXamarin、じゅる、いいなぁ……。

UnityのLINQでは他にも明らかに使えないメソッドがあって、例えばThenBy。

// System.ExecutionEngineException: Attempting to JIT compile method 'System.Linq.OrderedEnumerable`1<int>:CreateOrderedEnumerable<int> (System.Func`2<int, int>,System.Collections.Generic.IComparer`1<int>,bool)' while running with --aot-only.
Enumerable.Range(1, 3)
    .OrderBy(x => x)
    .ThenBy(x => x)
    .ToArray();

死にます。これも最新のmonoでは解決しています、該当ソース

#if FULL_AOT_RUNTIME
			var oe = source as OrderedEnumerable <TSource>;
			if (oe != null)
				return oe.CreateOrderedEnumerable (keySelector, comparer, false);
#endif

これは「InterfaceとGenericsとStruct」のとこに書いた制限を回避してます。IOrderedEnumerableというインターフェイスのままCreateOrderedEnumerableを呼ぶと死ぬので、OrderedEnumerableにキャストして具象型に戻すことによってうまく動くようにしています。ThenByは便利なので使いたいものですねえ(まぁ富豪な処理なのでゲーム向けかと言われるとビミョーですが)

最後に、Sumは参照型でも死にます。逆に値型だと生き残れます。

// System.ExecutionEngineException: Attempting to JIT compile method 'System.Linq.Enumerable:Sum<object, int> (System.Collections.Generic.IEnumerable`1<object>,System.Func`3<int, object, int>)' while running with --aot-only.
Enumerable.Empty<object>().Sum(x => (int)x);

これは「参照型で死ぬ」パターン、ソースコードの該当箇所を見ると……

public static int Sum<TSource> (this IEnumerable<TSource> source, Func<TSource, int> selector)
{
	Check.SourceAndSelector (source, selector);

	return Sum<TSource, int> (source, (a, b) => checked (a + selector (b)));
}

static TR Sum<TA, TR> (this IEnumerable<TA> source, Func<TR, TA, TR> selector)
{
	TR total = default (TR);
	long counter = 0;
	foreach (var element in source) {
		total = selector (total, element);
		++counter;
	}

	return total;
}

これねえ、なんで参照型で死ぬのか本当にさっぱり分からなかったけど、とりあえずSum<TSource, int>で二回層掘ってるのが死因っぽいです、片方はint固定で、二階層目は戻り値TRっていう。Max, Minが参照型のみ死ぬもの同じようなコードだった。これで死ぬとかもはや理不尽さしか感じなくて怖い怖い。ちなみにmonoの最新版のコードではメソッドは一回層で重複上等のハイパーコピペになってます(勿論それによりExceptionは発生しなくなる)、それでいいです、はい、ほんと。

そんなわけでLINQは一部の地雷メソッドに注意しながら使う!まぁ、それはそれでいいんですが(地雷が怖いから使わないってのはNG)、やっぱ地雷が埋まってるのは怖い。というわけで、Unityのmonoのランタイムが新しくなってくれるのが一番なのですが現実はそれを待ってはいられないので、mono本体のEnumerable周辺コードを頂いて、名前空間だけ、例えばSystem.LinqExとかにして、基本そちらをusingするようにするっていう風にして回避するのがいいんじゃないかしら、というか私はそうしてます。この辺は名前空間の切り分けだけでなんとかなる拡張メソッドの良さですね。

Enumで死ぬ

簡単にはEnumの配列をToArrayすると観測できる!

// こんなEnumがあるとして
public enum MyEnum
{
    Apple
}

// ToArrayで問答無用で死ぬ
// System.ExecutionEngineException: Attempting to JIT compile method '(wrapper managed-to-managed) MyEnum[]:System.Collections.Generic.ICollection`1.CopyTo (UniRx.MyEnum[],int)' while running with --aot-only.
new[] { MyEnum.Apple }.ToArray();

(wrapper managed-to-managed)ってのが目新しくていいですね!これの対処方法は、元が配列とかListだと印象的にヤヴァいので空のイテレータに変えてやります、それもご丁寧にジェネリクスじゃないIEnumeratorを経由することで、なんとなく回避できます。

public static class AotSafeExtensions
{
    // こんなメソッドを用意しておくと
    public static IEnumerable<T> AsSafeEnumerable<T>(this IEnumerable<T> source)
    {
        var e = ((IEnumerable)source).GetEnumerator();
        using (e as IDisposable)
        {
            while (e.MoveNext())
            {
                yield return (T)e.Current;
            }
        }
    }
}

// 死なない!
new[] { MyEnum.Apple }.AsSafeEnumerable().ToArray();

ヤヴァそうな香りがしたらAsSafeEnumerableを呼ぶ、という対処療法で勝つる。かなぁ……?

実機を使わないでAOTのテストする方法

ここまでで例外の発生パターンと対処法は終わり。じゃあ実際、こういった問題をどう検出するか、ひたすら実機テスト?というのも辛い。で、AOT自体はmono本体にもあって、そして現在のUnityはmono 2.6相当です。というわけでmono 2.6でAOTを動かせばいいんじゃろ?mono --full-aot hoge.exeと書くだけで、iOS実機とほぼほぼ同等のAOT例外が検出できます(この記事の範囲だとInterlocked.CompareExchange以外は同じ)。MonoBehaviourとかは無理ですがロジック系だったらNUnitでユニットテスト書いて、回すことで自動テスト可能になります。

実際、私はこの記事を書くにあたって、Windows + Visual Studio 2013でC#を書いて.exe(ConsoleApplication)作って、それを会社の同僚の作ってくれたexeを渡すとfull-aotで実行して結果表示してくれるウェブサービスに突っ込んで延々と動作確認してました。超捗る。むしろ同僚が神だった。実機とかやってられない。そもそもUnity書くのもVisual Studioじゃなきゃ嫌だ(UnityVS - Unity開発におけるVisual Studio利用のすすめ)。

UniRx

なんで延々と調べたかというと、今、私はUniRx - Reactive Extensions for Unityというライブラリを作っていて、というか実際アセットストアにも既に公開されているんですが(無料です!)、例によってiOSで動かなくて!で、重い腰を上げて調べたのでした。パターンさえ分かってしまえば、まあ十分対応できる範囲ですねー、というわけでバシバシと動かなくなる箇所を殺してる最中です。

UniRx自体はブログ記事をそのうち書く書く詐欺で(一応、ちょっとだけ発表した時の資料はある)、ええと、AOTの対処が終わったら書く!というのと、【第1回】UnityアセットまみれのLT大会でLTするつもりなので、そちらでもよろしくお願いします。というか是非いらしてください、お話しませう。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(C#)
April 2011
|
July 2024

Twitter:@neuecc GitHub:neuecc

Archive