Files
autopilot/autopilot.py
2026-02-05 21:49:03 +03:00

389 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from enum import Enum
from pathlib import Path
import math
import random
import constants
import cv2
import numpy as np
from PIL import Image
from timer import Timer
from vision_chunk import VisionChunk
from visualization import VisualizationManager
from position import Position
from constants import CAMERA_HEIGHT
from utility import estimate_transformation_matrix
random.seed(1)
class Pilot:
def __init__(self): pass
def handle(self, image: Image): pass
def act(self) -> tuple[float, float] | None: pass
def get_position(self) -> tuple[float, float]: pass
class PilotCommand:
velocity: float
dangle: float
stop: bool
proccessing_time: float
def __init__(self,
dangle: float = 0,
velocity: float = 100,
stop: bool = False,
processing_time: float = 0.
):
self.dangle = dangle
self.stop = stop
self.velocity = velocity
self.proccessing_time = processing_time
class AutoPilot(Pilot):
# Состояние одометрии
pos: Position
chunks: list[VisionChunk] # Ориентиры
target_idx: int # Текущий ориентир
# Всякие вспомогательные штуки
timer: Timer
prev_chunk: VisionChunk | None
frame_count: int
vis_manager: VisualizationManager # Менеджер визуализации (опционально)
# Положение на основе ориентира
reserved_pos: Position | None
proccessing_time: float
def __init__(self, points = [], chunks = [], viz_manager=None, pixel_ratio: float = 1.):
self.prev_chunk = None
self.pos = Position(0, 0, 1, 0, 0, 0)
self.chunks = chunks
self.frame_count = 0
self.vis_manager = viz_manager # Менеджер визуализации
self.reserved_pos = None
self.pixel_ratio = pixel_ratio
# Пороговые значения качества сопоставления/гомографии
self.min_inliers: int = 12
self.min_inlier_ratio: float = 0.5
self.max_reproj_rmse: float = 3.0
self.min_scale: float = 0.7
self.max_scale: float = 1.4
self.points = points
self.target_idx = 0
self.timer = Timer()
self.chunk_points = np.array([[chunk.pos.x, chunk.pos.y] for chunk in self.chunks])
def get_position(self) -> tuple[float, float]:
return self.pos.x, self.pos.y
def calculate_optical_flow(self, prev_chunk: VisionChunk, current_chunk: VisionChunk):
"""
Вычисляет оптический поток между двумя кадрами
Возвращает src_pts и dst_pts в отцентрированных координатах
"""
prev_gray = prev_chunk.to_cv2_gray()
current_gray = current_chunk.to_cv2_gray()
h, w = prev_gray.shape[:2]
# Создаем сетку точек для отслеживания (аналогично вашему step=20)
step = 20
grid_points = []
for y in range(step, h - step, step):
for x in range(step, w - step, step):
grid_points.append([x, y])
if len(grid_points) < 4:
return None, None
p0 = np.float32(grid_points).reshape(-1, 1, 2)
# Параметры для Lucas-Kanade
lk_params = dict(
winSize=(11, 11),
maxLevel=2,
criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)
)
# Вычисляем разреженный оптический поток
p1, status, err = cv2.calcOpticalFlowPyrLK(prev_gray, current_gray, p0, None, **lk_params)
# Фильтруем только успешно отслеженные точки
good_mask = err < 12.0 # порог ошибки
good_old = p0[status == 1 & good_mask]
good_new = p1[status == 1 & good_mask]
if len(good_old) < 4:
return None, None
# Преобразуем в отцентрированные координаты (Y вверх)
src_pts = np.float32([[x, y] for x, y in good_old]).reshape(-1, 1, 2)
dst_pts = np.float32([[x, y] for x, y in good_new]).reshape(-1, 1, 2)
return src_pts, dst_pts
def update_drone_position(self, homography_matrix: np.ndarray):
"""
Обновляет позицию и угол БПЛА на основе трансформации изображения
Координаты уже отцентрированы относительно центра изображения
"""
self.pos.iapply(homography_matrix)
def get_drone_state(self) -> dict:
"""
Возвращает текущее состояние БПЛА
"""
return {
'x': self.pos.x,
'y': self.pos.y,
'angle': self.pos.yaw,
'angle_degrees': math.degrees(self.pos.yaw),
'frame_count': self.frame_count
}
def get_position_by_chunk(self) -> Position | None:
landmark_timer = Timer()
landmark_timer.start()
cur_pos = np.array([self.pos.x, self.pos.y])
closest_chunk_idx = ((self.chunk_points - cur_pos) ** 2).sum(1).argmin()
current_chunk = self.prev_chunk
landmark_chunk = self.chunks[closest_chunk_idx]
if constants.DEBUG_FPS:
print(f"[LANDMARK]: Closest chunk finding: {landmark_timer.loop() * 1000:.2f} ms")
# Краевой случай: отсутствие чанков
if current_chunk is None or landmark_chunk is None:
return None
landmark_timer.start()
src_pts, dst_pts, matches, kp1, kp2 = landmark_chunk.detect_and_match_keypoints(current_chunk)
if constants.DEBUG_FPS:
print(f"[LANDMARK]: detect and match keypoints: {landmark_timer.loop() * 1000:.2f} ms")
landmark_timer.stop()
# Визуализация (если нужна)
if src_pts is not None and dst_pts is not None and self.vis_manager:
was_enabled = self.timer.enabled
if was_enabled:
self.timer.stop()
self.vis_manager.update_chunk_matches(
landmark_chunk.to_numpy(),
current_chunk.to_numpy(),
kp1, kp2, matches
)
if was_enabled:
self.timer.start()
landmark_timer.start()
# Краевой случай: нет точек или недостаточно матчей
if src_pts is None or dst_pts is None:
return None
num_matches = len(src_pts)
if num_matches < 20:
return None
# Оценка матрицы гомографии
landmark_timer.loop()
landmark_transform, mask = estimate_transformation_matrix(src_pts, dst_pts)
num_inliers = int(np.sum(mask))
if constants.DEBUG_FPS:
print(f"[LANDMARK]: matrix estimation: {landmark_timer.loop() * 1000:.2f} ms")
# Краевой случай: матрица не найдена
if landmark_transform is None or mask is None:
return None
# === КРИТЕРИИ ПРИНЯТИЯ РЕШЕНИЯ ===
# 1. Минимальное количество инлайеров (абсолютное)
MIN_INLIERS_ABSOLUTE = 6
if num_inliers < MIN_INLIERS_ABSOLUTE:
return None
# 2. Процент инлайеров от общего числа матчей
inlier_ratio = num_inliers / num_matches
if constants.DEBUG_LANDMARK:
print("[LANDMARK]: inlier_ratio=", inlier_ratio)
MIN_INLIER_RATIO = 0.6
if inlier_ratio < MIN_INLIER_RATIO:
return None
# 3. Проверка качества гомографии (детерминант для выявления вырожденных случаев)
det = np.linalg.det(landmark_transform[:2, :2])
# Детерминант должен быть близок к 1 (без сильного масштабирования)
if abs(det) < 0.1 or abs(det) > 10.0:
return None
# 4. Проверка на валидность перспективного преобразования
# Элементы третьей строки не должны быть слишком большими
if abs(landmark_transform[2, 0]) > 0.01 or abs(landmark_transform[2, 1]) > 0.01:
return None
# 5. Дополнительная проверка: средняя ошибка репроекции для инлайеров
inlier_src = src_pts[mask.ravel() == 1]
inlier_dst = dst_pts[mask.ravel() == 1]
# Преобразуем точки через найденную гомографию
transformed_pts = cv2.perspectiveTransform(inlier_src, landmark_transform)
# Вычисляем ошибку репроекции
reprojection_errors = np.sqrt(np.sum((transformed_pts - inlier_dst) ** 2, axis=2))
mean_error = np.mean(reprojection_errors)
MAX_MEAN_REPROJECTION_ERROR = 1.1 # пиксели
if constants.DEBUG_LANDMARK:
print("[LANDMARK]: Mean_error=", mean_error)
if mean_error > MAX_MEAN_REPROJECTION_ERROR:
return None
# 6. Проверка стабильности: если слишком много хороших совпадений, но мало инлайеров - подозрительно
if num_matches > 50 and inlier_ratio < 0.15:
return None
# === ВСЕ ПРОВЕРКИ ПРОЙДЕНЫ ===
print("[LANDMARK]: Correction Applied")
if constants.DEBUG_FPS:
print(f"[LANDMARK]: time: {landmark_timer.get_elapsed() * 1000:.2f} ms")
return landmark_chunk.pos.apply(landmark_transform)
def handle(self, current_chunk: VisionChunk) -> PilotCommand:
self.timer.start()
self.frame_count += 1
if self.prev_chunk is None:
self.prev_chunk = current_chunk
self.timer.stop()
return PilotCommand(processing_time=self.timer.get_elapsed())
# Вычисляем оптический поток для покадрового сравнения
matching_timer = Timer()
matching_timer.start()
src_pts, dst_pts = self.calculate_optical_flow(self.prev_chunk, current_chunk)
# src_pts, dst_pts, _, _, _ = self.prev_chunk.detect_and_match_keypoints(current_chunk)
matching_timer.stop()
print(f"Matching calculating: {matching_timer.get_elapsed() * 1000:.2f} ms")
# Оцениваем смещение БПЛА
if src_pts is not None and dst_pts is not None:
# Оцениваем матрицу трансформации
matrix_estimation_timer = Timer()
matrix_estimation_timer.start()
homography_matrix, _ = estimate_transformation_matrix(src_pts, dst_pts)
matrix_estimation_timer.stop()
print(f"Transformation matrix updating: {matrix_estimation_timer.get_elapsed() * 1000:.2f} ms")
if homography_matrix is not None:
self.update_drone_position(homography_matrix)
self.timer.stop()
# Обновляем визуализацию
if self.vis_manager:
# Используем визуализацию движения точек по сетке
self.vis_manager.update_homography_grid(
current_chunk.to_numpy(),
homography_matrix,
grid_step=35)
self.timer.start()
# Пытаемся найти ориентир на картинке:
self.prev_chunk = current_chunk
# Для улучшения среднего FPS
if self.frame_count % 5 == 0:
pos_by_chunk = self.get_position_by_chunk()
if pos_by_chunk is not None:
self.pos = pos_by_chunk
command = self.make_command()
self.timer.reset()
return command
def make_command(self) -> PilotCommand:
# Расстояние до цели
distance_to_target = math.sqrt(
(self.points[self.target_idx][0] - self.pos.x) ** 2 +
(self.points[self.target_idx][1] - self.pos.y) ** 2
) * self.pixel_ratio
if distance_to_target < 35:
self.target_idx += 1
if self.target_idx == len(self.points):
return PilotCommand(stop=True)
distance_to_target = math.sqrt(
(self.points[self.target_idx][0] - self.pos.x) ** 2 +
(self.points[self.target_idx][1] - self.pos.y) ** 2
)
angle_trajectory = self.pos.yaw + math.pi / 2
# Проверка на слепую зону
R = 120
blind = np.array([
[
self.pos.x * self.pixel_ratio + R * np.cos(angle_trajectory - np.pi / 2),
self.pos.y * self.pixel_ratio + R * np.sin(angle_trajectory - np.pi / 2),
],
[
self.pos.x * self.pixel_ratio + R * np.cos(angle_trajectory + np.pi / 2),
self.pos.y * self.pixel_ratio + R * np.sin(angle_trajectory + np.pi / 2),
]
])
blind -= self.points[self.target_idx] * self.pixel_ratio
blind = np.hypot(blind[:, 0], blind[:, 1])
print("R: ", blind)
if np.min(blind) < R:
return PilotCommand(0, 10, False, self.timer.get_elapsed())
# Вычисляем угол к цели
target_angle = math.atan2(self.points[self.target_idx][1] - self.pos.y, self.points[self.target_idx][0] - self.pos.x)
# Вычисляем разность углов (направление поворота)
angle_diff = target_angle - angle_trajectory
# Нормализуем разность углов в диапазон [-π, π]
angle_diff %= 2 * math.pi
if angle_diff >= math.pi:
angle_diff -= 2 * math.pi
d_r = max(5, min(10., distance_to_target / 2))
d_a_limit = np.radians(5)
command = PilotCommand(
max(min(d_a_limit, angle_diff), -d_a_limit),
d_r, False, self.timer.get_elapsed()
)
return command
def reset_position(self, x: float = 0.0, y: float = 0.0, angle: float = 0.0):
"""Сбрасывает позицию и угол БПЛА"""
self.pos = Position(x, y, 1, angle)