Implementing LLM Streaming Responses in an Application
Streaming changes the UX radically: the user sees the first tokens within 200–500 ms instead of waiting 3–10 seconds for the full reply. Technically this means Server-Sent Events (SSE) on the backend and incremental stream processing on the frontend. A correct implementation also has to handle interruptions, backpressure, and clean stream termination.
Backend: FastAPI + SSE
from typing import AsyncGenerator

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from anthropic import AsyncAnthropic
from openai import AsyncOpenAI
import asyncio
import json

app = FastAPI()
anthropic_client = AsyncAnthropic()
openai_client = AsyncOpenAI()

async def stream_anthropic(messages: list[dict], system: str = "") -> AsyncGenerator[str, None]:
    """Generator for Claude streaming."""
    async with anthropic_client.messages.stream(
        model="claude-sonnet-4-5",
        max_tokens=2048,
        system=system,
        messages=messages,
    ) as stream:
        async for text in stream.text_stream:
            # SSE frame format: "data: {json}\n\n"
            yield f"data: {json.dumps({'text': text})}\n\n"
    # Completion signal
    yield f"data: {json.dumps({'done': True})}\n\n"

async def stream_openai(messages: list[dict]) -> AsyncGenerator[str, None]:
    """Generator for OpenAI streaming."""
    stream = await openai_client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        stream=True,
    )
    async for chunk in stream:
        # The final chunk may carry no choices (e.g. usage data), so guard it
        if chunk.choices and chunk.choices[0].delta.content:
            yield f"data: {json.dumps({'text': chunk.choices[0].delta.content})}\n\n"
    yield f"data: {json.dumps({'done': True})}\n\n"

@app.post("/chat/stream")
async def chat_stream(request: dict):
    messages = request.get("messages", [])
    provider = request.get("provider", "anthropic")
    generator = (
        stream_anthropic(messages)
        if provider == "anthropic"
        else stream_openai(messages)
    )
    return StreamingResponse(
        generator,
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # disable buffering in nginx
        },
    )
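Before building the frontend, it is worth smoke-testing the endpoint from a script. Below is a minimal sketch using the httpx library (an assumption; any HTTP client that can stream a response body works), with an illustrative URL and payload:

# Minimal smoke test for /chat/stream (assumes `pip install httpx`)
import asyncio
import json
import httpx

async def main():
    payload = {"messages": [{"role": "user", "content": "Hello!"}]}
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST", "http://localhost:8000/chat/stream", json=payload
        ) as resp:
            async for line in resp.aiter_lines():
                if line.startswith("data: "):
                    data = json.loads(line[len("data: "):])
                    if data.get("done"):
                        break
                    print(data["text"], end="", flush=True)

asyncio.run(main())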
Frontend: React with SSE
import { useState, useCallback } from 'react';

interface Message {
  role: string;
  content: string;
}

function useStreamingChat() {
  const [response, setResponse] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);

  const sendMessage = useCallback(async (messages: Message[]) => {
    setIsStreaming(true);
    setResponse('');

    // EventSource only supports GET, so for POST requests we read the
    // response body via fetch + ReadableStream instead.
    const res = await fetch('/chat/stream', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ messages }),
    });

    const reader = res.body!.getReader();
    const decoder = new TextDecoder();
    let buffer = '';

    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // An SSE event can be split across network chunks, so accumulate
        // into a buffer and only parse complete "data: ...\n\n" events
        buffer += decoder.decode(value, { stream: true });
        const events = buffer.split('\n\n');
        buffer = events.pop() ?? ''; // keep the incomplete tail

        for (const event of events) {
          if (event.startsWith('data: ')) {
            const data = JSON.parse(event.slice(6));
            if (data.done) return;
            setResponse(prev => prev + data.text);
          }
        }
      }
    } finally {
      setIsStreaming(false);
    }
  }, []);

  return { response, isStreaming, sendMessage };
}
WebSocket alternative
from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            messages = data.get("messages", [])
            async with anthropic_client.messages.stream(
                model="claude-sonnet-4-5",
                max_tokens=2048,
                messages=messages,
            ) as stream:
                async for text in stream.text_stream:
                    await websocket.send_json({"type": "token", "text": text})
            await websocket.send_json({"type": "done"})
    except WebSocketDisconnect:
        pass
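To exercise this endpoint without a browser, a small test client helps. A sketch assuming the third-party websockets package; the URL and payload are illustrative:

# Minimal test client for /ws/chat (assumes `pip install websockets`)
import asyncio
import json
import websockets

async def main():
    async with websockets.connect("ws://localhost:8000/ws/chat") as ws:
        await ws.send(json.dumps({
            "messages": [{"role": "user", "content": "Hello!"}],
        }))
        while True:
            event = json.loads(await ws.recv())
            if event["type"] == "done":
                break
            print(event["text"], end="", flush=True)

asyncio.run(main())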
Handling interruptions and backpressure
With SSE over an async generator, backpressure largely comes for free: each yield is awaited by the ASGI server, so a slow client naturally throttles token consumption. Cancellation, however, has to be handled explicitly:
import asyncio

async def stream_with_cancellation(
    messages: list[dict],
    cancel_event: asyncio.Event,
) -> AsyncGenerator[str, None]:
    """Streaming with cancellation support."""
    async with anthropic_client.messages.stream(
        model="claude-sonnet-4-5",
        max_tokens=2048,
        messages=messages,
    ) as stream:
        async for text in stream.text_stream:
            if cancel_event.is_set():
                # The client cancelled the request: stop generating.
                # Returning exits the `async with`, which closes the stream.
                yield f"data: {json.dumps({'cancelled': True})}\n\n"
                return
            yield f"data: {json.dumps({'text': text})}\n\n"
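What sets cancel_event is up to you; a natural trigger is the client dropping the HTTP connection. A sketch that wires this up through Starlette's request.is_disconnected(); the endpoint path and polling interval are assumptions:

from fastapi import Request

@app.post("/chat/stream/cancellable")  # hypothetical path, for illustration
async def chat_stream_cancellable(request: Request):
    body = await request.json()
    cancel_event = asyncio.Event()

    async def watch_disconnect():
        # Poll Starlette's disconnect flag; trip the event when the
        # client goes away so the generator stops consuming tokens
        while not cancel_event.is_set():
            if await request.is_disconnected():
                cancel_event.set()
                break
            await asyncio.sleep(0.5)

    watcher = asyncio.create_task(watch_disconnect())  # keep a reference
    return StreamingResponse(
        stream_with_cancellation(body.get("messages", []), cancel_event),
        media_type="text/event-stream",
    )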
Accumulating content and tool use with streaming
# Anthropic lets you stream text AND receive tool_use blocks in one response
async def stream_with_tools(messages: list[dict]):
    full_text = ""
    tool_input_json = ""  # accumulates the partial JSON of a tool call
    async with anthropic_client.messages.stream(
        model="claude-sonnet-4-5",
        max_tokens=2048,
        tools=TOOLS,  # tool definitions, assumed to be declared elsewhere
        messages=messages,
    ) as stream:
        async for event in stream:
            if event.type == "content_block_delta":
                if event.delta.type == "text_delta":
                    full_text += event.delta.text
                    yield f"data: {json.dumps({'text': event.delta.text})}\n\n"
                elif event.delta.type == "input_json_delta":
                    # Accumulate the JSON fragments of the tool_use input
                    tool_input_json += event.delta.partial_json
        # The final message contains the fully parsed tool_use blocks
        final_message = await stream.get_final_message()
        tool_calls = [
            block for block in final_message.content
            if block.type == "tool_use"
        ]
        if tool_calls:
            yield f"data: {json.dumps({'tool_calls': [t.name for t in tool_calls]})}\n\n"
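From here the usual pattern is a tool loop: run each requested tool, append the results, and start a second streaming turn. A sketch of the message plumbing; execute_tool is a hypothetical dispatcher to your own functions:

async def continue_after_tools(messages: list[dict], final_message) -> list[dict]:
    """Build the message list for the next streaming turn (sketch)."""
    tool_results = []
    for block in final_message.content:
        if block.type == "tool_use":
            # execute_tool is hypothetical: dispatch to your own functions
            result = await execute_tool(block.name, block.input)
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": block.id,
                "content": result,
            })
    # Echo the assistant turn back, then supply the tool results
    return messages + [
        {"role": "assistant", "content": final_message.content},
        {"role": "user", "content": tool_results},
    ]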
Timeline
- FastAPI SSE endpoint: 1–2 days
- Frontend React component: 1–2 days
- WebSocket alternative: 2–3 days
- Request cancellation + backpressure: 1–2 days