C#とLinq to JsonとTwitterのChirpUserStreamsとReactive Extensions

何か盛り沢山になったのでタイトルも盛り沢山にしてみました。SEO(笑)

最近話題のTwitterのChirpUserStreamsを使ってみましょー。ChirpUserStreamsとは、自分のタイムラインのあらゆる情報がストリームAPIによりリアルタイムで取得出来る、というもの。これを扱うには、まずはストリームをIEnumerable化します。そのまま扱うよりも、一度IEnumerable化すると非常に触りやすくなる、というのがLinq時代の鉄則です。C#でのストリームAPIの取得方法は以前にも記事にしましたが、かなり汚かったのでリライト。WebClient愛してる。

public static IEnumerable<XElement> ConnectChirpStream(string username, string password)
{
    const string StreamApiURL = "http://chirpstream.twitter.com/2b/user.json";

    var wc = new WebClient() { Credentials = new NetworkCredential(username, password) };
    using (var stream = wc.OpenRead(StreamApiURL))
    using (var reader = new StreamReader(stream))
    {
        var query = reader.EnumerateLines() // 1行に1JSONなのです
            .Where(s => !string.IsNullOrEmpty(s)) // 空文字が来るので除去
            .Select(s => // 文字列JSONからXElementへ変換
            {
                using (var jsonReader = JsonReaderWriterFactory.CreateJsonReader(Encoding.Default.GetBytes(s), XmlDictionaryReaderQuotas.Max))
                    return XElement.Load(jsonReader);
            });

        foreach (var item in query) yield return item; // 無限列挙
    }
}

// StreamReaderの補助用拡張メソッド(あると大変便利)
public static IEnumerable<string> EnumerateLines(this StreamReader streamReader)
{
    while (!streamReader.EndOfStream)
    {
        yield return streamReader.ReadLine();
    }
}

中々シンプルに書けます。C#もLLと比べても全然引けを取らないでしょう(誰に言ってる)。ChirpUserStreamsはJSONでしか取れないのですが、StreamAPIではXmlよりもJSONのほうが使い易いので、JSONだけでも全然問題ありません。とはいえ、C#でJSONはちょっと扱いにくいんだよねー?と思いきや、意外に普通に標準ライブラリだけで何とかなりました。

参照設定にSystem.Runtime.Serializationを加えます(注:VS2008ではSystem.ServiceModel.Webを参照設定に加えてください)。この参照で通常使うのはDataContractJsonSerializerだと思いますが、もう一つ、JsonReaderWriterFactoryというクラスが用意されていて、このReaderはJSONをXmlReaderとして扱うことが出来ます。この図式は以前のLinq to HtmlのためのSGMLReader利用法と同じです。そのままXElementに流しこめば、JSONをXmlとして扱って、Linq to Jsonが成り立ちます。

さて、StreamAPIなのでEndOfStreamは来ない。繋ぎっぱなし、つまりはConnectChirpStreamメソッドは無限リストになります。この中には色々な情報が混ぜこぜになって来ます。投稿した、という他にも、誰かをフォローした、何かをふぁぼった、何かをリツイートした、などなどなどなど。クライアントソフト作るなら、当然どの情報も漏れ無く扱いたいわけですが、どうしましょう?foreachでグルッと回して、ifで分ける、しか、ない、かもですね? しかしそれは敗北です。退化です。foreachを使ったら負けだと思っている。

ところで突然に、今のところ思うReactive Extensions for .NET (Rx)を使うメリットは3つ。「複雑になりがちな複数イベントの合成」「同じく複雑になりがちな非同期処理のLinq化」そして、「列挙の分配」。従来型のLinqでは、一回の列挙には一個の処理しか挟めませんでした。例えば、MaxとCountを同時に取得する方法はなかった。MaxとCountを別々に二度列挙するか、または旧態依然なやり方、つまりforeachでグルグルと回してMaxとCountを手動で計算するかしかなかった。それはIEnumerableがPullモデルなためで、PushモデルのIObservableならば、出来ないこともない。

では、Rxでこのストリームを分配してみましょう。

static void Main(string[] args)
{
    // IEnumerableをIObservableに変換し、Publish(Connectするまで列挙されない(ので分配が可能になる))
    var connecter = ConnectChirpStream("username", "password")
        .ToObservable()
        .Publish();

    // 1件目は必ず自分のフレンドのIDリストが来るらしいっぽいのでまるっと保存
    HashSet<int> friendList;
    connecter.Take(1).Subscribe(x => friendList = new HashSet<int>(
        x.Element("friends").Elements().Select(id => (int)id)));

    // どんなのが来るのかよく分からないのでモニタ用にテキストにまるっと保存
    var sw = new StreamWriter("streamLog.txt") { AutoFlush = true };
    connecter.Subscribe(x => sw.WriteLine(x));

    // userがあるなら普通の投稿(ってことにしておく)
    connecter.Where(x => x.Element("user") != null)
        .Select(x => new
        {
            Text = x.Element("text").Value,
            Name = x.Element("user").Element("screen_name").Value
        })
        .Subscribe(a => Console.WriteLine(a.Name + ":" + a.Text));

    // favoriteとかretweetは "event":"favorite" というJSONが来る
    var events = connecter.Where(x => x.Element("event") != null);

    // favoriteの場合の処理
    events.Where(x => x.Element("event").Value == "favorite")
        .Subscribe(x => Console.WriteLine(x)); // favorite用の何か処理
    // retweetの場合の処理
    events.Where(x => x.Element("event").Value == "retweet")
        .Subscribe(x => Console.WriteLine(x)); // retweet用の何か処理


    // 同期か非同期かは、ToObservableの引数で変わる。デフォルトは同期
    // Scheduler.ThreadPoolを引数に入れるとThreadPoolで非同期になる
    connecter.Connect(); // 列挙開始
}

ID一覧取得、テキスト保存、投稿時処理、Fav時処理、リツイート時処理の5つへの分配が非常にスマートに出来ました。ToObservable、Publish、Connect。たったこれだけで一つストリームを複数に分配することが出来ます。普通にそれぞれをWhereだのSelectだの、独立してLinqでコネコネ出来ました。で、何が嬉しいかっていうと、それぞれが完全に独立していて見やすいってのは勿論あります。あと、部品化されてるので外部に分割しやすくなるんですね、物凄く。組み合わせたりもしやすいし。

結論

Rxヤバい。というわけで、みんなRx触ろう! .NET Framework 4.0ではRxで使うIObservableとIObserverインターフェイスが搭載されています。インターフェイスだけでどうすんだよボケ、っていうと、実際のところどうにもなりませんね、たはー。それでもインターフェイスだけ先行搭載ということは、RxはDevLabs内だけで終わる実験的プロジェクトではなく、必ず標準搭載するから安心しろよ!というメッセージだと受け取ることにしました。きっと.NET Framework 4.0 SP1には標準搭載されます。される、と、いいなあ。ちなみにRxは初期の頃と結構変わってますし、まだ変わるかも。でも、だからこそ、それに付き合うのも楽しいってものですよ?

ああ、あと、ChirpUserStreamsもヤバいですね。リアルタイムでゴリゴリ迫ってくる感覚は素敵というか、なんか別次元のメディアになった感じでもあります。今、新規にTwitterクライアント作るならStreamAPI完全対応すれば差別化出来て良いですね!私は作りませんが。ただ、ストリームAPIとRxは相性良いと思うので、ストリームAPI時代の到来と同時にRx時代も到来!する、かなあ?

ToDo

linq.jsがようやく一段落したので、Rxの紹介もまたやっていきたいですねー(すみません、かなり長いこと放置していて)。ToObservableによる列挙の分配は中々に強烈なので、linq.jsとRxJSを橋渡しするようなコードというかJSというかも用意したいなあ(軽く作ってみたんですが、RxJSでもPublishで分配出来て中々に威力ありそうでした)。うーん、考えてみるとやることはいっぱいあるねえ。適当に待っていてください。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive