use anyhow::Result;
use async_trait::async_trait;
use cm_dashboard_shared::{AgentData, DriveData, FilesystemData, PoolData, HysteresisThresholds, Status};

use crate::config::DiskConfig;
use std::process::Command;
use std::time::Instant;
use std::collections::HashMap;
use tracing::debug;

use super::{Collector, CollectorError};

/// Storage collector: gathers drive, filesystem, and MergerFS pool data
/// into structured AgentData output
pub struct DiskCollector {
    config: DiskConfig,
    temperature_thresholds: HysteresisThresholds,
}

/// A physical drive with its filesystems
#[derive(Debug, Clone)]
struct PhysicalDrive {
    name: String, // e.g., "nvme0n1", "sda"
    health: String, // SMART health status
    filesystems: Vec<Filesystem>, // mounted filesystems on this drive
}

/// A filesystem mounted on a drive
#[derive(Debug, Clone)]
struct Filesystem {
    mount_point: String, // e.g., "/", "/boot"
    usage_percent: f32, // Usage percentage
    used_bytes: u64, // Used bytes
    total_bytes: u64, // Total bytes
}

/// MergerFS pool
#[derive(Debug, Clone)]
struct MergerfsPool {
    name: String, // e.g., "srv_media"
    mount_point: String, // e.g., "/srv/media"
    total_bytes: u64, // Pool total bytes
    used_bytes: u64, // Pool used bytes
    data_drives: Vec<PoolDrive>, // Data drives in pool
    parity_drives: Vec<PoolDrive>, // Parity drives in pool
}

/// Drive in a storage pool
#[derive(Debug, Clone)]
struct PoolDrive {
    name: String, // Drive name
    mount_point: String, // e.g., "/mnt/disk1"
    temperature_celsius: Option<f32>, // Drive temperature
}

impl DiskCollector {
    pub fn new(config: DiskConfig) -> Self {
        let temperature_thresholds = HysteresisThresholds::new(
            config.temperature_warning_celsius,
            config.temperature_critical_celsius,
        );

        Self {
            config,
            temperature_thresholds,
        }
    }

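    // Illustrative construction (hypothetical values; DiskConfig is defined
    // in crate::config and, judging from its usage in this file, carries at
    // least temperature_warning_celsius, temperature_critical_celsius,
    // usage_warning_percent, and usage_critical_percent):
    //
    //     let collector = DiskCollector::new(DiskConfig {
    //         temperature_warning_celsius: 50.0,
    //         temperature_critical_celsius: 60.0,
    //         usage_warning_percent: 85.0,
    //         usage_critical_percent: 95.0,
    //     });
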
    /// Collect all storage data and populate AgentData
    async fn collect_storage_data(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
        let start_time = Instant::now();
        debug!("Starting storage collection");

        // Step 1: Get mount points and their backing devices
        let mount_devices = self.get_mount_devices().await?;

        // Step 2: Get filesystem usage for each mount point using df
        let mut filesystem_usage = self.get_filesystem_usage(&mount_devices).map_err(|e| CollectorError::Parse {
            value: "filesystem usage".to_string(),
            error: format!("Failed to get filesystem usage: {}", e),
        })?;

        // Step 2.5: Add MergerFS mount points that weren't in lsblk output
        self.add_mergerfs_filesystem_usage(&mut filesystem_usage).map_err(|e| CollectorError::Parse {
            value: "mergerfs filesystem usage".to_string(),
            error: format!("Failed to get mergerfs filesystem usage: {}", e),
        })?;

        // Step 3: Detect MergerFS pools
        let mergerfs_pools = self.detect_mergerfs_pools(&filesystem_usage).map_err(|e| CollectorError::Parse {
            value: "mergerfs pools".to_string(),
            error: format!("Failed to detect mergerfs pools: {}", e),
        })?;

        // Step 4: Group filesystems by physical drive (excluding mergerfs members)
        let physical_drives = self.group_by_physical_drive(&mount_devices, &filesystem_usage, &mergerfs_pools).map_err(|e| CollectorError::Parse {
            value: "physical drives".to_string(),
            error: format!("Failed to group by physical drive: {}", e),
        })?;

        // Step 5: Get SMART data for all drives
        let smart_data = self.get_smart_data_for_drives(&physical_drives, &mergerfs_pools).await;

        // Step 6: Populate AgentData
        self.populate_drives_data(&physical_drives, &smart_data, agent_data)?;
        self.populate_pools_data(&mergerfs_pools, &smart_data, agent_data)?;

        let elapsed = start_time.elapsed();
        debug!("Storage collection completed in {:?}", elapsed);

        Ok(())
    }

    /// Get block devices and their mount points using lsblk
    async fn get_mount_devices(&self) -> Result<HashMap<String, String>, CollectorError> {
        let output = Command::new("lsblk")
            .args(&["-rn", "-o", "NAME,MOUNTPOINT"])
            .output()
            .map_err(|e| CollectorError::SystemRead {
                path: "block devices".to_string(),
                error: e.to_string(),
            })?;

        let mut mount_devices = HashMap::new();
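        // Sample `lsblk -rn -o NAME,MOUNTPOINT` output parsed below
        // (illustrative; devices without a mount point emit no second column
        // and are skipped by the parts.len() >= 2 check):
        //
        //     nvme0n1
        //     nvme0n1p1 /boot
        //     nvme0n1p2 /
        //     sda1 /mnt/disk1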
        for line in String::from_utf8_lossy(&output.stdout).lines() {
            let parts: Vec<&str> = line.split_whitespace().collect();
            if parts.len() >= 2 {
                let device_name = parts[0];
                let mount_point = parts[1];

                // Skip swap partitions and unmounted devices
                if mount_point == "[SWAP]" || mount_point.is_empty() {
                    continue;
                }

                // Convert device name to full path
                let device_path = format!("/dev/{}", device_name);
                mount_devices.insert(mount_point.to_string(), device_path);
            }
        }

        debug!("Found {} mounted block devices", mount_devices.len());
        Ok(mount_devices)
    }

    /// Use df to get filesystem usage for mount points
    fn get_filesystem_usage(&self, mount_devices: &HashMap<String, String>) -> anyhow::Result<HashMap<String, (u64, u64)>> {
        let mut filesystem_usage = HashMap::new();

        for mount_point in mount_devices.keys() {
            match self.get_filesystem_info(mount_point) {
                Ok((total, used)) => {
                    filesystem_usage.insert(mount_point.clone(), (total, used));
                }
                Err(e) => {
                    debug!("Failed to get filesystem info for {}: {}", mount_point, e);
                }
            }
        }

        Ok(filesystem_usage)
    }

    /// Add filesystem usage for MergerFS mount points that aren't in lsblk
    fn add_mergerfs_filesystem_usage(&self, filesystem_usage: &mut HashMap<String, (u64, u64)>) -> anyhow::Result<()> {
        let mounts_content = std::fs::read_to_string("/proc/mounts")
            .map_err(|e| anyhow::anyhow!("Failed to read /proc/mounts: {}", e))?;

        for line in mounts_content.lines() {
            let parts: Vec<&str> = line.split_whitespace().collect();
            if parts.len() >= 3 && parts[2] == "fuse.mergerfs" {
                let mount_point = parts[1].to_string();

                // Only add if we don't already have usage data for this mount point
                if !filesystem_usage.contains_key(&mount_point) {
                    if let Ok((total, used)) = self.get_filesystem_info(&mount_point) {
                        debug!("Added MergerFS filesystem usage for {}: {}GB total, {}GB used",
                            mount_point, total as f32 / (1024.0 * 1024.0 * 1024.0), used as f32 / (1024.0 * 1024.0 * 1024.0));
                        filesystem_usage.insert(mount_point, (total, used));
                    }
                }
            }
        }

        Ok(())
    }

    /// Get filesystem info for a single mount point
    fn get_filesystem_info(&self, mount_point: &str) -> Result<(u64, u64), CollectorError> {
        let output = Command::new("df")
            .args(&["--block-size=1", mount_point])
            .output()
            .map_err(|e| CollectorError::SystemRead {
                path: format!("df {}", mount_point),
                error: e.to_string(),
            })?;
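
        // Sample `df --block-size=1 /` output parsed below (illustrative
        // values); parts[1] is the total size in bytes, parts[2] the used bytes:
        //
        //     Filesystem       1B-blocks        Used   Available Use% Mounted on
        //     /dev/nvme0n1p2 502392610816 88178528256 388611391488  19% /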
        let output_str = String::from_utf8_lossy(&output.stdout);
        let lines: Vec<&str> = output_str.lines().collect();

        if lines.len() < 2 {
            return Err(CollectorError::Parse {
                value: output_str.to_string(),
                error: "Expected at least 2 lines from df output".to_string(),
            });
        }

        // Parse the data line (skip header)
        let parts: Vec<&str> = lines[1].split_whitespace().collect();
        if parts.len() < 4 {
            return Err(CollectorError::Parse {
                value: lines[1].to_string(),
                error: "Expected at least 4 fields in df output".to_string(),
            });
        }

        let total_bytes: u64 = parts[1].parse().map_err(|e| CollectorError::Parse {
            value: parts[1].to_string(),
            error: format!("Failed to parse total bytes: {}", e),
        })?;

        let used_bytes: u64 = parts[2].parse().map_err(|e| CollectorError::Parse {
            value: parts[2].to_string(),
            error: format!("Failed to parse used bytes: {}", e),
        })?;

        Ok((total_bytes, used_bytes))
    }

    /// Detect MergerFS pools from mount data
    fn detect_mergerfs_pools(&self, filesystem_usage: &HashMap<String, (u64, u64)>) -> anyhow::Result<Vec<MergerfsPool>> {
        let mounts_content = std::fs::read_to_string("/proc/mounts")
            .map_err(|e| anyhow::anyhow!("Failed to read /proc/mounts: {}", e))?;
        let mut pools = Vec::new();
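
        // Sample /proc/mounts line matched here (illustrative): parts[0] is
        // the colon-separated member list, parts[1] the pool mount point,
        // parts[2] the filesystem type:
        //
        //     /mnt/disk1:/mnt/disk2 /srv/media fuse.mergerfs rw,relatime 0 0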
        for line in mounts_content.lines() {
            let parts: Vec<&str> = line.split_whitespace().collect();
            if parts.len() >= 3 && parts[2] == "fuse.mergerfs" {
                let mount_point = parts[1].to_string();
                let device_sources = parts[0]; // e.g., "/mnt/disk1:/mnt/disk2"

                // Get pool usage
                let (total_bytes, used_bytes) = filesystem_usage.get(&mount_point)
                    .copied()
                    .unwrap_or((0, 0));

                // Extract pool name from mount point (e.g., "/srv/media" -> "srv_media")
                let pool_name = if mount_point == "/" {
                    "root".to_string()
                } else {
                    mount_point.trim_start_matches('/').replace('/', "_")
                };

                if pool_name.is_empty() {
                    debug!("Skipping mergerfs pool with empty name: {}", mount_point);
                    continue;
                }

                // Parse member paths - handle both full paths and numeric references
                let raw_paths: Vec<String> = device_sources
                    .split(':')
                    .map(|s| s.trim().to_string())
                    .filter(|s| !s.is_empty())
                    .collect();

                // Convert numeric references to actual mount points if needed
                let member_paths = if raw_paths.iter().any(|path| !path.starts_with('/')) {
                    // Handle numeric format like "1:2" by finding corresponding /mnt/disk* paths
                    self.resolve_numeric_mergerfs_paths(&raw_paths)?
                } else {
                    // Already full paths
                    raw_paths
                };

                // For SnapRAID setups, include parity drives that are related to this pool's data drives
                let mut all_member_paths = member_paths.clone();
                let related_parity_paths = self.discover_related_parity_drives(&member_paths)?;
                all_member_paths.extend(related_parity_paths);

                // Categorize as data vs parity drives
                let (data_drives, parity_drives) = match self.categorize_pool_drives(&all_member_paths) {
                    Ok(drives) => drives,
                    Err(e) => {
                        debug!("Failed to categorize drives for pool {}: {}. Skipping.", mount_point, e);
                        continue;
                    }
                };

                pools.push(MergerfsPool {
                    name: pool_name,
                    mount_point,
                    total_bytes,
                    used_bytes,
                    data_drives,
                    parity_drives,
                });
            }
        }

        debug!("Found {} mergerfs pools", pools.len());
        Ok(pools)
    }

    /// Group filesystems by physical drive, excluding mergerfs member mounts
    fn group_by_physical_drive(
        &self,
        mount_devices: &HashMap<String, String>,
        filesystem_usage: &HashMap<String, (u64, u64)>,
        mergerfs_pools: &[MergerfsPool]
    ) -> anyhow::Result<Vec<PhysicalDrive>> {
        let mut drive_groups: HashMap<String, Vec<Filesystem>> = HashMap::new();

        // Collect all mergerfs member mount points so they can be excluded
        let mut mergerfs_members = std::collections::HashSet::new();
        for pool in mergerfs_pools {
            for drive in &pool.data_drives {
                mergerfs_members.insert(drive.mount_point.clone());
            }
            for drive in &pool.parity_drives {
                mergerfs_members.insert(drive.mount_point.clone());
            }
        }

        // Group filesystems by base device
        for (mount_point, device) in mount_devices {
            // Skip mergerfs member mounts
            if mergerfs_members.contains(mount_point) {
                continue;
            }

            let base_device = self.extract_base_device(device);

            if let Some((total, used)) = filesystem_usage.get(mount_point) {
                // Guard against division by zero for filesystems reporting a zero size
                let usage_percent = if *total > 0 {
                    (*used as f32 / *total as f32) * 100.0
                } else {
                    0.0
                };

                let filesystem = Filesystem {
                    mount_point: mount_point.clone(), // Keep actual mount point like "/" and "/boot"
                    usage_percent,
                    used_bytes: *used,
                    total_bytes: *total,
                };

                drive_groups.entry(base_device).or_insert_with(Vec::new).push(filesystem);
            }
        }

        // Convert to PhysicalDrive structs
        let mut physical_drives = Vec::new();
        for (drive_name, filesystems) in drive_groups {
            let physical_drive = PhysicalDrive {
                name: drive_name,
                health: "UNKNOWN".to_string(), // Will be updated with SMART data
                filesystems,
            };
            physical_drives.push(physical_drive);
        }

        physical_drives.sort_by(|a, b| a.name.cmp(&b.name));
        Ok(physical_drives)
    }

    /// Extract base device name from device path
    fn extract_base_device(&self, device: &str) -> String {
        // Extract base device name (e.g., "/dev/nvme0n1p1" -> "nvme0n1")
        if let Some(dev_name) = device.strip_prefix("/dev/") {
            // Remove partition numbers: nvme0n1p1 -> nvme0n1
            if let Some(pos) = dev_name.find('p') {
                let suffix = &dev_name[pos + 1..];
                // Require a non-empty numeric suffix so names ending in 'p'
                // are not truncated
                if !suffix.is_empty() && suffix.chars().all(char::is_numeric) {
                    return dev_name[..pos].to_string();
                }
            }
            // Handle traditional naming: sda1 -> sda
            let mut result = String::new();
            for ch in dev_name.chars() {
                if ch.is_ascii_digit() {
                    break;
                }
                result.push(ch);
            }
            if !result.is_empty() {
                return result;
            }
        }
        device.to_string()
    }

    /// Get SMART data for drives
    async fn get_smart_data_for_drives(&self, physical_drives: &[PhysicalDrive], mergerfs_pools: &[MergerfsPool]) -> HashMap<String, SmartData> {
        use tracing::info;
        let mut smart_data = HashMap::new();

        // Collect all drive names
        let mut all_drives = std::collections::HashSet::new();
        for drive in physical_drives {
            all_drives.insert(drive.name.clone());
        }
        for pool in mergerfs_pools {
            for drive in &pool.data_drives {
                all_drives.insert(drive.name.clone());
            }
            for drive in &pool.parity_drives {
                all_drives.insert(drive.name.clone());
            }
        }

        info!("Collecting SMART data for {} drives", all_drives.len());

        // Get SMART data for each drive
        for drive_name in &all_drives {
            match self.get_smart_data(drive_name).await {
                Ok(data) => {
                    info!("SMART data collected for {}: serial={:?}, temp={:?}, health={}",
                        drive_name, data.serial_number, data.temperature_celsius, data.health);
                    smart_data.insert(drive_name.clone(), data);
                }
                Err(e) => {
                    info!("Failed to get SMART data for {}: {:?}", drive_name, e);
                }
            }
        }

        info!("SMART data collection complete: {}/{} drives successful", smart_data.len(), all_drives.len());
        smart_data
    }

    /// Get SMART data for a single drive
    async fn get_smart_data(&self, drive_name: &str) -> Result<SmartData, CollectorError> {
        use tracing::info;

        let output = Command::new("sudo")
            .args(&["smartctl", "-a", &format!("/dev/{}", drive_name)])
            .output()
            .map_err(|e| CollectorError::SystemRead {
                path: format!("SMART data for {}", drive_name),
                error: e.to_string(),
            })?;

        let output_str = String::from_utf8_lossy(&output.stdout);
        let error_str = String::from_utf8_lossy(&output.stderr);

        // Debug logging for SMART command results
        debug!("SMART output for {}: status={}, stdout_len={}, stderr={}",
            drive_name, output.status, output_str.len(), error_str);

        if !output.status.success() {
            info!("SMART command failed for {}, status={}, stderr={}", drive_name, output.status, error_str);
            // Return unknown data rather than failing completely
            return Ok(SmartData {
                health: "UNKNOWN".to_string(),
                serial_number: None,
                temperature_celsius: None,
                wear_percent: None,
            });
        }

        let mut health = "UNKNOWN".to_string();
        let mut serial_number = None;
        let mut temperature = None;
        let mut wear_percent = None;
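
        // Representative smartctl output lines matched by the loop below
        // (illustrative values):
        //
        //     SMART overall-health self-assessment test result: PASSED
        //     Serial Number:                      S4EWNX0N000000
        //     194 Temperature_Celsius     0x0022   064   052   000    Old_age   Always       -       36
        //     Temperature:                        27 Celsius
        //     Percentage Used:                    1%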
        for line in output_str.lines() {
            if line.contains("SMART overall-health") {
                if line.contains("PASSED") {
                    health = "PASSED".to_string();
                } else if line.contains("FAILED") {
                    health = "FAILED".to_string();
                }
            }

            // Serial number parsing (both SATA and NVMe)
            if line.contains("Serial Number:") {
                if let Some(serial_part) = line.split("Serial Number:").nth(1) {
                    let serial_str = serial_part.trim();
                    if !serial_str.is_empty() {
                        // Take first whitespace-separated token
                        if let Some(serial) = serial_str.split_whitespace().next() {
                            serial_number = Some(serial.to_string());
                        }
                    }
                }
            }

            // Temperature parsing for different drive types
            if line.contains("Temperature_Celsius") || line.contains("Airflow_Temperature_Cel") || line.contains("Temperature_Case") {
                // Traditional SATA drives: attribute table format
                if let Some(temp_str) = line.split_whitespace().nth(9) {
                    if let Ok(temp) = temp_str.parse::<f32>() {
                        temperature = Some(temp);
                    }
                }
            } else if line.starts_with("Temperature:") {
                // NVMe drives: simple "Temperature: 27 Celsius" format
                let parts: Vec<&str> = line.split_whitespace().collect();
                if parts.len() >= 2 {
                    if let Ok(temp) = parts[1].parse::<f32>() {
                        temperature = Some(temp);
                    }
                }
            }

            // Wear level parsing for SSDs
            if line.contains("Media_Wearout_Indicator") {
                // Media_Wearout_Indicator stores remaining life % in column 3 (VALUE)
                if let Some(wear_str) = line.split_whitespace().nth(3) {
                    if let Ok(remaining) = wear_str.parse::<f32>() {
                        wear_percent = Some(100.0 - remaining); // Convert remaining life to wear
                    }
                }
            } else if line.contains("Wear_Leveling_Count") || line.contains("SSD_Life_Left") {
                // Other wear attributes store value in column 9 (RAW_VALUE)
                if let Some(wear_str) = line.split_whitespace().nth(9) {
                    if let Ok(wear) = wear_str.parse::<f32>() {
                        wear_percent = Some(100.0 - wear); // Convert remaining life to wear
                    }
                }
            }
            // NVMe wear parsing: "Percentage Used: 1%"
            if line.contains("Percentage Used:") {
                if let Some(percent_part) = line.split("Percentage Used:").nth(1) {
                    if let Some(percent_str) = percent_part.split_whitespace().next() {
                        if let Some(percent_clean) = percent_str.strip_suffix('%') {
                            if let Ok(wear) = percent_clean.parse::<f32>() {
                                wear_percent = Some(wear);
                            }
                        }
                    }
                }
            }
        }

        Ok(SmartData {
            health,
            serial_number,
            temperature_celsius: temperature,
            wear_percent,
        })
    }

    /// Populate drives data into AgentData
    fn populate_drives_data(&self, physical_drives: &[PhysicalDrive], smart_data: &HashMap<String, SmartData>, agent_data: &mut AgentData) -> Result<(), CollectorError> {
        for drive in physical_drives {
            let smart = smart_data.get(&drive.name);

            let mut filesystems: Vec<FilesystemData> = drive.filesystems.iter().map(|fs| {
                FilesystemData {
                    mount: fs.mount_point.clone(), // This preserves "/" and "/boot" correctly
                    usage_percent: fs.usage_percent,
                    used_gb: fs.used_bytes as f32 / (1024.0 * 1024.0 * 1024.0),
                    total_gb: fs.total_bytes as f32 / (1024.0 * 1024.0 * 1024.0),
                    usage_status: self.calculate_filesystem_usage_status(fs.usage_percent),
                }
            }).collect();

            // Sort filesystems by mount point for consistent display order
            filesystems.sort_by(|a, b| a.mount.cmp(&b.mount));

            agent_data.system.storage.drives.push(DriveData {
                name: drive.name.clone(),
                serial_number: smart.and_then(|s| s.serial_number.clone()),
                health: smart.map(|s| s.health.clone()).unwrap_or_else(|| drive.health.clone()),
                temperature_celsius: smart.and_then(|s| s.temperature_celsius),
                wear_percent: smart.and_then(|s| s.wear_percent),
                filesystems,
                temperature_status: smart.and_then(|s| s.temperature_celsius)
                    .map(|temp| self.calculate_temperature_status(temp))
                    .unwrap_or(Status::Unknown),
                health_status: self.calculate_health_status(
                    smart.map(|s| s.health.as_str()).unwrap_or("UNKNOWN")
                ),
            });
        }

        Ok(())
    }

    /// Populate pools data into AgentData
    fn populate_pools_data(&self, mergerfs_pools: &[MergerfsPool], smart_data: &HashMap<String, SmartData>, agent_data: &mut AgentData) -> Result<(), CollectorError> {
        for pool in mergerfs_pools {
            // Calculate pool health and statuses based on member drive health
            let (pool_health, health_status, usage_status, data_drive_data, parity_drive_data) = self.calculate_pool_health(pool, smart_data);
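
            // pool_type below renders like "mergerfs (2+1)" for a pool with
            // two data drives and one parity drive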
            let pool_data = PoolData {
                name: pool.name.clone(),
                mount: pool.mount_point.clone(),
                pool_type: format!("mergerfs ({}+{})", pool.data_drives.len(), pool.parity_drives.len()),
                health: pool_health,
                usage_percent: if pool.total_bytes > 0 {
                    (pool.used_bytes as f32 / pool.total_bytes as f32) * 100.0
                } else { 0.0 },
                used_gb: pool.used_bytes as f32 / (1024.0 * 1024.0 * 1024.0),
                total_gb: pool.total_bytes as f32 / (1024.0 * 1024.0 * 1024.0),
                data_drives: data_drive_data,
                parity_drives: parity_drive_data,
                health_status,
                usage_status,
            };

            agent_data.system.storage.pools.push(pool_data);
        }

        Ok(())
    }

    /// Calculate pool health based on member drive status
    fn calculate_pool_health(
        &self,
        pool: &MergerfsPool,
        smart_data: &HashMap<String, SmartData>,
    ) -> (String, cm_dashboard_shared::Status, cm_dashboard_shared::Status, Vec<cm_dashboard_shared::PoolDriveData>, Vec<cm_dashboard_shared::PoolDriveData>) {
        let mut failed_data = 0;
        let mut failed_parity = 0;

        // Process data drives
        let data_drive_data: Vec<cm_dashboard_shared::PoolDriveData> = pool.data_drives.iter().map(|d| {
            let smart = smart_data.get(&d.name);
            let health = smart.map(|s| s.health.clone()).unwrap_or_else(|| "UNKNOWN".to_string());
            let temperature = smart.and_then(|s| s.temperature_celsius).or(d.temperature_celsius);

            if health == "FAILED" {
                failed_data += 1;
            }

            // Calculate drive statuses using config thresholds
            let health_status = self.calculate_health_status(&health);
            let temperature_status = temperature.map(|t| self.temperature_thresholds.evaluate(t)).unwrap_or(cm_dashboard_shared::Status::Unknown);

            cm_dashboard_shared::PoolDriveData {
                name: d.name.clone(),
                serial_number: smart.and_then(|s| s.serial_number.clone()),
                temperature_celsius: temperature,
                health,
                wear_percent: smart.and_then(|s| s.wear_percent),
                health_status,
                temperature_status,
            }
        }).collect();

        // Process parity drives
        let parity_drive_data: Vec<cm_dashboard_shared::PoolDriveData> = pool.parity_drives.iter().map(|d| {
            let smart = smart_data.get(&d.name);
            let health = smart.map(|s| s.health.clone()).unwrap_or_else(|| "UNKNOWN".to_string());
            let temperature = smart.and_then(|s| s.temperature_celsius).or(d.temperature_celsius);

            if health == "FAILED" {
                failed_parity += 1;
            }

            // Calculate drive statuses using config thresholds
            let health_status = self.calculate_health_status(&health);
            let temperature_status = temperature.map(|t| self.temperature_thresholds.evaluate(t)).unwrap_or(cm_dashboard_shared::Status::Unknown);

            cm_dashboard_shared::PoolDriveData {
                name: d.name.clone(),
                serial_number: smart.and_then(|s| s.serial_number.clone()),
                temperature_celsius: temperature,
                health,
                wear_percent: smart.and_then(|s| s.wear_percent),
                health_status,
                temperature_status,
            }
        }).collect();

        // Calculate overall pool health string and status
        // SnapRAID logic: can tolerate up to N parity drive failures (where N = number of parity drives)
        // If data drives fail AND we've lost parity protection, that's critical
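        //
        // Decision table, (failed_data, failed_parity) -> health:
        //   (0, 0)  -> "healthy"  (Ok)
        //   (0, >0) -> "degraded" (Warning)
        //   (1, 0)  -> "degraded" (Warning)
        //   else    -> "critical" (Critical)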
        let (pool_health, health_status) = if failed_data == 0 && failed_parity == 0 {
            ("healthy".to_string(), cm_dashboard_shared::Status::Ok)
        } else if failed_data == 0 && failed_parity > 0 {
            // Parity failed but no data loss - degraded (reduced protection)
            ("degraded".to_string(), cm_dashboard_shared::Status::Warning)
        } else if failed_data == 1 && failed_parity == 0 {
            // One data drive failed, parity intact - degraded (recoverable)
            ("degraded".to_string(), cm_dashboard_shared::Status::Warning)
        } else {
            // Multiple data drives failed OR data+parity failed = data loss risk
            ("critical".to_string(), cm_dashboard_shared::Status::Critical)
        };

        // Calculate pool usage status using config thresholds
        let usage_percent = if pool.total_bytes > 0 {
            (pool.used_bytes as f32 / pool.total_bytes as f32) * 100.0
        } else { 0.0 };

        let usage_status = if usage_percent >= self.config.usage_critical_percent {
            cm_dashboard_shared::Status::Critical
        } else if usage_percent >= self.config.usage_warning_percent {
            cm_dashboard_shared::Status::Warning
        } else {
            cm_dashboard_shared::Status::Ok
        };

        (pool_health, health_status, usage_status, data_drive_data, parity_drive_data)
    }

    /// Calculate filesystem usage status
    fn calculate_filesystem_usage_status(&self, usage_percent: f32) -> Status {
        // Fixed filesystem thresholds: warning at 85%, critical at 95%
        if usage_percent >= 95.0 {
            Status::Critical
        } else if usage_percent >= 85.0 {
            Status::Warning
        } else {
            Status::Ok
        }
    }

    /// Calculate drive temperature status
    fn calculate_temperature_status(&self, temperature: f32) -> Status {
        self.temperature_thresholds.evaluate(temperature)
    }

    /// Calculate drive health status
    fn calculate_health_status(&self, health: &str) -> Status {
        match health {
            "PASSED" => Status::Ok,
            "FAILED" => Status::Critical,
            _ => Status::Unknown,
        }
    }

    /// Discover parity drives that are related to the given data drives
    fn discover_related_parity_drives(&self, data_drives: &[String]) -> anyhow::Result<Vec<String>> {
        // Bridge from this sync fn to the async get_mount_devices on the
        // current Tokio runtime
        let mount_devices = tokio::task::block_in_place(|| {
            tokio::runtime::Handle::current().block_on(self.get_mount_devices())
        }).map_err(|e| anyhow::anyhow!("Failed to get mount devices: {}", e))?;

        let mut related_parity = Vec::new();
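
        // Example of the match below (illustrative): a data drive mounted at
        // "/mnt/disk1" has parent "/mnt", so a mount like "/mnt/parity1"
        // is treated as related parity.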
        // Find parity drives that share the same parent directory as the data drives
        for data_path in data_drives {
            if let Some(parent_dir) = self.get_parent_directory(data_path) {
                // Look for parity drives in the same parent directory
                for (mount_point, _device) in &mount_devices {
                    if mount_point.contains("parity") && mount_point.starts_with(&parent_dir) {
                        if !related_parity.contains(mount_point) {
                            related_parity.push(mount_point.clone());
                        }
                    }
                }
            }
        }

        Ok(related_parity)
    }

    /// Get parent directory of a mount path (e.g., "/mnt/disk1" -> "/mnt")
    fn get_parent_directory(&self, path: &str) -> Option<String> {
        if let Some(last_slash) = path.rfind('/') {
            if last_slash > 0 {
                return Some(path[..last_slash].to_string());
            }
        }
        None
    }

    /// Categorize pool member drives as data vs parity
    fn categorize_pool_drives(&self, member_paths: &[String]) -> anyhow::Result<(Vec<PoolDrive>, Vec<PoolDrive>)> {
        let mut data_drives = Vec::new();
        let mut parity_drives = Vec::new();

        for path in member_paths {
            let drive_info = self.get_drive_info_for_path(path)?;

            // Heuristic: if path contains "parity", it's parity
            if path.to_lowercase().contains("parity") {
                parity_drives.push(drive_info);
            } else {
                data_drives.push(drive_info);
            }
        }

        Ok((data_drives, parity_drives))
    }

    /// Get drive information for a mount path
    fn get_drive_info_for_path(&self, path: &str) -> anyhow::Result<PoolDrive> {
        // Use lsblk to find the backing device
        let output = Command::new("lsblk")
            .args(&["-rn", "-o", "NAME,MOUNTPOINT"])
            .output()
            .map_err(|e| anyhow::anyhow!("Failed to run lsblk: {}", e))?;

        let output_str = String::from_utf8_lossy(&output.stdout);
        let mut device = String::new();

        for line in output_str.lines() {
            let parts: Vec<&str> = line.split_whitespace().collect();
            if parts.len() >= 2 && parts[1] == path {
                device = parts[0].to_string();
                break;
            }
        }

        if device.is_empty() {
            return Err(anyhow::anyhow!("Could not find device for path {}", path));
        }

        // Extract base device name (e.g., "sda1" -> "sda")
        let base_device = self.extract_base_device(&format!("/dev/{}", device));

        // Get temperature from SMART data if available
        let temperature = if let Ok(smart_data) = tokio::task::block_in_place(|| {
            tokio::runtime::Handle::current().block_on(self.get_smart_data(&base_device))
        }) {
            smart_data.temperature_celsius
        } else {
            None
        };

        Ok(PoolDrive {
            name: base_device,
            mount_point: path.to_string(),
            temperature_celsius: temperature,
        })
    }

    /// Resolve numeric mergerfs references like "1:2" to actual mount paths
    fn resolve_numeric_mergerfs_paths(&self, numeric_refs: &[String]) -> anyhow::Result<Vec<String>> {
        let mut resolved_paths = Vec::new();

        // Get all mount points that look like /mnt/disk* or /mnt/parity*
        let mount_devices = tokio::task::block_in_place(|| {
            tokio::runtime::Handle::current().block_on(self.get_mount_devices())
        }).map_err(|e| anyhow::anyhow!("Failed to get mount devices: {}", e))?;

        let mut disk_mounts: Vec<String> = mount_devices.keys()
            .filter(|path| path.starts_with("/mnt/disk") || path.starts_with("/mnt/parity"))
            .cloned()
            .collect();
        disk_mounts.sort(); // Ensure consistent ordering
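
        // Worked example (illustrative): with sorted disk_mounts of
        // ["/mnt/disk1", "/mnt/disk2", "/mnt/parity1"], the numeric refs
        // "1" and "2" resolve to "/mnt/disk1" and "/mnt/disk2".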
        for num_ref in numeric_refs {
            if let Ok(index) = num_ref.parse::<usize>() {
                // Convert 1-based index to 0-based
                if index > 0 && index <= disk_mounts.len() {
                    resolved_paths.push(disk_mounts[index - 1].clone());
                }
            }
        }

        // Fallback: if we couldn't resolve, return the original paths
        if resolved_paths.is_empty() {
            resolved_paths = numeric_refs.to_vec();
        }

        Ok(resolved_paths)
    }
}

#[async_trait]
impl Collector for DiskCollector {
    async fn collect_structured(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
        self.collect_storage_data(agent_data).await
    }
}

/// SMART data for a drive
#[derive(Debug, Clone)]
struct SmartData {
    health: String,
    serial_number: Option<String>,
    temperature_celsius: Option<f32>,
    wear_percent: Option<f32>,
}