Development of token price manipulation detection model
Price manipulation in DeFi is not an abstract threat. Flash loan attacks exploit a temporary distortion of a token's price to drain lending protocols: borrow a billion dollars, push the oracle price, take loans against collateral valued at a price that does not exist, repay the flash loan, keep the profit. Mango Markets (October 2022) lost $117M through manipulation of the MNGO token price read by the Mango oracle. Cream Finance (October 2021) lost $130M through a flash loan combined with yUSD price manipulation.
The detection model does not prevent an attack by itself, but it makes it possible to: (1) pause the protocol before a malicious transaction executes, (2) study attack patterns to harden oracle protection, and (3) alert the team in real time.
Typology of manipulations
Flash loan oracle manipulation
The classic vector: an AMM pool (Uniswap, Curve) is used as the price oracle. The attacker temporarily moves the AMM price with a large trade, exploits the distorted price in the victim protocol, then trades back so the AMM returns to its normal price.
Attack signature in on-chain data:
- Flash loan transaction (one block, one tx)
- Inside tx: large swap → victim protocol call → reverse swap
- Sharp spot price deviation from TWAP at transaction start
- Return to normal price at transaction end
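The last two points of the signature can be checked numerically once the spot price has been sampled around a transaction. The following is a minimal sketch, not a production detector: the `TxPriceTrace` structure and both thresholds are assumptions introduced here for illustration.

```python
from dataclasses import dataclass


@dataclass
class TxPriceTrace:
    """Spot prices observed around one transaction (hypothetical structure)."""
    price_before: float   # spot price before the transaction executes
    price_extreme: float  # most distorted spot price seen inside the transaction
    price_after: float    # spot price after the transaction completes
    twap: float           # reference TWAP at the time of the transaction


def matches_round_trip_signature(
    trace: TxPriceTrace,
    spike_pct: float = 10.0,    # assumed minimum distortion versus TWAP
    recovery_pct: float = 1.0,  # assumed maximum residual move after the tx
) -> bool:
    """True if the price spiked away from TWAP mid-transaction and then returned."""
    if trace.twap <= 0 or trace.price_before <= 0:
        return False
    spike = abs(trace.price_extreme - trace.twap) / trace.twap * 100
    residual = abs(trace.price_after - trace.price_before) / trace.price_before * 100
    return spike >= spike_pct and residual <= recovery_pct
```

A price that spikes and stays distorted is more likely an ordinary large trade, which is why the recovery condition is part of the check.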
Sandwich attack on oracle update
A lesser-known vector: the attacker knows the condition under which the oracle updates, front-runs the update, and exploits the window between the old and the new price.
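One simple heuristic for this vector is to look for senders that interact with the protocol on both sides of an oracle update inside the same block. The sketch below assumes a hypothetical, already-decoded list of block transactions with `index`, `sender` and `touches_protocol` fields; it is a starting point for investigation, not proof of an attack.

```python
from typing import List


def find_update_sandwiches(block_txs: List[dict], oracle_update_index: int) -> List[str]:
    """Return senders that touch the protocol both before and after the oracle update.

    block_txs: [{'index': int, 'sender': str, 'touches_protocol': bool}, ...]
    oracle_update_index: position of the oracle update transaction in the block.
    """
    before = {
        tx['sender'] for tx in block_txs
        if tx['touches_protocol'] and tx['index'] < oracle_update_index
    }
    after = {
        tx['sender'] for tx in block_txs
        if tx['touches_protocol'] and tx['index'] > oracle_update_index
    }
    # Senders present on both sides acted on the old price and again on the new one
    return sorted(before & after)
```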
Wash trading for price inflation
A series of coordinated transactions between affiliated wallets creates artificial volume and price movement. The goal is to raise the token price before a large sale or to inflate the value of lending collateral.
Signature: highly correlated wallet activity, trade cycles with zero or near-zero net P&L, and unusually low slippage at high volume.
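The "near-zero net P&L" part of the signature can be approximated per wallet as the ratio of net to gross traded notional. This is an illustrative sketch; the `churn_ratio` name and the `(side, notional)` input format are assumptions made here.

```python
from typing import List, Tuple


def churn_ratio(fills: List[Tuple[str, float]]) -> float:
    """Net traded notional divided by gross traded notional for one wallet.

    fills: list of (side, notional) where side is 'buy' or 'sell'.
    Values near 0 mean the wallet bought and sold almost the same amount,
    which at high gross volume matches the wash trading signature.
    Returns 1.0 when there is no activity (no evidence of churn).
    """
    bought = sum(notional for side, notional in fills if side == 'buy')
    sold = sum(notional for side, notional in fills if side == 'sell')
    gross = bought + sold
    if gross == 0:
        return 1.0
    return abs(bought - sold) / gross
```

A low ratio alone is not conclusive, since market makers also end the day close to flat; it is most useful combined with the wallet correlation signal.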
Low-liquidity spot manipulation
For tokens with low liquidity ($10K-$100K in the pool), a relatively small trade ($50K-$200K) can move the price by 50-200%. If a lending protocol accepts such a token as collateral and reads a spot price oracle, the exploit is trivial.
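The size of the move follows directly from the constant-product formula x * y = k. The sketch below assumes a Uniswap V2 style pool and ignores swap fees; the function name and parameters are introduced here for illustration.

```python
def price_impact_pct(reserve_token: float, reserve_quote: float, quote_in: float) -> float:
    """Percentage change in the token's spot price after buying it with `quote_in`.

    Assumes a constant-product pool (x * y = k) and ignores swap fees.
    Spot price of the token is reserve_quote / reserve_token.
    """
    k = reserve_token * reserve_quote
    old_price = reserve_quote / reserve_token
    new_quote = reserve_quote + quote_in
    new_token = k / new_quote            # tokens left in the pool after the buy
    new_price = new_quote / new_token
    return (new_price / old_price - 1) * 100
```

Because the price ratio equals (1 + quote_in / reserve_quote) squared, a $50K buy into a pool holding $50K on each side quadruples the spot price, a move of +300%.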
Statistical detection methods
TWAP deviation detector
The simplest and most effective method: compare the spot price with the TWAP (time-weighted average price).
import numpy as np
from dataclasses import dataclass
from typing import List

@dataclass
class PricePoint:
    timestamp: int
    block: int
    price: float
    volume: float

def compute_twap(prices: List[PricePoint], window_seconds: int) -> float:
    """Compute time-weighted average price over window.
    Expects prices sorted by timestamp, oldest first."""
    if not prices:
        return 0.0
    current_time = prices[-1].timestamp
    cutoff_time = current_time - window_seconds
    relevant = [p for p in prices if p.timestamp >= cutoff_time]
    if len(relevant) < 2:
        return prices[-1].price
    total_weighted = 0.0
    total_time = 0.0
    # Each price is weighted by how long it stayed in effect
    for i in range(1, len(relevant)):
        dt = relevant[i].timestamp - relevant[i-1].timestamp
        total_weighted += relevant[i-1].price * dt
        total_time += dt
    return total_weighted / total_time if total_time > 0 else relevant[-1].price

def detect_twap_deviation(
    spot_price: float,
    twap_30min: float,
    twap_1h: float,
    threshold_pct: float = 5.0
) -> dict:
    """
    Detect significant deviation between spot and TWAP prices.
    Returns risk assessment.
    """
    # Guard against division by zero on missing or invalid TWAP input
    if twap_30min <= 0 or twap_1h <= 0:
        raise ValueError('TWAP values must be positive')
    dev_30min = abs(spot_price - twap_30min) / twap_30min * 100
    dev_1h = abs(spot_price - twap_1h) / twap_1h * 100
    severity = 'normal'
    if dev_30min > threshold_pct * 3 or dev_1h > threshold_pct * 4:
        severity = 'critical'
    elif dev_30min > threshold_pct * 2 or dev_1h > threshold_pct * 2.5:
        severity = 'high'
    elif dev_30min > threshold_pct or dev_1h > threshold_pct * 1.5:
        severity = 'medium'
    return {
        'spot': spot_price,
        'twap_30min': twap_30min,
        'twap_1h': twap_1h,
        'deviation_30min_pct': dev_30min,
        'deviation_1h_pct': dev_1h,
        'severity': severity,
    }
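On-chain oracles in the Uniswap V2 style do not store a list of prices; they keep a running sum of price multiplied by elapsed seconds, and a TWAP is the difference of two such sums divided by the elapsed time. The sketch below reproduces that idea off-chain. The function names and the `(timestamp, price)` input format are assumptions for illustration.

```python
from typing import List, Tuple


def build_cumulative(observations: List[Tuple[int, float]]) -> List[Tuple[int, float]]:
    """Turn (timestamp, price) samples, sorted by time, into (timestamp, cumulative) pairs.

    The cumulative value at each timestamp is the sum of price * seconds
    for all earlier intervals, where each price holds until the next sample.
    """
    cumulative = []
    acc = 0.0
    for i, (ts, _price) in enumerate(observations):
        if i > 0:
            prev_ts, prev_price = observations[i - 1]
            acc += prev_price * (ts - prev_ts)
        cumulative.append((ts, acc))
    return cumulative


def twap_between(start: Tuple[int, float], end: Tuple[int, float]) -> float:
    """TWAP over [start, end] from two cumulative observations."""
    (t0, a0), (t1, a1) = start, end
    if t1 <= t0:
        raise ValueError('end observation must be later than start observation')
    return (a1 - a0) / (t1 - t0)
```

Only two stored observations are needed for any window, which is what makes this form cheap enough to maintain on-chain.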
Volume-price correlation anomaly detector
A normal price movement is accompanied by a proportionate volume. A sharp price movement on anomalously high, one-directional volume is a sign of manipulation.
def detect_volume_price_anomaly(
    price_changes: List[float],  # % price change per period
    volumes: List[float],        # trade volume per period (unsigned)
    lookback: int = 100
) -> dict:
    """
    Detect abnormal volume/price movement ratio
    using Z-score normalization.
    """
    # Need `lookback` historical periods plus the current one
    if len(price_changes) <= lookback or len(volumes) <= lookback:
        return {'anomaly': False, 'reason': 'insufficient data'}
    # Normalize by historical distribution; the baseline excludes the
    # current period so an outlier does not dilute its own score
    hist_prices = np.array(price_changes[-lookback - 1:-1])
    hist_volumes = np.array(volumes[-lookback - 1:-1])
    current_price_change = price_changes[-1]
    current_volume = volumes[-1]
    price_zscore = float((current_price_change - hist_prices.mean()) / (hist_prices.std() + 1e-8))
    volume_zscore = float((current_volume - hist_volumes.mean()) / (hist_volumes.std() + 1e-8))
    # Anomaly: large move in either direction on unusually high volume.
    # Volume is unsigned, so the sign of the price move is not compared with it.
    is_anomaly = (
        abs(price_zscore) > 3.0 and  # price > 3 std devs from norm
        volume_zscore > 2.5          # volume > 2.5 std devs above norm
    )
    return {
        'anomaly': is_anomaly,
        'price_zscore': price_zscore,
        'volume_zscore': volume_zscore,
        'current_price_change_pct': current_price_change,
        'volume_vs_avg': current_volume / (hist_volumes.mean() + 1e-8),
    }
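Mean and standard deviation are themselves distorted by earlier spikes in the lookback window, which can hide a repeat attack. One common alternative, shown here as a sketch rather than as part of the detector above, is a robust z-score based on the median and the median absolute deviation (MAD). The function name is an assumption.

```python
from statistics import median
from typing import Sequence


def robust_zscore(history: Sequence[float], value: float) -> float:
    """Z-score of `value` using median and MAD instead of mean and std.

    The factor 1.4826 scales the MAD so that it is comparable to the
    standard deviation for normally distributed data.
    """
    center = median(history)
    mad = median(abs(x - center) for x in history)
    return (value - center) / (1.4826 * mad + 1e-8)
```

The same thresholds (3.0 for price, 2.5 for volume) can be reused as a first approximation, but they should be recalibrated on backtests.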
Flash loan pattern detector (on-chain)
A flash loan attack has a specific transaction structure, which can be detected through call trace analysis:
# Helpers assumed to be defined elsewhere in the service: flatten_calls,
# is_swap_call, parse_swap_amount, has_target_protocol_interaction,
# and the LARGE_SWAP_THRESHOLD constant.
def analyze_transaction_for_flash_loan(tx_trace: dict) -> dict:
    """
    Analyzes transaction call trace for flash loan attack signs.
    tx_trace: output from debug_traceTransaction or Tenderly API.
    """
    calls = flatten_calls(tx_trace)
    # Search for flash loan pattern: flashLoan → [calls] → flashLoan callback
    flash_loan_providers = {
        '0xba12222222228d8ba445958a75a0704d566bf2c8': 'Balancer',  # Balancer Vault
        '0x7d2768de32b0b80b7a3454c06bdac94a69ddc7a9': 'Aave V2',
        '0x87870bca3f3fd6335c3f4ce8392d69350b4fa4e2': 'Aave V3',
    }
    involved_flash_loans = []
    large_swaps = []
    for call in calls:
        # Contract creation frames may have no 'to' address
        to_address = (call.get('to') or '').lower()
        # Any call to a provider is counted here; matching the selector against
        # the provider's flash loan function would reduce false positives
        if to_address in flash_loan_providers:
            involved_flash_loans.append({
                'provider': flash_loan_providers[to_address],
                'selector': call['input'][:10],
            })
        # Search for large swap events
        if is_swap_call(call) and parse_swap_amount(call) > LARGE_SWAP_THRESHOLD:
            large_swaps.append(call)
    # Flash loan + large swap in one transaction = high risk
    risk_score = 0
    if involved_flash_loans:
        risk_score += 50
    if len(large_swaps) >= 2:  # swap back and forth
        risk_score += 30
    if has_target_protocol_interaction(calls):
        risk_score += 20
    return {
        'risk_score': risk_score,
        'is_high_risk': risk_score >= 80,
        'flash_loans': involved_flash_loans,
        'large_swaps': len(large_swaps),
        'recommendation': 'BLOCK' if risk_score >= 80 else 'MONITOR',
    }
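The analyzer relies on a `flatten_calls` helper that the text does not show. A possible implementation is sketched below, assuming the nested format produced by the `callTracer` of `debug_traceTransaction`, where each frame carries `type`, `from`, `to`, `input`, `value` and an optional `calls` list of child frames. The added `depth` field is an assumption introduced for illustration.

```python
from typing import List


def flatten_calls(tx_trace: dict) -> List[dict]:
    """Flatten a nested call trace into a list in execution (pre-order) order."""
    flat = []
    stack = [(tx_trace, 0)]
    while stack:
        frame, depth = stack.pop()
        flat.append({
            'depth': depth,
            'type': frame.get('type'),
            'from': frame.get('from'),
            'to': frame.get('to') or '',       # contract creation may have no 'to'
            'input': frame.get('input', '0x'),
            'value': frame.get('value', '0x0'),
        })
        # Push children in reverse so they are popped in their original order
        for child in reversed(frame.get('calls') or []):
            stack.append((child, depth + 1))
    return flat
```

An explicit stack is used instead of recursion because EVM call depth can exceed Python's default recursion limit.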
Wash trading detector
import time
from collections import defaultdict

def detect_wash_trading(
    trades: List[dict],  # {buyer, seller, amount, price, timestamp}
    lookback_hours: int = 24
) -> dict:
    """
    Detect wash trading patterns.
    Signs: one address as both buyer and seller,
    or ring of mutual trades.
    """
    cutoff = int(time.time()) - lookback_hours * 3600
    recent_trades = [t for t in trades if t['timestamp'] >= cutoff]
    # Build directed trade graph: trade_graph[seller][buyer] = amount sold.
    # The graph must stay directed, otherwise every pair looks perfectly symmetric.
    trade_graph = defaultdict(lambda: defaultdict(float))
    self_trades = 0
    for trade in recent_trades:
        if trade['buyer'] == trade['seller']:
            self_trades += 1  # same address on both sides of the trade
            continue
        trade_graph[trade['seller']][trade['buyer']] += trade['amount']
    wash_pairs = []
    addresses = list(trade_graph.keys())
    for i, addr_a in enumerate(addresses):
        for addr_b in addresses[i+1:]:
            flow_a_to_b = trade_graph[addr_a].get(addr_b, 0.0)
            flow_b_to_a = trade_graph[addr_b].get(addr_a, 0.0)
            if flow_a_to_b > 0 and flow_b_to_a > 0:
                symmetry_ratio = min(flow_a_to_b, flow_b_to_a) / max(flow_a_to_b, flow_b_to_a)
                # High symmetry = suspicious
                if symmetry_ratio > 0.8:
                    wash_pairs.append({
                        'address_a': addr_a,
                        'address_b': addr_b,
                        'flow_ab': flow_a_to_b,
                        'flow_ba': flow_b_to_a,
                        'symmetry': symmetry_ratio,
                    })
    return {
        'self_trades': self_trades,
        'wash_pairs_found': len(wash_pairs),
        'suspicious_pairs': wash_pairs[:10],
        'is_suspicious': len(wash_pairs) > 0 or self_trades > 0,
    }
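The pairwise check does not catch rings where tokens pass through three wallets and return to the first. A minimal sketch of three-address ring detection on the same trade format follows; the function name is an assumption, and longer rings would need a general cycle search.

```python
from collections import defaultdict
from typing import List, Tuple


def find_trade_rings(trades: List[dict]) -> List[Tuple[str, str, str]]:
    """Find rings A -> B -> C -> A in the seller -> buyer graph.

    trades: [{'seller': str, 'buyer': str, ...}, ...]
    Each ring is reported once, rotated so the smallest address comes first.
    """
    out_edges = defaultdict(set)
    for trade in trades:
        if trade['seller'] != trade['buyer']:
            out_edges[trade['seller']].add(trade['buyer'])
    rings = set()
    for a in list(out_edges):
        for b in out_edges[a]:
            for c in out_edges.get(b, ()):
                # c differs from b (no self edges); require it to close the ring at a
                if c != a and a in out_edges.get(c, ()):
                    ring = (a, b, c)
                    k = ring.index(min(ring))
                    rings.add(ring[k:] + ring[:k])
    return sorted(rings)
```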
On-chain oracle protection
The detection model is complemented by on-chain protection in the oracle itself. Chainlink Price Feeds aggregate prices from multiple off-chain sources and are much harder to move with a single on-chain trade, but custom oracles (Uniswap V3 TWAP, Curve EMA) require manual protection:
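// Sketch: assumes 0.8-compatible builds of the Uniswap v3-core libraries.
pragma solidity ^0.8.0;
import {IUniswapV3Pool} from "@uniswap/v3-core/contracts/interfaces/IUniswapV3Pool.sol";
import {TickMath} from "@uniswap/v3-core/contracts/libraries/TickMath.sol";
import {FullMath} from "@uniswap/v3-core/contracts/libraries/FullMath.sol";

contract ManipulationResistantOracle {
    IUniswapV3Pool public immutable pool;
    uint32 public constant TWAP_PERIOD = 1800; // 30 minutes
    uint256 public constant MAX_DEVIATION_BPS = 500; // 5%

    constructor(IUniswapV3Pool _pool) {
        pool = _pool;
    }

    // Returns the raw price of token0 in token1 as a Q96 fixed-point number
    function getPrice() external view returns (uint256) {
        uint256 spotPrice = _getSpotPrice();
        uint256 twapPrice = _getTWAPPrice(TWAP_PERIOD);
        // Check that spot has not deviated more than MAX_DEVIATION from TWAP
        uint256 deviation = spotPrice > twapPrice
            ? (spotPrice - twapPrice) * 10000 / twapPrice
            : (twapPrice - spotPrice) * 10000 / twapPrice;
        if (deviation > MAX_DEVIATION_BPS) {
            // During manipulation: return TWAP, not spot
            return twapPrice;
        }
        return spotPrice;
    }

    function _getSpotPrice() internal view returns (uint256) {
        (uint160 sqrtPriceX96,,,,,,) = pool.slot0();
        return _toPriceX96(sqrtPriceX96);
    }

    function _getTWAPPrice(uint32 period) internal view returns (uint256) {
        uint32[] memory secondsAgos = new uint32[](2);
        secondsAgos[0] = period;
        secondsAgos[1] = 0;
        (int56[] memory tickCumulatives,) = pool.observe(secondsAgos);
        int56 tickDiff = tickCumulatives[1] - tickCumulatives[0];
        int56 secs = int56(uint56(period));
        int24 avgTick = int24(tickDiff / secs);
        // Round toward negative infinity, as Uniswap's OracleLibrary does
        if (tickDiff < 0 && (tickDiff % secs != 0)) avgTick--;
        return _toPriceX96(TickMath.getSqrtRatioAtTick(avgTick));
    }

    // slot0 and getSqrtRatioAtTick yield sqrt(price) in Q64.96, not the price;
    // square it so spot and TWAP are compared in the same price units
    function _toPriceX96(uint160 sqrtPriceX96) internal pure returns (uint256) {
        return FullMath.mulDiv(sqrtPriceX96, sqrtPriceX96, 1 << 96);
    }
}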
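For backtesting the wrapper off-chain, the same TWAP can be derived from two tick accumulator readings. The sketch below relies on the Uniswap V3 definition price = 1.0001^tick; the function names and the decimals adjustment are assumptions for illustration.

```python
def average_tick(tick_cumulative_start: int, tick_cumulative_end: int, period: int) -> int:
    """Time-weighted average tick over `period` seconds.

    Python's // floors toward negative infinity, which matches the
    round-down applied to negative ticks in the Solidity version.
    """
    if period <= 0:
        raise ValueError('period must be positive')
    return (tick_cumulative_end - tick_cumulative_start) // period


def tick_to_price(tick: int, decimals0: int, decimals1: int) -> float:
    """Price of token0 in token1, adjusted for token decimals."""
    return 1.0001 ** tick * 10 ** (decimals0 - decimals1)
```

Averaging ticks gives a geometric mean price, so a TWAP built this way is not identical to the arithmetic TWAP computed by `compute_twap` over the same window.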
Pipeline and infrastructure
Blockchain nodes (Alchemy/QuickNode)
    ↓ WebSocket streams
Event Processor (Node.js)
    ↓
Detection Models (Python FastAPI)
    ├── TWAP Deviation Checker
    ├── Volume Anomaly Detector
    ├── Flash Loan Analyzer
    └── Wash Trading Detector
    ↓
Risk Aggregator
    ├── Score < 40: log only
    ├── Score 40-70: alert team
    └── Score > 70: auto-pause + alert
    ↓
Actions: Telegram/PagerDuty + Circuit Breaker
Latency is critical: the time from a suspicious pending transaction appearing to the pause being executed must be under 3 seconds.
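The Risk Aggregator step can be sketched as a small function that maps detector scores to one of the three actions in the diagram. How the individual scores are combined is not specified in the text; taking the maximum is an assumption made here, as are the function and action names.

```python
from typing import Dict


def aggregate_risk(detector_scores: Dict[str, float]) -> dict:
    """Map per-detector scores (0-100) to a pipeline action.

    Assumption: the combined score is the maximum of the detector scores,
    so a single confident detector is enough to escalate.
    """
    score = max(detector_scores.values(), default=0.0)
    if score > 70:
        action = 'auto_pause_and_alert'
    elif score >= 40:
        action = 'alert_team'
    else:
        action = 'log_only'
    return {'score': score, 'action': action}
```

A weighted sum is an alternative if detectors are noisy individually, but it lets one strong signal be diluted by several quiet ones.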
Training and testing data
Historical attacks are the best source of ground truth. Publicly documented incidents:
| Date | Protocol | Attack Type | Loss |
|---|---|---|---|
| 2021-10 | Cream Finance | Flash loan + oracle | $130M |
| 2022-04 | Beanstalk | Governance flash loan | $182M |
| 2022-10 | Mango Markets | Spot manipulation | $117M |
| 2023-03 | Euler Finance | Flash loan drain | $197M |
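Each attack is documented on rekt.news and has on-chain data (transaction hashes). The incidents on EVM chains can be reproduced on a Foundry fork: forge test --fork-url $ETH_RPC --fork-block-number [pre-attack block]. Mango Markets ran on Solana, so reproducing it requires Solana tooling instead.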
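Once attack transactions are labeled, a backtest reduces to comparing detector flags against those labels. The sketch below computes precision and recall; the function name and the boolean-list input format are assumptions for illustration.

```python
from typing import Dict, List


def evaluate_detector(is_attack: List[bool], flagged: List[bool]) -> Dict[str, float]:
    """Precision and recall of detector flags against ground-truth labels."""
    if len(is_attack) != len(flagged):
        raise ValueError('labels and flags must have the same length')
    tp = sum(1 for a, f in zip(is_attack, flagged) if a and f)
    fp = sum(1 for a, f in zip(is_attack, flagged) if not a and f)
    fn = sum(1 for a, f in zip(is_attack, flagged) if a and not f)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return {'precision': precision, 'recall': recall, 'tp': tp, 'fp': fp, 'fn': fn}
```

With only a handful of known attacks against millions of benign transactions, precision matters most for the auto-pause threshold, since every false positive halts the protocol.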
Development process
Threat analysis (1 week). A map of possible manipulation vectors for the specific protocol: which oracles are used, how liquid their sources are, and which functions depend on price.
Detection model development (2-3 weeks). A Python ML service with TWAP deviation, volume anomaly, and flash loan pattern detectors, backtested on historical data.
On-chain oracle protection (1 week). A manipulation-resistant oracle wrapper if the current oracle is vulnerable.
Circuit breaker integration (1 week). The detection → auto-pause pipeline.
Audit and testing (1 week). Reproduction of known attacks on a fork to verify detection.
Monitoring infrastructure (1-2 weeks). Event streaming, alerting, dashboards.
Full cycle: 2-2.5 months. Cost is determined after evaluating the scope and the chains to be supported.







