AI Automated Food Sorting System

We design and deploy artificial intelligence systems: from prototype to production-ready solutions. Our team combines expertise in machine learning, data engineering and MLOps to make AI work not in the lab, but in real business.
Showing 1 of 1 servicesAll 1566 services
AI Automated Food Sorting System
Medium
~2-4 weeks
FAQ
AI Development Areas
AI Solution Development Stages
Latest works
  • image_website-b2b-advance_0.png
    B2B ADVANCE company website development
    1212
  • image_web-applications_feedme_466_0.webp
    Development of a web application for FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Website development for BELFINGROUP
    852
  • image_ecommerce_furnoro_435_0.webp
    Development of an online store for the company FURNORO
    1041
  • image_logo-advance_0.png
    B2B Advance company logo design
    561
  • image_crm_enviok_479_0.webp
    Development of a web application for Enviok
    822

Автоматизация сортировки продуктов питания с помощью AI

Сортировочная линия с AI — это детекция + классификация + управление физическим актуатором (пневматический сдув, дефлектор, рука-манипулятор) в реальном времени. Latency решения должна укладываться в 20–50ms, иначе продукт уедет мимо сортировочной точки.

Система real-time сортировки

import cv2
import numpy as np
from ultralytics import YOLO
import time
from queue import Queue
import threading

class FoodSortingSystem:
    def __init__(self, config: dict):
        self.detector = YOLO(config['model_path'])
        self.belt_speed = config['belt_speed_ms']  # м/с
        self.camera_to_actuator_dist = config['cam_to_actuator_m']  # метры

        # Задержка до срабатывания актуатора
        self.actuator_delay = self.camera_to_actuator_dist / self.belt_speed
        self.actuator_queue = Queue()

        # Запускаем поток управления актуатором
        self.actuator_thread = threading.Thread(
            target=self._actuator_worker, daemon=True
        )
        self.actuator_thread.start()

        self.grade_to_lane = config['grade_to_lane']
        # {'Premium': 1, 'Standard': 2, 'Juice': 3, 'Reject': 0}

    def process_frame(self, frame: np.ndarray,
                       frame_timestamp: float) -> list:
        start = time.perf_counter()
        results = self.detector(frame, conf=0.45)
        inference_ms = (time.perf_counter() - start) * 1000

        sorting_commands = []

        for box in results[0].boxes:
            cls = self.detector.model.names[int(box.cls)]
            bbox = list(map(int, box.xyxy[0]))
            conf = float(box.conf)

            # Позиция объекта на ленте (центр bbox по X)
            belt_pos = (bbox[0] + bbox[2]) / 2 / frame.shape[1]
            # Текущее расстояние до актуатора
            dist_to_actuator = (1 - belt_pos) * self.belt_speed * \
                                self.camera_to_actuator_dist

            # Время срабатывания
            trigger_time = frame_timestamp + dist_to_actuator / self.belt_speed

            grade = self._classify_grade(cls, conf)
            lane = self.grade_to_lane.get(grade, 0)

            if lane > 0:  # не Reject (Reject = ничего не делаем = на дефолтную линию)
                cmd = {
                    'trigger_time': trigger_time,
                    'lane': lane,
                    'grade': grade,
                    'class': cls,
                    'confidence': conf,
                    'inference_ms': inference_ms
                }
                self.actuator_queue.put(cmd)
                sorting_commands.append(cmd)

        return sorting_commands

    def _actuator_worker(self):
        """Поток, который отправляет команды актуатору в нужное время"""
        while True:
            cmd = self.actuator_queue.get()
            now = time.time()
            wait = cmd['trigger_time'] - now

            if wait > 0:
                time.sleep(wait)

            self._trigger_actuator(cmd['lane'])
            self.actuator_queue.task_done()

    def _trigger_actuator(self, lane: int):
        """Интерфейс с PLC/контроллером актуатора (Modbus/OPC-UA)"""
        # Конкретная реализация зависит от оборудования
        # self.plc.write_coil(lane, True)
        # time.sleep(0.1)
        # self.plc.write_coil(lane, False)
        pass

    def _classify_grade(self, defect_class: str,
                          confidence: float) -> str:
        critical = ['mold', 'rot', 'foreign_object']
        moderate = ['bruise', 'crack', 'large_scar']

        if defect_class in critical:
            return 'Reject'
        if defect_class in moderate and confidence > 0.6:
            return 'Juice'
        if defect_class in moderate:
            return 'Standard'
        return 'Premium'

Мультиспектральная сортировка

RGB-камеры не видят внутренние дефекты. Для орехов (плесень внутри) и некоторых фруктов используют NIR (near-infrared) или гиперспектральные камеры:

class NearIRSorter:
    """
    NIR (750–1100nm): проникает под кожуру, показывает внутренние дефекты.
    Гиперспектральная камера (400–1000nm): 100+ спектральных каналов.
    """
    def __init__(self, nir_model_path: str):
        # Модель обучена на NIR-изображениях
        self.model = torch.load(nir_model_path)

    def detect_internal_defect(self, nir_image: np.ndarray) -> dict:
        """
        Для орехов: афлатоксин (плесень) флуоресцирует в NIR.
        Для картофеля: внутренние потемнения видны в NIR.
        """
        tensor = self._preprocess(nir_image)
        with torch.no_grad():
            output = self.model(tensor)

        return {
            'has_internal_defect': bool(output.argmax() == 1),
            'defect_probability': float(torch.softmax(output, -1)[0][1])
        }

Performance сортировочных систем

Параметр Значение
Performance До 800 объектов/мин на поток
Latency (camera → actuator command) 15–40ms
Точность классификации 93–98% (зависит от продукта)
Минимальный размер объекта ~15мм @ 1m от камеры
Integration с PLC Modbus TCP/RTU, OPC-UA, Profinet

Калибровка и обслуживание

  • Еженедельная калибровка: прогон эталонных образцов каждого класса, проверка точности
  • Дрейф модели: загрязнение линзы, изменение освещения меняют распределение входных данных. Мониторинг через distribution shift detection
  • Онлайн дообучение: сложные случаи собираются автоматически, раз в месяц — дообучение модели на новых данных
Тип проекта Срок
Сортировщик одного продукта (2–3 категории) 4–7 недель
Мультипродуктовая линия + PLC интеграция 8–14 недель
Высокоскоростная линия (> 500 ед/мин) 10–18 недель