LLM Streaming Responses Implementation in Application

We design and deploy artificial intelligence systems: from prototype to production-ready solutions. Our team combines expertise in machine learning, data engineering and MLOps to make AI work not in the lab, but in real business.
Showing 1 of 1 services. All 1566 services.
LLM Streaming Responses Implementation in Application
Medium
~2-3 business days
FAQ
AI Development Areas
AI Solution Development Stages
Latest works
  • image_website-b2b-advance_0.png
    B2B ADVANCE company website development
    1212
  • image_web-applications_feedme_466_0.webp
    Development of a web application for FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Website development for BELFINGROUP
    852
  • image_ecommerce_furnoro_435_0.webp
    Development of an online store for the company FURNORO
    1041
  • image_logo-advance_0.png
    B2B Advance company logo design
    561
  • image_crm_enviok_479_0.webp
    Development of a web application for Enviok
    822

Реализация потокового вывода LLM (Streaming Responses) в приложении

Стриминг меняет UX радикально: пользователь видит ответ через 200–500 мс вместо ожидания 3–10 секунд. Технически это Server-Sent Events (SSE) на бэкенде и потоковая обработка на фронтенде. Правильная реализация требует обработки прерываний, backpressure и корректного завершения потока.

Backend: FastAPI + SSE

import asyncio
import json
from collections.abc import AsyncGenerator

from anthropic import AsyncAnthropic
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI

# Application singleton and shared async LLM clients.
# NOTE(review): the SDK clients presumably pick up API keys from the
# environment (ANTHROPIC_API_KEY / OPENAI_API_KEY) — confirm deployment config.
app = FastAPI()
anthropic_client = AsyncAnthropic()
openai_client = AsyncOpenAI()

async def stream_anthropic(messages: list[dict], system: str = "") -> AsyncGenerator[str, None]:
    """Stream a Claude completion as Server-Sent Events.

    Yields one ``data: {json}\\n\\n`` frame per text delta, then a final
    frame carrying ``{'done': True}`` so the client knows the stream
    ended normally.
    """
    stream_cm = anthropic_client.messages.stream(
        model="claude-sonnet-4-5",
        max_tokens=2048,
        system=system,
        messages=messages,
    )
    async with stream_cm as claude_stream:
        async for delta in claude_stream.text_stream:
            # SSE wire format: "data: {json}\n\n"
            yield f"data: {json.dumps({'text': delta})}\n\n"
        # Completion signal for the client.
        yield f"data: {json.dumps({'done': True})}\n\n"

async def stream_openai(messages: list[dict]) -> AsyncGenerator[str, None]:
    """Stream an OpenAI chat completion as Server-Sent Events.

    Yields one ``data: {json}\\n\\n`` frame per content delta, then a
    final frame carrying ``{'done': True}``.
    """
    async with await openai_client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        stream=True,
    ) as stream:
        async for chunk in stream:
            # Some chunks arrive with an empty `choices` list (e.g. trailing
            # usage/metadata chunks) — indexing choices[0] unconditionally
            # raised IndexError there.
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta
            if delta.content:
                yield f"data: {json.dumps({'text': delta.content})}\n\n"
        yield f"data: {json.dumps({'done': True})}\n\n"

@app.post("/chat/stream")
async def chat_stream(request: dict):
    """POST /chat/stream — relay tokens from the chosen provider as SSE.

    Request body: ``{"messages": [...], "provider": "anthropic"|"openai",
    "system": "..."}``. ``system`` is optional and only consumed by the
    Anthropic path; any provider other than "anthropic" falls through to
    OpenAI (same behavior as before).
    """
    messages = request.get("messages", [])
    provider = request.get("provider", "anthropic")

    if provider == "anthropic":
        # stream_anthropic accepts a system prompt, but the endpoint never
        # forwarded it — pass it through (defaults to "" as before).
        generator = stream_anthropic(messages, request.get("system", ""))
    else:
        generator = stream_openai(messages)

    return StreamingResponse(
        generator,
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # disable nginx proxy buffering so tokens flush immediately
        }
    )

Frontend: React с SSE

import { useState, useCallback } from 'react';

/**
 * React hook that streams a chat completion over SSE-framed fetch.
 *
 * Returns { response, isStreaming, sendMessage }: `response` grows as
 * tokens arrive; `isStreaming` is true for the duration of a request.
 */
function useStreamingChat() {
  const [response, setResponse] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);

  const sendMessage = useCallback(async (messages: Message[]) => {
    setIsStreaming(true);
    setResponse('');

    // EventSource only supports GET, so a POST body requires fetch +
    // ReadableStream. (The original also constructed an unused EventSource
    // here, leaking a dangling GET connection — removed.)
    try {
      const res = await fetch('/chat/stream', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ messages }),
      });
      if (!res.ok || !res.body) {
        throw new Error(`stream request failed: ${res.status}`);
      }

      const reader = res.body.getReader();
      const decoder = new TextDecoder();
      let buffer = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // stream:true keeps multi-byte UTF-8 sequences that straddle
        // network-chunk boundaries intact.
        buffer += decoder.decode(value, { stream: true });

        // An SSE frame ends with "\n\n"; a read() chunk may end mid-frame,
        // so keep the trailing partial frame for the next iteration.
        const frames = buffer.split('\n\n');
        buffer = frames.pop() ?? '';

        for (const frame of frames) {
          if (!frame.startsWith('data: ')) continue;
          const data = JSON.parse(frame.slice(6));
          if (data.done) return;
          setResponse(prev => prev + data.text);
        }
      }
    } finally {
      // Reset even when fetch/parse throws — the original left
      // isStreaming stuck at true on error.
      setIsStreaming(false);
    }
  }, []);

  return { response, isStreaming, sendMessage };
}

WebSocket альтернатива

from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    """WebSocket chat endpoint.

    For each JSON request received, streams Claude's reply token-by-token
    as ``{"type": "token", ...}`` messages, then sends ``{"type": "done"}``.
    Loops until the client disconnects.
    """
    await websocket.accept()

    try:
        while True:
            payload = await websocket.receive_json()
            history = payload.get("messages", [])

            stream_cm = anthropic_client.messages.stream(
                model="claude-sonnet-4-5",
                max_tokens=2048,
                messages=history,
            )
            async with stream_cm as claude_stream:
                async for token_text in claude_stream.text_stream:
                    await websocket.send_json({"type": "token", "text": token_text})

            await websocket.send_json({"type": "done"})

    except WebSocketDisconnect:
        # Client went away mid-session — nothing to clean up.
        pass

Обработка прерываний и backpressure

import asyncio

async def stream_with_cancellation(
    messages: list[dict],
    cancel_event: asyncio.Event,
) -> AsyncGenerator[str, None]:
    """Stream a Claude completion as SSE frames with cancellation support.

    Yields ``{'text': ...}`` frames per delta. When *cancel_event* is set,
    yields a single ``{'cancelled': True}`` frame and returns; exiting the
    ``async with`` block closes the underlying stream.
    """
    async with anthropic_client.messages.stream(
        model="claude-sonnet-4-5",
        max_tokens=2048,
        messages=messages,
    ) as stream:
        async for text in stream.text_stream:
            if cancel_event.is_set():
                # Client cancelled. The original called stream.close()
                # without awaiting it — on the async client close() is a
                # coroutine, so that was a no-op (plus a "never awaited"
                # warning). Returning lets the context manager close it.
                yield f"data: {json.dumps({'cancelled': True})}\n\n"
                return
            yield f"data: {json.dumps({'text': text})}\n\n"

Накопление контента и Tool Use со стримингом

# Anthropic lets a single response stream text AND carry tool_use blocks.
async def stream_with_tools(messages: list[dict]):
    """Stream text deltas as SSE frames while collecting tool calls.

    Text deltas are forwarded immediately as ``{'text': ...}`` frames;
    ``partial_json`` deltas (streamed tool-call input) are accumulated.
    After the stream ends, complete tool_use blocks are read from the
    final message and reported in a terminal ``{'done': True}`` frame —
    the original left the partial_json branch as ``pass``, never filled
    ``tool_calls``, and emitted no done frame (unlike its siblings).
    """
    full_text = ""
    tool_calls = []
    partial_tool_json = ""  # raw streamed JSON fragments for tool input

    async with anthropic_client.messages.stream(
        model="claude-sonnet-4-5",
        max_tokens=2048,
        tools=TOOLS,  # NOTE(review): TOOLS must be defined at module level — not visible here
        messages=messages,
    ) as stream:
        async for event in stream:
            if getattr(event, "type", None) != "content_block_delta":
                continue
            if hasattr(event.delta, "text"):
                full_text += event.delta.text
                yield f"data: {json.dumps({'text': event.delta.text})}\n\n"
            elif hasattr(event.delta, "partial_json"):
                partial_tool_json += event.delta.partial_json

        # The SDK assembles complete content blocks in the final message.
        final_message = await stream.get_final_message()
        for block in final_message.content:
            if getattr(block, "type", None) == "tool_use":
                tool_calls.append(
                    {"id": block.id, "name": block.name, "input": block.input}
                )

    yield f"data: {json.dumps({'done': True, 'tool_calls': tool_calls})}\n\n"

Сроки

  • FastAPI SSE endpoint: 1–2 дня
  • Frontend React компонент: 1–2 дня
  • WebSocket альтернатива: 2–3 дня
  • Отмена запросов + backpressure: 1–2 дня