Advanced Transport Features

Network-level configuration for HDDS: DSCP marking, network filtering, TSN, low-bandwidth links, TCP and QUIC transports, and message fragmentation.

DSCP (Differentiated Services)

DSCP marks IP packets for QoS routing by network equipment.

DSCP Classes

| Class | DSCP Value | TOS Byte | Use Case |
|---|---|---|---|
| BestEffort (CS0) | 0 | 0 | Default traffic |
| Af11 | 10 | 40 | High-throughput bulk |
| Af21 | 18 | 72 | Low-latency data |
| Af31 | 26 | 104 | Streaming media |
| Af41 | 34 | 136 | Video/telemetry |
| Ef | 46 | 184 | Real-time, safety-critical |
| Cs6 | 48 | 192 | Network control |
| Cs7 | 56 | 224 | Network control (highest) |
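The TOS column follows directly from the DSCP value: the 6-bit DSCP code occupies the top bits of the 8-bit TOS/Traffic Class byte, so TOS = DSCP << 2. A standalone sketch (not the hdds API):

```rust
/// The 6-bit DSCP code sits in the upper bits of the 8-bit TOS field,
/// so the legacy TOS byte is simply the DSCP value shifted left by 2.
fn dscp_to_tos(dscp: u8) -> u8 {
    dscp << 2
}

fn main() {
    assert_eq!(dscp_to_tos(46), 184); // EF
    assert_eq!(dscp_to_tos(26), 104); // AF31
    assert_eq!(dscp_to_tos(56), 224); // CS7
}
```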

DscpConfig

use hdds::transport::dscp::{DscpClass, DscpConfig, set_socket_dscp};

// Default config
let config = DscpConfig::default();
// discovery: AF21, user_data: AF21, metatraffic: AF31

// Presets
let realtime = DscpConfig::realtime(); // EF for all
let high = DscpConfig::high_priority(); // AF41 for data
let best = DscpConfig::best_effort(); // CS0 for all

// Custom config
let config = DscpConfig {
    discovery: DscpClass::Af21,
    user_data: DscpClass::Ef,
    metatraffic: DscpClass::Af31,
};

Environment Variable

# Single value (apply to all)
export HDDS_DSCP=46 # EF for all traffic

# Three values: discovery,user_data,metatraffic
export HDDS_DSCP=18,46,26 # AF21, EF, AF31
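The one-value and three-value forms can be parsed as sketched below (illustrative only; hdds ships its own parser for this variable):

```rust
/// Parse HDDS_DSCP: "46" applies one value to all three traffic kinds,
/// "18,46,26" sets discovery, user_data, metatraffic separately.
/// Sketch only; the real hdds parser may differ in error handling.
fn parse_dscp(s: &str) -> Option<(u8, u8, u8)> {
    let vals: Vec<u8> = s
        .split(',')
        .map(|v| v.trim().parse().ok())
        .collect::<Option<_>>()?;
    match vals.as_slice() {
        [all] => Some((*all, *all, *all)),
        [d, u, m] => Some((*d, *u, *m)),
        _ => None, // any other count is rejected
    }
}

fn main() {
    assert_eq!(parse_dscp("46"), Some((46, 46, 46)));
    assert_eq!(parse_dscp("18,46,26"), Some((18, 46, 26)));
    assert_eq!(parse_dscp("1,2"), None);
}
```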

Socket API

use hdds::transport::dscp::{set_socket_dscp, get_socket_dscp, DscpClass};
use std::net::UdpSocket;

let socket = UdpSocket::bind("0.0.0.0:0")?;

// Set DSCP
set_socket_dscp(&socket, DscpClass::Ef)?;

// Read back
let dscp = get_socket_dscp(&socket);
assert_eq!(dscp, Some(DscpClass::Ef));

Network Filtering

Control which interfaces and source IPs are used for DDS communication.

NetworkFilter Builder

use hdds::transport::filter::{NetworkFilter, InterfaceFilter, SourceFilter};
use std::net::Ipv4Addr;

// Allow eth0 and any interface on 192.168.1.0/24; accept sources from 10.0.0.0/8
let filter = NetworkFilter::builder()
    .interface_by_name("eth0")
    .interface_by_cidr("192.168.1.0/24")
    .allow_source_cidr("10.0.0.0/8")
    .deny_source_cidr("10.0.0.99/32") // Block a specific host
    .build();

// Check interface
assert!(filter.interfaces.allows_name("eth0"));
assert!(filter.interfaces.allows_ip(Ipv4Addr::new(192, 168, 1, 100)));

// Check source (firewall-style: allow then deny)
assert!(filter.sources.allows(Ipv4Addr::new(10, 1, 2, 3))); // Allowed
assert!(!filter.sources.allows(Ipv4Addr::new(10, 0, 0, 99))); // Denied

Interface Filter

use hdds::transport::filter::InterfaceFilter;
use std::net::Ipv4Addr;

// Allow all (default)
let filter = InterfaceFilter::allow_all();

// Only specific names
let filter = InterfaceFilter::only_names(&["eth0", "eth1"]);

// Only specific CIDRs
let filter = InterfaceFilter::only_cidrs(&[
    "10.128.0.0/16".parse().unwrap(),
]);

// Check
assert!(filter.allows_name("eth0"));
assert!(filter.allows_interface("any", Ipv4Addr::new(10, 128, 1, 1)));

Source Filter

Firewall-style allow/deny semantics:

  1. If allow is empty, all sources pass
  2. If allow is non-empty, source must match at least one entry
  3. If source matches any deny entry, it's rejected (deny overrides allow)

use hdds::transport::filter::SourceFilter;
use std::net::Ipv4Addr;

// Allow 10.0.0.0/8, deny 10.128.0.0/16
let filter = SourceFilter {
    allow: vec!["10.0.0.0/8".parse().unwrap()],
    deny: vec!["10.128.0.0/16".parse().unwrap()],
};

assert!(filter.allows(Ipv4Addr::new(10, 0, 0, 1))); // In allow
assert!(!filter.allows(Ipv4Addr::new(10, 128, 0, 1))); // In deny (blocked)
assert!(!filter.allows(Ipv4Addr::new(192, 168, 1, 1))); // Not in allow

Environment Variables

# Interface filtering
export HDDS_INTERFACE_ALLOW="eth0,eth1,10.0.0.0/8"

# Source filtering
export HDDS_SOURCE_ALLOW="10.0.0.0/8,192.168.0.0/16"
export HDDS_SOURCE_DENY="10.0.0.99/32"

Time-Sensitive Networking (TSN)

IEEE 802.1 TSN support for deterministic Ethernet communication (Linux only).

Features

  • Priority tagging - SO_PRIORITY → traffic classes (mqprio) + VLAN PCP
  • Scheduled TX - SO_TXTIME + SCM_TXTIME for "send-at-time" (LaunchTime)
  • Capability detection - Runtime probe of TSN features

TsnConfig

use hdds::transport::tsn::{TsnConfig, TsnEnforcement, TxTimePolicy};
use std::time::Duration;

// Enable TSN with priority tagging
let config = TsnConfig::new()
    .with_priority(6) // High priority (PCP 0-7)
    .strict(); // Fail if TSN is not available

// With scheduled TX
let config = TsnConfig::new()
    .with_priority(6)
    .with_txtime(TxTimePolicy::Mandatory)
    .with_lead_time(Duration::from_micros(500));

// Presets
let high = TsnConfig::high_priority(); // PCP 6
let normal = TsnConfig::normal_priority(); // PCP 4
let low = TsnConfig::low_priority(); // PCP 2

Clock Sources

use hdds::transport::tsn::TsnClockId;

TsnClockId::Tai // CLOCK_TAI (PTP-synced, recommended)
TsnClockId::Monotonic // CLOCK_MONOTONIC (dev/test)
TsnClockId::Realtime // CLOCK_REALTIME (avoid - leap seconds)
TsnClockId::Phc(path) // Direct PHC ("/dev/ptp0")

TX Time Policies

| Policy | Description |
|---|---|
| Disabled | Standard sendto(), no txtime |
| Opportunistic | Use SO_TXTIME if available, else fall back |
| Mandatory | Require SO_TXTIME, error if unavailable |

Enforcement Modes

| Mode | Description |
|---|---|
| BestEffort | Degrade silently if TSN is unavailable |
| Strict | Error if prerequisites are missing |

Probe TSN Capabilities

use hdds::transport::tsn::TsnProbe;

let caps = TsnProbe::probe("eth0")?;
println!("SO_TXTIME: {:?}", caps.so_txtime);
println!("ETF qdisc: {:?}", caps.etf_qdisc);
println!("HW timestamping: {:?}", caps.hw_timestamp);

Platform Support

| Platform | Status |
|---|---|
| Linux | Full support (SO_PRIORITY, SO_TXTIME, ETF qdisc) |
| Others | Stub backend (returns Unsupported errors) |

Low Bandwidth Transport (LowBW)

Optimized transport for constrained links (9.6 kbps - 2 Mbps).

Target Environments

  • Throughput: 9.6 kbps → 2 Mbps
  • Latency: 100 ms → 2 s RTT
  • Loss: 10-30% packet loss
  • Use cases: Satellite, radio, mesh networks

Wire Protocol

Frame = sync(0xA5) | version | flags | frame_len(varint) | session_id | frame_seq | records* | crc16?
Record = stream_id | rflags | msg_seq(varint) | len(varint) | payload
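The varint fields above are typically LEB128-style: 7 data bits per byte, with the high bit set on all but the last byte. A sketch of that encoding, assuming the HDDS LowBW varint follows this common scheme:

```rust
/// LEB128-style varint encode: 7 payload bits per byte, high bit marks
/// "more bytes follow". Sketch only; the actual LowBW wire encoding may
/// differ in detail.
fn varint_encode(mut v: u64, out: &mut Vec<u8>) {
    loop {
        let byte = (v & 0x7f) as u8;
        v >>= 7;
        if v == 0 {
            out.push(byte); // last byte: continuation bit clear
            break;
        }
        out.push(byte | 0x80); // continuation bit set
    }
}

/// Decode a varint from the front of `buf`; returns (value, bytes consumed).
fn varint_decode(buf: &[u8]) -> Option<(u64, usize)> {
    let mut v = 0u64;
    for (i, b) in buf.iter().enumerate() {
        v |= ((b & 0x7f) as u64) << (7 * i);
        if b & 0x80 == 0 {
            return Some((v, i + 1));
        }
    }
    None // ran out of bytes mid-varint
}

fn main() {
    let mut buf = Vec::new();
    varint_encode(300, &mut buf);
    assert_eq!(buf, vec![0xac, 0x02]); // small lengths stay small on the wire
    assert_eq!(varint_decode(&buf), Some((300, 2)));
}
```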

Priority Levels

| Priority | Description |
|---|---|
| P0 | Critical/reliable: immediate flush, retransmitted |
| P1 | Important: batched, no retransmit |
| P2 | Telemetry: batched, dropped on congestion |

Features

  • Minimal overhead - 3-6 bytes per record, 6-10 bytes per frame
  • Selective reliability - P0 = reliable, P2 = best-effort
  • Batching - Token bucket rate limiting
  • Delta encoding - Efficient telemetry updates
  • LZ4 compression - Optional (feature lowbw-lz4)
  • CRC-16 protection - End-to-end integrity
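The batching bullet above relies on token-bucket rate limiting; a minimal sketch of the idea (illustrative, not the hdds implementation — `TokenBucket` is a hypothetical type):

```rust
/// Token-bucket pacing sketch: tokens are bytes, refilled at the link
/// rate; a frame may be sent only if enough tokens remain.
struct TokenBucket {
    capacity: f64,       // max burst, in bytes
    tokens: f64,         // current balance
    refill_per_sec: f64, // bytes added per second
}

impl TokenBucket {
    fn new(capacity_bytes: f64, rate_bps: f64) -> Self {
        Self {
            capacity: capacity_bytes,
            tokens: capacity_bytes,
            refill_per_sec: rate_bps / 8.0,
        }
    }
    /// Advance time by `dt` seconds, refilling up to capacity.
    fn tick(&mut self, dt: f64) {
        self.tokens = (self.tokens + self.refill_per_sec * dt).min(self.capacity);
    }
    /// Try to send `bytes`; true if the bucket had enough tokens.
    fn try_send(&mut self, bytes: f64) -> bool {
        if self.tokens >= bytes {
            self.tokens -= bytes;
            true
        } else {
            false
        }
    }
}

fn main() {
    // 9.6 kbps link = 1200 bytes/s; burst capacity 256 bytes (one max frame).
    let mut tb = TokenBucket::new(256.0, 9600.0);
    assert!(tb.try_send(256.0));  // initial burst allowed
    assert!(!tb.try_send(64.0));  // bucket drained
    tb.tick(0.1);                 // 120 bytes refilled after 100 ms
    assert!(tb.try_send(64.0));
}
```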

Configuration

use hdds::transport::lowbw::{LowBwConfig, StreamConfig, Priority};

let config = LowBwConfig {
    session_timeout_ms: 30_000,
    max_frame_size: 256,
    enable_crc: true,
    ..Default::default()
};

let stream = StreamConfig {
    priority: Priority::P0, // Reliable
    ..Default::default()
};

use hdds::transport::lowbw::{UdpLink, SimLink, SimLinkConfig, LoopbackLink};

// UDP link (production)
let link = UdpLink::bind("0.0.0.0:5000")?;

// Simulated link (testing with loss/delay)
let sim_config = SimLinkConfig {
    loss_rate: 0.1,       // 10% packet loss
    delay_ms: 100,        // 100 ms latency
    bandwidth_bps: 9600,  // 9.6 kbps
    ..Default::default()
};
let link = SimLink::new(sim_config);

// Loopback (testing)
let link = LoopbackLink::new();

TCP Transport

TCP-based transport for firewall-restricted environments.

Not Interoperable

RTPS over TCP is not standardized and is HDDS-to-HDDS only. Each DDS vendor uses different framing. Use UDP for cross-vendor interop.

Use Cases

  • Corporate firewalls with TCP-only policies
  • Cloud/Kubernetes without multicast
  • NAT traversal where UDP hole punching fails
  • WAN connections with high packet loss

Configuration

use hdds::transport::tcp::{TcpConfig, TcpRole, TransportPreference};

let config = TcpConfig {
    enabled: true,
    listen_port: 7410,
    role: TcpRole::Auto,
    ..Default::default()
};

Transport Preferences

| Preference | Description |
|---|---|
| UdpOnly | UDP for discovery and data |
| TcpOnly | TCP for everything (no multicast) |
| UdpDiscoveryTcpData | UDP for SPDP/SEDP, TCP for user data |
| Hybrid | UDP primary, TCP fallback |

Wire Format

TCP is a stream protocol, so RTPS messages are length-prefixed:

+----------------+--------------+
| Length (4B BE) | RTPS Message |
+----------------+--------------+
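The framing above can be sketched in a few lines; `write_frame` and `read_frame` are illustrative helpers, not the hdds TCP API:

```rust
use std::io::{self, Read, Write};

/// Write one RTPS message with a 4-byte big-endian length prefix,
/// matching the wire format described above.
fn write_frame<W: Write>(w: &mut W, msg: &[u8]) -> io::Result<()> {
    w.write_all(&(msg.len() as u32).to_be_bytes())?;
    w.write_all(msg)
}

/// Read one length-prefixed message back off the stream.
fn read_frame<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
    let mut len = [0u8; 4];
    r.read_exact(&mut len)?;
    let mut msg = vec![0u8; u32::from_be_bytes(len) as usize];
    r.read_exact(&mut msg)?;
    Ok(msg)
}

fn main() -> io::Result<()> {
    let mut stream = Vec::new(); // stand-in for a TcpStream
    write_frame(&mut stream, b"RTPS...")?;
    let mut cursor = io::Cursor::new(stream);
    assert_eq!(read_frame(&mut cursor)?, b"RTPS...");
    Ok(())
}
```

Because TCP delivers a byte stream with no message boundaries, the prefix is what lets the receiver recover individual RTPS messages.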

TLS Support

Requires feature flag tcp-tls:

[dependencies]
hdds = { path = "../hdds/crates/hdds", features = ["tcp-tls"] }

use hdds::transport::tcp::{TlsConfig, TlsVersion};

let tls = TlsConfig::builder()
    .cert_file("/path/to/cert.pem")
    .key_file("/path/to/key.pem")
    .ca_file("/path/to/ca.pem")
    .min_version(TlsVersion::Tls12)
    .build()?;

QUIC Transport

QUIC-based transport for modern, secure, and NAT-friendly communication.

Feature Flag

QUIC transport requires the quic feature:

[dependencies]
hdds = { path = "../hdds/crates/hdds", features = ["quic"] }

Why QUIC?

| Advantage | Description |
|---|---|
| NAT Traversal | UDP-based, works through most firewalls and NATs |
| 0-RTT Connections | Instant reconnection to known peers |
| Connection Migration | Seamless IP address changes without disconnection |
| Built-in TLS 1.3 | Mandatory encryption with auto-generated certificates |
| Multiplexing | Multiple streams over a single connection |

Use Cases

  • Mobile/roaming devices (IP changes frequently)
  • Cloud deployments behind NAT
  • IoT devices on cellular networks
  • Cross-datacenter communication
  • Firewall-restricted environments (UDP port 443)

Configuration

use hdds::transport::quic::{QuicConfig, QuicTransport};

let config = QuicConfig::builder()
    .bind_addr("0.0.0.0:7400".parse()?)
    .enable_0rtt(true)           // Fast reconnection
    .idle_timeout_ms(30_000)     // Connection idle timeout
    .max_concurrent_streams(100) // Per connection
    .build();

let transport = QuicTransport::new(config).await?;

QuicConfig Options

| Option | Default | Description |
|---|---|---|
| bind_addr | 0.0.0.0:7400 | Local bind address |
| enable_0rtt | false | Enable 0-RTT for known peers |
| idle_timeout_ms | 30000 | Connection idle timeout |
| max_concurrent_streams | 100 | Streams per connection |
| cert_path | Auto | Custom TLS certificate path |
| key_path | Auto | Custom TLS key path |

Connecting and Sending

use hdds::transport::quic::{QuicConfig, QuicTransport};
use std::net::SocketAddr;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = QuicConfig::builder()
        .bind_addr("0.0.0.0:7400".parse()?)
        .enable_0rtt(true)
        .build();

    let transport = QuicTransport::new(config).await?;

    // Connect to a remote peer
    let remote: SocketAddr = "192.168.1.100:7400".parse()?;
    transport.connect(remote).await?;

    // Send an RTPS message
    let rtps_message = build_rtps_message();
    transport.send(&rtps_message, &remote).await?;

    // Broadcast to all connected peers
    transport.broadcast(&rtps_message).await?;

    Ok(())
}

Connection Statistics

// Get connection stats
let stats = transport.connection_stats(&remote)?;

println!("Bytes sent: {}", stats.bytes_sent);
println!("Bytes received: {}", stats.bytes_received);
println!("RTT: {:?}", stats.rtt);
println!("Connection migrations: {}", stats.migrations);
StatDescription
bytes_sentTotal bytes transmitted
bytes_receivedTotal bytes received
rttCurrent round-trip time estimate
migrationsNumber of IP address changes handled

0-RTT Resumption

When enable_0rtt(true) is set, QUIC caches session tickets for known peers:

let config = QuicConfig::builder()
    .bind_addr("0.0.0.0:7400".parse()?)
    .enable_0rtt(true)
    .build();

// First connection: full handshake (1-RTT)
transport.connect(peer_addr).await?;

// ... connection closes ...

// Subsequent connection: 0-RTT (instant)
transport.connect(peer_addr).await?;

0-RTT Security

0-RTT data can be replayed by attackers. HDDS only uses 0-RTT for idempotent discovery messages, not user data.

Connection Migration

QUIC automatically handles IP address changes:

// Device moves from WiFi to cellular
// IP changes from 192.168.1.50 to 10.0.0.50
// Connection continues without interruption!

let stats = transport.connection_stats(&remote)?;
println!("Migrations: {}", stats.migrations); // Shows migration count

TLS Certificates

By default, QUIC generates self-signed certificates automatically. For production:

let config = QuicConfig::builder()
    .bind_addr("0.0.0.0:7400".parse()?)
    .cert_path("/path/to/cert.pem")
    .key_path("/path/to/key.pem")
    .ca_path("/path/to/ca.pem") // For peer verification
    .build();

QUIC vs TCP vs UDP

| Feature | UDP | TCP | QUIC |
|---|---|---|---|
| NAT traversal | Good | Poor | Excellent |
| Firewall friendly | Varies | Good | Good (UDP 443) |
| Connection migration | No | No | Yes |
| Built-in encryption | No | Optional | Mandatory |
| 0-RTT reconnection | N/A | No | Yes |
| Head-of-line blocking | No | Yes | No |
| Multicast | Yes | No | No |

When to Use QUIC

Good for:

  • Mobile/IoT with changing networks
  • NAT/firewall traversal
  • Secure point-to-point links
  • Cloud-to-cloud communication

Not for:

  • Multicast discovery (use UDP)
  • Cross-vendor interop (QUIC not standardized for DDS)
  • Ultra-low latency (< 100µs)

Hybrid Configuration

Combine UDP for discovery with QUIC for data:

use hdds::{Participant, TransportMode};
use hdds::transport::quic::QuicConfig;

let quic_config = QuicConfig::builder()
    .bind_addr("0.0.0.0:7401".parse()?)
    .enable_0rtt(true)
    .build();

let participant = Participant::builder("app")
    .domain_id(0)
    .with_transport(TransportMode::UdpMulticast) // Discovery
    .with_quic_transport(quic_config)            // User data
    .build()?;

Message Fragmentation (DATA_FRAG)

HDDS automatically fragments large messages that exceed the unfragmented size threshold.

How It Works

Payload ≤ 8 KB  →  Single DATA packet
Payload > 8 KB  →  Multiple DATA_FRAG packets (1 KB each)

The writer splits large payloads into 1KB fragments, each sent as a separate DATA_FRAG submessage. The reader reassembles fragments using a fragment buffer before delivering the complete sample.
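The reassembly step can be sketched as follows (illustrative only; the real fragment buffer also tracks per-writer state and timeouts — `Reassembly` here is a hypothetical type):

```rust
use std::collections::HashMap;

/// Minimal reassembly sketch: store fragments by their 1-based index
/// until all pieces of the sample have arrived.
struct Reassembly {
    total: u32,
    frags: HashMap<u32, Vec<u8>>,
}

impl Reassembly {
    fn new(sample_size: u32, fragment_size: u32) -> Self {
        // Fragment count is ceil(sample_size / fragment_size).
        let total = (sample_size + fragment_size - 1) / fragment_size;
        Self { total, frags: HashMap::new() }
    }

    fn insert(&mut self, index: u32, payload: Vec<u8>) {
        self.frags.insert(index, payload);
    }

    /// Returns the full sample once every fragment is present.
    fn try_assemble(&self) -> Option<Vec<u8>> {
        if self.frags.len() as u32 != self.total {
            return None; // still incomplete
        }
        let mut out = Vec::new();
        for i in 1..=self.total {
            out.extend_from_slice(self.frags.get(&i)?);
        }
        Some(out)
    }
}

fn main() {
    let mut r = Reassembly::new(2048, 1024); // 2 fragments of 1 KB
    r.insert(2, vec![0xBB; 1024]);           // fragments may arrive out of order
    assert!(r.try_assemble().is_none());     // fragment 1 still missing
    r.insert(1, vec![0xAA; 1024]);
    assert_eq!(r.try_assemble().unwrap().len(), 2048);
}
```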

Fragmentation Parameters

| Parameter | Default | Description |
|---|---|---|
| DEFAULT_FRAGMENT_SIZE | 1024 bytes | Size of each fragment |
| DEFAULT_MAX_UNFRAGMENTED_SIZE | 8192 bytes | Threshold for fragmentation |
| Fragment buffer limit | ~64 KB | Maximum reassembly size |

Performance

Measured on a loaded system:

| Payload | Fragments | Latency | Status |
|---|---|---|---|
| 64 B | 1 (DATA) | ~1.1 ms | Supported |
| 16 KB | 16 | ~1.1 ms | Supported |
| 64 KB | 64 | ~2.3 ms | Supported |
| 128 KB | 128 | N/A | Buffer limit |

Wire Format

DATA_FRAG submessage structure (RTPS 2.5 spec):

DATA_FRAG Header:
├── extraFlags (2 bytes)
├── octetsToInlineQos (2 bytes)
├── readerId (4 bytes)
├── writerId (4 bytes)
├── writerSN (8 bytes)
├── fragmentStartingNum (4 bytes) ← Fragment index (1-based)
├── fragmentsInSubmessage (2 bytes) ← Usually 1
├── fragmentSize (2 bytes) ← 1024 bytes
├── sampleSize (4 bytes) ← Total payload size
└── payload (fragmentSize bytes)

Use Cases

DATA_FRAG is essential for:

  • Point clouds - LiDAR data (100KB-10MB per scan)
  • Images - Camera frames (100KB-5MB)
  • Maps - Occupancy grids, costmaps
  • Large configurations - System parameters, calibration data

Interoperability

DATA_FRAG is part of the RTPS 2.5 specification and works with:

  • FastDDS
  • RTI Connext DDS
  • CycloneDDS
  • Other RTPS-compliant implementations

Fragment Size

HDDS uses 1 KB fragments by default. Some vendors use larger fragments (e.g., RTI uses 1400 bytes to fit a typical MTU). Fragment sizes do not need to match for interop; the receiver reassembles based on the fragment metadata in each submessage.

Limitations

| Limitation | Description |
|---|---|
| Buffer size | Default ~64 KB max reassembly (configurable) |
| UDP MTU | Fragments are still limited by network MTU (~1500 bytes on Ethernet) |
| Ordering | All fragments must arrive before the timeout for reassembly |
| Memory | Fragment buffer allocates per-writer reassembly state |

Fragment Retransmission (NACK_FRAG)

NACK_FRAG enables reliable delivery of fragmented messages by allowing readers to request retransmission of specific missing fragments.

How It Works

Writer sends:     FRAG_1, FRAG_2, [lost], FRAG_4, FRAG_5
Reader detects:   Fragment 3 missing
Reader sends:     NACK_FRAG(seq=42, bitmap=[0,0,1,0,0])
Writer resends:   FRAG_3
Reader completes: Reassembly done → deliver sample

When a reader detects gaps in the fragment sequence (via timeout or out-of-order arrival), it sends a NACK_FRAG submessage identifying exactly which fragments are missing using a bitmap.

Wire Format

NACK_FRAG submessage structure (RTPS 2.5 spec):

NACK_FRAG Header:
├── readerId (4 bytes) ← EntityId of requesting reader
├── writerId (4 bytes) ← EntityId of target writer
├── writerSN (8 bytes) ← Sequence number being NAK'd
├── fragmentNumberState:
│ ├── bitmapBase (4 bytes) ← First fragment number in bitmap
│ ├── numBits (4 bytes) ← Number of bits in bitmap
│ └── bitmap (N × 4 bytes) ← 1 = missing, 0 = received
└── count (4 bytes) ← Anti-replay counter
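Building the fragmentNumberState from a list of missing fragments can be sketched like this (assuming the common RTPS bitmap convention where bit 0 is the most significant bit of the first 32-bit word; the helper name is illustrative):

```rust
/// Build (bitmapBase, numBits, bitmap) for a NACK_FRAG from the missing
/// fragment numbers. Bit i of the bitmap (MSB-first within each 32-bit
/// word) marks fragment `base + i` as missing. Sketch of the encoding.
fn fragment_number_state(missing: &[u32]) -> (u32, u32, Vec<u32>) {
    let base = *missing.iter().min().expect("at least one missing fragment");
    let num_bits = *missing.iter().max().unwrap() - base + 1;
    let mut bitmap = vec![0u32; ((num_bits + 31) / 32) as usize];
    for &f in missing {
        let bit = f - base;
        bitmap[(bit / 32) as usize] |= 0x8000_0000 >> (bit % 32);
    }
    (base, num_bits, bitmap)
}

fn main() {
    // Fragments 3, 7, 8 missing → base 3, bits 0, 4, 5 set.
    let (base, num_bits, bitmap) = fragment_number_state(&[3, 7, 8]);
    assert_eq!(base, 3);
    assert_eq!(num_bits, 6); // covers fragments 3..=8
    assert_eq!(bitmap, vec![0x8C00_0000]);
}
```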

Fragment Detection APIs

The fragment buffer provides APIs to detect missing fragments:

// Get list of missing fragments for a sequence
let missing = fragment_buffer.get_missing_fragments(writer_guid, seq_num);
// Returns: Vec<u32> e.g., [3, 7, 8] for fragments 3, 7, 8 missing

// Get sequences with incomplete fragments (stale/timeout)
let stale = fragment_buffer.get_stale_sequences(timeout_duration);
// Returns: Vec<(WriterGuid, SequenceNumber)>

HEARTBEAT_FRAG (submessage kind 0x13) is sent by writers to advertise the range of available fragments, allowing readers to detect gaps proactively:

HEARTBEAT_FRAG Header:
├── readerId (4 bytes)
├── writerId (4 bytes)
├── writerSN (8 bytes)
├── lastFragmentNum (4 bytes) ← Highest fragment sent
└── count (4 bytes)

Reliability Flow

Complete reliable fragmented delivery:

Writer:
1. Sends 64 DATA_FRAG (seq=1, frag=1..64)
2. Sends HEARTBEAT_FRAG (seq=1, lastFrag=64, count=N)

Reader (if fragments missing, e.g., 3 and 7 lost):
3. Receives HEARTBEAT_FRAG → checks FragmentBuffer
4. Detects fragments [3, 7] missing → sends NACK_FRAG

Writer:
5. Receives NACK_FRAG → retransmits fragments 3 and 7

Reader:
6. Receives missing fragments → completes reassembly → delivers sample

Gap Detection Mechanisms

HDDS uses two complementary mechanisms to detect missing fragments:

| Mechanism | Trigger | Latency | Use Case |
|---|---|---|---|
| HEARTBEAT_FRAG | Writer announces completion | < 1 ms | Fast recovery on lossy networks |
| Stale detection | Timeout on incomplete reassembly | 100 ms | Fallback when HEARTBEAT_FRAG is lost |

HEARTBEAT_FRAG (proactive): Writer sends after all DATA_FRAGs. Reader immediately detects gaps and sends NACK_FRAG. Recovery in < 1ms.

Stale detection (fallback): If no new fragments arrive for 100ms, the router checks for incomplete sequences and sends NACK_FRAG. Handles cases where HEARTBEAT_FRAG itself is lost.
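Both mechanisms feed the same gap check: compare the advertised fragment range against what has arrived. A sketch, analogous to the `get_missing_fragments` call shown earlier:

```rust
use std::collections::HashSet;

/// Given the highest fragment advertised by HEARTBEAT_FRAG and the set
/// of fragments received so far, list what to request in a NACK_FRAG.
/// Sketch only; hdds tracks this per (writer GUID, sequence number).
fn missing_fragments(last_frag: u32, received: &HashSet<u32>) -> Vec<u32> {
    (1..=last_frag).filter(|f| !received.contains(f)).collect()
}

fn main() {
    // HEARTBEAT_FRAG says lastFragmentNum = 5; fragment 3 never arrived.
    let received: HashSet<u32> = [1, 2, 4, 5].into_iter().collect();
    assert_eq!(missing_fragments(5, &received), vec![3]);
}
```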

Submessage IDs

| Submessage | Kind | Description |
|---|---|---|
| DATA_FRAG | 0x16 | Fragmented data payload |
| NACK_FRAG | 0x12 | Request missing fragments |
| HEARTBEAT_FRAG | 0x13 | Advertise available fragments |

Configuration

NACK_FRAG behavior is controlled by reliability QoS:

use hdds::QoS;
use std::time::Duration;

let qos = QoS::reliable()
    .max_blocking_time(Duration::from_secs(5)) // Writer waits for ACKs
    .keep_last(10); // History depth

// BestEffort = no NACK_FRAG (fragments may be lost)
// Reliable = NACK_FRAG enabled (fragments retransmitted)

Testing

On loopback networks (0% loss), NACK_FRAG is never triggered as all fragments arrive intact.

Cross-Machine Test Results

Tested with 64KB payloads across real network with simulated 10% packet loss:

| Metric | Result |
|---|---|
| Payload size | 64 KB (64 fragments) |
| Network | Cross-machine (local → remote) |
| Packet loss | 10% (simulated) |
| Final message loss | 0% |
| Latency | 7.1 ms |
| Throughput | 96 msg/s |
| Total retransmissions | 91 fragments |

NACK_FRAG in Action

[NACK_FRAG] missing_frags=[1..54] → retransmitted 54/54
[NACK_FRAG] missing_frags=[14, 21, 33] → retransmitted 3/3
[NACK_FRAG] missing_frags=[21] → retransmitted 1/1

The mechanism handles all of these cases:

  • Burst losses (54 consecutive fragments lost)
  • Scattered losses (random fragments 14, 21, 33)
  • Single-fragment recovery

Simulating Packet Loss

# Simulate 10% packet loss with tc netem
sudo tc qdisc add dev eth0 root netem loss 10%

# Run cross-machine test
cargo run --release --example frag_test -- --remote 192.168.1.x

# Remove the netem rule
sudo tc qdisc del dev eth0 root

On real WiFi/WAN networks with inherent packet loss, NACK_FRAG activates automatically.

Performance Characteristics

Reliable delivery performance varies by message size and load:

| Payload | Messages | Loss | Throughput | Status |
|---|---|---|---|---|
| 64 bytes | 1000 | 0% | 274 msg/s | ✅ 100% reliable |
| 65 KB | 20 | 0% | 146 msg/s | ✅ 100% reliable |
| 65 KB | 50 | 12% | 7 msg/s | ⚠️ Degraded |
| 65 KB | 100 | 61% | 1 msg/s | ⚠️ High load |

Guidelines:

  • Small messages (under 8KB): 100% reliable at any rate
  • Large messages (fragmented): 100% reliable up to ~20 msg burst
  • High-volume large bursts: use rate limiting or flow control

High Throughput Large Messages

For sustained high-throughput large message delivery:

  • Add delays between writes (e.g., 10-50ms)
  • Use QoS::reliable().max_blocking_time() to allow writer to pace
  • Consider breaking very large payloads into logical chunks at application level
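The pacing guideline above reduces to a simple write loop; `write` here stands in for the application's DataWriter call (illustrative sketch, not an hdds API):

```rust
use std::thread;
use std::time::Duration;

/// Pace large writes so fragmented samples don't overrun the
/// reliability window: sleep between writes to let ACKs/NACK_FRAGs
/// drain. `write` stands in for a DataWriter call.
fn paced_writes(count: u32, gap: Duration, mut write: impl FnMut(u32)) {
    for i in 0..count {
        write(i);
        thread::sleep(gap); // e.g. 10-50 ms between large samples
    }
}

fn main() {
    let mut sent = 0;
    paced_writes(5, Duration::from_millis(10), |_i| {
        // writer.write(&large_sample) would go here
        sent += 1;
    });
    assert_eq!(sent, 5);
}
```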

Industry Standard

These limits are normal for DDS middleware. FastDDS, RTI Connext, and CycloneDDS have similar characteristics under extreme load. The RTPS protocol prioritizes correctness over throughput under congestion.


Environment Variables Summary

| Variable | Feature | Example |
|---|---|---|
| HDDS_DSCP | DSCP marking | 46 or 18,46,26 |
| HDDS_INTERFACE_ALLOW | Interface filter | eth0,10.0.0.0/8 |
| HDDS_SOURCE_ALLOW | Source whitelist | 10.0.0.0/8 |
| HDDS_SOURCE_DENY | Source blacklist | 10.0.0.99/32 |