Development of wash trading detection model
Wash trading — artificial creation of trading volume through simultaneous buying and selling of one asset by one person or group of coordinated parties. According to Chainalysis, in 2023 over 50% of trading volume on several NFT marketplaces was wash trading. On centralized exchanges, the scale is even larger. The detection model should work with on-chain data, identify patterns, and deliver interpretable results.
Types of wash trading in Web3
Understanding these variations determines the choice of model features:
Self-trading: the same wallet buys and sells to itself or through a chain of affiliated addresses.
Circular trading: A sells to B, B sells to C, C sells to A. The asset returns to the original owner.
Layered wash trading: complex chains through 5-10 addresses to hide relationships. Used to pump NFTs before selling to a real buyer at an inflated price.
Airdrop farming: wash trading for accumulating trading volume for future airdrops. This was widespread on Blur in 2023.
Fee rebate abuse: obtaining fee rebates from the exchange through artificial volume.
Graph analysis: the primary tool
The key detection method is building a transaction graph and searching for cycles and clusters:
Building a transaction graph
import networkx as nx
from collections import defaultdict
from dataclasses import dataclass
from typing import List, Dict, Set, Tuple
import pandas as pd
@dataclass
class Transfer:
    """A single on-chain transfer/sale event used as model input."""
    # hash of the transaction containing this transfer
    tx_hash: str
    # sender / previous owner
    from_address: str
    # recipient / new owner
    to_address: str
    token_id: int  # for NFT
    # sale price; units not fixed here — presumably native currency, confirm with data source
    price: float
    # block timestamp; presumably unix seconds (matches unit='s' usage below) — confirm
    timestamp: int
    block_number: int
def build_transaction_graph(transfers: List[Transfer]) -> nx.DiGraph:
    """
    Build directed graph: nodes = addresses, edges = transfers.

    Each edge aggregates every transfer between the same (from, to) pair:
    'volume' sums prices, 'count' counts transfers, 'txs' lists tx hashes.
    """
    graph = nx.DiGraph()
    for transfer in transfers:
        endpoints = (transfer.from_address, transfer.to_address)
        if graph.has_edge(*endpoints):
            # Fold this transfer into the existing aggregate edge.
            edge = graph.edges[endpoints]
            edge['volume'] += transfer.price
            edge['count'] += 1
            edge['txs'].append(transfer.tx_hash)
        else:
            graph.add_edge(
                *endpoints,
                volume=transfer.price,
                count=1,
                txs=[transfer.tx_hash],
            )
    return graph
def detect_cycles(G: nx.DiGraph, max_length: int = 6) -> List[List[str]]:
    """
    Find cycles in the transaction graph — a sign of wash trading.

    Args:
        G: directed transfer graph (nodes = addresses).
        max_length: maximum cycle length to report. On NetworkX >= 3.1
            this also bounds the search itself via ``length_bound``.

    Returns:
        List of cycles, each a list of addresses.
    """
    try:
        # networkx >= 3.1: length_bound prunes the search instead of
        # enumerating every simple cycle (exponential in the worst case)
        # and filtering afterwards — the original code only filtered.
        return [list(cycle) for cycle in nx.simple_cycles(G, length_bound=max_length)]
    except TypeError:
        # Older NetworkX: Johnson's algorithm, O((n+e)(c+1)); filter post hoc.
        return [cycle for cycle in nx.simple_cycles(G) if len(cycle) <= max_length]
Clustering affiliated addresses
Addresses from one cluster (managed by one person) can be identified through:
- Same funding source (received ETH from one address)
- Patterns of activity synchronization over time
- Common gas price strategies
def cluster_addresses(
    addresses: List[str],
    funding_map: Dict[str, str],  # address -> funding source
    time_correlations: Dict[Tuple[str, str], float]
) -> List[Set[str]]:
    """
    Union-Find to group affiliated addresses into clusters.

    Addresses are merged when they share a funding source or when their
    activity-time correlation is at or above CORRELATION_THRESHOLD.
    Entries in funding_map / time_correlations referencing addresses
    outside ``addresses`` are ignored (the original raised KeyError).

    Returns:
        Clusters with more than one member; singletons are dropped.
    """
    parent = {addr: addr for addr in addresses}

    def find(x: str) -> str:
        # Path compression keeps repeated lookups near O(1).
        if parent[x] != x:
            parent[x] = find(parent[x])
        return parent[x]

    def union(x: str, y: str) -> None:
        parent[find(x)] = find(y)

    # Merge addresses with one funding source.
    funding_groups = defaultdict(list)
    for addr, source in funding_map.items():
        if addr in parent:  # skip addresses outside the requested set
            funding_groups[source].append(addr)
    for addrs in funding_groups.values():
        for i in range(1, len(addrs)):
            union(addrs[0], addrs[i])

    # Merge by high activity correlation.
    CORRELATION_THRESHOLD = 0.85
    for (addr1, addr2), corr in time_correlations.items():
        if corr >= CORRELATION_THRESHOLD and addr1 in parent and addr2 in parent:
            union(addr1, addr2)

    # Collect clusters; drop singletons (no affiliation signal).
    clusters = defaultdict(set)
    for addr in addresses:
        clusters[find(addr)].add(addr)
    return [cluster for cluster in clusters.values() if len(cluster) > 1]
