Заметки с тегом «nodejs»

Получение текущего состояние коллекции MongoDB в виде набора обновлений

В заметке про отслеживание обновлений коллекции я показал как сделать простой север, который будет посылать в браузер информацию о появлении, удалении или изменении данных в определенной коллекции. В этой заметке я покажу как получить текущее состояние коллекции.

Получение состояния коллекции

Нам нужно будет написать еще один оператор (файл state.js).

const { pipe } = require('rxjs');
const { flatMap, map } = require('rxjs/operators');
const fromStream = require('stream-to-observable');

module.exports = function (collection) {
  return pipe(
    // #1
    map(db => (
      db
        .collection(collection)
        .find({})
    )),
    // #2
    flatMap((cursor) => (
      fromStream(cursor)
    )),
    // #3
    map(payload => ({
      type: 'insert',
      payload
    }))
  );
};

Он оказался очень похож на оператор, который генерирует поток изменений. Все так же с помощью функции pipe делаем композицию из трех функций:

  1. Из экземпляра базы данных получаем курсор на все записи, которые хранятся в коллекции.
  2. Преобразуем Stream в Observable и получаем поток данных.
  3. Для упрощения кода предположим, что текущее состояние коллекции формируется из вставки записей в пустой массив. Другими словами, для клиента в браузере записи, соответствующие текущему состоянию, и вновь добавляемые записи имею один и тот же тип — insert.

Внесем изменения в index.js

require('any-observable/register')('rxjs');
const io = require('socket.io')(8081);
const { fromEvent, merge } = require('rxjs');
const { takeUntil } = require('rxjs/operators');
const getDb = require('./db');
const state = require('./state');
const changes = require('./changes');

// создадим оператор для получения начального состояния
const testState = state('test');
const testChanges = changes('test');

io.on('connect', socket => {
  const db$ = getDb();
  const stop$ = fromEvent(socket, 'disconnecting');

  // поток, в котором будет сгенерировано состояние коллекции
  const state$ = db$.pipe(testState);
  const change$ = db$.pipe(testChanges);

  // объединим поток с начальным состоянием и поток изменений
  merge(state$, change$)
    .pipe(takeUntil(stop$))
    .subscribe(data => socket.emit('test', data));
});

Как только браузер подключится к серверу, так в консоли отобразятся все записи, которые были на тот момент в коллекции в виде событий insert.

Скорее всего, такое поведение сервера будет не всегда оправдано. Браузер может на какое-то время потерять соединение с сервером и после восстановления соединения сервер отправит всё состояние коллекции. Но браузеру оно не нужно, так как оно уже находится у него в памяти.

Клиент может послать серверу специальное событие в тот момент, когда он хочет получить текущее состояние состояние (например, в момент загрузки страницы).

Добавим в скрипты index.html следующую строку:

socket.emit('getAllRecords');

А финальная версия index.js будет выглядеть так:

require('any-observable/register')('rxjs');
const io = require('socket.io')(8081);
const { fromEvent, merge } = require('rxjs');
const { flatMap, takeUntil } = require('rxjs/operators');
const getDb = require('./db');
const state = require('./state');
const changes = require('./changes');

const testState = state('test');
const testChanges = changes('test');

io.on('connect', socket => {
  const db$ = getDb();
  const stop$ = fromEvent(socket, 'disconnecting');

  const state$ = db$.pipe(testState);
  const change$ = db$.pipe(testChanges);

  // отдаём состояние только по запросу
  const getAllRecords$ = fromEvent(socket, 'getAllRecords')
    .pipe(flatMap(() => state$));

  // объединяем поток изменений с потоком, в котором состояние
  // коллекции появится только после специального события
  merge(getAllRecords$, change$)
    .pipe(takeUntil(stop$))
    .subscribe(data => socket.emit('test', data));
});

Заключение

Текущее состояние коллекции можно получить как список изменений для пустого массива на клиенте. Его можно отдавать сразу при подключении или по запросу. И на клиенте для этого не потребовалось вносить существенных изменений.

Оставте свой комментарий

Отслеживание обновлений коллекции MongoDB в браузере почти в реальном времени

Я как-то заинтересовался возможностью получать все изменения из определенной таблицы базы данных в браузере, чтобы отображать их в интерфейсе в почти реальном времени. Например, Firebase изначально работает по такому принципу. Но хотелось бы использовать какое-то не облачное решение.

