Compare commits

...

10 Commits

18 changed files with 1273 additions and 501 deletions

4
.gitignore vendored
View File

@@ -1,4 +1,6 @@
.venv .venv
__pycache__ __pycache__
*.png *.png
images images
trajectories
z

View File

@@ -3,6 +3,7 @@ from pathlib import Path
import math import math
import random import random
import constants
import cv2 import cv2
import numpy as np import numpy as np
from PIL import Image from PIL import Image
@@ -56,14 +57,16 @@ class AutoPilot(Pilot):
# Положение на основе ориентира # Положение на основе ориентира
reserved_pos: Position | None reserved_pos: Position | None
proccessing_time: float
def __init__(self, points = [], chunks = [], viz_manager=None): def __init__(self, points = [], chunks = [], viz_manager=None, pixel_ratio: float = 1.):
self.prev_chunk = None self.prev_chunk = None
self.pos = Position(0, 0, 1, 0, 0, 0) self.pos = Position(0, 0, 1, 0, 0, 0)
self.chunks = chunks self.chunks = chunks
self.frame_count = 0 self.frame_count = 0
self.vis_manager = viz_manager # Менеджер визуализации self.vis_manager = viz_manager # Менеджер визуализации
self.reserved_pos = None self.reserved_pos = None
self.pixel_ratio = pixel_ratio
# Пороговые значения качества сопоставления/гомографии # Пороговые значения качества сопоставления/гомографии
self.min_inliers: int = 12 self.min_inliers: int = 12
@@ -76,6 +79,7 @@ class AutoPilot(Pilot):
self.target_idx = 0 self.target_idx = 0
self.timer = Timer() self.timer = Timer()
self.chunk_points = np.array([[chunk.pos.x, chunk.pos.y] for chunk in self.chunks])
def get_position(self) -> tuple[float, float]: def get_position(self) -> tuple[float, float]:
return self.pos.x, self.pos.y return self.pos.x, self.pos.y
@@ -91,7 +95,7 @@ class AutoPilot(Pilot):
h, w = prev_gray.shape[:2] h, w = prev_gray.shape[:2]
# Создаем сетку точек для отслеживания (аналогично вашему step=20) # Создаем сетку точек для отслеживания (аналогично вашему step=20)
step = 35 step = 20
grid_points = [] grid_points = []
for y in range(step, h - step, step): for y in range(step, h - step, step):
for x in range(step, w - step, step): for x in range(step, w - step, step):
@@ -133,9 +137,6 @@ class AutoPilot(Pilot):
""" """
self.pos.iapply(homography_matrix) self.pos.iapply(homography_matrix)
if self.reserved_pos is not None:
self.reserved_pos.iapply(homography_matrix)
def get_drone_state(self) -> dict: def get_drone_state(self) -> dict:
""" """
Возвращает текущее состояние БПЛА Возвращает текущее состояние БПЛА
@@ -149,43 +150,122 @@ class AutoPilot(Pilot):
} }
def get_position_by_chunk(self) -> Position | None: def get_position_by_chunk(self) -> Position | None:
# Пытаемся найти ориентир на картинке: landmark_timer = Timer()
landmark_timer.start()
cur_pos = np.array([self.pos.x, self.pos.y])
closest_chunk_idx = ((self.chunk_points - cur_pos) ** 2).sum(1).argmin()
current_chunk = self.prev_chunk current_chunk = self.prev_chunk
landmark_chunk = self.chunks[self.target_idx] landmark_chunk = self.chunks[closest_chunk_idx]
if constants.DEBUG_FPS:
print(f"[LANDMARK]: Closest chunk finding: {landmark_timer.loop() * 1000:.2f} ms")
# Краевой случай: отсутствие чанков
if current_chunk is None or landmark_chunk is None:
return None
landmark_timer.start()
src_pts, dst_pts, matches, kp1, kp2 = landmark_chunk.detect_and_match_keypoints(current_chunk) src_pts, dst_pts, matches, kp1, kp2 = landmark_chunk.detect_and_match_keypoints(current_chunk)
if constants.DEBUG_FPS:
print(f"[LANDMARK]: detect and match keypoints: {landmark_timer.loop() * 1000:.2f} ms")
landmark_timer.stop()
# Визуализация (если нужна)
if src_pts is not None and dst_pts is not None and self.vis_manager: if src_pts is not None and dst_pts is not None and self.vis_manager:
was_enabled = self.timer.enabled was_enabled = self.timer.enabled
if was_enabled: self.timer.stop() if was_enabled:
self.vis_manager.update_chunk_matches(landmark_chunk.to_numpy(), current_chunk.to_numpy(), kp1, kp2, matches) self.timer.stop()
if was_enabled: self.timer.start() self.vis_manager.update_chunk_matches(
landmark_chunk.to_numpy(),
current_chunk.to_numpy(),
kp1, kp2, matches
)
if was_enabled:
self.timer.start()
if src_pts is not None and dst_pts is not None: landmark_timer.start()
# Оцениваем матрицу трансформации # Краевой случай: нет точек или недостаточно матчей
landmark_transform = self.estimate_transformation_matrix(src_pts, dst_pts) if src_pts is None or dst_pts is None:
# Если ориентир достоверно найден — скорректируем глобальные координаты и угол return None
if landmark_transform is not None:
ok_scale = (self.min_scale <= landmark_transform['scale'] <= self.max_scale) num_matches = len(src_pts)
ok_inliers = (landmark_transform.get('inliers', 0) >= self.min_inliers) if num_matches < 20:
ratio = landmark_transform.get('inliers_ratio', 0.0) return None
ok_ratio = (ratio >= self.min_inlier_ratio)
rmse = landmark_transform.get('rmse', None) # Оценка матрицы гомографии
ok_rmse = (rmse is not None and rmse <= self.max_reproj_rmse) landmark_timer.loop()
if ok_scale and ok_inliers and ok_ratio and ok_rmse: landmark_transform, mask = estimate_transformation_matrix(src_pts, dst_pts)
# print("[HELP]") num_inliers = int(np.sum(mask))
# print("Matrix", landmark_transform['homography'])
# print("Position", self.x, self.y) if constants.DEBUG_FPS:
# print("Position of point", self.points[self.target_idx]) print(f"[LANDMARK]: matrix estimation: {landmark_timer.loop() * 1000:.2f} ms")
# print("[PILOT]", rmse, ratio, ok_rmse)
# if False: # Краевой случай: матрица не найдена
# Считаем абсолютную позу относительно координат ориентира if landmark_transform is None or mask is None:
landmark_world_x, landmark_world_y = self.points[self.target_idx] return None
landmark = Position(landmark_world_x, landmark_world_y, 1, 0)
homography = landmark_transform['homography'] # === КРИТЕРИИ ПРИНЯТИЯ РЕШЕНИЯ ===
# homography = np.linalg.inv(homography)
print(f" [Pilot] Landmark correction applied (inliers={landmark_transform['inliers']}, ratio={ratio:.2f}, rmse={rmse:.2f})") # 1. Минимальное количество инлайеров (абсолютное)
return landmark @ homography MIN_INLIERS_ABSOLUTE = 6
return None if num_inliers < MIN_INLIERS_ABSOLUTE:
return None
# 2. Процент инлайеров от общего числа матчей
inlier_ratio = num_inliers / num_matches
if constants.DEBUG_LANDMARK:
print("[LANDMARK]: inlier_ratio=", inlier_ratio)
MIN_INLIER_RATIO = 0.6
if inlier_ratio < MIN_INLIER_RATIO:
return None
# 3. Проверка качества гомографии (детерминант для выявления вырожденных случаев)
det = np.linalg.det(landmark_transform[:2, :2])
# Детерминант должен быть близок к 1 (без сильного масштабирования)
if abs(det) < 0.1 or abs(det) > 10.0:
return None
# 4. Проверка на валидность перспективного преобразования
# Элементы третьей строки не должны быть слишком большими
if abs(landmark_transform[2, 0]) > 0.01 or abs(landmark_transform[2, 1]) > 0.01:
return None
# 5. Дополнительная проверка: средняя ошибка репроекции для инлайеров
inlier_src = src_pts[mask.ravel() == 1]
inlier_dst = dst_pts[mask.ravel() == 1]
# Преобразуем точки через найденную гомографию
transformed_pts = cv2.perspectiveTransform(inlier_src, landmark_transform)
# Вычисляем ошибку репроекции
reprojection_errors = np.sqrt(np.sum((transformed_pts - inlier_dst) ** 2, axis=2))
mean_error = np.mean(reprojection_errors)
MAX_MEAN_REPROJECTION_ERROR = 1.1 # пиксели
if constants.DEBUG_LANDMARK:
print("[LANDMARK]: Mean_error=", mean_error)
if mean_error > MAX_MEAN_REPROJECTION_ERROR:
return None
# 6. Проверка стабильности: если слишком много хороших совпадений, но мало инлайеров - подозрительно
if num_matches > 50 and inlier_ratio < 0.15:
return None
# === ВСЕ ПРОВЕРКИ ПРОЙДЕНЫ ===
print("[LANDMARK]: Correction Applied")
if constants.DEBUG_FPS:
print(f"[LANDMARK]: time: {landmark_timer.get_elapsed() * 1000:.2f} ms")
return landmark_chunk.pos.apply(landmark_transform)
def handle(self, current_chunk: VisionChunk) -> PilotCommand: def handle(self, current_chunk: VisionChunk) -> PilotCommand:
@@ -197,17 +277,11 @@ class AutoPilot(Pilot):
self.timer.stop() self.timer.stop()
return PilotCommand(processing_time=self.timer.get_elapsed()) return PilotCommand(processing_time=self.timer.get_elapsed())
# Расстояние до цели
distance_to_target = math.sqrt(
(self.points[self.target_idx][0] - self.pos.x) ** 2 +
(self.points[self.target_idx][1] - self.pos.y) ** 2
)
# Вычисляем оптический поток для покадрового сравнения # Вычисляем оптический поток для покадрового сравнения
matching_timer = Timer() matching_timer = Timer()
matching_timer.start() matching_timer.start()
# src_pts, dst_pts = self.calculate_optical_flow(self.prev_chunk, current_chunk) src_pts, dst_pts = self.calculate_optical_flow(self.prev_chunk, current_chunk)
src_pts, dst_pts, _, _, _ = self.prev_chunk.detect_and_match_keypoints(current_chunk) # src_pts, dst_pts, _, _, _ = self.prev_chunk.detect_and_match_keypoints(current_chunk)
matching_timer.stop() matching_timer.stop()
print(f"Matching calculating: {matching_timer.get_elapsed() * 1000:.2f} ms") print(f"Matching calculating: {matching_timer.get_elapsed() * 1000:.2f} ms")
@@ -217,7 +291,7 @@ class AutoPilot(Pilot):
matrix_estimation_timer = Timer() matrix_estimation_timer = Timer()
matrix_estimation_timer.start() matrix_estimation_timer.start()
homography_matrix = estimate_transformation_matrix(src_pts, dst_pts) homography_matrix, _ = estimate_transformation_matrix(src_pts, dst_pts)
matrix_estimation_timer.stop() matrix_estimation_timer.stop()
print(f"Transformation matrix updating: {matrix_estimation_timer.get_elapsed() * 1000:.2f} ms") print(f"Transformation matrix updating: {matrix_estimation_timer.get_elapsed() * 1000:.2f} ms")
@@ -235,17 +309,24 @@ class AutoPilot(Pilot):
self.timer.start() self.timer.start()
chunk_timer = Timer()
chunk_timer.start()
# Пытаемся найти ориентир на картинке: # Пытаемся найти ориентир на картинке:
self.prev_chunk = current_chunk self.prev_chunk = current_chunk
# npos = self.get_position_by_chunk() # Для улучшения среднего FPS
# if npos is not None: if self.frame_count % 5 == 0:
# self.reserved_pos = npos pos_by_chunk = self.get_position_by_chunk()
if pos_by_chunk is not None:
self.pos = pos_by_chunk
chunk_timer.stop() command = self.make_command()
print(f"Chunk timer: {chunk_timer.get_elapsed() * 1000:.2f} ms") self.timer.reset()
return command
def make_command(self) -> PilotCommand:
# Расстояние до цели
distance_to_target = math.sqrt(
(self.points[self.target_idx][0] - self.pos.x) ** 2 +
(self.points[self.target_idx][1] - self.pos.y) ** 2
) * self.pixel_ratio
if distance_to_target < 35: if distance_to_target < 35:
self.target_idx += 1 self.target_idx += 1
@@ -258,16 +339,32 @@ class AutoPilot(Pilot):
(self.points[self.target_idx][1] - self.pos.y) ** 2 (self.points[self.target_idx][1] - self.pos.y) ** 2
) )
if self.reserved_pos is not None: angle_trajectory = self.pos.yaw + math.pi / 2
self.pos = self.reserved_pos
self.reserved_pos = None # Проверка на слепую зону
R = 120
blind = np.array([
[
self.pos.x * self.pixel_ratio + R * np.cos(angle_trajectory - np.pi / 2),
self.pos.y * self.pixel_ratio + R * np.sin(angle_trajectory - np.pi / 2),
],
[
self.pos.x * self.pixel_ratio + R * np.cos(angle_trajectory + np.pi / 2),
self.pos.y * self.pixel_ratio + R * np.sin(angle_trajectory + np.pi / 2),
]
])
blind -= self.points[self.target_idx] * self.pixel_ratio
blind = np.hypot(blind[:, 0], blind[:, 1])
print("R: ", blind)
if np.min(blind) < R:
return PilotCommand(0, 10, False, self.timer.get_elapsed())
# Вычисляем угол к цели # Вычисляем угол к цели
target_angle = math.atan2(self.points[self.target_idx][1] - self.pos.y, self.points[self.target_idx][0] - self.pos.x) target_angle = math.atan2(self.points[self.target_idx][1] - self.pos.y, self.points[self.target_idx][0] - self.pos.x)
angle_trajectory = self.pos.yaw + math.pi / 2
# print("[ANGLE]", angle_trajectory, "->", target_angle)
# Вычисляем разность углов (направление поворота) # Вычисляем разность углов (направление поворота)
angle_diff = target_angle - angle_trajectory angle_diff = target_angle - angle_trajectory
@@ -277,14 +374,13 @@ class AutoPilot(Pilot):
if angle_diff >= math.pi: if angle_diff >= math.pi:
angle_diff -= 2 * math.pi angle_diff -= 2 * math.pi
d_r = max(10, min(35., distance_to_target / 2)) d_r = max(5, min(10., distance_to_target / 2))
d_a_limit = d_r / 10 * 0.01 d_a_limit = np.radians(5)
command = PilotCommand( command = PilotCommand(
max(min(d_a_limit, angle_diff), -d_a_limit), max(min(d_a_limit, angle_diff), -d_a_limit),
d_r, False, self.timer.get_elapsed() d_r, False, self.timer.get_elapsed()
) )
self.timer.reset()
return command return command
def reset_position(self, x: float = 0.0, y: float = 0.0, angle: float = 0.0): def reset_position(self, x: float = 0.0, y: float = 0.0, angle: float = 0.0):

