use anyhow::Result;
use gethostname::gethostname;
use std::path::PathBuf;
use std::time::Duration;
use tokio::time::interval;
use tracing::{debug, error, info};

use crate::communication::{AgentCommand, ServiceAction, ZmqHandler};
use crate::config::AgentConfig;
use crate::metrics::MetricCollectionManager;
use crate::notifications::NotificationManager;
use crate::status::HostStatusManager;
use cm_dashboard_shared::{CommandOutputMessage, Metric, MetricMessage, MetricValue, Status};

pub struct Agent {
    hostname: String,
    config: AgentConfig,
    zmq_handler: ZmqHandler,
    metric_manager: MetricCollectionManager,
    notification_manager: NotificationManager,
    host_status_manager: HostStatusManager,
}

impl Agent {
    pub async fn new(config_path: Option<PathBuf>) -> Result<Self> {
        let hostname = gethostname().to_string_lossy().to_string();
        info!("Initializing agent for host: {}", hostname);

        // Load configuration (now required)
        let config_path = config_path
            .ok_or_else(|| anyhow::anyhow!("Configuration file path is required"))?;
        let config = AgentConfig::from_file(&config_path)?;
        info!("Agent configuration loaded");

        // Initialize ZMQ communication
        let zmq_handler = ZmqHandler::new(&config.zmq).await?;
        info!(
            "ZMQ communication initialized on port {}",
            config.zmq.publisher_port
        );

        // Initialize metric collection manager with cache config
        let metric_manager = MetricCollectionManager::new(&config.collectors, &config).await?;
        info!("Metric collection manager initialized");

        // Initialize notification manager
        let notification_manager = NotificationManager::new(&config.notifications, &hostname)?;
        info!("Notification manager initialized");

        // Initialize host status manager
        let host_status_manager = HostStatusManager::new(config.status_aggregation.clone());
        info!("Host status manager initialized");

        Ok(Self {
            hostname,
            config,
            zmq_handler,
            metric_manager,
            notification_manager,
            host_status_manager,
        })
    }

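    /// Main loop with separated collection and transmission: metrics are
    /// collected and cached every `collection_interval_seconds`, all cached
    /// metrics are broadcast over ZMQ once per second, batched notifications
    /// are flushed every `notification_interval_seconds`, and incoming
    /// commands are polled roughly every 100 ms until the shutdown signal
    /// fires.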
    pub async fn run(&mut self, mut shutdown_rx: tokio::sync::oneshot::Receiver<()>) -> Result<()> {
        info!("Starting agent main loop with separated collection and transmission");

        // CRITICAL: Collect ALL data immediately at startup before entering the loop
        info!("Performing initial FORCE collection of all metrics at startup");
        if let Err(e) = self.collect_all_metrics_force().await {
            error!("Failed to collect initial metrics: {}", e);
        } else {
            info!("Initial metric collection completed - all data cached and ready");
        }

        // Separate intervals for collection and transmission
        let mut collection_interval =
            interval(Duration::from_secs(self.config.collection_interval_seconds));
        let mut transmission_interval = interval(Duration::from_secs(1)); // ZMQ broadcast every 1 second
        let mut notification_interval = interval(Duration::from_secs(
            self.config.status_aggregation.notification_interval_seconds,
        ));

        loop {
            tokio::select! {
                _ = collection_interval.tick() => {
                    // Only collect and cache metrics, no ZMQ transmission
                    if let Err(e) = self.collect_metrics_only().await {
                        error!("Failed to collect metrics: {}", e);
                    }
                }
                _ = transmission_interval.tick() => {
                    // Send all metrics via ZMQ every 1 second
                    if let Err(e) = self.broadcast_all_metrics().await {
                        error!("Failed to broadcast metrics: {}", e);
                    }
                }
                _ = notification_interval.tick() => {
                    // Process batched notifications
                    if let Err(e) = self.host_status_manager.process_pending_notifications(&mut self.notification_manager).await {
                        error!("Failed to process pending notifications: {}", e);
                    }
                }
                // Handle incoming commands (check periodically)
                _ = tokio::time::sleep(Duration::from_millis(100)) => {
                    if let Err(e) = self.handle_commands().await {
                        error!("Error handling commands: {}", e);
                    }
                }
                _ = &mut shutdown_rx => {
                    info!("Shutdown signal received, stopping agent loop");
                    break;
                }
            }
        }

        info!("Agent main loop stopped");
        Ok(())
    }

    async fn collect_all_metrics_force(&mut self) -> Result<()> {
        info!("Starting FORCE metric collection for startup");

        // Force collect all metrics from all collectors immediately
        let metrics = self.metric_manager.collect_all_metrics_force().await?;

        if metrics.is_empty() {
            error!("No metrics collected during force collection!");
            return Ok(());
        }

        info!("Force collected and cached {} metrics", metrics.len());

        // Process metrics through status manager
        self.process_metrics(&metrics).await;

        Ok(())
    }

    async fn collect_metrics_only(&mut self) -> Result<()> {
        debug!("Starting metric collection cycle (cache only)");

        // Collect all metrics from all collectors and cache them
        let metrics = self.metric_manager.collect_all_metrics().await?;

        if metrics.is_empty() {
            debug!("No metrics collected this cycle");
            return Ok(());
        }

        debug!("Collected and cached {} metrics", metrics.len());

        // Process metrics through status manager
        self.process_metrics(&metrics).await;

        Ok(())
    }

    async fn broadcast_all_metrics(&mut self) -> Result<()> {
        debug!("Broadcasting all metrics via ZMQ");

        // Get all current metrics from collectors
        let mut metrics = self.metric_manager.collect_all_metrics().await?;

        // Add the host status summary metric from status manager
        let host_status_metric = self.host_status_manager.get_host_status_metric();
        metrics.push(host_status_metric);

        // Add agent version metric for cross-host version comparison
        let version_metric = self.get_agent_version_metric();
        metrics.push(version_metric);

        if metrics.is_empty() {
            debug!("No metrics to broadcast");
            return Ok(());
        }

        debug!(
            "Broadcasting {} metrics (including host status summary)",
            metrics.len()
        );

        // Create and send message with all current data
        let message = MetricMessage::new(self.hostname.clone(), metrics);
        self.zmq_handler.publish_metrics(&message).await?;

        debug!("Metrics broadcasted successfully");

        Ok(())
    }

    async fn process_metrics(&mut self, metrics: &[Metric]) {
        for metric in metrics {
            self.host_status_manager
                .process_metric(metric, &mut self.notification_manager)
                .await;
        }
    }

    /// Create agent version metric for cross-host version comparison
    fn get_agent_version_metric(&self) -> Metric {
        // Get version from executable path (same logic as main.rs get_version)
        let version = self.get_agent_version();

        Metric::new(
            "agent_version".to_string(),
            MetricValue::String(version),
            Status::Ok,
        )
    }

    /// Get agent version from executable path
    fn get_agent_version(&self) -> String {
        match std::env::current_exe() {
            Ok(exe_path) => {
                let exe_str = exe_path.to_string_lossy();
                // Extract Nix store hash from path
                if let Some(hash_part) = exe_str.strip_prefix("/nix/store/") {
                    if let Some(hash) = hash_part.split('-').next() {
                        if hash.len() >= 8 {
                            return hash[..8].to_string();
                        }
                    }
                }
                "unknown".to_string()
            }
            Err(_) => "unknown".to_string(),
        }
    }

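    /// Poll the ZMQ command socket once without blocking and dispatch any
    /// command that was received.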
    async fn handle_commands(&mut self) -> Result<()> {
        // Try to receive commands (non-blocking)
        match self.zmq_handler.try_receive_command() {
            Ok(Some(command)) => {
                info!("Received command: {:?}", command);
                self.process_command(command).await?;
            }
            Ok(None) => {
                // No command available - this is normal
            }
            Err(e) => {
                error!("Error receiving command: {}", e);
            }
        }
        Ok(())
    }

    async fn process_command(&mut self, command: AgentCommand) -> Result<()> {
        match command {
            AgentCommand::CollectNow => {
                info!("Processing CollectNow command");
                if let Err(e) = self.collect_metrics_only().await {
                    error!("Failed to collect metrics on command: {}", e);
                }
            }
            AgentCommand::SetInterval { seconds } => {
                info!("Processing SetInterval command: {} seconds", seconds);
                // Note: This would require modifying the interval, which is complex
                // For now, just log the request
                info!("Interval change requested but not implemented yet");
            }
            AgentCommand::ToggleCollector { name, enabled } => {
                info!(
                    "Processing ToggleCollector command: {} -> {}",
                    name, enabled
                );
                // Note: This would require dynamic collector management
                info!("Collector toggle requested but not implemented yet");
            }
            AgentCommand::Ping => {
                info!("Processing Ping command - agent is alive");
                // Could send a response back via ZMQ if needed
            }
            AgentCommand::ServiceControl { service_name, action } => {
                info!("Processing ServiceControl command: {} {:?}", service_name, action);
                if let Err(e) = self.handle_service_control(&service_name, &action).await {
                    error!("Failed to execute service control: {}", e);
                }
            }
            AgentCommand::SystemRebuild { git_url, git_branch, working_dir, api_key_file } => {
                info!(
                    "Processing SystemRebuild command: {} @ {} -> {}",
                    git_url, git_branch, working_dir
                );
                if let Err(e) = self
                    .handle_system_rebuild(&git_url, &git_branch, &working_dir, api_key_file.as_deref())
                    .await
                {
                    error!("Failed to execute system rebuild: {}", e);
                }
            }
        }
        Ok(())
    }

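    // Possible future shape for `SetInterval` (sketch only, not wired up): keep
    // the requested period in a shared field such as
    // `collection_interval_secs: Arc<AtomicU64>` (hypothetical name), store the
    // new value in the match arm above, and have `run()` rebuild its ticker
    // when the value changes:
    //
    //     self.collection_interval_secs.store(seconds, Ordering::Relaxed);
    //     // ...and in run(), before awaiting the next tick:
    //     let secs = self.collection_interval_secs.load(Ordering::Relaxed);
    //     if secs != current_secs {
    //         collection_interval = interval(Duration::from_secs(secs));
    //         current_secs = secs;
    //     }
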
    /// Handle systemd service control commands
    async fn handle_service_control(&self, service_name: &str, action: &ServiceAction) -> Result<()> {
        let action_str = match action {
            ServiceAction::Start => "start",
            ServiceAction::Stop => "stop",
            ServiceAction::Restart => "restart",
            ServiceAction::Status => "status",
        };

        info!("Executing systemctl {} {}", action_str, service_name);

        let output = tokio::process::Command::new("sudo")
            .arg("systemctl")
            .arg(action_str)
            .arg(service_name)
            .output()
            .await?;

        if output.status.success() {
            info!("Service {} {} completed successfully", service_name, action_str);
            if !output.stdout.is_empty() {
                debug!("stdout: {}", String::from_utf8_lossy(&output.stdout));
            }
        } else {
            let stderr = String::from_utf8_lossy(&output.stderr);
            error!("Service {} {} failed: {}", service_name, action_str, stderr);
            return Err(anyhow::anyhow!(
                "systemctl {} {} failed: {}",
                action_str,
                service_name,
                stderr
            ));
        }

        // Force refresh metrics after service control to update service status
        if matches!(action, ServiceAction::Start | ServiceAction::Stop | ServiceAction::Restart) {
            info!("Triggering metric refresh after service control");
            // Note: We can't call self.collect_metrics_only() here due to borrowing issues
            // The next metric collection cycle will pick up the changes
        }

        Ok(())
    }

    /// Handle NixOS system rebuild commands with real-time output streaming
    async fn handle_system_rebuild(
        &self,
        git_url: &str,
        git_branch: &str,
        working_dir: &str,
        api_key_file: Option<&str>,
    ) -> Result<()> {
        info!(
            "Starting NixOS system rebuild: {} @ {} -> {}",
            git_url, git_branch, working_dir
        );

        let command_id = format!("rebuild_{}", chrono::Utc::now().timestamp());

        // Send initial status
        self.send_command_output(&command_id, "SystemRebuild", "Starting NixOS system rebuild...").await?;

        // Enable maintenance mode before rebuild
        let maintenance_file = "/tmp/cm-maintenance";
        if let Err(e) = tokio::fs::File::create(maintenance_file).await {
            self.send_command_output(&command_id, "SystemRebuild", &format!("Warning: Failed to create maintenance mode file: {}", e)).await?;
        } else {
            self.send_command_output(&command_id, "SystemRebuild", "Maintenance mode enabled").await?;
        }

        // Clone or update repository
        self.send_command_output(&command_id, "SystemRebuild", "Cloning/updating git repository...").await?;

        let git_result = self
            .ensure_git_repository_with_output(&command_id, git_url, git_branch, working_dir, api_key_file)
            .await;
        if git_result.is_err() {
            self.send_command_output(&command_id, "SystemRebuild", &format!("Git operation failed: {:?}", git_result)).await?;
            self.send_command_output_complete(&command_id, "SystemRebuild").await?;
            return git_result;
        }

        self.send_command_output(&command_id, "SystemRebuild", "Git repository ready, starting nixos-rebuild...").await?;

        // Execute nixos-rebuild with real-time output streaming
        let rebuild_result = self.execute_nixos_rebuild_with_streaming(&command_id, working_dir).await;

        // Always try to remove maintenance mode file
        if let Err(e) = tokio::fs::remove_file(maintenance_file).await {
            if e.kind() != std::io::ErrorKind::NotFound {
                self.send_command_output(&command_id, "SystemRebuild", &format!("Warning: Failed to remove maintenance mode file: {}", e)).await?;
            }
        } else {
            self.send_command_output(&command_id, "SystemRebuild", "Maintenance mode disabled").await?;
        }

        // Handle rebuild result
        match rebuild_result {
            Ok(()) => {
                self.send_command_output(&command_id, "SystemRebuild", "✓ NixOS rebuild completed successfully!").await?;
            }
            Err(e) => {
                self.send_command_output(&command_id, "SystemRebuild", &format!("✗ NixOS rebuild failed: {}", e)).await?;
            }
        }

        // Signal completion
        self.send_command_output_complete(&command_id, "SystemRebuild").await?;

        info!("System rebuild streaming completed");
        Ok(())
    }

    /// Send command output line to dashboard
    async fn send_command_output(&self, command_id: &str, command_type: &str, output_line: &str) -> Result<()> {
        let message = CommandOutputMessage::new(
            self.hostname.clone(),
            command_id.to_string(),
            command_type.to_string(),
            output_line.to_string(),
            false,
        );
        self.zmq_handler.publish_command_output(&message).await
    }

    /// Send command completion signal to dashboard
    async fn send_command_output_complete(&self, command_id: &str, command_type: &str) -> Result<()> {
        let message = CommandOutputMessage::new(
            self.hostname.clone(),
            command_id.to_string(),
            command_type.to_string(),
            "Command completed".to_string(),
            true,
        );
        self.zmq_handler.publish_command_output(&message).await
    }

    /// Execute nixos-rebuild with real-time output streaming
    async fn execute_nixos_rebuild_with_streaming(&self, command_id: &str, working_dir: &str) -> Result<()> {
        use tokio::io::{AsyncBufReadExt, BufReader};
        use tokio::process::Command;

        let mut child = Command::new("sudo")
            .arg("/run/current-system/sw/bin/nixos-rebuild")
            .arg("switch")
            .arg("--option")
            .arg("sandbox")
            .arg("false")
            .arg("--flake")
            .arg(".")
            .current_dir(working_dir)
            .stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::piped())
            .spawn()?;

        // Get stdout and stderr handles
        let stdout = child.stdout.take().expect("Failed to get stdout");
        let stderr = child.stderr.take().expect("Failed to get stderr");

        // Create readers for both streams
        let stdout_reader = BufReader::new(stdout);
        let stderr_reader = BufReader::new(stderr);
        let mut stdout_lines = stdout_reader.lines();
        let mut stderr_lines = stderr_reader.lines();

        // Stream output lines in real-time
        loop {
            tokio::select! {
                // Read from stdout
                line = stdout_lines.next_line() => {
                    match line {
                        Ok(Some(line)) => {
                            self.send_command_output(command_id, "SystemRebuild", &line).await?;
                        }
                        Ok(None) => {
                            // stdout closed
                        }
                        Err(e) => {
                            self.send_command_output(command_id, "SystemRebuild", &format!("stdout error: {}", e)).await?;
                        }
                    }
                }
                // Read from stderr
                line = stderr_lines.next_line() => {
                    match line {
                        Ok(Some(line)) => {
                            self.send_command_output(command_id, "SystemRebuild", &line).await?;
                        }
                        Ok(None) => {
                            // stderr closed
                        }
                        Err(e) => {
                            self.send_command_output(command_id, "SystemRebuild", &format!("stderr error: {}", e)).await?;
                        }
                    }
                }
                // Wait for process completion
                result = child.wait() => {
                    let status = result?;
                    if status.success() {
                        return Ok(());
                    } else {
                        return Err(anyhow::anyhow!("nixos-rebuild exited with status: {}", status));
                    }
                }
            }
        }
    }

    /// Ensure git repository with output streaming
    async fn ensure_git_repository_with_output(
        &self,
        command_id: &str,
        git_url: &str,
        git_branch: &str,
        working_dir: &str,
        api_key_file: Option<&str>,
    ) -> Result<()> {
        // This is a simplified version - we can enhance this later with git output streaming
        self.ensure_git_repository(git_url, git_branch, working_dir, api_key_file).await
    }

    /// Ensure git repository is cloned and up to date with force clone approach
    async fn ensure_git_repository(
        &self,
        git_url: &str,
        git_branch: &str,
        working_dir: &str,
        api_key_file: Option<&str>,
    ) -> Result<()> {
        use std::path::Path;

        // Read API key if provided
        let auth_url = if let Some(key_file) = api_key_file {
            match tokio::fs::read_to_string(key_file).await {
                Ok(api_key) => {
                    let api_key = api_key.trim();
                    if !api_key.is_empty() {
                        // Convert https://gitea.cmtec.se/cm/nixosbox.git to https://token@gitea.cmtec.se/cm/nixosbox.git
                        if git_url.starts_with("https://") {
                            let url_without_protocol = &git_url[8..]; // Remove "https://"
                            format!("https://{}@{}", api_key, url_without_protocol)
                        } else {
                            info!("API key provided but URL is not HTTPS, using original URL");
                            git_url.to_string()
                        }
                    } else {
                        info!("API key file is empty, using original URL");
                        git_url.to_string()
                    }
                }
                Err(e) => {
                    info!("Could not read API key file {}: {}, using original URL", key_file, e);
                    git_url.to_string()
                }
            }
        } else {
            git_url.to_string()
        };

        // Always remove existing directory and do fresh clone for consistent state
        let working_path = Path::new(working_dir);
        if working_path.exists() {
            info!("Removing existing repository directory: {}", working_dir);
            if let Err(e) = tokio::fs::remove_dir_all(working_path).await {
                error!("Failed to remove existing directory: {}", e);
                return Err(anyhow::anyhow!("Failed to remove existing directory: {}", e));
            }
        }

        info!("Force cloning git repository from {} (branch: {})", git_url, git_branch);

        // Force clone with depth 1 for efficiency (no history needed for deployment)
        let output = tokio::process::Command::new("git")
            .arg("clone")
            .arg("--depth")
            .arg("1")
            .arg("--branch")
            .arg(git_branch)
            .arg(&auth_url)
            .arg(working_dir)
            .output()
            .await?;

        if !output.status.success() {
            let stderr = String::from_utf8_lossy(&output.stderr);
            error!("Git clone failed: {}", stderr);
            return Err(anyhow::anyhow!("Git clone failed: {}", stderr));
        }

        info!("Git repository cloned successfully with latest state");
        Ok(())
    }
}
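
// A minimal sketch of how this agent is expected to be driven from the binary
// entry point. Illustrative only: it assumes a main.rs that wires a Ctrl-C
// handler to the oneshot shutdown channel and takes the config path as the
// first argument; the actual entry point may differ.
//
// #[tokio::main]
// async fn main() -> anyhow::Result<()> {
//     let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
//     tokio::spawn(async move {
//         let _ = tokio::signal::ctrl_c().await;
//         let _ = shutdown_tx.send(());
//     });
//
//     let config_path = std::env::args().nth(1).map(std::path::PathBuf::from);
//     let mut agent = Agent::new(config_path).await?;
//     agent.run(shutdown_rx).await
// }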