Очереди задач в PostgreSQL: FOR UPDATE SKIP LOCKED

Предпосылки: блокировки (row-level locks, очереди ожидания), практические паттерны (пессимистичный подход, hot spots).

Распространённые ошибки | VACUUM

FOR UPDATE решает координацию записи в одну строку: два запроса одновременно списывают деньги с одного аккаунта — блокировка сериализует их, каждый завершается с первой попытки. Но очередь задач — другой паттерн. Воркеры конкурируют не за одну строку, а за «следующую свободную» из тысяч. Чистый FOR UPDATE здесь создаёт проблему: если первая задача заблокирована, все воркеры встают в очередь за ней, хотя следующие задачи свободны.

Интернет-магазин после оформления заказа ставит в очередь фоновые операции: отправить подтверждение, начислить кешбэк, обновить аналитику. В пиковые часы — 200 задач в секунду, обрабатывают 5 воркеров. Каждый воркер в цикле забирает задачу, выполняет и берёт следующую. Цель — обеспечить, чтобы каждая операция была обработана ровно одним воркером, без простоя остальных.

Проблема: два воркера забирают одну задачу

Таблица очереди:

CREATE TABLE jobs (
  id BIGSERIAL PRIMARY KEY,
  run_at TIMESTAMP NOT NULL,
  status TEXT NOT NULL DEFAULT 'queued', -- queued/processing/done/failed
  processing_started_at TIMESTAMP,
  payload JSONB NOT NULL
);

Наивная логика воркера — SELECT, потом UPDATE:

BEGIN;
 
SELECT id
FROM jobs
WHERE status = 'queued' AND run_at <= now()
ORDER BY run_at
LIMIT 1;
 
UPDATE jobs SET status = 'processing' WHERE id = $id;
 
COMMIT;

Два воркера выполняют этот код одновременно. Оба видят одну и ту же строку в SELECT. Оба пытаются обновить её. В лучшем случае один ждёт блокировку и обновляет строку, которая уже в статусе processing. В худшем — двойная обработка: оба воркера отправляют подтверждение заказа, клиент получает два письма.

Блокировка выбранной строки при чтении

SELECT ... FOR UPDATE берёт row-level lock на выбранные строки прямо при чтении. Выбор и блокировка происходят атомарно — между ними нет окна, в которое может вклиниться другой воркер.

BEGIN;
 
WITH next_job AS (
  SELECT id
  FROM jobs
  WHERE status = 'queued' AND run_at <= now()
  ORDER BY run_at
  FOR UPDATE
  LIMIT 1
)
UPDATE jobs
SET status = 'processing', processing_started_at = now()
WHERE id IN (SELECT id FROM next_job)
RETURNING *;
 
COMMIT;

Если RETURNING вернул строку — воркер забрал задачу. Если пусто — очередь пуста.

Проблема проявляется при нагрузке. Воркер A забрал задачу #1 и обрабатывает её 30 секунд. Воркеры B, C, D, E делают тот же SELECT — он возвращает задачу #1 как первую подходящую (она всё ещё queued до COMMIT воркера A). Все четыре воркера встают в очередь ожидания на row lock задачи #1. Задачи #2, #3, #4, #5 свободны, но никто их не берёт. Вместо 5 параллельных воркеров работает один — пропускная способность падает в 5 раз.

SKIP LOCKED: пропустить заблокированное, взять следующее

SKIP LOCKED меняет поведение: вместо ожидания блокировки строка пропускается, и запрос переходит к следующей подходящей.

WITH next_job AS (
  SELECT id
  FROM jobs
  WHERE status = 'queued' AND run_at <= now()
  ORDER BY run_at
  FOR UPDATE SKIP LOCKED
  LIMIT 1
)
UPDATE jobs
SET status = 'processing', processing_started_at = now()
WHERE id IN (SELECT id FROM next_job)
RETURNING *;

Теперь воркер A держит lock на задаче #1, воркер B пропускает её и берёт #2, воркер C — #3. Все пять воркеров работают параллельно. Таблица jobs превращается в многопотребительскую очередь без внешнего диспетчера — координацию обеспечивает сам PostgreSQL через row-level locks. Альтернативный подход — вынести очередь во внешний брокер: Sidekiq использует Redis LIST + BRPOP, где конкуренция за задачу решается не блокировкой строк, а атомарным извлечением элемента из списка.

