ci: configure and stabilize CI/CD pipeline

- fix runner configuration issues
- correct workflow labels and execution environment
- resolve dependency issues in pipeline (python deps)
- improve reliability of automation runs
This commit is contained in:
Mateusz Suski
2026-04-29 23:14:14 +00:00
parent 2313efac88
commit fcf305bd70
45 changed files with 6016 additions and 0 deletions
+10
View File
@@ -0,0 +1,10 @@
.PHONY: run test demo
run:
python3 cli.py --help
test:
python3 -m py_compile cli.py collectors/*.py validators/*.py reports/*.py
demo:
python3 cli.py compare examples/before.json examples/after.json --output /tmp/migration-diff.json
+56
View File
@@ -0,0 +1,56 @@
# Migration Validation Framework
## Problem Statement
Infrastructure migrations often fail in small, expensive ways: a mount option changes, a service is disabled, or disk usage moves past an operational threshold. Teams need structured evidence that the migrated host still matches the expected operating profile.
## Solution Overview
This project provides a Python CLI that collects system state into JSON snapshots and compares before/after files. The output is designed for change records, migration gates, and post-cutover validation.
## Architecture Overview
```
Operator -> CLI -> Collectors -> JSON Snapshot -> Comparator -> Diff/Report
```
Core components:
- `cli.py` provides collect, compare, snapshot, list, and report commands.
- `collectors/` gathers mounts, services, and disk usage.
- `validators/compare.py` identifies drift and validation failures.
- `reports/` contains report generation helpers.
- `examples/` contains realistic before/after evidence.
## How to Run
```bash
cd migration-validation-framework
python3 cli.py collect --output before.json --systems web01,db01
python3 cli.py collect --output after.json --systems web01,db01
python3 cli.py compare before.json after.json --output diff.json
python3 cli.py compare examples/before.json examples/after.json --output /tmp/migration-diff.json
```
Legacy snapshot IDs are still supported:
```bash
python3 cli.py snapshot --env prod --label pre --systems web01,db01
python3 cli.py compare prod-pre-20260429_020000 prod-post-20260429_030000 --output change-0429
```
## Example Output
```text
Comparison completed: diff.json (FAIL)
Overall risk: high
Total changes: 4
Failed checks: critical_services_running
Recommendation: restore sshd before production cutover
```
Sample inputs and output are available in [examples/before.json](examples/before.json), [examples/after.json](examples/after.json), and [examples/diff.json](examples/diff.json).
## Real-World Use Case
During a data center migration, a platform team can collect baseline state before cutover, collect the same evidence after DNS or workload migration, and attach the diff to the change ticket. The framework gives reviewers a compact signal on whether the host is ready for production traffic.
+323
View File
@@ -0,0 +1,323 @@
#!/usr/bin/env python3
"""
Migration Validation Framework - CLI Interface
A comprehensive tool for validating system migrations through data collection,
snapshot comparison, and automated reporting.
"""
import argparse
import json
import logging
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any
# Import framework modules
from collectors import mounts, services, disk_usage
from validators import compare
from reports import html_report
# Configuration
SNAPSHOTS_DIR = Path("snapshots")
LOGS_DIR = Path("logs")
REPORTS_DIR = Path("reports")
class MigrationValidator:
"""Main migration validation class."""
def __init__(self, verbose: bool = False):
self.verbose = verbose
self.ensure_directories()
self.setup_logging()
def setup_logging(self):
"""Configure logging."""
log_level = logging.DEBUG if self.verbose else logging.INFO
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(LOGS_DIR / "validation.log"),
logging.StreamHandler(sys.stdout)
]
)
self.logger = logging.getLogger(__name__)
def ensure_directories(self):
"""Ensure required directories exist."""
for directory in [SNAPSHOTS_DIR, LOGS_DIR, REPORTS_DIR]:
directory.mkdir(exist_ok=True)
def collect_system_data(self, systems: List[str]) -> Dict[str, Any]:
"""Collect data from target systems."""
self.logger.info(f"Collecting data from systems: {systems}")
snapshot = {
"metadata": {
"timestamp": datetime.now().isoformat(),
"systems": systems,
"version": "1.0"
},
"data": {}
}
collectors = [
("mounts", mounts.collect),
("services", services.collect),
("disk_usage", disk_usage.collect)
]
for system in systems:
self.logger.info(f"Collecting data from {system}")
snapshot["data"][system] = {}
for collector_name, collector_func in collectors:
try:
self.logger.debug(f"Running {collector_name} collector on {system}")
data = collector_func(system)
snapshot["data"][system][collector_name] = data
except Exception as e:
self.logger.error(f"Failed to collect {collector_name} from {system}: {e}")
snapshot["data"][system][collector_name] = {"error": str(e)}
return snapshot
def save_snapshot(self, snapshot: Dict[str, Any], label: str, env: str) -> str:
"""Save snapshot to disk."""
snapshot_id = f"{env}-{label}-{datetime.now().strftime('%Y%m%d_%H%M%S')}"
snapshot_file = SNAPSHOTS_DIR / f"{snapshot_id}.json"
with open(snapshot_file, 'w') as f:
json.dump(snapshot, f, indent=2)
self.logger.info(f"Snapshot saved: {snapshot_id}")
return snapshot_id
def load_snapshot(self, snapshot_id: str) -> Dict[str, Any]:
"""Load snapshot from disk."""
snapshot_path = Path(snapshot_id)
snapshot_file = snapshot_path if snapshot_path.exists() else SNAPSHOTS_DIR / f"{snapshot_id}.json"
if not snapshot_file.exists():
raise FileNotFoundError(f"Snapshot {snapshot_id} not found")
with open(snapshot_file, 'r') as f:
return json.load(f)
def collect_to_file(self, output_file: str, systems: List[str]) -> str:
"""Collect a snapshot and write it to an explicit file path."""
snapshot = self.collect_system_data(systems)
with open(output_file, 'w') as f:
json.dump(snapshot, f, indent=2)
f.write("\n")
self.logger.info(f"Snapshot written: {output_file}")
return output_file
def create_snapshot(self, env: str, label: str, systems: List[str]) -> str:
"""Create and save a system snapshot."""
self.logger.info(f"Creating snapshot for environment: {env}, label: {label}")
snapshot = self.collect_system_data(systems)
snapshot_id = self.save_snapshot(snapshot, label, env)
return snapshot_id
def compare_snapshots(self, snapshot1_id: str, snapshot2_id: str, output_id: str) -> Dict[str, Any]:
"""Compare two snapshots."""
self.logger.info(f"Comparing snapshots: {snapshot1_id} vs {snapshot2_id}")
snapshot1 = self.load_snapshot(snapshot1_id)
snapshot2 = self.load_snapshot(snapshot2_id)
comparison = compare.compare_snapshots(snapshot1, snapshot2)
comparison["metadata"] = {
"snapshot1": snapshot1_id,
"snapshot2": snapshot2_id,
"timestamp": datetime.now().isoformat(),
"comparison_id": output_id
}
# Save comparison results
comparison_file = REPORTS_DIR / f"comparison_{output_id}.json"
with open(comparison_file, 'w') as f:
json.dump(comparison, f, indent=2)
self.logger.info(f"Comparison saved: {output_id}")
return comparison
def compare_files(self, before_file: str, after_file: str, output_file: Optional[str] = None) -> Dict[str, Any]:
"""Compare two explicit JSON snapshot files."""
self.logger.info(f"Comparing files: {before_file} vs {after_file}")
before = self.load_snapshot(before_file)
after = self.load_snapshot(after_file)
comparison = compare.compare_snapshots(before, after)
comparison["metadata"] = {
"before": before_file,
"after": after_file,
"timestamp": datetime.now().isoformat()
}
if output_file:
with open(output_file, 'w') as f:
json.dump(comparison, f, indent=2)
f.write("\n")
self.logger.info(f"Comparison written: {output_file}")
return comparison
def generate_report(self, comparison_id: str, format_type: str, output_file: Optional[str] = None) -> str:
"""Generate a report from comparison results."""
self.logger.info(f"Generating {format_type} report for comparison: {comparison_id}")
comparison_file = REPORTS_DIR / f"comparison_{comparison_id}.json"
if not comparison_file.exists():
raise FileNotFoundError(f"Comparison {comparison_id} not found")
with open(comparison_file, 'r') as f:
comparison = json.load(f)
if format_type == "html":
if output_file is None:
output_file = f"migration_report_{comparison_id}.html"
html_report.generate(comparison, output_file)
elif format_type == "json":
if output_file is None:
output_file = f"migration_report_{comparison_id}.json"
with open(output_file, 'w') as f:
json.dump(comparison, f, indent=2)
else:
raise ValueError(f"Unsupported format: {format_type}")
self.logger.info(f"Report generated: {output_file}")
return output_file
def main():
"""Main CLI entry point."""
parser = argparse.ArgumentParser(
description="Migration Validation Framework",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Collect pre-migration snapshot
python3 cli.py collect --output before.json --systems web01,db01
# Compare snapshot files
python3 cli.py compare before.json after.json --output diff.json
# Generate HTML report
python3 cli.py report --comparison comparison_001 --format html
"""
)
parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose logging')
parser.add_argument('--dry-run', action='store_true', help='Preview actions without execution')
subparsers = parser.add_subparsers(dest='command', help='Available commands')
# Collect command
collect_parser = subparsers.add_parser('collect', help='Collect a system snapshot to a JSON file')
collect_parser.add_argument('--output', required=True, help='Output JSON file')
collect_parser.add_argument('--systems', default='localhost', help='Comma-separated list of systems')
# Snapshot command
snapshot_parser = subparsers.add_parser('snapshot', help='Create system snapshot')
snapshot_parser.add_argument('--env', required=True, help='Target environment')
snapshot_parser.add_argument('--label', required=True, help='Snapshot label')
snapshot_parser.add_argument('--systems', required=True, help='Comma-separated list of systems')
# Compare command
compare_parser = subparsers.add_parser('compare', help='Compare two snapshots')
compare_parser.add_argument('snapshot1', help='First snapshot ID')
compare_parser.add_argument('snapshot2', help='Second snapshot ID')
compare_parser.add_argument('--output', help='Output comparison ID or JSON file')
# Report command
report_parser = subparsers.add_parser('report', help='Generate report from comparison')
report_parser.add_argument('--comparison', required=True, help='Comparison ID')
report_parser.add_argument('--format', choices=['html', 'json'], default='html', help='Report format')
report_parser.add_argument('--output', help='Output file path')
# List command
list_parser = subparsers.add_parser('list', help='List snapshots or comparisons')
list_parser.add_argument('type', choices=['snapshots', 'comparisons'], help='Type to list')
args = parser.parse_args()
if not args.command:
parser.print_help()
return
# Initialize validator
validator = MigrationValidator(verbose=args.verbose)
try:
if args.command == 'collect':
systems = [system.strip() for system in args.systems.split(',') if system.strip()]
if args.dry_run:
print(f"DRY RUN: Would collect {systems} into {args.output}")
return
output_file = validator.collect_to_file(args.output, systems)
print(f"Snapshot written: {output_file}")
elif args.command == 'snapshot':
systems = args.systems.split(',')
if args.dry_run:
print(f"DRY RUN: Would create snapshot for systems: {systems}")
return
snapshot_id = validator.create_snapshot(args.env, args.label, systems)
print(f"Snapshot created: {snapshot_id}")
elif args.command == 'compare':
if args.dry_run:
print(f"DRY RUN: Would compare {args.snapshot1} vs {args.snapshot2}")
return
output = args.output
if output and output.endswith('.json'):
comparison = validator.compare_files(args.snapshot1, args.snapshot2, output)
result = "PASS" if comparison.get("validation_results", {}).get("passed") else "FAIL"
print(f"Comparison completed: {output} ({result})")
else:
output_id = output or datetime.now().strftime('%Y%m%d_%H%M%S')
comparison = validator.compare_snapshots(args.snapshot1, args.snapshot2, output_id)
result = "PASS" if comparison.get("validation_results", {}).get("passed") else "FAIL"
print(f"Comparison completed: {output_id} ({result})")
elif args.command == 'report':
if args.dry_run:
print(f"DRY RUN: Would generate {args.format} report for {args.comparison}")
return
output_file = validator.generate_report(args.comparison, args.format, args.output)
print(f"Report generated: {output_file}")
elif args.command == 'list':
if args.type == 'snapshots':
snapshots = list(SNAPSHOTS_DIR.glob("*.json"))
if snapshots:
print("Available snapshots:")
for snapshot in sorted(snapshots):
print(f" {snapshot.stem}")
else:
print("No snapshots found")
elif args.type == 'comparisons':
comparisons = list(REPORTS_DIR.glob("comparison_*.json"))
if comparisons:
print("Available comparisons:")
for comparison in sorted(comparisons):
comp_id = comparison.stem.replace('comparison_', '')
print(f" {comp_id}")
else:
print("No comparisons found")
except Exception as e:
validator.logger.error(f"Command failed: {e}")
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
@@ -0,0 +1,207 @@
"""
Disk Usage Data Collector
Collects disk usage statistics including directory sizes,
file system usage, and largest files information.
"""
import json
import logging
import subprocess
from typing import Dict, Any, List
from pathlib import Path
logger = logging.getLogger(__name__)
class DiskUsageCollector:
"""Collector for disk usage statistics."""
def __init__(self):
self.max_depth = 3
self.exclude_paths = [
"/proc",
"/sys",
"/dev",
"/run",
"/tmp",
"/var/log"
]
def collect_disk_usage(self, system: str) -> Dict[str, Any]:
"""Collect disk usage information from target system."""
logger.info(f"Collecting disk usage data from {system}")
try:
# Collect filesystem usage
filesystem_usage = self.collect_filesystem_usage(system)
# Collect directory sizes
directory_sizes = self.collect_directory_sizes(system)
# Collect largest files
largest_files = self.collect_largest_files(system)
return {
"filesystem_usage": filesystem_usage,
"directory_sizes": directory_sizes,
"largest_files": largest_files,
"timestamp": self.get_timestamp(system)
}
except Exception as e:
logger.error(f"Failed to collect disk usage from {system}: {e}")
raise
def collect_filesystem_usage(self, system: str) -> List[Dict[str, Any]]:
"""Collect filesystem usage statistics."""
usage_stats = []
try:
# Run df command
result = subprocess.run(
["ssh", system, "df -h --output=source,fstype,size,used,avail,pcent,target"],
capture_output=True,
text=True,
timeout=30
)
if result.returncode != 0:
raise RuntimeError(f"df command failed: {result.stderr}")
# Parse output
lines = result.stdout.strip().split('\n')
if len(lines) < 2:
return usage_stats
for line in lines[1:]: # Skip header
parts = line.split()
if len(parts) >= 7:
usage_stat = {
"filesystem": parts[0],
"type": parts[1],
"size": parts[2],
"used": parts[3],
"available": parts[4],
"use_percent": parts[5],
"mountpoint": parts[6]
}
usage_stats.append(usage_stat)
except subprocess.TimeoutExpired:
logger.error(f"Timeout collecting filesystem usage from {system}")
raise
except Exception as e:
logger.error(f"Failed to collect filesystem usage from {system}: {e}")
raise
return usage_stats
def collect_directory_sizes(self, system: str) -> List[Dict[str, Any]]:
"""Collect sizes of top-level directories."""
directory_sizes = []
try:
# Get top-level directories
dirs_to_check = ["/", "/home", "/var", "/usr", "/opt", "/etc"]
for directory in dirs_to_check:
if directory in self.exclude_paths:
continue
try:
# Run du command for directory size
result = subprocess.run(
["ssh", system, f"du -sh {directory} 2>/dev/null"],
capture_output=True,
text=True,
timeout=60
)
if result.returncode == 0:
size, path = result.stdout.strip().split('\t', 1)
directory_sizes.append({
"path": path,
"size": size
})
except subprocess.TimeoutExpired:
logger.warning(f"Timeout getting size for {directory} on {system}")
continue
except Exception as e:
logger.warning(f"Failed to get size for {directory} on {system}: {e}")
continue
except Exception as e:
logger.error(f"Failed to collect directory sizes from {system}: {e}")
raise
return directory_sizes
def collect_largest_files(self, system: str) -> List[Dict[str, Any]]:
"""Collect information about largest files in the system."""
largest_files = []
try:
# Find largest files (excluding certain paths)
exclude_expr = " ".join(f"-not -path '{path}/*'" for path in self.exclude_paths)
cmd = f"find / {exclude_expr} -type f -exec ls -lh {{}} \\; 2>/dev/null | sort -k5 -hr | head -20"
result = subprocess.run(
["ssh", system, cmd],
capture_output=True,
text=True,
timeout=120
)
if result.returncode == 0:
for line in result.stdout.strip().split('\n'):
if not line.strip():
continue
parts = line.split()
if len(parts) >= 9:
file_info = {
"permissions": parts[0],
"links": parts[1],
"owner": parts[2],
"group": parts[3],
"size": parts[4],
"month": parts[5],
"day": parts[6],
"time": parts[7],
"path": " ".join(parts[8:])
}
largest_files.append(file_info)
except subprocess.TimeoutExpired:
logger.error(f"Timeout collecting largest files from {system}")
raise
except Exception as e:
logger.error(f"Failed to collect largest files from {system}: {e}")
raise
return largest_files
def get_timestamp(self, system: str) -> str:
"""Get current timestamp from target system."""
try:
result = subprocess.run(
["ssh", system, "date -Iseconds"],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
return result.stdout.strip()
else:
return "unknown"
except Exception:
return "unknown"
def collect(system: str) -> Dict[str, Any]:
"""Main collection function for disk usage data."""
collector = DiskUsageCollector()
return collector.collect_disk_usage(system)
@@ -0,0 +1,173 @@
"""
Mounts Data Collector
Collects filesystem mount information including mount points, devices,
filesystem types, and usage statistics.
"""
import json
import logging
import subprocess
from typing import Dict, Any, List
from pathlib import Path
logger = logging.getLogger(__name__)
class MountsCollector:
"""Collector for filesystem mount information."""
def __init__(self):
self.exclude_patterns = [
"/proc/*",
"/sys/*",
"/dev/*",
"/run/*"
]
def collect_mounts(self, system: str) -> Dict[str, Any]:
"""Collect mount information from target system."""
logger.info(f"Collecting mounts data from {system}")
try:
# Run mount command
result = subprocess.run(
["ssh", system, "mount"],
capture_output=True,
text=True,
timeout=30
)
if result.returncode != 0:
raise RuntimeError(f"Mount command failed: {result.stderr}")
mounts = self.parse_mount_output(result.stdout)
filtered_mounts = self.filter_mounts(mounts)
# Get usage statistics
usage_stats = self.collect_usage_stats(system, filtered_mounts)
return {
"mounts": filtered_mounts,
"usage": usage_stats,
"timestamp": self.get_timestamp(system)
}
except subprocess.TimeoutExpired:
logger.error(f"Timeout collecting mounts from {system}")
raise
except Exception as e:
logger.error(f"Failed to collect mounts from {system}: {e}")
raise
def parse_mount_output(self, output: str) -> List[Dict[str, str]]:
"""Parse mount command output."""
mounts = []
for line in output.strip().split('\n'):
if not line.strip():
continue
# Parse mount output format: device on mountpoint type fstype (options)
parts = line.split()
if len(parts) >= 6 and parts[1] == 'on' and parts[3] == 'type':
mount_info = {
"device": parts[0],
"mountpoint": parts[2],
"fstype": parts[4],
"options": parts[5].strip('()')
}
mounts.append(mount_info)
return mounts
def filter_mounts(self, mounts: List[Dict[str, str]]) -> List[Dict[str, str]]:
"""Filter out unwanted mount points."""
filtered = []
for mount in mounts:
mountpoint = mount["mountpoint"]
if not any(Path(mountpoint).match(pattern.rstrip('*')) for pattern in self.exclude_patterns):
filtered.append(mount)
return filtered
def collect_usage_stats(self, system: str, mounts: List[Dict[str, str]]) -> Dict[str, Any]:
"""Collect disk usage statistics for mount points."""
usage_stats = {}
for mount in mounts:
mountpoint = mount["mountpoint"]
try:
# Run df command for usage statistics
result = subprocess.run(
["ssh", system, f"df -BG {mountpoint}"],
capture_output=True,
text=True,
timeout=15
)
if result.returncode == 0:
usage_stats[mountpoint] = self.parse_df_output(result.stdout)
except subprocess.TimeoutExpired:
logger.warning(f"Timeout getting usage for {mountpoint} on {system}")
usage_stats[mountpoint] = {"error": "timeout"}
except Exception as e:
logger.warning(f"Failed to get usage for {mountpoint} on {system}: {e}")
usage_stats[mountpoint] = {"error": str(e)}
return usage_stats
def parse_df_output(self, output: str) -> Dict[str, Any]:
"""Parse df command output."""
lines = output.strip().split('\n')
if len(lines) < 2:
return {"error": "invalid df output"}
# Parse header and data
header = lines[0].split()
data = lines[1].split()
if len(header) != len(data):
return {"error": "header/data mismatch"}
stats = {}
for i, field in enumerate(header):
if i < len(data):
if field in ['1G-blocks', 'Used', 'Available']:
# Convert to GB
value = data[i]
if value.endswith('G'):
stats[field.lower()] = float(value.rstrip('G'))
else:
stats[field.lower()] = float(value) / (1024**3) # Assume bytes
elif field == 'Use%':
stats['use_percent'] = int(data[i].rstrip('%'))
else:
stats[field.lower()] = data[i]
return stats
def get_timestamp(self, system: str) -> str:
"""Get current timestamp from target system."""
try:
result = subprocess.run(
["ssh", system, "date -Iseconds"],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
return result.stdout.strip()
else:
return "unknown"
except Exception:
return "unknown"
def collect(system: str) -> Dict[str, Any]:
"""Main collection function for mounts data."""
collector = MountsCollector()
return collector.collect_mounts(system)
@@ -0,0 +1,223 @@
"""
Services Data Collector
Collects system service information including running services,
their states, startup configuration, and dependencies.
"""
import json
import logging
import subprocess
from typing import Dict, Any, List
logger = logging.getLogger(__name__)
class ServicesCollector:
"""Collector for system service information."""
def __init__(self):
self.service_manager = "systemd" # Default to systemd
self.include_disabled = False
def collect_services(self, system: str) -> Dict[str, Any]:
"""Collect service information from target system."""
logger.info(f"Collecting services data from {system}")
try:
# Detect service manager
service_manager = self.detect_service_manager(system)
if service_manager == "systemd":
services = self.collect_systemd_services(system)
elif service_manager == "sysv":
services = self.collect_sysv_services(system)
else:
raise RuntimeError(f"Unsupported service manager: {service_manager}")
return {
"service_manager": service_manager,
"services": services,
"timestamp": self.get_timestamp(system)
}
except Exception as e:
logger.error(f"Failed to collect services from {system}: {e}")
raise
def detect_service_manager(self, system: str) -> str:
"""Detect which service manager is running on the system."""
try:
# Check for systemd
result = subprocess.run(
["ssh", system, "ps -p 1 -o comm="],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
if "systemd" in result.stdout.strip():
return "systemd"
elif "init" in result.stdout.strip():
return "sysv"
# Fallback check
result = subprocess.run(
["ssh", system, "which systemctl"],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
return "systemd"
return "sysv"
except Exception:
return "unknown"
def collect_systemd_services(self, system: str) -> List[Dict[str, Any]]:
"""Collect systemd service information."""
services = []
try:
# Get all services
result = subprocess.run(
["ssh", system, "systemctl list-units --type=service --all --no-pager --no-legend"],
capture_output=True,
text=True,
timeout=30
)
if result.returncode != 0:
raise RuntimeError(f"systemctl list-units failed: {result.stderr}")
# Parse service list
for line in result.stdout.strip().split('\n'):
if not line.strip():
continue
parts = line.split()
if len(parts) >= 4:
service_name = parts[0]
load_state = parts[1]
active_state = parts[2]
sub_state = parts[3]
# Skip if disabled and not including disabled
if not self.include_disabled and load_state == "not-found":
continue
# Get detailed service info
service_info = self.get_systemd_service_details(system, service_name)
services.append({
"name": service_name,
"load_state": load_state,
"active_state": active_state,
"sub_state": sub_state,
**service_info
})
except subprocess.TimeoutExpired:
logger.error(f"Timeout collecting systemd services from {system}")
raise
except Exception as e:
logger.error(f"Failed to collect systemd services from {system}: {e}")
raise
return services
def get_systemd_service_details(self, system: str, service_name: str) -> Dict[str, Any]:
"""Get detailed information for a systemd service."""
details = {}
try:
# Get service status
result = subprocess.run(
["ssh", system, f"systemctl show {service_name} --no-pager"],
capture_output=True,
text=True,
timeout=15
)
if result.returncode == 0:
for line in result.stdout.strip().split('\n'):
if '=' in line:
key, value = line.split('=', 1)
details[key.lower()] = value
except Exception as e:
logger.warning(f"Failed to get details for {service_name}: {e}")
return details
def collect_sysv_services(self, system: str) -> List[Dict[str, Any]]:
"""Collect SysV init service information."""
services = []
try:
# Get service list from /etc/init.d/
result = subprocess.run(
["ssh", system, "ls -1 /etc/init.d/"],
capture_output=True,
text=True,
timeout=15
)
if result.returncode != 0:
raise RuntimeError(f"Failed to list init.d services: {result.stderr}")
for service_name in result.stdout.strip().split('\n'):
if not service_name.strip():
continue
# Get service status
status_result = subprocess.run(
["ssh", system, f"/etc/init.d/{service_name} status"],
capture_output=True,
text=True,
timeout=10
)
status = "unknown"
if status_result.returncode == 0:
status = "running"
elif "not running" in status_result.stdout.lower():
status = "stopped"
services.append({
"name": service_name,
"status": status,
"type": "sysv"
})
except Exception as e:
logger.error(f"Failed to collect SysV services from {system}: {e}")
raise
return services
def get_timestamp(self, system: str) -> str:
"""Get current timestamp from target system."""
try:
result = subprocess.run(
["ssh", system, "date -Iseconds"],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
return result.stdout.strip()
else:
return "unknown"
except Exception:
return "unknown"
def collect(system: str) -> Dict[str, Any]:
"""Main collection function for services data."""
collector = ServicesCollector()
return collector.collect_services(system)
@@ -0,0 +1,30 @@
# Migration Validation Framework Architecture
## Components
- CLI: parses operator commands and coordinates workflows.
- Collectors: gather mounts, services, and disk usage from target systems.
- Snapshot files: JSON evidence used as immutable migration checkpoints.
- Comparator: evaluates drift between before and after snapshots.
- Reports: stores JSON or HTML output for audit and review.
## Data Flow
```
Operator
-> python3 cli.py collect
-> collectors over SSH
-> before.json / after.json
-> python3 cli.py compare
-> diff.json with PASS/FAIL validation
```
## Validation Flow
```
before.json -> Comparator -> service checks
after.json -> Comparator -> filesystem checks -> validation result
-> mount checks
```
The framework keeps collection and comparison separate so migration evidence can be reviewed, archived, and replayed without recollecting from production systems.
@@ -0,0 +1,40 @@
{
"metadata": {
"timestamp": "2026-04-29T03:40:00Z",
"systems": ["web01"],
"version": "1.0"
},
"data": {
"web01": {
"mounts": {
"mounts": [
{"device": "/dev/sda1", "mountpoint": "/", "fstype": "ext4", "options": "rw,relatime"},
{"device": "/dev/sdb1", "mountpoint": "/var", "fstype": "xfs", "options": "rw,noatime"}
],
"usage": {
"/": {"filesystem": "/dev/sda1", "use_percent": "62%"},
"/var": {"filesystem": "/dev/sdb1", "use_percent": "94%"}
},
"timestamp": "2026-04-29T03:40:00Z"
},
"services": {
"service_manager": "systemd",
"services": [
{"name": "sshd", "active_state": "failed", "sub_state": "failed"},
{"name": "nginx", "active_state": "active", "sub_state": "running"},
{"name": "node-exporter", "active_state": "active", "sub_state": "running"}
],
"timestamp": "2026-04-29T03:40:00Z"
},
"disk_usage": {
"filesystem_usage": [
{"filesystem": "/dev/sda1", "type": "ext4", "size": "80G", "used": "50G", "available": "30G", "use_percent": "62%", "mountpoint": "/"},
{"filesystem": "/dev/sdb1", "type": "xfs", "size": "200G", "used": "188G", "available": "12G", "use_percent": "94%", "mountpoint": "/var"}
],
"directory_sizes": [{"path": "/var/lib/app", "size": "139G"}],
"largest_files": [{"path": "/var/lib/app/import/archive.tar", "size": "42G"}],
"timestamp": "2026-04-29T03:40:00Z"
}
}
}
}
@@ -0,0 +1,39 @@
{
"metadata": {
"timestamp": "2026-04-29T01:15:00Z",
"systems": ["web01"],
"version": "1.0"
},
"data": {
"web01": {
"mounts": {
"mounts": [
{"device": "/dev/sda1", "mountpoint": "/", "fstype": "ext4", "options": "rw,relatime"},
{"device": "/dev/sdb1", "mountpoint": "/var", "fstype": "xfs", "options": "rw,noatime"}
],
"usage": {
"/": {"filesystem": "/dev/sda1", "use_percent": "61%"},
"/var": {"filesystem": "/dev/sdb1", "use_percent": "68%"}
},
"timestamp": "2026-04-29T01:15:00Z"
},
"services": {
"service_manager": "systemd",
"services": [
{"name": "sshd", "active_state": "active", "sub_state": "running"},
{"name": "nginx", "active_state": "active", "sub_state": "running"}
],
"timestamp": "2026-04-29T01:15:00Z"
},
"disk_usage": {
"filesystem_usage": [
{"filesystem": "/dev/sda1", "type": "ext4", "size": "80G", "used": "49G", "available": "31G", "use_percent": "61%", "mountpoint": "/"},
{"filesystem": "/dev/sdb1", "type": "xfs", "size": "200G", "used": "136G", "available": "64G", "use_percent": "68%", "mountpoint": "/var"}
],
"directory_sizes": [{"path": "/var/lib/app", "size": "84G"}],
"largest_files": [],
"timestamp": "2026-04-29T01:15:00Z"
}
}
}
}
@@ -0,0 +1,211 @@
{
"summary": {
"total_systems": 1,
"systems_with_changes": 1,
"total_changes": 7,
"changes_by_type": {
"mounts": 2,
"services": 2,
"disk_usage": 3
},
"most_affected_systems": [
[
"web01",
7
]
]
},
"differences": {
"mounts": {
"web01": {
"added_mounts": [],
"removed_mounts": [],
"changed_mounts": [],
"usage_changes": [
{
"mountpoint": "/",
"before": {
"filesystem": "/dev/sda1",
"use_percent": "61%"
},
"after": {
"filesystem": "/dev/sda1",
"use_percent": "62%"
}
},
{
"mountpoint": "/var",
"before": {
"filesystem": "/dev/sdb1",
"use_percent": "68%"
},
"after": {
"filesystem": "/dev/sdb1",
"use_percent": "94%"
}
}
]
}
},
"services": {
"web01": {
"added_services": [
{
"name": "node-exporter",
"active_state": "active",
"sub_state": "running"
}
],
"removed_services": [],
"status_changes": [
{
"name": "sshd",
"before": {
"active_state": "active",
"sub_state": "running"
},
"after": {
"active_state": "failed",
"sub_state": "failed"
}
}
],
"configuration_changes": []
}
},
"disk_usage": {
"web01": {
"filesystem_changes": [
{
"mountpoint": "/",
"before": {
"filesystem": "/dev/sda1",
"type": "ext4",
"size": "80G",
"used": "49G",
"available": "31G",
"use_percent": "61%",
"mountpoint": "/"
},
"after": {
"filesystem": "/dev/sda1",
"type": "ext4",
"size": "80G",
"used": "50G",
"available": "30G",
"use_percent": "62%",
"mountpoint": "/"
}
},
{
"mountpoint": "/var",
"before": {
"filesystem": "/dev/sdb1",
"type": "xfs",
"size": "200G",
"used": "136G",
"available": "64G",
"use_percent": "68%",
"mountpoint": "/var"
},
"after": {
"filesystem": "/dev/sdb1",
"type": "xfs",
"size": "200G",
"used": "188G",
"available": "12G",
"use_percent": "94%",
"mountpoint": "/var"
}
}
],
"directory_size_changes": [],
"significant_usage_changes": [
{
"mountpoint": "/var",
"change_percent": 26,
"before": {
"filesystem": "/dev/sdb1",
"type": "xfs",
"size": "200G",
"used": "136G",
"available": "64G",
"use_percent": "68%",
"mountpoint": "/var"
},
"after": {
"filesystem": "/dev/sdb1",
"type": "xfs",
"size": "200G",
"used": "188G",
"available": "12G",
"use_percent": "94%",
"mountpoint": "/var"
}
}
]
}
}
},
"risk_assessment": {
"overall_risk": "high",
"risk_factors": [
{
"type": "service_failure",
"description": "Service failed: sshd",
"level": 3
},
{
"type": "disk_usage_spike",
"description": "Significant disk usage change: /var (26%)",
"level": 2
}
],
"critical_changes": [],
"recommendations": [
"Immediate review required - critical changes detected",
"Consider rolling back migration if critical services are affected"
]
},
"validation_results": {
"passed": false,
"checks": [
{
"name": "critical_services_running",
"description": "Verify critical services remain operational",
"passed": false,
"details": [
"Critical service sshd failed on web01"
]
},
{
"name": "filesystem_integrity",
"description": "Verify filesystem integrity maintained",
"passed": true,
"details": []
},
{
"name": "no_critical_mounts_removed",
"description": "Verify critical mount points remain",
"passed": true,
"details": []
}
],
"failed_checks": [
{
"name": "critical_services_running",
"description": "Verify critical services remain operational",
"passed": false,
"details": [
"Critical service sshd failed on web01"
]
}
],
"result": "FAIL"
},
"metadata": {
"before": "migration-validation-framework/examples/before.json",
"after": "migration-validation-framework/examples/after.json",
"timestamp": "2026-04-29T23:29:07.510774"
}
}
@@ -0,0 +1,608 @@
"""
HTML Report Generator
Generates comprehensive HTML reports from migration validation comparison results.
"""
import json
import logging
from typing import Dict, Any
from datetime import datetime
from pathlib import Path
logger = logging.getLogger(__name__)
class HTMLReportGenerator:
"""Generator for HTML migration validation reports."""
def __init__(self):
self.css_styles = self.get_css_styles()
self.js_scripts = self.get_js_scripts()
def generate_report(self, comparison: Dict[str, Any], output_file: str) -> str:
"""Generate HTML report from comparison data."""
logger.info(f"Generating HTML report: {output_file}")
html_content = self.build_html_content(comparison)
with open(output_file, 'w', encoding='utf-8') as f:
f.write(html_content)
logger.info(f"HTML report generated: {output_file}")
return output_file
def build_html_content(self, comparison: Dict[str, Any]) -> str:
"""Build complete HTML content."""
metadata = comparison.get("metadata", {})
summary = comparison.get("summary", {})
differences = comparison.get("differences", {})
risk_assessment = comparison.get("risk_assessment", {})
validation_results = comparison.get("validation_results", {})
html = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Migration Validation Report</title>
<style>{self.css_styles}</style>
</head>
<body>
<div class="container">
<header>
<h1>Migration Validation Report</h1>
<div class="report-meta">
<p><strong>Report Generated:</strong> {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
<p><strong>Comparison ID:</strong> {metadata.get('comparison_id', 'N/A')}</p>
<p><strong>Snapshot 1:</strong> {metadata.get('snapshot1', 'N/A')}</p>
<p><strong>Snapshot 2:</strong> {metadata.get('snapshot2', 'N/A')}</p>
</div>
</header>
<nav class="toc">
<h2>Table of Contents</h2>
<ul>
<li><a href="#executive-summary">Executive Summary</a></li>
<li><a href="#risk-assessment">Risk Assessment</a></li>
<li><a href="#validation-results">Validation Results</a></li>
<li><a href="#detailed-changes">Detailed Changes</a></li>
<li><a href="#recommendations">Recommendations</a></li>
</ul>
</nav>
<section id="executive-summary">
<h2>Executive Summary</h2>
{self.generate_summary_section(summary)}
</section>
<section id="risk-assessment">
<h2>Risk Assessment</h2>
{self.generate_risk_section(risk_assessment)}
</section>
<section id="validation-results">
<h2>Validation Results</h2>
{self.generate_validation_section(validation_results)}
</section>
<section id="detailed-changes">
<h2>Detailed Changes</h2>
{self.generate_changes_section(differences)}
</section>
<section id="recommendations">
<h2>Recommendations</h2>
{self.generate_recommendations_section(risk_assessment)}
</section>
</div>
<script>{self.js_scripts}</script>
</body>
</html>"""
return html
def generate_summary_section(self, summary: Dict[str, Any]) -> str:
"""Generate executive summary HTML."""
total_systems = summary.get('total_systems', 0)
systems_with_changes = summary.get('systems_with_changes', 0)
total_changes = summary.get('total_changes', 0)
html = f"""
<div class="summary-grid">
<div class="summary-card">
<h3>Systems Analyzed</h3>
<div class="metric">{total_systems}</div>
</div>
<div class="summary-card">
<h3>Systems with Changes</h3>
<div class="metric">{systems_with_changes}</div>
</div>
<div class="summary-card">
<h3>Total Changes</h3>
<div class="metric">{total_changes}</div>
</div>
</div>
<h3>Changes by Type</h3>
<table class="changes-table">
<thead>
<tr>
<th>Data Type</th>
<th>Changes</th>
</tr>
</thead>
<tbody>"""
for data_type, count in summary.get('changes_by_type', {}).items():
html += f"""
<tr>
<td>{data_type.replace('_', ' ').title()}</td>
<td>{count}</td>
</tr>"""
html += """
</tbody>
</table>
<h3>Most Affected Systems</h3>
<table class="systems-table">
<thead>
<tr>
<th>System</th>
<th>Changes</th>
</tr>
</thead>
<tbody>"""
for system, count in summary.get('most_affected_systems', []):
html += f"""
<tr>
<td>{system}</td>
<td>{count}</td>
</tr>"""
html += """
</tbody>
</table>"""
return html
def generate_risk_section(self, risk_assessment: Dict[str, Any]) -> str:
"""Generate risk assessment HTML."""
overall_risk = risk_assessment.get('overall_risk', 'unknown')
risk_color = self.get_risk_color(overall_risk)
html = f"""
<div class="risk-overview">
<h3>Overall Risk Level</h3>
<div class="risk-badge risk-{overall_risk}" style="background-color: {risk_color}">
{overall_risk.upper()}
</div>
</div>
<h3>Risk Factors</h3>
<div class="risk-factors">"""
for factor in risk_assessment.get('risk_factors', []):
factor_color = self.get_risk_color(self.get_risk_level_name(factor.get('level', 1)))
html += f"""
<div class="risk-factor">
<div class="risk-badge risk-{self.get_risk_level_name(factor.get('level', 1))}" style="background-color: {factor_color}">
{self.get_risk_level_name(factor.get('level', 1)).upper()}
</div>
<div class="risk-details">
<strong>{factor.get('type', 'Unknown').replace('_', ' ').title()}</strong>
<p>{factor.get('description', 'No description')}</p>
</div>
</div>"""
html += """
</div>
<h3>Critical Changes</h3>
<div class="critical-changes">"""
for change in risk_assessment.get('critical_changes', []):
html += f"""
<div class="critical-change">
<h4>{change.get('system', 'Unknown System')}</h4>
<p><strong>Type:</strong> {change.get('data_type', 'Unknown')}</p>
<p>{change.get('factor', {}).get('description', 'No details')}</p>
</div>"""
html += "</div>"
return html
def generate_validation_section(self, validation_results: Dict[str, Any]) -> str:
"""Generate validation results HTML."""
passed = validation_results.get('passed', False)
status_color = "#28a745" if passed else "#dc3545"
status_text = "PASSED" if passed else "FAILED"
html = f"""
<div class="validation-status">
<div class="status-indicator" style="background-color: {status_color}">
{status_text}
</div>
</div>
<h3>Validation Checks</h3>
<div class="validation-checks">"""
for check in validation_results.get('checks', []):
check_status = "" if check.get('passed', False) else ""
check_color = "#28a745" if check.get('passed', False) else "#dc3545"
html += f"""
<div class="validation-check">
<div class="check-status" style="color: {check_color}">{check_status}</div>
<div class="check-details">
<h4>{check.get('name', 'Unknown').replace('_', ' ').title()}</h4>
<p>{check.get('description', 'No description')}</p>
{"<ul>" + "".join(f"<li>{detail}</li>" for detail in check.get('details', [])) + "</ul>" if check.get('details') else ""}
</div>
</div>"""
html += "</div>"
return html
def generate_changes_section(self, differences: Dict[str, Any]) -> str:
"""Generate detailed changes HTML."""
html = ""
for data_type, systems in differences.items():
html += f"<h3>{data_type.replace('_', ' ').title()} Changes</h3>"
for system, system_diffs in systems.items():
html += f"<h4>System: {system}</h4>"
# Generate tables for different change types
html += self.generate_change_tables(system_diffs)
return html
def generate_change_tables(self, system_diffs: Dict[str, Any]) -> str:
"""Generate HTML tables for different types of changes."""
html = ""
change_types = {
'added_mounts': ('Added Mounts', ['mountpoint', 'device', 'fstype']),
'removed_mounts': ('Removed Mounts', ['mountpoint', 'device', 'fstype']),
'changed_mounts': ('Changed Mounts', ['mountpoint', 'before', 'after']),
'usage_changes': ('Usage Changes', ['mountpoint', 'before', 'after']),
'added_services': ('Added Services', ['name', 'active_state', 'sub_state']),
'removed_services': ('Removed Services', ['name', 'active_state', 'sub_state']),
'status_changes': ('Service Status Changes', ['name', 'before', 'after']),
'filesystem_changes': ('Filesystem Changes', ['mountpoint', 'before', 'after']),
'directory_size_changes': ('Directory Size Changes', ['path', 'before', 'after']),
'significant_usage_changes': ('Significant Usage Changes', ['mountpoint', 'change_percent', 'before', 'after'])
}
for change_key, (title, columns) in change_types.items():
if change_key in system_diffs and system_diffs[change_key]:
html += f"<h5>{title}</h5>"
html += '<table class="changes-table">'
html += '<thead><tr>'
for col in columns:
html += f'<th>{col.replace("_", " ").title()}</th>'
html += '</tr></thead><tbody>'
for item in system_diffs[change_key]:
html += '<tr>'
for col in columns:
value = item.get(col, '')
if isinstance(value, dict):
value = json.dumps(value, indent=2)
html += f'<td><pre>{value}</pre></td>'
html += '</tr>'
html += '</tbody></table>'
return html
def generate_recommendations_section(self, risk_assessment: Dict[str, Any]) -> str:
"""Generate recommendations HTML."""
recommendations = risk_assessment.get('recommendations', [])
html = '<div class="recommendations">'
if not recommendations:
html += '<p>No specific recommendations at this time.</p>'
else:
html += '<ul>'
for rec in recommendations:
html += f'<li>{rec}</li>'
html += '</ul>'
html += '</div>'
return html
def get_risk_color(self, risk_level: str) -> str:
"""Get color for risk level."""
colors = {
'low': '#28a745',
'medium': '#ffc107',
'high': '#fd7e14',
'critical': '#dc3545'
}
return colors.get(risk_level.lower(), '#6c757d')
def get_risk_level_name(self, level: int) -> str:
"""Get risk level name from numeric level."""
levels = {1: 'low', 2: 'medium', 3: 'high', 4: 'critical'}
return levels.get(level, 'unknown')
def get_css_styles(self) -> str:
"""Get CSS styles for the report."""
return """
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
line-height: 1.6;
color: #333;
margin: 0;
padding: 20px;
background-color: #f8f9fa;
}
.container {
max-width: 1200px;
margin: 0 auto;
background: white;
padding: 30px;
border-radius: 8px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
header {
text-align: center;
margin-bottom: 40px;
padding-bottom: 20px;
border-bottom: 2px solid #e9ecef;
}
h1 {
color: #2c3e50;
margin-bottom: 10px;
}
.report-meta {
color: #6c757d;
font-size: 0.9em;
}
.toc {
background: #f8f9fa;
padding: 20px;
border-radius: 5px;
margin-bottom: 30px;
}
.toc ul {
list-style: none;
padding: 0;
}
.toc li {
margin: 5px 0;
}
.toc a {
color: #007bff;
text-decoration: none;
}
.toc a:hover {
text-decoration: underline;
}
section {
margin-bottom: 40px;
}
h2 {
color: #2c3e50;
border-bottom: 1px solid #e9ecef;
padding-bottom: 10px;
}
h3 {
color: #495057;
margin-top: 30px;
}
.summary-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
gap: 20px;
margin: 20px 0;
}
.summary-card {
background: #f8f9fa;
padding: 20px;
border-radius: 5px;
text-align: center;
}
.metric {
font-size: 2em;
font-weight: bold;
color: #007bff;
margin: 10px 0;
}
table {
width: 100%;
border-collapse: collapse;
margin: 20px 0;
}
th, td {
padding: 12px;
text-align: left;
border-bottom: 1px solid #e9ecef;
}
th {
background-color: #f8f9fa;
font-weight: 600;
}
.risk-overview {
text-align: center;
margin: 20px 0;
}
.risk-badge {
display: inline-block;
padding: 8px 16px;
border-radius: 20px;
color: white;
font-weight: bold;
margin: 10px;
}
.risk-factors, .critical-changes {
margin: 20px 0;
}
.risk-factor, .critical-change {
background: #f8f9fa;
padding: 15px;
border-radius: 5px;
margin: 10px 0;
border-left: 4px solid #007bff;
}
.risk-factor {
display: flex;
align-items: center;
}
.risk-details {
margin-left: 15px;
}
.validation-status {
text-align: center;
margin: 20px 0;
}
.status-indicator {
display: inline-block;
padding: 15px 30px;
border-radius: 5px;
color: white;
font-weight: bold;
font-size: 1.2em;
}
.validation-checks {
margin: 20px 0;
}
.validation-check {
display: flex;
align-items: flex-start;
background: #f8f9fa;
padding: 15px;
border-radius: 5px;
margin: 10px 0;
}
.check-status {
font-size: 1.5em;
margin-right: 15px;
margin-top: 5px;
}
.check-details {
flex: 1;
}
.recommendations ul {
background: #e7f3ff;
padding: 20px;
border-radius: 5px;
border-left: 4px solid #007bff;
}
.recommendations li {
margin: 10px 0;
}
pre {
background: #f8f9fa;
padding: 10px;
border-radius: 3px;
overflow-x: auto;
max-width: 100%;
white-space: pre-wrap;
word-wrap: break-word;
}
@media (max-width: 768px) {
.container {
padding: 15px;
}
.summary-grid {
grid-template-columns: 1fr;
}
.risk-factor {
flex-direction: column;
text-align: center;
}
.validation-check {
flex-direction: column;
}
}
"""
def get_js_scripts(self) -> str:
"""Get JavaScript for interactive features."""
return """
// Add smooth scrolling to TOC links
document.addEventListener('DOMContentLoaded', function() {
const links = document.querySelectorAll('.toc a');
links.forEach(link => {
link.addEventListener('click', function(e) {
e.preventDefault();
const target = document.querySelector(this.getAttribute('href'));
if (target) {
target.scrollIntoView({ behavior: 'smooth' });
}
});
});
// Add collapsible sections for large tables
const tables = document.querySelectorAll('table');
tables.forEach(table => {
if (table.rows.length > 10) {
const toggle = document.createElement('button');
toggle.textContent = 'Toggle Details';
toggle.style.marginBottom = '10px';
toggle.addEventListener('click', function() {
const tbody = table.querySelector('tbody');
tbody.style.display = tbody.style.display === 'none' ? '' : 'none';
});
table.parentNode.insertBefore(toggle, table);
}
});
});
"""
def generate(comparison: Dict[str, Any], output_file: str) -> str:
"""Main function to generate HTML report."""
generator = HTMLReportGenerator()
return generator.generate_report(comparison, output_file)
@@ -0,0 +1,19 @@
# Scenario: Before/After Migration Comparison
## Description
Compare a pre-cutover host snapshot against a post-cutover snapshot and determine whether the migrated system is ready for production traffic.
## Commands
```bash
cd migration-validation-framework
python3 cli.py compare examples/before.json examples/after.json --output /tmp/migration-diff.json
```
## Expected Result
- The command writes a JSON diff.
- The result is `FAIL` because `sshd` is failed after migration.
- The risk assessment highlights the `/var` disk usage increase.
- The remediation path is to restore SSH and reduce or expand `/var` before approving cutover.
@@ -0,0 +1,501 @@
"""
Snapshot Comparison Engine
Compares two system snapshots and identifies differences,
risk levels, and validation results.
"""
import json
import logging
from typing import Dict, Any, List, Tuple
from datetime import datetime
logger = logging.getLogger(__name__)
class SnapshotComparator:
"""Engine for comparing system snapshots."""
def __init__(self):
self.risk_levels = {
"low": 1,
"medium": 2,
"high": 3,
"critical": 4
}
def compare_snapshots(self, snapshot1: Dict[str, Any], snapshot2: Dict[str, Any]) -> Dict[str, Any]:
"""Compare two snapshots and return detailed comparison results."""
logger.info("Starting snapshot comparison")
comparison = {
"summary": {},
"differences": {},
"risk_assessment": {},
"validation_results": {}
}
# Compare each data type
data_types = ["mounts", "services", "disk_usage"]
data1 = snapshot1.get("data", {})
data2 = snapshot2.get("data", {})
for data_type in data_types:
if self.data_type_exists(data1, data_type) or self.data_type_exists(data2, data_type):
differences = self.compare_data_type(data1, data2, data_type)
comparison["differences"][data_type] = differences
# Generate summary
comparison["summary"] = self.generate_summary(comparison["differences"])
# Risk assessment
comparison["risk_assessment"] = self.assess_risks(comparison["differences"])
# Validation results
comparison["validation_results"] = self.validate_changes(comparison["differences"])
comparison["validation_results"]["result"] = "PASS" if comparison["validation_results"]["passed"] else "FAIL"
logger.info("Snapshot comparison completed")
return comparison
def data_type_exists(self, systems: Dict[str, Any], data_type: str) -> bool:
"""Return true when at least one system has the requested collector data."""
return any(data_type in system_data for system_data in systems.values())
def compare_data_type(self, data1: Dict[str, Any], data2: Dict[str, Any], data_type: str) -> Dict[str, Any]:
"""Compare a specific data type between two snapshots."""
differences = {}
# Get all systems from both snapshots
systems1 = set(data1.keys())
systems2 = set(data2.keys())
all_systems = systems1.union(systems2)
for system in all_systems:
system_diffs = {}
if system not in data1:
system_diffs["status"] = "added"
system_diffs["details"] = {"new_system": True}
elif system not in data2:
system_diffs["status"] = "removed"
system_diffs["details"] = {"removed_system": True}
else:
# Compare data for this system and data type
if data_type in data1[system] and data_type in data2[system]:
system_diffs = self.compare_system_data(
data1[system][data_type],
data2[system][data_type],
data_type
)
else:
system_diffs["status"] = "data_missing"
system_diffs["details"] = {"missing_data_type": data_type}
if system_diffs:
differences[system] = system_diffs
return differences
def compare_system_data(self, data1: Dict[str, Any], data2: Dict[str, Any], data_type: str) -> Dict[str, Any]:
"""Compare data for a specific system and data type."""
differences = {}
if data_type == "mounts":
differences = self.compare_mounts(data1, data2)
elif data_type == "services":
differences = self.compare_services(data1, data2)
elif data_type == "disk_usage":
differences = self.compare_disk_usage(data1, data2)
else:
differences["status"] = "unknown_data_type"
return differences
def compare_mounts(self, mounts1: Dict[str, Any], mounts2: Dict[str, Any]) -> Dict[str, Any]:
"""Compare mounts data between snapshots."""
differences = {
"added_mounts": [],
"removed_mounts": [],
"changed_mounts": [],
"usage_changes": []
}
# Compare mount lists
mounts_list1 = mounts1.get("mounts", [])
mounts_list2 = mounts2.get("mounts", [])
# Create mountpoint maps
mounts_map1 = {m["mountpoint"]: m for m in mounts_list1}
mounts_map2 = {m["mountpoint"]: m for m in mounts_list2}
# Find added and removed mounts
added = set(mounts_map2.keys()) - set(mounts_map1.keys())
removed = set(mounts_map1.keys()) - set(mounts_map2.keys())
differences["added_mounts"] = [{"mountpoint": mp, **mounts_map2[mp]} for mp in added]
differences["removed_mounts"] = [{"mountpoint": mp, **mounts_map1[mp]} for mp in removed]
# Find changed mounts
common = set(mounts_map1.keys()) & set(mounts_map2.keys())
for mp in common:
m1, m2 = mounts_map1[mp], mounts_map2[mp]
if m1 != m2:
differences["changed_mounts"].append({
"mountpoint": mp,
"before": m1,
"after": m2
})
# Compare usage statistics
usage1 = mounts1.get("usage", {})
usage2 = mounts2.get("usage", {})
for mp in set(usage1.keys()) | set(usage2.keys()):
if mp in usage1 and mp in usage2:
u1, u2 = usage1[mp], usage2[mp]
if u1 != u2:
differences["usage_changes"].append({
"mountpoint": mp,
"before": u1,
"after": u2
})
return differences
def compare_services(self, services1: Dict[str, Any], services2: Dict[str, Any]) -> Dict[str, Any]:
"""Compare services data between snapshots."""
differences = {
"added_services": [],
"removed_services": [],
"status_changes": [],
"configuration_changes": []
}
# Compare service lists
services_list1 = services1.get("services", [])
services_list2 = services2.get("services", [])
# Create service maps
services_map1 = {s["name"]: s for s in services_list1}
services_map2 = {s["name"]: s for s in services_list2}
# Find added and removed services
added = set(services_map2.keys()) - set(services_map1.keys())
removed = set(services_map1.keys()) - set(services_map2.keys())
differences["added_services"] = [{"name": name, **services_map2[name]} for name in added]
differences["removed_services"] = [{"name": name, **services_map1[name]} for name in removed]
# Find status changes
common = set(services_map1.keys()) & set(services_map2.keys())
for name in common:
s1, s2 = services_map1[name], services_map2[name]
if s1.get("active_state") != s2.get("active_state") or s1.get("sub_state") != s2.get("sub_state"):
differences["status_changes"].append({
"name": name,
"before": {"active_state": s1.get("active_state"), "sub_state": s1.get("sub_state")},
"after": {"active_state": s2.get("active_state"), "sub_state": s2.get("sub_state")}
})
return differences
def compare_disk_usage(self, usage1: Dict[str, Any], usage2: Dict[str, Any]) -> Dict[str, Any]:
"""Compare disk usage data between snapshots."""
differences = {
"filesystem_changes": [],
"directory_size_changes": [],
"significant_usage_changes": []
}
# Compare filesystem usage
fs1 = usage1.get("filesystem_usage", [])
fs2 = usage2.get("filesystem_usage", [])
# Create filesystem maps by mountpoint
fs_map1 = {fs["mountpoint"]: fs for fs in fs1}
fs_map2 = {fs["mountpoint"]: fs for fs in fs2}
common_fs = set(fs_map1.keys()) & set(fs_map2.keys())
for mp in common_fs:
f1, f2 = fs_map1[mp], fs_map2[mp]
if f1 != f2:
differences["filesystem_changes"].append({
"mountpoint": mp,
"before": f1,
"after": f2
})
# Check for significant usage changes
try:
use1 = int(f1.get("use_percent", "0").rstrip("%"))
use2 = int(f2.get("use_percent", "0").rstrip("%"))
if abs(use2 - use1) > 10: # 10% change threshold
differences["significant_usage_changes"].append({
"mountpoint": mp,
"change_percent": use2 - use1,
"before": f1,
"after": f2
})
except (ValueError, KeyError):
pass
return differences
def generate_summary(self, differences: Dict[str, Any]) -> Dict[str, Any]:
"""Generate a summary of all differences."""
summary = {
"total_systems": 0,
"systems_with_changes": 0,
"total_changes": 0,
"changes_by_type": {},
"most_affected_systems": []
}
system_change_counts = {}
for data_type, systems in differences.items():
summary["changes_by_type"][data_type] = 0
for system, system_diffs in systems.items():
if system not in system_change_counts:
system_change_counts[system] = 0
# Count changes for this system and data type
change_count = self.count_changes(system_diffs)
system_change_counts[system] += change_count
summary["changes_by_type"][data_type] += change_count
summary["total_changes"] += change_count
summary["total_systems"] = len(system_change_counts)
# Count systems with changes
summary["systems_with_changes"] = len([s for s in system_change_counts.values() if s > 0])
# Find most affected systems
sorted_systems = sorted(system_change_counts.items(), key=lambda x: x[1], reverse=True)
summary["most_affected_systems"] = sorted_systems[:5]
return summary
def count_changes(self, system_diffs: Dict[str, Any]) -> int:
"""Count the number of changes in system differences."""
count = 0
for key, value in system_diffs.items():
if isinstance(value, list):
count += len(value)
elif isinstance(value, dict) and key not in ["status"]:
# Count nested changes
count += sum(1 for v in value.values() if isinstance(v, list) and v)
return count
def assess_risks(self, differences: Dict[str, Any]) -> Dict[str, Any]:
"""Assess risk levels for the changes."""
risk_assessment = {
"overall_risk": "low",
"risk_factors": [],
"critical_changes": [],
"recommendations": []
}
max_risk_level = 1
# Analyze each type of change
for data_type, systems in differences.items():
for system, system_diffs in systems.items():
risk_factors = self.analyze_system_risks(system_diffs, data_type)
risk_assessment["risk_factors"].extend(risk_factors)
for factor in risk_factors:
if factor["level"] > max_risk_level:
max_risk_level = factor["level"]
if factor["level"] >= 4: # Critical
risk_assessment["critical_changes"].append({
"system": system,
"data_type": data_type,
"factor": factor
})
# Set overall risk
risk_levels = {1: "low", 2: "medium", 3: "high", 4: "critical"}
risk_assessment["overall_risk"] = risk_levels.get(max_risk_level, "unknown")
# Generate recommendations
risk_assessment["recommendations"] = self.generate_recommendations(risk_assessment)
return risk_assessment
def analyze_system_risks(self, system_diffs: Dict[str, Any], data_type: str) -> List[Dict[str, Any]]:
"""Analyze risks for a specific system's changes."""
risk_factors = []
if data_type == "mounts":
# Check for removed critical mounts
for mount in system_diffs.get("removed_mounts", []):
if mount["mountpoint"] in ["/", "/boot", "/usr", "/var"]:
risk_factors.append({
"type": "critical_mount_removed",
"description": f"Critical mount point removed: {mount['mountpoint']}",
"level": 4
})
# Check for significant usage changes
for change in system_diffs.get("usage_changes", []):
try:
before_pct = int(change["before"].get("use_percent", "0").rstrip("%"))
after_pct = int(change["after"].get("use_percent", "0").rstrip("%"))
if after_pct > 95:
risk_factors.append({
"type": "filesystem_full",
"description": f"Filesystem usage critical: {change['mountpoint']} at {after_pct}%",
"level": 3
})
except (ValueError, KeyError):
pass
elif data_type == "services":
# Check for critical service changes
critical_services = ["sshd", "systemd", "networking", "dbus"]
for service in system_diffs.get("removed_services", []):
if service["name"] in critical_services:
risk_factors.append({
"type": "critical_service_removed",
"description": f"Critical service removed: {service['name']}",
"level": 4
})
for change in system_diffs.get("status_changes", []):
if change["after"]["active_state"] == "failed":
risk_factors.append({
"type": "service_failure",
"description": f"Service failed: {change['name']}",
"level": 3
})
elif data_type == "disk_usage":
for change in system_diffs.get("significant_usage_changes", []):
if change["change_percent"] > 20:
risk_factors.append({
"type": "disk_usage_spike",
"description": f"Significant disk usage change: {change['mountpoint']} ({change['change_percent']}%)",
"level": 2
})
return risk_factors
def generate_recommendations(self, risk_assessment: Dict[str, Any]) -> List[str]:
"""Generate recommendations based on risk assessment."""
recommendations = []
if risk_assessment["overall_risk"] in ["high", "critical"]:
recommendations.append("Immediate review required - critical changes detected")
recommendations.append("Consider rolling back migration if critical services are affected")
if any(f["type"] == "critical_mount_removed" for f in risk_assessment["risk_factors"]):
recommendations.append("Verify system boot capability after mount changes")
if any(f["type"] == "critical_service_removed" for f in risk_assessment["risk_factors"]):
recommendations.append("Ensure critical services are restored before production cutover")
if any(f["type"] == "filesystem_full" for f in risk_assessment["risk_factors"]):
recommendations.append("Monitor disk space closely - cleanup may be required")
if not recommendations:
recommendations.append("Changes appear safe - proceed with standard validation procedures")
return recommendations
def validate_changes(self, differences: Dict[str, Any]) -> Dict[str, Any]:
"""Validate that changes meet requirements."""
validation_results = {
"passed": True,
"checks": [],
"failed_checks": []
}
# Define validation checks
checks = [
self.check_critical_services_running,
self.check_filesystem_integrity,
self.check_no_critical_mounts_removed
]
for check_func in checks:
check_result = check_func(differences)
validation_results["checks"].append(check_result)
if not check_result["passed"]:
validation_results["passed"] = False
validation_results["failed_checks"].append(check_result)
return validation_results
def check_critical_services_running(self, differences: Dict[str, Any]) -> Dict[str, Any]:
"""Check that critical services are still running."""
check = {
"name": "critical_services_running",
"description": "Verify critical services remain operational",
"passed": True,
"details": []
}
critical_services = ["sshd", "systemd"]
for data_type, systems in differences.items():
if data_type == "services":
for system, system_diffs in systems.items():
for change in system_diffs.get("status_changes", []):
if change["name"] in critical_services:
if change["after"]["active_state"] == "failed":
check["passed"] = False
check["details"].append(f"Critical service {change['name']} failed on {system}")
return check
def check_filesystem_integrity(self, differences: Dict[str, Any]) -> Dict[str, Any]:
"""Check filesystem integrity after changes."""
check = {
"name": "filesystem_integrity",
"description": "Verify filesystem integrity maintained",
"passed": True,
"details": []
}
for data_type, systems in differences.items():
if data_type == "disk_usage":
for system, system_diffs in systems.items():
for change in system_diffs.get("significant_usage_changes", []):
if change["change_percent"] > 50: # Arbitrary threshold
check["passed"] = False
check["details"].append(f"Extreme usage change on {system}:{change['mountpoint']}")
return check
def check_no_critical_mounts_removed(self, differences: Dict[str, Any]) -> Dict[str, Any]:
"""Check that no critical mount points were removed."""
check = {
"name": "no_critical_mounts_removed",
"description": "Verify critical mount points remain",
"passed": True,
"details": []
}
critical_mounts = ["/", "/boot", "/usr", "/var"]
for data_type, systems in differences.items():
if data_type == "mounts":
for system, system_diffs in systems.items():
for mount in system_diffs.get("removed_mounts", []):
if mount["mountpoint"] in critical_mounts:
check["passed"] = False
check["details"].append(f"Critical mount {mount['mountpoint']} removed from {system}")
return check
def compare_snapshots(snapshot1: Dict[str, Any], snapshot2: Dict[str, Any]) -> Dict[str, Any]:
"""Main comparison function."""
comparator = SnapshotComparator()
return comparator.compare_snapshots(snapshot1, snapshot2)