Kafka Transactions: exactly-once для consume-transform-produce

Предпосылки: Consumer internals (poll loop, offset commit, at-least-once), Producer reliability (idempotent producer, PID, sequence number, границы), Гарантии доставки (exactly-once = at-least-once + idempotency), Reliability patterns (idempotency).

Idempotent producer гарантирует exactly-once на стороне записи: PID и sequence number исключают дубликаты и переупорядочивание в каждой партиции. На стороне чтения at-least-once + идемпотентный обработчик даёт exactly-once обработку для простого consumer’а вроде seller_read_db: прочитал событие, выполнил UPDATE orders SET status = 'paid' WHERE id = 42, закоммитил offset. Повтор при crash’е безвреден — UPDATE идемпотентен.

Но рядом с seller_read_db появился enrichment-сервис. Его задача — прочитать событие из order_events, обогатить данными продавца (имя, рейтинг, комиссия) и записать результат в topic enriched_orders. Downstream consumer’ы — billing (начисление комиссии) и analytics (ClickHouse) — читают уже из enriched_orders. Enrichment-сервис — consume-transform-produce: внутри одного процесса живут и consumer (для order_events), и producer (для enriched_orders).

flowchart LR
    OE["order_events<br>#42 paid<br>#77 paid"]
    EN["enrichment-сервис<br>read → обогатить → write"]
    EO["enriched_orders<br>#42 paid+seller<br>#77 paid+seller"]
    B["billing"]
    A["analytics"]

    OE --> EN --> EO
    EO --> B & A

Consume-transform-produce: разрыв атомарности

Цикл обработки enrichment-сервиса — три операции за итерацию:

while true
  records = consumer.poll(timeout: 500ms)

  for record in records
    enriched = enrich_with_seller_data(record)
    producer.send("enriched_orders", key: record.key, value: enriched)
  end

  consumer.commit_sync   # зафиксировать offset в order_events
end

Enrichment-сервис обработал 10 событий, записал 10 обогащённых записей в enriched_orders и упал до commit_sync. Billing видит все 10 событий — они записаны, committed, выше high watermark. При рестарте enrichment-сервис перечитает те же 10 событий из order_events (offset не сдвинулся) и запишет их повторно. Billing получит 20 событий вместо 10.

Для seller_read_db идемпотентность была дешёвой — один идемпотентный UPDATE. Billing начисляет комиссию продавцу: INSERT INTO commissions (order_id, seller_id, amount) VALUES (42, 7, 15.00). Чтобы сделать это идемпотентным, нужен unique constraint на order_id или проверка перед вставкой — дополнительная логика в каждом downstream consumer’е. Analytics пишет в ClickHouse через batch insert — дедупликация в аналитической БД дорогая. А если downstream — внешний API? Billing вызывает платёжную систему для начисления — внешний API может не поддерживать идемпотентность.

Enrichment-сервис не может гарантировать отсутствие дубликатов в output topic’е, и ответственность перекладывается вниз по цепочке. С одним downstream это терпимо. С пятью — каждый из пяти защищается от дубликатов по-своему. Если бы enrichment-сервис гарантировал, что каждое событие появляется в enriched_orders ровно один раз — downstream consumer’ам не нужна идемпотентность для защиты от дубликатов enrichment’а.

Для этого запись в enriched_orders и commit offset’а в order_events должны быть атомарными: либо оба произойти, либо оба не произойти. Offset commit — это запись в __consumer_offsets, а это другой topic, другая партиция. Два разных topic’а, две разных партиции, одна атомарная операция.

Атомарность в append-only логе требует решения трёх задач: скрыть незакоммиченные записи от consumer’ов (механизм видимости), идентифицировать producer через рестарты и отсекать зомби-процессы (механизм идентификации), координировать commit в несколько партиций одновременно (механизм координации).

Видимость в append-only логе

Атомарность в PostgreSQL реализована через rollback: crash до COMMIT — recovery откатывает незавершённые изменения, строки становятся невидимы через xmin/xmax и snapshot. В Kafka лог append-only — записанное нельзя удалить, перезаписать или пометить флагом. Запись иммутабельна: broker отдаёт данные через zero-copy transfer (sendfile()) прямо с диска, без чтения в пользовательское пространство. Если broker будет модифицировать записи на лету, zero-copy сломается.

