340 lines
13 KiB
Python
340 lines
13 KiB
Python
from enum import Enum
|
||
from pathlib import Path
|
||
import math
|
||
import random
|
||
|
||
import cv2
|
||
import numpy as np
|
||
from PIL import Image
|
||
|
||
from timer import Timer
|
||
|
||
from vision_chunk import VisionChunk
|
||
from visualization import VisualizationManager
|
||
from position import Position
|
||
from constants import CAMERA_HEIGHT
|
||
from utility import estimate_transformation_matrix
|
||
|
||
random.seed(1)
|
||
|
||
class Pilot:
|
||
def __init__(self): pass
|
||
def handle(self, image: Image): pass
|
||
def act(self) -> tuple[float, float] | None: pass
|
||
def get_position(self) -> tuple[float, float]: pass
|
||
|
||
class PilotCommand:
|
||
velocity: float
|
||
dangle: float
|
||
stop: bool
|
||
proccessing_time: float
|
||
|
||
def __init__(self,
|
||
dangle: float = 0,
|
||
velocity: float = 100,
|
||
stop: bool = False,
|
||
processing_time: float = 0.
|
||
):
|
||
self.dangle = dangle
|
||
self.stop = stop
|
||
self.velocity = velocity
|
||
self.proccessing_time = processing_time
|
||
|
||
class AutoPilot(Pilot):
|
||
|
||
# Состояние одометрии
|
||
pos: Position
|
||
|
||
chunks: list[VisionChunk] # Ориентиры
|
||
target_idx: int # Текущий ориентир
|
||
|
||
# Всякие вспомогательные штуки
|
||
timer: Timer
|
||
prev_chunk: VisionChunk | None
|
||
frame_count: int
|
||
vis_manager: VisualizationManager # Менеджер визуализации (опционально)
|
||
|
||
# Положение на основе ориентира
|
||
reserved_pos: Position | None
|
||
proccessing_time: float
|
||
|
||
def __init__(self, points = [], chunks = [], viz_manager=None, pixel_ratio: float = 1.):
|
||
self.prev_chunk = None
|
||
self.pos = Position(0, 0, 1, 0, 0, 0)
|
||
self.chunks = chunks
|
||
self.frame_count = 0
|
||
self.vis_manager = viz_manager # Менеджер визуализации
|
||
self.reserved_pos = None
|
||
self.pixel_ratio = pixel_ratio
|
||
|
||
# Пороговые значения качества сопоставления/гомографии
|
||
self.min_inliers: int = 12
|
||
self.min_inlier_ratio: float = 0.5
|
||
self.max_reproj_rmse: float = 3.0
|
||
self.min_scale: float = 0.7
|
||
self.max_scale: float = 1.4
|
||
|
||
self.points = points
|
||
self.target_idx = 0
|
||
|
||
self.timer = Timer()
|
||
self.chunk_points = np.array([[chunk.pos.x, chunk.pos.y] for chunk in self.chunks])
|
||
|
||
def get_position(self) -> tuple[float, float]:
|
||
return self.pos.x, self.pos.y
|
||
|
||
def calculate_optical_flow(self, prev_chunk: VisionChunk, current_chunk: VisionChunk):
|
||
"""
|
||
Вычисляет оптический поток между двумя кадрами
|
||
Возвращает src_pts и dst_pts в отцентрированных координатах
|
||
"""
|
||
prev_gray = prev_chunk.to_cv2_gray()
|
||
current_gray = current_chunk.to_cv2_gray()
|
||
|
||
h, w = prev_gray.shape[:2]
|
||
|
||
# Создаем сетку точек для отслеживания (аналогично вашему step=20)
|
||
step = 20
|
||
grid_points = []
|
||
for y in range(step, h - step, step):
|
||
for x in range(step, w - step, step):
|
||
grid_points.append([x, y])
|
||
|
||
if len(grid_points) < 4:
|
||
return None, None
|
||
|
||
p0 = np.float32(grid_points).reshape(-1, 1, 2)
|
||
|
||
# Параметры для Lucas-Kanade
|
||
lk_params = dict(
|
||
winSize=(11, 11),
|
||
maxLevel=2,
|
||
criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)
|
||
)
|
||
|
||
# Вычисляем разреженный оптический поток
|
||
p1, status, err = cv2.calcOpticalFlowPyrLK(prev_gray, current_gray, p0, None, **lk_params)
|
||
|
||
# Фильтруем только успешно отслеженные точки
|
||
good_mask = err < 12.0 # порог ошибки
|
||
good_old = p0[status == 1 & good_mask]
|
||
good_new = p1[status == 1 & good_mask]
|
||
|
||
if len(good_old) < 4:
|
||
return None, None
|
||
|
||
# Преобразуем в отцентрированные координаты (Y вверх)
|
||
src_pts = np.float32([[x, y] for x, y in good_old]).reshape(-1, 1, 2)
|
||
dst_pts = np.float32([[x, y] for x, y in good_new]).reshape(-1, 1, 2)
|
||
|
||
return src_pts, dst_pts
|
||
|
||
def update_drone_position(self, homography_matrix: np.ndarray):
|
||
"""
|
||
Обновляет позицию и угол БПЛА на основе трансформации изображения
|
||
Координаты уже отцентрированы относительно центра изображения
|
||
"""
|
||
self.pos.iapply(homography_matrix)
|
||
|
||
def get_drone_state(self) -> dict:
|
||
"""
|
||
Возвращает текущее состояние БПЛА
|
||
"""
|
||
return {
|
||
'x': self.pos.x,
|
||
'y': self.pos.y,
|
||
'angle': self.pos.yaw,
|
||
'angle_degrees': math.degrees(self.pos.yaw),
|
||
'frame_count': self.frame_count
|
||
}
|
||
|
||
def get_position_by_chunk(self) -> Position | None:
|
||
cur_pos = np.array([self.pos.x, self.pos.y])
|
||
closest_chunk_idx = ((self.chunk_points - cur_pos) ** 2).sum(1).argmin()
|
||
|
||
current_chunk = self.prev_chunk
|
||
landmark_chunk = self.chunks[closest_chunk_idx]
|
||
|
||
# Краевой случай: отсутствие чанков
|
||
if current_chunk is None or landmark_chunk is None:
|
||
return None
|
||
|
||
src_pts, dst_pts, matches, kp1, kp2 = landmark_chunk.detect_and_match_keypoints(current_chunk)
|
||
|
||
# Визуализация (если нужна)
|
||
if src_pts is not None and dst_pts is not None and self.vis_manager:
|
||
was_enabled = self.timer.enabled
|
||
if was_enabled:
|
||
self.timer.stop()
|
||
self.vis_manager.update_chunk_matches(
|
||
landmark_chunk.to_numpy(),
|
||
current_chunk.to_numpy(),
|
||
kp1, kp2, matches
|
||
)
|
||
if was_enabled:
|
||
self.timer.start()
|
||
|
||
# Краевой случай: нет точек или недостаточно матчей
|
||
if src_pts is None or dst_pts is None:
|
||
return None
|
||
|
||
num_matches = len(src_pts)
|
||
if num_matches < 20:
|
||
return None
|
||
|
||
# Оценка матрицы гомографии
|
||
landmark_transform, mask = estimate_transformation_matrix(src_pts, dst_pts)
|
||
num_inliers = int(np.sum(mask))
|
||
|
||
# Краевой случай: матрица не найдена
|
||
if landmark_transform is None or mask is None:
|
||
return None
|
||
|
||
# === КРИТЕРИИ ПРИНЯТИЯ РЕШЕНИЯ ===
|
||
|
||
# 1. Минимальное количество инлайеров (абсолютное)
|
||
MIN_INLIERS_ABSOLUTE = 6
|
||
if num_inliers < MIN_INLIERS_ABSOLUTE:
|
||
return None
|
||
|
||
# 2. Процент инлайеров от общего числа матчей
|
||
inlier_ratio = num_inliers / num_matches
|
||
MIN_INLIER_RATIO = 0.25 # Минимум 25% инлайеров
|
||
if inlier_ratio < MIN_INLIER_RATIO:
|
||
return None
|
||
|
||
# 3. Проверка качества гомографии (детерминант для выявления вырожденных случаев)
|
||
det = np.linalg.det(landmark_transform[:2, :2])
|
||
# Детерминант должен быть близок к 1 (без сильного масштабирования)
|
||
if abs(det) < 0.1 or abs(det) > 10.0:
|
||
return None
|
||
|
||
# 4. Проверка на валидность перспективного преобразования
|
||
# Элементы третьей строки не должны быть слишком большими
|
||
if abs(landmark_transform[2, 0]) > 0.01 or abs(landmark_transform[2, 1]) > 0.01:
|
||
return None
|
||
|
||
# 5. Дополнительная проверка: средняя ошибка репроекции для инлайеров
|
||
inlier_src = src_pts[mask.ravel() == 1]
|
||
inlier_dst = dst_pts[mask.ravel() == 1]
|
||
|
||
# Преобразуем точки через найденную гомографию
|
||
transformed_pts = cv2.perspectiveTransform(inlier_src, landmark_transform)
|
||
|
||
# Вычисляем ошибку репроекции
|
||
reprojection_errors = np.sqrt(np.sum((transformed_pts - inlier_dst) ** 2, axis=2))
|
||
mean_error = np.mean(reprojection_errors)
|
||
|
||
MAX_MEAN_REPROJECTION_ERROR = 1.0 # пиксели
|
||
if mean_error > MAX_MEAN_REPROJECTION_ERROR:
|
||
return None
|
||
|
||
# 6. Проверка стабильности: если слишком много хороших совпадений, но мало инлайеров - подозрительно
|
||
if num_matches > 50 and inlier_ratio < 0.15:
|
||
return None
|
||
|
||
# === ВСЕ ПРОВЕРКИ ПРОЙДЕНЫ ===
|
||
print("[INFO]: Landmark Chunk Correction Applied")
|
||
return landmark_chunk.pos.apply(landmark_transform)
|
||
|
||
|
||
def handle(self, current_chunk: VisionChunk) -> PilotCommand:
|
||
self.timer.start()
|
||
self.frame_count += 1
|
||
|
||
if self.prev_chunk is None:
|
||
self.prev_chunk = current_chunk
|
||
self.timer.stop()
|
||
return PilotCommand(processing_time=self.timer.get_elapsed())
|
||
|
||
# Вычисляем оптический поток для покадрового сравнения
|
||
matching_timer = Timer()
|
||
matching_timer.start()
|
||
# src_pts, dst_pts = self.calculate_optical_flow(self.prev_chunk, current_chunk)
|
||
src_pts, dst_pts, _, _, _ = self.prev_chunk.detect_and_match_keypoints(current_chunk)
|
||
matching_timer.stop()
|
||
print(f"Matching calculating: {matching_timer.get_elapsed() * 1000:.2f} ms")
|
||
|
||
# Оцениваем смещение БПЛА
|
||
if src_pts is not None and dst_pts is not None:
|
||
# Оцениваем матрицу трансформации
|
||
|
||
matrix_estimation_timer = Timer()
|
||
matrix_estimation_timer.start()
|
||
homography_matrix, _ = estimate_transformation_matrix(src_pts, dst_pts)
|
||
matrix_estimation_timer.stop()
|
||
print(f"Transformation matrix updating: {matrix_estimation_timer.get_elapsed() * 1000:.2f} ms")
|
||
|
||
if homography_matrix is not None:
|
||
self.update_drone_position(homography_matrix)
|
||
self.timer.stop()
|
||
|
||
# Обновляем визуализацию
|
||
if self.vis_manager:
|
||
# Используем визуализацию движения точек по сетке
|
||
self.vis_manager.update_homography_grid(
|
||
current_chunk.to_numpy(),
|
||
homography_matrix,
|
||
grid_step=35)
|
||
|
||
self.timer.start()
|
||
|
||
chunk_timer = Timer()
|
||
chunk_timer.start()
|
||
|
||
# Пытаемся найти ориентир на картинке:
|
||
self.prev_chunk = current_chunk
|
||
pos_by_chunk = self.get_position_by_chunk()
|
||
if pos_by_chunk is not None:
|
||
self.pos = pos_by_chunk
|
||
|
||
chunk_timer.stop()
|
||
print(f"Chunk timer: {chunk_timer.get_elapsed() * 1000:.2f} ms")
|
||
command = self.make_command()
|
||
self.timer.reset()
|
||
return command
|
||
|
||
def make_command(self) -> PilotCommand:
|
||
# Расстояние до цели
|
||
distance_to_target = math.sqrt(
|
||
(self.points[self.target_idx][0] - self.pos.x) ** 2 +
|
||
(self.points[self.target_idx][1] - self.pos.y) ** 2
|
||
)
|
||
|
||
if distance_to_target < 35:
|
||
self.target_idx += 1
|
||
|
||
if self.target_idx == len(self.points):
|
||
return PilotCommand(stop=True)
|
||
|
||
distance_to_target = math.sqrt(
|
||
(self.points[self.target_idx][0] - self.pos.x) ** 2 +
|
||
(self.points[self.target_idx][1] - self.pos.y) ** 2
|
||
)
|
||
|
||
# Вычисляем угол к цели
|
||
target_angle = math.atan2(self.points[self.target_idx][1] - self.pos.y, self.points[self.target_idx][0] - self.pos.x)
|
||
|
||
angle_trajectory = self.pos.yaw + math.pi / 2
|
||
|
||
# Вычисляем разность углов (направление поворота)
|
||
angle_diff = target_angle - angle_trajectory
|
||
|
||
# Нормализуем разность углов в диапазон [-π, π]
|
||
angle_diff %= 2 * math.pi
|
||
if angle_diff >= math.pi:
|
||
angle_diff -= 2 * math.pi
|
||
|
||
d_r = max(5, min(10., distance_to_target / 2 * self.pixel_ratio))
|
||
d_a_limit = np.radians(5)
|
||
|
||
command = PilotCommand(
|
||
max(min(d_a_limit, angle_diff), -d_a_limit),
|
||
d_r, False, self.timer.get_elapsed()
|
||
)
|
||
return command
|
||
|
||
def reset_position(self, x: float = 0.0, y: float = 0.0, angle: float = 0.0):
|
||
"""Сбрасывает позицию и угол БПЛА"""
|
||
self.pos = Position(x, y, 1, angle)
|