Implement heartbeat-based host connectivity detection
All checks were successful
Build and Release / build-and-release (push) Successful in 2m8s

- Add agent_heartbeat metric to agent transmission for reliable host detection
- Update dashboard to track heartbeat timestamps per host instead of general metrics
- Add configurable heartbeat_timeout_seconds to dashboard ZMQ config (default 10s)
- Remove unused timeout_ms from agent config and revert to non-blocking command reception
- Remove unused heartbeat_interval_ms from agent configuration
- Host disconnect detection now uses dedicated heartbeat metrics for improved reliability
- Bump version to 0.1.57
This commit is contained in:
Christoffer Martinsson 2025-11-06 11:04:01 +01:00
parent 0e7cf24dbb
commit 5f6e47ece5
12 changed files with 52 additions and 29 deletions

6
Cargo.lock generated
View File

@ -270,7 +270,7 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"
[[package]] [[package]]
name = "cm-dashboard" name = "cm-dashboard"
version = "0.1.55" version = "0.1.56"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"chrono", "chrono",
@ -292,7 +292,7 @@ dependencies = [
[[package]] [[package]]
name = "cm-dashboard-agent" name = "cm-dashboard-agent"
version = "0.1.55" version = "0.1.56"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
@ -315,7 +315,7 @@ dependencies = [
[[package]] [[package]]
name = "cm-dashboard-shared" name = "cm-dashboard-shared"
version = "0.1.55" version = "0.1.56"
dependencies = [ dependencies = [
"chrono", "chrono",
"serde", "serde",

View File

@ -1,6 +1,6 @@
[package] [package]
name = "cm-dashboard-agent" name = "cm-dashboard-agent"
version = "0.1.56" version = "0.1.57"
edition = "2021" edition = "2021"
[dependencies] [dependencies]

View File

@ -180,6 +180,10 @@ impl Agent {
let version_metric = self.get_agent_version_metric(); let version_metric = self.get_agent_version_metric();
metrics.push(version_metric); metrics.push(version_metric);
// Add heartbeat metric for host connectivity detection
let heartbeat_metric = self.get_heartbeat_metric();
metrics.push(heartbeat_metric);
// Check for user-stopped services that are now active and clear their flags // Check for user-stopped services that are now active and clear their flags
self.clear_user_stopped_flags_for_active_services(&metrics); self.clear_user_stopped_flags_for_active_services(&metrics);
@ -232,6 +236,21 @@ impl Agent {
format!("v{}", env!("CARGO_PKG_VERSION")) format!("v{}", env!("CARGO_PKG_VERSION"))
} }
/// Create heartbeat metric for host connectivity detection
fn get_heartbeat_metric(&self) -> Metric {
use std::time::{SystemTime, UNIX_EPOCH};
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
Metric::new(
"agent_heartbeat".to_string(),
MetricValue::Integer(timestamp as i64),
Status::Ok,
)
}
async fn handle_commands(&mut self) -> Result<()> { async fn handle_commands(&mut self) -> Result<()> {
// Try to receive commands (non-blocking) // Try to receive commands (non-blocking)

View File

@ -66,8 +66,6 @@ impl ZmqHandler {
} }
/// Send heartbeat (placeholder for future use)
/// Try to receive a command (non-blocking) /// Try to receive a command (non-blocking)
pub fn try_receive_command(&self) -> Result<Option<AgentCommand>> { pub fn try_receive_command(&self) -> Result<Option<AgentCommand>> {
match self.command_receiver.recv_bytes(zmq::DONTWAIT) { match self.command_receiver.recv_bytes(zmq::DONTWAIT) {

View File

@ -28,8 +28,6 @@ pub struct ZmqConfig {
pub publisher_port: u16, pub publisher_port: u16,
pub command_port: u16, pub command_port: u16,
pub bind_address: String, pub bind_address: String,
pub timeout_ms: u64,
pub heartbeat_interval_ms: u64,
pub transmission_interval_seconds: u64, pub transmission_interval_seconds: u64,
} }

View File

@ -19,10 +19,6 @@ pub fn validate_config(config: &AgentConfig) -> Result<()> {
bail!("ZMQ bind address cannot be empty"); bail!("ZMQ bind address cannot be empty");
} }
if config.zmq.timeout_ms == 0 {
bail!("ZMQ timeout cannot be 0");
}
// Validate collection interval // Validate collection interval
if config.collection_interval_seconds == 0 { if config.collection_interval_seconds == 0 {
bail!("Collection interval cannot be 0"); bail!("Collection interval cannot be 0");

View File

@ -1,6 +1,6 @@
[package] [package]
name = "cm-dashboard" name = "cm-dashboard"
version = "0.1.56" version = "0.1.57"
edition = "2021" edition = "2021"
[dependencies] [dependencies]

View File

@ -22,7 +22,7 @@ pub struct Dashboard {
terminal: Option<Terminal<CrosstermBackend<io::Stdout>>>, terminal: Option<Terminal<CrosstermBackend<io::Stdout>>>,
headless: bool, headless: bool,
initial_commands_sent: std::collections::HashSet<String>, initial_commands_sent: std::collections::HashSet<String>,
_config: DashboardConfig, config: DashboardConfig,
} }
impl Dashboard { impl Dashboard {
@ -133,7 +133,7 @@ impl Dashboard {
terminal, terminal,
headless, headless,
initial_commands_sent: std::collections::HashSet::new(), initial_commands_sent: std::collections::HashSet::new(),
_config: config, config,
}) })
} }
@ -247,7 +247,7 @@ impl Dashboard {
if let Some(ref mut tui_app) = self.tui_app { if let Some(ref mut tui_app) = self.tui_app {
let connected_hosts = self let connected_hosts = self
.metric_store .metric_store
.get_connected_hosts(Duration::from_secs(30)); .get_connected_hosts(Duration::from_secs(self.config.zmq.heartbeat_timeout_seconds));
tui_app.update_hosts(connected_hosts); tui_app.update_hosts(connected_hosts);

View File

@ -141,9 +141,9 @@ impl ZmqConsumer {
} }
} }
/// Receive metrics from any connected agent (non-blocking) /// Receive metrics from any connected agent (with timeout)
pub async fn receive_metrics(&mut self) -> Result<Option<MetricMessage>> { pub async fn receive_metrics(&mut self) -> Result<Option<MetricMessage>> {
match self.subscriber.recv_bytes(zmq::DONTWAIT) { match self.subscriber.recv_bytes(0) {
Ok(data) => { Ok(data) => {
debug!("Received {} bytes from ZMQ", data.len()); debug!("Received {} bytes from ZMQ", data.len());

View File

@ -16,6 +16,13 @@ pub struct DashboardConfig {
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ZmqConfig { pub struct ZmqConfig {
pub subscriber_ports: Vec<u16>, pub subscriber_ports: Vec<u16>,
/// Heartbeat timeout in seconds - hosts considered offline if no heartbeat received within this time
#[serde(default = "default_heartbeat_timeout_seconds")]
pub heartbeat_timeout_seconds: u64,
}
fn default_heartbeat_timeout_seconds() -> u64 {
10 // Default to 10 seconds - allows for multiple missed heartbeats
} }
/// Individual host configuration details /// Individual host configuration details

View File

@ -11,8 +11,8 @@ pub struct MetricStore {
current_metrics: HashMap<String, HashMap<String, Metric>>, current_metrics: HashMap<String, HashMap<String, Metric>>,
/// Historical metrics for trending /// Historical metrics for trending
historical_metrics: HashMap<String, Vec<MetricDataPoint>>, historical_metrics: HashMap<String, Vec<MetricDataPoint>>,
/// Last update timestamp per host /// Last heartbeat timestamp per host
last_update: HashMap<String, Instant>, last_heartbeat: HashMap<String, Instant>,
/// Configuration /// Configuration
max_metrics_per_host: usize, max_metrics_per_host: usize,
history_retention: Duration, history_retention: Duration,
@ -23,7 +23,7 @@ impl MetricStore {
Self { Self {
current_metrics: HashMap::new(), current_metrics: HashMap::new(),
historical_metrics: HashMap::new(), historical_metrics: HashMap::new(),
last_update: HashMap::new(), last_heartbeat: HashMap::new(),
max_metrics_per_host, max_metrics_per_host,
history_retention: Duration::from_secs(history_retention_hours * 3600), history_retention: Duration::from_secs(history_retention_hours * 3600),
} }
@ -56,10 +56,13 @@ impl MetricStore {
// Add to history // Add to history
host_history.push(MetricDataPoint { received_at: now }); host_history.push(MetricDataPoint { received_at: now });
}
// Update last update timestamp // Track heartbeat metrics for connectivity detection
self.last_update.insert(hostname.to_string(), now); if metric_name == "agent_heartbeat" {
self.last_heartbeat.insert(hostname.to_string(), now);
debug!("Updated heartbeat for host {}", hostname);
}
}
// Get metrics count before cleanup // Get metrics count before cleanup
let metrics_count = host_metrics.len(); let metrics_count = host_metrics.len();
@ -88,16 +91,18 @@ impl MetricStore {
} }
} }
/// Get connected hosts (hosts with recent updates) /// Get connected hosts (hosts with recent heartbeats)
pub fn get_connected_hosts(&self, timeout: Duration) -> Vec<String> { pub fn get_connected_hosts(&self, timeout: Duration) -> Vec<String> {
let now = Instant::now(); let now = Instant::now();
self.last_update self.last_heartbeat
.iter() .iter()
.filter_map(|(hostname, &last_update)| { .filter_map(|(hostname, &last_heartbeat)| {
if now.duration_since(last_update) <= timeout { if now.duration_since(last_heartbeat) <= timeout {
Some(hostname.clone()) Some(hostname.clone())
} else { } else {
debug!("Host {} considered offline - last heartbeat was {:?} ago",
hostname, now.duration_since(last_heartbeat));
None None
} }
}) })

View File

@ -1,6 +1,6 @@
[package] [package]
name = "cm-dashboard-shared" name = "cm-dashboard-shared"
version = "0.1.56" version = "0.1.57"
edition = "2021" edition = "2021"
[dependencies] [dependencies]