数据分发服务开源实现FastDDS
fastDDS 官方文档
1. 编译安装
2. FastDDS框架理解
以数据为中心的DCPS(发布-订阅)模型实现,用于分布式软件通信。
并发多线程系统,每个DomainParticipant生成一组线程处理后台任务
FastDDS API是线程安全的
DDS中间件由应用层、DDS层、RTPS层(实时发布订阅协议)及传输层组成。传输层可以选择以太网协议栈,也可以选择共享内存。
2.1 DDS核心概念
- 域 domain
- 主题 topic
- 发布者 publisher
- 订阅者 subscriber
2.2 FastDDS架构实现
应用层:用户APP
FastDDS:部署1个或者多个域(domain)
RTPS层:传输层抽象,动态发现,实时发布-订阅协议实现,互操作性
2.2.1 DDS层
2.2.2 RTPS层
支持单播与多播
服务发现
同步与异步发布模式
3. FastDDS源码理解
3.1 DDS层
3.2 RTPS层
3.3 共享内存
3.4 服务质量 QoS
服务质量QoS是用来解决网络延迟与阻塞等问题的一种技术。
reliability 可靠性
deadline 截止时间
4. 例程理解
4.1 HelloWorldExample
源码:HelloWorldExample
4.1.1 接口描述与代码生成
1 2 3 4 5
| struct HelloWorld { unsigned long index; string message; };
|
1
| fastddsgen ./HelloWorld.idl
|
生成以下4个文件:
HelloWorld.h
HelloWorld.cxx // 数据类型
HelloWorldPubSubTypes.h
HelloWorldPubSubTypes.cxx // 继承,payload 序列化与反序列化
4.1.2 主程序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
|
#include <limits> #include <sstream>
#include "HelloWorldPublisher.h" #include "HelloWorldSubscriber.h"
#include <fastdds/dds/domain/DomainParticipantFactory.hpp> #include <fastrtps/log/Log.h>
#include <optionparser.hpp>
using eprosima::fastdds::dds::Log;
namespace option = eprosima::option;
int main( int argc, char** argv) { std::cout << "Starting " << std::endl;
int type = 1; uint32_t count = 10; uint32_t sleep = 100; bool use_environment_qos = false;
if (strcmp(type_name, "publisher") == 0) { type = 1; } else if (strcmp(type_name, "subscriber") == 0) { type = 2; }
switch (type) { case 1: { HelloWorldPublisher mypub; if (mypub.init(use_environment_qos)) { mypub.run(count, sleep); } break; } case 2: { HelloWorldSubscriber mysub; if (mysub.init(use_environment_qos)) { mysub.run(); } break; } } Log::Reset(); return 0; }
|
4.1.3 发布者与订阅者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| bool HelloWorldPublisher::init( bool use_env) { auto factory = DomainParticipantFactory::get_instance(); participant_ = factory->create_participant(0, pqos); type_.register_type(participant_); publisher_ = participant_->create_publisher( pubqos, nullptr); writer_ = publisher_->create_datawriter( topic_, wqos, &listener_); topic_ = participant_->create_topic( "HelloWorldTopic", "HelloWorld", tqos); }
void HelloWorldPublisher::PubListener::on_publication_matched( eprosima::fastdds::dds::DataWriter*, const eprosima::fastdds::dds::PublicationMatchedStatus& info) { if (info.current_count_change == 1) { matched_ = info.total_count; firstConnected_ = true; std::cout << "Publisher matched." << std::endl; } else if (info.current_count_change == -1) { matched_ = info.total_count; std::cout << "Publisher unmatched." << std::endl; } }
bool HelloWorldPublisher::publish( bool waitForListener) { if (listener_.firstConnected_ || !waitForListener || listener_.matched_ > 0) { hello_.index(hello_.index() + 1); writer_->write(&hello_); return true; } return false; }
void HelloWorldPublisher::run( uint32_t samples, uint32_t sleep) { std::thread thread(&HelloWorldPublisher::runThread, this, samples, sleep); thread.join(); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| bool HelloWorldSubscriber::init( bool use_env) { subscriber_ = participant_->create_subscriber(sqos, nullptr); topic_ = participant_->create_topic( "HelloWorldTopic", "HelloWorld", tqos); reader_ = subscriber_->create_datareader(topic_, rqos, &listener_); }
void HelloWorldSubscriber::SubListener::on_subscription_matched( DataReader*, const SubscriptionMatchedStatus& info) {}
void HelloWorldSubscriber::SubListener::on_data_available( DataReader* reader) { SampleInfo info; if (reader->take_next_sample(&hello_, &info) == ReturnCode_t::RETCODE_OK) { if (info.instance_state == ALIVE_INSTANCE_STATE) { samples_++; std::cout << "Message " << hello_.message() << " " << hello_.index() << " RECEIVED" << std::endl; } } }
|
4.2 HelloWorldExampleDataSharing
1 2 3 4 5 6 7 8 9 10 11 12
|
DataWriterQos wqos = DATAWRITER_QOS_DEFAULT; wqos.reliability().kind = BEST_EFFORT_RELIABILITY_QOS; wqos.history().depth = 10; wqos.data_sharing().automatic(); writer_ = publisher_->create_datawriter(topic_, wqos, &listener_);
|
1 2 3 4 5 6 7 8 9 10 11
|
DataReaderQos rqos = DATAREADER_QOS_DEFAULT; rqos.reliability().kind = BEST_EFFORT_RELIABILITY_QOS; rqos.durability().kind = VOLATILE_DURABILITY_QOS; rqos.data_sharing().automatic(); reader_ = subscriber_->create_datareader(topic_, rqos, &listener_);
|
4.3 HelloWorldExampleSharedMem
4.3.0 关于共享内存
本地使用共享内存传输可以获得比以太网协议栈传输更优的性能,因为共享内存可以减少内存副本的数量以及OS开销,还可以发送更大的消息,消息大小仅取决于内存大小。
共享内存传输序列图
共享内存文件存放在 /dev/shm 下
4.3.1 接口描述与数据生成
1 2 3 4 5 6
| struct HelloWorld { unsigned long index; string message; char data[1024*1024]; };
|
4.3.2 发布者与订阅者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
|
#include <fastdds/rtps/transport/shared_mem/SharedMemTransportDescriptor.h>
bool HelloWorldPublisher::init() { hello_ = std::make_shared<HelloWorld>(); hello_->index(0); hello_->message("HelloWorld"); DomainParticipantQos pqos; pqos.wire_protocol().builtin.discovery_config.discoveryProtocol = DiscoveryProtocol_t::SIMPLE; pqos.wire_protocol().builtin.discovery_config.use_SIMPLE_EndpointDiscoveryProtocol = true; pqos.wire_protocol().builtin.discovery_config.m_simpleEDP.use_PublicationReaderANDSubscriptionWriter = true; pqos.wire_protocol().builtin.discovery_config.m_simpleEDP.use_PublicationWriterANDSubscriptionReader = true; pqos.wire_protocol().builtin.discovery_config.leaseDuration = eprosima::fastrtps::c_TimeInfinite; pqos.name("Participant_pub");
pqos.transport().use_builtin_transports = false;
auto shm_transport = std::make_shared<SharedMemTransportDescriptor>(); shm_transport->segment_size(2 * 1024 * 1024); pqos.transport().user_transports.push_back(shm_transport);
participant_ = DomainParticipantFactory::get_instance()->create_participant(0, pqos);
DataWriterQos wqos; wqos.history().kind = KEEP_LAST_HISTORY_QOS; wqos.history().depth = 30; wqos.resource_limits().max_samples = 50; wqos.resource_limits().allocated_samples = 20; wqos.reliable_writer_qos().times.heartbeatPeriod.seconds = 2; wqos.reliable_writer_qos().times.heartbeatPeriod.nanosec = 200 * 1000 * 1000; wqos.reliability().kind = RELIABLE_RELIABILITY_QOS; wqos.publish_mode().kind = ASYNCHRONOUS_PUBLISH_MODE;
writer_ = publisher_->create_datawriter(topic_, wqos, &listener_); }
bool HelloWorldPublisher::publish( bool waitForListener) { if (listener_.first_connected_ || !waitForListener || listener_.matched_ > 0) { hello_->index(hello_->index() + 1); size_t data_size = hello_->data().size(); std::string s = "BigData" + std::to_string(hello_->index() % 10); strcpy(&hello_->data()[data_size - s.length() - 1], s.c_str());
writer_->write(hello_.get());
return true; } return false; }
|
4.4 HelloWorldExampleTCP