use anyhow::Result;
use async_trait::async_trait;
use cm_dashboard_shared::{Metric, MetricValue, Status, StatusTracker, HysteresisThresholds};
use crate::config::DiskConfig;
use std::process::Command;
use std::time::Instant;
use std::fs;
use tracing::debug;

use super::{Collector, CollectorError};

/// Mount point information from /proc/mounts
#[derive(Debug, Clone)]
struct MountInfo {
    device: String,      // e.g., "/dev/sda1" or "/mnt/disk1:/mnt/disk2"
    mount_point: String, // e.g., "/", "/srv/media"
    fs_type: String,     // e.g., "ext4", "xfs", "fuse.mergerfs"
}

/// Auto-discovered storage topology
#[derive(Debug, Clone)]
struct StorageTopology {
    single_disks: Vec<MountInfo>,
    mergerfs_pools: Vec<MergerfsPoolInfo>,
}

/// MergerFS pool information
#[derive(Debug, Clone)]
struct MergerfsPoolInfo {
    mount_point: String,       // e.g., "/srv/media"
    data_members: Vec<String>, // e.g., ["/mnt/disk1", "/mnt/disk2"]
    parity_disks: Vec<String>, // e.g., ["/mnt/parity"]
}

/// Information about a storage pool (mount point with underlying drives)
#[derive(Debug, Clone)]
struct StoragePool {
    name: String,                      // e.g., "steampool", "root"
    mount_point: String,               // e.g., "/mnt/steampool", "/"
    filesystem: String,                // e.g., "mergerfs", "ext4", "zfs", "btrfs"
    pool_type: StoragePoolType,        // Enhanced pool type with configuration
    size: String,                      // e.g., "2.5TB"
    used: String,                      // e.g., "2.1TB"
    available: String,                 // e.g., "400GB"
    usage_percent: f32,                // e.g., 85.0
    underlying_drives: Vec<DriveInfo>, // Individual physical drives
    pool_health: PoolHealth,           // Overall pool health status
}

/// Enhanced storage pool types with specific configurations
#[derive(Debug, Clone)]
enum StoragePoolType {
    Single, // Traditional single disk (legacy)
    PhysicalDrive {
        // Physical drive with multiple filesystems
        filesystems: Vec<String>, // Mount points on this drive
    },
    MergerfsPool {
        // MergerFS with optional parity
        data_disks: Vec<String>,   // Member disk names (sdb, sdd)
        parity_disks: Vec<String>, // Parity disk names (sdc)
    },
    #[allow(dead_code)]
    RaidArray {
        // Hardware RAID (future)
        level: String, // "RAID1", "RAID5", etc.
        member_disks: Vec<String>,
        spare_disks: Vec<String>,
    },
    #[allow(dead_code)]
    ZfsPool {
        // ZFS pool (future)
        pool_name: String,
        vdevs: Vec<String>,
    },
}

/// Pool health status for redundant storage
#[derive(Debug, Clone, Copy, PartialEq)]
enum PoolHealth {
    Healthy,  // All drives OK, parity current
    Degraded, // One drive failed or parity outdated, still functional
    Critical, // Multiple failures, data at risk
    #[allow(dead_code)]
    Rebuilding, // Actively rebuilding/scrubbing (future: SnapRAID status integration)
    Unknown, // Cannot determine status
}

/// Information about an individual physical drive
#[derive(Debug, Clone)]
struct DriveInfo {
    device: String,           // e.g., "sda", "nvme0n1"
    health_status: String,    // e.g., "PASSED", "FAILED"
    temperature: Option<f32>, // e.g., 45.0°C
    wear_level: Option<f32>,  // e.g., 12.0% (for SSDs)
}

/// Disk usage collector for monitoring filesystem sizes
pub struct DiskCollector {
    config: DiskConfig,
    temperature_thresholds: HysteresisThresholds,
    detected_devices: std::collections::HashMap<String, Vec<String>>, // mount_point -> devices
    storage_topology: Option<StorageTopology>, // Auto-discovered storage layout
}

impl DiskCollector {
    pub fn new(config: DiskConfig) -> Self {
        // Create hysteresis thresholds for disk temperature from config
        let temperature_thresholds = HysteresisThresholds::with_custom_gaps(
            config.temperature_warning_celsius,
            5.0, // 5°C gap for recovery
            config.temperature_critical_celsius,
            5.0, // 5°C gap for recovery
        );

        // Perform auto-discovery of storage topology
        let storage_topology = match Self::auto_discover_storage() {
            Ok(topology) => {
                debug!(
                    "Auto-discovered storage topology: {} single disks, {} mergerfs pools",
                    topology.single_disks.len(),
                    topology.mergerfs_pools.len()
                );
                Some(topology)
            }
            Err(e) => {
                debug!("Failed to auto-discover storage topology: {}", e);
                None
            }
        };

        // Detect devices for discovered storage
        let mut detected_devices = std::collections::HashMap::new();
        if let Some(ref topology) = storage_topology {
            // Add single disks
            for disk in &topology.single_disks {
                if let Ok(devices) = Self::detect_device_for_mount_point_static(&disk.mount_point) {
                    detected_devices.insert(disk.mount_point.clone(), devices);
                }
            }

            // Add mergerfs pools and their members
            for pool in &topology.mergerfs_pools {
                // Detect devices for the pool itself
                if let Ok(devices) = Self::detect_device_for_mount_point_static(&pool.mount_point) {
                    detected_devices.insert(pool.mount_point.clone(), devices);
                }
                // Detect devices for member disks
                for member in &pool.data_members {
                    if let Ok(devices) = Self::detect_device_for_mount_point_static(member) {
                        detected_devices.insert(member.clone(), devices);
                    }
                }
                // Detect devices for parity disks
                for parity in &pool.parity_disks {
                    if let Ok(devices) = Self::detect_device_for_mount_point_static(parity) {
                        detected_devices.insert(parity.clone(), devices);
                    }
                }
            }
        } else {
            // Fallback: use legacy filesystem config detection
            for fs_config in &config.filesystems {
                if fs_config.monitor {
                    if let Ok(devices) =
                        Self::detect_device_for_mount_point_static(&fs_config.mount_point)
                    {
                        detected_devices.insert(fs_config.mount_point.clone(), devices);
                    }
                }
            }
        }

        Self {
            config,
            temperature_thresholds,
            detected_devices,
            storage_topology,
        }
    }

    /// Auto-discover storage topology by parsing system information
    fn auto_discover_storage() -> Result<StorageTopology> {
        let mounts = Self::parse_proc_mounts()?;
        let mut single_disks = Vec::new();
        let mut mergerfs_pools = Vec::new();

        // Filter out unwanted filesystem types and mount points
        let exclude_fs_types = ["tmpfs", "devtmpfs", "sysfs", "proc", "cgroup", "cgroup2", "devpts"];
        let exclude_mount_prefixes =
["/proc", "/sys", "/dev", "/tmp", "/run"]; for mount in mounts { // Skip excluded filesystem types if exclude_fs_types.contains(&mount.fs_type.as_str()) { continue; } // Skip excluded mount point prefixes if exclude_mount_prefixes.iter().any(|prefix| mount.mount_point.starts_with(prefix)) { continue; } match mount.fs_type.as_str() { "fuse.mergerfs" => { // Parse mergerfs pool let data_members = Self::parse_mergerfs_sources(&mount.device); let parity_disks = Self::detect_parity_disks(&data_members); mergerfs_pools.push(MergerfsPoolInfo { mount_point: mount.mount_point.clone(), data_members, parity_disks, }); debug!("Discovered mergerfs pool at {}", mount.mount_point); } "ext4" | "xfs" | "btrfs" | "ntfs" | "vfat" => { // Check if this mount is part of a mergerfs pool let is_mergerfs_member = mergerfs_pools.iter() .any(|pool| pool.data_members.contains(&mount.mount_point) || pool.parity_disks.contains(&mount.mount_point)); if !is_mergerfs_member { debug!("Discovered single disk at {}", mount.mount_point); single_disks.push(mount); } } _ => { debug!("Skipping unsupported filesystem type: {}", mount.fs_type); } } } Ok(StorageTopology { single_disks, mergerfs_pools, }) } /// Parse /proc/mounts to get all mount information fn parse_proc_mounts() -> Result> { let mounts_content = fs::read_to_string("/proc/mounts")?; let mut mounts = Vec::new(); for line in mounts_content.lines() { let parts: Vec<&str> = line.split_whitespace().collect(); if parts.len() >= 3 { mounts.push(MountInfo { device: parts[0].to_string(), mount_point: parts[1].to_string(), fs_type: parts[2].to_string(), }); } } Ok(mounts) } /// Parse mergerfs source string to extract member paths fn parse_mergerfs_sources(source: &str) -> Vec { // MergerFS source format: "/mnt/disk1:/mnt/disk2:/mnt/disk3" source.split(':') .map(|s| s.trim().to_string()) .filter(|s| !s.is_empty()) .collect() } /// Detect potential parity disks based on data member heuristics fn detect_parity_disks(data_members: &[String]) -> Vec { let mut parity_disks = Vec::new(); // Heuristic 1: Look for mount points with "parity" in the name if let Ok(mounts) = Self::parse_proc_mounts() { for mount in mounts { if mount.mount_point.to_lowercase().contains("parity") && (mount.fs_type == "xfs" || mount.fs_type == "ext4") { debug!("Detected parity disk by name: {}", mount.mount_point); parity_disks.push(mount.mount_point); } } } // Heuristic 2: Look for sequential device pattern // If data members are /mnt/disk1, /mnt/disk2, look for /mnt/disk* that's not in data if parity_disks.is_empty() { if let Some(pattern) = Self::extract_mount_pattern(data_members) { if let Ok(mounts) = Self::parse_proc_mounts() { for mount in mounts { if mount.mount_point.starts_with(&pattern) && !data_members.contains(&mount.mount_point) && (mount.fs_type == "xfs" || mount.fs_type == "ext4") { debug!("Detected parity disk by pattern: {}", mount.mount_point); parity_disks.push(mount.mount_point); } } } } } parity_disks } /// Extract common mount point pattern from data members fn extract_mount_pattern(data_members: &[String]) -> Option { if data_members.is_empty() { return None; } // Find common prefix (e.g., "/mnt/disk" from "/mnt/disk1", "/mnt/disk2") let first = &data_members[0]; if let Some(last_slash) = first.rfind('/') { let base = &first[..last_slash + 1]; // Include the slash // Check if all members share this base if data_members.iter().all(|member| member.starts_with(base)) { return Some(base.to_string()); } } None } /// Calculate disk temperature status using hysteresis thresholds fn 
    fn calculate_temperature_status(
        &self,
        metric_name: &str,
        temperature: f32,
        status_tracker: &mut StatusTracker,
    ) -> Status {
        status_tracker.calculate_with_hysteresis(metric_name, temperature, &self.temperature_thresholds)
    }

    /// Get storage pools using auto-discovered topology or fallback to configuration
    fn get_configured_storage_pools(&self) -> Result<Vec<StoragePool>> {
        if let Some(ref topology) = self.storage_topology {
            self.get_auto_discovered_storage_pools(topology)
        } else {
            self.get_legacy_configured_storage_pools()
        }
    }

    /// Get storage pools from auto-discovered topology
    fn get_auto_discovered_storage_pools(
        &self,
        topology: &StorageTopology,
    ) -> Result<Vec<StoragePool>> {
        let mut storage_pools = Vec::new();

        // Group single disks by physical drive for unified pool display
        let grouped_disks = self.group_filesystems_by_physical_drive(&topology.single_disks)?;

        // Process grouped single disks (each physical drive becomes a pool)
        for (drive_name, filesystems) in grouped_disks {
            // Create a unified pool for this physical drive
            let pool = self.create_physical_drive_pool(&drive_name, &filesystems)?;
            storage_pools.push(pool);
        }

        // IMPORTANT: Do not create individual filesystem pools when using auto-discovery.
        // All single disk filesystems should be grouped into physical drive pools above.

        // Process mergerfs pools (these remain as logical pools)
        for pool_info in &topology.mergerfs_pools {
            if let Ok((total_bytes, used_bytes)) = self.get_filesystem_info(&pool_info.mount_point) {
                let available_bytes = total_bytes - used_bytes;
                let usage_percent = if total_bytes > 0 {
                    (used_bytes as f64 / total_bytes as f64) * 100.0
                } else {
                    0.0
                };

                let size = self.bytes_to_human_readable(total_bytes);
                let used = self.bytes_to_human_readable(used_bytes);
                let available = self.bytes_to_human_readable(available_bytes);

                // Collect all member and parity drives
                let mut all_drives = Vec::new();

                // Add data member drives
                for member in &pool_info.data_members {
                    if let Some(devices) = self.detected_devices.get(member) {
                        all_drives.extend(devices.clone());
                    }
                }
                // Add parity drives
                for parity in &pool_info.parity_disks {
                    if let Some(devices) = self.detected_devices.get(parity) {
                        all_drives.extend(devices.clone());
                    }
                }

                let underlying_drives = self.get_drive_info_for_devices(&all_drives)?;

                // Calculate pool health
                let pool_health = self.calculate_mergerfs_pool_health(
                    &pool_info.data_members,
                    &pool_info.parity_disks,
                    &underlying_drives,
                );

                // Generate pool name from mount point
                let name = pool_info.mount_point.trim_start_matches('/').replace('/', "_");

                storage_pools.push(StoragePool {
                    name,
                    mount_point: pool_info.mount_point.clone(),
                    filesystem: "fuse.mergerfs".to_string(),
                    pool_type: StoragePoolType::MergerfsPool {
                        data_disks: pool_info
                            .data_members
                            .iter()
                            .filter_map(|member| {
                                self.detected_devices
                                    .get(member)
                                    .and_then(|devices| devices.first().cloned())
                            })
                            .collect(),
                        parity_disks: pool_info
                            .parity_disks
                            .iter()
                            .filter_map(|parity| {
                                self.detected_devices
                                    .get(parity)
                                    .and_then(|devices| devices.first().cloned())
                            })
                            .collect(),
                    },
                    size,
                    used,
                    available,
                    usage_percent: usage_percent as f32,
                    underlying_drives,
                    pool_health,
                });

                debug!(
                    "Auto-discovered mergerfs pool: {} with {} data + {} parity disks",
                    pool_info.mount_point,
                    pool_info.data_members.len(),
                    pool_info.parity_disks.len()
                );
            }
        }

        Ok(storage_pools)
    }

    /// Group filesystems by their backing physical drive
    fn group_filesystems_by_physical_drive(
        &self,
        filesystems: &[MountInfo],
    ) -> Result<std::collections::HashMap<String, Vec<MountInfo>>> {
        let mut grouped = std::collections::HashMap::new();

        for fs in filesystems {
            // Get the physical drive name for this mount point
            if let Some(devices) = self.detected_devices.get(&fs.mount_point) {
                if let Some(device_name) = devices.first() {
                    // Extract base drive name from detected device
                    let drive_name = Self::extract_base_device(device_name)
                        .unwrap_or_else(|| device_name.clone());
                    debug!(
                        "Grouping filesystem {} (device: {}) under drive: {}",
                        fs.mount_point, device_name, drive_name
                    );
                    grouped.entry(drive_name).or_insert_with(Vec::new).push(fs.clone());
                }
            }
        }

        debug!(
            "Filesystem grouping result: {} drives with filesystems: {:?}",
            grouped.len(),
            grouped.keys().collect::<Vec<_>>()
        );

        Ok(grouped)
    }

    /// Create a physical drive pool containing multiple filesystems
    fn create_physical_drive_pool(
        &self,
        drive_name: &str,
        filesystems: &[MountInfo],
    ) -> Result<StoragePool> {
        if filesystems.is_empty() {
            return Err(anyhow::anyhow!("No filesystems for drive {}", drive_name));
        }

        // Calculate total usage across all filesystems on this drive
        let mut total_capacity = 0u64;
        let mut total_used = 0u64;
        for fs in filesystems {
            if let Ok((capacity, used)) = self.get_filesystem_info(&fs.mount_point) {
                total_capacity += capacity;
                total_used += used;
            }
        }
        let total_available = total_capacity.saturating_sub(total_used);
        let usage_percent = if total_capacity > 0 {
            (total_used as f64 / total_capacity as f64) * 100.0
        } else {
            0.0
        };

        // Get drive information for SMART data
        let device_names = vec![drive_name.to_string()];
        let underlying_drives = self.get_drive_info_for_devices(&device_names)?;

        // Collect filesystem mount points for this drive
        let filesystem_mount_points: Vec<String> = filesystems
            .iter()
            .map(|fs| fs.mount_point.clone())
            .collect();

        Ok(StoragePool {
            name: drive_name.to_string(),
            mount_point: format!("(physical drive)"), // Special marker for physical drives
            filesystem: "physical".to_string(),
            pool_type: StoragePoolType::PhysicalDrive {
                filesystems: filesystem_mount_points,
            },
            size: self.bytes_to_human_readable(total_capacity),
            used: self.bytes_to_human_readable(total_used),
            available: self.bytes_to_human_readable(total_available),
            usage_percent: usage_percent as f32,
            pool_health: if underlying_drives.iter().all(|d| d.health_status == "PASSED") {
                PoolHealth::Healthy
            } else {
                PoolHealth::Critical
            },
            underlying_drives,
        })
    }

    /// Calculate pool health specifically for mergerfs pools
    fn calculate_mergerfs_pool_health(
        &self,
        data_members: &[String],
        parity_disks: &[String],
        drives: &[DriveInfo],
    ) -> PoolHealth {
        // Get device names for data and parity drives
        let mut data_device_names = Vec::new();
        let mut parity_device_names = Vec::new();

        for member in data_members {
            if let Some(devices) = self.detected_devices.get(member) {
                data_device_names.extend(devices.clone());
            }
        }
        for parity in parity_disks {
            if let Some(devices) = self.detected_devices.get(parity) {
                parity_device_names.extend(devices.clone());
            }
        }

        let failed_data = drives
            .iter()
            .filter(|d| data_device_names.contains(&d.device) && d.health_status != "PASSED")
            .count();
        let failed_parity = drives
            .iter()
            .filter(|d| parity_device_names.contains(&d.device) && d.health_status != "PASSED")
            .count();

        match (failed_data, failed_parity) {
            (0, 0) => PoolHealth::Healthy,
            (1, 0) => PoolHealth::Degraded, // Can recover with parity
            (0, 1) => PoolHealth::Degraded, // Lost parity protection
            _ => PoolHealth::Critical,      // Multiple failures
        }
    }

    /// Fallback to legacy configuration-based storage pools
    fn get_legacy_configured_storage_pools(&self) -> Result<Vec<StoragePool>> {
        let mut storage_pools = Vec::new();
        let mut processed_pools = std::collections::HashSet::new();

        // Legacy implementation: use filesystem configuration
        for fs_config in &self.config.filesystems {
            if !fs_config.monitor {
                continue;
            }

            let (pool_type, skip_in_single_mode) = self.determine_pool_type(&fs_config.storage_type);

            // Skip member disks if they're part of a pool
            if skip_in_single_mode {
                continue;
            }

            // Check if this pool was already processed (in case of multiple member disks)
            let pool_key = match &pool_type {
                StoragePoolType::MergerfsPool { .. } => {
                    // For mergerfs pools, use the main mount point
                    if fs_config.fs_type == "fuse.mergerfs" {
                        fs_config.mount_point.clone()
                    } else {
                        continue; // Skip member disks
                    }
                }
                _ => fs_config.mount_point.clone(),
            };

            if processed_pools.contains(&pool_key) {
                continue;
            }
            processed_pools.insert(pool_key.clone());

            // Get filesystem stats for the mount point
            match self.get_filesystem_info(&fs_config.mount_point) {
                Ok((total_bytes, used_bytes)) => {
                    let available_bytes = total_bytes - used_bytes;
                    let usage_percent = if total_bytes > 0 {
                        (used_bytes as f64 / total_bytes as f64) * 100.0
                    } else {
                        0.0
                    };

                    // Convert bytes to human-readable format
                    let size = self.bytes_to_human_readable(total_bytes);
                    let used = self.bytes_to_human_readable(used_bytes);
                    let available = self.bytes_to_human_readable(available_bytes);

                    // Get underlying drives based on pool type
                    let underlying_drives =
                        self.get_pool_drives(&pool_type, &fs_config.mount_point)?;

                    // Calculate pool health
                    let pool_health = self.calculate_pool_health(&pool_type, &underlying_drives);
                    let drive_count = underlying_drives.len();

                    storage_pools.push(StoragePool {
                        name: fs_config.name.clone(),
                        mount_point: fs_config.mount_point.clone(),
                        filesystem: fs_config.fs_type.clone(),
                        pool_type: pool_type.clone(),
                        size,
                        used,
                        available,
                        usage_percent: usage_percent as f32,
                        underlying_drives,
                        pool_health,
                    });

                    debug!(
                        "Legacy configured storage pool '{}' ({:?}) at {} with {} drives, health: {:?}",
                        fs_config.name, pool_type, fs_config.mount_point, drive_count, pool_health
                    );
                }
                Err(e) => {
                    debug!(
                        "Failed to get filesystem info for storage pool '{}': {}",
                        fs_config.name, e
                    );
                }
            }
        }

        Ok(storage_pools)
    }

    /// Determine the storage pool type from configuration
    fn determine_pool_type(&self, storage_type: &str) -> (StoragePoolType, bool) {
        match storage_type {
            "single" => (StoragePoolType::Single, false),
            "mergerfs_pool" | "mergerfs" => {
                // Find associated member disks
                let data_disks = self.find_pool_member_disks("mergerfs_member");
                let parity_disks = self.find_pool_member_disks("parity");
                (
                    StoragePoolType::MergerfsPool {
                        data_disks,
                        parity_disks,
                    },
                    false,
                )
            }
            "mergerfs_member" => (StoragePoolType::Single, true), // Skip, part of pool
            "parity" => (StoragePoolType::Single, true),          // Skip, part of pool
            "raid1" | "raid5" | "raid6" => {
                let member_disks = self.find_pool_member_disks(&format!("{}_member", storage_type));
                (
                    StoragePoolType::RaidArray {
                        level: storage_type.to_uppercase(),
                        member_disks,
                        spare_disks: Vec::new(),
                    },
                    false,
                )
            }
            _ => (StoragePoolType::Single, false), // Default to single
        }
    }

    /// Find member disks for a specific storage type
    fn find_pool_member_disks(&self, member_type: &str) -> Vec<String> {
        let mut member_disks = Vec::new();

        for fs_config in &self.config.filesystems {
            if fs_config.storage_type == member_type && fs_config.monitor {
                // Get device names for this mount point
                if let Some(devices) = self.detected_devices.get(&fs_config.mount_point) {
                    member_disks.extend(devices.clone());
                }
            }
        }

        member_disks
    }

    /// Get drive information for a specific pool type
    fn get_pool_drives(
        &self,
        pool_type: &StoragePoolType,
        mount_point: &str,
    ) -> Result<Vec<DriveInfo>> {
        match pool_type {
            StoragePoolType::Single => {
                // Single disk - use detected devices for this mount point
                let device_names = self.detected_devices.get(mount_point).cloned().unwrap_or_default();
                self.get_drive_info_for_devices(&device_names)
            }
            StoragePoolType::PhysicalDrive { .. } => {
                // Physical drive - get drive info for the drive directly (mount_point not used)
                let device_names = vec![mount_point.to_string()];
                self.get_drive_info_for_devices(&device_names)
            }
            StoragePoolType::MergerfsPool {
                data_disks,
                parity_disks,
            } => {
                // Mergerfs pool - collect all member drives
                let mut all_disks = data_disks.clone();
                all_disks.extend(parity_disks.clone());
                self.get_drive_info_for_devices(&all_disks)
            }
            StoragePoolType::RaidArray {
                member_disks,
                spare_disks,
                ..
            } => {
                // RAID array - collect member and spare drives
                let mut all_disks = member_disks.clone();
                all_disks.extend(spare_disks.clone());
                self.get_drive_info_for_devices(&all_disks)
            }
            StoragePoolType::ZfsPool { .. } => {
                // ZFS pool - use detected devices (future implementation)
                let device_names = self.detected_devices.get(mount_point).cloned().unwrap_or_default();
                self.get_drive_info_for_devices(&device_names)
            }
        }
    }

    /// Calculate pool health based on drive status and pool type
    fn calculate_pool_health(&self, pool_type: &StoragePoolType, drives: &[DriveInfo]) -> PoolHealth {
        match pool_type {
            StoragePoolType::Single => {
                // Single disk - health is just the drive health
                if drives.is_empty() {
                    PoolHealth::Unknown
                } else if drives.iter().all(|d| d.health_status == "PASSED") {
                    PoolHealth::Healthy
                } else {
                    PoolHealth::Critical
                }
            }
            StoragePoolType::PhysicalDrive { .. } => {
                // Physical drive - health is just the drive health (similar to Single)
                if drives.is_empty() {
                    PoolHealth::Unknown
                } else if drives.iter().all(|d| d.health_status == "PASSED") {
                    PoolHealth::Healthy
                } else {
                    PoolHealth::Critical
                }
            }
            StoragePoolType::MergerfsPool {
                data_disks,
                parity_disks,
            } => {
                let failed_data = drives
                    .iter()
                    .filter(|d| data_disks.contains(&d.device) && d.health_status != "PASSED")
                    .count();
                let failed_parity = drives
                    .iter()
                    .filter(|d| parity_disks.contains(&d.device) && d.health_status != "PASSED")
                    .count();

                match (failed_data, failed_parity) {
                    (0, 0) => PoolHealth::Healthy,
                    (1, 0) => PoolHealth::Degraded, // Can recover with parity
                    (0, 1) => PoolHealth::Degraded, // Lost parity protection
                    _ => PoolHealth::Critical,      // Multiple failures
                }
            }
            StoragePoolType::RaidArray { level, .. } => {
                let failed_drives = drives.iter().filter(|d| d.health_status != "PASSED").count();
                // Basic RAID health logic (can be enhanced per RAID level)
                match failed_drives {
                    0 => PoolHealth::Healthy,
                    1 if level.contains('1') || level.contains('5') || level.contains('6') => {
                        PoolHealth::Degraded
                    }
                    _ => PoolHealth::Critical,
                }
            }
            StoragePoolType::ZfsPool { .. } => {
                // ZFS health would require zpool status parsing (future)
                if drives.iter().all(|d| d.health_status == "PASSED") {
                    PoolHealth::Healthy
                } else {
                    PoolHealth::Degraded
                }
            }
        }
    }

    /// Get drive information for a list of device names
    fn get_drive_info_for_devices(&self, device_names: &[String]) -> Result<Vec<DriveInfo>> {
        let mut drives = Vec::new();

        for device_name in device_names {
            let device_path = format!("/dev/{}", device_name);

            // Get SMART data for this drive
            let (health_status, temperature, wear_level) = self.get_smart_data(&device_path);

            drives.push(DriveInfo {
                device: device_name.clone(),
                health_status: health_status.clone(),
                temperature,
                wear_level,
            });

            debug!(
                "Drive info for {}: health={}, temp={:?}°C, wear={:?}%",
                device_name, health_status, temperature, wear_level
            );
        }

        Ok(drives)
    }

    /// Get SMART data for a drive (health, temperature, wear level)
    fn get_smart_data(&self, device_path: &str) -> (String, Option<f32>, Option<f32>) {
        // Try to get SMART data using smartctl
        let output = Command::new("sudo")
            .arg("smartctl")
            .arg("-a")
            .arg(device_path)
            .output();

        match output {
            Ok(result) if result.status.success() => {
                let stdout = String::from_utf8_lossy(&result.stdout);

                // Parse health status
                let health = if stdout.contains("PASSED") {
                    "PASSED".to_string()
                } else if stdout.contains("FAILED") {
                    "FAILED".to_string()
                } else {
                    "UNKNOWN".to_string()
                };

                // Parse temperature (look for various temperature indicators)
                let temperature = self.parse_temperature_from_smart(&stdout);

                // Parse wear level (for SSDs)
                let wear_level = self.parse_wear_level_from_smart(&stdout);

                (health, temperature, wear_level)
            }
            _ => {
                debug!("Failed to get SMART data for {}", device_path);
                ("UNKNOWN".to_string(), None, None)
            }
        }
    }

    /// Parse temperature from SMART output
    fn parse_temperature_from_smart(&self, smart_output: &str) -> Option<f32> {
        for line in smart_output.lines() {
            // Look for temperature in various formats
            if line.contains("Temperature_Celsius") || line.contains("Temperature") {
                let parts: Vec<&str> = line.split_whitespace().collect();
                if parts.len() >= 10 {
                    if let Ok(temp) = parts[9].parse::<f32>() {
                        return Some(temp);
                    }
                }
            }
            // NVMe drives might show temperature differently
            if line.contains("temperature:") {
                if let Some(temp_part) = line.split("temperature:").nth(1) {
                    if let Some(temp_str) = temp_part.split_whitespace().next() {
                        if let Ok(temp) = temp_str.parse::<f32>() {
                            return Some(temp);
                        }
                    }
                }
            }
        }
        None
    }

    /// Parse wear level from SMART output (SSD wear leveling)
    /// Supports both NVMe and SATA SSD wear indicators
    fn parse_wear_level_from_smart(&self, smart_output: &str) -> Option<f32> {
        for line in smart_output.lines() {
            let line = line.trim();

            // NVMe drives - direct percentage used
            if line.contains("Percentage Used:") {
                if let Some(wear_part) = line.split("Percentage Used:").nth(1) {
                    if let Some(wear_str) = wear_part.split('%').next() {
                        if let Ok(wear) = wear_str.trim().parse::<f32>() {
                            return Some(wear);
                        }
                    }
                }
            }

            // SATA SSD attributes - parse SMART table format
            // Format: ID ATTRIBUTE_NAME FLAG VALUE WORST THRESH TYPE UPDATED WHEN_FAILED RAW_VALUE
            let parts: Vec<&str> = line.split_whitespace().collect();
            if parts.len() >= 10 {
                // SSD Life Left / Percent Lifetime Remaining (higher = less wear)
                if line.contains("SSD_Life_Left") || line.contains("Percent_Lifetime_Remain") {
                    if let Ok(remaining) = parts[3].parse::<f32>() {
                        // VALUE column
                        return Some(100.0 - remaining); // Convert remaining to used
                    }
                }
                // Media Wearout Indicator (lower = more wear, normalize to 0-100)
                if line.contains("Media_Wearout_Indicator") {
                    if let Ok(remaining) = parts[3].parse::<f32>() {
                        // VALUE column
                        return Some(100.0 - remaining); // Convert remaining to used
                    }
                }
                // Wear Leveling Count (higher = less wear, but varies by manufacturer)
                if line.contains("Wear_Leveling_Count") {
                    if let Ok(wear_count) = parts[3].parse::<f32>() {
                        // VALUE column
                        // Most SSDs: 100 = new, decreases with wear
                        if wear_count <= 100.0 {
                            return Some(100.0 - wear_count);
                        }
                    }
                }
                // Total LBAs Written - calculate against typical endurance if available.
                // This is more complex and manufacturer-specific, so we skip it for now.
            }
        }
        None
    }

    /// Convert bytes to human-readable format
    fn bytes_to_human_readable(&self, bytes: u64) -> String {
        const UNITS: &[&str] = &["B", "K", "M", "G", "T"];
        let mut size = bytes as f64;
        let mut unit_index = 0;

        while size >= 1024.0 && unit_index < UNITS.len() - 1 {
            size /= 1024.0;
            unit_index += 1;
        }

        if unit_index == 0 {
            format!("{:.0}{}", size, UNITS[unit_index])
        } else {
            format!("{:.1}{}", size, UNITS[unit_index])
        }
    }

    /// Convert bytes to gigabytes
    fn bytes_to_gb(&self, bytes: u64) -> f32 {
        bytes as f32 / (1024.0 * 1024.0 * 1024.0)
    }

    /// Detect device backing a mount point using lsblk (static version for startup)
    fn detect_device_for_mount_point_static(mount_point: &str) -> Result<Vec<String>> {
        let output = Command::new("lsblk")
            .args(&["-n", "-o", "NAME,MOUNTPOINT"])
            .output()?;

        if !output.status.success() {
            return Ok(Vec::new());
        }

        let output_str = String::from_utf8_lossy(&output.stdout);
        for line in output_str.lines() {
            let parts: Vec<&str> = line.split_whitespace().collect();
            if parts.len() >= 2 && parts[1] == mount_point {
                // Remove tree symbols and extract device name (e.g., "├─nvme0n1p2" -> "nvme0n1p2")
                let device_name = parts[0]
                    .trim_start_matches('├')
                    .trim_start_matches('└')
                    .trim_start_matches('─')
                    .trim();
                // Extract base device name (e.g., "nvme0n1p2" -> "nvme0n1")
                if let Some(base_device) = Self::extract_base_device(device_name) {
                    return Ok(vec![base_device]);
                }
            }
        }

        Ok(Vec::new())
    }

    /// Extract base device name from partition (e.g., "nvme0n1p2" -> "nvme0n1", "sda1" -> "sda")
    fn extract_base_device(device_name: &str) -> Option<String> {
        // Handle NVMe devices (nvme0n1p1 -> nvme0n1)
        if device_name.starts_with("nvme") {
            if let Some(p_pos) = device_name.find('p') {
                return Some(device_name[..p_pos].to_string());
            }
        }

        // Handle traditional devices (sda1 -> sda)
        if device_name.len() > 1 {
            let chars: Vec<char> = device_name.chars().collect();
            let mut end_idx = chars.len();

            // Find where the device name ends and the partition number begins
            for (i, &c) in chars.iter().enumerate().rev() {
                if !c.is_ascii_digit() {
                    end_idx = i + 1;
                    break;
                }
            }

            if end_idx > 0 && end_idx < chars.len() {
                return Some(chars[..end_idx].iter().collect());
            }
        }

        // If no partition detected, return as-is
        Some(device_name.to_string())
    }

    /// Get filesystem info using df command
    fn get_filesystem_info(&self, path: &str) -> Result<(u64, u64)> {
        let output = Command::new("df")
            .arg("--block-size=1")
            .arg(path)
            .output()?;

        if !output.status.success() {
            return Err(anyhow::anyhow!("df command failed for {}", path));
        }

        let output_str = String::from_utf8(output.stdout)?;
        let lines: Vec<&str> = output_str.lines().collect();
        if lines.len() < 2 {
            return Err(anyhow::anyhow!("Unexpected df output format"));
        }

        let fields: Vec<&str> = lines[1].split_whitespace().collect();
        if fields.len() < 4 {
            return Err(anyhow::anyhow!("Unexpected df fields count"));
        }

        let total_bytes = fields[1].parse::<u64>()?;
        let used_bytes = fields[2].parse::<u64>()?;

        Ok((total_bytes, used_bytes))
    }

    /// Parse size string (e.g., "120G", "45M") to GB value
    fn parse_size_to_gb(&self, size_str: &str) -> f32 {
        let size_str = size_str.trim();
        if size_str.is_empty() || size_str == "-" {
            return 0.0;
        }

        // Extract numeric part and unit
        let (num_str, unit) = if let Some(last_char) = size_str.chars().last() {
            if last_char.is_alphabetic() {
                let num_part = &size_str[..size_str.len() - 1];
                let unit_part = &size_str[size_str.len() - 1..];
                (num_part, unit_part)
            } else {
                (size_str, "")
            }
        } else {
            (size_str, "")
        };

        let number: f32 = num_str.parse().unwrap_or(0.0);

        match unit.to_uppercase().as_str() {
            "T" | "TB" => number * 1024.0,
            "G" | "GB" => number,
            "M" | "MB" => number / 1024.0,
            "K" | "KB" => number / (1024.0 * 1024.0),
            "B" | "" => number / (1024.0 * 1024.0 * 1024.0),
            _ => number, // Assume GB if unknown unit
        }
    }
}

#[async_trait]
impl Collector for DiskCollector {
    async fn collect(
        &self,
        status_tracker: &mut StatusTracker,
    ) -> Result<Vec<Metric>, CollectorError> {
        let start_time = Instant::now();
        debug!("Collecting storage pool and individual drive metrics");

        let mut metrics = Vec::new();

        // Get configured storage pools with individual drive data
        let storage_pools = match self.get_configured_storage_pools() {
            Ok(pools) => {
                debug!("Found {} storage pools", pools.len());
                pools
            }
            Err(e) => {
                debug!("Failed to get storage pools: {}", e);
                Vec::new()
            }
        };

        // Generate metrics for each storage pool and its underlying drives
        for storage_pool in &storage_pools {
            let timestamp = chrono::Utc::now().timestamp() as u64;

            // Storage pool overall metrics
            let pool_name = &storage_pool.name;

            // Parse size strings to get actual values for calculations
            let size_gb = self.parse_size_to_gb(&storage_pool.size);
            let used_gb = self.parse_size_to_gb(&storage_pool.used);
            let avail_gb = self.parse_size_to_gb(&storage_pool.available);

            // Calculate status based on configured thresholds and pool health
            let usage_status = if storage_pool.usage_percent >= self.config.usage_critical_percent {
                Status::Critical
            } else if storage_pool.usage_percent >= self.config.usage_warning_percent {
                Status::Warning
            } else {
                Status::Ok
            };

            let pool_status = match storage_pool.pool_health {
                PoolHealth::Critical => Status::Critical,
                PoolHealth::Degraded => Status::Warning,
                PoolHealth::Rebuilding => Status::Warning,
                PoolHealth::Healthy => usage_status,
                PoolHealth::Unknown => Status::Unknown,
            };

            // Storage pool info metrics
            metrics.push(Metric {
                name: format!("disk_{}_mount_point", pool_name),
                value: MetricValue::String(storage_pool.mount_point.clone()),
                unit: None,
                description: Some(format!("Mount: {}", storage_pool.mount_point)),
                status: Status::Ok,
                timestamp,
            });

            metrics.push(Metric {
                name: format!("disk_{}_filesystem", pool_name),
                value: MetricValue::String(storage_pool.filesystem.clone()),
                unit: None,
                description: Some(format!("FS: {}", storage_pool.filesystem)),
                status: Status::Ok,
                timestamp,
            });

            // Enhanced pool type information
            let pool_type_str = match &storage_pool.pool_type {
                StoragePoolType::Single => "single".to_string(),
                StoragePoolType::PhysicalDrive { filesystems } => {
                    format!("drive ({})", filesystems.len())
                }
                StoragePoolType::MergerfsPool {
                    data_disks,
                    parity_disks,
                } => {
                    format!("mergerfs ({}+{})", data_disks.len(), parity_disks.len())
                }
                StoragePoolType::RaidArray {
                    level,
                    member_disks,
                    spare_disks,
                } => {
                    format!("{} ({}+{})", level, member_disks.len(), spare_disks.len())
                }
                StoragePoolType::ZfsPool { pool_name, .. } => {
} => { format!("zfs ({})", pool_name) } }; metrics.push(Metric { name: format!("disk_{}_pool_type", pool_name), value: MetricValue::String(pool_type_str.clone()), unit: None, description: Some(format!("Type: {}", pool_type_str)), status: Status::Ok, timestamp, }); // Pool health status let health_str = match storage_pool.pool_health { PoolHealth::Healthy => "healthy", PoolHealth::Degraded => "degraded", PoolHealth::Critical => "critical", PoolHealth::Rebuilding => "rebuilding", PoolHealth::Unknown => "unknown", }; metrics.push(Metric { name: format!("disk_{}_pool_health", pool_name), value: MetricValue::String(health_str.to_string()), unit: None, description: Some(format!("Health: {}", health_str)), status: pool_status, timestamp, }); // Storage pool size metrics metrics.push(Metric { name: format!("disk_{}_total_gb", pool_name), value: MetricValue::Float(size_gb), unit: Some("GB".to_string()), description: Some(format!("Total: {}", storage_pool.size)), status: Status::Ok, timestamp, }); metrics.push(Metric { name: format!("disk_{}_used_gb", pool_name), value: MetricValue::Float(used_gb), unit: Some("GB".to_string()), description: Some(format!("Used: {}", storage_pool.used)), status: pool_status, timestamp, }); metrics.push(Metric { name: format!("disk_{}_available_gb", pool_name), value: MetricValue::Float(avail_gb), unit: Some("GB".to_string()), description: Some(format!("Available: {}", storage_pool.available)), status: Status::Ok, timestamp, }); metrics.push(Metric { name: format!("disk_{}_usage_percent", pool_name), value: MetricValue::Float(storage_pool.usage_percent), unit: Some("%".to_string()), description: Some(format!("Usage: {:.1}%", storage_pool.usage_percent)), status: pool_status, timestamp, }); // Individual drive metrics for this storage pool for drive in &storage_pool.underlying_drives { // Drive health status metrics.push(Metric { name: format!("disk_{}_{}_health", pool_name, drive.device), value: MetricValue::String(drive.health_status.clone()), unit: None, description: Some(format!("{}: {}", drive.device, drive.health_status)), status: if drive.health_status == "PASSED" { Status::Ok } else if drive.health_status == "FAILED" { Status::Critical } else { Status::Unknown }, timestamp, }); // Drive temperature if let Some(temp) = drive.temperature { let temp_status = self.calculate_temperature_status( &format!("disk_{}_{}_temperature", pool_name, drive.device), temp, status_tracker ); metrics.push(Metric { name: format!("disk_{}_{}_temperature", pool_name, drive.device), value: MetricValue::Float(temp), unit: Some("°C".to_string()), description: Some(format!("{}: {:.0}°C", drive.device, temp)), status: temp_status, timestamp, }); } // Drive wear level (for SSDs) if let Some(wear) = drive.wear_level { let wear_status = if wear >= self.config.wear_critical_percent { Status::Critical } else if wear >= self.config.wear_warning_percent { Status::Warning } else { Status::Ok }; metrics.push(Metric { name: format!("disk_{}_{}_wear_percent", pool_name, drive.device), value: MetricValue::Float(wear), unit: Some("%".to_string()), description: Some(format!("{}: {:.0}% wear", drive.device, wear)), status: wear_status, timestamp, }); } } // Individual filesystem metrics for PhysicalDrive pools if let StoragePoolType::PhysicalDrive { filesystems } = &storage_pool.pool_type { for filesystem_mount in filesystems { if let Ok((total_bytes, used_bytes)) = self.get_filesystem_info(filesystem_mount) { let available_bytes = total_bytes - used_bytes; let usage_percent = if total_bytes > 0 { 
                            (used_bytes as f64 / total_bytes as f64) * 100.0
                        } else {
                            0.0
                        };

                        let filesystem_name = if filesystem_mount == "/" {
                            "root".to_string()
                        } else {
                            filesystem_mount.trim_start_matches('/').replace('/', "_")
                        };

                        // Calculate filesystem status based on usage
                        let fs_status = if usage_percent >= self.config.usage_critical_percent as f64 {
                            Status::Critical
                        } else if usage_percent >= self.config.usage_warning_percent as f64 {
                            Status::Warning
                        } else {
                            Status::Ok
                        };

                        // Filesystem usage metrics
                        metrics.push(Metric {
                            name: format!("disk_{}_fs_{}_usage_percent", pool_name, filesystem_name),
                            value: MetricValue::Float(usage_percent as f32),
                            unit: Some("%".to_string()),
                            description: Some(format!("{}: {:.0}%", filesystem_mount, usage_percent)),
                            status: fs_status.clone(),
                            timestamp,
                        });

                        metrics.push(Metric {
                            name: format!("disk_{}_fs_{}_used_gb", pool_name, filesystem_name),
                            value: MetricValue::Float(self.bytes_to_gb(used_bytes)),
                            unit: Some("GB".to_string()),
                            description: Some(format!(
                                "{}: {}GB used",
                                filesystem_mount,
                                self.bytes_to_human_readable(used_bytes)
                            )),
                            status: Status::Ok,
                            timestamp,
                        });

                        metrics.push(Metric {
                            name: format!("disk_{}_fs_{}_total_gb", pool_name, filesystem_name),
                            value: MetricValue::Float(self.bytes_to_gb(total_bytes)),
                            unit: Some("GB".to_string()),
                            description: Some(format!(
                                "{}: {}GB total",
                                filesystem_mount,
                                self.bytes_to_human_readable(total_bytes)
                            )),
                            status: Status::Ok,
                            timestamp,
                        });

                        metrics.push(Metric {
                            name: format!("disk_{}_fs_{}_available_gb", pool_name, filesystem_name),
                            value: MetricValue::Float(self.bytes_to_gb(available_bytes)),
                            unit: Some("GB".to_string()),
                            description: Some(format!(
                                "{}: {}GB available",
                                filesystem_mount,
                                self.bytes_to_human_readable(available_bytes)
                            )),
                            status: Status::Ok,
                            timestamp,
                        });

                        metrics.push(Metric {
                            name: format!("disk_{}_fs_{}_mount_point", pool_name, filesystem_name),
                            value: MetricValue::String(filesystem_mount.clone()),
                            unit: None,
                            description: Some(format!("Mount: {}", filesystem_mount)),
                            status: Status::Ok,
                            timestamp,
                        });
                    }
                }
            }
        }

        // Add storage pool count metric
        metrics.push(Metric {
            name: "disk_count".to_string(),
            value: MetricValue::Integer(storage_pools.len() as i64),
            unit: None,
            description: Some(format!("Total storage pools: {}", storage_pools.len())),
            status: Status::Ok,
            timestamp: chrono::Utc::now().timestamp() as u64,
        });

        let collection_time = start_time.elapsed();
        debug!(
            "Multi-disk collection completed in {:?} with {} metrics",
            collection_time,
            metrics.len()
        );

        Ok(metrics)
    }
}
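
// Minimal test sketch for the pure parsing helpers above: mergerfs source splitting,
// partition-to-base-device mapping, and the common-prefix heuristic used for parity
// detection. These are illustrative assumptions layered on the current signatures;
// nothing here shells out to smartctl, lsblk, or df.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn splits_colon_separated_mergerfs_sources() {
        // "/mnt/disk1:/mnt/disk2:/mnt/disk3" is the mergerfs source format parsed above
        let members = DiskCollector::parse_mergerfs_sources("/mnt/disk1:/mnt/disk2:/mnt/disk3");
        assert_eq!(members, vec!["/mnt/disk1", "/mnt/disk2", "/mnt/disk3"]);
    }

    #[test]
    fn extracts_base_device_from_partition_names() {
        // NVMe partitions drop the "pN" suffix, SATA partitions drop the trailing digits,
        // and unpartitioned names pass through unchanged
        assert_eq!(
            DiskCollector::extract_base_device("nvme0n1p2"),
            Some("nvme0n1".to_string())
        );
        assert_eq!(
            DiskCollector::extract_base_device("sda1"),
            Some("sda".to_string())
        );
        assert_eq!(
            DiskCollector::extract_base_device("sdb"),
            Some("sdb".to_string())
        );
    }

    #[test]
    fn finds_common_mount_prefix_for_parity_heuristic() {
        // The heuristic returns the shared parent directory (up to the last slash)
        let members = vec!["/mnt/disk1".to_string(), "/mnt/disk2".to_string()];
        assert_eq!(
            DiskCollector::extract_mount_pattern(&members),
            Some("/mnt/".to_string())
        );
    }
}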