Налаштування черг повідомлень (Redis Pub/Sub) для веб-застосунку

Наша компанія займається розробкою, підтримкою та обслуговуванням сайтів будь-якої складності. Від простих односторінкових сайтів до масштабних кластерних систем, побудованих на мікро сервісах. Досвід розробників підтверджено сертифікатами від вендорів.

Розробка та обслуговування будь-яких видів сайтів:

Інформаційні сайти або веб-програми
Сайти візитки, landing page, корпоративні сайти, онлайн каталоги, квіз, промо-сайти, блоги, ресурси новин, інформаційні портали, форуми, агрегатори
Сайти або веб-програми електронної комерції
Інтернет-магазини, B2B-портали, маркетплейси, онлайн-обмінники, кешбек-сайти, біржі, дропшиппінг-платформи, парсери товарів
Веб-програми для управління бізнес-процесами
CRM-системи, ERP-системи, корпоративні портали, системи управління виробництвом, парсери інформації
Сайти або веб-програми електронних послуг
Дошки оголошень, онлайн-школи, онлайн-кінотеатри, конструктори сайтів, портали надання електронних послуг, відеохостинги, тематичні портали

Це лише деякі з технічних типів сайтів, з якими ми працюємо, і кожен із них може мати свої специфічні особливості та функціональність, а також бути адаптованим під конкретні потреби та цілі клієнта.

Пропоновані послуги
Показано 1 з 1 послугУсі 2065 послуг
Налаштування черг повідомлень (Redis Pub/Sub) для веб-застосунку
Середня
~2-3 робочих дні
Часті питання

Наші компетенції:

Етапи розробки

Останні роботи

  • image_website-b2b-advance_0.png
    Розробка сайту компанії B2B ADVANCE
    1262
  • image_web-applications_feedme_466_0.webp
    Розробка веб-додатків для компанії FEEDME
    1171
  • image_websites_belfingroup_462_0.webp
    Розробка веб-сайту для компанії БЕЛФІНГРУП
    874
  • image_ecommerce_furnoro_435_0.webp
    Розробка інтернет магазину для компанії FURNORO
    1094
  • image_crm_enviok_479_0.webp
    Розробка веб-додатків для компанії Enviok
    831
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Розробка веб-сайту для компанії ФІКСПЕР
    851

Налаштування Redis Pub/Sub та Streams як чергиці повідомлень

Redis забезпечує два механізми для асинхронних повідомлень: Pub/Sub — простий fire-and-forget без персистентності, та Streams — персистентна черга з групами користувачів, подібна до облегченої Kafka.

Redis Pub/Sub

Підходить для real-time повідомлень усередині додатку. Повідомлення не зберігаються — якщо підписник відключений, повідомлення теряється.

// Laravel: публікація через Redis Pub/Sub
use Illuminate\Support\Facades\Redis;

// Publisher
Redis::publish('user-notifications', json_encode([
    'user_id' => $userId,
    'type'    => 'order.shipped',
    'message' => 'Ваше замовлення відправлено',
]));

// Subscriber (console command)
class RedisSubscribeCommand extends Command
{
    protected $signature = 'redis:subscribe';

    public function handle(): void
    {
        Redis::subscribe(['user-notifications'], function (string $message) {
            $data = json_decode($message, true);
            broadcast(new UserNotificationEvent($data));  // → WebSocket
        });
    }
}

Redis Streams

Streams — правильний вибір для task queue на Redis. Повідомлення зберігаються в потоці, групи користувачів відслідковують прогрес, pending entries — необроблені повідомлення.

# Створити потік та додати повідомлення
XADD emails * user_id 123 email [email protected] template welcome

# Створити групу користувачів
XGROUP CREATE emails email-workers $ MKSTREAM

# Читати нові повідомлення (воркер 1)
XREADGROUP GROUP email-workers worker-1 COUNT 10 BLOCK 5000 STREAMS emails >

# Підтвердити обробку
XACK emails email-workers <message-id>

PHP: Redis Streams воркер

use Illuminate\Support\Facades\Redis;

class RedisStreamWorker
{
    private string $stream = 'emails';
    private string $group = 'email-workers';
    private string $consumer;

    public function __construct()
    {
        $this->consumer = gethostname() . ':' . getmypid();
        $this->ensureGroup();
    }

    private function ensureGroup(): void
    {
        try {
            Redis::xgroup('CREATE', $this->stream, $this->group, '$', true);
        } catch (\Throwable) {
            // Група вже існує
        }
    }

    public function run(): void
    {
        while (true) {
            // Спочатку обробити pending (непідтверджені з попереднього запуску)
            $pending = Redis::xreadgroup(
                $this->group, $this->consumer,
                [$this->stream => '0'],  // '0' = pending messages
                10
            );
            $this->processMessages($pending);

            // Потім нові повідомлення
            $messages = Redis::xreadgroup(
                $this->group, $this->consumer,
                [$this->stream => '>'],  // '>' = only new
                10,
                5000  // блокування 5 секунд
            );
            $this->processMessages($messages);
        }
    }

    private function processMessages(?array $streams): void
    {
        if (!$streams) return;

        foreach ($streams[$this->stream] ?? [] as [$id, $fields]) {
            try {
                $this->handleEmail($fields);
                Redis::xack($this->stream, $this->group, $id);
            } catch (\Throwable $e) {
                Log::error('Stream message failed', ['id' => $id, 'error' => $e->getMessage()]);
                // Повідомлення залишається в pending — буде перечитано при наступному запуску
            }
        }
    }

    private function handleEmail(array $fields): void
    {
        Mail::to($fields['email'])->send(new TemplateMail($fields['template'], $fields));
    }
}

Node.js: ioredis Streams

import Redis from 'ioredis';

const redis = new Redis({ host: 'redis', port: 6379 });
const STREAM = 'emails';
const GROUP = 'email-workers';
const CONSUMER = `worker-${process.pid}`;

async function startWorker(): Promise<void> {
  // Створити групу якщо не існує
  try {
    await redis.xgroup('CREATE', STREAM, GROUP, '$', 'MKSTREAM');
  } catch { /* group exists */ }

  while (true) {
    const messages = await redis.xreadgroup(
      'GROUP', GROUP, CONSUMER,
      'COUNT', '10',
      'BLOCK', '5000',
      'STREAMS', STREAM, '>'
    ) as [string, [string, string[]][]][] | null;

    if (!messages) continue;

    for (const [, entries] of messages) {
      for (const [id, fields] of entries) {
        const data = Object.fromEntries(
          fields.reduce((acc, val, i) => (i % 2 === 0 ? acc.push([val, fields[i+1]]) : acc, acc), [] as [string,string][])
        );

        try {
          await sendEmail(data);
          await redis.xack(STREAM, GROUP, id);
        } catch (err) {
          console.error('Email failed:', id, err);
        }
      }
    }
  }
}

Обрізання та очистка потоку

# Обрізати потік до останніх 10000 повідомлень
XTRIM emails MAXLEN ~ 10000

# Автоматично при додаванні
XADD emails MAXLEN ~ 100000 * user_id 123 template welcome

Порівняння механізмів Redis

Характеристика Pub/Sub Streams Lists (LPUSH/BRPOP)
Персистентність Ні Так Так
Групи користувачів Ні Так Ні
Повторне відтворення історії Ні Так Ні
Складність Мінімальна Середня Мінімальна
Застосування Real-time события Task queue Simple queue

Терміни реалізації

Redis Streams воркер для типового PHP/Node.js додатку (електронна пошта, сповіщення): 1–2 дні. З моніторингом pending messages та сигналами тривоги: 2–3 дні.