Kafka решает видимость иначе: записи идут в лог как есть, без модификации. Вместо флага на каждой записи — один маркер в конце. Когда транзакция завершается, Kafka записывает в партицию специальное control messageCOMMIT или ABORT. Это обычная запись в тот же лог, просто с особым типом (кто именно координирует запись маркеров — разберём в секции о transaction coordinator’е).

enriched_orders, partition 0:

offset:  0      1      2      3      4      5
       [msg]  [msg]  [msg]  [msg]  [msg]  [COMMIT]
       │      │      │      │      │      │
       └──────┴──────┴──────┴──────┘      │
           записи транзакции T1           маркер

Consumer с дефолтными настройками (isolation.level=read_uncommitted) видит все записи, включая незакоммиченные. Consumer с isolation.level=read_committed видит данные только до определённой границы.

Broker отслеживает все незавершённые транзакции и вычисляет Last Stable Offset (LSO) — offset самой ранней незавершённой транзакции. Всё ниже LSO гарантированно либо нетранзакционное, либо уже завершено (COMMIT или ABORT).

enriched_orders, partition 0:

offset:  0    1    2    3    4    5        6    7
       [T1] [T1] [T1] [T1] [T1] [COMMIT] [T2] [T2]
                                    ^             │
                                    │       не завершена
                                    T1 завершена

LSO = 6  (T2 начинается с offset 6, маркера нет)

read_uncommitted consumer: видит 0..7
read_committed consumer:   видит 0..5 (до LSO)

Consumer с read_committed при poll() получает записи только до LSO. Broker ограничивает выдачу одним сравнением offset’а — consumer’у не нужно парсить маркеры или знать, какие записи транзакционные. Клиентская библиотека фильтрует control messages (COMMIT/ABORT) — приложение видит только data-записи.

При ABORT записи транзакции остаются в логе физически, но клиентская библиотека знает их PID (каждая запись несёт PID в метаданных) и, увидев маркер ABORT, отбрасывает записи этой транзакции. Записи от других producer’ов проходят нормально:

offset:  0    1    2    3    4    5       6
       [T1] [P2] [T1] [P2] [T1] [ABORT] [P2]

LSO сдвигается на 7 (нет открытых транзакций)

read_committed consumer получает:
  offset 1: P2 ✓
  offset 3: P2 ✓
  offset 6: P2 ✓
  offset 0, 2, 4: отфильтрованы (T1, ABORT)

Задержка видимости: trade-off открытых транзакций

В партицию пишут несколько producer’ов — транзакционных и обычных. Записи чередуются, а LSO — одно число на партицию, не фильтр по producer’у. Если транзакция T1 открыта с offset 0 и обычный producer P2 записал данные на offset 1, read_committed consumer не увидит ни offset 0 (T1, не завершена), ни offset 1 (P2, за LSO) — хотя P2 к транзакции не имеет отношения.

Альтернатива — broker фильтрует записи по PID на каждом fetch: читает данные в память, проходит каждую запись, проверяет принадлежность к открытой транзакции, собирает отфильтрованный батч. Zero-copy невозможен, CPU на каждом fetch при высоком throughput. LSO — грубый, но дешёвый механизм: одно сравнение offset’а вместо per-record фильтрации.

Trade-off реален, но он в задержке, не в потере данных. Как только транзакция завершается (COMMIT или ABORT), LSO сдвигается и все задержанные записи — включая чужие — становятся доступны. При коротких транзакциях (один poll-цикл, миллисекунды) задержка незаметна.

Если producer завис (GC-пауза, сетевой сбой) и не отправил ни COMMIT, ни ABORT — LSO стоит на месте. transaction.timeout.ms (дефолт 60 секунд) — предохранитель: если транзакция не завершена за это время, Kafka записывает ABORT, LSO сдвигается. Но 60 секунд при 150 events/sec — до 9 000 записей, невидимых для read_committed consumer’ов в этой партиции. Отсюда практическое правило: транзакции должны быть короткими — один poll-цикл: прочитал, обработал, записал, commit.

LSO и маркеры решают видимость на стороне чтения — read_committed consumer не видит незакоммиченные данные. Но чтобы producer мог открывать транзакции и при рестарте abort’нуть незавершённую или отсечь зомби-процесс, нужен механизм идентификации, который переживает рестарты.

transactional.id и epoch: идентификация через рестарты

Idempotent producer получает PID от broker’а при подключении. При рестарте — новый PID, broker не связывает старый и новый. Для транзакционного producer’а этого недостаточно: если enrichment-сервис упал посреди транзакции, при рестарте нужно знать, что предыдущая транзакция от того же логического сервиса не завершена — и abort’нуть её.

