Rx(Reactive Extensions)を自前簡易再実装する

という表題でUstreamやりました。Reactive Extensions for .NET (Rx)のSelect, Where, ToObservable, FromEventを実装することで、挙動を知ろうという企画。結果?酷いものです!

Shift+Alt+F10はお友達。それにしたってぐだぐだ。想像以上に頭が真っ白。セッションやライブコーディングしてる人は凄いね、と実感する。プレゼンどころか人と話すのも苦手です、な私には敷居が高かった。とにかく説明ができない。デバッガで動かせば分かりやすいよねー、なんてやる前は思ってたんですが、人がデバッガ動かしてるの見ててもさっぱり分かりやすくないよ!ということに途中で気づいて青ざめる。

まあ、こういうのも経験積まないとダメよね、と考えると、リスクゼロ(見てくれた人には申し訳ないですが)で練習出来るので、これからもネタがあればやっていきたいとは思います。反省は活かして。ネタはあまりないのでリクエストあればお願いします。Ustreamの高画質配信については、去年に書いた高画質配信するためのまとめ記事が自分で役に立ったぜ、経験が活きたな、的な。私自身の環境はちょっと、というかかなり変わったのですが、配信の基本的部分に関しては今も昔も(といっても1年前か)変わってなかったね。

さて、そんなUstreamはともかくとして、Rxの基本的な拡張メソッド「Select, Where」と、基本的な生成メソッド「ToObservable, FromEvent」を自前で実装してみる/デバッガで追ってみましょう。自分の手で動かして追うと理解しやすくなります。なので、以下に出すソースはコピペでもいいので、実際にVisualStudio上で動かしてもらえればと思います。

IEnumerableで考える

IObservableの拡張メソッド実装、の前に復習を兼ねてIEnumerableの拡張メソッドを実装してみましょう。

public static IEnumerable<TR> Select<T, TR>(IEnumerable<T> source, Func<T, TR> selector)
{
    foreach (var item in source)
    {
        yield return selector(item);
    }
}

恐ろしく簡単です。こんなにも簡単に書けるのは、yield returnのお陰。裏では、コンパイラが自動で対応するIEnumerable, IEnumeratorを生成してくれます。もしこれを教科書通りに自前で書くとしたら

public static IEnumerable<TR> Select<T, TR>(IEnumerable<T> source, Func<T, TR> selector)
{
    return new SelectEnumerable<TR>(); // 本当は引数も必要ですが省略
}

class SelectEnumerable<T> : IEnumerable<T>
{
    public IEnumerator<T> GetEnumerator()
    {
        return new SelectEnumerator<T>();
    }
    // 以下略
    // IEnumerator IEnumerable.GetEnumerator()
}

class SelectEnumerator<T> : IEnumerator<T>
{
    // Current, Dispose, MoveNextが必要ですが略
}

ああ、長い。やってられない。こんなものがオブジェクト指向だなどと言うならば、クソったれだと唾を吐きたくなる。そこで、AnonymousHogeパターンを用いれば……

public static IEnumerable<TR> Select<T, TR>(this IEnumerable<T> source, Func<T, TR> selector)
{
    return new AnonymousEnumerable<TR>(() =>
    {
        var enumerator = source.GetEnumerator();
        return new AnonymousEnumerator<TR>(
            () => enumerator.MoveNext(),
            () => selector(enumerator.Current),
            () => enumerator.Dispose()
        );
    });
}

驚くほどスッキリ。デザインパターンの本はC#でラムダ式全開でやり直すと、考え方はともかく、コードは全然違った内容になるんじゃないかなあ、とか思いつつ。この突然出てきたAnonymousEnumerableに関しては.NET Reactive Framework メソッド探訪第二回:AnonymousEnumerableを参照にどうぞ。去年の9月ですか……。AnonymousObservableも紹介する、といって10ヶ月後にようやく果たせている辺りが、やるやる詐欺すぎて本当にごめんなさい。

簡単に説明すれば、コンストラクタにラムダ式で各メソッドの本体を与えてあげることで、その場でクラスを作ることが出来るという代物です。クロージャによる変数キャプチャにより、引数を渡し回す必要もないため非常にすっきり書く事ができます。

これってようするにJavaの無名クラスでしょ?と言うと、その通り。おお、Java、大勝利。なんてこたぁーない。大は小を兼ねない、むしろこれは、小は大を兼ねる事の証明。

AnonymousObservable

IObservableはIEnumerableのようなコンパイラサポートはないので、自前で書かなければなりません。が、普通に書くと面倒なので、AnonymousObservableを使って書くことにしましょう。

public class AnonymousObservable<T> : IObservable<T>
{
    Func<IObserver<T>, IDisposable> subscribe;

    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        this.subscribe = subscribe;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return subscribe(observer);
    }
}

public class AnonymousObserver<T> : IObserver<T>
{
    Action<T> onNext;
    Action<Exception> onError;
    Action onCompleted;

    public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        this.onNext = onNext;
        this.onError = onError;
        this.onCompleted = onCompleted;
    }

    public void OnCompleted()
    {
        onCompleted();
    }

    public void OnError(Exception error)
    {
        onError(error);
    }

    public void OnNext(T value)
    {
        onNext(value);
    }
}

public class AnonymousDisposable : IDisposable
{
    Action dispose;
    bool isDisposed = false;

    public AnonymousDisposable(Action dispose)
    {
        this.dispose = dispose;
    }

    public void Dispose()
    {
        if (!isDisposed)
        {
            isDisposed = true;
            dispose();
        }
    }
}

そのまま書き出すだけなので、難しいことは何一つありませんが、面倒くさい……。なお、今回はRx抜きでの実装のためこうして自前で定義していますが、RxにはObservable.Create/CreateWithDisposable、Observer.Create、Disposable.Createというメソッドが用意されていて、それらは今回定義したAnonymousHogeと同一です。new ではなくCreateメソッドで生成するため型推論が効くのが嬉しい。

Observable.Select/Where

下準備が済んだので実装していきましょう。まずはSelect。

public static IObservable<R> Select<T, TR>(this IObservable<T> source, Func<T, TR> selector)
{
    return new AnonymousObservable<TR>(observer => source.Subscribe(
        new AnonymousObserver<T>(
            t => observer.OnNext(selector(t)),
            observer.OnError,
            observer.OnCompleted)));
}

Enumerableと似ているようで非常に分かりにくい。AnonymousObservableの引数のラムダ式は、Subscribeされた時に実行されるもの。というわけで、突然出てきているかのような引数のobserverは、Subscribeによって一つ後ろのメソッドチェーンから渡されるものとなります。

Observable.Range(1, 10) // これがsource
    .Select(i => i * i)
    .Subscribe(i => Console.WriteLine(i)); // これがobserver

こんな前後関係の図式になっています。ドットの一つ前のメソッドがsource、一つ後ろのメソッドがobserver。 最終的な目的としては元ソースからOnNext->OnNext->OnNextと値を伝搬させる必要があるわけですが、元ソースは末端どころか次に渡す先すら知りません。そのため、まず最初(Subscribeされた時)にsource.Subscribeの連鎖で元ソースまで遡ってやる必要がある、というわけです。非常に説明しづらいのでデバッガで追ってみてください。

public static IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> predicate)
{
    return new AnonymousObservable<T>(observer => source.Subscribe(
        new AnonymousObserver<T>(
            t => { if (predicate(t)) observer.OnNext(t); },
            observer.OnError,
            observer.OnCompleted)));
}

WhereはSelectのOnNext部分が違うだけのもの。コピペ量産体制。

ToObservable

Selectなどと同じくreturn new AnonymousObservableですが、もうSubscribeはしません(そもそもIObservable sourceがないので出来ないですが)。ここからは、末端から伝達されてきたobserverに対して値をPushしてやります。

public static IObservable<T> ToObservable<T>(this IEnumerable<T> source)
{
    return new AnonymousObservable<T>(observer =>
    {
        var isErrorOccured = false;
        try
        {
            foreach (var item in source)
            {
                observer.OnNext(item);
            }
        }
        catch (Exception e)
        {
            isErrorOccured = true;
            observer.OnError(e);
        }
        if (!isErrorOccured) observer.OnCompleted();

        return new AnonymousDisposable(() => { });
    });
}

Subscribeされると即座にforeachが回ってOnNext呼びまくる。ToObservableはHot or ColdのうちColdで、Subscribeされるとすぐに値が列挙されるわけです。Coldってのは、なんてことはなく、ようはすぐforeachされるからってだけの話でした。

戻り値のIDisposableは、FromEventではイベントのデタッチなどの処理がありますが、ToObservableでは何もする必要がないので何も無し。

FromEvent徹底解剖

Coldだけでは、別にEnumerbaleと全然変わらなくて全く面白くないので、Hot Observableも見てみます。Hotの代表格はFromEvent。そんなFromEventには4つのオーバーロードがあります。せっかくなので、細かく徹底的に見てみましょう。

public class EventSample
{
    public event EventHandler BlankEH;
    public event EventHandler<SampleEventArgs> GenericEH;
    public event SampleEventHandler SampleEH;
}

public class SampleEventArgs : EventArgs { }
public delegate void SampleEventHandler(object sender, SampleEventArgs e);

static void Main(string[] args)
{
    var sample = new EventSample();
    // 1. EventHandlerに対応するもの
    Observable.FromEvent(
        h => sample.BlankEH += h, h => sample.BlankEH -= h);
    // 2. EventHandler<EventArgs>に対応するもの
    Observable.FromEvent<SampleEventArgs>(
        h => sample.GenericEH += h, h => sample.GenericEH -= h);
    // 3. 独自EventHandlerに対応するもの
    Observable.FromEvent<SampleEventHandler, SampleEventArgs>(
        h => new SampleEventHandler(h),
        h => sample.SampleEH += h, h => sample.SampleEH -= h);
    // 4. リフレクション
    Observable.FromEvent<SampleEventArgs>(sample, "GenericEH");
    Observable.FromEvent<SampleEventArgs>(sample, "SampleEH");
}

FromEventと言ったら文字列で渡して―― という感じだったりですが、むしろそれのほうが例外的なショートカットで、基本はeventをadd/removeする関数を渡します。3つもありますが、基本的には三番目、conversionが必要なものが最も多く出番があるでしょうか。ただのEventHandlerなんて普通は使わないし、ジェネリクスのEventHandlerもほとんど見かけないしで、どうせみんな独自のEventHandlerなんでしょ、みたいな。もしEventHandler<T>で統一されていれば、こんな面倒くさいconversionなんて必要なかったのに!もしくは、みんなAction<object, TEventArgs>で良かった。名前付きデリゲートの氾濫の弊害がこんなところにも……。

実際のとこ文字列渡しで良いよねー、と思います。リフレクションのコストはどうせ最初の一回だけだし。リファクタリング効かないといっても、別にイベントの名前なんて変更しないっしょっていうか、フレームワークに用意されてるイベントは固定だし、って話ですし。

FromEventの作成

そんなわけで、今回は3引数のFromEventを作ります。FromEventの戻り値はIEventなので、IEventの定義も一緒に。

public interface IEvent<TEventArgs> where TEventArgs : EventArgs
{
    object Sender { get; }
    TEventArgs EventArgs { get; }
}

public class AnonymousEvent<TEventArgs> : IEvent<TEventArgs> where TEventArgs : EventArgs
{
    readonly object sender;
    readonly TEventArgs eventArgs;

    public AnonymousEvent(object sender, TEventArgs eventArgs)
    {
        this.sender = sender;
        this.eventArgs = eventArgs;
    }

    public object Sender
    {
        get { return sender; }
    }

    public TEventArgs EventArgs
    {
        get { return eventArgs; }
    }
}

public static IObservable<IEvent<TEventArgs>> FromEvent<TDelegate, TEventArgs>(
    Func<EventHandler<TEventArgs>, TDelegate> conversion,
    Action<TDelegate> addHandler,
    Action<TDelegate> removeHandler) where TEventArgs : EventArgs
{
    return new AnonymousObservable<IEvent<TEventArgs>>(observer =>
    {
        var handler = conversion((sender, e) =>
        {
            observer.OnNext(new AnonymousEvent<TEventArgs>(sender, e));
        });
        addHandler(handler);
        return new AnonymousDisposable(() => removeHandler(handler));
    });
}

感覚的にはToObservableの時と一緒。Subscribeされたら実行される関数を書く。Subscribe時に実際に実行されるのはaddHandlerだけ。つまりイベント登録。そしてイベントが発火した場合は、conversionのところのラムダ式に書いたものが呼び出される、つまり次のobserverに対してOnNextでIEventを送る。そして、DisposeされたらremoveHandlerの実行。

これが、Hotなわけですね。つまりSubscribeだけではOnNextが呼ばれず、もう一段階、奥から実行される。

// 実行例としてObservableCollectionなどを用意。
var collection = new ObservableCollection<int>();

var collectionChanged = Observable.FromEvent<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
            h => new NotifyCollectionChangedEventHandler(h),
            h => collection.CollectionChanged += h,
            h => collection.CollectionChanged -= h)
        .Select(e => (int)e.EventArgs.NewItems[0]);

// attach
collectionChanged.Subscribe(new AnonymousObserver<int>(i => Console.WriteLine(i), e => { }, () => { }));
collectionChanged.Subscribe(new AnonymousObserver<int>(i => Console.WriteLine(i * i), e => { }, () => { }));

collection.Add(100); // 100, 10000
collection.Add(200); // 200, 40000

利用時は大体こんな感じになります。いたって普通。

まとめ

というわけで実装を見ていきましたが、意外と簡単です。リフレクタでToObservable見たけどこんな簡単じゃなかったぞ!と言われると、そうですね、実際のRxはScheduler(カレントスレッドで実行するかスレッドプールで実行するか、などなどが選べる)が絡むので実装はもう少し、というかもうかなり複雑です。だからこそ惑わされてしまうというわけで、基本的な骨格部分にのみ絞ってみれば十二分にシンプル、というのを掴むのが肝要じゃないかと思います。

次回は前回予告の通りに、後回しにしちゃってるけれど結合周りを紹介できればいいなあ。あと、FromAsyncか、Timer周辺か、Schedulerか……。RxJSもちゃんと例を出したいし、例を出したいといえば、そう、メソッド紹介だけじゃなく実例も出していきたいなあ、だし。うーん。まあ、ボチボチとやっていきます。最近ほんとRxの知名度・注目度が高まってるような気がしてます。ぐぐる検索で私のへっぽこ記事が上位に出てしまうという現状なので、申し訳ない、じゃなくて、それ相応の責任を果たすという方向で頑張りたいと思います。つまりは記事をちゃんと充実させよう。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive