Message Queues: асинхронная коммуникация между сервисами

Предпосылки: гарантии доставки (at-most-once, at-least-once, exactly-once = idempotency), паттерны надёжности (retry, cascading failure).

Гарантии доставки в распределённых системах | Выбор хранилища под паттерн доступа

Кэширование решает проблему повторных чтений — горячие данные обслуживаются из Redis, не нагружая PostgreSQL. Но оформление заказа — не чтение данных. Это цепочка действий: зарезервировать товар на складе, списать деньги, отправить подтверждение, записать аналитику. Каждое действие — вызов внешнего сервиса. Кэш здесь не поможет: нужно выполнить работу, а не вернуть сохранённый результат.

Цена синхронной цепочки

Пользователь нажимает «Оформить заказ». Обработчик вызывает четыре сервиса последовательно:

POST /orders
  → warehouse.reserve()    50ms
  → billing.charge()      500ms
  → email.send()        2 000ms
  → analytics.record()    100ms
  → return "Заказ оформлен"

Пользователь ждёт суммарные 2 650ms. При нормальной работе — терпимо. Проблемы начинаются, когда что-то идёт не по плану.

Coupling по доступности. Каждый сервис — точка отказа. Если email-сервис недоступен, весь заказ не оформляется, хотя склад и биллинг работают. Четыре зависимости с доступностью 99.9% каждая дают общую доступность 99.9%⁴ ≈ 99.6% — в четыре раза больше простоя, чем у каждого сервиса по отдельности. Добавление пятого сервиса ухудшает ситуацию.

Потеря работы при падении. Обработчик вызвал склад (товар зарезервирован) и биллинг (деньги списаны), но упал до отправки email. Пользователь заплатил, товар заблокирован — подтверждение не отправлено. При перезапуске состояние потеряно: непонятно, какие шаги выполнены, а какие нет — восстановить процесс невозможно.

Разница в скорости. Склад отвечает за 50ms, email — за 2 секунды. Для ответа «заказ принят» достаточно проверить наличие товара и списать оплату, а отправка письма и запись аналитики могут произойти позже. Но при синхронной цепочке пользователь ждёт самый медленный компонент. В пиковые часы аналитика не справляется с потоком и начинает таймаутить — timeout срабатывает, весь заказ возвращает ошибку из-за некритичного компонента.

У всех трёх проблем одна причина: producer (обработчик заказа) и consumers (warehouse, billing, email) связаны во времени — producer ждёт, пока каждый consumer закончит работу. Оба должны быть доступны одновременно, работать на одной скорости, и сбой одного немедленно бьёт по другому.

Temporal decoupling: развязка во времени

Решение — разорвать временну́ю связь. Temporal decoupling (развязка во времени): producer и consumer работают независимо. Producer отправляет сообщение и продолжает работу, не дожидаясь consumer. Consumer обрабатывает сообщение, когда готов — через миллисекунды, минуты или часы.

Message broker (брокер сообщений) — сервис-посредник, который делает эту развязку возможной. Он принимает сообщения от producers, хранит их и доставляет consumers.

flowchart TB
    P["Producer<br>(обработчик заказа)"] -->|"publish<br>(заказ #42 оформлен)"| B["Message Broker"]
    B -->|"consume<br>(когда consumer готов)"| C["Consumer<br>(warehouse / billing / email / analytics)"]

С брокером обработчик заказа меняется:

POST /orders
  → billing.charge()                                 500ms  ← синхронно, критичная операция
  → broker.publish("order_created", order_data)        5ms  ← асинхронно, остальное
  → return "Заказ оформлен"

505ms вместо 2 650ms. Пользователь не ждёт email и аналитику. Склад, email-сервис и аналитика подпишутся на событие «заказ оформлен» и обработают его в своём темпе.

Почему брокер — отдельный сервис, а не очередь в памяти приложения? Очередь в памяти умирает вместе с процессом: перезапуск web-сервера — все накопленные сообщения потеряны. Брокер переживает падение обоих участников: producer положил сообщение и упал — сообщение в брокере; consumer упал — сообщение дождётся нового consumer. Персистенция, подтверждение доставки, повторные попытки — ответственность брокера, а не прикладного кода.

Temporal decoupling решает все три проблемы синхронной цепочки. Coupling по доступности исчезает: email-сервис может быть недоступен часами — сообщения накопятся в брокере и обработаются, когда сервис вернётся. Потеря работы компенсируется: после публикации сообщение сохранено в брокере — даже если web-сервер упал сразу после отправки, сообщение доставится consumers. Разница в скорости абсорбируется: брокер работает как буфер между быстрым producer и медленными consumers.

Acknowledgment

Producer положил сообщение и забыл — temporal decoupling на стороне отправителя работает. Но consumer должен это сообщение обработать. Что произойдёт, если consumer получил сообщение, начал резервирование товара на складе и упал?

Если брокер отдаёт сообщение и сразу удаляет его — сообщение потеряно, товар не зарезервирован. Это at-most-once: доставка без подтверждения, потеря при любом сбое.

Acknowledgment (ACK) решает проблему: consumer явно сообщает брокеру «обработка завершена», и брокер удаляет сообщение только после ACK. Пока ACK не пришёл — сообщение остаётся в очереди.

Broker: [reserve_stock, send_email, track_analytics]
            ↓
        Consumer (warehouse)
            ↓ товар зарезервирован
            ↓ crash до отправки ACK

Broker: [reserve_stock, send_email, track_analytics]  ← msg не удалён
            ↓ повторная доставка
        Consumer₂ (warehouse)
            ↓ пытается зарезервировать тот же товар повторно

Consumer упал до ACK — брокер отдаёт сообщение другому consumer. Сообщение обработается как минимум один раз, но, возможно, не один — warehouse резервирует тот же товар повторно. Warehouse-сервис защищается от дубликатов через idempotency: проверяет «заказ #42 уже зарезервирован?» и не резервирует повторно.

Point-to-Point и Pub/Sub

Один consumer для обработки заказов на складе — хватает при 50 заказах в минуту. В пиковые часы поступает 200 заказов в минуту, consumer не успевает. Нужен второй, третий, пятый. Но каждый заказ должен быть зарезервирован ровно один раз — если пять consumers получат одно и то же сообщение, товар зарезервируется пять раз.

Point-to-point (очередь задач) решает это: несколько consumers конкурируют за сообщения из одной очереди, но каждое конкретное сообщение достаётся только одному.

flowchart LR
    P["Producer"] --> Q["Queue"]
    Q --> C1["Consumer₁<br>(заказ #42)"]
    Q --> C2["Consumer₂<br>(заказ #43)"]
    Q --> C3["Consumer₃<br>(заказ #44)"]

Пять consumers обрабатывают заказы параллельно, каждый заказ — ровно одним consumer. Так работает Sidekiq: воркеры конкурируют за задачи из очереди.

Теперь другая сторона задачи. Событие «заказ #42 оформлен» интересует не только склад. Его должны получить четыре независимые системы: склад (зарезервировать), email (отправить подтверждение), аналитика (записать событие), программа лояльности (начислить баллы). Каждая система должна получить каждое событие — это не конкуренция за сообщения, а широковещание.

Pub/Sub (publish/subscribe): одно сообщение доставляется всем подписчикам.

flowchart LR
    P["Publisher"] --> T["Topic:<br>order_created"]
    T --> S1["Subscriber (warehouse)"]
    T --> S2["Subscriber (email)"]
    T --> S3["Subscriber (analytics)"]
    T --> S4["Subscriber (loyalty)"]

На практике оба паттерна комбинируются через consumer group. Consumers внутри одной группы конкурируют за сообщения (point-to-point: каждый заказ резервирует один worker). Разные группы на одном топике получают все сообщения независимо (pub/sub: и склад, и email видят каждый заказ). Группа «warehouse» из 5 workers плюс группа «email» из 2 workers — обе получают каждое событие заказа. Между группами — широковещание (каждая группа видит все события), внутри группы — конкуренция (каждое событие обрабатывает один consumer). Consumer groups реализованы в Kafka и Redis Streams.

Две модели: очередь сообщений и лог

Прошёл месяц. Появилась команда data science, которая хочет построить модель прогнозирования спроса на основе всех заказов за последние 30 дней. Нужно перечитать историю заказов из брокера — но сообщения удалялись после ACK. История потеряна.

Это различие определяет две модели хранения сообщений.

Очередь сообщений (RabbitMQ, AWS SQS, Sidekiq+Redis) удаляет сообщение после подтверждения. Очередь — буфер для необработанных задач. Сообщение — это задача: выполнить и забыть.

Producer → [msg3, msg2, msg1] → Consumer
                                  ACK(msg1)
           [msg3, msg2]         ← msg1 удалён навсегда

Лог-ориентированный брокер (Kafka, Redis Streams) записывает сообщения в append-only лог и не удаляет их после обработки. Consumer отслеживает свою позицию (offset) в логе. Разные consumers читают один и тот же лог с разных позиций. Сообщение — это факт (событие), которое произошло. Факт не исчезает после того, как кто-то о нём узнал — он хранится до истечения настроенного срока хранения (retention period), обычно дни или недели, после чего старые сегменты удаляются.

[order_1, order_2, order_3, order_4, order_5, ...]   ← лог растёт
       ↑                         ↑
  warehouse group            data science
  (offset=5, догнал)         (offset=1, читает историю)

Ключевое свойство лога — replay: возможность перечитать поток событий с любой точки. Команда data science подключает consumer, ставит offset на начало лога и проходит все 30 дней заказов. С очередью сообщений эти сообщения давно удалены — перечитать невозможно.

Выбор между моделями определяется тем, нужна ли история. Отправка email после регистрации — задача, которую нужно выполнить и забыть: очередь сообщений. Поток бизнес-событий, к которым могут подключаться новые consumers (аналитика, модели прогнозирования, аудит) — лог. На практике сервис часто использует оба: Sidekiq для фоновых задач (отправить email, обработать изображение), Kafka для потока бизнес-событий (заказ создан, оплата прошла, товар отгружен).

Ordering и partitions

Consumer groups позволили масштабировать warehouse до пяти workers. Но параллельная обработка порождает проблему порядка.

Пользователь сменил адрес доставки для заказа #42 на «ул. Пушкина, 10», а через секунду — на «ул. Лермонтова, 5». Два события попали в очередь. Consumer A забрал первое, обрабатывает 500ms. Consumer B забрал второе, обработал за 50ms. Результат: адрес доставки — «ул. Пушкина, 10» вместо «ул. Лермонтова, 5», потому что Consumer A завершил позже.

Даже если брокер отдаёт сообщения в порядке поступления, параллельная обработка несколькими consumers ломает порядок завершения. Для независимых операций (каждый email отправляется сам по себе) это неважно. Для операций над одной сущностью — порядок критичен.

Partition (партиция) решает задачу. Топик физически разбивается на несколько независимых сегментов, каждый — отдельный упорядоченный лог. Сообщение попадает в конкретную партицию через хеширование ключа:

partition = hash(partition_key) % количество_партиций

Если partition key = order_id, все события одного заказа попадают в одну партицию. Одну партицию читает один consumer. «Адрес → Пушкина» и «адрес → Лермонтова» для одного order_id обработаются строго последовательно одним consumer — порядок гарантирован.

Topic: order-events (3 партиции)

Partition 0: [order_42: addr→Пушкина, order_42: addr→Лермонтова]  → Consumer A
Partition 1: [order_43: status→shipped]                            → Consumer B
Partition 2: [order_44: created]                                   → Consumer C

