Compare commits
24 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c6817537a8 | |||
| 2189d34b16 | |||
| 28cfd5758f | |||
| 5deb8cf8d8 | |||
| 0e01813ff5 | |||
| c3c9507a42 | |||
| 4d77ffe17e | |||
| 14f74b4cac | |||
| 67b686f8c7 | |||
| e3996fdb84 | |||
| f94ca60e69 | |||
| c19ff56df8 | |||
| fe2f604703 | |||
| 8bfd416327 | |||
| 85c6c624fb | |||
| eab3f17428 | |||
| 7ad149bbe4 | |||
| b444c88ea0 | |||
| 317cf76bd1 | |||
| 0db1a165b9 | |||
| 3c2955376d | |||
| f09ccabc7f | |||
| 43dd5a901a | |||
| 01e1f33b66 |
80
CLAUDE.md
80
CLAUDE.md
@@ -156,86 +156,6 @@ Complete migration from string-based metrics to structured JSON data. Eliminates
|
||||
- ✅ Backward compatibility via bridge conversion to existing UI widgets
|
||||
- ✅ All string parsing bugs eliminated
|
||||
|
||||
### Cached Collector Architecture (✅ IMPLEMENTED)
|
||||
|
||||
**Problem:** Blocking collectors prevent timely ZMQ transmission, causing false "host offline" alerts.
|
||||
|
||||
**Previous (Sequential Blocking):**
|
||||
```
|
||||
Every 1 second:
|
||||
└─ collect_all_data() [BLOCKS for 2-10+ seconds]
|
||||
├─ CPU (fast: 10ms)
|
||||
├─ Memory (fast: 20ms)
|
||||
├─ Disk SMART (slow: 3s per drive × 4 drives = 12s)
|
||||
├─ Service disk usage (slow: 2-8s per service)
|
||||
└─ Docker (medium: 500ms)
|
||||
└─ send_via_zmq() [Only after ALL collection completes]
|
||||
|
||||
Result: If any collector takes >10s → "host offline" false alert
|
||||
```
|
||||
|
||||
**New (Cached Independent Collectors):**
|
||||
```
|
||||
Shared Cache: Arc<RwLock<AgentData>>
|
||||
|
||||
Background Collectors (independent async tasks):
|
||||
├─ Fast collectors (CPU, RAM, Network)
|
||||
│ └─ Update cache every 1 second
|
||||
├─ Medium collectors (Services, Docker)
|
||||
│ └─ Update cache every 5 seconds
|
||||
└─ Slow collectors (Disk usage, SMART data)
|
||||
└─ Update cache every 60 seconds
|
||||
|
||||
ZMQ Sender (separate async task):
|
||||
Every 1 second:
|
||||
└─ Read current cache
|
||||
└─ Send via ZMQ [Always instant, never blocked]
|
||||
```
|
||||
|
||||
**Benefits:**
|
||||
- ✅ ZMQ sends every 1 second regardless of collector speed
|
||||
- ✅ No false "host offline" alerts from slow collectors
|
||||
- ✅ Different update rates for different metrics (CPU=1s, SMART=60s)
|
||||
- ✅ System stays responsive even with slow operations
|
||||
- ✅ Slow collectors can use longer timeouts without blocking
|
||||
|
||||
**Implementation Details:**
|
||||
- **Shared cache**: `Arc<RwLock<AgentData>>` initialized at agent startup
|
||||
- **Collector intervals**: Fully configurable via NixOS config (`interval_seconds` per collector)
|
||||
- Recommended: Fast (1-10s): CPU, Memory, Network
|
||||
- Recommended: Medium (30-60s): Backup, NixOS
|
||||
- Recommended: Slow (60-300s): Disk, Systemd
|
||||
- **Independent tasks**: Each collector spawned as separate tokio task in `Agent::new()`
|
||||
- **Cache updates**: Collectors acquire write lock → update → release immediately
|
||||
- **ZMQ sender**: Main loop reads cache every `collection_interval_seconds` and broadcasts
|
||||
- **Notification check**: Runs every `notifications.check_interval_seconds`
|
||||
- **Lock strategy**: Short-lived write locks prevent blocking, read locks for transmission
|
||||
- **Stale data**: Acceptable for slow-changing metrics (SMART data, disk usage)
|
||||
|
||||
**Configuration (NixOS):**
|
||||
All intervals and timeouts configurable in `services/cm-dashboard.nix`:
|
||||
|
||||
Collection Intervals:
|
||||
- `collectors.cpu.interval_seconds` (default: 10s)
|
||||
- `collectors.memory.interval_seconds` (default: 2s)
|
||||
- `collectors.disk.interval_seconds` (default: 300s)
|
||||
- `collectors.systemd.interval_seconds` (default: 10s)
|
||||
- `collectors.backup.interval_seconds` (default: 60s)
|
||||
- `collectors.network.interval_seconds` (default: 10s)
|
||||
- `collectors.nixos.interval_seconds` (default: 60s)
|
||||
- `notifications.check_interval_seconds` (default: 30s)
|
||||
- `collection_interval_seconds` - ZMQ transmission rate (default: 2s)
|
||||
|
||||
Command Timeouts (prevent resource leaks from hung commands):
|
||||
- `collectors.disk.command_timeout_seconds` (default: 30s) - lsblk, smartctl, etc.
|
||||
- `collectors.systemd.command_timeout_seconds` (default: 15s) - systemctl, docker, du
|
||||
- `collectors.network.command_timeout_seconds` (default: 10s) - ip route, ip addr
|
||||
|
||||
**Code Locations:**
|
||||
- agent/src/agent.rs:59-133 - Collector task spawning
|
||||
- agent/src/agent.rs:151-179 - Independent collector task runner
|
||||
- agent/src/agent.rs:199-207 - ZMQ sender in main loop
|
||||
|
||||
### Maintenance Mode
|
||||
|
||||
- Agent checks for `/tmp/cm-maintenance` file before sending notifications
|
||||
|
||||
6
Cargo.lock
generated
6
Cargo.lock
generated
@@ -279,7 +279,7 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"
|
||||
|
||||
[[package]]
|
||||
name = "cm-dashboard"
|
||||
version = "0.1.193"
|
||||
version = "0.1.206"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"chrono",
|
||||
@@ -301,7 +301,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cm-dashboard-agent"
|
||||
version = "0.1.193"
|
||||
version = "0.1.207"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
@@ -324,7 +324,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cm-dashboard-shared"
|
||||
version = "0.1.193"
|
||||
version = "0.1.207"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"serde",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "cm-dashboard-agent"
|
||||
version = "0.1.194"
|
||||
version = "0.1.208"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
|
||||
@@ -1,14 +1,13 @@
|
||||
use anyhow::Result;
|
||||
use gethostname::gethostname;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::time::interval;
|
||||
use tracing::{debug, error, info};
|
||||
|
||||
use crate::communication::{AgentCommand, ZmqHandler};
|
||||
use crate::communication::ZmqHandler;
|
||||
use crate::config::AgentConfig;
|
||||
use crate::collectors::{
|
||||
Collector,
|
||||
backup::BackupCollector,
|
||||
cpu::CpuCollector,
|
||||
disk::DiskCollector,
|
||||
@@ -24,7 +23,7 @@ pub struct Agent {
|
||||
hostname: String,
|
||||
config: AgentConfig,
|
||||
zmq_handler: ZmqHandler,
|
||||
cache: Arc<RwLock<AgentData>>,
|
||||
collectors: Vec<Box<dyn Collector>>,
|
||||
notification_manager: NotificationManager,
|
||||
previous_status: Option<SystemStatus>,
|
||||
}
|
||||
@@ -56,94 +55,39 @@ impl Agent {
|
||||
config.zmq.publisher_port
|
||||
);
|
||||
|
||||
// Initialize shared cache
|
||||
let cache = Arc::new(RwLock::new(AgentData::new(
|
||||
hostname.clone(),
|
||||
env!("CARGO_PKG_VERSION").to_string()
|
||||
)));
|
||||
info!("Initialized shared agent data cache");
|
||||
|
||||
// Spawn independent collector tasks
|
||||
let mut collector_count = 0;
|
||||
|
||||
// CPU collector
|
||||
// Initialize collectors
|
||||
let mut collectors: Vec<Box<dyn Collector>> = Vec::new();
|
||||
|
||||
// Add enabled collectors
|
||||
if config.collectors.cpu.enabled {
|
||||
let cache_clone = cache.clone();
|
||||
let collector = CpuCollector::new(config.collectors.cpu.clone());
|
||||
let interval = config.collectors.cpu.interval_seconds;
|
||||
tokio::spawn(async move {
|
||||
Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "CPU").await;
|
||||
});
|
||||
collector_count += 1;
|
||||
collectors.push(Box::new(CpuCollector::new(config.collectors.cpu.clone())));
|
||||
}
|
||||
|
||||
// Memory collector
|
||||
|
||||
if config.collectors.memory.enabled {
|
||||
let cache_clone = cache.clone();
|
||||
let collector = MemoryCollector::new(config.collectors.memory.clone());
|
||||
let interval = config.collectors.memory.interval_seconds;
|
||||
tokio::spawn(async move {
|
||||
Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "Memory").await;
|
||||
});
|
||||
collector_count += 1;
|
||||
collectors.push(Box::new(MemoryCollector::new(config.collectors.memory.clone())));
|
||||
}
|
||||
|
||||
// Network collector
|
||||
if config.collectors.network.enabled {
|
||||
let cache_clone = cache.clone();
|
||||
let collector = NetworkCollector::new(config.collectors.network.clone());
|
||||
let interval = config.collectors.network.interval_seconds;
|
||||
tokio::spawn(async move {
|
||||
Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "Network").await;
|
||||
});
|
||||
collector_count += 1;
|
||||
}
|
||||
|
||||
// Backup collector
|
||||
if config.collectors.backup.enabled {
|
||||
let cache_clone = cache.clone();
|
||||
let collector = BackupCollector::new();
|
||||
let interval = config.collectors.backup.interval_seconds;
|
||||
tokio::spawn(async move {
|
||||
Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "Backup").await;
|
||||
});
|
||||
collector_count += 1;
|
||||
}
|
||||
|
||||
// NixOS collector
|
||||
if config.collectors.nixos.enabled {
|
||||
let cache_clone = cache.clone();
|
||||
let collector = NixOSCollector::new(config.collectors.nixos.clone());
|
||||
let interval = config.collectors.nixos.interval_seconds;
|
||||
tokio::spawn(async move {
|
||||
Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "NixOS").await;
|
||||
});
|
||||
collector_count += 1;
|
||||
}
|
||||
|
||||
// Disk collector
|
||||
|
||||
if config.collectors.disk.enabled {
|
||||
let cache_clone = cache.clone();
|
||||
let collector = DiskCollector::new(config.collectors.disk.clone());
|
||||
let interval = config.collectors.disk.interval_seconds;
|
||||
tokio::spawn(async move {
|
||||
Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "Disk").await;
|
||||
});
|
||||
collector_count += 1;
|
||||
collectors.push(Box::new(DiskCollector::new(config.collectors.disk.clone())));
|
||||
}
|
||||
|
||||
// Systemd collector
|
||||
|
||||
if config.collectors.systemd.enabled {
|
||||
let cache_clone = cache.clone();
|
||||
let collector = SystemdCollector::new(config.collectors.systemd.clone());
|
||||
let interval = config.collectors.systemd.interval_seconds;
|
||||
tokio::spawn(async move {
|
||||
Self::run_collector_task(cache_clone, collector, Duration::from_secs(interval), "Systemd").await;
|
||||
});
|
||||
collector_count += 1;
|
||||
collectors.push(Box::new(SystemdCollector::new(config.collectors.systemd.clone())));
|
||||
}
|
||||
|
||||
if config.collectors.backup.enabled {
|
||||
collectors.push(Box::new(BackupCollector::new()));
|
||||
}
|
||||
|
||||
info!("Spawned {} independent collector tasks", collector_count);
|
||||
if config.collectors.network.enabled {
|
||||
collectors.push(Box::new(NetworkCollector::new(config.collectors.network.clone())));
|
||||
}
|
||||
|
||||
if config.collectors.nixos.enabled {
|
||||
collectors.push(Box::new(NixOSCollector::new(config.collectors.nixos.clone())));
|
||||
}
|
||||
|
||||
info!("Initialized {} collectors", collectors.len());
|
||||
|
||||
// Initialize notification manager
|
||||
let notification_manager = NotificationManager::new(&config.notifications, &hostname)?;
|
||||
@@ -153,82 +97,42 @@ impl Agent {
|
||||
hostname,
|
||||
config,
|
||||
zmq_handler,
|
||||
cache,
|
||||
collectors,
|
||||
notification_manager,
|
||||
previous_status: None,
|
||||
})
|
||||
}
|
||||
|
||||
/// Independent collector task runner
|
||||
async fn run_collector_task<C>(
|
||||
cache: Arc<RwLock<AgentData>>,
|
||||
collector: C,
|
||||
interval_duration: Duration,
|
||||
name: &str,
|
||||
) where
|
||||
C: crate::collectors::Collector + Send + 'static,
|
||||
{
|
||||
let mut interval_timer = interval(interval_duration);
|
||||
info!("{} collector task started (interval: {:?})", name, interval_duration);
|
||||
|
||||
loop {
|
||||
interval_timer.tick().await;
|
||||
|
||||
// Acquire write lock and update cache
|
||||
{
|
||||
let mut agent_data = cache.write().await;
|
||||
match collector.collect_structured(&mut *agent_data).await {
|
||||
Ok(_) => {
|
||||
debug!("{} collector updated cache", name);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("{} collector failed: {}", name, e);
|
||||
}
|
||||
}
|
||||
} // Release lock immediately after collection
|
||||
}
|
||||
}
|
||||
|
||||
/// Main agent loop with cached data architecture
|
||||
/// Main agent loop with structured data collection
|
||||
pub async fn run(&mut self, mut shutdown_rx: tokio::sync::oneshot::Receiver<()>) -> Result<()> {
|
||||
info!("Starting agent main loop with cached collector architecture");
|
||||
info!("Starting agent main loop");
|
||||
|
||||
// Set up intervals from config
|
||||
// Initial collection
|
||||
if let Err(e) = self.collect_and_broadcast().await {
|
||||
error!("Initial metric collection failed: {}", e);
|
||||
}
|
||||
|
||||
// Set up intervals
|
||||
let mut transmission_interval = interval(Duration::from_secs(
|
||||
self.config.collection_interval_seconds,
|
||||
));
|
||||
let mut notification_interval = interval(Duration::from_secs(
|
||||
self.config.notifications.check_interval_seconds,
|
||||
));
|
||||
let mut command_interval = interval(Duration::from_millis(100));
|
||||
let mut notification_interval = interval(Duration::from_secs(30)); // Check notifications every 30s
|
||||
|
||||
// Skip initial ticks
|
||||
// Skip initial ticks to avoid immediate execution
|
||||
transmission_interval.tick().await;
|
||||
notification_interval.tick().await;
|
||||
command_interval.tick().await;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = transmission_interval.tick() => {
|
||||
// Read current cache state and broadcast via ZMQ
|
||||
let agent_data = self.cache.read().await.clone();
|
||||
if let Err(e) = self.zmq_handler.publish_agent_data(&agent_data).await {
|
||||
error!("Failed to broadcast agent data: {}", e);
|
||||
} else {
|
||||
debug!("Successfully broadcast agent data");
|
||||
if let Err(e) = self.collect_and_broadcast().await {
|
||||
error!("Failed to collect and broadcast metrics: {}", e);
|
||||
}
|
||||
}
|
||||
_ = notification_interval.tick() => {
|
||||
// Read cache and check for status changes
|
||||
let agent_data = self.cache.read().await.clone();
|
||||
if let Err(e) = self.check_status_changes_and_notify(&agent_data).await {
|
||||
error!("Failed to check status changes: {}", e);
|
||||
}
|
||||
}
|
||||
_ = command_interval.tick() => {
|
||||
if let Err(e) = self.handle_commands().await {
|
||||
error!("Error handling commands: {}", e);
|
||||
}
|
||||
// Process any pending notifications
|
||||
// NOTE: With structured data, we might need to implement status tracking differently
|
||||
// For now, we skip this until status evaluation is migrated
|
||||
}
|
||||
_ = &mut shutdown_rx => {
|
||||
info!("Shutdown signal received, stopping agent loop");
|
||||
@@ -241,6 +145,35 @@ impl Agent {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Collect structured data from all collectors and broadcast via ZMQ
|
||||
async fn collect_and_broadcast(&mut self) -> Result<()> {
|
||||
debug!("Starting structured data collection");
|
||||
|
||||
// Initialize empty AgentData
|
||||
let mut agent_data = AgentData::new(self.hostname.clone(), env!("CARGO_PKG_VERSION").to_string());
|
||||
|
||||
// Collect data from all collectors
|
||||
for collector in &self.collectors {
|
||||
if let Err(e) = collector.collect_structured(&mut agent_data).await {
|
||||
error!("Collector failed: {}", e);
|
||||
// Continue with other collectors even if one fails
|
||||
}
|
||||
}
|
||||
|
||||
// Check for status changes and send notifications
|
||||
if let Err(e) = self.check_status_changes_and_notify(&agent_data).await {
|
||||
error!("Failed to check status changes: {}", e);
|
||||
}
|
||||
|
||||
// Broadcast the structured data via ZMQ
|
||||
if let Err(e) = self.zmq_handler.publish_agent_data(&agent_data).await {
|
||||
error!("Failed to broadcast agent data: {}", e);
|
||||
} else {
|
||||
debug!("Successfully broadcast structured agent data");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check for status changes and send notifications
|
||||
async fn check_status_changes_and_notify(&mut self, agent_data: &AgentData) -> Result<()> {
|
||||
@@ -320,39 +253,4 @@ impl Agent {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handle incoming commands from dashboard
|
||||
async fn handle_commands(&mut self) -> Result<()> {
|
||||
// Try to receive a command (non-blocking)
|
||||
if let Ok(Some(command)) = self.zmq_handler.try_receive_command() {
|
||||
info!("Received command: {:?}", command);
|
||||
|
||||
match command {
|
||||
AgentCommand::CollectNow => {
|
||||
info!("Received immediate transmission request");
|
||||
// With cached architecture, collectors run independently
|
||||
// Just send current cache state immediately
|
||||
let agent_data = self.cache.read().await.clone();
|
||||
if let Err(e) = self.zmq_handler.publish_agent_data(&agent_data).await {
|
||||
error!("Failed to broadcast on demand: {}", e);
|
||||
}
|
||||
}
|
||||
AgentCommand::SetInterval { seconds } => {
|
||||
info!("Received interval change request: {}s", seconds);
|
||||
// Note: This would require more complex handling to update the interval
|
||||
// For now, just acknowledge
|
||||
}
|
||||
AgentCommand::ToggleCollector { name, enabled } => {
|
||||
info!("Received collector toggle request: {} -> {}", name, enabled);
|
||||
// Note: This would require more complex handling to enable/disable collectors
|
||||
// For now, just acknowledge
|
||||
}
|
||||
AgentCommand::Ping => {
|
||||
info!("Received ping command");
|
||||
// Maybe send back a pong or status
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
}
|
||||
@@ -119,36 +119,40 @@ impl CpuCollector {
|
||||
utils::parse_u64(content.trim())
|
||||
}
|
||||
|
||||
/// Collect CPU frequency and populate AgentData
|
||||
async fn collect_frequency(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
|
||||
// Try scaling frequency first (more accurate for current frequency)
|
||||
if let Ok(freq) =
|
||||
utils::read_proc_file("/sys/devices/system/cpu/cpu0/cpufreq/scaling_cur_freq")
|
||||
{
|
||||
if let Ok(freq_khz) = utils::parse_u64(freq.trim()) {
|
||||
let freq_mhz = freq_khz as f32 / 1000.0;
|
||||
agent_data.system.cpu.frequency_mhz = freq_mhz;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
/// Collect CPU C-state (idle depth) and populate AgentData
|
||||
async fn collect_cstate(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
|
||||
// Read C-state usage from first CPU (representative of overall system)
|
||||
// C-states indicate CPU idle depth: C1=light sleep, C6=deep sleep, C10=deepest
|
||||
|
||||
// Fallback: parse /proc/cpuinfo for base frequency
|
||||
if let Ok(content) = utils::read_proc_file("/proc/cpuinfo") {
|
||||
for line in content.lines() {
|
||||
if line.starts_with("cpu MHz") {
|
||||
if let Some(freq_str) = line.split(':').nth(1) {
|
||||
if let Ok(freq_mhz) = utils::parse_f32(freq_str) {
|
||||
agent_data.system.cpu.frequency_mhz = freq_mhz;
|
||||
return Ok(());
|
||||
let mut deepest_state = String::from("C0"); // Default to active
|
||||
let mut max_time: u64 = 0;
|
||||
|
||||
// Check C-states from CPU0
|
||||
for state_num in 0..=10 {
|
||||
let time_path = format!("/sys/devices/system/cpu/cpu0/cpuidle/state{}/time", state_num);
|
||||
let name_path = format!("/sys/devices/system/cpu/cpu0/cpuidle/state{}/name", state_num);
|
||||
|
||||
if let Ok(time_str) = utils::read_proc_file(&time_path) {
|
||||
if let Ok(time) = utils::parse_u64(time_str.trim()) {
|
||||
if time > max_time {
|
||||
// This state has most accumulated time
|
||||
if let Ok(name) = utils::read_proc_file(&name_path) {
|
||||
let state_name = name.trim().to_string();
|
||||
// Skip POLL state (not real idle)
|
||||
if state_name != "POLL" {
|
||||
max_time = time;
|
||||
deepest_state = state_name;
|
||||
}
|
||||
}
|
||||
}
|
||||
break; // Only need first CPU entry
|
||||
}
|
||||
} else {
|
||||
// No more states available
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
debug!("CPU frequency not available");
|
||||
// Leave frequency as 0.0 if not available
|
||||
agent_data.system.cpu.cstate = deepest_state;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -165,8 +169,8 @@ impl Collector for CpuCollector {
|
||||
// Collect temperature (optional)
|
||||
self.collect_temperature(agent_data).await?;
|
||||
|
||||
// Collect frequency (optional)
|
||||
self.collect_frequency(agent_data).await?;
|
||||
// Collect C-state (CPU idle depth)
|
||||
self.collect_cstate(agent_data).await?;
|
||||
|
||||
let duration = start.elapsed();
|
||||
debug!("CPU collection completed in {:?}", duration);
|
||||
|
||||
@@ -117,7 +117,7 @@ impl DiskCollector {
|
||||
let mut cmd = Command::new("lsblk");
|
||||
cmd.args(&["-rn", "-o", "NAME,MOUNTPOINT"]);
|
||||
|
||||
let output = run_command_with_timeout(cmd, self.config.command_timeout_seconds).await
|
||||
let output = run_command_with_timeout(cmd, 2).await
|
||||
.map_err(|e| CollectorError::SystemRead {
|
||||
path: "block devices".to_string(),
|
||||
error: e.to_string(),
|
||||
@@ -530,9 +530,6 @@ impl DiskCollector {
|
||||
|
||||
/// Populate drives data into AgentData
|
||||
fn populate_drives_data(&self, physical_drives: &[PhysicalDrive], smart_data: &HashMap<String, SmartData>, agent_data: &mut AgentData) -> Result<(), CollectorError> {
|
||||
// Clear existing drives data to prevent duplicates in cached architecture
|
||||
agent_data.system.storage.drives.clear();
|
||||
|
||||
for drive in physical_drives {
|
||||
let smart = smart_data.get(&drive.name);
|
||||
|
||||
@@ -570,9 +567,6 @@ impl DiskCollector {
|
||||
|
||||
/// Populate pools data into AgentData
|
||||
fn populate_pools_data(&self, mergerfs_pools: &[MergerfsPool], smart_data: &HashMap<String, SmartData>, agent_data: &mut AgentData) -> Result<(), CollectorError> {
|
||||
// Clear existing pools data to prevent duplicates in cached architecture
|
||||
agent_data.system.storage.pools.clear();
|
||||
|
||||
for pool in mergerfs_pools {
|
||||
// Calculate pool health and statuses based on member drive health
|
||||
let (pool_health, health_status, usage_status, data_drive_data, parity_drive_data) = self.calculate_pool_health(pool, smart_data);
|
||||
|
||||
@@ -97,12 +97,9 @@ impl MemoryCollector {
|
||||
|
||||
/// Populate tmpfs data into AgentData
|
||||
async fn populate_tmpfs_data(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
|
||||
// Clear existing tmpfs data to prevent duplicates in cached architecture
|
||||
agent_data.system.memory.tmpfs.clear();
|
||||
|
||||
// Discover all tmpfs mount points
|
||||
let tmpfs_mounts = self.discover_tmpfs_mounts()?;
|
||||
|
||||
|
||||
if tmpfs_mounts.is_empty() {
|
||||
debug!("No tmpfs mounts found to monitor");
|
||||
return Ok(());
|
||||
|
||||
@@ -8,12 +8,12 @@ use crate::config::NetworkConfig;
|
||||
|
||||
/// Network interface collector with physical/virtual classification and link status
|
||||
pub struct NetworkCollector {
|
||||
config: NetworkConfig,
|
||||
_config: NetworkConfig,
|
||||
}
|
||||
|
||||
impl NetworkCollector {
|
||||
pub fn new(config: NetworkConfig) -> Self {
|
||||
Self { config }
|
||||
Self { _config: config }
|
||||
}
|
||||
|
||||
/// Check if interface is physical (not virtual)
|
||||
@@ -50,9 +50,8 @@ impl NetworkCollector {
|
||||
}
|
||||
|
||||
/// Get the primary physical interface (the one with default route)
|
||||
fn get_primary_physical_interface(&self) -> Option<String> {
|
||||
let timeout_str = self.config.command_timeout_seconds.to_string();
|
||||
match Command::new("timeout").args([&timeout_str, "ip", "route", "show", "default"]).output() {
|
||||
fn get_primary_physical_interface() -> Option<String> {
|
||||
match Command::new("timeout").args(["2", "ip", "route", "show", "default"]).output() {
|
||||
Ok(output) if output.status.success() => {
|
||||
let output_str = String::from_utf8_lossy(&output.stdout);
|
||||
// Parse: "default via 192.168.1.1 dev eno1 ..."
|
||||
@@ -111,8 +110,7 @@ impl NetworkCollector {
|
||||
// Parse VLAN configuration
|
||||
let vlan_map = Self::parse_vlan_config();
|
||||
|
||||
let timeout_str = self.config.command_timeout_seconds.to_string();
|
||||
match Command::new("timeout").args([&timeout_str, "ip", "-j", "addr"]).output() {
|
||||
match Command::new("timeout").args(["2", "ip", "-j", "addr"]).output() {
|
||||
Ok(output) if output.status.success() => {
|
||||
let json_str = String::from_utf8_lossy(&output.stdout);
|
||||
|
||||
@@ -197,7 +195,7 @@ impl NetworkCollector {
|
||||
}
|
||||
|
||||
// Assign primary physical interface as parent to virtual interfaces without explicit parent
|
||||
let primary_interface = self.get_primary_physical_interface();
|
||||
let primary_interface = Self::get_primary_physical_interface();
|
||||
if let Some(primary) = primary_interface {
|
||||
for interface in interfaces.iter_mut() {
|
||||
// Only assign parent to virtual interfaces that don't already have one
|
||||
|
||||
@@ -4,7 +4,7 @@ use cm_dashboard_shared::{AgentData, ServiceData, SubServiceData, SubServiceMetr
|
||||
use std::process::Command;
|
||||
use std::sync::RwLock;
|
||||
use std::time::Instant;
|
||||
use tracing::{debug, warn};
|
||||
use tracing::debug;
|
||||
|
||||
use super::{Collector, CollectorError};
|
||||
use crate::config::SystemdConfig;
|
||||
@@ -43,9 +43,10 @@ struct ServiceCacheState {
|
||||
/// Cached service status information from systemctl list-units
|
||||
#[derive(Debug, Clone)]
|
||||
struct ServiceStatusInfo {
|
||||
load_state: String,
|
||||
active_state: String,
|
||||
sub_state: String,
|
||||
memory_bytes: Option<u64>,
|
||||
restart_count: Option<u32>,
|
||||
start_timestamp: Option<u64>,
|
||||
}
|
||||
|
||||
impl SystemdCollector {
|
||||
@@ -86,14 +87,20 @@ impl SystemdCollector {
|
||||
let mut complete_service_data = Vec::new();
|
||||
for service_name in &monitored_services {
|
||||
match self.get_service_status(service_name) {
|
||||
Ok((active_status, _detailed_info)) => {
|
||||
let memory_mb = self.get_service_memory_usage(service_name).await.unwrap_or(0.0);
|
||||
let disk_gb = self.get_service_disk_usage(service_name).await.unwrap_or(0.0);
|
||||
|
||||
Ok(status_info) => {
|
||||
let mut sub_services = Vec::new();
|
||||
|
||||
// Calculate uptime if we have start timestamp
|
||||
let uptime_seconds = status_info.start_timestamp.and_then(|start| {
|
||||
let now = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.ok()?
|
||||
.as_secs();
|
||||
Some(now.saturating_sub(start))
|
||||
});
|
||||
|
||||
// Sub-service metrics for specific services (always include cached results)
|
||||
if service_name.contains("nginx") && active_status == "active" {
|
||||
if service_name.contains("nginx") && status_info.active_state == "active" {
|
||||
let nginx_sites = self.get_nginx_site_metrics();
|
||||
for (site_name, latency_ms) in nginx_sites {
|
||||
let site_status = if latency_ms >= 0.0 && latency_ms < self.config.nginx_latency_critical_ms {
|
||||
@@ -118,7 +125,7 @@ impl SystemdCollector {
|
||||
}
|
||||
}
|
||||
|
||||
if service_name.contains("docker") && active_status == "active" {
|
||||
if service_name.contains("docker") && status_info.active_state == "active" {
|
||||
let docker_containers = self.get_docker_containers();
|
||||
for (container_name, container_status) in docker_containers {
|
||||
// For now, docker containers have no additional metrics
|
||||
@@ -155,11 +162,12 @@ impl SystemdCollector {
|
||||
// Create complete service data
|
||||
let service_data = ServiceData {
|
||||
name: service_name.clone(),
|
||||
memory_mb,
|
||||
disk_gb,
|
||||
user_stopped: false, // TODO: Integrate with service tracker
|
||||
service_status: self.calculate_service_status(service_name, &active_status),
|
||||
service_status: self.calculate_service_status(service_name, &status_info.active_state),
|
||||
sub_services,
|
||||
memory_bytes: status_info.memory_bytes,
|
||||
restart_count: status_info.restart_count,
|
||||
uptime_seconds,
|
||||
};
|
||||
|
||||
// Add to AgentData and cache
|
||||
@@ -254,19 +262,18 @@ impl SystemdCollector {
|
||||
|
||||
/// Auto-discover interesting services to monitor
|
||||
fn discover_services_internal(&self) -> Result<(Vec<String>, std::collections::HashMap<String, ServiceStatusInfo>)> {
|
||||
// First: Get all service unit files
|
||||
let timeout_str = self.config.command_timeout_seconds.to_string();
|
||||
// First: Get all service unit files (with 3 second timeout)
|
||||
let unit_files_output = Command::new("timeout")
|
||||
.args(&[&timeout_str, "systemctl", "list-unit-files", "--type=service", "--no-pager", "--plain"])
|
||||
.args(&["3", "systemctl", "list-unit-files", "--type=service", "--no-pager", "--plain"])
|
||||
.output()?;
|
||||
|
||||
if !unit_files_output.status.success() {
|
||||
return Err(anyhow::anyhow!("systemctl list-unit-files command failed"));
|
||||
}
|
||||
|
||||
// Second: Get runtime status of all units
|
||||
// Second: Get runtime status of all units (with 3 second timeout)
|
||||
let units_status_output = Command::new("timeout")
|
||||
.args(&[&timeout_str, "systemctl", "list-units", "--type=service", "--all", "--no-pager", "--plain"])
|
||||
.args(&["3", "systemctl", "list-units", "--type=service", "--all", "--no-pager", "--plain"])
|
||||
.output()?;
|
||||
|
||||
if !units_status_output.status.success() {
|
||||
@@ -296,14 +303,13 @@ impl SystemdCollector {
|
||||
let fields: Vec<&str> = line.split_whitespace().collect();
|
||||
if fields.len() >= 4 && fields[0].ends_with(".service") {
|
||||
let service_name = fields[0].trim_end_matches(".service");
|
||||
let load_state = fields.get(1).unwrap_or(&"unknown").to_string();
|
||||
let active_state = fields.get(2).unwrap_or(&"unknown").to_string();
|
||||
let sub_state = fields.get(3).unwrap_or(&"unknown").to_string();
|
||||
|
||||
status_cache.insert(service_name.to_string(), ServiceStatusInfo {
|
||||
load_state,
|
||||
active_state,
|
||||
sub_state,
|
||||
memory_bytes: None,
|
||||
restart_count: None,
|
||||
start_timestamp: None,
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -312,9 +318,10 @@ impl SystemdCollector {
|
||||
for service_name in &all_service_names {
|
||||
if !status_cache.contains_key(service_name) {
|
||||
status_cache.insert(service_name.to_string(), ServiceStatusInfo {
|
||||
load_state: "not-loaded".to_string(),
|
||||
active_state: "inactive".to_string(),
|
||||
sub_state: "dead".to_string(),
|
||||
memory_bytes: None,
|
||||
restart_count: None,
|
||||
start_timestamp: None,
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -346,37 +353,60 @@ impl SystemdCollector {
|
||||
Ok((services, status_cache))
|
||||
}
|
||||
|
||||
/// Get service status from cache (if available) or fallback to systemctl
|
||||
fn get_service_status(&self, service: &str) -> Result<(String, String)> {
|
||||
// Try to get status from cache first
|
||||
if let Ok(state) = self.state.read() {
|
||||
if let Some(cached_info) = state.service_status_cache.get(service) {
|
||||
let active_status = cached_info.active_state.clone();
|
||||
let detailed_info = format!(
|
||||
"LoadState={}\nActiveState={}\nSubState={}",
|
||||
cached_info.load_state,
|
||||
cached_info.active_state,
|
||||
cached_info.sub_state
|
||||
);
|
||||
return Ok((active_status, detailed_info));
|
||||
/// Get service status with detailed metrics from systemctl
|
||||
fn get_service_status(&self, service: &str) -> Result<ServiceStatusInfo> {
|
||||
// Always fetch fresh data to get detailed metrics (memory, restarts, uptime)
|
||||
// Note: Cache in service_status_cache only has basic active_state from discovery,
|
||||
// with all detailed metrics set to None. We need fresh systemctl show data.
|
||||
|
||||
let output = Command::new("timeout")
|
||||
.args(&[
|
||||
"2",
|
||||
"systemctl",
|
||||
"show",
|
||||
&format!("{}.service", service),
|
||||
"--property=LoadState,ActiveState,SubState,MemoryCurrent,NRestarts,ExecMainStartTimestamp"
|
||||
])
|
||||
.output()?;
|
||||
|
||||
let output_str = String::from_utf8(output.stdout)?;
|
||||
|
||||
// Parse properties
|
||||
let mut active_state = String::new();
|
||||
let mut memory_bytes = None;
|
||||
let mut restart_count = None;
|
||||
let mut start_timestamp = None;
|
||||
|
||||
for line in output_str.lines() {
|
||||
if let Some(value) = line.strip_prefix("ActiveState=") {
|
||||
active_state = value.to_string();
|
||||
} else if let Some(value) = line.strip_prefix("MemoryCurrent=") {
|
||||
if value != "[not set]" {
|
||||
memory_bytes = value.parse().ok();
|
||||
}
|
||||
} else if let Some(value) = line.strip_prefix("NRestarts=") {
|
||||
restart_count = value.parse().ok();
|
||||
} else if let Some(value) = line.strip_prefix("ExecMainStartTimestamp=") {
|
||||
if value != "[not set]" && !value.is_empty() {
|
||||
// Parse timestamp to seconds since epoch
|
||||
if let Ok(output) = Command::new("date")
|
||||
.args(&["+%s", "-d", value])
|
||||
.output()
|
||||
{
|
||||
if let Ok(timestamp_str) = String::from_utf8(output.stdout) {
|
||||
start_timestamp = timestamp_str.trim().parse().ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback to systemctl if not in cache
|
||||
let timeout_str = self.config.command_timeout_seconds.to_string();
|
||||
let output = Command::new("timeout")
|
||||
.args(&[&timeout_str, "systemctl", "is-active", &format!("{}.service", service)])
|
||||
.output()?;
|
||||
|
||||
let active_status = String::from_utf8(output.stdout)?.trim().to_string();
|
||||
|
||||
// Get more detailed info
|
||||
let output = Command::new("timeout")
|
||||
.args(&[&timeout_str, "systemctl", "show", &format!("{}.service", service), "--property=LoadState,ActiveState,SubState"])
|
||||
.output()?;
|
||||
|
||||
let detailed_info = String::from_utf8(output.stdout)?;
|
||||
Ok((active_status, detailed_info))
|
||||
Ok(ServiceStatusInfo {
|
||||
active_state,
|
||||
memory_bytes,
|
||||
restart_count,
|
||||
start_timestamp,
|
||||
})
|
||||
}
|
||||
|
||||
/// Check if service name matches pattern (supports wildcards like nginx*)
|
||||
@@ -418,81 +448,6 @@ impl SystemdCollector {
|
||||
true
|
||||
}
|
||||
|
||||
/// Get disk usage for a specific service
|
||||
async fn get_service_disk_usage(&self, service_name: &str) -> Result<f32, CollectorError> {
|
||||
// Check if this service has configured directory paths
|
||||
if let Some(dirs) = self.config.service_directories.get(service_name) {
|
||||
// Service has configured paths - use the first accessible one
|
||||
for dir in dirs {
|
||||
if let Some(size) = self.get_directory_size(dir).await {
|
||||
return Ok(size);
|
||||
}
|
||||
}
|
||||
// If configured paths failed, return 0
|
||||
return Ok(0.0);
|
||||
}
|
||||
|
||||
// No configured path - try to get WorkingDirectory from systemctl
|
||||
let timeout_str = self.config.command_timeout_seconds.to_string();
|
||||
let output = Command::new("timeout")
|
||||
.args(&[&timeout_str, "systemctl", "show", &format!("{}.service", service_name), "--property=WorkingDirectory"])
|
||||
.output()
|
||||
.map_err(|e| CollectorError::SystemRead {
|
||||
path: format!("WorkingDirectory for {}", service_name),
|
||||
error: e.to_string(),
|
||||
})?;
|
||||
|
||||
let output_str = String::from_utf8_lossy(&output.stdout);
|
||||
for line in output_str.lines() {
|
||||
if line.starts_with("WorkingDirectory=") && !line.contains("[not set]") {
|
||||
let dir = line.strip_prefix("WorkingDirectory=").unwrap_or("");
|
||||
if !dir.is_empty() && dir != "/" {
|
||||
return Ok(self.get_directory_size(dir).await.unwrap_or(0.0));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(0.0)
|
||||
}
|
||||
|
||||
/// Get size of a directory in GB
|
||||
async fn get_directory_size(&self, path: &str) -> Option<f32> {
|
||||
use super::run_command_with_timeout;
|
||||
|
||||
// Use -s (summary) and --apparent-size for speed
|
||||
let mut cmd = Command::new("sudo");
|
||||
cmd.args(&["du", "-s", "--apparent-size", "--block-size=1", path]);
|
||||
|
||||
let output = run_command_with_timeout(cmd, self.config.command_timeout_seconds).await.ok()?;
|
||||
|
||||
if !output.status.success() {
|
||||
// Log permission errors for debugging but don't spam logs
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
if stderr.contains("Permission denied") {
|
||||
debug!("Permission denied accessing directory: {}", path);
|
||||
} else if stderr.contains("timed out") {
|
||||
warn!("Directory size check timed out for {}", path);
|
||||
} else {
|
||||
debug!("Failed to get size for directory {}: {}", path, stderr);
|
||||
}
|
||||
return None;
|
||||
}
|
||||
|
||||
let output_str = String::from_utf8(output.stdout).ok()?;
|
||||
let size_str = output_str.split_whitespace().next()?;
|
||||
if let Ok(size_bytes) = size_str.parse::<u64>() {
|
||||
let size_gb = size_bytes as f32 / (1024.0 * 1024.0 * 1024.0);
|
||||
// Return size even if very small (minimum 0.001 GB = 1MB for visibility)
|
||||
if size_gb > 0.0 {
|
||||
Some(size_gb.max(0.001))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Calculate service status, taking user-stopped services into account
|
||||
fn calculate_service_status(&self, service_name: &str, active_status: &str) -> Status {
|
||||
match active_status.to_lowercase().as_str() {
|
||||
@@ -510,33 +465,6 @@ impl SystemdCollector {
|
||||
}
|
||||
}
|
||||
|
||||
/// Get memory usage for a specific service
|
||||
async fn get_service_memory_usage(&self, service_name: &str) -> Result<f32, CollectorError> {
|
||||
let output = Command::new("systemctl")
|
||||
.args(&["show", &format!("{}.service", service_name), "--property=MemoryCurrent"])
|
||||
.output()
|
||||
.map_err(|e| CollectorError::SystemRead {
|
||||
path: format!("memory usage for {}", service_name),
|
||||
error: e.to_string(),
|
||||
})?;
|
||||
|
||||
let output_str = String::from_utf8_lossy(&output.stdout);
|
||||
|
||||
for line in output_str.lines() {
|
||||
if line.starts_with("MemoryCurrent=") {
|
||||
if let Some(mem_str) = line.strip_prefix("MemoryCurrent=") {
|
||||
if mem_str != "[not set]" {
|
||||
if let Ok(memory_bytes) = mem_str.parse::<u64>() {
|
||||
return Ok(memory_bytes as f32 / (1024.0 * 1024.0)); // Convert to MB
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(0.0)
|
||||
}
|
||||
|
||||
/// Check if service collection cache should be updated
|
||||
fn should_update_cache(&self) -> bool {
|
||||
let state = self.state.read().unwrap();
|
||||
@@ -789,10 +717,9 @@ impl SystemdCollector {
|
||||
let mut containers = Vec::new();
|
||||
|
||||
// Check if docker is available (cm-agent user is in docker group)
|
||||
// Use -a to show ALL containers (running and stopped)
|
||||
let timeout_str = self.config.command_timeout_seconds.to_string();
|
||||
// Use -a to show ALL containers (running and stopped) with 3 second timeout
|
||||
let output = Command::new("timeout")
|
||||
.args(&[&timeout_str, "docker", "ps", "-a", "--format", "{{.Names}},{{.Status}}"])
|
||||
.args(&["3", "docker", "ps", "-a", "--format", "{{.Names}},{{.Status}}"])
|
||||
.output();
|
||||
|
||||
let output = match output {
|
||||
@@ -833,10 +760,9 @@ impl SystemdCollector {
|
||||
/// Get docker images as sub-services
|
||||
fn get_docker_images(&self) -> Vec<(String, String, f32)> {
|
||||
let mut images = Vec::new();
|
||||
// Check if docker is available (cm-agent user is in docker group)
|
||||
let timeout_str = self.config.command_timeout_seconds.to_string();
|
||||
// Check if docker is available (cm-agent user is in docker group) with 3 second timeout
|
||||
let output = Command::new("timeout")
|
||||
.args(&[&timeout_str, "docker", "images", "--format", "{{.Repository}}:{{.Tag}},{{.Size}}"])
|
||||
.args(&["3", "docker", "images", "--format", "{{.Repository}}:{{.Tag}},{{.Size}}"])
|
||||
.output();
|
||||
|
||||
let output = match output {
|
||||
@@ -915,9 +841,6 @@ impl SystemdCollector {
|
||||
#[async_trait]
|
||||
impl Collector for SystemdCollector {
|
||||
async fn collect_structured(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
|
||||
// Clear existing services data to prevent duplicates in cached architecture
|
||||
agent_data.services.clear();
|
||||
|
||||
// Use cached complete data if available and fresh
|
||||
if let Some(cached_complete_services) = self.get_cached_complete_services() {
|
||||
for service_data in cached_complete_services {
|
||||
|
||||
@@ -5,10 +5,9 @@ use zmq::{Context, Socket, SocketType};
|
||||
|
||||
use crate::config::ZmqConfig;
|
||||
|
||||
/// ZMQ communication handler for publishing metrics and receiving commands
|
||||
/// ZMQ communication handler for publishing metrics
|
||||
pub struct ZmqHandler {
|
||||
publisher: Socket,
|
||||
command_receiver: Socket,
|
||||
}
|
||||
|
||||
impl ZmqHandler {
|
||||
@@ -26,20 +25,8 @@ impl ZmqHandler {
|
||||
publisher.set_sndhwm(1000)?; // High water mark for outbound messages
|
||||
publisher.set_linger(1000)?; // Linger time on close
|
||||
|
||||
// Create command receiver socket (PULL socket to receive commands from dashboard)
|
||||
let command_receiver = context.socket(SocketType::PULL)?;
|
||||
let cmd_bind_address = format!("tcp://{}:{}", config.bind_address, config.command_port);
|
||||
command_receiver.bind(&cmd_bind_address)?;
|
||||
|
||||
info!("ZMQ command receiver bound to {}", cmd_bind_address);
|
||||
|
||||
// Set non-blocking mode for command receiver
|
||||
command_receiver.set_rcvtimeo(0)?; // Non-blocking receive
|
||||
command_receiver.set_linger(1000)?;
|
||||
|
||||
Ok(Self {
|
||||
publisher,
|
||||
command_receiver,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -65,36 +52,4 @@ impl ZmqHandler {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Try to receive a command (non-blocking)
|
||||
pub fn try_receive_command(&self) -> Result<Option<AgentCommand>> {
|
||||
match self.command_receiver.recv_bytes(zmq::DONTWAIT) {
|
||||
Ok(bytes) => {
|
||||
debug!("Received command message ({} bytes)", bytes.len());
|
||||
|
||||
let command: AgentCommand = serde_json::from_slice(&bytes)
|
||||
.map_err(|e| anyhow::anyhow!("Failed to deserialize command: {}", e))?;
|
||||
|
||||
debug!("Parsed command: {:?}", command);
|
||||
Ok(Some(command))
|
||||
}
|
||||
Err(zmq::Error::EAGAIN) => {
|
||||
// No message available (non-blocking)
|
||||
Ok(None)
|
||||
}
|
||||
Err(e) => Err(anyhow::anyhow!("ZMQ receive error: {}", e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Commands that can be sent to the agent
|
||||
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
|
||||
pub enum AgentCommand {
|
||||
/// Request immediate metric collection
|
||||
CollectNow,
|
||||
/// Change collection interval
|
||||
SetInterval { seconds: u64 },
|
||||
/// Enable/disable a collector
|
||||
ToggleCollector { name: String, enabled: bool },
|
||||
/// Request status/health check
|
||||
Ping,
|
||||
}
|
||||
|
||||
@@ -20,7 +20,6 @@ pub struct AgentConfig {
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct ZmqConfig {
|
||||
pub publisher_port: u16,
|
||||
pub command_port: u16,
|
||||
pub bind_address: String,
|
||||
pub transmission_interval_seconds: u64,
|
||||
/// Heartbeat transmission interval in seconds for host connectivity detection
|
||||
@@ -79,9 +78,6 @@ pub struct DiskConfig {
|
||||
pub temperature_critical_celsius: f32,
|
||||
pub wear_warning_percent: f32,
|
||||
pub wear_critical_percent: f32,
|
||||
/// Command timeout in seconds for lsblk, smartctl, etc.
|
||||
#[serde(default = "default_disk_command_timeout")]
|
||||
pub command_timeout_seconds: u64,
|
||||
}
|
||||
|
||||
/// Filesystem configuration entry
|
||||
@@ -111,9 +107,6 @@ pub struct SystemdConfig {
|
||||
pub http_timeout_seconds: u64,
|
||||
pub http_connect_timeout_seconds: u64,
|
||||
pub nginx_latency_critical_ms: f32,
|
||||
/// Command timeout in seconds for systemctl, docker, du commands
|
||||
#[serde(default = "default_systemd_command_timeout")]
|
||||
pub command_timeout_seconds: u64,
|
||||
}
|
||||
|
||||
|
||||
@@ -138,9 +131,6 @@ pub struct BackupConfig {
|
||||
pub struct NetworkConfig {
|
||||
pub enabled: bool,
|
||||
pub interval_seconds: u64,
|
||||
/// Command timeout in seconds for ip route, ip addr commands
|
||||
#[serde(default = "default_network_command_timeout")]
|
||||
pub command_timeout_seconds: u64,
|
||||
}
|
||||
|
||||
/// Notification configuration
|
||||
@@ -154,9 +144,6 @@ pub struct NotificationConfig {
|
||||
pub rate_limit_minutes: u64,
|
||||
/// Email notification batching interval in seconds (default: 60)
|
||||
pub aggregation_interval_seconds: u64,
|
||||
/// Status check interval in seconds for detecting changes (default: 30)
|
||||
#[serde(default = "default_notification_check_interval")]
|
||||
pub check_interval_seconds: u64,
|
||||
/// List of metric names to exclude from email notifications
|
||||
#[serde(default)]
|
||||
pub exclude_email_metrics: Vec<String>,
|
||||
@@ -170,26 +157,10 @@ fn default_heartbeat_interval_seconds() -> u64 {
|
||||
5
|
||||
}
|
||||
|
||||
fn default_notification_check_interval() -> u64 {
|
||||
30
|
||||
}
|
||||
|
||||
fn default_maintenance_mode_file() -> String {
|
||||
"/tmp/cm-maintenance".to_string()
|
||||
}
|
||||
|
||||
fn default_disk_command_timeout() -> u64 {
|
||||
30
|
||||
}
|
||||
|
||||
fn default_systemd_command_timeout() -> u64 {
|
||||
15
|
||||
}
|
||||
|
||||
fn default_network_command_timeout() -> u64 {
|
||||
10
|
||||
}
|
||||
|
||||
impl AgentConfig {
|
||||
pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
|
||||
loader::load_config(path)
|
||||
|
||||
@@ -7,14 +7,6 @@ pub fn validate_config(config: &AgentConfig) -> Result<()> {
|
||||
bail!("ZMQ publisher port cannot be 0");
|
||||
}
|
||||
|
||||
if config.zmq.command_port == 0 {
|
||||
bail!("ZMQ command port cannot be 0");
|
||||
}
|
||||
|
||||
if config.zmq.publisher_port == config.zmq.command_port {
|
||||
bail!("ZMQ publisher and command ports cannot be the same");
|
||||
}
|
||||
|
||||
if config.zmq.bind_address.is_empty() {
|
||||
bail!("ZMQ bind address cannot be empty");
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "cm-dashboard"
|
||||
version = "0.1.194"
|
||||
version = "0.1.208"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
|
||||
@@ -28,11 +28,12 @@ pub struct ServicesWidget {
|
||||
|
||||
#[derive(Clone)]
|
||||
struct ServiceInfo {
|
||||
memory_mb: Option<f32>,
|
||||
disk_gb: Option<f32>,
|
||||
metrics: Vec<(String, f32, Option<String>)>, // (label, value, unit)
|
||||
widget_status: Status,
|
||||
service_type: String, // "nginx_site", "container", "image", or empty for parent services
|
||||
memory_bytes: Option<u64>,
|
||||
restart_count: Option<u32>,
|
||||
uptime_seconds: Option<u64>,
|
||||
}
|
||||
|
||||
impl ServicesWidget {
|
||||
@@ -52,8 +53,6 @@ impl ServicesWidget {
|
||||
if metric_name.starts_with("service_") {
|
||||
if let Some(end_pos) = metric_name
|
||||
.rfind("_status")
|
||||
.or_else(|| metric_name.rfind("_memory_mb"))
|
||||
.or_else(|| metric_name.rfind("_disk_gb"))
|
||||
.or_else(|| metric_name.rfind("_latency_ms"))
|
||||
{
|
||||
let service_part = &metric_name[8..end_pos]; // Remove "service_" prefix
|
||||
@@ -76,36 +75,8 @@ impl ServicesWidget {
|
||||
None
|
||||
}
|
||||
|
||||
/// Format disk size with appropriate units (kB/MB/GB)
|
||||
fn format_disk_size(size_gb: f32) -> String {
|
||||
let size_mb = size_gb * 1024.0; // Convert GB to MB
|
||||
|
||||
if size_mb >= 1024.0 {
|
||||
// Show as GB
|
||||
format!("{:.1}GB", size_gb)
|
||||
} else if size_mb >= 1.0 {
|
||||
// Show as MB
|
||||
format!("{:.0}MB", size_mb)
|
||||
} else if size_mb >= 0.001 {
|
||||
// Convert to kB
|
||||
let size_kb = size_mb * 1024.0;
|
||||
format!("{:.0}kB", size_kb)
|
||||
} else {
|
||||
// Show very small sizes as bytes
|
||||
let size_bytes = size_mb * 1024.0 * 1024.0;
|
||||
format!("{:.0}B", size_bytes)
|
||||
}
|
||||
}
|
||||
|
||||
/// Format parent service line - returns text without icon for span formatting
|
||||
fn format_parent_service_line(&self, name: &str, info: &ServiceInfo) -> String {
|
||||
let memory_str = info
|
||||
.memory_mb
|
||||
.map_or("0M".to_string(), |m| format!("{:.0}M", m));
|
||||
let disk_str = info
|
||||
.disk_gb
|
||||
.map_or("0".to_string(), |d| Self::format_disk_size(d));
|
||||
|
||||
// Truncate long service names to fit layout (account for icon space)
|
||||
let short_name = if name.len() > 22 {
|
||||
format!("{}...", &name[..19])
|
||||
@@ -116,7 +87,7 @@ impl ServicesWidget {
|
||||
// Convert Status enum to display text
|
||||
let status_str = match info.widget_status {
|
||||
Status::Ok => "active",
|
||||
Status::Inactive => "inactive",
|
||||
Status::Inactive => "inactive",
|
||||
Status::Critical => "failed",
|
||||
Status::Pending => "pending",
|
||||
Status::Warning => "warning",
|
||||
@@ -124,9 +95,43 @@ impl ServicesWidget {
|
||||
Status::Offline => "offline",
|
||||
};
|
||||
|
||||
// Format memory
|
||||
let memory_str = info.memory_bytes.map_or("-".to_string(), |bytes| {
|
||||
let mb = bytes as f64 / (1024.0 * 1024.0);
|
||||
if mb >= 1000.0 {
|
||||
format!("{:.1}G", mb / 1024.0)
|
||||
} else {
|
||||
format!("{:.0}M", mb)
|
||||
}
|
||||
});
|
||||
|
||||
// Format uptime
|
||||
let uptime_str = info.uptime_seconds.map_or("-".to_string(), |secs| {
|
||||
let days = secs / 86400;
|
||||
let hours = (secs % 86400) / 3600;
|
||||
let mins = (secs % 3600) / 60;
|
||||
|
||||
if days > 0 {
|
||||
format!("{}d{}h", days, hours)
|
||||
} else if hours > 0 {
|
||||
format!("{}h{}m", hours, mins)
|
||||
} else {
|
||||
format!("{}m", mins)
|
||||
}
|
||||
});
|
||||
|
||||
// Format restarts (show "!" if > 0 to indicate instability)
|
||||
let restart_str = info.restart_count.map_or("-".to_string(), |count| {
|
||||
if count > 0 {
|
||||
format!("!{}", count)
|
||||
} else {
|
||||
"0".to_string()
|
||||
}
|
||||
});
|
||||
|
||||
format!(
|
||||
"{:<23} {:<10} {:<8} {:<8}",
|
||||
short_name, status_str, memory_str, disk_str
|
||||
"{:<23} {:<10} {:<8} {:<8} {:<5}",
|
||||
short_name, status_str, memory_str, uptime_str, restart_str
|
||||
)
|
||||
}
|
||||
|
||||
@@ -309,11 +314,12 @@ impl Widget for ServicesWidget {
|
||||
for service in &agent_data.services {
|
||||
// Store parent service
|
||||
let parent_info = ServiceInfo {
|
||||
memory_mb: Some(service.memory_mb),
|
||||
disk_gb: Some(service.disk_gb),
|
||||
metrics: Vec::new(), // Parent services don't have custom metrics
|
||||
widget_status: service.service_status,
|
||||
service_type: String::new(), // Parent services have no type
|
||||
memory_bytes: service.memory_bytes,
|
||||
restart_count: service.restart_count,
|
||||
uptime_seconds: service.uptime_seconds,
|
||||
};
|
||||
self.parent_services.insert(service.name.clone(), parent_info);
|
||||
|
||||
@@ -327,11 +333,12 @@ impl Widget for ServicesWidget {
|
||||
.collect();
|
||||
|
||||
let sub_info = ServiceInfo {
|
||||
memory_mb: None, // Not used for sub-services
|
||||
disk_gb: None, // Not used for sub-services
|
||||
metrics,
|
||||
widget_status: sub_service.service_status,
|
||||
service_type: sub_service.service_type.clone(),
|
||||
memory_bytes: None, // Sub-services don't have individual metrics yet
|
||||
restart_count: None,
|
||||
uptime_seconds: None,
|
||||
};
|
||||
sub_list.push((sub_service.name.clone(), sub_info));
|
||||
}
|
||||
@@ -371,23 +378,16 @@ impl ServicesWidget {
|
||||
self.parent_services
|
||||
.entry(parent_service)
|
||||
.or_insert(ServiceInfo {
|
||||
memory_mb: None,
|
||||
disk_gb: None,
|
||||
metrics: Vec::new(),
|
||||
widget_status: Status::Unknown,
|
||||
service_type: String::new(),
|
||||
memory_bytes: None,
|
||||
restart_count: None,
|
||||
uptime_seconds: None,
|
||||
});
|
||||
|
||||
if metric.name.ends_with("_status") {
|
||||
service_info.widget_status = metric.status;
|
||||
} else if metric.name.ends_with("_memory_mb") {
|
||||
if let Some(memory) = metric.value.as_f32() {
|
||||
service_info.memory_mb = Some(memory);
|
||||
}
|
||||
} else if metric.name.ends_with("_disk_gb") {
|
||||
if let Some(disk) = metric.value.as_f32() {
|
||||
service_info.disk_gb = Some(disk);
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(sub_name) => {
|
||||
@@ -407,11 +407,12 @@ impl ServicesWidget {
|
||||
sub_service_list.push((
|
||||
sub_name.clone(),
|
||||
ServiceInfo {
|
||||
memory_mb: None,
|
||||
disk_gb: None,
|
||||
metrics: Vec::new(),
|
||||
widget_status: Status::Unknown,
|
||||
service_type: String::new(), // Unknown type in legacy path
|
||||
memory_bytes: None,
|
||||
restart_count: None,
|
||||
uptime_seconds: None,
|
||||
},
|
||||
));
|
||||
&mut sub_service_list.last_mut().unwrap().1
|
||||
@@ -419,14 +420,6 @@ impl ServicesWidget {
|
||||
|
||||
if metric.name.ends_with("_status") {
|
||||
sub_service_info.widget_status = metric.status;
|
||||
} else if metric.name.ends_with("_memory_mb") {
|
||||
if let Some(memory) = metric.value.as_f32() {
|
||||
sub_service_info.memory_mb = Some(memory);
|
||||
}
|
||||
} else if metric.name.ends_with("_disk_gb") {
|
||||
if let Some(disk) = metric.value.as_f32() {
|
||||
sub_service_info.disk_gb = Some(disk);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -485,8 +478,8 @@ impl ServicesWidget {
|
||||
|
||||
// Header
|
||||
let header = format!(
|
||||
"{:<25} {:<10} {:<8} {:<8}",
|
||||
"Service:", "Status:", "RAM:", "Disk:"
|
||||
"{:<25} {:<10} {:<8} {:<8} {:<5}",
|
||||
"Service:", "Status:", "RAM:", "Uptime:", "↻:"
|
||||
);
|
||||
let header_para = Paragraph::new(header).style(Typography::muted());
|
||||
frame.render_widget(header_para, content_chunks[0]);
|
||||
|
||||
@@ -26,7 +26,7 @@ pub struct SystemWidget {
|
||||
cpu_load_1min: Option<f32>,
|
||||
cpu_load_5min: Option<f32>,
|
||||
cpu_load_15min: Option<f32>,
|
||||
cpu_frequency: Option<f32>,
|
||||
cpu_cstate: Option<String>,
|
||||
cpu_status: Status,
|
||||
|
||||
// Memory metrics
|
||||
@@ -102,7 +102,7 @@ impl SystemWidget {
|
||||
cpu_load_1min: None,
|
||||
cpu_load_5min: None,
|
||||
cpu_load_15min: None,
|
||||
cpu_frequency: None,
|
||||
cpu_cstate: None,
|
||||
cpu_status: Status::Unknown,
|
||||
memory_usage_percent: None,
|
||||
memory_used_gb: None,
|
||||
@@ -137,11 +137,11 @@ impl SystemWidget {
|
||||
}
|
||||
}
|
||||
|
||||
/// Format CPU frequency
|
||||
fn format_cpu_frequency(&self) -> String {
|
||||
match self.cpu_frequency {
|
||||
Some(freq) => format!("{:.0} MHz", freq),
|
||||
None => "— MHz".to_string(),
|
||||
/// Format CPU C-state (idle depth)
|
||||
fn format_cpu_cstate(&self) -> String {
|
||||
match &self.cpu_cstate {
|
||||
Some(cstate) => cstate.clone(),
|
||||
None => "—".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -188,7 +188,7 @@ impl Widget for SystemWidget {
|
||||
self.cpu_load_1min = Some(cpu.load_1min);
|
||||
self.cpu_load_5min = Some(cpu.load_5min);
|
||||
self.cpu_load_15min = Some(cpu.load_15min);
|
||||
self.cpu_frequency = Some(cpu.frequency_mhz);
|
||||
self.cpu_cstate = Some(cpu.cstate.clone());
|
||||
self.cpu_status = Status::Ok;
|
||||
|
||||
// Extract memory data directly
|
||||
@@ -832,10 +832,10 @@ impl SystemWidget {
|
||||
);
|
||||
lines.push(Line::from(cpu_spans));
|
||||
|
||||
let freq_text = self.format_cpu_frequency();
|
||||
let cstate_text = self.format_cpu_cstate();
|
||||
lines.push(Line::from(vec![
|
||||
Span::styled(" └─ ", Typography::tree()),
|
||||
Span::styled(format!("Freq: {}", freq_text), Typography::secondary())
|
||||
Span::styled(format!("C-state: {}", cstate_text), Typography::secondary())
|
||||
]));
|
||||
|
||||
// RAM section
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "cm-dashboard-shared"
|
||||
version = "0.1.194"
|
||||
version = "0.1.208"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
|
||||
@@ -46,7 +46,7 @@ pub struct CpuData {
|
||||
pub load_1min: f32,
|
||||
pub load_5min: f32,
|
||||
pub load_15min: f32,
|
||||
pub frequency_mhz: f32,
|
||||
pub cstate: String, // Deepest C-state in use (C1, C6, C10, etc.) - indicates CPU idle depth
|
||||
pub temperature_celsius: Option<f32>,
|
||||
pub load_status: Status,
|
||||
pub temperature_status: Status,
|
||||
@@ -136,11 +136,15 @@ pub struct PoolDriveData {
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct ServiceData {
|
||||
pub name: String,
|
||||
pub memory_mb: f32,
|
||||
pub disk_gb: f32,
|
||||
pub user_stopped: bool,
|
||||
pub service_status: Status,
|
||||
pub sub_services: Vec<SubServiceData>,
|
||||
/// Memory usage in bytes (from MemoryCurrent)
|
||||
pub memory_bytes: Option<u64>,
|
||||
/// Number of service restarts (from NRestarts)
|
||||
pub restart_count: Option<u32>,
|
||||
/// Uptime in seconds (calculated from ExecMainStartTimestamp)
|
||||
pub uptime_seconds: Option<u64>,
|
||||
}
|
||||
|
||||
/// Sub-service data (nginx sites, docker containers, etc.)
|
||||
@@ -200,7 +204,7 @@ impl AgentData {
|
||||
load_1min: 0.0,
|
||||
load_5min: 0.0,
|
||||
load_15min: 0.0,
|
||||
frequency_mhz: 0.0,
|
||||
cstate: String::from("C0"),
|
||||
temperature_celsius: None,
|
||||
load_status: Status::Unknown,
|
||||
temperature_status: Status::Unknown,
|
||||
|
||||
Reference in New Issue
Block a user