View File

@@ -1,5 +1,18 @@
import numpy as np import numpy as np
# Ширина 1 пикселя в метрах
YANDEX_PIXEL_RATIO = {
15: 2830 / 1049,
18: 350 / 1049,
}
GOOGLE_PIXEL_RATIO = {
15: 2766 / 1031,
18: 346 / 1031,
}
WINDOW_HEIGHT = 1031
# Ширина и высота снимка в пикселях # Ширина и высота снимка в пикселях
CHUNK_WIDTH = 700 CHUNK_WIDTH = 700
@@ -17,3 +30,6 @@ K = np.array([
[0, _K_FOCUS_DISTANCE, _K_CENTER], [0, _K_FOCUS_DISTANCE, _K_CENTER],
[0, 0, 1] [0, 0, 1]
]) ])
DEBUG_FPS: bool = False
DEBUG_LANDMARK: bool = False

89
google_map.py Normal file
View File

@@ -0,0 +1,89 @@
from io import BytesIO
from PIL import Image
from selenium.webdriver.common.actions.wheel_input import ScrollOrigin
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from time import sleep
import constants
import cv2
import math
import numpy as np
import os
import utility
def generateURL(lat: float, lon: float, zoom: int):
return f"https://www.google.com/maps/@{lon},{lat},{zoom}z"
class GoogleMap:
initial_zoom: int
initial_lat: float
initial_lon: float
pixel_ratio: float
def __init__(self, initial_lat=49.103814, initial_lon=55.794258, initial_zoom=18):
self.initial_lat = initial_lat
self.initial_lon = initial_lon
self.initial_zoom = initial_zoom
self.pixel_ratio = constants.GOOGLE_PIXEL_RATIO[self.initial_zoom]
options = webdriver.ChromeOptions()
# options.add_experimental_option("detach", True)
self.driver = webdriver.Chrome(options)
self.driver.get(generateURL(initial_lat, initial_lon, initial_zoom))
self.driver.maximize_window()
action = ActionChains(self.driver)
sleep(5)
self.driver.execute_script('document.querySelector(\'.yHc72.qk5Wte\').click()')
sleep(5)
def open(self, lat, lon, zoom):
self.initial_lat = lat
self.initial_lon = lon
self.initial_zoom = zoom
self.pixel_ratio = constants.GOOGLE_PIXEL_RATIO[self.initial_zoom]
self.driver.get(generateURL(lat, lon, zoom))
def save_photo(self, filename: str):
im = self.make_screenshot()
im.save(filename)
def destroy(self):
self.driver.close()
def get_size(self) -> tuple[int, int]:
html = self.driver.find_element(By.TAG_NAME, 'html')
return (html.size['width'], html.size['height'])
def scroll(self, x: float, y: float, count: int = 1, inner_zoom: bool = True):
html = self.driver.find_element(By.TAG_NAME, 'html')
x_offset = (x - 0.5) * (html.size['width'] - 72) + 72
y_offset = (y - 0.5) * html.size['height']
action = ActionChains(self.driver)
for i in range(count-1):
action.scroll_from_origin(ScrollOrigin(html, int(x_offset), int(y_offset)), 0, -100 if inner_zoom else 100)
action.perform()
if i != count - 1:
sleep(0.25)
def move(self, dx: float, dy: float):
self.driver.execute_script(utility.google_map_js_move_script(dx, dy))
def make_as_center(self, x: float, y: float):
dx = (x - 0.5) * self.get_size()[0]
dy = (0.5 - y) * self.get_size()[1]
self.move(dx, dy)
sleep(1)
def make_screenshot(self) -> Image.Image:
png = self.driver.get_screenshot_as_png()
im = Image.open(BytesIO(png))
return utility.cv2_to_pil(np.array(im)[:, 72:])
def get_geolocation(self):
current_url = self.driver.current_url
return utility.parse_google_maps_url(current_url)

384
main.py
View File

@@ -1,23 +1,33 @@
from google_map import GoogleMap
from pathlib import Path from pathlib import Path
from position import Position
from simulator import Simulator from simulator import Simulator
from time import sleep from time import sleep
from trajectory_drawer import TrajectoryDrawer from trajectory_drawer import TrajectoryDrawer
from utility import cv2_to_pil
from vision_chunk import VisionChunk
from visualization import VisualizationManager from visualization import VisualizationManager
from yandex_map import YandexMap from yandex_map import YandexMap
from vision_chunk import VisionChunk
from utility import cv2_to_pil
import random
import argparse
import autopilot
import constants
import cv2 import cv2
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import numpy as np import numpy as np
import pickle
import random
import utility
import autopilot def get_map(map_name: str = 'google', lat=49.103814, lon=55.794258, zoom=18):
if map_name == 'google': return GoogleMap(lat, lon, zoom)
if map_name == 'yandex': return YandexMap(lat, lon, zoom)
return None
def make_global_photo(filename): def make_global_photo(filename, map_name: str = 'google', lat=49.103814, lon=55.794258, zoom=13):
yandexMap = YandexMap() online_map: YandexMap | GoogleMap = get_map(map_name, lat, lon, zoom)
yandexMap.save_photo(filename) online_map.save_photo(filename)
yandexMap.destroy() online_map.destroy()
def get_trajectory_points(bg_img: str) -> list[(float, float)]: def get_trajectory_points(bg_img: str) -> list[(float, float)]:
trajectoryDrawer = TrajectoryDrawer(bg_img) trajectoryDrawer = TrajectoryDrawer(bg_img)
@@ -26,97 +36,132 @@ def get_trajectory_points(bg_img: str) -> list[(float, float)]:
points = list(map(lambda p: [p[0] / trajectoryDrawer.img.shape[1], p[1] / trajectoryDrawer.img.shape[0]], trajectoryDrawer.points)) points = list(map(lambda p: [p[0] / trajectoryDrawer.img.shape[1], p[1] / trajectoryDrawer.img.shape[0]], trajectoryDrawer.points))
return points return points
def main(): def build(name: str, map_name: str, lat: float, lon: float):
# Скриншот местности
# make_global_photo('map.jpg')
# Получаем траекторию от пользователя # Создание папки с информацией о маршруте
# points = get_trajectory_points('map.jpg') dir = Path('trajectories')
if not dir.exists(): dir.mkdir()
dir /= name
assert not dir.exists()
dir.mkdir()
dir_chunks = dir / 'chunks'
dir_chunks.mkdir()
# Trajectory #1 make_global_photo('map.jpg', map_name, lat, lon, 15)
# points = [[np.float64(0.5384504359393909), np.float64(0.4084520767967683)], [np.float64(0.4451750568707629), np.float64(0.38213330305374654)], [np.float64(0.49266070439660997), np.float64(0.2789637099811013)], [np.float64(0.36377108968359656), np.float64(0.3263375027185404)], [np.float64(0.3535955937852008), np.float64(0.4337180995900692)]] points = get_trajectory_points('map.jpg')
online_map: YandexMap | GoogleMap = get_map(map_name, lat, lon, 15)
# Trajectory #2
# points = [[np.float64(0.29197731306713737), np.float64(0.3452870198135161)], [np.float64(0.33494051797147517), np.float64(0.2010601397017569)], [np.float64(0.39768940934491587), np.float64(0.25369768718780034)], [np.float64(0.4027771572941138), np.float64(0.4158213334448144)], [np.float64(0.2914120077394487), np.float64(0.5547844588079692)]]
# Trajectory #3 width, height = online_map.get_size()
# points = [[np.float64(0.2755834585641664), np.float64(0.45687862048392835)], [np.float64(0.295934450360958), np.float64(0.5021469113219258)], [np.float64(0.32872215936689997), np.float64(0.4810918923275084)], [np.float64(0.3649017003389739), np.float64(0.5295184360146684)], [np.float64(0.3999506306556705), np.float64(0.49477765467387963)]]
# Trajectory #4
# points = [[np.float64(0.42143223310783934), np.float64(0.6663760594783815)], [np.float64(0.4253893704016599), np.float64(0.5537317078582484)], [np.float64(0.5124463908657128), np.float64(0.5621537154560153)], [np.float64(0.5124463908657128), np.float64(0.6684815613778233)], [np.float64(0.42143223310783934), np.float64(0.6663760594783815)]]
# Trajectory #5
# points = [[np.float64(0.5983728006743884), np.float64(0.7348048712102382)], [np.float64(0.5966768846913225), np.float64(0.5453097002604814)], [np.float64(0.6345523416464622), np.float64(0.7190136069644251)], [np.float64(0.6402053949233488), np.float64(0.5495207040593649)], [np.float64(0.5983728006743884), np.float64(0.7348048712102382)]]
# Trajectory #6
# points = [[np.float64(0.4406526142492536), np.float64(0.28106921188054296)], [np.float64(0.38581799746345413), np.float64(0.2968604761263561)], [np.float64(0.3931669667234066), np.float64(0.353709027411283)], [np.float64(0.4248240650739713), np.float64(0.35265627646156217)], [np.float64(0.40616898926024564), np.float64(0.3179154951207735)]]
# Trajectory #7
# points = [[np.float64(0.5491912371654754), np.float64(0.7505961354560512)], [np.float64(0.5537136797869846), np.float64(0.6863783275230781)], [np.float64(0.5017055896396284), np.float64(0.6653233085286606)], [np.float64(0.5520177638039186), np.float64(0.6042637534448503)], [np.float64(0.5593667330638712), np.float64(0.516885424618018)]]
# Trajectory #8
# points =
# Trajectory #9
# points =
# Trajectory #10
# points =
print(points)
# Для каждой точки сделаем приближенный снимок
yandexMap = YandexMap()
chunks: list[VisionChunk] = []
plt.ion()
for i in range(len(points)):
point = points[i]
yandexMap.scroll(point[0], point[1], 5, True)
sleep(1)
cv2_img = yandexMap.make_screenshot(point[0], point[1], 0.2, 0.2)
img = cv2_to_pil(cv2_img)
chunk = VisionChunk(img)
Path('chunks').mkdir(exist_ok=True)
chunk.save_image(Path('.') / 'chunks' / f'chunk_{i}.png')
plt.subplot(1, len(points), i+1)
plt.imshow(img)
plt.pause(0.25)
yandexMap.scroll(point[0], point[1], 5, False)
plt.tight_layout()
# Выделим на каждой картинке ключевые точки
for i in range(len(points)):
chunk = VisionChunk.load_image(Path('chunks') / f'chunk_{i}.png')
chunks.append(chunk)
kp, des = chunk.compute_keypoints()
plt.subplot(1, len(points), i+1)
plt.imshow(chunk.image)
kp_coords = np.array([j.pt for j in kp])
if len(kp_coords) > 0:
plt.scatter(kp_coords[:, 0], kp_coords[:, 1], c='red', s=20, alpha=0.7, marker='o')
plt.pause(0.2)
plt.ioff()
plt.show(block=True)
# Начнём симуляцию полёта с первой точки
yandexMap.scroll(points[0][0], points[0][1], 5, True)
sleep(0.2)
yandexMap.make_as_center(*points[0])
sleep(1)
vis_manager = VisualizationManager()
width, height = yandexMap.get_size()
# print(width, height)
points_coords = np.array(list(map(lambda p: [ points_coords = np.array(list(map(lambda p: [
(p[0] - points[0][0]) * width, (points[0][1] - p[1]) * height (p[0] - points[0][0]) * width, (points[0][1] - p[1]) * height
], points))) ], points)))
points_coords *= 2 ** 4
pilot = autopilot.AutoPilot(points_coords, chunks, vis_manager) points_coords *= online_map.pixel_ratio
simulator = Simulator(yandexMap)
# Начнём симуляцию полёта с первой точки
online_map.make_as_center(*points[0])
sleep(3)
online_map.scroll(0.5, 0.5, 10, True)
sleep(2)
geo = online_map.get_geolocation()
online_map.open(geo['lat'], geo['lon'], 18)
sleep(5)
points_coords_pixel = points_coords.copy() / online_map.pixel_ratio
pilot = autopilot.AutoPilot(points_coords_pixel, pixel_ratio=online_map.pixel_ratio)
simulator = Simulator(online_map)
pilot.target_idx = 0
pilot.pos = simulator.pos.copy()
command = pilot.make_command()
positions: list[Position] = []
print("points_coords_pixel:", points_coords_pixel)
for i in range(5000):
if command.stop:
break
chunk = simulator.get_chunk()
pilot.pos = simulator.pos.copy()
command = pilot.make_command()
command.velocity /= online_map.pixel_ratio
print("Position:", simulator.pos)
# Save Image
chunk.save_image(dir_chunks / f"chunk_{len(positions)}.png")
positions.append(simulator.pos.copy() * online_map.pixel_ratio)
simulator.handle(command.dangle, command.velocity)
if i == 0 and map_name == 'google':
simulator.pos.x = 0
simulator.pos.y = 0
sleep(1.5)
data = {
'points': points_coords,
'chunk_positions': positions,
'initial_geolocation': geo
}
print(points_coords)
file_positions = dir / 'positions.pkl'
with file_positions.open('wb') as file:
pickle.dump(data, file)
print("WRITE POINTS:", points)
sleep(15)
online_map.destroy()
def run(name: str, map_name: str, ref_min_distance: float):
dir = Path('trajectories')
assert dir.exists()
dir /= name
assert dir.exists(), "Укажите корректное название маршрута"
dir_chunks = dir / 'chunks'
file_positions = dir / 'positions.pkl'
with file_positions.open('rb') as file:
data = pickle.load(file)
initial_geolocation = data['initial_geolocation']
lat = initial_geolocation['lat']
lon = initial_geolocation['lon']
online_map: YandexMap | GoogleMap = get_map(map_name, lat, lon, 18)
sleep(2)
chunks: list[VisionChunk] = []
for i in range(len(data['chunk_positions'])):
pos = data['chunk_positions'][i]
if len(chunks) == 0 or np.hypot(chunks[-1].pos.x - pos.x, chunks[-1].pos.y - pos.y) > ref_min_distance:
chunk = VisionChunk.load_image(dir_chunks / f"chunk_{i}.png")
chunk.pos = data['chunk_positions'][i] / online_map.pixel_ratio
chunks.append(chunk)
r = 0
for i in range(len(data['points']) - 1):
r += np.hypot(
data['points'][i][0] - data['points'][i+1][0],
data['points'][i][1] - data['points'][i+1][1]
)
print("R: ", r)
return
points = data['points'] / online_map.pixel_ratio
print("READ POINTS:", points)
vis_manager = VisualizationManager()
pilot = autopilot.AutoPilot(points, chunks, vis_manager, online_map.pixel_ratio)
simulator = Simulator(online_map)
pilot.target_idx = 0 pilot.target_idx = 0
chunk = simulator.get_chunk() chunk = simulator.get_chunk()
@@ -125,38 +170,26 @@ def main():
vis_manager.update_display() vis_manager.update_display()
vis_manager.pause(1) vis_manager.pause(1)
vis_manager.set_target_points(points_coords) vis_manager.set_target_points(data['points'])
proc_time = np.array([]) proc_time = np.array([])
zoom_next_event = random.randint(5, 10)
errors = [] errors = []
chunk_errors = []
chunk_improves = [] sleep(1)
last_chunk_index = 0
for i in range(10000000000): for i in range(10000000000):
print(f"Image #{i}") if i > 0:
if i == zoom_next_event: simulator.set_pitch(np.sin(i / 10) * 5)
r = random.randint(0, 1) simulator.set_roll(np.sin(i / 15) * 5)
direction = ['up', 'down'][r] simulator.set_zoom(1.0 + np.sin(i / 10) * 0.3)
# simulator.change_zoom(direction)
zoom_next_event = i + random.randint(20, 40)
# if i > 0:
# simulator.set_pitch(np.sin(i / 10) * 5)
# simulator.set_roll(np.sin(i / 15) * 5)
# simulator.set_zoom(1.0 + np.sin(i / 10) * 0.3)
if command.stop: if command.stop:
break break
# simulator.handle(command.dangle, command.velocity)
chunk = simulator.get_chunk() chunk = simulator.get_chunk()
command = pilot.handle(chunk) command = pilot.handle(chunk)
command.velocity /= online_map.pixel_ratio
proc_time = np.append(proc_time, command.proccessing_time) proc_time = np.append(proc_time, command.proccessing_time)
@@ -167,34 +200,133 @@ def main():
vis_manager.pause(0.2) vis_manager.pause(0.2)
vis_manager.set_target_index(pilot.target_idx) vis_manager.set_target_index(pilot.target_idx)
vis_manager.update_drone_trajectory(pilot.pos.x, pilot.pos.y) vis_manager.update_drone_trajectory(pilot.pos.x * online_map.pixel_ratio, pilot.pos.y * online_map.pixel_ratio)
vis_manager.update_global_map(simulator.pos.x, simulator.pos.y) vis_manager.update_global_map(simulator.pos.x * online_map.pixel_ratio, simulator.pos.y * online_map.pixel_ratio)
vis_manager.update_error_plot(i, pilot.pos.x, pilot.pos.y, simulator.pos.x, simulator.pos.y) vis_manager.update_error_plot(i, pilot.pos.x * online_map.pixel_ratio, pilot.pos.y * online_map.pixel_ratio, simulator.pos.x * online_map.pixel_ratio, simulator.pos.y * online_map.pixel_ratio)
errors.append(np.hypot(pilot.pos.x - simulator.pos.x, pilot.pos.y - simulator.pos.y)) errors.append(np.hypot((pilot.pos.x - simulator.pos.x) * online_map.pixel_ratio, (pilot.pos.y - simulator.pos.y) * online_map.pixel_ratio))
if last_chunk_index != pilot.target_idx:
last_chunk_index = pilot.target_idx
chunk_errors.append(errors[-1])
chunk_improves.append(errors[-1] - errors[max(len(errors) - 2, 0)])
vis_manager.update_display() vis_manager.update_display()
vis_manager.pause(0.2) vis_manager.pause(0.2)
last_proc_times = proc_time[-10:] last_proc_times = proc_time[-30:]
print(F"\nImage #{i}")
print("Average FPS:", 1 / last_proc_times.mean()) print("Average FPS:", 1 / last_proc_times.mean())
print("Pilot coords:", pilot.pos) print("Pilot coords:", pilot.pos)
print("Simulator coords:", simulator.pos) print("Simulator coords:", simulator.pos)
sleep(0.5)
simulator.handle(command.dangle, command.velocity) simulator.handle(command.dangle, command.velocity)
if i == 0 and map_name == 'google':
simulator.pos.x = 0
simulator.pos.y = 0
print("Errors:", errors) print("Errors:", errors)
print("MSE:", (np.array(errors) ** 2).mean()) print("MSE:", (np.array(errors) ** 2).mean())
print("RMSE:", (np.array(errors) ** 2).mean() ** 0.5) print("RMSE:", (np.array(errors) ** 2).mean() ** 0.5)
print("Chunk errors:", chunk_errors)
print("Chunk error improves:", chunk_improves)
print("Average FPS:", 1 / proc_time.mean()) print("Average FPS:", 1 / proc_time.mean())
vis_manager.show_final() vis_manager.show_final()
if __name__ == "__main__": def parse_args():
main() """Парсер аргументов командной строки"""
parser = argparse.ArgumentParser(description='Обработка траекторий')
#TODO # Добавляем обязательный аргумент --mode
parser.add_argument(
'--mode',
type=str,
required=True,
choices=['standalone', 'build', 'run'],
help='Режим работы: standalone, build или run'
)
# Добавляем опциональный аргумент --name
parser.add_argument(
'--name',
type=str,
default=utility.generate_folder_name(),
help='Название траектории'
)
# Координаты
parser.add_argument(
'--lat',
type=float,
default=49.103814,
help='Широта (по умолчанию 49.103814)'
)
parser.add_argument(
'--lon',
type=float,
default=55.794258,
help='Долгота (по умолчанию 55.794258)'
)
# Источник эталонных изображений (ориентиров)
parser.add_argument(
'--reference',
type=str,
default='google',
choices=['google', 'yandex'],
help='Откуда берутся эталонные изображения (ориентиры): google или yandex (по умолчанию google)'
)
# Место проведения симуляции
parser.add_argument(
'--simulation',
type=str,
default='yandex',
choices=['google', 'yandex'],
help='Где проводится симуляция: google или yandex (по умолчанию yandex)'
)
# Место проведения симуляции
parser.add_argument(
'--ref-min-distance',
type=float,
default=100,
help='Минимальное расстояние между эталонами'
)
# Место проведения симуляции
parser.add_argument(
'--debug-fps',
action='store_true',
help='Включить отладку FPS'
)
# Место проведения симуляции
parser.add_argument(
'--debug-landmark',
action='store_true',
help='Включить отладку эталонов'
)
# Парсим аргументы
args = parser.parse_args()
# Проверяем, что для build и run указан --name
if args.mode in ['build', 'run'] and not args.name:
parser.error(f"--name обязателен для режима {args.mode}")
return args
if __name__ == "__main__":
args = parse_args()
name = args.name
mode = args.mode
sim: str = args.simulation
ref: str = args.reference
lat: float = args.lat
lon: float = args.lon
rmd: float = args.ref_min_distance
constants.DEBUG_FPS = args.debug_fps
constants.DEBUG_LANDMARK = args.debug_landmark
if mode == 'build' or mode == 'standalone':
build(name, ref, lat, lon)
if mode == 'run' or mode == 'standalone':
run(name, sim, rmd)

BIN
map.jpg

Binary file not shown.

Before

Width:  |  Height:  |  Size: 3.4 MiB

After

Width:  |  Height:  |  Size: 3.3 MiB

View File

@@ -42,22 +42,45 @@ class Position:
f"roll={math.degrees(self.roll):.1f}°)" f"roll={math.degrees(self.roll):.1f}°)"
) )
def get_homography_matrix(self, K: np.ndarray = constants.K, sliding: bool = True) -> np.ndarray: def __imul__(self, scalar: float):
self.x *= scalar
self.y *= scalar
self.z *= scalar
return self
def __mul__(self, scalar: float) -> 'Position':
pos = self.copy()
pos *= scalar
return pos
def __itruediv__(self, scalar: float):
self.x /= scalar
self.y /= scalar
self.z /= scalar
return self
def __truediv__(self, scalar: float) -> 'Position':
pos = self.copy()
pos /= scalar
return pos
def get_homography_matrix(self, K_in: np.ndarray = constants.K, K_out: np.ndarray | None = None, sliding: bool = True) -> np.ndarray:
""" Возвращает матрицу гомографии """ """ Возвращает матрицу гомографии """
R = self.get_rotation_matrix() R = self.get_rotation_matrix()
T = self.get_translation_matrix() T = self.get_translation_matrix(K_in)
if not sliding: if not sliding:
T[0, 2] = T[1, 2] = 0 T[0, 2] = T[1, 2] = 0
return K @ R @ T @ np.linalg.inv(K) if K_out is None: K_out = K_in
return K_out @ R @ T @ np.linalg.inv(K_in)
def copy(self) -> 'Position': def copy(self) -> 'Position':
"""Создает полную копию объекта""" """Создает полную копию объекта"""
return Position(self.x, self.y, self.z, self.yaw, self.pitch, self.roll) return Position(self.x, self.y, self.z, self.yaw, self.pitch, self.roll)
def get_translation_matrix(self) -> np.ndarray: def get_translation_matrix(self, K: np.ndarray = constants.K) -> np.ndarray:
return np.array([ return np.array([
[1, 0, self.x / constants._K_FOCUS_DISTANCE], [1, 0, self.x / K[0][0]],
[0, 1, self.y / constants._K_FOCUS_DISTANCE], [0, 1, self.y / K[0][0]],
[0, 0, self.z] [0, 0, self.z]
]) ])
@@ -101,6 +124,13 @@ class Position:
R = np.array(R) R = np.array(R)
t = np.array(t) t = np.array(t)
# print(cv2.decomposeHomographyMat(inv(H), K))
# _, _, z, _ = cv2.decomposeHomographyMat(inv(H), K)
# print(z)
# z = z.copy()
# z *= constants._K_FOCUS_DISTANCE
# print(z)
T = inv(R) @ inv(K) @ H @ K T = inv(R) @ inv(K) @ H @ K
ind = np.array([A[2][0] ** 2 + A[2][1] ** 2 for A in T]) ind = np.array([A[2][0] ** 2 + A[2][1] ** 2 for A in T])
top_k = max(1, len(T) // 2) top_k = max(1, len(T) // 2)
@@ -116,14 +146,16 @@ class Position:
R = R[best_id] R = R[best_id]
rot = Rotation.from_matrix(R).as_euler('XYZ').flatten() rot = Rotation.from_matrix(R).as_euler('XYZ').flatten()
self.roll = rot[0] self.roll = min(np.radians(5), max(np.radians(-5), rot[0]))
self.pitch = rot[1] self.pitch = min(np.radians(5), max(np.radians(-5), rot[1]))
self.yaw = rot[2] self.yaw = rot[2]
t = t[best_id].flatten() t = t[best_id].flatten()
self.x += -T[0] * constants._K_FOCUS_DISTANCE * self.z self.x -= T[0] * constants._K_FOCUS_DISTANCE
self.y += T[1] * constants._K_FOCUS_DISTANCE * self.z self.y += T[1] * constants._K_FOCUS_DISTANCE
self.z = 1 + T[2] self.z = max(0.7, min(1.3, 1 + T[2]))
T[0] *= constants._K_FOCUS_DISTANCE
T[1] *= constants._K_FOCUS_DISTANCE
def apply(self, homography_matrix: np.ndarray, K = constants.K) -> 'Position': def apply(self, homography_matrix: np.ndarray, K = constants.K) -> 'Position':
"""Применяет матрицу трансформации для вычисления новой позиции и ориентации.""" """Применяет матрицу трансформации для вычисления новой позиции и ориентации."""

View File

@@ -8,12 +8,13 @@ import numpy as np
from position import Position from position import Position
from vision_chunk import VisionChunk from vision_chunk import VisionChunk
from yandex_map import YandexMap from yandex_map import YandexMap
from google_map import GoogleMap
import constants import constants
import utility import utility
class Simulator: class Simulator:
def __init__(self, yandex_map: YandexMap = None): def __init__(self, online_map: YandexMap | GoogleMap = None):
self.yandex_map = yandex_map self.online_map = online_map
# Используем новый конструктор с yaw, pitch, roll # Используем новый конструктор с yaw, pitch, roll
self.pos = Position(x=0, y=0, z=1, yaw=0, pitch=0, roll=0) self.pos = Position(x=0, y=0, z=1, yaw=0, pitch=0, roll=0)
@@ -35,24 +36,26 @@ class Simulator:
Возвращает квадратное изображение 700x700. Возвращает квадратное изображение 700x700.
""" """
img_array = np.array(image) img_array = np.array(image)
print(img_array.shape)
h, w, _ = img_array.shape h, w, _ = img_array.shape
# Применяем трансформацию # Применяем трансформацию
pos = self.pos.copy() pos = self.pos.copy()
pos.x = 0 pos.x = 0
pos.y = 0 pos.y = 0
K = utility.calc_camera_matrix(w, h)
K = constants.K K_in = utility.calc_camera_matrix(w, h)
img_array = img_array[:constants.CHUNK_WIDTH, :constants.CHUNK_WIDTH] K_out = constants.K
transformed = cv2.warpPerspective(img_array, pos.get_homography_matrix(K), (constants.CHUNK_WIDTH, constants.CHUNK_WIDTH)) H = pos.get_homography_matrix(K_in, K_out)
shape = (constants.CHUNK_WIDTH, constants.CHUNK_WIDTH)
transformed = cv2.warpPerspective(img_array, H, shape)
return Image.fromarray(transformed) return Image.fromarray(transformed)
def update_trajectory(self, dx: float, dy: float): def update_trajectory(self, dx: float, dy: float):
"""Обновляет координаты дрона""" """Обновляет координаты дрона"""
self.pos.x += dx * self.pos.z self.pos.x += dx
self.pos.y += dy * self.pos.z self.pos.y += dy
def handle(self, dangle: float, velocity: float = 50) -> None: def handle(self, dangle: float, velocity: float = 50) -> None:
""" """
@@ -60,27 +63,17 @@ class Simulator:
dangle - изменение угла курса (радианы) dangle - изменение угла курса (радианы)
velocity - скорость движения velocity - скорость движения
""" """
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
html = self.yandex_map.driver.find_element(By.TAG_NAME, 'html')
action = ActionChains(self.yandex_map.driver)
action.move_to_element_with_offset(html, 200, 200)
action.click_and_hold()
# Обновляем yaw в объекте Position # Обновляем yaw в объекте Position
self.pos.yaw += dangle self.pos.yaw += dangle
velocity = max(velocity, 10) velocity = max(velocity, 10)
# Вычисляем смещение на основе текущего yaw # Вычисляем смещение на основе текущего yaw
dx = math.cos(math.pi / 2 + self.pos.yaw) * velocity / self.pos.z dx = int(math.cos(math.pi / 2 + self.pos.yaw) * velocity)
dy = math.sin(math.pi / 2 + self.pos.yaw) * velocity / self.pos.z dy = int(math.sin(math.pi / 2 + self.pos.yaw) * velocity)
self.update_trajectory(dx, dy) self.update_trajectory(dx, dy)
self.online_map.move(dx, dy)
action.move_by_offset(-dx, dy)
action.release()
action.perform()
def set_zoom(self, zoom_level: float): def set_zoom(self, zoom_level: float):
"""Программное изменение масштаба""" """Программное изменение масштаба"""
@@ -88,8 +81,7 @@ class Simulator:
def get_chunk(self) -> VisionChunk: def get_chunk(self) -> VisionChunk:
"""Получить текущий снимок с камеры дрона""" """Получить текущий снимок с камеры дрона"""
png = self.yandex_map.driver.get_screenshot_as_png() im = self.online_map.make_screenshot()
im = Image.open(BytesIO(png))
# Применяем перспективную трансформацию # Применяем перспективную трансформацию
transformed_im = self._apply_perspective_transform(im) transformed_im = self._apply_perspective_transform(im)

189
test_chunks.ipynb Normal file

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -29,3 +29,9 @@ class Timer:
self.elapsed = 0. self.elapsed = 0.
self.enabled = False self.enabled = False
self.last_enabled = 0. self.last_enabled = 0.
def loop(self) -> float:
v = self.get_diff()
self.stop()
self.start()
return v

View File

@@ -19,6 +19,6 @@
| [+] Исследовать причину погрешности при развороте | [+] Исследовать причину погрешности при развороте
[+] Устранение большой погрешности при повороте [+] Устранение большой погрешности при повороте
[ ] Переделать ключевые точки -> Optical Flow [+] Переделать ключевые точки -> Optical Flow
[ ] Добавить перспективу [+] Добавить перспективу
[ ] Эталоны на Google Maps, полёт тот же [ ] Эталоны на Google Maps, полёт тот же

View File

@@ -1,7 +1,11 @@
from PIL import Image from PIL import Image
from datetime import datetime
from urllib.parse import parse_qs, urlparse, unquote
import constants
import cv2 import cv2
import numpy as np import numpy as np
import constants import re
def cv2_to_pil(cv_image: np.ndarray) -> Image.Image: def cv2_to_pil(cv_image: np.ndarray) -> Image.Image:
""" """
@@ -19,11 +23,10 @@ def get_normals(H: np.ndarray, R: np.ndarray, T: np.ndarray) -> np.ndarray:
n = cv2.decomposeHomographyMat(H, constants.K, R, T) n = cv2.decomposeHomographyMat(H, constants.K, R, T)
return n return n
def estimate_transformation_matrix(src_pts: np.ndarray, dst_pts: np.ndarray) -> tuple[np.ndarray, float | None]: def estimate_transformation_matrix(src_pts: np.ndarray, dst_pts: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
"""Оценивает матрицу трансформации на основе сопоставленных точек""" """Оценивает матрицу трансформации на основе сопоставленных точек"""
# Используем RANSAC для оценки матрицы гомографии # Используем RANSAC для оценки матрицы гомографии
H, _ = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 3.0, maxIters=1000) return cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 3.0, maxIters=1000)
return H
def calc_camera_matrix(w: float, h: float): def calc_camera_matrix(w: float, h: float):
f = constants._K_FOCUS_DISTANCE f = constants._K_FOCUS_DISTANCE
@@ -32,3 +35,135 @@ def calc_camera_matrix(w: float, h: float):
[0, f, h / 2], [0, f, h / 2],
[0, 0, 1] [0, 0, 1]
]) ])
def generate_folder_name():
"""
Генерирует название для папки с текущей датой и временем до секунд.
Формат: YYYY-MM-DD_HH-MM-SS
"""
now = datetime.now()
folder_name = now.strftime("%Y-%m-%d_%H-%M-%S")
return folder_name
def parse_yandex_maps_url(url):
"""
Парсит URL Яндекс.Карт и извлекает lat, lon и zoom
Формат: ?ll=lon,lat&z=zoom
"""
# Декодируем URL (на случай %2C вместо запятых)
url = unquote(url)
# Парсим URL
parsed_url = urlparse(url)
params = parse_qs(parsed_url.query)
if 'll' in params and 'z' in params:
# ll содержит "lon,lat"
ll_value = params['ll'][0]
lat, lon = map(float, ll_value.split(','))
zoom = int(params['z'][0])
return {
'lat': lat,
'lon': lon,
'zoom': zoom
}
return None
def parse_google_maps_url(url):
"""
Парсит URL Google Maps и извлекает lat, lon и zoom
Формат: /@lat,lon,zoom[m|z]
"""
pattern = r'/@([-\d.]+),([-\d.]+),(\d+)([mz])'
match = re.search(pattern, url)
if match:
lon = float(match.group(1))
lat = float(match.group(2))
zoom_value = int(match.group(3))
zoom_unit = match.group(4)
return {
'lat': lat,
'lon': lon,
'zoom': zoom_value,
'zoom_unit': zoom_unit
}
return None
def google_map_js_move_script(dx, dy) -> str:
return """
async function sleep(ms) {
return new Promise((resolve, reject) => {
setTimeout(() => resolve(), ms);
});
}
async function simulateDrag(element, offsetX, offsetY) {
const rect = element.getBoundingClientRect();
const startX = rect.left + rect.width / 2;
const startY = rect.top + rect.height / 2;
const step = 10
const endX = startX + offsetX + step;
const endY = startY + offsetY + step;
element.dispatchEvent(new MouseEvent('mousedown', {
view: window,
bubbles: true,
cancelable: true,
clientX: startX,
clientY: startY,
button: 0
}));
let currentX = startX;
let currentY = startY;
function move(stepX, stepY) {
currentX += stepX;
currentY += stepY;
document.dispatchEvent(new MouseEvent('mousemove', {
view: window,
bubbles: true,
cancelable: false,
clientX: currentX,
clientY: currentY
}));
}
await sleep(50);
move(step, step)
while (currentX != endX || currentY != endY) {
await sleep(50);
const stepX = Math.min(step, Math.max(-step, endX - currentX));
const stepY = Math.min(step, Math.max(-step, endY - currentY));
move(stepX, stepY);
}
await sleep(50)
document.dispatchEvent(new MouseEvent('mouseup', {
view: window,
bubbles: true,
cancelable: false,
clientX: endX,
clientY: endY,
button: 0
}));
}
{
const canvas = document.querySelector('canvas.H1VXrf.JRr1M.DnOnV');
""" + f"simulateDrag(canvas, {int(-dx)}, {int(dy)});" + """
}
"""

