Разработка индикатора кумулятивной дельты
Кумулятивная дельта (CVD — Cumulative Volume Delta) — это накопленная сумма разности между объёмами агрессивных покупок и продаж за выбранный период. В отличие от дельты отдельной свечи, CVD показывает накопленный баланс сил рынка, что делает видимыми скрытые дивергенции с ценой.
Принцип работы CVD
Свеча 1: buy_vol=100, sell_vol=60 → delta=+40, CVD=+40
Свеча 2: buy_vol=80, sell_vol=90 → delta=-10, CVD=+30
Свеча 3: buy_vol=50, sell_vol=120 → delta=-70, CVD=-40
Свеча 4: buy_vol=200, sell_vol=80 → delta=+120, CVD=+80
CVD растёт когда покупатели агрессивнее. При растущей цене и падающем CVD — бычий нарратив не подтверждён объёмом (медвежья дивергенция).
Расчёт CVD
import pandas as pd
from decimal import Decimal
import asyncio
class CVDCalculator:
def __init__(self, symbol: str, reset_period: str = 'session'):
"""
reset_period: 'session' (каждый день), 'week', 'never'
"""
self.symbol = symbol
self.reset_period = reset_period
def calculate_from_trades(self, trades: list[dict]) -> pd.DataFrame:
"""
Рассчитываем CVD из raw aggTrades данных
trades: список {price, quantity, time, is_buyer_maker}
"""
df = pd.DataFrame(trades)
# Определяем сторону сделки
df['buy_vol'] = df.apply(
lambda row: row['quantity'] if not row['is_buyer_maker'] else 0,
axis=1
)
df['sell_vol'] = df.apply(
lambda row: row['quantity'] if row['is_buyer_maker'] else 0,
axis=1
)
df['delta'] = df['buy_vol'] - df['sell_vol']
# Группируем по свечам (например, 1-минутные)
df['time'] = pd.to_datetime(df['time'], unit='ms')
df = df.set_index('time')
candle_delta = df['delta'].resample('1min').sum()
candle_buy = df['buy_vol'].resample('1min').sum()
candle_sell = df['sell_vol'].resample('1min').sum()
result = pd.DataFrame({
'delta': candle_delta,
'buy_vol': candle_buy,
'sell_vol': candle_sell,
})
# CVD с ресетом по сессиям
if self.reset_period == 'session':
result['session'] = result.index.date
result['cvd'] = result.groupby('session')['delta'].cumsum()
else:
result['cvd'] = result['delta'].cumsum()
return result
Обнаружение дивергенций
class CVDDivergenceDetector:
def find_divergences(
self,
price_series: pd.Series,
cvd_series: pd.Series,
lookback: int = 20
) -> pd.DataFrame:
divergences = []
for i in range(lookback, len(price_series)):
window_price = price_series.iloc[i-lookback:i+1]
window_cvd = cvd_series.iloc[i-lookback:i+1]
current_price = window_price.iloc[-1]
current_cvd = window_cvd.iloc[-1]
# Медвежья дивергенция: цена выше, CVD ниже прежнего максимума
prev_price_high = window_price.iloc[:-1].max()
prev_cvd_at_high = window_cvd.iloc[window_price.iloc[:-1].argmax()]
if current_price > prev_price_high and current_cvd < prev_cvd_at_high:
divergences.append({
'time': price_series.index[i],
'type': 'bearish',
'price': current_price,
'cvd': current_cvd,
'strength': (prev_cvd_at_high - current_cvd) / abs(prev_cvd_at_high) * 100
})
# Бычья дивергенция: цена ниже, CVD выше прежнего минимума
prev_price_low = window_price.iloc[:-1].min()
prev_cvd_at_low = window_cvd.iloc[window_price.iloc[:-1].argmin()]
if current_price < prev_price_low and current_cvd > prev_cvd_at_low:
divergences.append({
'time': price_series.index[i],
'type': 'bullish',
'price': current_price,
'cvd': current_cvd,
'strength': (current_cvd - prev_cvd_at_low) / abs(prev_cvd_at_low) * 100
})
return pd.DataFrame(divergences)
Pine Script реализация
//@version=5
indicator("Cumulative Volume Delta", shorttitle="CVD", overlay=false)
reset_on_session = input.bool(true, "Reset daily")
// Приближение delta из OHLCV
candle_up = close >= open
delta_approx = candle_up ?
volume * ((close - open) / (high - low + 0.001)) :
-volume * ((open - close) / (high - low + 0.001))
// CVD с ресетом по сессии
var float cvd = 0.0
new_session = ta.change(time("D")) != 0
if reset_on_session and new_session
cvd := delta_approx
else
cvd := cvd + delta_approx
// Цвет по направлению
cvd_color = cvd >= cvd[1] ? color.new(color.green, 40) : color.new(color.red, 40)
hline(0, color=color.gray, linestyle=hline.style_dotted)
plot(cvd, "CVD", cvd_color, linewidth=2)
// Маркеры дивергенций (упрощённо)
price_up = close > close[20]
cvd_down = cvd < cvd[20]
bearish_div = price_up and cvd_down
plotshape(bearish_div, "Bear Div", shape.circle, location.top,
color.new(color.red, 0), size=size.tiny)
Real-time WebSocket обновление
class CVDWebSocketStreamer:
def __init__(self, symbol: str):
self.symbol = symbol
self.cvd = 0.0
self.session_date = None
async def stream(self):
url = f"wss://stream.binance.com:9443/ws/{self.symbol.lower()}@aggTrade"
async with websockets.connect(url) as ws:
async for message in ws:
trade = json.loads(message)
await self.process_trade(trade)
async def process_trade(self, trade: dict):
import datetime
today = datetime.date.today()
# Сброс CVD в начале сессии
if self.session_date != today:
self.cvd = 0.0
self.session_date = today
qty = float(trade['q'])
is_buyer_maker = trade['m']
delta = -qty if is_buyer_maker else qty
self.cvd += delta
# Публикуем обновление подписчикам
await self.broadcast({
'type': 'cvd_update',
'symbol': self.symbol,
'cvd': self.cvd,
'delta': delta,
'timestamp': trade['T']
})
CVD — более продвинутый инструмент чем обычный bar delta. Накопленная дивергенция видна сразу: когда цена росла 3 дня, а CVD падал всё это время — серьёзный предупреждающий сигнал о слабости тренда.







