//! Agent main loop: metric collection, dashboard transmission, and email
//! notification batching run on separate cadences.
//!
//! Design notes (from the change that introduced this split):
//! - Dashboard updates are separated from email notifications for immediate
//!   status aggregation.
//! - MetricCollectionManager caches metrics so dashboard updates are instant.
//! - The dashboard receives cached data every second instead of waiting for
//!   collection intervals.
//! - Transmission uses cached metrics rather than triggering a fresh collection.
//! - Email notifications keep a separate 60-second batching interval, with a
//!   configurable aggregation interval.
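//!
//! The three cadences are driven by fields on `AgentConfig` that this module
//! reads below. A configuration sketch with illustrative values (only the
//! 1-second dashboard and 60-second notification cadences come from the
//! design notes above; the other values and the exact TOML layout are
//! assumptions):
//!
//! ```toml
//! collection_interval_seconds = 30          # placeholder
//!
//! [zmq]
//! publisher_port = 5556                     # placeholder
//! transmission_interval_seconds = 1         # cached data pushed every second
//!
//! [notifications]
//! aggregation_interval_seconds = 60         # email batching window
//! ```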

use anyhow::Result;
use gethostname::gethostname;
use std::time::Duration;
use tokio::time::interval;
use tracing::{debug, error, info};

use crate::communication::{AgentCommand, ServiceAction, ZmqHandler};
use crate::config::AgentConfig;
use crate::metrics::MetricCollectionManager;
use crate::notifications::NotificationManager;
use crate::status::HostStatusManager;
use cm_dashboard_shared::{Metric, MetricMessage, MetricValue, Status};

/// Monitoring agent: collects metrics on one cadence, publishes cached
/// metrics to the dashboard over ZMQ on another, and batches email
/// notifications on a third.
pub struct Agent {
    hostname: String,
    config: AgentConfig,
    zmq_handler: ZmqHandler,
    metric_manager: MetricCollectionManager,
    notification_manager: NotificationManager,
    host_status_manager: HostStatusManager,
}

impl Agent {
    pub async fn new(config_path: Option<String>) -> Result<Self> {
        let hostname = gethostname().to_string_lossy().to_string();
        info!("Initializing agent for host: {}", hostname);

        // Load configuration (required)
        let config_path = config_path
            .ok_or_else(|| anyhow::anyhow!("Configuration file path is required"))?;
        let config = AgentConfig::from_file(&config_path)?;
        info!("Agent configuration loaded");

        // Initialize ZMQ communication
        let zmq_handler = ZmqHandler::new(&config.zmq).await?;
        info!(
            "ZMQ communication initialized on port {}",
            config.zmq.publisher_port
        );

        // Initialize metric collection manager with cache config
        let metric_manager = MetricCollectionManager::new(&config.collectors, &config).await?;
        info!("Metric collection manager initialized");

        // Initialize notification manager
        let notification_manager = NotificationManager::new(&config.notifications, &hostname)?;
        info!("Notification manager initialized");

        // Initialize host status manager
        let host_status_manager = HostStatusManager::new(config.status_aggregation.clone());
        info!("Host status manager initialized");

        Ok(Self {
            hostname,
            config,
            zmq_handler,
            metric_manager,
            notification_manager,
            host_status_manager,
        })
    }
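
    /// Runs the agent's main loop until the shutdown signal fires.
    ///
    /// A minimal usage sketch; the config path and the shutdown wiring are
    /// illustrative, not taken from this crate's `main`:
    ///
    /// ```ignore
    /// let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
    /// let mut agent = Agent::new(Some("/etc/cm-agent/config.toml".to_string())).await?;
    /// // ... hand shutdown_tx to a signal handler, then:
    /// agent.run(shutdown_rx).await?;
    /// ```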
    pub async fn run(
        &mut self,
        mut shutdown_rx: tokio::sync::oneshot::Receiver<()>,
    ) -> Result<()> {
        info!("Starting agent main loop with separated collection and transmission");

        // CRITICAL: collect ALL data immediately at startup, before entering the loop
        info!("Performing initial FORCE collection of all metrics at startup");
        if let Err(e) = self.collect_all_metrics_force().await {
            error!("Failed to collect initial metrics: {}", e);
        } else {
            info!("Initial metric collection completed - all data cached and ready");
        }

        // Separate intervals for collection, transmission, and email notifications
        let mut collection_interval =
            interval(Duration::from_secs(self.config.collection_interval_seconds));
        let mut transmission_interval =
            interval(Duration::from_secs(self.config.zmq.transmission_interval_seconds));
        let mut notification_interval = interval(Duration::from_secs(
            self.config.notifications.aggregation_interval_seconds,
        ));

        loop {
            tokio::select! {
                _ = collection_interval.tick() => {
                    // Only collect and cache metrics; no ZMQ transmission
                    if let Err(e) = self.collect_metrics_only().await {
                        error!("Failed to collect metrics: {}", e);
                    }
                }
                _ = transmission_interval.tick() => {
                    // Send all cached metrics via ZMQ (dashboard updates only)
                    if let Err(e) = self.broadcast_all_metrics().await {
                        error!("Failed to broadcast metrics: {}", e);
                    }
                }
                _ = notification_interval.tick() => {
                    // Process batched email notifications (separate from dashboard updates)
                    if let Err(e) = self.host_status_manager.process_pending_notifications(&mut self.notification_manager).await {
                        error!("Failed to process pending notifications: {}", e);
                    }
                }
                // Poll for incoming commands every 100 ms
                _ = tokio::time::sleep(Duration::from_millis(100)) => {
                    if let Err(e) = self.handle_commands().await {
                        error!("Error handling commands: {}", e);
                    }
                }
                _ = &mut shutdown_rx => {
                    info!("Shutdown signal received, stopping agent loop");
                    break;
                }
            }
        }

        info!("Agent main loop stopped");
        Ok(())
    }

    async fn collect_all_metrics_force(&mut self) -> Result<()> {
        info!("Starting FORCE metric collection for startup");

        // Force-collect all metrics from all collectors immediately
        let metrics = self.metric_manager.collect_all_metrics_force().await?;

        if metrics.is_empty() {
            error!("No metrics collected during force collection!");
            return Ok(());
        }

        info!("Force collected and cached {} metrics", metrics.len());

        // Run metrics through the status manager so status data exists at startup
        let _status_changed = self.process_metrics(&metrics).await;

        Ok(())
    }

    async fn collect_metrics_only(&mut self) -> Result<()> {
        debug!("Starting metric collection cycle (cache only)");

        // Collect all metrics from all collectors and cache them
        let metrics = self.metric_manager.collect_all_metrics().await?;

        if metrics.is_empty() {
            debug!("No metrics collected this cycle");
            return Ok(());
        }

        debug!("Collected and cached {} metrics", metrics.len());

        // Run metrics through the status manager; if any status changed,
        // trigger an immediate transmission instead of waiting for the
        // next transmission tick
        let status_changed = self.process_metrics(&metrics).await;

        if status_changed {
            info!("Status change detected - triggering immediate metric transmission");
            if let Err(e) = self.broadcast_all_metrics().await {
                error!("Failed to broadcast metrics after status change: {}", e);
            }
        }

        Ok(())
    }

    async fn broadcast_all_metrics(&mut self) -> Result<()> {
        debug!("Broadcasting cached metrics via ZMQ");

        // Read cached metrics (no fresh collection)
        let mut metrics = self.metric_manager.get_cached_metrics();

        // Add the host status summary metric from the status manager
        let host_status_metric = self.host_status_manager.get_host_status_metric();
        metrics.push(host_status_metric);

        // Add the agent version metric for cross-host version comparison
        let version_metric = self.get_agent_version_metric();
        metrics.push(version_metric);

        // Unreachable after the two pushes above; kept as a defensive check
        if metrics.is_empty() {
            debug!("No metrics to broadcast");
            return Ok(());
        }

        debug!(
            "Broadcasting {} cached metrics (including host status summary)",
            metrics.len()
        );

        // Create and send a message with all current data
        let message = MetricMessage::new(self.hostname.clone(), metrics);
        self.zmq_handler.publish_metrics(&message).await?;

        debug!("Metrics broadcast successfully");
        Ok(())
    }

    async fn process_metrics(&mut self, metrics: &[Metric]) -> bool {
        let mut status_changed = false;
        for metric in metrics {
            if self
                .host_status_manager
                .process_metric(metric, &mut self.notification_manager)
                .await
            {
                status_changed = true;
            }
        }
        status_changed
    }

    /// Create the agent version metric for cross-host version comparison
    fn get_agent_version_metric(&self) -> Metric {
        // Version string shared with main.rs's get_version
        let version = self.get_agent_version();

        Metric::new(
            "agent_version".to_string(),
            MetricValue::String(version),
            Status::Ok,
        )
    }

    /// Get the agent version from the Cargo package version
    fn get_agent_version(&self) -> String {
        // Use the version from Cargo.toml (e.g., "0.1.11")
        format!("v{}", env!("CARGO_PKG_VERSION"))
    }

    async fn handle_commands(&mut self) -> Result<()> {
        // Try to receive commands (non-blocking)
        match self.zmq_handler.try_receive_command() {
            Ok(Some(command)) => {
                info!("Received command: {:?}", command);
                self.process_command(command).await?;
            }
            Ok(None) => {
                // No command available - this is normal
            }
            Err(e) => {
                error!("Error receiving command: {}", e);
            }
        }
        Ok(())
    }

    async fn process_command(&mut self, command: AgentCommand) -> Result<()> {
        match command {
            AgentCommand::CollectNow => {
                info!("Processing CollectNow command");
                if let Err(e) = self.collect_metrics_only().await {
                    error!("Failed to collect metrics on command: {}", e);
                }
            }
            AgentCommand::SetInterval { seconds } => {
                info!("Processing SetInterval command: {} seconds", seconds);
                // Note: changing the period would require rebuilding the
                // interval owned by run(); for now, just log the request.
                info!("Interval change requested but not implemented yet");
            }
            AgentCommand::ToggleCollector { name, enabled } => {
                info!(
                    "Processing ToggleCollector command: {} -> {}",
                    name, enabled
                );
                // Note: this would require dynamic collector management
                info!("Collector toggle requested but not implemented yet");
            }
            AgentCommand::Ping => {
                info!("Processing Ping command - agent is alive");
                // Could send a response back via ZMQ if needed
            }
            AgentCommand::ServiceControl { service_name, action } => {
                info!(
                    "Processing ServiceControl command: {} {:?}",
                    service_name, action
                );
                if let Err(e) = self.handle_service_control(&service_name, &action).await {
                    error!("Failed to execute service control: {}", e);
                }
            }
        }
        Ok(())
    }

    /// Handle systemd service control commands
    async fn handle_service_control(
        &self,
        service_name: &str,
        action: &ServiceAction,
    ) -> Result<()> {
        let action_str = match action {
            ServiceAction::Start => "start",
            ServiceAction::Stop => "stop",
            ServiceAction::Restart => "restart",
            ServiceAction::Status => "status",
        };

        info!("Executing systemctl {} {}", action_str, service_name);
        let output = tokio::process::Command::new("sudo")
            .arg("systemctl")
            .arg(action_str)
            .arg(service_name)
            .output()
            .await?;

        if output.status.success() {
            info!(
                "Service {} {} completed successfully",
                service_name, action_str
            );
            if !output.stdout.is_empty() {
                debug!("stdout: {}", String::from_utf8_lossy(&output.stdout));
            }
        } else {
            let stderr = String::from_utf8_lossy(&output.stderr);
            error!("Service {} {} failed: {}", service_name, action_str, stderr);
            return Err(anyhow::anyhow!(
                "systemctl {} {} failed: {}",
                action_str,
                service_name,
                stderr
            ));
        }

        // A state-changing action invalidates cached service status; refresh
        // happens on the next collection cycle.
        if matches!(action, ServiceAction::Start | ServiceAction::Stop | ServiceAction::Restart) {
            info!("Triggering metric refresh after service control");
            // Note: collect_metrics_only() needs `&mut self`, but this method
            // takes `&self`, so it cannot be called here; the next scheduled
            // collection cycle will pick up the new service state.
        }

        Ok(())
    }
} |