Python API Reference
HDDS provides Python bindings through hdds-py, offering a Pythonic API with full type hints and async support.
Installation
# From PyPI
pip install hdds
# From source
git clone https://github.com/example/hdds.git
cd hdds
pip install ./crates/hdds-py
# With async support
pip install hdds[async]
Quick Start
import hdds
from dataclasses import dataclass
@dataclass
class SensorData:
sensor_id: int
value: float
timestamp: int
# Create participant
participant = hdds.DomainParticipant(domain_id=0)
# Create topic
topic = participant.create_topic("SensorTopic", SensorData)
# Create writer
publisher = participant.create_publisher()
writer = publisher.create_datawriter(topic)
# Publish
writer.write(SensorData(sensor_id=1, value=25.5, timestamp=0))
Domain Participant
Creation
import hdds
from hdds import DomainParticipant, DomainParticipantQos
# Default configuration
participant = DomainParticipant(domain_id=0)
# With QoS
qos = DomainParticipantQos(
entity_name="MySensor",
# Transport settings
enable_shared_memory=True,
enable_udp=True,
)
participant = DomainParticipant(domain_id=0, qos=qos)
# Context manager (auto cleanup)
with DomainParticipant(domain_id=0) as participant:
# Use participant...
pass # Automatically closed
Properties and Methods
# Properties
domain_id: int = participant.domain_id
instance_handle: InstanceHandle = participant.instance_handle
guid: GUID = participant.guid
# Enable (if created disabled)
participant.enable()
# Close explicitly
participant.close()
# Check if closed
if participant.is_closed:
print("Participant is closed")
Topics
Type Registration
from dataclasses import dataclass
from typing import Optional, List
@dataclass
class SensorData:
"""Sensor reading with optional metadata."""
sensor_id: int # uint32
value: float # float32
timestamp: int # uint64
unit: Optional[str] = None # @optional string
@dataclass
class RobotPosition:
"""Robot position with key field."""
robot_id: int # @key uint32
x: float
y: float
z: float
heading: float
Topic Creation
from hdds import Topic, TopicQos, Reliability, Durability
# Default QoS
topic = participant.create_topic("SensorTopic", SensorData)
# With QoS
topic_qos = TopicQos(
reliability=Reliability.RELIABLE,
durability=Durability.TRANSIENT_LOCAL,
)
topic = participant.create_topic(
name="SensorTopic",
data_type=SensorData,
qos=topic_qos
)
# Topic properties
print(f"Topic: {topic.name}, Type: {topic.type_name}")
Publisher
from hdds import Publisher, PublisherQos, Partition
# Default
publisher = participant.create_publisher()
# With QoS
pub_qos = PublisherQos(
partition=Partition(["sensors", "telemetry"])
)
publisher = participant.create_publisher(qos=pub_qos)
DataWriter
Creation
from hdds import DataWriter, DataWriterQos, Reliability, History
# Default QoS
writer = publisher.create_datawriter(topic)
# With QoS
writer_qos = DataWriterQos(
reliability=Reliability.RELIABLE,
history=History.keep_last(10),
deadline=Duration.from_millis(100),
)
writer = publisher.create_datawriter(topic, qos=writer_qos)
# Type-safe writer
writer: DataWriter[SensorData] = publisher.create_datawriter(topic)
Write Operations
# Simple write
sample = SensorData(sensor_id=1, value=25.5, timestamp=time.time_ns())
writer.write(sample)
# Write with timestamp
writer.write(sample, timestamp=time.time_ns())
# Write with instance handle (for keyed topics)
handle = writer.register_instance(sample)
writer.write(sample, instance_handle=handle)
# Batch write
samples = [
SensorData(sensor_id=1, value=v, timestamp=time.time_ns())
for v in range(100)
]
for sample in samples:
writer.write(sample)
writer.flush() # Force send
Instance Management
# Register instance (for keyed topics)
sample = RobotPosition(robot_id=1, x=0, y=0, z=0, heading=0)
handle = writer.register_instance(sample)
# Update instance
sample.x = 10.0
writer.write(sample, instance_handle=handle)
# Dispose instance (mark as deleted)
writer.dispose(sample, instance_handle=handle)
# Unregister instance
writer.unregister_instance(sample, instance_handle=handle)
Status
# Publication matched
status = writer.publication_matched_status
print(f"Matched readers: {status.current_count}")
# Offered deadline missed
status = writer.offered_deadline_missed_status
if status.total_count > 0:
print(f"Missed deadlines: {status.total_count}")
# Wait for acknowledgments
writer.wait_for_acknowledgments(timeout=Duration.from_secs(5))
Subscriber
from hdds import Subscriber, SubscriberQos
# Default
subscriber = participant.create_subscriber()
# With QoS
sub_qos = SubscriberQos(
partition=Partition(["sensors"])
)
subscriber = participant.create_subscriber(qos=sub_qos)
DataReader
Creation
from hdds import DataReader, DataReaderQos
# Default QoS
reader = subscriber.create_datareader(topic)
# With QoS
reader_qos = DataReaderQos(
reliability=Reliability.RELIABLE,
history=History.keep_last(100),
)
reader = subscriber.create_datareader(topic, qos=reader_qos)
# Type-safe reader
reader: DataReader[SensorData] = subscriber.create_datareader(topic)
Read Operations
# Take all available (removes from cache)
samples = reader.take()
for sample, info in samples:
if info.valid_data:
print(f"Sensor {sample.sensor_id}: {sample.value}")
# Read all available (keeps in cache)
samples = reader.read()
# Take with limit
samples = reader.take(max_samples=10)
# Read one sample
sample, info = reader.read_one()
if info.valid_data:
process(sample)
# Take specific instance
samples = reader.take_instance(instance_handle=handle)
Filtering
from hdds import SampleState, ViewState, InstanceState
# Read only new samples
samples = reader.read(
sample_states=SampleState.NOT_READ,
view_states=ViewState.ANY,
instance_states=InstanceState.ALIVE
)
# Read only alive instances
samples = reader.take(
instance_states=InstanceState.ALIVE
)
Sample Info
for sample, info in reader.take():
print(f"Valid: {info.valid_data}")
print(f"Sample state: {info.sample_state}")
print(f"View state: {info.view_state}")
print(f"Instance state: {info.instance_state}")
print(f"Source timestamp: {info.source_timestamp}")
print(f"Instance handle: {info.instance_handle}")
print(f"Publication handle: {info.publication_handle}")
Polling
# Polling loop
while running:
samples = reader.take()
if samples:
for sample, info in samples:
if info.valid_data:
process(sample)
time.sleep(0.001) # 1ms
WaitSet
from hdds import WaitSet, StatusCondition
# Create waitset
waitset = WaitSet()
# Attach reader status condition
condition = reader.status_condition
condition.enabled_statuses = StatusMask.DATA_AVAILABLE
waitset.attach(condition)
# Wait loop
while running:
active = waitset.wait(timeout=Duration.from_secs(1))
for cond in active:
if cond == condition:
samples = reader.take()
for sample, info in samples:
if info.valid_data:
process(sample)
# Detach and cleanup
waitset.detach(condition)
Async API
import asyncio
from hdds.async_api import AsyncDataReader, AsyncDataWriter
async def publisher():
participant = DomainParticipant(domain_id=0)
topic = participant.create_topic("SensorTopic", SensorData)
publisher = participant.create_publisher()
writer: AsyncDataWriter[SensorData] = publisher.create_async_datawriter(topic)
for i in range(100):
sample = SensorData(sensor_id=1, value=float(i), timestamp=time.time_ns())
await writer.write(sample)
await asyncio.sleep(0.1)
async def subscriber():
participant = DomainParticipant(domain_id=0)
topic = participant.create_topic("SensorTopic", SensorData)
subscriber = participant.create_subscriber()
reader: AsyncDataReader[SensorData] = subscriber.create_async_datareader(topic)
async for sample, info in reader:
if info.valid_data:
print(f"Received: {sample}")
async def main():
await asyncio.gather(publisher(), subscriber())
asyncio.run(main())
Listeners
from hdds import DataWriterListener, DataReaderListener
class MyWriterListener(DataWriterListener):
def on_publication_matched(self, writer, status):
print(f"Matched readers: {status.current_count}")
def on_offered_deadline_missed(self, writer, status):
print(f"Deadline missed: {status.total_count}")
class MyReaderListener(DataReaderListener):
def on_data_available(self, reader):
samples = reader.take()
for sample, info in samples:
if info.valid_data:
self.process(sample)
def on_subscription_matched(self, reader, status):
print(f"Matched writers: {status.current_count}")
# Attach listeners
writer.set_listener(MyWriterListener())
reader.set_listener(MyReaderListener())
QoS Classes
Reliability
from hdds import Reliability, Duration
# Best effort
qos = DataWriterQos(reliability=Reliability.BEST_EFFORT)
# Reliable with timeout
qos = DataWriterQos(
reliability=Reliability.RELIABLE,
max_blocking_time=Duration.from_millis(100)
)
History
from hdds import History
# Keep last N samples
qos = DataWriterQos(history=History.keep_last(10))
# Keep all samples
qos = DataWriterQos(history=History.keep_all())
Durability
from hdds import Durability
qos = DataWriterQos(
durability=Durability.VOLATILE # No persistence
# durability=Durability.TRANSIENT_LOCAL # Persist for late joiners
# durability=Durability.TRANSIENT # Persist in memory
# durability=Durability.PERSISTENT # Persist to disk
)
Complete QoS Example
from hdds import (
DataWriterQos, Reliability, History, Durability,
Deadline, Liveliness, Duration
)
qos = DataWriterQos(
reliability=Reliability.RELIABLE,
history=History.keep_last(100),
durability=Durability.TRANSIENT_LOCAL,
deadline=Duration.from_millis(100),
liveliness=Liveliness.manual_by_topic(
lease_duration=Duration.from_secs(1)
),
)
Error Handling
from hdds import HddsError, TimeoutError, PreconditionNotMetError
try:
writer.write(sample)
except TimeoutError:
print("Write timed out - readers not keeping up")
except PreconditionNotMetError as e:
print(f"Precondition failed: {e}")
except HddsError as e:
print(f"DDS error: {e}")
Type Hints
from typing import Optional
from hdds import DataWriter, DataReader, DomainParticipant
def create_sensor_writer(
participant: DomainParticipant
) -> DataWriter[SensorData]:
topic = participant.create_topic("SensorTopic", SensorData)
publisher = participant.create_publisher()
return publisher.create_datawriter(topic)
def read_sensors(
reader: DataReader[SensorData]
) -> list[SensorData]:
return [
sample
for sample, info in reader.take()
if info.valid_data
]
Complete Example
#!/usr/bin/env python3
"""HDDS Python example: Sensor publisher and subscriber."""
import time
import signal
import threading
from dataclasses import dataclass
import hdds
from hdds import (
DomainParticipant, DataWriterQos, DataReaderQos,
Reliability, History, Duration
)
@dataclass
class SensorData:
sensor_id: int
value: float
timestamp: int
running = True
def signal_handler(sig, frame):
global running
running = False
def publisher_thread(participant: DomainParticipant):
topic = participant.create_topic("SensorTopic", SensorData)
publisher = participant.create_publisher()
qos = DataWriterQos(
reliability=Reliability.RELIABLE,
history=History.keep_last(10)
)
writer = publisher.create_datawriter(topic, qos=qos)
count = 0
while running:
sample = SensorData(
sensor_id=1,
value=20.0 + (count % 20),
timestamp=time.time_ns()
)
writer.write(sample)
print(f"Published: sensor={sample.sensor_id} value={sample.value:.1f}")
count += 1
time.sleep(0.1)
def subscriber_thread(participant: DomainParticipant):
topic = participant.create_topic("SensorTopic", SensorData)
subscriber = participant.create_subscriber()
qos = DataReaderQos(
reliability=Reliability.RELIABLE,
history=History.keep_last(100)
)
reader = subscriber.create_datareader(topic, qos=qos)
while running:
samples = reader.take()
for sample, info in samples:
if info.valid_data:
print(f"Received: sensor={sample.sensor_id} value={sample.value:.1f}")
time.sleep(0.01)
def main():
signal.signal(signal.SIGINT, signal_handler)
with DomainParticipant(domain_id=0) as participant:
pub_thread = threading.Thread(
target=publisher_thread, args=(participant,))
sub_thread = threading.Thread(
target=subscriber_thread, args=(participant,))
pub_thread.start()
sub_thread.start()
pub_thread.join()
sub_thread.join()
if __name__ == "__main__":
main()
Next Steps
- Hello World Python - Complete tutorial
- C++ API - C++ language bindings
- C API - C language bindings