Files
autopilot/autopilot.py
2025-12-19 23:26:25 +03:00

483 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from enum import Enum
from pathlib import Path
import math
import random
import cv2
import numpy as np
from PIL import Image
from timer import Timer
from visualization import VisualizationManager
random.seed(1)
class Pilot:
def __init__(self): pass
def handle(self, image: Image): pass
def act(self) -> tuple[float, float] | None: pass
def get_position(self) -> tuple[float, float]: pass
class PilotCommand:
velocity: float
dangle: float
stop: bool
proccessing_time: float
def __init__(self,
dangle: float = 0,
velocity: float = 100,
stop: bool = False,
processing_time: float = 0.
):
self.dangle = dangle
self.stop = stop
self.velocity = velocity
self.proccessing_time = processing_time
class AutoPilot(Pilot):
# Состояние одометрии
angle: float
x: float # Координата X БПЛА
y: float # Координата Y БПЛА
zoom: float # Коэффициент увеличения
# Ориентиры
points: list[tuple[float,float]] # Координаты
keypoints: list[any] # Ключевые точки ориентиров
target_idx: int # Текущая целевая метка
# Всякие вспомогательные штуки
timer: Timer
prev_image: np.ndarray | None
orb_detector: cv2.ORB
bf_matcher: cv2.BFMatcher
frame_count: int
image_center: tuple # Центр изображения (x, y)
vis_manager: VisualizationManager # Менеджер визуализации (опционально)
# Положение на основе ориентира
reserved_pos: tuple[float, float, float] | None
def __init__(self, points = [], keypoints = [], viz_manager=None):
self.prev_image = None
self.angle = 0.0
self.x = 0.0
self.y = 0.0
self.zoom = 1.0
self.frame_count = 0
self.image_center = (0, 0) # Будет обновлено при получении первого изображения
self.vis_manager = viz_manager # Менеджер визуализации
self.reserved_pos = None
# Пороговые значения качества сопоставления/гомографии
self.min_inliers: int = 12
self.min_inlier_ratio: float = 0.5
self.max_reproj_rmse: float = 3.0
self.min_scale: float = 0.7
self.max_scale: float = 1.4
# Инициализация ORB детектора
self.orb_detector = cv2.ORB_create(
nfeatures=1000,
scaleFactor=1.3,
nlevels=4,
edgeThreshold=19,
patchSize=31,
fastThreshold=15,
scoreType=cv2.ORB_FAST_SCORE # FAST обычно быстрее
)
# Инициализация матчера для сопоставления ключевых точек (kNN + Lowe ratio)
self.bf_matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)
self.points = points
self.keypoints = keypoints
self.target_idx = 0
self.timer = Timer()
def get_position(self) -> tuple[float, float]:
return self.x, self.y
def image_to_numpy(self, image: Image.Image) -> np.ndarray:
"""Конвертирует PIL Image в numpy array для OpenCV"""
return np.array(image)
def detect_and_match_keypoints(self, img1: np.ndarray, img2: np.ndarray):
"""
Обнаруживает ключевые точки ORB и сопоставляет их между двумя изображениями
"""
# Предобработка (повышение контраста и устойчивости)
def preprocess(gray_img: np.ndarray) -> np.ndarray:
clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
return clahe.apply(gray_img)
# В градации серого
g1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY) if len(img1.shape) == 3 else img1
g2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY) if len(img2.shape) == 3 else img2
g1 = preprocess(g1)
g2 = preprocess(g2)
# Обнаружение ключевых точек и дескрипторов
kp1, des1 = self.orb_detector.detectAndCompute(g1, None)
kp2, des2 = self.orb_detector.detectAndCompute(g2, None)
if des1 is None or des2 is None or len(kp1) < 4 or len(kp2) < 4:
return None, None, None, None, None
# kNN сопоставление и тест Лоу
matches_knn = self.bf_matcher.knnMatch(des1, des2, k=2)
good_matches: list[cv2.DMatch] = []
for m_n in matches_knn:
if len(m_n) < 2:
continue
m, n = m_n
if m.distance < 0.75 * n.distance:
good_matches.append(m)
# Дополнительная фильтрация по расстоянию (мягкий порог)
good_matches = sorted(good_matches, key=lambda x: x.distance)
good_matches = [m for m in good_matches if m.distance < 64]
if len(good_matches) < 4:
return None, None, None, None, None
# Получаем центр изображения
height1, width1 = g1.shape[:2]
height2, width2 = g2.shape[:2]
center_x1, center_y1 = width1 // 2, height1 // 2
center_x2, center_y2 = width2 // 2, height2 // 2
# Извлекаем координаты сопоставленных точек и отцентрируем их
src_pts = []
dst_pts = []
for match in good_matches:
# Координаты точки в первом изображении
pt1 = kp1[match.queryIdx].pt
src_pts.append([pt1[0] - center_x1, center_y1 - pt1[1]])
# Координаты точки во втором изображении
pt2 = kp2[match.trainIdx].pt
dst_pts.append([pt2[0] - center_x2, center_y2 - pt2[1]])
# Конвертируем в numpy массивы
src_pts = np.float32(src_pts).reshape(-1, 1, 2)
dst_pts = np.float32(dst_pts).reshape(-1, 1, 2)
return src_pts, dst_pts, good_matches, kp1, kp2
def estimate_transformation_matrix(self, src_pts: np.ndarray, dst_pts: np.ndarray):
"""
Оценивает матрицу трансформации на основе сопоставленных ключевых точек
Точки уже отцентрированы относительно центра изображения
"""
# Используем RANSAC для оценки матрицы гомографии
H, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 1.0)
# RH, rmask = cv2.findHomography(dst_pts, src_pts, cv2.RANSAC, 1.0)
if H is None:
return None
# Маска инлайеров
inliers = int(mask.sum()) if mask is not None else 0
total = int(mask.size) if mask is not None else 0
inlier_ratio = (inliers / total) if total > 0 else 0.0
# Ошибка репроекции по инлайерам (RMSE)
rmse = None
try:
# Проецируем src через H и сравниваем с dst
proj = cv2.perspectiveTransform(src_pts, H)
errs = np.linalg.norm((proj - dst_pts).reshape(-1, 2), axis=1)
if mask is not None and mask.shape[0] == errs.shape[0]:
errs = errs[mask.ravel().astype(bool)]
if errs.size > 0:
rmse = float(np.sqrt(np.mean(errs ** 2)))
except Exception:
rmse = None
# Извлекаем параметры трансформации из матрицы гомографии
# H = [a11 a12 tx]
# [a21 a22 ty]
# [0 0 1 ]
# Масштаб и поворот
a11, a12 = H[0, 0], H[0, 1]
a21, a22 = H[1, 0], H[1, 1]
# Смещение (уже отцентрировано)
tx, ty = -H[0, 2], -H[1, 2]
# print(" [Pilot] translate:", tx, ty)
# Вычисляем угол поворота
angle = -np.arctan2(a21, a11)
# Вычисляем масштаб
scale_x = np.sqrt(a11**2 + a21**2)
scale_y = np.sqrt(a12**2 + a22**2)
scale = (scale_x + scale_y) / 2
return {
'translation': (tx, ty), # Уже отцентрировано
'rotation': angle,
'scale': scale,
'homography': H,
'mask': mask,
'inliers': inliers,
'inliers_ratio': inlier_ratio,
'rmse': rmse
}
def calc_position(
self,
mat: np.ndarray,
x_source: float,
y_source: float,
angle_source: float,
zoom_source: float
) -> tuple[float, float, float] | None:
""" Возвращает новые координаты и поворот на основе матрицы преобразования """
tx, ty = -mat[0, 2], -mat[1, 2]
# Вычисляем угол поворота
rotation = -np.arctan2(mat[1, 0], mat[0, 0])
scale = np.hypot(mat[0, 0], mat[0, 1])
print("Scale: ", scale)
# print("[HOMOGRAPHY]", mat)
# print("[ROTATION]", rotation)
# Координаты уже отцентрированы, поэтому используем их напрямую
dx_meters = tx
dy_meters = ty
angle_global = angle_source + rotation
# Применяем поворот к смещению (учитываем текущий угол БПЛА)
cos_angle = math.cos(angle_global)
sin_angle = math.sin(angle_global)
# Поворачиваем смещение в глобальные координаты
# Обратите внимание: dy_meters инвертирован, так как в изображениях Y направлен вниз
dx_global = dx_meters * cos_angle - dy_meters * sin_angle
dy_global = dx_meters * sin_angle + dy_meters * cos_angle
# Обновляем координаты БПЛА
print("[Matrix Transformation]: ")
print(mat)
zoom = zoom_source / scale
x = x_source + dx_global * zoom
y = y_source + dy_global * zoom
# Нормализуем угол в диапазоне [-π, π]
angle = math.atan2(math.sin(angle_global), math.cos(angle_global))
return x, y, angle, zoom
def update_drone_position(self, transformation_info: dict):
"""
Обновляет позицию и угол БПЛА на основе трансформации изображения
Координаты уже отцентрированы относительно центра изображения
"""
homography = transformation_info['homography']
self.x, self.y, self.angle, self.zoom = self.calc_position(
homography,
self.x, self.y, self.angle, self.zoom
)
if self.reserved_pos is not None:
self.reserved_pos = self.calc_position(
homography, *self.reserved_pos
)
def get_drone_state(self) -> dict:
"""
Возвращает текущее состояние БПЛА
"""
return {
'x': self.x,
'y': self.y,
'angle': self.angle,
'angle_degrees': math.degrees(self.angle),
'frame_count': self.frame_count
}
def get_position_by_chunk(self) -> tuple[float, float, float, float] | None:
# Пытаемся найти ориентир на картинке:
current_image = self.prev_image
landmark_image = cv2.imread(Path('chunks') / f'chunk_{self.target_idx}.png', cv2.IMREAD_COLOR_RGB)
src_pts, dst_pts, matches, kp1, kp2 = self.detect_and_match_keypoints(landmark_image, current_image)
if src_pts is not None and dst_pts is not None and self.vis_manager:
was_enabled = self.timer.enabled
if was_enabled: self.timer.stop()
self.vis_manager.update_chunk_matches(landmark_image, current_image, kp1, kp2, matches)
if was_enabled: self.timer.start()
if src_pts is not None and dst_pts is not None:
# Оцениваем матрицу трансформации
landmark_transform = self.estimate_transformation_matrix(src_pts, dst_pts)
# Если ориентир достоверно найден — скорректируем глобальные координаты и угол
if landmark_transform is not None:
ok_scale = (self.min_scale <= landmark_transform['scale'] <= self.max_scale)
ok_inliers = (landmark_transform.get('inliers', 0) >= self.min_inliers)
ratio = landmark_transform.get('inliers_ratio', 0.0)
ok_ratio = (ratio >= self.min_inlier_ratio)
rmse = landmark_transform.get('rmse', None)
ok_rmse = (rmse is not None and rmse <= self.max_reproj_rmse)
if ok_scale and ok_inliers and ok_ratio and ok_rmse:
# print("[HELP]")
# print("Matrix", landmark_transform['homography'])
# print("Position", self.x, self.y)
# print("Position of point", self.points[self.target_idx])
# print("[PILOT]", rmse, ratio, ok_rmse)
# if False:
# Считаем абсолютную позу относительно координат ориентира
landmark_world_x, landmark_world_y = self.points[self.target_idx]
landmark_angle = 0
landmark_zoom = 1
homography = landmark_transform['homography']
# homography = np.linalg.inv(homography)
print(f" [Pilot] Landmark correction applied (inliers={landmark_transform['inliers']}, ratio={ratio:.2f}, rmse={rmse:.2f})")
return self.calc_position(homography, landmark_world_x, landmark_world_y, landmark_angle, landmark_zoom)
return None
def handle(self, image: Image) -> PilotCommand:
self.timer.start()
self.frame_count += 1
if self.prev_image is None:
self.prev_image = self.image_to_numpy(image)
# Вычисляем центр изображения
height, width = self.prev_image.shape[:2]
self.image_center = (width // 2, height // 2)
self.timer.stop()
return PilotCommand(processing_time=self.timer.get_elapsed())
# Конвертируем текущее изображение
current_image = self.image_to_numpy(image)
# Обновляем центр изображения
height, width = current_image.shape[:2]
self.image_center = (width // 2, height // 2)
# Расстояние до цели
distance_to_target = math.sqrt(
(self.points[self.target_idx][0] - self.x) ** 2 +
(self.points[self.target_idx][1] - self.y) ** 2
)
# Обнаруживаем и сопоставляем ключевые точки
src_pts, dst_pts, matches, kp1, kp2 = self.detect_and_match_keypoints(self.prev_image, current_image)
# Оцениваем смещение БПЛА
if src_pts is not None and dst_pts is not None:
# Оцениваем матрицу трансформации
transformation_info = self.estimate_transformation_matrix(src_pts, dst_pts)
if transformation_info:
# Обновляем позицию и угол БПЛА (одометрия по кадрам)
self.update_drone_position(transformation_info)
self.timer.stop()
# Выводим текущее состояние БПЛА
drone_state = self.get_drone_state()
# print(f" [Pilot] Drone Position: ({drone_state['x']:.2f}, {drone_state['y']:.2f})")
# print(f" [Pilot] Angle: {drone_state['angle_degrees']:.1f}°")
# print(f" [Pilot] Target Index: {self.target_idx}")
# print(f" [Pilot] Target Position: {self.points[self.target_idx]}")
# print(f" [Pilot] Distance: {distance_to_target}")
# Обновляем визуализацию
if self.vis_manager:
# Обновляем сопоставление точек
self.vis_manager.update_matches(
self.prev_image,
current_image,
kp1, kp2, matches,
transformation_info)
mask = transformation_info['mask']
self.vis_manager.update_motion_vectors(
current_image,
kp1, kp2,
np.array(matches)[mask.ravel().astype(bool)])
# Используем новую визуализацию движения точек по сетке
homography_matrix = transformation_info['homography']
self.vis_manager.update_homography_grid(
current_image,
homography_matrix,
grid_step=85)
self.timer.start()
# Пытаемся найти ориентир на картинке:
self.prev_image = current_image
npos = self.get_position_by_chunk()
if npos is not None:
self.reserved_pos = npos
if distance_to_target < 35:
self.target_idx += 1
if self.target_idx == len(self.points):
return PilotCommand(stop=True)
distance_to_target = math.sqrt(
(self.points[self.target_idx][0] - self.x) ** 2 +
(self.points[self.target_idx][1] - self.y) ** 2
)
if self.reserved_pos is not None:
self.x, self.y, self.angle, self.zoom = self.reserved_pos
self.reserved_pos = None
# Вычисляем угол к цели
target_angle = math.atan2(self.points[self.target_idx][1] - self.y, self.points[self.target_idx][0] - self.x)
angle_trajectory = self.angle + math.pi / 2
# print("[ANGLE]", angle_trajectory, "->", target_angle)
# Вычисляем разность углов (направление поворота)
angle_diff = target_angle - angle_trajectory
# Нормализуем разность углов в диапазон [-π, π]
angle_diff %= 2 * math.pi
if angle_diff >= math.pi:
angle_diff -= 2 * math.pi
d_r = max(10, min(75., distance_to_target / 2))
d_a_limit = d_r / 10 * 0.07
command = PilotCommand(
max(min(d_a_limit, angle_diff), -d_a_limit),
d_r, False, self.timer.get_elapsed()
)
self.timer.reset()
return command
def reset_position(self, x: float = 0.0, y: float = 0.0, angle: float = 0.0):
"""Сбрасывает позицию и угол БПЛА"""
self.x = x
self.y = y
self.angle = angle
self.frame_count = 0