feat: working model

This commit is contained in:
2026-04-04 20:26:56 +03:00
parent 4b398f6c9a
commit 703ea8dbaf
6 changed files with 754 additions and 579 deletions

58
models/SiaN/analyze.py Normal file
View File

@@ -0,0 +1,58 @@
import torch
import numpy as np
import matplotlib.pyplot as plt
from torchvision.utils import make_grid
def analyze_training(trainer):
    """Print a short post-training report and save a sample-prediction figure.

    Runs the model on the first validation batch, prints target vs. predicted
    homography parameters for the first sample of that batch, reports the
    mean absolute error over the whole batch and writes
    ``prediction_sample.png`` to the current working directory.

    Args:
        trainer: Object exposing ``model``, ``val_loader``, ``device``,
            ``best_val_loss`` and ``writer`` (``writer`` may be ``None``).

    Returns:
        dict: ``{"best_val_loss": trainer.best_val_loss}``.
    """
    summary = {"best_val_loss": trainer.best_val_loss}

    print("=== Training Analysis ===\n")
    if trainer.writer:
        print("TensorBoard logs available at:", trainer.writer.log_dir)
    print(f"\nBest val loss: {trainer.best_val_loss:.4f}")

    trainer.model.eval()

    # An empty loader would otherwise surface as a bare StopIteration.
    batch = next(iter(trainer.val_loader), None)
    if batch is None:
        print("\nValidation loader is empty; skipping prediction analysis.")
        return summary

    with torch.no_grad():
        google_img = batch["google_img"].to(trainer.device)
        yandex_img = batch["yandex_img"].to(trainer.device)
        target_params = batch["homography_params"].to(trainer.device)
        pred_params = trainer.model(google_img, yandex_img)

    names = ["rx", "ry", "rz", "tx", "ty", "scale"]
    # Only sample 0 is tabulated, so the header says so.
    print("\nSample predictions (first sample of batch):")
    print(f"{'Param':<8} {'Target':>12} {'Predicted':>12} {'Error':>12}")
    print("-" * 46)
    rows = zip(names, target_params[0].tolist(), pred_params[0].tolist())
    for name, t, p in rows:
        print(f"{name:<8} {t:>12.4f} {p:>12.4f} {abs(t-p):>12.4f}")
    print(f"\nBatch mean abs error: {torch.mean(torch.abs(pred_params - target_params)).item():.4f}")

    print("\n=== Visualization ===")
    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    panels = (("Google", google_img[0]), ("Yandex", yandex_img[0]))
    for ax, (title, img) in zip(axes, panels):
        # CHW tensor -> HWC array, the layout imshow expects.
        ax.imshow(img.cpu().permute(1, 2, 0).numpy())
        ax.set_title(title)
        ax.axis("off")
    axes[2].bar(names, pred_params[0].cpu().numpy())
    axes[2].set_title("Predicted params")
    axes[2].axhline(y=0, color="k", lw=0.5)
    plt.tight_layout()
    plt.savefig("prediction_sample.png")
    print("Saved prediction_sample.png")
    plt.show()
    # Release the figure so repeated calls do not accumulate open figures.
    plt.close(fig)
    return summary
# Script entry point: run the analysis on the `trainer` object imported from the train module.
# NOTE(review): this requires `train` to expose a module-level `trainer`, and the import
# executes train's top-level code. Confirm that `trainer` is created outside train's own
# `if __name__ == "__main__":` guard, otherwise this import fails.
if __name__ == "__main__":
from train import trainer
analyze_training(trainer)

View File

@@ -1,34 +1,6 @@
config = {
"learning_rate": 2e-4,
"beta1": 0.5,
"beta2": 0.999,
"batch_size": 32,
"epochs": 100,
"gan_mode": "vanilla",
"lambda_L1": 100.0,
"grad_clip": 1.0,
"early_stopping_patience": 20,
"output_dir": "runs/gan_training",
"log_interval": 10,
"save_interval": 5,
"data_dir": r"C:\Users\admin\Projects\autopilot\datasets\ya_go_maps\images",
"image_size": [256, 256],
"train_split": 0.8,
"num_workers": 0,
}
import os
from typing import Dict, List, Tuple
import torch
from PIL import Image
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
import os
import random
from typing import Any, Dict, List, Optional, Tuple
from typing import Tuple
import cv2
import numpy as np
@@ -37,238 +9,84 @@ from PIL import Image
from torch.utils.data import DataLoader, Dataset, Subset
from torchvision import transforms
from utils import config, get_camera_matrix, generate_random_homography_params, homography_params_to_matrix, matrix_to_homography_params
class YaGoDataset(Dataset):
def __init__(
self,
root_dir: str,
transform=None,
augment: bool = True,
max_samples: Optional[int] = None,
image_size: Tuple[int, int] = (700, 700),
cache_homographies: bool = True,
device=None,
):
def __init__(self, root_dir: str, transform=None, augment: bool = True,
image_size: Tuple[int, int] = (256, 256)):
self.root_dir = root_dir
self.transform = transform
self.augment = augment
self.image_size = image_size
self.cache_homographies = cache_homographies
self.device = device
self.K = get_camera_matrix(image_size[1], image_size[0])
self.image_pairs = self._discover_image_pairs()
if max_samples is not None:
self.image_pairs = self.image_pairs[:max_samples]
def _discover_image_pairs(self) -> List[Dict[str, Any]]:
image_pairs = []
google_files = [f for f in os.listdir(self.root_dir) if f.endswith("_google.png")]
for google_file in sorted(google_files):
idx_str = google_file.split("_")[0]
try:
idx = int(idx_str)
except ValueError:
continue
yandex_file = f"{idx:04d}_yandex.png"
yandex_path = os.path.join(self.root_dir, yandex_file)
if os.path.exists(yandex_path):
image_pairs.append({
"idx": idx,
"google_path": os.path.join(self.root_dir, google_file),
"yandex_path": yandex_path,
})
return image_pairs
def _discover_image_pairs(self):
pairs = []
for f in os.listdir(self.root_dir):
if f.endswith("_google.png"):
idx = f.split("_")[0]
yandex_path = os.path.join(self.root_dir, f"{idx}_yandex.png")
if os.path.exists(yandex_path):
pairs.append({"idx": int(idx), "google": os.path.join(self.root_dir, f), "yandex": yandex_path})
return sorted(pairs, key=lambda x: x["idx"])
def __len__(self) -> int:
def __len__(self):
return len(self.image_pairs)
def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:
pair_info = self.image_pairs[idx]
google_path = pair_info["google_path"]
yandex_path = pair_info["yandex_path"]
same_domain = True
if np.random.rand() > 0.5:
random_idx = np.random.randint(0, len(self))
google_path = self.image_pairs[random_idx]["google_path"]
same_domain = random_idx == idx
def __getitem__(self, idx):
pair = self.image_pairs[idx]
google_img = Image.open(pair["google"]).convert("RGB").resize((self.image_size[1], self.image_size[0]), Image.BILINEAR)
yandex_img = Image.open(pair["yandex"]).convert("RGB").resize((self.image_size[1], self.image_size[0]), Image.BILINEAR)
yandex_img = Image.open(yandex_path).convert("RGB")
google_img = Image.open(google_path).convert("RGB")
google_img = google_img.resize((self.image_size[1], self.image_size[0]), Image.BILINEAR)
yandex_img = yandex_img.resize((self.image_size[1], self.image_size[0]), Image.BILINEAR)
matrices = self._get_homography_matrix(pair_info["idx"])
if self.augment:
google_img, yandex_img, homography_matrix = self._apply_augmentation(
google_img, yandex_img, matrices
)
homography_tensor = torch.from_numpy(homography_matrix).float()
params1 = generate_random_homography_params()
params2 = generate_random_homography_params()
H1 = homography_params_to_matrix(params1, self.K)
H2 = homography_params_to_matrix(params2, self.K)
H_combined = np.linalg.inv(H1) @ H2
yandex_img = Image.fromarray(cv2.warpPerspective(np.array(yandex_img), H1, self.image_size))
google_img = Image.fromarray(cv2.warpPerspective(np.array(google_img), H2, self.image_size))
target_params = matrix_to_homography_params(H_combined, self.K)
target_matrix = H_combined
else:
homography_tensor = torch.from_numpy(np.eye(3))
target_params = np.zeros(6, dtype=np.float32)
target_matrix = np.eye(3, dtype=np.float32)
if self.transform:
google_img = self.transform(google_img)
yandex_img = self.transform(yandex_img)
else:
google_img = torch.from_numpy(np.array(google_img)).float().permute(2, 0, 1) / 255.0
yandex_img = torch.from_numpy(np.array(yandex_img)).float().permute(2, 0, 1) / 255.0
return {
"google_img": google_img,
"yandex_img": yandex_img,
"homography": homography_tensor,
"same_domain": same_domain,
"idx": torch.tensor(pair_info["idx"], dtype=torch.long),
"homography_matrix": torch.from_numpy(target_matrix).float(),
"homography_params": torch.from_numpy(target_params).float(),
}
def _get_homography_matrix(self, idx: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
homography_matrix_1 = self.generate_random_homography()
homography_matrix_2 = self.generate_random_homography()
homography_matrix_r = np.linalg.inv(homography_matrix_1) @ homography_matrix_2
return (homography_matrix_1, homography_matrix_2, homography_matrix_r)
def generate_random_homography(self) -> np.ndarray:
scale = np.random.uniform(0.8, 1.2)
tx = np.random.uniform(-0.50, 0.50)
ty = np.random.uniform(-0.50, 0.50)
angle_x = np.random.uniform(np.radians(-10), np.radians(10))
angle_y = np.random.uniform(np.radians(-10), np.radians(10))
angle_z = np.random.uniform(np.radians(-10), np.radians(10))
cy, sy = np.cos(angle_z), np.sin(angle_z)
cp, sp = np.cos(angle_y), np.sin(angle_y)
cr, sr = np.cos(angle_x), np.sin(angle_x)
Rz = np.array([[cy, -sy, 0], [sy, cy, 0], [0, 0, 1]])
Ry = np.array([[cp, 0, sp], [0, 1, 0], [-sp, 0, cp]])
Rx = np.array([[1, 0, 0], [0, cr, -sr], [0, sr, cr]])
T = np.array([[1, 0, tx], [0, 1, ty], [0, 0, scale]])
K = self.get_camera_matrix()
return K @ Rx @ Ry @ Rz @ T @ np.linalg.inv(K)
def get_camera_matrix(self) -> np.ndarray:
w, h = config["image_size"]
return np.array([[w / 2, 0, w / 2], [0, h / 2, h / 2], [0, 0, 1]])
def _apply_augmentation(
self,
google_img: Image.Image,
yandex_img: Image.Image,
matrices: Tuple[np.ndarray, np.ndarray, np.ndarray],
) -> Tuple[Image.Image, Image.Image, np.ndarray]:
combined_homography = matrices[2]
yandex_aug = self._apply_homography_to_image(yandex_img, matrices[0])
google_aug = self._apply_homography_to_image(google_img, matrices[1])
print("F", combined_homography, np.linalg.inv(matrices[0]) @ matrices[1])
return google_aug, yandex_aug, combined_homography
def _apply_homography_to_image(
self, img: Image.Image, homography: np.ndarray
) -> Image.Image:
img_np = np.array(img)
h, w = img_np.shape[:2]
transformed = cv2.warpPerspective(
img_np, homography, (w, h), flags=cv2.INTER_LINEAR
)
return Image.fromarray(transformed)
def create_data_loaders(
root_dir: str,
batch_size: int = 32,
train_split: float = 0.8,
num_workers: int = 4,
image_size: Tuple[int, int] = (256, 256),
augment_train: bool = True,
augment_val: bool = False,
device=None,
) -> Tuple[DataLoader, DataLoader]:
transform = transforms.Compose([
transforms.ToTensor(),
# transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
def create_data_loaders(root_dir, batch_size=32, train_split=0.8, num_workers=0,
image_size=(256, 256), augment_train=True):
transform = transforms.Compose([transforms.ToTensor()])
full_dataset = YaGoDataset(
root_dir=root_dir,
transform=transform,
augment=False,
image_size=image_size,
cache_homographies=True,
device=device,
)
aug_dataset = YaGoDataset(
root_dir=root_dir,
transform=transform,
augment=True,
image_size=image_size,
cache_homographies=False,
device=device,
)
dataset_size = len(full_dataset)
train_size = int(train_split * dataset_size)
val_size = dataset_size - train_size
indices = list(range(dataset_size))
full_ds = YaGoDataset(root_dir, transform=transform, augment=False, image_size=image_size)
aug_ds = YaGoDataset(root_dir, transform=transform, augment=True, image_size=image_size)
indices = list(range(len(full_ds)))
random.shuffle(indices)
train_indices = indices[:train_size]
val_indices = indices[train_size:]
split = int(train_split * len(indices))
train_dataset = Subset(full_dataset, train_indices)
val_dataset = Subset(full_dataset, val_indices)
if augment_train:
train_dataset = Subset(aug_dataset, train_indices)
train_loader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True,
num_workers=num_workers,
pin_memory=True,
)
val_loader = DataLoader(
val_dataset,
batch_size=batch_size,
shuffle=False,
num_workers=num_workers,
pin_memory=True,
)
return train_loader, val_loader
train_ds = Subset(aug_ds if augment_train else full_ds, indices[:split])
val_ds = Subset(full_ds, indices[split:])
return (DataLoader(train_ds, batch_size=batch_size, shuffle=True, num_workers=num_workers, pin_memory=True),
DataLoader(val_ds, batch_size=batch_size, shuffle=False, num_workers=num_workers, pin_memory=True))
# Example usage
dataset = YaGoDataset(
root_dir=config["data_dir"],
augment=True,
image_size=(256, 256),
)
print(f"Dataset size: {len(dataset)}")
# Get a sample
sample = dataset[0]
print(f"Sample keys: {list(sample.keys())}")
print(f"Google image shape: {sample['google_img'].shape}")
print(f"Yandex image shape: {sample['yandex_img'].shape}")
print(f"Homography shape: {sample['homography'].shape}")
# Create data loaders
train_loader, val_loader = create_data_loaders(
root_dir=config["data_dir"],
batch_size=16,
train_split=0.8,
)
print(f"Train batches: {len(train_loader)}")
print(f"Val batches: {len(val_loader)}")
# Manual smoke test: build the augmented dataset from the configured directory,
# then print its size and the contents of the first sample.
if __name__ == "__main__":
ds = YaGoDataset(config["data_dir"], augment=True, image_size=config["image_size"])
print(f"Dataset size: {len(ds)}")
# Indexing runs __getitem__ for the first discovered pair.
s = ds[0]
print(f"Keys: {list(s.keys())}")
print(f"Params: {s['homography_params'].numpy()}")

View File

@@ -1,152 +1,45 @@
from typing import Tuple
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models
class HomographyCNN(nn.Module):
"""
Model for estimating homography matrix (3x3) between two images.
"""
def __init__(
self,
input_channels: int = 3,
backbone_name: str = "resnet18",
pretrained: bool = True,
dropout_rate: float = 0.3,
use_batch_norm: bool = True,
):
class HomographyCNN6(nn.Module):
def __init__(self, input_channels=3, backbone_name="resnet18", pretrained=True, dropout_rate=0.3):
super().__init__()
self.input_channels = input_channels
self.backbone_name = backbone_name
self.pretrained = pretrained
self.dropout_rate = dropout_rate
self.use_batch_norm = use_batch_norm
backbone = self._create_backbone(backbone_name, pretrained)
backbone = getattr(models, backbone_name)(weights=models.ResNet18_Weights.IMAGENET1K_V1 if pretrained else None)
self.feature_dim = backbone.fc.in_features
backbone.fc = nn.Identity()
self.backbone = backbone
compare_input_dim = self.feature_dim * 4
layers = [
nn.Linear(compare_input_dim, 512),
nn.BatchNorm1d(512) if use_batch_norm else nn.Identity(),
self.head = nn.Sequential(
nn.Linear(self.feature_dim * 4, 512),
nn.ReLU(inplace=True),
nn.Dropout(dropout_rate),
nn.Linear(512, 256),
nn.BatchNorm1d(256) if use_batch_norm else nn.Identity(),
nn.ReLU(inplace=True),
nn.Dropout(dropout_rate),
nn.Linear(256, 6),
)
nn.Linear(256, 9),
]
self.head = nn.Sequential(*layers)
def _create_backbone(self, name: str, pretrained: bool) -> nn.Module:
name = name.lower()
if name == "resnet18":
model = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1 if pretrained else None)
elif name == "resnet34":
model = models.resnet34(weights=models.ResNet34_Weights.IMAGENET1K_V1 if pretrained else None)
else:
raise ValueError(f"Unsupported backbone: {name}")
if self.input_channels != 3:
old_conv = model.conv1
model.conv1 = nn.Conv2d(
self.input_channels,
old_conv.out_channels,
kernel_size=old_conv.kernel_size,
stride=old_conv.stride,
padding=old_conv.padding,
bias=old_conv.bias is not None,
)
return model
def _extract_features(self, x: torch.Tensor) -> torch.Tensor:
return self.backbone(x)
def forward(self, img1: torch.Tensor, img2: torch.Tensor) -> torch.Tensor:
f1 = self._extract_features(img1)
f2 = self._extract_features(img2)
diff = torch.abs(f1 - f2)
prod = f1 * f2
combined = torch.cat([f1, f2, diff, prod], dim=1)
h = self.head(combined)
h = h.view(-1, 3, 3)
return h
def predict_homography(self, img1: torch.Tensor, img2: torch.Tensor) -> torch.Tensor:
was_training = self.training
self.eval()
with torch.no_grad():
h = self.forward(img1, img2)
if was_training:
self.train()
return h
def forward(self, img1, img2):
    """Predict homography parameters for an image pair.

    Both images go through the same backbone; the two feature vectors, their
    absolute difference and their element-wise product are concatenated along
    dim 1 and fed to the regression head.
    """
    feats_a = self.backbone(img1)
    feats_b = self.backbone(img2)
    abs_diff = torch.abs(feats_a - feats_b)
    product = feats_a * feats_b
    fused = torch.cat((feats_a, feats_b, abs_diff, product), dim=1)
    return self.head(fused)
class HomographyLoss(nn.Module):
class HomographyLoss6(nn.Module):
def __init__(self):
super().__init__()
self.criterion = nn.MSELoss()
def forward(self, pred_homography: torch.Tensor, target_homography: torch.Tensor) -> torch.Tensor:
return self.criterion(pred_homography, target_homography)
def create_homography_model(
model_type: str = "backbone",
input_size: Tuple[int, int] = (256, 256),
**kwargs,
) -> nn.Module:
if model_type == "backbone":
return HomographyCNN(**kwargs)
else:
raise ValueError(f"Unknown model type: {model_type}")
def forward(self, pred, target):
return self.criterion(pred, target)
if __name__ == "__main__":
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
model = HomographyCNN(
input_channels=3,
backbone_name="resnet18",
pretrained=True,
dropout_rate=0.3,
use_batch_norm=True,
).to(device)
print(f"Model created with {sum(p.numel() for p in model.parameters()):,} parameters")
batch_size = 4
height, width = 256, 256
img1 = torch.randn(batch_size, 3, height, width).to(device)
img2 = torch.randn(batch_size, 3, height, width).to(device)
print("\nTesting forward pass...")
output = model(img1, img2)
print(f"Output shape: {output.shape}")
print("\nTesting prediction...")
pred = model.predict_homography(img1, img2)
print(f"Prediction shape: {pred.shape}")
print("\nTesting loss function...")
target = torch.eye(3).unsqueeze(0).expand(batch_size, -1, -1).to(device)
loss_fn = HomographyLoss().to(device)
loss = loss_fn(output, target)
print(f"Loss value: {loss.item():.6f}")
print("\nAll tests completed successfully!")
model = HomographyCNN6()
img1 = torch.randn(2, 3, 256, 256)
img2 = torch.randn(2, 3, 256, 256)
out = model(img1, img2)
print(f"Output shape: {out.shape}, mean: {out.mean():.3f}")

File diff suppressed because one or more lines are too long

View File

@@ -1,212 +1,105 @@
import os
import time
from datetime import datetime
import torch
import torch.nn as nn
import torch.optim as optim
from dataloader import config, create_data_loaders
from model import HomographyCNN, HomographyLoss, create_homography_model
from torch.utils.data import DataLoader
from dataloader import create_data_loaders
from model import HomographyCNN6, HomographyLoss6
from utils import config
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm
class HomographyTrainer:
def __init__(
self,
model: nn.Module,
train_loader: DataLoader,
val_loader: DataLoader,
device: torch.device,
config: dict,
):
def __init__(self, model, train_loader, val_loader, device):
self.model = model.to(device)
self.train_loader = train_loader
self.val_loader = val_loader
self.device = device
self.config = config
self.criterion = HomographyLoss()
self.optimizer = optim.Adam(
model.parameters(),
lr=config.get("learning_rate", 2e-4),
betas=(config.get("beta1", 0.5), config.get("beta2", 0.999)),
)
self.criterion = HomographyLoss6()
self.optimizer = optim.Adam(model.parameters(), lr=config["learning_rate"])
self.writer = None
self.best_val_loss = float("inf")
self.epochs_without_improvement = 0
def train_epoch(self, epoch: int) -> dict:
def train_epoch(self, epoch):
self.model.train()
total_loss = 0
total_samples = 0
total_loss, total_samples = 0, 0
pbar = tqdm(self.train_loader, desc=f"Epoch {epoch}")
for batch_idx, batch in enumerate(pbar):
google_img = batch["google_img"].to(self.device)
yandex_img = batch["yandex_img"].to(self.device)
target = batch["homography"].to(self.device)
target = batch["homography_params"].to(self.device)
self.optimizer.zero_grad()
output = self.model(google_img, yandex_img)
loss = self.criterion(output, target)
loss.backward()
self.optimizer.step()
total_loss += loss.item() * google_img.size(0)
total_samples += google_img.size(0)
pbar.set_postfix({"loss": loss.item()})
if batch_idx % self.config.get("log_interval", 10) == 0:
pbar.set_postfix({"loss": loss.item()})
return {"loss": total_loss / total_samples}
if self.writer:
self.writer.add_scalar(
"train/loss",
loss.item(),
epoch * len(self.train_loader) + batch_idx,
)
avg_loss = total_loss / total_samples
return {"loss": avg_loss}
def validate(self) -> dict:
def validate(self):
self.model.eval()
total_loss = 0
total_samples = 0
total_loss, total_samples = 0, 0
with torch.no_grad():
for batch in tqdm(self.val_loader, desc="Validation"):
google_img = batch["google_img"].to(self.device)
yandex_img = batch["yandex_img"].to(self.device)
target = batch["homography"].to(self.device)
target = batch["homography_params"].to(self.device)
output = self.model(google_img, yandex_img)
loss = self.criterion(output, target)
total_loss += loss.item() * google_img.size(0)
total_samples += google_img.size(0)
return {"loss": total_loss / total_samples}
avg_loss = total_loss / total_samples
return {"loss": avg_loss}
def train(self, num_epochs: int):
log_dir = self.config.get("output_dir", "runs/homography")
def train(self, num_epochs):
log_dir = config["output_dir"]
os.makedirs(log_dir, exist_ok=True)
self.writer = SummaryWriter(log_dir)
print(f"Starting training for {num_epochs} epochs")
print(f"Logging to: {log_dir}")
for epoch in range(1, num_epochs + 1):
print(f"\nEpoch {epoch}/{num_epochs}")
train_metrics = self.train_epoch(epoch)
val_metrics = self.validate()
print(f"Train Loss: {train_metrics['loss']:.4f}")
print(f"Val Loss: {val_metrics['loss']:.4f}")
if self.writer:
self.writer.add_scalar("epoch/train_loss", train_metrics["loss"], epoch)
self.writer.add_scalar("epoch/val_loss", val_metrics["loss"], epoch)
print(f"Train Loss: {train_metrics['loss']:.4f}, Val Loss: {val_metrics['loss']:.4f}")
if val_metrics["loss"] < self.best_val_loss:
self.best_val_loss = val_metrics["loss"]
self.epochs_without_improvement = 0
self.save_checkpoint(epoch, val_metrics["loss"], is_best=True)
print(f"New best model saved with val loss: {val_metrics['loss']:.4f}")
else:
self.epochs_without_improvement += 1
self.save_checkpoint(epoch, val_metrics["loss"], is_best=False)
patience = self.config.get("early_stopping_patience", 20)
if self.epochs_without_improvement >= patience:
print(f"Early stopping triggered after {patience} epochs without improvement")
break
self.save_checkpoint(epoch, is_best=True)
print(f"Best model saved (val loss: {val_metrics['loss']:.4f})")
self.writer.close()
def save_checkpoint(self, epoch: int, val_loss: float, is_best: bool = False):
checkpoint_dir = os.path.join(
self.config.get("output_dir", "runs/homography"), "checkpoints"
)
os.makedirs(checkpoint_dir, exist_ok=True)
checkpoint = {
"epoch": epoch,
"model_state_dict": self.model.state_dict(),
"optimizer_state_dict": self.optimizer.state_dict(),
"val_loss": val_loss,
"config": self.config,
}
checkpoint_path = os.path.join(checkpoint_dir, f"checkpoint_epoch_{epoch}.pt")
torch.save(checkpoint, checkpoint_path)
def save_checkpoint(self, epoch, is_best=False):
ckpt_dir = os.path.join(config["output_dir"], "checkpoints")
os.makedirs(ckpt_dir, exist_ok=True)
ckpt = {"epoch": epoch, "model_state_dict": self.model.state_dict(), "val_loss": self.best_val_loss}
torch.save(ckpt, os.path.join(ckpt_dir, f"checkpoint_epoch_{epoch}.pt"))
if is_best:
best_path = os.path.join(checkpoint_dir, "best_model.pt")
torch.save(checkpoint, best_path)
def load_checkpoint(self, checkpoint_path: str):
checkpoint = torch.load(checkpoint_path, map_location=self.device)
self.model.load_state_dict(checkpoint["model_state_dict"])
self.optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
return checkpoint["epoch"], checkpoint["val_loss"]
torch.save(ckpt, os.path.join(ckpt_dir, "best_model.pt"))
def main():
config_dict = config.copy()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
if isinstance(config_dict.get("image_size"), list):
config_dict["image_size"] = tuple(config_dict["image_size"])
train_loader, val_loader = create_data_loaders(
root_dir=config["data_dir"],
batch_size=config["batch_size"],
train_split=config["train_split"],
num_workers=config["num_workers"],
image_size=config["image_size"],
)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
model = HomographyCNN6(
input_channels=3,
backbone_name=config["backbone"],
pretrained=True,
dropout_rate=config["dropout_rate"]
)
print(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}")
print("Creating data loaders...")
train_loader, val_loader = create_data_loaders(
root_dir=config_dict["data_dir"],
batch_size=config_dict["batch_size"],
train_split=config_dict["train_split"],
num_workers=config_dict["num_workers"],
image_size=config_dict["image_size"],
augment_train=True,
augment_val=False,
device=device,
)
print(f"Train batches: {len(train_loader)}")
print(f"Val batches: {len(val_loader)}")
print("Creating model...")
model = create_homography_model(
model_type="backbone",
input_channels=3,
backbone_name="resnet18",
pretrained=True,
dropout_rate=0.3,
use_batch_norm=True,
)
print(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}")
trainer = HomographyTrainer(
model=model,
train_loader=train_loader,
val_loader=val_loader,
device=device,
config=config_dict,
)
print("Starting training...")
trainer.train(config_dict["epochs"])
print("Training completed!")
if __name__ == "__main__":
main()
trainer = HomographyTrainer(model, train_loader, val_loader, device)
trainer.train(config["epochs"])

54
models/SiaN/utils.py Normal file
View File

@@ -0,0 +1,54 @@
import numpy as np
# Shared settings dictionary; the dataloader and train modules import it from here.
config = {
# NOTE(review): absolute, machine-specific path — adjust before running elsewhere.
"data_dir": r"C:\Users\admin\Projects\autopilot\datasets\ya_go_maps\images",
# Indexed as (height, width) by YaGoDataset (image_size[1] is passed as the width).
"image_size": (256, 256),
"batch_size": 32,
# Fraction of the shuffled dataset indices assigned to the training subset.
"train_split": 0.8,
"num_workers": 0,
"epochs": 100,
# Passed to optim.Adam as `lr` by the trainer.
"learning_rate": 2e-4,
"dropout_rate": 0.3,
# Attribute name looked up on torchvision.models when the network is built.
"backbone": "resnet18",
# TensorBoard logs and the checkpoints/ folder are written under this directory.
# NOTE(review): also an absolute, machine-specific path.
"output_dir": r"C:\Users\admin\Projects\autopilot\models\SiaN\runs",
}
def get_camera_matrix(w, h):
    """Return the 3x3 float32 camera matrix used for a ``w`` x ``h`` image.

    The diagonal scale terms and the offset column are both half the image
    size along the respective axis.
    """
    half_w, half_h = w / 2, h / 2
    rows = (
        (half_w, 0, half_w),
        (0, half_h, half_h),
        (0, 0, 1),
    )
    return np.array(rows, dtype=np.float32)
def generate_random_homography_params(angle_range=10, translation_range=0.1, scale_range=(0.9, 1.1)):
    """Draw a random ``[rx, ry, rz, tx, ty, scale]`` parameter vector.

    Args:
        angle_range: Half-width, in degrees, of the uniform range for each
            rotation angle; the returned angles are in radians.
        translation_range: Half-width of the uniform range for ``tx``/``ty``.
        scale_range: ``(low, high)`` bounds of the uniform range for ``scale``.

    Returns:
        numpy.ndarray: The six parameters in the order listed above.
    """
    def _symmetric(limit):
        # One uniform sample from [-limit, limit).
        return np.random.uniform(-limit, limit)

    # Draw order (scale, tx, ty, rx, ry, rz) fixes the result for a given seed.
    scale = np.random.uniform(*scale_range)
    tx = _symmetric(translation_range)
    ty = _symmetric(translation_range)
    rx, ry, rz = (np.radians(_symmetric(angle_range)) for _ in range(3))
    return np.array([rx, ry, rz, tx, ty, scale])
def homography_params_to_matrix(params, K):
    """Build the homography ``K @ Rx @ Ry @ Rz @ T @ inv(K)`` from six parameters.

    Args:
        params: Sequence ``(rx, ry, rz, tx, ty, scale)`` with angles in radians.
        K: 3x3 camera matrix.

    Returns:
        numpy.ndarray: The 3x3 homography matrix.
    """
    rx, ry, rz, tx, ty, scale = params
    cos_x, sin_x = np.cos(rx), np.sin(rx)
    cos_y, sin_y = np.cos(ry), np.sin(ry)
    cos_z, sin_z = np.cos(rz), np.sin(rz)

    rot_x = np.array([[1, 0, 0], [0, cos_x, -sin_x], [0, sin_x, cos_x]], dtype=np.float32)
    rot_y = np.array([[cos_y, 0, sin_y], [0, 1, 0], [-sin_y, 0, cos_y]], dtype=np.float32)
    rot_z = np.array([[cos_z, -sin_z, 0], [sin_z, cos_z, 0], [0, 0, 1]], dtype=np.float32)
    # Translation in the last column; scale sits in the bottom-right entry.
    shift = np.array([[1, 0, tx], [0, 1, ty], [0, 0, scale]], dtype=np.float32)

    # Multiply strictly left to right, starting from K.
    H = K
    for factor in (rot_x, rot_y, rot_z, shift, np.linalg.inv(K)):
        H = H @ factor
    return H
def matrix_to_homography_params(H, K):
    """Recover ``[rx, ry, rz, tx, ty, scale]`` from a homography matrix.

    This inverts ``homography_params_to_matrix``, which builds
    ``H = K @ Rx @ Ry @ Rz @ T @ inv(K)`` with
    ``T = [[1, 0, tx], [0, 1, ty], [0, 0, scale]]``. Writing
    ``M = inv(K) @ H @ K = R @ T``, the first two columns of ``M`` are the
    first two columns of the rotation ``R`` and the third column is
    ``R @ [tx, ty, scale]``.

    For a matrix that is not exactly of that form (for example a product of
    several such homographies) the first two columns are replaced by the
    nearest orthonormal pair, so the result is a best-fit parameter vector.
    The overall magnitude of ``H`` is factored out; its overall sign is not
    corrected.

    Args:
        H: 3x3 homography matrix.
        K: 3x3 camera matrix used when ``H`` was built.

    Returns:
        numpy.ndarray: float32 array ``[rx, ry, rz, tx, ty, scale]`` with
        angles in radians.

    Raises:
        ValueError: If the first two columns of ``inv(K) @ H @ K`` are
            (numerically) zero, so no rotation can be recovered.
    """
    K = np.asarray(K, dtype=np.float64)
    M = np.linalg.inv(K) @ np.asarray(H, dtype=np.float64) @ K

    # Nearest orthonormal 3x2 frame to the first two columns (polar factor).
    U, singular_values, Vt = np.linalg.svd(M[:, :2], full_matrices=False)
    gain = singular_values.mean()
    if not np.isfinite(gain) or gain < 1e-12:
        raise ValueError("degenerate homography: cannot recover rotation")
    r1, r2 = (U @ Vt).T
    R = np.column_stack([r1, r2, np.cross(r1, r2)])

    # Undo the rotation on the third column; dividing by `gain` removes the
    # arbitrary overall scale of the homography.
    tx, ty, scale = R.T @ M[:, 2] / gain

    # Euler angles for R = Rx @ Ry @ Rz:
    #   R[0, 2] = sin(ry)
    #   R[1, 2] = -sin(rx) cos(ry),  R[2, 2] = cos(rx) cos(ry)
    #   R[0, 1] = -cos(ry) sin(rz),  R[0, 0] = cos(ry) cos(rz)
    rx = np.arctan2(-R[1, 2], R[2, 2])
    ry = np.arcsin(np.clip(R[0, 2], -1.0, 1.0))
    rz = np.arctan2(-R[0, 1], R[0, 0])
    return np.array([rx, ry, rz, tx, ty, scale], dtype=np.float32)