一、介紹
OpenDDS是一個開源的、C++語言編寫的分散式數據交換中間件框架,使用可擴展的、面向對象的模型實現了數據通信。它支持多種傳輸協議(包括TCP/IP、UDP、Shared Memory等),可以在不同的操作系統和開發語言之間進行分散式通信。
OpenDDS採用了Data Distribution Service (DDS)規範,由Object Management Group (OMG)維護,是一種面向數據的、高性能、可靠的消息傳遞中間件。
OpenDDS的重點是處理高吞吐量、低延遲的數據傳輸。它提供了靈活的QoS機制(Quality of Service,服務質量),使得開發者可以根據需求調整數據傳輸的優先順序、可靠性和延遲等參數。
二、安裝
OpenDDS的安裝分為以下幾個步驟:
1. 下載
首先需要從OpenDDS官方網站上下載源代碼:http://opendds.org/downloads.html
2. 安裝依賴
在安裝OpenDDS之前,需要確保系統中安裝了以下依賴庫:
sudo apt-get install libboost-all-dev libssl-dev libace-dev
3. 編譯安裝
接下來進入源代碼的根目錄,執行以下命令進行編譯和安裝:
./configure --prefix=/usr/local/opendds
make -j4
sudo make install
三、基本概念
在使用OpenDDS開發分散式應用程序之前,需要了解以下基本概念:
1. Topic
Topic是指一種數據類型的集合,可以理解為一個「主題」,可以被發布者發布和訂閱者訂閱。每個Topic都有一個唯一的名稱和類型。
2. Publisher和Subscriber
Publisher是數據發布者,用於將消息發送給訂閱者,而Subscriber則是數據訂閱者,用於接收發布者發送的消息。一個Publisher或Subscriber可以訂閱或發布多個Topic。
3. DataWriter和DataReader
DataWriter是發布者中的數據寫入器,用於將消息發送到Topic中。DataReader是訂閱者中的數據讀取器,用於從Topic中讀取消息。一個DataWriter或DataReader只能發布或訂閱一個Topic。
4. Domain
Domain是指OpenDDS中的一個消息域,可以理解為消息傳輸的一個獨立區域。每個Domain都有一個唯一的標識符(Domain ID),用於標識消息傳輸中的不同域。一個域中可以包含多個Publisher和Subscriber,它們可以通過共享同一個Topic進行通信。
四、使用指南
以下是一個簡單的OpenDDS程序:
#include <dds/DCPS/Service_Participant.h>
#include <dds/DCPS/PublisherImpl.h>
#include <dds/DCPS/SubscriberImpl.h>
#include <dds/DCPS/WaitSet.h>
#include <dds/DCPS/transport/framework/TheTransportFactory.h>
#include <dds/DCPS/transport/framework/TransportDefs.h>
#include <dds/DCPS/transport/tcp/TcpInst.h>
#include "ExampleTypeSupportImpl.h"
int ACE_TMAIN(int argc, ACE_TCHAR* argv[]) {
// 初始化DDS
DDS::DomainParticipantFactory_var dpf =
TheParticipantFactoryWithArgs(argc, argv);
// 創建一個Domain
DDS::DomainParticipant_var participant = dpf->create_participant(
42,
PARTICIPANT_QOS_DEFAULT,
DDS::DomainParticipantListener::_nil(),
::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (CORBA::is_nil(participant.in())) {
ACE_ERROR_RETURN((LM_ERROR, "ERROR: Failed to create domain participant\n"), -1);
}
// 註冊數據類型
Example::ExampleTypeSupport_var ts = new Example::ExampleTypeSupportImpl();
if (ts->register_type(participant.in(), "") != DDS::RETCODE_OK) {
ACE_ERROR_RETURN((LM_ERROR, "ERROR: Failed to register the Example type\n"), -1);
}
// 創建Publisher和Subscriber
DDS::Publisher_var pub = participant->create_publisher(PUBLISHER_QOS_DEFAULT,
DDS::PublisherListener::_nil(),
::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (CORBA::is_nil(pub.in())) {
ACE_ERROR_RETURN((LM_ERROR, "ERROR: Failed to create publisher\n"), -1);
}
DDS::Subscriber_var sub = participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT,
DDS::SubscriberListener::_nil(),
::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (CORBA::is_nil(sub.in())) {
ACE_ERROR_RETURN((LM_ERROR, "ERROR: Failed to create subscriber\n"), -1);
}
// 創建Topic
DDS::Topic_var topic = participant->create_topic("ExampleTopic",
ts->get_type_name(),
TOPIC_QOS_DEFAULT,
DDS::TopicListener::_nil(),
::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (CORBA::is_nil(topic.in())) {
ACE_ERROR_RETURN((LM_ERROR, "ERROR: Failed to create topic\n"), -1);
}
// 創建DataWriter和DataReader
DDS::DataWriter_var dw = pub->create_datawriter(topic.in(),
DATAWRITER_QOS_DEFAULT,
DDS::DataWriterListener::_nil(),
::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (CORBA::is_nil(dw.in())) {
ACE_ERROR_RETURN((LM_ERROR, "ERROR: Failed to create data writer\n"), -1);
}
DDS::DataReader_var dr = sub->create_datareader(topic.in(),
DATAREADER_QOS_DEFAULT,
DDS::DataReaderListener::_nil(),
::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (CORBA::is_nil(dr.in())) {
ACE_ERROR_RETURN((LM_ERROR, "ERROR: Failed to create data reader\n"), -1);
}
Example::ExampleDataWriter_var example_dw = Example::ExampleDataWriter::_narrow(dw.in());
if (CORBA::is_nil(example_dw.in())) {
ACE_ERROR_RETURN((LM_ERROR, "ERROR: Failed to narrow data writer\n"), -1);
}
Example::ExampleDataReader_var example_dr = Example::ExampleDataReader::_narrow(dr.in());
if (CORBA::is_nil(example_dr.in())) {
ACE_ERROR_RETURN((LM_ERROR, "ERROR: Failed to narrow data reader\n"), -1);
}
// 發布數據
Example::Example data;
data.key = 1;
data.value = "Hello, world!";
example_dw->write(data, DDS::HANDLE_NIL);
// 等待接收數據
DDS::DataReaderSeq readers;
DDS::SampleInfoSeq infos;
DDS::ReturnCode_t result;
do {
// 等待可讀事件
DDS::WaitSet_var ws(new DDS::WaitSet);
ws->attach_condition(dr->create_readcondition(DDS::NOT_READ_SAMPLE_STATE,
DDS::NEW_VIEW_STATE,
DDS::ANY_INSTANCE_STATE));
result = sub->get_readers(readers, infos, DDS::LENGTH_UNLIMITED,
DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);
if (result != DDS::RETCODE_OK && result != DDS::RETCODE_NO_DATA) {
ACE_ERROR_RETURN((LM_ERROR, "ERROR: Failed to get readers\n"), -1);
}
ACE_DEBUG((LM_INFO, "readers.length() = %d\n", readers.length()));
// 接收數據
int count = 0;
for (unsigned int i = 0; i < readers.length(); ++i) {
Example::ExampleDataReader_var dr = Example::ExampleDataReader::_narrow(readers[i]);
if (CORBA::is_nil(dr.in())) {
ACE_ERROR_RETURN((LM_ERROR, "ERROR: Failed to narrow data reader\n"), -1);
}
for (unsigned int j = 0; j take_next_sample(data, infos[j]);
if (result == DDS::RETCODE_OK) {
ACE_DEBUG((LM_INFO, "Received data: key = %d, value = %C\n",
data.key, data.value.in()));
++count;
} else if (result == DDS::RETCODE_NO_DATA) {
continue;
} else {
ACE_ERROR_RETURN((LM_ERROR, "ERROR: Failed to take next sample\n"), -1);
}
}
}
}
DDS::ConditionSeq active_conditions;
if (ws->wait(active_conditions, DDS::DURATION_INFINITE) != DDS::RETCODE_OK) {
ACE_ERROR_RETURN((LM_ERROR, "ERROR: Wait failed\n"), -1);
}
ws->detach_condition(active_conditions[0]);
} while (result == DDS::RETCODE_NO_DATA);
// 銷毀資源
dpf->delete_participant(participant.in());
TheTransportFactory->release();
::DDS::StringSeq_var strings = dpf->get_log_lines();
for (CORBA::ULong i = 0; i length(); ++i) {
ACE_DEBUG((LM_DEBUG, "%C\n", strings[i].in()));
}
return 0;
}
該程序創建了一個Domain,並註冊了一個名為「Example」的數據類型。接下來創建了一個Publisher和一個Subscriber,並創建了一個名為「ExampleTopic」的Topic。最後,程序向Topic中發布了一條數據,再從Topic中接收數據並輸出。
五、總結
OpenDDS是一個開源的、高性能的消息傳遞中間件框架,支持多種傳輸協議和QoS機制。使用OpenDDS可以方便地實現分散式應用程序中的數據交換和通訊,具有廣泛的應用前景。
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/231729.html