Реализация AI-аналитики пользовательского поведения
Классическая веб-аналитика отвечает на вопрос "что происходит". AI-аналитика поведения отвечает на "почему" и "что делать дальше". Разница в подходе: вместо заранее настроенных воронок и дашбордов — автоматическое обнаружение паттернов, аномалий и причинно-следственных связей в потоке событий.
Сбор и обработка событий
import pandas as pd
import numpy as np
from anthropic import Anthropic
from datetime import datetime, timedelta
import json
class UserBehaviorAnalytics:
def __init__(self, events_df: pd.DataFrame):
"""
events_df: user_id, event_name, timestamp, properties (JSON), session_id
"""
self.events = events_df
self.llm = Anthropic()
self._preprocess()
def _preprocess(self):
self.events['timestamp'] = pd.to_datetime(self.events['timestamp'])
self.events = self.events.sort_values(['user_id', 'timestamp'])
# Сессионизация
session_gap = timedelta(minutes=30)
self.events['prev_ts'] = self.events.groupby('user_id')['timestamp'].shift(1)
self.events['is_new_session'] = (
(self.events['timestamp'] - self.events['prev_ts'] > session_gap) |
self.events['prev_ts'].isna()
)
self.events['session_id'] = self.events.groupby('user_id')['is_new_session'].cumsum()
def compute_session_features(self) -> pd.DataFrame:
"""Признаки на уровне сессии"""
agg = self.events.groupby(['user_id', 'session_id']).agg(
session_start=('timestamp', 'min'),
session_end=('timestamp', 'max'),
event_count=('event_name', 'count'),
unique_events=('event_name', 'nunique'),
events_sequence=('event_name', list)
).reset_index()
agg['session_duration_min'] = (
agg['session_end'] - agg['session_start']
).dt.total_seconds() / 60
return agg
Автоматическое обнаружение паттернов
def find_conversion_paths(self, target_event: str, window_days: int = 7) -> dict:
"""Топ путей к конверсионному событию"""
converted_users = self.events[
self.events['event_name'] == target_event
]['user_id'].unique()
paths = []
for user_id in converted_users[:500]: # Ограничение для производительности
user_events = self.events[
self.events['user_id'] == user_id
].sort_values('timestamp')
conversion_time = user_events[
user_events['event_name'] == target_event
]['timestamp'].min()
# События за N дней до конверсии
pre_conversion = user_events[
user_events['timestamp'] <= conversion_time
].tail(10)['event_name'].tolist()
paths.append(' → '.join(pre_conversion))
# Частотный анализ путей
from collections import Counter
path_counts = Counter(paths)
return {
'top_paths': path_counts.most_common(10),
'total_conversions': len(converted_users),
'median_steps': np.median([len(p.split(' → ')) for p in paths])
}
def detect_drop_off_points(self, funnel: list[str]) -> list[dict]:
"""Где теряются пользователи в воронке"""
results = []
users_at_step = None
for i, event in enumerate(funnel):
users_with_event = set(
self.events[self.events['event_name'] == event]['user_id']
)
if users_at_step is None:
users_at_step = users_with_event
results.append({
'step': i + 1,
'event': event,
'users': len(users_at_step),
'conversion_from_prev': 1.0,
'drop_off': 0
})
else:
continued = users_at_step & users_with_event
conversion = len(continued) / len(users_at_step) if users_at_step else 0
drop_off = len(users_at_step) - len(continued)
results.append({
'step': i + 1,
'event': event,
'users': len(continued),
'conversion_from_prev': conversion,
'drop_off': drop_off
})
users_at_step = continued
return results
def detect_behavioral_anomalies(self) -> list[dict]:
"""Обнаружение аномальных паттернов поведения"""
daily_metrics = self.events.groupby(
self.events['timestamp'].dt.date
).agg(
dau=('user_id', 'nunique'),
events_per_user=('event_name', 'count')
)
daily_metrics['events_per_user'] = (
daily_metrics['events_per_user'] / daily_metrics['dau']
)
anomalies = []
# Z-score для обнаружения выбросов
for col in ['dau', 'events_per_user']:
mean = daily_metrics[col].mean()
std = daily_metrics[col].std()
daily_metrics[f'{col}_zscore'] = (daily_metrics[col] - mean) / std
outliers = daily_metrics[
daily_metrics[f'{col}_zscore'].abs() > 2.5
]
for date, row in outliers.iterrows():
anomalies.append({
'date': str(date),
'metric': col,
'value': row[col],
'zscore': row[f'{col}_zscore'],
'direction': 'spike' if row[f'{col}_zscore'] > 0 else 'drop'
})
return sorted(anomalies, key=lambda x: abs(x['zscore']), reverse=True)
AI-интерпретация поведенческих данных
def generate_insights(self, analysis_period_days: int = 30) -> dict:
"""Генерация инсайтов через LLM"""
recent_events = self.events[
self.events['timestamp'] >= datetime.now() - timedelta(days=analysis_period_days)
]
# Топ событий
event_counts = recent_events['event_name'].value_counts().head(15).to_dict()
# Дневная активность
daily_active = recent_events.groupby(
recent_events['timestamp'].dt.date
)['user_id'].nunique()
# Аномалии
anomalies = self.detect_behavioral_anomalies()
# Воронка (если определена)
stats_summary = {
'period_days': analysis_period_days,
'total_users': recent_events['user_id'].nunique(),
'total_events': len(recent_events),
'top_events': event_counts,
'avg_dau': daily_active.mean(),
'dau_trend': 'growing' if daily_active.iloc[-7:].mean() > daily_active.iloc[:7].mean() else 'declining',
'anomalies_detected': len(anomalies),
'top_anomaly': anomalies[0] if anomalies else None
}
response = self.llm.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=600,
messages=[{
"role": "user",
"content": f"""Ты аналитик роста (Growth Analyst). Проанализируй данные о поведении пользователей.
Статистика за {analysis_period_days} дней:
{json.dumps(stats_summary, ensure_ascii=False, indent=2)}
Дай анализ в формате:
1. Ключевые наблюдения (3-4 пункта с числами)
2. Проблемные паттерны (если есть)
3. Рекомендации для роста (2-3 конкретных действия)
Будь конкретным, используй числа из данных."""
}]
)
return {
'insights': response.content[0].text,
'stats': stats_summary,
'anomalies': anomalies[:5]
}
Когортный анализ
def cohort_retention_analysis(self) -> pd.DataFrame:
"""Retention по когортам регистрации"""
# Первое событие = дата регистрации
first_event = self.events.groupby('user_id')['timestamp'].min().reset_index()
first_event.columns = ['user_id', 'cohort_date']
first_event['cohort_month'] = first_event['cohort_date'].dt.to_period('M')
# Объединение с событиями
events_with_cohort = self.events.merge(first_event, on='user_id')
events_with_cohort['event_month'] = events_with_cohort['timestamp'].dt.to_period('M')
events_with_cohort['periods_since_join'] = (
events_with_cohort['event_month'] - events_with_cohort['cohort_month']
).apply(lambda x: x.n)
# Матрица retention
cohort_data = events_with_cohort.groupby(
['cohort_month', 'periods_since_join']
)['user_id'].nunique().reset_index()
cohort_sizes = cohort_data[cohort_data['periods_since_join'] == 0].set_index('cohort_month')['user_id']
retention_matrix = cohort_data.pivot(
index='cohort_month',
columns='periods_since_join',
values='user_id'
).divide(cohort_sizes, axis=0)
return retention_matrix
Система обнаруживает аномалии в течение 1-2 часов после их появления (против 1-2 дней при ручном мониторинге). Автоматическая интерпретация паттернов сокращает время на еженедельный анализ с 4-6 часов до 30-40 минут.







