В реактивном программировании класс Observable
является базовым и все манипуляции, как правило, происходят с его экземплярами.
const observable = Rx.Observable.create(
function subscribe(observer) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}
);
observable.subscribe(n => console.log(n));
В этом примере данные генерируются только внутри функции subscribe
. Повлиять на поток данных извне никак нельзя. Чтобы управлять потоком данных, объект должен реализовывать методы класса Observer
, т.е. методы next()
, error()
и complete()
.
Когда нужно генерировать данные в другом месте кода, то чаще всего используют класс Subject
. Он представляет собой комбинацию Observable
и Observer
.
const subject = new Rx.Subject();
subject.subscribe(n => console.log(n));
subject.next(1);
subject.next(2);
subject.next(3);
subject.complete();
На практике это может понадобиться, когда компонент приложения использует одну или несколько функций обратного вызова, чтобы оповестить родительский компонент об изменениях.
const delta$ = new Rx.Subject();
function handleNextTrack() {
delta$.next(1);
}
function handlePrevTrack() {
delta$.next(-1);
}
const playlist = (
<Playlist
onNextTrack={handleNextTrack}
onPrevTrack={handlePrevTrack}
/>
);
Экземпляр класса Subject
уведомляет своих подписчиков только когда появляются новые данные. Это напоминает как работают события — если вы подписались после того как событие произошло, то уведомление вы получите лишь в момент наступления следующего события. По этой причине для экземпляров
Subject
бесполезно применять операторы типа .startWith()
.
В RxJS есть специальный класс BehaviorSubject
. Экземпляр этого класса хранит последнее событие и как только появляется очередной подписчик, он сразу получает последнее значение. На примере плейлиста, поток направления воспроизведения — это экземпляр
Subject
, а поток позиций в плейлисте — это экземпляр BehaviorSubject
.
С другой стороны, поток позиции может быть получен из потока направления через свёртку методом .scan()
.
const position$ = Rx.Observable.from(delta$)
.startWith(0)
.scan((position, delta) => position + delta);