Testing
This commit is contained in:
@@ -10,7 +10,8 @@ async-trait = "0.1"
|
||||
clap = { version = "4.0", features = ["derive"] }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
chrono = { version = "0.4", features = ["serde", "clock"] }
|
||||
chrono-tz = "0.8"
|
||||
thiserror = "1.0"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
|
||||
|
||||
@@ -12,7 +12,6 @@ use super::{AgentType, Collector, CollectorError, CollectorOutput};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct BackupCollector {
|
||||
pub enabled: bool,
|
||||
pub interval: Duration,
|
||||
pub restic_repo: Option<String>,
|
||||
pub backup_service: String,
|
||||
@@ -21,13 +20,12 @@ pub struct BackupCollector {
|
||||
|
||||
impl BackupCollector {
|
||||
pub fn new(
|
||||
enabled: bool,
|
||||
_enabled: bool,
|
||||
interval_ms: u64,
|
||||
restic_repo: Option<String>,
|
||||
backup_service: String,
|
||||
) -> Self {
|
||||
Self {
|
||||
enabled,
|
||||
interval: Duration::from_millis(interval_ms),
|
||||
restic_repo,
|
||||
backup_service,
|
||||
@@ -300,13 +298,6 @@ impl Collector for BackupCollector {
|
||||
self.interval
|
||||
}
|
||||
|
||||
fn is_enabled(&self) -> bool {
|
||||
self.enabled
|
||||
}
|
||||
|
||||
fn requires_root(&self) -> bool {
|
||||
false // Depends on restic repo permissions
|
||||
}
|
||||
|
||||
async fn collect(&self) -> Result<CollectorOutput, CollectorError> {
|
||||
// Try to get borgbackup metrics first, fall back to restic if not available
|
||||
@@ -383,9 +374,17 @@ impl Collector for BackupCollector {
|
||||
last_message: None,
|
||||
});
|
||||
|
||||
// Convert BackupStatus to standardized string format
|
||||
let status_string = match overall_status {
|
||||
BackupStatus::Healthy => "ok",
|
||||
BackupStatus::Warning => "warning",
|
||||
BackupStatus::Failed => "critical",
|
||||
BackupStatus::Unknown => "unknown",
|
||||
};
|
||||
|
||||
// Add disk information if available from borgbackup metrics
|
||||
let mut backup_json = json!({
|
||||
"overall_status": overall_status,
|
||||
"overall_status": status_string,
|
||||
"backup": backup_info,
|
||||
"service": service_data,
|
||||
"timestamp": Utc::now()
|
||||
@@ -407,7 +406,6 @@ impl Collector for BackupCollector {
|
||||
Ok(CollectorOutput {
|
||||
agent_type: AgentType::Backup,
|
||||
data: backup_metrics,
|
||||
timestamp: Utc::now(),
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -457,39 +455,25 @@ struct JournalEntry {
|
||||
// Borgbackup metrics structure from backup script
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct BorgbackupMetrics {
|
||||
backup_name: String,
|
||||
start_time: String,
|
||||
end_time: String,
|
||||
duration_seconds: i64,
|
||||
status: String,
|
||||
exit_codes: ExitCodes,
|
||||
repository: Repository,
|
||||
backup_disk: BackupDisk,
|
||||
timestamp: i64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct ExitCodes {
|
||||
global: i32,
|
||||
backup: i32,
|
||||
prune: i32,
|
||||
compact: i32,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct Repository {
|
||||
total_archives: i32,
|
||||
latest_archive_size_bytes: i64,
|
||||
total_repository_size_bytes: i64,
|
||||
path: String,
|
||||
}
|
||||
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct BackupDisk {
|
||||
device: String,
|
||||
health: String,
|
||||
total_bytes: i64,
|
||||
used_bytes: i64,
|
||||
available_bytes: i64,
|
||||
usage_percent: f32,
|
||||
}
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde_json::Value;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -17,7 +16,6 @@ pub use cm_dashboard_shared::envelope::AgentType;
|
||||
pub struct CollectorOutput {
|
||||
pub agent_type: AgentType,
|
||||
pub data: Value,
|
||||
pub timestamp: DateTime<Utc>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -26,10 +24,4 @@ pub trait Collector: Send + Sync {
|
||||
fn agent_type(&self) -> AgentType;
|
||||
fn collect_interval(&self) -> Duration;
|
||||
async fn collect(&self) -> Result<CollectorOutput, CollectorError>;
|
||||
fn is_enabled(&self) -> bool {
|
||||
true
|
||||
}
|
||||
fn requires_root(&self) -> bool {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,7 +13,6 @@ use super::{AgentType, Collector, CollectorError, CollectorOutput};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ServiceCollector {
|
||||
pub enabled: bool,
|
||||
pub interval: Duration,
|
||||
pub services: Vec<String>,
|
||||
pub timeout_ms: u64,
|
||||
@@ -29,9 +28,8 @@ pub(crate) struct CpuSample {
|
||||
}
|
||||
|
||||
impl ServiceCollector {
|
||||
pub fn new(enabled: bool, interval_ms: u64, services: Vec<String>) -> Self {
|
||||
pub fn new(_enabled: bool, interval_ms: u64, services: Vec<String>) -> Self {
|
||||
Self {
|
||||
enabled,
|
||||
interval: Duration::from_millis(interval_ms),
|
||||
services,
|
||||
timeout_ms: 10000, // 10 second timeout for service checks
|
||||
@@ -409,6 +407,16 @@ impl ServiceCollector {
|
||||
}
|
||||
}
|
||||
|
||||
fn determine_cpu_temp_status(&self, temp_c: f32) -> String {
|
||||
if temp_c >= 80.0 {
|
||||
"critical".to_string()
|
||||
} else if temp_c >= 70.0 {
|
||||
"warning".to_string()
|
||||
} else {
|
||||
"ok".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
fn determine_services_status(&self, healthy: usize, degraded: usize, failed: usize) -> String {
|
||||
if failed > 0 {
|
||||
"critical".to_string()
|
||||
@@ -929,13 +937,6 @@ impl Collector for ServiceCollector {
|
||||
self.interval
|
||||
}
|
||||
|
||||
fn is_enabled(&self) -> bool {
|
||||
self.enabled
|
||||
}
|
||||
|
||||
fn requires_root(&self) -> bool {
|
||||
false // Most systemctl commands work without root
|
||||
}
|
||||
|
||||
async fn collect(&self) -> Result<CollectorOutput, CollectorError> {
|
||||
let mut services = Vec::new();
|
||||
@@ -1013,6 +1014,7 @@ impl Collector for ServiceCollector {
|
||||
|
||||
let cpu_cstate_info = self.get_cpu_cstate_info().await;
|
||||
let cpu_temp_c = self.get_cpu_temperature_c().await;
|
||||
let cpu_temp_status = cpu_temp_c.map(|temp| self.determine_cpu_temp_status(temp));
|
||||
let (gpu_load_percent, gpu_temp_c) = self.get_gpu_metrics().await;
|
||||
|
||||
// If no specific quotas are set, use system memory as reference
|
||||
@@ -1039,6 +1041,7 @@ impl Collector for ServiceCollector {
|
||||
"cpu_status": cpu_status,
|
||||
"cpu_cstate": cpu_cstate_info,
|
||||
"cpu_temp_c": cpu_temp_c,
|
||||
"cpu_temp_status": cpu_temp_status,
|
||||
"gpu_load_percent": gpu_load_percent,
|
||||
"gpu_temp_c": gpu_temp_c,
|
||||
},
|
||||
@@ -1049,7 +1052,6 @@ impl Collector for ServiceCollector {
|
||||
Ok(CollectorOutput {
|
||||
agent_type: AgentType::Service,
|
||||
data: service_metrics,
|
||||
timestamp: Utc::now(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,16 +12,14 @@ use super::{AgentType, Collector, CollectorError, CollectorOutput};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SmartCollector {
|
||||
pub enabled: bool,
|
||||
pub interval: Duration,
|
||||
pub devices: Vec<String>,
|
||||
pub timeout_ms: u64,
|
||||
}
|
||||
|
||||
impl SmartCollector {
|
||||
pub fn new(enabled: bool, interval_ms: u64, devices: Vec<String>) -> Self {
|
||||
pub fn new(_enabled: bool, interval_ms: u64, devices: Vec<String>) -> Self {
|
||||
Self {
|
||||
enabled,
|
||||
interval: Duration::from_millis(interval_ms),
|
||||
devices,
|
||||
timeout_ms: 30000, // 30 second timeout for smartctl
|
||||
@@ -274,13 +272,6 @@ impl Collector for SmartCollector {
|
||||
self.interval
|
||||
}
|
||||
|
||||
fn is_enabled(&self) -> bool {
|
||||
self.enabled
|
||||
}
|
||||
|
||||
fn requires_root(&self) -> bool {
|
||||
true // smartctl typically requires root access
|
||||
}
|
||||
|
||||
async fn collect(&self) -> Result<CollectorOutput, CollectorError> {
|
||||
let mut drives = Vec::new();
|
||||
@@ -327,11 +318,11 @@ impl Collector for SmartCollector {
|
||||
let disk_usage = self.get_disk_usage().await?;
|
||||
|
||||
let status = if critical > 0 {
|
||||
"CRITICAL"
|
||||
"critical"
|
||||
} else if warning > 0 {
|
||||
"WARNING"
|
||||
"warning"
|
||||
} else {
|
||||
"HEALTHY"
|
||||
"ok"
|
||||
};
|
||||
|
||||
let smart_metrics = json!({
|
||||
@@ -352,7 +343,6 @@ impl Collector for SmartCollector {
|
||||
Ok(CollectorOutput {
|
||||
agent_type: AgentType::Smart,
|
||||
data: smart_metrics,
|
||||
timestamp: Utc::now(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::collections::HashMap;
|
||||
use chrono::{DateTime, Utc};
|
||||
use chrono_tz::Europe::Stockholm;
|
||||
use lettre::{Message, SmtpTransport, Transport};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::{info, error, warn};
|
||||
@@ -81,15 +82,21 @@ impl NotificationManager {
|
||||
|
||||
fn should_notify(&mut self, change: &StatusChange) -> bool {
|
||||
if !self.config.enabled {
|
||||
info!("Notifications disabled, skipping {}.{}", change.component, change.metric);
|
||||
return false;
|
||||
}
|
||||
|
||||
// Only notify on transitions to warning/critical, or recovery to ok
|
||||
match (change.old_status.as_str(), change.new_status.as_str()) {
|
||||
let should_send = match (change.old_status.as_str(), change.new_status.as_str()) {
|
||||
(_, "warning") | (_, "critical") => true,
|
||||
("warning" | "critical", "ok") => true,
|
||||
_ => false,
|
||||
}
|
||||
};
|
||||
|
||||
info!("Status change {}.{}: {} -> {} (notify: {})",
|
||||
change.component, change.metric, change.old_status, change.new_status, should_send);
|
||||
|
||||
should_send
|
||||
}
|
||||
|
||||
fn is_rate_limited(&mut self, change: &StatusChange) -> bool {
|
||||
@@ -98,11 +105,14 @@ impl NotificationManager {
|
||||
if let Some(last_time) = self.last_notification.get(&key) {
|
||||
let minutes_since = Utc::now().signed_duration_since(*last_time).num_minutes();
|
||||
if minutes_since < self.config.rate_limit_minutes as i64 {
|
||||
info!("Rate limiting {}.{}: {} minutes since last notification (limit: {})",
|
||||
change.component, change.metric, minutes_since, self.config.rate_limit_minutes);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
self.last_notification.insert(key, Utc::now());
|
||||
self.last_notification.insert(key.clone(), Utc::now());
|
||||
info!("Not rate limited {}.{}, sending notification", change.component, change.metric);
|
||||
false
|
||||
}
|
||||
|
||||
@@ -161,8 +171,8 @@ impl NotificationManager {
|
||||
change.metric,
|
||||
change.old_status,
|
||||
change.new_status,
|
||||
change.timestamp.format("%Y-%m-%d %H:%M:%S UTC"),
|
||||
Utc::now().format("%Y-%m-%d %H:%M:%S UTC")
|
||||
change.timestamp.with_timezone(&Stockholm).format("%Y-%m-%d %H:%M:%S CET/CEST"),
|
||||
Utc::now().with_timezone(&Stockholm).format("%Y-%m-%d %H:%M:%S CET/CEST")
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -41,7 +41,7 @@ impl SimpleAgent {
|
||||
smtp_port: 25,
|
||||
from_email: format!("{}@cmtec.se", hostname),
|
||||
to_email: "cm@cmtec.se".to_string(),
|
||||
rate_limit_minutes: 30,
|
||||
rate_limit_minutes: 0, // Disabled for testing
|
||||
};
|
||||
let notification_manager = NotificationManager::new(notification_config.clone());
|
||||
info!("Notifications: {} -> {}", notification_config.from_email, notification_config.to_email);
|
||||
@@ -164,6 +164,7 @@ impl SimpleAgent {
|
||||
// Check CPU status
|
||||
if let Some(cpu_status) = summary.get("cpu_status").and_then(|v| v.as_str()) {
|
||||
if let Some(change) = self.notification_manager.update_status("system", "cpu", cpu_status) {
|
||||
info!("CPU status change detected: {} -> {}", change.old_status, change.new_status);
|
||||
self.notification_manager.send_notification(change).await;
|
||||
}
|
||||
}
|
||||
@@ -175,6 +176,14 @@ impl SimpleAgent {
|
||||
}
|
||||
}
|
||||
|
||||
// Check CPU temperature status
|
||||
if let Some(cpu_temp_status) = summary.get("cpu_temp_status").and_then(|v| v.as_str()) {
|
||||
if let Some(change) = self.notification_manager.update_status("system", "cpu_temp", cpu_temp_status) {
|
||||
info!("CPU temp status change detected: {} -> {}", change.old_status, change.new_status);
|
||||
self.notification_manager.send_notification(change).await;
|
||||
}
|
||||
}
|
||||
|
||||
// Check services status
|
||||
if let Some(services_status) = summary.get("services_status").and_then(|v| v.as_str()) {
|
||||
if let Some(change) = self.notification_manager.update_status("system", "services", services_status) {
|
||||
|
||||
Reference in New Issue
Block a user