Files
autopilot/main.py
2026-05-31 14:07:34 +03:00

427 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from google_map import GoogleMap
from datetime import datetime
from pathlib import Path
from position import Position
from simulator import Simulator
from time import sleep
from trajectory_drawer import TrajectoryDrawer
from utility import cv2_to_pil
from vision_chunk import VisionChunk
from visualization import VisualizationManager
from yandex_map import YandexMap
import argparse
import autopilot
import constants
import cv2
import matplotlib.pyplot as plt
import numpy as np
import pickle
import random
import shutil
import utility
def get_map(map_name: str = 'google', lat=49.103814, lon=55.794258, zoom=18):
if map_name == 'google': return GoogleMap(lat, lon, zoom)
if map_name == 'yandex': return YandexMap(lat, lon, zoom)
return None
def make_global_photo(filename, map_name: str = 'google', lat=49.103814, lon=55.794258, zoom=13):
online_map: YandexMap | GoogleMap = get_map(map_name, lat, lon, zoom)
online_map.save_photo(filename)
online_map.destroy()
def get_trajectory_points(bg_img: str) -> list[(float, float)]:
trajectoryDrawer = TrajectoryDrawer(bg_img)
trajectoryDrawer.show()
trajectoryDrawer.wait()
points = list(map(lambda p: [p[0] / trajectoryDrawer.img.shape[1], p[1] / trajectoryDrawer.img.shape[0]], trajectoryDrawer.points))
return points
def make_map_extent(width: float, height: float, origin_point: list[float], pixel_ratio: float) -> tuple[float, float, float, float]:
"""Возвращает границы снимка карты в координатах маршрута."""
origin_x = origin_point[0] * width
origin_y = origin_point[1] * height
left = (0 - origin_x) * pixel_ratio
right = (width - origin_x) * pixel_ratio
bottom = (origin_y - height) * pixel_ratio
top = origin_y * pixel_ratio
return (left, right, bottom, top)
def make_test_run_dir(name: str) -> Path:
run_dir = Path('test_runs') / f"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}_{name}"
run_dir.mkdir(parents=True, exist_ok=True)
return run_dir
def move_map_safely(online_map: YandexMap | GoogleMap, dx: float, dy: float, step: int = 300):
"""Двигает карту короткими drag-шагами, чтобы Selenium не выходил за viewport."""
steps = max(1, int(np.ceil(max(abs(dx), abs(dy)) / step)))
for _ in range(steps):
online_map.move(dx / steps, dy / steps)
sleep(0.05)
def build(name: str, map_name: str, lat: float, lon: float):
# Создание папки с информацией о маршруте
dir = Path('trajectories')
if not dir.exists(): dir.mkdir()
dir /= name
assert not dir.exists()
dir.mkdir()
dir_chunks = dir / 'chunks'
dir_chunks.mkdir()
make_global_photo('map.jpg', map_name, lat, lon, 15)
map_path = dir / 'map.jpg'
shutil.copyfile('map.jpg', map_path)
points = get_trajectory_points(str(map_path))
online_map: YandexMap | GoogleMap = get_map(map_name, lat, lon, 15)
width, height = online_map.get_size()
points_coords = np.array(list(map(lambda p: [
(p[0] - points[0][0]) * width, (points[0][1] - p[1]) * height
], points)))
points_coords *= online_map.pixel_ratio
# Начнём симуляцию полёта с первой точки
online_map.make_as_center(*points[0])
sleep(3)
online_map.scroll(0.5, 0.5, 10, True)
sleep(2)
geo = online_map.get_geolocation()
online_map.open(geo['lat'], geo['lon'], 18)
sleep(5)
points_coords_pixel = points_coords.copy() / online_map.pixel_ratio
pilot = autopilot.AutoPilot(points_coords_pixel, pixel_ratio=online_map.pixel_ratio)
simulator = Simulator(online_map)
pilot.target_idx = 0
pilot.pos = simulator.pos.copy()
command = pilot.make_command()
positions: list[Position] = []
print("points_coords_pixel:", points_coords_pixel)
for i in range(5000):
if command.stop:
break
chunk = simulator.get_chunk()
pilot.pos = simulator.pos.copy()
command = pilot.make_command()
command.velocity /= online_map.pixel_ratio
print("Position:", simulator.pos)
# Save Image
chunk.save_image(dir_chunks / f"chunk_{len(positions)}.png")
positions.append(simulator.pos.copy() * online_map.pixel_ratio)
simulator.handle(command.dangle, command.velocity)
if i == 0 and map_name == 'google':
simulator.pos.x = 0
simulator.pos.y = 0
sleep(1.5)
data = {
'points': points_coords,
'chunk_positions': positions,
'initial_geolocation': geo,
'map_image': 'map.jpg',
'map_size': (width, height),
'map_zero_point': points[0],
'map_extent': make_map_extent(width, height, points[0], online_map.pixel_ratio),
}
print(points_coords)
file_positions = dir / 'positions.pkl'
with file_positions.open('wb') as file:
pickle.dump(data, file)
print("WRITE POINTS:", points)
sleep(15)
online_map.destroy()
def run(
name: str,
map_name: str,
ref_min_distance: float,
use_sian_similarity: bool = False,
use_gan: bool = False,
):
dir = Path('trajectories')
assert dir.exists()
dir /= name
assert dir.exists(), "Укажите корректное название маршрута"
dir_chunks = dir / 'chunks'
file_positions = dir / 'positions.pkl'
with file_positions.open('rb') as file:
data = pickle.load(file)
initial_geolocation = data['initial_geolocation']
lat = initial_geolocation['lat']
lon = initial_geolocation['lon']
online_map: YandexMap | GoogleMap = get_map(map_name, lat, lon, 18)
sleep(2)
chunks: list[VisionChunk] = []
for i in range(len(data['chunk_positions'])):
pos = data['chunk_positions'][i]
if len(chunks) == 0 or np.hypot(chunks[-1].pos.x - pos.x, chunks[-1].pos.y - pos.y) > ref_min_distance:
chunk = VisionChunk.load_image(dir_chunks / f"chunk_{i}.png")
chunk.pos = data['chunk_positions'][i] / online_map.pixel_ratio
chunks.append(chunk)
r = 0
for i in range(len(data['points']) - 1):
r += np.hypot(
data['points'][i][0] - data['points'][i+1][0],
data['points'][i][1] - data['points'][i+1][1]
)
print("R: ", r)
forward_points = data['points']
return_points = forward_points[::-1].copy()
points = return_points / online_map.pixel_ratio
print("READ RTH POINTS:", points)
vis_manager = VisualizationManager()
map_path = dir / data.get('map_image', 'map.jpg')
if map_path.exists() and 'map_extent' in data:
turn_point = data['chunk_positions'][-1] if data.get('chunk_positions') else forward_points[-1]
build_trajectory = np.array([[pos.x, pos.y] for pos in data.get('chunk_positions', [])])
if len(build_trajectory) == 0:
build_trajectory = forward_points
vis_manager.set_route_map(
map_path,
data['map_extent'],
build_trajectory,
turn_point,
forward_points[0],
)
pilot = autopilot.AutoPilot(
points,
chunks,
vis_manager,
online_map.pixel_ratio,
use_sian_similarity=use_sian_similarity,
use_gan=use_gan,
)
simulator = Simulator(online_map)
pilot.target_idx = 0
turn_position = data['chunk_positions'][-1].copy() if data.get('chunk_positions') else Position(forward_points[-1][0], forward_points[-1][1])
start_position = turn_position / online_map.pixel_ratio
start_position.yaw += np.pi
simulator.pos = start_position.copy()
pilot.pos = start_position.copy()
move_map_safely(online_map, start_position.x, start_position.y)
vis_manager.update_rth_trajectory(turn_position.x, turn_position.y)
chunk = simulator.get_chunk()
command = pilot.handle(chunk)
vis_manager.update_display()
vis_manager.pause(1)
vis_manager.set_target_points(return_points)
proc_time = np.array([])
errors = []
start_point = forward_points[0]
final_return_error = None
sleep(1)
for i in range(10000000000):
if i > 0:
simulator.set_pitch(np.sin(i / 10) * 5)
simulator.set_roll(np.sin(i / 15) * 5)
simulator.set_zoom(1.0 + np.sin(i / 10) * 0.3)
if command.stop:
break
chunk = simulator.get_chunk()
command = pilot.handle(chunk)
command.velocity /= online_map.pixel_ratio
proc_time = np.append(proc_time, command.proccessing_time)
# Save Image
chunk.save_image(Path('images') / f"photo_{i}.png")
vis_manager.update_display()
vis_manager.pause(0.2)
vis_manager.set_target_index(pilot.target_idx)
vis_manager.update_drone_trajectory(pilot.pos.x * online_map.pixel_ratio, pilot.pos.y * online_map.pixel_ratio)
vis_manager.update_global_map(simulator.pos.x * online_map.pixel_ratio, simulator.pos.y * online_map.pixel_ratio)
vis_manager.update_rth_trajectory(simulator.pos.x * online_map.pixel_ratio, simulator.pos.y * online_map.pixel_ratio)
vis_manager.update_error_plot(i, pilot.pos.x * online_map.pixel_ratio, pilot.pos.y * online_map.pixel_ratio, simulator.pos.x * online_map.pixel_ratio, simulator.pos.y * online_map.pixel_ratio)
errors.append(np.hypot((pilot.pos.x - simulator.pos.x) * online_map.pixel_ratio, (pilot.pos.y - simulator.pos.y) * online_map.pixel_ratio))
vis_manager.update_display()
vis_manager.pause(0.2)
last_proc_times = proc_time[-30:]
print(F"\nImage #{i}")
print("Average FPS:", 1 / last_proc_times.mean())
print("Pilot coords:", pilot.pos)
print("Simulator coords:", simulator.pos)
sleep(0.5)
simulator.handle(command.dangle, command.velocity)
final_point = np.array([simulator.pos.x * online_map.pixel_ratio, simulator.pos.y * online_map.pixel_ratio])
final_return_error = float(np.hypot(final_point[0] - start_point[0], final_point[1] - start_point[1]))
vis_manager.set_final_point(final_point[0], final_point[1], final_return_error)
print("Errors:", errors)
print("MSE:", (np.array(errors) ** 2).mean())
print("RMSE:", (np.array(errors) ** 2).mean() ** 0.5)
print("Return error:", final_return_error)
print("Average FPS:", 1 / proc_time.mean())
test_run_dir = make_test_run_dir(name)
vis_manager.save_plots(test_run_dir)
with (test_run_dir / 'metrics.pkl').open('wb') as file:
pickle.dump({
'position_errors': errors,
'mse': float((np.array(errors) ** 2).mean()) if errors else None,
'rmse': float((np.array(errors) ** 2).mean() ** 0.5) if errors else None,
'return_error': final_return_error,
'start_point': start_point,
'turn_point': np.array([turn_position.x, turn_position.y]),
'final_point': final_point,
}, file)
vis_manager.show_final()
def parse_args():
"""Парсер аргументов командной строки"""
parser = argparse.ArgumentParser(description='Обработка траекторий')
# Добавляем обязательный аргумент --mode
parser.add_argument(
'--mode',
type=str,
required=True,
choices=['standalone', 'build', 'run'],
help='Режим работы: standalone, build или run'
)
# Добавляем опциональный аргумент --name
parser.add_argument(
'--name',
type=str,
default=utility.generate_folder_name(),
help='Название траектории'
)
# Координаты
parser.add_argument(
'--lat',
type=float,
default=49.103814,
help='Широта (по умолчанию 49.103814)'
)
parser.add_argument(
'--lon',
type=float,
default=55.794258,
help='Долгота (по умолчанию 55.794258)'
)
# Источник эталонных изображений (ориентиров)
parser.add_argument(
'--reference',
type=str,
default='google',
choices=['google', 'yandex'],
help='Откуда берутся эталонные изображения (ориентиры): google или yandex (по умолчанию google)'
)
# Место проведения симуляции
parser.add_argument(
'--simulation',
type=str,
default='yandex',
choices=['google', 'yandex'],
help='Где проводится симуляция: google или yandex (по умолчанию yandex)'
)
# Место проведения симуляции
parser.add_argument(
'--ref-min-distance',
type=float,
default=100,
help='Минимальное расстояние между эталонами'
)
# Место проведения симуляции
parser.add_argument(
'--debug-fps',
action='store_true',
help='Включить отладку FPS'
)
# Место проведения симуляции
parser.add_argument(
'--debug-landmark',
action='store_true',
help='Включить отладку эталонов'
)
parser.add_argument(
'--use-sian-similarity',
action='store_true',
help='Выбирать ориентир через SiaN similarity вместо ближайшего по текущей позиции'
)
parser.add_argument(
'--use-gan',
action='store_true',
help='Преобразовывать эталонный vision_chunk через GAN перед поиском ключевых точек'
)
# Парсим аргументы
args = parser.parse_args()
# Проверяем, что для build и run указан --name
if args.mode in ['build', 'run'] and not args.name:
parser.error(f"--name обязателен для режима {args.mode}")
return args
if __name__ == "__main__":
args = parse_args()
name = args.name
mode = args.mode
sim: str = args.simulation
ref: str = args.reference
lat: float = args.lat
lon: float = args.lon
rmd: float = args.ref_min_distance
constants.DEBUG_FPS = args.debug_fps
constants.DEBUG_LANDMARK = args.debug_landmark
if mode == 'build' or mode == 'standalone':
build(name, ref, lat, lon)
if mode == 'run' or mode == 'standalone':
run(name, sim, rmd, args.use_sian_similarity, args.use_gan)