View File

@@ -1,10 +1,13 @@
import constants
import cv2 import cv2
import json import json
import numpy as np import numpy as np
from PIL import Image
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from position import Position
from timer import Timer
from typing import Literal, Optional, Tuple from typing import Literal, Optional, Tuple
from PIL import Image
FeatureMethod = Literal["orb", "sift", "akaze", "brisk"] FeatureMethod = Literal["orb", "sift", "akaze", "brisk"]
DEFAULT_METHOD = "orb" DEFAULT_METHOD = "orb"
@@ -14,6 +17,7 @@ class VisionChunk:
image: Image.Image image: Image.Image
feature_method: FeatureMethod = DEFAULT_METHOD feature_method: FeatureMethod = DEFAULT_METHOD
pos: Optional[Position] = field(default=None, init=False)
keypoints: Optional[list] = field(default=None, init=False) keypoints: Optional[list] = field(default=None, init=False)
descriptors: Optional[np.ndarray] = field(default=None, init=False) descriptors: Optional[np.ndarray] = field(default=None, init=False)
_detector: Optional[cv2.Feature2D] = field(default=None, init=False, repr=False) _detector: Optional[cv2.Feature2D] = field(default=None, init=False, repr=False)
@@ -27,12 +31,12 @@ class VisionChunk:
self._detector = cv2.ORB_create( self._detector = cv2.ORB_create(
nfeatures=1000, nfeatures=1000,
scaleFactor=1.2, scaleFactor=1.2,
nlevels=32, nlevels=16,
edgeThreshold=31, edgeThreshold=31,
firstLevel=0, firstLevel=0,
WTA_K=2, WTA_K=2,
patchSize=31, patchSize=31,
fastThreshold=20, fastThreshold=10,
) )
elif self.feature_method == "sift": elif self.feature_method == "sift":
self._detector = cv2.SIFT_create( self._detector = cv2.SIFT_create(
@@ -70,30 +74,55 @@ class VisionChunk:
return self._matcher return self._matcher
def _preprocess(self, img_np: np.ndarray) -> np.ndarray: def _preprocess(self, img_np: np.ndarray) -> np.ndarray:
"""CLAHE предобработка для улучшения контраста""" """Предобработка для улучшения сопоставления между снимками разного времени"""
if len(img_np.shape) == 3: if len(img_np.shape) == 3:
gray = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY) gray = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY)
else: else:
gray = img_np gray = img_np
# Гауссовское размытие для подавления шума и мелких различий
# blurred = cv2.GaussianBlur(gray, (5, 5), 1.0)
# CLAHE для выравнивания контраста между снимками
clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8)) clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
return clahe.apply(gray) enhanced = clahe.apply(gray)
# Опционально: нормализация гистограммы для устранения различий в освещении
normalized = cv2.normalize(enhanced, None, 0, 255, cv2.NORM_MINMAX)
return normalized
def compute_keypoints(self, force: bool = False) -> Tuple[list[cv2.KeyPoint], Optional[np.ndarray]]: def compute_keypoints(self, force: bool = False) -> Tuple[list[cv2.KeyPoint], Optional[np.ndarray]]:
if self.keypoints is not None and self.descriptors is not None and not force: if self.keypoints is not None and self.descriptors is not None and not force:
return self.keypoints, self.descriptors return self.keypoints, self.descriptors
timer = Timer()
timer.start()
detector = self._get_detector() detector = self._get_detector()
if constants.DEBUG_FPS:
print(f"[VC-DETECTION]: get_detector: {timer.loop() * 1000:.2f} ms")
# PIL -> OpenCV (RGB->BGR) # PIL -> OpenCV (RGB->BGR)
img_np = np.array(self.image) img_np = np.array(self.image)
if img_np.ndim == 3 and img_np.shape[2] == 3: if img_np.ndim == 3 and img_np.shape[2] == 3:
img_np = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR) img_np = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR)
if constants.DEBUG_FPS:
print(f"[VC-DETECTION]: converting: {timer.loop() * 1000:.2f} ms")
# CLAHE предобработка # CLAHE предобработка
preprocessed = self._preprocess(img_np) preprocessed = self._preprocess(img_np)
if constants.DEBUG_FPS:
print(f"[VC-DETECTION]: preprocess: {timer.loop() * 1000:.2f} ms")
keypoints, descriptors = detector.detectAndCompute(preprocessed, None) keypoints, descriptors = detector.detectAndCompute(preprocessed, None)
if constants.DEBUG_FPS:
print(f"[VC-DETECTION]: detect and compute: {timer.loop() * 1000:.2f} ms")
# Получаем массив response для всех точек # Получаем массив response для всех точек
responses = np.array([kp.response for kp in keypoints]) responses = np.array([kp.response for kp in keypoints])
@@ -104,6 +133,9 @@ class VisionChunk:
best_keypoints = [keypoints[i] for i in top_indices] best_keypoints = [keypoints[i] for i in top_indices]
best_descriptors = descriptors[top_indices] best_descriptors = descriptors[top_indices]
if constants.DEBUG_FPS:
print(f"[VC-DETECTION]: filtration: {timer.loop() * 1000:.2f} ms")
self.keypoints = best_keypoints self.keypoints = best_keypoints
self.descriptors = best_descriptors self.descriptors = best_descriptors
return self.keypoints, self.descriptors return self.keypoints, self.descriptors
@@ -122,15 +154,29 @@ class VisionChunk:
Возвращает: src_pts, dst_pts, good_matches, kp1, kp2 (отцентрированные координаты) Возвращает: src_pts, dst_pts, good_matches, kp1, kp2 (отцентрированные координаты)
""" """
# Вычисляем keypoints для обоих # Вычисляем keypoints для обоих
timer = Timer()
timer.start()
kp1, des1 = self.compute_keypoints() kp1, des1 = self.compute_keypoints()
if constants.DEBUG_FPS:
print(f"[VC-KEYPOINTS]: computing 1: {timer.loop() * 1000:.2f} ms")
kp2, des2 = other.compute_keypoints() kp2, des2 = other.compute_keypoints()
if constants.DEBUG_FPS:
print(f"[VC-KEYPOINTS]: computing 2: {timer.loop() * 1000:.2f} ms")
if des1 is None or des2 is None or len(kp1) < 4 or len(kp2) < 4: if des1 is None or des2 is None or len(kp1) < 4 or len(kp2) < 4:
return None, None, None, None, None return None, None, None, None, None
# kNN matching + Lowe ratio test # kNN matching + Lowe ratio test
matcher = self._get_matcher() matcher = self._get_matcher()
matches_knn = matcher.knnMatch(des1, des2, k=2) matches_knn = matcher.knnMatch(des1, des2, k=2)
if constants.DEBUG_FPS:
print(f"[VC-KEYPOINTS]: matching: {timer.loop() * 1000:.2f} ms")
good_matches: list[cv2.DMatch] = [] good_matches: list[cv2.DMatch] = []
for m_n in matches_knn: for m_n in matches_knn:
@@ -147,15 +193,6 @@ class VisionChunk:
if len(good_matches) < 4: if len(good_matches) < 4:
return None, None, None, None, None return None, None, None, None, None
# Центр изображений
img1_cv = self.to_cv2_gray()
img2_cv = other.to_cv2_gray()
h1, w1 = img1_cv.shape
h2, w2 = img2_cv.shape
cx1, cy1 = w1 // 2, h1 // 2
cx2, cy2 = w2 // 2, h2 // 2
# Отцентрированные координаты (x_rel, y_rel)
src_pts = [] src_pts = []
dst_pts = [] dst_pts = []
@@ -169,6 +206,9 @@ class VisionChunk:
src_pts = np.float32(src_pts).reshape(-1, 1, 2) src_pts = np.float32(src_pts).reshape(-1, 1, 2)
dst_pts = np.float32(dst_pts).reshape(-1, 1, 2) dst_pts = np.float32(dst_pts).reshape(-1, 1, 2)
if constants.DEBUG_FPS:
print(f"[VC-KEYPOINTS]: filtration: {timer.loop() * 1000:.2f} ms")
return src_pts, dst_pts, good_matches, kp1, kp2 return src_pts, dst_pts, good_matches, kp1, kp2
def to_cv2_gray(self) -> np.ndarray: def to_cv2_gray(self) -> np.ndarray:

View File

@@ -3,14 +3,16 @@
Модуль для управления общим окном визуализации Модуль для управления общим окном визуализации
""" """
from PIL import Image
from enum import Enum
from scipy.interpolate import make_interp_spline
import cv2
import matplotlib
import matplotlib.axes import matplotlib.axes
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import matplotlib.patches as patches import matplotlib.patches as patches
import numpy as np import numpy as np
from enum import Enum
import cv2
from PIL import Image
import matplotlib
# Настройки matplotlib # Настройки matplotlib
matplotlib.use('TkAgg') matplotlib.use('TkAgg')
@@ -93,14 +95,14 @@ class VisualizationManager:
self.ax_matches.axis('off') self.ax_matches.axis('off')
# Сопоставление точек (средний средний угол) # Сопоставление точек (средний средний угол)
self.ax_chunk_matches = self.fig.add_subplot(gs[1, 2]) self.ax_chunk_matches = self.fig.add_subplot(gs[1, 1:3])
self.ax_chunk_matches.set_title('Chunk Matching') self.ax_chunk_matches.set_title('Chunk Matching')
self.ax_chunk_matches.axis('off') self.ax_chunk_matches.axis('off')
# Визуализация движения ключевых точек (левый нижний угол) # Визуализация движения ключевых точек (левый нижний угол)
self.ax_motion_vectors = self.fig.add_subplot(gs[1, 1]) # self.ax_motion_vectors = self.fig.add_subplot(gs[1, 1])
self.ax_motion_vectors.set_title('Motion Vectors - Движение ключевых точек') # self.ax_motion_vectors.set_title('Motion Vectors - Движение ключевых точек')
self.ax_motion_vectors.axis('off') # self.ax_motion_vectors.axis('off')
# Визуализация движения ключевых точек на основе матрицы гомографии # Визуализация движения ключевых точек на основе матрицы гомографии
self.ax_motion_gomography = self.fig.add_subplot(gs[0, 1]) self.ax_motion_gomography = self.fig.add_subplot(gs[0, 1])
@@ -157,7 +159,7 @@ class VisualizationManager:
# Рисуем текущую целевую точку # Рисуем текущую целевую точку
if self.target_idx < len(self.target_pts): if self.target_idx < len(self.target_pts):
pt = self.target_pts[self.target_idx] pt = self.target_pts[self.target_idx]
self.ax_global_map.plot(pt[0], pt[1], 'yo', markersize=8, label='Цель (0, 0)') self.ax_global_map.plot(pt[0], pt[1], 'yo', markersize=8, label='Цель')
self.ax_global_map.legend() self.ax_global_map.legend()
@@ -208,7 +210,37 @@ class VisualizationManager:
self.ax_error_plot.grid(True, alpha=0.3) self.ax_error_plot.grid(True, alpha=0.3)
if len(self.error_times) > 1: if len(self.error_times) > 1:
self.ax_error_plot.plot(self.error_times, self.position_errors, 'b-', linewidth=2) # Оригинальный график (более прозрачный)
self.ax_error_plot.plot(self.error_times, self.position_errors, 'b-',
linewidth=1, alpha=0.4, label='Погрешность данных')
if len(self.error_times) > 5:
# Сглаженный график
smoothed_times = np.linspace(self.error_times[0], self.error_times[-1], 300)
spl = make_interp_spline(self.error_times, self.position_errors, k=3)
smoothed_errors = spl(smoothed_times)
self.ax_error_plot.plot(smoothed_times, smoothed_errors, 'orange',
linewidth=2, label='Сглаженный тренд')
# if len(self.position_errors) > 5: # Достаточно данных для сглаживания
# window_size = min(11, len(self.position_errors) // 3) # Адаптивный размер окна
# if window_size % 2 == 0: # Должен быть нечетным
# window_size += 1
# # Метод скользящего среднего
# smoothed_errors = np.convolve(
# self.position_errors,
# np.ones(window_size) / window_size,
# mode='valid'
# )
# # Корректируем временную ось для сглаженных данных
# offset = (window_size - 1) // 2
# smoothed_times = self.error_times[offset:offset + len(smoothed_errors)]
self.ax_error_plot.legend(loc='upper right')
# Автоматически масштабируем оси # Автоматически масштабируем оси
if len(self.position_errors) > 0: if len(self.position_errors) > 0:
@@ -219,6 +251,7 @@ class VisualizationManager:
else: else:
self.ax_error_plot.set_ylim(0, 1) self.ax_error_plot.set_ylim(0, 1)
def update_matches(self, img1: np.ndarray, img2: np.ndarray, def update_matches(self, img1: np.ndarray, img2: np.ndarray,
kp1, kp2, matches, transformation_info=None): kp1, kp2, matches, transformation_info=None):
"""Обновляет визуализацию сопоставления точек""" """Обновляет визуализацию сопоставления точек"""

