Реалізація GraphQL Subscriptions (real-time підписки)
GraphQL Subscriptions — механізм довгоживучого з'єднання між клієнтом і сервером, через яке сервер відправляє дані при виникненні подій. Транспорт — зазвичай WebSocket (протокол graphql-ws або застарілий subscriptions-transport-ws), рідше SSE. Subscriptions — третя операція GraphQL поряд з Query та Mutation.
Коли це потрібно
Для сайту з GraphQL API Subscriptions закривають задачі, де потрібно оновлювати UI без дій користувача: real-time чат, сповіщення, відстеження статусу замовлення, live-статистика, спільне редагування. Якщо вже є GraphQL, додавання Subscriptions дешевше, ніж будувати окремий WebSocket-сервер.
Серверна частина: Node.js + graphql-ws
Сучасний стандарт — пакет graphql-ws, який реалізує протокол graphql-transport-ws:
import { createServer } from 'http';
import { WebSocketServer } from 'ws';
import { useServer } from 'graphql-ws/lib/use/ws';
import { makeExecutableSchema } from '@graphql-tools/schema';
import { PubSub } from 'graphql-subscriptions';
const pubsub = new PubSub();
const typeDefs = `
type Message {
id: ID!
roomId: String!
authorId: String!
text: String!
createdAt: String!
}
type OrderStatus {
orderId: ID!
status: String!
updatedAt: String!
}
type Query {
messages(roomId: String!): [Message!]!
}
type Mutation {
sendMessage(roomId: String!, text: String!): Message!
}
type Subscription {
messageAdded(roomId: String!): Message!
orderStatusChanged(orderId: ID!): OrderStatus!
}
`;
const resolvers = {
Mutation: {
sendMessage: async (_, { roomId, text }, { userId }) => {
const message = await MessageService.create({ roomId, text, authorId: userId });
// Публікуємо подію
pubsub.publish(`MESSAGE_ADDED:${roomId}`, { messageAdded: message });
return message;
},
},
Subscription: {
messageAdded: {
subscribe: (_, { roomId }, { userId }) => {
// Перевіряємо доступ користувача до кімнати
if (!ChatRoom.hasAccess(userId, roomId)) {
throw new Error('Forbidden');
}
return pubsub.asyncIterator(`MESSAGE_ADDED:${roomId}`);
},
},
orderStatusChanged: {
subscribe: (_, { orderId }, { userId }) => {
// Користувач може підписатися лише на свої замовлення
if (!Order.belongsTo(orderId, userId)) {
throw new Error('Forbidden');
}
return pubsub.asyncIterator(`ORDER_STATUS:${orderId}`);
},
},
},
};
const schema = makeExecutableSchema({ typeDefs, resolvers });
const httpServer = createServer();
const wsServer = new WebSocketServer({ server: httpServer, path: '/graphql' });
useServer(
{
schema,
context: async (ctx) => {
// Аутентифікація через параметри з'єднання
const token = ctx.connectionParams?.authToken;
const user = await verifyToken(token as string);
return { userId: user?.id };
},
onConnect: async (ctx) => {
const token = ctx.connectionParams?.authToken;
if (!token) return false; // відхилити з'єднання
return true;
},
onDisconnect: (ctx, code, reason) => {
console.log(`Client disconnected: ${code} ${reason}`);
},
},
wsServer
);
httpServer.listen(4000);
Масштабування: Redis PubSub замість in-memory
Вбудований PubSub з graphql-subscriptions — in-memory, працює лише в межах одного процесу. При кількох екземплярах потрібен graphql-redis-subscriptions:
import { RedisPubSub } from 'graphql-redis-subscriptions';
import Redis from 'ioredis';
const options = {
host: process.env.REDIS_HOST,
port: 6379,
retryStrategy: (times: number) => Math.min(times * 50, 2000),
};
const pubsub = new RedisPubSub({
publisher: new Redis(options),
subscriber: new Redis(options),
});
// Використання ідентично — pubsub.publish() та pubsub.asyncIterator() ті ж
Тепер будь-який екземпляр може опубліковувати подію, і всі підписники на всіх екземплярах її отримають через Redis Pub/Sub.
Клієнтська частина: Apollo Client
import { ApolloClient, InMemoryCache, split, HttpLink } from '@apollo/client';
import { GraphQLWsLink } from '@apollo/client/link/subscriptions';
import { createClient } from 'graphql-ws';
import { getMainDefinition } from '@apollo/client/utilities';
const httpLink = new HttpLink({ uri: '/graphql' });
const wsLink = new GraphQLWsLink(
createClient({
url: 'wss://example.com/graphql',
connectionParams: () => ({
authToken: localStorage.getItem('token'),
}),
shouldRetry: () => true,
retryAttempts: 10,
on: {
connected: () => console.log('WS connected'),
closed: () => console.log('WS closed'),
},
})
);
// Query/Mutation йдуть через HTTP, Subscription — через WS
const splitLink = split(
({ query }) => {
const def = getMainDefinition(query);
return def.kind === 'OperationDefinition' && def.operation === 'subscription';
},
wsLink,
httpLink
);
export const client = new ApolloClient({
link: splitLink,
cache: new InMemoryCache(),
});
import { useSubscription, gql } from '@apollo/client';
const MESSAGE_ADDED = gql`
subscription MessageAdded($roomId: String!) {
messageAdded(roomId: $roomId) {
id
text
authorId
createdAt
}
}
`;
function ChatRoom({ roomId }: { roomId: string }) {
const [messages, setMessages] = useState<Message[]>([]);
useSubscription(MESSAGE_ADDED, {
variables: { roomId },
onData: ({ data }) => {
const newMessage = data.data?.messageAdded;
if (newMessage) {
setMessages(prev => [...prev, newMessage]);
}
},
onError: (error) => console.error('Subscription error:', error),
});
return (
<div>
{messages.map(msg => (
<div key={msg.id}>{msg.text}</div>
))}
</div>
);
}
Фільтрація подій на рівні сервера
Іноді потрібно фільтрувати события прямо у резолвері, щоб не гонити лишнє по мережі:
import { withFilter } from 'graphql-subscriptions';
const resolvers = {
Subscription: {
messageAdded: {
// withFilter обертає asyncIterator і фільтрує события
subscribe: withFilter(
(_, { roomId }) => pubsub.asyncIterator(`MESSAGES`),
(payload, variables) => {
// Відправляємо лише якщо roomId збігається
return payload.messageAdded.roomId === variables.roomId;
}
),
},
},
};
Це дозволяє використовувати один широкий канал MESSAGES замість каналу на кожну кімнату, фільтруючи на сервері.
Управління з'єднаннями та утечки пам'яті
При невдалому обращенні Subscriptions можуть привести до утечок: ітератори не закриваються, слухачі накопичуються.
// Правильне завершення ітератора
const MESSAGE_ADDED_SUBSCRIPTION = {
subscribe: async function* (_, { roomId }, context) {
const iterator = pubsub.asyncIterator(`MESSAGE_ADDED:${roomId}`);
try {
for await (const value of iterator) {
yield value;
}
} finally {
// Викликається при відписці клієнта
iterator.return?.();
}
},
};
graphql-ws автоматично викликає return() у ітератора при закритті з'єднання або явній відписці, але явний try/finally захищає від нестандартних випадків.
Інтеграція з Laravel бекенд
Якщо GraphQL API реалізований на PHP (Lighthouse), можна публікувати события через Redis з Laravel та приймати в Node.js WebSocket-сервері:
// Laravel публікує подію
Redis::publish('ORDER_STATUS:' . $order->id, json_encode([
'orderStatusChanged' => [
'orderId' => $order->id,
'status' => $order->status,
'updatedAt' => now()->toISOString(),
],
]));
// Node.js WS-сервер слухає Redis і форвардить у pubsub
const subscriber = new Redis({ host: process.env.REDIS_HOST });
subscriber.psubscribe('ORDER_STATUS:*');
subscriber.on('pmessage', (pattern, channel, message) => {
const orderId = channel.split(':')[1];
const data = JSON.parse(message);
pubsub.publish(`ORDER_STATUS:${orderId}`, data);
});
Тестування
import { createTestClient } from 'apollo-server-testing';
import { execute, subscribe } from 'graphql';
it('повинен доставляти повідомлення підписникам', async () => {
const results: any[] = [];
const iterator = await subscribe({
schema,
document: parse(`subscription { messageAdded(roomId: "room1") { id text } }`),
contextValue: { userId: 'user1' },
});
// Збираємо першу подію
const firstResult = (await (iterator as AsyncIterator<any>).next()).value;
// Публікуємо подію
pubsub.publish('MESSAGE_ADDED:room1', {
messageAdded: { id: '1', text: 'Hello', roomId: 'room1', authorId: 'user2' }
});
expect(firstResult.data.messageAdded.text).toBe('Hello');
});
Терміни
Базові Subscriptions з одним типом события на існуючому GraphQL-сервері — 2–3 дні. Повноцінна реалізація з Redis PubSub, аутентифікацією через connectionParams, кількома типами підписок і тестами — 1–1.5 тижня. Додавання до Laravel/Lighthouse через гібридну схему PHP + Node.js — ще 2–3 дні.







