From 5f6e47ece5591bbbbb2f9d0eab5487f61bad4944 Mon Sep 17 00:00:00 2001 From: Christoffer Martinsson Date: Thu, 6 Nov 2025 11:04:01 +0100 Subject: [PATCH] Implement heartbeat-based host connectivity detection - Add agent_heartbeat metric to agent transmission for reliable host detection - Update dashboard to track heartbeat timestamps per host instead of general metrics - Add configurable heartbeat_timeout_seconds to dashboard ZMQ config (default 10s) - Remove unused timeout_ms from agent config and revert to non-blocking command reception - Remove unused heartbeat_interval_ms from agent configuration - Host disconnect detection now uses dedicated heartbeat metrics for improved reliability - Bump version to 0.1.57 --- Cargo.lock | 6 +++--- agent/Cargo.toml | 2 +- agent/src/agent.rs | 19 +++++++++++++++++++ agent/src/communication/mod.rs | 2 -- agent/src/config/mod.rs | 2 -- agent/src/config/validation.rs | 4 ---- dashboard/Cargo.toml | 2 +- dashboard/src/app.rs | 6 +++--- dashboard/src/communication/mod.rs | 4 ++-- dashboard/src/config/mod.rs | 7 +++++++ dashboard/src/metrics/store.rs | 25 +++++++++++++++---------- shared/Cargo.toml | 2 +- 12 files changed, 52 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 21a98bc..dd0f22a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -270,7 +270,7 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d" [[package]] name = "cm-dashboard" -version = "0.1.55" +version = "0.1.56" dependencies = [ "anyhow", "chrono", @@ -292,7 +292,7 @@ dependencies = [ [[package]] name = "cm-dashboard-agent" -version = "0.1.55" +version = "0.1.56" dependencies = [ "anyhow", "async-trait", @@ -315,7 +315,7 @@ dependencies = [ [[package]] name = "cm-dashboard-shared" -version = "0.1.55" +version = "0.1.56" dependencies = [ "chrono", "serde", diff --git a/agent/Cargo.toml b/agent/Cargo.toml index 9b7edfc..5a86e2a 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cm-dashboard-agent" -version = "0.1.56" +version = "0.1.57" edition = "2021" [dependencies] diff --git a/agent/src/agent.rs b/agent/src/agent.rs index 60a80ac..e1b74a7 100644 --- a/agent/src/agent.rs +++ b/agent/src/agent.rs @@ -180,6 +180,10 @@ impl Agent { let version_metric = self.get_agent_version_metric(); metrics.push(version_metric); + // Add heartbeat metric for host connectivity detection + let heartbeat_metric = self.get_heartbeat_metric(); + metrics.push(heartbeat_metric); + // Check for user-stopped services that are now active and clear their flags self.clear_user_stopped_flags_for_active_services(&metrics); @@ -232,6 +236,21 @@ impl Agent { format!("v{}", env!("CARGO_PKG_VERSION")) } + /// Create heartbeat metric for host connectivity detection + fn get_heartbeat_metric(&self) -> Metric { + use std::time::{SystemTime, UNIX_EPOCH}; + + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + Metric::new( + "agent_heartbeat".to_string(), + MetricValue::Integer(timestamp as i64), + Status::Ok, + ) + } async fn handle_commands(&mut self) -> Result<()> { // Try to receive commands (non-blocking) diff --git a/agent/src/communication/mod.rs b/agent/src/communication/mod.rs index 2321183..6c2b4bd 100644 --- a/agent/src/communication/mod.rs +++ b/agent/src/communication/mod.rs @@ -66,8 +66,6 @@ impl ZmqHandler { } - /// Send heartbeat (placeholder for future use) - /// Try to receive a command (non-blocking) pub fn try_receive_command(&self) -> Result> { match self.command_receiver.recv_bytes(zmq::DONTWAIT) { diff --git a/agent/src/config/mod.rs b/agent/src/config/mod.rs index 719948b..cccf4f9 100644 --- a/agent/src/config/mod.rs +++ b/agent/src/config/mod.rs @@ -28,8 +28,6 @@ pub struct ZmqConfig { pub publisher_port: u16, pub command_port: u16, pub bind_address: String, - pub timeout_ms: u64, - pub heartbeat_interval_ms: u64, pub transmission_interval_seconds: u64, } diff --git a/agent/src/config/validation.rs b/agent/src/config/validation.rs index 3c7618d..2747418 100644 --- a/agent/src/config/validation.rs +++ b/agent/src/config/validation.rs @@ -19,10 +19,6 @@ pub fn validate_config(config: &AgentConfig) -> Result<()> { bail!("ZMQ bind address cannot be empty"); } - if config.zmq.timeout_ms == 0 { - bail!("ZMQ timeout cannot be 0"); - } - // Validate collection interval if config.collection_interval_seconds == 0 { bail!("Collection interval cannot be 0"); diff --git a/dashboard/Cargo.toml b/dashboard/Cargo.toml index 243ddfa..406e96c 100644 --- a/dashboard/Cargo.toml +++ b/dashboard/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cm-dashboard" -version = "0.1.56" +version = "0.1.57" edition = "2021" [dependencies] diff --git a/dashboard/src/app.rs b/dashboard/src/app.rs index c78b8cf..b8a2704 100644 --- a/dashboard/src/app.rs +++ b/dashboard/src/app.rs @@ -22,7 +22,7 @@ pub struct Dashboard { terminal: Option>>, headless: bool, initial_commands_sent: std::collections::HashSet, - _config: DashboardConfig, + config: DashboardConfig, } impl Dashboard { @@ -133,7 +133,7 @@ impl Dashboard { terminal, headless, initial_commands_sent: std::collections::HashSet::new(), - _config: config, + config, }) } @@ -247,7 +247,7 @@ impl Dashboard { if let Some(ref mut tui_app) = self.tui_app { let connected_hosts = self .metric_store - .get_connected_hosts(Duration::from_secs(30)); + .get_connected_hosts(Duration::from_secs(self.config.zmq.heartbeat_timeout_seconds)); tui_app.update_hosts(connected_hosts); diff --git a/dashboard/src/communication/mod.rs b/dashboard/src/communication/mod.rs index 2d9688d..4ffb01a 100644 --- a/dashboard/src/communication/mod.rs +++ b/dashboard/src/communication/mod.rs @@ -141,9 +141,9 @@ impl ZmqConsumer { } } - /// Receive metrics from any connected agent (non-blocking) + /// Receive metrics from any connected agent (with timeout) pub async fn receive_metrics(&mut self) -> Result> { - match self.subscriber.recv_bytes(zmq::DONTWAIT) { + match self.subscriber.recv_bytes(0) { Ok(data) => { debug!("Received {} bytes from ZMQ", data.len()); diff --git a/dashboard/src/config/mod.rs b/dashboard/src/config/mod.rs index f731b1a..601f91f 100644 --- a/dashboard/src/config/mod.rs +++ b/dashboard/src/config/mod.rs @@ -16,6 +16,13 @@ pub struct DashboardConfig { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ZmqConfig { pub subscriber_ports: Vec, + /// Heartbeat timeout in seconds - hosts considered offline if no heartbeat received within this time + #[serde(default = "default_heartbeat_timeout_seconds")] + pub heartbeat_timeout_seconds: u64, +} + +fn default_heartbeat_timeout_seconds() -> u64 { + 10 // Default to 10 seconds - allows for multiple missed heartbeats } /// Individual host configuration details diff --git a/dashboard/src/metrics/store.rs b/dashboard/src/metrics/store.rs index cd8b5ea..053e8f1 100644 --- a/dashboard/src/metrics/store.rs +++ b/dashboard/src/metrics/store.rs @@ -11,8 +11,8 @@ pub struct MetricStore { current_metrics: HashMap>, /// Historical metrics for trending historical_metrics: HashMap>, - /// Last update timestamp per host - last_update: HashMap, + /// Last heartbeat timestamp per host + last_heartbeat: HashMap, /// Configuration max_metrics_per_host: usize, history_retention: Duration, @@ -23,7 +23,7 @@ impl MetricStore { Self { current_metrics: HashMap::new(), historical_metrics: HashMap::new(), - last_update: HashMap::new(), + last_heartbeat: HashMap::new(), max_metrics_per_host, history_retention: Duration::from_secs(history_retention_hours * 3600), } @@ -56,10 +56,13 @@ impl MetricStore { // Add to history host_history.push(MetricDataPoint { received_at: now }); - } - // Update last update timestamp - self.last_update.insert(hostname.to_string(), now); + // Track heartbeat metrics for connectivity detection + if metric_name == "agent_heartbeat" { + self.last_heartbeat.insert(hostname.to_string(), now); + debug!("Updated heartbeat for host {}", hostname); + } + } // Get metrics count before cleanup let metrics_count = host_metrics.len(); @@ -88,16 +91,18 @@ impl MetricStore { } } - /// Get connected hosts (hosts with recent updates) + /// Get connected hosts (hosts with recent heartbeats) pub fn get_connected_hosts(&self, timeout: Duration) -> Vec { let now = Instant::now(); - self.last_update + self.last_heartbeat .iter() - .filter_map(|(hostname, &last_update)| { - if now.duration_since(last_update) <= timeout { + .filter_map(|(hostname, &last_heartbeat)| { + if now.duration_since(last_heartbeat) <= timeout { Some(hostname.clone()) } else { + debug!("Host {} considered offline - last heartbeat was {:?} ago", + hostname, now.duration_since(last_heartbeat)); None } }) diff --git a/shared/Cargo.toml b/shared/Cargo.toml index 83bcf9d..8597900 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cm-dashboard-shared" -version = "0.1.56" +version = "0.1.57" edition = "2021" [dependencies]