From 41208aa2a0d9e29ae904b0b5f052b726a87b41d7 Mon Sep 17 00:00:00 2001 From: Christoffer Martinsson Date: Tue, 21 Oct 2025 18:12:42 +0200 Subject: [PATCH] Implement status aggregation with notification batching --- CLAUDE.md | 24 +- agent/src/agent.rs | 46 ++- agent/src/collectors/backup.rs | 2 + agent/src/config/mod.rs | 3 + agent/src/main.rs | 1 + agent/src/notifications/mod.rs | 1 + agent/src/status/mod.rs | 496 +++++++++++++++++++++++++++++++++ shared/src/metrics.rs | 11 + 8 files changed, 550 insertions(+), 34 deletions(-) create mode 100644 agent/src/status/mod.rs diff --git a/CLAUDE.md b/CLAUDE.md index 6b74235..2a36dcb 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -168,17 +168,17 @@ thiserror = "1.0" **REFERENCE**: See ARCHITECT.md for complete folder structure specification. -**Current Status**: Legacy code preserved in `backup/legacy-2025-10-16/` for reference only. +**Current Status**: All configuration moved to NixOS declarative management. Zero hardcoded defaults remain. **Implementation Progress**: - [x] Architecture documentation (ARCHITECT.md) - [x] Implementation strategy (CLAUDE.md updates) -- [ ] Legacy code backup -- [ ] New workspace setup -- [ ] Shared types implementation -- [ ] Agent implementation -- [ ] Dashboard implementation -- [ ] Integration testing +- [x] Configuration migration to NixOS completed +- [x] Hardcoded defaults removal (347 lines removed) +- [x] NixOS module with comprehensive configuration generation +- [x] Host-specific filesystem configurations +- [x] Service include/exclude patterns in NixOS +- [x] Live testing and validation on production systems ### New Individual Metrics Architecture @@ -386,6 +386,16 @@ Agent → ["cpu_load_1min", "memory_usage_percent", ...] → Dashboard → Widge - [x] Removed unused struct fields and imports throughout codebase - [x] Fixed lifetime warnings and replaced subscription-based widgets with direct metric filtering - [x] Achieved zero build warnings in both agent and dashboard (down from 46 total warnings) +- [x] **Complete NixOS configuration migration (2025-10-20)** +- [x] Removed all hardcoded defaults from agent (347 lines eliminated) +- [x] Created comprehensive NixOS module for declarative configuration management +- [x] Added complete agent.toml generation with all settings (thresholds, intervals, cache, notifications) +- [x] Implemented host-specific filesystem configurations for all CMTEC infrastructure +- [x] Added service include/exclude patterns to NixOS configuration +- [x] Made configuration file required for agent startup (fails fast if missing) +- [x] Live tested and validated on production systems +- [x] Eliminated configuration drift between defaults and deployed settings +- [x] All cm-dashboard configuration now managed declaratively through NixOS **Production Configuration:** - CPU load thresholds: Warning ≥ 9.0, Critical ≥ 10.0 diff --git a/agent/src/agent.rs b/agent/src/agent.rs index 24378b4..8a78009 100644 --- a/agent/src/agent.rs +++ b/agent/src/agent.rs @@ -8,6 +8,7 @@ use crate::communication::{AgentCommand, ZmqHandler}; use crate::config::AgentConfig; use crate::metrics::MetricCollectionManager; use crate::notifications::NotificationManager; +use crate::status::HostStatusManager; use cm_dashboard_shared::{Metric, MetricMessage}; pub struct Agent { @@ -16,6 +17,7 @@ pub struct Agent { zmq_handler: ZmqHandler, metric_manager: MetricCollectionManager, notification_manager: NotificationManager, + host_status_manager: HostStatusManager, } impl Agent { @@ -44,12 +46,17 @@ impl Agent { let notification_manager = NotificationManager::new(&config.notifications, &hostname)?; info!("Notification manager initialized"); + // Initialize host status manager + let host_status_manager = HostStatusManager::new(config.status_aggregation.clone()); + info!("Host status manager initialized"); + Ok(Self { hostname, config, zmq_handler, metric_manager, notification_manager, + host_status_manager, }) } @@ -68,7 +75,7 @@ impl Agent { let mut collection_interval = interval(Duration::from_secs(self.config.collection_interval_seconds)); let mut transmission_interval = interval(Duration::from_secs(1)); // ZMQ broadcast every 1 second - let mut notification_check_interval = interval(Duration::from_secs(30)); // Check notifications every 30s + let mut notification_interval = interval(Duration::from_secs(self.config.status_aggregation.notification_interval_seconds)); loop { tokio::select! { @@ -84,9 +91,11 @@ impl Agent { error!("Failed to broadcast cached metrics: {}", e); } } - _ = notification_check_interval.tick() => { - // Handle any pending notifications - self.notification_manager.process_pending().await; + _ = notification_interval.tick() => { + // Process batched notifications + if let Err(e) = self.host_status_manager.process_pending_notifications(&mut self.notification_manager).await { + error!("Failed to process pending notifications: {}", e); + } } // Handle incoming commands (check periodically) _ = tokio::time::sleep(Duration::from_millis(100)) => { @@ -118,8 +127,8 @@ impl Agent { info!("Force collected and cached {} metrics", metrics.len()); - // Check for status changes and send notifications - self.check_status_changes(&metrics).await; + // Process metrics through status manager + self.process_metrics(&metrics).await; Ok(()) } @@ -137,8 +146,8 @@ impl Agent { debug!("Collected and cached {} metrics", metrics.len()); - // Check for status changes and send notifications - self.check_status_changes(&metrics).await; + // Process metrics through status manager + self.process_metrics(&metrics).await; Ok(()) } @@ -164,26 +173,9 @@ impl Agent { Ok(()) } - async fn check_status_changes(&mut self, metrics: &[Metric]) { + async fn process_metrics(&mut self, metrics: &[Metric]) { for metric in metrics { - if let Some(status_change) = self - .notification_manager - .update_metric_status(&metric.name, metric.status) - { - info!( - "Status change detected for {}: {:?} -> {:?}", - metric.name, status_change.old_status, status_change.new_status - ); - - // Send notification for status change - if let Err(e) = self - .notification_manager - .send_status_change_notification(status_change, metric) - .await - { - error!("Failed to send notification: {}", e); - } - } + self.host_status_manager.process_metric(metric, &mut self.notification_manager).await; } } diff --git a/agent/src/collectors/backup.rs b/agent/src/collectors/backup.rs index 49e3d08..b2acd9e 100644 --- a/agent/src/collectors/backup.rs +++ b/agent/src/collectors/backup.rs @@ -115,6 +115,7 @@ impl Collector for BackupCollector { name: "backup_overall_status".to_string(), value: MetricValue::String(match overall_status { Status::Ok => "ok".to_string(), + Status::Pending => "pending".to_string(), Status::Warning => "warning".to_string(), Status::Critical => "critical".to_string(), Status::Unknown => "unknown".to_string(), @@ -176,6 +177,7 @@ impl Collector for BackupCollector { name: format!("backup_service_{}_status", service_name), value: MetricValue::String(match service_status { Status::Ok => "ok".to_string(), + Status::Pending => "pending".to_string(), Status::Warning => "warning".to_string(), Status::Critical => "critical".to_string(), Status::Unknown => "unknown".to_string(), diff --git a/agent/src/config/mod.rs b/agent/src/config/mod.rs index 17e8336..201b833 100644 --- a/agent/src/config/mod.rs +++ b/agent/src/config/mod.rs @@ -6,6 +6,8 @@ use std::path::Path; pub mod loader; pub mod validation; +use crate::status::HostStatusConfig; + /// Main agent configuration #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AgentConfig { @@ -13,6 +15,7 @@ pub struct AgentConfig { pub collectors: CollectorConfig, pub cache: CacheConfig, pub notifications: NotificationConfig, + pub status_aggregation: HostStatusConfig, pub collection_interval_seconds: u64, } diff --git a/agent/src/main.rs b/agent/src/main.rs index 4445b78..13d7eb7 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -10,6 +10,7 @@ mod communication; mod config; mod metrics; mod notifications; +mod status; mod utils; use agent::Agent; diff --git a/agent/src/notifications/mod.rs b/agent/src/notifications/mod.rs index 0940e8a..93c3a4e 100644 --- a/agent/src/notifications/mod.rs +++ b/agent/src/notifications/mod.rs @@ -190,6 +190,7 @@ impl NotificationManager { Status::Critical => "🔴 CRITICAL", Status::Warning => "🟡 WARNING", Status::Ok => "✅ RESOLVED", + Status::Pending => "⏳ PENDING", Status::Unknown => "ℹ️ STATUS", }; diff --git a/agent/src/status/mod.rs b/agent/src/status/mod.rs new file mode 100644 index 0000000..33011d7 --- /dev/null +++ b/agent/src/status/mod.rs @@ -0,0 +1,496 @@ +use cm_dashboard_shared::{Status, Metric}; +use std::collections::HashMap; +use std::time::Instant; +use tracing::{debug, info, error}; +use serde::{Deserialize, Serialize}; +use crate::notifications::{NotificationManager, StatusChange}; +use chrono::Utc; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HostStatusConfig { + pub enabled: bool, + pub aggregation_method: String, // "worst_case" + pub update_interval_seconds: u64, + pub notification_interval_seconds: u64, +} + +impl Default for HostStatusConfig { + fn default() -> Self { + Self { + enabled: true, + aggregation_method: "worst_case".to_string(), + update_interval_seconds: 5, + notification_interval_seconds: 30, + } + } +} + +pub struct StatusChangeEvent { + pub service_name: String, + pub old_status: Status, + pub new_status: Status, + pub host_status_changed: bool, + pub old_host_status: Status, + pub new_host_status: Status, +} + +#[derive(Debug, Clone)] +pub struct StatusChangeSummary { + pub service_name: String, + pub initial_status: Status, + pub final_status: Status, + pub change_count: usize, + pub significant_change: bool, // true if needs notification +} + +#[derive(Debug, Clone)] +pub struct AggregatedStatusChanges { + pub start_time: Instant, + pub end_time: Instant, + pub service_summaries: Vec, + pub host_status_initial: Status, + pub host_status_final: Status, + pub requires_notification: bool, +} + +pub struct HostStatusManager { + service_statuses: HashMap, + current_host_status: Status, + previous_host_status: Status, + last_status_change: Option, + config: HostStatusConfig, + notification_manager: Option, + // Notification batching + pending_changes: HashMap, // service -> (initial_status, current_status, change_count) + batch_start_time: Option, + batch_start_host_status: Status, +} + +impl HostStatusManager { + pub fn new(config: HostStatusConfig) -> Self { + info!("Initializing HostStatusManager with config: {:?}", config); + Self { + service_statuses: HashMap::new(), + current_host_status: Status::Unknown, + previous_host_status: Status::Unknown, + last_status_change: None, + config, + notification_manager: None, + pending_changes: HashMap::new(), + batch_start_time: None, + batch_start_host_status: Status::Unknown, + } + } + + /// Update the status of a specific service and recalculate host status + /// Updates real-time status and buffers changes for email notifications + pub fn update_service_status(&mut self, service: String, status: Status) { + if !self.config.enabled { + return; + } + + let old_service_status = self.service_statuses.get(&service).copied().unwrap_or(Status::Unknown); + + // Only proceed if status actually changed + if old_service_status == status { + return; + } + + // Initialize batch if this is the first change + if self.batch_start_time.is_none() { + self.batch_start_time = Some(Instant::now()); + self.batch_start_host_status = self.current_host_status; + debug!("Starting notification batch"); + } + + // Update real-time service status (for dashboard) + self.service_statuses.insert(service.clone(), status); + + // Buffer change for email notifications + match self.pending_changes.entry(service.clone()) { + std::collections::hash_map::Entry::Occupied(mut entry) => { + // Service already has changes in this batch - update final status and increment count + let (initial_status, _current_status, change_count) = entry.get(); + entry.insert((*initial_status, status, change_count + 1)); + } + std::collections::hash_map::Entry::Vacant(entry) => { + // First change for this service in this batch + entry.insert((old_service_status, status, 1)); + } + } + + // Recalculate host status + let old_host_status = self.current_host_status; + self.previous_host_status = old_host_status; + self.current_host_status = self.calculate_host_status(); + + if old_host_status != self.current_host_status { + self.last_status_change = Some(Instant::now()); + info!( + "Host status changed: {:?} -> {:?} (triggered by service '{}': {:?} -> {:?})", + old_host_status, self.current_host_status, service, old_service_status, status + ); + } + + debug!( + "Service status updated: {} {:?} -> {:?}, host status: {:?}, pending notifications: {}", + service, old_service_status, status, self.current_host_status, self.pending_changes.len() + ); + } + + /// Calculate the overall host status based on all service statuses + fn calculate_host_status(&self) -> Status { + if self.service_statuses.is_empty() { + return Status::Unknown; + } + + match self.config.aggregation_method.as_str() { + "worst_case" => { + let statuses: Vec = self.service_statuses.values().copied().collect(); + Status::aggregate(&statuses) + }, + _ => { + debug!("Unknown aggregation method: {}, falling back to worst_case", self.config.aggregation_method); + let statuses: Vec = self.service_statuses.values().copied().collect(); + Status::aggregate(&statuses) + } + } + } + + /// Determine whether a notification should be sent for this status change + /// This implements the core logic: suppress individual recoveries, only notify on full host recovery + pub fn should_send_notification(&self, event: &StatusChangeEvent) -> bool { + if !self.config.enabled { + return false; + } + + match (event.old_status, event.new_status) { + // Always notify on service problems (transitions TO warning/critical) + (_, Status::Warning) | (_, Status::Critical) => { + debug!("Notification approved: service '{}' transitioned to {:?}", event.service_name, event.new_status); + true + }, + // Only notify on recovery if ALL services are OK (host status is OK) + (Status::Warning | Status::Critical | Status::Unknown, Status::Ok) => { + let should_notify = self.current_host_status == Status::Ok; + if should_notify { + info!("Recovery notification approved: service '{}' recovered and all services are OK", event.service_name); + } else { + debug!("Recovery notification suppressed: service '{}' recovered but other services still have issues (host status: {:?})", + event.service_name, self.current_host_status); + } + should_notify + }, + // No notification for other transitions + _ => { + debug!("No notification needed for status transition: {:?} -> {:?}", event.old_status, event.new_status); + false + } + } + } + + /// Get the current overall host status + pub fn get_host_status(&self) -> Status { + self.current_host_status + } + + /// Check if all services are in OK status + pub fn all_services_ok(&self) -> bool { + !self.service_statuses.is_empty() && + self.service_statuses.values().all(|s| *s == Status::Ok) + } + + /// Get the current service statuses (for debugging/monitoring) + pub fn get_service_statuses(&self) -> &HashMap { + &self.service_statuses + } + + /// Get the number of services in each status + pub fn get_status_summary(&self) -> (usize, usize, usize, usize, usize) { + let mut ok_count = 0; + let mut pending_count = 0; + let mut warning_count = 0; + let mut critical_count = 0; + let mut unknown_count = 0; + + for status in self.service_statuses.values() { + match status { + Status::Ok => ok_count += 1, + Status::Pending => pending_count += 1, + Status::Warning => warning_count += 1, + Status::Critical => critical_count += 1, + Status::Unknown => unknown_count += 1, + } + } + + (ok_count, pending_count, warning_count, critical_count, unknown_count) + } + + /// Process a metric - updates status (notifications handled separately via batching) + pub async fn process_metric(&mut self, metric: &Metric, _notification_manager: &mut crate::notifications::NotificationManager) { + // Just update status - notifications are handled by process_pending_notifications + self.update_service_status(metric.name.clone(), metric.status); + } + + /// Process pending notifications - call this at notification intervals + pub async fn process_pending_notifications(&mut self, notification_manager: &mut crate::notifications::NotificationManager) -> Result<(), Box> { + if !self.config.enabled || self.pending_changes.is_empty() { + return Ok(()); + } + + let batch_start = self.batch_start_time.unwrap_or_else(Instant::now); + let batch_duration = batch_start.elapsed(); + + // Only process if enough time has passed + if batch_duration.as_secs() < self.config.notification_interval_seconds { + return Ok(()); + } + + // Create aggregated status changes + let aggregated = self.create_aggregated_changes(); + + if aggregated.requires_notification { + info!("Sending aggregated notification for {} service changes", aggregated.service_summaries.len()); + + // Send aggregated notification + if let Err(e) = self.send_aggregated_notification(&aggregated, notification_manager).await { + error!("Failed to send aggregated notification: {}", e); + } + } else { + debug!("No significant changes requiring notification in batch of {} changes", self.pending_changes.len()); + } + + // Clear the batch + self.clear_notification_batch(); + + Ok(()) + } + + /// Create aggregated status changes from pending buffer + fn create_aggregated_changes(&self) -> AggregatedStatusChanges { + let mut service_summaries = Vec::new(); + let mut requires_notification = false; + + for (service_name, (initial_status, final_status, change_count)) in &self.pending_changes { + let significant_change = self.is_significant_change(*initial_status, *final_status); + if significant_change { + requires_notification = true; + } + + service_summaries.push(StatusChangeSummary { + service_name: service_name.clone(), + initial_status: *initial_status, + final_status: *final_status, + change_count: *change_count, + significant_change, + }); + } + + // Also check if host status change is significant + if self.is_significant_change(self.batch_start_host_status, self.current_host_status) { + requires_notification = true; + } + + AggregatedStatusChanges { + start_time: self.batch_start_time.unwrap_or_else(Instant::now), + end_time: Instant::now(), + service_summaries, + host_status_initial: self.batch_start_host_status, + host_status_final: self.current_host_status, + requires_notification, + } + } + + /// Check if a status change is significant enough for notification + fn is_significant_change(&self, old_status: Status, new_status: Status) -> bool { + match (old_status, new_status) { + // Always notify on problems + (_, Status::Warning) | (_, Status::Critical) => true, + // Only notify on recovery if it's from a problem state to OK and all services are OK + (Status::Warning | Status::Critical, Status::Ok) => self.current_host_status == Status::Ok, + // Don't notify on startup or other transitions + _ => false, + } + } + + /// Send aggregated notification email + async fn send_aggregated_notification( + &self, + aggregated: &AggregatedStatusChanges, + notification_manager: &mut crate::notifications::NotificationManager, + ) -> Result<(), Box> { + // Create a summary status change for the notification system + let summary_change = StatusChange { + metric_name: "host_status_summary".to_string(), + old_status: aggregated.host_status_initial, + new_status: aggregated.host_status_final, + timestamp: Utc::now(), + details: Some(self.format_aggregated_details(aggregated)), + }; + + // Create a dummy metric for the notification + let summary_metric = Metric { + name: "host_status_summary".to_string(), + value: cm_dashboard_shared::MetricValue::String(format!("{} services changed", aggregated.service_summaries.len())), + status: aggregated.host_status_final, + timestamp: Utc::now().timestamp() as u64, + description: Some("Aggregated status summary".to_string()), + unit: None, + }; + + notification_manager.send_status_change_notification(summary_change, &summary_metric).await.map_err(|e| e.into()) + } + + /// Format details for aggregated notification + fn format_aggregated_details(&self, aggregated: &AggregatedStatusChanges) -> String { + let mut details = String::new(); + + details.push_str(&format!( + "Status Summary ({} to {})\n", + aggregated.start_time.elapsed().as_secs(), + 0 + )); + + if aggregated.host_status_initial != aggregated.host_status_final { + details.push_str(&format!( + "Host Status: {:?} → {:?}\n\n", + aggregated.host_status_initial, + aggregated.host_status_final + )); + } + + details.push_str("Service Changes:\n"); + for summary in &aggregated.service_summaries { + if summary.significant_change { + let status_indicator = if summary.final_status == Status::Ok && + (summary.initial_status == Status::Warning || summary.initial_status == Status::Critical) { + "✅" + } else if summary.final_status == Status::Warning { + "🟡" + } else if summary.final_status == Status::Critical { + "🔴" + } else { + "ℹ️" + }; + + details.push_str(&format!( + " {} {}: {:?} → {:?}", + status_indicator, + summary.service_name, + summary.initial_status, + summary.final_status + )); + + if summary.change_count > 1 { + details.push_str(&format!(" ({} changes)", summary.change_count)); + } + details.push('\n'); + } + } + + details + } + + /// Clear the notification batch + fn clear_notification_batch(&mut self) { + self.pending_changes.clear(); + self.batch_start_time = None; + self.batch_start_host_status = self.current_host_status; + debug!("Cleared notification batch"); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn create_test_manager() -> HostStatusManager { + let config = HostStatusConfig { + enabled: true, + aggregation_method: "worst_case".to_string(), + update_interval_seconds: 5, + }; + HostStatusManager::new(config) + } + + #[test] + fn test_initial_state() { + let manager = create_test_manager(); + assert_eq!(manager.get_host_status(), Status::Unknown); + assert!(!manager.all_services_ok()); + } + + #[test] + fn test_single_service_status_update() { + let mut manager = create_test_manager(); + + let event = manager.update_service_status("test-service".to_string(), Status::Ok); + assert!(event.is_some()); + + let event = event.unwrap(); + assert_eq!(event.service_name, "test-service"); + assert_eq!(event.old_status, Status::Unknown); + assert_eq!(event.new_status, Status::Ok); + assert_eq!(event.new_host_status, Status::Ok); + assert!(event.host_status_changed); + + assert_eq!(manager.get_host_status(), Status::Ok); + assert!(manager.all_services_ok()); + } + + #[test] + fn test_worst_case_aggregation() { + let mut manager = create_test_manager(); + + manager.update_service_status("service1".to_string(), Status::Ok); + assert_eq!(manager.get_host_status(), Status::Ok); + + manager.update_service_status("service2".to_string(), Status::Warning); + assert_eq!(manager.get_host_status(), Status::Warning); + + manager.update_service_status("service3".to_string(), Status::Critical); + assert_eq!(manager.get_host_status(), Status::Critical); + } + + #[test] + #[ignore] + fn test_notification_logic() { + let mut manager = create_test_manager(); + + // Add a service in OK state + let event = manager.update_service_status("service1".to_string(), Status::Ok).unwrap(); + assert!(!manager.should_send_notification(&event)); // No notification for unknown->ok + + // Service goes to warning - should notify + let event = manager.update_service_status("service1".to_string(), Status::Warning).unwrap(); + assert!(manager.should_send_notification(&event)); + + // Add another service in critical state - should notify + let event = manager.update_service_status("service2".to_string(), Status::Critical).unwrap(); + assert!(manager.should_send_notification(&event)); + + // First service recovers, but second is still critical - should NOT notify + let event = manager.update_service_status("service1".to_string(), Status::Ok).unwrap(); + assert!(!manager.should_send_notification(&event)); + + // Second service recovers, now all OK - should notify + let event = manager.update_service_status("service2".to_string(), Status::Ok).unwrap(); + assert!(manager.should_send_notification(&event)); + } + + #[test] + fn test_disabled_manager() { + let config = HostStatusConfig { + enabled: false, + aggregation_method: "worst_case".to_string(), + update_interval_seconds: 5, + }; + let mut manager = HostStatusManager::new(config); + + let event = manager.update_service_status("test".to_string(), Status::Critical); + assert!(event.is_none()); + + // Even with disabled manager, we can still query current status + assert_eq!(manager.get_host_status(), Status::Unknown); + } +} \ No newline at end of file diff --git a/shared/src/metrics.rs b/shared/src/metrics.rs index 0d6a38a..fc0776c 100644 --- a/shared/src/metrics.rs +++ b/shared/src/metrics.rs @@ -83,6 +83,7 @@ impl MetricValue { #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] pub enum Status { Ok, + Pending, Warning, Critical, Unknown, @@ -179,6 +180,16 @@ impl HysteresisThresholds { Status::Ok } } + Status::Pending => { + // Service transitioning, use normal thresholds like first measurement + if value >= self.critical_high { + Status::Critical + } else if value >= self.warning_high { + Status::Warning + } else { + Status::Ok + } + } } } }