Rxにおける並行非同期実行とリソース処理の問題

非同期(Asynchronous)だの並列(Parallel)だの並行(Concurerrent)だの、よくわからない単語が並びます。ParallelがやりたければPLINQ使うべし、と思うわけですがそれはさておき、Rxを使うと、意図しても意図しなくても、並行な状態にはなります。そして、その意図していないという状態は危うい線を踏んでいるので、きちんと認識しておく必要があります。また、危ういと同時に、Rxはその並行をうまくコントロールするメソッドが揃っているので、覚えておくと世界が一気に広がります。

例えば、こういう非同期メソッドがあるとして。

IObservable<T> AsyncModoki<T>(T value, int second)
{
    // second秒後にvalueを返す非同期処理をシミュレート
    return Observable.Return(value)
        .Delay(TimeSpan.FromSeconds(second));
}

static void Main(string[] args)
{
    // 1,2,3,4という入力をすぐに送り込む
    new[] { 1, 2, 3, 4 }
        .ToObservable()
        .SelectMany(x => AsyncModoki(x, 3))
        .Subscribe(x => Console.Write(x + "->"));

    Console.ReadLine();
}

Microsoft Silverlight を入手

1~4は、全て同時にリクエストが開始されます。だから、3秒後に同時に結果が表示されます。

同時に実行が開始されているということは、非同期の結果が完了する時間にズレがある場合、結果が前後することがあります。実際、上のものも何度か実行すると毎回結果が変わると思います(Delayは(デフォルトだと)値をThreadPoolに投げて遅延させます。ThreadPoolに入った時点で、順序の保証が消滅する)。というわけで、基本的にSelectManyを使った場合1:1で渡していくわけではなければ、順序は壊れると考えてください。さて、それだと困る場合もあるのではと思いますので、結果の順序を制御する方法が幾つかあります。

Switch

Switchは実に有意義なメソッドで、分かると、SelectMany以上に多用することが多くなるのではと思います。

clickEventObservable // クリック毎に
    .Select(x => AsyncModoki(x, 1)) // 何らかの非同期処理をするとする
    .Switch() // IObservable<IObservable<T>>の状態なので、Switch
    .Subscribe(Console.WriteLine);

Microsoft Silverlight を入手

クリックすると1秒遅延(非同期処理でもしていると考えてください)して、値が表示されます。しかし、1秒以内に次の値がクリックされた場合はキャンセルされ、表示しません。

つまり、最新の値だけを返すことを保証します。それ以前のものはキャンセル(Disposeが呼ばれる)されます。どういう時に使うかというと、例えばインクリメンタルサーチ。L, LI, LIN, LINQと入力が変わる度に非同期リクエストを発生させますが、欲しい結果は最後の一件のみで、次のキー入力があった場合は以前のものはキャンセルして欲しい。キャンセルはともかく、非同期実行だと結果が前後してしまうことだってあります。LINQと入力したのにLIの結果一覧が表示されてしまったら困る。そんな場合に、まさに、うってつけです。そして存外、こういったシチュエーションは多いのではないかと思われます。例えば私の以前作ったUtakotohaというWP7用歌詞表示アプリケーションも、曲のスキップに応じて最新のものだけを表示するために、Switchを利用しました。(コードが激しく酷いのと機能貧弱っぷりなので、そろそろ書き直したい)

Merge/Concat

Switch以外にも色々あります。

new[] { 1, 2, 3, 4 }
    .ToObservable()
    .SelectMany(x => AsyncModoki(x, 1)) // 全て並行実行(最初の例です)
    .Subscribe(x => Console.Write(x + "->"));

new[] { 1, 2, 3, 4 }
    .ToObservable()
    .Select(x => AsyncModoki(x, 1)) // IO<IO<T>>
    .Merge() // こちらも全て並行実行、SelectMany(xs => xs)と同じ
    .Subscribe(x => Console.Write(x + "->"));

new[] { 1, 2, 3, 4 }
    .ToObservable()
    .Select(x => AsyncModoki(x, 1))
    .Merge(2) // 2件ずつ並行実行する(並行実行数の指定が可能)
    .Subscribe(x => Console.Write(x + "->"));


new[] { 1, 2, 3, 4 }
    .ToObservable()
    .Select(x => AsyncModoki(x, 1))
    .Concat() // 1件ずつ実行する(Merge(1)と同じ)
    .Subscribe(x => Console.Write(x + "->"));

Microsoft Silverlight を入手

ネストはSelectManyで一気に崩してしまうケースが一般的でしょうけれど、IObservable<IObservable<T>>といったネストした状態にすると、選択肢がSwitchもそうですが、更に、MergeとConcatを選択することができます。ちなみに、このintで並行実行数が指定可能なMergeはWP7同梱版のRxには存在しません。残念。(もう一つ余談ですが、SelectManyはRx内部ではSelect(selector).Merge()という実装になっていたりします)

実行タイミングの問題

上のSilverlight、Merge2とMerge2Exの二つを用意しましたが、Merge2Exのほうは4つ同時に表示されるのが確認出来るはずです。コードはほぼ同一なのですが、AsyncModokiを似たようで別なものに差し替えました。

// Merge(2):Ex
new[] { 1, 2, 3, 4 }
    .ToObservable()
    .Select(x => AsyncModoki2(x, 1)) // これが差分
    .Merge(2)
    .Subscribe(x => Console.Write(x + "->"));

// スレッドプール上で非同期実行(結果は指定秒数後に返る)のシミュレート
// second秒後にネットワーク問い合わせが返る、的なものをイメージしてみてください
static IObservable<T> AsyncModoki2<T>(T value, int second)
{
    var subject = new AsyncSubject<T>();

    ThreadPool.QueueUserWorkItem(_ =>
    {
        Thread.Sleep(TimeSpan.FromSeconds(second)); // 指定秒数待機
        subject.OnNext(value);
        subject.OnCompleted(); // 完了(2つでワンセット)
    });

    return subject; // これ自体はすぐに返す(FromAsyncPatternの中身はこんな感じ)
}

このAsyncModoki2は、このメソッドを通ると即座にThreadPoolに送り込んで「実行」しています。Subscribeされるかどうかとは関係なく、Subscribeの「前に」。対してAsyncModokiはSubscribeされないと実行が開始されません。同じようで違う、この二つの状態をRxでは「Hot」と「Cold」と呼んで区別しています。HotはSubscribeとは関係なく動いているもの、イベントなんかはそうですね。ColdはSubscribeされて初めて動き出すもの、Observable.ReturnであったりRangeであったりと、Rxからの生成子の場合は、こちらのパターンが多いです。

実はFromAsyncPatternはHotなので、Subscribeとは関係なく即座に(といっても戻り値はFuncなのでInvokeしたら、ですが)非同期実行が開始されたりします。これは、あまり都合が良くなく(例えば上の例で見たように、MergeはSubscribeのタイミングによって実行数をコントロールしている)、Coldに変換したほうが扱いやすいです。そのためのメソッドがDefer。

static IObservable<WebResponse> AsyncModoki3<T>(WebRequest req)
{
    return Observable.Defer(()=>
        Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse,req.EndGetResponse)());
}

こちらのほうが、大抵の利用シーンにはマッチするかと思われます。

キャンセル時のリソース処理の問題

Switchは実に有意義なのですが、それの行っていることは、次の値を検知すると前の値をキャンセルする、ということです。普段はあまりキャンセルはしないと思うのですが、Switch内部では大量のキャンセルが発生しています。さて、どのような問題が発生するか、というと、例えば……。

using System;
using System.Net;
using System.Reactive.Linq;

class Program
{
    static void Main(string[] args)
    {
        // ネットワークの最大接続数。通常、デフォルトは2になっているはず。
        ServicePointManager.DefaultConnectionLimit = 2;

        // テキストボックスのTextChangedイベントをイメージした、インクリメンタルサーチで来る文字列群
        new[] { "w", "wi", "wik", "wiki", "wikip", "wikipe", "wikiped", "wikipedi", "wikipedia" }
            .ToObservable()
            .Select((word, id) =>
            {
                // wikipediaのAPIにリクエスト飛ばす
                var url = "http://en.wikipedia.org/w/api.php?action=opensearch&search=" + word + "&format=xml";
                var req = (HttpWebRequest)WebRequest.Create(url);
                req.UserAgent = "test";

                return Observable.FromAsyncPattern<WebResponse>((ac, state) =>
                    {
                        Console.WriteLine("ASYNC START:" + id);
                        return req.BeginGetResponse(ac, state);
                    }, ar =>
                    {
                        Console.WriteLine("ASYNC END:" + id);
                        return req.EndGetResponse(ar);
                    })()
                    .Select(res =>
                    {
                        using (res) // ここのセクションが呼ばれることはない
                        {
                            Console.WriteLine("CALLED NEXT:" + id);
                            return "response string:" + id;
                        }
                    });
            })
            .Switch()
            .ForEach(Console.WriteLine); // 終了を待機する形でのSubscribe
    }
}

// ConsoleApplication用のコードですが、是非実行してみてください。結果は以下のようになります。

ASYNC START:0
ASYNC START:1
ASYNC START:2
ASYNC START:3
ASYNC START:4
ASYNC START:5
ASYNC START:6
ASYNC START:7
ASYNC START:8
ASYNC END:0
ASYNC END:1
// そしてフリーズ...

これは、フリーズします。何故かというと、まず8件の非同期処理が一斉に開始されます(ASYNC STARTの表示)。一斉に開始はされますが、ネットワークの最大接続数は2なので、それ以外のものは内部的には待機されています。そして、Switchによる切り替えは最新のものだけを通すようにするため、7件はキャンセルされます。その後、最初の二件分のネットワークリクエストが終了し(ASYNC ENDの表示)、キャンセルされているためメソッドチェーンの続きであるSelectは呼ばれません。そして、フリーズ。

何故フリーズしてしまうかというと、EndGetResponseで取得した最初の二件のWebResponseが解放されていないためです。キャンセルが呼ばれなければ、Selectを通り、そこでusingにより利用+解放されるのですが、そのセクションを通らなければ何の意味がありません。使われることなく虚空に放り出されたWebResponseが、永遠にネットワーク接続を握ったままになってしまっています。

当然、大問題。

Switchを諦めてSelectMany(全件キャンセルせずに並行実行、どうせネットワーク自体の最大接続数で制限かかっているし)というのも手ではあります。大体の場合は結果は問題ないでしょう。けれど、Switchの利点は何でしたっけ、と。結果が前後しないことです。LINQを検索しようとしていたのに、検索結果が前後したせいでLINQ→LINの順番に結果が得られた結果、表示されるのがLINの結果では困ってしまいます。Switchなら、後に実行したものが必ず最後に来ると保証されるので、そのようなことにはなりません。反面、SelectManyは並行実行のため、前後する可能性が出てきます。Switchはこの例で挙げたような、インクリメンタルサーチのようなものと相性がとても良いんですね。

ではどうするか?

WebResponseのDispose(Close)を呼べれば解決するので、FromAsyncPatternのEnd部分に少し細工を加えてやる、ということが考えられます。

// こんなFromAsyncPatternを用意して
public static IObservable<TResult> SafeFromAsyncPattern<TResult>(Func<AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TResult> end)
    where TResult : IDisposable
{
    // WP7版ではCreateWithDisposableで(この辺の細かな差異が割とウザい)
    return Observable.Create<TResult>(observer =>
    {
        var disposable = new BooleanDisposable();

        Observable.FromAsyncPattern<TResult>(begin, ar =>
        {
            var result = end(ar);
            if (disposable.IsDisposed) result.Dispose(); // キャンセルされてたらDispose
            return result;
        })().Subscribe(observer);

        return disposable; // Disposeが呼ばれるとIsDisposedがtrueになる
    });
}

// こんな風に使うとか
public static IObservable<WebResponse> GetResponseAsObservable(this WebRequest req)
{
    return ObservableEx.SafeFromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse);
}

これにより、キャンセルされたかどうかをEnd部分で判定することが出来ます。よってEnd時にキャンセルされていたらリソースをDisposeしてしまう(ここでreturnしたオブジェクトは、チェーンは切れているので別に使われることなく虚空を彷徨うだけ)。これにより、FromAsyncPatternがリソースを返し、かつ、いつキャンセルされても問題なくなります。

他にも色々なアプローチが考えられます。CompositeDisposable/MutableDisposable/SingleAssignDisposableなどを使い、Disposeが呼ばれたら同時に管理下のリソースをDisposeしてしまう、といった手法。これは、リソースのDisposeされる瞬間が逆にコントロールしにくくなって、例えばWebResponseですと、その後のStreamを呼んでる最中にWebResponseがDisposeされてしまうなどの自体も起こりうるので、少し厄介に思えました。。リソースを後続に渡すまでは責任を持つ。それ以降はノータッチなので好きにやらせる、利用も解放も後続側が責任を。その方が自然だし、素直な動きになるので、いいかな。

他には、キャンセルを伝搬しないようなメソッドを作り、Disposeが呼ばれてもリソースを受け取れるようにし、後続でリソースをDisposeする、などの手段も考えられます。そうすればSafeFromAsyncPatternなどといった、独自のFromAsyncPatternを作る必要はなく、全てに適用できて汎用性は高いのですが、チェーンでの保証が途切れてしまうのが若干微妙かな、と……。この辺は悩ましいところです。

そもそもWebRequestなら、DisposeでAbortしてしまったほうが、キャンセルらしくていいかもしれない。

public static IObservable<WebResponse> GetResponseAsObservable(this WebRequest request)
{
    return Observable.Create<WebResponse>(observer =>
    {
        Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse,
            ar =>
            {
                try
                {
                    return request.EndGetResponse(ar); // Abort後の場合は例外発生
                }
                catch (WebException ex)
                {
                    if (ex.Status == WebExceptionStatus.RequestCanceled) return null;
                    throw; // キャンセル時以外は再スロー
                }
            })()
            .Subscribe(observer);
        return () => request.Abort(); // Dispose時にこのActionが呼ばれる
    });
}

Disposeが呼ばれるとwebRequest.Abortが呼ばれます。その後にEndGetResponseを呼ぶとRequestCanceledなWebExceptionが発生するので、キャンセルされていたならnullを(どちらにせよ、Dispose済みなので、ここでreturnしたものは次のメソッドチェーンで使われることはない)、そうでない例外ならば再スローを、という方針です。悪くなさそうですが、どうでしょうか。私的にはこれを採用するのがベストかなー、と考え中です。

まとめ

SwitchやMergeなどで、従来扱いにくかった並行処理時の非同期のコントロールが簡単になりました。単純に一本の非同期をSelectManyで摩り替えるだけもアリですけれど、せっかくの多機能なのだから、並行にリクエストなどを飛ばして、より速いアプリケーション作りを目指してもいいかもしれません。同期リクエストをTask.Factory.StartNewで包んで振り回すよりかは、ずっと楽です。また、現在行われているMSのイベントBUILDで発表されたWinRTなどは、完全に非同期主体です。C#5.0でasync/awaitが入り、非同期がより扱いやすくなることで、それに併せてModelの有り様も、同期から非同期へと変わっていき、それにあわせてVMなどの書き方も変わってくるのではかと思われます。

ただ、リソースの問題にだけは気をつけて!上で挙げた問題は、本質的にはFromAsyncPatternに限らず、リソース処理が引き離されている場合の全てで該当します。リソースを扱うのは難しい。とはいえ、全面的に問題になるのは、このFromAsyncPatternぐらいな気はします。Observable.Usingなども用意されているので、不用意にリソースをチェーン間で渡したりしなければ原則的には起こらない。けれど、そのFromAsyncPatternこそがリソースを扱うシチュエーションで最も使われるものなんですよね、とほほほ。

キャンセル(Dispose)を不用意に呼ばなければ問題は起こらないといえば起こらないんですが(そのため、不適切に書いてしまっていても、多くのケースで問題が表面化することはないでしょう)、Switchのようなアプローチが取れなくなるのがどうにも。現状だと、とりあえず気をつけましょう、としか言いようがないので、気をつけましょう。もし何かうまい具合に動かないなあ、と思ったら、この辺を疑ってみると良いかもしれません。

その辺難しいなあ、という場合は、近いうちに私の出すRx拡張ライブラリを使いましょう。特に考えなくても済むよう、色々配慮してあります。いつ出るの?というと、はい、最近ゴリゴリと書いてますんで(ブログがちょっと放置気味だった程度には)、必ず近いうちに出します。

Re:FromEvent vs FromEventPattern

現在のRxで最大の分かりにくいポイントになっているFromEventとFromEventPattern。以前にRxでのイベント変換まとめ - FromEvent vs FromEventPatternとして軽くまとめましたが、改めて、詳細に考えてみたいと思います。なお、ここでいうFromEventPatternはWP7版のRxではFromEventを指しています(ここも分かりにくいポイントです)。そして、ここでいうFromEventはWP7版のRxには未搭載です、あらあらかしこ。

ネタ元ですが、銀の光と藍い空: Silverlight 5 の新機能その3 番外編 DoubleClickTrigger をRxっぽくしてみたを拝見して、FromEventに関して実行時に例外が出るとのことなので、その部分の説明を書こうかと。最初コメント欄に書いたのですが、少しコメントにトチってしまったので、自分のブログに書かせていただきます、どうもすみません……。

FromEventとFromEventPatternの最大の違いは、FromEventがAction<EventArgs>を対象にしていて、FromEventPatternはAction<object, EventArgs>を対象にしているということです。Action<object, EventArgs>は、つまりEventHandler。ではAction<EventArgs>って何なんだよ、というと、通常は存在しません。というのもeventはデリゲートなら何でもアリということに仕様上はなっていますが、慣例としてobject sender, EventArgs eを引数に持つデリゲートを選択しているはずですから。

