Reactive Extensionsの非同期周りの解説と自前実装

最近id:okazukiさんが凄い勢いでRx解説を書いていて凄い!そして、刺激を受けますねー。というわけで、今回はRxの非同期周りの解説をします。今日の昼に非同期処理を行うメソッドの戻り値は全てIObservable<T>にしてしまえばいいんじゃないんだろうかを読んで、Twitterで少しAsyncSubjectについて書いたのでそれをまとめて……、と思ったらReactive Extensions入門 11「非同期処理用のSubject」ですって!早!私なんて一年かけて何も書いていやしなかったりするのに、この早さは本当に見習いたい、ごほごほ。

そんなわけかで、色々と被ってしまっているのですが、Rxの非同期実行についてまとめます。まず、Rxの非同期の起点はStart, ToAsync, FromAsyncPatternの3つ。Start,ToAsyncはFromAsyncPatternの簡易化みたいなものなので、実質は1つです。これらを使うと長さ1のIObservable<T>として非同期処理が扱えるわけです。ところで非同期処理が完了するまでの間に複数Subscribeしたらどうなるのだろう、または非同期処理が完了してしまった後にSubscribeしたらどうなるのだろう? Silverlightで実際に触って試してみてください。

Microsoft Silverlight の取得

ボタンだらけでイミフ? 解説します……。Rxの非同期系処理は全てAsyncSubjectを通ります。AsyncSubjectのOnNextが非同期で実行された値の戻り値を、OnCompletedが非同期処理の完了を示します。通常はOnNextとOnCompletedはワンセットで非同期処理です。というわけで、例えばSubscribeを二回押す→OnNextを一回押す→OnCompletedを一回押すで、非同期のIObservable<int>に対して二つSubscribe(Console.WriteLine)したということになり、1が二つ右側のログに表示されたはずです。続けてSubscribeを押す(非同期処理が完了した後にSubscribeした場合)とどうなるか?というと、1が追加されたはずです。

以前にHot vs ColdとしてIObservableの性質を少し取り上げました。ColdはSubscribeするとすぐに実行される。HotはSubscribeしても値が流れてくるまで待機する。非同期におけるIObservable<T> = AsyncSubjectは、つまり、両方の性質を持ちます。OnCompleted以前はHotで、値が流れてくる(非同期処理が完了する)まで待機される。OnCompleted以後はColdとなって、Subscribeするとすぐに値を流す。

何となくAsyncSubjectの中身が想像付いてきました?そう、非同期の実行結果である値をキャッシュしています。もし複数回OnNextされたらどうなるか、というと、これは最後の値だけが残ります。(一度Resetしてから)OnNextを連打してからOnCompletedを押して、Subscribeしてみてください。

非同期というとキャンセルはどうするの?というと、ありますよ!Subscribeの戻り値はIDisposable。イベントのRx化、FromEventの場合はイベントのデタッチでしたが、非同期で使う場合はキャンセルになります。例えばSubscribeを押してから、Disposeを押して(キャンセル!)、OnNext→OnCompleted(非同期処理完了)を押してみてください。ログに何も出てきません。非同期処理が完了する前にDisposeしたということで、Subscribeがキャンセルされた、ということです。では、続けて(OnCompleted完了後)Subscribeを押すと……、ログに値が表示されます。Disposeは処理自体をキャンセルするわけではなく、 Subscribeのキャンセルということですね。

そうそう、ResetボタンはSubscribeやDispose、AsyncSubjectの状態をリセットして初期化します。Clearボタンはログの消去になります。

AsyncSubjectの簡易実装

AsyncSubjectの実態が大体分かったので、次は自分で実装してみましょう!その前にSubjectって何ぞや、というと、IObservable<T>かつIObserver<T>。これはRxネイティブのイベントだと思ってください。詳しくはReactive Extensions入門 + メソッド早見解説表をどうぞ。

とりあえず実装したコードを。

public class MyAsyncSubject<T> : IObservable<T>, IObserver<T>
{
    bool isCompleted = false;
    T lastValue = default(T);
    readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    // OnCompleted済みなら即座にOnNext呼び出し、そうでないならListへAdd
    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (isCompleted)
        {
            observer.OnNext(lastValue);
            observer.OnCompleted();
            return Disposable.Empty;
        }
        else
        {
            observers.Add(observer);
            // 正しくはキャンセルが可能なように、Disposeが呼ばれた際にListからremoveされるよう
            // ラップした特別なIDisposableを返す必要があるけれど、簡略化した例ということでその辺は省きます
            return Disposable.Empty;
        }

    }

    // 初回呼び出しの場合は最後の値で全てのobserverのOnNextとOnCompletedを呼ぶ
    public void OnCompleted()
    {
        if (isCompleted) return;
        isCompleted = true;
        observers.ForEach(o =>
        {
            o.OnNext(lastValue);
            o.OnCompleted();
        });
        observers.Clear();
    }

    // OnCompletedと同じ。これも呼ばれたらCompleted済みとなる
    public void OnError(Exception error)
    {
        if (isCompleted) return;
        isCompleted = true;
        observers.ForEach(o =>
        {
            o.OnError(error);
        });
        observers.Clear();
    }

    // Completed済みでなければキャッシュを置き換える
    public void OnNext(T value)
    {
        if (isCompleted) return;
        lastValue = value;
    }
}

あくまでこれは簡易化したものです。実際はこれより、もう少し複雑です。あとlockを省いているのにも注意。まあ、ちゃんとしたのが知りたい場合はリフレクタとキャッキャウフフしてくださいということで。コードの中身ですが、Silverlightのデモで見てきた通りの、比較的単純な作りになっています。OnCompleted前後のSubscribeの挙動なんて、コードで見たほうが一目瞭然で早いですね。

これの挙動がRxの非同期系の挙動です。もしAsyncSubjectではなくSubject(値のキャッシュなし)を使った場合は、非同期開始からSubscribeまでの間に完了してしまった場合、何も起きないことになってしまいます。キャッシュを取るというのは、非同期に最適な理に叶った、というか、少なくとも不都合は起きないような挙動になります。もし自前で非同期でIObservable<T>を返すようなメソッドを実装する場合は、必ずAsyncSubjectを使いましょう。ユーザーの期待する挙動を取らなければならないという、義務として。

FromAsyncPatternの簡易実装

ここまで来たら、ついでなのでFromAsyncPatternも実装してしまいましょう!AsyncSubjectがあれば簡単です。あ、こちらでも断っておくと、あくまで簡易実装であって実際のものとは若干異なります。

static class Program
{
    static Func<T, IObservable<TR>> MyFromAsyncPattern<T, TR>(Func<T, AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, TR> end)
    {
        return arg =>
        {
            var asyncSubject = new AsyncSubject<TR>(Scheduler.ThreadPool); // おや、引数に……

            // 引数を渡されたら、Subscribeを待たずBeginInvokeで非同期実行が始まります
            begin.Invoke(arg, ar =>
            {
                TR result;
                try
                {
                    result = end.Invoke(ar); // EndInvokeで結果を得て
                }
                catch (Exception error)
                {
                    asyncSubject.OnError(error);
                    return;
                }
                asyncSubject.OnNext(result); // OnNext!
                asyncSubject.OnCompleted();
            }, null);

            return asyncSubject.AsObservable(); // SubjectのOnNextなどを隠す(なので昔はHideというメソッド名でした)
        };
    }

    // ToAsyncはFunc/ActionのBeginInvoke,EndInvoke簡略化版です
    // というのは少し嘘です、実際はSchedulerを使うのであってBeginInvokeは使いません、詳しくはまたそのうち
    static Func<T, IObservable<TR>> MyToAsync<T, TR>(this Func<T, TR> func)
    {
        return MyFromAsyncPattern<T, TR>(func.BeginInvoke, func.EndInvoke);
    }

    // StartはToAsyncの引数無しのものを即時実行というものです
    // Func<T>のみのFromAsyncPatternを作っていないので、Rx本来のToAsyncで書きます
    static IObservable<T> MyStart<T>(Func<T> func)
    {
        return Observable.ToAsync(func).Invoke(); // ここで即座にInvoke
    }

    static void Main(string[] args)
    {
        // *が引数の分だけ並ぶという関数があったとする
        Func<int, string> repeat = i => new String('*', i);

        var obs = MyFromAsyncPattern<int, string>(repeat.BeginInvoke, repeat.EndInvoke)
            .Invoke(3); // Subscribe時ではなく、Invokeした瞬間に非同期の実行は開始されていることに注意

        obs.Subscribe(Console.WriteLine); // ***
        Console.ReadKey();
    }
}

