#!/usr/bin/env python3
"""Summarize suspicious authentication activity in local Linux auth logs."""

from __future__ import annotations

import argparse
import json
import re
import sys
from collections import Counter, defaultdict
from datetime import datetime
from pathlib import Path
from typing import Any

# Process exit codes returned by main().
EXIT_OK = 0
EXIT_FINDINGS = 1
EXIT_INVALID = 2

# Placeholder used whenever a field cannot be extracted from a log line.
UNKNOWN = "UNKNOWN"

ISO_TIMESTAMP_RE = re.compile(r"\b(\d{4}-\d{2}-\d{2})[ T](\d{2}:\d{2}:\d{2})\b")
SYSLOG_TIMESTAMP_RE = re.compile(r"^([A-Z][a-z]{2}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\b")
SERVICE_RE = re.compile(r"\s([A-Za-z0-9_.-]+)(?:\[\d+\])?:\s")
IP_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")

# Ordered list of detectors. Every pattern is tried against every line, so a
# single line may yield several events (see detect_events).
EVENT_PATTERNS = [
    {
        "event_type": "failed_ssh_password",
        "category": "failed_login",
        "method": "password",
        "regex": re.compile(
            r"sshd(?:\[\d+\])?: Failed password for (?:(invalid user) )?(\S+) from ((?:\d{1,3}\.){3}\d{1,3})"
        ),
    },
    {
        "event_type": "failed_ssh_publickey",
        "category": "failed_login",
        "method": "publickey",
        "regex": re.compile(
            r"sshd(?:\[\d+\])?: Failed publickey for (?:(invalid user) )?(\S+) from ((?:\d{1,3}\.){3}\d{1,3})"
        ),
    },
    {
        "event_type": "successful_ssh_login",
        "category": "successful_login",
        "method": None,
        "regex": re.compile(
            r"sshd(?:\[\d+\])?: Accepted (\S+) for (\S+) from ((?:\d{1,3}\.){3}\d{1,3})"
        ),
    },
    {
        "event_type": "invalid_user_attempt",
        "category": "invalid_user",
        "method": None,
        "regex": re.compile(
            r"sshd(?:\[\d+\])?: Invalid user (\S+) from ((?:\d{1,3}\.){3}\d{1,3})"
        ),
    },
    {
        "event_type": "refused_user_attempt",
        "category": "refused_user",
        "method": None,
        "regex": re.compile(
            r"sshd(?:\[\d+\])?: (?:User|Connection closed by invalid user) (\S+).*?from ((?:\d{1,3}\.){3}\d{1,3})"
        ),
    },
    {
        "event_type": "disconnect_after_failed_auth",
        "category": "disconnect_after_failed_auth",
        "method": None,
        "regex": re.compile(
            r"sshd(?:\[\d+\])?: Disconnected from (?:authenticating user \S+ |invalid user \S+ )?((?:\d{1,3}\.){3}\d{1,3}).*(?:preauth|Too many authentication failures)"
        ),
    },
    {
        "event_type": "too_many_auth_failures",
        "category": "failed_login",
        "method": None,
        "regex": re.compile(
            r"sshd(?:\[\d+\])?: .*(?:Too many authentication failures|maximum authentication attempts exceeded).*"
        ),
    },
    {
        "event_type": "sudo_command",
        "category": "sudo_usage",
        "method": None,
        "regex": re.compile(r"sudo(?:\[\d+\])?:\s+(\S+)\s+:\s+TTY=.*COMMAND=(.+)$"),
    },
    {
        "event_type": "sudo_auth_failure",
        "category": "sudo_failure",
        "method": None,
        "regex": re.compile(r"sudo(?:\[\d+\])?: pam_unix\(sudo:auth\): authentication failure;.*"),
    },
    {
        "event_type": "su_session_opened",
        "category": "su_event",
        "method": None,
        "regex": re.compile(r"su(?:\[\d+\])?: pam_unix\(su(?:-l)?:session\): session opened for user (\S+)"),
    },
    {
        "event_type": "su_auth_failure",
        "category": "su_event",
        "method": None,
        "regex": re.compile(r"su(?:\[\d+\])?: pam_unix\(su(?:-l)?:auth\): authentication failure;.*"),
    },
    {
        "event_type": "pam_unix_auth_failure",
        "category": "generic_auth_failure",
        "method": None,
        "regex": re.compile(r"pam_unix\([^)]*:auth\): authentication failure;.*"),
    },
    {
        "event_type": "user_unknown",
        "category": "generic_auth_failure",
        "method": None,
        "regex": re.compile(r"user (?:unknown|not known to the underlying authentication module)"),
    },
    {
        "event_type": "account_locked",
        "category": "generic_auth_failure",
        "method": None,
        "regex": re.compile(r"(?:account locked|authentication failure;.*account locked)", re.IGNORECASE),
    },
]

# Categories whose events are tallied as failed attempts.
FAILED_CATEGORIES = {"failed_login", "generic_auth_failure"}

# Sample buckets that are always present in the report, in display order.
SAMPLE_CATEGORIES = [
    "failed_login",
    "invalid_user",
    "root_login_attempt",
    "sudo_failure",
    "suspicious_source_ip",
]

# key=value patterns tried, in order, when a PAM-style line carries the user.
_USER_KEY_VALUE_RES = (
    re.compile(r"\buser=([A-Za-z0-9_.@-]+)"),
    re.compile(r"\bruser=([A-Za-z0-9_.@-]+)"),
    re.compile(r"\bUSER=([A-Za-z0-9_.@-]+)"),
)

# SSH event types that are relabelled "root_login_attempt" when the user is root.
_ROOT_LOGIN_EVENT_TYPES = (
    "failed_ssh_password",
    "failed_ssh_publickey",
    "successful_ssh_login",
    "invalid_user_attempt",
    "refused_user_attempt",
)


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line argument parser for the audit tool."""
    parser = argparse.ArgumentParser(
        description="Analyze local Linux authentication logs for suspicious patterns."
    )
    parser.add_argument("--file", required=True, help="Local auth.log or secure file to analyze.")
    parser.add_argument(
        "--format",
        choices=("text", "markdown", "json"),
        default="text",
        help="Report format. Default: text.",
    )
    parser.add_argument("--output", help="Write report to this path instead of stdout.")
    parser.add_argument(
        "--top",
        type=positive_int,
        default=10,
        help="Number of top IPs, usernames, and event types to display. Default: 10.",
    )
    parser.add_argument(
        "--threshold-failed",
        type=positive_int,
        default=5,
        help="Failed attempt threshold for suspicious IPs and usernames. Default: 5.",
    )
    parser.add_argument(
        "--ignore-users",
        default="",
        help="Comma-separated usernames excluded from suspicious username thresholds.",
    )
    parser.add_argument(
        "--max-samples",
        type=non_negative_int,
        default=3,
        help="Maximum sample lines per finding category. Default: 3.",
    )
    return parser


def positive_int(value: str) -> int:
    """Parse *value* as an integer greater than zero (argparse ``type=`` hook).

    Raises:
        argparse.ArgumentTypeError: if *value* is not an integer or is <= 0.
    """
    try:
        number = int(value)
    except ValueError as exc:
        raise argparse.ArgumentTypeError("must be a positive integer") from exc
    if number <= 0:
        raise argparse.ArgumentTypeError("must be a positive integer")
    return number


def non_negative_int(value: str) -> int:
    """Parse *value* as an integer >= 0 (argparse ``type=`` hook).

    Raises:
        argparse.ArgumentTypeError: if *value* is not an integer or is negative.
    """
    try:
        number = int(value)
    except ValueError as exc:
        raise argparse.ArgumentTypeError("must be zero or a positive integer") from exc
    if number < 0:
        raise argparse.ArgumentTypeError("must be zero or a positive integer")
    return number


def parse_ignore_users(value: str) -> list[str]:
    """Split a comma-separated user list into sorted, de-duplicated names.

    Blank items and surrounding whitespace are dropped.
    """
    return sorted({item.strip() for item in value.split(",") if item.strip()})


def read_log_file(path: Path) -> list[str]:
    """Read *path* as UTF-8 (undecodable bytes replaced) and return its lines.

    Raises:
        OSError: if the path is missing, not a regular file, or unreadable.
        ValueError: if the file is empty.
    """
    if not path.exists():
        raise OSError(f"file does not exist: {path}")
    if not path.is_file():
        raise OSError(f"path is not a regular file: {path}")
    try:
        text = path.read_text(encoding="utf-8", errors="replace")
    except PermissionError as exc:
        raise OSError(f"file is not readable: {path}") from exc
    except OSError as exc:
        raise OSError(f"unable to read file {path}: {exc}") from exc
    if text == "":
        raise ValueError(f"file is empty: {path}")
    return text.splitlines()


def parse_line_timestamp(line: str, syslog_year: int) -> tuple[datetime | None, str]:
    """Extract a timestamp from *line*.

    An ISO-style ``YYYY-MM-DD[ T]HH:MM:SS`` anywhere in the line is preferred;
    otherwise a classic syslog prefix (``Mon DD HH:MM:SS``) at the start of
    the line is parsed using *syslog_year*, because that format has no year.

    Returns:
        ``(parsed, rendered)`` where *parsed* is a naive ``datetime`` or
        ``None`` and *rendered* is the text to display (``UNKNOWN`` when no
        valid timestamp was found).
    """
    iso_match = ISO_TIMESTAMP_RE.search(line)
    if iso_match:
        raw = f"{iso_match.group(1)} {iso_match.group(2)}"
        try:
            return datetime.strptime(raw, "%Y-%m-%d %H:%M:%S"), raw
        except ValueError:
            return None, UNKNOWN

    syslog_match = SYSLOG_TIMESTAMP_RE.search(line)
    if syslog_match:
        raw = syslog_match.group(1)
        try:
            parsed = datetime.strptime(f"{syslog_year} {raw}", "%Y %b %d %H:%M:%S")
        except ValueError:
            return None, UNKNOWN
        return parsed, raw

    return None, UNKNOWN


def render_seen(value: tuple[datetime, str] | None) -> str:
    """Return the display text for a first/last-seen marker, or ``UNKNOWN``."""
    if value is None:
        return UNKNOWN
    parsed, rendered = value
    return rendered or parsed.strftime("%Y-%m-%d %H:%M:%S")


def extract_service(line: str) -> str:
    """Return the first ``name[pid]:`` style service tag in *line*, or ``UNKNOWN``."""
    match = SERVICE_RE.search(line)
    return match.group(1) if match else UNKNOWN


def extract_ip(line: str) -> str:
    """Return the first dotted-quad token in *line*, or ``UNKNOWN``.

    Octet ranges are not validated.
    """
    match = IP_RE.search(line)
    return match.group(0) if match else UNKNOWN


def extract_user_from_key_values(line: str) -> str:
    """Return the first non-empty ``user=``, ``ruser=`` or ``USER=`` value.

    The keys are tried in that order; ``UNKNOWN`` is returned if none has a
    value.
    """
    for pattern in _USER_KEY_VALUE_RES:
        match = pattern.search(line)
        if match and match.group(1):
            return match.group(1)
    return UNKNOWN


def event_from_match(line: str, pattern: dict[str, Any], match: re.Match[str]) -> dict[str, Any]:
    """Build an event dict from a successful ``EVENT_PATTERNS`` match.

    Args:
        line: The raw log line.
        pattern: The ``EVENT_PATTERNS`` entry that matched.
        match: The match object produced by ``pattern["regex"]``.

    Returns:
        A dict with ``event_type``, ``category``, ``username``, ``source_ip``,
        ``method``, ``service`` and ``raw``. SSH events for user ``root`` are
        relabelled ``root_login_attempt``; their ``category`` is kept, so
        callers can still tell a successful root login from a failed one.
    """
    event_type = pattern["event_type"]
    username = UNKNOWN
    source_ip = extract_ip(line)
    method = pattern["method"] or UNKNOWN

    if event_type in ("failed_ssh_password", "failed_ssh_publickey"):
        username = match.group(2)
        source_ip = match.group(3)
    elif event_type == "successful_ssh_login":
        method = match.group(1)
        username = match.group(2)
        source_ip = match.group(3)
    elif event_type in ("invalid_user_attempt", "refused_user_attempt"):
        username = match.group(1)
        source_ip = match.group(2)
    elif event_type == "sudo_command":
        username = match.group(1)
    elif event_type == "su_session_opened":
        # Newer PAM logs "user root(uid=0)"; keep only the name before "(".
        # (Stripping a trailing ")" alone would leave "root(uid=0".)
        username = match.group(1).split("(", 1)[0]
    elif event_type in ("sudo_auth_failure", "su_auth_failure", "pam_unix_auth_failure"):
        username = extract_user_from_key_values(line)

    if username == "root" and event_type in _ROOT_LOGIN_EVENT_TYPES:
        event_type = "root_login_attempt"

    return {
        "event_type": event_type,
        "category": pattern["category"],
        "username": username or UNKNOWN,
        "source_ip": source_ip or UNKNOWN,
        "method": method,
        "service": extract_service(line),
        "raw": line,
    }


def detect_events(line: str) -> list[dict[str, Any]]:
    """Return every authentication event recognised in *line*.

    All ``EVENT_PATTERNS`` are tried. The generic ``pam_unix_auth_failure``
    event is dropped when a more specific sudo/su failure matched the same
    line, and a catch-all ``authentication_failure`` event is emitted when the
    line mentions "authentication failure" but no pattern matched.
    """
    events = []
    for pattern in EVENT_PATTERNS:
        match = pattern["regex"].search(line)
        if match:
            events.append(event_from_match(line, pattern, match))

    if any(event["event_type"] in ("sudo_auth_failure", "su_auth_failure") for event in events):
        events = [event for event in events if event["event_type"] != "pam_unix_auth_failure"]

    if "authentication failure" in line and not events:
        events.append(
            {
                "event_type": "authentication_failure",
                "category": "generic_auth_failure",
                "username": extract_user_from_key_values(line),
                "source_ip": extract_ip(line),
                "method": UNKNOWN,
                "service": extract_service(line),
                "raw": line,
            }
        )

    return dedupe_events(events)


def dedupe_events(events: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Drop events repeating an earlier (type, user, IP, raw line) combination."""
    deduped = []
    seen = set()
    for event in events:
        key = (event["event_type"], event["username"], event["source_ip"], event["raw"])
        if key in seen:
            continue
        seen.add(key)
        deduped.append(event)
    return deduped


def append_sample(samples: dict[str, list[str]], category: str, line: str, max_samples: int) -> None:
    """Retain *line* as a sample for *category*, up to *max_samples* lines.

    A line already retained for the category is skipped, so one log line that
    produces several events is not shown more than once.
    """
    if max_samples == 0:
        return
    bucket = samples[category]
    if len(bucket) < max_samples and line not in bucket:
        bucket.append(line)


def update_seen(
    first_seen: tuple[datetime, str] | None,
    last_seen: tuple[datetime, str] | None,
    parsed_at: datetime | None,
    rendered_at: str,
) -> tuple[tuple[datetime, str] | None, tuple[datetime, str] | None]:
    """Widen the (first_seen, last_seen) window to include *parsed_at*.

    Lines without a parsed timestamp leave the window unchanged.
    """
    if parsed_at is None:
        return first_seen, last_seen
    if first_seen is None or parsed_at < first_seen[0]:
        first_seen = (parsed_at, rendered_at)
    if last_seen is None or parsed_at > last_seen[0]:
        last_seen = (parsed_at, rendered_at)
    return first_seen, last_seen


def _is_failed_event(event: dict[str, Any]) -> bool:
    """Return True when *event* should be tallied as a failed attempt.

    Root login attempts count as failures unless the underlying event was a
    successful login.
    """
    category = event["category"]
    if category in FAILED_CATEGORIES:
        return True
    return event["event_type"] == "root_login_attempt" and category != "successful_login"


def analyze_log(
    lines: list[str],
    threshold_failed: int,
    ignore_users: list[str],
    top: int,
    max_samples: int,
) -> dict[str, Any]:
    """Scan *lines* and build the full audit report.

    Args:
        lines: Raw log lines.
        threshold_failed: Minimum failed-attempt count for an IP or username
            to be listed as suspicious.
        ignore_users: Usernames never listed as suspicious.
        top: Maximum number of rows in each "top" table.
        max_samples: Maximum sample lines retained per sample category.

    Returns:
        A JSON-serializable dict with ``summary``, the "top" tables, the
        suspicious IP/username lists and ``samples``.
    """
    # NOTE(review): syslog timestamps carry no year, so the current year is
    # assumed; a log spanning a year boundary may get first/last seen swapped.
    syslog_year = datetime.now().year
    ignored = set(ignore_users)

    events = []
    samples: dict[str, list[str]] = defaultdict(list)
    event_type_counts: Counter[str] = Counter()
    failed_by_ip: Counter[str] = Counter()
    failed_by_user: Counter[str] = Counter()
    success_by_ip: Counter[str] = Counter()
    success_by_user: Counter[str] = Counter()
    first_seen: tuple[datetime, str] | None = None
    last_seen: tuple[datetime, str] | None = None

    for line in lines:
        parsed_at, rendered_at = parse_line_timestamp(line, syslog_year)
        line_events = detect_events(line)
        if not line_events:
            continue
        first_seen, last_seen = update_seen(first_seen, last_seen, parsed_at, rendered_at)

        for event in line_events:
            event["timestamp"] = rendered_at
            events.append(event)
            event_type_counts[event["event_type"]] += 1

            category = event["category"]
            username = event["username"]
            source_ip = event["source_ip"]

            if event["event_type"] == "root_login_attempt":
                append_sample(samples, "root_login_attempt", line, max_samples)

            if _is_failed_event(event):
                if source_ip != UNKNOWN:
                    failed_by_ip[source_ip] += 1
                if username != UNKNOWN:
                    failed_by_user[username] += 1
                append_sample(samples, "failed_login", line, max_samples)
            elif category == "successful_login":
                # Includes successful root logins, which must not be counted
                # as failures even though they are flagged as root attempts.
                if source_ip != UNKNOWN:
                    success_by_ip[source_ip] += 1
                if username != UNKNOWN:
                    success_by_user[username] += 1
            elif category == "invalid_user":
                append_sample(samples, "invalid_user", line, max_samples)
            elif category == "sudo_failure":
                append_sample(samples, "sudo_failure", line, max_samples)

    suspicious_ips = {
        ip: count for ip, count in failed_by_ip.items() if count >= threshold_failed
    }
    suspicious_users = {
        user: count
        for user, count in failed_by_user.items()
        if count >= threshold_failed and user not in ignored
    }

    for event in events:
        if event["source_ip"] in suspicious_ips:
            append_sample(samples, "suspicious_source_ip", event["raw"], max_samples)

    summary = build_summary(
        lines=lines,
        events=events,
        failed_by_ip=failed_by_ip,
        failed_by_user=failed_by_user,
        suspicious_ips=suspicious_ips,
        suspicious_users=suspicious_users,
        event_type_counts=event_type_counts,
        threshold_failed=threshold_failed,
        ignore_users=ignore_users,
        first_seen=first_seen,
        last_seen=last_seen,
    )

    return {
        "summary": summary,
        "top_source_ips_by_failed_attempts": top_items(failed_by_ip, top),
        "top_usernames_by_failed_attempts": top_items(failed_by_user, top),
        "top_source_ips_by_successful_logins": top_items(success_by_ip, top),
        "top_usernames_by_successful_logins": top_items(success_by_user, top),
        "top_event_types": top_items(event_type_counts, top),
        "suspicious_source_ips": sorted_count_items(suspicious_ips),
        "suspicious_usernames": sorted_count_items(suspicious_users),
        "samples": {category: samples.get(category, []) for category in SAMPLE_CATEGORIES},
    }


def build_summary(
    lines: list[str],
    events: list[dict[str, Any]],
    failed_by_ip: Counter[str],
    failed_by_user: Counter[str],
    suspicious_ips: dict[str, int],
    suspicious_users: dict[str, int],
    event_type_counts: Counter[str],
    threshold_failed: int,
    ignore_users: list[str],
    first_seen: tuple[datetime, str] | None,
    last_seen: tuple[datetime, str] | None,
) -> dict[str, Any]:
    """Compute the overall status and headline counters for the report.

    Status is ``CRITICAL`` when root attempts reach the threshold, when any
    suspicious IP or username reaches twice the threshold, or when at least
    two IPs are suspicious; ``WARNING`` when anything suspicious was seen at
    all; otherwise ``OK``.

    ``failed_by_ip`` and ``failed_by_user`` are accepted for interface
    compatibility; the failed and successful totals are counted from
    *events* so that events without a source IP are not lost.
    """
    root_attempts = event_type_counts["root_login_attempt"]
    sudo_failures = event_type_counts["sudo_auth_failure"]
    invalid_users = event_type_counts["invalid_user_attempt"]
    high_volume_ips = sum(1 for count in suspicious_ips.values() if count >= threshold_failed * 2)
    high_volume_users = sum(1 for count in suspicious_users.values() if count >= threshold_failed * 2)

    if (
        root_attempts >= threshold_failed
        or high_volume_ips > 0
        or high_volume_users > 0
        or len(suspicious_ips) >= 2
    ):
        status = "CRITICAL"
    elif suspicious_ips or suspicious_users or invalid_users > 0 or sudo_failures > 0 or root_attempts > 0:
        status = "WARNING"
    else:
        status = "OK"

    failed_logins = sum(1 for event in events if _is_failed_event(event))
    successful_logins = sum(1 for event in events if event["category"] == "successful_login")

    return {
        "overall_status": status,
        "first_seen": render_seen(first_seen),
        "last_seen": render_seen(last_seen),
        "total_lines_scanned": len(lines),
        "authentication_events_detected": len(events),
        "failed_login_count": failed_logins,
        "successful_login_count": successful_logins,
        "invalid_user_count": invalid_users,
        "root_login_attempt_count": root_attempts,
        "sudo_command_count": event_type_counts["sudo_command"],
        "sudo_failure_count": sudo_failures,
        "su_event_count": event_type_counts["su_session_opened"] + event_type_counts["su_auth_failure"],
        "suspicious_source_ip_count": len(suspicious_ips),
        "suspicious_username_count": len(suspicious_users),
        "threshold_failed": threshold_failed,
        "ignored_users": ignore_users,
    }


def top_items(counter: Counter[str], limit: int) -> list[dict[str, Any]]:
    """Return the *limit* most common entries as ``{"value", "count"}`` rows."""
    return [{"value": value, "count": count} for value, count in counter.most_common(limit)]


def sorted_count_items(items: dict[str, int]) -> list[dict[str, Any]]:
    """Return ``{"value", "count"}`` rows sorted by count descending, then value."""
    return [
        {"value": value, "count": count}
        for value, count in sorted(items.items(), key=lambda item: (-item[1], item[0]))
    ]


def render_text(report: dict[str, Any]) -> str:
    """Render *report* as a plain-text document ending in a newline."""
    summary = report["summary"]
    lines = [
        "Auth Log Audit",
        "==============",
        "",
        f"Overall status: {summary['overall_status']}",
        f"First seen: {summary['first_seen']}",
        f"Last seen: {summary['last_seen']}",
        "",
    ]
    lines.extend(render_text_table("Top Source IPs by Failed Attempts", report["top_source_ips_by_failed_attempts"]))
    lines.extend(render_text_table("Top Usernames by Failed Attempts", report["top_usernames_by_failed_attempts"]))
    lines.extend(render_text_table("Top Source IPs by Successful Logins", report["top_source_ips_by_successful_logins"]))
    lines.extend(render_text_table("Top Usernames by Successful Logins", report["top_usernames_by_successful_logins"]))
    lines.extend(render_text_table("Suspicious Source IPs", report["suspicious_source_ips"]))
    lines.extend(render_text_table("Suspicious Usernames", report["suspicious_usernames"]))
    lines.extend(render_text_table("Top Event Types", report["top_event_types"]))
    lines.extend(render_text_samples(report["samples"]))
    lines.extend(render_text_summary(summary))
    return "\n".join(lines) + "\n"


def render_text_table(title: str, rows: list[dict[str, Any]]) -> list[str]:
    """Render one titled value/count list as plain-text lines."""
    lines = [title, "-" * len(title)]
    if not rows:
        lines.append("No entries detected.")
    else:
        lines.extend(f"- {item['value']}: {item['count']}" for item in rows)
    lines.append("")
    return lines


def render_text_samples(samples: dict[str, list[str]]) -> list[str]:
    """Render the retained sample lines for every sample category as text."""
    lines = ["Sample Log Lines", "----------------"]
    for category in SAMPLE_CATEGORIES:
        lines.append(f"{category}:")
        if samples.get(category):
            lines.extend(f" - {sample}" for sample in samples[category])
        else:
            lines.append(" - No samples retained")
        lines.append("")
    return lines


def render_text_summary(summary: dict[str, Any]) -> list[str]:
    """Render the operational summary counters as plain-text lines."""
    ignored = ", ".join(summary["ignored_users"]) if summary["ignored_users"] else "None"
    return [
        "Operational Summary",
        "-------------------",
        f"Overall status: {summary['overall_status']}",
        f"Total lines scanned: {summary['total_lines_scanned']}",
        f"Authentication events detected: {summary['authentication_events_detected']}",
        f"Failed logins: {summary['failed_login_count']}",
        f"Successful logins: {summary['successful_login_count']}",
        f"Invalid user attempts: {summary['invalid_user_count']}",
        f"Root login attempts: {summary['root_login_attempt_count']}",
        f"Sudo usage events: {summary['sudo_command_count']}",
        f"Sudo authentication failures: {summary['sudo_failure_count']}",
        f"su events: {summary['su_event_count']}",
        f"Suspicious source IPs: {summary['suspicious_source_ip_count']}",
        f"Suspicious usernames: {summary['suspicious_username_count']}",
        f"Threshold used: {summary['threshold_failed']}",
        f"Ignored users: {ignored}",
    ]


def render_markdown(report: dict[str, Any]) -> str:
    """Render *report* as a Markdown document."""
    summary = report["summary"]
    lines = [
        "# Auth Log Audit",
        "",
        f"- Overall status: {summary['overall_status']}",
        f"- First seen: {summary['first_seen']}",
        f"- Last seen: {summary['last_seen']}",
        "",
    ]
    lines.extend(render_markdown_table("Top Source IPs by Failed Attempts", report["top_source_ips_by_failed_attempts"]))
    lines.extend(render_markdown_table("Top Usernames by Failed Attempts", report["top_usernames_by_failed_attempts"]))
    lines.extend(render_markdown_table("Top Source IPs by Successful Logins", report["top_source_ips_by_successful_logins"]))
    lines.extend(render_markdown_table("Top Usernames by Successful Logins", report["top_usernames_by_successful_logins"]))
    lines.extend(render_markdown_table("Suspicious Source IPs", report["suspicious_source_ips"]))
    lines.extend(render_markdown_table("Suspicious Usernames", report["suspicious_usernames"]))
    lines.extend(render_markdown_table("Top Event Types", report["top_event_types"]))
    lines.extend(render_markdown_samples(report["samples"]))

    ignored = ", ".join(summary["ignored_users"]) if summary["ignored_users"] else "None"
    lines.extend(
        [
            "## Operational Summary",
            "",
            f"- Overall status: {summary['overall_status']}",
            f"- Total lines scanned: {summary['total_lines_scanned']}",
            f"- Authentication events detected: {summary['authentication_events_detected']}",
            f"- Failed logins: {summary['failed_login_count']}",
            f"- Successful logins: {summary['successful_login_count']}",
            f"- Invalid user attempts: {summary['invalid_user_count']}",
            f"- Root login attempts: {summary['root_login_attempt_count']}",
            f"- Sudo usage events: {summary['sudo_command_count']}",
            f"- Sudo authentication failures: {summary['sudo_failure_count']}",
            f"- su events: {summary['su_event_count']}",
            f"- Suspicious source IPs: {summary['suspicious_source_ip_count']}",
            f"- Suspicious usernames: {summary['suspicious_username_count']}",
            f"- Threshold used: {summary['threshold_failed']}",
            f"- Ignored users: {ignored}",
            "",
        ]
    )
    return "\n".join(lines)


def _markdown_cell(value: Any) -> str:
    """Escape pipe characters so a value cannot break out of its table cell."""
    return str(value).replace("|", "\\|")


def render_markdown_table(title: str, rows: list[dict[str, Any]]) -> list[str]:
    """Render one titled value/count table as Markdown lines.

    Values come from log content (for example attacker-chosen usernames), so
    pipe characters are escaped before being placed in a table cell.
    """
    lines = [f"## {title}", ""]
    if not rows:
        lines.extend(["No entries detected.", ""])
        return lines
    lines.extend(["| Value | Count |", "| --- | ---: |"])
    lines.extend(f"| {_markdown_cell(item['value'])} | {item['count']} |" for item in rows)
    lines.append("")
    return lines


def render_markdown_samples(samples: dict[str, list[str]]) -> list[str]:
    """Render the retained sample lines per category as fenced Markdown blocks."""
    lines = ["## Sample Log Lines", ""]
    for category in SAMPLE_CATEGORIES:
        lines.extend([f"### {category}", ""])
        if samples.get(category):
            lines.append("```text")
            lines.extend(samples[category])
            lines.append("```")
        else:
            lines.append("_No samples retained._")
        lines.append("")
    return lines


def render_json(report: dict[str, Any]) -> str:
    """Render *report* as indented, key-sorted JSON ending in a newline."""
    return json.dumps(report, indent=2, sort_keys=True) + "\n"


def write_report(input_path: Path, output_path: str | None, content: str) -> None:
    """Write *content* to *output_path*, or to stdout when it is ``None``.

    Raises:
        OSError: if the output path resolves to the input file or the write
            fails.
    """
    if output_path is None:
        sys.stdout.write(content)
        return
    path = Path(output_path)
    try:
        # Refuse to overwrite the log that is being analyzed.
        if path.resolve() == input_path.resolve():
            raise OSError("output path must not be the same as input file")
        path.write_text(content, encoding="utf-8")
    except OSError as exc:
        raise OSError(f"unable to write output {path}: {exc}") from exc


def main() -> int:
    """Run the CLI and return the process exit code.

    Returns:
        ``EXIT_OK`` when the overall status is ``OK``, ``EXIT_FINDINGS`` when
        anything was flagged, and ``EXIT_INVALID`` on input/output errors.
    """
    parser = build_parser()
    args = parser.parse_args()
    input_path = Path(args.file)
    ignore_users = parse_ignore_users(args.ignore_users)

    try:
        lines = read_log_file(input_path)
        report = analyze_log(
            lines=lines,
            threshold_failed=args.threshold_failed,
            ignore_users=ignore_users,
            top=args.top,
            max_samples=args.max_samples,
        )
        if args.format == "text":
            content = render_text(report)
        elif args.format == "markdown":
            content = render_markdown(report)
        else:
            content = render_json(report)
        write_report(input_path, args.output, content)
    except (OSError, ValueError) as exc:
        print(f"CRITICAL: {exc}", file=sys.stderr)
        return EXIT_INVALID
    except RuntimeError as exc:
        print(f"CRITICAL: runtime error: {exc}", file=sys.stderr)
        return EXIT_INVALID

    if report["summary"]["overall_status"] == "OK":
        return EXIT_OK
    return EXIT_FINDINGS


if __name__ == "__main__":
    sys.exit(main())