Rust API Reference
HDDS is written in Rust, providing a safe, fast, and zero-copy DDS implementation.
This page documents the v1.0.0 stable API. Some features, such as the async API, are not yet implemented.
Installation
# Clone the repository
git clone https://git.hdds.io/hdds/hdds.git
cd hdds
Then add HDDS as a path dependency in your Cargo.toml:
[dependencies]
hdds = { path = "../hdds/crates/hdds" }
# With XTypes support
hdds = { path = "../hdds/crates/hdds", features = ["xtypes"] }
Core Types
Participant
The entry point to HDDS. Uses a builder pattern for configuration.
use hdds::Participant;
// Basic creation
let participant = Participant::builder("my_app")
.domain_id(0)
.build()?;
// With UDP multicast transport (for network communication)
use hdds::TransportMode;
let participant = Participant::builder("sensor_node")
.domain_id(42)
.with_transport(TransportMode::UdpMulticast)
.build()?;
// With custom discovery ports
let participant = Participant::builder("custom_app")
.domain_id(0)
.with_transport(TransportMode::UdpMulticast)
.with_discovery_ports(7400, 7401, 7410) // spdp, sedp, user
.build()?;
// With Kubernetes discovery (requires "k8s" feature)
#[cfg(feature = "k8s")]
let participant = Participant::builder("k8s_app")
.with_transport(TransportMode::UdpMulticast)
.with_k8s_discovery("hdds-discovery", "default")
.build()?;
// Properties
let domain_id = participant.domain_id();
let guid = participant.guid();
let name = participant.name();
let mode = participant.transport_mode();
TransportMode
pub enum TransportMode {
IntraProcess, // Default - shared memory between participants in same process
UdpMulticast, // Network communication via UDP multicast
}
Topic
Topics are created from a participant and used to build readers/writers.
use hdds::{Participant, DDS};
// Create a topic (T must implement DDS trait)
let topic = participant.topic::<SensorData>("sensors/temperature")?;
// Topic is used to create readers and writers
let writer = topic.writer().build()?;
let reader = topic.reader().build()?;
DataWriter
Writers publish data to a topic.
use hdds::QoS;
// Create writer with default QoS
let writer = participant.topic::<SensorData>("sensors/temp")?
.writer()
.build()?;
// Create writer with custom QoS
let writer = participant.topic::<SensorData>("sensors/temp")?
.writer()
.qos(QoS::reliable().keep_last(10).transient_local())
.build()?;
// Write data
let sample = SensorData {
sensor_id: 1,
value: 25.5,
timestamp: 1234567890,
};
writer.write(&sample)?;
// Get writer stats
let stats = writer.stats();
println!("Sent: {} samples", stats.samples_sent);
DataReader
Readers receive data from a topic.
use hdds::QoS;
// Create reader with default QoS
let reader = participant.topic::<SensorData>("sensors/temp")?
.reader()
.build()?;
// Create reader with custom QoS
let reader = participant.topic::<SensorData>("sensors/temp")?
.reader()
.qos(QoS::reliable().keep_last(100))
.build()?;
// Take single sample (removes from cache)
if let Some(sample) = reader.try_take()? {
println!("Received: {:?}", sample);
}
// Take batch of samples
let samples = reader.take_batch(10)?;
for sample in samples {
println!("Received: {:?}", sample);
}
// Polling loop
loop {
while let Some(sample) = reader.try_take()? {
process(&sample);
}
std::thread::sleep(std::time::Duration::from_millis(10));
}
InstanceHandle
For keyed topics, InstanceHandle identifies specific instances (16-byte key hash).
use hdds::InstanceHandle;
// Create from key hash
let handle = InstanceHandle::new([0u8; 16]);
// Nil handle (for keyless topics)
let nil_handle = InstanceHandle::nil();
assert!(nil_handle.is_nil());
// Get raw bytes
let bytes: &[u8; 16] = handle.as_bytes();
Instance-Based Read/Take
For keyed topics, you can read/take samples for a specific instance.
use hdds::{DDS, InstanceHandle, QoS};
// Keyed topic type
#[derive(Debug, Clone, DDS)]
struct SensorData {
#[key]
sensor_id: u32,
value: f32,
timestamp: u64,
}
let reader = participant.topic::<SensorData>("sensors/temp")?
.reader()
.qos(QoS::reliable().keep_last(100))
.build()?;
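// Get instance handle from a sample's key hash, using the DDS trait's
// compute_key() (see "Manual Implementation" below)
let key_sample = SensorData { sensor_id: 1, value: 0.0, timestamp: 0 };
let handle = InstanceHandle::new(key_sample.compute_key());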
// Take single sample for specific instance (removes from cache)
if let Some(sample) = reader.take_instance(handle)? {
println!("Instance {} value: {}", sample.sensor_id, sample.value);
}
// Take batch for specific instance
let samples = reader.take_instance_batch(handle, 10)?;
for sample in samples {
println!("Received: {:?}", sample);
}
// Read single sample (non-destructive, requires T: Clone)
if let Some(sample) = reader.read_instance(handle)? {
println!("Read: {:?}", sample);
}
// Read batch for specific instance
let samples = reader.read_instance_batch(handle, 10)?;
Instance Methods Summary:
| Method | Behavior | Removes from cache |
|---|---|---|
| take_instance(handle) | Takes first matching sample | Yes |
| take_instance_batch(handle, max) | Takes up to max matching samples | Yes |
| read_instance(handle) | Reads first unread matching sample (T: Clone) | No |
| read_instance_batch(handle, max) | Reads up to max unread matching samples | No |
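The difference between the two columns of the table can be modelled with a small in-memory cache. This is only a sketch of the documented behavior, not HDDS's implementation: `Entry`, `Cache`, and the `read` flag are hypothetical, and a `u32` key stands in for the `InstanceHandle`.

```rust
/// Hypothetical cache entry: a sample value plus a "has been read" flag.
struct Entry {
    key: u32,
    value: f32,
    read: bool,
}

/// Hypothetical reader cache holding samples in arrival order.
struct Cache {
    entries: Vec<Entry>,
}

impl Cache {
    /// Non-destructive: returns the first unread match and marks it read.
    fn read_instance(&mut self, key: u32) -> Option<f32> {
        let entry = self
            .entries
            .iter_mut()
            .find(|e| e.key == key && !e.read)?;
        entry.read = true;
        Some(entry.value)
    }

    /// Destructive: removes and returns the first match, read or not.
    fn take_instance(&mut self, key: u32) -> Option<f32> {
        let index = self.entries.iter().position(|e| e.key == key)?;
        Some(self.entries.remove(index).value)
    }
}
```

Both methods walk the entries from the front, so each call is a linear scan in this model.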
Instance methods use an O(n) linear scan over the cache. For high-throughput scenarios with many instances, consider using take_batch() and filtering client-side.
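Client-side filtering can be sketched without HDDS: drain one batch, then bucket it by key in a single pass instead of scanning the cache once per instance. The `SensorData` struct below mirrors the one above, `group_by_sensor` is a hypothetical helper, and the `Vec` argument stands in for whatever `take_batch()` returns.

```rust
use std::collections::HashMap;

// Local stand-in for the keyed topic type shown above.
#[derive(Debug, Clone, PartialEq)]
struct SensorData {
    sensor_id: u32,
    value: f32,
    timestamp: u64,
}

/// Hypothetical helper: buckets one batch by key in a single pass,
/// preserving arrival order within each instance.
fn group_by_sensor(batch: Vec<SensorData>) -> HashMap<u32, Vec<SensorData>> {
    let mut by_instance: HashMap<u32, Vec<SensorData>> = HashMap::new();
    for sample in batch {
        by_instance
            .entry(sample.sensor_id)
            .or_default()
            .push(sample);
    }
    by_instance
}
```

Grouping costs one pass over the batch regardless of how many instances it contains.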
Publisher / Subscriber
Optional grouping for writers/readers with shared QoS.
use hdds::QoS;
// Create publisher
let publisher = participant.create_publisher(QoS::default())?;
let writer = publisher.create_writer::<SensorData>("sensors/temp", QoS::reliable())?;
// Create subscriber
let subscriber = participant.create_subscriber(QoS::default())?;
let reader = subscriber.create_reader::<SensorData>("sensors/temp", QoS::reliable())?;
Coherent Changes API
Group multiple writes as an atomic unit. Readers see all changes or none.
Publisher Side:
let publisher = participant.create_publisher(QoS::default())?;
let writer_pos = publisher.create_writer::<Position>("position", QoS::reliable())?;
let writer_vel = publisher.create_writer::<Velocity>("velocity", QoS::reliable())?;
// Begin coherent set
publisher.begin_coherent_changes()?;
// All writes are batched
writer_pos.write(&Position { x: 10.0, y: 20.0 })?;
writer_vel.write(&Velocity { vx: 1.0, vy: 2.0 })?;
// Commit - readers now see both changes atomically
publisher.end_coherent_changes()?;
// Check state
assert!(!publisher.is_coherent());
Subscriber Side:
let subscriber = participant.create_subscriber(QoS::default())?;
let reader_pos = subscriber.create_reader::<Position>("position", QoS::reliable())?;
let reader_vel = subscriber.create_reader::<Velocity>("velocity", QoS::reliable())?;
// Lock view for consistent reading
subscriber.begin_access()?;
// Read coherent snapshot
let pos = reader_pos.take()?;
let vel = reader_vel.take()?;
// Unlock - new samples become visible
subscriber.end_access()?;
// Check state
assert!(!subscriber.is_access_locked());
API Summary:
| Entity | Method | Description |
|---|---|---|
| Publisher | begin_coherent_changes() | Start coherent set |
| Publisher | end_coherent_changes() | Commit coherent set |
| Publisher | is_coherent() | Check if in coherent set |
| Subscriber | begin_access() | Lock view for reading |
| Subscriber | end_access() | Unlock view |
| Subscriber | is_access_locked() | Check if access locked |
Error Handling:
// Nested calls return Error::InvalidState
publisher.begin_coherent_changes()?;
assert!(publisher.begin_coherent_changes().is_err()); // Already in coherent set
// End without begin returns Error::InvalidState
assert!(publisher.end_coherent_changes().is_err()); // Not in coherent set
Coherent Changes implement DDS v1.4 Section 2.2.2.5.1.9 (Publisher) and Section 2.2.2.5.2.6 (Subscriber).
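The begin/end rules above amount to a two-state machine. The following sketch models them with plain Rust so the error cases can be seen in isolation. It is an illustration under assumptions, not HDDS's implementation: `CoherentState` is hypothetical, `Error` is a local stand-in for `hdds::Error`, and samples are represented as strings.

```rust
// Local stand-in for hdds::Error; only the variant named above is modelled.
#[derive(Debug, PartialEq)]
enum Error {
    InvalidState,
}

/// Hypothetical model of a publisher's coherent-set state.
#[derive(Default)]
struct CoherentState {
    in_coherent_set: bool,
    pending: Vec<String>,   // writes buffered since begin
    published: Vec<String>, // writes visible to readers
}

impl CoherentState {
    fn begin_coherent_changes(&mut self) -> Result<(), Error> {
        if self.in_coherent_set {
            return Err(Error::InvalidState); // nested begin
        }
        self.in_coherent_set = true;
        Ok(())
    }

    fn write(&mut self, sample: &str) {
        if self.in_coherent_set {
            self.pending.push(sample.to_string());
        } else {
            self.published.push(sample.to_string());
        }
    }

    fn end_coherent_changes(&mut self) -> Result<(), Error> {
        if !self.in_coherent_set {
            return Err(Error::InvalidState); // end without begin
        }
        self.in_coherent_set = false;
        // Release the whole set at once.
        self.published.append(&mut self.pending);
        Ok(())
    }

    fn is_coherent(&self) -> bool {
        self.in_coherent_set
    }
}
```

Nothing written between begin and end reaches `published` until the set is committed, which is the all-or-nothing property described above.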
ContentFilteredTopic
Create a filtered view of a topic. Only samples matching the filter expression are delivered to readers.
Basic Usage:
use hdds::{Participant, DDS};
#[derive(Debug, Clone, DDS)]
struct Temperature {
sensor_id: u32,
value: f64,
}
let participant = Participant::builder("app").build()?;
// Create filtered topic: only temperatures > 25.0
let filtered = participant.create_content_filtered_topic::<Temperature>(
"high_temp", // filtered topic name
"sensors/temperature", // source topic name
"value > %0", // SQL-like filter expression
vec!["25.0".to_string()], // expression parameters
)?;
// Create reader from filtered topic
let reader = filtered.reader().build()?;
// Only receives samples where value > 25.0
while let Some(sample) = reader.take()? {
println!("High temp: {}", sample.value);
}
Filter Expression Syntax:
expression ::= condition
| expression AND expression
| expression OR expression
| NOT expression
| '(' expression ')'
condition ::= field_name operator value
operator ::= '>' | '<' | '>=' | '<=' | '=' | '<>' | '!=' | 'LIKE'
value ::= parameter | literal
parameter ::= '%' digit+
literal ::= integer | float | 'string'
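To make the `condition` and `parameter` rules concrete, here is a minimal sketch of evaluating one numeric condition with `%N` substitution. It is not HDDS's filter engine: `eval_condition` is a hypothetical function, fields are supplied as a name-to-number map, tokens must be separated by whitespace, and AND/OR/NOT, parentheses, and string literals are out of scope.

```rust
use std::collections::HashMap;

/// Hypothetical evaluator for a single numeric `field operator value`
/// condition. Returns None if the condition cannot be parsed or resolved.
fn eval_condition(
    cond: &str,
    fields: &HashMap<&str, f64>,
    params: &[String],
) -> Option<bool> {
    let mut parts = cond.split_whitespace();
    let field = parts.next()?;
    let op = parts.next()?;
    let raw = parts.next()?;
    if parts.next().is_some() {
        return None; // compound expressions are not handled in this sketch
    }

    let lhs = *fields.get(field)?;
    // `%N` refers to the N-th expression parameter; anything else is a literal.
    let rhs: f64 = match raw.strip_prefix('%') {
        Some(index) => {
            let index = index.parse::<usize>().ok()?;
            params.get(index)?.parse::<f64>().ok()?
        }
        None => raw.parse::<f64>().ok()?,
    };

    Some(match op {
        ">" => lhs > rhs,
        "<" => lhs < rhs,
        ">=" => lhs >= rhs,
        "<=" => lhs <= rhs,
        "=" => lhs == rhs,
        "<>" | "!=" => lhs != rhs,
        _ => return None,
    })
}
```

Because the parameter is looked up at evaluation time, changing the parameter list changes the outcome without touching the expression string.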
Complex Filters:
// Multiple conditions with AND
let filtered = participant.create_content_filtered_topic::<SensorData>(
"hot_dry",
"sensors",
"temperature > %0 AND humidity < %1",
vec!["25.0".to_string(), "80".to_string()],
)?;
// OR and parentheses
let filtered = participant.create_content_filtered_topic::<Alert>(
"critical",
"alerts",
"(severity = 'CRITICAL' OR severity = 'ERROR') AND timestamp > %0",
vec!["1704067200".to_string()],
)?;
// NOT operator
let filtered = participant.create_content_filtered_topic::<Event>(
"non_debug",
"events",
"NOT level = 'DEBUG'",
vec![],
)?;
// String matching with LIKE
let filtered = participant.create_content_filtered_topic::<Log>(
"error_logs",
"logs",
"message LIKE '%error%'",
vec![],
)?;
Runtime Parameter Updates:
let mut filtered = participant.create_content_filtered_topic::<Temperature>(
"threshold_temp",
"sensors/temperature",
"value > %0",
vec!["25.0".to_string()],
)?;
// Change threshold at runtime
filtered.set_expression_parameters(vec!["30.0".to_string()]);
API Summary:
| Method | Description |
|---|---|
| create_content_filtered_topic() | Create filtered topic on Participant |
| name() | Get filtered topic name |
| related_topic_name() | Get source topic name |
| filter_expression() | Get filter expression string |
| expression_parameters() | Get current parameters |
| set_expression_parameters() | Update parameters at runtime |
| reader() | Create reader builder with filter attached |
Supported Operators:
| Operator | Description | Example |
|---|---|---|
| > | Greater than | value > 100 |
| < | Less than | value < 50 |
| >= | Greater or equal | value >= 0 |
| <= | Less or equal | value <= 255 |
| = | Equal | status = 'OK' |
| <> or != | Not equal | type <> 'DEBUG' |
| LIKE | Pattern match | name LIKE 'sensor%' |
| AND | Logical and | a > 0 AND b < 10 |
| OR | Logical or | x = 1 OR x = 2 |
| NOT | Logical not | NOT enabled |
ContentFilteredTopic implements DDS v1.4 Section 2.2.2.7.2. Filtering is performed receiver-side after deserialization.
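The LIKE examples above use `%` as a wildcard. As a rough illustration of how such a pattern can be matched, the sketch below assumes the usual SQL semantics: `%` matches any run of characters, including none, and `_` matches exactly one. `like_match` is a hypothetical function, and whether HDDS supports `_` or escape characters is not stated on this page.

```rust
/// Hypothetical matcher assuming standard SQL LIKE wildcards:
/// `%` matches any run of characters, `_` matches exactly one.
fn like_match(text: &str, pattern: &str) -> bool {
    let t: Vec<char> = text.chars().collect();
    let p: Vec<char> = pattern.chars().collect();
    let (mut ti, mut pi) = (0, 0);
    // Position of the last `%` seen and the text index it was tried at.
    let mut backtrack: Option<(usize, usize)> = None;

    while ti < t.len() {
        if pi < p.len() && (p[pi] == '_' || (p[pi] != '%' && p[pi] == t[ti])) {
            ti += 1;
            pi += 1;
        } else if pi < p.len() && p[pi] == '%' {
            backtrack = Some((pi, ti));
            pi += 1;
        } else if let Some((wild_pi, wild_ti)) = backtrack {
            // Let the last `%` absorb one more character and retry.
            pi = wild_pi + 1;
            ti = wild_ti + 1;
            backtrack = Some((wild_pi, wild_ti + 1));
        } else {
            return false;
        }
    }
    // Only trailing `%` may remain unmatched.
    p[pi..].iter().all(|&c| c == '%')
}
```

With these rules, `'%error%'` behaves as a substring test and `'sensor%'` as a prefix test.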
QoS Configuration
HDDS uses a single unified QoS type with a fluent builder API.
Basic QoS
use hdds::QoS;
// Predefined profiles
let qos = QoS::default(); // Best effort, volatile, keep last 1
let qos = QoS::best_effort(); // Explicit best effort
let qos = QoS::reliable(); // Reliable delivery
// Fluent configuration
let qos = QoS::reliable()
.keep_last(50)
.transient_local();
Reliability
use hdds::qos::Reliability;
// Best effort - fire and forget
let qos = QoS::default(); // Reliability::BestEffort
// Reliable - guaranteed delivery
let qos = QoS::reliable(); // Reliability::Reliable
History
use hdds::qos::History;
// Keep last N samples per instance
let qos = QoS::default().keep_last(10);
// Keep all samples (unbounded)
let qos = QoS::default().keep_all();
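The effect of `keep_last(N)` can be pictured as a bounded queue that evicts its oldest entry when full. The sketch below models one such queue using only the standard library. `KeepLast` is a hypothetical type, not part of HDDS; for a keyed topic there would be one such queue per instance.

```rust
use std::collections::VecDeque;

/// Hypothetical single-instance history cache with keep-last semantics.
struct KeepLast<T> {
    depth: usize,
    samples: VecDeque<T>,
}

impl<T> KeepLast<T> {
    fn new(depth: usize) -> Self {
        assert!(depth > 0, "history depth must be at least 1");
        Self {
            depth,
            samples: VecDeque::with_capacity(depth),
        }
    }

    /// Stores a sample, evicting the oldest one once `depth` is reached.
    /// Returns the evicted sample, if any.
    fn push(&mut self, sample: T) -> Option<T> {
        let evicted = if self.samples.len() == self.depth {
            self.samples.pop_front()
        } else {
            None
        };
        self.samples.push_back(sample);
        evicted
    }
}
```

A keep-all history corresponds to never evicting, which is why it is described above as unbounded.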
Durability
use hdds::qos::Durability;
// Volatile - no persistence (default)
let qos = QoS::default().volatile();
// Transient local - persist for late joiners
let qos = QoS::default().transient_local();
// Transient - persist in memory service
let qos = QoS::default().transient();
// Persistent - persist to disk
let qos = QoS::default().persistent();
Liveliness
use hdds::qos::{Liveliness, LivelinessKind};
use std::time::Duration;
let liveliness = Liveliness {
kind: LivelinessKind::Automatic,
lease_duration: Duration::from_secs(1),
};
// Or ManualByParticipant, ManualByTopic
Complete QoS Example
let writer_qos = QoS::reliable()
.keep_last(100)
.transient_local();
let reader_qos = QoS::reliable()
.keep_last(1000);
The DDS Trait
All topic types must implement the DDS trait for CDR2 serialization.
Using hdds_gen (Recommended)
The IDL code generator creates complete implementations:
// sensor_data.idl
module sensors {
struct SensorData {
@key unsigned long sensor_id;
float value;
unsigned long long timestamp;
};
};
// build.rs
use hdds_gen::Parser;
use hdds_gen::codegen::rust_backend::RustGenerator;
fn main() {
let idl = std::fs::read_to_string("sensor_data.idl").unwrap();
let mut parser = Parser::new(&idl);
let ast = parser.parse().unwrap();
let code = RustGenerator::new().generate(&ast).unwrap();
let out_dir = std::env::var("OUT_DIR").unwrap();
std::fs::write(format!("{}/sensor_data.rs", out_dir), code).unwrap();
println!("cargo:rerun-if-changed=sensor_data.idl");
}
// main.rs
include!(concat!(env!("OUT_DIR"), "/sensor_data.rs"));
use hdds::Participant;
use sensors::SensorData;
fn main() -> Result<(), hdds::Error> {
let participant = Participant::builder("app").domain_id(0).build()?;
let writer = participant.topic::<SensorData>("sensors/temp")?.writer().build()?;
writer.write(&SensorData {
sensor_id: 1,
value: 25.5,
timestamp: 0,
})?;
Ok(())
}
Using derive macro
use hdds::DDS;
#[derive(Debug, Clone, DDS)]
struct Temperature {
celsius: f32,
timestamp: u32,
}
Manual Implementation
use hdds::{DDS, Result};
use hdds::core::types::TypeDescriptor;
impl DDS for MyType {
fn type_descriptor() -> &'static TypeDescriptor {
// Return type metadata
&MY_TYPE_DESCRIPTOR
}
fn encode_cdr2(&self, buf: &mut [u8]) -> Result<usize> {
// Encode to CDR2 little-endian
// Return bytes written
todo!("encode each field into buf")
}
fn decode_cdr2(buf: &[u8]) -> Result<(Self, usize)> {
// Decode from CDR2 little-endian
// Returns (decoded_value, bytes_consumed)
todo!("decode each field from buf")
}
fn compute_key(&self) -> [u8; 16] {
// MD5 hash of @key fields (or zeros if no keys)
[0u8; 16]
}
fn has_key() -> bool {
false
}
}
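As a rough idea of what the bodies of `encode_cdr2` and `decode_cdr2` do, the sketch below writes and reads the three `SensorData` fields in declaration order as little-endian bytes. It is a simplified illustration, not the HDDS encoder: `encode` and `decode` are hypothetical free functions, `String` stands in for the error type, and alignment padding, strings, sequences, and any encapsulation header are not covered. This particular field layout (4 + 4 + 8 bytes) needs no padding.

```rust
// Local stand-in for the struct used throughout this page.
#[derive(Debug, Clone, PartialEq)]
struct SensorData {
    sensor_id: u32,
    value: f32,
    timestamp: u64,
}

const ENCODED_LEN: usize = 16; // 4 + 4 + 8 bytes

/// Writes the fields in declaration order, little-endian.
/// Returns the number of bytes written.
fn encode(sample: &SensorData, buf: &mut [u8]) -> Result<usize, String> {
    if buf.len() < ENCODED_LEN {
        return Err(format!("buffer too small: {} < {}", buf.len(), ENCODED_LEN));
    }
    buf[0..4].copy_from_slice(&sample.sensor_id.to_le_bytes());
    buf[4..8].copy_from_slice(&sample.value.to_le_bytes());
    buf[8..16].copy_from_slice(&sample.timestamp.to_le_bytes());
    Ok(ENCODED_LEN)
}

/// Reads the fields back. Returns (decoded_value, bytes_consumed).
fn decode(buf: &[u8]) -> Result<(SensorData, usize), String> {
    if buf.len() < ENCODED_LEN {
        return Err(format!("buffer too short: {} < {}", buf.len(), ENCODED_LEN));
    }
    let mut id = [0u8; 4];
    let mut value = [0u8; 4];
    let mut timestamp = [0u8; 8];
    id.copy_from_slice(&buf[0..4]);
    value.copy_from_slice(&buf[4..8]);
    timestamp.copy_from_slice(&buf[8..16]);
    let sample = SensorData {
        sensor_id: u32::from_le_bytes(id),
        value: f32::from_le_bytes(value),
        timestamp: u64::from_le_bytes(timestamp),
    };
    Ok((sample, ENCODED_LEN))
}
```

For real types, the generated code from hdds_gen or the derive macro is the safer route, since it handles the cases this sketch leaves out.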
WaitSet
Event-based waiting for data availability.
use hdds::{WaitSet, StatusMask};
use std::time::Duration;
let reader = participant.topic::<SensorData>("sensors/temp")?
.reader()
.build()?;
// Get status condition
let condition = reader.get_status_condition();
// Create waitset and attach condition
let mut waitset = WaitSet::new();
waitset.attach(&condition)?;
// Wait for data
loop {
let active = waitset.wait(Some(Duration::from_secs(1)))?;
for cond in active {
if cond == condition {
while let Some(sample) = reader.try_take()? {
process(&sample);
}
}
}
}
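Compared with the polling loop shown for DataReader, event-based waiting lets the thread sleep until something signals it or the timeout expires. The sketch below shows that mechanism with standard-library primitives only. It is an analogy, not how HDDS implements `WaitSet`: `DataCondition` and its methods are hypothetical.

```rust
use std::sync::{Condvar, Mutex};
use std::time::Duration;

/// Hypothetical data-available condition built on std primitives.
struct DataCondition {
    triggered: Mutex<bool>,
    signal: Condvar,
}

impl DataCondition {
    fn new() -> Self {
        Self {
            triggered: Mutex::new(false),
            signal: Condvar::new(),
        }
    }

    /// Called by the producer side when new data arrives.
    fn trigger(&self) {
        *self.triggered.lock().unwrap() = true;
        self.signal.notify_all();
    }

    /// Blocks until triggered or until `timeout` elapses.
    /// Returns true if the condition fired, and resets it.
    fn wait(&self, timeout: Duration) -> bool {
        let guard = self.triggered.lock().unwrap();
        let (mut guard, _timed_out) = self
            .signal
            .wait_timeout_while(guard, timeout, |fired| !*fired)
            .unwrap();
        let fired = *guard;
        *guard = false;
        fired
    }
}
```

A waiter that returns `false` simply timed out and can loop again, which matches the shape of the `waitset.wait(Some(...))` loop above.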
Listeners
Listeners provide callback-based event handling for data and status changes. Listener implementations must be thread-safe (Send + Sync), and their callbacks must not block.
Callbacks are invoked from background threads. Implementations MUST:
- Return quickly: never block with sleep(), I/O, or locks that could deadlock
- Never panic: a panic in a callback will crash the receive thread
- Be Send + Sync: required for thread-safe sharing
// ❌ BAD - Blocks the receive thread
fn on_data_available(&self, sample: &T) {
std::thread::sleep(Duration::from_secs(1)); // DON'T DO THIS
std::fs::write("log.txt", format!("{:?}", sample)); // Blocking I/O
}
// ✅ GOOD - Non-blocking, fast
fn on_data_available(&self, sample: &T) {
self.counter.fetch_add(1, Ordering::Relaxed);
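// try_send never blocks; if the queue is full the sample is dropped (requires T: Clone)
let _ = self.channel.try_send(sample.clone());
// Samples are processed on another thread that drains the channel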
}
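The non-blocking pattern above can be tried out in isolation with a bounded standard-library channel. In this sketch `Handoff` is a hypothetical type that is not tied to the HDDS listener traits; its `on_data_available` only mirrors the callback's shape. The point is that `try_send` returns immediately, so a slow consumer costs dropped samples rather than a stalled receive thread.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

/// Hypothetical listener body: counts samples and forwards clones to a
/// bounded channel without ever blocking the calling thread.
struct Handoff<T> {
    received: AtomicU64,
    dropped: AtomicU64,
    queue: SyncSender<T>,
}

impl<T: Clone> Handoff<T> {
    /// Creates the handoff plus the receiving end for a worker thread.
    fn new(capacity: usize) -> (Self, Receiver<T>) {
        let (queue, rx) = sync_channel(capacity);
        let handoff = Self {
            received: AtomicU64::new(0),
            dropped: AtomicU64::new(0),
            queue,
        };
        (handoff, rx)
    }

    /// Same shape as `on_data_available(&self, sample: &T)`.
    fn on_data_available(&self, sample: &T) {
        self.received.fetch_add(1, Ordering::Relaxed);
        // try_send returns immediately; a full or closed queue drops the sample.
        if self.queue.try_send(sample.clone()).is_err() {
            self.dropped.fetch_add(1, Ordering::Relaxed);
        }
    }
}
```

Tracking a `dropped` counter is a design choice of this sketch: it makes overload visible without adding any blocking to the callback.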
DataReaderListener
Trait for handling reader events:
use hdds::{DataReaderListener, SubscriptionMatchedStatus, LivelinessChangedStatus};
use hdds::{SampleLostStatus, SampleRejectedStatus};
use hdds::{RequestedDeadlineMissedStatus, RequestedIncompatibleQosStatus};
pub trait DataReaderListener<T>: Send + Sync {
/// Called when new data is available
fn on_data_available(&self, sample: &T);
/// Called when a matching writer is discovered or lost
fn on_subscription_matched(&self, status: SubscriptionMatchedStatus) {}
/// Called when a matched writer's liveliness changes
fn on_liveliness_changed(&self, status: LivelinessChangedStatus) {}
/// Called when samples are lost (e.g., overflow)
fn on_sample_lost(&self, status: SampleLostStatus) {}
/// Called when samples are rejected (e.g., resource limits)
fn on_sample_rejected(&self, status: SampleRejectedStatus) {}
/// Called when the requested deadline is missed
fn on_requested_deadline_missed(&self, status: RequestedDeadlineMissedStatus) {}
/// Called when QoS is incompatible with a writer
fn on_requested_incompatible_qos(&self, status: RequestedIncompatibleQosStatus) {}
}
DataWriterListener
Trait for handling writer events:
use hdds::DataWriterListener;
pub trait DataWriterListener<T>: Send + Sync {
/// Called after a sample is successfully written
fn on_sample_written(&self, sample: &T, sequence_number: u64);
/// Called when a matching reader is discovered or lost
fn on_publication_matched(&self, status: PublicationMatchedStatus) {}
/// Called when the offered deadline is missed
fn on_offered_deadline_missed(&self, instance_handle: Option<u64>) {}
/// Called when QoS is incompatible with a reader
fn on_offered_incompatible_qos(&self, policy_id: u32, policy_name: &str) {}
/// Called when liveliness is lost
fn on_liveliness_lost(&self) {}
}
ClosureListener
Simple closure-based listener for common use cases:
use hdds::{Participant, QoS, ClosureListener};
use std::sync::Arc;
// Create a closure-based listener
let listener = ClosureListener::new(|sample: &Temperature| {
println!("Received temperature: {:?}", sample);
});
// Attach to reader
let reader = participant
.topic::<Temperature>("sensor/temp")?
.reader()
.qos(QoS::reliable())
.with_listener(Arc::new(listener))
.build()?;
Custom Listener Implementation
For full control, implement the trait directly:
use hdds::{DataReaderListener, SubscriptionMatchedStatus};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
struct MetricsListener {
received_count: AtomicU64,
}
impl MetricsListener {
fn new() -> Self {
Self { received_count: AtomicU64::new(0) }
}
fn count(&self) -> u64 {
self.received_count.load(Ordering::Relaxed)
}
}
impl DataReaderListener<Temperature> for MetricsListener {
fn on_data_available(&self, sample: &Temperature) {
self.received_count.fetch_add(1, Ordering::Relaxed);
println!("[{}] temp={:.1}°C", sample.sensor_id, sample.value);
}
fn on_subscription_matched(&self, status: SubscriptionMatchedStatus) {
println!("Matched writers: {}", status.current_count);
}
}
// Usage
let listener = Arc::new(MetricsListener::new());
let reader = participant
.topic::<Temperature>("sensor/temp")?
.reader()
.with_listener(listener.clone())
.build()?;
// Check metrics later
println!("Total received: {}", listener.count());
Writer Listener Example
use hdds::{DataWriterListener, PublicationMatchedStatus, QoS};
use std::sync::Arc;
struct WriteLogger;
impl DataWriterListener<Command> for WriteLogger {
fn on_sample_written(&self, sample: &Command, seq: u64) {
println!("Wrote command {} (seq={})", sample.id, seq);
}
fn on_publication_matched(&self, status: PublicationMatchedStatus) {
println!("Matched readers: {}", status.current_count);
}
}
let writer = participant
.topic::<Command>("robot/cmd")?
.writer()
.qos(QoS::reliable())
.with_listener(Arc::new(WriteLogger))
.build()?;
Status Types
| Status | Description |
|---|---|
| SubscriptionMatchedStatus | Matched writer count changes |
| PublicationMatchedStatus | Matched reader count changes |
| LivelinessChangedStatus | Writer liveliness state |
| SampleLostStatus | Samples lost due to overflow |
| SampleRejectedStatus | Samples rejected due to limits |
| RequestedDeadlineMissedStatus | Deadline QoS violation |
| RequestedIncompatibleQosStatus | QoS mismatch detected |
Error Handling
use hdds::Error;
match writer.write(&sample) {
Ok(()) => println!("Written"),
Err(Error::Timeout) => eprintln!("Write timed out"),
Err(Error::NotEnabled) => eprintln!("Writer not enabled"),
Err(e) => eprintln!("Error: {}", e),
}
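One common way to act on the `Timeout` arm is to retry a bounded number of times while returning every other error immediately. The sketch below shows that control flow on its own. It makes several assumptions: `Error` is a local stand-in with only the two variants named above, `retry_on_timeout` is a hypothetical helper, and whether retrying a timed-out write is appropriate depends on the application.

```rust
// Local stand-in for hdds::Error with only the variants shown above.
#[derive(Debug, PartialEq)]
enum Error {
    Timeout,
    NotEnabled,
}

/// Hypothetical helper: runs `op` up to `max_attempts` times, retrying only
/// on Error::Timeout. Any other result is returned immediately.
fn retry_on_timeout<T, F>(max_attempts: u32, mut op: F) -> Result<T, Error>
where
    F: FnMut() -> Result<T, Error>,
{
    let mut attempt = 1;
    loop {
        match op() {
            Err(Error::Timeout) if attempt < max_attempts => attempt += 1,
            other => return other,
        }
    }
}
```

In application code the closure would wrap the write call, for example `retry_on_timeout(3, || writer.write(&sample))`, assuming the error types line up.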
XTypes Support
Optional type introspection (feature-gated).
[dependencies]
hdds = { path = "../hdds/crates/hdds", features = ["xtypes"] }
#[cfg(feature = "xtypes")]
{
// Access type cache
let type_cache = participant.type_cache();
// Types implementing DDS can provide TypeObject
let type_obj = SensorData::get_type_object();
}
Complete Example
use hdds::{Participant, QoS, TransportMode, DDS};
use std::time::Duration;
#[derive(Debug, Clone, DDS)]
struct Temperature {
#[key]
sensor_id: u32,
celsius: f32,
timestamp: u64,
}
fn main() -> Result<(), hdds::Error> {
// Create participant with UDP transport
let participant = Participant::builder("temp_sensor")
.domain_id(0)
.with_transport(TransportMode::UdpMulticast)
.build()?;
// Create writer with reliable QoS
let writer = participant
.topic::<Temperature>("sensors/temperature")?
.writer()
.qos(QoS::reliable().keep_last(10).transient_local())
.build()?;
// Publish temperature readings
for i in 0..100 {
let sample = Temperature {
sensor_id: 1,
celsius: 20.0 + (i as f32 * 0.1),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos() as u64,
};
writer.write(&sample)?;
println!("Published: {:?}", sample);
std::thread::sleep(Duration::from_millis(100));
}
Ok(())
}
Not Yet Implemented (v1.0.0)
The following features are planned but not available in v1.0.0:
| Feature | Status |
|---|---|
| Async API (async/await) | Phase 7 - Planned |
| Instance lifecycle (dispose, unregister) | Not implemented |
| SampleInfo with metadata | Not implemented |
| wait_for_acknowledgments() | Not implemented |
The following features, documented above, are available in v1.0.0:
- Listeners API - DataReaderListener, DataWriterListener, ClosureListener for callback-based event handling
- Kubernetes Discovery - Zero-dep K8s DNS discovery with with_k8s_discovery() (feature: k8s)
- ContentFilteredTopic - SQL-like filtering with create_content_filtered_topic(), supports AND/OR/NOT, LIKE, runtime parameter updates
- Coherent Changes API - begin_coherent_changes(), end_coherent_changes(), begin_access(), end_access()
- Instance-based read/take - read_instance(), take_instance(), read_instance_batch(), take_instance_batch()
- InstanceHandle type - 16-byte key hash for keyed topics
- Non-destructive read - read_instance() methods (requires T: Clone)
Next Steps
- Hello World Rust - Complete tutorial
- QoS Policies - QoS configuration
- hdds_gen - IDL code generator
- Examples - More examples