Skip to main content

Publishers and Subscribers

Publishers and Subscribers are container entities that group DataWriters and DataReaders respectively.

Overview

DomainParticipant
├── Publisher
│   ├── DataWriter<SensorData> (writes to "SensorTopic")
│   └── DataWriter<Command> (writes to "CommandTopic")
│
└── Subscriber
    ├── DataReader<SensorData> (reads from "SensorTopic")
    └── DataReader<Status> (reads from "StatusTopic")

Publisher

A Publisher manages a group of DataWriters with common configuration.

Creating a Publisher

use hdds::prelude::*;

// Every entity below is created through this participant
let participant = DomainParticipant::new(0)?;

// Publisher with the default QoS
let publisher = participant.create_publisher()?;

// Publisher with an explicit QoS: a list of partition names
let partition = Partition::new(vec!["sensors", "zone_a"]);
let pub_qos = PublisherQos::default().partition(partition);
let publisher = participant.create_publisher_with_qos(pub_qos)?;

Creating DataWriters

let topic = participant.create_topic::<SensorData>("SensorTopic")?;

// Create a writer (inherits publisher QoS)
let writer = publisher.create_writer(&topic)?;

// Create a writer with its own QoS: reliable, with a one-second max blocking time
let reliability = Reliability::Reliable {
    max_blocking_time: Duration::from_secs(1),
};
let writer_qos = DataWriterQos::default().reliability(reliability);
let writer = publisher.create_writer_with_qos(&topic, writer_qos)?;

Writing Data

// Build a sample and publish it
let sample = SensorData {
    sensor_id: 1,
    value: 42.5,
    timestamp: now(),
};
writer.write(&sample)?;

// Same sample, with the timestamp passed explicitly
writer.write_w_timestamp(&sample, now())?;

// Non-blocking write: `WouldBlock` is handled separately from other errors
let attempt = writer.try_write(&sample);
match attempt {
    Err(HddsError::WouldBlock) => println!("Buffer full"),
    Err(other) => return Err(other.into()),
    Ok(()) => println!("Written"),
}

Publisher QoS

// Partition: limits which subscribers this publisher communicates with
let partition = Partition::new(vec!["sensors"]);

let pub_qos = PublisherQos::default()
    .partition(partition)
    // Presentation: group-coherent updates
    .presentation(Presentation::group_coherent())
    // Group data: metadata bytes carried in the publisher QoS
    .group_data(b"sensor_publisher_v1".to_vec());

Subscriber

A Subscriber manages a group of DataReaders with common configuration.

Creating a Subscriber

// Subscriber with the default QoS
let subscriber = participant.create_subscriber()?;

// Subscriber with an explicit QoS; "zone_*" is a wildcard partition name
let partition = Partition::new(vec!["sensors", "zone_*"]);
let sub_qos = SubscriberQos::default().partition(partition);
let subscriber = participant.create_subscriber_with_qos(sub_qos)?;

Creating DataReaders

let topic = participant.create_topic::<SensorData>("SensorTopic")?;

// Create a reader
let reader = subscriber.create_reader(&topic)?;

// Create a reader with its own QoS: reliable, keeping the last 100 samples
// NOTE(review): the DataWriter example on this page builds
// `Reliability::Reliable { max_blocking_time: .. }`, while this one uses the bare
// `Reliability::Reliable`; confirm which form the API actually accepts.
let history = History::KeepLast { depth: 100 };
let reader_qos = DataReaderQos::default()
    .reliability(Reliability::Reliable)
    .history(history);
let reader = subscriber.create_reader_with_qos(&topic, reader_qos)?;

Reading Data

// Take: returned samples are removed from the cache
match reader.take() {
    Err(HddsError::NoData) => {
        // No data available
    }
    Err(other) => return Err(other.into()),
    Ok(samples) => samples
        .into_iter()
        .for_each(|sample| println!("Received: {:?}", sample)),
}

// Read: returned samples are kept in the cache
let samples = reader.read()?;

// Non-blocking variant
match reader.try_take() {
    Ok(_samples) => { /* process */ }
    Err(HddsError::NoData) => { /* wait */ }
    Err(other) => return Err(other.into()),
}

Subscriber QoS

// Partition matching: "zone_*" is a wildcard partition name
let partition = Partition::new(vec!["sensors", "zone_*"]);

let sub_qos = SubscriberQos::default()
    .partition(partition)
    // Presentation: group access
    .presentation(Presentation::group_coherent())
    // Group data: metadata bytes carried in the subscriber QoS
    .group_data(b"sensor_subscriber_v1".to_vec());

DataWriter

DataWriters are the endpoints that publish data.

Write Operations

// Plain write
writer.write(&sample)?;

// Keyed data: register the instance, then write through its handle (faster)
let instance = writer.register_instance(&sample)?;
writer.write_w_handle(&sample, instance)?;

// Dispose: mark the instance as "no longer available"
writer.dispose(&sample)?;

// Unregister: this writer no longer manages the instance
writer.unregister_instance(&sample)?;

Writer Status

// Readers currently matched with this writer
let matched_readers = writer.matched_subscriptions();
println!("Matched {} readers", matched_readers.len());

// Liveliness-lost status: report only when at least one loss was counted
let liveliness = writer.get_liveliness_lost_status()?;
if liveliness.total_count > 0 {
    println!("Lost liveliness {} times", liveliness.total_count);
}

// Offered-deadline-missed status
let deadline = writer.get_offered_deadline_missed_status()?;
println!("Missed {} deadlines", deadline.total_count);

Writer Listener

/// Example listener that prints a line for each writer-side event it receives.
struct MyWriterListener;

impl DataWriterListener for MyWriterListener {
    /// Prints `status.current_count` as the number of matched readers.
    fn on_publication_matched(
        &mut self,
        _writer: &DataWriter<SensorData>,
        status: PublicationMatchedStatus,
    ) {
        println!("Matched {} readers", status.current_count);
    }

    /// Prints a fixed message; the writer and status arguments are not inspected.
    fn on_offered_deadline_missed(
        &mut self,
        _writer: &DataWriter<SensorData>,
        _status: OfferedDeadlineMissedStatus,
    ) {
        println!("Missed deadline!");
    }

    /// Prints a fixed message; the writer and status arguments are not inspected.
    fn on_liveliness_lost(
        &mut self,
        _writer: &DataWriter<SensorData>,
        _status: LivelinessLostStatus,
    ) {
        println!("Lost liveliness!");
    }
}

// Attach the listener when creating the writer
let writer = publisher.create_writer_with_listener(&topic, MyWriterListener)?;

DataReader

DataReaders are the endpoints that receive data.

Read Operations

// Take: samples are removed from the cache
let samples = reader.take()?;

// Read: samples are kept in the cache
let samples = reader.read()?;

// Take each sample together with its sample info
for (sample, info) in reader.take_with_info()? {
    println!("Sample: {:?}", sample);
    println!(" Source timestamp: {:?}", info.source_timestamp);
    println!(" Instance state: {:?}", info.instance_state);
}

// Take only the samples of one instance, looked up by key
let instance = reader.lookup_instance(&key)?;
let samples = reader.take_instance(instance)?;

Sample States

// Filter by state: not-yet-read samples, any view state, alive instances
let (sample_state, view_state, instance_state) = (
    SampleStateKind::NotRead,
    ViewStateKind::Any,
    InstanceStateKind::Alive,
);
let unread = reader.take_w_condition(sample_state, view_state, instance_state)?;
| State | Meaning |
|-------|---------|
| SampleState::Read | Already returned by read/take |
| SampleState::NotRead | Not yet returned |
| ViewState::New | First sample for this instance |
| ViewState::NotNew | Not the first sample |
| InstanceState::Alive | Writer is active |
| InstanceState::NotAliveDisposed | Writer disposed |
| InstanceState::NotAliveNoWriters | No writers for instance |

Reader Listener

/// Example listener that prints a line for each reader-side event it receives.
struct MyReaderListener;

impl DataReaderListener for MyReaderListener {
    /// Takes whatever samples are available and prints each one.
    fn on_data_available(&mut self, reader: &DataReader<SensorData>) {
        // An `Err` from take() is deliberately ignored in this example.
        if let Ok(samples) = reader.take() {
            for sample in samples {
                println!("Received: {:?}", sample);
            }
        }
    }

