use async_trait::async_trait; use cm_dashboard_shared::{AgentData, TmpfsData, HysteresisThresholds, Status}; use tracing::debug; use super::{utils, Collector, CollectorError}; use crate::config::MemoryConfig; /// Extremely efficient memory metrics collector /// /// EFFICIENCY OPTIMIZATIONS: /// - Single /proc/meminfo read for all memory metrics /// - Minimal string allocations /// - No process spawning for basic metrics /// - <0.5ms collection time target pub struct MemoryCollector { usage_thresholds: HysteresisThresholds, } impl MemoryCollector { pub fn new(config: MemoryConfig) -> Self { // Create hysteresis thresholds with 10% gap for recovery let usage_thresholds = HysteresisThresholds::new( config.usage_warning_percent, config.usage_critical_percent, ); Self { usage_thresholds, } } /// Parse /proc/meminfo efficiently /// Format: "MemTotal: 16384000 kB" async fn parse_meminfo(&self) -> Result { let content = utils::read_proc_file("/proc/meminfo")?; let mut info = MemoryInfo::default(); // Parse each line efficiently - only extract what we need for line in content.lines() { if let Some(colon_pos) = line.find(':') { let key = &line[..colon_pos]; let value_part = &line[colon_pos + 1..]; // Extract number from value part (format: " 12345 kB") if let Some(number_str) = value_part.split_whitespace().next() { if let Ok(value_kb) = utils::parse_u64(number_str) { match key { "MemTotal" => info.total_kb = value_kb, "MemAvailable" => info.available_kb = value_kb, "MemFree" => info.free_kb = value_kb, "Buffers" => info.buffers_kb = value_kb, "Cached" => info.cached_kb = value_kb, "SwapTotal" => info.swap_total_kb = value_kb, "SwapFree" => info.swap_free_kb = value_kb, _ => {} // Skip other fields for efficiency } } } } } // Validate that we got essential fields if info.total_kb == 0 { return Err(CollectorError::Parse { value: "MemTotal".to_string(), error: "MemTotal not found or zero in /proc/meminfo".to_string(), }); } // If MemAvailable is not available (older kernels), calculate it if info.available_kb == 0 { info.available_kb = info.free_kb + info.buffers_kb + info.cached_kb; } Ok(info) } /// Populate memory data directly into AgentData async fn populate_memory_data(&self, info: &MemoryInfo, agent_data: &mut AgentData) -> Result<(), CollectorError> { // Calculate derived values let available = info.available_kb; let used = info.total_kb - available; let usage_percent = (used as f32 / info.total_kb as f32) * 100.0; // Populate basic memory fields agent_data.system.memory.usage_percent = usage_percent; agent_data.system.memory.total_gb = info.total_kb as f32 / (1024.0 * 1024.0); agent_data.system.memory.used_gb = used as f32 / (1024.0 * 1024.0); // Populate swap data if available agent_data.system.memory.swap_total_gb = info.swap_total_kb as f32 / (1024.0 * 1024.0); agent_data.system.memory.swap_used_gb = (info.swap_total_kb - info.swap_free_kb) as f32 / (1024.0 * 1024.0); Ok(()) } /// Populate tmpfs data into AgentData async fn populate_tmpfs_data(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> { // Discover all tmpfs mount points let tmpfs_mounts = self.discover_tmpfs_mounts()?; if tmpfs_mounts.is_empty() { debug!("No tmpfs mounts found to monitor"); return Ok(()); } // Get usage data for all tmpfs mounts at once using df (with 2 second timeout) let mut df_args = vec!["2", "df", "--output=target,size,used", "--block-size=1"]; df_args.extend(tmpfs_mounts.iter().map(|s| s.as_str())); let df_output = std::process::Command::new("timeout") .args(&df_args[..]) .output() .map_err(|e| CollectorError::SystemRead { path: "tmpfs mounts".to_string(), error: e.to_string(), })?; let df_str = String::from_utf8_lossy(&df_output.stdout); let df_lines: Vec<&str> = df_str.lines().skip(1).collect(); // Skip header // Process each tmpfs mount for (i, mount_point) in tmpfs_mounts.iter().enumerate() { if i >= df_lines.len() { debug!("Not enough df output lines for tmpfs mount: {}", mount_point); continue; } let parts: Vec<&str> = df_lines[i].split_whitespace().collect(); if parts.len() < 3 { debug!("Invalid df output for tmpfs mount: {}", mount_point); continue; } let total_bytes: u64 = parts[1].parse().unwrap_or(0); let used_bytes: u64 = parts[2].parse().unwrap_or(0); if total_bytes == 0 { continue; } let total_gb = total_bytes as f32 / (1024.0 * 1024.0 * 1024.0); let used_gb = used_bytes as f32 / (1024.0 * 1024.0 * 1024.0); let usage_percent = (used_bytes as f32 / total_bytes as f32) * 100.0; // Add to tmpfs list agent_data.system.memory.tmpfs.push(TmpfsData { mount: mount_point.clone(), usage_percent, used_gb, total_gb, }); } // Sort tmpfs mounts by mount point for consistent display order agent_data.system.memory.tmpfs.sort_by(|a, b| a.mount.cmp(&b.mount)); Ok(()) } /// Discover all tmpfs mount points from /proc/mounts fn discover_tmpfs_mounts(&self) -> Result, CollectorError> { let content = utils::read_proc_file("/proc/mounts")?; let mut tmpfs_mounts = Vec::new(); for line in content.lines() { let fields: Vec<&str> = line.split_whitespace().collect(); if fields.len() >= 3 && fields[2] == "tmpfs" { let mount_point = fields[1]; // Filter out system/internal tmpfs mounts that aren't useful for monitoring if self.should_monitor_tmpfs(mount_point) { tmpfs_mounts.push(mount_point.to_string()); } } } debug!("Discovered {} tmpfs mounts: {:?}", tmpfs_mounts.len(), tmpfs_mounts); Ok(tmpfs_mounts) } /// Determine if a tmpfs mount point should be monitored fn should_monitor_tmpfs(&self, mount_point: &str) -> bool { // Include commonly useful tmpfs mounts matches!(mount_point, "/tmp" | "/var/tmp" | "/dev/shm" | "/run" | "/var/log" ) || mount_point.starts_with("/run/user/") // User session tmpfs } /// Calculate memory usage status based on thresholds fn calculate_memory_status(&self, usage_percent: f32) -> Status { self.usage_thresholds.evaluate(usage_percent) } } #[async_trait] impl Collector for MemoryCollector { async fn collect_structured(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> { debug!("Collecting memory metrics"); let start = std::time::Instant::now(); // Parse memory info from /proc/meminfo let info = self.parse_meminfo().await?; // Populate memory data directly self.populate_memory_data(&info, agent_data).await?; // Collect tmpfs data self.populate_tmpfs_data(agent_data).await?; let duration = start.elapsed(); debug!("Memory collection completed in {:?}", duration); // Efficiency check: warn if collection takes too long if duration.as_millis() > 1 { debug!( "Memory collection took {}ms - consider optimization", duration.as_millis() ); } // Calculate status using thresholds agent_data.system.memory.usage_status = self.calculate_memory_status(agent_data.system.memory.usage_percent); Ok(()) } } /// Internal structure for parsing /proc/meminfo #[derive(Default)] struct MemoryInfo { total_kb: u64, available_kb: u64, free_kb: u64, buffers_kb: u64, cached_kb: u64, swap_total_kb: u64, swap_free_kb: u64, }