FromAsyncPatternは引数の多さや引数のイミフさにビビりますが、デリゲートのBeginInvoke, EndInvokeに合わせてやるだけです。こんなの私もソラでは書けませんよー。さて、今回は引数を一つ取るFromAsyncPatternを定義してみました。戻り値がFunc<T, IObservable<TR>>なのは、引数を一つ取ってIObservable<T>にする、ということです。

中身は割と簡単で、とりあえずBeginInvokeして、EndInvokeを待たずに(そりゃ非同期なので当然だ)とりあえずAsyncSubjectを戻します。つまり、これでIObservableとしてはHotの状態。そして何らかの処理が終えたらEndInvokeで戻り値を得て、AsyncSubjectのOnNext, OnCompletedを呼んでやる。これでSubscribeされていたものが実行されて、ついでにIObservableとしてColdになる。これだけ。意外とシンプル。

ついでのついでなのでObservable.ToAsyncとObservable.Startも定義してみました。ToAsyncはFunc/Actionを簡単にRx化するもの。FromAsyncPatternだとジェネリックの引数を書いたりと、何かと面倒くさいですから。StartはToAsyncを更に簡略化したもので、引数なしのものならInvoke不要で即時実行するというもの。

実行開始タイミングについて

FromAsyncPatternは(ToAsync/Startも)Invokeした瞬間に非同期実行が始まります。 FromAsyncPattern().Invoke().Subscribe() といったように、直接繋げる場合は気にする必要も特にないかもですが、一時変数などに置いたりする場合などは、Subscribeされたときに初めて非同期実行が開始されて欲しい、と思ったりあるかもですね。そんな場合はDeferを使います。

Microsoft Silverlight の取得

// 100を返すだけのどうでもいい関数
Func<int> func = () =>
{
    Console.WriteLine("fire");
    return 100;
};

var start = Observable.Start(func);
var defer = Observable.Defer(() => Observable.Start(func));
var prune= Observable.Defer(() => Observable.Start(func)).Prune();

StartButton.Click += (sender, e) =>
{
    start.Subscribe(Console.WriteLine);
};

DeferButton.Click += (sender, e) =>
{
    defer.Subscribe(Console.WriteLine);
};

var isConnected = false;
ReplayButton.Click += (sender, e) =>
{
    if (!isConnected) { prune.Connect(); isConnected = true; }
    prune.Subscribe(Console.WriteLine);
};

何もボタンを押さなくてもログにfireと出てしまっています。Observable.Startのせいなわけですががが。そんなわけで、Deferを使うとSubscribeまで実行を遅延することができます。ところで注意なのが、どちらもIObservable<T>なのですが、StartはColdとしてキャッシュされた値を返し続けるのに対して、Deferは中の関数を再度実行します。もしキャッシュしたい場合は、Pruneを使うといいでしょう。Pruneはこのブログでも延々と出してきた値の分配のためのPublishの親戚で、Connect後は最後の値をキャッシュして返し続けるという、AsyncSubjectと同じ動作をします(というか中身がAsyncSubjectなのですが)。この辺の使い分けというのもヤヤコシイところですねえ、そもそもPruneっていうメソッド名がイミフ……。

まとめ

