#!/usr/bin/env python3
"""Summarize incident-oriented patterns in local log files."""

from __future__ import annotations

import argparse
import json
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Any

# Process exit codes returned by main().
EXIT_OK = 0  # report produced, no configured pattern matched
EXIT_FINDINGS = 1  # report produced, at least one finding group present
EXIT_INVALID = 2  # input/output error (unreadable or empty file, write failure)

# Placeholder rendered when a finding group has no parseable timestamp.
UNKNOWN = "UNKNOWN"

# Sort rank per severity: lower values are listed first in the report.
SEVERITY_ORDER = {"CRITICAL": 0, "WARNING": 1}

# Literal substrings (not regexes) that mark a line as a CRITICAL finding.
CRITICAL_PATTERNS = [
    "CRITICAL",
    "FATAL",
    "panic",
    "kernel panic",
    "no space left on device",
    "out of memory",
    "killed process",
    "read-only file system",
    "segmentation fault",
    "segfault",
    "certificate expired",
    "TLS handshake failed",
    "SSLHandshakeException",
    "database unavailable",
    "HTTP 500",
    "HTTP 502",
    "HTTP 503",
    "HTTP 504",
]

# Literal substrings (not regexes) that mark a line as a WARNING finding.
WARNING_PATTERNS = [
    "ERROR",
    "failed",
    "failure",
    "timeout",
    "connection refused",
    "connection reset",
    "permission denied",
    "authentication failed",
    "denied",
    "unavailable",
    "service restart",
    "retrying",
]

# "YYYY-MM-DD HH:MM:SS" or "YYYY-MM-DDTHH:MM:SS" anywhere in the line.
ISO_TIMESTAMP_RE = re.compile(r"\b(\d{4}-\d{2}-\d{2})[ T](\d{2}:\d{2}:\d{2})\b")
# Classic syslog prefix such as "Jun  1 10:00:00" at the start of the line.
SYSLOG_TIMESTAMP_RE = re.compile(r"^([A-Z][a-z]{2}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\b")


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line argument parser for the log summarizer."""
    parser = argparse.ArgumentParser(
        description="Summarize suspicious and critical patterns in a local log file."
    )
    parser.add_argument("--file", required=True, help="Local log file to analyze.")
    parser.add_argument(
        "--format",
        choices=("text", "markdown", "json"),
        default="text",
        help="Report format. Default: text.",
    )
    parser.add_argument("--output", help="Write report to this path instead of stdout.")
    parser.add_argument(
        "--top",
        type=positive_int,
        help="Limit finding groups after severity and count sorting.",
    )
    parser.add_argument(
        "--ignore-case",
        action="store_true",
        help="Match all configured patterns case-insensitively.",
    )
    parser.add_argument(
        "--since",
        type=parse_filter_timestamp,
        help='Include lines at or after "YYYY-MM-DD HH:MM:SS".',
    )
    parser.add_argument(
        "--until",
        type=parse_filter_timestamp,
        help='Include lines at or before "YYYY-MM-DD HH:MM:SS".',
    )
    parser.add_argument(
        "--max-samples",
        type=non_negative_int,
        default=3,
        help="Maximum sample lines per finding group. Default: 3.",
    )
    return parser


def positive_int(value: str) -> int:
    """Argparse type: parse *value* as an integer strictly greater than zero.

    Raises:
        argparse.ArgumentTypeError: if *value* is not an integer or is <= 0.
    """
    try:
        number = int(value)
    except ValueError as exc:
        raise argparse.ArgumentTypeError("must be a positive integer") from exc
    if number <= 0:
        raise argparse.ArgumentTypeError("must be a positive integer")
    return number


def non_negative_int(value: str) -> int:
    """Argparse type: parse *value* as an integer greater than or equal to zero.

    Raises:
        argparse.ArgumentTypeError: if *value* is not an integer or is < 0.
    """
    try:
        number = int(value)
    except ValueError as exc:
        raise argparse.ArgumentTypeError("must be zero or a positive integer") from exc
    if number < 0:
        raise argparse.ArgumentTypeError("must be zero or a positive integer")
    return number


def parse_filter_timestamp(value: str) -> datetime:
    """Argparse type: parse a ``--since`` / ``--until`` value.

    Accepts either a space or a ``T`` between the date and the time.

    Raises:
        argparse.ArgumentTypeError: if *value* matches neither format.
    """
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue
    raise argparse.ArgumentTypeError(
        'expected timestamp format "YYYY-MM-DD HH:MM:SS"'
    )


def compile_patterns(ignore_case: bool) -> list[dict[str, Any]]:
    """Compile every configured pattern into a literal-substring regex.

    Args:
        ignore_case: compile with ``re.IGNORECASE`` when true.

    Returns:
        One dict per pattern with keys ``pattern``, ``severity`` and ``regex``;
        CRITICAL patterns come first, then WARNING patterns.
    """
    flags = re.IGNORECASE if ignore_case else 0
    pattern_defs = [
        *((pattern, "CRITICAL") for pattern in CRITICAL_PATTERNS),
        *((pattern, "WARNING") for pattern in WARNING_PATTERNS),
    ]
    return [
        {
            "pattern": pattern,
            "severity": severity,
            # re.escape: configured patterns are plain text, never regex syntax.
            "regex": re.compile(re.escape(pattern), flags),
        }
        for pattern, severity in pattern_defs
    ]


def parse_line_timestamp(line: str, syslog_year: int) -> tuple[datetime | None, str | None]:
    """Extract a timestamp from a log line.

    An ISO-style timestamp anywhere in the line takes precedence; otherwise a
    syslog prefix at the start of the line is tried.

    Args:
        line: the raw log line.
        syslog_year: year to assume for syslog timestamps, which carry none.

    Returns:
        ``(parsed, rendered)`` where *rendered* is ``"YYYY-MM-DD HH:MM:SS"``,
        or ``(None, None)`` when no valid timestamp is found.
    """
    iso_match = ISO_TIMESTAMP_RE.search(line)
    if iso_match:
        raw = f"{iso_match.group(1)} {iso_match.group(2)}"
        try:
            return datetime.strptime(raw, "%Y-%m-%d %H:%M:%S"), raw
        except ValueError:
            # Looks like a timestamp but is not a real date (e.g. month 13).
            return None, None

    syslog_match = SYSLOG_TIMESTAMP_RE.search(line)
    if syslog_match:
        normalized = f"{syslog_year} {syslog_match.group(1)}"
        try:
            parsed = datetime.strptime(normalized, "%Y %b %d %H:%M:%S")
        except ValueError:
            return None, None
        return parsed, parsed.strftime("%Y-%m-%d %H:%M:%S")

    return None, None


def line_in_time_window(
    parsed_at: datetime | None, since: datetime | None, until: datetime | None
) -> bool:
    """Return whether a line belongs to the inclusive ``[since, until]`` window.

    Lines without a parsed timestamp are always kept.
    """
    if parsed_at is None:
        return True
    if since is not None and parsed_at < since:
        return False
    if until is not None and parsed_at > until:
        return False
    return True


def read_log_file(path: Path) -> list[str]:
    """Read *path* as UTF-8 (undecodable bytes replaced) and split into lines.

    Raises:
        OSError: if the path does not exist, is not a regular file, or
            cannot be read.
        ValueError: if the file is empty.
    """
    if not path.exists():
        raise OSError(f"file does not exist: {path}")
    if not path.is_file():
        raise OSError(f"path is not a regular file: {path}")
    try:
        text = path.read_text(encoding="utf-8", errors="replace")
    except PermissionError as exc:
        raise OSError(f"file is not readable: {path}") from exc
    except OSError as exc:
        raise OSError(f"unable to read file {path}: {exc}") from exc
    if text == "":
        raise ValueError(f"file is empty: {path}")
    return text.splitlines()


def analyze_log(
    lines: list[str],
    patterns: list[dict[str, Any]],
    since: datetime | None,
    until: datetime | None,
    max_samples: int,
) -> dict[str, Any]:
    """Group pattern matches found in *lines* and summarize each group.

    A single line may match several patterns; it is counted once in every
    matching group.

    Args:
        lines: log lines to scan.
        patterns: compiled pattern dicts as produced by ``compile_patterns``.
        since: inclusive lower bound for timestamped lines, or ``None``.
        until: inclusive upper bound for timestamped lines, or ``None``.
        max_samples: maximum number of sample lines retained per group.

    Returns:
        A dict with ``total_lines_scanned`` (every input line, including those
        outside the time window) and ``findings`` sorted by severity, then
        descending occurrence count, then pattern name.
    """
    # Syslog timestamps carry no year. Anchor them to the filter window when
    # one is given: --since first, then --until, and only then the current
    # year. Without the --until fallback, "--until <past year>" alone would
    # place every syslog line in the current year and filter all of them out.
    if since is not None:
        syslog_year = since.year
    elif until is not None:
        syslog_year = until.year
    else:
        syslog_year = datetime.now().year

    groups: dict[str, dict[str, Any]] = {}
    for line in lines:
        parsed_at, rendered_at = parse_line_timestamp(line, syslog_year)
        if not line_in_time_window(parsed_at, since, until):
            continue
        for item in patterns:
            if not item["regex"].search(line):
                continue
            key = f"{item['severity']}::{item['pattern']}"
            group = groups.get(key)
            if group is None:
                group = groups[key] = {
                    "pattern": item["pattern"],
                    "severity": item["severity"],
                    "occurrences": 0,
                    "first_seen": None,
                    "last_seen": None,
                    "samples": [],
                }
            group["occurrences"] += 1
            if parsed_at is not None:
                # Compare real datetimes: the log is not assumed to be ordered.
                if group["first_seen"] is None or parsed_at < group["first_seen"][0]:
                    group["first_seen"] = (parsed_at, rendered_at)
                if group["last_seen"] is None or parsed_at > group["last_seen"][0]:
                    group["last_seen"] = (parsed_at, rendered_at)
            if len(group["samples"]) < max_samples:
                group["samples"].append(line)

    ordered = sorted(
        groups.values(),
        key=lambda item: (
            SEVERITY_ORDER[item["severity"]],
            -item["occurrences"],
            item["pattern"].lower(),
        ),
    )
    rendered_findings = [
        {
            "pattern": group["pattern"],
            "severity": group["severity"],
            "occurrences": group["occurrences"],
            "first_seen": render_seen(group["first_seen"]),
            "last_seen": render_seen(group["last_seen"]),
            "samples": group["samples"],
        }
        for group in ordered
    ]
    return {
        "total_lines_scanned": len(lines),
        "findings": rendered_findings,
    }


def render_seen(value: tuple[datetime, str | None] | None) -> str:
    """Render a ``(parsed, rendered)`` timestamp pair, or ``UNKNOWN`` for ``None``."""
    if value is None:
        return UNKNOWN
    return value[1] or value[0].strftime("%Y-%m-%d %H:%M:%S")


def apply_top_limit(report: dict[str, Any], top: int | None) -> dict[str, Any]:
    """Return a shallow copy of *report* keeping only the first *top* findings.

    The report is returned unchanged when *top* is ``None``.
    """
    if top is None:
        return report
    limited = dict(report)
    limited["findings"] = report["findings"][:top]
    return limited


def add_summary(report: dict[str, Any]) -> dict[str, Any]:
    """Return a shallow copy of *report* with a ``summary`` section added.

    Counts are derived from the findings present in *report*, so a report that
    was already truncated is summarized as truncated.
    """
    findings = report["findings"]
    critical_groups = sum(1 for item in findings if item["severity"] == "CRITICAL")
    warning_groups = sum(1 for item in findings if item["severity"] == "WARNING")
    total_findings = sum(item["occurrences"] for item in findings)

    if critical_groups > 0:
        status = "CRITICAL"
    elif warning_groups > 0:
        status = "WARNING"
    else:
        status = "OK"

    enriched = dict(report)
    enriched["summary"] = {
        "total_lines_scanned": report["total_lines_scanned"],
        "total_findings": total_findings,
        "critical_finding_groups": critical_groups,
        "warning_finding_groups": warning_groups,
        "overall_status": status,
    }
    return enriched


def render_text(report: dict[str, Any]) -> str:
    """Render *report* (which must include ``summary``) as plain text."""
    lines = ["Incident Log Summary", "====================", ""]
    if not report["findings"]:
        # Trailing blank keeps the summary visually separated in both branches.
        lines.extend(["No configured incident patterns were detected.", ""])
    else:
        for finding in report["findings"]:
            lines.extend(
                [
                    f"[{finding['severity']}] {finding['pattern']}",
                    f"Occurrences: {finding['occurrences']}",
                    f"First seen: {finding['first_seen']}",
                    f"Last seen: {finding['last_seen']}",
                    "Samples:",
                ]
            )
            if finding["samples"]:
                lines.extend(f" - {sample}" for sample in finding["samples"])
            else:
                lines.append(" - No samples retained")
            lines.append("")
    lines.extend(render_text_summary(report["summary"]))
    return "\n".join(lines) + "\n"


def render_text_summary(summary: dict[str, Any]) -> list[str]:
    """Return the plain-text lines of the operational summary section."""
    return [
        "Operational Summary",
        "-------------------",
        f"Total lines scanned: {summary['total_lines_scanned']}",
        f"Total findings: {summary['total_findings']}",
        f"Critical finding groups: {summary['critical_finding_groups']}",
        f"Warning finding groups: {summary['warning_finding_groups']}",
        f"Overall status: {summary['overall_status']}",
    ]


def render_markdown(report: dict[str, Any]) -> str:
    """Render *report* (which must include ``summary``) as Markdown."""
    lines = ["# Incident Log Summary", ""]
    if not report["findings"]:
        lines.extend(["No configured incident patterns were detected.", ""])
    else:
        for finding in report["findings"]:
            lines.extend(
                [
                    f"## {finding['severity']}: {finding['pattern']}",
                    "",
                    f"- Occurrences: {finding['occurrences']}",
                    f"- First seen: {finding['first_seen']}",
                    f"- Last seen: {finding['last_seen']}",
                    "",
                    "Sample log lines:",
                    "",
                ]
            )
            if finding["samples"]:
                lines.append("```text")
                lines.extend(finding["samples"])
                lines.append("```")
            else:
                lines.append("_No samples retained._")
            lines.append("")

    summary = report["summary"]
    lines.extend(
        [
            "## Operational Summary",
            "",
            f"- Total lines scanned: {summary['total_lines_scanned']}",
            f"- Total findings: {summary['total_findings']}",
            f"- Critical finding groups: {summary['critical_finding_groups']}",
            f"- Warning finding groups: {summary['warning_finding_groups']}",
            f"- Overall status: {summary['overall_status']}",
            "",
        ]
    )
    return "\n".join(lines)


def render_json(report: dict[str, Any]) -> str:
    """Render *report* as indented JSON with sorted keys and a trailing newline."""
    return json.dumps(report, indent=2, sort_keys=True) + "\n"


def write_report(output_path: str | None, content: str) -> None:
    """Write *content* to *output_path*, or to stdout when it is ``None``.

    Raises:
        OSError: if the output file cannot be written.
    """
    if output_path is None:
        sys.stdout.write(content)
        return
    path = Path(output_path)
    try:
        path.write_text(content, encoding="utf-8")
    except OSError as exc:
        raise OSError(f"unable to write output {path}: {exc}") from exc


def main(argv: list[str] | None = None) -> int:
    """Run the log summarizer and return the process exit code.

    Args:
        argv: argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        ``EXIT_OK`` when nothing matched, ``EXIT_FINDINGS`` when at least one
        finding group is reported, ``EXIT_INVALID`` on input/output errors.
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    if args.since is not None and args.until is not None and args.since > args.until:
        parser.error("--since must be earlier than or equal to --until")

    try:
        lines = read_log_file(Path(args.file))
        report = analyze_log(
            lines=lines,
            patterns=compile_patterns(args.ignore_case),
            since=args.since,
            until=args.until,
            max_samples=args.max_samples,
        )
        # The summary is computed after --top truncation, so it describes
        # exactly the groups that appear in the rendered report.
        report = add_summary(apply_top_limit(report, args.top))
        if args.format == "text":
            content = render_text(report)
        elif args.format == "markdown":
            content = render_markdown(report)
        else:
            content = render_json(report)
        write_report(args.output, content)
    except (OSError, ValueError) as exc:
        print(f"CRITICAL: {exc}", file=sys.stderr)
        return EXIT_INVALID
    except RuntimeError as exc:
        print(f"CRITICAL: runtime error: {exc}", file=sys.stderr)
        return EXIT_INVALID

    if report["summary"]["overall_status"] == "OK":
        return EXIT_OK
    return EXIT_FINDINGS


if __name__ == "__main__":
    sys.exit(main())