Получение текущего состояние коллекции MongoDB в виде набора обновлений

В заметке про отслеживание обновлений коллекции я показал как сделать простой север, который будет посылать в браузер информацию о появлении, удалении или изменении данных в определенной коллекции. В этой заметке я покажу как получить текущее состояние коллекции.

Получение состояния коллекции

Нам нужно будет написать еще один оператор (файл state.js).


const { pipe } = require('rxjs');
const { flatMap, map } = require('rxjs/operators');
const fromStream = require('stream-to-observable');

module.exports = function (collection) {
  return pipe(
    // #1
    map(db => (
      db
        .collection(collection)
        .find({})
    )),
    // #2
    flatMap((cursor) => (
      fromStream(cursor)
    )),
    // #3
    map(payload => ({
      type: 'insert',
      payload
    }))
  );
};

Он оказался очень похож на оператор, который генерирует поток изменений. Все так же с помощью функции pipe делаем композицию из трех функций:

  1. Из экземпляра базы данных получаем курсор на все записи, которые хранятся в коллекции.
  2. Преобразуем Stream в Observable и получаем поток данных.
  3. Для упрощения кода предположим, что текущее состояние коллекции формируется из вставки записей в пустой массив. Другими словами, для клиента в браузере записи, соответствующие текущему состоянию, и вновь добавляемые записи имею один и тот же тип — insert.

Внесем изменения в index.js


require('any-observable/register')('rxjs');
const io = require('socket.io')(8081);
const { fromEvent, merge } = require('rxjs');
const { takeUntil } = require('rxjs/operators');
const getDb = require('./db');
const state = require('./state');
const changes = require('./changes');

// создадим оператор для получения начального состояния
const testState = state('test');
const testChanges = changes('test');

io.on('connect', socket => {
  const db$ = getDb();
  const stop$ = fromEvent(socket, 'disconnecting');

  // поток, в котором будет сгенерировано состояние коллекции
  const state$ = db$.pipe(testState);
  const change$ = db$.pipe(testChanges);

  // объединим поток с начальным состоянием и поток изменений
  merge(state$, change$)
    .pipe(takeUntil(stop$))
    .subscribe(data => socket.emit('test', data));
});

Как только браузер подключится к серверу, так в консоли отобразятся все записи, которые были на тот момент в коллекции в виде событий insert .

Скорее всего, такое поведение сервера будет не всегда оправдано. Браузер может на какое-то время потерять соединение с сервером и после восстановления соединения сервер отправит всё состояние коллекции. Но браузеру оно не нужно, так как оно уже находится у него в памяти.

Клиент может послать серверу специальное событие в тот момент, когда он хочет получить текущее состояние состояние (например, в момент загрузки страницы).

Добавим в скрипты index.html следующую строку:

socket.emit('getAllRecords');

А финальная версия index.js будет выглядеть так:


require('any-observable/register')('rxjs');
const io = require('socket.io')(8081);
const { fromEvent, merge } = require('rxjs');
const { flatMap, takeUntil } = require('rxjs/operators');
const getDb = require('./db');
const state = require('./state');
const changes = require('./changes');

const testState = state('test');
const testChanges = changes('test');

io.on('connect', socket => {
  const db$ = getDb();
  const stop$ = fromEvent(socket, 'disconnecting');

  const state$ = db$.pipe(testState);
  const change$ = db$.pipe(testChanges);

  // отдаём состояние только по запросу
  const getAllRecords$ = fromEvent(socket, 'getAllRecords')
    .pipe(flatMap(() => state$));

  // объединяем поток изменений с потоком, в котором состояние
  // коллекции появится только после специального события
  merge(getAllRecords$, change$)
    .pipe(takeUntil(stop$))
    .subscribe(data => socket.emit('test', data));
});

Заключение

Текущее состояние коллекции можно получить как список изменений для пустого массива на клиенте. Его можно отдавать сразу при подключении или по запросу. И на клиенте для этого не потребовалось вносить существенных изменений.