Partizap MVP Design, v2.9
Date: 2026-01-24
Status: Approved
Approach: Fresh start, build to ADR spec with improvements.
Overview
Multi-vendor auto parts marketplace for St. Petersburg region. MVP focuses on core listing and search functionality.
MVP Scope
| Included | Deferred |
|---|---|
| Email auth (registration, login, verification) | Phone verification (SMS.ru) |
| Products (create, edit, view, search) | Messaging/chat |
| Year/Make/Model/Region search | Reviews & ratings |
| Admin panel (moderation, content management, analytics) | Reports/complaints |
| Personal account (profile, my ads) | Payments (YooKassa) |
| Background image processing | |
| Favorites | |
Tech Stack
- Framework: Slim 4 + PHP-DI
- ORM: Doctrine with PHP 8 attributes
- Database: PostgreSQL 16
- Cache/Queue: Redis
- Auth: PHP Sessions + HTTP-only cookies (replaces JWT)
- Jobs: Symfony Messenger with Redis transport
- Storage: S3-compatible (Selectel)
- Search: PostgreSQL FTS (Meilisearch migration path for >50K products)
Database Connection Pooling (PgBouncer)
PgBouncer sits between PHP-FPM and PostgreSQL as a connection pooler, so many short-lived PHP connections share a small number of PostgreSQL backends and stay within max_connections.
Why PgBouncer:
| Scenario | Without PgBouncer | With PgBouncer |
|---|---|---|
| 50 PHP-FPM workers | 50 connections held | 5-10 actual connections |
| 2 queue workers | +2 connections | Shared pool |
| Burst traffic | Connection exhaustion | Queued, stable |
| PostgreSQL default | max_connections=100 | Sufficient headroom |
PgBouncer Configuration:
ini
# docker/pgbouncer/pgbouncer.ini
[databases]
partizap = host=db port=5432 dbname=partizap
[pgbouncer]
listen_addr = 0.0.0.0
listen_port = 6432
auth_type = plain
auth_file = /etc/pgbouncer/userlist.txt
# Release the server connection after each transaction
pool_mode = transaction
# Client connections accepted from PHP-FPM and queue workers
max_client_conn = 200
# Actual PostgreSQL connections per user/database pair
default_pool_size = 20
# Extra connections for burst handling
reserve_pool_size = 5
# Hard limit per database
max_db_connections = 50
txt
# docker/pgbouncer/userlist.txt
"postgres" "secret"
Pool Mode Choice: transaction
| Mode | Connection Release | Use Case |
|---|---|---|
| session | On client disconnect | Long-lived connections, prepared statements |
| transaction | After COMMIT/ROLLBACK | Short requests (PHP), maximum reuse |
| statement | After each query | Read-only, no transactions |
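transaction mode suits PHP: requests are short (50-200 ms) and hold a server connection only for the duration of each transaction. This assumes Doctrine does not rely on server-side prepared statements; pdo_pgsql prepares statements on the server by default, so emulated prepares (PDO::ATTR_EMULATE_PREPARES) should be enabled on the connection.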
Features NOT available in transaction mode (not needed for MVP):
- LISTEN/NOTIFY: use Redis pub/sub instead
- SET session variables: not used
- Named prepared statements: Doctrine uses inline queries
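If the driver does issue server-side prepared statements (pdo_pgsql does so unless prepares are emulated), transaction pooling can route a later execute to a backend that never saw the prepare. A minimal sketch of connection parameters that avoid this, assuming Doctrine DBAL's `driverOptions` key and the standard `PDO::ATTR_EMULATE_PREPARES` attribute; the helper name `pooledConnectionParams` is hypothetical:

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: builds DBAL connection parameters for use behind
 * PgBouncer in transaction mode.
 */
function pooledConnectionParams(array $env): array
{
    return [
        'driver' => 'pdo_pgsql',
        'host' => $env['DB_HOST'],
        'port' => (int) $env['DB_PORT'],
        'dbname' => $env['DB_DATABASE'],
        'user' => $env['DB_USERNAME'],
        'password' => $env['DB_PASSWORD'],
        'driverOptions' => [
            // Emulate prepares on the client so no named server-side
            // statement has to survive beyond a single transaction.
            \PDO::ATTR_EMULATE_PREPARES => true,
        ],
    ];
}

$params = pooledConnectionParams([
    'DB_HOST' => 'pgbouncer',
    'DB_PORT' => '6432',
    'DB_DATABASE' => 'partizap',
    'DB_USERNAME' => 'postgres',
    'DB_PASSWORD' => 'secret',
]);
```

The resulting array can be passed to the DBAL connection factory in place of the plain `connection` array.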
Docker Compose (Production):
yaml
services:
  pgbouncer:
    image: edoburu/pgbouncer:1.21.0
    container_name: marketplace-pgbouncer
    restart: unless-stopped
    environment:
      DATABASE_URL: postgres://${DB_USERNAME}:${DB_PASSWORD}@db:5432/${DB_DATABASE}
      POOL_MODE: transaction
      MAX_CLIENT_CONN: 200
      DEFAULT_POOL_SIZE: 20
      RESERVE_POOL_SIZE: 5
    ports:
      - "6432:6432"
    networks:
      - marketplace
    depends_on:
      - db
Doctrine Configuration:
php
// config/doctrine.php
return [
    'connection' => [
        'driver' => 'pdo_pgsql',
        'host' => $_ENV['DB_HOST'], // 'pgbouncer' in production, 'db' in dev
        'port' => (int) $_ENV['DB_PORT'], // 6432 in production, 5432 in dev
        'dbname' => $_ENV['DB_DATABASE'],
        'user' => $_ENV['DB_USERNAME'],
        'password' => $_ENV['DB_PASSWORD'],
        'charset' => 'utf8',
    ],
];
Environment Variables:
bash
# .env.production
DB_HOST=pgbouncer
DB_PORT=6432
# .env.development (direct connection for easier debugging)
DB_HOST=db
DB_PORT=5432
Monitoring:
sql
-- Connect to the PgBouncer admin console from a shell
-- (the user must be listed in admin_users or stats_users):
--   psql -h pgbouncer -p 6432 -U postgres pgbouncer
-- Useful commands
SHOW POOLS;   -- Connection pool stats
SHOW CLIENTS; -- Connected clients
SHOW SERVERS; -- Backend connections
SHOW STATS;   -- Query statistics
Architecture
┌─────────────────────────────────────────────────────────────────┐
│                             CLIENTS                             │
│  Browser (Nuxt 3 SSR) ←──── HTTP-only session cookies           │
└──────────────────────────────┬──────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────────┐
│                       SLIM 4 APPLICATION                        │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐  ┌───────────┐  │
│  │   /store   │  │  /vendor   │  │   /admin   │  │   /auth   │  │
│  │  (public)  │  │  (seller)  │  │(superusers)│  │           │  │
│  └────────────┘  └────────────┘  └────────────┘  └───────────┘  │
│                              │                                  │
│                      APPLICATION LAYER                          │
│          (Services, DTOs, Application Exceptions)               │
│                              │                                  │
│                        DOMAIN LAYER                             │
│          (Entities, Repository Interfaces, Enums)               │
│                              │                                  │
│                    INFRASTRUCTURE LAYER                         │
│   (Doctrine Repos, Redis, S3, Messenger, Session Handler)       │
└──────────────────────────────┬──────────────────────────────────┘
                               │
        ┌──────────────────────┼──────────────────────┐
        ▼                      ▼                      ▼
 ┌──────────────┐       ┌──────────────┐       ┌──────────────┐
 │  PostgreSQL  │       │    Redis     │       │  S3 Storage  │
 └──────────────┘       └──────────────┘       └──────────────┘
Database Schema (per ADR partizap_DB-structure.json)
Key Design Decisions
- SERIAL primary keys (not UUID) — simpler, faster JOINs
- 4-level geo hierarchy: Region → City → District → Metro Station
- is_admin boolean instead of role enum
- Timestamps for verification: email_verified_at, phone_verified_at
- Dual OEM approach: oem_number field on product + M:N oem_numbers table
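The dual OEM approach depends on one canonical form per part number, since the schema stores a normalized value (no dashes or spaces) next to a display value. A minimal sketch of that normalization, assuming uppercase as the canonical case (as in the input validation rules); the function name `normalizeOem` is hypothetical:

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: strips whitespace and dashes and uppercases the rest,
 * producing the value stored in oem_numbers.oem.
 */
function normalizeOem(string $raw): string
{
    return strtoupper((string) preg_replace('/[\s\-]+/', '', $raw));
}

$normalized = normalizeOem(' 12-31 7 501-755 ');
```

The original user input, with its dashes, would be kept separately for oem_display.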
Car References
sql
car_makes (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
slug VARCHAR(100) NOT NULL,
logo_url VARCHAR(500),
is_popular BOOLEAN DEFAULT false
);
car_models (
id SERIAL PRIMARY KEY,
make_id INTEGER NOT NULL REFERENCES car_makes(id),
name VARCHAR(100) NOT NULL,
slug VARCHAR(100) NOT NULL,
UNIQUE(make_id, slug)
);
car_generations (
id SERIAL PRIMARY KEY,
model_id INTEGER NOT NULL REFERENCES car_models(id),
name VARCHAR(100) NOT NULL,
code VARCHAR(50), -- E46, W213, XV70
year_from INTEGER NOT NULL,
year_to INTEGER, -- NULL = current
steering VARCHAR(10) DEFAULT 'left' -- left|right|both
);
Geo Data (4 tables)
sql
regions (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
slug VARCHAR(100) NOT NULL,
lat DECIMAL(10,8),
lon DECIMAL(11,8)
);
cities (
id SERIAL PRIMARY KEY,
region_id INTEGER NOT NULL REFERENCES regions(id),
name VARCHAR(100) NOT NULL,
slug VARCHAR(100) NOT NULL,
lat DECIMAL(10,8) NOT NULL,
lon DECIMAL(11,8) NOT NULL,
UNIQUE(region_id, slug)
);
districts (
id SERIAL PRIMARY KEY,
city_id INTEGER NOT NULL REFERENCES cities(id),
name VARCHAR(100) NOT NULL,
slug VARCHAR(100) NOT NULL,
UNIQUE(city_id, slug)
);
metro_stations (
id SERIAL PRIMARY KEY,
city_id INTEGER NOT NULL REFERENCES cities(id),
line VARCHAR(100),
line_color VARCHAR(7),
name VARCHAR(100) NOT NULL,
lat DECIMAL(10,8),
lon DECIMAL(11,8),
UNIQUE(city_id, name)
);
Users & Auth
sql
users (
id SERIAL PRIMARY KEY,
email VARCHAR(255) NOT NULL UNIQUE,
phone VARCHAR(20) UNIQUE,
password_hash VARCHAR(255) NOT NULL,
account_type VARCHAR(20) DEFAULT 'personal', -- personal|business
email_verified_at TIMESTAMP,
phone_verified_at TIMESTAMP,
display_name VARCHAR(100),
avatar_url VARCHAR(500),
city_id INTEGER REFERENCES cities(id),
district_id INTEGER REFERENCES districts(id),
metro_station_id INTEGER REFERENCES metro_stations(id),
rating DECIMAL(3,2) DEFAULT 0,
reviews_count INTEGER DEFAULT 0,
products_count INTEGER DEFAULT 0,
is_active BOOLEAN DEFAULT true,
is_admin BOOLEAN DEFAULT false,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
user_sessions (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
token_hash VARCHAR(255) NOT NULL UNIQUE,
ip_address VARCHAR(45),
user_agent TEXT,
expires_at TIMESTAMP NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_activity_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
verification_codes (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
type VARCHAR(10) NOT NULL, -- email|phone
code VARCHAR(6) NOT NULL,
attempts INTEGER DEFAULT 0,
expires_at TIMESTAMP NOT NULL,
used_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
business_profiles (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL UNIQUE REFERENCES users(id) ON DELETE CASCADE,
company_name VARCHAR(200) NOT NULL,
inn VARCHAR(12),
address VARCHAR(500),
website VARCHAR(255),
working_hours VARCHAR(100),
is_verified BOOLEAN DEFAULT false,
verified_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Categories
sql
categories (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
slug VARCHAR(100) NOT NULL UNIQUE,
icon VARCHAR(100),
parent_id INTEGER REFERENCES categories(id),
category_type VARCHAR(20) DEFAULT 'part', -- part|condition|attribute
sort_order INTEGER DEFAULT 0,
is_active BOOLEAN DEFAULT true,
products_count INTEGER DEFAULT 0
);
Products
sql
products (
id SERIAL PRIMARY KEY,
title VARCHAR(255) NOT NULL,
description TEXT,
price DECIMAL(10,2) NOT NULL,
steering VARCHAR(15) DEFAULT 'universal', -- left|right|universal
oem_number VARCHAR(50), -- denormalized for quick lookup
manufacturer VARCHAR(100),
seller_id INTEGER NOT NULL REFERENCES users(id),
city_id INTEGER NOT NULL REFERENCES cities(id),
region_id INTEGER REFERENCES regions(id), -- DENORM: synced from city via trigger
primary_category_id INTEGER REFERENCES categories(id), -- DENORM: for search perf
district_id INTEGER REFERENCES districts(id),
metro_station_id INTEGER REFERENCES metro_stations(id),
address VARCHAR(255),
views_count INTEGER DEFAULT 0,
favorites_count INTEGER DEFAULT 0,
status VARCHAR(20) DEFAULT 'pending', -- draft|pending|active|sold|archived|rejected
rejection_reason TEXT,
is_available BOOLEAN DEFAULT true,
promoted_until TIMESTAMP,
search_vector TSVECTOR, -- Full-text search
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
published_at TIMESTAMP
);
-- Indexes (ADR defined)
CREATE INDEX idx_products_oem ON products(oem_number);
CREATE INDEX idx_products_seller ON products(seller_id);
CREATE INDEX idx_products_city ON products(city_id);
CREATE INDEX idx_products_price ON products(price);
CREATE INDEX idx_products_status ON products(status);
CREATE INDEX idx_products_steering ON products(steering);
CREATE INDEX idx_products_available ON products(is_available);
CREATE INDEX idx_products_created ON products(created_at);
-- Indexes (search optimization with denormalized columns)
CREATE INDEX idx_products_region_category_date
ON products(region_id, primary_category_id, created_at DESC, id DESC)
WHERE status = 'active';
CREATE INDEX idx_products_region_category_price_asc
ON products(region_id, primary_category_id, price ASC, id ASC)
WHERE status = 'active';
CREATE INDEX idx_products_region_category_price_desc
ON products(region_id, primary_category_id, price DESC, id DESC)
WHERE status = 'active';
CREATE INDEX idx_products_search ON products USING GIN(search_vector)
WHERE status = 'active';
-- Indexes for cursor-based pagination (fallback without filters)
CREATE INDEX idx_products_cursor_date ON products(created_at DESC, id DESC)
WHERE status = 'active';
CREATE INDEX idx_products_cursor_price_asc ON products(price ASC, id ASC)
WHERE status = 'active';
CREATE INDEX idx_products_cursor_price_desc ON products(price DESC, id DESC)
WHERE status = 'active';
product_categories (
id SERIAL PRIMARY KEY,
product_id INTEGER NOT NULL REFERENCES products(id) ON DELETE CASCADE,
category_id INTEGER NOT NULL REFERENCES categories(id),
is_primary BOOLEAN DEFAULT false,
UNIQUE(product_id, category_id)
);
-- Indexes (ADR defined)
CREATE INDEX idx_product_categories_product ON product_categories(product_id);
CREATE INDEX idx_product_categories_category ON product_categories(category_id);
product_images (
id SERIAL PRIMARY KEY,
product_id INTEGER NOT NULL REFERENCES products(id) ON DELETE CASCADE,
original_key VARCHAR(500) NOT NULL, -- S3 key in private bucket (not URL)
-- Dual format: WebP + JPEG fallback
thumbnail_webp VARCHAR(500),
thumbnail_jpeg VARCHAR(500),
medium_webp VARCHAR(500),
medium_jpeg VARCHAR(500),
large_webp VARCHAR(500),
large_jpeg VARCHAR(500),
status VARCHAR(20) DEFAULT 'processing', -- processing|ready|error
retry_count SMALLINT DEFAULT 0,
error_message TEXT, -- last error for debugging
is_primary BOOLEAN DEFAULT false,
sort_order INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
product_compatibility (
id SERIAL PRIMARY KEY,
product_id INTEGER NOT NULL REFERENCES products(id) ON DELETE CASCADE,
make_id INTEGER NOT NULL REFERENCES car_makes(id),
model_id INTEGER REFERENCES car_models(id),
generation_id INTEGER REFERENCES car_generations(id),
note TEXT
);
-- Indexes (ADR + recommended additions)
CREATE INDEX idx_compatibility_product ON product_compatibility(product_id);
CREATE INDEX idx_compatibility_make ON product_compatibility(make_id);
CREATE INDEX idx_compatibility_make_model ON product_compatibility(make_id, model_id);
CREATE INDEX idx_compatibility_generation ON product_compatibility(generation_id); -- ADDED
CREATE INDEX idx_compatibility_full ON product_compatibility(make_id, model_id, generation_id); -- ADDED
OEM Numbers
sql
oem_numbers (
id SERIAL PRIMARY KEY,
oem VARCHAR(50) NOT NULL UNIQUE, -- normalized (no dashes/spaces)
oem_display VARCHAR(50) NOT NULL, -- with dashes for display
manufacturer VARCHAR(100)
);
product_oem (
id SERIAL PRIMARY KEY,
product_id INTEGER NOT NULL REFERENCES products(id) ON DELETE CASCADE,
oem_number_id INTEGER NOT NULL REFERENCES oem_numbers(id),
is_primary BOOLEAN DEFAULT true,
UNIQUE(product_id, oem_number_id)
);
oem_cross_references (
id SERIAL PRIMARY KEY,
original_oem_id INTEGER NOT NULL REFERENCES oem_numbers(id),
analog_oem_id INTEGER NOT NULL REFERENCES oem_numbers(id),
quality_rating VARCHAR(20), -- OEM|OES|aftermarket|premium
confidence DECIMAL(3,2) DEFAULT 1,
source VARCHAR(50),
UNIQUE(original_oem_id, analog_oem_id)
);
Favorites (per ADR)
sql
favorites (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
product_id INTEGER NOT NULL REFERENCES products(id) ON DELETE CASCADE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE UNIQUE INDEX idx_favorites_user_product ON favorites(user_id, product_id);
Note: ADR specifies "No action" on delete; CASCADE used here for data integrity.
Analytics
sql
product_views (
id SERIAL PRIMARY KEY,
product_id INTEGER NOT NULL REFERENCES products(id) ON DELETE CASCADE,
user_id INTEGER REFERENCES users(id),
ip_address VARCHAR(45) NOT NULL,
user_agent TEXT,
referer VARCHAR(500),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
search_logs (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id),
query TEXT,
make_id INTEGER REFERENCES car_makes(id),
model_id INTEGER REFERENCES car_models(id),
category_id INTEGER REFERENCES categories(id),
city_id INTEGER REFERENCES cities(id),
filters JSONB,
results_count INTEGER,
ip_address VARCHAR(45),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
admin_audit_log (
id SERIAL PRIMARY KEY,
admin_id INTEGER NOT NULL REFERENCES users(id),
action VARCHAR(50) NOT NULL, -- user_ban, product_reject, etc.
target_type VARCHAR(50) NOT NULL, -- user, product, category
target_id INTEGER NOT NULL,
details JSONB, -- Action-specific data
ip_address VARCHAR(45) NOT NULL,
user_agent TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_audit_admin ON admin_audit_log(admin_id);
CREATE INDEX idx_audit_target ON admin_audit_log(target_type, target_id);
CREATE INDEX idx_audit_action ON admin_audit_log(action);
CREATE INDEX idx_audit_created ON admin_audit_log(created_at);
Admin actions that require audit logging:
- User ban/unban, role changes
- Product approve/reject/force delete
- Category/reference data changes
- Any bulk operations
Denormalized Counters (Race Condition Protection)
The schema keeps denormalized counters for performance. Concurrent updates must be applied atomically; a read-modify-write cycle in application code can otherwise lose increments.
Counter Inventory
| Table | Counter | Update Frequency | Accuracy Need |
|---|---|---|---|
| products | views_count | Very high | Approximate OK |
| products | favorites_count | Medium | Exact |
| users | products_count | Low | Exact |
| users | reviews_count | Low | Exact |
| users | rating | Low | Exact |
| categories | products_count | Low | Exact |
Strategy by Frequency
High-frequency counters (views) — Redis buffer + periodic sync:
┌─────────┐     INCR      ┌─────────┐    Cron 1min    ┌───────────┐
│ Request │ ────────────▶ │  Redis  │ ──────────────▶ │ PostgreSQL│
└─────────┘               │  DB 1   │                 └───────────┘
                          └─────────┘
Key: views:{product_id}
Sync: atomic get-and-delete (Lua script) returns the buffered value and removes the key
php
// On view
$this->redis->cache()->incr("views:{$productId}");
// Cron job (every minute) — uses SCAN to avoid blocking Redis
// Lua script for atomic get-and-delete (prevents race condition if multiple syncs run)
$luaScript = <<<'LUA'
local val = redis.call('GET', KEYS[1])
if val then
redis.call('DEL', KEYS[1])
return val
end
return nil
LUA;
// Assumes a Predis-style client: scan() returns [cursor, keys] with a string cursor,
// and eval() takes the script, the number of keys, then the keys themselves.
$cursor = 0;
do {
    [$cursor, $keys] = $this->redis->cacheWrite()->scan($cursor, ['MATCH' => 'views:*', 'COUNT' => 100]);
    $cursor = (int) $cursor; // "0" means the scan is complete
    foreach ($keys as $key) {
        $productId = (int) str_replace('views:', '', $key);
        $count = (int) $this->redis->cacheWrite()->eval($luaScript, 1, $key);
        if ($count > 0) {
            $this->db->executeStatement(
                'UPDATE products SET views_count = views_count + ? WHERE id = ?',
                [$count, $productId]
            );
        }
    }
} while ($cursor !== 0);
Medium-frequency counters (favorites), atomic SQL:
php
// Add favorite
$this->db->transactional(function () use ($userId, $productId) {
    // executeStatement() returns the number of affected rows:
    // 1 if the favorite was inserted, 0 if it already existed
    $inserted = $this->db->executeStatement(
        'INSERT INTO favorites (user_id, product_id) VALUES (?, ?)
         ON CONFLICT DO NOTHING',
        [$userId, $productId]
    );
    // Only increment if the insert happened
    if ($inserted > 0) {
        $this->db->executeStatement(
            'UPDATE products SET favorites_count = favorites_count + 1 WHERE id = ?',
            [$productId]
        );
    }
});
Low-frequency counters (products_count, reviews), atomic SQL with a single statement:
php
// On product publish
$this->db->executeStatement(
'UPDATE users SET products_count = products_count + 1 WHERE id = ?',
[$sellerId]
);
$this->db->executeStatement(
'UPDATE categories SET products_count = products_count + 1 WHERE id = ?',
[$categoryId]
);
Calculated counters (rating), atomic recalculation:
php
// On review submit
$this->db->transactional(function () use ($sellerId, $rating) {
// Insert review first
$this->db->executeStatement(
'INSERT INTO reviews (seller_id, rating, ...) VALUES (?, ?, ...)',
[$sellerId, $rating, ...]
);
// Atomic recalculate from source of truth
$this->db->executeStatement(
'UPDATE users SET
reviews_count = (SELECT COUNT(*) FROM reviews WHERE seller_id = ?),
rating = (SELECT COALESCE(AVG(rating), 0) FROM reviews WHERE seller_id = ?)
WHERE id = ?',
[$sellerId, $sellerId, $sellerId]
);
});
Why Not SELECT FOR UPDATE?
sql
-- DON'T: Lock contention under load
BEGIN;
SELECT products_count FROM users WHERE id = 1 FOR UPDATE; -- Blocks others
UPDATE users SET products_count = 5 WHERE id = 1;
COMMIT;
-- DO: Atomic increment (no lock contention)
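UPDATE users SET products_count = products_count + 1 WHERE id = 1;
PostgreSQL's UPDATE ... SET col = col + 1 is atomic: it takes a row-level lock internally but does not block concurrent reads. Under the default READ COMMITTED isolation level, a concurrent writer to the same row waits for the lock and then re-reads the committed value, so no increment is lost.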
Consistency Recovery
Cron job to fix any drift (weekly):
php
// Recalculate products_count for all users
$this->db->executeStatement('
UPDATE users u SET products_count = (
SELECT COUNT(*) FROM products p
WHERE p.seller_id = u.id AND p.status = \'active\'
)
');
// Recalculate category counts
$this->db->executeStatement('
UPDATE categories c SET products_count = (
SELECT COUNT(*) FROM product_categories pc
JOIN products p ON p.id = pc.product_id
WHERE pc.category_id = c.id AND p.status = \'active\'
)
');
Authentication (Security Hardened)
Session-based auth with Redis storage and multi-layer security.
Core Auth
- Session cookie: PARTIZAP_SESSION, HttpOnly, Secure, SameSite=Lax
- CSRF: Signed Double Submit Cookie (HMAC-SHA256 with session binding)
- Password hashing: Argon2id (memory: 64MB, time: 4, threads: 3)
- Session data: user_id, email, is_admin, account_type, verified flags
- Database tracking: user_sessions + login_attempts tables
- Session fixation protection: Regenerate session ID on login/logout
php
// In LoginAction — prevent session fixation attacks
public function __invoke(Request $request, Response $response): Response
{
// ... validate credentials ...
// Regenerate session ID before storing user data
session_regenerate_id(true);
// Store user in session
$_SESSION['user_id'] = $user->getId();
$_SESSION['email'] = $user->getEmail();
// ...
// Create database session record with new session ID
$this->sessionService->create($user, session_id());
return $response;
}
// In LogoutAction — destroy session completely
public function __invoke(Request $request, Response $response): Response
{
$this->sessionService->destroy(session_id());
// Clear session data
$_SESSION = [];
// Delete session cookie
if (ini_get('session.use_cookies')) {
$params = session_get_cookie_params();
setcookie(session_name(), '', time() - 42000, $params['path'],
$params['domain'], $params['secure'], $params['httponly']);
}
session_destroy();
return $response;
}
Rate Limiting (Redis)
| Endpoint | Limit | Window | Key |
|---|---|---|---|
| POST /auth/login | 5 attempts | 15 min | IP + email |
| POST /auth/register | 3 attempts | 1 hour | IP |
| POST /auth/forgot-password | 3 attempts | 1 hour | IP + email |
| POST /auth/verify-* | 5 attempts | 15 min | IP + user_id |
| Global API (auth'd) | 100 req | 1 min | user_id |
| Global API (anon) | 30 req | 1 min | IP |
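The limits above can be enforced with a fixed-window counter. The sketch below shows the logic with an in-memory store so that it runs on its own; in production the store would presumably be Redis (INCR on the key, EXPIRE when the counter is first created). The class names `InMemoryCounterStore` and `FixedWindowRateLimiter` are hypothetical:

```php
<?php
declare(strict_types=1);

/** Hypothetical in-memory stand-in for Redis INCR + EXPIRE. */
final class InMemoryCounterStore
{
    /** @var array<string, array{count: int, expiresAt: int}> */
    private array $entries = [];

    /** Increments the counter, starting a new window if the old one expired. */
    public function increment(string $key, int $windowSeconds, int $now): int
    {
        $entry = $this->entries[$key] ?? null;
        if ($entry === null || $entry['expiresAt'] <= $now) {
            $entry = ['count' => 0, 'expiresAt' => $now + $windowSeconds];
        }
        $entry['count']++;
        $this->entries[$key] = $entry;

        return $entry['count'];
    }
}

final class FixedWindowRateLimiter
{
    public function __construct(
        private InMemoryCounterStore $store,
        private int $limit,
        private int $windowSeconds,
    ) {}

    /** Returns true while the caller is within the limit for the current window. */
    public function allow(string $key, int $now): bool
    {
        return $this->store->increment($key, $this->windowSeconds, $now) <= $this->limit;
    }
}

// POST /auth/login: 5 attempts per 15 minutes, keyed by IP + email
$limiter = new FixedWindowRateLimiter(new InMemoryCounterStore(), 5, 15 * 60);
$key = 'login:' . hash('sha256', '203.0.113.7|user@example.com');

$results = [];
for ($i = 0; $i < 6; $i++) {
    $results[] = $limiter->allow($key, 1000);
}
```

Hashing the IP + email pair keeps the key length bounded and avoids storing raw addresses in key names.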
Brute Force Protection
| Failed Attempts | Action |
|---|---|
| 1-3 | Normal response |
| 4-5 | 2 second delay |
| 6-7 | 5 second delay + CAPTCHA |
| 8-9 | 10 second delay + CAPTCHA |
| 10+ | Account locked 30 min + email alert |
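The escalation table can be expressed as a single lookup from the failed-attempt count to a response policy. A minimal sketch; the function name `bruteForcePolicy` and the shape of the returned array are assumptions:

```php
<?php
declare(strict_types=1);

/**
 * Maps a failed-attempt count to the response policy from the table above.
 *
 * @return array{delaySeconds: int, captcha: bool, lockMinutes: int}
 */
function bruteForcePolicy(int $failedAttempts): array
{
    return match (true) {
        // 10+: lock the account (the email alert is sent by the caller)
        $failedAttempts >= 10 => ['delaySeconds' => 0, 'captcha' => false, 'lockMinutes' => 30],
        $failedAttempts >= 8 => ['delaySeconds' => 10, 'captcha' => true, 'lockMinutes' => 0],
        $failedAttempts >= 6 => ['delaySeconds' => 5, 'captcha' => true, 'lockMinutes' => 0],
        $failedAttempts >= 4 => ['delaySeconds' => 2, 'captcha' => false, 'lockMinutes' => 0],
        default => ['delaySeconds' => 0, 'captcha' => false, 'lockMinutes' => 0],
    };
}

$policy = bruteForcePolicy(7);
```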
Password Requirements
- Length: 8-128 characters
- Must contain: uppercase, lowercase, digit, special character
- Blocklist: 10k common passwords
- No 3+ sequential chars (abc, 123)
- No 3+ repeated chars (aaa, 111)
- Cannot contain email/username
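These rules can be checked in one pass that reports every violated rule. The sketch below assumes ASCII passwords, treats "sequential" as three ascending letters or digits, and uses a `$blocklist` array in place of the 10k common-password list; the function and rule names are hypothetical:

```php
<?php
declare(strict_types=1);

/**
 * Returns the names of violated rules; an empty array means the password passes.
 */
function passwordViolations(string $password, string $email, array $blocklist = []): array
{
    $violations = [];
    $lower = strtolower($password);
    $length = strlen($password); // byte length; this sketch assumes ASCII input

    if ($length < 8 || $length > 128) {
        $violations[] = 'length';
    }
    $classes = [
        'uppercase' => '/[A-Z]/',
        'lowercase' => '/[a-z]/',
        'digit' => '/[0-9]/',
        'special' => '/[^A-Za-z0-9]/',
    ];
    foreach ($classes as $rule => $pattern) {
        if (preg_match($pattern, $password) !== 1) {
            $violations[] = $rule;
        }
    }
    if (in_array($lower, $blocklist, true)) {
        $violations[] = 'blocklist';
    }
    // Three identical characters in a row (aaa, 111)
    if (preg_match('/(.)\1\1/', $password) === 1) {
        $violations[] = 'repeated';
    }
    // Three ascending letters or digits in a row (abc, 123)
    for ($i = 0; $i + 2 < $length; $i++) {
        $run = substr($lower, $i, 3);
        if (ctype_alnum($run)
            && ord($run[1]) === ord($run[0]) + 1
            && ord($run[2]) === ord($run[1]) + 1) {
            $violations[] = 'sequential';
            break;
        }
    }
    // Local part of the email (before "@"), ignored when shorter than 3 characters
    $local = strtolower(explode('@', $email)[0]);
    if (strlen($local) >= 3 && str_contains($lower, $local)) {
        $violations[] = 'contains_email';
    }

    return $violations;
}

$ok = passwordViolations('Tr0ub4dor&Kx', 'anna@example.com');
```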
Signed Double Submit CSRF
csrf_token = random_bytes(32)
csrf_signature = HMAC-SHA256(csrf_token, session_id, secret_key)
csrf_cookie = base64(csrf_token || '.' || csrf_signature)
Set-Cookie: CSRF_TOKEN={csrf_cookie}; Secure; SameSite=Lax
Client sends: X-CSRF-TOKEN: {csrf_cookie}
Server validates: signature + session binding
Note: Both session and CSRF cookies use SameSite=Lax (not Strict) to allow top-level navigations from external sites (e.g., email links, OAuth redirects).
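The signed double submit scheme can be sketched with PHP's built-in `hash_hmac` and `hash_equals`. Two details here are assumptions, not part of the spec above: the random token is hex-encoded so that the '.' separator is unambiguous, and the token and session ID are joined with '|' before signing. The function names are hypothetical:

```php
<?php
declare(strict_types=1);

/** Builds the cookie value: base64(token . '.' . signature). */
function issueCsrfCookie(string $sessionId, string $secretKey): string
{
    $token = bin2hex(random_bytes(32));
    $signature = hash_hmac('sha256', $token . '|' . $sessionId, $secretKey);

    return base64_encode($token . '.' . $signature);
}

/** Checks that header and cookie match and that the signature binds to this session. */
function isValidCsrfRequest(string $cookie, string $header, string $sessionId, string $secretKey): bool
{
    if (!hash_equals($cookie, $header)) {
        return false;
    }
    $decoded = base64_decode($cookie, true);
    if ($decoded === false || substr_count($decoded, '.') !== 1) {
        return false;
    }
    [$token, $signature] = explode('.', $decoded, 2);
    $expected = hash_hmac('sha256', $token . '|' . $sessionId, $secretKey);

    // Constant-time comparison to avoid leaking signature bytes via timing
    return hash_equals($expected, $signature);
}

$cookie = issueCsrfCookie('session-abc', 'app-secret');
```

Because the signature covers the session ID, a token issued for one session is rejected when replayed under another.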
Security Headers (Middleware)
X-Frame-Options: DENY
X-Content-Type-Options: nosniff
X-XSS-Protection: 1; mode=block
Referrer-Policy: strict-origin-when-cross-origin
Permissions-Policy: geolocation=(), microphone=(), camera=()
Strict-Transport-Security: max-age=31536000; includeSubDomains; preload
Content-Security-Policy: default-src 'none'; script-src 'self'; ...
CORS Configuration
Required for Nuxt 3 SSR frontend calling API from browser:
php
// config/cors.php
return [
'allowed_origins' => [
$_ENV['FRONTEND_URL'], // https://partizap.ru
'http://localhost:3000', // Development
],
'allowed_methods' => ['GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS'],
'allowed_headers' => ['Content-Type', 'Accept', 'X-CSRF-TOKEN', 'X-Request-Id'],
'exposed_headers' => ['X-Request-Id'],
'max_age' => 86400, // Preflight cache 24h
'credentials' => true, // Allow cookies
];
php
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface as Request;
use Psr\Http\Server\MiddlewareInterface;
use Psr\Http\Server\RequestHandlerInterface as RequestHandler;
use Slim\Psr7\Response; // Assumes the slim/psr7 implementation

final class CorsMiddleware implements MiddlewareInterface
{
    public function __construct(private array $config) {}

    public function process(Request $request, RequestHandler $handler): ResponseInterface
    {
        $origin = $request->getHeaderLine('Origin');
        // Unknown or missing origin: respond without CORS headers
        if (!in_array($origin, $this->config['allowed_origins'], true)) {
            if ($request->getMethod() === 'OPTIONS') {
                return new Response(204);
            }
            return $handler->handle($request);
        }
        // Handle preflight
        if ($request->getMethod() === 'OPTIONS') {
            return (new Response(204))
                ->withHeader('Access-Control-Allow-Origin', $origin)
                ->withHeader('Access-Control-Allow-Methods', implode(', ', $this->config['allowed_methods']))
                ->withHeader('Access-Control-Allow-Headers', implode(', ', $this->config['allowed_headers']))
                ->withHeader('Access-Control-Max-Age', (string) $this->config['max_age'])
                ->withHeader('Access-Control-Allow-Credentials', 'true')
                ->withAddedHeader('Vary', 'Origin');
        }
        $response = $handler->handle($request);
        // Vary: Origin keeps shared caches from reusing a response across origins
        return $response
            ->withHeader('Access-Control-Allow-Origin', $origin)
            ->withHeader('Access-Control-Allow-Credentials', 'true')
            ->withHeader('Access-Control-Expose-Headers', implode(', ', $this->config['exposed_headers']))
            ->withAddedHeader('Vary', 'Origin');
    }
}
Input Validation (Middleware)
- Content-Type validation for POST/PUT/PATCH (must be application/json)
- Request size limit (10 MB)
- JSON body sanitization (null bytes, trim, length limit)
- Accept header validation
Validation rules per field type:
| Field Type | Validation |
|---|---|
| email | Valid format, max 255 chars, lowercase normalized |
| password | 8-128 chars, complexity rules (see Password Requirements) |
| phone | E.164 format, Russian numbers only |
| title | 3-255 chars, no HTML, trim whitespace |
| description | Max 10,000 chars, allow limited markdown |
| price | Positive decimal, max 99,999,999.99 |
| OEM number | Alphanumeric + dashes, max 50 chars, uppercase normalized |
| slug | Lowercase alphanumeric + dashes, max 100 chars |
| IDs (foreign keys) | Positive integers, must exist in referenced table |
| cursor | Valid base64, valid HMAC signature |
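The cursor rule (valid base64, valid HMAC signature) implies that the server signs the pagination position before handing it to the client. A minimal sketch of one possible encoding, assuming a JSON payload and a hex HMAC-SHA256 signature joined by '.'; the function names and the payload fields are hypothetical:

```php
<?php
declare(strict_types=1);

/** Encodes a pagination position as base64(signature . '.' . json). */
function encodeCursor(array $position, string $secretKey): string
{
    $payload = json_encode($position, JSON_THROW_ON_ERROR);
    $signature = hash_hmac('sha256', $payload, $secretKey);

    return base64_encode($signature . '.' . $payload);
}

/** Returns the position, or null if the cursor is malformed or was tampered with. */
function decodeCursor(string $cursor, string $secretKey): ?array
{
    $decoded = base64_decode($cursor, true);
    if ($decoded === false || !str_contains($decoded, '.')) {
        return null;
    }
    // The hex signature contains no '.', so splitting at the first one is safe
    [$signature, $payload] = explode('.', $decoded, 2);
    if (!hash_equals(hash_hmac('sha256', $payload, $secretKey), $signature)) {
        return null;
    }
    try {
        $position = json_decode($payload, true, 8, JSON_THROW_ON_ERROR);
    } catch (\JsonException) {
        return null;
    }

    return is_array($position) ? $position : null;
}

$cursor = encodeCursor(['created_at' => '2026-01-24T10:00:00Z', 'id' => 12345], 'cursor-secret');
$position = decodeCursor($cursor, 'cursor-secret');
```

A decoded position of this shape would map onto the (created_at DESC, id DESC) keyset indexes defined for products.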
Validation library: respect/validation or symfony/validator
php
// Example: Product creation validation (respect/validation)
use Respect\Validation\Validator as v;
final class CreateProductValidator
{
public function validate(array $data): ValidationResult
{
$rules = [
'title' => v::stringType()->length(3, 255)->notBlank(),
'description' => v::optional(v::stringType()->length(0, 10000)),
'price' => v::number()->positive()->max(99999999.99),
'city_id' => v::intVal()->positive(),
'category_ids' => v::arrayType()->each(v::intVal()->positive()),
'oem_number' => v::optional(v::alnum('-')->length(1, 50)),
'steering' => v::in(['left', 'right', 'universal']),
];
// ... validate and return result
}
}
Additional Tables
sql
-- Add to users table
ALTER TABLE users ADD COLUMN failed_login_attempts INTEGER DEFAULT 0;
ALTER TABLE users ADD COLUMN locked_until TIMESTAMP;
ALTER TABLE users ADD COLUMN last_failed_login_at TIMESTAMP;
-- Login audit log (internal security monitoring)
CREATE TABLE login_attempts (
id SERIAL PRIMARY KEY,
email VARCHAR(255) NOT NULL,
ip_address VARCHAR(45) NOT NULL,
user_agent TEXT,
success BOOLEAN NOT NULL,
failure_reason VARCHAR(50), -- invalid_password|user_not_found|locked|rate_limited
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_login_attempts_email ON login_attempts(email);
CREATE INDEX idx_login_attempts_ip ON login_attempts(ip_address);
CREATE INDEX idx_login_attempts_created ON login_attempts(created_at);
User enumeration prevention:
- Login returns same error message for invalid email and invalid password
- Rate limiting applies before checking if user exists
- login_attempts is for internal security monitoring only, never exposed via API
- forgot-password always returns success message regardless of email existence
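An identical error message does not help if an unknown email answers noticeably faster than a known one. One common complement, shown here as an assumption that goes beyond the rules listed above, is to always run a password verification, using a dummy hash when no user exists. The function name `verifyCredentials` is hypothetical, and `PASSWORD_DEFAULT` is used only so the sketch runs without Argon2 support:

```php
<?php
declare(strict_types=1);

/**
 * Verifies a password while doing the same hashing work for unknown users.
 *
 * @param string|null $storedHash Hash of the found user, or null if no user matched
 * @param string      $dummyHash  Precomputed hash used when the user does not exist
 */
function verifyCredentials(?string $storedHash, string $password, string $dummyHash): bool
{
    $matches = password_verify($password, $storedHash ?? $dummyHash);

    // A match against the dummy hash must never authenticate anyone
    return $storedHash !== null && $matches;
}

$dummyHash = password_hash('dummy-password', PASSWORD_DEFAULT);
$userHash = password_hash('Tr0ub4dor&Kx', PASSWORD_DEFAULT);
$loggedIn = verifyCredentials($userHash, 'Tr0ub4dor&Kx', $dummyHash);
```

The dummy hash should be created with the same algorithm and cost as real user hashes so that both paths take comparable time.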
Auth Endpoints
POST /auth/register → Create user, start session
POST /auth/login → Verify password, start session
POST /auth/logout → Clear session
POST /auth/logout-all → Revoke all sessions
GET /auth/me → Current user info
POST /auth/verify-email → Verify with code
POST /auth/resend-verification → Resend verification email
POST /auth/forgot-password → Send reset email
POST /auth/reset-password → Reset with token
Password Reset Flow
Secure password reset with single-use, time-limited tokens:
sql
-- Add to verification_codes or create separate table
password_reset_tokens (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
token_hash VARCHAR(255) NOT NULL, -- SHA256 hash of token
expires_at TIMESTAMP NOT NULL,
used_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Security properties:
- Token expires in 1 hour
- Single-use (delete or mark used after successful reset)
- Token stored as SHA256 hash (not plaintext)
- Same response for valid/invalid email (prevent enumeration)
- Invalidates all active sessions after password change
php
// POST /auth/forgot-password
public function forgotPassword(string $email): Response
{
$user = $this->userRepository->findByEmail($email);
if ($user !== null) {
// Generate cryptographically secure token
$token = bin2hex(random_bytes(32));
$tokenHash = hash('sha256', $token);
// Store hashed token with 1-hour expiry
$this->resetTokenRepository->create([
'user_id' => $user->getId(),
'token_hash' => $tokenHash,
'expires_at' => new \DateTime('+1 hour'),
]);
// Queue email with plaintext token
$this->queue->dispatch(new SendEmailMessage(
type: 'password_reset',
user: $user,
data: ['token' => $token]
));
}
// Always same response (prevent user enumeration)
return $this->json(['message' => 'If email exists, reset link sent']);
}
// POST /auth/reset-password
public function resetPassword(string $token, string $newPassword): Response
{
$tokenHash = hash('sha256', $token);
$resetToken = $this->resetTokenRepository->findValidByHash($tokenHash);
if ($resetToken === null) {
return $this->json(['error' => 'Invalid or expired token'], 400);
}
// Update password
$user = $resetToken->getUser();
$user->setPasswordHash(password_hash($newPassword, PASSWORD_ARGON2ID, $this->argonOptions));
// Mark token as used
$resetToken->setUsedAt(new \DateTime());
// Invalidate all sessions (logout everywhere)
$this->sessionService->destroyAllForUser($user->getId());
$this->em->flush();
return $this->json(['message' => 'Password updated successfully']);
}
API Structure
Public /store
GET /store/products List with filters
GET /store/products/:id Product details
GET /store/products/search Full-text search
GET /store/cars/makes Car makes
GET /store/cars/makes/:id/models Models for make
GET /store/cars/models/:id/generations Generations for model
GET /store/categories Category tree
GET /store/geo/regions Regions
GET /store/geo/regions/:id/cities Cities
GET /store/geo/cities/:id/districts Districts
GET /store/geo/cities/:id/metro Metro stations
GET /store/sellers/:id Seller profile
GET /store/sellers/:id/products Seller's products
Vendor /vendor (auth required)
GET /vendor/me My profile
PUT /vendor/me Update profile
POST /vendor/me/avatar Upload avatar
GET /vendor/products My products
POST /vendor/products Create product
GET /vendor/products/:id Get my product
PUT /vendor/products/:id Update my product
DELETE /vendor/products/:id Delete product
POST /vendor/products/:id/publish Submit for moderation
POST /vendor/products/:id/images Upload image
PUT /vendor/products/:id/images/order Reorder images
DELETE /vendor/products/:id/images/:imgId Delete image
GET /vendor/sessions List my sessions
DELETE /vendor/sessions/:id Revoke session
Admin /admin (is_admin required)
GET /admin/stats Dashboard overview
GET /admin/stats/products Products statistics
GET /admin/stats/users Users statistics
GET /admin/users List users (paginated)
GET /admin/users/:id User details
PUT /admin/users/:id Update/ban user
DELETE /admin/users/:id Soft delete user
GET /admin/products List all products
GET /admin/products/pending Pending moderation
PUT /admin/products/:id/approve Approve product
PUT /admin/products/:id/reject Reject with reason
DELETE /admin/products/:id Force delete
POST /admin/cars/makes Create make
PUT /admin/cars/makes/:id Update make
DELETE /admin/cars/makes/:id Delete make
POST /admin/cars/models Create model
PUT /admin/cars/models/:id Update model
DELETE /admin/cars/models/:id Delete model
POST /admin/cars/generations Create generation
PUT /admin/cars/generations/:id Update generation
DELETE /admin/cars/generations/:id Delete generation
POST /admin/categories Create category
PUT /admin/categories/:id Update category
DELETE /admin/categories/:id Delete category
POST /admin/geo/regions Create region
PUT /admin/geo/regions/:id Update region
POST /admin/geo/cities Create city
PUT /admin/geo/cities/:id Update city
POST /admin/geo/districts Create district
PUT /admin/geo/districts/:id Update district
POST /admin/geo/metro Create metro station
PUT /admin/geo/metro/:id Update metro station
API Documentation (OpenAPI/Swagger)
Auto-generated API documentation using OpenAPI 3.0 specification with Swagger UI.
Stack
| Component | Package | Purpose |
|---|---|---|
| Spec generator | zircote/swagger-php | PHP attributes → OpenAPI JSON |
| UI | swagger-api/swagger-ui | Interactive documentation |
| Validation | league/openapi-psr7-validator | Request/response validation |
PHP Attributes Approach
php
use OpenApi\Attributes as OA;
#[OA\Info(
version: '1.0.0',
title: 'Partizap API',
description: 'Auto parts marketplace API for St. Petersburg region'
)]
#[OA\Server(url: 'https://api.partizap.ru', description: 'Production')]
#[OA\Server(url: 'http://localhost:8000', description: 'Development')]
class OpenApiSpec {}
Endpoint Documentation Example
php
#[OA\Get(
path: '/store/products',
summary: 'List products with filters',
tags: ['Store'],
parameters: [
new OA\Parameter(
name: 'region_id',
in: 'query',
required: false,
schema: new OA\Schema(type: 'integer'),
description: 'Filter by region'
),
new OA\Parameter(
name: 'make_id',
in: 'query',
required: false,
schema: new OA\Schema(type: 'integer'),
description: 'Filter by car make'
),
new OA\Parameter(
name: 'cursor',
in: 'query',
required: false,
schema: new OA\Schema(type: 'string'),
description: 'Pagination cursor (base64)'
),
],
responses: [
new OA\Response(
response: 200,
description: 'Product list',
content: new OA\JsonContent(ref: '#/components/schemas/ProductListResponse')
),
new OA\Response(response: 422, description: 'Validation error'),
]
)]
final class ListProductsAction extends Action { ... }
Schema Definitions
php
#[OA\Schema(schema: 'Product', type: 'object')]
final class ProductSchema
{
#[OA\Property(type: 'integer', example: 12345)]
public int $id;
#[OA\Property(type: 'string', example: 'Генератор BMW E46')]
public string $title;
#[OA\Property(type: 'number', format: 'float', example: 15000.00)]
public float $price;
#[OA\Property(type: 'string', enum: ['left', 'right', 'universal'])]
public string $steering;
#[OA\Property(type: 'string', enum: ['draft', 'pending', 'active', 'sold', 'archived', 'rejected'])]
public string $status;
#[OA\Property(ref: '#/components/schemas/Seller')]
public object $seller;
#[OA\Property(type: 'array', items: new OA\Items(ref: '#/components/schemas/ProductImage'))]
public array $images;
}
#[OA\Schema(schema: 'ProductListResponse', type: 'object')]
final class ProductListResponseSchema
{
#[OA\Property(type: 'array', items: new OA\Items(ref: '#/components/schemas/Product'))]
public array $data;
#[OA\Property(ref: '#/components/schemas/CursorMeta')]
public object $meta;
}
#[OA\Schema(schema: 'CursorMeta', type: 'object')]
final class CursorMetaSchema
{
#[OA\Property(type: 'integer', example: 20)]
public int $per_page;
#[OA\Property(type: 'boolean', example: true)]
public bool $has_more;
#[OA\Property(type: 'string', nullable: true)]
public ?string $next_cursor;
#[OA\Property(type: 'string', nullable: true)]
public ?string $prev_cursor;
}
Security Schemes
php
#[OA\SecurityScheme(
securityScheme: 'sessionAuth',
type: 'apiKey',
in: 'cookie',
name: 'PARTIZAP_SESSION',
description: 'Session cookie (HttpOnly)'
)]
#[OA\SecurityScheme(
securityScheme: 'csrfToken',
type: 'apiKey',
in: 'header',
name: 'X-CSRF-TOKEN',
description: 'CSRF token for state-changing requests'
)]
class SecuritySchemes {}
Generation Command
php
// bin/console openapi:generate
final class GenerateOpenApiCommand extends Command
{
protected function execute(InputInterface $input, OutputInterface $output): int
{
$openapi = \OpenApi\Generator::scan([
__DIR__ . '/../../app/Actions',
__DIR__ . '/../../app/Application/DTO',
]);
file_put_contents(
__DIR__ . '/../../public/openapi.json',
$openapi->toJson()
);
$output->writeln('OpenAPI spec generated: public/openapi.json');
return Command::SUCCESS;
}
}
Routes
php
// config/routes.php
// Swagger UI (dev only)
if ($settings['app']['env'] === 'development') {
$app->get('/docs', function ($request, $response) {
$html = <<<HTML
<!DOCTYPE html>
<html>
<head>
<title>Partizap API Docs</title>
<link rel="stylesheet" href="https://unpkg.com/swagger-ui-dist@5/swagger-ui.css">
</head>
<body>
<div id="swagger-ui"></div>
<script src="https://unpkg.com/swagger-ui-dist@5/swagger-ui-bundle.js"></script>
<script>
SwaggerUIBundle({
url: '/openapi.json',
dom_id: '#swagger-ui',
presets: [SwaggerUIBundle.presets.apis],
requestInterceptor: (req) => {
// Auto-attach CSRF token from cookie
const csrf = document.cookie.match(/CSRF_TOKEN=([^;]+)/);
if (csrf) req.headers['X-CSRF-TOKEN'] = csrf[1];
return req;
}
});
</script>
</body>
</html>
HTML;
$response->getBody()->write($html);
return $response->withHeader('Content-Type', 'text/html');
});
$app->get('/openapi.json', function ($request, $response) {
return $response
->withHeader('Content-Type', 'application/json')
->withBody(new \Slim\Psr7\Stream(fopen(__DIR__ . '/../public/openapi.json', 'r')));
});
}
Request Validation Middleware (Optional)
php
use League\OpenAPIValidation\PSR7\ValidatorBuilder;
final class OpenApiValidationMiddleware implements MiddlewareInterface
{
private \League\OpenAPIValidation\PSR7\RequestValidator $validator;
public function __construct(string $specPath)
{
$this->validator = (new ValidatorBuilder())
->fromJsonFile($specPath)
->getRequestValidator();
}
public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
{
try {
$this->validator->validate($request);
} catch (\League\OpenAPIValidation\PSR7\Exception\ValidationFailed $e) {
return new JsonResponse([
'error' => 'validation_failed',
'message' => $e->getPrevious()?->getMessage() ?? $e->getMessage(),
], 422);
}
return $handler->handle($request);
}
}
Directory Structure
app/
├── Actions/
│   ├── Auth/
│   │   └── LoginAction.php          # #[OA\Post] attributes
│   ├── Store/
│   │   └── ListProductsAction.php   # #[OA\Get] attributes
│   └── ...
├── Application/
│   └── DTO/
│       └── OpenApi/                 # Schema definitions
│           ├── ProductSchema.php
│           ├── UserSchema.php
│           └── ...
public/
└── openapi.json                     # Generated spec
CI Integration
yaml
# .github/workflows/ci.yml
- name: Generate OpenAPI spec
  run: php bin/console openapi:generate
- name: Validate OpenAPI spec
  run: npx @apidevtools/swagger-cli validate public/openapi.json
- name: Upload spec artifact
  uses: actions/upload-artifact@v4
  with:
    name: openapi-spec
    path: public/openapi.json
Error Logging & Monitoring
Monolog 3 + Sentry for error tracking with context enrichment.
Architecture
Code → Monolog → SentryHandler (ERROR+)       → Sentry Cloud → Alerts/Dashboard
               → RotatingFileHandler (DEBUG+) → /var/log/app/*.log (14 days)
Dependencies
bash
composer require monolog/monolog:^3.0
# Official Sentry PHP SDK; the Monolog handler (Sentry\Monolog\Handler) ships with it
composer require sentry/sentry
Configuration
php
// config/logging.php
use Monolog\Logger;
return [
'sentry_dsn' => $_ENV['SENTRY_DSN'] ?? null, // Optional: Sentry is skipped when unset
'log_path' => '/var/log/app/app.log',
'log_days' => 14,
'log_level' => $_ENV['APP_ENV'] === 'production' ? Logger::INFO : Logger::DEBUG,
'app_env' => $_ENV['APP_ENV'],
'app_version' => $_ENV['APP_VERSION'] ?? 'unknown',
];
Logger Factory
php
final class LoggerFactory
{
public function __construct(
private array $config,
private ?SessionInterface $session,
private RequestIdGenerator $requestId,
) {}
public function create(string $channel = 'app'): Logger
{
$logger = new Logger($channel);
// Context enrichment processor
$logger->pushProcessor(new ContextProcessor($this->session, $this->requestId));
// ERROR+ → Sentry (filtered)
if (!empty($this->config['sentry_dsn'])) {
\Sentry\init([
'dsn' => $this->config['sentry_dsn'],
'environment' => $this->config['app_env'],
'release' => $this->config['app_version'],
]);
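// Hub::getCurrent() was removed in Sentry SDK 3.x; SentrySdk::getCurrentHub() replaces it
$sentryHandler = new SentryHandler(\Sentry\SentrySdk::getCurrentHub(), Logger::ERROR);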
$logger->pushHandler(new FilteredSentryHandler($sentryHandler));
}
// DEBUG+ → rotating file (backup)
$logger->pushHandler(
new RotatingFileHandler(
$this->config['log_path'],
$this->config['log_days'],
$this->config['log_level']
)
);
return $logger;
}
}
Context Processor
Automatically attaches request and user context to all log entries:
php
final class ContextProcessor
{
public function __construct(
private ?SessionInterface $session,
private RequestIdGenerator $requestId,
) {}
public function __invoke(LogRecord $record): LogRecord
{
$extra = $record->extra;
$extra['request_id'] = $this->requestId->get();
$extra['timestamp'] = (new \DateTimeImmutable())->format('c');
if ($this->session?->has('user_id')) {
$extra['user_id'] = $this->session->get('user_id');
}
return $record->with(extra: $extra);
}
}
Filtered Sentry Handler
Skip expected/handled exceptions from Sentry to avoid noise:
php
final class FilteredSentryHandler extends AbstractHandler
{
private const IGNORED_EXCEPTIONS = [
ValidationException::class,
NotFoundException::class,
AuthenticationException::class,
AuthorizationException::class,
RateLimitException::class,
ImageValidationException::class,
CsrfTokenException::class,
];
public function __construct(
private SentryHandler $inner,
) {
parent::__construct(Logger::ERROR, true);
}
public function handle(LogRecord $record): bool
{
$exception = $record->context['exception'] ?? null;
if ($exception instanceof \Throwable) {
foreach (self::IGNORED_EXCEPTIONS as $ignoredClass) {
if ($exception instanceof $ignoredClass) {
return false; // Skip, don't send to Sentry
}
}
}
return $this->inner->handle($record);
}
}
Sentry User Context
Set user context for better error tracking:
php
// In AuthMiddleware after successful session validation
\Sentry\configureScope(function (\Sentry\State\Scope $scope) use ($user): void {
$scope->setUser([
'id' => (string) $user->getId(),
'email' => $user->getEmail(),
]);
});
Queue Worker Logging
Workers use a dedicated channel with job context. See the unified QueueWorker implementation in the Cancellation Tokens section, which includes both logging and graceful shutdown.
Logging context per job:
php
$context = [
'job_id' => $job->getId(),
'job_class' => $job::class,
'attempt' => $job->getAttempt(),
];
$this->logger->info('Job started', $context);
// ... execute job ...
$this->logger->info('Job completed', $context);
// or on failure:
$this->logger->error('Job failed', [...$context, 'exception' => $e]);
Request ID Generator
Unique ID per request for tracing:
php
final class RequestIdGenerator
{
private ?string $id = null;
public function get(): string
{
if ($this->id === null) {
$this->id = bin2hex(random_bytes(8));
}
return $this->id;
}
}
Error Handler Middleware
Catch unhandled exceptions and log:
php
final class ErrorHandlerMiddleware implements MiddlewareInterface
{
public function __construct(
private LoggerInterface $logger,
private RequestIdGenerator $requestId,
private bool $debug,
) {}
public function process(Request $request, RequestHandler $handler): Response
{
try {
return $handler->handle($request);
} catch (\Throwable $e) {
$requestId = $this->requestId->get();
$this->logger->error('Unhandled exception', [
'exception' => $e,
'request_id' => $requestId,
'url' => (string) $request->getUri(),
'method' => $request->getMethod(),
]);
if ($this->debug) {
throw $e;
}
// Include request_id for support correlation
return new JsonResponse([
'error' => 'Internal server error',
'request_id' => $requestId,
], 500);
}
}
}
Configuration Notes
| Item | Setting |
|---|---|
| Sentry free tier | ~5K errors/month |
| File rotation | 14 days |
| Log level to Sentry | ERROR and above |
| Log level to file | DEBUG (dev), INFO (prod) |
| Request ID header | X-Request-Id (optional, for client tracing) |
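The optional X-Request-Id header can be honoured with a small guard in front of the generator. The sketch below is an assumption, not part of the design: `resolveRequestId()` is a hypothetical helper, and the accepted character set and length limits are illustrative choices to keep client-supplied values safe to log.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: reuse a client-supplied X-Request-Id when it looks safe,
 * otherwise generate one the same way RequestIdGenerator does.
 */
function resolveRequestId(?string $incoming): string
{
    // Accept only short, log-safe IDs: 8-64 letters, digits, '-' or '_'.
    if ($incoming !== null && preg_match('/\A[A-Za-z0-9_-]{8,64}\z/', $incoming) === 1) {
        return $incoming;
    }

    // Fall back to a fresh 16-character hex ID.
    return bin2hex(random_bytes(8));
}
```

Rejecting anything outside the whitelist avoids log injection through newlines or control characters in the header value.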
When to Scale
| Trigger | Action |
|---|---|
| Sentry limits exceeded | Paid plan or self-hosted Sentry |
| Need log search | Add ELK or Grafana Loki |
| Multiple servers | Centralized logging (Loki, CloudWatch) |
Redis Architecture (HA)
Primary-Replica with Sentinel for high availability without Cluster complexity.
Topology
┌─────────────────────────────────────────────────────────────────┐
│                           APPLICATION                           │
│  ┌─────────────┐                           ┌─────────────┐      │
│  │   Writes    │                           │    Reads    │      │
│  └──────┬──────┘                           └──────┬──────┘      │
│         │                                         │             │
│         ▼                                         ▼             │
│  ┌─────────────┐                           ┌─────────────┐      │
│  │   Primary   │ ──── replication ────────▶│   Replica   │      │
│  │  (writes)   │                           │   (reads)   │      │
│  └──────┬──────┘                           └─────────────┘      │
│         │                                                       │
└─────────┼───────────────────────────────────────────────────────┘
          │ monitored by
          ▼
┌─────────────────────────────────────────────────────────────────┐
│                      SENTINEL QUORUM (×3)                       │
│  ┌───────────┐     ┌───────────┐     ┌───────────┐              │
│  │ Sentinel 1│     │ Sentinel 2│     │ Sentinel 3│              │
│  └───────────┘     └───────────┘     └───────────┘              │
│                                                                 │
│  • Monitor primary health                                       │
│  • Automatic failover (replica → primary)                       │
│  • Notify clients of topology changes                           │
└─────────────────────────────────────────────────────────────────┘
Database Distribution
Logical separation across Redis databases (0–15 available):
| DB | Purpose | Criticality | Flush Safe? |
|---|---|---|---|
| 0 | Sessions | Critical | No — logs out all users |
| 1 | Cache | Disposable | Yes — regenerates from DB |
| 2 | Queues | Important | No — loses pending jobs |
| 3 | Rate limits | Disposable | Yes — resets all counters |
Benefits:
- FLUSHDB on DB 1 clears cache without affecting sessions
- Separate memory monitoring per purpose
- Easier debugging (redis-cli -n 0 for sessions only)
- Different eviction policies per purpose would require separate Redis instances, because maxmemory-policy is instance-wide (future)
Connection Strategy
| Operation | Target | DB | Rationale |
|---|---|---|---|
| Session read/write | Primary | 0 | Consistency required |
| Cache reads | Replica | 1 | Offload primary |
| Cache writes | Primary | 1 | Single source of truth |
| Locks (stampede) | Primary | 1 | Tied to cache keys |
| Queue (Messenger) | Primary | 2 | Job ordering |
| Rate limits | Primary | 3 | Atomic counters |
PHP Configuration
php
// config/redis.php
return [
'sentinel' => [
'master' => 'partizap-master',
'nodes' => [
['host' => 'sentinel-1', 'port' => 26379],
['host' => 'sentinel-2', 'port' => 26379],
['host' => 'sentinel-3', 'port' => 26379],
],
],
'databases' => [
'sessions' => 0, // Critical — user auth state
'cache' => 1, // Disposable — can flush anytime
'queues' => 2, // Important — pending jobs
'rate_limits' => 3, // Disposable — resets counters
],
];
RedisConnectionFactory Service
php
final class RedisConnectionFactory
{
/** @var array<string, \Redis> Pooled connections by "role:db" */
private array $connections = [];
public function __construct(
private array $config,
) {}
// Purpose-specific getters (preferred API)
public function sessions(): \Redis
{
return $this->getConnection('master', $this->config['databases']['sessions']);
}
public function cache(): \Redis
{
return $this->getConnection('slave', $this->config['databases']['cache']);
}
public function cacheWrite(): \Redis
{
return $this->getConnection('master', $this->config['databases']['cache']);
}
public function queues(): \Redis
{
return $this->getConnection('master', $this->config['databases']['queues']);
}
public function rateLimits(): \Redis
{
return $this->getConnection('master', $this->config['databases']['rate_limits']);
}
private function getConnection(string $role, int $database): \Redis
{
$key = "{$role}:{$database}";
if (!isset($this->connections[$key])) {
$redis = $this->connectViaSentinel($role);
$redis->select($database);
$this->connections[$key] = $redis;
}
return $this->connections[$key];
}
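    private function connectViaSentinel(string $role): \Redis
    {
        $master = $this->config['sentinel']['master'];
        foreach ($this->config['sentinel']['nodes'] as $node) {
            $sentinel = new \Redis();
            try {
                if (!$sentinel->connect($node['host'], $node['port'], 0.5)) {
                    continue;
                }
            } catch (\RedisException) {
                continue; // Sentinel unreachable, try the next node
            }
            $address = null;
            if ($role === 'slave') {
                // Each replica comes back as a flat [field, value, field, value, ...] list
                $replicas = $sentinel->rawCommand('SENTINEL', 'slaves', $master) ?: [];
                if ($replicas !== []) {
                    $fields = $replicas[array_rand($replicas)]; // Random replica
                    $replica = [];
                    for ($i = 0; $i + 1 < count($fields); $i += 2) {
                        $replica[$fields[$i]] = $fields[$i + 1];
                    }
                    $address = [$replica['ip'], $replica['port']];
                }
            }
            // Primary requested, or no replica listed: use the primary (reads fall back to it)
            $address ??= $sentinel->rawCommand('SENTINEL', 'get-master-addr-by-name', $master) ?: null;
            if ($address) {
                $redis = new \Redis();
                $redis->connect($address[0], (int) $address[1]);
                return $redis;
            }
        }
        throw new RedisConnectionException("Cannot connect to Redis {$role}");
    }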
}
Usage Examples
php
// Session handler
$redis->sessions()->setex("sess:{$id}", 3600, $data);
// Cache (read from replica, write to primary)
$data = $redis->cache()->get("product:{$id}"); // Read from replica
$redis->cacheWrite()->setex("product:{$id}", 900, $data); // Write to primary
// Rate limiting
$redis->rateLimits()->incr("ratelimit:{$ip}:{$endpoint}");
// Queue (Symfony Messenger configured to use DB 2)
// DSN: redis://redis-primary:6379/2
Docker Compose (Development)
yaml
services:
  redis-primary:
    image: redis:7-alpine
    command: redis-server --appendonly yes
    volumes:
      - redis_primary:/data
  redis-replica:
    image: redis:7-alpine
    command: redis-server --replicaof redis-primary 6379
    depends_on:
      - redis-primary
  sentinel-1:
    image: redis:7-alpine
    command: redis-sentinel /etc/redis/sentinel.conf
    volumes:
      - ./docker/redis/sentinel.conf:/etc/redis/sentinel.conf
    depends_on:
      - redis-primary
      - redis-replica
  sentinel-2:
    image: redis:7-alpine
    command: redis-sentinel /etc/redis/sentinel.conf
    volumes:
      - ./docker/redis/sentinel.conf:/etc/redis/sentinel.conf
    depends_on:
      - redis-primary
      - redis-replica
  sentinel-3:
    image: redis:7-alpine
    command: redis-sentinel /etc/redis/sentinel.conf
    volumes:
      - ./docker/redis/sentinel.conf:/etc/redis/sentinel.conf
    depends_on:
      - redis-primary
      - redis-replica
Sentinel Configuration
conf
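# docker/redis/sentinel.conf
# Hostname support is disabled by default on Redis 6.2+; needed because the monitor target is a hostname
sentinel resolve-hostnames yes
sentinel monitor partizap-master redis-primary 6379 2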
sentinel down-after-milliseconds partizap-master 5000
sentinel failover-timeout partizap-master 60000
sentinel parallel-syncs partizap-master 1
Failover Behavior
| Event | Sentinel Action | App Impact |
|---|---|---|
| Primary down | Promote replica | ~5-10s reconnect |
| Replica down | No action | Reads fall back to primary |
| Network partition | Quorum decides | Safe failover with 2/3 agreement |
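During the reconnect window after a primary failure, a caller can retry an operation instead of failing the request outright. The following is a minimal sketch under stated assumptions: `retryOnFailover()` is a hypothetical helper, the attempt count and delays are illustrative, and `\RuntimeException` stands in for whatever exception the real Redis client raises on a lost connection.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: run $operation and retry it on failure, so a request
 * can ride out the short window while Sentinel promotes a replica.
 * The callable should re-resolve the primary on every call.
 */
function retryOnFailover(callable $operation, int $maxAttempts = 5, int $baseDelayMs = 200): mixed
{
    $attempt = 0;
    while (true) {
        $attempt++;
        try {
            return $operation();
        } catch (\RuntimeException $e) {
            if ($attempt >= $maxAttempts) {
                throw $e; // Out of attempts: let the caller surface the error
            }
            // Linear backoff: baseDelay, 2 x baseDelay, 3 x baseDelay, ...
            usleep($baseDelayMs * $attempt * 1000);
        }
    }
}
```

For the retry to help, the callable has to ask Sentinel for the current primary on each attempt rather than reuse a pooled connection to the failed node.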
Caching (Stampede Protected)
Redis caching with Lock + Stale-While-Revalidate protection against cache stampede.
Cache Stampede Problem
When cache expires, multiple concurrent requests hit DB simultaneously:
T=0.001s Request A → Cache MISS → Query DB
T=0.002s Request B → Cache MISS → Query DB
...100 requests → 100 DB queries (STAMPEDE!)
Solution: Lock + Stale-While-Revalidate
Cache entry structure:
{
"data": { ... },
"stale_at": 1706099700, // Soft expiry (refresh starts)
"expires_at": 1706100000 // Hard expiry (must refresh)
}
Flow:
├─ FRESH (before stale_at) → Return data
├─ STALE (between stale_at and expires_at)
│ ├─ GOT LOCK → Return stale + refresh async
│ └─ NO LOCK → Return stale (another is refreshing)
└─ EXPIRED (after expires_at)
├─ GOT LOCK → Query DB, cache, return
└─ NO LOCK → Wait briefly, retry from cache
StampedeProtectedCache Service
php
final class StampedeProtectedCache
{
private const LOCK_TTL = 10; // Lock timeout
private const STALE_MARGIN = 300; // Soft expiry 5 min before hard
public function __construct(
private RedisConnectionFactory $redis,
) {}
public function get(string $key, callable $regenerate, int $ttl): mixed
{
// READ from replica (DB 1)
$cached = $this->redis->cache()->get($key);
if ($cached !== false) {
$entry = json_decode($cached, true);
$now = time();
if ($now < $entry['stale_at']) {
return $entry['data']; // FRESH
}
if ($now < $entry['expires_at']) {
$this->tryRefreshAsync($key, $regenerate, $ttl);
return $entry['data']; // STALE + async refresh
}
}
return $this->refreshWithLock($key, $regenerate, $ttl); // EXPIRED
}
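// Despite the name, regeneration runs inline in the request that wins the lock;
// requests that do not get the lock return the stale value immediately.
private function tryRefreshAsync(string $key, callable $regenerate, int $ttl): void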
{
$lockKey = "lock:{$key}";
// LOCK on primary (DB 1 — atomic operation)
$acquired = $this->redis->cacheWrite()->set($lockKey, '1', ['NX', 'EX' => self::LOCK_TTL]);
if ($acquired) {
try {
$data = $regenerate();
$this->store($key, $data, $ttl);
} finally {
$this->redis->cacheWrite()->del($lockKey);
}
}
}
private function store(string $key, mixed $data, int $ttl): void
{
$entry = [
'data' => $data,
'stale_at' => time() + $ttl - self::STALE_MARGIN,
'expires_at' => time() + $ttl,
];
// WRITE to primary DB 1 (replicates to replica)
$this->redis->cacheWrite()->setex($key, $ttl, json_encode($entry));
}
}
Cache Key Patterns
| Data Type | Key Pattern | TTL | Stale Margin |
|---|---|---|---|
| Search results | search:{hash} | 5 min | 45 sec |
| Car makes | car_makes:all | 24 h | 5 min |
| Car models | car_models:{make_id} | 24 h | 5 min |
| Categories | categories:tree | 24 h | 5 min |
| Regions/Cities | geo:regions | 24 h | 5 min |
| Product detail | product:{id} | 15 min | 2 min |
| User profile | user:{id} | 10 min | 1 min |
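The per-key stale margins in this table do not fit a single fixed margin: with a constant 300 s margin, a 5 min search entry would be stale the moment it is written. One possible approach, sketched here as an assumption rather than the service's actual code, is to pass the margin alongside the TTL. `buildCacheEntry()` and `cacheEntryState()` are hypothetical helper names.

```php
<?php
declare(strict_types=1);

/** Build a cache entry with a soft expiry (stale_at) and a hard expiry (expires_at). */
function buildCacheEntry(mixed $data, int $ttl, int $staleMargin, int $now): array
{
    if ($staleMargin >= $ttl) {
        throw new \InvalidArgumentException('Stale margin must be shorter than the TTL');
    }

    return [
        'data' => $data,
        'stale_at' => $now + $ttl - $staleMargin,
        'expires_at' => $now + $ttl,
    ];
}

/** Classify an entry as 'fresh', 'stale' or 'expired' at time $now. */
function cacheEntryState(array $entry, int $now): string
{
    if ($now < $entry['stale_at']) {
        return 'fresh';
    }
    if ($now < $entry['expires_at']) {
        return 'stale';
    }

    return 'expired';
}
```

For a search result (TTL 5 min, margin 45 sec) written at time `t`, the entry is fresh until `t + 255`, stale until `t + 300`, and expired afterwards.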
Replication Lag Considerations
Redis replication is async with typical lag <1ms on same network. For cache data this is acceptable:
| Scenario | Lag Impact | Mitigation |
|---|---|---|
| Cache miss after write | User may re-fetch from DB | Acceptable (rare, <1ms window) |
| Stale check | May serve slightly stale | Already designed for staleness |
| Lock check | Could allow duplicate refresh | Only wastes 1 extra DB query |
When NOT to use replica reads:
- Sessions (use primary — consistency required)
- Rate limit counters (use primary — atomic ops)
- Locks (use primary — atomic ops)
Background Jobs
Symfony Messenger with Redis transport.
Queues
| Queue | Purpose | Workers |
|---|---|---|
| images | Image resize/optimize | 2 |
| emails | Email sending | 1 |
Email Service
SMTP configuration for transactional emails (verification, password reset):
php
// config/mail.php
return [
'driver' => 'smtp',
'host' => $_ENV['MAIL_HOST'], // smtp.mailgun.org
'port' => (int) $_ENV['MAIL_PORT'], // 587
'username' => $_ENV['MAIL_USERNAME'],
'password' => $_ENV['MAIL_PASSWORD'],
'encryption' => 'tls',
'from' => [
'address' => $_ENV['MAIL_FROM_ADDRESS'], // noreply@partizap.ru
'name' => $_ENV['MAIL_FROM_NAME'], // Partizap
],
'frontend_url' => $_ENV['FRONTEND_URL'], // https://partizap.ru
];
php
final class EmailService
{
public function __construct(
private \Symfony\Component\Mailer\MailerInterface $mailer,
private array $config,
) {}
public function sendVerificationEmail(User $user, string $code): void
{
$email = (new Email())
->from(new Address($this->config['from']['address'], $this->config['from']['name']))
->to($user->getEmail())
->subject('Подтвердите email — Partizap')
->html($this->renderTemplate('emails/verification.html.twig', [
'user' => $user,
'code' => $code,
]));
$this->mailer->send($email);
}
public function sendPasswordResetEmail(User $user, string $token): void
{
$email = (new Email())
->from(new Address($this->config['from']['address'], $this->config['from']['name']))
->to($user->getEmail())
->subject('Сброс пароля — Partizap')
->html($this->renderTemplate('emails/password-reset.html.twig', [
'user' => $user,
'reset_url' => $this->config['frontend_url'] . '/reset-password?token=' . $token,
]));
$this->mailer->send($email);
}
}
Email Message Handler (queue worker):
php
#[AsMessageHandler]
final class SendEmailHandler
{
public function __construct(
private EmailService $emailService,
private LoggerInterface $logger,
) {}
public function __invoke(SendEmailMessage $message): void
{
try {
match ($message->type) {
'verification' => $this->emailService->sendVerificationEmail(
$message->user,
$message->data['code']
),
'password_reset' => $this->emailService->sendPasswordResetEmail(
$message->user,
$message->data['token']
),
};
} catch (\Throwable $e) {
$this->logger->error('Email sending failed', [
'type' => $message->type,
'user_id' => $message->user->getId(),
'exception' => $e,
]);
throw $e;
}
}
}
Image Validation
Validation pipeline runs synchronously before S3 upload to reject malicious/invalid files early.
Pipeline:
Upload → Size check → Magic bytes → Decode test → Strip EXIF → Re-encode → S3
| Step | Check | Fail Action |
|---|---|---|
| 1. Size | Max 10MB | Reject: "File too large" |
| 2. Magic bytes | JPEG/PNG/WebP/GIF headers | Reject: "Unsupported format" |
| 3. Decode test | imagecreatefrom*() succeeds | Reject: "Corrupted image" |
| 4. EXIF strip | Remove all metadata | — |
| 5. Re-encode | Save as clean image | Sanitizes embedded code |
Why re-encoding sanitizes: Malicious payloads hide in metadata, comments, or trailing data. Re-encoding through GD creates a fresh image with only pixel data — any embedded PHP, JS, or polyglot payloads are discarded.
ImageValidator:
php
final class ImageValidator
{
private const MAX_SIZE = 10 * 1024 * 1024; // 10MB
private const ALLOWED_TYPES = [
'image/jpeg' => 'jpeg',
'image/png' => 'png',
'image/webp' => 'webp',
'image/gif' => 'gif',
];
public function validate(UploadedFileInterface $file): ValidatedImage
{
// 1. Size check
if ($file->getSize() > self::MAX_SIZE) {
throw new ImageValidationException('File too large (max 10MB)');
}
// 2. Magic bytes (don't trust Content-Type header)
$stream = $file->getStream();
$header = $stream->read(12);
$stream->rewind();
$detectedType = $this->detectType($header);
if ($detectedType === null) {
throw new ImageValidationException('Unsupported image format');
}
// 3. Decode test — fails if corrupted or fake
$tempPath = $this->saveToTemp($file);
$image = @imagecreatefromstring(file_get_contents($tempPath));
if ($image === false) {
unlink($tempPath);
throw new ImageValidationException('Corrupted or invalid image');
}
// 4. Strip EXIF + 5. Re-encode (creates clean image)
$cleanPath = $this->reencodeClean($image, $detectedType);
imagedestroy($image);
unlink($tempPath);
return new ValidatedImage($cleanPath, $detectedType);
}
private function detectType(string $header): ?string
{
if (str_starts_with($header, "\xFF\xD8\xFF")) return 'image/jpeg';
if (str_starts_with($header, "\x89PNG")) return 'image/png';
if (str_starts_with($header, "RIFF") && substr($header, 8, 4) === 'WEBP') return 'image/webp';
if (str_starts_with($header, "GIF87a") || str_starts_with($header, "GIF89a")) return 'image/gif';
return null;
}
private function reencodeClean(GdImage $image, string $type): string
{
$cleanPath = tempnam(sys_get_temp_dir(), 'img_clean_');
match ($type) {
'image/jpeg' => imagejpeg($image, $cleanPath, 95),
'image/png' => imagepng($image, $cleanPath, 6),
'image/webp' => imagewebp($image, $cleanPath, 95),
'image/gif' => imagegif($image, $cleanPath),
};
return $cleanPath;
}
}
Image Processing Flow
Status Flow:
uploading → processing → ready
                ↓
         error (fallback to original)
Steps:
- Validate and re-encode uploaded file (see Image Validation above)
- Upload clean original to private S3 bucket: originals/{product_id}/{uuid}.{ext}
- Create ProductImage record (status: 'processing')
- Dispatch ProcessImageMessage to queue
- Worker generates dual-format variants (WebP + JPEG fallback)
- Upload all variants to S3
- Update record: status='ready', populate URLs
Dual Format Output (WebP + JPEG):
| Variant | Size | WebP Quality | JPEG Quality |
|---|---|---|---|
| thumbnail | 150×150 | 80 | 75 |
| medium | 600×600 | 85 | 80 |
| large | 1200×1200 | 90 | 85 |
6 files per image (3 sizes × 2 formats). Frontend uses <picture> with WebP primary, JPEG fallback.
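The six files per image follow directly from crossing the three sizes with the two formats. As a rough illustration, assuming the public-bucket key layout `products/{product_id}/{variant}/{uuid}.{ext}`, a hypothetical `variantKeys()` helper could enumerate them:

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: list the public-bucket keys for one image,
 * 3 sizes x 2 formats = 6 keys.
 *
 * @return list<string>
 */
function variantKeys(int $productId, string $uuid): array
{
    $variants = ['thumbnail', 'medium', 'large'];
    $extensions = ['webp', 'jpg']; // WebP primary, JPEG fallback

    $keys = [];
    foreach ($variants as $variant) {
        foreach ($extensions as $ext) {
            $keys[] = "products/{$productId}/{$variant}/{$uuid}.{$ext}";
        }
    }

    return $keys;
}
```

A list like this is also handy for cleanup or for checking that every expected variant exists before marking an image as ready.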
S3 Bucket Organization:
| Bucket | Access | Purpose |
|---|---|---|
| partizap-private | Private | Originals (worker access via signed URLs, 15 min TTL) |
| partizap-public | Public + CDN | Processed variants (direct CDN access, 1 year cache) |
partizap-private/                  # Private bucket
└── originals/
    └── {product_id}/
        └── {uuid}.{ext}           # Original uploads (worker-only access)
partizap-public/                   # Public bucket + CDN
└── products/
    └── {product_id}/
        ├── thumbnail/{uuid}.webp
        ├── thumbnail/{uuid}.jpg
        ├── medium/{uuid}.webp
        ├── medium/{uuid}.jpg
        ├── large/{uuid}.webp
        └── large/{uuid}.jpg
S3StorageService:
php
final class S3StorageService
{
private const SIGNED_URL_TTL = 900; // 15 minutes
public function __construct(
private S3Client $s3,
private string $privateBucket,
private string $publicBucket,
private string $cdnBaseUrl,
) {}
// Upload original to private bucket (returns key, not URL)
public function uploadOriginal(string $localPath, int $productId, string $uuid, string $ext): string
{
$key = "originals/{$productId}/{$uuid}.{$ext}";
$this->s3->putObject([
'Bucket' => $this->privateBucket,
'Key' => $key,
'Body' => fopen($localPath, 'rb'),
'ACL' => 'private',
]);
return $key;
}
// Generate signed URL for worker to download original
public function getSignedOriginalUrl(string $key): string
{
$cmd = $this->s3->getCommand('GetObject', [
'Bucket' => $this->privateBucket,
'Key' => $key,
]);
$request = $this->s3->createPresignedRequest($cmd, '+' . self::SIGNED_URL_TTL . ' seconds');
return (string) $request->getUri();
}
// Upload processed variant to public bucket (returns CDN URL)
public function uploadVariant(string $localPath, int $productId, string $variant, string $uuid, string $ext): string
{
$key = "products/{$productId}/{$variant}/{$uuid}.{$ext}";
$this->s3->putObject([
'Bucket' => $this->publicBucket,
'Key' => $key,
'Body' => fopen($localPath, 'rb'),
'ACL' => 'public-read',
'ContentType' => $ext === 'webp' ? 'image/webp' : 'image/jpeg',
'CacheControl' => 'public, max-age=31536000', // 1 year
]);
return $this->cdnBaseUrl . '/' . $key;
}
// Delete all images for a product
public function deleteProductImages(int $productId): void
{
$this->deleteByPrefix($this->privateBucket, "originals/{$productId}/");
$this->deleteByPrefix($this->publicBucket, "products/{$productId}/");
}
}
Retry Strategy:
- 3 attempts with exponential backoff: 1s, 4s, 16s
- On permanent failure: status='error', serve original, alert admin
- Silent for sellers — no notification, product works with original image
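The 1s, 4s, 16s schedule is a geometric series: a 1 second base delay multiplied by 4 on each retry. A minimal sketch of that arithmetic, with `retryDelayMs()` as a hypothetical helper name:

```php
<?php
declare(strict_types=1);

/**
 * Delay in milliseconds before retry number $retry (1-based),
 * multiplying the base delay by $multiplier on each further retry.
 */
function retryDelayMs(int $retry, int $baseMs = 1000, int $multiplier = 4): int
{
    if ($retry < 1) {
        throw new \InvalidArgumentException('Retry number starts at 1');
    }

    return $baseMs * $multiplier ** ($retry - 1);
}
```

Called with 1, 2 and 3, it yields 1000, 4000 and 16000 ms.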
ProcessImageHandler:
php
#[AsMessageHandler]
class ProcessImageHandler
{
private const MAX_RETRIES = 3;
private const VARIANTS = [
'thumbnail' => ['width' => 150, 'height' => 150, 'webp_q' => 80, 'jpeg_q' => 75],
'medium' => ['width' => 600, 'height' => 600, 'webp_q' => 85, 'jpeg_q' => 80],
'large' => ['width' => 1200, 'height' => 1200, 'webp_q' => 90, 'jpeg_q' => 85],
];
public function __invoke(ProcessImageMessage $message): void
{
$image = $this->imageRepository->find($message->imageId);
if ($image === null || $image->getStatus() === 'ready') {
return;
}
try {
// Get signed URL for worker to download from private bucket
$signedUrl = $this->s3->getSignedOriginalUrl($image->getOriginalKey());
$original = $this->downloadFromUrl($signedUrl);
foreach (self::VARIANTS as $variant => $config) {
$resized = $this->resizer->resize($original, $config['width'], $config['height']);
// WebP
$webp = $this->encoder->toWebP($resized, $config['webp_q']);
$webpPath = $this->uploadVariant($image, $variant, $webp, 'webp');
// JPEG fallback
$jpeg = $this->encoder->toJpeg($resized, $config['jpeg_q']);
$jpegPath = $this->uploadVariant($image, $variant, $jpeg, 'jpg');
$image->setVariantUrls($variant, $webpPath, $jpegPath);
}
$image->setStatus('ready');
} catch (\Throwable $e) {
$image->incrementRetryCount();
$image->setErrorMessage($e->getMessage());
if ($image->getRetryCount() >= self::MAX_RETRIES) {
$image->setStatus('error');
$this->logger->error('Image processing failed permanently', [
'image_id' => $image->getId(),
'product_id' => $image->getProduct()->getId(),
'error' => $e->getMessage(),
]);
// Original image will be served as fallback
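} else {
// Persist the retry count first: the throw below skips the flush at the end of the method
$this->em->flush();
// The exception's second argument is an error code, not a delay; the backoff
// is assumed to be configured on the transport's retry strategy (1000 ms, multiplier 4)
throw new RecoverableMessageHandlingException($e->getMessage(), 0, $e);
}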
}
$this->em->flush();
}
}
Frontend <picture> Usage:
html
<picture>
<source srcset="{thumbnailWebp}" type="image/webp">
<source srcset="{thumbnailJpeg}" type="image/jpeg">
<img src="{thumbnailJpeg}" alt="{title}" loading="lazy">
</picture>
Fallback Logic (when status='error'):
When processing fails, serve original via signed URL (or re-upload original to public bucket during processing as fallback):
php
// In ProductImage entity
public function getDisplayUrl(string $variant, S3StorageService $s3): string
{
if ($this->status === 'error') {
// Generate signed URL for original (short TTL, regenerated per request)
// Alternatively: copy original to public bucket during error handling
return $s3->getSignedOriginalUrl($this->originalKey);
}
return $this->getVariantUrl($variant, 'webp'); // normal path
}
// Alternative: Store public fallback URL when processing fails
public function getDisplayUrl(string $variant): string
{
if ($this->status === 'error' && $this->fallbackUrl !== null) {
return $this->fallbackUrl; // Public copy of original
}
if ($this->status === 'error') {
return '/images/placeholder.jpg'; // Ultimate fallback
}
return $this->getVariantUrl($variant, 'webp');
}
Scheduled Jobs (Cron)
| Schedule | Job | Purpose |
|---|---|---|
| `* * * * *` | SyncViewCountsCommand | Flush Redis view counters to PostgreSQL |
| `0 3 * * 0` | RecalculateCountersCommand | Full counter consistency recovery |
| `0 4 * * *` | CleanExpiredSessionsCommand | Remove expired sessions from DB |
| `*/5 * * * *` | CleanExpiredVerificationCodesCommand | Remove used/expired codes |
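The table names SyncViewCountsCommand without describing how it writes. As a rough sketch of one possible approach, the helper below turns per-product view deltas (assumed to have been read from Redis already) into a single batched UPDATE. The function name `buildViewCountFlush` and the `view_count` column are assumptions for illustration, not part of the spec.

```php
/**
 * Builds one batched UPDATE from per-product view deltas.
 * Hypothetical helper; the `view_count` column name is an assumption.
 *
 * @param array<int, int> $deltas product ID => views accumulated since the last flush
 * @return array{sql: string, params: list<int>}|null null when there is nothing to flush
 */
function buildViewCountFlush(array $deltas): ?array
{
    // Skip products with no new views so the statement stays small.
    $deltas = array_filter($deltas, fn (int $delta): bool => $delta > 0);
    if ($deltas === []) {
        return null;
    }

    $rows = [];
    $params = [];
    foreach ($deltas as $productId => $delta) {
        $rows[] = '(CAST(? AS integer), CAST(? AS integer))';
        $params[] = $productId;
        $params[] = $delta;
    }

    // One round trip: join the products table against an inline VALUES list.
    $sql = 'UPDATE products AS p SET view_count = p.view_count + v.delta'
        . ' FROM (VALUES ' . implode(', ', $rows) . ') AS v(id, delta)'
        . ' WHERE p.id = v.id';

    return ['sql' => $sql, 'params' => $params];
}
```

Adding deltas rather than overwriting totals means a missed run only delays the counts; the weekly RecalculateCountersCommand remains the consistency backstop.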
Cancellation Tokens
Graceful shutdown for long-running jobs via hierarchical cancellation tokens stored in Redis.
Architecture:
┌─────────────────┐ ┌─────────────────┐
│ Worker Process │ │ Redis DB 2 │
├─────────────────┤ ├─────────────────┤
│ Job receives │ check │ cancel:{id} │
│ CancellationTok ├────────►│ cancel:{id}:par │
│ │ │ TTL: 24h │
│ TokenSource │ cancel │ │
│ .cancel() ├────────►│ SET cancel:{id} │
└─────────────────┘ └─────────────────┘
Components:
| Class | Purpose |
|---|---|
| `CancellationToken` | Immutable value object passed to jobs, checks isCancelled() |
| `CancellationTokenSource` | Creates tokens and triggers cancellation |
| `CancellationTokenStorage` | Redis storage with tree-walk + local cache |
| `CancellationTokenFactory` | DI factory, creates sources |
Token Hierarchy:
Parent jobs can spawn child tasks that inherit cancellation:
php
// Parent job creates a child token source; the child inherits the parent's cancellation
$childSource = $source->createChild();
$queue->dispatch(new ProcessProductJob($productId, $childSource->token()->id()));
Redis structure for bulk import with children:
cancel:job_abc123 = "1" # parent token; key exists only after cancellation
cancel:job_abc123:parent = (none) # no parent
cancel:prod_001:parent = "job_abc123" # child → parent
cancel:prod_002:parent = "job_abc123"
cancel:prod_003:parent = "job_abc123"
The cancellation check walks up the tree, so when a parent is cancelled, all of its children see it on their next check:
php
public function isCancelled(string $id, ?string $parentId): bool
{
if (isset($this->cancelledCache[$id])) {
return true;
}
$current = $id;
while ($current !== null) {
if ($this->redis->queues()->exists("cancel:{$current}")) {
$this->cancelledCache[$id] = true;
return true;
}
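// A missing parent key comes back as null or false depending on the Redis client;
// normalize it so the walk stops at the root instead of looping forever
$parent = $this->redis->queues()->get("cancel:{$current}:parent");
$current = is_string($parent) && $parent !== '' ? $parent : null;
}
return false;
}
Worker Integration (unified with logging):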
php
final class QueueWorker
{
private array $activeTokenSources = [];
private bool $shouldStop = false;
private LoggerInterface $logger;
public function __construct(
private CancellationTokenFactory $tokenFactory,
private RedisConnectionFactory $redis,
private LoggerFactory $loggerFactory,
) {
$this->logger = $loggerFactory->create('queue');
pcntl_signal(SIGTERM, [$this, 'handleShutdown']);
pcntl_signal(SIGINT, [$this, 'handleShutdown']);
}
public function handleShutdown(): void
{
$this->logger->info('Shutdown signal received, cancelling active jobs');
$this->shouldStop = true;
foreach ($this->activeTokenSources as $source) {
$source->cancel();
}
}
public function run(): void
{
$this->logger->info('Worker started');
while (!$this->shouldStop) {
pcntl_signal_dispatch();
$job = $this->fetchJob();
if ($job === null) {
usleep(100_000); // 100ms
continue;
}
$source = $this->tokenFactory->create();
$this->activeTokenSources[$job->getId()] = $source;
$context = [
'job_id' => $job->getId(),
'job_class' => $job::class,
'attempt' => $job->getAttempt(),
];
try {
$this->logger->info('Job started', $context);
$job->handle($source->token());
$this->logger->info('Job completed', $context);
} catch (\Throwable $e) {
$this->logger->error('Job failed', [...$context, 'exception' => $e]);
throw $e;
} finally {
unset($this->activeTokenSources[$job->getId()]);
}
}
// Wait up to 30s for in-progress jobs to finish gracefully
$this->logger->info('Worker stopped');
}
}
Job Implementation:
php
class ProcessImagesJob
{
public function handle(CancellationToken $token): void
{
foreach ($this->images as $image) {
if ($token->isCancelled()) {
$this->markAsIncomplete();
return; // exit gracefully
}
$this->processImage($image);
}
}
}
Class Interfaces:
php
final readonly class CancellationToken
{
public function __construct(
private string $id,
private CancellationTokenStorage $storage,
private ?string $parentId = null,
) {}
public function id(): string { return $this->id; }
public function isCancelled(): bool
{
return $this->storage->isCancelled($this->id, $this->parentId);
}
}
final class CancellationTokenSource
{
public function __construct(
private CancellationToken $token,
private CancellationTokenStorage $storage,
) {}
public function token(): CancellationToken { return $this->token; }
public function cancel(): void
{
$this->storage->markCancelled($this->token->id());
}
public function createChild(): CancellationTokenSource
{
return $this->storage->createChild($this->token);
}
}
Redis Keys (DB 2):
| Key | Value | TTL |
|---|---|---|
| `cancel:{id}` | "1" (when cancelled) | 24h |
| `cancel:{id}:parent` | parent token ID | 24h |
Error Handling:
- Fail-open policy: If Redis unreachable, jobs continue (don't cancel incorrectly)
- Local caching: After the first isCancelled() = true, cache the result locally to avoid further Redis calls
- Auto-cleanup: 24h TTL on all keys, no manual cleanup needed
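The fail-open and local-caching rules can be sketched without Redis. The example below is a simplified illustration, not the CancellationTokenStorage itself: `CancelStore`, `FailOpenCancellationCheck`, and `ArrayCancelStore` are hypothetical names, the parent walk is omitted, and an unreachable backend is modelled as a `RuntimeException`.

```php
// Hypothetical minimal store interface standing in for the Redis client.
interface CancelStore
{
    /** @throws RuntimeException when the backend is unreachable */
    public function exists(string $key): bool;
}

final class FailOpenCancellationCheck
{
    /** @var array<string, true> tokens already observed as cancelled */
    private array $cancelledCache = [];

    public function __construct(private CancelStore $store) {}

    public function isCancelled(string $id): bool
    {
        if (isset($this->cancelledCache[$id])) {
            return true; // local cache: no backend call needed
        }
        try {
            $cancelled = $this->store->exists("cancel:{$id}");
        } catch (RuntimeException) {
            return false; // fail-open: an unreachable backend never cancels a job
        }
        if ($cancelled) {
            $this->cancelledCache[$id] = true; // only positive results are cached
        }
        return $cancelled;
    }
}

// In-memory stand-in used only for this example.
final class ArrayCancelStore implements CancelStore
{
    public bool $down = false;
    public int $calls = 0;
    /** @var array<string, string> */
    public array $keys = [];

    public function exists(string $key): bool
    {
        $this->calls++;
        if ($this->down) {
            throw new RuntimeException('backend unreachable');
        }
        return isset($this->keys[$key]);
    }
}
```

Only positive results are cached because cancellation is one-way: a token that is not cancelled now may still be cancelled later, so a negative answer has to be re-checked.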
Testing:
php
public function test_child_cancelled_when_parent_cancelled(): void
{
$storage = new CancellationTokenStorage($this->redis);
$parent = $storage->create();
$child = $parent->createChild();
$this->assertFalse($child->token()->isCancelled());
$parent->cancel();
$this->assertTrue($child->token()->isCancelled());
}
Search
PostgreSQL Full-Text Search for MVP with query optimization.
The Problem: Naive Query
sql
-- BAD: 3 JOINs + FTS + range = slow
SELECT p.* FROM products p
JOIN product_compatibility pc ON pc.product_id = p.id
JOIN product_categories pcat ON pcat.product_id = p.id
JOIN cities c ON c.id = p.city_id
WHERE p.status = 'active'
AND p.search_vector @@ to_tsquery('генератор') -- 'генератор' = alternator
AND pc.make_id = 5 AND pc.model_id = 12
AND c.region_id = 1
AND pcat.category_id = 7
AND p.price BETWEEN 1000 AND 5000
ORDER BY p.created_at DESC
LIMIT 20;
Solution: Denormalize + Filter-First Strategy
Step 1: Denormalize hot filters into products table
sql
ALTER TABLE products ADD COLUMN region_id INTEGER REFERENCES regions(id);
ALTER TABLE products ADD COLUMN primary_category_id INTEGER REFERENCES categories(id);
-- Trigger to sync region_id from city
CREATE OR REPLACE FUNCTION sync_product_region() RETURNS TRIGGER AS $$
BEGIN
NEW.region_id := (SELECT region_id FROM cities WHERE id = NEW.city_id);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_sync_product_region
BEFORE INSERT OR UPDATE OF city_id ON products
FOR EACH ROW EXECUTE FUNCTION sync_product_region();
Step 2: Composite partial indexes for common queries
sql
-- Most common: region + category + date (catalog browse)
CREATE INDEX idx_products_region_category_date
ON products(region_id, primary_category_id, created_at DESC, id DESC)
WHERE status = 'active';
-- Price sort variants
CREATE INDEX idx_products_region_category_price_asc
ON products(region_id, primary_category_id, price ASC, id ASC)
WHERE status = 'active';
CREATE INDEX idx_products_region_category_price_desc
ON products(region_id, primary_category_id, price DESC, id DESC)
WHERE status = 'active';
-- FTS over active products (GIN covers only search_vector; the region filter is applied separately)
CREATE INDEX idx_products_fts_region
ON products USING GIN(search_vector)
WHERE status = 'active';
Step 3: Two-phase query for compatibility filter
sql
-- Phase 1: Get product IDs matching compatibility (uses index)
WITH matching_products AS (
SELECT DISTINCT product_id
FROM product_compatibility
WHERE make_id = 5
AND (model_id IS NULL OR model_id = 12)
AND (generation_id IS NULL OR generation_id = 45)
LIMIT 1000 -- Cap to prevent explosion; without ORDER BY the kept rows are arbitrary, so matches beyond the cap are dropped
)
-- Phase 2: Filter and paginate on products table
SELECT p.*
FROM products p
WHERE p.id IN (SELECT product_id FROM matching_products)
AND p.status = 'active'
AND p.region_id = 1
AND p.primary_category_id = 7
AND p.price BETWEEN 1000 AND 5000
ORDER BY p.created_at DESC, p.id DESC
LIMIT 20;
Step 4: Query builder with filter priority
php
final class ProductSearchQueryBuilder
{
private const FILTER_PRIORITY = [
'compatibility' => 1, // Most selective, use CTE
'fts' => 2, // GIN index, very fast
'region' => 3, // Denormalized, indexed
'category' => 4, // Denormalized, indexed
'price' => 5, // Range, use after other filters
'steering' => 6, // Low cardinality
];
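// Sketch only: DQL has no native @@ operator and cannot reference raw table names,
// so production code would need native SQL (DBAL) or custom DQL functions.
// makeId, modelId and query are assumed property names on SearchFilters.
public function build(SearchFilters $filters): QueryBuilder
{
$qb = $this->em->createQueryBuilder()
->select('p')
->from(Product::class, 'p')
->where('p.status = :status')
->setParameter('status', 'active');
// Apply filters in priority order
if ($filters->hasCompatibility()) {
// Most selective filter; the subquery stands in for the CTE from Step 3
$qb->andWhere('p.id IN (
SELECT DISTINCT pc.product_id FROM product_compatibility pc
WHERE pc.make_id = :make_id
AND (:model_id IS NULL OR pc.model_id = :model_id)
)')
->setParameter('make_id', $filters->makeId)
->setParameter('model_id', $filters->modelId);
}
if ($filters->hasQuery()) {
$qb->andWhere('p.searchVector @@ plainto_tsquery(:query)')
->setParameter('query', $filters->query);
}
// Direct column filters (denormalized, fast)
if ($filters->regionId) {
$qb->andWhere('p.regionId = :region_id')
->setParameter('region_id', $filters->regionId);
}
if ($filters->categoryId) {
$qb->andWhere('p.primaryCategoryId = :category_id')
->setParameter('category_id', $filters->categoryId);
}
// Range filter last; each bound is applied separately so min-only or max-only works
if ($filters->priceMin !== null) {
$qb->andWhere('p.price >= :price_min')
->setParameter('price_min', $filters->priceMin);
}
if ($filters->priceMax !== null) {
$qb->andWhere('p.price <= :price_max')
->setParameter('price_max', $filters->priceMax);
}
return $qb;
}
}
Query Performance Comparison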
| Query Type | Before (JOINs) | After (Denorm) | Improvement |
|---|---|---|---|
| Region + Category | ~150ms | ~5ms | 30× |
| Compatibility + Region | ~300ms | ~25ms | 12× |
| FTS + Region + Category | ~200ms | ~15ms | 13× |
| Full filters | ~500ms | ~40ms | 12× |
Estimates for 100K products, assuming the indexes above are in place
When to JOIN vs Denormalize
| Filter | Strategy | Reason |
|---|---|---|
| Region | Denormalize | Every query uses it |
| Primary category | Denormalize | Most queries use it |
| Secondary categories | JOIN | Rare, M:N relationship |
| Compatibility | CTE subquery | M:N, selective filter |
| District/Metro | JOIN | Rare, already on products |
Filter Parameters
q Full-text query
make_id Car make filter
model_id Car model filter
generation_id Generation filter
region_id Region filter
city_id City filter
district_id District filter
category_id[] Category filters (multiple)
condition Condition category slug
steering left|right|universal
price_min Min price
price_max Max price
sort price_asc|price_desc|date_desc
cursor Pagination cursor (base64)
per_page Items per page (max 100)
Cursor-Based Pagination
Offset pagination degrades with large offsets (>10,000 rows). Cursor-based (keyset) pagination maintains constant performance.
Why Cursor Pagination
| Approach | Page 501 (20 items) | Performance |
|---|---|---|
| Offset | OFFSET 10000 LIMIT 20 — scans 10,020 rows | O(offset + limit) — slow |
| Cursor | WHERE (created_at, id) < cursor LIMIT 20 — scans ~20 rows | O(limit) — fast |
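To make the cursor row of the table concrete, here is an in-memory sketch of keyset paging over rows already sorted by (created_at DESC, id DESC). The names `isAfterCursor` and `keysetPage` are hypothetical, and fetching one extra row to derive `has_more` is an assumed technique, not something this spec mandates. The array filter only demonstrates the semantics; in PostgreSQL the same comparison is answered from the index.

```php
/** True when $row comes after $cursor in (created_at DESC, id DESC) order. */
function isAfterCursor(array $row, array $cursor): bool
{
    // Mirrors the SQL row comparison (created_at, id) < (:date, :id).
    // ISO-8601 UTC timestamps in one fixed format compare correctly as strings.
    $cmp = strcmp($row['created_at'], $cursor['created_at']);
    return $cmp < 0 || ($cmp === 0 && $row['id'] < $cursor['id']);
}

/**
 * @param list<array{created_at: string, id: int}> $rows sorted by created_at DESC, id DESC
 * @param array{created_at: string, id: int}|null $cursor last row of the previous page
 * @return array{data: list<array>, has_more: bool, next_cursor: ?array}
 */
function keysetPage(array $rows, ?array $cursor, int $limit): array
{
    if ($limit < 1) {
        throw new InvalidArgumentException('limit must be at least 1');
    }
    $candidates = $cursor === null
        ? $rows
        : array_values(array_filter($rows, fn (array $r): bool => isAfterCursor($r, $cursor)));

    // Take one extra row to learn whether another page exists.
    $slice = array_slice($candidates, 0, $limit + 1);
    $hasMore = count($slice) > $limit;
    $data = array_slice($slice, 0, $limit);
    $last = $hasMore ? $data[$limit - 1] : null;

    return [
        'data' => $data,
        'has_more' => $hasMore,
        'next_cursor' => $last === null
            ? null
            : ['created_at' => $last['created_at'], 'id' => $last['id']],
    ];
}
```

The `id` tiebreaker matters: two products created in the same second would otherwise be skipped or repeated at a page boundary.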
Cursor Format
Encoded as base64 in the URL:
json
{
"created_at": "2026-01-24T10:30:00Z",
"id": 12345
}
API Response
json
{
"data": [...],
"meta": {
"per_page": 20,
"has_more": true,
"next_cursor": "eyJjcmVhdGVkX2F0IjoiMjAyNi0wMS0yNCIsImlkIjoxMjM0NX0=",
"prev_cursor": "eyJjcmVhdGVkX2F0IjoiMjAyNi0wMS0yNCIsImlkIjoxMjM2NX0="
}
}
Sort-Specific Cursors
| Sort | Cursor Fields | Query Condition |
|---|---|---|
| `date_desc` | created_at, id | (created_at, id) < (:date, :id) |
| `price_asc` | price, id | (price, id) > (:price, :id) |
| `price_desc` | price, id | (price, id) < (:price, :id) |
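The mapping in this table can be expressed as a small lookup that keeps the comparison direction and the ORDER BY direction together, since a mismatch between the two silently skips or repeats rows. This is a sketch under assumptions: `keysetClauses` and `buildPageSql` are hypothetical helpers, and the placeholder names follow the table.

```php
/**
 * Returns the keyset WHERE fragment and matching ORDER BY for a sort key.
 * Hypothetical helper; placeholder names follow the table above.
 *
 * @return array{where: string, order: string}
 */
function keysetClauses(string $sort): array
{
    return match ($sort) {
        'date_desc' => [
            'where' => '(created_at, id) < (:date, :id)',
            'order' => 'created_at DESC, id DESC',
        ],
        'price_asc' => [
            'where' => '(price, id) > (:price, :id)',
            'order' => 'price ASC, id ASC',
        ],
        'price_desc' => [
            'where' => '(price, id) < (:price, :id)',
            'order' => 'price DESC, id DESC',
        ],
        // Reject unknown sorts instead of falling back to an unindexed order.
        default => throw new InvalidArgumentException("Unsupported sort: {$sort}"),
    };
}

/** Builds the page query; the cursor condition is added only after the first page. */
function buildPageSql(string $sort, bool $hasCursor, int $limit): string
{
    $clauses = keysetClauses($sort);
    $sql = "SELECT * FROM products WHERE status = 'active'";
    if ($hasCursor) {
        $sql .= ' AND ' . $clauses['where'];
    }
    return $sql . ' ORDER BY ' . $clauses['order'] . ' LIMIT ' . $limit;
}
```

Because the sort key comes from a fixed whitelist and the cursor values stay as bound placeholders, no user input is concatenated into the SQL string.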
Query Example
sql
-- First page
SELECT * FROM products
WHERE status = 'active'
ORDER BY created_at DESC, id DESC
LIMIT 20;
-- Next page with cursor
SELECT * FROM products
WHERE status = 'active'
AND (created_at, id) < ('2026-01-24 10:30:00', 12345)
ORDER BY created_at DESC, id DESC
LIMIT 20;
CursorPaginator Service
HMAC-signed cursors prevent tampering and injection attacks:
php
final class CursorPaginator
{
public function __construct(
private string $secretKey, // From environment
) {}
public function encode(array $lastItem, string $sort): string
{
$cursor = match($sort) {
'date_desc' => [
'created_at' => $lastItem['created_at'],
'id' => $lastItem['id'],
],
'price_asc', 'price_desc' => [
'price' => $lastItem['price'],
'id' => $lastItem['id'],
],
default => ['id' => $lastItem['id']],
};
$payload = json_encode($cursor);
$signature = hash_hmac('sha256', $payload, $this->secretKey);
return base64_encode($payload . '.' . $signature);
}
public function decode(string $cursor): ?array
{
$decoded = base64_decode($cursor, true);
if ($decoded === false) {
return null;
}
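// Split on the last dot: the hex signature never contains one,
// but the JSON payload can (e.g. a decimal price)
$separator = strrpos($decoded, '.');
if ($separator === false) {
return null;
}
$payload = substr($decoded, 0, $separator);
$signature = substr($decoded, $separator + 1);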
// Constant-time comparison to prevent timing attacks
$expectedSignature = hash_hmac('sha256', $payload, $this->secretKey);
if (!hash_equals($expectedSignature, $signature)) {
return null; // Tampered cursor
}
$data = json_decode($payload, true);
if (!is_array($data)) {
return null;
}
// Every cursor carries an id; reject the cursor if it is missing or not an integer
if (!isset($data['id']) || !is_int($data['id'])) {
return null;
}
if (isset($data['price']) && !is_numeric($data['price'])) {
return null;
}
return $data;
}
}
Directory Structure
app/
├── Actions/
│ ├── Auth/
│ ├── Store/
│ ├── Vendor/
│ └── Admin/
├── Application/
│ ├── Service/
│ ├── DTO/
│ └── Exception/
├── Domain/
│ ├── Entity/
│ ├── Repository/
│ ├── Enum/
│ └── ValueObject/
└── Infrastructure/
├── Persistence/Doctrine/
├── Auth/
├── Cache/
├── Storage/
├── Queue/
└── Middleware/
config/
├── container.php
├── routes.php
├── middleware.php
├── settings.php
└── doctrine.php
database/
├── migrations/
└── seeds/
Implementation Phases
Phase 1: Foundation
- Project scaffolding (Slim 4, PHP-DI, Doctrine)
- Docker setup (PHP 8.2, Nginx, PostgreSQL, PgBouncer)
- PgBouncer connection pooling (transaction mode)
- Redis HA setup (Primary + Replica + 3× Sentinel)
- RedisConnectionFactory service (read/write separation)
- Base entity classes, repository interfaces
- Database migrations infrastructure
- Error handling, response formatting
Phase 2: Auth System
- User, UserSession, VerificationCode entities
- Session service (Redis-backed)
- CSRF protection middleware
- Register, Login, Logout, Me endpoints
- Email verification flow
- Password reset flow
- Auth middleware
Phase 3: Reference Data
- Car entities (Make, Model, Generation)
- Geo entities (Region, City, District, MetroStation)
- Category entity (hierarchical, with types)
- Seeders with real data
- Public API endpoints
- Redis caching (24h TTL)
Phase 4: Products Core
- Product, ProductCategory, ProductCompatibility entities
- OemNumber, ProductOem, OemCrossReference entities
- Product repository with search
- CRUD services
- Vendor endpoints
- Store endpoints
- PostgreSQL FTS setup
Phase 5: Image Processing
- S3 storage service
- Symfony Messenger setup
- ProductImage entity
- Upload endpoint
- ProcessImageHandler
- Worker process + Docker service
- Cancellation tokens for graceful shutdown — see Cancellation Tokens
Phase 6: Admin Panel
- Admin middleware
- Dashboard stats (ProductView, SearchLog entities)
- User management
- Product moderation
- Reference data CRUD (cars, categories, geo)
Phase 7: Polish & Deploy
- Rate limiting middleware
- Error logging & monitoring (Monolog + Sentry) — see Error Logging & Monitoring
- Health check endpoint
- API documentation (OpenAPI) — see API Documentation (Swagger/OpenAPI)
- Production Docker config
Entity Summary (MVP)
| # | Entity | Purpose |
|---|---|---|
| 1 | User | User accounts (+ brute force fields) |
| 2 | UserSession | Session tracking |
| 3 | VerificationCode | Email/phone verification |
| 4 | PasswordResetToken | Secure password reset tokens |
| 5 | LoginAttempt | Security audit log |
| 6 | BusinessProfile | Extended profile for business accounts |
| 7 | CarMake | Car manufacturers |
| 8 | CarModel | Car models |
| 9 | CarGeneration | Model generations/years |
| 10 | Region | Geographic regions |
| 11 | City | Cities |
| 12 | District | City districts |
| 13 | MetroStation | Metro stations |
| 14 | Category | Part categories (hierarchical) |
| 15 | Product | Auto part listings |
| 16 | ProductCategory | Category assignment (M:N) |
| 17 | ProductImage | Product photos (WebP + JPEG, retry tracking) |
| 18 | ProductCompatibility | Car compatibility |
| 19 | OemNumber | OEM part numbers |
| 20 | ProductOem | Product-OEM mapping (M:N) |
| 21 | OemCrossReference | OEM analogs |
| 22 | Favorite | User favorites (products) |
| 23 | ProductView | View tracking |
| 24 | SearchLog | Search analytics |
| 25 | AdminAuditLog | Admin action audit trail |
Total: 25 entities
Post-MVP Iterations
- Iteration 2: Messaging (conversations, messages)
- Iteration 3: Reviews, Reports
- Iteration 4: Phone verification (SMS.ru)
- Iteration 5: Payments (YooKassa) - premium listings, promotions
- Iteration 6: Meilisearch migration (when >50K products)