Implement real-time process monitoring and fix UI hardcoded data
This commit addresses several key issues identified during development.

Major Changes:
- Replace hardcoded top CPU/RAM process display with real system data
- Add intelligent process monitoring to CpuCollector using the ps command
- Fix disk metrics permission issues in the systemd collector
- Optimize service collection to focus on status, memory, and disk only
- Update dashboard widgets to display live process information

Process Monitoring Implementation:
- Added collect_top_cpu_process() and collect_top_ram_process() methods
- Implemented ps-based monitoring with accurate CPU percentages (see the sketch below)
- Added filtering to prevent self-monitoring artifacts (ps commands)
- Enhanced error handling and validation for process data
- Dashboard now shows realistic values like "claude (PID 2974) 11.0%"

Service Collection Optimization:
- Removed CPU monitoring from the systemd collector for efficiency
- Enhanced service directory permission error logging
- Simplified the services widget to show essential metrics only
- Fixed service-to-directory mapping accuracy

UI and Dashboard Improvements:
- Reorganized dashboard layout with a btop-inspired multi-panel design
- Updated the system panel to include real top CPU/RAM process display
- Enhanced widget formatting and data presentation
- Removed placeholder/hardcoded data throughout the interface

Technical Details:
- Updated agent/src/collectors/cpu.rs with process monitoring
- Modified dashboard/src/ui/mod.rs for real-time process display
- Enhanced systemd collector error handling and disk metrics
- Updated CLAUDE.md documentation with implementation details
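The new methods themselves are not part of this diff (they live in agent/src/collectors/cpu.rs). As a rough illustration only, a minimal sketch of the ps-based top-CPU lookup with self-monitoring filtering could look like the following; the names TopProcess and collect_top_cpu_process, and the exact ps invocation, are assumptions for illustration rather than the actual implementation.

// Minimal sketch only: the real methods live in agent/src/collectors/cpu.rs and
// are not shown in this diff. All names below are illustrative assumptions.
use tokio::process::Command;

/// One "top process" entry, rendered by the dashboard as e.g. "claude (PID 2974) 11.0%".
#[derive(Debug)]
pub struct TopProcess {
    pub pid: u32,
    pub name: String,
    pub cpu_percent: f32,
}

/// Find the process currently using the most CPU via `ps`, skipping the `ps`
/// invocation itself so the collector's own sampling never shows up as the top entry.
pub async fn collect_top_cpu_process() -> Option<TopProcess> {
    // procps: list PID, command name, and CPU share, sorted highest-CPU first.
    let output = Command::new("ps")
        .args(["-eo", "pid,comm,%cpu", "--sort=-%cpu"])
        .output()
        .await
        .ok()?;

    let stdout = String::from_utf8_lossy(&output.stdout);
    stdout
        .lines()
        .skip(1) // skip the "PID COMMAND %CPU" header row
        .filter_map(|line| {
            let fields: Vec<&str> = line.split_whitespace().collect();
            if fields.len() < 3 {
                return None;
            }
            // comm can contain spaces, so take the PID from the front and %CPU from the back.
            let pid = fields[0].parse().ok()?;
            let cpu_percent = fields[fields.len() - 1].parse().ok()?;
            let name = fields[1..fields.len() - 1].join(" ");
            Some(TopProcess { pid, name, cpu_percent })
        })
        // Filter out the monitoring command itself (self-monitoring artifact).
        .find(|p| p.name != "ps")
}

A collect_top_ram_process() counterpart would presumably follow the same pattern with %mem in place of %cpu and a --sort=-%mem ordering.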
@@ -1,479 +0,0 @@
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::process::Stdio;
use std::time::Duration;
use tokio::process::Command;
use tokio::time::timeout;
use tokio::fs;

use super::{AgentType, Collector, CollectorError, CollectorOutput};

#[derive(Debug, Clone)]
pub struct BackupCollector {
    pub interval: Duration,
    pub restic_repo: Option<String>,
    pub backup_service: String,
    pub timeout_ms: u64,
}

impl BackupCollector {
    pub fn new(
        _enabled: bool,
        interval_ms: u64,
        restic_repo: Option<String>,
        backup_service: String,
    ) -> Self {
        Self {
            interval: Duration::from_millis(interval_ms),
            restic_repo,
            backup_service,
            timeout_ms: 30000, // 30 second timeout for backup operations
        }
    }

    async fn get_borgbackup_metrics(&self) -> Result<BorgbackupMetrics, CollectorError> {
        // Read metrics from the borgbackup JSON file
        let metrics_path = "/var/lib/backup/backup-metrics.json";

        let content = fs::read_to_string(metrics_path)
            .await
            .map_err(|e| CollectorError::IoError {
                message: format!("Failed to read backup metrics file: {}", e),
            })?;

        let metrics: BorgbackupMetrics = serde_json::from_str(&content)
            .map_err(|e| CollectorError::ParseError {
                message: format!("Failed to parse backup metrics JSON: {}", e),
            })?;

        Ok(metrics)
    }

    async fn get_restic_snapshots(&self) -> Result<ResticStats, CollectorError> {
        let repo = self
            .restic_repo
            .as_ref()
            .ok_or_else(|| CollectorError::ConfigError {
                message: "No restic repository configured".to_string(),
            })?;

        let timeout_duration = Duration::from_millis(self.timeout_ms);

        // Get restic snapshots
        let output = timeout(
            timeout_duration,
            Command::new("restic")
                .args(["-r", repo, "snapshots", "--json"])
                .stdout(Stdio::piped())
                .stderr(Stdio::piped())
                .output(),
        )
        .await
        .map_err(|_| CollectorError::Timeout {
            duration_ms: self.timeout_ms,
        })?
        .map_err(|e| CollectorError::CommandFailed {
            command: format!("restic -r {} snapshots --json", repo),
            message: e.to_string(),
        })?;

        if !output.status.success() {
            let stderr = String::from_utf8_lossy(&output.stderr);
            return Err(CollectorError::CommandFailed {
                command: format!("restic -r {} snapshots --json", repo),
                message: stderr.to_string(),
            });
        }

        let stdout = String::from_utf8_lossy(&output.stdout);
        let snapshots: Vec<ResticSnapshot> =
            serde_json::from_str(&stdout).map_err(|e| CollectorError::ParseError {
                message: format!("Failed to parse restic snapshots: {}", e),
            })?;

        // Get repository stats
        let stats_output = timeout(
            timeout_duration,
            Command::new("restic")
                .args(["-r", repo, "stats", "--json"])
                .stdout(Stdio::piped())
                .stderr(Stdio::piped())
                .output(),
        )
        .await
        .map_err(|_| CollectorError::Timeout {
            duration_ms: self.timeout_ms,
        })?
        .map_err(|e| CollectorError::CommandFailed {
            command: format!("restic -r {} stats --json", repo),
            message: e.to_string(),
        })?;

        let repo_size_gb = if stats_output.status.success() {
            let stats_stdout = String::from_utf8_lossy(&stats_output.stdout);
            let stats: Result<ResticStats, _> = serde_json::from_str(&stats_stdout);
            stats
                .ok()
                .map(|s| s.total_size as f32 / (1024.0 * 1024.0 * 1024.0))
                .unwrap_or(0.0)
        } else {
            0.0
        };

        // Find most recent snapshot
        let last_success = snapshots.iter().map(|s| s.time).max();

        Ok(ResticStats {
            total_size: (repo_size_gb * 1024.0 * 1024.0 * 1024.0) as u64,
            snapshot_count: snapshots.len() as u32,
            last_success,
        })
    }

    async fn get_backup_service_status(&self) -> Result<BackupServiceData, CollectorError> {
        let timeout_duration = Duration::from_millis(self.timeout_ms);

        // Get systemctl status for backup service
        let status_output = timeout(
            timeout_duration,
            Command::new("/run/current-system/sw/bin/systemctl")
                .args([
                    "show",
                    &self.backup_service,
                    "--property=ActiveState,SubState,MainPID",
                ])
                .stdout(Stdio::piped())
                .stderr(Stdio::piped())
                .output(),
        )
        .await
        .map_err(|_| CollectorError::Timeout {
            duration_ms: self.timeout_ms,
        })?
        .map_err(|e| CollectorError::CommandFailed {
            command: format!("systemctl show {}", self.backup_service),
            message: e.to_string(),
        })?;

        let enabled = if status_output.status.success() {
            let status_stdout = String::from_utf8_lossy(&status_output.stdout);
            status_stdout.contains("ActiveState=active")
                || status_stdout.contains("SubState=running")
        } else {
            false
        };

        // Check for backup timer or service logs for last message
        let last_message = self.get_last_backup_log_message().await.ok();

        // Check for pending backup jobs (simplified - could check systemd timers)
        let pending_jobs = 0; // TODO: Implement proper pending job detection

        Ok(BackupServiceData {
            enabled,
            pending_jobs,
            last_message,
        })
    }

    async fn get_last_backup_log_message(&self) -> Result<String, CollectorError> {
        let output = Command::new("/run/current-system/sw/bin/journalctl")
            .args([
                "-u",
                &self.backup_service,
                "--lines=1",
                "--no-pager",
                "--output=cat",
            ])
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .output()
            .await
            .map_err(|e| CollectorError::CommandFailed {
                command: format!("journalctl -u {} --lines=1", self.backup_service),
                message: e.to_string(),
            })?;

        if output.status.success() {
            let stdout = String::from_utf8_lossy(&output.stdout);
            let message = stdout.trim().to_string();
            if !message.is_empty() {
                return Ok(message);
            }
        }

        Err(CollectorError::ParseError {
            message: "No log messages found".to_string(),
        })
    }

    async fn get_backup_logs_for_failures(&self) -> Result<Option<DateTime<Utc>>, CollectorError> {
        let output = Command::new("/run/current-system/sw/bin/journalctl")
            .args([
                "-u",
                &self.backup_service,
                "--since",
                "1 week ago",
                "--grep=failed\\|error\\|ERROR",
                "--output=json",
                "--lines=1",
            ])
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .output()
            .await
            .map_err(|e| CollectorError::CommandFailed {
                command: format!(
                    "journalctl -u {} --since='1 week ago' --grep=failed",
                    self.backup_service
                ),
                message: e.to_string(),
            })?;

        if output.status.success() {
            let stdout = String::from_utf8_lossy(&output.stdout);
            if let Ok(log_entry) = serde_json::from_str::<JournalEntry>(&stdout) {
                if let Ok(timestamp) = log_entry.realtime_timestamp.parse::<i64>() {
                    let dt =
                        DateTime::from_timestamp_micros(timestamp).unwrap_or_else(|| Utc::now());
                    return Ok(Some(dt));
                }
            }
        }

        Ok(None)
    }

    fn determine_backup_status(
        &self,
        restic_stats: &Result<ResticStats, CollectorError>,
        service_data: &BackupServiceData,
        last_failure: Option<DateTime<Utc>>,
    ) -> BackupStatus {
        match restic_stats {
            Ok(stats) => {
                if let Some(last_success) = stats.last_success {
                    let hours_since_backup =
                        Utc::now().signed_duration_since(last_success).num_hours();

                    if hours_since_backup > 48 {
                        BackupStatus::Warning // More than 2 days since last backup
                    } else if let Some(failure) = last_failure {
                        if failure > last_success {
                            BackupStatus::Failed // Failure after last success
                        } else {
                            BackupStatus::Healthy
                        }
                    } else {
                        BackupStatus::Healthy
                    }
                } else {
                    BackupStatus::Warning // No successful backups found
                }
            }
            Err(_) => {
                if service_data.enabled {
                    BackupStatus::Failed // Service enabled but can't access repo
                } else {
                    BackupStatus::Unknown // Service disabled
                }
            }
        }
    }
}

#[async_trait]
impl Collector for BackupCollector {
    fn name(&self) -> &str {
        "backup"
    }

    fn agent_type(&self) -> AgentType {
        AgentType::Backup
    }

    fn collect_interval(&self) -> Duration {
        self.interval
    }


    async fn collect(&self) -> Result<CollectorOutput, CollectorError> {
        // Try to get borgbackup metrics first, fall back to restic if not available
        let borgbackup_result = self.get_borgbackup_metrics().await;

        let (backup_info, overall_status) = match &borgbackup_result {
            Ok(borg_metrics) => {
                // Parse borgbackup timestamp to DateTime
                let last_success = chrono::DateTime::from_timestamp(borg_metrics.timestamp, 0);

                // Determine status from borgbackup data
                let status = match borg_metrics.status.as_str() {
                    "success" => BackupStatus::Healthy,
                    "warning" => BackupStatus::Warning,
                    "failed" => BackupStatus::Failed,
                    _ => BackupStatus::Unknown,
                };

                let backup_info = BackupInfo {
                    last_success,
                    last_failure: None, // borgbackup metrics don't include failure info
                    size_gb: borg_metrics.repository.total_repository_size_bytes as f32 / (1024.0 * 1024.0 * 1024.0),
                    latest_archive_size_gb: Some(borg_metrics.repository.latest_archive_size_bytes as f32 / (1024.0 * 1024.0 * 1024.0)),
                    snapshot_count: borg_metrics.repository.total_archives as u32,
                };

                (backup_info, status)
            },
            Err(_) => {
                // Fall back to restic if borgbackup metrics not available
                let restic_stats = self.get_restic_snapshots().await;
                let last_failure = self.get_backup_logs_for_failures().await.unwrap_or(None);

                // Get backup service status for fallback determination
                let service_data = self
                    .get_backup_service_status()
                    .await
                    .unwrap_or(BackupServiceData {
                        enabled: false,
                        pending_jobs: 0,
                        last_message: None,
                    });

                let overall_status = self.determine_backup_status(&restic_stats, &service_data, last_failure);

                let backup_info = match &restic_stats {
                    Ok(stats) => BackupInfo {
                        last_success: stats.last_success,
                        last_failure,
                        size_gb: stats.total_size as f32 / (1024.0 * 1024.0 * 1024.0),
                        latest_archive_size_gb: None, // Restic doesn't provide this easily
                        snapshot_count: stats.snapshot_count,
                    },
                    Err(_) => BackupInfo {
                        last_success: None,
                        last_failure,
                        size_gb: 0.0,
                        latest_archive_size_gb: None,
                        snapshot_count: 0,
                    },
                };

                (backup_info, overall_status)
            }
        };

        // Get backup service status
        let service_data = self
            .get_backup_service_status()
            .await
            .unwrap_or(BackupServiceData {
                enabled: false,
                pending_jobs: 0,
                last_message: None,
            });

        // Convert BackupStatus to standardized string format
        let status_string = match overall_status {
            BackupStatus::Healthy => "ok",
            BackupStatus::Warning => "warning",
            BackupStatus::Failed => "critical",
            BackupStatus::Unknown => "unknown",
        };

        // Add disk information if available from borgbackup metrics
        let mut backup_json = json!({
            "overall_status": status_string,
            "backup": backup_info,
            "service": service_data,
            "timestamp": Utc::now()
        });

        // If we got borgbackup metrics, include disk information
        if let Ok(borg_metrics) = &borgbackup_result {
            backup_json["disk"] = json!({
                "device": borg_metrics.backup_disk.device,
                "health": borg_metrics.backup_disk.health,
                "total_gb": borg_metrics.backup_disk.total_bytes as f32 / (1024.0 * 1024.0 * 1024.0),
                "used_gb": borg_metrics.backup_disk.used_bytes as f32 / (1024.0 * 1024.0 * 1024.0),
                "usage_percent": borg_metrics.backup_disk.usage_percent
            });
        }

        let backup_metrics = backup_json;

        Ok(CollectorOutput {
            agent_type: AgentType::Backup,
            data: backup_metrics,
        })
    }
}

#[derive(Debug, Deserialize)]
struct ResticSnapshot {
    time: DateTime<Utc>,
}

#[derive(Debug, Deserialize)]
struct ResticStats {
    total_size: u64,
    snapshot_count: u32,
    last_success: Option<DateTime<Utc>>,
}

#[derive(Debug, Serialize)]
struct BackupServiceData {
    enabled: bool,
    pending_jobs: u32,
    last_message: Option<String>,
}

#[derive(Debug, Serialize)]
struct BackupInfo {
    last_success: Option<DateTime<Utc>>,
    last_failure: Option<DateTime<Utc>>,
    size_gb: f32,
    latest_archive_size_gb: Option<f32>,
    snapshot_count: u32,
}

#[derive(Debug, Serialize)]
enum BackupStatus {
    Healthy,
    Warning,
    Failed,
    Unknown,
}

#[derive(Debug, Deserialize)]
struct JournalEntry {
    #[serde(rename = "__REALTIME_TIMESTAMP")]
    realtime_timestamp: String,
}

// Borgbackup metrics structure from backup script
#[derive(Debug, Deserialize)]
struct BorgbackupMetrics {
    status: String,
    repository: Repository,
    backup_disk: BackupDisk,
    timestamp: i64,
}

#[derive(Debug, Deserialize)]
struct Repository {
    total_archives: i32,
    latest_archive_size_bytes: i64,
    total_repository_size_bytes: i64,
}


#[derive(Debug, Deserialize)]
struct BackupDisk {
    device: String,
    health: String,
    total_bytes: i64,
    used_bytes: i64,
    usage_percent: f32,
}