#!/usr/bin/env python3
"""
Migration Validation Framework - CLI Interface
A comprehensive tool for validating system migrations through data collection,
snapshot comparison, and automated reporting.
"""
import argparse
import json
import logging
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any
# Import framework modules
from collectors import mounts, services, disk_usage
from validators import compare
from reports import html_report
# Configuration
# Working directories. The paths are relative, so they resolve against the
# current working directory of the process at the time they are used.
SNAPSHOTS_DIR = Path("snapshots")  # snapshot JSON files, named <snapshot_id>.json
LOGS_DIR = Path("logs")  # holds validation.log
REPORTS_DIR = Path("reports")  # comparison results, named comparison_<id>.json
class MigrationValidator:
    """Collect system snapshots, compare them, and produce migration reports.

    Snapshots are JSON documents stored under ``SNAPSHOTS_DIR`` (or written to
    an explicit path). Comparison results are stored under ``REPORTS_DIR`` as
    ``comparison_<id>.json``. Log records go to ``LOGS_DIR / "validation.log"``
    and to stdout.
    """

    def __init__(self, verbose: bool = False):
        """Create the working directories and configure logging.

        Args:
            verbose: Log at DEBUG level instead of INFO.
        """
        self.verbose = verbose
        # Directories first: the log file handler needs LOGS_DIR to exist.
        self.ensure_directories()
        self.setup_logging()

    def setup_logging(self):
        """Configure root logging (file + stdout) and bind ``self.logger``."""
        log_level = logging.DEBUG if self.verbose else logging.INFO
        logging.basicConfig(
            level=log_level,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                # Explicit encoding keeps the log readable regardless of locale.
                logging.FileHandler(LOGS_DIR / "validation.log", encoding="utf-8"),
                logging.StreamHandler(sys.stdout),
            ],
        )
        self.logger = logging.getLogger(__name__)

    def ensure_directories(self):
        """Ensure the snapshot, log, and report directories exist."""
        for directory in (SNAPSHOTS_DIR, LOGS_DIR, REPORTS_DIR):
            directory.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _write_json(path, payload: Any) -> None:
        """Write *payload* to *path* as indented, newline-terminated UTF-8 JSON."""
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(payload, f, indent=2)
            f.write("\n")

    @staticmethod
    def _read_json(path) -> Any:
        """Parse and return the UTF-8 JSON document stored at *path*."""
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def collect_system_data(self, systems: List[str]) -> Dict[str, Any]:
        """Run every collector against every system and return the snapshot.

        A collector failure does not abort the run: the exception is logged
        and recorded as ``{"error": "<message>"}`` in that collector's slot.

        Args:
            systems: Names of the systems passed to each collector.

        Returns:
            A dict with ``"metadata"`` (timestamp, systems, version) and
            ``"data"`` mapping ``system -> collector name -> collected data``.
        """
        self.logger.info(f"Collecting data from systems: {systems}")
        snapshot = {
            "metadata": {
                "timestamp": datetime.now().isoformat(),
                # Copy so later mutation of the caller's list cannot alter
                # the snapshot metadata.
                "systems": list(systems),
                "version": "1.0",
            },
            "data": {},
        }
        collectors = [
            ("mounts", mounts.collect),
            ("services", services.collect),
            ("disk_usage", disk_usage.collect),
        ]
        for system in systems:
            self.logger.info(f"Collecting data from {system}")
            system_data = snapshot["data"][system] = {}
            for collector_name, collector_func in collectors:
                try:
                    self.logger.debug(f"Running {collector_name} collector on {system}")
                    system_data[collector_name] = collector_func(system)
                except Exception as e:
                    # Deliberate best effort: keep collecting from the
                    # remaining collectors and systems.
                    self.logger.error(f"Failed to collect {collector_name} from {system}: {e}")
                    system_data[collector_name] = {"error": str(e)}
        return snapshot

    def save_snapshot(self, snapshot: Dict[str, Any], label: str, env: str) -> str:
        """Save *snapshot* under ``SNAPSHOTS_DIR`` and return its ID.

        The ID is ``<env>-<label>-<YYYYmmdd_HHMMSS>``. The timestamp has
        one-second resolution, so two saves with the same env and label
        within one second write to the same file.
        """
        snapshot_id = f"{env}-{label}-{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        self._write_json(SNAPSHOTS_DIR / f"{snapshot_id}.json", snapshot)
        self.logger.info(f"Snapshot saved: {snapshot_id}")
        return snapshot_id

    def load_snapshot(self, snapshot_id: str) -> Dict[str, Any]:
        """Load a snapshot given either a file path or a stored snapshot ID.

        Args:
            snapshot_id: Path to an existing JSON file, or an ID that is
                looked up as ``SNAPSHOTS_DIR / "<snapshot_id>.json"``.

        Raises:
            FileNotFoundError: Neither location holds a regular file.
        """
        candidate = Path(snapshot_id)
        # is_file() rather than exists(): a directory that happens to share
        # the name must not be passed to open().
        snapshot_file = candidate if candidate.is_file() else SNAPSHOTS_DIR / f"{snapshot_id}.json"
        if not snapshot_file.is_file():
            raise FileNotFoundError(f"Snapshot {snapshot_id} not found")
        return self._read_json(snapshot_file)

    def collect_to_file(self, output_file: str, systems: List[str]) -> str:
        """Collect a snapshot, write it to *output_file*, and return that path."""
        snapshot = self.collect_system_data(systems)
        self._write_json(output_file, snapshot)
        self.logger.info(f"Snapshot written: {output_file}")
        return output_file

    def create_snapshot(self, env: str, label: str, systems: List[str]) -> str:
        """Collect a snapshot, store it under ``SNAPSHOTS_DIR``, return its ID."""
        self.logger.info(f"Creating snapshot for environment: {env}, label: {label}")
        snapshot = self.collect_system_data(systems)
        return self.save_snapshot(snapshot, label, env)

    def compare_snapshots(self, snapshot1_id: str, snapshot2_id: str, output_id: str) -> Dict[str, Any]:
        """Compare two snapshots and store the result under *output_id*.

        The result of ``compare.compare_snapshots`` gets a ``"metadata"``
        entry and is written to ``REPORTS_DIR / "comparison_<output_id>.json"``.

        Args:
            snapshot1_id: ID or file path of the first snapshot.
            snapshot2_id: ID or file path of the second snapshot.
            output_id: Identifier used in the stored comparison's file name.

        Returns:
            The comparison dict, including the added metadata.

        Raises:
            FileNotFoundError: Either snapshot cannot be found.
        """
        self.logger.info(f"Comparing snapshots: {snapshot1_id} vs {snapshot2_id}")
        snapshot1 = self.load_snapshot(snapshot1_id)
        snapshot2 = self.load_snapshot(snapshot2_id)
        comparison = compare.compare_snapshots(snapshot1, snapshot2)
        comparison["metadata"] = {
            "snapshot1": snapshot1_id,
            "snapshot2": snapshot2_id,
            "timestamp": datetime.now().isoformat(),
            "comparison_id": output_id,
        }
        # Save comparison results where generate_report() looks for them.
        self._write_json(REPORTS_DIR / f"comparison_{output_id}.json", comparison)
        self.logger.info(f"Comparison saved: {output_id}")
        return comparison

    def compare_files(self, before_file: str, after_file: str, output_file: Optional[str] = None) -> Dict[str, Any]:
        """Compare two snapshot files, optionally writing the result to a file.

        Unlike ``compare_snapshots`` the result is not stored under
        ``REPORTS_DIR``; it is written only when *output_file* is given.

        Args:
            before_file: Path (or snapshot ID) of the baseline snapshot.
            after_file: Path (or snapshot ID) of the snapshot to check.
            output_file: Destination JSON path; nothing is written if omitted.

        Returns:
            The comparison dict, including the added metadata.

        Raises:
            FileNotFoundError: Either snapshot cannot be found.
        """
        self.logger.info(f"Comparing files: {before_file} vs {after_file}")
        before = self.load_snapshot(before_file)
        after = self.load_snapshot(after_file)
        comparison = compare.compare_snapshots(before, after)
        comparison["metadata"] = {
            "before": before_file,
            "after": after_file,
            "timestamp": datetime.now().isoformat(),
        }
        if output_file:
            self._write_json(output_file, comparison)
            self.logger.info(f"Comparison written: {output_file}")
        return comparison

    def generate_report(self, comparison_id: str, format_type: str, output_file: Optional[str] = None) -> str:
        """Generate a report from a stored comparison and return its path.

        Args:
            comparison_id: ID of a comparison stored by ``compare_snapshots``
                (read from ``REPORTS_DIR / "comparison_<id>.json"``).
            format_type: ``"html"`` (rendered by ``html_report.generate``) or
                ``"json"`` (the comparison re-serialized).
            output_file: Destination path. Defaults to
                ``migration_report_<id>.<format>`` in the working directory.

        Raises:
            FileNotFoundError: No stored comparison has that ID.
            ValueError: *format_type* is not supported.
        """
        self.logger.info(f"Generating {format_type} report for comparison: {comparison_id}")
        comparison_file = REPORTS_DIR / f"comparison_{comparison_id}.json"
        if not comparison_file.exists():
            raise FileNotFoundError(f"Comparison {comparison_id} not found")
        comparison = self._read_json(comparison_file)
        if format_type == "html":
            if output_file is None:
                output_file = f"migration_report_{comparison_id}.html"
            html_report.generate(comparison, output_file)
        elif format_type == "json":
            if output_file is None:
                output_file = f"migration_report_{comparison_id}.json"
            self._write_json(output_file, comparison)
        else:
            raise ValueError(f"Unsupported format: {format_type}")
        self.logger.info(f"Report generated: {output_file}")
        return output_file
def _parse_systems(raw: str) -> List[str]:
    """Split a comma-separated ``--systems`` value, dropping blanks and padding."""
    return [name.strip() for name in raw.split(',') if name.strip()]


def _dry_run_preview(args: argparse.Namespace, systems: List[str]) -> Optional[str]:
    """Return the dry-run message for *args*, or None if the command has none."""
    if args.command == 'collect':
        return f"DRY RUN: Would collect {systems} into {args.output}"
    if args.command == 'snapshot':
        return f"DRY RUN: Would create snapshot for systems: {systems}"
    if args.command == 'compare':
        return f"DRY RUN: Would compare {args.snapshot1} vs {args.snapshot2}"
    if args.command == 'report':
        return f"DRY RUN: Would generate {args.format} report for {args.comparison}"
    return None


def main():
    """Main CLI entry point.

    Parses ``sys.argv``, then runs one of the ``collect``, ``snapshot``,
    ``compare``, ``report`` or ``list`` commands. With no command the help
    text is printed. With ``--dry-run`` the planned action is printed and
    nothing is created on disk (``list`` is read-only and runs normally).
    A failing command is logged, reported on stderr, and exits with status 1.
    """
    parser = argparse.ArgumentParser(
        description="Migration Validation Framework",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Collect pre-migration snapshot
  python3 cli.py collect --output before.json --systems web01,db01

  # Compare snapshot files and write the result to an explicit file
  python3 cli.py compare before.json after.json --output diff.json

  # Compare and store the result under a comparison ID
  python3 cli.py compare before.json after.json --output 001

  # Generate HTML report for a stored comparison ID
  python3 cli.py report --comparison 001 --format html
"""
    )
    parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose logging')
    parser.add_argument('--dry-run', action='store_true', help='Preview actions without execution')
    subparsers = parser.add_subparsers(dest='command', help='Available commands')

    # Collect command
    collect_parser = subparsers.add_parser('collect', help='Collect a system snapshot to a JSON file')
    collect_parser.add_argument('--output', required=True, help='Output JSON file')
    collect_parser.add_argument('--systems', default='localhost', help='Comma-separated list of systems')

    # Snapshot command
    snapshot_parser = subparsers.add_parser('snapshot', help='Create system snapshot')
    snapshot_parser.add_argument('--env', required=True, help='Target environment')
    snapshot_parser.add_argument('--label', required=True, help='Snapshot label')
    snapshot_parser.add_argument('--systems', required=True, help='Comma-separated list of systems')

    # Compare command
    compare_parser = subparsers.add_parser('compare', help='Compare two snapshots')
    compare_parser.add_argument('snapshot1', help='First snapshot ID')
    compare_parser.add_argument('snapshot2', help='Second snapshot ID')
    compare_parser.add_argument('--output', help='Output comparison ID or JSON file')

    # Report command
    report_parser = subparsers.add_parser('report', help='Generate report from comparison')
    report_parser.add_argument('--comparison', required=True, help='Comparison ID')
    report_parser.add_argument('--format', choices=['html', 'json'], default='html', help='Report format')
    report_parser.add_argument('--output', help='Output file path')

    # List command
    list_parser = subparsers.add_parser('list', help='List snapshots or comparisons')
    list_parser.add_argument('type', choices=['snapshots', 'comparisons'], help='Type to list')

    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        return

    # Both collecting commands parse --systems the same way; an empty list
    # would only produce an empty, meaningless snapshot, so reject it early.
    systems: List[str] = []
    if args.command in ('collect', 'snapshot'):
        systems = _parse_systems(args.systems)
        if not systems:
            parser.error("--systems must name at least one system")

    # Handle dry runs before the validator exists: constructing it creates
    # directories and opens the log file, which a preview must not do.
    if args.dry_run:
        preview = _dry_run_preview(args, systems)
        if preview is not None:
            print(preview)
            return

    # Initialize validator
    validator = MigrationValidator(verbose=args.verbose)
    try:
        if args.command == 'collect':
            output_file = validator.collect_to_file(args.output, systems)
            print(f"Snapshot written: {output_file}")
        elif args.command == 'snapshot':
            snapshot_id = validator.create_snapshot(args.env, args.label, systems)
            print(f"Snapshot created: {snapshot_id}")
        elif args.command == 'compare':
            output = args.output
            if output and output.endswith('.json'):
                # A *.json target means "write to this explicit file".
                target = output
                comparison = validator.compare_files(args.snapshot1, args.snapshot2, output)
            else:
                # Otherwise the value is a comparison ID stored for 'report';
                # default to a timestamp when none was given.
                target = output or datetime.now().strftime('%Y%m%d_%H%M%S')
                comparison = validator.compare_snapshots(args.snapshot1, args.snapshot2, target)
            result = "PASS" if comparison.get("validation_results", {}).get("passed") else "FAIL"
            print(f"Comparison completed: {target} ({result})")
        elif args.command == 'report':
            output_file = validator.generate_report(args.comparison, args.format, args.output)
            print(f"Report generated: {output_file}")
        elif args.command == 'list':
            if args.type == 'snapshots':
                snapshots = sorted(SNAPSHOTS_DIR.glob("*.json"))
                if snapshots:
                    print("Available snapshots:")
                    for snapshot in snapshots:
                        print(f" {snapshot.stem}")
                else:
                    print("No snapshots found")
            elif args.type == 'comparisons':
                comparisons = sorted(REPORTS_DIR.glob("comparison_*.json"))
                if comparisons:
                    print("Available comparisons:")
                    prefix_len = len('comparison_')
                    for comparison in comparisons:
                        # Strip only the leading prefix; an ID may itself
                        # contain the text "comparison_".
                        print(f" {comparison.stem[prefix_len:]}")
                else:
                    print("No comparisons found")
    except Exception as e:
        # Top-level boundary: report the failure and exit non-zero.
        validator.logger.error(f"Command failed: {e}")
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()