Індексація чат-історії для RAG

Проектуємо та впроваджуємо системи штучного інтелекту: від прототипу до production-ready рішення. Наша команда поєднує експертизу в машинному навчанні, дата-інжинірингу та MLOps, щоб AI працював не в лабораторії, а в реальному бізнесі.
Показано 1 з 1Усі 1566 послуг
Індексація чат-історії для RAG
Середній
від 1 тижня до 3 місяців
Часті запитання

Напрямки AI-розробки

Етапи розробки AI-рішення

Останні роботи

  • image_website-b2b-advance_0.webp
    Розробка сайту компанії B2B ADVANCE
    1284
  • image_web-applications_feedme_466_0.webp
    Розробка веб-додатків для компанії FEEDME
    1196
  • image_websites_belfingroup_462_0.webp
    Розробка веб-сайту для компанії БЕЛФІНГРУП
    901
  • image_ecommerce_furnoro_435_0.webp
    Розробка інтернет магазину для компанії FURNORO
    1119
  • image_logo-advance_0.webp
    Розробка логотипу компанії B2B Advance
    586
  • image_crm_enviok_479_0.webp
    Розробка веб-додатків для компанії Enviok
    853

Індексація історії чатів для RAG

Індексація історії чатів (Slack, Teams, Telegram, Discord) дозволяє RAG-системам відповідати на запитання, використовуючи накопилене неформальне знання команди — вирішення проблем, обговорення архітектури, відповіді експертів. Основні виклики: неструктурованість чатів, розмовний контекст та конфіденційність.

Інтеграція Slack

from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

class SlackIndexer:
    def __init__(self, token: str):
        self.client = WebClient(token=token)

    def get_messages(self, channel_id: str,
                     oldest: float = None,
                     limit: int = 1000) -> list[dict]:
        messages = []
        cursor = None

        while True:
            params = {
                'channel': channel_id,
                'limit': 200,
                'oldest': oldest
            }
            if cursor:
                params['cursor'] = cursor

            result = self.client.conversations_history(**params)
            messages.extend(result['messages'])

            if not result.get('has_more') or len(messages) >= limit:
                break
            cursor = result['response_metadata']['next_cursor']

        return messages

    def reconstruct_thread(self, channel_id: str,
                           thread_ts: str) -> list[dict]:
        """Завантаження повного потоку"""
        result = self.client.conversations_replies(
            channel=channel_id,
            ts=thread_ts
        )
        return result['messages']

    def messages_to_document(self, messages: list[dict],
                              channel_name: str) -> dict:
        """Конвертація набору повідомлень у індексований документ"""
        # Фільтрування службових повідомлень
        relevant = [
            m for m in messages
            if m.get('type') == 'message'
            and not m.get('subtype')  # Видаляємо channel_join, bot_message тощо
            and len(m.get('text', '')) > 20
        ]

        if not relevant:
            return None

        # Групування в сесії (повідомлення в межах 1 години)
        sessions = self._group_into_sessions(relevant, gap_hours=1)
        documents = []

        for session in sessions:
            text = '\n'.join([
                f"[{self._get_username(m['user'])}]: {m['text']}"
                for m in session
                if m.get('user')
            ])

            # Розв'язання посилань на користувачів та канали
            text = self._resolve_mentions(text)

            documents.append({
                'text': text,
                'channel': channel_name,
                'timestamp_start': session[0]['ts'],
                'timestamp_end': session[-1]['ts'],
                'participants': list(set(m.get('user') for m in session if m.get('user'))),
                'message_count': len(session)
            })

        return documents

Розумна стратегія чанкування для чатів

class ChatChunker:
    def chunk_by_topic(self, messages: list[dict],
                        similarity_threshold: float = 0.6) -> list[list]:
        """Розбиття на тематичні групи, а не за фіксованим розміром"""
        from sentence_transformers import SentenceTransformer
        model = SentenceTransformer('all-MiniLM-L6-v2')

        texts = [m.get('text', '') for m in messages]
        embeddings = model.encode(texts)

        # Розбиття там, де тема різко змінюється
        chunks = [[messages[0]]]
        for i in range(1, len(messages)):
            sim = np.dot(embeddings[i], embeddings[i-1]) / (
                np.linalg.norm(embeddings[i]) * np.linalg.norm(embeddings[i-1])
            )
            if sim < similarity_threshold:
                chunks.append([])
            chunks[-1].append(messages[i])

        return chunks

Анонімізація перед індексацією

class ChatAnonymizer:
    def anonymize(self, text: str, user_mapping: dict) -> str:
        """Заміна імен користувачів на анонімні ID"""
        for real_name, anon_id in user_mapping.items():
            text = text.replace(f"@{real_name}", f"@user_{anon_id}")
            text = text.replace(real_name, f"[User {anon_id}]")
        return text

Для корпоративного Slack індексація повинна: виключати приватні повідомлення (DM), дотримуватися політик збереження (повідомлення старші за N днів видаляються) та надавати можливість виключення конкретних каналів або користувачів за їхнім запитом.