Синхронная репликация в RAFT-кластере

В Reindexer реализована синхронная репликация с поддержкой RAFT-like алгоритма консенсуса выборов лидера. Она совместима с асинхронной репликацией (подробнее — в разделе «Комбинирование асинхронной и синхронной репликации»).

В качестве узла в кластере при синхронной репликации может выступать сервер Reindexer Standalone или приложение с Built-in server Reindexer.

Роли узлов в кластере

Сервер Reindexer, входящий в состав кластера при синхронной репликации, может иметь одну из трех ролей:

Роль Описание роли
Leader Координирует любые операции вставки/изменения/удаления данных. Операция изменения считается законченной, если данные успешно доставлены на N/2+1 узлов кластера (где N — общее количество узлов)
Follower Данные узла доступны только на чтение. Запросы на запись будут автоматически проксированы на лидера
Candidate Узел находится в состоянии выдвижения себя новым Leader-ом (если Leader существует, он останется в своем статусе, а этот узел станет Follower-ом)

Роли динамически меняются в процессе работы кластера в соответствии с алгоритмом выбора.

Выборы лидера в кластере

Любой узел из кластера может инициировать новый терм выборов Leader-а. Происходит это в следующих случаях:

  • Если нода в течении 1.5 секунды не получает ping-сообщения (LeadersPing в RPC-логе) от Leader-а.
  • Если репликация на ноде только что включилась и у нее еще нет никакой информации о Leader-е.

Каждая итерация выборов происходит в следующем порядке:

  1. Инициировавшая выборы нода переходит в роль Candidate.

  2. Candidate инкрементирует свой номер терма.

  3. Candidate голосует сам за себя и рассылает членам кластера запросы для голосования. Все члены кластера в рамках одного цикла выборов голосуют только один раз: за себя (если это они инициировали этот цикл), либо за первую ноду, от которой был получен запрос на голосование. Если нода считает, что в данный момент Leader доступен (от него недавно получено ping-сообщение), она проголосует за своего текущего Leader-а.

  4. Если эта нода достигает консенсуса (получает большинство голосов), она становится Leader-ом, а все остальные члены кластера получают статус Follower.

  5. В противном случае она ожидает сообщение от уже действующего Leader-а текущего терма или от другой ноды более старшего терма. И при получении такого сообщения переходит в статус Follower.

  6. Если в рамках этого цикла Leader-а выбрать не удалось, новый цикл инициируется после рандомизированной задержки.

Работа ноды в статусе Follower

Нода со статусом Follower отслеживает доступность Leader-а. Для этого используются отметки времени последнего полученного ping-сообщения. Если Leader недоступен, Follower инициирует новый цикл выборов.

Нода, получившая статус Follower, меняет содержимое объекта clusterization_status для каждого своего неймспейса, участвующего в синхронной репликации, следующим образом:

  • в leader_id записывается id текущего Leader-а,
  • в role записывается роль cluster_replica (означает, что нода — один из Follower-ов в синхронном RAFT-кластере).

С этого момента:

  1. Все неймспейсы ноды, участвующие в синхронной репликации, переходят в статус read-only для любых внешних адресатов, кроме Leader-а кластера.
  2. Все запросы на запись или модификацию данных проксируются на Leader и выполняются на нем.
  3. Запросы, не требующие доступа на запись (например, на выборку данных), выполняются локально на ноде.
  4. Запросы к служебным неймспейсам выполняются на ноде.

Работа ноды в статусе Leader

Нода, получившая статус Leader, начинает с определенной периодичностью рассылать ping-сообщения для всех Follower-ов. Если какой-то из них не отвечает на запрос, Leader считает его недоступным. Если недоступно N/2 и более нод (где N — количество узлов в кластере), Leader инициирует новые выборы.

Получив статус Leader, нода не начинает сразу же реплицировать данные на Follower-ов. Сначала происходит начальная синхронизация initial sync. В ходе нее Leader собирает наиболее актуальные данные для каждого неймспейса, исходя из их LSN, по крайней мере от N/2 других узлов кластера.

После завершения начальной синхронизации Leader синхронизирует данные на Follower-ах. На этом этапе применяются 3 механизма (по аналогии с асинхронной репликацией): полная синхронизация, WAL-синхронизация и онлайн-обновления.

Leader реплицирует данные на Follower-ов до тех пор, пока не получит запрос на выборы в ручном режиме или не возникнет ошибка в консенсусе при записи данных. В обоих этих случаях нода инициирует новый цикл выборов.

Гарантии сохранности данных при синхронной репликации в Reindexer

