Files
cm-dashboard/agent/src/collectors/memory.rs
Christoffer Martinsson a7b69b8ae7
All checks were successful
Build and Release / build-and-release (push) Successful in 1m19s
Fix duplicate data by clearing vectors before collection
Collectors now clear their target vectors (tmpfs, drives, pools, services)
before populating to prevent duplicates when updating cached AgentData.

- Clear tmpfs list in memory collector
- Clear drives and pools in disk collector
- Clear services in systemd collector
- Bump version to v0.1.231
2025-12-01 13:21:26 +01:00

243 lines
8.8 KiB
Rust

use async_trait::async_trait;
use cm_dashboard_shared::{AgentData, TmpfsData, HysteresisThresholds, Status};
use tracing::debug;
use super::{utils, Collector, CollectorError};
use crate::config::MemoryConfig;
/// Collector for system memory, swap and tmpfs usage.
///
/// How the data is gathered:
/// - Memory and swap figures come from a single `/proc/meminfo` read; no
///   process is spawned for these basic metrics.
/// - tmpfs usage is gathered by running `df` (wrapped in `timeout`) once for
///   all monitored tmpfs mounts, so a collection run does spawn a process
///   whenever at least one monitored tmpfs mount exists.
/// - `collect_structured` emits a debug message when a run takes more than
///   1 ms (measured in whole milliseconds).
pub struct MemoryCollector {
/// Thresholds built from the configured warning and critical usage
/// percentages; used to turn the memory usage percentage into a `Status`.
usage_thresholds: HysteresisThresholds,
}
impl MemoryCollector {
/// Build a collector from the memory section of the agent configuration.
///
/// The warning and critical usage percentages from `config` are handed to
/// `HysteresisThresholds::new`; how the recovery gap is derived is decided
/// by that type, not here.
pub fn new(config: MemoryConfig) -> Self {
    Self {
        usage_thresholds: HysteresisThresholds::new(
            config.usage_warning_percent,
            config.usage_critical_percent,
        ),
    }
}
/// Read `/proc/meminfo` and extract the counters this collector needs.
///
/// Each line has the form `MemTotal: 16384000 kB`. The text before the first
/// colon is the key and the first whitespace-separated token after it is the
/// value in kB. Lines without a colon, or whose value does not parse, are
/// skipped, as are keys the collector does not use.
///
/// # Errors
/// Propagates the error from `utils::read_proc_file`, and returns
/// `CollectorError::Parse` when `MemTotal` is missing or zero.
async fn parse_meminfo(&self) -> Result<MemoryInfo, CollectorError> {
    let meminfo_text = utils::read_proc_file("/proc/meminfo")?;
    let mut parsed = MemoryInfo::default();

    for entry in meminfo_text.lines() {
        // Split "Key: value kB" at the first colon.
        let (field, remainder) = match entry.split_once(':') {
            Some(pair) => pair,
            None => continue,
        };

        // The number is the first token of the remainder (format: " 12345 kB").
        let amount_kb = match remainder
            .split_whitespace()
            .next()
            .map(|token| utils::parse_u64(token))
        {
            Some(Ok(amount)) => amount,
            _ => continue,
        };

        match field {
            "MemTotal" => parsed.total_kb = amount_kb,
            "MemAvailable" => parsed.available_kb = amount_kb,
            "MemFree" => parsed.free_kb = amount_kb,
            "Buffers" => parsed.buffers_kb = amount_kb,
            "Cached" => parsed.cached_kb = amount_kb,
            "SwapTotal" => parsed.swap_total_kb = amount_kb,
            "SwapFree" => parsed.swap_free_kb = amount_kb,
            _ => {} // Not a field we report
        }
    }

    // MemTotal is the one field every later calculation depends on.
    if parsed.total_kb == 0 {
        return Err(CollectorError::Parse {
            value: "MemTotal".to_string(),
            error: "MemTotal not found or zero in /proc/meminfo".to_string(),
        });
    }

    // Kernels that do not report MemAvailable leave it at zero; approximate
    // it from the free, buffer and page-cache counters instead.
    if parsed.available_kb == 0 {
        parsed.available_kb = parsed.free_kb + parsed.buffers_kb + parsed.cached_kb;
    }

    Ok(parsed)
}
/// Populate the memory and swap fields of `agent_data` from parsed meminfo.
///
/// Writes `usage_percent`, `total_gb`, `used_gb`, `swap_total_gb` and
/// `swap_used_gb` under `agent_data.system.memory`. Used memory is
/// `total - available`; used swap is `swap_total - swap_free`.
///
/// Both subtractions saturate at zero. The available figure can exceed the
/// total (for example when it was approximated from free + buffers + cached,
/// or when a container presents inconsistent values), and a plain `u64`
/// subtraction would panic in debug builds or wrap to an enormous value in
/// release builds.
///
/// Always returns `Ok(())`; the `Result` is kept for interface stability.
async fn populate_memory_data(&self, info: &MemoryInfo, agent_data: &mut AgentData) -> Result<(), CollectorError> {
    // /proc/meminfo reports kB; 1024 * 1024 kB make one GB as displayed here.
    const KB_PER_GB: f32 = 1024.0 * 1024.0;

    let used_kb = info.total_kb.saturating_sub(info.available_kb);
    let swap_used_kb = info.swap_total_kb.saturating_sub(info.swap_free_kb);

    // Guard the division so a zero total yields 0% instead of NaN.
    let usage_percent = if info.total_kb == 0 {
        0.0
    } else {
        (used_kb as f32 / info.total_kb as f32) * 100.0
    };

    // Populate basic memory fields
    agent_data.system.memory.usage_percent = usage_percent;
    agent_data.system.memory.total_gb = info.total_kb as f32 / KB_PER_GB;
    agent_data.system.memory.used_gb = used_kb as f32 / KB_PER_GB;

    // Populate swap data (both are zero when no swap is configured)
    agent_data.system.memory.swap_total_gb = info.swap_total_kb as f32 / KB_PER_GB;
    agent_data.system.memory.swap_used_gb = swap_used_kb as f32 / KB_PER_GB;

    Ok(())
}
/// Populate tmpfs usage data into `agent_data.system.memory.tmpfs`.
///
/// Discovers the monitored tmpfs mounts, runs a single
/// `timeout 2 df --output=target,size,used --block-size=1 <mounts...>` and
/// appends one `TmpfsData` entry per mount that `df` reported with a
/// non-zero size. The list is then sorted by mount point.
///
/// `df` rows are matched to mount points by the `target` column rather than
/// by position. `df` omits the row for any mount it cannot stat (or for
/// everything after it is killed by `timeout`), and positional matching
/// would then attribute the remaining rows to the wrong mounts.
///
/// # Errors
/// Propagates errors from `discover_tmpfs_mounts` and returns
/// `CollectorError::SystemRead` when the `timeout` process cannot be spawned.
async fn populate_tmpfs_data(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
    const BYTES_PER_GB: f32 = 1024.0 * 1024.0 * 1024.0;

    // Discover all tmpfs mount points
    let tmpfs_mounts = self.discover_tmpfs_mounts()?;
    if tmpfs_mounts.is_empty() {
        debug!("No tmpfs mounts found to monitor");
        return Ok(());
    }

    // Get usage data for all tmpfs mounts at once using df (with 2 second timeout).
    // NOTE(review): `Command::output` blocks the calling thread until the
    // child exits, which stalls the async executor thread for up to ~2 s.
    let mut df_args = vec!["2", "df", "--output=target,size,used", "--block-size=1"];
    df_args.extend(tmpfs_mounts.iter().map(|s| s.as_str()));
    let df_output = std::process::Command::new("timeout")
        .args(&df_args[..])
        .output()
        .map_err(|e| CollectorError::SystemRead {
            path: "tmpfs mounts".to_string(),
            error: e.to_string(),
        })?;

    if !df_output.status.success() {
        // df still prints rows for the mounts it could read, so keep going.
        debug!(
            "df for tmpfs mounts finished with {}; using whatever output it produced",
            df_output.status
        );
    }

    let df_str = String::from_utf8_lossy(&df_output.stdout);

    // Parse every data row into (target, size_bytes, used_bytes); skip the header.
    let df_rows: Vec<(&str, u64, u64)> = df_str
        .lines()
        .skip(1)
        .filter_map(|row| {
            let mut columns = row.split_whitespace();
            let target = columns.next()?;
            let size_bytes = columns.next()?.parse().unwrap_or(0);
            let used_bytes = columns.next()?.parse().unwrap_or(0);
            Some((target, size_bytes, used_bytes))
        })
        .collect();

    // Process each tmpfs mount, looking its row up by mount point
    for mount_point in &tmpfs_mounts {
        let (total_bytes, used_bytes) =
            match df_rows.iter().find(|row| row.0 == mount_point.as_str()) {
                Some(&(_, total, used)) => (total, used),
                None => {
                    debug!("No df output for tmpfs mount: {}", mount_point);
                    continue;
                }
            };

        // A zero-sized filesystem has no meaningful usage percentage.
        if total_bytes == 0 {
            continue;
        }

        // Add to tmpfs list
        agent_data.system.memory.tmpfs.push(TmpfsData {
            mount: mount_point.clone(),
            usage_percent: (used_bytes as f32 / total_bytes as f32) * 100.0,
            used_gb: used_bytes as f32 / BYTES_PER_GB,
            total_gb: total_bytes as f32 / BYTES_PER_GB,
        });
    }

    // Sort tmpfs mounts by mount point for consistent display order
    agent_data.system.memory.tmpfs.sort_by(|a, b| a.mount.cmp(&b.mount));
    Ok(())
}
/// List the tmpfs mount points from `/proc/mounts` that should be monitored.
///
/// A line qualifies when it has at least three whitespace-separated columns,
/// the third (filesystem type) is `tmpfs`, and `should_monitor_tmpfs`
/// accepts the second (mount point). Order follows `/proc/mounts`.
///
/// # Errors
/// Propagates the error from `utils::read_proc_file`.
fn discover_tmpfs_mounts(&self) -> Result<Vec<String>, CollectorError> {
    let mounts_table = utils::read_proc_file("/proc/mounts")?;

    let tmpfs_mounts: Vec<String> = mounts_table
        .lines()
        .filter_map(|entry| {
            let mut columns = entry.split_whitespace();
            let _source = columns.next()?;
            let target = columns.next()?;
            let fs_type = columns.next()?;
            // Skip system/internal tmpfs mounts that are not worth monitoring
            if fs_type == "tmpfs" && self.should_monitor_tmpfs(target) {
                Some(target.to_string())
            } else {
                None
            }
        })
        .collect();

    debug!("Discovered {} tmpfs mounts: {:?}", tmpfs_mounts.len(), tmpfs_mounts);
    Ok(tmpfs_mounts)
}
/// Decide whether a tmpfs mount point is worth reporting.
///
/// Accepts a fixed set of well-known mount points plus anything below
/// `/run/user/` (per-user session tmpfs); everything else is rejected.
fn should_monitor_tmpfs(&self, mount_point: &str) -> bool {
    match mount_point {
        "/tmp" | "/var/tmp" | "/dev/shm" | "/run" | "/var/log" => true,
        other => other.starts_with("/run/user/"),
    }
}
/// Map a memory usage percentage to a `Status` using the collector's
/// configured thresholds.
fn calculate_memory_status(&self, usage_percent: f32) -> Status {
    let thresholds = &self.usage_thresholds;
    thresholds.evaluate(usage_percent)
}
}
#[async_trait]
impl Collector for MemoryCollector {
    /// Collect memory, swap and tmpfs metrics into `agent_data`.
    ///
    /// Steps, in order:
    /// 1. Clear the tmpfs list so a reused `AgentData` does not accumulate
    ///    duplicates.
    /// 2. Parse `/proc/meminfo` and populate the memory and swap fields.
    /// 3. Derive `usage_status` from the fresh `usage_percent`. This happens
    ///    before tmpfs collection so that a tmpfs/`df` failure cannot leave a
    ///    stale status next to an updated usage percentage.
    /// 4. Collect tmpfs usage.
    ///
    /// # Errors
    /// Returns the first error from meminfo parsing, memory population or
    /// tmpfs collection.
    async fn collect_structured(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
        debug!("Collecting memory metrics");
        let start = std::time::Instant::now();

        // Clear tmpfs list to prevent duplicates when updating cached data
        agent_data.system.memory.tmpfs.clear();

        // Parse memory info from /proc/meminfo
        let info = self.parse_meminfo().await?;

        // Populate memory data directly
        self.populate_memory_data(&info, agent_data).await?;

        // Calculate status using thresholds while the memory figures are
        // known to be fresh, independent of whether tmpfs collection succeeds.
        agent_data.system.memory.usage_status =
            self.calculate_memory_status(agent_data.system.memory.usage_percent);

        // Collect tmpfs data
        self.populate_tmpfs_data(agent_data).await?;

        let duration = start.elapsed();
        debug!("Memory collection completed in {:?}", duration);

        // Efficiency check: log if collection takes too long
        if duration.as_millis() > 1 {
            debug!(
                "Memory collection took {}ms - consider optimization",
                duration.as_millis()
            );
        }

        Ok(())
    }
}
/// Internal structure holding the values parsed from /proc/meminfo.
///
/// All values are in kB, as reported by the kernel. Fields default to zero
/// and stay zero when the corresponding key is absent from the file.
#[derive(Default)]
struct MemoryInfo {
/// `MemTotal`; `parse_meminfo` rejects the file when this is zero.
total_kb: u64,
/// `MemAvailable`; when zero after parsing, `parse_meminfo` replaces it
/// with `free_kb + buffers_kb + cached_kb`.
available_kb: u64,
/// `MemFree`.
free_kb: u64,
/// `Buffers`.
buffers_kb: u64,
/// `Cached`.
cached_kb: u64,
/// `SwapTotal`.
swap_total_kb: u64,
/// `SwapFree`.
swap_free_kb: u64,
}