Транзакции в Reindexer

Reindexer поддерживает транзакции. При их выполнении происходит атомарное обновление неймспейса. Транзакции поддерживаются при следующих вариантах взаимодействия с СУБД:

Стратегии коммитов транзакций

В Reindexer, в зависимости от объема изменений данных в рамках транзакции, применяется одна из следующих стратегий коммитов:

  1. Блокирующее атомарное обновление.

  2. Reindexer блокирует неймспейс на время выполнения транзакции и применяет все изменения под действием общей блокировки. Этот режим используется при небольших объемах изменения данных.

  3. Копирование и атомарная замена. В этом режиме Reindexer делает снимок неймспейса, применяет все изменения к полученному снимку, и заменяет неймспейс атомарно. Параллельно с выполнением транзакции возможно чтение данных из неймспейса, другие модифицирующие операции будут ждать ее завершения.

Управлять количеством данных, необходимых для выбора стратегии коммитов, можно в конфигурации неймспейса (служебный неймспейс #config) с помощью параметров:

  • start_copy_policy_tx_size — включает копирование неймспейса для транзакции с числом шагов, превышающим заданное значение (при условии, что параметр copy_politics_multiplier также разрешает это). Значение по умолчанию — 10000.

  • copy_politics_multiplier — отключает копирование, если размер неймспейса больше, чем copy_policy_multiplier * start_copy_tx_size. Значение по умолчанию 5.

  • tx_size_to_always_copy — включает принудительное копирование неймспейсов для транзакций с количеством шагов больше указанного значения. Значение по умолчанию — 100000.

Синхронный и асинхронный режим выполнения транзакций

В Reindexer поддерживается выполнение транзакций в синхронном и асинхронном режимах.

Синхронный режим

Ниже представлены примеры выполнения транзакций в синхронном режиме при использовании разных вариантов взаимодействия с Reindexer.

В Go для начала транзакции используется метод db.BeginTx(). Он создает транзакцию как объект, который предоставляет приложению интерфейсы Update/Upsert/Insert/Delete. Для RPC-клиентов действует ограничение по количеству транзакций — каждое соединение не может иметь более 1024 открытых транзакций одновременно.

    //  Создание нового объекта транзакции
	tx, err := db.BeginTx("items");
	if err != nil {
		panic(err)
	}
	// Заполнение объекта транзакции
	tx.Upsert(&Item{ID: 100})
	tx.Upsert(&Item{ID: 101})
	tx.Query().WhereInt("id", reindexer.EQ, 102).Set("Name", "Petya").Update()
	// Применение транзакции (коммит)
	if err := tx.Commit(); err != nil {
		panic(err)
	}

При использовании транзакций в кластерном режиме, разумно создавать их через экземпляр Rx с установленным тайм-аутом - для предотвращения “зависаний” в случаях несогласованности Raft-кластера.

Примечание!: В текущей Go реализации Rx, транзакции, создаваемые подобным способом, будут иметь ограничение на общую длительность выполнения (txTimeoutSeconds из примера ниже), т.е. вся транзакция от момента BeginTx до tx.Commit() или tx.Rollback() должна укладываться в указанный период.

    //  Создание нового тайм-аут контекста. Стоит учитывать, что deadline рассчитывается в момент создания контекста
	txTimeoutSeconds := 10
	ctx, cancel := context.WithTimeout(context.Background(), txTimeoutSeconds * time.Second)
	defer cancel()

	//  Создание нового объекта транзакции
	tx, err := db.WithContext(ctx).BeginTx("items");
	if err != nil {
		panic(err)
	}
	// Заполнение объекта транзакции
	tx.Upsert(&Item{ID: 100})
	tx.Upsert(&Item{ID: 101})
	tx.Query().WhereInt("id", reindexer.EQ, 102).Set("Name", "Petya").Update()
	// Применение транзакции (коммит)
	if err := tx.Commit(); err != nil {
		panic(err)
	}

Пример работы с транзакциями при взаимодействии с БД по HTTP:

  1. Создание транзакции для неймспейса items базы данных testdb:
curl --location --request POST 'http://127.0.0.1:9088/api/v1/db/testdb/namespaces/items/transactions/begin'

При успешном создании транзакции в ответ приходит ее идентификатор (tx_id), который необходим для выполнения запросов в рамках транзакции и ее коммита либо отката:

{
  "tx_id": "iKFdsSDNGRzQLUioPSsl_424347868677"
}

Замечание: Для всех операций с транзакциями через протокол HTTP применяется тайм-аут http_write_timeout для предотвращения «зависаний» в случае несогласованности Raft-кластера. Общая длительность транзакции не ограничивается, однако накладывается ограничение в tx_idle_timeout секунд на периоды отсутствия активности между одиночными операциями (такими, как добавление данных в существующую транзакцию, ее коммит или откат) при превышении которого, транзакция будет считаться недействительной - просроченной.

  1. Выполнение запросов в рамках транзакции:

Добавление новой записи в неймспейс:

curl --location --request POST 'http://127.0.0.1:9088/api/v1/db/testdb/transactions/iKFdsSDNGRzQLUioPSsl_424347868677/items' \
--header 'Content-Type: text/plain' \
--data-raw '{
  "ID": 4,
  "Name": "Article",
  "Articles": [
    1,
    2
  ],
  "Year": 2022
}'

