数据分发服务开源实现FastDDS

数据分发服务开源实现FastDDS

fastDDS 官方文档

1. 编译安装

image-20230515132605181

2. FastDDS框架理解

以数据为中心的DCPS(发布-订阅)模型实现,用于分布式软件通信
并发多线程系统,每个DomainParticipant生成一组线程处理后台任务
FastDDS API是线程安全的
DDS中间件由应用层、DDS层、RTPS层(实时发布订阅协议)及传输层组成。传输层可以选择以太网协议栈,也可以选择共享内存。

image-20230515132703899

2.1 DDS核心概念

  1. 域 domain
  2. 主题 topic
  3. 发布者 publisher
  4. 订阅者 subscriber
image-20230515132747083 image-20230515132807725 image-20230515132838725

2.2 FastDDS架构实现

应用层:用户APP
FastDDS:部署1个或者多个域(domain)
RTPS层:传输层抽象,动态发现,实时发布-订阅协议实现,互操作性

image-20230515132909697
2.2.1 DDS层
  • 服务质量 QoS
  • Listener
2.2.2 RTPS层

支持单播与多播
服务发现
同步与异步发布模式

3. FastDDS源码理解

3.1 DDS层

3.2 RTPS层

3.3 共享内存

3.4 服务质量 QoS

服务质量QoS是用来解决网络延迟与阻塞等问题的一种技术。
reliability 可靠性
deadline 截止时间

4. 例程理解

4.1 HelloWorldExample

image-20230515132939263 image-20230515132954367 image-20230515133011686

源码:HelloWorldExample

4.1.1 接口描述与代码生成
1
2
3
4
5
struct HelloWorld
{
unsigned long index; # uint32_t in C++
string message;
};
1
fastddsgen ./HelloWorld.idl

生成以下4个文件:
HelloWorld.h
HelloWorld.cxx // 数据类型
HelloWorldPubSubTypes.h
HelloWorldPubSubTypes.cxx // 继承,payload 序列化与反序列化

4.1.2 主程序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
* @file HelloWorld_main.cpp
*
*/
#include <limits>
#include <sstream>

#include "HelloWorldPublisher.h"
#include "HelloWorldSubscriber.h"

#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastrtps/log/Log.h>

#include <optionparser.hpp>

using eprosima::fastdds::dds::Log;

namespace option = eprosima::option;

//命令行解析程序,略

int main(
int argc,
char** argv)
{
std::cout << "Starting " << std::endl;

int type = 1;
uint32_t count = 10; // 十次
uint32_t sleep = 100; // 间隔 100 ms
bool use_environment_qos = false;
/* ...... */
if (strcmp(type_name, "publisher") == 0)
{
type = 1;
}
else if (strcmp(type_name, "subscriber") == 0)
{
type = 2;
}
/* ...... */
switch (type)
{
case 1: // 调用发布者程序
{
HelloWorldPublisher mypub;
if (mypub.init(use_environment_qos))
{
mypub.run(count, sleep);
}
break;
}
case 2: // 调用订阅者程序
{
HelloWorldSubscriber mysub;
if (mysub.init(use_environment_qos))
{
mysub.run();
}
break;
}
}
Log::Reset();
return 0;
}

