#!/usr/bin/env python3
"""Analyze JVM and Java application logs for operational findings."""

from __future__ import annotations

import argparse
import json
import re
import sys
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import Any

# Process exit codes returned by main().
EXIT_OK = 0
EXIT_FINDINGS = 1
EXIT_INVALID = 2

# Placeholder used whenever a field cannot be extracted from a line.
UNKNOWN = "UNKNOWN"

# Sort rank for severities; lower sorts first in the report.
SEVERITY_ORDER = {"CRITICAL": 0, "WARNING": 1}

CRITICAL_PATTERNS = [
    {"name": "OutOfMemoryError", "pattern": "OutOfMemoryError", "symptom": "jvm_memory"},
    {"name": "Java heap space", "pattern": "Java heap space", "symptom": "jvm_memory"},
    {"name": "GC overhead limit exceeded", "pattern": "GC overhead limit exceeded", "symptom": "jvm_memory"},
    {"name": "StackOverflowError", "pattern": "StackOverflowError", "symptom": "jvm_stack"},
    {"name": "NoClassDefFoundError", "pattern": "NoClassDefFoundError", "symptom": "class_loading"},
    {"name": "ClassNotFoundException", "pattern": "ClassNotFoundException", "symptom": "class_loading"},
    {"name": "ExceptionInInitializerError", "pattern": "ExceptionInInitializerError", "symptom": "class_loading"},
    {"name": "SSLHandshakeException", "pattern": "SSLHandshakeException", "symptom": "tls_certificate"},
    {"name": "CertificateExpiredException", "pattern": "CertificateExpiredException", "symptom": "tls_certificate"},
    {"name": "SQLException", "pattern": "SQLException", "symptom": "database"},
    {"name": "SQLRecoverableException", "pattern": "SQLRecoverableException", "symptom": "database"},
    {"name": "CommunicationsException", "pattern": "CommunicationsException", "symptom": "database"},
    {"name": "database unavailable", "pattern": "database unavailable", "symptom": "database"},
    {"name": "connection pool exhausted", "pattern": "connection pool exhausted", "symptom": "database"},
    {"name": "FATAL", "pattern": "FATAL", "symptom": "fatal"},
]

WARNING_PATTERNS = [
    {"name": "NullPointerException", "pattern": "NullPointerException", "symptom": "application_exception"},
    {"name": "IllegalArgumentException", "pattern": "IllegalArgumentException", "symptom": "application_exception"},
    {"name": "IllegalStateException", "pattern": "IllegalStateException", "symptom": "application_exception"},
    {"name": "SocketTimeoutException", "pattern": "SocketTimeoutException", "symptom": "network_timeout"},
    {"name": "ConnectException", "pattern": "ConnectException", "symptom": "network_connectivity"},
    {"name": "TimeoutException", "pattern": "TimeoutException", "symptom": "network_timeout"},
    {"name": "connection refused", "pattern": "connection refused", "symptom": "network_connectivity"},
    {"name": "connection reset", "pattern": "connection reset", "symptom": "network_connectivity"},
    {"name": "Broken pipe", "pattern": "Broken pipe", "symptom": "network_connectivity"},
    {"name": "WARN", "pattern": "WARN", "symptom": "log_level"},
    {"name": "ERROR", "pattern": "ERROR", "symptom": "log_level"},
    {"name": "retrying", "pattern": "retrying", "symptom": "retry"},
    {"name": "slow query", "pattern": "slow query", "symptom": "database"},
    {"name": "deadlock detected", "pattern": "deadlock detected", "symptom": "database"},
]

HTTP_PATTERNS = [
    {"name": "HTTP 500", "pattern": "HTTP 500", "symptom": "http_5xx"},
    {"name": "HTTP 502", "pattern": "HTTP 502", "symptom": "http_5xx"},
    {"name": "HTTP 503", "pattern": "HTTP 503", "symptom": "http_5xx"},
    {"name": "HTTP 504", "pattern": "HTTP 504", "symptom": "http_5xx"},
]

# Exception names that are always CRITICAL; built once instead of per call.
_CRITICAL_EXCEPTION_NAMES = frozenset(
    {item["name"] for item in CRITICAL_PATTERNS} | {"OutOfMemoryError", "StackOverflowError"}
)

