293 lines
12 KiB
Python
293 lines
12 KiB
Python
from enum import Enum
|
|
from pathlib import Path
|
|
import math
|
|
import random
|
|
|
|
import cv2
|
|
import numpy as np
|
|
from PIL import Image
|
|
|
|
from timer import Timer
|
|
|
|
from vision_chunk import VisionChunk
|
|
from visualization import VisualizationManager
|
|
from position import Position
|
|
from constants import CAMERA_HEIGHT
|
|
from utility import estimate_transformation_matrix
|
|
|
|
random.seed(1)
|
|
|
|
class Pilot:
|
|
def __init__(self): pass
|
|
def handle(self, image: Image): pass
|
|
def act(self) -> tuple[float, float] | None: pass
|
|
def get_position(self) -> tuple[float, float]: pass
|
|
|
|
class PilotCommand:
|
|
velocity: float
|
|
dangle: float
|
|
stop: bool
|
|
proccessing_time: float
|
|
|
|
def __init__(self,
|
|
dangle: float = 0,
|
|
velocity: float = 100,
|
|
stop: bool = False,
|
|
processing_time: float = 0.
|
|
):
|
|
self.dangle = dangle
|
|
self.stop = stop
|
|
self.velocity = velocity
|
|
self.proccessing_time = processing_time
|
|
|
|
class AutoPilot(Pilot):
|
|
|
|
# Состояние одометрии
|
|
pos: Position
|
|
|
|
chunks: list[VisionChunk] # Ориентиры
|
|
target_idx: int # Текущий ориентир
|
|
|
|
# Всякие вспомогательные штуки
|
|
timer: Timer
|
|
prev_chunk: VisionChunk | None
|
|
frame_count: int
|
|
vis_manager: VisualizationManager # Менеджер визуализации (опционально)
|
|
|
|
# Положение на основе ориентира
|
|
reserved_pos: Position | None
|
|
|
|
def __init__(self, points = [], chunks = [], viz_manager=None):
|
|
self.prev_chunk = None
|
|
self.pos = Position(0, 0, 1, 0, 0, 0)
|
|
self.chunks = chunks
|
|
self.frame_count = 0
|
|
self.vis_manager = viz_manager # Менеджер визуализации
|
|
self.reserved_pos = None
|
|
|
|
# Пороговые значения качества сопоставления/гомографии
|
|
self.min_inliers: int = 12
|
|
self.min_inlier_ratio: float = 0.5
|
|
self.max_reproj_rmse: float = 3.0
|
|
self.min_scale: float = 0.7
|
|
self.max_scale: float = 1.4
|
|
|
|
self.points = points
|
|
self.target_idx = 0
|
|
|
|
self.timer = Timer()
|
|
|
|
def get_position(self) -> tuple[float, float]:
|
|
return self.pos.x, self.pos.y
|
|
|
|
def calculate_optical_flow(self, prev_chunk: VisionChunk, current_chunk: VisionChunk):
|
|
"""
|
|
Вычисляет оптический поток между двумя кадрами
|
|
Возвращает src_pts и dst_pts в отцентрированных координатах
|
|
"""
|
|
prev_gray = prev_chunk.to_cv2_gray()
|
|
current_gray = current_chunk.to_cv2_gray()
|
|
|
|
h, w = prev_gray.shape[:2]
|
|
|
|
# Создаем сетку точек для отслеживания (аналогично вашему step=20)
|
|
step = 35
|
|
grid_points = []
|
|
for y in range(step, h - step, step):
|
|
for x in range(step, w - step, step):
|
|
grid_points.append([x, y])
|
|
|
|
if len(grid_points) < 4:
|
|
return None, None
|
|
|
|
p0 = np.float32(grid_points).reshape(-1, 1, 2)
|
|
|
|
# Параметры для Lucas-Kanade
|
|
lk_params = dict(
|
|
winSize=(11, 11),
|
|
maxLevel=2,
|
|
criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)
|
|
)
|
|
|
|
# Вычисляем разреженный оптический поток
|
|
p1, status, err = cv2.calcOpticalFlowPyrLK(prev_gray, current_gray, p0, None, **lk_params)
|
|
|
|
# Фильтруем только успешно отслеженные точки
|
|
good_mask = err < 12.0 # порог ошибки
|
|
good_old = p0[status == 1 & good_mask]
|
|
good_new = p1[status == 1 & good_mask]
|
|
|
|
if len(good_old) < 4:
|
|
return None, None
|
|
|
|
# Преобразуем в отцентрированные координаты (Y вверх)
|
|
src_pts = np.float32([[x, y] for x, y in good_old]).reshape(-1, 1, 2)
|
|
dst_pts = np.float32([[x, y] for x, y in good_new]).reshape(-1, 1, 2)
|
|
|
|
return src_pts, dst_pts
|
|
|
|
def update_drone_position(self, homography_matrix: np.ndarray):
|
|
"""
|
|
Обновляет позицию и угол БПЛА на основе трансформации изображения
|
|
Координаты уже отцентрированы относительно центра изображения
|
|
"""
|
|
self.pos.iapply(homography_matrix)
|
|
|
|
if self.reserved_pos is not None:
|
|
self.reserved_pos.iapply(homography_matrix)
|
|
|
|
def get_drone_state(self) -> dict:
|
|
"""
|
|
Возвращает текущее состояние БПЛА
|
|
"""
|
|
return {
|
|
'x': self.pos.x,
|
|
'y': self.pos.y,
|
|
'angle': self.pos.yaw,
|
|
'angle_degrees': math.degrees(self.pos.yaw),
|
|
'frame_count': self.frame_count
|
|
}
|
|
|
|
def get_position_by_chunk(self) -> Position | None:
|
|
# Пытаемся найти ориентир на картинке:
|
|
current_chunk = self.prev_chunk
|
|
landmark_chunk = self.chunks[self.target_idx]
|
|
src_pts, dst_pts, matches, kp1, kp2 = landmark_chunk.detect_and_match_keypoints(current_chunk)
|
|
|
|
if src_pts is not None and dst_pts is not None and self.vis_manager:
|
|
was_enabled = self.timer.enabled
|
|
if was_enabled: self.timer.stop()
|
|
self.vis_manager.update_chunk_matches(landmark_chunk.to_numpy(), current_chunk.to_numpy(), kp1, kp2, matches)
|
|
if was_enabled: self.timer.start()
|
|
|
|
if src_pts is not None and dst_pts is not None:
|
|
# Оцениваем матрицу трансформации
|
|
landmark_transform = self.estimate_transformation_matrix(src_pts, dst_pts)
|
|
# Если ориентир достоверно найден — скорректируем глобальные координаты и угол
|
|
if landmark_transform is not None:
|
|
ok_scale = (self.min_scale <= landmark_transform['scale'] <= self.max_scale)
|
|
ok_inliers = (landmark_transform.get('inliers', 0) >= self.min_inliers)
|
|
ratio = landmark_transform.get('inliers_ratio', 0.0)
|
|
ok_ratio = (ratio >= self.min_inlier_ratio)
|
|
rmse = landmark_transform.get('rmse', None)
|
|
ok_rmse = (rmse is not None and rmse <= self.max_reproj_rmse)
|
|
if ok_scale and ok_inliers and ok_ratio and ok_rmse:
|
|
# print("[HELP]")
|
|
# print("Matrix", landmark_transform['homography'])
|
|
# print("Position", self.x, self.y)
|
|
# print("Position of point", self.points[self.target_idx])
|
|
# print("[PILOT]", rmse, ratio, ok_rmse)
|
|
# if False:
|
|
# Считаем абсолютную позу относительно координат ориентира
|
|
landmark_world_x, landmark_world_y = self.points[self.target_idx]
|
|
landmark = Position(landmark_world_x, landmark_world_y, 1, 0)
|
|
homography = landmark_transform['homography']
|
|
# homography = np.linalg.inv(homography)
|
|
print(f" [Pilot] Landmark correction applied (inliers={landmark_transform['inliers']}, ratio={ratio:.2f}, rmse={rmse:.2f})")
|
|
return landmark @ homography
|
|
return None
|
|
|
|
|
|
def handle(self, current_chunk: VisionChunk) -> PilotCommand:
|
|
self.timer.start()
|
|
self.frame_count += 1
|
|
|
|
if self.prev_chunk is None:
|
|
self.prev_chunk = current_chunk
|
|
self.timer.stop()
|
|
return PilotCommand(processing_time=self.timer.get_elapsed())
|
|
|
|
# Расстояние до цели
|
|
distance_to_target = math.sqrt(
|
|
(self.points[self.target_idx][0] - self.pos.x) ** 2 +
|
|
(self.points[self.target_idx][1] - self.pos.y) ** 2
|
|
)
|
|
|
|
# Вычисляем оптический поток для покадрового сравнения
|
|
matching_timer = Timer()
|
|
matching_timer.start()
|
|
# src_pts, dst_pts = self.calculate_optical_flow(self.prev_chunk, current_chunk)
|
|
src_pts, dst_pts, _, _, _ = self.prev_chunk.detect_and_match_keypoints(current_chunk)
|
|
matching_timer.stop()
|
|
print(f"Matching calculating: {matching_timer.get_elapsed() * 1000:.2f} ms")
|
|
|
|
# Оцениваем смещение БПЛА
|
|
if src_pts is not None and dst_pts is not None:
|
|
# Оцениваем матрицу трансформации
|
|
|
|
matrix_estimation_timer = Timer()
|
|
matrix_estimation_timer.start()
|
|
homography_matrix = estimate_transformation_matrix(src_pts, dst_pts)
|
|
matrix_estimation_timer.stop()
|
|
print(f"Transformation matrix updating: {matrix_estimation_timer.get_elapsed() * 1000:.2f} ms")
|
|
|
|
if homography_matrix is not None:
|
|
self.update_drone_position(homography_matrix)
|
|
self.timer.stop()
|
|
|
|
# Обновляем визуализацию
|
|
if self.vis_manager:
|
|
# Используем визуализацию движения точек по сетке
|
|
self.vis_manager.update_homography_grid(
|
|
current_chunk.to_numpy(),
|
|
homography_matrix,
|
|
grid_step=35)
|
|
|
|
self.timer.start()
|
|
|
|
chunk_timer = Timer()
|
|
chunk_timer.start()
|
|
|
|
# Пытаемся найти ориентир на картинке:
|
|
self.prev_chunk = current_chunk
|
|
# npos = self.get_position_by_chunk()
|
|
# if npos is not None:
|
|
# self.reserved_pos = npos
|
|
|
|
chunk_timer.stop()
|
|
print(f"Chunk timer: {chunk_timer.get_elapsed() * 1000:.2f} ms")
|
|
|
|
if distance_to_target < 35:
|
|
self.target_idx += 1
|
|
|
|
if self.target_idx == len(self.points):
|
|
return PilotCommand(stop=True)
|
|
|
|
distance_to_target = math.sqrt(
|
|
(self.points[self.target_idx][0] - self.pos.x) ** 2 +
|
|
(self.points[self.target_idx][1] - self.pos.y) ** 2
|
|
)
|
|
|
|
if self.reserved_pos is not None:
|
|
self.pos = self.reserved_pos
|
|
self.reserved_pos = None
|
|
|
|
# Вычисляем угол к цели
|
|
target_angle = math.atan2(self.points[self.target_idx][1] - self.pos.y, self.points[self.target_idx][0] - self.pos.x)
|
|
|
|
angle_trajectory = self.pos.yaw + math.pi / 2
|
|
|
|
# print("[ANGLE]", angle_trajectory, "->", target_angle)
|
|
|
|
# Вычисляем разность углов (направление поворота)
|
|
angle_diff = target_angle - angle_trajectory
|
|
|
|
# Нормализуем разность углов в диапазон [-π, π]
|
|
angle_diff %= 2 * math.pi
|
|
if angle_diff >= math.pi:
|
|
angle_diff -= 2 * math.pi
|
|
|
|
d_r = max(10, min(35., distance_to_target / 2))
|
|
d_a_limit = d_r / 10 * 0.01
|
|
|
|
command = PilotCommand(
|
|
max(min(d_a_limit, angle_diff), -d_a_limit),
|
|
d_r, False, self.timer.get_elapsed()
|
|
)
|
|
self.timer.reset()
|
|
return command
|
|
|
|
def reset_position(self, x: float = 0.0, y: float = 0.0, angle: float = 0.0):
|
|
"""Сбрасывает позицию и угол БПЛА"""
|
|
self.pos = Position(x, y, 1, angle)
|