さて、デリゲート間には同じ引数の型・同じ戻り値の型を持っていても、型自体に互換性がないので(例えばEventHandlerとEventHandler<T>)、FromEventもFromEventPatternも、引数を3つ持つオーバーロードの第一引数は conversionという、デリゲートの型を変換するラムダ式を受け入れるようになっています。よって

Observable.FromEventPattern<MouseButtonEventHandler, MouseButtonEventArgs>(
    h => h.Invoke,
    h => AssociatedObject.MouseLeftButtonDown += h,
    h => AssociatedObject.MouseLeftButtonDown -= h);

これが、リフレクションなしで変換出来る形式になります。conversionが不要なオーバーロードもあるのですが(+=と-=を書くだけ)、それはリフレクションを使ってデリゲートの変換をしていて今一つ効率が悪いので、わざわざ+=, -=を書いているのだから、もう一手間かけて、 h => h.Invoke を書いておいたほうがお得です。(もし対象がEventHandler<T>の場合は事情が違ってconversionが不要なので+=と-=だけで済みます、この辺の事情の違いが面倒臭く混乱を招きがちなんですよね…… Rxチームには「便利そうだから」機能を追加する、とかやる前に、もう少し深く考えてくださいと苦情を言いたい)

h => h.Invoke だけで何故変換出来ているのかを詳しく説明します。これは正しく書くのならば

(EventHandler<MouseButtonEventArgs> h) => new MouseButtonEventHandler(h)

が正解です。(左側の型に関しては省略可能ですが、説明用には型を書いていたほうが分かりやすいので明記しておきます、また、この最初からEventHandler<T>であることが、対象がEventHandler<T>である場合はconversionが不要な理由になっています)。ただし、関数を直接渡すとコンパイラがデリゲートの型を変換してくれるため、h => h.Invokeを渡した場合は

h => new MouseButtonEventHandler(h.Invoke)

という風に内部的には自動で整形してくれます。そのため、new MouseButtonEventHandlerを書く手間が省けるということになっています。h と h.Invoke はやってることは完全一緒なのですけど、この辺はコンパイラの仕組みの都合に合わせるという感じで。むしろ仕組みの隙間をついたやり方といいましょうか。

では、FromEventなのですが、まず正しく変換出来る形を見ると

Observable.FromEvent<MouseButtonEventHandler, MouseButtonEventArgs>(
    h => (sender, e) => h(e),
    h => AssociatedObject.MouseLeftButtonDown += h,
    h => AssociatedObject.MouseLeftButtonDown -= h);

です。もし第一引数を省いた場合はAction<EventArgs>を探してリフレクションをかけるようになっていて、そして、通常はそんなイベントを使うことはないので、十中八九例外が出るのではかと思われます(だからこういう混乱を招くだけのオーバーロードを入れるなとRxチームには苦情を言いたい)。

型まで明記すれば

Action<MouseButtonEventArgs> h => (object sender, MouseEventArgs e) => h(e)

となっているわけで、senderを捨ててMouseEventArgsだけを引数に渡す独自のconversionを渡しています。これですが、FromEventPatternであっても

h => (sender, e) => h(sender, e)

とも書けるので(つまるところ h.Invoke って何かといえば (sender, e) => h(sender,e) なのです)、それのsender抜きバージョンを渡しているということになります。

わかりづらい?例えば

.Subscribe(Console.WriteLine)
.Subscribe(x => Console.WriteLine(x))

この二つはやってること一緒なんですよ、ということですね。ラムダ式が入れ子になるとワケガワカラナイ度が加速されるので、私は関数型言語erにはなれないな、と思ったり思わなかったり。

まとめ

ただたんに使うにあたっては、こんなことは知ってる必要はなくh => h.Invoke と h => (sender, e) => h(e) を定型句だと思って暗記してもらうだけで十分です。はい。本来は、こういう部分はちゃんと隠蔽されてたほうがいいんですけれど、まあ、C#の限界としてはそうはいかないというとこですね(F#だとイベントがもう少し扱いやすいんですが)。

また、FromEventにせよFromEventPatternにせよFromAsyncPatternにせよ、実際に使うコードに直接書いてくにはノイズが多すぎるので、Rxでのイベント変換まとめ - FromEvent vs FromEventPatternで書いたように、拡張メソッドに隔離するのを私はお薦めしています。そうこうして裏側で地道に努力することでF#とC#の壁を縮める!とかなんとかかんとか。

ReactiveProperty : Rx + MVVMへの試み

Reactive Extensionsといったら非同期、じゃなくて、その前にイベントですよ!イベント!というわけで、随分手薄になっていたイベント周りの話を増強したいこの頃です。イベントと一口に言っても色々あります。UI(クリックやマウスムーブ)、センサー、変更通知(INotifyPropertyChanged)などなど。中でも一番よく使うのは、UI周りのイベントでしょう。

しかし、UIの持つTextChangedイベントだのから直接FromEventPatternで変換してしまったら、Viewと密接に結びついてしまってよろしくない。ここはMVVM的にやりましょう。でも、どうやって?

View(UI)が持つネイティブなイベントを、ViewModelの持つ更新通知付きのプロパティに変換します。これはバインディングにより可能です。そこはWPF/SLの仕組みに任せましょう。ということで、RxでUIに対してプログラミングするというのは、ViewModelの通知に対してプログラミングするという形になります。

テキストボックスの変更に反応して、1秒ディレイをかけた後に表示する、という簡単な例を(何の面白みもありません、すみません)

Microsoft Silverlight を入手

public class ToaruViewModel : INotifyPropertyChanged
{
    private string input;
    public string Input
    {
        get { return input; }
        set { input = value; RaiseEvent("Input"); }
    }

    private string output;
    public string Output
    {
        get { return output; }
        set { output = value; RaiseEvent("Output"); }
    }

    public ToaruViewModel()
    {
        Observable.FromEvent<PropertyChangedEventHandler, PropertyChangedEventArgs>(
                h => (sender, e) => h(e),
                h => this.PropertyChanged += h, h => this.PropertyChanged -= h)
            .Where(e => e.PropertyName == "Input") // Inputが更新されたら
            .Select(_ => Input) // Inputの値を
            .Delay(TimeSpan.FromSeconds(1)) // 1秒遅らせて
            .ObserveOnDispatcher() // Dispatcherで(Silverlightではこれ必要・WPFでは不要)
            .Subscribe(s => Output = "入力が1秒後に表示される:" + s); // Outputへ代入
    }

    // この辺は別途、ライブラリを使って持ってくるほうが良いかも
    public void RaiseEvent(string propertyName)
    {
        var handler = PropertyChanged;
        if (handler != null) handler(this, new PropertyChangedEventArgs(propertyName));
    }

    public event PropertyChangedEventHandler PropertyChanged;
}

// xaml.csはInitializeだけ、xamlのバインディングは各プロパティへ当てるだけ。
// ただしSL/WP7はUpdateSourceTrigger=PropertyChangedに対応してないので別途Behaviorの適用が必要
// 詳しくは、最後にソース配布(WPF/SL/WP7全て含む)URLを置いているのでそちらを見てください

……実にダサい。はい。全くいけてないです。バインディング可能なのはプロパティなので、そういった中間レイヤへの中継が発生していて、冗長だし、美味しさがかなり損なわれています。わかりきったINotifyPropertyChangedのWhere, Selectは無駄そのもので。勿論、簡単にDelayを混ぜられるといった時間の扱いの容易さはRxならでは、ではあるのですけれど。

ReactiveProperty

中継が手間ならば、中間レイヤだけを抜き出してやればいい。通知処理を内包したIObservable<T>があれば解決する。というわけで、ReactivePropertyと名付けたものを作りました。それを使うと、こうなります。

public class SampleViewModel : INotifyPropertyChanged
{
    public ReactiveProperty<string> ReactiveIn { get; private set; }
    public ReactiveProperty<string> ReactiveOut { get; private set; }

    public SampleViewModel()
    {
        // UIから入力されるものはnewで作成、デフォルト値も同時に指定出来る。
        ReactiveIn = new ReactiveProperty<string>(_ => RaiseEvent("ReactiveIn"), "でふぉると");

        // UIへ出力するIO<T>はToReactivePropertyで、初期値での発火も自動的にされます。
        ReactiveOut = ReactiveIn
            .Delay(TimeSpan.FromSeconds(1))
            .Select(s => "入力が1秒後に表示される:" + s)
            .ToReactiveProperty(_ => RaiseEvent("ReactiveOut"));
    }

    // 通常は、他のMVVMフレームワークなりを使い、それの更新通知システムを利用するといいでしょう
    // Rxを使ったからって、決してMVVMフレームワークと競合するわけではなく、むしろ協調すると考えてください
    public void RaiseEvent(string propertyName)
    {
        var handler = PropertyChanged;
        if (handler != null) handler(this, new PropertyChangedEventArgs(propertyName));
    }

    public event PropertyChangedEventHandler PropertyChanged;
}
// これはWPF版のもの
<Window x:Class="ReactiveProperty.MainWindow"
        xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
        xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
        xmlns:l="clr-namespace:ReactiveProperty"
        Title="MainWindow" Height="350" Width="525">
    <Window.DataContext>
        <l:SampleViewModel />
    </Window.DataContext>
    <StackPanel>
        <TextBox Text="{Binding ReactiveIn.Value, UpdateSourceTrigger=PropertyChanged}" />
        <TextBlock Text="{Binding ReactiveOut.Value}" />
    </StackPanel>
</Window>

XAMLではPathに必ず.Valueまで指定します。これによりGetが求められれば最新の値を返し、値をSetされればPushするようになります。

今回はUI->ReactiveProperty->クエリ演算->ReactiveProperty->UIという風に戻してやりましたが、勿論、UIからの入力をModelに流してそれで止めてもいいし、Modelからの値をUIに流すだけでもいいし、トリガーはタイマーであってもいいし、その辺は完全に自由です。普通の通知プロパティと何も変わりません。また普通のプロパティとして使いたい時は.Valueで値を取り出す/セットできます。

かなりシンプルに仕上がります。通知付きプロパティは、本質的に値の変更毎に通知される無限長のIObservable<T>と見なせるので、そのことにより表現がより自然になっています。書き味も、リアクティブプログラミング(といわれてパッと浮かばれる値が自動更新されるという奴)にかなり近い感じの風合い。XAMLでのバインドも簡単ですし、VMの実装も自動実装プロパティだけで書けるので記述が楽チン。

そして、Rxを使うことによる最大の利点である、他のイベント(他の変更通知プロパティ)と合成しやすかったり、時間が扱いやすくなったり、非同期と混ぜても同じように扱えたり、スレッドの切り替えが簡単であったり、などを最大限に甘受できます。VMとして独立している、かつ全てがRxに乗っているため、単体テストも非常に作成しやすい状態です(時間軸を扱う処理のテストは通常難しいのですが、Rxの場合は自分で時間をコントロール可能なSchedulerを中間に挟むと、好きなように時間を進められるようになります、イベントのテストも、この状態ならばプロパティを変更するだけで生成されますし)。また、決して他のMVVMフレームワークと競合が起こるわけではない(多分……)のも見逃せない利点です。

単純な例なのでModelがありませんが、まあこんな感じ?(それと今はコマンドがないので単純なデータバインドのみの図です)。Modelへのアクセスは通常恐らくRx:Query内で行い、Modelの形態は色々だと思いますが、通信してデータを処理して返す、みたいなものはRxになっているとVMのReactiveProperty側での合成処理が容易なので、非同期にしてIObservable<T>で返すと良いのではかと思います。自身が通知を持つReactivePropertyになっていてもいいですね。そうなると、コードのほとんどがLINQになるという素敵な夢が見れる気がしますが気のせいです。

実装

ReactivePropertyの実装はこんな感じです。ご自由にコピペって使ってみてください。

using System;
#if WINDOWS_PHONE
using Microsoft.Phone.Reactive;
#else
using System.Reactive.Linq;
using System.Reactive.Subjects;
#endif

public class ReactiveProperty<T> : IObservable<T>, IDisposable
{
    T latestValue;
    IObservable<T> source;
    Subject<T> anotherTrigger = new Subject<T>();
    IDisposable sourceDisposable;

    public ReactiveProperty(Action<T> propertyChanged, T initialValue = default(T))
        : this(Observable.Never<T>(), propertyChanged, initialValue)
    { }

    public ReactiveProperty(IObservable<T> source, Action<T> propertyChanged, T initialValue = default(T))
    {
        this.latestValue = initialValue;

        var merge = source.Merge(anotherTrigger)
            .DistinctUntilChanged()
            .Publish(initialValue);
        this.sourceDisposable = merge.Connect();

        // PropertyChangedの発火はUIスレッドで行うことにする
        // UIへの反映の際に、WPFでは問題ないが、SL/WP7ではUIスレッドから発行しないと例外が出るため
        merge.ObserveOnDispatcher().Subscribe(x =>
        {
            latestValue = x;
            propertyChanged(x);
        });

        this.source = merge;
    }

    public T Value
    {
        get
        {
            return latestValue;
        }
        set
        {
            latestValue = value;
            anotherTrigger.OnNext(value);
        }
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return source.Subscribe(observer);
    }

    public void Dispose()
    {
        sourceDisposable.Dispose();
    }
}

// 拡張メソッド
public static class ObservableExtensions
{
    public static ReactiveProperty<T> ToReactiveProperty<T>(this IObservable<T> source, Action<T> propertyChanged, T initialValue = default(T))
    {
        return new ReactiveProperty<T>(source, propertyChanged, initialValue);
    }
}

Valueで値の中継をしているという、それだけです。Publish(value)はBehaviorSubjectというものを使った分配で、必ず最新の値一つをキャッシュとして持っていて、Subscribeされると同時に、まずその値で通知してくれます。これにより「初期値での自動発火」が自然に行える、という仕組みになっています。また、プロパティの変更時に同値の場合は変更通知をしない、というよくあるほぼ必須処理も、ここでDistinctUntilChangedを挟んで行っています(オプションで選択制にしてもいいかもしれない)。

それReactiveUI?

ReactiveUIというRxを前提にしたMVVMフレームワークがあって、それに用意されているObservableAsPropertyHelperと、ReactivePropertyはかなり近いです(ということにプロトタイプ作ってから気づいた、ReactiveUIはこれまで名前は知ってたけど中身完全ノーチェックだったので)。ただ、機能的にはOAPHは双方向バインディングに対応していないので、ReactivePropertyのほうが上です。また、OAPHは使い勝手もあまり良くないし、名前がダサい(ObservableAsPropertyHelperは長すぎるし型名として宣言させるにはイマイチに思える……)などなどで、あまり気に入るものではなかったです。

ReactiveUIは全体的には軽く眺めた程度なのですが、今ひとつ私には合わない。ちょっと、いや、かなり気にいらない。なので、私としてはそのうち他のMVVMライブラリをベースに置いた上での拡張として、Rx用のUI周りライブラリを作りたい。独自に上から下まで面倒を見るフレームワーク、という指針は今一つに思えるので、Rxならではの特異な部分だけを、最初から他のMVVMフレームワークの拡張として用意していく、という方向性のほうが良いものが作れると思っています。素のままのRxでは辛いので、何かしらの中間層が必要なのは間違いないので。

次は、ReactiveCommandを!あー、あとReactiveCollectionも必要かしら。Validationとかも……。まあ、そういうところは普通に書けばいいんですよ、何も全部Rxでやる必要はないですからね。

まとめ

WPFのバインディングの美味しさをRxで更に美味しくする、ということでした。世の中的には弱参照が~などなどというお話もありますが、それには全然追いついてませんので、おいおいちかぢかそのうち。

今回のコードの全体(WPF/SL/WP7)はneuecc / ReactiveProperty /Bitbucketに置いてありますので、好きに見てください。例が単純すぎると美味しさもよくわからないので、もう少し複雑な例で、サンプル準備中なのでしばしお待ちを。

ところで9/15にいよいよRx本が出ます。

オライリーで出ているProgramming C#の著者と、ReactiveUIの作者(元Microsoft Office Labs、つい最近Githubに転職した模様)の共著です。私も買いますので、うーん、読書会とかやったら来てくれる方います?

linq.js LT資料

LTで簡単にlinq.jsの紹介をしましたので、その資料を。といっても、資料は全く使わないでLTの場では完全にデモ一本にしました。ええ、こういう場では、やっぱデモ優先のほうがいいかなー、と。資料は資料で、要素がきっちりまとまって紹介という感じなので、見てもらえればと思います。

スライドのテンプレは同じの使っていてそろそろ飽きたので、新しいのに変えたいところ。基本的にはテンプレのテーマまんまですが、やっぱ細かいところでスライドマスタの調整は必要なので、面倒くさー、と思ってしまい中々に気力が。むしろデザイン変更は一年に一回でいいかしらいいかしら?

そういえばどうでもよくないのですが、SlideshareをBlogに埋め込む時はlargeサイズを選んで欲しい。文字潰れてしまうもの、わざわざ小さいサイズで埋め込む必要はどこにもなくて。

文字列を先頭から見て同じところまで除去をlinq.jsとC#で解いてみた

JavaScript で「文字列を先頭から見て同じところまで除去」をやってみました。という記事を見て、「linq.js を使いたかったのですが使いどころがパッと思い浮かびませんでした」とのことなので、linq.js - LINQ for JavaScriptで答えてみます。お題の元はお題:文字列を先頭から見て同じところまで除去からです。解き方も色々あると思いますが、最長の一致する文字を見つけて、それを元に文字列を削除していく、という方法を取ることにしました。

