Publishers and Subscribers
Publishers and Subscribers are container entities that group DataWriters and DataReaders, respectively.
Overview
DomainParticipant
├── Publisher
│   ├── DataWriter<SensorData> (writes to "SensorTopic")
│   └── DataWriter<Command> (writes to "CommandTopic")
│
└── Subscriber
    ├── DataReader<SensorData> (reads from "SensorTopic")
    └── DataReader<Status> (reads from "StatusTopic")
Publisher
A Publisher manages a group of DataWriters with common configuration.
Creating a Publisher
use hdds::prelude::*;
use std::time::Duration; // used by the QoS and WaitSet examples on this page
let participant = DomainParticipant::new(0)?;
// Default publisher
let publisher = participant.create_publisher()?;
// With QoS
let pub_qos = PublisherQos::default()
    .partition(Partition::new(vec!["sensors", "zone_a"]));
let publisher = participant.create_publisher_with_qos(pub_qos)?;
Creating DataWriters
let topic = participant.create_topic::<SensorData>("SensorTopic")?;
// Create writer (inherits publisher QoS)
let writer = publisher.create_writer(&topic)?;
// With custom QoS
let writer_qos = DataWriterQos::default()
    .reliability(Reliability::Reliable {
        max_blocking_time: Duration::from_secs(1),
    });
let writer = publisher.create_writer_with_qos(&topic, writer_qos)?;
Writing Data
// Basic write
let sample = SensorData {
    sensor_id: 1,
    value: 42.5,
    timestamp: now(),
};
writer.write(&sample)?;
// Write with timestamp
writer.write_w_timestamp(&sample, now())?;
// Non-blocking write
match writer.try_write(&sample) {
    Ok(()) => println!("Written"),
    Err(HddsError::WouldBlock) => println!("Buffer full"),
    Err(e) => return Err(e.into()),
}
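The difference between a blocking write and a non-blocking one can be illustrated with the standard library's bounded channel. This is only an analogy, not HDDS internals: `WriteOutcome`, `try_write`, and `demo` are hypothetical names, and the channel stands in for a writer's send buffer. `try_send` returns immediately when the buffer is full, similar in spirit to the `WouldBlock` error above.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

/// Outcome of a non-blocking write attempt (hypothetical type for this sketch).
#[derive(Debug, PartialEq)]
enum WriteOutcome {
    Written,
    WouldBlock,
    Closed,
}

/// Try to enqueue a sample without blocking the caller.
fn try_write(tx: &SyncSender<f64>, sample: f64) -> WriteOutcome {
    match tx.try_send(sample) {
        Ok(()) => WriteOutcome::Written,
        // The bounded buffer is full: report it instead of waiting.
        Err(TrySendError::Full(_)) => WriteOutcome::WouldBlock,
        // The receiving side no longer exists.
        Err(TrySendError::Disconnected(_)) => WriteOutcome::Closed,
    }
}

/// Attempt `capacity + 1` writes into a buffer that nobody drains.
fn demo(capacity: usize) -> Vec<WriteOutcome> {
    // `_rx` is kept alive so the channel stays connected during the writes.
    let (tx, _rx) = sync_channel::<f64>(capacity);
    (0..=capacity).map(|i| try_write(&tx, i as f64)).collect()
}
```

With a capacity of 2, the first two attempts succeed and the third reports `WouldBlock`, which is the point at which an application would typically back off, drop the sample, or fall back to a blocking write.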
Publisher QoS
let pub_qos = PublisherQos::default()
    // Partition: only subscribers sharing a partition name can match
    .partition(Partition::new(vec!["sensors"]))
    // Presentation: deliver changes across the group as coherent updates
    .presentation(Presentation::group_coherent())
    // Group data: opaque application metadata attached to the publisher
    .group_data(b"sensor_publisher_v1".to_vec());
Subscriber
A Subscriber manages a group of DataReaders with common configuration.
Creating a Subscriber
// Default subscriber
let subscriber = participant.create_subscriber()?;
// With QoS
let sub_qos = SubscriberQos::default()
    .partition(Partition::new(vec!["sensors", "zone_*"])); // Wildcard
let subscriber = participant.create_subscriber_with_qos(sub_qos)?;
Creating DataReaders
let topic = participant.create_topic::<SensorData>("SensorTopic")?;
// Create reader
let reader = subscriber.create_reader(&topic)?;
// With custom QoS
let reader_qos = DataReaderQos::default()
    .reliability(Reliability::Reliable)
    .history(History::KeepLast { depth: 100 });
let reader = subscriber.create_reader_with_qos(&topic, reader_qos)?;
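To make the effect of a `KeepLast` history concrete, here is a minimal standalone sketch of a bounded history that evicts the oldest sample once `depth` is reached. The `KeepLast` struct below is a hypothetical stand-in written for illustration, not the HDDS type. In DDS the depth normally applies per instance; the sketch tracks a single instance for brevity.

```rust
use std::collections::VecDeque;

/// Bounded history that keeps only the newest `depth` samples.
struct KeepLast<T> {
    depth: usize,
    samples: VecDeque<T>,
}

impl<T> KeepLast<T> {
    fn new(depth: usize) -> Self {
        assert!(depth > 0, "depth must be at least 1");
        Self { depth, samples: VecDeque::with_capacity(depth) }
    }

    /// Store a sample, evicting the oldest one when the history is full.
    fn push(&mut self, sample: T) {
        if self.samples.len() == self.depth {
            self.samples.pop_front();
        }
        self.samples.push_back(sample);
    }

    /// Remove and return everything currently stored, oldest first.
    fn take(&mut self) -> Vec<T> {
        self.samples.drain(..).collect()
    }
}
```

A reader that falls behind a fast writer therefore sees only the most recent `depth` samples, which is why the depth should be sized against the expected burst rate.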
Reading Data
// Take (removes from cache)
match reader.take() {
    Ok(samples) => {
        for sample in samples {
            println!("Received: {:?}", sample);
        }
    }
    Err(HddsError::NoData) => {
        // No data available
    }
    Err(e) => return Err(e.into()),
}
// Read (keeps in cache)
let samples = reader.read()?;
// Non-blocking
match reader.try_take() {
    Ok(samples) => { /* process */ }
    Err(HddsError::NoData) => { /* wait */ }
    Err(e) => return Err(e.into()),
}
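The distinction between reading and taking is easier to see on a toy cache. The sketch below is a simplified model, not the HDDS reader: `Cache`, `read_unread`, and the local `SampleState` enum are hypothetical names. Reading leaves samples in place and marks them as read, while taking removes them.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum SampleState {
    NotRead,
    Read,
}

/// Toy reader cache: each sample carries its sample state.
struct Cache<T> {
    entries: Vec<(T, SampleState)>,
}

impl<T: Clone> Cache<T> {
    fn new() -> Self {
        Self { entries: Vec::new() }
    }

    /// A newly received sample starts out as NotRead.
    fn insert(&mut self, sample: T) {
        self.entries.push((sample, SampleState::NotRead));
    }

    /// Return copies of all samples and mark them as read; the cache keeps them.
    fn read(&mut self) -> Vec<T> {
        self.entries
            .iter_mut()
            .map(|entry| {
                entry.1 = SampleState::Read;
                entry.0.clone()
            })
            .collect()
    }

    /// Return only samples that were not read before, marking them as read.
    fn read_unread(&mut self) -> Vec<T> {
        self.entries
            .iter_mut()
            .filter(|entry| entry.1 == SampleState::NotRead)
            .map(|entry| {
                entry.1 = SampleState::Read;
                entry.0.clone()
            })
            .collect()
    }

    /// Remove and return all samples, regardless of state.
    fn take(&mut self) -> Vec<T> {
        self.entries.drain(..).map(|(sample, _)| sample).collect()
    }
}
```

Repeated reads return the same samples again, so a cache that is only ever read grows until history limits apply; taking keeps it drained.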
Subscriber QoS
let sub_qos = SubscriberQos::default()
    // Partition matching
    .partition(Partition::new(vec!["sensors", "zone_*"]))
    // Group access
    .presentation(Presentation::group_coherent())
    // Metadata
    .group_data(b"sensor_subscriber_v1".to_vec());
DataWriter
DataWriters are the endpoints that publish data.
Write Operations
// Standard write
writer.write(&sample)?;
// Write with instance handle (faster for keyed data)
let handle = writer.register_instance(&sample)?;
writer.write_w_handle(&sample, handle)?;
// Dispose instance (mark as "no longer available")
writer.dispose(&sample)?;
// Unregister (writer no longer manages this instance)
writer.unregister_instance(&sample)?;
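The effect of write, dispose, and unregister on an instance can be sketched as a small state machine, seen from the reader side. This is a simplified model under stated assumptions: `Instance` and its methods are hypothetical, writers are identified by plain integers, and the rules follow the usual DDS convention that a disposed instance becomes alive again when it is written.

```rust
use std::collections::HashSet;

#[derive(Debug, Clone, Copy, PartialEq)]
enum InstanceState {
    Alive,
    NotAliveDisposed,
    NotAliveNoWriters,
}

/// Reader-side view of one keyed instance (simplified).
struct Instance {
    writers: HashSet<u32>, // ids of writers currently registered for the instance
    disposed: bool,
}

impl Instance {
    /// An instance comes into existence with its first write.
    fn first_write(writer: u32) -> Self {
        let mut instance = Self { writers: HashSet::new(), disposed: false };
        instance.write(writer);
        instance
    }

    /// Writing registers the writer and revives a disposed instance.
    fn write(&mut self, writer: u32) {
        self.writers.insert(writer);
        self.disposed = false;
    }

    /// Disposing marks the instance as no longer available.
    fn dispose(&mut self) {
        self.disposed = true;
    }

    /// Unregistering removes the writer without disposing the instance.
    fn unregister(&mut self, writer: u32) {
        self.writers.remove(&writer);
    }

    fn state(&self) -> InstanceState {
        if self.disposed {
            InstanceState::NotAliveDisposed
        } else if self.writers.is_empty() {
            InstanceState::NotAliveNoWriters
        } else {
            InstanceState::Alive
        }
    }
}
```

The sketch highlights why dispose and unregister are separate operations: one says the data is gone, the other only says this writer has stopped updating it.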
Writer Status
// Matched readers
let matched = writer.matched_subscriptions();
println!("Matched {} readers", matched.len());
// Liveliness
let status = writer.get_liveliness_lost_status()?;
if status.total_count > 0 {
    println!("Lost liveliness {} times", status.total_count);
}
// Deadline
let status = writer.get_offered_deadline_missed_status()?;
println!("Missed {} deadlines", status.total_count);
Writer Listener
struct MyWriterListener;
impl DataWriterListener for MyWriterListener {
    fn on_publication_matched(&mut self, writer: &DataWriter<SensorData>, status: PublicationMatchedStatus) {
        println!("Matched {} readers", status.current_count);
    }
    fn on_offered_deadline_missed(&mut self, writer: &DataWriter<SensorData>, status: OfferedDeadlineMissedStatus) {
        println!("Missed deadline!");
    }
    fn on_liveliness_lost(&mut self, writer: &DataWriter<SensorData>, status: LivelinessLostStatus) {
        println!("Lost liveliness!");
    }
}
let writer = publisher.create_writer_with_listener(&topic, MyWriterListener)?;
DataReader
DataReaders are the endpoints that receive data.
Read Operations
// Take (remove from cache)
let samples = reader.take()?;
// Read (keep in cache)
let samples = reader.read()?;
// Take with sample info
let samples_with_info = reader.take_with_info()?;
for (sample, info) in samples_with_info {
    println!("Sample: {:?}", sample);
    println!("  Source timestamp: {:?}", info.source_timestamp);
    println!("  Instance state: {:?}", info.instance_state);
}
// Take samples for a specific instance (`key` holds the key fields that identify it)
let handle = reader.lookup_instance(&key)?;
let samples = reader.take_instance(handle)?;
Sample States
// Filter by sample state
let unread = reader.take_w_condition(
    SampleStateKind::NotRead,
    ViewStateKind::Any,
    InstanceStateKind::Alive,
)?;
| State | Meaning |
|---|---|
| SampleState::Read | Sample was already returned by an earlier read |
| SampleState::NotRead | Sample has not been returned yet |
| ViewState::New | First time the reader sees this instance (or sees it alive again) |
| ViewState::NotNew | Reader has already accessed this instance |
| InstanceState::Alive | Instance has at least one live writer and is not disposed |
| InstanceState::NotAliveDisposed | A writer disposed the instance |
| InstanceState::NotAliveNoWriters | No live writer is registered for the instance |
Reader Listener
struct MyReaderListener;
impl DataReaderListener for MyReaderListener {
    fn on_data_available(&mut self, reader: &DataReader<SensorData>) {
        if let Ok(samples) = reader.take() {
            for sample in samples {
                println!("Received: {:?}", sample);
            }
        }
    }
    fn on_subscription_matched(&mut self, reader: &DataReader<SensorData>, status: SubscriptionMatchedStatus) {
        println!("Matched {} writers", status.current_count);
    }
    fn on_requested_deadline_missed(&mut self, reader: &DataReader<SensorData>, status: RequestedDeadlineMissedStatus) {
        println!("Writer missed deadline!");
    }
    fn on_liveliness_changed(&mut self, reader: &DataReader<SensorData>, status: LivelinessChangedStatus) {
        println!("Alive writers: {}", status.alive_count);
    }
}
Partitions
Partitions restrict matching within a domain: a publisher and a subscriber communicate only if they share at least one partition name.
// Publisher in "sensors" partition
let pub_qos = PublisherQos::default()
    .partition(Partition::new(vec!["sensors"]));
let publisher = participant.create_publisher_with_qos(pub_qos)?;
// Subscriber in "sensors" partition - MATCHES
let sub_qos = SubscriberQos::default()
    .partition(Partition::new(vec!["sensors"]));
let subscriber = participant.create_subscriber_with_qos(sub_qos)?;
// Subscriber in "commands" partition - NO MATCH
let sub_qos2 = SubscriberQos::default()
    .partition(Partition::new(vec!["commands"]));
Partition Wildcards
// Subscriber matches all sensor partitions
let sub_qos = SubscriberQos::default()
    .partition(Partition::new(vec!["sensor*"])); // Matches sensor_a, sensor_b, etc.
// Empty partition matches empty publisher partition
let sub_qos = SubscriberQos::default()
    .partition(Partition::default()); // Matches default partition
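The matching rules above can be sketched as a standalone function. This is an illustrative model, not the HDDS implementation: `glob_match`, `names_match`, and `partitions_match` are hypothetical helpers, only `*` and `?` are treated as wildcards, an empty list is assumed to mean the default partition (the empty name), and two wildcard patterns are assumed never to match each other, which is how the DDS specification is commonly read.

```rust
/// Match `name` against `pattern`: `*` matches any run of characters,
/// `?` matches exactly one character.
fn glob_match(pattern: &str, name: &str) -> bool {
    let p: Vec<char> = pattern.chars().collect();
    let n: Vec<char> = name.chars().collect();
    let (mut pi, mut ni) = (0, 0);
    // Position of the last `*` and the name index it was tried at.
    let mut backtrack: Option<(usize, usize)> = None;
    while ni < n.len() {
        if pi < p.len() && p[pi] == '*' {
            backtrack = Some((pi, ni));
            pi += 1;
        } else if pi < p.len() && (p[pi] == '?' || p[pi] == n[ni]) {
            pi += 1;
            ni += 1;
        } else if let Some((star_pi, star_ni)) = backtrack {
            // Let the last `*` absorb one more character and retry.
            pi = star_pi + 1;
            ni = star_ni + 1;
            backtrack = Some((star_pi, star_ni + 1));
        } else {
            return false;
        }
    }
    // Only trailing `*`s may remain in the pattern.
    p[pi..].iter().all(|&c| c == '*')
}

fn has_wildcard(s: &str) -> bool {
    s.contains('*') || s.contains('?')
}

/// Compare two partition names, either of which may be a pattern.
fn names_match(a: &str, b: &str) -> bool {
    match (has_wildcard(a), has_wildcard(b)) {
        (true, true) => false, // assumption: two patterns never match each other
        (true, false) => glob_match(a, b),
        (false, true) => glob_match(b, a),
        (false, false) => a == b,
    }
}

/// Endpoints match when any publisher partition matches any subscriber partition.
fn partitions_match(publisher: &[&str], subscriber: &[&str]) -> bool {
    // Assumption: an empty list means the default partition, named "".
    let default = [""];
    let pubs = if publisher.is_empty() { &default[..] } else { publisher };
    let subs = if subscriber.is_empty() { &default[..] } else { subscriber };
    pubs.iter().any(|p| subs.iter().any(|s| names_match(p, s)))
}
```

For example, a publisher in `["sensors", "zone_a"]` matches a subscriber in `["sensors", "zone_*"]`, while a publisher in the default partition does not match a subscriber in `["sensors"]`.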
WaitSets
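A WaitSet blocks the calling thread until an attached condition triggers or the timeout expires, which avoids polling the reader: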
let waitset = WaitSet::new()?;
// Create read condition
let condition = reader.create_read_condition(
    SampleStateKind::NotRead,
    ViewStateKind::Any,
    InstanceStateKind::Alive,
)?;
waitset.attach(condition)?;
// Wait for data
loop {
    let triggered = waitset.wait(Duration::from_secs(1))?;
    for condition in triggered {
        // Data available on associated reader
        let samples = reader.take()?;
        for sample in samples {
            println!("Received: {:?}", sample);
        }
    }
}
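The wait-with-timeout behavior can be approximated with a mutex and a condition variable from the standard library. The sketch below is an analogy rather than the HDDS WaitSet: `Mailbox` is a hypothetical type that combines the roles of a reader cache and a single attached condition.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

/// A queue that a thread can wait on, standing in for a reader plus a WaitSet.
struct Mailbox<T> {
    queue: Mutex<VecDeque<T>>,
    ready: Condvar,
}

impl<T> Mailbox<T> {
    fn new() -> Arc<Self> {
        Arc::new(Self {
            queue: Mutex::new(VecDeque::new()),
            ready: Condvar::new(),
        })
    }

    /// Store a sample and wake one waiting thread.
    fn push(&self, sample: T) {
        self.queue.lock().unwrap().push_back(sample);
        self.ready.notify_one();
    }

    /// Block until data arrives or the timeout expires, then drain the queue.
    /// Returns an empty Vec on timeout.
    fn wait_and_take(&self, timeout: Duration) -> Vec<T> {
        let guard = self.queue.lock().unwrap();
        // Sleeps only while the queue is empty; spurious wakeups are handled
        // by the predicate.
        let (mut guard, _timed_out) = self
            .ready
            .wait_timeout_while(guard, timeout, |queue| queue.is_empty())
            .unwrap();
        guard.drain(..).collect()
    }
}
```

As with the loop above, the waiting thread consumes no CPU while idle and wakes either when data is pushed or when the timeout elapses, at which point it can do periodic housekeeping.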
Best Practices
- Group related writers/readers - Use one Publisher/Subscriber per logical group
- Use partitions - Isolate traffic within a domain without creating separate domains
- Set appropriate QoS - Match reliability, durability between endpoints
- Use listeners for events - React to matched/unmatched, deadline misses
- Prefer take() over read() - Unless you need to keep samples in cache
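The advice to match reliability and durability follows the DDS requested-versus-offered rule: a writer must offer at least what the reader requests, otherwise the endpoints do not match. A minimal sketch of that check, using local stand-in types (`ReliabilityKind`, `DurabilityKind`, and `EndpointQos` are hypothetical and cover only two levels of each policy):

```rust
// Variant order matters: derived `Ord` ranks later variants as stronger.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum ReliabilityKind {
    BestEffort,
    Reliable,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum DurabilityKind {
    Volatile,
    TransientLocal,
}

#[derive(Debug, Clone, Copy)]
struct EndpointQos {
    reliability: ReliabilityKind,
    durability: DurabilityKind,
}

/// A writer and a reader are compatible when the writer offers at least
/// what the reader requests, for every policy checked here.
fn compatible(offered: &EndpointQos, requested: &EndpointQos) -> bool {
    offered.reliability >= requested.reliability
        && offered.durability >= requested.durability
}
```

A reliable writer can therefore serve a best-effort reader, but a best-effort writer never matches a reliable reader, which usually shows up as endpoints that discover each other yet exchange no data.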
Common Patterns
Request-Reply
// Request side
let request_writer = publisher.create_writer(&request_topic)?;
let reply_reader = subscriber.create_reader(&reply_topic)?;
request_writer.write(&request)?;
// The reply is usually not available yet at this point; in practice,
// wait on a WaitSet or listener before taking.
let replies = reply_reader.take()?;
// Reply side
let request_reader = subscriber.create_reader(&request_topic)?;
let reply_writer = publisher.create_writer(&reply_topic)?;
for request in request_reader.take()? {
    let reply = process(request);
    reply_writer.write(&reply)?;
}
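When several requests are in flight, the requester needs a way to tell which reply answers which request. One common approach, shown here as a sketch with standard-library channels standing in for the two topics, is to carry a correlation id in both messages. The `Request` and `Reply` types, the `id` field, and the doubling "service" are assumptions made for illustration only.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;
use std::time::Duration;

#[derive(Debug)]
struct Request {
    id: u64, // correlation id chosen by the requester
    value: i32,
}

#[derive(Debug, PartialEq)]
struct Reply {
    request_id: u64, // copied from the request it answers
    result: i32,
}

/// Reply side: answer each request until the request channel closes.
fn serve(requests: Receiver<Request>, replies: Sender<Reply>) {
    for request in requests {
        let reply = Reply { request_id: request.id, result: request.value * 2 };
        if replies.send(reply).is_err() {
            break; // requester went away
        }
    }
}

/// Request side: send one request and wait for the reply carrying the same id.
/// The timeout applies to each receive, not to the call as a whole.
fn call(
    requests: &Sender<Request>,
    replies: &Receiver<Reply>,
    id: u64,
    value: i32,
    timeout: Duration,
) -> Option<Reply> {
    requests.send(Request { id, value }).ok()?;
    loop {
        let reply = replies.recv_timeout(timeout).ok()?;
        if reply.request_id == id {
            return Some(reply);
        }
        // Otherwise the reply answers a different request; keep waiting.
    }
}

/// Wire both sides together and perform a single round trip.
fn demo() -> Option<Reply> {
    let (req_tx, req_rx) = channel();
    let (rep_tx, rep_rx) = channel();
    let server = thread::spawn(move || serve(req_rx, rep_tx));
    let reply = call(&req_tx, &rep_rx, 7, 21, Duration::from_secs(5));
    drop(req_tx); // closing the request channel lets the server loop end
    server.join().ok()?;
    reply
}
```

In a DDS setting the same idea maps to a key or id field in the request and reply types, so that each requester can filter replies addressed to it.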
Async Read
// Using tokio
let reader = subscriber.create_reader(&topic)?;
tokio::spawn(async move {
    loop {
        match reader.take_async().await {
            Ok(samples) => {
                for sample in samples {
                    process(sample).await;
                }
            }
            Err(e) => eprintln!("Error: {}", e),
        }
    }
});
Next Steps
- QoS Overview - Quality of Service policies
- Discovery - How endpoints find each other
- Reliable Delivery Example - Guaranteed delivery