
Rust API Reference

HDDS is written in Rust, providing a safe, fast, and zero-copy DDS implementation.

Version 1.0.0

This page documents the v1.0.0 stable API. Some features, such as the async API, are not yet implemented.

Installation

# Clone the repository
git clone https://git.hdds.io/hdds/hdds.git
cd hdds

Then add HDDS as a path dependency in your Cargo.toml:

[dependencies]
hdds = { path = "../hdds/crates/hdds" }

# Or, with XTypes support enabled
hdds = { path = "../hdds/crates/hdds", features = ["xtypes"] }

Core Types

Participant

The entry point to HDDS. Uses a builder pattern for configuration.

use hdds::Participant;

// Basic creation
let participant = Participant::builder("my_app")
.domain_id(0)
.build()?;

// With UDP multicast transport (for network communication)
use hdds::TransportMode;

let participant = Participant::builder("sensor_node")
.domain_id(42)
.with_transport(TransportMode::UdpMulticast)
.build()?;

// With custom discovery ports
let participant = Participant::builder("custom_app")
.domain_id(0)
.with_transport(TransportMode::UdpMulticast)
.with_discovery_ports(7400, 7401, 7410) // spdp, sedp, user
.build()?;

// With Kubernetes discovery (requires "k8s" feature)
#[cfg(feature = "k8s")]
let participant = Participant::builder("k8s_app")
.with_transport(TransportMode::UdpMulticast)
.with_k8s_discovery("hdds-discovery", "default")
.build()?;

// Properties
let domain_id = participant.domain_id();
let guid = participant.guid();
let name = participant.name();
let mode = participant.transport_mode();

TransportMode

pub enum TransportMode {
    IntraProcess, // Default - shared memory between participants in the same process
    UdpMulticast, // Network communication via UDP multicast
}

Topic

Topics are created from a participant and used to build readers/writers.

use hdds::{Participant, DDS};

// Create a topic (T must implement DDS trait)
let topic = participant.topic::<SensorData>("sensors/temperature")?;

// Topic is used to create readers and writers
let writer = topic.writer().build()?;
let reader = topic.reader().build()?;

DataWriter

Writers publish data to a topic.

use hdds::QoS;

// Create writer with default QoS
let writer = participant.topic::<SensorData>("sensors/temp")?
    .writer()
    .build()?;

// Create writer with custom QoS
let writer = participant.topic::<SensorData>("sensors/temp")?
    .writer()
    .qos(QoS::reliable().keep_last(10).transient_local())
    .build()?;

// Write data
let sample = SensorData {
    sensor_id: 1,
    value: 25.5,
    timestamp: 1234567890,
};
writer.write(&sample)?;

// Get writer stats
let stats = writer.stats();
println!("Sent: {} samples", stats.samples_sent);

DataReader

Readers receive data from a topic.

use hdds::QoS;

// Create reader with default QoS
let reader = participant.topic::<SensorData>("sensors/temp")?
    .reader()
    .build()?;

// Create reader with custom QoS
let reader = participant.topic::<SensorData>("sensors/temp")?
    .reader()
    .qos(QoS::reliable().keep_last(100))
    .build()?;

// Take single sample (removes from cache)
if let Some(sample) = reader.try_take()? {
    println!("Received: {:?}", sample);
}

// Take batch of samples
let samples = reader.take_batch(10)?;
for sample in samples {
    println!("Received: {:?}", sample);
}

// Polling loop
loop {
    while let Some(sample) = reader.try_take()? {
        process(&sample);
    }
    std::thread::sleep(std::time::Duration::from_millis(10));
}

InstanceHandle

For keyed topics, an InstanceHandle identifies a specific instance via its 16-byte key hash.

use hdds::InstanceHandle;

// Create from key hash
let handle = InstanceHandle::new([0u8; 16]);

// Nil handle (for keyless topics)
let nil_handle = InstanceHandle::nil();
assert!(nil_handle.is_nil());

// Get raw bytes
let bytes: &[u8; 16] = handle.as_bytes();
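
For a keyed sample, one way to obtain its handle is to hash the key fields with the DDS trait's compute_key() (see Manual Implementation below); this assumes sample is a value of a keyed topic type implementing DDS:

// Build a handle from a sample's 16-byte key hash (compute_key is part of the DDS trait)
let handle = InstanceHandle::new(sample.compute_key());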

Instance-Based Read/Take

For keyed topics, you can read/take samples for a specific instance.

use hdds::{DDS, QoS, InstanceHandle};

// Keyed topic type
#[derive(Debug, Clone, DDS)]
struct SensorData {
    #[key]
    sensor_id: u32,
    value: f32,
    timestamp: u64,
}

let reader = participant.topic::<SensorData>("sensors/temp")?
    .reader()
    .qos(QoS::reliable().keep_last(100))
    .build()?;

// Get instance handle from a sample's key
let handle: InstanceHandle = /* from sample key hash */;

// Take single sample for specific instance (removes from cache)
if let Some(sample) = reader.take_instance(handle)? {
    println!("Instance {} value: {}", sample.sensor_id, sample.value);
}

// Take batch for specific instance
let samples = reader.take_instance_batch(handle, 10)?;
for sample in samples {
    println!("Received: {:?}", sample);
}

// Read single sample (non-destructive, requires T: Clone)
if let Some(sample) = reader.read_instance(handle)? {
    println!("Read: {:?}", sample);
}

// Read batch for specific instance
let samples = reader.read_instance_batch(handle, 10)?;

Instance Methods Summary:

| Method | Behavior | Removes from cache |
|---|---|---|
| take_instance(handle) | Takes first matching sample | Yes |
| take_instance_batch(handle, max) | Takes up to max matching samples | Yes |
| read_instance(handle) | Reads first unread matching sample (T: Clone) | No |
| read_instance_batch(handle, max) | Reads up to max unread matching samples | No |

Performance

Instance methods use O(n) linear scan over the cache. For high-throughput scenarios with many instances, consider using take_batch() and filtering client-side.
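
A rough sketch of that client-side approach, assuming the keyed SensorData reader from above:

use std::collections::HashMap;

// Drain up to 256 samples in one call and bucket them by key on the client side.
let mut by_sensor: HashMap<u32, Vec<SensorData>> = HashMap::new();
for sample in reader.take_batch(256)? {
    by_sensor.entry(sample.sensor_id).or_default().push(sample);
}
if let Some(samples) = by_sensor.get(&1) {
    println!("sensor 1 produced {} samples", samples.len());
}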

Publisher / Subscriber

Optional grouping for writers/readers with shared QoS.

use hdds::QoS;

// Create publisher
let publisher = participant.create_publisher(QoS::default())?;
let writer = publisher.create_writer::<SensorData>("sensors/temp", QoS::reliable())?;

// Create subscriber
let subscriber = participant.create_subscriber(QoS::default())?;
let reader = subscriber.create_reader::<SensorData>("sensors/temp", QoS::reliable())?;

Coherent Changes API

Group multiple writes as an atomic unit. Readers see all changes or none.

Publisher Side:

let publisher = participant.create_publisher(QoS::default())?;
let writer_pos = publisher.create_writer::<Position>("position", QoS::reliable())?;
let writer_vel = publisher.create_writer::<Velocity>("velocity", QoS::reliable())?;

// Begin coherent set
publisher.begin_coherent_changes()?;

// All writes are batched
writer_pos.write(&Position { x: 10.0, y: 20.0 })?;
writer_vel.write(&Velocity { vx: 1.0, vy: 2.0 })?;

// Commit - readers now see both changes atomically
publisher.end_coherent_changes()?;

// Check state
assert!(!publisher.is_coherent());

Subscriber Side:

let subscriber = participant.create_subscriber(QoS::default())?;
let reader_pos = subscriber.create_reader::<Position>("position", QoS::reliable())?;
let reader_vel = subscriber.create_reader::<Velocity>("velocity", QoS::reliable())?;

// Lock view for consistent reading
subscriber.begin_access()?;

// Read coherent snapshot
let pos = reader_pos.take()?;
let vel = reader_vel.take()?;

// Unlock - new samples become visible
subscriber.end_access()?;

// Check state
assert!(!subscriber.is_access_locked());

API Summary:

| Entity | Method | Description |
|---|---|---|
| Publisher | begin_coherent_changes() | Start coherent set |
| Publisher | end_coherent_changes() | Commit coherent set |
| Publisher | is_coherent() | Check if in coherent set |
| Subscriber | begin_access() | Lock view for reading |
| Subscriber | end_access() | Unlock view |
| Subscriber | is_access_locked() | Check if access locked |

Error Handling:

// Nested calls return Error::InvalidState
publisher.begin_coherent_changes()?;
assert!(publisher.begin_coherent_changes().is_err()); // Already in coherent set

// End without begin returns Error::InvalidState
assert!(publisher.end_coherent_changes().is_err()); // Not in coherent set

DDS v1.4 Compliance

Coherent Changes implement DDS v1.4 Section 2.2.2.5.1.9 (Publisher) and Section 2.2.2.5.2.6 (Subscriber).

ContentFilteredTopic

Create a filtered view of a topic. Only samples matching the filter expression are delivered to readers.

Basic Usage:

use hdds::{Participant, DDS};

#[derive(Debug, Clone, DDS)]
struct Temperature {
    sensor_id: u32,
    value: f64,
}

let participant = Participant::builder("app").build()?;

// Create filtered topic: only temperatures > 25.0
let filtered = participant.create_content_filtered_topic::<Temperature>(
    "high_temp",              // filtered topic name
    "sensors/temperature",    // source topic name
    "value > %0",             // SQL-like filter expression
    vec!["25.0".to_string()], // expression parameters
)?;

// Create reader from filtered topic
let reader = filtered.reader().build()?;

// Only receives samples where value > 25.0
while let Some(sample) = reader.take()? {
    println!("High temp: {}", sample.value);
}

Filter Expression Syntax:

expression ::= condition
             | expression AND expression
             | expression OR expression
             | NOT expression
             | '(' expression ')'

condition  ::= field_name operator value
operator   ::= '>' | '<' | '>=' | '<=' | '=' | '<>' | '!='
value      ::= parameter | literal
parameter  ::= '%' digit+
literal    ::= integer | float | 'string'

Complex Filters:

// Multiple conditions with AND
let filtered = participant.create_content_filtered_topic::<SensorData>(
    "hot_dry",
    "sensors",
    "temperature > %0 AND humidity < %1",
    vec!["25.0".to_string(), "80".to_string()],
)?;

// OR and parentheses
let filtered = participant.create_content_filtered_topic::<Alert>(
    "critical",
    "alerts",
    "(severity = 'CRITICAL' OR severity = 'ERROR') AND timestamp > %0",
    vec!["1704067200".to_string()],
)?;

// NOT operator
let filtered = participant.create_content_filtered_topic::<Event>(
    "non_debug",
    "events",
    "NOT level = 'DEBUG'",
    vec![],
)?;

// String matching with LIKE
let filtered = participant.create_content_filtered_topic::<Log>(
    "error_logs",
    "logs",
    "message LIKE '%error%'",
    vec![],
)?;

Runtime Parameter Updates:

let mut filtered = participant.create_content_filtered_topic::<Temperature>(
    "threshold_temp",
    "sensors/temperature",
    "value > %0",
    vec!["25.0".to_string()],
)?;

// Change threshold at runtime
filtered.set_expression_parameters(vec!["30.0".to_string()]);

API Summary:

| Method | Description |
|---|---|
| create_content_filtered_topic() | Create filtered topic on Participant |
| name() | Get filtered topic name |
| related_topic_name() | Get source topic name |
| filter_expression() | Get filter expression string |
| expression_parameters() | Get current parameters |
| set_expression_parameters() | Update parameters at runtime |
| reader() | Create reader builder with filter attached |

Supported Operators:

| Operator | Description | Example |
|---|---|---|
| > | Greater than | value > 100 |
| < | Less than | value < 50 |
| >= | Greater or equal | value >= 0 |
| <= | Less or equal | value <= 255 |
| = | Equal | status = 'OK' |
| <> or != | Not equal | type <> 'DEBUG' |
| LIKE | Pattern match | name LIKE 'sensor%' |
| AND | Logical and | a > 0 AND b < 10 |
| OR | Logical or | x = 1 OR x = 2 |
| NOT | Logical not | NOT enabled |

DDS v1.4 Compliance

ContentFilteredTopic implements DDS v1.4 Section 2.2.2.7.2. Filtering is performed receiver-side after deserialization.

QoS Configuration

HDDS uses a single unified QoS type with a fluent builder API.

Basic QoS

use hdds::QoS;

// Predefined profiles
let qos = QoS::default(); // Best effort, volatile, keep last 1
let qos = QoS::best_effort(); // Explicit best effort
let qos = QoS::reliable(); // Reliable delivery

// Fluent configuration
let qos = QoS::reliable()
    .keep_last(50)
    .transient_local();

Reliability

use hdds::qos::Reliability;

// Best effort - fire and forget
let qos = QoS::default(); // Reliability::BestEffort

// Reliable - guaranteed delivery
let qos = QoS::reliable(); // Reliability::Reliable

History

use hdds::qos::History;

// Keep last N samples per instance
let qos = QoS::default().keep_last(10);

// Keep all samples (unbounded)
let qos = QoS::default().keep_all();

Durability

use hdds::qos::Durability;

// Volatile - no persistence (default)
let qos = QoS::default().volatile();

// Transient local - persist for late joiners
let qos = QoS::default().transient_local();

// Transient - persist in memory service
let qos = QoS::default().transient();

// Persistent - persist to disk
let qos = QoS::default().persistent();

Liveliness

use hdds::qos::{Liveliness, LivelinessKind};
use std::time::Duration;

let liveliness = Liveliness {
    kind: LivelinessKind::Automatic,
    lease_duration: Duration::from_secs(1),
};

// Or ManualByParticipant, ManualByTopic
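
With the manual kinds, the application (rather than the middleware) is responsible for asserting liveliness within the lease duration; a shorter lease surfaces a silent writer sooner. An illustrative sketch using the same types:

// Manual-by-topic liveliness with a 500 ms lease: the application must assert
// liveliness for each writer within this window or matched readers will
// observe the writer as not alive.
let manual = Liveliness {
    kind: LivelinessKind::ManualByTopic,
    lease_duration: Duration::from_millis(500),
};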

Complete QoS Example

let writer_qos = QoS::reliable()
    .keep_last(100)
    .transient_local();

let reader_qos = QoS::reliable()
    .keep_last(1000);

The DDS Trait

All topic types must implement the DDS trait for CDR2 serialization.

The IDL code generator creates complete implementations:

// sensor_data.idl
module sensors {
    struct SensorData {
        @key unsigned long sensor_id;
        float value;
        unsigned long long timestamp;
    };
};

// build.rs
use hdds_gen::Parser;
use hdds_gen::codegen::rust_backend::RustGenerator;

fn main() {
    let idl = std::fs::read_to_string("sensor_data.idl").unwrap();
    let mut parser = Parser::new(&idl);
    let ast = parser.parse().unwrap();

    let code = RustGenerator::new().generate(&ast).unwrap();

    let out_dir = std::env::var("OUT_DIR").unwrap();
    std::fs::write(format!("{}/sensor_data.rs", out_dir), code).unwrap();

    println!("cargo:rerun-if-changed=sensor_data.idl");
}

// main.rs
include!(concat!(env!("OUT_DIR"), "/sensor_data.rs"));

use hdds::Participant;
use sensors::SensorData;

fn main() -> Result<(), hdds::Error> {
    let participant = Participant::builder("app").domain_id(0).build()?;
    let writer = participant.topic::<SensorData>("sensors/temp")?.writer().build()?;

    writer.write(&SensorData {
        sensor_id: 1,
        value: 25.5,
        timestamp: 0,
    })?;

    Ok(())
}

Using derive macro

use hdds::DDS;

#[derive(Debug, Clone, DDS)]
struct Temperature {
    celsius: f32,
    timestamp: u32,
}

Manual Implementation

use hdds::{DDS, Result};
use hdds::core::types::TypeDescriptor;

impl DDS for MyType {
    fn type_descriptor() -> &'static TypeDescriptor {
        // Return type metadata
        &MY_TYPE_DESCRIPTOR
    }

    fn encode_cdr2(&self, buf: &mut [u8]) -> Result<usize> {
        // Encode to CDR2 little-endian and return the number of bytes written
        todo!()
    }

    fn decode_cdr2(buf: &[u8]) -> Result<(Self, usize)> {
        // Decode from CDR2 little-endian and return (decoded_value, bytes_consumed)
        todo!()
    }

    fn compute_key(&self) -> [u8; 16] {
        // MD5 hash of @key fields (or zeros if no keys)
        [0u8; 16]
    }

    fn has_key() -> bool {
        false
    }
}
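
As a rough sketch of the key-hash convention (not the generated code), the hash for a single u32 @key field could be computed with the external md5 crate over the field's big-endian bytes:

// Hypothetical helper: MD5 over the big-endian bytes of a single u32 @key field.
// Assumes the third-party `md5` crate; the generated code may serialize keys differently.
fn key_hash_for_sensor_id(sensor_id: u32) -> [u8; 16] {
    md5::compute(sensor_id.to_be_bytes()).0
}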

WaitSet

Event-based waiting for data availability.

use hdds::{WaitSet, StatusMask};
use std::time::Duration;

let reader = participant.topic::<SensorData>("sensors/temp")?
    .reader()
    .build()?;

// Get status condition
let condition = reader.get_status_condition();

// Create waitset and attach condition
let mut waitset = WaitSet::new();
waitset.attach(&condition)?;

// Wait for data
loop {
    let active = waitset.wait(Some(Duration::from_secs(1)))?;

    for cond in active {
        if cond == condition {
            while let Some(sample) = reader.try_take()? {
                process(&sample);
            }
        }
    }
}

Listeners

Listeners provide callback-based event handling for data and status changes. All callbacks must be thread-safe (Send + Sync) and non-blocking.

Thread Safety Requirements

Callbacks are invoked from background threads. Implementations MUST:

  • Return quickly — Never block with sleep(), I/O, or locks that could deadlock
  • Never panic — A panic in a callback will crash the receive thread
  • Be Send + Sync — Required for thread-safe sharing

// ❌ BAD - Blocks the receive thread
fn on_data_available(&self, sample: &T) {
    std::thread::sleep(Duration::from_secs(1)); // DON'T DO THIS
    std::fs::write("log.txt", format!("{:?}", sample)); // Blocking I/O
}

// ✅ GOOD - Non-blocking, fast
fn on_data_available(&self, sample: &T) {
    self.counter.fetch_add(1, Ordering::Relaxed);
    // Hand the sample off to another thread for processing; drop it if the channel is full
    let _ = self.channel.try_send(sample.clone());
}

DataReaderListener

Trait for handling reader events:

use hdds::{DataReaderListener, SubscriptionMatchedStatus, LivelinessChangedStatus};
use hdds::{SampleLostStatus, SampleRejectedStatus};
use hdds::{RequestedDeadlineMissedStatus, RequestedIncompatibleQosStatus};

pub trait DataReaderListener<T>: Send + Sync {
    /// Called when new data is available
    fn on_data_available(&self, sample: &T);

    /// Called when a matching writer is discovered or lost
    fn on_subscription_matched(&self, status: SubscriptionMatchedStatus) {}

    /// Called when a matched writer's liveliness changes
    fn on_liveliness_changed(&self, status: LivelinessChangedStatus) {}

    /// Called when samples are lost (e.g., overflow)
    fn on_sample_lost(&self, status: SampleLostStatus) {}

    /// Called when samples are rejected (e.g., resource limits)
    fn on_sample_rejected(&self, status: SampleRejectedStatus) {}

    /// Called when the requested deadline is missed
    fn on_requested_deadline_missed(&self, status: RequestedDeadlineMissedStatus) {}

    /// Called when QoS is incompatible with a writer
    fn on_requested_incompatible_qos(&self, status: RequestedIncompatibleQosStatus) {}
}

DataWriterListener

Trait for handling writer events:

use hdds::DataWriterListener;

pub trait DataWriterListener<T>: Send + Sync {
    /// Called after a sample is successfully written
    fn on_sample_written(&self, sample: &T, sequence_number: u64);

    /// Called when a matching reader is discovered or lost
    fn on_publication_matched(&self, status: PublicationMatchedStatus) {}

    /// Called when the offered deadline is missed
    fn on_offered_deadline_missed(&self, instance_handle: Option<u64>) {}

    /// Called when QoS is incompatible with a reader
    fn on_offered_incompatible_qos(&self, policy_id: u32, policy_name: &str) {}

    /// Called when liveliness is lost
    fn on_liveliness_lost(&self) {}
}

ClosureListener

Simple closure-based listener for common use cases:

use hdds::{Participant, QoS, ClosureListener};
use std::sync::Arc;

// Create a closure-based listener
let listener = ClosureListener::new(|sample: &Temperature| {
    println!("Received temperature: {:?}", sample);
});

// Attach to reader
let reader = participant
    .topic::<Temperature>("sensor/temp")?
    .reader()
    .qos(QoS::reliable())
    .with_listener(Arc::new(listener))
    .build()?;

Custom Listener Implementation

For full control, implement the trait directly:

use hdds::{DataReaderListener, SubscriptionMatchedStatus};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

struct MetricsListener {
    received_count: AtomicU64,
}

impl MetricsListener {
    fn new() -> Self {
        Self { received_count: AtomicU64::new(0) }
    }

    fn count(&self) -> u64 {
        self.received_count.load(Ordering::Relaxed)
    }
}

impl DataReaderListener<Temperature> for MetricsListener {
    fn on_data_available(&self, sample: &Temperature) {
        self.received_count.fetch_add(1, Ordering::Relaxed);
        println!("[{}] temp={:.1}°C", sample.sensor_id, sample.value);
    }

    fn on_subscription_matched(&self, status: SubscriptionMatchedStatus) {
        println!("Matched writers: {}", status.current_count);
    }
}

// Usage
let listener = Arc::new(MetricsListener::new());
let reader = participant
    .topic::<Temperature>("sensor/temp")?
    .reader()
    .with_listener(listener.clone())
    .build()?;

// Check metrics later
println!("Total received: {}", listener.count());

Writer Listener Example

use hdds::{DataWriterListener, PublicationMatchedStatus, QoS};
use std::sync::Arc;

struct WriteLogger;

impl DataWriterListener<Command> for WriteLogger {
    fn on_sample_written(&self, sample: &Command, seq: u64) {
        println!("Wrote command {} (seq={})", sample.id, seq);
    }

    fn on_publication_matched(&self, status: PublicationMatchedStatus) {
        println!("Matched readers: {}", status.current_count);
    }
}

let writer = participant
    .topic::<Command>("robot/cmd")?
    .writer()
    .qos(QoS::reliable())
    .with_listener(Arc::new(WriteLogger))
    .build()?;

Status Types

| Status | Description |
|---|---|
| SubscriptionMatchedStatus | Matched writer count changes |
| PublicationMatchedStatus | Matched reader count changes |
| LivelinessChangedStatus | Writer liveliness state |
| SampleLostStatus | Samples lost due to overflow |
| SampleRejectedStatus | Samples rejected due to limits |
| RequestedDeadlineMissedStatus | Deadline QoS violation |
| RequestedIncompatibleQosStatus | QoS mismatch detected |

Error Handling

use hdds::Error;

match writer.write(&sample) {
    Ok(()) => println!("Written"),
    Err(Error::Timeout) => eprintln!("Write timed out"),
    Err(Error::NotEnabled) => eprintln!("Writer not enabled"),
    Err(e) => eprintln!("Error: {}", e),
}

XTypes Support

Optional type introspection (feature-gated).

[dependencies]
hdds = { path = "../hdds/crates/hdds", features = ["xtypes"] }

#[cfg(feature = "xtypes")]
{
    // Access type cache
    let type_cache = participant.type_cache();

    // Types implementing DDS can provide TypeObject
    let type_obj = SensorData::get_type_object();
}

Complete Example

use hdds::{Participant, QoS, TransportMode, DDS};
use std::time::Duration;

#[derive(Debug, Clone, DDS)]
struct Temperature {
    #[key]
    sensor_id: u32,
    celsius: f32,
    timestamp: u64,
}

fn main() -> Result<(), hdds::Error> {
    // Create participant with UDP transport
    let participant = Participant::builder("temp_sensor")
        .domain_id(0)
        .with_transport(TransportMode::UdpMulticast)
        .build()?;

    // Create writer with reliable QoS
    let writer = participant
        .topic::<Temperature>("sensors/temperature")?
        .writer()
        .qos(QoS::reliable().keep_last(10).transient_local())
        .build()?;

    // Publish temperature readings
    for i in 0..100 {
        let sample = Temperature {
            sensor_id: 1,
            celsius: 20.0 + (i as f32 * 0.1),
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos() as u64,
        };

        writer.write(&sample)?;
        println!("Published: {:?}", sample);

        std::thread::sleep(Duration::from_millis(100));
    }

    Ok(())
}

Not Yet Implemented (v1.0.0)

The following features are planned but not available in v1.0.0:

| Feature | Status |
|---|---|
| Async API (async/await) | Phase 7 - Planned |
| Instance lifecycle (dispose, unregister) | Not implemented |
| SampleInfo with metadata | Not implemented |
| wait_for_acknowledgments() | Not implemented |

What's New in v1.0.0
  • Listeners API - DataReaderListener, DataWriterListener, ClosureListener for callback-based event handling
  • Kubernetes Discovery - Zero-dep K8s DNS discovery with with_k8s_discovery() (feature: k8s)
  • ContentFilteredTopic - SQL-like filtering with create_content_filtered_topic(), supports AND/OR/NOT, LIKE, runtime parameter updates
  • Coherent Changes API - begin_coherent_changes(), end_coherent_changes(), begin_access(), end_access()
  • Instance-based read/take - read_instance(), take_instance(), read_instance_batch(), take_instance_batch()
  • InstanceHandle type - 16-byte key hash for keyed topics
  • Non-destructive read - read_instance() methods (requires T: Clone)

Next Steps