#!/usr/bin/env python3
"""
Migration Validation Framework - CLI Interface

A comprehensive tool for validating system migrations through data
collection, snapshot comparison, and automated reporting.
"""

import argparse
import json
import logging
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

# Import framework modules
from collectors import mounts, services, disk_usage
from validators import compare
from reports import html_report

# Configuration: directories for on-disk artifacts, relative to the CWD.
SNAPSHOTS_DIR = Path("snapshots")
LOGS_DIR = Path("logs")
REPORTS_DIR = Path("reports")


class MigrationValidator:
    """Main migration validation class.

    Orchestrates data collection from target systems, snapshot persistence,
    snapshot comparison, and report generation.
    """

    def __init__(self, verbose: bool = False):
        self.verbose = verbose
        # BUG FIX: directories must exist before setup_logging() attaches a
        # FileHandler to LOGS_DIR / "validation.log". The original call order
        # (setup_logging first) raised FileNotFoundError on a fresh checkout
        # where logs/ did not exist yet.
        self.ensure_directories()
        self.setup_logging()

    def setup_logging(self) -> None:
        """Configure logging to both the validation log file and stdout."""
        log_level = logging.DEBUG if self.verbose else logging.INFO
        logging.basicConfig(
            level=log_level,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(LOGS_DIR / "validation.log"),
                logging.StreamHandler(sys.stdout)
            ]
        )
        self.logger = logging.getLogger(__name__)

    def ensure_directories(self) -> None:
        """Ensure required artifact directories exist."""
        for directory in [SNAPSHOTS_DIR, LOGS_DIR, REPORTS_DIR]:
            # parents=True so a missing intermediate path component is not fatal.
            directory.mkdir(parents=True, exist_ok=True)

    def collect_system_data(self, systems: List[str]) -> Dict[str, Any]:
        """Collect data from target systems.

        Runs every registered collector against every system. A collector
        failure is recorded in the snapshot as {"error": ...} rather than
        aborting the whole run (deliberate best-effort behavior).

        Args:
            systems: Hostnames/identifiers of the systems to collect from.

        Returns:
            Snapshot dict with "metadata" and per-system "data" sections.
        """
        self.logger.info(f"Collecting data from systems: {systems}")
        snapshot: Dict[str, Any] = {
            "metadata": {
                "timestamp": datetime.now().isoformat(),
                "systems": systems,
                "version": "1.0"
            },
            "data": {}
        }
        collectors = [
            ("mounts", mounts.collect),
            ("services", services.collect),
            ("disk_usage", disk_usage.collect)
        ]
        for system in systems:
            self.logger.info(f"Collecting data from {system}")
            snapshot["data"][system] = {}
            for collector_name, collector_func in collectors:
                try:
                    self.logger.debug(f"Running {collector_name} collector on {system}")
                    data = collector_func(system)
                    snapshot["data"][system][collector_name] = data
                except Exception as e:
                    # Best-effort: log and embed the error so one bad collector
                    # does not lose the rest of the snapshot.
                    self.logger.error(f"Failed to collect {collector_name} from {system}: {e}")
                    snapshot["data"][system][collector_name] = {"error": str(e)}
        return snapshot

    def save_snapshot(self, snapshot: Dict[str, Any], label: str, env: str) -> str:
        """Save snapshot to disk.

        Returns:
            The generated snapshot ID ("<env>-<label>-<timestamp>").
        """
        snapshot_id = f"{env}-{label}-{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        snapshot_file = SNAPSHOTS_DIR / f"{snapshot_id}.json"
        with open(snapshot_file, 'w', encoding='utf-8') as f:
            json.dump(snapshot, f, indent=2)
        self.logger.info(f"Snapshot saved: {snapshot_id}")
        return snapshot_id

    def load_snapshot(self, snapshot_id: str) -> Dict[str, Any]:
        """Load snapshot from disk.

        Raises:
            FileNotFoundError: If no snapshot with that ID exists.
        """
        snapshot_file = SNAPSHOTS_DIR / f"{snapshot_id}.json"
        if not snapshot_file.exists():
            raise FileNotFoundError(f"Snapshot {snapshot_id} not found")
        with open(snapshot_file, 'r', encoding='utf-8') as f:
            return json.load(f)

    def create_snapshot(self, env: str, label: str, systems: List[str]) -> str:
        """Create and save a system snapshot; return its snapshot ID."""
        self.logger.info(f"Creating snapshot for environment: {env}, label: {label}")
        snapshot = self.collect_system_data(systems)
        snapshot_id = self.save_snapshot(snapshot, label, env)
        return snapshot_id

    def compare_snapshots(self, snapshot1_id: str, snapshot2_id: str,
                          output_id: str) -> Dict[str, Any]:
        """Compare two snapshots and persist the result as JSON.

        Args:
            snapshot1_id: ID of the baseline snapshot.
            snapshot2_id: ID of the snapshot to compare against the baseline.
            output_id: Identifier used to name the saved comparison file.

        Returns:
            The comparison dict produced by the compare validator, with a
            "metadata" section added.

        Raises:
            FileNotFoundError: If either snapshot ID does not exist.
        """
        self.logger.info(f"Comparing snapshots: {snapshot1_id} vs {snapshot2_id}")
        snapshot1 = self.load_snapshot(snapshot1_id)
        snapshot2 = self.load_snapshot(snapshot2_id)
        comparison = compare.compare_snapshots(snapshot1, snapshot2)
        comparison["metadata"] = {
            "snapshot1": snapshot1_id,
            "snapshot2": snapshot2_id,
            "timestamp": datetime.now().isoformat(),
            "comparison_id": output_id
        }
        # Save comparison results
        comparison_file = REPORTS_DIR / f"comparison_{output_id}.json"
        with open(comparison_file, 'w', encoding='utf-8') as f:
            json.dump(comparison, f, indent=2)
        self.logger.info(f"Comparison saved: {output_id}")
        return comparison

    def generate_report(self, comparison_id: str, format_type: str,
                        output_file: Optional[str] = None) -> str:
        """Generate a report from comparison results.

        Args:
            comparison_id: ID of a previously saved comparison.
            format_type: "html" or "json".
            output_file: Optional output path; a default name derived from
                the comparison ID is used when omitted.

        Returns:
            Path of the generated report file.

        Raises:
            FileNotFoundError: If the comparison does not exist.
            ValueError: If format_type is not supported.
        """
        self.logger.info(f"Generating {format_type} report for comparison: {comparison_id}")
        comparison_file = REPORTS_DIR / f"comparison_{comparison_id}.json"
        if not comparison_file.exists():
            raise FileNotFoundError(f"Comparison {comparison_id} not found")
        with open(comparison_file, 'r', encoding='utf-8') as f:
            comparison = json.load(f)
        if format_type == "html":
            if output_file is None:
                output_file = f"migration_report_{comparison_id}.html"
            html_report.generate(comparison, output_file)
        elif format_type == "json":
            if output_file is None:
                output_file = f"migration_report_{comparison_id}.json"
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(comparison, f, indent=2)
        else:
            raise ValueError(f"Unsupported format: {format_type}")
        self.logger.info(f"Report generated: {output_file}")
        return output_file


