Christoffer Martinsson b444c88ea0
All checks were successful
Build and Release / build-and-release (push) Successful in 1m54s
Replace external commands with native Rust APIs
Significant performance improvements by eliminating subprocess spawning:

- Replace 'ip' commands with rtnetlink for network interface discovery
- Replace 'docker ps/images' with bollard Docker API client
- Replace 'systemctl list-units' with zbus D-Bus for systemd interaction
- Replace 'df' with statvfs() syscall for filesystem statistics
- Replace 'lsblk' with /proc/mounts parsing

Add interval-based caching to collectors:
- DiskCollector now respects interval_seconds configuration
- SystemdCollector now respects interval_seconds configuration
- CpuCollector now respects interval_seconds configuration

Remove unused command communication infrastructure:
- Remove port 6131 ZMQ command receiver
- Clean up unused AgentCommand types

Dependencies added:
- rtnetlink = "0.14"
- netlink-packet-route = "0.19"
- bollard = "0.17"
- zbus = "4.0"
- nix (fs features for statvfs)
2025-11-28 11:27:33 +01:00

225 lines
8.2 KiB
Rust

use async_trait::async_trait;
use cm_dashboard_shared::{AgentData, TmpfsData, HysteresisThresholds, Status};
use tracing::debug;
use super::{utils, Collector, CollectorError};
use crate::config::MemoryConfig;
/// Memory metrics collector covering RAM, swap and tmpfs usage.
///
/// How the data is gathered:
/// - RAM and swap figures come from a single read of `/proc/meminfo`.
/// - tmpfs mounts are discovered by scanning `/proc/mounts` and sized with
///   the `statvfs` syscall.
/// - No external processes are spawned; the design target is well under a
///   millisecond per collection.
pub struct MemoryCollector {
    /// Warning/critical thresholds used to derive the memory usage status.
    usage_thresholds: HysteresisThresholds,
}
impl MemoryCollector {
    /// Builds a collector from the memory section of the agent configuration.
    ///
    /// `usage_warning_percent` and `usage_critical_percent` are passed to
    /// `HysteresisThresholds::new`. Any recovery gap is applied inside that type;
    /// an earlier comment mentioned a 10% gap (NOTE(review): confirm in
    /// `cm_dashboard_shared`).
    pub fn new(config: MemoryConfig) -> Self {
        let usage_thresholds = HysteresisThresholds::new(
            config.usage_warning_percent,
            config.usage_critical_percent,
        );
        Self { usage_thresholds }
    }

    /// Parses `/proc/meminfo` in a single pass.
    ///
    /// Lines have the form `"MemTotal:       16384000 kB"`. Only the fields
    /// stored in `MemoryInfo` are kept. Lines without a colon, or whose value
    /// does not parse as an integer, are skipped.
    ///
    /// If `MemAvailable` is absent (older kernels; the field was added in
    /// Linux 3.14), available memory is estimated as
    /// `MemFree + Buffers + Cached`.
    ///
    /// # Errors
    /// - Propagates read errors from `utils::read_proc_file`.
    /// - Returns `CollectorError::Parse` if `MemTotal` is missing or zero.
    async fn parse_meminfo(&self) -> Result<MemoryInfo, CollectorError> {
        let content = utils::read_proc_file("/proc/meminfo")?;
        let mut info = MemoryInfo::default();
        // Track presence separately from the value. A genuine `MemAvailable: 0`
        // (memory exhausted) must not trigger the optimistic legacy estimate.
        let mut has_available = false;

        for line in content.lines() {
            // "Key:   12345 kB" -> ("Key", 12345); the unit suffix is ignored.
            let parsed = line.split_once(':').and_then(|(key, rest)| {
                let number_str = rest.split_whitespace().next()?;
                utils::parse_u64(number_str).ok().map(|value_kb| (key, value_kb))
            });
            let Some((key, value_kb)) = parsed else {
                continue;
            };
            match key {
                "MemTotal" => info.total_kb = value_kb,
                "MemAvailable" => {
                    info.available_kb = value_kb;
                    has_available = true;
                }
                "MemFree" => info.free_kb = value_kb,
                "Buffers" => info.buffers_kb = value_kb,
                "Cached" => info.cached_kb = value_kb,
                "SwapTotal" => info.swap_total_kb = value_kb,
                "SwapFree" => info.swap_free_kb = value_kb,
                _ => {} // Other fields are not needed.
            }
        }

        // MemTotal is the denominator for every percentage computed later.
        if info.total_kb == 0 {
            return Err(CollectorError::Parse {
                value: "MemTotal".to_string(),
                error: "MemTotal not found or zero in /proc/meminfo".to_string(),
            });
        }

        if !has_available {
            info.available_kb = info
                .free_kb
                .saturating_add(info.buffers_kb)
                .saturating_add(info.cached_kb);
        }

        Ok(info)
    }

