Інтеграція AI-співробітників з IP-телефонією Розгортання AI-агентів у корпоративну телефонну інфраструктуру - завдання на стику VoIP-інженерії та AI-розробки. SIP-стек, кодеки, jitter-буфери, DTMF-сигнали, запис розмов, черги ACD - все це має працювати разом з LLM-агентом, не додаючи затримки, що сприймається вище 800ms. ### Архітектура інтеграції```
IP PBX (Asterisk/FreePBX/3CX)
↓ SIP INVITE
[Media Gateway / SBC]
↓ RTP Audio
[WebSocket Bridge] ←→ [STT Engine (Deepgram/Whisper)]
↓
[AI Agent (LLM + Tools)]
↓
[TTS Engine (ElevenLabs/Azure)]
↓ RTP Audio
[Media Gateway]
↓ SIP
[Caller]
### SIP + WebSocket медіа-містpython
import asyncio
import audioop
import wave
from typing import Optional
import websockets
class SIPAudioBridge: """ Мост между SIP RTP потоком и WebSocket для AI-агента. Принимает PCMU/PCMA аудио от Asterisk, передаёт PCM 16kHz в STT. """
SAMPLE_RATE_RTP = 8000 # Стандарт телефонии
SAMPLE_RATE_AI = 16000 # Deepgram Nova-2 требует 16kHz
CODEC_PCMU = 0 # G.711 μ-law
CODEC_PCMA = 8 # G.711 A-law
def ulaw_to_pcm(self, ulaw_data: bytes) -> bytes:
"""Декодирование G.711 μ-law (PCMU) → PCM"""
return audioop.ulaw2lin(ulaw_data, 2) # 2 = 16-bit
def alaw_to_pcm(self, alaw_data: bytes) -> bytes:
"""Декодирование G.711 A-law (PCMA) → PCM"""
return audioop.alaw2lin(alaw_data, 2)
def resample_8k_to_16k(self, pcm_8k: bytes) -> bytes:
"""Апсемплинг 8kHz → 16kHz для моделей STT"""
resampled, _ = audioop.ratecv(
pcm_8k,
2, # 16-bit
1, # mono
self.SAMPLE_RATE_RTP,
self.SAMPLE_RATE_AI,
None
)
return resampled
async def stream_rtp_to_stt(self, rtp_socket,
stt_websocket,
codec: int = CODEC_PCMU,
chunk_ms: int = 20):
"""
Потоковая передача RTP аудио → STT WebSocket.
chunk_ms: размер чанка в миллисекундах (стандарт 20ms)
"""
rtp_chunk_size = int(self.SAMPLE_RATE_RTP * 2 * chunk_ms / 1000)
while True:
rtp_data, addr = rtp_socket.recvfrom(4096)
# RTP header — первые 12 байт
rtp_payload = rtp_data[12:]
if codec == self.CODEC_PCMU:
pcm = self.ulaw_to_pcm(rtp_payload)
elif codec == self.CODEC_PCMA:
pcm = self.alaw_to_pcm(rtp_payload)
else:
continue
pcm_16k = self.resample_8k_to_16k(pcm)
await stt_websocket.send(pcm_16k)
class AsteriskAMIConnector: """ Интеграция с Asterisk через Asterisk Manager Interface (AMI). AMI позволяет управлять звонками: originate, hangup, transfer, monitor. """
def __init__(self, host: str, port: int,
username: str, password: str):
self.host = host
self.port = port
self.username = username
self.password = password
self.reader: Optional[asyncio.StreamReader] = None
self.writer: Optional[asyncio.StreamWriter] = None
async def connect(self):
self.reader, self.writer = await asyncio.open_connection(
self.host, self.port
)
# Пропускаем приветствие
await self.reader.readline()
# Аутентификация
await self._send_action({
"Action": "Login",
"Username": self.username,
"Secret": self.password,
})
async def _send_action(self, action: dict) -> dict:
"""Отправка AMI action и получение ответа"""
message = "\r\n".join(
f"{k}: {v}" for k, v in action.items()
) + "\r\n\r\n"
self.writer.write(message.encode())
await self.writer.drain()
response = {}
while True:
line = (await self.reader.readline()).decode().strip()
if not line:
break
if ": " in line:
key, value = line.split(": ", 1)
response[key] = value
return response
async def originate_call_to_ai(self, phone_number: str,
ai_context: str,
caller_id: str = "AI Agent") -> dict:
"""
Инициирование звонка с AI-агентом через Asterisk.
Соединяет абонента с extension, обрабатываемым AI-агентом.
"""
return await self._send_action({
"Action": "Originate",
"Channel": f"SIP/trunk/{phone_number}",
"Context": "ai-agent",
"Exten": "s",
"Priority": 1,
"CallerID": caller_id,
"Variable": f"AI_CONTEXT={ai_context}",
"Async": "true",
"Timeout": 30000,
})
async def transfer_to_queue(self, channel: str,
queue_name: str,
agent_context: str) -> dict:
"""Перевод звонка в очередь живых операторов"""
return await self._send_action({
"Action": "Redirect",
"Channel": channel,
"Exten": queue_name,
"Context": "queues",
"Priority": 1,
"ExtraChannel": "",
})
class CallRecordingManager: """Управление записями разговоров AI-агентов"""
def __init__(self, storage_backend: str = "s3"):
self.storage = storage_backend
def generate_dial_plan_snippet(self, ai_extension: str = "7000") -> str:
"""
Фрагмент диалплана Asterisk для маршрутизации на AI-агента.
"""
return f"""
; extensions.conf — AI Agent routing [ai-agent] exten => s,1,NoOp(AI Agent handling call) same => n,Set(CALL_ID=${{UNIQUEID}}) same => n,Set(CALLER_NUM=${{CALLERID(num)}}) same => n,Record(/var/spool/asterisk/recordings/${{CALL_ID}}.wav,0,300) same => n,AGI(ai-agent-bridge.py,${{CALL_ID}},${{CALLER_NUM}}) same => n,GotoIf(${{AGENT_ESCALATE}}?escalate) same => n,Hangup() same => n(escalate),Queue(support-agents,t,,,300)
[inbound] exten => _+7XXXXXXXXXX,1,Goto(ai-agent,s,1) """
def save_call_metadata(self, call_id: str,
transcript: list[dict],
metadata: dict) -> dict:
"""Сохранение транскрипта и метаданных звонка"""
record = {
"call_id": call_id,
"timestamp": metadata.get("start_time"),
"duration_seconds": metadata.get("duration"),
"caller_number": metadata.get("caller"),
"resolution": metadata.get("resolution"),
"escalated": metadata.get("escalated", False),
"transcript": transcript,
"tool_calls_made": metadata.get("tools_used", []),
}
# В production: сохранение в S3 + запись в PostgreSQL
return {"status": "saved", "record_id": f"CALL-{call_id}"}
|-----------|----------------------|------------| | SBC/Media Gateway | Kamailio/FreeSWITCH | Kamailio + Rtpengine | | IP PBX Asterisk 20+/3CX | FreePBX Enterprise | | STT latency | <300ms | Deepgram Nova-2 self-hosted | | TTS latency | < 200ms | Azure TTS / ElevenLabs | | Пропускна спроможність | 100kbps/дзвінок G.711 | 80kbps G.729 з транскодом | | Запис розмов | локально | S3 + WORM політика | Повна інтеграція AI-агента в корпоративну телефонію: 10-16 тижнів з урахуванням тестування під навантаженням та налаштування SLA-моніторингу.







