Integrating ROS (Robot Operating System) with AI models
ROS 2 is a standard for developing robotic systems. Integrating AI models with ROS enables robots to perceive their environment (computer vision, LiDAR processing), make decisions (reinforcement learning policies), and execute actions in real time with minimal latency.
ROS 2 + AI Architecture
In ROS 2, an AI model is typically implemented as a Node that subscribes to sensor topics (camera, LiDAR) and publishes results (detected objects, control commands):
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from vision_msgs.msg import Detection2DArray
from cv_bridge import CvBridge
import torch
import numpy as np
class ObjectDetectionNode(Node):
    """ROS 2 node that runs a YOLO detector on incoming camera frames.

    Subscribes to ``/camera/color/image_raw`` (sensor_msgs/Image) and
    publishes detections on ``/detections`` (vision_msgs/Detection2DArray).
    """

    def __init__(self):
        super().__init__('object_detection')
        # Load the detection model once at startup.
        # FIX: 'ultralytics/yolov8' is not a valid torch.hub repository
        # (YOLOv8 is distributed via the `ultralytics` pip package, not
        # torch.hub); the hub-supported entry point is ultralytics/yolov5.
        self.model = torch.hub.load(
            'ultralytics/yolov5', 'yolov5n',
            pretrained=True
        )
        self.model.eval()
        if torch.cuda.is_available():
            self.model.cuda()
        self.bridge = CvBridge()
        # Camera subscription
        self.subscription = self.create_subscription(
            Image,
            '/camera/color/image_raw',
            self.image_callback,
            10  # QoS history depth
        )
        # Detection result publisher
        self.detection_pub = self.create_publisher(
            Detection2DArray,
            '/detections',
            10
        )
        self.get_logger().info('Object detection node started')

    def image_callback(self, msg: Image) -> None:
        """Run inference on one camera frame and publish the detections."""
        # ROS Image -> numpy array (RGB)
        cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='rgb8')
        # Inference without autograd bookkeeping
        with torch.no_grad():
            results = self.model(cv_image)
        # Reuse the incoming header so detections carry the frame's
        # timestamp and frame_id.
        # NOTE(review): _to_detection_array is not defined in this snippet —
        # it must be implemented to map model output to Detection2DArray.
        detections = self._to_detection_array(results, msg.header)
        self.detection_pub.publish(detections)
def main(args=None):
    """Entry point: initialize rclpy and spin the detection node.

    Args:
        args: Optional command-line arguments forwarded to ``rclpy.init``
            (kept defaulted to ``None`` for backward compatibility).
    """
    rclpy.init(args=args)
    node = ObjectDetectionNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop a spinning node — not an error.
        pass
    finally:
        # Release publishers/subscriptions and shut down even if spin raised.
        node.destroy_node()
        rclpy.shutdown()
LiDAR + PointNet for 3D perception
from sensor_msgs.msg import PointCloud2
import sensor_msgs_py.point_cloud2 as pc2
class LidarPerceptionNode(Node):
    """ROS 2 node that classifies LiDAR point clouds with a PointNet model."""

    # PointNet expects a fixed-size point set per inference call.
    NUM_POINTS = 1024

    def __init__(self):
        super().__init__('lidar_perception')
        self.pointnet_model = load_pointnet_model('pointnet_weights.pth')
        # Consistent with the detection node: only use the GPU when present
        # (the original unconditional .cuda() crashes on CPU-only machines).
        self.use_cuda = torch.cuda.is_available()
        self.sub = self.create_subscription(
            PointCloud2, '/velodyne_points',
            self.lidar_callback, 10
        )

    def lidar_callback(self, msg: PointCloud2) -> None:
        """Subsample the incoming cloud and run PointNet classification."""
        # PointCloud2 -> numpy (N, 3)
        points = np.array(list(pc2.read_points(msg, field_names=("x", "y", "z"))))
        if len(points) == 0:
            return  # empty scan — nothing to classify
        # Subsample to a fixed point count. Sample with replacement when the
        # scan is smaller than NUM_POINTS: replace=False would raise
        # ValueError for any cloud with fewer than 1024 points.
        replace = len(points) < self.NUM_POINTS
        indices = np.random.choice(len(points), self.NUM_POINTS, replace=replace)
        sampled = points[indices]
        tensor = torch.FloatTensor(sampled).unsqueeze(0)
        if self.use_cuda:
            # assumes load_pointnet_model placed the model on the GPU when
            # available — TODO confirm against its implementation
            tensor = tensor.cuda()
        with torch.no_grad():
            classes = self.pointnet_model(tensor)
        # TODO: publish the classified objects (publisher not shown here)
Latency optimization for real-time operation
For robotic systems, latency is critical: the perception loop must operate at 10-30 Hz:
- TensorRT optimization: PyTorch → ONNX → TensorRT conversion provides 2-5x speedup on NVIDIA Jetson
- Parallel processing: running inference in a separate thread without blocking the ROS callback executor
- CUDA streams: parallel processing of multiple frames
# TensorRT for NVIDIA Jetson Orin
import tensorrt as trt
# YOLOv8 → ONNX → TensorRT: ~8 ms on Jetson Orin vs ~45 ms with plain PyTorch
Navigation with RL policy
from geometry_msgs.msg import Twist
from nav_msgs.msg import Odometry
class RLNavigationNode(Node):
    """ROS 2 node that maps odometry to velocity commands via a trained RL policy.

    Each incoming ``/odom`` message triggers one policy step whose action is
    published as a ``Twist`` on ``/cmd_vel``.
    """

    def __init__(self):
        super().__init__('rl_navigation')
        # Pre-trained Stable-Baselines3 policy used as the controller.
        self.policy = load_stable_baselines3_model('navigation_policy.zip')
        self.cmd_vel_pub = self.create_publisher(Twist, '/cmd_vel', 10)
        self.odom_sub = self.create_subscription(Odometry, '/odom', self.step, 10)

    def step(self, odom_msg):
        """One control step: observe, query the policy, command velocities."""
        observation = self._odom_to_state(odom_msg)
        # Deterministic inference: the same observation always yields the
        # same action (no exploration noise at deployment time).
        action, _ = self.policy.predict(observation, deterministic=True)
        command = Twist()
        command.linear.x = float(action[0])
        command.angular.z = float(action[1])
        self.cmd_vel_pub.publish(command)
Integration takes 2-4 weeks depending on the complexity of the perception pipeline and real-time processing requirements.







