feat: add fps/landmark debugging

This commit is contained in:
2026-01-12 15:47:03 +03:00
parent 7cd700c1fa
commit 64c9215f5b
5 changed files with 109 additions and 19 deletions

View File

@@ -3,6 +3,7 @@ from pathlib import Path
import math import math
import random import random
import constants
import cv2 import cv2
import numpy as np import numpy as np
from PIL import Image from PIL import Image
@@ -149,18 +150,30 @@ class AutoPilot(Pilot):
} }
def get_position_by_chunk(self) -> Position | None: def get_position_by_chunk(self) -> Position | None:
landmark_timer = Timer()
landmark_timer.start()
cur_pos = np.array([self.pos.x, self.pos.y]) cur_pos = np.array([self.pos.x, self.pos.y])
closest_chunk_idx = ((self.chunk_points - cur_pos) ** 2).sum(1).argmin() closest_chunk_idx = ((self.chunk_points - cur_pos) ** 2).sum(1).argmin()
current_chunk = self.prev_chunk current_chunk = self.prev_chunk
landmark_chunk = self.chunks[closest_chunk_idx] landmark_chunk = self.chunks[closest_chunk_idx]
if constants.DEBUG_FPS:
print(f"[LANDMARK]: Closest chunk finding: {landmark_timer.loop() * 1000:.2f} ms")
# Краевой случай: отсутствие чанков # Краевой случай: отсутствие чанков
if current_chunk is None or landmark_chunk is None: if current_chunk is None or landmark_chunk is None:
return None return None
landmark_timer.start()
src_pts, dst_pts, matches, kp1, kp2 = landmark_chunk.detect_and_match_keypoints(current_chunk) src_pts, dst_pts, matches, kp1, kp2 = landmark_chunk.detect_and_match_keypoints(current_chunk)
if constants.DEBUG_FPS:
print(f"[LANDMARK]: detect and match keypoints: {landmark_timer.loop() * 1000:.2f} ms")
landmark_timer.stop()
# Визуализация (если нужна) # Визуализация (если нужна)
if src_pts is not None and dst_pts is not None and self.vis_manager: if src_pts is not None and dst_pts is not None and self.vis_manager:
was_enabled = self.timer.enabled was_enabled = self.timer.enabled
@@ -174,6 +187,7 @@ class AutoPilot(Pilot):
if was_enabled: if was_enabled:
self.timer.start() self.timer.start()
landmark_timer.start()
# Краевой случай: нет точек или недостаточно матчей # Краевой случай: нет точек или недостаточно матчей
if src_pts is None or dst_pts is None: if src_pts is None or dst_pts is None:
return None return None
@@ -183,8 +197,12 @@ class AutoPilot(Pilot):
return None return None
# Оценка матрицы гомографии # Оценка матрицы гомографии
landmark_timer.loop()
landmark_transform, mask = estimate_transformation_matrix(src_pts, dst_pts) landmark_transform, mask = estimate_transformation_matrix(src_pts, dst_pts)
num_inliers = int(np.sum(mask)) num_inliers = int(np.sum(mask))
if constants.DEBUG_FPS:
print(f"[LANDMARK]: matrix estimation: {landmark_timer.loop() * 1000:.2f} ms")
# Краевой случай: матрица не найдена # Краевой случай: матрица не найдена
if landmark_transform is None or mask is None: if landmark_transform is None or mask is None:
@@ -199,7 +217,11 @@ class AutoPilot(Pilot):
# 2. Процент инлайеров от общего числа матчей # 2. Процент инлайеров от общего числа матчей
inlier_ratio = num_inliers / num_matches inlier_ratio = num_inliers / num_matches
MIN_INLIER_RATIO = 0.25 # Минимум 25% инлайеров
if constants.DEBUG_LANDMARK:
print("[LANDMARK]: inlier_ratio=", inlier_ratio)
MIN_INLIER_RATIO = 0.6
if inlier_ratio < MIN_INLIER_RATIO: if inlier_ratio < MIN_INLIER_RATIO:
return None return None
@@ -225,7 +247,11 @@ class AutoPilot(Pilot):
reprojection_errors = np.sqrt(np.sum((transformed_pts - inlier_dst) ** 2, axis=2)) reprojection_errors = np.sqrt(np.sum((transformed_pts - inlier_dst) ** 2, axis=2))
mean_error = np.mean(reprojection_errors) mean_error = np.mean(reprojection_errors)
MAX_MEAN_REPROJECTION_ERROR = 1.0 # пиксели MAX_MEAN_REPROJECTION_ERROR = 1.1 # пиксели
if constants.DEBUG_LANDMARK:
print("[LANDMARK]: Mean_error=", mean_error)
if mean_error > MAX_MEAN_REPROJECTION_ERROR: if mean_error > MAX_MEAN_REPROJECTION_ERROR:
return None return None
@@ -234,7 +260,11 @@ class AutoPilot(Pilot):
return None return None
# === ВСЕ ПРОВЕРКИ ПРОЙДЕНЫ === # === ВСЕ ПРОВЕРКИ ПРОЙДЕНЫ ===
print("[INFO]: Landmark Chunk Correction Applied") print("[LANDMARK]: Correction Applied")
if constants.DEBUG_FPS:
print(f"[LANDMARK]: time: {landmark_timer.get_elapsed() * 1000:.2f} ms")
return landmark_chunk.pos.apply(landmark_transform) return landmark_chunk.pos.apply(landmark_transform)
@@ -250,8 +280,8 @@ class AutoPilot(Pilot):
# Вычисляем оптический поток для покадрового сравнения # Вычисляем оптический поток для покадрового сравнения
matching_timer = Timer() matching_timer = Timer()
matching_timer.start() matching_timer.start()
# src_pts, dst_pts = self.calculate_optical_flow(self.prev_chunk, current_chunk) src_pts, dst_pts = self.calculate_optical_flow(self.prev_chunk, current_chunk)
src_pts, dst_pts, _, _, _ = self.prev_chunk.detect_and_match_keypoints(current_chunk) # src_pts, dst_pts, _, _, _ = self.prev_chunk.detect_and_match_keypoints(current_chunk)
matching_timer.stop() matching_timer.stop()
print(f"Matching calculating: {matching_timer.get_elapsed() * 1000:.2f} ms") print(f"Matching calculating: {matching_timer.get_elapsed() * 1000:.2f} ms")
@@ -279,17 +309,14 @@ class AutoPilot(Pilot):
self.timer.start() self.timer.start()
chunk_timer = Timer()
chunk_timer.start()
# Пытаемся найти ориентир на картинке: # Пытаемся найти ориентир на картинке:
self.prev_chunk = current_chunk self.prev_chunk = current_chunk
pos_by_chunk = self.get_position_by_chunk() # Для улучшения среднего FPS
if pos_by_chunk is not None: if self.frame_count % 5 == 0:
self.pos = pos_by_chunk pos_by_chunk = self.get_position_by_chunk()
if pos_by_chunk is not None:
self.pos = pos_by_chunk
chunk_timer.stop()
print(f"Chunk timer: {chunk_timer.get_elapsed() * 1000:.2f} ms")
command = self.make_command() command = self.make_command()
self.timer.reset() self.timer.reset()
return command return command