SKIP LOCKED сознательно жертвует строгой справедливостью (FIFO): если ранняя задача заблокирована долго, более поздние обработаются раньше. Для очередей фоновых задач это нормально — важнее пропускная способность, чем строгий порядок.

PostgreSQL также предлагает модификатор NOWAIT (вместо ожидания — немедленная ошибка), но для очередей SKIP LOCKED подходит лучше: ошибка создаёт лишнюю логику retry на стороне приложения, а пропуск заблокированной строки решает задачу напрямую.

Порядок и производительность

Без ORDER BY PostgreSQL не обязан возвращать строки в стабильном порядке. Задача с run_at = 10:00 может обработаться позже задачи с run_at = 10:05. Для приближения к FIFO порядок фиксируется явно:

ORDER BY run_at, id

Добавление id разрешает ties: если две задачи запланированы на одно время, порядок определяется по id.

При 200 задачах в секунду таблица быстро растёт. Каждый poll воркера выполняет запрос с фильтром status = 'queued' AND run_at <= now() и сортировкой по run_at. Без индекса PostgreSQL делает Seq Scan по всей таблице — включая миллионы завершённых задач. Индекс под форму запроса:

CREATE INDEX jobs_queue_idx ON jobs(run_at, id) WHERE status = 'queued';

Partial index (WHERE status = 'queued') содержит только задачи, ожидающие обработки. По мере обработки задачи уходят из индекса — его размер пропорционален длине очереди, а не общему количеству задач.

Жизненный цикл задачи

Воркер забрал задачу через FOR UPDATE SKIP LOCKED и поставил status = 'processing', processing_started_at = now(). Дальше три исхода.

Успех: воркер выполнил работу и обновил статус на done. Задача завершена.

Ошибка: воркер поймал исключение. Можно сразу перевести в failed или вернуть в queued для повторной попытки (с увеличением счётчика attempts). После N неудач — failed, задача попадает в аналог Dead Letter Queue: остаётся в таблице для ручного разбора.

Зависание: воркер взял задачу и пропал — процесс убит, сервер перезагружен, OOM. Задача осталась в processing навсегда. Для обнаружения таких задач нужен внешний процесс (cron, отдельный воркер), который периодически ищет зависшие:

UPDATE jobs
SET status = 'queued', processing_started_at = NULL
WHERE status = 'processing'
  AND processing_started_at < now() - INTERVAL '5 minutes'
RETURNING id;

Таймаут (здесь 5 минут) зависит от максимального ожидаемого времени обработки. Слишком короткий — задача «оживёт» раньше, чем воркер закончит, и будет обработана дважды. Слишком длинный — зависшие задачи копятся.

Идемпотентность обработки

Возврат задачи по таймауту создаёт ситуацию, знакомую по гарантиям доставки: at-least-once. Задача будет обработана минимум один раз, но возможны повторы. Конкретные причины повторов в этой схеме:

  • Воркер выполнил внешний эффект (отправил письмо), но упал до COMMIT — транзакция откатилась, задача вернулась в queued.
  • Воркер завис, задача вернулась по таймауту, хотя внешний эффект уже произошёл.
  • Транзакция откатилась из-за ошибки базы (deadlock, timeout), задача снова стала видимой.

Во всех случаях задача обработается повторно. Подход к защите — idempotency: обработчик должен быть написан так, чтобы повторное выполнение не создавало дублирующих побочных эффектов. Для внешних API — idempotency key. Для записей в PostgreSQL — INSERT ... ON CONFLICT DO NOTHING или проверка статуса перед выполнением.

Очереди и dead tuples

Каждая смена статуса задачи (queuedprocessingdone/failed) и каждый retry создают новую версию строки — старая становится dead tuple. Таблица-очередь с высоким throughput генерирует dead tuples в разы быстрее типичной OLTP-таблицы: при 200 задачах в секунду и трёх сменах статуса на задачу — 600 dead tuples в секунду.

Без частого VACUUM таблица разбухает, а partial index jobs_queue_idx накапливает указатели на мёртвые строки, замедляя каждый poll воркера. Для таблиц-очередей стоит настроить autovacuum агрессивнее per-table — снизить autovacuum_vacuum_scale_factor и autovacuum_vacuum_threshold так, чтобы VACUUM запускался при меньшем накоплении dead tuples, чем на обычных таблицах. Конкретные значения зависят от размера таблицы и характера нагрузки.

Sources


Распространённые ошибки | VACUUM