Files
autopilot/models/SiaN/notebook.gen.ipynb
2026-04-04 22:57:41 +03:00

482 lines
20 KiB
Plaintext

{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import random\n",
"import logging\n",
"from typing import Tuple\n",
"import cv2\n",
"import numpy as np\n",
"import torch\n",
"import torch.nn as nn\n",
"import torch.optim as optim\n",
"import matplotlib.pyplot as plt\n",
"from PIL import Image\n",
"from torch.utils.data import DataLoader, Dataset, Subset\n",
"from torch.utils.tensorboard import SummaryWriter\n",
"from torchvision import transforms, models\n",
"from tqdm import tqdm\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"\n",
"\n",
"# Central experiment settings: the dataset, model and trainer cells read their options from here.\n",
"config = dict(\n",
"    # Folder scanned for '<idx>_google.png' / '<idx>_yandex.png' image pairs.\n",
"    data_dir=r\"C:\\Users\\admin\\Projects\\autopilot\\datasets\\ya_go_maps\\images\",\n",
"    # Passed to YaGoDataset as image_size, which indexes it as (height, width).\n",
"    image_size=(256, 256),\n",
"    batch_size=32,\n",
"    # Fraction of the shuffled pairs that goes to the training subset.\n",
"    train_split=0.8,\n",
"    num_workers=0,\n",
"    epochs=100,\n",
"    learning_rate=2e-4,\n",
"    dropout_rate=0.3,\n",
"    # Name of the torchvision constructor used for the shared backbone.\n",
"    backbone=\"resnet18\",\n",
"    # TensorBoard log directory; checkpoints are written to its 'checkpoints' subfolder.\n",
"    output_dir=r\"C:\\Users\\admin\\Projects\\autopilot\\models\\SiaN\\runs\",\n",
"    # Period, in epochs, of the periodic checkpoints.\n",
"    save_every_n_epochs=15,\n",
")\n",
"\n",
"\n",
"def get_camera_matrix(w, h):\n",
"    \"\"\"Return the 3x3 float32 matrix K used to normalise homographies.\n",
"\n",
"    The diagonal terms are `w / 2` and `h / 2`, and the last column holds the\n",
"    image centre `(w / 2, h / 2)`.\n",
"    \"\"\"\n",
"    half_w, half_h = w / 2, h / 2\n",
"    K = np.eye(3, dtype=np.float32)\n",
"    K[0, 0] = K[0, 2] = half_w\n",
"    K[1, 1] = K[1, 2] = half_h\n",
"    return K\n",
"\n",
"\n",
"def generate_random_homography_params(angle_range=10, translation_range=0.1, scale_range=(0.9, 1.1)):\n",
"    \"\"\"Sample one random parameter vector `[rx, ry, rz, tx, ty, scale]`.\n",
"\n",
"    Angles are drawn uniformly from `[-angle_range, angle_range]` degrees and\n",
"    returned in radians. Translations are uniform in\n",
"    `[-translation_range, translation_range]` and the scale is uniform in\n",
"    `scale_range`. All draws come from NumPy's global RNG.\n",
"    \"\"\"\n",
"    # Draw order is scale, tx, ty, rx, ry, rz; it determines which values a\n",
"    # seeded global RNG hands to which parameter.\n",
"    scale = np.random.uniform(*scale_range)\n",
"    tx, ty = (np.random.uniform(-translation_range, translation_range) for _ in range(2))\n",
"    rx, ry, rz = (np.radians(np.random.uniform(-angle_range, angle_range)) for _ in range(3))\n",
"    return np.array([rx, ry, rz, tx, ty, scale])\n",
"\n",
"\n",
"def homography_params_to_matrix(params, K):\n",
"    \"\"\"Build the 3x3 homography `K @ Rx @ Ry @ Rz @ T @ inv(K)` from six parameters.\n",
"\n",
"    `params` is `[rx, ry, rz, tx, ty, scale]` with angles in radians. `T` is the\n",
"    identity with `tx`, `ty` in its last column and `scale` in its bottom-right\n",
"    entry. Every factor is created as a float32 array.\n",
"    \"\"\"\n",
"    rx, ry, rz, tx, ty, scale = params\n",
"    cos_x, sin_x = np.cos(rx), np.sin(rx)\n",
"    cos_y, sin_y = np.cos(ry), np.sin(ry)\n",
"    cos_z, sin_z = np.cos(rz), np.sin(rz)\n",
"\n",
"    factors = (\n",
"        np.array([[1, 0, 0], [0, cos_x, -sin_x], [0, sin_x, cos_x]], dtype=np.float32),  # Rx\n",
"        np.array([[cos_y, 0, sin_y], [0, 1, 0], [-sin_y, 0, cos_y]], dtype=np.float32),  # Ry\n",
"        np.array([[cos_z, -sin_z, 0], [sin_z, cos_z, 0], [0, 0, 1]], dtype=np.float32),  # Rz\n",
"        np.array([[1, 0, tx], [0, 1, ty], [0, 0, scale]], dtype=np.float32),  # T\n",
"        np.linalg.inv(K),\n",
"    )\n",
"    # Multiply strictly left to right, starting from K.\n",
"    H = K\n",
"    for factor in factors:\n",
"        H = H @ factor\n",
"    return H\n",
"\n",
"\n",
"def matrix_to_homography_params(H, K):\n",
"    \"\"\"Recover `[rx, ry, rz, tx, ty, scale]` from a homography matrix.\n",
"\n",
"    Inverts the model `H = K @ Rx @ Ry @ Rz @ T @ inv(K)` with\n",
"    `T = [[1, 0, tx], [0, 1, ty], [0, 0, scale]]`. Writing\n",
"    `E = inv(K) @ H @ K = R @ T`, the first two columns of `E` are the first two\n",
"    columns `r1`, `r2` of the rotation `R`, and the third column equals\n",
"    `tx * r1 + ty * r2 + scale * r3` with `r3 = r1 x r2`.\n",
"\n",
"    A matrix that is not exactly of that form (for example a product of two such\n",
"    homographies) has its first two columns orthonormalised first, so the result\n",
"    is a best-effort fit rather than an exact factorisation. The overall scale of\n",
"    `H` is removed by normalising the first column of `E`.\n",
"\n",
"    Args:\n",
"        H: 3x3 homography.\n",
"        K: 3x3 matrix that conjugates the rotation/translation model.\n",
"\n",
"    Returns:\n",
"        float32 array `[rx, ry, rz, tx, ty, scale]`, angles in radians.\n",
"\n",
"    Raises:\n",
"        ValueError: if the first two columns of `E` are zero, non-finite or parallel.\n",
"    \"\"\"\n",
"    # Work in float64 so the orthonormalisation does not lose precision.\n",
"    K64 = np.asarray(K, dtype=np.float64)\n",
"    E = np.linalg.inv(K64) @ np.asarray(H, dtype=np.float64) @ K64\n",
"\n",
"    first_norm = np.linalg.norm(E[:, 0])\n",
"    if not np.isfinite(first_norm) or first_norm < 1e-12:\n",
"        raise ValueError(\"degenerate homography: first column of inv(K) @ H @ K is zero or non-finite\")\n",
"    r1 = E[:, 0] / first_norm\n",
"\n",
"    # Gram-Schmidt step: remove the r1 component from the second column.\n",
"    second = E[:, 1] - (r1 @ E[:, 1]) * r1\n",
"    second_norm = np.linalg.norm(second)\n",
"    if not np.isfinite(second_norm) or second_norm < 1e-12:\n",
"        raise ValueError(\"degenerate homography: first two columns of inv(K) @ H @ K are parallel\")\n",
"    r2 = second / second_norm\n",
"    r3 = np.cross(r1, r2)\n",
"\n",
"    # Third column is tx * r1 + ty * r2 + scale * r3; project onto the orthonormal basis.\n",
"    t = E[:, 2] / first_norm\n",
"    tx, ty, scale = r1 @ t, r2 @ t, r3 @ t\n",
"\n",
"    # For R = Rx @ Ry @ Rz: R[0, 2] = sin(ry), R[1, 2] = -sin(rx) cos(ry),\n",
"    # R[2, 2] = cos(rx) cos(ry), R[0, 0] = cos(ry) cos(rz), R[0, 1] = -cos(ry) sin(rz).\n",
"    # The arctan2 forms assume cos(ry) > 0, i.e. |ry| < 90 degrees.\n",
"    ry = np.arcsin(np.clip(r3[0], -1.0, 1.0))\n",
"    rx = np.arctan2(-r3[1], r3[2])\n",
"    rz = np.arctan2(-r2[0], r1[0])\n",
"    return np.array([rx, ry, rz, tx, ty, scale], dtype=np.float32)\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": "# Configuration\n\nGlobal settings for:\n- Data paths and image parameters\n- Training hyperparameters\n- Model architecture options\n"
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"\n",
"\n",
"\n",
"\n",
"class YaGoDataset(Dataset):\n",
"    \"\"\"Paired Google/Yandex map images with optional random-homography augmentation.\n",
"\n",
"    Pairs are files named `<idx>_google.png` and `<idx>_yandex.png` in `root_dir`.\n",
"    `image_size` is `(height, width)`.\n",
"\n",
"    Each item is a dict with keys `google_img`, `yandex_img`, `homography_matrix`\n",
"    (3x3 float tensor) and `homography_params` (6-element float tensor). Without\n",
"    augmentation the targets are the identity matrix and an all-zero vector.\n",
"    \"\"\"\n",
"\n",
"    def __init__(self, root_dir: str, transform=None, augment: bool = True,\n",
"                 image_size: Tuple[int, int] = (256, 256)):\n",
"        self.root_dir = root_dir\n",
"        self.transform = transform\n",
"        self.augment = augment\n",
"        self.image_size = image_size\n",
"        # get_camera_matrix takes (w, h) while image_size is stored as (h, w).\n",
"        self.K = get_camera_matrix(image_size[1], image_size[0])\n",
"        self.image_pairs = self._discover_image_pairs()\n",
"\n",
"    def _discover_image_pairs(self):\n",
"        \"\"\"Collect `{idx, google, yandex}` records for complete pairs, sorted by `idx`.\"\"\"\n",
"        pairs = []\n",
"        for f in os.listdir(self.root_dir):\n",
"            if not f.endswith(\"_google.png\"):\n",
"                continue\n",
"            prefix = f.split(\"_\")[0]\n",
"            try:\n",
"                pair_idx = int(prefix)\n",
"            except ValueError:\n",
"                # Stray file whose prefix is not an integer index: skip it instead of\n",
"                # aborting the whole directory scan.\n",
"                continue\n",
"            yandex_path = os.path.join(self.root_dir, f\"{prefix}_yandex.png\")\n",
"            if os.path.exists(yandex_path):\n",
"                pairs.append({\"idx\": pair_idx, \"google\": os.path.join(self.root_dir, f), \"yandex\": yandex_path})\n",
"        return sorted(pairs, key=lambda x: x[\"idx\"])\n",
"\n",
"    def __len__(self):\n",
"        return len(self.image_pairs)\n",
"\n",
"    def _load_image(self, path):\n",
"        \"\"\"Read `path` as RGB and resize it to `image_size` with bilinear filtering.\"\"\"\n",
"        height, width = self.image_size\n",
"        # The context manager closes the file once the pixels have been decoded.\n",
"        with Image.open(path) as img:\n",
"            # PIL's resize takes (width, height).\n",
"            return img.convert(\"RGB\").resize((width, height), Image.BILINEAR)\n",
"\n",
"    def __getitem__(self, idx):\n",
"        pair = self.image_pairs[idx]\n",
"        google_img = self._load_image(pair[\"google\"])\n",
"        yandex_img = self._load_image(pair[\"yandex\"])\n",
"\n",
"        if self.augment:\n",
"            params1 = generate_random_homography_params()\n",
"            params2 = generate_random_homography_params()\n",
"            H1 = homography_params_to_matrix(params1, self.K)\n",
"            H2 = homography_params_to_matrix(params2, self.K)\n",
"            H_combined = np.linalg.inv(H1) @ H2\n",
"            # OpenCV's dsize is (width, height); passing image_size (h, w) directly\n",
"            # would transpose the output of non-square images.\n",
"            dsize = (self.image_size[1], self.image_size[0])\n",
"            yandex_img = Image.fromarray(cv2.warpPerspective(np.array(yandex_img), H1, dsize))\n",
"            google_img = Image.fromarray(cv2.warpPerspective(np.array(google_img), H2, dsize))\n",
"            target_params = matrix_to_homography_params(H_combined, self.K)\n",
"            target_matrix = H_combined\n",
"        else:\n",
"            target_params = np.zeros(6, dtype=np.float32)\n",
"            target_matrix = np.eye(3, dtype=np.float32)\n",
"\n",
"        if self.transform:\n",
"            google_img = self.transform(google_img)\n",
"            yandex_img = self.transform(yandex_img)\n",
"\n",
"        return {\n",
"            \"google_img\": google_img,\n",
"            \"yandex_img\": yandex_img,\n",
"            \"homography_matrix\": torch.from_numpy(target_matrix).float(),\n",
"            \"homography_params\": torch.from_numpy(target_params).float(),\n",
"        }\n",
"\n",
"\n",
"def create_data_loaders(root_dir, batch_size=32, train_split=0.8, num_workers=0,\n",
"                        image_size=(256, 256), augment_train=True, seed=None):\n",
"    \"\"\"Build the training and validation DataLoaders over one shuffled split.\n",
"\n",
"    Args:\n",
"        root_dir: folder passed to `YaGoDataset`.\n",
"        batch_size: batch size of both loaders.\n",
"        train_split: fraction of the pairs assigned to training.\n",
"        num_workers: DataLoader worker count.\n",
"        image_size: forwarded to `YaGoDataset`.\n",
"        augment_train: use the augmenting dataset for the training subset.\n",
"        seed: optional seed for the split. `None` keeps using the global\n",
"            `random` state, so a prior `random.seed(...)` still applies.\n",
"\n",
"    Returns:\n",
"        `(train_loader, val_loader)`; validation never uses augmentation.\n",
"\n",
"    Raises:\n",
"        ValueError: if no image pairs are found in `root_dir`.\n",
"    \"\"\"\n",
"    transform = transforms.Compose([transforms.ToTensor()])\n",
"\n",
"    full_ds = YaGoDataset(root_dir, transform=transform, augment=False, image_size=image_size)\n",
"    aug_ds = YaGoDataset(root_dir, transform=transform, augment=True, image_size=image_size)\n",
"\n",
"    indices = list(range(len(full_ds)))\n",
"    if not indices:\n",
"        # Fail here with a readable message instead of inside the DataLoader sampler.\n",
"        raise ValueError(f\"No image pairs found in {root_dir!r}\")\n",
"\n",
"    if seed is None:\n",
"        random.shuffle(indices)\n",
"    else:\n",
"        # Private generator: reproducible split without touching the global RNG.\n",
"        random.Random(seed).shuffle(indices)\n",
"    split = int(train_split * len(indices))\n",
"\n",
"    train_ds = Subset(aug_ds if augment_train else full_ds, indices[:split])\n",
"    val_ds = Subset(full_ds, indices[split:])\n",
"\n",
"    # Only request pinned memory when a CUDA device is present.\n",
"    pin_memory = torch.cuda.is_available()\n",
"    return (DataLoader(train_ds, batch_size=batch_size, shuffle=True, num_workers=num_workers, pin_memory=pin_memory),\n",
"            DataLoader(val_ds, batch_size=batch_size, shuffle=False, num_workers=num_workers, pin_memory=pin_memory))\n",
"\n",
"\n",
"def get_dataset_info():\n",
"    \"\"\"Summarise the dataset configured in `config`.\n",
"\n",
"    Returns:\n",
"        dict with `size` (number of pairs), `sample_keys` (keys of the first item)\n",
"        and `sample_params` (its `homography_params` as a NumPy array). For an\n",
"        empty dataset `sample_keys` is `[]` and `sample_params` is `None`.\n",
"    \"\"\"\n",
"    ds = YaGoDataset(config[\"data_dir\"], augment=True, image_size=config[\"image_size\"])\n",
"    if len(ds) == 0:\n",
"        return {\"size\": 0, \"sample_keys\": [], \"sample_params\": None}\n",
"\n",
"    # Fetch the item once: each access reloads both images and draws new random\n",
"    # parameters, so two lookups would describe two different samples.\n",
"    sample = ds[0]\n",
"    return {\n",
"        \"size\": len(ds),\n",
"        \"sample_keys\": list(sample.keys()),\n",
"        \"sample_params\": sample[\"homography_params\"].numpy()\n",
"    }\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
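"source": "## Dataset\n\nGoogle/Yandex image pair loader with homography augmentation.\n\n**Features:**\n- Loads paired `<idx>_google.png` / `<idx>_yandex.png` images of the same location from the two map sources\n- Applies random homography transformations\n- Supports configurable train/val split\n\n**Returns:** each sample is a dict with\n- `google_img`, `yandex_img` — the (optionally warped) image pair\n- `homography_matrix` — 3×3 target homography (identity when augmentation is off)\n- `homography_params` — 6-vector `[rx, ry, rz, tx, ty, scale]` (all zeros when augmentation is off)\n"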
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"\n",
"\n",
"class HomographyCNN6(nn.Module):\n",
"    \"\"\"Siamese regressor that predicts six homography parameters from an image pair.\n",
"\n",
"    Both images go through the same backbone. Their feature vectors `f1`, `f2` are\n",
"    fused as `[f1, f2, |f1 - f2|, f1 * f2]` and mapped to six outputs by an MLP head.\n",
"\n",
"    Args:\n",
"        input_channels: channels of the input images. For values other than 3 the\n",
"            backbone's first convolution is replaced by a freshly initialised one.\n",
"        backbone_name: name of a torchvision constructor whose model exposes an\n",
"            `fc` layer (the ResNet family).\n",
"        pretrained: load the `IMAGENET1K_V1` weights of the chosen backbone.\n",
"        dropout_rate: dropout probability used in the head.\n",
"    \"\"\"\n",
"\n",
"    def __init__(self, input_channels=3, backbone_name=\"resnet18\", pretrained=True, dropout_rate=0.3):\n",
"        super().__init__()\n",
"        # Select the weights by name so they always belong to the requested backbone;\n",
"        # a hard-coded ResNet18 weights enum is rejected by every other constructor.\n",
"        weights = \"IMAGENET1K_V1\" if pretrained else None\n",
"        backbone = getattr(models, backbone_name)(weights=weights)\n",
"\n",
"        if input_channels != 3:\n",
"            # Same geometry as the original stem, different input depth. The new layer\n",
"            # starts from random weights even when `pretrained` is set.\n",
"            stem = backbone.conv1\n",
"            backbone.conv1 = nn.Conv2d(\n",
"                input_channels,\n",
"                stem.out_channels,\n",
"                kernel_size=stem.kernel_size,\n",
"                stride=stem.stride,\n",
"                padding=stem.padding,\n",
"                bias=stem.bias is not None,\n",
"            )\n",
"\n",
"        self.feature_dim = backbone.fc.in_features\n",
"        # Drop the classifier so the backbone returns the pooled feature vector.\n",
"        backbone.fc = nn.Identity()\n",
"        self.backbone = backbone\n",
"\n",
"        self.head = nn.Sequential(\n",
"            nn.Linear(self.feature_dim * 4, 512),\n",
"            nn.ReLU(inplace=True),\n",
"            nn.Dropout(dropout_rate),\n",
"            nn.Linear(512, 256),\n",
"            nn.ReLU(inplace=True),\n",
"            nn.Dropout(dropout_rate),\n",
"            nn.Linear(256, 6),\n",
"        )\n",
"\n",
"    def forward(self, img1, img2):\n",
"        \"\"\"Return a `(batch, 6)` tensor of predicted parameters for the image pair.\"\"\"\n",
"        f1 = self.backbone(img1)\n",
"        f2 = self.backbone(img2)\n",
"        combined = torch.cat([f1, f2, torch.abs(f1 - f2), f1 * f2], dim=1)\n",
"        return self.head(combined)\n",
"\n",
"\n",
"class HomographyLoss6(nn.Module):\n",
"    \"\"\"Mean-squared-error loss between predicted and target parameter vectors.\"\"\"\n",
"\n",
"    def __init__(self):\n",
"        super().__init__()\n",
"        # Plain MSE with PyTorch's default settings.\n",
"        self.criterion = nn.MSELoss()\n",
"\n",
"    def forward(self, pred, target):\n",
"        \"\"\"Apply the wrapped MSE criterion to `pred` and `target`.\"\"\"\n",
"        mse = self.criterion(pred, target)\n",
"        return mse\n",
"\n",
"\n",
"def count_parameters(model):\n",
"    \"\"\"Total number of scalar values across everything in `model.parameters()`.\"\"\"\n",
"    total = 0\n",
"    for param in model.parameters():\n",
"        total += param.numel()\n",
"    return total\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": "## Model\n\n`HomographyCNN6` — CNN architecture for homography estimation.\n\n**Output:** 6 parameters\n- `rx, ry, rz` — rotation angles (radians)\n- `tx, ty` — translation offsets\n- `scale` — isotropic scale factor\n\n**Architecture:**\n- Dual-branch CNN (Google + Yandex images)\n- Shared backbone (configurable: resnet18/34/50)\n"
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"\n",
"\n",
"\n",
"\n",
"class HomographyTrainer:\n",
"    \"\"\"Training loop with per-epoch validation, TensorBoard logging and checkpointing.\n",
"\n",
"    Reads `learning_rate`, `output_dir` and `save_every_n_epochs` from the\n",
"    module-level `config` dict. Batches are dicts with `google_img`,\n",
"    `yandex_img` and `homography_params` entries.\n",
"    \"\"\"\n",
"\n",
"    def __init__(self, model, train_loader, val_loader, device):\n",
"        self.model = model.to(device)\n",
"        self.train_loader = train_loader\n",
"        self.val_loader = val_loader\n",
"        self.device = device\n",
"        self.criterion = HomographyLoss6()\n",
"        self.optimizer = optim.Adam(model.parameters(), lr=config[\"learning_rate\"])\n",
"        # Created in train(); stays None until training starts.\n",
"        self.writer = None\n",
"        self.best_val_loss = float(\"inf\")\n",
"\n",
"    def _unpack(self, batch):\n",
"        \"\"\"Move the two images and the regression target of `batch` to the device.\"\"\"\n",
"        return (\n",
"            batch[\"google_img\"].to(self.device),\n",
"            batch[\"yandex_img\"].to(self.device),\n",
"            batch[\"homography_params\"].to(self.device),\n",
"        )\n",
"\n",
"    @staticmethod\n",
"    def _mean_loss(total_loss, total_samples):\n",
"        \"\"\"Sample-weighted mean loss; NaN when the loader yielded no samples.\"\"\"\n",
"        return total_loss / total_samples if total_samples else float(\"nan\")\n",
"\n",
"    def train_epoch(self, epoch):\n",
"        \"\"\"Run one optimisation pass over `train_loader` and return `{'loss': mean}`.\"\"\"\n",
"        self.model.train()\n",
"        total_loss, total_samples = 0.0, 0\n",
"        pbar = tqdm(self.train_loader, desc=f\"Epoch {epoch}\")\n",
"        for batch in pbar:\n",
"            google_img, yandex_img, target = self._unpack(batch)\n",
"\n",
"            self.optimizer.zero_grad()\n",
"            output = self.model(google_img, yandex_img)\n",
"            loss = self.criterion(output, target)\n",
"            loss.backward()\n",
"            self.optimizer.step()\n",
"\n",
"            # Weight by batch size so a smaller final batch does not skew the mean.\n",
"            total_loss += loss.item() * google_img.size(0)\n",
"            total_samples += google_img.size(0)\n",
"            pbar.set_postfix({\"loss\": loss.item()})\n",
"\n",
"        return {\"loss\": self._mean_loss(total_loss, total_samples)}\n",
"\n",
"    def validate(self):\n",
"        \"\"\"Evaluate on `val_loader` without gradients and return `{'loss': mean}`.\"\"\"\n",
"        self.model.eval()\n",
"        total_loss, total_samples = 0.0, 0\n",
"        with torch.no_grad():\n",
"            for batch in tqdm(self.val_loader, desc=\"Validation\"):\n",
"                google_img, yandex_img, target = self._unpack(batch)\n",
"                output = self.model(google_img, yandex_img)\n",
"                loss = self.criterion(output, target)\n",
"                total_loss += loss.item() * google_img.size(0)\n",
"                total_samples += google_img.size(0)\n",
"        return {\"loss\": self._mean_loss(total_loss, total_samples)}\n",
"\n",
"    def train(self, num_epochs):\n",
"        \"\"\"Train for `num_epochs` epochs, logging losses and saving checkpoints.\"\"\"\n",
"        log_dir = config[\"output_dir\"]\n",
"        os.makedirs(log_dir, exist_ok=True)\n",
"        self.writer = SummaryWriter(log_dir)\n",
"\n",
"        try:\n",
"            for epoch in range(1, num_epochs + 1):\n",
"                train_metrics = self.train_epoch(epoch)\n",
"                val_metrics = self.validate()\n",
"                print(f\"Train Loss: {train_metrics['loss']:.4f}, Val Loss: {val_metrics['loss']:.4f}\")\n",
"\n",
"                # Without these calls the writer only produces an empty event file.\n",
"                self.writer.add_scalar(\"Loss/train\", train_metrics[\"loss\"], epoch)\n",
"                self.writer.add_scalar(\"Loss/val\", val_metrics[\"loss\"], epoch)\n",
"\n",
"                # A NaN validation loss (empty loader) never compares as smaller.\n",
"                if val_metrics[\"loss\"] < self.best_val_loss:\n",
"                    self.best_val_loss = val_metrics[\"loss\"]\n",
"                    self.save_checkpoint(epoch, is_best=True)\n",
"                    print(f\"Best model saved (val loss: {val_metrics['loss']:.4f})\")\n",
"\n",
"                if epoch % config[\"save_every_n_epochs\"] == 0:\n",
"                    self.save_checkpoint(epoch, is_best=False)\n",
"                    print(f\"Checkpoint saved at epoch {epoch}\")\n",
"        finally:\n",
"            # Flush and release the event file even if an epoch raises.\n",
"            self.writer.close()\n",
"\n",
"    def save_checkpoint(self, epoch, is_best=False):\n",
"        \"\"\"Write `checkpoint_epoch_<epoch>.pt` and, when `is_best`, also `best_model.pt`.\"\"\"\n",
"        ckpt_dir = os.path.join(config[\"output_dir\"], \"checkpoints\")\n",
"        os.makedirs(ckpt_dir, exist_ok=True)\n",
"        ckpt = {\"epoch\": epoch, \"model_state_dict\": self.model.state_dict(), \"val_loss\": self.best_val_loss}\n",
"        torch.save(ckpt, os.path.join(ckpt_dir, f\"checkpoint_epoch_{epoch}.pt\"))\n",
"        if is_best:\n",
"            torch.save(ckpt, os.path.join(ckpt_dir, \"best_model.pt\"))\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": "## Training\n\n`HomographyTrainer` — training loop with validation and checkpointing.\n\n**Features:**\n- Epoch-based training with tqdm progress bar\n- Adam optimizer with configurable LR\n- Validation after each epoch\n- Best model auto-save\n- Periodic checkpoints (every N epochs via `save_every_n_epochs`)\n\n**Checkpoint saving** (in `<output_dir>/checkpoints/`):\n- `best_model.pt` — lowest validation loss\n- `checkpoint_epoch_<N>.pt` — snapshot of epoch N, written every `save_every_n_epochs` epochs and each time a new best model is saved\n"
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"\n",
"\n",
"def analyze_training(trainer):\n",
"    \"\"\"Print a short evaluation report for a trained `HomographyTrainer` and plot one sample.\n",
"\n",
"    Takes the first batch of `trainer.val_loader`, prints target vs. predicted\n",
"    parameters for up to three samples, and saves a figure of the first sample to\n",
"    `prediction_sample.png` in the current working directory.\n",
"\n",
"    Returns:\n",
"        dict with the single key `best_val_loss`.\n",
"    \"\"\"\n",
"    print(\"=== Training Analysis ===\\n\")\n",
"\n",
"    if trainer.writer:\n",
"        print(\"TensorBoard logs available at:\", trainer.writer.log_dir)\n",
"\n",
"    print(f\"\\nBest val loss: {trainer.best_val_loss:.4f}\")\n",
"    summary = {\"best_val_loss\": trainer.best_val_loss}\n",
"\n",
"    batch = next(iter(trainer.val_loader), None)\n",
"    if batch is None:\n",
"        # Nothing to evaluate on, e.g. when the whole dataset went to training.\n",
"        print(\"\\nValidation loader is empty; skipping prediction analysis.\")\n",
"        return summary\n",
"\n",
"    trainer.model.eval()\n",
"    with torch.no_grad():\n",
"        google_img = batch[\"google_img\"].to(trainer.device)\n",
"        yandex_img = batch[\"yandex_img\"].to(trainer.device)\n",
"        target_params = batch[\"homography_params\"].to(trainer.device)\n",
"        pred_params = trainer.model(google_img, yandex_img)\n",
"\n",
"    names = [\"rx\", \"ry\", \"rz\", \"tx\", \"ty\", \"scale\"]\n",
"    # The header promises the first three samples; batches may be smaller.\n",
"    shown = min(3, target_params.size(0))\n",
"    print(\"\\nSample predictions (first 3 of batch):\")\n",
"    for sample_idx in range(shown):\n",
"        print(f\"\\nSample {sample_idx}\")\n",
"        print(f\"{'Param':<8} {'Target':>12} {'Predicted':>12} {'Error':>12}\")\n",
"        print(\"-\" * 46)\n",
"        for i, name in enumerate(names):\n",
"            t = target_params[sample_idx, i].item()\n",
"            p = pred_params[sample_idx, i].item()\n",
"            print(f\"{name:<8} {t:>12.4f} {p:>12.4f} {abs(t-p):>12.4f}\")\n",
"\n",
"    print(f\"\\nBatch mean abs error: {torch.mean(torch.abs(pred_params - target_params)).item():.4f}\")\n",
"\n",
"    print(\"\\n=== Visualization ===\")\n",
"    fig, axes = plt.subplots(1, 3, figsize=(15, 5))\n",
"    # Channel-first tensors become channel-last arrays for imshow.\n",
"    img1 = google_img[0].cpu().permute(1, 2, 0).numpy()\n",
"    img2 = yandex_img[0].cpu().permute(1, 2, 0).numpy()\n",
"    axes[0].imshow(img1)\n",
"    axes[0].set_title(\"Google\")\n",
"    axes[0].axis(\"off\")\n",
"    axes[1].imshow(img2)\n",
"    axes[1].set_title(\"Yandex\")\n",
"    axes[1].axis(\"off\")\n",
"    axes[2].bar(names, pred_params[0].cpu().numpy())\n",
"    axes[2].set_title(\"Predicted params\")\n",
"    axes[2].axhline(y=0, color=\"k\", lw=0.5)\n",
"    plt.tight_layout()\n",
"    plt.savefig(\"prediction_sample.png\")\n",
"    print(\"Saved prediction_sample.png\")\n",
"    plt.show()\n",
"\n",
"    return summary\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": "## Analysis\n\nVisualization and evaluation tools:\n\n- Training metrics plots (loss curves)\n- Prediction visualization on sample images\n"
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"\n",
"\n",
"\n",
"\n",
"logging.basicConfig(level=logging.INFO, format=\"%(asctime)s - %(message)s\")\n",
"logger = logging.getLogger(__name__)\n",
"\n",
"logger.info(\"=\" * 50)\n",
"logger.info(\"SiaN Training Pipeline\")\n",
"logger.info(\"=\" * 50)\n",
"\n",
"# Seed every RNG the pipeline draws from so that a Restart & Run All repeats the\n",
"# same train/val split (random), augmentations (numpy) and dropout/shuffling (torch).\n",
"# An optional config[\"seed\"] overrides the default.\n",
"seed = config.get(\"seed\", 42)\n",
"random.seed(seed)\n",
"np.random.seed(seed)\n",
"torch.manual_seed(seed)\n",
"logger.info(f\"Random seed: {seed}\")\n",
"\n",
"dataset_info = get_dataset_info()\n",
"logger.info(f\"Dataset: {dataset_info['size']} samples, keys={dataset_info['sample_keys']}\")\n",
"\n",
"train_loader, val_loader = create_data_loaders(\n",
"    root_dir=config[\"data_dir\"],\n",
"    batch_size=config[\"batch_size\"],\n",
"    train_split=config[\"train_split\"],\n",
"    num_workers=config[\"num_workers\"],\n",
"    image_size=config[\"image_size\"],\n",
")\n",
"logger.info(f\"Data loaders created: train={len(train_loader.dataset)}, val={len(val_loader.dataset)}\")\n",
"\n",
"model = HomographyCNN6(\n",
"    input_channels=3,\n",
"    backbone_name=config[\"backbone\"],\n",
"    pretrained=True,\n",
"    dropout_rate=config[\"dropout_rate\"]\n",
")\n",
"logger.info(f\"Model created with {count_parameters(model):,} parameters\")\n",
"\n",
"device = torch.device(\"cuda\" if torch.cuda.is_available() else \"cpu\")\n",
"logger.info(f\"Using device: {device}\")\n",
"\n",
"trainer = HomographyTrainer(model, train_loader, val_loader, device)\n",
"logger.info(\"Starting training...\")\n",
"trainer.train(config[\"epochs\"])\n",
"logger.info(\"Training completed\")\n",
"\n",
"logger.info(\"Analyzing model...\")\n",
"results = analyze_training(trainer)\n",
"logger.info(f\"Analysis complete: best_val_loss={results['best_val_loss']:.4f}\")\n",
"\n",
"logger.info(\"=\" * 50)\n",
"logger.info(\"Pipeline completed successfully\")\n",
"logger.info(\"=\" * 50)\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": "## Main Pipeline\n\nExecutes the full training workflow:\n1. Load dataset info\n2. Create data loaders\n3. Initialize model\n4. Train with validation\n5. Analyze and export results\n\n**Outputs:**\n- Model checkpoints in `runs/checkpoints/`\n- TensorBoard logs in `runs/`\n"
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import zipfile\n",
"\n",
"# HomographyTrainer.save_checkpoint writes the best model to\n",
"# <output_dir>/checkpoints/best_model.pt, so the path is derived from config rather\n",
"# than hard-coded. zipfile is used instead of the external zip binary, which is not\n",
"# available on every platform.\n",
"best_ckpt_path = os.path.join(config[\"output_dir\"], \"checkpoints\", \"best_model.pt\")\n",
"with zipfile.ZipFile(\"artefacts.zip\", \"w\", compression=zipfile.ZIP_DEFLATED) as archive:\n",
"    archive.write(best_ckpt_path, arcname=\"best_model.pt\")\n",
"print(f\"Wrote artefacts.zip from {best_ckpt_path}\")\n",
"\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python",
"version": "3.11.0"
}
},
"nbformat": 4,
"nbformat_minor": 5
}