View File

@@ -30,3 +30,6 @@ K = np.array([
[0, _K_FOCUS_DISTANCE, _K_CENTER], [0, _K_FOCUS_DISTANCE, _K_CENTER],
[0, 0, 1] [0, 0, 1]
]) ])
DEBUG_FPS: bool = False
DEBUG_LANDMARK: bool = False

19
main.py
View File

@@ -199,7 +199,7 @@ def run(name: str, map_name: str, ref_min_distance: float):
vis_manager.update_display() vis_manager.update_display()
vis_manager.pause(0.2) vis_manager.pause(0.2)
last_proc_times = proc_time[-10:] last_proc_times = proc_time[-30:]
print(F"\nImage #{i}") print(F"\nImage #{i}")
print("Average FPS:", 1 / last_proc_times.mean()) print("Average FPS:", 1 / last_proc_times.mean())
print("Pilot coords:", pilot.pos) print("Pilot coords:", pilot.pos)
@@ -278,6 +278,20 @@ def parse_args():
help='Минимальное расстояние между эталонами' help='Минимальное расстояние между эталонами'
) )
# Место проведения симуляции
parser.add_argument(
'--debug-fps',
action='store_true',
help='Включить отладку FPS'
)
# Место проведения симуляции
parser.add_argument(
'--debug-landmark',
action='store_true',
help='Включить отладку эталонов'
)
# Парсим аргументы # Парсим аргументы
args = parser.parse_args() args = parser.parse_args()
@@ -298,6 +312,9 @@ if __name__ == "__main__":
lon: float = args.lon lon: float = args.lon
rmd: float = args.ref_min_distance rmd: float = args.ref_min_distance
constants.DEBUG_FPS = args.debug_fps
constants.DEBUG_LANDMARK = args.debug_landmark
if mode == 'build' or mode == 'standalone': if mode == 'build' or mode == 'standalone':
build(name, ref, lat, lon) build(name, ref, lat, lon)

View File

@@ -29,3 +29,9 @@ class Timer:
self.elapsed = 0. self.elapsed = 0.
self.enabled = False self.enabled = False
self.last_enabled = 0. self.last_enabled = 0.
def loop(self) -> float:
v = self.get_diff()
self.stop()
self.start()
return v

View File

@@ -1,3 +1,4 @@
import constants
import cv2 import cv2
import json import json
import numpy as np import numpy as np
@@ -5,10 +6,11 @@ from PIL import Image
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from position import Position from position import Position
from timer import Timer
from typing import Literal, Optional, Tuple from typing import Literal, Optional, Tuple
FeatureMethod = Literal["orb", "sift", "akaze", "brisk"] FeatureMethod = Literal["orb", "sift", "akaze", "brisk"]
DEFAULT_METHOD = "orb" DEFAULT_METHOD = "brisk"
@dataclass @dataclass
class VisionChunk: class VisionChunk:
@@ -27,14 +29,14 @@ class VisionChunk:
if self.feature_method == "orb": if self.feature_method == "orb":
self._detector = cv2.ORB_create( self._detector = cv2.ORB_create(
nfeatures=10000, nfeatures=1000,
scaleFactor=1.1, scaleFactor=1.2,
nlevels=32, nlevels=16,
edgeThreshold=31, edgeThreshold=31,
firstLevel=0, firstLevel=0,
WTA_K=2, WTA_K=2,
patchSize=31, patchSize=31,
fastThreshold=20, fastThreshold=10,
) )
elif self.feature_method == "sift": elif self.feature_method == "sift":
self._detector = cv2.SIFT_create( self._detector = cv2.SIFT_create(
@@ -95,17 +97,32 @@ class VisionChunk:
if self.keypoints is not None and self.descriptors is not None and not force: if self.keypoints is not None and self.descriptors is not None and not force:
return self.keypoints, self.descriptors return self.keypoints, self.descriptors
timer = Timer()
timer.start()
detector = self._get_detector() detector = self._get_detector()
if constants.DEBUG_FPS:
print(f"[VC-DETECTION]: get_detector: {timer.loop() * 1000:.2f} ms")
# PIL -> OpenCV (RGB->BGR) # PIL -> OpenCV (RGB->BGR)
img_np = np.array(self.image) img_np = np.array(self.image)
if img_np.ndim == 3 and img_np.shape[2] == 3: if img_np.ndim == 3 and img_np.shape[2] == 3:
img_np = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR) img_np = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR)
if constants.DEBUG_FPS:
print(f"[VC-DETECTION]: converting: {timer.loop() * 1000:.2f} ms")
# CLAHE предобработка # CLAHE предобработка
preprocessed = self._preprocess(img_np) preprocessed = self._preprocess(img_np)
if constants.DEBUG_FPS:
print(f"[VC-DETECTION]: preprocess: {timer.loop() * 1000:.2f} ms")
keypoints, descriptors = detector.detectAndCompute(preprocessed, None) keypoints, descriptors = detector.detectAndCompute(preprocessed, None)
if constants.DEBUG_FPS:
print(f"[VC-DETECTION]: detect and compute: {timer.loop() * 1000:.2f} ms")
# Получаем массив response для всех точек # Получаем массив response для всех точек
responses = np.array([kp.response for kp in keypoints]) responses = np.array([kp.response for kp in keypoints])
@@ -116,6 +133,9 @@ class VisionChunk:
best_keypoints = [keypoints[i] for i in top_indices] best_keypoints = [keypoints[i] for i in top_indices]
best_descriptors = descriptors[top_indices] best_descriptors = descriptors[top_indices]
if constants.DEBUG_FPS:
print(f"[VC-DETECTION]: filtration: {timer.loop() * 1000:.2f} ms")
self.keypoints = best_keypoints self.keypoints = best_keypoints
self.descriptors = best_descriptors self.descriptors = best_descriptors
return self.keypoints, self.descriptors return self.keypoints, self.descriptors
@@ -134,15 +154,29 @@ class VisionChunk:
Возвращает: src_pts, dst_pts, good_matches, kp1, kp2 (отцентрированные координаты) Возвращает: src_pts, dst_pts, good_matches, kp1, kp2 (отцентрированные координаты)
""" """
# Вычисляем keypoints для обоих # Вычисляем keypoints для обоих
timer = Timer()
timer.start()
kp1, des1 = self.compute_keypoints() kp1, des1 = self.compute_keypoints()
if constants.DEBUG_FPS:
print(f"[VC-KEYPOINTS]: computing 1: {timer.loop() * 1000:.2f} ms")
kp2, des2 = other.compute_keypoints() kp2, des2 = other.compute_keypoints()
if constants.DEBUG_FPS:
print(f"[VC-KEYPOINTS]: computing 2: {timer.loop() * 1000:.2f} ms")
if des1 is None or des2 is None or len(kp1) < 4 or len(kp2) < 4: if des1 is None or des2 is None or len(kp1) < 4 or len(kp2) < 4:
return None, None, None, None, None return None, None, None, None, None
# kNN matching + Lowe ratio test # kNN matching + Lowe ratio test
matcher = self._get_matcher() matcher = self._get_matcher()
matches_knn = matcher.knnMatch(des1, des2, k=2) matches_knn = matcher.knnMatch(des1, des2, k=2)
if constants.DEBUG_FPS:
print(f"[VC-KEYPOINTS]: matching: {timer.loop() * 1000:.2f} ms")
good_matches: list[cv2.DMatch] = [] good_matches: list[cv2.DMatch] = []
for m_n in matches_knn: for m_n in matches_knn:
@@ -172,6 +206,9 @@ class VisionChunk:
src_pts = np.float32(src_pts).reshape(-1, 1, 2) src_pts = np.float32(src_pts).reshape(-1, 1, 2)
dst_pts = np.float32(dst_pts).reshape(-1, 1, 2) dst_pts = np.float32(dst_pts).reshape(-1, 1, 2)
if constants.DEBUG_FPS:
print(f"[VC-KEYPOINTS]: filtration: {timer.loop() * 1000:.2f} ms")
return src_pts, dst_pts, good_matches, kp1, kp2 return src_pts, dst_pts, good_matches, kp1, kp2
def to_cv2_gray(self) -> np.ndarray: def to_cv2_gray(self) -> np.ndarray: