from enum import Enum from pathlib import Path import math import random import cv2 import numpy as np from PIL import Image from visualization import VisualizationManager random.seed(1) class Pilot: def __init__(self): pass def handle(self, image: Image): pass def act(self) -> tuple[float, float] | None: pass def get_position(self) -> tuple[float, float]: pass class PilotCommand: velocity: float dangle: float stop: bool def __init__(self, dangle: float = 0, velocity: float = 100, stop: bool = False): self.dangle = dangle self.stop = stop self.velocity = velocity MOVE_KOF: float = 1 / 1.2 class AutoPilot(Pilot): # Состояние одометрии angle: float x: float # Координата X БПЛА y: float # Координата Y БПЛА # Ориентиры points: list[tuple[float,float]] # Координаты keypoints: list[any] # Ключевые точки ориентиров target_idx: int # Текущая целевая метка # Всякие вспомогательные штуки prev_image: np.ndarray | None orb_detector: cv2.ORB bf_matcher: cv2.BFMatcher frame_count: int image_center: tuple # Центр изображения (x, y) vis_manager: VisualizationManager # Менеджер визуализации (опционально) def __init__(self, points = [], keypoints = [], viz_manager=None): self.prev_image = None self.angle = 0.0 self.x = 0.0 self.y = 0.0 self.frame_count = 0 self.image_center = (0, 0) # Будет обновлено при получении первого изображения self.vis_manager = viz_manager # Менеджер визуализации # Пороговые значения качества сопоставления/гомографии self.min_inliers: int = 12 self.min_inlier_ratio: float = 0.5 self.max_reproj_rmse: float = 3.0 self.min_scale: float = 0.7 self.max_scale: float = 1.4 # Инициализация ORB детектора self.orb_detector = cv2.ORB_create( nfeatures=3000, scaleFactor=1.2, nlevels=8, edgeThreshold=15, firstLevel=0, WTA_K=2, patchSize=31, fastThreshold=8, scoreType=cv2.ORB_HARRIS_SCORE ) # Инициализация матчера для сопоставления ключевых точек (kNN + Lowe ratio) self.bf_matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False) self.points = points self.keypoints = keypoints self.target_idx = 0 def get_position(self) -> tuple[float, float]: return self.x, self.y def image_to_numpy(self, image: Image.Image) -> np.ndarray: """Конвертирует PIL Image в numpy array для OpenCV""" return np.array(image) def detect_and_match_keypoints(self, img1: np.ndarray, img2: np.ndarray): """ Обнаруживает ключевые точки ORB и сопоставляет их между двумя изображениями """ # Предобработка (повышение контраста и устойчивости) def preprocess(gray_img: np.ndarray) -> np.ndarray: clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8)) return clahe.apply(gray_img) # В градации серого g1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY) if len(img1.shape) == 3 else img1 g2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY) if len(img2.shape) == 3 else img2 g1 = preprocess(g1) g2 = preprocess(g2) # Обнаружение ключевых точек и дескрипторов kp1, des1 = self.orb_detector.detectAndCompute(g1, None) kp2, des2 = self.orb_detector.detectAndCompute(g2, None) if des1 is None or des2 is None or len(kp1) < 4 or len(kp2) < 4: return None, None, None, None, None # kNN сопоставление и тест Лоу matches_knn = self.bf_matcher.knnMatch(des1, des2, k=2) good_matches: list[cv2.DMatch] = [] for m_n in matches_knn: if len(m_n) < 2: continue m, n = m_n if m.distance < 0.75 * n.distance: good_matches.append(m) # Дополнительная фильтрация по расстоянию (мягкий порог) good_matches = sorted(good_matches, key=lambda x: x.distance) good_matches = [m for m in good_matches if m.distance < 64] if len(good_matches) < 4: return None, None, None, None, None # Получаем центр изображения height1, width1 = g1.shape[:2] height2, width2 = g2.shape[:2] center_x1, center_y1 = width1 // 2, height1 // 2 center_x2, center_y2 = width2 // 2, height2 // 2 # Извлекаем координаты сопоставленных точек и отцентрируем их src_pts = [] dst_pts = [] for match in good_matches: # Координаты точки в первом изображении pt1 = kp1[match.queryIdx].pt src_pts.append([pt1[0] - center_x1, center_y1 - pt1[1]]) # Координаты точки во втором изображении pt2 = kp2[match.trainIdx].pt dst_pts.append([pt2[0] - center_x2, center_y2 - pt2[1]]) # Конвертируем в numpy массивы src_pts = np.float32(src_pts).reshape(-1, 1, 2) dst_pts = np.float32(dst_pts).reshape(-1, 1, 2) return src_pts, dst_pts, good_matches, kp1, kp2 def estimate_transformation_matrix(self, src_pts: np.ndarray, dst_pts: np.ndarray): """ Оценивает матрицу трансформации на основе сопоставленных ключевых точек Точки уже отцентрированы относительно центра изображения """ # Используем RANSAC для оценки матрицы гомографии H, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 1.0) RH, rmask = cv2.findHomography(dst_pts, src_pts, cv2.RANSAC, 1.0) if H is None: return None # Маска инлайеров inliers = int(mask.sum()) if mask is not None else 0 total = int(mask.size) if mask is not None else 0 inlier_ratio = (inliers / total) if total > 0 else 0.0 # Ошибка репроекции по инлайерам (RMSE) rmse = None try: # Проецируем src через H и сравниваем с dst proj = cv2.perspectiveTransform(src_pts, H) errs = np.linalg.norm((proj - dst_pts).reshape(-1, 2), axis=1) if mask is not None and mask.shape[0] == errs.shape[0]: errs = errs[mask.ravel().astype(bool)] if errs.size > 0: rmse = float(np.sqrt(np.mean(errs ** 2))) except Exception: rmse = None # Извлекаем параметры трансформации из матрицы гомографии # H = [a11 a12 tx] # [a21 a22 ty] # [0 0 1 ] # Масштаб и поворот a11, a12 = H[0, 0], H[0, 1] a21, a22 = H[1, 0], H[1, 1] # Смещение (уже отцентрировано) tx, ty = RH[0, 2] * MOVE_KOF, RH[1, 2] * MOVE_KOF print(" [Pilot] translate:", tx, ty) # Вычисляем угол поворота angle = -np.arctan2(a21, a11) # Вычисляем масштаб scale_x = np.sqrt(a11**2 + a21**2) scale_y = np.sqrt(a12**2 + a22**2) scale = (scale_x + scale_y) / 2 return { 'translation': (tx, ty), # Уже отцентрировано 'rotation': angle, 'scale': scale, 'homography': H, 'mask': mask, 'inliers': inliers, 'inliers_ratio': inlier_ratio, 'rmse': rmse } def update_drone_position(self, transformation_info: dict): """ Обновляет позицию и угол БПЛА на основе трансформации изображения Координаты уже отцентрированы относительно центра изображения """ tx, ty = transformation_info['translation'] rotation = transformation_info['rotation'] scale = transformation_info['scale'] # Координаты уже отцентрированы, поэтому используем их напрямую dx_meters = ty dy_meters = tx # Обновляем угол БПЛА self.angle += rotation # Применяем поворот к смещению (учитываем текущий угол БПЛА) cos_angle = math.cos(self.angle) sin_angle = math.sin(self.angle) # Поворачиваем смещение в глобальные координаты # Обратите внимание: dy_meters инвертирован, так как в изображениях Y направлен вниз dx_global = dx_meters * cos_angle - dy_meters * sin_angle dy_global = dx_meters * sin_angle + dy_meters * cos_angle # Обновляем координаты БПЛА self.x += dx_global self.y += dy_global # Нормализуем угол в диапазоне [-π, π] self.angle = math.atan2(math.sin(self.angle), math.cos(self.angle)) def get_drone_state(self) -> dict: """ Возвращает текущее состояние БПЛА """ return { 'x': self.x, 'y': self.y, 'angle': self.angle, 'angle_degrees': math.degrees(self.angle), 'frame_count': self.frame_count } def handle(self, image: Image) -> PilotCommand: self.frame_count += 1 if self.prev_image is None: self.prev_image = self.image_to_numpy(image) # Вычисляем центр изображения height, width = self.prev_image.shape[:2] self.image_center = (width // 2, height // 2) # Обновляем визуализацию детекции if self.vis_manager is not None: kp, _ = self.orb_detector.detectAndCompute(self.prev_image, None) self.vis_manager.update_detection(self.prev_image, kp) return PilotCommand() # Конвертируем текущее изображение current_image = self.image_to_numpy(image) # Обновляем центр изображения height, width = current_image.shape[:2] self.image_center = (width // 2, height // 2) # Расстояние до цели distance_to_target = math.sqrt( (self.points[self.target_idx][0] - self.x) ** 2 + (self.points[self.target_idx][1] - self.y) ** 2 ) # Обнаруживаем и сопоставляем ключевые точки src_pts, dst_pts, matches, kp1, kp2 = self.detect_and_match_keypoints(self.prev_image, current_image) # Оцениваем смещение БПЛА if src_pts is not None and dst_pts is not None: # Оцениваем матрицу трансформации transformation_info = self.estimate_transformation_matrix(src_pts, dst_pts) if transformation_info: # Обновляем позицию и угол БПЛА (одометрия по кадрам) self.update_drone_position(transformation_info) # Выводим текущее состояние БПЛА drone_state = self.get_drone_state() print(f" [Pilot] Drone Position: ({drone_state['x']:.2f}, {drone_state['y']:.2f})") print(f" [Pilot] Angle: {drone_state['angle_degrees']:.1f}°") print(f" [Pilot] Target Index: {self.target_idx}") print(f" [Pilot] Target Position: {self.points[self.target_idx]}") print(f" [Pilot] Distance: {distance_to_target}") # Обновляем визуализацию if self.vis_manager: # Обновляем детекцию ключевых точек self.vis_manager.update_detection(current_image, kp2) # Обновляем сопоставление точек self.vis_manager.update_matches( self.prev_image, current_image, kp1, kp2, matches, transformation_info) # Пытаемся найти ориентир на картинке: landmark_image = cv2.imread(Path('chunks') / f'chunk_{self.target_idx}.png', cv2.IMREAD_COLOR_RGB) src_pts, dst_pts, matches, kp1, kp2 = self.detect_and_match_keypoints(current_image, landmark_image) if src_pts is not None and dst_pts is not None and self.vis_manager: self.vis_manager.update_chunk_matches(current_image, landmark_image, kp1, kp2, matches) if src_pts is not None and dst_pts is not None: # Оцениваем матрицу трансформации landmark_transform = self.estimate_transformation_matrix(src_pts, dst_pts) # Если ориентир достоверно найден — скорректируем глобальные координаты и угол if landmark_transform is not None: ok_scale = (self.min_scale <= landmark_transform['scale'] <= self.max_scale) ok_inliers = (landmark_transform.get('inliers', 0) >= self.min_inliers) ratio = landmark_transform.get('inliers_ratio', 0.0) ok_ratio = (ratio >= self.min_inlier_ratio) rmse = landmark_transform.get('rmse', None) ok_rmse = (rmse is not None and rmse <= self.max_reproj_rmse) if ok_scale and ok_inliers and ok_ratio and ok_rmse: print("[PILOT]", rmse, ratio, ok_rmse) # if False: # Считаем абсолютную позу относительно координат ориентира landmark_world_x, landmark_world_y = self.points[self.target_idx] tx, ty = landmark_transform['translation'] # Смещение в системе БПЛА: (dx_meters, dy_meters) dx_meters = ty dy_meters = tx # Используем оценённый абсолютный угол из гомографии относительно карты (север-вверх) absolute_angle = landmark_transform['rotation'] cos_a = math.cos(absolute_angle) sin_a = math.sin(absolute_angle) # Переход в глобальные координаты карты dx_global = dx_meters * cos_a - dy_meters * sin_a dy_global = dx_meters * sin_a + dy_meters * cos_a self.x = landmark_world_x + dx_global self.y = landmark_world_y + dy_global self.angle = math.atan2(math.sin(absolute_angle), math.cos(absolute_angle)) print(f" [Pilot] Landmark correction applied (inliers={landmark_transform['inliers']}, ratio={ratio:.2f}, rmse={rmse:.2f})") if distance_to_target < 75: self.target_idx += 1 if self.target_idx == len(self.points): return PilotCommand(stop=True) # Вычисляем угол к цели target_angle = math.atan2(self.points[self.target_idx][1] - self.y, self.points[self.target_idx][0] - self.x) # Вычисляем разность углов (направление поворота) angle_diff = target_angle - self.angle # Нормализуем разность углов в диапазон [-π, π] angle_diff %= 2 * math.pi if angle_diff >= math.pi: angle_diff -= 2 * math.pi self.prev_image = current_image return PilotCommand(max(min(0.1, angle_diff), -0.1), min(50., distance_to_target / 2)) def reset_position(self, x: float = 0.0, y: float = 0.0, angle: float = 0.0): """Сбрасывает позицию и угол БПЛА""" self.x = x self.y = y self.angle = angle self.frame_count = 0 class RandomPilot(Pilot): counter: int def __init__(self, velocity: float = 1): self.counter = 0 def act(self) -> tuple[float, float] | None: self.counter += 1 if self.counter > 300: return None return 1 / (self.counter + 20), 10.0 # def _test(): # randomPilot = RandomPilot() # point = [0, 0] # iter_count = 100 # points = [point.copy()] # for i in range(iter_count): # dx, dy = randomPilot.step() # prev_point = point.copy() # point[0] += dx # point[1] += dy # points.append(point.copy()) # coords = list(zip(*points)) # padding = 5 # plt.axis([ # min(coords[0]) - padding, max(coords[0]) + padding, # min(coords[1]) - padding, max(coords[1]) + padding]) # for i in range(iter_count): # plt.plot(coords[0][i:i+2], coords[1][i:i+2], color='#5e5') # plt.pause(0.05) # sleep(1) # if __name__ == '__main__': # _test()