""" Snapshot Comparison Engine Compares two system snapshots and identifies differences, risk levels, and validation results. """ import json import logging from typing import Dict, Any, List, Tuple from datetime import datetime logger = logging.getLogger(__name__) class SnapshotComparator: """Engine for comparing system snapshots.""" def __init__(self): self.risk_levels = { "low": 1, "medium": 2, "high": 3, "critical": 4 } def compare_snapshots(self, snapshot1: Dict[str, Any], snapshot2: Dict[str, Any]) -> Dict[str, Any]: """Compare two snapshots and return detailed comparison results.""" logger.info("Starting snapshot comparison") comparison = { "summary": {}, "differences": {}, "risk_assessment": {}, "validation_results": {} } # Compare each data type data_types = ["mounts", "services", "disk_usage"] data1 = snapshot1.get("data", {}) data2 = snapshot2.get("data", {}) for data_type in data_types: if self.data_type_exists(data1, data_type) or self.data_type_exists(data2, data_type): differences = self.compare_data_type(data1, data2, data_type) comparison["differences"][data_type] = differences # Generate summary comparison["summary"] = self.generate_summary(comparison["differences"]) # Risk assessment comparison["risk_assessment"] = self.assess_risks(comparison["differences"]) # Validation results comparison["validation_results"] = self.validate_changes(comparison["differences"]) comparison["validation_results"]["result"] = "PASS" if comparison["validation_results"]["passed"] else "FAIL" logger.info("Snapshot comparison completed") return comparison def data_type_exists(self, systems: Dict[str, Any], data_type: str) -> bool: """Return true when at least one system has the requested collector data.""" return any(data_type in system_data for system_data in systems.values()) def compare_data_type(self, data1: Dict[str, Any], data2: Dict[str, Any], data_type: str) -> Dict[str, Any]: """Compare a specific data type between two snapshots.""" differences = {} # Get all systems from both snapshots systems1 = set(data1.keys()) systems2 = set(data2.keys()) all_systems = systems1.union(systems2) for system in all_systems: system_diffs = {} if system not in data1: system_diffs["status"] = "added" system_diffs["details"] = {"new_system": True} elif system not in data2: system_diffs["status"] = "removed" system_diffs["details"] = {"removed_system": True} else: # Compare data for this system and data type if data_type in data1[system] and data_type in data2[system]: system_diffs = self.compare_system_data( data1[system][data_type], data2[system][data_type], data_type ) else: system_diffs["status"] = "data_missing" system_diffs["details"] = {"missing_data_type": data_type} if system_diffs: differences[system] = system_diffs return differences def compare_system_data(self, data1: Dict[str, Any], data2: Dict[str, Any], data_type: str) -> Dict[str, Any]: """Compare data for a specific system and data type.""" differences = {} if data_type == "mounts": differences = self.compare_mounts(data1, data2) elif data_type == "services": differences = self.compare_services(data1, data2) elif data_type == "disk_usage": differences = self.compare_disk_usage(data1, data2) else: differences["status"] = "unknown_data_type" return differences def compare_mounts(self, mounts1: Dict[str, Any], mounts2: Dict[str, Any]) -> Dict[str, Any]: """Compare mounts data between snapshots.""" differences = { "added_mounts": [], "removed_mounts": [], "changed_mounts": [], "usage_changes": [] } # Compare mount lists mounts_list1 = mounts1.get("mounts", []) mounts_list2 = mounts2.get("mounts", []) # Create mountpoint maps mounts_map1 = {m["mountpoint"]: m for m in mounts_list1} mounts_map2 = {m["mountpoint"]: m for m in mounts_list2} # Find added and removed mounts added = set(mounts_map2.keys()) - set(mounts_map1.keys()) removed = set(mounts_map1.keys()) - set(mounts_map2.keys()) differences["added_mounts"] = [{"mountpoint": mp, **mounts_map2[mp]} for mp in added] differences["removed_mounts"] = [{"mountpoint": mp, **mounts_map1[mp]} for mp in removed] # Find changed mounts common = set(mounts_map1.keys()) & set(mounts_map2.keys()) for mp in common: m1, m2 = mounts_map1[mp], mounts_map2[mp] if m1 != m2: differences["changed_mounts"].append({ "mountpoint": mp, "before": m1, "after": m2 }) # Compare usage statistics usage1 = mounts1.get("usage", {}) usage2 = mounts2.get("usage", {}) for mp in set(usage1.keys()) | set(usage2.keys()): if mp in usage1 and mp in usage2: u1, u2 = usage1[mp], usage2[mp] if u1 != u2: differences["usage_changes"].append({ "mountpoint": mp, "before": u1, "after": u2 }) return differences def compare_services(self, services1: Dict[str, Any], services2: Dict[str, Any]) -> Dict[str, Any]: """Compare services data between snapshots.""" differences = { "added_services": [], "removed_services": [], "status_changes": [], "configuration_changes": [] } # Compare service lists services_list1 = services1.get("services", []) services_list2 = services2.get("services", []) # Create service maps services_map1 = {s["name"]: s for s in services_list1} services_map2 = {s["name"]: s for s in services_list2} # Find added and removed services added = set(services_map2.keys()) - set(services_map1.keys()) removed = set(services_map1.keys()) - set(services_map2.keys()) differences["added_services"] = [{"name": name, **services_map2[name]} for name in added] differences["removed_services"] = [{"name": name, **services_map1[name]} for name in removed] # Find status changes common = set(services_map1.keys()) & set(services_map2.keys()) for name in common: s1, s2 = services_map1[name], services_map2[name] if s1.get("active_state") != s2.get("active_state") or s1.get("sub_state") != s2.get("sub_state"): differences["status_changes"].append({ "name": name, "before": {"active_state": s1.get("active_state"), "sub_state": s1.get("sub_state")}, "after": {"active_state": s2.get("active_state"), "sub_state": s2.get("sub_state")} }) return differences def compare_disk_usage(self, usage1: Dict[str, Any], usage2: Dict[str, Any]) -> Dict[str, Any]: """Compare disk usage data between snapshots.""" differences = { "filesystem_changes": [], "directory_size_changes": [], "significant_usage_changes": [] } # Compare filesystem usage fs1 = usage1.get("filesystem_usage", []) fs2 = usage2.get("filesystem_usage", []) # Create filesystem maps by mountpoint fs_map1 = {fs["mountpoint"]: fs for fs in fs1} fs_map2 = {fs["mountpoint"]: fs for fs in fs2} common_fs = set(fs_map1.keys()) & set(fs_map2.keys()) for mp in common_fs: f1, f2 = fs_map1[mp], fs_map2[mp] if f1 != f2: differences["filesystem_changes"].append({ "mountpoint": mp, "before": f1, "after": f2 }) # Check for significant usage changes try: use1 = int(f1.get("use_percent", "0").rstrip("%")) use2 = int(f2.get("use_percent", "0").rstrip("%")) if abs(use2 - use1) > 10: # 10% change threshold differences["significant_usage_changes"].append({ "mountpoint": mp, "change_percent": use2 - use1, "before": f1, "after": f2 }) except (ValueError, KeyError): pass return differences def generate_summary(self, differences: Dict[str, Any]) -> Dict[str, Any]: """Generate a summary of all differences.""" summary = { "total_systems": 0, "systems_with_changes": 0, "total_changes": 0, "changes_by_type": {}, "most_affected_systems": [] } system_change_counts = {} for data_type, systems in differences.items(): summary["changes_by_type"][data_type] = 0 for system, system_diffs in systems.items(): if system not in system_change_counts: system_change_counts[system] = 0 # Count changes for this system and data type change_count = self.count_changes(system_diffs) system_change_counts[system] += change_count summary["changes_by_type"][data_type] += change_count summary["total_changes"] += change_count summary["total_systems"] = len(system_change_counts) # Count systems with changes summary["systems_with_changes"] = len([s for s in system_change_counts.values() if s > 0]) # Find most affected systems sorted_systems = sorted(system_change_counts.items(), key=lambda x: x[1], reverse=True) summary["most_affected_systems"] = sorted_systems[:5] return summary def count_changes(self, system_diffs: Dict[str, Any]) -> int: """Count the number of changes in system differences.""" count = 0 for key, value in system_diffs.items(): if isinstance(value, list): count += len(value) elif isinstance(value, dict) and key not in ["status"]: # Count nested changes count += sum(1 for v in value.values() if isinstance(v, list) and v) return count def assess_risks(self, differences: Dict[str, Any]) -> Dict[str, Any]: """Assess risk levels for the changes.""" risk_assessment = { "overall_risk": "low", "risk_factors": [], "critical_changes": [], "recommendations": [] } max_risk_level = 1 # Analyze each type of change for data_type, systems in differences.items(): for system, system_diffs in systems.items(): risk_factors = self.analyze_system_risks(system_diffs, data_type) risk_assessment["risk_factors"].extend(risk_factors) for factor in risk_factors: if factor["level"] > max_risk_level: max_risk_level = factor["level"] if factor["level"] >= 4: # Critical risk_assessment["critical_changes"].append({ "system": system, "data_type": data_type, "factor": factor }) # Set overall risk risk_levels = {1: "low", 2: "medium", 3: "high", 4: "critical"} risk_assessment["overall_risk"] = risk_levels.get(max_risk_level, "unknown") # Generate recommendations risk_assessment["recommendations"] = self.generate_recommendations(risk_assessment) return risk_assessment def analyze_system_risks(self, system_diffs: Dict[str, Any], data_type: str) -> List[Dict[str, Any]]: """Analyze risks for a specific system's changes.""" risk_factors = [] if data_type == "mounts": # Check for removed critical mounts for mount in system_diffs.get("removed_mounts", []): if mount["mountpoint"] in ["/", "/boot", "/usr", "/var"]: risk_factors.append({ "type": "critical_mount_removed", "description": f"Critical mount point removed: {mount['mountpoint']}", "level": 4 }) # Check for significant usage changes for change in system_diffs.get("usage_changes", []): try: before_pct = int(change["before"].get("use_percent", "0").rstrip("%")) after_pct = int(change["after"].get("use_percent", "0").rstrip("%")) if after_pct > 95: risk_factors.append({ "type": "filesystem_full", "description": f"Filesystem usage critical: {change['mountpoint']} at {after_pct}%", "level": 3 }) except (ValueError, KeyError): pass elif data_type == "services": # Check for critical service changes critical_services = ["sshd", "systemd", "networking", "dbus"] for service in system_diffs.get("removed_services", []): if service["name"] in critical_services: risk_factors.append({ "type": "critical_service_removed", "description": f"Critical service removed: {service['name']}", "level": 4 }) for change in system_diffs.get("status_changes", []): if change["after"]["active_state"] == "failed": risk_factors.append({ "type": "service_failure", "description": f"Service failed: {change['name']}", "level": 3 }) elif data_type == "disk_usage": for change in system_diffs.get("significant_usage_changes", []): if change["change_percent"] > 20: risk_factors.append({ "type": "disk_usage_spike", "description": f"Significant disk usage change: {change['mountpoint']} ({change['change_percent']}%)", "level": 2 }) return risk_factors def generate_recommendations(self, risk_assessment: Dict[str, Any]) -> List[str]: """Generate recommendations based on risk assessment.""" recommendations = [] if risk_assessment["overall_risk"] in ["high", "critical"]: recommendations.append("Immediate review required - critical changes detected") recommendations.append("Consider rolling back migration if critical services are affected") if any(f["type"] == "critical_mount_removed" for f in risk_assessment["risk_factors"]): recommendations.append("Verify system boot capability after mount changes") if any(f["type"] == "critical_service_removed" for f in risk_assessment["risk_factors"]): recommendations.append("Ensure critical services are restored before production cutover") if any(f["type"] == "filesystem_full" for f in risk_assessment["risk_factors"]): recommendations.append("Monitor disk space closely - cleanup may be required") if not recommendations: recommendations.append("Changes appear safe - proceed with standard validation procedures") return recommendations def validate_changes(self, differences: Dict[str, Any]) -> Dict[str, Any]: """Validate that changes meet requirements.""" validation_results = { "passed": True, "checks": [], "failed_checks": [] } # Define validation checks checks = [ self.check_critical_services_running, self.check_filesystem_integrity, self.check_no_critical_mounts_removed ] for check_func in checks: check_result = check_func(differences) validation_results["checks"].append(check_result) if not check_result["passed"]: validation_results["passed"] = False validation_results["failed_checks"].append(check_result) return validation_results def check_critical_services_running(self, differences: Dict[str, Any]) -> Dict[str, Any]: """Check that critical services are still running.""" check = { "name": "critical_services_running", "description": "Verify critical services remain operational", "passed": True, "details": [] } critical_services = ["sshd", "systemd"] for data_type, systems in differences.items(): if data_type == "services": for system, system_diffs in systems.items(): for change in system_diffs.get("status_changes", []): if change["name"] in critical_services: if change["after"]["active_state"] == "failed": check["passed"] = False check["details"].append(f"Critical service {change['name']} failed on {system}") return check def check_filesystem_integrity(self, differences: Dict[str, Any]) -> Dict[str, Any]: """Check filesystem integrity after changes.""" check = { "name": "filesystem_integrity", "description": "Verify filesystem integrity maintained", "passed": True, "details": [] } for data_type, systems in differences.items(): if data_type == "disk_usage": for system, system_diffs in systems.items(): for change in system_diffs.get("significant_usage_changes", []): if change["change_percent"] > 50: # Arbitrary threshold check["passed"] = False check["details"].append(f"Extreme usage change on {system}:{change['mountpoint']}") return check def check_no_critical_mounts_removed(self, differences: Dict[str, Any]) -> Dict[str, Any]: """Check that no critical mount points were removed.""" check = { "name": "no_critical_mounts_removed", "description": "Verify critical mount points remain", "passed": True, "details": [] } critical_mounts = ["/", "/boot", "/usr", "/var"] for data_type, systems in differences.items(): if data_type == "mounts": for system, system_diffs in systems.items(): for mount in system_diffs.get("removed_mounts", []): if mount["mountpoint"] in critical_mounts: check["passed"] = False check["details"].append(f"Critical mount {mount['mountpoint']} removed from {system}") return check def compare_snapshots(snapshot1: Dict[str, Any], snapshot2: Dict[str, Any]) -> Dict[str, Any]: """Main comparison function.""" comparator = SnapshotComparator() return comparator.compare_snapshots(snapshot1, snapshot2)