AI-based driver fatigue and behavior monitoring system
According to the WHO, 20% of serious road accidents are related to drowsy driving. The Driver Monitoring System (DMS)—an in-car camera aimed at the driver—monitors signs of fatigue, distraction, and phone use in real time.
What are we tracking?
Fatigue manifests itself through several measurable facial parameters:
- PERCLOS (Percentage of Eye Closure): the proportion of time within the last 60 seconds during which the eyes are more than 80% closed. PERCLOS > 15% = warning, > 25% = alarm
- Blink rate: normal 12–20 times/min, fatigue — < 8 or > 30
- Blink duration: normal 150–200 ms, fatigue — > 350 ms
- Head tilt angle: nodding down > 15° = falling asleep
- Gaze direction: looking away from the road for > 3 seconds = distraction
import cv2
import numpy as np
import mediapipe as mp
from collections import deque
import time
class DriverMonitoringSystem:
    """Camera-based driver monitor: eye closure (EAR/PERCLOS), blinks, head pose.

    Each call to :meth:`process_frame` runs MediaPipe Face Mesh on one BGR
    frame, updates the rolling buffers and returns the current metrics
    together with a list of alert codes.

    Recognised ``config`` keys (all optional):
        window_sec (default 60), fps (default 30): size of the PERCLOS window.
        ear_threshold, perclos_warning, perclos_critical, pitch_nod_deg,
        yaw_distraction_deg: override the class-level defaults below.
        alert_callbacks: stored on the instance as ``alert_callbacks``.
    """

    # Default thresholds; ``__init__`` overrides them per instance from config.
    ear_threshold = 0.22        # EAR below this counts as "eye closed"
    perclos_warning = 0.15      # PERCLOS above this -> DROWSINESS_WARNING
    perclos_critical = 0.25     # PERCLOS above this -> DROWSINESS_CRITICAL
    # NOTE(review): the accompanying article quotes a 15 degree nod threshold
    # while the code uses -20; the original code value is kept as the default.
    pitch_nod_deg = -20         # pitch below this -> HEAD_NODDING
    yaw_distraction_deg = 30    # |yaw| above this -> DISTRACTION_YAW
    head_pose_window_sec = 10   # length of the head-pose history

    def __init__(self, config: dict):
        """Create the face-mesh model and the rolling buffers.

        Args:
            config: Settings dict; see the class docstring for the keys read.
        """
        # MediaPipe Face Mesh: 478 landmarks, fast, works well on embedded.
        self.face_mesh = mp.solutions.face_mesh.FaceMesh(
            max_num_faces=1,
            refine_landmarks=True,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
        )

        # Landmark indices (MediaPipe Face Mesh). Eye lists are ordered
        # p1..p6 as expected by ``_eye_aspect_ratio``. The iris lists are not
        # used by the methods of this class.
        self.LEFT_EYE = [362, 385, 387, 263, 373, 380]
        self.RIGHT_EYE = [33, 160, 158, 133, 153, 144]
        self.LEFT_IRIS = [474, 475, 476, 477]
        self.RIGHT_IRIS = [469, 470, 471, 472]

        cls = type(self)
        self.ear_threshold = config.get('ear_threshold', cls.ear_threshold)
        self.perclos_warning = config.get(
            'perclos_warning', cls.perclos_warning)
        self.perclos_critical = config.get(
            'perclos_critical', cls.perclos_critical)
        self.pitch_nod_deg = config.get('pitch_nod_deg', cls.pitch_nod_deg)
        self.yaw_distraction_deg = config.get(
            'yaw_distraction_deg', cls.yaw_distraction_deg)

        # Buffers for temporal analysis. Sizes are whole frame counts so a
        # fractional fps (e.g. 29.97) cannot break ``deque(maxlen=...)``.
        fps = config.get('fps', 30)
        window = self._frames(config.get('window_sec', 60), fps)
        self.ear_buffer = deque(maxlen=window)    # per-frame Eye Aspect Ratio
        self.blink_buffer = deque(maxlen=window)  # durations (s) of blinks
        self.head_pose_buffer = deque(
            maxlen=self._frames(self.head_pose_window_sec, fps))

        # Current blink state.
        self.in_blink = False
        self.blink_start = None

        # NOTE(review): callbacks are stored but nothing in this class invokes
        # them; the expected call signature is not visible here.
        self.alert_callbacks = config.get('alert_callbacks', [])

    @staticmethod
    def _frames(seconds: float, fps: float) -> int:
        """Convert a duration in seconds to a whole number of frames (>= 1)."""
        return max(1, int(round(seconds * fps)))

    def _eye_aspect_ratio(self, landmarks: np.ndarray,
                          eye_indices: list) -> float:
        """Return EAR = (||p2-p6|| + ||p3-p5||) / (2 * ||p1-p4||).

        Args:
            landmarks: Array of landmark coordinates indexed by landmark id.
            eye_indices: Six landmark ids in the order p1..p6.
        """
        pts = landmarks[eye_indices]
        vertical_a = np.linalg.norm(pts[1] - pts[5])
        vertical_b = np.linalg.norm(pts[2] - pts[4])
        horizontal = np.linalg.norm(pts[0] - pts[3])
        # The epsilon keeps the ratio finite if the eye corners coincide.
        return (vertical_a + vertical_b) / (2.0 * horizontal + 1e-6)

    def _estimate_head_pose(self, landmarks: np.ndarray,
                            frame_size: tuple) -> dict:
        """Estimate head pitch/yaw/roll with ``cv2.solvePnP``.

        Args:
            landmarks: Array of 2D landmark pixel coordinates.
            frame_size: ``(height, width)`` of the frame.

        Returns:
            Dict with keys ``'pitch'``, ``'yaw'``, ``'roll'``; all zeros when
            solvePnP reports failure.
        """
        model_points = np.float32([
            [0.0, 0.0, 0.0],           # nose tip
            [0.0, -330.0, -65.0],      # chin
            [-225.0, 170.0, -135.0],   # left eye corner
            [225.0, 170.0, -135.0],    # right eye corner
            [-150.0, -150.0, -125.0],  # left mouth corner
            [150.0, -150.0, -125.0],   # right mouth corner
        ])
        # NOTE(review): which image landmark pairs with the negative-x model
        # point, and the sign/offset convention of the returned pitch, cannot
        # be verified from this file - confirm against real footage.
        key_indices = [1, 152, 263, 33, 287, 57]
        image_points = np.float32([landmarks[i] for i in key_indices])

        h, w = frame_size
        # Approximate pinhole camera: focal length = frame width, principal
        # point at the frame centre, no lens distortion.
        cam_matrix = np.float32([[w, 0, w / 2],
                                 [0, w, h / 2],
                                 [0, 0, 1]])
        dist_coeffs = np.zeros((4, 1))

        success, rvec, tvec = cv2.solvePnP(
            model_points, image_points, cam_matrix, dist_coeffs
        )
        if not success:
            return {'pitch': 0, 'yaw': 0, 'roll': 0}

        rmat, _ = cv2.Rodrigues(rvec)
        angles = cv2.RQDecomp3x3(rmat)[0]
        return {'pitch': angles[0], 'yaw': angles[1], 'roll': angles[2]}

    def _reset_blink_state(self) -> None:
        """Forget an in-progress blink (used when the face is lost)."""
        self.in_blink = False
        self.blink_start = None

    def _update_blink_state(self, ear: float, now: float) -> None:
        """Advance the blink state machine with one EAR sample.

        A blink starts when EAR drops below ``ear_threshold`` and ends on the
        first sample at or above it; its duration is appended to
        ``blink_buffer``.

        Args:
            ear: Eye aspect ratio of the current frame.
            now: Timestamp in seconds from a monotonic clock.
        """
        if ear < self.ear_threshold:
            if not self.in_blink:
                self.in_blink = True
                self.blink_start = now
        elif self.in_blink:
            self.blink_buffer.append(now - self.blink_start)
            self.in_blink = False

    def _perclos(self) -> float:
        """Return the share of buffered frames with EAR below the threshold."""
        closed = sum(1 for e in self.ear_buffer if e < self.ear_threshold)
        return closed / max(len(self.ear_buffer), 1)

    def process_frame(self, frame: np.ndarray) -> dict:
        """Analyse one BGR frame and return metrics plus alert codes.

        Returns:
            ``{'driver_detected': False, 'alerts': []}`` when no face is
            found; otherwise a dict with ``driver_detected``, ``ear``,
            ``perclos``, ``head_pose``, ``recent_blink_durations`` (last five)
            and ``alerts``.
        """
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = self.face_mesh.process(rgb)
        if not results.multi_face_landmarks:
            # Without this reset a blink that was open when the face was lost
            # would later be recorded with the whole gap added to it.
            self._reset_blink_state()
            return {'driver_detected': False, 'alerts': []}

        h, w = frame.shape[:2]
        face = results.multi_face_landmarks[0].landmark
        # Landmarks are scaled by frame width/height into pixel coordinates.
        landmarks = np.array([[p.x * w, p.y * h] for p in face])

        # EAR averaged over both eyes.
        ear_left = self._eye_aspect_ratio(landmarks, self.LEFT_EYE)
        ear_right = self._eye_aspect_ratio(landmarks, self.RIGHT_EYE)
        ear = (ear_left + ear_right) / 2.0
        self.ear_buffer.append(ear)

        # Blink detection. A monotonic clock is used because wall-clock
        # adjustments would corrupt the measured durations.
        self._update_blink_state(ear, time.monotonic())

        # PERCLOS: share of buffered frames with EAR below the threshold.
        perclos = self._perclos()

        # Head pose.
        head_pose = self._estimate_head_pose(landmarks, (h, w))
        self.head_pose_buffer.append(head_pose)

        alerts = self._generate_alerts(perclos, head_pose)
        return {
            'driver_detected': True,
            'ear': ear,
            'perclos': perclos,
            'head_pose': head_pose,
            'recent_blink_durations': list(self.blink_buffer)[-5:],
            'alerts': alerts,
        }

    def _generate_alerts(self, perclos: float,
                         head_pose: dict) -> list[str]:
        """Map the current PERCLOS and head pose to a list of alert codes."""
        alerts = []
        if perclos > self.perclos_critical:
            alerts.append('DROWSINESS_CRITICAL')
        elif perclos > self.perclos_warning:
            alerts.append('DROWSINESS_WARNING')
        if head_pose['pitch'] < self.pitch_nod_deg:
            alerts.append('HEAD_NODDING')
        if abs(head_pose['yaw']) > self.yaw_distraction_deg:
            alerts.append('DISTRACTION_YAW')
        return alerts
Phone usage detection
Phone use is detected by a separate model: YOLOv8n fine-tuned on the Driver Phone Use Dataset:
class PhoneUseDetector:
    """Detect sustained phone use with a YOLO model plus a temporal vote.

    A single positive frame is not reported; the alarm is raised only when
    the phone is seen in at least ``min_hits`` of the last ``window`` frames.

    NOTE(review): ``YOLO`` must be importable in the enclosing module
    (presumably ``from ultralytics import YOLO``); that import is not shown
    in this file.
    """

    # Class names treated as "phone"; resolved to integer IDs at start-up.
    PHONE_CLASS_NAMES = ('phone', 'cell phone')

    def __init__(self, model_path: str, window: int = 15,
                 min_hits: int = 10, conf: float = 0.6):
        """Load the model and prepare the voting buffer.

        Args:
            model_path: Path to the YOLO weights.
            window: Number of most recent frames voted over (15 = 0.5 s at
                30 fps).
            min_hits: Positive frames within the window needed for an alarm.
            conf: Detection confidence threshold passed to the model.

        Raises:
            ValueError: If the model defines none of ``PHONE_CLASS_NAMES``.
        """
        self.model = YOLO(model_path)
        # The ``classes`` filter takes integer class IDs, not names, so the
        # names are translated once here.
        self.phone_class_ids = self._resolve_class_ids(
            self.model.names, self.PHONE_CLASS_NAMES)
        if not self.phone_class_ids:
            raise ValueError(
                f"model {model_path!r} has no class named any of "
                f"{self.PHONE_CLASS_NAMES}")
        self.conf = conf
        self.min_hits = min_hits
        self.detection_buffer = deque(maxlen=window)

    @staticmethod
    def _resolve_class_ids(names, wanted) -> list:
        """Return the IDs whose class name is in ``wanted``.

        Args:
            names: Mapping ``{id: name}`` or a sequence of names indexed by id.
            wanted: Iterable of class names to look for.
        """
        pairs = names.items() if isinstance(names, dict) else enumerate(names)
        wanted = set(wanted)
        return [int(idx) for idx, name in pairs if name in wanted]

    def _vote(self, detected: bool) -> bool:
        """Record one per-frame result and return the smoothed decision."""
        self.detection_buffer.append(detected)
        return sum(self.detection_buffer) >= self.min_hits

    def detect(self, frame: np.ndarray) -> bool:
        """Run the model on ``frame``; return True while phone use persists."""
        dets = self.model(frame, conf=self.conf,
                          classes=self.phone_class_ids)
        # Alarm if a phone was found in min_hits+ of the last `window` frames.
        return self._vote(len(dets[0].boxes) > 0)
Performance on embedded
On the Qualcomm SA8295P (ADAS SoC): MediaPipe FaceMesh — 8 ms, YOLOv8n phone — 12 ms. Total per-frame processing time is under 25 ms, which fits within the ≈33 ms frame budget and sustains real-time operation at 30 fps without dropped frames.
On Raspberry Pi 4 (4GB RAM): MediaPipe + OpenCV — 35ms at 720p. Acceptable for fleet monitoring of commercial vehicles.
Case: Bus depot, 80 vehicles
Driver Safety Monitor (DSM) was installed on 80 city buses. In three months:
- 1240 DROWSINESS_WARNING events were recorded, 87 of which were CRITICAL
- After implementing the system and training drivers: a 64% reduction in critical events
- 340 cases of phone use while driving were recorded and reported to HR
| Scope | Timeline |
|---|---|
| Prototype (EAR + PERCLOS) | 3–4 weeks |
| Full DMS (fatigue + phone + gaze) | 6–10 weeks |
| Fleet solution with cloud analytics | 10–16 weeks |







