Stream
Предпосылки: List. Полезно знать Apache Kafka на уровне понятий topic, partition, consumer group.
← Sorted Set | HyperLogLog →
Проблема надёжности
LIST как очередь работает, но имеет фундаментальное ограничение: после BRPOP элемент исчез из Redis. Если обработчик получил сообщение, но упал до завершения обработки — сообщение потеряно. Это at-most-once доставка (сообщение доставляется не более одного раза — либо один раз, либо ноль): без подтверждения обработки гарантии нет. Нет способа перечитать, нет способа узнать, что именно было потеряно. Для фоновых задач с retry-логикой на уровне приложения это допустимо. Для потока платежей или аудит-лога — нет.
Streams (Redis 5.0+) решают эту проблему: сообщения не удаляются при чтении, каждый получатель отслеживает свою позицию, а необработанные сообщения можно перечитать и переназначить.
Append-only лог с ID
Stream — append-only лог (журнал только для добавления): сообщения только добавляются в конец, существующие записи не изменяются и не удаляются. Каждое получает автоматически сгенерированный ID вида <millisecondsTime>-<sequenceNumber> (например, 1700000000000-0). ID монотонно возрастают — по ним можно читать поток с любой позиции. Каждое сообщение — набор полей (field-value пар), аналогичный Hash:
XADD mystream * sensor_id 1234 temperature 23.5
-- * = автоматический ID → "1700000000000-0"
XADD mystream * sensor_id 1234 temperature 24.1
-- → "1700000000000-1" (та же миллисекунда, sequence +1)
XLEN mystream -- → 2Сообщения сохраняются в потоке и доступны любому количеству читателей. Но как каждый читатель отслеживает, где он остановился?
Чтение: XRANGE, XREAD, блокировка
XRANGE читает сообщения по диапазону ID — для исторических данных и повторного чтения:
XRANGE mystream - + -- все сообщения (от начала до конца)
XRANGE mystream 1700000000000-0 + COUNT 10 -- 10 сообщений начиная с IDXREAD читает новые сообщения, начиная с указанной позиции — для «хвоста» потока:
XREAD COUNT 10 STREAMS mystream 0 -- все сообщения с начала
XREAD COUNT 10 STREAMS mystream $ -- только новые (после текущего момента)
XREAD COUNT 10 BLOCK 5000 STREAMS mystream $ -- блокировать до 5 секундXREAD BLOCK по поведению похож на BRPOP: клиент ждёт, пока не появится новое сообщение. Но в отличие от BRPOP, сообщение не удаляется — его увидят все вызовы XREAD.
XREAD доставляет каждое сообщение каждому читателю. Для распределения работы между несколькими обработчиками нужен другой механизм.
Consumer groups: распределённая обработка
Consumer group (группа потребителей) позволяет нескольким обработчикам читать один поток параллельно: каждое сообщение доставляется ровно одному обработчику в группе.
XGROUP CREATE mystream processors 0 -- создать группу, начать с ID 0 (все сообщения)
XREADGROUP GROUP processors worker-1 COUNT 10 BLOCK 5000 STREAMS mystream >
-- > означает "только непрочитанные этой группой"XREADGROUP забирает сообщения из потока и помечает их как доставленные конкретному обработчику. Группа ведёт указатель last-delivered-id (ID последнего доставленного сообщения): каждый вызов XREADGROUP с > получает сообщения после этого указателя. Два воркера, читающих из одной группы, никогда не получат одно и то же сообщение.
Но доставленное — не значит обработанное. Если worker-1 получил сообщение и упал, группа должна знать, что сообщение не подтверждено.
Pending и XACK
Каждое сообщение, доставленное через XREADGROUP, попадает в pending entries list (PEL) — список сообщений, которые доставлены, но не подтверждены. PEL — ключевая структура, обеспечивающая надёжность: она отслеживает, кому и когда доставлено каждое сообщение.
XPENDING mystream processors -- сводка: сколько pending, у кого, диапазон ID
XPENDING mystream processors - + 10 -- детали: ID, consumer, idle time, delivery count
XACK mystream processors 1700000000000-0 -- подтвердить обработку
-- сообщение удаляется из PELXACK — единственный способ убрать сообщение из PEL. Пока XACK не вызван, сообщение числится как необработанное. Жизненный цикл сообщения в группе: XADD добавляет в поток → XREADGROUP доставляет воркеру и помещает в PEL → воркер обрабатывает → XACK подтверждает и удаляет из PEL.
Восстановление после сбоя: XCLAIM и XAUTOCLAIM
Если воркер умер, его pending-сообщения нужно переназначить. XCLAIM позволяет другому обработчику забрать сообщение, которое слишком долго висит без подтверждения:
XCLAIM mystream processors worker-2 60000 1700000000000-0
-- забрать сообщение, висящее в pending > 60 секунд, и отдать worker-2Параметр 60000 — минимальный idle time (время простоя) в миллисекундах. Если сообщение было доставлено менее 60 секунд назад, XCLAIM не сработает — возможно, оригинальный воркер ещё обрабатывает его.
Redis 6.2 добавил XAUTOCLAIM, который автоматически находит и перенаправляет зависшие сообщения без необходимости предварительно вызывать XPENDING:
XAUTOCLAIM mystream processors worker-2 60000 0
-- найти все pending старше 60 секунд, начиная с ID 0, и отдать worker-2
-- возвращает курсор для продолжения + забранные сообщенияКаждое сообщение имеет счётчик доставок (delivery count), который увеличивается при каждом XCLAIM/XAUTOCLAIM. Если сообщение перезахватывалось 5 раз и ни разу не было подтверждено — вероятно, оно вызывает ошибку при обработке, и его стоит перенести в dead letter queue (отдельный поток для сообщений, которые не удалось обработать после N попыток).
Сценарий: сбой воркера
Проследим путь одного сообщения через все механизмы. Продюсер добавляет показание датчика: XADD mystream * sensor_id 1234 temperature 23.5 — Redis присваивает ID 1700000000000-5. Сообщение теперь в потоке.
worker-1 вызывает XREADGROUP GROUP processors worker-1 STREAMS mystream > и получает сообщение. Redis перемещает указатель группы last-delivered-id на 1700000000000-5 и добавляет запись в PEL: сообщение 1700000000000-5 доставлено worker-1, idle time = 0, delivery count = 1.
worker-1 начинает обработку, но процесс падает до вызова XACK. Сообщение остаётся в PEL — Redis не знает, обработано оно или нет.
Проходит 60 секунд. worker-2 вызывает XAUTOCLAIM mystream processors worker-2 60000 0. Redis находит сообщение 1700000000000-5 с idle time > 60000 мс, переназначает его worker-2 и увеличивает delivery count до 2. worker-2 получает сообщение, обрабатывает его и вызывает XACK mystream processors 1700000000000-5. Redis удаляет запись из PEL — сообщение обработано.
Если бы worker-2 тоже упал, цикл повторился бы. После пятой неудачной доставки (delivery count = 5) разумно переместить сообщение в dead letter queue и разобраться вручную.
Управление памятью: XTRIM
Append-only лог растёт бесконечно. Без обрезки поток из тысячи сообщений в секунду за сутки превращается в 86 миллионов записей. XTRIM ограничивает размер:
XTRIM mystream MAXLEN 10000 -- оставить не более 10 000 сообщений
XTRIM mystream MAXLEN ~ 10000 -- приблизительно 10 000 (эффективнее)
XTRIM mystream MINID 1700000000000-0 -- удалить всё старше указанного ID~ (приблизительная обрезка) позволяет Redis удалять записи целыми узлами radix tree, не проходя поэлементно. Результат может быть чуть больше указанного лимита, но операция значительно быстрее.
Для удаления конкретного сообщения по ID — XDEL mystream 1700000000000-0. В отличие от XTRIM, удаляющего старые сообщения массово, XDEL убирает одно сообщение независимо от его позиции.
XADD поддерживает встроенную обрезку: XADD mystream MAXLEN ~ 10000 * field value — добавить сообщение и обрезать хвост за одну команду.
Для удаления consumer group, которая больше не нужна: XGROUP DESTROY mystream processors. Это удаляет группу и её PEL, но не сообщения из потока.
Stream, LIST, Pub/Sub и Kafka
LIST доставляет сообщение одному получателю, и после BRPOP оно исчезает. Повторное чтение невозможно. Это подходит для простых очередей задач, где retry обеспечивается на уровне приложения.
Sub доставляет сообщение всем подписчикам, но не хранит его. Если подписчик отсутствовал в момент публикации — сообщение потеряно. Pub/Sub — для real-time broadcast, где допустима потеря.
Stream объединяет оба паттерна: XREAD доставляет каждому (как Pub/Sub, но с хранением), XREADGROUP доставляет одному в группе (как LIST, но с подтверждением). Сообщения сохраняются и доступны для повторного чтения по ID.
Kafka решает ту же задачу, но на другом масштабе. Kafka шардирует топик на партиции, хранит данные на диске с retention на дни и недели, рассчитан на терабайты данных. Stream хранит поток в памяти одного узла и даёт микросекундную латентность. Для тяжёлых систем обработки событий Kafka остаётся правильным выбором. Stream подходит, когда уже есть Redis и нужна надёжная очередь без дополнительной инфраструктуры.
Внутри: radix tree из listpack’ов
ID потока делят префикс миллисекунд, и внутренняя структура Redis использует это. Stream хранится как radix tree (src/rax.c) — дерево, в котором узлы хранят общие префиксы ключей. Ключи — ID сообщений, и поскольку ID в пределах одной секунды отличаются только в последних цифрах, radix tree компактно сжимает общие префиксы. Значения в узлах — listpack’и (компактные последовательные массивы, заменившие ziplist в Redis 7), содержащие поля сообщений. Несколько последовательных сообщений с одинаковыми именами полей упаковываются в один listpack, и имена полей хранятся только один раз в заголовке — дальнейшая экономия памяти.
См. также
- Аудит-лог платежей на Stream в Rails — XADD, XREADGROUP, XACK, consumer groups
Sources
- Redis Documentation: Streams. https://redis.io/docs/data-types/streams/
- Redis Documentation: Streams tutorial. https://redis.io/docs/data-types/streams-tutorial/
- Redis source:
src/t_stream.c,src/rax.c
← Sorted Set | HyperLogLog →