Features for ML model
Beyond graph analysis, build feature vectors for each trade pair or address:
Temporal features
def compute_temporal_features(
    trades: pd.DataFrame,
    address: str
) -> Dict[str, float]:
    """
    Temporal activity features for one address.

    Expects ``trades`` to carry columns 'from', 'to', 'timestamp'
    (unix seconds, per the unit='s' conversion below).

    Returns keys: mean_trade_interval, std_trade_interval,
    regularity_score, off_hours_ratio.
    """
    # .copy() so adding the 'hour' column below does not mutate a view of
    # the caller's frame (SettingWithCopyWarning in the original).
    addr_trades = trades[
        (trades['from'] == address) | (trades['to'] == address)
    ].sort_values('timestamp').copy()
    features = {}
    # Average interval between trades (in seconds)
    if len(addr_trades) > 1:
        intervals = addr_trades['timestamp'].diff().dropna()
        features['mean_trade_interval'] = float(intervals.mean())
        std = intervals.std()
        # std() of a single interval (exactly 2 trades) is NaN; treat as 0
        # so regularity_score stays well-defined.
        features['std_trade_interval'] = 0.0 if pd.isna(std) else float(std)
        # Small STD = suspiciously regular intervals
        features['regularity_score'] = 1 / (1 + features['std_trade_interval'])
    else:
        features['mean_trade_interval'] = 0
        features['std_trade_interval'] = 0
        features['regularity_score'] = 0
    # Fraction of trades during off-hours (02:00-06:00 UTC, inclusive)
    addr_trades['hour'] = pd.to_datetime(
        addr_trades['timestamp'], unit='s'
    ).dt.hour
    off_hours = addr_trades[addr_trades['hour'].between(2, 6)]
    features['off_hours_ratio'] = len(off_hours) / max(len(addr_trades), 1)
    return features
Economic features
def compute_economic_features(
    trades: pd.DataFrame,
    address: str
) -> Dict[str, float]:
    """
    Economic features for one address.

    Expects ``trades`` to carry columns 'from', 'to', 'price'.

    Returns keys: net_pnl, total_volume, pnl_to_volume_ratio,
    unique_counterparts, max_counterpart_concentration.
    """
    outgoing = trades[trades['from'] == address]
    incoming = trades[trades['to'] == address]
    sent = outgoing['price'].sum()
    received = incoming['price'].sum()
    features = {}
    # Net P&L: wash trader typically has P&L close to 0 (only gas)
    features['net_pnl'] = received - sent
    features['total_volume'] = sent + received
    # Low value = suspicious (no real profit/loss)
    features['pnl_to_volume_ratio'] = abs(features['net_pnl']) / max(features['total_volume'], 1)
    # Number of unique counterparties
    counterparts = set(outgoing['to'].tolist() + incoming['from'].tolist())
    features['unique_counterparts'] = len(counterparts)
    # Volume concentration with one counterparty.
    # Group ONLY this address's outgoing trades — the original grouped the
    # whole frame, mixing other traders' volume into the concentration.
    if not outgoing.empty:
        volumes_by_counterpart = outgoing.groupby('to')['price'].sum()
        features['max_counterpart_concentration'] = (
            volumes_by_counterpart.max() / max(sent, 1)
        )
    else:
        # No outgoing trades: concentration is undefined; report 0 so the
        # key is always present for downstream feature vectors.
        features['max_counterpart_concentration'] = 0.0
    return features
NFT-specific features
def compute_nft_features(
    trades: pd.DataFrame,
    token_id: int,
    collection: str
) -> Dict[str, float]:
    """
    Token-level features for one NFT.

    Expects ``trades`` columns: 'token_id', 'collection', 'from', 'to',
    'price', 'timestamp'.

    Returns keys: ownership_changes, ownership_revisit_rate, price_growth.
    """
    token_trades = trades[
        (trades['token_id'] == token_id) &
        (trades['collection'] == collection)
    ].sort_values('timestamp')
    features = {}
    # Number of ownership changes
    features['ownership_changes'] = len(token_trades)
    # Return to previous owners. Seed with the very first seller so the
    # classic circular pattern A->B->C->A counts A's re-acquisition as a
    # revisit (the original missed it — A never appeared as a buyer first).
    owners_seen = set()
    if not token_trades.empty:
        owners_seen.add(token_trades.iloc[0]['from'])
    revisits = 0
    for _, row in token_trades.iterrows():
        if row['to'] in owners_seen:
            revisits += 1
        owners_seen.add(row['to'])
    features['ownership_revisit_rate'] = revisits / max(len(token_trades), 1)
    # Price growth vs market benchmark
    if len(token_trades) >= 2:
        first_price = token_trades.iloc[0]['price']
        last_price = token_trades.iloc[-1]['price']
        # Guard against a zero first sale price (free mint / transfer).
        # Anomalous price growth with high volume = suspicious.
        features['price_growth'] = (last_price / first_price - 1) if first_price > 0 else 0.0
    else:
        features['price_growth'] = 0
    return features
