GNN analysis of social graphs
The social graph is one of the most natural applications of GNNs. Classic tasks include bot and spam detection, link prediction, community detection, and influence analysis. Structural information (who is connected to whom) is often more important than content information for these tasks.
Community Detection and Centrality Analysis
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F

import networkx as nx
# BUGFIX: the original also imported GAEConv, which does not exist in
# torch_geometric.nn (the graph-autoencoder wrapper is called GAE) and was
# never used below — the bogus name raised ImportError at module load.
from torch_geometric.nn import GCNConv
from torch_geometric.utils import to_networkx, negative_sampling
from community import community_louvain  # python-louvain
class SocialGraphAnalyzer:
"""Анализ структуры социального графа"""
def build_graph_from_edges(self, edges: pd.DataFrame,
node_features: pd.DataFrame = None) -> tuple:
"""
edges: source_id, target_id, weight (optional)
node_features: node_id, feature_1, ..., feature_n
"""
# Маппинг строковых ID в числовые индексы
all_nodes = pd.unique(edges[['source_id', 'target_id']].values.ravel())
node_idx = {nid: i for i, nid in enumerate(all_nodes)}
n_nodes = len(node_idx)
src = edges['source_id'].map(node_idx).values
dst = edges['target_id'].map(node_idx).values
# Ненаправленный граф: добавляем обратные рёбра
edge_index = torch.tensor([
np.concatenate([src, dst]),
np.concatenate([dst, src])
], dtype=torch.long)
# Признаки узлов
if node_features is not None:
feat_matrix = node_features.set_index('node_id').reindex(all_nodes).fillna(0).values
x = torch.tensor(feat_matrix, dtype=torch.float)
else:
# Degree как базовый признак
degrees = np.bincount(src, minlength=n_nodes) + np.bincount(dst, minlength=n_nodes)
x = torch.tensor(degrees.reshape(-1, 1), dtype=torch.float)
return edge_index, x, node_idx
def detect_communities_louvain(self, edge_index: torch.Tensor,
n_nodes: int) -> dict:
"""
Алгоритм Лувена для обнаружения сообществ.
Оптимизирует modularity — меру качества разбиения.
"""
# Конвертируем в NetworkX
G = nx.Graph()
G.add_nodes_from(range(n_nodes))
edges = edge_index.T.numpy()
G.add_edges_from(edges)
# Алгоритм Лувена
partition = community_louvain.best_partition(G)
# Modularity quality
modularity = community_louvain.modularity(partition, G)
community_sizes = pd.Series(partition).value_counts().sort_values(ascending=False)
return {
'node_to_community': partition,
'n_communities': len(set(partition.values())),
'modularity': round(modularity, 4),
'largest_community_size': int(community_sizes.iloc[0]),
'community_size_distribution': community_sizes.head(10).to_dict()
}
def compute_node_centrality(self, G: nx.Graph,
top_k: int = 20) -> pd.DataFrame:
"""Метрики центральности узлов"""
# Degree centrality
degree_centrality = nx.degree_centrality(G)
# Betweenness (для небольших графов; для больших — approximation)
if G.number_of_nodes() < 5000:
betweenness = nx.betweenness_centrality(G, normalized=True)
else:
betweenness = nx.betweenness_centrality(G, k=500, normalized=True) # Аппроксимация
# PageRank
pagerank = nx.pagerank(G, alpha=0.85, max_iter=100)
df = pd.DataFrame({
'degree_centrality': degree_centrality,
'betweenness': betweenness,
'pagerank': pagerank,
})
# Нормализованный composite score
df_norm = (df - df.min()) / (df.max() - df.min() + 1e-9)
df['influence_score'] = (
df_norm['degree_centrality'] * 0.30 +
df_norm['betweenness'] * 0.35 +
df_norm['pagerank'] * 0.35
)
return df.nlargest(top_k, 'influence_score')
class BotDetectorGNN(nn.Module):
    """Graph attention network for detecting bots in social networks.

    GAT is preferred over GCN for this task: bot accounts tend to connect in
    anomalous patterns, and attention weights can surface those neighbourhoods.
    """

    def __init__(self, node_features: int, hidden_dim: int = 64):
        super().__init__()
        from torch_geometric.nn import GATConv

        # Three attention layers. The first uses 4 heads whose outputs are
        # concatenated, hence the hidden_dim * 4 input of the second layer.
        self.conv1 = GATConv(node_features, hidden_dim, heads=4, dropout=0.3)
        self.conv2 = GATConv(hidden_dim * 4, hidden_dim, heads=1, dropout=0.3)
        self.conv3 = GATConv(hidden_dim, 32, heads=1, dropout=0.3)

        # Small MLP head producing two logits: human vs bot.
        self.classifier = nn.Sequential(
            nn.Linear(32, 16),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(16, 2)
        )

    def forward(self, x, edge_index):
        """Return per-node class logits of shape (n_nodes, 2)."""
        hidden = F.elu(self.conv1(x, edge_index))
        hidden = F.elu(self.conv2(hidden, edge_index))
        hidden = self.conv3(hidden, edge_index)
        return self.classifier(hidden)

    def get_bot_probability(self, x: torch.Tensor,
                            edge_index: torch.Tensor) -> np.ndarray:
        """Return each node's probability of belonging to the 'bot' class."""
        self.eval()  # disable dropout for deterministic inference
        with torch.no_grad():
            class_probs = torch.softmax(self.forward(x, edge_index), dim=-1)
            bot_probs = class_probs[:, 1]
        return bot_probs.cpu().numpy()
