Skip to content

ADR-007: Очереди фоновых задач

Дата: 2026-01-04
Статус: Принято
Авторы: Backend Team


Контекст

Проект требует асинхронной обработки задач:

  • Обработка загруженных изображений (resize, optimize)
  • Отправка SMS для верификации
  • Отправка email-уведомлений
  • Денормализация счётчиков
  • Очистка устаревших данных
  • Генерация отчётов для админки

Требования

  • Надёжность: задачи не должны теряться
  • Retry: автоматический повтор при ошибках
  • Приоритизация: критичные задачи важнее
  • Масштабируемость: возможность добавления workers
  • Мониторинг: видимость состояния очередей

Решение

Используем BullMQ — библиотеку очередей на базе Redis.

Почему BullMQ

КритерийBullMQAgendaBee-Queue
BackendRedisMongoDBRedis
TypeScript✅ Native⚠️ Partial❌ No
Retry✅ Flexible✅ Yes⚠️ Limited
Delayed jobs✅ Yes✅ Yes❌ No
Rate limiting✅ Yes❌ No❌ No
Dashboard✅ Bull Board✅ Agendash❌ No
Производительность✅ High⚠️ Medium✅ High

Архитектура

┌─────────────────────────────────────────────────────────────┐
│                     BACKEND (Producer)                       │
│  ┌─────────────────────────────────────────────────────────┐│
│  │  await queue.add('send-sms', { phone, code })           ││
│  │  await queue.add('process-image', { imageId }, {        ││
│  │    priority: 1,                                          ││
│  │    delay: 0,                                             ││
│  │    attempts: 3                                           ││
│  │  })                                                      ││
│  └─────────────────────────────────────────────────────────┘│
└──────────────────────────┬──────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│                         REDIS                                │
│  ┌─────────────────────────────────────────────────────────┐│
│  │  Queues:                                                 ││
│  │  ├── bull:notifications:waiting                         ││
│  │  ├── bull:notifications:active                          ││
│  │  ├── bull:notifications:completed                       ││
│  │  ├── bull:notifications:failed                          ││
│  │  ├── bull:images:waiting                                ││
│  │  └── ...                                                 ││
│  └─────────────────────────────────────────────────────────┘│
└──────────────────────────┬──────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│                    WORKERS (Consumers)                       │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐       │
│  │   Worker 1   │  │   Worker 2   │  │   Worker 3   │       │
│  │ (images)     │  │ (notify)     │  │ (cleanup)    │       │
│  └──────────────┘  └──────────────┘  └──────────────┘       │
└─────────────────────────────────────────────────────────────┘

Очереди и задачи

Структура очередей

ОчередьПриоритетConcurrencyОписание
notificationsHigh5SMS, Email
imagesMedium3Обработка изображений
analyticsLow2Счётчики, статистика
cleanupLow1Удаление старых данных
reportsLow1Генерация отчётов

Типы задач

typescript
// src/jobs/types.ts

// Notifications
interface SendSmsJob {
  phone: string;
  message: string;
  template?: 'verification' | 'order' | 'reminder';
}

interface SendEmailJob {
  to: string;
  subject: string;
  template: string;
  data: Record<string, any>;
}

// Images
interface ProcessImageJob {
  imageId: string;
  productId: string;
  originalKey: string;
}

interface DeleteImagesJob {
  keys: string[];
}

// Analytics
interface UpdateCountersJob {
  type: 'product_views' | 'seller_rating' | 'category_count';
  entityId: string;
}

interface SyncDenormalizedDataJob {
  table: 'users' | 'products' | 'categories';
}

// Cleanup
interface CleanupExpiredSessionsJob {
  olderThan: Date;
}

interface CleanupVerificationCodesJob {
  olderThan: Date;
}

Реализация

Конфигурация очередей

typescript
// src/config/queues.ts
import { Queue, Worker, QueueScheduler } from 'bullmq';
import Redis from 'ioredis';

const connection = new Redis(process.env.REDIS_URL, {
  maxRetriesPerRequest: null, // Важно для BullMQ
});

export const queues = {
  notifications: new Queue('notifications', { connection }),
  images: new Queue('images', { connection }),
  analytics: new Queue('analytics', { connection }),
  cleanup: new Queue('cleanup', { connection }),
  reports: new Queue('reports', { connection }),
};

// Scheduler для delayed/repeated jobs
export const schedulers = {
  notifications: new QueueScheduler('notifications', { connection }),
  images: new QueueScheduler('images', { connection }),
  analytics: new QueueScheduler('analytics', { connection }),
  cleanup: new QueueScheduler('cleanup', { connection }),
};

