Заметки с тегом «rxjs»

Получение текущего состояние коллекции MongoDB в виде набора обновлений

В заметке про отслеживание обновлений коллекции я показал как сделать простой север, который будет посылать в браузер информацию о появлении, удалении или изменении данных в определенной коллекции. В этой заметке я покажу как получить текущее состояние коллекции.

Получение состояния коллекции

Нам нужно будет написать еще один оператор (файл state.js).


const { pipe } = require('rxjs');
const { flatMap, map } = require('rxjs/operators');
const fromStream = require('stream-to-observable');

module.exports = function (collection) {
  return pipe(
    // #1
    map(db => (
      db
        .collection(collection)
        .find({})
    )),
    // #2
    flatMap((cursor) => (
      fromStream(cursor)
    )),
    // #3
    map(payload => ({
      type: 'insert',
      payload
    }))
  );
};

Он оказался очень похож на оператор, который генерирует поток изменений. Все так же с помощью функции pipe делаем композицию из трех функций:

  1. Из экземпляра базы данных получаем курсор на все записи, которые хранятся в коллекции.
  2. Преобразуем Stream в Observable и получаем поток данных.
  3. Для упрощения кода предположим, что текущее состояние коллекции формируется из вставки записей в пустой массив. Другими словами, для клиента в браузере записи, соответствующие текущему состоянию, и вновь добавляемые записи имею один и тот же тип — insert.

Внесем изменения в index.js


require('any-observable/register')('rxjs');
const io = require('socket.io')(8081);
const { fromEvent, merge } = require('rxjs');
const { takeUntil } = require('rxjs/operators');
const getDb = require('./db');
const state = require('./state');
const changes = require('./changes');

// создадим оператор для получения начального состояния
const testState = state('test');
const testChanges = changes('test');

io.on('connect', socket => {
  const db$ = getDb();
  const stop$ = fromEvent(socket, 'disconnecting');

  // поток, в котором будет сгенерировано состояние коллекции
  const state$ = db$.pipe(testState);
  const change$ = db$.pipe(testChanges);

  // объединим поток с начальным состоянием и поток изменений
  merge(state$, change$)
    .pipe(takeUntil(stop$))
    .subscribe(data => socket.emit('test', data));
});

Как только браузер подключится к серверу, так в консоли отобразятся все записи, которые были на тот момент в коллекции в виде событий insert .

Скорее всего, такое поведение сервера будет не всегда оправдано. Браузер может на какое-то время потерять соединение с сервером и после восстановления соединения сервер отправит всё состояние коллекции. Но браузеру оно не нужно, так как оно уже находится у него в памяти.

Клиент может послать серверу специальное событие в тот момент, когда он хочет получить текущее состояние состояние (например, в момент загрузки страницы).

Добавим в скрипты index.html следующую строку:

socket.emit('getAllRecords');

А финальная версия index.js будет выглядеть так:


require('any-observable/register')('rxjs');
const io = require('socket.io')(8081);
const { fromEvent, merge } = require('rxjs');
const { flatMap, takeUntil } = require('rxjs/operators');
const getDb = require('./db');
const state = require('./state');
const changes = require('./changes');

const testState = state('test');
const testChanges = changes('test');

io.on('connect', socket => {
  const db$ = getDb();
  const stop$ = fromEvent(socket, 'disconnecting');

  const state$ = db$.pipe(testState);
  const change$ = db$.pipe(testChanges);

  // отдаём состояние только по запросу
  const getAllRecords$ = fromEvent(socket, 'getAllRecords')
    .pipe(flatMap(() => state$));

  // объединяем поток изменений с потоком, в котором состояние
  // коллекции появится только после специального события
  merge(getAllRecords$, change$)
    .pipe(takeUntil(stop$))
    .subscribe(data => socket.emit('test', data));
});

Заключение

Текущее состояние коллекции можно получить как список изменений для пустого массива на клиенте. Его можно отдавать сразу при подключении или по запросу. И на клиенте для этого не потребовалось вносить существенных изменений.

Оставте свой комментарий

Отслеживание обновлений коллекции MongoDB в браузере почти в реальном времени

Я как-то заинтересовался возможностью получать все изменения из определенной таблицы базы данных в браузере, чтобы отображать их в интерфейсе в почти реальном времени. Например, Firebase изначально работает по такому принципу. Но хотелось бы использовать какое-то не облачное решение.

В начале 2018 официальные драйвера MongoDB (начиная с версии 3.0.0) стали предоставлять такую возможность с минимальными затратами усилий разработчика. Покажу на небольшом примере как это можно реализовать.

