Реализация Real-Time Notifications (WebSocket/SSE) на сайте
Уведомления в реальном времени — один из самых частых запросов. Технически между WebSocket и SSE принципиальная разница: SSE — однонаправленный поток от сервера к клиенту через обычный HTTP, WebSocket — двунаправленный канал. Для уведомлений чаще достаточно SSE.
SSE vs WebSocket: когда что выбирать
SSE подходит, если нужно только получать события с сервера: новые сообщения, обновления статуса, алерты. Работает через стандартный HTTP/2, поддерживает автоматическое переподключение, не требует дополнительных библиотек на клиенте.
WebSocket нужен, если клиент тоже отправляет данные в реальном времени: чат, игры, совместное редактирование.
SSE: Server → Client (one-way, HTTP)
WebSocket: Server ↔ Client (two-way, WS protocol)
SSE: реализация на Node.js/Express
// server/routes/notifications.ts
import { Router, Request, Response } from 'express';
import { authMiddleware } from '../middleware/auth';
const router = Router();
// Map userId -> Set<Response>
const clients = new Map<string, Set<Response>>();
router.get('/stream', authMiddleware, (req: Request, res: Response) => {
const userId = req.user!.id;
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no', // важно для nginx
});
// Heartbeat каждые 30 секунд — иначе прокси/браузер разорвёт соединение
const heartbeat = setInterval(() => {
res.write(':heartbeat\n\n');
}, 30_000);
// Регистрация клиента
if (!clients.has(userId)) clients.set(userId, new Set());
clients.get(userId)!.add(res);
// Начальный снимок непрочитанных
getUnreadNotifications(userId).then((notifications) => {
res.write(sseEvent('init', notifications));
});
req.on('close', () => {
clearInterval(heartbeat);
clients.get(userId)?.delete(res);
if (clients.get(userId)?.size === 0) clients.delete(userId);
});
});
function sseEvent(type: string, data: unknown, id?: string): string {
let msg = '';
if (id) msg += `id: ${id}\n`;
msg += `event: ${type}\n`;
msg += `data: ${JSON.stringify(data)}\n\n`;
return msg;
}
// Публичная функция для отправки уведомления пользователю
export function pushNotification(userId: string, notification: Notification) {
const userClients = clients.get(userId);
if (!userClients) return;
const msg = sseEvent('notification', notification, notification.id);
userClients.forEach((res) => res.write(msg));
}
export default router;
Клиентская часть: EventSource
class NotificationService {
private es: EventSource | null = null;
private reconnectDelay = 1000;
connect() {
this.es = new EventSource('/api/notifications/stream', {
withCredentials: true,
});
this.es.addEventListener('init', (e) => {
const notifications = JSON.parse(e.data);
notificationStore.setAll(notifications);
});
this.es.addEventListener('notification', (e) => {
const notification = JSON.parse(e.data);
notificationStore.add(notification);
this.showToast(notification);
});
this.es.addEventListener('error', () => {
this.es?.close();
// Экспоненциальный backoff
setTimeout(() => {
this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30_000);
this.connect();
}, this.reconnectDelay);
});
this.es.addEventListener('open', () => {
this.reconnectDelay = 1000; // сброс при успешном подключении
});
}
disconnect() {
this.es?.close();
this.es = null;
}
private showToast(notification: Notification) {
// интеграция с библиотекой toast по выбору (sonner, react-hot-toast, etc.)
toast(notification.title, {
description: notification.body,
action: notification.actionUrl
? { label: 'Открыть', onClick: () => navigate(notification.actionUrl!) }
: undefined,
});
}
}
WebSocket-вариант: интеграция с очередью
Для продакшена уведомления не отправляются напрямую из запроса в SSE-клиент — между слоем приложения и доставкой стоит очередь:
HTTP Request → DB сохранение → Redis Publish → WebSocket Server → Client
// Публикация события (из любого сервиса/воркера)
import { createClient } from 'redis';
const pub = createClient({ url: process.env.REDIS_URL });
await pub.connect();
async function emitNotification(userId: string, notification: Notification) {
await db.notifications.create({ data: notification });
await pub.publish(
`notifications:${userId}`,
JSON.stringify(notification)
);
}
// WebSocket сервер подписывается через отдельный Redis subscriber
const sub = createClient({ url: process.env.REDIS_URL });
await sub.connect();
io.on('connection', (socket) => {
const userId = socket.data.userId;
// Подписка на персональный канал
sub.subscribe(`notifications:${userId}`, (message) => {
socket.emit('notification', JSON.parse(message));
});
socket.on('notification:read', async (notificationId: string) => {
await db.notifications.update({
where: { id: notificationId },
data: { readAt: new Date() },
});
});
socket.on('disconnect', () => {
sub.unsubscribe(`notifications:${userId}`);
});
});
Структура уведомления
interface Notification {
id: string;
userId: string;
type: 'comment' | 'mention' | 'order' | 'system' | 'alert';
title: string;
body: string;
actorId?: string; // кто инициировал
entityType?: string; // 'post' | 'order' | ...
entityId?: string;
actionUrl?: string;
imageUrl?: string;
readAt?: Date;
createdAt: Date;
}
Группировка и батчинг
Если в секунду приходит много событий (массовая рассылка, стрим данных), клиент получает отдельный SSE-event на каждое. Лучше группировать:
// Серверный буфер: 200ms debounce на flush
const pendingByUser = new Map<string, Notification[]>();
function bufferNotification(userId: string, notification: Notification) {
if (!pendingByUser.has(userId)) {
pendingByUser.set(userId, []);
setTimeout(() => flushUser(userId), 200);
}
pendingByUser.get(userId)!.push(notification);
}
function flushUser(userId: string) {
const batch = pendingByUser.get(userId) ?? [];
pendingByUser.delete(userId);
if (batch.length === 1) {
pushToClient(userId, sseEvent('notification', batch[0]));
} else {
pushToClient(userId, sseEvent('notifications:batch', batch));
}
}
Масштабирование SSE
SSE держит HTTP-соединение открытым — каждый подключённый пользователь занимает один file descriptor. Node.js комфортно держит 10k+ соединений, но при горизонтальном масштабировании (несколько инстансов) пользователь может быть подключён к инстансу A, а уведомление сгенерировано инстансом B. Redis Pub/Sub решает это — каждый инстанс подписан на все каналы и доставляет только своим клиентам.
Для nginx: проксирование SSE требует отключить буферизацию:
location /api/notifications/stream {
proxy_pass http://app_backend;
proxy_buffering off;
proxy_cache off;
proxy_read_timeout 3600s;
proxy_set_header Connection '';
chunked_transfer_encoding on;
}
Реализация SSE-уведомлений с Redis: 2–3 дня. Добавление WebSocket с двусторонней логикой (read receipts, typing): ещё день.







