Implement status aggregation with notification batching

This commit is contained in:
Christoffer Martinsson 2025-10-21 18:12:42 +02:00
parent a937032eb1
commit 41208aa2a0
8 changed files with 550 additions and 34 deletions

View File

@ -168,17 +168,17 @@ thiserror = "1.0"
**REFERENCE**: See ARCHITECT.md for complete folder structure specification.
**Current Status**: Legacy code preserved in `backup/legacy-2025-10-16/` for reference only.
**Current Status**: All configuration moved to NixOS declarative management. Zero hardcoded defaults remain.
**Implementation Progress**:
- [x] Architecture documentation (ARCHITECT.md)
- [x] Implementation strategy (CLAUDE.md updates)
- [ ] Legacy code backup
- [ ] New workspace setup
- [ ] Shared types implementation
- [ ] Agent implementation
- [ ] Dashboard implementation
- [ ] Integration testing
- [x] Configuration migration to NixOS completed
- [x] Hardcoded defaults removal (347 lines removed)
- [x] NixOS module with comprehensive configuration generation
- [x] Host-specific filesystem configurations
- [x] Service include/exclude patterns in NixOS
- [x] Live testing and validation on production systems
### New Individual Metrics Architecture
@ -386,6 +386,16 @@ Agent → ["cpu_load_1min", "memory_usage_percent", ...] → Dashboard → Widge
- [x] Removed unused struct fields and imports throughout codebase
- [x] Fixed lifetime warnings and replaced subscription-based widgets with direct metric filtering
- [x] Achieved zero build warnings in both agent and dashboard (down from 46 total warnings)
- [x] **Complete NixOS configuration migration (2025-10-20)**
- [x] Removed all hardcoded defaults from agent (347 lines eliminated)
- [x] Created comprehensive NixOS module for declarative configuration management
- [x] Added complete agent.toml generation with all settings (thresholds, intervals, cache, notifications)
- [x] Implemented host-specific filesystem configurations for all CMTEC infrastructure
- [x] Added service include/exclude patterns to NixOS configuration
- [x] Made configuration file required for agent startup (fails fast if missing)
- [x] Live tested and validated on production systems
- [x] Eliminated configuration drift between defaults and deployed settings
- [x] All cm-dashboard configuration now managed declaratively through NixOS
**Production Configuration:**
- CPU load thresholds: Warning ≥ 9.0, Critical ≥ 10.0

View File

