""" Services Data Collector Collects system service information including running services, their states, startup configuration, and dependencies. """ import json import logging import subprocess from typing import Dict, Any, List logger = logging.getLogger(__name__) class ServicesCollector: """Collector for system service information.""" def __init__(self): self.service_manager = "systemd" # Default to systemd self.include_disabled = False def collect_services(self, system: str) -> Dict[str, Any]: """Collect service information from target system.""" logger.info(f"Collecting services data from {system}") try: # Detect service manager service_manager = self.detect_service_manager(system) if service_manager == "systemd": services = self.collect_systemd_services(system) elif service_manager == "sysv": services = self.collect_sysv_services(system) else: raise RuntimeError(f"Unsupported service manager: {service_manager}") return { "service_manager": service_manager, "services": services, "timestamp": self.get_timestamp(system) } except Exception as e: logger.error(f"Failed to collect services from {system}: {e}") raise def detect_service_manager(self, system: str) -> str: """Detect which service manager is running on the system.""" try: # Check for systemd result = subprocess.run( ["ssh", system, "ps -p 1 -o comm="], capture_output=True, text=True, timeout=10 ) if result.returncode == 0: if "systemd" in result.stdout.strip(): return "systemd" elif "init" in result.stdout.strip(): return "sysv" # Fallback check result = subprocess.run( ["ssh", system, "which systemctl"], capture_output=True, text=True, timeout=10 ) if result.returncode == 0: return "systemd" return "sysv" except Exception: return "unknown" def collect_systemd_services(self, system: str) -> List[Dict[str, Any]]: """Collect systemd service information.""" services = [] try: # Get all services result = subprocess.run( ["ssh", system, "systemctl list-units --type=service --all --no-pager --no-legend"], capture_output=True, text=True, timeout=30 ) if result.returncode != 0: raise RuntimeError(f"systemctl list-units failed: {result.stderr}") # Parse service list for line in result.stdout.strip().split('\n'): if not line.strip(): continue parts = line.split() if len(parts) >= 4: service_name = parts[0] load_state = parts[1] active_state = parts[2] sub_state = parts[3] # Skip if disabled and not including disabled if not self.include_disabled and load_state == "not-found": continue # Get detailed service info service_info = self.get_systemd_service_details(system, service_name) services.append({ "name": service_name, "load_state": load_state, "active_state": active_state, "sub_state": sub_state, **service_info }) except subprocess.TimeoutExpired: logger.error(f"Timeout collecting systemd services from {system}") raise except Exception as e: logger.error(f"Failed to collect systemd services from {system}: {e}") raise return services def get_systemd_service_details(self, system: str, service_name: str) -> Dict[str, Any]: """Get detailed information for a systemd service.""" details = {} try: # Get service status result = subprocess.run( ["ssh", system, f"systemctl show {service_name} --no-pager"], capture_output=True, text=True, timeout=15 ) if result.returncode == 0: for line in result.stdout.strip().split('\n'): if '=' in line: key, value = line.split('=', 1) details[key.lower()] = value except Exception as e: logger.warning(f"Failed to get details for {service_name}: {e}") return details def collect_sysv_services(self, system: str) -> List[Dict[str, Any]]: """Collect SysV init service information.""" services = [] try: # Get service list from /etc/init.d/ result = subprocess.run( ["ssh", system, "ls -1 /etc/init.d/"], capture_output=True, text=True, timeout=15 ) if result.returncode != 0: raise RuntimeError(f"Failed to list init.d services: {result.stderr}") for service_name in result.stdout.strip().split('\n'): if not service_name.strip(): continue # Get service status status_result = subprocess.run( ["ssh", system, f"/etc/init.d/{service_name} status"], capture_output=True, text=True, timeout=10 ) status = "unknown" if status_result.returncode == 0: status = "running" elif "not running" in status_result.stdout.lower(): status = "stopped" services.append({ "name": service_name, "status": status, "type": "sysv" }) except Exception as e: logger.error(f"Failed to collect SysV services from {system}: {e}") raise return services def get_timestamp(self, system: str) -> str: """Get current timestamp from target system.""" try: result = subprocess.run( ["ssh", system, "date -Iseconds"], capture_output=True, text=True, timeout=10 ) if result.returncode == 0: return result.stdout.strip() else: return "unknown" except Exception: return "unknown" def collect(system: str) -> Dict[str, Any]: """Main collection function for services data.""" collector = ServicesCollector() return collector.collect_services(system)