    /// Writes RAM and swap figures from `info` into `agent_data.system.memory`.
    ///
    /// Sizes are converted from kB to GiB (divide by 1024²). `usage_percent` is
    /// `(total - available) / total * 100`. It is reported as 0 if `total_kb` is
    /// 0, which `parse_meminfo` already rejects.
    async fn populate_memory_data(&self, info: &MemoryInfo, agent_data: &mut AgentData) -> Result<(), CollectorError> {
        const KB_PER_GB: f32 = 1024.0 * 1024.0;

        // saturating_sub: the legacy MemAvailable estimate, or counters read
        // mid-update, could exceed the total. Plain `-` would panic in debug
        // builds and wrap to a huge value in release builds.
        let used_kb = info.total_kb.saturating_sub(info.available_kb);
        let swap_used_kb = info.swap_total_kb.saturating_sub(info.swap_free_kb);

        let memory = &mut agent_data.system.memory;
        memory.usage_percent = if info.total_kb == 0 {
            0.0
        } else {
            (used_kb as f32 / info.total_kb as f32) * 100.0
        };
        memory.total_gb = info.total_kb as f32 / KB_PER_GB;
        memory.used_gb = used_kb as f32 / KB_PER_GB;
        // Both swap values are 0 when swap is absent or the fields are missing.
        memory.swap_total_gb = info.swap_total_kb as f32 / KB_PER_GB;
        memory.swap_used_gb = swap_used_kb as f32 / KB_PER_GB;
        Ok(())
    }

    /// Replaces `agent_data.system.memory.tmpfs` with current usage of the
    /// monitored tmpfs mounts.
    ///
    /// Each mount is sized with `statvfs`. Mounts that fail `statvfs` (logged
    /// at debug level) or report zero total size are skipped. The resulting
    /// list is sorted by mount path for a stable display order.
    ///
    /// # Errors
    /// Propagates errors from reading `/proc/mounts`. In that case the
    /// existing tmpfs list is left untouched.
    async fn populate_tmpfs_data(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
        use nix::sys::statvfs::statvfs;
        const BYTES_PER_GB: f32 = 1024.0 * 1024.0 * 1024.0;

        let tmpfs_mounts = self.discover_tmpfs_mounts()?;

        // Replace rather than append, so collecting repeatedly into the same
        // AgentData does not accumulate duplicate entries.
        let tmpfs = &mut agent_data.system.memory.tmpfs;
        tmpfs.clear();

        if tmpfs_mounts.is_empty() {
            debug!("No tmpfs mounts found to monitor");
            return Ok(());
        }

        for mount_point in tmpfs_mounts {
            let stat = match statvfs(mount_point.as_str()) {
                Ok(stat) => stat,
                Err(e) => {
                    debug!("Failed to get stats for tmpfs mount {}: {}", mount_point, e);
                    continue;
                }
            };

            let block_size = stat.fragment_size() as u64;
            let total_bytes = stat.blocks() as u64 * block_size;
            // Skip mounts reporting zero size before doing any arithmetic on it.
            if total_bytes == 0 {
                continue;
            }
            let available_bytes = stat.blocks_available() as u64 * block_size;
            let used_bytes = total_bytes.saturating_sub(available_bytes);

            tmpfs.push(TmpfsData {
                mount: mount_point,
                usage_percent: (used_bytes as f32 / total_bytes as f32) * 100.0,
                used_gb: used_bytes as f32 / BYTES_PER_GB,
                total_gb: total_bytes as f32 / BYTES_PER_GB,
            });
        }

        // Consistent display order across collections.
        tmpfs.sort_by(|a, b| a.mount.cmp(&b.mount));
        Ok(())
    }

    /// Returns the deduplicated, sorted list of tmpfs mount points from
    /// `/proc/mounts` that pass `should_monitor_tmpfs`.
    ///
    /// `/proc/mounts` lines are whitespace-separated:
    /// `device mount_point fs_type options ...`. The kernel octal-escapes
    /// special characters in paths (e.g. `\040` for a space). No unescaping is
    /// done here, because the monitored paths contain no such characters.
    ///
    /// # Errors
    /// Propagates read errors from `utils::read_proc_file`.
    fn discover_tmpfs_mounts(&self) -> Result<Vec<String>, CollectorError> {
        let content = utils::read_proc_file("/proc/mounts")?;

        let mut tmpfs_mounts: Vec<String> = content
            .lines()
            .filter_map(|line| {
                let mut fields = line.split_whitespace();
                let _device = fields.next()?;
                let mount_point = fields.next()?;
                let fs_type = fields.next()?;
                // Skip internal/system tmpfs mounts that are not useful to display.
                (fs_type == "tmpfs" && self.should_monitor_tmpfs(mount_point))
                    .then(|| mount_point.to_string())
            })
            .collect();

        // A path can appear more than once (e.g. stacked mounts); report it once.
        tmpfs_mounts.sort_unstable();
        tmpfs_mounts.dedup();

        debug!("Discovered {} tmpfs mounts: {:?}", tmpfs_mounts.len(), tmpfs_mounts);
        Ok(tmpfs_mounts)
    }

    /// Returns true for tmpfs mount points worth monitoring.
    ///
    /// Matches a fixed set of well-known paths plus per-user session
    /// directories under `/run/user/`.
    fn should_monitor_tmpfs(&self, mount_point: &str) -> bool {
        matches!(
            mount_point,
            "/tmp" | "/var/tmp" | "/dev/shm" | "/run" | "/var/log"
        ) || mount_point.starts_with("/run/user/")
    }

    /// Maps a memory usage percentage to a `Status` using the configured
    /// hysteresis thresholds.
    fn calculate_memory_status(&self, usage_percent: f32) -> Status {
        self.usage_thresholds.evaluate(usage_percent)
    }
}
#[async_trait]
impl Collector for MemoryCollector {
    /// Fills `agent_data.system.memory` with RAM, swap and tmpfs metrics.
    ///
    /// Steps:
    /// 1. Parse `/proc/meminfo` and write the memory figures.
    /// 2. Write tmpfs usage.
    /// 3. Derive `usage_status` from `usage_percent` and the configured
    ///    thresholds.
    ///
    /// # Errors
    /// Propagates any `CollectorError` raised while reading or parsing
    /// `/proc/meminfo` or `/proc/mounts`. In that case `usage_status` is not
    /// updated.
    async fn collect_structured(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
        debug!("Collecting memory metrics");
        let timer = std::time::Instant::now();

        let meminfo = self.parse_meminfo().await?;
        self.populate_memory_data(&meminfo, agent_data).await?;
        self.populate_tmpfs_data(agent_data).await?;

        let elapsed = timer.elapsed();
        debug!("Memory collection completed in {:?}", elapsed);

        // Slow-collection hint. It is logged at debug level, not warn, and only
        // once the whole-millisecond count exceeds 1. The timing does not
        // include the status evaluation below.
        let elapsed_ms = elapsed.as_millis();
        if elapsed_ms > 1 {
            debug!(
                "Memory collection took {}ms - consider optimization",
                elapsed_ms
            );
        }

        let memory = &mut agent_data.system.memory;
        memory.usage_status = self.calculate_memory_status(memory.usage_percent);
        Ok(())
    }
}
/// Raw values extracted from `/proc/meminfo`, all in kB as reported by the kernel.
///
/// Plain data: `Default` gives all-zero values, which `parse_meminfo` treats
/// as "field not seen". `Debug`/`Clone`/`Copy` make it easy to log and pass
/// around.
#[derive(Debug, Default, Clone, Copy)]
struct MemoryInfo {
    /// `MemTotal`: total usable RAM.
    total_kb: u64,
    /// `MemAvailable`, or an estimate from free + buffers + cached when absent.
    available_kb: u64,
    /// `MemFree`: completely unused RAM.
    free_kb: u64,
    /// `Buffers`: memory used for block-device buffers.
    buffers_kb: u64,
    /// `Cached`: page cache.
    cached_kb: u64,
    /// `SwapTotal`: total swap space.
    swap_total_kb: u64,
    /// `SwapFree`: unused swap space.
    swap_free_kb: u64,
}