Deep Dive AsEnumerable

AsEnumerable、といったらLINQのAsEnumerableです。その挙動は、IEnumerable<T>へと型変換をします。それだけ、なので実に影が薄いのですが、それでいて奥深く使いこなしが求められる、はずなのですが陰が薄いので無視されている感がなきにしもあらずなので、しっかりと紹介したいと思います。

AsEnumerableの実装

実装は非常に単純明快で、中身ほとんど空っぽです。

public static IEnumerable<T> AsEnumerable<T>(this IEnumerable<T> source)
{
    return source;
}

ようするにアップキャストです。どういう時に使えばいいかというと、例えば可変長引数とIEnumerable<T>の両方を受けたいオーバーロードを作る場合。

public void Show(params string[] values)
{
    Show(values.AsEnumerable());
}

public void Show(IEnumerable<string> values)
{
    foreach (var item in values)
    {
        Console.WriteLine(item);
    }
}

foreachでグルグル値を取り出すだけなので、IEnumerable<T>で受けるようにしたい。でも利便性のため可変長引数も用意しておきたい。という場合はよくあります。なので毎回このオーバーロードを用意するんですが、その時に、こうしてAsEnumerableを使います。なお、AsEnumerableを忘れると無限に再帰してStackOverflowしてしまいます……。

AsEnumerableがラップするのではなく、ただのアップキャストにすぎないということは重要です。以前にLinqとCountの効率でも書きましたが、LINQの一部のメソッドはIList<T>であったりICollection<T>であるとき、asやisを使って最適化を図ります。foreachするだけだとあまり関係ないですが、受け取ったIEnumerable<T>を使ってLINQで処理する場合だと、このことが効いてきます。

ならば常にアップキャストでよくて、ラップなど必要ないのではないか?というと必ずしもそうではありません。アップキャストは、ダウンキャストを可能にします。

// アップキャストされている状態というのは
var numbers = new List<int>().AsEnumerable();

// ダウンキャストが可能にするということ、そして、ダウンキャストは危険
var list = (List<int>)numbers;

ダウンキャストが危険だ、やるな、というのなら、そもそもアップキャストをすべきではない。抽象で受けることこそがオブジェクト指向だとか、形だけのパターンにはまってるとそうなる。原則は比較的シンプルで。メソッドのシグネチャにおいて、引数の型は最大に受け入れるため出来る限り抽象で、戻り値の型は最大に利用出来るようにするため具象にすればいい。ローカル変数に関しては、原則varでよし。どうしても必要ならば、ローカル変数側、つまりメソッドの利用側が安全なアップキャストで適宜、抽象で受ければよいでしょう。

ダウンキャストはダメ基本的に。そして、ダウンキャストは可能な状態にすること自体がダメなので、アップキャストも最小限に。というのがメソッド定義の基本だと思っていますが、プロパティだと少し事情は変わってくるかも。一々ラップすることのパフォーマンスロスや手間を考えると、しかたがなくアップキャストで提供するのも、ありかなー、とは。

Hide

そんなわけで、具象型を消去して、完全にラップしてIEnumerable<T>として提供したいという場合もあるかと思います。そこで、Ix(Interactive Extensions、Reactive Extensionsのオマケで提供されているEnumerableの拡張メソッド群、NuGetのIx_Experimental-Mainで入れるのが手っ取り早い。Experimentalのとおり、まだ実験的な代物で保証されていないことは注意)にはHideというものがあります。これも実装は単純明快で

public static IEnumerable<T> Hide<T>(this IEnumerable<T> source)
{
    foreach (var item in source)
    {
        yield return item;
    }
}

といった形。Hideというメソッド名は具体的な型を隠す、といった意味合いで付けられているのでしょうね。

Rx(AsObservable)の場合

Enumerableと関連性の深いObservable、Rxにも同様に型変換をするAsObservableというメソッドが用意されています。主に使うシチュエーションは、Subjectの隠蔽をするときでしょうか。

// 5秒後に非同期で値を返すというだけのもの
public static IObservable<T> SendValueAfter5Seconds<T>(T value)
{
    var asyncSubject = new AsyncSubject<T>();

    ThreadPool.QueueUserWorkItem(_ =>
    {
        Thread.Sleep(TimeSpan.FromSeconds(5)); // とりまsleep

        asyncSubject.OnNext(value); // AsyncSubjectのキャッシュへ値送信
        asyncSubject.OnCompleted(); // 非同期処理完了の合図(ここでObserverに値が送られる)
    });

    return asyncSubject.AsObservable();
}

このコード自体には何の意味もありません、非同期処理を模して、スレッドプールで5秒待って値を送る、というだけです。大事なのはAsyncSubjectをAsObservableして返していること。このAsObservableはただのアップキャストではなく、新しい型でラップして具象型(AsyncSubject)を隠しています。つまり、AsEnumerableではなくHideに等しい挙動です。ここで、もしAsObservableを書いていないと

// return時にAsObservableが書かれていないとダウンキャスト可能になる
var subject = (AsyncSubject<int>)SendValueAfter5Seconds(100);

subject.Subscribe(Console.WriteLine);

// なので、外側から発火可能になってしまう、これは最悪
subject.OnNext(-1);
subject.OnCompleted(); 

Subject(標準だと4種類ある)は、Rxにおけるイベントの表現です。C#でのイベントは、内部からは発火可能、外側からは購読しかできない。というようになっていると思います。その挙動にするために、また、純粋に安全性のために、Subjectを購読させるために外側に出す場合は、AsObservableでラップして型を消さなければなりません。

※極初期(RxがReactive Frameworkと言われていた頃なぐらいに前)は、このAsObservableはHideというメソッド名でした。AsObservableのほうが分かりやすくて良いとは思いますが、Enumerableでの挙動と合わせるなら、キャストするだけのAsObservableとHideに分けるべきだったのでは?と思わなくは全くないです←Rxにおいてはただのキャストしただけのものは使う機会ないと思うので、現在の形で正解