function dropStartsSame(array)
{
    var seq = Enumerable.From(array);
    return Enumerable.From(seq.First())
        .Scan("$+$$")
        .TakeWhile(function (x) { return seq.All(function (y) { return y.indexOf(x) == 0 }) })
        .Insert(0, [""]) // 一つもマッチしなかった場合のため
        .TakeFromLast(1)
        .SelectMany(function (x) { return seq.Select(function (y) { return y.substring(x.length) }) });
}

dropStartsSame(["abcdef", "abc123"]).WriteLine();
dropStartsSame(["あいうえお", "あいさんさん", "あいどる"]).WriteLine();
dropStartsSame(["12345", "67890", "12abc"]).WriteLine();

はい、ワンライナーで書けました、って何だか意味不明ですね!まず、例えば"abcdef"から["a","ab","abc","abcd","abcde","abcdef"]を作ります。これはものすごく簡単で、Scanを使うだけです。

// ["a","ab","abc","abcd","abcde","abcdef"]
Enumerable.From("abcdef").Scan("$+$$")

素晴らしい!そうして比較のタネができたら、あとは全てのindexOfが0(先頭に一致する)の間だけ取得(TakeWhile)します。["abcdef","abc123"]だとシーケンスは["a","ab","abc"]に絞られます。必要なのは最長のもの一つだけなのでTakeFromLast(1)で最後のものだけを取得。もし一つもマッチしなかった場合は代わりに""が通るようにInsertで事前に先頭にさしてやってます。あとは、その"abc"を元にして文字列を置換したシーケンスを返してやるようにすればいい、というわけです、はい。

少し修正

SelectManyで繋げるのは悪趣味なので、ちょっと変えましょう。

function dropStartsSame(array)
{
    var seq = Enumerable.From(array);
    var pre = Enumerable.From(seq.First())
        .Scan("$+$$")
        .TakeWhile(function (x) { return seq.All(function (y) { return y.indexOf(x) == 0 }) })
        .LastOrDefault("");

    return seq.Select(function (x) { return x.substring(pre.length) });
}

変数を一つ置いてやるだけで随分とすっきり。無理に全部繋げるのはよくないね、という当たり前の話でした。

C# + Ix

C#とIxで書くとこうなるかな?基本的には同じです。(Ixって何?という人はneue cc - LINQ to Objects & Interactive Extensions & linq.js 全メソッド概説を参照ください)

static IEnumerable<string> DropStartsSame(params string[] args)
{
    var pre = args.First()
        .Scan("", (x, y) => x + y)
        .TakeWhile(x => args.All(y => y.StartsWith(x)))
        .LastOrDefault() ?? "";
    return args.Select(x => x.Substring(pre.Length));
}

static void Main()
{
    var x = DropStartsSame("abcdef", "abc123").SequenceEqual(new[] { "def", "123" });
    var y = DropStartsSame("あいうえお", "あいさんさん", "あいどる").SequenceEqual(new[] { "うえお", "さんさん", "どる" });
    var z = DropStartsSame("12345", "67890", "12abc").SequenceEqual(new[] { "12345", "67890", "12abc" });

    Console.WriteLine(x == y == z == true);
}

Ixで使ってるのはScanだけですけれど。

Deferの使い道

ところで、上のコードは遅延評価なのか遅延評価でないのか、微妙な感じです。preの計算までは即時で、その後は遅延されています。まるごと遅延したい場合はIxのDeferというメソッドが使えます。

// Deferで生成を遅延する
static IEnumerable<string> DropStartsSame2(params string[] args)
{
    return EnumerableEx.Defer(() =>
    {
        var pre = args.First()
            .Scan("", (x, y) => x + y)
            .TakeWhile(x => args.All(y => y.StartsWith(x)))
            .LastOrDefault() ?? "";
        return args.Select(x => x.Substring(pre.Length));
    });
}

// もしくはyield returnを使ってしまうという手も私はよく使っていました
static IEnumerable<string> DropStartsSame3(params string[] args)
{
    var pre = args.First()
        .Scan("", (x, y) => x + y)
        .TakeWhile(x => args.All(y => y.StartsWith(x)))
        .LastOrDefault() ?? "";
    var query = args.Select(x => x.Substring(pre.Length));
    
    foreach (var item in query) yield return item;
}

// 勿論、全部LINQで組んでしまってもOK
static IEnumerable<string> DropStartsSame4(params string[] args)
{
    return args.First()
        .Scan("", (x, y) => x + y)
        .TakeWhile(x => args.All(y => y.StartsWith(x)))
        .StartWith("") // linq.jsではInsert(0, [])でした
        .TakeLast(1) // linq.jsではTakeFromLastでした
        .SelectMany(x => args.Select(y => y.Substring(x.Length)));
}

私はIx以前はyield returnを結構よく使ってました。今は、Deferのほうが、例えば if(args == null) throw new ArgumentNullException(); とかがそのまま書けるのでDeferを選びたいかも。この辺の評価タイミングの話は前回、詳説Ix Share/Memoize/Publish編(もしくはyield returnの注意点)で書きました。

まとめ

というわけで、Scanの使い方でした。Scan可愛いよScan。ようするにAggregateの計算途中も列挙する版なわけなので、これ、標準クエリ演算子にも入って欲しかったなあ。結構使えるシーン多いです。