def main():
    """Main CLI entry point: parse arguments and dispatch to the validator."""
    parser = argparse.ArgumentParser(
        description="Migration Validation Framework",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Create pre-migration snapshot
  python cli.py snapshot --env production --label pre-migration --systems web01,db01

  # Compare snapshots
  python cli.py compare pre-migration-snapshot post-migration-snapshot --output comparison_001

  # Generate HTML report
  python cli.py report --comparison comparison_001 --format html
        """
    )
    parser.add_argument('--verbose', '-v', action='store_true',
                        help='Enable verbose logging')
    parser.add_argument('--dry-run', action='store_true',
                        help='Preview actions without execution')

    subparsers = parser.add_subparsers(dest='command', help='Available commands')

    # Snapshot command
    snapshot_parser = subparsers.add_parser('snapshot', help='Create system snapshot')
    snapshot_parser.add_argument('--env', required=True, help='Target environment')
    snapshot_parser.add_argument('--label', required=True, help='Snapshot label')
    snapshot_parser.add_argument('--systems', required=True,
                                 help='Comma-separated list of systems')

    # Compare command
    compare_parser = subparsers.add_parser('compare', help='Compare two snapshots')
    compare_parser.add_argument('snapshot1', help='First snapshot ID')
    compare_parser.add_argument('snapshot2', help='Second snapshot ID')
    compare_parser.add_argument('--output', required=True, help='Comparison output ID')

    # Report command
    report_parser = subparsers.add_parser('report', help='Generate report from comparison')
    report_parser.add_argument('--comparison', required=True, help='Comparison ID')
    report_parser.add_argument('--format', choices=['html', 'json'], default='html',
                               help='Report format')
    report_parser.add_argument('--output', help='Output file path')

    # List command
    list_parser = subparsers.add_parser('list', help='List snapshots or comparisons')
    list_parser.add_argument('type', choices=['snapshots', 'comparisons'],
                             help='Type to list')

    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        return

    # Initialize validator
    validator = MigrationValidator(verbose=args.verbose)

    try:
        if args.command == 'snapshot':
            systems = args.systems.split(',')
            if args.dry_run:
                print(f"DRY RUN: Would create snapshot for systems: {systems}")
                return
            snapshot_id = validator.create_snapshot(args.env, args.label, systems)
            print(f"Snapshot created: {snapshot_id}")

        elif args.command == 'compare':
            if args.dry_run:
                print(f"DRY RUN: Would compare {args.snapshot1} vs {args.snapshot2}")
                return
            # Return value is persisted by compare_snapshots; no need to keep it.
            validator.compare_snapshots(args.snapshot1, args.snapshot2, args.output)
            print(f"Comparison completed: {args.output}")

        elif args.command == 'report':
            if args.dry_run:
                print(f"DRY RUN: Would generate {args.format} report for {args.comparison}")
                return
            output_file = validator.generate_report(args.comparison, args.format, args.output)
            print(f"Report generated: {output_file}")

        elif args.command == 'list':
            if args.type == 'snapshots':
                snapshots = list(SNAPSHOTS_DIR.glob("*.json"))
                if snapshots:
                    print("Available snapshots:")
                    for snapshot in sorted(snapshots):
                        print(f"  {snapshot.stem}")
                else:
                    print("No snapshots found")
            elif args.type == 'comparisons':
                comparisons = list(REPORTS_DIR.glob("comparison_*.json"))
                if comparisons:
                    print("Available comparisons:")
                    for comparison in sorted(comparisons):
                        comp_id = comparison.stem.replace('comparison_', '')
                        print(f"  {comp_id}")
                else:
                    print("No comparisons found")

    except Exception as e:
        # Top-level boundary: log, report to stderr, exit non-zero.
        validator.logger.error(f"Command failed: {e}")
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()