#!/usr/bin/env python3
"""Analyze exported journalctl text logs for operational findings."""

from __future__ import annotations

import argparse
import json
import re
import sys
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import Any

# Process exit codes returned by main().
EXIT_OK = 0
EXIT_FINDINGS = 1
EXIT_INVALID = 2

# Placeholder used wherever a value could not be extracted from a log line.
UNKNOWN = "UNKNOWN"

# Sort rank per severity: lower values are listed first in reports.
SEVERITY_ORDER = {"CRITICAL": 0, "WARNING": 1}

# Literal substrings (escaped before compilation) reported as CRITICAL.
CRITICAL_PATTERNS = [
    {
        "name": "failed to start",
        "pattern": "failed to start",
        "category": "failed_unit",
        "service_hint": "systemd",
    },
    {
        "name": "entered failed state",
        "pattern": "entered failed state",
        "category": "failed_unit",
        "service_hint": "systemd",
    },
    {
        "name": "dependency failed",
        "pattern": "dependency failed",
        "category": "dependency_failure",
        "service_hint": "systemd",
    },
    {
        "name": "job failed",
        "pattern": "job failed",
        "category": "failed_unit",
        "service_hint": "systemd",
    },
    {
        "name": "unit failed",
        "pattern": "unit failed",
        "category": "failed_unit",
        "service_hint": "systemd",
    },
    {
        "name": "kernel panic",
        "pattern": "kernel panic",
        "category": "kernel_panic",
        "service_hint": "kernel",
    },
    {
        "name": "panic",
        "pattern": "panic",
        "category": "kernel_panic",
        "service_hint": "kernel",
    },
    {
        "name": "Out of memory",
        "pattern": "Out of memory",
        "category": "oom",
        "service_hint": "kernel",
    },
    {
        "name": "invoked oom-killer",
        "pattern": "invoked oom-killer",
        "category": "oom",
        "service_hint": "kernel",
    },
    {
        "name": "killed process",
        "pattern": "killed process",
        "category": "oom",
        "service_hint": "kernel",
    },
    {
        "name": "no space left on device",
        "pattern": "no space left on device",
        "category": "disk_filesystem",
        "service_hint": "storage",
    },
    {
        "name": "read-only file system",
        "pattern": "read-only file system",
        "category": "disk_filesystem",
        "service_hint": "storage",
    },
    {
        "name": "segmentation fault",
        "pattern": "segmentation fault",
        "category": "crash",
        "service_hint": "application",
    },
    {
        "name": "segfault",
        "pattern": "segfault",
        "category": "crash",
        "service_hint": "application",
    },
    {
        "name": "certificate expired",
        "pattern": "certificate expired",
        "category": "tls_certificate",
        "service_hint": "tls",
    },
    {
        "name": "TLS handshake failed",
        "pattern": "TLS handshake failed",
        "category": "tls_certificate",
        "service_hint": "tls",
    },
    {
        "name": "emergency mode",
        "pattern": "emergency mode",
        "category": "system_recovery",
        "service_hint": "systemd",
    },
    {
        "name": "filesystem error",
        "pattern": "filesystem error",
        "category": "disk_filesystem",
        "service_hint": "storage",
    },
    {
        "name": "I/O error",
        "pattern": "I/O error",
        "category": "disk_filesystem",
        "service_hint": "storage",
    },
]

