Publishers and Subscribers
In HDDS, DataWriters publish data and DataReaders subscribe to it. The Participant creates Topics, and Topics create Writers and Readers.
Overview
Participant
+-- Topic<SensorData> ("SensorTopic")
|   +-- DataWriter (writes samples)
|   +-- DataReader (reads samples)
|
+-- Topic<Command> ("CommandTopic")
    +-- DataWriter (writes samples)
    +-- DataReader (reads samples)
Creating Writers and Readers
Basic Pattern
use hdds::{Participant, QoS, TransportMode};
let participant = Participant::builder("app")
    .domain_id(0)
    .with_transport(TransportMode::UdpMulticast)
    .build()?;
// Create topic
let topic = participant.topic::<SensorData>("SensorTopic")?;
// Create writer
let writer = topic.writer().build()?;
// Create reader
let reader = topic.reader().build()?;
With QoS
use hdds::{Participant, QoS, TransportMode};
let participant = Participant::builder("app")
    .domain_id(0)
    .with_transport(TransportMode::UdpMulticast)
    .build()?;
let topic = participant.topic::<SensorData>("SensorTopic")?;
// Writer with reliable QoS
let writer = topic
    .writer()
    .qos(QoS::reliable().keep_last(100).transient_local())
    .build()?;
// Reader with reliable QoS
let reader = topic
    .reader()
    .qos(QoS::reliable().keep_last(100))
    .build()?;
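The `keep_last(100)` setting above bounds how many samples are retained. The following is a minimal sketch of that idea in plain Rust, assuming the usual DDS meaning of a KEEP_LAST history (the oldest sample is dropped once the depth is reached). It is not HDDS's internal implementation, and `KeepLastHistory` is a hypothetical name used only for illustration.

```rust
use std::collections::VecDeque;

/// Hypothetical illustration of a KEEP_LAST history: holds at most `depth`
/// samples and evicts the oldest one when a new sample arrives at capacity.
pub struct KeepLastHistory<T> {
    depth: usize,
    samples: VecDeque<T>,
}

impl<T> KeepLastHistory<T> {
    pub fn new(depth: usize) -> Self {
        assert!(depth > 0, "depth must be at least 1");
        Self {
            depth,
            samples: VecDeque::with_capacity(depth),
        }
    }

    /// Stores a sample and returns the evicted oldest sample if the history was full.
    pub fn push(&mut self, sample: T) -> Option<T> {
        let evicted = if self.samples.len() == self.depth {
            self.samples.pop_front()
        } else {
            None
        };
        self.samples.push_back(sample);
        evicted
    }

    /// Number of samples currently retained.
    pub fn len(&self) -> usize {
        self.samples.len()
    }

    /// Iterates over retained samples from oldest to newest.
    pub fn iter(&self) -> impl Iterator<Item = &T> {
        self.samples.iter()
    }
}
```

With a depth of 3, pushing five samples leaves only the last three, which is why a late-joining reader on a `transient_local()` writer can only ever receive as many past samples as the history depth allows.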
DataWriter
DataWriters are the endpoints that publish data.
Writing Data
use hdds::{Participant, QoS, DDS, TransportMode};
#[derive(Debug, Clone, DDS)]
struct SensorData {
    sensor_id: u32,
    value: f32,
    timestamp: u64,
}
let participant = Participant::builder("sensor_publisher")
    .domain_id(0)
    .with_transport(TransportMode::UdpMulticast)
    .build()?;
let topic = participant.topic::<SensorData>("SensorTopic")?;
let writer = topic.writer().qos(QoS::reliable()).build()?;
// Basic write
let sample = SensorData {
    sensor_id: 1,
    value: 42.5,
    timestamp: 1234567890,
};
writer.write(&sample)?;
Writer Status
// Check matched readers
let matched = writer.matched_subscriptions();
println!("Matched {} readers", matched.len());
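A common use of the matched-reader count is to hold off the first write until at least one reader has been discovered. The helper below is a minimal sketch of that pattern in plain Rust. `wait_until` is a hypothetical name, not part of HDDS; it only polls a caller-supplied closure until it returns true or a timeout expires.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Polls `condition` every `interval` until it returns true or `timeout` elapses.
/// Returns true if the condition was met in time, false on timeout.
pub fn wait_until<F>(mut condition: F, timeout: Duration, interval: Duration) -> bool
where
    F: FnMut() -> bool,
{
    let deadline = Instant::now() + timeout;
    loop {
        if condition() {
            return true;
        }
        if Instant::now() >= deadline {
            return false;
        }
        thread::sleep(interval);
    }
}
```

Assuming `matched_subscriptions()` behaves as in the snippet above, the closure could be `|| writer.matched_subscriptions().len() > 0`, with a timeout of a few seconds to cover discovery.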
DataReader
DataReaders are the endpoints that receive data.
Reading Data
use hdds::{Participant, QoS, TransportMode};
let participant = Participant::builder("sensor_subscriber")
    .domain_id(0)
    .with_transport(TransportMode::UdpMulticast)
    .build()?;
let topic = participant.topic::<SensorData>("SensorTopic")?;
let reader = topic.reader().qos(QoS::reliable()).build()?;
// Take samples (removes from cache)
while let Some(sample) = reader.try_take()? {
    println!("Received: {:?}", sample);
}
Polling Loop
use std::time::Duration;
use std::thread;
loop {
    // Try to take available samples
    while let Some(sample) = reader.try_take()? {
        println!("Received: sensor={}, value={}", sample.sensor_id, sample.value);
    }
    // Small delay to avoid busy-waiting
    thread::sleep(Duration::from_millis(100));
}
Alternative: Creating Writers and Readers Directly
You can also create Writers and Readers directly from the Participant:
use hdds::{Participant, QoS, TransportMode};
let participant = Participant::builder("app")
    .domain_id(0)
    .with_transport(TransportMode::UdpMulticast)
    .build()?;
// Create writer directly with topic name and QoS
let writer = participant
    .create_writer::<SensorData>("SensorTopic", QoS::reliable())?;
// Create reader directly with topic name and QoS
let reader = participant
    .create_reader::<SensorData>("SensorTopic", QoS::reliable())?;
Intra-Process Communication
For communication within the same process (zero-copy):
use hdds::{Participant, QoS, TransportMode};
let participant = Participant::builder("intra_process_app")
    .domain_id(0)
    .with_transport(TransportMode::IntraProcess)
    .build()?;
let topic = participant.topic::<SensorData>("internal/data")?;
let writer = topic.writer().qos(QoS::reliable()).build()?;
let reader = topic.reader().qos(QoS::reliable()).build()?;
// Bind reader to writer for in-process delivery
reader.bind_to_writer(writer.merger());
// Data is now shared without serialization
writer.write(&sample)?;
while let Some(data) = reader.try_take()? {
    println!("Received: {:?}", data);
}
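To make "shared without serialization" concrete, here is a small analogy in plain Rust, not a description of HDDS internals. It assumes zero-copy delivery means the reader sees the same allocation the writer produced, and models that with an `Arc` sent through a standard channel. `Sample` and `shares_allocation` are hypothetical names.

```rust
use std::sync::mpsc;
use std::sync::Arc;

/// Hypothetical payload type for this illustration.
#[derive(Debug, PartialEq)]
pub struct Sample {
    pub sensor_id: u32,
    pub value: f32,
}

/// Sends one sample through an in-process channel and reports whether the
/// receiver observed the very same allocation (no copy, no serialization).
pub fn shares_allocation(sample: Sample) -> bool {
    let original = Arc::new(sample);
    let (tx, rx) = mpsc::channel::<Arc<Sample>>();
    // Only the reference-counted pointer crosses the channel, not the payload bytes.
    tx.send(Arc::clone(&original)).expect("receiver is alive");
    let received = rx.recv().expect("sender is alive");
    Arc::ptr_eq(&original, &received)
}
```

Over a network transport the sample would instead have to be encoded to bytes and decoded on the other side, which is the cost the intra-process mode avoids.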
WaitSet for Event-Driven Reading
Wait for data efficiently:
use hdds::{Participant, QoS, WaitSet, TransportMode};
use std::time::Duration;
let participant = Participant::builder("waitset_example")
    .domain_id(0)
    .with_transport(TransportMode::UdpMulticast)
    .build()?;
let topic = participant.topic::<SensorData>("SensorTopic")?;
let reader = topic.reader().qos(QoS::reliable()).build()?;
// Get status condition and create waitset
let condition = reader.get_status_condition();
let mut waitset = WaitSet::new();
waitset.attach(&condition)?;
loop {
    // Wait for data (blocks until data available or timeout)
    let _active = waitset.wait(Some(Duration::from_secs(5)))?;
    // Take all available samples
    while let Some(sample) = reader.try_take()? {
        println!("Received: {:?}", sample);
    }
}
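The difference from the polling loop is that the thread sleeps until it is woken, instead of waking every 100 ms to check. The sketch below shows that blocking-with-timeout behavior using only the standard library. It is an analogy for what a wait on a condition does, not how HDDS implements `WaitSet`, and `DataAvailable` is a hypothetical type.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

/// Hypothetical stand-in for a "data available" condition.
#[derive(Clone, Default)]
pub struct DataAvailable {
    inner: Arc<(Mutex<bool>, Condvar)>,
}

impl DataAvailable {
    pub fn new() -> Self {
        Self::default()
    }

    /// Called by the producing side when new data arrives.
    pub fn trigger(&self) {
        let (flag, cvar) = &*self.inner;
        *flag.lock().unwrap() = true;
        cvar.notify_all();
    }

    /// Blocks until triggered or until `timeout` elapses.
    /// Returns true if data became available, false on timeout.
    /// The flag is reset so the next call blocks again.
    pub fn wait(&self, timeout: Duration) -> bool {
        let (flag, cvar) = &*self.inner;
        let guard = flag.lock().unwrap();
        // Sleeps while the flag is false; spurious wakeups are handled by the predicate.
        let (mut guard, _timeout_result) = cvar
            .wait_timeout_while(guard, timeout, |ready| !*ready)
            .unwrap();
        let was_ready = *guard;
        *guard = false;
        was_ready
    }
}
```

As in the loop above, the caller should still drain every available sample after each wakeup, since one notification may cover several samples.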
Best Practices
- Match QoS between Writers and Readers - Ensure compatibility
- Use appropriate QoS presets - QoS::reliable() for guaranteed delivery, QoS::best_effort() for speed
- Use try_take() in loops - Non-blocking read pattern
- Consider WaitSet for efficiency - Avoid busy-waiting
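The first practice follows from the DDS requested-versus-offered rule: a writer and a reader match only if what the writer offers is at least as strong as what the reader requests. The sketch below models that rule for reliability and durability in plain Rust. The types and `is_compatible` are hypothetical illustrations, not HDDS API, and only the two durability kinds that appear on this page are modeled.

```rust
/// Ordered from weakest to strongest, so `>=` expresses "at least as strong".
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Reliability {
    BestEffort,
    Reliable,
}

/// Ordered from weakest to strongest.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Durability {
    Volatile,
    TransientLocal,
}

/// Hypothetical, simplified view of an endpoint's QoS.
#[derive(Debug, Clone, Copy)]
pub struct EndpointQos {
    pub reliability: Reliability,
    pub durability: Durability,
}

/// Returns true when the writer's offered QoS satisfies the reader's requested QoS.
pub fn is_compatible(offered: EndpointQos, requested: EndpointQos) -> bool {
    offered.reliability >= requested.reliability
        && offered.durability >= requested.durability
}
```

Under this rule a reliable writer can serve a best-effort reader, but a best-effort writer cannot serve a reliable reader, so a mismatch in that direction results in no match rather than degraded delivery.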
Common Patterns
Request-Reply
use hdds::{Participant, QoS, DDS, TransportMode};
#[derive(Debug, Clone, DDS)]
struct Request {
    request_id: u32,
    query: String,
}
#[derive(Debug, Clone, DDS)]
struct Reply {
    request_id: u32,
    result: String,
}
let participant = Participant::builder("request_reply")
    .domain_id(0)
    .with_transport(TransportMode::UdpMulticast)
    .build()?;
// Client side
let request_topic = participant.topic::<Request>("service/request")?;
let reply_topic = participant.topic::<Reply>("service/reply")?;
let request_writer = request_topic.writer().qos(QoS::reliable()).build()?;
let reply_reader = reply_topic.reader().qos(QoS::reliable()).build()?;
// Send request
request_writer.write(&Request { request_id: 1, query: "hello".into() })?;
// Poll for a reply (non-blocking: the loop ends immediately if none has arrived yet)
while let Some(reply) = reply_reader.try_take()? {
    println!("Got reply: {:?}", reply);
}
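The `request_id` field is what ties a reply back to its request when several are in flight. The following is a minimal client-side bookkeeping sketch in plain Rust. `PendingRequests` is a hypothetical helper, not part of HDDS, and `Reply` mirrors the struct above without the HDDS derive.

```rust
use std::collections::HashMap;

/// Mirrors the Reply struct above, without the HDDS derive.
#[derive(Debug, Clone, PartialEq)]
pub struct Reply {
    pub request_id: u32,
    pub result: String,
}

/// Tracks outstanding requests so each incoming reply can be matched to its query.
#[derive(Default)]
pub struct PendingRequests {
    next_id: u32,
    pending: HashMap<u32, String>,
}

impl PendingRequests {
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers a query and returns the request_id to put in the outgoing Request.
    pub fn register(&mut self, query: &str) -> u32 {
        self.next_id = self.next_id.wrapping_add(1);
        self.pending.insert(self.next_id, query.to_string());
        self.next_id
    }

    /// Matches a reply to its pending request.
    /// Returns (query, result), or None for an unknown or duplicate reply.
    pub fn complete(&mut self, reply: &Reply) -> Option<(String, String)> {
        self.pending
            .remove(&reply.request_id)
            .map(|query| (query, reply.result.clone()))
    }

    /// Number of requests still waiting for a reply.
    pub fn outstanding(&self) -> usize {
        self.pending.len()
    }
}
```

Each reply taken from `reply_reader` would be passed to `complete`; a `None` result indicates a reply meant for another client or one that was already handled.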
Multiple Sensors
use hdds::{Participant, QoS, DDS, TransportMode};
#[derive(Debug, Clone, DDS)]
struct SensorReading {
    #[key]
    sensor_id: u32,
    value: f32,
}
let participant = Participant::builder("multi_sensor")
    .domain_id(0)
    .with_transport(TransportMode::UdpMulticast)
    .build()?;
let topic = participant.topic::<SensorReading>("sensors/readings")?;
let writer = topic.writer().qos(QoS::reliable().keep_last(10)).build()?;
// Write data for multiple sensors
for sensor_id in 1..=5 {
    writer.write(&SensorReading { sensor_id, value: 25.0 + sensor_id as f32 })?;
}
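The `#[key]` field means each distinct `sensor_id` is treated as its own instance on the same topic. As a rough sketch of what that implies on the receiving side, the plain-Rust structure below keeps the most recent reading per key. `LatestPerSensor` is a hypothetical helper, not HDDS API, and `SensorReading` mirrors the struct above without the HDDS derive.

```rust
use std::collections::BTreeMap;

/// Mirrors the SensorReading struct above, without the HDDS derive.
#[derive(Debug, Clone, PartialEq)]
pub struct SensorReading {
    pub sensor_id: u32,
    pub value: f32,
}

/// Keeps the most recent reading per key (sensor_id): one slot per instance.
#[derive(Default)]
pub struct LatestPerSensor {
    instances: BTreeMap<u32, SensorReading>,
}

impl LatestPerSensor {
    pub fn new() -> Self {
        Self::default()
    }

    /// Stores a reading, replacing any earlier reading with the same key.
    pub fn update(&mut self, reading: SensorReading) {
        self.instances.insert(reading.sensor_id, reading);
    }

    /// Returns the latest reading for one sensor, if any has been received.
    pub fn latest(&self, sensor_id: u32) -> Option<&SensorReading> {
        self.instances.get(&sensor_id)
    }

    /// Number of distinct sensors seen so far.
    pub fn instance_count(&self) -> usize {
        self.instances.len()
    }
}
```

A new value for sensor 3 replaces only sensor 3's slot, so readings from the other sensors are never overwritten by it.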
Next Steps
- QoS Overview - Quality of service policies
- Discovery - How endpoints find each other
- Reliable Delivery Example - Guaranteed delivery