735 lines
26 KiB
Python
735 lines
26 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""Summarize suspicious authentication activity in local Linux auth logs."""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import argparse
|
||
|
|
import json
|
||
|
|
import re
|
||
|
|
import sys
|
||
|
|
from collections import Counter, defaultdict
|
||
|
|
from datetime import datetime
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Any
|
||
|
|
|
||
|
|
|
||
|
|
# Process exit codes returned by main(): no findings, findings present,
# and unusable input / runtime failure.
EXIT_OK, EXIT_FINDINGS, EXIT_INVALID = 0, 1, 2

# Placeholder stored wherever a field cannot be extracted from a log line.
UNKNOWN = "UNKNOWN"

# "YYYY-MM-DD", a space or "T", then "HH:MM:SS" -- searched anywhere in a line.
ISO_TIMESTAMP_RE = re.compile(
    r"\b(\d{4}-\d{2}-\d{2})[ T](\d{2}:\d{2}:\d{2})\b"
)

# Classic syslog prefix ("Jan  5 10:00:01") anchored at the start of the line.
SYSLOG_TIMESTAMP_RE = re.compile(
    r"^([A-Z][a-z]{2}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\b"
)

# Program name with an optional "[pid]" suffix, followed by ": ".
SERVICE_RE = re.compile(
    r"\s([A-Za-z0-9_.-]+)(?:\[\d+\])?:\s"
)

# Dotted-quad IPv4 address (octet ranges are not validated).
IP_RE = re.compile(
    r"\b(?:\d{1,3}\.){3}\d{1,3}\b"
)
|
||
|
|
|
||
|
|
|
||
|
|
# Ordered table of recognisers applied to every log line by detect_events().
# Each entry carries:
#   event_type -- label recorded on the resulting event
#   category   -- coarse bucket used for tallies and samples
#   method     -- fixed authentication method, or None when it is unknown or
#                 taken from the regex match instead
#   regex      -- compiled pattern; the meaning of its capture groups is
#                 decoded per event_type in event_from_match()
EVENT_PATTERNS = [
    {
        "event_type": "failed_ssh_password",
        "category": "failed_login",
        "method": "password",
        "regex": re.compile(
            r"sshd(?:\[\d+\])?: Failed password for (?:(invalid user) )?(\S+) from ((?:\d{1,3}\.){3}\d{1,3})"
        ),
    },
    {
        "event_type": "failed_ssh_publickey",
        "category": "failed_login",
        "method": "publickey",
        "regex": re.compile(
            r"sshd(?:\[\d+\])?: Failed publickey for (?:(invalid user) )?(\S+) from ((?:\d{1,3}\.){3}\d{1,3})"
        ),
    },
    {
        "event_type": "successful_ssh_login",
        "category": "successful_login",
        "method": None,
        "regex": re.compile(
            r"sshd(?:\[\d+\])?: Accepted (\S+) for (\S+) from ((?:\d{1,3}\.){3}\d{1,3})"
        ),
    },
    {
        "event_type": "invalid_user_attempt",
        "category": "invalid_user",
        "method": None,
        "regex": re.compile(
            r"sshd(?:\[\d+\])?: Invalid user (\S+) from ((?:\d{1,3}\.){3}\d{1,3})"
        ),
    },
    {
        "event_type": "refused_user_attempt",
        "category": "refused_user",
        "method": None,
        # "User <name> from <ip> not allowed ..." carries the word "from",
        # but "Connection closed by invalid user <name> <ip> port ..." puts
        # the address directly after the user name.  The "from" part is
        # therefore optional; requiring it made the second alternative
        # unmatchable.
        "regex": re.compile(
            r"sshd(?:\[\d+\])?: (?:User|Connection closed by invalid user) (\S+) (?:.*?from )?((?:\d{1,3}\.){3}\d{1,3})"
        ),
    },
    {
        "event_type": "disconnect_after_failed_auth",
        "category": "disconnect_after_failed_auth",
        "method": None,
        "regex": re.compile(
            r"sshd(?:\[\d+\])?: Disconnected from (?:authenticating user \S+ |invalid user \S+ )?((?:\d{1,3}\.){3}\d{1,3}).*(?:preauth|Too many authentication failures)"
        ),
    },
    {
        "event_type": "too_many_auth_failures",
        "category": "failed_login",
        "method": None,
        "regex": re.compile(
            r"sshd(?:\[\d+\])?: .*(?:Too many authentication failures|maximum authentication attempts exceeded).*"
        ),
    },
    {
        "event_type": "sudo_command",
        "category": "sudo_usage",
        "method": None,
        "regex": re.compile(r"sudo(?:\[\d+\])?:\s+(\S+)\s+:\s+TTY=.*COMMAND=(.+)$"),
    },
    {
        "event_type": "sudo_auth_failure",
        "category": "sudo_failure",
        "method": None,
        "regex": re.compile(r"sudo(?:\[\d+\])?: pam_unix\(sudo:auth\): authentication failure;.*"),
    },
    {
        "event_type": "su_session_opened",
        "category": "su_event",
        "method": None,
        "regex": re.compile(r"su(?:\[\d+\])?: pam_unix\(su(?:-l)?:session\): session opened for user (\S+)"),
    },
    {
        "event_type": "su_auth_failure",
        "category": "su_event",
        "method": None,
        "regex": re.compile(r"su(?:\[\d+\])?: pam_unix\(su(?:-l)?:auth\): authentication failure;.*"),
    },
    {
        "event_type": "pam_unix_auth_failure",
        "category": "generic_auth_failure",
        "method": None,
        "regex": re.compile(r"pam_unix\([^)]*:auth\): authentication failure;.*"),
    },
    {
        "event_type": "user_unknown",
        "category": "generic_auth_failure",
        "method": None,
        "regex": re.compile(r"user (?:unknown|not known to the underlying authentication module)"),
    },
    {
        "event_type": "account_locked",
        "category": "generic_auth_failure",
        "method": None,
        "regex": re.compile(r"(?:account locked|authentication failure;.*account locked)", re.IGNORECASE),
    },
]
|
||
|
|
|
||
|
|
# Event categories that feed the per-IP and per-username failure tallies.
FAILED_CATEGORIES = {
    "generic_auth_failure",
    "failed_login",
}

# Finding categories for which sample log lines are reported; the renderers
# walk this list in order.
SAMPLE_CATEGORIES = list(
    (
        "failed_login",
        "invalid_user",
        "root_login_attempt",
        "sudo_failure",
        "suspicious_source_ip",
    )
)
|
||
|
|
|
||
|
|
|
||
|
|
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the auth-log audit tool.

    Returns:
        A parser accepting --file (required), --format, --output, --top,
        --threshold-failed, --ignore-users and --max-samples.
    """
    cli = argparse.ArgumentParser(
        description="Analyze local Linux authentication logs for suspicious patterns."
    )
    # Declared as data so every option reads the same way; the order here is
    # the order shown in --help.
    option_specs = [
        ("--file", dict(required=True, help="Local auth.log or secure file to analyze.")),
        (
            "--format",
            dict(
                choices=("text", "markdown", "json"),
                default="text",
                help="Report format. Default: text.",
            ),
        ),
        ("--output", dict(help="Write report to this path instead of stdout.")),
        (
            "--top",
            dict(
                type=positive_int,
                default=10,
                help="Number of top IPs, usernames, and event types to display. Default: 10.",
            ),
        ),
        (
            "--threshold-failed",
            dict(
                type=positive_int,
                default=5,
                help="Failed attempt threshold for suspicious IPs and usernames. Default: 5.",
            ),
        ),
        (
            "--ignore-users",
            dict(
                default="",
                help="Comma-separated usernames excluded from suspicious username thresholds.",
            ),
        ),
        (
            "--max-samples",
            dict(
                type=non_negative_int,
                default=3,
                help="Maximum sample lines per finding category. Default: 3.",
            ),
        ),
    ]
    for flag, settings in option_specs:
        cli.add_argument(flag, **settings)
    return cli
