Skip to main content

Hello World in Python

Python is the fastest way to get started with HDDS. This tutorial takes about 5 minutes.

Time: ~5 minutes. Prerequisites: HDDS Python package installed (see Step 1).

Step 1: Install HDDS

pip install hdds

Step 2: Define the Data Type

Create temperature.py:

from dataclasses import dataclass
from hdds import Topic, Key

@dataclass
class Temperature(Topic):
    """Temperature reading from a sensor.

    The type hints on the fields below describe the data type; ``Key[str]``
    marks ``sensor_id`` as the instance key.
    """

    sensor_id: Key[str]  # Instance key
    value: float  # Temperature in Celsius
    timestamp: int  # Unix timestamp in ms
Note: Python Type Hints

HDDS uses Python type hints to generate the DDS type. The Key[str] annotation marks sensor_id as the instance key.

Step 3: Create the Publisher

Create publisher.py:

#!/usr/bin/env python3
import time
import hdds
from temperature import Temperature

def main():
    """Publish ten temperature readings on ``temperature/room1``.

    Joins domain 0, creates the topic and a DataWriter, waits for a
    subscriber, then writes one ``Temperature`` sample per second for
    ten iterations.
    """
    print("Starting temperature publisher...")

    # 1. Create DomainParticipant
    participant = hdds.DomainParticipant(domain_id=0)
    print("Joined domain 0")

    # 2. Create Topic
    topic = participant.create_topic("temperature/room1", Temperature)
    print("Created topic: temperature/room1")

    # 3. Create Publisher and DataWriter
    publisher = participant.create_publisher()
    writer = publisher.create_writer(topic)
    print("DataWriter created, waiting for subscribers...")

    # 4. Wait for subscribers
    # NOTE(review): the result of this call is not checked, so the message
    # below is printed whenever the call returns -- confirm how a timeout
    # is reported (return value vs. exception).
    writer.wait_for_subscribers(count=1, timeout=30.0)
    print("Subscriber connected!")

    # 5. Publish temperature readings
    for i in range(10):
        temp = Temperature(
            sensor_id="sensor-001",
            value=22.0 + (i * 0.5),  # rises by 0.5 per sample, starting at 22.0
            timestamp=int(time.time() * 1000),  # seconds -> milliseconds
        )

        writer.write(temp)
        print(f"Published: sensor={temp.sensor_id}, temp={temp.value:.1f}°C")

        # Pace the samples at one per second.
        time.sleep(1)

    print("Publisher finished")


if __name__ == "__main__":
    main()

Step 4: Create the Subscriber

Create subscriber.py:

#!/usr/bin/env python3
import hdds
from temperature import Temperature

def main():
    """Print every temperature sample received on ``temperature/room1``.

    Joins domain 0, creates the topic and a DataReader, then loops
    indefinitely: each pass waits up to 5 seconds for data and either
    prints all available samples or reports that nothing arrived.
    The loop has no exit condition, so the script runs until the
    process is interrupted.
    """
    print("Starting temperature subscriber...")

    # 1. Create DomainParticipant
    participant = hdds.DomainParticipant(domain_id=0)
    print("Joined domain 0")

    # 2. Create Topic
    topic = participant.create_topic("temperature/room1", Temperature)
    print("Created topic: temperature/room1")

    # 3. Create Subscriber and DataReader
    subscriber = participant.create_subscriber()
    reader = subscriber.create_reader(topic)
    print("DataReader created, waiting for data...")

    # 4. Read samples in a loop
    while True:
        if reader.wait_for_data(timeout=5.0):
            # Take all available samples
            for sample in reader.take():
                print(
                    f"Received: sensor={sample.sensor_id}, "
                    f"temp={sample.value:.1f}°C, "
                    f"time={sample.timestamp}"
                )
        else:
            print("No data received in 5 seconds, waiting...")


if __name__ == "__main__":
    main()

Step 5: Run

Open two terminals:

# Terminal 1
python subscriber.py

# Terminal 2
python publisher.py

Using Async/Await

HDDS supports Python's async/await for non-blocking I/O:

#!/usr/bin/env python3
import asyncio
import hdds
from temperature import Temperature

async def main():
    """Receive temperature samples with ``async for`` and print each one.

    Joins domain 0, creates the topic and a DataReader, then iterates the
    reader asynchronously, printing the sensor id and value of every
    sample it yields.
    """
    participant = hdds.DomainParticipant(domain_id=0)
    topic = participant.create_topic("temperature/room1", Temperature)
    reader = participant.create_subscriber().create_reader(topic)

    # The reader is consumed as an asynchronous iterator.
    async for sample in reader:
        print(f"Received: {sample.sensor_id} = {sample.value}°C")


if __name__ == "__main__":
    asyncio.run(main())

Using Callbacks

Event-driven style with callbacks:

import hdds
from temperature import Temperature

def on_data(reader):
    """Callback: take every available sample from *reader* and print its value."""
    for sample in reader.take():
        print(f"Received: {sample.value}°C")


participant = hdds.DomainParticipant(0)
topic = participant.create_topic("temperature/room1", Temperature)
reader = participant.create_subscriber().create_reader(topic)

# Register the callback on the reader.
reader.on_data_available(on_data)

# Keep running
hdds.spin()

QoS Configuration

from hdds import QoS, Reliability, Durability, History

# Reliable delivery, transient-local durability, and a keep-last history of 10
qos = QoS(
    reliability=Reliability.RELIABLE,
    durability=Durability.TRANSIENT_LOCAL,
    history=History.keep_last(10),
)

# `publisher` and `topic` are not defined in this snippet; they are the
# objects created in publisher.py (Step 3).
writer = publisher.create_writer(topic, qos=qos)

Complex Types

HDDS supports complex Python types:

from dataclasses import dataclass
from typing import List, Optional
from hdds import Topic, Key

@dataclass
class SensorReading(Topic):
    """Topic type showing sequence, optional and map fields."""

    sensor_id: Key[str]  # Instance key
    values: List[float]  # Sequence
    location: Optional[str]  # Optional field
    metadata: dict[str, str]  # Map

@dataclass
class Point:
    """Plain dataclass with three float coordinates, used as a nested struct."""

    x: float
    y: float
    z: float

@dataclass
class Pose(Topic):
    """Topic type that embeds a ``Point`` as a nested struct."""

    robot_id: Key[str]  # Instance key
    position: Point  # Nested struct
    orientation: List[float]  # Quaternion

Integration with NumPy

import numpy as np
from hdds import Topic, Key
from dataclasses import dataclass

@dataclass
class LidarScan(Topic):
    """Topic type whose payload fields are NumPy arrays."""

    sensor_id: Key[str]  # Instance key
    ranges: np.ndarray  # Will be serialized as sequence<float>
    intensities: np.ndarray


# Write numpy arrays directly
# (`writer` is not defined in this snippet; it must be a DataWriter created
# for a LidarScan topic.)
scan = LidarScan(
    sensor_id="lidar-001",
    ranges=np.random.rand(360) * 10,  # 360 random values in [0, 10)
    intensities=np.random.rand(360),  # 360 random values in [0, 1)
)
writer.write(scan)

What's Next?