Способы создания реактивного потока данных

В реактивном программировании класс Observable является базовым и все манипуляции, как правило, происходят с его экземплярами.


const observable = Rx.Observable.create(
  function subscribe(observer) {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
  }
);

observable.subscribe(n => console.log(n));

В этом примере данные генерируются только внутри функции subscribe . Повлиять на поток данных извне никак нельзя. Чтобы управлять потоком данных, объект должен реализовывать методы класса Observer, т.е. методы next() , error() и complete().

Когда нужно генерировать данные в другом месте кода, то чаще всего используют класс Subject. Он представляет собой комбинацию Observable и Observer.


const subject = new Rx.Subject();

subject.subscribe(n => console.log(n));

subject.next(1);
subject.next(2);
subject.next(3);
subject.complete();

На практике это может понадобиться, когда компонент приложения использует одну или несколько функций обратного вызова, чтобы оповестить родительский компонент об изменениях.


const delta$ = new Rx.Subject();

function handleNextTrack() {
  delta$.next(1);
}
function handlePrevTrack() {
  delta$.next(-1);
}

const playlist = (
  <Playlist
    onNextTrack={handleNextTrack}
    onPrevTrack={handlePrevTrack}
  />
);

Экземпляр класса Subject уведомляет своих подписчиков только когда появляются новые данные. Это напоминает как работают события — если вы подписались после того как событие произошло, то уведомление вы получите лишь в момент наступления следующего события. По этой причине для экземпляров Subject бесполезно применять операторы типа .startWith().

В RxJS есть специальный класс BehaviorSubject . Экземпляр этого класса хранит последнее событие и как только появляется очередной подписчик, он сразу получает последнее значение. На примере плейлиста, поток направления воспроизведения — это экземпляр Subject, а поток позиций в плейлисте — это экземпляр BehaviorSubject.

С другой стороны, поток позиции может быть получен из потока направления через свёртку методом .scan().


const position$ = Rx.Observable.from(delta$)
  .startWith(0)
  .scan((position, delta) => position + delta);