AI-система матчингу водіїв та пасажирів у райдшерингу
Матчинг у Uber/Lyft — це не просто пошук «найближчого водія». Алгоритм одночасно мінімізує час очікування пасажира, максимізує завантаженість водія та запобігає «порожнім» переїздам між поїздками. При правильному матчингу ETA можна скоротити на 30-40%.
Алгоритм оптимального матчингу
import numpy as np
from scipy.optimize import linear_sum_assignment
from dataclasses import dataclass
from typing import Optional
import heapq
@dataclass
class Driver:
id: str
lat: float
lon: float
current_passengers: int
max_passengers: int
rating: float
acceptance_rate: float
vehicle_type: str # economy, comfort, xl
@dataclass
class RideRequest:
id: str
pickup_lat: float
pickup_lon: float
dropoff_lat: float
dropoff_lon: float
passenger_count: int
vehicle_preference: str
max_wait_seconds: int
surge_accepted: bool
class RideshareMatchingEngine:
"""Матчинг водія та пасажира з урахуванням кількох критеріїв"""
EARTH_RADIUS_KM = 6371.0
def haversine_distance(self, lat1: float, lon1: float,
lat2: float, lon2: float) -> float:
"""Відстань у кілометрах"""
dlat = np.radians(lat2 - lat1)
dlon = np.radians(lon2 - lon1)
a = (np.sin(dlat/2)**2 +
np.cos(np.radians(lat1)) * np.cos(np.radians(lat2)) * np.sin(dlon/2)**2)
return 2 * self.EARTH_RADIUS_KM * np.arcsin(np.sqrt(a))
def estimated_pickup_time(self, driver: Driver, request: RideRequest) -> float:
"""ETA у хвилинах (спрощено через дистанцію, у продакшені — OSRM/Google Maps)"""
dist_km = self.haversine_distance(
driver.lat, driver.lon,
request.pickup_lat, request.pickup_lon
)
# Середня швидкість з урахуванням міського трафіку: 20-25 км/год
return dist_km / 22 * 60
def compute_match_score(self, driver: Driver,
request: RideRequest) -> float:
"""
Складова оцінка матчингу. Мінімізуємо ETA + максимізуємо
використання + враховуємо переваги та якість водія.
"""
eta_min = self.estimated_pickup_time(driver, request)
# Жорсткі обмеження
if driver.vehicle_type != request.vehicle_preference and request.vehicle_preference != 'any':
if not (request.vehicle_preference == 'economy' and driver.vehicle_type == 'comfort'):
return -1.0 # Недопустиме збігання
if driver.current_passengers + request.passenger_count > driver.max_passengers:
return -1.0 # Немає місць
if eta_min > request.max_wait_seconds / 60:
return -1.0 # Занадто довго чекати
# Нормалізація компонентів (менша ETA = вища оцінка)
eta_score = max(0, 1.0 - eta_min / 10) # 0 хв = 1.0, 10+ хв = 0
# Якість водія
quality_score = (driver.rating - 4.0) / 1.0 * 0.5 + driver.acceptance_rate * 0.5
# Коефіцієнт об'їзду для спільних поїздок (якщо водій вже везе пасажирів)
if driver.current_passengers > 0:
detour_factor = 0.7 # Спільна поїздка менш привабливо для пасажира
else:
detour_factor = 1.0
return eta_score * 0.55 + quality_score * 0.25 + detour_factor * 0.20
def batch_match(self, drivers: list[Driver],
requests: list[RideRequest]) -> dict:
"""
Оптимальний пакетний матчинг через угорський алгоритм.
Запускається кожні 30 секунд для накопичених запитів.
"""
n_drivers = len(drivers)
n_requests = len(requests)
if n_drivers == 0 or n_requests == 0:
return {'matches': [], 'unmatched_requests': [r.id for r in requests]}
# Матриця вартості (угорський алгоритм мінімізує, тому інвертуємо оцінку)
cost_matrix = np.full((n_drivers, n_requests), 1000.0)
for i, driver in enumerate(drivers):
for j, request in enumerate(requests):
score = self.compute_match_score(driver, request)
if score >= 0:
cost_matrix[i, j] = 1.0 - score # Інверсія для мінімізації
# Угорський алгоритм O(n³)
driver_indices, request_indices = linear_sum_assignment(cost_matrix)
matches = []
matched_request_ids = set()
for d_idx, r_idx in zip(driver_indices, request_indices):
if cost_matrix[d_idx, r_idx] < 900.0: # Не фіктивне завдання
matches.append({
'driver_id': drivers[d_idx].id,
'request_id': requests[r_idx].id,
'eta_min': round(self.estimated_pickup_time(drivers[d_idx], requests[r_idx]), 1),
'score': round(1.0 - cost_matrix[d_idx, r_idx], 3)
})
matched_request_ids.add(requests[r_idx].id)
unmatched = [r.id for r in requests if r.id not in matched_request_ids]
return {
'matches': matches,
'unmatched_requests': unmatched,
'match_rate': len(matches) / max(len(requests), 1)
}
class DriverPositioningAdvisor:
"""Рекомендації водієві, куди переїхати для наступного замовлення"""
def suggest_repositioning(self, driver: Driver,
demand_heatmap: dict,
nearby_drivers: list[Driver],
radius_km: float = 3.0) -> dict:
"""
demand_heatmap: {(lat, lon): expected_requests_next_30min}
Шукаємо зону з високим попитом та малою конкуренцією серед водіїв.
"""
best_zone = None
best_score = -1.0
for (zone_lat, zone_lon), expected_demand in demand_heatmap.items():
dist_to_zone = self.haversine_distance(
driver.lat, driver.lon, zone_lat, zone_lon
)
if dist_to_zone > radius_km:
continue
# Скільки водіїв уже в цій зоні
competing_drivers = sum(
1 for d in nearby_drivers
if self.haversine_distance(d.lat, d.lon, zone_lat, zone_lon) < 1.0
)
# Попит на одного водія = demand / (drivers + 1)
demand_per_driver = expected_demand / (competing_drivers + 1)
# Штраф за дистанцію переміщення
relocation_cost = dist_to_zone / radius_km * 0.3
score = demand_per_driver - relocation_cost
if score > best_score:
best_score = score
best_zone = (zone_lat, zone_lon, dist_to_zone, expected_demand)
if best_zone:
return {
'suggest': True,
'target_lat': best_zone[0],
'target_lon': best_zone[1],
'distance_km': round(best_zone[2], 1),
'expected_wait_min': round(best_zone[2] / 22 * 60, 0), # Час доїзду
'expected_demand': best_zone[3]
}
return {'suggest': False, 'reason': 'Already in optimal zone'}
def haversine_distance(self, lat1, lon1, lat2, lon2) -> float:
dlat = np.radians(lat2 - lat1)
dlon = np.radians(lon2 - lon1)
a = np.sin(dlat/2)**2 + np.cos(np.radians(lat1)) * np.cos(np.radians(lat2)) * np.sin(dlon/2)**2
return 2 * 6371.0 * np.arcsin(np.sqrt(a))
Пакетний матчинг кожні 30 секунд (порівняно з жадібним онлайн-матчингом) зменшує середню ETA на 15-20%. Рекомендації позиціювання для водіїв збільшують їхній заробіток за годину на 10-15% і покращують покриття районів з високим попитом.