В начале 2018 официальные драйвера MongoDB (начиная с версии 3.0.0) стали предоставлять такую возможность с минимальными затратами усилий разработчика. Покажу на небольшом примере как это можно реализовать.

Поключение к базе данных

Сервер будет работать на Node.js и зависеть от модулей mongodb, socket.io, rxjs и stream-to-observable.

Начнем с подключения к базе данных (файл db.js):

const { from } = require('rxjs');
const { map } = require('rxjs/operators');
const { MongoClient } = require('mongodb');

// впишите необходимые параметры подключения
const MONGODB_URL = '';
const MONGODB_OPTS = {};
const MONGODB_DATABASE = '';

const client = MongoClient.connect(MONGODB_URL, MONGODB_OPTS);

module.exports = function () {
  return from(client)
    .pipe(map(c => c.db(MONGODB_DATABASE)));
};

Константа client будет содержать Promise, который резолвится в экземпляр клиента при успешном подключении. Когда мы вызовем экспортированную функцию, то получим Observable с потоком из одного события, содержащего экземпляр базы данных.

Поток с подключением к базе данных

Теперь займёмся сервером, принимающим запросы браузеров (файл index.js):

const io = require('socket.io')(8081);
const { fromEvent } = require('rxjs');
const { takeUntil } = require('rxjs/operators');
const getDb = require('./db');

io.on('connect', socket => {
  const db$ = getDb();
  const stop$ = fromEvent(socket, 'disconnecting');

  db$
    .pipe(takeUntil(stop$))
    .subscribe(() => socket.emit('test', 'Hello, world!'));
});

Проверяем как работает сервер с помощью скрипта в браузере (файл index.html)

<script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/2.1.1/socket.io.js"></script>
<script>
  const socket = io.connect('localhost:8081');
  socket.on('test', data => console.log(JSON.stringify(data)));
</script>

Если всё было сделано правильно, то в консоли браузера появится сообщение

> Hello, world!

Фактически, это сообщение появится, когда будет доступно подключение к базе данных. Оно инициализируется один раз при запуске сервера и используется для всех подключений к серверу. На каждое подключение к серверу извне создается Observable с экземпляром базы данных.

Поток завершился раньше, чем подключение к базе данных

Если браузер не дождался пока сервер подключится к базе данных, то этот Observable завершается без генерации данных. Это поведение управляется через другой Observable stop$. Его значимость я объясню чуть позже.

Получение изменений

Теперь напишем модуль, который будет отдавать изменения из базы данных (файл changes.js)

const { pipe } = require('rxjs');
const { flatMap, map } = require('rxjs/operators');
const fromStream = require('stream-to-observable');

module.exports = function (collection) {
  return pipe(
    // #1
    map(db => (
      db
        .collection(collection)
        .watch({ fullDocument: 'updateLookup' })
    )),
    // #2
    flatMap(cursor => (
      fromStream(cursor, { dataEvent: 'change' })
    )),
    // #3
    map(data => ({
      type: data.operationType,
      payload: data.fullDocument || data.documentKey
    }))
  );
};

Эта функция возвращает оператор, который будет поставлять изменения в указанной коллекции.

Функция pipe делает композицию из трех функций:

  1. Из экземпляра базы данных получаем курсор с изменениями. Функция map() заменит в потоке значение db на то значение, которое вернул колбек.
  2. Из курсора получаем фактические данные данные. В этом месте используется преобразование Stream в Observable. Функция flatMap() будет генерировать значения пока не закончится поток данных, порожденный курсором.
  3. Данные конвертируются в нужный формат.

Смотрите в чём отличие map() от flatMap().

Функция map() из одного элемента генерирует один элемент, но с другим здачением.

Экземпляр базы данных Курсор, возвращающий записи из таблицы

Когда внутри потока появляется еще один поток, то flatMap() начинает объединять родительский и дочерние потоки в один. Так в потоке из одного элемента могут появиться другие элементы.

Появляется вложенный поток Потоки объединяются в один

Внесем изменения в файл index.js

// зарегистрируем библиотеку rxjs как поставщик объектов
// типа Observable для модуля stream-to-observable
require('any-observable/register')('rxjs');
const io = require('socket.io')(8081);
const { fromEvent } = require('rxjs');
const { takeUntil } = require('rxjs/operators');
const getDb = require('./db');
const changes = require('./changes');

// создадим оператор для наблюдения за коллекцией test
const testChanges = changes('test');