    /// Prints `status.current_count` as the number of matched writers.
    fn on_subscription_matched(
        &mut self,
        _reader: &DataReader<SensorData>,
        status: SubscriptionMatchedStatus,
    ) {
        println!("Matched {} writers", status.current_count);
    }

    /// Prints a fixed message; the reader and status arguments are not inspected.
    fn on_requested_deadline_missed(
        &mut self,
        _reader: &DataReader<SensorData>,
        _status: RequestedDeadlineMissedStatus,
    ) {
        println!("Writer missed deadline!");
    }

    /// Prints `status.alive_count` as the number of alive writers.
    fn on_liveliness_changed(
        &mut self,
        _reader: &DataReader<SensorData>,
        status: LivelinessChangedStatus,
    ) {
        println!("Alive writers: {}", status.alive_count);
    }
}

Partitions

Partitions filter communication between publishers and subscribers:

// Publisher in the "sensors" partition
let sensors = Partition::new(vec!["sensors"]);
let pub_qos = PublisherQos::default().partition(sensors);
let publisher = participant.create_publisher_with_qos(pub_qos)?;

// Subscriber in the same "sensors" partition - MATCHES
let sub_qos = SubscriberQos::default().partition(Partition::new(vec!["sensors"]));
let subscriber = participant.create_subscriber_with_qos(sub_qos)?;

// Subscriber QoS for the "commands" partition - NO MATCH
let sub_qos2 = SubscriberQos::default().partition(Partition::new(vec!["commands"]));

Partition Wildcards

// Wildcard partition: "sensor*" matches sensor_a, sensor_b, etc.
let wildcard = Partition::new(vec!["sensor*"]);
let sub_qos = SubscriberQos::default().partition(wildcard);

// Empty (default) partition: matches a publisher that also uses the empty partition
let sub_qos = SubscriberQos::default().partition(Partition::default());

WaitSets

Wait for data efficiently:

let waitset = WaitSet::new()?;

// Read condition: not-yet-read samples, any view state, alive instances
let condition = reader.create_read_condition(
    SampleStateKind::NotRead,
    ViewStateKind::Any,
    InstanceStateKind::Alive,
)?;

// `condition` is moved into the wait set here
waitset.attach(condition)?;

loop {
    // Wait for attached conditions to trigger (timeout argument: one second)
    let triggered = waitset.wait(Duration::from_secs(1))?;

    // Only `reader`'s condition is attached, so each triggered entry means data is
    // available on `reader`; the entry itself is not inspected.
    for _condition in triggered {
        for sample in reader.take()? {
            println!("Received: {:?}", sample);
        }
    }
}

Best Practices

  1. Group related writers/readers - Use one Publisher/Subscriber per logical group
  2. Use partitions - For topic-level isolation without separate domains
  3. Set appropriate QoS - Match reliability, durability between endpoints
  4. Use listeners for events - React to matched/unmatched, deadline misses
  5. Prefer take() over read() - Unless you need to keep samples in cache

Common Patterns

Request-Reply

// Request side: one writer for requests, one reader for replies
let request_writer = publisher.create_writer(&request_topic)?;
let reply_reader = subscriber.create_reader(&reply_topic)?;

request_writer.write(&request)?;
// NOTE(review): if take() does not block, the reply has probably not arrived yet at
// this point; a real client would presumably wait first (e.g. with a WaitSet or a
// listener) — confirm the intended usage.
let replies = reply_reader.take()?;

// Reply side: one reader for requests, one writer for replies
let request_reader = subscriber.create_reader(&request_topic)?;
let reply_writer = publisher.create_writer(&reply_topic)?;

let pending = request_reader.take()?;
for request in pending {
    let reply = process(request);
    reply_writer.write(&reply)?;
}

Async Read

// Using tokio: the reader is moved into a spawned task
let reader = subscriber.create_reader(&topic)?;

tokio::spawn(async move {
    loop {
        // On error, log it and go straight to the next take_async() call
        let samples = match reader.take_async().await {
            Ok(samples) => samples,
            Err(e) => {
                eprintln!("Error: {}", e);
                continue;
            }
        };

        for sample in samples {
            process(sample).await;
        }
    }
});

Next Steps