Добавление задач (Producer)

typescript
// src/services/notification.service.ts
import { queues } from '../config/queues';

export class NotificationService {
  async sendVerificationSms(phone: string, code: string): Promise<void> {
    await queues.notifications.add(
      'send-sms',
      {
        phone,
        message: `Ваш код подтверждения: ${code}`,
        template: 'verification',
      },
      {
        priority: 1, // Высокий приоритет
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 1000, // 1s, 2s, 4s
        },
        removeOnComplete: true,
        removeOnFail: 100, // Хранить 100 последних failed
      }
    );
  }

  async sendWelcomeEmail(email: string, name: string): Promise<void> {
    await queues.notifications.add(
      'send-email',
      {
        to: email,
        subject: 'Добро пожаловать на AutoParts!',
        template: 'welcome',
        data: { name },
      },
      {
        priority: 2,
        attempts: 5,
        delay: 5000, // Отправить через 5 секунд
      }
    );
  }
}

Workers (Consumers)

typescript
// src/workers/notification.worker.ts
import { Worker, Job } from 'bullmq';
import { SmsService } from '../services/sms.service';
import { EmailService } from '../services/email.service';

const smsService = new SmsService();
const emailService = new EmailService();

export const notificationWorker = new Worker(
  'notifications',
  async (job: Job) => {
    switch (job.name) {
      case 'send-sms':
        return await handleSendSms(job);
      case 'send-email':
        return await handleSendEmail(job);
      default:
        throw new Error(`Unknown job: ${job.name}`);
    }
  },
  {
    connection,
    concurrency: 5,
    limiter: {
      max: 10,
      duration: 1000, // 10 SMS в секунду максимум
    },
  }
);

async function handleSendSms(job: Job<SendSmsJob>): Promise<void> {
  const { phone, message } = job.data;
  
  try {
    await smsService.send(phone, message);
    console.log(`SMS sent to ${phone}`);
  } catch (error) {
    console.error(`SMS failed: ${error.message}`);
    throw error; // BullMQ выполнит retry
  }
}

async function handleSendEmail(job: Job<SendEmailJob>): Promise<void> {
  const { to, subject, template, data } = job.data;
  
  await emailService.send({ to, subject, template, data });
}

// Event handlers
notificationWorker.on('completed', (job) => {
  console.log(`Job ${job.id} completed`);
});

notificationWorker.on('failed', (job, err) => {
  console.error(`Job ${job?.id} failed: ${err.message}`);
});

Worker для изображений

typescript
// src/workers/image.worker.ts
import { Worker, Job } from 'bullmq';
import sharp from 'sharp';
import { S3Service } from '../services/s3.service';

export const imageWorker = new Worker(
  'images',
  async (job: Job<ProcessImageJob>) => {
    const { imageId, productId, originalKey } = job.data;
    
    // Обновляем прогресс
    await job.updateProgress(10);
    
    // Скачиваем оригинал
    const original = await s3Service.download(originalKey);
    await job.updateProgress(20);
    
    // Создаём варианты
    const variants = ['thumbnail', 'medium', 'large'];
    
    for (let i = 0; i < variants.length; i++) {
      const variant = variants[i];
      const config = IMAGE_VARIANTS[variant];
      
      const processed = await sharp(original)
        .resize(config.width, config.height, { fit: 'cover' })
        .webp({ quality: config.quality })
        .toBuffer();
      
      await s3Service.upload(
        `products/${variant}/${productId}/${imageId}.webp`,
        processed,
        'image/webp'
      );
      
      await job.updateProgress(20 + ((i + 1) / variants.length) * 60);
    }
    
    // Обновляем статус в БД
    await updateImageStatus(imageId, 'ready');
    await job.updateProgress(100);
    
    return { success: true, variants: variants.length };
  },
  {
    connection,
    concurrency: 3, // Ограничиваем CPU-intensive задачи
  }
);

Scheduled Jobs (Cron)

typescript
// src/workers/scheduled.ts
import { queues } from '../config/queues';

// Очистка устаревших сессий — каждый час
await queues.cleanup.add(
  'cleanup-sessions',
  { olderThan: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) }, // 30 дней
  {
    repeat: {
      pattern: '0 * * * *', // Каждый час
    },
    jobId: 'scheduled-cleanup-sessions', // Уникальный ID для повторяющихся задач
  }
);

// Синхронизация денормализованных данных — каждую ночь
await queues.analytics.add(
  'sync-denormalized',
  { tables: ['users', 'products', 'categories'] },
  {
    repeat: {
      pattern: '0 3 * * *', // В 3:00 каждый день
    },
    jobId: 'scheduled-sync-denormalized',
  }
);

