use anyhow::Result; use async_trait::async_trait; use cm_dashboard_shared::{Metric, MetricValue, Status, StatusTracker}; use std::process::Command; use std::sync::RwLock; use std::time::Instant; use tracing::debug; use super::{Collector, CollectorError}; use crate::config::SystemdConfig; /// Systemd collector for monitoring systemd services pub struct SystemdCollector { /// Cached state with thread-safe interior mutability state: RwLock, /// Configuration for service monitoring config: SystemdConfig, } /// Internal state for service caching #[derive(Debug)] struct ServiceCacheState { /// Interesting services to monitor (cached after discovery) monitored_services: Vec, /// Last time services were discovered last_discovery_time: Option, /// How often to rediscover services (5 minutes) discovery_interval_seconds: u64, /// Cached nginx site latency metrics nginx_site_metrics: Vec, /// Last time nginx sites were checked last_nginx_check_time: Option, /// How often to check nginx site latency (30 seconds) nginx_check_interval_seconds: u64, } impl SystemdCollector { pub fn new(config: SystemdConfig) -> Self { Self { state: RwLock::new(ServiceCacheState { monitored_services: Vec::new(), last_discovery_time: None, discovery_interval_seconds: 300, // 5 minutes nginx_site_metrics: Vec::new(), last_nginx_check_time: None, nginx_check_interval_seconds: 30, // 30 seconds for nginx sites }), config, } } /// Get monitored services, discovering them if needed or cache is expired fn get_monitored_services(&self) -> Result> { let mut state = self.state.write().unwrap(); // Check if we need to discover services let needs_discovery = match state.last_discovery_time { None => true, // First time Some(last_time) => { let elapsed = last_time.elapsed().as_secs(); elapsed >= state.discovery_interval_seconds } }; if needs_discovery { debug!("Discovering systemd services (cache expired or first run)"); match self.discover_services() { Ok(services) => { state.monitored_services = services; state.last_discovery_time = Some(Instant::now()); debug!( "Auto-discovered {} services to monitor: {:?}", state.monitored_services.len(), state.monitored_services ); } Err(e) => { debug!("Failed to discover services, using cached list: {}", e); // Continue with existing cached services if discovery fails } } } Ok(state.monitored_services.clone()) } /// Get nginx site metrics, checking them if cache is expired fn get_nginx_site_metrics(&self) -> Vec { let mut state = self.state.write().unwrap(); // Check if we need to refresh nginx site metrics let needs_refresh = match state.last_nginx_check_time { None => true, // First time Some(last_time) => { let elapsed = last_time.elapsed().as_secs(); elapsed >= state.nginx_check_interval_seconds } }; if needs_refresh { // Only check nginx sites if nginx service is active if state.monitored_services.iter().any(|s| s.contains("nginx")) { debug!( "Refreshing nginx site latency metrics (interval: {}s)", state.nginx_check_interval_seconds ); let fresh_metrics = self.get_nginx_sites(); state.nginx_site_metrics = fresh_metrics; state.last_nginx_check_time = Some(Instant::now()); } } state.nginx_site_metrics.clone() } /// Auto-discover interesting services to monitor fn discover_services(&self) -> Result> { // First get all unit files (includes inactive services) let unit_files_output = Command::new("systemctl") .arg("list-unit-files") .arg("--type=service") .arg("--no-pager") .arg("--plain") .output()?; // Then get all loaded units (includes running/failed services) let units_output = Command::new("systemctl") .arg("list-units") .arg("--type=service") .arg("--all") .arg("--no-pager") .arg("--plain") .output()?; // Use configured user mapping instead of hardcoded hostname logic let target_user = &self.config.host_user_mapping; // Also get user unit files (user-level services) for target user let user_unit_files_output = Command::new("sudo") .arg("-u") .arg(target_user) .arg("systemctl") .arg("--user") .arg("list-unit-files") .arg("--type=service") .arg("--no-pager") .arg("--plain") .output()?; // And user loaded units for target user let user_units_output = Command::new("sudo") .arg("-u") .arg(target_user) .arg("systemctl") .arg("--user") .arg("list-units") .arg("--type=service") .arg("--all") .arg("--no-pager") .arg("--plain") .output()?; if !unit_files_output.status.success() || !units_output.status.success() { return Err(anyhow::anyhow!("systemctl system command failed")); } // User commands might fail if no user session, so check individually let user_unit_files_success = user_unit_files_output.status.success(); let user_units_success = user_units_output.status.success(); let unit_files_str = String::from_utf8(unit_files_output.stdout)?; let units_str = String::from_utf8(units_output.stdout)?; let user_unit_files_str = if user_unit_files_success { String::from_utf8(user_unit_files_output.stdout).ok() } else { None }; let user_units_str = if user_units_success { String::from_utf8(user_units_output.stdout).ok() } else { None }; let mut services = Vec::new(); // Use configuration instead of hardcoded values let excluded_services = &self.config.excluded_services; let service_name_filters = &self.config.service_name_filters; // Parse both unit files and loaded units let mut all_service_names = std::collections::HashSet::new(); // Parse unit files (includes inactive services) for line in unit_files_str.lines() { let fields: Vec<&str> = line.split_whitespace().collect(); if fields.len() >= 2 && fields[0].ends_with(".service") { let service_name = fields[0].trim_end_matches(".service"); all_service_names.insert(service_name.to_string()); } } // Parse loaded units (includes running/failed services) for line in units_str.lines() { let fields: Vec<&str> = line.split_whitespace().collect(); if fields.len() >= 4 && fields[0].ends_with(".service") { let service_name = fields[0].trim_end_matches(".service"); all_service_names.insert(service_name.to_string()); } } // Parse user unit files if available if let Some(user_unit_files_str) = &user_unit_files_str { for line in user_unit_files_str.lines() { let fields: Vec<&str> = line.split_whitespace().collect(); if fields.len() >= 2 && fields[0].ends_with(".service") { let service_name = fields[0].trim_end_matches(".service"); all_service_names.insert(service_name.to_string()); } } } // Parse user loaded units if available if let Some(user_units_str) = &user_units_str { for line in user_units_str.lines() { let fields: Vec<&str> = line.split_whitespace().collect(); if fields.len() >= 4 && fields[0].ends_with(".service") { let service_name = fields[0].trim_end_matches(".service"); all_service_names.insert(service_name.to_string()); } } } // Now process all discovered services for service_name in &all_service_names { debug!("Processing service: '{}'", service_name); // Skip excluded services first let mut is_excluded = false; for excluded in excluded_services { if service_name.contains(excluded) { debug!( "EXCLUDING service '{}' because it matches pattern '{}'", service_name, excluded ); is_excluded = true; break; } } if is_excluded { debug!("Skipping excluded service: '{}'", service_name); continue; } // Check if this service matches our filter patterns for pattern in service_name_filters { if service_name == pattern { debug!( "INCLUDING service '{}' because it matches pattern '{}'", service_name, pattern ); services.push(service_name.to_string()); break; } } } Ok(services) } /// Get service status using systemctl fn get_service_status(&self, service: &str) -> Result<(String, String)> { let output = Command::new("systemctl") .arg("is-active") .arg(format!("{}.service", service)) .output()?; let active_status = String::from_utf8(output.stdout)?.trim().to_string(); // Get more detailed info let output = Command::new("systemctl") .arg("show") .arg(format!("{}.service", service)) .arg("--property=LoadState,ActiveState,SubState") .output()?; let detailed_info = String::from_utf8(output.stdout)?; Ok((active_status, detailed_info)) } /// Calculate service status fn calculate_service_status(&self, active_status: &str) -> Status { match active_status.to_lowercase().as_str() { "active" => Status::Ok, "inactive" | "dead" => Status::Warning, "failed" | "error" => Status::Critical, "activating" | "deactivating" | "reloading" | "start" | "stop" | "restart" => Status::Pending, _ => Status::Unknown, } } /// Get service memory usage (if available) fn get_service_memory(&self, service: &str) -> Option { let output = Command::new("systemctl") .arg("show") .arg(format!("{}.service", service)) .arg("--property=MemoryCurrent") .output() .ok()?; let output_str = String::from_utf8(output.stdout).ok()?; for line in output_str.lines() { if line.starts_with("MemoryCurrent=") { let memory_str = line.trim_start_matches("MemoryCurrent="); if let Ok(memory_bytes) = memory_str.parse::() { return Some(memory_bytes as f32 / (1024.0 * 1024.0)); // Convert to MB } } } None } /// Get directory size in GB with permission-aware logging fn get_directory_size(&self, dir: &str) -> Option { let output = Command::new("sudo").arg("du").arg("-sb").arg(dir).output().ok()?; if !output.status.success() { // Log permission errors for debugging but don't spam logs let stderr = String::from_utf8_lossy(&output.stderr); if stderr.contains("Permission denied") { debug!("Permission denied accessing directory: {}", dir); } else { debug!("Failed to get size for directory {}: {}", dir, stderr); } return None; } let output_str = String::from_utf8(output.stdout).ok()?; let size_str = output_str.split_whitespace().next()?; if let Ok(size_bytes) = size_str.parse::() { let size_gb = size_bytes as f32 / (1024.0 * 1024.0 * 1024.0); // Return size even if very small (minimum 0.001 GB = 1MB for visibility) if size_gb > 0.0 { Some(size_gb.max(0.001)) } else { None } } else { None } } /// Get service disk usage - simplified and configuration-driven fn get_service_disk_usage(&self, service: &str) -> Option { // 1. Check if service has configured directories (exact match only) if let Some(dirs) = self.config.service_directories.get(service) { // Service has configured paths - use the first accessible one for dir in dirs { if let Some(size) = self.get_directory_size(dir) { return Some(size); } } // If configured paths failed, return None (shows as 0) return Some(0.0); } // 2. No configured path - use systemctl WorkingDirectory let output = Command::new("systemctl") .arg("show") .arg(format!("{}.service", service)) .arg("--property=WorkingDirectory") .output() .ok()?; let output_str = String::from_utf8(output.stdout).ok()?; for line in output_str.lines() { if line.starts_with("WorkingDirectory=") && !line.contains("[not set]") { let dir = line.trim_start_matches("WorkingDirectory="); if !dir.is_empty() && dir != "/" { return self.get_directory_size(dir); } } } None } } #[async_trait] impl Collector for SystemdCollector { fn name(&self) -> &str { "systemd" } async fn collect(&self, _status_tracker: &mut StatusTracker) -> Result, CollectorError> { let start_time = Instant::now(); debug!("Collecting systemd services metrics"); let mut metrics = Vec::new(); // Get cached services (discovery only happens when needed) let monitored_services = match self.get_monitored_services() { Ok(services) => services, Err(e) => { debug!("Failed to get monitored services: {}", e); return Ok(metrics); } }; // Collect individual metrics for each monitored service (status, memory, disk only) for service in &monitored_services { match self.get_service_status(service) { Ok((active_status, _detailed_info)) => { let status = self.calculate_service_status(&active_status); // Individual service status metric metrics.push(Metric { name: format!("service_{}_status", service), value: MetricValue::String(active_status.clone()), unit: None, description: Some(format!("Service {} status", service)), status, timestamp: chrono::Utc::now().timestamp() as u64, }); // Service memory usage (if available) if let Some(memory_mb) = self.get_service_memory(service) { metrics.push(Metric { name: format!("service_{}_memory_mb", service), value: MetricValue::Float(memory_mb), unit: Some("MB".to_string()), description: Some(format!("Service {} memory usage", service)), status: Status::Ok, timestamp: chrono::Utc::now().timestamp() as u64, }); } // Service disk usage (comprehensive detection) if let Some(disk_gb) = self.get_service_disk_usage(service) { metrics.push(Metric { name: format!("service_{}_disk_gb", service), value: MetricValue::Float(disk_gb), unit: Some("GB".to_string()), description: Some(format!("Service {} disk usage", service)), status: Status::Ok, timestamp: chrono::Utc::now().timestamp() as u64, }); } // Sub-service metrics for specific services if service.contains("nginx") && active_status == "active" { metrics.extend(self.get_nginx_site_metrics()); } if service.contains("docker") && active_status == "active" { metrics.extend(self.get_docker_containers()); } } Err(e) => { debug!("Failed to get status for service {}: {}", service, e); } } } let collection_time = start_time.elapsed(); debug!( "Systemd collection completed in {:?} with {} individual service metrics", collection_time, metrics.len() ); Ok(metrics) } } impl SystemdCollector { /// Get nginx sites with latency checks fn get_nginx_sites(&self) -> Vec { let mut metrics = Vec::new(); let timestamp = chrono::Utc::now().timestamp() as u64; // Discover nginx sites from configuration let sites = self.discover_nginx_sites(); for (site_name, url) in &sites { match self.check_site_latency(url) { Ok(latency_ms) => { let status = if latency_ms < 500.0 { Status::Ok } else if latency_ms < 2000.0 { Status::Warning } else { Status::Critical }; metrics.push(Metric { name: format!("service_nginx_{}_latency_ms", site_name), value: MetricValue::Float(latency_ms), unit: Some("ms".to_string()), description: Some(format!("Response time for {}", url)), status, timestamp, }); } Err(_) => { // Site is unreachable metrics.push(Metric { name: format!("service_nginx_{}_latency_ms", site_name), value: MetricValue::Float(-1.0), // Use -1 to indicate error unit: Some("ms".to_string()), description: Some(format!("Response time for {} (unreachable)", url)), status: Status::Critical, timestamp, }); } } } metrics } /// Get docker containers as sub-services fn get_docker_containers(&self) -> Vec { let mut metrics = Vec::new(); let timestamp = chrono::Utc::now().timestamp() as u64; // Check if docker is available let output = Command::new("docker") .arg("ps") .arg("--format") .arg("{{.Names}},{{.Status}}") .output(); let output = match output { Ok(out) if out.status.success() => out, _ => return metrics, // Docker not available or failed }; let output_str = match String::from_utf8(output.stdout) { Ok(s) => s, Err(_) => return metrics, }; for line in output_str.lines() { if line.trim().is_empty() { continue; } let parts: Vec<&str> = line.split(',').collect(); if parts.len() >= 2 { let container_name = parts[0].trim(); let status_str = parts[1].trim(); let status = if status_str.contains("Up") { Status::Ok } else if status_str.contains("Exited") { Status::Warning } else { Status::Critical }; metrics.push(Metric { name: format!("service_docker_{}_status", container_name), value: MetricValue::String(status_str.to_string()), unit: None, description: Some(format!("Docker container {} status", container_name)), status, timestamp, }); } } metrics } /// Check site latency using HTTP GET requests fn check_site_latency(&self, url: &str) -> Result> { use std::time::Duration; use std::time::Instant; let start = Instant::now(); // Create HTTP client with timeouts (similar to legacy implementation) let client = reqwest::blocking::Client::builder() .timeout(Duration::from_secs(10)) .connect_timeout(Duration::from_secs(10)) .redirect(reqwest::redirect::Policy::limited(10)) .build()?; // Make GET request and measure latency let response = client.get(url).send()?; let latency = start.elapsed().as_millis() as f32; // Check if response is successful (2xx or 3xx status codes) if response.status().is_success() || response.status().is_redirection() { Ok(latency) } else { Err(format!( "HTTP request failed for {} with status: {}", url, response.status() ) .into()) } } /// Discover nginx sites from configuration files (like the old working implementation) fn discover_nginx_sites(&self) -> Vec<(String, String)> { use tracing::debug; // Use the same approach as the old working agent: get nginx config from systemd let config_content = match self.get_nginx_config_from_systemd() { Some(content) => content, None => { debug!("Could not get nginx config from systemd, trying nginx -T fallback"); match self.get_nginx_config_via_command() { Some(content) => content, None => { debug!("Could not get nginx config via any method"); return Vec::new(); } } } }; // Parse the config content to extract sites self.parse_nginx_config_for_sites(&config_content) } /// Get nginx config from systemd service definition (NixOS compatible) fn get_nginx_config_from_systemd(&self) -> Option { use tracing::debug; let output = std::process::Command::new("systemctl") .args(["show", "nginx", "--property=ExecStart", "--no-pager"]) .output() .ok()?; if !output.status.success() { debug!("Failed to get nginx ExecStart from systemd"); return None; } let stdout = String::from_utf8_lossy(&output.stdout); debug!("systemctl show nginx output: {}", stdout); // Parse ExecStart to extract -c config path for line in stdout.lines() { if line.starts_with("ExecStart=") { debug!("Found ExecStart line: {}", line); // Handle both traditional and NixOS systemd formats if let Some(config_path) = self.extract_config_path_from_exec_start(line) { debug!("Extracted config path: {}", config_path); // Read the config file return std::fs::read_to_string(&config_path) .map_err(|e| debug!("Failed to read config file {}: {}", config_path, e)) .ok(); } } } None } /// Extract config path from ExecStart line fn extract_config_path_from_exec_start(&self, exec_start: &str) -> Option { use tracing::debug; // Remove ExecStart= prefix let exec_part = exec_start.strip_prefix("ExecStart=")?; debug!("Parsing exec part: {}", exec_part); // Handle NixOS format: ExecStart={ path=...; argv[]=...nginx -c /config; ... } if exec_part.contains("argv[]=") { // Extract the part after argv[]= let argv_start = exec_part.find("argv[]=")?; let argv_part = &exec_part[argv_start + 7..]; // Skip "argv[]=" debug!("Found NixOS argv part: {}", argv_part); // Look for -c flag followed by config path if let Some(c_pos) = argv_part.find(" -c ") { let after_c = &argv_part[c_pos + 4..]; // Find the config path (until next space or semicolon) let config_path = after_c.split([' ', ';']).next()?; return Some(config_path.to_string()); } } else { // Handle traditional format: ExecStart=/path/nginx -c /config debug!("Parsing traditional format"); if let Some(c_pos) = exec_part.find(" -c ") { let after_c = &exec_part[c_pos + 4..]; let config_path = after_c.split_whitespace().next()?; return Some(config_path.to_string()); } } None } /// Fallback: get nginx config via nginx -T command fn get_nginx_config_via_command(&self) -> Option { use tracing::debug; let output = std::process::Command::new("nginx") .args(["-T"]) .output() .ok()?; if !output.status.success() { debug!("nginx -T failed"); return None; } Some(String::from_utf8_lossy(&output.stdout).to_string()) } /// Parse nginx config content to extract server names and build site list fn parse_nginx_config_for_sites(&self, config_content: &str) -> Vec<(String, String)> { use tracing::debug; let mut sites = Vec::new(); let lines: Vec<&str> = config_content.lines().collect(); let mut i = 0; debug!("Parsing nginx config with {} lines", lines.len()); while i < lines.len() { let line = lines[i].trim(); if line.starts_with("server") && line.contains("{") { if let Some(server_name) = self.parse_server_block(&lines, &mut i) { let url = format!("https://{}", server_name); sites.push((server_name.clone(), url)); } } i += 1; } debug!("Discovered {} nginx sites total", sites.len()); sites } /// Parse a server block to extract the primary server_name fn parse_server_block(&self, lines: &[&str], start_index: &mut usize) -> Option { use tracing::debug; let mut server_names = Vec::new(); let mut has_redirect = false; let mut i = *start_index + 1; let mut brace_count = 1; // Parse until we close the server block while i < lines.len() && brace_count > 0 { let trimmed = lines[i].trim(); // Track braces brace_count += trimmed.matches('{').count(); brace_count -= trimmed.matches('}').count(); // Extract server_name if trimmed.starts_with("server_name") { if let Some(names_part) = trimmed.strip_prefix("server_name") { let names_clean = names_part.trim().trim_end_matches(';'); for name in names_clean.split_whitespace() { if name != "_" && !name.is_empty() && name.contains('.') && !name.starts_with('$') { server_names.push(name.to_string()); debug!("Found server_name in block: {}", name); } } } } // Check for redirects (skip redirect-only servers) if trimmed.contains("return") && (trimmed.contains("301") || trimmed.contains("302")) { has_redirect = true; } i += 1; } *start_index = i - 1; if !server_names.is_empty() && !has_redirect { return Some(server_names[0].clone()); } None } }