Remove unused ZMQ command receiver (port 6131)
Service control migrated to SSH, command receiver no longer needed. - Remove command_receiver Socket from ZmqHandler - Remove try_receive_command method - Remove AgentCommand enum - Remove command_port from ZmqConfig
This commit is contained in:
parent
fe2f604703
commit
c19ff56df8
@ -5,10 +5,9 @@ use zmq::{Context, Socket, SocketType};
|
||||
|
||||
use crate::config::ZmqConfig;
|
||||
|
||||
/// ZMQ communication handler for publishing metrics and receiving commands
|
||||
/// ZMQ communication handler for publishing metrics
|
||||
pub struct ZmqHandler {
|
||||
publisher: Socket,
|
||||
command_receiver: Socket,
|
||||
}
|
||||
|
||||
impl ZmqHandler {
|
||||
@ -26,20 +25,8 @@ impl ZmqHandler {
|
||||
publisher.set_sndhwm(1000)?; // High water mark for outbound messages
|
||||
publisher.set_linger(1000)?; // Linger time on close
|
||||
|
||||
// Create command receiver socket (PULL socket to receive commands from dashboard)
|
||||
let command_receiver = context.socket(SocketType::PULL)?;
|
||||
let cmd_bind_address = format!("tcp://{}:{}", config.bind_address, config.command_port);
|
||||
command_receiver.bind(&cmd_bind_address)?;
|
||||
|
||||
info!("ZMQ command receiver bound to {}", cmd_bind_address);
|
||||
|
||||
// Set non-blocking mode for command receiver
|
||||
command_receiver.set_rcvtimeo(0)?; // Non-blocking receive
|
||||
command_receiver.set_linger(1000)?;
|
||||
|
||||
Ok(Self {
|
||||
publisher,
|
||||
command_receiver,
|
||||
})
|
||||
}
|
||||
|
||||
@ -65,36 +52,4 @@ impl ZmqHandler {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Try to receive a command (non-blocking)
|
||||
pub fn try_receive_command(&self) -> Result<Option<AgentCommand>> {
|
||||
match self.command_receiver.recv_bytes(zmq::DONTWAIT) {
|
||||
Ok(bytes) => {
|
||||
debug!("Received command message ({} bytes)", bytes.len());
|
||||
|
||||
let command: AgentCommand = serde_json::from_slice(&bytes)
|
||||
.map_err(|e| anyhow::anyhow!("Failed to deserialize command: {}", e))?;
|
||||
|
||||
debug!("Parsed command: {:?}", command);
|
||||
Ok(Some(command))
|
||||
}
|
||||
Err(zmq::Error::EAGAIN) => {
|
||||
// No message available (non-blocking)
|
||||
Ok(None)
|
||||
}
|
||||
Err(e) => Err(anyhow::anyhow!("ZMQ receive error: {}", e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Commands that can be sent to the agent
|
||||
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
|
||||
pub enum AgentCommand {
|
||||
/// Request immediate metric collection
|
||||
CollectNow,
|
||||
/// Change collection interval
|
||||
SetInterval { seconds: u64 },
|
||||
/// Enable/disable a collector
|
||||
ToggleCollector { name: String, enabled: bool },
|
||||
/// Request status/health check
|
||||
Ping,
|
||||
}
|
||||
|
||||
@ -20,7 +20,6 @@ pub struct AgentConfig {
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct ZmqConfig {
|
||||
pub publisher_port: u16,
|
||||
pub command_port: u16,
|
||||
pub bind_address: String,
|
||||
pub transmission_interval_seconds: u64,
|
||||
/// Heartbeat transmission interval in seconds for host connectivity detection
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user