IQueryableにおけるAsEnumerableの重要性

Enumerable、Observableと来たので、QueryableでのAsEnumerableも見てみましょう。QueryableにおけるAsEnumerableは、クエリ構築の終了です。IQueryableでのクエリ構築をそこで打ち切るというスイッチです。どういうことか、というと

// とあるContextによるQueryableはSkipWhileとCountをサポートしていなかったとします
var count = toaru.createContext() // IQueryeable<T>とする
    .Where(x => x % 2 == 0)
    .SkipWhile(x => x < 100)
    .Count(); // 未サポートなのでExceptionが来る!

// そういう場合、ToListするといい、というアドバイスがよく上がります
var count = toaru.createContext()
    .Where(x => x % 2 == 0)
    .ToList() // ここまでの式でクエリ生成+List化
    .SkipWhile(x => x < 100) // ここからはIEnumerable<T>
    .Count();

// でも、それならAsEnumerableでいいんだよ?
var count = toaru.createContext()
    .Where(x => x % 2 == 0)
    .AsEnumerable() // 後続がGetEnumeratorを呼んだ時にここまででクエリ生成
    .SkipWhile(x => x < 100) // ここからはIEnumerable<T>
    .Count();

Queryableの連鎖で、例えばLinq to SqlだったらSQL文を作っていきます。で、foreachであったりToListであったりをすると、SQLが作られて発行されてデータベースと通信されて。それって、どのタイミングでQueryableの中の式木がSQL文に変換されるかというと、GetEnumeratorが呼ばれた時、です。それはいつ呼ばれるの?というと、foreachされたりToListされたり、AsEnumerableしてその後のEnumerableのメソッドがGetEnumeratorを呼んだ、その時。

こんな感じです。ToArrayやToListは、そこで実体化するので、メソッドチェーンの後続がIEnumerable<T>なのは当然のことですが、AsEnumerableがただのキャストにすぎないのに意味合いが変化するのは、拡張メソッドの解決の優先度のため。型がIQueryable<T>の状態だとWhereやSelectはQueryableのWhereやSelectが選択されますが、型がIEnumerable<T>の状態だとEnumerableのWhereやSelectが選択される、ということです。Enumerable自体は遅延評価なので、後続のIEnumerable<T>がGetEnumeratorを呼び出したときに評価が開始されるのは変わらず。

AsEnumerableやToArray、ToListは実はQueryableクラスにはありません。なので、素の状態で拡張メソッドの解決がIEnumerable<T>側を呼び出すようになっています。

ところでクエリ文の構築はGetEnumeratorが呼ばれた時と言いましたが、GetEnumeratorを呼ばないとき、例えばQueryableでのFirstやSumはどうなっているのかというと、内部でExecuteが呼ばれた時です。IQueryProviderはこんなインターフェイス。

public interface IQueryProvider
{
    IQueryable<TElement> CreateQuery<TElement>(Expression expression);
    TResult Execute<TResult>(Expression expression);
    // 非ジェネリックなものもありますが省略
}

FirstやSumなど、単独の結果を返すものは内部でExecuteを呼びます。なので、クエリプロバイダの実装次第ですが、通常はこのExecuteが呼ばれた時にクエリ文の構築と実行を同時に行うものと思われます。SelectやWhereなど、後続にIQueryableのチェーンを繋げるものは、内部でCreateQueryのほうを呼びます。そして最終的に複数の結果(IEnumerable<T>)を返す場合は、GetEnumeratorが呼ばれた時にクエリ文の構築と実行を行うものと思われます。

まとめ

AsEnumerableは、ようするにただのキャストなだけですが、その果たしている役割というものを考えると非常に深い。その割には(QueryableでToListばかり使われたりと)今ひとつ知名度に欠ける気もしますので、ドサッと紹介を書いてみました。ただのキャストだって語ろうと思えば幾らでも語れるLINQは素敵ですね!

DynamicObjectでより沢山の型情報を取る方法

Chaining AssertionのAsDynamicに少し不具合があったので、ver.1.5.0.0として更新しました。今まではメソッドの引数にnullを渡すと死んでました。ぬるぽ!まあしかし、引数そのもののからしかTypeが取れなかったので、nullだと、どのみちメソッドを特定するための型情報がないからオーバーロードの解決は不可能なので、仕様ですよ仕様、という言い訳。

などとふざけたことを思っていたのですけれど、コンパイル時に決定される引数の型を取り出す方法が判明したので、そのへんも含めて完全に解決しました。やったね。その方法は、というと

