自动驾驶算法开发
01-红外循迹
02-红外循迹+超声波避障
03-红外循迹+超声波避障+交通标志识别(USB)
04-车道线检测(USB)
05-车道线检测+交通标志识别(USB)
06-命令
07-连接指南
08-竞赛文档
本文档使用 MrDoc 发布
-
+
首页
05-车道线检测+交通标志识别(USB)
""" 自动驾驶车道线检测与交通标识识别系统 功能: 1. 使用LaneNet模型进行车道线检测和车道保持 2. 使用YOLOv5模型识别交通标识(减速、转弯、红绿灯等) 3. PID控制器实现车辆转向控制 4. 语音提示系统 """ # 导入系统和第三方库 import time import os import sys import json import cv2 as cv import numpy as np from cap import VideoCapture # 摄像头捕获模块 from tpu_factory import TPUFactory # TPU加速模块 from bot.utils import Voice # 语音模块 from bot.ugv import Motor,PID # 电机控制和PID控制器 from tool import has_digit_input, global_exception_handler # 工具函数 sys.excepthook = global_exception_handler # 设置全局异常处理钩子 # =============================== 全局配置参数 =============================== class Config: # 车速控制相关参数 SPEED_NORMAL = 3500 # 正常行驶时的车速设定 SPEED_DECELERATE = 2500 # 减速行驶时的车速设定 TURN_MAX = 1500 # 最大转向速度差,用于转弯控制 STRAIGHT_MAX = 1000 # 直行时允许的最大转向速度差 # PID控制器参数(比例、积分、微分系数) PID_P = 10.0 PID_I = 0.0 PID_D = 0.0 # 车道中心点设定值,根据不同环岛行驶方向设置不同的目标点 SET_POINTS = { "Clockwise_outer_ring": 140, # 顺时针外环 "Counterclockwise_outer_ring": 560, # 逆时针外环 "Counterclockwise_inner_ring": 520, # 逆时针内环 "Clockwise_inner_ring": 180 # 顺时针内环 } # YOLO目标检测参数 IOU_THRESH = 0.6 # 交并比阈值,用于非极大值抑制 CONF_THRESH = 0.9 # 置信度阈值,过滤低置信度检测结果 IMG_SIZE = [416, 416] # 模型输入图像尺寸 # 交通标识类别定义 CLASSES = ["decelerate", "turn", "whistle", "stop", "red", "yellow", "green"] # 摄像头参数 ROI_HEIGHT = 300 # 车道线检测感兴趣区域的高度,减少处理区域 # =============================== 车道线跟踪模块 =============================== class LaneFollower: def __init__(self, pid_p, pid_i, pid_d): self.pid = PID(pid_p, pid_i, pid_d) # 初始化PID控制器 self.flag_turn = 0 # 转弯标志位(0:直行,1:右转,2:左转) self.straight_count = 0 # 直行状态计数器 self.current_direction = "Clockwise_outer_ring" # 当前行驶方向 self.current_speed = Config.SPEED_NORMAL # 当前车速初始化为正常速度 def follow_lane(self, image, tpu_lanenet): """ 车道线跟踪主函数,实现完整的车道保持控制流程 :param image: 输入图像帧 :param tpu_lanenet: LaneNet模型实例 :return: 电机控制指令(左轮速度, 右轮速度) """ # Step 1: 使用LaneNet模型预测车道线 output_img, lanes_points = tpu_lanenet.predict(image) # Step 2: 从模型输出中提取有效车道线特征点 lane_points = self._extract_lane_points(lanes_points) # Step 3: 处理车道线丢失的情况 if not lane_points['valid']: return self._handle_no_lane_detected() # Step 4: 根据检测到的车道线更新行驶方向判断 self._update_direction(lane_points) # Step 5: 计算转向控制量,基于PID算法 speed_diff = self._calculate_steering(lane_points) # Step 6: 生成电机控制指令,转化为左右轮速度 return self._generate_motor_command(speed_diff) def _extract_lane_points(self, lanes_points): """提取并处理有效的车道线特征点,过滤无效点""" points = { 'max_x0': -float('inf'), 'min_x0': float('inf'), 'max_x1': -float('inf'), 'min_x1': float('inf'), 'valid': False } # 处理模型输出为空的情况 if lanes_points is None: return points # 处理第一条车道线的特征点 if len(lanes_points) > 0: for x, y in lanes_points[0]: if y > Config.ROI_HEIGHT: # 只处理感兴趣区域内的点 if x > points['max_x0']: points['max_x0'] = x points['max_x0_y'] = y if x < points['min_x0']: points['min_x0'] = x points['min_x0_y'] = y # 处理第二条车道线的特征点 if len(lanes_points) > 1: for x, y in lanes_points[1]: if y > Config.ROI_HEIGHT: # 只处理感兴趣区域内的点 if x > points['max_x1']: points['max_x1'] = x points['max_x1_y'] = y if x < points['min_x1']: points['min_x1'] = x points['min_x1_y'] = y # 判断是否检测到有效车道点 points['valid'] = any(val != float('inf') and val != -float('inf') for key, val in points.items() if key.startswith(('max', 'min'))) return points def _handle_no_lane_detected(self): """处理未检测到车道线时的应急控制逻辑""" self.straight_count = 0 # 重置直行计数器 # 根据当前转弯状态执行不同的应急动作 if self.flag_turn == 1: Control_Motor.Rotate_Right(2000) # 执行右转动作 return 2000, -2000 # 返回右转速度指令 elif self.flag_turn == 2: Control_Motor.Rotate_Left(2000) # 执行左转动作 return -2000, 2000 # 返回左转速度指令 else: Control_Motor.GS_run(0, 0) # 执行停车动作 return 0, 0 # 返回零速度指令 def _update_direction(self, points): """根据检测到的车道线特征更新车辆行驶方向""" self.straight_count += 1 # 直行计数器加1 # 如果处于转弯状态且直行计数器超过阈值,重置转弯标志 if self.flag_turn and self.straight_count > 5: self.flag_turn = 0 self.straight_count = 0 # 根据车道线特征点计算斜率,判断行驶方向 if points['max_x0'] == -float('inf') and points['min_x1'] != float('inf'): if points['max_x1'] != points['min_x1']: k = (points['max_x1_y'] - points['min_x1_y']) / (points['max_x1'] - points['min_x1']) self.current_direction = "Counterclockwise_inner_ring" if k > 0 else "Clockwise_inner_ring" elif points['max_x0'] != -float('inf') and points['min_x1'] != float('inf'): if points['max_x0'] != points['min_x0']: k = (points['max_x0_y'] - points['min_x0_y']) / (points['max_x0'] - points['min_x0']) self.current_direction = "Counterclockwise_outer_ring" if k > 0 else "Clockwise_outer_ring" def _calculate_steering(self, points): """基于PID算法计算车辆转向控制量""" target_point = Config.SET_POINTS[self.current_direction] # 获取当前方向的目标点 current_point = 0 # 初始化当前点位置 threshold = 60 # 触发转弯的阈值 # 根据不同行驶方向选择参考点 if self.current_direction == "Clockwise_outer_ring": current_point = points['max_x0'] if abs(current_point - target_point) > threshold: self.flag_turn = 1 # 设置右转标志 self.straight_count = 0 elif self.current_direction == "Counterclockwise_inner_ring": current_point = points['min_x1'] if abs(current_point - target_point) > threshold: self.flag_turn = 2 # 设置左转标志 self.straight_count = 0 elif self.current_direction == "Counterclockwise_outer_ring": current_point = points['min_x0'] if abs(current_point - target_point) > threshold: self.flag_turn = 2 # 设置右转标志 self.straight_count = 0 elif self.current_direction == "Clockwise_inner_ring": current_point = points['max_x1'] if abs(current_point - target_point) > threshold: self.flag_turn = 1 # 设置左转标志 self.straight_count = 0 # 更新PID控制器设定点并计算输出 self.pid.SetPoint = target_point self.pid.update(current_point) return self.pid.output # 返回PID计算的转向控制量 def _generate_motor_command(self, speed_diff): """根据转向控制量生成电机控制命令""" # 根据是否转弯选择不同的最大转向速度差 turn_max = Config.TURN_MAX if self.flag_turn else Config.STRAIGHT_MAX # 计算左右轮速度 speed_L = self.current_speed - max(-turn_max, min(turn_max, speed_diff)) speed_R = self.current_speed + max(-turn_max, min(turn_max, speed_diff)) # 对速度进行限幅处理,确保在有效范围内 speed_L = np.clip(speed_L, -4000, 4000) speed_R = np.clip(speed_R, -4000, 4000) return int(speed_L), int(speed_R) # 返回整数类型的左右轮速度 # =============================== 交通标识处理模块 =============================== class TrafficSignProcessor: def __init__(self, classes): self.classes = classes # 交通标识类别列表 self.current_sign = -1 # 当前识别到的交通标识ID,-1表示未识别 self.last_sign_time = 0 # 上次识别到标识的时间戳 self.sign_thresholds = { # 不同标识的处理时间阈值(秒) "whistle": 0.5, # 鸣笛标识 "turn": 0.5, # 转弯标识 "stop": 0.5, # 停止标识 "light": 1.0 # 红绿灯标识 } def process_detection(self, detections): """处理YOLO模型的目标检测结果""" if not detections: # 如果没有检测结果,直接返回 return for obj in detections: # 遍历每个检测到的目标 x, y, w, h, conf, cls_id = obj # 解析目标信息:坐标、宽高、置信度、类别ID area = w * h # 计算目标面积 # 过滤无效检测:y坐标在有效范围内且面积在合理区间 if not (0 < y < 480 and 1000 < area < 3000): continue self._update_sign_state(cls_id) # 更新交通标识状态 def _update_sign_state(self, sign_id): """根据新检测到的标识更新系统状态""" current_time = time.time() # 获取当前时间 sign_name = self.classes[sign_id] # 获取标识名称 # 处理新检测到标识的情况 if self.current_sign == -1: self._handle_new_sign(sign_id, sign_name) # 特殊处理红绿灯标识 elif sign_name in ["red", "yellow", "green"] and self.classes[self.current_sign] in ["red", "yellow"]: self.current_sign = sign_id # 更新为新的红绿灯状态 # 更新最后检测时间 self.last_sign_time = current_time def _handle_new_sign(self, sign_id, sign_name): """处理新检测到的交通标识,触发相应操作""" self.current_sign = sign_id # 更新当前标识ID self.last_sign_time = time.time() # 更新最后检测时间 # 语音提示相应的交通标识 if sign_name == "decelerate": voice_.send(1) # 减速提示 elif sign_name == "turn": voice_.send(2) # 转弯提示 def execute_sign_action(self, lane_follower): """执行交通标识对应的动作""" if self.current_sign == -1: # 如果没有识别到标识,直接返回 return False sign_name = self.classes[self.current_sign] # 获取标识名称 elapsed = time.time() - self.last_sign_time # 计算距离上次识别的时间 threshold = self.sign_thresholds.get(sign_name, 0) # 获取该标识的处理阈值 # 检查是否达到处理时间阈值 if elapsed < threshold: return False # 根据标识类型执行相应动作 if sign_name == "whistle": voice_.send(3) # 触发鸣笛提示 self.current_sign = -1 # 重置当前标识 return False elif sign_name == "turn": self._execute_turn(lane_follower) # 执行转弯动作 self.current_sign = -1 # 重置当前标识 return False elif sign_name == "stop": return True # 触发停车动作 elif sign_name in ["decelerate", "red", "yellow", "green"]: if elapsed > self.sign_thresholds["light"]: self.current_sign = -1 # 超过阈值后重置标识 return False return False def _execute_turn(self, lane_follower): """执行转弯动作,更新车辆行驶方向""" direction = lane_follower.current_direction # 获取当前行驶方向 if direction in ["Clockwise_outer_ring", "Counterclockwise_inner_ring"]: Control_Motor.Turn_Right(3600, 1000, 3) # 执行右转动作 lane_follower.flag_turn = 1 # 设置右转标志 lane_follower.straight_count = 0 # 重置直行计数器 # 更新行驶方向 if direction == "Clockwise_outer_ring": lane_follower.current_direction = "Counterclockwise_inner_ring" else: lane_follower.current_direction = "Clockwise_outer_ring" else: Control_Motor.Turn_Left(1000, 3600, 3) # 执行左转动作 lane_follower.flag_turn = 2 # 设置左转标志 # 更新行驶方向 if direction == "Clockwise_inner_ring": lane_follower.current_direction = "Counterclockwise_outer_ring" else: lane_follower.current_direction = "Clockwise_outer_ring" # =============================== 主程序 =============================== def main(): global voice_, Control_Motor # 声明全局变量 # 系统初始化流程 try: # 获取当前脚本路径和应用名称 pwd = os.path.dirname(os.path.abspath(__file__)) print(f"当前应用路径: {pwd}") # 初始化语音系统 voice_ = Voice() # 初始化TPU加速的深度学习模型 tpu_yolo = _init_tpu("yolov5", pwd) # 初始化YOLOv5模型 tpu_lanenet = _init_tpu("lanenet", pwd) # 初始化LaneNet模型 # 初始化摄像头 cap = _init_camera() # 初始化车道跟踪器 lane_follower = LaneFollower(Config.PID_P, Config.PID_I, Config.PID_D) # 初始化交通标识处理器 sign_processor = TrafficSignProcessor(Config.CLASSES) # 初始化电机控制 Control_Motor = Motor(0, 0, 0, 0) # 云台初始化 Control_Motor.servo_ptz(0, 0) # 语音提示系统初始化成功 voice_.send(111) # 进入主控制循环 _main_loop(cap, tpu_yolo, tpu_lanenet, lane_follower, sign_processor) except Exception as e: print(f"系统异常: {str(e)}") voice_.send(250) # 播放异常提示音 finally: _cleanup_resources(cap, tpu_yolo, tpu_lanenet) # 清理系统资源 def _init_tpu(algorithm, pwd): """初始化TPU加速的深度学习模型""" try: tpu = TPUFactory(algorithm=algorithm) # 创建TPU工厂实例 except Exception as e: voice_.send(242) # 播放库导入异常提示音 print(f"导入库异常, error: {e}") time.sleep(3) sys.exit(1) if not tpu.link(): # 尝试连接TPU voice_.send(242) # 播放TPU连接失败提示音 print("TPU连接失败") time.sleep(3) sys.exit(1) model_path = os.path.join(os.path.join(pwd, "model"), algorithm) # 构建模型路径 tpu.model_init(model_path, host=True) # 初始化模型 return tpu # 返回初始化后的TPU实例 def _init_camera(): """初始化摄像头设备""" try: return VideoCapture() # 创建摄像头捕获实例 except Exception as e: voice_.send(248) # 播放摄像头异常提示音 print(f"摄像头初始化失败: {str(e)}") time.sleep(5) sys.exit(1) def _main_loop(cap, tpu_yolo, tpu_lanenet, lane_follower, sign_processor): """系统主控制循环,处理图像、检测和控制""" while True: # 无限循环,直到退出条件满足 try: # 从摄像头读取图像帧 frame = cap.read() if frame is None: # 处理读取失败的情况 voice_.send(248) # 播放摄像头异常提示音 time.sleep(1) continue # 处理交通灯状态,必要时停车 if sign_processor.current_sign != -1 and \ Config.CLASSES[sign_processor.current_sign] in ["red", "yellow"]: Control_Motor.GS_run(0, 0, 0, 0) # 执行停车动作 else: # 根据转弯状态或减速标识更新车速 lane_follower.current_speed = Config.SPEED_DECELERATE if ( lane_follower.flag_turn or (sign_processor.current_sign != -1 and Config.CLASSES[sign_processor.current_sign] == "decelerate") ) else Config.SPEED_NORMAL # 执行车道跟踪,获取电机控制指令 speed_L, speed_R = lane_follower.follow_lane(frame, tpu_lanenet) Control_Motor.GS_run(speed_L, speed_R, speed_L, speed_R) # 控制电机运行 # 执行目标检测,获取交通标识 detections = tpu_yolo.predict(frame, Config.IMG_SIZE, Config.CONF_THRESH, Config.IOU_THRESH) sign_processor.process_detection(detections) # 处理检测结果 # 执行交通标识对应的动作 if sign_processor.execute_sign_action(lane_follower): break # 检测到停止标志时退出循环 # 检查是否有退出指令 if has_digit_input() == 'q': break except KeyboardInterrupt: # 处理键盘中断 break def _cleanup_resources(cap, tpu_yolo, tpu_lanenet): """系统资源清理函数,确保安全退出""" voice_.send(4) # 播放结束提示音 Control_Motor.close() # 关闭电机控制 tpu_yolo.release() # 释放YOLO模型资源 tpu_lanenet.release() # 释放LaneNet模型资源 voice_.close() # 关闭语音系统 cap.release() # 释放摄像头资源 print("系统安全关闭") if __name__ == "__main__": main() # 程序入口 [【附件】lanenet_model.zip](/media/attachment/2025/05/lanenet_model.zip) [【附件】model.zip](/media/attachment/2025/05/model.zip)
gdsoke
2025年6月26日 10:57
转发文档
收藏文档
上一篇
下一篇
手机扫码
复制链接
手机扫一扫转发分享
复制链接
Markdown文件
分享
链接
类型
密码
更新密码