Wash trading detection model development

Development of wash trading detection model

Wash trading is the artificial creation of trading volume through the simultaneous buying and selling of the same asset by a single actor or a group of coordinated parties. According to Chainalysis, in 2023 more than 50% of trading volume on several NFT marketplaces was wash trading; on centralized exchanges the scale is even larger. A detection model should work with on-chain data, identify these patterns, and produce interpretable results.

Types of wash trading in Web3

Understanding these variants determines the choice of model features:

Self-trading: the same wallet buys and sells to itself or through a chain of affiliated addresses.

Circular trading: A sells to B, B sells to C, C sells to A. The asset returns to the original owner.

Layered wash trading: complex chains through 5-10 addresses to hide relationships. Used to pump NFTs before selling to a real buyer at an inflated price.

Airdrop farming: wash trading to accumulate trading volume ahead of anticipated airdrops; this was widespread on Blur in 2023.

Fee rebate abuse: earning fee rebates from an exchange by generating artificial volume.

Graph analysis: the primary tool

The key detection method is building a transaction graph and searching for cycles and clusters:

Building a transaction graph

import networkx as nx
from collections import defaultdict
from dataclasses import dataclass
from typing import List, Dict, Set, Tuple
import pandas as pd

@dataclass
class Transfer:
    tx_hash: str
    from_address: str
    to_address: str
    token_id: int  # for NFT
    price: float
    timestamp: int
    block_number: int

def build_transaction_graph(transfers: List[Transfer]) -> nx.DiGraph:
    """
    Build directed graph: nodes = addresses, edges = transfers.
    Edge weight = trading volume between addresses.
    """
    G = nx.DiGraph()

    for t in transfers:
        if G.has_edge(t.from_address, t.to_address):
            G[t.from_address][t.to_address]['volume'] += t.price
            G[t.from_address][t.to_address]['count'] += 1
            G[t.from_address][t.to_address]['txs'].append(t.tx_hash)
        else:
            G.add_edge(
                t.from_address,
                t.to_address,
                volume=t.price,
                count=1,
                txs=[t.tx_hash]
            )

    return G

def detect_cycles(G: nx.DiGraph, max_length: int = 6) -> List[List[str]]:
    """
    Find cycles in the graph — sign of wash trading.
    max_length limits search depth for performance.
    """
    cycles = []

    # simple_cycles from NetworkX — Johnson's algorithm, O((n+e)(c+1))
    for cycle in nx.simple_cycles(G):
        if len(cycle) <= max_length:
            cycles.append(cycle)

    return cycles
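
The same graph-and-cycle idea can be sketched end to end on synthetic data (all addresses and prices below are made up): three transfers form the circular pattern A → B → C → A, and the cycle search finds exactly that loop while ignoring the unrelated edge.

```python
import networkx as nx

# Hypothetical transfers: the asset goes A -> B -> C and back to A
transfers = [
    ("A", "B", 1.00),
    ("B", "C", 1.05),
    ("C", "A", 1.10),
    ("C", "D", 0.90),  # unrelated outgoing transfer, not part of the cycle
]

G = nx.DiGraph()
for src, dst, price in transfers:
    G.add_edge(src, dst, volume=price)

# Enumerate cycles up to length 6, as in detect_cycles above
cycles = [c for c in nx.simple_cycles(G) if len(c) <= 6]
print(cycles)  # exactly one cycle, passing through A, B and C
```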

Clustering affiliated addresses

Addresses in one cluster (controlled by a single actor) can be identified through:

  • Same funding source (received ETH from one address)
  • Patterns of activity synchronization over time
  • Common gas price strategies
def cluster_addresses(
    addresses: List[str],
    funding_map: Dict[str, str],  # address -> funding source
    time_correlations: Dict[Tuple[str, str], float]
) -> List[Set[str]]:
    """
    Union-Find to group affiliated addresses into clusters.
    """
    parent = {addr: addr for addr in addresses}

    def find(x):
        if parent[x] != x:
            parent[x] = find(parent[x])
        return parent[x]

    def union(x, y):
        parent[find(x)] = find(y)

    # Merge addresses with one funding source
    funding_groups = defaultdict(list)
    for addr, source in funding_map.items():
        funding_groups[source].append(addr)

    for source, addrs in funding_groups.items():
        for i in range(1, len(addrs)):
            union(addrs[0], addrs[i])

    # Merge by high activity correlation
    CORRELATION_THRESHOLD = 0.85
    for (addr1, addr2), corr in time_correlations.items():
        if corr >= CORRELATION_THRESHOLD:
            union(addr1, addr2)

    # Collect clusters
    clusters = defaultdict(set)
    for addr in addresses:
        clusters[find(addr)].add(addr)

    return [cluster for cluster in clusters.values() if len(cluster) > 1]
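
To illustrate the expected input shapes (every address, funder and correlation value here is hypothetical): two wallets share a funding source, a third correlates strongly in time with one of them, so all three land in one cluster. A compact run of the same union-find idea:

```python
from collections import defaultdict

addresses = ["a1", "a2", "a3", "a4"]
funding_map = {"a1": "funder_x", "a2": "funder_x", "a3": "funder_y", "a4": "funder_z"}
time_correlations = {("a2", "a3"): 0.91, ("a3", "a4"): 0.40}

parent = {a: a for a in addresses}

def find(x):
    while parent[x] != x:
        parent[x] = parent[parent[x]]  # path halving
        x = parent[x]
    return x

def union(x, y):
    parent[find(x)] = find(y)

# Same funding source -> same cluster
by_source = defaultdict(list)
for addr, src in funding_map.items():
    by_source[src].append(addr)
for addrs in by_source.values():
    for other in addrs[1:]:
        union(addrs[0], other)

# High temporal correlation -> same cluster
for (x, y), corr in time_correlations.items():
    if corr >= 0.85:
        union(x, y)

clusters = defaultdict(set)
for a in addresses:
    clusters[find(a)].add(a)
result = [c for c in clusters.values() if len(c) > 1]
print(result)  # a1, a2, a3 merge into one cluster; a4 stays alone
```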

Features for ML model

Beyond graph analysis, build feature vectors for each trade pair or address:

Temporal features

def compute_temporal_features(
    trades: pd.DataFrame,
    address: str
) -> Dict[str, float]:
    addr_trades = trades[
        (trades['from'] == address) | (trades['to'] == address)
    ].sort_values('timestamp').copy()  # copy: a column is added below

    features = {}

    # Average interval between trades (in seconds)
    if len(addr_trades) > 1:
        intervals = addr_trades['timestamp'].diff().dropna()
        features['mean_trade_interval'] = intervals.mean()
        features['std_trade_interval'] = intervals.std()
        # Small STD = suspiciously regular intervals
        features['regularity_score'] = 1 / (1 + features['std_trade_interval'])
    else:
        features['mean_trade_interval'] = 0
        features['std_trade_interval'] = 0
        features['regularity_score'] = 0

    # Fraction of trades during off-hours (02:00-06:59 UTC; between() is inclusive)
    addr_trades['hour'] = pd.to_datetime(
        addr_trades['timestamp'], unit='s'
    ).dt.hour
    off_hours = addr_trades[addr_trades['hour'].between(2, 6)]
    features['off_hours_ratio'] = len(off_hours) / max(len(addr_trades), 1)

    return features
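
The regularity score can be sanity-checked in isolation (the timestamps below are invented): bot-driven trades arrive at near-identical intervals, so the interval std is small and the score approaches 1, while organic activity scores near 0.

```python
import pandas as pd

def regularity(timestamps: pd.Series) -> float:
    # 1 / (1 + std of inter-trade intervals), as in compute_temporal_features
    intervals = timestamps.diff().dropna()
    return 1 / (1 + intervals.std())

bot_ts = pd.Series([0, 600, 1200, 1800, 2400])       # a trade every 600 s exactly
organic_ts = pd.Series([0, 340, 4100, 4900, 86000])  # irregular human activity

print(regularity(bot_ts), regularity(organic_ts))
```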

Economic features

def compute_economic_features(
    trades: pd.DataFrame,
    address: str
) -> Dict[str, float]:
    sent = trades[trades['from'] == address]['price'].sum()
    received = trades[trades['to'] == address]['price'].sum()

    features = {}

    # Net P&L: wash trader typically has P&L close to 0 (only gas)
    features['net_pnl'] = received - sent
    features['total_volume'] = sent + received
    features['pnl_to_volume_ratio'] = abs(features['net_pnl']) / max(features['total_volume'], 1)
    # Low value = suspicious (no real profit/loss)

    # Number of unique counterparties
    counterparts = set(trades[trades['from'] == address]['to'].tolist() +
                       trades[trades['to'] == address]['from'].tolist())
    features['unique_counterparts'] = len(counterparts)

    # Volume concentration with one counterparty (this address's outgoing volume only)
    if len(counterparts) > 0:
        outgoing = trades[trades['from'] == address]
        volumes_by_counterpart = outgoing.groupby('to')['price'].sum()
        if not volumes_by_counterpart.empty:
            features['max_counterpart_concentration'] = (
                volumes_by_counterpart.max() / max(sent, 1)
            )

    return features
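
A toy dataset (all wallets and prices hypothetical) shows why the P&L-to-volume ratio is discriminative: a wash pair churning volume back and forth nets out to roughly zero, while a genuine one-sided buyer scores 1.0.

```python
import pandas as pd

# Hypothetical trades: W1/W2 churn volume back and forth; R1 is a real buyer
trades = pd.DataFrame({
    "from":  ["W1", "W2", "W1", "W2", "R1"],
    "to":    ["W2", "W1", "W2", "W1", "W2"],
    "price": [10.0, 10.1, 10.0, 9.9, 25.0],
})

def pnl_to_volume(df: pd.DataFrame, addr: str) -> float:
    # Same ratio as in compute_economic_features
    sent = df[df["from"] == addr]["price"].sum()
    received = df[df["to"] == addr]["price"].sum()
    return abs(received - sent) / max(sent + received, 1)

print(pnl_to_volume(trades, "W1"))  # wash pair: close to 0
print(pnl_to_volume(trades, "R1"))  # one-sided real buyer: 1.0
```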

NFT-specific features

def compute_nft_features(
    trades: pd.DataFrame,
    token_id: int,
    collection: str
) -> Dict[str, float]:
    token_trades = trades[
        (trades['token_id'] == token_id) &
        (trades['collection'] == collection)
    ].sort_values('timestamp')

    features = {}

    # Number of ownership changes
    features['ownership_changes'] = len(token_trades)

    # Return to previous owners
    owners_seen = set()
    revisits = 0
    for _, row in token_trades.iterrows():
        if row['to'] in owners_seen:
            revisits += 1
        owners_seen.add(row['to'])
    features['ownership_revisit_rate'] = revisits / max(len(token_trades), 1)

    # Price growth vs market benchmark
    if len(token_trades) >= 2 and token_trades.iloc[0]['price'] > 0:
        price_growth = (token_trades.iloc[-1]['price'] /
                        token_trades.iloc[0]['price'] - 1)
        features['price_growth'] = price_growth
        # Anomalous price growth with high volume = suspicious
    else:
        features['price_growth'] = 0

    return features
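
The revisit-rate loop is simple enough to trace by hand (the buyer sequence is invented): a token bouncing between the same two wallets produces a high rate, since repeat owners count as revisits.

```python
# Hypothetical buyer sequence for one token: it bounces between A and B
buyers = ["A", "B", "A", "B", "C"]

owners_seen, revisits = set(), 0
for buyer in buyers:
    if buyer in owners_seen:  # this buyer already owned the token before
        revisits += 1
    owners_seen.add(buyer)

revisit_rate = revisits / len(buyers)
print(revisit_rate)  # 0.4: two of five sales went to a previous owner
```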

Classification model

Collect features and train Gradient Boosting:

from sklearn.ensemble import GradientBoostingClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_curve, roc_auc_score
import shap

def train_wash_trading_model(
    features_df: pd.DataFrame,
    labels: pd.Series  # 1 = wash trading, 0 = legitimate
):
    X_train, X_test, y_train, y_test = train_test_split(
        features_df, labels, test_size=0.2, stratify=labels
    )

    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    model = GradientBoostingClassifier(
        n_estimators=200,
        max_depth=5,
        learning_rate=0.05,
        subsample=0.8,
        random_state=42
    )
    model.fit(X_train_scaled, y_train)

    # SHAP for interpretability
    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(X_test_scaled)

    # Precision-Recall more important than ROC for imbalanced data
    y_proba = model.predict_proba(X_test_scaled)[:, 1]
    auc = roc_auc_score(y_test, y_proba)
    print(f"ROC-AUC: {auc:.3f}")

    return model, scaler, explainer
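
As a sanity check that this pipeline learns the intended signal, it can be run on synthetic features; the generating assumptions below are invented (wash addresses get a low P&L ratio, few counterparties and high regularity), and shap is omitted to keep the snippet minimal.

```python
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score

rng = np.random.default_rng(42)
n = 1000
y = rng.integers(0, 2, n)  # 1 = wash trading

# Synthetic feature matrix mirroring the features computed above
X = np.column_stack([
    rng.normal(0.05 + 0.4 * (1 - y), 0.05),   # pnl_to_volume_ratio: low for wash
    rng.poisson(2 + 20 * (1 - y)),            # unique_counterparts: few for wash
    np.clip(rng.normal(0.9 * y, 0.1), 0, 1),  # regularity_score: high for wash
])

X_tr, X_te, y_tr, y_te = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)
model = GradientBoostingClassifier(n_estimators=100, max_depth=3, random_state=42)
model.fit(X_tr, y_tr)
auc = roc_auc_score(y_te, model.predict_proba(X_te)[:, 1])
print(f"ROC-AUC: {auc:.3f}")  # near 1.0 on cleanly separable synthetic data
```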

Confidence assessment and interpretation

The model outputs not a binary result, but a score with explanation:

@dataclass
class WashTradingAssessment:
    address: str
    wash_probability: float    # 0.0 - 1.0
    risk_level: str           # LOW / MEDIUM / HIGH / CRITICAL
    contributing_factors: List[str]  # SHAP-based explanation
    flagged_transactions: List[str]  # specific suspicious txs

def assess_address(
    address: str,
    model,
    scaler,
    explainer,
    features: Dict
) -> WashTradingAssessment:
    X = pd.DataFrame([features])
    X_scaled = scaler.transform(X)

    probability = model.predict_proba(X_scaled)[0][1]

    if probability < 0.3:
        risk_level = "LOW"
    elif probability < 0.6:
        risk_level = "MEDIUM"
    elif probability < 0.85:
        risk_level = "HIGH"
    else:
        risk_level = "CRITICAL"

    # SHAP explanation — which features had the most impact
    shap_vals = explainer.shap_values(X_scaled)[0]
    top_factors = sorted(
        zip(X.columns, shap_vals),
        key=lambda x: abs(x[1]),
        reverse=True
    )[:5]

    contributing_factors = [
        f"{feat}: {'+' if val > 0 else '-'}{abs(val):.3f}"
        for feat, val in top_factors
    ]

    return WashTradingAssessment(
        address=address,
        wash_probability=probability,
        risk_level=risk_level,
        contributing_factors=contributing_factors,
        flagged_transactions=[]
    )

Data sources

Source          | Data                        | Updates
----------------|-----------------------------|--------------
The Graph       | On-chain events (DEX/NFT)   | Real-time
Dune Analytics  | Historical data, SQL access | Minutes
Transpose       | Transaction graph data      | Real-time API
Flipside Crypto | On-chain analytics          | Daily
Native indexer  | Custom events               | Real-time

For a production model on a DEX, a custom indexer over a WebSocket RPC connection provides the lowest latency and full control over the data. Dune Analytics is good for development and research, but too slow for real-time monitoring.

The model is not an absolute detector. A high score is a signal for manual review, not an automatic ban. This is especially important for NFT marketplaces, where market makers and automated traders can produce false positives.