Files
autopilot/old/advanced_autopilot.py

418 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Расширенная система автопилота с улучшенным отслеживанием координат и угла БПЛА
"""
import random
import cv2
import numpy as np
from PIL import Image
import math
from typing import Optional, Tuple, Dict, List
from dataclasses import dataclass
import time
random.seed(1)
@dataclass
class DroneState:
"""Структура для хранения состояния БПЛА"""
x: float = 0.0
y: float = 0.0
altitude: float = 10.0
angle: float = 0.0
velocity_x: float = 0.0
velocity_y: float = 0.0
angular_velocity: float = 0.0
timestamp: float = 0.0
confidence: float = 1.0
class AdvancedAutoPilot:
"""
Расширенный автопилот с улучшенным отслеживанием координат и угла БПЛА
"""
def __init__(self,
initial_x: float = 0.0,
initial_y: float = 0.0,
initial_altitude: float = 10.0,
camera_fov: float = 60.0,
image_width: int = 640,
image_height: int = 480):
# Состояние БПЛА
self.state = DroneState(
x=initial_x,
y=initial_y,
altitude=initial_altitude,
timestamp=time.time()
)
# Параметры камеры
self.camera_fov = math.radians(camera_fov)
self.image_width = image_width
self.image_height = image_height
# Вычисляем коэффициент перевода пикселей в метры
self.update_pixel_to_meter_ratio()
# История состояний для фильтрации
self.state_history: List[DroneState] = []
self.max_history_size = 10
# Параметры фильтрации
self.velocity_filter_alpha = 0.3 # Коэффициент сглаживания скорости
self.position_filter_alpha = 0.7 # Коэффициент сглаживания позиции
# Инициализация детекторов
self.orb_detector = cv2.ORB_create(
nfeatures=2000,
scaleFactor=1.2,
nlevels=8,
edgeThreshold=31,
firstLevel=0,
WTA_K=2,
patchSize=31,
fastThreshold=20
)
self.bf_matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
# Предыдущее изображение
self.prev_image: Optional[np.ndarray] = None
self.prev_timestamp: float = 0.0
# Счетчики
self.frame_count = 0
self.successful_tracking_frames = 0
# Статистика
self.tracking_stats = {
'total_frames': 0,
'successful_frames': 0,
'failed_frames': 0,
'avg_processing_time': 0.0
}
def update_pixel_to_meter_ratio(self):
"""Обновляет коэффициент перевода пикселей в метры на основе высоты и FOV камеры"""
# Вычисляем размер пикселя на земле при текущей высоте
ground_distance = 2 * self.state.altitude * math.tan(self.camera_fov / 2)
self.pixel_to_meter_ratio = ground_distance / self.image_width
def detect_and_match_keypoints(self, img1: np.ndarray, img2: np.ndarray) -> Tuple[Optional[np.ndarray], Optional[np.ndarray], Optional[List], Optional[List], Optional[List]]:
"""Обнаруживает и сопоставляет ключевые точки между двумя изображениями"""
# Обнаружение ключевых точек и дескрипторов
kp1, des1 = self.orb_detector.detectAndCompute(img1, None)
kp2, des2 = self.orb_detector.detectAndCompute(img2, None)
if des1 is None or des2 is None or len(kp1) < 10 or len(kp2) < 10:
return None, None, None, None, None
# Сопоставление ключевых точек
matches = self.bf_matcher.match(des1, des2)
matches = sorted(matches, key=lambda x: x.distance)
# Фильтрация хороших совпадений
good_matches = []
for match in matches:
if match.distance < 50: # Порог расстояния
good_matches.append(match)
if len(good_matches) < 8: # Увеличиваем минимальное количество совпадений
return None, None, None, None, None
# Извлечение координат сопоставленных точек
src_pts = np.float32([kp1[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2)
dst_pts = np.float32([kp2[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2)
return src_pts, dst_pts, good_matches, kp1, kp2
def estimate_transformation_matrix(self, src_pts: np.ndarray, dst_pts: np.ndarray) -> Optional[Dict]:
"""Оценивает матрицу трансформации с улучшенной обработкой ошибок"""
try:
# Используем RANSAC для оценки матрицы гомографии
H, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 3.0)
if H is None:
return None
# Проверяем качество матрицы
if not self.is_valid_homography(H):
return None
# Извлекаем параметры трансформации
a11, a12 = H[0, 0], H[0, 1]
a21, a22 = H[1, 0], H[1, 1]
tx, ty = H[0, 2], H[1, 2]
# Вычисляем угол поворота
angle = np.arctan2(a21, a11)
# Вычисляем масштаб
scale_x = np.sqrt(a11**2 + a21**2)
scale_y = np.sqrt(a12**2 + a22**2)
scale = (scale_x + scale_y) / 2
# Проверяем разумность масштаба
if scale < 0.5 or scale > 2.0:
return None
return {
'translation': (tx, ty),
'rotation': angle,
'scale': scale,
'homography': H,
'mask': mask,
'inliers_ratio': np.sum(mask) / len(mask) if mask is not None else 0.0
}
except Exception as e:
print(f"Ошибка при оценке трансформации: {e}")
return None
def is_valid_homography(self, H: np.ndarray) -> bool:
"""Проверяет валидность матрицы гомографии"""
if H is None:
return False
# Проверяем, что матрица не вырождена
det = np.linalg.det(H)
if abs(det) < 1e-6:
return False
# Проверяем, что элементы матрицы разумны
if np.any(np.abs(H) > 1000):
return False
return True
def update_drone_position(self, transformation_info: Dict, dt: float):
"""Обновляет позицию и состояние БПЛА с учетом временного интервала"""
tx, ty = transformation_info['translation']
rotation = transformation_info['rotation']
scale = transformation_info['scale']
inliers_ratio = transformation_info.get('inliers_ratio', 1.0)
# Конвертируем смещение в пикселях в метры
dx_meters = tx * self.pixel_to_meter_ratio
dy_meters = ty * self.pixel_to_meter_ratio
# Применяем поворот к смещению (учитываем текущий угол БПЛА)
cos_angle = math.cos(self.state.angle)
sin_angle = math.sin(self.state.angle)
# Поворачиваем смещение в глобальные координаты
dx_global = dx_meters * cos_angle - dy_meters * sin_angle
dy_global = dx_meters * sin_angle + dy_meters * cos_angle
# Вычисляем скорости
velocity_x = dx_global / dt if dt > 0 else 0.0
velocity_y = dy_global / dt if dt > 0 else 0.0
angular_velocity = rotation / dt if dt > 0 else 0.0
# Применяем фильтрацию
self.state.velocity_x = (self.velocity_filter_alpha * velocity_x +
(1 - self.velocity_filter_alpha) * self.state.velocity_x)
self.state.velocity_y = (self.velocity_filter_alpha * velocity_y +
(1 - self.velocity_filter_alpha) * self.state.velocity_y)
self.state.angular_velocity = (self.velocity_filter_alpha * angular_velocity +
(1 - self.velocity_filter_alpha) * self.state.angular_velocity)
# Обновляем позицию с фильтрацией
new_x = self.state.x + dx_global
new_y = self.state.y + dy_global
new_angle = self.state.angle + rotation
self.state.x = (self.position_filter_alpha * new_x +
(1 - self.position_filter_alpha) * self.state.x)
self.state.y = (self.position_filter_alpha * new_y +
(1 - self.position_filter_alpha) * self.state.y)
self.state.angle = (self.position_filter_alpha * new_angle +
(1 - self.position_filter_alpha) * self.state.angle)
# Нормализуем угол
self.state.angle = math.atan2(math.sin(self.state.angle), math.cos(self.state.angle))
# Обновляем временную метку и уверенность
self.state.timestamp = time.time()
self.state.confidence = min(1.0, inliers_ratio * 1.5) # Увеличиваем уверенность при хороших совпадениях
def handle(self, image: Image.Image) -> bool:
"""Обрабатывает новый кадр и обновляет состояние БПЛА"""
start_time = time.time()
self.frame_count += 1
self.tracking_stats['total_frames'] += 1
# Конвертируем изображение
current_image = np.array(image)
current_timestamp = time.time()
if self.prev_image is None:
self.prev_image = current_image
self.prev_timestamp = current_timestamp
return False
# Вычисляем временной интервал
dt = current_timestamp - self.prev_timestamp
# Обнаруживаем и сопоставляем ключевые точки
src_pts, dst_pts, matches, kp1, kp2 = self.detect_and_match_keypoints(self.prev_image, current_image)
if src_pts is not None and dst_pts is not None:
# Оцениваем матрицу трансформации
transformation_info = self.estimate_transformation_matrix(src_pts, dst_pts)
if transformation_info:
# Обновляем позицию БПЛА
self.update_drone_position(transformation_info, dt)
# Добавляем состояние в историю
self.add_state_to_history()
# Обновляем статистику
self.successful_tracking_frames += 1
self.tracking_stats['successful_frames'] += 1
# Выводим информацию
self.print_tracking_info(transformation_info)
# Визуализация
self.visualize_tracking(current_image, kp1, kp2, matches, transformation_info)
success = True
else:
self.tracking_stats['failed_frames'] += 1
success = False
else:
self.tracking_stats['failed_frames'] += 1
success = False
# Обновляем предыдущее изображение
self.prev_image = current_image
self.prev_timestamp = current_timestamp
# Обновляем статистику времени обработки
processing_time = time.time() - start_time
self.tracking_stats['avg_processing_time'] = (
(self.tracking_stats['avg_processing_time'] * (self.frame_count - 1) + processing_time) / self.frame_count
)
return success
def add_state_to_history(self):
"""Добавляет текущее состояние в историю"""
# Создаем копию текущего состояния
state_copy = DroneState(
x=self.state.x,
y=self.state.y,
altitude=self.state.altitude,
angle=self.state.angle,
velocity_x=self.state.velocity_x,
velocity_y=self.state.velocity_y,
angular_velocity=self.state.angular_velocity,
timestamp=self.state.timestamp,
confidence=self.state.confidence
)
self.state_history.append(state_copy)
# Ограничиваем размер истории
if len(self.state_history) > self.max_history_size:
self.state_history.pop(0)
def get_drone_state(self) -> Dict:
"""Возвращает текущее состояние БПЛА в виде словаря"""
return {
'x': self.state.x,
'y': self.state.y,
'altitude': self.state.altitude,
'angle': self.state.angle,
'angle_degrees': math.degrees(self.state.angle),
'velocity_x': self.state.velocity_x,
'velocity_y': self.state.velocity_y,
'angular_velocity': self.state.angular_velocity,
'confidence': self.state.confidence,
'frame_count': self.frame_count,
'timestamp': self.state.timestamp
}
def get_tracking_stats(self) -> Dict:
"""Возвращает статистику отслеживания"""
success_rate = (self.tracking_stats['successful_frames'] /
max(1, self.tracking_stats['total_frames'])) * 100
return {
**self.tracking_stats,
'success_rate_percent': success_rate,
'current_altitude': self.state.altitude,
'pixel_to_meter_ratio': self.pixel_to_meter_ratio
}
def set_altitude(self, altitude: float):
"""Устанавливает высоту БПЛА и пересчитывает масштаб"""
self.state.altitude = altitude
self.update_pixel_to_meter_ratio()
def reset_position(self, x: float = 0.0, y: float = 0.0, angle: float = 0.0):
"""Сбрасывает позицию и угол БПЛА"""
self.state.x = x
self.state.y = y
self.state.angle = angle
self.state.velocity_x = 0.0
self.state.velocity_y = 0.0
self.state.angular_velocity = 0.0
self.state_history.clear()
self.frame_count = 0
self.successful_tracking_frames = 0
def print_tracking_info(self, transformation_info: Dict):
"""Выводит информацию об отслеживании"""
state = self.get_drone_state()
stats = self.get_tracking_stats()
print(f"Frame {self.frame_count}:")
print(f" Position: ({state['x']:.3f}, {state['y']:.3f}) m")
print(f" Angle: {state['angle_degrees']:.1f}°")
print(f" Velocity: ({state['velocity_x']:.2f}, {state['velocity_y']:.2f}) m/s")
print(f" Confidence: {state['confidence']:.2f}")
print(f" Success Rate: {stats['success_rate_percent']:.1f}%")
def visualize_tracking(self, current_image: np.ndarray, kp1, kp2, matches, transformation_info: Dict):
"""Визуализирует процесс отслеживания"""
if kp1 is None or kp2 is None or matches is None:
return
# Рисуем сопоставления
img_matches = cv2.drawMatches(self.prev_image, kp1, current_image, kp2, matches, None,
flags=cv2.DrawMatchesFlags_NOT_DRAW_SINGLE_POINTS)
# Добавляем информацию о состоянии
state = self.get_drone_state()
stats = self.get_tracking_stats()
info_lines = [
f"Position: ({state['x']:.2f}, {state['y']:.2f}) m",
f"Angle: {state['angle_degrees']:.1f}°",
f"Velocity: ({state['velocity_x']:.2f}, {state['velocity_y']:.2f}) m/s",
f"Altitude: {state['altitude']:.1f} m",
f"Confidence: {state['confidence']:.2f}",
f"Success Rate: {stats['success_rate_percent']:.1f}%"
]
for i, line in enumerate(info_lines):
cv2.putText(img_matches, line, (10, 30 + i * 25),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
cv2.imshow('Advanced Drone Tracking', img_matches)
cv2.waitKey(1)
def act(self) -> float:
"""Возвращает угол поворота для управления дроном"""
return self.state.angle
def get_trajectory_history(self) -> Tuple[List[float], List[float], List[float]]:
"""Возвращает историю траектории"""
x_history = [state.x for state in self.state_history]
y_history = [state.y for state in self.state_history]
angle_history = [math.degrees(state.angle) for state in self.state_history]
return x_history, y_history, angle_history