Аналогия с шардингом прямая: партиция — шард, partition key — shard key, хеш-функция распределяет данные по узлам.

Количество партиций определяет потолок параллелизма. Одну партицию читает один consumer из группы. При 4 партициях и 6 consumers — 4 работают, 2 простаивают. Типичная практика: создавать партиции с запасом (8–16 при текущих 2–3 consumers). Когда нагрузка вырастет — добавятся consumers, и они перераспределят партиции автоматически (rebalancing). Добавить партиции в существующий топик сложнее — ломается распределение ключей, часть событий одного заказа может оказаться в новой партиции.

Партиция гарантирует ordering только для того, что уже записано. Порядок записи — ответственность producer. При асинхронной отправке батчами первый batch может уйти в retry, а второй успешно запишется — ordering нарушен на стороне producer. Ordering — end-to-end свойство, его нужно обеспечивать на каждом звене: от producer через брокер до consumer.

Backpressure

Temporal decoupling абсорбирует разницу в скорости — в этом суть буфера. Но буфер конечен. В «Чёрную пятницу» producer (web-сервер) кладёт 10 000 заказов в минуту, consumers обрабатывают 2 000. Очередь растёт на 8 000 сообщений в минуту — через час в ней 480 000 необработанных заказов. Память брокера не бесконечна: в какой-то момент у брокера кончится память, latency обработки вырастет до часов, или сообщения начнут теряться.

Backpressure (обратное давление) — механизм, который сообщает producer: «притормози, consumer не справляется».

Простейший ответ — drop: когда очередь полна, новые сообщения отклоняются. Producer получает ошибку и решает сам: retry, сохранить локально, потерять. Redis при достижении maxmemory с дефолтной политикой noeviction возвращает ошибку на команды записи — producer (Rails-приложение) получит исключение. Подходит, когда потеря допустима — метрики, логи, необязательные уведомления. Но для заказов потеря неприемлема.

Альтернатива — block: producer блокируется на операции send(), пока в очереди не появится место, автоматически замедляясь до скорости consumer. Ничего не теряется, однако если producer — web-сервер, заблокированный send() означает зависший HTTP-запрос: треды копятся, начинается cascading failure.

Kafka решает задачу иначе — overflow to disk: данные пишутся на диск, page cache операционной системы обслуживает горячие сегменты. Throughput падает при чтении холодных данных, но ничего не теряется и ничего не блокируется.

На практике backpressure чаще всего сигнализирует о необходимости масштабирования consumers: добавить workers, увеличить количество партиций, оптимизировать обработку.

Dead Letter Queue

At-least-once гарантирует: если обработка упала, сообщение вернётся в очередь. Но что если сообщение невозможно обработать в принципе?

# Consumer получает сообщение о заказе
{ "order_id": 42, "amount": "not-a-number" }
 
# Парсинг падает
Integer("not-a-number")  # ArgumentError

Сообщение возвращается в очередь. Другой consumer забирает — та же ошибка. Бесконечный цикл. Такое сообщение — poison message: оно блокирует обработку остальных (при последовательной обработке) или бессмысленно расходует ресурсы (при параллельной).

Dead Letter Queue (DLQ) — отдельная очередь для необработанных сообщений. После N неудачных попыток брокер перемещает сообщение из основной очереди в DLQ.

Основная очередь: [order_44, order_43]
                       ↓
                   Consumer
                       ↓ 5 failures на order_42
DLQ: [order_42]  ← poison message изолирован, основная очередь движется

DLQ — диагностический инструмент, а не автоматическое восстановление. Инженер смотрит содержимое DLQ: баг в consumer? Невалидные данные от producer? После исправления причины сообщения можно переложить обратно в основную очередь (replay из DLQ) или удалить.

