Appearance
ADR-007: Очереди фоновых задач
Дата: 2026-01-04
Статус: Принято
Авторы: Backend Team
Контекст
Проект требует асинхронной обработки задач:
- Обработка загруженных изображений (resize, optimize)
- Отправка SMS для верификации
- Отправка email-уведомлений
- Денормализация счётчиков
- Очистка устаревших данных
- Генерация отчётов для админки
Требования
- Надёжность: задачи не должны теряться
- Retry: автоматический повтор при ошибках
- Приоритизация: критичные задачи важнее
- Масштабируемость: возможность добавления workers
- Мониторинг: видимость состояния очередей
Решение
Используем BullMQ — библиотеку очередей на базе Redis.
Почему BullMQ
| Критерий | BullMQ | Agenda | Bee-Queue |
|---|---|---|---|
| Backend | Redis | MongoDB | Redis |
| TypeScript | ✅ Native | ⚠️ Partial | ❌ No |
| Retry | ✅ Flexible | ✅ Yes | ⚠️ Limited |
| Delayed jobs | ✅ Yes | ✅ Yes | ❌ No |
| Rate limiting | ✅ Yes | ❌ No | ❌ No |
| Dashboard | ✅ Bull Board | ✅ Agendash | ❌ No |
| Производительность | ✅ High | ⚠️ Medium | ✅ High |
Архитектура
┌─────────────────────────────────────────────────────────────┐
│ BACKEND (Producer) │
│ ┌─────────────────────────────────────────────────────────┐│
│ │ await queue.add('send-sms', { phone, code }) ││
│ │ await queue.add('process-image', { imageId }, { ││
│ │ priority: 1, ││
│ │ delay: 0, ││
│ │ attempts: 3 ││
│ │ }) ││
│ └─────────────────────────────────────────────────────────┘│
└──────────────────────────┬──────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ REDIS │
│ ┌─────────────────────────────────────────────────────────┐│
│ │ Queues: ││
│ │ ├── bull:notifications:waiting ││
│ │ ├── bull:notifications:active ││
│ │ ├── bull:notifications:completed ││
│ │ ├── bull:notifications:failed ││
│ │ ├── bull:images:waiting ││
│ │ └── ... ││
│ └─────────────────────────────────────────────────────────┘│
└──────────────────────────┬──────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ WORKERS (Consumers) │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Worker 1 │ │ Worker 2 │ │ Worker 3 │ │
│ │ (images) │ │ (notify) │ │ (cleanup) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────┘Очереди и задачи
Структура очередей
| Очередь | Приоритет | Concurrency | Описание |
|---|---|---|---|
notifications | High | 5 | SMS, Email |
images | Medium | 3 | Обработка изображений |
analytics | Low | 2 | Счётчики, статистика |
cleanup | Low | 1 | Удаление старых данных |
reports | Low | 1 | Генерация отчётов |
Типы задач
typescript
// src/jobs/types.ts
// Notifications
interface SendSmsJob {
phone: string;
message: string;
template?: 'verification' | 'order' | 'reminder';
}
interface SendEmailJob {
to: string;
subject: string;
template: string;
data: Record<string, any>;
}
// Images
interface ProcessImageJob {
imageId: string;
productId: string;
originalKey: string;
}
interface DeleteImagesJob {
keys: string[];
}
// Analytics
interface UpdateCountersJob {
type: 'product_views' | 'seller_rating' | 'category_count';
entityId: string;
}
interface SyncDenormalizedDataJob {
table: 'users' | 'products' | 'categories';
}
// Cleanup
interface CleanupExpiredSessionsJob {
olderThan: Date;
}
interface CleanupVerificationCodesJob {
olderThan: Date;
}Реализация
Конфигурация очередей
typescript
// src/config/queues.ts
import { Queue, Worker, QueueScheduler } from 'bullmq';
import Redis from 'ioredis';
const connection = new Redis(process.env.REDIS_URL, {
maxRetriesPerRequest: null, // Важно для BullMQ
});
export const queues = {
notifications: new Queue('notifications', { connection }),
images: new Queue('images', { connection }),
analytics: new Queue('analytics', { connection }),
cleanup: new Queue('cleanup', { connection }),
reports: new Queue('reports', { connection }),
};
// Scheduler для delayed/repeated jobs
export const schedulers = {
notifications: new QueueScheduler('notifications', { connection }),
images: new QueueScheduler('images', { connection }),
analytics: new QueueScheduler('analytics', { connection }),
cleanup: new QueueScheduler('cleanup', { connection }),
};Добавление задач (Producer)
typescript
// src/services/notification.service.ts
import { queues } from '../config/queues';
export class NotificationService {
async sendVerificationSms(phone: string, code: string): Promise<void> {
await queues.notifications.add(
'send-sms',
{
phone,
message: `Ваш код подтверждения: ${code}`,
template: 'verification',
},
{
priority: 1, // Высокий приоритет
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000, // 1s, 2s, 4s
},
removeOnComplete: true,
removeOnFail: 100, // Хранить 100 последних failed
}
);
}
async sendWelcomeEmail(email: string, name: string): Promise<void> {
await queues.notifications.add(
'send-email',
{
to: email,
subject: 'Добро пожаловать на AutoParts!',
template: 'welcome',
data: { name },
},
{
priority: 2,
attempts: 5,
delay: 5000, // Отправить через 5 секунд
}
);
}
}Workers (Consumers)
typescript
// src/workers/notification.worker.ts
import { Worker, Job } from 'bullmq';
import { SmsService } from '../services/sms.service';
import { EmailService } from '../services/email.service';
const smsService = new SmsService();
const emailService = new EmailService();
export const notificationWorker = new Worker(
'notifications',
async (job: Job) => {
switch (job.name) {
case 'send-sms':
return await handleSendSms(job);
case 'send-email':
return await handleSendEmail(job);
default:
throw new Error(`Unknown job: ${job.name}`);
}
},
{
connection,
concurrency: 5,
limiter: {
max: 10,
duration: 1000, // 10 SMS в секунду максимум
},
}
);
async function handleSendSms(job: Job<SendSmsJob>): Promise<void> {
const { phone, message } = job.data;
try {
await smsService.send(phone, message);
console.log(`SMS sent to ${phone}`);
} catch (error) {
console.error(`SMS failed: ${error.message}`);
throw error; // BullMQ выполнит retry
}
}
async function handleSendEmail(job: Job<SendEmailJob>): Promise<void> {
const { to, subject, template, data } = job.data;
await emailService.send({ to, subject, template, data });
}
// Event handlers
notificationWorker.on('completed', (job) => {
console.log(`Job ${job.id} completed`);
});
notificationWorker.on('failed', (job, err) => {
console.error(`Job ${job?.id} failed: ${err.message}`);
});Worker для изображений
typescript
// src/workers/image.worker.ts
import { Worker, Job } from 'bullmq';
import sharp from 'sharp';
import { S3Service } from '../services/s3.service';
export const imageWorker = new Worker(
'images',
async (job: Job<ProcessImageJob>) => {
const { imageId, productId, originalKey } = job.data;
// Обновляем прогресс
await job.updateProgress(10);
// Скачиваем оригинал
const original = await s3Service.download(originalKey);
await job.updateProgress(20);
// Создаём варианты
const variants = ['thumbnail', 'medium', 'large'];
for (let i = 0; i < variants.length; i++) {
const variant = variants[i];
const config = IMAGE_VARIANTS[variant];
const processed = await sharp(original)
.resize(config.width, config.height, { fit: 'cover' })
.webp({ quality: config.quality })
.toBuffer();
await s3Service.upload(
`products/${variant}/${productId}/${imageId}.webp`,
processed,
'image/webp'
);
await job.updateProgress(20 + ((i + 1) / variants.length) * 60);
}
// Обновляем статус в БД
await updateImageStatus(imageId, 'ready');
await job.updateProgress(100);
return { success: true, variants: variants.length };
},
{
connection,
concurrency: 3, // Ограничиваем CPU-intensive задачи
}
);Scheduled Jobs (Cron)
typescript
// src/workers/scheduled.ts
import { queues } from '../config/queues';
// Очистка устаревших сессий — каждый час
await queues.cleanup.add(
'cleanup-sessions',
{ olderThan: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) }, // 30 дней
{
repeat: {
pattern: '0 * * * *', // Каждый час
},
jobId: 'scheduled-cleanup-sessions', // Уникальный ID для повторяющихся задач
}
);
// Синхронизация денормализованных данных — каждую ночь
await queues.analytics.add(
'sync-denormalized',
{ tables: ['users', 'products', 'categories'] },
{
repeat: {
pattern: '0 3 * * *', // В 3:00 каждый день
},
jobId: 'scheduled-sync-denormalized',
}
);
// Очистка verification codes — каждые 15 минут
await queues.cleanup.add(
'cleanup-verification-codes',
{},
{
repeat: {
pattern: '*/15 * * * *',
},
jobId: 'scheduled-cleanup-codes',
}
);Dashboard (Bull Board)
typescript
// src/api/admin/queues/route.ts
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { queues } from '../../../config/queues';
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
createBullBoard({
queues: [
new BullMQAdapter(queues.notifications),
new BullMQAdapter(queues.images),
new BullMQAdapter(queues.analytics),
new BullMQAdapter(queues.cleanup),
new BullMQAdapter(queues.reports),
],
serverAdapter,
});
// Доступ: https://admin.partizap.ru/queues
// Требуется аутентификация админаError Handling
Retry стратегии
typescript
const JOB_CONFIGS = {
'send-sms': {
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
},
'send-email': {
attempts: 5,
backoff: { type: 'exponential', delay: 2000 },
},
'process-image': {
attempts: 2,
backoff: { type: 'fixed', delay: 5000 },
},
'cleanup-sessions': {
attempts: 1, // Не повторять, запустится по расписанию
},
};Dead Letter Queue
typescript
// При исчерпании попыток — перенос в DLQ
notificationWorker.on('failed', async (job, err) => {
if (job && job.attemptsMade >= job.opts.attempts!) {
// Перенос в Dead Letter Queue
await queues.deadLetter.add(
'failed-notification',
{
originalJob: job.data,
error: err.message,
stack: err.stack,
failedAt: new Date(),
}
);
// Алерт в мониторинг
await alertService.send({
level: 'error',
message: `Job permanently failed: ${job.name}`,
data: { jobId: job.id, error: err.message },
});
}
});Graceful Shutdown
typescript
// src/index.ts
import { notificationWorker, imageWorker, cleanupWorker } from './workers';
const workers = [notificationWorker, imageWorker, cleanupWorker];
async function shutdown() {
console.log('Shutting down workers...');
// Остановить принятие новых задач
await Promise.all(workers.map(w => w.pause()));
// Дождаться завершения текущих задач (max 30 секунд)
await Promise.race([
Promise.all(workers.map(w => w.close())),
new Promise(resolve => setTimeout(resolve, 30000)),
]);
console.log('Workers stopped');
process.exit(0);
}
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);Мониторинг
Метрики для Prometheus
typescript
// src/metrics/queue-metrics.ts
import { Registry, Gauge, Counter } from 'prom-client';
const registry = new Registry();
const queueSizeGauge = new Gauge({
name: 'bullmq_queue_size',
help: 'Number of jobs in queue',
labelNames: ['queue', 'status'],
registers: [registry],
});
const jobDurationHistogram = new Histogram({
name: 'bullmq_job_duration_seconds',
help: 'Job processing duration',
labelNames: ['queue', 'job'],
buckets: [0.1, 0.5, 1, 5, 10, 30, 60],
registers: [registry],
});
// Обновление метрик
async function updateMetrics() {
for (const [name, queue] of Object.entries(queues)) {
const counts = await queue.getJobCounts();
queueSizeGauge.set({ queue: name, status: 'waiting' }, counts.waiting);
queueSizeGauge.set({ queue: name, status: 'active' }, counts.active);
queueSizeGauge.set({ queue: name, status: 'failed' }, counts.failed);
queueSizeGauge.set({ queue: name, status: 'delayed' }, counts.delayed);
}
}
setInterval(updateMetrics, 10000); // Каждые 10 секундПоследствия
Положительные
- Надёжность: Redis persistence, retry механизм
- Масштабируемость: горизонтальное масштабирование workers
- Мониторинг: Bull Board, метрики Prometheus
- Гибкость: приоритеты, delays, rate limiting
Отрицательные
- Сложность: дополнительная инфраструктура
- Отладка: сложнее трейсить асинхронные операции
- Зависимость: Redis должен быть доступен
Митигация
| Проблема | Решение |
|---|---|
| Redis недоступен | Fallback на синхронную обработку для критичных задач |
| Worker упал | PM2 автоперезапуск, multiple workers |
| Очередь переполнена | Алерты при >1000 задач, rate limiting на входе |
Связанные решения
- ADR-005: Кэширование — Redis как бэкенд
- ADR-006: Файлы — обработка изображений
- ADR-008: Логирование — мониторинг очередей