Christoffer Martinsson 2025-10-12 20:29:08 +02:00
parent d08d8f306a
commit d9edcda36c
10 changed files with 263 additions and 1203 deletions

Cargo.lock (generated)

@@ -308,7 +308,6 @@ dependencies = [
"serde_json",
"thiserror",
"tokio",
"toml",
"tracing",
"tracing-appender",
"tracing-subscriber",

agent/Cargo.toml

@@ -12,7 +12,6 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
chrono = { version = "0.4", features = ["serde"] }
thiserror = "1.0"
toml = "0.8"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
tracing-appender = "0.2"

agent/src/agent.rs (deleted)

@@ -1,263 +0,0 @@
use cm_dashboard_shared::envelope::MessageEnvelope;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::time::{interval, Duration};
use tracing::{debug, error, info, warn};
use zmq::{Context, SocketType};
use crate::collectors::{
backup::BackupCollector, service::ServiceCollector, smart::SmartCollector, AgentType,
CollectorOutput,
};
use crate::config::AgentConfig;
use crate::scheduler::{CollectorScheduler, HealthChecker, HealthStatus};
pub struct MetricsAgent {
config: AgentConfig,
scheduler: CollectorScheduler,
health_checker: Option<HealthChecker>,
}
impl MetricsAgent {
pub fn from_config(config: AgentConfig) -> Result<Self, Box<dyn std::error::Error>> {
let mut agent = Self::new(config)?;
agent.initialize_collectors()?;
Ok(agent)
}
pub fn new(config: AgentConfig) -> Result<Self, Box<dyn std::error::Error>> {
Ok(Self {
config,
scheduler: CollectorScheduler::new(),
health_checker: None,
})
}
pub fn initialize_collectors(&mut self) -> Result<(), Box<dyn std::error::Error>> {
info!("Initializing collectors...");
// Create SMART collector
if self.config.collectors.smart.enabled {
let smart_collector = SmartCollector::new(
self.config.collectors.smart.enabled,
self.config.collectors.smart.interval_ms,
self.config.collectors.smart.devices.clone(),
);
self.scheduler.add_collector(Arc::new(smart_collector));
info!("SMART collector initialized");
}
// Create Service collector
if self.config.collectors.service.enabled {
let service_collector = ServiceCollector::new(
self.config.collectors.service.enabled,
self.config.collectors.service.interval_ms,
self.config.collectors.service.services.clone(),
);
self.scheduler.add_collector(Arc::new(service_collector));
info!("Service collector initialized");
}
// Create Backup collector
if self.config.collectors.backup.enabled {
let backup_collector = BackupCollector::new(
self.config.collectors.backup.enabled,
self.config.collectors.backup.interval_ms,
self.config.collectors.backup.restic_repo.clone(),
self.config.collectors.backup.backup_service.clone(),
);
self.scheduler.add_collector(Arc::new(backup_collector));
info!("Backup collector initialized");
}
let enabled_count = self.config.get_enabled_collector_count();
if enabled_count == 0 {
return Err("No collectors are enabled".into());
}
info!("Initialized {} collectors", enabled_count);
Ok(())
}
pub async fn run(&mut self) -> Result<(), Box<dyn std::error::Error>> {
info!(
"Starting metrics agent for host '{}'",
self.config.agent.hostname
);
// Initialize health checker
let stats = self.scheduler.get_stats_handle();
self.health_checker = Some(HealthChecker::new(stats));
// Forward successful collection results to the publisher
let (metrics_tx, metrics_rx) = mpsc::unbounded_channel();
self.scheduler.set_metrics_sender(metrics_tx);
let publisher_task = self.start_publisher_task(metrics_rx)?;
// Start health monitoring task
let health_task = self.start_health_monitoring_task().await?;
// Start the collector scheduler (this will block)
let scheduler_result = self.scheduler.start().await;
// Drop the metrics sender so the publisher can exit cleanly
self.scheduler.clear_metrics_sender();
// Wait for background tasks to complete
if let Err(join_error) = health_task.await {
warn!("Health monitoring task ended unexpectedly: {}", join_error);
}
if let Err(join_error) = publisher_task.await {
warn!("Publisher task ended unexpectedly: {}", join_error);
}
match scheduler_result {
Ok(_) => {
info!("Agent shutdown completed successfully");
Ok(())
}
Err(e) => {
error!("Agent encountered an error: {}", e);
Err(e.into())
}
}
}
fn start_publisher_task(
&self,
mut metrics_rx: mpsc::UnboundedReceiver<CollectorOutput>,
) -> Result<tokio::task::JoinHandle<()>, Box<dyn std::error::Error>> {
let bind_address = format!(
"tcp://{}:{}",
self.config.zmq.bind_address, self.config.zmq.port
);
let send_timeout = self.config.zmq.send_timeout_ms as i32;
let hostname = self.config.agent.hostname.clone();
let handle = tokio::spawn(async move {
let context = Context::new();
let socket = match context.socket(SocketType::PUB) {
Ok(socket) => socket,
Err(error) => {
error!("Failed to create ZMQ PUB socket: {}", error);
return;
}
};
if let Err(error) = socket.set_sndtimeo(send_timeout) {
warn!("Failed to apply ZMQ send timeout: {}", error);
}
if let Err(error) = socket.bind(&bind_address) {
error!(
"Failed to bind ZMQ publisher to {}: {}",
bind_address, error
);
return;
}
info!("ZMQ publisher bound to {}", bind_address);
while let Some(output) = metrics_rx.recv().await {
let CollectorOutput {
agent_type,
data,
timestamp,
} = output;
let envelope_agent_type = match agent_type {
AgentType::Smart => cm_dashboard_shared::envelope::AgentType::Smart,
AgentType::Service => cm_dashboard_shared::envelope::AgentType::Service,
AgentType::Backup => cm_dashboard_shared::envelope::AgentType::Backup,
};
let epoch = timestamp.timestamp();
let epoch_u64 = if epoch < 0 { 0 } else { epoch as u64 };
let envelope = MessageEnvelope {
hostname: hostname.clone(),
agent_type: envelope_agent_type.clone(),
timestamp: epoch_u64,
metrics: data,
};
match serde_json::to_vec(&envelope) {
Ok(serialized) => {
if let Err(error) = socket.send(serialized, 0) {
warn!(
"Failed to publish {:?} metrics: {}",
envelope.agent_type, error
);
} else {
debug!(
"Published {:?} metrics for host {}",
envelope.agent_type, envelope.hostname
);
}
}
Err(error) => {
warn!("Failed to serialize metrics envelope: {}", error);
}
}
}
info!("Metrics publisher task shutting down");
});
Ok(handle)
}
async fn start_health_monitoring_task(
&self,
) -> Result<tokio::task::JoinHandle<()>, Box<dyn std::error::Error>> {
let health_checker = self.health_checker.as_ref().unwrap().clone();
let task = tokio::spawn(async move {
info!("Starting health monitoring task");
let mut health_interval = interval(Duration::from_secs(60)); // Check every minute
loop {
health_interval.tick().await;
match health_checker.check_health().await {
HealthStatus::Healthy => {
debug!("All collectors are healthy");
}
HealthStatus::Degraded {
degraded_collectors,
} => {
warn!("Degraded collectors: {:?}", degraded_collectors);
}
HealthStatus::Unhealthy {
unhealthy_collectors,
degraded_collectors,
} => {
error!(
"Unhealthy collectors: {:?}, Degraded: {:?}",
unhealthy_collectors, degraded_collectors
);
}
}
}
});
Ok(task)
}
pub async fn shutdown(&self) {
info!("Initiating graceful shutdown...");
self.scheduler.shutdown().await;
// ZMQ socket will be dropped automatically
info!("Agent shutdown completed");
}
}
impl Drop for MetricsAgent {
fn drop(&mut self) {
// ZMQ socket will be dropped automatically
}
}
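Note: for context on what consumed these envelopes, a minimal subscriber for the (now removed) MetricsAgent publisher could look like the sketch below. It assumes the dashboard side uses the same zmq and serde_json crates, that MessageEnvelope also derives Deserialize in cm-dashboard-shared, and that the connect address points at the agent's default bind of 0.0.0.0:6130 from config.rs; none of this is part of the commit itself.

use cm_dashboard_shared::envelope::MessageEnvelope;
use zmq::{Context, SocketType};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let context = Context::new();
    let socket = context.socket(SocketType::SUB)?;
    // The publisher sends whole JSON envelopes without a topic prefix, so subscribe to everything.
    socket.set_subscribe(b"")?;
    // Illustrative address; in practice this points at the agent host and configured port.
    socket.connect("tcp://127.0.0.1:6130")?;
    loop {
        let bytes = socket.recv_bytes(0)?;
        // Assumes MessageEnvelope derives Deserialize as well as Serialize.
        let envelope: MessageEnvelope = serde_json::from_slice(&bytes)?;
        println!(
            "{} {:?} @ {}",
            envelope.hostname, envelope.agent_type, envelope.timestamp
        );
    }
}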

agent/src/collectors/mod.rs

@@ -10,7 +10,7 @@ pub mod smart;
pub use error::CollectorError;
#[derive(Debug, Clone)]
#[derive(Debug, Clone, serde::Serialize)]
pub enum AgentType {
Smart,
Service,

agent/src/config.rs (deleted)

@@ -1,325 +0,0 @@
use serde::{Deserialize, Serialize};
use std::path::Path;
use tokio::fs;
use tracing::info;
use crate::collectors::CollectorError;
use crate::discovery::AutoDiscovery;
use crate::notifications::NotificationConfig;
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AgentConfig {
pub agent: AgentSettings,
pub zmq: ZmqSettings,
pub collectors: CollectorsConfig,
#[serde(default)]
pub notifications: NotificationConfig,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AgentSettings {
pub hostname: String,
pub log_level: String,
pub metrics_buffer_size: usize,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ZmqSettings {
pub port: u16,
pub bind_address: String,
pub send_timeout_ms: u64,
pub receive_timeout_ms: u64,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct CollectorsConfig {
pub smart: SmartCollectorConfig,
pub service: ServiceCollectorConfig,
pub backup: BackupCollectorConfig,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct SmartCollectorConfig {
pub enabled: bool,
pub interval_ms: u64,
pub devices: Vec<String>,
pub timeout_ms: u64,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ServiceCollectorConfig {
pub enabled: bool,
pub interval_ms: u64,
pub services: Vec<String>,
pub timeout_ms: u64,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct BackupCollectorConfig {
pub enabled: bool,
pub interval_ms: u64,
pub restic_repo: Option<String>,
pub backup_service: String,
pub timeout_ms: u64,
}
impl Default for AgentConfig {
fn default() -> Self {
Self {
agent: AgentSettings {
hostname: gethostname::gethostname().to_string_lossy().to_string(),
log_level: "info".to_string(),
metrics_buffer_size: 1000,
},
zmq: ZmqSettings {
port: 6130,
bind_address: "0.0.0.0".to_string(),
send_timeout_ms: 5000,
receive_timeout_ms: 5000,
},
collectors: CollectorsConfig {
smart: SmartCollectorConfig {
enabled: true,
interval_ms: 5000,
devices: vec!["nvme0n1".to_string()],
timeout_ms: 30000,
},
service: ServiceCollectorConfig {
enabled: true,
interval_ms: 5000,
services: vec![
"gitea".to_string(),
"immich".to_string(),
"vaultwarden".to_string(),
"unifi".to_string(),
],
timeout_ms: 10000,
},
backup: BackupCollectorConfig {
enabled: true,
interval_ms: 30000,
restic_repo: None,
backup_service: "restic-backup".to_string(),
timeout_ms: 30000,
},
},
}
}
}
impl AgentConfig {
pub async fn load_from_file<P: AsRef<Path>>(path: P) -> Result<Self, CollectorError> {
let content = fs::read_to_string(path)
.await
.map_err(|e| CollectorError::ConfigError {
message: format!("Failed to read config file: {}", e),
})?;
let config: AgentConfig =
toml::from_str(&content).map_err(|e| CollectorError::ConfigError {
message: format!("Failed to parse config file: {}", e),
})?;
config.validate()?;
Ok(config)
}
pub async fn save_to_file<P: AsRef<Path>>(&self, path: P) -> Result<(), CollectorError> {
let content = toml::to_string_pretty(self).map_err(|e| CollectorError::ConfigError {
message: format!("Failed to serialize config: {}", e),
})?;
fs::write(path, content)
.await
.map_err(|e| CollectorError::ConfigError {
message: format!("Failed to write config file: {}", e),
})?;
Ok(())
}
pub fn validate(&self) -> Result<(), CollectorError> {
// Validate ZMQ settings
if self.zmq.port == 0 {
return Err(CollectorError::ConfigError {
message: "ZMQ port cannot be 0".to_string(),
});
}
// Validate collector intervals
if self.collectors.smart.enabled && self.collectors.smart.interval_ms < 1000 {
return Err(CollectorError::ConfigError {
message: "SMART collector interval must be at least 1000ms".to_string(),
});
}
if self.collectors.service.enabled && self.collectors.service.interval_ms < 500 {
return Err(CollectorError::ConfigError {
message: "Service collector interval must be at least 500ms".to_string(),
});
}
if self.collectors.backup.enabled && self.collectors.backup.interval_ms < 5000 {
return Err(CollectorError::ConfigError {
message: "Backup collector interval must be at least 5000ms".to_string(),
});
}
// Validate smart devices
if self.collectors.smart.enabled && self.collectors.smart.devices.is_empty() {
return Err(CollectorError::ConfigError {
message: "SMART collector requires at least one device".to_string(),
});
}
// Validate services
if self.collectors.service.enabled && self.collectors.service.services.is_empty() {
return Err(CollectorError::ConfigError {
message: "Service collector requires at least one service".to_string(),
});
}
// Validate backup configuration
if self.collectors.backup.enabled {
if self.collectors.backup.restic_repo.is_none() {
tracing::warn!("Backup collector enabled but no restic repository configured");
}
if self.collectors.backup.backup_service.is_empty() {
return Err(CollectorError::ConfigError {
message: "Backup collector requires a backup service name".to_string(),
});
}
}
Ok(())
}
pub fn get_enabled_collector_count(&self) -> usize {
let mut count = 0;
if self.collectors.smart.enabled {
count += 1;
}
if self.collectors.service.enabled {
count += 1;
}
if self.collectors.backup.enabled {
count += 1;
}
count
}
pub async fn auto_configure(&mut self) -> Result<(), CollectorError> {
let hostname = &self.agent.hostname.clone();
info!("Auto-configuring agent for host: {}", hostname);
// Auto-detect storage devices
let devices = AutoDiscovery::discover_storage_devices().await;
let valid_devices = AutoDiscovery::validate_devices(&devices).await;
if !valid_devices.is_empty() {
self.collectors.smart.devices = valid_devices;
info!(
"Auto-detected storage devices: {:?}",
self.collectors.smart.devices
);
} else {
info!("No accessible storage devices found, disabling SMART collector");
self.collectors.smart.enabled = false;
}
// Auto-detect services
let services = AutoDiscovery::discover_services().await;
if !services.is_empty() {
self.collectors.service.services = services;
info!(
"Auto-detected services: {:?}",
self.collectors.service.services
);
} else {
info!("No monitorable services found, using minimal service list");
self.collectors.service.services = vec!["ssh".to_string()];
}
// Auto-detect backup configuration
let (backup_enabled, restic_repo, backup_service) =
AutoDiscovery::discover_backup_config(hostname).await;
self.collectors.backup.enabled = backup_enabled;
self.collectors.backup.restic_repo = restic_repo;
self.collectors.backup.backup_service = backup_service;
if backup_enabled {
info!(
"Auto-configured backup monitoring: repo={:?}, service={}",
self.collectors.backup.restic_repo, self.collectors.backup.backup_service
);
} else {
info!("Backup monitoring disabled for this host");
}
// Auto-configure notifications
self.notifications.enabled = true;
self.notifications.from_email = format!("{}@cmtec.se", hostname);
self.notifications.to_email = "cm@cmtec.se".to_string();
info!("Auto-configured notifications: {} -> {}",
self.notifications.from_email, self.notifications.to_email);
// Apply host-specific timing optimizations
self.apply_host_timing_overrides(hostname);
Ok(())
}
fn apply_host_timing_overrides(&mut self, hostname: &str) {
match hostname {
"srv01" => {
// Server host - standard 5 second monitoring
self.collectors.service.interval_ms = 5000;
self.collectors.smart.interval_ms = 5000;
}
"cmbox" | "labbox" | "simonbox" | "steambox" => {
// Workstation hosts - standard 5 second monitoring
self.collectors.smart.interval_ms = 5000;
self.collectors.service.interval_ms = 5000;
}
_ => {
// Unknown host - standard 5 second monitoring
self.collectors.smart.interval_ms = 5000;
self.collectors.service.interval_ms = 5000;
}
}
info!(
"Applied timing overrides for {}: smart={}ms, service={}ms",
hostname, self.collectors.smart.interval_ms, self.collectors.service.interval_ms
);
}
pub fn summary(&self) -> String {
let mut parts = Vec::new();
if self.collectors.smart.enabled {
parts.push(format!(
"SMART({} devices)",
self.collectors.smart.devices.len()
));
}
if self.collectors.service.enabled {
parts.push(format!(
"Services({} monitored)",
self.collectors.service.services.len()
));
}
if self.collectors.backup.enabled {
parts.push("Backup".to_string());
}
if parts.is_empty() {
"No collectors enabled".to_string()
} else {
parts.join(", ")
}
}
}
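Note: for reference, the TOML file that the removed load_from_file/save_to_file pair handled looked roughly like the sketch below, reconstructed from the struct definitions and the Default impl above. Values are illustrative; restic_repo is Option and may be omitted, and the [notifications] table is optional via #[serde(default)].

[agent]
hostname = "srv01"
log_level = "info"
metrics_buffer_size = 1000

[zmq]
port = 6130
bind_address = "0.0.0.0"
send_timeout_ms = 5000
receive_timeout_ms = 5000

[collectors.smart]
enabled = true
interval_ms = 5000
devices = ["nvme0n1"]
timeout_ms = 30000

[collectors.service]
enabled = true
interval_ms = 5000
services = ["gitea", "immich", "vaultwarden", "unifi"]
timeout_ms = 10000

[collectors.backup]
enabled = true
interval_ms = 30000
# restic_repo is optional; leave it out to keep it unset.
backup_service = "restic-backup"
timeout_ms = 30000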

agent/src/discovery.rs

@@ -211,42 +211,21 @@ impl AutoDiscovery {
"unifi",
"wordpress",
"nginx",
"apache2",
"httpd",
"caddy",
// Databases
"postgresql",
"mysql",
"mariadb",
"redis",
"mongodb",
// Monitoring and infrastructure
"smart-metrics-api",
"service-metrics-api",
"backup-metrics-api",
"prometheus",
"grafana",
"influxdb",
// Backup and storage
"restic",
"borg",
"rclone",
"syncthing",
// Container runtimes
"docker",
"podman",
"containerd",
// Network services
"sshd",
"dnsmasq",
"bind9",
"pihole",
// Media services
"plex",
"jellyfin",
"emby",
"sonarr",
"radarr",
];
// Check if service name contains any of our interesting patterns
@@ -255,21 +234,9 @@ impl AutoDiscovery {
.any(|&pattern| service_name.contains(pattern) || pattern.contains(service_name))
}
fn get_host_specific_services(hostname: &str) -> Vec<String> {
match hostname {
"srv01" => vec![
"gitea".to_string(),
"immich".to_string(),
"vaultwarden".to_string(),
"unifi".to_string(),
"smart-metrics-api".to_string(),
"service-metrics-api".to_string(),
"backup-metrics-api".to_string(),
],
"cmbox" | "labbox" | "simonbox" => vec!["docker".to_string(), "sshd".to_string()],
"steambox" => vec!["steam".to_string(), "sshd".to_string()],
_ => vec!["sshd".to_string()],
}
fn get_host_specific_services(_hostname: &str) -> Vec<String> {
// Pure auto-discovery - no hardcoded host-specific services
vec![]
}
fn canonical_service_name(service: &str) -> Option<String> {

agent/src/main.rs

@@ -1,204 +1,66 @@
use anyhow::{anyhow, Result};
use clap::{ArgAction, Parser};
use std::path::PathBuf;
use anyhow::Result;
use clap::Parser;
use tokio::signal;
use tracing::{error, info};
use tracing_subscriber::EnvFilter;
mod agent;
mod collectors;
mod config;
mod discovery;
mod notifications;
mod scheduler;
mod simple_agent;
use agent::MetricsAgent;
use config::AgentConfig;
use simple_agent::SimpleAgent;
#[derive(Parser, Debug)]
#[command(
name = "cm-dashboard-agent",
version,
about = "CM Dashboard ZMQ metrics agent with auto-detection"
)]
#[derive(Parser)]
#[command(name = "cm-dashboard-agent")]
#[command(about = "CM Dashboard metrics agent with auto-detection")]
#[command(version)]
struct Cli {
/// ZMQ port to bind to (default: 6130)
#[arg(long, value_name = "PORT")]
port: Option<u16>,
/// Path to load configuration from
#[arg(long, value_name = "FILE")]
config: Option<PathBuf>,
/// Optional path to persist the resolved configuration
#[arg(long, value_name = "FILE")]
write_config: Option<PathBuf>,
/// Disable SMART metrics collector
#[arg(long, action = ArgAction::SetTrue)]
disable_smart: bool,
/// Disable service metrics collector
#[arg(long, action = ArgAction::SetTrue)]
disable_service: bool,
/// Disable backup metrics collector
#[arg(long, action = ArgAction::SetTrue)]
disable_backup: bool,
/// Skip auto-detection and use minimal defaults
#[arg(long, action = ArgAction::SetTrue)]
no_auto_detect: bool,
/// Show detected configuration and exit
#[arg(long, action = ArgAction::SetTrue)]
show_config: bool,
/// Increase logging verbosity (-v, -vv)
#[arg(short, long, action = ArgAction::Count)]
#[arg(short, long, action = clap::ArgAction::Count)]
verbose: u8,
}
#[tokio::main]
async fn main() -> Result<()> {
let cli = Cli::parse();
init_tracing(cli.verbose)?;
// Start with file-based configuration if requested, otherwise defaults
let mut config = if let Some(path) = cli.config.as_ref() {
AgentConfig::load_from_file(path)
.await
.map_err(|e| anyhow!("Failed to load config from {}: {}", path.display(), e))?
} else {
AgentConfig::default()
};
// Hostname is auto-detected in AgentConfig::default()
// Apply CLI port override
if let Some(port) = cli.port {
config.zmq.port = port;
}
// Run auto-detection unless disabled
if !cli.no_auto_detect {
info!("Auto-detecting system configuration...");
config
.auto_configure()
.await
.map_err(|e| anyhow!("Auto-detection failed: {}", e))?;
} else {
info!("Skipping auto-detection, using minimal defaults");
}
// Apply CLI collector overrides after auto-detection
if cli.disable_smart {
config.collectors.smart.enabled = false;
}
if cli.disable_service {
config.collectors.service.enabled = false;
}
if cli.disable_backup {
config.collectors.backup.enabled = false;
}
if let Some(path) = cli.write_config.as_ref() {
config
.save_to_file(path)
.await
.map_err(|e| anyhow!("Failed to write config to {}: {}", path.display(), e))?;
info!("Persisted configuration to {}", path.display());
}
// Show configuration and exit if requested
if cli.show_config {
println!("Agent Configuration:");
println!(" Hostname: {}", config.agent.hostname);
println!(" ZMQ Port: {}", config.zmq.port);
println!(" Collectors: {}", config.summary());
if config.collectors.smart.enabled {
println!(" SMART Devices: {:?}", config.collectors.smart.devices);
}
if config.collectors.service.enabled {
println!(" Services: {:?}", config.collectors.service.services);
}
if config.collectors.backup.enabled {
println!(" Backup Repo: {:?}", config.collectors.backup.restic_repo);
println!(
" Backup Service: {}",
config.collectors.backup.backup_service
);
}
return Ok(());
}
info!(
"Starting agent for host '{}' on port {} with: {}",
config.agent.hostname,
config.zmq.port,
config.summary()
);
// Build and start the agent
let mut agent =
MetricsAgent::from_config(config).map_err(|e| anyhow!("Failed to create agent: {}", e))?;
// Set up graceful shutdown handling
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("Failed to install SIGTERM handler");
let mut sigint = signal::unix::signal(signal::unix::SignalKind::interrupt())
.expect("Failed to install SIGINT handler");
tokio::select! {
_ = sigterm.recv() => info!("Received SIGTERM"),
_ = sigint.recv() => info!("Received SIGINT"),
}
let _ = shutdown_tx.send(());
});
// Run the agent until shutdown
tokio::select! {
result = agent.run() => {
match result {
Ok(_) => info!("Agent completed successfully"),
Err(e) => error!("Agent error: {}", e),
}
}
_ = shutdown_rx => {
info!("Shutdown signal received");
agent.shutdown().await;
}
}
Ok(())
}
fn init_tracing(verbosity: u8) -> Result<()> {
let level = match verbosity {
// Setup logging
let log_level = match cli.verbose {
0 => "info",
1 => "debug",
_ => "trace",
};
let env_filter = std::env::var("RUST_LOG")
.ok()
.and_then(|value| EnvFilter::try_new(value).ok())
.unwrap_or_else(|| EnvFilter::new(level));
tracing_subscriber::fmt()
.with_env_filter(env_filter)
.with_target(false)
.compact()
.try_init()
.map_err(|err| anyhow!(err))?;
.with_env_filter(EnvFilter::from_default_env().add_directive(log_level.parse()?))
.init();
info!("CM Dashboard Agent starting...");
// Create and run agent
let mut agent = SimpleAgent::new().await?;
// Setup graceful shutdown
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};
// Run agent with graceful shutdown
tokio::select! {
result = agent.run() => {
if let Err(e) = result {
error!("Agent error: {}", e);
return Err(e);
}
}
_ = ctrl_c => {
info!("Shutdown signal received");
}
}
info!("Agent shutdown complete");
Ok(())
}
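Note: the removed main() also installed a SIGTERM handler alongside SIGINT, which matters when the agent runs under systemd; the simplified version above only awaits Ctrl+C. If that behaviour is still wanted, a shutdown future along these lines (a Unix-only sketch, using the same tokio::signal API as the deleted code) could replace the ctrl_c block:

use tokio::signal;

// Resolves on either Ctrl+C (SIGINT) or SIGTERM, mirroring the removed
// shutdown handling; sketch only, with the same expect()-style error
// handling as the surrounding code.
async fn shutdown_signal() {
    let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())
        .expect("failed to install SIGTERM handler");
    tokio::select! {
        _ = signal::ctrl_c() => {},
        _ = sigterm.recv() => {},
    }
}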

agent/src/notifications.rs

@@ -1,9 +1,10 @@
use std::collections::HashMap;
use chrono::{DateTime, Utc};
use lettre::{Message, SmtpTransport, Transport};
use serde::{Deserialize, Serialize};
use tracing::{info, error, warn};
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NotificationConfig {
pub enabled: bool,
pub smtp_host: String,

agent/src/scheduler.rs (deleted)

@@ -1,393 +0,0 @@
use futures::stream::{FuturesUnordered, StreamExt};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, RwLock};
use tokio::time::{interval, Instant};
use tracing::{debug, error, info, warn};
use crate::collectors::{Collector, CollectorError, CollectorOutput};
pub struct CollectorScheduler {
collectors: Vec<Arc<dyn Collector>>,
sender: mpsc::UnboundedSender<SchedulerEvent>,
receiver: mpsc::UnboundedReceiver<SchedulerEvent>,
stats: Arc<RwLock<SchedulerStats>>,
metrics_sender: Option<mpsc::UnboundedSender<CollectorOutput>>,
}
#[derive(Debug)]
pub enum SchedulerEvent {
CollectionResult {
collector_name: String,
result: Result<CollectorOutput, CollectorError>,
duration: Duration,
},
Shutdown,
}
#[derive(Debug, Default, Clone)]
pub struct SchedulerStats {
pub total_collections: u64,
pub successful_collections: u64,
pub failed_collections: u64,
pub collector_stats: HashMap<String, CollectorStats>,
}
#[derive(Debug, Default, Clone)]
pub struct CollectorStats {
pub total_collections: u64,
pub successful_collections: u64,
pub failed_collections: u64,
pub last_success: Option<Instant>,
pub last_failure: Option<Instant>,
pub average_duration_ms: f64,
pub consecutive_failures: u32,
}
impl CollectorScheduler {
pub fn new() -> Self {
let (sender, receiver) = mpsc::unbounded_channel();
Self {
collectors: Vec::new(),
sender,
receiver,
stats: Arc::new(RwLock::new(SchedulerStats::default())),
metrics_sender: None,
}
}
pub fn set_metrics_sender(&mut self, sender: mpsc::UnboundedSender<CollectorOutput>) {
self.metrics_sender = Some(sender);
}
pub fn clear_metrics_sender(&mut self) {
self.metrics_sender = None;
}
pub fn add_collector(&mut self, collector: Arc<dyn Collector>) {
if collector.is_enabled() {
info!(
"Adding collector '{}' [{}] with interval {:?}",
collector.name(),
collector.agent_type().as_str(),
collector.collect_interval()
);
if collector.requires_root() {
debug!("Collector '{}' is flagged as root-only", collector.name());
}
self.collectors.push(collector);
} else {
info!("Skipping disabled collector '{}'", collector.name());
}
}
pub async fn start(&mut self) -> Result<(), CollectorError> {
if self.collectors.is_empty() {
return Err(CollectorError::ConfigError {
message: "No enabled collectors configured".to_string(),
});
}
info!(
"Starting scheduler with {} collectors",
self.collectors.len()
);
// Start collection tasks for each collector
let mut collection_tasks = FuturesUnordered::new();
for collector in self.collectors.clone() {
let sender = self.sender.clone();
let stats = self.stats.clone();
let task =
tokio::spawn(async move { Self::run_collector(collector, sender, stats).await });
collection_tasks.push(task);
}
// Main event loop
loop {
tokio::select! {
// Handle collection results
Some(event) = self.receiver.recv() => {
match event {
SchedulerEvent::CollectionResult { collector_name, result, duration } => {
self.handle_collection_result(&collector_name, result, duration).await;
}
SchedulerEvent::Shutdown => {
info!("Scheduler shutdown requested");
break;
}
}
}
// Handle task completion (shouldn't happen in normal operation)
Some(result) = collection_tasks.next() => {
match result {
Ok(_) => warn!("Collection task completed unexpectedly"),
Err(e) => error!("Collection task failed: {}", e),
}
}
// If all tasks are done and no more events, break
else => {
warn!("All collection tasks completed, shutting down scheduler");
break;
}
}
}
Ok(())
}
async fn run_collector(
collector: Arc<dyn Collector>,
sender: mpsc::UnboundedSender<SchedulerEvent>,
_stats: Arc<RwLock<SchedulerStats>>,
) {
let collector_name = collector.name().to_string();
let mut interval_timer = interval(collector.collect_interval());
interval_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
info!("Starting collection loop for '{}'", collector_name);
loop {
interval_timer.tick().await;
debug!("Running collection for '{}'", collector_name);
let start_time = Instant::now();
match collector.collect().await {
Ok(output) => {
let duration = start_time.elapsed();
debug!(
"Collection '{}' completed in {:?}",
collector_name, duration
);
if let Err(e) = sender.send(SchedulerEvent::CollectionResult {
collector_name: collector_name.clone(),
result: Ok(output),
duration,
}) {
error!(
"Failed to send collection result for '{}': {}",
collector_name, e
);
break;
}
}
Err(error) => {
let duration = start_time.elapsed();
warn!(
"Collection '{}' failed after {:?}: {}",
collector_name, duration, error
);
if let Err(e) = sender.send(SchedulerEvent::CollectionResult {
collector_name: collector_name.clone(),
result: Err(error),
duration,
}) {
error!(
"Failed to send collection error for '{}': {}",
collector_name, e
);
break;
}
}
}
}
warn!("Collection loop for '{}' ended", collector_name);
}
async fn handle_collection_result(
&self,
collector_name: &str,
result: Result<CollectorOutput, CollectorError>,
duration: Duration,
) {
let publish_output = match &result {
Ok(output) => Some(output.clone()),
Err(_) => None,
};
{
let mut stats = self.stats.write().await;
stats.total_collections += 1;
match &result {
Ok(_) => {
stats.successful_collections += 1;
}
Err(_) => {
stats.failed_collections += 1;
}
}
}
// Handle collector-specific stats
{
let mut stats = self.stats.write().await;
let duration_ms = duration.as_millis() as f64;
let collector_stats = stats
.collector_stats
.entry(collector_name.to_string())
.or_default();
collector_stats.total_collections += 1;
if collector_stats.average_duration_ms == 0.0 {
collector_stats.average_duration_ms = duration_ms;
} else {
// Exponentially weighted moving average (90% previous value, 10% new sample)
collector_stats.average_duration_ms =
(collector_stats.average_duration_ms * 0.9) + (duration_ms * 0.1);
}
match &result {
Ok(output) => {
collector_stats.successful_collections += 1;
collector_stats.last_success = Some(Instant::now());
collector_stats.consecutive_failures = 0;
let metrics_count = match &output.data {
serde_json::Value::Object(map) => map.len(),
serde_json::Value::Array(values) => values.len(),
_ => 1,
};
debug!(
"Collector '{}' [{}] successful at {} ({} metrics)",
collector_name,
output.agent_type.as_str(),
output.timestamp,
metrics_count
);
}
Err(error) => {
collector_stats.failed_collections += 1;
collector_stats.last_failure = Some(Instant::now());
collector_stats.consecutive_failures += 1;
warn!("Collection '{}' failed: {}", collector_name, error);
// Log warning for consecutive failures
if collector_stats.consecutive_failures >= 5 {
error!(
"Collector '{}' has {} consecutive failures",
collector_name, collector_stats.consecutive_failures
);
}
}
}
}
if let (Some(sender), Some(output)) = (&self.metrics_sender, publish_output) {
if let Err(error) = sender.send(output) {
warn!("Metrics channel send error: {}", error);
}
}
}
pub fn get_stats_handle(&self) -> Arc<RwLock<SchedulerStats>> {
self.stats.clone()
}
pub async fn shutdown(&self) {
info!("Requesting scheduler shutdown");
if let Err(e) = self.sender.send(SchedulerEvent::Shutdown) {
error!("Failed to send shutdown event: {}", e);
}
}
}
impl Default for CollectorScheduler {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone)]
pub struct HealthChecker {
stats: Arc<RwLock<SchedulerStats>>,
max_consecutive_failures: u32,
max_failure_rate: f64,
}
impl HealthChecker {
pub fn new(stats: Arc<RwLock<SchedulerStats>>) -> Self {
Self {
stats,
max_consecutive_failures: 10,
max_failure_rate: 0.5, // 50% failure rate threshold
}
}
pub async fn check_health(&self) -> HealthStatus {
let stats = self.stats.read().await;
let mut unhealthy_collectors = Vec::new();
let mut degraded_collectors = Vec::new();
for (name, collector_stats) in &stats.collector_stats {
// Check consecutive failures
if collector_stats.consecutive_failures >= self.max_consecutive_failures {
unhealthy_collectors.push(name.clone());
continue;
}
// Check failure rate
if collector_stats.total_collections > 10 {
let failure_rate = collector_stats.failed_collections as f64
/ collector_stats.total_collections as f64;
if failure_rate >= self.max_failure_rate {
degraded_collectors.push(name.clone());
}
}
// Check if collector hasn't succeeded recently
if let Some(last_success) = collector_stats.last_success {
if last_success.elapsed() > Duration::from_secs(300) {
// 5 minutes
degraded_collectors.push(name.clone());
}
} else if collector_stats.total_collections > 5 {
// No successful collections after several attempts
unhealthy_collectors.push(name.clone());
}
}
if !unhealthy_collectors.is_empty() {
HealthStatus::Unhealthy {
unhealthy_collectors,
degraded_collectors,
}
} else if !degraded_collectors.is_empty() {
HealthStatus::Degraded {
degraded_collectors,
}
} else {
HealthStatus::Healthy
}
}
}
#[derive(Debug, Clone)]
pub enum HealthStatus {
Healthy,
Degraded {
degraded_collectors: Vec<String>,
},
Unhealthy {
unhealthy_collectors: Vec<String>,
degraded_collectors: Vec<String>,
},
}
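Note: the average_duration_ms update in handle_collection_result is an exponentially weighted moving average (90% of the running value, 10% of each new sample), seeded with the first sample, rather than a simple moving average. A standalone sketch of the same update rule, with illustrative sample values:

// Same update rule as handle_collection_result: seed with the first sample,
// then blend 90% of the running average with 10% of each new sample.
fn update_average_ms(current: f64, sample: f64) -> f64 {
    if current == 0.0 {
        sample
    } else {
        current * 0.9 + sample * 0.1
    }
}

fn main() {
    let mut avg = 0.0;
    for sample in [120.0, 80.0, 200.0, 95.0] {
        avg = update_average_ms(avg, sample);
        println!("sample={sample:>5.1} ms  avg={avg:>6.1} ms");
    }
}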

agent/src/simple_agent.rs (new file, 213 lines)

@@ -0,0 +1,213 @@
use std::time::Duration;
use chrono::Utc;
use gethostname::gethostname;
use tokio::time::interval;
use tracing::{info, error, warn};
use zmq::{Context, Socket, SocketType};
use crate::collectors::{
backup::BackupCollector,
service::ServiceCollector,
smart::SmartCollector,
Collector, AgentType
};
use crate::discovery::AutoDiscovery;
use crate::notifications::{NotificationManager, NotificationConfig};
pub struct SimpleAgent {
hostname: String,
zmq_socket: Socket,
notification_manager: NotificationManager,
collectors: Vec<Box<dyn Collector + Send + Sync>>,
}
impl SimpleAgent {
pub async fn new() -> anyhow::Result<Self> {
let hostname = gethostname().to_string_lossy().to_string();
info!("Starting CM Dashboard Agent on {}", hostname);
// Setup ZMQ
let context = Context::new();
let socket = context.socket(SocketType::PUB)?;
socket.bind("tcp://0.0.0.0:6130")?;
info!("ZMQ publisher bound to tcp://0.0.0.0:6130");
// Setup notifications
let notification_config = NotificationConfig {
enabled: true,
smtp_host: "localhost".to_string(),
smtp_port: 25,
from_email: format!("{}@cmtec.se", hostname),
to_email: "cm@cmtec.se".to_string(),
rate_limit_minutes: 30,
};
let notification_manager = NotificationManager::new(notification_config.clone());
info!("Notifications: {} -> {}", notification_config.from_email, notification_config.to_email);
// Auto-discover and create collectors
let mut collectors: Vec<Box<dyn Collector + Send + Sync>> = Vec::new();
// SMART collector
let devices = AutoDiscovery::discover_storage_devices().await;
let valid_devices = AutoDiscovery::validate_devices(&devices).await;
if !valid_devices.is_empty() {
let smart_collector = SmartCollector::new(true, 5000, valid_devices.clone());
collectors.push(Box::new(smart_collector));
info!("SMART monitoring: {:?}", valid_devices);
} else {
warn!("No storage devices found - SMART monitoring disabled");
}
// Service collector
let services = AutoDiscovery::discover_services().await;
let service_list = if !services.is_empty() {
services
} else {
vec!["ssh".to_string()] // Fallback to SSH only
};
let service_collector = ServiceCollector::new(true, 5000, service_list.clone());
collectors.push(Box::new(service_collector));
info!("Service monitoring: {:?}", service_list);
// Backup collector
let (backup_enabled, restic_repo, backup_service) =
AutoDiscovery::discover_backup_config(&hostname).await;
if backup_enabled {
let backup_collector = BackupCollector::new(true, 30000, restic_repo.clone(), backup_service.clone());
collectors.push(Box::new(backup_collector));
info!("Backup monitoring: repo={:?}, service={}", restic_repo, backup_service);
} else {
info!("Backup monitoring disabled (no backup system detected)");
}
info!("Agent initialized with {} collectors", collectors.len());
Ok(Self {
hostname,
zmq_socket: socket,
notification_manager,
collectors,
})
}
pub async fn run(&mut self) -> anyhow::Result<()> {
info!("Starting metrics collection...");
// Create collection tasks for each collector (unused for now)
let mut _tasks: Vec<tokio::task::JoinHandle<()>> = Vec::new();
for collector in &self.collectors {
let collector_name = collector.name().to_string();
let _agent_type = collector.agent_type();
let interval_duration = collector.collect_interval();
info!("{} collector: {}ms interval", collector_name, interval_duration.as_millis());
// Clone what we need for the task
let _hostname = self.hostname.clone();
// Create the collection task (we'll handle this differently since we can't clone collectors)
// For now, let's create a simpler approach
}
// For simplicity, let's run a main loop instead of separate tasks
let mut collection_interval = interval(Duration::from_millis(5000));
loop {
collection_interval.tick().await;
// Collect from all collectors
let mut outputs = Vec::new();
for collector in &self.collectors {
match collector.collect().await {
Ok(output) => {
// Send via ZMQ
if let Err(e) = self.send_metrics(&output.agent_type, &output.data).await {
error!("Failed to send metrics for {}: {}", collector.name(), e);
}
outputs.push(output);
}
Err(e) => {
error!("Collection failed for {}: {}", collector.name(), e);
}
}
}
// Process status changes after collection loop to avoid borrowing conflicts
for output in outputs {
self.check_status_changes(&output).await;
}
}
}
async fn send_metrics(&self, agent_type: &AgentType, data: &serde_json::Value) -> anyhow::Result<()> {
let message = serde_json::json!({
"host": self.hostname,
"agent_type": agent_type,
"data": data,
"timestamp": Utc::now()
});
let serialized = serde_json::to_string(&message)?;
self.zmq_socket.send(&serialized, 0)?;
Ok(())
}
async fn check_status_changes(&mut self, output: &crate::collectors::CollectorOutput) {
// Extract status from collector output and check for changes
match output.agent_type {
AgentType::Service => {
if let Some(summary) = output.data.get("summary") {
// Check CPU status
if let Some(cpu_status) = summary.get("cpu_status").and_then(|v| v.as_str()) {
if let Some(change) = self.notification_manager.update_status("system", "cpu", cpu_status) {
self.notification_manager.send_notification(change).await;
}
}
// Check memory status
if let Some(memory_status) = summary.get("memory_status").and_then(|v| v.as_str()) {
if let Some(change) = self.notification_manager.update_status("system", "memory", memory_status) {
self.notification_manager.send_notification(change).await;
}
}
// Check services status
if let Some(services_status) = summary.get("services_status").and_then(|v| v.as_str()) {
if let Some(change) = self.notification_manager.update_status("system", "services", services_status) {
self.notification_manager.send_notification(change).await;
}
}
}
}
AgentType::Smart => {
if let Some(status) = output.data.get("status").and_then(|v| v.as_str()) {
let normalized_status = match status {
"HEALTHY" => "ok",
"WARNING" => "warning",
"CRITICAL" => "critical",
_ => "unknown"
};
if let Some(change) = self.notification_manager.update_status("storage", "smart", normalized_status) {
self.notification_manager.send_notification(change).await;
}
}
}
AgentType::Backup => {
if let Some(status) = output.data.get("overall_status") {
let status_str = match status.as_str() {
Some("Healthy") => "ok",
Some("Warning") => "warning",
Some("Failed") => "critical",
_ => "unknown"
};
if let Some(change) = self.notification_manager.update_status("backup", "overall", status_str) {
self.notification_manager.send_notification(change).await;
}
}
}
}
}
}
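Note: with the Serialize derive added to AgentType in collectors/mod.rs and chrono's serde support, a message published by send_metrics serializes to JSON of roughly the shape below. Values are illustrative; unit enum variants serialize as their names, DateTime<Utc> serializes as an RFC 3339 timestamp, and data is whatever the collector produced.

{
  "host": "srv01",
  "agent_type": "Smart",
  "data": { "status": "HEALTHY" },
  "timestamp": "2025-10-12T18:29:08.123456789Z"
}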