Admin API

Debug and monitoring API for HDDS applications.

Overview

The Admin API provides real-time inspection of HDDS mesh state via a lightweight binary protocol over TCP (default port 4243):

  • Epoch-based snapshots - Reads validated against an atomic epoch counter, with only a brief read lock while data is cloned
  • Zero data-plane impact - No locks held during DDS operations
  • JSON responses - Human-readable format for debugging tools
  • Binary protocol - Simple [cmd_id][len][payload] framing

Architecture

┌───────────────────────────────────────────────────────────────┐
│                         HDDS Runtime                          │
│                                                               │
│  ┌──────────────────────────────────────────────────────────┐ │
│  │                         AdminApi                         │ │
│  │  ┌──────────────┐  ┌──────────────┐  ┌────────────────┐  │ │
│  │  │ ParticipantDB│  │ DiscoveryFSM │  │MetricsCollector│  │ │
│  │  │ (local part) │  │ (discovered) │  │  (telemetry)   │  │ │
│  │  └──────────────┘  └──────────────┘  └────────────────┘  │ │
│  │                           │                              │ │
│  │           ┌───────────────┴───────────────┐              │ │
│  │           │     Epoch-based Snapshot      │              │ │
│  │           │ (AtomicU64 + retry on change) │              │ │
│  │           └───────────────┬───────────────┘              │ │
│  └───────────────────────────┼──────────────────────────────┘ │
│                              │                                │
│                    TCP Server (port 4243)                     │
│                 [cmd_id][len][JSON payload]                   │
└──────────────────────────────┼────────────────────────────────┘
                               │
                               ▼
                     ┌───────────────────┐
                     │    Debug Tools    │
                     │ (hdds-admin CLI)  │
                     └───────────────────┘

Quick Start

use hdds::admin::AdminApi;
use hdds::telemetry::init_metrics;

fn main() -> std::io::Result<()> {
    // Initialize metrics
    let metrics = init_metrics();

    // Start Admin API on port 4243
    let mut admin = AdminApi::bind("127.0.0.1", 4243, None)?;

    // Inject metrics collector
    admin.set_metrics(metrics);

    // Set local participant name
    admin.set_local_participant("my-app".to_string());

    // Application runs...
    // Admin API available at tcp://127.0.0.1:4243

    Ok(())
}

Commands

Command     ID    Description
GetMesh     0x01  List all discovered participants
GetTopics   0x02  List active topics with endpoint counts
GetMetrics  0x03  Get telemetry metrics snapshot
GetHealth   0x04  Health check with uptime
GetWriters  0x05  List all DataWriters
GetReaders  0x06  List all DataReaders

Binary Protocol

Request Format

┌──────────┬────────────────┐
│ cmd_id   │ payload_len    │
│ (1 byte) │ (4 bytes LE)   │
└──────────┴────────────────┘
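
As a rough illustration of the request header, the following Python sketch packs a command ID and a zero payload length into the 5-byte frame shown above. The `Command` enum and `build_request` helper are hypothetical names chosen for this example; only the command IDs and the little-endian layout come from this page.

```python
import struct
from enum import IntEnum


class Command(IntEnum):
    """Command IDs from the Commands table (the enum itself is illustrative)."""
    GET_MESH = 0x01
    GET_TOPICS = 0x02
    GET_METRICS = 0x03
    GET_HEALTH = 0x04
    GET_WRITERS = 0x05
    GET_READERS = 0x06


def build_request(cmd: Command) -> bytes:
    """Pack [cmd_id: u8][payload_len: u32 LE] with an empty payload."""
    # "<" selects little-endian with no padding, so the result is 5 bytes.
    return struct.pack("<BI", int(cmd), 0)


frame = build_request(Command.GET_HEALTH)
print(frame.hex(" "))  # 04 00 00 00 00
```

These are the same five bytes that the netcat example in the Command-Line Client section sends for GetHealth.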

Response Format

┌──────────┬────────────────┬──────────────────┐
│ status   │ payload_len    │ JSON payload     │
│ (1 byte) │ (4 bytes LE)   │ (variable)       │
└──────────┴────────────────┴──────────────────┘

Status Codes

Status          Code  Description
Ok              0x00  Success
InvalidCommand  0x01  Unknown command ID
InternalError   0x02  Server error
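
To make the response framing and status codes concrete, here is a minimal Python sketch that decodes a complete response frame held in memory. The `Status` enum and `parse_response` function are hypothetical names. The sketch also assumes that a non-Ok response can be rejected from the header alone, since this page does not say whether error responses carry a payload.

```python
import json
import struct
from enum import IntEnum


class Status(IntEnum):
    OK = 0x00
    INVALID_COMMAND = 0x01
    INTERNAL_ERROR = 0x02


def parse_response(frame: bytes) -> dict:
    """Decode [status: u8][payload_len: u32 LE][JSON payload]."""
    if len(frame) < 5:
        raise ValueError("truncated header")
    status_byte, payload_len = struct.unpack("<BI", frame[:5])
    status = Status(status_byte)  # raises ValueError on an unknown code
    if status is not Status.OK:
        # Assumption: the header is enough to report the failure.
        raise RuntimeError(f"admin request failed: {status.name}")
    payload = frame[5:5 + payload_len]
    if len(payload) != payload_len:
        raise ValueError("truncated payload")
    return json.loads(payload)


# Build a sample frame locally instead of talking to a live server.
body = json.dumps({"status": "ok", "uptime_secs": 3600}).encode()
sample = struct.pack("<BI", int(Status.OK), len(body)) + body
print(parse_response(sample)["uptime_secs"])  # 3600
```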

Response Examples

GetMesh (0x01)

{
  "epoch": 42,
  "participants": [
    {
      "guid": "01.0f.ac.10.00.00.00.01.00.00.00.00.00.00.01.c1",
      "name": "sensor-node",
      "is_local": false,
      "state": "Active",
      "endpoints": ["192.168.1.100:7400", "192.168.1.100:7411"],
      "lease_ms": 100000,
      "last_seen_ago_ms": 1234
    }
  ]
}
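
One way a monitoring script might use this response is to flag discovered participants whose last SPDP announcement is older than some fraction of their lease. The sketch below works on an already parsed GetMesh payload. The `stale_participants` helper and the 0.5 threshold are assumptions made for illustration and are not part of HDDS.

```python
def stale_participants(mesh: dict, fraction: float = 0.5) -> list:
    """Return names of remote participants not seen within fraction * lease."""
    stale = []
    for p in mesh.get("participants", []):
        lease = p.get("lease_ms")
        seen = p.get("last_seen_ago_ms")
        # Local participants and entries without timing fields are skipped.
        if p.get("is_local") or lease is None or seen is None:
            continue
        if seen > lease * fraction:
            stale.append(p["name"])
    return stale


mesh = {
    "epoch": 42,
    "participants": [
        {"name": "sensor-node", "is_local": False,
         "lease_ms": 100000, "last_seen_ago_ms": 1234},
        {"name": "slow-node", "is_local": False,
         "lease_ms": 100000, "last_seen_ago_ms": 80000},
    ],
}
print(stale_participants(mesh))  # ['slow-node']
```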

GetTopics (0x02)

{
  "epoch": 42,
  "topics": [
    {
      "name": "sensor_data",
      "type_name": "SensorData",
      "writers_count": 3,
      "readers_count": 5
    }
  ]
}

GetMetrics (0x03)

{
  "epoch": 42,
  "messages_sent": 10000,
  "messages_received": 9500,
  "messages_dropped": 5,
  "latency_min_ns": 100000,
  "latency_p50_ns": 500000,
  "latency_p99_ns": 2000000,
  "latency_max_ns": 5000000
}
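
The latency fields are reported in nanoseconds, so a dashboard will usually convert them before display. The sketch below derives millisecond latencies and a drop ratio from the sample payload. The `summarize_metrics` helper is hypothetical, and computing the ratio against `messages_sent` is an assumption, because this page does not define what `messages_dropped` is counted relative to.

```python
NS_PER_MS = 1_000_000


def summarize_metrics(m: dict) -> dict:
    """Convert a GetMetrics payload into display-friendly values."""
    sent = m["messages_sent"]
    return {
        # Assumption: drops are compared against messages sent.
        "drop_ratio": m["messages_dropped"] / sent if sent else 0.0,
        "p50_ms": m["latency_p50_ns"] / NS_PER_MS,
        "p99_ms": m["latency_p99_ns"] / NS_PER_MS,
        "max_ms": m["latency_max_ns"] / NS_PER_MS,
    }


sample_metrics = {
    "epoch": 42,
    "messages_sent": 10000,
    "messages_received": 9500,
    "messages_dropped": 5,
    "latency_min_ns": 100000,
    "latency_p50_ns": 500000,
    "latency_p99_ns": 2000000,
    "latency_max_ns": 5000000,
}
summary = summarize_metrics(sample_metrics)
print(summary["p99_ms"])  # 2.0
```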

GetHealth (0x04)

{
  "status": "ok",
  "uptime_secs": 3600
}

GetWriters (0x05)

{
  "epoch": 42,
  "endpoints": [
    {
      "guid": "01.0f.ac.10.00.00.00.01.00.00.00.00.00.00.03.c2",
      "participant_guid": "01.0f.ac.10.00.00.00.01.00.00.00.00.00.00.01.c1",
      "topic_name": "sensor_data",
      "type_name": "SensorData",
      "reliability": "Reliable",
      "durability": "Volatile",
      "history": "KeepLast(10)"
    }
  ]
}

GetReaders (0x06)

{
  "epoch": 42,
  "endpoints": [
    {
      "guid": "01.0f.ac.10.00.00.00.02.00.00.00.00.00.00.04.c7",
      "participant_guid": "01.0f.ac.10.00.00.00.02.00.00.00.00.00.00.01.c1",
      "topic_name": "sensor_data",
      "type_name": "SensorData",
      "reliability": "Reliable",
      "durability": "Volatile",
      "history": "KeepLast(100)"
    }
  ]
}
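
Because GetWriters and GetReaders share the same endpoint shape, a debug tool can cross-check them. The sketch below pairs writers and readers on the same topic and reports pairs where the writer offers weaker QoS than the reader requests, following the usual DDS requested/offered ordering (BestEffort below Reliable, Volatile below TransientLocal below Transient below Persistent). The `qos_mismatches` helper is hypothetical, and whether HDDS matches endpoints with exactly this rule is an assumption.

```python
RELIABILITY = {"BestEffort": 0, "Reliable": 1}
DURABILITY = {"Volatile": 0, "TransientLocal": 1, "Transient": 2, "Persistent": 3}


def qos_mismatches(writers: dict, readers: dict) -> list:
    """List (writer_guid, reader_guid, policy) where offered < requested."""
    problems = []
    for w in writers["endpoints"]:
        for r in readers["endpoints"]:
            if w["topic_name"] != r["topic_name"]:
                continue
            if RELIABILITY[w["reliability"]] < RELIABILITY[r["reliability"]]:
                problems.append((w["guid"], r["guid"], "reliability"))
            if DURABILITY[w["durability"]] < DURABILITY[r["durability"]]:
                problems.append((w["guid"], r["guid"], "durability"))
    return problems


# Shortened GUIDs keep the example readable.
writers = {"endpoints": [{"guid": "w1", "topic_name": "sensor_data",
                          "reliability": "BestEffort", "durability": "Volatile"}]}
readers = {"endpoints": [{"guid": "r1", "topic_name": "sensor_data",
                          "reliability": "Reliable", "durability": "Volatile"}]}
print(qos_mismatches(writers, readers))  # [('w1', 'r1', 'reliability')]
```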

Rust API

Initialization

use hdds::admin::AdminApi;

// Basic binding
let admin = AdminApi::bind("127.0.0.1", 4243, None)?;

// With DiscoveryFsm for full discovery data
let fsm = participant.discovery_fsm();
let admin = AdminApi::bind("0.0.0.0", 4243, Some(fsm))?;

Configuration

// Inject metrics collector
admin.set_metrics(metrics_collector);

// Set local participant name
admin.set_local_participant("my-app".to_string());

Programmatic Snapshots

use hdds::admin::{MeshSnapshot, TopicsSnapshot, MetricsSnapshot, EndpointsSnapshot};

// Mesh snapshot (participants)
let mesh: MeshSnapshot = admin.snapshot_mesh();
for p in &mesh.participants {
    println!("Participant: {} ({})", p.guid, p.name);
}

// Topics snapshot
let topics: TopicsSnapshot = admin.snapshot_topics();
for t in &topics.topics {
    println!("Topic: {} - {} writers, {} readers",
        t.name, t.writers_count, t.readers_count);
}

// Metrics snapshot
let metrics: MetricsSnapshot = admin.snapshot_metrics();
println!("Messages sent: {}", metrics.messages_sent);
println!("P99 latency: {} ns", metrics.latency_p99_ns);

// Endpoints
let writers: EndpointsSnapshot = admin.snapshot_writers();
let readers: EndpointsSnapshot = admin.snapshot_readers();

// Uptime
let uptime = admin.uptime_secs();

Shutdown

// Graceful shutdown
admin.shutdown();

// Or let it drop automatically
drop(admin);

Snapshot Types

ParticipantView

pub struct ParticipantView {
    pub guid: String,                    // Hex format: "01.0f.ac.10..."
    pub name: String,                    // Participant name
    pub is_local: bool,                  // Local or discovered
    pub state: Option<String>,           // "Idle", "Announced", "Discovered", "Active"
    pub endpoints: Option<Vec<String>>,  // Socket addresses
    pub lease_ms: Option<u64>,           // Lease duration (ms)
    pub last_seen_ago_ms: Option<u64>,   // Time since last SPDP (ms)
}
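
A client written in another language can mirror this struct when decoding GetMesh. The Python sketch below is one possible mapping, with the `Option` fields defaulting to `None`. The `from_json` constructor is a hypothetical helper. It uses `dict.get` so that it works whether an unset `Option` arrives as `null` or is omitted from the JSON, since this page does not say which.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ParticipantView:
    guid: str
    name: str
    is_local: bool
    state: Optional[str] = None
    endpoints: Optional[List[str]] = None
    lease_ms: Optional[int] = None
    last_seen_ago_ms: Optional[int] = None

    @classmethod
    def from_json(cls, obj: dict) -> "ParticipantView":
        # dict.get covers both an omitted key and an explicit null.
        return cls(
            guid=obj["guid"],
            name=obj["name"],
            is_local=obj["is_local"],
            state=obj.get("state"),
            endpoints=obj.get("endpoints"),
            lease_ms=obj.get("lease_ms"),
            last_seen_ago_ms=obj.get("last_seen_ago_ms"),
        )


local = ParticipantView.from_json(
    {"guid": "01.0f.ac.10.00.00.00.01.00.00.00.00.00.00.01.c1",
     "name": "my-app", "is_local": True}
)
print(local.state, local.lease_ms)  # None None
```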

EndpointView

pub struct EndpointView {
    pub guid: String,              // Endpoint GUID
    pub participant_guid: String,  // Parent participant
    pub topic_name: String,        // Topic name
    pub type_name: String,         // Type name
    pub reliability: String,       // "Reliable" or "BestEffort"
    pub durability: String,        // "Volatile", "TransientLocal", "Transient", "Persistent"
    pub history: String,           // "KeepLast(N)" or "KeepAll"
}
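
Since `history` is delivered as a string, tools that need the numeric depth have to parse it. The following Python sketch handles the two documented forms. The `history_depth` name is hypothetical, and treating any other string as an error is a choice made for this example.

```python
import re
from typing import Optional

_KEEP_LAST = re.compile(r"^KeepLast\((\d+)\)$")


def history_depth(history: str) -> Optional[int]:
    """Return N for "KeepLast(N)", or None for "KeepAll"."""
    if history == "KeepAll":
        return None
    match = _KEEP_LAST.match(history)
    if match is None:
        raise ValueError(f"unrecognized history string: {history!r}")
    return int(match.group(1))


print(history_depth("KeepLast(10)"), history_depth("KeepAll"))  # 10 None
```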

Epoch-Based Snapshots

The Admin API uses epoch-based consistency to avoid blocking the data plane:

┌─────────────────────────────────────────────────────────────┐
│                      Epoch-Based Read                       │
│                                                             │
│  1. Read epoch_before = atomic_load(epoch)                  │
│  2. Clone data from Arc<RwLock<T>> (brief read lock)        │
│  3. Read epoch_after = atomic_load(epoch)                   │
│  4. If epoch_before == epoch_after → return data            │
│  5. Else retry (up to 3 times)                              │
│                                                             │
└─────────────────────────────────────────────────────────────┘

This ensures:

  • No blocking of write/read operations
  • Consistent snapshots (data from same epoch)
  • Automatic retry on concurrent mutations

Command-Line Client

Connect with netcat or a custom client:

# Using netcat (send GetHealth = 0x04)
echo -ne '\x04\x00\x00\x00\x00' | nc localhost 4243 | tail -c +6

# Response: {"status":"ok","uptime_secs":3600}

Example Python Client

import json
import socket
import struct

def recv_exact(sock, n):
    """Read exactly n bytes; recv() may return fewer than requested."""
    buf = bytearray()
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("connection closed before full response")
        buf.extend(chunk)
    return bytes(buf)

def admin_query(host, port, cmd_id):
    with socket.create_connection((host, port)) as sock:
        # Send request: [cmd_id][payload_len=0]
        sock.sendall(struct.pack('<BI', cmd_id, 0))

        # Read response header: [status][payload_len]
        status, payload_len = struct.unpack('<BI', recv_exact(sock, 5))
        if status != 0:
            raise RuntimeError(f"Error: status={status}")

        # Read JSON payload
        payload = recv_exact(sock, payload_len)

    return json.loads(payload)

# Usage
mesh = admin_query('localhost', 4243, 0x01)  # GetMesh
print(json.dumps(mesh, indent=2))

Integration with HDDS Viewer

HDDS Viewer connects to both ports:

  • Port 4242: Telemetry streaming (HDMX binary)
  • Port 4243: Admin API (JSON snapshots)

# Start your DDS application
./my_dds_app

# Connect with HDDS Viewer
hdds-viewer --admin 127.0.0.1:4243 --telemetry 127.0.0.1:4242

Thread Safety

  • TCP accept loop: Dedicated thread, non-blocking
  • Client handlers: One thread per connection
  • Snapshots: Epoch-based reads (atomic epoch check around a brief read lock)
  • Mutations: Bump epoch atomically after changes