Trader Notification System Development
A notification system is the infrastructure for delivering timely messages to traders: order execution, price level achievement, liquidation, new listing. Notification delay means missed opportunity or realized risk. The architecture must guarantee delivery under any load.
Notification Types
Priority Classification
P0 — Critical (immediately):
- Position liquidation
- Margin call (approaching liquidation)
- Order execution
P1 — Important (<5 seconds):
- Price alert triggered
- Partial order fill
- New deposit credited
P2 — Informational (<1 minute):
- New listing (announcement)
- Weekly P&L report
- Referral accrual
Delivery Channels
| Channel | Latency | Reliability | Best for |
|---|---|---|---|
| WebSocket (in-app) | <100ms | High (if online) | P0, real-time |
| Push (FCM/APNs) | 1-5s | Medium | P0, P1 mobile |
| Telegram Bot | 1-3s | High | P0, P1 |
| 1-60s | Very High | P2, reports | |
| SMS | 5-30s | High | P0 critical |
System Architecture
Event Bus → Notification Router
from enum import Enum
from dataclasses import dataclass
class NotificationPriority(Enum):
CRITICAL = 0
HIGH = 1
NORMAL = 2
@dataclass
class NotificationEvent:
user_id: str
event_type: str
priority: NotificationPriority
data: dict
channels: list[str] # ['websocket', 'push', 'telegram']
class NotificationRouter:
async def route(self, event: NotificationEvent):
# Get user preferences
prefs = await self.db.get_notification_prefs(event.user_id)
# Filter channels by preferences and priority
channels = self.select_channels(event, prefs)
tasks = []
for channel in channels:
handler = self.channel_handlers[channel]
tasks.append(handler.send(event))
# Critical notifications — wait for confirmation
if event.priority == NotificationPriority.CRITICAL:
results = await asyncio.gather(*tasks, return_exceptions=True)
await self.log_delivery(event, results)
else:
asyncio.gather(*tasks) # fire and forget for non-critical
WebSocket Delivery
class WebSocketNotificationHandler:
def __init__(self, connection_manager):
self.connections = connection_manager
async def send(self, event: NotificationEvent):
connection = self.connections.get_user_connection(event.user_id)
if not connection:
# User offline — save for later delivery
await self.store_pending(event)
return
try:
await connection.send_json({
'type': 'notification',
'event': event.event_type,
'data': event.data,
'priority': event.priority.value,
'timestamp': datetime.utcnow().isoformat()
})
except ConnectionClosed:
await self.store_pending(event)
async def deliver_pending_on_connect(self, user_id: str, connection):
"""On connect, deliver accumulated notifications"""
pending = await self.db.get_pending_notifications(user_id, limit=50)
for notif in pending:
await connection.send_json(notif.to_dict())
await self.db.mark_delivered(user_id, [n.id for n in pending])
Push Notifications (Firebase FCM)
import firebase_admin
from firebase_admin import messaging
class PushNotificationHandler:
def __init__(self):
firebase_admin.initialize_app()
async def send(self, event: NotificationEvent):
tokens = await self.db.get_fcm_tokens(event.user_id)
if not tokens:
return
message_data = self.format_push(event)
message = messaging.MulticastMessage(
tokens=tokens,
notification=messaging.Notification(
title=message_data['title'],
body=message_data['body']
),
data={k: str(v) for k, v in event.data.items()},
android=messaging.AndroidConfig(
priority='high' if event.priority == NotificationPriority.CRITICAL else 'normal'
),
apns=messaging.APNSConfig(
headers={'apns-priority': '10' if event.priority.value == 0 else '5'}
)
)
response = messaging.send_each_for_multicast(message)
# Clean invalid tokens
for i, result in enumerate(response.responses):
if not result.success and 'registration-token-not-registered' in str(result.exception):
await self.db.remove_fcm_token(tokens[i])
Price Alerts
Price alert — user sets trigger on specific price:
class PriceAlertEngine:
def __init__(self, price_feed, notification_router):
self.price_feed = price_feed
self.router = notification_router
# Alert cache: symbol -> list[Alert] (sorted by price)
self.alert_cache: dict[str, list] = {}
async def check_alerts(self, symbol: str, current_price: float):
alerts = self.alert_cache.get(symbol, [])
triggered = []
for alert in alerts:
if alert.condition == 'above' and current_price >= alert.target_price:
triggered.append(alert)
elif alert.condition == 'below' and current_price <= alert.target_price:
triggered.append(alert)
for alert in triggered:
alerts.remove(alert)
await self.router.route(NotificationEvent(
user_id=alert.user_id,
event_type='price_alert',
priority=NotificationPriority.HIGH,
data={
'symbol': symbol,
'target_price': alert.target_price,
'current_price': current_price,
'condition': alert.condition
},
channels=['websocket', 'push', 'telegram']
))
if alert.is_recurring:
# Recreate alert for re-trigger
await self.add_alert(alert)
User Settings
Detailed notification control improves UX and reduces unsubscribes:
interface NotificationPreferences {
orderFilled: { enabled: boolean; channels: Channel[] };
priceAlert: { enabled: boolean; channels: Channel[] };
liquidationWarning: {
enabled: boolean;
channels: Channel[];
thresholdPercent: number; // warn when margin ratio < X%
};
newListing: { enabled: boolean; minScore: number };
dailyReport: { enabled: boolean; time: string }; // "08:00"
quietHours: { enabled: boolean; from: string; to: string };
}
Quiet hours are important: nobody wants notification at 3 AM about new listing with score 5. P0 notifications (liquidation) always bypass quiet hours.