ISO_TIMESTAMP_RE = re.compile(
    r"\b(\d{4}-\d{2}-\d{2})[ T](\d{2}:\d{2}:\d{2})([,.]\d{1,6})?\b"
)
SYSLOG_TIMESTAMP_RE = re.compile(r"^([A-Z][a-z]{2}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\b")
LEVEL_RE = re.compile(r"\b(TRACE|DEBUG|INFO|WARN|ERROR|FATAL)\b")
SPRING_LOGGER_RE = re.compile(r"\s---\s+\[[^\]]+\]\s+([A-Za-z0-9_.$-]+)\s*:")
GENERIC_LOGGER_RE = re.compile(
    r"\b(?:TRACE|DEBUG|INFO|WARN|ERROR|FATAL)\b\s+(?:\d+\s+)?([A-Za-z0-9_.$-]+)\s*:"
)
THREAD_RE = re.compile(r"\[([^\]]+)\]")
SPRING_THREAD_RE = re.compile(r"\s---\s+\[([^\]]+)\]")
EXCEPTION_RE = re.compile(
    r"\b((?:[A-Za-z_$][\w$]*\.)+[A-Za-z_$][\w$]*(?:Exception|Error)|[A-Za-z_$][\w$]*(?:Exception|Error))\b"
)
STACK_FRAME_RE = re.compile(r"^\s+at\s+")
CAUSED_BY_RE = re.compile(r"^\s*Caused by:\s+")
MORE_RE = re.compile(r"^\s*\.\.\.\s+\d+\s+more\b")


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line argument parser for the analyzer."""
    parser = argparse.ArgumentParser(
        description="Analyze local JVM and Java application logs for operational findings."
    )
    parser.add_argument("--file", required=True, help="Local JVM or Java application log to analyze.")
    parser.add_argument(
        "--format",
        choices=("text", "markdown", "json"),
        default="text",
        help="Report format. Default: text.",
    )
    parser.add_argument("--output", help="Write report to this path instead of stdout.")
    parser.add_argument(
        "--top",
        type=positive_int,
        default=10,
        help="Number of top finding groups, exception types, and symptoms to display. Default: 10.",
    )
    parser.add_argument(
        "--max-samples",
        type=non_negative_int,
        default=3,
        help="Maximum sample lines per finding group. Default: 3.",
    )
    parser.add_argument(
        "--include-stacktraces",
        action="store_true",
        help="Include short multiline stack trace samples in text and Markdown reports.",
    )
    parser.add_argument(
        "--max-stack-lines",
        type=positive_int,
        default=12,
        help="Maximum lines retained per stack trace sample. Default: 12.",
    )
    parser.add_argument(
        "--http-critical-threshold",
        type=positive_int,
        default=5,
        help="HTTP 5xx count that raises HTTP findings to CRITICAL. Default: 5.",
    )
    parser.add_argument(
        "--ignore-case",
        action="store_true",
        help="Match configured patterns case-insensitively.",
    )
    parser.add_argument(
        "--since",
        type=parse_filter_timestamp,
        help='Include lines at or after "YYYY-MM-DD HH:MM:SS".',
    )
    parser.add_argument(
        "--until",
        type=parse_filter_timestamp,
        help='Include lines at or before "YYYY-MM-DD HH:MM:SS".',
    )
    return parser


def positive_int(value: str) -> int:
    """argparse type: parse *value* as an integer greater than zero."""
    try:
        number = int(value)
    except ValueError as exc:
        raise argparse.ArgumentTypeError("must be a positive integer") from exc
    if number <= 0:
        raise argparse.ArgumentTypeError("must be a positive integer")
    return number


def non_negative_int(value: str) -> int:
    """argparse type: parse *value* as an integer greater than or equal to zero."""
    try:
        number = int(value)
    except ValueError as exc:
        raise argparse.ArgumentTypeError("must be zero or a positive integer") from exc
    if number < 0:
        raise argparse.ArgumentTypeError("must be zero or a positive integer")
    return number


def parse_filter_timestamp(value: str) -> datetime:
    """argparse type: parse a ``--since``/``--until`` value.

    Accepts either a space or a ``T`` between the date and the time.
    """
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue
    raise argparse.ArgumentTypeError('expected timestamp format "YYYY-MM-DD HH:MM:SS"')


def compile_patterns(ignore_case: bool) -> list[dict[str, Any]]:
    """Return every configured pattern with severity, kind and compiled regex.

    Patterns are matched literally (``re.escape``). The order is critical
    patterns, then warning patterns, then HTTP 5xx patterns.
    """
    flags = re.IGNORECASE if ignore_case else 0
    sources = (
        (CRITICAL_PATTERNS, "CRITICAL", "pattern"),
        (WARNING_PATTERNS, "WARNING", "pattern"),
        (HTTP_PATTERNS, "WARNING", "http_5xx"),
    )
    return [
        {
            **item,
            "severity": severity,
            "kind": kind,
            "regex": re.compile(re.escape(item["pattern"]), flags),
        }
        for items, severity, kind in sources
        for item in items
    ]


def read_log_file(path: Path) -> list[str]:
    """Read *path* as UTF-8 (undecodable bytes replaced) and return its lines.

    Raises:
        OSError: the path is missing, is not a regular file, or cannot be read.
        ValueError: the file is empty.
    """
    if not path.exists():
        raise OSError(f"file does not exist: {path}")
    if not path.is_file():
        raise OSError(f"path is not a regular file: {path}")
    try:
        text = path.read_text(encoding="utf-8", errors="replace")
    except PermissionError as exc:
        raise OSError(f"file is not readable: {path}") from exc
    except OSError as exc:
        raise OSError(f"unable to read file {path}: {exc}") from exc
    if text == "":
        raise ValueError(f"file is empty: {path}")
    return text.splitlines()


def parse_line_timestamp(line: str, syslog_year: int) -> tuple[datetime | None, str]:
    """Extract a timestamp from *line*.

    An ISO-style timestamp anywhere in the line is tried first, then a
    syslog-style timestamp at the start of the line (which carries no year, so
    *syslog_year* is used).

    Returns:
        ``(parsed, rendered)``, or ``(None, UNKNOWN)`` when nothing parses.
    """
    iso_match = ISO_TIMESTAMP_RE.search(line)
    if iso_match:
        fraction = iso_match.group(3) or ""
        raw = f"{iso_match.group(1)} {iso_match.group(2)}"
        parse_value = raw
        fmt = "%Y-%m-%d %H:%M:%S"
        if fraction:
            # Drop the separator and pad to six digits so %f accepts it.
            parse_value = f"{raw}.{fraction[1:].ljust(6, '0')[:6]}"
            fmt = "%Y-%m-%d %H:%M:%S.%f"
        try:
            return datetime.strptime(parse_value, fmt), raw + fraction
        except ValueError:
            return None, UNKNOWN
    syslog_match = SYSLOG_TIMESTAMP_RE.search(line)
    if syslog_match:
        raw = syslog_match.group(1)
        try:
            parsed = datetime.strptime(f"{syslog_year} {raw}", "%Y %b %d %H:%M:%S")
        except ValueError:
            return None, UNKNOWN
        return parsed, raw
    return None, UNKNOWN


def line_in_time_window(
    parsed_at: datetime | None, since: datetime | None, until: datetime | None
) -> bool:
    """Return True when *parsed_at* lies within ``[since, until]``.

    A ``None`` timestamp is always considered inside the window.
    """
    if parsed_at is None:
        return True
    if since is not None and parsed_at < since:
        return False
    if until is not None and parsed_at > until:
        return False
    return True


def render_seen(value: tuple[datetime, str] | None) -> str:
    """Render a ``(parsed, rendered)`` first/last-seen pair for the report."""
    if value is None:
        return UNKNOWN
    return value[1] or value[0].strftime("%Y-%m-%d %H:%M:%S")


def extract_level(line: str) -> str:
    """Return the first log level token found in *line*, or UNKNOWN."""
    match = LEVEL_RE.search(line)
    if match:
        return match.group(1)
    return UNKNOWN


def extract_thread(line: str) -> str:
    """Return the thread name found in *line*, or UNKNOWN.

    The Spring-style ``--- [thread]`` form is tried before any bracketed token.
    """
    for regex in (SPRING_THREAD_RE, THREAD_RE):
        match = regex.search(line)
        if match:
            return match.group(1)
    return UNKNOWN


def extract_logger(line: str) -> str:
    """Return the logger name found in *line*, or UNKNOWN."""
    for regex in (SPRING_LOGGER_RE, GENERIC_LOGGER_RE):
        match = regex.search(line)
        if match:
            return match.group(1)
    return UNKNOWN


def normalize_exception_type(value: str) -> str:
    """Strip the package prefix from a dotted exception class name."""
    return value.split(".")[-1]


def extract_exception_type(line: str) -> str:
    """Return the simple name of the first exception/error in *line*, or UNKNOWN."""
    match = EXCEPTION_RE.search(line)
    if match:
        return normalize_exception_type(match.group(1))
    return UNKNOWN


def is_stack_start(line: str) -> bool:
    """Return True when *line* can open a stack trace."""
    return (
        "Exception in thread" in line
        or CAUSED_BY_RE.search(line) is not None
        or EXCEPTION_RE.search(line) is not None
    )


def is_stack_continuation(line: str) -> bool:
    """Return True for ``at ...`` frames, ``Caused by:`` and ``... N more`` lines."""
    return (
        STACK_FRAME_RE.search(line) is not None
        or CAUSED_BY_RE.search(line) is not None
        or MORE_RE.search(line) is not None
    )


def update_seen(
    group: dict[str, Any], parsed_at: datetime | None, rendered_at: str
) -> None:
    """Widen the group's first/last-seen range to include *parsed_at*."""
    if parsed_at is None:
        return
    if group["first_seen"] is None or parsed_at < group["first_seen"][0]:
        group["first_seen"] = (parsed_at, rendered_at)
    if group["last_seen"] is None or parsed_at > group["last_seen"][0]:
        group["last_seen"] = (parsed_at, rendered_at)


