# Files
# autopilot/models/SiaN-similarity/dataloader.py
# 2026-04-04 17:49:31 +03:00
#
# 275 lines
# 9.0 KiB
# Python

# Module-wide settings. Keys, values and insertion order are exactly those of
# the original literal; only the spelling (keyword form) differs.
config = dict(
    # Optimisation settings.
    learning_rate=2e-4,
    beta1=0.5,
    beta2=0.999,
    batch_size=32,
    epochs=100,
    # Loss-related settings.
    gan_mode="vanilla",
    lambda_L1=100.0,
    grad_clip=1.0,
    early_stopping_patience=20,
    # Output / logging cadence.
    output_dir="runs/gan_training",
    log_interval=10,
    save_interval=5,
    # Data settings. NOTE: data_dir is a machine-specific absolute path.
    data_dir=r"C:\Users\admin\Projects\autopilot\datasets\ya_go_maps\images",
    image_size=[256, 256],
    train_split=0.8,
    num_workers=0,
)
import os
from typing import Dict, List, Tuple
import torch
from PIL import Image
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
import os
import random
from typing import Any, Dict, List, Optional, Tuple
import cv2
import numpy as np
import torch
from PIL import Image
from torch.utils.data import DataLoader, Dataset, Subset
from torchvision import transforms
class YaGoDataset(Dataset):
    """Paired Google/Yandex map tiles with optional random homography warps.

    Pairs are discovered in ``root_dir`` as ``<n>_google.png`` files that have
    a matching ``<n:04d>_yandex.png`` file. Each item is a dict holding both
    images, the homography associated with the applied warps, a flag telling
    whether the Google tile belongs to the same pair as the Yandex tile, and
    the pair index.

    Args:
        root_dir: Directory containing the image files.
        transform: Optional callable applied to each PIL image. When omitted,
            images are converted to float CHW tensors scaled to ``[0, 1]``.
        augment: Warp both images with independent random homographies.
        max_samples: Keep only the first ``max_samples`` discovered pairs.
        image_size: Target size as ``(height, width)``.
        cache_homographies: Stored on the instance; not used by this class.
        device: Stored on the instance; not used by this class.
    """

    def __init__(
        self,
        root_dir: str,
        transform=None,
        augment: bool = True,
        max_samples: Optional[int] = None,
        image_size: Tuple[int, int] = (700, 700),
        cache_homographies: bool = True,
        device=None,
    ):
        self.root_dir = root_dir
        self.transform = transform
        self.augment = augment
        self.image_size = image_size
        self.cache_homographies = cache_homographies
        self.device = device
        self.image_pairs = self._discover_image_pairs()
        if max_samples is not None:
            self.image_pairs = self.image_pairs[:max_samples]

    def _discover_image_pairs(self) -> List[Dict[str, Any]]:
        """Return one record per Google tile that has a matching Yandex tile.

        Files whose prefix before the first underscore is not an integer are
        skipped. Records are ordered by sorted Google file name.
        """
        image_pairs = []
        google_files = [
            f for f in os.listdir(self.root_dir) if f.endswith("_google.png")
        ]
        for google_file in sorted(google_files):
            idx_str = google_file.split("_")[0]
            try:
                idx = int(idx_str)
            except ValueError:
                continue
            # The Yandex counterpart is always looked up with a 4-digit,
            # zero-padded index, regardless of how the Google file is padded.
            yandex_path = os.path.join(self.root_dir, f"{idx:04d}_yandex.png")
            if os.path.exists(yandex_path):
                image_pairs.append({
                    "idx": idx,
                    "google_path": os.path.join(self.root_dir, google_file),
                    "yandex_path": yandex_path,
                })
        return image_pairs

    def __len__(self) -> int:
        """Return the number of discovered image pairs."""
        return len(self.image_pairs)

    def __getitem__(self, idx: int) -> Dict[str, Any]:
        """Load, resize and (optionally) warp one image pair.

        Roughly half of the time the Google tile is replaced by the Google
        tile of a uniformly drawn pair; ``same_domain`` is ``True`` only when
        the Google tile still belongs to pair ``idx``.

        Returns:
            Dict with keys ``google_img``, ``yandex_img``, ``homography``
            (3x3 float32 tensor; identity when ``augment`` is off),
            ``same_domain`` (bool) and ``idx`` (long tensor with the pair's
            numeric file index).
        """
        pair_info = self.image_pairs[idx]
        google_path = pair_info["google_path"]
        yandex_path = pair_info["yandex_path"]

        same_domain = True
        if np.random.rand() > 0.5:
            random_idx = np.random.randint(0, len(self))
            google_path = self.image_pairs[random_idx]["google_path"]
            same_domain = random_idx == idx

        # PIL's resize expects (width, height); image_size is (height, width).
        target_size = (self.image_size[1], self.image_size[0])
        # Context managers release the file handles deterministically.
        with Image.open(yandex_path) as raw_yandex:
            yandex_img = raw_yandex.convert("RGB").resize(target_size, Image.BILINEAR)
        with Image.open(google_path) as raw_google:
            google_img = raw_google.convert("RGB").resize(target_size, Image.BILINEAR)

        if self.augment:
            # Only draw random homographies when they are actually applied.
            matrices = self._get_homography_matrix(pair_info["idx"])
            google_img, yandex_img, homography_matrix = self._apply_augmentation(
                google_img, yandex_img, matrices
            )
            homography_tensor = torch.from_numpy(homography_matrix).float()
        else:
            # float32, matching the augmented branch (np.eye would be float64).
            homography_tensor = torch.eye(3, dtype=torch.float32)

        if self.transform:
            google_img = self.transform(google_img)
            yandex_img = self.transform(yandex_img)
        else:
            google_img = (
                torch.from_numpy(np.array(google_img)).float().permute(2, 0, 1) / 255.0
            )
            yandex_img = (
                torch.from_numpy(np.array(yandex_img)).float().permute(2, 0, 1) / 255.0
            )

        return {
            "google_img": google_img,
            "yandex_img": yandex_img,
            "homography": homography_tensor,
            "same_domain": same_domain,
            "idx": torch.tensor(pair_info["idx"], dtype=torch.long),
        }

    def _get_homography_matrix(
        self, idx: int
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Draw two random homographies and their combination.

        ``idx`` is accepted for interface compatibility but not used.

        Returns:
            ``(H1, H2, inv(H1) @ H2)``. ``H1`` is applied to the Yandex image
            and ``H2`` to the Google image by ``_apply_augmentation``.
        """
        homography_matrix_1 = self.generate_random_homography()
        homography_matrix_2 = self.generate_random_homography()
        # NOTE(review): cv2.warpPerspective (without WARP_INVERSE_MAP) treats
        # the matrix as a source->destination map, under which the transform
        # from warped-Yandex to warped-Google coordinates would be
        # H2 @ inv(H1). The original composition is kept here; confirm which
        # convention the consumer of "homography" expects.
        homography_matrix_r = np.linalg.inv(homography_matrix_1) @ homography_matrix_2
        return (homography_matrix_1, homography_matrix_2, homography_matrix_r)

    def generate_random_homography(self) -> np.ndarray:
        """Return a random 3x3 homography in pixel coordinates.

        Composes rotations of up to +/-10 degrees about each axis with a
        translation/scale matrix, conjugated by the camera matrix so that the
        random parameters act in normalised coordinates.
        """
        scale = np.random.uniform(0.8, 1.2)
        tx = np.random.uniform(-0.50, 0.50)
        ty = np.random.uniform(-0.50, 0.50)
        angle_x = np.random.uniform(np.radians(-10), np.radians(10))
        angle_y = np.random.uniform(np.radians(-10), np.radians(10))
        angle_z = np.random.uniform(np.radians(-10), np.radians(10))
        cy, sy = np.cos(angle_z), np.sin(angle_z)
        cp, sp = np.cos(angle_y), np.sin(angle_y)
        cr, sr = np.cos(angle_x), np.sin(angle_x)
        Rz = np.array([[cy, -sy, 0], [sy, cy, 0], [0, 0, 1]])
        Ry = np.array([[cp, 0, sp], [0, 1, 0], [-sp, 0, cp]])
        Rx = np.array([[1, 0, 0], [0, cr, -sr], [0, sr, cr]])
        # `scale` sits in the homogeneous slot of the translation matrix.
        T = np.array([[1, 0, tx], [0, 1, ty], [0, 0, scale]])
        K = self.get_camera_matrix()
        return K @ Rx @ Ry @ Rz @ T @ np.linalg.inv(K)

    def get_camera_matrix(self) -> np.ndarray:
        """Return the 3x3 matrix mapping normalised to pixel coordinates.

        Built from this dataset's own ``image_size`` (height, width) so it
        always matches the images being warped, rather than a global setting.
        """
        h, w = self.image_size
        return np.array([[w / 2, 0, w / 2], [0, h / 2, h / 2], [0, 0, 1]])

    def _apply_augmentation(
        self,
        google_img: "Image.Image",
        yandex_img: "Image.Image",
        matrices: Tuple[np.ndarray, np.ndarray, np.ndarray],
    ) -> Tuple["Image.Image", "Image.Image", np.ndarray]:
        """Warp the Yandex image with ``matrices[0]`` and Google with ``matrices[1]``.

        Returns:
            ``(google_aug, yandex_aug, matrices[2])``.
        """
        combined_homography = matrices[2]
        yandex_aug = self._apply_homography_to_image(yandex_img, matrices[0])
        google_aug = self._apply_homography_to_image(google_img, matrices[1])
        return google_aug, yandex_aug, combined_homography

    def _apply_homography_to_image(
        self, img: "Image.Image", homography: np.ndarray
    ) -> "Image.Image":
        """Warp ``img`` with ``homography``, keeping the original image size."""
        img_np = np.array(img)
        h, w = img_np.shape[:2]
        transformed = cv2.warpPerspective(
            img_np, homography, (w, h), flags=cv2.INTER_LINEAR
        )
        return Image.fromarray(transformed)
def create_data_loaders(
    root_dir: str,
    batch_size: int = 32,
    train_split: float = 0.8,
    num_workers: int = 4,
    image_size: Tuple[int, int] = (256, 256),
    augment_train: bool = True,
    augment_val: bool = False,
    device=None,
) -> Tuple[DataLoader, DataLoader]:
    """Build shuffled train and ordered validation loaders over one directory.

    The discovered pairs are randomly split by ``train_split``; each split is
    served from an augmenting or a non-augmenting ``YaGoDataset`` depending on
    ``augment_train`` / ``augment_val``.

    Args:
        root_dir: Directory with the paired images.
        batch_size: Batch size for both loaders.
        train_split: Fraction of pairs assigned to the training split.
        num_workers: Worker processes for both loaders.
        image_size: ``(height, width)`` passed to the datasets.
        augment_train: Apply random homography warps to training samples.
        augment_val: Apply random homography warps to validation samples.
        device: Forwarded to the datasets.

    Returns:
        ``(train_loader, val_loader)``.

    Raises:
        ValueError: If no image pairs are found in ``root_dir``.
    """
    # Images are only converted to tensors; no mean/std normalisation is applied.
    transform = transforms.Compose([
        transforms.ToTensor(),
    ])

    full_dataset = YaGoDataset(
        root_dir=root_dir,
        transform=transform,
        augment=False,
        image_size=image_size,
        cache_homographies=True,
        device=device,
    )
    dataset_size = len(full_dataset)
    if dataset_size == 0:
        # Fail early with an actionable message instead of an opaque sampler
        # error further down.
        raise ValueError(f"No google/yandex image pairs found in {root_dir!r}")

    # Build the augmenting view only when some split actually needs it; each
    # construction rescans the directory.
    aug_dataset = None
    if augment_train or augment_val:
        aug_dataset = YaGoDataset(
            root_dir=root_dir,
            transform=transform,
            augment=True,
            image_size=image_size,
            cache_homographies=False,
            device=device,
        )

    train_size = int(train_split * dataset_size)
    indices = list(range(dataset_size))
    random.shuffle(indices)
    train_indices = indices[:train_size]
    val_indices = indices[train_size:]

    train_dataset = Subset(aug_dataset if augment_train else full_dataset, train_indices)
    # Previously `augment_val` was accepted but silently ignored.
    val_dataset = Subset(aug_dataset if augment_val else full_dataset, val_indices)

    # Pinned memory only helps host-to-GPU copies; skip it on CPU-only hosts.
    pin_memory = torch.cuda.is_available()

    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=pin_memory,
    )
    val_loader = DataLoader(
        val_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers,
        pin_memory=pin_memory,
    )
    return train_loader, val_loader
# Example usage. Guarded so that merely importing this module (including the
# re-import performed by DataLoader worker processes on spawn-based platforms)
# does not touch the disk or print anything.
if __name__ == "__main__":
    dataset = YaGoDataset(
        root_dir=config["data_dir"],
        augment=True,
        image_size=(256, 256),
    )
    print(f"Dataset size: {len(dataset)}")

    # Get a sample
    sample = dataset[0]
    print(f"Sample keys: {list(sample.keys())}")
    print(f"Google image shape: {sample['google_img'].shape}")
    print(f"Yandex image shape: {sample['yandex_img'].shape}")
    print(f"Homography shape: {sample['homography'].shape}")

    # Create data loaders
    train_loader, val_loader = create_data_loaders(
        root_dir=config["data_dir"],
        batch_size=16,
        train_split=0.8,
    )
    print(f"Train batches: {len(train_loader)}")
    print(f"Val batches: {len(val_loader)}")