All checks were successful
Build and Release / build-and-release (push) Successful in 1m43s
Move network collection from NixOS collector to dedicated NetworkCollector. Add link status detection for physical interfaces (up/down). Group interfaces by physical/virtual, show status icons for physical NICs only. Down interfaces show as Inactive instead of Critical. Version bump to 0.1.165
294 lines
11 KiB
Rust
use anyhow::Result;
|
|
use gethostname::gethostname;
|
|
use std::time::Duration;
|
|
use tokio::time::interval;
|
|
use tracing::{debug, error, info};
|
|
|
|
use crate::communication::{AgentCommand, ZmqHandler};
|
|
use crate::config::AgentConfig;
|
|
use crate::collectors::{
|
|
Collector,
|
|
backup::BackupCollector,
|
|
cpu::CpuCollector,
|
|
disk::DiskCollector,
|
|
memory::MemoryCollector,
|
|
network::NetworkCollector,
|
|
nixos::NixOSCollector,
|
|
systemd::SystemdCollector,
|
|
};
|
|
use crate::notifications::NotificationManager;
|
|
use cm_dashboard_shared::AgentData;
|
|
|
|
/// Long-running monitoring agent: periodically runs a set of collectors,
/// publishes the structured results to the dashboard over ZMQ, and e-mails
/// alerts when component status degrades.
pub struct Agent {
    // Host name this agent reports under (taken from gethostname()).
    hostname: String,
    // Configuration loaded from the required config file.
    config: AgentConfig,
    // ZMQ publisher + command channel to the dashboard.
    zmq_handler: ZmqHandler,
    // Collectors built from the `config.collectors.*.enabled` flags.
    collectors: Vec<Box<dyn Collector>>,
    // Sends alert e-mails on status degradation.
    notification_manager: NotificationManager,
    // Status snapshot from the previous collection cycle; `None` until the
    // first cycle completes, so startup never triggers a notification.
    previous_status: Option<SystemStatus>,
}
|
|
|
|
/// Track system component status for change detection.
///
/// A snapshot of the per-component `Status` values from one collection
/// cycle, compared against the next cycle to detect degradations.
#[derive(Debug, Clone)]
struct SystemStatus {
    // Status derived from CPU load (see AgentData.system.cpu.load_status).
    cpu_load_status: cm_dashboard_shared::Status,
    // Status derived from CPU temperature.
    cpu_temperature_status: cm_dashboard_shared::Status,
    // Status derived from memory usage percentage.
    memory_usage_status: cm_dashboard_shared::Status,
    // Add more as needed
}
|
|
|
|
impl Agent {
|
|
pub async fn new(config_path: Option<String>) -> Result<Self> {
|
|
let hostname = gethostname().to_string_lossy().to_string();
|
|
info!("Initializing agent for host: {}", hostname);
|
|
|
|
// Load configuration (now required)
|
|
let config_path = config_path.ok_or_else(|| anyhow::anyhow!("Configuration file path is required"))?;
|
|
let config = AgentConfig::from_file(&config_path)?;
|
|
|
|
info!("Agent configuration loaded");
|
|
|
|
// Initialize ZMQ communication
|
|
let zmq_handler = ZmqHandler::new(&config.zmq).await?;
|
|
info!(
|
|
"ZMQ communication initialized on port {}",
|
|
config.zmq.publisher_port
|
|
);
|
|
|
|
// Initialize collectors
|
|
let mut collectors: Vec<Box<dyn Collector>> = Vec::new();
|
|
|
|
// Add enabled collectors
|
|
if config.collectors.cpu.enabled {
|
|
collectors.push(Box::new(CpuCollector::new(config.collectors.cpu.clone())));
|
|
}
|
|
|
|
if config.collectors.memory.enabled {
|
|
collectors.push(Box::new(MemoryCollector::new(config.collectors.memory.clone())));
|
|
}
|
|
|
|
if config.collectors.disk.enabled {
|
|
collectors.push(Box::new(DiskCollector::new(config.collectors.disk.clone())));
|
|
}
|
|
|
|
if config.collectors.systemd.enabled {
|
|
collectors.push(Box::new(SystemdCollector::new(config.collectors.systemd.clone())));
|
|
}
|
|
|
|
if config.collectors.backup.enabled {
|
|
collectors.push(Box::new(BackupCollector::new()));
|
|
}
|
|
|
|
if config.collectors.network.enabled {
|
|
collectors.push(Box::new(NetworkCollector::new(config.collectors.network.clone())));
|
|
}
|
|
|
|
if config.collectors.nixos.enabled {
|
|
collectors.push(Box::new(NixOSCollector::new(config.collectors.nixos.clone())));
|
|
}
|
|
|
|
info!("Initialized {} collectors", collectors.len());
|
|
|
|
// Initialize notification manager
|
|
let notification_manager = NotificationManager::new(&config.notifications, &hostname)?;
|
|
info!("Notification manager initialized");
|
|
|
|
Ok(Self {
|
|
hostname,
|
|
config,
|
|
zmq_handler,
|
|
collectors,
|
|
notification_manager,
|
|
previous_status: None,
|
|
})
|
|
}
|
|
|
|
/// Main agent loop with structured data collection
|
|
pub async fn run(&mut self, mut shutdown_rx: tokio::sync::oneshot::Receiver<()>) -> Result<()> {
|
|
info!("Starting agent main loop");
|
|
|
|
// Initial collection
|
|
if let Err(e) = self.collect_and_broadcast().await {
|
|
error!("Initial metric collection failed: {}", e);
|
|
}
|
|
|
|
// Set up intervals
|
|
let mut transmission_interval = interval(Duration::from_secs(
|
|
self.config.collection_interval_seconds,
|
|
));
|
|
let mut notification_interval = interval(Duration::from_secs(30)); // Check notifications every 30s
|
|
|
|
// Skip initial ticks to avoid immediate execution
|
|
transmission_interval.tick().await;
|
|
notification_interval.tick().await;
|
|
|
|
loop {
|
|
tokio::select! {
|
|
_ = transmission_interval.tick() => {
|
|
if let Err(e) = self.collect_and_broadcast().await {
|
|
error!("Failed to collect and broadcast metrics: {}", e);
|
|
}
|
|
}
|
|
_ = notification_interval.tick() => {
|
|
// Process any pending notifications
|
|
// NOTE: With structured data, we might need to implement status tracking differently
|
|
// For now, we skip this until status evaluation is migrated
|
|
}
|
|
// Handle incoming commands (check periodically)
|
|
_ = tokio::time::sleep(Duration::from_millis(100)) => {
|
|
if let Err(e) = self.handle_commands().await {
|
|
error!("Error handling commands: {}", e);
|
|
}
|
|
}
|
|
_ = &mut shutdown_rx => {
|
|
info!("Shutdown signal received, stopping agent loop");
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
info!("Agent main loop stopped");
|
|
Ok(())
|
|
}
|
|
|
|
/// Collect structured data from all collectors and broadcast via ZMQ
|
|
async fn collect_and_broadcast(&mut self) -> Result<()> {
|
|
debug!("Starting structured data collection");
|
|
|
|
// Initialize empty AgentData
|
|
let mut agent_data = AgentData::new(self.hostname.clone(), env!("CARGO_PKG_VERSION").to_string());
|
|
|
|
// Collect data from all collectors
|
|
for collector in &self.collectors {
|
|
if let Err(e) = collector.collect_structured(&mut agent_data).await {
|
|
error!("Collector failed: {}", e);
|
|
// Continue with other collectors even if one fails
|
|
}
|
|
}
|
|
|
|
// Check for status changes and send notifications
|
|
if let Err(e) = self.check_status_changes_and_notify(&agent_data).await {
|
|
error!("Failed to check status changes: {}", e);
|
|
}
|
|
|
|
// Broadcast the structured data via ZMQ
|
|
if let Err(e) = self.zmq_handler.publish_agent_data(&agent_data).await {
|
|
error!("Failed to broadcast agent data: {}", e);
|
|
} else {
|
|
debug!("Successfully broadcast structured agent data");
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Check for status changes and send notifications
|
|
async fn check_status_changes_and_notify(&mut self, agent_data: &AgentData) -> Result<()> {
|
|
// Extract current status
|
|
let current_status = SystemStatus {
|
|
cpu_load_status: agent_data.system.cpu.load_status.clone(),
|
|
cpu_temperature_status: agent_data.system.cpu.temperature_status.clone(),
|
|
memory_usage_status: agent_data.system.memory.usage_status.clone(),
|
|
};
|
|
|
|
// Check for status changes
|
|
if let Some(previous) = self.previous_status.clone() {
|
|
self.check_and_notify_status_change(
|
|
"CPU Load",
|
|
&previous.cpu_load_status,
|
|
¤t_status.cpu_load_status,
|
|
format!("CPU load: {:.1}", agent_data.system.cpu.load_1min)
|
|
).await?;
|
|
|
|
self.check_and_notify_status_change(
|
|
"CPU Temperature",
|
|
&previous.cpu_temperature_status,
|
|
¤t_status.cpu_temperature_status,
|
|
format!("CPU temperature: {}°C",
|
|
agent_data.system.cpu.temperature_celsius.unwrap_or(0.0) as i32)
|
|
).await?;
|
|
|
|
self.check_and_notify_status_change(
|
|
"Memory Usage",
|
|
&previous.memory_usage_status,
|
|
¤t_status.memory_usage_status,
|
|
format!("Memory usage: {:.1}%", agent_data.system.memory.usage_percent)
|
|
).await?;
|
|
}
|
|
|
|
// Store current status for next comparison
|
|
self.previous_status = Some(current_status);
|
|
Ok(())
|
|
}
|
|
|
|
/// Check individual status change and send notification if degraded
|
|
async fn check_and_notify_status_change(
|
|
&mut self,
|
|
component: &str,
|
|
previous: &cm_dashboard_shared::Status,
|
|
current: &cm_dashboard_shared::Status,
|
|
details: String
|
|
) -> Result<()> {
|
|
use cm_dashboard_shared::Status;
|
|
|
|
// Only notify on status degradation (OK → Warning/Critical, Warning → Critical)
|
|
let should_notify = match (previous, current) {
|
|
(Status::Ok, Status::Warning) => true,
|
|
(Status::Ok, Status::Critical) => true,
|
|
(Status::Warning, Status::Critical) => true,
|
|
_ => false,
|
|
};
|
|
|
|
if should_notify {
|
|
let subject = format!("{} {} Alert", self.hostname, component);
|
|
let body = format!(
|
|
"Alert: {} status changed from {:?} to {:?}\n\nDetails: {}\n\nTime: {}",
|
|
component,
|
|
previous,
|
|
current,
|
|
details,
|
|
chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC")
|
|
);
|
|
|
|
info!("Sending notification: {} - {:?} → {:?}", component, previous, current);
|
|
|
|
if let Err(e) = self.notification_manager.send_direct_email(&subject, &body).await {
|
|
error!("Failed to send notification for {}: {}", component, e);
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Handle incoming commands from dashboard
|
|
async fn handle_commands(&mut self) -> Result<()> {
|
|
// Try to receive a command (non-blocking)
|
|
if let Ok(Some(command)) = self.zmq_handler.try_receive_command() {
|
|
info!("Received command: {:?}", command);
|
|
|
|
match command {
|
|
AgentCommand::CollectNow => {
|
|
info!("Received immediate collection request");
|
|
if let Err(e) = self.collect_and_broadcast().await {
|
|
error!("Failed to collect on demand: {}", e);
|
|
}
|
|
}
|
|
AgentCommand::SetInterval { seconds } => {
|
|
info!("Received interval change request: {}s", seconds);
|
|
// Note: This would require more complex handling to update the interval
|
|
// For now, just acknowledge
|
|
}
|
|
AgentCommand::ToggleCollector { name, enabled } => {
|
|
info!("Received collector toggle request: {} -> {}", name, enabled);
|
|
// Note: This would require more complex handling to enable/disable collectors
|
|
// For now, just acknowledge
|
|
}
|
|
AgentCommand::Ping => {
|
|
info!("Received ping command");
|
|
// Maybe send back a pong or status
|
|
}
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
} |