io.on('connect', socket => {
  const db$ = getDb();
  const stop$ = fromEvent(socket, 'disconnecting');

  db$
    // добавим оператор testChanges в цепочку
    .pipe(testChanges, takeUntil(stop$))
    // отправляем данные в браузер
    .subscribe(data => socket.emit('test', data));
});

Предполагается, что MongoDB будет генерировать 5 типов изменений: «insert», «update» и «delete», «replace» и «invalidate». Все они будут приходить в браузер через событие test клиента Socket.IO.

Самое время запустить консоль mongo и попробовать добавлять, изменять и удалять данные в таблице.

> db.test.insert({value: 0.1})
> db.test.insert({value: 0.6})
> db.test.insert({value: 0.8})
> db.test.insert({value: 0.3})
> db.test.updateMany({value: {$gt: 0.5}}, {$set: {value: 1}})
> db.test.remove({value: {$lte: 0.5}})

Тем временем в консоли браузера будут тут же отображаться результаты операций.

> {"type":"insert","payload":{"_id":"5b3f29ed2e303b6b6a3b27a3","value":0.1}}
> {"type":"insert","payload":{"_id":"5b3f29ee2e303b6b6a3b27a4","value":0.6}}
> {"type":"insert","payload":{"_id":"5b3f29ee2e303b6b6a3b27a5","value":0.8}}
> {"type":"insert","payload":{"_id":"5b3f29ef2e303b6b6a3b27a6","value":0.3}}
> {"type":"update","payload":{"_id":"5b3f29ee2e303b6b6a3b27a4","value":1}}
> {"type":"update","payload":{"_id":"5b3f29ee2e303b6b6a3b27a5","value":1}}
> {"type":"delete","payload":{"_id":"5b3f29ed2e303b6b6a3b27a3"}}
> {"type":"delete","payload":{"_id":"5b3f29ef2e303b6b6a3b27a6"}}

Остановка потока

Так как поток изменений получился бесконечным, то вот здесь и пригодится Observable stop$, который я упомянул ранее.

Браурез отключился раньше чем завешрился поток

Данные в stop$ появятся когда браузер будет отключаться от сервера (например, при перезагрузке страницы или закрытии окна). Для потока обновлений это будет служить признаком того, что его нужно завершить (оператор takeUntil). Когда мы корректно завершаем поток, то все подписчики автоматически отписываются и освобождают выделенные ресурсы, если это требуется.

Заключение

Так совсем небольшими усилиями удалось получить доступ к данным практически в реальном времени. Библиотека socket.io скрыла от разработчика всю сложность работы с WebSocket. А использование библиотеки rxjs сделало код лаконичным.

Дальнейшим логичным шагом в развитии этого эксперимента будет получение текущего состояния коллекции при инициализации клиента в браузере.

Оставте свой комментарий

util.promisify() для замены колбэков на промисы

Промисы на практике оказались удобнее колбэков из-за того, что не требуют постоянного увеличения вложенности при работе с цепочками асинхронных функций. Более того, на базе промисов строится работа async function.

Все асинхронные функции node.js и подавляющее большинство асинхронных функций внешних модулей на данный момент всё же используют колбэки, чтобы вернуть результат работы или сообщить об ошибке.

Технически, не сложно обернуть асинхронную функцию в промиз. Но, к сожалению, это не одна строчка кода.

const fs = require('fs');

function reader(path) {
    return new Promise((resolve, reject) => {
        fs.readdir(path, (err, list) => {
            if (err) {
                reject(err);
                return;
            }

            resolve(list);
        });
    });
}

reader('.')
    .then(list => {
        console.log(list);
        process.exit(0);
    })
    .catch(err => {
        console.error(err);
        process.exit(1);
    });

В стандартной библиотеке node.js 8 util появилась функция promisify(), которая как раз решает эту задачу.

const fs = require('fs');
const util = require('util');

const reader = util.promisify(fs.readdir);

reader('.')
    .then(list => {
        console.log(list);
        process.exit(0);
    })
    .catch(err => {
        console.error(err);
        process.exit(1);
    });

Функция promisify() придерживается соглашения:

  1. колбек в параметрах исходной функции всегда идет последним;
  2. первым параметром колбека приходит объект ошибки, если она есть.

Экспериментируя с этим новым методом я почти сразу натолкнулся на ситуацию, когда мой код переставал работать ожидаемым образом.

function Timer(delay) {
    this._delay = delay;
}

