Skip to main content

Python API Reference

HDDS provides Python bindings through the hdds package, offering a Pythonic API with context managers and fluent QoS configuration.

Version 1.0.0

This documents the current v1.0.0 SDK. The API operates on raw bytes; use hdds_gen to generate typed serialization code.

Installation

# Clone and build from source
git clone https://git.hdds.io/hdds/hdds.git
cd hdds/sdk/python

pip install maturin
maturin develop # Development install
# Or: maturin build # Build wheel

Quick Start

import hdds

# Initialize logging
hdds.logging.init(hdds.LogLevel.INFO)

# Create participant with context manager
with hdds.Participant("my_app") as p:
# Fluent QoS builder
qos = hdds.QoS.reliable().transient_local().history_depth(10)

# Create writer
writer = p.create_writer("hello/world", qos=qos)

# Publish raw bytes
writer.write(b"Hello, DDS!")

Participant

Entry point for all DDS operations.

Creation

from hdds import Participant

# Basic creation (domain 0)
participant = Participant("my_app")

# With domain ID
participant = Participant("my_app", domain_id=42)

# With discovery disabled (intra-process only)
participant = Participant("my_app", enable_discovery=False)

Context Manager

# Recommended: auto-cleanup with context manager
with Participant("my_app") as p:
writer = p.create_writer("topic")
writer.write(b"data")
# Participant and all entities automatically closed

Properties

name: str = participant.name          # Participant name
domain_id: int = participant.domain_id # Domain ID
pid: int = participant.participant_id # Unique ID within domain

Creating Writers/Readers

# Create writer
writer = participant.create_writer("topic")
writer = participant.create_writer("topic", qos=hdds.QoS.reliable())

# Create reader
reader = participant.create_reader("topic")
reader = participant.create_reader("topic", qos=hdds.QoS.reliable())

Cleanup

# Explicit cleanup (if not using context manager)
participant.close()

QoS Configuration

Fluent builder API for Quality of Service.

Factory Methods

from hdds import QoS

# Predefined profiles
qos = QoS.default() # BestEffort, Volatile
qos = QoS.reliable() # Reliable delivery
qos = QoS.best_effort() # Fire and forget
qos = QoS.rti_defaults() # RTI Connext compatible

# Load from XML file
qos = QoS.from_file("fastdds_profile.xml")

# Clone existing QoS
qos2 = qos.clone()

Fluent Builder

All methods return self for chaining:

qos = QoS.reliable() \
.transient_local() \
.history_depth(100) \
.deadline_ms(100) \
.deadline_secs(1) \
.lifespan_ms(5000) \
.lifespan_secs(5) \
.liveliness_automatic(lease_secs=1.0) \
.liveliness_manual_participant(lease_secs=0.5) \
.liveliness_manual_topic(lease_secs=0.25) \
.ownership_shared() \
.ownership_exclusive(strength=100) \
.partition("sensors") \
.time_based_filter_ms(10) \
.latency_budget_ms(50) \
.transport_priority(10) \
.resource_limits(
max_samples=1000,
max_instances=100,
max_samples_per_instance=10
)

Durability

qos.volatile()         # No persistence (default)
qos.transient_local() # Persist for late joiners

Inspection

qos.is_reliable() -> bool
qos.is_transient_local() -> bool
qos.is_ownership_exclusive() -> bool
qos.get_history_depth() -> int
qos.get_deadline_ns() -> int # 0 = infinite
qos.get_lifespan_ns() -> int # 0 = infinite
qos.get_ownership_strength() -> int
qos.get_liveliness_kind() -> LivelinessKind
qos.get_liveliness_lease_ns() -> int
qos.get_transport_priority() -> int

# String representation
print(qos) # QoS(reliable, transient_local, depth=100)

Enums

from hdds import Reliability, Durability, LivelinessKind, OwnershipKind

class LivelinessKind(Enum):
AUTOMATIC = 0
MANUAL_BY_PARTICIPANT = 1
MANUAL_BY_TOPIC = 2

DataWriter

Writers publish data to a topic.

Creation

# Create with default QoS
writer = participant.create_writer("topic")

# Create with custom QoS
qos = QoS.reliable().transient_local()
writer = participant.create_writer("topic", qos=qos)

Writing Data

# Write raw bytes
writer.write(b"Hello, DDS!")

# Write from bytearray
data = bytearray([1, 2, 3, 4])
writer.write(bytes(data))

Bytes Only

The write() method accepts only bytes. For typed data, serialize first using hdds_gen-generated code.

Properties

topic_name: str = writer.topic_name
qos: QoS = writer.qos

DataReader

Readers receive data from a topic.

Creation

# Create with default QoS
reader = participant.create_reader("topic")

# Create with custom QoS
qos = QoS.reliable().history_depth(100)
reader = participant.create_reader("topic", qos=qos)

Taking Data

# Take one sample (non-blocking)
data: bytes | None = reader.take()
if data is not None:
print(f"Received {len(data)} bytes")

# Take with custom buffer size
data = reader.take(buffer_size=1024)

# Polling loop
while running:
data = reader.take()
if data:
process(data)
time.sleep(0.001) # 1ms

Status Condition

# Get status condition for WaitSet integration
condition = reader.get_status_condition()

Properties

topic_name: str = reader.topic_name
qos: QoS = reader.qos

WaitSet

Event-driven waiting for data availability.

Basic Usage

from hdds import WaitSet

# Create waitset
waitset = WaitSet()

# Create reader and attach condition
reader = participant.create_reader("topic")
condition = reader.get_status_condition()
waitset.attach(condition)

# Wait loop
while running:
triggered = waitset.wait(timeout_secs=1.0)
if triggered:
while (data := reader.take()) is not None:
process(data)

# Cleanup
waitset.detach(condition)

Guard Conditions

from hdds import GuardCondition

# Create guard condition for custom signaling
guard = GuardCondition()
waitset.attach(guard)

# Trigger from another thread
import threading

def trigger_later():
time.sleep(1.0)
guard.set_trigger_value(True)

threading.Thread(target=trigger_later).start()

# Wait will return when guard is triggered
waitset.wait()

# Cleanup
waitset.detach(guard)

Infinite Wait

waitset.wait()  # Blocks until condition triggered

Logging

import hdds

# Initialize with level
hdds.logging.init(hdds.LogLevel.INFO)

# Available log levels
class LogLevel(Enum):
OFF = 0
ERROR = 1
WARN = 2
INFO = 3
DEBUG = 4
TRACE = 5

Telemetry

Built-in metrics collection.

Initialize

import hdds

# Initialize global metrics
metrics = hdds.telemetry.init()

# Get existing (if already initialized)
metrics = hdds.telemetry.get() # Returns None if not initialized

Snapshot

snapshot = metrics.snapshot()

print(f"Messages sent: {snapshot.messages_sent}")
print(f"Messages received: {snapshot.messages_received}")
print(f"Messages dropped: {snapshot.messages_dropped}")
print(f"Bytes sent: {snapshot.bytes_sent}")
print(f"Latency P50: {snapshot.latency_p50_ms:.2f}ms")
print(f"Latency P99: {snapshot.latency_p99_ms:.2f}ms")
print(f"Latency P99.9: {snapshot.latency_p999_ms:.2f}ms")

MetricsSnapshot fields:

@dataclass
class MetricsSnapshot:
timestamp_ns: int
messages_sent: int
messages_received: int
messages_dropped: int
bytes_sent: int
latency_p50_ns: int
latency_p99_ns: int
latency_p999_ns: int
merge_full_count: int
would_block_count: int

# Convenience properties
@property
def latency_p50_ms(self) -> float: ...
@property
def latency_p99_ms(self) -> float: ...
@property
def latency_p999_ms(self) -> float: ...

Exporter (HDDS Viewer)

# Start telemetry server for HDDS Viewer
exporter = hdds.telemetry.start_exporter("127.0.0.1", 4242)

# ... application runs ...

exporter.stop()

Manual Latency Recording

import time

start_ns = time.time_ns()
# ... operation ...
end_ns = time.time_ns()

metrics.record_latency(start_ns, end_ns)

Error Handling

from hdds import HddsException, HddsError

try:
writer.write(b"data")
except HddsException as e:
print(f"HDDS error: {e}")

# Error codes
class HddsError(Enum):
OK = 0
INVALID_ARGUMENT = 1
NOT_FOUND = 2
OPERATION_FAILED = 3
OUT_OF_MEMORY = 4

Complete Example

#!/usr/bin/env python3
"""HDDS Python SDK example: Basic pub/sub."""

import time
import threading
import hdds

def publisher(participant: hdds.Participant):
"""Publish messages every second."""
qos = hdds.QoS.reliable().transient_local().history_depth(10)
writer = participant.create_writer("hello/world", qos=qos)

for i in range(10):
message = f"Hello #{i}".encode()
writer.write(message)
print(f"Published: {message.decode()}")
time.sleep(1.0)

def subscriber(participant: hdds.Participant):
"""Subscribe and print messages."""
qos = hdds.QoS.reliable().history_depth(100)
reader = participant.create_reader("hello/world", qos=qos)

waitset = hdds.WaitSet()
condition = reader.get_status_condition()
waitset.attach(condition)

received = 0
while received < 10:
if waitset.wait(timeout_secs=5.0):
while (data := reader.take()) is not None:
print(f"Received: {data.decode()}")
received += 1

waitset.detach(condition)

def main():
# Initialize logging
hdds.logging.init(hdds.LogLevel.INFO)

# Initialize telemetry
metrics = hdds.telemetry.init()

with hdds.Participant("example") as participant:
# Start subscriber thread
sub_thread = threading.Thread(
target=subscriber, args=(participant,))
sub_thread.start()

# Give subscriber time to initialize
time.sleep(0.5)

# Run publisher in main thread
publisher(participant)

# Wait for subscriber
sub_thread.join()

# Print metrics
snap = metrics.snapshot()
print(f"\nMetrics:")
print(f" Messages sent: {snap.messages_sent}")
print(f" Messages received: {snap.messages_received}")

if __name__ == "__main__":
main()

Using with Typed Data

The Python SDK operates on raw bytes. For typed data, use hdds_gen to generate Python serialization code:

hdds_gen gen python Temperature.idl -o temperature.py
# temperature.py (generated)
from dataclasses import dataclass

@dataclass
class Temperature:
sensor_id: int
value: float
unit: str

def temperature_encode_cdr2_le(t: Temperature) -> bytes:
...

def temperature_decode_cdr2_le(data: bytes) -> Temperature:
...

Usage:

from temperature import Temperature, temperature_encode_cdr2_le, temperature_decode_cdr2_le
import hdds

with hdds.Participant("sensor") as p:
writer = p.create_writer("sensors/temp")
reader = p.create_reader("sensors/temp")

# Publish typed data
temp = Temperature(sensor_id=1, value=23.5, unit="celsius")
writer.write(temperature_encode_cdr2_le(temp))

# Receive typed data
if (data := reader.take()) is not None:
temp = temperature_decode_cdr2_le(data)
print(f"Sensor {temp.sensor_id}: {temp.value} {temp.unit}")

Thread Safety

  • Participant creation/close: NOT thread-safe
  • Writer/Reader creation: NOT thread-safe
  • writer.write(): Thread-safe
  • reader.take(): NOT thread-safe (use one reader per thread)
  • QoS methods: NOT thread-safe
  • WaitSet: NOT thread-safe

Module Structure

hdds/
├── __init__.py # Main exports
├── participant.py # Participant class
├── qos.py # QoS class and enums
├── entities.py # DataWriter, DataReader
├── waitset.py # WaitSet, GuardCondition
├── listener.py # ReaderListener, WriterListener, status types
├── logging.py # Logging utilities
├── telemetry.py # Metrics collection
└── _native.py # FFI bindings (internal)

Listeners

Listeners provide callback-based notification for DDS entity events. This is an alternative to the polling-based WaitSet pattern. Import from hdds.listener:

from hdds.listener import ReaderListener, WriterListener

ReaderListener

Base class for DataReader event callbacks. Override the methods you need; unoverridden methods are no-ops.

class ReaderListener:
def on_data_available(self, data: bytes) -> None:
"""Called when new data is available. Receives raw serialized bytes."""
pass

def on_subscription_matched(self, status: SubscriptionMatchedStatus) -> None:
"""Called when the reader matches/unmatches with a writer."""
pass

def on_liveliness_changed(self, status: LivelinessChangedStatus) -> None:
"""Called when liveliness of a matched writer changes."""
pass

def on_sample_lost(self, status: SampleLostStatus) -> None:
"""Called when samples are lost (gap in sequence numbers)."""
pass

