from enum import Enum from pathlib import Path import math import random import cv2 import numpy as np from PIL import Image from timer import Timer from vision_chunk import VisionChunk from visualization import VisualizationManager from position import Position from constants import CAMERA_HEIGHT from utility import estimate_transformation_matrix random.seed(1) class Pilot: def __init__(self): pass def handle(self, image: Image): pass def act(self) -> tuple[float, float] | None: pass def get_position(self) -> tuple[float, float]: pass class PilotCommand: velocity: float dangle: float stop: bool proccessing_time: float def __init__(self, dangle: float = 0, velocity: float = 100, stop: bool = False, processing_time: float = 0. ): self.dangle = dangle self.stop = stop self.velocity = velocity self.proccessing_time = processing_time class AutoPilot(Pilot): # Состояние одометрии pos: Position chunks: list[VisionChunk] # Ориентиры target_idx: int # Текущий ориентир # Всякие вспомогательные штуки timer: Timer prev_chunk: VisionChunk | None frame_count: int vis_manager: VisualizationManager # Менеджер визуализации (опционально) # Положение на основе ориентира reserved_pos: Position | None proccessing_time: float def __init__(self, points = [], chunks = [], viz_manager=None, pixel_ratio: float = 1.): self.prev_chunk = None self.pos = Position(0, 0, 1, 0, 0, 0) self.chunks = chunks self.frame_count = 0 self.vis_manager = viz_manager # Менеджер визуализации self.reserved_pos = None self.pixel_ratio = pixel_ratio # Пороговые значения качества сопоставления/гомографии self.min_inliers: int = 12 self.min_inlier_ratio: float = 0.5 self.max_reproj_rmse: float = 3.0 self.min_scale: float = 0.7 self.max_scale: float = 1.4 self.points = points self.target_idx = 0 self.timer = Timer() self.chunk_points = np.array([[chunk.pos.x, chunk.pos.y] for chunk in self.chunks]) def get_position(self) -> tuple[float, float]: return self.pos.x, self.pos.y def calculate_optical_flow(self, prev_chunk: VisionChunk, current_chunk: VisionChunk): """ Вычисляет оптический поток между двумя кадрами Возвращает src_pts и dst_pts в отцентрированных координатах """ prev_gray = prev_chunk.to_cv2_gray() current_gray = current_chunk.to_cv2_gray() h, w = prev_gray.shape[:2] # Создаем сетку точек для отслеживания (аналогично вашему step=20) step = 35 grid_points = [] for y in range(step, h - step, step): for x in range(step, w - step, step): grid_points.append([x, y]) if len(grid_points) < 4: return None, None p0 = np.float32(grid_points).reshape(-1, 1, 2) # Параметры для Lucas-Kanade lk_params = dict( winSize=(11, 11), maxLevel=2, criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03) ) # Вычисляем разреженный оптический поток p1, status, err = cv2.calcOpticalFlowPyrLK(prev_gray, current_gray, p0, None, **lk_params) # Фильтруем только успешно отслеженные точки good_mask = err < 12.0 # порог ошибки good_old = p0[status == 1 & good_mask] good_new = p1[status == 1 & good_mask] if len(good_old) < 4: return None, None # Преобразуем в отцентрированные координаты (Y вверх) src_pts = np.float32([[x, y] for x, y in good_old]).reshape(-1, 1, 2) dst_pts = np.float32([[x, y] for x, y in good_new]).reshape(-1, 1, 2) return src_pts, dst_pts def update_drone_position(self, homography_matrix: np.ndarray): """ Обновляет позицию и угол БПЛА на основе трансформации изображения Координаты уже отцентрированы относительно центра изображения """ self.pos.iapply(homography_matrix) def get_drone_state(self) -> dict: """ Возвращает текущее состояние БПЛА """ return { 'x': self.pos.x, 'y': self.pos.y, 'angle': self.pos.yaw, 'angle_degrees': math.degrees(self.pos.yaw), 'frame_count': self.frame_count } def get_position_by_chunk(self) -> Position | None: cur_pos = np.array([self.pos.x, self.pos.y]) closest_chunk_idx = ((self.chunk_points - cur_pos) ** 2).sum(1).argmin() current_chunk = self.prev_chunk landmark_chunk = self.chunks[closest_chunk_idx] # Краевой случай: отсутствие чанков if current_chunk is None or landmark_chunk is None: return None src_pts, dst_pts, matches, kp1, kp2 = landmark_chunk.detect_and_match_keypoints(current_chunk) # Визуализация (если нужна) if src_pts is not None and dst_pts is not None and self.vis_manager: was_enabled = self.timer.enabled if was_enabled: self.timer.stop() self.vis_manager.update_chunk_matches( landmark_chunk.to_numpy(), current_chunk.to_numpy(), kp1, kp2, matches ) if was_enabled: self.timer.start() # Краевой случай: нет точек или недостаточно матчей if src_pts is None or dst_pts is None: return None num_matches = len(src_pts) if num_matches < 20: return None # Оценка матрицы гомографии landmark_transform, mask = estimate_transformation_matrix(src_pts, dst_pts) num_inliers = int(np.sum(mask)) # Краевой случай: матрица не найдена if landmark_transform is None or mask is None: return None # === КРИТЕРИИ ПРИНЯТИЯ РЕШЕНИЯ === # 1. Минимальное количество инлайеров (абсолютное) MIN_INLIERS_ABSOLUTE = 6 if num_inliers < MIN_INLIERS_ABSOLUTE: return None # 2. Процент инлайеров от общего числа матчей inlier_ratio = num_inliers / num_matches MIN_INLIER_RATIO = 0.25 # Минимум 25% инлайеров if inlier_ratio < MIN_INLIER_RATIO: return None # 3. Проверка качества гомографии (детерминант для выявления вырожденных случаев) det = np.linalg.det(landmark_transform[:2, :2]) # Детерминант должен быть близок к 1 (без сильного масштабирования) if abs(det) < 0.1 or abs(det) > 10.0: return None # 4. Проверка на валидность перспективного преобразования # Элементы третьей строки не должны быть слишком большими if abs(landmark_transform[2, 0]) > 0.01 or abs(landmark_transform[2, 1]) > 0.01: return None # 5. Дополнительная проверка: средняя ошибка репроекции для инлайеров inlier_src = src_pts[mask.ravel() == 1] inlier_dst = dst_pts[mask.ravel() == 1] # Преобразуем точки через найденную гомографию transformed_pts = cv2.perspectiveTransform(inlier_src, landmark_transform) # Вычисляем ошибку репроекции reprojection_errors = np.sqrt(np.sum((transformed_pts - inlier_dst) ** 2, axis=2)) mean_error = np.mean(reprojection_errors) MAX_MEAN_REPROJECTION_ERROR = 1.0 # пиксели if mean_error > MAX_MEAN_REPROJECTION_ERROR: return None # 6. Проверка стабильности: если слишком много хороших совпадений, но мало инлайеров - подозрительно if num_matches > 50 and inlier_ratio < 0.15: return None # === ВСЕ ПРОВЕРКИ ПРОЙДЕНЫ === print("[INFO]: Landmark Chunk Correction Applied") return landmark_chunk.pos.apply(landmark_transform) def handle(self, current_chunk: VisionChunk) -> PilotCommand: self.timer.start() self.frame_count += 1 if self.prev_chunk is None: self.prev_chunk = current_chunk self.timer.stop() return PilotCommand(processing_time=self.timer.get_elapsed()) # Вычисляем оптический поток для покадрового сравнения matching_timer = Timer() matching_timer.start() src_pts, dst_pts = self.calculate_optical_flow(self.prev_chunk, current_chunk) # src_pts, dst_pts, _, _, _ = self.prev_chunk.detect_and_match_keypoints(current_chunk) matching_timer.stop() print(f"Matching calculating: {matching_timer.get_elapsed() * 1000:.2f} ms") # Оцениваем смещение БПЛА if src_pts is not None and dst_pts is not None: # Оцениваем матрицу трансформации matrix_estimation_timer = Timer() matrix_estimation_timer.start() homography_matrix, _ = estimate_transformation_matrix(src_pts, dst_pts) matrix_estimation_timer.stop() print(f"Transformation matrix updating: {matrix_estimation_timer.get_elapsed() * 1000:.2f} ms") if homography_matrix is not None: self.update_drone_position(homography_matrix) self.timer.stop() # Обновляем визуализацию if self.vis_manager: # Используем визуализацию движения точек по сетке self.vis_manager.update_homography_grid( current_chunk.to_numpy(), homography_matrix, grid_step=35) self.timer.start() chunk_timer = Timer() chunk_timer.start() # Пытаемся найти ориентир на картинке: self.prev_chunk = current_chunk pos_by_chunk = self.get_position_by_chunk() if pos_by_chunk is not None: self.pos = pos_by_chunk chunk_timer.stop() print(f"Chunk timer: {chunk_timer.get_elapsed() * 1000:.2f} ms") command = self.make_command() self.timer.reset() return command def make_command(self) -> PilotCommand: # Расстояние до цели distance_to_target = math.sqrt( (self.points[self.target_idx][0] - self.pos.x) ** 2 + (self.points[self.target_idx][1] - self.pos.y) ** 2 ) if distance_to_target < 35: self.target_idx += 1 if self.target_idx == len(self.points): return PilotCommand(stop=True) distance_to_target = math.sqrt( (self.points[self.target_idx][0] - self.pos.x) ** 2 + (self.points[self.target_idx][1] - self.pos.y) ** 2 ) # Вычисляем угол к цели target_angle = math.atan2(self.points[self.target_idx][1] - self.pos.y, self.points[self.target_idx][0] - self.pos.x) angle_trajectory = self.pos.yaw + math.pi / 2 # Вычисляем разность углов (направление поворота) angle_diff = target_angle - angle_trajectory # Нормализуем разность углов в диапазон [-π, π] angle_diff %= 2 * math.pi if angle_diff >= math.pi: angle_diff -= 2 * math.pi d_r = max(5, min(10., distance_to_target / 2 * self.pixel_ratio)) d_a_limit = np.radians(5) command = PilotCommand( max(min(d_a_limit, angle_diff), -d_a_limit), d_r, False, self.timer.get_elapsed() ) return command def reset_position(self, x: float = 0.0, y: float = 0.0, angle: float = 0.0): """Сбрасывает позицию и угол БПЛА""" self.pos = Position(x, y, 1, angle)