Initial CV-aligned infrastructure portfolio
ci / validate (push) Failing after 1m8s

Rework portfolio around Linux operations, Zabbix monitoring, migration validation, and ELK/Grafana log observability.

Add AAP-style LVM resize workflow, Zabbix server/proxy/agent automation assets, Linux/AIX monitoring templates, and updated validation CI.
This commit is contained in:
Mateusz Suski
2026-05-04 17:37:24 +00:00
commit 35e6b139fc
114 changed files with 6422 additions and 0 deletions
@@ -0,0 +1,11 @@
.PHONY: run test demo
run:
python3 cli.py --help
test:
python3 -m py_compile cli.py collectors/*.py validators/*.py reports/*.py
python3 -m unittest discover -s tests
demo:
python3 cli.py compare examples/before.json examples/after.json --output /tmp/migration-diff.json
@@ -0,0 +1,87 @@
# Migration Validation Framework
## Problem
Infrastructure migrations often fail in small, expensive ways: a mount option changes, a service is disabled, or disk usage moves past an operational threshold. Teams need structured evidence that the migrated host still matches the expected operating profile.
## CV Relevance
This project maps to storage/platform migration validation work: collecting pre-migration and post-migration state, comparing results, and producing evidence that can be attached to change or migration tickets.
## What This Project Demonstrates
- A Python CLI for collecting and comparing system snapshots.
- Modular collectors for mounts, services, and disk usage.
- Risk assessment and validation checks for before/after drift.
- JSON and HTML evidence suitable for migration review.
- Offline tests and examples that run without remote hosts.
## Architecture
```
Operator -> CLI -> Collectors -> JSON Snapshot -> Comparator -> Diff/Report
```
Core components:
- `cli.py` provides collect, compare, snapshot, list, and report commands.
- `collectors/` gathers mounts, services, and disk usage.
- `validators/compare.py` identifies drift and validation failures.
- `reports/` contains report generation helpers with escaped HTML output.
- `examples/` contains realistic before/after evidence.
## Quickstart
```bash
cd professional-infra/migration-validation-framework
make test
make demo
```
The demo compares the included example snapshots:
```bash
python3 cli.py compare examples/before.json examples/after.json --output /tmp/migration-diff.json
```
The example intentionally returns `FAIL` because it demonstrates a high-risk migration finding.
Legacy snapshot IDs are still supported:
```bash
python3 cli.py snapshot --env prod --label pre --systems web01,db01
python3 cli.py compare prod-pre-20260429_020000 prod-post-20260429_030000 --output change-0429
```
## Validation
```bash
make test
```
This compiles the Python modules and runs unit tests for example comparison, parser behavior, and HTML escaping.
## Example Output
```text
Comparison completed: diff.json (FAIL)
Overall risk: high
Total changes: 4
Failed checks: critical_services_running
Recommendation: restore sshd before production cutover
```
Sample inputs and output are available in [examples/before.json](examples/before.json), [examples/after.json](examples/after.json), and [examples/diff.json](examples/diff.json).
## Roadmap
- Add database-specific migration checks.
- Add performance baseline comparisons.
- Add a REST API wrapper for CI/CD integration.
- Add compliance-oriented validation profiles.
## Interview Talking Points
- Why pre/post migration evidence reduces risk during storage and platform migrations.
- How to separate collection from comparison so evidence can be replayed.
- How drift detection supports change approval and rollback decisions.
@@ -0,0 +1,323 @@
#!/usr/bin/env python3
"""
Migration Validation Framework - CLI Interface
A comprehensive tool for validating system migrations through data collection,
snapshot comparison, and automated reporting.
"""
import argparse
import json
import logging
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any
# Import framework modules
from collectors import mounts, services, disk_usage
from validators import compare
from reports import html_report
# Configuration
SNAPSHOTS_DIR = Path("snapshots")
LOGS_DIR = Path("logs")
REPORTS_DIR = Path("reports")
class MigrationValidator:
"""Main migration validation class."""
def __init__(self, verbose: bool = False):
self.verbose = verbose
self.ensure_directories()
self.setup_logging()
def setup_logging(self):
"""Configure logging."""
log_level = logging.DEBUG if self.verbose else logging.INFO
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(LOGS_DIR / "validation.log"),
logging.StreamHandler(sys.stdout)
]
)
self.logger = logging.getLogger(__name__)
def ensure_directories(self):
"""Ensure required directories exist."""
for directory in [SNAPSHOTS_DIR, LOGS_DIR, REPORTS_DIR]:
directory.mkdir(exist_ok=True)
def collect_system_data(self, systems: List[str]) -> Dict[str, Any]:
"""Collect data from target systems."""
self.logger.info(f"Collecting data from systems: {systems}")
snapshot = {
"metadata": {
"timestamp": datetime.now().isoformat(),
"systems": systems,
"version": "1.0"
},
"data": {}
}
collectors = [
("mounts", mounts.collect),
("services", services.collect),
("disk_usage", disk_usage.collect)
]
for system in systems:
self.logger.info(f"Collecting data from {system}")
snapshot["data"][system] = {}
for collector_name, collector_func in collectors:
try:
self.logger.debug(f"Running {collector_name} collector on {system}")
data = collector_func(system)
snapshot["data"][system][collector_name] = data
except Exception as e:
self.logger.error(f"Failed to collect {collector_name} from {system}: {e}")
snapshot["data"][system][collector_name] = {"error": str(e)}
return snapshot
def save_snapshot(self, snapshot: Dict[str, Any], label: str, env: str) -> str:
"""Save snapshot to disk."""
snapshot_id = f"{env}-{label}-{datetime.now().strftime('%Y%m%d_%H%M%S')}"
snapshot_file = SNAPSHOTS_DIR / f"{snapshot_id}.json"
with open(snapshot_file, 'w') as f:
json.dump(snapshot, f, indent=2)
self.logger.info(f"Snapshot saved: {snapshot_id}")
return snapshot_id
def load_snapshot(self, snapshot_id: str) -> Dict[str, Any]:
"""Load snapshot from disk."""
snapshot_path = Path(snapshot_id)
snapshot_file = snapshot_path if snapshot_path.exists() else SNAPSHOTS_DIR / f"{snapshot_id}.json"
if not snapshot_file.exists():
raise FileNotFoundError(f"Snapshot {snapshot_id} not found")
with open(snapshot_file, 'r') as f:
return json.load(f)
def collect_to_file(self, output_file: str, systems: List[str]) -> str:
"""Collect a snapshot and write it to an explicit file path."""
snapshot = self.collect_system_data(systems)
with open(output_file, 'w') as f:
json.dump(snapshot, f, indent=2)
f.write("\n")
self.logger.info(f"Snapshot written: {output_file}")
return output_file
def create_snapshot(self, env: str, label: str, systems: List[str]) -> str:
"""Create and save a system snapshot."""
self.logger.info(f"Creating snapshot for environment: {env}, label: {label}")
snapshot = self.collect_system_data(systems)
snapshot_id = self.save_snapshot(snapshot, label, env)
return snapshot_id
def compare_snapshots(self, snapshot1_id: str, snapshot2_id: str, output_id: str) -> Dict[str, Any]:
"""Compare two snapshots."""
self.logger.info(f"Comparing snapshots: {snapshot1_id} vs {snapshot2_id}")
snapshot1 = self.load_snapshot(snapshot1_id)
snapshot2 = self.load_snapshot(snapshot2_id)
comparison = compare.compare_snapshots(snapshot1, snapshot2)
comparison["metadata"] = {
"snapshot1": snapshot1_id,
"snapshot2": snapshot2_id,
"timestamp": datetime.now().isoformat(),
"comparison_id": output_id
}
# Save comparison results
comparison_file = REPORTS_DIR / f"comparison_{output_id}.json"
with open(comparison_file, 'w') as f:
json.dump(comparison, f, indent=2)
self.logger.info(f"Comparison saved: {output_id}")
return comparison
def compare_files(self, before_file: str, after_file: str, output_file: Optional[str] = None) -> Dict[str, Any]:
"""Compare two explicit JSON snapshot files."""
self.logger.info(f"Comparing files: {before_file} vs {after_file}")
before = self.load_snapshot(before_file)
after = self.load_snapshot(after_file)
comparison = compare.compare_snapshots(before, after)
comparison["metadata"] = {
"before": before_file,
"after": after_file,
"timestamp": datetime.now().isoformat()
}
if output_file:
with open(output_file, 'w') as f:
json.dump(comparison, f, indent=2)
f.write("\n")
self.logger.info(f"Comparison written: {output_file}")
return comparison
def generate_report(self, comparison_id: str, format_type: str, output_file: Optional[str] = None) -> str:
"""Generate a report from comparison results."""
self.logger.info(f"Generating {format_type} report for comparison: {comparison_id}")
comparison_file = REPORTS_DIR / f"comparison_{comparison_id}.json"
if not comparison_file.exists():
raise FileNotFoundError(f"Comparison {comparison_id} not found")
with open(comparison_file, 'r') as f:
comparison = json.load(f)
if format_type == "html":
if output_file is None:
output_file = f"migration_report_{comparison_id}.html"
html_report.generate(comparison, output_file)
elif format_type == "json":
if output_file is None:
output_file = f"migration_report_{comparison_id}.json"
with open(output_file, 'w') as f:
json.dump(comparison, f, indent=2)
else:
raise ValueError(f"Unsupported format: {format_type}")
self.logger.info(f"Report generated: {output_file}")
return output_file
def main():
"""Main CLI entry point."""
parser = argparse.ArgumentParser(
description="Migration Validation Framework",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Collect pre-migration snapshot
python3 cli.py collect --output before.json --systems web01,db01
# Compare snapshot files
python3 cli.py compare before.json after.json --output diff.json
# Generate HTML report
python3 cli.py report --comparison comparison_001 --format html
"""
)
parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose logging')
parser.add_argument('--dry-run', action='store_true', help='Preview actions without execution')
subparsers = parser.add_subparsers(dest='command', help='Available commands')
# Collect command
collect_parser = subparsers.add_parser('collect', help='Collect a system snapshot to a JSON file')
collect_parser.add_argument('--output', required=True, help='Output JSON file')
collect_parser.add_argument('--systems', default='localhost', help='Comma-separated list of systems')
# Snapshot command
snapshot_parser = subparsers.add_parser('snapshot', help='Create system snapshot')
snapshot_parser.add_argument('--env', required=True, help='Target environment')
snapshot_parser.add_argument('--label', required=True, help='Snapshot label')
snapshot_parser.add_argument('--systems', required=True, help='Comma-separated list of systems')
# Compare command
compare_parser = subparsers.add_parser('compare', help='Compare two snapshots')
compare_parser.add_argument('snapshot1', help='First snapshot ID')
compare_parser.add_argument('snapshot2', help='Second snapshot ID')
compare_parser.add_argument('--output', help='Output comparison ID or JSON file')
# Report command
report_parser = subparsers.add_parser('report', help='Generate report from comparison')
report_parser.add_argument('--comparison', required=True, help='Comparison ID')
report_parser.add_argument('--format', choices=['html', 'json'], default='html', help='Report format')
report_parser.add_argument('--output', help='Output file path')
# List command
list_parser = subparsers.add_parser('list', help='List snapshots or comparisons')
list_parser.add_argument('type', choices=['snapshots', 'comparisons'], help='Type to list')
args = parser.parse_args()
if not args.command:
parser.print_help()
return
# Initialize validator
validator = MigrationValidator(verbose=args.verbose)
try:
if args.command == 'collect':
systems = [system.strip() for system in args.systems.split(',') if system.strip()]
if args.dry_run:
print(f"DRY RUN: Would collect {systems} into {args.output}")
return
output_file = validator.collect_to_file(args.output, systems)
print(f"Snapshot written: {output_file}")
elif args.command == 'snapshot':
systems = args.systems.split(',')
if args.dry_run:
print(f"DRY RUN: Would create snapshot for systems: {systems}")
return
snapshot_id = validator.create_snapshot(args.env, args.label, systems)
print(f"Snapshot created: {snapshot_id}")
elif args.command == 'compare':
if args.dry_run:
print(f"DRY RUN: Would compare {args.snapshot1} vs {args.snapshot2}")
return
output = args.output
if output and output.endswith('.json'):
comparison = validator.compare_files(args.snapshot1, args.snapshot2, output)
result = "PASS" if comparison.get("validation_results", {}).get("passed") else "FAIL"
print(f"Comparison completed: {output} ({result})")
else:
output_id = output or datetime.now().strftime('%Y%m%d_%H%M%S')
comparison = validator.compare_snapshots(args.snapshot1, args.snapshot2, output_id)
result = "PASS" if comparison.get("validation_results", {}).get("passed") else "FAIL"
print(f"Comparison completed: {output_id} ({result})")
elif args.command == 'report':
if args.dry_run:
print(f"DRY RUN: Would generate {args.format} report for {args.comparison}")
return
output_file = validator.generate_report(args.comparison, args.format, args.output)
print(f"Report generated: {output_file}")
elif args.command == 'list':
if args.type == 'snapshots':
snapshots = list(SNAPSHOTS_DIR.glob("*.json"))
if snapshots:
print("Available snapshots:")
for snapshot in sorted(snapshots):
print(f" {snapshot.stem}")
else:
print("No snapshots found")
elif args.type == 'comparisons':
comparisons = list(REPORTS_DIR.glob("comparison_*.json"))
if comparisons:
print("Available comparisons:")
for comparison in sorted(comparisons):
comp_id = comparison.stem.replace('comparison_', '')
print(f" {comp_id}")
else:
print("No comparisons found")
except Exception as e:
validator.logger.error(f"Command failed: {e}")
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
@@ -0,0 +1,206 @@
"""
Disk Usage Data Collector
Collects disk usage statistics including directory sizes,
file system usage, and largest files information.
"""
import logging
import shlex
import subprocess
from typing import Dict, Any, List
logger = logging.getLogger(__name__)
class DiskUsageCollector:
"""Collector for disk usage statistics."""
def __init__(self):
self.max_depth = 3
self.exclude_paths = [
"/proc",
"/sys",
"/dev",
"/run",
"/tmp",
"/var/log"
]
def collect_disk_usage(self, system: str) -> Dict[str, Any]:
"""Collect disk usage information from target system."""
logger.info(f"Collecting disk usage data from {system}")
try:
# Collect filesystem usage
filesystem_usage = self.collect_filesystem_usage(system)
# Collect directory sizes
directory_sizes = self.collect_directory_sizes(system)
# Collect largest files
largest_files = self.collect_largest_files(system)
return {
"filesystem_usage": filesystem_usage,
"directory_sizes": directory_sizes,
"largest_files": largest_files,
"timestamp": self.get_timestamp(system)
}
except Exception as e:
logger.error(f"Failed to collect disk usage from {system}: {e}")
raise
def collect_filesystem_usage(self, system: str) -> List[Dict[str, Any]]:
"""Collect filesystem usage statistics."""
usage_stats = []
try:
# Run df command
result = subprocess.run(
["ssh", system, "df -h --output=source,fstype,size,used,avail,pcent,target"],
capture_output=True,
text=True,
timeout=30
)
if result.returncode != 0:
raise RuntimeError(f"df command failed: {result.stderr}")
# Parse output
lines = result.stdout.strip().split('\n')
if len(lines) < 2:
return usage_stats
for line in lines[1:]: # Skip header
parts = line.split()
if len(parts) >= 7:
usage_stat = {
"filesystem": parts[0],
"type": parts[1],
"size": parts[2],
"used": parts[3],
"available": parts[4],
"use_percent": parts[5],
"mountpoint": parts[6]
}
usage_stats.append(usage_stat)
except subprocess.TimeoutExpired:
logger.error(f"Timeout collecting filesystem usage from {system}")
raise
except Exception as e:
logger.error(f"Failed to collect filesystem usage from {system}: {e}")
raise
return usage_stats
def collect_directory_sizes(self, system: str) -> List[Dict[str, Any]]:
"""Collect sizes of top-level directories."""
directory_sizes = []
try:
# Get top-level directories
dirs_to_check = ["/", "/home", "/var", "/usr", "/opt", "/etc"]
for directory in dirs_to_check:
if directory in self.exclude_paths:
continue
try:
# Run du command for directory size
result = subprocess.run(
["ssh", system, f"du -sh -- {shlex.quote(directory)} 2>/dev/null"],
capture_output=True,
text=True,
timeout=60
)
if result.returncode == 0:
size, path = result.stdout.strip().split('\t', 1)
directory_sizes.append({
"path": path,
"size": size
})
except subprocess.TimeoutExpired:
logger.warning(f"Timeout getting size for {directory} on {system}")
continue
except Exception as e:
logger.warning(f"Failed to get size for {directory} on {system}: {e}")
continue
except Exception as e:
logger.error(f"Failed to collect directory sizes from {system}: {e}")
raise
return directory_sizes
def collect_largest_files(self, system: str) -> List[Dict[str, Any]]:
"""Collect information about largest files in the system."""
largest_files = []
try:
# Find largest files (excluding certain paths)
exclude_expr = " ".join(f"-not -path {shlex.quote(path + '/*')}" for path in self.exclude_paths)
cmd = f"find / {exclude_expr} -type f -exec ls -lh {{}} \\; 2>/dev/null | sort -k5 -hr | head -20"
result = subprocess.run(
["ssh", system, cmd],
capture_output=True,
text=True,
timeout=120
)
if result.returncode == 0:
for line in result.stdout.strip().split('\n'):
if not line.strip():
continue
parts = line.split()
if len(parts) >= 9:
file_info = {
"permissions": parts[0],
"links": parts[1],
"owner": parts[2],
"group": parts[3],
"size": parts[4],
"month": parts[5],
"day": parts[6],
"time": parts[7],
"path": " ".join(parts[8:])
}
largest_files.append(file_info)
except subprocess.TimeoutExpired:
logger.error(f"Timeout collecting largest files from {system}")
raise
except Exception as e:
logger.error(f"Failed to collect largest files from {system}: {e}")
raise
return largest_files
def get_timestamp(self, system: str) -> str:
"""Get current timestamp from target system."""
try:
result = subprocess.run(
["ssh", system, "date -Iseconds"],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
return result.stdout.strip()
else:
return "unknown"
except Exception:
return "unknown"
def collect(system: str) -> Dict[str, Any]:
"""Main collection function for disk usage data."""
collector = DiskUsageCollector()
return collector.collect_disk_usage(system)
@@ -0,0 +1,172 @@
"""
Mounts Data Collector
Collects filesystem mount information including mount points, devices,
filesystem types, and usage statistics.
"""
import logging
import shlex
import subprocess
from typing import Dict, Any, List
logger = logging.getLogger(__name__)
class MountsCollector:
"""Collector for filesystem mount information."""
def __init__(self):
self.exclude_patterns = [
"/proc/*",
"/sys/*",
"/dev/*",
"/run/*"
]
def collect_mounts(self, system: str) -> Dict[str, Any]:
"""Collect mount information from target system."""
logger.info(f"Collecting mounts data from {system}")
try:
# Run mount command
result = subprocess.run(
["ssh", system, "mount"],
capture_output=True,
text=True,
timeout=30
)
if result.returncode != 0:
raise RuntimeError(f"Mount command failed: {result.stderr}")
mounts = self.parse_mount_output(result.stdout)
filtered_mounts = self.filter_mounts(mounts)
# Get usage statistics
usage_stats = self.collect_usage_stats(system, filtered_mounts)
return {
"mounts": filtered_mounts,
"usage": usage_stats,
"timestamp": self.get_timestamp(system)
}
except subprocess.TimeoutExpired:
logger.error(f"Timeout collecting mounts from {system}")
raise
except Exception as e:
logger.error(f"Failed to collect mounts from {system}: {e}")
raise
def parse_mount_output(self, output: str) -> List[Dict[str, str]]:
"""Parse mount command output."""
mounts = []
for line in output.strip().split('\n'):
if not line.strip():
continue
# Parse mount output format: device on mountpoint type fstype (options)
parts = line.split()
if len(parts) >= 6 and parts[1] == 'on' and parts[3] == 'type':
mount_info = {
"device": parts[0],
"mountpoint": parts[2],
"fstype": parts[4],
"options": parts[5].strip('()')
}
mounts.append(mount_info)
return mounts
def filter_mounts(self, mounts: List[Dict[str, str]]) -> List[Dict[str, str]]:
"""Filter out unwanted mount points."""
filtered = []
for mount in mounts:
mountpoint = mount["mountpoint"]
if not any(mountpoint.startswith(pattern.rstrip('*')) for pattern in self.exclude_patterns):
filtered.append(mount)
return filtered
def collect_usage_stats(self, system: str, mounts: List[Dict[str, str]]) -> Dict[str, Any]:
"""Collect disk usage statistics for mount points."""
usage_stats = {}
for mount in mounts:
mountpoint = mount["mountpoint"]
try:
# Run df command for usage statistics
result = subprocess.run(
["ssh", system, f"df -BG -- {shlex.quote(mountpoint)}"],
capture_output=True,
text=True,
timeout=15
)
if result.returncode == 0:
usage_stats[mountpoint] = self.parse_df_output(result.stdout)
except subprocess.TimeoutExpired:
logger.warning(f"Timeout getting usage for {mountpoint} on {system}")
usage_stats[mountpoint] = {"error": "timeout"}
except Exception as e:
logger.warning(f"Failed to get usage for {mountpoint} on {system}: {e}")
usage_stats[mountpoint] = {"error": str(e)}
return usage_stats
def parse_df_output(self, output: str) -> Dict[str, Any]:
"""Parse df command output."""
lines = output.strip().split('\n')
if len(lines) < 2:
return {"error": "invalid df output"}
# Parse header and data
header = lines[0].split()
data = lines[1].split()
if len(header) != len(data):
return {"error": "header/data mismatch"}
stats = {}
for i, field in enumerate(header):
if i < len(data):
if field in ['1G-blocks', 'Used', 'Available']:
# Convert to GB
value = data[i]
if value.endswith('G'):
stats[field.lower()] = float(value.rstrip('G'))
else:
stats[field.lower()] = float(value) / (1024**3) # Assume bytes
elif field == 'Use%':
stats['use_percent'] = int(data[i].rstrip('%'))
else:
stats[field.lower()] = data[i]
return stats
def get_timestamp(self, system: str) -> str:
"""Get current timestamp from target system."""
try:
result = subprocess.run(
["ssh", system, "date -Iseconds"],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
return result.stdout.strip()
else:
return "unknown"
except Exception:
return "unknown"
def collect(system: str) -> Dict[str, Any]:
"""Main collection function for mounts data."""
collector = MountsCollector()
return collector.collect_mounts(system)
@@ -0,0 +1,223 @@
"""
Services Data Collector
Collects system service information including running services,
their states, startup configuration, and dependencies.
"""
import logging
import shlex
import subprocess
from typing import Dict, Any, List
logger = logging.getLogger(__name__)
class ServicesCollector:
"""Collector for system service information."""
def __init__(self):
self.service_manager = "systemd" # Default to systemd
self.include_disabled = False
def collect_services(self, system: str) -> Dict[str, Any]:
"""Collect service information from target system."""
logger.info(f"Collecting services data from {system}")
try:
# Detect service manager
service_manager = self.detect_service_manager(system)
if service_manager == "systemd":
services = self.collect_systemd_services(system)
elif service_manager == "sysv":
services = self.collect_sysv_services(system)
else:
raise RuntimeError(f"Unsupported service manager: {service_manager}")
return {
"service_manager": service_manager,
"services": services,
"timestamp": self.get_timestamp(system)
}
except Exception as e:
logger.error(f"Failed to collect services from {system}: {e}")
raise
def detect_service_manager(self, system: str) -> str:
"""Detect which service manager is running on the system."""
try:
# Check for systemd
result = subprocess.run(
["ssh", system, "ps -p 1 -o comm="],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
if "systemd" in result.stdout.strip():
return "systemd"
elif "init" in result.stdout.strip():
return "sysv"
# Fallback check
result = subprocess.run(
["ssh", system, "which systemctl"],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
return "systemd"
return "sysv"
except Exception:
return "unknown"
def collect_systemd_services(self, system: str) -> List[Dict[str, Any]]:
"""Collect systemd service information."""
services = []
try:
# Get all services
result = subprocess.run(
["ssh", system, "systemctl list-units --type=service --all --no-pager --no-legend"],
capture_output=True,
text=True,
timeout=30
)
if result.returncode != 0:
raise RuntimeError(f"systemctl list-units failed: {result.stderr}")
# Parse service list
for line in result.stdout.strip().split('\n'):
if not line.strip():
continue
parts = line.split()
if len(parts) >= 4:
service_name = parts[0]
load_state = parts[1]
active_state = parts[2]
sub_state = parts[3]
# Skip if disabled and not including disabled
if not self.include_disabled and load_state == "not-found":
continue
# Get detailed service info
service_info = self.get_systemd_service_details(system, service_name)
services.append({
"name": service_name,
"load_state": load_state,
"active_state": active_state,
"sub_state": sub_state,
**service_info
})
except subprocess.TimeoutExpired:
logger.error(f"Timeout collecting systemd services from {system}")
raise
except Exception as e:
logger.error(f"Failed to collect systemd services from {system}: {e}")
raise
return services
def get_systemd_service_details(self, system: str, service_name: str) -> Dict[str, Any]:
"""Get detailed information for a systemd service."""
details = {}
try:
# Get service status
result = subprocess.run(
["ssh", system, f"systemctl show {shlex.quote(service_name)} --no-pager"],
capture_output=True,
text=True,
timeout=15
)
if result.returncode == 0:
for line in result.stdout.strip().split('\n'):
if '=' in line:
key, value = line.split('=', 1)
details[key.lower()] = value
except Exception as e:
logger.warning(f"Failed to get details for {service_name}: {e}")
return details
def collect_sysv_services(self, system: str) -> List[Dict[str, Any]]:
"""Collect SysV init service information."""
services = []
try:
# Get service list from /etc/init.d/
result = subprocess.run(
["ssh", system, "ls -1 /etc/init.d/"],
capture_output=True,
text=True,
timeout=15
)
if result.returncode != 0:
raise RuntimeError(f"Failed to list init.d services: {result.stderr}")
for service_name in result.stdout.strip().split('\n'):
if not service_name.strip():
continue
# Get service status
status_result = subprocess.run(
["ssh", system, f"/etc/init.d/{shlex.quote(service_name)} status"],
capture_output=True,
text=True,
timeout=10
)
status = "unknown"
if status_result.returncode == 0:
status = "running"
elif "not running" in status_result.stdout.lower():
status = "stopped"
services.append({
"name": service_name,
"status": status,
"type": "sysv"
})
except Exception as e:
logger.error(f"Failed to collect SysV services from {system}: {e}")
raise
return services
def get_timestamp(self, system: str) -> str:
"""Get current timestamp from target system."""
try:
result = subprocess.run(
["ssh", system, "date -Iseconds"],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
return result.stdout.strip()
else:
return "unknown"
except Exception:
return "unknown"
def collect(system: str) -> Dict[str, Any]:
"""Main collection function for services data."""
collector = ServicesCollector()
return collector.collect_services(system)
@@ -0,0 +1,30 @@
# Migration Validation Framework Architecture
## Components
- CLI: parses operator commands and coordinates workflows.
- Collectors: gather mounts, services, and disk usage from target systems.
- Snapshot files: JSON evidence used as immutable migration checkpoints.
- Comparator: evaluates drift between before and after snapshots.
- Reports: stores JSON or HTML output for audit and review.
## Data Flow
```
Operator
-> python3 cli.py collect
-> collectors over SSH
-> before.json / after.json
-> python3 cli.py compare
-> diff.json with PASS/FAIL validation
```
## Validation Flow
```
before.json -> Comparator -> service checks
after.json -> Comparator -> filesystem checks -> validation result
-> mount checks
```
The framework keeps collection and comparison separate so migration evidence can be reviewed, archived, and replayed without recollecting from production systems.
@@ -0,0 +1,40 @@
{
"metadata": {
"timestamp": "2026-04-29T03:40:00Z",
"systems": ["web01"],
"version": "1.0"
},
"data": {
"web01": {
"mounts": {
"mounts": [
{"device": "/dev/sda1", "mountpoint": "/", "fstype": "ext4", "options": "rw,relatime"},
{"device": "/dev/sdb1", "mountpoint": "/var", "fstype": "xfs", "options": "rw,noatime"}
],
"usage": {
"/": {"filesystem": "/dev/sda1", "use_percent": "62%"},
"/var": {"filesystem": "/dev/sdb1", "use_percent": "94%"}
},
"timestamp": "2026-04-29T03:40:00Z"
},
"services": {
"service_manager": "systemd",
"services": [
{"name": "sshd", "active_state": "failed", "sub_state": "failed"},
{"name": "nginx", "active_state": "active", "sub_state": "running"},
{"name": "node-exporter", "active_state": "active", "sub_state": "running"}
],
"timestamp": "2026-04-29T03:40:00Z"
},
"disk_usage": {
"filesystem_usage": [
{"filesystem": "/dev/sda1", "type": "ext4", "size": "80G", "used": "50G", "available": "30G", "use_percent": "62%", "mountpoint": "/"},
{"filesystem": "/dev/sdb1", "type": "xfs", "size": "200G", "used": "188G", "available": "12G", "use_percent": "94%", "mountpoint": "/var"}
],
"directory_sizes": [{"path": "/var/lib/app", "size": "139G"}],
"largest_files": [{"path": "/var/lib/app/import/archive.tar", "size": "42G"}],
"timestamp": "2026-04-29T03:40:00Z"
}
}
}
}
@@ -0,0 +1,39 @@
{
"metadata": {
"timestamp": "2026-04-29T01:15:00Z",
"systems": ["web01"],
"version": "1.0"
},
"data": {
"web01": {
"mounts": {
"mounts": [
{"device": "/dev/sda1", "mountpoint": "/", "fstype": "ext4", "options": "rw,relatime"},
{"device": "/dev/sdb1", "mountpoint": "/var", "fstype": "xfs", "options": "rw,noatime"}
],
"usage": {
"/": {"filesystem": "/dev/sda1", "use_percent": "61%"},
"/var": {"filesystem": "/dev/sdb1", "use_percent": "68%"}
},
"timestamp": "2026-04-29T01:15:00Z"
},
"services": {
"service_manager": "systemd",
"services": [
{"name": "sshd", "active_state": "active", "sub_state": "running"},
{"name": "nginx", "active_state": "active", "sub_state": "running"}
],
"timestamp": "2026-04-29T01:15:00Z"
},
"disk_usage": {
"filesystem_usage": [
{"filesystem": "/dev/sda1", "type": "ext4", "size": "80G", "used": "49G", "available": "31G", "use_percent": "61%", "mountpoint": "/"},
{"filesystem": "/dev/sdb1", "type": "xfs", "size": "200G", "used": "136G", "available": "64G", "use_percent": "68%", "mountpoint": "/var"}
],
"directory_sizes": [{"path": "/var/lib/app", "size": "84G"}],
"largest_files": [],
"timestamp": "2026-04-29T01:15:00Z"
}
}
}
}
@@ -0,0 +1,211 @@
{
"summary": {
"total_systems": 1,
"systems_with_changes": 1,
"total_changes": 7,
"changes_by_type": {
"mounts": 2,
"services": 2,
"disk_usage": 3
},
"most_affected_systems": [
[
"web01",
7
]
]
},
"differences": {
"mounts": {
"web01": {
"added_mounts": [],
"removed_mounts": [],
"changed_mounts": [],
"usage_changes": [
{
"mountpoint": "/",
"before": {
"filesystem": "/dev/sda1",
"use_percent": "61%"
},
"after": {
"filesystem": "/dev/sda1",
"use_percent": "62%"
}
},
{
"mountpoint": "/var",
"before": {
"filesystem": "/dev/sdb1",
"use_percent": "68%"
},
"after": {
"filesystem": "/dev/sdb1",
"use_percent": "94%"
}
}
]
}
},
"services": {
"web01": {
"added_services": [
{
"name": "node-exporter",
"active_state": "active",
"sub_state": "running"
}
],
"removed_services": [],
"status_changes": [
{
"name": "sshd",
"before": {
"active_state": "active",
"sub_state": "running"
},
"after": {
"active_state": "failed",
"sub_state": "failed"
}
}
],
"configuration_changes": []
}
},
"disk_usage": {
"web01": {
"filesystem_changes": [
{
"mountpoint": "/",
"before": {
"filesystem": "/dev/sda1",
"type": "ext4",
"size": "80G",
"used": "49G",
"available": "31G",
"use_percent": "61%",
"mountpoint": "/"
},
"after": {
"filesystem": "/dev/sda1",
"type": "ext4",
"size": "80G",
"used": "50G",
"available": "30G",
"use_percent": "62%",
"mountpoint": "/"
}
},
{
"mountpoint": "/var",
"before": {
"filesystem": "/dev/sdb1",
"type": "xfs",
"size": "200G",
"used": "136G",
"available": "64G",
"use_percent": "68%",
"mountpoint": "/var"
},
"after": {
"filesystem": "/dev/sdb1",
"type": "xfs",
"size": "200G",
"used": "188G",
"available": "12G",
"use_percent": "94%",
"mountpoint": "/var"
}
}
],
"directory_size_changes": [],
"significant_usage_changes": [
{
"mountpoint": "/var",
"change_percent": 26,
"before": {
"filesystem": "/dev/sdb1",
"type": "xfs",
"size": "200G",
"used": "136G",
"available": "64G",
"use_percent": "68%",
"mountpoint": "/var"
},
"after": {
"filesystem": "/dev/sdb1",
"type": "xfs",
"size": "200G",
"used": "188G",
"available": "12G",
"use_percent": "94%",
"mountpoint": "/var"
}
}
]
}
}
},
"risk_assessment": {
"overall_risk": "high",
"risk_factors": [
{
"type": "service_failure",
"description": "Service failed: sshd",
"level": 3
},
{
"type": "disk_usage_spike",
"description": "Significant disk usage change: /var (26%)",
"level": 2
}
],
"critical_changes": [],
"recommendations": [
"Immediate review required - critical changes detected",
"Consider rolling back migration if critical services are affected"
]
},
"validation_results": {
"passed": false,
"checks": [
{
"name": "critical_services_running",
"description": "Verify critical services remain operational",
"passed": false,
"details": [
"Critical service sshd failed on web01"
]
},
{
"name": "filesystem_integrity",
"description": "Verify filesystem integrity maintained",
"passed": true,
"details": []
},
{
"name": "no_critical_mounts_removed",
"description": "Verify critical mount points remain",
"passed": true,
"details": []
}
],
"failed_checks": [
{
"name": "critical_services_running",
"description": "Verify critical services remain operational",
"passed": false,
"details": [
"Critical service sshd failed on web01"
]
}
],
"result": "FAIL"
},
"metadata": {
"before": "migration-validation-framework/examples/before.json",
"after": "migration-validation-framework/examples/after.json",
"timestamp": "2026-04-29T23:29:07.510774"
}
}
@@ -0,0 +1,19 @@
# Scenario: Before/After Migration Comparison
## Description
Compare a pre-cutover host snapshot against a post-cutover snapshot and determine whether the migrated system is ready for production traffic.
## Commands
```bash
cd professional-infra/migration-validation-framework
python3 cli.py compare examples/before.json examples/after.json --output /tmp/migration-diff.json
```
## Expected Result
- The command writes a JSON diff.
- The result is `FAIL` because `sshd` is failed after migration.
- The risk assessment highlights the `/var` disk usage increase.
- The remediation path is to restore SSH and reduce or expand `/var` before approving cutover.
@@ -0,0 +1,67 @@
import json
import unittest
from pathlib import Path
from collectors.mounts import MountsCollector
from reports.html_report import HTMLReportGenerator
from validators.compare import compare_snapshots
PROJECT_ROOT = Path(__file__).resolve().parents[1]
class ComparatorExampleTests(unittest.TestCase):
def test_example_comparison_detects_expected_failure(self):
before = json.loads((PROJECT_ROOT / "examples" / "before.json").read_text())
after = json.loads((PROJECT_ROOT / "examples" / "after.json").read_text())
comparison = compare_snapshots(before, after)
self.assertFalse(comparison["validation_results"]["passed"])
self.assertEqual(comparison["validation_results"]["result"], "FAIL")
self.assertGreater(comparison["summary"]["total_changes"], 0)
class HtmlReportTests(unittest.TestCase):
def test_report_escapes_untrusted_snapshot_content(self):
report = HTMLReportGenerator().build_html_content({
"metadata": {"comparison_id": "<script>alert(1)</script>"},
"summary": {
"total_systems": 1,
"systems_with_changes": 1,
"total_changes": 1,
"changes_by_type": {"services": 1},
"most_affected_systems": [("<img src=x onerror=alert(1)>", 1)],
},
"differences": {},
"risk_assessment": {"overall_risk": "low", "risk_factors": [], "critical_changes": [], "recommendations": ["Review <b>change</b>"]},
"validation_results": {"passed": True, "checks": []},
})
self.assertNotIn("<script>alert(1)</script>", report)
self.assertNotIn("<img src=x onerror=alert(1)>", report)
self.assertIn("&lt;script&gt;alert(1)&lt;/script&gt;", report)
self.assertIn("&lt;b&gt;change&lt;/b&gt;", report)
class CollectorParserTests(unittest.TestCase):
def test_mount_parser_handles_standard_mount_output(self):
output = "/dev/sda1 on / type ext4 (rw,relatime)\nproc on /proc type proc (rw,nosuid,nodev,noexec,relatime)\n"
mounts = MountsCollector().parse_mount_output(output)
self.assertEqual(mounts[0]["device"], "/dev/sda1")
self.assertEqual(mounts[0]["mountpoint"], "/")
self.assertEqual(mounts[0]["fstype"], "ext4")
def test_df_parser_handles_gigabyte_output(self):
output = "Filesystem 1G-blocks Used Available Use% Mounted\n/dev/sda1 100G 45G 55G 45% /\n"
stats = MountsCollector().parse_df_output(output)
self.assertEqual(stats["1g-blocks"], 100.0)
self.assertEqual(stats["used"], 45.0)
self.assertEqual(stats["available"], 55.0)
self.assertEqual(stats["use_percent"], 45)
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,501 @@
"""
Snapshot Comparison Engine
Compares two system snapshots and identifies differences,
risk levels, and validation results.
"""
import json
import logging
from typing import Dict, Any, List, Tuple
from datetime import datetime
logger = logging.getLogger(__name__)
class SnapshotComparator:
"""Engine for comparing system snapshots."""
def __init__(self):
self.risk_levels = {
"low": 1,
"medium": 2,
"high": 3,
"critical": 4
}
def compare_snapshots(self, snapshot1: Dict[str, Any], snapshot2: Dict[str, Any]) -> Dict[str, Any]:
"""Compare two snapshots and return detailed comparison results."""
logger.info("Starting snapshot comparison")
comparison = {
"summary": {},
"differences": {},
"risk_assessment": {},
"validation_results": {}
}
# Compare each data type
data_types = ["mounts", "services", "disk_usage"]
data1 = snapshot1.get("data", {})
data2 = snapshot2.get("data", {})
for data_type in data_types:
if self.data_type_exists(data1, data_type) or self.data_type_exists(data2, data_type):
differences = self.compare_data_type(data1, data2, data_type)
comparison["differences"][data_type] = differences
# Generate summary
comparison["summary"] = self.generate_summary(comparison["differences"])
# Risk assessment
comparison["risk_assessment"] = self.assess_risks(comparison["differences"])
# Validation results
comparison["validation_results"] = self.validate_changes(comparison["differences"])
comparison["validation_results"]["result"] = "PASS" if comparison["validation_results"]["passed"] else "FAIL"
logger.info("Snapshot comparison completed")
return comparison
def data_type_exists(self, systems: Dict[str, Any], data_type: str) -> bool:
"""Return true when at least one system has the requested collector data."""
return any(data_type in system_data for system_data in systems.values())
def compare_data_type(self, data1: Dict[str, Any], data2: Dict[str, Any], data_type: str) -> Dict[str, Any]:
"""Compare a specific data type between two snapshots."""
differences = {}
# Get all systems from both snapshots
systems1 = set(data1.keys())
systems2 = set(data2.keys())
all_systems = systems1.union(systems2)
for system in all_systems:
system_diffs = {}
if system not in data1:
system_diffs["status"] = "added"
system_diffs["details"] = {"new_system": True}
elif system not in data2:
system_diffs["status"] = "removed"
system_diffs["details"] = {"removed_system": True}
else:
# Compare data for this system and data type
if data_type in data1[system] and data_type in data2[system]:
system_diffs = self.compare_system_data(
data1[system][data_type],
data2[system][data_type],
data_type
)
else:
system_diffs["status"] = "data_missing"
system_diffs["details"] = {"missing_data_type": data_type}
if system_diffs:
differences[system] = system_diffs
return differences
def compare_system_data(self, data1: Dict[str, Any], data2: Dict[str, Any], data_type: str) -> Dict[str, Any]:
"""Compare data for a specific system and data type."""
differences = {}
if data_type == "mounts":
differences = self.compare_mounts(data1, data2)
elif data_type == "services":
differences = self.compare_services(data1, data2)
elif data_type == "disk_usage":
differences = self.compare_disk_usage(data1, data2)
else:
differences["status"] = "unknown_data_type"
return differences
def compare_mounts(self, mounts1: Dict[str, Any], mounts2: Dict[str, Any]) -> Dict[str, Any]:
"""Compare mounts data between snapshots."""
differences = {
"added_mounts": [],
"removed_mounts": [],
"changed_mounts": [],
"usage_changes": []
}
# Compare mount lists
mounts_list1 = mounts1.get("mounts", [])
mounts_list2 = mounts2.get("mounts", [])
# Create mountpoint maps
mounts_map1 = {m["mountpoint"]: m for m in mounts_list1}
mounts_map2 = {m["mountpoint"]: m for m in mounts_list2}
# Find added and removed mounts
added = set(mounts_map2.keys()) - set(mounts_map1.keys())
removed = set(mounts_map1.keys()) - set(mounts_map2.keys())
differences["added_mounts"] = [{"mountpoint": mp, **mounts_map2[mp]} for mp in added]
differences["removed_mounts"] = [{"mountpoint": mp, **mounts_map1[mp]} for mp in removed]
# Find changed mounts
common = set(mounts_map1.keys()) & set(mounts_map2.keys())
for mp in common:
m1, m2 = mounts_map1[mp], mounts_map2[mp]
if m1 != m2:
differences["changed_mounts"].append({
"mountpoint": mp,
"before": m1,
"after": m2
})
# Compare usage statistics
usage1 = mounts1.get("usage", {})
usage2 = mounts2.get("usage", {})
for mp in set(usage1.keys()) | set(usage2.keys()):
if mp in usage1 and mp in usage2:
u1, u2 = usage1[mp], usage2[mp]
if u1 != u2:
differences["usage_changes"].append({
"mountpoint": mp,
"before": u1,
"after": u2
})
return differences
def compare_services(self, services1: Dict[str, Any], services2: Dict[str, Any]) -> Dict[str, Any]:
"""Compare services data between snapshots."""
differences = {
"added_services": [],
"removed_services": [],
"status_changes": [],
"configuration_changes": []
}
# Compare service lists
services_list1 = services1.get("services", [])
services_list2 = services2.get("services", [])
# Create service maps
services_map1 = {s["name"]: s for s in services_list1}
services_map2 = {s["name"]: s for s in services_list2}
# Find added and removed services
added = set(services_map2.keys()) - set(services_map1.keys())
removed = set(services_map1.keys()) - set(services_map2.keys())
differences["added_services"] = [{"name": name, **services_map2[name]} for name in added]
differences["removed_services"] = [{"name": name, **services_map1[name]} for name in removed]
# Find status changes
common = set(services_map1.keys()) & set(services_map2.keys())
for name in common:
s1, s2 = services_map1[name], services_map2[name]
if s1.get("active_state") != s2.get("active_state") or s1.get("sub_state") != s2.get("sub_state"):
differences["status_changes"].append({
"name": name,
"before": {"active_state": s1.get("active_state"), "sub_state": s1.get("sub_state")},
"after": {"active_state": s2.get("active_state"), "sub_state": s2.get("sub_state")}
})
return differences
def compare_disk_usage(self, usage1: Dict[str, Any], usage2: Dict[str, Any]) -> Dict[str, Any]:
"""Compare disk usage data between snapshots."""
differences = {
"filesystem_changes": [],
"directory_size_changes": [],
"significant_usage_changes": []
}
# Compare filesystem usage
fs1 = usage1.get("filesystem_usage", [])
fs2 = usage2.get("filesystem_usage", [])
# Create filesystem maps by mountpoint
fs_map1 = {fs["mountpoint"]: fs for fs in fs1}
fs_map2 = {fs["mountpoint"]: fs for fs in fs2}
common_fs = set(fs_map1.keys()) & set(fs_map2.keys())
for mp in common_fs:
f1, f2 = fs_map1[mp], fs_map2[mp]
if f1 != f2:
differences["filesystem_changes"].append({
"mountpoint": mp,
"before": f1,
"after": f2
})
# Check for significant usage changes
try:
use1 = int(f1.get("use_percent", "0").rstrip("%"))
use2 = int(f2.get("use_percent", "0").rstrip("%"))
if abs(use2 - use1) > 10: # 10% change threshold
differences["significant_usage_changes"].append({
"mountpoint": mp,
"change_percent": use2 - use1,
"before": f1,
"after": f2
})
except (ValueError, KeyError):
pass
return differences
def generate_summary(self, differences: Dict[str, Any]) -> Dict[str, Any]:
"""Generate a summary of all differences."""
summary = {
"total_systems": 0,
"systems_with_changes": 0,
"total_changes": 0,
"changes_by_type": {},
"most_affected_systems": []
}
system_change_counts = {}
for data_type, systems in differences.items():
summary["changes_by_type"][data_type] = 0
for system, system_diffs in systems.items():
if system not in system_change_counts:
system_change_counts[system] = 0
# Count changes for this system and data type
change_count = self.count_changes(system_diffs)
system_change_counts[system] += change_count
summary["changes_by_type"][data_type] += change_count
summary["total_changes"] += change_count
summary["total_systems"] = len(system_change_counts)
# Count systems with changes
summary["systems_with_changes"] = len([s for s in system_change_counts.values() if s > 0])
# Find most affected systems
sorted_systems = sorted(system_change_counts.items(), key=lambda x: x[1], reverse=True)
summary["most_affected_systems"] = sorted_systems[:5]
return summary
def count_changes(self, system_diffs: Dict[str, Any]) -> int:
"""Count the number of changes in system differences."""
count = 0
for key, value in system_diffs.items():
if isinstance(value, list):
count += len(value)
elif isinstance(value, dict) and key not in ["status"]:
# Count nested changes
count += sum(1 for v in value.values() if isinstance(v, list) and v)
return count
def assess_risks(self, differences: Dict[str, Any]) -> Dict[str, Any]:
"""Assess risk levels for the changes."""
risk_assessment = {
"overall_risk": "low",
"risk_factors": [],
"critical_changes": [],
"recommendations": []
}
max_risk_level = 1
# Analyze each type of change
for data_type, systems in differences.items():
for system, system_diffs in systems.items():
risk_factors = self.analyze_system_risks(system_diffs, data_type)
risk_assessment["risk_factors"].extend(risk_factors)
for factor in risk_factors:
if factor["level"] > max_risk_level:
max_risk_level = factor["level"]
if factor["level"] >= 4: # Critical
risk_assessment["critical_changes"].append({
"system": system,
"data_type": data_type,
"factor": factor
})
# Set overall risk
risk_levels = {1: "low", 2: "medium", 3: "high", 4: "critical"}
risk_assessment["overall_risk"] = risk_levels.get(max_risk_level, "unknown")
# Generate recommendations
risk_assessment["recommendations"] = self.generate_recommendations(risk_assessment)
return risk_assessment
def analyze_system_risks(self, system_diffs: Dict[str, Any], data_type: str) -> List[Dict[str, Any]]:
"""Analyze risks for a specific system's changes."""
risk_factors = []
if data_type == "mounts":
# Check for removed critical mounts
for mount in system_diffs.get("removed_mounts", []):
if mount["mountpoint"] in ["/", "/boot", "/usr", "/var"]:
risk_factors.append({
"type": "critical_mount_removed",
"description": f"Critical mount point removed: {mount['mountpoint']}",
"level": 4
})
# Check for significant usage changes
for change in system_diffs.get("usage_changes", []):
try:
before_pct = int(change["before"].get("use_percent", "0").rstrip("%"))
after_pct = int(change["after"].get("use_percent", "0").rstrip("%"))
if after_pct > 95:
risk_factors.append({
"type": "filesystem_full",
"description": f"Filesystem usage critical: {change['mountpoint']} at {after_pct}%",
"level": 3
})
except (ValueError, KeyError):
pass
elif data_type == "services":
# Check for critical service changes
critical_services = ["sshd", "systemd", "networking", "dbus"]
for service in system_diffs.get("removed_services", []):
if service["name"] in critical_services:
risk_factors.append({
"type": "critical_service_removed",
"description": f"Critical service removed: {service['name']}",
"level": 4
})
for change in system_diffs.get("status_changes", []):
if change["after"]["active_state"] == "failed":
risk_factors.append({
"type": "service_failure",
"description": f"Service failed: {change['name']}",
"level": 3
})
elif data_type == "disk_usage":
for change in system_diffs.get("significant_usage_changes", []):
if change["change_percent"] > 20:
risk_factors.append({
"type": "disk_usage_spike",
"description": f"Significant disk usage change: {change['mountpoint']} ({change['change_percent']}%)",
"level": 2
})
return risk_factors
def generate_recommendations(self, risk_assessment: Dict[str, Any]) -> List[str]:
"""Generate recommendations based on risk assessment."""
recommendations = []
if risk_assessment["overall_risk"] in ["high", "critical"]:
recommendations.append("Immediate review required - critical changes detected")
recommendations.append("Consider rolling back migration if critical services are affected")
if any(f["type"] == "critical_mount_removed" for f in risk_assessment["risk_factors"]):
recommendations.append("Verify system boot capability after mount changes")
if any(f["type"] == "critical_service_removed" for f in risk_assessment["risk_factors"]):
recommendations.append("Ensure critical services are restored before production cutover")
if any(f["type"] == "filesystem_full" for f in risk_assessment["risk_factors"]):
recommendations.append("Monitor disk space closely - cleanup may be required")
if not recommendations:
recommendations.append("Changes appear safe - proceed with standard validation procedures")
return recommendations
def validate_changes(self, differences: Dict[str, Any]) -> Dict[str, Any]:
"""Validate that changes meet requirements."""
validation_results = {
"passed": True,
"checks": [],
"failed_checks": []
}
# Define validation checks
checks = [
self.check_critical_services_running,
self.check_filesystem_integrity,
self.check_no_critical_mounts_removed
]
for check_func in checks:
check_result = check_func(differences)
validation_results["checks"].append(check_result)
if not check_result["passed"]:
validation_results["passed"] = False
validation_results["failed_checks"].append(check_result)
return validation_results
def check_critical_services_running(self, differences: Dict[str, Any]) -> Dict[str, Any]:
"""Check that critical services are still running."""
check = {
"name": "critical_services_running",
"description": "Verify critical services remain operational",
"passed": True,
"details": []
}
critical_services = ["sshd", "systemd"]
for data_type, systems in differences.items():
if data_type == "services":
for system, system_diffs in systems.items():
for change in system_diffs.get("status_changes", []):
if change["name"] in critical_services:
if change["after"]["active_state"] == "failed":
check["passed"] = False
check["details"].append(f"Critical service {change['name']} failed on {system}")
return check
def check_filesystem_integrity(self, differences: Dict[str, Any]) -> Dict[str, Any]:
"""Check filesystem integrity after changes."""
check = {
"name": "filesystem_integrity",
"description": "Verify filesystem integrity maintained",
"passed": True,
"details": []
}
for data_type, systems in differences.items():
if data_type == "disk_usage":
for system, system_diffs in systems.items():
for change in system_diffs.get("significant_usage_changes", []):
if change["change_percent"] > 50: # Arbitrary threshold
check["passed"] = False
check["details"].append(f"Extreme usage change on {system}:{change['mountpoint']}")
return check
def check_no_critical_mounts_removed(self, differences: Dict[str, Any]) -> Dict[str, Any]:
"""Check that no critical mount points were removed."""
check = {
"name": "no_critical_mounts_removed",
"description": "Verify critical mount points remain",
"passed": True,
"details": []
}
critical_mounts = ["/", "/boot", "/usr", "/var"]
for data_type, systems in differences.items():
if data_type == "mounts":
for system, system_diffs in systems.items():
for mount in system_diffs.get("removed_mounts", []):
if mount["mountpoint"] in critical_mounts:
check["passed"] = False
check["details"].append(f"Critical mount {mount['mountpoint']} removed from {system}")
return check
def compare_snapshots(snapshot1: Dict[str, Any], snapshot2: Dict[str, Any]) -> Dict[str, Any]:
"""Main comparison function."""
comparator = SnapshotComparator()
return comparator.compare_snapshots(snapshot1, snapshot2)