Implement per-service disk usage monitoring
Replaced system-wide disk usage with accurate per-service tracking by scanning service-specific directories. Services like sshd now correctly show minimal disk usage instead of misleading system totals. - Rename storage widget and add drive capacity/usage columns - Move host display to main dashboard title for cleaner layout - Replace separate alert displays with color-coded row highlighting - Add per-service disk usage collection using du command - Update services widget formatting to handle small disk values - Restructure into workspace with dedicated agent and dashboard packages
This commit is contained in:
@@ -6,13 +6,18 @@ edition = "2021"
|
||||
[dependencies]
|
||||
cm-dashboard-shared = { path = "../shared" }
|
||||
anyhow = "1.0"
|
||||
async-trait = "0.1"
|
||||
clap = { version = "4.0", features = ["derive"] }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
thiserror = "1.0"
|
||||
toml = "0.8"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
|
||||
tracing-appender = "0.2"
|
||||
zmq = "0.10"
|
||||
tokio = { version = "1.0", features = ["full"] }
|
||||
tokio = { version = "1.0", features = ["full", "process"] }
|
||||
futures = "0.3"
|
||||
rand = "0.8"
|
||||
gethostname = "0.4"
|
||||
|
||||
263
agent/src/agent.rs
Normal file
263
agent/src/agent.rs
Normal file
@@ -0,0 +1,263 @@
|
||||
use cm_dashboard_shared::envelope::MessageEnvelope;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::time::{interval, Duration};
|
||||
use tracing::{debug, error, info, warn};
|
||||
use zmq::{Context, SocketType};
|
||||
|
||||
use crate::collectors::{
|
||||
backup::BackupCollector, service::ServiceCollector, smart::SmartCollector, AgentType,
|
||||
CollectorOutput,
|
||||
};
|
||||
use crate::config::AgentConfig;
|
||||
use crate::scheduler::{CollectorScheduler, HealthChecker, HealthStatus};
|
||||
|
||||
pub struct MetricsAgent {
|
||||
config: AgentConfig,
|
||||
scheduler: CollectorScheduler,
|
||||
health_checker: Option<HealthChecker>,
|
||||
}
|
||||
|
||||
impl MetricsAgent {
|
||||
pub fn from_config(config: AgentConfig) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
let mut agent = Self::new(config)?;
|
||||
agent.initialize_collectors()?;
|
||||
Ok(agent)
|
||||
}
|
||||
|
||||
pub fn new(config: AgentConfig) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
Ok(Self {
|
||||
config,
|
||||
scheduler: CollectorScheduler::new(),
|
||||
health_checker: None,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn initialize_collectors(&mut self) -> Result<(), Box<dyn std::error::Error>> {
|
||||
info!("Initializing collectors...");
|
||||
|
||||
// Create SMART collector
|
||||
if self.config.collectors.smart.enabled {
|
||||
let smart_collector = SmartCollector::new(
|
||||
self.config.collectors.smart.enabled,
|
||||
self.config.collectors.smart.interval_ms,
|
||||
self.config.collectors.smart.devices.clone(),
|
||||
);
|
||||
self.scheduler.add_collector(Arc::new(smart_collector));
|
||||
info!("SMART collector initialized");
|
||||
}
|
||||
|
||||
// Create Service collector
|
||||
if self.config.collectors.service.enabled {
|
||||
let service_collector = ServiceCollector::new(
|
||||
self.config.collectors.service.enabled,
|
||||
self.config.collectors.service.interval_ms,
|
||||
self.config.collectors.service.services.clone(),
|
||||
);
|
||||
self.scheduler.add_collector(Arc::new(service_collector));
|
||||
info!("Service collector initialized");
|
||||
}
|
||||
|
||||
// Create Backup collector
|
||||
if self.config.collectors.backup.enabled {
|
||||
let backup_collector = BackupCollector::new(
|
||||
self.config.collectors.backup.enabled,
|
||||
self.config.collectors.backup.interval_ms,
|
||||
self.config.collectors.backup.restic_repo.clone(),
|
||||
self.config.collectors.backup.backup_service.clone(),
|
||||
);
|
||||
self.scheduler.add_collector(Arc::new(backup_collector));
|
||||
info!("Backup collector initialized");
|
||||
}
|
||||
|
||||
let enabled_count = self.config.get_enabled_collector_count();
|
||||
if enabled_count == 0 {
|
||||
return Err("No collectors are enabled".into());
|
||||
}
|
||||
|
||||
info!("Initialized {} collectors", enabled_count);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn run(&mut self) -> Result<(), Box<dyn std::error::Error>> {
|
||||
info!(
|
||||
"Starting metrics agent for host '{}'",
|
||||
self.config.agent.hostname
|
||||
);
|
||||
|
||||
// Initialize health checker
|
||||
let stats = self.scheduler.get_stats_handle();
|
||||
self.health_checker = Some(HealthChecker::new(stats));
|
||||
|
||||
// Forward successful collection results to the publisher
|
||||
let (metrics_tx, metrics_rx) = mpsc::unbounded_channel();
|
||||
self.scheduler.set_metrics_sender(metrics_tx);
|
||||
let publisher_task = self.start_publisher_task(metrics_rx)?;
|
||||
|
||||
// Start health monitoring task
|
||||
let health_task = self.start_health_monitoring_task().await?;
|
||||
|
||||
// Start the collector scheduler (this will block)
|
||||
let scheduler_result = self.scheduler.start().await;
|
||||
|
||||
// Drop the metrics sender so the publisher can exit cleanly
|
||||
self.scheduler.clear_metrics_sender();
|
||||
|
||||
// Wait for background tasks to complete
|
||||
if let Err(join_error) = health_task.await {
|
||||
warn!("Health monitoring task ended unexpectedly: {}", join_error);
|
||||
}
|
||||
|
||||
if let Err(join_error) = publisher_task.await {
|
||||
warn!("Publisher task ended unexpectedly: {}", join_error);
|
||||
}
|
||||
|
||||
match scheduler_result {
|
||||
Ok(_) => {
|
||||
info!("Agent shutdown completed successfully");
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Agent encountered an error: {}", e);
|
||||
Err(e.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn start_publisher_task(
|
||||
&self,
|
||||
mut metrics_rx: mpsc::UnboundedReceiver<CollectorOutput>,
|
||||
) -> Result<tokio::task::JoinHandle<()>, Box<dyn std::error::Error>> {
|
||||
let bind_address = format!(
|
||||
"tcp://{}:{}",
|
||||
self.config.zmq.bind_address, self.config.zmq.port
|
||||
);
|
||||
let send_timeout = self.config.zmq.send_timeout_ms as i32;
|
||||
let hostname = self.config.agent.hostname.clone();
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
let context = Context::new();
|
||||
|
||||
let socket = match context.socket(SocketType::PUB) {
|
||||
Ok(socket) => socket,
|
||||
Err(error) => {
|
||||
error!("Failed to create ZMQ PUB socket: {}", error);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(error) = socket.set_sndtimeo(send_timeout) {
|
||||
warn!("Failed to apply ZMQ send timeout: {}", error);
|
||||
}
|
||||
|
||||
if let Err(error) = socket.bind(&bind_address) {
|
||||
error!(
|
||||
"Failed to bind ZMQ publisher to {}: {}",
|
||||
bind_address, error
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
info!("ZMQ publisher bound to {}", bind_address);
|
||||
|
||||
while let Some(output) = metrics_rx.recv().await {
|
||||
let CollectorOutput {
|
||||
agent_type,
|
||||
data,
|
||||
timestamp,
|
||||
} = output;
|
||||
|
||||
let envelope_agent_type = match agent_type {
|
||||
AgentType::Smart => cm_dashboard_shared::envelope::AgentType::Smart,
|
||||
AgentType::Service => cm_dashboard_shared::envelope::AgentType::Service,
|
||||
AgentType::Backup => cm_dashboard_shared::envelope::AgentType::Backup,
|
||||
};
|
||||
|
||||
let epoch = timestamp.timestamp();
|
||||
let epoch_u64 = if epoch < 0 { 0 } else { epoch as u64 };
|
||||
|
||||
let envelope = MessageEnvelope {
|
||||
hostname: hostname.clone(),
|
||||
agent_type: envelope_agent_type.clone(),
|
||||
timestamp: epoch_u64,
|
||||
metrics: data,
|
||||
};
|
||||
|
||||
match serde_json::to_vec(&envelope) {
|
||||
Ok(serialized) => {
|
||||
if let Err(error) = socket.send(serialized, 0) {
|
||||
warn!(
|
||||
"Failed to publish {:?} metrics: {}",
|
||||
envelope.agent_type, error
|
||||
);
|
||||
} else {
|
||||
debug!(
|
||||
"Published {:?} metrics for host {}",
|
||||
envelope.agent_type, envelope.hostname
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(error) => {
|
||||
warn!("Failed to serialize metrics envelope: {}", error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("Metrics publisher task shutting down");
|
||||
});
|
||||
|
||||
Ok(handle)
|
||||
}
|
||||
|
||||
async fn start_health_monitoring_task(
|
||||
&self,
|
||||
) -> Result<tokio::task::JoinHandle<()>, Box<dyn std::error::Error>> {
|
||||
let health_checker = self.health_checker.as_ref().unwrap().clone();
|
||||
|
||||
let task = tokio::spawn(async move {
|
||||
info!("Starting health monitoring task");
|
||||
let mut health_interval = interval(Duration::from_secs(60)); // Check every minute
|
||||
|
||||
loop {
|
||||
health_interval.tick().await;
|
||||
|
||||
match health_checker.check_health().await {
|
||||
HealthStatus::Healthy => {
|
||||
debug!("All collectors are healthy");
|
||||
}
|
||||
HealthStatus::Degraded {
|
||||
degraded_collectors,
|
||||
} => {
|
||||
warn!("Degraded collectors: {:?}", degraded_collectors);
|
||||
}
|
||||
HealthStatus::Unhealthy {
|
||||
unhealthy_collectors,
|
||||
degraded_collectors,
|
||||
} => {
|
||||
error!(
|
||||
"Unhealthy collectors: {:?}, Degraded: {:?}",
|
||||
unhealthy_collectors, degraded_collectors
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(task)
|
||||
}
|
||||
|
||||
pub async fn shutdown(&self) {
|
||||
info!("Initiating graceful shutdown...");
|
||||
self.scheduler.shutdown().await;
|
||||
|
||||
// ZMQ socket will be dropped automatically
|
||||
|
||||
info!("Agent shutdown completed");
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for MetricsAgent {
|
||||
fn drop(&mut self) {
|
||||
// ZMQ socket will be dropped automatically
|
||||
}
|
||||
}
|
||||
388
agent/src/collectors/backup.rs
Normal file
388
agent/src/collectors/backup.rs
Normal file
@@ -0,0 +1,388 @@
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::json;
|
||||
use std::process::Stdio;
|
||||
use std::time::Duration;
|
||||
use tokio::process::Command;
|
||||
use tokio::time::timeout;
|
||||
|
||||
use super::{AgentType, Collector, CollectorError, CollectorOutput};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct BackupCollector {
|
||||
pub enabled: bool,
|
||||
pub interval: Duration,
|
||||
pub restic_repo: Option<String>,
|
||||
pub backup_service: String,
|
||||
pub timeout_ms: u64,
|
||||
}
|
||||
|
||||
impl BackupCollector {
|
||||
pub fn new(
|
||||
enabled: bool,
|
||||
interval_ms: u64,
|
||||
restic_repo: Option<String>,
|
||||
backup_service: String,
|
||||
) -> Self {
|
||||
Self {
|
||||
enabled,
|
||||
interval: Duration::from_millis(interval_ms),
|
||||
restic_repo,
|
||||
backup_service,
|
||||
timeout_ms: 30000, // 30 second timeout for backup operations
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_restic_snapshots(&self) -> Result<ResticStats, CollectorError> {
|
||||
let repo = self
|
||||
.restic_repo
|
||||
.as_ref()
|
||||
.ok_or_else(|| CollectorError::ConfigError {
|
||||
message: "No restic repository configured".to_string(),
|
||||
})?;
|
||||
|
||||
let timeout_duration = Duration::from_millis(self.timeout_ms);
|
||||
|
||||
// Get restic snapshots
|
||||
let output = timeout(
|
||||
timeout_duration,
|
||||
Command::new("restic")
|
||||
.args(["-r", repo, "snapshots", "--json"])
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.output(),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| CollectorError::Timeout {
|
||||
duration_ms: self.timeout_ms,
|
||||
})?
|
||||
.map_err(|e| CollectorError::CommandFailed {
|
||||
command: format!("restic -r {} snapshots --json", repo),
|
||||
message: e.to_string(),
|
||||
})?;
|
||||
|
||||
if !output.status.success() {
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
return Err(CollectorError::CommandFailed {
|
||||
command: format!("restic -r {} snapshots --json", repo),
|
||||
message: stderr.to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
let stdout = String::from_utf8_lossy(&output.stdout);
|
||||
let snapshots: Vec<ResticSnapshot> =
|
||||
serde_json::from_str(&stdout).map_err(|e| CollectorError::ParseError {
|
||||
message: format!("Failed to parse restic snapshots: {}", e),
|
||||
})?;
|
||||
|
||||
// Get repository stats
|
||||
let stats_output = timeout(
|
||||
timeout_duration,
|
||||
Command::new("restic")
|
||||
.args(["-r", repo, "stats", "--json"])
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.output(),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| CollectorError::Timeout {
|
||||
duration_ms: self.timeout_ms,
|
||||
})?
|
||||
.map_err(|e| CollectorError::CommandFailed {
|
||||
command: format!("restic -r {} stats --json", repo),
|
||||
message: e.to_string(),
|
||||
})?;
|
||||
|
||||
let repo_size_gb = if stats_output.status.success() {
|
||||
let stats_stdout = String::from_utf8_lossy(&stats_output.stdout);
|
||||
let stats: Result<ResticStats, _> = serde_json::from_str(&stats_stdout);
|
||||
stats
|
||||
.ok()
|
||||
.map(|s| s.total_size as f32 / (1024.0 * 1024.0 * 1024.0))
|
||||
.unwrap_or(0.0)
|
||||
} else {
|
||||
0.0
|
||||
};
|
||||
|
||||
// Find most recent snapshot
|
||||
let last_success = snapshots.iter().map(|s| s.time).max();
|
||||
|
||||
Ok(ResticStats {
|
||||
total_size: (repo_size_gb * 1024.0 * 1024.0 * 1024.0) as u64,
|
||||
snapshot_count: snapshots.len() as u32,
|
||||
last_success,
|
||||
})
|
||||
}
|
||||
|
||||
async fn get_backup_service_status(&self) -> Result<BackupServiceData, CollectorError> {
|
||||
let timeout_duration = Duration::from_millis(self.timeout_ms);
|
||||
|
||||
// Get systemctl status for backup service
|
||||
let status_output = timeout(
|
||||
timeout_duration,
|
||||
Command::new("systemctl")
|
||||
.args([
|
||||
"show",
|
||||
&self.backup_service,
|
||||
"--property=ActiveState,SubState,MainPID",
|
||||
])
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.output(),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| CollectorError::Timeout {
|
||||
duration_ms: self.timeout_ms,
|
||||
})?
|
||||
.map_err(|e| CollectorError::CommandFailed {
|
||||
command: format!("systemctl show {}", self.backup_service),
|
||||
message: e.to_string(),
|
||||
})?;
|
||||
|
||||
let enabled = if status_output.status.success() {
|
||||
let status_stdout = String::from_utf8_lossy(&status_output.stdout);
|
||||
status_stdout.contains("ActiveState=active")
|
||||
|| status_stdout.contains("SubState=running")
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
// Check for backup timer or service logs for last message
|
||||
let last_message = self.get_last_backup_log_message().await.ok();
|
||||
|
||||
// Check for pending backup jobs (simplified - could check systemd timers)
|
||||
let pending_jobs = 0; // TODO: Implement proper pending job detection
|
||||
|
||||
Ok(BackupServiceData {
|
||||
enabled,
|
||||
pending_jobs,
|
||||
last_message,
|
||||
})
|
||||
}
|
||||
|
||||
async fn get_last_backup_log_message(&self) -> Result<String, CollectorError> {
|
||||
let output = Command::new("journalctl")
|
||||
.args([
|
||||
"-u",
|
||||
&self.backup_service,
|
||||
"--lines=1",
|
||||
"--no-pager",
|
||||
"--output=cat",
|
||||
])
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.output()
|
||||
.await
|
||||
.map_err(|e| CollectorError::CommandFailed {
|
||||
command: format!("journalctl -u {} --lines=1", self.backup_service),
|
||||
message: e.to_string(),
|
||||
})?;
|
||||
|
||||
if output.status.success() {
|
||||
let stdout = String::from_utf8_lossy(&output.stdout);
|
||||
let message = stdout.trim().to_string();
|
||||
if !message.is_empty() {
|
||||
return Ok(message);
|
||||
}
|
||||
}
|
||||
|
||||
Err(CollectorError::ParseError {
|
||||
message: "No log messages found".to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn get_backup_logs_for_failures(&self) -> Result<Option<DateTime<Utc>>, CollectorError> {
|
||||
let output = Command::new("journalctl")
|
||||
.args([
|
||||
"-u",
|
||||
&self.backup_service,
|
||||
"--since",
|
||||
"1 week ago",
|
||||
"--grep=failed\\|error\\|ERROR",
|
||||
"--output=json",
|
||||
"--lines=1",
|
||||
])
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.output()
|
||||
.await
|
||||
.map_err(|e| CollectorError::CommandFailed {
|
||||
command: format!(
|
||||
"journalctl -u {} --since='1 week ago' --grep=failed",
|
||||
self.backup_service
|
||||
),
|
||||
message: e.to_string(),
|
||||
})?;
|
||||
|
||||
if output.status.success() {
|
||||
let stdout = String::from_utf8_lossy(&output.stdout);
|
||||
if let Ok(log_entry) = serde_json::from_str::<JournalEntry>(&stdout) {
|
||||
if let Ok(timestamp) = log_entry.realtime_timestamp.parse::<i64>() {
|
||||
let dt =
|
||||
DateTime::from_timestamp_micros(timestamp).unwrap_or_else(|| Utc::now());
|
||||
return Ok(Some(dt));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn determine_backup_status(
|
||||
&self,
|
||||
restic_stats: &Result<ResticStats, CollectorError>,
|
||||
service_data: &BackupServiceData,
|
||||
last_failure: Option<DateTime<Utc>>,
|
||||
) -> BackupStatus {
|
||||
match restic_stats {
|
||||
Ok(stats) => {
|
||||
if let Some(last_success) = stats.last_success {
|
||||
let hours_since_backup =
|
||||
Utc::now().signed_duration_since(last_success).num_hours();
|
||||
|
||||
if hours_since_backup > 48 {
|
||||
BackupStatus::Warning // More than 2 days since last backup
|
||||
} else if let Some(failure) = last_failure {
|
||||
if failure > last_success {
|
||||
BackupStatus::Failed // Failure after last success
|
||||
} else {
|
||||
BackupStatus::Healthy
|
||||
}
|
||||
} else {
|
||||
BackupStatus::Healthy
|
||||
}
|
||||
} else {
|
||||
BackupStatus::Warning // No successful backups found
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
if service_data.enabled {
|
||||
BackupStatus::Failed // Service enabled but can't access repo
|
||||
} else {
|
||||
BackupStatus::Unknown // Service disabled
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Collector for BackupCollector {
|
||||
fn name(&self) -> &str {
|
||||
"backup"
|
||||
}
|
||||
|
||||
fn agent_type(&self) -> AgentType {
|
||||
AgentType::Backup
|
||||
}
|
||||
|
||||
fn collect_interval(&self) -> Duration {
|
||||
self.interval
|
||||
}
|
||||
|
||||
fn is_enabled(&self) -> bool {
|
||||
self.enabled
|
||||
}
|
||||
|
||||
fn requires_root(&self) -> bool {
|
||||
false // Depends on restic repo permissions
|
||||
}
|
||||
|
||||
async fn collect(&self) -> Result<CollectorOutput, CollectorError> {
|
||||
// Get restic repository stats
|
||||
let restic_stats = self.get_restic_snapshots().await;
|
||||
|
||||
// Get backup service status
|
||||
let service_data = self
|
||||
.get_backup_service_status()
|
||||
.await
|
||||
.unwrap_or(BackupServiceData {
|
||||
enabled: false,
|
||||
pending_jobs: 0,
|
||||
last_message: None,
|
||||
});
|
||||
|
||||
// Check for recent failures
|
||||
let last_failure = self.get_backup_logs_for_failures().await.unwrap_or(None);
|
||||
|
||||
// Determine overall backup status
|
||||
let overall_status =
|
||||
self.determine_backup_status(&restic_stats, &service_data, last_failure);
|
||||
|
||||
let (backup_info, _size_gb) = match &restic_stats {
|
||||
Ok(stats) => (
|
||||
BackupInfo {
|
||||
last_success: stats.last_success,
|
||||
last_failure,
|
||||
size_gb: stats.total_size as f32 / (1024.0 * 1024.0 * 1024.0),
|
||||
snapshot_count: stats.snapshot_count,
|
||||
},
|
||||
stats.total_size as f32 / (1024.0 * 1024.0 * 1024.0),
|
||||
),
|
||||
Err(_) => (
|
||||
BackupInfo {
|
||||
last_success: None,
|
||||
last_failure,
|
||||
size_gb: 0.0,
|
||||
snapshot_count: 0,
|
||||
},
|
||||
0.0,
|
||||
),
|
||||
};
|
||||
|
||||
let backup_metrics = json!({
|
||||
"overall_status": overall_status,
|
||||
"backup": backup_info,
|
||||
"service": service_data,
|
||||
"timestamp": Utc::now()
|
||||
});
|
||||
|
||||
Ok(CollectorOutput {
|
||||
agent_type: AgentType::Backup,
|
||||
data: backup_metrics,
|
||||
timestamp: Utc::now(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct ResticSnapshot {
|
||||
time: DateTime<Utc>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct ResticStats {
|
||||
total_size: u64,
|
||||
snapshot_count: u32,
|
||||
last_success: Option<DateTime<Utc>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct BackupServiceData {
|
||||
enabled: bool,
|
||||
pending_jobs: u32,
|
||||
last_message: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct BackupInfo {
|
||||
last_success: Option<DateTime<Utc>>,
|
||||
last_failure: Option<DateTime<Utc>>,
|
||||
size_gb: f32,
|
||||
snapshot_count: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
enum BackupStatus {
|
||||
Healthy,
|
||||
Warning,
|
||||
Failed,
|
||||
Unknown,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct JournalEntry {
|
||||
#[serde(rename = "__REALTIME_TIMESTAMP")]
|
||||
realtime_timestamp: String,
|
||||
}
|
||||
53
agent/src/collectors/error.rs
Normal file
53
agent/src/collectors/error.rs
Normal file
@@ -0,0 +1,53 @@
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum CollectorError {
|
||||
#[error("Command execution failed: {command} - {message}")]
|
||||
CommandFailed { command: String, message: String },
|
||||
|
||||
#[error("Permission denied: {message}")]
|
||||
PermissionDenied { message: String },
|
||||
|
||||
#[error("Data parsing error: {message}")]
|
||||
ParseError { message: String },
|
||||
|
||||
#[error("Timeout after {duration_ms}ms")]
|
||||
Timeout { duration_ms: u64 },
|
||||
|
||||
#[error("IO error: {message}")]
|
||||
IoError { message: String },
|
||||
|
||||
#[error("Configuration error: {message}")]
|
||||
ConfigError { message: String },
|
||||
|
||||
#[error("Service not found: {service}")]
|
||||
ServiceNotFound { service: String },
|
||||
|
||||
#[error("Device not found: {device}")]
|
||||
DeviceNotFound { device: String },
|
||||
|
||||
#[error("External dependency error: {dependency} - {message}")]
|
||||
ExternalDependency { dependency: String, message: String },
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for CollectorError {
|
||||
fn from(err: std::io::Error) -> Self {
|
||||
CollectorError::IoError {
|
||||
message: err.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<serde_json::Error> for CollectorError {
|
||||
fn from(err: serde_json::Error) -> Self {
|
||||
CollectorError::ParseError {
|
||||
message: err.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<tokio::time::error::Elapsed> for CollectorError {
|
||||
fn from(_: tokio::time::error::Elapsed) -> Self {
|
||||
CollectorError::Timeout { duration_ms: 0 }
|
||||
}
|
||||
}
|
||||
49
agent/src/collectors/mod.rs
Normal file
49
agent/src/collectors/mod.rs
Normal file
@@ -0,0 +1,49 @@
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde_json::Value;
|
||||
use std::time::Duration;
|
||||
|
||||
pub mod backup;
|
||||
pub mod error;
|
||||
pub mod service;
|
||||
pub mod smart;
|
||||
|
||||
pub use error::CollectorError;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum AgentType {
|
||||
Smart,
|
||||
Service,
|
||||
Backup,
|
||||
}
|
||||
|
||||
impl AgentType {
|
||||
pub fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
AgentType::Smart => "smart",
|
||||
AgentType::Service => "service",
|
||||
AgentType::Backup => "backup",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CollectorOutput {
|
||||
pub agent_type: AgentType,
|
||||
pub data: Value,
|
||||
pub timestamp: DateTime<Utc>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait Collector: Send + Sync {
|
||||
fn name(&self) -> &str;
|
||||
fn agent_type(&self) -> AgentType;
|
||||
fn collect_interval(&self) -> Duration;
|
||||
async fn collect(&self) -> Result<CollectorOutput, CollectorError>;
|
||||
fn is_enabled(&self) -> bool {
|
||||
true
|
||||
}
|
||||
fn requires_root(&self) -> bool {
|
||||
false
|
||||
}
|
||||
}
|
||||
603
agent/src/collectors/service.rs
Normal file
603
agent/src/collectors/service.rs
Normal file
@@ -0,0 +1,603 @@
|
||||
use async_trait::async_trait;
|
||||
use chrono::Utc;
|
||||
use serde::Serialize;
|
||||
use serde_json::json;
|
||||
use std::collections::HashMap;
|
||||
use std::process::Stdio;
|
||||
use std::time::Duration;
|
||||
use tokio::fs;
|
||||
use tokio::process::Command;
|
||||
use tokio::time::timeout;
|
||||
|
||||
use super::{AgentType, Collector, CollectorError, CollectorOutput};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ServiceCollector {
|
||||
pub enabled: bool,
|
||||
pub interval: Duration,
|
||||
pub services: Vec<String>,
|
||||
pub timeout_ms: u64,
|
||||
}
|
||||
|
||||
impl ServiceCollector {
|
||||
pub fn new(enabled: bool, interval_ms: u64, services: Vec<String>) -> Self {
|
||||
Self {
|
||||
enabled,
|
||||
interval: Duration::from_millis(interval_ms),
|
||||
services,
|
||||
timeout_ms: 10000, // 10 second timeout for service checks
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_service_status(&self, service: &str) -> Result<ServiceData, CollectorError> {
|
||||
let timeout_duration = Duration::from_millis(self.timeout_ms);
|
||||
|
||||
// Get systemctl status
|
||||
let status_output = timeout(
|
||||
timeout_duration,
|
||||
Command::new("systemctl")
|
||||
.args(["show", service, "--property=ActiveState,SubState,MainPID"])
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.output(),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| CollectorError::Timeout {
|
||||
duration_ms: self.timeout_ms,
|
||||
})?
|
||||
.map_err(|e| CollectorError::CommandFailed {
|
||||
command: format!("systemctl show {}", service),
|
||||
message: e.to_string(),
|
||||
})?;
|
||||
|
||||
if !status_output.status.success() {
|
||||
return Err(CollectorError::ServiceNotFound {
|
||||
service: service.to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
let status_stdout = String::from_utf8_lossy(&status_output.stdout);
|
||||
let mut active_state = None;
|
||||
let mut sub_state = None;
|
||||
let mut main_pid = None;
|
||||
|
||||
for line in status_stdout.lines() {
|
||||
if let Some(value) = line.strip_prefix("ActiveState=") {
|
||||
active_state = Some(value.to_string());
|
||||
} else if let Some(value) = line.strip_prefix("SubState=") {
|
||||
sub_state = Some(value.to_string());
|
||||
} else if let Some(value) = line.strip_prefix("MainPID=") {
|
||||
main_pid = value.parse::<u32>().ok();
|
||||
}
|
||||
}
|
||||
|
||||
let status = self.determine_service_status(&active_state, &sub_state);
|
||||
|
||||
// Get resource usage if service is running
|
||||
let (memory_used_mb, cpu_percent) = if let Some(pid) = main_pid {
|
||||
self.get_process_resources(pid).await.unwrap_or((0.0, 0.0))
|
||||
} else {
|
||||
(0.0, 0.0)
|
||||
};
|
||||
|
||||
// Get memory quota from systemd if available
|
||||
let memory_quota_mb = self.get_service_memory_limit(service).await.unwrap_or(0.0);
|
||||
|
||||
// Get disk usage for this service
|
||||
let disk_used_gb = self.get_service_disk_usage(service).await.unwrap_or(0.0);
|
||||
|
||||
Ok(ServiceData {
|
||||
name: service.to_string(),
|
||||
status,
|
||||
memory_used_mb,
|
||||
memory_quota_mb,
|
||||
cpu_percent,
|
||||
sandbox_limit: None, // TODO: Implement sandbox limit detection
|
||||
disk_used_gb,
|
||||
})
|
||||
}
|
||||
|
||||
fn determine_service_status(
|
||||
&self,
|
||||
active_state: &Option<String>,
|
||||
sub_state: &Option<String>,
|
||||
) -> ServiceStatus {
|
||||
match (active_state.as_deref(), sub_state.as_deref()) {
|
||||
(Some("active"), Some("running")) => ServiceStatus::Running,
|
||||
(Some("active"), Some("exited")) => ServiceStatus::Running, // One-shot services
|
||||
(Some("reloading"), _) | (Some("activating"), _) => ServiceStatus::Restarting,
|
||||
(Some("failed"), _) | (Some("inactive"), Some("failed")) => ServiceStatus::Stopped,
|
||||
(Some("inactive"), _) => ServiceStatus::Stopped,
|
||||
_ => ServiceStatus::Degraded,
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_process_resources(&self, pid: u32) -> Result<(f32, f32), CollectorError> {
|
||||
// Read /proc/{pid}/stat for CPU and memory info
|
||||
let stat_path = format!("/proc/{}/stat", pid);
|
||||
let stat_content =
|
||||
fs::read_to_string(&stat_path)
|
||||
.await
|
||||
.map_err(|e| CollectorError::IoError {
|
||||
message: e.to_string(),
|
||||
})?;
|
||||
|
||||
let stat_fields: Vec<&str> = stat_content.split_whitespace().collect();
|
||||
if stat_fields.len() < 24 {
|
||||
return Err(CollectorError::ParseError {
|
||||
message: format!("Invalid /proc/{}/stat format", pid),
|
||||
});
|
||||
}
|
||||
|
||||
// Field 23 is RSS (Resident Set Size) in pages
|
||||
let rss_pages: u64 = stat_fields[23]
|
||||
.parse()
|
||||
.map_err(|e| CollectorError::ParseError {
|
||||
message: format!("Failed to parse RSS from /proc/{}/stat: {}", pid, e),
|
||||
})?;
|
||||
|
||||
// Convert pages to MB (assuming 4KB pages)
|
||||
let memory_mb = (rss_pages * 4) as f32 / 1024.0;
|
||||
|
||||
// For CPU, we'd need to track over time - simplified to 0 for now
|
||||
// TODO: Implement proper CPU percentage calculation
|
||||
let cpu_percent = 0.0;
|
||||
|
||||
Ok((memory_mb, cpu_percent))
|
||||
}
|
||||
|
||||
async fn get_service_disk_usage(&self, service: &str) -> Result<f32, CollectorError> {
|
||||
// For systemd services, check if they have private /var directories or specific data paths
|
||||
// This is a simplified implementation - could be enhanced to check actual service-specific paths
|
||||
|
||||
// Common service data directories to check
|
||||
let potential_paths = vec![
|
||||
format!("/var/lib/{}", service),
|
||||
format!("/var/cache/{}", service),
|
||||
format!("/var/log/{}", service),
|
||||
format!("/opt/{}", service),
|
||||
format!("/srv/{}", service),
|
||||
];
|
||||
|
||||
let mut total_usage = 0.0;
|
||||
|
||||
for path in potential_paths {
|
||||
if let Ok(usage) = self.get_directory_size(&path).await {
|
||||
total_usage += usage;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(total_usage)
|
||||
}
|
||||
|
||||
async fn get_directory_size(&self, path: &str) -> Result<f32, CollectorError> {
|
||||
let output = Command::new("du")
|
||||
.args(["-s", "-k", path]) // Use kilobytes instead of forcing GB
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.output()
|
||||
.await
|
||||
.map_err(|e| CollectorError::CommandFailed {
|
||||
command: format!("du -s -k {}", path),
|
||||
message: e.to_string(),
|
||||
})?;
|
||||
|
||||
if !output.status.success() {
|
||||
// Directory doesn't exist or permission denied - return 0
|
||||
return Ok(0.0);
|
||||
}
|
||||
|
||||
let stdout = String::from_utf8_lossy(&output.stdout);
|
||||
if let Some(line) = stdout.lines().next() {
|
||||
if let Some(size_str) = line.split_whitespace().next() {
|
||||
let size_kb = size_str.parse::<f32>().unwrap_or(0.0);
|
||||
let size_gb = size_kb / (1024.0 * 1024.0); // Convert KB to GB
|
||||
return Ok(size_gb);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(0.0)
|
||||
}
|
||||
|
||||
async fn get_service_memory_limit(&self, service: &str) -> Result<f32, CollectorError> {
|
||||
let output = Command::new("systemctl")
|
||||
.args(["show", service, "--property=MemoryMax"])
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.output()
|
||||
.await
|
||||
.map_err(|e| CollectorError::CommandFailed {
|
||||
command: format!("systemctl show {} --property=MemoryMax", service),
|
||||
message: e.to_string(),
|
||||
})?;
|
||||
|
||||
let stdout = String::from_utf8_lossy(&output.stdout);
|
||||
for line in stdout.lines() {
|
||||
if let Some(value) = line.strip_prefix("MemoryMax=") {
|
||||
if value == "infinity" {
|
||||
return Ok(0.0); // No limit
|
||||
}
|
||||
if let Ok(bytes) = value.parse::<u64>() {
|
||||
return Ok(bytes as f32 / (1024.0 * 1024.0)); // Convert to MB
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(0.0) // No limit or couldn't parse
|
||||
}
|
||||
|
||||
async fn get_system_memory_info(&self) -> Result<SystemMemoryInfo, CollectorError> {
|
||||
let meminfo =
|
||||
fs::read_to_string("/proc/meminfo")
|
||||
.await
|
||||
.map_err(|e| CollectorError::IoError {
|
||||
message: e.to_string(),
|
||||
})?;
|
||||
|
||||
let mut memory_info = HashMap::new();
|
||||
for line in meminfo.lines() {
|
||||
if let Some((key, value)) = line.split_once(':') {
|
||||
let value = value.trim().trim_end_matches(" kB");
|
||||
if let Ok(kb) = value.parse::<u64>() {
|
||||
memory_info.insert(key.to_string(), kb);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let total_kb = memory_info.get("MemTotal").copied().unwrap_or(0);
|
||||
let available_kb = memory_info.get("MemAvailable").copied().unwrap_or(0);
|
||||
let used_kb = total_kb.saturating_sub(available_kb);
|
||||
|
||||
Ok(SystemMemoryInfo {
|
||||
total_mb: total_kb as f32 / 1024.0,
|
||||
used_mb: used_kb as f32 / 1024.0,
|
||||
})
|
||||
}
|
||||
|
||||
async fn get_disk_usage(&self) -> Result<DiskUsage, CollectorError> {
|
||||
let output = Command::new("df")
|
||||
.args(["-BG", "--output=size,used,avail", "/"])
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.output()
|
||||
.await
|
||||
.map_err(|e| CollectorError::CommandFailed {
|
||||
command: "df -BG --output=size,used,avail /".to_string(),
|
||||
message: e.to_string(),
|
||||
})?;
|
||||
|
||||
if !output.status.success() {
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
return Err(CollectorError::CommandFailed {
|
||||
command: "df -BG --output=size,used,avail /".to_string(),
|
||||
message: stderr.to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
let stdout = String::from_utf8_lossy(&output.stdout);
|
||||
let lines: Vec<&str> = stdout.lines().collect();
|
||||
|
||||
if lines.len() < 2 {
|
||||
return Err(CollectorError::ParseError {
|
||||
message: "Unexpected df output format".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
let data_line = lines[1].trim();
|
||||
let parts: Vec<&str> = data_line.split_whitespace().collect();
|
||||
if parts.len() < 3 {
|
||||
return Err(CollectorError::ParseError {
|
||||
message: format!("Unexpected df data format: {}", data_line),
|
||||
});
|
||||
}
|
||||
|
||||
let parse_size = |s: &str| -> Result<f32, CollectorError> {
|
||||
s.trim_end_matches('G')
|
||||
.parse::<f32>()
|
||||
.map_err(|e| CollectorError::ParseError {
|
||||
message: format!("Failed to parse disk size '{}': {}", s, e),
|
||||
})
|
||||
};
|
||||
|
||||
Ok(DiskUsage {
|
||||
total_gb: parse_size(parts[0])?,
|
||||
used_gb: parse_size(parts[1])?,
|
||||
})
|
||||
}
|
||||
|
||||
async fn get_cpu_load(&self) -> Result<(f32, f32, f32), CollectorError> {
|
||||
let loadavg =
|
||||
fs::read_to_string("/proc/loadavg")
|
||||
.await
|
||||
.map_err(|e| CollectorError::IoError {
|
||||
message: e.to_string(),
|
||||
})?;
|
||||
|
||||
let parts: Vec<&str> = loadavg.split_whitespace().collect();
|
||||
if parts.len() < 3 {
|
||||
return Err(CollectorError::ParseError {
|
||||
message: "Unexpected /proc/loadavg format".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
let parse = |s: &str| -> Result<f32, CollectorError> {
|
||||
s.parse::<f32>().map_err(|e| CollectorError::ParseError {
|
||||
message: format!("Failed to parse load average '{}': {}", s, e),
|
||||
})
|
||||
};
|
||||
|
||||
Ok((parse(parts[0])?, parse(parts[1])?, parse(parts[2])?))
|
||||
}
|
||||
|
||||
async fn get_cpu_frequency_mhz(&self) -> Option<f32> {
|
||||
let candidates = [
|
||||
"/sys/devices/system/cpu/cpufreq/policy0/scaling_cur_freq",
|
||||
"/sys/devices/system/cpu/cpu0/cpufreq/scaling_cur_freq",
|
||||
];
|
||||
|
||||
for path in candidates {
|
||||
if let Ok(content) = fs::read_to_string(path).await {
|
||||
if let Ok(khz) = content.trim().parse::<f32>() {
|
||||
if khz > 0.0 {
|
||||
return Some(khz / 1000.0);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Ok(content) = fs::read_to_string("/proc/cpuinfo").await {
|
||||
for line in content.lines() {
|
||||
if let Some(rest) = line.strip_prefix("cpu MHz") {
|
||||
if let Some(value) = rest.split(':').nth(1) {
|
||||
if let Ok(mhz) = value.trim().parse::<f32>() {
|
||||
if mhz > 0.0 {
|
||||
return Some(mhz);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
async fn get_cpu_temperature_c(&self) -> Option<f32> {
|
||||
let mut entries = fs::read_dir("/sys/class/thermal").await.ok()?;
|
||||
let mut fallback: Option<f32> = None;
|
||||
|
||||
while let Ok(Some(entry)) = entries.next_entry().await {
|
||||
let path = entry.path();
|
||||
let type_path = path.join("type");
|
||||
let temp_path = path.join("temp");
|
||||
|
||||
let label = fs::read_to_string(&type_path).await.ok()?.to_lowercase();
|
||||
let raw = match fs::read_to_string(&temp_path).await {
|
||||
Ok(value) => value,
|
||||
Err(_) => continue,
|
||||
};
|
||||
|
||||
let milli: f32 = match raw.trim().parse() {
|
||||
Ok(value) => value,
|
||||
Err(_) => continue,
|
||||
};
|
||||
|
||||
let temp_c = milli / 1000.0;
|
||||
if label.contains("cpu") || label.contains("pkg") {
|
||||
if temp_c > 0.0 {
|
||||
return Some(temp_c);
|
||||
}
|
||||
}
|
||||
|
||||
if fallback.is_none() && temp_c > 0.0 {
|
||||
fallback = Some(temp_c);
|
||||
}
|
||||
}
|
||||
|
||||
fallback
|
||||
}
|
||||
|
||||
async fn get_gpu_metrics(&self) -> (Option<f32>, Option<f32>) {
|
||||
let output = Command::new("nvidia-smi")
|
||||
.args([
|
||||
"--query-gpu=utilization.gpu,temperature.gpu",
|
||||
"--format=csv,noheader,nounits",
|
||||
])
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.output()
|
||||
.await;
|
||||
|
||||
match output {
|
||||
Ok(result) if result.status.success() => {
|
||||
let stdout = String::from_utf8_lossy(&result.stdout);
|
||||
if let Some(line) = stdout.lines().next() {
|
||||
let parts: Vec<&str> = line.split(',').map(|s| s.trim()).collect();
|
||||
if parts.len() >= 2 {
|
||||
let load = parts[0].parse::<f32>().ok();
|
||||
let temp = parts[1].parse::<f32>().ok();
|
||||
return (load, temp);
|
||||
}
|
||||
}
|
||||
(None, None)
|
||||
}
|
||||
Ok(_) | Err(_) => {
|
||||
let util_output = Command::new("/opt/vc/bin/vcgencmd")
|
||||
.arg("measure_temp")
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.output()
|
||||
.await;
|
||||
|
||||
if let Ok(result) = util_output {
|
||||
if result.status.success() {
|
||||
let stdout = String::from_utf8_lossy(&result.stdout);
|
||||
if let Some(value) = stdout
|
||||
.trim()
|
||||
.strip_prefix("temp=")
|
||||
.and_then(|s| s.strip_suffix("'C"))
|
||||
{
|
||||
if let Ok(temp_c) = value.parse::<f32>() {
|
||||
return (None, Some(temp_c));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(None, None)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Collector for ServiceCollector {
|
||||
fn name(&self) -> &str {
|
||||
"service"
|
||||
}
|
||||
|
||||
fn agent_type(&self) -> AgentType {
|
||||
AgentType::Service
|
||||
}
|
||||
|
||||
fn collect_interval(&self) -> Duration {
|
||||
self.interval
|
||||
}
|
||||
|
||||
fn is_enabled(&self) -> bool {
|
||||
self.enabled
|
||||
}
|
||||
|
||||
fn requires_root(&self) -> bool {
|
||||
false // Most systemctl commands work without root
|
||||
}
|
||||
|
||||
async fn collect(&self) -> Result<CollectorOutput, CollectorError> {
|
||||
let mut services = Vec::new();
|
||||
let mut healthy = 0;
|
||||
let mut degraded = 0;
|
||||
let mut failed = 0;
|
||||
let mut total_memory_used = 0.0;
|
||||
let mut total_memory_quota = 0.0;
|
||||
let mut total_disk_used = 0.0;
|
||||
|
||||
// Collect data from all configured services
|
||||
for service in &self.services {
|
||||
match self.get_service_status(service).await {
|
||||
Ok(service_data) => {
|
||||
match service_data.status {
|
||||
ServiceStatus::Running => healthy += 1,
|
||||
ServiceStatus::Degraded | ServiceStatus::Restarting => degraded += 1,
|
||||
ServiceStatus::Stopped => failed += 1,
|
||||
}
|
||||
|
||||
total_memory_used += service_data.memory_used_mb;
|
||||
if service_data.memory_quota_mb > 0.0 {
|
||||
total_memory_quota += service_data.memory_quota_mb;
|
||||
}
|
||||
total_disk_used += service_data.disk_used_gb;
|
||||
|
||||
services.push(service_data);
|
||||
}
|
||||
Err(e) => {
|
||||
failed += 1;
|
||||
// Add a placeholder service entry for failed collection
|
||||
services.push(ServiceData {
|
||||
name: service.clone(),
|
||||
status: ServiceStatus::Stopped,
|
||||
memory_used_mb: 0.0,
|
||||
memory_quota_mb: 0.0,
|
||||
cpu_percent: 0.0,
|
||||
sandbox_limit: None,
|
||||
disk_used_gb: 0.0,
|
||||
});
|
||||
tracing::warn!("Failed to collect metrics for service {}: {}", service, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get system memory info for quota calculation
|
||||
let system_memory = self
|
||||
.get_system_memory_info()
|
||||
.await
|
||||
.unwrap_or(SystemMemoryInfo {
|
||||
total_mb: 0.0,
|
||||
used_mb: 0.0,
|
||||
});
|
||||
|
||||
let _disk_usage = self.get_disk_usage().await.unwrap_or(DiskUsage {
|
||||
total_gb: 0.0,
|
||||
used_gb: 0.0,
|
||||
});
|
||||
|
||||
let (cpu_load_1, cpu_load_5, cpu_load_15) =
|
||||
self.get_cpu_load().await.unwrap_or((0.0, 0.0, 0.0));
|
||||
let cpu_freq_mhz = self.get_cpu_frequency_mhz().await;
|
||||
let cpu_temp_c = self.get_cpu_temperature_c().await;
|
||||
let (gpu_load_percent, gpu_temp_c) = self.get_gpu_metrics().await;
|
||||
|
||||
// If no specific quotas are set, use system memory as reference
|
||||
if total_memory_quota == 0.0 {
|
||||
total_memory_quota = system_memory.total_mb;
|
||||
}
|
||||
|
||||
let service_metrics = json!({
|
||||
"summary": {
|
||||
"healthy": healthy,
|
||||
"degraded": degraded,
|
||||
"failed": failed,
|
||||
"memory_used_mb": total_memory_used,
|
||||
"memory_quota_mb": total_memory_quota,
|
||||
"system_memory_used_mb": system_memory.used_mb,
|
||||
"system_memory_total_mb": system_memory.total_mb,
|
||||
"disk_used_gb": total_disk_used,
|
||||
"disk_total_gb": total_disk_used, // For services, total = used (no quota concept)
|
||||
"cpu_load_1": cpu_load_1,
|
||||
"cpu_load_5": cpu_load_5,
|
||||
"cpu_load_15": cpu_load_15,
|
||||
"cpu_freq_mhz": cpu_freq_mhz,
|
||||
"cpu_temp_c": cpu_temp_c,
|
||||
"gpu_load_percent": gpu_load_percent,
|
||||
"gpu_temp_c": gpu_temp_c,
|
||||
},
|
||||
"services": services,
|
||||
"timestamp": Utc::now()
|
||||
});
|
||||
|
||||
Ok(CollectorOutput {
|
||||
agent_type: AgentType::Service,
|
||||
data: service_metrics,
|
||||
timestamp: Utc::now(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
struct ServiceData {
|
||||
name: String,
|
||||
status: ServiceStatus,
|
||||
memory_used_mb: f32,
|
||||
memory_quota_mb: f32,
|
||||
cpu_percent: f32,
|
||||
sandbox_limit: Option<f32>,
|
||||
disk_used_gb: f32,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
enum ServiceStatus {
|
||||
Running,
|
||||
Degraded,
|
||||
Restarting,
|
||||
Stopped,
|
||||
}
|
||||
|
||||
struct SystemMemoryInfo {
|
||||
total_mb: f32,
|
||||
used_mb: f32,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
struct DiskUsage {
|
||||
total_gb: f32,
|
||||
used_gb: f32,
|
||||
}
|
||||
447
agent/src/collectors/smart.rs
Normal file
447
agent/src/collectors/smart.rs
Normal file
@@ -0,0 +1,447 @@
|
||||
use async_trait::async_trait;
|
||||
use chrono::Utc;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::json;
|
||||
use std::io::ErrorKind;
|
||||
use std::process::Stdio;
|
||||
use std::time::Duration;
|
||||
use tokio::process::Command;
|
||||
use tokio::time::timeout;
|
||||
|
||||
use super::{AgentType, Collector, CollectorError, CollectorOutput};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SmartCollector {
|
||||
pub enabled: bool,
|
||||
pub interval: Duration,
|
||||
pub devices: Vec<String>,
|
||||
pub timeout_ms: u64,
|
||||
}
|
||||
|
||||
impl SmartCollector {
|
||||
pub fn new(enabled: bool, interval_ms: u64, devices: Vec<String>) -> Self {
|
||||
Self {
|
||||
enabled,
|
||||
interval: Duration::from_millis(interval_ms),
|
||||
devices,
|
||||
timeout_ms: 30000, // 30 second timeout for smartctl
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_smart_data(&self, device: &str) -> Result<SmartDeviceData, CollectorError> {
|
||||
let timeout_duration = Duration::from_millis(self.timeout_ms);
|
||||
|
||||
let command_result = timeout(
|
||||
timeout_duration,
|
||||
Command::new("smartctl")
|
||||
.args(["-a", "-j", &format!("/dev/{}", device)])
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.output(),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| CollectorError::Timeout {
|
||||
duration_ms: self.timeout_ms,
|
||||
})?;
|
||||
|
||||
let output = command_result.map_err(|e| match e.kind() {
|
||||
ErrorKind::NotFound => CollectorError::ExternalDependency {
|
||||
dependency: "smartctl".to_string(),
|
||||
message: e.to_string(),
|
||||
},
|
||||
ErrorKind::PermissionDenied => CollectorError::PermissionDenied {
|
||||
message: e.to_string(),
|
||||
},
|
||||
_ => CollectorError::CommandFailed {
|
||||
command: format!("smartctl -a -j /dev/{}", device),
|
||||
message: e.to_string(),
|
||||
},
|
||||
})?;
|
||||
|
||||
if !output.status.success() {
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
let stderr_lower = stderr.to_lowercase();
|
||||
|
||||
if stderr_lower.contains("permission denied") {
|
||||
return Err(CollectorError::PermissionDenied {
|
||||
message: stderr.to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
if stderr_lower.contains("no such device") || stderr_lower.contains("cannot open") {
|
||||
return Err(CollectorError::DeviceNotFound {
|
||||
device: device.to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
return Err(CollectorError::CommandFailed {
|
||||
command: format!("smartctl -a -j /dev/{}", device),
|
||||
message: stderr.to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
let stdout = String::from_utf8_lossy(&output.stdout);
|
||||
let smart_output: SmartCtlOutput =
|
||||
serde_json::from_str(&stdout).map_err(|e| CollectorError::ParseError {
|
||||
message: format!("Failed to parse smartctl output for {}: {}", device, e),
|
||||
})?;
|
||||
|
||||
Ok(SmartDeviceData::from_smartctl_output(device, smart_output))
|
||||
}
|
||||
|
||||
async fn get_drive_usage(&self, device: &str) -> Result<(Option<f32>, Option<f32>), CollectorError> {
|
||||
// Get capacity first
|
||||
let capacity = match self.get_drive_capacity(device).await {
|
||||
Ok(cap) => Some(cap),
|
||||
Err(_) => None,
|
||||
};
|
||||
|
||||
// Try to get usage information
|
||||
// For simplicity, we'll use the root filesystem usage for now
|
||||
// In the future, this could be enhanced to map drives to specific mount points
|
||||
let usage = if device.contains("nvme0n1") || device.contains("sda") {
|
||||
// This is likely the main system drive, use root filesystem usage
|
||||
match self.get_disk_usage().await {
|
||||
Ok(disk_usage) => Some(disk_usage.used_gb),
|
||||
Err(_) => None,
|
||||
}
|
||||
} else {
|
||||
// For other drives, we don't have usage info yet
|
||||
None
|
||||
};
|
||||
|
||||
Ok((capacity, usage))
|
||||
}
|
||||
|
||||
async fn get_drive_capacity(&self, device: &str) -> Result<f32, CollectorError> {
|
||||
let output = Command::new("lsblk")
|
||||
.args(["-J", "-o", "NAME,SIZE", &format!("/dev/{}", device)])
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.output()
|
||||
.await
|
||||
.map_err(|e| CollectorError::CommandFailed {
|
||||
command: format!("lsblk -J -o NAME,SIZE /dev/{}", device),
|
||||
message: e.to_string(),
|
||||
})?;
|
||||
|
||||
if !output.status.success() {
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
return Err(CollectorError::CommandFailed {
|
||||
command: format!("lsblk -J -o NAME,SIZE /dev/{}", device),
|
||||
message: stderr.to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
let stdout = String::from_utf8_lossy(&output.stdout);
|
||||
let lsblk_output: serde_json::Value = serde_json::from_str(&stdout)
|
||||
.map_err(|e| CollectorError::ParseError {
|
||||
message: format!("Failed to parse lsblk JSON: {}", e),
|
||||
})?;
|
||||
|
||||
// Extract size from the first blockdevice
|
||||
if let Some(blockdevices) = lsblk_output["blockdevices"].as_array() {
|
||||
if let Some(device_info) = blockdevices.first() {
|
||||
if let Some(size_str) = device_info["size"].as_str() {
|
||||
return self.parse_lsblk_size(size_str);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Err(CollectorError::ParseError {
|
||||
message: format!("No size information found for device {}", device),
|
||||
})
|
||||
}
|
||||
|
||||
fn parse_lsblk_size(&self, size_str: &str) -> Result<f32, CollectorError> {
|
||||
// Parse sizes like "953,9G", "1T", "512M"
|
||||
let size_str = size_str.replace(',', "."); // Handle European decimal separator
|
||||
|
||||
if let Some(pos) = size_str.find(|c: char| c.is_alphabetic()) {
|
||||
let (number_part, unit_part) = size_str.split_at(pos);
|
||||
let number: f32 = number_part.parse()
|
||||
.map_err(|e| CollectorError::ParseError {
|
||||
message: format!("Failed to parse size number '{}': {}", number_part, e),
|
||||
})?;
|
||||
|
||||
let multiplier = match unit_part.to_uppercase().as_str() {
|
||||
"T" | "TB" => 1024.0,
|
||||
"G" | "GB" => 1.0,
|
||||
"M" | "MB" => 1.0 / 1024.0,
|
||||
"K" | "KB" => 1.0 / (1024.0 * 1024.0),
|
||||
_ => return Err(CollectorError::ParseError {
|
||||
message: format!("Unknown size unit: {}", unit_part),
|
||||
}),
|
||||
};
|
||||
|
||||
Ok(number * multiplier)
|
||||
} else {
|
||||
Err(CollectorError::ParseError {
|
||||
message: format!("Invalid size format: {}", size_str),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_disk_usage(&self) -> Result<DiskUsage, CollectorError> {
|
||||
let output = Command::new("df")
|
||||
.args(["-BG", "--output=size,used,avail", "/"])
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.output()
|
||||
.await
|
||||
.map_err(|e| CollectorError::CommandFailed {
|
||||
command: "df -BG --output=size,used,avail /".to_string(),
|
||||
message: e.to_string(),
|
||||
})?;
|
||||
|
||||
if !output.status.success() {
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
return Err(CollectorError::CommandFailed {
|
||||
command: "df -BG --output=size,used,avail /".to_string(),
|
||||
message: stderr.to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
let stdout = String::from_utf8_lossy(&output.stdout);
|
||||
let lines: Vec<&str> = stdout.lines().collect();
|
||||
|
||||
if lines.len() < 2 {
|
||||
return Err(CollectorError::ParseError {
|
||||
message: "Unexpected df output format".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
// Skip header line, parse data line
|
||||
let data_line = lines[1].trim();
|
||||
let parts: Vec<&str> = data_line.split_whitespace().collect();
|
||||
|
||||
if parts.len() < 3 {
|
||||
return Err(CollectorError::ParseError {
|
||||
message: format!("Unexpected df data format: {}", data_line),
|
||||
});
|
||||
}
|
||||
|
||||
let parse_size = |s: &str| -> Result<f32, CollectorError> {
|
||||
s.trim_end_matches('G')
|
||||
.parse::<f32>()
|
||||
.map_err(|e| CollectorError::ParseError {
|
||||
message: format!("Failed to parse disk size '{}': {}", s, e),
|
||||
})
|
||||
};
|
||||
|
||||
Ok(DiskUsage {
|
||||
total_gb: parse_size(parts[0])?,
|
||||
used_gb: parse_size(parts[1])?,
|
||||
available_gb: parse_size(parts[2])?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Collector for SmartCollector {
|
||||
fn name(&self) -> &str {
|
||||
"smart"
|
||||
}
|
||||
|
||||
fn agent_type(&self) -> AgentType {
|
||||
AgentType::Smart
|
||||
}
|
||||
|
||||
fn collect_interval(&self) -> Duration {
|
||||
self.interval
|
||||
}
|
||||
|
||||
fn is_enabled(&self) -> bool {
|
||||
self.enabled
|
||||
}
|
||||
|
||||
fn requires_root(&self) -> bool {
|
||||
true // smartctl typically requires root access
|
||||
}
|
||||
|
||||
async fn collect(&self) -> Result<CollectorOutput, CollectorError> {
|
||||
let mut drives = Vec::new();
|
||||
let mut issues = Vec::new();
|
||||
let mut healthy = 0;
|
||||
let mut warning = 0;
|
||||
let mut critical = 0;
|
||||
|
||||
// Collect data from all configured devices
|
||||
for device in &self.devices {
|
||||
match self.get_smart_data(device).await {
|
||||
Ok(mut drive_data) => {
|
||||
// Try to get capacity and usage for this drive
|
||||
if let Ok((capacity, usage)) = self.get_drive_usage(device).await {
|
||||
drive_data.capacity_gb = capacity;
|
||||
drive_data.used_gb = usage;
|
||||
}
|
||||
match drive_data.health_status.as_str() {
|
||||
"PASSED" => healthy += 1,
|
||||
"FAILED" => {
|
||||
critical += 1;
|
||||
issues.push(format!("{}: SMART status FAILED", device));
|
||||
}
|
||||
_ => {
|
||||
warning += 1;
|
||||
issues.push(format!("{}: Unknown SMART status", device));
|
||||
}
|
||||
}
|
||||
drives.push(drive_data);
|
||||
}
|
||||
Err(e) => {
|
||||
warning += 1;
|
||||
issues.push(format!("{}: {}", device, e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get disk usage information
|
||||
let disk_usage = self.get_disk_usage().await?;
|
||||
|
||||
let status = if critical > 0 {
|
||||
"CRITICAL"
|
||||
} else if warning > 0 {
|
||||
"WARNING"
|
||||
} else {
|
||||
"HEALTHY"
|
||||
};
|
||||
|
||||
let smart_metrics = json!({
|
||||
"status": status,
|
||||
"drives": drives,
|
||||
"summary": {
|
||||
"healthy": healthy,
|
||||
"warning": warning,
|
||||
"critical": critical,
|
||||
"capacity_total_gb": disk_usage.total_gb,
|
||||
"capacity_used_gb": disk_usage.used_gb,
|
||||
"capacity_available_gb": disk_usage.available_gb
|
||||
},
|
||||
"issues": issues,
|
||||
"timestamp": Utc::now()
|
||||
});
|
||||
|
||||
Ok(CollectorOutput {
|
||||
agent_type: AgentType::Smart,
|
||||
data: smart_metrics,
|
||||
timestamp: Utc::now(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
struct SmartDeviceData {
|
||||
name: String,
|
||||
temperature_c: f32,
|
||||
wear_level: f32,
|
||||
power_on_hours: u64,
|
||||
available_spare: f32,
|
||||
health_status: String,
|
||||
capacity_gb: Option<f32>,
|
||||
used_gb: Option<f32>,
|
||||
}
|
||||
|
||||
impl SmartDeviceData {
|
||||
fn from_smartctl_output(device: &str, output: SmartCtlOutput) -> Self {
|
||||
let temperature_c = output.temperature.and_then(|t| t.current).unwrap_or(0.0);
|
||||
|
||||
let wear_level = output
|
||||
.nvme_smart_health_information_log
|
||||
.as_ref()
|
||||
.and_then(|nvme| nvme.percentage_used)
|
||||
.unwrap_or(0.0);
|
||||
|
||||
let power_on_hours = output.power_on_time.and_then(|p| p.hours).unwrap_or(0);
|
||||
|
||||
let available_spare = output
|
||||
.nvme_smart_health_information_log
|
||||
.as_ref()
|
||||
.and_then(|nvme| nvme.available_spare)
|
||||
.unwrap_or(100.0);
|
||||
|
||||
let health_status = output
|
||||
.smart_status
|
||||
.and_then(|s| s.passed)
|
||||
.map(|passed| {
|
||||
if passed {
|
||||
"PASSED".to_string()
|
||||
} else {
|
||||
"FAILED".to_string()
|
||||
}
|
||||
})
|
||||
.unwrap_or_else(|| "UNKNOWN".to_string());
|
||||
|
||||
Self {
|
||||
name: device.to_string(),
|
||||
temperature_c,
|
||||
wear_level,
|
||||
power_on_hours,
|
||||
available_spare,
|
||||
health_status,
|
||||
capacity_gb: None, // Will be set later by the collector
|
||||
used_gb: None, // Will be set later by the collector
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct DiskUsage {
|
||||
total_gb: f32,
|
||||
used_gb: f32,
|
||||
available_gb: f32,
|
||||
}
|
||||
|
||||
// Minimal smartctl JSON output structure - only the fields we need
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct SmartCtlOutput {
|
||||
temperature: Option<Temperature>,
|
||||
power_on_time: Option<PowerOnTime>,
|
||||
smart_status: Option<SmartStatus>,
|
||||
nvme_smart_health_information_log: Option<NvmeSmartLog>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct Temperature {
|
||||
current: Option<f32>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct PowerOnTime {
|
||||
hours: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct SmartStatus {
|
||||
passed: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct NvmeSmartLog {
|
||||
percentage_used: Option<f32>,
|
||||
available_spare: Option<f32>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_parse_lsblk_size() {
|
||||
let collector = SmartCollector::new(true, 5000, vec![]);
|
||||
|
||||
// Test gigabyte sizes
|
||||
assert!((collector.parse_lsblk_size("953,9G").unwrap() - 953.9).abs() < 0.1);
|
||||
assert!((collector.parse_lsblk_size("1G").unwrap() - 1.0).abs() < 0.1);
|
||||
|
||||
// Test terabyte sizes
|
||||
assert!((collector.parse_lsblk_size("1T").unwrap() - 1024.0).abs() < 0.1);
|
||||
assert!((collector.parse_lsblk_size("2,5T").unwrap() - 2560.0).abs() < 0.1);
|
||||
|
||||
// Test megabyte sizes
|
||||
assert!((collector.parse_lsblk_size("512M").unwrap() - 0.5).abs() < 0.1);
|
||||
|
||||
// Test error cases
|
||||
assert!(collector.parse_lsblk_size("invalid").is_err());
|
||||
assert!(collector.parse_lsblk_size("1X").is_err());
|
||||
}
|
||||
}
|
||||
315
agent/src/config.rs
Normal file
315
agent/src/config.rs
Normal file
@@ -0,0 +1,315 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::path::Path;
|
||||
use tokio::fs;
|
||||
use tracing::info;
|
||||
|
||||
use crate::collectors::CollectorError;
|
||||
use crate::discovery::AutoDiscovery;
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct AgentConfig {
|
||||
pub agent: AgentSettings,
|
||||
pub zmq: ZmqSettings,
|
||||
pub collectors: CollectorsConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct AgentSettings {
|
||||
pub hostname: String,
|
||||
pub log_level: String,
|
||||
pub metrics_buffer_size: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct ZmqSettings {
|
||||
pub port: u16,
|
||||
pub bind_address: String,
|
||||
pub send_timeout_ms: u64,
|
||||
pub receive_timeout_ms: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct CollectorsConfig {
|
||||
pub smart: SmartCollectorConfig,
|
||||
pub service: ServiceCollectorConfig,
|
||||
pub backup: BackupCollectorConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct SmartCollectorConfig {
|
||||
pub enabled: bool,
|
||||
pub interval_ms: u64,
|
||||
pub devices: Vec<String>,
|
||||
pub timeout_ms: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct ServiceCollectorConfig {
|
||||
pub enabled: bool,
|
||||
pub interval_ms: u64,
|
||||
pub services: Vec<String>,
|
||||
pub timeout_ms: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct BackupCollectorConfig {
|
||||
pub enabled: bool,
|
||||
pub interval_ms: u64,
|
||||
pub restic_repo: Option<String>,
|
||||
pub backup_service: String,
|
||||
pub timeout_ms: u64,
|
||||
}
|
||||
|
||||
impl Default for AgentConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
agent: AgentSettings {
|
||||
hostname: gethostname::gethostname().to_string_lossy().to_string(),
|
||||
log_level: "info".to_string(),
|
||||
metrics_buffer_size: 1000,
|
||||
},
|
||||
zmq: ZmqSettings {
|
||||
port: 6130,
|
||||
bind_address: "0.0.0.0".to_string(),
|
||||
send_timeout_ms: 5000,
|
||||
receive_timeout_ms: 5000,
|
||||
},
|
||||
collectors: CollectorsConfig {
|
||||
smart: SmartCollectorConfig {
|
||||
enabled: true,
|
||||
interval_ms: 5000,
|
||||
devices: vec!["nvme0n1".to_string()],
|
||||
timeout_ms: 30000,
|
||||
},
|
||||
service: ServiceCollectorConfig {
|
||||
enabled: true,
|
||||
interval_ms: 2000,
|
||||
services: vec![
|
||||
"gitea".to_string(),
|
||||
"immich".to_string(),
|
||||
"vaultwarden".to_string(),
|
||||
"unifi".to_string(),
|
||||
],
|
||||
timeout_ms: 10000,
|
||||
},
|
||||
backup: BackupCollectorConfig {
|
||||
enabled: true,
|
||||
interval_ms: 30000,
|
||||
restic_repo: None,
|
||||
backup_service: "restic-backup".to_string(),
|
||||
timeout_ms: 30000,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AgentConfig {
|
||||
pub async fn load_from_file<P: AsRef<Path>>(path: P) -> Result<Self, CollectorError> {
|
||||
let content = fs::read_to_string(path)
|
||||
.await
|
||||
.map_err(|e| CollectorError::ConfigError {
|
||||
message: format!("Failed to read config file: {}", e),
|
||||
})?;
|
||||
|
||||
let config: AgentConfig =
|
||||
toml::from_str(&content).map_err(|e| CollectorError::ConfigError {
|
||||
message: format!("Failed to parse config file: {}", e),
|
||||
})?;
|
||||
|
||||
config.validate()?;
|
||||
Ok(config)
|
||||
}
|
||||
|
||||
pub async fn save_to_file<P: AsRef<Path>>(&self, path: P) -> Result<(), CollectorError> {
|
||||
let content = toml::to_string_pretty(self).map_err(|e| CollectorError::ConfigError {
|
||||
message: format!("Failed to serialize config: {}", e),
|
||||
})?;
|
||||
|
||||
fs::write(path, content)
|
||||
.await
|
||||
.map_err(|e| CollectorError::ConfigError {
|
||||
message: format!("Failed to write config file: {}", e),
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn validate(&self) -> Result<(), CollectorError> {
|
||||
// Validate ZMQ settings
|
||||
if self.zmq.port == 0 {
|
||||
return Err(CollectorError::ConfigError {
|
||||
message: "ZMQ port cannot be 0".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
// Validate collector intervals
|
||||
if self.collectors.smart.enabled && self.collectors.smart.interval_ms < 1000 {
|
||||
return Err(CollectorError::ConfigError {
|
||||
message: "SMART collector interval must be at least 1000ms".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
if self.collectors.service.enabled && self.collectors.service.interval_ms < 500 {
|
||||
return Err(CollectorError::ConfigError {
|
||||
message: "Service collector interval must be at least 500ms".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
if self.collectors.backup.enabled && self.collectors.backup.interval_ms < 5000 {
|
||||
return Err(CollectorError::ConfigError {
|
||||
message: "Backup collector interval must be at least 5000ms".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
// Validate smart devices
|
||||
if self.collectors.smart.enabled && self.collectors.smart.devices.is_empty() {
|
||||
return Err(CollectorError::ConfigError {
|
||||
message: "SMART collector requires at least one device".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
// Validate services
|
||||
if self.collectors.service.enabled && self.collectors.service.services.is_empty() {
|
||||
return Err(CollectorError::ConfigError {
|
||||
message: "Service collector requires at least one service".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
// Validate backup configuration
|
||||
if self.collectors.backup.enabled {
|
||||
if self.collectors.backup.restic_repo.is_none() {
|
||||
tracing::warn!("Backup collector enabled but no restic repository configured");
|
||||
}
|
||||
if self.collectors.backup.backup_service.is_empty() {
|
||||
return Err(CollectorError::ConfigError {
|
||||
message: "Backup collector requires a backup service name".to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_enabled_collector_count(&self) -> usize {
|
||||
let mut count = 0;
|
||||
if self.collectors.smart.enabled {
|
||||
count += 1;
|
||||
}
|
||||
if self.collectors.service.enabled {
|
||||
count += 1;
|
||||
}
|
||||
if self.collectors.backup.enabled {
|
||||
count += 1;
|
||||
}
|
||||
count
|
||||
}
|
||||
|
||||
pub async fn auto_configure(&mut self) -> Result<(), CollectorError> {
|
||||
let hostname = &self.agent.hostname.clone();
|
||||
info!("Auto-configuring agent for host: {}", hostname);
|
||||
|
||||
// Auto-detect storage devices
|
||||
let devices = AutoDiscovery::discover_storage_devices().await;
|
||||
let valid_devices = AutoDiscovery::validate_devices(&devices).await;
|
||||
|
||||
if !valid_devices.is_empty() {
|
||||
self.collectors.smart.devices = valid_devices;
|
||||
info!(
|
||||
"Auto-detected storage devices: {:?}",
|
||||
self.collectors.smart.devices
|
||||
);
|
||||
} else {
|
||||
info!("No accessible storage devices found, disabling SMART collector");
|
||||
self.collectors.smart.enabled = false;
|
||||
}
|
||||
|
||||
// Auto-detect services
|
||||
let services = AutoDiscovery::discover_services().await;
|
||||
if !services.is_empty() {
|
||||
self.collectors.service.services = services;
|
||||
info!(
|
||||
"Auto-detected services: {:?}",
|
||||
self.collectors.service.services
|
||||
);
|
||||
} else {
|
||||
info!("No monitorable services found, using minimal service list");
|
||||
self.collectors.service.services = vec!["ssh".to_string()];
|
||||
}
|
||||
|
||||
// Auto-detect backup configuration
|
||||
let (backup_enabled, restic_repo, backup_service) =
|
||||
AutoDiscovery::discover_backup_config(hostname).await;
|
||||
|
||||
self.collectors.backup.enabled = backup_enabled;
|
||||
self.collectors.backup.restic_repo = restic_repo;
|
||||
self.collectors.backup.backup_service = backup_service;
|
||||
|
||||
if backup_enabled {
|
||||
info!(
|
||||
"Auto-configured backup monitoring: repo={:?}, service={}",
|
||||
self.collectors.backup.restic_repo, self.collectors.backup.backup_service
|
||||
);
|
||||
} else {
|
||||
info!("Backup monitoring disabled for this host");
|
||||
}
|
||||
|
||||
// Apply host-specific timing optimizations
|
||||
self.apply_host_timing_overrides(hostname);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn apply_host_timing_overrides(&mut self, hostname: &str) {
|
||||
match hostname {
|
||||
"srv01" => {
|
||||
// Server host - more frequent monitoring
|
||||
self.collectors.service.interval_ms = 1000;
|
||||
self.collectors.smart.interval_ms = 5000;
|
||||
}
|
||||
"cmbox" | "labbox" | "simonbox" | "steambox" => {
|
||||
// Workstation hosts - less frequent monitoring
|
||||
self.collectors.smart.interval_ms = 10000;
|
||||
self.collectors.service.interval_ms = 5000;
|
||||
}
|
||||
_ => {
|
||||
// Unknown host - conservative defaults
|
||||
self.collectors.smart.interval_ms = 10000;
|
||||
self.collectors.service.interval_ms = 5000;
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
"Applied timing overrides for {}: smart={}ms, service={}ms",
|
||||
hostname, self.collectors.smart.interval_ms, self.collectors.service.interval_ms
|
||||
);
|
||||
}
|
||||
|
||||
pub fn summary(&self) -> String {
|
||||
let mut parts = Vec::new();
|
||||
|
||||
if self.collectors.smart.enabled {
|
||||
parts.push(format!(
|
||||
"SMART({} devices)",
|
||||
self.collectors.smart.devices.len()
|
||||
));
|
||||
}
|
||||
|
||||
if self.collectors.service.enabled {
|
||||
parts.push(format!(
|
||||
"Services({} monitored)",
|
||||
self.collectors.service.services.len()
|
||||
));
|
||||
}
|
||||
|
||||
if self.collectors.backup.enabled {
|
||||
parts.push("Backup".to_string());
|
||||
}
|
||||
|
||||
if parts.is_empty() {
|
||||
"No collectors enabled".to_string()
|
||||
} else {
|
||||
parts.join(", ")
|
||||
}
|
||||
}
|
||||
}
|
||||
449
agent/src/discovery.rs
Normal file
449
agent/src/discovery.rs
Normal file
@@ -0,0 +1,449 @@
|
||||
use std::collections::HashSet;
|
||||
use std::process::Stdio;
|
||||
use tokio::fs;
|
||||
use tokio::process::Command;
|
||||
use tracing::{debug, warn};
|
||||
|
||||
use crate::collectors::CollectorError;
|
||||
|
||||
pub struct AutoDiscovery;
|
||||
|
||||
impl AutoDiscovery {
|
||||
/// Auto-detect storage devices suitable for SMART monitoring
|
||||
pub async fn discover_storage_devices() -> Vec<String> {
|
||||
let mut devices = Vec::new();
|
||||
|
||||
// Method 1: Try lsblk to find block devices
|
||||
if let Ok(lsblk_devices) = Self::discover_via_lsblk().await {
|
||||
devices.extend(lsblk_devices);
|
||||
}
|
||||
|
||||
// Method 2: Scan /dev for common device patterns
|
||||
if devices.is_empty() {
|
||||
if let Ok(dev_devices) = Self::discover_via_dev_scan().await {
|
||||
devices.extend(dev_devices);
|
||||
}
|
||||
}
|
||||
|
||||
// Method 3: Fallback to common device names
|
||||
if devices.is_empty() {
|
||||
devices = Self::fallback_device_names();
|
||||
}
|
||||
|
||||
// Remove duplicates and sort
|
||||
let mut unique_devices: Vec<String> = devices
|
||||
.into_iter()
|
||||
.collect::<HashSet<_>>()
|
||||
.into_iter()
|
||||
.collect();
|
||||
unique_devices.sort();
|
||||
|
||||
debug!("Auto-detected storage devices: {:?}", unique_devices);
|
||||
unique_devices
|
||||
}
|
||||
|
||||
async fn discover_via_lsblk() -> Result<Vec<String>, CollectorError> {
|
||||
let output = Command::new("lsblk")
|
||||
.args(["-d", "-o", "NAME,TYPE", "-n", "-r"])
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.output()
|
||||
.await
|
||||
.map_err(|e| CollectorError::CommandFailed {
|
||||
command: "lsblk".to_string(),
|
||||
message: e.to_string(),
|
||||
})?;
|
||||
|
||||
if !output.status.success() {
|
||||
return Err(CollectorError::CommandFailed {
|
||||
command: "lsblk".to_string(),
|
||||
message: String::from_utf8_lossy(&output.stderr).to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
let stdout = String::from_utf8_lossy(&output.stdout);
|
||||
let mut devices = Vec::new();
|
||||
|
||||
for line in stdout.lines() {
|
||||
let parts: Vec<&str> = line.split_whitespace().collect();
|
||||
if parts.len() >= 2 {
|
||||
let device_name = parts[0];
|
||||
let device_type = parts[1];
|
||||
|
||||
// Include disk type devices and filter out unwanted ones
|
||||
if device_type == "disk" && Self::is_suitable_device(device_name) {
|
||||
devices.push(device_name.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(devices)
|
||||
}
|
||||
|
||||
async fn discover_via_dev_scan() -> Result<Vec<String>, CollectorError> {
|
||||
let mut devices = Vec::new();
|
||||
|
||||
// Read /dev directory
|
||||
let mut dev_entries = fs::read_dir("/dev")
|
||||
.await
|
||||
.map_err(|e| CollectorError::IoError {
|
||||
message: e.to_string(),
|
||||
})?;
|
||||
|
||||
while let Some(entry) =
|
||||
dev_entries
|
||||
.next_entry()
|
||||
.await
|
||||
.map_err(|e| CollectorError::IoError {
|
||||
message: e.to_string(),
|
||||
})?
|
||||
{
|
||||
let file_name = entry.file_name();
|
||||
let device_name = file_name.to_string_lossy();
|
||||
|
||||
if Self::is_suitable_device(&device_name) {
|
||||
devices.push(device_name.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(devices)
|
||||
}
|
||||
|
||||
fn is_suitable_device(device_name: &str) -> bool {
|
||||
// Include NVMe, SATA, and other storage devices
|
||||
// Exclude partitions, loop devices, etc.
|
||||
(device_name.starts_with("nvme") && device_name.contains("n") && !device_name.contains("p")) ||
|
||||
(device_name.starts_with("sd") && device_name.len() == 3) || // sda, sdb, etc. not sda1
|
||||
(device_name.starts_with("hd") && device_name.len() == 3) || // hda, hdb, etc.
|
||||
(device_name.starts_with("vd") && device_name.len() == 3) // vda, vdb for VMs
|
||||
}
|
||||
|
||||
fn fallback_device_names() -> Vec<String> {
|
||||
vec!["nvme0n1".to_string(), "sda".to_string(), "sdb".to_string()]
|
||||
}
|
||||
|
||||
/// Auto-detect systemd services suitable for monitoring
|
||||
pub async fn discover_services() -> Vec<String> {
|
||||
let mut services = Vec::new();
|
||||
|
||||
// Method 1: Try to find running services
|
||||
if let Ok(running_services) = Self::discover_running_services().await {
|
||||
services.extend(running_services);
|
||||
}
|
||||
|
||||
// Method 2: Add host-specific services based on hostname
|
||||
let hostname = gethostname::gethostname().to_string_lossy().to_string();
|
||||
services.extend(Self::get_host_specific_services(&hostname));
|
||||
|
||||
// Normalize aliases and verify the units actually exist before deduping
|
||||
let canonicalized: Vec<String> = services
|
||||
.into_iter()
|
||||
.filter_map(|svc| Self::canonical_service_name(&svc))
|
||||
.collect();
|
||||
|
||||
let existing = Self::filter_existing_services(&canonicalized).await;
|
||||
|
||||
let mut unique_services: Vec<String> = existing
|
||||
.into_iter()
|
||||
.collect::<HashSet<_>>()
|
||||
.into_iter()
|
||||
.collect();
|
||||
unique_services.sort();
|
||||
|
||||
debug!("Auto-detected services: {:?}", unique_services);
|
||||
unique_services
|
||||
}
|
||||
|
||||
async fn discover_running_services() -> Result<Vec<String>, CollectorError> {
|
||||
let output = Command::new("systemctl")
|
||||
.args([
|
||||
"list-units",
|
||||
"--type=service",
|
||||
"--state=active",
|
||||
"--no-pager",
|
||||
"--no-legend",
|
||||
])
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.output()
|
||||
.await
|
||||
.map_err(|e| CollectorError::CommandFailed {
|
||||
command: "systemctl list-units".to_string(),
|
||||
message: e.to_string(),
|
||||
})?;
|
||||
|
||||
if !output.status.success() {
|
||||
return Err(CollectorError::CommandFailed {
|
||||
command: "systemctl list-units".to_string(),
|
||||
message: String::from_utf8_lossy(&output.stderr).to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
let stdout = String::from_utf8_lossy(&output.stdout);
|
||||
let mut services = Vec::new();
|
||||
|
||||
for line in stdout.lines() {
|
||||
let parts: Vec<&str> = line.split_whitespace().collect();
|
||||
if !parts.is_empty() {
|
||||
let service_name = parts[0];
|
||||
// Remove .service suffix if present
|
||||
let clean_name = service_name
|
||||
.strip_suffix(".service")
|
||||
.unwrap_or(service_name);
|
||||
|
||||
// Only include services we're interested in monitoring
|
||||
if Self::is_monitorable_service(clean_name) {
|
||||
services.push(clean_name.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(services)
|
||||
}
|
||||
|
||||
fn is_monitorable_service(service_name: &str) -> bool {
|
||||
// Define patterns for services we want to monitor
|
||||
let interesting_services = [
|
||||
// Web applications
|
||||
"gitea",
|
||||
"immich",
|
||||
"vaultwarden",
|
||||
"unifi",
|
||||
"wordpress",
|
||||
"nginx",
|
||||
"apache2",
|
||||
"httpd",
|
||||
"caddy",
|
||||
// Databases
|
||||
"postgresql",
|
||||
"mysql",
|
||||
"mariadb",
|
||||
"redis",
|
||||
"mongodb",
|
||||
// Monitoring and infrastructure
|
||||
"smart-metrics-api",
|
||||
"service-metrics-api",
|
||||
"backup-metrics-api",
|
||||
"prometheus",
|
||||
"grafana",
|
||||
"influxdb",
|
||||
// Backup and storage
|
||||
"restic",
|
||||
"borg",
|
||||
"rclone",
|
||||
"syncthing",
|
||||
// Container runtimes
|
||||
"docker",
|
||||
"podman",
|
||||
"containerd",
|
||||
// Network services
|
||||
"sshd",
|
||||
"dnsmasq",
|
||||
"bind9",
|
||||
"pihole",
|
||||
// Media services
|
||||
"plex",
|
||||
"jellyfin",
|
||||
"emby",
|
||||
"sonarr",
|
||||
"radarr",
|
||||
];
|
||||
|
||||
// Check if service name contains any of our interesting patterns
|
||||
interesting_services
|
||||
.iter()
|
||||
.any(|&pattern| service_name.contains(pattern) || pattern.contains(service_name))
|
||||
}
|
||||
|
||||
fn get_host_specific_services(hostname: &str) -> Vec<String> {
|
||||
match hostname {
|
||||
"srv01" => vec![
|
||||
"gitea".to_string(),
|
||||
"immich".to_string(),
|
||||
"vaultwarden".to_string(),
|
||||
"unifi".to_string(),
|
||||
"smart-metrics-api".to_string(),
|
||||
"service-metrics-api".to_string(),
|
||||
"backup-metrics-api".to_string(),
|
||||
],
|
||||
"cmbox" | "labbox" | "simonbox" => vec!["docker".to_string(), "sshd".to_string()],
|
||||
"steambox" => vec!["steam".to_string(), "sshd".to_string()],
|
||||
_ => vec!["sshd".to_string()],
|
||||
}
|
||||
}
|
||||
|
||||
fn canonical_service_name(service: &str) -> Option<String> {
|
||||
let trimmed = service.trim();
|
||||
if trimmed.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let lower = trimmed.to_lowercase();
|
||||
let aliases = [
|
||||
("ssh", "sshd"),
|
||||
("sshd", "sshd"),
|
||||
("docker.service", "docker"),
|
||||
];
|
||||
|
||||
for (alias, target) in aliases {
|
||||
if lower == alias {
|
||||
return Some(target.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
Some(trimmed.to_string())
|
||||
}
|
||||
|
||||
async fn filter_existing_services(services: &[String]) -> Vec<String> {
|
||||
let mut existing = Vec::new();
|
||||
|
||||
for service in services {
|
||||
if Self::service_exists(service).await {
|
||||
existing.push(service.clone());
|
||||
}
|
||||
}
|
||||
|
||||
existing
|
||||
}
|
||||
|
||||
async fn service_exists(service: &str) -> bool {
|
||||
let unit = if service.ends_with(".service") {
|
||||
service.to_string()
|
||||
} else {
|
||||
format!("{}.service", service)
|
||||
};
|
||||
|
||||
match Command::new("systemctl")
|
||||
.args(["status", &unit])
|
||||
.stdout(Stdio::null())
|
||||
.stderr(Stdio::null())
|
||||
.output()
|
||||
.await
|
||||
{
|
||||
Ok(output) => output.status.success(),
|
||||
Err(error) => {
|
||||
warn!("Failed to check service {}: {}", unit, error);
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Auto-detect backup configuration
|
||||
pub async fn discover_backup_config(hostname: &str) -> (bool, Option<String>, String) {
|
||||
// Check if this host should have backup monitoring
|
||||
let backup_enabled = hostname == "srv01" || Self::has_backup_service().await;
|
||||
|
||||
// Try to find restic repository
|
||||
let restic_repo = if backup_enabled {
|
||||
Self::discover_restic_repo().await
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Determine backup service name
|
||||
let backup_service = Self::discover_backup_service()
|
||||
.await
|
||||
.unwrap_or_else(|| "restic-backup".to_string());
|
||||
|
||||
(backup_enabled, restic_repo, backup_service)
|
||||
}
|
||||
|
||||
async fn has_backup_service() -> bool {
|
||||
// Check for common backup services
|
||||
let backup_services = ["restic", "borg", "duplicati", "rclone"];
|
||||
|
||||
for service in backup_services {
|
||||
if let Ok(output) = Command::new("systemctl")
|
||||
.args(["is-enabled", service])
|
||||
.output()
|
||||
.await
|
||||
{
|
||||
if output.status.success() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
async fn discover_restic_repo() -> Option<String> {
|
||||
// Common restic repository locations
|
||||
let common_paths = [
|
||||
"/srv/backups/restic",
|
||||
"/var/backups/restic",
|
||||
"/home/restic",
|
||||
"/backup/restic",
|
||||
"/mnt/backup/restic",
|
||||
];
|
||||
|
||||
for path in common_paths {
|
||||
if fs::metadata(path).await.is_ok() {
|
||||
debug!("Found restic repository at: {}", path);
|
||||
return Some(path.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
// Try to find via environment variables or config files
|
||||
if let Ok(content) = fs::read_to_string("/etc/restic/repository").await {
|
||||
let repo_path = content.trim();
|
||||
if !repo_path.is_empty() {
|
||||
return Some(repo_path.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
async fn discover_backup_service() -> Option<String> {
|
||||
let backup_services = ["restic-backup", "restic", "borg-backup", "borg", "backup"];
|
||||
|
||||
for service in backup_services {
|
||||
if let Ok(output) = Command::new("systemctl")
|
||||
.args(["is-enabled", &format!("{}.service", service)])
|
||||
.output()
|
||||
.await
|
||||
{
|
||||
if output.status.success() {
|
||||
return Some(service.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
/// Validate auto-detected configuration
|
||||
pub async fn validate_devices(devices: &[String]) -> Vec<String> {
|
||||
let mut valid_devices = Vec::new();
|
||||
|
||||
for device in devices {
|
||||
if Self::can_access_device(device).await {
|
||||
valid_devices.push(device.clone());
|
||||
} else {
|
||||
warn!("Cannot access device {}, skipping", device);
|
||||
}
|
||||
}
|
||||
|
||||
valid_devices
|
||||
}
|
||||
|
||||
async fn can_access_device(device: &str) -> bool {
|
||||
let device_path = format!("/dev/{}", device);
|
||||
|
||||
// Try to run smartctl to see if device is accessible
|
||||
if let Ok(output) = Command::new("smartctl")
|
||||
.args(["-i", &device_path])
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.output()
|
||||
.await
|
||||
{
|
||||
// smartctl returns 0 for success, but may return other codes for warnings
|
||||
// that are still acceptable (like device supports SMART but has some issues)
|
||||
output.status.code().map_or(false, |code| code <= 4)
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,161 +1,182 @@
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use chrono::Utc;
|
||||
use anyhow::{anyhow, Result};
|
||||
use clap::{ArgAction, Parser};
|
||||
use cm_dashboard_shared::envelope::{AgentType, MetricsEnvelope};
|
||||
use rand::Rng;
|
||||
use serde_json::json;
|
||||
use tracing::info;
|
||||
use std::path::PathBuf;
|
||||
use tokio::signal;
|
||||
use tracing::{error, info};
|
||||
use tracing_subscriber::EnvFilter;
|
||||
use zmq::{Context as ZmqContext, SocketType};
|
||||
|
||||
mod agent;
|
||||
mod collectors;
|
||||
mod config;
|
||||
mod discovery;
|
||||
mod scheduler;
|
||||
|
||||
use agent::MetricsAgent;
|
||||
use config::AgentConfig;
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(
|
||||
name = "cm-dashboard-agent",
|
||||
version,
|
||||
about = "CM Dashboard metrics agent"
|
||||
about = "CM Dashboard ZMQ metrics agent with auto-detection"
|
||||
)]
|
||||
struct Cli {
|
||||
/// Hostname to advertise in metric envelopes
|
||||
#[arg(long, value_name = "HOSTNAME")]
|
||||
hostname: String,
|
||||
/// ZMQ port to bind to (default: 6130)
|
||||
#[arg(long, value_name = "PORT")]
|
||||
port: Option<u16>,
|
||||
|
||||
/// Bind endpoint for PUB socket (default tcp://*:6130)
|
||||
#[arg(long, default_value = "tcp://*:6130", value_name = "ENDPOINT")]
|
||||
bind: String,
|
||||
/// Path to load configuration from
|
||||
#[arg(long, value_name = "FILE")]
|
||||
config: Option<PathBuf>,
|
||||
|
||||
/// Publish interval in milliseconds
|
||||
#[arg(long, default_value_t = 5000)]
|
||||
interval_ms: u64,
|
||||
/// Optional path to persist the resolved configuration
|
||||
#[arg(long, value_name = "FILE")]
|
||||
write_config: Option<PathBuf>,
|
||||
|
||||
/// Disable smart metrics publisher
|
||||
/// Disable SMART metrics collector
|
||||
#[arg(long, action = ArgAction::SetTrue)]
|
||||
disable_smart: bool,
|
||||
|
||||
/// Disable service metrics publisher
|
||||
/// Disable service metrics collector
|
||||
#[arg(long, action = ArgAction::SetTrue)]
|
||||
disable_service: bool,
|
||||
|
||||
/// Disable backup metrics publisher
|
||||
/// Disable backup metrics collector
|
||||
#[arg(long, action = ArgAction::SetTrue)]
|
||||
disable_backup: bool,
|
||||
|
||||
/// Skip auto-detection and use minimal defaults
|
||||
#[arg(long, action = ArgAction::SetTrue)]
|
||||
no_auto_detect: bool,
|
||||
|
||||
/// Show detected configuration and exit
|
||||
#[arg(long, action = ArgAction::SetTrue)]
|
||||
show_config: bool,
|
||||
|
||||
/// Increase logging verbosity (-v, -vv)
|
||||
#[arg(short, long, action = ArgAction::Count)]
|
||||
verbose: u8,
|
||||
}
|
||||
|
||||
fn main() -> Result<()> {
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
let cli = Cli::parse();
|
||||
init_tracing(cli.verbose)?;
|
||||
|
||||
let context = ZmqContext::new();
|
||||
let socket = context
|
||||
.socket(SocketType::PUB)
|
||||
.context("failed to create ZMQ PUB socket")?;
|
||||
socket
|
||||
.bind(&cli.bind)
|
||||
.with_context(|| format!("failed to bind to {}", cli.bind))?;
|
||||
info!(endpoint = %cli.bind, host = %cli.hostname, "agent started");
|
||||
// Start with file-based configuration if requested, otherwise defaults
|
||||
let mut config = if let Some(path) = cli.config.as_ref() {
|
||||
AgentConfig::load_from_file(path)
|
||||
.await
|
||||
.map_err(|e| anyhow!("Failed to load config from {}: {}", path.display(), e))?
|
||||
} else {
|
||||
AgentConfig::default()
|
||||
};
|
||||
|
||||
let interval = Duration::from_millis(cli.interval_ms.max(100));
|
||||
let mut rng = rand::thread_rng();
|
||||
// Hostname is auto-detected in AgentConfig::default()
|
||||
|
||||
loop {
|
||||
let now = Utc::now();
|
||||
let timestamp = now.timestamp() as u64;
|
||||
let timestamp_rfc3339 = now.to_rfc3339();
|
||||
|
||||
if !cli.disable_smart {
|
||||
let envelope = MetricsEnvelope {
|
||||
hostname: cli.hostname.clone(),
|
||||
agent_type: AgentType::Smart,
|
||||
timestamp,
|
||||
metrics: json!({
|
||||
"status": "Healthy",
|
||||
"drives": [{
|
||||
"name": "nvme0n1",
|
||||
"temperature_c": rng.gen_range(30.0..60.0),
|
||||
"wear_level": rng.gen_range(1.0..10.0),
|
||||
"power_on_hours": rng.gen_range(1000..20000),
|
||||
"available_spare": rng.gen_range(90.0..100.0)
|
||||
}],
|
||||
"summary": {
|
||||
"healthy": 1,
|
||||
"warning": 0,
|
||||
"critical": 0,
|
||||
"capacity_total_gb": 1024,
|
||||
"capacity_used_gb": rng.gen_range(100.0..800.0)
|
||||
},
|
||||
"issues": [],
|
||||
"timestamp": timestamp_rfc3339
|
||||
}),
|
||||
};
|
||||
publish(&socket, &envelope)?;
|
||||
}
|
||||
|
||||
if !cli.disable_service {
|
||||
let envelope = MetricsEnvelope {
|
||||
hostname: cli.hostname.clone(),
|
||||
agent_type: AgentType::Service,
|
||||
timestamp,
|
||||
metrics: json!({
|
||||
"summary": {
|
||||
"healthy": 5,
|
||||
"degraded": 0,
|
||||
"failed": 0,
|
||||
"memory_used_mb": rng.gen_range(512.0..2048.0),
|
||||
"memory_quota_mb": 4096.0
|
||||
},
|
||||
"services": [
|
||||
{
|
||||
"name": "example",
|
||||
"status": "Running",
|
||||
"memory_used_mb": rng.gen_range(128.0..512.0),
|
||||
"memory_quota_mb": 1024.0,
|
||||
"cpu_percent": rng.gen_range(0.0..75.0),
|
||||
"sandbox_limit": null
|
||||
}
|
||||
],
|
||||
"timestamp": timestamp_rfc3339
|
||||
}),
|
||||
};
|
||||
publish(&socket, &envelope)?;
|
||||
}
|
||||
|
||||
if !cli.disable_backup {
|
||||
let envelope = MetricsEnvelope {
|
||||
hostname: cli.hostname.clone(),
|
||||
agent_type: AgentType::Backup,
|
||||
timestamp,
|
||||
metrics: json!({
|
||||
"overall_status": "Healthy",
|
||||
"backup": {
|
||||
"last_success": timestamp_rfc3339,
|
||||
"last_failure": null,
|
||||
"size_gb": rng.gen_range(100.0..500.0),
|
||||
"snapshot_count": rng.gen_range(10..40)
|
||||
},
|
||||
"service": {
|
||||
"enabled": true,
|
||||
"pending_jobs": 0,
|
||||
"last_message": "Backups up-to-date"
|
||||
},
|
||||
"timestamp": timestamp_rfc3339
|
||||
}),
|
||||
};
|
||||
publish(&socket, &envelope)?;
|
||||
}
|
||||
|
||||
thread::sleep(interval);
|
||||
// Apply CLI port override
|
||||
if let Some(port) = cli.port {
|
||||
config.zmq.port = port;
|
||||
}
|
||||
|
||||
// Run auto-detection unless disabled
|
||||
if !cli.no_auto_detect {
|
||||
info!("Auto-detecting system configuration...");
|
||||
config
|
||||
.auto_configure()
|
||||
.await
|
||||
.map_err(|e| anyhow!("Auto-detection failed: {}", e))?;
|
||||
} else {
|
||||
info!("Skipping auto-detection, using minimal defaults");
|
||||
}
|
||||
|
||||
// Apply CLI collector overrides after auto-detection
|
||||
if cli.disable_smart {
|
||||
config.collectors.smart.enabled = false;
|
||||
}
|
||||
if cli.disable_service {
|
||||
config.collectors.service.enabled = false;
|
||||
}
|
||||
if cli.disable_backup {
|
||||
config.collectors.backup.enabled = false;
|
||||
}
|
||||
|
||||
if let Some(path) = cli.write_config.as_ref() {
|
||||
config
|
||||
.save_to_file(path)
|
||||
.await
|
||||
.map_err(|e| anyhow!("Failed to write config to {}: {}", path.display(), e))?;
|
||||
info!("Persisted configuration to {}", path.display());
|
||||
}
|
||||
|
||||
// Show configuration and exit if requested
|
||||
if cli.show_config {
|
||||
println!("Agent Configuration:");
|
||||
println!(" Hostname: {}", config.agent.hostname);
|
||||
println!(" ZMQ Port: {}", config.zmq.port);
|
||||
println!(" Collectors: {}", config.summary());
|
||||
|
||||
if config.collectors.smart.enabled {
|
||||
println!(" SMART Devices: {:?}", config.collectors.smart.devices);
|
||||
}
|
||||
|
||||
if config.collectors.service.enabled {
|
||||
println!(" Services: {:?}", config.collectors.service.services);
|
||||
}
|
||||
|
||||
if config.collectors.backup.enabled {
|
||||
println!(" Backup Repo: {:?}", config.collectors.backup.restic_repo);
|
||||
println!(
|
||||
" Backup Service: {}",
|
||||
config.collectors.backup.backup_service
|
||||
);
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
info!(
|
||||
"Starting agent for host '{}' on port {} with: {}",
|
||||
config.agent.hostname,
|
||||
config.zmq.port,
|
||||
config.summary()
|
||||
);
|
||||
|
||||
// Build and start the agent
|
||||
let mut agent =
|
||||
MetricsAgent::from_config(config).map_err(|e| anyhow!("Failed to create agent: {}", e))?;
|
||||
|
||||
// Set up graceful shutdown handling
|
||||
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())
|
||||
.expect("Failed to install SIGTERM handler");
|
||||
let mut sigint = signal::unix::signal(signal::unix::SignalKind::interrupt())
|
||||
.expect("Failed to install SIGINT handler");
|
||||
|
||||
tokio::select! {
|
||||
_ = sigterm.recv() => info!("Received SIGTERM"),
|
||||
_ = sigint.recv() => info!("Received SIGINT"),
|
||||
}
|
||||
|
||||
let _ = shutdown_tx.send(());
|
||||
});
|
||||
|
||||
// Run the agent until shutdown
|
||||
tokio::select! {
|
||||
result = agent.run() => {
|
||||
match result {
|
||||
Ok(_) => info!("Agent completed successfully"),
|
||||
Err(e) => error!("Agent error: {}", e),
|
||||
}
|
||||
}
|
||||
_ = shutdown_rx => {
|
||||
info!("Shutdown signal received");
|
||||
agent.shutdown().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn publish(socket: &zmq::Socket, envelope: &MetricsEnvelope) -> Result<()> {
|
||||
let serialized = serde_json::to_vec(envelope)?;
|
||||
socket.send(serialized, 0)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
393
agent/src/scheduler.rs
Normal file
393
agent/src/scheduler.rs
Normal file
@@ -0,0 +1,393 @@
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::{mpsc, RwLock};
|
||||
use tokio::time::{interval, Instant};
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::collectors::{Collector, CollectorError, CollectorOutput};
|
||||
|
||||
pub struct CollectorScheduler {
|
||||
collectors: Vec<Arc<dyn Collector>>,
|
||||
sender: mpsc::UnboundedSender<SchedulerEvent>,
|
||||
receiver: mpsc::UnboundedReceiver<SchedulerEvent>,
|
||||
stats: Arc<RwLock<SchedulerStats>>,
|
||||
metrics_sender: Option<mpsc::UnboundedSender<CollectorOutput>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum SchedulerEvent {
|
||||
CollectionResult {
|
||||
collector_name: String,
|
||||
result: Result<CollectorOutput, CollectorError>,
|
||||
duration: Duration,
|
||||
},
|
||||
Shutdown,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct SchedulerStats {
|
||||
pub total_collections: u64,
|
||||
pub successful_collections: u64,
|
||||
pub failed_collections: u64,
|
||||
pub collector_stats: HashMap<String, CollectorStats>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct CollectorStats {
|
||||
pub total_collections: u64,
|
||||
pub successful_collections: u64,
|
||||
pub failed_collections: u64,
|
||||
pub last_success: Option<Instant>,
|
||||
pub last_failure: Option<Instant>,
|
||||
pub average_duration_ms: f64,
|
||||
pub consecutive_failures: u32,
|
||||
}
|
||||
|
||||
impl CollectorScheduler {
|
||||
pub fn new() -> Self {
|
||||
let (sender, receiver) = mpsc::unbounded_channel();
|
||||
|
||||
Self {
|
||||
collectors: Vec::new(),
|
||||
sender,
|
||||
receiver,
|
||||
stats: Arc::new(RwLock::new(SchedulerStats::default())),
|
||||
metrics_sender: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_metrics_sender(&mut self, sender: mpsc::UnboundedSender<CollectorOutput>) {
|
||||
self.metrics_sender = Some(sender);
|
||||
}
|
||||
|
||||
pub fn clear_metrics_sender(&mut self) {
|
||||
self.metrics_sender = None;
|
||||
}
|
||||
|
||||
pub fn add_collector(&mut self, collector: Arc<dyn Collector>) {
|
||||
if collector.is_enabled() {
|
||||
info!(
|
||||
"Adding collector '{}' [{}] with interval {:?}",
|
||||
collector.name(),
|
||||
collector.agent_type().as_str(),
|
||||
collector.collect_interval()
|
||||
);
|
||||
|
||||
if collector.requires_root() {
|
||||
debug!("Collector '{}' is flagged as root-only", collector.name());
|
||||
}
|
||||
self.collectors.push(collector);
|
||||
} else {
|
||||
info!("Skipping disabled collector '{}'", collector.name());
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start(&mut self) -> Result<(), CollectorError> {
|
||||
if self.collectors.is_empty() {
|
||||
return Err(CollectorError::ConfigError {
|
||||
message: "No enabled collectors configured".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
info!(
|
||||
"Starting scheduler with {} collectors",
|
||||
self.collectors.len()
|
||||
);
|
||||
|
||||
// Start collection tasks for each collector
|
||||
let mut collection_tasks = FuturesUnordered::new();
|
||||
|
||||
for collector in self.collectors.clone() {
|
||||
let sender = self.sender.clone();
|
||||
let stats = self.stats.clone();
|
||||
|
||||
let task =
|
||||
tokio::spawn(async move { Self::run_collector(collector, sender, stats).await });
|
||||
|
||||
collection_tasks.push(task);
|
||||
}
|
||||
|
||||
// Main event loop
|
||||
loop {
|
||||
tokio::select! {
|
||||
// Handle collection results
|
||||
Some(event) = self.receiver.recv() => {
|
||||
match event {
|
||||
SchedulerEvent::CollectionResult { collector_name, result, duration } => {
|
||||
self.handle_collection_result(&collector_name, result, duration).await;
|
||||
}
|
||||
SchedulerEvent::Shutdown => {
|
||||
info!("Scheduler shutdown requested");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Handle task completion (shouldn't happen in normal operation)
|
||||
Some(result) = collection_tasks.next() => {
|
||||
match result {
|
||||
Ok(_) => warn!("Collection task completed unexpectedly"),
|
||||
Err(e) => error!("Collection task failed: {}", e),
|
||||
}
|
||||
}
|
||||
|
||||
// If all tasks are done and no more events, break
|
||||
else => {
|
||||
warn!("All collection tasks completed, shutting down scheduler");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_collector(
|
||||
collector: Arc<dyn Collector>,
|
||||
sender: mpsc::UnboundedSender<SchedulerEvent>,
|
||||
_stats: Arc<RwLock<SchedulerStats>>,
|
||||
) {
|
||||
let collector_name = collector.name().to_string();
|
||||
let mut interval_timer = interval(collector.collect_interval());
|
||||
interval_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||
|
||||
info!("Starting collection loop for '{}'", collector_name);
|
||||
|
||||
loop {
|
||||
interval_timer.tick().await;
|
||||
|
||||
debug!("Running collection for '{}'", collector_name);
|
||||
let start_time = Instant::now();
|
||||
|
||||
match collector.collect().await {
|
||||
Ok(output) => {
|
||||
let duration = start_time.elapsed();
|
||||
debug!(
|
||||
"Collection '{}' completed in {:?}",
|
||||
collector_name, duration
|
||||
);
|
||||
|
||||
if let Err(e) = sender.send(SchedulerEvent::CollectionResult {
|
||||
collector_name: collector_name.clone(),
|
||||
result: Ok(output),
|
||||
duration,
|
||||
}) {
|
||||
error!(
|
||||
"Failed to send collection result for '{}': {}",
|
||||
collector_name, e
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(error) => {
|
||||
let duration = start_time.elapsed();
|
||||
warn!(
|
||||
"Collection '{}' failed after {:?}: {}",
|
||||
collector_name, duration, error
|
||||
);
|
||||
|
||||
if let Err(e) = sender.send(SchedulerEvent::CollectionResult {
|
||||
collector_name: collector_name.clone(),
|
||||
result: Err(error),
|
||||
duration,
|
||||
}) {
|
||||
error!(
|
||||
"Failed to send collection error for '{}': {}",
|
||||
collector_name, e
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
warn!("Collection loop for '{}' ended", collector_name);
|
||||
}
|
||||
|
||||
async fn handle_collection_result(
|
||||
&self,
|
||||
collector_name: &str,
|
||||
result: Result<CollectorOutput, CollectorError>,
|
||||
duration: Duration,
|
||||
) {
|
||||
let publish_output = match &result {
|
||||
Ok(output) => Some(output.clone()),
|
||||
Err(_) => None,
|
||||
};
|
||||
|
||||
{
|
||||
let mut stats = self.stats.write().await;
|
||||
stats.total_collections += 1;
|
||||
|
||||
match &result {
|
||||
Ok(_) => {
|
||||
stats.successful_collections += 1;
|
||||
}
|
||||
Err(_) => {
|
||||
stats.failed_collections += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Handle collector-specific stats
|
||||
{
|
||||
let mut stats = self.stats.write().await;
|
||||
let duration_ms = duration.as_millis() as f64;
|
||||
|
||||
let collector_stats = stats
|
||||
.collector_stats
|
||||
.entry(collector_name.to_string())
|
||||
.or_default();
|
||||
|
||||
collector_stats.total_collections += 1;
|
||||
|
||||
if collector_stats.average_duration_ms == 0.0 {
|
||||
collector_stats.average_duration_ms = duration_ms;
|
||||
} else {
|
||||
// Simple moving average
|
||||
collector_stats.average_duration_ms =
|
||||
(collector_stats.average_duration_ms * 0.9) + (duration_ms * 0.1);
|
||||
}
|
||||
|
||||
match &result {
|
||||
Ok(output) => {
|
||||
collector_stats.successful_collections += 1;
|
||||
collector_stats.last_success = Some(Instant::now());
|
||||
collector_stats.consecutive_failures = 0;
|
||||
|
||||
let metrics_count = match &output.data {
|
||||
serde_json::Value::Object(map) => map.len(),
|
||||
serde_json::Value::Array(values) => values.len(),
|
||||
_ => 1,
|
||||
};
|
||||
|
||||
debug!(
|
||||
"Collector '{}' [{}] successful at {} ({} metrics)",
|
||||
collector_name,
|
||||
output.agent_type.as_str(),
|
||||
output.timestamp,
|
||||
metrics_count
|
||||
);
|
||||
}
|
||||
Err(error) => {
|
||||
collector_stats.failed_collections += 1;
|
||||
collector_stats.last_failure = Some(Instant::now());
|
||||
collector_stats.consecutive_failures += 1;
|
||||
|
||||
warn!("Collection '{}' failed: {}", collector_name, error);
|
||||
|
||||
// Log warning for consecutive failures
|
||||
if collector_stats.consecutive_failures >= 5 {
|
||||
error!(
|
||||
"Collector '{}' has {} consecutive failures",
|
||||
collector_name, collector_stats.consecutive_failures
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let (Some(sender), Some(output)) = (&self.metrics_sender, publish_output) {
|
||||
if let Err(error) = sender.send(output) {
|
||||
warn!("Metrics channel send error: {}", error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_stats_handle(&self) -> Arc<RwLock<SchedulerStats>> {
|
||||
self.stats.clone()
|
||||
}
|
||||
|
||||
pub async fn shutdown(&self) {
|
||||
info!("Requesting scheduler shutdown");
|
||||
if let Err(e) = self.sender.send(SchedulerEvent::Shutdown) {
|
||||
error!("Failed to send shutdown event: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for CollectorScheduler {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct HealthChecker {
|
||||
stats: Arc<RwLock<SchedulerStats>>,
|
||||
max_consecutive_failures: u32,
|
||||
max_failure_rate: f64,
|
||||
}
|
||||
|
||||
impl HealthChecker {
|
||||
pub fn new(stats: Arc<RwLock<SchedulerStats>>) -> Self {
|
||||
Self {
|
||||
stats,
|
||||
max_consecutive_failures: 10,
|
||||
max_failure_rate: 0.5, // 50% failure rate threshold
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn check_health(&self) -> HealthStatus {
|
||||
let stats = self.stats.read().await;
|
||||
|
||||
let mut unhealthy_collectors = Vec::new();
|
||||
let mut degraded_collectors = Vec::new();
|
||||
|
||||
for (name, collector_stats) in &stats.collector_stats {
|
||||
// Check consecutive failures
|
||||
if collector_stats.consecutive_failures >= self.max_consecutive_failures {
|
||||
unhealthy_collectors.push(name.clone());
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check failure rate
|
||||
if collector_stats.total_collections > 10 {
|
||||
let failure_rate = collector_stats.failed_collections as f64
|
||||
/ collector_stats.total_collections as f64;
|
||||
|
||||
if failure_rate >= self.max_failure_rate {
|
||||
degraded_collectors.push(name.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// Check if collector hasn't succeeded recently
|
||||
if let Some(last_success) = collector_stats.last_success {
|
||||
if last_success.elapsed() > Duration::from_secs(300) {
|
||||
// 5 minutes
|
||||
degraded_collectors.push(name.clone());
|
||||
}
|
||||
} else if collector_stats.total_collections > 5 {
|
||||
// No successful collections after several attempts
|
||||
unhealthy_collectors.push(name.clone());
|
||||
}
|
||||
}
|
||||
|
||||
if !unhealthy_collectors.is_empty() {
|
||||
HealthStatus::Unhealthy {
|
||||
unhealthy_collectors,
|
||||
degraded_collectors,
|
||||
}
|
||||
} else if !degraded_collectors.is_empty() {
|
||||
HealthStatus::Degraded {
|
||||
degraded_collectors,
|
||||
}
|
||||
} else {
|
||||
HealthStatus::Healthy
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum HealthStatus {
|
||||
Healthy,
|
||||
Degraded {
|
||||
degraded_collectors: Vec<String>,
|
||||
},
|
||||
Unhealthy {
|
||||
unhealthy_collectors: Vec<String>,
|
||||
degraded_collectors: Vec<String>,
|
||||
},
|
||||
}
|
||||
Reference in New Issue
Block a user