#!/usr/bin/env python3 """Compare incident-oriented log patterns before and after a change.""" from __future__ import annotations import argparse import json import re import sys from pathlib import Path from typing import Any EXIT_OK = 0 EXIT_FINDINGS = 1 EXIT_INVALID = 2 STATUS_ORDER = { "NEW": 0, "INCREASED": 1, "DECREASED": 2, "RESOLVED": 3, "UNCHANGED": 4, } SEVERITY_ORDER = {"CRITICAL": 0, "WARNING": 1} CRITICAL_PATTERNS = [ "CRITICAL", "FATAL", "panic", "kernel panic", "no space left on device", "out of memory", "killed process", "read-only file system", "segmentation fault", "segfault", "certificate expired", "TLS handshake failed", "SSLHandshakeException", "database unavailable", "HTTP 500", "HTTP 502", "HTTP 503", "HTTP 504", ] WARNING_PATTERNS = [ "ERROR", "failed", "failure", "timeout", "connection refused", "connection reset", "permission denied", "authentication failed", "denied", "unavailable", "service restart", "retrying", ] def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( description="Compare configured operational log patterns before and after a change." ) parser.add_argument("--before", required=True, help="Pre-change local log file.") parser.add_argument("--after", required=True, help="Post-change local log file.") parser.add_argument( "--format", choices=("text", "markdown", "json"), default="text", help="Report format. Default: text.", ) parser.add_argument("--output", help="Write report to this path instead of stdout.") parser.add_argument( "--top", type=positive_int, help="Limit displayed findings after operational importance sorting.", ) parser.add_argument( "--ignore-case", action="store_true", help="Match all configured patterns case-insensitively.", ) parser.add_argument( "--max-samples", type=non_negative_int, default=3, help="Maximum sample lines per finding. Default: 3.", ) return parser def positive_int(value: str) -> int: try: number = int(value) except ValueError as exc: raise argparse.ArgumentTypeError("must be a positive integer") from exc if number <= 0: raise argparse.ArgumentTypeError("must be a positive integer") return number def non_negative_int(value: str) -> int: try: number = int(value) except ValueError as exc: raise argparse.ArgumentTypeError("must be zero or a positive integer") from exc if number < 0: raise argparse.ArgumentTypeError("must be zero or a positive integer") return number def compile_patterns(ignore_case: bool) -> list[dict[str, Any]]: flags = re.IGNORECASE if ignore_case else 0 pattern_defs: list[dict[str, str]] = [] pattern_defs.extend( {"pattern": pattern, "severity": "CRITICAL"} for pattern in CRITICAL_PATTERNS ) pattern_defs.extend( {"pattern": pattern, "severity": "WARNING"} for pattern in WARNING_PATTERNS ) compiled = [] for item in pattern_defs: compiled.append( { "pattern": item["pattern"], "severity": item["severity"], "regex": re.compile(re.escape(item["pattern"]), flags), } ) return compiled def read_log_file(path: Path) -> list[str]: if not path.exists(): raise OSError(f"file does not exist: {path}") if not path.is_file(): raise OSError(f"path is not a regular file: {path}") try: text = path.read_text(encoding="utf-8", errors="replace") except PermissionError as exc: raise OSError(f"file is not readable: {path}") from exc except OSError as exc: raise OSError(f"unable to read file {path}: {exc}") from exc if text == "": raise ValueError(f"file is empty: {path}") return text.splitlines() def scan_log( lines: list[str], patterns: list[dict[str, Any]], max_samples: int ) -> dict[str, dict[str, Any]]: groups: dict[str, dict[str, Any]] = {} for line in lines: for item in patterns: if not item["regex"].search(line): continue key = f"{item['severity']}::{item['pattern']}" group = groups.setdefault( key, { "pattern": item["pattern"], "severity": item["severity"], "count": 0, "samples": [], }, ) group["count"] += 1 if len(group["samples"]) < max_samples: group["samples"].append(line) return groups def classify_status(before_count: int, after_count: int) -> str: if before_count == 0 and after_count > 0: return "NEW" if before_count > 0 and after_count == 0: return "RESOLVED" if after_count > before_count: return "INCREASED" if after_count < before_count: return "DECREASED" return "UNCHANGED" def sample_source_for(status: str) -> str: if status in ("NEW", "INCREASED"): return "after" if status in ("DECREASED", "RESOLVED"): return "before" return "after" def compare_logs( before_lines: list[str], after_lines: list[str], patterns: list[dict[str, Any]], max_samples: int, top: int | None, ) -> dict[str, Any]: before_groups = scan_log(before_lines, patterns, max_samples) after_groups = scan_log(after_lines, patterns, max_samples) compared_keys = sorted(set(before_groups) | set(after_groups)) findings = [] for key in compared_keys: before_group = before_groups.get(key) after_group = after_groups.get(key) reference = before_group or after_group if reference is None: continue before_count = before_group["count"] if before_group is not None else 0 after_count = after_group["count"] if after_group is not None else 0 status = classify_status(before_count, after_count) source = sample_source_for(status) sample_group = after_group if source == "after" else before_group findings.append( { "pattern": reference["pattern"], "severity": reference["severity"], "before_count": before_count, "after_count": after_count, "delta": after_count - before_count, "status": status, "sample_source": source, "samples": sample_group["samples"] if sample_group is not None else [], } ) sorted_findings = sorted(findings, key=finding_sort_key) summary = build_summary( before_lines=before_lines, after_lines=after_lines, findings=sorted_findings, ) displayed_findings = sorted_findings if top is None else sorted_findings[:top] return { "findings": displayed_findings, "summary": summary, } def finding_sort_key(finding: dict[str, Any]) -> tuple[int, int, int, int, str]: return ( STATUS_ORDER[finding["status"]], SEVERITY_ORDER[finding["severity"]], -abs(finding["delta"]), -finding["after_count"], finding["pattern"].lower(), ) def build_summary( before_lines: list[str], after_lines: list[str], findings: list[dict[str, Any]] ) -> dict[str, Any]: status_counts = { "NEW": 0, "INCREASED": 0, "DECREASED": 0, "RESOLVED": 0, "UNCHANGED": 0, } for finding in findings: status_counts[finding["status"]] += 1 critical_regressions = any( finding["severity"] == "CRITICAL" and finding["status"] in ("NEW", "INCREASED") for finding in findings ) warning_regressions = any( finding["severity"] == "WARNING" and finding["status"] in ("NEW", "INCREASED") for finding in findings ) if critical_regressions: overall_status = "CRITICAL" elif warning_regressions: overall_status = "WARNING" else: overall_status = "OK" return { "total_lines_scanned_before": len(before_lines), "total_lines_scanned_after": len(after_lines), "total_unique_patterns_compared": len(findings), "new_findings_count": status_counts["NEW"], "increased_findings_count": status_counts["INCREASED"], "decreased_findings_count": status_counts["DECREASED"], "resolved_findings_count": status_counts["RESOLVED"], "unchanged_findings_count": status_counts["UNCHANGED"], "overall_status": overall_status, } def render_text(report: dict[str, Any]) -> str: lines = ["Log Diff Checker", "================", ""] if not report["findings"]: lines.append("No configured operational patterns were detected in either log.") else: for finding in report["findings"]: lines.extend( [ f"[{finding['severity']}] {finding['pattern']} - {finding['status']}", f"Before count: {finding['before_count']}", f"After count: {finding['after_count']}", f"Delta: {finding['delta']:+d}", f"Sample source: {finding['sample_source']}", "Samples:", ] ) if finding["samples"]: lines.extend(f" - {sample}" for sample in finding["samples"]) else: lines.append(" - No samples retained") lines.append("") lines.extend(render_text_summary(report["summary"])) return "\n".join(lines) + "\n" def render_text_summary(summary: dict[str, Any]) -> list[str]: return [ "Operational Summary", "-------------------", f"Total lines scanned before: {summary['total_lines_scanned_before']}", f"Total lines scanned after: {summary['total_lines_scanned_after']}", f"Total unique patterns compared: {summary['total_unique_patterns_compared']}", f"New findings count: {summary['new_findings_count']}", f"Increased findings count: {summary['increased_findings_count']}", f"Decreased findings count: {summary['decreased_findings_count']}", f"Resolved findings count: {summary['resolved_findings_count']}", f"Unchanged findings count: {summary['unchanged_findings_count']}", f"Overall status: {summary['overall_status']}", ] def render_markdown(report: dict[str, Any]) -> str: lines = ["# Log Diff Checker", ""] if not report["findings"]: lines.extend(["No configured operational patterns were detected in either log.", ""]) else: for finding in report["findings"]: lines.extend( [ f"## {finding['severity']}: {finding['pattern']} ({finding['status']})", "", f"- Before count: {finding['before_count']}", f"- After count: {finding['after_count']}", f"- Delta: {finding['delta']:+d}", f"- Sample source: {finding['sample_source']}", "", "Sample log lines:", "", ] ) if finding["samples"]: lines.append("```text") lines.extend(finding["samples"]) lines.append("```") else: lines.append("_No samples retained._") lines.append("") summary = report["summary"] lines.extend( [ "## Operational Summary", "", f"- Total lines scanned before: {summary['total_lines_scanned_before']}", f"- Total lines scanned after: {summary['total_lines_scanned_after']}", f"- Total unique patterns compared: {summary['total_unique_patterns_compared']}", f"- New findings count: {summary['new_findings_count']}", f"- Increased findings count: {summary['increased_findings_count']}", f"- Decreased findings count: {summary['decreased_findings_count']}", f"- Resolved findings count: {summary['resolved_findings_count']}", f"- Unchanged findings count: {summary['unchanged_findings_count']}", f"- Overall status: {summary['overall_status']}", "", ] ) return "\n".join(lines) def render_json(report: dict[str, Any]) -> str: return json.dumps(report, indent=2, sort_keys=True) + "\n" def write_report( output_path: str | None, content: str, input_paths: tuple[Path, Path] ) -> None: if output_path is None: sys.stdout.write(content) return path = Path(output_path) try: output_resolved = path.resolve() input_resolved = {input_path.resolve() for input_path in input_paths} except OSError as exc: raise OSError(f"unable to validate output path {path}: {exc}") from exc if output_resolved in input_resolved: raise OSError("output path must not overwrite an input log file") try: path.write_text(content, encoding="utf-8") except OSError as exc: raise OSError(f"unable to write output {path}: {exc}") from exc def main() -> int: parser = build_parser() args = parser.parse_args() before_path = Path(args.before) after_path = Path(args.after) try: before_lines = read_log_file(before_path) after_lines = read_log_file(after_path) report = compare_logs( before_lines=before_lines, after_lines=after_lines, patterns=compile_patterns(args.ignore_case), max_samples=args.max_samples, top=args.top, ) if args.format == "text": content = render_text(report) elif args.format == "markdown": content = render_markdown(report) else: content = render_json(report) write_report(args.output, content, (before_path, after_path)) except (OSError, ValueError) as exc: print(f"CRITICAL: {exc}", file=sys.stderr) return EXIT_INVALID except RuntimeError as exc: print(f"CRITICAL: runtime error: {exc}", file=sys.stderr) return EXIT_INVALID if report["summary"]["overall_status"] == "OK": return EXIT_OK return EXIT_FINDINGS if __name__ == "__main__": sys.exit(main())