Implement simple persistent cache with automatic saving on status changes

parent 338c4457a5
commit a08670071c
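In short: the tiered ConfigurableCache is replaced by a SimpleCache that keeps a HashMap<String, Metric> behind a tokio RwLock, loads it from a JSON file at startup, and flushes it back to disk whenever a service status changes. Below is a minimal sketch of the serde_json round-trip the new cache relies on; the Metric struct here is a stand-in for the real type in cm_dashboard_shared:

use std::collections::HashMap;
use std::fs;

// Stand-in for cm_dashboard_shared::Metric; the cache only needs the
// real type to be Serialize + Deserialize.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
struct Metric {
    name: String,
    value: f64,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut metrics: HashMap<String, Metric> = HashMap::new();
    metrics.insert(
        "cpu_usage".into(),
        Metric { name: "cpu_usage".into(), value: 42.0 },
    );

    // Save: SimpleCache::save_to_disk serializes the whole map as pretty JSON.
    fs::write("cache.json", serde_json::to_string_pretty(&metrics)?)?;

    // Load: SimpleCache::load_from_disk parses the file back into the same map shape.
    let restored: HashMap<String, Metric> =
        serde_json::from_str(&fs::read_to_string("cache.json")?)?;
    assert_eq!(restored.len(), metrics.len());
    Ok(())
}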
@@ -179,7 +179,7 @@ impl Agent {
     async fn process_metrics(&mut self, metrics: &[Metric]) {
         for metric in metrics {
-            self.host_status_manager.process_metric(metric, &mut self.notification_manager).await;
+            self.host_status_manager.process_metric(metric, &mut self.notification_manager, self.metric_manager.get_cache_manager()).await;
         }
     }
 
agent/src/cache/mod.rs (vendored): 172 lines changed
@@ -1,101 +1,119 @@
 use cm_dashboard_shared::{CacheConfig, Metric};
 use std::collections::HashMap;
-use std::time::Instant;
+use std::fs;
+use std::path::Path;
 use std::sync::Arc;
 use tokio::sync::RwLock;
-use tracing::warn;
+use tracing::{info, warn, error};
 
-mod cached_metric;
-mod manager;
-
-pub use cached_metric::CachedMetric;
-pub use manager::MetricCacheManager;
-
-/// Central cache for individual metrics with configurable tiers
-pub struct ConfigurableCache {
-    cache: RwLock<HashMap<String, CachedMetric>>,
-    config: CacheConfig,
+/// Simple persistent cache for metrics
+pub struct SimpleCache {
+    metrics: RwLock<HashMap<String, Metric>>,
+    persist_path: String,
 }
 
-impl ConfigurableCache {
+impl SimpleCache {
     pub fn new(config: CacheConfig) -> Self {
-        Self {
-            cache: RwLock::new(HashMap::new()),
-            config,
-        }
+        let cache = Self {
+            metrics: RwLock::new(HashMap::new()),
+            persist_path: config.persist_path,
+        };
+
+        // Load from disk on startup
+        cache.load_from_disk();
+        cache
     }
 
     /// Store metric in cache
     pub async fn store_metric(&self, metric: Metric) {
-        if !self.config.enabled {
-            return;
-        }
-
-        let mut cache = self.cache.write().await;
-
-        // Enforce max entries limit
-        if cache.len() >= self.config.max_entries {
-            self.cleanup_old_entries(&mut cache).await;
-        }
-
-        let cached_metric = CachedMetric {
-            metric: metric.clone(),
-            collected_at: Instant::now(),
-            access_count: 1,
-        };
-
-        cache.insert(metric.name.clone(), cached_metric);
-
-        // Cached metric (debug logging disabled for performance)
+        let mut metrics = self.metrics.write().await;
+        metrics.insert(metric.name.clone(), metric);
     }
 
-    /// Get all cached metrics (including expired ones) for broadcasting
+    /// Get all cached metrics
     pub async fn get_all_cached_metrics(&self) -> Vec<Metric> {
-        if !self.config.enabled {
-            return vec![];
-        }
-
-        let cache = self.cache.read().await;
-        let mut all_metrics = Vec::new();
-
-        for cached_metric in cache.values() {
-            all_metrics.push(cached_metric.metric.clone());
-        }
-
-        all_metrics
+        let metrics = self.metrics.read().await;
+        metrics.values().cloned().collect()
     }
 
-    /// Background cleanup of old entries
-    async fn cleanup_old_entries(&self, cache: &mut HashMap<String, CachedMetric>) {
-        let mut to_remove = Vec::new();
-
-        for (metric_name, cached_metric) in cache.iter() {
-            let cache_interval = self.config.default_ttl_seconds;
-            let elapsed = cached_metric.collected_at.elapsed().as_secs();
-
-            // Remove entries that are way past their expiration (2x interval)
-            if elapsed > cache_interval * 2 {
-                to_remove.push(metric_name.clone());
-            }
-        }
-
-        for metric_name in to_remove {
-            cache.remove(&metric_name);
-        }
-
-        // If still too many entries, remove least recently accessed
-        if cache.len() >= self.config.max_entries {
-            let mut entries: Vec<_> = cache
-                .iter()
-                .map(|(k, v)| (k.clone(), v.access_count))
-                .collect();
-            entries.sort_by_key(|(_, access_count)| *access_count);
-
-            let excess = cache.len() - (self.config.max_entries * 3 / 4); // Remove 25%
-            for (metric_name, _) in entries.iter().take(excess) {
-                cache.remove(metric_name);
-            }
-
-            warn!("Cache cleanup removed {} entries due to size limit", excess);
-        }
+    /// Save cache to disk
+    pub async fn save_to_disk(&self) {
+        let metrics = self.metrics.read().await;
+
+        // Create directory if needed
+        if let Some(parent) = Path::new(&self.persist_path).parent() {
+            if let Err(e) = fs::create_dir_all(parent) {
+                warn!("Failed to create cache directory {}: {}", parent.display(), e);
+                return;
+            }
+        }
+
+        // Serialize and save
+        match serde_json::to_string_pretty(&*metrics) {
+            Ok(json) => {
+                if let Err(e) = fs::write(&self.persist_path, json) {
+                    error!("Failed to save cache to {}: {}", self.persist_path, e);
+                }
+            }
+            Err(e) => {
+                error!("Failed to serialize cache: {}", e);
+            }
+        }
+    }
+
+    /// Load cache from disk
+    fn load_from_disk(&self) {
+        match fs::read_to_string(&self.persist_path) {
+            Ok(content) => {
+                match serde_json::from_str::<HashMap<String, Metric>>(&content) {
+                    Ok(loaded_metrics) => {
+                        if let Ok(mut metrics) = self.metrics.try_write() {
+                            *metrics = loaded_metrics;
+                            info!("Loaded {} metrics from cache", metrics.len());
+                        }
+                    }
+                    Err(e) => {
+                        warn!("Failed to parse cache file {}: {}", self.persist_path, e);
+                    }
+                }
+            }
+            Err(_) => {
+                info!("No cache file found at {}, starting fresh", self.persist_path);
+            }
+        }
+    }
+}
+
+#[derive(Clone)]
+pub struct MetricCacheManager {
+    cache: Arc<SimpleCache>,
+}
+
+impl MetricCacheManager {
+    pub fn new(config: CacheConfig) -> Self {
+        Self {
+            cache: Arc::new(SimpleCache::new(config)),
+        }
+    }
+
+    pub async fn store_metric(&self, metric: Metric) {
+        self.cache.store_metric(metric).await;
+    }
+
+    pub async fn cache_metric(&self, metric: Metric) {
+        self.store_metric(metric).await;
+    }
+
+    pub async fn start_background_tasks(&self) {
+        // No background tasks needed for simple cache
+    }
+
+    pub async fn get_all_cached_metrics(&self) -> Result<Vec<Metric>, anyhow::Error> {
+        Ok(self.cache.get_all_cached_metrics().await)
+    }
+
+    pub async fn save_to_disk(&self) {
+        self.cache.save_to_disk().await;
+    }
+}
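A note on the loader: load_from_disk uses try_write, which only succeeds here because it runs inside SimpleCache::new before the cache is shared; if it ever ran concurrently with a reader, the load would silently no-op. Below, a hypothetical caller of the new manager; every method name comes from the diff above, only the surrounding setup is assumed:

// Hypothetical driver; assumes MetricCacheManager, CacheConfig and Metric
// from the code above are in scope.
async fn drive_cache(config: CacheConfig, metric: Metric) -> anyhow::Result<()> {
    let manager = MetricCacheManager::new(config);      // loads cache.json if present
    manager.store_metric(metric).await;                 // update the in-memory map
    let all = manager.get_all_cached_metrics().await?;  // snapshot for broadcasting
    manager.save_to_disk().await;                       // explicit flush to JSON
    println!("{} metrics cached", all.len());
    Ok(())
}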
@@ -112,14 +112,8 @@ pub fn validate_config(config: &AgentConfig) -> Result<()> {
     }
 
     // Validate cache configuration
-    if config.cache.enabled {
-        if config.cache.default_ttl_seconds == 0 {
-            bail!("Cache TTL cannot be 0");
-        }
-
-        if config.cache.max_entries == 0 {
-            bail!("Cache max entries cannot be 0");
-        }
+    if config.cache.persist_path.is_empty() {
+        bail!("Cache persist path cannot be empty");
     }
 
     Ok(())
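With the TTL and size-limit knobs gone, an empty path is the only cache misconfiguration left to reject. The surviving rule, extracted into a hypothetical standalone helper for illustration:

// Hypothetical standalone form of the remaining check; the real code
// inlines it in validate_config as shown above.
fn validate_persist_path(persist_path: &str) -> anyhow::Result<()> {
    if persist_path.is_empty() {
        anyhow::bail!("Cache persist path cannot be empty");
    }
    Ok(())
}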
@@ -240,7 +240,7 @@ impl MetricCollectionManager {
 
     /// Get all cached metrics from the cache manager
     pub async fn get_all_cached_metrics(&self) -> Result<Vec<Metric>> {
-        let cached_metrics = self.cache_manager.get_all_cached_metrics().await;
+        let cached_metrics = self.cache_manager.get_all_cached_metrics().await?;
         debug!(
             "Retrieved {} cached metrics for broadcast",
             cached_metrics.len()
@@ -248,4 +248,8 @@ impl MetricCollectionManager {
         Ok(cached_metrics)
     }
 
+    pub fn get_cache_manager(&self) -> &MetricCacheManager {
+        &self.cache_manager
+    }
+
 }
@@ -9,7 +9,6 @@ use chrono::Utc;
 pub struct HostStatusConfig {
     pub enabled: bool,
     pub aggregation_method: String, // "worst_case"
-    pub update_interval_seconds: u64,
     pub notification_interval_seconds: u64,
 }
 
@@ -18,7 +17,6 @@ impl Default for HostStatusConfig {
         Self {
             enabled: true,
             aggregation_method: "worst_case".to_string(),
-            update_interval_seconds: 5,
             notification_interval_seconds: 30,
         }
     }
@@ -72,7 +70,7 @@ impl HostStatusManager {
 
     /// Update the status of a specific service and recalculate host status
     /// Updates real-time status and buffers changes for email notifications
-    pub fn update_service_status(&mut self, service: String, status: Status) {
+    pub fn update_service_status(&mut self, service: String, status: Status, cache_manager: Option<&crate::cache::MetricCacheManager>) {
         if !self.config.enabled {
             return;
         }
@@ -84,6 +82,14 @@ impl HostStatusManager {
             return;
         }
 
+        // Save cache when status changes (clone cache manager reference for async)
+        if let Some(cache) = cache_manager {
+            let cache = cache.clone();
+            tokio::spawn(async move {
+                cache.save_to_disk().await;
+            });
+        }
+
         // Initialize batch if this is the first change
         if self.batch_start_time.is_none() {
            self.batch_start_time = Some(Instant::now());
@@ -163,9 +169,9 @@ impl HostStatusManager {
 
 
     /// Process a metric - updates status (notifications handled separately via batching)
-    pub async fn process_metric(&mut self, metric: &Metric, _notification_manager: &mut crate::notifications::NotificationManager) {
+    pub async fn process_metric(&mut self, metric: &Metric, _notification_manager: &mut crate::notifications::NotificationManager, cache_manager: &crate::cache::MetricCacheManager) {
         // Just update status - notifications are handled by process_pending_notifications
-        self.update_service_status(metric.name.clone(), metric.status);
+        self.update_service_status(metric.name.clone(), metric.status, Some(cache_manager));
     }
 
     /// Process pending notifications - call this at notification intervals
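The save on status change is fire-and-forget: MetricCacheManager is Clone because it only wraps an Arc, so update_service_status can hand a clone to tokio::spawn and return without waiting on disk I/O. A reduced sketch of that pattern, with stand-in types:

use std::sync::Arc;

// Stand-in for MetricCacheManager and its Arc<SimpleCache>.
#[derive(Clone)]
struct CacheHandle(Arc<str>);

impl CacheHandle {
    async fn save_to_disk(&self) {
        // The real implementation serializes the metric map to its persist_path.
        println!("saving cache to {}", self.0);
    }
}

fn on_status_change(cache_manager: Option<&CacheHandle>) {
    if let Some(cache) = cache_manager {
        let cache = cache.clone(); // cheap: clones the Arc, not the data
        tokio::spawn(async move {
            cache.save_to_disk().await; // disk write happens off the hot path
        });
    }
}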
@@ -3,23 +3,13 @@ use serde::{Deserialize, Serialize};
 /// Cache configuration
 #[derive(Debug, Clone, Deserialize, Serialize)]
 pub struct CacheConfig {
-    pub enabled: bool,
-    pub default_ttl_seconds: u64,
-    pub max_entries: usize,
-    pub warming_timeout_seconds: u64,
-    pub background_refresh_enabled: bool,
-    pub cleanup_interval_seconds: u64,
+    pub persist_path: String,
 }
 
 impl Default for CacheConfig {
     fn default() -> Self {
         Self {
-            enabled: true,
-            default_ttl_seconds: 30,
-            max_entries: 10000,
-            warming_timeout_seconds: 3,
-            background_refresh_enabled: true,
-            cleanup_interval_seconds: 1800,
+            persist_path: "/var/lib/cm-dashboard/cache.json".to_string(),
         }
     }
 }
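One practical consequence of the trim: serde ignores unknown fields by default, so existing config files that still set warming_timeout_seconds, background_refresh_enabled or cleanup_interval_seconds will keep parsing; those keys are simply dropped. The struct is now fully described by its default:

// Grounded in the Default impl above: the path is the only remaining knob.
#[test]
fn default_cache_config_has_persist_path() {
    let config = CacheConfig::default();
    assert_eq!(config.persist_path, "/var/lib/cm-dashboard/cache.json");
}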