Aller au contenu principal

Publishers et Subscribers

Dans HDDS, les DataWriters publient des données et les DataReaders s'abonnent aux données. Le Participant crée les Topics, et les Topics créent les Writers et Readers.

Vue d'ensemble

Participant
+-- Topic<SensorData> ("SensorTopic")
|   +-- DataWriter (écrit des échantillons)
|   +-- DataReader (lit des échantillons)
|
+-- Topic<Command> ("CommandTopic")
    +-- DataWriter (écrit des échantillons)
    +-- DataReader (lit des échantillons)

Créer des Writers et Readers

Patron de base

use hdds::{Participant, QoS, TransportMode};

// Build a participant named "app" on domain 0 using the UDP multicast transport.
// Every `?` below propagates an error, so this snippet must live inside a
// function whose return type can carry those errors.
let participant = Participant::builder("app")
.domain_id(0)
.with_transport(TransportMode::UdpMulticast)
.build()?;

// Create topic: `SensorData` is the sample type, "SensorTopic" is the topic name.
// `SensorData` is not declared in this snippet; it must be defined elsewhere.
let topic = participant.topic::<SensorData>("SensorTopic")?;

// Create writer (no `.qos(...)` call, so presumably the builder's default QoS
// applies — confirm against the HDDS API docs)
let writer = topic.writer().build()?;

// Create reader on the same topic, again without an explicit QoS
let reader = topic.reader().build()?;

Avec QoS

use hdds::{Participant, QoS, TransportMode};

// Same participant setup as the basic pattern: name "app", domain 0, UDP multicast.
let participant = Participant::builder("app")
.domain_id(0)
.with_transport(TransportMode::UdpMulticast)
.build()?;

// Both endpoints below are created from this single typed topic.
let topic = participant.topic::<SensorData>("SensorTopic")?;

// Writer with reliable QoS, refined with keep_last(100) and transient_local().
// NOTE(review): presumably a history depth of 100 samples plus transient-local
// durability — confirm exact semantics in the HDDS QoS docs.
let writer = topic
.writer()
.qos(QoS::reliable().keep_last(100).transient_local())
.build()?;

// Reader with reliable QoS and keep_last(100); unlike the writer, it does not
// request transient_local().
let reader = topic
.reader()
.qos(QoS::reliable().keep_last(100))
.build()?;

DataWriter

Les DataWriters sont les endpoints qui publient des données.

Écrire des données

use hdds::{Participant, QoS, DDS, TransportMode};

// Sample type published on "SensorTopic".
// `Debug` is needed by the `{:?}` printing used in the reader examples;
// the `DDS` derive comes from the `hdds` crate and presumably generates the
// glue that lets the struct be used as a topic type — confirm in the HDDS docs.
#[derive(Debug, Clone, DDS)]
struct SensorData {
sensor_id: u32,
value: f32,
// Unit of the timestamp is not specified in this example.
timestamp: u64,
}

// Participant named "sensor_publisher" on domain 0 over UDP multicast.
let participant = Participant::builder("sensor_publisher")
.domain_id(0)
.with_transport(TransportMode::UdpMulticast)
.build()?;

// Typed topic, then a writer configured with the reliable QoS preset.
let topic = participant.topic::<SensorData>("SensorTopic")?;
let writer = topic.writer().qos(QoS::reliable()).build()?;

// Basic write: build one sample and hand it to the writer by reference.
let sample = SensorData {
sensor_id: 1,
value: 42.5,
timestamp: 1234567890,
};
// `write` is fallible; the error is propagated to the caller with `?`.
writer.write(&sample)?;

Statut du Writer

// Check matched readers.
// `writer` is a DataWriter created as in the examples above; the returned
// collection is only used for its length here.
let matched = writer.matched_subscriptions();
println!("Matched {} readers", matched.len());

DataReader

Les DataReaders sont les endpoints qui reçoivent des données.

Lire des données

use hdds::{Participant, QoS, TransportMode};

// Participant named "sensor_subscriber" on domain 0 over UDP multicast.
let participant = Participant::builder("sensor_subscriber")
.domain_id(0)
.with_transport(TransportMode::UdpMulticast)
.build()?;

// `SensorData` is not declared in this snippet; it is the struct defined in
// the writer example. The reader uses the reliable QoS preset.
let topic = participant.topic::<SensorData>("SensorTopic")?;
let reader = topic.reader().qos(QoS::reliable()).build()?;

// Take samples (removes from cache).
// The loop ends as soon as `try_take()` yields `None`, i.e. when no sample is
// currently available; an `Err` is propagated by `?`.
while let Some(sample) = reader.try_take()? {
println!("Received: {:?}", sample);
}

Boucle de polling

use std::time::Duration;
use std::thread;

// Poll forever: there is no `break`, so the loop only ends if `try_take()`
// returns an error, which `?` propagates out of the enclosing function.
// `reader` is a DataReader<SensorData> created as in the previous example.
loop {
// Try to take available samples; the inner loop stops at the first `None`.
while let Some(sample) = reader.try_take()? {
println!("Received: sensor={}, value={}", sample.sensor_id, sample.value);
}

// Small delay to avoid busy-waiting (100 ms between polls)
thread::sleep(Duration::from_millis(100));
}

Alternative : Création directe de Writer/Reader

Vous pouvez aussi créer des Writers et Readers directement depuis le Participant :

use hdds::{Participant, QoS, TransportMode};

// Participant named "app" on domain 0 over UDP multicast.
let participant = Participant::builder("app")
.domain_id(0)
.with_transport(TransportMode::UdpMulticast)
.build()?;

// Create writer directly with topic name and QoS: no intermediate `topic`
// value and no builder; the sample type is given as a type parameter.
let writer = participant
.create_writer::<SensorData>("SensorTopic", QoS::reliable())?;

