【Unity】MessagePipeの非同期処理

MessagePipe解読3回目

MessagePipeは非同期処理が実装されているということで、それがどういう使い方ができるか。という部分の記事


async/await で待機

async/await で順次処理、並列処理も可能になっているとのことです!

非同期になっているからと言って使い方が変わっているというわけではなく

Publisher → IAsyncPublusher

Subscriber → IAsyncSubscriber

という Async という命名がついたクラスを使用するだけで使い方は変わらないそうです。


非同期処理についていくつかテストをしてみたいと思います。


  • イベントを送信するクラス Publisher
  • イベントを受信するクラス Subscriber
  • イベント MyEvent

とします


イベント送り終わるまで await で待機する

まずはイベントを送る側にasync/awaitを適用させてみます。

イベント送る側(Publisher)が、イベント送り終わるまでawaitするケース。

// イベント受け取る方
public class Subscriber
{
    [Inject] private ISubscriber<MyEvent> _subscriber;

    public void Subscribe()
    {
        // イベントが来たら反応する
        _subscriber
            .Subscribe(ev =>
            {
                Debug.Log($"イベント受信完了");
            });
    }
}

イベント受信する側は前回と同じ。イベントが来たらSubscribeの引数のラムダ式が反応します。

// イベント送る方
public class Publisher
{
    [Inject] private IAsyncPublisher<MyEvent> _asyncPublisher;

    public async UniTask SendAsync(MyEvent ev)
    {
        // Subscriberの購読処理が終わるまで待つ
        await _asyncPublisher.PublishAsync(ev);

        // イベント全て送り終えた
    }
}

イベント送る方であるPublisherの IPublisher が IAsyncPublisher になっています。

この形で、IAsyncPublisherのPublishAsyncでイベントを送ることにより、イベントが送り終わるまで await され続けます。

後はPublisherクラスを作成して SendAsync を呼び出すだけです

{
    // イベント投げる
    var publisher = _container.Instantiate<Publisher>();
    await publisher.SendAsync(new MyEvent()); // ← イベントすべて送り終わるまで待機される
}


「イベント受け取り終わるまで待機...?」 というところに疑問を持たれると思います。

現在はイベント受け取るほうが特に何も変わったことをしていないためawaitは特に何もせずに先に進みます。

特に意味がないケース。


しかしIPublisherをIPublishAsyncにするだけで async/await が使用できるということがわかったと思います。

使い方がほぼ変わらずに適用できるという便利さ

イベントを受信したときに await させたい

次はイベントを受信する側。のクラスでasync/awaitを適用させるパターンを考えます。

イベントを受信したら1秒待機して、ログを出したいケース。


// イベント受け取る方
public class Subscriber
{
    [Inject] private IAsyncSubscriber<MyEvent> _asyncSubscriber;

    /// <summary>
    /// Subscribe内で待機
    /// </summary>
    public void SubscribeAsync()
    {
        _asyncSubscriber
            .Subscribe(async (x, ctr) =>
            {
                var time = Time.realtimeSinceStartup;

                // ここで非同期処理が可能
                // 大体1秒待機する
                await UniTask.Delay(TimeSpan.FromSeconds(1), cancellationToken: ctr);
                time = Time.realtimeSinceStartup - time;

                Debug.Log($"イベント受信完了: {x.Message} 待機秒数: {time}秒");
            });
    }
}

イベント受け取る方を変えました。送る方はさっきと同じです。

ISubscriberをIAsyncSubscriberに変えて、これで非同期処理が使用可能になってます

_asyncSubscriberのSubscribeのラムダに async がついていることがわかると思います。


つまり、ラムダの中で await が使用できるということ。

ラムダの中で1秒待機する UniTask.Delay を使用して、その後ログを出すようにしました。

流れとしては以下です

  1. PublisherAsyncでイベントを投げた
  2. Subscriberクラスがイベントを受け取った
  3. UniTask.Delayで1秒待機
  4. 待機後ログを出した。
  5. Subscribe終了
  6. PublishAsync の待機が終わり先に進む

ここでひっそりと6番目に処理が追加されていますが、新しくコードを追加したのではなくもともとPublisherクラスは「イベントの送信処理がすべて終了した時先に進む」というawait処理がありました。

今回Subscriberクラスは1秒待機してログを出すようにしています。

つまり、4番目の 「待機後ログを出した。Subscribe終了」が終わるまで Publisherクラスの PublishAsyncが待機される。ということです

f:id:toshizabeth:20210718211455p:plain


イベント送信側で await する理由は、受信側で await される可能性がある時 その処理が終わるまで待つことが出来る。

一度だけイベントをawaitで受け付けたい

イベントを一度だけ待機したい。というパターンは割と多いんじゃないかと思います。

  • 再生したアニメーションが終わるまで待機したい。
  • 撃った弾が消えるまで待機したい

一度だけイベントが来るまでawaitして、イベント来たら処理を先に進ませたい


Subscirbe (() => .... ); でラムダの中に処理を書いても良いんですが、awaitを使用してもう少しモダンに処理をさせたいケースで使えるのが FristAsync

// イベント受け取る方
public class Subscriber
{
    [Inject] private IAsyncSubscriber<MyEvent> _asyncSubscriber;

    public async UniTask WaitAsync(CancellationTokenSource cts)
    {
        var ev = await _asyncSubscriber.FirstAsync(cts.Token);

        Debug.Log($"イベント受信完了!!");
    }
}

Subscribe で購読処理を書くのではなくFistAsyncを書くことで「MyEvent が送られてくるまで待機する。」という処理が可能になりました。

一度だけ処理したい場合はこちらのほうが見やすく、直感的ではないでしょうか


ただ、イベントが来るまでawaitで待ち続けてしまいます。これは途中で購読をやめたい時には不便です。

このために引数で CancellationToken を受け取るようになっています。

これによりトークンをいつでもCancelすることでawaitの待機処理を破棄することができます

ここらへんもCyshapのUniTaskとの協調性が高くとても使いやすいと感じます。

終わりに

MessagePipeの真骨頂はこのUniTaskと連動した非同期処理にあるものだと言っても過言じゃないと思います。 async/awaitが使用できることによりイベントの送受信だけに収まらず、待機処理まで行えることで処理が分散されないことで視認性が高くなり、バグが出にくいコードにもなると考えます

とても素晴らしい...