Add JVM log analyzer tool

This commit is contained in:
Mateusz Suski
2026-05-11 17:05:27 +00:00
parent 2da5e8b46c
commit 89b7fabb96
4 changed files with 1301 additions and 0 deletions
@@ -0,0 +1,837 @@
#!/usr/bin/env python3
"""Analyze JVM and Java application logs for operational findings."""
from __future__ import annotations
import argparse
import json
import re
import sys
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import Any
EXIT_OK = 0
EXIT_FINDINGS = 1
EXIT_INVALID = 2
UNKNOWN = "UNKNOWN"
SEVERITY_ORDER = {"CRITICAL": 0, "WARNING": 1}
CRITICAL_PATTERNS = [
{"name": "OutOfMemoryError", "pattern": "OutOfMemoryError", "symptom": "jvm_memory"},
{"name": "Java heap space", "pattern": "Java heap space", "symptom": "jvm_memory"},
{"name": "GC overhead limit exceeded", "pattern": "GC overhead limit exceeded", "symptom": "jvm_memory"},
{"name": "StackOverflowError", "pattern": "StackOverflowError", "symptom": "jvm_stack"},
{"name": "NoClassDefFoundError", "pattern": "NoClassDefFoundError", "symptom": "class_loading"},
{"name": "ClassNotFoundException", "pattern": "ClassNotFoundException", "symptom": "class_loading"},
{"name": "ExceptionInInitializerError", "pattern": "ExceptionInInitializerError", "symptom": "class_loading"},
{"name": "SSLHandshakeException", "pattern": "SSLHandshakeException", "symptom": "tls_certificate"},
{"name": "CertificateExpiredException", "pattern": "CertificateExpiredException", "symptom": "tls_certificate"},
{"name": "SQLException", "pattern": "SQLException", "symptom": "database"},
{"name": "SQLRecoverableException", "pattern": "SQLRecoverableException", "symptom": "database"},
{"name": "CommunicationsException", "pattern": "CommunicationsException", "symptom": "database"},
{"name": "database unavailable", "pattern": "database unavailable", "symptom": "database"},
{"name": "connection pool exhausted", "pattern": "connection pool exhausted", "symptom": "database"},
{"name": "FATAL", "pattern": "FATAL", "symptom": "fatal"},
]
WARNING_PATTERNS = [
{"name": "NullPointerException", "pattern": "NullPointerException", "symptom": "application_exception"},
{"name": "IllegalArgumentException", "pattern": "IllegalArgumentException", "symptom": "application_exception"},
{"name": "IllegalStateException", "pattern": "IllegalStateException", "symptom": "application_exception"},
{"name": "SocketTimeoutException", "pattern": "SocketTimeoutException", "symptom": "network_timeout"},
{"name": "ConnectException", "pattern": "ConnectException", "symptom": "network_connectivity"},
{"name": "TimeoutException", "pattern": "TimeoutException", "symptom": "network_timeout"},
{"name": "connection refused", "pattern": "connection refused", "symptom": "network_connectivity"},
{"name": "connection reset", "pattern": "connection reset", "symptom": "network_connectivity"},
{"name": "Broken pipe", "pattern": "Broken pipe", "symptom": "network_connectivity"},
{"name": "WARN", "pattern": "WARN", "symptom": "log_level"},
{"name": "ERROR", "pattern": "ERROR", "symptom": "log_level"},
{"name": "retrying", "pattern": "retrying", "symptom": "retry"},
{"name": "slow query", "pattern": "slow query", "symptom": "database"},
{"name": "deadlock detected", "pattern": "deadlock detected", "symptom": "database"},
]
HTTP_PATTERNS = [
{"name": "HTTP 500", "pattern": "HTTP 500", "symptom": "http_5xx"},
{"name": "HTTP 502", "pattern": "HTTP 502", "symptom": "http_5xx"},
{"name": "HTTP 503", "pattern": "HTTP 503", "symptom": "http_5xx"},
{"name": "HTTP 504", "pattern": "HTTP 504", "symptom": "http_5xx"},
]
ISO_TIMESTAMP_RE = re.compile(
r"\b(\d{4}-\d{2}-\d{2})[ T](\d{2}:\d{2}:\d{2})([,.]\d{1,6})?\b"
)
SYSLOG_TIMESTAMP_RE = re.compile(r"^([A-Z][a-z]{2}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\b")
LEVEL_RE = re.compile(r"\b(TRACE|DEBUG|INFO|WARN|ERROR|FATAL)\b")
SPRING_LOGGER_RE = re.compile(r"\s---\s+\[[^\]]+\]\s+([A-Za-z0-9_.$-]+)\s*:")
GENERIC_LOGGER_RE = re.compile(
r"\b(?:TRACE|DEBUG|INFO|WARN|ERROR|FATAL)\b\s+(?:\d+\s+)?([A-Za-z0-9_.$-]+)\s*:"
)
THREAD_RE = re.compile(r"\[([^\]]+)\]")
SPRING_THREAD_RE = re.compile(r"\s---\s+\[([^\]]+)\]")
EXCEPTION_RE = re.compile(
r"\b((?:[A-Za-z_$][\w$]*\.)+[A-Za-z_$][\w$]*(?:Exception|Error)|[A-Za-z_$][\w$]*(?:Exception|Error))\b"
)
STACK_FRAME_RE = re.compile(r"^\s+at\s+")
CAUSED_BY_RE = re.compile(r"^\s*Caused by:\s+")
MORE_RE = re.compile(r"^\s*\.\.\.\s+\d+\s+more\b")
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Analyze local JVM and Java application logs for operational findings."
)
parser.add_argument("--file", required=True, help="Local JVM or Java application log to analyze.")
parser.add_argument(
"--format",
choices=("text", "markdown", "json"),
default="text",
help="Report format. Default: text.",
)
parser.add_argument("--output", help="Write report to this path instead of stdout.")
parser.add_argument(
"--top",
type=positive_int,
default=10,
help="Number of top finding groups, exception types, and symptoms to display. Default: 10.",
)
parser.add_argument(
"--max-samples",
type=non_negative_int,
default=3,
help="Maximum sample lines per finding group. Default: 3.",
)
parser.add_argument(
"--include-stacktraces",
action="store_true",
help="Include short multiline stack trace samples in text and Markdown reports.",
)
parser.add_argument(
"--max-stack-lines",
type=positive_int,
default=12,
help="Maximum lines retained per stack trace sample. Default: 12.",
)
parser.add_argument(
"--http-critical-threshold",
type=positive_int,
default=5,
help="HTTP 5xx count that raises HTTP findings to CRITICAL. Default: 5.",
)
parser.add_argument(
"--ignore-case",
action="store_true",
help="Match configured patterns case-insensitively.",
)
parser.add_argument(
"--since",
type=parse_filter_timestamp,
help='Include lines at or after "YYYY-MM-DD HH:MM:SS".',
)
parser.add_argument(
"--until",
type=parse_filter_timestamp,
help='Include lines at or before "YYYY-MM-DD HH:MM:SS".',
)
return parser
def positive_int(value: str) -> int:
try:
number = int(value)
except ValueError as exc:
raise argparse.ArgumentTypeError("must be a positive integer") from exc
if number <= 0:
raise argparse.ArgumentTypeError("must be a positive integer")
return number
def non_negative_int(value: str) -> int:
try:
number = int(value)
except ValueError as exc:
raise argparse.ArgumentTypeError("must be zero or a positive integer") from exc
if number < 0:
raise argparse.ArgumentTypeError("must be zero or a positive integer")
return number
def parse_filter_timestamp(value: str) -> datetime:
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
try:
return datetime.strptime(value, fmt)
except ValueError:
continue
raise argparse.ArgumentTypeError('expected timestamp format "YYYY-MM-DD HH:MM:SS"')
def compile_patterns(ignore_case: bool) -> list[dict[str, Any]]:
flags = re.IGNORECASE if ignore_case else 0
compiled = []
for item in CRITICAL_PATTERNS:
compiled.append({**item, "severity": "CRITICAL", "kind": "pattern", "regex": re.compile(re.escape(item["pattern"]), flags)})
for item in WARNING_PATTERNS:
compiled.append({**item, "severity": "WARNING", "kind": "pattern", "regex": re.compile(re.escape(item["pattern"]), flags)})
for item in HTTP_PATTERNS:
compiled.append({**item, "severity": "WARNING", "kind": "http_5xx", "regex": re.compile(re.escape(item["pattern"]), flags)})
return compiled
def read_log_file(path: Path) -> list[str]:
if not path.exists():
raise OSError(f"file does not exist: {path}")
if not path.is_file():
raise OSError(f"path is not a regular file: {path}")
try:
text = path.read_text(encoding="utf-8", errors="replace")
except PermissionError as exc:
raise OSError(f"file is not readable: {path}") from exc
except OSError as exc:
raise OSError(f"unable to read file {path}: {exc}") from exc
if text == "":
raise ValueError(f"file is empty: {path}")
return text.splitlines()
def parse_line_timestamp(line: str, syslog_year: int) -> tuple[datetime | None, str]:
iso_match = ISO_TIMESTAMP_RE.search(line)
if iso_match:
fraction = iso_match.group(3) or ""
raw = f"{iso_match.group(1)} {iso_match.group(2)}"
parse_value = raw
fmt = "%Y-%m-%d %H:%M:%S"
if fraction:
parse_value = f"{raw}.{fraction[1:].ljust(6, '0')[:6]}"
fmt = "%Y-%m-%d %H:%M:%S.%f"
try:
return datetime.strptime(parse_value, fmt), raw + fraction
except ValueError:
return None, UNKNOWN
syslog_match = SYSLOG_TIMESTAMP_RE.search(line)
if syslog_match:
raw = syslog_match.group(1)
try:
parsed = datetime.strptime(f"{syslog_year} {raw}", "%Y %b %d %H:%M:%S")
except ValueError:
return None, UNKNOWN
return parsed, raw
return None, UNKNOWN
def line_in_time_window(
parsed_at: datetime | None, since: datetime | None, until: datetime | None
) -> bool:
if parsed_at is None:
return True
if since is not None and parsed_at < since:
return False
if until is not None and parsed_at > until:
return False
return True
def render_seen(value: tuple[datetime, str] | None) -> str:
if value is None:
return UNKNOWN
return value[1] or value[0].strftime("%Y-%m-%d %H:%M:%S")
def extract_level(line: str) -> str:
match = LEVEL_RE.search(line)
if match:
return match.group(1)
return UNKNOWN
def extract_thread(line: str) -> str:
for regex in (SPRING_THREAD_RE, THREAD_RE):
match = regex.search(line)
if match:
return match.group(1)
return UNKNOWN
def extract_logger(line: str) -> str:
for regex in (SPRING_LOGGER_RE, GENERIC_LOGGER_RE):
match = regex.search(line)
if match:
return match.group(1)
return UNKNOWN
def normalize_exception_type(value: str) -> str:
return value.split(".")[-1]
def extract_exception_type(line: str) -> str:
match = EXCEPTION_RE.search(line)
if match:
return normalize_exception_type(match.group(1))
return UNKNOWN
def is_stack_start(line: str) -> bool:
return (
"Exception in thread" in line
or CAUSED_BY_RE.search(line) is not None
or EXCEPTION_RE.search(line) is not None
)
def is_stack_continuation(line: str) -> bool:
return (
STACK_FRAME_RE.search(line) is not None
or CAUSED_BY_RE.search(line) is not None
or MORE_RE.search(line) is not None
)
def update_seen(
group: dict[str, Any], parsed_at: datetime | None, rendered_at: str
) -> None:
if parsed_at is None:
return
if group["first_seen"] is None or parsed_at < group["first_seen"][0]:
group["first_seen"] = (parsed_at, rendered_at)
if group["last_seen"] is None or parsed_at > group["last_seen"][0]:
group["last_seen"] = (parsed_at, rendered_at)
def append_limited(items: list[Any], value: Any, limit: int) -> None:
if limit == 0:
return
if value in items:
return
if len(items) < limit:
items.append(value)
def finding_key(severity: str, name: str) -> str:
return f"{severity}::{name}"
def ensure_group(
groups: dict[str, dict[str, Any]],
name: str,
severity: str,
symptom: str,
kind: str,
) -> dict[str, Any]:
key = finding_key(severity, name)
return groups.setdefault(
key,
{
"name": name,
"severity": severity,
"symptom": symptom,
"kind": kind,
"occurrences": 0,
"stack_trace_count": 0,
"first_seen": None,
"last_seen": None,
"samples": [],
"stack_trace_samples": [],
"fields": [],
},
)
def add_finding(
groups: dict[str, dict[str, Any]],
name: str,
severity: str,
symptom: str,
kind: str,
line: str,
parsed_at: datetime | None,
rendered_at: str,
max_samples: int,
) -> dict[str, Any]:
group = ensure_group(groups, name, severity, symptom, kind)
group["occurrences"] += 1
update_seen(group, parsed_at, rendered_at)
append_limited(group["samples"], line, max_samples)
append_limited(
group["fields"],
{
"timestamp": rendered_at,
"log_level": extract_level(line),
"logger": extract_logger(line),
"thread": extract_thread(line),
"exception_type": extract_exception_type(line),
"raw": line,
},
max_samples,
)
return group
def record_stack_trace(
groups: dict[str, dict[str, Any]],
stack: dict[str, Any],
max_samples: int,
max_stack_lines: int,
) -> None:
exception_type = stack["exception_type"] if stack["exception_type"] != UNKNOWN else "Java stack trace"
severity = severity_for_exception(exception_type)
group = ensure_group(groups, exception_type, severity, "stack_trace", "stack_trace")
group["stack_trace_count"] += 1
update_seen(group, stack["parsed_at"], stack["rendered_at"])
append_limited(group["samples"], stack["lines"][0], max_samples)
append_limited(group["stack_trace_samples"], stack["lines"][:max_stack_lines], max_samples)
def severity_for_exception(exception_type: str) -> str:
critical = {item["name"] for item in CRITICAL_PATTERNS}
if exception_type in critical or exception_type in {"OutOfMemoryError", "StackOverflowError"}:
return "CRITICAL"
return "WARNING"
def detect_stack_traces(
included: list[dict[str, Any]],
groups: dict[str, dict[str, Any]],
max_samples: int,
max_stack_lines: int,
) -> int:
stack: dict[str, Any] | None = None
stack_count = 0
for item in included:
line = item["line"]
if stack is None:
if is_stack_start(line):
stack = {
"lines": [line],
"exception_type": extract_exception_type(line),
"parsed_at": item["parsed_at"],
"rendered_at": item["rendered_at"],
}
continue
if is_stack_continuation(line):
stack["lines"].append(line)
if stack["exception_type"] == UNKNOWN:
stack["exception_type"] = extract_exception_type(line)
continue
if len(stack["lines"]) > 1:
record_stack_trace(groups, stack, max_samples, max_stack_lines)
stack_count += 1
stack = None
if is_stack_start(line):
stack = {
"lines": [line],
"exception_type": extract_exception_type(line),
"parsed_at": item["parsed_at"],
"rendered_at": item["rendered_at"],
}
if stack is not None and len(stack["lines"]) > 1:
record_stack_trace(groups, stack, max_samples, max_stack_lines)
stack_count += 1
return stack_count
def analyze_log(
lines: list[str],
patterns: list[dict[str, Any]],
since: datetime | None,
until: datetime | None,
top: int,
max_samples: int,
max_stack_lines: int,
http_critical_threshold: int,
) -> dict[str, Any]:
syslog_year = since.year if since is not None else datetime.now().year
groups: dict[str, dict[str, Any]] = {}
exception_counts: Counter[str] = Counter()
symptom_counts: Counter[str] = Counter()
parsed_timestamps = 0
unknown_timestamps = 0
included: list[dict[str, Any]] = []
http_5xx_count = 0
context_parsed_at: datetime | None = None
context_rendered_at = UNKNOWN
for line in lines:
parsed_at, rendered_at = parse_line_timestamp(line, syslog_year)
if parsed_at is None:
unknown_timestamps += 1
else:
parsed_timestamps += 1
context_parsed_at = parsed_at
context_rendered_at = rendered_at
if not line_in_time_window(parsed_at, since, until):
continue
# Stack trace frames often omit timestamps; keep nearby log context for first/last seen.
effective_parsed_at = parsed_at
effective_rendered_at = rendered_at
if parsed_at is None and (is_stack_start(line) or is_stack_continuation(line)):
effective_parsed_at = context_parsed_at
effective_rendered_at = context_rendered_at
included.append(
{
"line": line,
"parsed_at": effective_parsed_at,
"rendered_at": effective_rendered_at,
}
)
matched_names = set()
for item in patterns:
if not item["regex"].search(line):
continue
severity = item["severity"]
if item["kind"] == "http_5xx":
http_5xx_count += 1
add_finding(
groups=groups,
name=item["name"],
severity=severity,
symptom=item["symptom"],
kind=item["kind"],
line=line,
parsed_at=effective_parsed_at,
rendered_at=effective_rendered_at,
max_samples=max_samples,
)
symptom_counts[item["symptom"]] += 1
matched_names.add(item["name"])
exception_type = extract_exception_type(line)
if exception_type != UNKNOWN:
exception_counts[exception_type] += 1
if exception_type not in matched_names:
severity = severity_for_exception(exception_type)
add_finding(
groups=groups,
name=exception_type,
severity=severity,
symptom="application_exception",
kind="exception",
line=line,
parsed_at=effective_parsed_at,
rendered_at=effective_rendered_at,
max_samples=max_samples,
)
symptom_counts["application_exception"] += 1
stack_trace_count = detect_stack_traces(included, groups, max_samples, max_stack_lines)
promote_http_5xx(groups, http_5xx_count, http_critical_threshold)
findings = sorted(
(render_group(group) for group in groups.values()),
key=lambda item: (
SEVERITY_ORDER[item["severity"]],
-item["occurrences"],
item["name"].lower(),
),
)
summary = build_summary(
total_lines=len(lines),
findings=findings,
stack_trace_count=stack_trace_count,
http_5xx_count=http_5xx_count,
parsed_timestamps=parsed_timestamps,
unknown_timestamps=unknown_timestamps,
)
return {
"summary": summary,
"findings": findings[:top],
"top_exception_types": top_items(exception_counts, top),
"top_operational_symptoms": top_items(symptom_counts, top),
}
def promote_http_5xx(
groups: dict[str, dict[str, Any]], http_5xx_count: int, threshold: int
) -> None:
if http_5xx_count < threshold:
return
http_names = {item["name"] for item in HTTP_PATTERNS}
for old_key, group in list(groups.items()):
if group["name"] not in http_names or group["severity"] == "CRITICAL":
continue
group["severity"] = "CRITICAL"
new_key = finding_key("CRITICAL", group["name"])
groups[new_key] = group
del groups[old_key]
def render_group(group: dict[str, Any]) -> dict[str, Any]:
return {
"name": group["name"],
"severity": group["severity"],
"symptom": group["symptom"],
"kind": group["kind"],
"occurrences": group["occurrences"],
"stack_trace_count": group["stack_trace_count"],
"first_seen": render_seen(group["first_seen"]),
"last_seen": render_seen(group["last_seen"]),
"samples": group["samples"],
"stack_trace_samples": group["stack_trace_samples"],
"fields": group["fields"],
}
def build_summary(
total_lines: int,
findings: list[dict[str, Any]],
stack_trace_count: int,
http_5xx_count: int,
parsed_timestamps: int,
unknown_timestamps: int,
) -> dict[str, Any]:
critical_groups = sum(1 for item in findings if item["severity"] == "CRITICAL")
warning_groups = sum(1 for item in findings if item["severity"] == "WARNING")
total_findings = sum(item["occurrences"] for item in findings)
if critical_groups > 0:
status = "CRITICAL"
elif warning_groups > 0:
status = "WARNING"
else:
status = "OK"
return {
"overall_status": status,
"total_lines_scanned": total_lines,
"total_findings": total_findings,
"total_stack_traces_detected": stack_trace_count,
"critical_finding_groups": critical_groups,
"warning_finding_groups": warning_groups,
"http_5xx_count": http_5xx_count,
"timestamp_coverage": {
"parsed_timestamps_count": parsed_timestamps,
"unknown_timestamps_count": unknown_timestamps,
},
}
def top_items(counter: Counter[str], limit: int) -> list[dict[str, Any]]:
return [{"value": value, "count": count} for value, count in counter.most_common(limit)]
def render_text(report: dict[str, Any], include_stacktraces: bool) -> str:
lines = ["JVM Log Analyzer", "================", ""]
summary = report["summary"]
lines.extend(
[
f"Overall status: {summary['overall_status']}",
"Findings require review; logs alone do not prove root cause.",
"",
]
)
if not report["findings"]:
lines.extend(["No configured JVM/application findings were detected.", ""])
else:
for finding in report["findings"]:
lines.extend(
[
f"[{finding['severity']}] {finding['name']}",
f"Occurrences: {finding['occurrences']}",
f"Symptom: {finding['symptom']}",
f"First seen: {finding['first_seen']}",
f"Last seen: {finding['last_seen']}",
f"Stack traces linked: {finding['stack_trace_count']}",
"Samples:",
]
)
if finding["samples"]:
lines.extend(f" - {sample}" for sample in finding["samples"])
else:
lines.append(" - No samples retained")
if include_stacktraces and finding["stack_trace_samples"]:
lines.append("Stack trace samples:")
for stack in finding["stack_trace_samples"]:
lines.append(" ---")
lines.extend(f" {entry}" for entry in stack)
lines.append("")
lines.extend(render_text_table("Top Exception Types", report["top_exception_types"]))
lines.extend(render_text_table("Top Operational Symptoms", report["top_operational_symptoms"]))
lines.extend(render_text_summary(summary))
return "\n".join(lines) + "\n"
def render_text_table(title: str, rows: list[dict[str, Any]]) -> list[str]:
lines = [title, "-" * len(title)]
if not rows:
lines.append("No entries detected.")
else:
lines.extend(f"- {item['value']}: {item['count']}" for item in rows)
lines.append("")
return lines
def render_text_summary(summary: dict[str, Any]) -> list[str]:
coverage = summary["timestamp_coverage"]
return [
"Operational Summary",
"-------------------",
f"Overall status: {summary['overall_status']}",
f"Total lines scanned: {summary['total_lines_scanned']}",
f"Total findings: {summary['total_findings']}",
f"Total stack traces detected: {summary['total_stack_traces_detected']}",
f"Critical finding groups: {summary['critical_finding_groups']}",
f"Warning finding groups: {summary['warning_finding_groups']}",
f"HTTP 5xx count: {summary['http_5xx_count']}",
f"Parsed timestamps count: {coverage['parsed_timestamps_count']}",
f"Unknown timestamps count: {coverage['unknown_timestamps_count']}",
]
def render_markdown(report: dict[str, Any], include_stacktraces: bool) -> str:
summary = report["summary"]
lines = [
"# JVM Log Analyzer",
"",
f"- Overall status: {summary['overall_status']}",
"- Finding language is a triage summary; logs alone do not prove root cause.",
"",
]
if not report["findings"]:
lines.extend(["No configured JVM/application findings were detected.", ""])
else:
for finding in report["findings"]:
lines.extend(
[
f"## {finding['severity']}: {finding['name']}",
"",
f"- Occurrences: {finding['occurrences']}",
f"- Symptom: {finding['symptom']}",
f"- First seen: {finding['first_seen']}",
f"- Last seen: {finding['last_seen']}",
f"- Stack traces linked: {finding['stack_trace_count']}",
"",
"Sample log lines:",
"",
]
)
if finding["samples"]:
lines.append("```text")
lines.extend(finding["samples"])
lines.append("```")
else:
lines.append("_No samples retained._")
lines.append("")
if include_stacktraces and finding["stack_trace_samples"]:
lines.extend(["Stack trace samples:", ""])
for stack in finding["stack_trace_samples"]:
lines.append("```text")
lines.extend(stack)
lines.append("```")
lines.append("")
lines.extend(render_markdown_table("Top Exception Types", report["top_exception_types"]))
lines.extend(render_markdown_table("Top Operational Symptoms", report["top_operational_symptoms"]))
lines.extend(render_markdown_summary(summary))
return "\n".join(lines)
def render_markdown_table(title: str, rows: list[dict[str, Any]]) -> list[str]:
lines = [f"## {title}", ""]
if not rows:
lines.extend(["No entries detected.", ""])
return lines
lines.extend(["| Value | Count |", "| --- | ---: |"])
lines.extend(f"| {item['value']} | {item['count']} |" for item in rows)
lines.append("")
return lines
def render_markdown_summary(summary: dict[str, Any]) -> list[str]:
coverage = summary["timestamp_coverage"]
return [
"## Operational Summary",
"",
f"- Overall status: {summary['overall_status']}",
f"- Total lines scanned: {summary['total_lines_scanned']}",
f"- Total findings: {summary['total_findings']}",
f"- Total stack traces detected: {summary['total_stack_traces_detected']}",
f"- Critical finding groups: {summary['critical_finding_groups']}",
f"- Warning finding groups: {summary['warning_finding_groups']}",
f"- HTTP 5xx count: {summary['http_5xx_count']}",
f"- Parsed timestamps count: {coverage['parsed_timestamps_count']}",
f"- Unknown timestamps count: {coverage['unknown_timestamps_count']}",
"",
]
def render_json(report: dict[str, Any]) -> str:
return json.dumps(report, indent=2, sort_keys=True) + "\n"
def write_report(input_path: Path, output_path: str | None, content: str) -> None:
if output_path is None:
sys.stdout.write(content)
return
path = Path(output_path)
try:
if path.resolve() == input_path.resolve():
raise OSError("output path must not be the same as input file")
path.write_text(content, encoding="utf-8")
except OSError as exc:
raise OSError(f"unable to write output {path}: {exc}") from exc
def main() -> int:
parser = build_parser()
args = parser.parse_args()
input_path = Path(args.file)
if args.since is not None and args.until is not None and args.since > args.until:
parser.error("--since must be earlier than or equal to --until")
try:
lines = read_log_file(input_path)
report = analyze_log(
lines=lines,
patterns=compile_patterns(args.ignore_case),
since=args.since,
until=args.until,
top=args.top,
max_samples=args.max_samples,
max_stack_lines=args.max_stack_lines,
http_critical_threshold=args.http_critical_threshold,
)
if args.format == "text":
content = render_text(report, args.include_stacktraces)
elif args.format == "markdown":
content = render_markdown(report, args.include_stacktraces)
else:
content = render_json(report)
write_report(input_path, args.output, content)
except (OSError, ValueError) as exc:
print(f"CRITICAL: {exc}", file=sys.stderr)
return EXIT_INVALID
except RuntimeError as exc:
print(f"CRITICAL: runtime error: {exc}", file=sys.stderr)
return EXIT_INVALID
if report["summary"]["overall_status"] == "OK":
return EXIT_OK
return EXIT_FINDINGS
if __name__ == "__main__":
sys.exit(main())