use anyhow::Result;
use async_trait::async_trait;
use cm_dashboard_shared::{AgentData, DriveData, FilesystemData, PoolData, HysteresisThresholds, Status};
use crate::config::DiskConfig;
use tokio::process::Command as TokioCommand;
use std::process::Command as StdCommand;
use std::time::Instant;
use std::collections::HashMap;
use tracing::debug;

use super::{Collector, CollectorError};

/// Storage collector with clean architecture and structured data output
pub struct DiskCollector {
    config: DiskConfig,
    temperature_thresholds: HysteresisThresholds,
}

/// A physical drive with its filesystems
#[derive(Debug, Clone)]
struct PhysicalDrive {
    name: String,                 // e.g., "nvme0n1", "sda"
    health: String,               // SMART health status
    filesystems: Vec<Filesystem>, // Mounted filesystems on this drive
}

/// A filesystem mounted on a drive
#[derive(Debug, Clone)]
struct Filesystem {
    mount_point: String, // e.g., "/", "/boot"
    usage_percent: f32,  // Usage percentage
    used_bytes: u64,     // Used bytes
    total_bytes: u64,    // Total bytes
}

/// MergerFS pool
#[derive(Debug, Clone)]
struct MergerfsPool {
    name: String,                  // e.g., "srv_media"
    mount_point: String,           // e.g., "/srv/media"
    total_bytes: u64,              // Pool total bytes
    used_bytes: u64,               // Pool used bytes
    data_drives: Vec<PoolDrive>,   // Data drives in pool
    parity_drives: Vec<PoolDrive>, // Parity drives in pool
}

/// Drive in a storage pool
#[derive(Debug, Clone)]
struct PoolDrive {
    name: String,                     // Drive name
    mount_point: String,              // e.g., "/mnt/disk1"
    temperature_celsius: Option<f32>, // Drive temperature
}

impl DiskCollector {
    pub fn new(config: DiskConfig) -> Self {
        let temperature_thresholds = HysteresisThresholds::new(
            config.temperature_warning_celsius,
            config.temperature_critical_celsius,
        );
        Self {
            config,
            temperature_thresholds,
        }
    }

    /// Collect all storage data and populate AgentData
    async fn collect_storage_data(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
        let start_time = Instant::now();
        debug!("Starting clean storage collection");

        // Step 1: Get mount points and their backing devices
        let mount_devices = self.get_mount_devices().await?;

        // Step 2: Get filesystem usage for each mount point using df
        let mut filesystem_usage = self
            .get_filesystem_usage(&mount_devices)
            .map_err(|e| CollectorError::Parse {
                value: "filesystem usage".to_string(),
                error: format!("Failed to get filesystem usage: {}", e),
            })?;

        // Step 2.5: Add MergerFS mount points that weren't in lsblk output
        self.add_mergerfs_filesystem_usage(&mut filesystem_usage)
            .map_err(|e| CollectorError::Parse {
                value: "mergerfs filesystem usage".to_string(),
                error: format!("Failed to get mergerfs filesystem usage: {}", e),
            })?;

        // Step 3: Detect MergerFS pools
        let mergerfs_pools = self
            .detect_mergerfs_pools(&filesystem_usage)
            .map_err(|e| CollectorError::Parse {
                value: "mergerfs pools".to_string(),
                error: format!("Failed to detect mergerfs pools: {}", e),
            })?;

        // Step 4: Group filesystems by physical drive (excluding mergerfs members)
        let physical_drives = self
            .group_by_physical_drive(&mount_devices, &filesystem_usage, &mergerfs_pools)
            .map_err(|e| CollectorError::Parse {
                value: "physical drives".to_string(),
                error: format!("Failed to group by physical drive: {}", e),
            })?;

        // Step 5: Get SMART data for all drives
        let smart_data = self
            .get_smart_data_for_drives(&physical_drives, &mergerfs_pools)
            .await;

        // Step 6: Populate AgentData
        self.populate_drives_data(&physical_drives, &smart_data, agent_data)?;
        self.populate_pools_data(&mergerfs_pools, &smart_data, agent_data)?;

        let elapsed = start_time.elapsed();
        debug!("Storage collection completed in {:?}", elapsed);
        Ok(())
    }

    /// Get block devices and their mount points using lsblk
    async fn get_mount_devices(&self) -> Result<HashMap<String, String>, CollectorError> {
        use super::run_command_with_timeout;

        let mut cmd = TokioCommand::new("lsblk");
        cmd.args(&["-rn", "-o", "NAME,MOUNTPOINT"]);
        let output = run_command_with_timeout(cmd, 2)
            .await
            .map_err(|e| CollectorError::SystemRead {
                path: "block devices".to_string(),
                error: e.to_string(),
            })?;

        let mut mount_devices = HashMap::new();
        for line in String::from_utf8_lossy(&output.stdout).lines() {
            let parts: Vec<&str> = line.split_whitespace().collect();
            if parts.len() >= 2 {
                let device_name = parts[0];
                let mount_point = parts[1];

                // Skip swap partitions and unmounted devices
                if mount_point == "[SWAP]" || mount_point.is_empty() {
                    continue;
                }

                // Convert device name to full path
                let device_path = format!("/dev/{}", device_name);
                mount_devices.insert(mount_point.to_string(), device_path);
            }
        }

        debug!("Found {} mounted block devices", mount_devices.len());
        Ok(mount_devices)
    }

    /// Use df to get filesystem usage for mount points
    fn get_filesystem_usage(
        &self,
        mount_devices: &HashMap<String, String>,
    ) -> anyhow::Result<HashMap<String, (u64, u64)>> {
        let mut filesystem_usage = HashMap::new();
        for mount_point in mount_devices.keys() {
            match self.get_filesystem_info(mount_point) {
                Ok((total, used)) => {
                    filesystem_usage.insert(mount_point.clone(), (total, used));
                }
                Err(e) => {
                    debug!("Failed to get filesystem info for {}: {}", mount_point, e);
                }
            }
        }
        Ok(filesystem_usage)
    }

    /// Add filesystem usage for MergerFS mount points that aren't in lsblk
    fn add_mergerfs_filesystem_usage(
        &self,
        filesystem_usage: &mut HashMap<String, (u64, u64)>,
    ) -> anyhow::Result<()> {
        let mounts_content = std::fs::read_to_string("/proc/mounts")
            .map_err(|e| anyhow::anyhow!("Failed to read /proc/mounts: {}", e))?;

        for line in mounts_content.lines() {
            let parts: Vec<&str> = line.split_whitespace().collect();
            if parts.len() >= 3 && parts[2] == "fuse.mergerfs" {
                let mount_point = parts[1].to_string();

                // Only add if we don't already have usage data for this mount point
                if !filesystem_usage.contains_key(&mount_point) {
                    if let Ok((total, used)) = self.get_filesystem_info(&mount_point) {
                        debug!(
                            "Added MergerFS filesystem usage for {}: {}GB total, {}GB used",
                            mount_point,
                            total as f32 / (1024.0 * 1024.0 * 1024.0),
                            used as f32 / (1024.0 * 1024.0 * 1024.0)
                        );
                        filesystem_usage.insert(mount_point, (total, used));
                    }
                }
            }
        }
        Ok(())
    }

    /// Get filesystem info for a single mount point
    fn get_filesystem_info(&self, mount_point: &str) -> Result<(u64, u64), CollectorError> {
        let output = StdCommand::new("timeout")
            .args(&["2", "df", "--block-size=1", mount_point])
            .output()
            .map_err(|e| CollectorError::SystemRead {
                path: format!("df {}", mount_point),
                error: e.to_string(),
            })?;

        let output_str = String::from_utf8_lossy(&output.stdout);
        let lines: Vec<&str> = output_str.lines().collect();
        if lines.len() < 2 {
            return Err(CollectorError::Parse {
                value: output_str.to_string(),
                error: "Expected at least 2 lines from df output".to_string(),
            });
        }

        // Parse the data line (skip header)
        let parts: Vec<&str> = lines[1].split_whitespace().collect();
        if parts.len() < 4 {
            return Err(CollectorError::Parse {
                value: lines[1].to_string(),
                error: "Expected at least 4 fields in df output".to_string(),
            });
        }

        let total_bytes: u64 = parts[1].parse().map_err(|e| CollectorError::Parse {
            value: parts[1].to_string(),
            error: format!("Failed to parse total bytes: {}", e),
        })?;
        let used_bytes: u64 = parts[2].parse().map_err(|e| CollectorError::Parse {
            value: parts[2].to_string(),
            error: format!("Failed to parse used bytes: {}", e),
        })?;

        Ok((total_bytes, used_bytes))
    }

    /// Detect MergerFS pools from mount data
    fn detect_mergerfs_pools(
        &self,
        filesystem_usage: &HashMap<String, (u64, u64)>,
    ) -> anyhow::Result<Vec<MergerfsPool>> {
        let mounts_content = std::fs::read_to_string("/proc/mounts")
            .map_err(|e| anyhow::anyhow!("Failed to read /proc/mounts: {}", e))?;

        let mut pools = Vec::new();
        for line in mounts_content.lines() {
            let parts: Vec<&str> = line.split_whitespace().collect();
            if parts.len() >= 3 && parts[2] == "fuse.mergerfs" {
                let mount_point = parts[1].to_string();
                let device_sources = parts[0]; // e.g., "/mnt/disk1:/mnt/disk2"

                // Get pool usage
                let (total_bytes, used_bytes) = filesystem_usage
                    .get(&mount_point)
                    .copied()
                    .unwrap_or((0, 0));

                // Extract pool name from mount point (e.g., "/srv/media" -> "srv_media")
                let pool_name = if mount_point == "/" {
                    "root".to_string()
                } else {
                    mount_point.trim_start_matches('/').replace('/', "_")
                };
                if pool_name.is_empty() {
                    debug!("Skipping mergerfs pool with empty name: {}", mount_point);
                    continue;
                }

                // Parse member paths - handle both full paths and numeric references
                let raw_paths: Vec<String> = device_sources
                    .split(':')
                    .map(|s| s.trim().to_string())
                    .filter(|s| !s.is_empty())
                    .collect();

                // Convert numeric references to actual mount points if needed
                let member_paths = if raw_paths.iter().any(|path| !path.starts_with('/')) {
                    // Handle numeric format like "1:2" by finding corresponding /mnt/disk* paths
                    self.resolve_numeric_mergerfs_paths(&raw_paths)?
                } else {
                    // Already full paths
                    raw_paths
                };

                // For SnapRAID setups, include parity drives that are related to this pool's data drives
                let mut all_member_paths = member_paths.clone();
                let related_parity_paths = self.discover_related_parity_drives(&member_paths)?;
                all_member_paths.extend(related_parity_paths);

                // Categorize as data vs parity drives
                let (data_drives, parity_drives) =
                    match self.categorize_pool_drives(&all_member_paths) {
                        Ok(drives) => drives,
                        Err(e) => {
                            debug!(
                                "Failed to categorize drives for pool {}: {}. Skipping.",
                                mount_point, e
                            );
                            continue;
                        }
                    };

                pools.push(MergerfsPool {
                    name: pool_name,
                    mount_point,
                    total_bytes,
                    used_bytes,
                    data_drives,
                    parity_drives,
                });
            }
        }

        debug!("Found {} mergerfs pools", pools.len());
        Ok(pools)
    }

    /// Group filesystems by physical drive (excluding mergerfs members) - exact old logic
    fn group_by_physical_drive(
        &self,
        mount_devices: &HashMap<String, String>,
        filesystem_usage: &HashMap<String, (u64, u64)>,
        mergerfs_pools: &[MergerfsPool],
    ) -> anyhow::Result<Vec<PhysicalDrive>> {
        let mut drive_groups: HashMap<String, Vec<Filesystem>> = HashMap::new();

        // Get all mergerfs member paths to exclude them - exactly like old code
        let mut mergerfs_members = std::collections::HashSet::new();
        for pool in mergerfs_pools {
            for drive in &pool.data_drives {
                mergerfs_members.insert(drive.mount_point.clone());
            }
            for drive in &pool.parity_drives {
                mergerfs_members.insert(drive.mount_point.clone());
            }
        }

        // Group filesystems by base device
        for (mount_point, device) in mount_devices {
            // Skip mergerfs member mounts
            if mergerfs_members.contains(mount_point) {
                continue;
            }

            let base_device = self.extract_base_device(device);
            if let Some((total, used)) = filesystem_usage.get(mount_point) {
                let usage_percent = (*used as f32 / *total as f32) * 100.0;
                let filesystem = Filesystem {
                    mount_point: mount_point.clone(), // Keep actual mount point like "/" and "/boot"
                    usage_percent,
                    used_bytes: *used,
                    total_bytes: *total,
                };
                drive_groups
                    .entry(base_device)
                    .or_insert_with(Vec::new)
                    .push(filesystem);
            }
        }

        // Convert to PhysicalDrive structs
        let mut physical_drives = Vec::new();
        for (drive_name, filesystems) in drive_groups {
            let physical_drive = PhysicalDrive {
                name: drive_name,
                health: "UNKNOWN".to_string(), // Will be updated with SMART data
                filesystems,
            };
            physical_drives.push(physical_drive);
        }

        physical_drives.sort_by(|a, b| a.name.cmp(&b.name));
        Ok(physical_drives)
    }

    /// Extract base device name from device path
    fn extract_base_device(&self, device: &str) -> String {
        // Extract base device name (e.g., "/dev/nvme0n1p1" -> "nvme0n1")
        if let Some(dev_name) = device.strip_prefix("/dev/") {
            // Remove partition numbers: nvme0n1p1 -> nvme0n1, sda1 -> sda
            if let Some(pos) = dev_name.find('p') {
                if dev_name[pos + 1..].chars().all(char::is_numeric) {
                    return dev_name[..pos].to_string();
                }
            }
            // Handle traditional naming: sda1 -> sda
            let mut result = String::new();
            for ch in dev_name.chars() {
                if ch.is_ascii_digit() {
                    break;
                }
                result.push(ch);
            }
            if !result.is_empty() {
                return result;
            }
        }
        device.to_string()
    }

    /// Get SMART data for drives in parallel
    async fn get_smart_data_for_drives(
        &self,
        physical_drives: &[PhysicalDrive],
        mergerfs_pools: &[MergerfsPool],
    ) -> HashMap<String, SmartData> {
        use futures::future::join_all;

        // Collect all drive names
        let mut all_drives = std::collections::HashSet::new();
        for drive in physical_drives {
            all_drives.insert(drive.name.clone());
        }
        for pool in mergerfs_pools {
            for drive in &pool.data_drives {
                all_drives.insert(drive.name.clone());
            }
            for drive in &pool.parity_drives {
                all_drives.insert(drive.name.clone());
            }
        }

        // Collect SMART data for all drives in parallel
        let futures: Vec<_> = all_drives
            .iter()
            .map(|drive_name| {
                let drive = drive_name.clone();
                async move {
                    let result = self.get_smart_data(&drive).await;
                    (drive, result)
                }
            })
            .collect();
        let results = join_all(futures).await;

        // Build HashMap from results
        let mut smart_data = HashMap::new();
        for (drive_name, result) in results {
            if let Ok(data) = result {
                smart_data.insert(drive_name, data);
            }
        }
        smart_data
    }

    /// Get SMART data for a single drive
    async fn get_smart_data(&self, drive_name: &str) -> Result<SmartData, CollectorError> {
        use super::run_command_with_timeout;

        // Use direct smartctl (no sudo) - service has CAP_SYS_RAWIO and CAP_SYS_ADMIN capabilities
        // For NVMe drives, specify device type explicitly
        let mut cmd = TokioCommand::new("smartctl");
        if drive_name.starts_with("nvme") {
            cmd.args(&["-d", "nvme", "-a", &format!("/dev/{}", drive_name)]);
        } else {
            cmd.args(&["-a", &format!("/dev/{}", drive_name)]);
        }
        let output = run_command_with_timeout(cmd, 3)
            .await
            .map_err(|e| CollectorError::SystemRead {
                path: format!("SMART data for {}", drive_name),
                error: e.to_string(),
            })?;

        let output_str = String::from_utf8_lossy(&output.stdout);
        if !output.status.success() {
            // Return unknown data rather than failing completely
            return Ok(SmartData {
                health: "UNKNOWN".to_string(),
                serial_number: None,
                temperature_celsius: None,
                wear_percent: None,
            });
        }

        let mut health = "UNKNOWN".to_string();
        let mut serial_number = None;
        let mut temperature = None;
        let mut wear_percent = None;

        for line in output_str.lines() {
            if line.contains("SMART overall-health") {
                if line.contains("PASSED") {
                    health = "PASSED".to_string();
                } else if line.contains("FAILED") {
                    health = "FAILED".to_string();
                }
            }

            // Serial number parsing (both SATA and NVMe)
            if line.contains("Serial Number:") {
                if let Some(serial_part) = line.split("Serial Number:").nth(1) {
                    let serial_str = serial_part.trim();
                    if !serial_str.is_empty() {
                        // Take first whitespace-separated token
                        if let Some(serial) = serial_str.split_whitespace().next() {
                            serial_number = Some(serial.to_string());
                        }
                    }
                }
            }

            // Temperature parsing for different drive types
            if line.contains("Temperature_Celsius")
                || line.contains("Airflow_Temperature_Cel")
                || line.contains("Temperature_Case")
            {
                // Traditional SATA drives: attribute table format
                if let Some(temp_str) = line.split_whitespace().nth(9) {
                    if let Ok(temp) = temp_str.parse::<f32>() {
                        temperature = Some(temp);
                    }
                }
            } else if line.starts_with("Temperature:") {
                // NVMe drives: simple "Temperature: 27 Celsius" format
                let parts: Vec<&str> = line.split_whitespace().collect();
                if parts.len() >= 2 {
                    if let Ok(temp) = parts[1].parse::<f32>() {
                        temperature = Some(temp);
                    }
                }
            }

            // Wear level parsing for SSDs
            if line.contains("Media_Wearout_Indicator") {
                // Media_Wearout_Indicator stores remaining life % in column 3 (VALUE)
                if let Some(wear_str) = line.split_whitespace().nth(3) {
                    if let Ok(remaining) = wear_str.parse::<f32>() {
                        wear_percent = Some(100.0 - remaining); // Convert remaining life to wear
                    }
                }
            } else if line.contains("Wear_Leveling_Count") || line.contains("SSD_Life_Left") {
                // Other wear attributes store value in column 9 (RAW_VALUE)
                if let Some(wear_str) = line.split_whitespace().nth(9) {
                    if let Ok(wear) = wear_str.parse::<f32>() {
                        wear_percent = Some(100.0 - wear); // Convert remaining life to wear
                    }
                }
            }

            // NVMe wear parsing: "Percentage Used: 1%"
            if line.contains("Percentage Used:") {
                if let Some(percent_part) = line.split("Percentage Used:").nth(1) {
                    if let Some(percent_str) = percent_part.split_whitespace().next() {
                        if let Some(percent_clean) = percent_str.strip_suffix('%') {
                            if let Ok(wear) = percent_clean.parse::<f32>() {
                                wear_percent = Some(wear);
                            }
                        }
                    }
                }
            }
        }

        Ok(SmartData {
            health,
            serial_number,
            temperature_celsius: temperature,
            wear_percent,
        })
    }

    /// Populate drives data into AgentData
    fn populate_drives_data(
        &self,
        physical_drives: &[PhysicalDrive],
        smart_data: &HashMap<String, SmartData>,
        agent_data: &mut AgentData,
    ) -> Result<(), CollectorError> {
        for drive in physical_drives {
            let smart = smart_data.get(&drive.name);

            let mut filesystems: Vec<FilesystemData> = drive
                .filesystems
                .iter()
                .map(|fs| FilesystemData {
                    mount: fs.mount_point.clone(), // This preserves "/" and "/boot" correctly
                    usage_percent: fs.usage_percent,
                    used_gb: fs.used_bytes as f32 / (1024.0 * 1024.0 * 1024.0),
                    total_gb: fs.total_bytes as f32 / (1024.0 * 1024.0 * 1024.0),
                    usage_status: self.calculate_filesystem_usage_status(fs.usage_percent),
                })
                .collect();

            // Sort filesystems by mount point for consistent display order
            filesystems.sort_by(|a, b| a.mount.cmp(&b.mount));

            agent_data.system.storage.drives.push(DriveData {
                name: drive.name.clone(),
                serial_number: smart.and_then(|s| s.serial_number.clone()),
                health: smart
                    .map(|s| s.health.clone())
                    .unwrap_or_else(|| drive.health.clone()),
                temperature_celsius: smart.and_then(|s| s.temperature_celsius),
                wear_percent: smart.and_then(|s| s.wear_percent),
                filesystems,
                temperature_status: smart
                    .and_then(|s| s.temperature_celsius)
                    .map(|temp| self.calculate_temperature_status(temp))
                    .unwrap_or(Status::Unknown),
                health_status: self.calculate_health_status(
                    smart.map(|s| s.health.as_str()).unwrap_or("UNKNOWN"),
                ),
            });
        }
        Ok(())
    }

    /// Populate pools data into AgentData
    fn populate_pools_data(
        &self,
        mergerfs_pools: &[MergerfsPool],
        smart_data: &HashMap<String, SmartData>,
        agent_data: &mut AgentData,
    ) -> Result<(), CollectorError> {
        for pool in mergerfs_pools {
            // Calculate pool health and statuses based on member drive health
            let (pool_health, health_status, usage_status, data_drive_data, parity_drive_data) =
                self.calculate_pool_health(pool, smart_data);

            let pool_data = PoolData {
                name: pool.name.clone(),
                mount: pool.mount_point.clone(),
                pool_type: format!(
                    "mergerfs ({}+{})",
                    pool.data_drives.len(),
                    pool.parity_drives.len()
                ),
                health: pool_health,
                usage_percent: if pool.total_bytes > 0 {
                    (pool.used_bytes as f32 / pool.total_bytes as f32) * 100.0
                } else {
                    0.0
                },
                used_gb: pool.used_bytes as f32 / (1024.0 * 1024.0 * 1024.0),
                total_gb: pool.total_bytes as f32 / (1024.0 * 1024.0 * 1024.0),
                data_drives: data_drive_data,
                parity_drives: parity_drive_data,
                health_status,
                usage_status,
            };
            agent_data.system.storage.pools.push(pool_data);
        }
        Ok(())
    }

    /// Calculate pool health based on member drive status
    fn calculate_pool_health(
        &self,
        pool: &MergerfsPool,
        smart_data: &HashMap<String, SmartData>,
    ) -> (
        String,
        cm_dashboard_shared::Status,
        cm_dashboard_shared::Status,
        Vec<cm_dashboard_shared::PoolDriveData>,
        Vec<cm_dashboard_shared::PoolDriveData>,
    ) {
        let mut failed_data = 0;
        let mut failed_parity = 0;

        // Process data drives
        let data_drive_data: Vec<cm_dashboard_shared::PoolDriveData> = pool
            .data_drives
            .iter()
            .map(|d| {
                let smart = smart_data.get(&d.name);
                let health = smart
                    .map(|s| s.health.clone())
                    .unwrap_or_else(|| "UNKNOWN".to_string());
                let temperature = smart
                    .and_then(|s| s.temperature_celsius)
                    .or(d.temperature_celsius);
                if health == "FAILED" {
                    failed_data += 1;
                }

                // Calculate drive statuses using config thresholds
                let health_status = self.calculate_health_status(&health);
                let temperature_status = temperature
                    .map(|t| self.temperature_thresholds.evaluate(t))
                    .unwrap_or(cm_dashboard_shared::Status::Unknown);

                cm_dashboard_shared::PoolDriveData {
                    name: d.name.clone(),
                    serial_number: smart.and_then(|s| s.serial_number.clone()),
                    temperature_celsius: temperature,
                    health,
                    wear_percent: smart.and_then(|s| s.wear_percent),
                    health_status,
                    temperature_status,
                }
            })
            .collect();

        // Process parity drives
        let parity_drive_data: Vec<cm_dashboard_shared::PoolDriveData> = pool
            .parity_drives
            .iter()
            .map(|d| {
                let smart = smart_data.get(&d.name);
                let health = smart
                    .map(|s| s.health.clone())
                    .unwrap_or_else(|| "UNKNOWN".to_string());
                let temperature = smart
                    .and_then(|s| s.temperature_celsius)
                    .or(d.temperature_celsius);
                if health == "FAILED" {
                    failed_parity += 1;
                }

                // Calculate drive statuses using config thresholds
                let health_status = self.calculate_health_status(&health);
                let temperature_status = temperature
                    .map(|t| self.temperature_thresholds.evaluate(t))
                    .unwrap_or(cm_dashboard_shared::Status::Unknown);

                cm_dashboard_shared::PoolDriveData {
                    name: d.name.clone(),
                    serial_number: smart.and_then(|s| s.serial_number.clone()),
                    temperature_celsius: temperature,
                    health,
                    wear_percent: smart.and_then(|s| s.wear_percent),
                    health_status,
                    temperature_status,
                }
            })
            .collect();

        // Calculate overall pool health string and status
        // SnapRAID logic: can tolerate up to N parity drive failures (where N = number of parity drives)
        // If data drives fail AND we've lost parity protection, that's critical
        let (pool_health, health_status) = if failed_data == 0 && failed_parity == 0 {
            ("healthy".to_string(), cm_dashboard_shared::Status::Ok)
        } else if failed_data == 0 && failed_parity > 0 {
            // Parity failed but no data loss - degraded (reduced protection)
            ("degraded".to_string(), cm_dashboard_shared::Status::Warning)
        } else if failed_data == 1 && failed_parity == 0 {
            // One data drive failed, parity intact - degraded (recoverable)
            ("degraded".to_string(), cm_dashboard_shared::Status::Warning)
        } else {
            // Multiple data drives failed OR data+parity failed = data loss risk
            ("critical".to_string(), cm_dashboard_shared::Status::Critical)
        };

        // Calculate pool usage status using config thresholds
        let usage_percent = if pool.total_bytes > 0 {
            (pool.used_bytes as f32 / pool.total_bytes as f32) * 100.0
        } else {
            0.0
        };
        let usage_status = if usage_percent >= self.config.usage_critical_percent {
            cm_dashboard_shared::Status::Critical
        } else if usage_percent >= self.config.usage_warning_percent {
            cm_dashboard_shared::Status::Warning
        } else {
            cm_dashboard_shared::Status::Ok
        };

        (pool_health, health_status, usage_status, data_drive_data, parity_drive_data)
    }

    /// Calculate filesystem usage status
    fn calculate_filesystem_usage_status(&self, usage_percent: f32) -> Status {
        // Use standard filesystem warning/critical thresholds
        if usage_percent >= 95.0 {
            Status::Critical
        } else if usage_percent >= 85.0 {
            Status::Warning
        } else {
            Status::Ok
        }
    }

    /// Calculate drive temperature status
    fn calculate_temperature_status(&self, temperature: f32) -> Status {
        self.temperature_thresholds.evaluate(temperature)
    }

    /// Calculate drive health status
    fn calculate_health_status(&self, health: &str) -> Status {
        match health {
            "PASSED" => Status::Ok,
            "FAILED" => Status::Critical,
            _ => Status::Unknown,
        }
    }

    /// Discover parity drives that are related to the given data drives
    fn discover_related_parity_drives(&self, data_drives: &[String]) -> anyhow::Result<Vec<String>> {
        let mount_devices = tokio::task::block_in_place(|| {
            tokio::runtime::Handle::current().block_on(self.get_mount_devices())
        })
        .map_err(|e| anyhow::anyhow!("Failed to get mount devices: {}", e))?;

        let mut related_parity = Vec::new();

        // Find parity drives that share the same parent directory as the data drives
        for data_path in data_drives {
            if let Some(parent_dir) = self.get_parent_directory(data_path) {
                // Look for parity drives in the same parent directory
                for (mount_point, _device) in &mount_devices {
                    if mount_point.contains("parity") && mount_point.starts_with(&parent_dir) {
                        if !related_parity.contains(mount_point) {
                            related_parity.push(mount_point.clone());
                        }
                    }
                }
            }
        }

        Ok(related_parity)
    }

    /// Get parent directory of a mount path (e.g., "/mnt/disk1" -> "/mnt")
    fn get_parent_directory(&self, path: &str) -> Option<String> {
        if let Some(last_slash) = path.rfind('/') {
            if last_slash > 0 {
                return Some(path[..last_slash].to_string());
            }
        }
        None
    }

    /// Categorize pool member drives as data vs parity
    fn categorize_pool_drives(
        &self,
        member_paths: &[String],
    ) -> anyhow::Result<(Vec<PoolDrive>, Vec<PoolDrive>)> {
        let mut data_drives = Vec::new();
        let mut parity_drives = Vec::new();

        for path in member_paths {
            let drive_info = self.get_drive_info_for_path(path)?;
            // Heuristic: if path contains "parity", it's parity
            if path.to_lowercase().contains("parity") {
                parity_drives.push(drive_info);
            } else {
                data_drives.push(drive_info);
            }
        }

        Ok((data_drives, parity_drives))
    }

    /// Get drive information for a mount path
    fn get_drive_info_for_path(&self, path: &str) -> anyhow::Result<PoolDrive> {
        // Use lsblk to find the backing device with timeout
        let output = StdCommand::new("timeout")
            .args(&["2", "lsblk", "-rn", "-o", "NAME,MOUNTPOINT"])
            .output()
            .map_err(|e| anyhow::anyhow!("Failed to run lsblk: {}", e))?;

        let output_str = String::from_utf8_lossy(&output.stdout);
        let mut device = String::new();
        for line in output_str.lines() {
            let parts: Vec<&str> = line.split_whitespace().collect();
            if parts.len() >= 2 && parts[1] == path {
                device = parts[0].to_string();
                break;
            }
        }

        if device.is_empty() {
            return Err(anyhow::anyhow!("Could not find device for path {}", path));
        }

        // Extract base device name (e.g., "sda1" -> "sda")
        let base_device = self.extract_base_device(&format!("/dev/{}", device));

        // Temperature will be filled in later from parallel SMART collection
        // Don't collect it here to avoid sequential blocking with problematic async nesting
        Ok(PoolDrive {
            name: base_device,
            mount_point: path.to_string(),
            temperature_celsius: None,
        })
    }

    /// Resolve numeric mergerfs references like "1:2" to actual mount paths
    fn resolve_numeric_mergerfs_paths(&self, numeric_refs: &[String]) -> anyhow::Result<Vec<String>> {
        let mut resolved_paths = Vec::new();

        // Get all mount points that look like /mnt/disk* or /mnt/parity*
        let mount_devices = tokio::task::block_in_place(|| {
            tokio::runtime::Handle::current().block_on(self.get_mount_devices())
        })
        .map_err(|e| anyhow::anyhow!("Failed to get mount devices: {}", e))?;

        let mut disk_mounts: Vec<String> = mount_devices
            .keys()
            .filter(|path| path.starts_with("/mnt/disk") || path.starts_with("/mnt/parity"))
            .cloned()
            .collect();
        disk_mounts.sort(); // Ensure consistent ordering

        for num_ref in numeric_refs {
            if let Ok(index) = num_ref.parse::<usize>() {
                // Convert 1-based index to 0-based
                if index > 0 && index <= disk_mounts.len() {
                    resolved_paths.push(disk_mounts[index - 1].clone());
                }
            }
        }

        // Fallback: if we couldn't resolve, return the original paths
        if resolved_paths.is_empty() {
            resolved_paths = numeric_refs.to_vec();
        }

        Ok(resolved_paths)
    }
}

#[async_trait]
impl Collector for DiskCollector {
    async fn collect_structured(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
        self.collect_storage_data(agent_data).await
    }
}

/// SMART data for a drive
#[derive(Debug, Clone)]
struct SmartData {
    health: String,
    serial_number: Option<String>,
    temperature_celsius: Option<f32>,
    wear_percent: Option<f32>,
}
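
// Illustrative tests (not part of the original module): a minimal sketch of
// unit tests for the pure path-parsing helpers above. It assumes `DiskConfig`
// implements `Default`; if it does not, construct the config explicitly with
// the real fields (temperature and usage thresholds).
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn extracts_base_device_from_partition_paths() {
        // Assumption: DiskConfig::default() is available for test construction.
        let collector = DiskCollector::new(DiskConfig::default());
        // NVMe partitions drop the trailing "p<N>" suffix
        assert_eq!(collector.extract_base_device("/dev/nvme0n1p1"), "nvme0n1");
        // SATA partitions drop the trailing digits
        assert_eq!(collector.extract_base_device("/dev/sda1"), "sda");
        // Paths without a /dev/ prefix are returned unchanged
        assert_eq!(collector.extract_base_device("tmpfs"), "tmpfs");
    }

    #[test]
    fn parent_directory_of_mount_path() {
        let collector = DiskCollector::new(DiskConfig::default());
        assert_eq!(
            collector.get_parent_directory("/mnt/disk1"),
            Some("/mnt".to_string())
        );
        // Top-level paths have no usable parent directory
        assert_eq!(collector.get_parent_directory("/mnt"), None);
    }
}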