Поключение к базе данных

Сервер будет работать на Node.js и зависеть от модулей mongodb, socket.io, rxjs и stream-to-observable.

Начнем с подключения к базе данных (файл db.js):


const { from } = require('rxjs');
const { map } = require('rxjs/operators');
const { MongoClient } = require('mongodb');

// впишите необходимые параметры подключения
const MONGODB_URL = '';
const MONGODB_OPTS = {};
const MONGODB_DATABASE = '';

const client = MongoClient.connect(MONGODB_URL, MONGODB_OPTS);

module.exports = function () {
  return from(client)
    .pipe(map(c => c.db(MONGODB_DATABASE)));
};

Константа client будет содержать Promise, который резолвится в экземпляр клиента при успешном подключении. Когда мы вызовем экспортированную функцию, то получим Observable с потоком из одного события, содержащего экземпляр базы данных.

Поток с подключением к базе данных

Теперь займёмся сервером, принимающим запросы браузеров (файл index.js):


const io = require('socket.io')(8081);
const { fromEvent } = require('rxjs');
const { takeUntil } = require('rxjs/operators');
const getDb = require('./db');

io.on('connect', socket => {
  const db$ = getDb();
  const stop$ = fromEvent(socket, 'disconnecting');

  db$
    .pipe(takeUntil(stop$))
    .subscribe(() => socket.emit('test', 'Hello, world!'));
});

Проверяем как работает сервер с помощью скрипта в браузере (файл index.html)


<script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/2.1.1/socket.io.js"></script>
<script>
  const socket = io.connect('localhost:8081');
  socket.on('test', data => console.log(JSON.stringify(data)));
</script>

Если всё было сделано правильно, то в консоли браузера появится сообщение

> Hello, world!

Фактически, это сообщение появится, когда будет доступно подключение к базе данных. Оно инициализируется один раз при запуске сервера и используется для всех подключений к серверу. На каждое подключение к серверу извне создается Observable с экземпляром базы данных.

Поток завершился раньше, чем подключение к базе данных

Если браузер не дождался пока сервер подключится к базе данных, то этот Observable завершается без генерации данных. Это поведение управляется через другой Observable stop$ . Его значимость я объясню чуть позже.

Получение изменений

Теперь напишем модуль, который будет отдавать изменения из базы данных (файл changes.js)


const { pipe } = require('rxjs');
const { flatMap, map } = require('rxjs/operators');
const fromStream = require('stream-to-observable');

module.exports = function (collection) {
  return pipe(
    // #1
    map(db => (
      db
        .collection(collection)
        .watch({ fullDocument: 'updateLookup' })
    )),
    // #2
    flatMap(cursor => (
      fromStream(cursor, { dataEvent: 'change' })
    )),
    // #3
    map(data => ({
      type: data.operationType,
      payload: data.fullDocument || data.documentKey
    }))
  );
};

Эта функция возвращает оператор , который будет поставлять изменения в указанной коллекции.

Функция pipe делает композицию из трех функций:

  1. Из экземпляра базы данных получаем курсор с изменениями. Функция map() заменит в потоке значение db на то значение, которое вернул колбек.
  2. Из курсора получаем фактические данные данные. В этом месте используется преобразование Stream в Observable. Функция flatMap() будет генерировать значения пока не закончится поток данных, порожденный курсором.
  3. Данные конвертируются в нужный формат.

Смотрите в чём отличие map() от flatMap().

Функция map() из одного элемента генерирует один элемент, но с другим здачением.

Экземпляр базы данных Курсор, возвращающий записи из таблицы

Когда внутри потока появляется еще один поток, то flatMap() начинает объединять родительский и дочерние потоки в один. Так в потоке из одного элемента могут появиться другие элементы.

Появляется вложенный поток Потоки объединяются в один

Внесем изменения в файл index.js


// зарегистрируем библиотеку rxjs как поставщик объектов
// типа Observable для модуля stream-to-observable
require('any-observable/register')('rxjs');
const io = require('socket.io')(8081);
const { fromEvent } = require('rxjs');
const { takeUntil } = require('rxjs/operators');
const getDb = require('./db');
const changes = require('./changes');

// создадим оператор для наблюдения за коллекцией test
const testChanges = changes('test');

io.on('connect', socket => {
  const db$ = getDb();
  const stop$ = fromEvent(socket, 'disconnecting');

  db$
    // добавим оператор testChanges в цепочку
    .pipe(testChanges, takeUntil(stop$))
    // отправляем данные в браузер
    .subscribe(data => socket.emit('test', data));
});

