Очереди
Предпосылки: List, Sorted Set, Stream, атомарность одной команды.
После оформления заказа нужно отправить email, обработать изображение, обновить поисковый индекс. Эти задачи не должны выполняться в процессе, обрабатывающем HTTP-запрос, — они тяжёлые и некритичны по времени. Для этого нужна очередь: продюсер добавляет задачу, обработчик (отдельный процесс) забирает и выполняет. Redis предоставляет несколько структур с блокирующими операциями и атомарными перемещениями, на которых строятся очереди разной надёжности.
Простая очередь на LIST
LPUSH добавляет задачу в начало списка, BRPOP атомарно забирает задачу с конца (FIFO). BRPOP блокирует соединение, если список пуст, и возвращает элемент, как только он появится. Клиент не тратит CPU на polling.
-- продюсер:
LPUSH queue:emails '{"to":"user@example.com","subject":"Welcome"}'
-- обработчик (блокирующий):
BRPOP queue:emails 0
-- → ["queue:emails", '{"to":"user@example.com","subject":"Welcome"}']
-- 0 = ждать бесконечноНесколько обработчиков могут вызвать BRPOP на одной очереди — Redis отдаст каждое сообщение ровно одному из них.
Проблема: после BRPOP элемент удалён из Redis. Если обработчик упал до завершения работы — сообщение потеряно. Email не отправлен, пользователь не получил подтверждение заказа.
Очередь с гарантией обработки (reliable queue)
BRPOP удаляет элемент в момент чтения — между «забрал» и «обработал» нет страховки. Вместо этого используется двухсписковая схема. LMOVE source destination RIGHT LEFT атомарно забирает элемент из конца основной очереди и помещает в начало processing-списка. Элемент не исчезает — он перемещается.
-- обработчик:
LMOVE queue:emails queue:emails:processing RIGHT LEFT
-- → забрал из queue:emails, поместил в queue:emails:processing
-- после успешной обработки:
LREM queue:emails:processing 1 '<сообщение>'
-- удалить из processing-спискаЕсли обработчик упал — сообщение остаётся в processing-списке. У Redis нет встроенного механизма для возврата таких сообщений, поэтому нужен отдельный процесс в коде приложения (monitor). Он периодически читает processing-список, проверяет timestamp начала обработки внутри payload каждого элемента, и возвращает в основную очередь те, что висят дольше заданного таймаута.
Схема с LREM работает надёжно, если элементы в очереди уникальны. Если одинаковый payload встречается несколько раз, LREM 1 '<сообщение>' может удалить не тот элемент. На практике в очередь кладут уникальный ID задачи, а payload хранят отдельно (например, HSET jobs <id> <json>).
До Redis 6.2 использовался RPOPLPUSH (делает то же самое, но с фиксированным направлением). Блокирующий вариант — BLMOVE source destination RIGHT LEFT timeout (Redis 6.2+).
Отложенные задачи (delayed queue)
Очередь с гарантией обработки решает проблему потери сообщений, но предполагает немедленную обработку. Некоторые задачи нужно выполнить позже: напоминание через час, retry после сбоя с экспоненциальной задержкой, отложенное уведомление. LIST не умеет «придержать» элемент до нужного момента — BRPOP отдаёт первый доступный.
ZSET решает задачу: score — это Unix timestamp момента выполнения. Задачи со score в будущем просто лежат, обработчик выбирает только те, чьё время наступило.
-- добавить задачу с выполнением через 5 минут:
ZADD queue:delayed 1700000300 '{"task":"send_reminder","user":123}'
-- обработчик периодически проверяет:
ZRANGEBYSCORE queue:delayed 0 <текущий_timestamp> LIMIT 0 10
-- → задачи, время которых наступило
-- после получения задачи — удалить и обработать:
ZREM queue:delayed '<сообщение>'ZRANGEBYSCORE + ZREM — две команды. Если два обработчика одновременно прочитают одну задачу через ZRANGEBYSCORE, оба попытаются её обработать. Для атомарности «выбрать и удалить» оборачивают в Lua-скрипт:
EVAL "
local items = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'LIMIT', 0, 1)
if #items > 0 then
redis.call('ZREM', KEYS[1], items[1])
return items[1]
end
return nil
" 1 queue:delayed 1700000060Очереди с приоритетом
Отложенные задачи упорядочены по времени, но иногда задачи одного момента имеют разный приоритет — платёж важнее аналитического события. Два подхода: BRPOP нескольких LIST (Redis проверяет ключи слева направо и забирает из первого непустого — простейшие приоритеты без дополнительных структур) и ZSET с приоритетом как score (гибче — приоритетов может быть сколько угодно, но требует Lua для атомарного «выбрать и удалить»).
Streams как очередь
Все предыдущие подходы требуют ручной реализации подтверждения и обработки сбоев: processing-список, monitor, Lua-скрипты. Streams решают эти задачи на уровне Redis. Consumer group распределяет сообщения между обработчиками, каждый обработчик подтверждает получение через XACK. Redis автоматически отслеживает необработанные сообщения (pending entries) — не нужен отдельный monitor. Если обработчик завис, XCLAIM или XAUTOCLAIM (Redis 6.2+) позволяют другому обработчику забрать зависшее сообщение. В отличие от LIST, Streams хранят историю: одно и то же сообщение можно перечитать, а новый consumer group может начать обработку с произвольного момента.
Выбор реализации
| Требование | Реализация |
|---|---|
| Простая FIFO-очередь, потеря допустима | LIST + BRPOP |
| FIFO с гарантией обработки | LIST + BLMOVE (reliable queue) |
| Отложенные задачи | ZSET + Lua |
| Приоритеты | BRPOP нескольких LIST или ZSET |
| Consumer groups, подтверждение, повторное чтение | Stream |
Для критичных задач (платежи, заказы) Redis-очередь — не замена message broker (RabbitMQ, Kafka). При падении Redis без персистентности задачи в очереди теряются — платёж не обработан, заказ завис. Streams приближаются к гарантиям message broker (at-least-once — сообщение доставляется хотя бы один раз, возможны дубликаты — через XACK), но в пределах RAM и одного кластера.
См. также
- Очередь email на LIST в Rails — LPUSH/BRPOP
- Sidekiq — фреймворк фоновых задач: BasicFetch (BRPOP) vs SuperFetch (LMOVE) как конкретная инстанциация простой и reliable queue
Sources
- Redis Documentation: Patterns — Reliable queue. https://redis.io/commands/rpoplpush/#pattern-reliable-queue
- Redis Documentation: Streams. https://redis.io/docs/data-types/streams/