Message Queues: асинхронная коммуникация между сервисами
Предпосылки: гарантии доставки (at-most-once, at-least-once, exactly-once = idempotency), паттерны надёжности (retry, cascading failure).
← Гарантии доставки в распределённых системах | Выбор хранилища под паттерн доступа →
Кэширование решает проблему повторных чтений — горячие данные обслуживаются из Redis, не нагружая PostgreSQL. Но оформление заказа — не чтение данных. Это цепочка действий: зарезервировать товар на складе, списать деньги, отправить подтверждение, записать аналитику. Каждое действие — вызов внешнего сервиса. Кэш здесь не поможет: нужно выполнить работу, а не вернуть сохранённый результат.
Цена синхронной цепочки
Пользователь нажимает «Оформить заказ». Обработчик вызывает четыре сервиса последовательно:
POST /orders
→ warehouse.reserve() 50ms
→ billing.charge() 500ms
→ email.send() 2 000ms
→ analytics.record() 100ms
→ return "Заказ оформлен"
Пользователь ждёт суммарные 2 650ms. При нормальной работе — терпимо. Проблемы начинаются, когда что-то идёт не по плану.
Coupling по доступности. Каждый сервис — точка отказа. Если email-сервис недоступен, весь заказ не оформляется, хотя склад и биллинг работают. Четыре зависимости с доступностью 99.9% каждая дают общую доступность 99.9%⁴ ≈ 99.6% — в четыре раза больше простоя, чем у каждого сервиса по отдельности. Добавление пятого сервиса ухудшает ситуацию.
Потеря работы при падении. Обработчик вызвал склад (товар зарезервирован) и биллинг (деньги списаны), но упал до отправки email. Пользователь заплатил, товар заблокирован — подтверждение не отправлено. При перезапуске состояние потеряно: непонятно, какие шаги выполнены, а какие нет — восстановить процесс невозможно.
Разница в скорости. Склад отвечает за 50ms, email — за 2 секунды. Для ответа «заказ принят» достаточно проверить наличие товара и списать оплату, а отправка письма и запись аналитики могут произойти позже. Но при синхронной цепочке пользователь ждёт самый медленный компонент. В пиковые часы аналитика не справляется с потоком и начинает таймаутить — timeout срабатывает, весь заказ возвращает ошибку из-за некритичного компонента.
У всех трёх проблем одна причина: producer (обработчик заказа) и consumers (warehouse, billing, email) связаны во времени — producer ждёт, пока каждый consumer закончит работу. Оба должны быть доступны одновременно, работать на одной скорости, и сбой одного немедленно бьёт по другому.
Temporal decoupling: развязка во времени
Решение — разорвать временну́ю связь. Temporal decoupling (развязка во времени): producer и consumer работают независимо. Producer отправляет сообщение и продолжает работу, не дожидаясь consumer. Consumer обрабатывает сообщение, когда готов — через миллисекунды, минуты или часы.
Message broker (брокер сообщений) — сервис-посредник, который делает эту развязку возможной. Он принимает сообщения от producers, хранит их и доставляет consumers.
flowchart TB P["Producer<br>(обработчик заказа)"] -->|"publish<br>(заказ #42 оформлен)"| B["Message Broker"] B -->|"consume<br>(когда consumer готов)"| C["Consumer<br>(warehouse / billing / email / analytics)"]
С брокером обработчик заказа меняется:
POST /orders
→ billing.charge() 500ms ← синхронно, критичная операция
→ broker.publish("order_created", order_data) 5ms ← асинхронно, остальное
→ return "Заказ оформлен"
505ms вместо 2 650ms. Пользователь не ждёт email и аналитику. Склад, email-сервис и аналитика подпишутся на событие «заказ оформлен» и обработают его в своём темпе.
Почему брокер — отдельный сервис, а не очередь в памяти приложения? Очередь в памяти умирает вместе с процессом: перезапуск web-сервера — все накопленные сообщения потеряны. Брокер переживает падение обоих участников: producer положил сообщение и упал — сообщение в брокере; consumer упал — сообщение дождётся нового consumer. Персистенция, подтверждение доставки, повторные попытки — ответственность брокера, а не прикладного кода.
Temporal decoupling решает все три проблемы синхронной цепочки. Coupling по доступности исчезает: email-сервис может быть недоступен часами — сообщения накопятся в брокере и обработаются, когда сервис вернётся. Потеря работы компенсируется: после публикации сообщение сохранено в брокере — даже если web-сервер упал сразу после отправки, сообщение доставится consumers. Разница в скорости абсорбируется: брокер работает как буфер между быстрым producer и медленными consumers.
Acknowledgment
Producer положил сообщение и забыл — temporal decoupling на стороне отправителя работает. Но consumer должен это сообщение обработать. Что произойдёт, если consumer получил сообщение, начал резервирование товара на складе и упал?
Если брокер отдаёт сообщение и сразу удаляет его — сообщение потеряно, товар не зарезервирован. Это at-most-once: доставка без подтверждения, потеря при любом сбое.
Acknowledgment (ACK) решает проблему: consumer явно сообщает брокеру «обработка завершена», и брокер удаляет сообщение только после ACK. Пока ACK не пришёл — сообщение остаётся в очереди.
Broker: [reserve_stock, send_email, track_analytics]
↓
Consumer (warehouse)
↓ товар зарезервирован
↓ crash до отправки ACK
Broker: [reserve_stock, send_email, track_analytics] ← msg не удалён
↓ повторная доставка
Consumer₂ (warehouse)
↓ пытается зарезервировать тот же товар повторно
Consumer упал до ACK — брокер отдаёт сообщение другому consumer. Сообщение обработается как минимум один раз, но, возможно, не один — warehouse резервирует тот же товар повторно. Warehouse-сервис защищается от дубликатов через idempotency: проверяет «заказ #42 уже зарезервирован?» и не резервирует повторно.
Point-to-Point и Pub/Sub
Один consumer для обработки заказов на складе — хватает при 50 заказах в минуту. В пиковые часы поступает 200 заказов в минуту, consumer не успевает. Нужен второй, третий, пятый. Но каждый заказ должен быть зарезервирован ровно один раз — если пять consumers получат одно и то же сообщение, товар зарезервируется пять раз.
Point-to-point (очередь задач) решает это: несколько consumers конкурируют за сообщения из одной очереди, но каждое конкретное сообщение достаётся только одному.
flowchart LR P["Producer"] --> Q["Queue"] Q --> C1["Consumer₁<br>(заказ #42)"] Q --> C2["Consumer₂<br>(заказ #43)"] Q --> C3["Consumer₃<br>(заказ #44)"]
Пять consumers обрабатывают заказы параллельно, каждый заказ — ровно одним consumer. Так работает Sidekiq: воркеры конкурируют за задачи из очереди.
Теперь другая сторона задачи. Событие «заказ #42 оформлен» интересует не только склад. Его должны получить четыре независимые системы: склад (зарезервировать), email (отправить подтверждение), аналитика (записать событие), программа лояльности (начислить баллы). Каждая система должна получить каждое событие — это не конкуренция за сообщения, а широковещание.
Pub/Sub (publish/subscribe): одно сообщение доставляется всем подписчикам.
flowchart LR P["Publisher"] --> T["Topic:<br>order_created"] T --> S1["Subscriber (warehouse)"] T --> S2["Subscriber (email)"] T --> S3["Subscriber (analytics)"] T --> S4["Subscriber (loyalty)"]
На практике оба паттерна комбинируются через consumer group. Consumers внутри одной группы конкурируют за сообщения (point-to-point: каждый заказ резервирует один worker). Разные группы на одном топике получают все сообщения независимо (pub/sub: и склад, и email видят каждый заказ). Группа «warehouse» из 5 workers плюс группа «email» из 2 workers — обе получают каждое событие заказа. Между группами — широковещание (каждая группа видит все события), внутри группы — конкуренция (каждое событие обрабатывает один consumer). Consumer groups реализованы в Kafka и Redis Streams.
Две модели: очередь сообщений и лог
Прошёл месяц. Появилась команда data science, которая хочет построить модель прогнозирования спроса на основе всех заказов за последние 30 дней. Нужно перечитать историю заказов из брокера — но сообщения удалялись после ACK. История потеряна.
Это различие определяет две модели хранения сообщений.
Очередь сообщений (RabbitMQ, AWS SQS, Sidekiq+Redis) удаляет сообщение после подтверждения. Очередь — буфер для необработанных задач. Сообщение — это задача: выполнить и забыть.
Producer → [msg3, msg2, msg1] → Consumer
ACK(msg1)
[msg3, msg2] ← msg1 удалён навсегда
Лог-ориентированный брокер (Kafka, Redis Streams) записывает сообщения в append-only лог и не удаляет их после обработки. Consumer отслеживает свою позицию (offset) в логе. Разные consumers читают один и тот же лог с разных позиций. Сообщение — это факт (событие), которое произошло. Факт не исчезает после того, как кто-то о нём узнал — он хранится до истечения настроенного срока хранения (retention period), обычно дни или недели, после чего старые сегменты удаляются.
[order_1, order_2, order_3, order_4, order_5, ...] ← лог растёт
↑ ↑
warehouse group data science
(offset=5, догнал) (offset=1, читает историю)
Ключевое свойство лога — replay: возможность перечитать поток событий с любой точки. Команда data science подключает consumer, ставит offset на начало лога и проходит все 30 дней заказов. С очередью сообщений эти сообщения давно удалены — перечитать невозможно.
Выбор между моделями определяется тем, нужна ли история. Отправка email после регистрации — задача, которую нужно выполнить и забыть: очередь сообщений. Поток бизнес-событий, к которым могут подключаться новые consumers (аналитика, модели прогнозирования, аудит) — лог. На практике сервис часто использует оба: Sidekiq для фоновых задач (отправить email, обработать изображение), Kafka для потока бизнес-событий (заказ создан, оплата прошла, товар отгружен).
Ordering и partitions
Consumer groups позволили масштабировать warehouse до пяти workers. Но параллельная обработка порождает проблему порядка.
Пользователь сменил адрес доставки для заказа #42 на «ул. Пушкина, 10», а через секунду — на «ул. Лермонтова, 5». Два события попали в очередь. Consumer A забрал первое, обрабатывает 500ms. Consumer B забрал второе, обработал за 50ms. Результат: адрес доставки — «ул. Пушкина, 10» вместо «ул. Лермонтова, 5», потому что Consumer A завершил позже.
Даже если брокер отдаёт сообщения в порядке поступления, параллельная обработка несколькими consumers ломает порядок завершения. Для независимых операций (каждый email отправляется сам по себе) это неважно. Для операций над одной сущностью — порядок критичен.
Partition (партиция) решает задачу. Топик физически разбивается на несколько независимых сегментов, каждый — отдельный упорядоченный лог. Сообщение попадает в конкретную партицию через хеширование ключа:
partition = hash(partition_key) % количество_партиций
Если partition key = order_id, все события одного заказа попадают в одну партицию. Одну партицию читает один consumer. «Адрес → Пушкина» и «адрес → Лермонтова» для одного order_id обработаются строго последовательно одним consumer — порядок гарантирован.
Topic: order-events (3 партиции)
Partition 0: [order_42: addr→Пушкина, order_42: addr→Лермонтова] → Consumer A
Partition 1: [order_43: status→shipped] → Consumer B
Partition 2: [order_44: created] → Consumer C
Аналогия с шардингом прямая: партиция — шард, partition key — shard key, хеш-функция распределяет данные по узлам.
Количество партиций определяет потолок параллелизма. Одну партицию читает один consumer из группы. При 4 партициях и 6 consumers — 4 работают, 2 простаивают. Типичная практика: создавать партиции с запасом (8–16 при текущих 2–3 consumers). Когда нагрузка вырастет — добавятся consumers, и они перераспределят партиции автоматически (rebalancing). Добавить партиции в существующий топик сложнее — ломается распределение ключей, часть событий одного заказа может оказаться в новой партиции.
Партиция гарантирует ordering только для того, что уже записано. Порядок записи — ответственность producer. При асинхронной отправке батчами первый batch может уйти в retry, а второй успешно запишется — ordering нарушен на стороне producer. Ordering — end-to-end свойство, его нужно обеспечивать на каждом звене: от producer через брокер до consumer.
Backpressure
Temporal decoupling абсорбирует разницу в скорости — в этом суть буфера. Но буфер конечен. В «Чёрную пятницу» producer (web-сервер) кладёт 10 000 заказов в минуту, consumers обрабатывают 2 000. Очередь растёт на 8 000 сообщений в минуту — через час в ней 480 000 необработанных заказов. Память брокера не бесконечна: в какой-то момент у брокера кончится память, latency обработки вырастет до часов, или сообщения начнут теряться.
Backpressure (обратное давление) — механизм, который сообщает producer: «притормози, consumer не справляется».
Простейший ответ — drop: когда очередь полна, новые сообщения отклоняются. Producer получает ошибку и решает сам: retry, сохранить локально, потерять. Redis при достижении maxmemory с дефолтной политикой noeviction возвращает ошибку на команды записи — producer (Rails-приложение) получит исключение. Подходит, когда потеря допустима — метрики, логи, необязательные уведомления. Но для заказов потеря неприемлема.
Альтернатива — block: producer блокируется на операции send(), пока в очереди не появится место, автоматически замедляясь до скорости consumer. Ничего не теряется, однако если producer — web-сервер, заблокированный send() означает зависший HTTP-запрос: треды копятся, начинается cascading failure.
Kafka решает задачу иначе — overflow to disk: данные пишутся на диск, page cache операционной системы обслуживает горячие сегменты. Throughput падает при чтении холодных данных, но ничего не теряется и ничего не блокируется.
На практике backpressure чаще всего сигнализирует о необходимости масштабирования consumers: добавить workers, увеличить количество партиций, оптимизировать обработку.
Dead Letter Queue
At-least-once гарантирует: если обработка упала, сообщение вернётся в очередь. Но что если сообщение невозможно обработать в принципе?
# Consumer получает сообщение о заказе
{ "order_id": 42, "amount": "not-a-number" }
# Парсинг падает
Integer("not-a-number") # ArgumentErrorСообщение возвращается в очередь. Другой consumer забирает — та же ошибка. Бесконечный цикл. Такое сообщение — poison message: оно блокирует обработку остальных (при последовательной обработке) или бессмысленно расходует ресурсы (при параллельной).
Dead Letter Queue (DLQ) — отдельная очередь для необработанных сообщений. После N неудачных попыток брокер перемещает сообщение из основной очереди в DLQ.
Основная очередь: [order_44, order_43]
↓
Consumer
↓ 5 failures на order_42
DLQ: [order_42] ← poison message изолирован, основная очередь движется
DLQ — диагностический инструмент, а не автоматическое восстановление. Инженер смотрит содержимое DLQ: баг в consumer? Невалидные данные от producer? После исправления причины сообщения можно переложить обратно в основную очередь (replay из DLQ) или удалить.
Перед DLQ обычно стоит цикл retry с exponential backoff. Sidekiq делает до 25 попыток за 21 день: первая через 16 секунд, далее по нарастающей. Длинный цикл оправдан для transient failures — внешний сервис вернётся через час, денег на счёте не хватило сегодня, но хватит завтра, команда задеплоит фикс за пару дней. Против poison message с невалидными данными retry бессилен: Integer("not-a-number") будет падать и через 21 день. DLQ — конечная точка для таких сообщений.
Мониторинг
Система работает: warehouse group обрабатывает заказы, email group рассылает подтверждения, DLQ ловит poison messages. Как узнать, что что-то идёт не так, до того как покупатели начнут жаловаться на задержку доставки?
Очередь — буфер. Здоровый буфер почти пуст: сообщения приходят и быстро уходят. Проблема проявляется, когда буфер растёт.
Первый сигнал — queue depth, количество сообщений, ожидающих обработки. Растущий тренд означает, что consumers не справляются. Это leading indicator: показывает проблему до того, как пользователи заметят задержки в доставке email или обновлении склада. Для log-based систем (Kafka, Streams) эквивалент — consumer lag: разница между последним записанным offset и текущей позицией consumer. Lag в 1 000 000 сообщений при скорости consumer 1 000 msg/sec — отставание ~17 минут. Если lag растёт — consumer не успевает за producer.
Queue depth показывает, что consumers отстают, но не почему. Причину раскрывает processing time — время обработки одного сообщения. Рост указывает на деградацию зависимостей (медленный внешний API, тяжёлые запросы к БД) или усложнение самих сообщений. Если processing time стабилен, а queue depth растёт — consumers не виноваты, producer просто генерирует больше нагрузки.
Отдельная диагностика — error rate, доля сообщений, уходящих в retry или DLQ. Рост error rate при стабильном queue depth означает, что появились poison messages или деградировал внешний сервис. Рост error rate вместе с queue depth — consumers захлёбываются: ошибки порождают retry, retry увеличивают очередь.
Характерный паттерн здоровой системы: queue depth вырос с 0 до 10 000 за 10 минут, затем стабилизировался и начал падать, error rate не изменился. Очередь абсорбировала пик нагрузки, consumers постепенно разгребают накопившееся — temporal decoupling работает как задумано.
Когда очереди не нужны
Queue depth, consumer lag, error rate — каждая метрика требует алертов и реакции. Брокер — дополнительный компонент: деплой, мониторинг, бэкапы, масштабирование. Код усложняется: вместо синхронного вызова — producer, consumer, idempotency, DLQ, мониторинг лага. Добавлять эту сложность стоит только при конкретной проблеме.
Запись в локальную БД занимает 5ms и почти никогда не падает — очередь ради этого добавляет overhead без пользы. Пользователь нажал «купить» и ему нужен ответ «оплата прошла» или «ошибка» прямо сейчас — критический путь должен быть синхронным, асинхронность уместна для побочных эффектов (email, аналитика). При 10 запросах в минуту и одном сервере нет ни пиковых нагрузок, ни проблемы со скоростью зависимостей. Если потеря задачи при падении допустима — можно обойтись in-memory очередью в том же процессе.
Практическое правило: начинай с синхронного вызова. Добавляй очередь, когда появляется конкретная боль — ненадёжный внешний сервис, пиковые нагрузки, долгие операции, блокирующие HTTP-ответ, или потребность в развязке между командами (одна команда пишет producer, другая — consumer, каждая деплоит независимо).
См. также
- Sidekiq: архитектура — три роли, данные в Redis, устройство серверного процесса
- Sidekiq: retry и обработка ошибок — формула retry, sorted sets, Dead Letter Queue
- Очереди на Redis — реализация через LIST, LMOVE, Streams
- Redis Streams — log-based структура с consumer groups
- Очереди в PostgreSQL —
FOR UPDATE SKIP LOCKEDкак альтернатива внешнему брокеру - Kafka: consumer internals — poll loop, offset commit, rebalancing
- Бронирование отелей — async-обработка и state machine заказа
Sources
- Kleppmann, 2017, Designing Data-Intensive Applications, Chapter 11 — stream processing, message brokers, exactly-once semantics
- Kreps, 2013, The Log: What every software engineer should know about real-time data’s unifying abstraction — лог как основа messaging
- Narkhede, Shapira, Palino, 2017, Kafka: The Definitive Guide — Kafka architecture, partitions, consumer groups
- Hohpe, Woolf, 2003, Enterprise Integration Patterns — messaging patterns, temporal decoupling, channel types
← Гарантии доставки в распределённых системах | Выбор хранилища под паттерн доступа →