323 lines
10 KiB
Python
323 lines
10 KiB
Python
from google_map import GoogleMap
|
||
from pathlib import Path
|
||
from position import Position
|
||
from simulator import Simulator
|
||
from time import sleep
|
||
from trajectory_drawer import TrajectoryDrawer
|
||
from utility import cv2_to_pil
|
||
from vision_chunk import VisionChunk
|
||
from visualization import VisualizationManager
|
||
from yandex_map import YandexMap
|
||
|
||
import argparse
|
||
import autopilot
|
||
import constants
|
||
import cv2
|
||
import matplotlib.pyplot as plt
|
||
import numpy as np
|
||
import pickle
|
||
import random
|
||
import utility
|
||
|
||
def get_map(map_name: str = 'google', lat=49.103814, lon=55.794258, zoom=18):
|
||
if map_name == 'google': return GoogleMap(lat, lon, zoom)
|
||
if map_name == 'yandex': return YandexMap(lat, lon, zoom)
|
||
return None
|
||
|
||
def make_global_photo(filename, map_name: str = 'google', lat=49.103814, lon=55.794258, zoom=13):
|
||
online_map: YandexMap | GoogleMap = get_map(map_name, lat, lon, zoom)
|
||
online_map.save_photo(filename)
|
||
online_map.destroy()
|
||
|
||
def get_trajectory_points(bg_img: str) -> list[(float, float)]:
|
||
trajectoryDrawer = TrajectoryDrawer(bg_img)
|
||
trajectoryDrawer.show()
|
||
trajectoryDrawer.wait()
|
||
points = list(map(lambda p: [p[0] / trajectoryDrawer.img.shape[1], p[1] / trajectoryDrawer.img.shape[0]], trajectoryDrawer.points))
|
||
return points
|
||
|
||
def build(name: str, map_name: str, lat: float, lon: float):
|
||
|
||
# Создание папки с информацией о маршруте
|
||
dir = Path('trajectories')
|
||
if not dir.exists(): dir.mkdir()
|
||
dir /= name
|
||
assert not dir.exists()
|
||
dir.mkdir()
|
||
dir_chunks = dir / 'chunks'
|
||
dir_chunks.mkdir()
|
||
|
||
make_global_photo('map.jpg', map_name, lat, lon, 15)
|
||
points = get_trajectory_points('map.jpg')
|
||
online_map: YandexMap | GoogleMap = get_map(map_name, lat, lon, 15)
|
||
|
||
|
||
width, height = online_map.get_size()
|
||
points_coords = np.array(list(map(lambda p: [
|
||
(p[0] - points[0][0]) * width, (points[0][1] - p[1]) * height
|
||
], points)))
|
||
|
||
points_coords *= online_map.pixel_ratio
|
||
|
||
# Начнём симуляцию полёта с первой точки
|
||
online_map.make_as_center(*points[0])
|
||
sleep(3)
|
||
online_map.scroll(0.5, 0.5, 10, True)
|
||
sleep(2)
|
||
geo = online_map.get_geolocation()
|
||
online_map.open(geo['lat'], geo['lon'], 18)
|
||
sleep(5)
|
||
|
||
points_coords_pixel = points_coords.copy() / online_map.pixel_ratio
|
||
|
||
pilot = autopilot.AutoPilot(points_coords_pixel, pixel_ratio=online_map.pixel_ratio)
|
||
simulator = Simulator(online_map)
|
||
pilot.target_idx = 0
|
||
|
||
pilot.pos = simulator.pos.copy()
|
||
command = pilot.make_command()
|
||
|
||
positions: list[Position] = []
|
||
|
||
print("points_coords_pixel:", points_coords_pixel)
|
||
for i in range(5000):
|
||
|
||
if command.stop:
|
||
break
|
||
|
||
chunk = simulator.get_chunk()
|
||
pilot.pos = simulator.pos.copy()
|
||
command = pilot.make_command()
|
||
command.velocity /= online_map.pixel_ratio
|
||
|
||
print("Position:", simulator.pos)
|
||
|
||
# Save Image
|
||
chunk.save_image(dir_chunks / f"chunk_{len(positions)}.png")
|
||
|
||
positions.append(simulator.pos.copy() * online_map.pixel_ratio)
|
||
|
||
simulator.handle(command.dangle, command.velocity)
|
||
if i == 0 and map_name == 'google':
|
||
simulator.pos.x = 0
|
||
simulator.pos.y = 0
|
||
sleep(1.5)
|
||
|
||
data = {
|
||
'points': points_coords,
|
||
'chunk_positions': positions,
|
||
'initial_geolocation': geo
|
||
}
|
||
|
||
print(points_coords)
|
||
|
||
file_positions = dir / 'positions.pkl'
|
||
with file_positions.open('wb') as file:
|
||
pickle.dump(data, file)
|
||
|
||
print("WRITE POINTS:", points)
|
||
|
||
sleep(15)
|
||
online_map.destroy()
|
||
|
||
def run(name: str, map_name: str, ref_min_distance: float):
|
||
dir = Path('trajectories')
|
||
assert dir.exists()
|
||
dir /= name
|
||
assert dir.exists(), "Укажите корректное название маршрута"
|
||
dir_chunks = dir / 'chunks'
|
||
file_positions = dir / 'positions.pkl'
|
||
|
||
with file_positions.open('rb') as file:
|
||
data = pickle.load(file)
|
||
|
||
initial_geolocation = data['initial_geolocation']
|
||
|
||
lat = initial_geolocation['lat']
|
||
lon = initial_geolocation['lon']
|
||
online_map: YandexMap | GoogleMap = get_map(map_name, lat, lon, 18)
|
||
sleep(2)
|
||
|
||
chunks: list[VisionChunk] = []
|
||
for i in range(len(data['chunk_positions'])):
|
||
pos = data['chunk_positions'][i]
|
||
if len(chunks) == 0 or np.hypot(chunks[-1].pos.x - pos.x, chunks[-1].pos.y - pos.y) > ref_min_distance:
|
||
chunk = VisionChunk.load_image(dir_chunks / f"chunk_{i}.png")
|
||
chunk.pos = data['chunk_positions'][i] / online_map.pixel_ratio
|
||
chunks.append(chunk)
|
||
|
||
points = data['points'] / online_map.pixel_ratio
|
||
print("READ POINTS:", points)
|
||
|
||
vis_manager = VisualizationManager()
|
||
pilot = autopilot.AutoPilot(points, chunks, vis_manager, online_map.pixel_ratio)
|
||
simulator = Simulator(online_map)
|
||
pilot.target_idx = 0
|
||
|
||
chunk = simulator.get_chunk()
|
||
command = pilot.handle(chunk)
|
||
|
||
vis_manager.update_display()
|
||
vis_manager.pause(1)
|
||
|
||
vis_manager.set_target_points(data['points'])
|
||
|
||
proc_time = np.array([])
|
||
|
||
errors = []
|
||
|
||
sleep(1)
|
||
|
||
for i in range(10000000000):
|
||
if i > 0:
|
||
simulator.set_pitch(np.sin(i / 10) * 5)
|
||
simulator.set_roll(np.sin(i / 15) * 5)
|
||
simulator.set_zoom(1.0 + np.sin(i / 10) * 0.3)
|
||
|
||
if command.stop:
|
||
break
|
||
|
||
chunk = simulator.get_chunk()
|
||
command = pilot.handle(chunk)
|
||
command.velocity /= online_map.pixel_ratio
|
||
|
||
proc_time = np.append(proc_time, command.proccessing_time)
|
||
|
||
# Save Image
|
||
chunk.save_image(Path('images') / f"photo_{i}.png")
|
||
|
||
vis_manager.update_display()
|
||
vis_manager.pause(0.2)
|
||
|
||
vis_manager.set_target_index(pilot.target_idx)
|
||
vis_manager.update_drone_trajectory(pilot.pos.x * online_map.pixel_ratio, pilot.pos.y * online_map.pixel_ratio)
|
||
vis_manager.update_global_map(simulator.pos.x * online_map.pixel_ratio, simulator.pos.y * online_map.pixel_ratio)
|
||
vis_manager.update_error_plot(i, pilot.pos.x * online_map.pixel_ratio, pilot.pos.y * online_map.pixel_ratio, simulator.pos.x * online_map.pixel_ratio, simulator.pos.y * online_map.pixel_ratio)
|
||
|
||
errors.append(np.hypot((pilot.pos.x - simulator.pos.x) * online_map.pixel_ratio, (pilot.pos.y - simulator.pos.y) * online_map.pixel_ratio))
|
||
|
||
vis_manager.update_display()
|
||
vis_manager.pause(0.2)
|
||
|
||
last_proc_times = proc_time[-30:]
|
||
print(F"\nImage #{i}")
|
||
print("Average FPS:", 1 / last_proc_times.mean())
|
||
print("Pilot coords:", pilot.pos)
|
||
print("Simulator coords:", simulator.pos)
|
||
sleep(0.5)
|
||
simulator.handle(command.dangle, command.velocity)
|
||
if i == 0 and map_name == 'google':
|
||
simulator.pos.x = 0
|
||
simulator.pos.y = 0
|
||
|
||
print("Errors:", errors)
|
||
print("MSE:", (np.array(errors) ** 2).mean())
|
||
print("RMSE:", (np.array(errors) ** 2).mean() ** 0.5)
|
||
print("Average FPS:", 1 / proc_time.mean())
|
||
vis_manager.show_final()
|
||
|
||
def parse_args():
|
||
"""Парсер аргументов командной строки"""
|
||
parser = argparse.ArgumentParser(description='Обработка траекторий')
|
||
|
||
# Добавляем обязательный аргумент --mode
|
||
parser.add_argument(
|
||
'--mode',
|
||
type=str,
|
||
required=True,
|
||
choices=['standalone', 'build', 'run'],
|
||
help='Режим работы: standalone, build или run'
|
||
)
|
||
|
||
# Добавляем опциональный аргумент --name
|
||
parser.add_argument(
|
||
'--name',
|
||
type=str,
|
||
default=utility.generate_folder_name(),
|
||
help='Название траектории'
|
||
)
|
||
|
||
# Координаты
|
||
parser.add_argument(
|
||
'--lat',
|
||
type=float,
|
||
default=49.103814,
|
||
help='Широта (по умолчанию 49.103814)'
|
||
)
|
||
|
||
parser.add_argument(
|
||
'--lon',
|
||
type=float,
|
||
default=55.794258,
|
||
help='Долгота (по умолчанию 55.794258)'
|
||
)
|
||
|
||
# Источник эталонных изображений (ориентиров)
|
||
parser.add_argument(
|
||
'--reference',
|
||
type=str,
|
||
default='google',
|
||
choices=['google', 'yandex'],
|
||
help='Откуда берутся эталонные изображения (ориентиры): google или yandex (по умолчанию google)'
|
||
)
|
||
|
||
# Место проведения симуляции
|
||
parser.add_argument(
|
||
'--simulation',
|
||
type=str,
|
||
default='yandex',
|
||
choices=['google', 'yandex'],
|
||
help='Где проводится симуляция: google или yandex (по умолчанию yandex)'
|
||
)
|
||
|
||
# Место проведения симуляции
|
||
parser.add_argument(
|
||
'--ref-min-distance',
|
||
type=float,
|
||
default=100,
|
||
help='Минимальное расстояние между эталонами'
|
||
)
|
||
|
||
# Место проведения симуляции
|
||
parser.add_argument(
|
||
'--debug-fps',
|
||
action='store_true',
|
||
help='Включить отладку FPS'
|
||
)
|
||
|
||
# Место проведения симуляции
|
||
parser.add_argument(
|
||
'--debug-landmark',
|
||
action='store_true',
|
||
help='Включить отладку эталонов'
|
||
)
|
||
|
||
# Парсим аргументы
|
||
args = parser.parse_args()
|
||
|
||
# Проверяем, что для build и run указан --name
|
||
if args.mode in ['build', 'run'] and not args.name:
|
||
parser.error(f"--name обязателен для режима {args.mode}")
|
||
|
||
return args
|
||
|
||
|
||
if __name__ == "__main__":
|
||
args = parse_args()
|
||
name = args.name
|
||
mode = args.mode
|
||
sim: str = args.simulation
|
||
ref: str = args.reference
|
||
lat: float = args.lat
|
||
lon: float = args.lon
|
||
rmd: float = args.ref_min_distance
|
||
|
||
constants.DEBUG_FPS = args.debug_fps
|
||
constants.DEBUG_LANDMARK = args.debug_landmark
|
||
|
||
if mode == 'build' or mode == 'standalone':
|
||
build(name, ref, lat, lon)
|
||
|
||
if mode == 'run' or mode == 'standalone':
|
||
run(name, sim, rmd)
|