Транзакции

Reindexer поддерживает транзакции. При их выполнении происходит атомарное обновление неймспейса. Транзакции поддерживаются при следующих вариантах взаимодействия с СУБД:

Стратегии коммитов транзакций

В Reindexer, в зависимости от объема изменений данных в рамках транзакции, применяется одна из следующих стратегий коммитов:

  1. Блокирующее атомарное обновление.
  2. Reindexer блокирует неймспейс на время выполнения транзакции и применяет все изменения под действием общей блокировки. Этот режим используется при небольших объемах изменения данных.
  3. Копирование и атомарная замена. В этом режиме Reindexer делает снимок неймспейса, применяет все изменения к полученному снимку, и заменяет неймспейс атомарно. Параллельно с выполнением транзакции возможно чтение данных из неймспейса, другие модифицирующие операции будут ждать ее завершения.

Управлять количеством данных, необходимых для выбора стратегии коммитов, можно в конфигурации неймспейса (служебный неймспейс #config) с помощью параметров:

  • start_copy_policy_tx_size — включает копирование неймспейса для транзакции с числом шагов, превышающим заданное значение (при условии, что параметр copy_politics_multiplier также разрешает это). Значение по умолчанию — 10000;
  • copy_politics_multiplier — отключает копирование, если размер неймспейса больше, чем copy_policy_multiplier * start_copy_tx_size. Значение по умолчанию 5;
  • tx_size_to_always_copy — включает принудительное копирование неймспейсов для транзакций с количеством шагов больше указанного значения. Значение по умолчанию — 100000.

Синхронный и асинхронный режим выполнения транзакций

В Reindexer поддерживается выполнение транзакций в синхронном и асинхронном режимах.

Синхронный режим

Ниже представлены примеры выполнения транзакций в синхронном режиме при использовании разных вариантов взаимодействия с Reindexer.

В Go для начала транзакции используется метод db.BeginTx(). Он создает транзакцию как объект, который предоставляет приложению интерфейсы Update/Upsert/Insert/Delete. Для RPC-клиентов действует ограничение по количеству транзакций — каждое соединение не может иметь более 1024 открытых транзакций одновременно.

// Создание нового объекта транзакции
tx, err := db.BeginTx("items");
if err != nil {
	panic(err)
}

// Заполнение объекта транзакции
tx.Upsert(&Item{ID: 100})
tx.Upsert(&Item{ID: 101})
tx.Query().WhereInt("id", reindexer.EQ, 102).Set("Name", "Petya").Update()

// Применение транзакции (коммит)
if err := tx.Commit(); err != nil {
	panic(err)
}

При использовании транзакций в кластерном режиме, разумно создавать их через экземпляр Rx с установленным тайм-аутом — для предотвращения «зависаний» в случаях несогласованности Raft-кластера.

Примечание!: В текущей Go-реализации Rx, транзакции, создаваемые подобным способом, будут иметь ограничение на общую длительность выполнения (txTimeoutSeconds из примера ниже), т. е. вся транзакция от момента BeginTx до tx.Commit() или tx.Rollback() должна укладываться в указанный период.

// Создание нового тайм-аут контекста. Стоит учитывать, что deadline рассчитывается в момент создания контекста

txTimeoutSeconds := 10
ctx, cancel := context.WithTimeout(context.Background(), txTimeoutSeconds * time.Second)
defer cancel()

// Создание нового объекта транзакции
tx, err := db.WithContext(ctx).BeginTx("items");
if err != nil {
	panic(err)
}

// Заполнение объекта транзакции
tx.Upsert(&Item{ID: 100})
tx.Upsert(&Item{ID: 101})
tx.Query().WhereInt("id", reindexer.EQ, 102).Set("Name", "Petya").Update()

// Применение транзакции (коммит)
if err := tx.Commit(); err != nil {
	panic(err)
}

В Python для начала транзакции используется метод db.new_transaction(). Он создает транзакцию как объект, который предоставляет приложению интерфейсы Update/Upsert/Insert/Delete. Для RPC-клиентов действует ограничение по количеству транзакций — каждое соединение не может иметь более 1024 открытых транзакций одновременно.

# Создание нового объекта транзакции
tx = db.new_transaction("actors"),

# Заполнение объекта транзакции
tx.upsert({ "id": 100 })
tx.upsert({ "id": 101, "name": "Stasya" })

tx.new_query("payments")
    .where("id", CondType.CondEq, 101)
    .set("name", ["Petya"])
    .update()

# Применение транзакции (коммит)
tx.commit()

При использовании транзакций в кластерном режиме, разумно создавать их с установленным тайм-аутом — для предотвращения «зависаний» в случаях несогласованности Raft-кластера.

# Создание нового объекта транзакции
tx = db.new_transaction("actors", timedelta(milliseconds=10000))

# Заполнение объекта транзакции
tx.upsert({ "id": 100 })
tx.upsert({ "id": 101, "name": "Stasya" })

tx.new_query("payments")
    .where("id", CondType.CondEq, 101)
    .set("name", ["Petya"])
    .update()

# Применение транзакции (коммит)
tx.commit()

В Java для начала транзакции используется метод db.beginTransaction(). Он создает транзакцию как объект, который предоставляет приложению интерфейсы update/upsert/insert/delete. Для RPC-клиентов действует ограничение по количеству транзакций — каждое соединение не может иметь более 1024 открытых транзакций одновременно.

import java.util.concurrent.*;

ExecutorService executor = Executors.newSingleThreadExecutor();
final int txTimeoutSeconds = 10;

try {
    Future<Void> future = executor.submit(() -> {
        // Создание нового объекта транзакции
        Transaction<Item> tx = db.beginTransaction("items", Item.class);

        try {
            // Заполнение объекта транзакции
            tx.upsert(new Item(100));
            tx.upsert(new Item(101));
            tx.query().where("id", EQ, 102)
                .set("name", "Petya")
                .update();

            // Применение транзакции (коммит)
            tx.commit();
        } catch (Exception e) {
            tx.rollback();
            throw e;
        }
        return null;
    });

    future.get(txTimeoutSeconds, TimeUnit.SECONDS);
    System.out.println("Transaction committed successfully");

} catch (TimeoutException e) {
    System.err.println("Transaction timed out");
} catch (InterruptedException | ExecutionException e) {
    System.err.println("Transaction failed: " + e.getCause().getMessage());
} finally {
    executor.shutdown();
}

Пример работы с транзакциями при взаимодействии с БД по HTTP:

  1. Создание транзакции для неймспейса items базы данных testdb:

    curl --location --request POST 'http://127.0.0.1:9088/api/v1/db/testdb/namespaces/items/transactions/begin'
    

    При успешном создании транзакции в ответ приходит ее идентификатор (tx_id), который необходим для выполнения запросов в рамках транзакции и ее коммита либо отката:

    {
      "tx_id": "iKFdsSDNGRzQLUioPSsl_424347868677"
    }
    

    Замечание: Для всех операций с транзакциями через протокол HTTP применяется тайм-аут http_write_timeout для предотвращения «зависаний» в случае несогласованности Raft-кластера. Общая длительность транзакции не ограничивается, однако накладывается ограничение в tx_idle_timeout секунд на периоды отсутствия активности между одиночными операциями (такими, как добавление данных в существующую транзакцию, ее коммит или откат) при превышении которого, транзакция будет считаться недействительной — просроченной.

  2. Выполнение запросов в рамках транзакции:

    Добавление новой записи в неймспейс:

    curl --location --request POST 'http://127.0.0.1:9088/api/v1/db/testdb/transactions/iKFdsSDNGRzQLUioPSsl_424347868677/items' \
    --header 'Content-Type: text/plain' \
    --data-raw '{
      "ID": 4,
      "Name": "Article",
      "Articles": [
        1,
        2
      ],
      "Year": 2022
    }'
    

    Обновление существующей записи (в данном случае — изменение значение поля Name и наполнение значениями массива Articles):

    curl --location --request PUT 'http://127.0.0.1:9088/api/v1/db/testdb/transactions/iKFdsSDNGRzQLUioPSsl_424347868677/items' \
    --header 'Content-Type: text/plain' \
    --data-raw '{
      "ID": 2,
      "Name": "New_Article",
      "Articles": [
        1,
        2
      ],
      "Year": 2022
    }'
    

    Удаление записи из неймспейса:

    curl --location --request DELETE 'http://127.0.0.1:9088/api/v1/db/testdb/transactions/iKFdsSDNGRzQLUioPSsl_424347868677/items' \
    --header 'Content-Type: text/plain' \
    --data-raw '{
      "ID": 1
    }'
    
  3. Коммит транзакции:

    curl --location --request POST 'http://127.0.0.1:9088/api/v1/db/testdb/transactions/iKFdsSDNGRzQLUioPSsl_424347868677/commit'
    

