diff --git a/autopilot.py b/autopilot.py index 136a095..c4f98c6 100644 --- a/autopilot.py +++ b/autopilot.py @@ -9,6 +9,7 @@ from PIL import Image from timer import Timer +from vision_chunk import VisionChunk from visualization import VisualizationManager from geolocation import Geolocation @@ -42,28 +43,23 @@ class AutoPilot(Pilot): # Состояние одометрии geo: Geolocation - # Ориентиры - points: list[tuple[float,float]] # Координаты - keypoints: list[any] # Ключевые точки ориентиров - target_idx: int # Текущая целевая метка + chunks: list[VisionChunk] # Ориентиры + target_idx: int # Текущий ориентир # Всякие вспомогательные штуки timer: Timer - prev_image: np.ndarray | None - orb_detector: cv2.ORB - bf_matcher: cv2.BFMatcher + prev_chunk: VisionChunk | None frame_count: int - image_center: tuple # Центр изображения (x, y) vis_manager: VisualizationManager # Менеджер визуализации (опционально) # Положение на основе ориентира reserved_geo: Geolocation | None - def __init__(self, points = [], keypoints = [], viz_manager=None): - self.prev_image = None + def __init__(self, points = [], chunks = [], viz_manager=None): + self.prev_chunk = None self.geo = Geolocation(0, 0, 1, 0) + self.chunks = chunks self.frame_count = 0 - self.image_center = (0, 0) # Будет обновлено при получении первого изображения self.vis_manager = viz_manager # Менеджер визуализации self.reserved_geo = None @@ -74,22 +70,7 @@ class AutoPilot(Pilot): self.min_scale: float = 0.7 self.max_scale: float = 1.4 - # Инициализация ORB детектора - self.orb_detector = cv2.ORB_create( - nfeatures=1000, - scaleFactor=1.3, - nlevels=4, - edgeThreshold=19, - patchSize=31, - fastThreshold=15, - scoreType=cv2.ORB_FAST_SCORE # FAST обычно быстрее - ) - - # Инициализация матчера для сопоставления ключевых точек (kNN + Lowe ratio) - self.bf_matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False) - self.points = points - self.keypoints = keypoints self.target_idx = 0 self.timer = Timer() @@ -97,74 +78,6 @@ class AutoPilot(Pilot): def get_position(self) -> tuple[float, float]: return self.geo.x, self.geo.y - def image_to_numpy(self, image: Image.Image) -> np.ndarray: - """Конвертирует PIL Image в numpy array для OpenCV""" - return np.array(image) - - def detect_and_match_keypoints(self, img1: np.ndarray, img2: np.ndarray): - """ - Обнаруживает ключевые точки ORB и сопоставляет их между двумя изображениями - """ - # Предобработка (повышение контраста и устойчивости) - def preprocess(gray_img: np.ndarray) -> np.ndarray: - clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8)) - return clahe.apply(gray_img) - - # В градации серого - g1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY) if len(img1.shape) == 3 else img1 - g2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY) if len(img2.shape) == 3 else img2 - g1 = preprocess(g1) - g2 = preprocess(g2) - - # Обнаружение ключевых точек и дескрипторов - kp1, des1 = self.orb_detector.detectAndCompute(g1, None) - kp2, des2 = self.orb_detector.detectAndCompute(g2, None) - - if des1 is None or des2 is None or len(kp1) < 4 or len(kp2) < 4: - return None, None, None, None, None - - # kNN сопоставление и тест Лоу - matches_knn = self.bf_matcher.knnMatch(des1, des2, k=2) - good_matches: list[cv2.DMatch] = [] - for m_n in matches_knn: - if len(m_n) < 2: - continue - m, n = m_n - if m.distance < 0.75 * n.distance: - good_matches.append(m) - - # Дополнительная фильтрация по расстоянию (мягкий порог) - good_matches = sorted(good_matches, key=lambda x: x.distance) - good_matches = [m for m in good_matches if m.distance < 64] - - if len(good_matches) < 4: - return None, None, None, None, None - - # Получаем центр изображения - height1, width1 = g1.shape[:2] - height2, width2 = g2.shape[:2] - center_x1, center_y1 = width1 // 2, height1 // 2 - center_x2, center_y2 = width2 // 2, height2 // 2 - - # Извлекаем координаты сопоставленных точек и отцентрируем их - src_pts = [] - dst_pts = [] - - for match in good_matches: - # Координаты точки в первом изображении - pt1 = kp1[match.queryIdx].pt - src_pts.append([pt1[0] - center_x1, center_y1 - pt1[1]]) - - # Координаты точки во втором изображении - pt2 = kp2[match.trainIdx].pt - dst_pts.append([pt2[0] - center_x2, center_y2 - pt2[1]]) - - # Конвертируем в numpy массивы - src_pts = np.float32(src_pts).reshape(-1, 1, 2) - dst_pts = np.float32(dst_pts).reshape(-1, 1, 2) - - return src_pts, dst_pts, good_matches, kp1, kp2 - def estimate_transformation_matrix(self, src_pts: np.ndarray, dst_pts: np.ndarray): """ Оценивает матрицу трансформации на основе сопоставленных ключевых точек @@ -251,18 +164,16 @@ class AutoPilot(Pilot): 'frame_count': self.frame_count } - - def get_position_by_chunk(self) -> Geolocation | None: # Пытаемся найти ориентир на картинке: - current_image = self.prev_image - landmark_image = cv2.imread(Path('chunks') / f'chunk_{self.target_idx}.png', cv2.IMREAD_COLOR_RGB) - src_pts, dst_pts, matches, kp1, kp2 = self.detect_and_match_keypoints(landmark_image, current_image) + current_chunk = self.prev_chunk + landmark_chunk = self.chunks[self.target_idx] + src_pts, dst_pts, matches, kp1, kp2 = landmark_chunk.detect_and_match_keypoints(current_chunk) if src_pts is not None and dst_pts is not None and self.vis_manager: was_enabled = self.timer.enabled if was_enabled: self.timer.stop() - self.vis_manager.update_chunk_matches(landmark_image, current_image, kp1, kp2, matches) + self.vis_manager.update_chunk_matches(landmark_chunk.to_numpy(), current_chunk.to_numpy(), kp1, kp2, matches) if was_enabled: self.timer.start() if src_pts is not None and dst_pts is not None: @@ -293,26 +204,15 @@ class AutoPilot(Pilot): return None - def handle(self, image: Image) -> PilotCommand: + def handle(self, current_chunk: VisionChunk) -> PilotCommand: self.timer.start() self.frame_count += 1 - if self.prev_image is None: - self.prev_image = self.image_to_numpy(image) - # Вычисляем центр изображения - height, width = self.prev_image.shape[:2] - self.image_center = (width // 2, height // 2) - + if self.prev_chunk is None: + self.prev_chunk = current_chunk self.timer.stop() return PilotCommand(processing_time=self.timer.get_elapsed()) - # Конвертируем текущее изображение - current_image = self.image_to_numpy(image) - - # Обновляем центр изображения - height, width = current_image.shape[:2] - self.image_center = (width // 2, height // 2) - # Расстояние до цели distance_to_target = math.sqrt( (self.points[self.target_idx][0] - self.geo.x) ** 2 + @@ -320,7 +220,7 @@ class AutoPilot(Pilot): ) # Обнаруживаем и сопоставляем ключевые точки - src_pts, dst_pts, matches, kp1, kp2 = self.detect_and_match_keypoints(self.prev_image, current_image) + src_pts, dst_pts, matches, kp1, kp2 = self.prev_chunk.detect_and_match_keypoints(current_chunk) # Оцениваем смещение БПЛА if src_pts is not None and dst_pts is not None: @@ -334,7 +234,6 @@ class AutoPilot(Pilot): self.timer.stop() # Выводим текущее состояние БПЛА - drone_state = self.get_drone_state() # print(f" [Pilot] Drone Position: ({drone_state['x']:.2f}, {drone_state['y']:.2f})") # print(f" [Pilot] Angle: {drone_state['angle_degrees']:.1f}°") # print(f" [Pilot] Target Index: {self.target_idx}") @@ -346,14 +245,14 @@ class AutoPilot(Pilot): # Обновляем сопоставление точек self.vis_manager.update_matches( - self.prev_image, - current_image, + self.prev_chunk.to_numpy(), + current_chunk.to_numpy(), kp1, kp2, matches, transformation_info) mask = transformation_info['mask'] self.vis_manager.update_motion_vectors( - current_image, + current_chunk.to_numpy(), kp1, kp2, np.array(matches)[mask.ravel().astype(bool)]) @@ -361,14 +260,14 @@ class AutoPilot(Pilot): # Используем новую визуализацию движения точек по сетке homography_matrix = transformation_info['homography'] self.vis_manager.update_homography_grid( - current_image, + current_chunk.to_numpy(), homography_matrix, grid_step=85) self.timer.start() # Пытаемся найти ориентир на картинке: - self.prev_image = current_image + self.prev_chunk = current_chunk npos = self.get_position_by_chunk() if npos is not None: self.reserved_geo = npos diff --git a/chunk.py b/chunk.py deleted file mode 100644 index e69de29..0000000 diff --git a/main.py b/main.py index 1a4469f..5cc9dee 100644 --- a/main.py +++ b/main.py @@ -1,9 +1,11 @@ -from simulator import Simulator -from visualization import VisualizationManager -from trajectory_drawer import TrajectoryDrawer -from yandex_map import YandexMap -from time import sleep from pathlib import Path +from simulator import Simulator +from time import sleep +from trajectory_drawer import TrajectoryDrawer +from visualization import VisualizationManager +from yandex_map import YandexMap +from vision_chunk import VisionChunk +from utility import cv2_to_pil import random import cv2 @@ -37,16 +39,18 @@ def main(): # Для каждой точки сделаем приближенный снимок yandexMap = YandexMap() - keypoints: list[(any, any)] = [] + chunks: list[VisionChunk] = [] plt.ion() for i in range(len(points)): point = points[i] yandexMap.scroll(point[0], point[1], 5, True) sleep(1) - img = yandexMap.make_screenshot(point[0], point[1], 0.2, 0.2) + cv2_img = yandexMap.make_screenshot(point[0], point[1], 0.2, 0.2) + img = cv2_to_pil(cv2_img) + chunk = VisionChunk(img) Path('chunks').mkdir(exist_ok=True) - cv2.imwrite(Path('.') / 'chunks' / f'chunk_{i}.png', img) + chunk.save_image(Path('.') / 'chunks' / f'chunk_{i}.png') plt.subplot(1, len(points), i+1) plt.imshow(img) plt.pause(0.25) @@ -56,20 +60,9 @@ def main(): # Выделим на каждой картинке ключевые точки for i in range(len(points)): - orb = cv2.ORB_create( - nfeatures=1000, - scaleFactor=1.2, - nlevels=8, - edgeThreshold=31, - firstLevel=0, - WTA_K=2, - patchSize=31, - fastThreshold=20 - ) - img = cv2.imread(Path('chunks') / f'chunk_{i}.png') - kp: list[cv2.KeyPoint] - kp, des = orb.detectAndCompute(img, None) - keypoints.append((kp, des)) + chunk = VisionChunk.load_image(Path('chunks') / f'chunk_{i}.png') + chunks.append(chunk) + kp, des = chunk.compute_keypoints() plt.subplot(1, len(points), i+1) kp_coords = np.array([j.pt for j in kp]) @@ -93,12 +86,12 @@ def main(): (p[0] - points[0][0]) * width, (points[0][1] - p[1]) * height ], points))) points_coords *= 2 ** 4 - pilot = autopilot.AutoPilot(points_coords, keypoints, vis_manager) + pilot = autopilot.AutoPilot(points_coords, chunks, vis_manager) simulator = Simulator(yandexMap) pilot.target_idx = 0 - photo = simulator.get_photo() - command = pilot.handle(photo) + chunk = simulator.get_chunk() + command = pilot.handle(chunk) vis_manager.update_display() vis_manager.pause(1) @@ -117,13 +110,13 @@ def main(): simulator.change_zoom(direction) zoom_next_event = i + random.randint(20, 40) - photo = simulator.get_photo() - command = pilot.handle(photo) + chunk = simulator.get_chunk() + command = pilot.handle(chunk) proc_time = np.append(proc_time, command.proccessing_time) # Save Image - photo.save(Path('images') / f"photo_{i}.png") + chunk.save_image(Path('images') / f"photo_{i}.png") vis_manager.update_display() vis_manager.pause(0.2) @@ -142,7 +135,7 @@ def main(): vis_manager.pause(0.2) last_proc_times = proc_time[-10:] - # print("Average FPS:", 1 / last_proc_times.mean()) + print("Average FPS:", 1 / last_proc_times.mean()) vis_manager.show_final() diff --git a/simulator.py b/simulator.py index 1bea570..5b33741 100644 --- a/simulator.py +++ b/simulator.py @@ -9,6 +9,7 @@ from selenium.webdriver.common.action_chains import ActionChains from autopilot import Pilot from geolocation import Geolocation +from vision_chunk import VisionChunk from visualization import VisualizationManager, SimMode from yandex_map import YandexMap @@ -103,10 +104,10 @@ class Simulator: else: self.geo.z *= 2 - def get_photo(self) -> Image.Image: + def get_chunk(self) -> VisionChunk: png = self.yandexMap.driver.get_screenshot_as_png() im = Image.open(BytesIO(png)) # Применяем поворот как будто съемка с дрона rotated_im = self.rotate_image_like_drone(im, -self.geo.angle) - return rotated_im + return VisionChunk(rotated_im) diff --git a/utility.py b/utility.py new file mode 100644 index 0000000..2af04ee --- /dev/null +++ b/utility.py @@ -0,0 +1,15 @@ +from PIL import Image +import cv2 +import numpy as np + +def cv2_to_pil(cv_image: np.ndarray) -> Image.Image: + """ + cv2.MatLike (BGR/RGB/Grayscale) → PIL.Image + """ + if len(cv_image.shape) == 3 and cv_image.shape[2] == 3: + cv_image = cv2.cvtColor(cv_image, cv2.COLOR_BGR2RGB) + return Image.fromarray(cv_image) + +def image_to_numpy(self, image: Image.Image) -> np.ndarray: + """Конвертирует PIL Image в numpy array для OpenCV""" + return np.array(image) \ No newline at end of file diff --git a/vision_chunk.py b/vision_chunk.py new file mode 100644 index 0000000..a123706 --- /dev/null +++ b/vision_chunk.py @@ -0,0 +1,161 @@ +import cv2 +import json +import numpy as np +from dataclasses import dataclass, field +from pathlib import Path +from typing import Literal, Optional, Tuple +from PIL import Image + +FeatureMethod = Literal["orb", "sift", "surf"] + +@dataclass +class VisionChunk: + image: Image.Image + feature_method: FeatureMethod = "orb" + + keypoints: Optional[list] = field(default=None, init=False) + descriptors: Optional[np.ndarray] = field(default=None, init=False) + _detector: Optional[cv2.Feature2D] = field(default=None, init=False, repr=False) + _matcher: Optional[cv2.DescriptorMatcher] = field(default=None, init=False, repr=False) + + def _get_detector(self) -> cv2.Feature2D: + if self._detector is not None: + return self._detector + + if self.feature_method == "orb": + self._detector = cv2.ORB_create( + nfeatures=1000, + scaleFactor=1.2, + nlevels=8, + edgeThreshold=31, + firstLevel=0, + WTA_K=2, + patchSize=31, + fastThreshold=20, + ) + else: + raise ValueError(f"Unsupported feature method: {self.feature_method}") + return self._detector + + def _get_matcher(self) -> cv2.DescriptorMatcher: + if self._matcher is None: + self._matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False) + return self._matcher + + def _preprocess(self, img_np: np.ndarray) -> np.ndarray: + """CLAHE предобработка для улучшения контраста""" + if len(img_np.shape) == 3: + gray = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY) + else: + gray = img_np + + clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8)) + return clahe.apply(gray) + + def compute_keypoints(self, force: bool = False) -> Tuple[list, Optional[np.ndarray]]: + if self.keypoints is not None and self.descriptors is not None and not force: + return self.keypoints, self.descriptors + + detector = self._get_detector() + + # PIL -> OpenCV (RGB->BGR) + img_np = np.array(self.image) + if img_np.ndim == 3 and img_np.shape[2] == 3: + img_np = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR) + + # CLAHE предобработка + preprocessed = self._preprocess(img_np) + kps, desc = detector.detectAndCompute(preprocessed, None) + + self.keypoints = kps + self.descriptors = desc + return kps, desc + + def detect_and_match_keypoints( + self, + other: "VisionChunk" + ) -> Tuple[ + Optional[np.ndarray], + Optional[np.ndarray], + Optional[list], + Optional[list], + Optional[list] + ]: + """ + Возвращает: src_pts, dst_pts, good_matches, kp1, kp2 (отцентрированные координаты) + """ + # Вычисляем keypoints для обоих + kp1, des1 = self.compute_keypoints() + kp2, des2 = other.compute_keypoints() + + if des1 is None or des2 is None or len(kp1) < 4 or len(kp2) < 4: + return None, None, None, None, None + + # kNN matching + Lowe ratio test + matcher = self._get_matcher() + matches_knn = matcher.knnMatch(des1, des2, k=2) + good_matches: list[cv2.DMatch] = [] + + for m_n in matches_knn: + if len(m_n) < 2: + continue + m, n = m_n + if m.distance < 0.75 * n.distance: + good_matches.append(m) + + # Фильтрация по расстоянию (мягкий порог 64) + good_matches = sorted(good_matches, key=lambda x: x.distance) + good_matches = [m for m in good_matches if m.distance < 64] + + if len(good_matches) < 4: + return None, None, None, None, None + + # Центр изображений + img1_cv = self.to_cv2_gray() + img2_cv = other.to_cv2_gray() + h1, w1 = img1_cv.shape + h2, w2 = img2_cv.shape + cx1, cy1 = w1 // 2, h1 // 2 + cx2, cy2 = w2 // 2, h2 // 2 + + # Отцентрированные координаты (x_rel, y_rel) + src_pts = [] + dst_pts = [] + + for match in good_matches: + pt1 = kp1[match.queryIdx].pt + src_pts.append([pt1[0] - cx1, cy1 - pt1[1]]) # Y вверх! + + pt2 = kp2[match.trainIdx].pt + dst_pts.append([pt2[0] - cx2, cy2 - pt2[1]]) + + src_pts = np.float32(src_pts).reshape(-1, 1, 2) + dst_pts = np.float32(dst_pts).reshape(-1, 1, 2) + + return src_pts, dst_pts, good_matches, kp1, kp2 + + def to_cv2_gray(self) -> np.ndarray: + """PIL -> OpenCV grayscale с предобработкой""" + img_np = np.array(self.image) + if img_np.ndim == 3: + gray = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY) + else: + gray = img_np + return self._preprocess(img_np) + + def get_shape(self) -> Tuple[int, int]: + return self.image.height, self.image.width + + def save_image(self, path: Path | str, format: str = "PNG") -> None: + path = Path(path) + path.parent.mkdir(parents=True, exist_ok=True) + self.image.save(path, format=format.upper()) + + def to_numpy(self) -> np.ndarray: + return np.array(self.image) + + @classmethod + def load_image(cls, path: Path | str, feature_method: FeatureMethod = "orb") -> "VisionChunk": + path = Path(path) + image = Image.open(path) + return cls(image=image, feature_method=feature_method)