Aller au contenu principal

Hello World en Python

Python est le moyen le plus rapide pour démarrer avec HDDS. Ce tutoriel prend environ 5 minutes.

Durée : ~5 minutes
Prérequis : package Python HDDS installé

Étape 1 : Installer HDDS

# Clone and build HDDS Python bindings
git clone https://git.hdds.io/hdds/hdds.git
cd hdds/sdk/python
# maturin is the build tool for the bindings: install it, then run
# `maturin develop` to build the package and install it into the active
# Python environment (see the maturin documentation).
pip install maturin
maturin develop

Étape 2 : Définir le type de données

Créez temperature.py :

from dataclasses import dataclass
from hdds import Topic, Key

@dataclass
class Temperature(Topic):
    """One temperature measurement reported by a sensor.

    Declared as a dataclass deriving from the HDDS ``Topic`` base; the
    annotated fields below make up the sample.
    """

    # Instance key (marked as such by the ``Key[str]`` annotation).
    sensor_id: Key[str]
    # Temperature in degrees Celsius.
    value: float
    # Unix timestamp in milliseconds.
    timestamp: int
Type hints Python

HDDS utilise les type hints Python pour générer le type DDS. L'annotation Key[str] marque sensor_id comme clé d'instance.

Étape 3 : Créer le Publisher

Créez publisher.py :

#!/usr/bin/env python3
import time
import hdds
from temperature import Temperature

def main():
    """Publish ten temperature samples on ``temperature/room1``.

    Joins domain 0, creates the topic and a DataWriter, waits for a
    subscriber, then writes one ``Temperature`` sample per second.
    """
    print("Starting temperature publisher...")

    # 1. Join DDS domain 0.
    participant = hdds.DomainParticipant(domain_id=0)
    print("Joined domain 0")

    # 2. Declare the topic carrying Temperature samples.
    temperature_topic = participant.create_topic("temperature/room1", Temperature)
    print("Created topic: temperature/room1")

    # 3. A publisher owns the DataWriter used to send samples.
    publisher = participant.create_publisher()
    data_writer = publisher.create_writer(temperature_topic)
    print("DataWriter created, waiting for subscribers...")

    # 4. Wait (up to 30 s) for one subscriber.
    # NOTE(review): the result of wait_for_subscribers() is ignored, so the
    # message below is printed even if the timeout elapsed without a
    # subscriber -- confirm whether the call raises or returns a flag.
    data_writer.wait_for_subscribers(count=1, timeout=30.0)
    print("Subscriber connected!")

    # 5. Send ten readings, starting at 22.0 C and rising 0.5 C per step.
    for step in range(10):
        now_ms = int(time.time() * 1000)
        reading = Temperature(
            sensor_id="sensor-001",
            value=22.0 + (step * 0.5),
            timestamp=now_ms,
        )

        data_writer.write(reading)
        print(f"Published: sensor={reading.sensor_id}, temp={reading.value:.1f}C")

        # Pace the publication at one sample per second.
        time.sleep(1)

    print("Publisher finished")


if __name__ == "__main__":
    main()

Étape 4 : Créer le Subscriber

Créez subscriber.py :

#!/usr/bin/env python3
import hdds
from temperature import Temperature

def main():
    """Print every temperature sample received on ``temperature/room1``.

    Joins domain 0, creates the topic and a DataReader, then loops forever:
    there is no exit condition, so the process runs until it is interrupted.
    """
    print("Starting temperature subscriber...")

    # 1. Join DDS domain 0.
    participant = hdds.DomainParticipant(domain_id=0)
    print("Joined domain 0")

    # 2. Declare the topic carrying Temperature samples.
    temperature_topic = participant.create_topic("temperature/room1", Temperature)
    print("Created topic: temperature/room1")

    # 3. A subscriber owns the DataReader used to receive samples.
    subscriber = participant.create_subscriber()
    data_reader = subscriber.create_reader(temperature_topic)
    print("DataReader created, waiting for data...")

    # 4. Poll for data in 5-second windows.
    while True:
        if not data_reader.wait_for_data(timeout=5.0):
            print("No data received in 5 seconds, waiting...")
            continue

        # Take all available samples and report each one.
        for reading in data_reader.take():
            line = (
                f"Received: sensor={reading.sensor_id}, "
                f"temp={reading.value:.1f}C, "
                f"time={reading.timestamp}"
            )
            print(line)