Асинхронный режим выполнения транзакций

Асинхронный режим доступен при использовании Go, Java и .NET-коннекторов для Reindexer и может использоваться для ускорения массовой вставки записей в неймспейсы БД. Ускорение достигается за счёт того, что клиент не ожидает ответа от сервера на каждое добавление шага в транзакцию. Такой подход обеспечивает быстрое заполнение транзакции в случае cproto-соединения. Для Built-in-режима выигрыша по скорости не будет.

Примеры выполнения транзакции в асинхронном режиме:

// Создание нового объекта транзакции
tx, err := db.BeginTx("items");
if err != nil {
	panic(err)
}

// Заполнение объекта асинхронной транзакции запросами
tx.UpsertAsync(&Item{ID: 100},func(err error) {})
tx.UpsertAsync(&Item{ID: 100},func(err error) {})

// Ожидание выполнения запросов и завершение транзакции
if err := tx.Commit(); err != nil {
	panic(err)
}

В настоящий момент Python-коннектор не поддерживает асинхронный режим выполнения транзакций.

// Создание нового объекта транзакции
Transaction<Item> tx = db.beginTransaction("items", Item.class);

try {
    // Заполнение объекта асинхронной транзакции запросами
    tx.upsertAsync(new Item(100));
    tx.upsertAsync(new Item(101))
            .thenAccept(item -> {
                if (item != null) {
                    System.out.println("Successfully processed item: " + item.getName());
                }
            })
            .exceptionally(e -> {
                System.err.println("Error in async operation: " + e.getMessage());
                return null;
            });

    // Ожидание выполнения запросов и завершение транзакции.
    tx.commit();
    System.out.println("Transaction committed successfully");

} catch (Exception e) {
    tx.rollback();
    System.err.println("Transaction failed: " + e.getMessage());
}

