Engine & Router
Internal packet routing and event distribution for HDDS.
Overview
The Engine module provides the core data delivery infrastructure:
- Router - Background thread routing RTPS packets from RxRing to subscribers
- TopicRegistry - Thread-safe topic→subscribers mapping with GUID routing
- Subscriber - Trait for receiving topic data (callback pattern)
- Hub - MPSC producer → NxSPSC subscribers for system events
Architecture
┌─────────────────────────────────────────────────────────────────────┐
│ HDDS Runtime │
│ │
│ MulticastListener │
│ │ │
│ ▼ │
│ RxRing.push() │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Router │ │
│ │ route_data_packet() → TopicRegistry.get_topic() │ │
│ │ deliver_heartbeat() → HeartbeatHandler.on_heartbeat() │ │
│ │ deliver_nack() → NackHandler.on_nack() │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ TopicRegistry │ │
│ │ topics: HashMap<String, Topic> │ │
│ │ writer_guid_to_topic: HashMap<[u8;16], String> │ │
│ │ heartbeat_handlers: Vec<HeartbeatHandler> │ │
│ │ nack_handlers: Vec<NackHandler> │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Topic.deliver(seq, data) → Subscriber.on_data() │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Hub │ │
│ │ Event::OnMatch, OnUnmatch, OnIncompatibleQos, SystemStall │ │
│ │ MPSC producer → NxSPSC subscriber rings │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
Router
The Router is a background thread that routes RTPS packets from the multicast ring to registered subscribers.
Starting the Router
use hdds::engine::{Router, TopicRegistry, CallbackSubscriber};
use std::sync::Arc;
// Create topic registry
let registry = Arc::new(TopicRegistry::new());
// Register a subscriber
let subscriber = Arc::new(CallbackSubscriber::new(
"sensor/temperature".to_string(),
|topic, seq, data| {
println!("Received seq {} on {}: {} bytes", seq, topic, data.len());
},
));
registry.register_subscriber(subscriber).unwrap();
// Start router (requires RxRing and RxPool from multicast listener)
let router = Router::start(ring, pool, registry)?;
// ... application runs ...
// Stop router gracefully
router.stop()?;
RouterMetrics
Telemetry counters updated by the router:
use hdds::engine::RouterMetrics;
let metrics = RouterMetrics::new();
// After routing...
let (routed, orphaned, errors, bytes) = metrics.snapshot();
println!("Packets routed: {}", routed);
println!("Packets orphaned: {}", orphaned); // No matching topic
println!("Delivery errors: {}", errors); // Parse/delivery failures
println!("Bytes delivered: {}", bytes);
| Counter | Description |
|---|---|
packets_routed | Successfully delivered DATA packets |
packets_orphaned | DATA packets with no matching topic |
delivery_errors | Parse failures or subscriber panics |
bytes_delivered | Total bytes delivered to subscribers |
RouteStatus
pub enum RouteStatus {
Delivered, // Successfully delivered to topic
Orphaned, // Topic not found in registry
Dropped, // Parse error or missing payload
}
route_data_packet (HOT PATH)
Core routing function called for every DATA packet:
use hdds::engine::{route_data_packet, RouteStatus, TopicRegistry, RouterMetrics};
let status = route_data_packet(
payload, // Raw RTPS packet bytes
packet_len, // Packet length
payload_offset, // Optional offset to CDR payload
®istry, // TopicRegistry reference
&metrics, // RouterMetrics reference
);
match status {
RouteStatus::Delivered => { /* success */ }
RouteStatus::Orphaned => { /* topic not found */ }
RouteStatus::Dropped => { /* parse error */ }
}
TopicRegistry
Thread-safe registry for topics, subscribers, and GUID mappings.
Creating and Managing Topics
use hdds::engine::TopicRegistry;
let registry = TopicRegistry::new();
// Register a topic
registry.register_topic("sensor/temperature".to_string(), Some("SensorData".to_string()))?;
// Get topic
if let Some(topic) = registry.get_topic("sensor/temperature") {
println!("Topic: {}, subscribers: {}", topic.name(), topic.subscriber_count());
}
// Topic count
println!("Total topics: {}", registry.topic_count());
Registering Subscribers
use hdds::engine::{TopicRegistry, CallbackSubscriber, Subscriber};
use std::sync::Arc;
let registry = TopicRegistry::new();
// Register callback subscriber (auto-creates topic if not exists)
let subscriber: Arc<dyn Subscriber> = Arc::new(CallbackSubscriber::new(
"sensor/temperature".to_string(),
|topic, seq, data| {
println!("Data on {}: seq={}, len={}", topic, seq, data.len());
},
));
registry.register_subscriber(subscriber)?;
// Unregister
let removed = registry.unregister_subscriber("sensor/temperature")?;
GUID-Based Routing (RTI/FastDDS/CycloneDDS Interop)
RTI, FastDDS, and CycloneDDS often send DATA packets without inline QoS to save bandwidth. HDDS uses GUID-based routing as a fallback:
// Register writer GUID → topic mapping (called during SEDP discovery)
let writer_guid: [u8; 16] = [0x01, 0x0f, ...]; // From SEDP
registry.register_writer_guid(writer_guid, "sensor/temperature".to_string());
// Router uses this mapping for packets without inline QoS
if let Some(topic_name) = registry.get_topic_by_guid(&writer_guid) {
// Route to topic...
}
Fallback Unknown Writer Mapping
For interop scenarios where SEDP is unreliable:
# Enable fallback: bind unknown writers to single topic
export HDDS_ROUTE_UNKNOWN_WRITER_TO_SINGLE_TOPIC=1
When enabled, if there is exactly one topic with subscribers, unknown writer GUIDs are automatically bound to that topic.
Reliability Handlers
use hdds::engine::{TopicRegistry, HeartbeatHandler, NackHandler};
struct MyHeartbeatHandler;
impl HeartbeatHandler for MyHeartbeatHandler {
fn on_heartbeat(&self, heartbeat_bytes: &[u8]) {
// Process heartbeat (liveness check from writer)
}
}
struct MyNackHandler;
impl NackHandler for MyNackHandler {
fn on_nack(&self, nack_bytes: &[u8]) {
// Process NACK (retransmission request from reader)
}
}
let registry = TopicRegistry::new();
registry.register_heartbeat_handler(Arc::new(MyHeartbeatHandler));
registry.register_nack_handler(Arc::new(MyNackHandler));
Subscriber Trait
Trait for receiving topic data:
pub trait Subscriber: Send + Sync {
/// Called when data is received
fn on_data(&self, topic: &str, seq: u64, data: &[u8]);
/// Returns the topic name
fn topic_name(&self) -> &str;
}
CallbackSubscriber
Closure-based subscriber:
use hdds::engine::CallbackSubscriber;
use std::sync::Arc;
let subscriber = Arc::new(CallbackSubscriber::new(
"my_topic".to_string(),
|topic, seq, data| {
println!("Received: topic={}, seq={}, len={}", topic, seq, data.len());
},
));
Custom Subscriber
use hdds::engine::Subscriber;
struct MySensorSubscriber {
topic: String,
// ... custom state
}
impl Subscriber for MySensorSubscriber {
fn on_data(&self, topic: &str, seq: u64, data: &[u8]) {
// Deserialize and process sensor data...
}
fn topic_name(&self) -> &str {
&self.topic
}
}
Panic Isolation
If a subscriber panics during on_data(), the Router catches it and continues delivery to other subscribers. The panic is logged as a delivery_error metric.
Event Hub
MPSC producer → NxSPSC subscribers for system events.
Event Types
pub enum Event {
/// Writer-reader match (discovery complete)
OnMatch { writer_id: u32, reader_id: u32 },
/// Writer-reader unmatch (endpoint left)
OnUnmatch { writer_id: u32, reader_id: u32 },
/// QoS incompatibility detected
OnIncompatibleQos { reason: u8 },
/// System stall (congestion, resource exhaustion)
SystemStall,
}
Using the Hub
use hdds::engine::{Hub, Event};
// Create hub
let hub = Hub::new();
// Subscribe (each subscriber gets dedicated ring)
let my_ring = hub.subscribe(256); // Capacity 256 events
// Publish events (broadcast to all subscribers)
hub.publish(Event::OnMatch { writer_id: 1, reader_id: 2 });
hub.publish(Event::OnIncompatibleQos { reason: 1 });
// Receive events in subscriber
let ring = my_ring.lock().unwrap();
while let Some(entry) = ring.pop() {
let event = Hub::decode_event(entry);
match event {
Event::OnMatch { writer_id, reader_id } => {
println!("Match: writer={}, reader={}", writer_id, reader_id);
}
Event::OnIncompatibleQos { reason } => {
println!("QoS incompatible: reason={}", reason);
}
_ => {}
}
}
Event Encoding
Events are encoded into IndexEntry for efficient ring buffer storage:
| Field | Content |
|---|---|
seq | Event type (0=OnMatch, 1=OnUnmatch, 2=OnIncompatibleQos, 3=SystemStall) |
handle | Upper 16 bits = writer_id/reason, Lower 16 bits = reader_id |
len | 0 (unused) |
Topic
Topic metadata and subscriber fanout:
use hdds::engine::Topic;
let mut topic = Topic::new("my_topic".to_string(), Some("MyType".to_string()));
// Accessors
println!("Name: {}", topic.name());
println!("Type: {:?}", topic.type_name());
println!("Subscribers: {}", topic.subscriber_count());
// Deliver data (HOT PATH)
let errors = topic.deliver(seq_num, &cdr_payload);
Performance Notes
- Lock-free metrics - All counters use relaxed atomics
- RwLock for registry - Readers don't block each other
- Panic isolation -
std::panic::catch_unwindin delivery paths - HOT PATH optimization -
#[inline]on critical functions - Lossy event delivery - Hub drops events if subscriber ring is full
Environment Variables
| Variable | Description |
|---|---|
HDDS_ROUTE_UNKNOWN_WRITER_TO_SINGLE_TOPIC | Bind unknown GUIDs to single topic |
HDDS_INTEROP_DIAGNOSTICS | Log payload hex dumps for debugging |
Related
- Congestion Control - Rate limiting
- Telemetry - Metrics collection
- Concepts: Architecture - System overview