From d9edcda36cb6a4a6b12d446b2bd16c18afc181c1 Mon Sep 17 00:00:00 2001 From: Christoffer Martinsson Date: Sun, 12 Oct 2025 20:29:08 +0200 Subject: [PATCH] Testing --- Cargo.lock | 1 - agent/Cargo.toml | 1 - agent/src/agent.rs | 263 ------------------------ agent/src/collectors/mod.rs | 2 +- agent/src/config.rs | 325 ----------------------------- agent/src/discovery.rs | 39 +--- agent/src/main.rs | 226 ++++----------------- agent/src/notifications.rs | 3 +- agent/src/scheduler.rs | 393 ------------------------------------ agent/src/simple_agent.rs | 213 +++++++++++++++++++ 10 files changed, 263 insertions(+), 1203 deletions(-) delete mode 100644 agent/src/agent.rs delete mode 100644 agent/src/config.rs delete mode 100644 agent/src/scheduler.rs create mode 100644 agent/src/simple_agent.rs diff --git a/Cargo.lock b/Cargo.lock index 50e9aa1..89c34f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -308,7 +308,6 @@ dependencies = [ "serde_json", "thiserror", "tokio", - "toml", "tracing", "tracing-appender", "tracing-subscriber", diff --git a/agent/Cargo.toml b/agent/Cargo.toml index 5193681..5d152a8 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -12,7 +12,6 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" chrono = { version = "0.4", features = ["serde"] } thiserror = "1.0" -toml = "0.8" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] } tracing-appender = "0.2" diff --git a/agent/src/agent.rs b/agent/src/agent.rs deleted file mode 100644 index 76f65cb..0000000 --- a/agent/src/agent.rs +++ /dev/null @@ -1,263 +0,0 @@ -use cm_dashboard_shared::envelope::MessageEnvelope; -use std::sync::Arc; -use tokio::sync::mpsc; -use tokio::time::{interval, Duration}; -use tracing::{debug, error, info, warn}; -use zmq::{Context, SocketType}; - -use crate::collectors::{ - backup::BackupCollector, service::ServiceCollector, smart::SmartCollector, AgentType, - CollectorOutput, -}; -use crate::config::AgentConfig; -use crate::scheduler::{CollectorScheduler, HealthChecker, HealthStatus}; - -pub struct MetricsAgent { - config: AgentConfig, - scheduler: CollectorScheduler, - health_checker: Option, -} - -impl MetricsAgent { - pub fn from_config(config: AgentConfig) -> Result> { - let mut agent = Self::new(config)?; - agent.initialize_collectors()?; - Ok(agent) - } - - pub fn new(config: AgentConfig) -> Result> { - Ok(Self { - config, - scheduler: CollectorScheduler::new(), - health_checker: None, - }) - } - - pub fn initialize_collectors(&mut self) -> Result<(), Box> { - info!("Initializing collectors..."); - - // Create SMART collector - if self.config.collectors.smart.enabled { - let smart_collector = SmartCollector::new( - self.config.collectors.smart.enabled, - self.config.collectors.smart.interval_ms, - self.config.collectors.smart.devices.clone(), - ); - self.scheduler.add_collector(Arc::new(smart_collector)); - info!("SMART collector initialized"); - } - - // Create Service collector - if self.config.collectors.service.enabled { - let service_collector = ServiceCollector::new( - self.config.collectors.service.enabled, - self.config.collectors.service.interval_ms, - self.config.collectors.service.services.clone(), - ); - self.scheduler.add_collector(Arc::new(service_collector)); - info!("Service collector initialized"); - } - - // Create Backup collector - if self.config.collectors.backup.enabled { - let backup_collector = BackupCollector::new( - self.config.collectors.backup.enabled, - self.config.collectors.backup.interval_ms, - 
self.config.collectors.backup.restic_repo.clone(), - self.config.collectors.backup.backup_service.clone(), - ); - self.scheduler.add_collector(Arc::new(backup_collector)); - info!("Backup collector initialized"); - } - - let enabled_count = self.config.get_enabled_collector_count(); - if enabled_count == 0 { - return Err("No collectors are enabled".into()); - } - - info!("Initialized {} collectors", enabled_count); - Ok(()) - } - - pub async fn run(&mut self) -> Result<(), Box> { - info!( - "Starting metrics agent for host '{}'", - self.config.agent.hostname - ); - - // Initialize health checker - let stats = self.scheduler.get_stats_handle(); - self.health_checker = Some(HealthChecker::new(stats)); - - // Forward successful collection results to the publisher - let (metrics_tx, metrics_rx) = mpsc::unbounded_channel(); - self.scheduler.set_metrics_sender(metrics_tx); - let publisher_task = self.start_publisher_task(metrics_rx)?; - - // Start health monitoring task - let health_task = self.start_health_monitoring_task().await?; - - // Start the collector scheduler (this will block) - let scheduler_result = self.scheduler.start().await; - - // Drop the metrics sender so the publisher can exit cleanly - self.scheduler.clear_metrics_sender(); - - // Wait for background tasks to complete - if let Err(join_error) = health_task.await { - warn!("Health monitoring task ended unexpectedly: {}", join_error); - } - - if let Err(join_error) = publisher_task.await { - warn!("Publisher task ended unexpectedly: {}", join_error); - } - - match scheduler_result { - Ok(_) => { - info!("Agent shutdown completed successfully"); - Ok(()) - } - Err(e) => { - error!("Agent encountered an error: {}", e); - Err(e.into()) - } - } - } - - fn start_publisher_task( - &self, - mut metrics_rx: mpsc::UnboundedReceiver, - ) -> Result, Box> { - let bind_address = format!( - "tcp://{}:{}", - self.config.zmq.bind_address, self.config.zmq.port - ); - let send_timeout = self.config.zmq.send_timeout_ms as i32; - let hostname = self.config.agent.hostname.clone(); - - let handle = tokio::spawn(async move { - let context = Context::new(); - - let socket = match context.socket(SocketType::PUB) { - Ok(socket) => socket, - Err(error) => { - error!("Failed to create ZMQ PUB socket: {}", error); - return; - } - }; - - if let Err(error) = socket.set_sndtimeo(send_timeout) { - warn!("Failed to apply ZMQ send timeout: {}", error); - } - - if let Err(error) = socket.bind(&bind_address) { - error!( - "Failed to bind ZMQ publisher to {}: {}", - bind_address, error - ); - return; - } - - info!("ZMQ publisher bound to {}", bind_address); - - while let Some(output) = metrics_rx.recv().await { - let CollectorOutput { - agent_type, - data, - timestamp, - } = output; - - let envelope_agent_type = match agent_type { - AgentType::Smart => cm_dashboard_shared::envelope::AgentType::Smart, - AgentType::Service => cm_dashboard_shared::envelope::AgentType::Service, - AgentType::Backup => cm_dashboard_shared::envelope::AgentType::Backup, - }; - - let epoch = timestamp.timestamp(); - let epoch_u64 = if epoch < 0 { 0 } else { epoch as u64 }; - - let envelope = MessageEnvelope { - hostname: hostname.clone(), - agent_type: envelope_agent_type.clone(), - timestamp: epoch_u64, - metrics: data, - }; - - match serde_json::to_vec(&envelope) { - Ok(serialized) => { - if let Err(error) = socket.send(serialized, 0) { - warn!( - "Failed to publish {:?} metrics: {}", - envelope.agent_type, error - ); - } else { - debug!( - "Published {:?} metrics for host {}", - 
envelope.agent_type, envelope.hostname - ); - } - } - Err(error) => { - warn!("Failed to serialize metrics envelope: {}", error); - } - } - } - - info!("Metrics publisher task shutting down"); - }); - - Ok(handle) - } - - async fn start_health_monitoring_task( - &self, - ) -> Result, Box> { - let health_checker = self.health_checker.as_ref().unwrap().clone(); - - let task = tokio::spawn(async move { - info!("Starting health monitoring task"); - let mut health_interval = interval(Duration::from_secs(60)); // Check every minute - - loop { - health_interval.tick().await; - - match health_checker.check_health().await { - HealthStatus::Healthy => { - debug!("All collectors are healthy"); - } - HealthStatus::Degraded { - degraded_collectors, - } => { - warn!("Degraded collectors: {:?}", degraded_collectors); - } - HealthStatus::Unhealthy { - unhealthy_collectors, - degraded_collectors, - } => { - error!( - "Unhealthy collectors: {:?}, Degraded: {:?}", - unhealthy_collectors, degraded_collectors - ); - } - } - } - }); - - Ok(task) - } - - pub async fn shutdown(&self) { - info!("Initiating graceful shutdown..."); - self.scheduler.shutdown().await; - - // ZMQ socket will be dropped automatically - - info!("Agent shutdown completed"); - } -} - -impl Drop for MetricsAgent { - fn drop(&mut self) { - // ZMQ socket will be dropped automatically - } -} diff --git a/agent/src/collectors/mod.rs b/agent/src/collectors/mod.rs index b81d03c..373d564 100644 --- a/agent/src/collectors/mod.rs +++ b/agent/src/collectors/mod.rs @@ -10,7 +10,7 @@ pub mod smart; pub use error::CollectorError; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, serde::Serialize)] pub enum AgentType { Smart, Service, diff --git a/agent/src/config.rs b/agent/src/config.rs deleted file mode 100644 index 8beb7b8..0000000 --- a/agent/src/config.rs +++ /dev/null @@ -1,325 +0,0 @@ -use serde::{Deserialize, Serialize}; -use std::path::Path; -use tokio::fs; -use tracing::info; - -use crate::collectors::CollectorError; -use crate::discovery::AutoDiscovery; -use crate::notifications::NotificationConfig; - -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct AgentConfig { - pub agent: AgentSettings, - pub zmq: ZmqSettings, - pub collectors: CollectorsConfig, - #[serde(default)] - pub notifications: NotificationConfig, -} - -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct AgentSettings { - pub hostname: String, - pub log_level: String, - pub metrics_buffer_size: usize, -} - -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct ZmqSettings { - pub port: u16, - pub bind_address: String, - pub send_timeout_ms: u64, - pub receive_timeout_ms: u64, -} - -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct CollectorsConfig { - pub smart: SmartCollectorConfig, - pub service: ServiceCollectorConfig, - pub backup: BackupCollectorConfig, -} - -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct SmartCollectorConfig { - pub enabled: bool, - pub interval_ms: u64, - pub devices: Vec, - pub timeout_ms: u64, -} - -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct ServiceCollectorConfig { - pub enabled: bool, - pub interval_ms: u64, - pub services: Vec, - pub timeout_ms: u64, -} - -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct BackupCollectorConfig { - pub enabled: bool, - pub interval_ms: u64, - pub restic_repo: Option, - pub backup_service: String, - pub timeout_ms: u64, -} - -impl Default for AgentConfig { - fn default() -> Self { - Self { - agent: AgentSettings { - hostname: 
gethostname::gethostname().to_string_lossy().to_string(), - log_level: "info".to_string(), - metrics_buffer_size: 1000, - }, - zmq: ZmqSettings { - port: 6130, - bind_address: "0.0.0.0".to_string(), - send_timeout_ms: 5000, - receive_timeout_ms: 5000, - }, - collectors: CollectorsConfig { - smart: SmartCollectorConfig { - enabled: true, - interval_ms: 5000, - devices: vec!["nvme0n1".to_string()], - timeout_ms: 30000, - }, - service: ServiceCollectorConfig { - enabled: true, - interval_ms: 5000, - services: vec![ - "gitea".to_string(), - "immich".to_string(), - "vaultwarden".to_string(), - "unifi".to_string(), - ], - timeout_ms: 10000, - }, - backup: BackupCollectorConfig { - enabled: true, - interval_ms: 30000, - restic_repo: None, - backup_service: "restic-backup".to_string(), - timeout_ms: 30000, - }, - }, - } - } -} - -impl AgentConfig { - pub async fn load_from_file>(path: P) -> Result { - let content = fs::read_to_string(path) - .await - .map_err(|e| CollectorError::ConfigError { - message: format!("Failed to read config file: {}", e), - })?; - - let config: AgentConfig = - toml::from_str(&content).map_err(|e| CollectorError::ConfigError { - message: format!("Failed to parse config file: {}", e), - })?; - - config.validate()?; - Ok(config) - } - - pub async fn save_to_file>(&self, path: P) -> Result<(), CollectorError> { - let content = toml::to_string_pretty(self).map_err(|e| CollectorError::ConfigError { - message: format!("Failed to serialize config: {}", e), - })?; - - fs::write(path, content) - .await - .map_err(|e| CollectorError::ConfigError { - message: format!("Failed to write config file: {}", e), - })?; - - Ok(()) - } - - pub fn validate(&self) -> Result<(), CollectorError> { - // Validate ZMQ settings - if self.zmq.port == 0 { - return Err(CollectorError::ConfigError { - message: "ZMQ port cannot be 0".to_string(), - }); - } - - // Validate collector intervals - if self.collectors.smart.enabled && self.collectors.smart.interval_ms < 1000 { - return Err(CollectorError::ConfigError { - message: "SMART collector interval must be at least 1000ms".to_string(), - }); - } - - if self.collectors.service.enabled && self.collectors.service.interval_ms < 500 { - return Err(CollectorError::ConfigError { - message: "Service collector interval must be at least 500ms".to_string(), - }); - } - - if self.collectors.backup.enabled && self.collectors.backup.interval_ms < 5000 { - return Err(CollectorError::ConfigError { - message: "Backup collector interval must be at least 5000ms".to_string(), - }); - } - - // Validate smart devices - if self.collectors.smart.enabled && self.collectors.smart.devices.is_empty() { - return Err(CollectorError::ConfigError { - message: "SMART collector requires at least one device".to_string(), - }); - } - - // Validate services - if self.collectors.service.enabled && self.collectors.service.services.is_empty() { - return Err(CollectorError::ConfigError { - message: "Service collector requires at least one service".to_string(), - }); - } - - // Validate backup configuration - if self.collectors.backup.enabled { - if self.collectors.backup.restic_repo.is_none() { - tracing::warn!("Backup collector enabled but no restic repository configured"); - } - if self.collectors.backup.backup_service.is_empty() { - return Err(CollectorError::ConfigError { - message: "Backup collector requires a backup service name".to_string(), - }); - } - } - - Ok(()) - } - - pub fn get_enabled_collector_count(&self) -> usize { - let mut count = 0; - if self.collectors.smart.enabled { - 
count += 1; - } - if self.collectors.service.enabled { - count += 1; - } - if self.collectors.backup.enabled { - count += 1; - } - count - } - - pub async fn auto_configure(&mut self) -> Result<(), CollectorError> { - let hostname = &self.agent.hostname.clone(); - info!("Auto-configuring agent for host: {}", hostname); - - // Auto-detect storage devices - let devices = AutoDiscovery::discover_storage_devices().await; - let valid_devices = AutoDiscovery::validate_devices(&devices).await; - - if !valid_devices.is_empty() { - self.collectors.smart.devices = valid_devices; - info!( - "Auto-detected storage devices: {:?}", - self.collectors.smart.devices - ); - } else { - info!("No accessible storage devices found, disabling SMART collector"); - self.collectors.smart.enabled = false; - } - - // Auto-detect services - let services = AutoDiscovery::discover_services().await; - if !services.is_empty() { - self.collectors.service.services = services; - info!( - "Auto-detected services: {:?}", - self.collectors.service.services - ); - } else { - info!("No monitorable services found, using minimal service list"); - self.collectors.service.services = vec!["ssh".to_string()]; - } - - // Auto-detect backup configuration - let (backup_enabled, restic_repo, backup_service) = - AutoDiscovery::discover_backup_config(hostname).await; - - self.collectors.backup.enabled = backup_enabled; - self.collectors.backup.restic_repo = restic_repo; - self.collectors.backup.backup_service = backup_service; - - if backup_enabled { - info!( - "Auto-configured backup monitoring: repo={:?}, service={}", - self.collectors.backup.restic_repo, self.collectors.backup.backup_service - ); - } else { - info!("Backup monitoring disabled for this host"); - } - - // Auto-configure notifications - self.notifications.enabled = true; - self.notifications.from_email = format!("{}@cmtec.se", hostname); - self.notifications.to_email = "cm@cmtec.se".to_string(); - info!("Auto-configured notifications: {} -> {}", - self.notifications.from_email, self.notifications.to_email); - - // Apply host-specific timing optimizations - self.apply_host_timing_overrides(hostname); - - Ok(()) - } - - fn apply_host_timing_overrides(&mut self, hostname: &str) { - match hostname { - "srv01" => { - // Server host - standard 5 second monitoring - self.collectors.service.interval_ms = 5000; - self.collectors.smart.interval_ms = 5000; - } - "cmbox" | "labbox" | "simonbox" | "steambox" => { - // Workstation hosts - standard 5 second monitoring - self.collectors.smart.interval_ms = 5000; - self.collectors.service.interval_ms = 5000; - } - _ => { - // Unknown host - standard 5 second monitoring - self.collectors.smart.interval_ms = 5000; - self.collectors.service.interval_ms = 5000; - } - } - - info!( - "Applied timing overrides for {}: smart={}ms, service={}ms", - hostname, self.collectors.smart.interval_ms, self.collectors.service.interval_ms - ); - } - - pub fn summary(&self) -> String { - let mut parts = Vec::new(); - - if self.collectors.smart.enabled { - parts.push(format!( - "SMART({} devices)", - self.collectors.smart.devices.len() - )); - } - - if self.collectors.service.enabled { - parts.push(format!( - "Services({} monitored)", - self.collectors.service.services.len() - )); - } - - if self.collectors.backup.enabled { - parts.push("Backup".to_string()); - } - - if parts.is_empty() { - "No collectors enabled".to_string() - } else { - parts.join(", ") - } - } -} diff --git a/agent/src/discovery.rs b/agent/src/discovery.rs index 62a777a..2c87968 100644 --- 
a/agent/src/discovery.rs +++ b/agent/src/discovery.rs @@ -211,42 +211,21 @@ impl AutoDiscovery { "unifi", "wordpress", "nginx", - "apache2", "httpd", - "caddy", // Databases "postgresql", "mysql", "mariadb", "redis", "mongodb", - // Monitoring and infrastructure - "smart-metrics-api", - "service-metrics-api", - "backup-metrics-api", - "prometheus", - "grafana", - "influxdb", // Backup and storage - "restic", "borg", "rclone", - "syncthing", // Container runtimes "docker", - "podman", - "containerd", // Network services "sshd", "dnsmasq", - "bind9", - "pihole", - // Media services - "plex", - "jellyfin", - "emby", - "sonarr", - "radarr", ]; // Check if service name contains any of our interesting patterns @@ -255,21 +234,9 @@ impl AutoDiscovery { .any(|&pattern| service_name.contains(pattern) || pattern.contains(service_name)) } - fn get_host_specific_services(hostname: &str) -> Vec { - match hostname { - "srv01" => vec![ - "gitea".to_string(), - "immich".to_string(), - "vaultwarden".to_string(), - "unifi".to_string(), - "smart-metrics-api".to_string(), - "service-metrics-api".to_string(), - "backup-metrics-api".to_string(), - ], - "cmbox" | "labbox" | "simonbox" => vec!["docker".to_string(), "sshd".to_string()], - "steambox" => vec!["steam".to_string(), "sshd".to_string()], - _ => vec!["sshd".to_string()], - } + fn get_host_specific_services(_hostname: &str) -> Vec { + // Pure auto-discovery - no hardcoded host-specific services + vec![] } fn canonical_service_name(service: &str) -> Option { diff --git a/agent/src/main.rs b/agent/src/main.rs index 5e8b876..d3dd955 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -1,204 +1,66 @@ -use anyhow::{anyhow, Result}; -use clap::{ArgAction, Parser}; -use std::path::PathBuf; +use anyhow::Result; +use clap::Parser; use tokio::signal; use tracing::{error, info}; use tracing_subscriber::EnvFilter; -mod agent; mod collectors; -mod config; mod discovery; mod notifications; -mod scheduler; +mod simple_agent; -use agent::MetricsAgent; -use config::AgentConfig; +use simple_agent::SimpleAgent; -#[derive(Parser, Debug)] -#[command( - name = "cm-dashboard-agent", - version, - about = "CM Dashboard ZMQ metrics agent with auto-detection" -)] +#[derive(Parser)] +#[command(name = "cm-dashboard-agent")] +#[command(about = "CM Dashboard metrics agent with auto-detection")] +#[command(version)] struct Cli { - /// ZMQ port to bind to (default: 6130) - #[arg(long, value_name = "PORT")] - port: Option, - - /// Path to load configuration from - #[arg(long, value_name = "FILE")] - config: Option, - - /// Optional path to persist the resolved configuration - #[arg(long, value_name = "FILE")] - write_config: Option, - - /// Disable SMART metrics collector - #[arg(long, action = ArgAction::SetTrue)] - disable_smart: bool, - - /// Disable service metrics collector - #[arg(long, action = ArgAction::SetTrue)] - disable_service: bool, - - /// Disable backup metrics collector - #[arg(long, action = ArgAction::SetTrue)] - disable_backup: bool, - - /// Skip auto-detection and use minimal defaults - #[arg(long, action = ArgAction::SetTrue)] - no_auto_detect: bool, - - /// Show detected configuration and exit - #[arg(long, action = ArgAction::SetTrue)] - show_config: bool, - /// Increase logging verbosity (-v, -vv) - #[arg(short, long, action = ArgAction::Count)] + #[arg(short, long, action = clap::ArgAction::Count)] verbose: u8, } #[tokio::main] async fn main() -> Result<()> { let cli = Cli::parse(); - init_tracing(cli.verbose)?; - - // Start with file-based configuration if 
requested, otherwise defaults - let mut config = if let Some(path) = cli.config.as_ref() { - AgentConfig::load_from_file(path) - .await - .map_err(|e| anyhow!("Failed to load config from {}: {}", path.display(), e))? - } else { - AgentConfig::default() - }; - - // Hostname is auto-detected in AgentConfig::default() - - // Apply CLI port override - if let Some(port) = cli.port { - config.zmq.port = port; - } - - // Run auto-detection unless disabled - if !cli.no_auto_detect { - info!("Auto-detecting system configuration..."); - config - .auto_configure() - .await - .map_err(|e| anyhow!("Auto-detection failed: {}", e))?; - } else { - info!("Skipping auto-detection, using minimal defaults"); - } - - // Apply CLI collector overrides after auto-detection - if cli.disable_smart { - config.collectors.smart.enabled = false; - } - if cli.disable_service { - config.collectors.service.enabled = false; - } - if cli.disable_backup { - config.collectors.backup.enabled = false; - } - - if let Some(path) = cli.write_config.as_ref() { - config - .save_to_file(path) - .await - .map_err(|e| anyhow!("Failed to write config to {}: {}", path.display(), e))?; - info!("Persisted configuration to {}", path.display()); - } - - // Show configuration and exit if requested - if cli.show_config { - println!("Agent Configuration:"); - println!(" Hostname: {}", config.agent.hostname); - println!(" ZMQ Port: {}", config.zmq.port); - println!(" Collectors: {}", config.summary()); - - if config.collectors.smart.enabled { - println!(" SMART Devices: {:?}", config.collectors.smart.devices); - } - - if config.collectors.service.enabled { - println!(" Services: {:?}", config.collectors.service.services); - } - - if config.collectors.backup.enabled { - println!(" Backup Repo: {:?}", config.collectors.backup.restic_repo); - println!( - " Backup Service: {}", - config.collectors.backup.backup_service - ); - } - - return Ok(()); - } - - info!( - "Starting agent for host '{}' on port {} with: {}", - config.agent.hostname, - config.zmq.port, - config.summary() - ); - - // Build and start the agent - let mut agent = - MetricsAgent::from_config(config).map_err(|e| anyhow!("Failed to create agent: {}", e))?; - - // Set up graceful shutdown handling - let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel(); - - tokio::spawn(async move { - let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate()) - .expect("Failed to install SIGTERM handler"); - let mut sigint = signal::unix::signal(signal::unix::SignalKind::interrupt()) - .expect("Failed to install SIGINT handler"); - - tokio::select! { - _ = sigterm.recv() => info!("Received SIGTERM"), - _ = sigint.recv() => info!("Received SIGINT"), - } - - let _ = shutdown_tx.send(()); - }); - - // Run the agent until shutdown - tokio::select! 
{ - result = agent.run() => { - match result { - Ok(_) => info!("Agent completed successfully"), - Err(e) => error!("Agent error: {}", e), - } - } - _ = shutdown_rx => { - info!("Shutdown signal received"); - agent.shutdown().await; - } - } - - Ok(()) -} - -fn init_tracing(verbosity: u8) -> Result<()> { - let level = match verbosity { + + // Setup logging + let log_level = match cli.verbose { 0 => "info", - 1 => "debug", + 1 => "debug", _ => "trace", }; - - let env_filter = std::env::var("RUST_LOG") - .ok() - .and_then(|value| EnvFilter::try_new(value).ok()) - .unwrap_or_else(|| EnvFilter::new(level)); - + tracing_subscriber::fmt() - .with_env_filter(env_filter) - .with_target(false) - .compact() - .try_init() - .map_err(|err| anyhow!(err))?; - + .with_env_filter(EnvFilter::from_default_env().add_directive(log_level.parse()?)) + .init(); + + info!("CM Dashboard Agent starting..."); + + // Create and run agent + let mut agent = SimpleAgent::new().await?; + + // Setup graceful shutdown + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + // Run agent with graceful shutdown + tokio::select! { + result = agent.run() => { + if let Err(e) = result { + error!("Agent error: {}", e); + return Err(e); + } + } + _ = ctrl_c => { + info!("Shutdown signal received"); + } + } + + info!("Agent shutdown complete"); Ok(()) -} +} \ No newline at end of file diff --git a/agent/src/notifications.rs b/agent/src/notifications.rs index a177ead..f106232 100644 --- a/agent/src/notifications.rs +++ b/agent/src/notifications.rs @@ -1,9 +1,10 @@ use std::collections::HashMap; use chrono::{DateTime, Utc}; use lettre::{Message, SmtpTransport, Transport}; +use serde::{Deserialize, Serialize}; use tracing::{info, error, warn}; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct NotificationConfig { pub enabled: bool, pub smtp_host: String, diff --git a/agent/src/scheduler.rs b/agent/src/scheduler.rs deleted file mode 100644 index 1164024..0000000 --- a/agent/src/scheduler.rs +++ /dev/null @@ -1,393 +0,0 @@ -use futures::stream::{FuturesUnordered, StreamExt}; -use std::collections::HashMap; -use std::sync::Arc; -use std::time::Duration; -use tokio::sync::{mpsc, RwLock}; -use tokio::time::{interval, Instant}; -use tracing::{debug, error, info, warn}; - -use crate::collectors::{Collector, CollectorError, CollectorOutput}; - -pub struct CollectorScheduler { - collectors: Vec>, - sender: mpsc::UnboundedSender, - receiver: mpsc::UnboundedReceiver, - stats: Arc>, - metrics_sender: Option>, -} - -#[derive(Debug)] -pub enum SchedulerEvent { - CollectionResult { - collector_name: String, - result: Result, - duration: Duration, - }, - Shutdown, -} - -#[derive(Debug, Default, Clone)] -pub struct SchedulerStats { - pub total_collections: u64, - pub successful_collections: u64, - pub failed_collections: u64, - pub collector_stats: HashMap, -} - -#[derive(Debug, Default, Clone)] -pub struct CollectorStats { - pub total_collections: u64, - pub successful_collections: u64, - pub failed_collections: u64, - pub last_success: Option, - pub last_failure: Option, - pub average_duration_ms: f64, - pub consecutive_failures: u32, -} - -impl CollectorScheduler { - pub fn new() -> Self { - let (sender, receiver) = mpsc::unbounded_channel(); - - Self { - collectors: Vec::new(), - sender, - receiver, - stats: Arc::new(RwLock::new(SchedulerStats::default())), - metrics_sender: None, - } - } - - pub fn set_metrics_sender(&mut self, sender: mpsc::UnboundedSender) { - 
self.metrics_sender = Some(sender); - } - - pub fn clear_metrics_sender(&mut self) { - self.metrics_sender = None; - } - - pub fn add_collector(&mut self, collector: Arc) { - if collector.is_enabled() { - info!( - "Adding collector '{}' [{}] with interval {:?}", - collector.name(), - collector.agent_type().as_str(), - collector.collect_interval() - ); - - if collector.requires_root() { - debug!("Collector '{}' is flagged as root-only", collector.name()); - } - self.collectors.push(collector); - } else { - info!("Skipping disabled collector '{}'", collector.name()); - } - } - - pub async fn start(&mut self) -> Result<(), CollectorError> { - if self.collectors.is_empty() { - return Err(CollectorError::ConfigError { - message: "No enabled collectors configured".to_string(), - }); - } - - info!( - "Starting scheduler with {} collectors", - self.collectors.len() - ); - - // Start collection tasks for each collector - let mut collection_tasks = FuturesUnordered::new(); - - for collector in self.collectors.clone() { - let sender = self.sender.clone(); - let stats = self.stats.clone(); - - let task = - tokio::spawn(async move { Self::run_collector(collector, sender, stats).await }); - - collection_tasks.push(task); - } - - // Main event loop - loop { - tokio::select! { - // Handle collection results - Some(event) = self.receiver.recv() => { - match event { - SchedulerEvent::CollectionResult { collector_name, result, duration } => { - self.handle_collection_result(&collector_name, result, duration).await; - } - SchedulerEvent::Shutdown => { - info!("Scheduler shutdown requested"); - break; - } - } - } - - // Handle task completion (shouldn't happen in normal operation) - Some(result) = collection_tasks.next() => { - match result { - Ok(_) => warn!("Collection task completed unexpectedly"), - Err(e) => error!("Collection task failed: {}", e), - } - } - - // If all tasks are done and no more events, break - else => { - warn!("All collection tasks completed, shutting down scheduler"); - break; - } - } - } - - Ok(()) - } - - async fn run_collector( - collector: Arc, - sender: mpsc::UnboundedSender, - _stats: Arc>, - ) { - let collector_name = collector.name().to_string(); - let mut interval_timer = interval(collector.collect_interval()); - interval_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - - info!("Starting collection loop for '{}'", collector_name); - - loop { - interval_timer.tick().await; - - debug!("Running collection for '{}'", collector_name); - let start_time = Instant::now(); - - match collector.collect().await { - Ok(output) => { - let duration = start_time.elapsed(); - debug!( - "Collection '{}' completed in {:?}", - collector_name, duration - ); - - if let Err(e) = sender.send(SchedulerEvent::CollectionResult { - collector_name: collector_name.clone(), - result: Ok(output), - duration, - }) { - error!( - "Failed to send collection result for '{}': {}", - collector_name, e - ); - break; - } - } - Err(error) => { - let duration = start_time.elapsed(); - warn!( - "Collection '{}' failed after {:?}: {}", - collector_name, duration, error - ); - - if let Err(e) = sender.send(SchedulerEvent::CollectionResult { - collector_name: collector_name.clone(), - result: Err(error), - duration, - }) { - error!( - "Failed to send collection error for '{}': {}", - collector_name, e - ); - break; - } - } - } - } - - warn!("Collection loop for '{}' ended", collector_name); - } - - async fn handle_collection_result( - &self, - collector_name: &str, - result: Result, - duration: 
Duration, - ) { - let publish_output = match &result { - Ok(output) => Some(output.clone()), - Err(_) => None, - }; - - { - let mut stats = self.stats.write().await; - stats.total_collections += 1; - - match &result { - Ok(_) => { - stats.successful_collections += 1; - } - Err(_) => { - stats.failed_collections += 1; - } - } - } - - // Handle collector-specific stats - { - let mut stats = self.stats.write().await; - let duration_ms = duration.as_millis() as f64; - - let collector_stats = stats - .collector_stats - .entry(collector_name.to_string()) - .or_default(); - - collector_stats.total_collections += 1; - - if collector_stats.average_duration_ms == 0.0 { - collector_stats.average_duration_ms = duration_ms; - } else { - // Simple moving average - collector_stats.average_duration_ms = - (collector_stats.average_duration_ms * 0.9) + (duration_ms * 0.1); - } - - match &result { - Ok(output) => { - collector_stats.successful_collections += 1; - collector_stats.last_success = Some(Instant::now()); - collector_stats.consecutive_failures = 0; - - let metrics_count = match &output.data { - serde_json::Value::Object(map) => map.len(), - serde_json::Value::Array(values) => values.len(), - _ => 1, - }; - - debug!( - "Collector '{}' [{}] successful at {} ({} metrics)", - collector_name, - output.agent_type.as_str(), - output.timestamp, - metrics_count - ); - } - Err(error) => { - collector_stats.failed_collections += 1; - collector_stats.last_failure = Some(Instant::now()); - collector_stats.consecutive_failures += 1; - - warn!("Collection '{}' failed: {}", collector_name, error); - - // Log warning for consecutive failures - if collector_stats.consecutive_failures >= 5 { - error!( - "Collector '{}' has {} consecutive failures", - collector_name, collector_stats.consecutive_failures - ); - } - } - } - } - - if let (Some(sender), Some(output)) = (&self.metrics_sender, publish_output) { - if let Err(error) = sender.send(output) { - warn!("Metrics channel send error: {}", error); - } - } - } - - pub fn get_stats_handle(&self) -> Arc> { - self.stats.clone() - } - - pub async fn shutdown(&self) { - info!("Requesting scheduler shutdown"); - if let Err(e) = self.sender.send(SchedulerEvent::Shutdown) { - error!("Failed to send shutdown event: {}", e); - } - } -} - -impl Default for CollectorScheduler { - fn default() -> Self { - Self::new() - } -} - -#[derive(Debug, Clone)] -pub struct HealthChecker { - stats: Arc>, - max_consecutive_failures: u32, - max_failure_rate: f64, -} - -impl HealthChecker { - pub fn new(stats: Arc>) -> Self { - Self { - stats, - max_consecutive_failures: 10, - max_failure_rate: 0.5, // 50% failure rate threshold - } - } - - pub async fn check_health(&self) -> HealthStatus { - let stats = self.stats.read().await; - - let mut unhealthy_collectors = Vec::new(); - let mut degraded_collectors = Vec::new(); - - for (name, collector_stats) in &stats.collector_stats { - // Check consecutive failures - if collector_stats.consecutive_failures >= self.max_consecutive_failures { - unhealthy_collectors.push(name.clone()); - continue; - } - - // Check failure rate - if collector_stats.total_collections > 10 { - let failure_rate = collector_stats.failed_collections as f64 - / collector_stats.total_collections as f64; - - if failure_rate >= self.max_failure_rate { - degraded_collectors.push(name.clone()); - } - } - - // Check if collector hasn't succeeded recently - if let Some(last_success) = collector_stats.last_success { - if last_success.elapsed() > Duration::from_secs(300) { - // 5 minutes 
- degraded_collectors.push(name.clone()); - } - } else if collector_stats.total_collections > 5 { - // No successful collections after several attempts - unhealthy_collectors.push(name.clone()); - } - } - - if !unhealthy_collectors.is_empty() { - HealthStatus::Unhealthy { - unhealthy_collectors, - degraded_collectors, - } - } else if !degraded_collectors.is_empty() { - HealthStatus::Degraded { - degraded_collectors, - } - } else { - HealthStatus::Healthy - } - } -} - -#[derive(Debug, Clone)] -pub enum HealthStatus { - Healthy, - Degraded { - degraded_collectors: Vec, - }, - Unhealthy { - unhealthy_collectors: Vec, - degraded_collectors: Vec, - }, -} diff --git a/agent/src/simple_agent.rs b/agent/src/simple_agent.rs new file mode 100644 index 0000000..d0949c3 --- /dev/null +++ b/agent/src/simple_agent.rs @@ -0,0 +1,213 @@ +use std::time::Duration; +use chrono::Utc; +use gethostname::gethostname; +use tokio::time::interval; +use tracing::{info, error, warn}; +use zmq::{Context, Socket, SocketType}; + +use crate::collectors::{ + backup::BackupCollector, + service::ServiceCollector, + smart::SmartCollector, + Collector, AgentType +}; +use crate::discovery::AutoDiscovery; +use crate::notifications::{NotificationManager, NotificationConfig}; + +pub struct SimpleAgent { + hostname: String, + zmq_socket: Socket, + notification_manager: NotificationManager, + collectors: Vec>, +} + +impl SimpleAgent { + pub async fn new() -> anyhow::Result { + let hostname = gethostname().to_string_lossy().to_string(); + + info!("Starting CM Dashboard Agent on {}", hostname); + + // Setup ZMQ + let context = Context::new(); + let socket = context.socket(SocketType::PUB)?; + socket.bind("tcp://0.0.0.0:6130")?; + info!("ZMQ publisher bound to tcp://0.0.0.0:6130"); + + // Setup notifications + let notification_config = NotificationConfig { + enabled: true, + smtp_host: "localhost".to_string(), + smtp_port: 25, + from_email: format!("{}@cmtec.se", hostname), + to_email: "cm@cmtec.se".to_string(), + rate_limit_minutes: 30, + }; + let notification_manager = NotificationManager::new(notification_config.clone()); + info!("Notifications: {} -> {}", notification_config.from_email, notification_config.to_email); + + // Auto-discover and create collectors + let mut collectors: Vec> = Vec::new(); + + // SMART collector + let devices = AutoDiscovery::discover_storage_devices().await; + let valid_devices = AutoDiscovery::validate_devices(&devices).await; + if !valid_devices.is_empty() { + let smart_collector = SmartCollector::new(true, 5000, valid_devices.clone()); + collectors.push(Box::new(smart_collector)); + info!("SMART monitoring: {:?}", valid_devices); + } else { + warn!("No storage devices found - SMART monitoring disabled"); + } + + // Service collector + let services = AutoDiscovery::discover_services().await; + let service_list = if !services.is_empty() { + services + } else { + vec!["ssh".to_string()] // Fallback to SSH only + }; + let service_collector = ServiceCollector::new(true, 5000, service_list.clone()); + collectors.push(Box::new(service_collector)); + info!("Service monitoring: {:?}", service_list); + + // Backup collector + let (backup_enabled, restic_repo, backup_service) = + AutoDiscovery::discover_backup_config(&hostname).await; + if backup_enabled { + let backup_collector = BackupCollector::new(true, 30000, restic_repo.clone(), backup_service.clone()); + collectors.push(Box::new(backup_collector)); + info!("Backup monitoring: repo={:?}, service={}", restic_repo, backup_service); + } else { + 
info!("Backup monitoring disabled (no backup system detected)"); + } + + info!("Agent initialized with {} collectors", collectors.len()); + + Ok(Self { + hostname, + zmq_socket: socket, + notification_manager, + collectors, + }) + } + + pub async fn run(&mut self) -> anyhow::Result<()> { + info!("Starting metrics collection..."); + + // Create collection tasks for each collector (unused for now) + let mut _tasks: Vec> = Vec::new(); + + for collector in &self.collectors { + let collector_name = collector.name().to_string(); + let _agent_type = collector.agent_type(); + let interval_duration = collector.collect_interval(); + + info!("{} collector: {}ms interval", collector_name, interval_duration.as_millis()); + + // Clone what we need for the task + let _hostname = self.hostname.clone(); + + // Create the collection task (we'll handle this differently since we can't clone collectors) + // For now, let's create a simpler approach + } + + // For simplicity, let's run a main loop instead of separate tasks + let mut collection_interval = interval(Duration::from_millis(5000)); + + loop { + collection_interval.tick().await; + + // Collect from all collectors + let mut outputs = Vec::new(); + for collector in &self.collectors { + match collector.collect().await { + Ok(output) => { + // Send via ZMQ + if let Err(e) = self.send_metrics(&output.agent_type, &output.data).await { + error!("Failed to send metrics for {}: {}", collector.name(), e); + } + outputs.push(output); + } + Err(e) => { + error!("Collection failed for {}: {}", collector.name(), e); + } + } + } + + // Process status changes after collection loop to avoid borrowing conflicts + for output in outputs { + self.check_status_changes(&output).await; + } + } + } + + async fn send_metrics(&self, agent_type: &AgentType, data: &serde_json::Value) -> anyhow::Result<()> { + let message = serde_json::json!({ + "host": self.hostname, + "agent_type": agent_type, + "data": data, + "timestamp": Utc::now() + }); + + let serialized = serde_json::to_string(&message)?; + self.zmq_socket.send(&serialized, 0)?; + + Ok(()) + } + + async fn check_status_changes(&mut self, output: &crate::collectors::CollectorOutput) { + // Extract status from collector output and check for changes + match output.agent_type { + AgentType::Service => { + if let Some(summary) = output.data.get("summary") { + // Check CPU status + if let Some(cpu_status) = summary.get("cpu_status").and_then(|v| v.as_str()) { + if let Some(change) = self.notification_manager.update_status("system", "cpu", cpu_status) { + self.notification_manager.send_notification(change).await; + } + } + + // Check memory status + if let Some(memory_status) = summary.get("memory_status").and_then(|v| v.as_str()) { + if let Some(change) = self.notification_manager.update_status("system", "memory", memory_status) { + self.notification_manager.send_notification(change).await; + } + } + + // Check services status + if let Some(services_status) = summary.get("services_status").and_then(|v| v.as_str()) { + if let Some(change) = self.notification_manager.update_status("system", "services", services_status) { + self.notification_manager.send_notification(change).await; + } + } + } + } + AgentType::Smart => { + if let Some(status) = output.data.get("status").and_then(|v| v.as_str()) { + let normalized_status = match status { + "HEALTHY" => "ok", + "WARNING" => "warning", + "CRITICAL" => "critical", + _ => "unknown" + }; + if let Some(change) = self.notification_manager.update_status("storage", "smart", 
normalized_status) { + self.notification_manager.send_notification(change).await; + } + } + } + AgentType::Backup => { + if let Some(status) = output.data.get("overall_status") { + let status_str = match status.as_str() { + Some("Healthy") => "ok", + Some("Warning") => "warning", + Some("Failed") => "critical", + _ => "unknown" + }; + if let Some(change) = self.notification_manager.update_status("backup", "overall", status_str) { + self.notification_manager.send_notification(change).await; + } + } + } + } + } +} \ No newline at end of file
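
Note on the wire format: this patch changes what subscribers receive. The deleted MetricsAgent serialized a shared MessageEnvelope with a `hostname` field and a u64 epoch `timestamp`, while the new SimpleAgent::send_metrics builds an ad-hoc JSON object with `host`, `agent_type`, `data`, and `timestamp` (chrono's Utc::now(), which serde renders as an RFC 3339 string). Below is a minimal subscriber sketch, not part of this patch, illustrating the new shape; the connect address, the anyhow error handling, and the use of serde_json::Value are assumptions for illustration.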
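
use zmq::{Context, SocketType};

fn main() -> anyhow::Result<()> {
    let context = Context::new();
    let socket = context.socket(SocketType::SUB)?;
    // The agent binds tcp://0.0.0.0:6130; this connect address is an assumption.
    socket.connect("tcp://127.0.0.1:6130")?;
    socket.set_subscribe(b"")?; // empty prefix: receive every published message

    loop {
        // SimpleAgent::send_metrics sends one JSON object per collector run.
        let frame = socket.recv_bytes(0)?;
        let message: serde_json::Value = serde_json::from_slice(&frame)?;

        // New shape: {"host", "agent_type", "data", "timestamp"}.
        // - agent_type serializes as "Smart" | "Service" | "Backup", serde's
        //   default for unit variants now that AgentType derives Serialize
        // - timestamp is an RFC 3339 string, where the old MessageEnvelope
        //   carried a u64 epoch and named the host field `hostname`
        println!(
            "{} [{}] at {}",
            message["host"], message["agent_type"], message["timestamp"]
        );
    }
}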
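
Any dashboard consumer written against the old MessageEnvelope (field names and the numeric timestamp) would need a corresponding update to deserialize this new format.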