418 lines
18 KiB
Python
418 lines
18 KiB
Python
"""
|
||
Расширенная система автопилота с улучшенным отслеживанием координат и угла БПЛА
|
||
"""
|
||
|
||
import random
|
||
import cv2
|
||
import numpy as np
|
||
from PIL import Image
|
||
import math
|
||
from typing import Optional, Tuple, Dict, List
|
||
from dataclasses import dataclass
|
||
import time
|
||
|
||
random.seed(1)
|
||
|
||
@dataclass
|
||
class DroneState:
|
||
"""Структура для хранения состояния БПЛА"""
|
||
x: float = 0.0
|
||
y: float = 0.0
|
||
altitude: float = 10.0
|
||
angle: float = 0.0
|
||
velocity_x: float = 0.0
|
||
velocity_y: float = 0.0
|
||
angular_velocity: float = 0.0
|
||
timestamp: float = 0.0
|
||
confidence: float = 1.0
|
||
|
||
class AdvancedAutoPilot:
|
||
"""
|
||
Расширенный автопилот с улучшенным отслеживанием координат и угла БПЛА
|
||
"""
|
||
|
||
def __init__(self,
|
||
initial_x: float = 0.0,
|
||
initial_y: float = 0.0,
|
||
initial_altitude: float = 10.0,
|
||
camera_fov: float = 60.0,
|
||
image_width: int = 640,
|
||
image_height: int = 480):
|
||
|
||
# Состояние БПЛА
|
||
self.state = DroneState(
|
||
x=initial_x,
|
||
y=initial_y,
|
||
altitude=initial_altitude,
|
||
timestamp=time.time()
|
||
)
|
||
|
||
# Параметры камеры
|
||
self.camera_fov = math.radians(camera_fov)
|
||
self.image_width = image_width
|
||
self.image_height = image_height
|
||
|
||
# Вычисляем коэффициент перевода пикселей в метры
|
||
self.update_pixel_to_meter_ratio()
|
||
|
||
# История состояний для фильтрации
|
||
self.state_history: List[DroneState] = []
|
||
self.max_history_size = 10
|
||
|
||
# Параметры фильтрации
|
||
self.velocity_filter_alpha = 0.3 # Коэффициент сглаживания скорости
|
||
self.position_filter_alpha = 0.7 # Коэффициент сглаживания позиции
|
||
|
||
# Инициализация детекторов
|
||
self.orb_detector = cv2.ORB_create(
|
||
nfeatures=2000,
|
||
scaleFactor=1.2,
|
||
nlevels=8,
|
||
edgeThreshold=31,
|
||
firstLevel=0,
|
||
WTA_K=2,
|
||
patchSize=31,
|
||
fastThreshold=20
|
||
)
|
||
|
||
self.bf_matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
|
||
|
||
# Предыдущее изображение
|
||
self.prev_image: Optional[np.ndarray] = None
|
||
self.prev_timestamp: float = 0.0
|
||
|
||
# Счетчики
|
||
self.frame_count = 0
|
||
self.successful_tracking_frames = 0
|
||
|
||
# Статистика
|
||
self.tracking_stats = {
|
||
'total_frames': 0,
|
||
'successful_frames': 0,
|
||
'failed_frames': 0,
|
||
'avg_processing_time': 0.0
|
||
}
|
||
|
||
def update_pixel_to_meter_ratio(self):
|
||
"""Обновляет коэффициент перевода пикселей в метры на основе высоты и FOV камеры"""
|
||
# Вычисляем размер пикселя на земле при текущей высоте
|
||
ground_distance = 2 * self.state.altitude * math.tan(self.camera_fov / 2)
|
||
self.pixel_to_meter_ratio = ground_distance / self.image_width
|
||
|
||
def detect_and_match_keypoints(self, img1: np.ndarray, img2: np.ndarray) -> Tuple[Optional[np.ndarray], Optional[np.ndarray], Optional[List], Optional[List], Optional[List]]:
|
||
"""Обнаруживает и сопоставляет ключевые точки между двумя изображениями"""
|
||
# Обнаружение ключевых точек и дескрипторов
|
||
kp1, des1 = self.orb_detector.detectAndCompute(img1, None)
|
||
kp2, des2 = self.orb_detector.detectAndCompute(img2, None)
|
||
|
||
if des1 is None or des2 is None or len(kp1) < 10 or len(kp2) < 10:
|
||
return None, None, None, None, None
|
||
|
||
# Сопоставление ключевых точек
|
||
matches = self.bf_matcher.match(des1, des2)
|
||
matches = sorted(matches, key=lambda x: x.distance)
|
||
|
||
# Фильтрация хороших совпадений
|
||
good_matches = []
|
||
for match in matches:
|
||
if match.distance < 50: # Порог расстояния
|
||
good_matches.append(match)
|
||
|
||
if len(good_matches) < 8: # Увеличиваем минимальное количество совпадений
|
||
return None, None, None, None, None
|
||
|
||
# Извлечение координат сопоставленных точек
|
||
src_pts = np.float32([kp1[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2)
|
||
dst_pts = np.float32([kp2[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2)
|
||
|
||
return src_pts, dst_pts, good_matches, kp1, kp2
|
||
|
||
def estimate_transformation_matrix(self, src_pts: np.ndarray, dst_pts: np.ndarray) -> Optional[Dict]:
|
||
"""Оценивает матрицу трансформации с улучшенной обработкой ошибок"""
|
||
try:
|
||
# Используем RANSAC для оценки матрицы гомографии
|
||
H, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 3.0)
|
||
|
||
if H is None:
|
||
return None
|
||
|
||
# Проверяем качество матрицы
|
||
if not self.is_valid_homography(H):
|
||
return None
|
||
|
||
# Извлекаем параметры трансформации
|
||
a11, a12 = H[0, 0], H[0, 1]
|
||
a21, a22 = H[1, 0], H[1, 1]
|
||
tx, ty = H[0, 2], H[1, 2]
|
||
|
||
# Вычисляем угол поворота
|
||
angle = np.arctan2(a21, a11)
|
||
|
||
# Вычисляем масштаб
|
||
scale_x = np.sqrt(a11**2 + a21**2)
|
||
scale_y = np.sqrt(a12**2 + a22**2)
|
||
scale = (scale_x + scale_y) / 2
|
||
|
||
# Проверяем разумность масштаба
|
||
if scale < 0.5 or scale > 2.0:
|
||
return None
|
||
|
||
return {
|
||
'translation': (tx, ty),
|
||
'rotation': angle,
|
||
'scale': scale,
|
||
'homography': H,
|
||
'mask': mask,
|
||
'inliers_ratio': np.sum(mask) / len(mask) if mask is not None else 0.0
|
||
}
|
||
except Exception as e:
|
||
print(f"Ошибка при оценке трансформации: {e}")
|
||
return None
|
||
|
||
def is_valid_homography(self, H: np.ndarray) -> bool:
|
||
"""Проверяет валидность матрицы гомографии"""
|
||
if H is None:
|
||
return False
|
||
|
||
# Проверяем, что матрица не вырождена
|
||
det = np.linalg.det(H)
|
||
if abs(det) < 1e-6:
|
||
return False
|
||
|
||
# Проверяем, что элементы матрицы разумны
|
||
if np.any(np.abs(H) > 1000):
|
||
return False
|
||
|
||
return True
|
||
|
||
def update_drone_position(self, transformation_info: Dict, dt: float):
|
||
"""Обновляет позицию и состояние БПЛА с учетом временного интервала"""
|
||
tx, ty = transformation_info['translation']
|
||
rotation = transformation_info['rotation']
|
||
scale = transformation_info['scale']
|
||
inliers_ratio = transformation_info.get('inliers_ratio', 1.0)
|
||
|
||
# Конвертируем смещение в пикселях в метры
|
||
dx_meters = tx * self.pixel_to_meter_ratio
|
||
dy_meters = ty * self.pixel_to_meter_ratio
|
||
|
||
# Применяем поворот к смещению (учитываем текущий угол БПЛА)
|
||
cos_angle = math.cos(self.state.angle)
|
||
sin_angle = math.sin(self.state.angle)
|
||
|
||
# Поворачиваем смещение в глобальные координаты
|
||
dx_global = dx_meters * cos_angle - dy_meters * sin_angle
|
||
dy_global = dx_meters * sin_angle + dy_meters * cos_angle
|
||
|
||
# Вычисляем скорости
|
||
velocity_x = dx_global / dt if dt > 0 else 0.0
|
||
velocity_y = dy_global / dt if dt > 0 else 0.0
|
||
angular_velocity = rotation / dt if dt > 0 else 0.0
|
||
|
||
# Применяем фильтрацию
|
||
self.state.velocity_x = (self.velocity_filter_alpha * velocity_x +
|
||
(1 - self.velocity_filter_alpha) * self.state.velocity_x)
|
||
self.state.velocity_y = (self.velocity_filter_alpha * velocity_y +
|
||
(1 - self.velocity_filter_alpha) * self.state.velocity_y)
|
||
self.state.angular_velocity = (self.velocity_filter_alpha * angular_velocity +
|
||
(1 - self.velocity_filter_alpha) * self.state.angular_velocity)
|
||
|
||
# Обновляем позицию с фильтрацией
|
||
new_x = self.state.x + dx_global
|
||
new_y = self.state.y + dy_global
|
||
new_angle = self.state.angle + rotation
|
||
|
||
self.state.x = (self.position_filter_alpha * new_x +
|
||
(1 - self.position_filter_alpha) * self.state.x)
|
||
self.state.y = (self.position_filter_alpha * new_y +
|
||
(1 - self.position_filter_alpha) * self.state.y)
|
||
self.state.angle = (self.position_filter_alpha * new_angle +
|
||
(1 - self.position_filter_alpha) * self.state.angle)
|
||
|
||
# Нормализуем угол
|
||
self.state.angle = math.atan2(math.sin(self.state.angle), math.cos(self.state.angle))
|
||
|
||
# Обновляем временную метку и уверенность
|
||
self.state.timestamp = time.time()
|
||
self.state.confidence = min(1.0, inliers_ratio * 1.5) # Увеличиваем уверенность при хороших совпадениях
|
||
|
||
def handle(self, image: Image.Image) -> bool:
|
||
"""Обрабатывает новый кадр и обновляет состояние БПЛА"""
|
||
start_time = time.time()
|
||
self.frame_count += 1
|
||
self.tracking_stats['total_frames'] += 1
|
||
|
||
# Конвертируем изображение
|
||
current_image = np.array(image)
|
||
current_timestamp = time.time()
|
||
|
||
if self.prev_image is None:
|
||
self.prev_image = current_image
|
||
self.prev_timestamp = current_timestamp
|
||
return False
|
||
|
||
# Вычисляем временной интервал
|
||
dt = current_timestamp - self.prev_timestamp
|
||
|
||
# Обнаруживаем и сопоставляем ключевые точки
|
||
src_pts, dst_pts, matches, kp1, kp2 = self.detect_and_match_keypoints(self.prev_image, current_image)
|
||
|
||
if src_pts is not None and dst_pts is not None:
|
||
# Оцениваем матрицу трансформации
|
||
transformation_info = self.estimate_transformation_matrix(src_pts, dst_pts)
|
||
|
||
if transformation_info:
|
||
# Обновляем позицию БПЛА
|
||
self.update_drone_position(transformation_info, dt)
|
||
|
||
# Добавляем состояние в историю
|
||
self.add_state_to_history()
|
||
|
||
# Обновляем статистику
|
||
self.successful_tracking_frames += 1
|
||
self.tracking_stats['successful_frames'] += 1
|
||
|
||
# Выводим информацию
|
||
self.print_tracking_info(transformation_info)
|
||
|
||
# Визуализация
|
||
self.visualize_tracking(current_image, kp1, kp2, matches, transformation_info)
|
||
|
||
success = True
|
||
else:
|
||
self.tracking_stats['failed_frames'] += 1
|
||
success = False
|
||
else:
|
||
self.tracking_stats['failed_frames'] += 1
|
||
success = False
|
||
|
||
# Обновляем предыдущее изображение
|
||
self.prev_image = current_image
|
||
self.prev_timestamp = current_timestamp
|
||
|
||
# Обновляем статистику времени обработки
|
||
processing_time = time.time() - start_time
|
||
self.tracking_stats['avg_processing_time'] = (
|
||
(self.tracking_stats['avg_processing_time'] * (self.frame_count - 1) + processing_time) / self.frame_count
|
||
)
|
||
|
||
return success
|
||
|
||
def add_state_to_history(self):
|
||
"""Добавляет текущее состояние в историю"""
|
||
# Создаем копию текущего состояния
|
||
state_copy = DroneState(
|
||
x=self.state.x,
|
||
y=self.state.y,
|
||
altitude=self.state.altitude,
|
||
angle=self.state.angle,
|
||
velocity_x=self.state.velocity_x,
|
||
velocity_y=self.state.velocity_y,
|
||
angular_velocity=self.state.angular_velocity,
|
||
timestamp=self.state.timestamp,
|
||
confidence=self.state.confidence
|
||
)
|
||
|
||
self.state_history.append(state_copy)
|
||
|
||
# Ограничиваем размер истории
|
||
if len(self.state_history) > self.max_history_size:
|
||
self.state_history.pop(0)
|
||
|
||
def get_drone_state(self) -> Dict:
|
||
"""Возвращает текущее состояние БПЛА в виде словаря"""
|
||
return {
|
||
'x': self.state.x,
|
||
'y': self.state.y,
|
||
'altitude': self.state.altitude,
|
||
'angle': self.state.angle,
|
||
'angle_degrees': math.degrees(self.state.angle),
|
||
'velocity_x': self.state.velocity_x,
|
||
'velocity_y': self.state.velocity_y,
|
||
'angular_velocity': self.state.angular_velocity,
|
||
'confidence': self.state.confidence,
|
||
'frame_count': self.frame_count,
|
||
'timestamp': self.state.timestamp
|
||
}
|
||
|
||
def get_tracking_stats(self) -> Dict:
|
||
"""Возвращает статистику отслеживания"""
|
||
success_rate = (self.tracking_stats['successful_frames'] /
|
||
max(1, self.tracking_stats['total_frames'])) * 100
|
||
|
||
return {
|
||
**self.tracking_stats,
|
||
'success_rate_percent': success_rate,
|
||
'current_altitude': self.state.altitude,
|
||
'pixel_to_meter_ratio': self.pixel_to_meter_ratio
|
||
}
|
||
|
||
def set_altitude(self, altitude: float):
|
||
"""Устанавливает высоту БПЛА и пересчитывает масштаб"""
|
||
self.state.altitude = altitude
|
||
self.update_pixel_to_meter_ratio()
|
||
|
||
def reset_position(self, x: float = 0.0, y: float = 0.0, angle: float = 0.0):
|
||
"""Сбрасывает позицию и угол БПЛА"""
|
||
self.state.x = x
|
||
self.state.y = y
|
||
self.state.angle = angle
|
||
self.state.velocity_x = 0.0
|
||
self.state.velocity_y = 0.0
|
||
self.state.angular_velocity = 0.0
|
||
self.state_history.clear()
|
||
self.frame_count = 0
|
||
self.successful_tracking_frames = 0
|
||
|
||
def print_tracking_info(self, transformation_info: Dict):
|
||
"""Выводит информацию об отслеживании"""
|
||
state = self.get_drone_state()
|
||
stats = self.get_tracking_stats()
|
||
|
||
print(f"Frame {self.frame_count}:")
|
||
print(f" Position: ({state['x']:.3f}, {state['y']:.3f}) m")
|
||
print(f" Angle: {state['angle_degrees']:.1f}°")
|
||
print(f" Velocity: ({state['velocity_x']:.2f}, {state['velocity_y']:.2f}) m/s")
|
||
print(f" Confidence: {state['confidence']:.2f}")
|
||
print(f" Success Rate: {stats['success_rate_percent']:.1f}%")
|
||
|
||
def visualize_tracking(self, current_image: np.ndarray, kp1, kp2, matches, transformation_info: Dict):
|
||
"""Визуализирует процесс отслеживания"""
|
||
if kp1 is None or kp2 is None or matches is None:
|
||
return
|
||
|
||
# Рисуем сопоставления
|
||
img_matches = cv2.drawMatches(self.prev_image, kp1, current_image, kp2, matches, None,
|
||
flags=cv2.DrawMatchesFlags_NOT_DRAW_SINGLE_POINTS)
|
||
|
||
# Добавляем информацию о состоянии
|
||
state = self.get_drone_state()
|
||
stats = self.get_tracking_stats()
|
||
|
||
info_lines = [
|
||
f"Position: ({state['x']:.2f}, {state['y']:.2f}) m",
|
||
f"Angle: {state['angle_degrees']:.1f}°",
|
||
f"Velocity: ({state['velocity_x']:.2f}, {state['velocity_y']:.2f}) m/s",
|
||
f"Altitude: {state['altitude']:.1f} m",
|
||
f"Confidence: {state['confidence']:.2f}",
|
||
f"Success Rate: {stats['success_rate_percent']:.1f}%"
|
||
]
|
||
|
||
for i, line in enumerate(info_lines):
|
||
cv2.putText(img_matches, line, (10, 30 + i * 25),
|
||
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
|
||
|
||
cv2.imshow('Advanced Drone Tracking', img_matches)
|
||
cv2.waitKey(1)
|
||
|
||
def act(self) -> float:
|
||
"""Возвращает угол поворота для управления дроном"""
|
||
return self.state.angle
|
||
|
||
def get_trajectory_history(self) -> Tuple[List[float], List[float], List[float]]:
|
||
"""Возвращает историю траектории"""
|
||
x_history = [state.x for state in self.state_history]
|
||
y_history = [state.y for state in self.state_history]
|
||
angle_history = [math.degrees(state.angle) for state in self.state_history]
|
||
|
||
return x_history, y_history, angle_history |