Отслеживание обновлений коллекции MongoDB в браузере почти в реальном времени

Я как-то заинтересовался возможностью получать все изменения из определенной таблицы базы данных в браузере, чтобы отображать их в интерфейсе в почти реальном времени. Например, Firebase изначально работает по такому принципу. Но хотелось бы использовать какое-то не облачное решение.

В начале 2018 официальные драйвера MongoDB (начиная с версии 3.0.0) стали предоставлять такую возможность с минимальными затратами усилий разработчика. Покажу на небольшом примере как это можно реализовать.

Поключение к базе данных

Сервер будет работать на Node.js и зависеть от модулей mongodb, socket.io, rxjs и stream-to-observable.

Начнем с подключения к базе данных (файл db.js):


const { from } = require('rxjs');
const { map } = require('rxjs/operators');
const { MongoClient } = require('mongodb');

// впишите необходимые параметры подключения
const MONGODB_URL = '';
const MONGODB_OPTS = {};
const MONGODB_DATABASE = '';

const client = MongoClient.connect(MONGODB_URL, MONGODB_OPTS);

module.exports = function () {
  return from(client)
    .pipe(map(c => c.db(MONGODB_DATABASE)));
};

Константа client будет содержать Promise, который резолвится в экземпляр клиента при успешном подключении. Когда мы вызовем экспортированную функцию, то получим Observable с потоком из одного события, содержащего экземпляр базы данных.

Поток с подключением к базе данных

Теперь займёмся сервером, принимающим запросы браузеров (файл index.js):


const io = require('socket.io')(8081);
const { fromEvent } = require('rxjs');
const { takeUntil } = require('rxjs/operators');
const getDb = require('./db');

io.on('connect', socket => {
  const db$ = getDb();
  const stop$ = fromEvent(socket, 'disconnecting');

  db$
    .pipe(takeUntil(stop$))
    .subscribe(() => socket.emit('test', 'Hello, world!'));
});

Проверяем как работает сервер с помощью скрипта в браузере (файл index.html)


<script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/2.1.1/socket.io.js"></script>
<script>
  const socket = io.connect('localhost:8081');
  socket.on('test', data => console.log(JSON.stringify(data)));
</script>

Если всё было сделано правильно, то в консоли браузера появится сообщение

> Hello, world!

Фактически, это сообщение появится, когда будет доступно подключение к базе данных. Оно инициализируется один раз при запуске сервера и используется для всех подключений к серверу. На каждое подключение к серверу извне создается Observable с экземпляром базы данных.

Поток завершился раньше, чем подключение к базе данных

Если браузер не дождался пока сервер подключится к базе данных, то этот Observable завершается без генерации данных. Это поведение управляется через другой Observable stop$ . Его значимость я объясню чуть позже.

Получение изменений

Теперь напишем модуль, который будет отдавать изменения из базы данных (файл changes.js)


const { pipe } = require('rxjs');
const { flatMap, map } = require('rxjs/operators');
const fromStream = require('stream-to-observable');

module.exports = function (collection) {
  return pipe(
    // #1
    map(db => (
      db
        .collection(collection)
        .watch({ fullDocument: 'updateLookup' })
    )),
    // #2
    flatMap(cursor => (
      fromStream(cursor, { dataEvent: 'change' })
    )),
    // #3
    map(data => ({
      type: data.operationType,
      payload: data.fullDocument || data.documentKey
    }))
  );
};

Эта функция возвращает оператор , который будет поставлять изменения в указанной коллекции.

Функция pipe делает композицию из трех функций:

  1. Из экземпляра базы данных получаем курсор с изменениями. Функция map() заменит в потоке значение db на то значение, которое вернул колбек.
  2. Из курсора получаем фактические данные данные. В этом месте используется преобразование Stream в Observable. Функция flatMap() будет генерировать значения пока не закончится поток данных, порожденный курсором.
  3. Данные конвертируются в нужный формат.

Смотрите в чём отличие map() от flatMap().

Функция map() из одного элемента генерирует один элемент, но с другим здачением.

Экземпляр базы данных Курсор, возвращающий записи из таблицы

Когда внутри потока появляется еще один поток, то flatMap() начинает объединять родительский и дочерние потоки в один. Так в потоке из одного элемента могут появиться другие элементы.

Появляется вложенный поток Потоки объединяются в один

Внесем изменения в файл index.js


// зарегистрируем библиотеку rxjs как поставщик объектов
// типа Observable для модуля stream-to-observable
require('any-observable/register')('rxjs');
const io = require('socket.io')(8081);
const { fromEvent } = require('rxjs');
const { takeUntil } = require('rxjs/operators');
const getDb = require('./db');
const changes = require('./changes');

// создадим оператор для наблюдения за коллекцией test
const testChanges = changes('test');

io.on('connect', socket => {
  const db$ = getDb();
  const stop$ = fromEvent(socket, 'disconnecting');

  db$
    // добавим оператор testChanges в цепочку
    .pipe(testChanges, takeUntil(stop$))
    // отправляем данные в браузер
    .subscribe(data => socket.emit('test', data));
});

Предполагается, что MongoDB будет генерировать 5 типов изменений: «insert», «update» и «delete», «replace» и «invalidate». Все они будут приходить в браузер через событие test клиента Socket.IO.

Самое время запустить консоль mongo и попробовать добавлять, изменять и удалять данные в таблице.


> db.test.insert({value: 0.1})
> db.test.insert({value: 0.6})
> db.test.insert({value: 0.8})
> db.test.insert({value: 0.3})
> db.test.updateMany({value: {$gt: 0.5}}, {$set: {value: 1}})
> db.test.remove({value: {$lte: 0.5}})

Тем временем в консоли браузера будут тут же отображаться результаты операций.


> {"type":"insert","payload":{"_id":"5b3f29ed2e303b6b6a3b27a3","value":0.1}}
> {"type":"insert","payload":{"_id":"5b3f29ee2e303b6b6a3b27a4","value":0.6}}
> {"type":"insert","payload":{"_id":"5b3f29ee2e303b6b6a3b27a5","value":0.8}}
> {"type":"insert","payload":{"_id":"5b3f29ef2e303b6b6a3b27a6","value":0.3}}
> {"type":"update","payload":{"_id":"5b3f29ee2e303b6b6a3b27a4","value":1}}
> {"type":"update","payload":{"_id":"5b3f29ee2e303b6b6a3b27a5","value":1}}
> {"type":"delete","payload":{"_id":"5b3f29ed2e303b6b6a3b27a3"}}
> {"type":"delete","payload":{"_id":"5b3f29ef2e303b6b6a3b27a6"}}

Остановка потока

Так как поток изменений получился бесконечным, то вот здесь и пригодится Observable stop$, который я упомянул ранее.

Браурез отключился раньше чем завешрился поток

Данные в stop$ появятся когда браузер будет отключаться от сервера (например, при перезагрузке страницы или закрытии окна). Для потока обновлений это будет служить признаком того, что его нужно завершить (оператор takeUntil). Когда мы корректно завершаем поток, то все подписчики автоматически отписываются и освобождают выделенные ресурсы, если это требуется.

Заключение

Так совсем небольшими усилиями удалось получить доступ к данным практически в реальном времени. Библиотека socket.io скрыла от разработчика всю сложность работы с WebSocket. А использование библиотеки rxjs сделало код лаконичным.

Дальнейшим логичным шагом в развитии этого эксперимента будет получение текущего состояния коллекции при инициализации клиента в браузере.