4.1.3 发布者与订阅者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// 初始化 *****
bool HelloWorldPublisher::init(
bool use_env)
{
// 暂时省略 QoS 设置,以下同
// step1:使用 participant factory 创建 participant
auto factory = DomainParticipantFactory::get_instance();
participant_ = factory->create_participant(0, pqos); // domain ID = 0
// step2:注册 participant
// type_(new HelloWorldPubSubType()
//REGISTER THE TYPE
type_.register_type(participant_);
// step3:创建 publisher
publisher_ = participant_->create_publisher(
pubqos,
nullptr);
// step4:创建 DataWriter
writer_ = publisher_->create_datawriter(
topic_,
wqos,
&listener_);
// step5:创建 topic
topic_ = participant_->create_topic(
"HelloWorldTopic",
"HelloWorld",
tqos);
}
// 回调函数
void HelloWorldPublisher::PubListener::on_publication_matched(
eprosima::fastdds::dds::DataWriter*,
const eprosima::fastdds::dds::PublicationMatchedStatus& info)
{
if (info.current_count_change == 1)
{
matched_ = info.total_count;
firstConnected_ = true;
std::cout << "Publisher matched." << std::endl;
}
else if (info.current_count_change == -1)
{
matched_ = info.total_count;
std::cout << "Publisher unmatched." << std::endl;
}
}
// 数据写入 *****
// 当 DataWriter 与 DataRreader匹配时,回调函数中数据成员 matched_将更新,其值为发现的
// DataReader 的数量,当发现第一个DataReader时,数据开始传输。
bool HelloWorldPublisher::publish(
bool waitForListener)
{
if (listener_.firstConnected_ || !waitForListener || listener_.matched_ > 0)
{
hello_.index(hello_.index() + 1); // idl中定义的 unsigned long index
writer_->write(&hello_);
return true;
}
return false;
}
// 创建新线程,阻塞等待
// 新线程根据 count 与 sleep 发送发消息(调用 publish)
void HelloWorldPublisher::run(
uint32_t samples,
uint32_t sleep)
{
std::thread thread(&HelloWorldPublisher::runThread, this, samples, sleep);
thread.join();

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
bool HelloWorldSubscriber::init(
bool use_env)
{
// step1-2 与发布者一致
subscriber_ = participant_->create_subscriber(sqos, nullptr); // 创建订阅者
topic_ = participant_->create_topic( // 创建 topic
"HelloWorldTopic",
"HelloWorld",
tqos);
reader_ = subscriber_->create_datareader(topic_, rqos, &listener_);
}
// 回调函数
void HelloWorldSubscriber::SubListener::on_subscription_matched(
DataReader*,
const SubscriptionMatchedStatus& info)
{}


// 消息接收
void HelloWorldSubscriber::SubListener::on_data_available(
DataReader* reader)
{
SampleInfo info;
if (reader->take_next_sample(&hello_, &info) == ReturnCode_t::RETCODE_OK)
{
if (info.instance_state == ALIVE_INSTANCE_STATE)
{
samples_++;
// Print your structure data here.
std::cout << "Message " << hello_.message() << " " << hello_.index() << " RECEIVED" << std::endl;
}
}
}

4.2 HelloWorldExampleDataSharing

image-20230515133053716
1
2
3
4
5
6
7
8
9
10
11
12
/**
* @file HelloWorldPublisher.cpp
*
*/
// 对比上一个例子,新增了针对writer的 QoS 的设置
// CREATE THE WRITER
DataWriterQos wqos = DATAWRITER_QOS_DEFAULT;
wqos.reliability().kind = BEST_EFFORT_RELIABILITY_QOS;
wqos.history().depth = 10;
wqos.data_sharing().automatic();
writer_ = publisher_->create_datawriter(topic_, wqos, &listener_);

1
2
3
4
5
6
7
8
9
10
11
/**
* @file HelloWorldSubscriber.cpp
*
*/
// 对比上一个例子,新增了针对reader的 QoS 的设置
// CREATE THE READER
DataReaderQos rqos = DATAREADER_QOS_DEFAULT;
rqos.reliability().kind = BEST_EFFORT_RELIABILITY_QOS;
rqos.durability().kind = VOLATILE_DURABILITY_QOS;
rqos.data_sharing().automatic();
reader_ = subscriber_->create_datareader(topic_, rqos, &listener_);

4.3 HelloWorldExampleSharedMem

4.3.0 关于共享内存

本地使用共享内存传输可以获得比以太网协议栈传输更优的性能,因为共享内存可以减少内存副本的数量以及OS开销,还可以发送更大的消息,消息大小仅取决于内存大小。

image-20230515133132146 image-20230515133159772

共享内存传输序列图

image-20230515133232198 image-20230515133249086

共享内存文件存放在 /dev/shm 下

4.3.1 接口描述与数据生成
1
2
3
4
5
6
struct HelloWorld
{
unsigned long index;
string message;
char data[1024*1024];
};
4.3.2 发布者与订阅者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
* @file HelloWorldPublisher.cpp
*
*/
#include <fastdds/rtps/transport/shared_mem/SharedMemTransportDescriptor.h>
// ......

bool HelloWorldPublisher::init()
{
hello_ = std::make_shared<HelloWorld>();
hello_->index(0);
hello_->message("HelloWorld");
// DomainParticipant QoS 设置
//CREATE THE PARTICIPANT
DomainParticipantQos pqos;
pqos.wire_protocol().builtin.discovery_config.discoveryProtocol = DiscoveryProtocol_t::SIMPLE;
pqos.wire_protocol().builtin.discovery_config.use_SIMPLE_EndpointDiscoveryProtocol = true;
pqos.wire_protocol().builtin.discovery_config.m_simpleEDP.use_PublicationReaderANDSubscriptionWriter = true;
pqos.wire_protocol().builtin.discovery_config.m_simpleEDP.use_PublicationWriterANDSubscriptionReader = true;
pqos.wire_protocol().builtin.discovery_config.leaseDuration = eprosima::fastrtps::c_TimeInfinite;
pqos.name("Participant_pub");

// Explicit configuration of SharedMem transport
// 显式配置 禁用 UDP v4
pqos.transport().use_builtin_transports = false;

auto shm_transport = std::make_shared<SharedMemTransportDescriptor>();
// 设置共享内存段的大小
shm_transport->segment_size(2 * 1024 * 1024);
pqos.transport().user_transports.push_back(shm_transport);

participant_ = DomainParticipantFactory::get_instance()->create_participant(0, pqos);

// 注册,创建topic ......

//CREATE THE DATAWRITER
DataWriterQos wqos;
wqos.history().kind = KEEP_LAST_HISTORY_QOS;
wqos.history().depth = 30;
wqos.resource_limits().max_samples = 50;
wqos.resource_limits().allocated_samples = 20;
wqos.reliable_writer_qos().times.heartbeatPeriod.seconds = 2;
wqos.reliable_writer_qos().times.heartbeatPeriod.nanosec = 200 * 1000 * 1000;
wqos.reliability().kind = RELIABLE_RELIABILITY_QOS;
wqos.publish_mode().kind = ASYNCHRONOUS_PUBLISH_MODE;

writer_ = publisher_->create_datawriter(topic_, wqos, &listener_);
}


bool HelloWorldPublisher::publish(
bool waitForListener)
{
if (listener_.first_connected_ || !waitForListener || listener_.matched_ > 0)
{
hello_->index(hello_->index() + 1);
size_t data_size = hello_->data().size();
std::string s = "BigData" + std::to_string(hello_->index() % 10);
strcpy(&hello_->data()[data_size - s.length() - 1], s.c_str());

writer_->write(hello_.get());

return true;
}
return false;
}
1
2
3
4
/**
* @file HelloWorldSubscriber.cpp
*
*/

4.4 HelloWorldExampleTCP

image-20230515133317686

数据分发服务开源实现FastDDS
http://ziyangfu.github.io/2023/05/15/数据分发服务开源实现FastDDS/
作者
FZY
发布于
2023年5月15日
许可协议