View File

@@ -1,16 +1,17 @@
import math
from io import BytesIO
from time import sleep
import os
import cv2
import numpy as np
from PIL import Image from PIL import Image
from io import BytesIO
from selenium.webdriver.common.actions.wheel_input import ScrollOrigin from selenium.webdriver.common.actions.wheel_input import ScrollOrigin
from selenium import webdriver from selenium import webdriver
from selenium.webdriver.common.by import By from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.common.action_chains import ActionChains
from time import sleep
import constants
import cv2
import math
import numpy as np
import os
import utility
def generateURL(lat: float, lon: float, zoom: int): def generateURL(lat: float, lon: float, zoom: int):
return f"https://yandex.ru/maps/43/kazan/?l=sat&ll={lat}%2C{lon}&z={zoom}" return f"https://yandex.ru/maps/43/kazan/?l=sat&ll={lat}%2C{lon}&z={zoom}"
@@ -24,6 +25,7 @@ class YandexMap:
self.initial_lat = initial_lat self.initial_lat = initial_lat
self.initial_lon = initial_lon self.initial_lon = initial_lon
self.initial_zoom = initial_zoom self.initial_zoom = initial_zoom
self.pixel_ratio = constants.YANDEX_PIXEL_RATIO[self.initial_zoom]
options = webdriver.ChromeOptions() options = webdriver.ChromeOptions()
# options.add_experimental_option("detach", True) # options.add_experimental_option("detach", True)
@@ -32,9 +34,8 @@ class YandexMap:
self.driver.maximize_window() self.driver.maximize_window()
sleep(2) sleep(2)
action = ActionChains(self.driver)
# Закрытие левой панели # Закрытие левой панели
action = ActionChains(self.driver)
action.click(self.driver.find_element(By.CLASS_NAME, 'sidebar-toggle-button')) action.click(self.driver.find_element(By.CLASS_NAME, 'sidebar-toggle-button'))
action.perform() action.perform()
@@ -43,8 +44,25 @@ class YandexMap:
self.driver.execute_script("arguments[0].remove();", self.driver.find_element(By.XPATH, "//nav[@class='map-controls']")) self.driver.execute_script("arguments[0].remove();", self.driver.find_element(By.XPATH, "//nav[@class='map-controls']"))
self.driver.execute_script("arguments[0].remove();", self.driver.find_element(By.XPATH, "//footer")) self.driver.execute_script("arguments[0].remove();", self.driver.find_element(By.XPATH, "//footer"))
self.move(39, -9)
sleep(0.2) sleep(0.2)
def open(self, lat, lon, zoom):
self.initial_lat = lat
self.initial_lon = lon
self.initial_zoom = zoom
self.pixel_ratio = constants.YANDEX_PIXEL_RATIO[self.initial_zoom]
self.driver.get(generateURL(lat, lon, zoom))
sleep(2)
# Закрытие левой панели
action = ActionChains(self.driver)
action.click(self.driver.find_element(By.CLASS_NAME, 'sidebar-toggle-button'))
action.perform()
self.move(39, -9)
def save_photo(self, filename: str) -> bytes: def save_photo(self, filename: str) -> bytes:
return self.driver.save_screenshot(filename) return self.driver.save_screenshot(filename)
@@ -68,51 +86,26 @@ class YandexMap:
if i != count - 1: if i != count - 1:
sleep(0.25) sleep(0.25)
def make_as_center(self, x: float, y: float): def move(self, dx: float, dy: float):
html = self.driver.find_element(By.TAG_NAME, 'html') html = self.driver.find_element(By.TAG_NAME, 'html')
action = ActionChains(self.driver) action = ActionChains(self.driver)
action.move_to_element_with_offset(html, 0, 0) action.move_to_element_with_offset(html, 0, 0)
action.click_and_hold() action.click_and_hold()
dx = (x - 0.5) * self.get_size()[0]
dy = (0.5 - y) * self.get_size()[1]
print(dx, dy)
action.move_by_offset(-dx, dy) action.move_by_offset(-dx, dy)
action.release() action.release()
action.perform() action.perform()
def make_screenshot(self, x: float, y: float, width: float, height: float) -> cv2.typing.MatLike:
# Сохраняем скриншот def make_as_center(self, x: float, y: float):
self.save_photo("temp.png") dx = (x - 0.5) * self.get_size()[0]
dy = (0.5 - y) * self.get_size()[1]
# Загружаем изображение self.move(dx, dy)
image = cv2.imread("temp.png")
def make_screenshot(self) -> Image.Image:
if image is None: png = self.driver.get_screenshot_as_png()
raise ValueError("Не удалось загрузить изображение temp.png") im = Image.open(BytesIO(png))
return utility.cv2_to_pil(np.array(im)[:, :])
# Получаем размеры исходного изображения
img_height, img_width = image.shape[:2] def get_geolocation(self):
current_url = self.driver.current_url
# Преобразуем относительные координаты в абсолютные пиксели return utility.parse_yandex_maps_url(current_url)
center_x = int(x * img_width)
center_y = int(y * img_height)
crop_width = int(width * img_width)
crop_height = int(height * img_height)
# Вычисляем координаты прямоугольника для кадрирования
x1 = max(0, center_x - crop_width // 2)
y1 = max(0, center_y - crop_height // 2)
x2 = min(img_width, center_x + crop_width // 2)
y2 = min(img_height, center_y + crop_height // 2)
# Проверяем, что прямоугольник имеет положительные размеры
if x2 <= x1 or y2 <= y1:
raise ValueError("Некорректные размеры для кадрирования")
# Кадрируем изображение
cropped_image = image[y1:y2, x1:x2]
# Если нужно вернуть изображение как результат функции:
return cropped_image