|
||
|
|
|
||
|
|
|
||
|
|
def positive_int(value: str) -> int:
    """Argparse ``type=`` converter accepting integers strictly above zero.

    Args:
        value: Raw command-line text.

    Returns:
        The parsed integer.

    Raises:
        argparse.ArgumentTypeError: If the text is not an integer or the
            integer is zero or negative.
    """
    complaint = "must be a positive integer"
    try:
        parsed = int(value)
    except ValueError as exc:
        raise argparse.ArgumentTypeError(complaint) from exc
    if parsed > 0:
        return parsed
    raise argparse.ArgumentTypeError(complaint)
|
||
|
|
|
||
|
|
|
||
|
|
def non_negative_int(value: str) -> int:
    """Argparse ``type=`` converter accepting zero and positive integers.

    Args:
        value: Raw command-line text.

    Returns:
        The parsed integer.

    Raises:
        argparse.ArgumentTypeError: If the text is not an integer or the
            integer is negative.
    """
    complaint = "must be zero or a positive integer"
    try:
        parsed = int(value)
    except ValueError as exc:
        raise argparse.ArgumentTypeError(complaint) from exc
    if parsed >= 0:
        return parsed
    raise argparse.ArgumentTypeError(complaint)
|
||
|
|
|
||
|
|
|
||
|
|
def parse_ignore_users(value: str) -> list[str]:
    """Turn a comma-separated option value into a sorted list of unique names.

    Surrounding whitespace is trimmed from each entry and empty entries are
    dropped, so a blank value yields an empty list.

    Args:
        value: Raw text of the --ignore-users option.

    Returns:
        Distinct usernames in ascending order.
    """
    names = {piece.strip() for piece in value.split(",")}
    names.discard("")
    return sorted(names)
|
||
|
|
|
||
|
|
|
||
|
|
def read_log_file(path: Path) -> list[str]:
    """Load a log file and return its lines without line terminators.

    The file is decoded as UTF-8; undecodable bytes are replaced rather than
    raising.

    Args:
        path: Location of the log file.

    Returns:
        The file content split into lines.

    Raises:
        OSError: If the path is missing, is not a regular file, or cannot be
            read.
        ValueError: If the file has no content at all.
    """
    precondition_failure = None
    if not path.exists():
        precondition_failure = f"file does not exist: {path}"
    elif not path.is_file():
        precondition_failure = f"path is not a regular file: {path}"
    if precondition_failure is not None:
        raise OSError(precondition_failure)

    try:
        content = path.read_text(encoding="utf-8", errors="replace")
    except OSError as exc:
        # Permission problems get their own, shorter message.
        if isinstance(exc, PermissionError):
            raise OSError(f"file is not readable: {path}") from exc
        raise OSError(f"unable to read file {path}: {exc}") from exc

    if not content:
        raise ValueError(f"file is empty: {path}")
    return content.splitlines()
|
||
|
|
|
||
|
|
|
||
|
|
def parse_line_timestamp(line: str, syslog_year: int) -> tuple[datetime | None, str]:
    """Extract a timestamp from a log line.

    An ISO-style timestamp found anywhere in the line takes precedence;
    otherwise a syslog-style prefix is tried, combined with ``syslog_year``
    because that format carries no year.

    Args:
        line: One raw log line.
        syslog_year: Year to assume for syslog-style timestamps.

    Returns:
        ``(parsed, rendered)`` where ``rendered`` is the timestamp text taken
        from the line, or ``(None, UNKNOWN)`` when nothing parseable is found.
    """
    unparsed = (None, UNKNOWN)

    iso_hit = ISO_TIMESTAMP_RE.search(line)
    if iso_hit is not None:
        rendered = " ".join(iso_hit.group(1, 2))
        try:
            moment = datetime.strptime(rendered, "%Y-%m-%d %H:%M:%S")
        except ValueError:
            # An ISO-looking but invalid stamp is not retried as syslog.
            return unparsed
        return moment, rendered

    syslog_hit = SYSLOG_TIMESTAMP_RE.search(line)
    if syslog_hit is None:
        return unparsed

    rendered = syslog_hit.group(1)
    try:
        moment = datetime.strptime(f"{syslog_year} {rendered}", "%Y %b %d %H:%M:%S")
    except ValueError:
        return unparsed
    return moment, rendered
|
||
|
|
|
||
|
|
|
||
|
|
def render_seen(value: tuple[datetime, str] | None) -> str:
    """Format a first/last-seen marker for display.

    Args:
        value: ``(parsed, rendered)`` pair, or None when nothing was seen.

    Returns:
        The rendered text when it is non-empty, otherwise the parsed datetime
        formatted as ``YYYY-MM-DD HH:MM:SS``; ``UNKNOWN`` for None.
    """
    if value is None:
        return UNKNOWN
    rendered = value[1]
    if rendered:
        return rendered
    return value[0].strftime("%Y-%m-%d %H:%M:%S")
|
||
|
|
|
||
|
|
|
||
|
|
def extract_service(line: str) -> str:
    """Return the first program name matched by SERVICE_RE, or UNKNOWN."""
    hit = SERVICE_RE.search(line)
    return hit.group(1) if hit else UNKNOWN
|
||
|
|
|
||
|
|
|
||
|
|
def extract_ip(line: str) -> str:
    """Return the first IPv4-looking address in the line, or UNKNOWN."""
    hit = IP_RE.search(line)
    return hit.group(0) if hit else UNKNOWN
|
||
|
|
|
||
|
|
|
||
|
|
def extract_user_from_key_values(line: str) -> str:
    """Pull a username out of ``key=value`` fields in a log line.

    The keys are tried in a fixed priority order -- ``user=``, ``ruser=``,
    then ``USER=`` -- and the first key with a non-empty value wins.

    Args:
        line: One raw log line.

    Returns:
        The username, or UNKNOWN when none of the keys carries a value.
    """
    key_patterns = (
        r"\buser=([A-Za-z0-9_.@-]+)",
        r"\bruser=([A-Za-z0-9_.@-]+)",
        r"\bUSER=([A-Za-z0-9_.@-]+)",
    )
    for key_pattern in key_patterns:
        hit = re.search(key_pattern, line)
        if not hit or not hit.group(1):
            continue
        return hit.group(1)
    return UNKNOWN
|
||
|
|
|
||
|
|
|
||
|
|
def event_from_match(line: str, pattern: dict[str, Any], match: re.Match[str]) -> dict[str, Any]:
    """Build an event record from a pattern that matched a log line.

    Capture groups are decoded according to the pattern's ``event_type``.
    SSH events whose username is ``root`` are relabelled
    ``root_login_attempt``; their ``category`` is left as the pattern's.

    Args:
        line: The raw log line.
        pattern: The EVENT_PATTERNS entry that matched.
        match: The match produced by ``pattern["regex"]``.

    Returns:
        A dict with ``event_type``, ``category``, ``username``,
        ``source_ip``, ``method``, ``service`` and ``raw``.  Fields that
        could not be determined hold UNKNOWN.
    """
    event_type = pattern["event_type"]
    username = UNKNOWN
    # Fallback address: first IPv4-looking token anywhere in the line.
    source_ip = extract_ip(line)
    method = pattern["method"] or UNKNOWN

    if event_type in ("failed_ssh_password", "failed_ssh_publickey"):
        # Group 1 is the optional "invalid user" marker and is not used here.
        username = match.group(2)
        source_ip = match.group(3)
    elif event_type == "successful_ssh_login":
        method = match.group(1)
        username = match.group(2)
        source_ip = match.group(3)
    elif event_type in ("invalid_user_attempt", "refused_user_attempt"):
        username = match.group(1)
        source_ip = match.group(2)
    elif event_type == "sudo_command":
        username = match.group(1)
    elif event_type == "su_session_opened":
        # The captured token may carry a uid suffix, e.g. "root(uid=0)".
        # Cut at the first "(" so only the name remains; stripping just the
        # trailing ")" left "root(uid=0" behind.
        username = match.group(1).split("(", 1)[0].rstrip(")")
    elif event_type in ("sudo_auth_failure", "su_auth_failure", "pam_unix_auth_failure"):
        username = extract_user_from_key_values(line)

    if username == "root" and event_type in (
        "failed_ssh_password",
        "failed_ssh_publickey",
        "successful_ssh_login",
        "invalid_user_attempt",
        "refused_user_attempt",
    ):
        event_type = "root_login_attempt"

    return {
        "event_type": event_type,
        "category": pattern["category"],
        "username": username or UNKNOWN,
        "source_ip": source_ip or UNKNOWN,
        "method": method,
        "service": extract_service(line),
        "raw": line,
    }
|
||
|
|
|
||
|
|
|
||
|
|
def detect_events(line: str) -> list[dict[str, Any]]:
    """Run every EVENT_PATTERNS recogniser over one log line.

    A line may yield several events.  The generic ``pam_unix_auth_failure``
    is dropped when a sudo- or su-specific failure was recognised on the same
    line, and a catch-all ``authentication_failure`` event is emitted when
    the line mentions "authentication failure" but nothing matched.

    Args:
        line: One raw log line.

    Returns:
        De-duplicated event records; empty when the line is not of interest.
    """
    found = []
    for spec in EVENT_PATTERNS:
        hit = spec["regex"].search(line)
        if hit is None:
            continue
        found.append(event_from_match(line, spec, hit))

    specific_failures = {"sudo_auth_failure", "su_auth_failure"}
    if not specific_failures.isdisjoint(item["event_type"] for item in found):
        found = [item for item in found if item["event_type"] != "pam_unix_auth_failure"]

    if not found and "authentication failure" in line:
        fallback = {
            "event_type": "authentication_failure",
            "category": "generic_auth_failure",
            "username": extract_user_from_key_values(line),
            "source_ip": extract_ip(line),
            "method": UNKNOWN,
            "service": extract_service(line),
            "raw": line,
        }
        found.append(fallback)

    return dedupe_events(found)
|
||
|
|
|
||
|
|
|
||
|
|
def dedupe_events(events: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Drop repeated events, keeping the first occurrence of each.

    Two events are the same when their ``event_type``, ``username``,
    ``source_ip`` and ``raw`` values all agree.

    Args:
        events: Event records in detection order.

    Returns:
        A new list with duplicates removed and the original order kept.
    """
    first_by_identity: dict[tuple, dict[str, Any]] = {}
    for event in events:
        identity = tuple(event[field] for field in ("event_type", "username", "source_ip", "raw"))
        # setdefault keeps the earliest event; dicts preserve insertion order.
        first_by_identity.setdefault(identity, event)
    return list(first_by_identity.values())
|
||
|
|
|
||
|
|
|
||
|
|
def append_sample(samples: dict[str, list[str]], category: str, line: str, max_samples: int) -> None:
    """Store a sample line for a category unless its quota is already used.

    Args:
        samples: Mapping of category to retained lines; mutated in place and
            indexed directly, so the category must exist or the mapping must
            supply defaults.
        category: Bucket to add the line to.
        line: Raw log line to keep.
        max_samples: Per-category cap; zero disables sampling and leaves
            ``samples`` untouched.
    """
    if max_samples != 0:
        bucket = samples[category]
        if len(bucket) < max_samples:
            bucket.append(line)
|
||
|
|
|
||
|
|
|
||
|
|
def update_seen(
    first_seen: tuple[datetime, str] | None,
    last_seen: tuple[datetime, str] | None,
    parsed_at: datetime | None,
    rendered_at: str,
) -> tuple[tuple[datetime, str] | None, tuple[datetime, str] | None]:
    """Widen the first/last-seen window to include a new timestamp.

    Args:
        first_seen: Earliest ``(parsed, rendered)`` pair so far, or None.
        last_seen: Latest ``(parsed, rendered)`` pair so far, or None.
        parsed_at: Timestamp of the current line; None leaves the window
            unchanged.
        rendered_at: Display text that accompanies ``parsed_at``.

    Returns:
        The updated ``(first_seen, last_seen)`` pair.
    """
    if parsed_at is not None:
        candidate = (parsed_at, rendered_at)
        # Strict comparisons: an equal timestamp never replaces the marker.
        if first_seen is None or parsed_at < first_seen[0]:
            first_seen = candidate
        if last_seen is None or parsed_at > last_seen[0]:
            last_seen = candidate
    return first_seen, last_seen
|
||
|
|
|
||
|
|
|
||
|
|
def analyze_log(
    lines: list[str],
    threshold_failed: int,
    ignore_users: list[str],
    top: int,
    max_samples: int,
) -> dict[str, Any]:
    """Scan log lines and assemble the full audit report.

    Args:
        lines: Raw log lines.
        threshold_failed: Failure count at which an IP or username is
            flagged as suspicious.
        ignore_users: Usernames never flagged as suspicious (they are still
            tallied).
        top: Maximum number of rows in each "top" table.
        max_samples: Maximum sample lines retained per finding category.

    Returns:
        A dict with ``summary``, the ``top_*`` tables,
        ``suspicious_source_ips``, ``suspicious_usernames`` and ``samples``
        (one entry per SAMPLE_CATEGORIES name).
    """
    # Syslog-style timestamps carry no year; the current year is assumed.
    syslog_year = datetime.now().year
    ignored = set(ignore_users)
    events = []
    samples: dict[str, list[str]] = defaultdict(list)
    event_type_counts: Counter[str] = Counter()
    failed_by_ip: Counter[str] = Counter()
    failed_by_user: Counter[str] = Counter()
    success_by_ip: Counter[str] = Counter()
    success_by_user: Counter[str] = Counter()
    first_seen: tuple[datetime, str] | None = None
    last_seen: tuple[datetime, str] | None = None

    for line in lines:
        parsed_at, rendered_at = parse_line_timestamp(line, syslog_year)
        line_events = detect_events(line)
        if not line_events:
            continue

        # Only lines that produced events widen the first/last-seen window.
        first_seen, last_seen = update_seen(first_seen, last_seen, parsed_at, rendered_at)
        for event in line_events:
            event["timestamp"] = rendered_at
            events.append(event)
            event_type_counts[event["event_type"]] += 1

            category = event["category"]
            username = event["username"]
            source_ip = event["source_ip"]

            if event["event_type"] == "root_login_attempt":
                append_sample(samples, "root_login_attempt", line, max_samples)
                # Root attempts count as failures -- except an accepted root
                # login, which keeps its successful_login category so it is
                # tallied as a success instead of inflating the failure
                # counters.
                if category != "successful_login":
                    category = "failed_login"

            if category in FAILED_CATEGORIES:
                if source_ip != UNKNOWN:
                    failed_by_ip[source_ip] += 1
                if username != UNKNOWN:
                    failed_by_user[username] += 1
                append_sample(samples, "failed_login", line, max_samples)

            if category == "successful_login":
                if source_ip != UNKNOWN:
                    success_by_ip[source_ip] += 1
                if username != UNKNOWN:
                    success_by_user[username] += 1

            if category == "invalid_user":
                append_sample(samples, "invalid_user", line, max_samples)
            if category == "sudo_failure":
                append_sample(samples, "sudo_failure", line, max_samples)

    suspicious_ips = {
        ip: count for ip, count in failed_by_ip.items() if count >= threshold_failed
    }
    suspicious_users = {
        user: count
        for user, count in failed_by_user.items()
        if count >= threshold_failed and user not in ignored
    }

    # Second pass: the suspicious set is only known once every line is tallied.
    for event in events:
        if event["source_ip"] in suspicious_ips:
            append_sample(samples, "suspicious_source_ip", event["raw"], max_samples)

    summary = build_summary(
        lines=lines,
        events=events,
        failed_by_ip=failed_by_ip,
        failed_by_user=failed_by_user,
        suspicious_ips=suspicious_ips,
        suspicious_users=suspicious_users,
        event_type_counts=event_type_counts,
        threshold_failed=threshold_failed,
        ignore_users=ignore_users,
        first_seen=first_seen,
        last_seen=last_seen,
    )

    return {
        "summary": summary,
        "top_source_ips_by_failed_attempts": top_items(failed_by_ip, top),
        "top_usernames_by_failed_attempts": top_items(failed_by_user, top),
        "top_source_ips_by_successful_logins": top_items(success_by_ip, top),
        "top_usernames_by_successful_logins": top_items(success_by_user, top),
        "top_event_types": top_items(event_type_counts, top),
        "suspicious_source_ips": sorted_count_items(suspicious_ips),
        "suspicious_usernames": sorted_count_items(suspicious_users),
        "samples": {category: samples.get(category, []) for category in SAMPLE_CATEGORIES},
    }
|
||
|
|
|
||
|
|
|
||
|
|
def build_summary(
    lines: list[str],
    events: list[dict[str, Any]],
    failed_by_ip: Counter[str],
    failed_by_user: Counter[str],
    suspicious_ips: dict[str, int],
    suspicious_users: dict[str, int],
    event_type_counts: Counter[str],
    threshold_failed: int,
    ignore_users: list[str],
    first_seen: tuple[datetime, str] | None,
    last_seen: tuple[datetime, str] | None,
) -> dict[str, Any]:
    """Derive the overall status and headline counters for the report.

    Status rules:
      * CRITICAL -- root attempts reach the threshold, any suspicious IP or
        username reaches twice the threshold, or at least two IPs are
        suspicious.
      * WARNING -- any suspicious IP/username, invalid-user attempt, sudo
        authentication failure or root attempt.
      * OK -- none of the above.

    Args:
        lines: All scanned log lines (only their number is used).
        events: Every detected event record.
        failed_by_ip: Failure tally per source IP.
        failed_by_user: Failure tally per username (accepted for interface
            symmetry; not read here).
        suspicious_ips: IPs at or above the threshold, with their counts.
        suspicious_users: Usernames at or above the threshold, with counts.
        event_type_counts: Tally of events per event type.
        threshold_failed: The failure threshold in effect.
        ignore_users: Usernames excluded from the suspicious-username list.
        first_seen: Earliest event timestamp pair, or None.
        last_seen: Latest event timestamp pair, or None.

    Returns:
        The summary mapping embedded in the report.
    """
    root_attempts = event_type_counts["root_login_attempt"]
    sudo_failures = event_type_counts["sudo_auth_failure"]
    invalid_users = event_type_counts["invalid_user_attempt"]
    escalation_level = threshold_failed * 2
    high_volume_ips = sum(1 for count in suspicious_ips.values() if count >= escalation_level)
    high_volume_users = sum(1 for count in suspicious_users.values() if count >= escalation_level)

    # Count by category, not event type: an accepted root login is relabelled
    # "root_login_attempt" but is still a successful login, and counting the
    # "successful_ssh_login" type alone left it out.
    successful_logins = sum(1 for event in events if event["category"] == "successful_login")

    if (
        root_attempts >= threshold_failed
        or high_volume_ips > 0
        or high_volume_users > 0
        or len(suspicious_ips) >= 2
    ):
        status = "CRITICAL"
    elif suspicious_ips or suspicious_users or invalid_users > 0 or sudo_failures > 0 or root_attempts > 0:
        status = "WARNING"
    else:
        status = "OK"

    return {
        "overall_status": status,
        "first_seen": render_seen(first_seen),
        "last_seen": render_seen(last_seen),
        "total_lines_scanned": len(lines),
        "authentication_events_detected": len(events),
        # Derived from the per-IP tally, so failures without a source
        # address are not included in this figure.
        "failed_login_count": sum(failed_by_ip.values()),
        "successful_login_count": successful_logins,
        "invalid_user_count": invalid_users,
        "root_login_attempt_count": root_attempts,
        "sudo_command_count": event_type_counts["sudo_command"],
        "sudo_failure_count": sudo_failures,
        "su_event_count": event_type_counts["su_session_opened"] + event_type_counts["su_auth_failure"],
        "suspicious_source_ip_count": len(suspicious_ips),
        "suspicious_username_count": len(suspicious_users),
        "threshold_failed": threshold_failed,
        "ignored_users": ignore_users,
    }
|
||
|
|
|
||
|
|
|
||
|
|
def top_items(counter: Counter[str], limit: int) -> list[dict[str, Any]]:
    """Return the ``limit`` most common entries as ``value``/``count`` rows."""
    rows = []
    for entry, occurrences in counter.most_common(limit):
        rows.append({"value": entry, "count": occurrences})
    return rows
|
||
|
|
|
||
|
|
|
||
|
|
def sorted_count_items(items: dict[str, int]) -> list[dict[str, Any]]:
    """Convert a count mapping into rows ordered by count, then by name.

    Args:
        items: Mapping of value to count.

    Returns:
        ``value``/``count`` rows, highest count first; ties are broken by
        ascending value.
    """

    def rank(pair: tuple[str, int]) -> tuple[int, str]:
        name, total = pair
        return (-total, name)

    ordered = sorted(items.items(), key=rank)
    return [{"value": name, "count": total} for name, total in ordered]
|
||
|
|
|
||
|
|
|
||
|
|
def render_text(report: dict[str, Any]) -> str:
    """Render the report as plain text.

    Args:
        report: The mapping produced by analyze_log().

    Returns:
        The complete text report, ending with a newline.
    """
    summary = report["summary"]
    out = [
        "Auth Log Audit",
        "==============",
        "",
        f"Overall status: {summary['overall_status']}",
        f"First seen: {summary['first_seen']}",
        f"Last seen: {summary['last_seen']}",
        "",
    ]

    # (heading, report key) for each table, in display order.
    tables = (
        ("Top Source IPs by Failed Attempts", "top_source_ips_by_failed_attempts"),
        ("Top Usernames by Failed Attempts", "top_usernames_by_failed_attempts"),
        ("Top Source IPs by Successful Logins", "top_source_ips_by_successful_logins"),
        ("Top Usernames by Successful Logins", "top_usernames_by_successful_logins"),
        ("Suspicious Source IPs", "suspicious_source_ips"),
        ("Suspicious Usernames", "suspicious_usernames"),
        ("Top Event Types", "top_event_types"),
    )
    for heading, key in tables:
        out.extend(render_text_table(heading, report[key]))

    out.extend(render_text_samples(report["samples"]))
    out.extend(render_text_summary(summary))
    return "\n".join(out) + "\n"
|
||
|
|
|
||
|
|
|
||
|
|
def render_text_table(title: str, rows: list[dict[str, Any]]) -> list[str]:
    """Render one titled bullet list for the text report.

    Args:
        title: Section heading; underlined with dashes of the same length.
        rows: ``value``/``count`` rows to list.

    Returns:
        Output lines: heading, underline, one bullet per row (or a
        placeholder when there are none) and a trailing blank line.
    """
    # NOTE(review): source indentation was lost; the trailing blank line is
    # assumed to follow both the empty and the non-empty case -- confirm.
    bullets = [f"- {row['value']}: {row['count']}" for row in rows]
    if not bullets:
        bullets = ["No entries detected."]
    return [title, "-" * len(title), *bullets, ""]
|
||
|
|
|
||
|
|
|
||
|
|
def render_text_samples(samples: dict[str, list[str]]) -> list[str]:
    """Render the retained sample lines for the text report.

    Args:
        samples: Mapping of finding category to retained raw lines.

    Returns:
        Output lines: a heading, then each SAMPLE_CATEGORIES name followed by
        its samples or a placeholder, and a closing blank line.
    """
    out = ["Sample Log Lines", "----------------"]
    for category in SAMPLE_CATEGORIES:
        out.append(f"{category}:")
        retained = samples.get(category)
        if retained:
            out += [f" - {sample}" for sample in retained]
        else:
            out.append(" - No samples retained")
    # NOTE(review): source indentation was lost; a single blank line after
    # the whole section (not one per category) is assumed -- confirm.
    out.append("")
    return out
|
||
|
|
|
||
|
|
|
||
|
|
def render_text_summary(summary: dict[str, Any]) -> list[str]:
    """Render the operational-summary section of the text report.

    Args:
        summary: The summary mapping produced by build_summary().

    Returns:
        Output lines: heading, underline and one ``label: value`` line per
        counter, ending with the ignored-users line.
    """
    ignored_names = summary["ignored_users"]
    ignored = ", ".join(ignored_names) if ignored_names else "None"

    # (label, summary key) in display order.
    labelled = (
        ("Overall status", "overall_status"),
        ("Total lines scanned", "total_lines_scanned"),
        ("Authentication events detected", "authentication_events_detected"),
        ("Failed logins", "failed_login_count"),
        ("Successful logins", "successful_login_count"),
        ("Invalid user attempts", "invalid_user_count"),
        ("Root login attempts", "root_login_attempt_count"),
        ("Sudo usage events", "sudo_command_count"),
        ("Sudo authentication failures", "sudo_failure_count"),
        ("su events", "su_event_count"),
        ("Suspicious source IPs", "suspicious_source_ip_count"),
        ("Suspicious usernames", "suspicious_username_count"),
        ("Threshold used", "threshold_failed"),
    )
    out = ["Operational Summary", "-------------------"]
    out.extend(f"{label}: {summary[key]}" for label, key in labelled)
    out.append(f"Ignored users: {ignored}")
    return out
|
||
|
|
|
||
|
|
|
||
|
|
def render_markdown(report: dict[str, Any]) -> str:
    """Render the report as Markdown.

    Args:
        report: The mapping produced by analyze_log().

    Returns:
        The complete Markdown document.  The final list element is an empty
        string, so the joined text ends with a newline.
    """
    summary = report["summary"]
    out = [
        "# Auth Log Audit",
        "",
        f"- Overall status: {summary['overall_status']}",
        f"- First seen: {summary['first_seen']}",
        f"- Last seen: {summary['last_seen']}",
        "",
    ]

    # (heading, report key) for each table, in display order.
    tables = (
        ("Top Source IPs by Failed Attempts", "top_source_ips_by_failed_attempts"),
        ("Top Usernames by Failed Attempts", "top_usernames_by_failed_attempts"),
        ("Top Source IPs by Successful Logins", "top_source_ips_by_successful_logins"),
        ("Top Usernames by Successful Logins", "top_usernames_by_successful_logins"),
        ("Suspicious Source IPs", "suspicious_source_ips"),
        ("Suspicious Usernames", "suspicious_usernames"),
        ("Top Event Types", "top_event_types"),
    )
    for heading, key in tables:
        out.extend(render_markdown_table(heading, report[key]))
    out.extend(render_markdown_samples(report["samples"]))

    ignored_names = summary["ignored_users"]
    ignored = ", ".join(ignored_names) if ignored_names else "None"

    # (label, summary key) in display order.
    labelled = (
        ("Overall status", "overall_status"),
        ("Total lines scanned", "total_lines_scanned"),
        ("Authentication events detected", "authentication_events_detected"),
        ("Failed logins", "failed_login_count"),
        ("Successful logins", "successful_login_count"),
        ("Invalid user attempts", "invalid_user_count"),
        ("Root login attempts", "root_login_attempt_count"),
        ("Sudo usage events", "sudo_command_count"),
        ("Sudo authentication failures", "sudo_failure_count"),
        ("su events", "su_event_count"),
        ("Suspicious source IPs", "suspicious_source_ip_count"),
        ("Suspicious usernames", "suspicious_username_count"),
        ("Threshold used", "threshold_failed"),
    )
    out.append("## Operational Summary")
    out.append("")
    out.extend(f"- {label}: {summary[key]}" for label, key in labelled)
    out.append(f"- Ignored users: {ignored}")
    out.append("")
    return "\n".join(out)
|
||
|
|
|
||
|
|
|
||
|
|
def render_markdown_table(title: str, rows: list[dict[str, Any]]) -> list[str]:
    """Render one titled two-column table for the Markdown report.

    Values originate from log content, which a remote party can influence
    (for example the username of a failed login).  A literal ``|`` inside a
    value would otherwise end the cell early and corrupt the table, so pipes
    in values are backslash-escaped.

    Args:
        title: Section heading.
        rows: ``value``/``count`` rows to tabulate.

    Returns:
        Output lines: heading, blank line, then either a placeholder or the
        table, followed by a blank line.
    """
    lines = [f"## {title}", ""]
    if not rows:
        lines.extend(["No entries detected.", ""])
        return lines

    def cell(value: Any) -> str:
        # Escape the column separator so the value stays in one cell.
        return str(value).replace("|", "\\|")

    lines.extend(["| Value | Count |", "| --- | ---: |"])
    lines.extend(f"| {cell(item['value'])} | {item['count']} |" for item in rows)
    lines.append("")
    return lines
|
||
|
|
|
||
|
|
|
||
|
|
def render_markdown_samples(samples: dict[str, list[str]]) -> list[str]:
    """Render the retained sample lines for the Markdown report.

    Args:
        samples: Mapping of finding category to retained raw lines.

    Returns:
        Output lines: a section heading, then one sub-section per
        SAMPLE_CATEGORIES name holding a fenced block of samples or a
        placeholder.
    """
    out = ["## Sample Log Lines", ""]
    for category in SAMPLE_CATEGORIES:
        out += [f"### {category}", ""]
        retained = samples.get(category)
        if retained:
            out += ["```text", *retained, "```"]
        else:
            out.append("_No samples retained._")
        # NOTE(review): source indentation was lost; a blank line after each
        # category sub-section is assumed -- confirm.
        out.append("")
    return out
|
||
|
|
|
||
|
|
|
||
|
|
def render_json(report: dict[str, Any]) -> str:
    """Serialise the report as indented, key-sorted JSON plus a final newline."""
    encoded = json.dumps(report, sort_keys=True, indent=2)
    return f"{encoded}\n"
|
||
|
|
|
||
|
|
|
||
|
|
def write_report(input_path: Path, output_path: str | None, content: str) -> None:
    """Emit the rendered report to stdout or to a file.

    Args:
        input_path: The analysed log file; writing the report over it is
            refused.
        output_path: Destination path, or None to write to stdout.
        content: The rendered report text.

    Raises:
        OSError: If the destination resolves to the input file or cannot be
            written.  Both cases are reported with the same
            "unable to write output" prefix.
    """
    if output_path is not None:
        target = Path(output_path)
        try:
            clobbers_input = target.resolve() == input_path.resolve()
            if clobbers_input:
                # Raised inside the try on purpose: the handler below wraps it.
                raise OSError("output path must not be the same as input file")
            target.write_text(content, encoding="utf-8")
        except OSError as exc:
            raise OSError(f"unable to write output {target}: {exc}") from exc
        return

    sys.stdout.write(content)
|
||
|
|
|
||
|
|
|
||
|
|
def main() -> int:
    """Run the audit from the command line.

    Returns:
        EXIT_OK when the overall status is "OK", EXIT_FINDINGS for any other
        status, and EXIT_INVALID when reading, analysing or writing fails.
    """
    args = build_parser().parse_args()
    input_path = Path(args.file)
    ignore_users = parse_ignore_users(args.ignore_users)

    # "json" is the only remaining choice, so it serves as the fallback.
    renderers = {"text": render_text, "markdown": render_markdown}

    try:
        report = analyze_log(
            lines=read_log_file(input_path),
            threshold_failed=args.threshold_failed,
            ignore_users=ignore_users,
            top=args.top,
            max_samples=args.max_samples,
        )
        render = renderers.get(args.format, render_json)
        write_report(input_path, args.output, render(report))
    except (OSError, ValueError) as exc:
        print(f"CRITICAL: {exc}", file=sys.stderr)
        return EXIT_INVALID
    except RuntimeError as exc:
        print(f"CRITICAL: runtime error: {exc}", file=sys.stderr)
        return EXIT_INVALID

    clean = report["summary"]["overall_status"] == "OK"
    return EXIT_OK if clean else EXIT_FINDINGS
|
||
|
|
|
||
|
|
|
||
|
|
# Script entry point: the return value of main() becomes the process exit code.
if __name__ == "__main__":
    raise SystemExit(main())
|