// Create reader directly with topic name and QoS; it uses the same type and
// topic name as the writer above.
let reader = participant
.create_reader::<SensorData>("SensorTopic", QoS::reliable())?;

Communication intra-processus

Pour la communication dans le même processus (zéro-copie) :

use hdds::{Participant, QoS, TransportMode};

// Same builder as the network examples; only the transport differs
// (`TransportMode::IntraProcess` instead of `UdpMulticast`).
let participant = Participant::builder("intra_process_app")
.domain_id(0)
.with_transport(TransportMode::IntraProcess)
.build()?;

let topic = participant.topic::<SensorData>("internal/data")?;

// Writer and reader are created on the same topic, in the same process.
let writer = topic.writer().qos(QoS::reliable()).build()?;
let reader = topic.reader().qos(QoS::reliable()).build()?;

// Bind reader to writer for in-process delivery.
// NOTE(review): `writer.merger()` presumably exposes the writer's internal
// delivery hub that the reader attaches to — confirm in the HDDS API docs.
reader.bind_to_writer(writer.merger());

// Data is now shared without serialization.
// `sample` is not defined in this snippet: it must be a `SensorData` value,
// built as in the "writing data" example.
writer.write(&sample)?;
// Drain whatever the reader has received so far; stops at the first `None`.
while let Some(data) = reader.try_take()? {
println!("Received: {:?}", data);
}

WaitSet pour la lecture événementielle

Attendre les données efficacement :

use hdds::{Participant, QoS, WaitSet, TransportMode};
use std::time::Duration;

// Participant named "waitset_example" on domain 0 over UDP multicast.
let participant = Participant::builder("waitset_example")
.domain_id(0)
.with_transport(TransportMode::UdpMulticast)
.build()?;

let topic = participant.topic::<SensorData>("SensorTopic")?;
let reader = topic.reader().qos(QoS::reliable()).build()?;

// Get status condition and create waitset.
// The waitset is declared `mut` because `attach` is called on it below.
let condition = reader.get_status_condition();
let mut waitset = WaitSet::new();
waitset.attach(&condition)?;

// Runs forever; only an error propagated by `?` leaves the loop.
loop {
// Wait for data (blocks until data available or timeout, here 5 seconds).
// The returned value is deliberately ignored (`_active`): whether the wait
// ended on data or on timeout, the reader is drained below.
let _active = waitset.wait(Some(Duration::from_secs(5)))?;

// Take all available samples; after a timeout this loop simply finds none.
while let Some(sample) = reader.try_take()? {
println!("Received: {:?}", sample);
}
}

Bonnes pratiques

  1. Faire correspondre les QoS entre Writers et Readers - Assurer la compatibilité
  2. Utiliser les presets QoS appropriés - QoS::reliable() pour la livraison garantie, QoS::best_effort() pour la vitesse
  3. Utiliser try_take() dans les boucles - Patron de lecture non-bloquant
  4. Envisager le WaitSet pour l'efficacité - Éviter l'attente active

Patrons courants

Requête-Réponse

use hdds::{Participant, QoS, DDS, TransportMode};

// Request message sent by the client; `request_id` is echoed in `Reply` so a
// reply can be paired with its request (matching is left to the application —
// nothing in this snippet performs it).
#[derive(Debug, Clone, DDS)]
struct Request {
request_id: u32,
query: String,
}

// Reply message expected back from the service side.
#[derive(Debug, Clone, DDS)]
struct Reply {
request_id: u32,
result: String,
}

let participant = Participant::builder("request_reply")
.domain_id(0)
.with_transport(TransportMode::UdpMulticast)
.build()?;

// Client side: one topic per direction.
let request_topic = participant.topic::<Request>("service/request")?;
let reply_topic = participant.topic::<Reply>("service/reply")?;

// The client writes requests and reads replies, both with the reliable preset.
let request_writer = request_topic.writer().qos(QoS::reliable()).build()?;
let reply_reader = reply_topic.reader().qos(QoS::reliable()).build()?;

// Send request
request_writer.write(&Request { request_id: 1, query: "hello".into() })?;

// Drain any replies that are already available.
// NOTE: `try_take()` is non-blocking, so this loop does not actually wait; if
// no reply has arrived yet it exits immediately. A real client would poll in
// a loop or block on a WaitSet until the reply arrives.
while let Some(reply) = reply_reader.try_take()? {
println!("Got reply: {:?}", reply);
}

Capteurs multiples

use hdds::{Participant, QoS, DDS, TransportMode};

// One reading per sensor, all published on a single topic.
#[derive(Debug, Clone, DDS)]
struct SensorReading {
// NOTE(review): `#[key]` presumably marks `sensor_id` as the DDS instance key,
// so each sensor is tracked as its own instance — confirm in the HDDS docs.
#[key]
sensor_id: u32,
value: f32,
}

let participant = Participant::builder("multi_sensor")
.domain_id(0)
.with_transport(TransportMode::UdpMulticast)
.build()?;

let topic = participant.topic::<SensorReading>("sensors/readings")?;
// Reliable writer with keep_last(10).
// NOTE(review): presumably a history depth of 10 — confirm whether it applies
// per key instance or to the writer as a whole.
let writer = topic.writer().qos(QoS::reliable().keep_last(10)).build()?;

// Write data for multiple sensors: ids 1 through 5 inclusive, one sample each.
for sensor_id in 1..=5 {
// `sensor_id` is used via field-init shorthand; the `as f32` cast is exact
// for these small values, giving 26.0 through 30.0.
writer.write(&SensorReading { sensor_id, value: 25.0 + sensor_id as f32 })?;
}

Prochaines étapes