diff --git a/autopilot.py b/autopilot.py index 201de7f..277df52 100644 --- a/autopilot.py +++ b/autopilot.py @@ -20,6 +20,9 @@ from utility import estimate_transformation_matrix random.seed(1) +FEATURE_METHODS = ("orb", "akaze", "sift", "brisk") +INTERFRAME_METHODS = FEATURE_METHODS + ("optical_flow",) + class Pilot: def __init__(self): pass def handle(self, image: Image): pass @@ -71,6 +74,8 @@ class AutoPilot(Pilot): pixel_ratio: float = 1., use_sian_similarity: bool = False, use_gan: bool = False, + interframe_method: str = "optical_flow", + landmark_method: str = "orb", ): self.prev_chunk = None self.pos = Position(0, 0, 1, 0, 0, 0) @@ -81,6 +86,12 @@ class AutoPilot(Pilot): self.pixel_ratio = pixel_ratio self.use_sian_similarity = use_sian_similarity self.use_gan = use_gan + if interframe_method not in INTERFRAME_METHODS: + raise ValueError(f"Unsupported interframe method: {interframe_method}") + if landmark_method not in FEATURE_METHODS: + raise ValueError(f"Unsupported landmark method: {landmark_method}") + self.interframe_method = interframe_method + self.landmark_method = landmark_method # Пороговые значения качества сопоставления/гомографии self.min_inliers: int = 12 @@ -146,6 +157,16 @@ class AutoPilot(Pilot): return src_pts, dst_pts + def calculate_interframe_motion(self, prev_chunk: VisionChunk, current_chunk: VisionChunk): + """Вычисляет смещение между соседними кадрами выбранным методом.""" + if self.interframe_method == "optical_flow": + return self.calculate_optical_flow(prev_chunk, current_chunk) + + prev_features = VisionChunk(prev_chunk.image, self.interframe_method) + current_features = VisionChunk(current_chunk.image, self.interframe_method) + src_pts, dst_pts, *_ = prev_features.detect_and_match_keypoints(current_features) + return src_pts, dst_pts + def update_drone_position(self, homography_matrix: np.ndarray): """ Обновляет позицию и угол БПЛА на основе трансформации изображения @@ -169,7 +190,7 @@ class AutoPilot(Pilot): landmark_timer = Timer() landmark_timer.start() - current_chunk = self.prev_chunk + current_chunk = VisionChunk(self.prev_chunk.image, self.landmark_method) if self.prev_chunk is not None else None if current_chunk is None or not self.chunks: return None @@ -315,8 +336,7 @@ class AutoPilot(Pilot): # Вычисляем оптический поток для покадрового сравнения matching_timer = Timer() matching_timer.start() - src_pts, dst_pts = self.calculate_optical_flow(self.prev_chunk, current_chunk) - # src_pts, dst_pts, _, _, _ = self.prev_chunk.detect_and_match_keypoints(current_chunk) + src_pts, dst_pts = self.calculate_interframe_motion(self.prev_chunk, current_chunk) matching_timer.stop() print(f"Matching calculating: {matching_timer.get_elapsed() * 1000:.2f} ms") diff --git a/main.py b/main.py index 8b75727..6fe8d7e 100644 --- a/main.py +++ b/main.py @@ -72,6 +72,9 @@ def move_map_safely(online_map: YandexMap | GoogleMap, dx: float, dy: float, ste online_map.move(dx / steps, dy / steps) sleep(0.05) +def normalize_interframe_method(method: str) -> str: + return "optical_flow" if method == "optical-flow" else method + def build(name: str, map_name: str, lat: float, lon: float): # Создание папки с информацией о маршруте @@ -167,6 +170,14 @@ def build(name: str, map_name: str, lat: float, lon: float): 'flight_zoom': flight_zoom, 'map_pixel_ratio': map_pixel_ratio, 'flight_pixel_ratio': online_map.pixel_ratio, + 'build_params': { + 'name': name, + 'reference': map_name, + 'lat': lat, + 'lon': lon, + 'map_zoom': map_zoom, + 'flight_zoom': flight_zoom, + }, 'map_extent': make_map_extent(width, height, points[0], map_pixel_ratio), 'route_length': calc_polyline_length(points_coords), 'route_length_meters': calc_polyline_length(points_coords), @@ -193,6 +204,9 @@ def run( ref_min_distance: float, use_sian_similarity: bool = False, use_gan: bool = False, + interframe_method: str = "optical_flow", + landmark_method: str = "orb", + run_params: dict | None = None, ): dir = Path('trajectories') assert dir.exists() @@ -200,6 +214,7 @@ def run( assert dir.exists(), "Укажите корректное название маршрута" dir_chunks = dir / 'chunks' file_positions = dir / 'positions.pkl' + interframe_method = normalize_interframe_method(interframe_method) with file_positions.open('rb') as file: data = pickle.load(file) @@ -218,7 +233,7 @@ def run( if not chunk_path.exists(): continue if len(chunks) == 0 or np.hypot(chunks[-1].pos.x - pos.x, chunks[-1].pos.y - pos.y) > ref_min_distance: - chunk = VisionChunk.load_image(chunk_path) + chunk = VisionChunk.load_image(chunk_path, landmark_method) chunk.pos = data['chunk_positions'][i] / online_map.pixel_ratio chunks.append(chunk) @@ -266,6 +281,8 @@ def run( online_map.pixel_ratio, use_sian_similarity=use_sian_similarity, use_gan=use_gan, + interframe_method=interframe_method, + landmark_method=landmark_method, ) simulator = Simulator(online_map) pilot.target_idx = 0 @@ -368,6 +385,16 @@ def run( test_run_dir = make_test_run_dir(name) vis_manager.save_plots(test_run_dir) metrics = { + 'run_params': run_params or { + 'name': name, + 'simulation': map_name, + 'ref_min_distance': ref_min_distance, + 'use_sian_similarity': use_sian_similarity, + 'use_gan': use_gan, + 'interframe_method': interframe_method, + 'landmark_method': landmark_method, + }, + 'build_params': data.get('build_params'), 'position_errors': errors, 'mse': float((np.array(errors) ** 2).mean()) if errors else None, 'rmse': float((np.array(errors) ** 2).mean() ** 0.5) if errors else None, @@ -487,6 +514,24 @@ def parse_args(): help='Преобразовывать эталонный vision_chunk через GAN перед поиском ключевых точек' ) + parser.add_argument( + '--interframe-method', + type=str, + default='optical-flow', + choices=['orb', 'akaze', 'sift', 'brisk', 'optical-flow', 'optical_flow'], + help='Метод межкадрового сравнения: orb, akaze, sift, brisk или optical-flow (по умолчанию optical-flow)' + ) + + parser.add_argument( + '--landmark-method', + '--reference-method', + dest='landmark_method', + type=str, + default='orb', + choices=['orb', 'akaze', 'sift', 'brisk'], + help='Метод сравнения с эталонами: orb, akaze, sift или brisk (по умолчанию orb)' + ) + # Парсим аргументы args = parser.parse_args() @@ -506,6 +551,8 @@ if __name__ == "__main__": lat: float = args.lat lon: float = args.lon rmd: float = args.ref_min_distance + interframe_method = normalize_interframe_method(args.interframe_method) + landmark_method = args.landmark_method constants.DEBUG_FPS = args.debug_fps constants.DEBUG_LANDMARK = args.debug_landmark @@ -514,4 +561,27 @@ if __name__ == "__main__": build(name, ref, lat, lon) if mode == 'run' or mode == 'standalone': - run(name, sim, rmd, args.use_sian_similarity, args.use_gan) + run( + name, + sim, + rmd, + args.use_sian_similarity, + args.use_gan, + interframe_method, + landmark_method, + { + 'mode': mode, + 'name': name, + 'lat': lat, + 'lon': lon, + 'reference': ref, + 'simulation': sim, + 'ref_min_distance': rmd, + 'debug_fps': args.debug_fps, + 'debug_landmark': args.debug_landmark, + 'use_sian_similarity': args.use_sian_similarity, + 'use_gan': args.use_gan, + 'interframe_method': interframe_method, + 'landmark_method': landmark_method, + }, + )