Python API Reference

HDDS provides Python bindings through the hdds package, offering a Pythonic API with context managers and fluent QoS configuration.

Version 1.0.0

This page documents the v1.0.0 SDK. The API operates on raw bytes; use hdds_gen to generate typed serialization code.

Installation

# Clone and build from source
git clone https://git.hdds.io/hdds/hdds.git
cd hdds/sdk/python

pip install maturin
maturin develop # Development install
# Or: maturin build # Build wheel
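
After building, a quick smoke test confirms the extension imports cleanly (a minimal sketch using only the logging call shown in Quick Start below):

# Smoke test: import the freshly built package
import hdds

hdds.logging.init(hdds.LogLevel.INFO)
print("hdds imported successfully")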

Quick Start

import hdds

# Initialize logging
hdds.logging.init(hdds.LogLevel.INFO)

# Create participant with context manager
with hdds.Participant("my_app") as p:
    # Fluent QoS builder
    qos = hdds.QoS.reliable().transient_local().history_depth(10)

    # Create writer
    writer = p.create_writer("hello/world", qos=qos)

    # Publish raw bytes
    writer.write(b"Hello, DDS!")

Participant

Entry point for all DDS operations.

Creation

from hdds import Participant

# Basic creation (domain 0)
participant = Participant("my_app")

# With domain ID
participant = Participant("my_app", domain_id=42)

# With discovery disabled (intra-process only)
participant = Participant("my_app", enable_discovery=False)

Context Manager

# Recommended: auto-cleanup with context manager
with Participant("my_app") as p:
    writer = p.create_writer("topic")
    writer.write(b"data")
# Participant and all entities automatically closed

Properties

name: str = participant.name          # Participant name
domain_id: int = participant.domain_id # Domain ID
pid: int = participant.participant_id # Unique ID within domain

Creating Writers/Readers

# Create writer
writer = participant.create_writer("topic")
writer = participant.create_writer("topic", qos=hdds.QoS.reliable())

# Create reader
reader = participant.create_reader("topic")
reader = participant.create_reader("topic", qos=hdds.QoS.reliable())

Cleanup

# Explicit cleanup (if not using context manager)
participant.close()
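
When a context manager is inconvenient (for example, the participant is owned by a long-lived object), a try/finally sketch keeps cleanup deterministic:

participant = Participant("my_app")
try:
    writer = participant.create_writer("topic")
    writer.write(b"data")
finally:
    # Release the participant and every writer/reader it owns
    participant.close()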

QoS Configuration

Fluent builder API for Quality of Service.

Factory Methods

from hdds import QoS

# Predefined profiles
qos = QoS.default() # BestEffort, Volatile
qos = QoS.reliable() # Reliable delivery
qos = QoS.best_effort() # Fire and forget
qos = QoS.rti_defaults() # RTI Connext compatible

# Load from XML file
qos = QoS.from_file("fastdds_profile.xml")

# Clone existing QoS
qos2 = qos.clone()
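
One use for clone() is deriving per-topic profiles from a shared base without mutating it; a sketch using only builder methods documented below (the topic roles are illustrative):

# Shared base profile
base = QoS.reliable().history_depth(10)

# Derive variants; the base stays unchanged because we clone first
state_qos = base.clone().transient_local()
command_qos = base.clone().deadline_ms(100)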

Fluent Builder

All methods return self for chaining:

qos = QoS.reliable() \
    .transient_local() \
    .history_depth(100) \
    .deadline_ms(100) \
    .deadline_secs(1) \
    .lifespan_ms(5000) \
    .lifespan_secs(5) \
    .liveliness_automatic(lease_secs=1.0) \
    .liveliness_manual_participant(lease_secs=0.5) \
    .liveliness_manual_topic(lease_secs=0.25) \
    .ownership_shared() \
    .ownership_exclusive(strength=100) \
    .partition("sensors") \
    .time_based_filter_ms(10) \
    .latency_budget_ms(50) \
    .transport_priority(10) \
    .resource_limits(
        max_samples=1000,
        max_instances=100,
        max_samples_per_instance=10
    )

Durability

qos.volatile()         # No persistence (default)
qos.transient_local() # Persist for late joiners
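
transient_local is what lets a reader created after the write still receive retained samples; a minimal sketch (topic name and payload are illustrative, and discovery may need a moment before take() returns data):

qos = QoS.reliable().transient_local().history_depth(10)
writer = participant.create_writer("config/settings", qos=qos)
writer.write(b"initial config")

# A reader that joins later can still receive the retained sample
late_reader = participant.create_reader("config/settings", qos=qos)
data = late_reader.take()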

Inspection

qos.is_reliable() -> bool
qos.is_transient_local() -> bool
qos.is_ownership_exclusive() -> bool
qos.get_history_depth() -> int
qos.get_deadline_ns() -> int # 0 = infinite
qos.get_lifespan_ns() -> int # 0 = infinite
qos.get_ownership_strength() -> int
qos.get_liveliness_kind() -> LivelinessKind
qos.get_liveliness_lease_ns() -> int
qos.get_transport_priority() -> int

# String representation
print(qos) # QoS(reliable, transient_local, depth=100)

Enums

from hdds import Reliability, Durability, LivelinessKind, OwnershipKind

class LivelinessKind(Enum):
    AUTOMATIC = 0
    MANUAL_BY_PARTICIPANT = 1
    MANUAL_BY_TOPIC = 2

DataWriter

Writers publish data to a topic.

Creation

# Create with default QoS
writer = participant.create_writer("topic")

# Create with custom QoS
qos = QoS.reliable().transient_local()
writer = participant.create_writer("topic", qos=qos)

Writing Data

# Write raw bytes
writer.write(b"Hello, DDS!")

# Write from bytearray
data = bytearray([1, 2, 3, 4])
writer.write(bytes(data))

Bytes Only

The write() method accepts only bytes. For typed data, serialize first using hdds_gen-generated code.
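
Any serialization that produces bytes is acceptable; for quick experiments without hdds_gen, the standard-library struct module is one option (a sketch, and the ad-hoc layout below is not CDR-compatible with typed peers):

import struct

# Pack a sensor id (uint32) and a reading (float64), little-endian
payload = struct.pack("<Id", 7, 23.5)
writer.write(payload)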

Properties

topic_name: str = writer.topic_name
qos: QoS = writer.qos

DataReader

Readers receive data from a topic.

Creation

# Create with default QoS
reader = participant.create_reader("topic")

# Create with custom QoS
qos = QoS.reliable().history_depth(100)
reader = participant.create_reader("topic", qos=qos)

Taking Data

# Take one sample (non-blocking)
data: bytes | None = reader.take()
if data is not None:
    print(f"Received {len(data)} bytes")

# Take with custom buffer size
data = reader.take(buffer_size=1024)

# Polling loop
while running:
    data = reader.take()
    if data:
        process(data)
    time.sleep(0.001)  # 1ms

Status Condition

# Get status condition for WaitSet integration
condition = reader.get_status_condition()

Properties

topic_name: str = reader.topic_name
qos: QoS = reader.qos

WaitSet

Event-driven waiting for data availability.

Basic Usage

from hdds import WaitSet

# Create waitset
waitset = WaitSet()

# Create reader and attach condition
reader = participant.create_reader("topic")
condition = reader.get_status_condition()
waitset.attach(condition)

# Wait loop
while running:
triggered = waitset.wait(timeout_secs=1.0)
if triggered:
while (data := reader.take()) is not None:
process(data)

# Cleanup
waitset.detach(condition)

Guard Conditions

from hdds import GuardCondition

# Create guard condition for custom signaling
guard = GuardCondition()
waitset.attach(guard)

# Trigger from another thread
import threading
import time

def trigger_later():
    time.sleep(1.0)
    guard.set_trigger_value(True)

threading.Thread(target=trigger_later).start()

# Wait will return when guard is triggered
waitset.wait()

# Cleanup
waitset.detach(guard)

Infinite Wait

waitset.wait()  # Blocks until condition triggered
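
An infinite wait is usually paired with a GuardCondition so another thread (or a signal handler) can unblock it for shutdown; a sketch combining the APIs shown above:

import threading
from hdds import WaitSet, GuardCondition

waitset = WaitSet()
stop = GuardCondition()
waitset.attach(stop)

def request_shutdown():
    stop.set_trigger_value(True)   # wakes the blocked wait()

threading.Timer(2.0, request_shutdown).start()
waitset.wait()                     # returns once the guard fires
waitset.detach(stop)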

Logging

import hdds

# Initialize with level
hdds.logging.init(hdds.LogLevel.INFO)

# Available log levels
class LogLevel(Enum):
    OFF = 0
    ERROR = 1
    WARN = 2
    INFO = 3
    DEBUG = 4
    TRACE = 5

Telemetry

Built-in metrics collection.

Initialize

import hdds

# Initialize global metrics
metrics = hdds.telemetry.init()

# Get existing (if already initialized)
metrics = hdds.telemetry.get() # Returns None if not initialized

Snapshot

snapshot = metrics.snapshot()

print(f"Messages sent: {snapshot.messages_sent}")
print(f"Messages received: {snapshot.messages_received}")
print(f"Messages dropped: {snapshot.messages_dropped}")
print(f"Bytes sent: {snapshot.bytes_sent}")
print(f"Latency P50: {snapshot.latency_p50_ms:.2f}ms")
print(f"Latency P99: {snapshot.latency_p99_ms:.2f}ms")
print(f"Latency P99.9: {snapshot.latency_p999_ms:.2f}ms")

MetricsSnapshot fields:

@dataclass
class MetricsSnapshot:
    timestamp_ns: int
    messages_sent: int
    messages_received: int
    messages_dropped: int
    bytes_sent: int
    latency_p50_ns: int
    latency_p99_ns: int
    latency_p999_ns: int
    merge_full_count: int
    would_block_count: int

    # Convenience properties
    @property
    def latency_p50_ms(self) -> float: ...
    @property
    def latency_p99_ms(self) -> float: ...
    @property
    def latency_p999_ms(self) -> float: ...

Exporter (HDDS Viewer)

# Start telemetry server for HDDS Viewer
exporter = hdds.telemetry.start_exporter("127.0.0.1", 4242)

# ... application runs ...

exporter.stop()
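
To make sure the viewer endpoint is released on error paths, the exporter can be wrapped in try/finally (run_application is a placeholder for your own entry point):

exporter = hdds.telemetry.start_exporter("127.0.0.1", 4242)
try:
    run_application()   # placeholder for application logic
finally:
    exporter.stop()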

Manual Latency Recording

import time

start_ns = time.time_ns()
# ... operation ...
end_ns = time.time_ns()

metrics.record_latency(start_ns, end_ns)

Error Handling

from hdds import HddsException, HddsError

try:
    writer.write(b"data")
except HddsException as e:
    print(f"HDDS error: {e}")

# Error codes
class HddsError(Enum):
    OK = 0
    INVALID_ARGUMENT = 1
    NOT_FOUND = 2
    OPERATION_FAILED = 3
    OUT_OF_MEMORY = 4

Complete Example

#!/usr/bin/env python3
"""HDDS Python SDK example: Basic pub/sub."""

import time
import threading
import hdds

def publisher(participant: hdds.Participant):
    """Publish messages every second."""
    qos = hdds.QoS.reliable().transient_local().history_depth(10)
    writer = participant.create_writer("hello/world", qos=qos)

    for i in range(10):
        message = f"Hello #{i}".encode()
        writer.write(message)
        print(f"Published: {message.decode()}")
        time.sleep(1.0)

def subscriber(participant: hdds.Participant):
    """Subscribe and print messages."""
    qos = hdds.QoS.reliable().history_depth(100)
    reader = participant.create_reader("hello/world", qos=qos)

    waitset = hdds.WaitSet()
    condition = reader.get_status_condition()
    waitset.attach(condition)

    received = 0
    while received < 10:
        if waitset.wait(timeout_secs=5.0):
            while (data := reader.take()) is not None:
                print(f"Received: {data.decode()}")
                received += 1

    waitset.detach(condition)

def main():
    # Initialize logging
    hdds.logging.init(hdds.LogLevel.INFO)

    # Initialize telemetry
    metrics = hdds.telemetry.init()

    with hdds.Participant("example") as participant:
        # Start subscriber thread
        sub_thread = threading.Thread(
            target=subscriber, args=(participant,))
        sub_thread.start()

        # Give subscriber time to initialize
        time.sleep(0.5)

        # Run publisher in main thread
        publisher(participant)

        # Wait for subscriber
        sub_thread.join()

        # Print metrics
        snap = metrics.snapshot()
        print("\nMetrics:")
        print(f"  Messages sent: {snap.messages_sent}")
        print(f"  Messages received: {snap.messages_received}")

if __name__ == "__main__":
    main()

Using with Typed Data

The Python SDK operates on raw bytes. For typed data, use hdds_gen to generate Python serialization code:

hddsgen gen python Temperature.idl -o temperature.py

# temperature.py (generated)
from dataclasses import dataclass

@dataclass
class Temperature:
    sensor_id: int
    value: float
    unit: str

def temperature_encode_cdr2_le(t: Temperature) -> bytes:
    ...

def temperature_decode_cdr2_le(data: bytes) -> Temperature:
    ...

Usage:

from temperature import Temperature, temperature_encode_cdr2_le, temperature_decode_cdr2_le
import hdds

with hdds.Participant("sensor") as p:
    writer = p.create_writer("sensors/temp")
    reader = p.create_reader("sensors/temp")

    # Publish typed data
    temp = Temperature(sensor_id=1, value=23.5, unit="celsius")
    writer.write(temperature_encode_cdr2_le(temp))

    # Receive typed data
    if (data := reader.take()) is not None:
        temp = temperature_decode_cdr2_le(data)
        print(f"Sensor {temp.sensor_id}: {temp.value} {temp.unit}")

Thread Safety

  • Participant creation/close: NOT thread-safe
  • Writer/Reader creation: NOT thread-safe
  • writer.write(): Thread-safe
  • reader.take(): NOT thread-safe (use one reader per thread)
  • QoS methods: NOT thread-safe
  • WaitSet: NOT thread-safe
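
In practice this means: create all entities on one thread up front, share writers freely, and give each consuming thread its own reader and WaitSet. A minimal sketch of that pattern, using only APIs documented above:

import threading
import hdds

with hdds.Participant("threads_demo") as p:
    # Entity creation on the main thread only
    writer = p.create_writer("events")
    readers = [p.create_reader("events") for _ in range(2)]

    def consume(reader):
        # Each thread owns its reader; take() is not thread-safe
        ws = hdds.WaitSet()
        cond = reader.get_status_condition()
        ws.attach(cond)
        if ws.wait(timeout_secs=2.0):
            while (data := reader.take()) is not None:
                print(f"got {data!r}")
        ws.detach(cond)

    threads = [threading.Thread(target=consume, args=(r,)) for r in readers]
    for t in threads:
        t.start()

    writer.write(b"tick")   # write() is thread-safe; any thread may call it
    for t in threads:
        t.join()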

Module Structure

hdds/
├── __init__.py # Main exports
├── participant.py # Participant class
├── qos.py # QoS class and enums
├── entities.py # DataWriter, DataReader
├── waitset.py # WaitSet, GuardCondition
├── logging.py # Logging utilities
├── telemetry.py # Metrics collection
└── _native.py # FFI bindings (internal)

Not Yet Implemented (v1.0.0)

Feature                                        Status
Async API (async/await)                        Not implemented
DataWriterListener / DataReaderListener        Not implemented
Instance management (dispose, unregister)      Not implemented
SampleInfo with metadata                       Not implemented
Typed API (without hdds_gen)                   Not implemented
Content-filtered topics                        Not implemented

Next Steps