Обновление существующей записи (в данном случае — изменение значение поля Name и наполнение значениями массива Articles):

curl --location --request PUT 'http://127.0.0.1:9088/api/v1/db/testdb/transactions/iKFdsSDNGRzQLUioPSsl_424347868677/items' \
--header 'Content-Type: text/plain' \
--data-raw '{
  "ID": 2,
  "Name": "New_Article",
  "Articles": [
    1,
    2
  ],
  "Year": 2022
}'

Удаление записи из неймспейса:

curl --location --request DELETE 'http://127.0.0.1:9088/api/v1/db/testdb/transactions/iKFdsSDNGRzQLUioPSsl_424347868677/items' \
--header 'Content-Type: text/plain' \
--data-raw '{
  "ID": 1
}'
  1. Коммит транзакции:
curl --location --request POST 'http://127.0.0.1:9088/api/v1/db/testdb/transactions/iKFdsSDNGRzQLUioPSsl_424347868677/commit'

Асинхронный режим выполнения транзакций

Асинхронный режим доступен при использовании Go, Java и .NET-коннекторов для Reindexer и может использоваться для ускорения массовой вставки записей в неймспейсы БД. Ускорение достигается за счёт того, что клиент не ожидает ответа от сервера на каждое добавление шага в транзакцию. Такой подход обеспечивает быстрое заполнение транзакции в случае cproto-соединения. Для Built-in-режима выигрыша по скорости не будет.

Пример выполнения транзакции в асинхронном режиме на Go:

    // Создание нового объекта транзакции
	tx, err := db.BeginTx("items");
	if err != nil {
		panic(err)
	}
	// Заполнение объекта асинхронной транзакции запросами.
	tx.UpsertAsync(&Item{ID: 100},func(err error) {})
	tx.UpsertAsync(&Item{ID: 100},func(err error) {})
	// Ожидание выполнения запросов и завершение транзакции.
	if err := tx.Commit(); err != nil {
		panic(err)
	}

Второй аргумент в UpsertAsync — функция завершения, которая вызывается после получения ответа на запрос от сервера. Если при подготовке транзакции произошла какая-либо ошибка, tx.Commit() вернет ошибку. Поэтому, чтобы проверить, зафиксировались ли вносимые в рамках транзакции изменения, достаточно убедиться, что tx.Commit() не вернул ошибку.

Особенности реализации транзакций в Reindexer, ограничения

  1. Объект транзакции не является потокобезопасным и не может использоваться из разных потоков.

  2. Объект транзакции потребляет ресурсы. Поэтому, чтобы избежать утечки ресурсов, приложение должно явно вызывать метод для отката транзакции (например, tx.rollback() для Go либо POST-запрос /db/{database}/transactions/{tx_id}/rollback — для HTTP) или для коммита (например, tx.commit() для Go либо POST-запрос /db/{database}/transactions/{tx_id}/commit — для HTTP).

  3. После коммита можно безопасно вызвать метод для отката транзакции.

  4. Изнутри транзакций возможно вызывать произвольные запросы с помощью соответствующих методов для разных языков (например, tx.query().execute() ... для Go).

  5. Все транзакции работают с уровнем изоляции serialized. Каждая транзакции начинает модифицировать данные только в момент вызова Commit, при этом все изменения вносятся под эксклюзивной блокировкой неймспейса.

  6. Для Go, транзакции, создаваемые через экземпляр Rx c установленным тайм-аутом, будут иметь ограничение на общую длительность выполнения соответствующую тайм-ауту, т.е. вся транзакция от момента BeginTx до tx.Commit() или tx.Rollback() должна укладываться в указанный период.