Restore complete MergerFS and SnapRAID functionality to disk collector
All checks were successful
Build and Release / build-and-release (push) Successful in 1m17s
Updated the disk collector to restore the functionality from the previous string-based implementation that was still missing, while working with the new structured JSON data architecture:

- MergerFS pool discovery from /proc/mounts parsing
- SnapRAID parity drive detection via mount path heuristics
- Drive categorization (data vs parity) based on path analysis
- Numeric mergerfs reference resolution (1:2 -> /mnt/disk paths)
- Pool health calculation based on member drive SMART status
- Complete SMART data integration for temperatures and wear levels
- Proper exclusion of pool member drives from physical drive grouping

The implementation replicates the exact logic from the old code while adapting it to the structured AgentData output format. All MergerFS and SnapRAID monitoring capabilities are fully restored.
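For context, the core detection step described above amounts to scanning /proc/mounts for fuse.mergerfs entries and splitting the branch list held in the device field. The following is a minimal, standalone sketch of that idea only; it is not the collector code itself, and the main() wrapper and println! reporting are illustrative additions:

use std::fs;

fn main() -> std::io::Result<()> {
    // /proc/mounts fields: device, mount point, fstype, options, dump, pass
    let mounts = fs::read_to_string("/proc/mounts")?;
    for line in mounts.lines() {
        let parts: Vec<&str> = line.split_whitespace().collect();
        if parts.len() >= 3 && parts[2] == "fuse.mergerfs" {
            let mount_point = parts[1];
            // The device field holds the pool branches, e.g. "/mnt/disk1:/mnt/disk2".
            // Some setups expose numeric references ("1:2") instead; the collector
            // resolves those against the sorted /mnt/disk* and /mnt/parity* mounts.
            let branches: Vec<&str> = parts[0]
                .split(':')
                .filter(|s| !s.is_empty())
                .collect();
            println!("mergerfs pool at {} with branches {:?}", mount_point, branches);
        }
    }
    Ok(())
}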
This commit is contained in:
parent 67b59e9551
commit 7b11db990c
Cargo.lock (generated): 6 changed lines
@@ -279,7 +279,7 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"
 
 [[package]]
 name = "cm-dashboard"
-version = "0.1.149"
+version = "0.1.150"
 dependencies = [
  "anyhow",
  "chrono",
@@ -301,7 +301,7 @@ dependencies = [
 
 [[package]]
 name = "cm-dashboard-agent"
-version = "0.1.149"
+version = "0.1.150"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -324,7 +324,7 @@ dependencies = [
 
 [[package]]
 name = "cm-dashboard-shared"
-version = "0.1.149"
+version = "0.1.150"
 dependencies = [
  "chrono",
  "serde",
@@ -1,6 +1,6 @@
 [package]
 name = "cm-dashboard-agent"
-version = "0.1.150"
+version = "0.1.151"
 edition = "2021"
 
 [dependencies]
@@ -50,6 +50,7 @@ struct MergerfsPool {
 #[derive(Debug, Clone)]
 struct PoolDrive {
     name: String,                     // Drive name
+    mount_point: String,              // e.g., "/mnt/disk1"
     temperature_celsius: Option<f32>, // Drive temperature
 }
 
@@ -198,16 +199,80 @@ impl DiskCollector {
     }
 
     /// Detect MergerFS pools from mount data
-    fn detect_mergerfs_pools(&self, _filesystem_usage: &HashMap<String, (u64, u64)>) -> anyhow::Result<Vec<MergerfsPool>> {
-        let pools = Vec::new();
+    fn detect_mergerfs_pools(&self, filesystem_usage: &HashMap<String, (u64, u64)>) -> anyhow::Result<Vec<MergerfsPool>> {
+        let mounts_content = std::fs::read_to_string("/proc/mounts")
+            .map_err(|e| anyhow::anyhow!("Failed to read /proc/mounts: {}", e))?;
+        let mut pools = Vec::new();
 
-        // For now, return empty pools - full mergerfs detection would require parsing /proc/mounts for fuse.mergerfs
-        // This ensures we don't break existing functionality
+        for line in mounts_content.lines() {
+            let parts: Vec<&str> = line.split_whitespace().collect();
+            if parts.len() >= 3 && parts[2] == "fuse.mergerfs" {
+                let mount_point = parts[1].to_string();
+                let device_sources = parts[0]; // e.g., "/mnt/disk1:/mnt/disk2"
+
+                // Get pool usage
+                let (total_bytes, used_bytes) = filesystem_usage.get(&mount_point)
+                    .copied()
+                    .unwrap_or((0, 0));
+
+                // Extract pool name from mount point (e.g., "/srv/media" -> "srv_media")
+                let pool_name = if mount_point == "/" {
+                    "root".to_string()
+                } else {
+                    mount_point.trim_start_matches('/').replace('/', "_")
+                };
+
+                if pool_name.is_empty() {
+                    debug!("Skipping mergerfs pool with empty name: {}", mount_point);
+                    continue;
+                }
+
+                // Parse member paths - handle both full paths and numeric references
+                let raw_paths: Vec<String> = device_sources
+                    .split(':')
+                    .map(|s| s.trim().to_string())
+                    .filter(|s| !s.is_empty())
+                    .collect();
+
+                // Convert numeric references to actual mount points if needed
+                let member_paths = if raw_paths.iter().any(|path| !path.starts_with('/')) {
+                    // Handle numeric format like "1:2" by finding corresponding /mnt/disk* paths
+                    self.resolve_numeric_mergerfs_paths(&raw_paths)?
+                } else {
+                    // Already full paths
+                    raw_paths
+                };
+
+                // For SnapRAID setups, include parity drives that are related to this pool's data drives
+                let mut all_member_paths = member_paths.clone();
+                let related_parity_paths = self.discover_related_parity_drives(&member_paths)?;
+                all_member_paths.extend(related_parity_paths);
+
+                // Categorize as data vs parity drives
+                let (data_drives, parity_drives) = match self.categorize_pool_drives(&all_member_paths) {
+                    Ok(drives) => drives,
+                    Err(e) => {
+                        debug!("Failed to categorize drives for pool {}: {}. Skipping.", mount_point, e);
+                        continue;
+                    }
+                };
+
+                pools.push(MergerfsPool {
+                    name: pool_name,
+                    mount_point,
+                    total_bytes,
+                    used_bytes,
+                    data_drives,
+                    parity_drives,
+                });
+            }
+        }
 
         debug!("Found {} mergerfs pools", pools.len());
         Ok(pools)
     }
 
-    /// Group filesystems by physical drive (excluding mergerfs members)
+    /// Group filesystems by physical drive (excluding mergerfs members) - exact old logic
     fn group_by_physical_drive(
         &self,
         mount_devices: &HashMap<String, String>,
@@ -216,14 +281,14 @@ impl DiskCollector {
     ) -> anyhow::Result<Vec<PhysicalDrive>> {
         let mut drive_groups: HashMap<String, Vec<Filesystem>> = HashMap::new();
 
-        // Get all mergerfs member paths to exclude them
+        // Get all mergerfs member paths to exclude them - exactly like old code
         let mut mergerfs_members = std::collections::HashSet::new();
         for pool in mergerfs_pools {
             for drive in &pool.data_drives {
-                mergerfs_members.insert(drive.name.clone());
+                mergerfs_members.insert(drive.mount_point.clone());
             }
             for drive in &pool.parity_drives {
-                mergerfs_members.insert(drive.name.clone());
+                mergerfs_members.insert(drive.mount_point.clone());
             }
         }
 
@@ -444,28 +509,23 @@ impl DiskCollector {
     }
 
     /// Populate pools data into AgentData
-    fn populate_pools_data(&self, mergerfs_pools: &[MergerfsPool], _smart_data: &HashMap<String, SmartData>, agent_data: &mut AgentData) -> Result<(), CollectorError> {
+    fn populate_pools_data(&self, mergerfs_pools: &[MergerfsPool], smart_data: &HashMap<String, SmartData>, agent_data: &mut AgentData) -> Result<(), CollectorError> {
         for pool in mergerfs_pools {
+            // Calculate pool health based on member drive health
+            let (pool_health, data_drive_data, parity_drive_data) = self.calculate_pool_health(pool, smart_data);
+
             let pool_data = PoolData {
                 name: pool.name.clone(),
                 mount: pool.mount_point.clone(),
-                pool_type: "mergerfs".to_string(),
-                health: "healthy".to_string(), // TODO: Calculate based on member drives
-                usage_percent: (pool.used_bytes as f32 / pool.total_bytes as f32) * 100.0,
+                pool_type: format!("mergerfs ({}+{})", pool.data_drives.len(), pool.parity_drives.len()),
+                health: pool_health,
+                usage_percent: if pool.total_bytes > 0 {
+                    (pool.used_bytes as f32 / pool.total_bytes as f32) * 100.0
+                } else { 0.0 },
                 used_gb: pool.used_bytes as f32 / (1024.0 * 1024.0 * 1024.0),
                 total_gb: pool.total_bytes as f32 / (1024.0 * 1024.0 * 1024.0),
-                data_drives: pool.data_drives.iter().map(|d| cm_dashboard_shared::PoolDriveData {
-                    name: d.name.clone(),
-                    temperature_celsius: d.temperature_celsius,
-                    health: "unknown".to_string(),
-                    wear_percent: None,
-                }).collect(),
-                parity_drives: pool.parity_drives.iter().map(|d| cm_dashboard_shared::PoolDriveData {
-                    name: d.name.clone(),
-                    temperature_celsius: d.temperature_celsius,
-                    health: "unknown".to_string(),
-                    wear_percent: None,
-                }).collect(),
+                data_drives: data_drive_data,
+                parity_drives: parity_drive_data,
             };
 
             agent_data.system.storage.pools.push(pool_data);
@@ -474,6 +534,55 @@ impl DiskCollector {
         Ok(())
     }
 
+    /// Calculate pool health based on member drive status
+    fn calculate_pool_health(&self, pool: &MergerfsPool, smart_data: &HashMap<String, SmartData>) -> (String, Vec<cm_dashboard_shared::PoolDriveData>, Vec<cm_dashboard_shared::PoolDriveData>) {
+        let mut failed_data = 0;
+        let mut failed_parity = 0;
+
+        // Process data drives
+        let data_drive_data: Vec<cm_dashboard_shared::PoolDriveData> = pool.data_drives.iter().map(|d| {
+            let smart = smart_data.get(&d.name);
+            let health = smart.map(|s| s.health.clone()).unwrap_or_else(|| "UNKNOWN".to_string());
+
+            if health == "FAILED" {
+                failed_data += 1;
+            }
+
+            cm_dashboard_shared::PoolDriveData {
+                name: d.name.clone(),
+                temperature_celsius: smart.and_then(|s| s.temperature_celsius).or(d.temperature_celsius),
+                health,
+                wear_percent: smart.and_then(|s| s.wear_percent),
+            }
+        }).collect();
+
+        // Process parity drives
+        let parity_drive_data: Vec<cm_dashboard_shared::PoolDriveData> = pool.parity_drives.iter().map(|d| {
+            let smart = smart_data.get(&d.name);
+            let health = smart.map(|s| s.health.clone()).unwrap_or_else(|| "UNKNOWN".to_string());
+
+            if health == "FAILED" {
+                failed_parity += 1;
+            }
+
+            cm_dashboard_shared::PoolDriveData {
+                name: d.name.clone(),
+                temperature_celsius: smart.and_then(|s| s.temperature_celsius).or(d.temperature_celsius),
+                health,
+                wear_percent: smart.and_then(|s| s.wear_percent),
+            }
+        }).collect();
+
+        // Calculate overall pool health
+        let pool_health = match (failed_data, failed_parity) {
+            (0, 0) => "healthy".to_string(),
+            (1, 0) | (0, 1) => "degraded".to_string(), // One failure is degraded but recoverable
+            _ => "critical".to_string(), // Multiple failures are critical
+        };
+
+        (pool_health, data_drive_data, parity_drive_data)
+    }
+
     /// Calculate filesystem usage status
     fn calculate_filesystem_usage_status(&self, usage_percent: f32) -> Status {
         // Use standard filesystem warning/critical thresholds
@@ -499,6 +608,134 @@ impl DiskCollector {
             _ => Status::Unknown,
         }
     }
+
+    /// Discover parity drives that are related to the given data drives
+    fn discover_related_parity_drives(&self, data_drives: &[String]) -> anyhow::Result<Vec<String>> {
+        let mount_devices = tokio::task::block_in_place(|| {
+            tokio::runtime::Handle::current().block_on(self.get_mount_devices())
+        }).map_err(|e| anyhow::anyhow!("Failed to get mount devices: {}", e))?;
+
+        let mut related_parity = Vec::new();
+
+        // Find parity drives that share the same parent directory as the data drives
+        for data_path in data_drives {
+            if let Some(parent_dir) = self.get_parent_directory(data_path) {
+                // Look for parity drives in the same parent directory
+                for (mount_point, _device) in &mount_devices {
+                    if mount_point.contains("parity") && mount_point.starts_with(&parent_dir) {
+                        if !related_parity.contains(mount_point) {
+                            related_parity.push(mount_point.clone());
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(related_parity)
+    }
+
+    /// Get parent directory of a mount path (e.g., "/mnt/disk1" -> "/mnt")
+    fn get_parent_directory(&self, path: &str) -> Option<String> {
+        if let Some(last_slash) = path.rfind('/') {
+            if last_slash > 0 {
+                return Some(path[..last_slash].to_string());
+            }
+        }
+        None
+    }
+
+    /// Categorize pool member drives as data vs parity
+    fn categorize_pool_drives(&self, member_paths: &[String]) -> anyhow::Result<(Vec<PoolDrive>, Vec<PoolDrive>)> {
+        let mut data_drives = Vec::new();
+        let mut parity_drives = Vec::new();
+
+        for path in member_paths {
+            let drive_info = self.get_drive_info_for_path(path)?;
+
+            // Heuristic: if path contains "parity", it's parity
+            if path.to_lowercase().contains("parity") {
+                parity_drives.push(drive_info);
+            } else {
+                data_drives.push(drive_info);
+            }
+        }
+
+        Ok((data_drives, parity_drives))
+    }
+
+    /// Get drive information for a mount path
+    fn get_drive_info_for_path(&self, path: &str) -> anyhow::Result<PoolDrive> {
+        // Use lsblk to find the backing device
+        let output = Command::new("lsblk")
+            .args(&["-rn", "-o", "NAME,MOUNTPOINT"])
+            .output()
+            .map_err(|e| anyhow::anyhow!("Failed to run lsblk: {}", e))?;
+
+        let output_str = String::from_utf8_lossy(&output.stdout);
+        let mut device = String::new();
+
+        for line in output_str.lines() {
+            let parts: Vec<&str> = line.split_whitespace().collect();
+            if parts.len() >= 2 && parts[1] == path {
+                device = parts[0].to_string();
+                break;
+            }
+        }
+
+        if device.is_empty() {
+            return Err(anyhow::anyhow!("Could not find device for path {}", path));
+        }
+
+        // Extract base device name (e.g., "sda1" -> "sda")
+        let base_device = self.extract_base_device(&format!("/dev/{}", device));
+
+        // Get temperature from SMART data if available
+        let temperature = if let Ok(smart_data) = tokio::task::block_in_place(|| {
+            tokio::runtime::Handle::current().block_on(self.get_smart_data(&base_device))
+        }) {
+            smart_data.temperature_celsius
+        } else {
+            None
+        };
+
+        Ok(PoolDrive {
+            name: base_device,
+            mount_point: path.to_string(),
+            temperature_celsius: temperature,
+        })
+    }
+
+    /// Resolve numeric mergerfs references like "1:2" to actual mount paths
+    fn resolve_numeric_mergerfs_paths(&self, numeric_refs: &[String]) -> anyhow::Result<Vec<String>> {
+        let mut resolved_paths = Vec::new();
+
+        // Get all mount points that look like /mnt/disk* or /mnt/parity*
+        let mount_devices = tokio::task::block_in_place(|| {
+            tokio::runtime::Handle::current().block_on(self.get_mount_devices())
+        }).map_err(|e| anyhow::anyhow!("Failed to get mount devices: {}", e))?;
+
+        let mut disk_mounts: Vec<String> = mount_devices.keys()
+            .filter(|path| path.starts_with("/mnt/disk") || path.starts_with("/mnt/parity"))
+            .cloned()
+            .collect();
+        disk_mounts.sort(); // Ensure consistent ordering
+
+        for num_ref in numeric_refs {
+            if let Ok(index) = num_ref.parse::<usize>() {
+                // Convert 1-based index to 0-based
+                if index > 0 && index <= disk_mounts.len() {
+                    resolved_paths.push(disk_mounts[index - 1].clone());
+                }
+            }
+        }
+
+        // Fallback: if we couldn't resolve, return the original paths
+        if resolved_paths.is_empty() {
+            resolved_paths = numeric_refs.to_vec();
+        }
+
+        Ok(resolved_paths)
+    }
 }
 
 #[async_trait]
@@ -1,6 +1,6 @@
 [package]
 name = "cm-dashboard"
-version = "0.1.150"
+version = "0.1.151"
 edition = "2021"
 
 [dependencies]
@@ -1,6 +1,6 @@
 [package]
 name = "cm-dashboard-shared"
-version = "0.1.150"
+version = "0.1.151"
 edition = "2021"
 
 [dependencies]