C++ API Reference
HDDS provides a modern C++ SDK with RAII wrappers around the C FFI layer. The API uses C++17 features for safe, idiomatic usage.
This documents the current v1.0.11 API. Listeners are supported via hdds_listener.hpp. Instance management and content-filtered topics are not yet implemented.
Installation
# Build HDDS core + C++ SDK (Rust toolchain + CMake required)
cd /path/to/hdds
make sdk-cxx
Manual build (without Make)
cd /path/to/hdds
cargo build --release -p hdds-c # Rust core (libhdds_c.a)
cd sdk/cxx && mkdir -p build && cd build
cmake .. && make -j$(nproc) # C++ wrappers (libhdds_cxx.a)
System-wide install (make install)
# Build + install to /usr/local (default)
make sdk-cxx
sudo make install
# Or install to a custom prefix
make sdk-cxx
make install PREFIX=/opt/hdds
After installing, CMake finds HDDS automatically:
# With default prefix (/usr/local)
cmake ..
# With custom prefix
cmake .. -DCMAKE_PREFIX_PATH=/opt/hdds
CMake Integration
HDDS provides a find_package() config module. Set CMAKE_PREFIX_PATH to the SDK cmake directory:
find_package(hdds REQUIRED)
target_link_libraries(myapp PRIVATE hdds::hdds)
cmake .. -DCMAKE_PREFIX_PATH=/path/to/hdds/sdk/cmake
Available imported targets:
hdds::hdds-- convenience alias (links hdds_cxx + hdds_c + platform libs)hdds::hdds_cxx-- C++ RAII wrappershdds::hdds_c-- C FFI layer
Alternative: manual paths (without find_package)
set(HDDS_ROOT "$ENV{HOME}/hdds" CACHE PATH "Path to HDDS source root (e.g. ~/hdds)")
set(HDDS_C_INCLUDE "${HDDS_ROOT}/sdk/c/include")
set(HDDS_CXX_INCLUDE "${HDDS_ROOT}/sdk/cxx/include")
set(HDDS_LIB_DIR "${HDDS_ROOT}/target/release")
set(HDDS_CXX_LIB_DIR "${HDDS_ROOT}/sdk/cxx/build")
target_include_directories(myapp PRIVATE ${HDDS_CXX_INCLUDE} ${HDDS_C_INCLUDE})
target_link_directories(myapp PRIVATE ${HDDS_CXX_LIB_DIR} ${HDDS_LIB_DIR})
target_link_libraries(myapp PRIVATE hdds_cxx hdds_c pthread dl m)
Header
#include <hdds.hpp>
Quick Start
The recommended C++ API uses create_writer<T>() / create_reader<T>() with types generated by hddsgen. CDR2 serialization is handled automatically -- no manual buffer management needed.
#include <hdds.hpp>
#include "MyType.hpp" // generated by: hddsgen gen cpp MyType.idl -o MyType.hpp
int main() {
// RAII participant
hdds::Participant participant("my_app");
// Fluent QoS builder
auto qos = hdds::QoS::reliable()
.transient_local()
.history_depth(10);
// Create typed writer (compile-time check for encode_cdr2_le)
auto writer = participant.create_writer<MyType>("hello/world", qos);
// Publish -- CDR2 serialization is automatic
writer.write(MyType{42, "hello"});
return 0; // RAII cleanup
}
Typed vs Raw API
| Typed API | Raw API | |
|---|---|---|
| Create | create_writer<T>() / create_reader<T>() | create_writer_raw() / create_reader_raw() |
| Returns | TypedDataWriter<T> / TypedDataReader<T> (value) | unique_ptr<DataWriter> / unique_ptr<DataReader> |
| Syntax | writer.write(msg) / reader.take() | writer->write_raw(bytes) / reader->take_raw() |
| Serialization | Automatic (CDR2) | Manual (encode_cdr2_le / decode_cdr2_le) |
| Use when | You have IDL-generated types (normal case) | Custom serialization, raw bytes, or bridging protocols |
Transport Mode
Select transport at participant creation.
enum class TransportMode {
IntraProcess = 0, // Same-process communication (zero-copy)
UdpMulticast = 1, // UDP multicast (default, LAN discovery)
};
TCP, QUIC, and LowBandwidth transports are available in the Rust core but not yet exposed via the C++ TransportMode enum. To use them from a C++ application, set the HDDS_TRANSPORT environment variable before creating a participant:
# TCP transport (NAT traversal, firewalls)
HDDS_TRANSPORT=tcp ./my_app
# QUIC transport (encrypted, NAT-friendly)
HDDS_TRANSPORT=quic ./my_app
# Low bandwidth transport (satellite, constrained links)
HDDS_TRANSPORT=low_bandwidth ./my_app
Or set it programmatically before participant creation:
setenv("HDDS_TRANSPORT", "tcp", 1);
hdds::Participant participant("my_app");
Participant
Entry point for all DDS operations.
Creation
// Basic creation (UDP multicast, domain 0)
hdds::Participant participant("my_app");
// With domain ID
hdds::Participant participant("my_app", 42);
// With explicit transport mode
hdds::Participant participant("my_app", hdds::TransportMode::UdpMulticast, 42);
// IntraProcess transport (same-process, zero-copy)
hdds::Participant participant("my_app", hdds::TransportMode::IntraProcess);
Properties
const std::string& name = participant.name();
uint32_t domain = participant.domain_id();
uint8_t pid = participant.participant_id();
// Get C handle for advanced usage
HddsParticipant* c_handle = participant.c_handle();
Creating Writers/Readers
// Raw (untyped) -- returns unique_ptr<DataWriter/DataReader>, use -> syntax
auto writer = participant.create_writer_raw("topic", qos);
auto reader = participant.create_reader_raw("topic", qos);
writer->write_raw(data);
// Typed -- returns TypedDataWriter<T>/TypedDataReader<T> by value, use . syntax
auto writer = participant.create_writer<MyType>("topic", qos);
auto reader = participant.create_reader<MyType>("topic", qos);
writer.write(MyType{42, "hello"}); // no need to specify <MyType> again
auto msg = reader.take(); // no need to specify <MyType> again
// Publisher/Subscriber grouping (also supports typed API)
auto publisher = participant.create_publisher(qos);
auto writer = publisher.create_writer<MyType>("topic");
auto subscriber = participant.create_subscriber(qos);
auto reader = subscriber.create_reader<MyType>("topic");
DSCP Configuration
DSCP configuration is implemented in the Rust core but per-participant DSCP control is not yet exposed in the C++ SDK. Use the HDDS_DSCP environment variable instead:
export HDDS_DSCP=ef # Expedited Forwarding (real-time, lowest latency)
export HDDS_DSCP=af21 # Low-latency data (default DDS)
The DscpClass enum and dscp_to_tos() helper are available in hdds.hpp for reference.
DSCP classes (available in Rust and C APIs):
enum class DscpClass : uint8_t {
BestEffort = 0, // CS0 - Default, no priority
Af11 = 10, // High-throughput data
Af21 = 18, // Low-latency data (standard DDS)
Af31 = 26, // Streaming media
Af41 = 34, // Video, important telemetry
Ef = 46, // Real-time, safety-critical
Cs6 = 48, // Network control
Cs7 = 56, // Highest priority
};
QoS Configuration
Fluent builder API for Quality of Service.
Factory Methods
// Predefined profiles
auto qos = hdds::QoS::default_qos(); // BestEffort, Volatile
auto qos = hdds::QoS::reliable(); // Reliable delivery
auto qos = hdds::QoS::best_effort(); // Fire and forget
auto qos = hdds::QoS::rti_defaults(); // RTI Connext compatible
// Load from XML file (OMG DDS QoS profile format)
auto qos = hdds::QoS::from_xml("qos_profile.xml");
// Load from FastDDS XML profile
auto qos = hdds::QoS::from_file("fastdds_profile.xml");
Both from_xml() and from_file() throw hdds::Error if the file does not exist or contains invalid XML.
See sdk/samples/02_qos/qos_profile.xml for a complete example:
<dds xmlns="http://www.omg.org/dds/">
<qos_profile name="reliable_profile">
<datawriter_qos>
<reliability><kind>RELIABLE</kind></reliability>
<durability><kind>TRANSIENT_LOCAL</kind></durability>
</datawriter_qos>
</qos_profile>
</dds>
The xml_loading sample in sdk/samples/02_qos/cpp/ demonstrates loading profiles at runtime.
Fluent Builder
auto qos = hdds::QoS::reliable()
.transient_local() // Durability
.history_depth(100) // History
.deadline(std::chrono::milliseconds(100)) // Deadline
.lifespan(std::chrono::seconds(5)) // Lifespan
.liveliness_automatic(std::chrono::seconds(1))
.ownership_exclusive(100) // Ownership strength
.partition("sensors") // Partition
.time_based_filter(std::chrono::milliseconds(10))
.transport_priority(10)
.resource_limits(1000, 100, 10); // max_samples, instances, per_instance
Inspection
// Reliability & durability
bool reliable = qos.is_reliable();
bool transient = qos.is_transient_local();
// History
uint32_t depth = qos.get_history_depth();
// Timing
uint64_t latency_ns = qos.get_latency_budget_ns(); // 0 = none
uint64_t filter_ns = qos.get_time_based_filter_ns(); // 0 = no filter
// Resource limits (SIZE_MAX = unlimited / use defaults)
size_t max_samples = qos.get_max_samples();
size_t max_inst = qos.get_max_instances();
size_t max_per_inst = qos.get_max_samples_per_instance();
// Get C handle for FFI
HddsQoS* c_handle = qos.c_handle();
QoS Samples
Complete working examples for each QoS policy are available in sdk/samples/02_qos/cpp/:
| Sample | QoS Policy |
|---|---|
reliable_delivery | Reliability (RELIABLE) |
best_effort | Reliability (BEST_EFFORT) |
transient_local | Durability (TRANSIENT_LOCAL) |
deadline_monitor | Deadline |
liveliness_auto | Liveliness (AUTOMATIC) |
liveliness_manual | Liveliness (MANUAL_BY_TOPIC) |
partition_filter | Partition |
ownership_exclusive | Ownership (EXCLUSIVE) |
history_keep_last | History (KEEP_LAST) |
xml_loading | QoS from XML file |
lifespan | Lifespan |
latency_budget | Latency Budget |
time_based_filter | Time-Based Filter |
transport_priority | Transport Priority |
resource_limits | Resource Limits |
DataWriter
Writers publish data to a topic.
Typed Writer (recommended)
create_writer<T>() returns a TypedDataWriter<T> by value. Use . syntax:
auto writer = participant.create_writer<MyType>("topic", qos);
// CDR2 serialization handled automatically
writer.write(MyType{42, "hello"});
const std::string& topic = writer.topic_name();
// Access underlying DataWriter for raw operations
writer.raw()->write_raw(raw_bytes);
Raw Writer (untyped)
create_writer_raw() returns std::unique_ptr<DataWriter>. Use -> syntax:
auto writer = participant.create_writer_raw("topic", qos);
// Write raw bytes (vector)
std::vector<uint8_t> data = {1, 2, 3, 4};
writer->write_raw(data);
// Write raw bytes (pointer + size)
uint8_t buffer[256];
writer->write_raw(buffer, sizeof(buffer));
const std::string& topic = writer->topic_name();
Move Semantics
// Both typed and raw writers are movable, not copyable
auto writer2 = std::move(writer); // OK
// auto writer3 = writer2; // Error: deleted copy constructor
DataReader
Readers receive data from a topic.
Typed Reader (recommended)
create_reader<T>() returns a TypedDataReader<T> by value. Use . syntax. No need to re-specify the type on take():
auto reader = participant.create_reader<MyType>("topic", qos);
// Take typed data -- no <MyType> needed, reader already knows the type
std::optional<MyType> msg = reader.take();
if (msg) {
std::cout << "Received: " << msg->value << "\n";
}
// Status condition for WaitSet (returns raw C pointer -- owned by the reader, do not free)
HddsStatusCondition* cond = reader.get_status_condition();
// Access underlying DataReader for raw operations
reader.raw()->take_raw();
get_status_condition() returns a raw C pointerThe HddsStatusCondition* is owned by the reader/writer and lives as long as the parent object. Do not delete or free it. This applies to both typed and raw readers.
Raw Reader (untyped)
create_reader_raw() returns std::unique_ptr<DataReader>. Use -> syntax:
auto reader = participant.create_reader_raw("topic", qos);
// Take raw bytes (non-blocking)
std::optional<std::vector<uint8_t>> data = reader->take_raw();
if (data) {
process(*data);
}
HddsStatusCondition* cond = reader->get_status_condition();
WaitSet
Event-driven waiting for data availability.
Basic Usage
// Create waitset
hdds::WaitSet waitset;
// With typed reader (. syntax)
auto reader = participant.create_reader<MyType>("topic");
waitset.attach(reader.get_status_condition());
while (running) {
if (waitset.wait(std::chrono::seconds(1))) {
while (auto msg = reader.take()) {
process(msg->value);
}
}
}
// With raw reader (-> syntax)
auto raw_reader = participant.create_reader_raw("topic");
waitset.attach(raw_reader->get_status_condition());
while (running) {
if (waitset.wait(std::chrono::seconds(1))) {
while (auto data = raw_reader->take_raw()) {
process(*data);
}
}
}
Guard Conditions
// Create guard condition for custom signaling
hdds::GuardCondition guard;
waitset.attach(guard);
// Trigger from another thread
std::thread([&guard]() {
std::this_thread::sleep_for(std::chrono::seconds(1));
guard.trigger();
}).detach();
// Wait will return when guard is triggered
waitset.wait();
// Cleanup
waitset.detach(guard);
Infinite Wait
waitset.wait(); // Blocks until condition triggered
Logging
C++ API
// Initialize with explicit level (Warn recommended to suppress internal Rust logs)
hdds::logging::init(hdds::LogLevel::Warn);
// Initialize from RUST_LOG env var (with fallback)
hdds::logging::init_env(hdds::LogLevel::Warn);
// Initialize with custom filter string
hdds::logging::init_filter("hdds=debug,hdds::rtps=trace");
Log levels: Off, Error, Warn, Info, Debug, Trace
Environment Variable
Alternatively, set RUST_LOG before calling init_env():
export RUST_LOG=hdds=info
export RUST_LOG=hdds=debug
export RUST_LOG=hdds::rtps=trace,hdds::discovery=debug
Telemetry
Built-in metrics collection.
Initialize
// Initialize global metrics
hdds::Metrics metrics = hdds::telemetry::init();
Snapshot
hdds::MetricsSnapshot snap = metrics.snapshot();
std::cout << "Messages sent: " << snap.messages_sent << "\n";
std::cout << "Messages received: " << snap.messages_received << "\n";
std::cout << "Latency P50: " << snap.latency_p50_ms() << "ms\n";
std::cout << "Latency P99: " << snap.latency_p99_ms() << "ms\n";
std::cout << "Latency P99.9: " << snap.latency_p999_ms() << "ms\n";
MetricsSnapshot fields:
struct MetricsSnapshot {
uint64_t timestamp_ns;
uint64_t messages_sent;
uint64_t messages_received;
uint64_t messages_dropped;
uint64_t bytes_sent;
uint64_t latency_p50_ns;
uint64_t latency_p99_ns;
uint64_t latency_p999_ns;
uint64_t merge_full_count;
uint64_t would_block_count;
double latency_p50_ms() const;
double latency_p99_ms() const;
double latency_p999_ms() const;
};
Exporter (HDDS Viewer)
// Start telemetry server for HDDS Viewer
auto exporter = hdds::telemetry::start_exporter("127.0.0.1", 4242);
// ... application runs ...
exporter.stop();
Manual Latency Recording
auto start = std::chrono::steady_clock::now();
// ... operation ...
auto end = std::chrono::steady_clock::now();
metrics.record_latency(
std::chrono::duration_cast<std::chrono::nanoseconds>(start.time_since_epoch()).count(),
std::chrono::duration_cast<std::chrono::nanoseconds>(end.time_since_epoch()).count()
);
Error Handling
// Exceptions
class hdds::Error : public std::runtime_error {
public:
explicit Error(const std::string& msg);
};
// Example
try {
auto writer = participant.create_writer_raw("topic");
writer->write_raw(data);
} catch (const hdds::Error& e) {
std::cerr << "HDDS error: " << e.what() << "\n";
}
Publisher / Subscriber
Optional grouping for writers/readers with shared QoS. Publisher and Subscriber support both typed and raw APIs, just like Participant:
// Create publisher/subscriber with shared QoS
auto publisher = participant.create_publisher(qos);
auto subscriber = participant.create_subscriber(qos);
// Typed API (recommended) -- same syntax as Participant
auto writer = publisher.create_writer<Temperature>("sensors/temp");
auto reader = subscriber.create_reader<Temperature>("sensors/temp", qos);
writer.write(Temperature{1, 22.5f});
auto msg = reader.take(); // std::optional<Temperature>
// Raw API
auto raw_writer = publisher.create_writer_raw("sensors/temp");
auto raw_reader = subscriber.create_reader_raw("sensors/temp", qos);
Complete Example
Typed API (recommended)
#include <hdds.hpp>
#include <iostream>
#include <thread>
#include <chrono>
#include "Temperature.hpp" // generated by: hddsgen gen cpp Temperature.idl -o Temperature.hpp
int main() {
hdds::logging::init(hdds::LogLevel::Warn);
hdds::Participant participant("example");
auto qos = hdds::QoS::reliable()
.transient_local()
.history_depth(10);
// Typed writer/reader -- CDR2 serialization handled automatically
auto writer = participant.create_writer<Temperature>("sensors/temp", qos);
auto reader = participant.create_reader<Temperature>("sensors/temp", qos);
hdds::WaitSet waitset;
waitset.attach(reader.get_status_condition());
// Publisher thread
std::thread pub_thread([&writer]() {
for (int i = 0; i < 10; ++i) {
writer.write(Temperature{i, 20.0f + i * 0.5f});
std::cout << "Published: sensor_id=" << i << "\n";
std::this_thread::sleep_for(std::chrono::seconds(1));
}
});
// Subscriber loop
for (int received = 0; received < 10;) {
if (waitset.wait(std::chrono::seconds(5))) {
while (auto msg = reader.take()) {
std::cout << "Received: sensor " << msg->sensor_id
<< " = " << msg->value << "C\n";
++received;
}
}
}
pub_thread.join();
return 0;
}
Raw API version (untyped payloads)
#include <hdds.hpp>
#include <iostream>
#include <thread>
#include <chrono>
int main() {
hdds::logging::init(hdds::LogLevel::Warn);
hdds::Participant participant("example");
auto qos = hdds::QoS::reliable()
.transient_local()
.history_depth(10);
auto writer = participant.create_writer_raw("hello/world", qos);
auto reader = participant.create_reader_raw("hello/world", qos);
hdds::WaitSet waitset;
waitset.attach(reader->get_status_condition());
std::thread pub_thread([&writer]() {
for (int i = 0; i < 10; ++i) {
std::string msg = "Hello #" + std::to_string(i);
std::vector<uint8_t> data(msg.begin(), msg.end());
writer->write_raw(data);
std::cout << "Published: " << msg << "\n";
std::this_thread::sleep_for(std::chrono::seconds(1));
}
});
for (int received = 0; received < 10;) {
if (waitset.wait(std::chrono::seconds(5))) {
while (auto data = reader->take_raw()) {
std::string msg(data->begin(), data->end());
std::cout << "Received: " << msg << "\n";
++received;
}
}
}
pub_thread.join();
return 0;
}
Thread Safety
- Participant creation/destruction: NOT thread-safe
- Writer/Reader creation: NOT thread-safe
writer.write()/writer->write_raw(): Thread-safe (multiple threads can write)reader.take()/reader->take_raw(): NOT thread-safe (use one reader per thread)- QoS methods: NOT thread-safe
- WaitSet: NOT thread-safe
Using with Typed Data (hddsgen)
Use hddsgen to generate C++ types with CDR2 serialization from IDL files. The typed API (create_writer<T>, create_reader<T>) works directly with these generated types -- no manual buffer management needed.
T must provide encode_cdr2_le(uint8_t*, size_t) -> int (for writing) and decode_cdr2_le(const uint8_t*, size_t) -> int (for reading). Types generated by hddsgen satisfy this automatically. Using a type without these methods triggers a compile-time error with instructions.
# Generate C++ header from IDL (-o required, otherwise outputs to stdout)
hddsgen gen cpp Temperature.idl -o Temperature.hpp
# Install hdds-gen if needed (requires Rust toolchain)
cd /path/to/hdds_gen && cargo install --path .
Generated Code Structure
For an IDL struct like:
struct Temperature {
long sensor_id;
float value;
};
hddsgen generates a header with two components:
1. Data struct with CDR2 codec methods:
struct Temperature {
int32_t sensor_id = 0;
float value = 0.0f;
Temperature() = default;
Temperature(int32_t s, float v);
// Serialize to CDR2 LE buffer. Returns bytes written, or -1 on error.
[[nodiscard]] int encode_cdr2_le(std::uint8_t* dst, std::size_t len) const noexcept;
// Deserialize from CDR2 LE buffer. Returns bytes read, or -1 on error.
[[nodiscard]] int decode_cdr2_le(const std::uint8_t* src, std::size_t len) noexcept;
};
2. PubSubType helper with DDS-compatible static interface:
class TemperaturePubSubType {
public:
using type = Temperature;
static constexpr const char* type_name() noexcept;
static int serialize(const void* data, std::uint8_t* buf, std::size_t buf_size) noexcept;
static int deserialize(const std::uint8_t* buf, std::size_t buf_size, void* data) noexcept;
static std::size_t calculate_serialized_size(const void* data) noexcept;
static void* create_data() noexcept;
static void delete_data(void* data) noexcept;
static bool compute_key(const void* data, std::uint8_t key_buffer[16]) noexcept;
static constexpr bool has_key() noexcept;
};
Publishing (typed API)
#include "Temperature.hpp"
#include <hdds.hpp>
auto writer = participant.create_writer<Temperature>("sensors/temp", qos);
// CDR2 serialization handled automatically (. syntax, not ->)
writer.write(Temperature{1, 23.5f});
Subscribing (typed API)
auto reader = participant.create_reader<Temperature>("sensors/temp", qos);
// No need to re-specify <Temperature> -- reader already knows the type
while (auto msg = reader.take()) {
std::cout << "Sensor " << msg->sensor_id << ": " << msg->value << "C\n";
}
Raw API (manual buffer management)
Publishing:
auto writer = participant.create_writer_raw("sensors/temp", qos);
Temperature msg(1, 23.5f);
std::uint8_t buffer[4096];
int bytes = msg.encode_cdr2_le(buffer, sizeof(buffer));
if (bytes > 0) {
writer->write_raw(buffer, static_cast<size_t>(bytes));
}
Subscribing:
auto reader = participant.create_reader_raw("sensors/temp", qos);
while (auto data = reader->take_raw()) {
Temperature msg;
if (msg.decode_cdr2_le(data->data(), data->size()) > 0) {
std::cout << "Sensor " << msg.sensor_id << ": " << msg.value << "C\n";
}
}
PubSubType Usage
The PubSubType class is primarily used for type-erased patterns and DDS infrastructure. For direct usage, prefer the typed API or encode_cdr2_le()/decode_cdr2_le() on the struct itself.
// Using PubSubType for serialize (equivalent to encode_cdr2_le)
Temperature msg(1, 23.5f);
std::uint8_t buffer[4096];
int bytes = TemperaturePubSubType::serialize(&msg, buffer, sizeof(buffer));
// Using PubSubType for deserialize (equivalent to decode_cdr2_le)
Temperature out;
int result = TemperaturePubSubType::deserialize(buffer, bytes, &out);
// Get serialized size
std::size_t size = TemperaturePubSubType::calculate_serialized_size(&msg);
Listeners
Listeners provide callback-based notification for DDS entity events.
This is an alternative to the polling-based WaitSet pattern.
Include hdds_listener.hpp for the C++ listener wrappers.
#include <hdds_listener.hpp>
ReaderListener
Base class for DataReader event callbacks. Override the methods you need; unoverridden methods are no-ops. The listener must outlive the reader it is attached to.
class hdds::ReaderListener {
public:
virtual ~ReaderListener() = default;
// Called when new data is available (raw serialized bytes)
virtual void on_data_available(const uint8_t* data, size_t len);
// Called when the reader matches/unmatches with a writer
virtual void on_subscription_matched(const hdds::SubscriptionMatchedStatus& status);
// Called when liveliness of a matched writer changes
virtual void on_liveliness_changed(const hdds::LivelinessChangedStatus& status);
// Called when samples are lost (gap in sequence numbers)
virtual void on_sample_lost(const hdds::SampleLostStatus& status);
// Called when samples are rejected due to resource limits
virtual void on_sample_rejected(const hdds::SampleRejectedStatus& status);
// Called when the requested deadline is missed
virtual void on_deadline_missed(const hdds::DeadlineMissedStatus& status);
// Called when QoS is incompatible with a matched writer
virtual void on_incompatible_qos(const hdds::IncompatibleQosStatus& status);
};
WriterListener
Base class for DataWriter event callbacks. Override the methods you need.
class hdds::WriterListener {
public:
virtual ~WriterListener() = default;
// Called after a sample is successfully written
virtual void on_sample_written(const uint8_t* data, size_t len, uint64_t seq);
// Called when the writer matches/unmatches with a reader
virtual void on_publication_matched(const hdds::PublicationMatchedStatus& status);
// Called when an offered deadline is missed
virtual void on_offered_deadline_missed(uint64_t instance_handle);
// Called when QoS is incompatible with a matched reader
virtual void on_offered_incompatible_qos(uint32_t policy_id, const char* policy_name);
// Called when liveliness is lost (MANUAL_BY_* only)
virtual void on_liveliness_lost();
};
Status Types
struct hdds::SubscriptionMatchedStatus {
uint32_t total_count; // Total cumulative matched publications
int32_t total_count_change; // Change since last callback
uint32_t current_count; // Current number of matched publications
int32_t current_count_change; // Change since last callback
};
struct hdds::PublicationMatchedStatus {
uint32_t total_count;
int32_t total_count_change;
uint32_t current_count;
int32_t current_count_change;
};
struct hdds::LivelinessChangedStatus {
uint32_t alive_count;
int32_t alive_count_change;
uint32_t not_alive_count;
int32_t not_alive_count_change;
};
struct hdds::SampleLostStatus {
uint32_t total_count;
int32_t total_count_change;
};
struct hdds::SampleRejectedStatus {
uint32_t total_count;
int32_t total_count_change;
uint32_t last_reason; // 0=NotRejected, 1=ResourceLimit, 2=InstanceLimit, 3=SamplesPerInstanceLimit
};
struct hdds::DeadlineMissedStatus {
uint32_t total_count;
int32_t total_count_change;
};
struct hdds::IncompatibleQosStatus {
uint32_t total_count;
int32_t total_count_change;
uint32_t last_policy_id;
};
Installing Listeners
Use hdds::set_listener() to install a listener on a reader or writer, and hdds::clear_listener() to remove it. Both are overloaded for readers and writers.
// With typed reader/writer (recommended -- pass directly, no c_handle needed)
hdds::set_listener(reader, &my_reader_listener); // returns 0 on success
hdds::clear_listener(reader);
hdds::set_listener(writer, &my_writer_listener);
hdds::clear_listener(writer);
// With raw reader/writer (requires c_handle())
hdds::set_listener(raw_reader->c_handle(), &my_reader_listener);
hdds::clear_listener(raw_reader->c_handle());
Per-Callback Convenience Setters
For simple cases where you only need one callback, use the per-callback setters instead of a full listener class. These replace any previously installed listener (they are mutually exclusive with set_listener()).
// With typed reader (recommended)
hdds::set_on_data_available(reader,
[](const uint8_t* data, size_t len, void* ctx) {
std::cout << "Received " << len << " bytes\n";
});
hdds::set_on_subscription_matched(reader,
[](const HddsSubscriptionMatchedStatus* status, void* ctx) {
std::cout << "Matched writers: " << status->current_count << "\n";
});
hdds::set_on_publication_matched(writer,
[](const HddsPublicationMatchedStatus* status, void* ctx) {
std::cout << "Matched readers: " << status->current_count << "\n";
});
// With raw reader/writer (requires c_handle())
hdds::set_on_data_available(raw_reader->c_handle(), my_callback);
The on_data_available callback receives raw serialized bytes (const uint8_t* data, size_t len), not a typed object. You need to decode manually inside the callback:
hdds::set_on_data_available(reader,
[](const uint8_t* data, size_t len, void* ctx) {
SensorData msg;
if (msg.decode_cdr2_le(data, len) > 0) {
std::cout << "Temp: " << msg.temperature << "\n";
}
});
This is a current limitation of the C FFI layer. For typed deserialization without manual decoding, use the WaitSet + reader.take() pattern instead (see Hello World).
Complete Listener Example
#include <hdds.hpp>
#include <hdds_listener.hpp>
#include <iostream>
#include <atomic>
#include "SensorData.hpp" // hddsgen-generated type
class SensorListener : public hdds::ReaderListener {
std::atomic<uint64_t> received_{0};
public:
void on_data_available(const uint8_t* data, size_t len) override {
received_.fetch_add(1, std::memory_order_relaxed);
// Decode raw bytes to typed data
SensorData msg;
if (msg.decode_cdr2_le(data, len) > 0) {
std::cout << "Sensor " << msg.sensor_id
<< ": " << msg.temperature << "C\n";
}
}
void on_subscription_matched(const hdds::SubscriptionMatchedStatus& s) override {
std::cout << "Matched writers: " << s.current_count
<< " (total: " << s.total_count << ")\n";
}
void on_sample_lost(const hdds::SampleLostStatus& s) override {
std::cout << "WARNING: " << s.total_count_change << " sample(s) lost\n";
}
uint64_t count() const { return received_.load(std::memory_order_relaxed); }
};
int main() {
hdds::Participant participant("listener_demo");
auto qos = hdds::QoS::reliable().transient_local();
// Typed reader (recommended)
auto reader = participant.create_reader<SensorData>("sensors/temp", qos);
SensorListener listener;
hdds::set_listener(reader, &listener); // pass typed reader directly
// ... application runs; callbacks fire on background threads ...
// Cleanup: remove listener before destroying reader
hdds::clear_listener(reader);
return 0;
}
- The listener object must outlive the reader/writer it is attached to.
- Callbacks are invoked from background threads. Implementations should return quickly and must not block.
- Always call
hdds::clear_listener()before destroying the listener object.
Known Issues (v1.0.11)
| Issue | Description | Workaround |
|---|---|---|
| TRANSIENT_LOCAL late-joiner delivery | A subscriber joining after a publisher has written does not receive cached historical data. Pub/sub works correctly when both are running simultaneously. | Start subscriber before publisher, or use a WaitSet on on_publication_matched to delay publishing until a reader is discovered. |
Not Yet Implemented (v1.0.11)
| Feature | Status |
|---|---|
| DSCP C++ bindings | Use env var HDDS_DSCP instead |
| Instance management (dispose, unregister) | Not implemented |
| SampleInfo with metadata | Not implemented |
| Content-filtered topics | Not implemented |
Next Steps
- Hello World C++ - Complete tutorial
- C API - Low-level C FFI
- Rust API - Native Rust API