Compare commits

..

2 Commits

Author SHA1 Message Date
a2519b2814 Update version to 0.1.20 and fix email notification aggregation
All checks were successful
Build and Release / build-and-release (push) Successful in 1m11s
- Fix email notification aggregation to send batched notifications instead of individual emails
- Fix startup data collection to properly process initial status without triggering change notifications
- Maintain event-driven transmission while preserving aggregated notification batching
- Update version from 0.1.19 to 0.1.20 across all components
2025-10-28 10:48:29 +01:00
91f037aa3e Update to v0.1.19 with event-driven status aggregation
All checks were successful
Build and Release / build-and-release (push) Successful in 2m4s
Major architectural improvements:

CORE CHANGES:
- Remove notification_interval_seconds - status aggregation now immediate
- Status calculation moved to collection phase instead of transmission
- Event-driven transmission triggers immediately on status changes
- Dual transmission strategy: immediate on change + periodic backup
- Real-time notifications without batching delays

TECHNICAL IMPROVEMENTS:
- process_metric() now returns bool indicating status change
- Immediate ZMQ broadcast when status changes detected
- Status aggregation happens during metric collection, not later
- Legacy get_nixos_build_info() method removed (unused)
- All compilation warnings fixed

BEHAVIOR CHANGES:
- Critical alerts sent instantly instead of waiting for intervals
- Dashboard receives real-time status updates
- Notifications triggered immediately on status transitions
- Backup periodic transmission every 1s ensures heartbeat

This provides much more responsive monitoring with instant alerting
while maintaining the reliability of periodic transmission as backup.
2025-10-28 10:36:34 +01:00
8 changed files with 66 additions and 57 deletions

6
Cargo.lock generated
View File

@@ -270,7 +270,7 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"
[[package]] [[package]]
name = "cm-dashboard" name = "cm-dashboard"
version = "0.1.18" version = "0.1.19"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"chrono", "chrono",
@@ -291,7 +291,7 @@ dependencies = [
[[package]] [[package]]
name = "cm-dashboard-agent" name = "cm-dashboard-agent"
version = "0.1.18" version = "0.1.19"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
@@ -314,7 +314,7 @@ dependencies = [
[[package]] [[package]]
name = "cm-dashboard-shared" name = "cm-dashboard-shared"
version = "0.1.18" version = "0.1.19"
dependencies = [ dependencies = [
"chrono", "chrono",
"serde", "serde",

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "cm-dashboard-agent" name = "cm-dashboard-agent"
version = "0.1.18" version = "0.1.20"
edition = "2021" edition = "2021"
[dependencies] [dependencies]

View File

@@ -75,7 +75,6 @@ impl Agent {
let mut collection_interval = let mut collection_interval =
interval(Duration::from_secs(self.config.collection_interval_seconds)); interval(Duration::from_secs(self.config.collection_interval_seconds));
let mut transmission_interval = interval(Duration::from_secs(self.config.zmq.transmission_interval_seconds)); let mut transmission_interval = interval(Duration::from_secs(self.config.zmq.transmission_interval_seconds));
let mut notification_interval = interval(Duration::from_secs(self.config.status_aggregation.notification_interval_seconds));
loop { loop {
tokio::select! { tokio::select! {
@@ -86,13 +85,12 @@ impl Agent {
} }
} }
_ = transmission_interval.tick() => { _ = transmission_interval.tick() => {
// Send all metrics via ZMQ every 1 second // Send all metrics via ZMQ and process notifications immediately
if let Err(e) = self.broadcast_all_metrics().await { if let Err(e) = self.broadcast_all_metrics().await {
error!("Failed to broadcast metrics: {}", e); error!("Failed to broadcast metrics: {}", e);
} }
}
_ = notification_interval.tick() => { // Process notifications immediately with each transmission
// Process batched notifications
if let Err(e) = self.host_status_manager.process_pending_notifications(&mut self.notification_manager).await { if let Err(e) = self.host_status_manager.process_pending_notifications(&mut self.notification_manager).await {
error!("Failed to process pending notifications: {}", e); error!("Failed to process pending notifications: {}", e);
} }
@@ -127,8 +125,8 @@ impl Agent {
info!("Force collected and cached {} metrics", metrics.len()); info!("Force collected and cached {} metrics", metrics.len());
// Process metrics through status manager // Process metrics through status manager (collect status data at startup)
self.process_metrics(&metrics).await; let _status_changed = self.process_metrics(&metrics).await;
Ok(()) Ok(())
} }
@@ -146,8 +144,15 @@ impl Agent {
debug!("Collected and cached {} metrics", metrics.len()); debug!("Collected and cached {} metrics", metrics.len());
// Process metrics through status manager // Process metrics through status manager and trigger immediate transmission if status changed
self.process_metrics(&metrics).await; let status_changed = self.process_metrics(&metrics).await;
if status_changed {
info!("Status change detected - triggering immediate metric transmission");
if let Err(e) = self.broadcast_all_metrics().await {
error!("Failed to broadcast metrics after status change: {}", e);
}
}
Ok(()) Ok(())
} }
@@ -181,11 +186,15 @@ impl Agent {
Ok(()) Ok(())
} }
async fn process_metrics(&mut self, metrics: &[Metric]) { async fn process_metrics(&mut self, metrics: &[Metric]) -> bool {
let mut status_changed = false;
for metric in metrics { for metric in metrics {
self.host_status_manager.process_metric(metric, &mut self.notification_manager).await; if self.host_status_manager.process_metric(metric, &mut self.notification_manager).await {
status_changed = true;
} }
} }
status_changed
}
/// Create agent version metric for cross-host version comparison /// Create agent version metric for cross-host version comparison
fn get_agent_version_metric(&self) -> Metric { fn get_agent_version_metric(&self) -> Metric {

View File

@@ -19,31 +19,6 @@ impl NixOSCollector {
Self {} Self {}
} }
/// Get NixOS build information
fn get_nixos_build_info(&self) -> Result<String, Box<dyn std::error::Error>> {
// Get nixos-version output directly
let output = Command::new("nixos-version").output()?;
if !output.status.success() {
return Err("nixos-version command failed".into());
}
let version_line = String::from_utf8_lossy(&output.stdout);
let version = version_line.trim();
if version.is_empty() {
return Err("Empty nixos-version output".into());
}
// Remove codename part (e.g., "(Warbler)")
let clean_version = if let Some(pos) = version.find(" (") {
version[..pos].to_string()
} else {
version.to_string()
};
Ok(clean_version)
}
/// Get agent hash from binary path /// Get agent hash from binary path
fn get_agent_hash(&self) -> Result<String, Box<dyn std::error::Error>> { fn get_agent_hash(&self) -> Result<String, Box<dyn std::error::Error>> {

View File

@@ -9,7 +9,6 @@ use chrono::Utc;
pub struct HostStatusConfig { pub struct HostStatusConfig {
pub enabled: bool, pub enabled: bool,
pub aggregation_method: String, // "worst_case" pub aggregation_method: String, // "worst_case"
pub notification_interval_seconds: u64,
} }
impl Default for HostStatusConfig { impl Default for HostStatusConfig {
@@ -17,7 +16,6 @@ impl Default for HostStatusConfig {
Self { Self {
enabled: true, enabled: true,
aggregation_method: "worst_case".to_string(), aggregation_method: "worst_case".to_string(),
notification_interval_seconds: 30,
} }
} }
} }
@@ -160,25 +158,52 @@ impl HostStatusManager {
/// Process a metric - updates status (notifications handled separately via batching) /// Process a metric - updates status and queues for aggregated notifications if status changed
pub async fn process_metric(&mut self, metric: &Metric, _notification_manager: &mut crate::notifications::NotificationManager) { pub async fn process_metric(&mut self, metric: &Metric, _notification_manager: &mut crate::notifications::NotificationManager) -> bool {
// Just update status - notifications are handled by process_pending_notifications let old_status = self.service_statuses.get(&metric.name).copied();
self.update_service_status(metric.name.clone(), metric.status); let new_status = metric.status;
// Update status
self.update_service_status(metric.name.clone(), new_status);
// Check if status actually changed (ignore first-time status setting)
if let Some(old_status) = old_status {
if old_status != new_status {
debug!("Status change detected for {}: {:?} -> {:?}", metric.name, old_status, new_status);
// Queue change for aggregated notification (not immediate)
self.queue_status_change(&metric.name, old_status, new_status);
return true; // Status changed - caller should trigger immediate transmission
}
} else {
debug!("Initial status set for {}: {:?}", metric.name, new_status);
} }
/// Process pending notifications - call this at notification intervals false // No status change (or first-time status)
}
/// Queue status change for aggregated notification
fn queue_status_change(&mut self, metric_name: &str, old_status: Status, new_status: Status) {
// Add to pending changes for aggregated notification
let entry = self.pending_changes.entry(metric_name.to_string()).or_insert((old_status, old_status, 0));
entry.1 = new_status; // Update final status
entry.2 += 1; // Increment change count
// Set batch start time if this is the first change
if self.batch_start_time.is_none() {
self.batch_start_time = Some(Instant::now());
}
}
/// Process pending notifications - legacy method, now rarely used
pub async fn process_pending_notifications(&mut self, notification_manager: &mut crate::notifications::NotificationManager) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { pub async fn process_pending_notifications(&mut self, notification_manager: &mut crate::notifications::NotificationManager) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
if !self.config.enabled || self.pending_changes.is_empty() { if !self.config.enabled || self.pending_changes.is_empty() {
return Ok(()); return Ok(());
} }
let batch_start = self.batch_start_time.unwrap_or_else(Instant::now); // Process notifications immediately without interval batching
let batch_duration = batch_start.elapsed();
// Only process if enough time has passed
if batch_duration.as_secs() < self.config.notification_interval_seconds {
return Ok(());
}
// Create aggregated status changes // Create aggregated status changes
let aggregated = self.create_aggregated_changes(); let aggregated = self.create_aggregated_changes();

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "cm-dashboard" name = "cm-dashboard"
version = "0.1.18" version = "0.1.20"
edition = "2021" edition = "2021"
[dependencies] [dependencies]

View File

@@ -14,7 +14,7 @@ use app::Dashboard;
/// Get hardcoded version /// Get hardcoded version
fn get_version() -> &'static str { fn get_version() -> &'static str {
"v0.1.18" "v0.1.20"
} }
/// Check if running inside tmux session /// Check if running inside tmux session

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "cm-dashboard-shared" name = "cm-dashboard-shared"
version = "0.1.18" version = "0.1.20"
edition = "2021" edition = "2021"
[dependencies] [dependencies]