Перед DLQ обычно стоит цикл retry с exponential backoff. Sidekiq делает до 25 попыток за 21 день: первая через 16 секунд, далее по нарастающей. Длинный цикл оправдан для transient failures — внешний сервис вернётся через час, денег на счёте не хватило сегодня, но хватит завтра, команда задеплоит фикс за пару дней. Против poison message с невалидными данными retry бессилен: Integer("not-a-number") будет падать и через 21 день. DLQ — конечная точка для таких сообщений.

Мониторинг

Система работает: warehouse group обрабатывает заказы, email group рассылает подтверждения, DLQ ловит poison messages. Как узнать, что что-то идёт не так, до того как покупатели начнут жаловаться на задержку доставки?

Очередь — буфер. Здоровый буфер почти пуст: сообщения приходят и быстро уходят. Проблема проявляется, когда буфер растёт.

Первый сигнал — queue depth, количество сообщений, ожидающих обработки. Растущий тренд означает, что consumers не справляются. Это leading indicator: показывает проблему до того, как пользователи заметят задержки в доставке email или обновлении склада. Для log-based систем (Kafka, Streams) эквивалент — consumer lag: разница между последним записанным offset и текущей позицией consumer. Lag в 1 000 000 сообщений при скорости consumer 1 000 msg/sec — отставание ~17 минут. Если lag растёт — consumer не успевает за producer.

Queue depth показывает, что consumers отстают, но не почему. Причину раскрывает processing time — время обработки одного сообщения. Рост указывает на деградацию зависимостей (медленный внешний API, тяжёлые запросы к БД) или усложнение самих сообщений. Если processing time стабилен, а queue depth растёт — consumers не виноваты, producer просто генерирует больше нагрузки.

Отдельная диагностика — error rate, доля сообщений, уходящих в retry или DLQ. Рост error rate при стабильном queue depth означает, что появились poison messages или деградировал внешний сервис. Рост error rate вместе с queue depth — consumers захлёбываются: ошибки порождают retry, retry увеличивают очередь.

Характерный паттерн здоровой системы: queue depth вырос с 0 до 10 000 за 10 минут, затем стабилизировался и начал падать, error rate не изменился. Очередь абсорбировала пик нагрузки, consumers постепенно разгребают накопившееся — temporal decoupling работает как задумано.

Когда очереди не нужны

Queue depth, consumer lag, error rate — каждая метрика требует алертов и реакции. Брокер — дополнительный компонент: деплой, мониторинг, бэкапы, масштабирование. Код усложняется: вместо синхронного вызова — producer, consumer, idempotency, DLQ, мониторинг лага. Добавлять эту сложность стоит только при конкретной проблеме.

Запись в локальную БД занимает 5ms и почти никогда не падает — очередь ради этого добавляет overhead без пользы. Пользователь нажал «купить» и ему нужен ответ «оплата прошла» или «ошибка» прямо сейчас — критический путь должен быть синхронным, асинхронность уместна для побочных эффектов (email, аналитика). При 10 запросах в минуту и одном сервере нет ни пиковых нагрузок, ни проблемы со скоростью зависимостей. Если потеря задачи при падении допустима — можно обойтись in-memory очередью в том же процессе.

Практическое правило: начинай с синхронного вызова. Добавляй очередь, когда появляется конкретная боль — ненадёжный внешний сервис, пиковые нагрузки, долгие операции, блокирующие HTTP-ответ, или потребность в развязке между командами (одна команда пишет producer, другая — consumer, каждая деплоит независимо).

См. также

Sources

  • Kleppmann, 2017, Designing Data-Intensive Applications, Chapter 11 — stream processing, message brokers, exactly-once semantics
  • Kreps, 2013, The Log: What every software engineer should know about real-time data’s unifying abstraction — лог как основа messaging
  • Narkhede, Shapira, Palino, 2017, Kafka: The Definitive Guide — Kafka architecture, partitions, consumer groups
  • Hohpe, Woolf, 2003, Enterprise Integration Patterns — messaging patterns, temporal decoupling, channel types

Гарантии доставки в распределённых системах | Выбор хранилища под паттерн доступа