Implement comprehensive backup monitoring and fix timestamp issues

- Add BackupCollector for reading TOML status files with disk space metrics
- Implement BackupWidget with disk usage display and service status details
- Fix backup script disk space parsing by adding missing capture_output=True
- Update backup widget to show actual disk usage instead of repository size
- Fix timestamp parsing to use backup completion time instead of start time
- Resolve timezone issues by using UTC timestamps in backup script
- Add disk identification metrics (product name, serial number) to backup status
- Enhance UI layout with proper backup monitoring integration
2025-10-18 18:33:41 +02:00
parent 8a36472a3d
commit 125111ee99
19 changed files with 2788 additions and 1020 deletions
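
The timestamp fix comes down to preferring RFC3339 strings with an explicit offset and only falling back to naive timestamps that are assumed to be UTC. A minimal chrono sketch of that fallback (helper name and example values are illustrative, not part of this commit):

use chrono::{DateTime, NaiveDateTime, Utc};

fn parse_backup_timestamp(raw: &str) -> Option<DateTime<Utc>> {
    // Prefer RFC3339 with an explicit offset; fall back to a naive timestamp
    // that the backup script is now expected to write in UTC.
    DateTime::parse_from_rfc3339(raw)
        .map(|dt| dt.with_timezone(&Utc))
        .or_else(|_| {
            NaiveDateTime::parse_from_str(raw, "%Y-%m-%dT%H:%M:%S%.f").map(|dt| dt.and_utc())
        })
        .ok()
}

fn main() {
    // Hypothetical inputs: one offset-aware, one naive but written in UTC.
    assert!(parse_backup_timestamp("2025-10-18T16:33:41+00:00").is_some());
    assert!(parse_backup_timestamp("2025-10-18T16:33:41.123456").is_some());
}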

View File

@@ -0,0 +1,388 @@
use async_trait::async_trait;
use cm_dashboard_shared::{Metric, MetricValue, Status, SharedError};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::Path;
use tokio::fs;
use super::{Collector, CollectorError, utils};
use tracing::error;
/// Backup collector that reads TOML status files for borgbackup metrics
#[derive(Debug, Clone)]
pub struct BackupCollector {
pub backup_status_file: String,
pub max_age_hours: u64,
}
impl BackupCollector {
pub fn new(backup_status_file: Option<String>, max_age_hours: u64) -> Self {
Self {
backup_status_file: backup_status_file.unwrap_or_else(|| "/var/lib/backup/backup-status.toml".to_string()),
max_age_hours,
}
}
async fn read_backup_status(&self) -> Result<BackupStatusToml, CollectorError> {
let content = fs::read_to_string(&self.backup_status_file)
.await
.map_err(|e| CollectorError::SystemRead {
path: self.backup_status_file.clone(),
error: e.to_string(),
})?;
toml::from_str(&content).map_err(|e| CollectorError::Parse {
value: "backup status TOML".to_string(),
error: e.to_string(),
})
}
fn calculate_backup_status(&self, backup_status: &BackupStatusToml) -> Status {
// Parse the start time to check age - handle RFC3339 (with offset) and naive timestamps assumed to be UTC
let start_time = match chrono::DateTime::parse_from_rfc3339(&backup_status.start_time) {
Ok(dt) => dt.with_timezone(&Utc),
Err(_) => {
// Try parsing as naive datetime and assume UTC
match chrono::NaiveDateTime::parse_from_str(&backup_status.start_time, "%Y-%m-%dT%H:%M:%S%.f") {
Ok(naive_dt) => naive_dt.and_utc(),
Err(_) => {
error!("Failed to parse backup timestamp: {}", backup_status.start_time);
return Status::Unknown;
}
}
}
};
let hours_since_backup = Utc::now().signed_duration_since(start_time).num_hours();
// Check overall backup status
match backup_status.status.as_str() {
"success" => {
if hours_since_backup > self.max_age_hours as i64 {
Status::Warning // Backup too old
} else {
Status::Ok
}
},
"failed" => Status::Critical,
"running" => Status::Ok, // Currently running is OK
_ => Status::Unknown,
}
}
fn calculate_service_status(&self, service: &ServiceStatus) -> Status {
match service.status.as_str() {
"completed" => {
if service.exit_code == 0 {
Status::Ok
} else {
Status::Critical
}
},
"failed" => Status::Critical,
"disabled" => Status::Warning, // Service intentionally disabled
"running" => Status::Ok,
_ => Status::Unknown,
}
}
fn bytes_to_gb(bytes: u64) -> f32 {
bytes as f32 / (1024.0 * 1024.0 * 1024.0)
}
}
#[async_trait]
impl Collector for BackupCollector {
fn name(&self) -> &str {
"backup"
}
async fn collect(&self) -> Result<Vec<Metric>, CollectorError> {
let backup_status = self.read_backup_status().await?;
let mut metrics = Vec::new();
let timestamp = chrono::Utc::now().timestamp() as u64;
// Overall backup status
let overall_status = self.calculate_backup_status(&backup_status);
metrics.push(Metric {
name: "backup_overall_status".to_string(),
value: MetricValue::String(match overall_status {
Status::Ok => "ok".to_string(),
Status::Warning => "warning".to_string(),
Status::Critical => "critical".to_string(),
Status::Unknown => "unknown".to_string(),
}),
status: overall_status,
timestamp,
description: Some(format!("Backup: {} at {}", backup_status.status, backup_status.start_time)),
unit: None,
});
// Backup duration
metrics.push(Metric {
name: "backup_duration_seconds".to_string(),
value: MetricValue::Integer(backup_status.duration_seconds),
status: Status::Ok,
timestamp,
description: Some("Duration of last backup run".to_string()),
unit: Some("seconds".to_string()),
});
// Last backup timestamp - use last_updated (when backup finished) instead of start_time
let last_updated_dt_result = chrono::DateTime::parse_from_rfc3339(&backup_status.last_updated)
.map(|dt| dt.with_timezone(&Utc))
.or_else(|_| {
// Try parsing as naive datetime and assume UTC
chrono::NaiveDateTime::parse_from_str(&backup_status.last_updated, "%Y-%m-%dT%H:%M:%S%.f")
.map(|naive_dt| naive_dt.and_utc())
});
if let Ok(last_updated_dt) = last_updated_dt_result {
metrics.push(Metric {
name: "backup_last_run_timestamp".to_string(),
value: MetricValue::Integer(last_updated_dt.timestamp()),
status: Status::Ok,
timestamp,
description: Some("Timestamp of last backup completion".to_string()),
unit: Some("unix_timestamp".to_string()),
});
} else {
error!("Failed to parse backup timestamp for last_run_timestamp: {}", backup_status.last_updated);
}
// Individual service metrics
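// For a service named "gitea" (illustrative name), the loop below emits
// backup_service_gitea_status, backup_service_gitea_exit_code,
// backup_service_gitea_archive_count, backup_service_gitea_repo_size_gb
// and backup_service_gitea_repo_path.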
for (service_name, service) in &backup_status.services {
let service_status = self.calculate_service_status(service);
// Service status
metrics.push(Metric {
name: format!("backup_service_{}_status", service_name),
value: MetricValue::String(match service_status {
Status::Ok => "ok".to_string(),
Status::Warning => "warning".to_string(),
Status::Critical => "critical".to_string(),
Status::Unknown => "unknown".to_string(),
}),
status: service_status,
timestamp,
description: Some(format!("Backup service {} status: {}", service_name, service.status)),
unit: None,
});
// Service exit code
metrics.push(Metric {
name: format!("backup_service_{}_exit_code", service_name),
value: MetricValue::Integer(service.exit_code),
status: if service.exit_code == 0 { Status::Ok } else { Status::Critical },
timestamp,
description: Some(format!("Exit code for backup service {}", service_name)),
unit: None,
});
// Repository archive count
metrics.push(Metric {
name: format!("backup_service_{}_archive_count", service_name),
value: MetricValue::Integer(service.archive_count),
status: Status::Ok,
timestamp,
description: Some(format!("Number of archives in {} repository", service_name)),
unit: Some("archives".to_string()),
});
// Repository size in GB
let repo_size_gb = Self::bytes_to_gb(service.repo_size_bytes);
metrics.push(Metric {
name: format!("backup_service_{}_repo_size_gb", service_name),
value: MetricValue::Float(repo_size_gb),
status: Status::Ok,
timestamp,
description: Some(format!("Repository size for {} in GB", service_name)),
unit: Some("GB".to_string()),
});
// Repository path for reference
metrics.push(Metric {
name: format!("backup_service_{}_repo_path", service_name),
value: MetricValue::String(service.repo_path.clone()),
status: Status::Ok,
timestamp,
description: Some(format!("Repository path for {}", service_name)),
unit: None,
});
}
// Total number of services
metrics.push(Metric {
name: "backup_total_services".to_string(),
value: MetricValue::Integer(backup_status.services.len() as i64),
status: Status::Ok,
timestamp,
description: Some("Total number of backup services".to_string()),
unit: Some("services".to_string()),
});
// Calculate total repository size
let total_size_bytes: u64 = backup_status.services.values()
.map(|s| s.repo_size_bytes)
.sum();
let total_size_gb = Self::bytes_to_gb(total_size_bytes);
metrics.push(Metric {
name: "backup_total_repo_size_gb".to_string(),
value: MetricValue::Float(total_size_gb),
status: Status::Ok,
timestamp,
description: Some("Total size of all backup repositories".to_string()),
unit: Some("GB".to_string()),
});
// Disk space metrics for backup directory
if let Some(ref disk_space) = backup_status.disk_space {
metrics.push(Metric {
name: "backup_disk_total_gb".to_string(),
value: MetricValue::Float(disk_space.total_gb as f32),
status: Status::Ok,
timestamp,
description: Some("Total disk space available for backups".to_string()),
unit: Some("GB".to_string()),
});
metrics.push(Metric {
name: "backup_disk_used_gb".to_string(),
value: MetricValue::Float(disk_space.used_gb as f32),
status: Status::Ok,
timestamp,
description: Some("Used disk space on backup drive".to_string()),
unit: Some("GB".to_string()),
});
metrics.push(Metric {
name: "backup_disk_available_gb".to_string(),
value: MetricValue::Float(disk_space.available_gb as f32),
status: Status::Ok,
timestamp,
description: Some("Available disk space on backup drive".to_string()),
unit: Some("GB".to_string()),
});
metrics.push(Metric {
name: "backup_disk_usage_percent".to_string(),
value: MetricValue::Float(disk_space.usage_percent as f32),
status: if disk_space.usage_percent >= 95.0 {
Status::Critical
} else if disk_space.usage_percent >= 85.0 {
Status::Warning
} else {
Status::Ok
},
timestamp,
description: Some("Backup disk usage percentage".to_string()),
unit: Some("percent".to_string()),
});
// Add disk identification metrics if available from disk_space
if let Some(ref product_name) = disk_space.product_name {
metrics.push(Metric {
name: "backup_disk_product_name".to_string(),
value: MetricValue::String(product_name.clone()),
status: Status::Ok,
timestamp,
description: Some("Backup disk product name from SMART data".to_string()),
unit: None,
});
}
if let Some(ref serial_number) = disk_space.serial_number {
metrics.push(Metric {
name: "backup_disk_serial_number".to_string(),
value: MetricValue::String(serial_number.clone()),
status: Status::Ok,
timestamp,
description: Some("Backup disk serial number from SMART data".to_string()),
unit: None,
});
}
}
// Add standalone disk identification metrics from TOML fields
if let Some(ref product_name) = backup_status.disk_product_name {
metrics.push(Metric {
name: "backup_disk_product_name".to_string(),
value: MetricValue::String(product_name.clone()),
status: Status::Ok,
timestamp,
description: Some("Backup disk product name from SMART data".to_string()),
unit: None,
});
}
if let Some(ref serial_number) = backup_status.disk_serial_number {
metrics.push(Metric {
name: "backup_disk_serial_number".to_string(),
value: MetricValue::String(serial_number.clone()),
status: Status::Ok,
timestamp,
description: Some("Backup disk serial number from SMART data".to_string()),
unit: None,
});
}
// Count services by status
let mut status_counts = HashMap::new();
for service in backup_status.services.values() {
*status_counts.entry(service.status.clone()).or_insert(0) += 1;
}
for (status_name, count) in status_counts {
metrics.push(Metric {
name: format!("backup_services_{}_count", status_name),
value: MetricValue::Integer(count),
status: Status::Ok,
timestamp,
description: Some(format!("Number of services with status: {}", status_name)),
unit: Some("services".to_string()),
});
}
Ok(metrics)
}
}
/// TOML structure for backup status file
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct BackupStatusToml {
pub backup_name: String,
pub start_time: String,
pub current_time: String,
pub duration_seconds: i64,
pub status: String,
pub last_updated: String,
pub disk_space: Option<DiskSpace>,
pub disk_product_name: Option<String>,
pub disk_serial_number: Option<String>,
pub services: HashMap<String, ServiceStatus>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct DiskSpace {
pub total_bytes: u64,
pub used_bytes: u64,
pub available_bytes: u64,
pub total_gb: f64,
pub used_gb: f64,
pub available_gb: f64,
pub usage_percent: f64,
// Optional disk identification fields
pub product_name: Option<String>,
pub serial_number: Option<String>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ServiceStatus {
pub status: String,
pub exit_code: i64,
pub repo_path: String,
pub archive_count: i64,
pub repo_size_bytes: u64,
}
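// Illustrative only: a hedged sketch of a backup-status.toml that matches the
// structs above, exercised via a deserialization test. All values (service name,
// paths, sizes) are hypothetical examples rather than real data.
#[cfg(test)]
mod tests {
    use super::BackupStatusToml;

    #[test]
    fn parses_sample_status_file() {
        let sample = r#"
            backup_name = "nightly"
            start_time = "2025-10-18T02:00:00+00:00"
            current_time = "2025-10-18T02:09:30+00:00"
            duration_seconds = 570
            status = "success"
            last_updated = "2025-10-18T02:09:30+00:00"
            disk_product_name = "Example SSD"
            disk_serial_number = "ABC123"

            [disk_space]
            total_bytes = 1000000000000
            used_bytes = 400000000000
            available_bytes = 600000000000
            total_gb = 931.3
            used_gb = 372.5
            available_gb = 558.8
            usage_percent = 40.0

            [services.gitea]
            status = "completed"
            exit_code = 0
            repo_path = "/backup/gitea"
            archive_count = 30
            repo_size_bytes = 12345678900
        "#;
        let parsed: BackupStatusToml = toml::from_str(sample).expect("sample TOML should parse");
        assert_eq!(parsed.status, "success");
        assert_eq!(parsed.services.len(), 1);
    }
}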

View File

@@ -173,152 +173,7 @@ impl CpuCollector {
Ok(None)
}
/// Collect top CPU consuming process using ps command for accurate percentages
async fn collect_top_cpu_process(&self) -> Result<Option<Metric>, CollectorError> {
use std::process::Command;
// Use ps to get current CPU percentages, sorted by CPU usage
let output = Command::new("ps")
.arg("aux")
.arg("--sort=-%cpu")
.arg("--no-headers")
.output()
.map_err(|e| CollectorError::SystemRead {
path: "ps command".to_string(),
error: e.to_string(),
})?;
if !output.status.success() {
return Ok(None);
}
let output_str = String::from_utf8_lossy(&output.stdout);
// Parse lines and find the first non-ps process (to avoid catching our own ps command)
for line in output_str.lines() {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() >= 11 {
// ps aux format: USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
let pid = parts[1];
let cpu_percent = parts[2];
let full_command = parts[10..].join(" ");
// Skip ps processes to avoid catching our own ps command
if full_command.contains("ps aux") || full_command.starts_with("ps ") {
continue;
}
// Extract just the command name (basename of executable)
let command_name = if let Some(first_part) = parts.get(10) {
// Get just the executable name, not the full path
if let Some(basename) = first_part.split('/').last() {
basename.to_string()
} else {
first_part.to_string()
}
} else {
"unknown".to_string()
};
// Validate CPU percentage is reasonable (not over 100% per core)
if let Ok(cpu_val) = cpu_percent.parse::<f32>() {
if cpu_val > 1000.0 {
// Skip obviously wrong values
continue;
}
}
let process_info = format!("{} (PID {}) {}%", command_name, pid, cpu_percent);
return Ok(Some(Metric::new(
"top_cpu_process".to_string(),
MetricValue::String(process_info),
Status::Ok,
).with_description("Process consuming the most CPU".to_string())));
}
}
Ok(Some(Metric::new(
"top_cpu_process".to_string(),
MetricValue::String("No processes found".to_string()),
Status::Ok,
).with_description("Process consuming the most CPU".to_string())))
}
/// Collect top RAM consuming process using ps command for accurate memory usage
async fn collect_top_ram_process(&self) -> Result<Option<Metric>, CollectorError> {
use std::process::Command;
// Use ps to get current memory usage, sorted by memory
let output = Command::new("ps")
.arg("aux")
.arg("--sort=-%mem")
.arg("--no-headers")
.output()
.map_err(|e| CollectorError::SystemRead {
path: "ps command".to_string(),
error: e.to_string(),
})?;
if !output.status.success() {
return Ok(None);
}
let output_str = String::from_utf8_lossy(&output.stdout);
// Parse lines and find the first non-ps process (to avoid catching our own ps command)
for line in output_str.lines() {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() >= 11 {
// ps aux format: USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
let pid = parts[1];
let mem_percent = parts[3];
let rss_kb = parts[5]; // RSS in KB
let full_command = parts[10..].join(" ");
// Skip ps processes to avoid catching our own ps command
if full_command.contains("ps aux") || full_command.starts_with("ps ") {
continue;
}
// Extract just the command name (basename of executable)
let command_name = if let Some(first_part) = parts.get(10) {
// Get just the executable name, not the full path
if let Some(basename) = first_part.split('/').last() {
basename.to_string()
} else {
first_part.to_string()
}
} else {
"unknown".to_string()
};
// Convert RSS from KB to MB
if let Ok(rss_kb_val) = rss_kb.parse::<u64>() {
let rss_mb = rss_kb_val as f32 / 1024.0;
// Skip processes with very little memory (likely temporary commands)
if rss_mb < 1.0 {
continue;
}
let process_info = format!("{} (PID {}) {:.1}MB", command_name, pid, rss_mb);
return Ok(Some(Metric::new(
"top_ram_process".to_string(),
MetricValue::String(process_info),
Status::Ok,
).with_description("Process consuming the most RAM".to_string())));
}
}
}
Ok(Some(Metric::new(
"top_ram_process".to_string(),
MetricValue::String("No processes found".to_string()),
Status::Ok,
).with_description("Process consuming the most RAM".to_string())))
}
}
#[async_trait]
@@ -347,15 +202,6 @@ impl Collector for CpuCollector {
metrics.push(freq_metric);
}
// Collect top CPU process (optional)
if let Some(top_cpu_metric) = self.collect_top_cpu_process().await? {
metrics.push(top_cpu_metric);
}
// Collect top RAM process (optional)
if let Some(top_ram_metric) = self.collect_top_ram_process().await? {
metrics.push(top_ram_metric);
}
let duration = start.elapsed();
debug!("CPU collection completed in {:?} with {} metrics", duration, metrics.len());

View File

@@ -1,12 +1,26 @@
use anyhow::Result;
use async_trait::async_trait;
use cm_dashboard_shared::{Metric, MetricValue, Status};
use std::collections::HashMap;
use std::process::Command;
use std::time::Instant;
use tracing::debug;
use super::{Collector, CollectorError, PerformanceMetrics};
/// Information about a mounted disk
#[derive(Debug, Clone)]
struct MountedDisk {
device: String, // e.g., "/dev/nvme0n1p1"
physical_device: String, // e.g., "/dev/nvme0n1"
mount_point: String, // e.g., "/"
filesystem: String, // e.g., "ext4"
size: String, // e.g., "120G"
used: String, // e.g., "45G"
available: String, // e.g., "75G"
usage_percent: f32, // e.g., 38.5
}
/// Disk usage collector for monitoring filesystem sizes
pub struct DiskCollector {
// Immutable collector for caching compatibility
@@ -71,6 +85,142 @@ impl DiskCollector {
Ok((total_bytes, used_bytes))
}
/// Get root filesystem disk usage
fn get_root_filesystem_usage(&self) -> Result<(u64, u64, f32)> {
let (total_bytes, used_bytes) = self.get_filesystem_info("/")?;
let usage_percent = (used_bytes as f64 / total_bytes as f64) * 100.0;
Ok((total_bytes, used_bytes, usage_percent as f32))
}
/// Get all mounted disks with their mount points and underlying devices
fn get_mounted_disks(&self) -> Result<Vec<MountedDisk>> {
let output = Command::new("df")
.arg("-h")
.arg("--output=source,target,fstype,size,used,avail,pcent")
.output()?;
if !output.status.success() {
return Err(anyhow::anyhow!("df command failed"));
}
let output_str = String::from_utf8(output.stdout)?;
let mut mounted_disks = Vec::new();
for line in output_str.lines().skip(1) { // Skip header
let fields: Vec<&str> = line.split_whitespace().collect();
if fields.len() >= 7 {
let source = fields[0];
let target = fields[1];
let fstype = fields[2];
let size = fields[3];
let used = fields[4];
let avail = fields[5];
let pcent_str = fields[6];
// Skip special filesystems
if source.starts_with("/dev/") &&
!fstype.contains("tmpfs") &&
!fstype.contains("devtmpfs") &&
!target.starts_with("/proc") &&
!target.starts_with("/sys") &&
!target.starts_with("/dev") {
// Extract percentage
let usage_percent = pcent_str
.trim_end_matches('%')
.parse::<f32>()
.unwrap_or(0.0);
// Get underlying physical device
let physical_device = self.get_physical_device(source)?;
mounted_disks.push(MountedDisk {
device: source.to_string(),
physical_device,
mount_point: target.to_string(),
filesystem: fstype.to_string(),
size: size.to_string(),
used: used.to_string(),
available: avail.to_string(),
usage_percent,
});
}
}
}
Ok(mounted_disks)
}
/// Get the physical device for a given device (resolves symlinks, gets parent device)
fn get_physical_device(&self, device: &str) -> Result<String> {
// For NVMe: /dev/nvme0n1p1 -> /dev/nvme0n1
if device.contains("nvme") && device.contains("p") {
if let Some(base) = device.split('p').next() {
return Ok(base.to_string());
}
}
// For SATA: /dev/sda1 -> /dev/sda
if device.starts_with("/dev/sd") && device.len() > 8 {
return Ok(device[..8].to_string()); // Keep /dev/sdX
}
// For VirtIO: /dev/vda1 -> /dev/vda
if device.starts_with("/dev/vd") && device.len() > 8 {
return Ok(device[..8].to_string());
}
// If no partition detected, return as-is
Ok(device.to_string())
}
/// Get SMART health for a specific physical device
fn get_smart_health(&self, device: &str) -> (String, f32) {
if let Ok(output) = Command::new("smartctl")
.arg("-H")
.arg(device)
.output()
{
if output.status.success() {
let output_str = String::from_utf8_lossy(&output.stdout);
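// Typical `smartctl -H` output contains a line like
//   "SMART overall-health self-assessment test result: PASSED"
// so a simple substring check is sufficient here.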
let health_status = if output_str.contains("PASSED") {
"PASSED"
} else if output_str.contains("FAILED") {
"FAILED"
} else {
"UNKNOWN"
};
// Try to get temperature
let temperature = if let Ok(temp_output) = Command::new("smartctl")
.arg("-A")
.arg(device)
.output()
{
let temp_str = String::from_utf8_lossy(&temp_output.stdout);
// Look for temperature in SMART attributes
for line in temp_str.lines() {
if line.contains("Temperature") && line.contains("Celsius") {
if let Some(temp_part) = line.split_whitespace().nth(9) {
if let Ok(temp) = temp_part.parse::<f32>() {
return (health_status.to_string(), temp);
}
}
}
}
0.0
} else {
0.0
};
return (health_status.to_string(), temperature);
}
}
("UNKNOWN".to_string(), 0.0)
}
/// Calculate status based on usage percentage
fn calculate_usage_status(&self, used_bytes: u64, total_bytes: u64) -> Status {
if total_bytes == 0 {
@@ -88,6 +238,38 @@ impl DiskCollector {
Status::Ok
}
}
/// Parse size string (e.g., "120G", "45M") to GB value
fn parse_size_to_gb(&self, size_str: &str) -> f32 {
let size_str = size_str.trim();
if size_str.is_empty() || size_str == "-" {
return 0.0;
}
// Extract numeric part and unit
let (num_str, unit) = if let Some(last_char) = size_str.chars().last() {
if last_char.is_alphabetic() {
let num_part = &size_str[..size_str.len()-1];
let unit_part = &size_str[size_str.len()-1..];
(num_part, unit_part)
} else {
(size_str, "")
}
} else {
(size_str, "")
};
let number: f32 = num_str.parse().unwrap_or(0.0);
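// Examples: "1.5T" -> 1536.0, "45G" -> 45.0, "512M" -> 0.5
// (df -h prints power-of-two sizes with single-letter suffixes).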
match unit.to_uppercase().as_str() {
"T" | "TB" => number * 1024.0,
"G" | "GB" => number,
"M" | "MB" => number / 1024.0,
"K" | "KB" => number / (1024.0 * 1024.0),
"B" | "" => number / (1024.0 * 1024.0 * 1024.0),
_ => number, // Assume GB if unknown unit
}
}
}
#[async_trait]
@@ -98,11 +280,186 @@ impl Collector for DiskCollector {
async fn collect(&self) -> Result<Vec<Metric>, CollectorError> {
let start_time = Instant::now();
debug!("Collecting disk metrics");
debug!("Collecting multi-disk metrics");
let mut metrics = Vec::new();
// Monitor /tmp directory size
// Collect all mounted disks
match self.get_mounted_disks() {
Ok(mounted_disks) => {
debug!("Found {} mounted disks", mounted_disks.len());
// Group disks by physical device to avoid duplicate SMART checks
let mut physical_devices: std::collections::HashMap<String, Vec<&MountedDisk>> = std::collections::HashMap::new();
for disk in &mounted_disks {
physical_devices.entry(disk.physical_device.clone())
.or_insert_with(Vec::new)
.push(disk);
}
// Generate metrics for each mounted disk
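// For each disk index N this emits disk_N_device, disk_N_mount_point,
// disk_N_filesystem, disk_N_total_gb, disk_N_used_gb, disk_N_available_gb,
// disk_N_usage_percent and disk_N_physical_device.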
for (disk_index, disk) in mounted_disks.iter().enumerate() {
let timestamp = chrono::Utc::now().timestamp() as u64;
// Parse size strings to get actual values for calculations
let size_gb = self.parse_size_to_gb(&disk.size);
let used_gb = self.parse_size_to_gb(&disk.used);
let avail_gb = self.parse_size_to_gb(&disk.available);
// Calculate status based on usage percentage
let status = if disk.usage_percent >= 95.0 {
Status::Critical
} else if disk.usage_percent >= 85.0 {
Status::Warning
} else {
Status::Ok
};
// Device and mount point info
metrics.push(Metric {
name: format!("disk_{}_device", disk_index),
value: MetricValue::String(disk.device.clone()),
unit: None,
description: Some(format!("Device: {}", disk.device)),
status: Status::Ok,
timestamp,
});
metrics.push(Metric {
name: format!("disk_{}_mount_point", disk_index),
value: MetricValue::String(disk.mount_point.clone()),
unit: None,
description: Some(format!("Mount: {}", disk.mount_point)),
status: Status::Ok,
timestamp,
});
metrics.push(Metric {
name: format!("disk_{}_filesystem", disk_index),
value: MetricValue::String(disk.filesystem.clone()),
unit: None,
description: Some(format!("FS: {}", disk.filesystem)),
status: Status::Ok,
timestamp,
});
// Size metrics
metrics.push(Metric {
name: format!("disk_{}_total_gb", disk_index),
value: MetricValue::Float(size_gb),
unit: Some("GB".to_string()),
description: Some(format!("Total: {}", disk.size)),
status: Status::Ok,
timestamp,
});
metrics.push(Metric {
name: format!("disk_{}_used_gb", disk_index),
value: MetricValue::Float(used_gb),
unit: Some("GB".to_string()),
description: Some(format!("Used: {}", disk.used)),
status,
timestamp,
});
metrics.push(Metric {
name: format!("disk_{}_available_gb", disk_index),
value: MetricValue::Float(avail_gb),
unit: Some("GB".to_string()),
description: Some(format!("Available: {}", disk.available)),
status: Status::Ok,
timestamp,
});
metrics.push(Metric {
name: format!("disk_{}_usage_percent", disk_index),
value: MetricValue::Float(disk.usage_percent),
unit: Some("%".to_string()),
description: Some(format!("Usage: {:.1}%", disk.usage_percent)),
status,
timestamp,
});
// Physical device name (for SMART health grouping)
let physical_device_name = disk.physical_device
.strip_prefix("/dev/")
.unwrap_or(&disk.physical_device);
metrics.push(Metric {
name: format!("disk_{}_physical_device", disk_index),
value: MetricValue::String(physical_device_name.to_string()),
unit: None,
description: Some(format!("Physical: {}", physical_device_name)),
status: Status::Ok,
timestamp,
});
}
// Add SMART health metrics for each unique physical device
for (physical_device, disks) in physical_devices {
let (health_status, temperature) = self.get_smart_health(&physical_device);
let device_name = physical_device.strip_prefix("/dev/").unwrap_or(&physical_device);
let timestamp = chrono::Utc::now().timestamp() as u64;
let health_status_enum = match health_status.as_str() {
"PASSED" => Status::Ok,
"FAILED" => Status::Critical,
_ => Status::Unknown,
};
metrics.push(Metric {
name: format!("disk_smart_{}_health", device_name),
value: MetricValue::String(health_status.clone()),
unit: None,
description: Some(format!("SMART Health: {}", health_status)),
status: health_status_enum,
timestamp,
});
if temperature > 0.0 {
let temp_status = if temperature >= 70.0 {
Status::Critical
} else if temperature >= 60.0 {
Status::Warning
} else {
Status::Ok
};
metrics.push(Metric {
name: format!("disk_smart_{}_temperature", device_name),
value: MetricValue::Float(temperature),
unit: Some("°C".to_string()),
description: Some(format!("Temperature: {:.0}°C", temperature)),
status: temp_status,
timestamp,
});
}
}
// Add disk count metric
metrics.push(Metric {
name: "disk_count".to_string(),
value: MetricValue::Integer(mounted_disks.len() as i64),
unit: None,
description: Some(format!("Total mounted disks: {}", mounted_disks.len())),
status: Status::Ok,
timestamp: chrono::Utc::now().timestamp() as u64,
});
}
Err(e) => {
debug!("Failed to get mounted disks: {}", e);
metrics.push(Metric {
name: "disk_count".to_string(),
value: MetricValue::Integer(0),
unit: None,
description: Some(format!("Error: {}", e)),
status: Status::Unknown,
timestamp: chrono::Utc::now().timestamp() as u64,
});
}
}
// Monitor /tmp directory size (keep existing functionality)
match self.get_directory_size("/tmp") {
Ok(tmp_size_bytes) => {
let tmp_size_mb = tmp_size_bytes as f64 / (1024.0 * 1024.0);
@@ -161,7 +518,7 @@ impl Collector for DiskCollector {
}
let collection_time = start_time.elapsed();
debug!("Disk collection completed in {:?} with {} metrics",
debug!("Multi-disk collection completed in {:?} with {} metrics",
collection_time, metrics.len());
Ok(metrics)

View File

@@ -7,6 +7,7 @@ pub mod cpu;
pub mod memory;
pub mod disk;
pub mod systemd;
pub mod backup;
pub mod error;
pub use error::CollectorError;

View File

@@ -25,6 +25,12 @@ struct ServiceCacheState {
last_discovery_time: Option<Instant>,
/// How often to rediscover services (5 minutes)
discovery_interval_seconds: u64,
/// Cached nginx site latency metrics
nginx_site_metrics: Vec<Metric>,
/// Last time nginx sites were checked
last_nginx_check_time: Option<Instant>,
/// How often to check nginx site latency (30 seconds)
nginx_check_interval_seconds: u64,
}
impl SystemdCollector {
@@ -35,6 +41,9 @@ impl SystemdCollector {
monitored_services: Vec::new(),
last_discovery_time: None,
discovery_interval_seconds: 300, // 5 minutes
nginx_site_metrics: Vec::new(),
last_nginx_check_time: None,
nginx_check_interval_seconds: 30, // 30 seconds for nginx sites
}),
}
}
@@ -71,6 +80,32 @@ impl SystemdCollector {
Ok(state.monitored_services.clone())
}
/// Get nginx site metrics, checking them if cache is expired
fn get_nginx_site_metrics(&self) -> Vec<Metric> {
let mut state = self.state.write().unwrap();
// Check if we need to refresh nginx site metrics
let needs_refresh = match state.last_nginx_check_time {
None => true, // First time
Some(last_time) => {
let elapsed = last_time.elapsed().as_secs();
elapsed >= state.nginx_check_interval_seconds
}
};
if needs_refresh {
// Only check nginx sites if nginx service is active
if state.monitored_services.iter().any(|s| s.contains("nginx")) {
debug!("Refreshing nginx site latency metrics (interval: {}s)", state.nginx_check_interval_seconds);
let fresh_metrics = self.get_nginx_sites();
state.nginx_site_metrics = fresh_metrics;
state.last_nginx_check_time = Some(Instant::now());
}
}
state.nginx_site_metrics.clone()
}
/// Auto-discover interesting services to monitor
fn discover_services(&self) -> Result<Vec<String>> {
let output = Command::new("systemctl")
@@ -88,22 +123,86 @@ impl SystemdCollector {
let output_str = String::from_utf8(output.stdout)?;
let mut services = Vec::new();
// Interesting service patterns to monitor
let interesting_patterns = [
"nginx", "apache", "httpd", "gitea", "docker", "mysql", "postgresql",
"redis", "ssh", "sshd", "postfix", "mosquitto", "grafana", "prometheus",
"vaultwarden", "unifi", "immich", "plex", "jellyfin", "transmission",
"syncthing", "nextcloud", "owncloud", "mariadb", "mongodb"
// Skip setup/certificate services that don't need monitoring (from legacy)
let excluded_services = [
"mosquitto-certs",
"immich-setup",
"phpfpm-kryddorten",
"phpfpm-mariehall2",
"acme-haasp.net",
"acme-selfsigned-haasp",
"borgbackup",
"haasp-site-deploy",
"mosquitto-backup",
"nginx-config-reload",
"sshd-keygen",
];
// Define patterns for services we want to monitor (from legacy)
let interesting_services = [
// Web applications
"gitea",
"immich",
"vaultwarden",
"unifi",
"wordpress",
"nginx",
"httpd",
// Databases
"postgresql",
"mysql",
"mariadb",
"redis",
"mongodb",
"mongod",
// Backup and storage
"borg",
"rclone",
// Container runtimes
"docker",
// CI/CD services
"gitea-actions",
"gitea-runner",
"actions-runner",
// Network services
"sshd",
"dnsmasq",
// MQTT and IoT services
"mosquitto",
"mqtt",
// PHP-FPM services
"phpfpm",
// Home automation
"haasp",
// Backup services
"backup",
];
for line in output_str.lines() {
let fields: Vec<&str> = line.split_whitespace().collect();
if fields.len() >= 4 && fields[0].ends_with(".service") {
let service_name = fields[0].trim_end_matches(".service");
debug!("Processing service: '{}'", service_name);
// Skip excluded services first
let mut is_excluded = false;
for excluded in &excluded_services {
if service_name.contains(excluded) {
debug!("EXCLUDING service '{}' because it matches pattern '{}'", service_name, excluded);
is_excluded = true;
break;
}
}
if is_excluded {
debug!("Skipping excluded service: '{}'", service_name);
continue;
}
// Check if this service matches our interesting patterns
for pattern in &interesting_patterns {
if service_name.contains(pattern) {
for pattern in &interesting_services {
if service_name.contains(pattern) || pattern.contains(service_name) {
debug!("INCLUDING service '{}' because it matches pattern '{}'", service_name, pattern);
services.push(service_name.to_string());
break;
}
@@ -571,140 +670,7 @@ impl SystemdCollector {
Some(estimated_gb)
}
/// Get nginx virtual hosts/sites
fn get_nginx_sites(&self) -> Vec<Metric> {
let mut metrics = Vec::new();
// Check sites-enabled directory
let output = Command::new("ls")
.arg("/etc/nginx/sites-enabled/")
.output();
if let Ok(output) = output {
if output.status.success() {
let output_str = String::from_utf8_lossy(&output.stdout);
for line in output_str.lines() {
let site_name = line.trim();
if !site_name.is_empty() && site_name != "default" {
// Check if site config is valid
let test_output = Command::new("nginx")
.arg("-t")
.arg("-c")
.arg(format!("/etc/nginx/sites-enabled/{}", site_name))
.output();
let status = match test_output {
Ok(out) if out.status.success() => Status::Ok,
_ => Status::Warning,
};
metrics.push(Metric {
name: format!("service_nginx_site_{}_status", site_name),
value: MetricValue::String(if status == Status::Ok { "active".to_string() } else { "error".to_string() }),
unit: None,
description: Some(format!("Nginx site {} configuration status", site_name)),
status,
timestamp: chrono::Utc::now().timestamp() as u64,
});
}
}
}
}
metrics
}
/// Get docker containers
fn get_docker_containers(&self) -> Vec<Metric> {
let mut metrics = Vec::new();
let output = Command::new("docker")
.arg("ps")
.arg("-a")
.arg("--format")
.arg("{{.Names}}\t{{.Status}}\t{{.State}}")
.output();
if let Ok(output) = output {
if output.status.success() {
let output_str = String::from_utf8_lossy(&output.stdout);
for line in output_str.lines() {
let parts: Vec<&str> = line.split('\t').collect();
if parts.len() >= 3 {
let container_name = parts[0].trim();
let status_info = parts[1].trim();
let state = parts[2].trim();
let status = match state.to_lowercase().as_str() {
"running" => Status::Ok,
"exited" | "dead" => Status::Warning,
"paused" | "restarting" => Status::Warning,
_ => Status::Critical,
};
metrics.push(Metric {
name: format!("service_docker_container_{}_status", container_name),
value: MetricValue::String(state.to_string()),
unit: None,
description: Some(format!("Docker container {} status: {}", container_name, status_info)),
status,
timestamp: chrono::Utc::now().timestamp() as u64,
});
// Get container memory usage
if state == "running" {
if let Some(memory_mb) = self.get_container_memory(container_name) {
metrics.push(Metric {
name: format!("service_docker_container_{}_memory_mb", container_name),
value: MetricValue::Float(memory_mb),
unit: Some("MB".to_string()),
description: Some(format!("Docker container {} memory usage", container_name)),
status: Status::Ok,
timestamp: chrono::Utc::now().timestamp() as u64,
});
}
}
}
}
}
}
metrics
}
/// Get container memory usage
fn get_container_memory(&self, container_name: &str) -> Option<f32> {
let output = Command::new("docker")
.arg("stats")
.arg("--no-stream")
.arg("--format")
.arg("{{.MemUsage}}")
.arg(container_name)
.output()
.ok()?;
if !output.status.success() {
return None;
}
let output_str = String::from_utf8(output.stdout).ok()?;
let mem_usage = output_str.trim();
// Parse format like "123.4MiB / 4GiB"
if let Some(used_part) = mem_usage.split(" / ").next() {
if used_part.ends_with("MiB") {
let num_str = used_part.trim_end_matches("MiB");
return num_str.parse::<f32>().ok();
} else if used_part.ends_with("GiB") {
let num_str = used_part.trim_end_matches("GiB");
if let Ok(gb) = num_str.parse::<f32>() {
return Some(gb * 1024.0); // Convert to MB
}
}
}
None
}
}
#[async_trait]
@@ -770,13 +736,11 @@ impl Collector for SystemdCollector {
// Sub-service metrics for specific services
if service.contains("nginx") && active_status == "active" {
let nginx_sites = self.get_nginx_sites();
metrics.extend(nginx_sites);
metrics.extend(self.get_nginx_site_metrics());
}
if service.contains("docker") && active_status == "active" {
let docker_containers = self.get_docker_containers();
metrics.extend(docker_containers);
metrics.extend(self.get_docker_containers());
}
}
Err(e) => {
@@ -795,4 +759,321 @@ impl Collector for SystemdCollector {
fn get_performance_metrics(&self) -> Option<PerformanceMetrics> {
None // Performance tracking handled by cache system
}
}
impl SystemdCollector {
/// Get nginx sites with latency checks
fn get_nginx_sites(&self) -> Vec<Metric> {
let mut metrics = Vec::new();
let timestamp = chrono::Utc::now().timestamp() as u64;
// Discover nginx sites from configuration
let sites = self.discover_nginx_sites();
for (site_name, url) in &sites {
match self.check_site_latency(url) {
Ok(latency_ms) => {
let status = if latency_ms < 500.0 {
Status::Ok
} else if latency_ms < 2000.0 {
Status::Warning
} else {
Status::Critical
};
metrics.push(Metric {
name: format!("service_nginx_{}_latency_ms", site_name),
value: MetricValue::Float(latency_ms),
unit: Some("ms".to_string()),
description: Some(format!("Response time for {}", url)),
status,
timestamp,
});
}
Err(_) => {
// Site is unreachable
metrics.push(Metric {
name: format!("service_nginx_{}_latency_ms", site_name),
value: MetricValue::Float(-1.0), // Use -1 to indicate error
unit: Some("ms".to_string()),
description: Some(format!("Response time for {} (unreachable)", url)),
status: Status::Critical,
timestamp,
});
}
}
}
metrics
}
/// Get docker containers as sub-services
fn get_docker_containers(&self) -> Vec<Metric> {
let mut metrics = Vec::new();
let timestamp = chrono::Utc::now().timestamp() as u64;
// Check if docker is available
let output = Command::new("docker")
.arg("ps")
.arg("--format")
.arg("{{.Names}},{{.Status}}")
.output();
let output = match output {
Ok(out) if out.status.success() => out,
_ => return metrics, // Docker not available or failed
};
let output_str = match String::from_utf8(output.stdout) {
Ok(s) => s,
Err(_) => return metrics,
};
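// Each line is "<name>,<status>", e.g. a hypothetical "gitea,Up 3 hours"
// or "db,Exited (0) 2 days ago".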
for line in output_str.lines() {
if line.trim().is_empty() {
continue;
}
let parts: Vec<&str> = line.split(',').collect();
if parts.len() >= 2 {
let container_name = parts[0].trim();
let status_str = parts[1].trim();
let status = if status_str.contains("Up") {
Status::Ok
} else if status_str.contains("Exited") {
Status::Warning
} else {
Status::Critical
};
metrics.push(Metric {
name: format!("service_docker_{}_status", container_name),
value: MetricValue::String(status_str.to_string()),
unit: None,
description: Some(format!("Docker container {} status", container_name)),
status,
timestamp,
});
}
}
metrics
}
/// Check site latency using curl GET requests
fn check_site_latency(&self, url: &str) -> Result<f32, Box<dyn std::error::Error>> {
let _start = std::time::Instant::now();
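// Roughly the shell equivalent (the timing comes from curl's %{time_total},
// not from the unused Instant above):
//   curl -X GET -s -o /dev/null -w '%{time_total}' --max-time 5 \
//        --connect-timeout 2 --location --fail <url>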
let output = Command::new("curl")
.arg("-X")
.arg("GET") // Explicitly use GET method
.arg("-s")
.arg("-o")
.arg("/dev/null")
.arg("-w")
.arg("%{time_total}")
.arg("--max-time")
.arg("5") // 5 second timeout
.arg("--connect-timeout")
.arg("2") // 2 second connection timeout
.arg("--location") // Follow redirects
.arg("--fail") // Fail on HTTP errors (4xx, 5xx)
.arg(url)
.output()?;
if !output.status.success() {
return Err(format!("Curl GET request failed for {}", url).into());
}
let time_str = String::from_utf8(output.stdout)?;
let time_seconds: f32 = time_str.trim().parse()?;
let time_ms = time_seconds * 1000.0;
Ok(time_ms)
}
/// Discover nginx sites from configuration files (like the old working implementation)
fn discover_nginx_sites(&self) -> Vec<(String, String)> {
use tracing::debug;
// Use the same approach as the old working agent: get nginx config from systemd
let config_content = match self.get_nginx_config_from_systemd() {
Some(content) => content,
None => {
debug!("Could not get nginx config from systemd, trying nginx -T fallback");
match self.get_nginx_config_via_command() {
Some(content) => content,
None => {
debug!("Could not get nginx config via any method");
return Vec::new();
}
}
}
};
// Parse the config content to extract sites
self.parse_nginx_config_for_sites(&config_content)
}
/// Get nginx config from systemd service definition (NixOS compatible)
fn get_nginx_config_from_systemd(&self) -> Option<String> {
use tracing::debug;
let output = std::process::Command::new("systemctl")
.args(["show", "nginx", "--property=ExecStart", "--no-pager"])
.output()
.ok()?;
if !output.status.success() {
debug!("Failed to get nginx ExecStart from systemd");
return None;
}
let stdout = String::from_utf8_lossy(&output.stdout);
debug!("systemctl show nginx output: {}", stdout);
// Parse ExecStart to extract -c config path
for line in stdout.lines() {
if line.starts_with("ExecStart=") {
debug!("Found ExecStart line: {}", line);
// Handle both traditional and NixOS systemd formats
if let Some(config_path) = self.extract_config_path_from_exec_start(line) {
debug!("Extracted config path: {}", config_path);
// Read the config file
return std::fs::read_to_string(&config_path)
.map_err(|e| debug!("Failed to read config file {}: {}", config_path, e))
.ok();
}
}
}
None
}
/// Extract config path from ExecStart line
fn extract_config_path_from_exec_start(&self, exec_start: &str) -> Option<String> {
use tracing::debug;
// Remove ExecStart= prefix
let exec_part = exec_start.strip_prefix("ExecStart=")?;
debug!("Parsing exec part: {}", exec_part);
// Handle NixOS format: ExecStart={ path=...; argv[]=...nginx -c /config; ... }
if exec_part.contains("argv[]=") {
// Extract the part after argv[]=
let argv_start = exec_part.find("argv[]=")?;
let argv_part = &exec_part[argv_start + 7..]; // Skip "argv[]="
debug!("Found NixOS argv part: {}", argv_part);
// Look for -c flag followed by config path
if let Some(c_pos) = argv_part.find(" -c ") {
let after_c = &argv_part[c_pos + 4..];
// Find the config path (until next space or semicolon)
let config_path = after_c.split([' ', ';']).next()?;
return Some(config_path.to_string());
}
} else {
// Handle traditional format: ExecStart=/path/nginx -c /config
debug!("Parsing traditional format");
if let Some(c_pos) = exec_part.find(" -c ") {
let after_c = &exec_part[c_pos + 4..];
let config_path = after_c.split_whitespace().next()?;
return Some(config_path.to_string());
}
}
None
}
/// Fallback: get nginx config via nginx -T command
fn get_nginx_config_via_command(&self) -> Option<String> {
use tracing::debug;
let output = std::process::Command::new("nginx")
.args(["-T"])
.output()
.ok()?;
if !output.status.success() {
debug!("nginx -T failed");
return None;
}
Some(String::from_utf8_lossy(&output.stdout).to_string())
}
/// Parse nginx config content to extract server names and build site list
fn parse_nginx_config_for_sites(&self, config_content: &str) -> Vec<(String, String)> {
use tracing::debug;
let mut sites = Vec::new();
let lines: Vec<&str> = config_content.lines().collect();
let mut i = 0;
debug!("Parsing nginx config with {} lines", lines.len());
while i < lines.len() {
let line = lines[i].trim();
if line.starts_with("server") && line.contains("{") {
debug!("Found server block at line {}", i);
if let Some(server_name) = self.parse_server_block(&lines, &mut i) {
debug!("Extracted server name: {}", server_name);
let url = format!("https://{}", server_name);
// Use the full domain as the site name for clarity
sites.push((server_name.clone(), url));
}
}
i += 1;
}
debug!("Discovered {} nginx sites total", sites.len());
sites
}
/// Parse a server block to extract the primary server_name
fn parse_server_block(&self, lines: &[&str], start_index: &mut usize) -> Option<String> {
use tracing::debug;
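// Hypothetical block this handles:
//   server {
//       server_name example.org www.example.org;
//       ...
//   }
// -> yields Some("example.org"); blocks that only `return 301`/`302` are skipped.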
let mut server_names = Vec::new();
let mut has_redirect = false;
let mut i = *start_index + 1;
let mut brace_count = 1;
// Parse until we close the server block
while i < lines.len() && brace_count > 0 {
let trimmed = lines[i].trim();
// Track braces
brace_count += trimmed.matches('{').count();
brace_count -= trimmed.matches('}').count();
// Extract server_name
if trimmed.starts_with("server_name") {
if let Some(names_part) = trimmed.strip_prefix("server_name") {
let names_clean = names_part.trim().trim_end_matches(';');
for name in names_clean.split_whitespace() {
if name != "_" && !name.is_empty() && name.contains('.') && !name.starts_with('$') {
server_names.push(name.to_string());
debug!("Found server_name in block: {}", name);
}
}
}
}
// Check for redirects (skip redirect-only servers)
if trimmed.contains("return") && (trimmed.contains("301") || trimmed.contains("302")) {
has_redirect = true;
}
i += 1;
}
*start_index = i - 1;
// Only return a hostname when the block defines one and is not a pure redirect
if !server_names.is_empty() && !has_redirect {
Some(server_names[0].clone())
} else {
None
}
}
}