transactional.id — строка, заданная приложением (не кластером), например "enrichment-service-1". Она переживает рестарты. При подключении producer регистрируется с transactional.id, и broker назначает ему PID и epoch — монотонно растущий счётчик поколений:

Первый запуск:
  transactional.id = "enrichment-service-1"
  broker назначает PID = 7, epoch = 0

Crash, рестарт:
  transactional.id = "enrichment-service-1"  (тот же)
  broker назначает PID = 7, epoch = 1        (epoch +1)

Epoch решает проблему зомби-процессов (split-brain). Enrichment-сервис завис (GC-пауза), Kubernetes поднял новый pod. Старый процесс жив, но парализован. Два процесса с одним transactional.id:

Старый процесс: PID=7, epoch=0  (завис, но жив)
Новый процесс:  PID=7, epoch=1  (зарегистрировался)

Старый процесс очнулся и пытается записать в enriched_orders или закоммитить транзакцию. Broker видит epoch=0, текущий epoch=1 — ProducerFencedException: неретраиабельная ошибка, старый процесс должен остановиться. Его транзакция абортирована. В любой момент времени ровно один процесс владеет данным transactional.id.

Transaction coordinator: атомарный commit нескольких партиций

При commit_transaction записи могли уйти в несколько партиций enriched_orders (разные partition key) плюс __consumer_offsets. COMMIT-маркер нужен в каждой из них, и все должны появиться атомарно. Partition leader каждой партиции видит только свою часть — ни один не видит полной картины.

Transaction coordinator — broker, назначенный для конкретного transactional.id. Определяется хешем transactional.id по партициям внутреннего topic’а __transaction_state — аналогично тому, как group coordinator определяется хешем имени группы по партициям __consumer_offsets.

hash("enrichment-service-1") % 50 = 12
→ leader партиции 12 в __transaction_state = Broker 1
→ Broker 1 — transaction coordinator для этого transactional.id

Transaction coordinator хранит состояние транзакции в __transaction_state: transactional.id, PID/epoch, затронутые партиции, статус. Данные персистентны и реплицированы — __transaction_state работает с acks=all, как любой внутренний topic Kafka.

Transaction coordinator — это не partition leader data-партиций. Это разные роли на потенциально разных broker’ах:

Broker 0                    Broker 1 (Tx Coordinator)     Broker 2
─────────                   ─────────────────────────     ─────────
leader partition 0          leader __transaction_state    leader partition 2
leader __consumer_offsets

Coordinator не записывает COMMIT-маркеры сам — он отправляет запросы partition leader’ам: «запиши COMMIT-маркер в partition 0» (Broker 0), «запиши COMMIT-маркер в partition 2» (Broker 2).

Цикл commit’а — упрощённый two-phase commit (координатор один, «участники» — partition leaders — не голосуют, а записывают маркер по команде):

Enrichment-сервис         Tx Coordinator (Broker 1)       Partition leaders
      │                          │                              │
      │  begin_transaction       │                              │
      │─────────────────────────>│  статус: ONGOING             │
      │                          │                              │
      │  send("enriched_orders") │                              │
      │──────────────────────────┼─────────────────────────────>│ partition 0
      │  (coordinator узнаёт:    │                              │ partition 2
      │   T1 затрагивает p0, p2) │                              │
      │                          │                              │
      │  send_offsets_to_txn     │                              │
      │──────────────────────────┼─────────────────────────────>│ __consumer_offsets
      │                          │                              │
      │  commit_transaction      │                              │
      │─────────────────────────>│                              │
      │                          │  1. PREPARE_COMMIT           │
      │                          │     в __transaction_state    │
      │                          │     (acks=all, реплицировано)│
      │                          │                              │
      │                          │  2. COMMIT-маркер            │
      │                          │─────────────────────────────>│ partition 0
      │                          │─────────────────────────────>│ partition 2
      │                          │─────────────────────────────>│ __consumer_offsets
      │                          │                              │
      │                          │  3. COMMITTED                │
      │                          │     в __transaction_state    │

Шаг 1 — point of no return. PREPARE_COMMIT записан в __transaction_state с acks=all — подтверждён всеми ISR-репликами. Coordinator не переходит к рассылке маркеров, пока PREPARE_COMMIT не реплицирован. С этого момента решение принято: транзакция будет закоммичена, даже если coordinator упадёт. Аналогия: запись COMMIT в WAL с fsync в PostgreSQL — point of no return для транзакции. Даже если процесс упадёт до обновления data-файлов, recovery доиграет WAL.

