Files
autopilot/autopilot.py

342 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from enum import Enum
from pathlib import Path
import math
import random
import cv2
import numpy as np
from PIL import Image
from timer import Timer
from vision_chunk import VisionChunk
from visualization import VisualizationManager
from geolocation import Geolocation
random.seed(1)
class Pilot:
def __init__(self): pass
def handle(self, image: Image): pass
def act(self) -> tuple[float, float] | None: pass
def get_position(self) -> tuple[float, float]: pass
class PilotCommand:
velocity: float
dangle: float
stop: bool
proccessing_time: float
def __init__(self,
dangle: float = 0,
velocity: float = 100,
stop: bool = False,
processing_time: float = 0.
):
self.dangle = dangle
self.stop = stop
self.velocity = velocity
self.proccessing_time = processing_time
class AutoPilot(Pilot):
# Состояние одометрии
geo: Geolocation
chunks: list[VisionChunk] # Ориентиры
target_idx: int # Текущий ориентир
# Всякие вспомогательные штуки
timer: Timer
prev_chunk: VisionChunk | None
frame_count: int
vis_manager: VisualizationManager # Менеджер визуализации (опционально)
# Положение на основе ориентира
reserved_geo: Geolocation | None
def __init__(self, points = [], chunks = [], viz_manager=None):
self.prev_chunk = None
self.geo = Geolocation(0, 0, 1, 0)
self.chunks = chunks
self.frame_count = 0
self.vis_manager = viz_manager # Менеджер визуализации
self.reserved_geo = None
# Пороговые значения качества сопоставления/гомографии
self.min_inliers: int = 12
self.min_inlier_ratio: float = 0.5
self.max_reproj_rmse: float = 3.0
self.min_scale: float = 0.7
self.max_scale: float = 1.4
self.points = points
self.target_idx = 0
self.timer = Timer()
def get_position(self) -> tuple[float, float]:
return self.geo.x, self.geo.y
def calculate_optical_flow(self, prev_chunk: VisionChunk, current_chunk: VisionChunk):
"""
Вычисляет оптический поток между двумя кадрами
Возвращает src_pts и dst_pts в отцентрированных координатах
"""
prev_gray = prev_chunk.to_cv2_gray()
current_gray = current_chunk.to_cv2_gray()
h, w = prev_gray.shape[:2]
cx, cy = w // 2, h // 2
# Создаем сетку точек для отслеживания (аналогично вашему step=20)
step = 20
grid_points = []
for y in range(step, h - step, step):
for x in range(step, w - step, step):
grid_points.append([x, y])
if len(grid_points) < 4:
return None, None
p0 = np.float32(grid_points).reshape(-1, 1, 2)
# Параметры для Lucas-Kanade
lk_params = dict(
winSize=(21, 21),
maxLevel=3,
criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)
)
# Вычисляем разреженный оптический поток
p1, status, err = cv2.calcOpticalFlowPyrLK(prev_gray, current_gray, p0, None, **lk_params)
# Фильтруем только успешно отслеженные точки
good_old = p0[status == 1]
good_new = p1[status == 1]
if len(good_old) < 4:
return None, None
# Преобразуем в отцентрированные координаты (Y вверх)
src_pts = np.float32([[x - cx, cy - y] for x, y in good_old]).reshape(-1, 1, 2)
dst_pts = np.float32([[x - cx, cy - y] for x, y in good_new]).reshape(-1, 1, 2)
return src_pts, dst_pts
def estimate_transformation_matrix(self, src_pts: np.ndarray, dst_pts: np.ndarray):
"""
Оценивает матрицу трансформации на основе сопоставленных точек
Точки уже отцентрированы относительно центра изображения
"""
# Используем RANSAC для оценки матрицы гомографии
H, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 1.0)
# RH, rmask = cv2.findHomography(dst_pts, src_pts, cv2.RANSAC, 1.0)
if H is None:
return None
# Маска инлайеров
inliers = int(mask.sum()) if mask is not None else 0
total = int(mask.size) if mask is not None else 0
inlier_ratio = (inliers / total) if total > 0 else 0.0
# Ошибка репроекции по инлайерам (RMSE)
rmse = None
try:
# Проецируем src через H и сравниваем с dst
proj = cv2.perspectiveTransform(src_pts, H)
errs = np.linalg.norm((proj - dst_pts).reshape(-1, 2), axis=1)
if mask is not None and mask.shape[0] == errs.shape[0]:
errs = errs[mask.ravel().astype(bool)]
if errs.size > 0:
rmse = float(np.sqrt(np.mean(errs ** 2)))
except Exception:
rmse = None
# Извлекаем параметры трансформации из матрицы гомографии
# H = [a11 a12 tx]
# [a21 a22 ty]
# [0 0 1 ]
# Масштаб и поворот
a11, a12 = H[0, 0], H[0, 1]
a21, a22 = H[1, 0], H[1, 1]
# Смещение (уже отцентрировано)
tx, ty = -H[0, 2], -H[1, 2]
# print(" [Pilot] translate:", tx, ty)
# Вычисляем угол поворота
angle = -np.arctan2(a21, a11)
# Вычисляем масштаб
scale_x = np.sqrt(a11**2 + a21**2)
scale_y = np.sqrt(a12**2 + a22**2)
scale = (scale_x + scale_y) / 2
return {
'translation': (tx, ty), # Уже отцентрировано
'rotation': angle,
'scale': scale,
'homography': H,
'mask': mask,
'inliers': inliers,
'inliers_ratio': inlier_ratio,
'rmse': rmse
}
def update_drone_position(self, transformation_info: dict):
"""
Обновляет позицию и угол БПЛА на основе трансформации изображения
Координаты уже отцентрированы относительно центра изображения
"""
homography = transformation_info['homography']
self.geo @= homography
if self.reserved_geo is not None:
self.reserved_geo @= homography
def get_drone_state(self) -> dict:
"""
Возвращает текущее состояние БПЛА
"""
return {
'x': self.geo.x,
'y': self.geo.y,
'angle': self.geo.angle,
'angle_degrees': math.degrees(self.geo.angle),
'frame_count': self.frame_count
}
def get_position_by_chunk(self) -> Geolocation | None:
# Пытаемся найти ориентир на картинке:
current_chunk = self.prev_chunk
landmark_chunk = self.chunks[self.target_idx]
src_pts, dst_pts, matches, kp1, kp2 = landmark_chunk.detect_and_match_keypoints(current_chunk)
if src_pts is not None and dst_pts is not None and self.vis_manager:
was_enabled = self.timer.enabled
if was_enabled: self.timer.stop()
self.vis_manager.update_chunk_matches(landmark_chunk.to_numpy(), current_chunk.to_numpy(), kp1, kp2, matches)
if was_enabled: self.timer.start()
if src_pts is not None and dst_pts is not None:
# Оцениваем матрицу трансформации
landmark_transform = self.estimate_transformation_matrix(src_pts, dst_pts)
# Если ориентир достоверно найден — скорректируем глобальные координаты и угол
if landmark_transform is not None:
ok_scale = (self.min_scale <= landmark_transform['scale'] <= self.max_scale)
ok_inliers = (landmark_transform.get('inliers', 0) >= self.min_inliers)
ratio = landmark_transform.get('inliers_ratio', 0.0)
ok_ratio = (ratio >= self.min_inlier_ratio)
rmse = landmark_transform.get('rmse', None)
ok_rmse = (rmse is not None and rmse <= self.max_reproj_rmse)
if ok_scale and ok_inliers and ok_ratio and ok_rmse:
# print("[HELP]")
# print("Matrix", landmark_transform['homography'])
# print("Position", self.x, self.y)
# print("Position of point", self.points[self.target_idx])
# print("[PILOT]", rmse, ratio, ok_rmse)
# if False:
# Считаем абсолютную позу относительно координат ориентира
landmark_world_x, landmark_world_y = self.points[self.target_idx]
landmark = Geolocation(landmark_world_x, landmark_world_y, 1, 0)
homography = landmark_transform['homography']
# homography = np.linalg.inv(homography)
print(f" [Pilot] Landmark correction applied (inliers={landmark_transform['inliers']}, ratio={ratio:.2f}, rmse={rmse:.2f})")
return landmark @ homography
return None
def handle(self, current_chunk: VisionChunk) -> PilotCommand:
self.timer.start()
self.frame_count += 1
if self.prev_chunk is None:
self.prev_chunk = current_chunk
self.timer.stop()
return PilotCommand(processing_time=self.timer.get_elapsed())
# Расстояние до цели
distance_to_target = math.sqrt(
(self.points[self.target_idx][0] - self.geo.x) ** 2 +
(self.points[self.target_idx][1] - self.geo.y) ** 2
)
# Вычисляем оптический поток для покадрового сравнения
src_pts, dst_pts = self.calculate_optical_flow(self.prev_chunk, current_chunk)
# Оцениваем смещение БПЛА
if src_pts is not None and dst_pts is not None:
# Оцениваем матрицу трансформации
transformation_info = self.estimate_transformation_matrix(src_pts, dst_pts)
if transformation_info:
# Обновляем позицию и угол БПЛА (одометрия по кадрам)
self.update_drone_position(transformation_info)
self.timer.stop()
# Обновляем визуализацию
if self.vis_manager:
# Используем визуализацию движения точек по сетке
homography_matrix = transformation_info['homography']
self.vis_manager.update_homography_grid(
current_chunk.to_numpy(),
homography_matrix,
grid_step=85)
self.timer.start()
# Пытаемся найти ориентир на картинке:
self.prev_chunk = current_chunk
npos = self.get_position_by_chunk()
if npos is not None:
self.reserved_geo = npos
if distance_to_target < 35:
self.target_idx += 1
if self.target_idx == len(self.points):
return PilotCommand(stop=True)
distance_to_target = math.sqrt(
(self.points[self.target_idx][0] - self.geo.x) ** 2 +
(self.points[self.target_idx][1] - self.geo.y) ** 2
)
if self.reserved_geo is not None:
self.geo = self.reserved_geo
self.reserved_geo = None
# Вычисляем угол к цели
target_angle = math.atan2(self.points[self.target_idx][1] - self.geo.y, self.points[self.target_idx][0] - self.geo.x)
angle_trajectory = self.geo.angle + math.pi / 2
# print("[ANGLE]", angle_trajectory, "->", target_angle)
# Вычисляем разность углов (направление поворота)
angle_diff = target_angle - angle_trajectory
# Нормализуем разность углов в диапазон [-π, π]
angle_diff %= 2 * math.pi
if angle_diff >= math.pi:
angle_diff -= 2 * math.pi
d_r = max(10, min(35., distance_to_target / 2))
d_a_limit = d_r / 10 * 0.01
command = PilotCommand(
max(min(d_a_limit, angle_diff), -d_a_limit),
d_r, False, self.timer.get_elapsed()
)
self.timer.reset()
return command
def reset_position(self, x: float = 0.0, y: float = 0.0, angle: float = 0.0):
"""Сбрасывает позицию и угол БПЛА"""
self.geo = Geolocation(x, y, 1, angle)