public override bool TryInvokeMember(InvokeMemberBinder binder, object[] args, out object result)
{
    var csharpBinder = binder.GetType().GetInterface("Microsoft.CSharp.RuntimeBinder.ICSharpInvokeOrInvokeMemberBinder");
    if (csharpBinder == null) throw new ArgumentException("is not csharp code"); // CSharpコードではない

    // ジェネリックの型引数の取得(Hoge<T>(1, t)とかでのTのこと)
    var typeArgs = (csharpBinder.GetProperty("TypeArguments").GetValue(binder, null) as IList<Type>).ToArray();
    // コンパイル時に決定されているパラメータの型の取得
    var parameterTypes = (binder.GetType().GetField("Cache", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(binder) as Dictionary<Type, object>)
        .First()
        .Key
        .GetGenericArguments()
        .Skip(2)
        .Take(args.Length)
        .ToArray();
    
    // それらの情報からMethodInfoを特定する
    var method = MatchMethod(binder.Name, args, typeArgs, parameterTypes);
    // 呼び出し
    result = method.Invoke(target, args);

    return true; // 呼べてれば必ずTrueなので。
}

……。ひどそうな匂いが!まず、素のままでは情報が足りなすぎるので、基本的にリフレクション全開です。その中でも、parameterTypesが今回追加したところです。binderのCacheに、CallSiteのデリゲート(dynamicを使って呼び出すと、コンパイル時にこの辺のものが自動生成される)があるので、そこから型情報を持ってこれることに気づいたのだ(キリッ

もう少し詳しく説明しますと

// このヘンテツもないどうでもいいコードは
dynamic d = null;
var result = d.Hoge<string, int>(100, (string)null, (ICollection<int>)null);

// コンパイル後はこんな結果に化けちゃいますあら不思議!
object d = null;
if (Program.<Main>o__SiteContainer0.<>p__Site1 == null)
{
	Program.<Main>o__SiteContainer0.<>p__Site1 = CallSite<Func<CallSite, object, int, string, ICollection<int>, object>>.Create(Binder.InvokeMember(CSharpBinderFlags.None, "Hoge", new Type[]
	{
		typeof(string), 
		typeof(int)
	}, typeof(Program), new CSharpArgumentInfo[]
	{
		CSharpArgumentInfo.Create(CSharpArgumentInfoFlags.None, null), 
		CSharpArgumentInfo.Create(CSharpArgumentInfoFlags.UseCompileTimeType | CSharpArgumentInfoFlags.Constant, null), 
		CSharpArgumentInfo.Create(CSharpArgumentInfoFlags.UseCompileTimeType | CSharpArgumentInfoFlags.Constant, null), 
		CSharpArgumentInfo.Create(CSharpArgumentInfoFlags.UseCompileTimeType | CSharpArgumentInfoFlags.Constant, null)
	}));
}
object result = Program.<Main>o__SiteContainer0.<>p__Site1.Target(Program.<Main>o__SiteContainer0.<>p__Site1, d, 100, null, null);

細かい部分はどうでもいいので、良く分からないFuncが生成されてるんだな、というとこだけ見てもらえれば。さて、これを頭に入れた上で、DynamicObjectのTryInvokeMemberでbinderの奥底のCacheを探してみると、

このFunc6というものが、Func<CallSite, object, int, string, ICollection<int>, object>です、発見出来ました!これの型引数が、コード上での呼び出し時の型引数になるわけです。なお、第一引数はCallSite、第二引数はインスタンスなので無視してSkip(2)、そして引数の個数分だけTake(まあ、ようするに最後が戻り値の型なわけですが)。

というわけで、実際に改善されたAsDynamicを使ってみますと、

// こんな何のヘンテツもないオーバーロードのあるクラスがあるとして
public class PrivateClass
{
    private string Hoge(IEnumerable<int> xs)
    {
        return "enumerable";
    }

    private string Hoge(List<int> xs)
    {
        return "list";
    }
}

// 型はdynamicです←意地でもvarで書きたい人
var mock = new PrivateClass().AsDynamic();

// 型でオーバーロード分けが出来るようになった!
List<int> list = new List<int>();
IEnumerable<int> enumerable = new List<int>();

(mock.Hoge(list) as string).Is("list");
(mock.Hoge(enumerable) as string).Is("enumerable");

というわけで、より正確なオーバーロードの解決が図れるようになりました。何をアタリマエのことを言ってるんだお前は、と思うかもしれませんが、TryInvokeMemberに渡ってくる情報はobject[] argsなのです。args[0]の型はobjectなわけで、それをGetTypeしたら、出てくるのはList<int>なのです。何をどうやっても、.csファイルではIEnumerable<int>と書かれているという情報を得ることは出来なかったわけです、今までは。ましてやnullだったら型もヘッタクレもなかったわけです。でもこれからは違う。コード上のデータが取れる!

などとツラツラと書いてみましたが、利用者的にはどうでもいい話ですね、はい。それに、リフレクションはいいとしても、Cacheって何よ?CacheのFirstが決め打ちなのって何よ?などなどは、ぶっちゃけよくわかっていなくて(だって全部internalだしね……)若干怖いのですが、まあ、多分、大丈夫でしょう、多分……。それと、まだ完璧じゃあなくてサポートしてないオーバーロードのケースが幾つかあります。とはいえ、ほとんどのシチュエーションでは問題ないのではかと思います。

まとめ

このChaining Assertionですが、私は結構普通に使いまくっていて、ないと死にます。激しく便利。そうそう、紹介しますと、 actual.Is(expected) と、メソッドチェーン形式で流れるようにアサーションが書けます。ラムダ式での指定やコレクションへの可変長引数など、ちょうどかゆいところに手が届く拡張を施してあって、随分とテストを書くのが楽になります。

AsDynamicは、オマケ機能というか。privateなメソッドやプロパティ、フィールドにもdynamicを通してアクセス出来るように変換します。たまにしか使いませんが(MSTestにはPrivate Accessorがあるので)、あると便利よね、という時もそこそこあり。

MSTestだけではなく、NUnitやMbUnit、xUnit.NETでも使えますので&NuGet経由でも入れられますので、一度是非お試しを。

非同期の再帰的な辿り方、或いはRxとC# 5.0 Asyncの連携について

例えば、ページを辿る。何度もアクセスを繰り返して、辿る。非同期で。単純なようで、やってみると何気にこれが結構難しい。例としてコードレシピのReactive Extensionsを使用してTwitterから非同期にデータを取得し表示するがありました。MVVMも絡めて、素晴らしいサンプルですね!

というわけで、お題を拝借して、Twitter ApiのGET statuses/friendsを使わせて頂きます。んが、今回は、手を抜いてフォロワーのscreen_name(@hogehogeのhogehogeの部分)だけを取れれば良い、ということにします。JSON解析やデシリアライズも面倒だし話の本題でもないので省略するため、DynamicJsonを使って、JSONを生のまんまっぽく扱うことにします。DynamicJsonは便利だなあ(棒)

さて、まずTwitter APIのcursorですが、大体こんな風になっています。目的はカーソルを辿って全てのuser(に含まれるscreen_name)を集めること。

JSON取得毎にnext_cursor_strという、次のページへのIDが取れるので、それを辿っていって、0が出たらページ末尾。といった具合です。next_cursor_strの値は一見ランダムに見える整数(2121409421とかそんな値になっている)であり、next_cursorという数字のものもあるのに、_strという文字列として得られるほうを使っています。何故かというと、TwitterのステータスIDが53bitを越えたお話 - tmytのらくがきを参照ください。DynamicJsonでは数字(Number)はdoubleとして扱うので、_strのほうを使わないと、危ういわけです。

まあ、ただのお題で本題な話ではないので、その辺は深く考えずそういうものなのだなあ、ぐらいで。

同期とyield returnと非同期

コードレシピのサンプルを見させて頂いたのですが、ネットワークアクセス部分がOpenReadなので、非同期"ではない"です。でも挙動は非同期だよ?というのは、Scheduler.ThreadPoolを使っているからなわけですが、つまるところ挙動的にはBackgroundWorkerを使って非同期にするのと同じことです。その場合ですと、Generateも確かに良いのですが、APIへのアクセスがそもそも同期であるならば、難しく考える必要はなく、yield returnを使ったほうが簡単です。単純なものは演算子の組み合わせで、複雑なものは素直に偉大なるコンパイラ生成(yield return)に頼る。そういう切り分けがLINQ的には大事かなって。

static IEnumerable<string> EnumerateFriends(string screenName)
{
    var cursor = "-1"; // 初期値は-1から
    while (cursor != "0") // 0が出たら終了
    {
        var url = string.Format("http://api.twitter.com/1/statuses/friends.json?screen_name={0}&cursor={1}",
            screenName, cursor);
        using (var stream = new WebClient().OpenRead(url))
        {
            var json = DynamicJson.Parse(stream); // 面倒くさいんでDynamicJson使いますよ:)
            foreach (var item in json.users)
            {
                yield return item.screen_name; // screen_nameを列挙
            }
            cursor = json.next_cursor_str; // 次のカーソルにセット
        }
    }
}

static void Main()
{
    var friends = EnumerateFriends("neuecc").ToArray();
}

すっきりと書けるのが分かると思います。え、これだとブロックしてしまって良くない?その通り。じゃあ非同期にしましょう。いえ、Reactive Extensionsで簡単にできてしまいます。yield returnで生成されたEnumerableをObservableに変換するのは、ToObservableです。

static void Main()
{
    EnumerateFriends("neuecc")
        .ToObservable(Scheduler.ThreadPool) // ThreadPoolで実行!
        .Subscribe(Console.WriteLine);
        
    Console.ReadLine(); // 終了してしまうからね
}

ToObservableはデフォルトでは Scheduler.CurrentThread 上で実行されるため、同期的にブロックしますが(※Push型シーケンスだからといって必ずしも非同期とは限らない)、任意のものに変更することも可能です。今回はScheduler.ThreadPoolを指定したので、ThreadPool上で動くようになっています。そのため、ブロックされません。

こういった書き方のほうが、コードがクリアになるし、IEnumerable<T>とIObservable<T>に両対応できてる、という柔軟性の点でも良いかと思います。また、BackgroundWorkerを使うよりも遥かに簡単ですよね。プログレス通知もなく、ただ処理をバックグラウンドでやりたい、というだけならば、Rxを使ったほうが楽チンです。プログレスが必要な場合は、Rxだとその辺の処理を作りこまなければならないので、素直にBackgroundWorkerを用いるのもいいかもしれません。私だったらRxをちょっと拡張してプログレス処理を作り込むほうを選ぶかな?その辺の話はReactive ExtensionsとAsync CTPでの非同期のキャンセル・プログレス処理を参照ください。

また、Observable化するとPublishによる分配 - C#とLinq to JsonとTwitterのChirpUserStreamsとReactive Extensionsなど、色々と応用な使い方が広がるのもメリットの一つと言えるでしょう。

Async CTP

今回は例がWPFなため、WebClientで同期的(OpenRead)に取ってしまいましたし、それでも全然問題ないわけですが、SilverlightとかWP7だったらこの手(同期でJSON取ってきてyield returnで返す)は使えません。同期のOpenReadがそもそもなくて、非同期のOpenReadAsyncしかないからね。どうしましょう?それだとyield returnが使えないのはモチロンのこと、Generateでもうまく動きません。もしページ番号がカーソルのように不定ではなく1,2,3...といった形で辿れたとしても、Observable.Rangeでやると、うまくいきません。非同期なので結果が帰ってくる時間が不定だからです。結果を取得してから次の結果を取得する、という形式にしないとダメなのです。

ところでそもそも、同期的に書いたとしても、本来は書くのは大変なはずなのです。それが、yield returnというコンパイラ生成があるから簡単に書ける。ということは、そうです、非同期もコンパイラ生成してしまえばいいのです、ということでAsync CTPで書きましょう。Async CTPはC# 5.0で入る、かもしれない、async/await構文を使えるようにするためのものです。コミュニティテクノロジープレビュー。ようするにベータ版ですね。

// このコードはAsync CTP (SP1 Refresh)によるもので、将来的にも同じコードで動作することは保証しません
async static Task<List<string>> EnumerateFriends(string screenName)
{
    var list = new List<string>();

    var cursor = "-1";
    while (cursor != "0")
    {
        var url = string.Format("http://api.twitter.com/1/statuses/friends.json?screen_name={0}&cursor={1}",
            screenName, cursor);
        using (var stream = await new WebClient().OpenReadTaskAsync(url)) // await!
        {
            var json = DynamicJson.Parse(stream);
            foreach (var item in json.users)
            {
                list.Add(item.screen_name); // yield returnの代わりに……
            }
            cursor = json.next_cursor_str;
        }
    }

    return list;
}

Async CTPは簡単に解説すると、awaitキーワードを使うと、本来非同期のものが同期のように書けるようになります。詳しくは非同期処理 (C# によるプログラミング入門)を参照のこと。コード的にも見たように、差異はWebClientのOpenReadの部分を、await OpenReadTaskAsyncに変更しただけで、あとはまるっきり一緒です。非同期なんて簡単なものだね。と、言いたかったのですが、全部読み込んでListで返してるぢゃん……。これじゃEnumerateじゃないよ、yield returnじゃないの?これだと結果取得に時間かかるし、Takeなどを用いて、途中で止めることもできないし。あまりよくない。

結論としては今のところどうやら無理ということで。asyncの返すものはTaskもしくはTask<T>でなければならない。いや、Task<IEnumerable<T>>を返してくれればいいぢゃん、await yield returnとか出来たら素敵ぢゃないのん?と思わなくもないというか、普通にそういうリクエストも上がっているのですが、それにはIAsyncEnumerable<T>のようなものと、それに対するコンパイラサポートが必要だよね、という返しでした。

Rx + Async

IAsyncEnumerable<T>、それってIObservable<T>で代替出来る話だよね。IObservable<T>は連続的な非同期を内包しているから。(※IObservable<T>は一つのインターフェイスであまりにも多くのものを表現出来てしまい、内部の状態が読みづらく(同期なのか非同期なのか、遅延なのか即時なのか)混乱を生みがちという問題もありますが……)。なので、Rxでやってみましょう。といっても、Rxで完全に自前でやるのは相当大変なので、Async CTPのサポートも併用します。これにより非同期の待機が同期的に書けるようになり、yield returnであったりlist.Addであったりの部分を、OnNextに置き換えるだけになります。

Stable版のRxにはAsync CTP連携は入っていないのですが、Experimental(実験的)版には、awaitで待機出来る、というだけはなく、幾つかAsync CTPと連携できるメソッドが入っています。

// ExperimentalのRxのため、将来的にもこのコードが動作し続けることは保証しません
static IObservable<string> EnumerateFriends(string screenName)
{
    // ラムダ式の中でasync書けることがポイント
    return Observable.Create<string>(async (observer, cancelToken) =>
    {
        try
        {
            var cursor = "-1";
            while (cursor != "0")
            {
                if (cancelToken.IsCancellationRequested) return; // cancelをチェック

                var url = string.Format("http://api.twitter.com/1/statuses/friends.json?screen_name={0}&cursor={1}",
                    screenName, cursor);
                using (var stream = await new WebClient().OpenReadTaskAsync(url)) // await!
                {
                    var json = DynamicJson.Parse(stream);
                    foreach (var item in json.users)
                    {
                        observer.OnNext(item.screen_name); // yield returnのかわりに
                    }
                    cursor = json.next_cursor_str;
                }
            }
        }
        catch (Exception ex)
        {
            observer.OnError(ex);
            return; // 例外発生時はOnErrorを呼んで終了
        }

        observer.OnCompleted(); // 例外発生もキャンセルもなく完了したなら、OnCompletedを呼ぶ
    });
}

static void Main()
{
    EnumerateFriends("neuecc")
        .Take(350) // 350件後にDisposeされてtokenがcancelになる
        .Subscribe(
            s => Console.WriteLine(s),
            e => Console.WriteLine("error:" + e),
            () => Console.WriteLine("完了")); // Takeのほうから呼び出されるので、cancel扱いになっても表示される

    Console.ReadLine();
}

Observable.Create(RangeやGenerateなどの生成子、WhereやSelectなどの演算子の全てが使っている、本当のプリミティブの生成子)を使って、生のobserverでOnNext, OnError, OnCompletedの3つを制御してやります。Createやtry-catchの分、ネストが深くなってしまっていますが、コード自体は同期的に、yield returnを使って書いていたものとほとんど変わってないのが分かると思います。yield returnの部分にOnNextを置いた、それだけでそのまま置き換えられています。

これならIObservable<T>でも十分に自動生成のサポートが効いていると言えなくもないですね。やってみて、結構満足できてしまった。パフォーマンス的にも、演算子をベタベタ組み合わせるのはあまり良くはならないので、こうしてasync/awaitと連携させて作れると、素直に書けるうえに、パフォーマンス向上も狙えるのが嬉しい。ただ、OnErrorやOnCompleted、キャンセル(Dispose)をどうするか。考慮する事項が多いので、ある程度分かっていないと大変かもしれません。全て考えておかないと、正しく動作しません。既存演算子の組み合わせだけで済ませられるなら、そういった考慮事項は演算子が受け持ってくれるので、考えなくて済むのですが……。どうしても演算子の組み合わせじゃうまく出来ない、逆に複雑になりすぎる、そういった時の奥の手、ぐらいに考えておくと良さそう。

ところでasyncはメソッドの宣言だけでなく、ラムダ式の部分でも宣言できてawaitすることが出てきてしまうんですよね、ならば、同じようなコンパイラ生成であるyield returnも、現状は外部メソッドでしか使えないわけですが、以下のようにインラインでも使えるようになってくれると嬉しいなって。思ってしまうのです。

// 妄想なので、現状はこれは出来ませんが!
var infinity = Enumerable.Create(()=>
{
    var num = 0;
    while(true)
    {
        yield return num++;
    }
});

そんなもの散々突っ込み受けたですって?Iterator Blocks Part Seven: Why no anonymous iterators? - Fabulous Adventures In Coding。ええ、知ってます。しかし、awaitなどで必要さの要請を受けて、コストとベネフィットが逆転するときが来た、と、思うのです。それに、VBでも、いや、なんでもない。

Expand

さて、ともかくAsync CTPは未来の話であり、現状手元にあるもので何とかする方法はないのだろうかというと、あります。ようするところ、再帰的に辿ってるわけですよね、cursorを。じゃあ、Expandです。ExpandはReactive Extensions v1.0安定版リリースでEnumerableバージョンのものを説明しましたが、Observableバージョンもあります。

// 補助メソッド、Async CTPにはOpenReadTaskAsyncとか、そういうのがデフォで用意されてますが、
// Rxにはないので、自前で用意しなきゃあいけないという、それだけの話です(ダウンロードしてストリングを返すだけのもの)
public static class WebRequestExtensions
{
    public static IObservable<string> DownloadStringAsync(this WebRequest request)
    {
        return Observable.Defer(() => Observable.FromAsyncPattern<WebResponse>(
                request.BeginGetResponse, request.EndGetResponse)())
            .Select(res =>
            {
                using (var stream = res.GetResponseStream())
                using (var sr = new StreamReader(stream))
                {
                    return sr.ReadToEnd();
                }
            });
    }
}

// ExpandはStable版にはまだ搭載されていないので、Experimental版を使ってください
static IObservable<string> EnumerateFriends(string screenName)
{
    Func<string, IObservable<dynamic>> downloadJson = cursor =>
    {
        var url = string.Format("http://api.twitter.com/1/statuses/friends.json?screen_name={0}&cursor={1}",
            screenName, cursor);
        return WebRequest.Create(url).DownloadStringAsync().Select(DynamicJson.Parse);
    };

    return downloadJson("-1")
        .Expand(d => (d.next_cursor_str == "0")
            ? Observable.Empty<dynamic>() // TakeWhileで判定すると最後の一つを取りこぼすので
            : downloadJson(d.next_cursor_str))
        .SelectMany(d => (dynamic[])d.users)
        .Select(d => (string)d.screen_name);
}

そこそこ直感的ではないでしょうか?最初 Expand().TakeWhile(next_cursor_str != "0") と書いたのですが、それだと最後のページを取りこぼしてしまうのに気づいて、Emptyを投げる方針に変更しました。その辺、境界については注意を払わなきゃですね。

そして、残念ながら、ExpandはRxのStable版にはまだない。ということはWP7にもないわけで。

再帰とRx

Stable版でもやりましょう。awaitなし、Expandなし。では、どうやって作りましょうか。うーん、再帰的というのなら、本当に再帰させてしまえばいいのではないか?

static IObservable<string> EnumerateFriends(string screenName)
{
    Func<string, IObservable<dynamic>> downloadJson = null; // 再帰するにはこーして最初にnull代入
    downloadJson = cursor =>
    {
        var url = string.Format("http://api.twitter.com/1/statuses/friends.json?screen_name={0}&cursor={1}",
            screenName, cursor);
        return WebRequest.Create(url)
            .DownloadStringAsync()
            .Select(DynamicJson.Parse) // ここまでExpandと共通
            .SelectMany(json =>
            {
                // Expandメソッドの中でやってることを大幅簡易化、ということです、つまるところ。
                var next = (json.next_cursor_str == "0")
                    ? Observable.Empty<dynamic>()
                    : downloadJson((string)json.next_cursor_str);
                return (IObservable<dynamic>)Observable.StartWith(next, json);
            });
    };

    return downloadJson("-1")
        .SelectMany(d => (dynamic[])d.users) // ここからもExpandと共通
        .Select(d => (string)d.screen_name);
}

これを再帰というには、あまり再帰してないのですが、まあ雰囲気雰囲気。Expandと大体共通です。つまるところ、Expandを自前で作る、のは結構大変なので、Expandのように汎用的ではなく、特化したものをその場で作る、といった程度の代物。そうすれば、少しは簡単に用意できます。

まとめ

同期でListに格納するだけなら簡単。遅延でやるのもyield returnのお陰で簡単。非同期で辿るのは難しい。awaitで複数の値をyield的に列挙するのは現状難しい。Rxとawaitの連携は大変素晴らしい。Expandは便利。なければないで何とかなる。でもやっぱ大変。

一見簡単なことが存外難しいってのはいくないですね。一見簡単なら、簡単なままでできないと。Expandも悪くはないんですけど、中々どうして慣れてないと分かりづらい。しかし、将来のC#には十分期待できそう。と、思いました、まる。あとRxとAsyncは全然仲良しなんですよ~、というところです。

Rxでのイベント変換まとめ - FromEvent vs FromEventPattern

Reactive Extensionsの機能の一つに.NETにおけるイベントをIObservable<T>に変換する、というものがあります。Bridging with Existing .NET Events。そして、そのためのメソッドがFromEventでした。ところが最近のRxでは二つ、FromEventとFromEventPatternが用意されています。この差異は何なのでしょうか?

結論としては、過去のRx(このサイトの古い記事や他のサイトの過去の記事などで触れられている)やWindows Phone 7でのFromEventはFromEventPatternに改名されました。後続にEventPatternという(object Sender, TEventArgs EventArgs)を持つ.NETのイベントの引数そのものを渡すものです。そして、空席になったFromEventに新しく追加されたFromEvent(紛らわしい!)は、EventArgsだけを送ります。それ以外の差異はありません。

つまるところFromEventは FromEventPattern.Select(e => e.EventArgs) ということになります。なら、それでいいぢゃん、何も混乱を生む(WP7のFromEventがFromEventPatternである、というのは致命的よねえ)ことはないよ、とは思うのですが、パフォーマンスの問題でしょうかね。確かに、Senderは必要なく使うのはEventArgsだけの場合が多い。それなのに、毎回EventPatternを生成していたり、Selectというメソッド呼び出しが入るのは無駄です。

そもそもインスタンスに対してFromEventで包むということは、クロージャでsenderは変数としていつでもどこでも使えてしまうのですよね、そもそも、そもそも。そういう意味でも送られてくるのはEventArgsだけでいいのであった。というわけで、基本的にはFromEventでいいと思います。

FromEventPatternについて

では、改めてFromEventPatternを復習します(WP7の人はFromEventで考えてください)。Observable.FromEventPattern(TEventArgs) Method (Object, String) (System.Reactive.Linq)にサンプルコードがあるのですけれどね。そうそう、MSDNのリファレンスには、一部のメソッド/一部のオーバーロードにはサンプルコードがあります。全部ではないのがミソです、見て回って発掘しましょう。まあ、というわけで、とりあえずそのFileSystemWatcherで。

// FileSystemWatcherは指定フォルダを監視して、変化があった場合にイベントを通知します
// 例えばCreatedイベントはファイルが作成されたらイベントが通知されます
var fsw = new FileSystemWatcher(@"C:\", "*.*") { EnableRaisingEvents = true };

// FromEventPatternその1、文字列でイベント名指定
Observable.FromEventPattern<FileSystemEventArgs>(fsw, "Created")
    .Subscribe(e => Console.WriteLine(e.EventArgs.FullPath));

// FromEventPatternその2、静的なイベントをイベント名指定(WP7にはない)
Observable.FromEventPattern<ConsoleCancelEventArgs>(typeof(Console), "CancelKeyPress")
    .Subscribe(e => Console.WriteLine(e.EventArgs.SpecialKey));

一番馴染み深いと思うのですが、文字列でイベント名を指定するものです。その2のほうはあまり見ないかもしれませんが、静的イベントに対しての指定も可能です。これら文字列指定によるメリットは、比較的シンプルであること。デメリットは、リフレクションを使うので若干遅い・スペルミスへの静的チェックが効かない・リファクタリングが効かない、といった、リフレクション系のデメリットそのものとなります。

リフレクションしかないの?というと、勿論そんなことはありません。

// FromEventPatternその3、EventHandlerに対する変換
var current = AppDomain.CurrentDomain;
Observable.FromEventPattern(h => current.ProcessExit += h, h => current.ProcessExit -= h)
    .Subscribe(e => Console.WriteLine(e.EventArgs));

// FromEventPatternその4、EventHandler<T>に対する変換
Observable.FromEventPattern<ContractFailedEventArgs>(
        h => Contract.ContractFailed += h, h => Contract.ContractFailed -= h)
    .Subscribe(e => Console.WriteLine(e.EventArgs.Message));

// FromEventPatternその5、独自イベントハンドラに対する変換
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
        h => new FileSystemEventHandler(h),
        h => fsw.Created += h,
        h => fsw.Created -= h)
    .Subscribe(e => Console.WriteLine(e.EventArgs.FullPath));

イベントの登録と削除を行うためのラムダ式を渡してやります。その3とその4は比較的分かりやすいのではないでしょうか。その5の第一引数が謎いのですが、これはconversionです。C#の型システムの都合上、そのまんまだと独自イベントハンドラを処理出来ないので、型を変換してやる必要があるという定型句。

数あるFromEventPatternのオーバーロードの中で、一番多く使うのはその5だと思います。何故なら、C#のイベントは独自イベントハンドラになっていることが多いから。はっきしいって、最低です。EventHandler<T>を使ってくれてさえいれば、こんな苦労はしなくて済むというのに。独自イベントハンドラは100害あって一利なし。え、WPFとか.NET標準がイベントハンドラは独自のものを使ってる?それは、WPFが悪い、.NET設計の黒歴史、悪しき伝統。

それと、もはや独自デリゲートも最低です。FuncやActionを使いましょう。C#のデリゲートはメソッドの引数や戻り値が一致していようが、型が違ったら別のものとして扱われます。そのことによる不都合は、↑で見たように、あるんです。極力ジェネリックデリゲートを使いましょう。そうすれば、こんな腐った目に合わなくても済みます。

ところで、その5は、もう少しだけ記述が短くなります。

// FromEventPatternその5、第一引数別解、こう書くと短くて素敵
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
        h => h.Invoke,
        h => fsw.Created += h,
        h => fsw.Created -= h)
    .Subscribe(e => Console.WriteLine(e.EventArgs.FullPath));

h.Invoke。というのは、割とhそのものなわけですが、しかしInvokeと書くことで型が変換されます。この辺はコンパイラの都合上のマジックというか何というか。そういうものだと思えばいいのではかと。その5のスタイルで書くときは、この書き方をすると良いと思います。で、まだオーバーロードがあって

// その6
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
        h => fsw.Created += h, h => fsw.Created -= h)
    .Subscribe(e => Console.WriteLine(e.EventArgs.FullPath));

conversionが不要で書けたりもします。一見素晴らしい、のですが、これ、中でなにやってるかというとconversionに相当するものをリフレクションで生成してるだけだったりして。そのため、なるべくconversionを使うオーバーロードのほうを使ったほうがよいでしょう。h => h.Invokeを書くだけですしね。このオーバーロードは紛らわしいだけで存在意義が不明すぎる。

FromEventについて

と、長々と見てきましたが、ではFromEventのほうも。

// FromEvent
Observable.FromEvent<FileSystemEventHandler, FileSystemEventArgs>(
        h => (sender, e) => h(e),
        h => fsw.Created += h,
        h => fsw.Created -= h)
    .Subscribe(e => Console.WriteLine(e.FullPath));

// FromEventPatternその5(比較用)
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
        h => (sender, e) => h(sender, e),
        h => fsw.Created += h,
        h => fsw.Created -= h)
    .Select(e => e.EventArgs)
    .Subscribe(e => Console.WriteLine(e.FullPath));