Classification model
Collect features and train Gradient Boosting:
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_curve, roc_auc_score
import shap
def train_wash_trading_model(
    features_df: pd.DataFrame,
    labels: pd.Series  # 1 = wash trading, 0 = legitimate
):
    """
    Train a Gradient Boosting classifier on address/trade features.

    Args:
        features_df: feature matrix, one row per address/trade pair.
        labels: binary labels (1 = wash trading, 0 = legitimate).

    Returns:
        (model, scaler, explainer) — the fitted classifier, the fitted
        StandardScaler (apply it to features at inference time), and a
        SHAP TreeExplainer for interpretability.
    """
    # random_state makes the split reproducible across runs (the original
    # split differently on every call).
    X_train, X_test, y_train, y_test = train_test_split(
        features_df, labels, test_size=0.2, stratify=labels, random_state=42
    )
    # NOTE: tree ensembles don't need feature scaling; the scaler is kept
    # because it is part of the returned interface and applied downstream.
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    model = GradientBoostingClassifier(
        n_estimators=200,
        max_depth=5,
        learning_rate=0.05,
        subsample=0.8,
        random_state=42
    )
    model.fit(X_train_scaled, y_train)
    # SHAP for interpretability (per-sample values are computed lazily by
    # the caller; the original precomputed them here and discarded them).
    explainer = shap.TreeExplainer(model)
    # Precision-Recall is more informative than ROC for imbalanced data;
    # ROC-AUC printed here as a single summary number.
    y_proba = model.predict_proba(X_test_scaled)[:, 1]
    auc = roc_auc_score(y_test, y_proba)
    print(f"ROC-AUC: {auc:.3f}")
    return model, scaler, explainer
Confidence assessment and interpretation
The model outputs not a binary result, but a score with explanation:
@dataclass
class WashTradingAssessment:
    """Scored wash-trading verdict for a single address."""
    address: str
    wash_probability: float  # 0.0 - 1.0
    risk_level: str  # LOW / MEDIUM / HIGH / CRITICAL
    contributing_factors: List[str]  # SHAP-based explanation
    flagged_transactions: List[str]  # specific suspicious txs


def assess_address(
    address: str,
    model,
    scaler,
    explainer,
    features: Dict
) -> WashTradingAssessment:
    """
    Score one address and explain the verdict.

    Args:
        address: the address being assessed.
        model: fitted classifier exposing ``predict_proba``.
        scaler: fitted scaler exposing ``transform``.
        explainer: SHAP explainer exposing ``shap_values``.
        features: feature-name -> value mapping for this address.

    Returns:
        WashTradingAssessment with probability, risk bucket, and the
        top-5 SHAP factors (flagged_transactions left empty here).
    """
    frame = pd.DataFrame([features])
    scaled = scaler.transform(frame)
    probability = model.predict_proba(scaled)[0][1]

    # First threshold the score stays strictly under wins the bucket.
    for bound, label in ((0.3, "LOW"), (0.6, "MEDIUM"), (0.85, "HIGH")):
        if probability < bound:
            risk_level = label
            break
    else:
        risk_level = "CRITICAL"

    # SHAP explanation — which features had the most impact
    impacts = explainer.shap_values(scaled)[0]
    ranked = sorted(
        zip(frame.columns, impacts),
        key=lambda pair: abs(pair[1]),
        reverse=True,
    )
    contributing_factors = [
        f"{feat}: {'+' if val > 0 else '-'}{abs(val):.3f}"
        for feat, val in ranked[:5]
    ]

    return WashTradingAssessment(
        address=address,
        wash_probability=probability,
        risk_level=risk_level,
        contributing_factors=contributing_factors,
        flagged_transactions=[],
    )
Data sources
| Source | Data | Updates |
|---|---|---|
| The Graph | On-chain events DEX/NFT | Real-time |
| Dune Analytics | Historical data, SQL access | Minutes |
| Transpose | Transaction graph data | Real-time API |
| Flipside Crypto | On-chain analytics | Daily |
| Native indexer | Custom events | Real-time |
For a production model on a DEX, a custom indexer fed via WebSocket RPC provides the lowest latency and full data control. Dune Analytics is good for development and research but too slow for real-time monitoring.
The model is not an absolute detector. High score — signal for manual analysis, not automatic ban. Especially important for NFT marketplaces where market-makers and automated traders can produce false positives.







