RxJS を学ぼう #4 - COLD と HOT について学ぶ / ConnectableObservable

wakamsha
71

RxJS を学ぼう #1 – これからはじめる人のための導入編にて以下のように解説しました。

Observable はただ書いただけでは値を流しません。下流で subscribe ( 購読 ) することで初めて値が流れます。

これはいわゆる『COLD』という性質に限った話であり、実は Observable には subscribe せずともデータを流す仕組みが用意されています。そのような性質を『HOT』と呼びます。この二つの性質の違いを理解することは Rx を使いこなす上で避けて通ることは出来ません。今回はこれら COLD と HOT についてご紹介します。

COLD

1) subscribe されるまで値を流さない

COLD は下流で subscribe されるまで値を流しません ( ストリームが稼働していない状態 ) 。仮に subscribe されていない Observable に値を流しても流れが途切れるだけで、値は処理されることなく消滅してしまいます。通常、Observable はこの COLD という性質になります。

ストリームが subsribe されていないので値が流れていかない
subscribe されて初めて値が流れる

川を流れる大きな桃は、最終的に下流にておじいさんに拾ってもらわなくては物語が進みません。おじいさんがいなくては無意味ということで桃を流すことも出来ないのです。

const clock$ = Rx.Observable
  .interval(1000)
  .take(5)
  .map(x => x + 1)
  .do(x => console.log(`do: ${x}`));
// ( ˘ω˘) …静かだ…

clock$ ストリームを subscribe する Observer がいないので値が流れはじめません。末端のdoオペレータで console.log 出力しようとしてもここまで値が来ることがないので、何も出力されずに終了します。

余談ですがObservable.intervalは一定時間ごとに値を通知するイベントのようなものなので HOT のように思われますが、れっきとした COLD です。

2) それぞれの Observer に対して個別に値を流す

もう一つの COLD な性質として、IObservable<T>ストリームに関連付けたそれぞれのIObserver<T>に対して個別に値を流すというのがあります。一つの Observable に対して複数 subscribe した場合、それぞれに対して個別に Observable が複製されて割り当てられるというわけです。

COLD な Observable を複数 subscribe するとストリームが複製される
const clock$ = Rx.Observable
  .interval(1000)
  .take(5)
  .map(x => x + 1)
  .do(x => console.log(`do: ${x}`));

// Observer A
clock$.subscribe(x => console.log(` A: ${x}`));

// Observer B
setTimeout(() => {
  clock$.subscribe(x => console.log(`    B: ${x}`));
}, 2500);

上記のコードでは、clock$という Observable に対して二つの Observer AB が subscribe しています。B は A より 2.5秒遅く subscribe しているので、普通に考えれば B が受け取れる値は345だけのはずですが、実際は以下のように全ての値を受け取ります。

"do: 1"
" A: 1"
"do: 2"
" A: 2"
"do: 3"
" A: 3"
"do: 1"     // Aとは別にB用に複製されたObservableから新規に値が流れている
"    B: 1"
"do: 4"
" A: 4"
"do: 2"
"    B: 2"
"do: 5"
" A: 5"
"do: 3"
"    B: 3"
"do: 4"
"    B: 4"
"do: 5"
"    B: 5"

桃を受け取る事ができるおじいさんは世界で唯一人だけです。もし二人目のおじいさんが現れた場合は、その人は別世界の川で別の桃を受け取るということになります。パラレルワールドみたいなものですね。

See the Pen RxJS Cold Observable - DEMO by wakamsha (@wakamsha) on CodePen.

HOT

1) subscribe されなくとも値を流す

HOT オペレータが存在するとその時点で Observable を稼働させ、値が流れ始めます。下流で Observer が subscribe してなかろうが関係なく流れるため、もし Observer がいなければ値は Observable を流れきって最後に消滅します。

const clock$ = Rx.Observable
  .interval(1000)
  .take(5)
  .do(x => console.log(`do: ${x}`))
  .map(x => x + 1)
  .publish();

clock$.connect();

// Observer B
setTimeout(() => {
  clock$.subscribe(x => console.log(`    B: ${x}`));
}, 2500);

上記の例では、後述するpublishオペレータで Observable を COLD から HOT に変換しています。その直後にconnectオペレータを呼び出すことで強制的に Observable を稼働させ、値を流しはじめています ( connect についても後述 ) 。Observer B は Observable 稼働開始から 2.5秒後に subscribe しているため、値を受け取れるのは3からになります。

"do: 0"
"do: 1"
"do: 2"
"    B: 3"  // ここから subscribe 開始
"do: 3"
"    B: 4"
"do: 4"
"    B: 5"

See the Pen RxJS HOT Observable - DEMO1 by wakamsha (@wakamsha) on CodePen.

2) ストリームを分岐させて一度に同じ値を流す

もう一つの HOT な性質として、同一のIObservable<T>ストリームに関連付けたすべてのIObserver<T>に対して一度に同じ値を流すというのがあります。COLD の場合は subscribe される度にパラレルワールドかのごとく Observable が複製されますが、HOT オペレータ が間にいると複製されず、そこでストリームが分岐して後続に同一の値を流します

HOT な Observable の場合、そこでストリームが分岐する

ちなみに分岐した後は別の値として扱われるので、以下のように B に対してのみ個別の処理を加えて値を変えることも可能です。

clock$
  .map(x => x * 10)
  .subscribe(x => console.log(`    B: ${x}`));
"    B: 30"
"    B: 40"
"    B: 50"

以上が COLD と HOT の大まかな解説になります。Observable は通常 COLD な性質であり、それを HOT な性質に変換することで複数の Observer に同一の値を流したり subscribe される前に値を流しはじめることが可能になることが分かりました。

では COLD から HOT に変換するにはどうすればよいのでしょうか?先ほどのサンプルコードにて publish と connect という新しいオペレータが登場しました。これらはConnectableObservableというクラスのオペレータであり、以降はこれらについてご紹介します。

ConnectableObservable とは?

Observable を継承したクラスです。通常の Observable は subscribe された時点で値を流し始めますが、ConnectableObservable は いくらsubscribe しようが connect という処理が実行されるまで値を流しません

逆に言えば connect してしまえば subscribe されていなくても値は流れはじめるということです。つまり値が流れきってしまった後に subscribe しても何の値も受け取れないことになります。その場合は単に complete 処理が実行されて終わりです。

もうお分かりですね。ConnectableObservable とはすなわち HOT な性質の Observable なのです。

ConnectableObservable.publish

ConnectableObservable<T>インスタンスを返すオペレータです。これまで学習してきたオペレータは全て Observableインスタンスを返していましたが、publish は ConnectableObservable インスタンスを返すことで COLD から HOT に変換します。ソースコードを見ると以下のような実装になっています。

new ConnectableObservable(this, new Subject())

Observable インスタンスは subscribe が実行されたタイミングでデータを流しはじめるのに対して、ConnectableObservable インスタンスは connect() を実行することで流しはじめるという違いがあります。

ここまでのまとめ

  1. publish() して Observable を HOT な状態に変換する ※ この時点ではまだストリームは稼働しない
  2. connect()を実行すると Observer の有無に関係なくストリームが稼働しだす

ConnectableObservable.refCount

Observer が一つ以上ある間は connect 状態を維持し、Observer が 0になると connect 状態を解除する Observable インスタンスを返します。以下のサンプルコードを見てみましょう。

const count$ = Rx.Observable
  .from([1, 2, 3, 4, 5])
  .do(x => console.log(`do: ${x}`))
  .map(x => x + 1)
  .publish()
  .refCount();

// Observer A
count$.subscribe(
  x => console.log(` A: ${x}`),
  (error: Error) => console.log(error),
  () => console.log('---- A is completed. ----')
);

// Observer B
setTimeout(() => {
  count$.subscribe(
    x => console.log(`    B: ${x}`),
    (error: Error) => console.log(error),
    () => console.log('---- B is completed. ----')
  );
}, 2500);
"do: 1"
" A: 2"
"do: 2"
" A: 3"
"do: 3"
" A: 4"
"do: 4"
" A: 5"
"do: 5"
" A: 6"
"---- A is completed. ----"
"---- B is completed. ----"

publish しているにも関わらず connect がありません。代わりに refCountを呼び出しています。通常 publish は connect が呼び出されるまでストリームを稼働させませんが、refCount は最初に subscribe されたタイミングで自動的にストリームを稼働させます。上記のコードでは Observer A が subscribe したタイミングでストリームが稼働し始めます。しかも今回は Observable ソースがfrom([1, 2, 3, 4, 5]) と同期的なものとなっています。そのため、Observer B が 2.5秒遅延して subscribe していますが、その時にはすでに A によって全ての値が流れきってしまっているので B は何も受け取ることが出来ずに complete イベントだけを発火させて終了となります。

See the Pen RxJS HOT Observable - refCount - DEMO by wakamsha (@wakamsha) on CodePen.

次に非同期の例を見てみましょう。

const count$ = Rx.Observable
  .interval(1000)
  .take(5)
  .do(x => console.log(`do: ${x}`))
  .map(x => x + 1)
  .publish()
  .refCount();

// Observer A
count$.subscribe(
  x => console.log(` A: ${x}`),
  (error: Error) => console.log(error),
  () => console.log('---- A is completed. ----')
);

// Observer B
setTimeout(() => {
  count$.subscribe(
    x => console.log(`    B: ${x}`),
    (error: Error) => console.log(error),
    () => console.log('---- B is completed. ----')
  );
}, 2500);
"do: 0"
" A: 1"
"do: 1"
" A: 2"
"do: 2"
" A: 3"
"    B: 3"
"do: 3"
" A: 4"
"    B: 4"
"do: 4"
" A: 5"
"    B: 5"
"---- A is completed. ----"
"---- B is completed. ----"

Observer A が subscribe したタイミングでストリームが稼働して値が流れ始めています。Observer B はストリーム稼働から 2.5秒後に subscribe しているため、値3から受け取っています。

See the Pen RxJS HOT Observable - refCount - DEMO2 by wakamsha (@wakamsha) on CodePen.

Observable.share

一言で言ってしまうと publish().refCount() のエイリアスです。ソースコードを見ると以下のような実装になっています。

new ConnectableObservable(this, new Subject()).refCount()

refCount を返していますね。つまり publish().refCount() と同じ挙動をすると考えて良いわけです。

HOT に変換した Observable をすぐに稼働したい場合は share() を使うのが一番お手軽ということですね。

締め

Observable の超重要な性質である COLD / HOT について見てきました。同一の値を複数のObserverに『分配 ( 同じデータを複数の購読者に送信すること ) 』するのも、COLD から HOT に変換することで実現できます。他にも YouTube の『PC で途中まで観た動画の続きをスマホで観る』というUI / UX はこの HOT な Observable そのものと言えます1)実際のところ、Rx で実装されているのかは分かりませんが…

ストリームの分岐を意識した設計はなかなかどうして難易度が高いですが、小さなものからコツコツと作っていくことで徐々に慣れていきましょう。

脚注

脚注
1 実際のところ、Rx で実装されているのかは分かりませんが…