Если coordinator упал после шага 1 (PREPARE_COMMIT записан), но до завершения шага 2 — часть партиций получила COMMIT-маркер, часть нет. Новый leader __transaction_state (выбранный через ISR, как при обычном failover) прочитает PREPARE_COMMIT и доразошлёт COMMIT-маркеры в оставшиеся партиции.

Если coordinator упал до шага 1 (PREPARE_COMMIT не записан) — новый coordinator увидит транзакцию в статусе ONGOING и abort’нет её по transaction.timeout.ms.

Полный цикл: consume-transform-produce с транзакциями

Ключевое отличие от цикла без транзакций: offset commit происходит через producer, а не через consumer. send_offsets_to_transaction записывает offset’ы в __consumer_offsets как часть той же транзакции. При commit — и записи в enriched_orders, и offset’ы становятся видимы атомарно. При abort — ни то, ни другое.

producer = create_producer(
  transactional_id: "enrichment-service-1"
  # idempotent producer включается автоматически
)

producer.init_transactions   # один раз при старте: регистрация transactional.id,
                             # получение PID/epoch, abort незавершённых транзакций

consumer = create_consumer(
  group_id: "enrichment-group",
  isolation_level: "read_committed",  # не видеть чужие незакоммиченные записи
  enable_auto_commit: false           # offset'ы коммитятся через producer
)

while true
  records = consumer.poll(timeout: 500ms)

  producer.begin_transaction

  for record in records
    enriched = enrich_with_seller_data(record)
    producer.send("enriched_orders", key: record.key, value: enriched)
  end

  # offset commit ВНУТРИ транзакции, через producer
  producer.send_offsets_to_transaction(
    consumer.current_offsets,        # {partition_0: offset_23, ...}
    group_id: "enrichment-group"
  )

  producer.commit_transaction        # атомарно: и записи, и offset'ы
end

init_transactions при старте делает три вещи: регистрирует transactional.id у transaction coordinator’а, получает PID и новый epoch (fencing предыдущего экземпляра), abort’ит любую незавершённую транзакцию предыдущего epoch’а. После этого producer готов к транзакционной работе.

Транзакция охватывает батч из одного poll(), а не каждое событие по отдельности. begin_transaction + commit_transaction — запись маркеров в лог, координация с transaction coordinator, запись в __consumer_offsets. При 150 events/sec транзакция на каждое событие — 150 транзакций в секунду, ощутимый overhead. Транзакция на батч (десятки-сотни событий из poll()) — единицы транзакций в секунду. Штраф за abort выше (переобработать весь батч), но abort — редкое событие, а overhead на commit — постоянный.

Enrichment-сервис — consumer в группе enrichment-group. Если посреди открытой транзакции произошёл rebalancing (например, добавился новый экземпляр сервиса), consumer теряет часть партиций. Offset’ы для отозванных партиций уже включены в send_offsets_to_transaction — commit таких offset’ов некорректен, потому что consumer больше не владеет этими партициями. Транзакция должна быть abort’нута до передачи партиций. На практике это реализуется через ConsumerRebalanceListener: в callback’е onPartitionsRevoked вызывается producer.abort_transaction(), а после получения новых партиций — начинается новая транзакция с чистого состояния.

Граница exactly-once

Kafka Transactions гарантируют exactly-once для операций внутри Kafka: прочитать из topic’а, записать в topic, закоммитить offset — атомарно и ровно один раз. Enrichment-сервис (Kafka → Kafka) — идеальный кандидат.

Как только обработка выходит за пределы Kafka, транзакции не контролируют side effect. Billing прочитал обогащённое событие из enriched_ordersread_committed — без дубликатов от enrichment’а) и вызвал платёжный API: «начисли 15$ продавцу #7». API ответил «ok». Billing упал до commit’а offset’а. При рестарте — повтор вызова. Exactly-once гарантия Kafka не защитила billing от дубликата во внешней системе.

Для таких случаев остаётся at-least-once + идемпотентность на стороне приложения: unique constraint в БД, idempotency key во внешнем API, upsert вместо insert. seller_read_db с идемпотентным UPDATE — пример, где транзакции Kafka не нужны: одна операция «прочитал → обработал → закоммитил offset», без записи в другой topic, и at-least-once + идемпотентный обработчик достаточен.

Exactly-once в Kafka — это exactly-once processing между topic’ами, не exactly-once delivery во внешний мир.

Sources