Налаштування Redis Pub/Sub та Streams як чергиці повідомлень
Redis забезпечує два механізми для асинхронних повідомлень: Pub/Sub — простий fire-and-forget без персистентності, та Streams — персистентна черга з групами користувачів, подібна до облегченої Kafka.
Redis Pub/Sub
Підходить для real-time повідомлень усередині додатку. Повідомлення не зберігаються — якщо підписник відключений, повідомлення теряється.
// Laravel: публікація через Redis Pub/Sub
use Illuminate\Support\Facades\Redis;
// Publisher
Redis::publish('user-notifications', json_encode([
'user_id' => $userId,
'type' => 'order.shipped',
'message' => 'Ваше замовлення відправлено',
]));
// Subscriber (console command)
class RedisSubscribeCommand extends Command
{
protected $signature = 'redis:subscribe';
public function handle(): void
{
Redis::subscribe(['user-notifications'], function (string $message) {
$data = json_decode($message, true);
broadcast(new UserNotificationEvent($data)); // → WebSocket
});
}
}
Redis Streams
Streams — правильний вибір для task queue на Redis. Повідомлення зберігаються в потоці, групи користувачів відслідковують прогрес, pending entries — необроблені повідомлення.
# Створити потік та додати повідомлення
XADD emails * user_id 123 email [email protected] template welcome
# Створити групу користувачів
XGROUP CREATE emails email-workers $ MKSTREAM
# Читати нові повідомлення (воркер 1)
XREADGROUP GROUP email-workers worker-1 COUNT 10 BLOCK 5000 STREAMS emails >
# Підтвердити обробку
XACK emails email-workers <message-id>
PHP: Redis Streams воркер
use Illuminate\Support\Facades\Redis;
class RedisStreamWorker
{
private string $stream = 'emails';
private string $group = 'email-workers';
private string $consumer;
public function __construct()
{
$this->consumer = gethostname() . ':' . getmypid();
$this->ensureGroup();
}
private function ensureGroup(): void
{
try {
Redis::xgroup('CREATE', $this->stream, $this->group, '$', true);
} catch (\Throwable) {
// Група вже існує
}
}
public function run(): void
{
while (true) {
// Спочатку обробити pending (непідтверджені з попереднього запуску)
$pending = Redis::xreadgroup(
$this->group, $this->consumer,
[$this->stream => '0'], // '0' = pending messages
10
);
$this->processMessages($pending);
// Потім нові повідомлення
$messages = Redis::xreadgroup(
$this->group, $this->consumer,
[$this->stream => '>'], // '>' = only new
10,
5000 // блокування 5 секунд
);
$this->processMessages($messages);
}
}
private function processMessages(?array $streams): void
{
if (!$streams) return;
foreach ($streams[$this->stream] ?? [] as [$id, $fields]) {
try {
$this->handleEmail($fields);
Redis::xack($this->stream, $this->group, $id);
} catch (\Throwable $e) {
Log::error('Stream message failed', ['id' => $id, 'error' => $e->getMessage()]);
// Повідомлення залишається в pending — буде перечитано при наступному запуску
}
}
}
private function handleEmail(array $fields): void
{
Mail::to($fields['email'])->send(new TemplateMail($fields['template'], $fields));
}
}
Node.js: ioredis Streams
import Redis from 'ioredis';
const redis = new Redis({ host: 'redis', port: 6379 });
const STREAM = 'emails';
const GROUP = 'email-workers';
const CONSUMER = `worker-${process.pid}`;
async function startWorker(): Promise<void> {
// Створити групу якщо не існує
try {
await redis.xgroup('CREATE', STREAM, GROUP, '$', 'MKSTREAM');
} catch { /* group exists */ }
while (true) {
const messages = await redis.xreadgroup(
'GROUP', GROUP, CONSUMER,
'COUNT', '10',
'BLOCK', '5000',
'STREAMS', STREAM, '>'
) as [string, [string, string[]][]][] | null;
if (!messages) continue;
for (const [, entries] of messages) {
for (const [id, fields] of entries) {
const data = Object.fromEntries(
fields.reduce((acc, val, i) => (i % 2 === 0 ? acc.push([val, fields[i+1]]) : acc, acc), [] as [string,string][])
);
try {
await sendEmail(data);
await redis.xack(STREAM, GROUP, id);
} catch (err) {
console.error('Email failed:', id, err);
}
}
}
}
}
Обрізання та очистка потоку
# Обрізати потік до останніх 10000 повідомлень
XTRIM emails MAXLEN ~ 10000
# Автоматично при додаванні
XADD emails MAXLEN ~ 100000 * user_id 123 template welcome
Порівняння механізмів Redis
| Характеристика | Pub/Sub | Streams | Lists (LPUSH/BRPOP) |
|---|---|---|---|
| Персистентність | Ні | Так | Так |
| Групи користувачів | Ні | Так | Ні |
| Повторне відтворення історії | Ні | Так | Ні |
| Складність | Мінімальна | Середня | Мінімальна |
| Застосування | Real-time события | Task queue | Simple queue |
Терміни реалізації
Redis Streams воркер для типового PHP/Node.js додатку (електронна пошта, сповіщення): 1–2 дні. З моніторингом pending messages та сигналами тривоги: 2–3 дні.







