見出し画像

RxJS_非同期データストリームの扱い例 #386

Angularで開発をしていて、RxJS(Reactive Extensions for JavaScript)というライブラリが提供する非同期データストリームの処理に触れました。今回はその例を示しながら整理したいと思います。

RxJSはAngularと非常によく統合されており、Angularの公式ドキュメントでも頻繁に参照されています。特に、AngularのHTTPクライアントやフォームコントロール、ルーティングなど、多くの機能がRxJSのObservableとともに動作するように設計されています。

以下は15秒ごとにGETリクエストを送信し、myappDataを最新化してセットするプログラムです。

  public subscription = new Subscription();


    this.subscription.add(
      interval(15000)
        .pipe(
          startWith(-1),
          flatMap(x => this.myappService.getMyappData(x >= 0))
        )
        .subscribe(myappData => {
          this.myappData = myappData;
          this.setData();
        })
    );

前提として、this.myappService.getMyappData(x >= 0)でGETリクエストを投げています。

1つずつ解説していきます。


Subscription

冒頭のSubscription()は、RxJSのObservableからデータを受け取るためのものです。Observableからデータを受け取るには、そのObservableに対して「購読」(subscribe)する必要があり、それがSubscriptionです。

Observableは後述するintervalやstartWithで作成され、.subscribeで購読を行うと、Observableはデータを発行し始めます。

サンプルコード上で最終的に発行されるのは、getMyappData(x >= 0)で投げたGETリクエストの結果です。発行されたデータを.subscribe()で受け取り、ここではthis.myappDataの値を更新してセットしています。

また、これらの一連の処理をsubscriptionというSubscriptionオブジェクトに追加していますが、これは後で処理を停止するためです。例えばユーザがページを離れる時などに、subscription.unsubscribe()を呼び出すことで、15秒ごとの更新を停止できます。


Subscriptionのaddメソッド

上記の通り、このプログラムではsubscriptionという名前でSubscriptionオブジェクトを保持しています。

このオブジェクトにはaddメソッドで複数の購読を追加でき、その後一度に全ての購読を止めることができます。コンポーネントが破棄される際など、メモリリークを防ぐために全ての購読を止めるのが一般的です。


interval()

ミリ秒単位で数字を発行するObservableを作成します。発行する数値は0から始まり、1ずつ増えていきます(0, 1, 2, 3, ...)。ここではinterval(15000)となっているので、15秒毎に数字が発行されます。

以下のイメージです。

  • 開始直後: 数値を発行しない

  • 15秒後: 0を発行

  • 30秒後: 1を発行

  • 45秒後: 2を発行

  • 60秒後: 3を発行

  • ・・・以下同様に続く


pipe()

pipeはObservableに対して一連のオペレーター(関数)を順次適用するためのメソッドです。それぞれのオペレーターの出力は次のオペレーターの入力となります。

pipe()メソッドの役割はオペレータにObservableを渡すことです。


startWith()

startWithは最初のオペレーターとして用いられ、指定された値(この場合は-1)をすぐに発行する新しいObservableを作成します。

サンプルコードにおいて最初にObservableを作成するのはinterval()です。そのObservableは15秒ごとに値を発行しますが、開始直後は何も発行しません。

interval()で作成した、そのObservableに対してpipeを使用し、その中でstartWith(-1)を使用すると、startWithは新しいObservableを作成します。この新しいObservableは、元のObservableが発行する値の前に指定した値(ここでは-1)を発行します。

つまり、startWithは既存のObservableに対して動作し、そのObservableが発行する値のシーケンスに追加の値を挿入した新しいObservableを生成します。


flatMap()

Observableが発行する各値に対して関数を適用し、その結果をフラットな(ネストされていない)Observableに結合します。つまり関数の処理結果が新しいObservableとして発行されます。

ここではintervalとstartWithで発行された数値(最初は-1、その後は0、1、2...)を取り、getMyappData(x >= 0)に渡してGETリクエストを送信し、その結果を新たなObservableとして作成します。

そのObservableが最初に説明した.subscribeで購読され、最終的な処理が実施されている形です。

結果として、15秒ごとにmyappDataの取得処理が実行されます。



なお、getMyappData(x >= 0)の中でx >= 0のチェックを行っているのは、最初の-1が関数に渡されて実行した時と、その後の15秒間隔で実行した時とで処理を分けるためです。

これらの結果、このコードは開始直後に一度、そしてその後15秒ごとにgetMyappData関数を呼び出し、得られたキャンペーンのデータをthis.myappDataに設定し、setData()を呼び出すという動作をします。


↓再掲

  public subscription = new Subscription();


    // pollingしてstatus更新する
    // リクエスト毎に-1からカウントアップされる引数を使って、初回表示時のリクエストとstatus更新用のリクエストを区別する
    this.subscription.add(
      interval(15000)
        .pipe(
          startWith(-1),
          flatMap(x => this.myappService.getMyappData(x >= 0))
        )
        .subscribe(myappData => {
          this.myappData = myappData;
          this.setData();
        })
    );


ここまでお読みいただきありがとうございました!

この記事が気に入ったらサポートをしてみませんか?