def on_sample_rejected(self, status: SampleRejectedStatus) -> None:
"""Called when samples are rejected due to resource limits."""
pass

def on_deadline_missed(self, status: DeadlineMissedStatus) -> None:
"""Called when the requested deadline is missed."""
pass

def on_incompatible_qos(self, status: IncompatibleQosStatus) -> None:
"""Called when QoS is incompatible with a matched writer."""
pass

WriterListener

Base class for DataWriter event callbacks. Override the methods you need.

class WriterListener:
def on_sample_written(self, data: bytes, sequence_number: int) -> None:
"""Called after a sample is successfully written."""
pass

def on_publication_matched(self, status: PublicationMatchedStatus) -> None:
"""Called when the writer matches/unmatches with a reader."""
pass

def on_offered_deadline_missed(self, instance_handle: int) -> None:
"""Called when an offered deadline is missed."""
pass

def on_offered_incompatible_qos(self, policy_id: int, policy_name: str) -> None:
"""Called when QoS is incompatible with a matched reader."""
pass

def on_liveliness_lost(self) -> None:
"""Called when liveliness is lost (MANUAL_BY_* only)."""
pass

Status Types

from hdds.listener import (
SubscriptionMatchedStatus,
PublicationMatchedStatus,
LivelinessChangedStatus,
SampleLostStatus,
SampleRejectedStatus,
DeadlineMissedStatus,
IncompatibleQosStatus,
)
Status                    | Fields
SubscriptionMatchedStatus | total_count, total_count_change, current_count, current_count_change
PublicationMatchedStatus  | total_count, total_count_change, current_count, current_count_change
LivelinessChangedStatus   | alive_count, alive_count_change, not_alive_count, not_alive_count_change
SampleLostStatus          | total_count, total_count_change
SampleRejectedStatus      | total_count, total_count_change, last_reason (0=NotRejected, 1=ResourceLimit, 2=InstanceLimit, 3=SamplesPerInstanceLimit)
DeadlineMissedStatus      | total_count, total_count_change
IncompatibleQosStatus     | total_count, total_count_change, last_policy_id

Installing Listeners

Use set_reader_listener() and set_writer_listener() from hdds.listener to attach a listener. Use the corresponding clear_* functions to remove them.

from hdds.listener import (
set_reader_listener,
clear_reader_listener,
set_writer_listener,
clear_writer_listener,
)

# Install a ReaderListener
set_reader_listener(reader._handle, my_reader_listener)

# Remove a ReaderListener
clear_reader_listener(reader._handle)

# Install a WriterListener
set_writer_listener(writer._handle, my_writer_listener)

# Remove a WriterListener
clear_writer_listener(writer._handle)

Complete Listener Example

import hdds
from hdds.listener import ReaderListener, WriterListener, set_reader_listener

class SensorListener(ReaderListener):
def __init__(self):
self.count = 0

def on_data_available(self, data: bytes):
self.count += 1
print(f"Sample received ({len(data)} bytes)")

def on_subscription_matched(self, status):
print(f"Matched writers: {status.current_count} "
f"(total: {status.total_count})")

def on_sample_lost(self, status):
print(f"WARNING: {status.total_count_change} sample(s) lost")

class WriteTracker(WriterListener):
def on_sample_written(self, data: bytes, sequence_number: int):
print(f"Wrote {len(data)} bytes (seq={sequence_number})")

def on_publication_matched(self, status):
print(f"Matched readers: {status.current_count}")

with hdds.Participant("listener_demo") as p:
qos = hdds.QoS.reliable().transient_local()
reader = p.create_reader("sensors/temp", qos=qos)

listener = SensorListener()
set_reader_listener(reader._handle, listener)

# ... application runs; callbacks fire on background threads ...

Lifetime and Thread Safety
  • Keep a reference to the listener object as long as it is attached. The Python CFUNCTYPE callbacks are prevented from garbage collection internally, but the listener itself must stay alive.
  • Callbacks are invoked from background threads. Implementations should return quickly and must not block.

Not Yet Implemented (v1.0.0)

Feature                                   | Status
Async API (async/await)                   | Not implemented
Instance management (dispose, unregister) | Not implemented
SampleInfo with metadata                  | Not implemented
Typed API (without hdds_gen)              | Not implemented
Content-filtered topics                   | Not implemented

Next Steps