Discovery Server Client
Client-side support for centralized discovery in environments without multicast.
Overview
The Discovery Server client enables HDDS participants to discover each other via a central server instead of multicast-based SPDP. This is essential for:
- Cloud/Kubernetes - No multicast support
- Corporate networks - Multicast disabled by policy
- NAT traversal - Participants behind firewalls
- WAN deployments - Geographic distribution
Architecture
┌─────────────────────────────────────────────────────────────────────┐
│ Discovery Server │
│ (hdds-discovery-server) │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ Participant Registry │ │
│ │ GUID_A → {name, domain, locators, endpoints} │ │
│ │ GUID_B → {name, domain, locators, endpoints} │ │
│ │ GUID_C → ... │ │
│ └───────────────────────────────────────────────────────────────┘ │
└──────────────────────────┬──────────────────────────────────────────┘
│ TCP (JSON + length prefix)
┌─────────────────┼─────────────────┐
│ │ │
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Participant │ │ Participant │ │ Participant │
│ A │ │ B │ │ C │
│ (Cloud VM) │ │ (K8s Pod) │ │ (On-prem) │
└─────────────┘ └─────────────┘ └─────────────┘
Quick Start
use hdds::discovery_server::{DiscoveryServerClient, DiscoveryServerConfig};
use std::net::SocketAddr;
fn main() -> Result<(), Box<dyn std::error::Error>> {
// Configure the client
let server_addr: SocketAddr = "192.168.1.100:7400".parse()?;
let config = DiscoveryServerConfig::new(server_addr);
// Create client with GUID prefix (12 bytes)
let guid_prefix = [0x01, 0x0f, 0xac, 0x10, 0x00, 0x00,
0x00, 0x01, 0x00, 0x00, 0x00, 0x00];
let mut client = DiscoveryServerClient::new(config, guid_prefix)?;
// Connect to server
client.connect()?;
// Announce our participant
let unicast_locators = vec!["192.168.1.50:7411".parse()?];
client.announce_participant(
0, // domain_id
Some("MyApp".into()), // participant name
unicast_locators, // where to reach us
0x3f, // builtin endpoints mask
)?;
// Event loop
loop {
// Send heartbeat if due
if client.heartbeat_due() {
client.send_heartbeat()?;
}
// Read server messages
if let Some(event) = client.read_message()? {
match event {
ClientEvent::ParticipantDiscovered { guid_prefix, name, .. } => {
println!("Discovered: {:?} ({})", guid_prefix, name.unwrap_or_default());
}
ClientEvent::ParticipantLeft { guid_prefix } => {
println!("Left: {:?}", guid_prefix);
}
ClientEvent::EndpointDiscovered { topic_name, is_writer, .. } => {
println!("Endpoint: {} ({})", topic_name,
if is_writer { "writer" } else { "reader" });
}
_ => {}
}
}
}
}
Configuration
DiscoveryServerConfig
use hdds::discovery_server::DiscoveryServerConfig;
use std::time::Duration;
// Default configuration
let config = DiscoveryServerConfig::default();
// server_address: 127.0.0.1:7400
// connect_timeout: 5 seconds
// reconnect_delay: 1 second
// max_reconnect_attempts: 10
// heartbeat_interval: 10 seconds
// auto_reconnect: true
// max_message_size: 16 MB
// disable_multicast: true
// Custom configuration
let config = DiscoveryServerConfig::new("192.168.1.100:7400".parse()?)
.with_connect_timeout(Duration::from_secs(10))
.with_heartbeat_interval(Duration::from_secs(5))
.with_max_reconnect_attempts(20)
.without_auto_reconnect() // Disable auto-reconnect
.with_multicast_enabled(); // Hybrid mode (server + multicast)
Configuration Options
| Option | Default | Description |
|---|---|---|
server_address | 127.0.0.1:7400 | Discovery server address |
connect_timeout | 5 seconds | TCP connection timeout |
reconnect_delay | 1 second | Delay between reconnect attempts |
max_reconnect_attempts | 10 | Max retries (0 = infinite) |
heartbeat_interval | 10 seconds | Keep-alive interval |
auto_reconnect | true | Auto-reconnect on disconnect |
max_message_size | 16 MB | Max JSON message size |
disable_multicast | true | Disable SPDP multicast |
Client Builder
use hdds::discovery_server::DiscoveryServerClientBuilder;
use std::time::Duration;
let client = DiscoveryServerClientBuilder::new("192.168.1.100:7400".parse()?)
.guid_prefix([0x01; 12])
.connect_timeout(Duration::from_secs(10))
.heartbeat_interval(Duration::from_secs(5))
.without_auto_reconnect()
.with_multicast_enabled() // Hybrid: server + multicast
.build()?;
Client Events
ClientEvent Enum
pub enum ClientEvent {
/// Successfully connected to server
Connected,
/// Disconnected from server
Disconnected { reason: String },
/// Server acknowledged our registration
ParticipantAcknowledged,
/// Remote participant discovered
ParticipantDiscovered {
guid_prefix: [u8; 12],
domain_id: u32,
name: Option<String>,
unicast_locators: Vec<SocketAddr>,
builtin_endpoints: u32,
},
/// Remote participant left
ParticipantLeft { guid_prefix: [u8; 12] },
/// Remote endpoint discovered
EndpointDiscovered {
guid_prefix: [u8; 12],
entity_id: [u8; 4],
topic_name: String,
type_name: String,
is_writer: bool,
reliable: bool,
durability: u8,
unicast_locators: Vec<SocketAddr>,
},
/// Server error
Error { code: u32, message: String },
}
Wire Protocol
The client communicates with the server using length-prefixed JSON over TCP.
Message Format
┌────────────────────┬───────────────────────────────────────┐
│ Length (4B BE) │ JSON payload │
└────────────────────┴───────────────────────────────────────┘
Client Messages
| Message Type | Description |
|---|---|
participant_announce | Register participant with server |
endpoint_announce | Register writer/reader endpoint |
heartbeat | Keep lease alive |
participant_leave | Graceful departure |
Server Messages
| Message Type | Description |
|---|---|
participant_ack | Acknowledge registration |
participant_announce | Broadcast new participant |
participant_leave | Broadcast departure |
endpoint_announce | Broadcast new endpoint |
error | Error response |
Example JSON Messages
Client → Server (ParticipantAnnounce):
{
"type": "participant_announce",
"guid_prefix": "010fac1000000001000000000",
"domain_id": 0,
"name": "MyApp",
"unicast_locators": ["192.168.1.50:7411"],
"vendor_id": [1, 16],
"protocol_version": [2, 4],
"builtin_endpoints": 63
}
Server → Client (ParticipantDiscovered):
{
"type": "participant_announce",
"guid_prefix": "aabbccddeeff001122334455",
"domain_id": 0,
"name": "RemoteApp",
"unicast_locators": ["10.0.0.1:7411"],
"builtin_endpoints": 63
}
Server → Client (Error):
{
"type": "error",
"code": 1,
"message": "Max participants reached"
}
Error Handling
ClientError
pub enum ClientError {
/// Connection failed
ConnectionFailed(String),
/// Connection closed unexpectedly
ConnectionClosed,
/// I/O error
Io(std::io::Error),
/// Protocol error (invalid JSON, etc.)
Protocol(String),
/// Server returned an error
ServerError { code: u32, message: String },
/// Configuration error
Config(String),
/// Operation attempted while not connected
NotConnected,
}
Example Error Handling
use hdds::discovery_server::{ClientError, DiscoveryServerClient};
fn handle_connection(client: &mut DiscoveryServerClient) {
match client.connect() {
Ok(()) => println!("Connected to discovery server"),
Err(ClientError::ConnectionFailed(msg)) => {
eprintln!("Connection failed: {}", msg);
// Retry logic...
}
Err(ClientError::Config(msg)) => {
eprintln!("Configuration error: {}", msg);
}
Err(e) => eprintln!("Error: {}", e),
}
}
fn handle_message(client: &mut DiscoveryServerClient) {
match client.read_message() {
Ok(Some(event)) => {
// Process event...
}
Ok(None) => {
// No message or connection closed gracefully
}
Err(ClientError::NotConnected) => {
// Need to reconnect
}
Err(ClientError::Protocol(msg)) => {
eprintln!("Protocol error: {}", msg);
}
Err(e) => eprintln!("Error: {}", e),
}
}
Client API Reference
Connection Management
// Connect to server
client.connect()?;
// Check connection status
if client.is_connected() {
// Connected...
}
// Disconnect gracefully
client.disconnect();
// Leave and disconnect (notifies server)
client.leave()?;
Announcements
// Announce participant
client.announce_participant(
domain_id, // u32
name, // Option<String>
unicast_locators, // Vec<SocketAddr>
builtin_endpoints, // u32 (bitmask)
)?;
// Announce endpoint (writer or reader)
client.announce_endpoint(
entity_id, // [u8; 4]
topic_name, // String
type_name, // String
is_writer, // bool
reliable, // bool
durability, // u8 (0=Volatile, 1=TransientLocal, etc.)
unicast_locators, // Vec<SocketAddr>
)?;
Heartbeat
// Check if heartbeat is due
if client.heartbeat_due() {
client.send_heartbeat()?;
}
Accessors
// Get GUID prefix
let guid = client.guid_prefix();
// Get server address
let addr = client.server_address();
// Get configuration
let config = client.config();
Hybrid Mode
Run discovery server and multicast simultaneously:
let config = DiscoveryServerConfig::new(server_addr)
.with_multicast_enabled(); // Don't disable multicast
let client = DiscoveryServerClient::new(config, guid_prefix)?;
In hybrid mode:
- Local network peers discovered via multicast (fast)
- Remote peers discovered via server (NAT traversal)
- Redundancy if server is temporarily unavailable
Deployment
Docker/Kubernetes
# Kubernetes Discovery Server deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: hdds-discovery-server
spec:
replicas: 1
selector:
matchLabels:
app: hdds-discovery-server
template:
spec:
containers:
- name: discovery-server
image: hdds/discovery-server:latest
ports:
- containerPort: 7400
protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
name: hdds-discovery
spec:
selector:
app: hdds-discovery-server
ports:
- port: 7400
targetPort: 7400
Client configuration in K8s:
// Connect to K8s service
let server_addr = "hdds-discovery.default.svc.cluster.local:7400";
let config = DiscoveryServerConfig::new(server_addr.parse()?);
Related
- K8s Discovery - Kubernetes-native discovery
- Cloud Discovery - AWS/GCP/Azure discovery
- Concepts: Discovery - DDS discovery overview