Python API Reference

HDDS provides Python bindings through hdds-py, offering a Pythonic API with full type hints and async support.

Installation

# From PyPI
pip install hdds

# From source
git clone https://github.com/example/hdds.git
cd hdds
pip install ./crates/hdds-py

# With async support
pip install hdds[async]
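
A quick way to confirm the bindings installed correctly is to import them and print the installed version via the standard library (this assumes the distribution name hdds used in the pip commands above):

# sanity_check.py - verify the bindings import and report the installed version
import importlib.metadata

import hdds  # raises ImportError if the bindings are missing

print("hdds version:", importlib.metadata.version("hdds"))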

Quick Start

import hdds
from dataclasses import dataclass

@dataclass
class SensorData:
    sensor_id: int
    value: float
    timestamp: int

# Create participant
participant = hdds.DomainParticipant(domain_id=0)

# Create topic
topic = participant.create_topic("SensorTopic", SensorData)

# Create writer
publisher = participant.create_publisher()
writer = publisher.create_datawriter(topic)

# Publish
writer.write(SensorData(sensor_id=1, value=25.5, timestamp=0))
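
The matching read side is just as short. A minimal sketch using the subscriber API documented later in this reference (create_subscriber, create_datareader, and take):

# Minimal read side for the Quick Start publisher above
subscriber = participant.create_subscriber()
reader = subscriber.create_datareader(topic)

for sample, info in reader.take():
    if info.valid_data:
        print(f"Sensor {sample.sensor_id}: {sample.value}")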

Domain Participant

Creation

import hdds
from hdds import DomainParticipant, DomainParticipantQos

# Default configuration
participant = DomainParticipant(domain_id=0)

# With QoS
qos = DomainParticipantQos(
    entity_name="MySensor",
    # Transport settings
    enable_shared_memory=True,
    enable_udp=True,
)
participant = DomainParticipant(domain_id=0, qos=qos)

# Context manager (auto cleanup)
with DomainParticipant(domain_id=0) as participant:
    # Use participant...
    pass  # Automatically closed

Properties and Methods

# Properties
domain_id: int = participant.domain_id
instance_handle: InstanceHandle = participant.instance_handle
guid: GUID = participant.guid

# Enable (if created disabled)
participant.enable()

# Close explicitly
participant.close()

# Check if closed
if participant.is_closed:
print("Participant is closed")

Topics

Type Registration

from dataclasses import dataclass
from typing import Optional, List

@dataclass
class SensorData:
"""Sensor reading with optional metadata."""
sensor_id: int # uint32
value: float # float32
timestamp: int # uint64
unit: Optional[str] = None # @optional string

@dataclass
class RobotPosition:
"""Robot position with key field."""
robot_id: int # @key uint32
x: float
y: float
z: float
heading: float
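
The List import above is there for sequence-valued fields. A sketch of one possible declaration, assuming List[float] maps to an IDL sequence<float> (the LidarScan type here is hypothetical, for illustration only):

from dataclasses import dataclass, field
from typing import List

@dataclass
class LidarScan:
    """Hypothetical type with a sequence-valued field."""
    scan_id: int                                       # uint32
    ranges: List[float] = field(default_factory=list)  # sequence<float> (assumed mapping)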

Topic Creation

from hdds import Topic, TopicQos, Reliability, Durability

# Default QoS
topic = participant.create_topic("SensorTopic", SensorData)

# With QoS
topic_qos = TopicQos(
    reliability=Reliability.RELIABLE,
    durability=Durability.TRANSIENT_LOCAL,
)
topic = participant.create_topic(
name="SensorTopic",
data_type=SensorData,
qos=topic_qos
)

# Topic properties
print(f"Topic: {topic.name}, Type: {topic.type_name}")

Publisher

from hdds import Publisher, PublisherQos, Partition

# Default
publisher = participant.create_publisher()

# With QoS
pub_qos = PublisherQos(
partition=Partition(["sensors", "telemetry"])
)
publisher = participant.create_publisher(qos=pub_qos)

DataWriter

Creation

from hdds import DataWriter, DataWriterQos, Reliability, History, Duration

# Default QoS
writer = publisher.create_datawriter(topic)

# With QoS
writer_qos = DataWriterQos(
    reliability=Reliability.RELIABLE,
    history=History.keep_last(10),
    deadline=Duration.from_millis(100),
)
writer = publisher.create_datawriter(topic, qos=writer_qos)

# Type-safe writer
writer: DataWriter[SensorData] = publisher.create_datawriter(topic)

Write Operations

import time

# Simple write
sample = SensorData(sensor_id=1, value=25.5, timestamp=time.time_ns())
writer.write(sample)

# Write with timestamp
writer.write(sample, timestamp=time.time_ns())

# Write with instance handle (for keyed topics)
handle = writer.register_instance(sample)
writer.write(sample, instance_handle=handle)

# Batch write
samples = [
    SensorData(sensor_id=1, value=float(v), timestamp=time.time_ns())
    for v in range(100)
]
for sample in samples:
    writer.write(sample)
writer.flush()  # Force send

Instance Management

# Register instance (for keyed topics)
sample = RobotPosition(robot_id=1, x=0, y=0, z=0, heading=0)
handle = writer.register_instance(sample)

# Update instance
sample.x = 10.0
writer.write(sample, instance_handle=handle)

# Dispose instance (mark as deleted)
writer.dispose(sample, instance_handle=handle)

# Unregister instance
writer.unregister_instance(sample, instance_handle=handle)
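
On the reader side, a dispose shows up through the instance state carried in SampleInfo. A hedged sketch, assuming the InstanceState enum (introduced under Filtering below) exposes the standard DDS NOT_ALIVE_DISPOSED value:

from hdds import InstanceState

# Reader-side view of a dispose (assumes a standard NOT_ALIVE_DISPOSED member
# exists on hdds.InstanceState)
for sample, info in reader.take():
    if info.instance_state == InstanceState.NOT_ALIVE_DISPOSED:
        print(f"Instance {info.instance_handle} was disposed by the writer")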

Status

from hdds import Duration

# Publication matched
status = writer.publication_matched_status
print(f"Matched readers: {status.current_count}")

# Offered deadline missed
status = writer.offered_deadline_missed_status
if status.total_count > 0:
print(f"Missed deadlines: {status.total_count}")

# Wait for acknowledgments
writer.wait_for_acknowledgments(timeout=Duration.from_secs(5))
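
Whether the acknowledgments arrive in time is easiest to handle via exceptions; a sketch assuming wait_for_acknowledgments raises hdds.TimeoutError on expiry, mirroring the write() behaviour shown under Error Handling below:

from hdds import Duration, TimeoutError

try:
    writer.wait_for_acknowledgments(timeout=Duration.from_secs(5))
except TimeoutError:
    # Assumption: a timeout surfaces as hdds.TimeoutError, as it does for write()
    print("Not all matched readers acknowledged within 5 s")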

Subscriber

from hdds import Subscriber, SubscriberQos, Partition

# Default
subscriber = participant.create_subscriber()

# With QoS
sub_qos = SubscriberQos(
partition=Partition(["sensors"])
)
subscriber = participant.create_subscriber(qos=sub_qos)

DataReader

Creation

from hdds import DataReader, DataReaderQos, Reliability, History

# Default QoS
reader = subscriber.create_datareader(topic)

# With QoS
reader_qos = DataReaderQos(
    reliability=Reliability.RELIABLE,
    history=History.keep_last(100),
)
reader = subscriber.create_datareader(topic, qos=reader_qos)

# Type-safe reader
reader: DataReader[SensorData] = subscriber.create_datareader(topic)

Read Operations

# Take all available (removes from cache)
samples = reader.take()
for sample, info in samples:
    if info.valid_data:
        print(f"Sensor {sample.sensor_id}: {sample.value}")

# Read all available (keeps in cache)
samples = reader.read()

# Take with limit
samples = reader.take(max_samples=10)

# Read one sample
sample, info = reader.read_one()
if info.valid_data:
    process(sample)

# Take specific instance
samples = reader.take_instance(instance_handle=handle)

Filtering

from hdds import SampleState, ViewState, InstanceState

# Read only new samples
samples = reader.read(
    sample_states=SampleState.NOT_READ,
    view_states=ViewState.ANY,
    instance_states=InstanceState.ALIVE
)

# Read only alive instances
samples = reader.take(
    instance_states=InstanceState.ALIVE
)

Sample Info

for sample, info in reader.take():
print(f"Valid: {info.valid_data}")
print(f"Sample state: {info.sample_state}")
print(f"View state: {info.view_state}")
print(f"Instance state: {info.instance_state}")
print(f"Source timestamp: {info.source_timestamp}")
print(f"Instance handle: {info.instance_handle}")
print(f"Publication handle: {info.publication_handle}")

Polling

# Polling loop
while running:
    samples = reader.take()
    if samples:
        for sample, info in samples:
            if info.valid_data:
                process(sample)
    time.sleep(0.001)  # 1 ms

WaitSet

from hdds import WaitSet, StatusCondition, StatusMask, Duration

# Create waitset
waitset = WaitSet()

# Attach reader status condition
condition = reader.status_condition
condition.enabled_statuses = StatusMask.DATA_AVAILABLE
waitset.attach(condition)

# Wait loop
while running:
    active = waitset.wait(timeout=Duration.from_secs(1))
    for cond in active:
        if cond == condition:
            samples = reader.take()
            for sample, info in samples:
                if info.valid_data:
                    process(sample)

# Detach and cleanup
waitset.detach(condition)

Async API

import asyncio
import time

from hdds import DomainParticipant
from hdds.async_api import AsyncDataReader, AsyncDataWriter

async def publisher():
    participant = DomainParticipant(domain_id=0)
    topic = participant.create_topic("SensorTopic", SensorData)
    pub = participant.create_publisher()
    writer: AsyncDataWriter[SensorData] = pub.create_async_datawriter(topic)

    for i in range(100):
        sample = SensorData(sensor_id=1, value=float(i), timestamp=time.time_ns())
        await writer.write(sample)
        await asyncio.sleep(0.1)

async def subscriber():
    participant = DomainParticipant(domain_id=0)
    topic = participant.create_topic("SensorTopic", SensorData)
    sub = participant.create_subscriber()
    reader: AsyncDataReader[SensorData] = sub.create_async_datareader(topic)

    async for sample, info in reader:
        if info.valid_data:
            print(f"Received: {sample}")

async def main():
    await asyncio.gather(publisher(), subscriber())

asyncio.run(main())
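
Since the async reader iterates forever, a bounded demo run can be arranged with plain asyncio, no HDDS-specific API required; for example, cancelling the subscriber once the publisher coroutine finishes:

async def main_bounded():
    # Run the publisher to completion, then cancel the endless subscriber loop.
    sub_task = asyncio.create_task(subscriber())
    await publisher()
    sub_task.cancel()
    try:
        await sub_task
    except asyncio.CancelledError:
        pass

asyncio.run(main_bounded())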

Listeners

from hdds import DataWriterListener, DataReaderListener

class MyWriterListener(DataWriterListener):
    def on_publication_matched(self, writer, status):
        print(f"Matched readers: {status.current_count}")

    def on_offered_deadline_missed(self, writer, status):
        print(f"Deadline missed: {status.total_count}")

class MyReaderListener(DataReaderListener):
    def on_data_available(self, reader):
        samples = reader.take()
        for sample, info in samples:
            if info.valid_data:
                self.process(sample)

    def on_subscription_matched(self, reader, status):
        print(f"Matched writers: {status.current_count}")

# Attach listeners
writer.set_listener(MyWriterListener())
reader.set_listener(MyReaderListener())

QoS Classes

Reliability

from hdds import Reliability, Duration

# Best effort
qos = DataWriterQos(reliability=Reliability.BEST_EFFORT)

# Reliable with timeout
qos = DataWriterQos(
    reliability=Reliability.RELIABLE,
    max_blocking_time=Duration.from_millis(100)
)

History

from hdds import History

# Keep last N samples
qos = DataWriterQos(history=History.keep_last(10))

# Keep all samples
qos = DataWriterQos(history=History.keep_all())

Durability

from hdds import Durability

qos = DataWriterQos(
    durability=Durability.VOLATILE  # No persistence
    # durability=Durability.TRANSIENT_LOCAL  # Persist for late joiners
    # durability=Durability.TRANSIENT  # Persist in memory
    # durability=Durability.PERSISTENT  # Persist to disk
)

Complete QoS Example

from hdds import (
    DataWriterQos, Reliability, History, Durability,
    Deadline, Liveliness, Duration
)

qos = DataWriterQos(
    reliability=Reliability.RELIABLE,
    history=History.keep_last(100),
    durability=Durability.TRANSIENT_LOCAL,
    deadline=Duration.from_millis(100),
    liveliness=Liveliness.manual_by_topic(
        lease_duration=Duration.from_secs(1)
    ),
)

Error Handling

from hdds import HddsError, TimeoutError, PreconditionNotMetError

try:
    writer.write(sample)
except TimeoutError:
    print("Write timed out - readers not keeping up")
except PreconditionNotMetError as e:
    print(f"Precondition failed: {e}")
except HddsError as e:
    print(f"DDS error: {e}")

Type Hints

from typing import Optional
from hdds import DataWriter, DataReader, DomainParticipant

def create_sensor_writer(
    participant: DomainParticipant
) -> DataWriter[SensorData]:
    topic = participant.create_topic("SensorTopic", SensorData)
    publisher = participant.create_publisher()
    return publisher.create_datawriter(topic)

def read_sensors(
    reader: DataReader[SensorData]
) -> list[SensorData]:
    return [
        sample
        for sample, info in reader.take()
        if info.valid_data
    ]

Complete Example

#!/usr/bin/env python3
"""HDDS Python example: Sensor publisher and subscriber."""

import time
import signal
import threading
from dataclasses import dataclass

import hdds
from hdds import (
    DomainParticipant, DataWriterQos, DataReaderQos,
    Reliability, History, Duration
)

@dataclass
class SensorData:
    sensor_id: int
    value: float
    timestamp: int

running = True

def signal_handler(sig, frame):
    global running
    running = False

def publisher_thread(participant: DomainParticipant):
    topic = participant.create_topic("SensorTopic", SensorData)
    publisher = participant.create_publisher()

    qos = DataWriterQos(
        reliability=Reliability.RELIABLE,
        history=History.keep_last(10)
    )
    writer = publisher.create_datawriter(topic, qos=qos)

    count = 0
    while running:
        sample = SensorData(
            sensor_id=1,
            value=20.0 + (count % 20),
            timestamp=time.time_ns()
        )
        writer.write(sample)
        print(f"Published: sensor={sample.sensor_id} value={sample.value:.1f}")
        count += 1
        time.sleep(0.1)

def subscriber_thread(participant: DomainParticipant):
    topic = participant.create_topic("SensorTopic", SensorData)
    subscriber = participant.create_subscriber()

    qos = DataReaderQos(
        reliability=Reliability.RELIABLE,
        history=History.keep_last(100)
    )
    reader = subscriber.create_datareader(topic, qos=qos)

    while running:
        samples = reader.take()
        for sample, info in samples:
            if info.valid_data:
                print(f"Received: sensor={sample.sensor_id} value={sample.value:.1f}")
        time.sleep(0.01)

def main():
    signal.signal(signal.SIGINT, signal_handler)

    with DomainParticipant(domain_id=0) as participant:
        pub_thread = threading.Thread(
            target=publisher_thread, args=(participant,))
        sub_thread = threading.Thread(
            target=subscriber_thread, args=(participant,))

        pub_thread.start()
        sub_thread.start()

        pub_thread.join()
        sub_thread.join()

if __name__ == "__main__":
    main()

Next Steps