feat: Add comprehensive enterprise Linux infrastructure portfolio with Ansible, Python, and ELK stack
CI Pipeline / lint-ansible (push) Waiting to run
CI Pipeline / test-python (push) Waiting to run
CI Pipeline / validate-docker (push) Waiting to run
CI Pipeline / security-scan (push) Waiting to run
CI Pipeline / documentation (push) Waiting to run
CI Pipeline / integration-test (push) Blocked by required conditions
CI Pipeline / lint-ansible (push) Waiting to run
CI Pipeline / test-python (push) Waiting to run
CI Pipeline / validate-docker (push) Waiting to run
CI Pipeline / security-scan (push) Waiting to run
CI Pipeline / documentation (push) Waiting to run
CI Pipeline / integration-test (push) Blocked by required conditions
This commit is contained in:
@@ -0,0 +1,270 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Migration Validation Framework - CLI Interface
|
||||
|
||||
A comprehensive tool for validating system migrations through data collection,
|
||||
snapshot comparison, and automated reporting.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Any
|
||||
|
||||
# Import framework modules
|
||||
from collectors import mounts, services, disk_usage
|
||||
from validators import compare
|
||||
from reports import html_report
|
||||
|
||||
# Configuration
|
||||
SNAPSHOTS_DIR = Path("snapshots")
|
||||
LOGS_DIR = Path("logs")
|
||||
REPORTS_DIR = Path("reports")
|
||||
|
||||
class MigrationValidator:
|
||||
"""Main migration validation class."""
|
||||
|
||||
def __init__(self, verbose: bool = False):
|
||||
self.verbose = verbose
|
||||
self.setup_logging()
|
||||
self.ensure_directories()
|
||||
|
||||
def setup_logging(self):
|
||||
"""Configure logging."""
|
||||
log_level = logging.DEBUG if self.verbose else logging.INFO
|
||||
logging.basicConfig(
|
||||
level=log_level,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
handlers=[
|
||||
logging.FileHandler(LOGS_DIR / "validation.log"),
|
||||
logging.StreamHandler(sys.stdout)
|
||||
]
|
||||
)
|
||||
self.logger = logging.getLogger(__name__)
|
||||
|
||||
def ensure_directories(self):
|
||||
"""Ensure required directories exist."""
|
||||
for directory in [SNAPSHOTS_DIR, LOGS_DIR, REPORTS_DIR]:
|
||||
directory.mkdir(exist_ok=True)
|
||||
|
||||
def collect_system_data(self, systems: List[str]) -> Dict[str, Any]:
|
||||
"""Collect data from target systems."""
|
||||
self.logger.info(f"Collecting data from systems: {systems}")
|
||||
|
||||
snapshot = {
|
||||
"metadata": {
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"systems": systems,
|
||||
"version": "1.0"
|
||||
},
|
||||
"data": {}
|
||||
}
|
||||
|
||||
collectors = [
|
||||
("mounts", mounts.collect),
|
||||
("services", services.collect),
|
||||
("disk_usage", disk_usage.collect)
|
||||
]
|
||||
|
||||
for system in systems:
|
||||
self.logger.info(f"Collecting data from {system}")
|
||||
snapshot["data"][system] = {}
|
||||
|
||||
for collector_name, collector_func in collectors:
|
||||
try:
|
||||
self.logger.debug(f"Running {collector_name} collector on {system}")
|
||||
data = collector_func(system)
|
||||
snapshot["data"][system][collector_name] = data
|
||||
except Exception as e:
|
||||
self.logger.error(f"Failed to collect {collector_name} from {system}: {e}")
|
||||
snapshot["data"][system][collector_name] = {"error": str(e)}
|
||||
|
||||
return snapshot
|
||||
|
||||
def save_snapshot(self, snapshot: Dict[str, Any], label: str, env: str) -> str:
|
||||
"""Save snapshot to disk."""
|
||||
snapshot_id = f"{env}-{label}-{datetime.now().strftime('%Y%m%d_%H%M%S')}"
|
||||
snapshot_file = SNAPSHOTS_DIR / f"{snapshot_id}.json"
|
||||
|
||||
with open(snapshot_file, 'w') as f:
|
||||
json.dump(snapshot, f, indent=2)
|
||||
|
||||
self.logger.info(f"Snapshot saved: {snapshot_id}")
|
||||
return snapshot_id
|
||||
|
||||
def load_snapshot(self, snapshot_id: str) -> Dict[str, Any]:
|
||||
"""Load snapshot from disk."""
|
||||
snapshot_file = SNAPSHOTS_DIR / f"{snapshot_id}.json"
|
||||
if not snapshot_file.exists():
|
||||
raise FileNotFoundError(f"Snapshot {snapshot_id} not found")
|
||||
|
||||
with open(snapshot_file, 'r') as f:
|
||||
return json.load(f)
|
||||
|
||||
def create_snapshot(self, env: str, label: str, systems: List[str]) -> str:
|
||||
"""Create and save a system snapshot."""
|
||||
self.logger.info(f"Creating snapshot for environment: {env}, label: {label}")
|
||||
|
||||
snapshot = self.collect_system_data(systems)
|
||||
snapshot_id = self.save_snapshot(snapshot, label, env)
|
||||
|
||||
return snapshot_id
|
||||
|
||||
def compare_snapshots(self, snapshot1_id: str, snapshot2_id: str, output_id: str) -> Dict[str, Any]:
|
||||
"""Compare two snapshots."""
|
||||
self.logger.info(f"Comparing snapshots: {snapshot1_id} vs {snapshot2_id}")
|
||||
|
||||
snapshot1 = self.load_snapshot(snapshot1_id)
|
||||
snapshot2 = self.load_snapshot(snapshot2_id)
|
||||
|
||||
comparison = compare.compare_snapshots(snapshot1, snapshot2)
|
||||
comparison["metadata"] = {
|
||||
"snapshot1": snapshot1_id,
|
||||
"snapshot2": snapshot2_id,
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"comparison_id": output_id
|
||||
}
|
||||
|
||||
# Save comparison results
|
||||
comparison_file = REPORTS_DIR / f"comparison_{output_id}.json"
|
||||
with open(comparison_file, 'w') as f:
|
||||
json.dump(comparison, f, indent=2)
|
||||
|
||||
self.logger.info(f"Comparison saved: {output_id}")
|
||||
return comparison
|
||||
|
||||
def generate_report(self, comparison_id: str, format_type: str, output_file: Optional[str] = None) -> str:
|
||||
"""Generate a report from comparison results."""
|
||||
self.logger.info(f"Generating {format_type} report for comparison: {comparison_id}")
|
||||
|
||||
comparison_file = REPORTS_DIR / f"comparison_{comparison_id}.json"
|
||||
if not comparison_file.exists():
|
||||
raise FileNotFoundError(f"Comparison {comparison_id} not found")
|
||||
|
||||
with open(comparison_file, 'r') as f:
|
||||
comparison = json.load(f)
|
||||
|
||||
if format_type == "html":
|
||||
if output_file is None:
|
||||
output_file = f"migration_report_{comparison_id}.html"
|
||||
html_report.generate(comparison, output_file)
|
||||
elif format_type == "json":
|
||||
if output_file is None:
|
||||
output_file = f"migration_report_{comparison_id}.json"
|
||||
with open(output_file, 'w') as f:
|
||||
json.dump(comparison, f, indent=2)
|
||||
else:
|
||||
raise ValueError(f"Unsupported format: {format_type}")
|
||||
|
||||
self.logger.info(f"Report generated: {output_file}")
|
||||
return output_file
|
||||
|
||||
def main():
|
||||
"""Main CLI entry point."""
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Migration Validation Framework",
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
epilog="""
|
||||
Examples:
|
||||
# Create pre-migration snapshot
|
||||
python cli.py snapshot --env production --label pre-migration --systems web01,db01
|
||||
|
||||
# Compare snapshots
|
||||
python cli.py compare pre-migration-snapshot post-migration-snapshot --output comparison_001
|
||||
|
||||
# Generate HTML report
|
||||
python cli.py report --comparison comparison_001 --format html
|
||||
"""
|
||||
)
|
||||
|
||||
parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose logging')
|
||||
parser.add_argument('--dry-run', action='store_true', help='Preview actions without execution')
|
||||
|
||||
subparsers = parser.add_subparsers(dest='command', help='Available commands')
|
||||
|
||||
# Snapshot command
|
||||
snapshot_parser = subparsers.add_parser('snapshot', help='Create system snapshot')
|
||||
snapshot_parser.add_argument('--env', required=True, help='Target environment')
|
||||
snapshot_parser.add_argument('--label', required=True, help='Snapshot label')
|
||||
snapshot_parser.add_argument('--systems', required=True, help='Comma-separated list of systems')
|
||||
|
||||
# Compare command
|
||||
compare_parser = subparsers.add_parser('compare', help='Compare two snapshots')
|
||||
compare_parser.add_argument('snapshot1', help='First snapshot ID')
|
||||
compare_parser.add_argument('snapshot2', help='Second snapshot ID')
|
||||
compare_parser.add_argument('--output', required=True, help='Comparison output ID')
|
||||
|
||||
# Report command
|
||||
report_parser = subparsers.add_parser('report', help='Generate report from comparison')
|
||||
report_parser.add_argument('--comparison', required=True, help='Comparison ID')
|
||||
report_parser.add_argument('--format', choices=['html', 'json'], default='html', help='Report format')
|
||||
report_parser.add_argument('--output', help='Output file path')
|
||||
|
||||
# List command
|
||||
list_parser = subparsers.add_parser('list', help='List snapshots or comparisons')
|
||||
list_parser.add_argument('type', choices=['snapshots', 'comparisons'], help='Type to list')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if not args.command:
|
||||
parser.print_help()
|
||||
return
|
||||
|
||||
# Initialize validator
|
||||
validator = MigrationValidator(verbose=args.verbose)
|
||||
|
||||
try:
|
||||
if args.command == 'snapshot':
|
||||
systems = args.systems.split(',')
|
||||
if args.dry_run:
|
||||
print(f"DRY RUN: Would create snapshot for systems: {systems}")
|
||||
return
|
||||
|
||||
snapshot_id = validator.create_snapshot(args.env, args.label, systems)
|
||||
print(f"Snapshot created: {snapshot_id}")
|
||||
|
||||
elif args.command == 'compare':
|
||||
if args.dry_run:
|
||||
print(f"DRY RUN: Would compare {args.snapshot1} vs {args.snapshot2}")
|
||||
return
|
||||
|
||||
comparison = validator.compare_snapshots(args.snapshot1, args.snapshot2, args.output)
|
||||
print(f"Comparison completed: {args.output}")
|
||||
|
||||
elif args.command == 'report':
|
||||
if args.dry_run:
|
||||
print(f"DRY RUN: Would generate {args.format} report for {args.comparison}")
|
||||
return
|
||||
|
||||
output_file = validator.generate_report(args.comparison, args.format, args.output)
|
||||
print(f"Report generated: {output_file}")
|
||||
|
||||
elif args.command == 'list':
|
||||
if args.type == 'snapshots':
|
||||
snapshots = list(SNAPSHOTS_DIR.glob("*.json"))
|
||||
if snapshots:
|
||||
print("Available snapshots:")
|
||||
for snapshot in sorted(snapshots):
|
||||
print(f" {snapshot.stem}")
|
||||
else:
|
||||
print("No snapshots found")
|
||||
elif args.type == 'comparisons':
|
||||
comparisons = list(REPORTS_DIR.glob("comparison_*.json"))
|
||||
if comparisons:
|
||||
print("Available comparisons:")
|
||||
for comparison in sorted(comparisons):
|
||||
comp_id = comparison.stem.replace('comparison_', '')
|
||||
print(f" {comp_id}")
|
||||
else:
|
||||
print("No comparisons found")
|
||||
|
||||
except Exception as e:
|
||||
validator.logger.error(f"Command failed: {e}")
|
||||
print(f"Error: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user