Сохранность данных при синхронной репликации в Reindexer обеспечиваются за счет следующих подходов:

  1. Консенсус для каждой записи Для обеспечения безопасности данных Reindexer использует алгоритм консенсуса. При каждой записи на Leader производится записи на все Follower-ы. Чтобы операция по изменению данных применилась, она должна получить подтверждение не менее чем от N/2 Follower-ов. Follower не может подтвердить обновление, если WAL/force-синхронизация не завершена.

  2. Гарантии read-after-write Операция по добавлению или изменению данных, которая была проксирована с Follower на Leader, не будет в общем применена, пока не получит подтверждения от самого же Follower-а-отправителя. Поэтому, если данные добавляются или изменяются через какую-то ноду кластера, после применения изменений они могут быть прочитаны с нее же.

  3. Соблюдение порядка параллельных записей В целях оптимизации в Reindexer доступны параллельные записи, даже если некоторые из операций записи из других потоков ждут консенсуса прямо сейчас. При этом гарантируется, что все параллельные записи будут выполнены на всех узлах синхронного кластера в одном и том же порядке.

Настройка синхронной репликации в RAFT-кластере

Синхронную репликацию можно использовать для режимов Standalone и Built-in server.

На текущий момент синхронная репликация может быть сконфигурирована только в момент запуска сервера при помощи конфигурационных файлов.

Для этого в сторадж каждого из узлов кластера должны быть добавлены файлы конфигурации cluster.conf (содержимое должной быть одинаковым для всех нод) и replication.conf (server_id должен быть уникален для каждой ноды).

Конфигурация c помощью YML-файлов

При запуске сервера из файлов, которые должны быть размещены в директории с базой данных, считываются:

  • Общие настройки репликации. Они указываются в файле replication.conf (пример). Здесь cluster_id (id кластера) должен быть одинаковым для всех нод из состава одного кластера, а server_id (уникальный идентификатор сервера) должен быть уникальным для каждой ноды. Также эти параметры могут быть настроены в объекте replication служебного неймспейса #config.

  • Специальные параметры синхронной репликации. Они указываются в файле cluster.conf (пример с описанием параметров). Содержимое этого файла должно быть одинаковым для всех нод, входящих в состав кластера.

Проверка статуса и получение статистики

Статистические данные по репликации хранятся в служебном неймспейсе #replicationstats. Для их получения выполните запрос вида:

Reindexer> SELECT * FROM #replicationstats where type='cluster'
SELECT * FROM #replicationstats where type = 'cluster'
curl --location 'http://127.0.0.1:9098/api/v1/db/new_db/query' \
--header 'Content-Type: application/json' \
--data '{
  "namespace": "#replicationstats",
  "type": "select",
  "filters": [
    {
      "field": "type",
      "cond": "EQ",
      "value": "cluster"
    }
  ]
}'

Запрос к неймспейсу #replicationstats для получения статистических сведений по репликации должен содержать условие с фильтрацией по полю type со значением cluster или async. Если указано условие type='cluster', запрос проксируется на Leader кластера.

В ответ вернется JSON-объект вида:

{
  "type": "cluster",
  "initial_sync": {
    "force_syncs": {
      "count": 0,
      "max_time_us": 0,
      "avg_time_us": 0
    },
    "wal_syncs": {
      "count": 0,
      "max_time_us": 0,
      "avg_time_us": 0
    },
    "total_time_us": 2806
  },
  "force_syncs": {
    "count": 0,
    "max_time_us": 0,
    "avg_time_us": 0
  },
  "wal_syncs": {
    "count": 0,
    "max_time_us": 0,
    "avg_time_us": 0
  },
  "update_drops": 0,
  "pending_updates_count": 275,
  "allocated_updates_count": 275,
  "allocated_updates_size": 43288,
  "nodes": [
    {
      "dsn": "cproto://127.0.0.1:14000/node0",
      "server_id": 0,
      "pending_updates_count": 60,
      "status": "offline",
      "sync_state": "awaiting_resync",
      "role": "none",
      "is_synchronized": false,
      "namespaces": []
    },
    {
      "dsn": "cproto://127.0.0.1:14002/node2",
      "server_id": 2,
      "pending_updates_count": 0,
      "status": "online",
      "sync_state": "online_replication",
      "role": "follower",
      "is_synchronized": true,
      "namespaces": []
    },
    {
      "dsn": "cproto://127.0.0.1:14003/node3",
      "server_id": 3,
      "pending_updates_count": 0,
      "status": "online",
      "sync_state": "online_replication",
      "role": "follower",
      "is_synchronized": true,
      "namespaces": []
    },
    {
      "dsn": "cproto://127.0.0.1:14004/node4",
      "server_id": 4,
      "pending_updates_count": 275,
      "status": "online",
      "sync_state": "online_replication",
      "role": "follower",
      "is_synchronized": true,
      "namespaces": []
    },
    {
      "dsn": "cproto://127.0.0.1:14001/node1",
      "server_id": 1,
      "pending_updates_count": 0,
      "status": "online",
      "sync_state": "online_replication",
      "role": "leader",
      "is_synchronized": true,
      "namespaces": []
    }
  ]
}

