from enum import Enum from pathlib import Path import math import random import cv2 import numpy as np from PIL import Image from timer import Timer from vision_chunk import VisionChunk from visualization import VisualizationManager from geolocation import Geolocation random.seed(1) class Pilot: def __init__(self): pass def handle(self, image: Image): pass def act(self) -> tuple[float, float] | None: pass def get_position(self) -> tuple[float, float]: pass class PilotCommand: velocity: float dangle: float stop: bool proccessing_time: float def __init__(self, dangle: float = 0, velocity: float = 100, stop: bool = False, processing_time: float = 0. ): self.dangle = dangle self.stop = stop self.velocity = velocity self.proccessing_time = processing_time class AutoPilot(Pilot): # Состояние одометрии geo: Geolocation chunks: list[VisionChunk] # Ориентиры target_idx: int # Текущий ориентир # Всякие вспомогательные штуки timer: Timer prev_chunk: VisionChunk | None frame_count: int vis_manager: VisualizationManager # Менеджер визуализации (опционально) # Положение на основе ориентира reserved_geo: Geolocation | None def __init__(self, points = [], chunks = [], viz_manager=None): self.prev_chunk = None self.geo = Geolocation(0, 0, 1, 0) self.chunks = chunks self.frame_count = 0 self.vis_manager = viz_manager # Менеджер визуализации self.reserved_geo = None # Пороговые значения качества сопоставления/гомографии self.min_inliers: int = 12 self.min_inlier_ratio: float = 0.5 self.max_reproj_rmse: float = 3.0 self.min_scale: float = 0.7 self.max_scale: float = 1.4 self.points = points self.target_idx = 0 self.timer = Timer() def get_position(self) -> tuple[float, float]: return self.geo.x, self.geo.y def calculate_optical_flow(self, prev_chunk: VisionChunk, current_chunk: VisionChunk): """ Вычисляет оптический поток между двумя кадрами Возвращает src_pts и dst_pts в отцентрированных координатах """ prev_gray = prev_chunk.to_cv2_gray() current_gray = current_chunk.to_cv2_gray() h, w = prev_gray.shape[:2] cx, cy = w // 2, h // 2 # Создаем сетку точек для отслеживания (аналогично вашему step=20) step = 35 grid_points = [] for y in range(step, h - step, step): for x in range(step, w - step, step): grid_points.append([x, y]) if len(grid_points) < 4: return None, None p0 = np.float32(grid_points).reshape(-1, 1, 2) # Параметры для Lucas-Kanade lk_params = dict( winSize=(11, 11), maxLevel=2, criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03) ) # Вычисляем разреженный оптический поток p1, status, err = cv2.calcOpticalFlowPyrLK(prev_gray, current_gray, p0, None, **lk_params) # Фильтруем только успешно отслеженные точки good_mask = err < 12.0 # порог ошибки good_old = p0[status == 1 & good_mask] good_new = p1[status == 1 & good_mask] if len(good_old) < 4: return None, None # Преобразуем в отцентрированные координаты (Y вверх) src_pts = np.float32([[x - cx, cy - y] for x, y in good_old]).reshape(-1, 1, 2) dst_pts = np.float32([[x - cx, cy - y] for x, y in good_new]).reshape(-1, 1, 2) return src_pts, dst_pts def estimate_transformation_matrix(self, src_pts: np.ndarray, dst_pts: np.ndarray): """ Оценивает матрицу трансформации на основе сопоставленных точек Точки уже отцентрированы относительно центра изображения """ # Используем RANSAC для оценки матрицы гомографии print("Count of points: ", len(src_pts), len(dst_pts)) H, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 3.0, maxIters=1000) # RH, rmask = cv2.findHomography(dst_pts, src_pts, cv2.RANSAC, 1.0) if H is None: return None # Маска инлайеров inliers = int(mask.sum()) if mask is not None else 0 total = int(mask.size) if mask is not None else 0 inlier_ratio = (inliers / total) if total > 0 else 0.0 # Ошибка репроекции по инлайерам (RMSE) rmse = None try: # Проецируем src через H и сравниваем с dst proj = cv2.perspectiveTransform(src_pts, H) errs = np.linalg.norm((proj - dst_pts).reshape(-1, 2), axis=1) if mask is not None and mask.shape[0] == errs.shape[0]: errs = errs[mask.ravel().astype(bool)] if errs.size > 0: rmse = float(np.sqrt(np.mean(errs ** 2))) except Exception: rmse = None # Извлекаем параметры трансформации из матрицы гомографии # H = [a11 a12 tx] # [a21 a22 ty] # [0 0 1 ] # Масштаб и поворот a11, a12 = H[0, 0], H[0, 1] a21, a22 = H[1, 0], H[1, 1] # Смещение (уже отцентрировано) tx, ty = -H[0, 2], -H[1, 2] # print(" [Pilot] translate:", tx, ty) # Вычисляем угол поворота angle = -np.arctan2(a21, a11) # Вычисляем масштаб scale_x = np.sqrt(a11**2 + a21**2) scale_y = np.sqrt(a12**2 + a22**2) scale = (scale_x + scale_y) / 2 return { 'translation': (tx, ty), # Уже отцентрировано 'rotation': angle, 'scale': scale, 'homography': H, 'mask': mask, 'inliers': inliers, 'inliers_ratio': inlier_ratio, 'rmse': rmse } def update_drone_position(self, transformation_info: dict): """ Обновляет позицию и угол БПЛА на основе трансформации изображения Координаты уже отцентрированы относительно центра изображения """ homography = transformation_info['homography'] self.geo @= homography if self.reserved_geo is not None: self.reserved_geo @= homography def get_drone_state(self) -> dict: """ Возвращает текущее состояние БПЛА """ return { 'x': self.geo.x, 'y': self.geo.y, 'angle': self.geo.yaw, 'angle_degrees': math.degrees(self.geo.yaw), 'frame_count': self.frame_count } def get_position_by_chunk(self) -> Geolocation | None: # Пытаемся найти ориентир на картинке: current_chunk = self.prev_chunk landmark_chunk = self.chunks[self.target_idx] src_pts, dst_pts, matches, kp1, kp2 = landmark_chunk.detect_and_match_keypoints(current_chunk) if src_pts is not None and dst_pts is not None and self.vis_manager: was_enabled = self.timer.enabled if was_enabled: self.timer.stop() self.vis_manager.update_chunk_matches(landmark_chunk.to_numpy(), current_chunk.to_numpy(), kp1, kp2, matches) if was_enabled: self.timer.start() if src_pts is not None and dst_pts is not None: # Оцениваем матрицу трансформации landmark_transform = self.estimate_transformation_matrix(src_pts, dst_pts) # Если ориентир достоверно найден — скорректируем глобальные координаты и угол if landmark_transform is not None: ok_scale = (self.min_scale <= landmark_transform['scale'] <= self.max_scale) ok_inliers = (landmark_transform.get('inliers', 0) >= self.min_inliers) ratio = landmark_transform.get('inliers_ratio', 0.0) ok_ratio = (ratio >= self.min_inlier_ratio) rmse = landmark_transform.get('rmse', None) ok_rmse = (rmse is not None and rmse <= self.max_reproj_rmse) if ok_scale and ok_inliers and ok_ratio and ok_rmse: # print("[HELP]") # print("Matrix", landmark_transform['homography']) # print("Position", self.x, self.y) # print("Position of point", self.points[self.target_idx]) # print("[PILOT]", rmse, ratio, ok_rmse) # if False: # Считаем абсолютную позу относительно координат ориентира landmark_world_x, landmark_world_y = self.points[self.target_idx] landmark = Geolocation(landmark_world_x, landmark_world_y, 1, 0) homography = landmark_transform['homography'] # homography = np.linalg.inv(homography) print(f" [Pilot] Landmark correction applied (inliers={landmark_transform['inliers']}, ratio={ratio:.2f}, rmse={rmse:.2f})") return landmark @ homography return None def handle(self, current_chunk: VisionChunk) -> PilotCommand: self.timer.start() self.frame_count += 1 if self.prev_chunk is None: self.prev_chunk = current_chunk self.timer.stop() return PilotCommand(processing_time=self.timer.get_elapsed()) # Расстояние до цели distance_to_target = math.sqrt( (self.points[self.target_idx][0] - self.geo.x) ** 2 + (self.points[self.target_idx][1] - self.geo.y) ** 2 ) # Вычисляем оптический поток для покадрового сравнения optical_flow_timer = Timer() optical_flow_timer.start() src_pts, dst_pts = self.calculate_optical_flow(self.prev_chunk, current_chunk) optical_flow_timer.stop() print(f"Optical flow calculating: {optical_flow_timer.get_elapsed() * 1000:.2f} ms") # Оцениваем смещение БПЛА if src_pts is not None and dst_pts is not None: # Оцениваем матрицу трансформации matrix_estimation_timer = Timer() matrix_estimation_timer.start() transformation_info = self.estimate_transformation_matrix(src_pts, dst_pts) matrix_estimation_timer.stop() print(f"Transformation matrix estimating: {matrix_estimation_timer.get_elapsed() * 1000:.2f} ms") if transformation_info: # Обновляем позицию и угол БПЛА (одометрия по кадрам) self.update_drone_position(transformation_info) self.timer.stop() # Обновляем визуализацию if self.vis_manager: # Используем визуализацию движения точек по сетке homography_matrix = transformation_info['homography'] self.vis_manager.update_homography_grid( current_chunk.to_numpy(), homography_matrix, grid_step=85) self.timer.start() chunk_timer = Timer() chunk_timer.start() # Пытаемся найти ориентир на картинке: self.prev_chunk = current_chunk # npos = self.get_position_by_chunk() # if npos is not None: # self.reserved_geo = npos chunk_timer.stop() print(f"Chunk timer: {chunk_timer.get_elapsed() * 1000:.2f} ms") if distance_to_target < 35: self.target_idx += 1 if self.target_idx == len(self.points): return PilotCommand(stop=True) distance_to_target = math.sqrt( (self.points[self.target_idx][0] - self.geo.x) ** 2 + (self.points[self.target_idx][1] - self.geo.y) ** 2 ) if self.reserved_geo is not None: self.geo = self.reserved_geo self.reserved_geo = None # Вычисляем угол к цели target_angle = math.atan2(self.points[self.target_idx][1] - self.geo.y, self.points[self.target_idx][0] - self.geo.x) angle_trajectory = self.geo.yaw + math.pi / 2 # print("[ANGLE]", angle_trajectory, "->", target_angle) # Вычисляем разность углов (направление поворота) angle_diff = target_angle - angle_trajectory # Нормализуем разность углов в диапазон [-π, π] angle_diff %= 2 * math.pi if angle_diff >= math.pi: angle_diff -= 2 * math.pi d_r = max(10, min(35., distance_to_target / 2)) d_a_limit = d_r / 10 * 0.01 command = PilotCommand( max(min(d_a_limit, angle_diff), -d_a_limit), d_r, False, self.timer.get_elapsed() ) self.timer.reset() return command def reset_position(self, x: float = 0.0, y: float = 0.0, angle: float = 0.0): """Сбрасывает позицию и угол БПЛА""" self.geo = Geolocation(x, y, 1, angle)