Files
portfolio/infra-run/scripts/python/log-diff-checker/log_diff_checker.py
T

463 lines
14 KiB
Python
Raw Normal View History

2026-05-11 17:04:10 +00:00
#!/usr/bin/env python3
"""Compare incident-oriented log patterns before and after a change."""
from __future__ import annotations
import argparse
import json
import re
import sys
from pathlib import Path
from typing import Any
EXIT_OK = 0
EXIT_FINDINGS = 1
EXIT_INVALID = 2
STATUS_ORDER = {
"NEW": 0,
"INCREASED": 1,
"DECREASED": 2,
"RESOLVED": 3,
"UNCHANGED": 4,
}
SEVERITY_ORDER = {"CRITICAL": 0, "WARNING": 1}
CRITICAL_PATTERNS = [
"CRITICAL",
"FATAL",
"panic",
"kernel panic",
"no space left on device",
"out of memory",
"killed process",
"read-only file system",
"segmentation fault",
"segfault",
"certificate expired",
"TLS handshake failed",
"SSLHandshakeException",
"database unavailable",
"HTTP 500",
"HTTP 502",
"HTTP 503",
"HTTP 504",
]
WARNING_PATTERNS = [
"ERROR",
"failed",
"failure",
"timeout",
"connection refused",
"connection reset",
"permission denied",
"authentication failed",
"denied",
"unavailable",
"service restart",
"retrying",
]
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Compare configured operational log patterns before and after a change."
)
parser.add_argument("--before", required=True, help="Pre-change local log file.")
parser.add_argument("--after", required=True, help="Post-change local log file.")
parser.add_argument(
"--format",
choices=("text", "markdown", "json"),
default="text",
help="Report format. Default: text.",
)
parser.add_argument("--output", help="Write report to this path instead of stdout.")
parser.add_argument(
"--top",
type=positive_int,
help="Limit displayed findings after operational importance sorting.",
)
parser.add_argument(
"--ignore-case",
action="store_true",
help="Match all configured patterns case-insensitively.",
)
parser.add_argument(
"--max-samples",
type=non_negative_int,
default=3,
help="Maximum sample lines per finding. Default: 3.",
)
return parser
def positive_int(value: str) -> int:
try:
number = int(value)
except ValueError as exc:
raise argparse.ArgumentTypeError("must be a positive integer") from exc
if number <= 0:
raise argparse.ArgumentTypeError("must be a positive integer")
return number
def non_negative_int(value: str) -> int:
try:
number = int(value)
except ValueError as exc:
raise argparse.ArgumentTypeError("must be zero or a positive integer") from exc
if number < 0:
raise argparse.ArgumentTypeError("must be zero or a positive integer")
return number
def compile_patterns(ignore_case: bool) -> list[dict[str, Any]]:
flags = re.IGNORECASE if ignore_case else 0
pattern_defs: list[dict[str, str]] = []
pattern_defs.extend(
{"pattern": pattern, "severity": "CRITICAL"} for pattern in CRITICAL_PATTERNS
)
pattern_defs.extend(
{"pattern": pattern, "severity": "WARNING"} for pattern in WARNING_PATTERNS
)
compiled = []
for item in pattern_defs:
compiled.append(
{
"pattern": item["pattern"],
"severity": item["severity"],
"regex": re.compile(re.escape(item["pattern"]), flags),
}
)
return compiled
def read_log_file(path: Path) -> list[str]:
if not path.exists():
raise OSError(f"file does not exist: {path}")
if not path.is_file():
raise OSError(f"path is not a regular file: {path}")
try:
text = path.read_text(encoding="utf-8", errors="replace")
except PermissionError as exc:
raise OSError(f"file is not readable: {path}") from exc
except OSError as exc:
raise OSError(f"unable to read file {path}: {exc}") from exc
if text == "":
raise ValueError(f"file is empty: {path}")
return text.splitlines()
def scan_log(
lines: list[str], patterns: list[dict[str, Any]], max_samples: int
) -> dict[str, dict[str, Any]]:
groups: dict[str, dict[str, Any]] = {}
for line in lines:
for item in patterns:
if not item["regex"].search(line):
continue
key = f"{item['severity']}::{item['pattern']}"
group = groups.setdefault(
key,
{
"pattern": item["pattern"],
"severity": item["severity"],
"count": 0,
"samples": [],
},
)
group["count"] += 1
if len(group["samples"]) < max_samples:
group["samples"].append(line)
return groups
def classify_status(before_count: int, after_count: int) -> str:
if before_count == 0 and after_count > 0:
return "NEW"
if before_count > 0 and after_count == 0:
return "RESOLVED"
if after_count > before_count:
return "INCREASED"
if after_count < before_count:
return "DECREASED"
return "UNCHANGED"
def sample_source_for(status: str) -> str:
if status in ("NEW", "INCREASED"):
return "after"
if status in ("DECREASED", "RESOLVED"):
return "before"
return "after"
def compare_logs(
before_lines: list[str],
after_lines: list[str],
patterns: list[dict[str, Any]],
max_samples: int,
top: int | None,
) -> dict[str, Any]:
before_groups = scan_log(before_lines, patterns, max_samples)
after_groups = scan_log(after_lines, patterns, max_samples)
compared_keys = sorted(set(before_groups) | set(after_groups))
findings = []
for key in compared_keys:
before_group = before_groups.get(key)
after_group = after_groups.get(key)
reference = before_group or after_group
if reference is None:
continue
before_count = before_group["count"] if before_group is not None else 0
after_count = after_group["count"] if after_group is not None else 0
status = classify_status(before_count, after_count)
source = sample_source_for(status)
sample_group = after_group if source == "after" else before_group
findings.append(
{
"pattern": reference["pattern"],
"severity": reference["severity"],
"before_count": before_count,
"after_count": after_count,
"delta": after_count - before_count,
"status": status,
"sample_source": source,
"samples": sample_group["samples"] if sample_group is not None else [],
}
)
sorted_findings = sorted(findings, key=finding_sort_key)
summary = build_summary(
before_lines=before_lines,
after_lines=after_lines,
findings=sorted_findings,
)
displayed_findings = sorted_findings if top is None else sorted_findings[:top]
return {
"findings": displayed_findings,
"summary": summary,
}
def finding_sort_key(finding: dict[str, Any]) -> tuple[int, int, int, int, str]:
return (
STATUS_ORDER[finding["status"]],
SEVERITY_ORDER[finding["severity"]],
-abs(finding["delta"]),
-finding["after_count"],
finding["pattern"].lower(),
)
def build_summary(
before_lines: list[str], after_lines: list[str], findings: list[dict[str, Any]]
) -> dict[str, Any]:
status_counts = {
"NEW": 0,
"INCREASED": 0,
"DECREASED": 0,
"RESOLVED": 0,
"UNCHANGED": 0,
}
for finding in findings:
status_counts[finding["status"]] += 1
critical_regressions = any(
finding["severity"] == "CRITICAL"
and finding["status"] in ("NEW", "INCREASED")
for finding in findings
)
warning_regressions = any(
finding["severity"] == "WARNING"
and finding["status"] in ("NEW", "INCREASED")
for finding in findings
)
if critical_regressions:
overall_status = "CRITICAL"
elif warning_regressions:
overall_status = "WARNING"
else:
overall_status = "OK"
return {
"total_lines_scanned_before": len(before_lines),
"total_lines_scanned_after": len(after_lines),
"total_unique_patterns_compared": len(findings),
"new_findings_count": status_counts["NEW"],
"increased_findings_count": status_counts["INCREASED"],
"decreased_findings_count": status_counts["DECREASED"],
"resolved_findings_count": status_counts["RESOLVED"],
"unchanged_findings_count": status_counts["UNCHANGED"],
"overall_status": overall_status,
}
def render_text(report: dict[str, Any]) -> str:
lines = ["Log Diff Checker", "================", ""]
if not report["findings"]:
lines.append("No configured operational patterns were detected in either log.")
else:
for finding in report["findings"]:
lines.extend(
[
f"[{finding['severity']}] {finding['pattern']} - {finding['status']}",
f"Before count: {finding['before_count']}",
f"After count: {finding['after_count']}",
f"Delta: {finding['delta']:+d}",
f"Sample source: {finding['sample_source']}",
"Samples:",
]
)
if finding["samples"]:
lines.extend(f" - {sample}" for sample in finding["samples"])
else:
lines.append(" - No samples retained")
lines.append("")
lines.extend(render_text_summary(report["summary"]))
return "\n".join(lines) + "\n"
def render_text_summary(summary: dict[str, Any]) -> list[str]:
return [
"Operational Summary",
"-------------------",
f"Total lines scanned before: {summary['total_lines_scanned_before']}",
f"Total lines scanned after: {summary['total_lines_scanned_after']}",
f"Total unique patterns compared: {summary['total_unique_patterns_compared']}",
f"New findings count: {summary['new_findings_count']}",
f"Increased findings count: {summary['increased_findings_count']}",
f"Decreased findings count: {summary['decreased_findings_count']}",
f"Resolved findings count: {summary['resolved_findings_count']}",
f"Unchanged findings count: {summary['unchanged_findings_count']}",
f"Overall status: {summary['overall_status']}",
]
def render_markdown(report: dict[str, Any]) -> str:
lines = ["# Log Diff Checker", ""]
if not report["findings"]:
lines.extend(["No configured operational patterns were detected in either log.", ""])
else:
for finding in report["findings"]:
lines.extend(
[
f"## {finding['severity']}: {finding['pattern']} ({finding['status']})",
"",
f"- Before count: {finding['before_count']}",
f"- After count: {finding['after_count']}",
f"- Delta: {finding['delta']:+d}",
f"- Sample source: {finding['sample_source']}",
"",
"Sample log lines:",
"",
]
)
if finding["samples"]:
lines.append("```text")
lines.extend(finding["samples"])
lines.append("```")
else:
lines.append("_No samples retained._")
lines.append("")
summary = report["summary"]
lines.extend(
[
"## Operational Summary",
"",
f"- Total lines scanned before: {summary['total_lines_scanned_before']}",
f"- Total lines scanned after: {summary['total_lines_scanned_after']}",
f"- Total unique patterns compared: {summary['total_unique_patterns_compared']}",
f"- New findings count: {summary['new_findings_count']}",
f"- Increased findings count: {summary['increased_findings_count']}",
f"- Decreased findings count: {summary['decreased_findings_count']}",
f"- Resolved findings count: {summary['resolved_findings_count']}",
f"- Unchanged findings count: {summary['unchanged_findings_count']}",
f"- Overall status: {summary['overall_status']}",
"",
]
)
return "\n".join(lines)
def render_json(report: dict[str, Any]) -> str:
return json.dumps(report, indent=2, sort_keys=True) + "\n"
def write_report(
output_path: str | None, content: str, input_paths: tuple[Path, Path]
) -> None:
if output_path is None:
sys.stdout.write(content)
return
path = Path(output_path)
try:
output_resolved = path.resolve()
input_resolved = {input_path.resolve() for input_path in input_paths}
except OSError as exc:
raise OSError(f"unable to validate output path {path}: {exc}") from exc
if output_resolved in input_resolved:
raise OSError("output path must not overwrite an input log file")
try:
path.write_text(content, encoding="utf-8")
except OSError as exc:
raise OSError(f"unable to write output {path}: {exc}") from exc
def main() -> int:
parser = build_parser()
args = parser.parse_args()
before_path = Path(args.before)
after_path = Path(args.after)
try:
before_lines = read_log_file(before_path)
after_lines = read_log_file(after_path)
report = compare_logs(
before_lines=before_lines,
after_lines=after_lines,
patterns=compile_patterns(args.ignore_case),
max_samples=args.max_samples,
top=args.top,
)
if args.format == "text":
content = render_text(report)
elif args.format == "markdown":
content = render_markdown(report)
else:
content = render_json(report)
write_report(args.output, content, (before_path, after_path))
except (OSError, ValueError) as exc:
print(f"CRITICAL: {exc}", file=sys.stderr)
return EXIT_INVALID
except RuntimeError as exc:
print(f"CRITICAL: runtime error: {exc}", file=sys.stderr)
return EXIT_INVALID
if report["summary"]["overall_status"] == "OK":
return EXIT_OK
return EXIT_FINDINGS
if __name__ == "__main__":
sys.exit(main())