Compare commits

...

7 Commits

Author SHA1 Message Date
76c04633b5 Bump version to v0.1.187
All checks were successful
Build and Release / build-and-release (push) Successful in 1m19s
2025-11-27 16:34:42 +01:00
1e0510be81 Add comprehensive timeouts to all blocking system commands
Fixes random host disconnections caused by blocking operations preventing timely ZMQ packet transmission.

Changes:
- Add run_command_with_timeout() wrapper using tokio for async command execution
- Apply 10s timeout to smartctl (prevents 30+ second hangs on failing drives)
- Apply 5s timeout to du, lsblk, systemctl list commands
- Apply 3s timeout to systemctl show/is-active, df, ip commands
- Apply 2s timeout to hostname command
- Use system 'timeout' command for sync operations where async not needed

Critical fixes:
- smartctl: Failing drives could block for 30+ seconds per drive
- du: Large directories (Docker, PostgreSQL) could block 10-30+ seconds
- systemctl/docker: Commands could block indefinitely during system issues

With 1-second collection interval and 10-second heartbeat timeout, any blocking operation >10s causes false "host offline" alerts. These timeouts ensure collection completes quickly even during system degradation.
2025-11-27 16:34:08 +01:00
9a2df906ea Add ZMQ communication statistics tracking and display
All checks were successful
Build and Release / build-and-release (push) Successful in 1m10s
2025-11-27 16:14:45 +01:00
6d6beb207d Parse Docker image sizes to MB and sort services alphabetically
All checks were successful
Build and Release / build-and-release (push) Successful in 1m18s
2025-11-27 15:57:38 +01:00
7a68da01f5 Remove debug logging for NVMe SMART collection
All checks were successful
Build and Release / build-and-release (push) Successful in 1m9s
2025-11-27 15:40:16 +01:00
5be67fed64 Add debug logging for NVMe SMART data collection
All checks were successful
Build and Release / build-and-release (push) Successful in 1m19s
2025-11-27 15:00:48 +01:00
cac836601b Add NVMe device type flag for SMART data collection
All checks were successful
Build and Release / build-and-release (push) Successful in 1m19s
2025-11-27 13:34:30 +01:00
14 changed files with 198 additions and 87 deletions

6
Cargo.lock generated
View File

@@ -279,7 +279,7 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"
[[package]] [[package]]
name = "cm-dashboard" name = "cm-dashboard"
version = "0.1.180" version = "0.1.186"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"chrono", "chrono",
@@ -301,7 +301,7 @@ dependencies = [
[[package]] [[package]]
name = "cm-dashboard-agent" name = "cm-dashboard-agent"
version = "0.1.180" version = "0.1.186"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
@@ -324,7 +324,7 @@ dependencies = [
[[package]] [[package]]
name = "cm-dashboard-shared" name = "cm-dashboard-shared"
version = "0.1.180" version = "0.1.186"
dependencies = [ dependencies = [
"chrono", "chrono",
"serde", "serde",

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "cm-dashboard-agent" name = "cm-dashboard-agent"
version = "0.1.181" version = "0.1.187"
edition = "2021" edition = "2021"
[dependencies] [dependencies]

View File

@@ -112,9 +112,12 @@ impl DiskCollector {
/// Get block devices and their mount points using lsblk /// Get block devices and their mount points using lsblk
async fn get_mount_devices(&self) -> Result<HashMap<String, String>, CollectorError> { async fn get_mount_devices(&self) -> Result<HashMap<String, String>, CollectorError> {
let output = Command::new("lsblk") use super::run_command_with_timeout;
.args(&["-rn", "-o", "NAME,MOUNTPOINT"])
.output() let mut cmd = Command::new("lsblk");
cmd.args(&["-rn", "-o", "NAME,MOUNTPOINT"]);
let output = run_command_with_timeout(cmd, 5).await
.map_err(|e| CollectorError::SystemRead { .map_err(|e| CollectorError::SystemRead {
path: "block devices".to_string(), path: "block devices".to_string(),
error: e.to_string(), error: e.to_string(),
@@ -186,8 +189,8 @@ impl DiskCollector {
/// Get filesystem info for a single mount point /// Get filesystem info for a single mount point
fn get_filesystem_info(&self, mount_point: &str) -> Result<(u64, u64), CollectorError> { fn get_filesystem_info(&self, mount_point: &str) -> Result<(u64, u64), CollectorError> {
let output = Command::new("df") let output = std::process::Command::new("timeout")
.args(&["--block-size=1", mount_point]) .args(&["2", "df", "--block-size=1", mount_point])
.output() .output()
.map_err(|e| CollectorError::SystemRead { .map_err(|e| CollectorError::SystemRead {
path: format!("df {}", mount_point), path: format!("df {}", mount_point),
@@ -385,7 +388,6 @@ impl DiskCollector {
/// Get SMART data for drives /// Get SMART data for drives
async fn get_smart_data_for_drives(&self, physical_drives: &[PhysicalDrive], mergerfs_pools: &[MergerfsPool]) -> HashMap<String, SmartData> { async fn get_smart_data_for_drives(&self, physical_drives: &[PhysicalDrive], mergerfs_pools: &[MergerfsPool]) -> HashMap<String, SmartData> {
use tracing::info;
let mut smart_data = HashMap::new(); let mut smart_data = HashMap::new();
// Collect all drive names // Collect all drive names
@@ -402,48 +404,38 @@ impl DiskCollector {
} }
} }
info!("Collecting SMART data for {} drives", all_drives.len());
// Get SMART data for each drive // Get SMART data for each drive
for drive_name in &all_drives { for drive_name in all_drives {
match self.get_smart_data(drive_name).await { if let Ok(data) = self.get_smart_data(&drive_name).await {
Ok(data) => { smart_data.insert(drive_name, data);
info!("SMART data collected for {}: serial={:?}, temp={:?}, health={}",
drive_name, data.serial_number, data.temperature_celsius, data.health);
smart_data.insert(drive_name.clone(), data);
}
Err(e) => {
info!("Failed to get SMART data for {}: {:?}", drive_name, e);
}
} }
} }
info!("SMART data collection complete: {}/{} drives successful", smart_data.len(), all_drives.len());
smart_data smart_data
} }
/// Get SMART data for a single drive /// Get SMART data for a single drive
async fn get_smart_data(&self, drive_name: &str) -> Result<SmartData, CollectorError> { async fn get_smart_data(&self, drive_name: &str) -> Result<SmartData, CollectorError> {
use tracing::info; use super::run_command_with_timeout;
// Use direct smartctl (no sudo) - service has CAP_SYS_RAWIO capability // Use direct smartctl (no sudo) - service has CAP_SYS_RAWIO and CAP_SYS_ADMIN capabilities
let output = Command::new("smartctl") // For NVMe drives, specify device type explicitly
.args(&["-a", &format!("/dev/{}", drive_name)]) let mut cmd = Command::new("smartctl");
.output() if drive_name.starts_with("nvme") {
cmd.args(&["-d", "nvme", "-a", &format!("/dev/{}", drive_name)]);
} else {
cmd.args(&["-a", &format!("/dev/{}", drive_name)]);
}
let output = run_command_with_timeout(cmd, 10).await
.map_err(|e| CollectorError::SystemRead { .map_err(|e| CollectorError::SystemRead {
path: format!("SMART data for {}", drive_name), path: format!("SMART data for {}", drive_name),
error: e.to_string(), error: e.to_string(),
})?; })?;
let output_str = String::from_utf8_lossy(&output.stdout); let output_str = String::from_utf8_lossy(&output.stdout);
let error_str = String::from_utf8_lossy(&output.stderr);
// Debug logging for SMART command results
debug!("SMART output for {}: status={}, stdout_len={}, stderr={}",
drive_name, output.status, output_str.len(), error_str);
if !output.status.success() { if !output.status.success() {
info!("SMART command failed for {}, status={}, stderr={}", drive_name, output.status, error_str);
// Return unknown data rather than failing completely // Return unknown data rather than failing completely
return Ok(SmartData { return Ok(SmartData {
health: "UNKNOWN".to_string(), health: "UNKNOWN".to_string(),
@@ -770,9 +762,9 @@ impl DiskCollector {
/// Get drive information for a mount path /// Get drive information for a mount path
fn get_drive_info_for_path(&self, path: &str) -> anyhow::Result<PoolDrive> { fn get_drive_info_for_path(&self, path: &str) -> anyhow::Result<PoolDrive> {
// Use lsblk to find the backing device // Use lsblk to find the backing device with timeout
let output = Command::new("lsblk") let output = Command::new("timeout")
.args(&["-rn", "-o", "NAME,MOUNTPOINT"]) .args(&["5", "lsblk", "-rn", "-o", "NAME,MOUNTPOINT"])
.output() .output()
.map_err(|e| anyhow::anyhow!("Failed to run lsblk: {}", e))?; .map_err(|e| anyhow::anyhow!("Failed to run lsblk: {}", e))?;

View File

@@ -105,12 +105,12 @@ impl MemoryCollector {
return Ok(()); return Ok(());
} }
// Get usage data for all tmpfs mounts at once using df // Get usage data for all tmpfs mounts at once using df (with 3 second timeout)
let mut df_args = vec!["df", "--output=target,size,used", "--block-size=1"]; let mut df_args = vec!["3", "df", "--output=target,size,used", "--block-size=1"];
df_args.extend(tmpfs_mounts.iter().map(|s| s.as_str())); df_args.extend(tmpfs_mounts.iter().map(|s| s.as_str()));
let df_output = std::process::Command::new(df_args[0]) let df_output = std::process::Command::new("timeout")
.args(&df_args[1..]) .args(&df_args[..])
.output() .output()
.map_err(|e| CollectorError::SystemRead { .map_err(|e| CollectorError::SystemRead {
path: "tmpfs mounts".to_string(), path: "tmpfs mounts".to_string(),

View File

@@ -1,6 +1,8 @@
use async_trait::async_trait; use async_trait::async_trait;
use cm_dashboard_shared::{AgentData}; use cm_dashboard_shared::{AgentData};
use std::process::{Command, Output};
use std::time::Duration;
use tokio::time::timeout;
pub mod backup; pub mod backup;
pub mod cpu; pub mod cpu;
@@ -13,6 +15,20 @@ pub mod systemd;
pub use error::CollectorError; pub use error::CollectorError;
/// Run a command with a timeout to prevent blocking.
///
/// The command is executed through tokio's asynchronous process support and
/// its captured output is returned once the child exits.
///
/// If the child has not finished within `timeout_secs` seconds, the pending
/// future is dropped and, because `kill_on_drop` is enabled, the child process
/// is killed instead of being left running in the background. Only the direct
/// child is killed; processes it spawned itself (for example via `sudo`) are
/// not tracked here.
///
/// # Errors
///
/// * Returns the underlying I/O error if the command cannot be spawned or its
///   output cannot be collected.
/// * Returns an error of kind [`std::io::ErrorKind::TimedOut`] when the timeout
///   elapses first.
pub async fn run_command_with_timeout(cmd: Command, timeout_secs: u64) -> std::io::Result<Output> {
    // Convert the std command so the child is driven by the async runtime
    // (requires tokio's `process` feature) rather than parking a thread of the
    // blocking pool, which could never be cancelled once the timeout fired.
    let mut child_cmd = tokio::process::Command::from(cmd);
    child_cmd.kill_on_drop(true);

    match timeout(Duration::from_secs(timeout_secs), child_cmd.output()).await {
        Ok(result) => result,
        Err(_) => Err(std::io::Error::new(
            std::io::ErrorKind::TimedOut,
            format!("Command timed out after {} seconds", timeout_secs),
        )),
    }
}
/// Base trait for all collectors with direct structured data output /// Base trait for all collectors with direct structured data output
#[async_trait] #[async_trait]

View File

@@ -51,7 +51,7 @@ impl NetworkCollector {
/// Get the primary physical interface (the one with default route) /// Get the primary physical interface (the one with default route)
fn get_primary_physical_interface() -> Option<String> { fn get_primary_physical_interface() -> Option<String> {
match Command::new("ip").args(["route", "show", "default"]).output() { match Command::new("timeout").args(["2", "ip", "route", "show", "default"]).output() {
Ok(output) if output.status.success() => { Ok(output) if output.status.success() => {
let output_str = String::from_utf8_lossy(&output.stdout); let output_str = String::from_utf8_lossy(&output.stdout);
// Parse: "default via 192.168.1.1 dev eno1 ..." // Parse: "default via 192.168.1.1 dev eno1 ..."
@@ -110,7 +110,7 @@ impl NetworkCollector {
// Parse VLAN configuration // Parse VLAN configuration
let vlan_map = Self::parse_vlan_config(); let vlan_map = Self::parse_vlan_config();
match Command::new("ip").args(["-j", "addr"]).output() { match Command::new("timeout").args(["3", "ip", "-j", "addr"]).output() {
Ok(output) if output.status.success() => { Ok(output) if output.status.success() => {
let json_str = String::from_utf8_lossy(&output.stdout); let json_str = String::from_utf8_lossy(&output.stdout);

View File

@@ -43,8 +43,8 @@ impl NixOSCollector {
match fs::read_to_string("/etc/hostname") { match fs::read_to_string("/etc/hostname") {
Ok(hostname) => Some(hostname.trim().to_string()), Ok(hostname) => Some(hostname.trim().to_string()),
Err(_) => { Err(_) => {
// Fallback to hostname command // Fallback to hostname command (with 2 second timeout)
match Command::new("hostname").output() { match Command::new("timeout").args(["2", "hostname"]).output() {
Ok(output) => Some(String::from_utf8_lossy(&output.stdout).trim().to_string()), Ok(output) => Some(String::from_utf8_lossy(&output.stdout).trim().to_string()),
Err(_) => None, Err(_) => None,
} }

View File

@@ -4,7 +4,7 @@ use cm_dashboard_shared::{AgentData, ServiceData, SubServiceData, SubServiceMetr
use std::process::Command; use std::process::Command;
use std::sync::RwLock; use std::sync::RwLock;
use std::time::Instant; use std::time::Instant;
use tracing::debug; use tracing::{debug, warn};
use super::{Collector, CollectorError}; use super::{Collector, CollectorError};
use crate::config::SystemdConfig; use crate::config::SystemdConfig;
@@ -133,16 +133,16 @@ impl SystemdCollector {
// Add Docker images // Add Docker images
let docker_images = self.get_docker_images(); let docker_images = self.get_docker_images();
for (image_name, image_status, image_size) in docker_images { for (image_name, image_status, image_size_str, image_size_mb) in docker_images {
let mut metrics = Vec::new(); let mut metrics = Vec::new();
metrics.push(SubServiceMetric { metrics.push(SubServiceMetric {
label: "size".to_string(), label: "size".to_string(),
value: 0.0, // Size as string in name instead value: image_size_mb,
unit: None, unit: Some("MB".to_string()),
}); });
sub_services.push(SubServiceData { sub_services.push(SubServiceData {
name: format!("{} ({})", image_name, image_size), name: format!("{} ({})", image_name, image_size_str),
service_status: self.calculate_service_status(&image_name, &image_status), service_status: self.calculate_service_status(&image_name, &image_status),
metrics, metrics,
}); });
@@ -169,6 +169,10 @@ impl SystemdCollector {
} }
} }
// Sort services alphabetically by name
agent_data.services.sort_by(|a, b| a.name.cmp(&b.name));
complete_service_data.sort_by(|a, b| a.name.cmp(&b.name));
// Update cached state // Update cached state
{ {
let mut state = self.state.write().unwrap(); let mut state = self.state.write().unwrap();
@@ -247,18 +251,18 @@ impl SystemdCollector {
/// Auto-discover interesting services to monitor /// Auto-discover interesting services to monitor
fn discover_services_internal(&self) -> Result<(Vec<String>, std::collections::HashMap<String, ServiceStatusInfo>)> { fn discover_services_internal(&self) -> Result<(Vec<String>, std::collections::HashMap<String, ServiceStatusInfo>)> {
// First: Get all service unit files // First: Get all service unit files (with 5 second timeout)
let unit_files_output = Command::new("systemctl") let unit_files_output = Command::new("timeout")
.args(&["list-unit-files", "--type=service", "--no-pager", "--plain"]) .args(&["5", "systemctl", "list-unit-files", "--type=service", "--no-pager", "--plain"])
.output()?; .output()?;
if !unit_files_output.status.success() { if !unit_files_output.status.success() {
return Err(anyhow::anyhow!("systemctl list-unit-files command failed")); return Err(anyhow::anyhow!("systemctl list-unit-files command failed"));
} }
// Second: Get runtime status of all units // Second: Get runtime status of all units (with 5 second timeout)
let units_status_output = Command::new("systemctl") let units_status_output = Command::new("timeout")
.args(&["list-units", "--type=service", "--all", "--no-pager", "--plain"]) .args(&["5", "systemctl", "list-units", "--type=service", "--all", "--no-pager", "--plain"])
.output()?; .output()?;
if !units_status_output.status.success() { if !units_status_output.status.success() {
@@ -354,16 +358,16 @@ impl SystemdCollector {
} }
} }
// Fallback to systemctl if not in cache // Fallback to systemctl if not in cache (with 3 second timeout)
let output = Command::new("systemctl") let output = Command::new("timeout")
.args(&["is-active", &format!("{}.service", service)]) .args(&["3", "systemctl", "is-active", &format!("{}.service", service)])
.output()?; .output()?;
let active_status = String::from_utf8(output.stdout)?.trim().to_string(); let active_status = String::from_utf8(output.stdout)?.trim().to_string();
// Get more detailed info // Get more detailed info (with 3 second timeout)
let output = Command::new("systemctl") let output = Command::new("timeout")
.args(&["show", &format!("{}.service", service), "--property=LoadState,ActiveState,SubState"]) .args(&["3", "systemctl", "show", &format!("{}.service", service), "--property=LoadState,ActiveState,SubState"])
.output()?; .output()?;
let detailed_info = String::from_utf8(output.stdout)?; let detailed_info = String::from_utf8(output.stdout)?;
@@ -415,7 +419,7 @@ impl SystemdCollector {
if let Some(dirs) = self.config.service_directories.get(service_name) { if let Some(dirs) = self.config.service_directories.get(service_name) {
// Service has configured paths - use the first accessible one // Service has configured paths - use the first accessible one
for dir in dirs { for dir in dirs {
if let Some(size) = self.get_directory_size(dir) { if let Some(size) = self.get_directory_size(dir).await {
return Ok(size); return Ok(size);
} }
} }
@@ -423,9 +427,9 @@ impl SystemdCollector {
return Ok(0.0); return Ok(0.0);
} }
// No configured path - try to get WorkingDirectory from systemctl // No configured path - try to get WorkingDirectory from systemctl (with 3 second timeout)
let output = Command::new("systemctl") let output = Command::new("timeout")
.args(&["show", &format!("{}.service", service_name), "--property=WorkingDirectory"]) .args(&["3", "systemctl", "show", &format!("{}.service", service_name), "--property=WorkingDirectory"])
.output() .output()
.map_err(|e| CollectorError::SystemRead { .map_err(|e| CollectorError::SystemRead {
path: format!("WorkingDirectory for {}", service_name), path: format!("WorkingDirectory for {}", service_name),
@@ -437,7 +441,7 @@ impl SystemdCollector {
if line.starts_with("WorkingDirectory=") && !line.contains("[not set]") { if line.starts_with("WorkingDirectory=") && !line.contains("[not set]") {
let dir = line.strip_prefix("WorkingDirectory=").unwrap_or(""); let dir = line.strip_prefix("WorkingDirectory=").unwrap_or("");
if !dir.is_empty() && dir != "/" { if !dir.is_empty() && dir != "/" {
return Ok(self.get_directory_size(dir).unwrap_or(0.0)); return Ok(self.get_directory_size(dir).await.unwrap_or(0.0));
} }
} }
} }
@@ -445,18 +449,23 @@ impl SystemdCollector {
Ok(0.0) Ok(0.0)
} }
/// Get size of a directory in GB /// Get size of a directory in GB (with 5 second timeout)
fn get_directory_size(&self, path: &str) -> Option<f32> { async fn get_directory_size(&self, path: &str) -> Option<f32> {
let output = Command::new("sudo") use super::run_command_with_timeout;
.args(&["du", "-sb", path])
.output() // Use -s (summary) and --apparent-size for speed, 5 second timeout
.ok()?; let mut cmd = Command::new("sudo");
cmd.args(&["du", "-s", "--apparent-size", "--block-size=1", path]);
let output = run_command_with_timeout(cmd, 5).await.ok()?;
if !output.status.success() { if !output.status.success() {
// Log permission errors for debugging but don't spam logs // Log permission errors for debugging but don't spam logs
let stderr = String::from_utf8_lossy(&output.stderr); let stderr = String::from_utf8_lossy(&output.stderr);
if stderr.contains("Permission denied") { if stderr.contains("Permission denied") {
debug!("Permission denied accessing directory: {}", path); debug!("Permission denied accessing directory: {}", path);
} else if stderr.contains("timed out") {
warn!("Directory size check timed out for {}", path);
} else { } else {
debug!("Failed to get size for directory {}: {}", path, stderr); debug!("Failed to get size for directory {}: {}", path, stderr);
} }
@@ -774,9 +783,9 @@ impl SystemdCollector {
let mut containers = Vec::new(); let mut containers = Vec::new();
// Check if docker is available (cm-agent user is in docker group) // Check if docker is available (cm-agent user is in docker group)
// Use -a to show ALL containers (running and stopped) // Use -a to show ALL containers (running and stopped) with 5 second timeout
let output = Command::new("docker") let output = Command::new("timeout")
.args(&["ps", "-a", "--format", "{{.Names}},{{.Status}}"]) .args(&["5", "docker", "ps", "-a", "--format", "{{.Names}},{{.Status}}"])
.output(); .output();
let output = match output { let output = match output {
@@ -815,11 +824,11 @@ impl SystemdCollector {
} }
/// Get docker images as sub-services /// Get docker images as sub-services
fn get_docker_images(&self) -> Vec<(String, String, String)> { fn get_docker_images(&self) -> Vec<(String, String, String, f32)> {
let mut images = Vec::new(); let mut images = Vec::new();
// Check if docker is available (cm-agent user is in docker group) // Check if docker is available (cm-agent user is in docker group) with 5 second timeout
let output = Command::new("docker") let output = Command::new("timeout")
.args(&["images", "--format", "{{.Repository}}:{{.Tag}},{{.Size}}"]) .args(&["5", "docker", "images", "--format", "{{.Repository}}:{{.Tag}},{{.Size}}"])
.output(); .output();
let output = match output { let output = match output {
@@ -845,23 +854,55 @@ impl SystemdCollector {
let parts: Vec<&str> = line.split(',').collect(); let parts: Vec<&str> = line.split(',').collect();
if parts.len() >= 2 { if parts.len() >= 2 {
let image_name = parts[0].trim(); let image_name = parts[0].trim();
let size = parts[1].trim(); let size_str = parts[1].trim();
// Skip <none>:<none> images (dangling images) // Skip <none>:<none> images (dangling images)
if image_name.contains("<none>") { if image_name.contains("<none>") {
continue; continue;
} }
// Parse size to MB (sizes come as "142MB", "1.5GB", "512kB", etc.)
let size_mb = self.parse_docker_size(size_str);
images.push(( images.push((
format!("image_{}", image_name), format!("image_{}", image_name),
"active".to_string(), // Images are always "active" (present) "active".to_string(), // Images are always "active" (present)
size.to_string() size_str.to_string(),
size_mb
)); ));
} }
} }
images images
} }
/// Parse Docker size string to MB
///
/// Accepts the human-readable sizes printed by `docker images`, such as
/// `"142MB"`, `"1.5GB"`, `"512kB"` or `"0B"`. Unit matching is
/// case-insensitive and whitespace around the number and the unit is ignored.
///
/// Docker reports sizes in decimal (base 1000) units, so the conversion uses
/// factors of 1000. A bare number or a `B` suffix is interpreted as bytes.
///
/// Returns `0.0` scaled by the unit when the numeric part cannot be parsed.
/// An unrecognised unit leaves the value unscaled, i.e. it is treated as MB.
fn parse_docker_size(&self, size_str: &str) -> f32 {
    let trimmed = size_str.trim();

    // Split at the first character that cannot belong to the number.
    let split_at = trimmed
        .find(|c: char| !(c.is_ascii_digit() || c == '.'))
        .unwrap_or(trimmed.len());
    let (number, unit) = trimmed.split_at(split_at);

    let value: f32 = number.parse().unwrap_or(0.0);

    // Convert to MB
    match unit.trim().to_ascii_uppercase().as_str() {
        "" | "B" => value / 1_000_000.0,
        "KB" | "K" => value / 1_000.0,
        "MB" | "M" => value,
        "GB" | "G" => value * 1_000.0,
        "TB" | "T" => value * 1_000_000.0,
        "PB" | "P" => value * 1_000_000_000.0,
        _ => value, // Unknown unit: keep the number as-is
    }
}
} }
#[async_trait] #[async_trait]

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "cm-dashboard" name = "cm-dashboard"
version = "0.1.181" version = "0.1.187"
edition = "2021" edition = "2021"
[dependencies] [dependencies]

View File

@@ -215,7 +215,7 @@ impl Dashboard {
// Update TUI with new metrics (only if not headless) // Update TUI with new metrics (only if not headless)
if let Some(ref mut tui_app) = self.tui_app { if let Some(ref mut tui_app) = self.tui_app {
tui_app.update_metrics(&self.metric_store); tui_app.update_metrics(&mut self.metric_store);
} }
} }

View File

@@ -5,6 +5,14 @@ use tracing::{debug, info, warn};
use super::MetricDataPoint; use super::MetricDataPoint;
/// ZMQ communication statistics per host
///
/// One entry is kept per hostname in `MetricStore::zmq_stats`; callers receive
/// cloned snapshots of it.
#[derive(Debug, Clone)]
pub struct ZmqStats {
/// Number of packets counted for this host (incremented by one per update).
pub packets_received: u64,
/// Instant at which the most recent packet was recorded.
pub last_packet_time: Instant,
/// Seconds elapsed since `last_packet_time`; reset to 0.0 when a packet
/// arrives and recomputed when the stats are queried.
pub last_packet_age_secs: f64,
}
/// Central metric storage for the dashboard /// Central metric storage for the dashboard
pub struct MetricStore { pub struct MetricStore {
/// Current structured data: hostname -> AgentData /// Current structured data: hostname -> AgentData
@@ -13,6 +21,8 @@ pub struct MetricStore {
historical_metrics: HashMap<String, Vec<MetricDataPoint>>, historical_metrics: HashMap<String, Vec<MetricDataPoint>>,
/// Last heartbeat timestamp per host /// Last heartbeat timestamp per host
last_heartbeat: HashMap<String, Instant>, last_heartbeat: HashMap<String, Instant>,
/// ZMQ communication statistics per host
zmq_stats: HashMap<String, ZmqStats>,
/// Configuration /// Configuration
max_metrics_per_host: usize, max_metrics_per_host: usize,
history_retention: Duration, history_retention: Duration,
@@ -24,6 +34,7 @@ impl MetricStore {
current_agent_data: HashMap::new(), current_agent_data: HashMap::new(),
historical_metrics: HashMap::new(), historical_metrics: HashMap::new(),
last_heartbeat: HashMap::new(), last_heartbeat: HashMap::new(),
zmq_stats: HashMap::new(),
max_metrics_per_host, max_metrics_per_host,
history_retention: Duration::from_secs(history_retention_hours * 3600), history_retention: Duration::from_secs(history_retention_hours * 3600),
} }
@@ -44,6 +55,16 @@ impl MetricStore {
self.last_heartbeat.insert(hostname.clone(), now); self.last_heartbeat.insert(hostname.clone(), now);
debug!("Updated heartbeat for host {}", hostname); debug!("Updated heartbeat for host {}", hostname);
// Update ZMQ stats
let stats = self.zmq_stats.entry(hostname.clone()).or_insert(ZmqStats {
packets_received: 0,
last_packet_time: now,
last_packet_age_secs: 0.0,
});
stats.packets_received += 1;
stats.last_packet_time = now;
stats.last_packet_age_secs = 0.0; // Just received
// Add to history // Add to history
let host_history = self let host_history = self
.historical_metrics .historical_metrics
@@ -65,6 +86,15 @@ impl MetricStore {
self.current_agent_data.get(hostname) self.current_agent_data.get(hostname)
} }
/// Look up the ZMQ communication statistics recorded for `hostname`.
///
/// The stored `last_packet_age_secs` is refreshed against the current time
/// before a cloned snapshot is handed back, which is why this takes
/// `&mut self`. Returns `None` when no statistics exist for the host.
pub fn get_zmq_stats(&mut self, hostname: &str) -> Option<ZmqStats> {
    let checked_at = Instant::now();
    let entry = self.zmq_stats.get_mut(hostname)?;

    // Refresh the cached packet age so the snapshot reflects "now".
    entry.last_packet_age_secs = checked_at
        .duration_since(entry.last_packet_time)
        .as_secs_f64();

    Some(entry.clone())
}
/// Get connected hosts (hosts with recent heartbeats) /// Get connected hosts (hosts with recent heartbeats)
pub fn get_connected_hosts(&self, timeout: Duration) -> Vec<String> { pub fn get_connected_hosts(&self, timeout: Duration) -> Vec<String> {

View File

@@ -100,7 +100,7 @@ impl TuiApp {
} }
/// Update widgets with structured data from store (only for current host) /// Update widgets with structured data from store (only for current host)
pub fn update_metrics(&mut self, metric_store: &MetricStore) { pub fn update_metrics(&mut self, metric_store: &mut MetricStore) {
if let Some(hostname) = self.current_host.clone() { if let Some(hostname) = self.current_host.clone() {
// Get structured data for this host // Get structured data for this host
if let Some(agent_data) = metric_store.get_agent_data(&hostname) { if let Some(agent_data) = metric_store.get_agent_data(&hostname) {
@@ -110,6 +110,14 @@ impl TuiApp {
host_widgets.system_widget.update_from_agent_data(agent_data); host_widgets.system_widget.update_from_agent_data(agent_data);
host_widgets.services_widget.update_from_agent_data(agent_data); host_widgets.services_widget.update_from_agent_data(agent_data);
// Update ZMQ stats
if let Some(zmq_stats) = metric_store.get_zmq_stats(&hostname) {
host_widgets.system_widget.update_zmq_stats(
zmq_stats.packets_received,
zmq_stats.last_packet_age_secs
);
}
host_widgets.last_update = Some(Instant::now()); host_widgets.last_update = Some(Instant::now());
} }
} }

View File

@@ -15,6 +15,10 @@ pub struct SystemWidget {
nixos_build: Option<String>, nixos_build: Option<String>,
agent_hash: Option<String>, agent_hash: Option<String>,
// ZMQ communication stats
zmq_packets_received: Option<u64>,
zmq_last_packet_age: Option<f64>,
// Network interfaces // Network interfaces
network_interfaces: Vec<cm_dashboard_shared::NetworkInterfaceData>, network_interfaces: Vec<cm_dashboard_shared::NetworkInterfaceData>,
@@ -92,6 +96,8 @@ impl SystemWidget {
Self { Self {
nixos_build: None, nixos_build: None,
agent_hash: None, agent_hash: None,
zmq_packets_received: None,
zmq_last_packet_age: None,
network_interfaces: Vec::new(), network_interfaces: Vec::new(),
cpu_load_1min: None, cpu_load_1min: None,
cpu_load_5min: None, cpu_load_5min: None,
@@ -154,6 +160,12 @@ impl SystemWidget {
pub fn _get_agent_hash(&self) -> Option<&String> { pub fn _get_agent_hash(&self) -> Option<&String> {
self.agent_hash.as_ref() self.agent_hash.as_ref()
} }
/// Store the latest ZMQ communication figures on this widget.
///
/// Both values are wrapped in `Some`, replacing whatever was held before:
/// `packets_received` is the packet count and `last_packet_age_secs` is the
/// age of the most recent packet in seconds.
pub fn update_zmq_stats(&mut self, packets_received: u64, last_packet_age_secs: f64) {
    let age = Some(last_packet_age_secs);
    let count = Some(packets_received);

    self.zmq_last_packet_age = age;
    self.zmq_packets_received = count;
}
} }
use super::Widget; use super::Widget;
@@ -796,6 +808,18 @@ impl SystemWidget {
Span::styled(format!("Agent: {}", agent_version_text), Typography::secondary()) Span::styled(format!("Agent: {}", agent_version_text), Typography::secondary())
])); ]));
// ZMQ communication stats
if let (Some(packets), Some(age)) = (self.zmq_packets_received, self.zmq_last_packet_age) {
let age_text = if age < 1.0 {
format!("{:.0}ms ago", age * 1000.0)
} else {
format!("{:.1}s ago", age)
};
lines.push(Line::from(vec![
Span::styled(format!("ZMQ: {} pkts, last {}", packets, age_text), Typography::secondary())
]));
}
// CPU section // CPU section
lines.push(Line::from(vec![ lines.push(Line::from(vec![
Span::styled("CPU:", Typography::widget_title()) Span::styled("CPU:", Typography::widget_title())

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "cm-dashboard-shared" name = "cm-dashboard-shared"
version = "0.1.181" version = "0.1.187"
edition = "2021" edition = "2021"
[dependencies] [dependencies]