Add comprehensive timeouts to all blocking system commands
Fixes random host disconnections caused by blocking operations preventing timely ZMQ packet transmission. Changes: (1) add a run_command_with_timeout() wrapper using tokio for async command execution; (2) apply a 10s timeout to smartctl (prevents 30+ second hangs on failing drives); (3) apply a 5s timeout to du, lsblk, systemctl list-unit-files/list-units, and docker ps/images; (4) apply 2-3s timeouts to systemctl show/is-active, df, and ip commands (2s for the single-mount df and `ip route`, 3s for the tmpfs df and `ip -j addr`); (5) apply a 2s timeout to the hostname command; (6) use the system 'timeout' command for sync operations where async is not needed. Critical fixes: smartctl - failing drives could block for 30+ seconds per drive; du - large directories (Docker, PostgreSQL) could block for 10-30+ seconds; systemctl/docker - commands could block indefinitely during system issues. With a 1-second collection interval and a 10-second heartbeat timeout, any blocking operation longer than 10s causes false "host offline" alerts. These timeouts ensure collection completes quickly even during system degradation.
This commit is contained in:
parent
9a2df906ea
commit
1e0510be81
6
Cargo.lock
generated
6
Cargo.lock
generated
@ -279,7 +279,7 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "cm-dashboard"
|
name = "cm-dashboard"
|
||||||
version = "0.1.185"
|
version = "0.1.186"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"chrono",
|
"chrono",
|
||||||
@ -301,7 +301,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "cm-dashboard-agent"
|
name = "cm-dashboard-agent"
|
||||||
version = "0.1.185"
|
version = "0.1.186"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
@ -324,7 +324,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "cm-dashboard-shared"
|
name = "cm-dashboard-shared"
|
||||||
version = "0.1.185"
|
version = "0.1.186"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"chrono",
|
"chrono",
|
||||||
"serde",
|
"serde",
|
||||||
|
|||||||
@ -112,9 +112,12 @@ impl DiskCollector {
|
|||||||
|
|
||||||
/// Get block devices and their mount points using lsblk
|
/// Get block devices and their mount points using lsblk
|
||||||
async fn get_mount_devices(&self) -> Result<HashMap<String, String>, CollectorError> {
|
async fn get_mount_devices(&self) -> Result<HashMap<String, String>, CollectorError> {
|
||||||
let output = Command::new("lsblk")
|
use super::run_command_with_timeout;
|
||||||
.args(&["-rn", "-o", "NAME,MOUNTPOINT"])
|
|
||||||
.output()
|
let mut cmd = Command::new("lsblk");
|
||||||
|
cmd.args(&["-rn", "-o", "NAME,MOUNTPOINT"]);
|
||||||
|
|
||||||
|
let output = run_command_with_timeout(cmd, 5).await
|
||||||
.map_err(|e| CollectorError::SystemRead {
|
.map_err(|e| CollectorError::SystemRead {
|
||||||
path: "block devices".to_string(),
|
path: "block devices".to_string(),
|
||||||
error: e.to_string(),
|
error: e.to_string(),
|
||||||
@ -186,8 +189,8 @@ impl DiskCollector {
|
|||||||
|
|
||||||
/// Get filesystem info for a single mount point
|
/// Get filesystem info for a single mount point
|
||||||
fn get_filesystem_info(&self, mount_point: &str) -> Result<(u64, u64), CollectorError> {
|
fn get_filesystem_info(&self, mount_point: &str) -> Result<(u64, u64), CollectorError> {
|
||||||
let output = Command::new("df")
|
let output = std::process::Command::new("timeout")
|
||||||
.args(&["--block-size=1", mount_point])
|
.args(&["2", "df", "--block-size=1", mount_point])
|
||||||
.output()
|
.output()
|
||||||
.map_err(|e| CollectorError::SystemRead {
|
.map_err(|e| CollectorError::SystemRead {
|
||||||
path: format!("df {}", mount_point),
|
path: format!("df {}", mount_point),
|
||||||
@ -413,6 +416,8 @@ impl DiskCollector {
|
|||||||
|
|
||||||
/// Get SMART data for a single drive
|
/// Get SMART data for a single drive
|
||||||
async fn get_smart_data(&self, drive_name: &str) -> Result<SmartData, CollectorError> {
|
async fn get_smart_data(&self, drive_name: &str) -> Result<SmartData, CollectorError> {
|
||||||
|
use super::run_command_with_timeout;
|
||||||
|
|
||||||
// Use direct smartctl (no sudo) - service has CAP_SYS_RAWIO and CAP_SYS_ADMIN capabilities
|
// Use direct smartctl (no sudo) - service has CAP_SYS_RAWIO and CAP_SYS_ADMIN capabilities
|
||||||
// For NVMe drives, specify device type explicitly
|
// For NVMe drives, specify device type explicitly
|
||||||
let mut cmd = Command::new("smartctl");
|
let mut cmd = Command::new("smartctl");
|
||||||
@ -422,7 +427,7 @@ impl DiskCollector {
|
|||||||
cmd.args(&["-a", &format!("/dev/{}", drive_name)]);
|
cmd.args(&["-a", &format!("/dev/{}", drive_name)]);
|
||||||
}
|
}
|
||||||
|
|
||||||
let output = cmd.output()
|
let output = run_command_with_timeout(cmd, 10).await
|
||||||
.map_err(|e| CollectorError::SystemRead {
|
.map_err(|e| CollectorError::SystemRead {
|
||||||
path: format!("SMART data for {}", drive_name),
|
path: format!("SMART data for {}", drive_name),
|
||||||
error: e.to_string(),
|
error: e.to_string(),
|
||||||
@ -757,9 +762,9 @@ impl DiskCollector {
|
|||||||
|
|
||||||
/// Get drive information for a mount path
|
/// Get drive information for a mount path
|
||||||
fn get_drive_info_for_path(&self, path: &str) -> anyhow::Result<PoolDrive> {
|
fn get_drive_info_for_path(&self, path: &str) -> anyhow::Result<PoolDrive> {
|
||||||
// Use lsblk to find the backing device
|
// Use lsblk to find the backing device with timeout
|
||||||
let output = Command::new("lsblk")
|
let output = Command::new("timeout")
|
||||||
.args(&["-rn", "-o", "NAME,MOUNTPOINT"])
|
.args(&["5", "lsblk", "-rn", "-o", "NAME,MOUNTPOINT"])
|
||||||
.output()
|
.output()
|
||||||
.map_err(|e| anyhow::anyhow!("Failed to run lsblk: {}", e))?;
|
.map_err(|e| anyhow::anyhow!("Failed to run lsblk: {}", e))?;
|
||||||
|
|
||||||
|
|||||||
@ -105,12 +105,12 @@ impl MemoryCollector {
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get usage data for all tmpfs mounts at once using df
|
// Get usage data for all tmpfs mounts at once using df (with 3 second timeout)
|
||||||
let mut df_args = vec!["df", "--output=target,size,used", "--block-size=1"];
|
let mut df_args = vec!["3", "df", "--output=target,size,used", "--block-size=1"];
|
||||||
df_args.extend(tmpfs_mounts.iter().map(|s| s.as_str()));
|
df_args.extend(tmpfs_mounts.iter().map(|s| s.as_str()));
|
||||||
|
|
||||||
let df_output = std::process::Command::new(df_args[0])
|
let df_output = std::process::Command::new("timeout")
|
||||||
.args(&df_args[1..])
|
.args(&df_args[..])
|
||||||
.output()
|
.output()
|
||||||
.map_err(|e| CollectorError::SystemRead {
|
.map_err(|e| CollectorError::SystemRead {
|
||||||
path: "tmpfs mounts".to_string(),
|
path: "tmpfs mounts".to_string(),
|
||||||
|
|||||||
@ -1,6 +1,8 @@
|
|||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use cm_dashboard_shared::{AgentData};
|
use cm_dashboard_shared::{AgentData};
|
||||||
|
use std::process::{Command, Output};
|
||||||
|
use std::time::Duration;
|
||||||
|
use tokio::time::timeout;
|
||||||
|
|
||||||
pub mod backup;
|
pub mod backup;
|
||||||
pub mod cpu;
|
pub mod cpu;
|
||||||
@ -13,6 +15,20 @@ pub mod systemd;
|
|||||||
|
|
||||||
pub use error::CollectorError;
|
pub use error::CollectorError;
|
||||||
|
|
||||||
|
/// Run a command with a timeout to prevent blocking
|
||||||
|
pub async fn run_command_with_timeout(mut cmd: Command, timeout_secs: u64) -> std::io::Result<Output> {
|
||||||
|
let timeout_duration = Duration::from_secs(timeout_secs);
|
||||||
|
|
||||||
|
match timeout(timeout_duration, tokio::task::spawn_blocking(move || cmd.output())).await {
|
||||||
|
Ok(Ok(result)) => result,
|
||||||
|
Ok(Err(e)) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
|
||||||
|
Err(_) => Err(std::io::Error::new(
|
||||||
|
std::io::ErrorKind::TimedOut,
|
||||||
|
format!("Command timed out after {} seconds", timeout_secs)
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/// Base trait for all collectors with direct structured data output
|
/// Base trait for all collectors with direct structured data output
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
|||||||
@ -51,7 +51,7 @@ impl NetworkCollector {
|
|||||||
|
|
||||||
/// Get the primary physical interface (the one with default route)
|
/// Get the primary physical interface (the one with default route)
|
||||||
fn get_primary_physical_interface() -> Option<String> {
|
fn get_primary_physical_interface() -> Option<String> {
|
||||||
match Command::new("ip").args(["route", "show", "default"]).output() {
|
match Command::new("timeout").args(["2", "ip", "route", "show", "default"]).output() {
|
||||||
Ok(output) if output.status.success() => {
|
Ok(output) if output.status.success() => {
|
||||||
let output_str = String::from_utf8_lossy(&output.stdout);
|
let output_str = String::from_utf8_lossy(&output.stdout);
|
||||||
// Parse: "default via 192.168.1.1 dev eno1 ..."
|
// Parse: "default via 192.168.1.1 dev eno1 ..."
|
||||||
@ -110,7 +110,7 @@ impl NetworkCollector {
|
|||||||
// Parse VLAN configuration
|
// Parse VLAN configuration
|
||||||
let vlan_map = Self::parse_vlan_config();
|
let vlan_map = Self::parse_vlan_config();
|
||||||
|
|
||||||
match Command::new("ip").args(["-j", "addr"]).output() {
|
match Command::new("timeout").args(["3", "ip", "-j", "addr"]).output() {
|
||||||
Ok(output) if output.status.success() => {
|
Ok(output) if output.status.success() => {
|
||||||
let json_str = String::from_utf8_lossy(&output.stdout);
|
let json_str = String::from_utf8_lossy(&output.stdout);
|
||||||
|
|
||||||
|
|||||||
@ -43,8 +43,8 @@ impl NixOSCollector {
|
|||||||
match fs::read_to_string("/etc/hostname") {
|
match fs::read_to_string("/etc/hostname") {
|
||||||
Ok(hostname) => Some(hostname.trim().to_string()),
|
Ok(hostname) => Some(hostname.trim().to_string()),
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
// Fallback to hostname command
|
// Fallback to hostname command (with 2 second timeout)
|
||||||
match Command::new("hostname").output() {
|
match Command::new("timeout").args(["2", "hostname"]).output() {
|
||||||
Ok(output) => Some(String::from_utf8_lossy(&output.stdout).trim().to_string()),
|
Ok(output) => Some(String::from_utf8_lossy(&output.stdout).trim().to_string()),
|
||||||
Err(_) => None,
|
Err(_) => None,
|
||||||
}
|
}
|
||||||
|
|||||||
@ -4,7 +4,7 @@ use cm_dashboard_shared::{AgentData, ServiceData, SubServiceData, SubServiceMetr
|
|||||||
use std::process::Command;
|
use std::process::Command;
|
||||||
use std::sync::RwLock;
|
use std::sync::RwLock;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
use tracing::debug;
|
use tracing::{debug, warn};
|
||||||
|
|
||||||
use super::{Collector, CollectorError};
|
use super::{Collector, CollectorError};
|
||||||
use crate::config::SystemdConfig;
|
use crate::config::SystemdConfig;
|
||||||
@ -251,18 +251,18 @@ impl SystemdCollector {
|
|||||||
|
|
||||||
/// Auto-discover interesting services to monitor
|
/// Auto-discover interesting services to monitor
|
||||||
fn discover_services_internal(&self) -> Result<(Vec<String>, std::collections::HashMap<String, ServiceStatusInfo>)> {
|
fn discover_services_internal(&self) -> Result<(Vec<String>, std::collections::HashMap<String, ServiceStatusInfo>)> {
|
||||||
// First: Get all service unit files
|
// First: Get all service unit files (with 5 second timeout)
|
||||||
let unit_files_output = Command::new("systemctl")
|
let unit_files_output = Command::new("timeout")
|
||||||
.args(&["list-unit-files", "--type=service", "--no-pager", "--plain"])
|
.args(&["5", "systemctl", "list-unit-files", "--type=service", "--no-pager", "--plain"])
|
||||||
.output()?;
|
.output()?;
|
||||||
|
|
||||||
if !unit_files_output.status.success() {
|
if !unit_files_output.status.success() {
|
||||||
return Err(anyhow::anyhow!("systemctl list-unit-files command failed"));
|
return Err(anyhow::anyhow!("systemctl list-unit-files command failed"));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Second: Get runtime status of all units
|
// Second: Get runtime status of all units (with 5 second timeout)
|
||||||
let units_status_output = Command::new("systemctl")
|
let units_status_output = Command::new("timeout")
|
||||||
.args(&["list-units", "--type=service", "--all", "--no-pager", "--plain"])
|
.args(&["5", "systemctl", "list-units", "--type=service", "--all", "--no-pager", "--plain"])
|
||||||
.output()?;
|
.output()?;
|
||||||
|
|
||||||
if !units_status_output.status.success() {
|
if !units_status_output.status.success() {
|
||||||
@ -358,16 +358,16 @@ impl SystemdCollector {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fallback to systemctl if not in cache
|
// Fallback to systemctl if not in cache (with 3 second timeout)
|
||||||
let output = Command::new("systemctl")
|
let output = Command::new("timeout")
|
||||||
.args(&["is-active", &format!("{}.service", service)])
|
.args(&["3", "systemctl", "is-active", &format!("{}.service", service)])
|
||||||
.output()?;
|
.output()?;
|
||||||
|
|
||||||
let active_status = String::from_utf8(output.stdout)?.trim().to_string();
|
let active_status = String::from_utf8(output.stdout)?.trim().to_string();
|
||||||
|
|
||||||
// Get more detailed info
|
// Get more detailed info (with 3 second timeout)
|
||||||
let output = Command::new("systemctl")
|
let output = Command::new("timeout")
|
||||||
.args(&["show", &format!("{}.service", service), "--property=LoadState,ActiveState,SubState"])
|
.args(&["3", "systemctl", "show", &format!("{}.service", service), "--property=LoadState,ActiveState,SubState"])
|
||||||
.output()?;
|
.output()?;
|
||||||
|
|
||||||
let detailed_info = String::from_utf8(output.stdout)?;
|
let detailed_info = String::from_utf8(output.stdout)?;
|
||||||
@ -419,7 +419,7 @@ impl SystemdCollector {
|
|||||||
if let Some(dirs) = self.config.service_directories.get(service_name) {
|
if let Some(dirs) = self.config.service_directories.get(service_name) {
|
||||||
// Service has configured paths - use the first accessible one
|
// Service has configured paths - use the first accessible one
|
||||||
for dir in dirs {
|
for dir in dirs {
|
||||||
if let Some(size) = self.get_directory_size(dir) {
|
if let Some(size) = self.get_directory_size(dir).await {
|
||||||
return Ok(size);
|
return Ok(size);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -427,9 +427,9 @@ impl SystemdCollector {
|
|||||||
return Ok(0.0);
|
return Ok(0.0);
|
||||||
}
|
}
|
||||||
|
|
||||||
// No configured path - try to get WorkingDirectory from systemctl
|
// No configured path - try to get WorkingDirectory from systemctl (with 3 second timeout)
|
||||||
let output = Command::new("systemctl")
|
let output = Command::new("timeout")
|
||||||
.args(&["show", &format!("{}.service", service_name), "--property=WorkingDirectory"])
|
.args(&["3", "systemctl", "show", &format!("{}.service", service_name), "--property=WorkingDirectory"])
|
||||||
.output()
|
.output()
|
||||||
.map_err(|e| CollectorError::SystemRead {
|
.map_err(|e| CollectorError::SystemRead {
|
||||||
path: format!("WorkingDirectory for {}", service_name),
|
path: format!("WorkingDirectory for {}", service_name),
|
||||||
@ -441,7 +441,7 @@ impl SystemdCollector {
|
|||||||
if line.starts_with("WorkingDirectory=") && !line.contains("[not set]") {
|
if line.starts_with("WorkingDirectory=") && !line.contains("[not set]") {
|
||||||
let dir = line.strip_prefix("WorkingDirectory=").unwrap_or("");
|
let dir = line.strip_prefix("WorkingDirectory=").unwrap_or("");
|
||||||
if !dir.is_empty() && dir != "/" {
|
if !dir.is_empty() && dir != "/" {
|
||||||
return Ok(self.get_directory_size(dir).unwrap_or(0.0));
|
return Ok(self.get_directory_size(dir).await.unwrap_or(0.0));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -449,18 +449,23 @@ impl SystemdCollector {
|
|||||||
Ok(0.0)
|
Ok(0.0)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get size of a directory in GB
|
/// Get size of a directory in GB (with 5 second timeout)
|
||||||
fn get_directory_size(&self, path: &str) -> Option<f32> {
|
async fn get_directory_size(&self, path: &str) -> Option<f32> {
|
||||||
let output = Command::new("sudo")
|
use super::run_command_with_timeout;
|
||||||
.args(&["du", "-sb", path])
|
|
||||||
.output()
|
// Use -s (summary) and --apparent-size for speed, 5 second timeout
|
||||||
.ok()?;
|
let mut cmd = Command::new("sudo");
|
||||||
|
cmd.args(&["du", "-s", "--apparent-size", "--block-size=1", path]);
|
||||||
|
|
||||||
|
let output = run_command_with_timeout(cmd, 5).await.ok()?;
|
||||||
|
|
||||||
if !output.status.success() {
|
if !output.status.success() {
|
||||||
// Log permission errors for debugging but don't spam logs
|
// Log permission errors for debugging but don't spam logs
|
||||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||||
if stderr.contains("Permission denied") {
|
if stderr.contains("Permission denied") {
|
||||||
debug!("Permission denied accessing directory: {}", path);
|
debug!("Permission denied accessing directory: {}", path);
|
||||||
|
} else if stderr.contains("timed out") {
|
||||||
|
warn!("Directory size check timed out for {}", path);
|
||||||
} else {
|
} else {
|
||||||
debug!("Failed to get size for directory {}: {}", path, stderr);
|
debug!("Failed to get size for directory {}: {}", path, stderr);
|
||||||
}
|
}
|
||||||
@ -778,9 +783,9 @@ impl SystemdCollector {
|
|||||||
let mut containers = Vec::new();
|
let mut containers = Vec::new();
|
||||||
|
|
||||||
// Check if docker is available (cm-agent user is in docker group)
|
// Check if docker is available (cm-agent user is in docker group)
|
||||||
// Use -a to show ALL containers (running and stopped)
|
// Use -a to show ALL containers (running and stopped) with 5 second timeout
|
||||||
let output = Command::new("docker")
|
let output = Command::new("timeout")
|
||||||
.args(&["ps", "-a", "--format", "{{.Names}},{{.Status}}"])
|
.args(&["5", "docker", "ps", "-a", "--format", "{{.Names}},{{.Status}}"])
|
||||||
.output();
|
.output();
|
||||||
|
|
||||||
let output = match output {
|
let output = match output {
|
||||||
@ -821,9 +826,9 @@ impl SystemdCollector {
|
|||||||
/// Get docker images as sub-services
|
/// Get docker images as sub-services
|
||||||
fn get_docker_images(&self) -> Vec<(String, String, String, f32)> {
|
fn get_docker_images(&self) -> Vec<(String, String, String, f32)> {
|
||||||
let mut images = Vec::new();
|
let mut images = Vec::new();
|
||||||
// Check if docker is available (cm-agent user is in docker group)
|
// Check if docker is available (cm-agent user is in docker group) with 5 second timeout
|
||||||
let output = Command::new("docker")
|
let output = Command::new("timeout")
|
||||||
.args(&["images", "--format", "{{.Repository}}:{{.Tag}},{{.Size}}"])
|
.args(&["5", "docker", "images", "--format", "{{.Repository}}:{{.Tag}},{{.Size}}"])
|
||||||
.output();
|
.output();
|
||||||
|
|
||||||
let output = match output {
|
let output = match output {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user