В реактивном программировании класс Observable является базовым и все манипуляции, как правило, происходят с его экземплярами.
const observable = Rx.Observable.create(
function subscribe(observer) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}
);
observable.subscribe(n => console.log(n));
В этом примере данные генерируются только внутри функции subscribe
. Повлиять на поток данных извне никак нельзя. Чтобы управлять потоком данных, объект должен реализовывать методы класса Observer, т.е. методы next()
, error() и complete().
Когда нужно генерировать данные в другом месте кода, то чаще всего используют класс Subject. Он представляет собой комбинацию Observable и Observer.
const subject = new Rx.Subject();
subject.subscribe(n => console.log(n));
subject.next(1);
subject.next(2);
subject.next(3);
subject.complete();
На практике это может понадобиться, когда компонент приложения использует одну или несколько функций обратного вызова, чтобы оповестить родительский компонент об изменениях.
const delta$ = new Rx.Subject();
function handleNextTrack() {
delta$.next(1);
}
function handlePrevTrack() {
delta$.next(-1);
}
const playlist = (
<Playlist
onNextTrack={handleNextTrack}
onPrevTrack={handlePrevTrack}
/>
);
Экземпляр класса Subject
уведомляет своих подписчиков только когда появляются новые данные. Это напоминает как работают события — если вы подписались после того как событие произошло, то уведомление вы получите лишь в момент наступления следующего события. По этой причине для экземпляров
Subject бесполезно применять операторы типа .startWith().
В RxJS есть специальный класс BehaviorSubject
. Экземпляр этого класса хранит последнее событие и как только появляется очередной подписчик, он сразу получает последнее значение. На примере плейлиста, поток направления воспроизведения — это экземпляр
Subject, а поток позиций в плейлисте — это экземпляр BehaviorSubject.
С другой стороны, поток позиции может быть получен из потока направления через свёртку методом .scan().
const position$ = Rx.Observable.from(delta$)
.startWith(0)
.scan((position, delta) => position + delta);