# Literal substrings (escaped before compilation) reported as WARNING.
WARNING_PATTERNS = [
    {
        "name": "service restart",
        "pattern": "service restart",
        "category": "restart",
        "service_hint": "systemd",
    },
    {
        "name": "scheduled restart job",
        "pattern": "scheduled restart job",
        "category": "restart",
        "service_hint": "systemd",
    },
    {
        "name": "start request repeated too quickly",
        "pattern": "start request repeated too quickly",
        "category": "restart",
        "service_hint": "systemd",
    },
    {
        "name": "timeout",
        "pattern": "timeout",
        "category": "timeout",
        "service_hint": "application",
    },
    {
        "name": "timed out",
        "pattern": "timed out",
        "category": "timeout",
        "service_hint": "application",
    },
    {
        "name": "connection refused",
        "pattern": "connection refused",
        "category": "network",
        "service_hint": "network",
    },
    {
        "name": "connection reset",
        "pattern": "connection reset",
        "category": "network",
        "service_hint": "network",
    },
    {
        "name": "permission denied",
        "pattern": "permission denied",
        "category": "permission",
        "service_hint": "security",
    },
    {
        "name": "authentication failure",
        "pattern": "authentication failure",
        "category": "authentication",
        "service_hint": "security",
    },
    {
        "name": "denied",
        "pattern": "denied",
        "category": "permission",
        "service_hint": "security",
    },
    {
        "name": "unavailable",
        "pattern": "unavailable",
        "category": "availability",
        "service_hint": "application",
    },
    {
        "name": "degraded",
        "pattern": "degraded",
        "category": "degraded",
        "service_hint": "systemd",
    },
    {
        "name": "failed",
        "pattern": "failed",
        "category": "generic_failure",
        "service_hint": "application",
    },
    {
        "name": "warning",
        "pattern": "warning",
        "category": "warning",
        "service_hint": "application",
    },
]

# Catch-all pattern names that are dropped when a more specific pattern
# matched the same line (see analyze_log).
GENERIC_PATTERN_NAMES = frozenset({"failed", "warning"})

# "YYYY-MM-DD HH:MM:SS" (or with "T"), optional fraction after "," or ".".
ISO_TIMESTAMP_RE = re.compile(
    r"\b(\d{4}-\d{2}-\d{2})[ T](\d{2}:\d{2}:\d{2})([,.]\d{1,6})?\b"
)
# "Mon DD HH:MM:SS" at the start of the line (no year).
SYSLOG_TIMESTAMP_RE = re.compile(r"^([A-Z][a-z]{2}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\b")
UNIT_RE = re.compile(r"\b([A-Za-z0-9_.@:-]+\.service)\b")
ANY_UNIT_RE = re.compile(
    r"\b([A-Za-z0-9_.@:-]+\.(?:service|socket|mount|target|timer|path|slice|scope|device))\b"
)
# Optional timestamp(s), optional host, then "proc[pid]:".  extract_service_info
# reads the "proc" and "pid" groups by name.
PREFIX_RE = re.compile(
    r"^(?:[A-Z][a-z]{2}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}\s+)?"
    r"(?:\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}(?:[,.]\d{1,6})?\s+)?"
    r"(?:(?P<host>[A-Za-z0-9_.:-]+)\s+)?"
    r"(?P<proc>[A-Za-z0-9_.@/-]+)(?:\[(?P<pid>\d+)\])?:"
)
KILLED_PROCESS_RE = re.compile(r"Killed process \d+ \(([^)]+)\)")
SYSTEMD_FAILED_START_RE = re.compile(r"Failed to start\s+(.+?)\.")
SYSTEMD_TRIGGER_RE = re.compile(
    r"Triggered By:\s*"
    r"([A-Za-z0-9_.@:-]+\.(?:service|socket|mount|target|timer|path|slice|scope|device))"
)
PID_RE = re.compile(r"\bpid[ =](\d+)\b", re.IGNORECASE)


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line argument parser for the analyzer."""
    parser = argparse.ArgumentParser(
        description="Analyze exported journalctl text logs for systemd and service findings."
    )
    parser.add_argument("--file", required=True, help="Exported journal log file to analyze.")
    parser.add_argument(
        "--format",
        choices=("text", "markdown", "json"),
        default="text",
        help="Report format. Default: text.",
    )
    parser.add_argument("--output", help="Write report to this path instead of stdout.")
    parser.add_argument(
        "--service",
        help="Filter findings to a service, unit, or process name. Partial matching is allowed.",
    )
    parser.add_argument(
        "--severity",
        choices=("warning", "critical"),
        help="Show only warning or critical findings.",
    )
    parser.add_argument(
        "--top",
        type=positive_int,
        default=10,
        help="Number of top groups, services, and categories to display. Default: 10.",
    )
    parser.add_argument(
        "--max-samples",
        type=non_negative_int,
        default=3,
        help="Maximum sample lines per finding group. Default: 3.",
    )
    parser.add_argument(
        "--ignore-case",
        action="store_true",
        help="Match configured patterns case-insensitively.",
    )
    parser.add_argument(
        "--since",
        type=parse_filter_timestamp,
        help='Include lines at or after "YYYY-MM-DD HH:MM:SS".',
    )
    parser.add_argument(
        "--until",
        type=parse_filter_timestamp,
        help='Include lines at or before "YYYY-MM-DD HH:MM:SS".',
    )
    return parser


def positive_int(value: str) -> int:
    """Parse *value* as an integer greater than zero (argparse ``type=``).

    Raises:
        argparse.ArgumentTypeError: if *value* is not an integer or is <= 0.
    """
    try:
        number = int(value)
    except ValueError as exc:
        raise argparse.ArgumentTypeError("must be a positive integer") from exc
    if number <= 0:
        raise argparse.ArgumentTypeError("must be a positive integer")
    return number


def non_negative_int(value: str) -> int:
    """Parse *value* as an integer greater than or equal to zero (argparse ``type=``).

    Raises:
        argparse.ArgumentTypeError: if *value* is not an integer or is < 0.
    """
    try:
        number = int(value)
    except ValueError as exc:
        raise argparse.ArgumentTypeError("must be zero or a positive integer") from exc
    if number < 0:
        raise argparse.ArgumentTypeError("must be zero or a positive integer")
    return number


def parse_filter_timestamp(value: str) -> datetime:
    """Parse a ``--since`` / ``--until`` value into a naive ``datetime``.

    Accepts a space- or "T"-separated date and time, and the space-separated
    form with a fractional part after "." or ",".

    Raises:
        argparse.ArgumentTypeError: if none of the accepted formats match.
    """
    for fmt in (
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%d %H:%M:%S.%f",
        "%Y-%m-%d %H:%M:%S,%f",
    ):
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue
    raise argparse.ArgumentTypeError(
        'expected timestamp format "YYYY-MM-DD HH:MM:SS"'
    )


def compile_patterns(ignore_case: bool) -> list[dict[str, Any]]:
    """Return every configured pattern with ``severity`` and compiled ``regex`` added.

    Critical patterns come first, then warning patterns. Pattern text is
    escaped, so it is matched as a literal substring.

    NOTE(review): matching has always been case-insensitive regardless of
    *ignore_case* (the flags started out as ``re.IGNORECASE``), and the
    lowercase patterns rely on that to match lines such as "Failed to start".
    The parameter is kept for interface compatibility; confirm whether a
    case-sensitive default was ever intended before changing this.
    """
    flags = re.IGNORECASE
    compiled: list[dict[str, Any]] = []
    for severity, items in (("CRITICAL", CRITICAL_PATTERNS), ("WARNING", WARNING_PATTERNS)):
        for item in items:
            compiled.append(
                {
                    **item,
                    "severity": severity,
                    "regex": re.compile(re.escape(item["pattern"]), flags),
                }
            )
    return compiled


def read_log_file(path: Path) -> list[str]:
    """Read *path* as UTF-8 (undecodable bytes replaced) and return its lines.

    Raises:
        OSError: if the path does not exist, is not a regular file, or
            cannot be read.
        ValueError: if the file is empty.
    """
    if not path.exists():
        raise OSError(f"file does not exist: {path}")
    if not path.is_file():
        raise OSError(f"path is not a regular file: {path}")
    try:
        text = path.read_text(encoding="utf-8", errors="replace")
    except PermissionError as exc:
        raise OSError(f"file is not readable: {path}") from exc
    except OSError as exc:
        raise OSError(f"unable to read file {path}: {exc}") from exc
    if text == "":
        raise ValueError(f"file is empty: {path}")
    return text.splitlines()


def parse_line_timestamp(line: str, syslog_year: int) -> tuple[datetime | None, str]:
    """Extract a timestamp from *line*.

    An ISO-style timestamp anywhere in the line takes precedence; otherwise a
    syslog-style timestamp at the start of the line is tried, with
    *syslog_year* supplying the year that format lacks.

    Returns:
        ``(parsed, rendered)`` where *rendered* is the timestamp text as it
        appears in the log (ISO date and time are joined with a space), or
        ``(None, UNKNOWN)`` when no valid timestamp is found.
    """
    iso_match = ISO_TIMESTAMP_RE.search(line)
    if iso_match:
        fraction = iso_match.group(3) or ""
        raw = f"{iso_match.group(1)} {iso_match.group(2)}"
        parse_value = raw
        fmt = "%Y-%m-%d %H:%M:%S"
        if fraction:
            # Drop the "," / "." separator and pad to the six digits %f accepts.
            parse_value = f"{raw}.{fraction[1:].ljust(6, '0')[:6]}"
            fmt = "%Y-%m-%d %H:%M:%S.%f"
        try:
            return datetime.strptime(parse_value, fmt), raw + fraction
        except ValueError:
            return None, UNKNOWN

    syslog_match = SYSLOG_TIMESTAMP_RE.search(line)
    if syslog_match:
        raw = syslog_match.group(1)
        try:
            parsed = datetime.strptime(f"{syslog_year} {raw}", "%Y %b %d %H:%M:%S")
        except ValueError:
            return None, UNKNOWN
        return parsed, raw

    return None, UNKNOWN


def line_in_time_window(
    parsed_at: datetime | None, since: datetime | None, until: datetime | None
) -> bool:
    """Return whether a line falls inside the inclusive ``[since, until]`` window.

    Lines without a parsed timestamp are always kept.
    """
    if parsed_at is None:
        return True
    if since is not None and parsed_at < since:
        return False
    if until is not None and parsed_at > until:
        return False
    return True


def render_seen(value: tuple[datetime, str] | None) -> str:
    """Render a ``(parsed, rendered)`` first/last-seen pair for the report.

    Prefers the text recorded from the log and falls back to formatting the
    parsed datetime; returns ``UNKNOWN`` when *value* is ``None``.
    """
    if value is None:
        return UNKNOWN
    return value[1] or value[0].strftime("%Y-%m-%d %H:%M:%S")


def update_seen(group: dict[str, Any], parsed_at: datetime | None, rendered_at: str) -> None:
    """Widen *group*'s ``first_seen`` / ``last_seen`` to include *parsed_at*.

    Does nothing when the line had no parsed timestamp.
    """
    if parsed_at is None:
        return
    if group["first_seen"] is None or parsed_at < group["first_seen"][0]:
        group["first_seen"] = (parsed_at, rendered_at)
    if group["last_seen"] is None or parsed_at > group["last_seen"][0]:
        group["last_seen"] = (parsed_at, rendered_at)


def append_limited(items: list[str], value: str, limit: int) -> None:
    """Append *value* to *items* unless it is a duplicate or *limit* is reached."""
    if limit == 0:
        return
    if value in items:
        return
    if len(items) < limit:
        items.append(value)


def normalize_service_name(value: str) -> str:
    """Strip surrounding whitespace from *value*; return ``UNKNOWN`` if nothing is left."""
    stripped = value.strip()
    if not stripped:
        return UNKNOWN
    return stripped


def extract_service_info(line: str, pattern_item: dict[str, Any]) -> dict[str, str]:
    """Derive service, unit, process and PID for a matched log line.

    The ``service`` value is the unit when one was found, else the process
    name, else the pattern's ``service_hint``, else ``UNKNOWN``.

    Returns:
        A dict with the keys ``service``, ``unit``, ``process`` and ``pid``;
        values that could not be determined are ``UNKNOWN``.
    """
    unit_match = UNIT_RE.search(line)
    any_unit_match = ANY_UNIT_RE.search(line)
    prefix_match = PREFIX_RE.search(line)
    killed_match = KILLED_PROCESS_RE.search(line)
    triggered_match = SYSTEMD_TRIGGER_RE.search(line)
    pid_match = PID_RE.search(line)

    unit = UNKNOWN
    process = UNKNOWN
    pid = UNKNOWN

    # A ".service" unit is preferred over any other unit type on the line.
    if unit_match:
        unit = unit_match.group(1)
    elif any_unit_match:
        unit = any_unit_match.group(1)

    if prefix_match:
        process = prefix_match.group("proc") or UNKNOWN
        pid = prefix_match.group("pid") or UNKNOWN

    # For "Killed process N (name)" lines the killed process overrides the
    # logging process taken from the prefix.
    if killed_match:
        process = normalize_service_name(killed_match.group(1))

    if pid == UNKNOWN and pid_match:
        pid = pid_match.group(1)

    # systemd may log only a description ("Failed to start My App."); turn it
    # into a unit-like name so such findings still group together.
    if unit == UNKNOWN and process == "systemd":
        failed_start_match = SYSTEMD_FAILED_START_RE.search(line)
        if failed_start_match:
            unit = normalize_service_name(
                failed_start_match.group(1).strip().replace(" ", "-")
            )
            # Only suffix a real name; never produce "UNKNOWN.service".
            if unit != UNKNOWN and not unit.endswith(".service"):
                unit = f"{unit}.service"

    if unit == UNKNOWN and triggered_match:
        unit = triggered_match.group(1)

    service = UNKNOWN
    if unit != UNKNOWN:
        service = unit
    elif process != UNKNOWN:
        service = process
    elif pattern_item.get("service_hint"):
        service = pattern_item["service_hint"]

    return {
        "service": service,
        "unit": unit,
        "process": process,
        "pid": pid,
    }


def service_filter_matches(
    service_filter: str | None, service_info: dict[str, str], line: str
) -> bool:
    """Return whether *service_filter* occurs in the line or its extracted names.

    The comparison is a case-insensitive substring test against the raw line
    and the known ``service``, ``unit`` and ``process`` values. An empty or
    missing filter matches everything.
    """
    if not service_filter:
        return True
    needle = service_filter.lower()
    candidates = [line.lower()]
    for key in ("service", "unit", "process"):
        value = service_info.get(key, UNKNOWN)
        if value != UNKNOWN:
            candidates.append(value.lower())
    return any(needle in candidate for candidate in candidates)


def severity_filter_matches(selected: str | None, severity: str) -> bool:
    """Return whether *severity* passes the optional *selected* severity filter."""
    if selected is None:
        return True
    return selected.upper() == severity


def detect_failed_unit(line: str, service_info: dict[str, str], category: str) -> str | None:
    """Return the unit a failure line refers to, or ``None``.

    Only the ``failed_unit`` and ``dependency_failure`` categories are
    considered. The already-extracted unit is preferred; otherwise the first
    unit name found in *line* is used.
    """
    if category not in {"failed_unit", "dependency_failure"}:
        return None
    if service_info["unit"] != UNKNOWN:
        return service_info["unit"]
    match = ANY_UNIT_RE.search(line)
    if match:
        return match.group(1)
    return None


def analyze_log(
    lines: list[str],
    patterns: list[dict[str, Any]],
    since: datetime | None,
    until: datetime | None,
    service_filter: str | None,
    severity_filter: str | None,
    top: int,
    max_samples: int,
) -> dict[str, Any]:
    """Scan *lines* for configured patterns and build the report dictionary.

    Args:
        lines: Log lines to scan.
        patterns: Output of ``compile_patterns``.
        since: Inclusive lower time bound, or ``None``.
        until: Inclusive upper time bound, or ``None``.
        service_filter: Case-insensitive substring filter, or ``None``.
        severity_filter: ``"CRITICAL"`` / ``"WARNING"`` (any case), or ``None``.
        top: Maximum number of groups, services, categories and failed units
            to include in the report.
        max_samples: Maximum number of distinct sample lines kept per group.

    Returns:
        A JSON-serializable report. Findings are grouped by service, pattern,
        category and severity, ordered by severity and then by descending
        occurrence count.
    """
    # Syslog timestamps carry no year: borrow it from --since, else use the
    # current year.
    syslog_year = since.year if since is not None else datetime.now().year
    groups: dict[str, dict[str, Any]] = {}
    total_lines_scanned = 0
    parsed_timestamps = 0
    unknown_timestamps = 0
    top_services: Counter[str] = Counter()
    top_categories: Counter[str] = Counter()
    failed_units: Counter[str] = Counter()
    restart_findings = 0
    oom_findings = 0
    filesystem_findings = 0

    for line in lines:
        parsed_at, rendered_at = parse_line_timestamp(line, syslog_year)
        total_lines_scanned += 1
        if parsed_at is not None:
            parsed_timestamps += 1
        else:
            unknown_timestamps += 1

        if not line_in_time_window(parsed_at, since, until):
            continue

        matched_items = [item for item in patterns if item["regex"].search(line)]
        if matched_items:
            # Drop the catch-all patterns when something more specific
            # matched, so a line is not reported twice for the same reason.
            specific_items = [
                item for item in matched_items if item["name"] not in GENERIC_PATTERN_NAMES
            ]
            if specific_items:
                matched_items = specific_items

        for item in matched_items:
            if not severity_filter_matches(severity_filter, item["severity"]):
                continue
            service_info = extract_service_info(line, item)
            if not service_filter_matches(service_filter, service_info, line):
                continue

            key = (
                f"{service_info['service']}::{item['name']}::"
                f"{item['category']}::{item['severity']}"
            )
            # Unit, process and PID shown for a group come from the first
            # line that created it.
            group = groups.setdefault(
                key,
                {
                    "service": service_info["service"],
                    "unit": service_info["unit"],
                    "process": service_info["process"],
                    "pid": service_info["pid"],
                    "category": item["category"],
                    "pattern": item["name"],
                    "severity": item["severity"],
                    "occurrences": 0,
                    "first_seen": None,
                    "last_seen": None,
                    "samples": [],
                },
            )
            group["occurrences"] += 1
            update_seen(group, parsed_at, rendered_at)
            append_limited(group["samples"], line, max_samples)

            top_services[group["service"]] += 1
            top_categories[group["category"]] += 1

            failed_unit = detect_failed_unit(line, service_info, item["category"])
            if failed_unit:
                failed_units[failed_unit] += 1

            if item["category"] == "restart":
                restart_findings += 1
            if item["category"] == "oom":
                oom_findings += 1
            if item["category"] == "disk_filesystem":
                filesystem_findings += 1

    findings = sorted(
        groups.values(),
        key=lambda item: (
            SEVERITY_ORDER[item["severity"]],
            -item["occurrences"],
            item["service"].lower(),
            item["category"].lower(),
        ),
    )

    # Replace the (datetime, text) first/last-seen pairs with plain strings so
    # the report can be serialized to JSON.
    rendered_findings = [
        {
            "service": group["service"],
            "unit": group["unit"],
            "process": group["process"],
            "pid": group["pid"],
            "category": group["category"],
            "pattern": group["pattern"],
            "severity": group["severity"],
            "occurrences": group["occurrences"],
            "first_seen": render_seen(group["first_seen"]),
            "last_seen": render_seen(group["last_seen"]),
            "samples": group["samples"],
        }
        for group in findings
    ]

    critical_groups = sum(1 for item in rendered_findings if item["severity"] == "CRITICAL")
    warning_groups = sum(1 for item in rendered_findings if item["severity"] == "WARNING")

    overall_status = "OK"
    if critical_groups > 0:
        overall_status = "CRITICAL"
    elif warning_groups > 0:
        overall_status = "WARNING"

    displayed_findings = rendered_findings[:top]

    return {
        "overall_status": overall_status,
        "total_lines_scanned": total_lines_scanned,
        "total_findings": sum(item["occurrences"] for item in rendered_findings),
        "critical_finding_groups": critical_groups,
        "warning_finding_groups": warning_groups,
        "affected_services_count": len([name for name in top_services if name != UNKNOWN]),
        "top_affected_services": [
            {"service": name, "count": count}
            for name, count in top_services.most_common(top)
        ],
        "top_categories": [
            {"category": name, "count": count}
            for name, count in top_categories.most_common(top)
        ],
        "failed_units": [
            {"unit": name, "count": count}
            for name, count in failed_units.most_common(top)
        ],
        "restart_findings": restart_findings,
        "oom_findings": oom_findings,
        "filesystem_disk_findings": filesystem_findings,
        "timestamp_coverage": {
            "parsed_timestamps_count": parsed_timestamps,
            "unknown_timestamps_count": unknown_timestamps,
        },
        "filters_used": {
            "service": service_filter or None,
            "severity": severity_filter or None,
            "since": since.strftime("%Y-%m-%d %H:%M:%S") if since else None,
            "until": until.strftime("%Y-%m-%d %H:%M:%S") if until else None,
        },
        "finding_groups": displayed_findings,
        "finding_groups_total": len(rendered_findings),
    }


def render_top_pairs(items: list[dict[str, Any]], key: str) -> str:
    """Format ``[{key: name, "count": n}, ...]`` as ``"name (n), ..."``.

    Returns the string ``"None"`` when *items* is empty.
    """
    if not items:
        return "None"
    return ", ".join(f"{item[key]} ({item['count']})" for item in items)


def render_text(report: dict[str, Any]) -> str:
    """Render *report* (as returned by ``analyze_log``) as plain text."""
    lines = [
        "Journal Analyzer",
        "================",
        "",
        f"Overall status: {report['overall_status']}",
        "Journal findings require review; logs alone do not prove root cause.",
        "",
    ]

    if report["finding_groups"]:
        for finding in report["finding_groups"]:
            lines.extend(
                [
                    f"[{finding['severity']}] {finding['service']} - {finding['category']}",
                    f"Pattern: {finding['pattern']}",
                    f"Occurrences: {finding['occurrences']}",
                    f"Unit: {finding['unit']}",
                    f"Process: {finding['process']}",
                    f"PID: {finding['pid']}",
                    f"First seen: {finding['first_seen']}",
                    f"Last seen: {finding['last_seen']}",
                    "Samples:",
                ]
            )
            if finding["samples"]:
                for sample in finding["samples"]:
                    lines.append(f" - {sample}")
            else:
                lines.append(" - None")
            lines.append("")
    else:
        lines.extend(["No journal findings detected for the selected filters.", ""])

    lines.extend(
        [
            "Operational Summary",
            "-------------------",
            f"Overall status: {report['overall_status']}",
            f"Total lines scanned: {report['total_lines_scanned']}",
            f"Total findings: {report['total_findings']}",
            f"Critical finding groups: {report['critical_finding_groups']}",
            f"Warning finding groups: {report['warning_finding_groups']}",
            f"Affected services/units count: {report['affected_services_count']}",
            "Top affected services/units: "
            + render_top_pairs(report["top_affected_services"], "service"),
            "Top finding categories: "
            + render_top_pairs(report["top_categories"], "category"),
            "Failed unit findings: " + render_top_pairs(report["failed_units"], "unit"),
            f"Restart findings: {report['restart_findings']}",
            f"OOM findings: {report['oom_findings']}",
            f"Filesystem/disk findings: {report['filesystem_disk_findings']}",
            "Timestamp coverage: "
            f"parsed={report['timestamp_coverage']['parsed_timestamps_count']}, "
            f"unknown={report['timestamp_coverage']['unknown_timestamps_count']}",
            "Filters used: "
            f"service={report['filters_used']['service'] or 'None'}, "
            f"severity={report['filters_used']['severity'] or 'None'}, "
            f"since={report['filters_used']['since'] or 'None'}, "
            f"until={report['filters_used']['until'] or 'None'}",
        ]
    )
    return "\n".join(lines)


def render_markdown(report: dict[str, Any]) -> str:
    """Render *report* (as returned by ``analyze_log``) as Markdown."""
    lines = [
        "# Journal Analyzer Report",
        "",
        f"- Overall status: `{report['overall_status']}`",
        "- Journal findings require review; logs alone do not prove root cause.",
        "",
    ]

    if report["finding_groups"]:
        lines.append("## Finding Groups")
        lines.append("")
        for finding in report["finding_groups"]:
            lines.extend(
                [
                    f"### [{finding['severity']}] {finding['service']} - {finding['category']}",
                    "",
                    f"- Pattern: `{finding['pattern']}`",
                    f"- Occurrences: `{finding['occurrences']}`",
                    f"- Unit: `{finding['unit']}`",
                    f"- Process: `{finding['process']}`",
                    f"- PID: `{finding['pid']}`",
                    f"- First seen: `{finding['first_seen']}`",
                    f"- Last seen: `{finding['last_seen']}`",
                    "- Samples:",
                ]
            )
            if finding["samples"]:
                for sample in finding["samples"]:
                    lines.append(f" - `{sample}`")
            else:
                lines.append(" - `None`")
            lines.append("")
    else:
        lines.extend(
            ["## Finding Groups", "", "No journal findings detected for the selected filters.", ""]
        )

    lines.extend(
        [
            "## Operational Summary",
            "",
            f"- Overall status: `{report['overall_status']}`",
            f"- Total lines scanned: `{report['total_lines_scanned']}`",
            f"- Total findings: `{report['total_findings']}`",
            f"- Critical finding groups: `{report['critical_finding_groups']}`",
            f"- Warning finding groups: `{report['warning_finding_groups']}`",
            f"- Affected services/units count: `{report['affected_services_count']}`",
            # render_top_pairs already yields "None" for an empty list.
            "- Top affected services/units: "
            + render_top_pairs(report["top_affected_services"], "service"),
            "- Top finding categories: "
            + render_top_pairs(report["top_categories"], "category"),
            "- Failed unit findings: " + render_top_pairs(report["failed_units"], "unit"),
            f"- Restart findings: `{report['restart_findings']}`",
            f"- OOM findings: `{report['oom_findings']}`",
            f"- Filesystem/disk findings: `{report['filesystem_disk_findings']}`",
            "- Timestamp coverage: "
            f"parsed=`{report['timestamp_coverage']['parsed_timestamps_count']}`, "
            f"unknown=`{report['timestamp_coverage']['unknown_timestamps_count']}`",
            "- Filters used: "
            f"service=`{report['filters_used']['service'] or 'None'}`, "
            f"severity=`{report['filters_used']['severity'] or 'None'}`, "
            f"since=`{report['filters_used']['since'] or 'None'}`, "
            f"until=`{report['filters_used']['until'] or 'None'}`",
        ]
    )
    return "\n".join(lines)


def render_json(report: dict[str, Any]) -> str:
    """Render *report* as indented JSON."""
    return json.dumps(report, indent=2)


def write_output(text: str, output_path: str | None, input_path: Path) -> None:
    """Print *text* to stdout, or write it to *output_path* when one is given.

    A trailing newline is added to the written file if *text* lacks one.

    Raises:
        OSError: if *output_path* resolves to the input log file, or if the
            report cannot be written.
    """
    if output_path is None:
        print(text)
        return

    destination = Path(output_path)
    try:
        same_file = destination.exists() and destination.resolve() == input_path.resolve()
    except OSError:
        # Resolution is best-effort; if it fails, the write below reports any
        # real problem with the destination.
        same_file = False
    # Raised outside the try block so the guard is not swallowed by the
    # handler above.
    if same_file:
        raise OSError("output path must not overwrite the input log file")

    try:
        destination.write_text(
            text + ("\n" if not text.endswith("\n") else ""), encoding="utf-8"
        )
    except OSError as exc:
        raise OSError(f"unable to write report to {destination}: {exc}") from exc


def determine_exit_code(report: dict[str, Any]) -> int:
    """Return ``EXIT_FINDINGS`` when the report has any findings, else ``EXIT_OK``."""
    if report["total_findings"] > 0:
        return EXIT_FINDINGS
    return EXIT_OK


def main() -> int:
    """Run the analyzer CLI and return the process exit code.

    Returns ``EXIT_INVALID`` (after printing the error to stderr) when the
    input cannot be read, the report cannot be written, or an unexpected
    error occurs.
    """
    parser = build_parser()
    args = parser.parse_args()

    try:
        input_path = Path(args.file)
        lines = read_log_file(input_path)
        patterns = compile_patterns(args.ignore_case)
        report = analyze_log(
            lines=lines,
            patterns=patterns,
            since=args.since,
            until=args.until,
            service_filter=args.service,
            severity_filter=args.severity.upper() if args.severity else None,
            top=args.top,
            max_samples=args.max_samples,
        )

        if args.format == "text":
            rendered = render_text(report)
        elif args.format == "markdown":
            rendered = render_markdown(report)
        else:
            rendered = render_json(report)

        write_output(rendered, args.output, input_path)
        return determine_exit_code(report)
    except (OSError, ValueError) as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        return EXIT_INVALID
    except Exception as exc:  # pragma: no cover - defensive operational fallback
        print(f"ERROR: unexpected runtime failure: {exc}", file=sys.stderr)
        return EXIT_INVALID


if __name__ == "__main__":
    sys.exit(main())