Подписка на обновления БД

В текущий момент реализация интерфейса подписки присутвует только в Go-клиенте.

Пользовательское приложение может получать уведомления об изменениях, происходящих в БД, к которой оно подключено. Этот механизм может быть полезен в ситуациях, когда с одним инстансом Reindexer работают несколько приложений одновременно. Например, по событию от БД приложение может сбрасывать какие-то внутренние кеши или отправлять нотификацию.

Для Go создание потока эвентов выглядит следующим образом:

import events "git.restream.ru/itv-backend/reindexer/v5/events"

// Параметры подписки, определяющие, на какие типы событий требуется подписаться (в данном случае - все события одиночных модификаций документов и комиты транзакций)
opts := events.DefaultEventsStreamOptions().WithDocModifyEvents().WithTransactionCommitEvents()
// Создание потока событий
stream := db.Subscribe(opts)
if err := stream.Error(); err != nil {
	panic(err)
}
// При удалении для объекта Stream должен быть вызван Close(). По смыслу Close() означает "отписаться от событий"
defer stream.Close(context.Background())

for {
	// Чтение событий в цикле
	event, ok := <- stream.Chan()
	if !ok {
		// Канал событий был закрыт
		fmt.Printf("Reindexer events stream was invalidated: %v\n", stream.Error())
		break
	}
	// Тут может быть логика обработки событий
	...
}

Одиночный объект Go-байндинга поддерживает до 32 потоков событий. Все потоки обрабатываются через одно общее подключение (для cproto и ucproto) или через одну общую горутину (для builtin и builtinserver).

В случае возникновения критических ошибок (например, при обрыве соединения) канал, возвращаемый из Chan() будет закрыт автоматически. Для возобновления подписки требуется заново вызвать Subscribe().

Для конфигурирования параметров подписки используется объект EventsStreamOptions. Он позволяет подписаться на конкретные типы событий или на отдельные неймспейсы (в том числе на системный #config-неймспейс). В файле streamopts.go есть набор вспомогательных функций, позволяющих выбрать какую-то отдельную группу событий. При необходимости отдельные типы событий могут быть заданы явно через метод WithEvents. С параметрами по умолчанию поток событий будет получать все события для всех неймспейсов за исключением неймспейса #config.

Помимо этого EventsStreamOptions позволяет настроить метаинформацию, отправляемую вместе с событием: LSN, имя БД, метку времени и т.д. Поддержка отправки содержимого документа (JSON/CJSON) вместе с событием пока что не реализована.

Ядро Reindexer использует внутреннюю очередь для агрегирования и рассылки событий подписчикам. Объекты событий являются общими для очереди подписки и очереди репликации. Максимальный размер этих очередей задаётся через параметры maxupdatessize(для standalone), MaxUpdatesSizeBytes(для builtinserver) и WithMaxUpdatesSize()(для builtin). По умолчанию размер очереди ограничен 1 ГБ. Если очередь превышает заданный размер (например, обновления не успевают обработаться из-за сетевых проблем), события из очереди удаляются, а подписчикам отправляется событие EventTypeUpdatesDrop.