Compare commits

...

25 Commits

Author SHA1 Message Date
01e1f33b66 Fix ZMQ sender blocking - move to independent thread with try_read
All checks were successful
Build and Release / build-and-release (push) Successful in 1m21s
CRITICAL FIX: The previous cached collector architecture still had ZMQ sending
in the main event loop, where it could block waiting for RwLock when collectors
were writing. This caused the 3-8 second delays you observed.

Changes:
- Move ZMQ publisher to dedicated std::thread (ZMQ sockets aren't thread-safe)
- Use try_read() instead of read() to avoid blocking on write locks
- Send previous data if cache is locked by collector
- ZMQ now sends every 2s regardless of collector timing
- Remove publisher from ZmqHandler (now only handles commands)

Architecture:
- Collectors: Independent tokio tasks updating shared cache
- ZMQ Sender: Dedicated OS thread with its own publisher socket
- Main Loop: Only handles commands and notifications

This ensures ZMQ transmission is NEVER blocked by slow collectors.

Bump version to v0.1.195
2025-11-27 22:56:58 +01:00
ed6399b914 Bump version to v0.1.194
All checks were successful
Build and Release / build-and-release (push) Successful in 1m20s
2025-11-27 22:46:17 +01:00
14618c59c6 Fix data duplication in cached collector architecture
Critical bug fix: Collectors were appending to Vecs instead of replacing them,
causing duplicate entries with each collection cycle.

Fixed by adding .clear() calls before populating:
- Memory collector: tmpfs Vec (was showing 11+ duplicates)
- Disk collector: drives and pools Vecs
- Systemd collector: services Vec
- Network collector: Already correct (assigns new Vec)

This prevents the exponential growth of duplicate entries in the dashboard UI.
2025-11-27 22:45:44 +01:00
2740de9b54 Implement cached collector architecture with configurable timeouts
All checks were successful
Build and Release / build-and-release (push) Successful in 1m20s
Major architectural refactor to eliminate false "host offline" alerts:

- Replace sequential blocking collectors with independent async tasks
- Each collector runs at configurable interval and updates shared cache
- ZMQ sender reads cache every 1-2s regardless of collector speed
- Collector intervals: CPU/Memory (1-10s), Backup/NixOS (30-60s), Disk/Systemd (60-300s)

All intervals now configurable via NixOS config:
- collectors.*.interval_seconds (collection frequency per collector)
- collectors.*.command_timeout_seconds (timeout for shell commands)
- notifications.check_interval_seconds (status change detection rate)

Command timeouts increased from hardcoded 2-3s to configurable 10-30s:
- Disk collector: 30s (SMART operations, lsblk)
- Systemd collector: 15s (systemctl, docker, du commands)
- Network collector: 10s (ip route, ip addr)

Benefits:
- No false "offline" alerts when slow collectors take >10s
- Different update rates for different metric types
- Better resource management with longer timeouts
- Full NixOS configuration control

Bump version to v0.1.193
2025-11-27 22:37:20 +01:00
37f2650200 Document cached collector architecture plan
Add architectural plan for separating ZMQ sending from data collection to prevent false 'host offline' alerts caused by slow collectors.

Key concepts:
- Shared cache (Arc<RwLock<AgentData>>)
- Independent async collector tasks with different update rates
- ZMQ sender always sends every 1s from cache
- Fast collectors (1s), medium (5s), slow (60s)
- No blocking regardless of collector speed
2025-11-27 21:49:44 +01:00
833010e270 Bump version to v0.1.192
All checks were successful
Build and Release / build-and-release (push) Successful in 1m8s
2025-11-27 18:34:53 +01:00
549d9d1c72 Replace whale emoji with ASCII 'D' for performance
Emoji rendering in terminals can be very slow, especially when rendered in the hot path (every frame for every docker image). The whale emoji 🐋 was causing significant rendering delays.

Temporary change to ASCII 'D' to test if emoji was the performance issue.
2025-11-27 18:34:27 +01:00
9b84b70581 Bump version to v0.1.191
All checks were successful
Build and Release / build-and-release (push) Successful in 1m8s
2025-11-27 18:16:49 +01:00
92c3ee3f2a Add Docker whale icon for docker images
Docker images now display with distinctive 🐋 whale icon in blue (highlight color) instead of status icons. This provides clear visual identification that these are docker images while not implying operational status.
2025-11-27 18:16:33 +01:00
1be55f765d Bump version to v0.1.190
All checks were successful
Build and Release / build-and-release (push) Successful in 1m9s
2025-11-27 18:09:49 +01:00
2f94a4b853 Add service_type field to separate data from presentation
Changes:
- Add service_type field to SubServiceData: 'nginx_site', 'container', 'image'
- Agent sends pure data without display formatting
- Dashboard checks service_type to decide presentation
- Docker images now display without status icon (service_type='image')
- Remove unused image_size_str from docker images tuple

Clean separation: agent provides data, dashboard handles display logic.
2025-11-27 18:09:20 +01:00
ff2b43827a Bump version to v0.1.189
All checks were successful
Build and Release / build-and-release (push) Successful in 1m19s
2025-11-27 17:57:38 +01:00
fac0188c6f Change docker image display format and status
Changes:
- Rename docker images from 'image_node:18...' to 'I node:18...' for conciseness
- Change image status from 'active' to 'inactive' for neutral informational display
- Images now show with gray empty circle ○ instead of green filled circle ●

Docker images are static artifacts without meaningful operational status, so using inactive status provides neutral gray display that won't trigger alerts or affect service status aggregation.
2025-11-27 17:57:24 +01:00
6bb350f016 Bump version to v0.1.188
All checks were successful
Build and Release / build-and-release (push) Successful in 1m8s
2025-11-27 16:39:46 +01:00
374b126446 Reduce all command timeouts to 2-3 seconds max
With 10-second host heartbeat timeout, all command timeouts must be significantly lower to ensure total collection time stays under 10 seconds.

Changed timeouts:
- smartctl: 10s → 3s (critical: multiple drives queried sequentially)
- du: 5s → 2s
- lsblk: 5s → 2s
- systemctl list commands: 5s → 3s
- systemctl show/is-active: 3s → 2s
- docker commands: 5s → 3s
- df, ip commands: 3s → 2s

Total worst-case collection time now capped at more reasonable levels, preventing false host offline alerts from blocking operations.
2025-11-27 16:38:54 +01:00
76c04633b5 Bump version to v0.1.187
All checks were successful
Build and Release / build-and-release (push) Successful in 1m19s
2025-11-27 16:34:42 +01:00
1e0510be81 Add comprehensive timeouts to all blocking system commands
Fixes random host disconnections caused by blocking operations preventing timely ZMQ packet transmission.

Changes:
- Add run_command_with_timeout() wrapper using tokio for async command execution
- Apply 10s timeout to smartctl (prevents 30+ second hangs on failing drives)
- Apply 5s timeout to du, lsblk, systemctl list commands
- Apply 3s timeout to systemctl show/is-active, df, ip commands
- Apply 2s timeout to hostname command
- Use system 'timeout' command for sync operations where async not needed

Critical fixes:
- smartctl: Failing drives could block for 30+ seconds per drive
- du: Large directories (Docker, PostgreSQL) could block 10-30+ seconds
- systemctl/docker: Commands could block indefinitely during system issues

With 1-second collection interval and 10-second heartbeat timeout, any blocking operation >10s causes false "host offline" alerts. These timeouts ensure collection completes quickly even during system degradation.
2025-11-27 16:34:08 +01:00
9a2df906ea Add ZMQ communication statistics tracking and display
All checks were successful
Build and Release / build-and-release (push) Successful in 1m10s
2025-11-27 16:14:45 +01:00
6d6beb207d Parse Docker image sizes to MB and sort services alphabetically
All checks were successful
Build and Release / build-and-release (push) Successful in 1m18s
2025-11-27 15:57:38 +01:00
7a68da01f5 Remove debug logging for NVMe SMART collection
All checks were successful
Build and Release / build-and-release (push) Successful in 1m9s
2025-11-27 15:40:16 +01:00
5be67fed64 Add debug logging for NVMe SMART data collection
All checks were successful
Build and Release / build-and-release (push) Successful in 1m19s
2025-11-27 15:00:48 +01:00
cac836601b Add NVMe device type flag for SMART data collection
All checks were successful
Build and Release / build-and-release (push) Successful in 1m19s
2025-11-27 13:34:30 +01:00
bd22ce265b Use direct smartctl with CAP_SYS_RAWIO instead of sudo
All checks were successful
Build and Release / build-and-release (push) Successful in 1m9s
2025-11-27 13:22:13 +01:00
bbc8b7b1cb Add info-level logging for SMART data collection debugging
All checks were successful
Build and Release / build-and-release (push) Successful in 1m19s
2025-11-27 13:15:53 +01:00
5dd8cadef3 Remove debug logging from Docker collection code
All checks were successful
Build and Release / build-and-release (push) Successful in 1m19s
2025-11-27 12:50:20 +01:00
20 changed files with 580 additions and 232 deletions

View File

@@ -156,6 +156,86 @@ Complete migration from string-based metrics to structured JSON data. Eliminates
- ✅ Backward compatibility via bridge conversion to existing UI widgets
- ✅ All string parsing bugs eliminated
### Cached Collector Architecture (✅ IMPLEMENTED)
**Problem:** Blocking collectors prevent timely ZMQ transmission, causing false "host offline" alerts.
**Previous (Sequential Blocking):**
```
Every 1 second:
└─ collect_all_data() [BLOCKS for 2-10+ seconds]
├─ CPU (fast: 10ms)
├─ Memory (fast: 20ms)
├─ Disk SMART (slow: 3s per drive × 4 drives = 12s)
├─ Service disk usage (slow: 2-8s per service)
└─ Docker (medium: 500ms)
└─ send_via_zmq() [Only after ALL collection completes]
Result: If any collector takes >10s → "host offline" false alert
```
**New (Cached Independent Collectors):**
```
Shared Cache: Arc<RwLock<AgentData>>
Background Collectors (independent async tasks):
├─ Fast collectors (CPU, RAM, Network)
│ └─ Update cache every 1 second
├─ Medium collectors (Services, Docker)
│ └─ Update cache every 5 seconds
└─ Slow collectors (Disk usage, SMART data)
└─ Update cache every 60 seconds
ZMQ Sender (separate async task):
Every 1 second:
└─ Read current cache
└─ Send via ZMQ [Always instant, never blocked]
```
**Benefits:**
- ✅ ZMQ sends every 1 second regardless of collector speed
- ✅ No false "host offline" alerts from slow collectors
- ✅ Different update rates for different metrics (CPU=1s, SMART=60s)
- ✅ System stays responsive even with slow operations
- ✅ Slow collectors can use longer timeouts without blocking
**Implementation Details:**
- **Shared cache**: `Arc<RwLock<AgentData>>` initialized at agent startup
- **Collector intervals**: Fully configurable via NixOS config (`interval_seconds` per collector)
- Recommended: Fast (1-10s): CPU, Memory, Network
- Recommended: Medium (30-60s): Backup, NixOS
- Recommended: Slow (60-300s): Disk, Systemd
- **Independent tasks**: Each collector spawned as separate tokio task in `Agent::new()`
- **Cache updates**: Collectors acquire write lock → update → release immediately
- **ZMQ sender**: Main loop reads cache every `collection_interval_seconds` and broadcasts
- **Notification check**: Runs every `notifications.check_interval_seconds`
- **Lock strategy**: Short-lived write locks prevent blocking, read locks for transmission
- **Stale data**: Acceptable for slow-changing metrics (SMART data, disk usage)
**Configuration (NixOS):**
All intervals and timeouts configurable in `services/cm-dashboard.nix`:
Collection Intervals:
- `collectors.cpu.interval_seconds` (default: 10s)
- `collectors.memory.interval_seconds` (default: 2s)
- `collectors.disk.interval_seconds` (default: 300s)
- `collectors.systemd.interval_seconds` (default: 10s)
- `collectors.backup.interval_seconds` (default: 60s)
- `collectors.network.interval_seconds` (default: 10s)
- `collectors.nixos.interval_seconds` (default: 60s)
- `notifications.check_interval_seconds` (default: 30s)
- `collection_interval_seconds` - ZMQ transmission rate (default: 2s)
Command Timeouts (prevent resource leaks from hung commands):
- `collectors.disk.command_timeout_seconds` (default: 30s) - lsblk, smartctl, etc.
- `collectors.systemd.command_timeout_seconds` (default: 15s) - systemctl, docker, du
- `collectors.network.command_timeout_seconds` (default: 10s) - ip route, ip addr
**Code Locations:**
- agent/src/agent.rs:59-133 - Collector task spawning
- agent/src/agent.rs:151-179 - Independent collector task runner
- agent/src/agent.rs:199-207 - ZMQ sender in main loop
### Maintenance Mode
- Agent checks for `/tmp/cm-maintenance` file before sending notifications

6
Cargo.lock generated
View File

@@ -279,7 +279,7 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"
[[package]]
name = "cm-dashboard"
version = "0.1.175"
version = "0.1.194"
dependencies = [
"anyhow",
"chrono",
@@ -301,7 +301,7 @@ dependencies = [
[[package]]
name = "cm-dashboard-agent"
version = "0.1.175"
version = "0.1.194"
dependencies = [
"anyhow",
"async-trait",
@@ -324,7 +324,7 @@ dependencies = [
[[package]]
name = "cm-dashboard-shared"
version = "0.1.175"
version = "0.1.194"
dependencies = [
"chrono",
"serde",

View File

@@ -1,6 +1,6 @@
[package]
name = "cm-dashboard-agent"
version = "0.1.178"
version = "0.1.195"
edition = "2021"
[dependencies]

View File

@@ -1,13 +1,14 @@
use anyhow::Result;
use gethostname::gethostname;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time::interval;
use tracing::{debug, error, info};
use crate::communication::{AgentCommand, ZmqHandler};
use crate::config::AgentConfig;
use crate::collectors::{
Collector,
backup::BackupCollector,
cpu::CpuCollector,
disk::DiskCollector,
@@ -23,7 +24,7 @@ pub struct Agent {
hostname: String,
config: AgentConfig,
zmq_handler: ZmqHandler,
collectors: Vec<Box<dyn Collector>>,
cache: Arc<RwLock<AgentData>>,
notification_manager: NotificationManager,
previous_status: Option<SystemStatus>,
}
@@ -55,39 +56,94 @@ impl Agent {
config.zmq.publisher_port
);
// Initialize collectors
let mut collectors: Vec<Box<dyn Collector>> = Vec::new();
// Add enabled collectors
// Initialize shared cache
let cache = Arc::new(RwLock::new(AgentData::new(
hostname.clone(),
env!("CARGO_PKG_VERSION").to_string()
)));
info!("Initialized shared agent data cache");
// Spawn independent collector tasks
let mut collector_count = 0;
// CPU collector
if config.collectors.cpu.enabled {
collectors.push(Box::new(CpuCollector::new(config.collectors.cpu.clone())));
let cache_clone = cache.clone();
let collector = CpuCollector::new(config.collectors.cpu.clone());
let interval = config.collectors.cpu.interval_seconds;
tokio::spawn(async move {
Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "CPU").await;
});
collector_count += 1;
}
// Memory collector
if config.collectors.memory.enabled {
collectors.push(Box::new(MemoryCollector::new(config.collectors.memory.clone())));
}
if config.collectors.disk.enabled {
collectors.push(Box::new(DiskCollector::new(config.collectors.disk.clone())));
}
if config.collectors.systemd.enabled {
collectors.push(Box::new(SystemdCollector::new(config.collectors.systemd.clone())));
}
if config.collectors.backup.enabled {
collectors.push(Box::new(BackupCollector::new()));
let cache_clone = cache.clone();
let collector = MemoryCollector::new(config.collectors.memory.clone());
let interval = config.collectors.memory.interval_seconds;
tokio::spawn(async move {
Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "Memory").await;
});
collector_count += 1;
}
// Network collector
if config.collectors.network.enabled {
collectors.push(Box::new(NetworkCollector::new(config.collectors.network.clone())));
let cache_clone = cache.clone();
let collector = NetworkCollector::new(config.collectors.network.clone());
let interval = config.collectors.network.interval_seconds;
tokio::spawn(async move {
Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "Network").await;
});
collector_count += 1;
}
// Backup collector
if config.collectors.backup.enabled {
let cache_clone = cache.clone();
let collector = BackupCollector::new();
let interval = config.collectors.backup.interval_seconds;
tokio::spawn(async move {
Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "Backup").await;
});
collector_count += 1;
}
// NixOS collector
if config.collectors.nixos.enabled {
collectors.push(Box::new(NixOSCollector::new(config.collectors.nixos.clone())));
let cache_clone = cache.clone();
let collector = NixOSCollector::new(config.collectors.nixos.clone());
let interval = config.collectors.nixos.interval_seconds;
tokio::spawn(async move {
Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "NixOS").await;
});
collector_count += 1;
}
info!("Initialized {} collectors", collectors.len());
// Disk collector
if config.collectors.disk.enabled {
let cache_clone = cache.clone();
let collector = DiskCollector::new(config.collectors.disk.clone());
let interval = config.collectors.disk.interval_seconds;
tokio::spawn(async move {
Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "Disk").await;
});
collector_count += 1;
}
// Systemd collector
if config.collectors.systemd.enabled {
let cache_clone = cache.clone();
let collector = SystemdCollector::new(config.collectors.systemd.clone());
let interval = config.collectors.systemd.interval_seconds;
tokio::spawn(async move {
Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "Systemd").await;
});
collector_count += 1;
}
info!("Spawned {} independent collector tasks", collector_count);
// Initialize notification manager
let notification_manager = NotificationManager::new(&config.notifications, &hostname)?;
@@ -97,45 +153,121 @@ impl Agent {
hostname,
config,
zmq_handler,
collectors,
cache,
notification_manager,
previous_status: None,
})
}
/// Main agent loop with structured data collection
pub async fn run(&mut self, mut shutdown_rx: tokio::sync::oneshot::Receiver<()>) -> Result<()> {
info!("Starting agent main loop");
/// Independent collector task runner
async fn run_collector_task<C>(
cache: Arc<RwLock<AgentData>>,
collector: C,
interval_duration: Duration,
name: &str,
) where
C: crate::collectors::Collector + Send + 'static,
{
let mut interval_timer = interval(interval_duration);
info!("{} collector task started (interval: {:?})", name, interval_duration);
// Initial collection
if let Err(e) = self.collect_and_broadcast().await {
error!("Initial metric collection failed: {}", e);
loop {
interval_timer.tick().await;
// Acquire write lock and update cache
{
let mut agent_data = cache.write().await;
match collector.collect_structured(&mut *agent_data).await {
Ok(_) => {
debug!("{} collector updated cache", name);
}
Err(e) => {
error!("{} collector failed: {}", name, e);
}
}
} // Release lock immediately after collection
}
}
// Set up intervals
let mut transmission_interval = interval(Duration::from_secs(
self.config.collection_interval_seconds,
/// Main agent loop with cached data architecture
pub async fn run(&mut self, mut shutdown_rx: tokio::sync::oneshot::Receiver<()>) -> Result<()> {
info!("Starting agent main loop with cached collector architecture");
// Spawn independent ZMQ sender task
// Create dedicated ZMQ publisher for the sender task
let cache_clone = self.cache.clone();
let publisher_config = self.config.zmq.clone();
let transmission_interval_secs = self.config.collection_interval_seconds;
std::thread::spawn(move || {
// Create ZMQ publisher in this thread (ZMQ sockets are not thread-safe)
let context = zmq::Context::new();
let publisher = context.socket(zmq::SocketType::PUB).unwrap();
let bind_address = format!("tcp://{}:{}", publisher_config.bind_address, publisher_config.publisher_port);
publisher.bind(&bind_address).unwrap();
publisher.set_sndhwm(1000).unwrap();
publisher.set_linger(1000).unwrap();
info!("ZMQ sender task started on {} (interval: {}s)", bind_address, transmission_interval_secs);
let mut last_sent_data: Option<AgentData> = None;
let interval_duration = std::time::Duration::from_secs(transmission_interval_secs);
let mut next_send = std::time::Instant::now() + interval_duration;
loop {
// Sleep until next send time
std::thread::sleep(next_send.saturating_duration_since(std::time::Instant::now()));
next_send = std::time::Instant::now() + interval_duration;
// Try to read cache without blocking - if locked, send last known data
let data_to_send = match cache_clone.try_read() {
Ok(agent_data) => {
let data_clone = agent_data.clone();
drop(agent_data); // Release lock immediately
last_sent_data = Some(data_clone.clone());
Some(data_clone)
}
Err(_) => {
// Lock is held by collector - use last sent data
debug!("Cache locked by collector, sending previous data");
last_sent_data.clone()
}
};
if let Some(data) = data_to_send {
// Publish via ZMQ
if let Ok(envelope) = cm_dashboard_shared::MessageEnvelope::agent_data(data) {
if let Ok(serialized) = serde_json::to_vec(&envelope) {
if let Err(e) = publisher.send(&serialized, 0) {
error!("Failed to send ZMQ message: {}", e);
} else {
debug!("Successfully broadcast agent data");
}
}
}
}
}
});
// Set up intervals for notifications and commands
let mut notification_interval = interval(Duration::from_secs(
self.config.notifications.check_interval_seconds,
));
let mut notification_interval = interval(Duration::from_secs(30)); // Check notifications every 30s
let mut command_interval = interval(Duration::from_millis(100));
// Skip initial ticks to avoid immediate execution
transmission_interval.tick().await;
// Skip initial ticks
notification_interval.tick().await;
command_interval.tick().await;
loop {
tokio::select! {
_ = transmission_interval.tick() => {
if let Err(e) = self.collect_and_broadcast().await {
error!("Failed to collect and broadcast metrics: {}", e);
_ = notification_interval.tick() => {
// Read cache and check for status changes
let agent_data = self.cache.read().await.clone();
if let Err(e) = self.check_status_changes_and_notify(&agent_data).await {
error!("Failed to check status changes: {}", e);
}
}
_ = notification_interval.tick() => {
// Process any pending notifications
// NOTE: With structured data, we might need to implement status tracking differently
// For now, we skip this until status evaluation is migrated
}
// Handle incoming commands (check periodically)
_ = tokio::time::sleep(Duration::from_millis(100)) => {
_ = command_interval.tick() => {
if let Err(e) = self.handle_commands().await {
error!("Error handling commands: {}", e);
}
@@ -151,35 +283,6 @@ impl Agent {
Ok(())
}
/// Collect structured data from all collectors and broadcast via ZMQ
async fn collect_and_broadcast(&mut self) -> Result<()> {
debug!("Starting structured data collection");
// Initialize empty AgentData
let mut agent_data = AgentData::new(self.hostname.clone(), env!("CARGO_PKG_VERSION").to_string());
// Collect data from all collectors
for collector in &self.collectors {
if let Err(e) = collector.collect_structured(&mut agent_data).await {
error!("Collector failed: {}", e);
// Continue with other collectors even if one fails
}
}
// Check for status changes and send notifications
if let Err(e) = self.check_status_changes_and_notify(&agent_data).await {
error!("Failed to check status changes: {}", e);
}
// Broadcast the structured data via ZMQ
if let Err(e) = self.zmq_handler.publish_agent_data(&agent_data).await {
error!("Failed to broadcast agent data: {}", e);
} else {
debug!("Successfully broadcast structured agent data");
}
Ok(())
}
/// Check for status changes and send notifications
async fn check_status_changes_and_notify(&mut self, agent_data: &AgentData) -> Result<()> {
@@ -267,10 +370,10 @@ impl Agent {
match command {
AgentCommand::CollectNow => {
info!("Received immediate collection request");
if let Err(e) = self.collect_and_broadcast().await {
error!("Failed to collect on demand: {}", e);
}
info!("Received immediate transmission request");
// With cached architecture and dedicated ZMQ sender thread,
// data is already being sent every interval
// This command is acknowledged but not actionable in new architecture
}
AgentCommand::SetInterval { seconds } => {
info!("Received interval change request: {}s", seconds);

View File

@@ -112,9 +112,12 @@ impl DiskCollector {
/// Get block devices and their mount points using lsblk
async fn get_mount_devices(&self) -> Result<HashMap<String, String>, CollectorError> {
let output = Command::new("lsblk")
.args(&["-rn", "-o", "NAME,MOUNTPOINT"])
.output()
use super::run_command_with_timeout;
let mut cmd = Command::new("lsblk");
cmd.args(&["-rn", "-o", "NAME,MOUNTPOINT"]);
let output = run_command_with_timeout(cmd, self.config.command_timeout_seconds).await
.map_err(|e| CollectorError::SystemRead {
path: "block devices".to_string(),
error: e.to_string(),
@@ -186,8 +189,8 @@ impl DiskCollector {
/// Get filesystem info for a single mount point
fn get_filesystem_info(&self, mount_point: &str) -> Result<(u64, u64), CollectorError> {
let output = Command::new("df")
.args(&["--block-size=1", mount_point])
let output = std::process::Command::new("timeout")
.args(&["2", "df", "--block-size=1", mount_point])
.output()
.map_err(|e| CollectorError::SystemRead {
path: format!("df {}", mount_point),
@@ -386,7 +389,7 @@ impl DiskCollector {
/// Get SMART data for drives
async fn get_smart_data_for_drives(&self, physical_drives: &[PhysicalDrive], mergerfs_pools: &[MergerfsPool]) -> HashMap<String, SmartData> {
let mut smart_data = HashMap::new();
// Collect all drive names
let mut all_drives = std::collections::HashSet::new();
for drive in physical_drives {
@@ -413,23 +416,26 @@ impl DiskCollector {
/// Get SMART data for a single drive
async fn get_smart_data(&self, drive_name: &str) -> Result<SmartData, CollectorError> {
let output = Command::new("sudo")
.args(&["smartctl", "-a", &format!("/dev/{}", drive_name)])
.output()
use super::run_command_with_timeout;
// Use direct smartctl (no sudo) - service has CAP_SYS_RAWIO and CAP_SYS_ADMIN capabilities
// For NVMe drives, specify device type explicitly
let mut cmd = Command::new("smartctl");
if drive_name.starts_with("nvme") {
cmd.args(&["-d", "nvme", "-a", &format!("/dev/{}", drive_name)]);
} else {
cmd.args(&["-a", &format!("/dev/{}", drive_name)]);
}
let output = run_command_with_timeout(cmd, 3).await
.map_err(|e| CollectorError::SystemRead {
path: format!("SMART data for {}", drive_name),
error: e.to_string(),
})?;
let output_str = String::from_utf8_lossy(&output.stdout);
let error_str = String::from_utf8_lossy(&output.stderr);
// Debug logging for SMART command results
debug!("SMART output for {}: status={}, stdout_len={}, stderr={}",
drive_name, output.status, output_str.len(), error_str);
if !output.status.success() {
debug!("SMART command failed for {}: {}", drive_name, error_str);
// Return unknown data rather than failing completely
return Ok(SmartData {
health: "UNKNOWN".to_string(),
@@ -524,6 +530,9 @@ impl DiskCollector {
/// Populate drives data into AgentData
fn populate_drives_data(&self, physical_drives: &[PhysicalDrive], smart_data: &HashMap<String, SmartData>, agent_data: &mut AgentData) -> Result<(), CollectorError> {
// Clear existing drives data to prevent duplicates in cached architecture
agent_data.system.storage.drives.clear();
for drive in physical_drives {
let smart = smart_data.get(&drive.name);
@@ -561,6 +570,9 @@ impl DiskCollector {
/// Populate pools data into AgentData
fn populate_pools_data(&self, mergerfs_pools: &[MergerfsPool], smart_data: &HashMap<String, SmartData>, agent_data: &mut AgentData) -> Result<(), CollectorError> {
// Clear existing pools data to prevent duplicates in cached architecture
agent_data.system.storage.pools.clear();
for pool in mergerfs_pools {
// Calculate pool health and statuses based on member drive health
let (pool_health, health_status, usage_status, data_drive_data, parity_drive_data) = self.calculate_pool_health(pool, smart_data);
@@ -756,9 +768,9 @@ impl DiskCollector {
/// Get drive information for a mount path
fn get_drive_info_for_path(&self, path: &str) -> anyhow::Result<PoolDrive> {
// Use lsblk to find the backing device
let output = Command::new("lsblk")
.args(&["-rn", "-o", "NAME,MOUNTPOINT"])
// Use lsblk to find the backing device with timeout
let output = Command::new("timeout")
.args(&["2", "lsblk", "-rn", "-o", "NAME,MOUNTPOINT"])
.output()
.map_err(|e| anyhow::anyhow!("Failed to run lsblk: {}", e))?;

View File

@@ -97,20 +97,23 @@ impl MemoryCollector {
/// Populate tmpfs data into AgentData
async fn populate_tmpfs_data(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
// Clear existing tmpfs data to prevent duplicates in cached architecture
agent_data.system.memory.tmpfs.clear();
// Discover all tmpfs mount points
let tmpfs_mounts = self.discover_tmpfs_mounts()?;
if tmpfs_mounts.is_empty() {
debug!("No tmpfs mounts found to monitor");
return Ok(());
}
// Get usage data for all tmpfs mounts at once using df
let mut df_args = vec!["df", "--output=target,size,used", "--block-size=1"];
// Get usage data for all tmpfs mounts at once using df (with 2 second timeout)
let mut df_args = vec!["2", "df", "--output=target,size,used", "--block-size=1"];
df_args.extend(tmpfs_mounts.iter().map(|s| s.as_str()));
let df_output = std::process::Command::new(df_args[0])
.args(&df_args[1..])
let df_output = std::process::Command::new("timeout")
.args(&df_args[..])
.output()
.map_err(|e| CollectorError::SystemRead {
path: "tmpfs mounts".to_string(),

View File

@@ -1,6 +1,8 @@
use async_trait::async_trait;
use cm_dashboard_shared::{AgentData};
use std::process::{Command, Output};
use std::time::Duration;
use tokio::time::timeout;
pub mod backup;
pub mod cpu;
@@ -13,6 +15,20 @@ pub mod systemd;
pub use error::CollectorError;
/// Run a command with a timeout to prevent blocking
pub async fn run_command_with_timeout(mut cmd: Command, timeout_secs: u64) -> std::io::Result<Output> {
let timeout_duration = Duration::from_secs(timeout_secs);
match timeout(timeout_duration, tokio::task::spawn_blocking(move || cmd.output())).await {
Ok(Ok(result)) => result,
Ok(Err(e)) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
Err(_) => Err(std::io::Error::new(
std::io::ErrorKind::TimedOut,
format!("Command timed out after {} seconds", timeout_secs)
)),
}
}
/// Base trait for all collectors with direct structured data output
#[async_trait]

View File

@@ -8,12 +8,12 @@ use crate::config::NetworkConfig;
/// Network interface collector with physical/virtual classification and link status
pub struct NetworkCollector {
_config: NetworkConfig,
config: NetworkConfig,
}
impl NetworkCollector {
pub fn new(config: NetworkConfig) -> Self {
Self { _config: config }
Self { config }
}
/// Check if interface is physical (not virtual)
@@ -50,8 +50,9 @@ impl NetworkCollector {
}
/// Get the primary physical interface (the one with default route)
fn get_primary_physical_interface() -> Option<String> {
match Command::new("ip").args(["route", "show", "default"]).output() {
fn get_primary_physical_interface(&self) -> Option<String> {
let timeout_str = self.config.command_timeout_seconds.to_string();
match Command::new("timeout").args([&timeout_str, "ip", "route", "show", "default"]).output() {
Ok(output) if output.status.success() => {
let output_str = String::from_utf8_lossy(&output.stdout);
// Parse: "default via 192.168.1.1 dev eno1 ..."
@@ -110,7 +111,8 @@ impl NetworkCollector {
// Parse VLAN configuration
let vlan_map = Self::parse_vlan_config();
match Command::new("ip").args(["-j", "addr"]).output() {
let timeout_str = self.config.command_timeout_seconds.to_string();
match Command::new("timeout").args([&timeout_str, "ip", "-j", "addr"]).output() {
Ok(output) if output.status.success() => {
let json_str = String::from_utf8_lossy(&output.stdout);
@@ -195,7 +197,7 @@ impl NetworkCollector {
}
// Assign primary physical interface as parent to virtual interfaces without explicit parent
let primary_interface = Self::get_primary_physical_interface();
let primary_interface = self.get_primary_physical_interface();
if let Some(primary) = primary_interface {
for interface in interfaces.iter_mut() {
// Only assign parent to virtual interfaces that don't already have one

View File

@@ -43,8 +43,8 @@ impl NixOSCollector {
match fs::read_to_string("/etc/hostname") {
Ok(hostname) => Some(hostname.trim().to_string()),
Err(_) => {
// Fallback to hostname command
match Command::new("hostname").output() {
// Fallback to hostname command (with 2 second timeout)
match Command::new("timeout").args(["2", "hostname"]).output() {
Ok(output) => Some(String::from_utf8_lossy(&output.stdout).trim().to_string()),
Err(_) => None,
}

View File

@@ -4,7 +4,7 @@ use cm_dashboard_shared::{AgentData, ServiceData, SubServiceData, SubServiceMetr
use std::process::Command;
use std::sync::RwLock;
use std::time::Instant;
use tracing::{debug, info};
use tracing::{debug, warn};
use super::{Collector, CollectorError};
use crate::config::SystemdConfig;
@@ -113,15 +113,13 @@ impl SystemdCollector {
name: site_name.clone(),
service_status: self.calculate_service_status(&site_name, &site_status),
metrics,
service_type: "nginx_site".to_string(),
});
}
}
if service_name.contains("docker") && active_status == "active" {
info!("Collecting Docker sub-services for service: {}", service_name);
let docker_containers = self.get_docker_containers();
info!("Found {} Docker containers", docker_containers.len());
for (container_name, container_status) in docker_containers {
// For now, docker containers have no additional metrics
// Future: could add memory_mb, cpu_percent, restart_count, etc.
@@ -131,28 +129,27 @@ impl SystemdCollector {
name: container_name.clone(),
service_status: self.calculate_service_status(&container_name, &container_status),
metrics,
service_type: "container".to_string(),
});
}
// Add Docker images
let docker_images = self.get_docker_images();
info!("Found {} Docker images", docker_images.len());
for (image_name, image_status, image_size) in docker_images {
for (image_name, image_status, image_size_mb) in docker_images {
let mut metrics = Vec::new();
metrics.push(SubServiceMetric {
label: "size".to_string(),
value: 0.0, // Size as string in name instead
unit: None,
value: image_size_mb,
unit: Some("MB".to_string()),
});
sub_services.push(SubServiceData {
name: format!("{} ({})", image_name, image_size),
name: image_name.to_string(),
service_status: self.calculate_service_status(&image_name, &image_status),
metrics,
service_type: "image".to_string(),
});
}
info!("Total Docker sub-services added: {}", sub_services.len());
}
// Create complete service data
@@ -175,6 +172,10 @@ impl SystemdCollector {
}
}
// Sort services alphabetically by name
agent_data.services.sort_by(|a, b| a.name.cmp(&b.name));
complete_service_data.sort_by(|a, b| a.name.cmp(&b.name));
// Update cached state
{
let mut state = self.state.write().unwrap();
@@ -254,8 +255,9 @@ impl SystemdCollector {
/// Auto-discover interesting services to monitor
fn discover_services_internal(&self) -> Result<(Vec<String>, std::collections::HashMap<String, ServiceStatusInfo>)> {
// First: Get all service unit files
let unit_files_output = Command::new("systemctl")
.args(&["list-unit-files", "--type=service", "--no-pager", "--plain"])
let timeout_str = self.config.command_timeout_seconds.to_string();
let unit_files_output = Command::new("timeout")
.args(&[&timeout_str, "systemctl", "list-unit-files", "--type=service", "--no-pager", "--plain"])
.output()?;
if !unit_files_output.status.success() {
@@ -263,8 +265,8 @@ impl SystemdCollector {
}
// Second: Get runtime status of all units
let units_status_output = Command::new("systemctl")
.args(&["list-units", "--type=service", "--all", "--no-pager", "--plain"])
let units_status_output = Command::new("timeout")
.args(&[&timeout_str, "systemctl", "list-units", "--type=service", "--all", "--no-pager", "--plain"])
.output()?;
if !units_status_output.status.success() {
@@ -361,15 +363,16 @@ impl SystemdCollector {
}
// Fallback to systemctl if not in cache
let output = Command::new("systemctl")
.args(&["is-active", &format!("{}.service", service)])
let timeout_str = self.config.command_timeout_seconds.to_string();
let output = Command::new("timeout")
.args(&[&timeout_str, "systemctl", "is-active", &format!("{}.service", service)])
.output()?;
let active_status = String::from_utf8(output.stdout)?.trim().to_string();
// Get more detailed info
let output = Command::new("systemctl")
.args(&["show", &format!("{}.service", service), "--property=LoadState,ActiveState,SubState"])
let output = Command::new("timeout")
.args(&[&timeout_str, "systemctl", "show", &format!("{}.service", service), "--property=LoadState,ActiveState,SubState"])
.output()?;
let detailed_info = String::from_utf8(output.stdout)?;
@@ -421,7 +424,7 @@ impl SystemdCollector {
if let Some(dirs) = self.config.service_directories.get(service_name) {
// Service has configured paths - use the first accessible one
for dir in dirs {
if let Some(size) = self.get_directory_size(dir) {
if let Some(size) = self.get_directory_size(dir).await {
return Ok(size);
}
}
@@ -430,8 +433,9 @@ impl SystemdCollector {
}
// No configured path - try to get WorkingDirectory from systemctl
let output = Command::new("systemctl")
.args(&["show", &format!("{}.service", service_name), "--property=WorkingDirectory"])
let timeout_str = self.config.command_timeout_seconds.to_string();
let output = Command::new("timeout")
.args(&[&timeout_str, "systemctl", "show", &format!("{}.service", service_name), "--property=WorkingDirectory"])
.output()
.map_err(|e| CollectorError::SystemRead {
path: format!("WorkingDirectory for {}", service_name),
@@ -443,7 +447,7 @@ impl SystemdCollector {
if line.starts_with("WorkingDirectory=") && !line.contains("[not set]") {
let dir = line.strip_prefix("WorkingDirectory=").unwrap_or("");
if !dir.is_empty() && dir != "/" {
return Ok(self.get_directory_size(dir).unwrap_or(0.0));
return Ok(self.get_directory_size(dir).await.unwrap_or(0.0));
}
}
}
@@ -452,17 +456,22 @@ impl SystemdCollector {
}
/// Get size of a directory in GB
fn get_directory_size(&self, path: &str) -> Option<f32> {
let output = Command::new("sudo")
.args(&["du", "-sb", path])
.output()
.ok()?;
async fn get_directory_size(&self, path: &str) -> Option<f32> {
use super::run_command_with_timeout;
// Use -s (summary) and --apparent-size for speed
let mut cmd = Command::new("sudo");
cmd.args(&["du", "-s", "--apparent-size", "--block-size=1", path]);
let output = run_command_with_timeout(cmd, self.config.command_timeout_seconds).await.ok()?;
if !output.status.success() {
// Log permission errors for debugging but don't spam logs
let stderr = String::from_utf8_lossy(&output.stderr);
if stderr.contains("Permission denied") {
debug!("Permission denied accessing directory: {}", path);
} else if stderr.contains("timed out") {
warn!("Directory size check timed out for {}", path);
} else {
debug!("Failed to get size for directory {}: {}", path, stderr);
}
@@ -781,8 +790,9 @@ impl SystemdCollector {
// Check if docker is available (cm-agent user is in docker group)
// Use -a to show ALL containers (running and stopped)
let output = Command::new("docker")
.args(&["ps", "-a", "--format", "{{.Names}},{{.Status}}"])
let timeout_str = self.config.command_timeout_seconds.to_string();
let output = Command::new("timeout")
.args(&[&timeout_str, "docker", "ps", "-a", "--format", "{{.Names}},{{.Status}}"])
.output();
let output = match output {
@@ -821,25 +831,20 @@ impl SystemdCollector {
}
/// Get docker images as sub-services
fn get_docker_images(&self) -> Vec<(String, String, String)> {
fn get_docker_images(&self) -> Vec<(String, String, f32)> {
let mut images = Vec::new();
info!("Collecting Docker images");
// Check if docker is available (cm-agent user is in docker group)
let output = Command::new("docker")
.args(&["images", "--format", "{{.Repository}}:{{.Tag}},{{.Size}}"])
let timeout_str = self.config.command_timeout_seconds.to_string();
let output = Command::new("timeout")
.args(&[&timeout_str, "docker", "images", "--format", "{{.Repository}}:{{.Tag}},{{.Size}}"])
.output();
let output = match output {
Ok(out) if out.status.success() => out,
Ok(out) => {
let stderr = String::from_utf8_lossy(&out.stderr);
info!("Docker images command failed with status: {}, stderr: {}", out.status, stderr);
Ok(_) => {
return images;
}
Err(e) => {
info!("Docker images command error: {}", e);
Err(_) => {
return images;
}
};
@@ -857,28 +862,62 @@ impl SystemdCollector {
let parts: Vec<&str> = line.split(',').collect();
if parts.len() >= 2 {
let image_name = parts[0].trim();
let size = parts[1].trim();
let size_str = parts[1].trim();
// Skip <none>:<none> images (dangling images)
if image_name.contains("<none>") {
continue;
}
// Parse size to MB (sizes come as "142MB", "1.5GB", "512kB", etc.)
let size_mb = self.parse_docker_size(size_str);
images.push((
format!("image_{}", image_name),
"active".to_string(), // Images are always "active" (present)
size.to_string()
image_name.to_string(),
"inactive".to_string(), // Images are informational - use inactive for neutral display
size_mb
));
}
}
images
}
/// Parse Docker size string to MB
fn parse_docker_size(&self, size_str: &str) -> f32 {
let size_upper = size_str.to_uppercase();
// Extract numeric part and unit
let mut num_str = String::new();
let mut unit = String::new();
for ch in size_upper.chars() {
if ch.is_ascii_digit() || ch == '.' {
num_str.push(ch);
} else if ch.is_alphabetic() {
unit.push(ch);
}
}
let value: f32 = num_str.parse().unwrap_or(0.0);
// Convert to MB
match unit.as_str() {
"KB" | "K" => value / 1024.0,
"MB" | "M" => value,
"GB" | "G" => value * 1024.0,
"TB" | "T" => value * 1024.0 * 1024.0,
_ => value, // Assume bytes if no unit
}
}
}
#[async_trait]
impl Collector for SystemdCollector {
async fn collect_structured(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
// Clear existing services data to prevent duplicates in cached architecture
agent_data.services.clear();
// Use cached complete data if available and fresh
if let Some(cached_complete_services) = self.get_cached_complete_services() {
for service_data in cached_complete_services {

View File

@@ -1,13 +1,12 @@
use anyhow::Result;
use cm_dashboard_shared::{AgentData, MessageEnvelope};
use tracing::{debug, info};
use zmq::{Context, Socket, SocketType};
use crate::config::ZmqConfig;
/// ZMQ communication handler for publishing metrics and receiving commands
/// ZMQ communication handler for receiving commands
/// NOTE: Publishing is handled by dedicated thread in Agent::run()
pub struct ZmqHandler {
publisher: Socket,
command_receiver: Socket,
}
@@ -15,17 +14,6 @@ impl ZmqHandler {
pub async fn new(config: &ZmqConfig) -> Result<Self> {
let context = Context::new();
// Create publisher socket for metrics
let publisher = context.socket(SocketType::PUB)?;
let pub_bind_address = format!("tcp://{}:{}", config.bind_address, config.publisher_port);
publisher.bind(&pub_bind_address)?;
info!("ZMQ publisher bound to {}", pub_bind_address);
// Set socket options for efficiency
publisher.set_sndhwm(1000)?; // High water mark for outbound messages
publisher.set_linger(1000)?; // Linger time on close
// Create command receiver socket (PULL socket to receive commands from dashboard)
let command_receiver = context.socket(SocketType::PULL)?;
let cmd_bind_address = format!("tcp://{}:{}", config.bind_address, config.command_port);
@@ -38,33 +26,10 @@ impl ZmqHandler {
command_receiver.set_linger(1000)?;
Ok(Self {
publisher,
command_receiver,
})
}
/// Publish agent data via ZMQ
pub async fn publish_agent_data(&self, data: &AgentData) -> Result<()> {
debug!(
"Publishing agent data for host {}",
data.hostname
);
// Create message envelope for agent data
let envelope = MessageEnvelope::agent_data(data.clone())
.map_err(|e| anyhow::anyhow!("Failed to create agent data envelope: {}", e))?;
// Serialize envelope
let serialized = serde_json::to_vec(&envelope)?;
// Send via ZMQ
self.publisher.send(&serialized, 0)?;
debug!("Published agent data message ({} bytes)", serialized.len());
Ok(())
}
/// Try to receive a command (non-blocking)
pub fn try_receive_command(&self) -> Result<Option<AgentCommand>> {
match self.command_receiver.recv_bytes(zmq::DONTWAIT) {

View File

@@ -79,6 +79,9 @@ pub struct DiskConfig {
pub temperature_critical_celsius: f32,
pub wear_warning_percent: f32,
pub wear_critical_percent: f32,
/// Command timeout in seconds for lsblk, smartctl, etc.
#[serde(default = "default_disk_command_timeout")]
pub command_timeout_seconds: u64,
}
/// Filesystem configuration entry
@@ -108,6 +111,9 @@ pub struct SystemdConfig {
pub http_timeout_seconds: u64,
pub http_connect_timeout_seconds: u64,
pub nginx_latency_critical_ms: f32,
/// Command timeout in seconds for systemctl, docker, du commands
#[serde(default = "default_systemd_command_timeout")]
pub command_timeout_seconds: u64,
}
@@ -132,6 +138,9 @@ pub struct BackupConfig {
pub struct NetworkConfig {
pub enabled: bool,
pub interval_seconds: u64,
/// Command timeout in seconds for ip route, ip addr commands
#[serde(default = "default_network_command_timeout")]
pub command_timeout_seconds: u64,
}
/// Notification configuration
@@ -145,6 +154,9 @@ pub struct NotificationConfig {
pub rate_limit_minutes: u64,
/// Email notification batching interval in seconds (default: 60)
pub aggregation_interval_seconds: u64,
/// Status check interval in seconds for detecting changes (default: 30)
#[serde(default = "default_notification_check_interval")]
pub check_interval_seconds: u64,
/// List of metric names to exclude from email notifications
#[serde(default)]
pub exclude_email_metrics: Vec<String>,
@@ -158,10 +170,26 @@ fn default_heartbeat_interval_seconds() -> u64 {
5
}
fn default_notification_check_interval() -> u64 {
30
}
fn default_maintenance_mode_file() -> String {
"/tmp/cm-maintenance".to_string()
}
fn default_disk_command_timeout() -> u64 {
30
}
fn default_systemd_command_timeout() -> u64 {
15
}
fn default_network_command_timeout() -> u64 {
10
}
impl AgentConfig {
pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
loader::load_config(path)

View File

@@ -1,6 +1,6 @@
[package]
name = "cm-dashboard"
version = "0.1.178"
version = "0.1.195"
edition = "2021"
[dependencies]

View File

@@ -215,7 +215,7 @@ impl Dashboard {
// Update TUI with new metrics (only if not headless)
if let Some(ref mut tui_app) = self.tui_app {
tui_app.update_metrics(&self.metric_store);
tui_app.update_metrics(&mut self.metric_store);
}
}

View File

@@ -5,6 +5,14 @@ use tracing::{debug, info, warn};
use super::MetricDataPoint;
/// ZMQ communication statistics per host
#[derive(Debug, Clone)]
pub struct ZmqStats {
pub packets_received: u64,
pub last_packet_time: Instant,
pub last_packet_age_secs: f64,
}
/// Central metric storage for the dashboard
pub struct MetricStore {
/// Current structured data: hostname -> AgentData
@@ -13,6 +21,8 @@ pub struct MetricStore {
historical_metrics: HashMap<String, Vec<MetricDataPoint>>,
/// Last heartbeat timestamp per host
last_heartbeat: HashMap<String, Instant>,
/// ZMQ communication statistics per host
zmq_stats: HashMap<String, ZmqStats>,
/// Configuration
max_metrics_per_host: usize,
history_retention: Duration,
@@ -24,6 +34,7 @@ impl MetricStore {
current_agent_data: HashMap::new(),
historical_metrics: HashMap::new(),
last_heartbeat: HashMap::new(),
zmq_stats: HashMap::new(),
max_metrics_per_host,
history_retention: Duration::from_secs(history_retention_hours * 3600),
}
@@ -44,6 +55,16 @@ impl MetricStore {
self.last_heartbeat.insert(hostname.clone(), now);
debug!("Updated heartbeat for host {}", hostname);
// Update ZMQ stats
let stats = self.zmq_stats.entry(hostname.clone()).or_insert(ZmqStats {
packets_received: 0,
last_packet_time: now,
last_packet_age_secs: 0.0,
});
stats.packets_received += 1;
stats.last_packet_time = now;
stats.last_packet_age_secs = 0.0; // Just received
// Add to history
let host_history = self
.historical_metrics
@@ -65,6 +86,15 @@ impl MetricStore {
self.current_agent_data.get(hostname)
}
/// Get ZMQ communication statistics for a host
pub fn get_zmq_stats(&mut self, hostname: &str) -> Option<ZmqStats> {
let now = Instant::now();
self.zmq_stats.get_mut(hostname).map(|stats| {
// Update packet age
stats.last_packet_age_secs = now.duration_since(stats.last_packet_time).as_secs_f64();
stats.clone()
})
}
/// Get connected hosts (hosts with recent heartbeats)
pub fn get_connected_hosts(&self, timeout: Duration) -> Vec<String> {

View File

@@ -100,7 +100,7 @@ impl TuiApp {
}
/// Update widgets with structured data from store (only for current host)
pub fn update_metrics(&mut self, metric_store: &MetricStore) {
pub fn update_metrics(&mut self, metric_store: &mut MetricStore) {
if let Some(hostname) = self.current_host.clone() {
// Get structured data for this host
if let Some(agent_data) = metric_store.get_agent_data(&hostname) {
@@ -110,6 +110,14 @@ impl TuiApp {
host_widgets.system_widget.update_from_agent_data(agent_data);
host_widgets.services_widget.update_from_agent_data(agent_data);
// Update ZMQ stats
if let Some(zmq_stats) = metric_store.get_zmq_stats(&hostname) {
host_widgets.system_widget.update_zmq_stats(
zmq_stats.packets_received,
zmq_stats.last_packet_age_secs
);
}
host_widgets.last_update = Some(Instant::now());
}
}

View File

@@ -32,6 +32,7 @@ struct ServiceInfo {
disk_gb: Option<f32>,
metrics: Vec<(String, f32, Option<String>)>, // (label, value, unit)
widget_status: Status,
service_type: String, // "nginx_site", "container", "image", or empty for parent services
}
impl ServicesWidget {
@@ -169,7 +170,7 @@ impl ServicesWidget {
// Convert Status enum to display text for sub-services
match info.widget_status {
Status::Ok => "active",
Status::Inactive => "inactive",
Status::Inactive => "inactive",
Status::Critical => "failed",
Status::Pending => "pending",
Status::Warning => "warning",
@@ -179,32 +180,62 @@ impl ServicesWidget {
};
let tree_symbol = if is_last { "└─" } else { "├─" };
vec![
// Indentation and tree prefix
ratatui::text::Span::styled(
format!(" {} ", tree_symbol),
Typography::tree(),
),
// Status icon
ratatui::text::Span::styled(
format!("{} ", icon),
Style::default().fg(status_color).bg(Theme::background()),
),
// Service name
ratatui::text::Span::styled(
format!("{:<18} ", short_name),
Style::default()
.fg(Theme::secondary_text())
.bg(Theme::background()),
),
// Status/latency text
ratatui::text::Span::styled(
status_str,
Style::default()
.fg(Theme::secondary_text())
.bg(Theme::background()),
),
]
// Docker images use docker whale icon
if info.service_type == "image" {
vec![
// Indentation and tree prefix
ratatui::text::Span::styled(
format!(" {} ", tree_symbol),
Typography::tree(),
),
// Docker icon (simple character for performance)
ratatui::text::Span::styled(
"D ".to_string(),
Style::default().fg(Theme::highlight()).bg(Theme::background()),
),
// Service name
ratatui::text::Span::styled(
format!("{:<18} ", short_name),
Style::default()
.fg(Theme::secondary_text())
.bg(Theme::background()),
),
// Status/metrics text
ratatui::text::Span::styled(
status_str,
Style::default()
.fg(Theme::secondary_text())
.bg(Theme::background()),
),
]
} else {
vec![
// Indentation and tree prefix
ratatui::text::Span::styled(
format!(" {} ", tree_symbol),
Typography::tree(),
),
// Status icon
ratatui::text::Span::styled(
format!("{} ", icon),
Style::default().fg(status_color).bg(Theme::background()),
),
// Service name
ratatui::text::Span::styled(
format!("{:<18} ", short_name),
Style::default()
.fg(Theme::secondary_text())
.bg(Theme::background()),
),
// Status/latency text
ratatui::text::Span::styled(
status_str,
Style::default()
.fg(Theme::secondary_text())
.bg(Theme::background()),
),
]
}
}
/// Move selection up
@@ -282,9 +313,10 @@ impl Widget for ServicesWidget {
disk_gb: Some(service.disk_gb),
metrics: Vec::new(), // Parent services don't have custom metrics
widget_status: service.service_status,
service_type: String::new(), // Parent services have no type
};
self.parent_services.insert(service.name.clone(), parent_info);
// Process sub-services if any
if !service.sub_services.is_empty() {
let mut sub_list = Vec::new();
@@ -293,12 +325,13 @@ impl Widget for ServicesWidget {
let metrics: Vec<(String, f32, Option<String>)> = sub_service.metrics.iter()
.map(|m| (m.label.clone(), m.value, m.unit.clone()))
.collect();
let sub_info = ServiceInfo {
memory_mb: None, // Not used for sub-services
disk_gb: None, // Not used for sub-services
metrics,
widget_status: sub_service.service_status,
service_type: sub_service.service_type.clone(),
};
sub_list.push((sub_service.name.clone(), sub_info));
}
@@ -342,6 +375,7 @@ impl ServicesWidget {
disk_gb: None,
metrics: Vec::new(),
widget_status: Status::Unknown,
service_type: String::new(),
});
if metric.name.ends_with("_status") {
@@ -377,6 +411,7 @@ impl ServicesWidget {
disk_gb: None,
metrics: Vec::new(),
widget_status: Status::Unknown,
service_type: String::new(), // Unknown type in legacy path
},
));
&mut sub_service_list.last_mut().unwrap().1

View File

@@ -15,6 +15,10 @@ pub struct SystemWidget {
nixos_build: Option<String>,
agent_hash: Option<String>,
// ZMQ communication stats
zmq_packets_received: Option<u64>,
zmq_last_packet_age: Option<f64>,
// Network interfaces
network_interfaces: Vec<cm_dashboard_shared::NetworkInterfaceData>,
@@ -92,6 +96,8 @@ impl SystemWidget {
Self {
nixos_build: None,
agent_hash: None,
zmq_packets_received: None,
zmq_last_packet_age: None,
network_interfaces: Vec::new(),
cpu_load_1min: None,
cpu_load_5min: None,
@@ -154,6 +160,12 @@ impl SystemWidget {
pub fn _get_agent_hash(&self) -> Option<&String> {
self.agent_hash.as_ref()
}
/// Update ZMQ communication statistics
pub fn update_zmq_stats(&mut self, packets_received: u64, last_packet_age_secs: f64) {
self.zmq_packets_received = Some(packets_received);
self.zmq_last_packet_age = Some(last_packet_age_secs);
}
}
use super::Widget;
@@ -796,6 +808,18 @@ impl SystemWidget {
Span::styled(format!("Agent: {}", agent_version_text), Typography::secondary())
]));
// ZMQ communication stats
if let (Some(packets), Some(age)) = (self.zmq_packets_received, self.zmq_last_packet_age) {
let age_text = if age < 1.0 {
format!("{:.0}ms ago", age * 1000.0)
} else {
format!("{:.1}s ago", age)
};
lines.push(Line::from(vec![
Span::styled(format!("ZMQ: {} pkts, last {}", packets, age_text), Typography::secondary())
]));
}
// CPU section
lines.push(Line::from(vec![
Span::styled("CPU:", Typography::widget_title())

View File

@@ -1,6 +1,6 @@
[package]
name = "cm-dashboard-shared"
version = "0.1.178"
version = "0.1.195"
edition = "2021"
[dependencies]

View File

@@ -149,6 +149,9 @@ pub struct SubServiceData {
pub name: String,
pub service_status: Status,
pub metrics: Vec<SubServiceMetric>,
/// Type of sub-service: "nginx_site", "container", "image"
#[serde(default)]
pub service_type: String,
}
/// Individual metric for a sub-service