def append_limited(items: list[Any], value: Any, limit: int) -> None:
    """Append *value* to *items* unless it is a duplicate or *limit* is reached."""
    if limit == 0:
        return
    if value in items:
        return
    if len(items) < limit:
        items.append(value)


def finding_key(severity: str, name: str) -> str:
    """Return the dictionary key that identifies a finding group."""
    return f"{severity}::{name}"


def ensure_group(
    groups: dict[str, dict[str, Any]],
    name: str,
    severity: str,
    symptom: str,
    kind: str,
) -> dict[str, Any]:
    """Return the group for ``(severity, name)``, creating it when missing.

    An existing group keeps its original symptom and kind.
    """
    key = finding_key(severity, name)
    return groups.setdefault(
        key,
        {
            "name": name,
            "severity": severity,
            "symptom": symptom,
            "kind": kind,
            "occurrences": 0,
            "stack_trace_count": 0,
            "first_seen": None,
            "last_seen": None,
            "samples": [],
            "stack_trace_samples": [],
            "fields": [],
        },
    )


def add_finding(
    groups: dict[str, dict[str, Any]],
    name: str,
    severity: str,
    symptom: str,
    kind: str,
    line: str,
    parsed_at: datetime | None,
    rendered_at: str,
    max_samples: int,
) -> dict[str, Any]:
    """Record one occurrence of a finding and return its group."""
    group = ensure_group(groups, name, severity, symptom, kind)
    group["occurrences"] += 1
    update_seen(group, parsed_at, rendered_at)
    append_limited(group["samples"], line, max_samples)
    # Field extraction runs several regexes; skip it once the list is full,
    # since append_limited would discard the result anyway.
    if len(group["fields"]) < max_samples:
        append_limited(
            group["fields"],
            {
                "timestamp": rendered_at,
                "log_level": extract_level(line),
                "logger": extract_logger(line),
                "thread": extract_thread(line),
                "exception_type": extract_exception_type(line),
                "raw": line,
            },
            max_samples,
        )
    return group


def record_stack_trace(
    groups: dict[str, dict[str, Any]],
    stack: dict[str, Any],
    max_samples: int,
    max_stack_lines: int,
) -> None:
    """Attach a completed stack trace to the group for its exception type."""
    exception_type = stack["exception_type"] if stack["exception_type"] != UNKNOWN else "Java stack trace"
    severity = severity_for_exception(exception_type)
    group = ensure_group(groups, exception_type, severity, "stack_trace", "stack_trace")
    group["stack_trace_count"] += 1
    update_seen(group, stack["parsed_at"], stack["rendered_at"])
    append_limited(group["samples"], stack["lines"][0], max_samples)
    append_limited(group["stack_trace_samples"], stack["lines"][:max_stack_lines], max_samples)


def severity_for_exception(exception_type: str) -> str:
    """Return "CRITICAL" for exception names on the critical list, else "WARNING"."""
    if exception_type in _CRITICAL_EXCEPTION_NAMES:
        return "CRITICAL"
    return "WARNING"


def _start_stack(item: dict[str, Any]) -> dict[str, Any]:
    """Create the accumulator for a stack trace that opens at *item*."""
    return {
        "lines": [item["line"]],
        "exception_type": extract_exception_type(item["line"]),
        "parsed_at": item["parsed_at"],
        "rendered_at": item["rendered_at"],
    }


def detect_stack_traces(
    included: list[dict[str, Any]],
    groups: dict[str, dict[str, Any]],
    max_samples: int,
    max_stack_lines: int,
) -> int:
    """Find multi-line stack traces in *included* and record them in *groups*.

    A trace is a start line followed by at least one continuation line.

    Returns:
        The number of stack traces recorded.
    """
    stack: dict[str, Any] | None = None
    stack_count = 0
    for item in included:
        line = item["line"]
        if stack is not None:
            if is_stack_continuation(line):
                stack["lines"].append(line)
                if stack["exception_type"] == UNKNOWN:
                    stack["exception_type"] = extract_exception_type(line)
                continue
            # The trace ended; a start line with no frames is not a trace.
            if len(stack["lines"]) > 1:
                record_stack_trace(groups, stack, max_samples, max_stack_lines)
                stack_count += 1
            stack = None
        if is_stack_start(line):
            stack = _start_stack(item)
    if stack is not None and len(stack["lines"]) > 1:
        record_stack_trace(groups, stack, max_samples, max_stack_lines)
        stack_count += 1
    return stack_count


def analyze_log(
    lines: list[str],
    patterns: list[dict[str, Any]],
    since: datetime | None,
    until: datetime | None,
    top: int,
    max_samples: int,
    max_stack_lines: int,
    http_critical_threshold: int,
) -> dict[str, Any]:
    """Scan *lines* and build the full report dictionary.

    Args:
        lines: Raw log lines.
        patterns: Output of ``compile_patterns``.
        since: Inclusive lower bound of the time window, or None.
        until: Inclusive upper bound of the time window, or None.
        top: Maximum finding groups, exception types and symptoms reported.
        max_samples: Maximum sample lines retained per finding group.
        max_stack_lines: Maximum lines retained per stack trace sample.
        http_critical_threshold: HTTP 5xx count that promotes HTTP findings
            to CRITICAL.

    Returns:
        A dict with ``summary``, ``findings``, ``top_exception_types`` and
        ``top_operational_symptoms``.
    """
    # Syslog timestamps carry no year; borrow it from --since or the clock.
    syslog_year = since.year if since is not None else datetime.now().year
    groups: dict[str, dict[str, Any]] = {}
    exception_counts: Counter[str] = Counter()
    symptom_counts: Counter[str] = Counter()
    parsed_timestamps = 0
    unknown_timestamps = 0
    included: list[dict[str, Any]] = []
    http_5xx_count = 0
    context_parsed_at: datetime | None = None
    context_rendered_at = UNKNOWN

    for line in lines:
        parsed_at, rendered_at = parse_line_timestamp(line, syslog_year)
        if parsed_at is None:
            unknown_timestamps += 1
            # Untimestamped lines (stack frames, "Caused by:", multi-line
            # messages) belong to the preceding entry, so they are filtered by
            # its timestamp. Otherwise continuation lines of an out-of-window
            # entry would leak into the report. Lines before the first parsed
            # timestamp have no context and remain included.
            window_at = context_parsed_at
        else:
            parsed_timestamps += 1
            context_parsed_at = parsed_at
            context_rendered_at = rendered_at
            window_at = parsed_at
        if not line_in_time_window(window_at, since, until):
            continue

        # Stack trace frames often omit timestamps; keep nearby log context for first/last seen.
        effective_parsed_at = parsed_at
        effective_rendered_at = rendered_at
        if parsed_at is None and (is_stack_start(line) or is_stack_continuation(line)):
            effective_parsed_at = context_parsed_at
            effective_rendered_at = context_rendered_at

        included.append(
            {
                "line": line,
                "parsed_at": effective_parsed_at,
                "rendered_at": effective_rendered_at,
            }
        )

        matched_names = set()
        for item in patterns:
            if not item["regex"].search(line):
                continue
            severity = item["severity"]
            if item["kind"] == "http_5xx":
                http_5xx_count += 1
            add_finding(
                groups=groups,
                name=item["name"],
                severity=severity,
                symptom=item["symptom"],
                kind=item["kind"],
                line=line,
                parsed_at=effective_parsed_at,
                rendered_at=effective_rendered_at,
                max_samples=max_samples,
            )
            symptom_counts[item["symptom"]] += 1
            matched_names.add(item["name"])

        exception_type = extract_exception_type(line)
        if exception_type != UNKNOWN:
            exception_counts[exception_type] += 1
            # Avoid a second finding when a configured pattern of the same
            # name already matched this line.
            if exception_type not in matched_names:
                severity = severity_for_exception(exception_type)
                add_finding(
                    groups=groups,
                    name=exception_type,
                    severity=severity,
                    symptom="application_exception",
                    kind="exception",
                    line=line,
                    parsed_at=effective_parsed_at,
                    rendered_at=effective_rendered_at,
                    max_samples=max_samples,
                )
                symptom_counts["application_exception"] += 1

    stack_trace_count = detect_stack_traces(included, groups, max_samples, max_stack_lines)
    promote_http_5xx(groups, http_5xx_count, http_critical_threshold)

    findings = sorted(
        (render_group(group) for group in groups.values()),
        key=lambda item: (
            SEVERITY_ORDER[item["severity"]],
            -item["occurrences"],
            item["name"].lower(),
        ),
    )
    # The summary counts every group; only the listed findings are truncated.
    summary = build_summary(
        total_lines=len(lines),
        findings=findings,
        stack_trace_count=stack_trace_count,
        http_5xx_count=http_5xx_count,
        parsed_timestamps=parsed_timestamps,
        unknown_timestamps=unknown_timestamps,
    )
    return {
        "summary": summary,
        "findings": findings[:top],
        "top_exception_types": top_items(exception_counts, top),
        "top_operational_symptoms": top_items(symptom_counts, top),
    }


def promote_http_5xx(
    groups: dict[str, dict[str, Any]], http_5xx_count: int, threshold: int
) -> None:
    """Raise HTTP 5xx groups to CRITICAL once *http_5xx_count* reaches *threshold*.

    Promoted groups are re-keyed in *groups* under their new severity.
    """
    if http_5xx_count < threshold:
        return
    http_names = {item["name"] for item in HTTP_PATTERNS}
    # Iterate over a snapshot because the dict is re-keyed inside the loop.
    for old_key, group in list(groups.items()):
        if group["name"] not in http_names or group["severity"] == "CRITICAL":
            continue
        group["severity"] = "CRITICAL"
        new_key = finding_key("CRITICAL", group["name"])
        groups[new_key] = group
        del groups[old_key]


def render_group(group: dict[str, Any]) -> dict[str, Any]:
    """Convert an internal group into its report form (timestamps as strings)."""
    return {
        "name": group["name"],
        "severity": group["severity"],
        "symptom": group["symptom"],
        "kind": group["kind"],
        "occurrences": group["occurrences"],
        "stack_trace_count": group["stack_trace_count"],
        "first_seen": render_seen(group["first_seen"]),
        "last_seen": render_seen(group["last_seen"]),
        "samples": group["samples"],
        "stack_trace_samples": group["stack_trace_samples"],
        "fields": group["fields"],
    }


def build_summary(
    total_lines: int,
    findings: list[dict[str, Any]],
    stack_trace_count: int,
    http_5xx_count: int,
    parsed_timestamps: int,
    unknown_timestamps: int,
) -> dict[str, Any]:
    """Build the summary section; overall status is the worst severity present."""
    critical_groups = sum(1 for item in findings if item["severity"] == "CRITICAL")
    warning_groups = sum(1 for item in findings if item["severity"] == "WARNING")
    total_findings = sum(item["occurrences"] for item in findings)
    if critical_groups > 0:
        status = "CRITICAL"
    elif warning_groups > 0:
        status = "WARNING"
    else:
        status = "OK"
    return {
        "overall_status": status,
        "total_lines_scanned": total_lines,
        "total_findings": total_findings,
        "total_stack_traces_detected": stack_trace_count,
        "critical_finding_groups": critical_groups,
        "warning_finding_groups": warning_groups,
        "http_5xx_count": http_5xx_count,
        "timestamp_coverage": {
            "parsed_timestamps_count": parsed_timestamps,
            "unknown_timestamps_count": unknown_timestamps,
        },
    }


def top_items(counter: Counter[str], limit: int) -> list[dict[str, Any]]:
    """Return the *limit* most common counter entries as value/count dicts."""
    return [{"value": value, "count": count} for value, count in counter.most_common(limit)]


def render_text(report: dict[str, Any], include_stacktraces: bool) -> str:
    """Render *report* as plain text, ending with a newline."""
    lines = ["JVM Log Analyzer", "================", ""]
    summary = report["summary"]
    lines.extend(
        [
            f"Overall status: {summary['overall_status']}",
            "Findings require review; logs alone do not prove root cause.",
            "",
        ]
    )
    if not report["findings"]:
        lines.extend(["No configured JVM/application findings were detected.", ""])
    else:
        for finding in report["findings"]:
            lines.extend(
                [
                    f"[{finding['severity']}] {finding['name']}",
                    f"Occurrences: {finding['occurrences']}",
                    f"Symptom: {finding['symptom']}",
                    f"First seen: {finding['first_seen']}",
                    f"Last seen: {finding['last_seen']}",
                    f"Stack traces linked: {finding['stack_trace_count']}",
                    "Samples:",
                ]
            )
            if finding["samples"]:
                lines.extend(f" - {sample}" for sample in finding["samples"])
            else:
                lines.append(" - No samples retained")
            if include_stacktraces and finding["stack_trace_samples"]:
                lines.append("Stack trace samples:")
                for stack in finding["stack_trace_samples"]:
                    lines.append(" ---")
                    lines.extend(f" {entry}" for entry in stack)
            lines.append("")
    lines.extend(render_text_table("Top Exception Types", report["top_exception_types"]))
    lines.extend(render_text_table("Top Operational Symptoms", report["top_operational_symptoms"]))
    lines.extend(render_text_summary(summary))
    return "\n".join(lines) + "\n"


def render_text_table(title: str, rows: list[dict[str, Any]]) -> list[str]:
    """Render a titled value/count list for the text report."""
    lines = [title, "-" * len(title)]
    if not rows:
        lines.append("No entries detected.")
    else:
        lines.extend(f"- {item['value']}: {item['count']}" for item in rows)
    lines.append("")
    return lines


def render_text_summary(summary: dict[str, Any]) -> list[str]:
    """Render the operational summary section for the text report."""
    coverage = summary["timestamp_coverage"]
    return [
        "Operational Summary",
        "-------------------",
        f"Overall status: {summary['overall_status']}",
        f"Total lines scanned: {summary['total_lines_scanned']}",
        f"Total findings: {summary['total_findings']}",
        f"Total stack traces detected: {summary['total_stack_traces_detected']}",
        f"Critical finding groups: {summary['critical_finding_groups']}",
        f"Warning finding groups: {summary['warning_finding_groups']}",
        f"HTTP 5xx count: {summary['http_5xx_count']}",
        f"Parsed timestamps count: {coverage['parsed_timestamps_count']}",
        f"Unknown timestamps count: {coverage['unknown_timestamps_count']}",
    ]


def render_markdown(report: dict[str, Any], include_stacktraces: bool) -> str:
    """Render *report* as Markdown."""
    summary = report["summary"]
    lines = [
        "# JVM Log Analyzer",
        "",
        f"- Overall status: {summary['overall_status']}",
        "- Finding language is a triage summary; logs alone do not prove root cause.",
        "",
    ]
    if not report["findings"]:
        lines.extend(["No configured JVM/application findings were detected.", ""])
    else:
        for finding in report["findings"]:
            lines.extend(
                [
                    f"## {finding['severity']}: {finding['name']}",
                    "",
                    f"- Occurrences: {finding['occurrences']}",
                    f"- Symptom: {finding['symptom']}",
                    f"- First seen: {finding['first_seen']}",
                    f"- Last seen: {finding['last_seen']}",
                    f"- Stack traces linked: {finding['stack_trace_count']}",
                    "",
                    "Sample log lines:",
                    "",
                ]
            )
            if finding["samples"]:
                lines.append("```text")
                lines.extend(finding["samples"])
                lines.append("```")
            else:
                lines.append("_No samples retained._")
            lines.append("")
            if include_stacktraces and finding["stack_trace_samples"]:
                lines.extend(["Stack trace samples:", ""])
                for stack in finding["stack_trace_samples"]:
                    lines.append("```text")
                    lines.extend(stack)
                    lines.append("```")
                    lines.append("")
    lines.extend(render_markdown_table("Top Exception Types", report["top_exception_types"]))
    lines.extend(render_markdown_table("Top Operational Symptoms", report["top_operational_symptoms"]))
    lines.extend(render_markdown_summary(summary))
    return "\n".join(lines)


def render_markdown_table(title: str, rows: list[dict[str, Any]]) -> list[str]:
    """Render a titled value/count table for the Markdown report."""
    lines = [f"## {title}", ""]
    if not rows:
        lines.extend(["No entries detected.", ""])
        return lines
    lines.extend(["| Value | Count |", "| --- | ---: |"])
    lines.extend(f"| {item['value']} | {item['count']} |" for item in rows)
    lines.append("")
    return lines


def render_markdown_summary(summary: dict[str, Any]) -> list[str]:
    """Render the operational summary section for the Markdown report."""
    coverage = summary["timestamp_coverage"]
    return [
        "## Operational Summary",
        "",
        f"- Overall status: {summary['overall_status']}",
        f"- Total lines scanned: {summary['total_lines_scanned']}",
        f"- Total findings: {summary['total_findings']}",
        f"- Total stack traces detected: {summary['total_stack_traces_detected']}",
        f"- Critical finding groups: {summary['critical_finding_groups']}",
        f"- Warning finding groups: {summary['warning_finding_groups']}",
        f"- HTTP 5xx count: {summary['http_5xx_count']}",
        f"- Parsed timestamps count: {coverage['parsed_timestamps_count']}",
        f"- Unknown timestamps count: {coverage['unknown_timestamps_count']}",
        "",
    ]


def render_json(report: dict[str, Any]) -> str:
    """Render *report* as indented, key-sorted JSON ending with a newline."""
    return json.dumps(report, indent=2, sort_keys=True) + "\n"


def write_report(input_path: Path, output_path: str | None, content: str) -> None:
    """Write *content* to *output_path*, or to stdout when it is None.

    Raises:
        OSError: the output path resolves to the input file, or the write fails.
    """
    if output_path is None:
        sys.stdout.write(content)
        return
    path = Path(output_path)
    try:
        # Refuse to overwrite the log that was just analyzed.
        if path.resolve() == input_path.resolve():
            raise OSError("output path must not be the same as input file")
        path.write_text(content, encoding="utf-8")
    except OSError as exc:
        raise OSError(f"unable to write output {path}: {exc}") from exc


def main() -> int:
    """Run the CLI and return the process exit code.

    Returns:
        EXIT_OK when the overall status is "OK", EXIT_FINDINGS when findings
        exist, EXIT_INVALID when the input or output cannot be processed.
    """
    parser = build_parser()
    args = parser.parse_args()
    input_path = Path(args.file)
    if args.since is not None and args.until is not None and args.since > args.until:
        parser.error("--since must be earlier than or equal to --until")
    try:
        lines = read_log_file(input_path)
        report = analyze_log(
            lines=lines,
            patterns=compile_patterns(args.ignore_case),
            since=args.since,
            until=args.until,
            top=args.top,
            max_samples=args.max_samples,
            max_stack_lines=args.max_stack_lines,
            http_critical_threshold=args.http_critical_threshold,
        )
        if args.format == "text":
            content = render_text(report, args.include_stacktraces)
        elif args.format == "markdown":
            content = render_markdown(report, args.include_stacktraces)
        else:
            content = render_json(report)
        write_report(input_path, args.output, content)
    except (OSError, ValueError) as exc:
        print(f"CRITICAL: {exc}", file=sys.stderr)
        return EXIT_INVALID
    except RuntimeError as exc:
        print(f"CRITICAL: runtime error: {exc}", file=sys.stderr)
        return EXIT_INVALID
    if report["summary"]["overall_status"] == "OK":
        return EXIT_OK
    return EXIT_FINDINGS


if __name__ == "__main__":
    sys.exit(main())