ref: create vision_chunk

This commit is contained in:
2025-12-21 20:13:51 +03:00
parent 92b6b0b0fe
commit 9d4839aa2a
6 changed files with 221 additions and 152 deletions

View File

@@ -9,6 +9,7 @@ from PIL import Image
from timer import Timer
from vision_chunk import VisionChunk
from visualization import VisualizationManager
from geolocation import Geolocation
@@ -42,28 +43,23 @@ class AutoPilot(Pilot):
# Состояние одометрии
geo: Geolocation
# Ориентиры
points: list[tuple[float,float]] # Координаты
keypoints: list[any] # Ключевые точки ориентиров
target_idx: int # Текущая целевая метка
chunks: list[VisionChunk] # Ориентиры
target_idx: int # Текущий ориентир
# Всякие вспомогательные штуки
timer: Timer
prev_image: np.ndarray | None
orb_detector: cv2.ORB
bf_matcher: cv2.BFMatcher
prev_chunk: VisionChunk | None
frame_count: int
image_center: tuple # Центр изображения (x, y)
vis_manager: VisualizationManager # Менеджер визуализации (опционально)
# Положение на основе ориентира
reserved_geo: Geolocation | None
def __init__(self, points = [], keypoints = [], viz_manager=None):
self.prev_image = None
def __init__(self, points = [], chunks = [], viz_manager=None):
self.prev_chunk = None
self.geo = Geolocation(0, 0, 1, 0)
self.chunks = chunks
self.frame_count = 0
self.image_center = (0, 0) # Будет обновлено при получении первого изображения
self.vis_manager = viz_manager # Менеджер визуализации
self.reserved_geo = None
@@ -74,22 +70,7 @@ class AutoPilot(Pilot):
self.min_scale: float = 0.7
self.max_scale: float = 1.4
# Инициализация ORB детектора
self.orb_detector = cv2.ORB_create(
nfeatures=1000,
scaleFactor=1.3,
nlevels=4,
edgeThreshold=19,
patchSize=31,
fastThreshold=15,
scoreType=cv2.ORB_FAST_SCORE # FAST обычно быстрее
)
# Инициализация матчера для сопоставления ключевых точек (kNN + Lowe ratio)
self.bf_matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)
self.points = points
self.keypoints = keypoints
self.target_idx = 0
self.timer = Timer()
@@ -97,74 +78,6 @@ class AutoPilot(Pilot):
def get_position(self) -> tuple[float, float]:
return self.geo.x, self.geo.y
def image_to_numpy(self, image: Image.Image) -> np.ndarray:
"""Конвертирует PIL Image в numpy array для OpenCV"""
return np.array(image)
def detect_and_match_keypoints(self, img1: np.ndarray, img2: np.ndarray):
"""
Обнаруживает ключевые точки ORB и сопоставляет их между двумя изображениями
"""
# Предобработка (повышение контраста и устойчивости)
def preprocess(gray_img: np.ndarray) -> np.ndarray:
clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
return clahe.apply(gray_img)
# В градации серого
g1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY) if len(img1.shape) == 3 else img1
g2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY) if len(img2.shape) == 3 else img2
g1 = preprocess(g1)
g2 = preprocess(g2)
# Обнаружение ключевых точек и дескрипторов
kp1, des1 = self.orb_detector.detectAndCompute(g1, None)
kp2, des2 = self.orb_detector.detectAndCompute(g2, None)
if des1 is None or des2 is None or len(kp1) < 4 or len(kp2) < 4:
return None, None, None, None, None
# kNN сопоставление и тест Лоу
matches_knn = self.bf_matcher.knnMatch(des1, des2, k=2)
good_matches: list[cv2.DMatch] = []
for m_n in matches_knn:
if len(m_n) < 2:
continue
m, n = m_n
if m.distance < 0.75 * n.distance:
good_matches.append(m)
# Дополнительная фильтрация по расстоянию (мягкий порог)
good_matches = sorted(good_matches, key=lambda x: x.distance)
good_matches = [m for m in good_matches if m.distance < 64]
if len(good_matches) < 4:
return None, None, None, None, None
# Получаем центр изображения
height1, width1 = g1.shape[:2]
height2, width2 = g2.shape[:2]
center_x1, center_y1 = width1 // 2, height1 // 2
center_x2, center_y2 = width2 // 2, height2 // 2
# Извлекаем координаты сопоставленных точек и отцентрируем их
src_pts = []
dst_pts = []
for match in good_matches:
# Координаты точки в первом изображении
pt1 = kp1[match.queryIdx].pt
src_pts.append([pt1[0] - center_x1, center_y1 - pt1[1]])
# Координаты точки во втором изображении
pt2 = kp2[match.trainIdx].pt
dst_pts.append([pt2[0] - center_x2, center_y2 - pt2[1]])
# Конвертируем в numpy массивы
src_pts = np.float32(src_pts).reshape(-1, 1, 2)
dst_pts = np.float32(dst_pts).reshape(-1, 1, 2)
return src_pts, dst_pts, good_matches, kp1, kp2
def estimate_transformation_matrix(self, src_pts: np.ndarray, dst_pts: np.ndarray):
"""
Оценивает матрицу трансформации на основе сопоставленных ключевых точек
@@ -251,18 +164,16 @@ class AutoPilot(Pilot):
'frame_count': self.frame_count
}
def get_position_by_chunk(self) -> Geolocation | None:
# Пытаемся найти ориентир на картинке:
current_image = self.prev_image
landmark_image = cv2.imread(Path('chunks') / f'chunk_{self.target_idx}.png', cv2.IMREAD_COLOR_RGB)
src_pts, dst_pts, matches, kp1, kp2 = self.detect_and_match_keypoints(landmark_image, current_image)
current_chunk = self.prev_chunk
landmark_chunk = self.chunks[self.target_idx]
src_pts, dst_pts, matches, kp1, kp2 = landmark_chunk.detect_and_match_keypoints(current_chunk)
if src_pts is not None and dst_pts is not None and self.vis_manager:
was_enabled = self.timer.enabled
if was_enabled: self.timer.stop()
self.vis_manager.update_chunk_matches(landmark_image, current_image, kp1, kp2, matches)
self.vis_manager.update_chunk_matches(landmark_chunk.to_numpy(), current_chunk.to_numpy(), kp1, kp2, matches)
if was_enabled: self.timer.start()
if src_pts is not None and dst_pts is not None:
@@ -293,26 +204,15 @@ class AutoPilot(Pilot):
return None
def handle(self, image: Image) -> PilotCommand:
def handle(self, current_chunk: VisionChunk) -> PilotCommand:
self.timer.start()
self.frame_count += 1
if self.prev_image is None:
self.prev_image = self.image_to_numpy(image)
# Вычисляем центр изображения
height, width = self.prev_image.shape[:2]
self.image_center = (width // 2, height // 2)
if self.prev_chunk is None:
self.prev_chunk = current_chunk
self.timer.stop()
return PilotCommand(processing_time=self.timer.get_elapsed())
# Конвертируем текущее изображение
current_image = self.image_to_numpy(image)
# Обновляем центр изображения
height, width = current_image.shape[:2]
self.image_center = (width // 2, height // 2)
# Расстояние до цели
distance_to_target = math.sqrt(
(self.points[self.target_idx][0] - self.geo.x) ** 2 +
@@ -320,7 +220,7 @@ class AutoPilot(Pilot):
)
# Обнаруживаем и сопоставляем ключевые точки
src_pts, dst_pts, matches, kp1, kp2 = self.detect_and_match_keypoints(self.prev_image, current_image)
src_pts, dst_pts, matches, kp1, kp2 = self.prev_chunk.detect_and_match_keypoints(current_chunk)
# Оцениваем смещение БПЛА
if src_pts is not None and dst_pts is not None:
@@ -334,7 +234,6 @@ class AutoPilot(Pilot):
self.timer.stop()
# Выводим текущее состояние БПЛА
drone_state = self.get_drone_state()
# print(f" [Pilot] Drone Position: ({drone_state['x']:.2f}, {drone_state['y']:.2f})")
# print(f" [Pilot] Angle: {drone_state['angle_degrees']:.1f}°")
# print(f" [Pilot] Target Index: {self.target_idx}")
@@ -346,14 +245,14 @@ class AutoPilot(Pilot):
# Обновляем сопоставление точек
self.vis_manager.update_matches(
self.prev_image,
current_image,
self.prev_chunk.to_numpy(),
current_chunk.to_numpy(),
kp1, kp2, matches,
transformation_info)
mask = transformation_info['mask']
self.vis_manager.update_motion_vectors(
current_image,
current_chunk.to_numpy(),
kp1, kp2,
np.array(matches)[mask.ravel().astype(bool)])
@@ -361,14 +260,14 @@ class AutoPilot(Pilot):
# Используем новую визуализацию движения точек по сетке
homography_matrix = transformation_info['homography']
self.vis_manager.update_homography_grid(
current_image,
current_chunk.to_numpy(),
homography_matrix,
grid_step=85)
self.timer.start()
# Пытаемся найти ориентир на картинке:
self.prev_image = current_image
self.prev_chunk = current_chunk
npos = self.get_position_by_chunk()
if npos is not None:
self.reserved_geo = npos

View File

51
main.py
View File

@@ -1,9 +1,11 @@
from simulator import Simulator
from visualization import VisualizationManager
from trajectory_drawer import TrajectoryDrawer
from yandex_map import YandexMap
from time import sleep
from pathlib import Path
from simulator import Simulator
from time import sleep
from trajectory_drawer import TrajectoryDrawer
from visualization import VisualizationManager
from yandex_map import YandexMap
from vision_chunk import VisionChunk
from utility import cv2_to_pil
import random
import cv2
@@ -37,16 +39,18 @@ def main():
# Для каждой точки сделаем приближенный снимок
yandexMap = YandexMap()
keypoints: list[(any, any)] = []
chunks: list[VisionChunk] = []
plt.ion()
for i in range(len(points)):
point = points[i]
yandexMap.scroll(point[0], point[1], 5, True)
sleep(1)
img = yandexMap.make_screenshot(point[0], point[1], 0.2, 0.2)
cv2_img = yandexMap.make_screenshot(point[0], point[1], 0.2, 0.2)
img = cv2_to_pil(cv2_img)
chunk = VisionChunk(img)
Path('chunks').mkdir(exist_ok=True)
cv2.imwrite(Path('.') / 'chunks' / f'chunk_{i}.png', img)
chunk.save_image(Path('.') / 'chunks' / f'chunk_{i}.png')
plt.subplot(1, len(points), i+1)
plt.imshow(img)
plt.pause(0.25)
@@ -56,20 +60,9 @@ def main():
# Выделим на каждой картинке ключевые точки
for i in range(len(points)):
orb = cv2.ORB_create(
nfeatures=1000,
scaleFactor=1.2,
nlevels=8,
edgeThreshold=31,
firstLevel=0,
WTA_K=2,
patchSize=31,
fastThreshold=20
)
img = cv2.imread(Path('chunks') / f'chunk_{i}.png')
kp: list[cv2.KeyPoint]
kp, des = orb.detectAndCompute(img, None)
keypoints.append((kp, des))
chunk = VisionChunk.load_image(Path('chunks') / f'chunk_{i}.png')
chunks.append(chunk)
kp, des = chunk.compute_keypoints()
plt.subplot(1, len(points), i+1)
kp_coords = np.array([j.pt for j in kp])
@@ -93,12 +86,12 @@ def main():
(p[0] - points[0][0]) * width, (points[0][1] - p[1]) * height
], points)))
points_coords *= 2 ** 4
pilot = autopilot.AutoPilot(points_coords, keypoints, vis_manager)
pilot = autopilot.AutoPilot(points_coords, chunks, vis_manager)
simulator = Simulator(yandexMap)
pilot.target_idx = 0
photo = simulator.get_photo()
command = pilot.handle(photo)
chunk = simulator.get_chunk()
command = pilot.handle(chunk)
vis_manager.update_display()
vis_manager.pause(1)
@@ -117,13 +110,13 @@ def main():
simulator.change_zoom(direction)
zoom_next_event = i + random.randint(20, 40)
photo = simulator.get_photo()
command = pilot.handle(photo)
chunk = simulator.get_chunk()
command = pilot.handle(chunk)
proc_time = np.append(proc_time, command.proccessing_time)
# Save Image
photo.save(Path('images') / f"photo_{i}.png")
chunk.save_image(Path('images') / f"photo_{i}.png")
vis_manager.update_display()
vis_manager.pause(0.2)
@@ -142,7 +135,7 @@ def main():
vis_manager.pause(0.2)
last_proc_times = proc_time[-10:]
# print("Average FPS:", 1 / last_proc_times.mean())
print("Average FPS:", 1 / last_proc_times.mean())
vis_manager.show_final()

View File

@@ -9,6 +9,7 @@ from selenium.webdriver.common.action_chains import ActionChains
from autopilot import Pilot
from geolocation import Geolocation
from vision_chunk import VisionChunk
from visualization import VisualizationManager, SimMode
from yandex_map import YandexMap
@@ -103,10 +104,10 @@ class Simulator:
else:
self.geo.z *= 2
def get_photo(self) -> Image.Image:
def get_chunk(self) -> VisionChunk:
png = self.yandexMap.driver.get_screenshot_as_png()
im = Image.open(BytesIO(png))
# Применяем поворот как будто съемка с дрона
rotated_im = self.rotate_image_like_drone(im, -self.geo.angle)
return rotated_im
return VisionChunk(rotated_im)

15
utility.py Normal file
View File

@@ -0,0 +1,15 @@
from PIL import Image
import cv2
import numpy as np
def cv2_to_pil(cv_image: np.ndarray) -> Image.Image:
"""
cv2.MatLike (BGR/RGB/Grayscale) → PIL.Image
"""
if len(cv_image.shape) == 3 and cv_image.shape[2] == 3:
cv_image = cv2.cvtColor(cv_image, cv2.COLOR_BGR2RGB)
return Image.fromarray(cv_image)
def image_to_numpy(self, image: Image.Image) -> np.ndarray:
"""Конвертирует PIL Image в numpy array для OpenCV"""
return np.array(image)

161
vision_chunk.py Normal file
View File

@@ -0,0 +1,161 @@
import cv2
import json
import numpy as np
from dataclasses import dataclass, field
from pathlib import Path
from typing import Literal, Optional, Tuple
from PIL import Image
FeatureMethod = Literal["orb", "sift", "surf"]
@dataclass
class VisionChunk:
image: Image.Image
feature_method: FeatureMethod = "orb"
keypoints: Optional[list] = field(default=None, init=False)
descriptors: Optional[np.ndarray] = field(default=None, init=False)
_detector: Optional[cv2.Feature2D] = field(default=None, init=False, repr=False)
_matcher: Optional[cv2.DescriptorMatcher] = field(default=None, init=False, repr=False)
def _get_detector(self) -> cv2.Feature2D:
if self._detector is not None:
return self._detector
if self.feature_method == "orb":
self._detector = cv2.ORB_create(
nfeatures=1000,
scaleFactor=1.2,
nlevels=8,
edgeThreshold=31,
firstLevel=0,
WTA_K=2,
patchSize=31,
fastThreshold=20,
)
else:
raise ValueError(f"Unsupported feature method: {self.feature_method}")
return self._detector
def _get_matcher(self) -> cv2.DescriptorMatcher:
if self._matcher is None:
self._matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)
return self._matcher
def _preprocess(self, img_np: np.ndarray) -> np.ndarray:
"""CLAHE предобработка для улучшения контраста"""
if len(img_np.shape) == 3:
gray = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY)
else:
gray = img_np
clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
return clahe.apply(gray)
def compute_keypoints(self, force: bool = False) -> Tuple[list, Optional[np.ndarray]]:
if self.keypoints is not None and self.descriptors is not None and not force:
return self.keypoints, self.descriptors
detector = self._get_detector()
# PIL -> OpenCV (RGB->BGR)
img_np = np.array(self.image)
if img_np.ndim == 3 and img_np.shape[2] == 3:
img_np = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR)
# CLAHE предобработка
preprocessed = self._preprocess(img_np)
kps, desc = detector.detectAndCompute(preprocessed, None)
self.keypoints = kps
self.descriptors = desc
return kps, desc
def detect_and_match_keypoints(
self,
other: "VisionChunk"
) -> Tuple[
Optional[np.ndarray],
Optional[np.ndarray],
Optional[list],
Optional[list],
Optional[list]
]:
"""
Возвращает: src_pts, dst_pts, good_matches, kp1, kp2 (отцентрированные координаты)
"""
# Вычисляем keypoints для обоих
kp1, des1 = self.compute_keypoints()
kp2, des2 = other.compute_keypoints()
if des1 is None or des2 is None or len(kp1) < 4 or len(kp2) < 4:
return None, None, None, None, None
# kNN matching + Lowe ratio test
matcher = self._get_matcher()
matches_knn = matcher.knnMatch(des1, des2, k=2)
good_matches: list[cv2.DMatch] = []
for m_n in matches_knn:
if len(m_n) < 2:
continue
m, n = m_n
if m.distance < 0.75 * n.distance:
good_matches.append(m)
# Фильтрация по расстоянию (мягкий порог 64)
good_matches = sorted(good_matches, key=lambda x: x.distance)
good_matches = [m for m in good_matches if m.distance < 64]
if len(good_matches) < 4:
return None, None, None, None, None
# Центр изображений
img1_cv = self.to_cv2_gray()
img2_cv = other.to_cv2_gray()
h1, w1 = img1_cv.shape
h2, w2 = img2_cv.shape
cx1, cy1 = w1 // 2, h1 // 2
cx2, cy2 = w2 // 2, h2 // 2
# Отцентрированные координаты (x_rel, y_rel)
src_pts = []
dst_pts = []
for match in good_matches:
pt1 = kp1[match.queryIdx].pt
src_pts.append([pt1[0] - cx1, cy1 - pt1[1]]) # Y вверх!
pt2 = kp2[match.trainIdx].pt
dst_pts.append([pt2[0] - cx2, cy2 - pt2[1]])
src_pts = np.float32(src_pts).reshape(-1, 1, 2)
dst_pts = np.float32(dst_pts).reshape(-1, 1, 2)
return src_pts, dst_pts, good_matches, kp1, kp2
def to_cv2_gray(self) -> np.ndarray:
"""PIL -> OpenCV grayscale с предобработкой"""
img_np = np.array(self.image)
if img_np.ndim == 3:
gray = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY)
else:
gray = img_np
return self._preprocess(img_np)
def get_shape(self) -> Tuple[int, int]:
return self.image.height, self.image.width
def save_image(self, path: Path | str, format: str = "PNG") -> None:
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
self.image.save(path, format=format.upper())
def to_numpy(self) -> np.ndarray:
return np.array(self.image)
@classmethod
def load_image(cls, path: Path | str, feature_method: FeatureMethod = "orb") -> "VisionChunk":
path = Path(path)
image = Image.open(path)
return cls(image=image, feature_method=feature_method)