diff --git a/CLAUDE.md b/CLAUDE.md
index 32fe7aa..3786ee8 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -156,7 +156,7 @@ Complete migration from string-based metrics to structured JSON data. Eliminates
 - ✅ Backward compatibility via bridge conversion to existing UI widgets
 - ✅ All string parsing bugs eliminated
 
-### Cached Collector Architecture (✅ IMPLEMENTED)
+### Cached Collector Architecture (🚧 PLANNED)
 
 **Problem:** Blocking collectors prevent timely ZMQ transmission, causing false "host offline" alerts.
 
@@ -199,42 +199,12 @@ Every 1 second:
 - ✅ System stays responsive even with slow operations
 - ✅ Slow collectors can use longer timeouts without blocking
 
-**Implementation Details:**
-- **Shared cache**: `Arc<RwLock<AgentData>>` initialized at agent startup
-- **Collector intervals**: Fully configurable via NixOS config (`interval_seconds` per collector)
-  - Recommended: Fast (1-10s): CPU, Memory, Network
-  - Recommended: Medium (30-60s): Backup, NixOS
-  - Recommended: Slow (60-300s): Disk, Systemd
-- **Independent tasks**: Each collector spawned as separate tokio task in `Agent::new()`
-- **Cache updates**: Collectors acquire write lock → update → release immediately
-- **ZMQ sender**: Main loop reads cache every `collection_interval_seconds` and broadcasts
-- **Notification check**: Runs every `notifications.check_interval_seconds`
-- **Lock strategy**: Short-lived write locks prevent blocking, read locks for transmission
-- **Stale data**: Acceptable for slow-changing metrics (SMART data, disk usage)
-
-**Configuration (NixOS):**
-All intervals and timeouts configurable in `services/cm-dashboard.nix`:
-
-Collection Intervals:
-- `collectors.cpu.interval_seconds` (default: 10s)
-- `collectors.memory.interval_seconds` (default: 2s)
-- `collectors.disk.interval_seconds` (default: 300s)
-- `collectors.systemd.interval_seconds` (default: 10s)
-- `collectors.backup.interval_seconds` (default: 60s)
-- `collectors.network.interval_seconds` (default: 10s)
-- `collectors.nixos.interval_seconds` (default: 60s)
-- `notifications.check_interval_seconds` (default: 30s)
-- `collection_interval_seconds` - ZMQ transmission rate (default: 2s)
-
-Command Timeouts (prevent resource leaks from hung commands):
-- `collectors.disk.command_timeout_seconds` (default: 30s) - lsblk, smartctl, etc.
-- `collectors.systemd.command_timeout_seconds` (default: 15s) - systemctl, docker, du
-- `collectors.network.command_timeout_seconds` (default: 10s) - ip route, ip addr
-
-**Code Locations:**
-- agent/src/agent.rs:59-133 - Collector task spawning
-- agent/src/agent.rs:151-179 - Independent collector task runner
-- agent/src/agent.rs:199-207 - ZMQ sender in main loop
+**Implementation:**
+- Shared `AgentData` cache wrapped in `Arc<RwLock<AgentData>>`
+- Each collector spawned as independent tokio task
+- Collectors update their section of cache at their own rate
+- ZMQ sender reads cache every 1s and transmits
+- Stale data acceptable for slow-changing metrics (disk usage, SMART)
 
 ### Maintenance Mode
 
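The planned pattern above is easier to evaluate as code than as bullets. A minimal self-contained sketch, assuming a simplified `AgentData` and a single hard-coded collector (illustrative names only, not the repo's actual types):

```rust
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time::interval;

// Stand-in for the real shared AgentData; one field is enough to show the flow.
#[derive(Clone, Debug, Default)]
struct AgentData {
    cpu_load: f32,
}

#[tokio::main]
async fn main() {
    let cache = Arc::new(RwLock::new(AgentData::default()));

    // One collector task; the real agent would spawn one per enabled collector,
    // each with its own interval (fast for CPU, slow for disk/SMART).
    let writer = Arc::clone(&cache);
    tokio::spawn(async move {
        let mut tick = interval(Duration::from_secs(10));
        loop {
            tick.tick().await;
            let sample = 0.42; // imagine a slow collection happening here
            {
                let mut data = writer.write().await; // short-lived write lock
                data.cpu_load = sample;
            } // guard dropped here, before the next tick
        }
    });

    // Sender loop: every second, snapshot whatever the cache holds and
    // transmit it, even if a slow collector has not refreshed its section yet.
    let mut tick = interval(Duration::from_secs(1));
    for _ in 0..3 {
        tick.tick().await;
        let snapshot = cache.read().await.clone();
        println!("would publish via ZMQ: {snapshot:?}");
    }
}
```

The inner scope is the point: the write guard is dropped before the collector sleeps again, so the 1s sender loop can always take its read lock promptly.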
diff --git a/Cargo.lock b/Cargo.lock
index 6dfaddb..343050f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -278,7 +278,7 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"
 
 [[package]]
 name = "cm-dashboard"
-version = "0.1.192"
+version = "0.1.191"
 dependencies = [
  "anyhow",
  "chrono",
@@ -300,7 +300,7 @@ dependencies = [
 
 [[package]]
 name = "cm-dashboard-agent"
-version = "0.1.192"
+version = "0.1.191"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -323,7 +323,7 @@ dependencies = [
 
 [[package]]
 name = "cm-dashboard-shared"
-version = "0.1.192"
+version = "0.1.191"
 dependencies = [
  "chrono",
  "serde",
diff --git a/agent/src/agent.rs b/agent/src/agent.rs
index 900c7a2..d74a4c7 100644
--- a/agent/src/agent.rs
+++ b/agent/src/agent.rs
@@ -1,14 +1,13 @@
 use anyhow::Result;
 use gethostname::gethostname;
-use std::sync::Arc;
 use std::time::Duration;
-use tokio::sync::RwLock;
 use tokio::time::interval;
 use tracing::{debug, error, info};
 
 use crate::communication::{AgentCommand, ZmqHandler};
 use crate::config::AgentConfig;
 use crate::collectors::{
+    Collector,
     backup::BackupCollector,
     cpu::CpuCollector,
     disk::DiskCollector,
@@ -24,7 +23,7 @@ pub struct Agent {
     hostname: String,
     config: AgentConfig,
     zmq_handler: ZmqHandler,
-    cache: Arc<RwLock<AgentData>>,
+    collectors: Vec<Box<dyn Collector>>,
     notification_manager: NotificationManager,
     previous_status: Option<AgentData>,
 }
@@ -56,94 +55,39 @@ impl Agent {
             config.zmq.publisher_port
         );
 
-        // Initialize shared cache
-        let cache = Arc::new(RwLock::new(AgentData::new(
-            hostname.clone(),
-            env!("CARGO_PKG_VERSION").to_string()
-        )));
-        info!("Initialized shared agent data cache");
-
-        // Spawn independent collector tasks
-        let mut collector_count = 0;
-
-        // CPU collector
+        // Initialize collectors
+        let mut collectors: Vec<Box<dyn Collector>> = Vec::new();
+
+        // Add enabled collectors
         if config.collectors.cpu.enabled {
-            let cache_clone = cache.clone();
-            let collector = CpuCollector::new(config.collectors.cpu.clone());
-            let interval = config.collectors.cpu.interval_seconds;
-            tokio::spawn(async move {
-                Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "CPU").await;
-            });
-            collector_count += 1;
+            collectors.push(Box::new(CpuCollector::new(config.collectors.cpu.clone())));
         }
-
-        // Memory collector
+
         if config.collectors.memory.enabled {
-            let cache_clone = cache.clone();
-            let collector = MemoryCollector::new(config.collectors.memory.clone());
-            let interval = config.collectors.memory.interval_seconds;
-            tokio::spawn(async move {
-                Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "Memory").await;
-            });
-            collector_count += 1;
+            collectors.push(Box::new(MemoryCollector::new(config.collectors.memory.clone())));
         }
-
-        // Network collector
-        if config.collectors.network.enabled {
-            let cache_clone = cache.clone();
-            let collector = NetworkCollector::new(config.collectors.network.clone());
-            let interval = config.collectors.network.interval_seconds;
-            tokio::spawn(async move {
-                Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "Network").await;
-            });
-            collector_count += 1;
-        }
-
-        // Backup collector
-        if config.collectors.backup.enabled {
-            let cache_clone = cache.clone();
-            let collector = BackupCollector::new();
-            let interval = config.collectors.backup.interval_seconds;
-            tokio::spawn(async move {
-                Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "Backup").await;
-            });
-            collector_count += 1;
-        }
-
-        // NixOS collector
-        if config.collectors.nixos.enabled {
-            let cache_clone = cache.clone();
-            let collector = NixOSCollector::new(config.collectors.nixos.clone());
-            let interval = config.collectors.nixos.interval_seconds;
-            tokio::spawn(async move {
-                Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "NixOS").await;
-            });
-            collector_count += 1;
-        }
-
-        // Disk collector
+
         if config.collectors.disk.enabled {
-            let cache_clone = cache.clone();
-            let collector = DiskCollector::new(config.collectors.disk.clone());
-            let interval = config.collectors.disk.interval_seconds;
-            tokio::spawn(async move {
-                Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "Disk").await;
-            });
-            collector_count += 1;
+            collectors.push(Box::new(DiskCollector::new(config.collectors.disk.clone())));
         }
-
-        // Systemd collector
+
         if config.collectors.systemd.enabled {
-            let cache_clone = cache.clone();
-            let collector = SystemdCollector::new(config.collectors.systemd.clone());
-            let interval = config.collectors.systemd.interval_seconds;
-            tokio::spawn(async move {
-                Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "Systemd").await;
-            });
-            collector_count += 1;
+            collectors.push(Box::new(SystemdCollector::new(config.collectors.systemd.clone())));
+        }
+
+        if config.collectors.backup.enabled {
+            collectors.push(Box::new(BackupCollector::new()));
         }
 
-        info!("Spawned {} independent collector tasks", collector_count);
+        if config.collectors.network.enabled {
+            collectors.push(Box::new(NetworkCollector::new(config.collectors.network.clone())));
+        }
+
+        if config.collectors.nixos.enabled {
+            collectors.push(Box::new(NixOSCollector::new(config.collectors.nixos.clone())));
+        }
+
+        info!("Initialized {} collectors", collectors.len());
 
         // Initialize notification manager
         let notification_manager = NotificationManager::new(&config.notifications, &hostname)?;
@@ -153,79 +97,45 @@ impl Agent {
             hostname,
             config,
             zmq_handler,
-            cache,
+            collectors,
             notification_manager,
             previous_status: None,
         })
     }
 
-    /// Independent collector task runner
-    async fn run_collector_task<C>(
-        cache: Arc<RwLock<AgentData>>,
-        collector: C,
-        interval_duration: Duration,
-        name: &str,
-    ) where
-        C: crate::collectors::Collector + Send + 'static,
-    {
-        let mut interval_timer = interval(interval_duration);
-        info!("{} collector task started (interval: {:?})", name, interval_duration);
-
-        loop {
-            interval_timer.tick().await;
-
-            // Acquire write lock and update cache
-            {
-                let mut agent_data = cache.write().await;
-                match collector.collect_structured(&mut *agent_data).await {
-                    Ok(_) => {
-                        debug!("{} collector updated cache", name);
-                    }
-                    Err(e) => {
-                        error!("{} collector failed: {}", name, e);
-                    }
-                }
-            } // Release lock immediately after collection
-        }
-    }
-
-    /// Main agent loop with cached data architecture
+    /// Main agent loop with structured data collection
     pub async fn run(&mut self, mut shutdown_rx: tokio::sync::oneshot::Receiver<()>) -> Result<()> {
-        info!("Starting agent main loop with cached collector architecture");
+        info!("Starting agent main loop");
 
-        // Set up intervals from config
+        // Initial collection
+        if let Err(e) = self.collect_and_broadcast().await {
+            error!("Initial metric collection failed: {}", e);
+        }
+
+        // Set up intervals
         let mut transmission_interval = interval(Duration::from_secs(
             self.config.collection_interval_seconds,
         ));
-        let mut notification_interval = interval(Duration::from_secs(
-            self.config.notifications.check_interval_seconds,
-        ));
-        let mut command_interval = interval(Duration::from_millis(100));
+        let mut notification_interval = interval(Duration::from_secs(30)); // Check notifications every 30s
 
-        // Skip initial ticks
+        // Skip initial ticks to avoid immediate execution
        transmission_interval.tick().await;
         notification_interval.tick().await;
-        command_interval.tick().await;
 
         loop {
             tokio::select! {
                 _ = transmission_interval.tick() => {
-                    // Read current cache state and broadcast via ZMQ
-                    let agent_data = self.cache.read().await.clone();
-                    if let Err(e) = self.zmq_handler.publish_agent_data(&agent_data).await {
-                        error!("Failed to broadcast agent data: {}", e);
-                    } else {
-                        debug!("Successfully broadcast agent data");
+                    if let Err(e) = self.collect_and_broadcast().await {
+                        error!("Failed to collect and broadcast metrics: {}", e);
                     }
                 }
                 _ = notification_interval.tick() => {
-                    // Read cache and check for status changes
-                    let agent_data = self.cache.read().await.clone();
-                    if let Err(e) = self.check_status_changes_and_notify(&agent_data).await {
-                        error!("Failed to check status changes: {}", e);
-                    }
+                    // Process any pending notifications
+                    // NOTE: With structured data, we might need to implement status tracking differently
+                    // For now, we skip this until status evaluation is migrated
                 }
-                _ = command_interval.tick() => {
+                // Handle incoming commands (check periodically)
+                _ = tokio::time::sleep(Duration::from_millis(100)) => {
                     if let Err(e) = self.handle_commands().await {
                         error!("Error handling commands: {}", e);
                     }
@@ -241,6 +151,35 @@ impl Agent {
 
         Ok(())
     }
+    /// Collect structured data from all collectors and broadcast via ZMQ
+    async fn collect_and_broadcast(&mut self) -> Result<()> {
+        debug!("Starting structured data collection");
+
+        // Initialize empty AgentData
+        let mut agent_data = AgentData::new(self.hostname.clone(), env!("CARGO_PKG_VERSION").to_string());
+
+        // Collect data from all collectors
+        for collector in &self.collectors {
+            if let Err(e) = collector.collect_structured(&mut agent_data).await {
+                error!("Collector failed: {}", e);
+                // Continue with other collectors even if one fails
+            }
+        }
+
+        // Check for status changes and send notifications
+        if let Err(e) = self.check_status_changes_and_notify(&agent_data).await {
+            error!("Failed to check status changes: {}", e);
+        }
+
+        // Broadcast the structured data via ZMQ
+        if let Err(e) = self.zmq_handler.publish_agent_data(&agent_data).await {
+            error!("Failed to broadcast agent data: {}", e);
+        } else {
+            debug!("Successfully broadcast structured agent data");
+        }
+
+        Ok(())
+    }
 
     /// Check for status changes and send notifications
     async fn check_status_changes_and_notify(&mut self, agent_data: &AgentData) -> Result<()> {
@@ -328,12 +267,9 @@ impl Agent {
 
         match command {
             AgentCommand::CollectNow => {
-                info!("Received immediate transmission request");
-                // With cached architecture, collectors run independently
-                // Just send current cache state immediately
-                let agent_data = self.cache.read().await.clone();
-                if let Err(e) = self.zmq_handler.publish_agent_data(&agent_data).await {
-                    error!("Failed to broadcast on demand: {}", e);
+                info!("Received immediate collection request");
+                if let Err(e) = self.collect_and_broadcast().await {
+                    error!("Failed to collect on demand: {}", e);
                 }
             }
             AgentCommand::SetInterval { seconds } => {
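The revert above replaces the spawned per-collector tasks with a `Vec<Box<dyn Collector>>` that `collect_and_broadcast()` drains in order. A compilable sketch of that trait-object pattern, with the trait shape inferred from the calls visible in the diff (`collect_structured(&mut AgentData)` returning a `Result`); `async-trait` is already in the agent's dependency list per Cargo.lock:

```rust
use async_trait::async_trait;

#[derive(Debug, Default)]
struct AgentData {
    cpu_load: Option<f32>,
}

// Trait shape inferred from the diff: each collector fills in its own
// section of a mutable AgentData and reports failure without panicking.
#[async_trait]
trait Collector: Send + Sync {
    async fn collect_structured(&self, data: &mut AgentData) -> anyhow::Result<()>;
}

struct CpuCollector;

#[async_trait]
impl Collector for CpuCollector {
    async fn collect_structured(&self, data: &mut AgentData) -> anyhow::Result<()> {
        data.cpu_load = Some(0.42); // placeholder for a real /proc read
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    // Sequential collection, as in collect_and_broadcast(): every enabled
    // collector runs on every tick, and one failure does not stop the others.
    let collectors: Vec<Box<dyn Collector>> = vec![Box::new(CpuCollector)];
    let mut agent_data = AgentData::default();
    for collector in &collectors {
        if let Err(e) = collector.collect_structured(&mut agent_data).await {
            eprintln!("Collector failed: {e}");
        }
    }
    println!("{agent_data:?}");
}
```

The trade-off relative to the cached design is visible here: one slow `collect_structured()` call now delays the whole transmission tick, which is exactly the problem the PLANNED architecture in CLAUDE.md is meant to remove.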
diff --git a/agent/src/collectors/disk.rs b/agent/src/collectors/disk.rs
index 588bec8..71c53cf 100644
--- a/agent/src/collectors/disk.rs
+++ b/agent/src/collectors/disk.rs
@@ -117,7 +117,7 @@ impl DiskCollector {
         let mut cmd = Command::new("lsblk");
         cmd.args(&["-rn", "-o", "NAME,MOUNTPOINT"]);
 
-        let output = run_command_with_timeout(cmd, self.config.command_timeout_seconds).await
+        let output = run_command_with_timeout(cmd, 2).await
             .map_err(|e| CollectorError::SystemRead {
                 path: "block devices".to_string(),
                 error: e.to_string(),
diff --git a/agent/src/collectors/network.rs b/agent/src/collectors/network.rs
index 5a26b05..fd4dbe2 100644
--- a/agent/src/collectors/network.rs
+++ b/agent/src/collectors/network.rs
@@ -8,12 +8,12 @@ use crate::config::NetworkConfig;
 
 /// Network interface collector with physical/virtual classification and link status
 pub struct NetworkCollector {
-    config: NetworkConfig,
+    _config: NetworkConfig,
 }
 
 impl NetworkCollector {
     pub fn new(config: NetworkConfig) -> Self {
-        Self { config }
+        Self { _config: config }
     }
 
     /// Check if interface is physical (not virtual)
@@ -50,9 +50,8 @@ impl NetworkCollector {
     }
 
     /// Get the primary physical interface (the one with default route)
-    fn get_primary_physical_interface(&self) -> Option<String> {
-        let timeout_str = self.config.command_timeout_seconds.to_string();
-        match Command::new("timeout").args([&timeout_str, "ip", "route", "show", "default"]).output() {
+    fn get_primary_physical_interface() -> Option<String> {
+        match Command::new("timeout").args(["2", "ip", "route", "show", "default"]).output() {
             Ok(output) if output.status.success() => {
                 let output_str = String::from_utf8_lossy(&output.stdout);
                 // Parse: "default via 192.168.1.1 dev eno1 ..."
@@ -111,8 +110,7 @@ impl NetworkCollector {
         // Parse VLAN configuration
         let vlan_map = Self::parse_vlan_config();
 
-        let timeout_str = self.config.command_timeout_seconds.to_string();
-        match Command::new("timeout").args([&timeout_str, "ip", "-j", "addr"]).output() {
+        match Command::new("timeout").args(["2", "ip", "-j", "addr"]).output() {
             Ok(output) if output.status.success() => {
                 let json_str = String::from_utf8_lossy(&output.stdout);
 
@@ -197,7 +195,7 @@ impl NetworkCollector {
         }
 
         // Assign primary physical interface as parent to virtual interfaces without explicit parent
-        let primary_interface = self.get_primary_physical_interface();
+        let primary_interface = Self::get_primary_physical_interface();
         if let Some(primary) = primary_interface {
             for interface in interfaces.iter_mut() {
                 // Only assign parent to virtual interfaces that don't already have one
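disk.rs funnels its commands through `run_command_with_timeout`, whose body is not shown in this diff. A plausible tokio-based shape for such a helper, offered as an assumption rather than the crate's actual implementation:

```rust
use std::process::Output;
use std::time::Duration;
use tokio::process::Command;
use tokio::time::timeout;

// Assumed shape of run_command_with_timeout: race the child process against
// a deadline, and make sure a hung process is killed rather than leaked.
async fn run_command_with_timeout(mut cmd: Command, secs: u64) -> anyhow::Result<Output> {
    cmd.kill_on_drop(true); // if we time out and drop the future, reap the child
    let output = timeout(Duration::from_secs(secs), cmd.output()).await??;
    Ok(output)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut cmd = Command::new("ip");
    cmd.args(["-j", "addr"]);
    let out = run_command_with_timeout(cmd, 2).await?;
    println!("{}", String::from_utf8_lossy(&out.stdout));
    Ok(())
}
```

The double `?` unpacks both failure modes: the timer elapsing (`tokio::time::error::Elapsed`) and the process itself failing to run (`std::io::Error`).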
diff --git a/agent/src/collectors/systemd.rs b/agent/src/collectors/systemd.rs
index 9ba8663..bcaa6be 100644
--- a/agent/src/collectors/systemd.rs
+++ b/agent/src/collectors/systemd.rs
@@ -254,19 +254,18 @@ impl SystemdCollector {
 
     /// Auto-discover interesting services to monitor
     fn discover_services_internal(&self) -> Result<(Vec<String>, std::collections::HashMap<String, String>)> {
-        // First: Get all service unit files
-        let timeout_str = self.config.command_timeout_seconds.to_string();
+        // First: Get all service unit files (with 3 second timeout)
         let unit_files_output = Command::new("timeout")
-            .args(&[&timeout_str, "systemctl", "list-unit-files", "--type=service", "--no-pager", "--plain"])
+            .args(&["3", "systemctl", "list-unit-files", "--type=service", "--no-pager", "--plain"])
             .output()?;
 
         if !unit_files_output.status.success() {
             return Err(anyhow::anyhow!("systemctl list-unit-files command failed"));
         }
 
-        // Second: Get runtime status of all units
+        // Second: Get runtime status of all units (with 3 second timeout)
         let units_status_output = Command::new("timeout")
-            .args(&[&timeout_str, "systemctl", "list-units", "--type=service", "--all", "--no-pager", "--plain"])
+            .args(&["3", "systemctl", "list-units", "--type=service", "--all", "--no-pager", "--plain"])
             .output()?;
 
         if !units_status_output.status.success() {
@@ -362,17 +361,16 @@ impl SystemdCollector {
             }
         }
 
-        // Fallback to systemctl if not in cache
-        let timeout_str = self.config.command_timeout_seconds.to_string();
+        // Fallback to systemctl if not in cache (with 2 second timeout)
         let output = Command::new("timeout")
-            .args(&[&timeout_str, "systemctl", "is-active", &format!("{}.service", service)])
+            .args(&["2", "systemctl", "is-active", &format!("{}.service", service)])
             .output()?;
 
         let active_status = String::from_utf8(output.stdout)?.trim().to_string();
 
-        // Get more detailed info
+        // Get more detailed info (with 2 second timeout)
         let output = Command::new("timeout")
-            .args(&[&timeout_str, "systemctl", "show", &format!("{}.service", service), "--property=LoadState,ActiveState,SubState"])
+            .args(&["2", "systemctl", "show", &format!("{}.service", service), "--property=LoadState,ActiveState,SubState"])
             .output()?;
 
         let detailed_info = String::from_utf8(output.stdout)?;
@@ -432,10 +430,9 @@ impl SystemdCollector {
             return Ok(0.0);
         }
 
-        // No configured path - try to get WorkingDirectory from systemctl
-        let timeout_str = self.config.command_timeout_seconds.to_string();
+        // No configured path - try to get WorkingDirectory from systemctl (with 2 second timeout)
         let output = Command::new("timeout")
-            .args(&[&timeout_str, "systemctl", "show", &format!("{}.service", service_name), "--property=WorkingDirectory"])
"--property=WorkingDirectory"]) + .args(&["2", "systemctl", "show", &format!("{}.service", service_name), "--property=WorkingDirectory"]) .output() .map_err(|e| CollectorError::SystemRead { path: format!("WorkingDirectory for {}", service_name), @@ -455,15 +452,15 @@ impl SystemdCollector { Ok(0.0) } - /// Get size of a directory in GB + /// Get size of a directory in GB (with 2 second timeout) async fn get_directory_size(&self, path: &str) -> Option { use super::run_command_with_timeout; - // Use -s (summary) and --apparent-size for speed + // Use -s (summary) and --apparent-size for speed, 2 second timeout let mut cmd = Command::new("sudo"); cmd.args(&["du", "-s", "--apparent-size", "--block-size=1", path]); - let output = run_command_with_timeout(cmd, self.config.command_timeout_seconds).await.ok()?; + let output = run_command_with_timeout(cmd, 2).await.ok()?; if !output.status.success() { // Log permission errors for debugging but don't spam logs @@ -789,10 +786,9 @@ impl SystemdCollector { let mut containers = Vec::new(); // Check if docker is available (cm-agent user is in docker group) - // Use -a to show ALL containers (running and stopped) - let timeout_str = self.config.command_timeout_seconds.to_string(); + // Use -a to show ALL containers (running and stopped) with 3 second timeout let output = Command::new("timeout") - .args(&[&timeout_str, "docker", "ps", "-a", "--format", "{{.Names}},{{.Status}}"]) + .args(&["3", "docker", "ps", "-a", "--format", "{{.Names}},{{.Status}}"]) .output(); let output = match output { @@ -833,10 +829,9 @@ impl SystemdCollector { /// Get docker images as sub-services fn get_docker_images(&self) -> Vec<(String, String, f32)> { let mut images = Vec::new(); - // Check if docker is available (cm-agent user is in docker group) - let timeout_str = self.config.command_timeout_seconds.to_string(); + // Check if docker is available (cm-agent user is in docker group) with 3 second timeout let output = Command::new("timeout") - .args(&[&timeout_str, "docker", "images", "--format", "{{.Repository}}:{{.Tag}},{{.Size}}"]) + .args(&["3", "docker", "images", "--format", "{{.Repository}}:{{.Tag}},{{.Size}}"]) .output(); let output = match output { diff --git a/agent/src/config/mod.rs b/agent/src/config/mod.rs index f0f403a..8593b54 100644 --- a/agent/src/config/mod.rs +++ b/agent/src/config/mod.rs @@ -79,9 +79,6 @@ pub struct DiskConfig { pub temperature_critical_celsius: f32, pub wear_warning_percent: f32, pub wear_critical_percent: f32, - /// Command timeout in seconds for lsblk, smartctl, etc. 
- #[serde(default = "default_disk_command_timeout")] - pub command_timeout_seconds: u64, } /// Filesystem configuration entry @@ -111,9 +108,6 @@ pub struct SystemdConfig { pub http_timeout_seconds: u64, pub http_connect_timeout_seconds: u64, pub nginx_latency_critical_ms: f32, - /// Command timeout in seconds for systemctl, docker, du commands - #[serde(default = "default_systemd_command_timeout")] - pub command_timeout_seconds: u64, } @@ -138,9 +132,6 @@ pub struct BackupConfig { pub struct NetworkConfig { pub enabled: bool, pub interval_seconds: u64, - /// Command timeout in seconds for ip route, ip addr commands - #[serde(default = "default_network_command_timeout")] - pub command_timeout_seconds: u64, } /// Notification configuration @@ -154,9 +145,6 @@ pub struct NotificationConfig { pub rate_limit_minutes: u64, /// Email notification batching interval in seconds (default: 60) pub aggregation_interval_seconds: u64, - /// Status check interval in seconds for detecting changes (default: 30) - #[serde(default = "default_notification_check_interval")] - pub check_interval_seconds: u64, /// List of metric names to exclude from email notifications #[serde(default)] pub exclude_email_metrics: Vec, @@ -170,26 +158,10 @@ fn default_heartbeat_interval_seconds() -> u64 { 5 } -fn default_notification_check_interval() -> u64 { - 30 -} - fn default_maintenance_mode_file() -> String { "/tmp/cm-maintenance".to_string() } -fn default_disk_command_timeout() -> u64 { - 30 -} - -fn default_systemd_command_timeout() -> u64 { - 15 -} - -fn default_network_command_timeout() -> u64 { - 10 -} - impl AgentConfig { pub fn from_file>(path: P) -> Result { loader::load_config(path)