if __name__ == "__main__":
    main()

Étape 5 : Exécuter

Ouvrez deux terminaux :

# Terminal 1: start the subscriber (it loops until the process is interrupted)
python subscriber.py

# Terminal 2: start the publisher (it waits up to 30 s for a subscriber,
# then sends ten samples, one per second)
python publisher.py

Utiliser Async/Await

HDDS prend en charge la syntaxe async/await de Python pour des E/S non bloquantes :

#!/usr/bin/env python3
import asyncio
import hdds
from temperature import Temperature

async def main():
    """Print each sample obtained by asynchronously iterating a DataReader.

    Joins domain 0 and reads ``Temperature`` samples from
    ``temperature/room1`` with ``async for``.
    """
    participant = hdds.DomainParticipant(domain_id=0)
    topic = participant.create_topic("temperature/room1", Temperature)
    subscriber = participant.create_subscriber()
    reader = subscriber.create_reader(topic)

    # The reader is consumed as an asynchronous iterator of samples.
    async for sample in reader:
        print(f"Received: {sample.sensor_id} = {sample.value}C")


if __name__ == "__main__":
    asyncio.run(main())

Utiliser des callbacks

Style événementiel avec des callbacks :

import hdds
from temperature import Temperature

def on_data(reader):
    """Take the samples currently available on *reader* and print each value."""
    pending = reader.take()
    for sample in pending:
        print(f"Received: {sample.value}C")

# Join domain 0 and create a DataReader for the temperature topic.
participant = hdds.DomainParticipant(0)
topic = participant.create_topic("temperature/room1", Temperature)
reader = (
    participant
    .create_subscriber()
    .create_reader(topic)
)

# Register on_data as the reader's data-available callback.
reader.on_data_available(on_data)

# Keep running
# NOTE(review): presumably spin() blocks and dispatches callbacks -- confirm.
hdds.spin()

Configuration QoS

from hdds import QoS, Reliability, Durability, History

# Reliable delivery, transient-local durability, history of the last 10 samples.
qos = QoS(
    reliability=Reliability.RELIABLE,
    durability=Durability.TRANSIENT_LOCAL,
    history=History.keep_last(10),
)

# `publisher` and `topic` are not defined in this snippet; they are the
# objects created in publisher.py (create_publisher / create_topic).
writer = publisher.create_writer(topic, qos=qos)

Types complexes

HDDS prend en charge les types Python complexes :

from dataclasses import dataclass
from typing import List, Optional
from hdds import Topic, Key

@dataclass
class SensorReading(Topic):
    """Topic type showing sequence, optional and map fields."""

    # Instance key.
    sensor_id: Key[str]
    # Sequence of floats.
    values: List[float]
    # Optional field; it has no default, so it must still be passed explicitly.
    location: Optional[str]
    # Map of string to string.
    # NOTE(review): built-in generic ``dict[str, str]`` needs Python 3.9+,
    # while the other fields use ``typing`` generics.
    metadata: dict[str, str]

@dataclass
class Point:
    """Plain 3-D point.

    This is an ordinary dataclass (it does not derive from ``Topic``); it is
    meant to be embedded as a nested struct inside topic types.
    """

    x: float
    y: float
    z: float

@dataclass
class Pose(Topic):
    """Topic type showing a nested struct field."""

    # Instance key.
    robot_id: Key[str]
    # Nested struct (a ``Point`` dataclass).
    position: Point
    # Quaternion, carried as a list of floats.
    orientation: List[float]

Intégration avec NumPy

import numpy as np
from hdds import Topic, Key
from dataclasses import dataclass

@dataclass
class LidarScan(Topic):
    """Topic type whose payload fields are NumPy arrays."""

    # Instance key.
    sensor_id: Key[str]
    # Will be serialized as sequence<float>.
    ranges: np.ndarray
    # NumPy array, one of the two array payload fields of the scan.
    intensities: np.ndarray

# Write numpy arrays directly: 360 random values per field.
# NOTE(review): np.random.rand returns float64 arrays -- confirm this matches
# the sequence<float> mapping announced on LidarScan.
scan = LidarScan(
    sensor_id="lidar-001",
    ranges=np.random.rand(360) * 10,
    intensities=np.random.rand(360),
)
# `writer` is not defined in this snippet; it must be a DataWriter created
# for a LidarScan topic beforehand.
writer.write(scan)

Prochaines étapes