Skip to main content

Engine & Router

Internal packet routing and event distribution for HDDS.

Overview

The Engine module provides the core data delivery infrastructure:

  • Router - Background thread routing RTPS packets from RxRing to subscribers
  • TopicRegistry - Thread-safe topic→subscribers mapping with GUID routing
  • Subscriber - Trait for receiving topic data (callback pattern)
  • Hub - MPSC producer → NxSPSC subscribers for system events

Architecture

┌─────────────────────────────────────────────────────────────────────┐
│ HDDS Runtime │
│ │
│ MulticastListener │
│ │ │
│ ▼ │
│ RxRing.push() │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Router │ │
│ │ route_data_packet() → TopicRegistry.get_topic() │ │
│ │ deliver_heartbeat() → HeartbeatHandler.on_heartbeat() │ │
│ │ deliver_nack() → NackHandler.on_nack() │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ TopicRegistry │ │
│ │ topics: HashMap<String, Topic> │ │
│ │ writer_guid_to_topic: HashMap<[u8;16], String> │ │
│ │ heartbeat_handlers: Vec<HeartbeatHandler> │ │
│ │ nack_handlers: Vec<NackHandler> │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Topic.deliver(seq, data) → Subscriber.on_data() │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Hub │ │
│ │ Event::OnMatch, OnUnmatch, OnIncompatibleQos, SystemStall │ │
│ │ MPSC producer → NxSPSC subscriber rings │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘

Router

The Router is a background thread that routes RTPS packets from the multicast ring to registered subscribers.

Starting the Router

use hdds::engine::{Router, TopicRegistry, CallbackSubscriber};
use std::sync::Arc;

// Create topic registry
let registry = Arc::new(TopicRegistry::new());

// Register a subscriber
let subscriber = Arc::new(CallbackSubscriber::new(
"sensor/temperature".to_string(),
|topic, seq, data| {
println!("Received seq {} on {}: {} bytes", seq, topic, data.len());
},
));
registry.register_subscriber(subscriber).unwrap();

// Start router (requires RxRing and RxPool from multicast listener)
let router = Router::start(ring, pool, registry)?;

// ... application runs ...

// Stop router gracefully
router.stop()?;

RouterMetrics

Telemetry counters updated by the router:

use hdds::engine::RouterMetrics;

let metrics = RouterMetrics::new();

// After routing...
let (routed, orphaned, errors, bytes) = metrics.snapshot();
println!("Packets routed: {}", routed);
println!("Packets orphaned: {}", orphaned); // No matching topic
println!("Delivery errors: {}", errors); // Parse/delivery failures
println!("Bytes delivered: {}", bytes);
CounterDescription
packets_routedSuccessfully delivered DATA packets
packets_orphanedDATA packets with no matching topic
delivery_errorsParse failures or subscriber panics
bytes_deliveredTotal bytes delivered to subscribers

RouteStatus

pub enum RouteStatus {
Delivered, // Successfully delivered to topic
Orphaned, // Topic not found in registry
Dropped, // Parse error or missing payload
}

route_data_packet (HOT PATH)

Core routing function called for every DATA packet:

use hdds::engine::{route_data_packet, RouteStatus, TopicRegistry, RouterMetrics};

let status = route_data_packet(
payload, // Raw RTPS packet bytes
packet_len, // Packet length
payload_offset, // Optional offset to CDR payload
&registry, // TopicRegistry reference
&metrics, // RouterMetrics reference
);

match status {
RouteStatus::Delivered => { /* success */ }
RouteStatus::Orphaned => { /* topic not found */ }
RouteStatus::Dropped => { /* parse error */ }
}

TopicRegistry

Thread-safe registry for topics, subscribers, and GUID mappings.

Creating and Managing Topics

use hdds::engine::TopicRegistry;

let registry = TopicRegistry::new();

// Register a topic
registry.register_topic("sensor/temperature".to_string(), Some("SensorData".to_string()))?;

// Get topic
if let Some(topic) = registry.get_topic("sensor/temperature") {
println!("Topic: {}, subscribers: {}", topic.name(), topic.subscriber_count());
}

// Topic count
println!("Total topics: {}", registry.topic_count());

Registering Subscribers

use hdds::engine::{TopicRegistry, CallbackSubscriber, Subscriber};
use std::sync::Arc;

let registry = TopicRegistry::new();

// Register callback subscriber (auto-creates topic if not exists)
let subscriber: Arc<dyn Subscriber> = Arc::new(CallbackSubscriber::new(
"sensor/temperature".to_string(),
|topic, seq, data| {
println!("Data on {}: seq={}, len={}", topic, seq, data.len());
},
));

registry.register_subscriber(subscriber)?;

// Unregister
let removed = registry.unregister_subscriber("sensor/temperature")?;

GUID-Based Routing (RTI/FastDDS/CycloneDDS Interop)

RTI, FastDDS, and CycloneDDS often send DATA packets without inline QoS to save bandwidth. HDDS uses GUID-based routing as a fallback:

// Register writer GUID → topic mapping (called during SEDP discovery)
let writer_guid: [u8; 16] = [0x01, 0x0f, ...]; // From SEDP
registry.register_writer_guid(writer_guid, "sensor/temperature".to_string());

// Router uses this mapping for packets without inline QoS
if let Some(topic_name) = registry.get_topic_by_guid(&writer_guid) {
// Route to topic...
}

Fallback Unknown Writer Mapping

For interop scenarios where SEDP is unreliable:

# Enable fallback: bind unknown writers to single topic
export HDDS_ROUTE_UNKNOWN_WRITER_TO_SINGLE_TOPIC=1

When enabled, if there is exactly one topic with subscribers, unknown writer GUIDs are automatically bound to that topic.

Reliability Handlers

use hdds::engine::{TopicRegistry, HeartbeatHandler, NackHandler};

struct MyHeartbeatHandler;

impl HeartbeatHandler for MyHeartbeatHandler {
fn on_heartbeat(&self, heartbeat_bytes: &[u8]) {
// Process heartbeat (liveness check from writer)
}
}

struct MyNackHandler;

impl NackHandler for MyNackHandler {
fn on_nack(&self, nack_bytes: &[u8]) {
// Process NACK (retransmission request from reader)
}
}

let registry = TopicRegistry::new();
registry.register_heartbeat_handler(Arc::new(MyHeartbeatHandler));
registry.register_nack_handler(Arc::new(MyNackHandler));

Subscriber Trait

Trait for receiving topic data:

pub trait Subscriber: Send + Sync {
/// Called when data is received
fn on_data(&self, topic: &str, seq: u64, data: &[u8]);

/// Returns the topic name
fn topic_name(&self) -> &str;
}

CallbackSubscriber

Closure-based subscriber:

use hdds::engine::CallbackSubscriber;
use std::sync::Arc;

let subscriber = Arc::new(CallbackSubscriber::new(
"my_topic".to_string(),
|topic, seq, data| {
println!("Received: topic={}, seq={}, len={}", topic, seq, data.len());
},
));

Custom Subscriber

use hdds::engine::Subscriber;

struct MySensorSubscriber {
topic: String,
// ... custom state
}

impl Subscriber for MySensorSubscriber {
fn on_data(&self, topic: &str, seq: u64, data: &[u8]) {
// Deserialize and process sensor data...
}

fn topic_name(&self) -> &str {
&self.topic
}
}

Panic Isolation

If a subscriber panics during on_data(), the Router catches it and continues delivery to other subscribers. The panic is logged as a delivery_error metric.

Event Hub

MPSC producer → NxSPSC subscribers for system events.

Event Types

pub enum Event {
/// Writer-reader match (discovery complete)
OnMatch { writer_id: u32, reader_id: u32 },

/// Writer-reader unmatch (endpoint left)
OnUnmatch { writer_id: u32, reader_id: u32 },

/// QoS incompatibility detected
OnIncompatibleQos { reason: u8 },

/// System stall (congestion, resource exhaustion)
SystemStall,
}

Using the Hub

use hdds::engine::{Hub, Event};

// Create hub
let hub = Hub::new();

// Subscribe (each subscriber gets dedicated ring)
let my_ring = hub.subscribe(256); // Capacity 256 events

// Publish events (broadcast to all subscribers)
hub.publish(Event::OnMatch { writer_id: 1, reader_id: 2 });
hub.publish(Event::OnIncompatibleQos { reason: 1 });

// Receive events in subscriber
let ring = my_ring.lock().unwrap();
while let Some(entry) = ring.pop() {
let event = Hub::decode_event(entry);
match event {
Event::OnMatch { writer_id, reader_id } => {
println!("Match: writer={}, reader={}", writer_id, reader_id);
}
Event::OnIncompatibleQos { reason } => {
println!("QoS incompatible: reason={}", reason);
}
_ => {}
}
}

Event Encoding

Events are encoded into IndexEntry for efficient ring buffer storage:

FieldContent
seqEvent type (0=OnMatch, 1=OnUnmatch, 2=OnIncompatibleQos, 3=SystemStall)
handleUpper 16 bits = writer_id/reason, Lower 16 bits = reader_id
len0 (unused)

Topic

Topic metadata and subscriber fanout:

use hdds::engine::Topic;

let mut topic = Topic::new("my_topic".to_string(), Some("MyType".to_string()));

// Accessors
println!("Name: {}", topic.name());
println!("Type: {:?}", topic.type_name());
println!("Subscribers: {}", topic.subscriber_count());

// Deliver data (HOT PATH)
let errors = topic.deliver(seq_num, &cdr_payload);

Performance Notes

  • Lock-free metrics - All counters use relaxed atomics
  • RwLock for registry - Readers don't block each other
  • Panic isolation - std::panic::catch_unwind in delivery paths
  • HOT PATH optimization - #[inline] on critical functions
  • Lossy event delivery - Hub drops events if subscriber ring is full

Environment Variables

VariableDescription
HDDS_ROUTE_UNKNOWN_WRITER_TO_SINGLE_TOPICBind unknown GUIDs to single topic
HDDS_INTEROP_DIAGNOSTICSLog payload hex dumps for debugging