Add refresh shortcut 'r' for on-demand metrics refresh

Implements a ZMQ command protocol for dashboard-to-agent communication:
- Agents listen on port 6131 for REQ/REP commands
- Dashboard sends "refresh" command when 'r' key is pressed
- Agents force immediate collection of all metrics via force_refresh_all()
- Fresh data is broadcast immediately to dashboard
- Help text is updated to show "r: Refresh all metrics"

Also lays the foundation for a metric-level caching architecture that will
allow granular control over individual metric update frequencies in the future.
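
For reference, a minimal sketch of the dashboard-side request (the dashboard changes are not shown in this excerpt; the helper name and error handling are illustrative, while the port, the "refresh" command, and the "refresh_started" reply come from this commit):

fn send_refresh_command(agent_host: &str) -> Result<String, zmq::Error> {
    let ctx = zmq::Context::new();
    let socket = ctx.socket(zmq::SocketType::REQ)?;
    // Mirror the agent's 1-second timeout so an unreachable agent does not hang the UI.
    socket.set_rcvtimeo(1000)?;
    socket.set_sndtimeo(1000)?;
    socket.connect(&format!("tcp://{}:6131", agent_host))?;
    // The agent replies "refresh_started", or "unknown_command" for anything else.
    socket.send("refresh", 0)?;
    let reply = socket.recv_string(0)?.unwrap_or_default();
    Ok(reply)
}
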
2025-10-15 22:30:04 +02:00
parent 244cade7d8
commit 6bc7f97375
9 changed files with 751 additions and 2 deletions


@@ -10,6 +10,8 @@ mod notifications;
mod smart_agent;
mod cache;
mod cached_collector;
mod metric_cache;
mod metric_collector;
use smart_agent::SmartAgent;

agent/src/metric_cache.rs (new file, 274 lines)

@@ -0,0 +1,274 @@
use std::collections::HashMap;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::{debug, info, trace};
use serde_json::Value;
use crate::cache::CacheTier;
use crate::collectors::AgentType;
/// Configuration for individual metric collection intervals
#[derive(Debug, Clone)]
pub struct MetricConfig {
pub name: String,
pub tier: CacheTier,
pub collect_fn: String, // Method name to call for this specific metric
}
/// A group of related metrics with potentially different cache tiers
#[derive(Debug, Clone)]
pub struct MetricGroup {
pub name: String,
pub agent_type: AgentType,
pub metrics: Vec<MetricConfig>,
}
/// Cached metric entry with metadata
#[derive(Debug, Clone)]
struct MetricCacheEntry {
data: Value,
last_updated: Instant,
last_accessed: Instant,
access_count: u64,
tier: CacheTier,
}
impl MetricCacheEntry {
fn new(data: Value, tier: CacheTier) -> Self {
let now = Instant::now();
Self {
data,
last_updated: now,
last_accessed: now,
access_count: 1,
tier,
}
}
fn is_stale(&self) -> bool {
self.last_updated.elapsed() > self.tier.max_age()
}
fn access(&mut self) -> Value {
self.last_accessed = Instant::now();
self.access_count += 1;
self.data.clone()
}
fn update(&mut self, data: Value) {
self.data = data;
self.last_updated = Instant::now();
}
}
/// Metric-level cache manager with per-metric tier control
pub struct MetricCache {
// Key format: "agent_type.metric_name"
cache: RwLock<HashMap<String, MetricCacheEntry>>,
metric_groups: HashMap<AgentType, MetricGroup>,
}
impl MetricCache {
pub fn new() -> Self {
let mut metric_groups = HashMap::new();
// Define metric groups with per-metric cache tiers
metric_groups.insert(
AgentType::System,
MetricGroup {
name: "system".to_string(),
agent_type: AgentType::System,
metrics: vec![
MetricConfig {
name: "cpu_load".to_string(),
tier: CacheTier::RealTime,
collect_fn: "get_cpu_load".to_string(),
},
MetricConfig {
name: "cpu_temperature".to_string(),
tier: CacheTier::RealTime,
collect_fn: "get_cpu_temperature".to_string(),
},
MetricConfig {
name: "memory".to_string(),
tier: CacheTier::RealTime,
collect_fn: "get_memory_info".to_string(),
},
MetricConfig {
name: "top_processes".to_string(),
tier: CacheTier::Fast,
collect_fn: "get_top_processes".to_string(),
},
MetricConfig {
name: "cstate".to_string(),
tier: CacheTier::Medium,
collect_fn: "get_cpu_cstate_info".to_string(),
},
MetricConfig {
name: "users".to_string(),
tier: CacheTier::Medium,
collect_fn: "get_logged_in_users".to_string(),
},
],
},
);
metric_groups.insert(
AgentType::Service,
MetricGroup {
name: "service".to_string(),
agent_type: AgentType::Service,
metrics: vec![
MetricConfig {
name: "cpu_usage".to_string(),
tier: CacheTier::RealTime,
collect_fn: "get_service_cpu_usage".to_string(),
},
MetricConfig {
name: "memory_usage".to_string(),
tier: CacheTier::Fast,
collect_fn: "get_service_memory_usage".to_string(),
},
MetricConfig {
name: "status".to_string(),
tier: CacheTier::Medium,
collect_fn: "get_service_status".to_string(),
},
MetricConfig {
name: "disk_usage".to_string(),
tier: CacheTier::Slow,
collect_fn: "get_service_disk_usage".to_string(),
},
],
},
);
Self {
cache: RwLock::new(HashMap::new()),
metric_groups,
}
}
/// Get metric configuration for a specific agent type and metric
pub fn get_metric_config(&self, agent_type: &AgentType, metric_name: &str) -> Option<&MetricConfig> {
self.metric_groups
.get(agent_type)?
.metrics
.iter()
.find(|m| m.name == metric_name)
}
/// Get cached metric if available and not stale
pub async fn get_metric(&self, agent_type: &AgentType, metric_name: &str) -> Option<Value> {
let key = format!("{:?}.{}", agent_type, metric_name);
let mut cache = self.cache.write().await;
if let Some(entry) = cache.get_mut(&key) {
if !entry.is_stale() {
trace!("Metric cache hit for {}: {}ms old", key, entry.last_updated.elapsed().as_millis());
return Some(entry.access());
} else {
debug!("Metric cache entry for {} is stale ({}ms old)", key, entry.last_updated.elapsed().as_millis());
}
}
None
}
/// Store metric in cache
pub async fn put_metric(&self, agent_type: &AgentType, metric_name: &str, data: Value) {
let key = format!("{:?}.{}", agent_type, metric_name);
// Get tier for this metric
let tier = self
.get_metric_config(agent_type, metric_name)
.map(|config| config.tier)
.unwrap_or(CacheTier::Medium);
let mut cache = self.cache.write().await;
if let Some(entry) = cache.get_mut(&key) {
entry.update(data);
trace!("Updated metric cache entry for {}", key);
} else {
cache.insert(key.clone(), MetricCacheEntry::new(data, tier));
trace!("Created new metric cache entry for {} (tier: {:?})", key, tier);
}
}
/// Check if metric needs refresh based on its specific tier
pub async fn metric_needs_refresh(&self, agent_type: &AgentType, metric_name: &str) -> bool {
let key = format!("{:?}.{}", agent_type, metric_name);
let cache = self.cache.read().await;
if let Some(entry) = cache.get(&key) {
entry.is_stale()
} else {
// No cache entry exists
true
}
}
/// Get metrics that need refresh for a specific cache tier
pub async fn get_metrics_needing_refresh(&self, tier: CacheTier) -> Vec<(AgentType, String)> {
let cache = self.cache.read().await;
let mut metrics_to_refresh = Vec::new();
// Find all configured metrics for this tier
for (agent_type, group) in &self.metric_groups {
for metric_config in &group.metrics {
if metric_config.tier == tier {
let key = format!("{:?}.{}", agent_type, metric_config.name);
// Check if this metric needs refresh
let needs_refresh = if let Some(entry) = cache.get(&key) {
entry.is_stale()
} else {
true // No cache entry = needs initial collection
};
if needs_refresh {
metrics_to_refresh.push((agent_type.clone(), metric_config.name.clone()));
}
}
}
}
metrics_to_refresh
}
/// Get all metrics for a specific tier (for scheduling)
pub fn get_metrics_for_tier(&self, tier: CacheTier) -> Vec<(AgentType, String)> {
let mut metrics = Vec::new();
for (agent_type, group) in &self.metric_groups {
for metric_config in &group.metrics {
if metric_config.tier == tier {
metrics.push((agent_type.clone(), metric_config.name.clone()));
}
}
}
metrics
}
/// Cleanup old metric entries
pub async fn cleanup(&self) {
let mut cache = self.cache.write().await;
let initial_size = cache.len();
let cutoff = Instant::now() - Duration::from_secs(3600); // 1 hour
cache.retain(|key, entry| {
let keep = entry.last_accessed > cutoff;
if !keep {
trace!("Removing stale metric cache entry: {}", key);
}
keep
});
let removed = initial_size - cache.len();
if removed > 0 {
info!("Metric cache cleanup: removed {} stale entries ({} remaining)", removed, cache.len());
}
}
}
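
A minimal usage sketch of MetricCache, assuming an async caller such as the agent's scheduler loop (illustrative only, not part of this diff):

async fn example(cache: &MetricCache) {
    // Re-collect only when the per-metric tier says the entry is stale or missing.
    if cache.metric_needs_refresh(&AgentType::System, "cpu_load").await {
        let fresh = serde_json::json!({ "load1": 0.42 }); // placeholder payload
        cache.put_metric(&AgentType::System, "cpu_load", fresh).await;
    }
    // Served from the cache while the entry is within its tier's max_age.
    if let Some(value) = cache.get_metric(&AgentType::System, "cpu_load").await {
        tracing::debug!("cpu_load = {}", value);
    }
}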

agent/src/metric_collector.rs (new file, 150 lines)

@@ -0,0 +1,150 @@
use async_trait::async_trait;
use serde_json::Value;
use std::collections::HashMap;
use crate::collectors::{CollectorError, AgentType};
use crate::metric_cache::MetricCache;
/// Trait for collectors that support metric-level granular collection
#[async_trait]
pub trait MetricCollector {
/// Get the agent type this collector handles
fn agent_type(&self) -> AgentType;
/// Get the name of this collector
fn name(&self) -> &str;
/// Collect a specific metric by name
async fn collect_metric(&self, metric_name: &str) -> Result<Value, CollectorError>;
/// Get list of all metrics this collector can provide
fn available_metrics(&self) -> Vec<String>;
/// Collect multiple metrics efficiently (batch collection)
async fn collect_metrics(&self, metric_names: &[String]) -> Result<HashMap<String, Value>, CollectorError> {
let mut results = HashMap::new();
// Default implementation: collect each metric individually
for metric_name in metric_names {
match self.collect_metric(metric_name).await {
Ok(value) => {
results.insert(metric_name.clone(), value);
}
Err(e) => {
// Log error but continue with other metrics
tracing::warn!("Failed to collect metric {}: {}", metric_name, e);
}
}
}
Ok(results)
}
/// Collect all metrics this collector provides
async fn collect_all_metrics(&self) -> Result<HashMap<String, Value>, CollectorError> {
let metrics = self.available_metrics();
self.collect_metrics(&metrics).await
}
}
/// Manager for metric-based collection with caching
pub struct MetricCollectionManager {
collectors: HashMap<AgentType, Box<dyn MetricCollector + Send + Sync>>,
cache: MetricCache,
}
impl MetricCollectionManager {
pub fn new() -> Self {
Self {
collectors: HashMap::new(),
cache: MetricCache::new(),
}
}
/// Register a metric collector
pub fn register_collector(&mut self, collector: Box<dyn MetricCollector + Send + Sync>) {
let agent_type = collector.agent_type();
self.collectors.insert(agent_type, collector);
}
/// Collect a specific metric with caching
pub async fn get_metric(&self, agent_type: &AgentType, metric_name: &str) -> Result<Value, CollectorError> {
// Try cache first
if let Some(cached_value) = self.cache.get_metric(agent_type, metric_name).await {
return Ok(cached_value);
}
// Cache miss - collect fresh data
if let Some(collector) = self.collectors.get(agent_type) {
let value = collector.collect_metric(metric_name).await?;
// Store in cache
self.cache.put_metric(agent_type, metric_name, value.clone()).await;
Ok(value)
} else {
Err(CollectorError::ConfigError {
message: format!("No collector registered for agent type {:?}", agent_type),
})
}
}
/// Collect multiple metrics for an agent type
pub async fn get_metrics(&self, agent_type: &AgentType, metric_names: &[String]) -> Result<HashMap<String, Value>, CollectorError> {
let mut results = HashMap::new();
let mut metrics_to_collect = Vec::new();
// Check cache for each metric
for metric_name in metric_names {
if let Some(cached_value) = self.cache.get_metric(agent_type, metric_name).await {
results.insert(metric_name.clone(), cached_value);
} else {
metrics_to_collect.push(metric_name.clone());
}
}
// Collect uncached metrics
if !metrics_to_collect.is_empty() {
if let Some(collector) = self.collectors.get(agent_type) {
let fresh_metrics = collector.collect_metrics(&metrics_to_collect).await?;
// Store in cache and add to results
for (metric_name, value) in fresh_metrics {
self.cache.put_metric(agent_type, &metric_name, value.clone()).await;
results.insert(metric_name, value);
}
}
}
Ok(results)
}
/// Get metrics that need refresh for a specific tier
pub async fn get_stale_metrics(&self, tier: crate::cache::CacheTier) -> Vec<(AgentType, String)> {
self.cache.get_metrics_needing_refresh(tier).await
}
/// Force refresh specific metrics
pub async fn refresh_metrics(&self, metrics: &[(AgentType, String)]) -> Result<(), CollectorError> {
for (agent_type, metric_name) in metrics {
if let Some(collector) = self.collectors.get(agent_type) {
match collector.collect_metric(metric_name).await {
Ok(value) => {
self.cache.put_metric(agent_type, metric_name, value).await;
}
Err(e) => {
tracing::warn!("Failed to refresh metric {}.{}: {}",
format!("{:?}", agent_type), metric_name, e);
}
}
}
}
Ok(())
}
/// Cleanup old cache entries
pub async fn cleanup_cache(&self) {
self.cache.cleanup().await;
}
}
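
A minimal sketch of a collector implementing MetricCollector and wired into MetricCollectionManager (ExampleCollector and wire_up are hypothetical and not part of this commit; the real collectors in this repo may look different):

struct ExampleCollector;

#[async_trait::async_trait]
impl MetricCollector for ExampleCollector {
    fn agent_type(&self) -> AgentType { AgentType::System }
    fn name(&self) -> &str { "example" }
    fn available_metrics(&self) -> Vec<String> {
        vec!["cpu_load".to_string(), "memory".to_string()]
    }
    async fn collect_metric(&self, metric_name: &str) -> Result<Value, CollectorError> {
        // Placeholder payload; a real collector would read /proc, sysfs, systemd, etc.
        Ok(serde_json::json!({ "metric": metric_name, "value": 0 }))
    }
}

async fn wire_up() -> Result<(), CollectorError> {
    let mut manager = MetricCollectionManager::new();
    manager.register_collector(Box::new(ExampleCollector));
    // First call misses the cache and collects fresh data; later calls within the
    // metric's tier max_age are answered from the cache.
    let _cpu = manager.get_metric(&AgentType::System, "cpu_load").await?;
    Ok(())
}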

agent/src/smart_agent.rs

@@ -22,6 +22,7 @@ use crate::notifications::{NotificationManager, NotificationConfig};
pub struct SmartAgent {
hostname: String,
zmq_socket: Socket,
zmq_command_socket: Socket,
notification_manager: NotificationManager,
cache: Arc<SmartCache>,
scheduler: CollectionScheduler,
@@ -40,6 +41,12 @@ impl SmartAgent {
socket.bind("tcp://0.0.0.0:6130")?;
info!("ZMQ publisher bound to tcp://0.0.0.0:6130");
// Setup command socket (REP)
let command_socket = context.socket(SocketType::REP)?;
command_socket.bind("tcp://0.0.0.0:6131")?;
command_socket.set_rcvtimeo(1000)?; // 1 second timeout for non-blocking
info!("ZMQ command socket bound to tcp://0.0.0.0:6131");
// Setup notifications
let notification_config = NotificationConfig {
enabled: true,
@@ -127,6 +134,7 @@ impl SmartAgent {
Ok(Self {
hostname,
zmq_socket: socket,
zmq_command_socket: command_socket,
notification_manager,
cache,
scheduler,
@@ -180,6 +188,9 @@ impl SmartAgent {
_ = stats_interval.tick() => {
self.log_cache_stats().await;
}
_ = self.handle_commands() => {
// Commands handled in background
}
}
}
}
@@ -379,4 +390,76 @@ impl SmartAgent {
debug!(" {:?}: {} entries", tier, count);
}
}
/// Handle incoming commands from dashboard (non-blocking)
async fn handle_commands(&mut self) {
// Check for commands with non-blocking receive
match self.zmq_command_socket.recv_string(zmq::DONTWAIT) {
Ok(Ok(command)) => {
info!("Received command: {}", command);
match command.as_str() {
"refresh" => {
info!("Processing refresh command - forcing immediate collection");
self.force_refresh_all().await;
// Send response
let response = "refresh_started";
if let Err(e) = self.zmq_command_socket.send(response, 0) {
warn!("Failed to send command response: {}", e);
}
}
_ => {
warn!("Unknown command: {}", command);
let response = "unknown_command";
if let Err(e) = self.zmq_command_socket.send(response, 0) {
warn!("Failed to send error response: {}", e);
}
}
}
}
Ok(Err(e)) => {
warn!("String conversion error: {:?}", e);
}
Err(zmq::Error::EAGAIN) => {
// No message available - this is normal
}
Err(e) => {
warn!("ZMQ command receive error: {}", e);
}
}
}
/// Force immediate collection of all metrics
async fn force_refresh_all(&mut self) {
info!("Force refreshing all collectors");
let start = std::time::Instant::now();
let mut refreshed = 0;
let mut outputs = Vec::new();
for collector in &self.cached_collectors {
match collector.collect_fresh().await {
Ok(output) => {
// Send immediately via ZMQ
if let Err(e) = self.send_metrics(&output.agent_type, &output.data).await {
error!("Failed to send refreshed metrics for {}: {}", collector.name(), e);
} else {
refreshed += 1;
outputs.push(output);
}
}
Err(e) => {
error!("Force refresh failed for {}: {}", collector.name(), e);
}
}
}
info!("Force refresh completed: {}/{} collectors in {}ms",
refreshed, self.cached_collectors.len(), start.elapsed().as_millis());
// Process status changes for refreshed data
for output in outputs {
self.check_status_changes(&output).await;
}
}
}