from enum import Enum from pathlib import Path import math import random import cv2 import numpy as np from PIL import Image from timer import Timer from visualization import VisualizationManager random.seed(1) class Pilot: def __init__(self): pass def handle(self, image: Image): pass def act(self) -> tuple[float, float] | None: pass def get_position(self) -> tuple[float, float]: pass class PilotCommand: velocity: float dangle: float stop: bool proccessing_time: float def __init__(self, dangle: float = 0, velocity: float = 100, stop: bool = False, processing_time: float = 0. ): self.dangle = dangle self.stop = stop self.velocity = velocity self.proccessing_time = processing_time class AutoPilot(Pilot): # Состояние одометрии angle: float x: float # Координата X БПЛА y: float # Координата Y БПЛА # Ориентиры points: list[tuple[float,float]] # Координаты keypoints: list[any] # Ключевые точки ориентиров target_idx: int # Текущая целевая метка # Всякие вспомогательные штуки timer: Timer prev_image: np.ndarray | None orb_detector: cv2.ORB bf_matcher: cv2.BFMatcher frame_count: int image_center: tuple # Центр изображения (x, y) vis_manager: VisualizationManager # Менеджер визуализации (опционально) # Положение на основе ориентира reserved_pos: tuple[float, float, float] | None def __init__(self, points = [], keypoints = [], viz_manager=None): self.prev_image = None self.angle = 0.0 self.x = 0.0 self.y = 0.0 self.frame_count = 0 self.image_center = (0, 0) # Будет обновлено при получении первого изображения self.vis_manager = viz_manager # Менеджер визуализации self.reserved_pos = None # Пороговые значения качества сопоставления/гомографии self.min_inliers: int = 12 self.min_inlier_ratio: float = 0.5 self.max_reproj_rmse: float = 3.0 self.min_scale: float = 0.7 self.max_scale: float = 1.4 # Инициализация ORB детектора self.orb_detector = cv2.ORB_create( nfeatures=1000, scaleFactor=1.3, nlevels=4, edgeThreshold=19, patchSize=31, fastThreshold=15, scoreType=cv2.ORB_FAST_SCORE # FAST обычно быстрее ) # Инициализация матчера для сопоставления ключевых точек (kNN + Lowe ratio) self.bf_matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False) self.points = points self.keypoints = keypoints self.target_idx = 0 self.timer = Timer() def get_position(self) -> tuple[float, float]: return self.x, self.y def image_to_numpy(self, image: Image.Image) -> np.ndarray: """Конвертирует PIL Image в numpy array для OpenCV""" return np.array(image) def detect_and_match_keypoints(self, img1: np.ndarray, img2: np.ndarray): """ Обнаруживает ключевые точки ORB и сопоставляет их между двумя изображениями """ # Предобработка (повышение контраста и устойчивости) def preprocess(gray_img: np.ndarray) -> np.ndarray: clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8)) return clahe.apply(gray_img) # В градации серого g1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY) if len(img1.shape) == 3 else img1 g2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY) if len(img2.shape) == 3 else img2 g1 = preprocess(g1) g2 = preprocess(g2) # Обнаружение ключевых точек и дескрипторов kp1, des1 = self.orb_detector.detectAndCompute(g1, None) kp2, des2 = self.orb_detector.detectAndCompute(g2, None) if des1 is None or des2 is None or len(kp1) < 4 or len(kp2) < 4: return None, None, None, None, None # kNN сопоставление и тест Лоу matches_knn = self.bf_matcher.knnMatch(des1, des2, k=2) good_matches: list[cv2.DMatch] = [] for m_n in matches_knn: if len(m_n) < 2: continue m, n = m_n if m.distance < 0.75 * n.distance: good_matches.append(m) # Дополнительная фильтрация по расстоянию (мягкий порог) good_matches = sorted(good_matches, key=lambda x: x.distance) good_matches = [m for m in good_matches if m.distance < 64] if len(good_matches) < 4: return None, None, None, None, None # Получаем центр изображения height1, width1 = g1.shape[:2] height2, width2 = g2.shape[:2] center_x1, center_y1 = width1 // 2, height1 // 2 center_x2, center_y2 = width2 // 2, height2 // 2 # Извлекаем координаты сопоставленных точек и отцентрируем их src_pts = [] dst_pts = [] for match in good_matches: # Координаты точки в первом изображении pt1 = kp1[match.queryIdx].pt src_pts.append([pt1[0] - center_x1, center_y1 - pt1[1]]) # Координаты точки во втором изображении pt2 = kp2[match.trainIdx].pt dst_pts.append([pt2[0] - center_x2, center_y2 - pt2[1]]) # Конвертируем в numpy массивы src_pts = np.float32(src_pts).reshape(-1, 1, 2) dst_pts = np.float32(dst_pts).reshape(-1, 1, 2) return src_pts, dst_pts, good_matches, kp1, kp2 def estimate_transformation_matrix(self, src_pts: np.ndarray, dst_pts: np.ndarray): """ Оценивает матрицу трансформации на основе сопоставленных ключевых точек Точки уже отцентрированы относительно центра изображения """ # Используем RANSAC для оценки матрицы гомографии H, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 1.0) # RH, rmask = cv2.findHomography(dst_pts, src_pts, cv2.RANSAC, 1.0) if H is None: return None # Маска инлайеров inliers = int(mask.sum()) if mask is not None else 0 total = int(mask.size) if mask is not None else 0 inlier_ratio = (inliers / total) if total > 0 else 0.0 # Ошибка репроекции по инлайерам (RMSE) rmse = None try: # Проецируем src через H и сравниваем с dst proj = cv2.perspectiveTransform(src_pts, H) errs = np.linalg.norm((proj - dst_pts).reshape(-1, 2), axis=1) if mask is not None and mask.shape[0] == errs.shape[0]: errs = errs[mask.ravel().astype(bool)] if errs.size > 0: rmse = float(np.sqrt(np.mean(errs ** 2))) except Exception: rmse = None # Извлекаем параметры трансформации из матрицы гомографии # H = [a11 a12 tx] # [a21 a22 ty] # [0 0 1 ] # Масштаб и поворот a11, a12 = H[0, 0], H[0, 1] a21, a22 = H[1, 0], H[1, 1] # Смещение (уже отцентрировано) tx, ty = -H[0, 2], -H[1, 2] # print(" [Pilot] translate:", tx, ty) # Вычисляем угол поворота angle = -np.arctan2(a21, a11) # Вычисляем масштаб scale_x = np.sqrt(a11**2 + a21**2) scale_y = np.sqrt(a12**2 + a22**2) scale = (scale_x + scale_y) / 2 return { 'translation': (tx, ty), # Уже отцентрировано 'rotation': angle, 'scale': scale, 'homography': H, 'mask': mask, 'inliers': inliers, 'inliers_ratio': inlier_ratio, 'rmse': rmse } def calc_position( self, mat: np.ndarray, x_source: float, y_source: float, angle_source: float ) -> tuple[float, float, float] | None: """ Возвращает новые координаты и поворот на основе матрицы преобразования """ tx, ty = -mat[0, 2], -mat[1, 2] # Вычисляем угол поворота rotation = -np.arctan2(mat[1, 0], mat[0, 0]) # print("[HOMOGRAPHY]", mat) # print("[ROTATION]", rotation) # Координаты уже отцентрированы, поэтому используем их напрямую dx_meters = tx dy_meters = ty angle_global = angle_source + rotation # Применяем поворот к смещению (учитываем текущий угол БПЛА) cos_angle = math.cos(angle_global) sin_angle = math.sin(angle_global) # Поворачиваем смещение в глобальные координаты # Обратите внимание: dy_meters инвертирован, так как в изображениях Y направлен вниз dx_global = dx_meters * cos_angle - dy_meters * sin_angle dy_global = dx_meters * sin_angle + dy_meters * cos_angle # Обновляем координаты БПЛА x = x_source + dx_global y = y_source + dy_global # Нормализуем угол в диапазоне [-π, π] angle = math.atan2(math.sin(angle_global), math.cos(angle_global)) return x, y, angle def update_drone_position(self, transformation_info: dict): """ Обновляет позицию и угол БПЛА на основе трансформации изображения Координаты уже отцентрированы относительно центра изображения """ homography = transformation_info['homography'] self.x, self.y, self.angle = self.calc_position( homography, self.x, self.y, self.angle ) if self.reserved_pos is not None: self.reserved_pos = self.calc_position( homography, *self.reserved_pos ) def get_drone_state(self) -> dict: """ Возвращает текущее состояние БПЛА """ return { 'x': self.x, 'y': self.y, 'angle': self.angle, 'angle_degrees': math.degrees(self.angle), 'frame_count': self.frame_count } def get_position_by_chunk(self) -> tuple[float, float, float] | None: # Пытаемся найти ориентир на картинке: current_image = self.prev_image landmark_image = cv2.imread(Path('chunks') / f'chunk_{self.target_idx}.png', cv2.IMREAD_COLOR_RGB) src_pts, dst_pts, matches, kp1, kp2 = self.detect_and_match_keypoints(landmark_image, current_image) if src_pts is not None and dst_pts is not None and self.vis_manager: was_enabled = self.timer.enabled if was_enabled: self.timer.stop() self.vis_manager.update_chunk_matches(landmark_image, current_image, kp1, kp2, matches) if was_enabled: self.timer.start() if src_pts is not None and dst_pts is not None: # Оцениваем матрицу трансформации landmark_transform = self.estimate_transformation_matrix(src_pts, dst_pts) # Если ориентир достоверно найден — скорректируем глобальные координаты и угол if landmark_transform is not None: ok_scale = (self.min_scale <= landmark_transform['scale'] <= self.max_scale) ok_inliers = (landmark_transform.get('inliers', 0) >= self.min_inliers) ratio = landmark_transform.get('inliers_ratio', 0.0) ok_ratio = (ratio >= self.min_inlier_ratio) rmse = landmark_transform.get('rmse', None) ok_rmse = (rmse is not None and rmse <= self.max_reproj_rmse) if ok_scale and ok_inliers and ok_ratio and ok_rmse: # print("[HELP]") # print("Matrix", landmark_transform['homography']) # print("Position", self.x, self.y) # print("Position of point", self.points[self.target_idx]) # print("[PILOT]", rmse, ratio, ok_rmse) # if False: # Считаем абсолютную позу относительно координат ориентира landmark_world_x, landmark_world_y = self.points[self.target_idx] landmark_angle = 0 homography = landmark_transform['homography'] # homography = np.linalg.inv(homography) # print(f" [Pilot] Landmark correction applied (inliers={landmark_transform['inliers']}, ratio={ratio:.2f}, rmse={rmse:.2f})") return self.calc_position(homography, landmark_world_x, landmark_world_y, landmark_angle) return None def handle(self, image: Image) -> PilotCommand: self.timer.start() self.frame_count += 1 if self.prev_image is None: self.prev_image = self.image_to_numpy(image) # Вычисляем центр изображения height, width = self.prev_image.shape[:2] self.image_center = (width // 2, height // 2) self.timer.stop() return PilotCommand(processing_time=self.timer.get_elapsed()) # Конвертируем текущее изображение current_image = self.image_to_numpy(image) # Обновляем центр изображения height, width = current_image.shape[:2] self.image_center = (width // 2, height // 2) # Расстояние до цели distance_to_target = math.sqrt( (self.points[self.target_idx][0] - self.x) ** 2 + (self.points[self.target_idx][1] - self.y) ** 2 ) # Обнаруживаем и сопоставляем ключевые точки src_pts, dst_pts, matches, kp1, kp2 = self.detect_and_match_keypoints(self.prev_image, current_image) # Оцениваем смещение БПЛА if src_pts is not None and dst_pts is not None: # Оцениваем матрицу трансформации transformation_info = self.estimate_transformation_matrix(src_pts, dst_pts) if transformation_info: # Обновляем позицию и угол БПЛА (одометрия по кадрам) self.update_drone_position(transformation_info) self.timer.stop() # Выводим текущее состояние БПЛА drone_state = self.get_drone_state() # print(f" [Pilot] Drone Position: ({drone_state['x']:.2f}, {drone_state['y']:.2f})") # print(f" [Pilot] Angle: {drone_state['angle_degrees']:.1f}°") # print(f" [Pilot] Target Index: {self.target_idx}") # print(f" [Pilot] Target Position: {self.points[self.target_idx]}") # print(f" [Pilot] Distance: {distance_to_target}") # Обновляем визуализацию if self.vis_manager: # Обновляем сопоставление точек self.vis_manager.update_matches( self.prev_image, current_image, kp1, kp2, matches, transformation_info) mask = transformation_info['mask'] self.vis_manager.update_motion_vectors( current_image, kp1, kp2, np.array(matches)[mask.ravel().astype(bool)]) # Используем новую визуализацию движения точек по сетке homography_matrix = transformation_info['homography'] self.vis_manager.update_homography_grid( current_image, homography_matrix, grid_step=85) self.timer.start() # Пытаемся найти ориентир на картинке: self.prev_image = current_image npos = self.get_position_by_chunk() if npos is not None: self.reserved_pos = npos if distance_to_target < 35: self.target_idx += 1 if self.target_idx == len(self.points): return PilotCommand(stop=True) distance_to_target = math.sqrt( (self.points[self.target_idx][0] - self.x) ** 2 + (self.points[self.target_idx][1] - self.y) ** 2 ) if self.reserved_pos is not None: self.x, self.y, self.angle = self.reserved_pos self.reserved_pos = None # Вычисляем угол к цели target_angle = math.atan2(self.points[self.target_idx][1] - self.y, self.points[self.target_idx][0] - self.x) angle_trajectory = self.angle + math.pi / 2 # print("[ANGLE]", angle_trajectory, "->", target_angle) # Вычисляем разность углов (направление поворота) angle_diff = target_angle - angle_trajectory # Нормализуем разность углов в диапазон [-π, π] angle_diff %= 2 * math.pi if angle_diff >= math.pi: angle_diff -= 2 * math.pi d_r = max(10, min(75., distance_to_target / 2)) d_a_limit = d_r / 10 * 0.07 command = PilotCommand( max(min(d_a_limit, angle_diff), -d_a_limit), d_r, False, self.timer.get_elapsed() ) self.timer.reset() return command def reset_position(self, x: float = 0.0, y: float = 0.0, angle: float = 0.0): """Сбрасывает позицию и угол БПЛА""" self.x = x self.y = y self.angle = angle self.frame_count = 0