Транзакции
Reindexer поддерживает транзакции. При их выполнении происходит атомарное обновление неймспейса. Транзакции поддерживаются при следующих вариантах взаимодействия с СУБД:
- по HTTP,
- через коннектор для Go,
- через коннектор для Python,
- через коннектор для Java,
- через коннектор для .NET.
Стратегии коммитов транзакций
В Reindexer, в зависимости от объема изменений данных в рамках транзакции, применяется одна из следующих стратегий коммитов:
- Блокирующее атомарное обновление.
- Reindexer блокирует неймспейс на время выполнения транзакции и применяет все изменения под действием общей блокировки. Этот режим используется при небольших объемах изменения данных.
- Копирование и атомарная замена. В этом режиме Reindexer делает снимок неймспейса, применяет все изменения к полученному снимку, и заменяет неймспейс атомарно. Параллельно с выполнением транзакции возможно чтение данных из неймспейса, другие модифицирующие операции будут ждать ее завершения.
Управлять количеством данных, необходимых для выбора стратегии коммитов, можно в конфигурации неймспейса (служебный неймспейс #config) с помощью параметров:
start_copy_policy_tx_size— включает копирование неймспейса для транзакции с числом шагов, превышающим заданное значение (при условии, что параметрcopy_politics_multiplierтакже разрешает это). Значение по умолчанию —10000;copy_politics_multiplier— отключает копирование, если размер неймспейса больше, чемcopy_policy_multiplier*start_copy_tx_size. Значение по умолчанию5;tx_size_to_always_copy— включает принудительное копирование неймспейсов для транзакций с количеством шагов больше указанного значения. Значение по умолчанию —100000.
Синхронный и асинхронный режим выполнения транзакций
В Reindexer поддерживается выполнение транзакций в синхронном и асинхронном режимах.
Синхронный режим
Ниже представлены примеры выполнения транзакций в синхронном режиме при использовании разных вариантов взаимодействия с Reindexer.
В Go для начала транзакции используется метод db.BeginTx().
Он создает транзакцию как объект, который предоставляет приложению интерфейсы Update/Upsert/Insert/Delete.
Для RPC-клиентов действует ограничение по количеству транзакций — каждое соединение не может иметь более 1024 открытых транзакций одновременно.
// Создание нового объекта транзакции
tx, err := db.BeginTx("items");
if err != nil {
panic(err)
}
// Заполнение объекта транзакции
tx.Upsert(&Item{ID: 100})
tx.Upsert(&Item{ID: 101})
tx.Query().WhereInt("id", reindexer.EQ, 102).Set("Name", "Petya").Update()
// Применение транзакции (коммит)
if err := tx.Commit(); err != nil {
panic(err)
}
При использовании транзакций в кластерном режиме, разумно создавать их через экземпляр Rx с установленным тайм-аутом — для предотвращения «зависаний» в случаях несогласованности Raft-кластера.
Примечание!: В текущей Go-реализации Rx, транзакции, создаваемые подобным способом, будут иметь ограничение на общую длительность выполнения (txTimeoutSeconds из примера ниже), т. е. вся транзакция от момента BeginTx до tx.Commit() или tx.Rollback() должна укладываться в указанный период.
// Создание нового тайм-аут контекста. Стоит учитывать, что deadline рассчитывается в момент создания контекста
txTimeoutSeconds := 10
ctx, cancel := context.WithTimeout(context.Background(), txTimeoutSeconds * time.Second)
defer cancel()
// Создание нового объекта транзакции
tx, err := db.WithContext(ctx).BeginTx("items");
if err != nil {
panic(err)
}
// Заполнение объекта транзакции
tx.Upsert(&Item{ID: 100})
tx.Upsert(&Item{ID: 101})
tx.Query().WhereInt("id", reindexer.EQ, 102).Set("Name", "Petya").Update()
// Применение транзакции (коммит)
if err := tx.Commit(); err != nil {
panic(err)
}
В Python для начала транзакции используется метод db.new_transaction().
Он создает транзакцию как объект, который предоставляет приложению интерфейсы Update/Upsert/Insert/Delete.
Для RPC-клиентов действует ограничение по количеству транзакций — каждое соединение не может иметь более 1024 открытых транзакций одновременно.
# Создание нового объекта транзакции
tx = db.new_transaction("actors"),
# Заполнение объекта транзакции
tx.upsert({ "id": 100 })
tx.upsert({ "id": 101, "name": "Stasya" })
tx.new_query("payments")
.where("id", CondType.CondEq, 101)
.set("name", ["Petya"])
.update()
# Применение транзакции (коммит)
tx.commit()
При использовании транзакций в кластерном режиме, разумно создавать их с установленным тайм-аутом — для предотвращения «зависаний» в случаях несогласованности Raft-кластера.
# Создание нового объекта транзакции
tx = db.new_transaction("actors", timedelta(milliseconds=10000))
# Заполнение объекта транзакции
tx.upsert({ "id": 100 })
tx.upsert({ "id": 101, "name": "Stasya" })
tx.new_query("payments")
.where("id", CondType.CondEq, 101)
.set("name", ["Petya"])
.update()
# Применение транзакции (коммит)
tx.commit()
В Java для начала транзакции используется метод db.beginTransaction().
Он создает транзакцию как объект, который предоставляет приложению интерфейсы update/upsert/insert/delete.
Для RPC-клиентов действует ограничение по количеству транзакций — каждое соединение не может иметь более 1024 открытых транзакций одновременно.
import java.util.concurrent.*;
ExecutorService executor = Executors.newSingleThreadExecutor();
final int txTimeoutSeconds = 10;
try {
Future<Void> future = executor.submit(() -> {
// Создание нового объекта транзакции
Transaction<Item> tx = db.beginTransaction("items", Item.class);
try {
// Заполнение объекта транзакции
tx.upsert(new Item(100));
tx.upsert(new Item(101));
tx.query().where("id", EQ, 102)
.set("name", "Petya")
.update();
// Применение транзакции (коммит)
tx.commit();
} catch (Exception e) {
tx.rollback();
throw e;
}
return null;
});
future.get(txTimeoutSeconds, TimeUnit.SECONDS);
System.out.println("Transaction committed successfully");
} catch (TimeoutException e) {
System.err.println("Transaction timed out");
} catch (InterruptedException | ExecutionException e) {
System.err.println("Transaction failed: " + e.getCause().getMessage());
} finally {
executor.shutdown();
}
Пример работы с транзакциями при взаимодействии с БД по HTTP:
-
Создание транзакции для неймспейса
itemsбазы данныхtestdb:curl --location --request POST 'http://127.0.0.1:9088/api/v1/db/testdb/namespaces/items/transactions/begin'При успешном создании транзакции в ответ приходит ее идентификатор (
tx_id), который необходим для выполнения запросов в рамках транзакции и ее коммита либо отката:{ "tx_id": "iKFdsSDNGRzQLUioPSsl_424347868677" }Замечание: Для всех операций с транзакциями через протокол HTTP применяется тайм-аут
http_write_timeoutдля предотвращения «зависаний» в случае несогласованности Raft-кластера. Общая длительность транзакции не ограничивается, однако накладывается ограничение вtx_idle_timeoutсекунд на периоды отсутствия активности между одиночными операциями (такими, как добавление данных в существующую транзакцию, ее коммит или откат) при превышении которого, транзакция будет считаться недействительной — просроченной. -
Выполнение запросов в рамках транзакции:
Добавление новой записи в неймспейс:
curl --location --request POST 'http://127.0.0.1:9088/api/v1/db/testdb/transactions/iKFdsSDNGRzQLUioPSsl_424347868677/items' \ --header 'Content-Type: text/plain' \ --data-raw '{ "ID": 4, "Name": "Article", "Articles": [ 1, 2 ], "Year": 2022 }'Обновление существующей записи (в данном случае — изменение значение поля
Nameи наполнение значениями массиваArticles):curl --location --request PUT 'http://127.0.0.1:9088/api/v1/db/testdb/transactions/iKFdsSDNGRzQLUioPSsl_424347868677/items' \ --header 'Content-Type: text/plain' \ --data-raw '{ "ID": 2, "Name": "New_Article", "Articles": [ 1, 2 ], "Year": 2022 }'Удаление записи из неймспейса:
curl --location --request DELETE 'http://127.0.0.1:9088/api/v1/db/testdb/transactions/iKFdsSDNGRzQLUioPSsl_424347868677/items' \ --header 'Content-Type: text/plain' \ --data-raw '{ "ID": 1 }' -
Коммит транзакции:
curl --location --request POST 'http://127.0.0.1:9088/api/v1/db/testdb/transactions/iKFdsSDNGRzQLUioPSsl_424347868677/commit'
Асинхронный режим выполнения транзакций
Асинхронный режим доступен при использовании Go, Java и .NET-коннекторов для Reindexer и может использоваться для ускорения массовой вставки записей в неймспейсы БД.
Ускорение достигается за счёт того, что клиент не ожидает ответа от сервера на каждое добавление шага в транзакцию.
Такой подход обеспечивает быстрое заполнение транзакции в случае cproto-соединения. Для Built-in-режима выигрыша по скорости не будет.
Примеры выполнения транзакции в асинхронном режиме:
// Создание нового объекта транзакции
tx, err := db.BeginTx("items");
if err != nil {
panic(err)
}
// Заполнение объекта асинхронной транзакции запросами
tx.UpsertAsync(&Item{ID: 100},func(err error) {})
tx.UpsertAsync(&Item{ID: 100},func(err error) {})
// Ожидание выполнения запросов и завершение транзакции
if err := tx.Commit(); err != nil {
panic(err)
}
В настоящий момент Python-коннектор не поддерживает асинхронный режим выполнения транзакций.
// Создание нового объекта транзакции
Transaction<Item> tx = db.beginTransaction("items", Item.class);
try {
// Заполнение объекта асинхронной транзакции запросами
tx.upsertAsync(new Item(100));
tx.upsertAsync(new Item(101))
.thenAccept(item -> {
if (item != null) {
System.out.println("Successfully processed item: " + item.getName());
}
})
.exceptionally(e -> {
System.err.println("Error in async operation: " + e.getMessage());
return null;
});
// Ожидание выполнения запросов и завершение транзакции.
tx.commit();
System.out.println("Transaction committed successfully");
} catch (Exception e) {
tx.rollback();
System.err.println("Transaction failed: " + e.getMessage());
}
Второй аргумент в UpsertAsync — функция завершения, которая вызывается после получения ответа на запрос от сервера.
Если при подготовке транзакции произошла какая-либо ошибка, tx.Commit() вернет ошибку.
Поэтому, чтобы проверить, зафиксировались ли вносимые в рамках транзакции изменения, достаточно убедиться, что tx.Commit() не вернул ошибку.
Особенности реализации транзакций в Reindexer, ограничения
- Объект транзакции не является потокобезопасным и не может использоваться из разных потоков.
- Объект транзакции потребляет ресурсы. Поэтому, чтобы избежать утечки ресурсов, приложение должно явно вызывать метод для отката транзакции (например,
tx.Rollback()для Go либо POST-запрос/db/{database}/transactions/{tx_id}/rollbackдля HTTP) или для коммита (например,tx.Commit()для Go либо POST-запрос/db/{database}/transactions/{tx_id}/commitдля HTTP). - После коммита можно безопасно вызвать метод для отката транзакции.
- Изнутри транзакций возможно вызывать произвольные запросы с помощью соответствующих методов для разных языков (например,
tx.query().Exec() ...для Go). - Все транзакции работают с уровнем изоляции
serialized. Каждая транзакции начинает модифицировать данные только в момент вызоваCommit, при этом все изменения вносятся под эксклюзивной блокировкой неймспейса. - Для Go, транзакции, создаваемые через экземпляр Rx c установленным тайм-аутом, будут иметь ограничение на общую длительность выполнения соответствующую тайм-ауту, т. е. вся транзакция от момента
BeginTxдоtx.Commit()илиtx.Rollback()должна укладываться в указанный период. - В текущей реализации транзакции возвращают только количество затронутых документов, но не их содержимое. Таким образом, например, при использовании
serial()(или других прецептов) в транзакции не выйдет получить присвоенное им значение непосредственно из результатовtx.Commit().