というわけで、FromEventPatternのその5に近いわけですが、conversionでEventArgsしか渡していない、という点が差異ですね。なので、後続にはsenderが伝わってこず、EventArgsしか通りません。まあ、senderは、↑の例ですとfswでどこでも使えるので、そもそも不要なわけで、これで良いかと思います。

ところでFromEventも色々なオーバーロードがあるにはあるんですが、私の頭では存在意義が理解できなかったので無視します。挙動とかは理解したんですが、なんというか、存在する必要性、有効な利用法がさっぱり分からなかったのです……。まあ、多分、あんま意味ないと思うので気にしないでもいいかと。

拡張メソッドに退避させよう

FromEventにせよFromEventPatternにせよ、長いです。長い上に定型句です。なので、拡張メソッドに退避させると、スッキリします。例えば、今まで見てきたFileSystemWatcherだったら

// .NETのFromEventなら IObservable<TEventArgs>
// .NETのFromEventPatternなら IObservable<EventPattern<TEventArgs>>
// WP7のFromEventなら IObservable<IEvent<TEventArgs>>
// を返す拡張メソッド群を用意する。
// 命名規則はイベント名AsObservableがIntelliSenseの順序的にお薦め
public static class FileSystemWatcherExtensions
{
    public static IObservable<FileSystemEventArgs> CreatedAsObservable(this FileSystemWatcher watcher)
    {
        return Observable.FromEvent<FileSystemEventHandler, FileSystemEventArgs>(
            h => (sender, e) => h(e), h => watcher.Created += h, h => watcher.Created -= h);
    }

    public static IObservable<FileSystemEventArgs> DeletedAsObservable(this FileSystemWatcher watcher)
    {
        return Observable.FromEvent<FileSystemEventHandler, FileSystemEventArgs>(
            h => (sender, e) => h(e), h => watcher.Deleted += h, h => watcher.Deleted -= h);
    }

    public static IObservable<RenamedEventArgs> RenamedAsObservable(this FileSystemWatcher watcher)
    {
        return Observable.FromEvent<RenamedEventHandler, RenamedEventArgs>(
            h => (sender, e) => h(e), h => watcher.Renamed += h, h => watcher.Renamed -= h);
    }

    public static IObservable<FileSystemEventArgs> ChangedAsObservable(this FileSystemWatcher watcher)
    {
        return Observable.FromEvent<FileSystemEventHandler, FileSystemEventArgs>(
            h => (sender, e) => h(e), h => watcher.Changed += h, h => watcher.Changed -= h);
    }
}
var fsw = new FileSystemWatcher(@"C:\", "*.*") { EnableRaisingEvents = true };

// 例えば、ただ変更をロギングしたいだけなんだよ、という場合の結合
// FromEventを外出ししていることによって、すっきり書ける
Observable.Merge(
        fsw.CreatedAsObservable(),
        fsw.DeletedAsObservable(),
        fsw.ChangedAsObservable(),
        fsw.RenamedAsObservable())
    .Subscribe(e => Console.WriteLine(e.ChangeType + ":" + e.Name));

といった形です。また、普通に+-でのイベント以外のものへの登録も可能です。例えば

// LayoutRootはWPFの一番外枠の<Grid Name="LayoutRoot">ということで。
Observable.FromEvent<MouseButtonEventHandler, MouseButtonEventArgs>(
        h => (sender, e) => h(e),
        h => LayoutRoot.AddHandler(UIElement.MouseDownEvent, h),
        h => LayoutRoot.RemoveHandler(UIElement.MouseDownEvent, h))
    .Subscribe(e => Debug.WriteLine(e.ClickCount));

こんな形のものもObservable化が可能です。

イベントの解除

Subscribeの戻り値はIDisposableで、Disposeを呼ぶことでイベントが解除されます。

// アタッチ
var events = Observable.Merge(
        fsw.CreatedAsObservable(),
        fsw.DeletedAsObservable(),
        fsw.ChangedAsObservable(),
        fsw.RenamedAsObservable())
    .Subscribe(e => Console.WriteLine(e.ChangeType + ":" + e.Name));

// デタッチ(合成などをしていて、元ソースが複数ある場合も、すべて解除されます)
events.Dispose();

Rxのこの仕組みは、従来に比べて圧倒的にイベントの解除がやりやすくなっていると思います。

まとめ

非同期の説明ばかりしてきていて、イベントはすっかり置き去りだったことを、まずはゴメンナサイ。少し前からFromEvent周りは大きな仕様変更が入ったわけですが、ようやくまともに解説できました。基本中のキの部分であるここが、過去のリソースがそのまま適用出来ないという最悪の自体に陥っていたので、とりあえずこれで何とか、でしょうかどうでしょうか。

小さなこととはいえ、WP7との互換性が絶えているのが痛いのですが、その辺どうにかならなかったのかねー、とは思います。けれど、このEventArgsだけ送るFromEvent自体は良いと思います。 .Select(e => e.EventArgs) が定型句だったので、こういった変更は喜ばしい限り。それと、今まで思っていた、ぶっちゃけラムダ式とかRxでイベント登録するならsenderって不要じゃね?に対する答え(その通りで、完全に不要)を出してくれたのが嬉しい。

さて、変換できるのはいいけれど、じゃあどこで使うのがいいの?という話がいつもありません。次回は、時間周りと絡めて、その辺のお話が出来ればと思いますが、いつも次回予告が達成されたことはないので、別のことを書くでしょう←ダメぢゃん。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive