Telemetry

Real-time metrics collection and export for HDDS monitoring.

Overview

HDDS includes a telemetry system for observability:

  • Thread-safe metrics - Atomic counters, latency histograms
  • HDMX binary format - Compact wire protocol for streaming
  • Live TCP streaming - Push metrics to monitoring tools at 10 Hz
  • Admin API integration - Metrics snapshots via Admin API

Architecture

┌─────────────────────────────────────────────────────────────┐
│ HDDS Application │
│ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ MetricsCollector │ │
│ │ ┌─────────────────┐ ┌──────────────────────────────┐ │ │
│ │ │ Atomic Counters │ │ Latency Histogram │ │ │
│ │ │ - messages_sent │ │ - ring buffer (10k samples) │ │ │
│ │ │ - messages_recv │ │ - p50, p99, p999 percentiles │ │ │
│ │ │ - bytes_sent │ └──────────────────────────────┘ │ │
│ │ │ - would_block │ │ │
│ │ └─────────────────┘ │ │
│ └────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ Exporter │ │
│ │ TCP Server (port 4242) ──► HDMX binary frames @ 10 Hz │ │
│ └────────────────────────────────────────────────────────┘ │
│ │ │
└──────────────────────────────┼───────────────────────────────┘

┌───────────────────┐
│ HDDS Viewer │
│ (monitoring) │
└───────────────────┘

Quick Start

use hdds::telemetry::{init_metrics, init_exporter};

fn main() {
    // Initialize global metrics collector
    let metrics = init_metrics();

    // Start TCP exporter (optional) for live streaming
    let _exporter = init_exporter("127.0.0.1", 4242).ok();

    // Application loop...
    loop {
        // Record metrics during DDS operations
        metrics.increment_sent(1);
        metrics.increment_bytes_sent(1024);

        // Record latency samples (get_timestamp_ns: your own clock helper)
        let start_ns = get_timestamp_ns();
        // ... operation ...
        let end_ns = get_timestamp_ns();
        metrics.add_latency_sample(start_ns, end_ns);
    }
}

MetricsCollector API

Initialization

use hdds::telemetry::{init_metrics, get_metrics, get_metrics_opt, MetricsCollector};

// Initialize global metrics (creates singleton)
let metrics = init_metrics();

// Get existing collector (creates if not initialized)
let metrics = get_metrics();

// Get existing collector (returns None if not initialized)
if let Some(metrics) = get_metrics_opt() {
    // Use metrics
}

// Custom capacity (default: 10,000 latency samples)
let metrics = MetricsCollector::with_capacity(50_000);

Counter Methods

// Message counters
metrics.increment_sent(1); // Messages sent
metrics.increment_received(1); // Messages received
metrics.increment_dropped(1); // Messages dropped

// Byte counter
metrics.increment_bytes_sent(1024); // Bytes sent

// Error counters
metrics.increment_would_block(1); // EAGAIN/EWOULDBLOCK
metrics.increment_merge_full(1); // Merge queue full
metrics.increment_cache_insert_errors(1); // HistoryCache errors
metrics.increment_transport_errors(1); // Transport errors

// Read counters
let sent = metrics.messages_sent();
let dropped = metrics.messages_dropped();

Latency Histogram

// Add latency sample (nanoseconds)
let start = std::time::Instant::now();
// ... operation ...
let elapsed_ns = start.elapsed().as_nanos() as u64;
metrics.add_latency_sample(0, elapsed_ns);

// Or with timestamps
metrics.add_latency_sample(start_timestamp_ns, end_timestamp_ns);

// Check sample count
let count = metrics.latency_sample_count();
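To make the p50/p99/p999 values in snapshots concrete: one common way to compute them is the nearest-rank method over the buffered samples. The sketch below is illustrative only and is not the collector's internal code (which keeps a fixed-size ring buffer); the `percentile` helper is hypothetical.

```rust
// Nearest-rank percentile over a copy of the samples (illustrative;
// the real MetricsCollector maintains its own ring buffer).
fn percentile(samples: &[u64], p: f64) -> u64 {
    assert!(!samples.is_empty());
    let mut sorted = samples.to_vec();
    sorted.sort_unstable();
    // Nearest rank: ceil(p * n), converted to a zero-based index.
    let rank = ((p * sorted.len() as f64).ceil() as usize).max(1) - 1;
    sorted[rank.min(sorted.len() - 1)]
}

fn main() {
    // Latencies 1..=100 ns: p50 is 50, p99 is 99, p999 is 100.
    let samples: Vec<u64> = (1..=100).collect();
    println!(
        "p50={} p99={} p999={}",
        percentile(&samples, 0.50),
        percentile(&samples, 0.99),
        percentile(&samples, 0.999)
    ); // prints "p50=50 p99=99 p999=100"
}
```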

Snapshots

use hdds::telemetry::Frame;

// Get current metrics snapshot
let frame: Frame = metrics.snapshot();

// Frame contains timestamp and fields
println!("Timestamp: {} ns", frame.ts_ns);

for field in &frame.fields {
    println!("Tag: {}, Value: {}", field.tag, field.value_u64);
}

Metric Tags

Tag  Name                 Type  Description
1    PARTICIPANT_ID       u64   Participant identifier
10   MESSAGES_SENT        u64   Total messages sent
11   MESSAGES_RECEIVED    u64   Total messages received
12   MESSAGES_DROPPED     u64   Total messages dropped
13   BYTES_SENT           u64   Total bytes sent
20   LATENCY_P50          u64   50th percentile latency (ns)
21   LATENCY_P99          u64   99th percentile latency (ns)
22   LATENCY_P999         u64   99.9th percentile latency (ns)
40   MERGE_FULL_COUNT     u64   Merge queue full events
41   WOULD_BLOCK_COUNT    u64   EAGAIN/EWOULDBLOCK events
42   CACHE_INSERT_ERRORS  u64   HistoryCache insert errors
43   TRANSPORT_ERRORS     u64   Transport errors
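When printing raw frames, a small lookup from tag number to name (transcribed from the table above) is handy. The `tag_name` helper here is illustrative and not part of the hdds API.

```rust
// Map HDMX tag numbers (from the table above) to readable names.
// Illustrative helper; not part of the hdds API.
fn tag_name(tag: u16) -> &'static str {
    match tag {
        1 => "PARTICIPANT_ID",
        10 => "MESSAGES_SENT",
        11 => "MESSAGES_RECEIVED",
        12 => "MESSAGES_DROPPED",
        13 => "BYTES_SENT",
        20 => "LATENCY_P50",
        21 => "LATENCY_P99",
        22 => "LATENCY_P999",
        40 => "MERGE_FULL_COUNT",
        41 => "WOULD_BLOCK_COUNT",
        42 => "CACHE_INSERT_ERRORS",
        43 => "TRANSPORT_ERRORS",
        _ => "UNKNOWN",
    }
}

fn main() {
    println!("{}", tag_name(21)); // prints "LATENCY_P99"
}
```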

Live Streaming with Exporter

Starting the Exporter

use hdds::telemetry::{init_exporter, get_exporter};

// Start TCP server on port 4242
let exporter = init_exporter("127.0.0.1", 4242)?;

// Or bind to all interfaces
let exporter = init_exporter("0.0.0.0", 4242)?;

// Get existing exporter
if let Some(exporter) = get_exporter() {
    // Push frames manually
    let frame = metrics.snapshot();
    exporter.push(&frame);
}

Push Metrics

use std::time::Duration;
use std::thread;

// Spawn metrics push thread (10 Hz)
let metrics = init_metrics();
let exporter = init_exporter("127.0.0.1", 4242)?;

thread::spawn(move || {
    loop {
        let frame = metrics.snapshot();
        exporter.push(&frame);
        thread::sleep(Duration::from_millis(100));
    }
});

Shutdown

// Graceful shutdown
exporter.shutdown();

HDMX Binary Format

The HDMX (HDDS Metrics eXchange) format is a compact binary protocol:

Header (16 bytes)

Offset  Size  Field         Description
0       4     Magic         0x48444D58 ("HDMX" LE)
4       4     Version       0x00000100 (v1.0.0)
8       4     Frame Length  Total frame size (bytes)
12      4     Field Count   Number of fields

Field (4 + N bytes)

Offset  Size  Field   Description
0       2     Tag     Field identifier
2       1     DType   Data type (0=u64, 1=i64, 2=f64, 3=u32, 4=bytes)
3       1     Length  Value length (4 or 8 bytes)
4       N     Value   Field value (little-endian)
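The layout above is small enough to assemble by hand, which is useful for checking offsets. The sketch below encodes one frame with a single u64 field; `encode_one` is a hypothetical helper for illustration, and real code should use `encode_frame`/`decode_frame` from `hdds::telemetry`.

```rust
// Build one HDMX frame by hand, following the header and field layout
// tables above. Sketch only; use hdds::telemetry::encode_frame in practice.
const MAGIC: u32 = 0x48444D58; // "HDMX"
const VERSION: u32 = 0x0000_0100; // v1.0.0

fn encode_one(tag: u16, value: u64) -> Vec<u8> {
    let frame_len = 16 + 4 + 8; // 16-byte header + 4-byte field header + u64
    let mut buf = Vec::with_capacity(frame_len);
    buf.extend_from_slice(&MAGIC.to_le_bytes());             // offset 0: magic
    buf.extend_from_slice(&VERSION.to_le_bytes());           // offset 4: version
    buf.extend_from_slice(&(frame_len as u32).to_le_bytes()); // offset 8: frame length
    buf.extend_from_slice(&1u32.to_le_bytes());              // offset 12: field count
    buf.extend_from_slice(&tag.to_le_bytes());               // field offset 0: tag
    buf.push(0); // field offset 2: dtype 0 = u64
    buf.push(8); // field offset 3: value length
    buf.extend_from_slice(&value.to_le_bytes());             // field offset 4: value
    buf
}

fn main() {
    let bytes = encode_one(10, 12345); // tag 10 = MESSAGES_SENT
    assert_eq!(bytes.len(), 28);
    assert_eq!(u32::from_le_bytes(bytes[0..4].try_into().unwrap()), MAGIC);
    assert_eq!(u64::from_le_bytes(bytes[20..28].try_into().unwrap()), 12345);
    println!("frame is {} bytes", bytes.len()); // prints "frame is 28 bytes"
}
```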

Encoding/Decoding

use hdds::telemetry::{encode_frame, decode_frame, Frame, MAGIC, VERSION};

// Encode frame to bytes
let frame = metrics.snapshot();
let bytes: Vec<u8> = encode_frame(&frame)?;

// Decode bytes to frame
let decoded: Frame = decode_frame(&bytes)?;

// Check magic and version
assert_eq!(u32::from_le_bytes(bytes[0..4].try_into().unwrap()), MAGIC);

Performance

Operation          Target Latency
Counter increment  < 5 ns
Latency sample     < 500 ns
Snapshot           < 1 us
Frame encode       < 5 us
TCP push           < 10 us

Thread Safety

  • Counters: Lock-free atomics (Relaxed ordering)
  • Latency histogram: Mutex-protected ring buffer
  • Exporter clients: Mutex-protected vector
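The lock-free counter pattern from the first bullet can be sketched with std atomics: Relaxed ordering is sufficient because each increment only needs atomicity, not ordering relative to other memory operations. The `concurrent_count` function is an illustrative stand-in, not hdds code.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

// Increment a shared counter from several threads with Relaxed ordering,
// mirroring the lock-free counter design described above.
fn concurrent_count(threads: u64, per_thread: u64) -> u64 {
    let sent = Arc::new(AtomicU64::new(0));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let sent = Arc::clone(&sent);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    // Relaxed: we need the add to be atomic, nothing more.
                    sent.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    sent.load(Ordering::Relaxed)
}

fn main() {
    println!("messages_sent = {}", concurrent_count(4, 1000)); // prints "messages_sent = 4000"
}
```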

Disabling Telemetry

Via environment variable:

# Disable telemetry export
export HDDS_EXPORTER_DISABLE=1
./my_dds_app

Or skip initialization:

// Simply don't call init_exporter()
let metrics = init_metrics(); // Metrics still collected locally
// No exporter = no TCP streaming
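The same switch can be mirrored in application code by reading the variable before calling `init_exporter`. Whether the library also honors `HDDS_EXPORTER_DISABLE` internally is not restated here; `exporter_disabled` is a hypothetical helper for this sketch.

```rust
// App-side gate on the HDDS_EXPORTER_DISABLE variable shown above.
// exporter_disabled is a hypothetical helper, not part of hdds.
fn exporter_disabled(raw: Option<&str>) -> bool {
    matches!(raw, Some("1"))
}

fn main() {
    let raw = std::env::var("HDDS_EXPORTER_DISABLE").ok();
    if exporter_disabled(raw.as_deref()) {
        println!("telemetry export disabled");
    } else {
        // init_exporter("127.0.0.1", 4242) would be called here
        println!("telemetry export enabled");
    }
}
```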

Integration with HDDS Viewer

HDDS Viewer connects to the exporter port for live monitoring:

# Start your DDS application with telemetry
export HDDS_TELEMETRY_PORT=4242
./my_dds_app

# Connect with HDDS Viewer
hdds-viewer --connect 127.0.0.1:4242

Example: Complete Instrumentation

use hdds::telemetry::{init_metrics, init_exporter};
use hdds::{Participant, QoS};
use std::time::{Duration, Instant};
use std::thread;

// Example payload type (derive whatever hdds requires for serialization).
struct SensorData {
    value: f64,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize telemetry
    let metrics = init_metrics();
    let _exporter = init_exporter("0.0.0.0", 4242)?;

    // Create DDS participant
    let participant = Participant::builder("instrumented-app").build()?;
    let topic = participant.topic::<SensorData>("sensor_data")?;
    let writer = participant.publisher()?.create_writer(&topic, QoS::reliable())?;

    // Spawn metrics push thread
    let metrics_clone = metrics.clone();
    thread::spawn(move || {
        loop {
            if let Some(exporter) = hdds::telemetry::get_exporter() {
                exporter.push(&metrics_clone.snapshot());
            }
            thread::sleep(Duration::from_millis(100));
        }
    });

    // Application loop with instrumentation
    loop {
        let start = Instant::now();

        // Publish data
        let data = SensorData { value: 42.0 };
        match writer.write(&data) {
            Ok(_) => {
                metrics.increment_sent(1);
                metrics.increment_bytes_sent(std::mem::size_of::<SensorData>() as u64);
            }
            Err(_) => {
                metrics.increment_dropped(1);
            }
        }

        // Record latency
        let elapsed_ns = start.elapsed().as_nanos() as u64;
        metrics.add_latency_sample(0, elapsed_ns);

        thread::sleep(Duration::from_millis(10));
    }
}