Compare commits
21 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e3996fdb84 | |||
| f94ca60e69 | |||
| c19ff56df8 | |||
| fe2f604703 | |||
| 8bfd416327 | |||
| 85c6c624fb | |||
| eab3f17428 | |||
| 7ad149bbe4 | |||
| b444c88ea0 | |||
| 317cf76bd1 | |||
| 0db1a165b9 | |||
| 3c2955376d | |||
| f09ccabc7f | |||
| 43dd5a901a | |||
| 01e1f33b66 | |||
| ed6399b914 | |||
| 14618c59c6 | |||
| 2740de9b54 | |||
| 37f2650200 | |||
| 833010e270 | |||
| 549d9d1c72 |
6
Cargo.lock
generated
6
Cargo.lock
generated
@@ -279,7 +279,7 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "cm-dashboard"
|
name = "cm-dashboard"
|
||||||
version = "0.1.190"
|
version = "0.1.203"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"chrono",
|
"chrono",
|
||||||
@@ -301,7 +301,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "cm-dashboard-agent"
|
name = "cm-dashboard-agent"
|
||||||
version = "0.1.190"
|
version = "0.1.203"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
@@ -324,7 +324,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "cm-dashboard-shared"
|
name = "cm-dashboard-shared"
|
||||||
version = "0.1.190"
|
version = "0.1.203"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"chrono",
|
"chrono",
|
||||||
"serde",
|
"serde",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "cm-dashboard-agent"
|
name = "cm-dashboard-agent"
|
||||||
version = "0.1.191"
|
version = "0.1.203"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ use std::time::Duration;
|
|||||||
use tokio::time::interval;
|
use tokio::time::interval;
|
||||||
use tracing::{debug, error, info};
|
use tracing::{debug, error, info};
|
||||||
|
|
||||||
use crate::communication::{AgentCommand, ZmqHandler};
|
use crate::communication::ZmqHandler;
|
||||||
use crate::config::AgentConfig;
|
use crate::config::AgentConfig;
|
||||||
use crate::collectors::{
|
use crate::collectors::{
|
||||||
Collector,
|
Collector,
|
||||||
@@ -134,12 +134,6 @@ impl Agent {
|
|||||||
// NOTE: With structured data, we might need to implement status tracking differently
|
// NOTE: With structured data, we might need to implement status tracking differently
|
||||||
// For now, we skip this until status evaluation is migrated
|
// For now, we skip this until status evaluation is migrated
|
||||||
}
|
}
|
||||||
// Handle incoming commands (check periodically)
|
|
||||||
_ = tokio::time::sleep(Duration::from_millis(100)) => {
|
|
||||||
if let Err(e) = self.handle_commands().await {
|
|
||||||
error!("Error handling commands: {}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ = &mut shutdown_rx => {
|
_ = &mut shutdown_rx => {
|
||||||
info!("Shutdown signal received, stopping agent loop");
|
info!("Shutdown signal received, stopping agent loop");
|
||||||
break;
|
break;
|
||||||
@@ -259,36 +253,4 @@ impl Agent {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handle incoming commands from dashboard
|
|
||||||
async fn handle_commands(&mut self) -> Result<()> {
|
|
||||||
// Try to receive a command (non-blocking)
|
|
||||||
if let Ok(Some(command)) = self.zmq_handler.try_receive_command() {
|
|
||||||
info!("Received command: {:?}", command);
|
|
||||||
|
|
||||||
match command {
|
|
||||||
AgentCommand::CollectNow => {
|
|
||||||
info!("Received immediate collection request");
|
|
||||||
if let Err(e) = self.collect_and_broadcast().await {
|
|
||||||
error!("Failed to collect on demand: {}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
AgentCommand::SetInterval { seconds } => {
|
|
||||||
info!("Received interval change request: {}s", seconds);
|
|
||||||
// Note: This would require more complex handling to update the interval
|
|
||||||
// For now, just acknowledge
|
|
||||||
}
|
|
||||||
AgentCommand::ToggleCollector { name, enabled } => {
|
|
||||||
info!("Received collector toggle request: {} -> {}", name, enabled);
|
|
||||||
// Note: This would require more complex handling to enable/disable collectors
|
|
||||||
// For now, just acknowledge
|
|
||||||
}
|
|
||||||
AgentCommand::Ping => {
|
|
||||||
info!("Received ping command");
|
|
||||||
// Maybe send back a pong or status
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -5,10 +5,9 @@ use zmq::{Context, Socket, SocketType};
|
|||||||
|
|
||||||
use crate::config::ZmqConfig;
|
use crate::config::ZmqConfig;
|
||||||
|
|
||||||
/// ZMQ communication handler for publishing metrics and receiving commands
|
/// ZMQ communication handler for publishing metrics
|
||||||
pub struct ZmqHandler {
|
pub struct ZmqHandler {
|
||||||
publisher: Socket,
|
publisher: Socket,
|
||||||
command_receiver: Socket,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ZmqHandler {
|
impl ZmqHandler {
|
||||||
@@ -26,20 +25,8 @@ impl ZmqHandler {
|
|||||||
publisher.set_sndhwm(1000)?; // High water mark for outbound messages
|
publisher.set_sndhwm(1000)?; // High water mark for outbound messages
|
||||||
publisher.set_linger(1000)?; // Linger time on close
|
publisher.set_linger(1000)?; // Linger time on close
|
||||||
|
|
||||||
// Create command receiver socket (PULL socket to receive commands from dashboard)
|
|
||||||
let command_receiver = context.socket(SocketType::PULL)?;
|
|
||||||
let cmd_bind_address = format!("tcp://{}:{}", config.bind_address, config.command_port);
|
|
||||||
command_receiver.bind(&cmd_bind_address)?;
|
|
||||||
|
|
||||||
info!("ZMQ command receiver bound to {}", cmd_bind_address);
|
|
||||||
|
|
||||||
// Set non-blocking mode for command receiver
|
|
||||||
command_receiver.set_rcvtimeo(0)?; // Non-blocking receive
|
|
||||||
command_receiver.set_linger(1000)?;
|
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
publisher,
|
publisher,
|
||||||
command_receiver,
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -65,36 +52,4 @@ impl ZmqHandler {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Try to receive a command (non-blocking)
|
|
||||||
pub fn try_receive_command(&self) -> Result<Option<AgentCommand>> {
|
|
||||||
match self.command_receiver.recv_bytes(zmq::DONTWAIT) {
|
|
||||||
Ok(bytes) => {
|
|
||||||
debug!("Received command message ({} bytes)", bytes.len());
|
|
||||||
|
|
||||||
let command: AgentCommand = serde_json::from_slice(&bytes)
|
|
||||||
.map_err(|e| anyhow::anyhow!("Failed to deserialize command: {}", e))?;
|
|
||||||
|
|
||||||
debug!("Parsed command: {:?}", command);
|
|
||||||
Ok(Some(command))
|
|
||||||
}
|
|
||||||
Err(zmq::Error::EAGAIN) => {
|
|
||||||
// No message available (non-blocking)
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
Err(e) => Err(anyhow::anyhow!("ZMQ receive error: {}", e)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Commands that can be sent to the agent
|
|
||||||
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
|
|
||||||
pub enum AgentCommand {
|
|
||||||
/// Request immediate metric collection
|
|
||||||
CollectNow,
|
|
||||||
/// Change collection interval
|
|
||||||
SetInterval { seconds: u64 },
|
|
||||||
/// Enable/disable a collector
|
|
||||||
ToggleCollector { name: String, enabled: bool },
|
|
||||||
/// Request status/health check
|
|
||||||
Ping,
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,7 +20,6 @@ pub struct AgentConfig {
|
|||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct ZmqConfig {
|
pub struct ZmqConfig {
|
||||||
pub publisher_port: u16,
|
pub publisher_port: u16,
|
||||||
pub command_port: u16,
|
|
||||||
pub bind_address: String,
|
pub bind_address: String,
|
||||||
pub transmission_interval_seconds: u64,
|
pub transmission_interval_seconds: u64,
|
||||||
/// Heartbeat transmission interval in seconds for host connectivity detection
|
/// Heartbeat transmission interval in seconds for host connectivity detection
|
||||||
|
|||||||
@@ -7,14 +7,6 @@ pub fn validate_config(config: &AgentConfig) -> Result<()> {
|
|||||||
bail!("ZMQ publisher port cannot be 0");
|
bail!("ZMQ publisher port cannot be 0");
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.zmq.command_port == 0 {
|
|
||||||
bail!("ZMQ command port cannot be 0");
|
|
||||||
}
|
|
||||||
|
|
||||||
if config.zmq.publisher_port == config.zmq.command_port {
|
|
||||||
bail!("ZMQ publisher and command ports cannot be the same");
|
|
||||||
}
|
|
||||||
|
|
||||||
if config.zmq.bind_address.is_empty() {
|
if config.zmq.bind_address.is_empty() {
|
||||||
bail!("ZMQ bind address cannot be empty");
|
bail!("ZMQ bind address cannot be empty");
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "cm-dashboard"
|
name = "cm-dashboard"
|
||||||
version = "0.1.191"
|
version = "0.1.203"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
|||||||
@@ -188,9 +188,9 @@ impl ServicesWidget {
|
|||||||
format!(" {} ", tree_symbol),
|
format!(" {} ", tree_symbol),
|
||||||
Typography::tree(),
|
Typography::tree(),
|
||||||
),
|
),
|
||||||
// Docker whale icon
|
// Docker icon (simple character for performance)
|
||||||
ratatui::text::Span::styled(
|
ratatui::text::Span::styled(
|
||||||
"🐋 ".to_string(),
|
"D ".to_string(),
|
||||||
Style::default().fg(Theme::highlight()).bg(Theme::background()),
|
Style::default().fg(Theme::highlight()).bg(Theme::background()),
|
||||||
),
|
),
|
||||||
// Service name
|
// Service name
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "cm-dashboard-shared"
|
name = "cm-dashboard-shared"
|
||||||
version = "0.1.191"
|
version = "0.1.203"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
|||||||
Reference in New Issue
Block a user