В заметке про отслеживание обновлений коллекции я показал как сделать простой север, который будет посылать в браузер информацию о появлении, удалении или изменении данных в определенной коллекции. В этой заметке я покажу как получить текущее состояние коллекции.
Получение состояния коллекции
Нам нужно будет написать еще один оператор (файл state.js).
const { pipe } = require('rxjs');
const { flatMap, map } = require('rxjs/operators');
const fromStream = require('stream-to-observable');
module.exports = function (collection) {
return pipe(
// #1
map(db => (
db
.collection(collection)
.find({})
)),
// #2
flatMap((cursor) => (
fromStream(cursor)
)),
// #3
map(payload => ({
type: 'insert',
payload
}))
);
};
Он оказался очень похож на оператор, который генерирует поток изменений. Все так же с помощью функции pipe
делаем композицию из трех функций:
- Из экземпляра базы данных получаем курсор на все записи, которые хранятся в коллекции.
- Преобразуем Stream в Observable и получаем поток данных.
- Для упрощения кода предположим, что текущее состояние коллекции формируется из вставки записей в пустой массив. Другими словами, для клиента в браузере записи, соответствующие текущему состоянию, и вновь добавляемые записи имею один и тот же тип — insert.
Внесем изменения в index.js
require('any-observable/register')('rxjs');
const io = require('socket.io')(8081);
const { fromEvent, merge } = require('rxjs');
const { takeUntil } = require('rxjs/operators');
const getDb = require('./db');
const state = require('./state');
const changes = require('./changes');
// создадим оператор для получения начального состояния
const testState = state('test');
const testChanges = changes('test');
io.on('connect', socket => {
const db$ = getDb();
const stop$ = fromEvent(socket, 'disconnecting');
// поток, в котором будет сгенерировано состояние коллекции
const state$ = db$.pipe(testState);
const change$ = db$.pipe(testChanges);
// объединим поток с начальным состоянием и поток изменений
merge(state$, change$)
.pipe(takeUntil(stop$))
.subscribe(data => socket.emit('test', data));
});
Как только браузер подключится к серверу, так в консоли отобразятся все записи, которые были на тот момент в коллекции в виде событий insert .
Скорее всего, такое поведение сервера будет не всегда оправдано. Браузер может на какое-то время потерять соединение с сервером и после восстановления соединения сервер отправит всё состояние коллекции. Но браузеру оно не нужно, так как оно уже находится у него в памяти.
Клиент может послать серверу специальное событие в тот момент, когда он хочет получить текущее состояние состояние (например, в момент загрузки страницы).
Добавим в скрипты index.html следующую строку:
socket.emit('getAllRecords');
А финальная версия index.js будет выглядеть так:
require('any-observable/register')('rxjs');
const io = require('socket.io')(8081);
const { fromEvent, merge } = require('rxjs');
const { flatMap, takeUntil } = require('rxjs/operators');
const getDb = require('./db');
const state = require('./state');
const changes = require('./changes');
const testState = state('test');
const testChanges = changes('test');
io.on('connect', socket => {
const db$ = getDb();
const stop$ = fromEvent(socket, 'disconnecting');
const state$ = db$.pipe(testState);
const change$ = db$.pipe(testChanges);
// отдаём состояние только по запросу
const getAllRecords$ = fromEvent(socket, 'getAllRecords')
.pipe(flatMap(() => state$));
// объединяем поток изменений с потоком, в котором состояние
// коллекции появится только после специального события
merge(getAllRecords$, change$)
.pipe(takeUntil(stop$))
.subscribe(data => socket.emit('test', data));
});
Заключение
Текущее состояние коллекции можно получить как список изменений для пустого массива на клиенте. Его можно отдавать сразу при подключении или по запросу. И на клиенте для этого не потребовалось вносить существенных изменений.