Індексація поштових повідомлень для RAG
Поштова кореспонденція як джерело знань для RAG є нетривіальною задачею: повідомлення містять цитовані частини попередніх повідомлень, підписи, автоматичні сповіщення та спам, які потрібно відфільтрувати. Однак саме в електронній пошті часто міститься унікальне експертне знання, недоступне в офіційній документації.
Підключення до поштових серверів
import imaplib
import email
from email.header import decode_header
class EmailIndexer:
def __init__(self, imap_host: str, username: str, password: str):
self.mail = imaplib.IMAP4_SSL(imap_host)
self.mail.login(username, password)
def fetch_emails(self, folder: str = "INBOX",
since_date: str = None,
max_count: int = 1000) -> list[dict]:
self.mail.select(folder)
# Пошук повідомлень
search_criteria = []
if since_date:
search_criteria.append(f'SINCE {since_date}')
criteria = ' '.join(search_criteria) if search_criteria else 'ALL'
_, message_ids = self.mail.search(None, criteria)
emails = []
ids = message_ids[0].split()[-max_count:] # Останні N повідомлень
for msg_id in ids:
_, msg_data = self.mail.fetch(msg_id, '(RFC822)')
msg = email.message_from_bytes(msg_data[0][1])
parsed = self._parse_email(msg)
if parsed:
emails.append(parsed)
return emails
def _parse_email(self, msg: email.message.Message) -> dict | None:
# Декодування заголовків
subject = self._decode_header(msg.get('Subject', ''))
sender = msg.get('From', '')
date = msg.get('Date', '')
# Вилучення тіла повідомлення
body = self._extract_body(msg)
if not body or len(body.split()) < 20:
return None # Занадто коротке повідомлення
# Очищення від цитат та підписів
clean_body = self._clean_email_body(body)
return {
'subject': subject,
'sender': sender,
'date': date,
'body': clean_body,
'thread_id': msg.get('Message-ID', ''),
'in_reply_to': msg.get('In-Reply-To', ''),
}
def _clean_email_body(self, body: str) -> str:
"""Видалення цитованого тексту, підписів, автовідповідей"""
lines = body.split('\n')
clean_lines = []
for line in lines:
# Пропуск цитованих рядків (починаються з >)
if line.strip().startswith('>'):
continue
# Пропуск стандартних розділювачів цитат
if re.match(r'^On .* wrote:$', line.strip()):
break # Все, що після цього — цитата
if line.strip().startswith('From:') and len(clean_lines) > 10:
break
clean_lines.append(line)
text = '\n'.join(clean_lines).strip()
# Видалення типових підписів
signature_markers = [
'Best regards,', 'Best,', 'Thanks,', 'Regards,',
'С уважением,', 'Спасибо,'
]
for marker in signature_markers:
if marker in text:
idx = text.rfind(marker)
# Якщо маркер наприкінці — це підпис
if len(text) - idx < 200:
text = text[:idx].strip()
break
return text
Фільтрація нерелевантних повідомлень
class EmailRelevanceFilter:
IGNORE_SENDERS = [
'noreply@', 'no-reply@', 'donotreply@',
'newsletter@', 'notifications@', 'alerts@'
]
IGNORE_SUBJECT_PATTERNS = [
r'^(Re: )?Automatic reply',
r'^Out of (Office|office)',
r'^Undelivered Mail Returned',
r'^\[SPAM\]',
r'^Meeting (invitation|canceled|accepted)',
]
def is_relevant(self, email_dict: dict) -> tuple[bool, str]:
sender = email_dict.get('sender', '').lower()
subject = email_dict.get('subject', '')
# Автоматичні повідомлення
for ignore in self.IGNORE_SENDERS:
if ignore in sender:
return False, f"Auto-sender: {ignore}"
# Системні сповіщення
for pattern in self.IGNORE_SUBJECT_PATTERNS:
if re.search(pattern, subject, re.IGNORECASE):
return False, f"System notification: {pattern}"
# Занадто коротке тіло
if len(email_dict.get('body', '').split()) < 30:
return False, "Body too short"
return True, "relevant"
Реконструкція потоків
def reconstruct_threads(emails: list[dict]) -> list[dict]:
"""Групування повідомлень у потоки для кращого контексту"""
threads = {}
for email in emails:
thread_id = email.get('in_reply_to') or email.get('thread_id')
if thread_id not in threads:
threads[thread_id] = []
threads[thread_id].append(email)
# Створення документів-потоків
thread_docs = []
for thread_id, thread_emails in threads.items():
# Сортування за датою
sorted_emails = sorted(thread_emails, key=lambda e: e.get('date', ''))
thread_text = '\n\n---\n\n'.join([
f"**From:** {e['sender']}\n**Date:** {e['date']}\n\n{e['body']}"
for e in sorted_emails
])
thread_docs.append({
'thread_id': thread_id,
'subject': sorted_emails[0]['subject'],
'text': thread_text,
'participants': list(set(e['sender'] for e in sorted_emails)),
'date_range': (sorted_emails[0]['date'], sorted_emails[-1]['date'])
})
return thread_docs
При індексації поштової кореспонденції важливо дотримуватися GDPR та корпоративної політики: індексувати лише робочу кореспонденцію, виключати особисті повідомлення та забезпечувати право користувачів на видалення їхніх даних за запитом.







