443 lines
19 KiB
Python
443 lines
19 KiB
Python
from enum import Enum
|
||
from pathlib import Path
|
||
import math
|
||
import random
|
||
|
||
import cv2
|
||
import numpy as np
|
||
from PIL import Image
|
||
|
||
from visualization import VisualizationManager
|
||
|
||
random.seed(1)
|
||
|
||
class Pilot:
|
||
def __init__(self): pass
|
||
def handle(self, image: Image): pass
|
||
def act(self) -> tuple[float, float] | None: pass
|
||
def get_position(self) -> tuple[float, float]: pass
|
||
|
||
class PilotCommand:
|
||
velocity: float
|
||
dangle: float
|
||
stop: bool
|
||
|
||
def __init__(self, dangle: float = 0, velocity: float = 100, stop: bool = False):
|
||
self.dangle = dangle
|
||
self.stop = stop
|
||
self.velocity = velocity
|
||
|
||
class AutoPilot(Pilot):
|
||
|
||
# Состояние одометрии
|
||
angle: float
|
||
x: float # Координата X БПЛА
|
||
y: float # Координата Y БПЛА
|
||
|
||
# Ориентиры
|
||
points: list[tuple[float,float]] # Координаты
|
||
keypoints: list[any] # Ключевые точки ориентиров
|
||
target_idx: int # Текущая целевая метка
|
||
|
||
# Всякие вспомогательные штуки
|
||
prev_image: np.ndarray | None
|
||
orb_detector: cv2.ORB
|
||
bf_matcher: cv2.BFMatcher
|
||
frame_count: int
|
||
image_center: tuple # Центр изображения (x, y)
|
||
vis_manager: VisualizationManager # Менеджер визуализации (опционально)
|
||
|
||
def __init__(self, points = [], keypoints = [], viz_manager=None):
|
||
self.prev_image = None
|
||
self.angle = 0.0
|
||
self.x = 0.0
|
||
self.y = 0.0
|
||
self.frame_count = 0
|
||
self.image_center = (0, 0) # Будет обновлено при получении первого изображения
|
||
self.vis_manager = viz_manager # Менеджер визуализации
|
||
self.reserved_pos = None
|
||
|
||
# Пороговые значения качества сопоставления/гомографии
|
||
self.min_inliers: int = 12
|
||
self.min_inlier_ratio: float = 0.5
|
||
self.max_reproj_rmse: float = 3.0
|
||
self.min_scale: float = 0.7
|
||
self.max_scale: float = 1.4
|
||
|
||
# Инициализация ORB детектора
|
||
self.orb_detector = cv2.ORB_create(
|
||
nfeatures=1800,
|
||
scaleFactor=1.3,
|
||
nlevels=4,
|
||
edgeThreshold=19,
|
||
patchSize=31,
|
||
fastThreshold=15,
|
||
scoreType=cv2.ORB_FAST_SCORE # FAST обычно быстрее
|
||
)
|
||
|
||
# Инициализация матчера для сопоставления ключевых точек (kNN + Lowe ratio)
|
||
self.bf_matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)
|
||
|
||
self.points = points
|
||
self.keypoints = keypoints
|
||
self.target_idx = 0
|
||
|
||
def get_position(self) -> tuple[float, float]:
|
||
return self.x, self.y
|
||
|
||
def image_to_numpy(self, image: Image.Image) -> np.ndarray:
|
||
"""Конвертирует PIL Image в numpy array для OpenCV"""
|
||
return np.array(image)
|
||
|
||
def detect_and_match_keypoints(self, img1: np.ndarray, img2: np.ndarray):
|
||
"""
|
||
Обнаруживает ключевые точки ORB и сопоставляет их между двумя изображениями
|
||
"""
|
||
# Предобработка (повышение контраста и устойчивости)
|
||
def preprocess(gray_img: np.ndarray) -> np.ndarray:
|
||
clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
|
||
return clahe.apply(gray_img)
|
||
|
||
# В градации серого
|
||
g1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY) if len(img1.shape) == 3 else img1
|
||
g2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY) if len(img2.shape) == 3 else img2
|
||
g1 = preprocess(g1)
|
||
g2 = preprocess(g2)
|
||
|
||
# Обнаружение ключевых точек и дескрипторов
|
||
kp1, des1 = self.orb_detector.detectAndCompute(g1, None)
|
||
kp2, des2 = self.orb_detector.detectAndCompute(g2, None)
|
||
|
||
if des1 is None or des2 is None or len(kp1) < 4 or len(kp2) < 4:
|
||
return None, None, None, None, None
|
||
|
||
# kNN сопоставление и тест Лоу
|
||
matches_knn = self.bf_matcher.knnMatch(des1, des2, k=2)
|
||
good_matches: list[cv2.DMatch] = []
|
||
for m_n in matches_knn:
|
||
if len(m_n) < 2:
|
||
continue
|
||
m, n = m_n
|
||
if m.distance < 0.75 * n.distance:
|
||
good_matches.append(m)
|
||
|
||
# Дополнительная фильтрация по расстоянию (мягкий порог)
|
||
good_matches = sorted(good_matches, key=lambda x: x.distance)
|
||
good_matches = [m for m in good_matches if m.distance < 64]
|
||
|
||
if len(good_matches) < 4:
|
||
return None, None, None, None, None
|
||
|
||
# Получаем центр изображения
|
||
height1, width1 = g1.shape[:2]
|
||
height2, width2 = g2.shape[:2]
|
||
center_x1, center_y1 = width1 // 2, height1 // 2
|
||
center_x2, center_y2 = width2 // 2, height2 // 2
|
||
|
||
# Извлекаем координаты сопоставленных точек и отцентрируем их
|
||
src_pts = []
|
||
dst_pts = []
|
||
|
||
for match in good_matches:
|
||
# Координаты точки в первом изображении
|
||
pt1 = kp1[match.queryIdx].pt
|
||
src_pts.append([pt1[0] - center_x1, center_y1 - pt1[1]])
|
||
|
||
# Координаты точки во втором изображении
|
||
pt2 = kp2[match.trainIdx].pt
|
||
dst_pts.append([pt2[0] - center_x2, center_y2 - pt2[1]])
|
||
|
||
# Конвертируем в numpy массивы
|
||
src_pts = np.float32(src_pts).reshape(-1, 1, 2)
|
||
dst_pts = np.float32(dst_pts).reshape(-1, 1, 2)
|
||
|
||
return src_pts, dst_pts, good_matches, kp1, kp2
|
||
|
||
def estimate_transformation_matrix(self, src_pts: np.ndarray, dst_pts: np.ndarray):
|
||
"""
|
||
Оценивает матрицу трансформации на основе сопоставленных ключевых точек
|
||
Точки уже отцентрированы относительно центра изображения
|
||
"""
|
||
# Используем RANSAC для оценки матрицы гомографии
|
||
H, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 1.0)
|
||
# RH, rmask = cv2.findHomography(dst_pts, src_pts, cv2.RANSAC, 1.0)
|
||
|
||
if H is None:
|
||
return None
|
||
|
||
# Маска инлайеров
|
||
inliers = int(mask.sum()) if mask is not None else 0
|
||
total = int(mask.size) if mask is not None else 0
|
||
inlier_ratio = (inliers / total) if total > 0 else 0.0
|
||
|
||
# Ошибка репроекции по инлайерам (RMSE)
|
||
rmse = None
|
||
try:
|
||
# Проецируем src через H и сравниваем с dst
|
||
proj = cv2.perspectiveTransform(src_pts, H)
|
||
errs = np.linalg.norm((proj - dst_pts).reshape(-1, 2), axis=1)
|
||
if mask is not None and mask.shape[0] == errs.shape[0]:
|
||
errs = errs[mask.ravel().astype(bool)]
|
||
if errs.size > 0:
|
||
rmse = float(np.sqrt(np.mean(errs ** 2)))
|
||
except Exception:
|
||
rmse = None
|
||
|
||
# Извлекаем параметры трансформации из матрицы гомографии
|
||
# H = [a11 a12 tx]
|
||
# [a21 a22 ty]
|
||
# [0 0 1 ]
|
||
|
||
# Масштаб и поворот
|
||
a11, a12 = H[0, 0], H[0, 1]
|
||
a21, a22 = H[1, 0], H[1, 1]
|
||
|
||
# Смещение (уже отцентрировано)
|
||
tx, ty = -H[0, 2], -H[1, 2]
|
||
|
||
print(" [Pilot] translate:", tx, ty)
|
||
|
||
# Вычисляем угол поворота
|
||
angle = -np.arctan2(a21, a11)
|
||
|
||
# Вычисляем масштаб
|
||
scale_x = np.sqrt(a11**2 + a21**2)
|
||
scale_y = np.sqrt(a12**2 + a22**2)
|
||
scale = (scale_x + scale_y) / 2
|
||
|
||
return {
|
||
'translation': (tx, ty), # Уже отцентрировано
|
||
'rotation': angle,
|
||
'scale': scale,
|
||
'homography': H,
|
||
'mask': mask,
|
||
'inliers': inliers,
|
||
'inliers_ratio': inlier_ratio,
|
||
'rmse': rmse
|
||
}
|
||
|
||
|
||
|
||
def calc_position(
|
||
self,
|
||
mat: np.ndarray,
|
||
x_source: float,
|
||
y_source: float,
|
||
angle_source: float
|
||
) -> tuple[float, float, float] | None:
|
||
""" Возвращает новые координаты и поворот на основе матрицы преобразования """
|
||
|
||
tx, ty = -mat[0, 2], -mat[1, 2]
|
||
|
||
# Вычисляем угол поворота
|
||
rotation = -np.arctan2(mat[1, 0], mat[0, 0])
|
||
print("[HOMOGRAPHY]", mat)
|
||
print("[ROTATION]", rotation)
|
||
|
||
# Координаты уже отцентрированы, поэтому используем их напрямую
|
||
dx_meters = tx
|
||
dy_meters = ty
|
||
|
||
angle_global = angle_source + rotation
|
||
|
||
# Применяем поворот к смещению (учитываем текущий угол БПЛА)
|
||
cos_angle = math.cos(angle_global)
|
||
sin_angle = math.sin(angle_global)
|
||
|
||
# Поворачиваем смещение в глобальные координаты
|
||
# Обратите внимание: dy_meters инвертирован, так как в изображениях Y направлен вниз
|
||
dx_global = dx_meters * cos_angle - dy_meters * sin_angle
|
||
dy_global = dx_meters * sin_angle + dy_meters * cos_angle
|
||
|
||
# Обновляем координаты БПЛА
|
||
x = x_source + dx_global
|
||
y = y_source + dy_global
|
||
|
||
# Нормализуем угол в диапазоне [-π, π]
|
||
angle = math.atan2(math.sin(angle_global), math.cos(angle_global))
|
||
|
||
return x, y, angle
|
||
|
||
def update_drone_position(self, transformation_info: dict):
|
||
"""
|
||
Обновляет позицию и угол БПЛА на основе трансформации изображения
|
||
Координаты уже отцентрированы относительно центра изображения
|
||
"""
|
||
homography = transformation_info['homography']
|
||
|
||
self.x, self.y, self.angle = self.calc_position(
|
||
homography,
|
||
self.x, self.y, self.angle
|
||
)
|
||
|
||
if self.reserved_pos is not None:
|
||
self.reserved_pos = self.calc_position(
|
||
homography, *self.reserved_pos
|
||
)
|
||
|
||
def get_drone_state(self) -> dict:
|
||
"""
|
||
Возвращает текущее состояние БПЛА
|
||
"""
|
||
return {
|
||
'x': self.x,
|
||
'y': self.y,
|
||
'angle': self.angle,
|
||
'angle_degrees': math.degrees(self.angle),
|
||
'frame_count': self.frame_count
|
||
}
|
||
|
||
|
||
|
||
def get_position_by_chunk(self) -> tuple[float, float, float] | None:
|
||
# Пытаемся найти ориентир на картинке:
|
||
current_image = self.prev_image
|
||
landmark_image = cv2.imread(Path('chunks') / f'chunk_{self.target_idx}.png', cv2.IMREAD_COLOR_RGB)
|
||
src_pts, dst_pts, matches, kp1, kp2 = self.detect_and_match_keypoints(landmark_image, current_image)
|
||
|
||
if src_pts is not None and dst_pts is not None and self.vis_manager:
|
||
self.vis_manager.update_chunk_matches(landmark_image, current_image, kp1, kp2, matches)
|
||
|
||
if src_pts is not None and dst_pts is not None:
|
||
# Оцениваем матрицу трансформации
|
||
landmark_transform = self.estimate_transformation_matrix(src_pts, dst_pts)
|
||
# Если ориентир достоверно найден — скорректируем глобальные координаты и угол
|
||
if landmark_transform is not None:
|
||
ok_scale = (self.min_scale <= landmark_transform['scale'] <= self.max_scale)
|
||
ok_inliers = (landmark_transform.get('inliers', 0) >= self.min_inliers)
|
||
ratio = landmark_transform.get('inliers_ratio', 0.0)
|
||
ok_ratio = (ratio >= self.min_inlier_ratio)
|
||
rmse = landmark_transform.get('rmse', None)
|
||
ok_rmse = (rmse is not None and rmse <= self.max_reproj_rmse)
|
||
if ok_scale and ok_inliers and ok_ratio and ok_rmse:
|
||
print("[HELP]")
|
||
print("Matrix", landmark_transform['homography'])
|
||
print("Position", self.x, self.y)
|
||
print("Position of point", self.points[self.target_idx])
|
||
print("[PILOT]", rmse, ratio, ok_rmse)
|
||
# if False:
|
||
# Считаем абсолютную позу относительно координат ориентира
|
||
landmark_world_x, landmark_world_y = self.points[self.target_idx]
|
||
landmark_angle = 0
|
||
homography = landmark_transform['homography']
|
||
# homography = np.linalg.inv(homography)
|
||
print(f" [Pilot] Landmark correction applied (inliers={landmark_transform['inliers']}, ratio={ratio:.2f}, rmse={rmse:.2f})")
|
||
return self.calc_position(homography, landmark_world_x, landmark_world_y, landmark_angle)
|
||
return None
|
||
|
||
|
||
def handle(self, image: Image) -> PilotCommand:
|
||
self.frame_count += 1
|
||
|
||
if self.prev_image is None:
|
||
self.prev_image = self.image_to_numpy(image)
|
||
# Вычисляем центр изображения
|
||
height, width = self.prev_image.shape[:2]
|
||
self.image_center = (width // 2, height // 2)
|
||
|
||
return PilotCommand()
|
||
|
||
# Конвертируем текущее изображение
|
||
current_image = self.image_to_numpy(image)
|
||
|
||
# Обновляем центр изображения
|
||
height, width = current_image.shape[:2]
|
||
self.image_center = (width // 2, height // 2)
|
||
|
||
# Расстояние до цели
|
||
distance_to_target = math.sqrt(
|
||
(self.points[self.target_idx][0] - self.x) ** 2 +
|
||
(self.points[self.target_idx][1] - self.y) ** 2
|
||
)
|
||
|
||
# Обнаруживаем и сопоставляем ключевые точки
|
||
src_pts, dst_pts, matches, kp1, kp2 = self.detect_and_match_keypoints(self.prev_image, current_image)
|
||
|
||
# Оцениваем смещение БПЛА
|
||
if src_pts is not None and dst_pts is not None:
|
||
# Оцениваем матрицу трансформации
|
||
transformation_info = self.estimate_transformation_matrix(src_pts, dst_pts)
|
||
|
||
if transformation_info:
|
||
# Обновляем позицию и угол БПЛА (одометрия по кадрам)
|
||
self.update_drone_position(transformation_info)
|
||
|
||
# Выводим текущее состояние БПЛА
|
||
drone_state = self.get_drone_state()
|
||
print(f" [Pilot] Drone Position: ({drone_state['x']:.2f}, {drone_state['y']:.2f})")
|
||
print(f" [Pilot] Angle: {drone_state['angle_degrees']:.1f}°")
|
||
print(f" [Pilot] Target Index: {self.target_idx}")
|
||
print(f" [Pilot] Target Position: {self.points[self.target_idx]}")
|
||
print(f" [Pilot] Distance: {distance_to_target}")
|
||
|
||
# Обновляем визуализацию
|
||
if self.vis_manager:
|
||
|
||
# Обновляем сопоставление точек
|
||
self.vis_manager.update_matches(
|
||
self.prev_image,
|
||
current_image,
|
||
kp1, kp2, matches,
|
||
transformation_info)
|
||
|
||
mask = transformation_info['mask']
|
||
self.vis_manager.update_motion_vectors(
|
||
current_image,
|
||
kp1, kp2,
|
||
np.array(matches)[mask.ravel().astype(bool)])
|
||
|
||
|
||
# Используем новую визуализацию движения точек по сетке
|
||
homography_matrix = transformation_info['homography']
|
||
self.vis_manager.update_homography_grid(
|
||
current_image,
|
||
homography_matrix,
|
||
grid_step=85)
|
||
|
||
# Пытаемся найти ориентир на картинке:
|
||
self.prev_image = current_image
|
||
npos = self.get_position_by_chunk()
|
||
if npos is not None:
|
||
self.reserved_pos = npos
|
||
|
||
if distance_to_target < 35:
|
||
self.target_idx += 1
|
||
|
||
if self.target_idx == len(self.points):
|
||
return PilotCommand(stop=True)
|
||
|
||
distance_to_target = math.sqrt(
|
||
(self.points[self.target_idx][0] - self.x) ** 2 +
|
||
(self.points[self.target_idx][1] - self.y) ** 2
|
||
)
|
||
|
||
if self.reserved_pos is not None:
|
||
self.x, self.y, self.angle = self.reserved_pos
|
||
self.reserved_pos = None
|
||
|
||
# Вычисляем угол к цели
|
||
target_angle = math.atan2(self.points[self.target_idx][1] - self.y, self.points[self.target_idx][0] - self.x)
|
||
|
||
angle_trajectory = self.angle + math.pi / 2
|
||
|
||
print("[ANGLE]", angle_trajectory, "->", target_angle)
|
||
|
||
# Вычисляем разность углов (направление поворота)
|
||
angle_diff = target_angle - angle_trajectory
|
||
|
||
# Нормализуем разность углов в диапазон [-π, π]
|
||
angle_diff %= 2 * math.pi
|
||
if angle_diff >= math.pi:
|
||
angle_diff -= 2 * math.pi
|
||
|
||
d_r = max(10, min(75., distance_to_target / 2))
|
||
d_a_limit = d_r / 10 * 0.07
|
||
return PilotCommand(max(min(d_a_limit, angle_diff), -d_a_limit), d_r)
|
||
|
||
def reset_position(self, x: float = 0.0, y: float = 0.0, angle: float = 0.0):
|
||
"""Сбрасывает позицию и угол БПЛА"""
|
||
self.x = x
|
||
self.y = y
|
||
self.angle = angle
|
||
self.frame_count = 0
|