Compare commits

22 Commits

Author SHA1 Message Date
2740de9b54 Implement cached collector architecture with configurable timeouts
All checks were successful
Build and Release / build-and-release (push) Successful in 1m20s
Major architectural refactor to eliminate false "host offline" alerts:

- Replace sequential blocking collectors with independent async tasks
- Each collector runs at configurable interval and updates shared cache
- ZMQ sender reads cache every 1-2s regardless of collector speed
- Collector intervals: CPU/Memory (1-10s), Backup/NixOS (30-60s), Disk/Systemd (60-300s)

All intervals now configurable via NixOS config:
- collectors.*.interval_seconds (collection frequency per collector)
- collectors.*.command_timeout_seconds (timeout for shell commands)
- notifications.check_interval_seconds (status change detection rate)

Command timeouts increased from hardcoded 2-3s to configurable 10-30s:
- Disk collector: 30s (SMART operations, lsblk)
- Systemd collector: 15s (systemctl, docker, du commands)
- Network collector: 10s (ip route, ip addr)

Benefits:
- No false "offline" alerts when slow collectors take >10s
- Different update rates for different metric types
- Better resource management with longer timeouts
- Full NixOS configuration control

Bump version to v0.1.193
2025-11-27 22:37:20 +01:00
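In miniature, the cache-and-send split this commit describes looks like the sketch below. It is illustrative only: `AgentData` is the shared crate's type (the diff further down confirms it is `Clone`), and `publish_via_zmq` is a placeholder for the real ZMQ handler.

```rust
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::time::{interval, Duration};

// Sketch: the ZMQ sender only ever reads the shared cache, so a slow collector
// can never delay the heartbeat; 2s matches collection_interval_seconds above.
async fn zmq_sender_task(cache: Arc<RwLock<AgentData>>) {
    let mut tick = interval(Duration::from_secs(2));
    loop {
        tick.tick().await;
        // Take a snapshot under a short read lock, then send outside the lock.
        let snapshot = cache.read().await.clone();
        publish_via_zmq(&snapshot); // placeholder for the real publish call
    }
}
```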
37f2650200 Document cached collector architecture plan
Add architectural plan for separating ZMQ sending from data collection to prevent false 'host offline' alerts caused by slow collectors.

Key concepts:
- Shared cache (Arc<RwLock<AgentData>>)
- Independent async collector tasks with different update rates
- ZMQ sender always sends every 1s from cache
- Fast collectors (1s), medium (5s), slow (60s)
- No blocking regardless of collector speed
2025-11-27 21:49:44 +01:00
833010e270 Bump version to v0.1.192
All checks were successful
Build and Release / build-and-release (push) Successful in 1m8s
2025-11-27 18:34:53 +01:00
549d9d1c72 Replace whale emoji with ASCII 'D' for performance
Emoji rendering in terminals can be very slow, especially in the hot path (every frame, for every Docker image). The whale emoji 🐋 was causing significant rendering delays.

Temporary change to ASCII 'D' to test whether the emoji was the performance issue.
2025-11-27 18:34:27 +01:00
9b84b70581 Bump version to v0.1.191
All checks were successful
Build and Release / build-and-release (push) Successful in 1m8s
2025-11-27 18:16:49 +01:00
92c3ee3f2a Add Docker whale icon for docker images
Docker images now display with a distinctive 🐋 whale icon in blue (the highlight color) instead of status icons. This clearly identifies them as Docker images without implying operational status.
2025-11-27 18:16:33 +01:00
1be55f765d Bump version to v0.1.190
All checks were successful
Build and Release / build-and-release (push) Successful in 1m9s
2025-11-27 18:09:49 +01:00
2f94a4b853 Add service_type field to separate data from presentation
Changes:
- Add service_type field to SubServiceData: 'nginx_site', 'container', 'image'
- Agent sends pure data without display formatting
- Dashboard checks service_type to decide presentation
- Docker images now display without status icon (service_type='image')
- Remove unused image_size_str from docker images tuple

Clean separation: agent provides data, dashboard handles display logic.
2025-11-27 18:09:20 +01:00
ff2b43827a Bump version to v0.1.189
All checks were successful
Build and Release / build-and-release (push) Successful in 1m19s
2025-11-27 17:57:38 +01:00
fac0188c6f Change docker image display format and status
Changes:
- Rename docker images from 'image_node:18...' to 'I node:18...' for conciseness
- Change image status from 'active' to 'inactive' for neutral informational display
- Images now show with gray empty circle ○ instead of green filled circle ●

Docker images are static artifacts without meaningful operational status, so using the inactive status provides a neutral gray display that won't trigger alerts or affect service status aggregation.
2025-11-27 17:57:24 +01:00
6bb350f016 Bump version to v0.1.188
All checks were successful
Build and Release / build-and-release (push) Successful in 1m8s
2025-11-27 16:39:46 +01:00
374b126446 Reduce all command timeouts to 2-3 seconds max
With a 10-second host heartbeat timeout, all command timeouts must be significantly lower to keep total collection time under 10 seconds.

Changed timeouts:
- smartctl: 10s → 3s (critical: multiple drives queried sequentially)
- du: 5s → 2s
- lsblk: 5s → 2s
- systemctl list commands: 5s → 3s
- systemctl show/is-active: 3s → 2s
- docker commands: 5s → 3s
- df, ip commands: 3s → 2s

Total worst-case collection time is now capped at a more reasonable level, preventing blocking operations from causing false host-offline alerts.
2025-11-27 16:38:54 +01:00
76c04633b5 Bump version to v0.1.187
All checks were successful
Build and Release / build-and-release (push) Successful in 1m19s
2025-11-27 16:34:42 +01:00
1e0510be81 Add comprehensive timeouts to all blocking system commands
Fixes random host disconnections caused by blocking operations that prevent timely ZMQ packet transmission.

Changes:
- Add run_command_with_timeout() wrapper using tokio for async command execution
- Apply 10s timeout to smartctl (prevents 30+ second hangs on failing drives)
- Apply 5s timeout to du, lsblk, systemctl list commands
- Apply 3s timeout to systemctl show/is-active, df, ip commands
- Apply 2s timeout to hostname command
- Use system 'timeout' command for sync operations where async not needed

Critical fixes:
- smartctl: Failing drives could block for 30+ seconds per drive
- du: Large directories (Docker, PostgreSQL) could block 10-30+ seconds
- systemctl/docker: Commands could block indefinitely during system issues

With a 1-second collection interval and a 10-second heartbeat timeout, any blocking operation >10s causes false "host offline" alerts. These timeouts ensure collection completes quickly even during system degradation.
2025-11-27 16:34:08 +01:00
9a2df906ea Add ZMQ communication statistics tracking and display
All checks were successful
Build and Release / build-and-release (push) Successful in 1m10s
2025-11-27 16:14:45 +01:00
6d6beb207d Parse Docker image sizes to MB and sort services alphabetically
All checks were successful
Build and Release / build-and-release (push) Successful in 1m18s
2025-11-27 15:57:38 +01:00
7a68da01f5 Remove debug logging for NVMe SMART collection
All checks were successful
Build and Release / build-and-release (push) Successful in 1m9s
2025-11-27 15:40:16 +01:00
5be67fed64 Add debug logging for NVMe SMART data collection
All checks were successful
Build and Release / build-and-release (push) Successful in 1m19s
2025-11-27 15:00:48 +01:00
cac836601b Add NVMe device type flag for SMART data collection
All checks were successful
Build and Release / build-and-release (push) Successful in 1m19s
2025-11-27 13:34:30 +01:00
bd22ce265b Use direct smartctl with CAP_SYS_RAWIO instead of sudo
All checks were successful
Build and Release / build-and-release (push) Successful in 1m9s
2025-11-27 13:22:13 +01:00
bbc8b7b1cb Add info-level logging for SMART data collection debugging
All checks were successful
Build and Release / build-and-release (push) Successful in 1m19s
2025-11-27 13:15:53 +01:00
5dd8cadef3 Remove debug logging from Docker collection code
All checks were successful
Build and Release / build-and-release (push) Successful in 1m19s
2025-11-27 12:50:20 +01:00
19 changed files with 519 additions and 187 deletions

View File

@@ -156,6 +156,86 @@ Complete migration from string-based metrics to structured JSON data. Eliminates
 - ✅ Backward compatibility via bridge conversion to existing UI widgets
 - ✅ All string parsing bugs eliminated
+
+### Cached Collector Architecture (✅ IMPLEMENTED)
+
+**Problem:** Blocking collectors prevent timely ZMQ transmission, causing false "host offline" alerts.
+
+**Previous (Sequential Blocking):**
+```
+Every 1 second:
+└─ collect_all_data() [BLOCKS for 2-10+ seconds]
+   ├─ CPU (fast: 10ms)
+   ├─ Memory (fast: 20ms)
+   ├─ Disk SMART (slow: 3s per drive × 4 drives = 12s)
+   ├─ Service disk usage (slow: 2-8s per service)
+   └─ Docker (medium: 500ms)
+└─ send_via_zmq() [Only after ALL collection completes]
+
+Result: If any collector takes >10s → "host offline" false alert
+```
+
+**New (Cached Independent Collectors):**
+```
+Shared Cache: Arc<RwLock<AgentData>>
+
+Background Collectors (independent async tasks):
+├─ Fast collectors (CPU, RAM, Network)
+│  └─ Update cache every 1 second
+├─ Medium collectors (Services, Docker)
+│  └─ Update cache every 5 seconds
+└─ Slow collectors (Disk usage, SMART data)
+   └─ Update cache every 60 seconds
+
+ZMQ Sender (separate async task):
+Every 1 second:
+└─ Read current cache
+└─ Send via ZMQ [Always instant, never blocked]
+```
+
+**Benefits:**
+- ✅ ZMQ sends every 1 second regardless of collector speed
+- ✅ No false "host offline" alerts from slow collectors
+- ✅ Different update rates for different metrics (CPU=1s, SMART=60s)
+- ✅ System stays responsive even with slow operations
+- ✅ Slow collectors can use longer timeouts without blocking
+
+**Implementation Details:**
+- **Shared cache**: `Arc<RwLock<AgentData>>` initialized at agent startup
+- **Collector intervals**: Fully configurable via NixOS config (`interval_seconds` per collector)
+  - Recommended: Fast (1-10s): CPU, Memory, Network
+  - Recommended: Medium (30-60s): Backup, NixOS
+  - Recommended: Slow (60-300s): Disk, Systemd
+- **Independent tasks**: Each collector spawned as a separate tokio task in `Agent::new()`
+- **Cache updates**: Collectors acquire write lock → update → release immediately
+- **ZMQ sender**: Main loop reads cache every `collection_interval_seconds` and broadcasts
+- **Notification check**: Runs every `notifications.check_interval_seconds`
+- **Lock strategy**: Short-lived write locks prevent blocking, read locks for transmission
+- **Stale data**: Acceptable for slow-changing metrics (SMART data, disk usage)
+
+**Configuration (NixOS):**
+
+All intervals and timeouts are configurable in `services/cm-dashboard.nix`:
+
+Collection Intervals:
+- `collectors.cpu.interval_seconds` (default: 10s)
+- `collectors.memory.interval_seconds` (default: 2s)
+- `collectors.disk.interval_seconds` (default: 300s)
+- `collectors.systemd.interval_seconds` (default: 10s)
+- `collectors.backup.interval_seconds` (default: 60s)
+- `collectors.network.interval_seconds` (default: 10s)
+- `collectors.nixos.interval_seconds` (default: 60s)
+- `notifications.check_interval_seconds` (default: 30s)
+- `collection_interval_seconds` - ZMQ transmission rate (default: 2s)
+
+Command Timeouts (prevent resource leaks from hung commands):
+- `collectors.disk.command_timeout_seconds` (default: 30s) - lsblk, smartctl, etc.
+- `collectors.systemd.command_timeout_seconds` (default: 15s) - systemctl, docker, du
+- `collectors.network.command_timeout_seconds` (default: 10s) - ip route, ip addr
+
+**Code Locations:**
+- agent/src/agent.rs:59-133 - Collector task spawning
+- agent/src/agent.rs:151-179 - Independent collector task runner
+- agent/src/agent.rs:199-207 - ZMQ sender in main loop
+
 ### Maintenance Mode
 - Agent checks for `/tmp/cm-maintenance` file before sending notifications
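The lock strategy described above, reduced to a minimal sketch (assuming the crate's `Collector` trait and shared `AgentData` cache; not the repository's exact code):

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

// Sketch: one collector tick with a scoped write lock. The lock spans only the
// in-place update, so the ZMQ sender's read locks proceed between ticks.
async fn collector_tick(cache: &Arc<RwLock<AgentData>>, collector: &impl Collector) {
    {
        let mut data = cache.write().await;                     // acquire
        let _ = collector.collect_structured(&mut *data).await; // update
    }                                                           // release immediately
}
```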

Cargo.lock (generated)
View File

@@ -279,7 +279,7 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"
 [[package]]
 name = "cm-dashboard"
-version = "0.1.175"
+version = "0.1.192"
 dependencies = [
  "anyhow",
  "chrono",
@@ -301,7 +301,7 @@ dependencies = [
 [[package]]
 name = "cm-dashboard-agent"
-version = "0.1.175"
+version = "0.1.192"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -324,7 +324,7 @@ dependencies = [
 [[package]]
 name = "cm-dashboard-shared"
-version = "0.1.175"
+version = "0.1.192"
 dependencies = [
  "chrono",
  "serde",

View File

@@ -1,6 +1,6 @@
 [package]
 name = "cm-dashboard-agent"
-version = "0.1.178"
+version = "0.1.193"
 edition = "2021"
 
 [dependencies]

View File

@@ -1,13 +1,14 @@
 use anyhow::Result;
 use gethostname::gethostname;
+use std::sync::Arc;
 use std::time::Duration;
+use tokio::sync::RwLock;
 use tokio::time::interval;
 use tracing::{debug, error, info};
 
 use crate::communication::{AgentCommand, ZmqHandler};
 use crate::config::AgentConfig;
 use crate::collectors::{
-    Collector,
     backup::BackupCollector,
     cpu::CpuCollector,
     disk::DiskCollector,
@@ -23,7 +24,7 @@ pub struct Agent {
     hostname: String,
     config: AgentConfig,
     zmq_handler: ZmqHandler,
-    collectors: Vec<Box<dyn Collector>>,
+    cache: Arc<RwLock<AgentData>>,
     notification_manager: NotificationManager,
     previous_status: Option<SystemStatus>,
 }
@@ -55,39 +56,94 @@ impl Agent {
             config.zmq.publisher_port
         );
 
-        // Initialize collectors
-        let mut collectors: Vec<Box<dyn Collector>> = Vec::new();
-
-        // Add enabled collectors
+        // Initialize shared cache
+        let cache = Arc::new(RwLock::new(AgentData::new(
+            hostname.clone(),
+            env!("CARGO_PKG_VERSION").to_string()
+        )));
+        info!("Initialized shared agent data cache");
+
+        // Spawn independent collector tasks
+        let mut collector_count = 0;
+
+        // CPU collector
         if config.collectors.cpu.enabled {
-            collectors.push(Box::new(CpuCollector::new(config.collectors.cpu.clone())));
+            let cache_clone = cache.clone();
+            let collector = CpuCollector::new(config.collectors.cpu.clone());
+            let interval = config.collectors.cpu.interval_seconds;
+            tokio::spawn(async move {
+                Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "CPU").await;
+            });
+            collector_count += 1;
         }
 
+        // Memory collector
         if config.collectors.memory.enabled {
-            collectors.push(Box::new(MemoryCollector::new(config.collectors.memory.clone())));
-        }
-
-        if config.collectors.disk.enabled {
-            collectors.push(Box::new(DiskCollector::new(config.collectors.disk.clone())));
-        }
-
-        if config.collectors.systemd.enabled {
-            collectors.push(Box::new(SystemdCollector::new(config.collectors.systemd.clone())));
-        }
-
-        if config.collectors.backup.enabled {
-            collectors.push(Box::new(BackupCollector::new()));
+            let cache_clone = cache.clone();
+            let collector = MemoryCollector::new(config.collectors.memory.clone());
+            let interval = config.collectors.memory.interval_seconds;
+            tokio::spawn(async move {
+                Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "Memory").await;
+            });
+            collector_count += 1;
        }
 
+        // Network collector
         if config.collectors.network.enabled {
-            collectors.push(Box::new(NetworkCollector::new(config.collectors.network.clone())));
+            let cache_clone = cache.clone();
+            let collector = NetworkCollector::new(config.collectors.network.clone());
+            let interval = config.collectors.network.interval_seconds;
+            tokio::spawn(async move {
+                Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "Network").await;
+            });
+            collector_count += 1;
         }
 
+        // Backup collector
+        if config.collectors.backup.enabled {
+            let cache_clone = cache.clone();
+            let collector = BackupCollector::new();
+            let interval = config.collectors.backup.interval_seconds;
+            tokio::spawn(async move {
+                Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "Backup").await;
+            });
+            collector_count += 1;
+        }
+
+        // NixOS collector
         if config.collectors.nixos.enabled {
-            collectors.push(Box::new(NixOSCollector::new(config.collectors.nixos.clone())));
+            let cache_clone = cache.clone();
+            let collector = NixOSCollector::new(config.collectors.nixos.clone());
+            let interval = config.collectors.nixos.interval_seconds;
+            tokio::spawn(async move {
+                Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "NixOS").await;
+            });
+            collector_count += 1;
         }
 
-        info!("Initialized {} collectors", collectors.len());
+        // Disk collector
+        if config.collectors.disk.enabled {
+            let cache_clone = cache.clone();
+            let collector = DiskCollector::new(config.collectors.disk.clone());
+            let interval = config.collectors.disk.interval_seconds;
+            tokio::spawn(async move {
+                Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "Disk").await;
+            });
+            collector_count += 1;
+        }
+
+        // Systemd collector
+        if config.collectors.systemd.enabled {
+            let cache_clone = cache.clone();
+            let collector = SystemdCollector::new(config.collectors.systemd.clone());
+            let interval = config.collectors.systemd.interval_seconds;
+            tokio::spawn(async move {
+                Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "Systemd").await;
+            });
+            collector_count += 1;
+        }
+
+        info!("Spawned {} independent collector tasks", collector_count);
 
         // Initialize notification manager
         let notification_manager = NotificationManager::new(&config.notifications, &hostname)?;
@@ -97,45 +153,79 @@ impl Agent {
             hostname,
             config,
             zmq_handler,
-            collectors,
+            cache,
             notification_manager,
             previous_status: None,
         })
     }
 
-    /// Main agent loop with structured data collection
-    pub async fn run(&mut self, mut shutdown_rx: tokio::sync::oneshot::Receiver<()>) -> Result<()> {
-        info!("Starting agent main loop");
+    /// Independent collector task runner
+    async fn run_collector_task<C>(
+        cache: Arc<RwLock<AgentData>>,
+        collector: C,
+        interval_duration: Duration,
+        name: &str,
+    ) where
+        C: crate::collectors::Collector + Send + 'static,
+    {
+        let mut interval_timer = interval(interval_duration);
+        info!("{} collector task started (interval: {:?})", name, interval_duration);
 
-        // Initial collection
-        if let Err(e) = self.collect_and_broadcast().await {
-            error!("Initial metric collection failed: {}", e);
+        loop {
+            interval_timer.tick().await;
+
+            // Acquire write lock and update cache
+            {
+                let mut agent_data = cache.write().await;
+                match collector.collect_structured(&mut *agent_data).await {
+                    Ok(_) => {
+                        debug!("{} collector updated cache", name);
+                    }
+                    Err(e) => {
+                        error!("{} collector failed: {}", name, e);
+                    }
+                }
+            } // Release lock immediately after collection
        }
+    }
 
-        // Set up intervals
+    /// Main agent loop with cached data architecture
+    pub async fn run(&mut self, mut shutdown_rx: tokio::sync::oneshot::Receiver<()>) -> Result<()> {
+        info!("Starting agent main loop with cached collector architecture");
+
+        // Set up intervals from config
         let mut transmission_interval = interval(Duration::from_secs(
             self.config.collection_interval_seconds,
         ));
-        let mut notification_interval = interval(Duration::from_secs(30)); // Check notifications every 30s
+        let mut notification_interval = interval(Duration::from_secs(
+            self.config.notifications.check_interval_seconds,
+        ));
+        let mut command_interval = interval(Duration::from_millis(100));
 
-        // Skip initial ticks to avoid immediate execution
+        // Skip initial ticks
         transmission_interval.tick().await;
         notification_interval.tick().await;
+        command_interval.tick().await;
 
         loop {
             tokio::select! {
                 _ = transmission_interval.tick() => {
-                    if let Err(e) = self.collect_and_broadcast().await {
-                        error!("Failed to collect and broadcast metrics: {}", e);
+                    // Read current cache state and broadcast via ZMQ
+                    let agent_data = self.cache.read().await.clone();
+                    if let Err(e) = self.zmq_handler.publish_agent_data(&agent_data).await {
+                        error!("Failed to broadcast agent data: {}", e);
+                    } else {
+                        debug!("Successfully broadcast agent data");
                    }
                 }
                 _ = notification_interval.tick() => {
-                    // Process any pending notifications
-                    // NOTE: With structured data, we might need to implement status tracking differently
-                    // For now, we skip this until status evaluation is migrated
+                    // Read cache and check for status changes
+                    let agent_data = self.cache.read().await.clone();
+                    if let Err(e) = self.check_status_changes_and_notify(&agent_data).await {
+                        error!("Failed to check status changes: {}", e);
+                    }
                 }
-                // Handle incoming commands (check periodically)
-                _ = tokio::time::sleep(Duration::from_millis(100)) => {
+                _ = command_interval.tick() => {
                     if let Err(e) = self.handle_commands().await {
                         error!("Error handling commands: {}", e);
                     }
@@ -151,35 +241,6 @@ impl Agent {
         Ok(())
     }
 
-    /// Collect structured data from all collectors and broadcast via ZMQ
-    async fn collect_and_broadcast(&mut self) -> Result<()> {
-        debug!("Starting structured data collection");
-
-        // Initialize empty AgentData
-        let mut agent_data = AgentData::new(self.hostname.clone(), env!("CARGO_PKG_VERSION").to_string());
-
-        // Collect data from all collectors
-        for collector in &self.collectors {
-            if let Err(e) = collector.collect_structured(&mut agent_data).await {
-                error!("Collector failed: {}", e);
-                // Continue with other collectors even if one fails
-            }
-        }
-
-        // Check for status changes and send notifications
-        if let Err(e) = self.check_status_changes_and_notify(&agent_data).await {
-            error!("Failed to check status changes: {}", e);
-        }
-
-        // Broadcast the structured data via ZMQ
-        if let Err(e) = self.zmq_handler.publish_agent_data(&agent_data).await {
-            error!("Failed to broadcast agent data: {}", e);
-        } else {
-            debug!("Successfully broadcast structured agent data");
-        }
-
-        Ok(())
-    }
-
     /// Check for status changes and send notifications
     async fn check_status_changes_and_notify(&mut self, agent_data: &AgentData) -> Result<()> {
@@ -267,9 +328,12 @@ impl Agent {
             match command {
                 AgentCommand::CollectNow => {
-                    info!("Received immediate collection request");
-                    if let Err(e) = self.collect_and_broadcast().await {
-                        error!("Failed to collect on demand: {}", e);
+                    info!("Received immediate transmission request");
+                    // With cached architecture, collectors run independently
+                    // Just send current cache state immediately
+                    let agent_data = self.cache.read().await.clone();
+                    if let Err(e) = self.zmq_handler.publish_agent_data(&agent_data).await {
+                        error!("Failed to broadcast on demand: {}", e);
                     }
                 }
                 AgentCommand::SetInterval { seconds } => {

View File

@@ -112,9 +112,12 @@ impl DiskCollector {
     /// Get block devices and their mount points using lsblk
     async fn get_mount_devices(&self) -> Result<HashMap<String, String>, CollectorError> {
-        let output = Command::new("lsblk")
-            .args(&["-rn", "-o", "NAME,MOUNTPOINT"])
-            .output()
+        use super::run_command_with_timeout;
+
+        let mut cmd = Command::new("lsblk");
+        cmd.args(&["-rn", "-o", "NAME,MOUNTPOINT"]);
+        let output = run_command_with_timeout(cmd, self.config.command_timeout_seconds).await
             .map_err(|e| CollectorError::SystemRead {
                 path: "block devices".to_string(),
                 error: e.to_string(),
@@ -186,8 +189,8 @@ impl DiskCollector {
     /// Get filesystem info for a single mount point
     fn get_filesystem_info(&self, mount_point: &str) -> Result<(u64, u64), CollectorError> {
-        let output = Command::new("df")
-            .args(&["--block-size=1", mount_point])
+        let output = std::process::Command::new("timeout")
+            .args(&["2", "df", "--block-size=1", mount_point])
             .output()
             .map_err(|e| CollectorError::SystemRead {
                 path: format!("df {}", mount_point),
@@ -386,7 +389,7 @@ impl DiskCollector {
     /// Get SMART data for drives
     async fn get_smart_data_for_drives(&self, physical_drives: &[PhysicalDrive], mergerfs_pools: &[MergerfsPool]) -> HashMap<String, SmartData> {
         let mut smart_data = HashMap::new();
 
         // Collect all drive names
         let mut all_drives = std::collections::HashSet::new();
         for drive in physical_drives {
@@ -413,23 +416,26 @@ impl DiskCollector {
     /// Get SMART data for a single drive
     async fn get_smart_data(&self, drive_name: &str) -> Result<SmartData, CollectorError> {
-        let output = Command::new("sudo")
-            .args(&["smartctl", "-a", &format!("/dev/{}", drive_name)])
-            .output()
+        use super::run_command_with_timeout;
+
+        // Use direct smartctl (no sudo) - service has CAP_SYS_RAWIO and CAP_SYS_ADMIN capabilities
+        // For NVMe drives, specify device type explicitly
+        let mut cmd = Command::new("smartctl");
+        if drive_name.starts_with("nvme") {
+            cmd.args(&["-d", "nvme", "-a", &format!("/dev/{}", drive_name)]);
+        } else {
+            cmd.args(&["-a", &format!("/dev/{}", drive_name)]);
+        }
+        let output = run_command_with_timeout(cmd, 3).await
             .map_err(|e| CollectorError::SystemRead {
                 path: format!("SMART data for {}", drive_name),
                 error: e.to_string(),
             })?;
 
         let output_str = String::from_utf8_lossy(&output.stdout);
-        let error_str = String::from_utf8_lossy(&output.stderr);
-
-        // Debug logging for SMART command results
-        debug!("SMART output for {}: status={}, stdout_len={}, stderr={}",
-               drive_name, output.status, output_str.len(), error_str);
 
         if !output.status.success() {
-            debug!("SMART command failed for {}: {}", drive_name, error_str);
             // Return unknown data rather than failing completely
             return Ok(SmartData {
                 health: "UNKNOWN".to_string(),
@@ -756,9 +762,9 @@ impl DiskCollector {
     /// Get drive information for a mount path
     fn get_drive_info_for_path(&self, path: &str) -> anyhow::Result<PoolDrive> {
-        // Use lsblk to find the backing device
-        let output = Command::new("lsblk")
-            .args(&["-rn", "-o", "NAME,MOUNTPOINT"])
+        // Use lsblk to find the backing device with timeout
+        let output = Command::new("timeout")
+            .args(&["2", "lsblk", "-rn", "-o", "NAME,MOUNTPOINT"])
             .output()
             .map_err(|e| anyhow::anyhow!("Failed to run lsblk: {}", e))?;

View File

@@ -105,12 +105,12 @@ impl MemoryCollector {
             return Ok(());
         }
 
-        // Get usage data for all tmpfs mounts at once using df
-        let mut df_args = vec!["df", "--output=target,size,used", "--block-size=1"];
+        // Get usage data for all tmpfs mounts at once using df (with 2 second timeout)
+        let mut df_args = vec!["2", "df", "--output=target,size,used", "--block-size=1"];
         df_args.extend(tmpfs_mounts.iter().map(|s| s.as_str()));
-        let df_output = std::process::Command::new(df_args[0])
-            .args(&df_args[1..])
+        let df_output = std::process::Command::new("timeout")
+            .args(&df_args[..])
             .output()
             .map_err(|e| CollectorError::SystemRead {
                 path: "tmpfs mounts".to_string(),

View File

@@ -1,6 +1,8 @@
 use async_trait::async_trait;
 use cm_dashboard_shared::{AgentData};
+use std::process::{Command, Output};
+use std::time::Duration;
+use tokio::time::timeout;
 
 pub mod backup;
 pub mod cpu;
@@ -13,6 +15,20 @@ pub mod systemd;
 pub use error::CollectorError;
 
+/// Run a command with a timeout to prevent blocking
+pub async fn run_command_with_timeout(mut cmd: Command, timeout_secs: u64) -> std::io::Result<Output> {
+    let timeout_duration = Duration::from_secs(timeout_secs);
+    match timeout(timeout_duration, tokio::task::spawn_blocking(move || cmd.output())).await {
+        Ok(Ok(result)) => result,
+        Ok(Err(e)) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
+        Err(_) => Err(std::io::Error::new(
+            std::io::ErrorKind::TimedOut,
+            format!("Command timed out after {} seconds", timeout_secs)
+        )),
+    }
+}
+
 /// Base trait for all collectors with direct structured data output
 #[async_trait]
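A hypothetical call site for `run_command_with_timeout` above, mirroring how the disk collector's diff uses it (the `lsblk` arguments and the 30s disk default come from this compare; the function name is invented):

```rust
use std::process::Command;

// Sketch: a hung lsblk surfaces as ErrorKind::TimedOut instead of stalling the task.
async fn mount_points_with_timeout() -> std::io::Result<String> {
    let mut cmd = Command::new("lsblk");
    cmd.args(&["-rn", "-o", "NAME,MOUNTPOINT"]);
    let output = run_command_with_timeout(cmd, 30).await?;
    Ok(String::from_utf8_lossy(&output.stdout).into_owned())
}
```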

View File

@@ -8,12 +8,12 @@ use crate::config::NetworkConfig;
 /// Network interface collector with physical/virtual classification and link status
 pub struct NetworkCollector {
-    _config: NetworkConfig,
+    config: NetworkConfig,
 }
 
 impl NetworkCollector {
     pub fn new(config: NetworkConfig) -> Self {
-        Self { _config: config }
+        Self { config }
     }
 
     /// Check if interface is physical (not virtual)
@@ -50,8 +50,9 @@ impl NetworkCollector {
     }
 
     /// Get the primary physical interface (the one with default route)
-    fn get_primary_physical_interface() -> Option<String> {
-        match Command::new("ip").args(["route", "show", "default"]).output() {
+    fn get_primary_physical_interface(&self) -> Option<String> {
+        let timeout_str = self.config.command_timeout_seconds.to_string();
+        match Command::new("timeout").args([&timeout_str, "ip", "route", "show", "default"]).output() {
             Ok(output) if output.status.success() => {
                 let output_str = String::from_utf8_lossy(&output.stdout);
                 // Parse: "default via 192.168.1.1 dev eno1 ..."
@@ -110,7 +111,8 @@ impl NetworkCollector {
         // Parse VLAN configuration
         let vlan_map = Self::parse_vlan_config();
 
-        match Command::new("ip").args(["-j", "addr"]).output() {
+        let timeout_str = self.config.command_timeout_seconds.to_string();
+        match Command::new("timeout").args([&timeout_str, "ip", "-j", "addr"]).output() {
             Ok(output) if output.status.success() => {
                 let json_str = String::from_utf8_lossy(&output.stdout);
@@ -195,7 +197,7 @@ impl NetworkCollector {
         }
 
         // Assign primary physical interface as parent to virtual interfaces without explicit parent
-        let primary_interface = Self::get_primary_physical_interface();
+        let primary_interface = self.get_primary_physical_interface();
         if let Some(primary) = primary_interface {
             for interface in interfaces.iter_mut() {
                 // Only assign parent to virtual interfaces that don't already have one

View File

@@ -43,8 +43,8 @@ impl NixOSCollector {
         match fs::read_to_string("/etc/hostname") {
             Ok(hostname) => Some(hostname.trim().to_string()),
             Err(_) => {
-                // Fallback to hostname command
-                match Command::new("hostname").output() {
+                // Fallback to hostname command (with 2 second timeout)
+                match Command::new("timeout").args(["2", "hostname"]).output() {
                     Ok(output) => Some(String::from_utf8_lossy(&output.stdout).trim().to_string()),
                     Err(_) => None,
                 }

View File

@@ -4,7 +4,7 @@ use cm_dashboard_shared::{AgentData, ServiceData, SubServiceData, SubServiceMetr
 use std::process::Command;
 use std::sync::RwLock;
 use std::time::Instant;
-use tracing::{debug, info};
+use tracing::{debug, warn};
 
 use super::{Collector, CollectorError};
 use crate::config::SystemdConfig;
@@ -113,15 +113,13 @@ impl SystemdCollector {
                         name: site_name.clone(),
                         service_status: self.calculate_service_status(&site_name, &site_status),
                         metrics,
+                        service_type: "nginx_site".to_string(),
                     });
                 }
             }
 
             if service_name.contains("docker") && active_status == "active" {
-                info!("Collecting Docker sub-services for service: {}", service_name);
                 let docker_containers = self.get_docker_containers();
-                info!("Found {} Docker containers", docker_containers.len());
                 for (container_name, container_status) in docker_containers {
                     // For now, docker containers have no additional metrics
                     // Future: could add memory_mb, cpu_percent, restart_count, etc.
@@ -131,28 +129,27 @@ impl SystemdCollector {
                         name: container_name.clone(),
                         service_status: self.calculate_service_status(&container_name, &container_status),
                         metrics,
+                        service_type: "container".to_string(),
                     });
                 }
 
                 // Add Docker images
                 let docker_images = self.get_docker_images();
-                info!("Found {} Docker images", docker_images.len());
-                for (image_name, image_status, image_size) in docker_images {
+                for (image_name, image_status, image_size_mb) in docker_images {
                     let mut metrics = Vec::new();
                     metrics.push(SubServiceMetric {
                         label: "size".to_string(),
-                        value: 0.0, // Size as string in name instead
-                        unit: None,
+                        value: image_size_mb,
+                        unit: Some("MB".to_string()),
                     });
                     sub_services.push(SubServiceData {
-                        name: format!("{} ({})", image_name, image_size),
+                        name: image_name.to_string(),
                         service_status: self.calculate_service_status(&image_name, &image_status),
                         metrics,
+                        service_type: "image".to_string(),
                     });
                 }
-                info!("Total Docker sub-services added: {}", sub_services.len());
             }
 
             // Create complete service data
@@ -175,6 +172,10 @@ impl SystemdCollector {
             }
         }
 
+        // Sort services alphabetically by name
+        agent_data.services.sort_by(|a, b| a.name.cmp(&b.name));
+        complete_service_data.sort_by(|a, b| a.name.cmp(&b.name));
+
         // Update cached state
         {
             let mut state = self.state.write().unwrap();
@@ -254,8 +255,9 @@ impl SystemdCollector {
     /// Auto-discover interesting services to monitor
     fn discover_services_internal(&self) -> Result<(Vec<String>, std::collections::HashMap<String, ServiceStatusInfo>)> {
         // First: Get all service unit files
-        let unit_files_output = Command::new("systemctl")
-            .args(&["list-unit-files", "--type=service", "--no-pager", "--plain"])
+        let timeout_str = self.config.command_timeout_seconds.to_string();
+        let unit_files_output = Command::new("timeout")
+            .args(&[&timeout_str, "systemctl", "list-unit-files", "--type=service", "--no-pager", "--plain"])
             .output()?;
 
         if !unit_files_output.status.success() {
@@ -263,8 +265,8 @@ impl SystemdCollector {
         }
 
         // Second: Get runtime status of all units
-        let units_status_output = Command::new("systemctl")
-            .args(&["list-units", "--type=service", "--all", "--no-pager", "--plain"])
+        let units_status_output = Command::new("timeout")
+            .args(&[&timeout_str, "systemctl", "list-units", "--type=service", "--all", "--no-pager", "--plain"])
             .output()?;
 
         if !units_status_output.status.success() {
@@ -361,15 +363,16 @@ impl SystemdCollector {
         }
 
         // Fallback to systemctl if not in cache
-        let output = Command::new("systemctl")
-            .args(&["is-active", &format!("{}.service", service)])
+        let timeout_str = self.config.command_timeout_seconds.to_string();
+        let output = Command::new("timeout")
+            .args(&[&timeout_str, "systemctl", "is-active", &format!("{}.service", service)])
            .output()?;
 
         let active_status = String::from_utf8(output.stdout)?.trim().to_string();
 
         // Get more detailed info
-        let output = Command::new("systemctl")
-            .args(&["show", &format!("{}.service", service), "--property=LoadState,ActiveState,SubState"])
+        let output = Command::new("timeout")
+            .args(&[&timeout_str, "systemctl", "show", &format!("{}.service", service), "--property=LoadState,ActiveState,SubState"])
             .output()?;
 
         let detailed_info = String::from_utf8(output.stdout)?;
@@ -421,7 +424,7 @@ impl SystemdCollector {
         if let Some(dirs) = self.config.service_directories.get(service_name) {
             // Service has configured paths - use the first accessible one
             for dir in dirs {
-                if let Some(size) = self.get_directory_size(dir) {
+                if let Some(size) = self.get_directory_size(dir).await {
                     return Ok(size);
                 }
             }
@@ -430,8 +433,9 @@ impl SystemdCollector {
         }
 
         // No configured path - try to get WorkingDirectory from systemctl
-        let output = Command::new("systemctl")
-            .args(&["show", &format!("{}.service", service_name), "--property=WorkingDirectory"])
+        let timeout_str = self.config.command_timeout_seconds.to_string();
+        let output = Command::new("timeout")
+            .args(&[&timeout_str, "systemctl", "show", &format!("{}.service", service_name), "--property=WorkingDirectory"])
             .output()
             .map_err(|e| CollectorError::SystemRead {
                 path: format!("WorkingDirectory for {}", service_name),
@@ -443,7 +447,7 @@ impl SystemdCollector {
             if line.starts_with("WorkingDirectory=") && !line.contains("[not set]") {
                 let dir = line.strip_prefix("WorkingDirectory=").unwrap_or("");
                 if !dir.is_empty() && dir != "/" {
-                    return Ok(self.get_directory_size(dir).unwrap_or(0.0));
+                    return Ok(self.get_directory_size(dir).await.unwrap_or(0.0));
                 }
             }
         }
@@ -452,17 +456,22 @@ impl SystemdCollector {
     }
 
     /// Get size of a directory in GB
-    fn get_directory_size(&self, path: &str) -> Option<f32> {
-        let output = Command::new("sudo")
-            .args(&["du", "-sb", path])
-            .output()
-            .ok()?;
+    async fn get_directory_size(&self, path: &str) -> Option<f32> {
+        use super::run_command_with_timeout;
+
+        // Use -s (summary) and --apparent-size for speed
+        let mut cmd = Command::new("sudo");
+        cmd.args(&["du", "-s", "--apparent-size", "--block-size=1", path]);
+        let output = run_command_with_timeout(cmd, self.config.command_timeout_seconds).await.ok()?;
 
         if !output.status.success() {
             // Log permission errors for debugging but don't spam logs
             let stderr = String::from_utf8_lossy(&output.stderr);
             if stderr.contains("Permission denied") {
                 debug!("Permission denied accessing directory: {}", path);
+            } else if stderr.contains("timed out") {
+                warn!("Directory size check timed out for {}", path);
             } else {
                 debug!("Failed to get size for directory {}: {}", path, stderr);
             }
@@ -781,8 +790,9 @@ impl SystemdCollector {
         // Check if docker is available (cm-agent user is in docker group)
         // Use -a to show ALL containers (running and stopped)
-        let output = Command::new("docker")
-            .args(&["ps", "-a", "--format", "{{.Names}},{{.Status}}"])
+        let timeout_str = self.config.command_timeout_seconds.to_string();
+        let output = Command::new("timeout")
+            .args(&[&timeout_str, "docker", "ps", "-a", "--format", "{{.Names}},{{.Status}}"])
             .output();
 
         let output = match output {
@@ -821,25 +831,20 @@ impl SystemdCollector {
     }
 
     /// Get docker images as sub-services
-    fn get_docker_images(&self) -> Vec<(String, String, String)> {
+    fn get_docker_images(&self) -> Vec<(String, String, f32)> {
         let mut images = Vec::new();
-        info!("Collecting Docker images");
 
         // Check if docker is available (cm-agent user is in docker group)
-        let output = Command::new("docker")
-            .args(&["images", "--format", "{{.Repository}}:{{.Tag}},{{.Size}}"])
+        let timeout_str = self.config.command_timeout_seconds.to_string();
+        let output = Command::new("timeout")
+            .args(&[&timeout_str, "docker", "images", "--format", "{{.Repository}}:{{.Tag}},{{.Size}}"])
             .output();
 
         let output = match output {
             Ok(out) if out.status.success() => out,
-            Ok(out) => {
-                let stderr = String::from_utf8_lossy(&out.stderr);
-                info!("Docker images command failed with status: {}, stderr: {}", out.status, stderr);
+            Ok(_) => {
                 return images;
             }
-            Err(e) => {
-                info!("Docker images command error: {}", e);
+            Err(_) => {
                 return images;
             }
         };
@@ -857,23 +862,54 @@ impl SystemdCollector {
             let parts: Vec<&str> = line.split(',').collect();
             if parts.len() >= 2 {
                 let image_name = parts[0].trim();
-                let size = parts[1].trim();
+                let size_str = parts[1].trim();
 
                 // Skip <none>:<none> images (dangling images)
                 if image_name.contains("<none>") {
                     continue;
                 }
 
+                // Parse size to MB (sizes come as "142MB", "1.5GB", "512kB", etc.)
+                let size_mb = self.parse_docker_size(size_str);
+
                 images.push((
-                    format!("image_{}", image_name),
-                    "active".to_string(), // Images are always "active" (present)
-                    size.to_string()
+                    image_name.to_string(),
+                    "inactive".to_string(), // Images are informational - use inactive for neutral display
+                    size_mb
                 ));
             }
         }
 
         images
     }
+
+    /// Parse Docker size string to MB
+    fn parse_docker_size(&self, size_str: &str) -> f32 {
+        let size_upper = size_str.to_uppercase();
+
+        // Extract numeric part and unit
+        let mut num_str = String::new();
+        let mut unit = String::new();
+
+        for ch in size_upper.chars() {
+            if ch.is_ascii_digit() || ch == '.' {
+                num_str.push(ch);
+            } else if ch.is_alphabetic() {
+                unit.push(ch);
+            }
+        }
+
+        let value: f32 = num_str.parse().unwrap_or(0.0);
+
+        // Convert to MB
+        match unit.as_str() {
+            "KB" | "K" => value / 1024.0,
+            "MB" | "M" => value,
+            "GB" | "G" => value * 1024.0,
+            "TB" | "T" => value * 1024.0 * 1024.0,
+            _ => value, // Assume bytes if no unit
+        }
+    }
 }
 
 #[async_trait]
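A hypothetical spot check of `parse_docker_size` above; `test_collector()` is a stand-in, since this diff doesn't show how a `SystemdCollector` is constructed in tests:

```rust
#[test]
fn docker_sizes_convert_to_mb() {
    let c = test_collector(); // placeholder constructor
    assert_eq!(c.parse_docker_size("142MB"), 142.0);
    assert_eq!(c.parse_docker_size("1.5GB"), 1536.0); // 1.5 × 1024
    assert_eq!(c.parse_docker_size("512kB"), 0.5);    // "kB" uppercased, then ÷ 1024
    assert_eq!(c.parse_docker_size("???"), 0.0);      // unparsable → 0.0 fallback
}
```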

View File

@@ -79,6 +79,9 @@ pub struct DiskConfig {
     pub temperature_critical_celsius: f32,
     pub wear_warning_percent: f32,
     pub wear_critical_percent: f32,
+    /// Command timeout in seconds for lsblk, smartctl, etc.
+    #[serde(default = "default_disk_command_timeout")]
+    pub command_timeout_seconds: u64,
 }
 
 /// Filesystem configuration entry
@@ -108,6 +111,9 @@ pub struct SystemdConfig {
     pub http_timeout_seconds: u64,
     pub http_connect_timeout_seconds: u64,
     pub nginx_latency_critical_ms: f32,
+    /// Command timeout in seconds for systemctl, docker, du commands
+    #[serde(default = "default_systemd_command_timeout")]
+    pub command_timeout_seconds: u64,
 }
@@ -132,6 +138,9 @@ pub struct BackupConfig {
 pub struct NetworkConfig {
     pub enabled: bool,
     pub interval_seconds: u64,
+    /// Command timeout in seconds for ip route, ip addr commands
+    #[serde(default = "default_network_command_timeout")]
+    pub command_timeout_seconds: u64,
 }
 
 /// Notification configuration
@@ -145,6 +154,9 @@ pub struct NotificationConfig {
     pub rate_limit_minutes: u64,
     /// Email notification batching interval in seconds (default: 60)
     pub aggregation_interval_seconds: u64,
+    /// Status check interval in seconds for detecting changes (default: 30)
+    #[serde(default = "default_notification_check_interval")]
+    pub check_interval_seconds: u64,
     /// List of metric names to exclude from email notifications
     #[serde(default)]
     pub exclude_email_metrics: Vec<String>,
@@ -158,10 +170,26 @@ fn default_heartbeat_interval_seconds() -> u64 {
     5
 }
 
+fn default_notification_check_interval() -> u64 {
+    30
+}
+
 fn default_maintenance_mode_file() -> String {
     "/tmp/cm-maintenance".to_string()
 }
 
+fn default_disk_command_timeout() -> u64 {
+    30
+}
+
+fn default_systemd_command_timeout() -> u64 {
+    15
+}
+
+fn default_network_command_timeout() -> u64 {
+    10
+}
+
 impl AgentConfig {
     pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
         loader::load_config(path)

View File

@@ -1,6 +1,6 @@
 [package]
 name = "cm-dashboard"
-version = "0.1.178"
+version = "0.1.193"
 edition = "2021"
 
 [dependencies]

View File

@@ -215,7 +215,7 @@ impl Dashboard {
         // Update TUI with new metrics (only if not headless)
         if let Some(ref mut tui_app) = self.tui_app {
-            tui_app.update_metrics(&self.metric_store);
+            tui_app.update_metrics(&mut self.metric_store);
         }
     }

View File

@@ -5,6 +5,14 @@ use tracing::{debug, info, warn};
 use super::MetricDataPoint;
 
+/// ZMQ communication statistics per host
+#[derive(Debug, Clone)]
+pub struct ZmqStats {
+    pub packets_received: u64,
+    pub last_packet_time: Instant,
+    pub last_packet_age_secs: f64,
+}
+
 /// Central metric storage for the dashboard
 pub struct MetricStore {
     /// Current structured data: hostname -> AgentData
@@ -13,6 +21,8 @@ pub struct MetricStore {
     historical_metrics: HashMap<String, Vec<MetricDataPoint>>,
     /// Last heartbeat timestamp per host
     last_heartbeat: HashMap<String, Instant>,
+    /// ZMQ communication statistics per host
+    zmq_stats: HashMap<String, ZmqStats>,
     /// Configuration
     max_metrics_per_host: usize,
     history_retention: Duration,
@@ -24,6 +34,7 @@ impl MetricStore {
             current_agent_data: HashMap::new(),
             historical_metrics: HashMap::new(),
             last_heartbeat: HashMap::new(),
+            zmq_stats: HashMap::new(),
             max_metrics_per_host,
             history_retention: Duration::from_secs(history_retention_hours * 3600),
         }
@@ -44,6 +55,16 @@ impl MetricStore {
         self.last_heartbeat.insert(hostname.clone(), now);
         debug!("Updated heartbeat for host {}", hostname);
 
+        // Update ZMQ stats
+        let stats = self.zmq_stats.entry(hostname.clone()).or_insert(ZmqStats {
+            packets_received: 0,
+            last_packet_time: now,
+            last_packet_age_secs: 0.0,
+        });
+        stats.packets_received += 1;
+        stats.last_packet_time = now;
+        stats.last_packet_age_secs = 0.0; // Just received
+
         // Add to history
         let host_history = self
             .historical_metrics
@@ -65,6 +86,15 @@ impl MetricStore {
         self.current_agent_data.get(hostname)
     }
 
+    /// Get ZMQ communication statistics for a host
+    pub fn get_zmq_stats(&mut self, hostname: &str) -> Option<ZmqStats> {
+        let now = Instant::now();
+        self.zmq_stats.get_mut(hostname).map(|stats| {
+            // Update packet age
+            stats.last_packet_age_secs = now.duration_since(stats.last_packet_time).as_secs_f64();
+            stats.clone()
+        })
+    }
+
     /// Get connected hosts (hosts with recent heartbeats)
     pub fn get_connected_hosts(&self, timeout: Duration) -> Vec<String> {

View File

@@ -100,7 +100,7 @@ impl TuiApp {
     }
 
     /// Update widgets with structured data from store (only for current host)
-    pub fn update_metrics(&mut self, metric_store: &MetricStore) {
+    pub fn update_metrics(&mut self, metric_store: &mut MetricStore) {
         if let Some(hostname) = self.current_host.clone() {
             // Get structured data for this host
             if let Some(agent_data) = metric_store.get_agent_data(&hostname) {
@@ -110,6 +110,14 @@ impl TuiApp {
                 host_widgets.system_widget.update_from_agent_data(agent_data);
                 host_widgets.services_widget.update_from_agent_data(agent_data);
 
+                // Update ZMQ stats
+                if let Some(zmq_stats) = metric_store.get_zmq_stats(&hostname) {
+                    host_widgets.system_widget.update_zmq_stats(
+                        zmq_stats.packets_received,
+                        zmq_stats.last_packet_age_secs
+                    );
+                }
+
                 host_widgets.last_update = Some(Instant::now());
             }
         }

View File

@@ -32,6 +32,7 @@ struct ServiceInfo {
     disk_gb: Option<f32>,
     metrics: Vec<(String, f32, Option<String>)>, // (label, value, unit)
     widget_status: Status,
+    service_type: String, // "nginx_site", "container", "image", or empty for parent services
 }
 
 impl ServicesWidget {
@@ -169,7 +170,7 @@ impl ServicesWidget {
         // Convert Status enum to display text for sub-services
         match info.widget_status {
             Status::Ok => "active",
             Status::Inactive => "inactive",
             Status::Critical => "failed",
             Status::Pending => "pending",
             Status::Warning => "warning",
@@ -179,32 +180,62 @@ impl ServicesWidget {
         };
 
         let tree_symbol = if is_last { "└─" } else { "├─" };
 
-        vec![
-            // Indentation and tree prefix
-            ratatui::text::Span::styled(
-                format!(" {} ", tree_symbol),
-                Typography::tree(),
-            ),
-            // Status icon
-            ratatui::text::Span::styled(
-                format!("{} ", icon),
-                Style::default().fg(status_color).bg(Theme::background()),
-            ),
-            // Service name
-            ratatui::text::Span::styled(
-                format!("{:<18} ", short_name),
-                Style::default()
-                    .fg(Theme::secondary_text())
-                    .bg(Theme::background()),
-            ),
-            // Status/latency text
-            ratatui::text::Span::styled(
-                status_str,
-                Style::default()
-                    .fg(Theme::secondary_text())
-                    .bg(Theme::background()),
-            ),
-        ]
+        // Docker images use docker whale icon
+        if info.service_type == "image" {
+            vec![
+                // Indentation and tree prefix
+                ratatui::text::Span::styled(
+                    format!(" {} ", tree_symbol),
+                    Typography::tree(),
+                ),
+                // Docker icon (simple character for performance)
+                ratatui::text::Span::styled(
+                    "D ".to_string(),
+                    Style::default().fg(Theme::highlight()).bg(Theme::background()),
+                ),
+                // Service name
+                ratatui::text::Span::styled(
+                    format!("{:<18} ", short_name),
+                    Style::default()
+                        .fg(Theme::secondary_text())
+                        .bg(Theme::background()),
+                ),
+                // Status/metrics text
+                ratatui::text::Span::styled(
+                    status_str,
+                    Style::default()
+                        .fg(Theme::secondary_text())
+                        .bg(Theme::background()),
+                ),
+            ]
+        } else {
+            vec![
+                // Indentation and tree prefix
+                ratatui::text::Span::styled(
+                    format!(" {} ", tree_symbol),
+                    Typography::tree(),
+                ),
+                // Status icon
+                ratatui::text::Span::styled(
+                    format!("{} ", icon),
+                    Style::default().fg(status_color).bg(Theme::background()),
+                ),
+                // Service name
+                ratatui::text::Span::styled(
+                    format!("{:<18} ", short_name),
+                    Style::default()
+                        .fg(Theme::secondary_text())
+                        .bg(Theme::background()),
+                ),
+                // Status/latency text
+                ratatui::text::Span::styled(
+                    status_str,
+                    Style::default()
+                        .fg(Theme::secondary_text())
+                        .bg(Theme::background()),
+                ),
+            ]
+        }
     }
 
@@ -282,9 +313,10 @@ impl Widget for ServicesWidget {
                 disk_gb: Some(service.disk_gb),
                 metrics: Vec::new(), // Parent services don't have custom metrics
                 widget_status: service.service_status,
+                service_type: String::new(), // Parent services have no type
             };
             self.parent_services.insert(service.name.clone(), parent_info);
 
             // Process sub-services if any
             if !service.sub_services.is_empty() {
                 let mut sub_list = Vec::new();
@@ -293,12 +325,13 @@ impl Widget for ServicesWidget {
                     let metrics: Vec<(String, f32, Option<String>)> = sub_service.metrics.iter()
                         .map(|m| (m.label.clone(), m.value, m.unit.clone()))
                         .collect();
 
                     let sub_info = ServiceInfo {
                         memory_mb: None, // Not used for sub-services
                         disk_gb: None, // Not used for sub-services
                         metrics,
                         widget_status: sub_service.service_status,
+                        service_type: sub_service.service_type.clone(),
                     };
                     sub_list.push((sub_service.name.clone(), sub_info));
                 }
@@ -342,6 +375,7 @@ impl ServicesWidget {
                 disk_gb: None,
                 metrics: Vec::new(),
                 widget_status: Status::Unknown,
+                service_type: String::new(),
             });
 
             if metric.name.ends_with("_status") {
@@ -377,6 +411,7 @@ impl ServicesWidget {
                         disk_gb: None,
                         metrics: Vec::new(),
                         widget_status: Status::Unknown,
+                        service_type: String::new(), // Unknown type in legacy path
                     },
                 ));
                 &mut sub_service_list.last_mut().unwrap().1

View File

@@ -15,6 +15,10 @@ pub struct SystemWidget {
     nixos_build: Option<String>,
     agent_hash: Option<String>,
 
+    // ZMQ communication stats
+    zmq_packets_received: Option<u64>,
+    zmq_last_packet_age: Option<f64>,
+
     // Network interfaces
     network_interfaces: Vec<cm_dashboard_shared::NetworkInterfaceData>,
@@ -92,6 +96,8 @@ impl SystemWidget {
         Self {
             nixos_build: None,
             agent_hash: None,
+            zmq_packets_received: None,
+            zmq_last_packet_age: None,
             network_interfaces: Vec::new(),
             cpu_load_1min: None,
             cpu_load_5min: None,
@@ -154,6 +160,12 @@ impl SystemWidget {
     pub fn _get_agent_hash(&self) -> Option<&String> {
         self.agent_hash.as_ref()
     }
+
+    /// Update ZMQ communication statistics
+    pub fn update_zmq_stats(&mut self, packets_received: u64, last_packet_age_secs: f64) {
+        self.zmq_packets_received = Some(packets_received);
+        self.zmq_last_packet_age = Some(last_packet_age_secs);
+    }
 }
 
 use super::Widget;
@@ -796,6 +808,18 @@ impl SystemWidget {
             Span::styled(format!("Agent: {}", agent_version_text), Typography::secondary())
         ]));
 
+        // ZMQ communication stats
+        if let (Some(packets), Some(age)) = (self.zmq_packets_received, self.zmq_last_packet_age) {
+            let age_text = if age < 1.0 {
+                format!("{:.0}ms ago", age * 1000.0)
+            } else {
+                format!("{:.1}s ago", age)
+            };
+            lines.push(Line::from(vec![
+                Span::styled(format!("ZMQ: {} pkts, last {}", packets, age_text), Typography::secondary())
+            ]));
+        }
+
         // CPU section
         lines.push(Line::from(vec![
             Span::styled("CPU:", Typography::widget_title())

View File

@@ -1,6 +1,6 @@
 [package]
 name = "cm-dashboard-shared"
-version = "0.1.178"
+version = "0.1.193"
 edition = "2021"
 
 [dependencies]

View File

@@ -149,6 +149,9 @@ pub struct SubServiceData {
     pub name: String,
     pub service_status: Status,
     pub metrics: Vec<SubServiceMetric>,
+    /// Type of sub-service: "nginx_site", "container", "image"
+    #[serde(default)]
+    pub service_type: String,
 }
 
 /// Individual metric for a sub-service
/// Individual metric for a sub-service /// Individual metric for a sub-service