Admin API
Debug and monitoring API for HDDS applications.
Overview
The Admin API provides real-time inspection of HDDS mesh state via a lightweight binary protocol over TCP (default port 4243):
- Epoch-based snapshots - Lock-free reads using atomic counters
- Zero data-plane impact - No locks held during DDS operations
- JSON responses - Human-readable format for debugging tools
- Binary protocol - Simple [cmd_id][len][payload] framing
Architecture
┌────────────────────────────────────────────────────────────────┐
│                          HDDS Runtime                          │
│                                                                │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │                         AdminApi                         │  │
│  │  ┌──────────────┐  ┌──────────────┐  ┌────────────────┐  │  │
│  │  │ ParticipantDB│  │ DiscoveryFSM │  │MetricsCollector│  │  │
│  │  │ (local part) │  │ (discovered) │  │  (telemetry)   │  │  │
│  │  └──────────────┘  └──────────────┘  └────────────────┘  │  │
│  │                           │                              │  │
│  │           ┌───────────────┴───────────────┐              │  │
│  │           │     Epoch-based Snapshot      │              │  │
│  │           │ (AtomicU64 + retry on change) │              │  │
│  │           └───────────────┬───────────────┘              │  │
│  └───────────────────────────┼──────────────────────────────┘  │
│                              │                                 │
│                   TCP Server (port 4243)                       │
│                 [cmd_id][len][JSON payload]                    │
└──────────────────────────────┼─────────────────────────────────┘
                               │
                               ▼
                     ┌───────────────────┐
                     │    Debug Tools    │
                     │ (hdds-admin CLI)  │
                     └───────────────────┘
Quick Start
use hdds::admin::AdminApi;
use hdds::telemetry::init_metrics;

fn main() -> std::io::Result<()> {
    // Initialize metrics
    let metrics = init_metrics();

    // Start Admin API on port 4243
    let mut admin = AdminApi::bind("127.0.0.1", 4243, None)?;

    // Inject metrics collector
    admin.set_metrics(metrics);

    // Set local participant name
    admin.set_local_participant("my-app".to_string());

    // Application runs...
    // Admin API available at tcp://127.0.0.1:4243
    Ok(())
}
Commands
| Command | ID | Description |
|---|---|---|
| GetMesh | 0x01 | List all discovered participants |
| GetTopics | 0x02 | List active topics with endpoint counts |
| GetMetrics | 0x03 | Get telemetry metrics snapshot |
| GetHealth | 0x04 | Health check with uptime |
| GetWriters | 0x05 | List all DataWriters |
| GetReaders | 0x06 | List all DataReaders |
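For client code, the command table can be mirrored as an enumeration so that IDs are not scattered around as magic numbers. This is a minimal sketch: the values come from the table above, but the `AdminCommand` and `command_name` names are illustrative and not part of HDDS.

```python
from enum import IntEnum


class AdminCommand(IntEnum):
    """Command IDs copied from the table above (the enum itself is hypothetical)."""
    GET_MESH = 0x01
    GET_TOPICS = 0x02
    GET_METRICS = 0x03
    GET_HEALTH = 0x04
    GET_WRITERS = 0x05
    GET_READERS = 0x06


def command_name(cmd_id: int) -> str:
    """Return a readable name for a command ID, including unknown ones."""
    try:
        return AdminCommand(cmd_id).name
    except ValueError:
        return f"UNKNOWN(0x{cmd_id:02x})"


print(command_name(0x04))
```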
Binary Protocol
Request Format
┌──────────┬────────────────┐
│ cmd_id │ payload_len │
│ (1 byte) │ (4 bytes LE) │
└──────────┴────────────────┘
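As an illustration of the request layout above, the 5-byte header can be built with Python's `struct` module. The `encode_request` helper is a hypothetical name. The optional trailing payload is an assumption based on the `[cmd_id][len][payload]` framing mentioned in the overview; the commands documented here all send a zero-length payload.

```python
import struct

# cmd_id: unsigned 8-bit, payload_len: unsigned 32-bit little-endian, no padding
REQUEST_HEADER = struct.Struct("<BI")


def encode_request(cmd_id: int, payload: bytes = b"") -> bytes:
    """Build a request frame: header followed by the (usually empty) payload."""
    return REQUEST_HEADER.pack(cmd_id, len(payload)) + payload


frame = encode_request(0x04)  # GetHealth
print(frame.hex())  # 0400000000
```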
Response Format
┌──────────┬────────────────┬──────────────────┐
│ status │ payload_len │ JSON payload │
│ (1 byte) │ (4 bytes LE) │ (variable) │
└──────────┴────────────────┴──────────────────┘
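A response frame that has already been read into memory could be decoded along these lines. The `decode_response` helper is illustrative only. Because this page does not say what the payload contains on an error status, the sketch parses JSON only when the status is 0x00 and otherwise returns the raw bytes.

```python
import json
import struct

HEADER_LEN = 5  # 1-byte status + 4-byte little-endian payload length


def decode_response(frame: bytes):
    """Split a complete response frame into (status, body)."""
    if len(frame) < HEADER_LEN:
        raise ValueError("truncated header")
    status, payload_len = struct.unpack_from("<BI", frame, 0)
    payload = frame[HEADER_LEN:HEADER_LEN + payload_len]
    if len(payload) != payload_len:
        raise ValueError("truncated payload")
    # Assumption: only successful responses are guaranteed to carry JSON.
    body = json.loads(payload) if status == 0x00 else payload
    return status, body


raw = b'{"status":"ok","uptime_secs":3600}'
status, body = decode_response(struct.pack("<BI", 0x00, len(raw)) + raw)
print(status, body["uptime_secs"])
```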
Status Codes
| Status | Code | Description |
|---|---|---|
| Ok | 0x00 | Success |
| InvalidCommand | 0x01 | Unknown command ID |
| InternalError | 0x02 | Server error |
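One way a client might turn these status codes into errors is sketched below. `AdminStatus`, `AdminError`, and `check_status` are hypothetical names chosen for this example; only the numeric values come from the table above.

```python
from enum import IntEnum


class AdminStatus(IntEnum):
    """Status codes from the table above."""
    OK = 0x00
    INVALID_COMMAND = 0x01
    INTERNAL_ERROR = 0x02


class AdminError(Exception):
    """Raised when the server reports a non-Ok status."""


def check_status(status: int) -> None:
    """Raise AdminError for any status other than Ok."""
    if status == AdminStatus.OK:
        return
    try:
        name = AdminStatus(status).name
    except ValueError:
        name = "UNKNOWN"  # a code this table does not list
    raise AdminError(f"{name} (0x{status:02x})")


check_status(0x00)  # returns silently on success
```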
Response Examples
GetMesh (0x01)
{
  "epoch": 42,
  "participants": [
    {
      "guid": "01.0f.ac.10.00.00.00.01.00.00.00.00.00.00.01.c1",
      "name": "sensor-node",
      "is_local": false,
      "state": "Active",
      "endpoints": ["192.168.1.100:7400", "192.168.1.100:7411"],
      "lease_ms": 100000,
      "last_seen_ago_ms": 1234
    }
  ]
}
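A monitoring script could use `lease_ms` and `last_seen_ago_ms` from this response to spot participants that are close to timing out. The sketch below is a heuristic, not HDDS behaviour: the 0.8 threshold and the helper names are assumptions, and fields are read with `.get()` because they may be absent for some participants.

```python
def lease_fraction(participant: dict):
    """Return last_seen_ago_ms / lease_ms, or None if either field is missing."""
    lease_ms = participant.get("lease_ms")
    last_seen_ms = participant.get("last_seen_ago_ms")
    if not lease_ms or last_seen_ms is None:
        return None
    return last_seen_ms / lease_ms


def near_expiry(mesh: dict, threshold: float = 0.8) -> list:
    """Names of participants that have used more than `threshold` of their lease."""
    flagged = []
    for p in mesh.get("participants", []):
        fraction = lease_fraction(p)
        if fraction is not None and fraction > threshold:
            flagged.append(p["name"])
    return flagged


mesh = {
    "epoch": 42,
    "participants": [
        {"name": "sensor-node", "lease_ms": 100000, "last_seen_ago_ms": 1234},
        {"name": "slow-node", "lease_ms": 100000, "last_seen_ago_ms": 90000},
        {"name": "my-app", "is_local": True},  # no lease information
    ],
}
print(near_expiry(mesh))  # ['slow-node']
```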
GetTopics (0x02)
{
  "epoch": 42,
  "topics": [
    {
      "name": "sensor_data",
      "type_name": "SensorData",
      "writers_count": 3,
      "readers_count": 5
    }
  ]
}
GetMetrics (0x03)
{
  "epoch": 42,
  "messages_sent": 10000,
  "messages_received": 9500,
  "messages_dropped": 5,
  "latency_min_ns": 100000,
  "latency_p50_ns": 500000,
  "latency_p99_ns": 2000000,
  "latency_max_ns": 5000000
}
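The latency fields are reported in nanoseconds, which is awkward to read in dashboards. A small helper (hypothetical, shown only as a sketch) can convert every `latency_*_ns` field to milliseconds:

```python
def latency_ms(metrics: dict) -> dict:
    """Map each latency_<name>_ns field to <name>: value in milliseconds."""
    prefix, suffix = "latency_", "_ns"
    out = {}
    for key, value in metrics.items():
        if key.startswith(prefix) and key.endswith(suffix):
            out[key[len(prefix):-len(suffix)]] = value / 1_000_000
    return out


sample = {
    "epoch": 42,
    "messages_sent": 10000,
    "latency_min_ns": 100000,
    "latency_p50_ns": 500000,
    "latency_p99_ns": 2000000,
    "latency_max_ns": 5000000,
}
print(latency_ms(sample))  # {'min': 0.1, 'p50': 0.5, 'p99': 2.0, 'max': 5.0}
```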
GetHealth (0x04)
{
  "status": "ok",
  "uptime_secs": 3600
}
GetWriters (0x05)
{
  "epoch": 42,
  "endpoints": [
    {
      "guid": "01.0f.ac.10.00.00.00.01.00.00.00.00.00.00.03.c2",
      "participant_guid": "01.0f.ac.10.00.00.00.01.00.00.00.00.00.00.01.c1",
      "topic_name": "sensor_data",
      "type_name": "SensorData",
      "reliability": "Reliable",
      "durability": "Volatile",
      "history": "KeepLast(10)"
    }
  ]
}
GetReaders (0x06)
{
  "epoch": 42,
  "endpoints": [
    {
      "guid": "01.0f.ac.10.00.00.00.02.00.00.00.00.00.00.04.c7",
      "participant_guid": "01.0f.ac.10.00.00.00.02.00.00.00.00.00.00.01.c1",
      "topic_name": "sensor_data",
      "type_name": "SensorData",
      "reliability": "Reliable",
      "durability": "Volatile",
      "history": "KeepLast(100)"
    }
  ]
}
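The GetWriters and GetReaders responses can be combined when debugging why a reader receives no data. The sketch below pairs endpoints by topic and flags one standard DDS QoS incompatibility: a writer offering BestEffort cannot satisfy a reader requesting Reliable. Whether HDDS applies exactly this rule is an assumption, and `reliability_mismatches` is a hypothetical helper.

```python
def reliability_mismatches(writers: dict, readers: dict) -> list:
    """Return (writer_guid, reader_guid) pairs on the same topic where the
    writer is BestEffort but the reader asks for Reliable."""
    pairs = []
    for w in writers.get("endpoints", []):
        for r in readers.get("endpoints", []):
            same_topic = w["topic_name"] == r["topic_name"]
            if same_topic and w["reliability"] == "BestEffort" and r["reliability"] == "Reliable":
                pairs.append((w["guid"], r["guid"]))
    return pairs


writers = {"epoch": 42, "endpoints": [
    {"guid": "w1", "topic_name": "sensor_data", "reliability": "BestEffort"},
    {"guid": "w2", "topic_name": "sensor_data", "reliability": "Reliable"},
]}
readers = {"epoch": 42, "endpoints": [
    {"guid": "r1", "topic_name": "sensor_data", "reliability": "Reliable"},
]}
print(reliability_mismatches(writers, readers))  # [('w1', 'r1')]
```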
Rust API
Initialization
use hdds::admin::AdminApi;
// Basic binding
let admin = AdminApi::bind("127.0.0.1", 4243, None)?;
// With DiscoveryFsm for full discovery data
let fsm = participant.discovery_fsm();
let admin = AdminApi::bind("0.0.0.0", 4243, Some(fsm))?;
Configuration
// Inject metrics collector
admin.set_metrics(metrics_collector);
// Set local participant name
admin.set_local_participant("my-app".to_string());
Programmatic Snapshots
use hdds::admin::{MeshSnapshot, TopicsSnapshot, MetricsSnapshot, EndpointsSnapshot};
// Mesh snapshot (participants)
let mesh: MeshSnapshot = admin.snapshot_mesh();
for p in &mesh.participants {
    println!("Participant: {} ({})", p.guid, p.name);
}
// Topics snapshot
let topics: TopicsSnapshot = admin.snapshot_topics();
for t in &topics.topics {
    println!("Topic: {} - {} writers, {} readers",
             t.name, t.writers_count, t.readers_count);
}
// Metrics snapshot
let metrics: MetricsSnapshot = admin.snapshot_metrics();
println!("Messages sent: {}", metrics.messages_sent);
println!("P99 latency: {} ns", metrics.latency_p99_ns);
// Endpoints
let writers: EndpointsSnapshot = admin.snapshot_writers();
let readers: EndpointsSnapshot = admin.snapshot_readers();
// Uptime
let uptime = admin.uptime_secs();
Shutdown
// Graceful shutdown
admin.shutdown();
// Or let it drop automatically
drop(admin);
Snapshot Types
ParticipantView
pub struct ParticipantView {
    pub guid: String,                   // Hex format: "01.0f.ac.10..."
    pub name: String,                   // Participant name
    pub is_local: bool,                 // Local or discovered
    pub state: Option<String>,          // "Idle", "Announced", "Discovered", "Active"
    pub endpoints: Option<Vec<String>>, // Socket addresses
    pub lease_ms: Option<u64>,          // Lease duration (ms)
    pub last_seen_ago_ms: Option<u64>,  // Time since last SPDP (ms)
}
EndpointView
pub struct EndpointView {
    pub guid: String,             // Endpoint GUID
    pub participant_guid: String, // Parent participant
    pub topic_name: String,       // Topic name
    pub type_name: String,        // Type name
    pub reliability: String,      // "Reliable" or "BestEffort"
    pub durability: String,       // "Volatile", "TransientLocal", "Transient", "Persistent"
    pub history: String,          // "KeepLast(N)" or "KeepAll"
}
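Since `history` arrives as a string, a client that needs the depth as a number has to parse it. A minimal sketch, assuming the two forms listed in the comment above are the only ones emitted (`parse_history` is a hypothetical helper):

```python
import re

_KEEP_LAST = re.compile(r"KeepLast\((\d+)\)")


def parse_history(text: str):
    """Return ("KeepAll", None) or ("KeepLast", depth) for a history string."""
    if text == "KeepAll":
        return ("KeepAll", None)
    match = _KEEP_LAST.fullmatch(text)
    if match:
        return ("KeepLast", int(match.group(1)))
    raise ValueError(f"unrecognized history value: {text!r}")


print(parse_history("KeepLast(10)"))  # ('KeepLast', 10)
```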
Epoch-Based Snapshots
The Admin API uses epoch-based consistency to avoid blocking the data plane:
┌─────────────────────────────────────────────────────────────┐
│                      Epoch-Based Read                       │
│                                                             │
│  1. Read epoch_before = atomic_load(epoch)                  │
│  2. Clone data from Arc<RwLock<T>> (brief read lock)        │
│  3. Read epoch_after = atomic_load(epoch)                   │
│  4. If epoch_before == epoch_after → return data            │
│  5. Else retry (up to 3 times)                              │
│                                                             │
└─────────────────────────────────────────────────────────────┘
This ensures:
- No long-held locks on the write/read path (only the brief read lock in step 2)
- Consistent snapshots (data from the same epoch)
- Automatic retry on concurrent mutations
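The five steps above can be sketched in Python to make the control flow concrete. This is an illustration of the technique, not the HDDS implementation: a plain integer stands in for the `AtomicU64`, a `threading.Lock` stands in for the `RwLock`, "up to 3 times" is read as three retries after the first attempt, and raising after the last retry is an assumption because this page does not say what happens then.

```python
import copy
import threading


class EpochGuarded:
    """Value plus an epoch counter that is bumped after every mutation."""

    def __init__(self, value):
        self._value = value
        self._epoch = 0
        self._lock = threading.Lock()

    def mutate(self, fn) -> None:
        """Apply fn to the value, then bump the epoch."""
        with self._lock:
            fn(self._value)
            self._epoch += 1

    def snapshot(self, max_retries: int = 3):
        """Return (epoch, copy of value) taken while the epoch was stable."""
        for _ in range(1 + max_retries):
            epoch_before = self._epoch              # step 1
            with self._lock:                        # step 2: brief lock, clone
                data = copy.deepcopy(self._value)
            epoch_after = self._epoch               # step 3
            if epoch_before == epoch_after:         # step 4
                return epoch_after, data
        # Step 5 exhausted; behaviour here is an assumption of this sketch.
        raise RuntimeError("snapshot kept changing; retries exhausted")


state = EpochGuarded({"participants": []})
first = state.snapshot()
state.mutate(lambda v: v["participants"].append("sensor-node"))
second = state.snapshot()
print(first, second)
```

The returned copy is independent of the live value, so a slow JSON serializer can work on it without holding any lock.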
Command-Line Client
Connect with netcat or a custom client:
# Using netcat (send GetHealth = 0x04)
echo -ne '\x04\x00\x00\x00\x00' | nc localhost 4243 | tail -c +6
# Response: {"status":"ok","uptime_secs":3600}
Example Python Client
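import json
import socket
import struct

def recv_exact(sock, n):
    """Read exactly n bytes; a single recv() may return fewer."""
    buf = bytearray()
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("connection closed before full response")
        buf.extend(chunk)
    return bytes(buf)

def admin_query(host, port, cmd_id):
    # The context manager closes the socket even if an error is raised
    with socket.create_connection((host, port)) as sock:
        # Send request: [cmd_id][payload_len=0]
        sock.sendall(struct.pack('<BI', cmd_id, 0))
        # Read response header: [status][payload_len]
        status, payload_len = struct.unpack('<BI', recv_exact(sock, 5))
        if status != 0:
            raise Exception(f"Error: status={status}")
        # Read JSON payload
        payload = recv_exact(sock, payload_len)
    return json.loads(payload)

# Usage
mesh = admin_query('localhost', 4243, 0x01)  # GetMesh
print(json.dumps(mesh, indent=2))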
Integration with HDDS Viewer
HDDS Viewer connects to both ports:
- Port 4242: Telemetry streaming (HDMX binary)
- Port 4243: Admin API (JSON snapshots)
# Start your DDS application
./my_dds_app
# Connect with HDDS Viewer
hdds-viewer --admin 127.0.0.1:4243 --telemetry 127.0.0.1:4242
Thread Safety
- TCP accept loop: Dedicated thread, non-blocking
- Client handlers: One thread per connection
- Snapshots: Epoch-based, lock-free reads
- Mutations: Bump epoch atomically after changes
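To exercise a client without a running HDDS application, the one-thread-per-connection model can be imitated with a small stand-in server. Everything in this sketch is an assumption made for testing purposes: it answers only GetHealth, handles one request per connection, and returns an empty payload with InvalidCommand for anything else. It is not how `AdminApi` is implemented.

```python
import json
import socket
import socketserver
import struct
import threading
import time

_START = time.monotonic()


class MockAdminHandler(socketserver.StreamRequestHandler):
    """Handles one request per connection, each in its own thread."""

    def handle(self):
        header = self.rfile.read(5)           # [cmd_id][payload_len]
        if len(header) < 5:
            return
        cmd_id, payload_len = struct.unpack("<BI", header)
        self.rfile.read(payload_len)          # discard any request payload
        if cmd_id == 0x04:                    # GetHealth
            status = 0x00
            uptime = int(time.monotonic() - _START)
            body = json.dumps({"status": "ok", "uptime_secs": uptime}).encode()
        else:
            status, body = 0x01, b""          # InvalidCommand, empty payload
        self.wfile.write(struct.pack("<BI", status, len(body)) + body)


class MockAdminServer(socketserver.ThreadingTCPServer):
    allow_reuse_address = True
    daemon_threads = True                     # handler threads die with the process


def start_mock_server(host="127.0.0.1", port=0):
    """Start the mock in a background thread; port 0 picks a free port."""
    server = MockAdminServer((host, port), MockAdminHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


server = start_mock_server()
host, port = server.server_address
with socket.create_connection((host, port)) as sock:
    sock.sendall(struct.pack("<BI", 0x04, 0))
    reader = sock.makefile("rb")
    status, length = struct.unpack("<BI", reader.read(5))
    health = json.loads(reader.read(length))
print(status, health["status"])
server.shutdown()
server.server_close()
```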
Related
- Telemetry - Metrics streaming
- Environment Variables - Configuration
- Debugging Guide - Troubleshooting