359 lines
14 KiB
Python
359 lines
14 KiB
Python
from enum import Enum
|
||
from pathlib import Path
|
||
import math
|
||
import random
|
||
|
||
import cv2
|
||
import numpy as np
|
||
from PIL import Image
|
||
|
||
from timer import Timer
|
||
|
||
from vision_chunk import VisionChunk
|
||
from visualization import VisualizationManager
|
||
from geolocation import Geolocation
|
||
|
||
random.seed(1)
|
||
|
||
class Pilot:
|
||
def __init__(self): pass
|
||
def handle(self, image: Image): pass
|
||
def act(self) -> tuple[float, float] | None: pass
|
||
def get_position(self) -> tuple[float, float]: pass
|
||
|
||
class PilotCommand:
|
||
velocity: float
|
||
dangle: float
|
||
stop: bool
|
||
proccessing_time: float
|
||
|
||
def __init__(self,
|
||
dangle: float = 0,
|
||
velocity: float = 100,
|
||
stop: bool = False,
|
||
processing_time: float = 0.
|
||
):
|
||
self.dangle = dangle
|
||
self.stop = stop
|
||
self.velocity = velocity
|
||
self.proccessing_time = processing_time
|
||
|
||
class AutoPilot(Pilot):
|
||
|
||
# Состояние одометрии
|
||
geo: Geolocation
|
||
|
||
chunks: list[VisionChunk] # Ориентиры
|
||
target_idx: int # Текущий ориентир
|
||
|
||
# Всякие вспомогательные штуки
|
||
timer: Timer
|
||
prev_chunk: VisionChunk | None
|
||
frame_count: int
|
||
vis_manager: VisualizationManager # Менеджер визуализации (опционально)
|
||
|
||
# Положение на основе ориентира
|
||
reserved_geo: Geolocation | None
|
||
|
||
def __init__(self, points = [], chunks = [], viz_manager=None):
|
||
self.prev_chunk = None
|
||
self.geo = Geolocation(0, 0, 1, 0)
|
||
self.chunks = chunks
|
||
self.frame_count = 0
|
||
self.vis_manager = viz_manager # Менеджер визуализации
|
||
self.reserved_geo = None
|
||
|
||
# Пороговые значения качества сопоставления/гомографии
|
||
self.min_inliers: int = 12
|
||
self.min_inlier_ratio: float = 0.5
|
||
self.max_reproj_rmse: float = 3.0
|
||
self.min_scale: float = 0.7
|
||
self.max_scale: float = 1.4
|
||
|
||
self.points = points
|
||
self.target_idx = 0
|
||
|
||
self.timer = Timer()
|
||
|
||
def get_position(self) -> tuple[float, float]:
|
||
return self.geo.x, self.geo.y
|
||
|
||
def calculate_optical_flow(self, prev_chunk: VisionChunk, current_chunk: VisionChunk):
|
||
"""
|
||
Вычисляет оптический поток между двумя кадрами
|
||
Возвращает src_pts и dst_pts в отцентрированных координатах
|
||
"""
|
||
prev_gray = prev_chunk.to_cv2_gray()
|
||
current_gray = current_chunk.to_cv2_gray()
|
||
|
||
h, w = prev_gray.shape[:2]
|
||
cx, cy = w // 2, h // 2
|
||
|
||
# Создаем сетку точек для отслеживания (аналогично вашему step=20)
|
||
step = 35
|
||
grid_points = []
|
||
for y in range(step, h - step, step):
|
||
for x in range(step, w - step, step):
|
||
grid_points.append([x, y])
|
||
|
||
if len(grid_points) < 4:
|
||
return None, None
|
||
|
||
p0 = np.float32(grid_points).reshape(-1, 1, 2)
|
||
|
||
# Параметры для Lucas-Kanade
|
||
lk_params = dict(
|
||
winSize=(11, 11),
|
||
maxLevel=2,
|
||
criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)
|
||
)
|
||
|
||
# Вычисляем разреженный оптический поток
|
||
p1, status, err = cv2.calcOpticalFlowPyrLK(prev_gray, current_gray, p0, None, **lk_params)
|
||
|
||
# Фильтруем только успешно отслеженные точки
|
||
good_mask = err < 12.0 # порог ошибки
|
||
good_old = p0[status == 1 & good_mask]
|
||
good_new = p1[status == 1 & good_mask]
|
||
|
||
if len(good_old) < 4:
|
||
return None, None
|
||
|
||
# Преобразуем в отцентрированные координаты (Y вверх)
|
||
src_pts = np.float32([[x - cx, cy - y] for x, y in good_old]).reshape(-1, 1, 2)
|
||
dst_pts = np.float32([[x - cx, cy - y] for x, y in good_new]).reshape(-1, 1, 2)
|
||
|
||
return src_pts, dst_pts
|
||
|
||
def estimate_transformation_matrix(self, src_pts: np.ndarray, dst_pts: np.ndarray):
|
||
"""
|
||
Оценивает матрицу трансформации на основе сопоставленных точек
|
||
Точки уже отцентрированы относительно центра изображения
|
||
"""
|
||
# Используем RANSAC для оценки матрицы гомографии
|
||
print("Count of points: ", len(src_pts), len(dst_pts))
|
||
H, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 3.0, maxIters=1000)
|
||
# RH, rmask = cv2.findHomography(dst_pts, src_pts, cv2.RANSAC, 1.0)
|
||
|
||
if H is None:
|
||
return None
|
||
|
||
# Маска инлайеров
|
||
inliers = int(mask.sum()) if mask is not None else 0
|
||
total = int(mask.size) if mask is not None else 0
|
||
inlier_ratio = (inliers / total) if total > 0 else 0.0
|
||
|
||
# Ошибка репроекции по инлайерам (RMSE)
|
||
rmse = None
|
||
try:
|
||
# Проецируем src через H и сравниваем с dst
|
||
proj = cv2.perspectiveTransform(src_pts, H)
|
||
errs = np.linalg.norm((proj - dst_pts).reshape(-1, 2), axis=1)
|
||
if mask is not None and mask.shape[0] == errs.shape[0]:
|
||
errs = errs[mask.ravel().astype(bool)]
|
||
if errs.size > 0:
|
||
rmse = float(np.sqrt(np.mean(errs ** 2)))
|
||
except Exception:
|
||
rmse = None
|
||
|
||
# Извлекаем параметры трансформации из матрицы гомографии
|
||
# H = [a11 a12 tx]
|
||
# [a21 a22 ty]
|
||
# [0 0 1 ]
|
||
|
||
# Масштаб и поворот
|
||
a11, a12 = H[0, 0], H[0, 1]
|
||
a21, a22 = H[1, 0], H[1, 1]
|
||
|
||
# Смещение (уже отцентрировано)
|
||
tx, ty = -H[0, 2], -H[1, 2]
|
||
|
||
# print(" [Pilot] translate:", tx, ty)
|
||
|
||
# Вычисляем угол поворота
|
||
angle = -np.arctan2(a21, a11)
|
||
|
||
# Вычисляем масштаб
|
||
scale_x = np.sqrt(a11**2 + a21**2)
|
||
scale_y = np.sqrt(a12**2 + a22**2)
|
||
scale = (scale_x + scale_y) / 2
|
||
|
||
return {
|
||
'translation': (tx, ty), # Уже отцентрировано
|
||
'rotation': angle,
|
||
'scale': scale,
|
||
'homography': H,
|
||
'mask': mask,
|
||
'inliers': inliers,
|
||
'inliers_ratio': inlier_ratio,
|
||
'rmse': rmse
|
||
}
|
||
|
||
def update_drone_position(self, transformation_info: dict):
|
||
"""
|
||
Обновляет позицию и угол БПЛА на основе трансформации изображения
|
||
Координаты уже отцентрированы относительно центра изображения
|
||
"""
|
||
homography = transformation_info['homography']
|
||
self.geo @= homography
|
||
|
||
if self.reserved_geo is not None:
|
||
self.reserved_geo @= homography
|
||
|
||
def get_drone_state(self) -> dict:
|
||
"""
|
||
Возвращает текущее состояние БПЛА
|
||
"""
|
||
return {
|
||
'x': self.geo.x,
|
||
'y': self.geo.y,
|
||
'angle': self.geo.yaw,
|
||
'angle_degrees': math.degrees(self.geo.yaw),
|
||
'frame_count': self.frame_count
|
||
}
|
||
|
||
def get_position_by_chunk(self) -> Geolocation | None:
|
||
# Пытаемся найти ориентир на картинке:
|
||
current_chunk = self.prev_chunk
|
||
landmark_chunk = self.chunks[self.target_idx]
|
||
src_pts, dst_pts, matches, kp1, kp2 = landmark_chunk.detect_and_match_keypoints(current_chunk)
|
||
|
||
if src_pts is not None and dst_pts is not None and self.vis_manager:
|
||
was_enabled = self.timer.enabled
|
||
if was_enabled: self.timer.stop()
|
||
self.vis_manager.update_chunk_matches(landmark_chunk.to_numpy(), current_chunk.to_numpy(), kp1, kp2, matches)
|
||
if was_enabled: self.timer.start()
|
||
|
||
if src_pts is not None and dst_pts is not None:
|
||
# Оцениваем матрицу трансформации
|
||
landmark_transform = self.estimate_transformation_matrix(src_pts, dst_pts)
|
||
# Если ориентир достоверно найден — скорректируем глобальные координаты и угол
|
||
if landmark_transform is not None:
|
||
ok_scale = (self.min_scale <= landmark_transform['scale'] <= self.max_scale)
|
||
ok_inliers = (landmark_transform.get('inliers', 0) >= self.min_inliers)
|
||
ratio = landmark_transform.get('inliers_ratio', 0.0)
|
||
ok_ratio = (ratio >= self.min_inlier_ratio)
|
||
rmse = landmark_transform.get('rmse', None)
|
||
ok_rmse = (rmse is not None and rmse <= self.max_reproj_rmse)
|
||
if ok_scale and ok_inliers and ok_ratio and ok_rmse:
|
||
# print("[HELP]")
|
||
# print("Matrix", landmark_transform['homography'])
|
||
# print("Position", self.x, self.y)
|
||
# print("Position of point", self.points[self.target_idx])
|
||
# print("[PILOT]", rmse, ratio, ok_rmse)
|
||
# if False:
|
||
# Считаем абсолютную позу относительно координат ориентира
|
||
landmark_world_x, landmark_world_y = self.points[self.target_idx]
|
||
landmark = Geolocation(landmark_world_x, landmark_world_y, 1, 0)
|
||
homography = landmark_transform['homography']
|
||
# homography = np.linalg.inv(homography)
|
||
print(f" [Pilot] Landmark correction applied (inliers={landmark_transform['inliers']}, ratio={ratio:.2f}, rmse={rmse:.2f})")
|
||
return landmark @ homography
|
||
return None
|
||
|
||
|
||
def handle(self, current_chunk: VisionChunk) -> PilotCommand:
|
||
self.timer.start()
|
||
self.frame_count += 1
|
||
|
||
if self.prev_chunk is None:
|
||
self.prev_chunk = current_chunk
|
||
self.timer.stop()
|
||
return PilotCommand(processing_time=self.timer.get_elapsed())
|
||
|
||
# Расстояние до цели
|
||
distance_to_target = math.sqrt(
|
||
(self.points[self.target_idx][0] - self.geo.x) ** 2 +
|
||
(self.points[self.target_idx][1] - self.geo.y) ** 2
|
||
)
|
||
|
||
# Вычисляем оптический поток для покадрового сравнения
|
||
optical_flow_timer = Timer()
|
||
optical_flow_timer.start()
|
||
src_pts, dst_pts = self.calculate_optical_flow(self.prev_chunk, current_chunk)
|
||
optical_flow_timer.stop()
|
||
print(f"Optical flow calculating: {optical_flow_timer.get_elapsed() * 1000:.2f} ms")
|
||
|
||
# Оцениваем смещение БПЛА
|
||
if src_pts is not None and dst_pts is not None:
|
||
# Оцениваем матрицу трансформации
|
||
|
||
matrix_estimation_timer = Timer()
|
||
matrix_estimation_timer.start()
|
||
transformation_info = self.estimate_transformation_matrix(src_pts, dst_pts)
|
||
matrix_estimation_timer.stop()
|
||
print(f"Transformation matrix estimating: {matrix_estimation_timer.get_elapsed() * 1000:.2f} ms")
|
||
|
||
if transformation_info:
|
||
# Обновляем позицию и угол БПЛА (одометрия по кадрам)
|
||
self.update_drone_position(transformation_info)
|
||
|
||
self.timer.stop()
|
||
|
||
# Обновляем визуализацию
|
||
if self.vis_manager:
|
||
# Используем визуализацию движения точек по сетке
|
||
homography_matrix = transformation_info['homography']
|
||
self.vis_manager.update_homography_grid(
|
||
current_chunk.to_numpy(),
|
||
homography_matrix,
|
||
grid_step=85)
|
||
|
||
self.timer.start()
|
||
|
||
|
||
chunk_timer = Timer()
|
||
chunk_timer.start()
|
||
# Пытаемся найти ориентир на картинке:
|
||
self.prev_chunk = current_chunk
|
||
# npos = self.get_position_by_chunk()
|
||
# if npos is not None:
|
||
# self.reserved_geo = npos
|
||
|
||
chunk_timer.stop()
|
||
print(f"Chunk timer: {chunk_timer.get_elapsed() * 1000:.2f} ms")
|
||
|
||
if distance_to_target < 35:
|
||
self.target_idx += 1
|
||
|
||
if self.target_idx == len(self.points):
|
||
return PilotCommand(stop=True)
|
||
|
||
distance_to_target = math.sqrt(
|
||
(self.points[self.target_idx][0] - self.geo.x) ** 2 +
|
||
(self.points[self.target_idx][1] - self.geo.y) ** 2
|
||
)
|
||
|
||
if self.reserved_geo is not None:
|
||
self.geo = self.reserved_geo
|
||
self.reserved_geo = None
|
||
|
||
# Вычисляем угол к цели
|
||
target_angle = math.atan2(self.points[self.target_idx][1] - self.geo.y, self.points[self.target_idx][0] - self.geo.x)
|
||
|
||
angle_trajectory = self.geo.yaw + math.pi / 2
|
||
|
||
# print("[ANGLE]", angle_trajectory, "->", target_angle)
|
||
|
||
# Вычисляем разность углов (направление поворота)
|
||
angle_diff = target_angle - angle_trajectory
|
||
|
||
# Нормализуем разность углов в диапазон [-π, π]
|
||
angle_diff %= 2 * math.pi
|
||
if angle_diff >= math.pi:
|
||
angle_diff -= 2 * math.pi
|
||
|
||
d_r = max(10, min(35., distance_to_target / 2))
|
||
d_a_limit = d_r / 10 * 0.01
|
||
|
||
command = PilotCommand(
|
||
max(min(d_a_limit, angle_diff), -d_a_limit),
|
||
d_r, False, self.timer.get_elapsed()
|
||
)
|
||
self.timer.reset()
|
||
return command
|
||
|
||
def reset_position(self, x: float = 0.0, y: float = 0.0, angle: float = 0.0):
|
||
"""Сбрасывает позицию и угол БПЛА"""
|
||
self.geo = Geolocation(x, y, 1, angle)
|