class LinkPredictor(nn.Module):
    """
    Link prediction: predict the appearance of new edges.
    Applications: "People you may know", partner recommendations, fraud rings.
    """

    def __init__(self, node_features: int, hidden_dim: int = 64):
        super().__init__()
        # Two-layer GCN encoder; node embeddings end up with hidden_dim // 2 dims.
        self.encoder = nn.ModuleList([
            GCNConv(node_features, hidden_dim),
            GCNConv(hidden_dim, hidden_dim // 2),
        ])
        # Decoder: score an edge from the element-wise product of the two node
        # embeddings. BUGFIX: the first Linear's input size must match the
        # encoder output (hidden_dim // 2); the original used hidden_dim and
        # crashed with a shape mismatch on any real input.
        self.decoder = nn.Sequential(
            nn.Linear(hidden_dim // 2, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
            nn.Sigmoid()
        )

    def encode(self, x, edge_index):
        """Run the GCN encoder; returns embeddings z of shape (n_nodes, hidden_dim // 2)."""
        for conv in self.encoder:
            x = F.relu(conv(x, edge_index))
        return x

    def decode(self, z, edge_index):
        """Score node pairs from the element-wise product of their embeddings.

        Returns a 1-D tensor of probabilities, one per pair.
        BUGFIX: squeeze only the last (feature) dimension — a bare .squeeze()
        collapsed a single-pair batch to a 0-d tensor, breaking iteration in
        predict_new_links.
        """
        src_emb = z[edge_index[0]]
        dst_emb = z[edge_index[1]]
        return self.decoder(src_emb * dst_emb).squeeze(-1)

    def forward(self, x, edge_index, pos_edge, neg_edge=None):
        """Return scores for the positive (and, if given, negative) edge sets."""
        z = self.encode(x, edge_index)
        pos_scores = self.decode(z, pos_edge)
        if neg_edge is not None:
            neg_scores = self.decode(z, neg_edge)
            return pos_scores, neg_scores
        return pos_scores

    def predict_new_links(self, z: torch.Tensor,
                          candidate_pairs: torch.Tensor,
                          threshold: float = 0.7) -> list:
        """Predict new links among candidate pairs.

        Args:
            z: node embeddings produced by encode().
            candidate_pairs: (2, n_pairs) long tensor of node index pairs.
            threshold: minimum predicted probability to report a link.

        Returns:
            List of {'node_a', 'node_b', 'probability'} dicts sorted by
            descending probability.
        """
        with torch.no_grad():
            scores = self.decode(z, candidate_pairs)
        predicted = [
            {
                'node_a': int(candidate_pairs[0, i]),
                'node_b': int(candidate_pairs[1, i]),
                'probability': round(float(score), 3)
            }
            for i, score in enumerate(scores)
            if float(score) >= threshold
        ]
        return sorted(predicted, key=lambda item: -item['probability'])
Detection of fraudulent rings
class FraudRingDetector:
    """Detect organised fraud by analysing bot-heavy, densely linked subgraphs."""

    def __init__(self, gnn_model: BotDetectorGNN):
        self.model = gnn_model

    def find_suspicious_clusters(self, graph_data,
                                 bot_probs: np.ndarray,
                                 min_cluster_bot_ratio: float = 0.6,
                                 min_cluster_size: int = 5) -> list[dict]:
        """
        Look for densely connected subgraphs with a high share of bots.
        A fraud-ring signature: a tightly interlinked group of accounts.
        Candidate clusters are the connected components of the graph; each is
        kept only if it is large enough and its mean bot probability exceeds
        the threshold. Results are sorted by descending risk_score.
        """
        G = to_networkx(graph_data, to_undirected=True)
        # Attach the per-node bot probabilities as node attributes.
        for node in G.nodes():
            G.nodes[node]['bot_prob'] = float(bot_probs[node])

        clusters = []
        for component in nx.connected_components(G):
            if len(component) < min_cluster_size:
                continue  # too small to be an organised ring
            members = list(component)
            bot_share = np.mean([G.nodes[n]['bot_prob'] for n in members])
            if bot_share < min_cluster_bot_ratio:
                continue  # predominantly human — not suspicious

            # Density metrics of the candidate cluster.
            sub = G.subgraph(component)
            density = nx.density(sub)
            clustering = nx.average_clustering(sub)
            clusters.append({
                'cluster_id': len(clusters),
                'nodes': members,
                'size': len(members),
                'bot_probability': round(float(bot_share), 3),
                'density': round(density, 3),
                'avg_clustering': round(clustering, 3),
                'risk_score': round(bot_share * density * clustering, 3)
            })
        return sorted(clusters, key=lambda c: -c['risk_score'])
GNN for Twitter/Telegram bot detection: AUC 0.90-0.94 (TwiBot-22 dataset). Link prediction: Hits@50 around 0.65-0.75 on OGB-Collab. The key advantage over feature-based methods is that the GNN captures collusive patterns (coordinated inauthentic behavior) through graph structure, which bots cannot easily disguise.







