All checks were successful
Build and Release / build-and-release (push) Successful in 1m50s
Skip the first line in tailscale status output which is always the current host showing as idle. Add additional hostname check to prevent showing the current host in the peer list. Only display actual remote peers with their connection methods.
1235 lines
47 KiB
Rust
1235 lines
47 KiB
Rust
use anyhow::Result;
|
|
use async_trait::async_trait;
|
|
use cm_dashboard_shared::{AgentData, ServiceData, SubServiceData, SubServiceMetric, Status};
|
|
use std::process::Command;
|
|
use std::sync::RwLock;
|
|
use std::time::Instant;
|
|
use tracing::{debug, info};
|
|
|
|
use super::{Collector, CollectorError};
|
|
use crate::config::SystemdConfig;
|
|
|
|
/// Systemd collector for monitoring systemd services with structured data output
|
|
pub struct SystemdCollector {
|
|
/// Cached state with thread-safe interior mutability
|
|
state: RwLock<ServiceCacheState>,
|
|
/// Configuration for service monitoring
|
|
config: SystemdConfig,
|
|
}
|
|
|
|
/// Internal state for service caching
|
|
#[derive(Debug, Clone)]
|
|
struct ServiceCacheState {
|
|
/// Last collection time for performance tracking
|
|
last_collection: Option<Instant>,
|
|
/// Cached complete service data with sub-services
|
|
cached_service_data: Vec<ServiceData>,
|
|
/// Interesting services to monitor (cached after discovery)
|
|
monitored_services: Vec<String>,
|
|
/// Cached service status information from discovery
|
|
service_status_cache: std::collections::HashMap<String, ServiceStatusInfo>,
|
|
/// Last time services were discovered
|
|
last_discovery_time: Option<Instant>,
|
|
/// How often to rediscover services (from config)
|
|
discovery_interval_seconds: u64,
|
|
/// Cached nginx site latency metrics
|
|
nginx_site_metrics: Vec<(String, f32)>,
|
|
/// Last time nginx sites were checked
|
|
last_nginx_check_time: Option<Instant>,
|
|
/// How often to check nginx site latency (configurable)
|
|
nginx_check_interval_seconds: u64,
|
|
}
|
|
|
|
/// Status snapshot of a single systemd service.
#[derive(Debug, Clone)]
struct ServiceStatusInfo {
    /// Active state as reported by systemctl (for example `active` or `inactive`).
    active_state: String,
    /// Current memory usage in bytes, when systemd reports one.
    memory_bytes: Option<u64>,
    /// Number of restarts reported by systemd, when available.
    restart_count: Option<u32>,
    /// Start time of the main process in seconds since the Unix epoch, when known.
    start_timestamp: Option<u64>,
}
|
|
|
|
impl SystemdCollector {
|
|
pub fn new(config: SystemdConfig) -> Self {
|
|
let state = ServiceCacheState {
|
|
last_collection: None,
|
|
cached_service_data: Vec::new(),
|
|
monitored_services: Vec::new(),
|
|
service_status_cache: std::collections::HashMap::new(),
|
|
last_discovery_time: None,
|
|
discovery_interval_seconds: config.interval_seconds,
|
|
nginx_site_metrics: Vec::new(),
|
|
last_nginx_check_time: None,
|
|
nginx_check_interval_seconds: config.nginx_check_interval_seconds,
|
|
};
|
|
|
|
Self {
|
|
state: RwLock::new(state),
|
|
config,
|
|
}
|
|
}
|
|
|
|
/// Collect service data and populate AgentData
|
|
async fn collect_service_data(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
|
|
let start_time = Instant::now();
|
|
debug!("Collecting systemd services metrics");
|
|
|
|
// Get cached services (discovery only happens when needed)
|
|
let monitored_services = match self.get_monitored_services() {
|
|
Ok(services) => services,
|
|
Err(e) => {
|
|
debug!("Failed to get monitored services: {}", e);
|
|
return Ok(());
|
|
}
|
|
};
|
|
|
|
// Collect service data for each monitored service
|
|
let mut complete_service_data = Vec::new();
|
|
for service_name in &monitored_services {
|
|
match self.get_service_status(service_name) {
|
|
Ok(status_info) => {
|
|
let mut sub_services = Vec::new();
|
|
|
|
// Calculate uptime if we have start timestamp
|
|
let uptime_seconds = status_info.start_timestamp.and_then(|start| {
|
|
let now = std::time::SystemTime::now()
|
|
.duration_since(std::time::UNIX_EPOCH)
|
|
.ok()?
|
|
.as_secs();
|
|
Some(now.saturating_sub(start))
|
|
});
|
|
|
|
// Sub-service metrics for specific services (always include cached results)
|
|
if service_name.contains("nginx") && status_info.active_state == "active" {
|
|
let nginx_sites = self.get_nginx_site_metrics();
|
|
for (site_name, latency_ms) in nginx_sites {
|
|
let site_status = if latency_ms >= 0.0 && latency_ms < self.config.nginx_latency_critical_ms {
|
|
"active"
|
|
} else {
|
|
"failed"
|
|
};
|
|
|
|
let mut metrics = Vec::new();
|
|
metrics.push(SubServiceMetric {
|
|
label: "latency_ms".to_string(),
|
|
value: latency_ms,
|
|
unit: Some("ms".to_string()),
|
|
});
|
|
|
|
sub_services.push(SubServiceData {
|
|
name: site_name.clone(),
|
|
service_status: self.calculate_service_status(&site_name, &site_status),
|
|
metrics,
|
|
service_type: "nginx_site".to_string(),
|
|
});
|
|
}
|
|
}
|
|
|
|
if service_name.contains("docker") && status_info.active_state == "active" {
|
|
let docker_containers = self.get_docker_containers();
|
|
for (container_name, container_status) in docker_containers {
|
|
// For now, docker containers have no additional metrics
|
|
// Future: could add memory_mb, cpu_percent, restart_count, etc.
|
|
let metrics = Vec::new();
|
|
|
|
sub_services.push(SubServiceData {
|
|
name: container_name.clone(),
|
|
service_status: self.calculate_service_status(&container_name, &container_status),
|
|
metrics,
|
|
service_type: "container".to_string(),
|
|
});
|
|
}
|
|
|
|
// Add Docker images
|
|
let docker_images = self.get_docker_images();
|
|
for (image_name, _image_status, image_size_mb) in docker_images {
|
|
let metrics = Vec::new();
|
|
|
|
sub_services.push(SubServiceData {
|
|
name: format!("{} size: {:.1} MB", image_name, image_size_mb),
|
|
service_status: Status::Info, // Informational only, no status icon
|
|
metrics,
|
|
service_type: "image".to_string(),
|
|
});
|
|
}
|
|
}
|
|
|
|
if service_name == "openvpn-vpn-download" && status_info.active_state == "active" {
|
|
// Add VPN route
|
|
if let Some(external_ip) = self.get_vpn_external_ip() {
|
|
let metrics = Vec::new();
|
|
|
|
sub_services.push(SubServiceData {
|
|
name: format!("route: {}", external_ip),
|
|
service_status: Status::Info,
|
|
metrics,
|
|
service_type: "vpn_route".to_string(),
|
|
});
|
|
}
|
|
|
|
// Add torrent stats
|
|
if let Some((active_count, download_mbps, upload_mbps)) = self.get_qbittorrent_stats() {
|
|
let metrics = Vec::new();
|
|
|
|
sub_services.push(SubServiceData {
|
|
name: format!("{} active, ↓ {:.1} MB/s, ↑ {:.1} MB/s", active_count, download_mbps, upload_mbps),
|
|
service_status: Status::Info,
|
|
metrics,
|
|
service_type: "torrent_stats".to_string(),
|
|
});
|
|
}
|
|
|
|
// Add active torrent copy status for each copy operation
|
|
for torrent_name in self.get_active_torrent_copies() {
|
|
let metrics = Vec::new();
|
|
|
|
sub_services.push(SubServiceData {
|
|
name: format!("Copy: {}", torrent_name),
|
|
service_status: Status::Info,
|
|
metrics,
|
|
service_type: "torrent_copy".to_string(),
|
|
});
|
|
}
|
|
}
|
|
|
|
if service_name == "nftables" && status_info.active_state == "active" {
|
|
let (tcp_ports, udp_ports) = self.get_nftables_open_ports();
|
|
|
|
if !tcp_ports.is_empty() {
|
|
let metrics = Vec::new();
|
|
sub_services.push(SubServiceData {
|
|
name: format!("wan tcp: {}", tcp_ports),
|
|
service_status: Status::Info,
|
|
metrics,
|
|
service_type: "firewall_port".to_string(),
|
|
});
|
|
}
|
|
|
|
if !udp_ports.is_empty() {
|
|
let metrics = Vec::new();
|
|
sub_services.push(SubServiceData {
|
|
name: format!("wan udp: {}", udp_ports),
|
|
service_status: Status::Info,
|
|
metrics,
|
|
service_type: "firewall_port".to_string(),
|
|
});
|
|
}
|
|
}
|
|
|
|
if service_name == "tailscaled" && status_info.active_state == "active" {
|
|
// Add Tailscale peers with their connection methods as sub-services
|
|
let peers = self.get_tailscale_peers();
|
|
for (peer_name, conn_method) in peers {
|
|
let metrics = Vec::new();
|
|
sub_services.push(SubServiceData {
|
|
name: format!("{}: {}", peer_name, conn_method),
|
|
service_status: Status::Info,
|
|
metrics,
|
|
service_type: "tailscale_peer".to_string(),
|
|
});
|
|
}
|
|
}
|
|
|
|
// Create complete service data
|
|
let service_data = ServiceData {
|
|
name: service_name.clone(),
|
|
user_stopped: false, // TODO: Integrate with service tracker
|
|
service_status: self.calculate_service_status(service_name, &status_info.active_state),
|
|
sub_services,
|
|
memory_bytes: status_info.memory_bytes,
|
|
restart_count: status_info.restart_count,
|
|
uptime_seconds,
|
|
};
|
|
|
|
// Add to AgentData and cache
|
|
agent_data.services.push(service_data.clone());
|
|
complete_service_data.push(service_data);
|
|
}
|
|
Err(e) => {
|
|
debug!("Failed to get status for service {}: {}", service_name, e);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Sort services alphabetically by name
|
|
agent_data.services.sort_by(|a, b| a.name.cmp(&b.name));
|
|
complete_service_data.sort_by(|a, b| a.name.cmp(&b.name));
|
|
|
|
// Update cached state
|
|
{
|
|
let mut state = self.state.write().unwrap();
|
|
state.last_collection = Some(start_time);
|
|
state.cached_service_data = complete_service_data;
|
|
}
|
|
|
|
let elapsed = start_time.elapsed();
|
|
debug!("Systemd collection completed in {:?} with {} services", elapsed, agent_data.services.len());
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Get monitored services, discovering them if needed or cache is expired
|
|
fn get_monitored_services(&self) -> Result<Vec<String>> {
|
|
// Check if we need discovery without holding the lock
|
|
let needs_discovery = {
|
|
let state = self.state.read().unwrap();
|
|
match state.last_discovery_time {
|
|
None => true, // First time
|
|
Some(last_time) => {
|
|
let elapsed = last_time.elapsed().as_secs();
|
|
elapsed >= state.discovery_interval_seconds
|
|
}
|
|
}
|
|
};
|
|
|
|
if needs_discovery {
|
|
debug!("Discovering systemd services (cache expired or first run)");
|
|
match self.discover_services_internal() {
|
|
Ok((services, status_cache)) => {
|
|
if let Ok(mut state) = self.state.write() {
|
|
state.monitored_services = services.clone();
|
|
state.service_status_cache = status_cache;
|
|
state.last_discovery_time = Some(Instant::now());
|
|
debug!("Auto-discovered {} services to monitor: {:?}",
|
|
state.monitored_services.len(), state.monitored_services);
|
|
return Ok(services);
|
|
}
|
|
}
|
|
Err(e) => {
|
|
debug!("Failed to discover services, using cached list: {}", e);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Return cached services
|
|
let state = self.state.read().unwrap();
|
|
Ok(state.monitored_services.clone())
|
|
}
|
|
|
|
/// Get nginx site metrics, checking them if cache is expired (like old working version)
|
|
fn get_nginx_site_metrics(&self) -> Vec<(String, f32)> {
|
|
let mut state = self.state.write().unwrap();
|
|
|
|
// Check if we need to refresh nginx site metrics
|
|
let needs_refresh = match state.last_nginx_check_time {
|
|
None => true, // First time
|
|
Some(last_time) => {
|
|
let elapsed = last_time.elapsed().as_secs();
|
|
elapsed >= state.nginx_check_interval_seconds
|
|
}
|
|
};
|
|
|
|
if needs_refresh {
|
|
// Only check nginx sites if nginx service is active
|
|
if state.monitored_services.iter().any(|s| s.contains("nginx")) {
|
|
let fresh_metrics = self.get_nginx_sites_internal();
|
|
state.nginx_site_metrics = fresh_metrics;
|
|
state.last_nginx_check_time = Some(Instant::now());
|
|
}
|
|
}
|
|
|
|
state.nginx_site_metrics.clone()
|
|
}
|
|
|
|
/// Auto-discover interesting services to monitor
|
|
fn discover_services_internal(&self) -> Result<(Vec<String>, std::collections::HashMap<String, ServiceStatusInfo>)> {
|
|
// First: Get all service unit files (with 3 second timeout)
|
|
let unit_files_output = Command::new("timeout")
|
|
.args(&["3", "systemctl", "list-unit-files", "--type=service", "--no-pager", "--plain"])
|
|
.output()?;
|
|
|
|
if !unit_files_output.status.success() {
|
|
return Err(anyhow::anyhow!("systemctl list-unit-files command failed"));
|
|
}
|
|
|
|
// Second: Get runtime status of all units (with 3 second timeout)
|
|
let units_status_output = Command::new("timeout")
|
|
.args(&["3", "systemctl", "list-units", "--type=service", "--all", "--no-pager", "--plain"])
|
|
.output()?;
|
|
|
|
if !units_status_output.status.success() {
|
|
return Err(anyhow::anyhow!("systemctl list-units command failed"));
|
|
}
|
|
|
|
let unit_files_str = String::from_utf8(unit_files_output.stdout)?;
|
|
let units_status_str = String::from_utf8(units_status_output.stdout)?;
|
|
let mut services = Vec::new();
|
|
|
|
let excluded_services = &self.config.excluded_services;
|
|
let service_name_filters = &self.config.service_name_filters;
|
|
|
|
// Parse all service unit files
|
|
let mut all_service_names = std::collections::HashSet::new();
|
|
for line in unit_files_str.lines() {
|
|
let fields: Vec<&str> = line.split_whitespace().collect();
|
|
if fields.len() >= 2 && fields[0].ends_with(".service") {
|
|
let service_name = fields[0].trim_end_matches(".service");
|
|
all_service_names.insert(service_name.to_string());
|
|
}
|
|
}
|
|
|
|
// Parse runtime status for all units
|
|
let mut status_cache = std::collections::HashMap::new();
|
|
for line in units_status_str.lines() {
|
|
let fields: Vec<&str> = line.split_whitespace().collect();
|
|
if fields.len() >= 4 && fields[0].ends_with(".service") {
|
|
let service_name = fields[0].trim_end_matches(".service");
|
|
let active_state = fields.get(2).unwrap_or(&"unknown").to_string();
|
|
|
|
status_cache.insert(service_name.to_string(), ServiceStatusInfo {
|
|
active_state,
|
|
memory_bytes: None,
|
|
restart_count: None,
|
|
start_timestamp: None,
|
|
});
|
|
}
|
|
}
|
|
|
|
// For services found in unit files but not in runtime status, set default inactive status
|
|
for service_name in &all_service_names {
|
|
if !status_cache.contains_key(service_name) {
|
|
status_cache.insert(service_name.to_string(), ServiceStatusInfo {
|
|
active_state: "inactive".to_string(),
|
|
memory_bytes: None,
|
|
restart_count: None,
|
|
start_timestamp: None,
|
|
});
|
|
}
|
|
}
|
|
|
|
// Process all discovered services and apply filters
|
|
for service_name in &all_service_names {
|
|
// Skip excluded services first
|
|
let mut is_excluded = false;
|
|
for excluded in excluded_services {
|
|
if service_name.contains(excluded) {
|
|
is_excluded = true;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if is_excluded {
|
|
continue;
|
|
}
|
|
|
|
// Check if this service matches our filter patterns (supports wildcards)
|
|
for pattern in service_name_filters {
|
|
if self.matches_pattern(service_name, pattern) {
|
|
services.push(service_name.to_string());
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
Ok((services, status_cache))
|
|
}
|
|
|
|
/// Get service status with detailed metrics from systemctl
|
|
fn get_service_status(&self, service: &str) -> Result<ServiceStatusInfo> {
|
|
// Always fetch fresh data to get detailed metrics (memory, restarts, uptime)
|
|
// Note: Cache in service_status_cache only has basic active_state from discovery,
|
|
// with all detailed metrics set to None. We need fresh systemctl show data.
|
|
|
|
let output = Command::new("timeout")
|
|
.args(&[
|
|
"2",
|
|
"systemctl",
|
|
"show",
|
|
&format!("{}.service", service),
|
|
"--property=LoadState,ActiveState,SubState,MemoryCurrent,NRestarts,ExecMainStartTimestamp"
|
|
])
|
|
.output()?;
|
|
|
|
let output_str = String::from_utf8(output.stdout)?;
|
|
|
|
// Parse properties
|
|
let mut active_state = String::new();
|
|
let mut memory_bytes = None;
|
|
let mut restart_count = None;
|
|
let mut start_timestamp = None;
|
|
|
|
for line in output_str.lines() {
|
|
if let Some(value) = line.strip_prefix("ActiveState=") {
|
|
active_state = value.to_string();
|
|
} else if let Some(value) = line.strip_prefix("MemoryCurrent=") {
|
|
if value != "[not set]" {
|
|
memory_bytes = value.parse().ok();
|
|
}
|
|
} else if let Some(value) = line.strip_prefix("NRestarts=") {
|
|
restart_count = value.parse().ok();
|
|
} else if let Some(value) = line.strip_prefix("ExecMainStartTimestamp=") {
|
|
if value != "[not set]" && !value.is_empty() {
|
|
// Parse timestamp to seconds since epoch
|
|
if let Ok(output) = Command::new("date")
|
|
.args(&["+%s", "-d", value])
|
|
.output()
|
|
{
|
|
if let Ok(timestamp_str) = String::from_utf8(output.stdout) {
|
|
start_timestamp = timestamp_str.trim().parse().ok();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
Ok(ServiceStatusInfo {
|
|
active_state,
|
|
memory_bytes,
|
|
restart_count,
|
|
start_timestamp,
|
|
})
|
|
}
|
|
|
|
/// Check if service name matches pattern (supports wildcards like nginx*)
|
|
fn matches_pattern(&self, service_name: &str, pattern: &str) -> bool {
|
|
if pattern.contains('*') {
|
|
if pattern.ends_with('*') {
|
|
// Pattern like "nginx*" - match if service starts with "nginx"
|
|
let prefix = &pattern[..pattern.len() - 1];
|
|
service_name.starts_with(prefix)
|
|
} else if pattern.starts_with('*') {
|
|
// Pattern like "*backup" - match if service ends with "backup"
|
|
let suffix = &pattern[1..];
|
|
service_name.ends_with(suffix)
|
|
} else {
|
|
// Pattern like "nginx*backup" - simple glob matching
|
|
self.simple_glob_match(service_name, pattern)
|
|
}
|
|
} else {
|
|
// Exact match
|
|
service_name == pattern
|
|
}
|
|
}
|
|
|
|
/// Simple glob matching for patterns with * in the middle
|
|
fn simple_glob_match(&self, text: &str, pattern: &str) -> bool {
|
|
let parts: Vec<&str> = pattern.split('*').collect();
|
|
let mut pos = 0;
|
|
|
|
for part in parts {
|
|
if part.is_empty() {
|
|
continue;
|
|
}
|
|
if let Some(found_pos) = text[pos..].find(part) {
|
|
pos += found_pos + part.len();
|
|
} else {
|
|
return false;
|
|
}
|
|
}
|
|
true
|
|
}
|
|
|
|
/// Calculate service status, taking user-stopped services into account
|
|
fn calculate_service_status(&self, service_name: &str, active_status: &str) -> Status {
|
|
match active_status.to_lowercase().as_str() {
|
|
"active" => Status::Ok,
|
|
"inactive" | "dead" => {
|
|
debug!("Service '{}' is inactive - treating as Inactive status", service_name);
|
|
Status::Inactive
|
|
},
|
|
"failed" | "error" => Status::Critical,
|
|
"activating" | "deactivating" | "reloading" | "starting" | "stopping" => {
|
|
debug!("Service '{}' is transitioning - treating as Pending", service_name);
|
|
Status::Pending
|
|
},
|
|
_ => Status::Unknown,
|
|
}
|
|
}
|
|
|
|
/// Check if service collection cache should be updated
|
|
fn should_update_cache(&self) -> bool {
|
|
let state = self.state.read().unwrap();
|
|
|
|
match state.last_collection {
|
|
None => true,
|
|
Some(last) => {
|
|
let cache_duration = std::time::Duration::from_secs(30);
|
|
last.elapsed() > cache_duration
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Get cached complete service data with sub-services if available and fresh
|
|
fn get_cached_complete_services(&self) -> Option<Vec<ServiceData>> {
|
|
if !self.should_update_cache() {
|
|
let state = self.state.read().unwrap();
|
|
Some(state.cached_service_data.clone())
|
|
} else {
|
|
None
|
|
}
|
|
}
|
|
|
|
/// Get nginx sites with latency checks (internal - no caching)
|
|
fn get_nginx_sites_internal(&self) -> Vec<(String, f32)> {
|
|
let mut sites = Vec::new();
|
|
|
|
// Discover nginx sites from configuration
|
|
let discovered_sites = self.discover_nginx_sites();
|
|
|
|
// Always add all discovered sites, even if checks fail (like old version)
|
|
for (site_name, url) in &discovered_sites {
|
|
match self.check_site_latency(url) {
|
|
Ok(latency_ms) => {
|
|
sites.push((site_name.clone(), latency_ms));
|
|
}
|
|
Err(_) => {
|
|
// Site is unreachable - use -1.0 to indicate error (like old version)
|
|
sites.push((site_name.clone(), -1.0));
|
|
}
|
|
}
|
|
}
|
|
|
|
sites
|
|
}
|
|
|
|
/// Discover nginx sites from configuration
|
|
fn discover_nginx_sites(&self) -> Vec<(String, String)> {
|
|
// Use the same approach as the old working agent: get nginx config from systemd
|
|
let config_content = match self.get_nginx_config_from_systemd() {
|
|
Some(content) => content,
|
|
None => {
|
|
debug!("Could not get nginx config from systemd, trying nginx -T fallback");
|
|
match self.get_nginx_config_via_command() {
|
|
Some(content) => content,
|
|
None => {
|
|
debug!("Could not get nginx config via any method");
|
|
return Vec::new();
|
|
}
|
|
}
|
|
}
|
|
};
|
|
|
|
// Parse the config content to extract sites
|
|
self.parse_nginx_config_for_sites(&config_content)
|
|
}
|
|
|
|
/// Fallback: get nginx config via nginx -T command
|
|
fn get_nginx_config_via_command(&self) -> Option<String> {
|
|
let output = Command::new("nginx")
|
|
.args(&["-T"])
|
|
.output()
|
|
.ok()?;
|
|
|
|
if !output.status.success() {
|
|
debug!("nginx -T failed");
|
|
return None;
|
|
}
|
|
|
|
Some(String::from_utf8_lossy(&output.stdout).to_string())
|
|
}
|
|
|
|
/// Get nginx config from systemd service definition (NixOS compatible)
|
|
fn get_nginx_config_from_systemd(&self) -> Option<String> {
|
|
let output = Command::new("systemctl")
|
|
.args(&["show", "nginx", "--property=ExecStart", "--no-pager"])
|
|
.output()
|
|
.ok()?;
|
|
|
|
if !output.status.success() {
|
|
debug!("Failed to get nginx ExecStart from systemd");
|
|
return None;
|
|
}
|
|
|
|
let stdout = String::from_utf8_lossy(&output.stdout);
|
|
debug!("systemctl show nginx output: {}", stdout);
|
|
|
|
// Parse ExecStart to extract -c config path
|
|
for line in stdout.lines() {
|
|
if line.starts_with("ExecStart=") {
|
|
debug!("Found ExecStart line: {}", line);
|
|
if let Some(config_path) = self.extract_config_path_from_exec_start(line) {
|
|
debug!("Extracted config path: {}", config_path);
|
|
return std::fs::read_to_string(&config_path).ok();
|
|
}
|
|
}
|
|
}
|
|
|
|
None
|
|
}
|
|
|
|
/// Extract config path from ExecStart line
|
|
fn extract_config_path_from_exec_start(&self, exec_start: &str) -> Option<String> {
|
|
// Remove ExecStart= prefix
|
|
let exec_part = exec_start.strip_prefix("ExecStart=")?;
|
|
debug!("Parsing exec part: {}", exec_part);
|
|
|
|
// Handle NixOS format: ExecStart={ path=...; argv[]=...nginx -c /config; ... }
|
|
if exec_part.contains("argv[]=") {
|
|
// Extract the part after argv[]=
|
|
let argv_start = exec_part.find("argv[]=")?;
|
|
let argv_part = &exec_part[argv_start + 7..]; // Skip "argv[]="
|
|
debug!("Found NixOS argv part: {}", argv_part);
|
|
|
|
// Look for -c flag followed by config path
|
|
if let Some(c_pos) = argv_part.find(" -c ") {
|
|
let after_c = &argv_part[c_pos + 4..];
|
|
// Find the config path (until next space or semicolon)
|
|
let config_path = after_c.split([' ', ';']).next()?;
|
|
return Some(config_path.to_string());
|
|
}
|
|
} else {
|
|
// Handle traditional format: ExecStart=/path/nginx -c /config
|
|
debug!("Parsing traditional format");
|
|
if let Some(c_pos) = exec_part.find(" -c ") {
|
|
let after_c = &exec_part[c_pos + 4..];
|
|
let config_path = after_c.split_whitespace().next()?;
|
|
return Some(config_path.to_string());
|
|
}
|
|
}
|
|
|
|
None
|
|
}
|
|
|
|
/// Parse nginx config content to extract server names and build site list
|
|
fn parse_nginx_config_for_sites(&self, config_content: &str) -> Vec<(String, String)> {
|
|
let mut sites = Vec::new();
|
|
let lines: Vec<&str> = config_content.lines().collect();
|
|
let mut i = 0;
|
|
|
|
debug!("Parsing nginx config with {} lines", lines.len());
|
|
|
|
while i < lines.len() {
|
|
let line = lines[i].trim();
|
|
if line.starts_with("server") && line.contains("{") {
|
|
if let Some(server_name) = self.parse_server_block(&lines, &mut i) {
|
|
let url = format!("https://{}", server_name);
|
|
sites.push((server_name.clone(), url));
|
|
}
|
|
}
|
|
i += 1;
|
|
}
|
|
|
|
debug!("Discovered {} nginx sites total", sites.len());
|
|
sites
|
|
}
|
|
|
|
/// Parse a server block to extract the primary server_name
|
|
fn parse_server_block(&self, lines: &[&str], start_index: &mut usize) -> Option<String> {
|
|
let mut server_names = Vec::new();
|
|
let mut has_redirect = false;
|
|
let mut i = *start_index + 1;
|
|
let mut brace_count = 1;
|
|
|
|
// Parse until we close the server block
|
|
while i < lines.len() && brace_count > 0 {
|
|
let trimmed = lines[i].trim();
|
|
|
|
// Track braces
|
|
brace_count += trimmed.matches('{').count();
|
|
brace_count -= trimmed.matches('}').count();
|
|
|
|
// Extract server_name
|
|
if trimmed.starts_with("server_name") {
|
|
if let Some(names_part) = trimmed.strip_prefix("server_name") {
|
|
let names_clean = names_part.trim().trim_end_matches(';');
|
|
for name in names_clean.split_whitespace() {
|
|
if name != "_"
|
|
&& !name.is_empty()
|
|
&& name.contains('.')
|
|
&& !name.starts_with('$')
|
|
{
|
|
server_names.push(name.to_string());
|
|
debug!("Found server_name in block: {}", name);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Check for redirects (skip redirect-only servers)
|
|
if trimmed.contains("return") && (trimmed.contains("301") || trimmed.contains("302")) {
|
|
has_redirect = true;
|
|
}
|
|
|
|
i += 1;
|
|
}
|
|
|
|
*start_index = i - 1;
|
|
|
|
if !server_names.is_empty() && !has_redirect {
|
|
return Some(server_names[0].clone());
|
|
}
|
|
|
|
None
|
|
}
|
|
|
|
/// Check site latency using HTTP GET requests
|
|
fn check_site_latency(&self, url: &str) -> Result<f32, Box<dyn std::error::Error>> {
|
|
use std::time::Duration;
|
|
use std::time::Instant;
|
|
|
|
let start = Instant::now();
|
|
|
|
// Create HTTP client with timeouts from configuration
|
|
let client = reqwest::blocking::Client::builder()
|
|
.timeout(Duration::from_secs(self.config.http_timeout_seconds))
|
|
.connect_timeout(Duration::from_secs(self.config.http_connect_timeout_seconds))
|
|
.redirect(reqwest::redirect::Policy::limited(10))
|
|
.build()?;
|
|
|
|
// Make GET request and measure latency
|
|
let response = client.get(url).send()?;
|
|
let latency = start.elapsed().as_millis() as f32;
|
|
|
|
// Check if response is successful (2xx or 3xx status codes)
|
|
if response.status().is_success() || response.status().is_redirection() {
|
|
Ok(latency)
|
|
} else {
|
|
Err(format!(
|
|
"HTTP request failed for {} with status: {}",
|
|
url,
|
|
response.status()
|
|
)
|
|
.into())
|
|
}
|
|
}
|
|
|
|
/// Get docker containers as sub-services
|
|
fn get_docker_containers(&self) -> Vec<(String, String)> {
|
|
let mut containers = Vec::new();
|
|
|
|
// Check if docker is available (cm-agent user is in docker group)
|
|
// Use -a to show ALL containers (running and stopped) with 3 second timeout
|
|
let output = Command::new("timeout")
|
|
.args(&["3", "docker", "ps", "-a", "--format", "{{.Names}},{{.Status}}"])
|
|
.output();
|
|
|
|
let output = match output {
|
|
Ok(out) if out.status.success() => out,
|
|
_ => return containers, // Docker not available or failed
|
|
};
|
|
|
|
let output_str = match String::from_utf8(output.stdout) {
|
|
Ok(s) => s,
|
|
Err(_) => return containers,
|
|
};
|
|
|
|
for line in output_str.lines() {
|
|
if line.trim().is_empty() {
|
|
continue;
|
|
}
|
|
|
|
let parts: Vec<&str> = line.split(',').collect();
|
|
if parts.len() >= 2 {
|
|
let container_name = parts[0].trim();
|
|
let status_str = parts[1].trim();
|
|
|
|
let container_status = if status_str.contains("Up") {
|
|
"active"
|
|
} else if status_str.contains("Exited") || status_str.contains("Created") {
|
|
"inactive" // Stopped/created containers are inactive
|
|
} else {
|
|
"failed" // Other states (restarting, paused, dead) → failed
|
|
};
|
|
|
|
containers.push((format!("docker_{}", container_name), container_status.to_string()));
|
|
}
|
|
}
|
|
|
|
containers
|
|
}
|
|
|
|
/// Get docker images as sub-services
|
|
fn get_docker_images(&self) -> Vec<(String, String, f32)> {
|
|
let mut images = Vec::new();
|
|
// Check if docker is available (cm-agent user is in docker group) with 3 second timeout
|
|
let output = Command::new("timeout")
|
|
.args(&["3", "docker", "images", "--format", "{{.Repository}}:{{.Tag}},{{.Size}}"])
|
|
.output();
|
|
|
|
let output = match output {
|
|
Ok(out) if out.status.success() => out,
|
|
Ok(_) => {
|
|
return images;
|
|
}
|
|
Err(_) => {
|
|
return images;
|
|
}
|
|
};
|
|
|
|
let output_str = match String::from_utf8(output.stdout) {
|
|
Ok(s) => s,
|
|
Err(_) => return images,
|
|
};
|
|
|
|
for line in output_str.lines() {
|
|
if line.trim().is_empty() {
|
|
continue;
|
|
}
|
|
|
|
let parts: Vec<&str> = line.split(',').collect();
|
|
if parts.len() >= 2 {
|
|
let image_name = parts[0].trim();
|
|
let size_str = parts[1].trim();
|
|
|
|
// Skip <none>:<none> images (dangling images)
|
|
if image_name.contains("<none>") {
|
|
continue;
|
|
}
|
|
|
|
// Parse size to MB (sizes come as "142MB", "1.5GB", "512kB", etc.)
|
|
let size_mb = self.parse_docker_size(size_str);
|
|
|
|
images.push((
|
|
image_name.to_string(),
|
|
"inactive".to_string(), // Images are informational - use inactive for neutral display
|
|
size_mb
|
|
));
|
|
}
|
|
}
|
|
|
|
images
|
|
}
|
|
|
|
/// Parse Docker size string to MB
|
|
fn parse_docker_size(&self, size_str: &str) -> f32 {
|
|
let size_upper = size_str.to_uppercase();
|
|
|
|
// Extract numeric part and unit
|
|
let mut num_str = String::new();
|
|
let mut unit = String::new();
|
|
|
|
for ch in size_upper.chars() {
|
|
if ch.is_ascii_digit() || ch == '.' {
|
|
num_str.push(ch);
|
|
} else if ch.is_alphabetic() {
|
|
unit.push(ch);
|
|
}
|
|
}
|
|
|
|
let value: f32 = num_str.parse().unwrap_or(0.0);
|
|
|
|
// Convert to MB
|
|
match unit.as_str() {
|
|
"KB" | "K" => value / 1024.0,
|
|
"MB" | "M" => value,
|
|
"GB" | "G" => value * 1024.0,
|
|
"TB" | "T" => value * 1024.0 * 1024.0,
|
|
_ => value, // Assume bytes if no unit
|
|
}
|
|
}
|
|
|
|
/// Get VPN external IP by querying through the vpn namespace
|
|
fn get_vpn_external_ip(&self) -> Option<String> {
|
|
let output = Command::new("timeout")
|
|
.args(&[
|
|
"5",
|
|
"sudo",
|
|
"ip",
|
|
"netns",
|
|
"exec",
|
|
"vpn",
|
|
"curl",
|
|
"-s",
|
|
"--max-time",
|
|
"4",
|
|
"https://ifconfig.me"
|
|
])
|
|
.output()
|
|
.ok()?;
|
|
|
|
if output.status.success() {
|
|
let ip = String::from_utf8_lossy(&output.stdout).trim().to_string();
|
|
if !ip.is_empty() && ip.contains('.') {
|
|
return Some(ip);
|
|
}
|
|
}
|
|
|
|
None
|
|
}
|
|
|
|
/// Get Tailscale connected peers with their connection methods
|
|
/// Returns a list of (device_name, connection_method) tuples
|
|
fn get_tailscale_peers(&self) -> Vec<(String, String)> {
|
|
match Command::new("timeout")
|
|
.args(["2", "tailscale", "status"])
|
|
.output()
|
|
{
|
|
Ok(output) if output.status.success() => {
|
|
let status_output = String::from_utf8_lossy(&output.stdout);
|
|
let mut peers = Vec::new();
|
|
|
|
// Get current hostname to filter it out
|
|
let current_hostname = gethostname::gethostname()
|
|
.to_string_lossy()
|
|
.to_string();
|
|
|
|
// Parse tailscale status output
|
|
// Format: IP hostname user os status
|
|
// Example: 100.110.98.3 wslbox cm@ linux active; direct 192.168.30.227:53757
|
|
// Note: First line is always the current host, skip it
|
|
for (idx, line) in status_output.lines().enumerate() {
|
|
if idx == 0 {
|
|
continue; // Skip first line (current host)
|
|
}
|
|
|
|
let parts: Vec<&str> = line.split_whitespace().collect();
|
|
if parts.len() < 5 {
|
|
continue; // Skip invalid lines
|
|
}
|
|
|
|
// parts[0] = IP
|
|
// parts[1] = hostname
|
|
// parts[2] = user
|
|
// parts[3] = OS
|
|
// parts[4+] = status (e.g., "active;", "direct", "192.168.30.227:53757" or "idle;" or "offline")
|
|
|
|
let hostname = parts[1];
|
|
|
|
// Skip if this is the current host (double-check in case format changes)
|
|
if hostname == current_hostname {
|
|
continue;
|
|
}
|
|
|
|
let status_parts = &parts[4..];
|
|
|
|
// Determine connection method from status
|
|
let connection_method = if status_parts.is_empty() {
|
|
continue; // Skip if no status
|
|
} else {
|
|
let status_str = status_parts.join(" ");
|
|
if status_str.contains("offline") {
|
|
continue; // Skip offline peers
|
|
} else if status_str.contains("direct") {
|
|
"direct"
|
|
} else if status_str.contains("relay") {
|
|
"relay"
|
|
} else if status_str.contains("idle") {
|
|
"idle"
|
|
} else if status_str.contains("active") {
|
|
"active"
|
|
} else {
|
|
continue; // Skip unknown status
|
|
}
|
|
};
|
|
|
|
peers.push((hostname.to_string(), connection_method.to_string()));
|
|
}
|
|
|
|
peers
|
|
}
|
|
_ => Vec::new(),
|
|
}
|
|
}
|
|
|
|
/// Get nftables open ports grouped by protocol
|
|
/// Returns: (tcp_ports_string, udp_ports_string)
|
|
fn get_nftables_open_ports(&self) -> (String, String) {
|
|
let output = Command::new("sudo")
|
|
.args(&["/run/current-system/sw/bin/nft", "list", "ruleset"])
|
|
.output();
|
|
|
|
let output = match output {
|
|
Ok(out) if out.status.success() => out,
|
|
Ok(out) => {
|
|
info!("nft command failed with status: {:?}, stderr: {}",
|
|
out.status, String::from_utf8_lossy(&out.stderr));
|
|
return (String::new(), String::new());
|
|
}
|
|
Err(e) => {
|
|
info!("Failed to execute nft command: {}", e);
|
|
return (String::new(), String::new());
|
|
}
|
|
};
|
|
|
|
let output_str = match String::from_utf8(output.stdout) {
|
|
Ok(s) => s,
|
|
Err(_) => {
|
|
info!("Failed to parse nft output as UTF-8");
|
|
return (String::new(), String::new());
|
|
}
|
|
};
|
|
|
|
let mut tcp_ports = std::collections::HashSet::new();
|
|
let mut udp_ports = std::collections::HashSet::new();
|
|
|
|
// Parse nftables output for WAN incoming accept rules with dport
|
|
// Looking for patterns like: tcp dport 22 accept or tcp dport { 22, 80, 443 } accept
|
|
// Only include rules in input_wan chain
|
|
let mut in_wan_chain = false;
|
|
|
|
for line in output_str.lines() {
|
|
let line = line.trim();
|
|
|
|
// Track if we're in the input_wan chain
|
|
if line.contains("chain input_wan") {
|
|
in_wan_chain = true;
|
|
continue;
|
|
}
|
|
|
|
// Reset when exiting chain (closing brace) or entering other chains
|
|
if line == "}" || (line.starts_with("chain ") && !line.contains("input_wan")) {
|
|
in_wan_chain = false;
|
|
continue;
|
|
}
|
|
|
|
// Only process rules in input_wan chain
|
|
if !in_wan_chain {
|
|
continue;
|
|
}
|
|
|
|
// Skip if not an accept rule
|
|
if !line.contains("accept") {
|
|
continue;
|
|
}
|
|
|
|
// Parse TCP ports
|
|
if line.contains("tcp dport") {
|
|
for port in self.extract_ports_from_nft_rule(line) {
|
|
tcp_ports.insert(port);
|
|
}
|
|
}
|
|
|
|
// Parse UDP ports
|
|
if line.contains("udp dport") {
|
|
for port in self.extract_ports_from_nft_rule(line) {
|
|
udp_ports.insert(port);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Sort and format
|
|
let mut tcp_vec: Vec<u16> = tcp_ports.into_iter().collect();
|
|
let mut udp_vec: Vec<u16> = udp_ports.into_iter().collect();
|
|
tcp_vec.sort();
|
|
udp_vec.sort();
|
|
|
|
let tcp_str = tcp_vec.iter().map(|p| p.to_string()).collect::<Vec<_>>().join(", ");
|
|
let udp_str = udp_vec.iter().map(|p| p.to_string()).collect::<Vec<_>>().join(", ");
|
|
|
|
info!("nftables WAN ports - TCP: '{}', UDP: '{}'", tcp_str, udp_str);
|
|
|
|
(tcp_str, udp_str)
|
|
}
|
|
|
|
/// Extract port numbers from nftables rule line
|
|
/// Returns vector of ports (handles both single ports and sets)
|
|
fn extract_ports_from_nft_rule(&self, line: &str) -> Vec<u16> {
|
|
let mut ports = Vec::new();
|
|
|
|
// Pattern: "tcp dport 22 accept" or "tcp dport { 22, 80, 443 } accept"
|
|
if let Some(dport_pos) = line.find("dport") {
|
|
let after_dport = &line[dport_pos + 5..].trim();
|
|
|
|
// Handle port sets like { 22, 80, 443 }
|
|
if after_dport.starts_with('{') {
|
|
if let Some(end_brace) = after_dport.find('}') {
|
|
let ports_str = &after_dport[1..end_brace];
|
|
// Parse each port in the set
|
|
for port_str in ports_str.split(',') {
|
|
if let Ok(port) = port_str.trim().parse::<u16>() {
|
|
ports.push(port);
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
// Single port
|
|
if let Some(port_str) = after_dport.split_whitespace().next() {
|
|
if let Ok(port) = port_str.parse::<u16>() {
|
|
ports.push(port);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
ports
|
|
}
|
|
|
|
/// Get aggregate qBittorrent torrent statistics
|
|
/// Returns: (active_count, download_mbps, upload_mbps)
|
|
fn get_qbittorrent_stats(&self) -> Option<(u32, f32, f32)> {
|
|
// Query qBittorrent API through VPN namespace
|
|
let output = Command::new("timeout")
|
|
.args(&[
|
|
"5",
|
|
"sudo",
|
|
"ip",
|
|
"netns",
|
|
"exec",
|
|
"vpn",
|
|
"curl",
|
|
"-s",
|
|
"--max-time",
|
|
"4",
|
|
"http://localhost:8080/api/v2/torrents/info"
|
|
])
|
|
.output()
|
|
.ok()?;
|
|
|
|
if !output.status.success() {
|
|
return None;
|
|
}
|
|
|
|
let output_str = String::from_utf8_lossy(&output.stdout);
|
|
let torrents: Vec<serde_json::Value> = serde_json::from_str(&output_str).ok()?;
|
|
|
|
let mut active_count = 0u32;
|
|
let mut total_download_bps = 0.0f64;
|
|
let mut total_upload_bps = 0.0f64;
|
|
|
|
for torrent in torrents {
|
|
let state = torrent["state"].as_str().unwrap_or("");
|
|
let dlspeed = torrent["dlspeed"].as_f64().unwrap_or(0.0);
|
|
let upspeed = torrent["upspeed"].as_f64().unwrap_or(0.0);
|
|
|
|
// States: downloading, uploading, stalledDL, stalledUP, queuedDL, queuedUP, pausedDL, pausedUP
|
|
// Count as active if downloading or uploading (seeding)
|
|
if state.contains("downloading") || state.contains("uploading") ||
|
|
state == "stalledDL" || state == "stalledUP" {
|
|
active_count += 1;
|
|
}
|
|
|
|
total_download_bps += dlspeed;
|
|
total_upload_bps += upspeed;
|
|
}
|
|
|
|
// qBittorrent returns bytes/s, convert to MB/s
|
|
let download_mbps = (total_download_bps / 1024.0 / 1024.0) as f32;
|
|
let upload_mbps = (total_upload_bps / 1024.0 / 1024.0) as f32;
|
|
|
|
Some((active_count, download_mbps, upload_mbps))
|
|
}
|
|
|
|
/// Check for active torrent copy operations
|
|
/// Returns: Vec of filenames currently being copied
|
|
fn get_active_torrent_copies(&self) -> Vec<String> {
|
|
let marker_dir = "/tmp/torrent-copy";
|
|
let mut active_copies = Vec::new();
|
|
|
|
// Read all marker files from directory
|
|
if let Ok(entries) = std::fs::read_dir(marker_dir) {
|
|
for entry in entries.flatten() {
|
|
if let Ok(file_type) = entry.file_type() {
|
|
if file_type.is_file() {
|
|
// Filename is the marker (sanitized torrent name)
|
|
if let Some(filename) = entry.file_name().to_str() {
|
|
// Convert sanitized name back (replace _ with /)
|
|
let display_name = filename.replace('_', "/");
|
|
active_copies.push(display_name);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
active_copies
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl Collector for SystemdCollector {
|
|
async fn collect_structured(&self, agent_data: &mut AgentData) -> Result<(), CollectorError> {
|
|
// Clear services to prevent duplicates when updating cached data
|
|
agent_data.services.clear();
|
|
|
|
// Use cached complete data if available and fresh
|
|
if let Some(cached_complete_services) = self.get_cached_complete_services() {
|
|
for service_data in cached_complete_services {
|
|
agent_data.services.push(service_data);
|
|
}
|
|
Ok(())
|
|
} else {
|
|
// Collect fresh data
|
|
self.collect_service_data(agent_data).await
|
|
}
|
|
}
|
|
} |