Предполагается, что MongoDB будет генерировать 5 типов изменений: «insert», «update» и «delete», «replace» и «invalidate». Все они будут приходить в браузер через событие test клиента Socket.IO.

Самое время запустить консоль mongo и попробовать добавлять, изменять и удалять данные в таблице.


> db.test.insert({value: 0.1})
> db.test.insert({value: 0.6})
> db.test.insert({value: 0.8})
> db.test.insert({value: 0.3})
> db.test.updateMany({value: {$gt: 0.5}}, {$set: {value: 1}})
> db.test.remove({value: {$lte: 0.5}})

Тем временем в консоли браузера будут тут же отображаться результаты операций.


> {"type":"insert","payload":{"_id":"5b3f29ed2e303b6b6a3b27a3","value":0.1}}
> {"type":"insert","payload":{"_id":"5b3f29ee2e303b6b6a3b27a4","value":0.6}}
> {"type":"insert","payload":{"_id":"5b3f29ee2e303b6b6a3b27a5","value":0.8}}
> {"type":"insert","payload":{"_id":"5b3f29ef2e303b6b6a3b27a6","value":0.3}}
> {"type":"update","payload":{"_id":"5b3f29ee2e303b6b6a3b27a4","value":1}}
> {"type":"update","payload":{"_id":"5b3f29ee2e303b6b6a3b27a5","value":1}}
> {"type":"delete","payload":{"_id":"5b3f29ed2e303b6b6a3b27a3"}}
> {"type":"delete","payload":{"_id":"5b3f29ef2e303b6b6a3b27a6"}}

Остановка потока

Так как поток изменений получился бесконечным, то вот здесь и пригодится Observable stop$ , который я упомянул ранее.

Браурез отключился раньше чем завешрился поток

Данные в stop$ появятся когда браузер будет отключаться от сервера (например, при перезагрузке страницы или закрытии окна). Для потока обновлений это будет служить признаком того, что его нужно завершить (оператор takeUntil). Когда мы корректно завершаем поток, то все подписчики автоматически отписываются и освобождают выделенные ресурсы, если это требуется.

Заключение

Так совсем небольшими усилиями удалось получить доступ к данным практически в реальном времени. Библиотека socket.io скрыла от разработчика всю сложность работы с WebSocket. А использование библиотеки rxjs сделало код лаконичным.

Дальнейшим логичным шагом в развитии этого эксперимента будет получение текущего состояния коллекции при инициализации клиента в браузере.

Комментарии к заметке: 2

Способы создания реактивного потока данных

В реактивном программировании класс Observable является базовым и все манипуляции, как правило, происходят с его экземплярами.


const observable = Rx.Observable.create(
  function subscribe(observer) {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
  }
);

observable.subscribe(n => console.log(n));

В этом примере данные генерируются только внутри функции subscribe . Повлиять на поток данных извне никак нельзя. Чтобы управлять потоком данных, объект должен реализовывать методы класса Observer, т.е. методы next() , error() и complete().

Когда нужно генерировать данные в другом месте кода, то чаще всего используют класс Subject. Он представляет собой комбинацию Observable и Observer.


const subject = new Rx.Subject();

subject.subscribe(n => console.log(n));

subject.next(1);
subject.next(2);
subject.next(3);
subject.complete();

На практике это может понадобиться, когда компонент приложения использует одну или несколько функций обратного вызова, чтобы оповестить родительский компонент об изменениях.


const delta$ = new Rx.Subject();

function handleNextTrack() {
  delta$.next(1);
}
function handlePrevTrack() {
  delta$.next(-1);
}

const playlist = (
  <Playlist
    onNextTrack={handleNextTrack}
    onPrevTrack={handlePrevTrack}
  />
);

Экземпляр класса Subject уведомляет своих подписчиков только когда появляются новые данные. Это напоминает как работают события — если вы подписались после того как событие произошло, то уведомление вы получите лишь в момент наступления следующего события. По этой причине для экземпляров Subject бесполезно применять операторы типа .startWith().

В RxJS есть специальный класс BehaviorSubject . Экземпляр этого класса хранит последнее событие и как только появляется очередной подписчик, он сразу получает последнее значение. На примере плейлиста, поток направления воспроизведения — это экземпляр Subject, а поток позиций в плейлисте — это экземпляр BehaviorSubject.

С другой стороны, поток позиции может быть получен из потока направления через свёртку методом .scan().


const position$ = Rx.Observable.from(delta$)
  .startWith(0)
  .scan((position, delta) => position + delta);
Оставте свой комментарий