Files
portfolio/migration-validation-framework/validators/compare.py
T
Mateusz Suski 8783892241
ci / validate (push) Waiting to run
Polish infrastructure portfolio projects
2026-04-29 23:30:30 +00:00

502 lines
20 KiB
Python

"""
Snapshot Comparison Engine
Compares two system snapshots and identifies differences,
risk levels, and validation results.
"""
import json
import logging
from typing import Dict, Any, List, Tuple
from datetime import datetime
logger = logging.getLogger(__name__)
class SnapshotComparator:
    """Engine for comparing system snapshots.

    A snapshot is read as a dict whose ``"data"`` entry maps a system name to
    that system's collector output, itself keyed by data type (``"mounts"``,
    ``"services"``, ``"disk_usage"``).  All report lists and per-system
    mappings are produced in sorted order so that two runs over the same
    snapshots yield identical output.
    """

    # Mount points whose removal is treated as critical.
    _CRITICAL_MOUNTS = ("/", "/boot", "/usr", "/var")
    # Services whose removal is reported as a critical (level 4) risk factor.
    _CRITICAL_SERVICES = ("sshd", "systemd", "networking", "dbus")
    # Services that must remain operational for validation to pass.
    _REQUIRED_SERVICES = ("sshd", "systemd")
    # Numeric risk level -> label used in the risk assessment.
    _RISK_NAMES = {1: "low", 2: "medium", 3: "high", 4: "critical"}

    def __init__(self):
        self.risk_levels = {
            "low": 1,
            "medium": 2,
            "high": 3,
            "critical": 4
        }

    @staticmethod
    def _ordered(keys) -> List[Any]:
        """Return *keys* as a list in a stable order.

        Set iteration order for strings varies between interpreter runs, so
        every set-derived collection is sorted before it reaches the report.
        ``key=str`` keeps the sort from failing on mixed key types.
        """
        return sorted(keys, key=str)

    @staticmethod
    def _parse_percent(value: Any):
        """Convert a usage value such as ``"45%"`` or ``45`` to an int.

        Returns ``None`` when the value cannot be interpreted, so callers can
        skip the entry instead of crashing.
        """
        if isinstance(value, str):
            value = value.strip().rstrip("%")
        try:
            return int(value)
        except (TypeError, ValueError):
            return None

    def compare_snapshots(self, snapshot1: Dict[str, Any], snapshot2: Dict[str, Any]) -> Dict[str, Any]:
        """Compare two snapshots and return detailed comparison results.

        Returns a dict with ``"summary"``, ``"differences"``,
        ``"risk_assessment"`` and ``"validation_results"``; the latter carries
        a ``"result"`` of ``"PASS"`` or ``"FAIL"``.
        """
        logger.info("Starting snapshot comparison")
        comparison = {
            "summary": {},
            "differences": {},
            "risk_assessment": {},
            "validation_results": {}
        }

        data_types = ["mounts", "services", "disk_usage"]
        # ``or {}`` also covers a snapshot that carries an explicit "data": None.
        data1 = snapshot1.get("data") or {}
        data2 = snapshot2.get("data") or {}

        # Only data types present in at least one snapshot are compared.
        for data_type in data_types:
            if self.data_type_exists(data1, data_type) or self.data_type_exists(data2, data_type):
                comparison["differences"][data_type] = self.compare_data_type(data1, data2, data_type)

        comparison["summary"] = self.generate_summary(comparison["differences"])
        comparison["risk_assessment"] = self.assess_risks(comparison["differences"])
        comparison["validation_results"] = self.validate_changes(comparison["differences"])
        comparison["validation_results"]["result"] = (
            "PASS" if comparison["validation_results"]["passed"] else "FAIL"
        )

        logger.info("Snapshot comparison completed")
        return comparison

    def data_type_exists(self, systems: Dict[str, Any], data_type: str) -> bool:
        """Return true when at least one system has the requested collector data.

        Per-system values that are not dicts (for example ``None``) are
        treated as having no data.
        """
        return any(
            isinstance(system_data, dict) and data_type in system_data
            for system_data in systems.values()
        )

    def compare_data_type(self, data1: Dict[str, Any], data2: Dict[str, Any], data_type: str) -> Dict[str, Any]:
        """Compare a specific data type between two snapshots.

        Returns a mapping of system name to its differences.  A system present
        in only one snapshot gets ``status`` ``"added"`` or ``"removed"``; a
        system present in both but lacking ``data_type`` on either side gets
        ``status`` ``"data_missing"``.
        """
        differences = {}

        for system in self._ordered(set(data1) | set(data2)):
            if system not in data1:
                system_diffs = {"status": "added", "details": {"new_system": True}}
            elif system not in data2:
                system_diffs = {"status": "removed", "details": {"removed_system": True}}
            else:
                before, after = data1[system], data2[system]
                both_have_data = (
                    isinstance(before, dict)
                    and isinstance(after, dict)
                    and data_type in before
                    and data_type in after
                )
                if both_have_data:
                    system_diffs = self.compare_system_data(
                        before[data_type],
                        after[data_type],
                        data_type
                    )
                else:
                    system_diffs = {
                        "status": "data_missing",
                        "details": {"missing_data_type": data_type}
                    }

            if system_diffs:
                differences[system] = system_diffs

        return differences

    def compare_system_data(self, data1: Dict[str, Any], data2: Dict[str, Any], data_type: str) -> Dict[str, Any]:
        """Compare data for a specific system and data type.

        Dispatches to the comparer for ``data_type``; an unrecognised type
        yields ``{"status": "unknown_data_type"}``.
        """
        if data_type == "mounts":
            return self.compare_mounts(data1, data2)
        if data_type == "services":
            return self.compare_services(data1, data2)
        if data_type == "disk_usage":
            return self.compare_disk_usage(data1, data2)
        return {"status": "unknown_data_type"}

    def compare_mounts(self, mounts1: Dict[str, Any], mounts2: Dict[str, Any]) -> Dict[str, Any]:
        """Compare mounts data between snapshots.

        Reads the ``"mounts"`` list (entries keyed by ``"mountpoint"``) and the
        ``"usage"`` mapping of each side.  Returns added, removed and changed
        mounts plus usage entries that differ, each sorted by mount point.
        """
        differences = {
            "added_mounts": [],
            "removed_mounts": [],
            "changed_mounts": [],
            "usage_changes": []
        }

        mounts_map1 = {m["mountpoint"]: m for m in mounts1.get("mounts") or []}
        mounts_map2 = {m["mountpoint"]: m for m in mounts2.get("mounts") or []}

        added = self._ordered(mounts_map2.keys() - mounts_map1.keys())
        removed = self._ordered(mounts_map1.keys() - mounts_map2.keys())
        differences["added_mounts"] = [{"mountpoint": mp, **mounts_map2[mp]} for mp in added]
        differences["removed_mounts"] = [{"mountpoint": mp, **mounts_map1[mp]} for mp in removed]

        for mp in self._ordered(mounts_map1.keys() & mounts_map2.keys()):
            m1, m2 = mounts_map1[mp], mounts_map2[mp]
            if m1 != m2:
                differences["changed_mounts"].append({
                    "mountpoint": mp,
                    "before": m1,
                    "after": m2
                })

        # Usage is only compared for mount points present on both sides.
        usage1 = mounts1.get("usage") or {}
        usage2 = mounts2.get("usage") or {}
        for mp in self._ordered(usage1.keys() & usage2.keys()):
            u1, u2 = usage1[mp], usage2[mp]
            if u1 != u2:
                differences["usage_changes"].append({
                    "mountpoint": mp,
                    "before": u1,
                    "after": u2
                })

        return differences

    def compare_services(self, services1: Dict[str, Any], services2: Dict[str, Any]) -> Dict[str, Any]:
        """Compare services data between snapshots.

        Reads the ``"services"`` list (entries keyed by ``"name"``) of each
        side.  Returns added and removed services and services whose
        ``active_state`` or ``sub_state`` changed, each sorted by name.
        ``"configuration_changes"`` is always returned empty.
        """
        differences = {
            "added_services": [],
            "removed_services": [],
            "status_changes": [],
            "configuration_changes": []
        }

        services_map1 = {s["name"]: s for s in services1.get("services") or []}
        services_map2 = {s["name"]: s for s in services2.get("services") or []}

        added = self._ordered(services_map2.keys() - services_map1.keys())
        removed = self._ordered(services_map1.keys() - services_map2.keys())
        differences["added_services"] = [{"name": name, **services_map2[name]} for name in added]
        differences["removed_services"] = [{"name": name, **services_map1[name]} for name in removed]

        for name in self._ordered(services_map1.keys() & services_map2.keys()):
            s1, s2 = services_map1[name], services_map2[name]
            state_before = {"active_state": s1.get("active_state"), "sub_state": s1.get("sub_state")}
            state_after = {"active_state": s2.get("active_state"), "sub_state": s2.get("sub_state")}
            if state_before != state_after:
                differences["status_changes"].append({
                    "name": name,
                    "before": state_before,
                    "after": state_after
                })

        return differences

    def compare_disk_usage(self, usage1: Dict[str, Any], usage2: Dict[str, Any]) -> Dict[str, Any]:
        """Compare disk usage data between snapshots.

        Reads the ``"filesystem_usage"`` list (entries keyed by
        ``"mountpoint"``) of each side.  Filesystems present on both sides that
        differ are listed in ``"filesystem_changes"``; those whose
        ``use_percent`` moved by more than 10 points in either direction are
        also listed in ``"significant_usage_changes"``.
        ``"directory_size_changes"`` is always returned empty.
        """
        differences = {
            "filesystem_changes": [],
            "directory_size_changes": [],
            "significant_usage_changes": []
        }

        fs_map1 = {fs["mountpoint"]: fs for fs in usage1.get("filesystem_usage") or []}
        fs_map2 = {fs["mountpoint"]: fs for fs in usage2.get("filesystem_usage") or []}

        for mp in self._ordered(fs_map1.keys() & fs_map2.keys()):
            f1, f2 = fs_map1[mp], fs_map2[mp]
            if f1 == f2:
                continue
            differences["filesystem_changes"].append({
                "mountpoint": mp,
                "before": f1,
                "after": f2
            })

            # A missing use_percent counts as 0; an unparsable one skips the check.
            use1 = self._parse_percent(f1.get("use_percent", "0"))
            use2 = self._parse_percent(f2.get("use_percent", "0"))
            if use1 is None or use2 is None:
                continue
            if abs(use2 - use1) > 10:  # 10% change threshold
                differences["significant_usage_changes"].append({
                    "mountpoint": mp,
                    "change_percent": use2 - use1,
                    "before": f1,
                    "after": f2
                })

        return differences

    def generate_summary(self, differences: Dict[str, Any]) -> Dict[str, Any]:
        """Generate a summary of all differences.

        ``most_affected_systems`` holds up to five ``(system, change_count)``
        pairs, highest count first.
        """
        summary = {
            "total_systems": 0,
            "systems_with_changes": 0,
            "total_changes": 0,
            "changes_by_type": {},
            "most_affected_systems": []
        }
        system_change_counts = {}

        for data_type, systems in differences.items():
            summary["changes_by_type"][data_type] = 0
            for system, system_diffs in systems.items():
                change_count = self.count_changes(system_diffs)
                system_change_counts[system] = system_change_counts.get(system, 0) + change_count
                summary["changes_by_type"][data_type] += change_count
                summary["total_changes"] += change_count

        summary["total_systems"] = len(system_change_counts)
        summary["systems_with_changes"] = sum(1 for n in system_change_counts.values() if n > 0)

        sorted_systems = sorted(system_change_counts.items(), key=lambda x: x[1], reverse=True)
        summary["most_affected_systems"] = sorted_systems[:5]
        return summary

    def count_changes(self, system_diffs: Dict[str, Any]) -> int:
        """Count the number of changes in system differences.

        List values contribute their length; dict values (other than under
        ``"status"``) contribute one per non-empty list they contain.  Entries
        that only carry a ``status``/``details`` pair (``"added"``,
        ``"removed"``, ``"data_missing"``) therefore contribute zero.
        """
        count = 0
        for key, value in system_diffs.items():
            if isinstance(value, list):
                count += len(value)
            elif isinstance(value, dict) and key != "status":
                count += sum(1 for v in value.values() if isinstance(v, list) and v)
        return count

    def assess_risks(self, differences: Dict[str, Any]) -> Dict[str, Any]:
        """Assess risk levels for the changes.

        ``overall_risk`` is the label of the highest level among all risk
        factors (``"low"`` when there are none); factors of level 4 are also
        listed under ``critical_changes`` with their system and data type.
        """
        risk_assessment = {
            "overall_risk": "low",
            "risk_factors": [],
            "critical_changes": [],
            "recommendations": []
        }
        max_risk_level = 1

        for data_type, systems in differences.items():
            for system, system_diffs in systems.items():
                risk_factors = self.analyze_system_risks(system_diffs, data_type)
                risk_assessment["risk_factors"].extend(risk_factors)
                for factor in risk_factors:
                    max_risk_level = max(max_risk_level, factor["level"])
                    if factor["level"] >= 4:  # Critical
                        risk_assessment["critical_changes"].append({
                            "system": system,
                            "data_type": data_type,
                            "factor": factor
                        })

        risk_assessment["overall_risk"] = self._RISK_NAMES.get(max_risk_level, "unknown")
        risk_assessment["recommendations"] = self.generate_recommendations(risk_assessment)
        return risk_assessment

    def analyze_system_risks(self, system_diffs: Dict[str, Any], data_type: str) -> List[Dict[str, Any]]:
        """Analyze risks for a specific system's changes.

        Returns a list of risk factors, each with ``type``, ``description``
        and numeric ``level`` (2 = medium, 3 = high, 4 = critical).
        """
        risk_factors = []

        if data_type == "mounts":
            for mount in system_diffs.get("removed_mounts", []):
                if mount["mountpoint"] in self._CRITICAL_MOUNTS:
                    risk_factors.append({
                        "type": "critical_mount_removed",
                        "description": f"Critical mount point removed: {mount['mountpoint']}",
                        "level": 4
                    })

            for change in system_diffs.get("usage_changes", []):
                after = change.get("after")
                if "mountpoint" not in change or not isinstance(after, dict):
                    continue
                # Only the "after" value matters here; a malformed "before"
                # must not hide a filesystem that is now nearly full.
                after_pct = self._parse_percent(after.get("use_percent", "0"))
                if after_pct is not None and after_pct > 95:
                    risk_factors.append({
                        "type": "filesystem_full",
                        "description": f"Filesystem usage critical: {change['mountpoint']} at {after_pct}%",
                        "level": 3
                    })

        elif data_type == "services":
            for service in system_diffs.get("removed_services", []):
                if service["name"] in self._CRITICAL_SERVICES:
                    risk_factors.append({
                        "type": "critical_service_removed",
                        "description": f"Critical service removed: {service['name']}",
                        "level": 4
                    })

            for change in system_diffs.get("status_changes", []):
                if change["after"]["active_state"] == "failed":
                    risk_factors.append({
                        "type": "service_failure",
                        "description": f"Service failed: {change['name']}",
                        "level": 3
                    })

        elif data_type == "disk_usage":
            for change in system_diffs.get("significant_usage_changes", []):
                if change["change_percent"] > 20:
                    risk_factors.append({
                        "type": "disk_usage_spike",
                        "description": f"Significant disk usage change: {change['mountpoint']} ({change['change_percent']}%)",
                        "level": 2
                    })

        return risk_factors

    def generate_recommendations(self, risk_assessment: Dict[str, Any]) -> List[str]:
        """Generate recommendations based on risk assessment.

        Always returns at least one recommendation.
        """
        recommendations = []
        factor_types = {f["type"] for f in risk_assessment["risk_factors"]}

        if risk_assessment["overall_risk"] in ["high", "critical"]:
            recommendations.append("Immediate review required - critical changes detected")
            recommendations.append("Consider rolling back migration if critical services are affected")
        if "critical_mount_removed" in factor_types:
            recommendations.append("Verify system boot capability after mount changes")
        if "critical_service_removed" in factor_types:
            recommendations.append("Ensure critical services are restored before production cutover")
        if "filesystem_full" in factor_types:
            recommendations.append("Monitor disk space closely - cleanup may be required")
        if not recommendations:
            recommendations.append("Changes appear safe - proceed with standard validation procedures")
        return recommendations

    def validate_changes(self, differences: Dict[str, Any]) -> Dict[str, Any]:
        """Validate that changes meet requirements.

        Runs every check; ``passed`` is true only when all of them pass, and
        failing checks are repeated under ``failed_checks``.
        """
        validation_results = {
            "passed": True,
            "checks": [],
            "failed_checks": []
        }
        checks = [
            self.check_critical_services_running,
            self.check_filesystem_integrity,
            self.check_no_critical_mounts_removed
        ]
        for check_func in checks:
            check_result = check_func(differences)
            validation_results["checks"].append(check_result)
            if not check_result["passed"]:
                validation_results["passed"] = False
                validation_results["failed_checks"].append(check_result)
        return validation_results

    def check_critical_services_running(self, differences: Dict[str, Any]) -> Dict[str, Any]:
        """Check that critical services are still running.

        Fails when a required service was removed or its ``active_state``
        became ``"failed"`` on any system.
        """
        check = {
            "name": "critical_services_running",
            "description": "Verify critical services remain operational",
            "passed": True,
            "details": []
        }
        for system, system_diffs in differences.get("services", {}).items():
            # A removed service is not running either, so it fails the check
            # just like a removed critical mount fails the mount check.
            for service in system_diffs.get("removed_services", []):
                if service["name"] in self._REQUIRED_SERVICES:
                    check["passed"] = False
                    check["details"].append(f"Critical service {service['name']} removed from {system}")
            for change in system_diffs.get("status_changes", []):
                if change["name"] in self._REQUIRED_SERVICES and change["after"]["active_state"] == "failed":
                    check["passed"] = False
                    check["details"].append(f"Critical service {change['name']} failed on {system}")
        return check

    def check_filesystem_integrity(self, differences: Dict[str, Any]) -> Dict[str, Any]:
        """Check filesystem integrity after changes.

        Fails when any filesystem's usage grew by more than 50 points.
        """
        check = {
            "name": "filesystem_integrity",
            "description": "Verify filesystem integrity maintained",
            "passed": True,
            "details": []
        }
        for system, system_diffs in differences.get("disk_usage", {}).items():
            for change in system_diffs.get("significant_usage_changes", []):
                if change["change_percent"] > 50:  # Arbitrary threshold
                    check["passed"] = False
                    check["details"].append(f"Extreme usage change on {system}:{change['mountpoint']}")
        return check

    def check_no_critical_mounts_removed(self, differences: Dict[str, Any]) -> Dict[str, Any]:
        """Check that no critical mount points were removed."""
        check = {
            "name": "no_critical_mounts_removed",
            "description": "Verify critical mount points remain",
            "passed": True,
            "details": []
        }
        for system, system_diffs in differences.get("mounts", {}).items():
            for mount in system_diffs.get("removed_mounts", []):
                if mount["mountpoint"] in self._CRITICAL_MOUNTS:
                    check["passed"] = False
                    check["details"].append(f"Critical mount {mount['mountpoint']} removed from {system}")
        return check
def compare_snapshots(snapshot1: Dict[str, Any], snapshot2: Dict[str, Any]) -> Dict[str, Any]:
    """Compare two snapshots using a freshly created ``SnapshotComparator``.

    Module-level convenience entry point; returns whatever
    ``SnapshotComparator.compare_snapshots`` returns for the same arguments.
    """
    return SnapshotComparator().compare_snapshots(snapshot1, snapshot2)