Reactive ExtensionsとAsync CTPでの非同期のキャンセル・プログレス処理

暫くはAsync CTPを特集していく!と思っていたのですが、何だか随分と間があいてしまいました。じっくり非同期操作に必要なオペレーションは何か、と考えるに「バックグラウンドでの実行」「進捗のUI表示」「結果のUI表示」「キャンセル処理」「エラー時処理」が挙げられる気がします。というわけで、こないだまではRxで進捗表示とかエラー時処理とか見ていたわけです、決してAsync CTPをスルーしていたわけではありません!ホントダヨ?記事の分量的にどうしてもRxだけで埋まってしまったのです。

さて、ところでこれらってつまり、BackgroundWorkerですよねー。ただたんに裏で実行するだけならThreadPool.QueueUserWorkItemでいいし、結果のUIへの伝達ぐらいならDispatcher.BeginInvoke書けば…… ですが、進捗やキャンセルなどを加えていくとドロドロドロドロしてしまいます。それらが統合された上でポトペタプロパティ設定で使えるBackgroundWorkerは偉大なわけです。

では、BackgroundWorkerを使った場合とReactive Extensionsを使った場合、そしてAsync CTPのasync/await、つまりはTaskを使った場合とで比較していきます。

あ、そうそう、Async CTPの本格的な解説はmatarilloさんの訳されているEric Lippertの継続渡しスタイル(CPS)と非同期構文(async/await)やufcppさんの非同期処理 (C# によるプログラミング入門)でがっちりと解説されています。私はがっちりした記事は書けないのでひたすらゆるふわに機能を雑多につまみ食いで。あと、Reactive Extensionsとしつこく比較するのも忘れません。

BackgroundWorkerの場合

BackgroundWorkerは、DoWorkはバックグラウンドで、ProgressChangedとRunWorkerCompletedはUIスレッド上で動きます。これにより、Dispatcherだとか、そういうことを意識せずに使えます。勿論、DoWork内でDispatcher.BeginInvokeすることも可能ですが、そういう場合はBackgroundWorkerの意味があまりなくなってしまうので、設計には素直に従っておいたほうが良いです。というわけで例など。

static string HeavyHeavyHeavyMethod(string s)
{
    Thread.Sleep(5000); // 重たい処理をするとする
    return s + s;
}

static void Main()
{
    var bw = new BackgroundWorker
    {
        WorkerReportsProgress = true,
        WorkerSupportsCancellation = true
    };

    bw.ProgressChanged += (sender, e) =>
    {
        var percentage = e.ProgressPercentage;
        var state = e.UserState;
        Console.WriteLine(percentage + "%" + ":" + state);
    };

    bw.DoWork += (sender, e) =>
    {
        var worker = sender as BackgroundWorker; // くろーぢゃな場合はbwが直接取れるので不要ですが
        var result = (string)e.Argument;
        if (result == null) throw new ArgumentNullException("引数よこせゴルァ");
        worker.ReportProgress(1, result); // 進捗報告

        // 重たい処理が幾つかあって最終的な結果を出す
        // キャンセルは随時出来るようにする
        result = HeavyHeavyHeavyMethod(result);
        if (worker.CancellationPending) { e.Cancel = true; return; }
        worker.ReportProgress(33, result); // 進捗報告

        result = HeavyHeavyHeavyMethod(result);
        if (worker.CancellationPending) { e.Cancel = true; return; }
        worker.ReportProgress(66, result); // 進捗報告

        result = HeavyHeavyHeavyMethod(result);
        if (worker.CancellationPending) { e.Cancel = true; return; }
        worker.ReportProgress(100, result); // 進捗報告

        e.Result = result; // 結果セットして正常完了
    };

    bw.RunWorkerCompleted += (sender, e) =>
    {
        if (e.Cancelled) // // キャンセルした場合
        {
            Console.WriteLine("キャンセルされたー");
        }
        else if (e.Error != null) // 例外発生の場合
        {
            Console.WriteLine("例外出たー");
            Console.WriteLine(e.Error);
        }
        else // 正常終了の場合
        {
            var result = e.Result;
            Console.WriteLine("終わった、結果:" + result);
        }
    };

    // 以下実行例

    bw.RunWorkerAsync("hoge"); // 非同期実行開始と初期引数

    Thread.Sleep(6000);
    bw.CancelAsync(); // 6秒後にキャンセルするなど

    while (bw.IsBusy) Thread.Sleep(1000);
    bw.RunWorkerAsync(null); // 今度は引数なしで実行するなど

    while (bw.IsBusy) Thread.Sleep(1000);
    bw.RunWorkerAsync("hoge"); // 最後まで実行

    Console.ReadLine();
}

実行結果などは他愛もないものなのでスルーで。さて、コードは見たとおりに、些か冗長なところはありますが一般的に考えられる処理は全て行えます。受け渡しがObjectなのダセーとか、EventArgsに値をセットして受け渡しダセーとか、キャンセルするのにCancellationPendingのチェックだりー、などなど思うところは色々あります。BackgroundWorkerのメリットはポトペタにあったと思われるので、時代背景的に、もうそぐわないかなあという気がかなりしています。

Reactive Extensionsの場合

Reactive Extensionsは、この手の非同期処理はお手の物。というわけでBackgroundWorkerで行った機能をまんま代替してみます。実行スレッドの切り替えはObserveOnで。

static string HeavyHeavyHeavyMethod(string s)
{
    Thread.Sleep(5000); // 重たい処理をするとする
    return s + s;
}

// WPFで適当なリストボックス(経過表示用)と適当なキャンセルボタンがあるとする
public MainWindow()
{
    InitializeComponent();

    Action<int, string> reportProgress = (i, s) => listBox1.Items.Add(i + "%:" + s);

    var disposable = Observable.Return("hoge", Scheduler.ThreadPool)
        .ObserveOnDispatcher().Do(s => reportProgress(1, s))
        .ObserveOn(Scheduler.ThreadPool).Select(HeavyHeavyHeavyMethod)
        .ObserveOnDispatcher().Do(s => reportProgress(33, s))
        .ObserveOn(Scheduler.ThreadPool).Select(HeavyHeavyHeavyMethod)
        .ObserveOnDispatcher().Do(s => reportProgress(66, s))
        .ObserveOn(Scheduler.ThreadPool).Select(HeavyHeavyHeavyMethod)
        .ObserveOnDispatcher().Do(s => reportProgress(100, s))
        .Subscribe(
            s => listBox1.Items.Add("終わった、結果:" + s),
            e => { listBox1.Items.Add("例外出たー"); listBox1.Items.Add(e); },
            () => { });

    // キャンセルボタンクリックでキャンセル
    CancelButton.Click += (sender, e) =>
    {
        listBox1.Items.Add("キャンセルしたー");
        disposable.Dispose();
    };
}

んん、あれれ?進捗表示する時はDispatcherに切り替え、重い処理をする時はThreadPoolに流すよう切り替える。理屈は簡単。書くのもそのまま。しかし、しかし、これは、どう見ても非効率的。おまけにコードの見た目もUgly。ダメだこりゃ。そんな時は拡張メソッド。例えばこんなものを用意しよう。

public static class ObservableExtensions
{
    /// <summary>Report on Dispatcher</summary>
    public static IObservable<T> Report<T>(this IObservable<T> source, Action<T> action)
    {
        return source.Report(action, Scheduler.Dispatcher);
    }

    /// <summary>Report on Scheduler</summary>
    public static IObservable<T> Report<T>(this IObservable<T> source, Action<T> action, IScheduler scheduler)
    {
        return source.Do(x => scheduler.Schedule(() => action(x)));
    }
}

Doの変形バージョンで、actionをDispatcher.BeginInvoke(デフォルトでは。オーバーロードのISchedulerを渡すものを使えば、任意のスケジューラに変更出来ます)で行う、というものです。これなら進捗表示などにピッタリ合うはず。というわけで、適用してみます。

var disposable = Observable.Return("hoge", Scheduler.ThreadPool)
    .Report(s => reportProgress(1, s))
    .Select(HeavyHeavyHeavyMethod)
    .Report(s => reportProgress(33, s))
    .Select(HeavyHeavyHeavyMethod)
    .Report(s => reportProgress(66, s))
    .Select(HeavyHeavyHeavyMethod)
    .Report(s => reportProgress(100, s))
    .ObserveOnDispatcher()
    .Subscribe(
        s => listBox1.Items.Add("終わった、結果:" + s),
        e => { listBox1.Items.Add("例外出たー"); listBox1.Items.Add(e); },
        () => { });

無難に仕上がりました。BackgroundWorkerと比べると、随分とすっきりします。受け渡しがオブジェクトではなく、しっかり型がついたままチェーンされること、例外処理もOnErrorの流れに沿ってすっきり記述できること、そして、何よりもキャンセル処理が楽!Disposeを呼ぶだけで、CancellationPendingのようなものをチェックする必要なくサクッとキャンセルすることが可能です。これは、処理単位が小さなメソッド毎に分割される、この場合は進捗報告を抜くとSelectの連打という形になりますが、その連打がちゃんと意味を持つわけです。

余談ですが、INotifyPropertyChanged経由のデータバインディングは自動でDispatcher経由にしてくれるようなので、その辺楽。UIパーツなんて直接触るもんじゃない、MVVM! でもObservableCollectionだとダメだったりするんですね、色々んもー。

Task(async/await)の場合

TaskにおけるキャンセルもBackgroundWorkerと同じく、キャンセル用オブジェクトの状態を確認して自分で挙動を挟む必要があります。ThrowIfCancellationRequested() を呼べばキャンセルされていた時は例外を送出して強制終了。

string HeavyHeavyHeavyMethod(string s)
{
    Thread.Sleep(5000); // 重たい処理をするとする
    return s + s;
}

// 進捗表示用入れ物クラス
class ProgressResult
{
    public int Percentage { get; set; }
    public string Value { get; set; }
}

async void DoAsync(string start, CancellationToken token, IProgress<ProgressResult> progress)
{
    // 進捗報告はIProgress<T>のReportを呼ぶ
    progress.Report(new ProgressResult { Percentage = 1, Value = start });

    try
    {
        var s = await TaskEx.Run(() => HeavyHeavyHeavyMethod(start));
        token.ThrowIfCancellationRequested(); // キャンセルされた場合は例外送出
        progress.Report(new ProgressResult { Percentage = 33, Value = s });

        s = await TaskEx.Run(() => HeavyHeavyHeavyMethod(s));
        token.ThrowIfCancellationRequested(); // キャンセルされた場合は例外送出
        progress.Report(new ProgressResult { Percentage = 66, Value = s });

        s = await TaskEx.Run(() => HeavyHeavyHeavyMethod(s));
        token.ThrowIfCancellationRequested(); // キャンセルされた場合は例外送出

        listBox1.Items.Add("終わった、結果:" + s);
    }
    catch (OperationCanceledException)
    {
        listBox1.Items.Add("キャンセルされたー");
    }
}

public MainWindow()
{
    InitializeComponent();

    // プログレスが変化したときの挙動の登録
    var progress = new EventProgress<ProgressResult>();
    progress.ProgressChanged += (sender, e) =>
        listBox1.Items.Add(e.Value.Percentage + "%" + ":" + e.Value.Value);

    // キャンセルボタンを押したとする、時にキャンセルする
    var ctsSource = new CancellationTokenSource();
    button1.Click += (_, __) => ctsSource.Cancel();

    // 非同期実行
    DoAsync("hoge", ctsSource.Token, progress);
}

例外送出という形なので、BackgroundWorkerよりはキャンセルが楽です。プログレスに関しては、EventProgress<T>を用意して、それのReportメソッドを呼ぶという形になります。これはBackgroundWorkerに非常に近い感じですね。

同期→非同期

今まで見た「重い処理」であるHeavyHeavyHeavyMethodは同期的なものでした。言うならばWebRequestのGetResponse。もしくはCPU時間を喰う処理。では、BeginGetResponseのような、重い処理が非同期の場合の非同期処理(こんがらがる)はどうなるでしょう。

void HeavyMethod2(string s, Action<string> action)
{
    ThreadPool.QueueUserWorkItem(_ =>
    {
        Thread.Sleep(5000);
        var result = s + s;
        action(result);
    });
}

こんな、なんちゃって非同期メソッドがあるとして、こいつをどう料理出来るか。

非同期とBackgroundWorker

元から非同期のものに対し、BackgroundWorkerは無力です。破綻です。さようならです。

// DoWorkは実行されるとすぐに抜けて(HeavyMethod2が非同期のため)
// RunWorkerCompletedが呼ばれることになって全く正常に動かない
bw.DoWork += (sender, e) =>
{
    HeavyMethod2("hoge", s1 =>
    {
        bw.ReportProgress(33, s1);
        HeavyMethod2(s1, s2 =>
        {
            bw.ReportProgress(66, s2);
            HeavyMethod2(s2, s3 =>
            {
                bw.ReportProgress(100, s3);
            });
        });
    });
};

bw.RunWorkerCompleted += (sender, e) =>
{
    var result = e.Result;
    listBox1.Items.Add("終わった、結果:" + result);
};

bw.RunWorkerAsync("hoge");

これはちっとも動きません。というかReportProgressで例外が出ます(実行が完了=RunWorkerCompletedが呼ばれている状態ではReportProgressは呼べない)。なんとも、ならないですねえ。ここでAutoResetEventなどを呼んでDoWorkの完了を待機してやるぜ、という策もありますが、そんなことをやる意味は全くないでしょう。

Reactive Extensions

補助拡張メソッドとしてXxxAsObservableを定義しましょう。Begin-EndパターンのものならFromAsyncPatternが使えますが、今回のような俺々非同期メソッドには使えないので、AsyncSubjectを使って自前でラップします。

IObservable<string> HeavyMethod2AsObservable(string input)
{
    var asyncSubject = new AsyncSubject<string>();
    HeavyMethod2(input, s =>
    {
        try
        {
            asyncSubject.OnNext(s);
            asyncSubject.OnCompleted();
        }
        catch(Exception e)
        {
             asyncSubject.OnError(e);
        }    
    });
    return asyncSubject.AsObservable();
}

ラップ自体はそんなに難しいものでもないですし、定型なので割と楽です。AsyncSubjectの詳細、もしくは何故AsyncSubjectを使わなければならないのか、非同期ラップの落とし穴、的なものは以前の記事を参照してください。

var disposable = Observable.Return("hoge")
    .Report(s => reportProgress(1, s))
    .SelectMany(HeavyMethod2AsObservable)
    .Report(s => reportProgress(33, s))
    .SelectMany(HeavyMethod2AsObservable)
    .Report(s => reportProgress(66, s))
    .SelectMany(HeavyMethod2AsObservable)
    .Report(s => reportProgress(100, s))
    .ObserveOnDispatcher()
    .Subscribe(
        s => listBox1.Items.Add("終わった、結果:" + s),
        e => { listBox1.Items.Add("例外出たー"); listBox1.Items.Add(e); },
        () => { });

同期のものと見比べてもらうと分かりますが、ほとんど変わりません。SelectをSelectManyに変えただけです。同期だとか非同期だとか、そんなの全く関係なく同じように取りまとめられてしまう。これはRxの強みの一つです。

async/await

RxでAsyncSubjectを使ってラップしたように、こちらではTaskCompletationSourceを使ってラップします。詳細はRxを使って非同期プログラミングを簡単にで。そうしたら、後は以前のものと同じように書きます。同じなので割愛。

まとめ

BackgroundWorkerの成したことは大きいと思います。全く非同期を意識させずにコントロールのポトペタで、UIをブロックしないコードが書ける。でもその反面、受け渡しがobjectであったりと、弊害と限界が見えているように思えます。そしてそれは、非同期APIしかないSilverlightでついに限界を向かえた。もうそろそろ、お役御免。しょうがない。

では代わりに何を使うかと言ったら、Rxを使えばいいんじゃないでしょうか、いやこれは本気で。見てきたとおり、十分にBackgroundWorkerの機能を代替出来ていますし。TaskはSilverlightにはまだ入ってないし、素のままでは使いやすいとは言い難い。目の前に現実的な解が転がっているのだから、とりあえず使ってみるのもいいんじゃないかな。機能的にはReactive Extensionsがイケてるのは間違いないと思うので(キャンセルの容易さは非常に大きい!)、そして、現実的に使える形で提供されている状態でもあるので、Rx使うといいんぢゃないかな(そればっか)。

今後。私は、Reactive Extensionsとasync/awaitは共存するものだと思っています。そして、どちらも、必須であると、両者を知れば知るほど思い始めています。なので、もう単純に比較してどうこうはお終い。次は連携を考えていきたいと思います。とりあえず、何で共存するのか、何故に両者が必須であるのか(私であるのならばRxだけじゃダメなんですか!ダメなんです、の理由などなどり)は、そのうち書きます。

Profile

Yoshifumi Kawai

Cysharp, Inc
CEO/CTO

Microsoft MVP for Developer Technologies(.NET)
April 2011
|
July 2025

X:@neuecc GitHub:neuecc

Archive