XboxInfoTwit - ver.2.3.0.2

電源OFFなのにONということになって投稿し続けるという酷い暴走していました、ごめんなさい……。というわけで直しました。

解説(言い訳とも言う)

XboxInfoTwitはスクレイピングという、普通にブラウザで表示する際のページのHTMLを解析してデータを取り出しています。今回は、HTML側でほんのちょびっと変更があって、その影響をモロに被って暴走しました。Ouch!こういう、うまくデータが取れなかった場合を想定してちゃんとエラーにしないとダメなのですね。私は今回はその辺まったく対策取ってなかったので、……といった結果になってしまいました。ほんとすみません。

一応、次からは今回のようなのが原因の暴走はなくなりました。別の原因がもとに、ってのは、うむむ……。

l2o.js 世界最速分解解説

l2o.js、でググっても何も出てきませんね!じゃあl2o.jsって何?って話なのですが、その前に、Reactive Extensionsが12/24にChristmas Releaseと称してリリースされたわけで、RxJSにも若干更新が入っていました。

· Fix for scheduling in the Concat, Catch, and OnError operators.
· If operator can now be used without supplying an “else” case.

特に大したこともないわけでしたが。しかし、インストールディレクトリである Microsoft Cloud Programmability\Reactive Extensions\v1.0.2838.0\RX_JS を見ると、l2o.jsという見慣れないものが……?

l2o.js、つまり、Linq 2 Objects.js。ええ、ええ……。linq.jsとモロにガチにかちあいそうな匂いがします。RxJS全然更新しないなあ、やる気あんのかよお、JVGoghも辞めちゃったしー、とか思っていたのですが、その裏でこっそりとんでもないものを仕込んでいたようです。いざ出てみると、むしろやらなかったのが不思議なぐらいな。

では、使ってみましょう。ついでにlinq.jsと比較しましょう。

var array = [12, 21, 4, 5, 36, 3, 10];

// l2o.js
L2O.Enumerable.FromArray(array)
    .Where(function (x) { return x % 2 == 0 })
    .Select(function (x) { return x * x })
    .ForEach(function (x) { alert(x) }); // 144, 16, 1296, 100

// linq.js
Enumerable.From(array)
    .Where(function (x) { return x % 2 == 0 })
    .Select(function (x) { return x * x })
    .ForEach(function (x) { alert(x) }); // 144, 16, 1296, 100

名前空間はL2Oから。そこにRange, Repeat, FromArrayなどなどの生成子があり、あとはメソッドチェーンでクエリ演算子があり。linq.jsと完全に一致。……これはlinq.jsオワタ。

いや待て。私がlinq.jsを作ったときには既に3つぐらいLinq to Objectsライブラリはあったけれど、それのどれにも不満があったから、自分で作ったわけで、同じように書けるからといって内部のクオリティが保証されているわけではない。Rxチームが作ってるからといって生半可なものだったら許さんぞ、と、いうわけで中身を覗いてみました。

Minifyされてますが、改行/インデント整形を施すだけで十分読めます。変数難読化が入っても、構造がシンプルなので全然読める。と、いうか、そうして普通に読めるのは、linq.jsと構造がまるっきり一緒だからですが。……。一緒ですが。一緒ですねこれ。そりゃC#の忠実移植を目指して作ったlinq.jsなわけなので、l2o.jsも同じ目標に向かってるだろうから一緒になるのは当然なのですが当然すぎてlinq.jsの存在意義ががが。

中身をチラッと見てみましょう。Selectを、Minifyされていたので変数名は私の方で付け直しました。

// l2o.js
L2O.Enumerable.prototype.Select = function (selector)
{
    var source = this;

    return L2O.Enumerable.Create(function ()
    {
        var current, count = 0, enumerator;

        return L2O.Enumerator.Create(
            function () // MoveNext
            {
                if (enumerator === void 0) // initialize
                {
                    enumerator = source.GetEnumerator()
                }
                if (!enumerator.MoveNext())
                {
                    return false
                }
                current = selector(enumerator.GetCurrent(), count++);
                return true
            },
            function () { return current }, // GetCurrent
            function () { enumerator.Dispose() }) // Dispose
    })
}

// linq.js
Enumerable.prototype.Select = function (selector)
{
    var source = this;
    selector = Utils.CreateLambda(selector);

    return new Enumerable(function ()
    {
        var enumerator;
        var index = 0;

        return new IEnumerator(
            function () { enumerator = source.GetEnumerator(); }, // Initialize
            function () // MoveNext & Current
            {
                return (enumerator.MoveNext())
                    ? this.Yield(selector(enumerator.Current(), index++))
                    : false;
            },
            function () { Utils.Dispose(enumerator); }) // Dispose
    });
}

完全に一致。linq.jsのほうでは、MoveNextのほうに定型句のようにif(enumerator === undefined){初期化処理}を書くのが嫌だったので、そもそも別関数として隔離、Currentはどうせキャッシュを返すだけなのだから省略してMoveNextと統合させてしまえ(this.Yieldというメソッドがその辺を受け持ってる、yield returnっぽく)とか、細々としたのを入れていますが、実質的には一緒です。

なお、この辺のLinq to Objectsの仕組みは、先日紹介しましたが、JSでも一緒です。Selectなど拡張メソッは、以前の物(this = source)を内包したうえで、新しいオブジェクト(new Enumerable)を返し、GetEnumeratorによりEnumeratorを生成し、最初のMoveNextが呼ばれた時に初めて動作が始まる。

アルファ

現在l2o.jsはアルファ版というか、それ以前の状態と思われるので、今実践に投げ込むのはダメです。メソッド全然足りないしバグいっぱいあるし。具体的に挙げると、Rangeはこれだとオーバーフローしない?とかReverseが即時評価ですよー、とかDisposeが不徹底で機能してない場合がある、とか、いっぱい。

メソッドは基本的なのしかありません。OrderByや、それと集合系がごっそり抜けているので、普通に使う分にも困るぐらいなので。まあ、集合系は(Dictionaryがないので)ちょっと実装が面倒ではある……。そのために私はDictionaryを導入しています。neue cc - linq.js ver 2.1.0.0 - ToDictionary, Share, Let, MemoizeAll というのはver2.1と、つい最近からで、それまではDictionary導入してないが故にバグ持ちだったんですよね、恐ろしや……。

まあ、Script#からの生成だろうから、Dictionaryを持ってくるぐらいはお茶の子さいさいかもしれません。

まとめ

l2o.jsは、今はまだこんな風にやるよ、という骨組みを見せているだけに過ぎませんが、すぐに標準クエリ演算子は実装されるでしょう。勿論、歓迎すべきことです!私の心中が穏やかでないのもしょうがない話です!linq.jsはJSでのLinqライブラリでは最後発ですが、現在CodePlex調べでは、同種のライブラリで月々のDL数が最も多いところまで行きました。地道に続けていれば良いものはいつか認められる、という甘い幻想を少しだけ見させてもらったのですが(3/dayとか微々たるものな世界なのですけどね、そう考えるとLinqでJSというものは、今は需要が……。でも、ASP.NET MVCの普及と共に.NETerにJSでのでのLINQは、まだまだ需要が発生する余地はありますな)、公式で十分なクオリティのものが出てしまった以上は、ただある俺々ライブラリの一つとなる。

とはいっても、公式では出来ないこと、出来ない付加価値は、まだまだ幾らでも足せるはずです!現時点でも無名関数の文字列ショートカット、WSH対応、vsdoc、大量の拡張メソッド、jQueryプラグイン化とやってきたし、これらは公式では出せないはず。で、まだネタはあります。今思っているのはClosure CompilerのAdvanced Optimizations対応、任意で配列のprototype拡張の追加、ですねえ。特に後者は、ちょっとしたことには大分便利になるはずだと思っています。

あとは、もしかしたらlinq.jsがRxJSやl2o.jsに影響を与えたのではないか?と考えると、嬉しい話ですかねー。ふむふむ。ま、その辺も含めて今年のRx周りは激しく加速しそうですね。

LINQの仕組みと遅延評価の基礎知識

新年明けましておめでとうございます。その第一弾の記事は実践 F# 関数型プログラミング入門の書評にしようと思っていたのですが、もう少し時間がかかりそうなので、せっかくの年始は基礎から考えようということで、LINQと遅延評価について最初から解説します。まあ、何をもって最初だとか基礎だとか言うのも難しいので私的な適当な基準で。つまり役に立たな(ry。なお、ここではLinq to Objects、IEnumerable<T>の連鎖についてのみ扱いますので、IQueryableについてはまた後日というか実のところ私はQueryableは全然分かってなくてやるやる詐欺が今も続いているといううがががが。

メソッドチェーン != return this

例によって単純なコードで。

var query = Enumerable.Range(1, 10).Select(i => i * i).Take(5);
foreach (var item in query)
{
    Console.WriteLine(item); // 1, 4, 9, 16, 25
}

1から10を二乗したうちの先頭5つを出力という、それだけのコードです。foreachする場合のinの右側が長くなるのは個人的に好きじゃないので、わざわざ変数に置いたりするのをよくやるのですが、これは好みですかねえ。なのでリスト内包表記とかあんま好きじゃなかったりはする、記法的に。

それはともかく、ドットで繋げていると実体が隠れてしまいがちなので、分解します。

var rangeEnumerable = Enumerable.Range(1, 10);
var selectEnumerable = rangeEnumerable.Select(i => i * i);
var takeEnumerable = selectEnumerable.Take(5);
foreach (var item in takeEnumerable)
{
    Console.WriteLine(item);
}

変数だらけでゴチャゴチャして余計に分からない。良く分からないものは、とりま図で。

こうなってます。中に、一つ前のものを内包している新しいオブジェクトを返しています。メソッドチェーンというと所謂ビルダー的な、もしくはjQueryなんかを想像してしまってチェーン毎に内部の状態が変化して return this するか、もしくは完全に新しいものを生成して返す(array.filter.mapしたら.filterで完全に新しい配列が生成され返って、.mapでも、的な。DeepCopyも同じようなものですか)みたいなのを想像してしまう感もあるのですが、そのどちらでもない。中に仕舞い込んで新しい包を返す。実に副作用レスでピュアい。

このことはデバッガで確認出来ます。

面白いのはSelectの戻り値の型で、WhereSelectEnumerableIteratorとなっていて、名前のとおりWhereとSelectが統合されていたりします。これは、Where->Selectが頻出パターンのためパフォーマンス向上のためでしょうねえ。面白いですがユーザー的にはあまり気にすることではないので深追いしないで次へ。

Takeの戻り値であるTakeIteratorはsourceとして中にSelectの戻り値であるWhereSelectEnumerableIteratorを抱えていて、Selectの戻り値はRangeの戻り値であるRangeIteratorを、中に抱えています。という連鎖が成り立っていることがしっかり確認できました。Visual Studioのデバッガは大変見やすくてよろしい。

遅延評価と実行

hogeEnumerableに包まれている状態では、まだ何も実行は開始されていません。そう、遅延評価!このままWhereやSkipを繋いでも、新たなhogeEnumerableで包んで返されるだけで実行はされません。ではいつ実行されるかといえば、IEnumerable<T>以外の、何らかの結果を要求した時です。それはToArrayであったり、Maxであったり、foreachであったり。

foreachを実行した時の動きを、図(但し致命的に分かりづらい)で見ると……

まず最初は最外周のtakeEnumerableに対しGetEnumeratorを実行し、IEnumerator<T>を取り出します。そして取り出したIEnumerator<T>に対しMoveNextの実行をすると、その先ではまた中に抱えたIEnumerable<T>に対しGetEnumeratorでIEnumerator<T>を取り出し、の連鎖が大元(この場合はrangeEnumerable)に届くまで続きます。

大元まで届いたら、いよいよMoveNextの結果が返されます。trueか、falseか。trueの場合は、通常は即座に現在値(Current)の取得も行うので、Currentが根本から下まで降りていくイメージとなります。あとは、どこかのMoveNextがfalseを返してくるまで、その繰り返し。今回はRangeが10個出力、Takeが5個出力なので、Rangeが5回分余りますがTakeで列挙は途中打ち切り。falseを流して終了させます。SumやCountなど値を返すものは、falseが届いたら結果を返しますが今回はforeachなのでvoid、何もなしで終了。

イテレータの実装

ついでなので、動作の実態であるイテレータも実装します。単純な、0から10までを返すだけのものを例として。

public class ZeroToTenIterator : IEnumerator<int>
{
    private int current = -1;

    public int Current
    {
        get { return current; }
    }

    public bool MoveNext()
    {
        return ++current <= 10;
    }

    // 必要でなければ空でもいいや、という感じ
    public void Dispose() { }

    // TじゃないほうはTのほうを返すようにするだけでおk
    object System.Collections.IEnumerator.Current { get { return Current; } }

    // Resetは産廃なのでスルー、実装しなくていいです、Interfaceからも削られて欲しいぐらい
    public void Reset() { throw new NotImplementedException(); }
}

// 使うときはこんな感じでしょーか
// IEnumerator<T>利用時はusingも忘れないように……
using (var e = new ZeroToTenIterator())
{
    while (e.MoveNext())
    {
        Console.WriteLine(e.Current);
    }
}

IEnumerator<T>ですが、見てきたとおり、中核となるのはMoveNextとCurrentです、といってもCurrentはキャッシュした値を中継するだけなので、実質実装しなければならないのはMoveNextだけ(場合によりDisposeも)。

見たとおりに一行の超単純な、10超えるまでインクリメントでtrue、超えたらfalse。なんかとってもいい加減な感じで、falseだろうとMoveNext()を呼んだらCurrentの値がどんどん増加していっちゃって大丈夫か?というと、全然問題ない。と、いうのも、そういうのは利用側の問題であって実装側が気にする必要はないから。

MoveNextする前のCurrentの値は保証されていないので使うな、であり、MoveNextがfalseを返した後のCurrentの値は保証されてないので使うな、です。大事なお約束です。お約束を守れない人は生イテレータを使うべからず。Linqのクエリ演算子やforeachは、そんな他所事を考えないで済むようになっているので、それらを使いましょう。生イテレータを取得したら負けです(拡張メソッド定義時は除く、つまりライブラリ的な局面以外では避けましょう)

ちなみにStringのイテレータは列挙前/列挙後のCurrentへのアクセスで例外が飛び、List<T>は列挙前は0、列挙後も0にリセットされ、Enumerable.Rangeでは列挙後は最後の値が返る、といったように、実際に挙動はバラバラです。

実装側が守らなければならないルールは、MoveNextが一度falseを返したら、以後はずっとfalseを返し続けること。で、その観点で、このZeroToTenIteratorを見ると、実のところ全然ダメです。MoveNextがint.MaxValue回呼び出されるとcurrentがオーバーフローしてint.MinValueになって、つまりはMoveNextの結果もfalseからtrueに変わってしまいます。腐ってますね。殺害されるべき。誰がそんなに呼ぶんだよ、という感じに普段はあんま気にしないゆとりな私ですが、いえいえ、こういう時ぐらいは気にしたりします。

オーバーフローはうっかりで見落としがちなので、ヘラヘラゆとりゆとりと笑ってないで、普段から注意すべきだと自戒するこの頃。

まあ、今時はイテレータの手実装なんてする必要ないのですがね!シンプルなものならばLinqの組み合わせで実現出来ますし、そうでないものはyield returnを使えばいいので。手実装じゃなきゃダメなシチュエーションってなにかある、かなあ?

まとめ

「return thisじゃなくて新しいオブジェクトを返してる」「配列的なイメージで扱えるけれど実体はストリームのほうが近い」「デバッガ素晴らしすぎる」「生禁止」の以上四点でした。

Linq to ObjectsのJavaScript移植であるlinq.jsも同じ仕組みでやっているので、そちらのコードのほうがブラックボックスでなく、また、素直に書いているので分かりやすくお薦め、かどうかは、微妙なところですんがー。ブラックボックスになっている部分(yield returnなど)を表に出しているので(というか出さないと実装出来ない)余計分かりにくい感も。

で、基礎からのLinqといえば紹介したいシリーズが一つ。

LondonのGooglerでMicrosoft MVPでC# in Depthの著者でStack Overflowで凄まじい解答量を誇るJon Skeet氏が、BlogでReimplementing LINQ to Objectsと称して、これまた凄まじい勢いで再実装&超詳細な解説をやっているので必見です。

詳細、どころの話じゃなく詳細で大変ヤバ素晴らしすぎる。単純なサンプルコードと結果を貼ってメソッド紹介、などという記事とは一線を画しすぎるクオリティ。私もこういう記事を書いていきたいものです。こんな量とスピードの両立は超人すぎて無理ですが、今年は記事のクオリティは上げたいですね。

C# in Depth 2nd Editionはつい二ヶ月前に出たばかりで、内容も良さそうですね、読んでみたい本です。しかし私の手元には積み本がいっぱいで、とほほ。で、本といえばもう一つ、C# 4.0 Unleashedがもうすぐ(2011/1/14、明日だね)出ます。これは著者がBart De Smet氏なので大注目です。B# .NET BlogでキレキレのLinqコード、だけじゃなくILからSQLからあらゆる領域にエキスパートな凄さを見せているので超楽しみです。こちらは予約してあるので、届くのが本当に楽しみで(洋書なので届くのは月末予定のよう、F#本が読み終えた頃になる予定なのでちょうどいいー)。

Bart氏は大学時代はベルギーのMicrosoft MVP for Visual C#、その後MicrosoftのWPFチームに入り、現在はCloud Programmability Team、つまりRxを開発しているチームに入ってます。氏が入ってからIQbservableとかヘンテコなのが次々と上がってきて、ますます目が離せない状態に。PDC10ではLINQ, Take Two - Realizing the LINQ to Everything Dreamというセッションを行ってましたが、これは本当に必見。Linqの過去、そして未来を見る素晴らしいセッションでした。感動しすぎて3回ぐらい見直した。

LINQは今後「ますます」重要になるので、しっかり土台を固めて、未来へ向かおう!

2010年を振り返る

今年もありがとうございました。無事、更新を続けられました。一応毎年、31日は一年間を振り返っていたようなので、今年も振り返ることにします。

まず言えるのはもはやゲーマーじゃねえ、ということのようで、悲しいことに。ゲサイトじゃなくなったのはしょうがないとしても、プレイすらしなくなっていました……。今年は過去最高潮にゲームしてません。積みまくりです。こんな自分になるとは想像もつかなかったなあ。とか言いながらもこっそりGeometry Wars 2のEvolvedモードで日本3位ぐらい世界50位ぐらいなスコアに更新していたりはしました。全体的にゲームやらなかったぶん、ジオメトリだけはしっかりやったということで、ヨシとしますか。来年はもっとゲーマーでいたいですね。

なお、去年遊んだ中でのベストゲームはいりす症候群!です。10月頃に初めて知ったのですが、もう激ハマリ。フリーゲームですし、最高に面白いので皆様も是非。ゲームは勿論、ストーリー/演出面でも味わい深かった。大変素晴らしい。来年のゲームはPortal2に大期待、かな。そういえば、年末のついこないだに配信されたXbox LIVE インディーズ ゲームのREVOLVER360は、大変美しい映像と素晴らしいプレイ感覚で非常に面白いので現在プレイ中。

さて、ゲーム以外ではどんな一年だったかというと、キャッキャウフフ。過去の自分からじゃ全く考えられないぐらいキャッキャウフフした一年でした。そんなに沢山というわけじゃないですがC#系の会に出たりしました。そうやって実際に人と顔を合わせることでTwitter上でのコミュニケーションの輪も広がる、という効果でか、Twitterでの@率も高くなっていきました。おお、キャッキャウフフ。と、いうとアレですが、悪いことではないですよね。人付き合いは相当に苦手なのですが、幾分か何とかなってきたようにも思えます。いや、まだ全然ですが。徐々に徐々に。

C#erとしての成長度は、うーん。ちょっと鈍いかなあ。さすがに、初めてC#に触れ本格的にプログラミングを始めましたな2008年、ようやくプログラミングが分かってLinq最高ーとどんどん知識を吸収出来た2009年、に比べて鈍化してしまうのもしょうがないのかなー、とは思いつつも、その原因ってジャンルを広げられなかったからなのではないかなあ、と思います。結局今年もLinqですからね。来年もLinqな気がしますが。今年の成果物はlinq.jsをガッと書き換えたりDynamicJson作ったり、そこそこなくはないけれど、しかし1年分の仕事じゃあないのですよねえ。

来年

継続中の積みタスクの消化がー。DynamicJsonの更新とかReactiveOAuthの更新とかlinq.jsの更新とか。やるべきことは決まっているし、別にやり始めればそんなに時間のかかるものでもないので、とっとと手をつけて終わらせるべきなのである。と、11月頃から思っていたはずが手付かずでここまで来てしまった……。ということは、来年も永遠とこのままではフラグ。

何か作ると言い続けてやるやる詐欺が続いているのですが、手元にWindows Phone 7があるのが好機なので、それでSilverlightの学習と何か作るの達成を。と思ってますです。クライアントサイドでは。サーバーサイドは、ねえ。C#から離れてnode.jsを触るのが良いかな、とは思っていて。もしくは、Rxチームの次のミッションはサーバーサイドでの応用だと思うので、とりあえず追っかけてればそのうち時流に乗れるのではないかという楽観論。

というわけで、来年も暫くはReactive Extensionsとその周辺、プラスアルファで何か、といった感じでしょうか。興味あるのことは沢山ありますが、色々なことに手を出せるほどにはキャパシティの限界容量があまりないわけなので、現実を見つつ、着実に成長出来れば良いかな、と。まだあわてるような時間じゃない。と、言いたいところなのですが、私もあまり若いとも言えない年齢なので(一応まだ20台ですがー)、そろそろ焦ったほうがいい感じではある。やっぱ実務的な知識があまりにも欠けてるのよねえ、とほほ。来年は、少しその辺も見据えて動かなきゃいけないかもですね。

そんな感じですが、来年もよろしくお願いします。

Linqと総当り

各所でAdvent Calendarも終了し皆様お疲れさまでした。自分のよく知っている言語はもちろんですが、他の言語を見ていても非常に楽しめて良いですね、特に普段自分の書かない言語で、入門からちょっと突っ込んだものまでまとめて見れるというのは非常に良い機会でした。

そんな中で見かけたScalaによる リストモナドを使ってみる - みずぴー日記 という記事。おお、スッキリかっこよく求まりますね、Scalaカコイイ。さて、総当り。非決定性計算。リストモナド。といったら、Linq、クエリ式。そんな風に反射的に連想が成りたつならもう立派なLinq使いですね!いやまあ、クエリ式で総当たり - NyaRuRuの日記で読んだことがあるから、というだけの話なのですがー。ほとんど直訳でいけるとあるように、Scalaで書かれたsend + more = moneyもまた、直訳で書けました。

var digits = Enumerable.Range(0, 10);

var solve = from s in digits
            from e in digits.Except(new[] { s })
            from n in digits.Except(new[] { s, e })
            from d in digits.Except(new[] { s, e, n })
            from m in digits.Except(new[] { s, e, n, d })
            from o in digits.Except(new[] { s, e, n, d, m })
            from r in digits.Except(new[] { s, e, n, d, m, o })
            from y in digits.Except(new[] { s, e, n, d, m, o, r })
            where s != 0
            where m != 0
            let send = int.Parse("" + s + e + n + d)
            let more = int.Parse("" + m + o + r + e)
            let money = int.Parse("" + m + o + n + e + y)
            where send + more == money
            select new { send, more, money };

foreach (var item in solve) Console.WriteLine(item);

ちゃんちゃん。こういうの見ると、各言語は同じとこに向かってる感がありますね。微妙に表現は違いますが、同じ発想が通じるし同じように書ける。その点はC#もScalaもF#も同じパラダイム、同じ未来を目指して向かってるのではないかと思います。Javaは微妙に落伍している気がしますが、もう少し何とかなって欲しいものです。

と、いうだけなのもアレなので、 int.Parse("" + s + e + n + d) の部分について。数字を文字列的な並びとして解釈したい、というわけですが、手段をまとめるとこんな感じになるかしらん。

// という数字があった時に123にしたい
var x = 1;
var y = 2;
var z = 3;

// ダルい
var xyz1 = int.Parse(x.ToString() + y.ToString() + z.ToString());
// 複数ある時はこちらで
var xyz2 = int.Parse(string.Concat(x, y, z));
// コンパイル後の結果はxyz2と一緒
var xyz3 = int.Parse("" + x + y + z);
// 文字列変換しない
var xyz4 = x * 100 + y * 10 + z;
// ↑のは桁数が多い時に泣くのでこうしてやる
var xyz5 = new[] { x, y, z }.Aggregate((a, b) => a * 10 + b);

文字列にして並べてintに変換するのがお手軽なわけですが、.ToString()を並べていくのはダルいわけですよね!というわけで、そんな時はstring.Concatを使うと全部まとめて結合出来ます。しかし int.Parse(string.Concat と並ぶのすらダルい、とかいう不届き者は先頭に""と足し合わせることでstring.Concatで結合したのと同じ結果を得られます。これは、演算子は左から適用されていきますが、文字列と数値を+で足すと文字列として足される、以下繰り返し。の結果。キモチワルイといえばキモチワルイので、積極的に使うかというと悩ましいところですが……。

そもそも数字を扱うのに文字列に変えてー、とかが邪道だという話も割とある。効率的な意味でも。なので、そういうときは2の桁は10倍、3の桁は100倍……。とかやっていると桁数が多いときはどうするのどうもしないの?という話なので、Aggregateを使うという手もあります。Aggregateは一般的な関数型言語でいうfoldlに相当。左からの畳み込み演算。ところで、では右からの畳み込みはどうすればいいの?つまりはfoldrはどうなのか、というと、これは.Reverse().Aggregate() のようで。右からなら逆にすればいいぢゃない。

ところで、C#のLinqで出来ることはJavaScriptのlinq.js - LINQ for JavaScriptでも出来ますよ?やってみます?

var digits = Enumerable.Range(0, 10);

var solve = digits
    .SelectMany(function(s){ return digits.Except([s])
    .SelectMany(function(e){ return digits.Except([s, e])
    .SelectMany(function(n){ return digits.Except([s, e, n])
    .SelectMany(function(d){ return digits.Except([s, e, n, d])
    .SelectMany(function(m){ return digits.Except([s, e, n, d, m])
    .SelectMany(function(o){ return digits.Except([s, e, n, d, m, o])
    .SelectMany(function(r){ return digits.Except([s, e, n, d, m, o, r])
    .Select(function (y) { return { s: s, e: e, n: n, d: d, m: m, o: o, r: r, y: y} })})})})})})})})
    .Where("$.s != 0")
    .Where("$.m != 0")
    .Select(function(x){ return { 
        send: parseInt("" + x.s + x.e + x.n + x.d),
        more: parseInt("" + x.m + x.o + x.r + x.e),
        money: parseInt("" + x.m + x.o + x.n + x.e + x.y)}})
    .Where(function (x) { return x.send + x.more === x.money });

solve.ForEach(function (x) { document.writeln(x.send + ":" + x.more + ":" + x.money) });

C#クエリ式からの変換のポイントは、from連鎖をSelectManyの連鎖で、但しカッコは閉じず変数のキャプチャを内包したままで最後にSelectで一旦整形してやるというところです。正確なクエリ式の再現とはなりませんが、この程度ならば、まあ何とか書けなくもないレベルとなります(正確なクエリ式の変形結果の再現をやると手では到底書けないものになる)。

ちなみに総当りなので結構時間がかかってIEだと泣きます。Chromeなら、まあそれなりな速度で求まるかなー。

Rx Christmas Release 2010によるPublishの変化解説

メリークルシミマス。Happyなことに、Reactive Extensions for .NET (Rx)が更新されました。なんと、WP7に標準搭載されたことだし安定性・互換性はある程度保証されてきたよねー、とか思った側から完全に互換を崩壊させる素晴らしいBreaking Changesをかましてきました!常識では計り知れない大胆な所業。そこにしびれるあこがれれぅ。

さて、今回の変化はJoin/GroupJoin/Windowの新規搭載とPublish系の変更です。Joinについてはまた後日ということで、今回はPublishの変更について解説します。で、ですねえ、これはもう100%変わってます。昨日ちょうどPublishによる分配、とか書いたわけですが、出したコードは全部動かないです!素晴らしいタイミング!Ouch!

端的に言えばPublishは引数にISubjectを受け取るよう変化。Prune/ReplayはPublishに統合されたことにより消滅。何でかというと、元々、分配ソースにSubjectを使うのがPublish、BehaviorSubjectを使うのが初期値付きPublish、AsyncSubjectを使うのがPrune、ReplaySubjectを使うのがReplayでした。分配ソースを任意で受け取るようになったため、メソッドが分かれる意味がなくなったというわけですね。

ナンタラSubjectの解説は、まあまた後日にでも。簡単に言えばSubjectは普通のイベントと同じで素直な挙動、つまりOnNextしたら値が流れる。AsyncSubjectは非同期を模していて、OnNextされたら値を一つだけキャッシュし、OnCompletedされたらキャッシュした値を流す。以降はSubscribeされるたびにキャッシュした値を即座に流し、OnCompletedも発行。ReplaySubjectはOnNextされる度に値をキャッシュし、それを流す。Subscribeされるとキャッシュした値を全て即座に流す。BehaviorSubjectはOnNextされる度に最新の値一つのみをキャッシュし、それを流す。Subscribeされるとキャッシュした値を即座に流す。

イマイチ分かりづらいですね:) というわけで本当に詳しいことは後日で。AsyncSubjectだけは、今までにも、そういう挙動であることの意味とかしつこく書いてきましたが。

そして、IConnectableObservableが戻り値となるものはMulticastというメソッド名になりました。引数は勿論、ISubjectを受け取るという形に。では、昨日のコードで例を。新旧比較ということで。

Observable.Range(0, 10)
    .Select(_ => int.Parse(Console.ReadLine()))
    .Publish(new Subject<int>(), xs => xs.Min().Zip(xs.Max(), (min, max) => new { min, max }))
    .Subscribe(Console.WriteLine);

Publishの第一引数にSubjectを突っ込みました。第二引数にはそれで分配されたIObservableが渡ってくるという塩梅です。

var input = Observable.Range(0, 10)
    .Select(_ => int.Parse(Console.ReadLine()))
    .Multicast(new Subject<int>());

input.Min().Zip(input.Max(), (min, max) => new { min, max })
    .Subscribe(Console.WriteLine);

input.Connect();

こちらがMulticastです。まあ、普通にISubjectを受け取るようになったというだけで、今までのPublishでIConnectableObservableが返ってくるのと同じです。さて、ISubjectを受け取るということは、外部のISubjectを渡してもOKです。新しくこんな分配が可能になりました。

var input = new ReplaySubject<int>();

Observable.Range(0, 10)
    .Select(_ => int.Parse(Console.ReadLine()))
    .Publish(input) // 流れてくる値をReplaySubjectに保存する
    .Max()
    .Subscribe(i => Console.WriteLine("Max:" + i));

var inputArray = input.ToEnumerable().ToArray(); // 入力値を配列に変換

といっても、この例は全く意味がなくて、Max()で止めておいて、その戻り値をinputとして受ければいいだけの話なのですが。上手い利用例が浮かばなかったのです!まあ、色々と応用しどころというのは生まれてくるのではないかと思います。

まとめ

この破壊的変更ですが、私としては好意的に捉えたいです。如何せんPruneというメソッド名は意味不明でしたし、オーバーロードが8つもあるという状況も良くなかった。今回整理されたことで、使いやすくなったと思います。が、しかし、WP7版との互換性が切れてしまったのは相当痛い。今までusingをプリプロセッサディレクティブで#if WINDOWS_PHONE using Microsoft.Phone.Reactive という感じに切り替えてWPF/SL/WP7で互換を取っていたのですが、ここまで派手に互換性なくなるとなあ。

ちなみにWP7標準搭載"ではない"DevLabs版のRx for WP7というのも用意されているので、そちらを使えばいいわけなのですが、それはそれでどうかなー、どうかなー、困った話で。

それと、この変更はまだ追随されていませんが、そのうち System.Interactive(Ix.NET/EnumerableEx) や RxJS にも派生してくるような気がするので、再びAPI安定していない状態に戻った感がありますねー。要チェックで要注意で。Rxチームは大変なクリスマスプレゼントを贈ってきました。そして、Rxが.NET 4 SP1に入るの?入らないの?的な希望観測もあったわけですが、私個人の印象としては、SP1入りな可能性はなくなったな、という気がしてます。まだまだ作り替える気満々だもの、これ。

Reactive ExtensionsとPublishによるシーケンスの分配

Twitter上で見たお題、10個コンソールから整数の入力を受けて最少と最大を出す。Linqで書くならこうでしょうか。通常は0-10でfor文を回すところはRangeで。あとはMinとMax。int.Parseなので数字以外は例外飛んでしまいますが、その辺は無視で。

var input = Enumerable.Range(0, 10)
    .Select(_ => int.Parse(Console.ReadLine()));

var max = input.Max();
var min = input.Min();

はい。これだと20回入力させることになってしまいます。Maxで列挙され、Minでも列挙され。ダメですね。というわけで、配列に一度保存しますか。

var input = Enumerable.Range(0, 10)
    .Select(_ => int.Parse(Console.ReadLine()))
    .ToArray();

var max = input.Max();
var min = input.Min();

よろしい。しかしこれの列挙回数は3回です。ToArrayで一度、Maxで一度、Minで一度。しかもMaxとMinを出したいだけなのに、わざわざ配列に保存してしまっています。もし配列が巨大になる場合はメモリ的な意味でもちょっと嫌ですねえ。さて、そこで出番なのが、プッシュ型シーケンスを持つReactive Extensions。プッシュ型ならば分配が可能です。やってみましょう?

Observable.Range(0, 10)
    .Select(_ => int.Parse(Console.ReadLine()))
    .Publish(xs => xs.Min().Zip(xs.Max(), (min, max) => new { min, max }))
    .Subscribe(Console.WriteLine);

ふむ、これはクールだ……。Rxの格好良さは異常。

Publishは今までにも何度も出してきましたが、シーケンスを分配するメソッドです。今までは引数なしにして、IConnectableObservableを変数に受けて、それに対してポコポコ足して、最後にConnect。としていましたが、Publishには引数有り版も用意されています。引数有りの場合は分配されたIObservableが変数として渡ってくるわけです。つまり上のコードは

var input = Observable.Range(0, 10)
    .Select(_ => int.Parse(Console.ReadLine()))
    .Publish();

input.Min().Zip(input.Max(), (min, max) => new { min, max })
    .Subscribe(Console.WriteLine);

input.Connect();

と等しいということになります。ちなみにPublish内でSubscribeしたって構わないわけなので

Observable.Range(0, 10)
    .Select(_ => int.Parse(Console.ReadLine()))
    .Publish(xs =>
    {
        xs.Min().Subscribe(x => Console.WriteLine("min:" + x));
        xs.Max().Subscribe(x => Console.WriteLine("max:" + x));
        xs.Sum().Subscribe(x => Console.WriteLine("sum:" + x));
        xs.Average().Subscribe(x => Console.WriteLine("avg:" + x));
        return xs;
    })
    .Subscribe();

といったような書き方も可能ですね。では、min,max,sum,avgと4つを返したい場合はどうしよう。Join-And-Thenを使います。

Observable.Range(0, 10)
    .Select(_ => int.Parse(Console.ReadLine()))
    .Publish(xs => Observable.Join(xs
        .Min().And(xs.Max()).And(xs.Sum()).And(xs.Average())
        .Then((min, max, sum, avg) => new { min, max, sum, avg })))
    .Subscribe(Console.WriteLine);

この例だと単純に直列に繋いでいるだけなので、Zipと同じようにAndで繋いでThen。というだけですね。ちょいとゴチャッとしていますが。Joinの強力なところはAnd-Thenで作るPlan(Thenの戻り値)を複数受けられるという点にあります。これにより複雑な結合を生成する事ができます。ZipとMergeが混ざったような、でももう少し複雑な何か。といった雰囲気。

Publish

Publishはオーバーロードが8つもあって最初戸惑ってしまいますが、分類としては二つに分けられます(そしてそれぞれFuncを受け取る/スケジューラを受け取るの組み合わせで4つに分かれて2x4=8のオーバーロード)。一つは引数として初期値を受け取らないもの。もう一つは受け取るもの。今までは全部、受け取らないものを見てきました。内部的にはSubjectを使って分配しています。一方、初期値を受け取るものは内部的にBehaviorSubjectを使って分配します。というわけで、どんな挙動を取るのかよくわからない、という場合はBehaviorSubjectを試してみると良いでしょう!(何だこの解説する気ゼロな投げっぱなしな説明は)

ところでな余談

注意しないといけないのは、ふつーのシーケンスがあって、それをRxで分配やるとはっきし言って遅いです。列挙数が多くなると、ただ配列舐めるのに比べて数万倍のパフォーマンス差が出たりします。ほとんどのシチュエーションでRxで分配するぐらいなら二度三度舐めたほうが速いです。まあ、分配に限らずRxが普通に遅いということなのですがー。ご利用は計画的に。今回の例のConsole.ReadLineで、というのは、当然Rxのほうが速いですが。入力とリアルタイムに応答していますからね。Reactive!ようするところ、Rxが活きるのってそういうところ、かしらん。

linq.js & Reactive Extensions for JavaScript(RxJS)入門

このエントリはJavaScript Advent Calendar 2010 : ATNDの20日目として書きます。一つ前はsecondlifeさんのコマンドラインから JavaScript のシンタックスチェックを行う方法 - って、なんでですか〜 - subtechでした。

普段はC#でもしゃもしゃしている、@neuecc(twitter)といいます。そんなわけで今回はC#畑からのJavaScriptライブラリを二つほど紹介します。

ここではC#の中でも、LINQ: .NET 統合言語クエリという機能から来ているlinq.jsとRxJSを紹介します。linq.jsはコレクション操作を、RxJSはイベント操作と非同期操作に関するライブラリとなっています。どちらもwindowオブジェクトには一切触らない、DOMやXmlHttpRequestとは全く無関係の中立で地味な、phpspotで紹介されそうもない、それだけで何が出来るかというと別に何もできないけれど、あると便利。また、Enumerable(linq.js)とObservable(RxJS)という枠組みが双対であるという、面白い影響の与え方をしているため、セットで考えるとより深イイ話になるから、ちょっと長くなりますが二つ一緒に紹介します。

linq.js

linq.jsはLINQのうち、Linq to ObjectsというものをJavaScriptに移植したものとなります。ダウンロードは下記URLから。ライセンスはMs-PL、というと聞きなれないかもしれませんが、MITライセンスに近い、かなり緩めのものとなっています。

linq.js - LINQ for JavaScript

公式で移植されたものではなく、野良で勝手にやっているだけの話であり、作者は、私です。ん……。あ……。……。さて、LINQとは何ぞや、統合言語クエリがうんちゃらかんちゃら、SQLがどうのこうの、というのはJS的には全く関係ないのでスルーで!Linq to Objectsに絞って何ぞやというと、「関数型っぽいコレクション処理ライブラリ」です。ひとまず簡単な例から。

// こんなテキトーな配列があったとして
var array = [101, 20, 2, 42, 33, 47, 52];

// 偶数だけ二倍して出力 : 40, 4, 84, 104
Enumerable.From(array)
   .Where(function(x){ return x % 2 == 0 })
   .Select(function(x){ return x * 2 })
   .ForEach(function(x){ document.writeln(x) });

よくある関数を渡してコレクション処理するスタイルです。グローバルに配置される名前空間はEnumerable。そこから、既存の配列に対して適用する場合はFromで包んで(jQueryの$のようなものと考えればOK、数字列の場合はRangeなどもあります)、以降はLinqの専用メソッドをチェーンで繋いでいく、というスタイルを取ります。ちなみにEnumerableはイニュミラボーと読むようです。最後のボーは、私は日本人なのでブルって言いますが。イニュミラブル。

ところでfilterしてmapしてforeach。そうですFirefoxにあるArrayへのmap/filterです。Whereがfilter、Selectがmap。名前はSQL風味ですが、つまるところ単純な話そういうことです。フィルタぐらいならjQueryにもmap/grep/eachがあるね。他に目を向ければ、コレクション系専用ライブラリではUnderscore.jsが有名のようです。

それらとの違いですが、一つはfunction(){}という、冗長にすぎる関数記述を省く文字列による簡易記法の搭載。もう一つは完全な遅延評価の実現。そのため、適用する関数を自由に追加していくことができます。簡単な例をもう少し。

var array = [101, 20, 2, 42, 33, 47, 52];

// この時点ではまだ列挙されていない。
// $は引数を示すプレースホルダーで、この場合 function(x){ return x%2==0 } と同じ
var query = Enumerable.From(array).Where("$%2==0");

// 更に二乗したうえで配列へ変換するならToArray
var array2 = query.Select("$*$").ToArray(); // [400, 4, 1764, 2704]
// 昇順に並び替えた上でセパレータをつけて文字列化
var orderedStr = query.OrderBy().ToString(":"); // "2:20:42:52"
// 先頭二つのみを列挙
query.Take(2).ForEach("alert($)"); // 20, 2

と、いったように様々なメソッドを繋げてコレクションを変形し、最後に任意の形に変換するというのが基本的な利用法になるわけです。わからん。というわけで図にするとこんな感じです。

構図的にはjQueryと同じで、Enumerable.From/Range/etc.. によってEnumerableオブジェクトが生成され、その中でのメソッド(Select/Where/etc..)はEnumerableオブジェクトを返す。そのため幾らでもメソッドチェーンを繋げられる。jQueryと違うのは、繋げた関数が実行されるのは、戻り値がEnumerable以外になるときまで遅延される。Enumerable以外を返すメソッドとは、ToArray(配列に変換)、ToString(文字列に変換)、ToObject(オブジェクトに変換)、Contains(値が含まれていればtrue、含まれていなければfalse)、Max(最大値を返す)、ForEach(列挙する)、etc...(まだまだ大量にあります)。

遅延評価であることによるメリットとして、無限リストを扱えることが挙げられます。というわけで、無限リストを活用する例を一つ。「nを1から初めてその2乗を足していき、和が2000を初めて超えたとき和はいくつになるかという問題」をScalaで解いてみたから、nを1から(以下略)をlinq.jsで解いてみます。

var result = Enumerable.ToInfinity(1) // 1から無限大まで数値をジェネレート [1, 2, 3, 4,...]
    .Select("$*$") // 二乗 [1, 4, 9, 16,...]
    .Scan("$+$$") // 和 [1,5,14,30,...]
    .First("$>2000"); // 2000を超えた最初の要素

元がC#/Linq版のコードであるから当然ではありますが、リンク先のScala版のコードと完全に一致、ですね。JavaScriptは関数型言語。無限リストの生成には、今回はToInifinityを使いましたが、関数型言語に馴染みのある人ならばUnfoldなどもありますので、望む物が生成出来るはずです。

"$+$$"でサクッと和を出せるのは中々強力で便利。やりすぎるとイミフになるので、こういうササッとした部分にだけ限定すれば。任意の識別子を使いたい場合は"sum, x => sum + x"というように=>の左側は引数、右側は式、という形でも書けます。なお、実装は new Function("$,$$,$$$,$$$$", "return " + expression) というだけです。渡す文字列が式でないとダメな非常に単純な理由。

その他、メソッド類の一覧と、その場で実行/確認可能なLINQ Padはlinq.js Referenceで。

といったように、リアルタイムに実行結果を確認しながら試せます。無限リストを停止条件つけないで書いてしまっても、列挙は1000件までにリミッターかかるので一安心。

メソッド群ですが、LINQにある標準クエリ演算子(と、言うと大仰ですが、ようするにただのメソッドです)を全て実装、その上でHaskellやRubyなどのコレクション用メソッドを眺めて、便利そうなら移植してあります。そのため、コレクションライブラリとしてはとしてこのメソッドが不足してる!と不満を感じることはないはず。また、遅延評価を活かしてメソッドを組み合わせることにより、大抵の操作が可能になっています。

とはいえ、そのせいで肥大しすぎな感がなきにしもあらず。とりあえず、フィルタ(Where)とマップ(Select)、非破壊的で複数キー連結が可能なソート(OrderBy/ThenBy)、重複した値を取り除く(Distinct)といった辺りだけ押さえておけば良いかなー、と。

そういえばでドットが前置なのは何で?というと、その方が入力補完に便利だからです。

この辺の話はJavaScriptエディタとしてのVisual Studioの使い方入門で。IDEを使ってJSを書くなら、ドットは前置一択になります。補完のない普通のエディタで書くのならスタイルはお好みで。私としては、入力補完・コードフォーマッタ・デバッガとのシームレスな融合などなどから、JavaScriptであってもIDEを使うのが良いと思ってます。

Reactive Extensions for JavaScript

続けてReactive Extensions for JavaScript(RxJS)について。Linqという名前は付いていませんがLinqの一味と見なせます。いわばLinq to Events, Linq to Asynchronous。ということで、対象となるのはイベントと非同期。ダウンロードは下記URLの右側、Rx for JavaScriptから。

Reactive Extensions for .NET (Rx)

こちらはlinq.jsと違って公式が提供しています。

さて、ではこれは何が出来るのでしょう。出来るのは(Functional) Reactive Programmingです。とは何ぞや。というと、既に親切な解説があるので やさしいFunctional reactive programming(概要編) - maoeのブログ そちらを見るといいと思うな!

とりあえず簡単な例をまず先に。

// マウスの動きの座標ストリーム(無限リスト)
var mousemove = $("#js_advcal_field").toObservable("mousemove")
    .Select(function (e) { return { X: e.pageX, Y: e.pageY} });

// 位置をTextに書き出し
mousemove.Subscribe(function (p) { $("#js_advcal_status").text("X=" + p.X + ":Y=" + p.Y) });

// 1.5秒遅れて四角形を座標位置に出す
mousemove.Delay(1500)
    .Subscribe(function (p) { $("#js_advcal_rect").css({ left: p.X, top: p.Y }) });
X=0:Y=0

コード似てませんか?今まで出してきたLinq to Objectsのコードに。RxJSは、イベントをコレクションとして扱うことでフィルタ(Where)やマップ(Select)を可能にします。図にするとこうです。

toObservableすることにより、jQueryでアタッチされるMouseMoveイベントは、時間軸上に無限に発生し続ける「無限リスト:Observable Collections」として変換されます。このコレクションの上では時間軸を自由に扱えるため(そもそもMouseMove自体が、いつ次の値が発生するか不確定な時間の上にのっている)、ごくごく自然に、1.5秒後(Delay)という指示を与えるだけで到達時間を遅らせることが出来ます。

RxJSもlinq.jsと同じく、基本的にメソッドの戻り値はObservableオブジェクトとなっていてひたすらメソッドチェーンしていきます。違うのは、linq.jsはToArrayやToString、ForEachなどなど、Enumerable外に出るメソッドが複数ありますが、Observableの場合はSubscribeのみです。SubscribeメソッドがForEachのような役割を担っています。何でToArray出来ないの!というと理由は簡単で、扱う対象が時間軸上に流れる無限リストだからです。無限を有限のArrayに変換は出来ませんよねー。本質的にObservable Collectionsは非同期な状態なので、何か戻り値を返すということは出来ません。出来るのは、向こうからやってくる値に対して実行する処理を示すことだけです。

RxJSは、扱いにくい時間や非同期、複数のイベントが同時に絡む時のイベントの合成を、慣れ親しんだシンプルなコレクション処理のように見た目上落としこんで、foreachするように処理を適用することが出来ます。それが特徴となります。

jQuery

前のコードに出ていた$("#hoge")はjQueryです。脈絡なく出してきているわけですね!解説が前後してれぅー。どういうことなのかというと、RxJSは基本的にwindowとは中立です、が、メインで扱う物はDOM(イベント)だったりXmlHttpRequest(非同期)で、これらは抽出の必要があったりクロスブラウザの必要があったりと、一手間二手間な問題を抱えている。それを解決してくれるのがjQueryだったり他のライブラリだったりするわけですね。そこで取られた手段が、jQueryに乗っかること。ようするにプラグイン。RxJSとjQueryを繋ぐ部分を注入。そうして注入されたのがtoObservableというわけです。

linq.jsもjQueryもRxJSも、基本的にメソッドチェーンで自分の世界に閉じっぱなしです。jQueryはモナドだという記事がありましたが、linq.jsもモナドです。RxJSもモナドです。いや、本当に。ただそのへんの理屈は割とどうでもいいわけですが、ただ、交互に変換出来ると便利よねー、なところはあるわけで、三者はプラグインを介して上記の図のような形で遷移可能になっています。

Asynchronous

最後に非同期を。非同期もObservable Collectionsになる、というわけで例から行きましょう。

$("#js_advcal_twbutton").toObservable("click")
    .Do(function () { $("#js_advcal_twitter").empty() })
    .SelectMany(function () { return $.ajaxAsObservable({ url: "http://twitter.com/statuses/public_timeline.json", dataType: "jsonp" }) })
    .SelectMany(function (json) { return Rx.Observable.FromArray(json.data) })
    .Where(function (status) { return status.user.lang == "ja" })
    .Select(function (status) { return $("<p>").text(status.user.screen_name + ":" + status.text) })
    .Subscribe(function (q)
    {
        $("#js_advcal_twitter").append(q);
    });
statuses...

胡散臭い(笑)public_timelineボタンをクリックすると、Twitterのpublic_timelineから日本人のツイート(user.lang=="ja")のみを表示します。これは、ちょっとメソッドチェーンが多めで微妙にワケワカラン。そんな困ったときはとりあえず図。

起点はボタンのクリックです。これもまたMouseMoveの時と同じで考え方としては無限リスト状。一回目のクリック、二回目のクリック、と無限に続いていきます。このコレクションに対する処理として、次に流れてくるのはDo。これは副作用で、コレクションの値以外の、流れてきたということをトリガーにして外部に影響を与えたい時に使います。今回はクリックをトリガーとして、一旦DIVの中の要素をクリア(empty())しています。

そしてSelectMany。SelectManyはSelectと似ていますが、1:多(だからMany)へと分配するのが特徴です。ここでコレクションの流れは非同期へとバトンタッチされます。非同期のリクエスト、今回はjQueryにおんぶに抱っこでajaxAsObservableにより、twitterのpublic_timeline.jsonからデータを取得。特徴的なのは、非同期なので戻り値が得られるまでに若干のタイムラグがあるわけですが、それは以前に扱ったDelayと同じように、時間軸が少し右に移るだけで、流れ自体はそのままで扱えてしまいます。

1:多ですが、非同期リクエストの戻り値は1なので、見た目上は一個の値が変形しているだけのように見えて、再び次のSelectMany。ここでの値はJSONで、20個分のpublic_timelineのstatusが届いています。それを1:他という形でバラす。RxJS上では「イベント」「非同期」を載せてきましたが、「配列」も問題なく載っかるということです。

ここまで来たら、あとは普通のコレクション処理と同じようにWhereでフィルタリングし(言語が"ja"のみを通す)、Selectで変形し(jQueryで新しいDOMノードを作成)、Subscribeで実行処理を書く(divの中にappend)。

というわけで、「イベント」「非同期」「配列」を一本のコレクションへと合成し、統合しました。一見すると、ただそれだけのことに何でわざわざ複雑めいたことを、と思えてしまいますが、複数の非同期を合成したくなったら?待ち合せたくなったら?などなど、シチュエーションが複雑になればなるほどに、威力を発揮します。

つまりそれJSDeferredで...

です。領域はかなり被ると思います。waitはDelay、nextはSelectまたはSelectManyが相当するでしょう。

// Hello -> 5秒後 -> HelloWorld のalertが出るというもの(UIはフリーズしません)
Rx.Observable.Return("Hello") // パイプラインに"Hello"を流し始める
   .Do(function (x) { alert(x) }) // alertを出す
   .Delay(5000) // 5秒遅延
   .Select(function (x) { return x + "World" }) // 値にWorldを結合
   .Subscribe(function (x) { alert(x) }); // パイプラインの実行開始+alertを出す

例はJSDeferred 紹介より、少し違いますが。また、関数のDeferred化のdeferred.call/failはAsyncSubjectのOnNext/OnErrorが相当しそうです。詳しい話はまたそのうち、もしくはC#で良ければReactive Extensionsの非同期周りの解説と自前実装などを。

まとめ

なげー。スミマセンスミマセン。Enumerable -> コレクション処理 -> 無限リスト -> Observable -> イベントが無限リスト -> 時間軸が不定 -> 非同期 -> コレクション処理。という流れのつもりでした!分量的にEnumerableかObservableか、どっちかに絞るべきでしたね……。もっとあっさり終えるはずだったのにどうしてこうなった。

prototype.jsはRubyっぽい色がある。Firefoxで拡張が続いてるJavaScriptはPythonからの影響が濃ゆい感じ。linq.js/RxJSは勿論C#からで、更にLinqの元はSQLは勿論なのですが、Haskellからの影響も濃く(Linqや、そして現在はRxの開発チームを率いているErik MeijerはHaskellの人で、The Haskell 98 Language Reportにも名前を連ねている)、そうこうして他言語同士が相互に影響を与えてより良くなる。というのはイイ話だなー、って思っていまして。

そして、他言語の文化を受け入れられる懐の広さと、それでもなお自分の色を持ち続けられるJavaScriptってイイ言語だよね、と思います。

と、駄エントリを〆て次のAdvent Calendarにタッチ!

Titanium Mobile + Visual Studio用のAPI入力補完vsdoc自動生成T4 Temlate

Titanium Mobileをご存知ですか?私はつい最近知ったばかりです。かなりHotみたいで紹介をよく見るし、はてブでも色々な記事がブクマ数百行ってたりしますが、さっぱり意識していなかったせいで右から左に流れていくだけで名前を覚えていませんでした。が、 【就職先決定】Titanium MobileのAppceleratorに勤めることになりました - @masuidrive blog を見て、ようやくTitanium Mobileの名前と出来ることが一致しました。つまりはJavaScriptでNativeなAndroid/iPhoneアプリケーションが作れて、とってもHotなテクノロジだそうですはい。

ちょっと触った感じコンパイルというかコード変換というか、な部分だけが提供されてるような雰囲気でIDEとかそーいうのは今のところないようで?かしらん。デバッガとかないのかな、printオンリーかしら……。ともあれ、Javascriptということで何で開発してもいいわけですね。エディタですか?ありえない!入力補完のない環境で開発なんて出来ません。デバッガがなくても生きていけますが入力補完はないと生きていけません。

というわけでVisual Studioです。以前にJavaScriptエディタとしてのVisual Studioの使い方入門という記事を書いたように、Visual StudioはJavaScriptエディタとしても最高なのです。さて、では入力補完だ。というわけで捜すとあった。Appcelerator Titanium vsdoc for Visual Studio。へー、確かに動いてるね、簡単だね。以上。

と、まあ興味持った当時はそれで一旦終えたのですが、昨日に【追記あり】「Titanium Mobile1.4」の入力支援をMicrosoft Visual Web Developer 2010 Expressで行う。 - 葛城の日記という記事を見て、そもそも元はjsonとして公式にデータが提供されていてそれを加工したものであること、またその加工っぷりはちょっといま一つであることを知りました。

さて、前置きが長くなりましたが、つまりイマイチなら自分で作ればいいんですね。幸いにもデータは公式で用意してくれているので。

T4でjsonからvsdocを生成する

いつもならドヤ顔でコード張るんですが、ちょっと長いのとC#の話なのでスルー。というわけでコードはBitbucketに置いておきますのでご自由にどうぞ。TitaniumMobileApi-vsdoc.tt。使い方は後ほど説明します。そんなの面倒臭いー、という人は生成済みのほうをどうぞ。TitaniumMobileApi-vsdoc.js。2010/12/10のver1.5.0で生成していますが、どうせVS使うならそんな手間でもないので、.ttで自分で生成するほうをお薦めします。

.ttの使い方

.tt、の前にVisual StudioでのJavaScriptの開発の仕方。ですがJavaScriptエディタとしてのVisual Studioの使い方入門で解説しているのでその通りというわけで省略。

TitaniumMobileApi-vsdoc.ttと、公式サイトにあるAPIリファレンスのJSON(api.json)を同じフォルダに入れて、TitaniumMobileApi-vsdoc.ttを開き保存(Ctrl+S)を押します。するとTitaniumMobileApi-vsdoc.jsが生成されます。オシマイ。

こんな感じに階層が出来て、8000行ほどのjsが生成されているはずです!

さて、では使ってみましょう。適当な名前のjsを作り、reference pathに先ほど生成されたjsを記述。あとはTitanium名前空間から始まるので、ポンポンと書くだけ。

解説付きで引数などなどもバッチシ!

煩わしい大量の定数も、入力補完で一覧が出るので楽ちん!

海外の、Jeremy Meltonさんの作ったvsdocとの最大の違いは、関数での生成後の戻り値のObjectもある程度補完し続けてくれることです。例えばcreateHogeとか。

これ、もとのapi.jsonでは戻り値がobjectとして記載されているため、普通に自動生成するだけだと追随出来ないのです。私はただの機械的な変換ではなく、実用に根ざしたアドホックな対応を施したため、多少は追随するようになっています。

ただし、一点だけ残念なことがあって、プロパティ名にハイフンがつくものは補完に表示されません。例えばMapViewはfont-familyといったプロパティを持つのですが補完に出てきません。これはVisual Studioの仕様というか不具合というか残念な点というか。直るといいです。要望としての報告が出てなければ報告しよう……。と思って要望を出してしまったのですが、そもそもJavaScript的にハイフンのつくのが動くほうがオカシイ!普通は動かん!ということに要望出して30秒後に気づきました。頭がホカホカだったので全然考えが回ってなかったのでする。こういうのって出した直後に頭が冷えて気づきますよね。というわけで、これはむしろTitanium MobileのAPIがオカシイのでは感。そして私は悲しいほどに赤っ恥を書いてしまった、いや、今更一つや二つ、しょうもない恥が増えてもいいんですけど、いいんですけど、恥ずかしすぎるぅ。泣きたい。

余談(T4の話)

配布用なので1ファイルにするためJsonReaderWriterFactoryを生で使って書いてる(ためちょっと冗長)のですが、そうでなければDynamicJson使ってサクッと済ませます。DynamicJsonは楽ですね、ほんと。

あと、T4で困ったのが末尾カンマ。最後の要素だけカンマ不要なわけなんですよねー、これがどう生成したものか困ってしまって。ただの文字列ならstring.Join(",",xs)とかで対応するわけですが、T4だとインデントとかの絡みもあるので、そういうのは使わない方が楽だし綺麗に仕上がる、けれど上手く出来ない!とりあえず今回はこんな対応をしてみました。

// こんなコードを吐く、つまり最後の要素の末尾には","は不要
{
    "anchorPoint": null,
    "animate": true,
    "animatedCenterPoint": null
}    

// こんな拡張メソッド(第二引数のboolは末尾でないか末尾か)
public static void Each<T>(this T[] source, Action<T, bool> action)
{
    for (int i = 0; i < source.Length; i++)
    {
        var hasNext = (i < source.Length - 1);
        action(source[i], hasNext);
    }
}

// T4のView側はこんな感じで、最後に<#= hasNext ? "," : "" #>をつけて対応

<#o.Properties.Each((x, hasNext) =>{#>
    "<#=x.Name #>": <#=x.Value #><#= hasNext ? "," : "" #>
<#});#>

微妙に苦しい感じでしたねえ。こういうのは ASP.NET/ASP.NET MVCではどう対応しているものなのかしらん。全然知らないので誰か教えてくれれば嬉しいです。

まとめ

私はこの手のダミーのvsdoc生成は色々やっていて、linq.jsのvsdocは手作業、RxJSのvsdoc作りはmono.cecilを使ってdllとXMLから結合するConsoleApplicationを作成、などなどで割と手慣れてきました。テンプレートが統合されている点と(printlnってねえ...)更新の楽さ(新しいjsonが出たらそれを放りこんで再セーブするだけ、ある意味T4はC#/VSにおけるLL的スクリプト言語と見ても良いのではないでしょーか)を考えると、T4で作るのがベストな選択に思えます。スタンドアロンで動かしたい場合は"前処理されたテキストテンプレート"も使えますしね。

さて、こうしてわざわざ手間かけて入力補完作る理由ですが、だって補完ないとやる気出ませんから!IDEで楽が出来ないなら楽が出来るよう苦労すればいいぢゃない。そしてIDEに依存するのもまた美徳です。プログラマの三大美徳。楽で何が悪いって?何も悪くないよ!あと、誰か一人が苦労すれば、お裾分け出来るわけなので、先立って作ってシェアしたいところです。

というわけで、Visual StudioでiPhone開発もAndroid開発も幸せになりましょー。個人的にはTitaniumは良いと思いますが、C#で開発できるmonotouchやmonodroidに興味津々ではありますが。んで、何で手を出し始めているかというと、linq.jsを使うシーンを求めてるんですぅー。ふふふ。というわけで、Titanium Mobileな方はlinq.jsもよろしぅお願いします。

Reactive ExtensionsとAsync CTPでの非同期のキャンセル・プログレス処理

暫くはAsync CTPを特集していく!と思っていたのですが、何だか随分と間があいてしまいました。じっくり非同期操作に必要なオペレーションは何か、と考えるに「バックグラウンドでの実行」「進捗のUI表示」「結果のUI表示」「キャンセル処理」「エラー時処理」が挙げられる気がします。というわけで、こないだまではRxで進捗表示とかエラー時処理とか見ていたわけです、決してAsync CTPをスルーしていたわけではありません!ホントダヨ?記事の分量的にどうしてもRxだけで埋まってしまったのです。

さて、ところでこれらってつまり、BackgroundWorkerですよねー。ただたんに裏で実行するだけならThreadPool.QueueUserWorkItemでいいし、結果のUIへの伝達ぐらいならDispatcher.BeginInvoke書けば…… ですが、進捗やキャンセルなどを加えていくとドロドロドロドロしてしまいます。それらが統合された上でポトペタプロパティ設定で使えるBackgroundWorkerは偉大なわけです。

では、BackgroundWorkerを使った場合とReactive Extensionsを使った場合、そしてAsync CTPのasync/await、つまりはTaskを使った場合とで比較していきます。

あ、そうそう、Async CTPの本格的な解説はmatarilloさんの訳されているEric Lippertの継続渡しスタイル(CPS)と非同期構文(async/await)やufcppさんの非同期処理 (C# によるプログラミング入門)でがっちりと解説されています。私はがっちりした記事は書けないのでひたすらゆるふわに機能を雑多につまみ食いで。あと、Reactive Extensionsとしつこく比較するのも忘れません。

BackgroundWorkerの場合

BackgroundWorkerは、DoWorkはバックグラウンドで、ProgressChangedとRunWorkerCompletedはUIスレッド上で動きます。これにより、Dispatcherだとか、そういうことを意識せずに使えます。勿論、DoWork内でDispatcher.BeginInvokeすることも可能ですが、そういう場合はBackgroundWorkerの意味があまりなくなってしまうので、設計には素直に従っておいたほうが良いです。というわけで例など。

static string HeavyHeavyHeavyMethod(string s)
{
    Thread.Sleep(5000); // 重たい処理をするとする
    return s + s;
}

static void Main()
{
    var bw = new BackgroundWorker
    {
        WorkerReportsProgress = true,
        WorkerSupportsCancellation = true
    };

    bw.ProgressChanged += (sender, e) =>
    {
        var percentage = e.ProgressPercentage;
        var state = e.UserState;
        Console.WriteLine(percentage + "%" + ":" + state);
    };

    bw.DoWork += (sender, e) =>
    {
        var worker = sender as BackgroundWorker; // くろーぢゃな場合はbwが直接取れるので不要ですが
        var result = (string)e.Argument;
        if (result == null) throw new ArgumentNullException("引数よこせゴルァ");
        worker.ReportProgress(1, result); // 進捗報告

        // 重たい処理が幾つかあって最終的な結果を出す
        // キャンセルは随時出来るようにする
        result = HeavyHeavyHeavyMethod(result);
        if (worker.CancellationPending) { e.Cancel = true; return; }
        worker.ReportProgress(33, result); // 進捗報告

        result = HeavyHeavyHeavyMethod(result);
        if (worker.CancellationPending) { e.Cancel = true; return; }
        worker.ReportProgress(66, result); // 進捗報告

        result = HeavyHeavyHeavyMethod(result);
        if (worker.CancellationPending) { e.Cancel = true; return; }
        worker.ReportProgress(100, result); // 進捗報告

        e.Result = result; // 結果セットして正常完了
    };

    bw.RunWorkerCompleted += (sender, e) =>
    {
        if (e.Cancelled) // // キャンセルした場合
        {
            Console.WriteLine("キャンセルされたー");
        }
        else if (e.Error != null) // 例外発生の場合
        {
            Console.WriteLine("例外出たー");
            Console.WriteLine(e.Error);
        }
        else // 正常終了の場合
        {
            var result = e.Result;
            Console.WriteLine("終わった、結果:" + result);
        }
    };

    // 以下実行例

    bw.RunWorkerAsync("hoge"); // 非同期実行開始と初期引数

    Thread.Sleep(6000);
    bw.CancelAsync(); // 6秒後にキャンセルするなど

    while (bw.IsBusy) Thread.Sleep(1000);
    bw.RunWorkerAsync(null); // 今度は引数なしで実行するなど

    while (bw.IsBusy) Thread.Sleep(1000);
    bw.RunWorkerAsync("hoge"); // 最後まで実行

    Console.ReadLine();
}

実行結果などは他愛もないものなのでスルーで。さて、コードは見たとおりに、些か冗長なところはありますが一般的に考えられる処理は全て行えます。受け渡しがObjectなのダセーとか、EventArgsに値をセットして受け渡しダセーとか、キャンセルするのにCancellationPendingのチェックだりー、などなど思うところは色々あります。BackgroundWorkerのメリットはポトペタにあったと思われるので、時代背景的に、もうそぐわないかなあという気がかなりしています。

Reactive Extensionsの場合

Reactive Extensionsは、この手の非同期処理はお手の物。というわけでBackgroundWorkerで行った機能をまんま代替してみます。実行スレッドの切り替えはObserveOnで。

static string HeavyHeavyHeavyMethod(string s)
{
    Thread.Sleep(5000); // 重たい処理をするとする
    return s + s;
}

// WPFで適当なリストボックス(経過表示用)と適当なキャンセルボタンがあるとする
public MainWindow()
{
    InitializeComponent();

    Action<int, string> reportProgress = (i, s) => listBox1.Items.Add(i + "%:" + s);

    var disposable = Observable.Return("hoge", Scheduler.ThreadPool)
        .ObserveOnDispatcher().Do(s => reportProgress(1, s))
        .ObserveOn(Scheduler.ThreadPool).Select(HeavyHeavyHeavyMethod)
        .ObserveOnDispatcher().Do(s => reportProgress(33, s))
        .ObserveOn(Scheduler.ThreadPool).Select(HeavyHeavyHeavyMethod)
        .ObserveOnDispatcher().Do(s => reportProgress(66, s))
        .ObserveOn(Scheduler.ThreadPool).Select(HeavyHeavyHeavyMethod)
        .ObserveOnDispatcher().Do(s => reportProgress(100, s))
        .Subscribe(
            s => listBox1.Items.Add("終わった、結果:" + s),
            e => { listBox1.Items.Add("例外出たー"); listBox1.Items.Add(e); },
            () => { });

    // キャンセルボタンクリックでキャンセル
    CancelButton.Click += (sender, e) =>
    {
        listBox1.Items.Add("キャンセルしたー");
        disposable.Dispose();
    };
}

んん、あれれ?進捗表示する時はDispatcherに切り替え、重い処理をする時はThreadPoolに流すよう切り替える。理屈は簡単。書くのもそのまま。しかし、しかし、これは、どう見ても非効率的。おまけにコードの見た目もUgly。ダメだこりゃ。そんな時は拡張メソッド。例えばこんなものを用意しよう。

public static class ObservableExtensions
{
    /// <summary>Report on Dispatcher</summary>
    public static IObservable<T> Report<T>(this IObservable<T> source, Action<T> action)
    {
        return source.Report(action, Scheduler.Dispatcher);
    }

    /// <summary>Report on Scheduler</summary>
    public static IObservable<T> Report<T>(this IObservable<T> source, Action<T> action, IScheduler scheduler)
    {
        return source.Do(x => scheduler.Schedule(() => action(x)));
    }
}

Doの変形バージョンで、actionをDispatcher.BeginInvoke(デフォルトでは。オーバーロードのISchedulerを渡すものを使えば、任意のスケジューラに変更出来ます)で行う、というものです。これなら進捗表示などにピッタリ合うはず。というわけで、適用してみます。

var disposable = Observable.Return("hoge", Scheduler.ThreadPool)
    .Report(s => reportProgress(1, s))
    .Select(HeavyHeavyHeavyMethod)
    .Report(s => reportProgress(33, s))
    .Select(HeavyHeavyHeavyMethod)
    .Report(s => reportProgress(66, s))
    .Select(HeavyHeavyHeavyMethod)
    .Report(s => reportProgress(100, s))
    .ObserveOnDispatcher()
    .Subscribe(
        s => listBox1.Items.Add("終わった、結果:" + s),
        e => { listBox1.Items.Add("例外出たー"); listBox1.Items.Add(e); },
        () => { });

無難に仕上がりました。BackgroundWorkerと比べると、随分とすっきりします。受け渡しがオブジェクトではなく、しっかり型がついたままチェーンされること、例外処理もOnErrorの流れに沿ってすっきり記述できること、そして、何よりもキャンセル処理が楽!Disposeを呼ぶだけで、CancellationPendingのようなものをチェックする必要なくサクッとキャンセルすることが可能です。これは、処理単位が小さなメソッド毎に分割される、この場合は進捗報告を抜くとSelectの連打という形になりますが、その連打がちゃんと意味を持つわけです。

余談ですが、INotifyPropertyChanged経由のデータバインディングは自動でDispatcher経由にしてくれるようなので、その辺楽。UIパーツなんて直接触るもんじゃない、MVVM! でもObservableCollectionだとダメだったりするんですね、色々んもー。

Task(async/await)の場合

TaskにおけるキャンセルもBackgroundWorkerと同じく、キャンセル用オブジェクトの状態を確認して自分で挙動を挟む必要があります。ThrowIfCancellationRequested() を呼べばキャンセルされていた時は例外を送出して強制終了。

string HeavyHeavyHeavyMethod(string s)
{
    Thread.Sleep(5000); // 重たい処理をするとする
    return s + s;
}

// 進捗表示用入れ物クラス
class ProgressResult
{
    public int Percentage { get; set; }
    public string Value { get; set; }
}

async void DoAsync(string start, CancellationToken token, IProgress<ProgressResult> progress)
{
    // 進捗報告はIProgress<T>のReportを呼ぶ
    progress.Report(new ProgressResult { Percentage = 1, Value = start });

    try
    {
        var s = await TaskEx.Run(() => HeavyHeavyHeavyMethod(start));
        token.ThrowIfCancellationRequested(); // キャンセルされた場合は例外送出
        progress.Report(new ProgressResult { Percentage = 33, Value = s });

        s = await TaskEx.Run(() => HeavyHeavyHeavyMethod(s));
        token.ThrowIfCancellationRequested(); // キャンセルされた場合は例外送出
        progress.Report(new ProgressResult { Percentage = 66, Value = s });

        s = await TaskEx.Run(() => HeavyHeavyHeavyMethod(s));
        token.ThrowIfCancellationRequested(); // キャンセルされた場合は例外送出

        listBox1.Items.Add("終わった、結果:" + s);
    }
    catch (OperationCanceledException)
    {
        listBox1.Items.Add("キャンセルされたー");
    }
}

public MainWindow()
{
    InitializeComponent();

    // プログレスが変化したときの挙動の登録
    var progress = new EventProgress<ProgressResult>();
    progress.ProgressChanged += (sender, e) =>
        listBox1.Items.Add(e.Value.Percentage + "%" + ":" + e.Value.Value);

    // キャンセルボタンを押したとする、時にキャンセルする
    var ctsSource = new CancellationTokenSource();
    button1.Click += (_, __) => ctsSource.Cancel();

    // 非同期実行
    DoAsync("hoge", ctsSource.Token, progress);
}

例外送出という形なので、BackgroundWorkerよりはキャンセルが楽です。プログレスに関しては、EventProgress<T>を用意して、それのReportメソッドを呼ぶという形になります。これはBackgroundWorkerに非常に近い感じですね。

同期→非同期

今まで見た「重い処理」であるHeavyHeavyHeavyMethodは同期的なものでした。言うならばWebRequestのGetResponse。もしくはCPU時間を喰う処理。では、BeginGetResponseのような、重い処理が非同期の場合の非同期処理(こんがらがる)はどうなるでしょう。

void HeavyMethod2(string s, Action<string> action)
{
    ThreadPool.QueueUserWorkItem(_ =>
    {
        Thread.Sleep(5000);
        var result = s + s;
        action(result);
    });
}

こんな、なんちゃって非同期メソッドがあるとして、こいつをどう料理出来るか。

非同期とBackgroundWorker

元から非同期のものに対し、BackgroundWorkerは無力です。破綻です。さようならです。

// DoWorkは実行されるとすぐに抜けて(HeavyMethod2が非同期のため)
// RunWorkerCompletedが呼ばれることになって全く正常に動かない
bw.DoWork += (sender, e) =>
{
    HeavyMethod2("hoge", s1 =>
    {
        bw.ReportProgress(33, s1);
        HeavyMethod2(s1, s2 =>
        {
            bw.ReportProgress(66, s2);
            HeavyMethod2(s2, s3 =>
            {
                bw.ReportProgress(100, s3);
            });
        });
    });
};

bw.RunWorkerCompleted += (sender, e) =>
{
    var result = e.Result;
    listBox1.Items.Add("終わった、結果:" + result);
};

bw.RunWorkerAsync("hoge");

これはちっとも動きません。というかReportProgressで例外が出ます(実行が完了=RunWorkerCompletedが呼ばれている状態ではReportProgressは呼べない)。なんとも、ならないですねえ。ここでAutoResetEventなどを呼んでDoWorkの完了を待機してやるぜ、という策もありますが、そんなことをやる意味は全くないでしょう。

Reactive Extensions

補助拡張メソッドとしてXxxAsObservableを定義しましょう。Begin-EndパターンのものならFromAsyncPatternが使えますが、今回のような俺々非同期メソッドには使えないので、AsyncSubjectを使って自前でラップします。

IObservable<string> HeavyMethod2AsObservable(string input)
{
    var asyncSubject = new AsyncSubject<string>();
    HeavyMethod2(input, s =>
    {
        try
        {
            asyncSubject.OnNext(s);
            asyncSubject.OnCompleted();
        }
        catch(Exception e)
        {
             asyncSubject.OnError(e);
        }    
    });
    return asyncSubject.AsObservable();
}

ラップ自体はそんなに難しいものでもないですし、定型なので割と楽です。AsyncSubjectの詳細、もしくは何故AsyncSubjectを使わなければならないのか、非同期ラップの落とし穴、的なものは以前の記事を参照してください。

var disposable = Observable.Return("hoge")
    .Report(s => reportProgress(1, s))
    .SelectMany(HeavyMethod2AsObservable)
    .Report(s => reportProgress(33, s))
    .SelectMany(HeavyMethod2AsObservable)
    .Report(s => reportProgress(66, s))
    .SelectMany(HeavyMethod2AsObservable)
    .Report(s => reportProgress(100, s))
    .ObserveOnDispatcher()
    .Subscribe(
        s => listBox1.Items.Add("終わった、結果:" + s),
        e => { listBox1.Items.Add("例外出たー"); listBox1.Items.Add(e); },
        () => { });

同期のものと見比べてもらうと分かりますが、ほとんど変わりません。SelectをSelectManyに変えただけです。同期だとか非同期だとか、そんなの全く関係なく同じように取りまとめられてしまう。これはRxの強みの一つです。

async/await

RxでAsyncSubjectを使ってラップしたように、こちらではTaskCompletationSourceを使ってラップします。詳細はRxを使って非同期プログラミングを簡単にで。そうしたら、後は以前のものと同じように書きます。同じなので割愛。

まとめ

BackgroundWorkerの成したことは大きいと思います。全く非同期を意識させずにコントロールのポトペタで、UIをブロックしないコードが書ける。でもその反面、受け渡しがobjectであったりと、弊害と限界が見えているように思えます。そしてそれは、非同期APIしかないSilverlightでついに限界を向かえた。もうそろそろ、お役御免。しょうがない。

では代わりに何を使うかと言ったら、Rxを使えばいいんじゃないでしょうか、いやこれは本気で。見てきたとおり、十分にBackgroundWorkerの機能を代替出来ていますし。TaskはSilverlightにはまだ入ってないし、素のままでは使いやすいとは言い難い。目の前に現実的な解が転がっているのだから、とりあえず使ってみるのもいいんじゃないかな。機能的にはReactive Extensionsがイケてるのは間違いないと思うので(キャンセルの容易さは非常に大きい!)、そして、現実的に使える形で提供されている状態でもあるので、Rx使うといいんぢゃないかな(そればっか)。

今後。私は、Reactive Extensionsとasync/awaitは共存するものだと思っています。そして、どちらも、必須であると、両者を知れば知るほど思い始めています。なので、もう単純に比較してどうこうはお終い。次は連携を考えていきたいと思います。とりあえず、何で共存するのか、何故に両者が必須であるのか(私であるのならばRxだけじゃダメなんですか!ダメなんです、の理由などなどり)は、そのうち書きます。

Reactive Extensionsとエラーハンドリング

例外処理は非常に大事だけれど、非常に難しい。非同期となると、なおのこと難しくなる!一体どう処理したらいいものか。勿論、放置するわけにもいかない避けては通れない話。そのため、Reactive Extensionsには豊富な例外処理手段が用意されています。決してOnErrorだけではありません。想像を遥かに越える、恐るべき柔軟さがそこにはあります。そして、これもまた、何故Rxを使うべきなのか、の強い理由の一つになるでしょう。

OnError

まずは基本の例から。なお、DownloadStringAsyncはReactive Extensions用のWebRequest拡張メソッドからです。BeginGetResponseしてStreamをReadToEndしたもの、と思ってください。この辺も書いてると長くなるし本質的には関係ないので。「非同期で文字列(HTML)をダウンロード」程度の意味で。

// 存在しないアドレスにアクセスしようとする(当然例外が起こる!)
// 出力結果は「リモート名が解決出来ませんでした」など。Timeoutを待つ必要はあります。
WebRequest.Create("http://goooooogllle.co.jp/")
    .DownloadStringAsync()
    .Subscribe(
        s => Console.WriteLine(s), // OnNext
        e => Console.WriteLine(e.Message), // OnError
        () => Console.WriteLine("completed!")); // OnCompleted

// OnNextのみの場合はcatchされずに例外が外にthrowされる
WebRequest.Create("http://goooooogllle.co.jp/")
    .DownloadStringAsync()
    .Subscribe(Console.WriteLine);

存在しないアドレスにアクセスしたため、WebExceptionが発生します。Rxにおける基本の例外処理は、Subscribe時にOnErrorにエラー時処理を書くこと。ここにシーケンス内で発生した例外が集められ、一括で処理出来ます。

ところで、Subscribeのオーバーロードは多数用意されている、ように見えて実のところ一つしかありません。IObserver<T>を受け取るものただ一つ。では、普段やっているAction<T>を一つだけ渡しているのは何なの?というと、OnNextだけ定義されたIObserver<T>を作るショートカットにすぎません。挙動としては、OnErrorを省略した場合は例外をcatchせずそのままthrowされていきます。OnErrorを定義した場合は、ここでExceptionを丸ごとcatch。

OnErrorで丸ごとキャッチというのは、try-catch(Exception e)のようかもしれません。例外は何でもキャッチはダメ、なるべく上の階層で処理すべき、というセオリーから考えると違和感も?ただ、同期的に書いた場合は下位層でcatchせず、最上位でcatchしよう、という話が成立しますが、非同期には最上位などというものはなく、OnErrorで掴まなければ集約例外ハンドラ行きとなるので、最上位の呼び出しメソッドなどというものはなく、そんな当てはまるものでもないかもです。というか、つまりはOnErrorが最上位のcatchの役割を担っているわけですね。

Catch

出てくる例外によって処理内容を変えたかったり、例外の種類によってはCatchしないで欲しかったりするシチュエーションはいっぱいあります。その場合OnErrorでとりあえずExceptionを取ってe is HogeException... などと分岐、というのは格好悪い。というわけで、Catchメソッド。

// TwitterのUserTimeLineは認証(OAuth)が必要なので例外が発生する!
// 出力結果は、以下のレスポンス
// {"error":"This method requires authentication.","request":"\/statuses\/user_timeline.json"}
WebRequest.Create("http://twitter.com/statuses/user_timeline.json")
    .DownloadStringAsync()
    .Catch((WebException e) => e.Response.DownloadStringAsync())
    .Subscribe(Console.WriteLine);

Catchに渡すメソッドは型が指定出来て、型が一致しない例外はCatchしません。そして、ここで少し面白いのが渡すメソッドの戻り値はIObservable<T>でなければならないということ。例外が発生した場合は、後続に代わりのシーケンスを渡すことが出来ます。

例えばWebRequestではエラー原因を知るため、WebExceptionからResponseを取ってデータを取得したいわけです。そこで、そのままWebException中のResponseStreamから非同期で読み取ってそのまま流す。という例が上のコードです。WebAPIを試してる間はこうやってエラーメッセージが簡単に読み取れると非常に楽。OAuthとかややこしくて中々認証通せなくて泣きますからね……。

Empty/Never

Catchしたら例外処理する(ログに書くなどなど)だけで、別に後続に渡したいものなどない。という場合はEmptyを渡してやりましょう。

// 5が出たら例外出してみる
// 出力結果は1,2,3,4,completed
Observable.Range(1, 10)
    .Do(i => { if (i == 5) throw new Exception(); })
    .Catch((Exception e) => Observable.Empty<int>())
    .Subscribe(i => Console.WriteLine(i), e => { }, () => Console.WriteLine("completed"));

他に同種のものとして、Neverがあります。

// Neverは何も返さない物
// Emptyとの違いは、OnCompletedすら発生しない
// 余談ですが、FromEventの第一引数は最新情報によると h => h.Invoke が一番短く書けてお薦めです!
var collection = new ObservableCollection<int>();

Observable.FromEvent<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
        h => h.Invoke, h => collection.CollectionChanged += h, h => collection.CollectionChanged -= h)
    .Select(e => (int)e.EventArgs.NewItems[0])
    .Do(i => { if (i == -1)  throw new Exception(); })
    .Catch((Exception e) => Observable.Never<int>())
    .Subscribe(Console.WriteLine, e => { }, () => Console.WriteLine("終了"));

// 出力結果は 300
collection.Add(300);  // 300が出力される
collection.Add(-1);   // 例外発生
collection.Add(1000); // デタッチ済みなので何も発生しない

NeverはOnCompletedを発生させないという性質が、Emptyとの使い分けとして面白いかもしれません。

OnCompletedとFinallyの違いについて

Catchがあるなら、当然Finallyもあります!そこでふと思うFinallyとOnCompletedってどう違うの?というと、OnErrorとの絡みで違います。

// 出力結果は 例外発生, ふぁいなりー
Observable.Throw<int>(new Exception()) // Throwは例外のみを出すというもの
    .Finally(() => Console.WriteLine("ふぁいなりー"))
    .Subscribe(
        i => Console.WriteLine(i),
        e => Console.WriteLine("例外発生"),
        () => Console.WriteLine("こんぷりーてっど"));

Reactive Extensionsにおいて守られている、また、もし自分で拡張メソッドを書く場合などに守らなければならない原則があります。それは「OnErrorとOnCompletedはどちらか一つのみが発生する」「OnError/OnCompleted後はOnNextは発生しない」という点。というわけで、FinallyとOnCompletedの違いですが、例外発生時にも実行されるのがFinally、そうでないのがOnCompletedといったところです。

また、Finallyは途中で挟むのでメソッドチェーンの並びによっては実行されるタイミングを調整出来るのもポイントです。例えば

// 1, 2, 3, ふぁいなりー1, 100, 101, 102, ふぁいなりー2
Observable.Range(1, 3)
    .Do(Console.WriteLine)
    .Finally(() => Console.WriteLine("ふぁいなりー1"))
    .TakeLast(1)
    .SelectMany(i => Observable.Range(100, 3))
    .Finally(() => Console.WriteLine("ふぁいなりー2"))
    .Subscribe(Console.WriteLine);

TakeLast(1)は最後の1つのみを取得する、そのためにそれ以前のものは「完了」していなければならない。完了したということはFinallyが発動する。というわけで、SelectrMany後の、SubscribeされているOnNextに届く前に一つ目のFinallyが実行されます。二つ目のFinallyに関しては、全てのシーケンスが列挙された最後となります。

この性質は、Streamなど適切にClose/Disposeしなければならないものの実行タイミングの調整に使えます。

OnErrorResumeNext

Catchと同じような性質を持つメソッドとして、OnErrorResumeNextがあります。両者の違いは、例外非発生時に現れます。

// 実行結果は「1, 2, 3」Catchは例外非発生時は後続を渡さない
Observable.Range(1, 3)
    .Catch(Observable.Range(100, 3))
    .Subscribe(Console.WriteLine);

// 実行結果は「1, 2, 3, 100, 101, 102」OnErrorResumeNextは例外非発生時も後続に繋ぐ
Observable.Range(1, 3)
    .OnErrorResumeNext(Observable.Range(100, 3))
    .Subscribe(Console.WriteLine);

代わりに渡す後続が、例外非発生時にも渡されるのがOnErrorResumeNext、渡さないのがCatch。つまりOnErrorResumeNextは必ず後続が繋がれるので、Catch().Concat()、で表現できます。

Retry/Timeout

Webにアクセスする時って失敗したらリトライしたいですよねー、特にTwitterなんて普通にサーバー不調でエラー返してきやがりますからね!という時はRetry。

// Access Start x3回のあとに401例外発生
Observable.Defer(() =>
    {
        Console.WriteLine("Access Start:");
        return WebRequest.Create("http://twitter.com/statuses/user_timeline.json").DownloadStringAsync();
    })
    .Retry(3)
    .Subscribe(Console.WriteLine);

再アクセスしているのが分かるようコードが少しゴチャついてしまいましたが……。この例では認証が必要なものにアクセスしているため、100% WebExceptionが発生してます。んが、Retry(3)ということで、3回リトライしています。リトライ処理は必須ではあるものの、面倒くさいものの筆頭でしたが、恐ろしく簡単に書けてしまいました。同様に、タイムアウトもあります。

// Timeoutは指定時間以内に値が通過しなければTimeoutExceptionが発生します
// カウントのタイミングはSubscribeされたらスタート
// この例では恐らく例外発生します(100ミリ秒で結果を返せる、ことは恐らくないでしょふ)
var wc = new WebClient();
Observable.FromEvent<DownloadStringCompletedEventHandler, DownloadStringCompletedEventArgs>(
        h => h.Invoke, h => wc.DownloadStringCompleted += h, h => wc.DownloadStringCompleted -= h)
    .Timeout(TimeSpan.FromMilliseconds(100)) // 100ミリ秒
    .Take(1) // これつけないとTimeoutのチェックが働き続けます←WebClientもWebRequest感覚になるようにしたほうがRx的にはお薦め
    .Subscribe(e => Console.WriteLine(e.EventArgs.Result));

wc.DownloadStringAsync(new Uri("http://bing.com/"));

Web関連だとWebRequestであればTimeoutがありますが、ないもの(WebClient)もありますし、また、こちらのほうがお手軽なので、それなりに重宝すると思います。勿論、Web関連以外のメソッドでも使えますので、例えば一つのボタンが押されて、指定時間内にもう一つのボタンが押されなければTimeoutExceptionを出す、などなども考えられますね。更にそれをTimeoutExceptionをCatchで、指定時間内で二つのボタンが押されなかった場合の処理を追加、などなど、従来面倒くさかったことが恐ろしくスッキリ記述出来ます。

まとめ

ReactiveOAuthが機能的にスッカラカンなのは手抜きじゃなくて、(Retryとか)Rxに丸投げ出来るからです。ひたすら機能は分解、モジュールは小さく。というのは大変関数型的だと思うのですが、まさにそれです。(ただ、ちょっと、というかかなり手直ししないとマズいところがあるので近いうちに更新します……)

といったわけで、Rxはかなり柔軟にエラーを処理することが出来るため、もう同期とか非同期とか関係なく、Rx挟まないでネットワーク処理書くのは面倒くさくて嫌です、という勢いだったり。大変素晴らしい。というかもう全ての処理をRx上に載せてしまってもいいんじゃね?ぐらいの勢いだったりしますがそれはやりすぎである。

なお、この記事はC# Advent Calendar jp: 2010の12/4分として書きました。リレーが途切れると、寂しいです。最後まで完走させたいですね。JavaScriptなど割とすぐに埋まったのに、こんなところで言語の(日本のネット上での)人気不人気が視覚化されるとしたら、寂しい話です。

あと、私はJavaScript Advent Calendar 2010のほうでも12/20に書くので、20日もよろしくどうも。linq.jsとRxJSについて書きます。

Reactive Extensions用のWebRequest拡張メソッド

WebClientは楽ちんです。WebRequestはシンドイです。そのシンドさといったら、FromAsyncPatternでラップした程度じゃあまり意味がなかったりなわけです。いえ、単純なダウンロード程度ならいいのです。でも、アップロードとか!プログレスとか!そんなのに対応しようとすると、やっぱどうしょうもなく面倒臭い。とはいえ、面倒くさいのも一度書いてしまえば済むわけなので、一通り使いそうなものを書いてみました。Reactive Extensionsが動く環境(.NET 4 Client Profile, Silverlight4)と、Windows Phone 7環境では標準搭載のMicrosoft.Phone.Reactiveで確認取ってあります。

ソースコード、の前に利用例のほうを。

// DownloadStringAsyncメソッドで非同期読み込みも楽チン
var req1 = WebRequest.Create("http://www.twitter.com/statuses/public_timeline.json");
req1.DownloadStringAsync().Subscribe(Console.WriteLine);
 
// UploadValuesAsyncで非同期POSTも楽チン
// この例はgoo.gl短縮URLにPOSTして結果のJSONを取得するというもの
var req2 = WebRequest.Create("http://goo.gl/api/shorten");
req2.Method = "POST";
req2.ContentType = "application/x-www-form-urlencoded";
 
req2.UploadValuesAsync(new Dictionary<string, string>
    {
        {"url", "http://google.co.jp/"},
        {"security_token", "null" }
    })
    .SelectMany(res => res.DownloadStringAsync())
    .Subscribe(Console.WriteLine);

実に簡単に、「WebClientの同期のように」WebRequestで非同期が扱えます。WebClientと同じ感覚で扱える、というのを大事にするためにも、メソッド名は同じにしてあります(但しUpload系の戻り値はWebResponseとしました、それが最もRx的に制御しやすいと感じたので)。名前末尾にAsyncを付けていますが、この命名規則はAsyncCTPから。

WebRequestでポストする方法のおさらい。GetRequestStreamでRequestStreamを取り出し、それにPOST内容を書きこんでStreamを閉じたら、今度はGetResponseでWebResponseを取得、そのResponseから GetResponseStreamでResponseStreamを取り出し、結果を読み込む。つまりは request.BeginGetRequestStream -> stream.BeginWrite -> request.BeginGetResponse -> stream.BeginRead。ふつーに書いたら人間の扱えるネスト量ではありませんね!

というわけで、streamへのWrite/Readは隠蔽し、単純にアップロードしたら長さ1のReactiveシーケンスが、ダウンロードしたら長さ1のReactiveシーケンスが戻ってくる。という形で簡単に扱えるようにしました。非同期が難しいとか面倒くさいとか、過去の話でしたね!

プログレス表示

プログレス(進捗表示)は大事です。WebClientにもProgressChangedイベントがあるし、せっかく非同期でやってるのだからプログレスも扱いたい。というわけで、それ関連のメソッドも作りました。プログレスは***WithProgressというメソッドを使うと、戻り値が IObservable<Progress<T>>になっていて、このProgressクラスにはValue:値, CurrentLength:現在の長さ, TotalLength:全体の長さ, Percentage:パーセント、の読み取り専用プロパティが入ってます。

// DownloadDataAsyncWithProgressメソッドで進捗を出しながら非同期に画像をダウンロード(して最後に保存する)
var req1 = WebRequest.Create("http://www.microsoft.com/taiwan/silverlight/images/1920X1080_i.jpg")
    .DownloadDataAsyncWithProgress(10000) // 引数は分割サイズ指定、無指定時は64K
    .Do(p => Console.WriteLine("{0}/{1} - {2}%", p.CurrentLength, p.TotalLength, p.Percentage)) // 進捗表示
    .Aggregate(new List<byte>(), (list, p) => { list.AddRange(p.Value); return list; }) // 分割されて届くbyte[]を一つにまとめる
    .Select(l => (Image)new ImageConverter().ConvertFrom(l.ToArray())) // バイト配列をImageに変換
    .Subscribe(img => img.Save("C:\\test.jpg")); // 画像保存
 
// UploadValuesAsyncWithProgressで非同期アップロードの進捗表示もスムーズに
var req2 = WebRequest.Create("http://goo.gl/api/shorten");
req2.Method = "POST";
req2.ContentType = "application/x-www-form-urlencoded";
 
req2.UploadValuesAsyncWithProgress(new Dictionary<string, string> 
    {
        {"url", "http://google.co.jp/"},
        {"security_token", "null" }
    }, 10) // 分割サイズ、あまりに小さいのは良くない、本当は画像とか大きなファイルでやるべきなのですが適当なPOST先が見つからなくて...
    .Do(p => Console.WriteLine("Up: {0}/{1} - {2}%", p.CurrentLength, p.TotalLength, p.Percentage)) // アップロード進捗表示
    .TakeLast(1) // アップロード完了(=最後の1個が通過)までスルー
    .SelectMany(_ => req2.DownloadDataAsyncWithProgress(10)) // 戻り値がProgress<Unit>なので、WebRequestからレスポンス取得
    .Do(p => Console.WriteLine("Down: {0}/{1} - {2}%", p.CurrentLength, p.TotalLength, p.Percentage)) // ダウンロード進捗表示
    .Aggregate(new List<byte>(), (l, p) => { l.AddRange(p.Value); return l; }) // 分割されて届くbyte[]を一つにまとめる
    .Select(l => Encoding.UTF8.GetString(l.ToArray())) // 応答を文字列(JSON)に変換
    .Subscribe(Console.WriteLine);

基本的には非プログレスと同じなのですが、長さ1ではなくて進捗状況に応じた分だけ値が流れてくるという違いがあります。また、書き込み時はUnit、読み込み時はbyte[]が来るので、それを一本にまとめる必要があります。というわけで、若干の手間とクセがあるのですが、まあそれなりに平易に扱えるのではと思います。UploadProgressとDownloadProgressを連結する辺りには、Rxの合成のパワーが見えるのではないでしょうかどうでしょうか。パワーが見えるのはいいとして意味が伝わるかは微妙なところなので、以下図解。

まずはResponseStreamに細切れにWriteし、進捗状況を垂れ流します。Writeなので戻り値はない。ので、Unit(voidみたいなものだと思ってくだしあ)。そしてSelectManyでWriteからReadへ。そう、RxにおいてのSelectManyは、1対多とかフラットにするとかというよりも、イメージとしては「摩り替える」だと思っています。マウスダウンしたと思っていたらマウスムーブに摩り替わっていた、何を (ry。といったように。今回のコードだと、Writeしていたと思ったらいつのまにかReadになっていた、何を言っているのか以下略。

Writeの戻り値にそのままSelectManyを繋げてはいけません。Readを始めるのはWriteが「終わってから」でなければならない。そこで出番なのがTakeLast(1)。意味はそのまんまで、最後の1個を通すというもの。対としてTake(1) - 最初の1個だけ通す、もよく使いますね。

Readもまた細切れにbyte[]で値が読み込まれ送られてきますが、文字列に変換するにせよ何にせよ、 IObservable<byte[]>からbyte[]にしたい。ここでIEnumerable<byte[]>だったら SelectMany.ToArray って感じですが、Observableではそうもいかないので、ここはAggregateを使ってリストに値を詰めてみました。

最後は文字列に変換して、煮るなり焼くなり好きにどうぞ。ブロックはいっぱいありますが、Subscribeに届くのは最後の、集計が全て終わった一つだけというわけでした。なお、進捗はDoメソッドで随時、その箇所に値が流れるたびに画面に出力しています。一見メソッドチェーンだらけで複雑そうですが、順を追ってみてみれば、メソッド全てが、メソッド名通りの意味を明確に持った挙動を取るのと、それぞれのメソッド自体はお馴染みのLinqの挙動そのものなので、Rxの導入までの学習コストというのは存外低いかもしれません(導入を超えた後の敷居に関してはノーコメント)。

あと、このようにブロックが細切れになっているわけですが、これによりキャンセルが容易になるという性質を持っています。キャンセルは Subscribeの戻り値(IDisposable)のDisposeを呼ぶだけで済むのですが、図のとおりに分割されているため、簡単にブロック間に割って入って処理を止めることが可能です。この、処理単位がIObservableとして分割されていることによるキャンセルの容易さは、わざわざ CancellationTokenをチェックしたり(Task)、CancellationPendingをチェックしたり (BackgroundWorker)を処理の途中に挟みこむ必要がないという、Rxの大きな利点となっています。この辺の比較などは次回にでも。

なお、プログレスや細かい単位でのキャンセルが必要なければ、冒頭の例のようにWithProgerss抜きの方のメソッドを使えば、通常の非同期と同じく長さ1のReactiveシーケンスとして、Aggregateとかの処理も拡張メソッド側で全部やってくれますのでお手軽に使えます。

ソースコード

以下ソースコード。ご利用はご自由にのパブリックライセンスで。.NET 4 Client Profile, Silverlight4, Windows Phone 7環境下で動くのを確認してます。名前空間としてusing AsynchronousExtensions することで、拡張メソッド群が使えるようになります。そんなわけでお薦めのファイル名はAsynchronousExtensions.cs。ソースコード一つというわけで、気に入らない箇所がありましたら、勿論当然直接書き換えてしまえばOKです。バグがあれば教えてください。

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
#if WINDOWS_PHONE
using Microsoft.Phone.Reactive;
#else
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
#endif

namespace AsynchronousExtensions
{
    internal static class Progress
    {
        public static Progress<T> Create<T>(T value, double currentLength, double totalLength)
        {
            return new Progress<T>(value, currentLength, totalLength);
        }
    }

    internal class Progress<T>
    {
        public T Value { get; private set; }
        public double TotalLength { get; private set; }
        public double CurrentLength { get; private set; }
        public int Percentage
        {
            get
            {
                return (TotalLength <= 0 || CurrentLength <= 0)
                    ? 0
                    : (int)((CurrentLength / TotalLength) * 100);
            }
        }

        public Progress(T value, double currentLength, double totalLength)
        {
            Value = value;
            TotalLength = totalLength;
            CurrentLength = currentLength;
        }
    }

    internal static class WebRequestExtensions
    {
        public static IObservable<WebResponse> GetResponseAsObservable(this WebRequest request)
        {
            return ObservableForCompatible.Create<WebResponse>(observer =>
            {
                var disposable = new BooleanDisposable();

                Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, ar =>
                {
                    var res = request.EndGetResponse(ar);
                    if (disposable.IsDisposed) res.Close();
                    return res;
                })().Subscribe(observer);

                return disposable;
            });
        }

        public static IObservable<Stream> GetRequestStreamAsObservable(this WebRequest request)
        {
            return ObservableForCompatible.Create<Stream>(observer =>
            {
                var disposable = new BooleanDisposable();

                Observable.FromAsyncPattern<Stream>(request.BeginGetRequestStream, ar =>
                {
                    var res = request.EndGetRequestStream(ar);
                    if (disposable.IsDisposed) res.Close();
                    return res;
                })().Subscribe(observer);

                return disposable;
            });
        }

        public static IObservable<byte[]> DownloadDataAsync(this WebRequest request)
        {
            return Observable.Defer(() => request.GetResponseAsObservable()).SelectMany(r => r.DownloadDataAsync());
        }

        public static IObservable<Progress<byte[]>> DownloadDataAsyncWithProgress(this WebRequest request, int chunkSize = 65536)
        {
            return Observable.Defer(() => request.GetResponseAsObservable()).SelectMany(r => r.DownloadDataAsyncWithProgress(chunkSize));
        }

        public static IObservable<string> DownloadStringAsync(this WebRequest request)
        {
            return DownloadStringAsync(request, Encoding.UTF8);
        }

        public static IObservable<string> DownloadStringAsync(this WebRequest request, Encoding encoding)
        {
            return Observable.Defer(() => request.GetResponseAsObservable()).SelectMany(r => r.DownloadStringAsync(encoding));
        }

        public static IObservable<string> DownloadStringLineAsync(this WebRequest request)
        {
            return DownloadStringLineAsync(request, Encoding.UTF8);
        }

        public static IObservable<string> DownloadStringLineAsync(this WebRequest request, Encoding encoding)
        {
            return Observable.Defer(() => request.GetResponseAsObservable()).SelectMany(r => r.DownloadStringLineAsync(encoding));
        }

        public static IObservable<WebResponse> UploadStringAsync(this WebRequest request, string data)
        {
            var bytes = Encoding.UTF8.GetBytes(data);
            return request.UploadDataAsync(bytes);
        }

        public static IObservable<Progress<Unit>> UploadStringAsyncWithProgress(this WebRequest request, string data, int chunkSize = 65536)
        {
            var bytes = Encoding.UTF8.GetBytes(data);
            return request.UploadDataAsyncWithProgress(bytes, chunkSize);
        }

        public static IObservable<WebResponse> UploadValuesAsync(this WebRequest request, IDictionary<string, string> parameters)
        {
            var parameter = string.Join("&", parameters
                .Select(kvp => Uri.EscapeDataString(kvp.Key) + "=" + Uri.EscapeDataString(kvp.Value)).ToArray());
            var bytes = Encoding.UTF8.GetBytes(parameter);

            return request.UploadDataAsync(bytes);
        }

        public static IObservable<Progress<Unit>> UploadValuesAsyncWithProgress(this WebRequest request, IDictionary<string, string> parameters, int chunkSize = 65536)
        {
            var parameter = string.Join("&", parameters
                .Select(kvp => Uri.EscapeDataString(kvp.Key) + "=" + Uri.EscapeDataString(kvp.Value)).ToArray());
            var bytes = Encoding.UTF8.GetBytes(parameter);

            return request.UploadDataAsyncWithProgress(bytes, chunkSize);
        }

        public static IObservable<WebResponse> UploadDataAsync(this WebRequest request, byte[] data)
        {
            return Observable.Defer(() => request.GetRequestStreamAsObservable())
                .SelectMany(stream => stream.WriteAsObservable(data, 0, data.Length).Finally(() => stream.Close()))
                .TakeLast(1)
                .SelectMany(_ => request.GetResponseAsObservable());
        }

        public static IObservable<Progress<Unit>> UploadDataAsyncWithProgress(this WebRequest request, byte[] data, int chunkSize = 65536)
        {
            return Observable.Defer(() => request.GetRequestStreamAsObservable())
                .SelectMany(stream => stream.WriteAsync(data, chunkSize))
                .Scan(0, (i, _) => i + 1)
                .Select(i =>
                {
                    var currentLength = i * chunkSize;
                    if (currentLength > data.Length) currentLength = data.Length;
                    return Progress.Create(new Unit(), currentLength, data.Length);
                });
        }
    }

    internal static class WebResponseExtensions
    {
        public static IObservable<byte[]> DownloadDataAsync(this WebResponse response)
        {
            return Observable.Defer(() => response.GetResponseStream().ReadAsync())
                .Finally(() => response.Close())
                .Aggregate(new List<byte>(), (list, bytes) => { list.AddRange(bytes); return list; })
                .Select(x => x.ToArray());
        }

        public static IObservable<Progress<byte[]>> DownloadDataAsyncWithProgress(this WebResponse response, int chunkSize = 65536)
        {
            return Observable.Defer(() => response.GetResponseStream().ReadAsync(chunkSize))
                .Finally(() => response.Close())
                .Scan(Progress.Create(new byte[0], 0, 0),
                    (p, bytes) => Progress.Create(bytes, p.CurrentLength + bytes.Length, response.ContentLength));
        }

        public static IObservable<string> DownloadStringAsync(this WebResponse response)
        {
            return DownloadStringAsync(response, Encoding.UTF8);
        }

        public static IObservable<string> DownloadStringAsync(this WebResponse response, Encoding encoding)
        {
            return response.DownloadDataAsync().Select(x => encoding.GetString(x, 0, x.Length));
        }

        public static IObservable<string> DownloadStringLineAsync(this WebResponse response)
        {
            return DownloadStringLineAsync(response, Encoding.UTF8);
        }

        public static IObservable<string> DownloadStringLineAsync(this WebResponse response, Encoding encoding)
        {
            return Observable.Defer(() => response.GetResponseStream().ReadLineAsync(encoding))
                .Finally(() => response.Close());
        }
    }

    internal static class StreamExtensions
    {
        public static IObservable<Unit> WriteAsObservable(this Stream stream, byte[] buffer, int offset, int count)
        {
            return Observable.FromAsyncPattern((ac, o) => stream.BeginWrite(buffer, offset, count, ac, o), stream.EndWrite)();
        }

        public static IObservable<int> ReadAsObservable(this Stream stream, byte[] buffer, int offset, int count)
        {
            return Observable.FromAsyncPattern<int>((ac, o) => stream.BeginRead(buffer, offset, count, ac, o), stream.EndRead)();
        }

        public static IObservable<Unit> WriteAsync(this Stream stream, string data)
        {
            return WriteAsync(stream, data, Encoding.UTF8);
        }

        public static IObservable<Unit> WriteAsync(this Stream stream, string data, Encoding encoding)
        {
            return WriteAsync(stream, encoding.GetBytes(data));
        }

        public static IObservable<Unit> WriteAsync(this Stream stream, IEnumerable<byte> data, int chunkSize = 65536)
        {
            return WriteAsync(stream, data.ToObservable(), chunkSize);
        }

        public static IObservable<Unit> WriteAsync(this Stream stream, IObservable<byte> data, int chunkSize = 65536)
        {
            return Observable.Defer(() => data)
                .Buffer(chunkSize)
                .SelectMany(l => stream.WriteAsObservable(l.ToArray(), 0, l.Count))
                .Finally(() => stream.Close());
        }

        public static IObservable<Unit> WriteLineAsync(this Stream stream, string data)
        {
            return WriteLineAsync(stream, data, Encoding.UTF8);
        }

        public static IObservable<Unit> WriteLineAsync(this Stream stream, string data, Encoding encoding)
        {
            return WriteAsync(stream, data + Environment.NewLine, encoding);
        }

        public static IObservable<Unit> WriteLineAsync(this Stream stream, IEnumerable<string> data)
        {
            return WriteLineAsync(stream, data, Encoding.UTF8);
        }

        public static IObservable<Unit> WriteLineAsync(this Stream stream, IObservable<string> data)
        {
            return WriteLineAsync(stream, data, Encoding.UTF8);
        }

        public static IObservable<Unit> WriteLineAsync(this Stream stream, IEnumerable<string> data, Encoding encoding)
        {
            return WriteLineAsync(stream, data.ToObservable(), encoding);
        }

        public static IObservable<Unit> WriteLineAsync(this Stream stream, IObservable<string> data, Encoding encoding)
        {
            return WriteAsync(stream, data.SelectMany(s => encoding.GetBytes(s + Environment.NewLine)));
        }

        public static IObservable<byte[]> ReadAsync(this Stream stream, int chunkSize = 65536)
        {
            return Observable.Defer(() => Observable.Return(new byte[chunkSize], Scheduler.CurrentThread))
                .SelectMany(buffer => stream.ReadAsObservable(buffer, 0, chunkSize),
                    (buffer, readCount) => new { buffer, readCount })
                .Repeat()
                .TakeWhile(a => a.readCount != 0)
                .Select(a =>
                {
                    if (a.readCount == chunkSize) return a.buffer;

                    var newBuffer = new byte[a.readCount];
                    Array.Copy(a.buffer, newBuffer, a.readCount);
                    return newBuffer;
                })
                .Finally(() => stream.Close());
        }

        public static IObservable<string> ReadLineAsync(this Stream stream, int chunkSize = 65536)
        {
            return ReadLineAsync(stream, Encoding.UTF8, chunkSize);
        }

        public static IObservable<string> ReadLineAsync(this Stream stream, Encoding encoding, int chunkSize = 65536)
        {
            return ObservableForCompatible.Create<string>(observer =>
            {
                var decoder = encoding.GetDecoder();
                var bom = encoding.GetChars(encoding.GetPreamble()).FirstOrDefault();
                var sb = new StringBuilder();
                var prev = default(char);

                return stream.ReadAsync(chunkSize)
                    .SelectMany(bytes =>
                    {
                        var charBuffer = new char[encoding.GetMaxCharCount(bytes.Length)];
                        var count = decoder.GetChars(bytes, 0, bytes.Length, charBuffer, 0);
                        return charBuffer.Take(count);
                    })
                    .Subscribe(
                        c =>
                        {
                            if (c == bom) { } // skip bom
                            else if (prev == '\r' && c == '\n') { } // when \r\n do nothing
                            else if (c == '\r' || c == '\n')   // reach at EndOfLine
                            {
                                var str = sb.ToString();
                                sb.Length = 0;
                                observer.OnNext(str);
                            }
                            else sb.Append(c); // normally char

                            prev = c;
                        },
                        observer.OnError,
                        () =>
                        {
                            var str = sb.ToString();
                            if (str != "") observer.OnNext(str);
                            observer.OnCompleted();
                        });
            });
        }
    }

    internal static class ObservableForCompatible
    {
#if WINDOWS_PHONE
        public static IObservable<IList<T>> Buffer<T>(this IObservable<T> source, int count)
        {
            return source.BufferWithCount(count);
        }
#endif

        public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
        {
#if WINDOWS_PHONE
            return Observable.CreateWithDisposable(subscribe);
#else
            return Observable.Create(subscribe);
#endif
        }
    }
}

Progress用のクラスと、WebRequest, WebResponse, Streamに対する拡張メソッドです。各メソッドは大体数行でローカル変数もほとんどないコンパクトなものですが(Rxの強力さに全力で乗っかってるだけ)、コード自体はあまり追いやすくはないかもです。まあ、面倒くさい部分は拡張メソッド側で隠蔽してやれるならそれで良いと思ってます。利用側はWebClientを同期で使うように、簡単にWebRequestが非同期で扱えるんじゃないかなー、と思いますがどうでしょう。そしてついでなので多めに作ってしまった……。

StreamのReadAsync、WriteAsyncですが、それぞれ一度の読み込み/書き込みサイズ指定(デフォルトは64K)で、ストリーミングで読み書きをするようになっています。特にWriteAsyncで要求するbyteはIEunmerable/IObservableですから。つまり、「ストリーミングで読みながら」「ストリーミングで書きこむ」ことが出来るという、メモリに非常に優しいプログラミングが可能になっています。(まあ、読み込みが速くて書き込みが遅い場合は、読み込みだけどんどん進んで、どこかで溜め込まれてしまうわけですけどー、WriteされたらReadが始まる、みたいな仕組み作れないかなー、とは思いつつ方法分からない)

ところでしかし、Rxも素のままではなく、拡張補助ライブラリ的なのを用意するといいのかもねえ。Achiralのように。T4によるFromEventの自動生成などと一緒にまとめて、Rx Supplemental Library。うーん。Rx利用の俺々MVVMライブラリ(を、いつか作りたい、今はまだMVVM自体がヨクワカッテナイのですが)と一緒に、そのうちにでも。

まとめ

WPF, SL, WP7で全く同じコードが動くって素敵。Rx素晴らしい。んで、もしかしてRxって面倒くさいの……? と思ったとしたら、いえ、違います。面倒くさいのはWebRequestでプログレスやアップロード処理を作るのが面倒くさいだけです。上のコードがゲップでそうな感じであるとしたら、Rxが面倒なせいではなくて、WebRequestが面倒なせいなだけです。これRx抜きに同期で書いても面倒臭いです。それどころか、一層大変なことになっていました。むしろRxの上に乗っかっているからこそ、色々な演算子が使えて、コンパクトに書けたのではないかと思われます。また、プログレスや柔軟なキャンセルなど、「クライアントアプリならスレッド余ってるし同期的に書いてThreadPoolに突っ込んでも問題ないし楽っしょー」といった次元を超えた価値をReactive Extensionsは提供できているのではないでしょうか。

プログレスは大事。何が大事って、最近Windows Phone 7を輸入して電波法違反、じゃなくて、ええと、まあ、b-mobile回線で使っているのですけど、死ぬほど遅い。b-mobile遅い。MAX 300kbpsと謳っていて、それも遅いわけだけど実測だと100kbpsで大変遅い。なので、TwitterのXML引っ張ってくる程度であっても(モバイル回線は貧弱なのでXMLじゃなくてJSONがいいですねえ)、何%といった表示は欲しかったりなのです。さすがにそれはやり過ぎだとしても、Twitpicなどにカメラ画像を上げる時などではもう必須と言ってもいいぐらい。

でも、WebRequest使うとプログレスはご覧のように面倒臭いので、どうしてもサポート出来なかったりなのですよね。WebClient使えよってのは正論なのですが、諸事情あったりでWebRequest使いたいって場合もあるでしょう(OAuthとかあるしね)。もうひとつは、Reactive Extensionsに載せるならイベントベースになっているWebClientよりもWebRequestのほうが使いよいということもあり。

そんなジレンマの日々も今日でサヨナラです。Rx + WebRequestでConsoleでも、WPFでも、Silverlightでも、Windows Phone 7でも、幸せな非同期生活を送りましょう。ところで今回のプログレスの話はasync/awaitやBackgroundWorkerと絡めてお話したかったのですが、長くなるので断念。これは次回に(いや、次々回かもしれませんが)必ず書きます!もう既に半分ぐらいサンプルコードとかも書いてはあるので、絶対に近いうちには。

なお、今回の記事はRxTeamのJeffery Van GoghのブログシリーズRx on the serverが下敷きになってます。そして、Rx on the serverで紹介されているコードは、最新のRxのリリースに含まれているSystem.Reactive.ClientProfileに収録されています。が、このPart2で紹介されているAsyncReadLines(非同期でのStreamから一行毎にString取り出し)は簡易的すぎて使い物になりません。2バイト圏無視してるし、1バイトであっても挙動は怪しい。サンプルレベルなら良いと思うし記事は素晴らしく参考になったのですが、本体に収録/配布はやめて欲しかったなあ、少しがっかり。

でもAsyncReadLines自体は欲しいですねえ。今のところ一行毎に取り出すにはStreamReaderしかないのだけれど、非同期APIがないので。非同期読み込み可能なStreamReaderは、何故かSystem.DiagnosticsにAsyncStreamReaderとしてあったりするのですが、internalクラスなので外からでは使えません。というわけで、非同期で一行毎読み込みをやるには、自前実装しか無いようで。うーん、やだなー。私はゆとりゆるふわプログラマなので、バイトとかエンコーディングとかが絡むのはやりたくないなー。というか絶対ミスするでする。

※追記@12/03, やっぱり欲しいと思ったのでReadLineAsyncとして実装しました。↑ソースコードにも反映させてあります。

※追記@2011/8/29, .NET版とWP7同梱版との互換性をもたせました。また、リソースの扱いを正確にしました。

※追記@2011/10/15, ReactiveProperty : WPF/SL/WP7のためのRxとMVVMを繋ぐ拡張ライブラリの一部として、より機能向上させライブラリとしてまとめあげたものを公開しました。↑ソースコードをそのまま使うより、ReactivePropertyを参照するほうをお薦めします。

余談:Windows Phone 7

そういえば、WP7ですか?中々良いですよ、日本語さえまともならば。それを抜きにしても、正直なところ想像以上に自由の効かないガチガチ縛りなので、Androiderな方々は絶対気に入らないと思っています。まあでも、私は好きですねえ。初版なのでダメなところが目立つのはしょうがないし、徐々に改善される(といいなあ!)でしょう。良いところに目を向ければ、大変可能性を感じるOSに仕上がってると思います。

ちなみにWP7の開発環境に望むのはJSONサポート何とかしる!ってことですかね。標準だとシリアライザしか入ってないんですよ、ふざけんな死ね。System.Json入れるかJsonReaderWriterFactory入れるかしろって話ですよ本当に。あとdynamicのサポートが入ってくれればDynamicJsonを対応させるんだけどなあ。

今のところ実機にアプリケーションを転送できません。WP7は認証が必要なので(有料)。で、認証のための証明書の発行手続きはしたのですが、その発行を請け負っているGeoTrustという会社があまりにも仕事しないせいでうばばばば。しょうがないので、今やれることは基盤をがっつし固めることぐらいですはい。

C#のEnumを(Javaのように)別の値を持たせるなど拡張する

Enumに文字列を与えたいというのは少なくなくよくあると思います。例えばFruits.Appleには.ToString()したら「リンゴ」と出て欲しいなー、とか。それならFruits.リンゴと、日本語名つければ?というのはごもっとも。でも、同時に「林檎」とも付けたいなー、とかも思ってしまったりするわけです。しません?Java→C#な人が一番不満に思うのはEnumのようですし(JavaのEnumは高機能!)。

例えばこんな風にかけたらいいな、って。

// こうやって属性定義するだけ!
public enum Color
{
    [Japanese("黒"), Hex("000000"), Rgb(0, 0, 0)]
    Black,
    [Japanese("白"), Hex("FFFFFF"), Rgb(255, 255, 255)]
    White,
    [Japanese("赤"), Hex("FF0000"), Rgb(255, 0, 0)]
    Red
}

class Program
{
    static void Main(string[] args)
    {
        var red = Color.Red;
        Console.WriteLine(red.ToHex()); // FF0000
        Console.WriteLine(red.ToJpnName()); // 赤
        Console.WriteLine(red.ToRgb()); // 255000000
    }
}

んね、非常にすっきり定義出来て幸せ度高い。これをもし普通に書くならば

interface IColor
{
    string EngName { get; }
    string JpnName { get; }
    string Rgb { get; }
    string Hex { get; }
}

public class Red : IColor
{
    public string EngName { get { return "Red"; } }
    public string JpnName { get { return "赤"; } }
    public string Rgb { get { return "255000000"; } }
    public string Hex { get { return "FF0000"; } }
}

といった感じになって面倒くさいことこの上ない(いや別にこれクラスの意味あんまなくてnew Color("Red", "赤",..)といった感じにインスタンスでいいやん、という感じではありますががが。Enumはswitch要因なとこがメインなので、そもそもColorという例が良くないかしら...) まあともかく、Enumは素晴らしい。属性もまた素晴らしい。んで、Enumへの別名などの定義の仕組みは非常に単純で、個別のEnumへ拡張メソッドを定義しているだけです。Enumと拡張メソッドは相性が良い。

public static class ColorExtensions
{
    private static Dictionary<Color, RgbAttribute> rgbCache;
    private static Dictionary<Color, HexAttribute> hexCache;
    private static Dictionary<Color, JapaneseAttribute> jpnCache;

    static ColorExtensions()
    {
        // Enumから属性と値を取り出す。
        // この部分は汎用的に使えるようユーティリティクラスに隔離してもいいかもですね。
        var type = typeof(Color);
        var lookup = type.GetFields()
            .Where(fi => fi.FieldType == type)
            .SelectMany(fi => fi.GetCustomAttributes(false),
                (fi, Attribute) => new { Color = (Color)fi.GetValue(null), Attribute })
            .ToLookup(a => a.Attribute.GetType());

        // キャッシュに突っ込む
        jpnCache = lookup[typeof(JapaneseAttribute)].ToDictionary(a => a.Color, a => (JapaneseAttribute)a.Attribute);
        hexCache = lookup[typeof(HexAttribute)].ToDictionary(a => a.Color, a => (HexAttribute)a.Attribute);
        rgbCache = lookup[typeof(RgbAttribute)].ToDictionary(a => a.Color, a => (RgbAttribute)a.Attribute);
    }

    public static string ToJpnName(this Color color)
    {
        return jpnCache[color].Value;
    }

    public static string ToHex(this Color color)
    {
        return hexCache[color].Value;
    }

    public static string ToRgb(this Color color)
    {
        var rgb = rgbCache[color];
        return string.Format("{0:D3}{1:D3}{2:D3}", rgb.R, rgb.G, rgb.B);
    }
}
// 属性などり
[AttributeUsage(AttributeTargets.Field, Inherited = false, AllowMultiple = false)]
public sealed class JapaneseAttribute : Attribute
{
    public string Value { get; private set; }

    public JapaneseAttribute(string value)
    {
        Value = value;
    }
}

[AttributeUsage(AttributeTargets.Field, Inherited = false, AllowMultiple = false)]
public sealed class RgbAttribute : Attribute
{
    public int R { get; private set; }
    public int G { get; private set; }
    public int B { get; private set; }

    public RgbAttribute(int r, int g, int b)
    {
        R = r;
        G = g;
        B = b;
    }
}

[AttributeUsage(AttributeTargets.Field, Inherited = false, AllowMultiple = false)]
public sealed class HexAttribute : Attribute
{
    public string Value { get; private set; }

    public HexAttribute(string value)
    {
        Value = value;
    }
}

少し、処理が多いですかね。属性定義の部分はスルーでいいので、本質的にはToJpnNameなどの拡張メソッド定義の部分と、静的コンストラクタでEnumから属性と値を取り出してる部分だけなので、そんなでもないです。静的コンストラクタを使っているのは、ここでDictionaryに突っ込むことで、毎回リフレクションがゴニョゴニョと走り出すのを避けています。

属性は固定で複数を付加するなら名前付き引数でもいいですね。

まとめ

まとめというか、このネタは以前にも enumの日本語別名とか三項演算子ネストとか という記事で書いてたりはしたのですが、もう少し実用的になるような形にしました。

C#のEnumは、私は好きです。素朴というか、シンプルで使いやすいですよね。Visual Studioでswitch使うと一気に列挙してくれるのもいいし、勿論ビットフラグも。機能だけならクラス立てれば代替出来ないこともないけれど、色々と遥かに面倒くさくなる。Enumはシンプルに、すっきり書けるのが素敵。Javaは高機能(コンパイル時にクラス作るわけですしね)なのはいいとしても、EnumSetとか、なんだか大仰で、その辺があまり好きではない。

この拡張メソッドは、隙間を少し埋めてくれるかな、と思います。

イベントベースの非同期処理をReactive Extensionsで扱う方法

Reactive Extensionsを使用して複数のサービス非同期コールを連続して呼べるか試してみた。その2 - y_maeyamaの日記 という記事にて、Rxを使ってWCFを綺麗に呼んでました。なるほど!Silverlightで!WCFで!非同期コールを連続して呼ぶ!とっても実践的なテーマ。XboxInfoほげほげがどうのこうの、とかやってたどこかの私と違います、げふんげふん。とても素晴らしいテーマとコードだと思ったので、拝借して私も少し書いてみました。

// 実行部分(実行後10秒後ぐらいにWCFサービスの連鎖を経てメッセージボックスに結果を表示)
var client = new ServiceReference1.Service1Client();

var asyncQuery = from a in Observable.Defer(() => client.GetAAsObservable("てすと"))
                 from b in client.GetBAsObservable(a.EventArgs.Result)
                 from c in client.GetCAsObservable(b.EventArgs.Result)
                 select c.EventArgs.Result;

asyncQuery.Subscribe(s => MessageBox.Show("チェーン終了 - " + s));

「前に呼んだサービスの引数にアクセス」に関しては、SelectManyで匿名型を作る、が解だと思います。で、幾つも連鎖する時はクエリ構文を使うのが大変お手軽。ただしクエリ構文は独自定義のメソッドと繋がりが悪くなるという欠点があります。RxではDoとかObserveOnとか結構使いますから、クエリ構文、便利な用で使いどころに困る代物。その辺は実際に書く内容によって判断つけるところですねー。

XxxAsObservable

突然出てきているGetAAsObservableって何だ、といったら、勿論、拡張メソッドです。FromEventやFromAsyncPatternなどはコード本体に書いてあると鬱陶しいので、基本は拡張メソッドで隔離してしまいましょう。命名規則に特に決まりはありませんが、私はXxxAsObservableという規則で書いています。今回はWCFのサービス参照で自動生成されたイベントベースの非同期処理のラップなので、FromEventをベースに、ただし少し小細工を。

public static class IObservableExtensions
{
    // イベントベースの非同期はRxと今一つ相性が悪いので変換する
    public static IObservable<IEvent<T>> ToAsynchronousObservable<T>(this IObservable<IEvent<T>> source) where T : EventArgs
    {
        var connectable = source
            .Take(1) // 実行は一回のみ(これしとかないとイベントハンドラの解除がされないし、そもそもPruneが動かなくなる)
            .Prune(); // AsyncSubjectに変換
        var detacher = connectable.Connect(); // コネクト(イベント登録実行)
        return connectable.Finally(() => detacher.Dispose()); // 任意のDispose時にイベントのデタッチ(なくても構いません)
    }
}

public static class Service1Extensions
{
    // 量産なので多いようならT4テンプレートで生成してしまうと良いでしょう
    public static IObservable<IEvent<GetACompletedEventArgs>> GetAAsObservable(this Service1Client client, string key)
    {
        var o = Observable.FromEvent<GetACompletedEventArgs>(
                h => client.GetACompleted += h, h => client.GetACompleted -= h)
            .ToAsynchronousObservable();
        client.GetAAsync(key); // 非同期実行開始
        return o;
    }
    
    // あと二つほど
}

ふつーにFromEventをラップして終わり、ではなくて少々細工を仕込んでいます。理由は、イベントベースの非同期処理はそのままだとRxでは扱いづらいから。

イベントベース非同期処理とReactive Extensions

.NET Frameworkには基本的に2つの非同期処理の方法があります。一つはBeginXxx-EndXxxによるAPM(非同期プログラミングモデル)。もう一つはXxxAsyncとXxxCompletedによるイベントベースの非同期処理。イベントベースは素で扱う分にはAPMよりもずっと書きやすかった。だからWebClient、BackgroundWorkerなど、手軽に使える系のものに採用された(というのが理由かは知りませんが)。そしてサービス参照の自動生成のものもイベントベース。

しかし、ラッピングして使う場合はとにかく使いづらい!一度の登録で何度も何度も実行されてしまうことは合成時に都合が悪い。また、ラップしたメソッドと、引数を渡す処理の実行箇所が離れてしまうことは(FromEvent.Subscribe で登録して client.GetAsync で実行)書くのが二度手間になり面倒、という他に、Subscribeよりも前に非同期実行して、更にSubscribeよりも前に非同期実行が完了してしまった場合は何も起こらなくなる。といった実行タイミングの面倒くさい問題まで絡んでくる。

というわけで、イベントベースの非同期処理をReactive Extensionsに載せる場合は、ただたんにFromEventで包むだけではなく、一工夫することをお薦めします。

それが上で定義したToAsynchronousObservable拡張メソッド。これはRxを使って非同期プログラミングを簡単にの時に少し出しました。 source.Take(1).Prune() ですって! ふむ、よくわからん。一行ずつ見ていくと、sourceは、この場合はFromEvent後のものを想定しています(なので拡張メソッドの対象はIObservable<IEvent>)。それをTake(1)。これは、ふつーの非同期処理では実行完了は1回だけだから、それを模しています。イベントベースのラップなので何もしないと無限回になっていますから。

続けてPrune。これは内部処理をAsyncSubjectに変換します。AsyncSubjectに関してはReactive Extensionsの非同期周りの解説と自前実装で簡易実装しましたが、処理が完了(OnCompleted)まで待機して、完了後はキャッシュした値を流すという、非同期処理で不都合(実行タイミングの問題など)が起こらなくするようにしたもの。対になるのはSubjectでイベント処理を模したもの。FromEventを使うと内部ではSubjectが使われることになるので、それを非同期で都合が良い形であるAsyncSubjectに変換する、ためにPruneを使いました。そしてここでConnectすることで即座にイベントにアタッチします(実行タイミングの前後の問題をなくすため)。

awaitは救いの手?

Reactive Extensionsは準備さえ済んでしまえば、非同期が恐ろしく簡単に書ける、のですが、如何せん準備が決して簡単とは言いません。さて、それがC# 5.0のasync/awaitは魔法のように解決してくれるのか、というと、そんなことはありません。現在のAsync CTPではAsyncCtpLibrary.dllにAsyncCtpExtensionsというクラスが用意されていて、その中には既存クラスに対して涙ぐましいまでにAsyncで使うのに最適なようにとXxxAsync拡張メソッドが大量に定義されています。例えばWebClientのDownloadStringにはDownloadStringTaskAsyncが。少しその中身を見てみれば、上で書いたものとやっていることは同じようなもの。イベントを登録して、完了時にはイベントハンドラを解除して。

既存クラスは用意してもらったのがあるからいいけど、サービス参照のような自動生成されるクラスにたいしてはどうすれば?答えは勿論、自分で書こうね!orz。まだまだ、そんな受難な時代は続きそうです。でも、Rxは一度分かってしまえばTake(1).Prune()で済むし、T4で自動生成もそんな難しくない雰囲気なので、まあ悪くはないんじゃないでしょーか。悪いのはむしろイベントベースの非同期処理なのでは、という気がしてきた昨今。

まとめ

今月はまだまだ非同期周りの記事を書くよ!いっぱい書くよ!

そういえばで、今回のソースをBitbucketに上げました。今後も何か書くときは合わせてBitbucketに上げていきたいと思っています。

neuecc / EventToAsync / overview – Bitbucket

RSSなどのアイコンが並んでいる部分の一番右のget sourceからzipで落とせます(Bitbucketはナビゲーションが少しわかりづらいのよねえ、Mercurialベースで非常に快適なソースホスティングサービスだとは思うのですが)。また、RxのライブラリはNuGetを使って参照しています(なのでRx本体のインストールがしてなくても試せます、かわりにNuGetが必要?不必要?その辺まだ仕組みがよく分かってないのですが……)。以前はNuPackっていう名前でしたが、名前が被るとかで変更されたそうです。VS2010の拡張機能マネージャのオンラインギャラリーからもVS拡張がインストール出来るので、とりあえずお薦め。色々なライブラリが簡単にインストール出来ます。

私もlinq.js登録しちゃおうかな、かな……。まあ、Fixしなきゃいけないことがかなり溜まってるので、まずはそれやれよって話なのですががが。

実践例から見るReactive Extensinosを用いた非同期処理

私はXboxInfoTwitという、Xbox.comからデータをスクレイピングで取り出してTwitterに投稿するという、大変不届き者なアプリケーションを公開しています。お陰様で認証者数も2500人を超えて、割と活況。このアプリケーションでのデータ取得ですが、正規なルートでの情報取得ならばAPI叩けば一発、というケースも少なくないですが、いかんせんルートがアレなので一回の取得で取れる情報は断片。あちらこちらから値を受け渡し組み立てなければならないという、入り組んだ通信手順になっています。これを、もし全部非同期でやろうとしたら目眩がするなあ。ふむ。そこでReactive Extensions。むしろ良いネタがあるじゃないか!というわけでクローラーのコア部分を完全非同期 + Reactive Extensinosで全面的に書きなおしてみました。

neuecc / XboxInfoTwitCore / overview – Bitbucket

dllでライブラリという形体を取っていますが、基本的には誰にも使い道がないものかと思われます。というわけで、このXboxInfoTwitCoreの内容自体はどうでもいいとスルーして、この現実的な課題(?)にReactive Extensinosがどう簡単にしてくれたかを見ていきます。

非同期通信の連続

まずは、手順について図で。最近、パワーポイントでのポンチ絵を書く練習を兼ねて、しかしこんなヘタクソで大丈夫か??

プレイ状況の取得は、MyXbox/Profileから取得、実績内容の取得はそこからゲームのタイトルIDを取り出して、個別のゲーム画面のURLを作り取得。最終的には両者を結合してデータを作り出します。ただし、ログイン前にMyXbox/Profileにアクセスすると認証フォームにリダイレクトされるので、そこでごそごそと処理してlive.comの認証クッキーを取得する必要があります。この認証クッキー取得が若干ややこしい。一度分かってしまえばそんなでもないのですけれど、ちょびっと嫌らしい造りになっています(恐らくこの認証部分はWindows Liveで共通だと思うので、SkyDriveをモニョモニョしたい、など考えている人は参考にでもどうぞ)

ちなみに非同期のPOSTってのは、BeginGetRequestStreamで非同期でRequestStreamを呼び出して書き込む(POST)してから、BeginGetResponseで非同期でレスポンスを取得するので、ネスト量2倍。クッキー認証部分だけで、普通に書くと7つぐらいネストするわけです……。

Asynchronus Query

細部を見る前に、最終的にどうなったか、を。

var fetcher = new LiveTokenFetcher("userid", "password");
var locale = new Locale("ja-JP", "日本");

// 非同期クエリ組み立て
var asyncQuery =
    from crawler in
        from cookie in fetcher.GetLoginCookie(new XboxUrlBuilder(locale).Profile())
        select new XboxInfoCrawler(cookie, locale)
    from pi in crawler.GetPlayerInfo()
    from ai in pi.GameInfo.TitleId != 0
        ? crawler.GetAchievement(pi.GameInfo.TitleId)
        : Observable.Return<AchievementInfo>(null)
    select pi.Tap(x => x.GameInfo.AchievementInfo = ai); // Tapについては解説しな...い

// 実行
asyncQuery.Subscribe(pi => Console.WriteLine("{0} - {1}", pi.GamerTag, pi.PlayerState));

(私にしては珍しく)クエリ構文を使っていますが、こう見ると本当に、非同期処理がLinqに統合された形で問い合わせられる、ということが良く伝わるのではないでしょうか?処理の順序が一見イミフ、というのは慣れの問題です!解説します。

GetLoginCookie、GetPlayerInfo、GetAchievementがIObservable<T>を返す、非同期処理。GetLoginCookieはポンチ絵で言うところの、クッキー取得で、GetPlayerInfoが中央左、GetAchievementが中央右で、最後のselectがマージというわけです。ややこしくネストするはずの非同期が非常にあっさりと記述できました。

GETとPOSTの連続

一番面倒くさいログインクッキー取得の部分について。

public IObservable<CookieCollection> GetLoginCookie(string url)
{
    return Observable.Defer(() => CreateWebRequest(url).GetResponseAsObservable())
        .Select(res => res.TransformResponseHtml(xml => new
        {
            Cookie = ConstructCookie(res.Headers["Set-Cookie"]),
            PostUrl = GetPostUrl(xml),
            PPFT = GetInputValue(xml, "PPFT"),
            PPSX = GetInputValue(xml, "PPSX")
        }))
        .SelectMany(a => CreateWebRequest(a.PostUrl, MethodType.Post, a.Cookie)
            .UploadValues(new Dictionary<string, string>
            {
                {"LoginOptions", "3"},
                {"PPFT", a.PPFT},
                {"PPSX", a.PPSX},
                {"PwdPad", PwdPadBase.Substring(0, PwdPadBase.Length - password.Length)},
                {"login", loginId},
                {"passwd", password}
            }))
        .Select(res => res.TransformResponseHtml(xml => new
        {
            PostUrl = GetPostUrl(xml),
            Anon = GetInputValue(xml, "ANON"),
            T = GetInputValue(xml, "t")
        }))
        .SelectMany(a => CreateWebRequest(a.PostUrl, MethodType.Post)
            .Tap(r => r.AllowAutoRedirect = false)
            .UploadValues(new Dictionary<string, string>
            {
                {"ANON", a.Anon},
                {"t",Uri.EscapeDataString(a.T)}
            }))
        .Select(res => ConstructCookie(res.Headers["Set-Cookie"]));
}

となってます。長くて一瞬ウゲ、となりますが、基本的にはSelectとSelectManyしか使っていません。Selectは流れてくる値を変換。SelectManyは、次の非同期処理の実行。そう覚えて貰えれば、どうでしょう?

先頭から見るとurl(MyXbox/Profile)に対して非同期でのGetResponseを開始。認証フォームにリダイレクトされるので、HTMLをスクレイピングしてPOSTに必要なデータ(PPFT,PPSXなど)を集める。そして非同期でポスト(UploadValuesは、詳細はソースコードを参照してもらうとして非同期で投稿してIObservable<WebResponse>を返す自前定義の拡張メソッドです)。続いて、スクレイピングして次のPOSTに必要なデータ(ポスト先URL,Anon,T)を生成、そして再びUploadValuesで非同期ポスト。するとWebResponseに認証クッキーがあるので、ヘッダーの"Set-Cookie"からクッキーを取ってCookieCollectionを生成したものを返す。

となってます。流れるように!また、最後がSelectで終わっているように、流れは終わっていません。利用者が、ここから更に非同期での流れを繋いでいくわけです。

SelectとSelectMany、そしてクエリ構文

御大層なことを言いつつ、SelectとSelectManyしか使ってないじゃないか!というと、はい、その通りです。それでいいんです。ふつーのシチュエーションでの非同期処理って、そういうものでしょう?データを加工するSelectと、ネストをフラットにするSelectMany。これだけ覚えれば十分であり、そして、役立ちなのです。

ところで、冒頭の例で(メソッド構文信者な私が)何でクエリ構文を使ったかというと、そこそこ綺麗に見えるかなー、とか思ったのともう一つは、割と必要に迫られたから。書き方は色々あります。例えばクエリ構文をベースにするなら、他にも

// クエリ構文とメソッド構文を混ぜるのも時には分かりやすさに繋がる
from crawler in fetcher.GetLoginCookie(new XboxUrlBuilder(locale).Profile())
                       .Select(cookie => new XboxInfoCrawler(cookie, locale))
from pi in crawler.GetPlayerInfo() // 以下略

// select intoを使うことで完全フラットに流すことも可能です
from cookie in fetcher.GetLoginCookie(new XboxUrlBuilder(locale).Profile())
select new XboxInfoCrawler(cookie, locale) into crawler
from pi in crawler.GetPlayerInfo() // 以下略

と出来るでしょう。特に後者のselect intoを使った書き方は冒頭のものより良いかもしれません。ネストして順番が上下するのは複雑さの現れになりますから。では、メソッド構文で書くと?

var asyncQ = fetcher.GetLoginCookie(new XboxUrlBuilder(locale).Profile())
    .Select(cookie => new XboxInfoCrawler(cookie, locale)) // クッキーをラップしたもの
    .SelectMany(crawler => crawler.GetPlayerInfo())
    .SelectMany(playerInfo => crawler.GetAchievement(playerInfo.GameInfo.TitleId));
    // と、書きたいのですが、SelectManyでplayerInfoに変換されているのでcrawlerはスコープ外でこうは書けない!

SelectManyは、前方の値が使えません。GetLoginCookieのように前の値は一切使わないのならば何ら問題ないのですが、GetAchievement(実績取得)はクッキーとGetPlayerInfoで得られたタイトルIDの"両方"が必要。この両方、というシチュエーションの時に、上から流れて変形するだけのSelectManyだと非常に、書きづらい。ではメソッド構文ではどうするか、というと、クエリ構文のように考える。

var asyncQ = fetcher.GetLoginCookie(new XboxUrlBuilder(locale).Profile())
    .Select(cookie => new XboxInfoCrawler(cookie, locale))
    .SelectMany(crawler => crawler.GetPlayerInfo().Select(playerInfo =>
        new { crawler, playerInfo })
    .SelectMany(a => a.crawler.GetAchievement(a.playerInfo.GameInfo.TitleId)));

下側のフローに二つ以上の値を持ち越したいのならば、二つ以上の値を格納した匿名型を作ればいいぢゃない。ということですね。クエリ構文の実態もこうなっています。正直、メソッド構文でこれを書くのは二つだけならまだいいですが、幾つも繋いでいくとなると相当汚くなるので、これはもう素直にクエリ構文の出番だな、と私は思いました。

奇怪なるクエリ構文の実態については多重 from の展開結果 - ++C++; // 管理人の日記で書かれています。って、よくみるとTrackbackに昔の私が送ってますね、ふむふむ、Linq to ObjectsのJavaScript移植であるlinq.jsで書くとこうなるようですよ?

// クエリ
var apart = Enumerable.Range(1, 5);
var query = apart
   .SelectMany(function(baker){ return apart
   .SelectMany(function(cooper){ return apart
   .SelectMany(function(fletcher){ return apart
   .SelectMany(function(miller){ return apart
   .Select(function(smith){ return {
       baker: baker, cooper: cooper, fletcher: fletcher, miller: miller, smith: smith}})})})})})
   .Where("Enumerable.From($).Distinct('$.Value').Count() == 5")
   .Where("$.baker != 5")
   .Where("$.cooper != 1")
   .Where("$.fletcher != 1 && $.fletcher != 5")
   .Where("$.miller > $.cooper")
   .Where("Math.abs($.smith - $.fletcher) != 1")
   .Where("Math.abs($.fletcher - $.cooper) != 1");

// 出力
var result = Enumerable.From(query.Single()) // 答えは一つなのでSingle,そしてobjectをKeyValuePair[]に分解
   .OrderBy("$.Value")
   .ToString(", ", "$.Key + ':' + $.Value"); // シーケンスを文字列に結合

alert(result); // smith:1, cooper:2, baker:3, fletcher:4, miller:5

linq.jsは"完全な"移植なので、.NETで動くコードが動かない、ということはありません。JavaScriptにもLinqの革命を!と、宣伝はさておき、SelectManyで下方に値を持ち出しているのに、毎回匿名型を作っていません。SelectManyのresultSelectorの部分でネストさせることで、値を下の方まで持ち運ぶ事が可能です。これはJavaScriptだけでなくC#でのLinq/Rxでも有効です。一つのパターンというかテクニックというか。C#ではわざわざこんなことやるぐらいならクエリ構文使えという気がしますが、クエリ構文の使えないReactive Extensions for JavaScript(RxJS)では活きるかもしれません。

ただ、この手のインデントの細工による対策は、IDEの自動整形と相性悪いんですよね。上のSelectManyのものも、コードフォーマッタにかけるとボロボロに崩れ去ります。基本的には私は、インデントは機械任せでやるべき。人が調整するものではないし調整してはならない。と思っているので、残念ながら上記テクニックは使うことはないかもなー、いや、どうだろう、積極的にはやらないという程度かしら。

The Future of C#(async/await)

つい数日前に開催されたPDC2010で、C#設計者のAnders Hejlsbergが、The Future of C#と題してC#5.0(とは言ってませんが、恐らくそうなる)のFeatureを語りました。一つは今まで言われていたCompiler as a Service。もう一つが、asynchronusの言語統合。C# 2.0でのyield returnによるIEnumerable生成のようなコンパイル時生成で非同期をコードの見た目上、完全に同期のように扱うことが出来ます。

Asynchronous Programming for C# and Visual BasicにてVisual Studio Async CTPが出ていて既に試してみることが可能なので(要:英語版VS2010)、実際にXboxInfoTwitCoreをasync/awaitで書き直してみました。全体はbitbucketのリポジトリ上にあるので興味あればそちらもどうぞ。以下は、ログインクッキーを取得する部分。

private async static Task<WebResponse> UploadValuesAsync(WebRequest request, IDictionary<string, string> parameters)
{
    var bytes = parameters.ToQueryParameter().Pipe(Encoding.UTF8.GetBytes);
    using (var stream = await request.GetRequestStreamAsync())
    {
        await stream.WriteAsync(bytes, 0, bytes.Length);
    }
    return await request.GetResponseAsync();
}

public async Task<CookieCollection> GetLoginCookie(string url)
{
    var res = await CreateWebRequest(url).GetResponseAsync();
    var prepare = res.TransformResponseHtml(xml => new
    {
        Cookie = ConstructCookie(res.Headers["Set-Cookie"]),
        PostUrl = GetPostUrl(xml),
        PPFT = GetInputValue(xml, "PPFT"),
        PPSX = GetInputValue(xml, "PPSX")
    });

    var req2 = CreateWebRequest(prepare.PostUrl, MethodType.Post, prepare.Cookie);
    var res2 = await UploadValuesAsync(req2, new Dictionary<string, string>
    {
        {"LoginOptions", "3"},
        {"PPFT", prepare.PPFT},
        {"PPSX", prepare.PPSX},
        {"PwdPad", PwdPadBase.Substring(0, PwdPadBase.Length - password.Length)},
        {"login", loginId},
        {"passwd", password}
    });

    var prepare2 = res2.TransformResponseHtml(xml => new
    {
        PostUrl = GetPostUrl(xml),
        Anon = GetInputValue(xml, "ANON"),
        T = GetInputValue(xml, "t")
    });

    var req3 = CreateWebRequest(prepare2.PostUrl, MethodType.Post);
    req3.AllowAutoRedirect = false;

    var res3 = await UploadValuesAsync(req3, new Dictionary<string, string>
    {
        {"ANON",prepare2.Anon},
        {"t",Uri.EscapeDataString(prepare2.T)}
    });

    return ConstructCookie(res3.Headers["Set-Cookie"]);
}

まあ見事にベッタベタに書かれているのが良く見えますねえ。このasync/awaitですが、非同期として扱うものにasyncを宣言したメソッドを用意。あとは同期のように書くだけ。yield returnのかわりにawait + XxxAsync と書くだけ。それで、まるで同期のように書けてしまいます。あまりにもお手軽。魔法のように。Silverlightのネック(そして初心者キラー)なところは、分かりづらい非同期でしたが、ウルトラC的に解決……。

この発表を見た時は普通にショックで、更に一日経って冷静に考えると更にショックで寝こむ勢いでしたね(笑) 正直なところ、被ります。単純な非同期の簡易化という点だけで考えればめっちゃ被ります。それはTaskとRxが被るよねー、Rxのほうが書きやすいっすよー、とか少し前に言っていた程度にはモロ被ります。実際問題Taskはあまり使いやすいとは言えないのですが、コンパイラサポートがガッツシ来てしまったら、それは話は別ですな。

勿論、RxのメリットはIEnumerable<T>や、イベントやタイマーなどの他のIObservable<T>とのシームレスな連携にもあるので、非同期を利用するシーンにおいてasync/awaitが完全に置き換えるもの、とは言いませんけれど。そして、私はそういった統合っぷりに魅力を感じているのでasync/awaitはあまり使わないかなー、という気がしてなくはないのですが、ちょっとその辺、判断が難しい。

さて、ところで両者を置き換えるのは非常に簡単です。戻り値がTask<T>かIObservable<T>か。RxではSelectManyだった位置にawaitを置く。それだけです。これは覚えておいて損はないかもです。そういう意味でも普通にかぶってるよな、とか思いつつ。awaitが分かればRx-SelectManyも簡単だし、その逆もまた然り。

SilverlightとWindows Phone 7

Silverlightは同期APIがないので、Rxで組むことで互換性が狙えます。このXboxInfoTwitCoreはフル非同期・フルRxなので、Silverlight/Windows Phone 7にも対応させ、られませんでした!えー。CookieExceptionが発生したり取れる最終的なCookieが何か違ったりと、Cookieが鬼門すぎて無理でした、とほほほ。マジワケワカラン。ええと、一応はWPF-Silverlight-WP7で完全なコード共有を実現する、という点もRxの強力な武器であり、使う理由になるとは思っています。それを示したかったのですが、うーむ。完全に同一なファイルで非同期としての挙動は問題なく取れているのですが、実際にデータが出せないとねえ……。カッコワルイ。

SgmlReader for Silverlight/Windows Phone 7

XboxInfoTwitCoreではHtml to Xml変換にSgmlReaderを使用しています。以前に紹介を書きましたが、これは本当に重宝します。ストリームとXElement.Loadの間に置くことで、不正なXMLである(ためXElement.Loadに渡すと例外が発声する)ネット上のHTMLをLinq to Xmlで扱えるようになります。しかし、元のコードベースが古いのと開発が活況とは言い難い状況であるために、SilverlightやWindows Phone 7に対応していません。大変困った。困ったので、Silverlight/Windows Phone 7で動くように少々コード弄ったところ、問題なく動いた!ので、こちらもbitbucketにあげときました。

neuecc / SgmlReader.SL / overview – Bitbucket

基本的にはデスクトップ版と全く同じ感覚で使えます。

var req = WebRequest.Create("http://google.com/");
req.BeginGetResponse(ar =>
{
    var res = req.EndGetResponse(ar);
    using (var stream = res.GetResponseStream())
    using (var sr = new StreamReader(stream))
    using (var sgmlReader = new SgmlReader { DocType = "HTML", CaseFolding = CaseFolding.ToLower, InputStream = sr })
    {
        var xml = XElement.Load(sgmlReader); // これ。
        Dispatcher.BeginInvoke(() => MessageBox.Show(xml.ToString()));
    }
}, null);

プロパティにURLを渡すと例外で落ちます(同期通信は許可されていません!)。というわけで、WebRequestなどから非同期でウェブからStreamを取り出して、それをInputStreamプロパティに渡すという形を取ってください。この制限は、SilverlightではXElement.Load("url")が許可されていないのと同じことです。

XboxInfoTwitの今後

ここまで見てくれた人がXboxInfoTwit本体の利用者とは思えません!のですが、とりあえず書きますと、コア部分は書き換わったので、あとはGUIというか設定画面を複数言語に対応させてCodePlexで公開。を目指しています。が、いかんせんGUIは難敵です……。今のXboxInfoTwitも一応WPFなんですが見よう見まねで組んだXAMLで汚いので、きっちりと綺麗に書き換えたい、のですが、それをやるにはあまりにもWPF/XAMLの知識がなさすぎる。そのため、今日明日でフル改装で公開!というわけにもいきません。もう少し時間がかかりそうです。

Twitterへの投稿に関しては、以前書いたReactiveOAuth(完全なRxベースのOAuthライブラリ)があるので、シームレスに統合出来そう(こちらも幾つか課題が溜まっているので更新しないと……)。あとは、今使ってる自動アップデートプログラムがタコな出来なので、これも作り直したいなあ。なんて思っていたり。

まとめ

ふつーの非同期で書いてネストしまくりで、こんなの書いてられないだろほれみろバーカバーカ、とか言ってみたかったのですが気力がなくてそれは断念。ともあれ、今ある非同期の厳しさへの現実解として、Rxはアリ、です。非同期処理が、かなり綺麗にLinqクエリとして溶け込む様は分かるのではないかと思います。C#5.0のasync/awaitと比較してみても、awaitの位置にSelectMany、というだけの話ですしね。そして何よりも、async/awaitは今日明日に出るわけじゃないのです! Visual Studio 2005 -> 2008 -> 2010のペースで考えるならば2013年。まだ2年も先です。

async/awaitがないから「しょうがなく」使う、ってスタンスもアレゲ。いえいえ、喜んで使うのです。Rxならではのメリットが、当然あります。ただの非同期処理の一本化というだけに終わらず、イベントやタイマーなど他のソースとの融合は魅力です。そう、そもそも、Rxの持つ側面は3つ。「Asynchronus」「Event」「Pull to Push」。ここ最近はAsynchronusのことしか書いてなかったですが、他の2つのほうも見逃せないというか、むしろRxは最初はLinq to Eventsとして紹介されていたぐらいだしEventのほうがメインですよ?

ところで話は全く変わりますが、Bitbucket使い始めました。バージョン管理は当然Mercurialです。Mercurialは非常にいいです。もうSubversionは使う気になれない。Team Foundation Serverは使ったことないので知らないけれど(あれはオールインワンなところに価値がある気がしつつ、まあ、重量級ですよね)Mercurialのサクサク感は、バージョン管理はしっくりこないんです!とかほざいていた私にガツーンとバージョン管理がないともう生きていけない!ちょっとしたプロジェクトもhg initするし!な心構えに変えさせる威力がありました。あとちょっとした変更でもローカルでコミット、な素敵さとか。

TortoiseHgは、若干UIの導線がイケてないとは思いつつ、まあまあ使いやすく(日本語化もデフォルトでされています←要環境変数にLANG=ja)、Visual Studio拡張のVisualHGも素敵な使い勝手。というか、UI部分はTortoiseHgを呼び出しているだけというシンプルさがいい。私はEclipse下ではMercurial Eclipseを使ってますが、こちらは独自にがっつし作りこんであって、それが今一つな使い勝手で。私はVisual Hgの方向性を支持します。

最後に、毎回嘘っぱちな次回予告ですが(SelectManyの解説をすると言い続けて半年以上経過)、ポーリングをイベントとしてRx化してPushで共通化とか何とかかんとか、を予定。引き続き題材はXboxInfoほげほげを使うつもりです。乞うご期待しない。

Prev | | Next

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive