Files
autopilot/autopilot.py

294 lines
12 KiB
Python

from enum import Enum
from pathlib import Path
import math
import random
import cv2
import numpy as np
from PIL import Image
from timer import Timer
from vision_chunk import VisionChunk
from visualization import VisualizationManager
from position import Position
from constants import CAMERA_HEIGHT
from utility import estimate_transformation_matrix
random.seed(1)
class Pilot:
def __init__(self): pass
def handle(self, image: Image): pass
def act(self) -> tuple[float, float] | None: pass
def get_position(self) -> tuple[float, float]: pass
class PilotCommand:
velocity: float
dangle: float
stop: bool
proccessing_time: float
def __init__(self,
dangle: float = 0,
velocity: float = 100,
stop: bool = False,
processing_time: float = 0.
):
self.dangle = dangle
self.stop = stop
self.velocity = velocity
self.proccessing_time = processing_time
class AutoPilot(Pilot):
# Состояние одометрии
pos: Position
chunks: list[VisionChunk] # Ориентиры
target_idx: int # Текущий ориентир
# Всякие вспомогательные штуки
timer: Timer
prev_chunk: VisionChunk | None
frame_count: int
vis_manager: VisualizationManager # Менеджер визуализации (опционально)
# Положение на основе ориентира
reserved_pos: Position | None
def __init__(self, points = [], chunks = [], viz_manager=None):
self.prev_chunk = None
self.pos = Position(0, 0, 1, 0, 0, 0)
self.chunks = chunks
self.frame_count = 0
self.vis_manager = viz_manager # Менеджер визуализации
self.reserved_pos = None
# Пороговые значения качества сопоставления/гомографии
self.min_inliers: int = 12
self.min_inlier_ratio: float = 0.5
self.max_reproj_rmse: float = 3.0
self.min_scale: float = 0.7
self.max_scale: float = 1.4
self.points = points
self.target_idx = 0
self.timer = Timer()
def get_position(self) -> tuple[float, float]:
return self.pos.x, self.pos.y
def calculate_optical_flow(self, prev_chunk: VisionChunk, current_chunk: VisionChunk):
"""
Вычисляет оптический поток между двумя кадрами
Возвращает src_pts и dst_pts в отцентрированных координатах
"""
prev_gray = prev_chunk.to_cv2_gray()
current_gray = current_chunk.to_cv2_gray()
h, w = prev_gray.shape[:2]
# Создаем сетку точек для отслеживания (аналогично вашему step=20)
step = 35
grid_points = []
for y in range(step, h - step, step):
for x in range(step, w - step, step):
grid_points.append([x, y])
if len(grid_points) < 4:
return None, None
p0 = np.float32(grid_points).reshape(-1, 1, 2)
# Параметры для Lucas-Kanade
lk_params = dict(
winSize=(11, 11),
maxLevel=2,
criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)
)
# Вычисляем разреженный оптический поток
p1, status, err = cv2.calcOpticalFlowPyrLK(prev_gray, current_gray, p0, None, **lk_params)
# Фильтруем только успешно отслеженные точки
good_mask = err < 12.0 # порог ошибки
good_old = p0[status == 1 & good_mask]
good_new = p1[status == 1 & good_mask]
if len(good_old) < 4:
return None, None
# Преобразуем в отцентрированные координаты (Y вверх)
src_pts = np.float32([[x, y] for x, y in good_old]).reshape(-1, 1, 2)
dst_pts = np.float32([[x, y] for x, y in good_new]).reshape(-1, 1, 2)
return src_pts, dst_pts
def update_drone_position(self, homography_matrix: np.ndarray):
"""
Обновляет позицию и угол БПЛА на основе трансформации изображения
Координаты уже отцентрированы относительно центра изображения
"""
self.pos.iapply(homography_matrix)
if self.reserved_pos is not None:
self.reserved_pos.iapply(homography_matrix)
def get_drone_state(self) -> dict:
"""
Возвращает текущее состояние БПЛА
"""
return {
'x': self.pos.x,
'y': self.pos.y,
'angle': self.pos.yaw,
'angle_degrees': math.degrees(self.pos.yaw),
'frame_count': self.frame_count
}
def get_position_by_chunk(self) -> Position | None:
# Пытаемся найти ориентир на картинке:
current_chunk = self.prev_chunk
landmark_chunk = self.chunks[self.target_idx]
src_pts, dst_pts, matches, kp1, kp2 = landmark_chunk.detect_and_match_keypoints(current_chunk)
if src_pts is not None and dst_pts is not None and self.vis_manager:
was_enabled = self.timer.enabled
if was_enabled: self.timer.stop()
self.vis_manager.update_chunk_matches(landmark_chunk.to_numpy(), current_chunk.to_numpy(), kp1, kp2, matches)
if was_enabled: self.timer.start()
if src_pts is not None and dst_pts is not None:
# Оцениваем матрицу трансформации
landmark_transform = self.estimate_transformation_matrix(src_pts, dst_pts)
# Если ориентир достоверно найден — скорректируем глобальные координаты и угол
if landmark_transform is not None:
ok_scale = (self.min_scale <= landmark_transform['scale'] <= self.max_scale)
ok_inliers = (landmark_transform.get('inliers', 0) >= self.min_inliers)
ratio = landmark_transform.get('inliers_ratio', 0.0)
ok_ratio = (ratio >= self.min_inlier_ratio)
rmse = landmark_transform.get('rmse', None)
ok_rmse = (rmse is not None and rmse <= self.max_reproj_rmse)
if ok_scale and ok_inliers and ok_ratio and ok_rmse:
# print("[HELP]")
# print("Matrix", landmark_transform['homography'])
# print("Position", self.x, self.y)
# print("Position of point", self.points[self.target_idx])
# print("[PILOT]", rmse, ratio, ok_rmse)
# if False:
# Считаем абсолютную позу относительно координат ориентира
landmark_world_x, landmark_world_y = self.points[self.target_idx]
landmark = Position(landmark_world_x, landmark_world_y, 1, 0)
homography = landmark_transform['homography']
# homography = np.linalg.inv(homography)
print(f" [Pilot] Landmark correction applied (inliers={landmark_transform['inliers']}, ratio={ratio:.2f}, rmse={rmse:.2f})")
return landmark @ homography
return None
def handle(self, current_chunk: VisionChunk) -> PilotCommand:
self.timer.start()
self.frame_count += 1
if self.prev_chunk is None:
self.prev_chunk = current_chunk
self.timer.stop()
return PilotCommand(processing_time=self.timer.get_elapsed())
# Вычисляем оптический поток для покадрового сравнения
matching_timer = Timer()
matching_timer.start()
# src_pts, dst_pts = self.calculate_optical_flow(self.prev_chunk, current_chunk)
src_pts, dst_pts, _, _, _ = self.prev_chunk.detect_and_match_keypoints(current_chunk)
matching_timer.stop()
print(f"Matching calculating: {matching_timer.get_elapsed() * 1000:.2f} ms")
# Оцениваем смещение БПЛА
if src_pts is not None and dst_pts is not None:
# Оцениваем матрицу трансформации
matrix_estimation_timer = Timer()
matrix_estimation_timer.start()
homography_matrix = estimate_transformation_matrix(src_pts, dst_pts)
matrix_estimation_timer.stop()
print(f"Transformation matrix updating: {matrix_estimation_timer.get_elapsed() * 1000:.2f} ms")
if homography_matrix is not None:
self.update_drone_position(homography_matrix)
self.timer.stop()
# Обновляем визуализацию
if self.vis_manager:
# Используем визуализацию движения точек по сетке
self.vis_manager.update_homography_grid(
current_chunk.to_numpy(),
homography_matrix,
grid_step=35)
self.timer.start()
chunk_timer = Timer()
chunk_timer.start()
# Пытаемся найти ориентир на картинке:
self.prev_chunk = current_chunk
# npos = self.get_position_by_chunk()
# if npos is not None:
# self.reserved_pos = npos
chunk_timer.stop()
print(f"Chunk timer: {chunk_timer.get_elapsed() * 1000:.2f} ms")
command = self.make_command()
self.timer.reset()
return command
def make_command(self) -> PilotCommand:
# Расстояние до цели
distance_to_target = math.sqrt(
(self.points[self.target_idx][0] - self.pos.x) ** 2 +
(self.points[self.target_idx][1] - self.pos.y) ** 2
)
if distance_to_target < 35:
self.target_idx += 1
if self.target_idx == len(self.points):
return PilotCommand(stop=True)
distance_to_target = math.sqrt(
(self.points[self.target_idx][0] - self.pos.x) ** 2 +
(self.points[self.target_idx][1] - self.pos.y) ** 2
)
if self.reserved_pos is not None:
self.pos = self.reserved_pos
self.reserved_pos = None
# Вычисляем угол к цели
target_angle = math.atan2(self.points[self.target_idx][1] - self.pos.y, self.points[self.target_idx][0] - self.pos.x)
angle_trajectory = self.pos.yaw + math.pi / 2
# Вычисляем разность углов (направление поворота)
angle_diff = target_angle - angle_trajectory
# Нормализуем разность углов в диапазон [-π, π]
angle_diff %= 2 * math.pi
if angle_diff >= math.pi:
angle_diff -= 2 * math.pi
d_r = max(10, min(35., distance_to_target / 2))
d_a_limit = d_r / 10 * 0.01
command = PilotCommand(
max(min(d_a_limit, angle_diff), -d_a_limit),
d_r, False, self.timer.get_elapsed()
)
return command
def reset_position(self, x: float = 0.0, y: float = 0.0, angle: float = 0.0):
"""Сбрасывает позицию и угол БПЛА"""
self.pos = Position(x, y, 1, angle)