feat: add coordinates tracking
This commit is contained in:
418
advanced_autopilot.py
Normal file
418
advanced_autopilot.py
Normal file
@@ -0,0 +1,418 @@
|
||||
"""
|
||||
Расширенная система автопилота с улучшенным отслеживанием координат и угла БПЛА
|
||||
"""
|
||||
|
||||
import random
|
||||
import cv2
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
import math
|
||||
from typing import Optional, Tuple, Dict, List
|
||||
from dataclasses import dataclass
|
||||
import time
|
||||
|
||||
random.seed(1)
|
||||
|
||||
@dataclass
|
||||
class DroneState:
|
||||
"""Структура для хранения состояния БПЛА"""
|
||||
x: float = 0.0
|
||||
y: float = 0.0
|
||||
altitude: float = 10.0
|
||||
angle: float = 0.0
|
||||
velocity_x: float = 0.0
|
||||
velocity_y: float = 0.0
|
||||
angular_velocity: float = 0.0
|
||||
timestamp: float = 0.0
|
||||
confidence: float = 1.0
|
||||
|
||||
class AdvancedAutoPilot:
|
||||
"""
|
||||
Расширенный автопилот с улучшенным отслеживанием координат и угла БПЛА
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
initial_x: float = 0.0,
|
||||
initial_y: float = 0.0,
|
||||
initial_altitude: float = 10.0,
|
||||
camera_fov: float = 60.0,
|
||||
image_width: int = 640,
|
||||
image_height: int = 480):
|
||||
|
||||
# Состояние БПЛА
|
||||
self.state = DroneState(
|
||||
x=initial_x,
|
||||
y=initial_y,
|
||||
altitude=initial_altitude,
|
||||
timestamp=time.time()
|
||||
)
|
||||
|
||||
# Параметры камеры
|
||||
self.camera_fov = math.radians(camera_fov)
|
||||
self.image_width = image_width
|
||||
self.image_height = image_height
|
||||
|
||||
# Вычисляем коэффициент перевода пикселей в метры
|
||||
self.update_pixel_to_meter_ratio()
|
||||
|
||||
# История состояний для фильтрации
|
||||
self.state_history: List[DroneState] = []
|
||||
self.max_history_size = 10
|
||||
|
||||
# Параметры фильтрации
|
||||
self.velocity_filter_alpha = 0.3 # Коэффициент сглаживания скорости
|
||||
self.position_filter_alpha = 0.7 # Коэффициент сглаживания позиции
|
||||
|
||||
# Инициализация детекторов
|
||||
self.orb_detector = cv2.ORB_create(
|
||||
nfeatures=2000,
|
||||
scaleFactor=1.2,
|
||||
nlevels=8,
|
||||
edgeThreshold=31,
|
||||
firstLevel=0,
|
||||
WTA_K=2,
|
||||
patchSize=31,
|
||||
fastThreshold=20
|
||||
)
|
||||
|
||||
self.bf_matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
|
||||
|
||||
# Предыдущее изображение
|
||||
self.prev_image: Optional[np.ndarray] = None
|
||||
self.prev_timestamp: float = 0.0
|
||||
|
||||
# Счетчики
|
||||
self.frame_count = 0
|
||||
self.successful_tracking_frames = 0
|
||||
|
||||
# Статистика
|
||||
self.tracking_stats = {
|
||||
'total_frames': 0,
|
||||
'successful_frames': 0,
|
||||
'failed_frames': 0,
|
||||
'avg_processing_time': 0.0
|
||||
}
|
||||
|
||||
def update_pixel_to_meter_ratio(self):
|
||||
"""Обновляет коэффициент перевода пикселей в метры на основе высоты и FOV камеры"""
|
||||
# Вычисляем размер пикселя на земле при текущей высоте
|
||||
ground_distance = 2 * self.state.altitude * math.tan(self.camera_fov / 2)
|
||||
self.pixel_to_meter_ratio = ground_distance / self.image_width
|
||||
|
||||
def detect_and_match_keypoints(self, img1: np.ndarray, img2: np.ndarray) -> Tuple[Optional[np.ndarray], Optional[np.ndarray], Optional[List], Optional[List], Optional[List]]:
|
||||
"""Обнаруживает и сопоставляет ключевые точки между двумя изображениями"""
|
||||
# Обнаружение ключевых точек и дескрипторов
|
||||
kp1, des1 = self.orb_detector.detectAndCompute(img1, None)
|
||||
kp2, des2 = self.orb_detector.detectAndCompute(img2, None)
|
||||
|
||||
if des1 is None or des2 is None or len(kp1) < 10 or len(kp2) < 10:
|
||||
return None, None, None, None, None
|
||||
|
||||
# Сопоставление ключевых точек
|
||||
matches = self.bf_matcher.match(des1, des2)
|
||||
matches = sorted(matches, key=lambda x: x.distance)
|
||||
|
||||
# Фильтрация хороших совпадений
|
||||
good_matches = []
|
||||
for match in matches:
|
||||
if match.distance < 50: # Порог расстояния
|
||||
good_matches.append(match)
|
||||
|
||||
if len(good_matches) < 8: # Увеличиваем минимальное количество совпадений
|
||||
return None, None, None, None, None
|
||||
|
||||
# Извлечение координат сопоставленных точек
|
||||
src_pts = np.float32([kp1[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2)
|
||||
dst_pts = np.float32([kp2[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2)
|
||||
|
||||
return src_pts, dst_pts, good_matches, kp1, kp2
|
||||
|
||||
def estimate_transformation_matrix(self, src_pts: np.ndarray, dst_pts: np.ndarray) -> Optional[Dict]:
|
||||
"""Оценивает матрицу трансформации с улучшенной обработкой ошибок"""
|
||||
try:
|
||||
# Используем RANSAC для оценки матрицы гомографии
|
||||
H, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 3.0)
|
||||
|
||||
if H is None:
|
||||
return None
|
||||
|
||||
# Проверяем качество матрицы
|
||||
if not self.is_valid_homography(H):
|
||||
return None
|
||||
|
||||
# Извлекаем параметры трансформации
|
||||
a11, a12 = H[0, 0], H[0, 1]
|
||||
a21, a22 = H[1, 0], H[1, 1]
|
||||
tx, ty = H[0, 2], H[1, 2]
|
||||
|
||||
# Вычисляем угол поворота
|
||||
angle = np.arctan2(a21, a11)
|
||||
|
||||
# Вычисляем масштаб
|
||||
scale_x = np.sqrt(a11**2 + a21**2)
|
||||
scale_y = np.sqrt(a12**2 + a22**2)
|
||||
scale = (scale_x + scale_y) / 2
|
||||
|
||||
# Проверяем разумность масштаба
|
||||
if scale < 0.5 or scale > 2.0:
|
||||
return None
|
||||
|
||||
return {
|
||||
'translation': (tx, ty),
|
||||
'rotation': angle,
|
||||
'scale': scale,
|
||||
'homography': H,
|
||||
'mask': mask,
|
||||
'inliers_ratio': np.sum(mask) / len(mask) if mask is not None else 0.0
|
||||
}
|
||||
except Exception as e:
|
||||
print(f"Ошибка при оценке трансформации: {e}")
|
||||
return None
|
||||
|
||||
def is_valid_homography(self, H: np.ndarray) -> bool:
|
||||
"""Проверяет валидность матрицы гомографии"""
|
||||
if H is None:
|
||||
return False
|
||||
|
||||
# Проверяем, что матрица не вырождена
|
||||
det = np.linalg.det(H)
|
||||
if abs(det) < 1e-6:
|
||||
return False
|
||||
|
||||
# Проверяем, что элементы матрицы разумны
|
||||
if np.any(np.abs(H) > 1000):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def update_drone_position(self, transformation_info: Dict, dt: float):
|
||||
"""Обновляет позицию и состояние БПЛА с учетом временного интервала"""
|
||||
tx, ty = transformation_info['translation']
|
||||
rotation = transformation_info['rotation']
|
||||
scale = transformation_info['scale']
|
||||
inliers_ratio = transformation_info.get('inliers_ratio', 1.0)
|
||||
|
||||
# Конвертируем смещение в пикселях в метры
|
||||
dx_meters = tx * self.pixel_to_meter_ratio
|
||||
dy_meters = ty * self.pixel_to_meter_ratio
|
||||
|
||||
# Применяем поворот к смещению (учитываем текущий угол БПЛА)
|
||||
cos_angle = math.cos(self.state.angle)
|
||||
sin_angle = math.sin(self.state.angle)
|
||||
|
||||
# Поворачиваем смещение в глобальные координаты
|
||||
dx_global = dx_meters * cos_angle - dy_meters * sin_angle
|
||||
dy_global = dx_meters * sin_angle + dy_meters * cos_angle
|
||||
|
||||
# Вычисляем скорости
|
||||
velocity_x = dx_global / dt if dt > 0 else 0.0
|
||||
velocity_y = dy_global / dt if dt > 0 else 0.0
|
||||
angular_velocity = rotation / dt if dt > 0 else 0.0
|
||||
|
||||
# Применяем фильтрацию
|
||||
self.state.velocity_x = (self.velocity_filter_alpha * velocity_x +
|
||||
(1 - self.velocity_filter_alpha) * self.state.velocity_x)
|
||||
self.state.velocity_y = (self.velocity_filter_alpha * velocity_y +
|
||||
(1 - self.velocity_filter_alpha) * self.state.velocity_y)
|
||||
self.state.angular_velocity = (self.velocity_filter_alpha * angular_velocity +
|
||||
(1 - self.velocity_filter_alpha) * self.state.angular_velocity)
|
||||
|
||||
# Обновляем позицию с фильтрацией
|
||||
new_x = self.state.x + dx_global
|
||||
new_y = self.state.y + dy_global
|
||||
new_angle = self.state.angle + rotation
|
||||
|
||||
self.state.x = (self.position_filter_alpha * new_x +
|
||||
(1 - self.position_filter_alpha) * self.state.x)
|
||||
self.state.y = (self.position_filter_alpha * new_y +
|
||||
(1 - self.position_filter_alpha) * self.state.y)
|
||||
self.state.angle = (self.position_filter_alpha * new_angle +
|
||||
(1 - self.position_filter_alpha) * self.state.angle)
|
||||
|
||||
# Нормализуем угол
|
||||
self.state.angle = math.atan2(math.sin(self.state.angle), math.cos(self.state.angle))
|
||||
|
||||
# Обновляем временную метку и уверенность
|
||||
self.state.timestamp = time.time()
|
||||
self.state.confidence = min(1.0, inliers_ratio * 1.5) # Увеличиваем уверенность при хороших совпадениях
|
||||
|
||||
def handle(self, image: Image.Image) -> bool:
|
||||
"""Обрабатывает новый кадр и обновляет состояние БПЛА"""
|
||||
start_time = time.time()
|
||||
self.frame_count += 1
|
||||
self.tracking_stats['total_frames'] += 1
|
||||
|
||||
# Конвертируем изображение
|
||||
current_image = np.array(image)
|
||||
current_timestamp = time.time()
|
||||
|
||||
if self.prev_image is None:
|
||||
self.prev_image = current_image
|
||||
self.prev_timestamp = current_timestamp
|
||||
return False
|
||||
|
||||
# Вычисляем временной интервал
|
||||
dt = current_timestamp - self.prev_timestamp
|
||||
|
||||
# Обнаруживаем и сопоставляем ключевые точки
|
||||
src_pts, dst_pts, matches, kp1, kp2 = self.detect_and_match_keypoints(self.prev_image, current_image)
|
||||
|
||||
if src_pts is not None and dst_pts is not None:
|
||||
# Оцениваем матрицу трансформации
|
||||
transformation_info = self.estimate_transformation_matrix(src_pts, dst_pts)
|
||||
|
||||
if transformation_info:
|
||||
# Обновляем позицию БПЛА
|
||||
self.update_drone_position(transformation_info, dt)
|
||||
|
||||
# Добавляем состояние в историю
|
||||
self.add_state_to_history()
|
||||
|
||||
# Обновляем статистику
|
||||
self.successful_tracking_frames += 1
|
||||
self.tracking_stats['successful_frames'] += 1
|
||||
|
||||
# Выводим информацию
|
||||
self.print_tracking_info(transformation_info)
|
||||
|
||||
# Визуализация
|
||||
self.visualize_tracking(current_image, kp1, kp2, matches, transformation_info)
|
||||
|
||||
success = True
|
||||
else:
|
||||
self.tracking_stats['failed_frames'] += 1
|
||||
success = False
|
||||
else:
|
||||
self.tracking_stats['failed_frames'] += 1
|
||||
success = False
|
||||
|
||||
# Обновляем предыдущее изображение
|
||||
self.prev_image = current_image
|
||||
self.prev_timestamp = current_timestamp
|
||||
|
||||
# Обновляем статистику времени обработки
|
||||
processing_time = time.time() - start_time
|
||||
self.tracking_stats['avg_processing_time'] = (
|
||||
(self.tracking_stats['avg_processing_time'] * (self.frame_count - 1) + processing_time) / self.frame_count
|
||||
)
|
||||
|
||||
return success
|
||||
|
||||
def add_state_to_history(self):
|
||||
"""Добавляет текущее состояние в историю"""
|
||||
# Создаем копию текущего состояния
|
||||
state_copy = DroneState(
|
||||
x=self.state.x,
|
||||
y=self.state.y,
|
||||
altitude=self.state.altitude,
|
||||
angle=self.state.angle,
|
||||
velocity_x=self.state.velocity_x,
|
||||
velocity_y=self.state.velocity_y,
|
||||
angular_velocity=self.state.angular_velocity,
|
||||
timestamp=self.state.timestamp,
|
||||
confidence=self.state.confidence
|
||||
)
|
||||
|
||||
self.state_history.append(state_copy)
|
||||
|
||||
# Ограничиваем размер истории
|
||||
if len(self.state_history) > self.max_history_size:
|
||||
self.state_history.pop(0)
|
||||
|
||||
def get_drone_state(self) -> Dict:
|
||||
"""Возвращает текущее состояние БПЛА в виде словаря"""
|
||||
return {
|
||||
'x': self.state.x,
|
||||
'y': self.state.y,
|
||||
'altitude': self.state.altitude,
|
||||
'angle': self.state.angle,
|
||||
'angle_degrees': math.degrees(self.state.angle),
|
||||
'velocity_x': self.state.velocity_x,
|
||||
'velocity_y': self.state.velocity_y,
|
||||
'angular_velocity': self.state.angular_velocity,
|
||||
'confidence': self.state.confidence,
|
||||
'frame_count': self.frame_count,
|
||||
'timestamp': self.state.timestamp
|
||||
}
|
||||
|
||||
def get_tracking_stats(self) -> Dict:
|
||||
"""Возвращает статистику отслеживания"""
|
||||
success_rate = (self.tracking_stats['successful_frames'] /
|
||||
max(1, self.tracking_stats['total_frames'])) * 100
|
||||
|
||||
return {
|
||||
**self.tracking_stats,
|
||||
'success_rate_percent': success_rate,
|
||||
'current_altitude': self.state.altitude,
|
||||
'pixel_to_meter_ratio': self.pixel_to_meter_ratio
|
||||
}
|
||||
|
||||
def set_altitude(self, altitude: float):
|
||||
"""Устанавливает высоту БПЛА и пересчитывает масштаб"""
|
||||
self.state.altitude = altitude
|
||||
self.update_pixel_to_meter_ratio()
|
||||
|
||||
def reset_position(self, x: float = 0.0, y: float = 0.0, angle: float = 0.0):
|
||||
"""Сбрасывает позицию и угол БПЛА"""
|
||||
self.state.x = x
|
||||
self.state.y = y
|
||||
self.state.angle = angle
|
||||
self.state.velocity_x = 0.0
|
||||
self.state.velocity_y = 0.0
|
||||
self.state.angular_velocity = 0.0
|
||||
self.state_history.clear()
|
||||
self.frame_count = 0
|
||||
self.successful_tracking_frames = 0
|
||||
|
||||
def print_tracking_info(self, transformation_info: Dict):
|
||||
"""Выводит информацию об отслеживании"""
|
||||
state = self.get_drone_state()
|
||||
stats = self.get_tracking_stats()
|
||||
|
||||
print(f"Frame {self.frame_count}:")
|
||||
print(f" Position: ({state['x']:.3f}, {state['y']:.3f}) m")
|
||||
print(f" Angle: {state['angle_degrees']:.1f}°")
|
||||
print(f" Velocity: ({state['velocity_x']:.2f}, {state['velocity_y']:.2f}) m/s")
|
||||
print(f" Confidence: {state['confidence']:.2f}")
|
||||
print(f" Success Rate: {stats['success_rate_percent']:.1f}%")
|
||||
|
||||
def visualize_tracking(self, current_image: np.ndarray, kp1, kp2, matches, transformation_info: Dict):
|
||||
"""Визуализирует процесс отслеживания"""
|
||||
if kp1 is None or kp2 is None or matches is None:
|
||||
return
|
||||
|
||||
# Рисуем сопоставления
|
||||
img_matches = cv2.drawMatches(self.prev_image, kp1, current_image, kp2, matches, None,
|
||||
flags=cv2.DrawMatchesFlags_NOT_DRAW_SINGLE_POINTS)
|
||||
|
||||
# Добавляем информацию о состоянии
|
||||
state = self.get_drone_state()
|
||||
stats = self.get_tracking_stats()
|
||||
|
||||
info_lines = [
|
||||
f"Position: ({state['x']:.2f}, {state['y']:.2f}) m",
|
||||
f"Angle: {state['angle_degrees']:.1f}°",
|
||||
f"Velocity: ({state['velocity_x']:.2f}, {state['velocity_y']:.2f}) m/s",
|
||||
f"Altitude: {state['altitude']:.1f} m",
|
||||
f"Confidence: {state['confidence']:.2f}",
|
||||
f"Success Rate: {stats['success_rate_percent']:.1f}%"
|
||||
]
|
||||
|
||||
for i, line in enumerate(info_lines):
|
||||
cv2.putText(img_matches, line, (10, 30 + i * 25),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
|
||||
|
||||
cv2.imshow('Advanced Drone Tracking', img_matches)
|
||||
cv2.waitKey(1)
|
||||
|
||||
def act(self) -> float:
|
||||
"""Возвращает угол поворота для управления дроном"""
|
||||
return self.state.angle
|
||||
|
||||
def get_trajectory_history(self) -> Tuple[List[float], List[float], List[float]]:
|
||||
"""Возвращает историю траектории"""
|
||||
x_history = [state.x for state in self.state_history]
|
||||
y_history = [state.y for state in self.state_history]
|
||||
angle_history = [math.degrees(state.angle) for state in self.state_history]
|
||||
|
||||
return x_history, y_history, angle_history
|
||||
Reference in New Issue
Block a user