from google_map import GoogleMap from datetime import datetime from pathlib import Path from position import Position from simulator import Simulator from time import sleep from trajectory_drawer import TrajectoryDrawer from utility import cv2_to_pil from vision_chunk import VisionChunk from visualization import VisualizationManager from yandex_map import YandexMap from PIL import Image, ImageDraw import argparse import autopilot import constants import cv2 import matplotlib.pyplot as plt import numpy as np import pickle import random import shutil import utility def get_map(map_name: str = 'google', lat=49.103814, lon=55.794258, zoom=18): if map_name == 'google': return GoogleMap(lat, lon, zoom) if map_name == 'yandex': return YandexMap(lat, lon, zoom) return None def get_pixel_ratio(map_name: str, zoom: int) -> float: if map_name == 'google': return constants.GOOGLE_PIXEL_RATIO[zoom] if map_name == 'yandex': return constants.YANDEX_PIXEL_RATIO[zoom] raise ValueError(f"Unknown map provider: {map_name}") def make_global_photo(filename, map_name: str = 'google', lat=49.103814, lon=55.794258, zoom=13): online_map: YandexMap | GoogleMap = get_map(map_name, lat, lon, zoom) online_map.save_photo(filename) online_map.destroy() def get_trajectory_points(bg_img: str) -> list[(float, float)]: trajectoryDrawer = TrajectoryDrawer(bg_img) trajectoryDrawer.show() trajectoryDrawer.wait() points = list(map(lambda p: [p[0] / trajectoryDrawer.img.shape[1], p[1] / trajectoryDrawer.img.shape[0]], trajectoryDrawer.points)) return points def make_map_extent(width: float, height: float, origin_point: list[float], pixel_ratio: float) -> tuple[float, float, float, float]: """Возвращает границы снимка карты в координатах маршрута.""" origin_x = origin_point[0] * width origin_y = origin_point[1] * height left = (0 - origin_x) * pixel_ratio right = (width - origin_x) * pixel_ratio bottom = (origin_y - height) * pixel_ratio top = origin_y * pixel_ratio return (left, right, bottom, top) def make_test_run_dir(name: str) -> Path: run_dir = Path('test_runs') / f"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}_{name}" run_dir.mkdir(parents=True, exist_ok=True) return run_dir def calc_polyline_length(points) -> float: points = np.array(points) if len(points) < 2: return 0.0 segments = points[1:] - points[:-1] return float(np.hypot(segments[:, 0], segments[:, 1]).sum()) def move_map_safely(online_map: YandexMap | GoogleMap, dx: float, dy: float, step: int = 300): """Двигает карту короткими drag-шагами, чтобы Selenium не выходил за viewport.""" steps = max(1, int(np.ceil(max(abs(dx), abs(dy)) / step))) for _ in range(steps): online_map.move(dx / steps, dy / steps) sleep(0.05) def normalize_interframe_method(method: str) -> str: return "optical_flow" if method == "optical-flow" else method def get_trajectory_dir(name: str) -> Path: dir = Path('trajectories') if not dir.exists(): dir.mkdir() return dir / name def save_marked_trajectory_map(map_path: Path, points: list, output_path: Path): image = Image.open(map_path).convert('RGB') draw = ImageDraw.Draw(image) width, height = image.size pixel_points = [(float(p[0]) * width, float(p[1]) * height) for p in points] if len(pixel_points) > 1: draw.line(pixel_points, fill=(255, 0, 0), width=5) radius = 8 for i, (x, y) in enumerate(pixel_points): color = (0, 180, 0) if i == 0 else (0, 90, 255) if i == len(pixel_points) - 1: color = (255, 0, 0) draw.ellipse((x - radius, y - radius, x + radius, y + radius), fill=color, outline=(255, 255, 255), width=2) image.save(output_path) def markup(name: str, map_name: str, lat: float, lon: float): dir = get_trajectory_dir(name) assert not dir.exists(), "Маршрут уже существует" dir.mkdir() map_zoom = 15 flight_zoom = 18 map_pixel_ratio = get_pixel_ratio(map_name, map_zoom) map_path = dir / 'map.jpg' online_map: YandexMap | GoogleMap = get_map(map_name, lat, lon, map_zoom) online_map.save_photo(str(map_path)) width, height = online_map.get_size() online_map.destroy() points = get_trajectory_points(str(map_path)) save_marked_trajectory_map(map_path, points, dir / 'trajectory_map.jpg') points_coords = np.array(list(map(lambda p: [ (p[0] - points[0][0]) * width, (points[0][1] - p[1]) * height ], points))) points_coords_pixels = points_coords.copy() points_coords *= map_pixel_ratio data = { 'points': points_coords, 'drawn_points': points, 'map_image': 'map.jpg', 'trajectory_map_image': 'trajectory_map.jpg', 'map_size': (width, height), 'map_zero_point': points[0], 'map_provider': map_name, 'map_zoom': map_zoom, 'flight_zoom': flight_zoom, 'map_pixel_ratio': map_pixel_ratio, 'build_params': { 'name': name, 'reference': map_name, 'lat': lat, 'lon': lon, 'map_zoom': map_zoom, 'flight_zoom': flight_zoom, }, 'map_extent': make_map_extent(width, height, points[0], map_pixel_ratio), 'route_length': calc_polyline_length(points_coords), 'route_length_meters': calc_polyline_length(points_coords), 'route_length_pixels': calc_polyline_length(points_coords_pixels), } with (dir / 'positions.pkl').open('wb') as file: pickle.dump(data, file) print(points_coords) print("WRITE POINTS:", points) def build(name: str, map_name: str, lat: float, lon: float): dir = get_trajectory_dir(name) if not dir.exists(): markup(name, map_name, lat, lon) dir_chunks = dir / 'chunks' dir_chunks.mkdir(exist_ok=True) file_positions = dir / 'positions.pkl' with file_positions.open('rb') as file: data = pickle.load(file) build_params = data.get('build_params', {}) map_name = data.get('map_provider', build_params.get('reference', map_name)) lat = build_params.get('lat', lat) lon = build_params.get('lon', lon) points_coords = data['points'] points = data.get('drawn_points') if points is None: points = [data['map_zero_point']] map_zoom = data.get('map_zoom', 15) flight_zoom = data.get('flight_zoom', 18) online_map: YandexMap | GoogleMap = get_map(map_name, lat, lon, map_zoom) # Начнём симуляцию полёта с первой точки online_map.make_as_center(*points[0]) sleep(3) online_map.scroll(0.5, 0.5, 10, True) sleep(2) geo = online_map.get_geolocation() online_map.open(geo['lat'], geo['lon'], flight_zoom) sleep(5) points_coords_pixel = points_coords.copy() / online_map.pixel_ratio pilot = autopilot.AutoPilot(points_coords_pixel, pixel_ratio=online_map.pixel_ratio) simulator = Simulator(online_map) pilot.target_idx = 0 pilot.pos = simulator.pos.copy() command = pilot.make_command() positions: list[Position] = [] final_build_yaw = simulator.pos.yaw print("points_coords_pixel:", points_coords_pixel) for i in range(5000): if command.stop: final_build_yaw = simulator.pos.yaw final_pos = simulator.pos.copy() * online_map.pixel_ratio final_pos.x = points_coords[-1][0] final_pos.y = points_coords[-1][1] final_pos.z = 1 final_pos.pitch = 0 final_pos.roll = 0 if not positions or np.hypot(positions[-1].x - final_pos.x, positions[-1].y - final_pos.y) > 1: positions.append(final_pos) break chunk = simulator.get_chunk() pilot.pos = simulator.pos.copy() command = pilot.make_command() command.velocity /= online_map.pixel_ratio if command.stop: final_build_yaw = simulator.pos.yaw final_pos = simulator.pos.copy() * online_map.pixel_ratio final_pos.x = points_coords[-1][0] final_pos.y = points_coords[-1][1] final_pos.z = 1 final_pos.pitch = 0 final_pos.roll = 0 if not positions or np.hypot(positions[-1].x - final_pos.x, positions[-1].y - final_pos.y) > 1: positions.append(final_pos) break print("Position:", simulator.pos) # Save Image chunk.save_image(dir_chunks / f"chunk_{len(positions)}.png") positions.append(simulator.pos.copy() * online_map.pixel_ratio) simulator.handle(command.dangle, command.velocity) if i == 0 and map_name == 'google': simulator.pos.x = 0 simulator.pos.y = 0 sleep(1.5) data.update({ 'points': points_coords, 'chunk_positions': positions, 'final_build_position': positions[-1].copy() if positions else None, 'final_build_yaw': final_build_yaw, 'initial_geolocation': geo, 'map_provider': map_name, 'map_zoom': map_zoom, 'flight_zoom': flight_zoom, 'flight_pixel_ratio': online_map.pixel_ratio, 'build_params': { 'name': name, 'reference': map_name, 'lat': lat, 'lon': lon, 'map_zoom': map_zoom, 'flight_zoom': flight_zoom, }, 'build_flight_length': calc_polyline_length([[pos.x, pos.y] for pos in positions]), 'build_flight_length_meters': calc_polyline_length([[pos.x, pos.y] for pos in positions]), 'build_flight_length_pixels': calc_polyline_length([[pos.x / online_map.pixel_ratio, pos.y / online_map.pixel_ratio] for pos in positions]), }) print(points_coords) with file_positions.open('wb') as file: pickle.dump(data, file) print("WRITE POINTS:", points) sleep(15) online_map.destroy() def run( name: str, map_name: str, ref_min_distance: float, use_sian_similarity: bool = False, use_gan: bool = False, interframe_method: str = "optical_flow", landmark_method: str = "orb", use_landmarks: bool = True, run_params: dict | None = None, ): dir = Path('trajectories') assert dir.exists() dir /= name assert dir.exists(), "Укажите корректное название маршрута" dir_chunks = dir / 'chunks' file_positions = dir / 'positions.pkl' interframe_method = normalize_interframe_method(interframe_method) with file_positions.open('rb') as file: data = pickle.load(file) assert 'initial_geolocation' in data, "Маршрут размечен, но не наполнен chunks. Сначала запустите --mode build." initial_geolocation = data['initial_geolocation'] lat = initial_geolocation['lat'] lon = initial_geolocation['lon'] online_map: YandexMap | GoogleMap = get_map(map_name, lat, lon, 18) sleep(2) chunks: list[VisionChunk] = [] if use_landmarks: for i in range(len(data['chunk_positions'])): pos = data['chunk_positions'][i] chunk_path = dir_chunks / f"chunk_{i}.png" if not chunk_path.exists(): continue if len(chunks) == 0 or np.hypot(chunks[-1].pos.x - pos.x, chunks[-1].pos.y - pos.y) > ref_min_distance: chunk = VisionChunk.load_image(chunk_path, landmark_method) chunk.pos = data['chunk_positions'][i] / online_map.pixel_ratio chunks.append(chunk) r = 0 for i in range(len(data['points']) - 1): r += np.hypot( data['points'][i][0] - data['points'][i+1][0], data['points'][i][1] - data['points'][i+1][1] ) print("R: ", r) forward_points = data['points'] planned_route_length_meters = float(data.get('route_length_meters', data.get('route_length', calc_polyline_length(forward_points)))) planned_route_length_pixels = float(data.get('route_length_pixels', planned_route_length_meters / data.get('map_pixel_ratio', online_map.pixel_ratio))) build_flight_length_meters = float(data.get('build_flight_length_meters', data.get('build_flight_length', calc_polyline_length([[pos.x, pos.y] for pos in data.get('chunk_positions', [])])))) build_flight_length_pixels = float(data.get('build_flight_length_pixels', build_flight_length_meters / online_map.pixel_ratio)) return_points = forward_points[::-1].copy() points = return_points / online_map.pixel_ratio print("READ RTH POINTS:", points) print("Saved planned route length:", planned_route_length_meters, "m /", planned_route_length_pixels, "px") print("Saved build flight length:", build_flight_length_meters, "m /", build_flight_length_pixels, "px") vis_manager = VisualizationManager() map_path = dir / data.get('map_image', 'map.jpg') if map_path.exists() and 'map_extent' in data: turn_point = data['chunk_positions'][-1] if data.get('chunk_positions') else forward_points[-1] build_trajectory = np.array([[pos.x, pos.y] for pos in data.get('chunk_positions', [])]) if len(build_trajectory) == 0: build_trajectory = forward_points map_extent = data['map_extent'] if 'map_pixel_ratio' in data and 'map_size' in data and 'map_zero_point' in data: map_extent = make_map_extent(data['map_size'][0], data['map_size'][1], data['map_zero_point'], data['map_pixel_ratio']) vis_manager.set_route_map( map_path, map_extent, build_trajectory, turn_point, forward_points[0], ) pilot = autopilot.AutoPilot( points, chunks, vis_manager, online_map.pixel_ratio, use_sian_similarity=use_sian_similarity, use_gan=use_gan, interframe_method=interframe_method, landmark_method=landmark_method, use_landmarks=use_landmarks, ) simulator = Simulator(online_map) pilot.target_idx = 0 turn_position = data.get('final_build_position') if turn_position is None: turn_position = data['chunk_positions'][-1].copy() if data.get('chunk_positions') else Position(forward_points[-1][0], forward_points[-1][1]) else: turn_position = turn_position.copy() final_build_yaw = data.get('final_build_yaw', turn_position.yaw) start_position = Position( turn_position.x / online_map.pixel_ratio, turn_position.y / online_map.pixel_ratio, 1, final_build_yaw + np.pi, 0, 0, ) simulator.pos = start_position.copy() pilot.pos = start_position.copy() if len(points) > 1: distance_to_first_return_point = np.hypot(points[0][0] - pilot.pos.x, points[0][1] - pilot.pos.y) * online_map.pixel_ratio if distance_to_first_return_point < pilot.direct_step_threshold: pilot.target_idx = 1 move_map_safely(online_map, start_position.x, start_position.y) vis_manager.update_rth_trajectory(turn_position.x, turn_position.y) chunk = simulator.get_chunk() command = pilot.handle(chunk) vis_manager.update_display() vis_manager.pause(1) vis_manager.set_target_points(return_points) proc_time = np.array([]) errors = [] start_point = forward_points[0] final_return_error = None return_route_length_meters = 0.0 prev_return_point = np.array([turn_position.x, turn_position.y], dtype=float) odometry_warmup_frames = 2 sleep(5) for i in range(10000000000): if i > 0: simulator.set_pitch(np.sin(i / 10) * 5) simulator.set_roll(np.sin(i / 15) * 5) simulator.set_zoom(1.0 + np.sin(i / 10) * 0.3) if command.stop: break chunk = simulator.get_chunk() command = pilot.handle(chunk) if i < odometry_warmup_frames: pilot.pos = simulator.pos.copy() command = pilot.make_command() command.velocity /= online_map.pixel_ratio proc_time = np.append(proc_time, command.proccessing_time) # Save Image chunk.save_image(Path('images') / f"photo_{i}.png") vis_manager.update_display() vis_manager.pause(0.2) vis_manager.set_target_index(pilot.target_idx) current_return_point = np.array([simulator.pos.x * online_map.pixel_ratio, simulator.pos.y * online_map.pixel_ratio], dtype=float) return_route_length_meters += float(np.hypot(*(current_return_point - prev_return_point))) prev_return_point = current_return_point vis_manager.update_drone_trajectory(pilot.pos.x * online_map.pixel_ratio, pilot.pos.y * online_map.pixel_ratio) vis_manager.update_global_map(current_return_point[0], current_return_point[1]) vis_manager.update_rth_trajectory(current_return_point[0], current_return_point[1]) if i >= odometry_warmup_frames: vis_manager.update_error_plot(i, pilot.pos.x * online_map.pixel_ratio, pilot.pos.y * online_map.pixel_ratio, current_return_point[0], current_return_point[1]) errors.append(np.hypot((pilot.pos.x - simulator.pos.x) * online_map.pixel_ratio, (pilot.pos.y - simulator.pos.y) * online_map.pixel_ratio)) vis_manager.update_display() vis_manager.pause(0.2) last_proc_times = proc_time[-30:] print(F"\nImage #{i}") print("Average FPS:", 1 / last_proc_times.mean()) print("Pilot coords:", pilot.pos) print("Simulator coords:", simulator.pos) if command.stop: break sleep(0.5) simulator.handle(command.dangle, command.velocity) final_point = np.array([simulator.pos.x * online_map.pixel_ratio, simulator.pos.y * online_map.pixel_ratio]) final_return_error = float(np.hypot(final_point[0] - start_point[0], final_point[1] - start_point[1])) return_route_length_pixels = return_route_length_meters / online_map.pixel_ratio full_route_length_meters = build_flight_length_meters + return_route_length_meters full_route_length_pixels = build_flight_length_pixels + return_route_length_pixels final_return_error_pixels = final_return_error / online_map.pixel_ratio valid_proc_time = proc_time[proc_time > 0] fps_values = (1 / valid_proc_time).tolist() if len(valid_proc_time) > 0 else [] average_fps = float(np.mean(fps_values)) if fps_values else None median_fps = float(np.median(fps_values)) if fps_values else None min_fps = float(np.min(fps_values)) if fps_values else None max_fps = float(np.max(fps_values)) if fps_values else None average_processing_time = float(np.mean(valid_proc_time)) if len(valid_proc_time) > 0 else None vis_manager.set_final_point(final_point[0], final_point[1], final_return_error) print("Errors:", errors) print("MSE:", (np.array(errors) ** 2).mean()) print("RMSE:", (np.array(errors) ** 2).mean() ** 0.5) print("Return route length:", return_route_length_meters, "m /", return_route_length_pixels, "px") print("Full route length:", full_route_length_meters, "m /", full_route_length_pixels, "px") print("Final-to-start error:", final_return_error, "m /", final_return_error_pixels, "px") print("Average FPS:", average_fps) test_run_dir = make_test_run_dir(name) vis_manager.save_plots(test_run_dir) metrics = { 'run_params': run_params or { 'name': name, 'simulation': map_name, 'ref_min_distance': ref_min_distance, 'use_sian_similarity': use_sian_similarity, 'use_gan': use_gan, 'use_landmarks': use_landmarks, 'interframe_method': interframe_method, 'landmark_method': landmark_method, }, 'build_params': data.get('build_params'), 'position_errors': errors, 'processing_times': proc_time.tolist(), 'fps_values': fps_values, 'average_fps': average_fps, 'median_fps': median_fps, 'min_fps': min_fps, 'max_fps': max_fps, 'average_processing_time': average_processing_time, 'mse': float((np.array(errors) ** 2).mean()) if errors else None, 'rmse': float((np.array(errors) ** 2).mean() ** 0.5) if errors else None, 'planned_route_length': planned_route_length_meters, 'build_flight_length': build_flight_length_meters, 'return_route_length': return_route_length_meters, 'full_route_length': full_route_length_meters, 'final_to_start_error': final_return_error, 'planned_route_length_meters': planned_route_length_meters, 'planned_route_length_pixels': planned_route_length_pixels, 'build_flight_length_meters': build_flight_length_meters, 'build_flight_length_pixels': build_flight_length_pixels, 'return_route_length_meters': return_route_length_meters, 'return_route_length_pixels': return_route_length_pixels, 'full_route_length_meters': full_route_length_meters, 'full_route_length_pixels': full_route_length_pixels, 'final_to_start_error_meters': final_return_error, 'final_to_start_error_pixels': final_return_error_pixels, 'map_pixel_ratio': data.get('map_pixel_ratio'), 'flight_pixel_ratio': online_map.pixel_ratio, 'final_build_yaw': final_build_yaw, 'rth_start_yaw': start_position.yaw, 'start_point': start_point, 'turn_point': np.array([turn_position.x, turn_position.y]), 'final_point': final_point, } with (test_run_dir / 'metrics.pkl').open('wb') as file: pickle.dump(metrics, file) with (test_run_dir / 'metrics.txt').open('w', encoding='utf-8') as file: for key, value in metrics.items(): file.write(f"{key}: {value}\n") vis_manager.show_final() def parse_args(): """Парсер аргументов командной строки""" parser = argparse.ArgumentParser(description='Обработка траекторий') # Добавляем обязательный аргумент --mode parser.add_argument( '--mode', type=str, required=True, choices=['standalone', 'markup', 'build', 'run'], help='Режим работы: standalone, markup, build или run' ) # Добавляем опциональный аргумент --name parser.add_argument( '--name', type=str, default=utility.generate_folder_name(), help='Название траектории' ) # Координаты parser.add_argument( '--lat', type=float, default=49.103814, help='Широта (по умолчанию 49.103814)' ) parser.add_argument( '--lon', type=float, default=55.794258, help='Долгота (по умолчанию 55.794258)' ) # Источник эталонных изображений (ориентиров) parser.add_argument( '--reference', type=str, default='google', choices=['google', 'yandex'], help='Откуда берутся эталонные изображения (ориентиры): google или yandex (по умолчанию google)' ) # Место проведения симуляции parser.add_argument( '--simulation', type=str, default='yandex', choices=['google', 'yandex'], help='Где проводится симуляция: google или yandex (по умолчанию yandex)' ) # Место проведения симуляции parser.add_argument( '--ref-min-distance', type=float, default=100, help='Минимальное расстояние между эталонами' ) # Место проведения симуляции parser.add_argument( '--debug-fps', action='store_true', help='Включить отладку FPS' ) # Место проведения симуляции parser.add_argument( '--debug-landmark', action='store_true', help='Включить отладку эталонов' ) parser.add_argument( '--use-sian-similarity', action='store_true', help='Выбирать ориентир через SiaN similarity вместо ближайшего по текущей позиции' ) parser.add_argument( '--use-gan', action='store_true', help='Преобразовывать эталонный vision_chunk через GAN перед поиском ключевых точек' ) parser.add_argument( '--use-landmarks', dest='use_landmarks', action='store_true', default=True, help='Использовать эталоны для коррекции позиции (по умолчанию включено)' ) parser.add_argument( '--no-landmarks', dest='use_landmarks', action='store_false', help='Не использовать эталоны, лететь только по межкадровой одометрии' ) parser.add_argument( '--interframe-method', type=str, default='optical-flow', choices=['orb', 'akaze', 'sift', 'brisk', 'optical-flow', 'optical_flow'], help='Метод межкадрового сравнения: orb, akaze, sift, brisk или optical-flow (по умолчанию optical-flow)' ) parser.add_argument( '--landmark-method', '--reference-method', dest='landmark_method', type=str, default='orb', choices=['orb', 'akaze', 'sift', 'brisk'], help='Метод сравнения с эталонами: orb, akaze, sift или brisk (по умолчанию orb)' ) # Парсим аргументы args = parser.parse_args() # Проверяем, что для markup, build и run указан --name if args.mode in ['markup', 'build', 'run'] and not args.name: parser.error(f"--name обязателен для режима {args.mode}") return args if __name__ == "__main__": args = parse_args() name = args.name mode = args.mode sim: str = args.simulation ref: str = args.reference lat: float = args.lat lon: float = args.lon rmd: float = args.ref_min_distance interframe_method = normalize_interframe_method(args.interframe_method) landmark_method = args.landmark_method constants.DEBUG_FPS = args.debug_fps constants.DEBUG_LANDMARK = args.debug_landmark if mode == 'markup': markup(name, ref, lat, lon) if mode == 'build' or mode == 'standalone': build(name, ref, lat, lon) if mode == 'run' or mode == 'standalone': run( name, sim, rmd, args.use_sian_similarity, args.use_gan, interframe_method, landmark_method, args.use_landmarks, { 'mode': mode, 'name': name, 'lat': lat, 'lon': lon, 'reference': ref, 'simulation': sim, 'ref_min_distance': rmd, 'debug_fps': args.debug_fps, 'debug_landmark': args.debug_landmark, 'use_sian_similarity': args.use_sian_similarity, 'use_gan': args.use_gan, 'use_landmarks': args.use_landmarks, 'interframe_method': interframe_method, 'landmark_method': landmark_method, }, )