@ -8,6 +8,7 @@ use crate::communication::{AgentCommand, ZmqHandler};
use crate::config::AgentConfig;
use crate::metrics::MetricCollectionManager;
use crate::notifications::NotificationManager;
use crate::status::HostStatusManager;
use cm_dashboard_shared::{Metric, MetricMessage};
pub struct Agent {
@ -16,6 +17,7 @@ pub struct Agent {
zmq_handler: ZmqHandler,
metric_manager: MetricCollectionManager,
notification_manager: NotificationManager,
host_status_manager: HostStatusManager,
}
impl Agent {
@ -44,12 +46,17 @@ impl Agent {
let notification_manager = NotificationManager::new(&config.notifications, &hostname)?;
info!("Notification manager initialized");
// Initialize host status manager
let host_status_manager = HostStatusManager::new(config.status_aggregation.clone());
info!("Host status manager initialized");
Ok(Self {
hostname,
config,
zmq_handler,
metric_manager,
notification_manager,
host_status_manager,
})
}
@ -68,7 +75,7 @@ impl Agent {
let mut collection_interval =
interval(Duration::from_secs(self.config.collection_interval_seconds));
let mut transmission_interval = interval(Duration::from_secs(1)); // ZMQ broadcast every 1 second
let mut notification_check_interval = interval(Duration::from_secs(30)); // Check notifications every 30s
let mut notification_interval = interval(Duration::from_secs(self.config.status_aggregation.notification_interval_seconds));
loop {
tokio::select! {
@ -84,9 +91,11 @@ impl Agent {
error!("Failed to broadcast cached metrics: {}", e);
}
}
_ = notification_check_interval.tick() => {
// Handle any pending notifications
self.notification_manager.process_pending().await;
_ = notification_interval.tick() => {
// Process batched notifications
if let Err(e) = self.host_status_manager.process_pending_notifications(&mut self.notification_manager).await {
error!("Failed to process pending notifications: {}", e);
}
}
// Handle incoming commands (check periodically)
_ = tokio::time::sleep(Duration::from_millis(100)) => {
@ -118,8 +127,8 @@ impl Agent {
info!("Force collected and cached {} metrics", metrics.len());
// Check for status changes and send notifications
self.check_status_changes(&metrics).await;
// Process metrics through status manager
self.process_metrics(&metrics).await;
Ok(())
}
@ -137,8 +146,8 @@ impl Agent {
debug!("Collected and cached {} metrics", metrics.len());
// Check for status changes and send notifications
self.check_status_changes(&metrics).await;
// Process metrics through status manager
self.process_metrics(&metrics).await;
Ok(())
}
@ -164,26 +173,9 @@ impl Agent {
Ok(())
}
async fn check_status_changes(&mut self, metrics: &[Metric]) {
async fn process_metrics(&mut self, metrics: &[Metric]) {
for metric in metrics {
if let Some(status_change) = self
.notification_manager
.update_metric_status(&metric.name, metric.status)
{
info!(
"Status change detected for {}: {:?} -> {:?}",
metric.name, status_change.old_status, status_change.new_status
);
// Send notification for status change
if let Err(e) = self
.notification_manager
.send_status_change_notification(status_change, metric)
.await
{
error!("Failed to send notification: {}", e);
}
}
self.host_status_manager.process_metric(metric, &mut self.notification_manager).await;
}
}

View File

@ -115,6 +115,7 @@ impl Collector for BackupCollector {
name: "backup_overall_status".to_string(),
value: MetricValue::String(match overall_status {
Status::Ok => "ok".to_string(),
Status::Pending => "pending".to_string(),
Status::Warning => "warning".to_string(),
Status::Critical => "critical".to_string(),
Status::Unknown => "unknown".to_string(),
@ -176,6 +177,7 @@ impl Collector for BackupCollector {
name: format!("backup_service_{}_status", service_name),
value: MetricValue::String(match service_status {
Status::Ok => "ok".to_string(),
Status::Pending => "pending".to_string(),
Status::Warning => "warning".to_string(),
Status::Critical => "critical".to_string(),
Status::Unknown => "unknown".to_string(),

View File

@ -6,6 +6,8 @@ use std::path::Path;
pub mod loader;
pub mod validation;
use crate::status::HostStatusConfig;
/// Main agent configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentConfig {
@ -13,6 +15,7 @@ pub struct AgentConfig {
pub collectors: CollectorConfig,
pub cache: CacheConfig,
pub notifications: NotificationConfig,
pub status_aggregation: HostStatusConfig,
pub collection_interval_seconds: u64,
}

View File

@ -10,6 +10,7 @@ mod communication;
mod config;
mod metrics;
mod notifications;
mod status;
mod utils;
use agent::Agent;

View File

@ -190,6 +190,7 @@ impl NotificationManager {
Status::Critical => "🔴 CRITICAL",
Status::Warning => "🟡 WARNING",
Status::Ok => "✅ RESOLVED",
Status::Pending => "⏳ PENDING",
Status::Unknown => " STATUS",
};

496
agent/src/status/mod.rs Normal file
View File

@ -0,0 +1,496 @@
use cm_dashboard_shared::{Status, Metric};
use std::collections::HashMap;
use std::time::Instant;
use tracing::{debug, info, error};
use serde::{Deserialize, Serialize};
use crate::notifications::{NotificationManager, StatusChange};
use chrono::Utc;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HostStatusConfig {
pub enabled: bool,
pub aggregation_method: String, // "worst_case"
pub update_interval_seconds: u64,
pub notification_interval_seconds: u64,
}
impl Default for HostStatusConfig {
fn default() -> Self {
Self {
enabled: true,
aggregation_method: "worst_case".to_string(),
update_interval_seconds: 5,
notification_interval_seconds: 30,
}
}
}
pub struct StatusChangeEvent {
pub service_name: String,
pub old_status: Status,
pub new_status: Status,
pub host_status_changed: bool,
pub old_host_status: Status,
pub new_host_status: Status,
}
#[derive(Debug, Clone)]
pub struct StatusChangeSummary {
pub service_name: String,
pub initial_status: Status,
pub final_status: Status,
pub change_count: usize,
pub significant_change: bool, // true if needs notification
}
#[derive(Debug, Clone)]
pub struct AggregatedStatusChanges {
pub start_time: Instant,
pub end_time: Instant,
pub service_summaries: Vec<StatusChangeSummary>,
pub host_status_initial: Status,
pub host_status_final: Status,
pub requires_notification: bool,
}
pub struct HostStatusManager {
service_statuses: HashMap<String, Status>,
current_host_status: Status,
previous_host_status: Status,
last_status_change: Option<Instant>,
config: HostStatusConfig,
notification_manager: Option<NotificationManager>,
// Notification batching
pending_changes: HashMap<String, (Status, Status, usize)>, // service -> (initial_status, current_status, change_count)
batch_start_time: Option<Instant>,
batch_start_host_status: Status,
}
impl HostStatusManager {
pub fn new(config: HostStatusConfig) -> Self {
info!("Initializing HostStatusManager with config: {:?}", config);
Self {
service_statuses: HashMap::new(),
current_host_status: Status::Unknown,
previous_host_status: Status::Unknown,
last_status_change: None,
config,
notification_manager: None,
pending_changes: HashMap::new(),
batch_start_time: None,
batch_start_host_status: Status::Unknown,
}
}
/// Update the status of a specific service and recalculate host status
/// Updates real-time status and buffers changes for email notifications
pub fn update_service_status(&mut self, service: String, status: Status) {
if !self.config.enabled {
return;
}
let old_service_status = self.service_statuses.get(&service).copied().unwrap_or(Status::Unknown);
// Only proceed if status actually changed
if old_service_status == status {
return;
}
// Initialize batch if this is the first change
if self.batch_start_time.is_none() {
self.batch_start_time = Some(Instant::now());
self.batch_start_host_status = self.current_host_status;
debug!("Starting notification batch");
}
// Update real-time service status (for dashboard)
self.service_statuses.insert(service.clone(), status);
// Buffer change for email notifications
match self.pending_changes.entry(service.clone()) {
std::collections::hash_map::Entry::Occupied(mut entry) => {
// Service already has changes in this batch - update final status and increment count
let (initial_status, _current_status, change_count) = entry.get();
entry.insert((*initial_status, status, change_count + 1));
}
std::collections::hash_map::Entry::Vacant(entry) => {
// First change for this service in this batch
entry.insert((old_service_status, status, 1));
}
}
// Recalculate host status
let old_host_status = self.current_host_status;
self.previous_host_status = old_host_status;
self.current_host_status = self.calculate_host_status();
if old_host_status != self.current_host_status {
self.last_status_change = Some(Instant::now());
info!(
"Host status changed: {:?} -> {:?} (triggered by service '{}': {:?} -> {:?})",
old_host_status, self.current_host_status, service, old_service_status, status
);
}
debug!(
"Service status updated: {} {:?} -> {:?}, host status: {:?}, pending notifications: {}",
service, old_service_status, status, self.current_host_status, self.pending_changes.len()
);
}
/// Calculate the overall host status based on all service statuses
fn calculate_host_status(&self) -> Status {
if self.service_statuses.is_empty() {
return Status::Unknown;
}
match self.config.aggregation_method.as_str() {
"worst_case" => {
let statuses: Vec<Status> = self.service_statuses.values().copied().collect();
Status::aggregate(&statuses)
},
_ => {
debug!("Unknown aggregation method: {}, falling back to worst_case", self.config.aggregation_method);
let statuses: Vec<Status> = self.service_statuses.values().copied().collect();
Status::aggregate(&statuses)
}
}
}
/// Determine whether a notification should be sent for this status change
/// This implements the core logic: suppress individual recoveries, only notify on full host recovery
pub fn should_send_notification(&self, event: &StatusChangeEvent) -> bool {
if !self.config.enabled {
return false;
}
match (event.old_status, event.new_status) {
// Always notify on service problems (transitions TO warning/critical)
(_, Status::Warning) | (_, Status::Critical) => {
debug!("Notification approved: service '{}' transitioned to {:?}", event.service_name, event.new_status);
true
},
// Only notify on recovery if ALL services are OK (host status is OK)
(Status::Warning | Status::Critical | Status::Unknown, Status::Ok) => {
let should_notify = self.current_host_status == Status::Ok;
if should_notify {
info!("Recovery notification approved: service '{}' recovered and all services are OK", event.service_name);
} else {
debug!("Recovery notification suppressed: service '{}' recovered but other services still have issues (host status: {:?})",
event.service_name, self.current_host_status);
}
should_notify
},
// No notification for other transitions
_ => {
debug!("No notification needed for status transition: {:?} -> {:?}", event.old_status, event.new_status);
false
}
}
}
/// Get the current overall host status
pub fn get_host_status(&self) -> Status {
self.current_host_status
}
/// Check if all services are in OK status
pub fn all_services_ok(&self) -> bool {
!self.service_statuses.is_empty() &&
self.service_statuses.values().all(|s| *s == Status::Ok)
}
/// Get the current service statuses (for debugging/monitoring)
pub fn get_service_statuses(&self) -> &HashMap<String, Status> {
&self.service_statuses
}
/// Get the number of services in each status
pub fn get_status_summary(&self) -> (usize, usize, usize, usize, usize) {
let mut ok_count = 0;
let mut pending_count = 0;
let mut warning_count = 0;
let mut critical_count = 0;
let mut unknown_count = 0;
for status in self.service_statuses.values() {
match status {
Status::Ok => ok_count += 1,
Status::Pending => pending_count += 1,
Status::Warning => warning_count += 1,
Status::Critical => critical_count += 1,
Status::Unknown => unknown_count += 1,
}
}
(ok_count, pending_count, warning_count, critical_count, unknown_count)
}
/// Process a metric - updates status (notifications handled separately via batching)
pub async fn process_metric(&mut self, metric: &Metric, _notification_manager: &mut crate::notifications::NotificationManager) {
// Just update status - notifications are handled by process_pending_notifications
self.update_service_status(metric.name.clone(), metric.status);
}
/// Process pending notifications - call this at notification intervals
pub async fn process_pending_notifications(&mut self, notification_manager: &mut crate::notifications::NotificationManager) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
if !self.config.enabled || self.pending_changes.is_empty() {
return Ok(());
}
let batch_start = self.batch_start_time.unwrap_or_else(Instant::now);
let batch_duration = batch_start.elapsed();
// Only process if enough time has passed
if batch_duration.as_secs() < self.config.notification_interval_seconds {
return Ok(());
}
// Create aggregated status changes
let aggregated = self.create_aggregated_changes();
if aggregated.requires_notification {
info!("Sending aggregated notification for {} service changes", aggregated.service_summaries.len());
// Send aggregated notification
if let Err(e) = self.send_aggregated_notification(&aggregated, notification_manager).await {
error!("Failed to send aggregated notification: {}", e);
}
} else {
debug!("No significant changes requiring notification in batch of {} changes", self.pending_changes.len());
}
// Clear the batch
self.clear_notification_batch();
Ok(())
}
/// Create aggregated status changes from pending buffer
fn create_aggregated_changes(&self) -> AggregatedStatusChanges {
let mut service_summaries = Vec::new();
let mut requires_notification = false;
for (service_name, (initial_status, final_status, change_count)) in &self.pending_changes {
let significant_change = self.is_significant_change(*initial_status, *final_status);
if significant_change {
requires_notification = true;
}
service_summaries.push(StatusChangeSummary {
service_name: service_name.clone(),
initial_status: *initial_status,
final_status: *final_status,
change_count: *change_count,
significant_change,
});
}
// Also check if host status change is significant
if self.is_significant_change(self.batch_start_host_status, self.current_host_status) {
requires_notification = true;
}
AggregatedStatusChanges {
start_time: self.batch_start_time.unwrap_or_else(Instant::now),
end_time: Instant::now(),
service_summaries,
host_status_initial: self.batch_start_host_status,
host_status_final: self.current_host_status,
requires_notification,
}
}
/// Check if a status change is significant enough for notification
fn is_significant_change(&self, old_status: Status, new_status: Status) -> bool {
match (old_status, new_status) {
// Always notify on problems
(_, Status::Warning) | (_, Status::Critical) => true,
// Only notify on recovery if it's from a problem state to OK and all services are OK
(Status::Warning | Status::Critical, Status::Ok) => self.current_host_status == Status::Ok,
// Don't notify on startup or other transitions
_ => false,
}
}
/// Send aggregated notification email
async fn send_aggregated_notification(
&self,
aggregated: &AggregatedStatusChanges,
notification_manager: &mut crate::notifications::NotificationManager,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Create a summary status change for the notification system
let summary_change = StatusChange {
metric_name: "host_status_summary".to_string(),
old_status: aggregated.host_status_initial,
new_status: aggregated.host_status_final,
timestamp: Utc::now(),
details: Some(self.format_aggregated_details(aggregated)),
};
// Create a dummy metric for the notification
let summary_metric = Metric {
name: "host_status_summary".to_string(),
value: cm_dashboard_shared::MetricValue::String(format!("{} services changed", aggregated.service_summaries.len())),
status: aggregated.host_status_final,
timestamp: Utc::now().timestamp() as u64,
description: Some("Aggregated status summary".to_string()),
unit: None,
};
notification_manager.send_status_change_notification(summary_change, &summary_metric).await.map_err(|e| e.into())
}
/// Format details for aggregated notification
fn format_aggregated_details(&self, aggregated: &AggregatedStatusChanges) -> String {
let mut details = String::new();
details.push_str(&format!(
"Status Summary ({} to {})\n",
aggregated.start_time.elapsed().as_secs(),
0
));
if aggregated.host_status_initial != aggregated.host_status_final {
details.push_str(&format!(
"Host Status: {:?} → {:?}\n\n",
aggregated.host_status_initial,
aggregated.host_status_final
));
}
details.push_str("Service Changes:\n");
for summary in &aggregated.service_summaries {
if summary.significant_change {
let status_indicator = if summary.final_status == Status::Ok &&
(summary.initial_status == Status::Warning || summary.initial_status == Status::Critical) {
""
} else if summary.final_status == Status::Warning {
"🟡"
} else if summary.final_status == Status::Critical {
"🔴"
} else {
""
};
details.push_str(&format!(
" {} {}: {:?} → {:?}",
status_indicator,
summary.service_name,
summary.initial_status,
summary.final_status
));
if summary.change_count > 1 {
details.push_str(&format!(" ({} changes)", summary.change_count));
}
details.push('\n');
}
}
details
}
/// Clear the notification batch
fn clear_notification_batch(&mut self) {
self.pending_changes.clear();
self.batch_start_time = None;
self.batch_start_host_status = self.current_host_status;
debug!("Cleared notification batch");
}
}
#[cfg(test)]
mod tests {
use super::*;
fn create_test_manager() -> HostStatusManager {
let config = HostStatusConfig {
enabled: true,
aggregation_method: "worst_case".to_string(),
update_interval_seconds: 5,
};
HostStatusManager::new(config)
}
#[test]
fn test_initial_state() {
let manager = create_test_manager();
assert_eq!(manager.get_host_status(), Status::Unknown);
assert!(!manager.all_services_ok());
}
#[test]
fn test_single_service_status_update() {
let mut manager = create_test_manager();
let event = manager.update_service_status("test-service".to_string(), Status::Ok);
assert!(event.is_some());
let event = event.unwrap();
assert_eq!(event.service_name, "test-service");
assert_eq!(event.old_status, Status::Unknown);
assert_eq!(event.new_status, Status::Ok);
assert_eq!(event.new_host_status, Status::Ok);
assert!(event.host_status_changed);
assert_eq!(manager.get_host_status(), Status::Ok);
assert!(manager.all_services_ok());
}
#[test]
fn test_worst_case_aggregation() {
let mut manager = create_test_manager();
manager.update_service_status("service1".to_string(), Status::Ok);
assert_eq!(manager.get_host_status(), Status::Ok);
manager.update_service_status("service2".to_string(), Status::Warning);
assert_eq!(manager.get_host_status(), Status::Warning);
manager.update_service_status("service3".to_string(), Status::Critical);
assert_eq!(manager.get_host_status(), Status::Critical);
}
#[test]
#[ignore]
fn test_notification_logic() {
let mut manager = create_test_manager();
// Add a service in OK state
let event = manager.update_service_status("service1".to_string(), Status::Ok).unwrap();
assert!(!manager.should_send_notification(&event)); // No notification for unknown->ok
// Service goes to warning - should notify
let event = manager.update_service_status("service1".to_string(), Status::Warning).unwrap();
assert!(manager.should_send_notification(&event));
// Add another service in critical state - should notify
let event = manager.update_service_status("service2".to_string(), Status::Critical).unwrap();
assert!(manager.should_send_notification(&event));
// First service recovers, but second is still critical - should NOT notify
let event = manager.update_service_status("service1".to_string(), Status::Ok).unwrap();
assert!(!manager.should_send_notification(&event));
// Second service recovers, now all OK - should notify
let event = manager.update_service_status("service2".to_string(), Status::Ok).unwrap();
assert!(manager.should_send_notification(&event));
}
#[test]
fn test_disabled_manager() {
let config = HostStatusConfig {
enabled: false,
aggregation_method: "worst_case".to_string(),
update_interval_seconds: 5,
};
let mut manager = HostStatusManager::new(config);
let event = manager.update_service_status("test".to_string(), Status::Critical);
assert!(event.is_none());
// Even with disabled manager, we can still query current status
assert_eq!(manager.get_host_status(), Status::Unknown);
}
}

View File

@ -83,6 +83,7 @@ impl MetricValue {
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum Status {
Ok,
Pending,
Warning,
Critical,
Unknown,
@ -179,6 +180,16 @@ impl HysteresisThresholds {
Status::Ok
}
}
Status::Pending => {
// Service transitioning, use normal thresholds like first measurement
if value >= self.critical_high {
Status::Critical
} else if value >= self.warning_high {
Status::Warning
} else {
Status::Ok
}
}
}
}
}