Здесь:

  • initial_sync — статистика по начальным синхронизациям initial sync для Leader (доступна только для синхронной репликации):

    • force_sync — статистика полных синхронизаций force-sync:

      • count — количество синхронизаций,
      • max_time_us — максимальное время синхронизации неймспейса,
      • avg_time_us — среднее время синхронизации неймспейса,
    • wal_sync — статистика WAL-синхронизаций:

    • total_time_us — общее время, затраченное на начальные синхронизации.

  • force_syncs — глобальная статистика по полным синхронизациям неймспейсов Follower-ов:

  • wal_syncs — глобальная статистика по WAL-синхронизациям неймспейсов Follower-ов.

  • update_drops — количество переполнений буфера обновлений. Обновления сначала собираются в очередь, а затем отправляются по сети. Если количество обновлений в очереди в какой-то момент превышает ограничение, заданное при запуске сервера, очередь очищается и происходит синхронизация при помощи других механизмов.

  • pending_updates_count — количество обновлений в очереди, ожидающих репликации.

  • allocated_updates_count — количество обновлений в очереди, включая уже реплицированные, но еще не удаленные.

  • allocated_updates_size — общий размер обновлений, указанных в allocated_updates_count, в байтах.

  • nodes — статистика по каждому узлу кластера:

    • dsn — DSN узла,

    • server_id — уникальный идентификатор сервера (ноды),

    • pending_updates_count — количество обновлений в очереди, ожидающих репликации на данного Follower-а,

    • status — статус подключения узла к сети. Возможные значения: none, online, offline, raft_error,

    • sync_state — статус синхронизации/репликации узла. Возможные значения:

      • none,
      • online_replication,
      • awaiting_resync,
      • syncing,
      • initial_leader_sync.
    • role — роль ноды в репликации. Возможные значения:

      • none,
      • follower,
      • leader,
      • candidate.
    • is_synchronized — состояние синхронизации. Показывает, были ли все утвержденные обновления реплицированы на этот узел,

    • namespaces — неймспейсы, для которых настроена репликация на этом узле.

Ручное назначение Leader-а в RAFT-кластере

Для ручного назначения Leader-а необходимо выполнить команду посредством записи action в служебный неймспейс #config. Имя команды — set_leader_node, параметр — server_id нового Leader-а. В случае невозможности назначить нового Leader-а в течении времени T Leader-ом остается текущий.

Пример команды для ручного назначения Leader-а через reindexer_tool:

Reindexer> \upsert #config { "type":"action","action":{ "command":"set_leader_node", "server_id": 2 } }

Ограничения синхронной репликации

Реализация синхронной репликации в Reindexer имеет ряд особенностей и ограничений:

  1. Отсутствие глобального WAL Для каждого неймспейса используется индивидуальный журнал упреждающей записи. Из-за этого такие операции, как переименование или удаление неймспейса, не могут быть должным образом реплицированы при полной синхронизации (force sync) и WAL-синхронизации. В результате могут возникать ситуации, когда новый Leader в ходе начальной синхронизации (initial sync) восстанавливает удаленные неймспейсы.

  2. Начальная синхронизация без загрузки «дополнительных» неймспейсов При начальной синхронизации (initial sync) «дополнительные» неймспейсы не загружаются с Follower-ов на Leader-а и в то же время не удаляются с Follower-ов. Из-за этого возможны ситуации, когда на некоторых Follower-ах будут неймспейсы, которые не существуют больше ни на каких узлах RAFT-кластера. Возникают такие ситуации в случае, если нода с «дополнительными» неймспейсами была отключена во время выборов Leader-а. Например, есть кластер из трех узлов: два — пустые, третий — с какими-то данными в «дополнительном» неймспейсе (в одной или нескольких). Два пустых узла будут запускаться быстрее и смогут выбрать Leader-а, пока третий считывает свои данные и остается в автономном режиме. После его запуска, в ходе initial sync «дополнительный» неймспейс не будет перенесена на уже выбранного Leader-а.

  3. Отсутствие проверки соответствия id ноды в настройках Если id сервера в конфигурации кластера не соответствует фактическому id (в общих настройках репликации), это может привести к неопределенному поведению. В настоящее время в Reindexer не реализована автоматическая проверка соответствия этих id.

  4. Настройка синхронной репликации через YML-файл На данный момент в Reindexer нет возможности изменения настроек синхронной репликации по HTTP/RPC. Единственный способ — изменение YML-файла конфигурации с перезапуском сервера.