ああ、あとJavaScriptでもforなんて使いません(キリッ。linq.jsは真面目に普通に多機能なので遅い、じゃなくて、いや、それはまあ事実なんですが、便利には違いないです。他の普通のコレクションライブラリじゃ出来ないことも平然と出来ます。でもかわりに(ry

詳説Ix Share/Memoize/Publish編(もしくはyield returnの注意点)

LINQ to Objects & Interactive Extensions & linq.js 全メソッド概説でのIxのPublishの説明がおざなりだったので、一族について詳しく説明したいと思います(Ixの詳細・入手方法などは先の全メソッド概説のほうを参照ください)。ええ、これらはIBuffer<T>を返すという一族郎党だという共通点があります。なので、並べてみれば分かりやすい、かも?挙動を検証するためのコードは後で出しますので、ひとまず先に図をどうぞ。星がGetEnumerator、丸がMoveNext(+Current)、矢印線が二つ目の列挙子を取得したタイミングを表しています。

これだけじゃよく分かりません。と、いうわけで、少しだけ上の図を頭に入れて、以下の解説をどうぞ。

Memoize

Memoizeはメモ化。メモイズ。メモライズじゃないっぽいです。二度目三度目の列挙時は値をキャッシュから返すので、前段がリソースから取得したり複雑な計算処理をしていたりで重たい場合に有効、という使い道が考えられます。とりあえずEnumeratorを取ってきて、それでじっくり動きを観察してみましょう。

// ShareやMemoizeやPublishの戻り値の型はIBuffer<T>で、これはIDisposableです。
// もしリソースを抱えるものに適用するなら、usingしたほうがいいかも?
using (var m = Enumerable.Range(1, int.MaxValue)
    .Do(x => Console.WriteLine("spy:" + x)) // 元シーケンスが列挙される確認
    .Memoize())
{
    var e1 = m.GetEnumerator();
    e1.MoveNext(); e1.MoveNext(); e1.MoveNext(); // spy:1,spy:2,spy:3
    Console.WriteLine(e1.Current); // 3

    var e2 = m.GetEnumerator();
    e2.MoveNext(); // キャッシュから返されるので元シーケンスは不動
    Console.WriteLine(e2.Current); // 1

    e2.MoveNext(); e2.MoveNext(); e2.MoveNext(); // spy:4
    Console.WriteLine(e2.Current); // 4

    e1.MoveNext(); // 今度はe2が先行したので、こちらもキャッシュから
    Console.WriteLine(e1.Current); // 4
}

特に考える必要もなく分かりやすい内容です。オーバーロードにはreaderCountを受けるものがありますが、それの挙動は以下のようになります。

// Memoizeの第二引数はreaderCount(省略した場合は無限個)
using (var m = Enumerable.Range(1, int.MaxValue).Memoize(2))
{
    // readerCountはEnumeratorの取得可能数を表す
    var e1 = m.GetEnumerator();
    e1.MoveNext(); // MoveNextするとMemoizeに登録される
    var e2 = m.GetEnumerator();

    var e3 = m.GetEnumerator();
    e3.MoveNext();

    e2.MoveNext(); // 3つめはNGなので例外が飛んで来る
}

これを使うと、キャッシュから値が随時削除されていくので、メモ化しつつ巨大なシーケンスを取り扱いたい場合には、メモリ節約で有効なこともあるかもしれません。とはいっても、普段はあまり考えなくてもいいかな?毎回削除処理が入るので、実行時間は当然ながら遅くなります。

Share

Shareは列挙子を共有します。

using (var s = Enumerable.Range(1, int.MaxValue).Share())
{
    var e1 = s.GetEnumerator();
    e1.MoveNext(); e1.MoveNext(); e1.MoveNext();
    Console.WriteLine(e1.Current); // 3

    var e2 = s.GetEnumerator(); // 列挙子が共有されているので3からスタート
    Console.WriteLine(e2.Current); // 0。共有されるといっても、MoveNext前の値は不定です
    e2.MoveNext();
    Console.WriteLine(e2.Current); // 4。正しく初期値といえるのはここ

    Console.WriteLine(e1.Current); // 3。e1とe2は同じものを共有していますが、Currentの値はMoveNextしない限りは変わらない

    e1.MoveNext();
    Console.WriteLine(e1.Current); // 5。列挙子を共有しているので、e2の続き
}

といった形です。これの何が美味しいの?というと、例えば自分自身と結合させると、隣り合った値とくっつけられます。

// Share, Memoize, Publishにはselectorを受けるオーバーロードがある
// このselectorには var xs = source.Share() の xsが渡される
// つまり、一度外部変数に置かなくてもよいという仕組み、Zipなどと相性が良い

// 結果は {x = 1, y = 2}, { x = 3, y = 4}, { x = 5, y = 6}
// 列挙子を共有して自分自身と結合するので、隣り合った値とくっつく
Enumerable.Range(1, 6)
    .Share(xs => xs.Zip(xs, (x, y) => new { x, y }))
    .ForEach(x => Console.WriteLine(x));

なんだか、へぇー、という感じの動き。このShareを使うとstringの配列からDictionary<string, string>への変換 まとめ - かずきのBlog@Hatenaのコードは

// {"1":"one"}, {"2":"two"}
var array = new[] { "1", "one", "2", "two" };
var dict = array.Share(xs => xs.Zip(xs, Tuple.Create))
    .ToDictionary(t => t.Item1, t => t.Item2);

物凄くシンプルになります。ループを回すなんて、やはり原始人のやることでしたね!

Publish

PublishはRxでは値を分散させましたが、Ixでも分散です。ただ、挙動にはかなりクセがあり、あまりお薦め出来ないというか……。動きとしては、取得時には共有された列挙子から流れてくるのでShareのようであり、列挙子取得後は全て必ず同じ値が返ってくることからMemoizeのようでもある。

using (var p = Enumerable.Range(1, int.MaxValue).Publish())
{
    var e1 = p.GetEnumerator();
    e1.MoveNext(); e1.MoveNext(); e1.MoveNext();
    Console.WriteLine(e1.Current); // 3

    var e2 = p.GetEnumerator(); // 取得時は列挙子の状態が共有されているので3からスタート
    Console.WriteLine(e2.Current); // 0。 共有されるといっても、MoveNext前の値はやはり不定
    e2.MoveNext();
    Console.WriteLine(e2.Current); // 4。正しく初期値といえるのはここ

    e1.MoveNext(); e1.MoveNext(); e1.MoveNext(); e1.MoveNext(); e1.MoveNext();
    Console.WriteLine(e1.Current); // 8

    e2.MoveNext(); // 取得後の状態はそれぞれ別、またキャッシュから返される
    Console.WriteLine(e2.Current); // 5
}

このPublish、こうして生イテレータを操作している分には理解できますが、普通に使うように演算子を組み合わせると予測不能の挙動になります。例えば

// 自分自身と結合、GetEnumeratorのタイミングが同じなので同値で結合される
// {1,1},{2,2},{3,3},{4,4},{5,5}
Enumerable.Range(1, 5)
    .Publish(xs => xs.Zip(xs, (x, y) => new { x, y }))
    .ForEach(a => Console.WriteLine(a));

// もし後者のほうをSkip(1)したらこうなります
// {1,3},{2,4},{3,5}
Enumerable.Range(1, 5)
    .Publish(xs => xs.Zip(xs.Skip(1), (x, y) => new { x, y }))
    .ForEach(a => Console.WriteLine(a));

Skip(1)すると {1,2},{2,3}... ではなくて {1,3},{2,4}... になる理由、すぐにティンと来ますか?正直私はわけがわかりませんでした。Zipの実装を見ながら考えると、少し分かりやすくなります。

static IEnumerable<TR> Zip<T1, T2, TR>(this IEnumerable<T1> source1, IEnumerable<T2> source2, Func<T1, T2, TR> selector)
{
    using (var e1 = source1.GetEnumerator())
    using (var e2 = source2.GetEnumerator())
    {
        while (e1.MoveNext() && e2.MoveNext())
        {
            yield return selector(e1.Current, e2.Current);
        }
    }
}

Skip(1)のない、そのままZipで結合したものはEnumeratorを取得するタイミングは同じなので、 {1,1},{2,2}... になるのは妥当です。では、source2がSkip(1)である場合は、というと、source2.GetEnumeratorの時点で取得されるのはSkip(1)のEnumeratorであり、Publishで分配されているEnumeratorはまだ取得開始されていません。では、いつPublishされているEnumeratorを取得するか、というと、これは最初にe2.MoveNextが呼ばれたときになります。なので、e1.MoveNextにより一回列挙されているから、e2の(MoveNext済みでの)初期値は2。更にSkip(1)するので、{1,3},{2,4}... という結果が導かれます。

ZipやSkipなど、他のメソッドを組み合わせるなら、それらの内部をきっちり知らなければ挙動が導けないという、ものすごく危うさを抱えているので、Publishを上手く活用するのは難しい印象です、今のところ、私には。もともとPublishはRxに分配のためのメソッドとして存在して、その鏡としてIxにも移植されているという出自なのですが、どうしてもPull型で実現するには不向きなため、不自然な挙動となってしまっています。分配はPull(Ix)じゃなくてPush(Rx)のほうが向いている、というわけで、分配したいのならToObservableして、Observable側のPublishを使ったほうが、素直な動きをして良いと思います。

yield returnを突っつく

MemoizeのreaderCountの例でもそうでしたが、Publish/Memoizeされている列挙子を取得するのがGetEnumerator時ではなくて最初のMoveNextの時、になるのはyield returnを使うとそういう挙動で実装されるからです。例えば

static IEnumerable<T> Hide<T>(this IEnumerable<T> source)
{
    Console.WriteLine("列挙前");
    using (var e = source.GetEnumerator()) // 通常は、foreachを使いますが。
    {
        while (e.MoveNext()) yield return e.Current;
    }
    Console.WriteLine("Dispose済み");
} // yield break

static void Main(string[] args)
{
    var e = Enumerable.Repeat("hoge", 1).Hide().GetEnumerator(); // ここではまだ何も起こらない
    e.MoveNext(); // 列挙前 ← ここでメソッド冒頭からyield return e.Currentのところまで移動
    Console.WriteLine(e.Current); // hoge
    e.MoveNext(); // Dispose済み ← 最終行まで到達して終了
}

イテレータの自動実装でメソッド本文を動きだすのは、最初のMoveNextから、というわけです。また、イテレータ内でusingなどリソースを掴む実装をしている場合は、普通にブロックを(using,lock,try-finally)超えた時に解放されます。ただし、ちゃんとブロックを超えるまでMoveNextを呼びきれる保証なんてない(例外が発生したり)ので、GetEnumeratorする時はusingも忘れずに、は大原則です。using、つまりEnumeratorをDisposeすると、using,lock,finallyがメソッド本文中で呼ばれていなかった場合は、呼ぶようになってます。

ところで、本文が動き出すのは最初のMoveNextから、であることが困る場合もあります。例えば引数チェック。

public static IEnumerable<string> Hoge(string arg)
{
    if (arg == null) throw new ArgumentNullException();

    yield return arg;
}

void Main(string[] args)
{
    var hoge = Hoge(null); // ここでは何も起こらない!
    hoge.GetEnumerator().MoveNext(); // ArgumentNullException発生
}

nullチェックはメソッドに渡したその時にチェックして欲しいわけで、これではタイミングが違って良くない。これを回避するにはどうすればいいか、というと

// 先にnullチェックを済ませて普通にreturn
public static IEnumerable<string> Hoge(string arg)
{
    if (arg == null) throw new ArgumentNullException();

    return HogeCore(arg);
}

// privateなメソッドで、こちらにyield returnで本体を書く
private static IEnumerable<string> HogeCore(string arg)
{
    yield return arg;
}

static void Main(string[] args)
{
    var hoge = Hoge(null); // 例外発生!
}

こうすれば、完璧な引数チェックの完成。実際に、LINQ to Objectsの実装はそうなっています。この時のprivateメソッドの命名には若干困りますが、私は今のところXxxCoreという形で書くようにしてます。MicrosoftのEnumerable.csではXxxIteratorという命名のようですね。また、Ixを覗くとXxx_という名前を使っている感じ。みんなバラバラなので好きな命名でいいのではかと。

なお、こんなことのためだけにメソッドを分割しなければならないというのは、無駄だしバッドノウハウ的な話なので、かなり嫌いです。インラインにラムダ式でyield returnが使えればこんなことにはならないんだけれどなー、チラチラ(次期C#に期待)

まとめ

再度、冒頭の図を眺め直してもらうと、ああ、なるほどそういうことね、と分かりますでしょうか?

とはいえ、ShareもMemoizeもPublishもあんま使うことはないかと思いますぶっちゃけ!Memoizeは、使いたいシチュエーションは確かに多いのですけれど、しかし事前にToArrayしちゃってたりしちゃうことのほうが多いかなー、と。Shareは面白いんだけど使いどころを選ぶ。Publishは挙動が読みきれなくなりがちなので、避けたほうがいいと思います。

LINQ to Objects & Interactive Extensions & linq.js 全メソッド概説

@ITに以前書いたLINQの基礎知識の話が載りました -> LINQの仕組み&遅延評価の正しい基礎知識 - @IT。ああ、もっとしっかり書いていれば(図もへっぽこだし)、と思ったり思わなかったり。それでも校正していただいたのと、細部は修正してあるので、元のものよりも随分と読みやすいはずです。そういえばで1月頭の話なんですね、姉妹編としてRxの基礎知識もやるつもりだったのにまだやってないよ!

ところでそもそも基礎知識といったら標準クエリ演算子が何をできるかではないのでしょうか?知ってるようで知らない標準クエリ演算子。101 LINQ SamplesもあるしMSDNのリファレンスは十分に充実していますが、しかし意外と見逃しもあるかもしれません。また、Interactive Extensionsで何が拡張されているのかは知っていますか?ついでにJS実装のlinq.jsには何があるのか知っていますか?

そんなわけで、LINQ to Objects、Ix、linq.jsの全メソッドを一行解説したいと思います。

LINQ to Objects

いわゆる、標準クエリ演算子。.NET 3.5から使えます。.NET4.0からはZipメソッドが追加されました。なお、サンプルと実行例はlinq.js Referenceに「完全に」同じ挙動をするJS実装での例がありますので、そちらを参照にどうぞ。こういう場合はJS実装だと便利ですね。

Aggregate 汎用的な値算出
All 条件に全て一致するか
Any 条件に一つでも一致するか、引数なしの場合は空かどうか
AsEnumerable IEnumerable<T>へアップキャスト
Average 平均
Cast 値のダウンキャスト、主な用途はIEnumerableからIEnumerable<T>への変換
Concat 引数のシーケンスを後ろに連結
Contains 値が含まれているか、いわばAnyの簡易版
Count シーケンスの件数
DefaultIfEmpty シーケンスが空の場合、デフォルト値を返す(つまり長さ1)
Distinct 重複除去
ElementAt 指定インデックスの要素の取得
ElementAtOrDefault 指定インデックスの要素の取得、なければデフォルト値を返す
Empty 空シーケンスの生成
Except 差集合・差分だけ、集合なので重複は除去される
First 最初の値の取得、ない場合は例外が発生
FirstOrDefault 最初の値を取得、ない場合はデフォルト値を返す
GroupBy グループ化、ToLookupの遅延評価版(ただしストリーミングでの遅延評価ではない)
GroupJoin 右辺をグループにして結合、外部結合をしたい時にDefaultIfEmptyと合わせて使ったりもする
Intersect 積集合・共通の値だけ、集合なので重複は除去される
Join 内部結合
Last 最後の値を取得、ない場合は例外が発生
LastOrDefault 最後の値を取得、ない場合はデフォルト値を返す
LongCount シーケンスの件数、longなので長い日も安心
Max 最大値
Min 最小値
OfType 指定した型の値だけを返す、つまりWhereとisが組み合わさったようなもの
OrderBy 昇順に並び替え
OrderByDescending 降順に並び替え
Range 指定個数のintシーケンスの生成
Repeat 一つの値を繰り返すシーケンスの生成
Reverse 逆から列挙
Select 射影、関数の第二引数はインデックス
SelectMany シーケンスを一段階平らにする、モナドでいうbind
SequenceEqual 二つのシーケンスを値で比較
Single 唯一の値を取得、複数ある場合は例外が発生
SingleOrDefault 唯一の値を取得、複数ある場合はデフォルト値を返す
Skip 指定個数だけ飛ばす
SkipWhile 条件が正のあいだ飛ばす
Sum 合計
Take 指定個数列挙、シーケンスの個数より多く指定した場合はシーケンスの個数分だけ
TakeWhile 条件が正のあいだ列挙
ThenBy 同順の場合のソートキーの指定、昇順に並び替え
ThenByDescending 同順の場合のソートキーの指定、降順に並び替え
ToArray 配列に変換
ToDictionary 辞書に変換
ToList リストに変換
ToLookup 不変のマルチ辞書(一つのキーに複数の値を持つ)に変換
Union 和集合・両方の値全て、集合なので重複は除去される
Where フィルタ
Zip 二つのシーケンスの結合、長さが異なる場合短いほうに合わされる

暗記する必要はなくて、なんとなくこういうのがあってこんな名前だったかなー、とぐらいに覚えておけば、IntelliSenseにお任せできるので、それで十分です。

リスト処理という観点からみるとLINQはかなり充実しているわけですが、更に他の言語と比較した場合の特色は、やはりクエリ構文。SelectManyへの構文は多くの言語が備えていますが(モナドの驚異を参照のこと、LINQはLINM:言語統合モナドである、というお話)、SQLの構文をベースにしたJoin、GroupBy、OrderByへの専用記法は、意外と、というか普通に便利。

特にJoinはあってよかったな、と思います、インメモリで色々なところからデータ引っ張ってきて結合などすると特に。一つぐらいの結合なら別にメソッド構文でいいのですが、フツーのSQLと同じように大量のjoinを並べる場合に、クエリ構文じゃないとシンドい。インメモリからデータベースまで統一的な記法で扱える、ということの凄さを実感するところ。

といっても、普段はほとんどメソッド構文で書いてるんですけどねー。あくまで、込み入った状況になるときだけクエリ構文にしています。クエリ構文では表現できないものが結構多いわけで、わざわざ、これはクエリ構文だけで表現できるからクエリ構文にするかー、とか考えるのもカッタルイので。あと、単純にIntelliSenseでポコポコ打ってるほうが快適、というのもあります。

クエリ構文は、モナドへの記法というよりも、強力なリスト内包表記といった印象も、HaskellへのOrder By, Group Byのペーパー見て思ったりなんかしたりして。

Ix

Ix(Interactive Extensions)はReactive Extensionsで、現在は実験的なものとして提供されている、Enumerableの拡張メソッド群。NuGetのIx_Experimental-Mainで入れるのが使いやすい感じ。InfoQ: LINQ to Objectsのためのインタラクティブエクステンションに解説が少し出ていましたが、少し不足していたり、間違っていたり(DoWhileとTakeWhileは一見似ていますが、挙動は全然異なるし、Forは別に全く興味深くなくSelectManyと同じです)したので、こちらの方が正しいです(キリッ

Buffer 指定個数分に区切って配列で値を列挙
Case 引数のIDictionaryを元に列挙するシーケンスを決める、辞書に存在しない場合はEmpty
Catch 例外発生時に代わりに後続のシーケンスを返す
Concat 可変長引数を受け入れて連結する生成子、拡張メソッド版はシーケンスのシーケンスを平らにする
Create getEnumeratorを渡し任意のIEnumerableを生成する、といってもEnumerator.Createがないため、あまり意味がない
Defer シーケンスの生成をGetEumerator時まで遅延
Distinct 比較キーを受け入れるオーバーロード
DistinctUntilChanged 同じ値が続くものを除去
Do 副作用として各値にActionを適用し、値をそのまま列挙
DoWhile 一度列挙後に条件判定し、合致すれば再列挙
Expand 幅優先探索でシーケンスを再帰的に平らにする
Finally 列挙完了時に指定したActionを実行
For SelectManyと一緒なので存在意義はない(Rxと鏡にするためだけに存在)
ForEach foreach、関数の第二引数はインデックス
Generate forループを模した初期値、終了判定、増加関数、値成形関数を指定する生成子
Hide IEnumerable<T>に変換、具象型を隠す
If 条件が正なら指定したシーケンスを、負なら指定したシーケンス、もしくはEmptyで列挙する
IgnoreElements 後に続くメソッドに何の値も流さない
IsEmpty シーケンスが空か、!Any()と等しい
Max IComparer<T>を受け入れるオーバーロード
MaxBy 指定されたキーで比較し最大値だった値を返す
Memoize メモ化、複数回列挙する際にキャッシュされた値を返す
Min IComparer<T>を受け入れるオーバーロード
MinBy 指定されたキーで比較し最小値だった値を返す
OnErrorResumeNext 例外が発生してもしなくても後続のシーケンスを返す
Publish ShareとMemoizeが合わさったような何か
Repeat 無限リピート生成子、拡張メソッドのほうは列挙後に無限/指定回数最列挙
Retry 例外発生時に再度列挙する
Return 単一シーケンス生成子
Scan Aggregateの算出途中の値も列挙する版
SelectMany 引数を使わず別のシーケンスに差し替えるオーバーロード
Share 列挙子を共有
SkipLast 後ろからn個の値をスキップ
StartWith 先頭に値を連結
TakeLast 後ろからn個の値だけを列挙
Throw 例外が発生するシーケンス生成子
Using 列挙完了後にDisposeするためのシーケンス生成子
While 列挙前に条件判定し合致したら列挙し、終了後再度条件判定を繰り返す生成子

みんな実装したことあるForEachが載っているのが一番大きいのではないでしょうか。別に自分で実装するのは簡単ですが、公式に(といってもExperimental Releaseですが)あると、全然違いますから。なお、何故ForEachが標準クエリ演算子にないのか、というのは、“foreach” vs “ForEach” - Fabulous Adventures In Codingによれば副作用ダメ絶対とのことで。納得は……しない。

Ixに含まれるメソッドは標準クエリ演算子では「できない」もしくは「面倒くさい」。Ixを知ることは標準だけでは何ができないのかを知ること。何ができないのかを知っていれば、必要な局面でIxを使うなり自前実装するなりといった対応がすぐに取れます、無理に標準クエリ演算子をこねくり回すことなく。例えばBufferやExpandは非常に有益で、使いたいシチュエーションはいっぱいあるんですが、標準クエリ演算子ではできないことです。

While, DoWhileとTakeWhileの違いは条件判定する箇所。While,DoWhileは列挙完了前/後に判定し、判定がtrueならシーケンスを再び全て列挙する。TakeWhileは通る値で毎回判定する。

PublishとMemoizeの違いは難解です。Memoizeは直球そのままなメモ化なんですが、Publishが凄く説明しづらくて……。Enumerator取得まではShareと同じく列挙子の状態は共有されてるんですが、取得後はMemoizeのようにキャッシュした値を返すので値の順番は保証される、といった感じです。うまく説明できません。

存在意義が微妙なものも、それなりにありますね。例えばIfとCaseとForなどは、正直、使うことはないでしょう。Usingも、これを使うなら別メソッドに分けて、普通にusing + yield returnで書いてしまうほうが良いと私は考えています。

Ixを加えると、ほとんど全てをLINQで表現出来るようになりますが、やりすぎて解読困難に陥ったりしがちなのには少し注意を。複雑になるようならベタベタ書かずに、一定の塊にしたものを別メソッドに分ければいいし、分けた先では、メソッドを組み合わせるよりも、yield returnで書いたほうが素直に表現出来るかもしれません。

適切なバランス感覚を持って、よきLINQ生活を!

linq.js

LINQ to ObjectsのJavaScript実装であるlinq.jsにも、標準クエリ演算子の他に(作者の私の趣味で)大量のメソッドが仕込んであるので、せっかくなのでそれの解説も。標準クエリ演算子にあるものは省きます(挙動は同一なので)。また、C#でIEqualityComparer<T>を受け取るオーバーロードは、全てキーセレクター関数のオーバーロードに置き換えられています。

一行サンプルと実行はlinq.js Referenceのほうをどうぞ。

Alternate 値の間にセパレーターを織り込む、HaskellのIntersperseと同じ
BufferWithCount IxのBufferと同じ、次のアップデートでBufferに改称予定
CascadeBreadthFirst 幅優先探索でシーケンスを再帰的に平らにする、IxのExpandと同じ
CascadeDepthFirst 深さ優先探索でシーケンスを再帰的に平らにする
Catch IxのCatchと同じ
Choice 引数の配列、もしくは可変長引数をランダムに無限に列挙する生成子
Cycle 引数の配列、もしくは可変長引数を無限に繰り返す生成子
Do IxのDoと同じ
Finally IxのFinallyと同じ
Flatten ネストされた配列を平らにする
Force シーケンスを列挙する
ForEach IxのForEachと同じ
From 配列やDOMなど長さを持つオブジェクトをEnumerableに変換、linq.jsの要の生成子
Generate ファクトリ関数を毎回実行して値を作る無限シーケンス生成子、IxのGenerateとは違う(IxのGenerateはUnfoldで代用可)
IndexOf 指定した値を含む最初のインデックス値を返す
Insert 指定したインデックスの箇所に値を挿入、Insert(0, value)とすればIxのStartWithと同じ
LastIndexOf 指定した値を含む最後のインデックス値を返す
Let 自分自身を引数に渡し、一時変数を使わず自分自身に変化を加えられる
Matches 正規表現のマッチ結果をシーケンスとして列挙する生成子
MaxBy IxのMaxByと同じ
MemoizeAll IxのMemoizeと同じ、次のアップデートでMemoizeに改称予定
MinBy IxのMinByと同じ
Pairwise 隣り合う要素とのペアを列挙
PartitionBy キーで指定した同じ値が続いているものをグループ化する
RangeDown 指定個数のマイナス方向数値シーケンス生成子
RangeTo 指定した値まで(プラス方向、マイナス方向)の数値シーケンス生成子
RepeatWithFinalize 単一要素の無限リピート、列挙完了時にその要素を受け取る指定した関数を実行
Return IxのReturnと同じ
Scan IxのScanと同じ
Share IxのShareと同じ
Shuffle シーケンスをランダム順に列挙する
TakeExceptLast IxのSkipLastと同じ
TakeFromLast IxのTakeLastと同じ
ToInfinity 無限大までの数値シーケンス生成子
ToJSON シーケンスをJSON文字列に変換(組み込みのJSON関数のあるブラウザかjson2.jsの読み込みが必要)
ToNegativeInfinity マイナス無限大までの数値シーケンス生成子
ToObject JSのオブジェクトに変換
ToString 文字列として値を連結
Trace console.logで値をモニタ
Unfold Aggregateの逆、関数を連続適用する無限シーケンス生成子
Write document.writelnで値を出力
WriteLine document.writeln + <br />で値を出力
TojQuery シーケンスをjQueryオブジェクトに変換
toEnumerable jQueryの選択している複数の要素を単一要素のjQueryオブジェクトにしてEnumerableへ変換
ToObservable 引数のSchduler上で(デフォルトはCurrentThread)Observableへ変換
ToEnumerable Cold ObservableのみEnumerableへ変換

Ixと被るものもあれば、そうでもないものも。ToStringなどは分かりやすく便利でよく使うのではかと。ToJSONもいいですね。Fromは拡張メソッドのない/prototype汚染をしないための、JavaScriptだけのためのメソッド。Matchesは地味に便利です、JSの正規表現は使いやすいようでいて、マッチの列挙はかなり面倒くさいので、そこを解消してくれます。linq.jsは移植しただけ、ではあるんですが、同時に移植しただけではなくて、JavaScriptでLINQはどうあるべきか、どうあると便利なのか、という考えに基づいて調整されています。

JavaScriptにはyield returnがないので(Firefoxにはyieldありますが)、シーケンスは全て演算子の組み合わせだけで表現できなければならない。というのが、手厚くメソッドを用意している理由でもあります。これだけあれば何だって作れるでしょう、きっと多分恐らく。

まとめ

これで今日からLINQ to Objectsマスター。Rx版もそのうち書きます(以前にReactive Extensions入門 + メソッド早見解説表を書きましたが、今は結構変わってしまいましたからね)。

ToArray vs ToList

LINQの結果は遅延評価なので、その場で全部評価して欲しかったりする場合などに使うToArrayとToList。どちらを使っていますか?私はToArrayのほうが好みです。と、いうのも、LINQで書く以上、長さは決まったようなものなので、これ以上AddやRemoveしたいことなんてほとんどない。勿論、必ずないとは言いませんので、その場合だけToListを使いますが、そうでない場合は、長さが固定だという意図を示すためにもToArrayが好ましい。

パフォーマンス

T[]やList<T>に変換されたあとだと、T[]のほうが、大体においてパフォーマンスは良い。という点でもToArrayがいいかなあ、と思うわけですが、それはさておき、ではToArrayとToListメソッドそれ自体のパフォーマンスはどちらのほうが良いでしょうか?理屈の上ではToListのほうが上です。というのも、変換処理は下記の図のようになっているからです。

元ソースがIEnumerable<T>である以上、長さは分からないので、ToArrayでも動的配列としていっぱいになったら二倍に拡大する、という動作を行うのはToListと変わりありません。この辺の話は動的配列への追加コストはなぜ O(1)?や、2倍だけじゃないを参考に。.NETは2倍です。そして、最後に拡大された配列の長さを整えるためコピーするのがToArray、そのまま渡すのがToList。つまり、ToArrayのほうが最後の一回のコピー動作が増えているわけです。

でも、ベンチマークをとると、ToArrayのほうが速かったりします。

// 適当さ溢れている(若干恣意的な)測り方なので、それはそれとしてくだしあ
// ToArray:00:00:01.5002685
// ToList :00:00:01.8124284
var source = Enumerable.Range(1, 100000000);

var sw = Stopwatch.StartNew();
source.ToArray();
Console.WriteLine("ToArray:" + sw.Elapsed);
GC.Collect();
sw.Restart();
source.ToList();
Console.WriteLine("ToList:" + sw.Elapsed);

へー、ToArrayのほうが速いんだー、ではなくて、要素数1億件でこの程度しかでないので、どうでもいい程度の差でしかないということです。ここ注意。こういう適当なマイクロベンチのマイクロな差で、こっちのほうが速いからこうしなければならない、これが最適化のための10箇条、みたいなことをやるのは間抜けだと思います。JavaScriptにはそういう記事があまりにも多すぎるとも思っています。

それはともかく、何で理屈の上ではコピーが多いToArrayのほうが"速い"のか。それは中身をゴニョゴニョしてみてみれば分かりますが

public static List<T> ToList<T>(this IEnumerable<T> source)
{
    // ICollection<T>の場合はnew List<T>(source)の中で最適化されてます
    // 最適化されない場合はforach(var item in source) this.Add(item) という感じ
    return new List<T>(source)
}

// 実際のコードとは違います、あくまでイメージです
public static T[] ToArray<T>(this IEnumerable<T> source)
{
    // ICollection<T>の場合はCopyToで最適化
    var collection = source as ICollection<T>;
    if (collection!= null)
    {
        var dest = new T[collection.Count];
        collection.CopyTo(dest, 0);
        return dest;
    }

    // そうでないなら列挙して配列を伸ばしながら作る
    var array = new T[4];
    var count = 0;
    foreach (var item in source)
    {
        if (array.Length == count)
        {
            var dest = new T[count * 2];
            Array.Copy(array, dest, count);
            array = dest;
        }
        array[count++] = item;
    }

    // 生成したものと長さが同じならそのまま返す
    if (array.Length == count) return array;

    // そうでないなら長さを整えてから返す
    var result = new T[count];
    Array.Copy(array, result, count);
    return result;
}

これだけだとよくわからない?うーん、そうですね。ToArrayの場合は配列を作る、それだけに最適化されていて余計なコードが一切ありません。反面、ToList、というかnew List<T>(source)は、内部では少し色々なものの呼び出し回数が多かったりしています。その辺のことが、コピー回数以上に「ほんの少しだけ」速度の差を生んでいるのではないかな、ということのようです。

// パフォーマンスを一切考えないのならこれでいいのよね
public static T[] ToArray<T>(this IEnumerable<T> source)
{
    // 実際はreturn new Buffer<T>(source).ToArray();
    return new List<T>(source).ToArray();
}

理屈的にはこれでいいわけですが、実際はBuffer<T>クラスというものをinternalでもっていて、それはLINQで使うためだけに余計なものが一切ない動的配列で、LINQ to Objectsの各メソッドは、動的配列が必要な場合ではList<T>ではなく、そのBuffer<T>クラスを使っています。DRYはどうした、という気は少しだけしますが、まあ、ユーザーとしては速いに越したことはないです。

Array.Copy

ところで、Array.CopyやICollection<T>のCopyToって面倒くさいですよね、長さを持った空の配列を作って、渡さなければならないって。と、Array Copy - Memo+の記事を見て改めて思いましたが、しかし、一番よくあるケースである一次元配列のコピーならToArrayを使えばOKです。↑の実装イメージであるように、ちゃんとis asで判定して最適化してくれているので、LINQだとforeachで全部舐めるから遅いんじゃないかなー、と考えなくても大丈夫。

まとめ

今日、Twitterで間違ったこと投稿しちゃって恥ずかすぃかったので反省して書いた。まる。とりあえずToArray使えばいいです。

Deep Dive AsEnumerable

AsEnumerable、といったらLINQのAsEnumerableです。その挙動は、IEnumerable<T>へと型変換をします。それだけ、なので実に影が薄いのですが、それでいて奥深く使いこなしが求められる、はずなのですが陰が薄いので無視されている感がなきにしもあらずなので、しっかりと紹介したいと思います。

AsEnumerableの実装

実装は非常に単純明快で、中身ほとんど空っぽです。

public static IEnumerable<T> AsEnumerable<T>(this IEnumerable<T> source)
{
    return source;
}

ようするにアップキャストです。どういう時に使えばいいかというと、例えば可変長引数とIEnumerable<T>の両方を受けたいオーバーロードを作る場合。

public void Show(params string[] values)
{
    Show(values.AsEnumerable());
}

public void Show(IEnumerable<string> values)
{
    foreach (var item in values)
    {
        Console.WriteLine(item);
    }
}

foreachでグルグル値を取り出すだけなので、IEnumerable<T>で受けるようにしたい。でも利便性のため可変長引数も用意しておきたい。という場合はよくあります。なので毎回このオーバーロードを用意するんですが、その時に、こうしてAsEnumerableを使います。なお、AsEnumerableを忘れると無限に再帰してStackOverflowしてしまいます……。

AsEnumerableがラップするのではなく、ただのアップキャストにすぎないということは重要です。以前にLinqとCountの効率でも書きましたが、LINQの一部のメソッドはIList<T>であったりICollection<T>であるとき、asやisを使って最適化を図ります。foreachするだけだとあまり関係ないですが、受け取ったIEnumerable<T>を使ってLINQで処理する場合だと、このことが効いてきます。

ならば常にアップキャストでよくて、ラップなど必要ないのではないか?というと必ずしもそうではありません。アップキャストは、ダウンキャストを可能にします。

// アップキャストされている状態というのは
var numbers = new List<int>().AsEnumerable();

// ダウンキャストが可能にするということ、そして、ダウンキャストは危険
var list = (List<int>)numbers;

ダウンキャストが危険だ、やるな、というのなら、そもそもアップキャストをすべきではない。抽象で受けることこそがオブジェクト指向だとか、形だけのパターンにはまってるとそうなる。原則は比較的シンプルで。メソッドのシグネチャにおいて、引数の型は最大に受け入れるため出来る限り抽象で、戻り値の型は最大に利用出来るようにするため具象にすればいい。ローカル変数に関しては、原則varでよし。どうしても必要ならば、ローカル変数側、つまりメソッドの利用側が安全なアップキャストで適宜、抽象で受ければよいでしょう。

ダウンキャストはダメ基本的に。そして、ダウンキャストは可能な状態にすること自体がダメなので、アップキャストも最小限に。というのがメソッド定義の基本だと思っていますが、プロパティだと少し事情は変わってくるかも。一々ラップすることのパフォーマンスロスや手間を考えると、しかたがなくアップキャストで提供するのも、ありかなー、とは。

Hide

そんなわけで、具象型を消去して、完全にラップしてIEnumerable<T>として提供したいという場合もあるかと思います。そこで、Ix(Interactive Extensions、Reactive Extensionsのオマケで提供されているEnumerableの拡張メソッド群、NuGetのIx_Experimental-Mainで入れるのが手っ取り早い。Experimentalのとおり、まだ実験的な代物で保証されていないことは注意)にはHideというものがあります。これも実装は単純明快で

public static IEnumerable<T> Hide<T>(this IEnumerable<T> source)
{
    foreach (var item in source)
    {
        yield return item;
    }
}

といった形。Hideというメソッド名は具体的な型を隠す、といった意味合いで付けられているのでしょうね。

Rx(AsObservable)の場合

Enumerableと関連性の深いObservable、Rxにも同様に型変換をするAsObservableというメソッドが用意されています。主に使うシチュエーションは、Subjectの隠蔽をするときでしょうか。

// 5秒後に非同期で値を返すというだけのもの
public static IObservable<T> SendValueAfter5Seconds<T>(T value)
{
    var asyncSubject = new AsyncSubject<T>();

    ThreadPool.QueueUserWorkItem(_ =>
    {
        Thread.Sleep(TimeSpan.FromSeconds(5)); // とりまsleep

        asyncSubject.OnNext(value); // AsyncSubjectのキャッシュへ値送信
        asyncSubject.OnCompleted(); // 非同期処理完了の合図(ここでObserverに値が送られる)
    });

    return asyncSubject.AsObservable();
}

このコード自体には何の意味もありません、非同期処理を模して、スレッドプールで5秒待って値を送る、というだけです。大事なのはAsyncSubjectをAsObservableして返していること。このAsObservableはただのアップキャストではなく、新しい型でラップして具象型(AsyncSubject)を隠しています。つまり、AsEnumerableではなくHideに等しい挙動です。ここで、もしAsObservableを書いていないと

// return時にAsObservableが書かれていないとダウンキャスト可能になる
var subject = (AsyncSubject<int>)SendValueAfter5Seconds(100);

subject.Subscribe(Console.WriteLine);

// なので、外側から発火可能になってしまう、これは最悪
subject.OnNext(-1);
subject.OnCompleted(); 

Subject(標準だと4種類ある)は、Rxにおけるイベントの表現です。C#でのイベントは、内部からは発火可能、外側からは購読しかできない。というようになっていると思います。その挙動にするために、また、純粋に安全性のために、Subjectを購読させるために外側に出す場合は、AsObservableでラップして型を消さなければなりません。

※極初期(RxがReactive Frameworkと言われていた頃なぐらいに前)は、このAsObservableはHideというメソッド名でした。AsObservableのほうが分かりやすくて良いとは思いますが、Enumerableでの挙動と合わせるなら、キャストするだけのAsObservableとHideに分けるべきだったのでは?と思わなくは全くないです←Rxにおいてはただのキャストしただけのものは使う機会ないと思うので、現在の形で正解

IQueryableにおけるAsEnumerableの重要性

Enumerable、Observableと来たので、QueryableでのAsEnumerableも見てみましょう。QueryableにおけるAsEnumerableは、クエリ構築の終了です。IQueryableでのクエリ構築をそこで打ち切るというスイッチです。どういうことか、というと

// とあるContextによるQueryableはSkipWhileとCountをサポートしていなかったとします
var count = toaru.createContext() // IQueryeable<T>とする
    .Where(x => x % 2 == 0)
    .SkipWhile(x => x < 100)
    .Count(); // 未サポートなのでExceptionが来る!

// そういう場合、ToListするといい、というアドバイスがよく上がります
var count = toaru.createContext()
    .Where(x => x % 2 == 0)
    .ToList() // ここまでの式でクエリ生成+List化
    .SkipWhile(x => x < 100) // ここからはIEnumerable<T>
    .Count();

// でも、それならAsEnumerableでいいんだよ?
var count = toaru.createContext()
    .Where(x => x % 2 == 0)
    .AsEnumerable() // 後続がGetEnumeratorを呼んだ時にここまででクエリ生成
    .SkipWhile(x => x < 100) // ここからはIEnumerable<T>
    .Count();

Queryableの連鎖で、例えばLinq to SqlだったらSQL文を作っていきます。で、foreachであったりToListであったりをすると、SQLが作られて発行されてデータベースと通信されて。それって、どのタイミングでQueryableの中の式木がSQL文に変換されるかというと、GetEnumeratorが呼ばれた時、です。それはいつ呼ばれるの?というと、foreachされたりToListされたり、AsEnumerableしてその後のEnumerableのメソッドがGetEnumeratorを呼んだ、その時。

こんな感じです。ToArrayやToListは、そこで実体化するので、メソッドチェーンの後続がIEnumerable<T>なのは当然のことですが、AsEnumerableがただのキャストにすぎないのに意味合いが変化するのは、拡張メソッドの解決の優先度のため。型がIQueryable<T>の状態だとWhereやSelectはQueryableのWhereやSelectが選択されますが、型がIEnumerable<T>の状態だとEnumerableのWhereやSelectが選択される、ということです。Enumerable自体は遅延評価なので、後続のIEnumerable<T>がGetEnumeratorを呼び出したときに評価が開始されるのは変わらず。

AsEnumerableやToArray、ToListは実はQueryableクラスにはありません。なので、素の状態で拡張メソッドの解決がIEnumerable<T>側を呼び出すようになっています。

ところでクエリ文の構築はGetEnumeratorが呼ばれた時と言いましたが、GetEnumeratorを呼ばないとき、例えばQueryableでのFirstやSumはどうなっているのかというと、内部でExecuteが呼ばれた時です。IQueryProviderはこんなインターフェイス。

public interface IQueryProvider
{
    IQueryable<TElement> CreateQuery<TElement>(Expression expression);
    TResult Execute<TResult>(Expression expression);
    // 非ジェネリックなものもありますが省略
}

FirstやSumなど、単独の結果を返すものは内部でExecuteを呼びます。なので、クエリプロバイダの実装次第ですが、通常はこのExecuteが呼ばれた時にクエリ文の構築と実行を同時に行うものと思われます。SelectやWhereなど、後続にIQueryableのチェーンを繋げるものは、内部でCreateQueryのほうを呼びます。そして最終的に複数の結果(IEnumerable<T>)を返す場合は、GetEnumeratorが呼ばれた時にクエリ文の構築と実行を行うものと思われます。

まとめ

AsEnumerableは、ようするにただのキャストなだけですが、その果たしている役割というものを考えると非常に深い。その割には(QueryableでToListばかり使われたりと)今ひとつ知名度に欠ける気もしますので、ドサッと紹介を書いてみました。ただのキャストだって語ろうと思えば幾らでも語れるLINQは素敵ですね!

DynamicObjectでより沢山の型情報を取る方法

Chaining AssertionのAsDynamicに少し不具合があったので、ver.1.5.0.0として更新しました。今まではメソッドの引数にnullを渡すと死んでました。ぬるぽ!まあしかし、引数そのもののからしかTypeが取れなかったので、nullだと、どのみちメソッドを特定するための型情報がないからオーバーロードの解決は不可能なので、仕様ですよ仕様、という言い訳。

などとふざけたことを思っていたのですけれど、コンパイル時に決定される引数の型を取り出す方法が判明したので、そのへんも含めて完全に解決しました。やったね。その方法は、というと

public override bool TryInvokeMember(InvokeMemberBinder binder, object[] args, out object result)
{
    var csharpBinder = binder.GetType().GetInterface("Microsoft.CSharp.RuntimeBinder.ICSharpInvokeOrInvokeMemberBinder");
    if (csharpBinder == null) throw new ArgumentException("is not csharp code"); // CSharpコードではない

    // ジェネリックの型引数の取得(Hoge<T>(1, t)とかでのTのこと)
    var typeArgs = (csharpBinder.GetProperty("TypeArguments").GetValue(binder, null) as IList<Type>).ToArray();
    // コンパイル時に決定されているパラメータの型の取得
    var parameterTypes = (binder.GetType().GetField("Cache", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(binder) as Dictionary<Type, object>)
        .First()
        .Key
        .GetGenericArguments()
        .Skip(2)
        .Take(args.Length)
        .ToArray();
    
    // それらの情報からMethodInfoを特定する
    var method = MatchMethod(binder.Name, args, typeArgs, parameterTypes);
    // 呼び出し
    result = method.Invoke(target, args);

    return true; // 呼べてれば必ずTrueなので。
}

……。ひどそうな匂いが!まず、素のままでは情報が足りなすぎるので、基本的にリフレクション全開です。その中でも、parameterTypesが今回追加したところです。binderのCacheに、CallSiteのデリゲート(dynamicを使って呼び出すと、コンパイル時にこの辺のものが自動生成される)があるので、そこから型情報を持ってこれることに気づいたのだ(キリッ

もう少し詳しく説明しますと

// このヘンテツもないどうでもいいコードは
dynamic d = null;
var result = d.Hoge<string, int>(100, (string)null, (ICollection<int>)null);

// コンパイル後はこんな結果に化けちゃいますあら不思議!
object d = null;
if (Program.<Main>o__SiteContainer0.<>p__Site1 == null)
{
	Program.<Main>o__SiteContainer0.<>p__Site1 = CallSite<Func<CallSite, object, int, string, ICollection<int>, object>>.Create(Binder.InvokeMember(CSharpBinderFlags.None, "Hoge", new Type[]
	{
		typeof(string), 
		typeof(int)
	}, typeof(Program), new CSharpArgumentInfo[]
	{
		CSharpArgumentInfo.Create(CSharpArgumentInfoFlags.None, null), 
		CSharpArgumentInfo.Create(CSharpArgumentInfoFlags.UseCompileTimeType | CSharpArgumentInfoFlags.Constant, null), 
		CSharpArgumentInfo.Create(CSharpArgumentInfoFlags.UseCompileTimeType | CSharpArgumentInfoFlags.Constant, null), 
		CSharpArgumentInfo.Create(CSharpArgumentInfoFlags.UseCompileTimeType | CSharpArgumentInfoFlags.Constant, null)
	}));
}
object result = Program.<Main>o__SiteContainer0.<>p__Site1.Target(Program.<Main>o__SiteContainer0.<>p__Site1, d, 100, null, null);

細かい部分はどうでもいいので、良く分からないFuncが生成されてるんだな、というとこだけ見てもらえれば。さて、これを頭に入れた上で、DynamicObjectのTryInvokeMemberでbinderの奥底のCacheを探してみると、

このFunc6というものが、Func<CallSite, object, int, string, ICollection<int>, object>です、発見出来ました!これの型引数が、コード上での呼び出し時の型引数になるわけです。なお、第一引数はCallSite、第二引数はインスタンスなので無視してSkip(2)、そして引数の個数分だけTake(まあ、ようするに最後が戻り値の型なわけですが)。

というわけで、実際に改善されたAsDynamicを使ってみますと、

// こんな何のヘンテツもないオーバーロードのあるクラスがあるとして
public class PrivateClass
{
    private string Hoge(IEnumerable<int> xs)
    {
        return "enumerable";
    }

    private string Hoge(List<int> xs)
    {
        return "list";
    }
}

// 型はdynamicです←意地でもvarで書きたい人
var mock = new PrivateClass().AsDynamic();

// 型でオーバーロード分けが出来るようになった!
List<int> list = new List<int>();
IEnumerable<int> enumerable = new List<int>();

(mock.Hoge(list) as string).Is("list");
(mock.Hoge(enumerable) as string).Is("enumerable");

というわけで、より正確なオーバーロードの解決が図れるようになりました。何をアタリマエのことを言ってるんだお前は、と思うかもしれませんが、TryInvokeMemberに渡ってくる情報はobject[] argsなのです。args[0]の型はobjectなわけで、それをGetTypeしたら、出てくるのはList<int>なのです。何をどうやっても、.csファイルではIEnumerable<int>と書かれているという情報を得ることは出来なかったわけです、今までは。ましてやnullだったら型もヘッタクレもなかったわけです。でもこれからは違う。コード上のデータが取れる!

などとツラツラと書いてみましたが、利用者的にはどうでもいい話ですね、はい。それに、リフレクションはいいとしても、Cacheって何よ?CacheのFirstが決め打ちなのって何よ?などなどは、ぶっちゃけよくわかっていなくて(だって全部internalだしね……)若干怖いのですが、まあ、多分、大丈夫でしょう、多分……。それと、まだ完璧じゃあなくてサポートしてないオーバーロードのケースが幾つかあります。とはいえ、ほとんどのシチュエーションでは問題ないのではかと思います。

まとめ

このChaining Assertionですが、私は結構普通に使いまくっていて、ないと死にます。激しく便利。そうそう、紹介しますと、 actual.Is(expected) と、メソッドチェーン形式で流れるようにアサーションが書けます。ラムダ式での指定やコレクションへの可変長引数など、ちょうどかゆいところに手が届く拡張を施してあって、随分とテストを書くのが楽になります。

AsDynamicは、オマケ機能というか。privateなメソッドやプロパティ、フィールドにもdynamicを通してアクセス出来るように変換します。たまにしか使いませんが(MSTestにはPrivate Accessorがあるので)、あると便利よね、という時もそこそこあり。

MSTestだけではなく、NUnitやMbUnit、xUnit.NETでも使えますので&NuGet経由でも入れられますので、一度是非お試しを。

非同期の再帰的な辿り方、或いはRxとC# 5.0 Asyncの連携について

例えば、ページを辿る。何度もアクセスを繰り返して、辿る。非同期で。単純なようで、やってみると何気にこれが結構難しい。例としてコードレシピのReactive Extensionsを使用してTwitterから非同期にデータを取得し表示するがありました。MVVMも絡めて、素晴らしいサンプルですね!

というわけで、お題を拝借して、Twitter ApiのGET statuses/friendsを使わせて頂きます。んが、今回は、手を抜いてフォロワーのscreen_name(@hogehogeのhogehogeの部分)だけを取れれば良い、ということにします。JSON解析やデシリアライズも面倒だし話の本題でもないので省略するため、DynamicJsonを使って、JSONを生のまんまっぽく扱うことにします。DynamicJsonは便利だなあ(棒)

さて、まずTwitter APIのcursorですが、大体こんな風になっています。目的はカーソルを辿って全てのuser(に含まれるscreen_name)を集めること。

JSON取得毎にnext_cursor_strという、次のページへのIDが取れるので、それを辿っていって、0が出たらページ末尾。といった具合です。next_cursor_strの値は一見ランダムに見える整数(2121409421とかそんな値になっている)であり、next_cursorという数字のものもあるのに、_strという文字列として得られるほうを使っています。何故かというと、TwitterのステータスIDが53bitを越えたお話 - tmytのらくがきを参照ください。DynamicJsonでは数字(Number)はdoubleとして扱うので、_strのほうを使わないと、危ういわけです。

まあ、ただのお題で本題な話ではないので、その辺は深く考えずそういうものなのだなあ、ぐらいで。

同期とyield returnと非同期

コードレシピのサンプルを見させて頂いたのですが、ネットワークアクセス部分がOpenReadなので、非同期"ではない"です。でも挙動は非同期だよ?というのは、Scheduler.ThreadPoolを使っているからなわけですが、つまるところ挙動的にはBackgroundWorkerを使って非同期にするのと同じことです。その場合ですと、Generateも確かに良いのですが、APIへのアクセスがそもそも同期であるならば、難しく考える必要はなく、yield returnを使ったほうが簡単です。単純なものは演算子の組み合わせで、複雑なものは素直に偉大なるコンパイラ生成(yield return)に頼る。そういう切り分けがLINQ的には大事かなって。

static IEnumerable<string> EnumerateFriends(string screenName)
{
    var cursor = "-1"; // 初期値は-1から
    while (cursor != "0") // 0が出たら終了
    {
        var url = string.Format("http://api.twitter.com/1/statuses/friends.json?screen_name={0}&cursor={1}",
            screenName, cursor);
        using (var stream = new WebClient().OpenRead(url))
        {
            var json = DynamicJson.Parse(stream); // 面倒くさいんでDynamicJson使いますよ:)
            foreach (var item in json.users)
            {
                yield return item.screen_name; // screen_nameを列挙
            }
            cursor = json.next_cursor_str; // 次のカーソルにセット
        }
    }
}

static void Main()
{
    var friends = EnumerateFriends("neuecc").ToArray();
}

すっきりと書けるのが分かると思います。え、これだとブロックしてしまって良くない?その通り。じゃあ非同期にしましょう。いえ、Reactive Extensionsで簡単にできてしまいます。yield returnで生成されたEnumerableをObservableに変換するのは、ToObservableです。

static void Main()
{
    EnumerateFriends("neuecc")
        .ToObservable(Scheduler.ThreadPool) // ThreadPoolで実行!
        .Subscribe(Console.WriteLine);
        
    Console.ReadLine(); // 終了してしまうからね
}

ToObservableはデフォルトでは Scheduler.CurrentThread 上で実行されるため、同期的にブロックしますが(※Push型シーケンスだからといって必ずしも非同期とは限らない)、任意のものに変更することも可能です。今回はScheduler.ThreadPoolを指定したので、ThreadPool上で動くようになっています。そのため、ブロックされません。

こういった書き方のほうが、コードがクリアになるし、IEnumerable<T>とIObservable<T>に両対応できてる、という柔軟性の点でも良いかと思います。また、BackgroundWorkerを使うよりも遥かに簡単ですよね。プログレス通知もなく、ただ処理をバックグラウンドでやりたい、というだけならば、Rxを使ったほうが楽チンです。プログレスが必要な場合は、Rxだとその辺の処理を作りこまなければならないので、素直にBackgroundWorkerを用いるのもいいかもしれません。私だったらRxをちょっと拡張してプログレス処理を作り込むほうを選ぶかな?その辺の話はReactive ExtensionsとAsync CTPでの非同期のキャンセル・プログレス処理を参照ください。

また、Observable化するとPublishによる分配 - C#とLinq to JsonとTwitterのChirpUserStreamsとReactive Extensionsなど、色々と応用な使い方が広がるのもメリットの一つと言えるでしょう。

Async CTP

今回は例がWPFなため、WebClientで同期的(OpenRead)に取ってしまいましたし、それでも全然問題ないわけですが、SilverlightとかWP7だったらこの手(同期でJSON取ってきてyield returnで返す)は使えません。同期のOpenReadがそもそもなくて、非同期のOpenReadAsyncしかないからね。どうしましょう?それだとyield returnが使えないのはモチロンのこと、Generateでもうまく動きません。もしページ番号がカーソルのように不定ではなく1,2,3...といった形で辿れたとしても、Observable.Rangeでやると、うまくいきません。非同期なので結果が帰ってくる時間が不定だからです。結果を取得してから次の結果を取得する、という形式にしないとダメなのです。

ところでそもそも、同期的に書いたとしても、本来は書くのは大変なはずなのです。それが、yield returnというコンパイラ生成があるから簡単に書ける。ということは、そうです、非同期もコンパイラ生成してしまえばいいのです、ということでAsync CTPで書きましょう。Async CTPはC# 5.0で入る、かもしれない、async/await構文を使えるようにするためのものです。コミュニティテクノロジープレビュー。ようするにベータ版ですね。

// このコードはAsync CTP (SP1 Refresh)によるもので、将来的にも同じコードで動作することは保証しません
async static Task<List<string>> EnumerateFriends(string screenName)
{
    var list = new List<string>();

    var cursor = "-1";
    while (cursor != "0")
    {
        var url = string.Format("http://api.twitter.com/1/statuses/friends.json?screen_name={0}&cursor={1}",
            screenName, cursor);
        using (var stream = await new WebClient().OpenReadTaskAsync(url)) // await!
        {
            var json = DynamicJson.Parse(stream);
            foreach (var item in json.users)
            {
                list.Add(item.screen_name); // yield returnの代わりに……
            }
            cursor = json.next_cursor_str;
        }
    }

    return list;
}

Async CTPは簡単に解説すると、awaitキーワードを使うと、本来非同期のものが同期のように書けるようになります。詳しくは非同期処理 (C# によるプログラミング入門)を参照のこと。コード的にも見たように、差異はWebClientのOpenReadの部分を、await OpenReadTaskAsyncに変更しただけで、あとはまるっきり一緒です。非同期なんて簡単なものだね。と、言いたかったのですが、全部読み込んでListで返してるぢゃん……。これじゃEnumerateじゃないよ、yield returnじゃないの?これだと結果取得に時間かかるし、Takeなどを用いて、途中で止めることもできないし。あまりよくない。

結論としては今のところどうやら無理ということで。asyncの返すものはTaskもしくはTask<T>でなければならない。いや、Task<IEnumerable<T>>を返してくれればいいぢゃん、await yield returnとか出来たら素敵ぢゃないのん?と思わなくもないというか、普通にそういうリクエストも上がっているのですが、それにはIAsyncEnumerable<T>のようなものと、それに対するコンパイラサポートが必要だよね、という返しでした。

Rx + Async

IAsyncEnumerable<T>、それってIObservable<T>で代替出来る話だよね。IObservable<T>は連続的な非同期を内包しているから。(※IObservable<T>は一つのインターフェイスであまりにも多くのものを表現出来てしまい、内部の状態が読みづらく(同期なのか非同期なのか、遅延なのか即時なのか)混乱を生みがちという問題もありますが……)。なので、Rxでやってみましょう。といっても、Rxで完全に自前でやるのは相当大変なので、Async CTPのサポートも併用します。これにより非同期の待機が同期的に書けるようになり、yield returnであったりlist.Addであったりの部分を、OnNextに置き換えるだけになります。

Stable版のRxにはAsync CTP連携は入っていないのですが、Experimental(実験的)版には、awaitで待機出来る、というだけはなく、幾つかAsync CTPと連携できるメソッドが入っています。

// ExperimentalのRxのため、将来的にもこのコードが動作し続けることは保証しません
static IObservable<string> EnumerateFriends(string screenName)
{
    // ラムダ式の中でasync書けることがポイント
    return Observable.Create<string>(async (observer, cancelToken) =>
    {
        try
        {
            var cursor = "-1";
            while (cursor != "0")
            {
                if (cancelToken.IsCancellationRequested) return; // cancelをチェック

                var url = string.Format("http://api.twitter.com/1/statuses/friends.json?screen_name={0}&cursor={1}",
                    screenName, cursor);
                using (var stream = await new WebClient().OpenReadTaskAsync(url)) // await!
                {
                    var json = DynamicJson.Parse(stream);
                    foreach (var item in json.users)
                    {
                        observer.OnNext(item.screen_name); // yield returnのかわりに
                    }
                    cursor = json.next_cursor_str;
                }
            }
        }
        catch (Exception ex)
        {
            observer.OnError(ex);
            return; // 例外発生時はOnErrorを呼んで終了
        }

        observer.OnCompleted(); // 例外発生もキャンセルもなく完了したなら、OnCompletedを呼ぶ
    });
}

static void Main()
{
    EnumerateFriends("neuecc")
        .Take(350) // 350件後にDisposeされてtokenがcancelになる
        .Subscribe(
            s => Console.WriteLine(s),
            e => Console.WriteLine("error:" + e),
            () => Console.WriteLine("完了")); // Takeのほうから呼び出されるので、cancel扱いになっても表示される

    Console.ReadLine();
}

Observable.Create(RangeやGenerateなどの生成子、WhereやSelectなどの演算子の全てが使っている、本当のプリミティブの生成子)を使って、生のobserverでOnNext, OnError, OnCompletedの3つを制御してやります。Createやtry-catchの分、ネストが深くなってしまっていますが、コード自体は同期的に、yield returnを使って書いていたものとほとんど変わってないのが分かると思います。yield returnの部分にOnNextを置いた、それだけでそのまま置き換えられています。

これならIObservable<T>でも十分に自動生成のサポートが効いていると言えなくもないですね。やってみて、結構満足できてしまった。パフォーマンス的にも、演算子をベタベタ組み合わせるのはあまり良くはならないので、こうしてasync/awaitと連携させて作れると、素直に書けるうえに、パフォーマンス向上も狙えるのが嬉しい。ただ、OnErrorやOnCompleted、キャンセル(Dispose)をどうするか。考慮する事項が多いので、ある程度分かっていないと大変かもしれません。全て考えておかないと、正しく動作しません。既存演算子の組み合わせだけで済ませられるなら、そういった考慮事項は演算子が受け持ってくれるので、考えなくて済むのですが……。どうしても演算子の組み合わせじゃうまく出来ない、逆に複雑になりすぎる、そういった時の奥の手、ぐらいに考えておくと良さそう。

ところでasyncはメソッドの宣言だけでなく、ラムダ式の部分でも宣言できてawaitすることが出てきてしまうんですよね、ならば、同じようなコンパイラ生成であるyield returnも、現状は外部メソッドでしか使えないわけですが、以下のようにインラインでも使えるようになってくれると嬉しいなって。思ってしまうのです。

// 妄想なので、現状はこれは出来ませんが!
var infinity = Enumerable.Create(()=>
{
    var num = 0;
    while(true)
    {
        yield return num++;
    }
});

そんなもの散々突っ込み受けたですって?Iterator Blocks Part Seven: Why no anonymous iterators? - Fabulous Adventures In Coding。ええ、知ってます。しかし、awaitなどで必要さの要請を受けて、コストとベネフィットが逆転するときが来た、と、思うのです。それに、VBでも、いや、なんでもない。

Expand

さて、ともかくAsync CTPは未来の話であり、現状手元にあるもので何とかする方法はないのだろうかというと、あります。ようするところ、再帰的に辿ってるわけですよね、cursorを。じゃあ、Expandです。ExpandはReactive Extensions v1.0安定版リリースでEnumerableバージョンのものを説明しましたが、Observableバージョンもあります。

// 補助メソッド、Async CTPにはOpenReadTaskAsyncとか、そういうのがデフォで用意されてますが、
// Rxにはないので、自前で用意しなきゃあいけないという、それだけの話です(ダウンロードしてストリングを返すだけのもの)
public static class WebRequestExtensions
{
    public static IObservable<string> DownloadStringAsync(this WebRequest request)
    {
        return Observable.Defer(() => Observable.FromAsyncPattern<WebResponse>(
                request.BeginGetResponse, request.EndGetResponse)())
            .Select(res =>
            {
                using (var stream = res.GetResponseStream())
                using (var sr = new StreamReader(stream))
                {
                    return sr.ReadToEnd();
                }
            });
    }
}

// ExpandはStable版にはまだ搭載されていないので、Experimental版を使ってください
static IObservable<string> EnumerateFriends(string screenName)
{
    Func<string, IObservable<dynamic>> downloadJson = cursor =>
    {
        var url = string.Format("http://api.twitter.com/1/statuses/friends.json?screen_name={0}&cursor={1}",
            screenName, cursor);
        return WebRequest.Create(url).DownloadStringAsync().Select(DynamicJson.Parse);
    };

    return downloadJson("-1")
        .Expand(d => (d.next_cursor_str == "0")
            ? Observable.Empty<dynamic>() // TakeWhileで判定すると最後の一つを取りこぼすので
            : downloadJson(d.next_cursor_str))
        .SelectMany(d => (dynamic[])d.users)
        .Select(d => (string)d.screen_name);
}

そこそこ直感的ではないでしょうか?最初 Expand().TakeWhile(next_cursor_str != "0") と書いたのですが、それだと最後のページを取りこぼしてしまうのに気づいて、Emptyを投げる方針に変更しました。その辺、境界については注意を払わなきゃですね。

そして、残念ながら、ExpandはRxのStable版にはまだない。ということはWP7にもないわけで。

再帰とRx

Stable版でもやりましょう。awaitなし、Expandなし。では、どうやって作りましょうか。うーん、再帰的というのなら、本当に再帰させてしまえばいいのではないか?

static IObservable<string> EnumerateFriends(string screenName)
{
    Func<string, IObservable<dynamic>> downloadJson = null; // 再帰するにはこーして最初にnull代入
    downloadJson = cursor =>
    {
        var url = string.Format("http://api.twitter.com/1/statuses/friends.json?screen_name={0}&cursor={1}",
            screenName, cursor);
        return WebRequest.Create(url)
            .DownloadStringAsync()
            .Select(DynamicJson.Parse) // ここまでExpandと共通
            .SelectMany(json =>
            {
                // Expandメソッドの中でやってることを大幅簡易化、ということです、つまるところ。
                var next = (json.next_cursor_str == "0")
                    ? Observable.Empty<dynamic>()
                    : downloadJson((string)json.next_cursor_str);
                return (IObservable<dynamic>)Observable.StartWith(next, json);
            });
    };

    return downloadJson("-1")
        .SelectMany(d => (dynamic[])d.users) // ここからもExpandと共通
        .Select(d => (string)d.screen_name);
}

これを再帰というには、あまり再帰してないのですが、まあ雰囲気雰囲気。Expandと大体共通です。つまるところ、Expandを自前で作る、のは結構大変なので、Expandのように汎用的ではなく、特化したものをその場で作る、といった程度の代物。そうすれば、少しは簡単に用意できます。

まとめ

同期でListに格納するだけなら簡単。遅延でやるのもyield returnのお陰で簡単。非同期で辿るのは難しい。awaitで複数の値をyield的に列挙するのは現状難しい。Rxとawaitの連携は大変素晴らしい。Expandは便利。なければないで何とかなる。でもやっぱ大変。

一見簡単なことが存外難しいってのはいくないですね。一見簡単なら、簡単なままでできないと。Expandも悪くはないんですけど、中々どうして慣れてないと分かりづらい。しかし、将来のC#には十分期待できそう。と、思いました、まる。あとRxとAsyncは全然仲良しなんですよ~、というところです。

Rxでのイベント変換まとめ - FromEvent vs FromEventPattern

Reactive Extensionsの機能の一つに.NETにおけるイベントをIObservable<T>に変換する、というものがあります。Bridging with Existing .NET Events。そして、そのためのメソッドがFromEventでした。ところが最近のRxでは二つ、FromEventとFromEventPatternが用意されています。この差異は何なのでしょうか?

結論としては、過去のRx(このサイトの古い記事や他のサイトの過去の記事などで触れられている)やWindows Phone 7でのFromEventはFromEventPatternに改名されました。後続にEventPatternという(object Sender, TEventArgs EventArgs)を持つ.NETのイベントの引数そのものを渡すものです。そして、空席になったFromEventに新しく追加されたFromEvent(紛らわしい!)は、EventArgsだけを送ります。それ以外の差異はありません。

つまるところFromEventは FromEventPattern.Select(e => e.EventArgs) ということになります。なら、それでいいぢゃん、何も混乱を生む(WP7のFromEventがFromEventPatternである、というのは致命的よねえ)ことはないよ、とは思うのですが、パフォーマンスの問題でしょうかね。確かに、Senderは必要なく使うのはEventArgsだけの場合が多い。それなのに、毎回EventPatternを生成していたり、Selectというメソッド呼び出しが入るのは無駄です。

そもそもインスタンスに対してFromEventで包むということは、クロージャでsenderは変数としていつでもどこでも使えてしまうのですよね、そもそも、そもそも。そういう意味でも送られてくるのはEventArgsだけでいいのであった。というわけで、基本的にはFromEventでいいと思います。

FromEventPatternについて

では、改めてFromEventPatternを復習します(WP7の人はFromEventで考えてください)。Observable.FromEventPattern(TEventArgs) Method (Object, String) (System.Reactive.Linq)にサンプルコードがあるのですけれどね。そうそう、MSDNのリファレンスには、一部のメソッド/一部のオーバーロードにはサンプルコードがあります。全部ではないのがミソです、見て回って発掘しましょう。まあ、というわけで、とりあえずそのFileSystemWatcherで。

// FileSystemWatcherは指定フォルダを監視して、変化があった場合にイベントを通知します
// 例えばCreatedイベントはファイルが作成されたらイベントが通知されます
var fsw = new FileSystemWatcher(@"C:\", "*.*") { EnableRaisingEvents = true };

// FromEventPatternその1、文字列でイベント名指定
Observable.FromEventPattern<FileSystemEventArgs>(fsw, "Created")
    .Subscribe(e => Console.WriteLine(e.EventArgs.FullPath));

// FromEventPatternその2、静的なイベントをイベント名指定(WP7にはない)
Observable.FromEventPattern<ConsoleCancelEventArgs>(typeof(Console), "CancelKeyPress")
    .Subscribe(e => Console.WriteLine(e.EventArgs.SpecialKey));

一番馴染み深いと思うのですが、文字列でイベント名を指定するものです。その2のほうはあまり見ないかもしれませんが、静的イベントに対しての指定も可能です。これら文字列指定によるメリットは、比較的シンプルであること。デメリットは、リフレクションを使うので若干遅い・スペルミスへの静的チェックが効かない・リファクタリングが効かない、といった、リフレクション系のデメリットそのものとなります。

リフレクションしかないの?というと、勿論そんなことはありません。

// FromEventPatternその3、EventHandlerに対する変換
var current = AppDomain.CurrentDomain;
Observable.FromEventPattern(h => current.ProcessExit += h, h => current.ProcessExit -= h)
    .Subscribe(e => Console.WriteLine(e.EventArgs));

// FromEventPatternその4、EventHandler<T>に対する変換
Observable.FromEventPattern<ContractFailedEventArgs>(
        h => Contract.ContractFailed += h, h => Contract.ContractFailed -= h)
    .Subscribe(e => Console.WriteLine(e.EventArgs.Message));

// FromEventPatternその5、独自イベントハンドラに対する変換
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
        h => new FileSystemEventHandler(h),
        h => fsw.Created += h,
        h => fsw.Created -= h)
    .Subscribe(e => Console.WriteLine(e.EventArgs.FullPath));

イベントの登録と削除を行うためのラムダ式を渡してやります。その3とその4は比較的分かりやすいのではないでしょうか。その5の第一引数が謎いのですが、これはconversionです。C#の型システムの都合上、そのまんまだと独自イベントハンドラを処理出来ないので、型を変換してやる必要があるという定型句。

数あるFromEventPatternのオーバーロードの中で、一番多く使うのはその5だと思います。何故なら、C#のイベントは独自イベントハンドラになっていることが多いから。はっきしいって、最低です。EventHandler<T>を使ってくれてさえいれば、こんな苦労はしなくて済むというのに。独自イベントハンドラは100害あって一利なし。え、WPFとか.NET標準がイベントハンドラは独自のものを使ってる?それは、WPFが悪い、.NET設計の黒歴史、悪しき伝統。

それと、もはや独自デリゲートも最低です。FuncやActionを使いましょう。C#のデリゲートはメソッドの引数や戻り値が一致していようが、型が違ったら別のものとして扱われます。そのことによる不都合は、↑で見たように、あるんです。極力ジェネリックデリゲートを使いましょう。そうすれば、こんな腐った目に合わなくても済みます。

ところで、その5は、もう少しだけ記述が短くなります。

// FromEventPatternその5、第一引数別解、こう書くと短くて素敵
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
        h => h.Invoke,
        h => fsw.Created += h,
        h => fsw.Created -= h)
    .Subscribe(e => Console.WriteLine(e.EventArgs.FullPath));

h.Invoke。というのは、割とhそのものなわけですが、しかしInvokeと書くことで型が変換されます。この辺はコンパイラの都合上のマジックというか何というか。そういうものだと思えばいいのではかと。その5のスタイルで書くときは、この書き方をすると良いと思います。で、まだオーバーロードがあって

// その6
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
        h => fsw.Created += h, h => fsw.Created -= h)
    .Subscribe(e => Console.WriteLine(e.EventArgs.FullPath));

conversionが不要で書けたりもします。一見素晴らしい、のですが、これ、中でなにやってるかというとconversionに相当するものをリフレクションで生成してるだけだったりして。そのため、なるべくconversionを使うオーバーロードのほうを使ったほうがよいでしょう。h => h.Invokeを書くだけですしね。このオーバーロードは紛らわしいだけで存在意義が不明すぎる。

FromEventについて

と、長々と見てきましたが、ではFromEventのほうも。

// FromEvent
Observable.FromEvent<FileSystemEventHandler, FileSystemEventArgs>(
        h => (sender, e) => h(e),
        h => fsw.Created += h,
        h => fsw.Created -= h)
    .Subscribe(e => Console.WriteLine(e.FullPath));

// FromEventPatternその5(比較用)
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
        h => (sender, e) => h(sender, e),
        h => fsw.Created += h,
        h => fsw.Created -= h)
    .Select(e => e.EventArgs)
    .Subscribe(e => Console.WriteLine(e.FullPath));

というわけで、FromEventPatternのその5に近いわけですが、conversionでEventArgsしか渡していない、という点が差異ですね。なので、後続にはsenderが伝わってこず、EventArgsしか通りません。まあ、senderは、↑の例ですとfswでどこでも使えるので、そもそも不要なわけで、これで良いかと思います。

ところでFromEventも色々なオーバーロードがあるにはあるんですが、私の頭では存在意義が理解できなかったので無視します。挙動とかは理解したんですが、なんというか、存在する必要性、有効な利用法がさっぱり分からなかったのです……。まあ、多分、あんま意味ないと思うので気にしないでもいいかと。

拡張メソッドに退避させよう

FromEventにせよFromEventPatternにせよ、長いです。長い上に定型句です。なので、拡張メソッドに退避させると、スッキリします。例えば、今まで見てきたFileSystemWatcherだったら

// .NETのFromEventなら IObservable<TEventArgs>
// .NETのFromEventPatternなら IObservable<EventPattern<TEventArgs>>
// WP7のFromEventなら IObservable<IEvent<TEventArgs>>
// を返す拡張メソッド群を用意する。
// 命名規則はイベント名AsObservableがIntelliSenseの順序的にお薦め
public static class FileSystemWatcherExtensions
{
    public static IObservable<FileSystemEventArgs> CreatedAsObservable(this FileSystemWatcher watcher)
    {
        return Observable.FromEvent<FileSystemEventHandler, FileSystemEventArgs>(
            h => (sender, e) => h(e), h => watcher.Created += h, h => watcher.Created -= h);
    }

    public static IObservable<FileSystemEventArgs> DeletedAsObservable(this FileSystemWatcher watcher)
    {
        return Observable.FromEvent<FileSystemEventHandler, FileSystemEventArgs>(
            h => (sender, e) => h(e), h => watcher.Deleted += h, h => watcher.Deleted -= h);
    }

    public static IObservable<RenamedEventArgs> RenamedAsObservable(this FileSystemWatcher watcher)
    {
        return Observable.FromEvent<RenamedEventHandler, RenamedEventArgs>(
            h => (sender, e) => h(e), h => watcher.Renamed += h, h => watcher.Renamed -= h);
    }

    public static IObservable<FileSystemEventArgs> ChangedAsObservable(this FileSystemWatcher watcher)
    {
        return Observable.FromEvent<FileSystemEventHandler, FileSystemEventArgs>(
            h => (sender, e) => h(e), h => watcher.Changed += h, h => watcher.Changed -= h);
    }
}
var fsw = new FileSystemWatcher(@"C:\", "*.*") { EnableRaisingEvents = true };

// 例えば、ただ変更をロギングしたいだけなんだよ、という場合の結合
// FromEventを外出ししていることによって、すっきり書ける
Observable.Merge(
        fsw.CreatedAsObservable(),
        fsw.DeletedAsObservable(),
        fsw.ChangedAsObservable(),
        fsw.RenamedAsObservable())
    .Subscribe(e => Console.WriteLine(e.ChangeType + ":" + e.Name));

といった形です。また、普通に+-でのイベント以外のものへの登録も可能です。例えば

// LayoutRootはWPFの一番外枠の<Grid Name="LayoutRoot">ということで。
Observable.FromEvent<MouseButtonEventHandler, MouseButtonEventArgs>(
        h => (sender, e) => h(e),
        h => LayoutRoot.AddHandler(UIElement.MouseDownEvent, h),
        h => LayoutRoot.RemoveHandler(UIElement.MouseDownEvent, h))
    .Subscribe(e => Debug.WriteLine(e.ClickCount));

こんな形のものもObservable化が可能です。

イベントの解除

Subscribeの戻り値はIDisposableで、Disposeを呼ぶことでイベントが解除されます。

// アタッチ
var events = Observable.Merge(
        fsw.CreatedAsObservable(),
        fsw.DeletedAsObservable(),
        fsw.ChangedAsObservable(),
        fsw.RenamedAsObservable())
    .Subscribe(e => Console.WriteLine(e.ChangeType + ":" + e.Name));

// デタッチ(合成などをしていて、元ソースが複数ある場合も、すべて解除されます)
events.Dispose();

Rxのこの仕組みは、従来に比べて圧倒的にイベントの解除がやりやすくなっていると思います。

まとめ

非同期の説明ばかりしてきていて、イベントはすっかり置き去りだったことを、まずはゴメンナサイ。少し前からFromEvent周りは大きな仕様変更が入ったわけですが、ようやくまともに解説できました。基本中のキの部分であるここが、過去のリソースがそのまま適用出来ないという最悪の自体に陥っていたので、とりあえずこれで何とか、でしょうかどうでしょうか。

小さなこととはいえ、WP7との互換性が絶えているのが痛いのですが、その辺どうにかならなかったのかねー、とは思います。けれど、このEventArgsだけ送るFromEvent自体は良いと思います。 .Select(e => e.EventArgs) が定型句だったので、こういった変更は喜ばしい限り。それと、今まで思っていた、ぶっちゃけラムダ式とかRxでイベント登録するならsenderって不要じゃね?に対する答え(その通りで、完全に不要)を出してくれたのが嬉しい。

さて、変換できるのはいいけれど、じゃあどこで使うのがいいの?という話がいつもありません。次回は、時間周りと絡めて、その辺のお話が出来ればと思いますが、いつも次回予告が達成されたことはないので、別のことを書くでしょう←ダメぢゃん。

Reactive Extensions v1.0安定版リリース

Reactive Extensions v1.0 Stable and v1.1 Experimental available now! ということで、今までも安定版だの正式だの何なり言っていましたが、今回こそ、本当に本当に正式リリース、v1.0だそうです。整理されたドキュメント、多くのチュートリアルビデオ、大幅なパフォーマンス改善、そして、よくテストされた(かどうかは不明)安定版としてのライブラリ本体。全てが整いました。さあ、使いましょう!実際のプロダクトに!

また、NuGetではRx-でStable版を、Rx_experimental-で実験版を、他にIx_experimental-でIxをインストールすることが可能です。

Ix復活

Ix(Interactive Extensions、Reactiveの反対ということでEnumerableEx、Linq to Objectsを拡張する拡張メソッド群)が復活しました。復活前に、今後についての意見を募集していたのですが、素敵なことに私の意見が全部反映されていました!XxxEnumerableはIntelliSenseの邪魔だから廃止してね、とかForEachにはindex付きのオーバーロード入れてよ、とかTreeを走査するメソッド入れてよ、とか。これは嬉しい。いやあ、やっぱり言っておくものですね。

そんなわけで、もうForEachを自作する必要はありません:)

標準クエリ演算子で何が「出来ない」のかを知っておくことは大切です。標準では複数回列挙なしで前後の値を使うことは出来ないし、再帰的に辿るような、複数段のSelectManyも出来ない。MaxByなどキーを使った最大値の取得もない。

Ixは、そこを補完してくれます。

Expand

Ixの中から、Expandを紹介します。なお、Rx(Observable)にはExperimental Releaseのほうにはあるのですが、まだStableには入っていません。

これは何かというと幅優先探索でツリーを解きほぐします。イミワカリマセンネ。ええと、ツリー状のオブジェクトを辿る場合は、通常は再帰を使って書くと思います。でも、そうして再帰で辿るのって、各枝の値が欲しいわけなんですよね?もしそうなら、ツリーは一直線上に出来る。IEnumerable<T>に出来る。LINQが適用できる。

ええ。熟練した C# 使いは再帰を書かないのです。で、ツリーです。ツリーを辿るのは頻出の再帰構造でパターン化できて、うんたらかんたら。ああ、もう!与えられた木から,子→親への対応を作る,を C# で - NyaRuRuの日記を見るといいです、それで全部解決です!

ExpandはBreadthFirstにあたります。DepthFirstはないの?というと、今のところないですねー。ないものはないです、しょうがない。それはさておき、このツリー的なものを辿るのというのは割とあるシチュエーションで、そしてExpandというメソッドは実に強力なので、是非使い方をマスターして欲しい。ので、例を出します。例えばWinFormsのコントロール。

Panelの下にButtonや、子Panelが並んでいます。さて、この中から全てのButtonを取り出したいのですが、どうしましょうか?予め配列にButtonを持っておく、のもまあ答えですが、ルート階層から辿るようにしましょう。Formに並ぶコントロールは、階層に別れたツリー構造をしています。Expandを使ってみましょう。

Expandはセレクターの結果がEmptyになるまで、辿り続けます。辿る順番は階層順(幅優先)。今回はContorolsを辿って全てのControlを列挙、Buttonが欲しいのでOfTypeでフィルタリング。というわけです。WPFやSilverlightでも、同様に辿ることが出来ます(ちょっとWPFのコントロール階層が面倒くさくて、コードがゴチャゴチャするので、今回はWinFormsを例とさせていただきました)

こういった走査メソッドはlinq.jsにもあります(CascadeBreadthFirst、メソッド名のとおり、NyaRuRuさんの作成されたAchiralから大きな影響を受けています)。JavaScriptの場合、ツリーの代表的なものはDOMです、というわけで勿論DOMに適用できるのですが、DOMの列挙はjQueryでやったほうがいいですよー、なので、JSONやオブジェクト(これもまたツリーになっている)の走査に使うのがいいかもしれません。何にせよ、使いどころというのは存外あるものです。

気になった点

そんな素敵なIxですが、触っていて幾つか気になった点があったので、またForumに投げておきました。リクエストが反映されたことで、調子に乗って味を占めているのかもしれません。ではなくて、フィードバックは積極的に出してあげたほうがいいでしょう常識的に考えて。英語?全部機械翻訳です、はは、まあ、コードがあれば伝わるはず。伝わりました。返答が15分で来た。

まず、Scanの挙動。seedなしの場合に、Rxは最初の値も列挙しますが、Ixは最初の値をスルーするという、挙動の違い。で、これ、最初の値がスルーされるのは都合が悪いので(スルーしたきゃあSkip(1)すればいい)、修正かなあ、と思います。返答では、この点については何も言ってませんでしたが。

Scanにはもう一つ、seed有りの際に、Rx, Ixともにseedをスルーしますが、F#などはseedも列挙します。これは以前はScan0というseed含めて列挙する別のメソッドがあった(WP7版にはある)のですが、今はScan0は廃止されたので、それならScanの挙動をseed込みでの列挙に変更すべき。と、思ったのですが、返答はStartWith(seed)を使えばいいとのこと。それでも確かにいいのですが、基本seed有りにしてseed飛ばしたい時はSkip(1)のほうが使いやすいと思うのだけど。まあ、これはWP7版との互換性の問題もありますし、そもそもRxはStableと言った以上、もう挙動は変えられないので、しょうがないところかもしれません。

他にはRepeat拡張メソッド(無限リピートする)のソースがEmptyの場合の挙動。例えば Range(1,1).Repeat().Take(3) と1,1,1になるわけですが、 Empty().Repeat().Take(3) の結果はどうでしょう?答えは、無限ループを彷徨って止まらなくなります。Emptyに対するRepeatをどう解釈するか、は正直微妙なところですけれど、元ソースが空だと死ぬというのは、結構リスキーなのではないかな、と考えてしまうのです。また、Takeをつけるのは止まることを期待するという点もあるわけですが、この場合はTakeをつけようが何しようが無駄、というのも怖い。Rxチームからの回答は、この挙動は仕様とのことでした。

最後にちょっとリクエストしてみた。今回のIxではDistinctにオーバーロードが足されています。通常だとIEqualityComparer<T>というダルいものをクラス定義して(ここが最悪!)渡さなければいけないのですが、ラムダ式でキーを指定するだけで済みます。言うならばDistinctByといったところ。これは、実に大変有益です。このオーバーロードは、AnonymousComparerという私が以前に作ったものにも載っているのですが(ちなみに拡張メソッドがかち合ってしまうため、Ixと同時使用は不可能になってしまった!まあ、.csファイル一個のライブラリなので、かち合う部分はコメントアウトすればいいのですが)、大変重宝しています。しょっちゅう使ってます。特にExceptとかで多用しているんですが……、今回IxではDistinctにしかオーバーロード足されていません。他の集合系メソッドであるIntersect, Except, Unionでも使えると便利なのになー、って思ってしまうのです。なのでリクエストしておきました。回答は、考えておく、とのことなのでもしかしたら次のバージョンでは乗っかっているかもしれません。

日本語だと言えるけれど、バッサリ切って機械翻訳した英語だと伝えたいニュアンスは吹っ飛んでしまうなあ、ううみぅ。必ずしもコードがあれば伝えられる、わけでもないか、当たり前だけど。

QueryableEx(笑)

ネタ。NuGetではIx_Experimental-Providerで入るんですが、まあ、ネタ。中身はEnumerableExのIQueryable版です。何がネタなのかというと、IQueryableはそれだけでは何の意味もなくて、解釈するエンジン(Linq to SqlなりLinq to Twitterなり)が大事なわけです。そうでなければ、not supportedをぶん投げるだけです。さて、そして、標準クエリ演算子ですらnot supported率が少なくないのに、QueryableExに対応するクエリプロバイダ……。ありえない、です。

そんなわけで、使う機会はないでしょう。んまあ、気合でQueryableExすらもフルサポートするクエリプロバイダを自作すれば、活用出来ますが、やはりそんな機会はないでしょう。

RxJSは?

ドキュメント書いたりとか、QueryableEx作ったり(笑)とか、色々忙しかったのでしょふ。今回「も」全く音沙汰なしですが、次こそは更新されるんじゃないですかねー、分かりませんけど。jQueryにはDeferred乗りましたが、アレは正直かなり使いづらいのでその点でも私はRxJSにしたいなあ。それと、MS支援でのNode.jsのWindows対応も発表されましたし、JavaScriptのAsyncを何とかするためのRx、はかなり価値があると思うので、もう少し頑張って欲しいな、と思います。

まとめ

しっかりとMSDN入りしている、ドキュメントもある、正式にv1 Stableと告知されている、など、もう採用できない理由はなくなりました。日本語リソースはないですが、それは気合で乗り切ればいいぢゃない(とか言ってるうちはダメなのでしょうがー)。

Jesse Liberty(オライリーから出ているプログラミングC#の著者)による解説本も今秋に出るようだし、確実に、順調にメインストリームに乗るテクノロジとしての道を歩んでいますので、安心して追えるのではないかと思います。

ReactiveOAuth ver.0.4 - Twitpic(OAuth Echo)対応

ver.0.4になりました。少し前に0.3.0.1をこっそり出していたので、それを含めて0.3からの差分は、「対象Rxのバージョンが現在最新の1.0.10605(Stable)」に、というのと「Realmが含まれていると認証が正しく生成出来なかったバグの修正」と、「TwitpicClientサンプルの追加」になります。バグのほうは本当にすみません……。Twitterでしかテストしてない&TwitterはRealm使わないため、全然気づいていなくて。ダメですねホント。

OAuth Echo

TwitpicはOAuth Echoという仕組みでTwitterと連携した認証をして、画像を投稿できます。詳しくはUsing OAuth Echo | dev.twitter.comTwitPic Developers - API Documentation - API v2 » uploadにありますが、よくわかりませんね!Twitpicに画像を投稿、というわけでTwitpicのAPIにアクセスするわけですが、その際のヘッダにTwitterに認証するためのOAuthのヘッダを付けておくと、Twitpic側がTwitterに問い合せて認証を行う。という仕組みです、大雑把に言って。

ただのOAuthとはちょっと違うので、今までのReactiveOAuthのOAuthClientクラスは使えない。けれど、認証用ヘッダの生成は同じように作る。というわけで、ここはReactiveOAuthにひっそり用意されているOAuthBaseクラスを継承して、Twitpic専用のTwitpicClientクラスを作りましょう。

が、作るのもまた少し面倒なので Sample/TwitpicClient/TwitpicClient.cs に作成したのを置いておきました。ファイルごとコピペってご自由にお使いください。.NET 4 Client Profile, Silverlight 4, Windows Phone 7の全てに対応しています。

Windows Phone 7でのカメラ撮影+投稿のサンプル

TwitpicClient.cs の解説は後でやりますが、その前に利用例を。WP7でカメラ撮影+投稿をしてみます。CameraCaptureTaskの利用法に関しては CameraCaptureTaskを使ってカメラで静止画撮影を行う – CH3COOH(酢酸)の実験室 を参考にさせて頂きました。TwitterのAccessTokenの取得に関しては、ここでは解説しませんので neue cc - ReactiveOAuth - Windows Phone 7対応のOAuthライブラリ を参照ください。

// CameraCaptureTaskのCompletedイベント
void camera_Completed(object sender, PhotoResult e)
{
    if (e.TaskResult == TaskResult.OK)
    {
        // 撮影画像(Stream)をバイト配列に格納
        var stream = e.ChosenPhoto;
        var buffer = new byte[stream.Length];
        stream.Read(buffer, 0, buffer.Length);

        // key, secret, tokenは別に設定・取得しておいてね
        new TwitpicClient(ConsumerKey, ConsumerSecret, accessToken)
            .UploadPicture(e.OriginalFileName, "from WP7!", buffer)
            .ObserveOnDispatcher()
            .Catch((WebException ex) =>
            {
                MessageBox.Show(new StreamReader(ex.Response.GetResponseStream()).ReadToEnd());
                return Observable.Empty<string>();
            })
            .Subscribe(s => MessageBox.Show(s), ex => MessageBox.Show(ex.ToString()));
    }
}

new TwitpicClient(キー, シークレット, アクセストークン).UploadPicture(ファイル名, メッセージ, 画像) といった風に使います。戻り値はIObservable<string>で結果(投稿後のURLとか)が返ってくるので、あとは好きなように。投稿に失敗した場合は、WebExceptionが投げられるので、それを捉えてエラーメッセージを読み取ると開発には楽になれそうです。

TwitpicClient.cs

以下ソース。Sample/TwitpicClient/TwitpicClient.cs と同じですが、自由にコピペって使ってください。大事なことなので2回言いました。このコード自体はTwitpicに特化してありますが、認証部分のヘッダを少しと画像アップロードを変更する部分を弄れば、他のOAuth Echoサービスにも対応させることができると思います。

using System;
using System.Linq;
using System.Text;
using System.Net;
using System.IO;

#if WINDOWS_PHONE
using Microsoft.Phone.Reactive;
#else
using System.Reactive.Linq;
#endif

namespace Codeplex.OAuth
{
    public class TwitpicClient : OAuthBase
    {
        const string ApiKey = ""; // set your apikey

        readonly AccessToken accessToken;

        public TwitpicClient(string consumerKey, string consumerSecret, AccessToken accessToken)
            : base(consumerKey, consumerSecret)
        {
            this.accessToken = accessToken;
        }

        private WebRequest CreateRequest(string url)
        {
            const string ServiceProvider = "https://api.twitter.com/1/account/verify_credentials.json";
            const string Realm = "http://api.twitter.com/";

            var req = WebRequest.Create(url);

            // generate oauth signature and parameters
            var parameters = ConstructBasicParameters(ServiceProvider, MethodType.Get, accessToken);
            // make auth header string
            var authHeader = BuildAuthorizationHeader(new[] { new Parameter("Realm", Realm) }.Concat(parameters));

            // set authenticate headers
            req.Headers["X-Verify-Credentials-Authorization"] = authHeader;
            req.Headers["X-Auth-Service-Provider"] = ServiceProvider;

            return req;
        }

        public IObservable<string> UploadPicture(string filename, string message, byte[] file)
        {
            var req = CreateRequest("http://api.twitpic.com/2/upload.xml"); // choose xml or json
            req.Method = "POST";

            var boundaryKey = Guid.NewGuid().ToString();
            var boundary = "--" + boundaryKey;
            req.ContentType = "multipart/form-data; boundary=" + boundaryKey;

            return Observable.Defer(() =>
                    Observable.FromAsyncPattern<Stream>(req.BeginGetRequestStream, req.EndGetRequestStream)())
                .Do(stream =>
                {
                    using (stream)
                    using (var sw = new StreamWriter(stream, new UTF8Encoding(false)))
                    {
                        sw.WriteLine(boundary);
                        sw.WriteLine("Content-Disposition: form-data; name=\"key\"");
                        sw.WriteLine();
                        sw.WriteLine(ApiKey);

                        sw.WriteLine(boundary);
                        sw.WriteLine("Content-Disposition: form-data; name=\"message\"");
                        sw.WriteLine();
                        sw.WriteLine(message);

                        sw.WriteLine(boundary);
                        sw.WriteLine("Content-Disposition: form-data; name=\"media\"; filename=\"" + filename + "\"");
                        sw.WriteLine("Content-Type: application/octet-stream");
                        sw.WriteLine("Content-Transfer-Encoding: binary");
                        sw.WriteLine();
                        sw.Flush();

                        stream.Write(file, 0, file.Length);
                        stream.Flush();

                        sw.WriteLine();
                        sw.WriteLine("--" + boundaryKey + "--");
                        sw.Flush();
                    }
                })
                .SelectMany(_ => Observable.FromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse)())
                .Select(res =>
                {
                    using (res)
                    using (var stream = res.GetResponseStream())
                    using (var sr = new StreamReader(stream, Encoding.UTF8))
                    {
                        return sr.ReadToEnd();
                    }
                });
        }
    }
}

認証ヘッダ作成はConstructBasicParametersとBuildAuthorizationHeaderというprotectedメソッドで行います。わけわかんないよね…気持ち悪いよね…。使いにくいメソッドです、すみません、私もそう思います。そういうものだと思って、見ないふりしてもらえれば幸いです。

コードの大半を占めているのは画像を投稿するためのmultipart/form-dataのもので、これはもうOAuth Echo関係ない話、で、面倒ぃ。特にWP7での非同期だと涙が出る。POSTはBeginGetRequestStreamとBeginGetResponseの二つの非同期メソッドをセットで使う必要があるため、コードがごちゃごちゃするのです。

しかしReactive Extensionsを使えばあら不思議!でもないですが、ネストがなくなって完全に平らなので、結構普通に読めるのではないでしょうか?(ストリーム書き込みのコード量が多いのは、これは同期でやっても同じ話なので)。例外処理も利用例のところで見たように、Catchメソッドをくっつけるだけ。実に色々とスッキリします。

Rxがあれば非同期POSTも怖くない。

やっていることは単純で、FromAsyncPatternでBegin-Endを変換。StreamへのWriteは後続への射影はなく、対象(Stream)に対しての副作用(書き込み)のみなのでDo、RequestStream->Responseへの切り替えはSelectMany、Responseから結果のStringへの変換はSelect、と、お決まりの定型メソッドに置き換えていっただけです。この辺はパターンみたいなものなので、これやるにはこのメソッドね、というのを覚えてしまえばそれでお終いです。

Stream読み書きは非同期にしないの?

StreamにもBeginReadとかBeginWriteとかありますものね。しかし、しません(キリッ。理由は死ぬほど面倒だからです。やってみると分かりますが想像以上に大変で、おまけに何とか実現するためにはRxでのチェーンを大量に重ねる必要がありオーバーヘッドがバカにならない……。なので、わざわざやるメリットも全くありません。

一応、ReactiveOAuthのOAuthClientは、そこも非同期でやってますが、わざわざ頑張った意味があったかは、かなり微妙なところ。実装は Internal/AsynchronousExtensions.cs にあるので参照ください。それと、この AsynchronousExtensions.cs はReactive Extensionsで非同期処理を簡単にで言った「拡張メソッドのすゝめ」を実践したものでもあります。WebRequestはプリミティブすぎて扱い難いので、Rxに特化したうえで簡単に扱えるようにDownloadStringやUploadValueなどといったメソッドを拡張してあります。便利だと思いますので、こちらも TwitpicClient.cs と同様に、ファイルごと自由にコピペって使ってやってください。

まとめ

ReactiveOAuthを公開する目的に、「これが入り口になってRxの世界を知ってもらえると嬉しい」というのもあったのですが、WP7開発で利用してもらったりと、その目的は少しは達成出来たかもで、良かった良かった。ちょっと練りたりなかったり、未だにバグがあったり(本当にごめんなさい!)と至らない点も多いですが、今後も改善していきますのでよろしくお願いします。

MSDNの上のReactive Extensions

DevLabsを卒業し、晴れて正式にData Developer Center入りしたReactive Extensionsですが、徐々に正式リリースへ向かう準備として、ついにドキュメントがMSDN入りを果たしました。

まだPre-releaseということで工事中の部分が多いですが、これはドキドキしますね。Getting Started with Rxはどんなところで使えるかの説明とダウンロード先について、Using Rxは詳細なドキュメント(まだ工事中項目も幾つか、でもかなり充実している感)、Reactive Extensions Class Libraryはリファレンス。必要十分には揃っていますね、あとは大きめのサンプルも欲しいところだけど、追加されるかな?

ところで、ツリー階層を見てください。

.NET Development直下です。何て素晴らしい位置にあるのか!Entity FrameworkやSilverlight などと同列に並べられているのを見ると、ついに始まったな、と感無量です。

その他のRx

Rx本体が順風満帆なのに比べると、RxJSとIx(EnumerableEx)はどうしちゃったのでしょう……。まず、RxJSは、やる気は一応あるみたいです。ダウンロードも可能ですしね。APIは随分更新されていなくて、.NET版Rxとかなり差が開いてしまいましたが(バグも幾つか見つかっているのですが)。今はRxチーム自体がドキュメント周りで忙しいということで手を付けてられないっぽく、ゆっくり待つしかないです。プロジェクト自体は死んでいないそうなので、そこだけは安心してもいいかと。(私は内心不安ですが!)

Ixのほうは、一旦死亡です。ダウンロードセンターはもとより、NuGetからも、もう手に入らなくなりました。が、しかし、完全終了ではなく、一旦引っ込めて練りなおして、Rxとは別枠となるかもですが提供する意思はあるそうで。「Don't worry. Ix is not disappearing forever.」とのこと。そこで、フィードバックが欲しいとのことなので、答えてあげるといいんじゃないかしら。Asking your input - Interactive Extensions functionality。私も好きなメソッドと嫌いなもの(XxxEnumerableがIntelliSenseを汚染して嫌いだった)、それとドサクサに紛れて再帰/ツリー探索系のメソッドを入れてよ!とリクエストしておきました。

NuGet

6月頭にバージョンが少し上がって、現在はStableがv1.0.10605、Experimentalがv1.1.10605が最新バージョンとなっています。そしてNuGetのパッケージ名も少し変化して、Stable版はRx-MainなどプリフィックスがRx-のもの、ExperimentalはRx_Experimental-MainなどプリフィックスがRx_Experimental-と完全に別れて提供されるようになりました。

と、まあ

そんなところです。最近は私もこうして、更新されたよー、と言うだけで、技術的な情報提供を怠っていてすみませんすみませんなので、徐々に再開していきたいと思います。Schedulerの話とか非常に重要なのに、このブログでは一度も書いていないですしね、全くもってイクない。こないだのセッションReactive Extensionsで非同期処理を簡単には非同期中心だったので、イベントサイドについての話もしっかりまとめたいなあ、とかも思いつつ。

Prev | | Next

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive