Compare commits

...

2 Commits

Author SHA1 Message Date
eab3f17428 Fix agent hang by reverting service discovery to systemctl
All checks were successful
Build and Release / build-and-release (push) Successful in 1m31s
The D-Bus ListUnits call in discover_services_internal() was causing
the agent to hang on startup.

**Root cause:**
- D-Bus ListUnits call with complex tuple destructuring hung indefinitely
- Agent never completed first collection cycle
- No collector output in logs

**Fix:**
- Revert discover_services_internal() to use systemctl list-units/list-unit-files
- Keep D-Bus-based property queries (WorkingDirectory, MemoryCurrent, ExecStart)
- Hybrid approach: systemctl for discovery, D-Bus for individual queries

**External commands still used:**
- systemctl list-units, list-unit-files (service discovery)
- smartctl (SMART data)
- sudo du (directory sizes)
- nginx -T (config fallback)

Version bump: 0.1.198 → 0.1.199
2025-11-28 11:57:31 +01:00
7ad149bbe4 Replace all systemctl commands with zbus D-Bus API
All checks were successful
Build and Release / build-and-release (push) Successful in 1m31s
Complete migration from systemctl subprocess calls to native D-Bus communication:

**Removed systemctl commands:**
- systemctl is-active (fallback) - use D-Bus cache from ListUnits
- systemctl show --property=LoadState,ActiveState,SubState - use D-Bus cache
- systemctl show --property=WorkingDirectory - use D-Bus Properties.Get
- systemctl show --property=MemoryCurrent - use D-Bus Properties.Get
- systemctl show nginx --property=ExecStart - use D-Bus Properties.Get

**Implementation details:**
- Added get_unit_property() helper for D-Bus property access
- Made get_nginx_site_metrics() async to support D-Bus calls
- Made get_nginx_sites_internal() async
- Made discover_nginx_sites() async
- Made get_nginx_config_from_systemd() async
- Fixed RwLock guard Send issues by using scoped locks

**Remaining external commands:**
- smartctl (disk.rs) - No Rust alternative for SMART data
- sudo du (systemd.rs) - Directory size measurement
- nginx -T (systemd.rs) - Nginx config fallback
- timeout hostname (nixos.rs) - Rare fallback only

Version bump: 0.1.197 → 0.1.198
2025-11-28 11:46:28 +01:00
5 changed files with 148 additions and 122 deletions

6
Cargo.lock generated
View File

@@ -493,7 +493,7 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"
[[package]]
name = "cm-dashboard"
version = "0.1.197"
version = "0.1.199"
dependencies = [
"anyhow",
"chrono",
@@ -515,7 +515,7 @@ dependencies = [
[[package]]
name = "cm-dashboard-agent"
version = "0.1.197"
version = "0.1.199"
dependencies = [
"anyhow",
"async-trait",
@@ -545,7 +545,7 @@ dependencies = [
[[package]]
name = "cm-dashboard-shared"
version = "0.1.197"
version = "0.1.199"
dependencies = [
"chrono",
"serde",

View File

@@ -1,6 +1,6 @@
[package]
name = "cm-dashboard-agent"
version = "0.1.197"
version = "0.1.199"
edition = "2021"
[dependencies]

View File

@@ -97,7 +97,7 @@ impl SystemdCollector {
// Sub-service metrics for specific services (always include cached results)
if service_name.contains("nginx") && active_status == "active" {
let nginx_sites = self.get_nginx_site_metrics();
let nginx_sites = self.get_nginx_site_metrics().await;
for (site_name, latency_ms) in nginx_sites {
let site_status = if latency_ms >= 0.0 && latency_ms < self.config.nginx_latency_critical_ms {
"active"
@@ -231,69 +231,103 @@ impl SystemdCollector {
}
/// Get nginx site metrics, checking them if cache is expired (like old working version)
fn get_nginx_site_metrics(&self) -> Vec<(String, f32)> {
let mut state = self.state.write().unwrap();
// Check if we need to refresh nginx site metrics
let needs_refresh = match state.last_nginx_check_time {
None => true, // First time
async fn get_nginx_site_metrics(&self) -> Vec<(String, f32)> {
// Check if we need to refresh (read lock)
let needs_refresh = {
let state = self.state.read().unwrap();
match state.last_nginx_check_time {
None => true,
Some(last_time) => {
let elapsed = last_time.elapsed().as_secs();
elapsed >= state.nginx_check_interval_seconds
}
}
};
if needs_refresh {
// Only check nginx sites if nginx service is active
if state.monitored_services.iter().any(|s| s.contains("nginx")) {
let fresh_metrics = self.get_nginx_sites_internal();
// Check if nginx is active (read lock)
let has_nginx = {
let state = self.state.read().unwrap();
state.monitored_services.iter().any(|s| s.contains("nginx"))
};
if has_nginx {
let fresh_metrics = self.get_nginx_sites_internal().await;
let mut state = self.state.write().unwrap();
state.nginx_site_metrics = fresh_metrics;
state.last_nginx_check_time = Some(Instant::now());
}
}
let state = self.state.read().unwrap();
state.nginx_site_metrics.clone()
}
/// Auto-discover interesting services to monitor using D-Bus
/// Auto-discover interesting services to monitor using systemctl
async fn discover_services_internal(&self) -> Result<(Vec<String>, std::collections::HashMap<String, ServiceStatusInfo>)> {
// Connect to system D-Bus
let connection = Connection::system().await?;
// First: Get all service unit files (with 3 second timeout)
let unit_files_output = Command::new("timeout")
.args(&["3", "systemctl", "list-unit-files", "--type=service", "--no-pager", "--plain"])
.output()?;
// Get systemd manager proxy
let proxy = zbus::Proxy::new(
&connection,
"org.freedesktop.systemd1",
"/org/freedesktop/systemd1",
"org.freedesktop.systemd1.Manager",
).await?;
if !unit_files_output.status.success() {
return Err(anyhow::anyhow!("systemctl list-unit-files command failed"));
}
// List all units via D-Bus
let units: Vec<(String, String, String, String, String, String, zbus::zvariant::OwnedObjectPath, u32, String, zbus::zvariant::OwnedObjectPath)> =
proxy.call("ListUnits", &()).await?;
// Second: Get runtime status of all units (with 3 second timeout)
let units_status_output = Command::new("timeout")
.args(&["3", "systemctl", "list-units", "--type=service", "--all", "--no-pager", "--plain"])
.output()?;
if !units_status_output.status.success() {
return Err(anyhow::anyhow!("systemctl list-units command failed"));
}
let unit_files_str = String::from_utf8(unit_files_output.stdout)?;
let units_status_str = String::from_utf8(units_status_output.stdout)?;
let mut services = Vec::new();
let excluded_services = &self.config.excluded_services;
let service_name_filters = &self.config.service_name_filters;
// Parse all service unit files
let mut all_service_names = std::collections::HashSet::new();
let mut service_status_cache = std::collections::HashMap::new();
// Parse D-Bus response for services only
for unit in units {
let (unit_name, _description, load_state, active_state, sub_state, _followed, _unit_path, _job_id, _job_type, _job_path) = unit;
if unit_name.ends_with(".service") {
let service_name = unit_name.trim_end_matches(".service");
for line in unit_files_str.lines() {
let fields: Vec<&str> = line.split_whitespace().collect();
if fields.len() >= 2 && fields[0].ends_with(".service") {
let service_name = fields[0].trim_end_matches(".service");
all_service_names.insert(service_name.to_string());
}
}
service_status_cache.insert(service_name.to_string(), ServiceStatusInfo {
load_state: load_state.clone(),
active_state: active_state.clone(),
sub_state: sub_state.clone(),
// Parse runtime status for all units
let mut status_cache = std::collections::HashMap::new();
for line in units_status_str.lines() {
let fields: Vec<&str> = line.split_whitespace().collect();
if fields.len() >= 4 && fields[0].ends_with(".service") {
let service_name = fields[0].trim_end_matches(".service");
let load_state = fields.get(1).unwrap_or(&"unknown").to_string();
let active_state = fields.get(2).unwrap_or(&"unknown").to_string();
let sub_state = fields.get(3).unwrap_or(&"unknown").to_string();
status_cache.insert(service_name.to_string(), ServiceStatusInfo {
load_state,
active_state,
sub_state,
});
}
}
let mut services = Vec::new();
let excluded_services = &self.config.excluded_services;
let service_name_filters = &self.config.service_name_filters;
// For services found in unit files but not in runtime status, set default inactive status
for service_name in &all_service_names {
if !status_cache.contains_key(service_name) {
status_cache.insert(service_name.to_string(), ServiceStatusInfo {
load_state: "not-loaded".to_string(),
active_state: "inactive".to_string(),
sub_state: "dead".to_string(),
});
}
}
// Process all discovered services and apply filters
for service_name in &all_service_names {
@@ -319,12 +353,12 @@ impl SystemdCollector {
}
}
Ok((services, service_status_cache))
Ok((services, status_cache))
}
/// Get service status from cache (if available) or fallback to systemctl
/// Get service status from the status cache built during service discovery
fn get_service_status(&self, service: &str) -> Result<(String, String)> {
// Try to get status from cache first
// Get status from the cache populated by discover_services_internal (systemctl-based discovery)
if let Ok(state) = self.state.read() {
if let Some(cached_info) = state.service_status_cache.get(service) {
let active_status = cached_info.active_state.clone();
@@ -338,20 +372,45 @@ impl SystemdCollector {
}
}
// Fallback to systemctl if not in cache (with 2 second timeout)
let output = Command::new("timeout")
.args(&["2", "systemctl", "is-active", &format!("{}.service", service)])
.output()?;
// Service not found in the status cache - treat as inactive
Ok(("inactive".to_string(), "LoadState=not-found\nActiveState=inactive\nSubState=dead".to_string()))
}
let active_status = String::from_utf8(output.stdout)?.trim().to_string();
/// Get a unit property via D-Bus
async fn get_unit_property(&self, service_name: &str, property: &str) -> Option<zbus::zvariant::OwnedValue> {
// Connect to system D-Bus
let connection = Connection::system().await.ok()?;
// Get more detailed info (with 2 second timeout)
let output = Command::new("timeout")
.args(&["2", "systemctl", "show", &format!("{}.service", service), "--property=LoadState,ActiveState,SubState"])
.output()?;
// Get systemd manager proxy
let manager_proxy = zbus::Proxy::new(
&connection,
"org.freedesktop.systemd1",
"/org/freedesktop/systemd1",
"org.freedesktop.systemd1.Manager",
).await.ok()?;
let detailed_info = String::from_utf8(output.stdout)?;
Ok((active_status, detailed_info))
// Get unit path for service
let unit_name = format!("{}.service", service_name);
let unit_path: zbus::zvariant::OwnedObjectPath = manager_proxy
.call("GetUnit", &(unit_name,))
.await
.ok()?;
// Get property using standard D-Bus Properties interface
let prop_proxy = zbus::Proxy::new(
&connection,
"org.freedesktop.systemd1",
unit_path.as_str(),
"org.freedesktop.DBus.Properties",
).await.ok()?;
// Try the Service interface first, then fall back to the Unit interface.
// Get replies with a variant; it is returned as-is and callers extract the inner value via try_from.
if let Ok(variant) = prop_proxy.call("Get", &("org.freedesktop.systemd1.Service", property)).await {
return Some(variant);
}
prop_proxy.call("Get", &("org.freedesktop.systemd1.Unit", property)).await.ok()
}
/// Check if service name matches pattern (supports wildcards like nginx*)
@@ -407,21 +466,12 @@ impl SystemdCollector {
return Ok(0.0);
}
// No configured path - try to get WorkingDirectory from systemctl (with 2 second timeout)
let output = Command::new("timeout")
.args(&["2", "systemctl", "show", &format!("{}.service", service_name), "--property=WorkingDirectory"])
.output()
.map_err(|e| CollectorError::SystemRead {
path: format!("WorkingDirectory for {}", service_name),
error: e.to_string(),
})?;
let output_str = String::from_utf8_lossy(&output.stdout);
for line in output_str.lines() {
if line.starts_with("WorkingDirectory=") && !line.contains("[not set]") {
let dir = line.strip_prefix("WorkingDirectory=").unwrap_or("");
if !dir.is_empty() && dir != "/" {
return Ok(self.get_directory_size(dir).await.unwrap_or(0.0));
// No configured path - try to get WorkingDirectory from D-Bus
if let Some(value) = self.get_unit_property(service_name, "WorkingDirectory").await {
// WorkingDirectory is a string property - try to extract as string
if let Ok(dir_str) = <String>::try_from(value) {
if !dir_str.is_empty() && dir_str != "/" && dir_str != "[not set]" {
return Ok(self.get_directory_size(&dir_str).await.unwrap_or(0.0));
}
}
}
@@ -484,29 +534,15 @@ impl SystemdCollector {
}
}
/// Get memory usage for a specific service
/// Get memory usage for a specific service via D-Bus
async fn get_service_memory_usage(&self, service_name: &str) -> Result<f32, CollectorError> {
let output = Command::new("systemctl")
.args(&["show", &format!("{}.service", service_name), "--property=MemoryCurrent"])
.output()
.map_err(|e| CollectorError::SystemRead {
path: format!("memory usage for {}", service_name),
error: e.to_string(),
})?;
let output_str = String::from_utf8_lossy(&output.stdout);
for line in output_str.lines() {
if line.starts_with("MemoryCurrent=") {
if let Some(mem_str) = line.strip_prefix("MemoryCurrent=") {
if mem_str != "[not set]" {
if let Ok(memory_bytes) = mem_str.parse::<u64>() {
// Get MemoryCurrent property from D-Bus
if let Some(value) = self.get_unit_property(service_name, "MemoryCurrent").await {
// MemoryCurrent is a u64 property (bytes) - try to extract
if let Ok(memory_bytes) = <u64>::try_from(value) {
return Ok(memory_bytes as f32 / (1024.0 * 1024.0)); // Convert to MB
}
}
}
}
}
Ok(0.0)
}
@@ -535,11 +571,11 @@ impl SystemdCollector {
}
/// Get nginx sites with latency checks (internal - no caching)
fn get_nginx_sites_internal(&self) -> Vec<(String, f32)> {
async fn get_nginx_sites_internal(&self) -> Vec<(String, f32)> {
let mut sites = Vec::new();
// Discover nginx sites from configuration
let discovered_sites = self.discover_nginx_sites();
let discovered_sites = self.discover_nginx_sites().await;
// Always add all discovered sites, even if checks fail (like old version)
for (site_name, url) in &discovered_sites {
@@ -558,9 +594,9 @@ impl SystemdCollector {
}
/// Discover nginx sites from configuration
fn discover_nginx_sites(&self) -> Vec<(String, String)> {
async fn discover_nginx_sites(&self) -> Vec<(String, String)> {
// Use the same approach as the old working agent: get nginx config from systemd
let config_content = match self.get_nginx_config_from_systemd() {
let config_content = match self.get_nginx_config_from_systemd().await {
Some(content) => content,
None => {
debug!("Could not get nginx config from systemd, trying nginx -T fallback");
@@ -593,31 +629,21 @@ impl SystemdCollector {
Some(String::from_utf8_lossy(&output.stdout).to_string())
}
/// Get nginx config from systemd service definition (NixOS compatible)
fn get_nginx_config_from_systemd(&self) -> Option<String> {
let output = Command::new("systemctl")
.args(&["show", "nginx", "--property=ExecStart", "--no-pager"])
.output()
.ok()?;
/// Get nginx config from systemd service definition via D-Bus (NixOS compatible)
async fn get_nginx_config_from_systemd(&self) -> Option<String> {
// Get ExecStart property from D-Bus
let value = self.get_unit_property("nginx", "ExecStart").await?;
if !output.status.success() {
debug!("Failed to get nginx ExecStart from systemd");
return None;
}
// ExecStart is a complex structure: array of (path, args, unclean_exit)
// For our purposes, we need to extract the command line
let exec_start_str = format!("{:?}", value);
debug!("nginx ExecStart from D-Bus: {}", exec_start_str);
let stdout = String::from_utf8_lossy(&output.stdout);
debug!("systemctl show nginx output: {}", stdout);
// Parse ExecStart to extract -c config path
for line in stdout.lines() {
if line.starts_with("ExecStart=") {
debug!("Found ExecStart line: {}", line);
if let Some(config_path) = self.extract_config_path_from_exec_start(line) {
// Extract config path from ExecStart structure
if let Some(config_path) = self.extract_config_path_from_exec_start(&exec_start_str) {
debug!("Extracted config path: {}", config_path);
return std::fs::read_to_string(&config_path).ok();
}
}
}
None
}

View File

@@ -1,6 +1,6 @@
[package]
name = "cm-dashboard"
version = "0.1.197"
version = "0.1.199"
edition = "2021"
[dependencies]

View File

@@ -1,6 +1,6 @@
[package]
name = "cm-dashboard-shared"
version = "0.1.197"
version = "0.1.199"
edition = "2021"
[dependencies]