from google_map import GoogleMap from datetime import datetime from pathlib import Path from position import Position from simulator import Simulator from time import sleep from trajectory_drawer import TrajectoryDrawer from utility import cv2_to_pil from vision_chunk import VisionChunk from visualization import VisualizationManager from yandex_map import YandexMap import argparse import autopilot import constants import cv2 import matplotlib.pyplot as plt import numpy as np import pickle import random import shutil import utility def get_map(map_name: str = 'google', lat=49.103814, lon=55.794258, zoom=18): if map_name == 'google': return GoogleMap(lat, lon, zoom) if map_name == 'yandex': return YandexMap(lat, lon, zoom) return None def get_pixel_ratio(map_name: str, zoom: int) -> float: if map_name == 'google': return constants.GOOGLE_PIXEL_RATIO[zoom] if map_name == 'yandex': return constants.YANDEX_PIXEL_RATIO[zoom] raise ValueError(f"Unknown map provider: {map_name}") def make_global_photo(filename, map_name: str = 'google', lat=49.103814, lon=55.794258, zoom=13): online_map: YandexMap | GoogleMap = get_map(map_name, lat, lon, zoom) online_map.save_photo(filename) online_map.destroy() def get_trajectory_points(bg_img: str) -> list[(float, float)]: trajectoryDrawer = TrajectoryDrawer(bg_img) trajectoryDrawer.show() trajectoryDrawer.wait() points = list(map(lambda p: [p[0] / trajectoryDrawer.img.shape[1], p[1] / trajectoryDrawer.img.shape[0]], trajectoryDrawer.points)) return points def make_map_extent(width: float, height: float, origin_point: list[float], pixel_ratio: float) -> tuple[float, float, float, float]: """Возвращает границы снимка карты в координатах маршрута.""" origin_x = origin_point[0] * width origin_y = origin_point[1] * height left = (0 - origin_x) * pixel_ratio right = (width - origin_x) * pixel_ratio bottom = (origin_y - height) * pixel_ratio top = origin_y * pixel_ratio return (left, right, bottom, top) def make_test_run_dir(name: str) -> Path: run_dir = Path('test_runs') / f"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}_{name}" run_dir.mkdir(parents=True, exist_ok=True) return run_dir def calc_polyline_length(points) -> float: points = np.array(points) if len(points) < 2: return 0.0 segments = points[1:] - points[:-1] return float(np.hypot(segments[:, 0], segments[:, 1]).sum()) def move_map_safely(online_map: YandexMap | GoogleMap, dx: float, dy: float, step: int = 300): """Двигает карту короткими drag-шагами, чтобы Selenium не выходил за viewport.""" steps = max(1, int(np.ceil(max(abs(dx), abs(dy)) / step))) for _ in range(steps): online_map.move(dx / steps, dy / steps) sleep(0.05) def normalize_interframe_method(method: str) -> str: return "optical_flow" if method == "optical-flow" else method def get_trajectory_dir(name: str) -> Path: dir = Path('trajectories') if not dir.exists(): dir.mkdir() return dir / name def markup(name: str, map_name: str, lat: float, lon: float): dir = get_trajectory_dir(name) assert not dir.exists(), "Маршрут уже существует" dir.mkdir() map_zoom = 15 flight_zoom = 18 map_pixel_ratio = get_pixel_ratio(map_name, map_zoom) map_path = dir / 'map.jpg' online_map: YandexMap | GoogleMap = get_map(map_name, lat, lon, map_zoom) online_map.save_photo(str(map_path)) width, height = online_map.get_size() online_map.destroy() points = get_trajectory_points(str(map_path)) points_coords = np.array(list(map(lambda p: [ (p[0] - points[0][0]) * width, (points[0][1] - p[1]) * height ], points))) points_coords_pixels = points_coords.copy() points_coords *= map_pixel_ratio data = { 'points': points_coords, 'drawn_points': points, 'map_image': 'map.jpg', 'map_size': (width, height), 'map_zero_point': points[0], 'map_provider': map_name, 'map_zoom': map_zoom, 'flight_zoom': flight_zoom, 'map_pixel_ratio': map_pixel_ratio, 'build_params': { 'name': name, 'reference': map_name, 'lat': lat, 'lon': lon, 'map_zoom': map_zoom, 'flight_zoom': flight_zoom, }, 'map_extent': make_map_extent(width, height, points[0], map_pixel_ratio), 'route_length': calc_polyline_length(points_coords), 'route_length_meters': calc_polyline_length(points_coords), 'route_length_pixels': calc_polyline_length(points_coords_pixels), } with (dir / 'positions.pkl').open('wb') as file: pickle.dump(data, file) print(points_coords) print("WRITE POINTS:", points) def build(name: str, map_name: str, lat: float, lon: float): dir = get_trajectory_dir(name) if not dir.exists(): markup(name, map_name, lat, lon) dir_chunks = dir / 'chunks' dir_chunks.mkdir(exist_ok=True) file_positions = dir / 'positions.pkl' with file_positions.open('rb') as file: data = pickle.load(file) build_params = data.get('build_params', {}) map_name = data.get('map_provider', build_params.get('reference', map_name)) lat = build_params.get('lat', lat) lon = build_params.get('lon', lon) points_coords = data['points'] points = data.get('drawn_points') if points is None: points = [data['map_zero_point']] map_zoom = data.get('map_zoom', 15) flight_zoom = data.get('flight_zoom', 18) online_map: YandexMap | GoogleMap = get_map(map_name, lat, lon, map_zoom) # Начнём симуляцию полёта с первой точки online_map.make_as_center(*points[0]) sleep(3) online_map.scroll(0.5, 0.5, 10, True) sleep(2) geo = online_map.get_geolocation() online_map.open(geo['lat'], geo['lon'], flight_zoom) sleep(5) points_coords_pixel = points_coords.copy() / online_map.pixel_ratio pilot = autopilot.AutoPilot(points_coords_pixel, pixel_ratio=online_map.pixel_ratio) simulator = Simulator(online_map) pilot.target_idx = 0 pilot.pos = simulator.pos.copy() command = pilot.make_command() positions: list[Position] = [] print("points_coords_pixel:", points_coords_pixel) for i in range(5000): if command.stop: final_pos = Position(points_coords[-1][0], points_coords[-1][1]) if not positions or np.hypot(positions[-1].x - final_pos.x, positions[-1].y - final_pos.y) > 1: positions.append(final_pos) break chunk = simulator.get_chunk() pilot.pos = simulator.pos.copy() command = pilot.make_command() command.velocity /= online_map.pixel_ratio if command.stop: final_pos = Position(points_coords[-1][0], points_coords[-1][1]) if not positions or np.hypot(positions[-1].x - final_pos.x, positions[-1].y - final_pos.y) > 1: positions.append(final_pos) break print("Position:", simulator.pos) # Save Image chunk.save_image(dir_chunks / f"chunk_{len(positions)}.png") positions.append(simulator.pos.copy() * online_map.pixel_ratio) simulator.handle(command.dangle, command.velocity) if i == 0 and map_name == 'google': simulator.pos.x = 0 simulator.pos.y = 0 sleep(1.5) data.update({ 'points': points_coords, 'chunk_positions': positions, 'initial_geolocation': geo, 'map_provider': map_name, 'map_zoom': map_zoom, 'flight_zoom': flight_zoom, 'flight_pixel_ratio': online_map.pixel_ratio, 'build_params': { 'name': name, 'reference': map_name, 'lat': lat, 'lon': lon, 'map_zoom': map_zoom, 'flight_zoom': flight_zoom, }, 'build_flight_length': calc_polyline_length([[pos.x, pos.y] for pos in positions]), 'build_flight_length_meters': calc_polyline_length([[pos.x, pos.y] for pos in positions]), 'build_flight_length_pixels': calc_polyline_length([[pos.x / online_map.pixel_ratio, pos.y / online_map.pixel_ratio] for pos in positions]), }) print(points_coords) with file_positions.open('wb') as file: pickle.dump(data, file) print("WRITE POINTS:", points) sleep(15) online_map.destroy() def run( name: str, map_name: str, ref_min_distance: float, use_sian_similarity: bool = False, use_gan: bool = False, interframe_method: str = "optical_flow", landmark_method: str = "orb", use_landmarks: bool = True, run_params: dict | None = None, ): dir = Path('trajectories') assert dir.exists() dir /= name assert dir.exists(), "Укажите корректное название маршрута" dir_chunks = dir / 'chunks' file_positions = dir / 'positions.pkl' interframe_method = normalize_interframe_method(interframe_method) with file_positions.open('rb') as file: data = pickle.load(file) assert 'initial_geolocation' in data, "Маршрут размечен, но не наполнен chunks. Сначала запустите --mode build." initial_geolocation = data['initial_geolocation'] lat = initial_geolocation['lat'] lon = initial_geolocation['lon'] online_map: YandexMap | GoogleMap = get_map(map_name, lat, lon, 18) sleep(2) chunks: list[VisionChunk] = [] if use_landmarks: for i in range(len(data['chunk_positions'])): pos = data['chunk_positions'][i] chunk_path = dir_chunks / f"chunk_{i}.png" if not chunk_path.exists(): continue if len(chunks) == 0 or np.hypot(chunks[-1].pos.x - pos.x, chunks[-1].pos.y - pos.y) > ref_min_distance: chunk = VisionChunk.load_image(chunk_path, landmark_method) chunk.pos = data['chunk_positions'][i] / online_map.pixel_ratio chunks.append(chunk) r = 0 for i in range(len(data['points']) - 1): r += np.hypot( data['points'][i][0] - data['points'][i+1][0], data['points'][i][1] - data['points'][i+1][1] ) print("R: ", r) forward_points = data['points'] planned_route_length_meters = float(data.get('route_length_meters', data.get('route_length', calc_polyline_length(forward_points)))) planned_route_length_pixels = float(data.get('route_length_pixels', planned_route_length_meters / data.get('map_pixel_ratio', online_map.pixel_ratio))) build_flight_length_meters = float(data.get('build_flight_length_meters', data.get('build_flight_length', calc_polyline_length([[pos.x, pos.y] for pos in data.get('chunk_positions', [])])))) build_flight_length_pixels = float(data.get('build_flight_length_pixels', build_flight_length_meters / online_map.pixel_ratio)) return_points = forward_points[::-1].copy() points = return_points / online_map.pixel_ratio print("READ RTH POINTS:", points) print("Saved planned route length:", planned_route_length_meters, "m /", planned_route_length_pixels, "px") print("Saved build flight length:", build_flight_length_meters, "m /", build_flight_length_pixels, "px") vis_manager = VisualizationManager() map_path = dir / data.get('map_image', 'map.jpg') if map_path.exists() and 'map_extent' in data: turn_point = data['chunk_positions'][-1] if data.get('chunk_positions') else forward_points[-1] build_trajectory = np.array([[pos.x, pos.y] for pos in data.get('chunk_positions', [])]) if len(build_trajectory) == 0: build_trajectory = forward_points map_extent = data['map_extent'] if 'map_pixel_ratio' in data and 'map_size' in data and 'map_zero_point' in data: map_extent = make_map_extent(data['map_size'][0], data['map_size'][1], data['map_zero_point'], data['map_pixel_ratio']) vis_manager.set_route_map( map_path, map_extent, build_trajectory, turn_point, forward_points[0], ) pilot = autopilot.AutoPilot( points, chunks, vis_manager, online_map.pixel_ratio, use_sian_similarity=use_sian_similarity, use_gan=use_gan, interframe_method=interframe_method, landmark_method=landmark_method, use_landmarks=use_landmarks, ) simulator = Simulator(online_map) pilot.target_idx = 0 turn_position = data['chunk_positions'][-1].copy() if data.get('chunk_positions') else Position(forward_points[-1][0], forward_points[-1][1]) start_position = turn_position / online_map.pixel_ratio start_position.yaw += np.pi simulator.pos = start_position.copy() pilot.pos = start_position.copy() if len(points) > 1: distance_to_first_return_point = np.hypot(points[0][0] - pilot.pos.x, points[0][1] - pilot.pos.y) * online_map.pixel_ratio if distance_to_first_return_point < pilot.direct_step_threshold: pilot.target_idx = 1 move_map_safely(online_map, start_position.x, start_position.y) vis_manager.update_rth_trajectory(turn_position.x, turn_position.y) chunk = simulator.get_chunk() command = pilot.handle(chunk) vis_manager.update_display() vis_manager.pause(1) vis_manager.set_target_points(return_points) proc_time = np.array([]) errors = [] start_point = forward_points[0] final_return_error = None return_route_length_meters = 0.0 prev_return_point = np.array([turn_position.x, turn_position.y], dtype=float) odometry_warmup_frames = 2 sleep(1) for i in range(10000000000): if i > 0: simulator.set_pitch(np.sin(i / 10) * 5) simulator.set_roll(np.sin(i / 15) * 5) simulator.set_zoom(1.0 + np.sin(i / 10) * 0.3) if command.stop: break chunk = simulator.get_chunk() command = pilot.handle(chunk) if i < odometry_warmup_frames: pilot.pos = simulator.pos.copy() command = pilot.make_command() command.velocity /= online_map.pixel_ratio proc_time = np.append(proc_time, command.proccessing_time) # Save Image chunk.save_image(Path('images') / f"photo_{i}.png") vis_manager.update_display() vis_manager.pause(0.2) vis_manager.set_target_index(pilot.target_idx) current_return_point = np.array([simulator.pos.x * online_map.pixel_ratio, simulator.pos.y * online_map.pixel_ratio], dtype=float) return_route_length_meters += float(np.hypot(*(current_return_point - prev_return_point))) prev_return_point = current_return_point vis_manager.update_drone_trajectory(pilot.pos.x * online_map.pixel_ratio, pilot.pos.y * online_map.pixel_ratio) vis_manager.update_global_map(current_return_point[0], current_return_point[1]) vis_manager.update_rth_trajectory(current_return_point[0], current_return_point[1]) if i >= odometry_warmup_frames: vis_manager.update_error_plot(i, pilot.pos.x * online_map.pixel_ratio, pilot.pos.y * online_map.pixel_ratio, current_return_point[0], current_return_point[1]) errors.append(np.hypot((pilot.pos.x - simulator.pos.x) * online_map.pixel_ratio, (pilot.pos.y - simulator.pos.y) * online_map.pixel_ratio)) vis_manager.update_display() vis_manager.pause(0.2) last_proc_times = proc_time[-30:] print(F"\nImage #{i}") print("Average FPS:", 1 / last_proc_times.mean()) print("Pilot coords:", pilot.pos) print("Simulator coords:", simulator.pos) if command.stop: break sleep(0.5) simulator.handle(command.dangle, command.velocity) final_point = np.array([simulator.pos.x * online_map.pixel_ratio, simulator.pos.y * online_map.pixel_ratio]) final_return_error = float(np.hypot(final_point[0] - start_point[0], final_point[1] - start_point[1])) return_route_length_pixels = return_route_length_meters / online_map.pixel_ratio full_route_length_meters = build_flight_length_meters + return_route_length_meters full_route_length_pixels = build_flight_length_pixels + return_route_length_pixels final_return_error_pixels = final_return_error / online_map.pixel_ratio vis_manager.set_final_point(final_point[0], final_point[1], final_return_error) print("Errors:", errors) print("MSE:", (np.array(errors) ** 2).mean()) print("RMSE:", (np.array(errors) ** 2).mean() ** 0.5) print("Return route length:", return_route_length_meters, "m /", return_route_length_pixels, "px") print("Full route length:", full_route_length_meters, "m /", full_route_length_pixels, "px") print("Final-to-start error:", final_return_error, "m /", final_return_error_pixels, "px") print("Average FPS:", 1 / proc_time.mean()) test_run_dir = make_test_run_dir(name) vis_manager.save_plots(test_run_dir) metrics = { 'run_params': run_params or { 'name': name, 'simulation': map_name, 'ref_min_distance': ref_min_distance, 'use_sian_similarity': use_sian_similarity, 'use_gan': use_gan, 'use_landmarks': use_landmarks, 'interframe_method': interframe_method, 'landmark_method': landmark_method, }, 'build_params': data.get('build_params'), 'position_errors': errors, 'mse': float((np.array(errors) ** 2).mean()) if errors else None, 'rmse': float((np.array(errors) ** 2).mean() ** 0.5) if errors else None, 'planned_route_length': planned_route_length_meters, 'build_flight_length': build_flight_length_meters, 'return_route_length': return_route_length_meters, 'full_route_length': full_route_length_meters, 'final_to_start_error': final_return_error, 'planned_route_length_meters': planned_route_length_meters, 'planned_route_length_pixels': planned_route_length_pixels, 'build_flight_length_meters': build_flight_length_meters, 'build_flight_length_pixels': build_flight_length_pixels, 'return_route_length_meters': return_route_length_meters, 'return_route_length_pixels': return_route_length_pixels, 'full_route_length_meters': full_route_length_meters, 'full_route_length_pixels': full_route_length_pixels, 'final_to_start_error_meters': final_return_error, 'final_to_start_error_pixels': final_return_error_pixels, 'map_pixel_ratio': data.get('map_pixel_ratio'), 'flight_pixel_ratio': online_map.pixel_ratio, 'start_point': start_point, 'turn_point': np.array([turn_position.x, turn_position.y]), 'final_point': final_point, } with (test_run_dir / 'metrics.pkl').open('wb') as file: pickle.dump(metrics, file) with (test_run_dir / 'metrics.txt').open('w', encoding='utf-8') as file: for key, value in metrics.items(): file.write(f"{key}: {value}\n") vis_manager.show_final() def parse_args(): """Парсер аргументов командной строки""" parser = argparse.ArgumentParser(description='Обработка траекторий') # Добавляем обязательный аргумент --mode parser.add_argument( '--mode', type=str, required=True, choices=['standalone', 'markup', 'build', 'run'], help='Режим работы: standalone, markup, build или run' ) # Добавляем опциональный аргумент --name parser.add_argument( '--name', type=str, default=utility.generate_folder_name(), help='Название траектории' ) # Координаты parser.add_argument( '--lat', type=float, default=49.103814, help='Широта (по умолчанию 49.103814)' ) parser.add_argument( '--lon', type=float, default=55.794258, help='Долгота (по умолчанию 55.794258)' ) # Источник эталонных изображений (ориентиров) parser.add_argument( '--reference', type=str, default='google', choices=['google', 'yandex'], help='Откуда берутся эталонные изображения (ориентиры): google или yandex (по умолчанию google)' ) # Место проведения симуляции parser.add_argument( '--simulation', type=str, default='yandex', choices=['google', 'yandex'], help='Где проводится симуляция: google или yandex (по умолчанию yandex)' ) # Место проведения симуляции parser.add_argument( '--ref-min-distance', type=float, default=100, help='Минимальное расстояние между эталонами' ) # Место проведения симуляции parser.add_argument( '--debug-fps', action='store_true', help='Включить отладку FPS' ) # Место проведения симуляции parser.add_argument( '--debug-landmark', action='store_true', help='Включить отладку эталонов' ) parser.add_argument( '--use-sian-similarity', action='store_true', help='Выбирать ориентир через SiaN similarity вместо ближайшего по текущей позиции' ) parser.add_argument( '--use-gan', action='store_true', help='Преобразовывать эталонный vision_chunk через GAN перед поиском ключевых точек' ) parser.add_argument( '--use-landmarks', dest='use_landmarks', action='store_true', default=True, help='Использовать эталоны для коррекции позиции (по умолчанию включено)' ) parser.add_argument( '--no-landmarks', dest='use_landmarks', action='store_false', help='Не использовать эталоны, лететь только по межкадровой одометрии' ) parser.add_argument( '--interframe-method', type=str, default='optical-flow', choices=['orb', 'akaze', 'sift', 'brisk', 'optical-flow', 'optical_flow'], help='Метод межкадрового сравнения: orb, akaze, sift, brisk или optical-flow (по умолчанию optical-flow)' ) parser.add_argument( '--landmark-method', '--reference-method', dest='landmark_method', type=str, default='orb', choices=['orb', 'akaze', 'sift', 'brisk'], help='Метод сравнения с эталонами: orb, akaze, sift или brisk (по умолчанию orb)' ) # Парсим аргументы args = parser.parse_args() # Проверяем, что для markup, build и run указан --name if args.mode in ['markup', 'build', 'run'] and not args.name: parser.error(f"--name обязателен для режима {args.mode}") return args if __name__ == "__main__": args = parse_args() name = args.name mode = args.mode sim: str = args.simulation ref: str = args.reference lat: float = args.lat lon: float = args.lon rmd: float = args.ref_min_distance interframe_method = normalize_interframe_method(args.interframe_method) landmark_method = args.landmark_method constants.DEBUG_FPS = args.debug_fps constants.DEBUG_LANDMARK = args.debug_landmark if mode == 'markup': markup(name, ref, lat, lon) if mode == 'build' or mode == 'standalone': build(name, ref, lat, lon) if mode == 'run' or mode == 'standalone': run( name, sim, rmd, args.use_sian_similarity, args.use_gan, interframe_method, landmark_method, args.use_landmarks, { 'mode': mode, 'name': name, 'lat': lat, 'lon': lon, 'reference': ref, 'simulation': sim, 'ref_min_distance': rmd, 'debug_fps': args.debug_fps, 'debug_landmark': args.debug_landmark, 'use_sian_similarity': args.use_sian_similarity, 'use_gan': args.use_gan, 'use_landmarks': args.use_landmarks, 'interframe_method': interframe_method, 'landmark_method': landmark_method, }, )