Timer.prototype.start = function (callback) {
    setTimeout(callback, this._delay);
};

const timer = new Timer(1000);
timer.start(() => {
    console.log('Finish');
});

Из метода start() я хочу сделать новую функцию, возвращающую мне промиз.

const start = util.promisify(timer.start);
start().then(() => {
    console.log('Finish');
});

Но этот фрагмент кода перестаёт работать. Он начинает выдавать ошибку:

TypeError: Cannot read property '_delay' of undefined

Проблема кроется в том, что метод внутри себя использует ссылку на экземпляр класса Timer — в частности, он вытаскиваем величину задержки. А так как метод был вызван вне контекста экземпляра таймера, то this оказалось не определено.

Если вернуться к примеру с колбэком, то следующий фрагмент кода тоже не будет работать и выдаст такую же ошибку.

const start = timer.start;
start(() => {
    console.log('Finish');
});

Исправить ситуацию можно несколькими путями.

Связать функцию с экземпляром

const start = util.promisify(timer.start).bind(timer);

Теперь не важно как мы вызовем функцию start(). Она всегда будет иметь экземпляр таймера в качестве контекста исполнения.

Сохранить функцию в экземпляр класса

timer.start = util.promisify(timer.start);
timer.start().then(() => {
    console.log('Finish');
});

Для данного экземпляра мы подменили реализацию метода из прототипа на новую функцию. Контекст исполнения будет передаваться естественным образом.

Развивая этот вариант дальше, новый метод можно сохранить в прототипе класса.

Timer.prototype.start = util.promisify(Timer.prototype.start);

Такой вариант опасен тем, что логика работы изменится для абсолютно всех экземпляров данного класса. Возможно, это не страшно для кода, который вы сами пишете и обслуживаете. Но, вряд-ли будет приемлемо для сторонних модулей.

Оставте свой комментарий

Знакомство с AWS Lambda

Amazon Web Services Lambda — это веб-сервис, запускающий ваш код на Node.js, Python или Java в ответ на определенные события и отвечающий за автоматическое выделение необходимых вычислительных ресурсов. Функции Lambda не хранят состояние, поэтому AWS Lambda может быстро запустить столько копий функции, сколько нужно для обработки входящих событий.

Оплата производится за количество вызовов функции и время, необходимое для исполнения кода этой функции. Изучение и маленькие домашние проекты для вас могут быть совершенно бесплатными в течение года с момента регистрации в AWS.

Создание пользователя

После регистрации в AWS нужно создать пользователя и назначить ему права доступа к сервисам.

Все инструкции вы найдёте в документации AWS Identity and Access Management «IAM Users». Сохраните файл с ключами — они потребуются при настройке CLI.

Только что созданный пользователь не имеет никаких прав. Чтобы он получил доступ к созданию и запуску функций Lambda, нужно назначить ему соответствующую политику. В разделе «Working with Policies» описано как добраться до списка политик. Нам нужно добавить для пользователя политику «AWSLambdaFullAccess». Позже аналогично можно будет добавить или удалить другие политики.

AWS Command Line Interface

Почти все функции AWS доступны через веб-интерфейс, но на практике удобнее использовать консольные команды или скрипты.

Инструкции по установке утилит вы найдёте на странице «AWS Command Line Interface». После того как вы проделаете все описанные там действия, запустите команду:

aws configure

и укажите ключи пользователя, которые вы получили при его создании. Я так же выбрал регион по-умолчанию «eu-central-1». В разных регионах доступны разные наборы сервисов.

Создание функции AWS Lambda

Для начала, напишем простой скрипт, который будет возвращать строку "Hello, world!". Так же в логах мы увидим с какими параметрами вызывалась функция.

'use strict';

exports.handler = (event, context, callback) => {
  console.log(JSON.stringify(event, null, 2));
  callback(null, 'Hello, world!');
};

Сохраним этот код в файл index.js.

В принципе, функция может быть любой сложности и использовать другие модули из npm. Все файлы потом будут запакованы в один архив.

Перед тем как загрузить функцию в AWS, нужно создать роль для неё. Если у вас уже существует подходящая роль, то этот шаг можно пропустить.

aws iam create-role \
  --role-name MyFirstLambda-Execution \
  --assume-role-policy-document '{
    "Version": "2012-10-17",
    "Statement": [{
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }]
  }' \
  --output text \
  --query 'Role.Arn'

В ответ вы получите строку вида:

arn:aws:iam::000000000000:role/MyFirstLambda-Execution

Так как наша функция будет вести логгирование, то нужно добавить ей соответствующую политику. Иначе лог не сохранится в CloudWatch.

aws iam attach-role-policy \
  --role-name MyFirstLambda-Execution \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

Вот теперь можно загрузить нашу функцию в Lambda.

zip - index.js | \
aws lambda create-function \
  --function-name MyFirstLambda \
  --runtime nodejs4.3 \
  --role arn:aws:iam::000000000000:role/MyFirstLambda-Execution \
  --handler index.handler \
  --zip-file fileb:///dev/stdin

Код функции будет архивироваться и загружаться в AWS без создания временных файлов. Так же код можно загружать через S3.

В параметре --handler указываем точку входа — название модуля (index) и метод, который экспортирует модуль.

В параметре --role указываем роль, которую мы создали для запуска функции.

Выполнение функции

Запустить функцию можно из терминала

aws lambda invoke \
  --function-name MyFirstLambda \
  /tmp/out.json && cat /tmp/out.json

Результат сохранится в файл /tmp/out.json, содержимое которого мы затем выведем в терминале.

aws lambda invoke \
  --function-name MyFirstLambda \
  --payload '{"foo":1,"bar":true}' \
  /tmp/out.json

Обновление кода и конфигурации функции

Обновляется код функции аналогично тому как она создавалась.

zip - index.js | \
aws lambda update-function-code \
  --function-name MyFirstLambda \
  --zip-file fileb:///dev/stdin

Если нужно обновить какие-то параметры конфигурации, то это делается командой update-function-configuration.

Заключение

С AWS Lambda можно легко получить масштабируемое окружение.

На практике функции вызываются по какому-либо событию. Например, запуск по расписанию, обновление таблицы в DynamoDB, появление файла в S3 или при поступлении HTTP-запроса в сервис API Gateway.

В свою очередь, код Lambda-функции может взаимодействовать с другими серверами AWS, внешними базами данных и веб-ресурсами.

Оставте свой комментарий

Шаблон тестирования Page Object

Функциональные тесты большинством воспринимаются как линейные сценарии: открой страничку, нажми эту кнопку, введи такой-то текст в поле ввода, нажми другую кнопку. Сценарии похожи на то, как действует реальный пользователь. Однако они плохи тем, что требуется повторять много однотипных действий в разных тестах, чтобы попасть в нужное исходное состояние. Если меняется содержимое станицы (например, разметка или классы у элементов), то нужно во все тестах внести соответствующие изменения.

В документации к Selenium описан другой подход — Page Object. Его особенность заключается в том, что страница представлена в виде модели с которой взаимодействует тест. Это уменьшает количество повторения кода. Доступ к элементам страницы осуществляется только в одном месте. Если поменяется разметка, то потребуется минимум изменений в коде Page Object, а тесты не нужно будет менять вообще.

Таким образом, с точки зрения кода функциональные тесты превращаются в тестирование сервиса, который знает всё о внутреннем устройстве всей страницы или её части и реализует публичные методы, выполняющие определённые действия.

Смотрите пример — https://gist.github.com/mistakster/597f110631fe8a5cde6b. Инфраструктура для запуска этого теста описана в заметке Node.js + mocha + selenium-webdriver.

Я тестирую главную страницу Яндекса. Она должна удовлетворять следующей спецификации:

Yandex home page
  √ should be valid (4857ms)
  √ should have at least 9 tabs above the search (4358ms)
  √ should have a weather widget (1562ms)

Сам тест выглядит очень просто и понятно. Вся магия скрыта в объекте Page и двух компонентах, которые должны быть на странице.

Когда мы открываем страницу, то не должны знать её адреса. Для разных окружений (локальная разработка, стейджинг, продакшин) он может быть разным. В тесте у Page Object просто вызывается соответствующий метод open(). Мы так же должны быть уверены, что страница открылась и на ней правильное содержимое. За это отвечает метод validate(). Итак, всю логику по взаимодействию с драйвером мы прячем в Page Object за компактным фасадом из методов.

Методы Page Object должны возвращать объекты других страниц или объекты компонент. Так осуществляются переход между страницами или в тесте можно получить доступ к отдельным блокам на странице. Например, в тесте можно получить список всех пунктов меню, которое расположено над поисковой строкой, или текущую погоду в вашем городе.

Комментарии к заметке: 2