Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 3fdcec8047 | |||
| 1fcaf4a670 | |||
| 885e19f7fd | |||
| a7b69b8ae7 | |||
| 2d290f40b2 | |||
| ad1fcaa27b |
6
Cargo.lock
generated
6
Cargo.lock
generated
@@ -279,7 +279,7 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "cm-dashboard"
|
name = "cm-dashboard"
|
||||||
version = "0.1.227"
|
version = "0.1.234"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"chrono",
|
"chrono",
|
||||||
@@ -301,7 +301,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "cm-dashboard-agent"
|
name = "cm-dashboard-agent"
|
||||||
version = "0.1.227"
|
version = "0.1.234"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
@@ -325,7 +325,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "cm-dashboard-shared"
|
name = "cm-dashboard-shared"
|
||||||
version = "0.1.227"
|
version = "0.1.234"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"chrono",
|
"chrono",
|
||||||
"serde",
|
"serde",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "cm-dashboard-agent"
|
name = "cm-dashboard-agent"
|
||||||
version = "0.1.228"
|
version = "0.1.234"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use gethostname::gethostname;
|
use gethostname::gethostname;
|
||||||
use std::time::Duration;
|
use std::time::{Duration, Instant};
|
||||||
use tokio::time::interval;
|
use tokio::time::interval;
|
||||||
use tracing::{debug, error, info};
|
use tracing::{debug, error, info};
|
||||||
|
|
||||||
@@ -19,13 +19,22 @@ use crate::collectors::{
|
|||||||
use crate::notifications::NotificationManager;
|
use crate::notifications::NotificationManager;
|
||||||
use cm_dashboard_shared::AgentData;
|
use cm_dashboard_shared::AgentData;
|
||||||
|
|
||||||
|
/// Wrapper for collectors with timing information
|
||||||
|
struct TimedCollector {
|
||||||
|
collector: Box<dyn Collector>,
|
||||||
|
interval: Duration,
|
||||||
|
last_collection: Option<Instant>,
|
||||||
|
name: String,
|
||||||
|
}
|
||||||
|
|
||||||
pub struct Agent {
|
pub struct Agent {
|
||||||
hostname: String,
|
hostname: String,
|
||||||
config: AgentConfig,
|
config: AgentConfig,
|
||||||
zmq_handler: ZmqHandler,
|
zmq_handler: ZmqHandler,
|
||||||
collectors: Vec<Box<dyn Collector>>,
|
collectors: Vec<TimedCollector>,
|
||||||
notification_manager: NotificationManager,
|
notification_manager: NotificationManager,
|
||||||
previous_status: Option<SystemStatus>,
|
previous_status: Option<SystemStatus>,
|
||||||
|
cached_agent_data: AgentData,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Track system component status for change detection
|
/// Track system component status for change detection
|
||||||
@@ -55,36 +64,78 @@ impl Agent {
|
|||||||
config.zmq.publisher_port
|
config.zmq.publisher_port
|
||||||
);
|
);
|
||||||
|
|
||||||
// Initialize collectors
|
// Initialize collectors with timing information
|
||||||
let mut collectors: Vec<Box<dyn Collector>> = Vec::new();
|
let mut collectors: Vec<TimedCollector> = Vec::new();
|
||||||
|
|
||||||
// Add enabled collectors
|
// Add enabled collectors
|
||||||
if config.collectors.cpu.enabled {
|
if config.collectors.cpu.enabled {
|
||||||
collectors.push(Box::new(CpuCollector::new(config.collectors.cpu.clone())));
|
collectors.push(TimedCollector {
|
||||||
|
collector: Box::new(CpuCollector::new(config.collectors.cpu.clone())),
|
||||||
|
interval: Duration::from_secs(config.collectors.cpu.interval_seconds),
|
||||||
|
last_collection: None,
|
||||||
|
name: "CPU".to_string(),
|
||||||
|
});
|
||||||
|
info!("CPU collector initialized with {}s interval", config.collectors.cpu.interval_seconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.collectors.memory.enabled {
|
if config.collectors.memory.enabled {
|
||||||
collectors.push(Box::new(MemoryCollector::new(config.collectors.memory.clone())));
|
collectors.push(TimedCollector {
|
||||||
|
collector: Box::new(MemoryCollector::new(config.collectors.memory.clone())),
|
||||||
|
interval: Duration::from_secs(config.collectors.memory.interval_seconds),
|
||||||
|
last_collection: None,
|
||||||
|
name: "Memory".to_string(),
|
||||||
|
});
|
||||||
|
info!("Memory collector initialized with {}s interval", config.collectors.memory.interval_seconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.collectors.disk.enabled {
|
if config.collectors.disk.enabled {
|
||||||
collectors.push(Box::new(DiskCollector::new(config.collectors.disk.clone())));
|
collectors.push(TimedCollector {
|
||||||
|
collector: Box::new(DiskCollector::new(config.collectors.disk.clone())),
|
||||||
|
interval: Duration::from_secs(config.collectors.disk.interval_seconds),
|
||||||
|
last_collection: None,
|
||||||
|
name: "Disk".to_string(),
|
||||||
|
});
|
||||||
|
info!("Disk collector initialized with {}s interval", config.collectors.disk.interval_seconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.collectors.systemd.enabled {
|
if config.collectors.systemd.enabled {
|
||||||
collectors.push(Box::new(SystemdCollector::new(config.collectors.systemd.clone())));
|
collectors.push(TimedCollector {
|
||||||
|
collector: Box::new(SystemdCollector::new(config.collectors.systemd.clone())),
|
||||||
|
interval: Duration::from_secs(config.collectors.systemd.interval_seconds),
|
||||||
|
last_collection: None,
|
||||||
|
name: "Systemd".to_string(),
|
||||||
|
});
|
||||||
|
info!("Systemd collector initialized with {}s interval", config.collectors.systemd.interval_seconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.collectors.backup.enabled {
|
if config.collectors.backup.enabled {
|
||||||
collectors.push(Box::new(BackupCollector::new()));
|
collectors.push(TimedCollector {
|
||||||
|
collector: Box::new(BackupCollector::new()),
|
||||||
|
interval: Duration::from_secs(config.collectors.backup.interval_seconds),
|
||||||
|
last_collection: None,
|
||||||
|
name: "Backup".to_string(),
|
||||||
|
});
|
||||||
|
info!("Backup collector initialized with {}s interval", config.collectors.backup.interval_seconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.collectors.network.enabled {
|
if config.collectors.network.enabled {
|
||||||
collectors.push(Box::new(NetworkCollector::new(config.collectors.network.clone())));
|
collectors.push(TimedCollector {
|
||||||
|
collector: Box::new(NetworkCollector::new(config.collectors.network.clone())),
|
||||||
|
interval: Duration::from_secs(config.collectors.network.interval_seconds),
|
||||||
|
last_collection: None,
|
||||||
|
name: "Network".to_string(),
|
||||||
|
});
|
||||||
|
info!("Network collector initialized with {}s interval", config.collectors.network.interval_seconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.collectors.nixos.enabled {
|
if config.collectors.nixos.enabled {
|
||||||
collectors.push(Box::new(NixOSCollector::new(config.collectors.nixos.clone())));
|
collectors.push(TimedCollector {
|
||||||
|
collector: Box::new(NixOSCollector::new(config.collectors.nixos.clone())),
|
||||||
|
interval: Duration::from_secs(config.collectors.nixos.interval_seconds),
|
||||||
|
last_collection: None,
|
||||||
|
name: "NixOS".to_string(),
|
||||||
|
});
|
||||||
|
info!("NixOS collector initialized with {}s interval", config.collectors.nixos.interval_seconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Initialized {} collectors", collectors.len());
|
info!("Initialized {} collectors", collectors.len());
|
||||||
@@ -93,6 +144,9 @@ impl Agent {
|
|||||||
let notification_manager = NotificationManager::new(&config.notifications, &hostname)?;
|
let notification_manager = NotificationManager::new(&config.notifications, &hostname)?;
|
||||||
info!("Notification manager initialized");
|
info!("Notification manager initialized");
|
||||||
|
|
||||||
|
// Initialize cached agent data
|
||||||
|
let cached_agent_data = AgentData::new(hostname.clone(), env!("CARGO_PKG_VERSION").to_string());
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
hostname,
|
hostname,
|
||||||
config,
|
config,
|
||||||
@@ -100,6 +154,7 @@ impl Agent {
|
|||||||
collectors,
|
collectors,
|
||||||
notification_manager,
|
notification_manager,
|
||||||
previous_status: None,
|
previous_status: None,
|
||||||
|
cached_agent_data,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -149,24 +204,47 @@ impl Agent {
|
|||||||
async fn collect_and_broadcast(&mut self) -> Result<()> {
|
async fn collect_and_broadcast(&mut self) -> Result<()> {
|
||||||
debug!("Starting structured data collection");
|
debug!("Starting structured data collection");
|
||||||
|
|
||||||
// Initialize empty AgentData
|
// Collect data from collectors whose intervals have elapsed
|
||||||
let mut agent_data = AgentData::new(self.hostname.clone(), env!("CARGO_PKG_VERSION").to_string());
|
// Update cached_agent_data with new data
|
||||||
|
let now = Instant::now();
|
||||||
|
for timed_collector in &mut self.collectors {
|
||||||
|
let should_collect = match timed_collector.last_collection {
|
||||||
|
None => true, // First collection
|
||||||
|
Some(last_time) => now.duration_since(last_time) >= timed_collector.interval,
|
||||||
|
};
|
||||||
|
|
||||||
// Collect data from all collectors
|
if should_collect {
|
||||||
for collector in &self.collectors {
|
if let Err(e) = timed_collector.collector.collect_structured(&mut self.cached_agent_data).await {
|
||||||
if let Err(e) = collector.collect_structured(&mut agent_data).await {
|
error!("Collector {} failed: {}", timed_collector.name, e);
|
||||||
error!("Collector failed: {}", e);
|
// Update last_collection time even on failure to prevent immediate retries
|
||||||
// Continue with other collectors even if one fails
|
timed_collector.last_collection = Some(now);
|
||||||
|
} else {
|
||||||
|
timed_collector.last_collection = Some(now);
|
||||||
|
debug!(
|
||||||
|
"Collected from {} ({}s interval)",
|
||||||
|
timed_collector.name,
|
||||||
|
timed_collector.interval.as_secs()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update timestamp on cached data
|
||||||
|
self.cached_agent_data.timestamp = std::time::SystemTime::now()
|
||||||
|
.duration_since(std::time::UNIX_EPOCH)
|
||||||
|
.unwrap()
|
||||||
|
.as_secs();
|
||||||
|
|
||||||
|
// Clone for notification check (to avoid borrow issues)
|
||||||
|
let agent_data_snapshot = self.cached_agent_data.clone();
|
||||||
|
|
||||||
// Check for status changes and send notifications
|
// Check for status changes and send notifications
|
||||||
if let Err(e) = self.check_status_changes_and_notify(&agent_data).await {
|
if let Err(e) = self.check_status_changes_and_notify(&agent_data_snapshot).await {
|
||||||
error!("Failed to check status changes: {}", e);
|
error!("Failed to check status changes: {}", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Broadcast the structured data via ZMQ
|
// Broadcast the cached structured data via ZMQ
|
||||||
if let Err(e) = self.zmq_handler.publish_agent_data(&agent_data).await {
|
if let Err(e) = self.zmq_handler.publish_agent_data(&agent_data_snapshot).await {
|
||||||
error!("Failed to broadcast agent data: {}", e);
|
error!("Failed to broadcast agent data: {}", e);
|
||||||
} else {
|
} else {
|
||||||
debug!("Successfully broadcast structured agent data");
|
debug!("Successfully broadcast structured agent data");
|
||||||
|
|||||||
@@ -66,6 +66,10 @@ impl DiskCollector {
|
|||||||
|
|
||||||
/// Collect all storage data and populate AgentData
|
/// Collect all storage data and populate AgentData
|
||||||
async fn collect_storage_data(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
|
async fn collect_storage_data(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
|
||||||
|
// Clear drives and pools to prevent duplicates when updating cached data
|
||||||
|
agent_data.system.storage.drives.clear();
|
||||||
|
agent_data.system.storage.pools.clear();
|
||||||
|
|
||||||
// Step 1: Get mount points and their backing devices
|
// Step 1: Get mount points and their backing devices
|
||||||
let mount_devices = self.get_mount_devices().await?;
|
let mount_devices = self.get_mount_devices().await?;
|
||||||
|
|
||||||
|
|||||||
@@ -200,6 +200,9 @@ impl Collector for MemoryCollector {
|
|||||||
debug!("Collecting memory metrics");
|
debug!("Collecting memory metrics");
|
||||||
let start = std::time::Instant::now();
|
let start = std::time::Instant::now();
|
||||||
|
|
||||||
|
// Clear tmpfs list to prevent duplicates when updating cached data
|
||||||
|
agent_data.system.memory.tmpfs.clear();
|
||||||
|
|
||||||
// Parse memory info from /proc/meminfo
|
// Parse memory info from /proc/meminfo
|
||||||
let info = self.parse_meminfo().await?;
|
let info = self.parse_meminfo().await?;
|
||||||
|
|
||||||
|
|||||||
@@ -159,6 +159,19 @@ impl SystemdCollector {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if service_name.contains("openvpn-vpn-connection") && status_info.active_state == "active" {
|
||||||
|
if let Some(external_ip) = self.get_vpn_external_ip() {
|
||||||
|
let metrics = Vec::new();
|
||||||
|
|
||||||
|
sub_services.push(SubServiceData {
|
||||||
|
name: format!("IP: {}", external_ip),
|
||||||
|
service_status: Status::Ok,
|
||||||
|
metrics,
|
||||||
|
service_type: "vpn_route".to_string(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Create complete service data
|
// Create complete service data
|
||||||
let service_data = ServiceData {
|
let service_data = ServiceData {
|
||||||
name: service_name.clone(),
|
name: service_name.clone(),
|
||||||
@@ -836,11 +849,45 @@ impl SystemdCollector {
|
|||||||
_ => value, // Assume bytes if no unit
|
_ => value, // Assume bytes if no unit
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get VPN external IP by querying through the vpn namespace
|
||||||
|
fn get_vpn_external_ip(&self) -> Option<String> {
|
||||||
|
let output = Command::new("timeout")
|
||||||
|
.args(&[
|
||||||
|
"5",
|
||||||
|
"sudo",
|
||||||
|
"ip",
|
||||||
|
"netns",
|
||||||
|
"exec",
|
||||||
|
"vpn",
|
||||||
|
"curl",
|
||||||
|
"-s",
|
||||||
|
"--max-time",
|
||||||
|
"4",
|
||||||
|
"https://ifconfig.me"
|
||||||
|
])
|
||||||
|
.output()
|
||||||
|
.ok()?;
|
||||||
|
|
||||||
|
if output.status.success() {
|
||||||
|
let ip = String::from_utf8_lossy(&output.stdout).trim().to_string();
|
||||||
|
if !ip.is_empty() && ip.contains('.') {
|
||||||
|
debug!("VPN external IP: {}", ip);
|
||||||
|
return Some(ip);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!("Failed to get VPN external IP");
|
||||||
|
None
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl Collector for SystemdCollector {
|
impl Collector for SystemdCollector {
|
||||||
async fn collect_structured(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
|
async fn collect_structured(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
|
||||||
|
// Clear services to prevent duplicates when updating cached data
|
||||||
|
agent_data.services.clear();
|
||||||
|
|
||||||
// Use cached complete data if available and fresh
|
// Use cached complete data if available and fresh
|
||||||
if let Some(cached_complete_services) = self.get_cached_complete_services() {
|
if let Some(cached_complete_services) = self.get_cached_complete_services() {
|
||||||
for service_data in cached_complete_services {
|
for service_data in cached_complete_services {
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "cm-dashboard"
|
name = "cm-dashboard"
|
||||||
version = "0.1.228"
|
version = "0.1.234"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "cm-dashboard-shared"
|
name = "cm-dashboard-shared"
|
||||||
version = "0.1.228"
|
version = "0.1.234"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
|||||||
Reference in New Issue
Block a user