LINQPad Driver + LINQ to BigQueryによるBigQueryデスクトップGUIクライアント

Happy signed!何かというと、長らく署名の付いていなかったGoogle APIの.NET SDKに署名が付いたのです!署名が付くと何ができるかというと、LINQPadのDriver(プラグイン)が作れます。LINQPadのDriverは署名なしだと起動できないので……。正直、私ももはや署名とか全然重視してないし100億年前の化石概念の負の異物だろ、ぐらいに思ってなくもないのですが、さすがに、LINQPad Driverを作れない、という事態には随分と嘆いたものでした。が、やっと作ることが出来て感無量。そして、実際動かしてみると相当便利ですね。これがやりたかったんですよ、これがー。

LINQ to BigQueryのLINQPad Driverが可能にする範囲は、

  • サイドバーでのスキーマのツリー表示
  • thisを読み込んでいるConnectionで認証済みのBigQueryContextに変更
  • 関連するアセンブリと名前空間を自動で読み込み
  • スキーマに対応するクラスを動的に生成/読み込み
  • ちょっとしたユーティリティDumpの追加(DumpRun/DumpRunToArray/DumpChart/DumpGroupChart)
  • もちろんクエリのローカルでの保存/読み込みが可能

です。元々のLINQ to BigQueryが提供している機能としては

  • TableDateRangeに対するサポート
  • DateTimeの自動変換(一部のBigQueryの機能はUnix Timestampで書く必要があり、実質手で書くのは不可能なものもありましたが、自動変換により救われる)
  • 結果セットをローカル時間に自動変換(基本的にUTCで帰ってくるので、ローカル時間で考える際に+9時間しなきゃいけなかったりしますが、C#側でデシリアライズする際にローカルタイムに自動変換する)
  • 全てが型付きで入力補完が全面的に効く
  • 全てのBigQuery関数の入力補完にドキュメント付き

があって(この辺の詳しい話は以前に書いたLINQ to BigQuery - C#による型付きDSLとLINQPadによるDumpと可視化を見てください)、相乗効果でかなり強まったのではないでしょうか。

公式ウェブコンソールで叩くのとどっちがいいかといったら、まぁ私自身も結構、ウェブから叩くのは多かったりしますので、どっちでもいいといえばいいんですが、それもプラグインを作る前は……かしら。今後は私自身もLINQPad利用が増えるかなー。明らかにウェブから叩くのじゃ提供できない機能というか、素のBigQuery SQLじゃ中々できない機能を多く提供しているわけで、LINQPad + LINQ to BigQUeryにはかなりのアドバンテージがあります。

Excel統合

問答無用に愚直なExcel統合があります。

legendary_dump_to_excel

そう、DumpToExcel()で実行すると結果セットがダイレクトにExcelで開く……。しかし実際こういうのでいいんだよこういうので感あります。Excelでクエリ書く系の統合は面倒くさい(実際アレはダルいのでない)。いちいちCSVに落として開くのは面倒くさすぎる。LINQPadでクエリ書く、結果がExcelで見れる。あとはピボットテーブルなりで好きに分析できる。そう、そういうことなんですよ、これなんですよ #とは

入れ方

ExplorerのAdd Connection→View More Drivers からLINQ to BigQueryを探して、clickでインストールできます。簡単。

image

かなり上の方のいい位置に入れてもらいました!

using static

BigQueryの関数はLINQ to BigQueryではBqFunc以下に押し込める形をとっていますが、C# 6.0から(Javaのように)静的メソッドのインポートが可能になりました。また、LINQPad 5でもスクリプトのバックエンドがRoslynになり、C# 6.0にフル対応しています。LINQ to BigQueryのDriverでは、LINQPad 5以上に読み込ませた場合のみ、using static BigQuery.Linq.BqFunc が自動インポートされます。

これにより、クエリを書いた際の見た目がより自然に、というかウザったいBqFuncが完全に消え去りました!関数名を覚えていない、ウロ覚えの時はBqFunc.を押して探せるし

image

慣れきった関数なら、直接書くことができる。完璧。

How to make LINQPad Driver

難しいようで難しくないようで難しいです。しっかりしたドキュメントとサンプルが付属しているので、スタートはそれなりにスムーズに行けるかと思います。一つ、大事なのはプラグイン開発だからってデバッグ環境に妥協しないでください。ふつーの開発と同じように、F5でVisual Studioが立ち上がってすぐにブレークポイント貼ってステップ実行できる環境を築きましょう。細かいハマりどころが多いので、それ出来ないと挫けます。逆に出来てれば、あとは気合、かな……?細かいやり方はここに書くには余白が(以下略

変わったハマりどころとしては、例えば別々に呼ばれるメソッド間で変数渡したいなー、と思ってprivate fieldに置くと、そもそも都度頻繁にコンストラクタが呼ばれて生成されなおすので、共有できない。なるほど、じゃあせめてstatic変数だったらどうだろうか?というと、LINQPadの内部の実行環境の都合上、AppDomainがガンガン切られて飛んで来るので、static fieldすら消える!マジか!なるほどねー厳しいねー、などなど。

ちなみに動的なアセンブリ生成ではCodeDomのCSharpCodeProviderを利用しています。つい先月、Metaprogramming Universe in C# - 実例に見るILからRoslynまでの活用例でCodeDomはオワコン、使わないとか言ってたくせに!舌の根も乾かぬうちに自分で使うことになるとは思わなかった!

まとめ

社内でのBigQuery活用法として、定形クエリのダッシュボードはDomoにより可視化、アドホックなクエリはLINQPad + LINQ to BigQueryによりクエリを色々書いたり、そのままExcelに送り込んで(LINQPadはデスクトップアプリなので、DumpToExcel()メソッドとかを作ることによりシームレスに結果セットをExcelに投げ込んだりできるのも強い)PowerPivotでこねくり回したり、などをしてます。とはいえ、今までは事前にスキーマに対応するクラスを生成して保存しておかなければならないという面倒くささがあったので、イマイチ活用しきれてなかったのも事実。実際、私自身ですらBigQueryの公式ウェブコンソールでクエリ叩いたりが多かったですし。それが、今回のLINQPad Driverにより圧倒的に利便性が上がった(というか前のがもはや原始時代に見える)ので、使える度合いが桁違いに上がったんじゃないかなー、と思います。

デスクトップGUIクライアントの便利さは、例えばMySQLだったらウェブでphpMyAdminよりもHeidiSQLやMySQL Workbenchのほうが100億倍便利なわけでして、良いところ沢山あるんですよね。BigQuery関連だとCloud DataLabなんかもちょうど出ましたが、ウェブとデスクトップ、それぞれ良さがあるので、ここはうまく使い分けていきたいところです。

最近のBigQueryのアップデートへの追随だと、新メソッドは全部実装が完了してます。また、GroupByへのRollupなど文法の追加もOK。ただ、大きな目玉であるUDF(User Defined Function)への対応がまだです。別にそんな難しくもないんですが、APIの馴染ませ方どうしようかな、とか思ってる間にLINQPad Driverの作成に時間喰われたので、対応入れるのは近いうちの次回ということで。

同期(風)コードと対比させたUnity+UniRxで非同期を扱う場合のパターン集

UniRxのGitHubのStar数が500行きました!

image

今のところGitHub上でのUnity + C#でスター順の検索だと、世界5位です。おおー。更に上を狙いたいところですね。最近はちょっと更新が滞っていますが、ネタはあるのでより完成度を高めたい。(滞った理由は、PhotonWireとか色々他のところに手を出していたため……)

さて、本題。イベント結合に使う際はあてはまりませんが、Rx(UniRx)を非同期(長さ1のIOservableシーケンス)として扱う場合、それなりに癖があります。とはいえ、基本的には同期(或いはyield return)で書いていた際と、1:1で対比できるパターン化した形で概ね対応できるので、そのためのチートシートを考えてみました。コード例はC# 5.0のasync/awaitで出しますが、同期コード or IEnumeratorと同じように思ってもらえればいいです。例えば

public void Sync()
{
    /* before action */
    Method();
    /* after action */
}

public IEnumerator IEnumerator()
{
    /* before action */
    yield return StartCoroutine(Method());
    /* after action */
}

public async Task Task()
{
    /* before action */
    await MethodAsync();
    /* after action */
}

みたいな感じです、awaitに馴染みのない人も、なんとなくイメージしながら眺めてみてもらえると嬉しいです。

非同期汚染

コード例の前に非同期汚染、或いは非同期の伝搬について。まぁ、あんまし汚染という言い方は好きじゃないのですが、基本的に非同期、つまりTaskでもFutureでもPromiseでもIObservableでも、は、下層から上層まで伝搬していきます。メソッドが非同期であるなら戻り値はIObservableであり、そのIObservableを呼ぶメソッドもまた自然と非同期でなければならないので、IObservableになる、と。何故非同期の連鎖でなければならないのか。消費(Subscribe)してしまうと、その瞬間Fire and Forgetになってしまい、戻りを待ったりキャンセルしたりなどの別の操作が行えなくなってしまうからです。別にFire and Forgetしたければ、呼び元がそれを選択(Subscribeして放置)すればいいわけで、呼ばれる側が決定することではない。

もちろん、最終的にはどこかの層で消費(Subscribe)しなければならないので、そこで伝搬は止まるのですけれど、それは、基本的には上層であればあるほどよいということですね。どこが上層やねんって話はあるかもしれませんが、ユーザーインタラクションに近かったり、MonoBehaviourのイベント層に近かったり、あたりがそうですかねー。あとは、ごく一部でしか使わないんだ!という確固たる思いがあれば、早い段階でSubscribeして伝搬を止めるのも策ではあります、その辺はケースバイケースで。

非同期の伝搬に都合の良いメソッドが現状のUniRxには足りてません。実は!というわけで、次期バージョンではForEachAsyncというものを足したいのですが、それまでは以下のものをコピペって代用してください。挙動的にはシーケンスを消費して長さ1のIObservable[Unit]を返すもので、元シーケンスが非同期(長さ1)ならDoやSelectと、概ね一緒です。

// 次期バージョンに入るので、それまでの代用ということで。
// 元シーケンスが非同期なら .Select(x => { /* action(); */ return Unit.Default; }) とほぼ同様
namespace UniRx
{
    public static class UniRxExtensions
    {
        public static IObservable<Unit> ForEachAsync<T>(this IObservable<T> source, Action<T> onNext)
        {
            return Observable.Create<Unit>(observer =>
            {
                return source.Subscribe(x =>
                {
                    try
                    {
                        onNext(x);
                    }
                    catch (Exception ex)
                    {
                        observer.OnError(ex);
                        return;
                    }
                }, observer.OnError, () =>
                {
                    observer.OnNext(Unit.Default);
                    observer.OnCompleted();
                });
            });
        }
    }
}

また、副作用(外の変数への代入など)に関しては、あまり気にしないほうが吉です。いや、Rxのパイプラインに押し込めたほうが美しくはあるんですが、それがオブジェクトであるなら、副作用かけてフィールド変数を変えたり、ReactivePropertyに結果を伝えたりとかは、あって然りかな、と。考える際には「もしこれが同期コードだったらどうなのか」を意識したほうがいいかもしれません、同期コードで自然なら、別にRxでそれを行っても、構わないのです。とはいえ、以下に紹介するコードは全部、副作用大前提みたいな説明なので、それはそれで若干の狂気でもありますが、その辺は慣れてきてからでよいかと。

戻り値のない場合

public async Task Demo1_TaskAsync()
{
    /* before action */
    var x = await Task.Factory.StartNew(() => 100);
    /* after action */
}

public IObservable<Unit> Demo1_IOAsync()
{
    /* before action */
    return Observable.Start(() => 100)
        .ForEachAsync(_ =>
        {
            /* after action */
        });
}

メソッドに戻り値がない場合は、awaitの位置にForEachAsyncで、その中にactionを書く形になります。RxにおいてはIObservable[Unit]を戻り値のないことの表明として使います。

内部に複数の非同期がある場合

public async Task Demo2_TaskAsync()
{
    /* before action */
    var x = await Task.Factory.StartNew(() => 100);
    /* after action 1 */
    var y = await Task.Factory.StartNew(() => 200);
    /* after action 2 */
}

public IObservable<Unit> Demo2_IO_1Async()
{
    /* before action */
    return Observable.Start(() => 100)
        .SelectMany(x =>
        {
            /* after action 1 */

            return Observable.Start(() => 200);
        })
        .ForEachAsync(y =>
        {
            /* after action 2 */
        });
}

awaitの位置にSelectManyを置くことで繋げることができます。最後の消費だけForEachAsyncで。

パイプライン中に複数の値を伝搬したい場合

public IObservable<Unit> Demo2_IO_2Async()
{
    /* before action */
    return Observable.Start(() => 100)
        .SelectMany(x =>
        {
            /* after action 1 */
            return Observable.Start(() => 200);
        }, (x, y) => new { x, y }) // transport argument to next chain
        .ForEachAsync(o =>
        {
            /* after action 2 */
            // { o.x, o,y } 
        });
}

public IObservable<Unit> Demo2_IO_2_2Async()
{
    /* before action */
    return Observable.Start(() => 100)
        .SelectMany(x =>
        {
            /* after action 1 */
            var z = SyncMethod();
            return Observable.Start(() => 200).Select(y => new { x, y, z });
        })
        .ForEachAsync(o =>
        {
            /* after action 2 */
            // { o.x, o,y, o.z } 
        });
}

同期コードでは、そのスコープ中の全ての値が使えるわけですが、Rxのメソッドチェーンでは次のパイプラインに送り込める値は一つしかありません。というわけで、匿名型(もしくはUniRx.Tuple)を使って、次のパイプラインへは値をまとめて上げる必要があります。SelectManyには第二引数があり、それにより前の値と次の値をまとめることができます。また、SelectMany内部で作った値を送り込みたい場合は、戻り値のところでSelectを使ってスコープ内でキャプチャして返してあげればいいでしょう。(匿名型、Tupleともにclassなので、気になる場合はstructの入れ物を用意してもいいかもしれない、何か箱を作って運搬しなきゃいけないのは残念ながら仕様です)

非同期が連鎖する場合

public IObservable<Unit> Demo2_IO_2_MoreChainAsync()
{
    /* before action */
    return Observable.Start(() => 100)
        .SelectMany(x =>
        {
            /* after action 1 */
            return Observable.Start(() => 200);
        }, (x, y) => new { x, y })
        .SelectMany(o =>
        {
            /* after action 2 */
            return Observable.Start(() => 300);
        }, (o, z) => new { o.x, o.y, z }) // re-construct self
        .ForEachAsync(o =>
        {
            /* after action 3 */
            // { o.x, o,y, o.z } 
        });
}

SelectManyの連打になります。また、伝搬する値は自分で分解して付け直してあげる必要があります、これは面倒くさいですね!この辺はクエリ構文を使った場合、Transparent Identifierという仕組みで自動的にコンパイラが行うのですが(An Internal of LINQ to Objectsの35P、Rxでクエリ構文は結構頻繁にクエリ構文の範疇を逸脱するのと、副作用をパイプライン途中に書けないためあまり使い勝手は良くないので、面倒くさいながら手作業再構築を薦めます。

戻り値を返す場合

public async Task<int> Demo3_TaskAsync()
{
    /* before action */
    var x = await Task.Factory.StartNew(() => 100);
    /* after action */
    return x; // return value
}

public IObservable<int> Demo3_IOAsync()
{
    /* before action */
    return Observable.Start(() => 100)
        .Select(x =>
        {
            /* after action */
            return x; // return value
        });
}

ForEachAsyncではなく、Selectを使っていきましょう。戻り値の型が同一で副作用だけ起こしたいならDoでも構わないのですが、まぁどっちでもいいです。また、awaitが複数になる場合は、SelectManyになります。そのうえでSelectManyのままreturnするか、最後に再びSelect(もしくはDo)を使うかどうかは、状況次第、かな。

例外をキャッチ

public async Task Demo4_TaskAsync()
{
    /* before action */
    try
    {
        var x = await Task.Factory.StartNew(() => 100);
    }
    catch (Exception ex)
    {
        /* onerror action */
        throw;
    }

    /* after action */
}

public IObservable<Unit> Demo4_IOAsync()
{
    /* before action */
    return Observable.Start(() => 100)
        .Catch((Exception ex) =>
        {
            /* onerror action */
            return Observable.Throw<int>(ex);
        })
        .ForEachAsync(x =>
        {
            /* after action */
        });
}

これはCatchで賄えます。なお、Catchメソッドを使う際は、Catch<T>で例外の型を指定するよりも、ラムダ式の引数側で例外の型を書いたほうが書きやすいです(そうしたほうが型推論の関係上、ソースシーケンスの型を書かなくて済むため)。Catchの戻り値では再スローをObservable.Throw、握りつぶしをObservable.Return/Emptyで表現可能です。

Finally

public async Task Demo5_TaskAsync()
{
    /* before action(1) */
    try
    {
        var x = await Task.Factory.StartNew(() => 100);
    }
    finally
    {
        /* finally action(2) */
    }

    /* after action(3) */
}

// not equivant try-finally
public IObservable<Unit> Demo5_IO_PseudoAsync()
{
    /* before action(1) */
    return Observable.Start(() => 100)
        .Finally(() =>
        {
            /* finally action(3) */
        })
        .ForEachAsync(x =>
        {
            /* after action(2) */
        });
}

public IObservable<Unit> Demo5_IO_CorrectLightweightButIsNotDryAsync()
{
    /* before action(1) */
    return Observable.Start(() => 100)
        .Do(_ => { /* finally action(2) */}, _ => {/* same finally action(2) */})
        .ForEachAsync(x =>
        {
            /* after action(3) */
        });
}

Finallyに関しては、実は同じに扱える表現がありません!RxのFinallyはパイプラインの終了時の実行なので、実行順序がベタtry-finallyで書いた時と異なるんですよねえ。いちおう、DoでOnNextとOnErrorのところに同じコードを書くことでそれっぽい表現は可能ではありますが……。

並列処理

public async Task ParallelAsync()
{
    var a = Task.Factory.StartNew(() => 100);
    var b = Task.Factory.StartNew(() => 200);
    var c = Task.Factory.StartNew(() => 300);
    
    var xs = await Task.WhenAll(a, b, c);
    /* after action */
}


public IObservable<Unit> ParallelIO()
{
    var a = Observable.Start(() => 100);
    var b = Observable.Start(() => 200);
    var c = Observable.Start(() => 300);
    
    return Observable.WhenAll(a, b, c)
        .ForEachAsync(xs =>
        {
            /* after action */
        });
}

並列処理は非同期固有の実行ですが、WhenAllでドバッとまとめるというのが基本方針。

タイムアウト

public async Task TimeoutAsync(TimeSpan timeout)
{
    var task = Task.Factory.StartNew(() => 100);    
    var delay = Task.Delay(timeout);
    if (await Task.WhenAny(task, delay) == delay)
    {
        /* timeout action */
        throw new TimeoutException();
    }
    /* after action */
}


public IObservable<Unit> TimeoutIO(TimeSpan timeout)
{
    return Observable.Start(() => 100)
        .Timeout(timeout)
        .Catch((TimeoutException ex) =>
        {
            /* timeout action */
            return Observable.Throw<int>(ex);
        })
        .ForEachAsync(x =>
        {
            /* after action */
        });
}

タイマウトも非同期固有の処理。async/awaitの場合、特有のイディオムがあります。UniRxの場合はTimeoutだけでOK。特に例外時に処理するものもないなら、Catchは不要です。

IEnumeratorに戻す

public IObservable<Unit> Demo6_IE()
{
    /* before action(1) */
    return Observable.FromCoroutine(() => Demo6_IECore());
}

IEnumerator Demo6_IECore()
{
    // 戻り値の不要な場合
    yield return Observable.Start(() => 100).StartAsCoroutine();
    
    int ret;
    yield return Observable.Start(() => 100).StartAsCoroutine(x => ret = x);
}

SelectManyの連打が辛い場合、ふつーのコルーチンに戻して、更にIObservableでラップするという手段も取れます。まあ、この辺は複雑さ度合いで自由に!

だったらもはや最初から全部コルーチンでええやん!Rxでメソッドチェーン複雑だし見た目だけならコルーチン最強にスッキリじゃん!というのは正しい。正しいんですが、例外処理・戻り値・合成可能性・並列処理・マルチスレッド、などといった要素が欠落してるので、コルーチンはコルーチンで苦しいところが多いというか実際のところシンプルなケース以外では相当苦しいので、基本的にはRxのほうが有利です。

async/awaitは必要?

みたとーり、必要です。どう考えても。さすがにSelectManyの連打を同期コードほどスッキリと言い張るのは無理があるでしょう。とはいえまぁ、書いて書けないこともないので、今あるツールの中でベストを尽くすのまた良きかな、とは思いますねー。というわけで良き非同期生活を!UniRxでイベントを扱う際のパターン集は、またそのうちにでも!

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive