From 1e0510be81511737ef067733de5dda8e5fbe9cb7 Mon Sep 17 00:00:00 2001 From: Christoffer Martinsson Date: Thu, 27 Nov 2025 16:34:08 +0100 Subject: [PATCH] Add comprehensive timeouts to all blocking system commands Fixes random host disconnections caused by blocking operations preventing timely ZMQ packet transmission. Changes: - Add run_command_with_timeout() wrapper using tokio for async command execution - Apply 10s timeout to smartctl (prevents 30+ second hangs on failing drives) - Apply 5s timeout to du, lsblk, docker ps/images, and systemctl list commands - Apply 3s timeout to systemctl show/is-active, the tmpfs df call, and ip addr - Apply 2s timeout to hostname, the per-mount df call, and ip route - Use system 'timeout' command for sync operations where async is not needed Critical fixes: - smartctl: Failing drives could block for 30+ seconds per drive - du: Large directories (Docker, PostgreSQL) could block 10-30+ seconds - systemctl/docker: Commands could block indefinitely during system issues With a 1-second collection interval and a 10-second heartbeat timeout, any blocking operation >10s causes false "host offline" alerts. These timeouts ensure collection completes quickly even during system degradation. 
--- Cargo.lock | 6 +-- agent/src/collectors/disk.rs | 23 +++++++----- agent/src/collectors/memory.rs | 8 ++-- agent/src/collectors/mod.rs | 18 ++++++++- agent/src/collectors/network.rs | 4 +- agent/src/collectors/nixos.rs | 4 +- agent/src/collectors/systemd.rs | 65 ++++++++++++++++++--------------- 7 files changed, 77 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4034911..30876be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -279,7 +279,7 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d" [[package]] name = "cm-dashboard" -version = "0.1.185" +version = "0.1.186" dependencies = [ "anyhow", "chrono", @@ -301,7 +301,7 @@ dependencies = [ [[package]] name = "cm-dashboard-agent" -version = "0.1.185" +version = "0.1.186" dependencies = [ "anyhow", "async-trait", @@ -324,7 +324,7 @@ dependencies = [ [[package]] name = "cm-dashboard-shared" -version = "0.1.185" +version = "0.1.186" dependencies = [ "chrono", "serde", diff --git a/agent/src/collectors/disk.rs b/agent/src/collectors/disk.rs index 88c0459..3167f4e 100644 --- a/agent/src/collectors/disk.rs +++ b/agent/src/collectors/disk.rs @@ -112,9 +112,12 @@ impl DiskCollector { /// Get block devices and their mount points using lsblk async fn get_mount_devices(&self) -> Result, CollectorError> { - let output = Command::new("lsblk") - .args(&["-rn", "-o", "NAME,MOUNTPOINT"]) - .output() + use super::run_command_with_timeout; + + let mut cmd = Command::new("lsblk"); + cmd.args(&["-rn", "-o", "NAME,MOUNTPOINT"]); + + let output = run_command_with_timeout(cmd, 5).await .map_err(|e| CollectorError::SystemRead { path: "block devices".to_string(), error: e.to_string(), @@ -186,8 +189,8 @@ impl DiskCollector { /// Get filesystem info for a single mount point fn get_filesystem_info(&self, mount_point: &str) -> Result<(u64, u64), CollectorError> { - let output = Command::new("df") - .args(&["--block-size=1", mount_point]) + let output = std::process::Command::new("timeout") + 
.args(&["2", "df", "--block-size=1", mount_point]) .output() .map_err(|e| CollectorError::SystemRead { path: format!("df {}", mount_point), @@ -413,6 +416,8 @@ impl DiskCollector { /// Get SMART data for a single drive async fn get_smart_data(&self, drive_name: &str) -> Result { + use super::run_command_with_timeout; + // Use direct smartctl (no sudo) - service has CAP_SYS_RAWIO and CAP_SYS_ADMIN capabilities // For NVMe drives, specify device type explicitly let mut cmd = Command::new("smartctl"); @@ -422,7 +427,7 @@ impl DiskCollector { cmd.args(&["-a", &format!("/dev/{}", drive_name)]); } - let output = cmd.output() + let output = run_command_with_timeout(cmd, 10).await .map_err(|e| CollectorError::SystemRead { path: format!("SMART data for {}", drive_name), error: e.to_string(), @@ -757,9 +762,9 @@ impl DiskCollector { /// Get drive information for a mount path fn get_drive_info_for_path(&self, path: &str) -> anyhow::Result { - // Use lsblk to find the backing device - let output = Command::new("lsblk") - .args(&["-rn", "-o", "NAME,MOUNTPOINT"]) + // Use lsblk to find the backing device with timeout + let output = Command::new("timeout") + .args(&["5", "lsblk", "-rn", "-o", "NAME,MOUNTPOINT"]) .output() .map_err(|e| anyhow::anyhow!("Failed to run lsblk: {}", e))?; diff --git a/agent/src/collectors/memory.rs b/agent/src/collectors/memory.rs index 61d0d87..2f3e86c 100644 --- a/agent/src/collectors/memory.rs +++ b/agent/src/collectors/memory.rs @@ -105,12 +105,12 @@ impl MemoryCollector { return Ok(()); } - // Get usage data for all tmpfs mounts at once using df - let mut df_args = vec!["df", "--output=target,size,used", "--block-size=1"]; + // Get usage data for all tmpfs mounts at once using df (with 3 second timeout) + let mut df_args = vec!["3", "df", "--output=target,size,used", "--block-size=1"]; df_args.extend(tmpfs_mounts.iter().map(|s| s.as_str())); - let df_output = std::process::Command::new(df_args[0]) - .args(&df_args[1..]) + let df_output = 
std::process::Command::new("timeout") + .args(&df_args[..]) .output() .map_err(|e| CollectorError::SystemRead { path: "tmpfs mounts".to_string(), diff --git a/agent/src/collectors/mod.rs b/agent/src/collectors/mod.rs index b969f7f..db338f3 100644 --- a/agent/src/collectors/mod.rs +++ b/agent/src/collectors/mod.rs @@ -1,6 +1,8 @@ use async_trait::async_trait; use cm_dashboard_shared::{AgentData}; - +use std::process::{Command, Output}; +use std::time::Duration; +use tokio::time::timeout; pub mod backup; pub mod cpu; @@ -13,6 +15,20 @@ pub mod systemd; pub use error::CollectorError; +/// Run a command with a timeout to prevent blocking +pub async fn run_command_with_timeout(mut cmd: Command, timeout_secs: u64) -> std::io::Result { + let timeout_duration = Duration::from_secs(timeout_secs); + + match timeout(timeout_duration, tokio::task::spawn_blocking(move || cmd.output())).await { + Ok(Ok(result)) => result, + Ok(Err(e)) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)), + Err(_) => Err(std::io::Error::new( + std::io::ErrorKind::TimedOut, + format!("Command timed out after {} seconds", timeout_secs) + )), + } +} + /// Base trait for all collectors with direct structured data output #[async_trait] diff --git a/agent/src/collectors/network.rs b/agent/src/collectors/network.rs index dc8d8a6..5286143 100644 --- a/agent/src/collectors/network.rs +++ b/agent/src/collectors/network.rs @@ -51,7 +51,7 @@ impl NetworkCollector { /// Get the primary physical interface (the one with default route) fn get_primary_physical_interface() -> Option { - match Command::new("ip").args(["route", "show", "default"]).output() { + match Command::new("timeout").args(["2", "ip", "route", "show", "default"]).output() { Ok(output) if output.status.success() => { let output_str = String::from_utf8_lossy(&output.stdout); // Parse: "default via 192.168.1.1 dev eno1 ..." 
@@ -110,7 +110,7 @@ impl NetworkCollector { // Parse VLAN configuration let vlan_map = Self::parse_vlan_config(); - match Command::new("ip").args(["-j", "addr"]).output() { + match Command::new("timeout").args(["3", "ip", "-j", "addr"]).output() { Ok(output) if output.status.success() => { let json_str = String::from_utf8_lossy(&output.stdout); diff --git a/agent/src/collectors/nixos.rs b/agent/src/collectors/nixos.rs index ec9d246..c56ce5c 100644 --- a/agent/src/collectors/nixos.rs +++ b/agent/src/collectors/nixos.rs @@ -43,8 +43,8 @@ impl NixOSCollector { match fs::read_to_string("/etc/hostname") { Ok(hostname) => Some(hostname.trim().to_string()), Err(_) => { - // Fallback to hostname command - match Command::new("hostname").output() { + // Fallback to hostname command (with 2 second timeout) + match Command::new("timeout").args(["2", "hostname"]).output() { Ok(output) => Some(String::from_utf8_lossy(&output.stdout).trim().to_string()), Err(_) => None, } diff --git a/agent/src/collectors/systemd.rs b/agent/src/collectors/systemd.rs index 4309bad..4e5a629 100644 --- a/agent/src/collectors/systemd.rs +++ b/agent/src/collectors/systemd.rs @@ -4,7 +4,7 @@ use cm_dashboard_shared::{AgentData, ServiceData, SubServiceData, SubServiceMetr use std::process::Command; use std::sync::RwLock; use std::time::Instant; -use tracing::debug; +use tracing::{debug, warn}; use super::{Collector, CollectorError}; use crate::config::SystemdConfig; @@ -251,18 +251,18 @@ impl SystemdCollector { /// Auto-discover interesting services to monitor fn discover_services_internal(&self) -> Result<(Vec, std::collections::HashMap)> { - // First: Get all service unit files - let unit_files_output = Command::new("systemctl") - .args(&["list-unit-files", "--type=service", "--no-pager", "--plain"]) + // First: Get all service unit files (with 5 second timeout) + let unit_files_output = Command::new("timeout") + .args(&["5", "systemctl", "list-unit-files", "--type=service", "--no-pager", "--plain"]) 
.output()?; if !unit_files_output.status.success() { return Err(anyhow::anyhow!("systemctl list-unit-files command failed")); } - // Second: Get runtime status of all units - let units_status_output = Command::new("systemctl") - .args(&["list-units", "--type=service", "--all", "--no-pager", "--plain"]) + // Second: Get runtime status of all units (with 5 second timeout) + let units_status_output = Command::new("timeout") + .args(&["5", "systemctl", "list-units", "--type=service", "--all", "--no-pager", "--plain"]) .output()?; if !units_status_output.status.success() { @@ -358,16 +358,16 @@ impl SystemdCollector { } } - // Fallback to systemctl if not in cache - let output = Command::new("systemctl") - .args(&["is-active", &format!("{}.service", service)]) + // Fallback to systemctl if not in cache (with 3 second timeout) + let output = Command::new("timeout") + .args(&["3", "systemctl", "is-active", &format!("{}.service", service)]) .output()?; let active_status = String::from_utf8(output.stdout)?.trim().to_string(); - // Get more detailed info - let output = Command::new("systemctl") - .args(&["show", &format!("{}.service", service), "--property=LoadState,ActiveState,SubState"]) + // Get more detailed info (with 3 second timeout) + let output = Command::new("timeout") + .args(&["3", "systemctl", "show", &format!("{}.service", service), "--property=LoadState,ActiveState,SubState"]) .output()?; let detailed_info = String::from_utf8(output.stdout)?; @@ -419,7 +419,7 @@ impl SystemdCollector { if let Some(dirs) = self.config.service_directories.get(service_name) { // Service has configured paths - use the first accessible one for dir in dirs { - if let Some(size) = self.get_directory_size(dir) { + if let Some(size) = self.get_directory_size(dir).await { return Ok(size); } } @@ -427,9 +427,9 @@ impl SystemdCollector { return Ok(0.0); } - // No configured path - try to get WorkingDirectory from systemctl - let output = Command::new("systemctl") - .args(&["show", 
&format!("{}.service", service_name), "--property=WorkingDirectory"]) + // No configured path - try to get WorkingDirectory from systemctl (with 3 second timeout) + let output = Command::new("timeout") + .args(&["3", "systemctl", "show", &format!("{}.service", service_name), "--property=WorkingDirectory"]) .output() .map_err(|e| CollectorError::SystemRead { path: format!("WorkingDirectory for {}", service_name), @@ -441,7 +441,7 @@ impl SystemdCollector { if line.starts_with("WorkingDirectory=") && !line.contains("[not set]") { let dir = line.strip_prefix("WorkingDirectory=").unwrap_or(""); if !dir.is_empty() && dir != "/" { - return Ok(self.get_directory_size(dir).unwrap_or(0.0)); + return Ok(self.get_directory_size(dir).await.unwrap_or(0.0)); } } } @@ -449,18 +449,23 @@ impl SystemdCollector { Ok(0.0) } - /// Get size of a directory in GB - fn get_directory_size(&self, path: &str) -> Option { - let output = Command::new("sudo") - .args(&["du", "-sb", path]) - .output() - .ok()?; + /// Get size of a directory in GB (with 5 second timeout) + async fn get_directory_size(&self, path: &str) -> Option { + use super::run_command_with_timeout; + + // Use -s (summary) and --apparent-size for speed, 5 second timeout + let mut cmd = Command::new("sudo"); + cmd.args(&["du", "-s", "--apparent-size", "--block-size=1", path]); + + let output = run_command_with_timeout(cmd, 5).await.ok()?; if !output.status.success() { // Log permission errors for debugging but don't spam logs let stderr = String::from_utf8_lossy(&output.stderr); if stderr.contains("Permission denied") { debug!("Permission denied accessing directory: {}", path); + } else if stderr.contains("timed out") { + warn!("Directory size check timed out for {}", path); } else { debug!("Failed to get size for directory {}: {}", path, stderr); } @@ -778,9 +783,9 @@ impl SystemdCollector { let mut containers = Vec::new(); // Check if docker is available (cm-agent user is in docker group) - // Use -a to show ALL containers 
(running and stopped) - let output = Command::new("docker") - .args(&["ps", "-a", "--format", "{{.Names}},{{.Status}}"]) + // Use -a to show ALL containers (running and stopped) with 5 second timeout + let output = Command::new("timeout") + .args(&["5", "docker", "ps", "-a", "--format", "{{.Names}},{{.Status}}"]) .output(); let output = match output { @@ -821,9 +826,9 @@ impl SystemdCollector { /// Get docker images as sub-services fn get_docker_images(&self) -> Vec<(String, String, String, f32)> { let mut images = Vec::new(); - // Check if docker is available (cm-agent user is in docker group) - let output = Command::new("docker") - .args(&["images", "--format", "{{.Repository}}:{{.Tag}},{{.Size}}"]) + // Check if docker is available (cm-agent user is in docker group) with 5 second timeout + let output = Command::new("timeout") + .args(&["5", "docker", "images", "--format", "{{.Repository}}:{{.Tag}},{{.Size}}"]) .output(); let output = match output {