Implement unified notification system for all agents

- Replace hardcoded agent-specific notification logic with generic scanner
- Automatically detect every string field whose name ends in '_status', scanning each collector's output recursively
- Send email notifications from hostname@cmtec.se to cm@cmtec.se for any status changes
- Include agent name, component, and source path in notification description
- Works identically for System, Service, Smart, Backup, and future collectors
- Supports nested objects and arrays for comprehensive monitoring
This commit is contained in:
Christoffer Martinsson 2025-10-14 23:10:15 +02:00
parent a64464142c
commit 407329657f

View File

@ -163,164 +163,48 @@ impl SimpleAgent {
}
/// Inspects one collector output for status changes and forwards any detected
/// change to the notification manager.
///
/// NOTE(review): this span appears to interleave both sides of a diff hunk
/// (the agent-specific `match` and the generic scan call) and its braces do
/// not balance as shown — TODO confirm the real body against the repository.
async fn check_status_changes(&mut self, output: &crate::collectors::CollectorOutput) {
// Extract status from collector output and check for changes
// NOTE(review): the `match` below, down to the `send_notification` call, looks
// like the pre-change Service-specific path — presumably removed by this commit.
match output.agent_type {
AgentType::Service => {
if let Some(summary) = output.data.get("summary") {
// Check services status
if let Some(services_status) = summary.get("services_status").and_then(|v| v.as_str()) {
let details = self.build_service_failure_details(output);
if let Some(change) = self.notification_manager.update_status_with_details("system", "services", services_status, details) {
self.notification_manager.send_notification(change).await;
// Generic status change detection for all agents
// The agent type's `Debug` rendering is what gets passed on as the agent name.
self.scan_for_status_changes(&output.data, &format!("{:?}", output.agent_type)).await;
}
/// Entry point of the generic status scan for one agent's collected data.
///
/// Starts the walk over `data` with an empty source path and hands off to
/// `scan_object_for_status`, which does the actual traversal.
///
/// * `data` - JSON value produced by a collector.
/// * `agent_name` - label of the agent the data came from.
async fn scan_for_status_changes(&mut self, data: &serde_json::Value, agent_name: &str) {
    // An empty path marks the document root; the callee appends keys and indices.
    let root_path = "";
    self.scan_object_for_status(data, agent_name, root_path).await;
}
/// Walks a JSON value and reports a status change for every string field whose
/// key ends in `_status`.
///
/// * `value` - the JSON node currently being inspected.
/// * `agent_name` - agent label; its lowercase form is used as the component.
/// * `path` - location of `value`: object keys are joined with `.`, array
///   elements are rendered as `path[index]`; empty at the root.
///
/// NOTE(review): this `async fn` awaits itself directly in both the object and
/// the array arm. rustc rejects un-boxed async recursion (E0733) — confirm how
/// this builds (e.g. `Box::pin` or an attribute macro not visible here).
///
/// NOTE(review): the span also seems to interleave removed diff lines (the
/// `AgentType::*` arms) with the new arms, so braces do not balance as shown.
async fn scan_object_for_status(&mut self, value: &serde_json::Value, agent_name: &str, path: &str) {
match value {
serde_json::Value::Object(obj) => {
for (key, val) in obj {
// Dotted path of this key, used in the description and the log line.
let current_path = if path.is_empty() { key.clone() } else { format!("{}.{}", path, key) };
if key.ends_with("_status") && val.is_string() {
// Found a status field - check for changes
if let Some(status) = val.as_str() {
let component = agent_name.to_lowercase();
// `trim_end_matches` strips every trailing repetition of the suffix, not just one.
let metric = key.trim_end_matches("_status");
let description = format!("Agent: {}, Component: {}, Source: {}", agent_name, component, current_path);
// NOTE(review): only `component` and `metric` are passed as identifiers; the path
// is carried in the description alone. Same-named fields at different paths or
// array indices presumably share one tracked status — verify in the manager.
if let Some(change) = self.notification_manager.update_status_with_details(&component, metric, status, Some(description)) {
info!("Status change: {} {} -> {}", current_path, change.old_status, change.new_status);
self.notification_manager.send_notification(change).await;
}
}
} else {
// Recursively scan nested objects
// Also reached for `_status` keys whose value is not a string.
self.scan_object_for_status(val, agent_name, &current_path).await;
}
}
}
// NOTE(review): the `AgentType::*` arms from here through the `Backup` arm are not
// patterns of `serde_json::Value`; they look like the removed, agent-specific
// logic interleaved by the diff rendering — TODO confirm.
AgentType::Smart => {
if let Some(status) = output.data.get("status").and_then(|v| v.as_str()) {
let normalized_status = match status {
"HEALTHY" => "ok",
"WARNING" => "warning",
"CRITICAL" => "critical",
_ => "unknown"
};
if let Some(change) = self.notification_manager.update_status("storage", "smart", normalized_status) {
self.notification_manager.send_notification(change).await;
}
}
}
AgentType::System => {
if let Some(summary) = output.data.get("summary") {
// Check CPU status
if let Some(cpu_status) = summary.get("cpu_status").and_then(|v| v.as_str()) {
let cpu_details = self.build_cpu_details(summary);
if let Some(change) = self.notification_manager.update_status_with_details("system", "cpu", cpu_status, cpu_details) {
info!("CPU status change detected: {} -> {}", change.old_status, change.new_status);
self.notification_manager.send_notification(change).await;
}
}
// Check memory status
if let Some(memory_status) = summary.get("memory_status").and_then(|v| v.as_str()) {
let memory_details = self.build_memory_details(summary);
if let Some(change) = self.notification_manager.update_status_with_details("system", "memory", memory_status, memory_details) {
info!("Memory status change detected: {} -> {}", change.old_status, change.new_status);
self.notification_manager.send_notification(change).await;
}
}
// Check CPU temp status (optional)
if let Some(cpu_temp_status) = summary.get("cpu_temp_status").and_then(|v| v.as_str()) {
let temp_details = self.build_cpu_temp_details(summary);
if let Some(change) = self.notification_manager.update_status_with_details("system", "cpu_temp", cpu_temp_status, temp_details) {
info!("CPU temp status change detected: {} -> {}", change.old_status, change.new_status);
self.notification_manager.send_notification(change).await;
}
}
}
}
AgentType::Backup => {
if let Some(status) = output.data.get("overall_status") {
let status_str = match status.as_str() {
Some("Healthy") => "ok",
Some("Warning") => "warning",
Some("Failed") => "critical",
_ => "unknown"
};
if let Some(change) = self.notification_manager.update_status("backup", "overall", status_str) {
self.notification_manager.send_notification(change).await;
}
serde_json::Value::Array(arr) => {
// Scan array elements for individual item status tracking
for (index, item) in arr.iter().enumerate() {
// Element paths take the form `path[index]`.
let item_path = format!("{}[{}]", path, index);
self.scan_object_for_status(item, agent_name, &item_path).await;
}
}
// Remaining JSON variants have no nested fields to scan.
_ => {}
}
}
/// Formats the 1/5/15-minute CPU load figures found in `summary`.
///
/// Reads `cpu_load_1`, `cpu_load_5` and `cpu_load_15` as floats; a missing or
/// non-numeric entry is reported as `0.0`. Always returns `Some`.
fn build_cpu_details(&self, summary: &serde_json::Value) -> Option<String> {
    // Shared lookup: the numeric field, or 0.0 when absent or not a number.
    let load = |field: &str| summary.get(field).and_then(|v| v.as_f64()).unwrap_or(0.0);

    let one_min = load("cpu_load_1");
    let five_min = load("cpu_load_5");
    let fifteen_min = load("cpu_load_15");

    Some(format!("CPU load (1/5/15min): {:.2} / {:.2} / {:.2}", one_min, five_min, fifteen_min))
}
/// Formats memory usage from `summary` as "used / total GB (percent%)".
///
/// `memory_used_mb` and `memory_total_mb` are divided by 1024 before display.
/// Fallbacks when a field is missing or non-numeric: used `0.0`, total `1.0`,
/// `memory_usage_percent` `0.0`. Always returns `Some`.
fn build_memory_details(&self, summary: &serde_json::Value) -> Option<String> {
    // Numeric field lookup with a per-field fallback value.
    let number_or = |field: &str, fallback: f64| {
        summary.get(field).and_then(|v| v.as_f64()).unwrap_or(fallback)
    };

    let used_gb = number_or("memory_used_mb", 0.0) / 1024.0;
    let total_gb = number_or("memory_total_mb", 1.0) / 1024.0;
    let percent = number_or("memory_usage_percent", 0.0);

    Some(format!("Memory usage: {:.1} / {:.1} GB ({:.1}%)", used_gb, total_gb, percent))
}
/// Formats the CPU temperature from `summary`, if one is present.
///
/// Returns `None` when `cpu_temp_c` is missing or not a number; otherwise a
/// one-decimal reading in °C.
fn build_cpu_temp_details(&self, summary: &serde_json::Value) -> Option<String> {
    summary
        .get("cpu_temp_c")
        .and_then(|v| v.as_f64())
        .map(|celsius| format!("CPU temperature: {:.1}°C", celsius))
}
/// Builds a multi-line summary of stopped and degraded services.
///
/// Reads the `services` array from `output.data`. Entries need string `name`
/// and `status` fields to be considered; `"Stopped"` entries are listed under
/// "Failed services", `"Degraded"` and `"Restarting"` entries under
/// "Degraded services". Each line includes `memory_used_mb` and
/// `disk_used_gb` (0.0 when missing or non-numeric).
///
/// Returns `None` when `services` is absent or not an array, or when no entry
/// falls into either group.
fn build_service_failure_details(&self, output: &crate::collectors::CollectorOutput) -> Option<String> {
    let services = output.data.get("services").and_then(|v| v.as_array())?;

    let mut stopped: Vec<String> = Vec::new();
    let mut degraded: Vec<String> = Vec::new();

    for entry in services {
        let name = entry.get("name").and_then(|v| v.as_str());
        let status = entry.get("status").and_then(|v| v.as_str());
        let (name, status) = match (name, status) {
            (Some(n), Some(s)) => (n, s),
            // Entries without both fields are ignored.
            _ => continue,
        };

        // Resource figure for this entry, defaulting to 0.0.
        let usage = |field: &str| entry.get(field).and_then(|v| v.as_f64()).unwrap_or(0.0);

        match status {
            "Stopped" => stopped.push(format!(
                "{} (stopped, was using {:.1}MB RAM, {:.1}GB disk)",
                name,
                usage("memory_used_mb"),
                usage("disk_used_gb")
            )),
            "Degraded" | "Restarting" => degraded.push(format!(
                "{} ({}, using {:.1}MB RAM, {:.1}GB disk)",
                name,
                status.to_lowercase(),
                usage("memory_used_mb"),
                usage("disk_used_gb")
            )),
            _ => {}
        }
    }

    // One section per non-empty group: heading line followed by "- item" lines.
    let groups = [("Failed services:\n", &stopped), ("Degraded services:\n", &degraded)];
    let sections: Vec<String> = groups
        .iter()
        .filter(|(_, members)| !members.is_empty())
        .map(|(heading, members)| {
            let mut section = String::from(*heading);
            for member in members.iter() {
                section.push_str(&format!("- {}\n", member));
            }
            section
        })
        .collect();

    if sections.is_empty() {
        return None;
    }

    // Sections already end in a newline, so joining with "\n" leaves one blank
    // line between them; trailing whitespace is trimmed from the result.
    Some(sections.join("\n").trim_end().to_string())
}
}