Второй аргумент в UpsertAsync — функция завершения, которая вызывается после получения ответа на запрос от сервера. Если при подготовке транзакции произошла какая-либо ошибка, tx.Commit() вернет ошибку. Поэтому, чтобы проверить, зафиксировались ли вносимые в рамках транзакции изменения, достаточно убедиться, что tx.Commit() не вернул ошибку.

Особенности реализации транзакций в Reindexer, ограничения

  1. Объект транзакции не является потокобезопасным и не может использоваться из разных потоков.
  2. Объект транзакции потребляет ресурсы. Поэтому, чтобы избежать утечки ресурсов, приложение должно явно вызывать метод для отката транзакции (например, tx.Rollback() для Go либо POST-запрос /db/{database}/transactions/{tx_id}/rollback для HTTP) или для коммита (например, tx.Commit() для Go либо POST-запрос /db/{database}/transactions/{tx_id}/commit для HTTP).
  3. После коммита можно безопасно вызвать метод для отката транзакции.
  4. Изнутри транзакций возможно вызывать произвольные запросы с помощью соответствующих методов для разных языков (например, tx.query().Exec() ... для Go).
  5. Все транзакции работают с уровнем изоляции serialized. Каждая транзакции начинает модифицировать данные только в момент вызова Commit, при этом все изменения вносятся под эксклюзивной блокировкой неймспейса.
  6. Для Go, транзакции, создаваемые через экземпляр Rx c установленным тайм-аутом, будут иметь ограничение на общую длительность выполнения соответствующую тайм-ауту, т. е. вся транзакция от момента BeginTx до tx.Commit() или tx.Rollback() должна укладываться в указанный период.
  7. В текущей реализации транзакции возвращают только количество затронутых документов, но не их содержимое. Таким образом, например, при использовании serial() (или других прецептов) в транзакции не выйдет получить присвоенное им значение непосредственно из результатов tx.Commit().