use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::process::Stdio;
use std::time::Duration;
use tokio::fs;
use tokio::process::Command;
use tokio::time::timeout;

use super::{AgentType, Collector, CollectorError, CollectorOutput};
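
/// Collector for backup health: prefers the borgbackup metrics file written by
/// the backup script and falls back to querying restic and the systemd backup
/// service when that file is unavailable.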
#[derive(Debug, Clone)]
pub struct BackupCollector {
    pub enabled: bool,
    pub interval: Duration,
    pub restic_repo: Option<String>,
    pub backup_service: String,
    pub timeout_ms: u64,
}

impl BackupCollector {
    pub fn new(
        enabled: bool,
        interval_ms: u64,
        restic_repo: Option<String>,
        backup_service: String,
    ) -> Self {
        Self {
            enabled,
            interval: Duration::from_millis(interval_ms),
            restic_repo,
            backup_service,
            timeout_ms: 30_000, // 30-second timeout for backup operations
        }
    }
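
    /// Reads the JSON metrics file written by the borgbackup backup script.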
    async fn get_borgbackup_metrics(&self) -> Result<BorgbackupMetrics, CollectorError> {
        // Read metrics from the borgbackup JSON file
        let metrics_path = "/var/lib/backup/backup-metrics.json";

        let content = fs::read_to_string(metrics_path)
            .await
            .map_err(|e| CollectorError::IoError {
                message: format!("Failed to read backup metrics file: {}", e),
            })?;

        let metrics: BorgbackupMetrics =
            serde_json::from_str(&content).map_err(|e| CollectorError::ParseError {
                message: format!("Failed to parse backup metrics JSON: {}", e),
            })?;

        Ok(metrics)
    }
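
    /// Lists restic snapshots and queries repository statistics via the
    /// `restic` CLI, honoring the configured timeout.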
    async fn get_restic_snapshots(&self) -> Result<ResticStats, CollectorError> {
        let repo = self
            .restic_repo
            .as_ref()
            .ok_or_else(|| CollectorError::ConfigError {
                message: "No restic repository configured".to_string(),
            })?;

        let timeout_duration = Duration::from_millis(self.timeout_ms);

        // List snapshots in the repository
        let output = timeout(
            timeout_duration,
            Command::new("restic")
                .args(["-r", repo, "snapshots", "--json"])
                .stdout(Stdio::piped())
                .stderr(Stdio::piped())
                .output(),
        )
        .await
        .map_err(|_| CollectorError::Timeout {
            duration_ms: self.timeout_ms,
        })?
        .map_err(|e| CollectorError::CommandFailed {
            command: format!("restic -r {} snapshots --json", repo),
            message: e.to_string(),
        })?;

        if !output.status.success() {
            let stderr = String::from_utf8_lossy(&output.stderr);
            return Err(CollectorError::CommandFailed {
                command: format!("restic -r {} snapshots --json", repo),
                message: stderr.to_string(),
            });
        }

        let stdout = String::from_utf8_lossy(&output.stdout);
        let snapshots: Vec<ResticSnapshot> =
            serde_json::from_str(&stdout).map_err(|e| CollectorError::ParseError {
                message: format!("Failed to parse restic snapshots: {}", e),
            })?;

        // Get repository statistics
        let stats_output = timeout(
            timeout_duration,
            Command::new("restic")
                .args(["-r", repo, "stats", "--json"])
                .stdout(Stdio::piped())
                .stderr(Stdio::piped())
                .output(),
        )
        .await
        .map_err(|_| CollectorError::Timeout {
            duration_ms: self.timeout_ms,
        })?
        .map_err(|e| CollectorError::CommandFailed {
            command: format!("restic -r {} stats --json", repo),
            message: e.to_string(),
        })?;

        // Repository size in bytes; fall back to 0 if stats are unavailable or unparseable
        let total_size = if stats_output.status.success() {
            let stats_stdout = String::from_utf8_lossy(&stats_output.stdout);
            serde_json::from_str::<ResticStats>(&stats_stdout)
                .map(|s| s.total_size)
                .unwrap_or(0)
        } else {
            0
        };

        // Find the most recent snapshot
        let last_success = snapshots.iter().map(|s| s.time).max();

        Ok(ResticStats {
            total_size,
            snapshot_count: snapshots.len() as u32,
            last_success,
        })
    }
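
    /// Reports whether the configured systemd backup service is active, along
    /// with its most recent journal message.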
    async fn get_backup_service_status(&self) -> Result<BackupServiceData, CollectorError> {
        let timeout_duration = Duration::from_millis(self.timeout_ms);

        // Query systemd for the backup service state
        let status_output = timeout(
            timeout_duration,
            Command::new("systemctl")
                .args([
                    "show",
                    &self.backup_service,
                    "--property=ActiveState,SubState,MainPID",
                ])
                .stdout(Stdio::piped())
                .stderr(Stdio::piped())
                .output(),
        )
        .await
        .map_err(|_| CollectorError::Timeout {
            duration_ms: self.timeout_ms,
        })?
        .map_err(|e| CollectorError::CommandFailed {
            command: format!("systemctl show {}", self.backup_service),
            message: e.to_string(),
        })?;

        let enabled = if status_output.status.success() {
            let status_stdout = String::from_utf8_lossy(&status_output.stdout);
            status_stdout.contains("ActiveState=active")
                || status_stdout.contains("SubState=running")
        } else {
            false
        };

        // Use the most recent journal entry for the service as the last message
        let last_message = self.get_last_backup_log_message().await.ok();

        // Pending backup jobs are not tracked yet (could check systemd timers)
        let pending_jobs = 0; // TODO: Implement proper pending job detection

        Ok(BackupServiceData {
            enabled,
            pending_jobs,
            last_message,
        })
    }
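
    /// Returns the most recent journal line for the backup service, if any.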
    async fn get_last_backup_log_message(&self) -> Result<String, CollectorError> {
        let output = Command::new("journalctl")
            .args([
                "-u",
                &self.backup_service,
                "--lines=1",
                "--no-pager",
                "--output=cat",
            ])
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .output()
            .await
            .map_err(|e| CollectorError::CommandFailed {
                command: format!("journalctl -u {} --lines=1", self.backup_service),
                message: e.to_string(),
            })?;

        if output.status.success() {
            let stdout = String::from_utf8_lossy(&output.stdout);
            let message = stdout.trim().to_string();
            if !message.is_empty() {
                return Ok(message);
            }
        }

        Err(CollectorError::ParseError {
            message: "No log messages found".to_string(),
        })
    }
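
    /// Finds the timestamp of the most recent failure-like entry in the
    /// service journal over the past week.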
    async fn get_backup_logs_for_failures(&self) -> Result<Option<DateTime<Utc>>, CollectorError> {
        let output = Command::new("journalctl")
            .args([
                "-u",
                &self.backup_service,
                "--since",
                "1 week ago",
                // journalctl --grep takes a PCRE pattern; `|` (unescaped) is alternation
                "--grep=failed|error|ERROR",
                "--output=json",
                "--lines=1",
            ])
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .output()
            .await
            .map_err(|e| CollectorError::CommandFailed {
                command: format!(
                    "journalctl -u {} --since='1 week ago' --grep=failed",
                    self.backup_service
                ),
                message: e.to_string(),
            })?;

        if output.status.success() {
            let stdout = String::from_utf8_lossy(&output.stdout);
            if let Ok(log_entry) = serde_json::from_str::<JournalEntry>(&stdout) {
                if let Ok(timestamp) = log_entry.realtime_timestamp.parse::<i64>() {
                    let dt = DateTime::from_timestamp_micros(timestamp).unwrap_or_else(Utc::now);
                    return Ok(Some(dt));
                }
            }
        }

        Ok(None)
    }
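
    /// Derives an overall status from restic stats, service state, and the
    /// most recent failure seen in the journal.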
    fn determine_backup_status(
        &self,
        restic_stats: &Result<ResticStats, CollectorError>,
        service_data: &BackupServiceData,
        last_failure: Option<DateTime<Utc>>,
    ) -> BackupStatus {
        match restic_stats {
            Ok(stats) => {
                if let Some(last_success) = stats.last_success {
                    let hours_since_backup =
                        Utc::now().signed_duration_since(last_success).num_hours();

                    if hours_since_backup > 48 {
                        BackupStatus::Warning // More than 2 days since the last backup
                    } else if let Some(failure) = last_failure {
                        if failure > last_success {
                            BackupStatus::Failed // Failure after the last success
                        } else {
                            BackupStatus::Healthy
                        }
                    } else {
                        BackupStatus::Healthy
                    }
                } else {
                    BackupStatus::Warning // No successful backups found
                }
            }
            Err(_) => {
                if service_data.enabled {
                    BackupStatus::Failed // Service enabled but the repository is unreachable
                } else {
                    BackupStatus::Unknown // Service disabled
                }
            }
        }
    }
}

#[async_trait]
impl Collector for BackupCollector {
    fn name(&self) -> &str {
        "backup"
    }

    fn agent_type(&self) -> AgentType {
        AgentType::Backup
    }

    fn collect_interval(&self) -> Duration {
        self.interval
    }

    fn is_enabled(&self) -> bool {
        self.enabled
    }

    fn requires_root(&self) -> bool {
        false // Depends on restic repo permissions
    }

    async fn collect(&self) -> Result<CollectorOutput, CollectorError> {
        // Try to get borgbackup metrics first, fall back to restic if not available
        let borgbackup_result = self.get_borgbackup_metrics().await;

        let (backup_info, overall_status) = match &borgbackup_result {
            Ok(borg_metrics) => {
                // Parse the borgbackup Unix timestamp into a DateTime
                let last_success = chrono::DateTime::from_timestamp(borg_metrics.timestamp, 0);

                // Determine status from borgbackup data
                let status = match borg_metrics.status.as_str() {
                    "success" => BackupStatus::Healthy,
                    "warning" => BackupStatus::Warning,
                    "failed" => BackupStatus::Failed,
                    _ => BackupStatus::Unknown,
                };

                let backup_info = BackupInfo {
                    last_success,
                    last_failure: None, // borgbackup metrics don't include failure info
                    size_gb: borg_metrics.repository.total_repository_size_bytes as f32
                        / (1024.0 * 1024.0 * 1024.0),
                    latest_archive_size_gb: Some(
                        borg_metrics.repository.latest_archive_size_bytes as f32
                            / (1024.0 * 1024.0 * 1024.0),
                    ),
                    snapshot_count: borg_metrics.repository.total_archives as u32,
                };

                (backup_info, status)
            }
            Err(_) => {
                // Fall back to restic if borgbackup metrics are not available
                let restic_stats = self.get_restic_snapshots().await;
                let last_failure = self.get_backup_logs_for_failures().await.unwrap_or(None);

                // Get backup service status for the fallback determination
                let service_data = self
                    .get_backup_service_status()
                    .await
                    .unwrap_or(BackupServiceData {
                        enabled: false,
                        pending_jobs: 0,
                        last_message: None,
                    });

                let overall_status =
                    self.determine_backup_status(&restic_stats, &service_data, last_failure);

                let backup_info = match &restic_stats {
                    Ok(stats) => BackupInfo {
                        last_success: stats.last_success,
                        last_failure,
                        size_gb: stats.total_size as f32 / (1024.0 * 1024.0 * 1024.0),
                        latest_archive_size_gb: None, // Restic doesn't provide this easily
                        snapshot_count: stats.snapshot_count,
                    },
                    Err(_) => BackupInfo {
                        last_success: None,
                        last_failure,
                        size_gb: 0.0,
                        latest_archive_size_gb: None,
                        snapshot_count: 0,
                    },
                };

                (backup_info, overall_status)
            }
        };

        // Get backup service status for the reported payload
        let service_data = self
            .get_backup_service_status()
            .await
            .unwrap_or(BackupServiceData {
                enabled: false,
                pending_jobs: 0,
                last_message: None,
            });

        // Base payload; disk information is added below when available
        let mut backup_json = json!({
            "overall_status": overall_status,
            "backup": backup_info,
            "service": service_data,
            "timestamp": Utc::now()
        });

        // If we got borgbackup metrics, include disk information
        if let Ok(borg_metrics) = &borgbackup_result {
            backup_json["disk"] = json!({
                "device": borg_metrics.backup_disk.device,
                "health": borg_metrics.backup_disk.health,
                "total_gb": borg_metrics.backup_disk.total_bytes as f32 / (1024.0 * 1024.0 * 1024.0),
                "used_gb": borg_metrics.backup_disk.used_bytes as f32 / (1024.0 * 1024.0 * 1024.0),
                "usage_percent": borg_metrics.backup_disk.usage_percent
            });
        }

        Ok(CollectorOutput {
            agent_type: AgentType::Backup,
            data: backup_json,
            timestamp: Utc::now(),
        })
    }
}

#[derive(Debug, Deserialize)]
struct ResticSnapshot {
    time: DateTime<Utc>,
}

/// Also used to deserialize `restic stats --json` output; `snapshot_count` and
/// `last_success` default when they are absent from the command output.
#[derive(Debug, Deserialize)]
struct ResticStats {
    total_size: u64,
    #[serde(default)]
    snapshot_count: u32,
    last_success: Option<DateTime<Utc>>,
}

#[derive(Debug, Serialize)]
struct BackupServiceData {
    enabled: bool,
    pending_jobs: u32,
    last_message: Option<String>,
}

#[derive(Debug, Serialize)]
struct BackupInfo {
    last_success: Option<DateTime<Utc>>,
    last_failure: Option<DateTime<Utc>>,
    size_gb: f32,
    latest_archive_size_gb: Option<f32>,
    snapshot_count: u32,
}

#[derive(Debug, Serialize)]
enum BackupStatus {
    Healthy,
    Warning,
    Failed,
    Unknown,
}

#[derive(Debug, Deserialize)]
struct JournalEntry {
    #[serde(rename = "__REALTIME_TIMESTAMP")]
    realtime_timestamp: String,
}

// Borgbackup metrics structure produced by the backup script
#[derive(Debug, Deserialize)]
struct BorgbackupMetrics {
    backup_name: String,
    start_time: String,
    end_time: String,
    duration_seconds: i64,
    status: String,
    exit_codes: ExitCodes,
    repository: Repository,
    backup_disk: BackupDisk,
    timestamp: i64,
}

#[derive(Debug, Deserialize)]
struct ExitCodes {
    global: i32,
    backup: i32,
    prune: i32,
    compact: i32,
}

#[derive(Debug, Deserialize)]
struct Repository {
    total_archives: i32,
    latest_archive_size_bytes: i64,
    total_repository_size_bytes: i64,
    path: String,
}

#[derive(Debug, Deserialize)]
struct BackupDisk {
    device: String,
    health: String,
    total_bytes: i64,
    used_bytes: i64,
    available_bytes: i64,
    usage_percent: f32,
}
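
// A minimal test sketch added for illustration. The JSON fixture is synthetic:
// the field names mirror the structs above, but the values and the
// "backup.service" unit name are placeholders, not real backup output. It only
// exercises the constructor defaults and deserialization of the metrics file.
#[cfg(test)]
mod tests {
    use super::{BackupCollector, BorgbackupMetrics};
    use std::time::Duration;

    #[test]
    fn constructor_sets_interval_and_timeout() {
        let collector = BackupCollector::new(true, 60_000, None, "backup.service".to_string());
        assert!(collector.enabled);
        assert_eq!(collector.interval, Duration::from_secs(60));
        assert_eq!(collector.timeout_ms, 30_000);
    }

    #[test]
    fn parses_borgbackup_metrics_json() {
        // Synthetic fixture shaped like the file the borgbackup script writes
        let fixture = r#"{
            "backup_name": "example",
            "start_time": "2024-01-01T00:00:00Z",
            "end_time": "2024-01-01T00:05:00Z",
            "duration_seconds": 300,
            "status": "success",
            "exit_codes": {"global": 0, "backup": 0, "prune": 0, "compact": 0},
            "repository": {
                "total_archives": 7,
                "latest_archive_size_bytes": 1073741824,
                "total_repository_size_bytes": 10737418240,
                "path": "/var/lib/backup/repo"
            },
            "backup_disk": {
                "device": "/dev/sdb1",
                "health": "ok",
                "total_bytes": 2000000000000,
                "used_bytes": 500000000000,
                "available_bytes": 1500000000000,
                "usage_percent": 25.0
            },
            "timestamp": 1704067500
        }"#;

        let metrics: BorgbackupMetrics =
            serde_json::from_str(fixture).expect("fixture should deserialize");
        assert_eq!(metrics.status, "success");
        assert_eq!(metrics.repository.total_archives, 7);
    }
}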