From 91f037aa3ebd804b635827663e47eb356cf53970 Mon Sep 17 00:00:00 2001
From: Christoffer Martinsson
Date: Tue, 28 Oct 2025 10:36:34 +0100
Subject: [PATCH] Update to v0.1.19 with event-driven status aggregation

Major architectural improvements:

CORE CHANGES:
- Remove notification_interval_seconds - status aggregation now immediate
- Status calculation moved to collection phase instead of transmission
- Event-driven transmission triggers immediately on status changes
- Dual transmission strategy: immediate on change + periodic backup
- Real-time notifications without batching delays

TECHNICAL IMPROVEMENTS:
- process_metric() now returns bool indicating status change
- Immediate ZMQ broadcast when status changes detected
- Status aggregation happens during metric collection, not later
- Legacy get_nixos_build_info() method removed (unused)
- All compilation warnings fixed

BEHAVIOR CHANGES:
- Critical alerts sent instantly instead of waiting for intervals
- Dashboard receives real-time status updates
- Notifications triggered immediately on status transitions
- Backup periodic transmission every 1s ensures heartbeat

This provides much more responsive monitoring with instant alerting while
maintaining the reliability of periodic transmission as backup.
--- Cargo.lock | 6 +-- agent/Cargo.toml | 2 +- agent/src/agent.rs | 31 ++++++++++------ agent/src/collectors/nixos.rs | 25 ------------- agent/src/status/mod.rs | 70 ++++++++++++++++++++++++++++------- dashboard/Cargo.toml | 2 +- dashboard/src/main.rs | 2 +- shared/Cargo.toml | 2 +- 8 files changed, 83 insertions(+), 57 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 666ef73..0bdc604 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -270,7 +270,7 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d" [[package]] name = "cm-dashboard" -version = "0.1.18" +version = "0.1.19" dependencies = [ "anyhow", "chrono", @@ -291,7 +291,7 @@ dependencies = [ [[package]] name = "cm-dashboard-agent" -version = "0.1.18" +version = "0.1.19" dependencies = [ "anyhow", "async-trait", @@ -314,7 +314,7 @@ dependencies = [ [[package]] name = "cm-dashboard-shared" -version = "0.1.18" +version = "0.1.19" dependencies = [ "chrono", "serde", diff --git a/agent/Cargo.toml b/agent/Cargo.toml index 0901035..243b710 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cm-dashboard-agent" -version = "0.1.18" +version = "0.1.19" edition = "2021" [dependencies] diff --git a/agent/src/agent.rs b/agent/src/agent.rs index bf7448e..b126ffe 100644 --- a/agent/src/agent.rs +++ b/agent/src/agent.rs @@ -75,7 +75,6 @@ impl Agent { let mut collection_interval = interval(Duration::from_secs(self.config.collection_interval_seconds)); let mut transmission_interval = interval(Duration::from_secs(self.config.zmq.transmission_interval_seconds)); - let mut notification_interval = interval(Duration::from_secs(self.config.status_aggregation.notification_interval_seconds)); loop { tokio::select! 
{ @@ -86,13 +85,12 @@ impl Agent { } } _ = transmission_interval.tick() => { - // Send all metrics via ZMQ every 1 second + // Send all metrics via ZMQ and process notifications immediately if let Err(e) = self.broadcast_all_metrics().await { error!("Failed to broadcast metrics: {}", e); } - } - _ = notification_interval.tick() => { - // Process batched notifications + + // Process notifications immediately with each transmission if let Err(e) = self.host_status_manager.process_pending_notifications(&mut self.notification_manager).await { error!("Failed to process pending notifications: {}", e); } @@ -127,8 +125,8 @@ impl Agent { info!("Force collected and cached {} metrics", metrics.len()); - // Process metrics through status manager - self.process_metrics(&metrics).await; + // Process metrics through status manager (don't trigger transmission on startup) + let _status_changed = self.process_metrics(&metrics).await; Ok(()) } @@ -146,8 +144,15 @@ impl Agent { debug!("Collected and cached {} metrics", metrics.len()); - // Process metrics through status manager - self.process_metrics(&metrics).await; + // Process metrics through status manager and trigger immediate transmission if status changed + let status_changed = self.process_metrics(&metrics).await; + + if status_changed { + info!("Status change detected - triggering immediate metric transmission"); + if let Err(e) = self.broadcast_all_metrics().await { + error!("Failed to broadcast metrics after status change: {}", e); + } + } Ok(()) } @@ -181,10 +186,14 @@ impl Agent { Ok(()) } - async fn process_metrics(&mut self, metrics: &[Metric]) { + async fn process_metrics(&mut self, metrics: &[Metric]) -> bool { + let mut status_changed = false; for metric in metrics { - self.host_status_manager.process_metric(metric, &mut self.notification_manager).await; + if self.host_status_manager.process_metric(metric, &mut self.notification_manager).await { + status_changed = true; + } } + status_changed } /// Create agent 
version metric for cross-host version comparison diff --git a/agent/src/collectors/nixos.rs b/agent/src/collectors/nixos.rs index 9310d07..8ba2286 100644 --- a/agent/src/collectors/nixos.rs +++ b/agent/src/collectors/nixos.rs @@ -19,31 +19,6 @@ impl NixOSCollector { Self {} } - /// Get NixOS build information - fn get_nixos_build_info(&self) -> Result> { - // Get nixos-version output directly - let output = Command::new("nixos-version").output()?; - - if !output.status.success() { - return Err("nixos-version command failed".into()); - } - - let version_line = String::from_utf8_lossy(&output.stdout); - let version = version_line.trim(); - - if version.is_empty() { - return Err("Empty nixos-version output".into()); - } - - // Remove codename part (e.g., "(Warbler)") - let clean_version = if let Some(pos) = version.find(" (") { - version[..pos].to_string() - } else { - version.to_string() - }; - - Ok(clean_version) - } /// Get agent hash from binary path fn get_agent_hash(&self) -> Result> { diff --git a/agent/src/status/mod.rs b/agent/src/status/mod.rs index 888cecb..ea1a074 100644 --- a/agent/src/status/mod.rs +++ b/agent/src/status/mod.rs @@ -9,7 +9,6 @@ use chrono::Utc; pub struct HostStatusConfig { pub enabled: bool, pub aggregation_method: String, // "worst_case" - pub notification_interval_seconds: u64, } impl Default for HostStatusConfig { @@ -17,7 +16,6 @@ impl Default for HostStatusConfig { Self { enabled: true, aggregation_method: "worst_case".to_string(), - notification_interval_seconds: 30, } } } @@ -160,25 +158,69 @@ impl HostStatusManager { - /// Process a metric - updates status (notifications handled separately via batching) - pub async fn process_metric(&mut self, metric: &Metric, _notification_manager: &mut crate::notifications::NotificationManager) { - // Just update status - notifications are handled by process_pending_notifications - self.update_service_status(metric.name.clone(), metric.status); + /// Process a metric - updates status and 
triggers immediate notifications if status changed + pub async fn process_metric(&mut self, metric: &Metric, notification_manager: &mut crate::notifications::NotificationManager) -> bool { + let old_status = self.service_statuses.get(&metric.name).copied().unwrap_or(Status::Unknown); + let new_status = metric.status; + + // Update status + self.update_service_status(metric.name.clone(), new_status); + + // Check if status actually changed + if old_status != new_status { + debug!("Status change detected for {}: {:?} -> {:?}", metric.name, old_status, new_status); + + // Process notification immediately on status change + if let Err(e) = self.process_status_change(&metric.name, old_status, new_status, notification_manager).await { + error!("Failed to process status change notification: {}", e); + } + + return true; // Status changed - caller should trigger immediate transmission + } + + false // No status change } - /// Process pending notifications - call this at notification intervals + /// Process immediate status change notification + async fn process_status_change(&mut self, metric_name: &str, old_status: Status, new_status: Status, notification_manager: &mut crate::notifications::NotificationManager) -> Result<(), Box> { + if !self.config.enabled { + return Ok(()); + } + + // Create immediate notification for this specific status change + let status_summary = StatusChangeSummary { + service_name: metric_name.to_string(), + initial_status: old_status, + final_status: new_status, + change_count: 1, + }; + + let aggregated = AggregatedStatusChanges { + start_time: Instant::now(), + end_time: Instant::now(), + service_summaries: vec![status_summary], + host_status_initial: old_status, + host_status_final: new_status, + requires_notification: true, + }; + + // Send immediate notification using existing method + if let Err(e) = self.send_aggregated_email(&aggregated, notification_manager).await { + error!("Failed to send immediate notification: {}", e); + return 
Err(e); + } + + info!("Sent immediate notification for {} status change: {:?} -> {:?}", metric_name, old_status, new_status); + Ok(()) + } + + /// Process pending notifications - legacy method, now rarely used pub async fn process_pending_notifications(&mut self, notification_manager: &mut crate::notifications::NotificationManager) -> Result<(), Box> { if !self.config.enabled || self.pending_changes.is_empty() { return Ok(()); } - let batch_start = self.batch_start_time.unwrap_or_else(Instant::now); - let batch_duration = batch_start.elapsed(); - - // Only process if enough time has passed - if batch_duration.as_secs() < self.config.notification_interval_seconds { - return Ok(()); - } + // Process notifications immediately without interval batching // Create aggregated status changes let aggregated = self.create_aggregated_changes(); diff --git a/dashboard/Cargo.toml b/dashboard/Cargo.toml index 98a1228..e9f41ab 100644 --- a/dashboard/Cargo.toml +++ b/dashboard/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cm-dashboard" -version = "0.1.18" +version = "0.1.19" edition = "2021" [dependencies] diff --git a/dashboard/src/main.rs b/dashboard/src/main.rs index 35d4895..8ecf9cb 100644 --- a/dashboard/src/main.rs +++ b/dashboard/src/main.rs @@ -14,7 +14,7 @@ use app::Dashboard; /// Get hardcoded version fn get_version() -> &'static str { - "v0.1.18" + "v0.1.19" } /// Check if running inside tmux session diff --git a/shared/Cargo.toml b/shared/Cargo.toml index 84ca9a3..7024479 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cm-dashboard-shared" -version = "0.1.18" +version = "0.1.19" edition = "2021" [dependencies]