// Очистка verification codes — каждые 15 минут
await queues.cleanup.add(
  'cleanup-verification-codes',
  {},
  {
    repeat: {
      pattern: '*/15 * * * *',
    },
    jobId: 'scheduled-cleanup-codes',
  }
);

Dashboard (Bull Board)

typescript
// src/api/admin/queues/route.ts
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { queues } from '../../../config/queues';

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

createBullBoard({
  queues: [
    new BullMQAdapter(queues.notifications),
    new BullMQAdapter(queues.images),
    new BullMQAdapter(queues.analytics),
    new BullMQAdapter(queues.cleanup),
    new BullMQAdapter(queues.reports),
  ],
  serverAdapter,
});

// Доступ: https://admin.partizap.ru/queues
// Требуется аутентификация админа

Error Handling

Retry стратегии

typescript
const JOB_CONFIGS = {
  'send-sms': {
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 },
  },
  'send-email': {
    attempts: 5,
    backoff: { type: 'exponential', delay: 2000 },
  },
  'process-image': {
    attempts: 2,
    backoff: { type: 'fixed', delay: 5000 },
  },
  'cleanup-sessions': {
    attempts: 1, // Не повторять, запустится по расписанию
  },
};

Dead Letter Queue

typescript
// При исчерпании попыток — перенос в DLQ
notificationWorker.on('failed', async (job, err) => {
  if (job && job.attemptsMade >= job.opts.attempts!) {
    // Перенос в Dead Letter Queue
    await queues.deadLetter.add(
      'failed-notification',
      {
        originalJob: job.data,
        error: err.message,
        stack: err.stack,
        failedAt: new Date(),
      }
    );
    
    // Алерт в мониторинг
    await alertService.send({
      level: 'error',
      message: `Job permanently failed: ${job.name}`,
      data: { jobId: job.id, error: err.message },
    });
  }
});

Graceful Shutdown

typescript
// src/index.ts
import { notificationWorker, imageWorker, cleanupWorker } from './workers';

const workers = [notificationWorker, imageWorker, cleanupWorker];

async function shutdown() {
  console.log('Shutting down workers...');
  
  // Остановить принятие новых задач
  await Promise.all(workers.map(w => w.pause()));
  
  // Дождаться завершения текущих задач (max 30 секунд)
  await Promise.race([
    Promise.all(workers.map(w => w.close())),
    new Promise(resolve => setTimeout(resolve, 30000)),
  ]);
  
  console.log('Workers stopped');
  process.exit(0);
}

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);

Мониторинг

Метрики для Prometheus

typescript
// src/metrics/queue-metrics.ts
import { Registry, Gauge, Counter } from 'prom-client';

const registry = new Registry();

const queueSizeGauge = new Gauge({
  name: 'bullmq_queue_size',
  help: 'Number of jobs in queue',
  labelNames: ['queue', 'status'],
  registers: [registry],
});

const jobDurationHistogram = new Histogram({
  name: 'bullmq_job_duration_seconds',
  help: 'Job processing duration',
  labelNames: ['queue', 'job'],
  buckets: [0.1, 0.5, 1, 5, 10, 30, 60],
  registers: [registry],
});

// Обновление метрик
async function updateMetrics() {
  for (const [name, queue] of Object.entries(queues)) {
    const counts = await queue.getJobCounts();
    
    queueSizeGauge.set({ queue: name, status: 'waiting' }, counts.waiting);
    queueSizeGauge.set({ queue: name, status: 'active' }, counts.active);
    queueSizeGauge.set({ queue: name, status: 'failed' }, counts.failed);
    queueSizeGauge.set({ queue: name, status: 'delayed' }, counts.delayed);
  }
}

setInterval(updateMetrics, 10000); // Каждые 10 секунд

Последствия

Положительные

  • Надёжность: Redis persistence, retry механизм
  • Масштабируемость: горизонтальное масштабирование workers
  • Мониторинг: Bull Board, метрики Prometheus
  • Гибкость: приоритеты, delays, rate limiting

Отрицательные

  • Сложность: дополнительная инфраструктура
  • Отладка: сложнее трейсить асинхронные операции
  • Зависимость: Redis должен быть доступен

Митигация

ПроблемаРешение
Redis недоступенFallback на синхронную обработку для критичных задач
Worker упалPM2 автоперезапуск, multiple workers
Очередь переполненаАлерты при >1000 задач, rate limiting на входе

Связанные решения