Linq to Objectsで、最初分からなかったんですよ、yield returnなどで返される遅延評価としてのIEnumerable<T>と、配列(これもIEnumerable<T>ではある)の違いが。初めてLinqを知った後、半年ぐらいは分からないままだった(C#歴も半年でしたが)。同じインターフェイスなのに、状態としては、ちょっと違う。こういうのって結構分かりづらくて、躓いてしまうところです。

Rxの厄介なところは、IObservable<T>が一つで色々な状態を持ちすぎ。HotとColdの違いもある上に、更には混じり合った状態まである、Deferと非Deferも、外からだけだと全く区別がつかない。もう分かりづらいったらない。これに関しては一個一個丁寧に見ていくしかない、かな。今回はRxにおける非同期を徹底的に解剖してみました。一つ一つ、丁寧に。Rxも徐々に盛り上がりつつあるようなので、これからも、私なりに出来る限りに情報を発信していけたらと思います。

ところで、凄くシンプルなんですよね、Rxって。何を言ってるんだ?って話ですが、ええと、ほら、あくまでも「簡易」だからってのもありますが、実装のコード行数も少ないし全然難しいことやってないという。Linq to Objectsもそれ自体は凄くシンプルで、シンプルなのにとんでもなく強力という。それと同じで。Rx触ってて、内部をリフレクタでちょろちょろと見てて、本当に驚く。シンプルなのだけど、自分じゃ絶対書けないし思いつけないしっていう。悔しいし、そして、憧れるところですねえ。

特に魔法もなく、素直に中間層厚めな実装は、パフォーマンスは(もがもがもがもが)。何か言いました?

ReactiveOAuth - Windows Phone 7対応のOAuthライブラリ

Windows Phone 7用のOAuth認証ライブラリを作成し、公開しました。他のライブラリに比べての特徴は、非同期APIしか用意されていないWindows Phone 7での利用を念頭に置き、Reactive Extensions(Rx)をフル活用しているという点です。そもそもWindows Phone 7対応のOAuthライブラリが少ないので、特にWindows Phone 7開発者は是非どうぞ。

Windows Phone 7専用というわけでもないですが(Console/WPFで使えるよう、DLLとサンプルコードを用意してあります)、Windows Phone 7以外では別途Rxのインストールが必要です。Windows Phone 7環境では最初から入っているのでRxのインストール不要。Silverlight用は、コードコピペで別プロジェクト立てるだけで動くと思うんですが、確認取るのが面倒だった(クロスドメインがー)ので、そのうちに。

ところでそもそもRxって何、という人は Reactive Extensions入門 + メソッド早見解説表 をどうぞ。

何故Rxを使うのか

ReactiveOAuthの説明に入る前に、何故Rxを使うのかということを少し。理由は簡単で、非同期プログラミングは大変だから。論より証拠で、WebRequestのPOSTを全てBegin-Endパターンで構築してみましょう。

var req = (HttpWebRequest)WebRequest.Create("http://google.co.jp/"); // dummy
req.Method = "POST";
req.BeginGetRequestStream(ar =>
{
    var stream = req.EndGetRequestStream(ar);
    stream.BeginWrite(new byte[10], 0, 10, _ar =>
    {
        stream.EndWrite(_ar);
        req.BeginGetResponse(__ar =>
        {
            var res = req.EndGetResponse(__ar);
            var resStream = res.GetResponseStream();
            var s = new StreamReader(resStream).ReadToEnd();
            Console.WriteLine(s);
        }, null);
    }, null);
}, null);

コールバックの連鎖とはこういうことであり、大変酷い。冗談のようだ。こんなに面倒なら、こんなに苦しいのなら、非同期などいらぬ!しかし現実問題、Silverlightには、Windows Phone 7には、非同期APIしか搭載されていません。Begin-Endが強要される世界。理由は分かる。ユーザーエクスペリエンスの為でしょう。モバイル機器は性能が貧弱だから重い処理はいけない、と言うけれど、大事なのは処理が重いか否かではなく、体感。UIを止めさえしなければ、不快感を与えることはない。だから、強制的に非同期操作のみとした。

けれど、それで開発難しくなったり面倒になってしまってはいけない。非同期処理が簡単に出来れば……。その答えが、Rx。「簡単に出来るから」「開発者が幸せで」「全てが非同期になり」「ユーザーも幸せになる」。楽しい開発って大事だよね。本当にそう思っていて。開発者が不幸せで、コードに愛がなければ良いものなんて生まれやしないんだって、本当に思っていて。

// Rxならこう書ける(...AsObservableは拡張メソッドとして別途定義)
req.GetRequestStreamAsObservable()
    .SelectMany(stream => stream.WriteAsObservable(new byte[10], 0, 10))
    .SelectMany(_ => req.GetResponseAsObservable())
    .Select(res => new StreamReader(res.GetResponseStream()).ReadToEnd())
    .Subscribe(Console.WriteLine);

// SelectManyが苦手ならばクエリ構文という手もあります
var query = from stream in req.GetRequestStreamAsObservable()
            from _ in stream.WriteAsObservable(new byte[10], 0, 10)
            from res in req.GetResponseAsObservable()
            select new StreamReader(res.GetResponseStream()).ReadToEnd();
query.Subscribe(Console.WriteLine);

ネストが消滅して、メソッドチェーンの形をとって非同期が同期的のように書けるようになります。利点は他にもあって、様々な操作(合成・射影・抽出・待機などなど)が可能になる、ということもありますが、それはまたそのうち。

ところで、最初のコールバックの連鎖って何だか見覚えのあるような雰囲気ありませんか?JavaScriptで。そう、XmlHttpRequestであったりsetTimeoutであったりの連鎖と同じです。RxのJavaScript版、RxJSではJavaScriptでのそれらのネストを殺害することが可能です。興味があれば、そちらも是非試してみてくださいな。

ReactiveOAuthとは?

OAuthであっても、ようするにWebRequestなわけです。GET/POSTしてResponseを取ってくるということにかわりはない。そんなわけで、ネットワーク通信してResponseを取ってくる部分を片っ端から全てIObservableにしたのがReactiveOAuthです。Rxにべったり依存したことで、良くも悪くも、他のOAuthライブラリとは全く毛色の違う仕上がりになっています。

とはいっても、とにかく「簡単に書けること」にこだわりを持ってデザインしたので、利用自体は簡単ですし、Rxの知識もそんなに必要ありません。普通に取得するなら、SelectとSubscribeしか使わないので、全然安心です!まあ、できれば、これが入り口になってRxの世界を知ってもらえると嬉しいなあ、という皮算用もあったりですが。

使いかた1. AccessToken取得

デスクトップアプリケーション上でのOAuthの仕組みを簡単に説明すると、RequestToken取得→認証URL表示→PINコード入力→PINコード+RequestTokenを使ってAccessToken取得。という形になっています。まず、ユーザー名・パスワードの代わりとなるものはAccessTokenです。これを取得して、API呼び出しの時に使い、また、パスワード代わりに保存するわけです。そのRequestTokenやPINコードはAccessTokenを取得するための一時的な認証用キーということです。

AccessToken取得までにはOAuthAuthorizerクラスを使います。

// グローバル変数ということで。
const string ConsumerKey = "consumerkey";
const string ConsumerSecret = "consumersecret";
RequestToken requestToken;
AccessToken accessToken;

private void GetRequestTokenButton_Click(object sender, RoutedEventArgs e)
{
    var authorizer = new OAuthAuthorizer(ConsumerKey, ConsumerSecret);
    authorizer.GetRequestToken("http://twitter.com/oauth/request_token")
        .Select(res => res.Token)
        .ObserveOnDispatcher()
        .Subscribe(token =>
        {
            requestToken = token;
            var url = authorizer.BuildAuthorizeUrl("http://twitter.com/oauth/authorize", token);
            webBrowser1.Navigate(new Uri(url)); // navigate browser
        });
} 

GetRequestTokenとBuildAuthorizeUrlを使い、RequestTokenの取得と、内蔵ブラウザに認証用URLを表示させました。

private void GetAccessTokenButton_Click(object sender, RoutedEventArgs e)
{
    var pincode = PinCodeTextBox.Text; // ユーザーの入力したピンコード

    var authorizer = new OAuthAuthorizer(ConsumerKey, ConsumerSecret);
    authorizer.GetAccessToken("http://twitter.com/oauth/access_token", requestToken, pincode)
        .ObserveOnDispatcher()
        .Subscribe(res =>
        {
            // Token取得時のレスポンスには、Token以外に幾つかのデータが含まれています
            // Twitterの場合はuser_idとscreeen_nameがついてきます
            // ILookup<string,string>なので、First()で取り出してください
            UserIdTextBlock.Text = res.ExtraData["user_id"].First();
            ScreenNameTextBlock.Text = res.ExtraData["screen_name"].First();
            accessToken = res.Token; // AccessToken
        });
}

画像は認証が全て終わった時の図になっています。RequestTokenとPinCodeをGetAccessTokenメソッドに渡すだけです。これでAccessTokenが取得できたので、全ての認証が必要なAPIにアクセス出来るようになりました。

使いかた2. APIへのGet/Post

ここからはConsoleApplicationのサンプルコードで説明。

var client = new OAuthClient(ConsumerKey, ConsumerSecret, accessToken)
{
    Url = "http://api.twitter.com/1/statuses/home_timeline.xml",
    Parameters = { { "count", 20 }, { "page", 1 } },
    ApplyBeforeRequest = req => { req.Timeout = 1000; req.UserAgent = "ReactiveOAuth"; }
};
client.GetResponseText()
    .Select(s => XElement.Parse(s))
    .Run(x => Console.WriteLine(x.ToString()));

IObservable<T>連鎖の最後のメソッドとしてRunを使うと、同期的になります。通常はSubscribeで非同期にすると良いですが、コンソールアプリケーションなどでは、同期的な動作のほうが都合が良いでしょう。

OAuthClientを作成し、オブジェクト初期化子でURL、パラメータ(コレクション初期化子が使えます)を設定したら、GetResponseTextを呼ぶだけ。あとはIObservable<string>になっているので、Linqと同じように操作していけます。

ApplyBeforeRequestではリクエストが発行される前に、生のHttpWebRequestが渡されるので(!)、TimeoutやUserAgentなど細かい設定がしたい場合は、ここにラムダ式を埋めてください。

では、POSTは?

new OAuthClient(ConsumerKey, ConsumerSecret, accessToken)
{
    MethodType = MethodType.Post,
    Url = "http://api.twitter.com/1/statuses/update.xml",
    Parameters = { { "status", "PostTest from ReactiveOAuth" } }
}.GetResponseText()
    .Select(s => XElement.Parse(s))
    .Run(x => Console.WriteLine("Post Success:" + x.Element("text")));

POSTの場合はMethodTypeにMethodType.Postを指定します(デフォルトがGETなので、GETの場合は指定の省略が可)。それ以外はGETと同じです。Urlとパラメータ指定して、GetResponse。

ストリーミングもいけます

OAuthClientには3つのメソッドがあります。GetResponseは生のWebResponseを返すもので細かい制御をしたい時にどうぞ。GetResponseTextはStreamReaderのReadToEndで応答をテキストに変えたものを返してくれるもので、お手軽です。そのままXElement.Parseとかに流すと楽ちん。そして、GetResponseLinesはReadLineで一行ずつ返してくれるもの、となっています。GetResponseTextとGetResponseLinesは型で見ると両方共IObservable<string>なため戸惑ってしまうかもですが、前者は流れてくるのは一つだけ、後者は行数分だけ、となります。

GetResponseLinesはStreamingAPIで使うことを想定しています。とりあえず、WPF用のサンプルを見てください。

var client = new OAuthClient(ConsumerKey, ConsumerSecret, accessToken)
{
    Url = "http://chirpstream.twitter.com/2b/user.json"
};
// streamingHandleはIDisposableで、これのDisposeを呼べばストリーミング停止
streamingHandle = client.GetResponseLines()
    .Where(s => !string.IsNullOrWhiteSpace(s)) // filter invalid data
    .Select(s => DynamicJson.Parse(s))
    .Where(d => d.text()) // has text is status
    .ObserveOnDispatcher()
    .Subscribe(
        d => StreamingViewListBox.Items.Add(d.user.screen_name + ":" + d.text),
        ex => MessageBox.Show(ReadWebException(ex))); // エラー処理

ストリーミングAPIを使うにあたっても、何の面倒くささもなく、至って自然に扱えてしまいます!Dynamicを使ったJsonへの変換にはDynamicJson - C# 4.0のdynamicでスムーズにJSONを扱うライブラリを、また、RxとStreamingAPIとの相性については過去記事C#とLinq to JsonとTwitterのChirpUserStreamsとReactive Extensionsを見てください。

そうそう、それとネットワーク通信で起こったエラーのハンドリングが、SubscribeのOnErrorに書くだけで済むというのもRxを使って嬉しいことの一つです。

実装について

構造は全体的に@ugaya40さんのOAuthAccessがベースになっています(パク……)。そもそもにTwitterTL to HTMLのOAuth対応した時に、OAuthAccessを使って、あー、OAuth周りってRxに乗せると快適になるなー、とか思ったのが作ろうとした最初の動機だったりもして。非常に感謝。

WebClient風の、認証がOAuthなだけのベタなWebRequestラッパーという感じなので、特別なところはありません。インターフェイスとかなくて、本当にただのベタ書き。特に奇をてらってるところはないんですが、ストリーミングAPIで使うために用意したGetResponseLinesは個人的には笑えたり。

var req = WebRequest.Create(Url);
return Observable.Defer(() => req.GetRequestStreamAsObservable())
    .SelectMany(stream => stream.WriteAsObservable(postData, 0, postData.Length))
    .SelectMany(_ => req.GetResponseAsObservable());
    .Select(res => res.GetResponseStream())
    .SelectMany(s => Observable.Using(() => new StreamReader(s), sr => Observable.Repeat(sr)))
    .TakeWhile(sr => !sr.EndOfStream)
    .Select(sr => sr.ReadLine());

うわー……。利用者としては、ただのIObservable<string>としか見えないので、前段階でこんなにチェーンが繋がってるだなんてこと、気にする必要は全くないんですけどねー。これがベストな書き方だとは全然思えないので、誰かアドバイス欲すぃです。

まとめ

RxというとLinq to Events、イベントのLinq化という方向が目につきますが、今回は非同期のLinq化のほうにフォーカスしました。何というか、実に、素晴らしい!asynchronus programming is hard、と、思っていた時もありました。今や私達にはRxがある。恐れることはなにもない。

今回WPFとWindows Phone 7(Silverlight)でサンプルを作ったのですが、コードがコピペで、完全な互換性もって動いちゃうんですね。WPFで書いてSilverlightに持っていく時に、ああ、BeginGetResponseに書き換えなきゃ…… みたいなことが起こらない。最初から非同期で統一することで、全部ライブラリがネットワーク周りを吸収してくれる。非同期→同期にするのも簡単だし(RunやToEnumerableを使えばいい)、そもそも、Rxの土台に乗っている方が、普通に同期的に書くよりもむしろ楽だったりします。

個人的には、Windows Phone 7のローンチに粗製乱造Twitterアプリを送り込むという野望があるんですがねえー。いや、粗製乱造にするつもりはないんですが、機能をザックリと削って、一般性を無視して「私が使うシチュエーションで私が使いやすいような」アプリを出したいなーと思ってます。構想はあって、そこそこ尖った個性ある内容になる予定なので一部の人がフィットしてくれればいいな、と。多くの人に目に触れては欲しいのでローンチのタイミングは外したくない。問題はWorldのローンチタイミングに合わせてもJapanだと実機がないってことですね!開発機欲しい(お金は払いますからどうかー)。

と、思ってたのですがサンプル作りで、あきらかーなXAML知識のなさ(ていうか何も知りません)が露呈したので、ローンチに間にあわせるとか寝言すぎるのですが。せめてJapanのローンチまでにはそれなりな技量を身につけたいところです。

C#から使うMicrosoft Ajax MinifierでのJavaScriptコード圧縮

いつのまにか独立してCodePlex入りしているMicrosoft Ajax Minifier。その名の通り、Microsoft謹製のJavaScript/CSSの圧縮/整形ツールです。発表からすぐに、GoogleのClosure Compilerが出てしまったのですっかり影も薄く、ていうか名前悪いよね、Ajax関係ないじゃん……。CodePlexのプロジェクトページも何だか活気ない寂れた感じで、あーあ、といった趣。良いツールなんですけどねえ。

コマンドライン版とDLL版が用意されていて、コマンドラインの、単体で実行可能なexeの解説は【ハウツー】Microsoft Ajax MinifierでJavaScriptを縮小化しようで解説されているので、DLL版の使い方を簡単に解説します。

C#が使える人ならばDLL版のほうが遥かに使いやすかったりして。設定をダラダラと引数を連ねるのではなく、オブジェクト初期化子とenumで出来るので、そう、IntelliSenseが効くわけです。ヘルプ要らずで書けるのは快適。Visual Studioは偉大だなぁ。コマンドライン, PowerShellを捨て、VSのConsoleApplicationを常に立ち上げよう。実際、ちょっとしたテキスト処理とかC#で書いちゃうんですよねえ、私。Linqが楽だから。

Minifierオブジェクトを作ってMinifyJavaScript(CSSの場合はMinifyStyleSheet)メソッドを実行するだけなので、コード見たほうが早いかな。例としてlinq.jsをlinq.min.jsに圧縮します。そして、解析されたWarningをコンソールに表示します。ただの圧縮だけでなく、このコード分析機能が実にありがたい。

using System;
using System.IO;
using Microsoft.Ajax.Utilities;

class Program
{
    static void Main(string[] args)
    {
        var minifier = new Minifier
        {
            WarningLevel = 3, // 最大4。4はウザいのが多いので3がいいね。
            FileName = "linq.js" // エラー表示用に使うだけなので、なくてもいい
        };

        var settings = new CodeSettings // CSSの場合はCSSSettings
        {
            LocalRenaming = LocalRenaming.CrunchAll,
            OutputMode = OutputMode.SingleLine, // MultiLineにすると整形
            IndentSize = 4, // SingleLineの時は意味なし
            CollapseToLiteral = true,
            CombineDuplicateLiterals = true
            // その他いろいろ(それぞれの意味はIntelliSenseのsummaryを読めば分かるね!)
        };

        var load = System.IO.File.ReadAllText(@"linq.js"); // 読み込み

        // 最後の引数はグローバル領域に何の変数が定義されてるか指定するもの
        // 別になくても構わないんだけど、warningに出るので多少は指定しておく
        var result = minifier.MinifyJavaScript(load, settings, "Enumerable", "Enumerator", "JSON", "console", "$", "jQuery");

        // Warningで発見されたエラーはErrorsに格納されてる
        foreach (var item in minifier.Errors)
        {
            Console.WriteLine(item);
        }

        File.WriteAllText("linq.min.js", result); // 書き出し
        Console.WriteLine();
        Console.WriteLine("Original:" + new FileInfo("linq.js").Length / 1024 + "KB");
        Console.WriteLine("Minified:" + new FileInfo("linq.min.js").Length / 1024 + "KB");
    }
}

二つエラー出てますね。変数がvarで宣言されてないそうです。つまり、コードがその行通ると変数がグローバルに置かれてしまいます。確認したら、OrderBy実行したらsortContextという変数がグローバルに飛んでしまってました。ええ、つまりバグと言っていいです。……。あうあう、すみません、直します。これからは必ずAjax Minifierの分析を通してから公開しよう。どうでもよくどうでもよくないんですがlinq.jsもアップデートすべきことが結構溜まってるので近いうちには……。

というわけで、Ajax Minifierの偉大さが分かりました。素晴らしい!みんな使おう!DLLの形になっているので、T4 Templateと混ぜたりなどもしやすく、夢が膨らむ。minifyだけでなく、JSのパーサーとして抽象構文木を取り出したりなども出来ます。

WebでGUIでサクッと実行できる環境とかあるといいと思うんですよね。それこそDLLになっているので、Silverlightならすぐ作れるわけですし。誰か作ればいいのに。いや、お前が作れよって話なんですが。Ajax Minifierが出てすぐの頃に作ろうとしてお蔵入りしちゃったんだよね、ふーみぅ。そうそう、あと、VSと統合されて欲しい!このWarningは大変強力なので、その場でVSのエラー一覧に表示して欲しいぐらい。今のVSのエラー表示はWarningLevelで言うところの1ぐらいで警告出してくれて、それはそれで良い感じなんですが、やっぱこう、もっとビシッと言って欲しいわけですよ。

ワンクリックで整形/圧縮してくれるのとエラー一覧表示が出来るようなVisual Studio拡張を誰か作って欲しいなあ。いやー、本気で欲しいので、とりあえず私も自分で作ってみようと思い、VS2010 SDKは入れてみた。気力が折れてなければ来月ぐらいには公開、したい、です、予定は未定のやるやる詐欺ですががが。

TwitterTLtoHTML ver.0.2.0.0

TwitterのBasic認証の有効期限が8月いっぱいだったのですよね。それは知っていて動かなくなるのも知っておきながら、実際に動かなくなるまで放置していたという酷い有様。結果的に自分で困ってました(普通に今も使っていたので)。というわけで、TwitterTL to HTMLをOAuth対応にしました。認証時のアプリケーション名がTL to HTMLなのですが、これは「Twitter」という単語名をアプリケーション名に入れられないためです。面倒くさいねえ。

OAuth認証は@ugaya40さんのOAuthAccessを利用しています。XboxInfoTwitでは自前のものを使っていたのですが、どうしょうもなく酷いので自分で作り直すかライブラリを使うか、でズルズル悩んで締切りを迎えたのでライブラリ利用に決定ー。使いやすくて良いと思います。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive