RxJS を学ぼう #5 - Subject について学ぶ / Observable × Observer

RxJS にはイベントストリームを流す『Observable』とそこを流れる値を受け取る『Observer』と二人の役者がいますが、一人二役をこなすものが別にいます。それがSubjectです。今回はこの Subject について詳しくご紹介します。

【おさらい】COLD の場合、Observable はそれぞれの Observer に対して個別に値を流す

Subject を理解するにはまず『COLD』と『HOT』の概念が頭に入っていなくてはなりません。以下のサンプルコードを見てみましょう。

const source$ = Rx.Observable.interval(1000).take(5);
const observerA = {
  next:  (x)   => console.log(`A next: ${x}`),
  error: (err) => console.log(`A error: ${err}`),
  complete: () => console.log(`A complete`)
};
source$.subscribe(observerA);  // create an execution
const observerB = {
  next:  (x)   => console.log(`B next: ${x}`),
  error: (err) => console.log(`B error: ${err}`),
  complete: () => console.log(`B complete`)
};
setTimeout(() => source$.subscribe(observerB), 2000);

1000ミリ秒ごとに整数のシーケンスを5回流し、それをobserverAobserverBが subscribe するというものです。まず『A』が先に subscribe し、その2000ミリ秒後に『B』が subscribe しています。結果はどうなるでしょうか?

"A next: 0"
"A next: 1"
"A next: 2"
"B next: 0"
"A next: 3"
"B next: 1"
"A next: 4"
"A -------- complete --------"
"B next: 2"
"B next: 3"
"B next: 4"
"B -------- complete --------"

connect()share() といったオペレータを使っていないのでこの Observable は『COLD』です。従って『A』よりも後に subscribe した『B』ですが0,1... と最初から値を受け取っており、『A』と同期がとれていません。

仮に以下のように『A』が値を受け取るタイミングで『B』にも同じ値を渡してあげれば『A』との同期が可能ですが、『A』と『B』が密結合となってしまいます。

const observerA = {
  next:  (x)   => {
    console.log(`A next: ${x}`);
    observerB.next(x);
  },
  error: (err) => console.log(`A error: ${err}`),
  complete: () => console.log(`A -------- complete --------`)
};
"A next: 0"
"B next: 0"
"A next: 1"
"B next: 1"
"A next: 2"
"B next: 2"
⋮

【解決策】ひとつの Observable と複数の Observer を中継する bridgeObserver を作る

二つの Observable を中継する bridgeObserver なるものを作成します。

const source$ = Rx.Observable.interval(1000).take(5);
const bridgeObserver = {
  next: (x)    => bridgeObserver.observers.forEach(o => o.next(x)),
  error: (err) => bridgeObserver.observers.forEach(o => o.error(err)),
  complete: () => bridgeObserver.observers.forEach(o => o.complete()),
  observers: [],
  addObserver: (observer) => bridgeObserver.observers.push(observer)
};
source$.subscribe(bridgeObserver);
const observerA = {
  next:  (x)   => console.log(`A next: ${x}`),
  error: (err) => console.log(`A error: ${err}`),
  complete: () => console.log(`A -------- complete --------`)
};
bridgeObserver.addObserver(observerA);
const observerB = {
  next:  (x)   => console.log(`B next: ${x}`),
  error: (err) => console.log(`B error: ${err}`),
  complete: () => console.log(`B -------- complete --------`)
};
setTimeout(() => bridgeObserver.addObserver(observerB), 2000));

従来の Observer 同様に next, error, complete メソッドを持ち、それらに加えて子 Observer の配列を持たせます。bridgeObserver は Observable からイベントを受け取ると forEach() で全ての子 Observer に対してイベントを中継します。実際に Observable を subscribe しているのは bridgeObserver ひとつだけです。つまり Observable が COLD であってもストリームは複製されません。そのため、『A』より2秒後に追加された『B』も『A』と全く同じイベントを受け取るので以下のような実行結果となります。

"A next: 0"
"A next: 1"
"A next: 2"
"B next: 2"
"A next: 3"
"B next: 3"
"A next: 4"
"B next: 4"
"A -------- complete --------"
"B -------- complete --------"

『HOT』と全く同じ挙動が実現できていますね。

Subject とは bridgeObserver のことである

ちなみに bridgeObserver の addObserver というメソッド名を subscribe に変えたらどうでしょうか?

const bridgeObserver = {
  next: (x)    => bridgeObserver.observers.forEach(o => o.next(x)),
  error: (err) => bridgeObserver.observers.forEach(o => o.error(err)),
  complete: () => bridgeObserver.observers.forEach(o => o.complete()),
  observers: [],
  subscribe: (observer) => bridgeObserver.observers.push(observer)
};
⋮
bridgeObserver.subscribe(observerA);

子 Observer にイベントストリームを中継 ( 流す )するということは、言ってしまえば Observable と同じですよね。さらに bridgeObserver 自身も next,error,complete を持っているので subscribe も可能です。つまり bridgeObserver は Observable と Observer の両方の機能を備えていると言えます。bridgeObserver は正式には Subject といいます。

const source$ = Rx.Observable.interval(1000).take(5);
const subject = new Rx.Subject();
source$.subscribe(subject);
⋮
subject.subscribe(observerA);
⋮
setTimeout(() => subject.subscribe(observerB), 2000);
"A next: 0"
"A next: 1"
"A next: 2"
"B next: 2"
"A next: 3"
"B next: 3"
"A next: 4"
"B next: 4"
"A -------- complete --------"
"B -------- complete --------"

そんな Subject には幾つか種類があります。ここからはそれぞれの Subject についてサンプルコードを交えながらご紹介します。

Subject - 任意のタイミングでイベントを流す

Observable.of などは subscribe されてはじめて動作します。Subject は Observable の有無に関係なく next(), error(), complete() を明示的に呼び出すことが出来るので、任意のタイミングでイベントを流すことができます。

const observerA = {
  next:  (x)   => console.log(`A next: ${x}`),
  error: (err) => console.log(`A error: ${err}`),
  complete: () => console.log(`A -------- complete --------`)
};
const observerB = {
  next:  (x)   => console.log(`B next: ${x}`),
  error: (err) => console.log(`B error: ${err}`),
  complete: () => console.log(`B -------- complete --------`)
};
const subject = new Rx.Subject();
subject.subscribe(observerA);
subject.next(1);
subject.next(2);
subject.next(3);
subject.complete();
setTimeout(() => subject.subscribe(observerB), 2000);
"A next: 1"
"A next: 2"
"A next: 3"
"A -------- complete --------"
"B -------- complete --------"

Subject は 『1 Observable / N Observer』となり、Subject.observers プロパティに子 Observer が配列形式で格納されています。RxJS 4.x 系までは Subject.hasObservers() という子 Observer があるかどうかを boolean 型で返すメソッドがありましたが、5.x 系から廃止されました。子があるかどうかは Subject.observers.length で確認することになります。

BehaviorSubject - 直近の値を記憶して subscribe および next のタイミングで流す

先ほどの Subject の例では、『A』は即座に subscribe してるので全てのイベントを受け取れていますが、『B』が subscribe する頃には全てのイベントが流れてしまっているため、『B』は何も受け取れていません。これを RxMarble で表現してみると以下のようになります。

subject   : ----1---2---3------------
observerA : A...1...2...3....
observerB :                     B....

『A』はイベントが流れる前 ( もしくは同時 ) に subscribe しているので、1,2,3全てのイベントを受け取ることができ、更に将来のイベントも受け取ることができます。

このようなシチュエーションの場合、『B』には直近の値である 3 のイベントを受け取らせたくなります。そこで使うのがBehaviorSubjectです。BehaviorSubject は初期値もしくは最新値を記憶し、子 Observer が subscribe したタイミングもしくは Subject が next()したタイミングで最新の値を流すイベントを発火します。

const subject = new Rx.BehaviorSubject(0);
const observerA = {
  next:  (x)   => console.log(`A next: ${x}`),
  error: (err) => console.log(`A error: ${err}`),
  complete: () => console.log(`A -------- complete --------`)
};
subject.subscribe(observerA);
console.log(`A -------- subscribed --------`);
const observerB = {
  next:  (x)   => console.log(`B next: ${x}`),
  error: (err) => console.log(`B error: ${err}`),
  complete: () => console.log(`B -------- complete --------`)
};
subject.next(1);
subject.next(2);
subject.next(3);
setTimeout(() => {
  subject.subscribe(observerB);
  console.log(`B -------- subscribed --------`);
}, 2000);
"A next: 0"
"A -------- subscribed --------"
"A next: 1"
"A next: 2"
"A next: 3"
"B next: 3"
"B -------- subscribed --------"

よって最後の3が流れた後に『B』が subscribe しても『B』は 3 だけは受け取れるのです。RxMarble で表現するとこうなります。

subject   : 0---1---2---3------------
observerA : .0..1...2...3....
observerB :                     3....

ただし、BehaviorSubject.complete()が発火してストリームが終了すると、それ以降に追加された子 Observer は最新の値も受け取れなくなります。

subject   : 0---1---2---3---|
observerA : .0..1...2...3...|
observerB :                     |

ReplaySubject - 指定された数の値を記憶し、subscribe のタイミングでそれらの値を一気に流す

BehaviorSubject は直近の値をひとつだけ記憶しておくことが出来ますが、過去に流された全ての値ないし任意の前までの値を『B』に渡したい場合は ReplaySubjectを使います。

const subject = new Rx.ReplaySubject(2);
const observerA = {
  next:  (x)   => console.log(`A next: ${x}`),
  error: (err) => console.log(`A error: ${err}`),
  complete: () => console.log(`A -------- complete --------`)
};
subject.subscribe(observerA);
console.log(`A -------- subscribed --------`);
const observerB = {
  next:  (x)   => console.log(`B next: ${x}`),
  error: (err) => console.log(`B error: ${err}`),
  complete: () => console.log(`B -------- complete --------`)
};
setTimeout(() => subject.next(1), 500);
setTimeout(() => subject.next(2), 1000);
setTimeout(() => subject.next(3), 1500);
setTimeout(() => subject.complete(), 2000);
setTimeout(() => {
  subject.subscribe(observerB);
  console.log(`B -------- subscribed --------`);
}, 4000);

ReplaySubject はコンストラクタ時に BufferSize を引数として受け取ります。BufferSize は『いくつ前までの値を記憶しておきたいか』です。ここでは2を渡しています。つまり『B』が subscribe すると 2つ前までの値を受け取ることになります。

"A -------- subscribed --------"
"A next: 1"
"A next: 2"
"A next: 3"
"A -------- complete --------"
"B next: 2"
"B next: 3"
"B -------- complete --------"
"B -------- subscribed --------"

RxMarbleで表現してみると以下のようになります。

subject   : 1---2---3------------
observerA : ...1...2...3....
observerB :                   2,3....

Replay を直訳すると『再現』ですが、新しい購読者 ( Observer ) が加入すると、それ以前に起きたことを全て再生するという意味から来ているというわけですね。

windowSize で回数でなく時間ベースでバッファ保存を指定

さらに ReplaySubject は windowSize という第二引数を受け取ることができます。windowSize は回数ではなく内部バッファに保存する時間を指定します。例えば 4000ミリ秒と指定すると、4秒よりも古いイベントはそれ以降に追加された Observer には再生されなくなります。bufferSize と windowSize が競合した場合はバッファサイズの少ない ( 短い ) 方が優先されます。

const subject = new Rx.ReplaySubject(2, 2000);
⋮
subject.subscribe(observerA);
console.log(`A -------- subscribed --------`);
⋮
setTimeout(() => subject.next(1), 500);
setTimeout(() => subject.next(2), 1000);
setTimeout(() => subject.next(3), 1500);
setTimeout(() => subject.complete(), 2000);
setTimeout(() => {
  subject.subscribe(observerB);
  console.log(`B -------- subscribed --------`);
}, 4000);
"A -------- subscribed --------"
"A next: 1"
"A next: 2"
"A next: 3"
"A -------- complete --------"
"B -------- complete --------"
"B -------- subscribed --------"

AsyncSubject - 最新の値を記憶し、complete のタイミングで流す

AsyncSubject はストリームを最後に流れた値のみを記憶し、complete() を呼び出すとその値を流します。RxMarble で表現してみると以下のようになります。

subject   : 1---2---3--|
observerA :            3|
const subject = new Rx.AsyncSubject();
const observerA = {
  next:  (x)   => console.log(`A next: ${x}`),
  error: (err) => console.log(`A error: ${err}`),
  complete: () => console.log(`A -------- complete --------`)
};
subject.subscribe(observerA);
console.log(`A -------- subscribed --------`);
const observerB = {
  next:  (x)   => console.log(`B next: ${x}`),
  error: (err) => console.log(`B error: ${err}`),
  complete: () => console.log(`B -------- complete --------`)
};
setTimeout(() => subject.next(1), 500);
setTimeout(() => subject.next(2), 1000);
setTimeout(() => subject.next(3), 1500);
setTimeout(() => subject.complete(), 2000);
setTimeout(() => {
  subject.subscribe(observerB);
  console.log(`B -------- subscribed --------`);
}, 4000);

AsyncSubject.complete() 発火後に子 Observer が追加された場合は、追加されたと同時に最後の値と complete イベントが流れます。

"A -------- subscribed --------"
"A next: 3"
"A -------- complete --------"
"B next: 3"
"B -------- complete --------"
"B -------- subscribed --------"

RxMarble で表現すると以下のようになります。

subject   : 1---2---3--|
observerA :            3|
observerB :                 3|

Promise と挙動が似ていますが、用途も概ね同じようなものと考えられています。ReplaySubject でもほぼ AsyncSubject と同じことが出来ますので、もしかしたらそこまで利用頻度は高くないかもしれません。

JS フレームワークにおける Subject の使いどころ

RxJS は 最近の JavaScript フレームワークでも依存ライブラリとして使われており、利用者がプロダクションコードを書く際でもその恩恵を受けることができます。ここでは Angular と Cycle.js における Subject の使いどころをご紹介します。

Angular では $rootScope の代用として有用

AngularJS ( 1.x 系 ) では複数ページ間やディレクティブ ( コンポーネント ) 間でのデータの受け渡しないしイベントディスパッチのために $rootScope を使用するケースがあります。しかし後継である Angular ( 2 系 ~ ) にはそのような組み込みの機能がないため、代わりに Subject を使って値を流します。

@Injectable()
export class SearchService {
  public space: Subject<string> = new BehaviorSubject<string>(null);
  broadcastTextChange(text: string) {
    this.space.next(text);
  }
}
@Component({
  selector: 'some-component'
  providers: [SearchService], // only add it to one common parent if you want a shared instance
  template: `some-component`
)}
export class SomeComponent {
  constructor(searchService: SearchService) {
    searchService.space
      .subscribe((val: string) => console.log(val));
  }
}

Cycle.js では Driver を自作する上で必須

Cycle.js は Observable をリアクティブプログラミングな世界のアプリケーション層とイベントを subscribe して副作用を伴う処理を行うドライバ層という世界を二つに分割するフレームワークです。

http://cycle.js.org/drivers.html より引用

アプリケーション層 ( main()関数 ) から流れる Sink Stream はいわゆる Observable であり、Driver はこれを受け取って様々な処理を行います。そしてその結果を再びアプリケーション層に返してあげなくてはならないわけですが ( Source Streams ) 、ここに Subject を使います。

/**
 * アプリケーション
 * @param so
 * @returns {{DOM: Observable&lt;R>, Scroll: O&lt;number>}}
 */
function main(so: Sources): Sinks {
  const input$ = so.DOM.select('.scrollable__input').events('input');
  const offsetTop$ = Observable
    .from(input$)
    .map((ev: Event): number => Number((ev.currentTarget as HTMLInputElement).value));
  const vdom$ = Observable.combineLatest(
    so.Scroll.startWith('0px'),
    offsetTop$.startWith(0),
    (scroll, offsetTop) => {
      return div([
        input('.scrollable__input.form-control', {attrs: {type: 'number', value: offsetTop}}),
        p('.scrollable__counter', scroll)
      ]);
    }
  );
  return {
    DOM: vdom$,
    Scroll: offsetTop$
  };
}
/**
 * 指定された値だけスクロールする
 * window.scrollY 値を返す
 * @returns {(offsetTop$: Observable&lt;number>) => Subject}
 */
function makeScrollDriver() {
  return function ScrollDriver(offsetTop$) {
    const source = new Subject();
    window.addEventListener('scroll', () => {
      source.next(`${window.scrollY}px`);
    });
    Observable
      .from(offsetTop$)
      .subscribe(
        offsetTop => window.scrollTo(0, offsetTop)
      );
    return source;
  }
}
const drivers = {
    DOM: makeDOMDriver('#cycleApp'),
    Scroll: makeScrollDriver()
};
run(main, drivers);

Driver については別途執筆している Cycle.js シリーズにて追々ご紹介します。

締め

以上、Subject についてご紹介してきました。『Subject = Observable + Observer』とよく紹介されていますが、イマイチこれだけではピンとこない方も多いのではないでしょうか?当エントリの始めにご紹介した bridgeObserver のように実際に同じものを作ってみることでより理解を深められたのではないかと思います。Cycle.js はもちろん、Angular でも普通によく使う機能ですので、表面上だけでなくしっかり理解しておくことが大切です。