AI-Powered Driver-Passenger Matching System in Ridesharing
Matching in Uber/Lyft is not just about finding the "nearest driver." The algorithm simultaneously minimizes passenger wait time, maximizes driver utilization, and prevents empty repositioning trips between rides. With proper matching, ETA can be reduced by 30-40%.
Optimal Matching Algorithm
import numpy as np
from scipy.optimize import linear_sum_assignment
from dataclasses import dataclass
from typing import Optional
import heapq
@dataclass
class Driver:
id: str
lat: float
lon: float
current_passengers: int
max_passengers: int
rating: float
acceptance_rate: float
vehicle_type: str # economy, comfort, xl
@dataclass
class RideRequest:
id: str
pickup_lat: float
pickup_lon: float
dropoff_lat: float
dropoff_lon: float
passenger_count: int
vehicle_preference: str
max_wait_seconds: int
surge_accepted: bool
class RideshareMatchingEngine:
"""Driver-passenger matching considering multiple criteria"""
EARTH_RADIUS_KM = 6371.0
def haversine_distance(self, lat1: float, lon1: float,
lat2: float, lon2: float) -> float:
"""Distance in kilometers"""
dlat = np.radians(lat2 - lat1)
dlon = np.radians(lon2 - lon1)
a = (np.sin(dlat/2)**2 +
np.cos(np.radians(lat1)) * np.cos(np.radians(lat2)) * np.sin(dlon/2)**2)
return 2 * self.EARTH_RADIUS_KM * np.arcsin(np.sqrt(a))
def estimated_pickup_time(self, driver: Driver, request: RideRequest) -> float:
"""ETA in minutes (simplified via distance, in production use OSRM/Google Maps)"""
dist_km = self.haversine_distance(
driver.lat, driver.lon,
request.pickup_lat, request.pickup_lon
)
# Average speed considering urban traffic: 20-25 km/h
return dist_km / 22 * 60
def compute_match_score(self, driver: Driver,
request: RideRequest) -> float:
"""
Composite matching score. Minimize ETA + maximize utilization +
consider preferences and driver quality.
"""
eta_min = self.estimated_pickup_time(driver, request)
# Hard constraints
if driver.vehicle_type != request.vehicle_preference and request.vehicle_preference != 'any':
if not (request.vehicle_preference == 'economy' and driver.vehicle_type == 'comfort'):
return -1.0 # Invalid match
if driver.current_passengers + request.passenger_count > driver.max_passengers:
return -1.0 # No available seats
if eta_min > request.max_wait_seconds / 60:
return -1.0 # Wait time too long
# Normalization of components (lower ETA = higher score)
eta_score = max(0, 1.0 - eta_min / 10) # 0 min = 1.0, 10+ min = 0
# Driver quality
quality_score = (driver.rating - 4.0) / 1.0 * 0.5 + driver.acceptance_rate * 0.5
# Detour coefficient for pooled rides (if driver already has passengers)
if driver.current_passengers > 0:
detour_factor = 0.7 # Pooled ride is less attractive for passenger
else:
detour_factor = 1.0
return eta_score * 0.55 + quality_score * 0.25 + detour_factor * 0.20
def batch_match(self, drivers: list[Driver],
requests: list[RideRequest]) -> dict:
"""
Optimal batch matching using Hungarian algorithm.
Runs every 30 seconds for accumulated requests.
"""
n_drivers = len(drivers)
n_requests = len(requests)
if n_drivers == 0 or n_requests == 0:
return {'matches': [], 'unmatched_requests': [r.id for r in requests]}
# Cost matrix (Hungarian algorithm minimizes, so we invert the score)
cost_matrix = np.full((n_drivers, n_requests), 1000.0)
for i, driver in enumerate(drivers):
for j, request in enumerate(requests):
score = self.compute_match_score(driver, request)
if score >= 0:
cost_matrix[i, j] = 1.0 - score # Inversion for minimization
# Hungarian algorithm O(n³)
driver_indices, request_indices = linear_sum_assignment(cost_matrix)
matches = []
matched_request_ids = set()
for d_idx, r_idx in zip(driver_indices, request_indices):
if cost_matrix[d_idx, r_idx] < 900.0: # Not a dummy assignment
matches.append({
'driver_id': drivers[d_idx].id,
'request_id': requests[r_idx].id,
'eta_min': round(self.estimated_pickup_time(drivers[d_idx], requests[r_idx]), 1),
'score': round(1.0 - cost_matrix[d_idx, r_idx], 3)
})
matched_request_ids.add(requests[r_idx].id)
unmatched = [r.id for r in requests if r.id not in matched_request_ids]
return {
'matches': matches,
'unmatched_requests': unmatched,
'match_rate': len(matches) / max(len(requests), 1)
}
class DriverPositioningAdvisor:
"""Recommendations to drivers on where to relocate for the next order"""
def suggest_repositioning(self, driver: Driver,
demand_heatmap: dict,
nearby_drivers: list[Driver],
radius_km: float = 3.0) -> dict:
"""
demand_heatmap: {(lat, lon): expected_requests_next_30min}
Find a zone with high demand and low driver competition.
"""
best_zone = None
best_score = -1.0
for (zone_lat, zone_lon), expected_demand in demand_heatmap.items():
dist_to_zone = self.haversine_distance(
driver.lat, driver.lon, zone_lat, zone_lon
)
if dist_to_zone > radius_km:
continue
# How many drivers are already in this zone
competing_drivers = sum(
1 for d in nearby_drivers
if self.haversine_distance(d.lat, d.lon, zone_lat, zone_lon) < 1.0
)
# Demand per driver = demand / (drivers + 1)
demand_per_driver = expected_demand / (competing_drivers + 1)
# Penalty for relocation distance
relocation_cost = dist_to_zone / radius_km * 0.3
score = demand_per_driver - relocation_cost
if score > best_score:
best_score = score
best_zone = (zone_lat, zone_lon, dist_to_zone, expected_demand)
if best_zone:
return {
'suggest': True,
'target_lat': best_zone[0],
'target_lon': best_zone[1],
'distance_km': round(best_zone[2], 1),
'expected_wait_min': round(best_zone[2] / 22 * 60, 0), # Time to reach
'expected_demand': best_zone[3]
}
return {'suggest': False, 'reason': 'Already in optimal zone'}
def haversine_distance(self, lat1, lon1, lat2, lon2) -> float:
dlat = np.radians(lat2 - lat1)
dlon = np.radians(lon2 - lon1)
a = np.sin(dlat/2)**2 + np.cos(np.radians(lat1)) * np.cos(np.radians(lat2)) * np.sin(dlon/2)**2
return 2 * 6371.0 * np.arcsin(np.sqrt(a))
Batch matching every 30 seconds (versus greedy online matching) reduces average ETA by 15-20%. Positioning recommendations for drivers increase their earnings per hour by 10-15% and improve coverage in high-demand areas.







