Christoffer Martinsson a7b69b8ae7
All checks were successful
Build and Release / build-and-release (push) Successful in 1m19s
Fix duplicate data by clearing vectors before collection
Collectors now clear their target vectors (tmpfs, drives, pools, services)
before populating to prevent duplicates when updating cached AgentData.

- Clear tmpfs list in memory collector
- Clear drives and pools in disk collector
- Clear services in systemd collector
- Bump version to v0.1.231
2025-12-01 13:21:26 +01:00

853 lines
35 KiB
Rust

use anyhow::Result;
use async_trait::async_trait;
use cm_dashboard_shared::{AgentData, DriveData, FilesystemData, PoolData, HysteresisThresholds, Status};
use crate::config::DiskConfig;
use tokio::process::Command as TokioCommand;
use std::process::Command as StdCommand;
use std::collections::HashMap;
use super::{Collector, CollectorError};
/// Storage collector with clean architecture and structured data output
pub struct DiskCollector {
config: DiskConfig,
temperature_thresholds: HysteresisThresholds,
}
/// A physical drive with its filesystems
#[derive(Debug, Clone)]
struct PhysicalDrive {
name: String, // e.g., "nvme0n1", "sda"
health: String, // SMART health status
filesystems: Vec<Filesystem>, // mounted filesystems on this drive
}
/// A filesystem mounted on a drive
#[derive(Debug, Clone)]
struct Filesystem {
mount_point: String, // e.g., "/", "/boot"
usage_percent: f32, // Usage percentage
used_bytes: u64, // Used bytes
total_bytes: u64, // Total bytes
}
/// MergerFS pool
#[derive(Debug, Clone)]
struct MergerfsPool {
name: String, // e.g., "srv_media"
mount_point: String, // e.g., "/srv/media"
total_bytes: u64, // Pool total bytes
used_bytes: u64, // Pool used bytes
data_drives: Vec<PoolDrive>, // Data drives in pool
parity_drives: Vec<PoolDrive>, // Parity drives in pool
}
/// Drive in a storage pool
#[derive(Debug, Clone)]
struct PoolDrive {
name: String, // Drive name
mount_point: String, // e.g., "/mnt/disk1"
temperature_celsius: Option<f32>, // Drive temperature
}
impl DiskCollector {
pub fn new(config: DiskConfig) -> Self {
let temperature_thresholds = HysteresisThresholds::new(
config.temperature_warning_celsius,
config.temperature_critical_celsius,
);
Self {
config,
temperature_thresholds,
}
}
/// Collect all storage data and populate AgentData
async fn collect_storage_data(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
// Clear drives and pools to prevent duplicates when updating cached data
agent_data.system.storage.drives.clear();
agent_data.system.storage.pools.clear();
// Step 1: Get mount points and their backing devices
let mount_devices = self.get_mount_devices().await?;
// Step 2: Get filesystem usage for each mount point using df
let mut filesystem_usage = self.get_filesystem_usage(&mount_devices).map_err(|e| CollectorError::Parse {
value: "filesystem usage".to_string(),
error: format!("Failed to get filesystem usage: {}", e),
})?;
// Step 2.5: Add MergerFS mount points that weren't in lsblk output
self.add_mergerfs_filesystem_usage(&mut filesystem_usage).map_err(|e| CollectorError::Parse {
value: "mergerfs filesystem usage".to_string(),
error: format!("Failed to get mergerfs filesystem usage: {}", e),
})?;
// Step 3: Detect MergerFS pools
let mergerfs_pools = self.detect_mergerfs_pools(&filesystem_usage).map_err(|e| CollectorError::Parse {
value: "mergerfs pools".to_string(),
error: format!("Failed to detect mergerfs pools: {}", e),
})?;
// Step 4: Group filesystems by physical drive (excluding mergerfs members)
let physical_drives = self.group_by_physical_drive(&mount_devices, &filesystem_usage, &mergerfs_pools).map_err(|e| CollectorError::Parse {
value: "physical drives".to_string(),
error: format!("Failed to group by physical drive: {}", e),
})?;
// Step 5: Get SMART data for all drives
let smart_data = self.get_smart_data_for_drives(&physical_drives, &mergerfs_pools).await;
// Step 6: Populate AgentData
self.populate_drives_data(&physical_drives, &smart_data, agent_data)?;
self.populate_pools_data(&mergerfs_pools, &smart_data, agent_data)?;
Ok(())
}
/// Get block devices and their mount points using lsblk
async fn get_mount_devices(&self) -> Result<HashMap<String, String>, CollectorError> {
use super::run_command_with_timeout;
let mut cmd = TokioCommand::new("lsblk");
cmd.args(&["-rn", "-o", "NAME,MOUNTPOINT"]);
let output = run_command_with_timeout(cmd, 2).await
.map_err(|e| CollectorError::SystemRead {
path: "block devices".to_string(),
error: e.to_string(),
})?;
let mut mount_devices = HashMap::new();
for line in String::from_utf8_lossy(&output.stdout).lines() {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() >= 2 {
let device_name = parts[0];
let mount_point = parts[1];
// Skip swap partitions and unmounted devices
if mount_point == "[SWAP]" || mount_point.is_empty() {
continue;
}
// Convert device name to full path
let device_path = format!("/dev/{}", device_name);
mount_devices.insert(mount_point.to_string(), device_path);
}
}
Ok(mount_devices)
}
/// Use df to get filesystem usage for mount points
fn get_filesystem_usage(&self, mount_devices: &HashMap<String, String>) -> anyhow::Result<HashMap<String, (u64, u64)>> {
let mut filesystem_usage = HashMap::new();
for mount_point in mount_devices.keys() {
match self.get_filesystem_info(mount_point) {
Ok((total, used)) => {
filesystem_usage.insert(mount_point.clone(), (total, used));
}
Err(_e) => {
// Silently skip filesystems we can't read
}
}
}
Ok(filesystem_usage)
}
/// Add filesystem usage for MergerFS mount points that aren't in lsblk
fn add_mergerfs_filesystem_usage(&self, filesystem_usage: &mut HashMap<String, (u64, u64)>) -> anyhow::Result<()> {
let mounts_content = std::fs::read_to_string("/proc/mounts")
.map_err(|e| anyhow::anyhow!("Failed to read /proc/mounts: {}", e))?;
for line in mounts_content.lines() {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() >= 3 && parts[2] == "fuse.mergerfs" {
let mount_point = parts[1].to_string();
// Only add if we don't already have usage data for this mount point
if !filesystem_usage.contains_key(&mount_point) {
if let Ok((total, used)) = self.get_filesystem_info(&mount_point) {
filesystem_usage.insert(mount_point, (total, used));
}
}
}
}
Ok(())
}
/// Get filesystem info for a single mount point
fn get_filesystem_info(&self, mount_point: &str) -> Result<(u64, u64), CollectorError> {
let output = StdCommand::new("timeout")
.args(&["2", "df", "--block-size=1", mount_point])
.output()
.map_err(|e| CollectorError::SystemRead {
path: format!("df {}", mount_point),
error: e.to_string(),
})?;
let output_str = String::from_utf8_lossy(&output.stdout);
let lines: Vec<&str> = output_str.lines().collect();
if lines.len() < 2 {
return Err(CollectorError::Parse {
value: output_str.to_string(),
error: "Expected at least 2 lines from df output".to_string(),
});
}
// Parse the data line (skip header)
let parts: Vec<&str> = lines[1].split_whitespace().collect();
if parts.len() < 4 {
return Err(CollectorError::Parse {
value: lines[1].to_string(),
error: "Expected at least 4 fields in df output".to_string(),
});
}
let total_bytes: u64 = parts[1].parse().map_err(|e| CollectorError::Parse {
value: parts[1].to_string(),
error: format!("Failed to parse total bytes: {}", e),
})?;
let used_bytes: u64 = parts[2].parse().map_err(|e| CollectorError::Parse {
value: parts[2].to_string(),
error: format!("Failed to parse used bytes: {}", e),
})?;
Ok((total_bytes, used_bytes))
}
/// Detect MergerFS pools from mount data
fn detect_mergerfs_pools(&self, filesystem_usage: &HashMap<String, (u64, u64)>) -> anyhow::Result<Vec<MergerfsPool>> {
let mounts_content = std::fs::read_to_string("/proc/mounts")
.map_err(|e| anyhow::anyhow!("Failed to read /proc/mounts: {}", e))?;
let mut pools = Vec::new();
for line in mounts_content.lines() {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() >= 3 && parts[2] == "fuse.mergerfs" {
let mount_point = parts[1].to_string();
let device_sources = parts[0]; // e.g., "/mnt/disk1:/mnt/disk2"
// Get pool usage
let (total_bytes, used_bytes) = filesystem_usage.get(&mount_point)
.copied()
.unwrap_or((0, 0));
// Extract pool name from mount point (e.g., "/srv/media" -> "srv_media")
let pool_name = if mount_point == "/" {
"root".to_string()
} else {
mount_point.trim_start_matches('/').replace('/', "_")
};
if pool_name.is_empty() {
continue;
}
// Parse member paths - handle both full paths and numeric references
let raw_paths: Vec<String> = device_sources
.split(':')
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.collect();
// Convert numeric references to actual mount points if needed
let member_paths = if raw_paths.iter().any(|path| !path.starts_with('/')) {
// Handle numeric format like "1:2" by finding corresponding /mnt/disk* paths
self.resolve_numeric_mergerfs_paths(&raw_paths)?
} else {
// Already full paths
raw_paths
};
// For SnapRAID setups, include parity drives that are related to this pool's data drives
let mut all_member_paths = member_paths.clone();
let related_parity_paths = self.discover_related_parity_drives(&member_paths)?;
all_member_paths.extend(related_parity_paths);
// Categorize as data vs parity drives
let (data_drives, parity_drives) = match self.categorize_pool_drives(&all_member_paths) {
Ok(drives) => drives,
Err(_e) => {
continue;
}
};
pools.push(MergerfsPool {
name: pool_name,
mount_point,
total_bytes,
used_bytes,
data_drives,
parity_drives,
});
}
}
Ok(pools)
}
/// Group filesystems by physical drive (excluding mergerfs members) - exact old logic
fn group_by_physical_drive(
&self,
mount_devices: &HashMap<String, String>,
filesystem_usage: &HashMap<String, (u64, u64)>,
mergerfs_pools: &[MergerfsPool]
) -> anyhow::Result<Vec<PhysicalDrive>> {
let mut drive_groups: HashMap<String, Vec<Filesystem>> = HashMap::new();
// Get all mergerfs member paths to exclude them - exactly like old code
let mut mergerfs_members = std::collections::HashSet::new();
for pool in mergerfs_pools {
for drive in &pool.data_drives {
mergerfs_members.insert(drive.mount_point.clone());
}
for drive in &pool.parity_drives {
mergerfs_members.insert(drive.mount_point.clone());
}
}
// Group filesystems by base device
for (mount_point, device) in mount_devices {
// Skip mergerfs member mounts
if mergerfs_members.contains(mount_point) {
continue;
}
let base_device = self.extract_base_device(device);
if let Some((total, used)) = filesystem_usage.get(mount_point) {
let usage_percent = (*used as f32 / *total as f32) * 100.0;
let filesystem = Filesystem {
mount_point: mount_point.clone(), // Keep actual mount point like "/" and "/boot"
usage_percent,
used_bytes: *used,
total_bytes: *total,
};
drive_groups.entry(base_device).or_insert_with(Vec::new).push(filesystem);
}
}
// Convert to PhysicalDrive structs
let mut physical_drives = Vec::new();
for (drive_name, filesystems) in drive_groups {
let physical_drive = PhysicalDrive {
name: drive_name,
health: "UNKNOWN".to_string(), // Will be updated with SMART data
filesystems,
};
physical_drives.push(physical_drive);
}
physical_drives.sort_by(|a, b| a.name.cmp(&b.name));
Ok(physical_drives)
}
/// Extract base device name from device path
fn extract_base_device(&self, device: &str) -> String {
// Extract base device name (e.g., "/dev/nvme0n1p1" -> "nvme0n1")
if let Some(dev_name) = device.strip_prefix("/dev/") {
// Remove partition numbers: nvme0n1p1 -> nvme0n1, sda1 -> sda
if let Some(pos) = dev_name.find('p') {
if dev_name[pos+1..].chars().all(char::is_numeric) {
return dev_name[..pos].to_string();
}
}
// Handle traditional naming: sda1 -> sda
let mut result = String::new();
for ch in dev_name.chars() {
if ch.is_ascii_digit() {
break;
}
result.push(ch);
}
if !result.is_empty() {
return result;
}
}
device.to_string()
}
/// Get SMART data for drives in parallel
async fn get_smart_data_for_drives(&self, physical_drives: &[PhysicalDrive], mergerfs_pools: &[MergerfsPool]) -> HashMap<String, SmartData> {
use futures::future::join_all;
// Collect all drive names
let mut all_drives = std::collections::HashSet::new();
for drive in physical_drives {
all_drives.insert(drive.name.clone());
}
for pool in mergerfs_pools {
for drive in &pool.data_drives {
all_drives.insert(drive.name.clone());
}
for drive in &pool.parity_drives {
all_drives.insert(drive.name.clone());
}
}
// Collect SMART data for all drives in parallel
let futures: Vec<_> = all_drives
.iter()
.map(|drive_name| {
let drive = drive_name.clone();
async move {
let result = self.get_smart_data(&drive).await;
(drive, result)
}
})
.collect();
let results = join_all(futures).await;
// Build HashMap from results
let mut smart_data = HashMap::new();
for (drive_name, result) in results {
if let Ok(data) = result {
smart_data.insert(drive_name, data);
}
}
smart_data
}
/// Get SMART data for a single drive
async fn get_smart_data(&self, drive_name: &str) -> Result<SmartData, CollectorError> {
use super::run_command_with_timeout;
// Use direct smartctl (no sudo) - service has CAP_SYS_RAWIO and CAP_SYS_ADMIN capabilities
// For NVMe drives, specify device type explicitly
let mut cmd = TokioCommand::new("smartctl");
if drive_name.starts_with("nvme") {
cmd.args(&["-d", "nvme", "-a", &format!("/dev/{}", drive_name)]);
} else {
cmd.args(&["-a", &format!("/dev/{}", drive_name)]);
}
let output = run_command_with_timeout(cmd, 3).await
.map_err(|e| CollectorError::SystemRead {
path: format!("SMART data for {}", drive_name),
error: e.to_string(),
})?;
let output_str = String::from_utf8_lossy(&output.stdout);
// Note: smartctl returns non-zero exit codes for warnings (like exit code 32
// for "temperature was high in the past"), but the output data is still valid.
// Only check if we got any output at all, don't reject based on exit code.
if output_str.is_empty() {
return Ok(SmartData {
health: "UNKNOWN".to_string(),
serial_number: None,
temperature_celsius: None,
wear_percent: None,
});
}
let mut health = "UNKNOWN".to_string();
let mut serial_number = None;
let mut temperature = None;
let mut wear_percent = None;
for line in output_str.lines() {
if line.contains("SMART overall-health") {
if line.contains("PASSED") {
health = "PASSED".to_string();
} else if line.contains("FAILED") {
health = "FAILED".to_string();
}
}
// Serial number parsing (both SATA and NVMe)
if line.contains("Serial Number:") {
if let Some(serial_part) = line.split("Serial Number:").nth(1) {
let serial_str = serial_part.trim();
if !serial_str.is_empty() {
// Take first whitespace-separated token
if let Some(serial) = serial_str.split_whitespace().next() {
serial_number = Some(serial.to_string());
}
}
}
}
// Temperature parsing for different drive types
if line.contains("Temperature_Celsius") || line.contains("Airflow_Temperature_Cel") || line.contains("Temperature_Case") {
// Traditional SATA drives: attribute table format
if let Some(temp_str) = line.split_whitespace().nth(9) {
if let Ok(temp) = temp_str.parse::<f32>() {
temperature = Some(temp);
}
}
} else if line.starts_with("Temperature:") {
// NVMe drives: simple "Temperature: 27 Celsius" format
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() >= 2 {
if let Ok(temp) = parts[1].parse::<f32>() {
temperature = Some(temp);
}
}
}
// Wear level parsing for SSDs
if line.contains("Media_Wearout_Indicator") {
// Media_Wearout_Indicator stores remaining life % in column 3 (VALUE)
if let Some(wear_str) = line.split_whitespace().nth(3) {
if let Ok(remaining) = wear_str.parse::<f32>() {
wear_percent = Some(100.0 - remaining); // Convert remaining life to wear
}
}
} else if line.contains("Wear_Leveling_Count") || line.contains("SSD_Life_Left") {
// Other wear attributes store value in column 9 (RAW_VALUE)
if let Some(wear_str) = line.split_whitespace().nth(9) {
if let Ok(wear) = wear_str.parse::<f32>() {
wear_percent = Some(100.0 - wear); // Convert remaining life to wear
}
}
}
// NVMe wear parsing: "Percentage Used: 1%"
if line.contains("Percentage Used:") {
if let Some(percent_part) = line.split("Percentage Used:").nth(1) {
if let Some(percent_str) = percent_part.split_whitespace().next() {
if let Some(percent_clean) = percent_str.strip_suffix('%') {
if let Ok(wear) = percent_clean.parse::<f32>() {
wear_percent = Some(wear);
}
}
}
}
}
}
Ok(SmartData {
health,
serial_number,
temperature_celsius: temperature,
wear_percent,
})
}
/// Populate drives data into AgentData
fn populate_drives_data(&self, physical_drives: &[PhysicalDrive], smart_data: &HashMap<String, SmartData>, agent_data: &mut AgentData) -> Result<(), CollectorError> {
for drive in physical_drives {
let smart = smart_data.get(&drive.name);
let mut filesystems: Vec<FilesystemData> = drive.filesystems.iter().map(|fs| {
FilesystemData {
mount: fs.mount_point.clone(), // This preserves "/" and "/boot" correctly
usage_percent: fs.usage_percent,
used_gb: fs.used_bytes as f32 / (1024.0 * 1024.0 * 1024.0),
total_gb: fs.total_bytes as f32 / (1024.0 * 1024.0 * 1024.0),
usage_status: self.calculate_filesystem_usage_status(fs.usage_percent),
}
}).collect();
// Sort filesystems by mount point for consistent display order
filesystems.sort_by(|a, b| a.mount.cmp(&b.mount));
agent_data.system.storage.drives.push(DriveData {
name: drive.name.clone(),
serial_number: smart.and_then(|s| s.serial_number.clone()),
health: smart.map(|s| s.health.clone()).unwrap_or_else(|| drive.health.clone()),
temperature_celsius: smart.and_then(|s| s.temperature_celsius),
wear_percent: smart.and_then(|s| s.wear_percent),
filesystems,
temperature_status: smart.and_then(|s| s.temperature_celsius)
.map(|temp| self.calculate_temperature_status(temp))
.unwrap_or(Status::Unknown),
health_status: self.calculate_health_status(
smart.map(|s| s.health.as_str()).unwrap_or("UNKNOWN")
),
});
}
Ok(())
}
/// Populate pools data into AgentData
fn populate_pools_data(&self, mergerfs_pools: &[MergerfsPool], smart_data: &HashMap<String, SmartData>, agent_data: &mut AgentData) -> Result<(), CollectorError> {
for pool in mergerfs_pools {
// Calculate pool health and statuses based on member drive health
let (pool_health, health_status, usage_status, data_drive_data, parity_drive_data) = self.calculate_pool_health(pool, smart_data);
let pool_data = PoolData {
name: pool.name.clone(),
mount: pool.mount_point.clone(),
pool_type: format!("mergerfs ({}+{})", pool.data_drives.len(), pool.parity_drives.len()),
health: pool_health,
usage_percent: if pool.total_bytes > 0 {
(pool.used_bytes as f32 / pool.total_bytes as f32) * 100.0
} else { 0.0 },
used_gb: pool.used_bytes as f32 / (1024.0 * 1024.0 * 1024.0),
total_gb: pool.total_bytes as f32 / (1024.0 * 1024.0 * 1024.0),
data_drives: data_drive_data,
parity_drives: parity_drive_data,
health_status,
usage_status,
};
agent_data.system.storage.pools.push(pool_data);
}
Ok(())
}
/// Calculate pool health based on member drive status
fn calculate_pool_health(&self, pool: &MergerfsPool, smart_data: &HashMap<String, SmartData>) -> (String, cm_dashboard_shared::Status, cm_dashboard_shared::Status, Vec<cm_dashboard_shared::PoolDriveData>, Vec<cm_dashboard_shared::PoolDriveData>) {
let mut failed_data = 0;
let mut failed_parity = 0;
// Process data drives
let data_drive_data: Vec<cm_dashboard_shared::PoolDriveData> = pool.data_drives.iter().map(|d| {
let smart = smart_data.get(&d.name);
let health = smart.map(|s| s.health.clone()).unwrap_or_else(|| "UNKNOWN".to_string());
let temperature = smart.and_then(|s| s.temperature_celsius).or(d.temperature_celsius);
if health == "FAILED" {
failed_data += 1;
}
// Calculate drive statuses using config thresholds
let health_status = self.calculate_health_status(&health);
let temperature_status = temperature.map(|t| self.temperature_thresholds.evaluate(t)).unwrap_or(cm_dashboard_shared::Status::Unknown);
cm_dashboard_shared::PoolDriveData {
name: d.name.clone(),
serial_number: smart.and_then(|s| s.serial_number.clone()),
temperature_celsius: temperature,
health,
wear_percent: smart.and_then(|s| s.wear_percent),
health_status,
temperature_status,
}
}).collect();
// Process parity drives
let parity_drive_data: Vec<cm_dashboard_shared::PoolDriveData> = pool.parity_drives.iter().map(|d| {
let smart = smart_data.get(&d.name);
let health = smart.map(|s| s.health.clone()).unwrap_or_else(|| "UNKNOWN".to_string());
let temperature = smart.and_then(|s| s.temperature_celsius).or(d.temperature_celsius);
if health == "FAILED" {
failed_parity += 1;
}
// Calculate drive statuses using config thresholds
let health_status = self.calculate_health_status(&health);
let temperature_status = temperature.map(|t| self.temperature_thresholds.evaluate(t)).unwrap_or(cm_dashboard_shared::Status::Unknown);
cm_dashboard_shared::PoolDriveData {
name: d.name.clone(),
serial_number: smart.and_then(|s| s.serial_number.clone()),
temperature_celsius: temperature,
health,
wear_percent: smart.and_then(|s| s.wear_percent),
health_status,
temperature_status,
}
}).collect();
// Calculate overall pool health string and status
// SnapRAID logic: can tolerate up to N parity drive failures (where N = number of parity drives)
// If data drives fail AND we've lost parity protection, that's critical
let (pool_health, health_status) = if failed_data == 0 && failed_parity == 0 {
("healthy".to_string(), cm_dashboard_shared::Status::Ok)
} else if failed_data == 0 && failed_parity > 0 {
// Parity failed but no data loss - degraded (reduced protection)
("degraded".to_string(), cm_dashboard_shared::Status::Warning)
} else if failed_data == 1 && failed_parity == 0 {
// One data drive failed, parity intact - degraded (recoverable)
("degraded".to_string(), cm_dashboard_shared::Status::Warning)
} else {
// Multiple data drives failed OR data+parity failed = data loss risk
("critical".to_string(), cm_dashboard_shared::Status::Critical)
};
// Calculate pool usage status using config thresholds
let usage_percent = if pool.total_bytes > 0 {
(pool.used_bytes as f32 / pool.total_bytes as f32) * 100.0
} else { 0.0 };
let usage_status = if usage_percent >= self.config.usage_critical_percent {
cm_dashboard_shared::Status::Critical
} else if usage_percent >= self.config.usage_warning_percent {
cm_dashboard_shared::Status::Warning
} else {
cm_dashboard_shared::Status::Ok
};
(pool_health, health_status, usage_status, data_drive_data, parity_drive_data)
}
/// Calculate filesystem usage status
fn calculate_filesystem_usage_status(&self, usage_percent: f32) -> Status {
// Use standard filesystem warning/critical thresholds
if usage_percent >= 95.0 {
Status::Critical
} else if usage_percent >= 85.0 {
Status::Warning
} else {
Status::Ok
}
}
/// Calculate drive temperature status
fn calculate_temperature_status(&self, temperature: f32) -> Status {
self.temperature_thresholds.evaluate(temperature)
}
/// Calculate drive health status
fn calculate_health_status(&self, health: &str) -> Status {
match health {
"PASSED" => Status::Ok,
"FAILED" => Status::Critical,
_ => Status::Unknown,
}
}
/// Discover parity drives that are related to the given data drives
fn discover_related_parity_drives(&self, data_drives: &[String]) -> anyhow::Result<Vec<String>> {
let mount_devices = tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(self.get_mount_devices())
}).map_err(|e| anyhow::anyhow!("Failed to get mount devices: {}", e))?;
let mut related_parity = Vec::new();
// Find parity drives that share the same parent directory as the data drives
for data_path in data_drives {
if let Some(parent_dir) = self.get_parent_directory(data_path) {
// Look for parity drives in the same parent directory
for (mount_point, _device) in &mount_devices {
if mount_point.contains("parity") && mount_point.starts_with(&parent_dir) {
if !related_parity.contains(mount_point) {
related_parity.push(mount_point.clone());
}
}
}
}
}
Ok(related_parity)
}
/// Get parent directory of a mount path (e.g., "/mnt/disk1" -> "/mnt")
fn get_parent_directory(&self, path: &str) -> Option<String> {
if let Some(last_slash) = path.rfind('/') {
if last_slash > 0 {
return Some(path[..last_slash].to_string());
}
}
None
}
/// Categorize pool member drives as data vs parity
fn categorize_pool_drives(&self, member_paths: &[String]) -> anyhow::Result<(Vec<PoolDrive>, Vec<PoolDrive>)> {
let mut data_drives = Vec::new();
let mut parity_drives = Vec::new();
for path in member_paths {
let drive_info = self.get_drive_info_for_path(path)?;
// Heuristic: if path contains "parity", it's parity
if path.to_lowercase().contains("parity") {
parity_drives.push(drive_info);
} else {
data_drives.push(drive_info);
}
}
Ok((data_drives, parity_drives))
}
/// Get drive information for a mount path
fn get_drive_info_for_path(&self, path: &str) -> anyhow::Result<PoolDrive> {
// Use lsblk to find the backing device with timeout
let output = StdCommand::new("timeout")
.args(&["2", "lsblk", "-rn", "-o", "NAME,MOUNTPOINT"])
.output()
.map_err(|e| anyhow::anyhow!("Failed to run lsblk: {}", e))?;
let output_str = String::from_utf8_lossy(&output.stdout);
let mut device = String::new();
for line in output_str.lines() {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() >= 2 && parts[1] == path {
device = parts[0].to_string();
break;
}
}
if device.is_empty() {
return Err(anyhow::anyhow!("Could not find device for path {}", path));
}
// Extract base device name (e.g., "sda1" -> "sda")
let base_device = self.extract_base_device(&format!("/dev/{}", device));
// Temperature will be filled in later from parallel SMART collection
// Don't collect it here to avoid sequential blocking with problematic async nesting
Ok(PoolDrive {
name: base_device,
mount_point: path.to_string(),
temperature_celsius: None,
})
}
/// Resolve numeric mergerfs references like "1:2" to actual mount paths
fn resolve_numeric_mergerfs_paths(&self, numeric_refs: &[String]) -> anyhow::Result<Vec<String>> {
let mut resolved_paths = Vec::new();
// Get all mount points that look like /mnt/disk* or /mnt/parity*
let mount_devices = tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(self.get_mount_devices())
}).map_err(|e| anyhow::anyhow!("Failed to get mount devices: {}", e))?;
let mut disk_mounts: Vec<String> = mount_devices.keys()
.filter(|path| path.starts_with("/mnt/disk") || path.starts_with("/mnt/parity"))
.cloned()
.collect();
disk_mounts.sort(); // Ensure consistent ordering
for num_ref in numeric_refs {
if let Ok(index) = num_ref.parse::<usize>() {
// Convert 1-based index to 0-based
if index > 0 && index <= disk_mounts.len() {
resolved_paths.push(disk_mounts[index - 1].clone());
}
}
}
// Fallback: if we couldn't resolve, return the original paths
if resolved_paths.is_empty() {
resolved_paths = numeric_refs.to_vec();
}
Ok(resolved_paths)
}
}
#[async_trait]
impl Collector for DiskCollector {
async fn collect_structured(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
self.collect_storage_data(agent_data).await
}
}
/// SMART data for a drive
#[derive(Debug, Clone)]
struct SmartData {
health: String,
serial_number: Option<String>,
temperature_celsius: Option<f32>,
wear_percent: Option<f32>,
}