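FastDDS Service Discovery: Sending and Receiving PDP Messages
Contents
- Sending PDP
- Receiving PDP
- Updating EDP
- EntityID
The earlier article "FastDDS Service Discovery: Creating PDP and EDP" (FastDDS服务发现之PDP和EDP的创建) covers the concepts and mechanisms of service discovery and how the various PDP/EDP objects are created. This article walks through how Simple PDP sends data, receives data, and processes incoming messages.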
Sending PDP
RTPSParticipantImpl::enable calls BuiltinProtocols::enable, which starts the transmission of PDP discovery messages:
void BuiltinProtocols::enable()
{
    if (nullptr != mp_PDP)
    {
        mp_PDP->enable();
        mp_PDP->announceParticipantState(true);
        mp_PDP->resetParticipantAnnouncement();
    }
}
mp_PDP->enable() does three main things:
1. It creates a timer that sends PDP messages periodically.
bool PDP::enable()
{
    ...
    resend_participant_info_event_ = new TimedEvent(mp_RTPSParticipant->getEventResource(),
                    [&]() -> bool
                    {
                        announceParticipantState(false);
                        set_next_announcement_interval();
                        return true;
                    },
                    0);
    set_initial_announcement_interval();
    ...
}
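set_initial_announcement_interval() sets the initial announcement period to 100 ms. After initial_announcements_.count messages have been sent at that period, the period is reset to m_discovery.discovery_config.leaseDuration_announcementperiod, which defaults to 3 s.
(TODO: packet capture example)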
2. It discovers itself ("self-discovery").
bool PDP::enable()
{
    ...
    // Notify "self-discovery"
    getRTPSParticipant()->on_entity_discovery(mp_RTPSParticipant->getGuid(),
            get_participant_proxy_data(mp_RTPSParticipant->getGuid().guidPrefix)->m_properties);
    ...
}
This call ends up in:
bool StatisticsParticipantImpl::are_statistics_writers_enabled(uint32_t checked_enabled_writers)
{
    return (enabled_writers_mask_ & checked_enabled_writers);
}
enabled_writers_mask_ is 0 by default, so in practice nothing is done here (TODO).
3. It assigns the multicast and unicast addresses: mp_PDP->enable() goes on to call builtin_endpoints_->enable_pdp_readers, which eventually reaches RTPSParticipantImpl::assignEndpointListenResources:
bool RTPSParticipantImpl::assignEndpointListenResources(Endpoint* endp)
{
    //Tag the endpoint with the ReceiverResources
    bool valid = true;
    //UNICAST
    assignEndpoint2LocatorList(endp, endp->getAttributes().unicastLocatorList);
    //MULTICAST
    assignEndpoint2LocatorList(endp, endp->getAttributes().multicastLocatorList);
    return valid;
}
bool RTPSParticipantImpl::assignEndpoint2LocatorList(Endpoint* endp, LocatorList_t& list)
{
    for (auto lit = list.begin(); lit != list.end(); ++lit)
    {
        //Iteration of all Locators within the Locator list passed down as argument
        std::lock_guard<std::mutex> guard(m_receiverResourcelistMutex);
        //Check among ReceiverResources whether the locator is supported or not
        for (auto it = m_receiverResourcelist.begin(); it != m_receiverResourcelist.end(); ++it)
        {
            if (it->Receiver->SupportsLocator(*lit))
            {
                it->mp_receiver->associateEndpoint(endp);
            }
        }
    }
    return true;
}
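By default (domain 0, participant ID 0) the unicast address is "<local IP>:7410" and the multicast address is "239.255.0.1:7400". assignEndpoint2LocatorList checks each locator, specifically its kind and port, by calling a lambda held by ReceiverResource: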
LocatorMapsToManagedChannel = [&transport, locator](const Locator_t& locatorToCheck) -> bool
        {
            return locator.kind == locatorToCheck.kind && transport.DoInputLocatorsMatch(locator, locatorToCheck);
        };
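The default ports quoted above (7400 for multicast, 7410 for unicast) are consistent with the well-known port mapping of the RTPS specification for domain 0 and participant ID 0. Below is a minimal sketch of that mapping. PortParams and the two helper functions are names invented for this illustration, not FastDDS APIs, and the parameter values are assumed to be the specification defaults.

```cpp
#include <cstdint>

// Assumed RTPS default port parameters (hypothetical struct, not a FastDDS type).
struct PortParams
{
    std::uint32_t port_base = 7400;          // PB
    std::uint32_t domain_id_gain = 250;      // DG
    std::uint32_t participant_id_gain = 2;   // PG
    std::uint32_t offset_d0 = 0;             // metatraffic multicast offset
    std::uint32_t offset_d1 = 10;            // metatraffic unicast offset
};

// Metatraffic (discovery) multicast port: PB + DG * domainId + d0.
std::uint32_t metatraffic_multicast_port(const PortParams& p, std::uint32_t domain_id)
{
    return p.port_base + p.domain_id_gain * domain_id + p.offset_d0;
}

// Metatraffic (discovery) unicast port: PB + DG * domainId + d1 + PG * participantId.
std::uint32_t metatraffic_unicast_port(const PortParams& p, std::uint32_t domain_id,
        std::uint32_t participant_id)
{
    return p.port_base + p.domain_id_gain * domain_id + p.offset_d1 +
           p.participant_id_gain * participant_id;
}
```

Under these assumptions, a second participant in domain 0 on the same host would listen for unicast discovery traffic on 7412, while the multicast port stays at 7400 for every participant in the domain.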
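If the check returns true, assignEndpoint2LocatorList goes on to call MessageReceiver::associateEndpoint, which adds the PDP reader's {EntityID: vector<RTPSReader*>} entry to MessageReceiver::associated_readers_. A received PDP message is then handled the same way as for a subscriber: the receiver looks up the matching RTPSReader and hands the message over for further processing.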
void MessageReceiver::associateEndpoint(Endpoint* to_add)
{
    ...
    const auto reader = dynamic_cast<RTPSReader*>(to_add);
    const auto entityId = reader->getGuid().entityId;
    // search for set of readers by entity ID
    const auto readers = associated_readers_.find(entityId);
    if (readers == associated_readers_.end())
    {
        auto vec = std::vector<RTPSReader*>();
        vec.push_back(reader);
        associated_readers_.emplace(entityId, vec);
    }
    ...
}
Adding {entityid: PDPreader} to associated_readers_ is what allows discovery messages from other participants to be received. At this point the reader is listening on these addresses.
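The registration and lookup by entity ID described above can be sketched as follows. This is a simplified model rather than the FastDDS implementation: Reader, Receiver and readers_for are hypothetical stand-ins for RTPSReader, MessageReceiver and its internal lookup, and the entity ID is reduced to a 32-bit integer. The value 0x000100c7 is the built-in SPDP participant reader ID as defined in the RTPS specification.

```cpp
#include <algorithm>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-ins for EntityId_t and RTPSReader.
using EntityId = std::uint32_t;

struct Reader
{
    std::string name;
    EntityId entity_id;
};

// Entity ID of the built-in SPDP participant reader (RTPS specification value).
constexpr EntityId kSpdpReaderId = 0x000100c7;

class Receiver
{
public:

    // Register a reader under its entity ID; the same reader is stored only once.
    void associate(Reader* reader)
    {
        std::vector<Reader*>& readers = associated_readers_[reader->entity_id];
        if (std::find(readers.begin(), readers.end(), reader) == readers.end())
        {
            readers.push_back(reader);
        }
    }

    // Return the readers that should receive a message addressed to entity_id.
    std::vector<Reader*> readers_for(EntityId entity_id) const
    {
        const auto it = associated_readers_.find(entity_id);
        if (it == associated_readers_.end())
        {
            return {};
        }
        return it->second;
    }

private:

    std::map<EntityId, std::vector<Reader*>> associated_readers_;
};
```

Keying the map by entity ID rather than by full GUID fits the way built-in discovery endpoints work: every participant uses the same well-known entity IDs for them, so an incoming DATA(p) can be routed from the reader entity ID alone.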
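mp_PDP->announceParticipantState(true): once the three parts of mp_PDP->enable() are done, PDPSimple::announceParticipantState is called to send the first PDP packet. The message itself is assembled and sent in PDP::announceParticipantState.
mp_PDP->resetParticipantAnnouncement() is then called to start the timer, and periodic announcements begin.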
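The announcement timing described in this section (a burst of initial announcements at a short period, then a longer steady period) can be sketched as below. AnnouncementConfig and next_interval are hypothetical names for this illustration. The 100 ms and 3 s values come from the text above, while the initial count of 5 is an assumption about the default of initial_announcements_.count.

```cpp
#include <chrono>
#include <cstdint>

// Hypothetical configuration mirroring the values discussed in the text.
struct AnnouncementConfig
{
    std::uint32_t initial_count = 5;                 // assumed default burst size
    std::chrono::milliseconds initial_period{100};   // initial announcement period
    std::chrono::milliseconds steady_period{3000};   // leaseDuration_announcementperiod
};

// Interval to wait before the next periodic announcement, given how many
// periodic announcements have already been sent.
std::chrono::milliseconds next_interval(const AnnouncementConfig& cfg, std::uint32_t already_sent)
{
    return already_sent < cfg.initial_count ? cfg.initial_period : cfg.steady_period;
}
```

The short initial period helps a newly started participant get discovered quickly even if some of the first best-effort announcements are lost; the longer steady period then keeps the background traffic low.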
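Receiving PDP
PDPListener listens for and receives PDP messages. The receive path is the same as for any other DataReader (see the article on UDP communication in FastDDS, "FastDDS之UDP通信"). Once the UDP layer has received a PDP message, PDPListener::onNewCacheChangeAdded is called to start processing it: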
void PDPListener::onNewCacheChangeAdded(RTPSReader* reader, const CacheChange_t* const change_in)
{
    ...
    if (change->kind == ALIVE)
    {
        // If this is our own announcement, return right away: it was already added at creation time
        if (guid == parent_pdp_->getRTPSParticipant()->getGuid())
        ...
        CDRMessage_t msg(change->serializedPayload);
        temp_participant_data_.clear();
        if (temp_participant_data_.readFromCDRMessage(&msg, true, parent_pdp_->getRTPSParticipant()->network_factory(),
                parent_pdp_->getRTPSParticipant()->has_shm_transport(), true, change_in->vendor_id))
        {
            // After correctly reading it
            change->instanceHandle = temp_participant_data_.m_key;
            guid = temp_participant_data_.m_guid;
            if (parent_pdp_->getRTPSParticipant()->is_participant_ignored(guid.guidPrefix))
            {
                return;
            }
            // Filter locators
            const auto& pattr = parent_pdp_->getRTPSParticipant()->getAttributes();
            fastdds::rtps::network::external_locators::filter_remote_locators(temp_participant_data_,
                    pattr.builtin.metatraffic_external_unicast_locators, pattr.default_external_unicast_locators,
                    pattr.ignore_non_matching_locators);
            // Check if participant already exists (updated info)
            ParticipantProxyData* pdata = nullptr;
            bool already_processed = false;
            for (ParticipantProxyData* it : parent_pdp_->participant_proxies_)
            {
                if (guid == it->m_guid)
                {
                    pdata = it;
                    if (it->m_sample_identity.writer_guid() == change->writerGUID &&
                            it->m_sample_identity.sequence_number() == change->sequenceNumber)
                    {
                        already_processed = true;
                    }
                    break;
                }
            }
            // Only process the DATA(p) if it is not a repeated one
            if (!already_processed)
            {
                temp_participant_data_.m_sample_identity.writer_guid(change->writerGUID);
                temp_participant_data_.m_sample_identity.sequence_number(change->sequenceNumber);
                process_alive_data(pdata, temp_participant_data_, writer_guid, reader, lock);
            }
        }
    }
    ...
    //Remove change from history.
    parent_pdp_->builtin_endpoints_->remove_from_pdp_reader_history(change);
}
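There are three main cases:
- If the participant has discovered itself (checked by comparing GUIDs: guid == parent_pdp_->getRTPSParticipant()->getGuid()), the listener returns without further processing, because its own ParticipantProxyData has already been added to participant_proxies_. See "FastDDS Service Discovery: Creating PDP and EDP" for details.
- If another participant is discovered for the first time, or this sample from that participant has not been processed yet (TODO: under what circumstances would it not have been processed?), the PDP data is stored in a ParticipantProxyData object and PDPListener::process_alive_data is called for further processing.
- If the participant has already been discovered and this sample already processed, the listener also exits without doing anything. This is decided by already_processed, which is set to true only when the guid matches and both writer_guid and sequence_number in SampleIdentity match (TODO: conditions).
After any of these three cases, remove_from_pdp_reader_history is called to delete the CacheChange from the DataReader's history.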
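The three cases above boil down to a small decision function. The sketch below is a simplified model, not FastDDS code: GUIDs are reduced to strings, and ParticipantProxy, SampleIdentity, Action and classify are hypothetical names introduced for this illustration.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical, simplified stand-ins for the FastDDS types.
struct SampleIdentity
{
    std::string writer_guid;
    std::int64_t sequence_number;
};

struct ParticipantProxy
{
    std::string guid;
    SampleIdentity sample_identity;  // identity of the last processed DATA(p)
};

enum class Action
{
    IgnoreSelf,     // our own announcement
    Process,        // new participant, or a new sample from a known one
    SkipDuplicate   // same writer GUID and sequence number as already stored
};

Action classify(const std::string& own_guid, const std::vector<ParticipantProxy>& known,
        const std::string& guid, const SampleIdentity& sample)
{
    if (guid == own_guid)
    {
        return Action::IgnoreSelf;
    }
    for (const ParticipantProxy& proxy : known)
    {
        if (proxy.guid == guid)
        {
            const bool same_sample =
                    proxy.sample_identity.writer_guid == sample.writer_guid &&
                    proxy.sample_identity.sequence_number == sample.sequence_number;
            return same_sample ? Action::SkipDuplicate : Action::Process;
        }
    }
    return Action::Process;
}
```

In this model, a repeated copy of an announcement that carries the same writer GUID and sequence number as the stored one is classified as SkipDuplicate, while a known participant that publishes a new sample is processed again as an update.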
void PDPListener::process_alive_data(ParticipantProxyData* old_data, ParticipantProxyData& new_data,
        GUID_t& writer_guid, RTPSReader* reader, std::unique_lock<std::recursive_mutex>& lock)
{
    GUID_t participant_guid = new_data.m_guid;
    if (old_data == nullptr)
    {
        // Create a new one when not found
        old_data = parent_pdp_->createParticipantProxyData(new_data, writer_guid);
        if (old_data != nullptr)
        {
            // Copy proxy to be passed forward before releasing PDP mutex
            ParticipantProxyData old_data_copy(*old_data);
            reader->getMutex().unlock();
            lock.unlock();
            parent_pdp_->assignRemoteEndpoints(&old_data_copy);
        }
        else
        {
            reader->getMutex().unlock();
            lock.unlock();
        }
    }
    else
    {
        old_data->updateData(new_data);
        old_data->isAlive = true;
        reader->getMutex().unlock();
        EPROSIMA_LOG_INFO(RTPS_PDP_DISCOVERY, "Update participant "
                << old_data->m_guid << " at "
                << "MTTLoc: " << old_data->metatraffic_locators
                << " DefLoc:" << old_data->default_locators);
        if (parent_pdp_->updateInfoMatchesEDP())
        {
            parent_pdp_->mp_EDP->assignRemoteEndpoints(*old_data, true);
        }
        // Copy proxy to be passed forward before releasing PDP mutex
        ParticipantProxyData old_data_copy(*old_data);
        lock.unlock();
        RTPSParticipantListener* listener = parent_pdp_->getRTPSParticipant()->getListener();
        if (listener != nullptr)
        {
            bool should_be_ignored = false;
            {
                std::lock_guard<std::mutex> cb_lock(parent_pdp_->callback_mtx_);
                ParticipantDiscoveryInfo info(old_data_copy);
                info.status = ParticipantDiscoveryInfo::CHANGED_QOS_PARTICIPANT;
                listener->onParticipantDiscovery(
                    parent_pdp_->getRTPSParticipant()->getUserRTPSParticipant(),
                    std::move(info),
                    should_be_ignored);
            }
            if (should_be_ignored)
            {
                parent_pdp_->getRTPSParticipant()->ignore_participant(participant_guid.guidPrefix);
            }
        }
    }
#ifdef FASTDDS_STATISTICS
    //! Addition or update of a participant proxy should trigger
    //! a connections update on the local participant connection list
    if (nullptr != parent_pdp_->getRTPSParticipant()->get_connections_observer())
    {
        parent_pdp_->getRTPSParticipant()->get_connections_observer()->on_local_entity_connections_change(
            parent_pdp_->getRTPSParticipant()->getGuid());
    }
#endif //FASTDDS_STATISTICS
    // Take again the reader lock
    reader->getMutex().lock();
}
PDPListener::process_alive_data handles two cases: either parent_pdp_->participant_proxies_ contains no ParticipantProxyData whose guid matches the received PDP message (the if (old_data == nullptr) branch), or it does contain one.
In the first case, parent_pdp_->createParticipantProxyData creates a ParticipantProxyData object that stores the information about the newly discovered participant. PDPSimple::assignRemoteEndpoints is then called, which in turn calls PDPSimple::match_pdp_remote_endpoints and PDPSimple::assign_low_level_remote_endpoints.
void PDPSimple::match_pdp_remote_endpoints(const ParticipantProxyData& pdata, bool notify_secure_endpoints)
{
#if !HAVE_SECURITY
    static_cast<void>(notify_secure_endpoints);
#endif // !HAVE_SECURITY
    auto endpoints = static_cast<fastdds::rtps::SimplePDPEndpoints*>(builtin_endpoints_.get());
    const NetworkFactory& network = mp_RTPSParticipant->network_factory();
    bool use_multicast_locators = !mp_RTPSParticipant->getAttributes().builtin.avoid_builtin_multicast ||
            pdata.metatraffic_locators.unicast.empty();
    const uint32_t endp = pdata.m_availableBuiltinEndpoints;
    // Default to values for non-secure endpoints
    auto reliability_kind = BEST_EFFORT_RELIABILITY_QOS;
    uint32_t pdp_reader_mask = DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR;
    uint32_t pdp_writer_mask = DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER;
    EntityId_t reader_entity_id = c_EntityId_SPDPReader;
    EntityId_t writer_entity_id = c_EntityId_SPDPWriter;
    RTPSReader* reader = endpoints->reader.reader_;
    RTPSWriter* writer = endpoints->writer.writer_;
    if (0 != (endp & pdp_writer_mask))
    {
        auto temp_writer_data = get_temporary_writer_proxies_pool().get();
        temp_writer_data->clear();
        temp_writer_data->guid().guidPrefix = pdata.m_guid.guidPrefix;
        temp_writer_data->guid().entityId = writer_entity_id;
        temp_writer_data->persistence_guid(pdata.get_persistence_guid());
        temp_writer_data->set_persistence_entity_id(writer_entity_id);
        temp_writer_data->set_remote_locators(pdata.metatraffic_locators, network, use_multicast_locators);
        temp_writer_data->m_qos.m_reliability.kind = reliability_kind;
        temp_writer_data->m_qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS;
        {
            reader->matched_writer_add(*temp_writer_data);
        }
    }
    if (0 != (endp & pdp_reader_mask))
    {
        auto temp_reader_data = get_temporary_reader_proxies_pool().get();
        temp_reader_data->clear();
        temp_reader_data->m_expectsInlineQos = false;
        temp_reader_data->guid().guidPrefix = pdata.m_guid.guidPrefix;
        temp_reader_data->guid().entityId = reader_entity_id;
        temp_reader_data->set_remote_locators(pdata.metatraffic_locators, network, use_multicast_locators);
        temp_reader_data->m_qos.m_reliability.kind = reliability_kind;
        temp_reader_data->m_qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS;
        {
            writer->matched_reader_add(*temp_reader_data);
        }
        if (BEST_EFFORT_RELIABILITY_QOS == reliability_kind)
        {
            endpoints->writer.writer_->unsent_changes_reset();
        }
    }
}
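This code fills in two temporary objects, taken from ProxyPool<WriterProxyData> and ProxyPool<ReaderProxyData>, that describe the RTPSWriter and RTPSReader of the participant discovered through PDP. The guidPrefix, persistence_guid and metatraffic_locators values come from the PDP message; everything else is set to fixed values. The two temporary objects are then passed to StatelessReader::matched_writer_add and StatelessWriter::matched_reader_add to update the local PDP RTPSReader and RTPSWriter.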
bool StatelessReader::matched_writer_add(const WriterProxyData& wdata)
{
    ReaderListener* listener = nullptr;
    {
        std::unique_lock<RecursiveTimedMutex> guard(mp_mutex);
        listener = mp_listener;
        for (RemoteWriterInfo_t& writer : matched_writers_)
        {
            if (writer.guid == wdata.guid())
            {
                EPROSIMA_LOG_INFO(RTPS_READER, "Attempting to add existing writer, updating information");
                if (EXCLUSIVE_OWNERSHIP_QOS == m_att.ownershipKind &&
                        writer.ownership_strength != wdata.m_qos.m_ownershipStrength.value)
                {
                    mp_history->writer_update_its_ownership_strength_nts(
                        writer.guid, wdata.m_qos.m_ownershipStrength.value);
                }
                writer.ownership_strength = wdata.m_qos.m_ownershipStrength.value;
                if (nullptr != listener)
                {
                    // call the listener without the lock taken
                    guard.unlock();
                    listener->on_writer_discovery(this, WriterDiscoveryInfo::CHANGED_QOS_WRITER, wdata.guid(),
                            &wdata);
                }
#ifdef FASTDDS_STATISTICS
                // notify monitor service so that the connectionlist for this entity
                // could be updated
                if (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin())
                {
                    mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid);
                }
#endif //FASTDDS_STATISTICS
                return false;
            }
        }
        bool is_same_process = RTPSDomainImpl::should_intraprocess_between(m_guid, wdata.guid());
        bool is_datasharing = is_datasharing_compatible_with(wdata);
        RemoteWriterInfo_t info;
        info.guid = wdata.guid();
        info.persistence_guid = wdata.persistence_guid();
        info.has_manual_topic_liveliness = (MANUAL_BY_TOPIC_LIVELINESS_QOS == wdata.m_qos.m_liveliness.kind);
        info.is_datasharing = is_datasharing;
        info.ownership_strength = wdata.m_qos.m_ownershipStrength.value;
        if (is_datasharing)
        {
            if (datasharing_listener_->add_datasharing_writer(wdata.guid(),
                    m_att.durabilityKind == VOLATILE,
                    mp_history->m_att.maximumReservedCaches))
            {
                EPROSIMA_LOG_INFO(RTPS_READER, "Writer Proxy " << wdata.guid() << " added to " << this->m_guid.entityId
                                                               << " with data sharing");
            }
            else
            {
                EPROSIMA_LOG_ERROR(RTPS_READER, "Failed to add Writer Proxy " << wdata.guid()
                                                                              << " to " << this->m_guid.entityId
                                                                              << " with data sharing.");
                return false;
            }
        }
        if (matched_writers_.emplace_back(info) == nullptr)
        {
            EPROSIMA_LOG_WARNING(RTPS_READER, "No space to add writer " << wdata.guid() << " to reader " << m_guid);
            if (is_datasharing)
            {
                datasharing_listener_->remove_datasharing_writer(wdata.guid());
            }
            return false;
        }
        EPROSIMA_LOG_INFO(RTPS_READER, "Writer " << wdata.guid() << " added to reader " << m_guid);
        add_persistence_guid(info.guid, info.persistence_guid);
        m_acceptMessagesFromUnkownWriters = false;
        // Intraprocess manages durability itself
        if (is_datasharing && !is_same_process && m_att.durabilityKind != VOLATILE)
        {
            // simulate a notification to force reading of transient changes
            // this has to be done after the writer is added to the matched_writers or the processing may fail
            datasharing_listener_->notify(false);
        }
    }
    if (liveliness_lease_duration_ < c_TimeInfinite)
    {
        auto wlp = mp_RTPSParticipant->wlp();
        if ( wlp != nullptr)
        {
            wlp->sub_liveliness_manager_->add_writer(
                wdata.guid(),
                liveliness_kind_,
                liveliness_lease_duration_);
        }
        else
        {
            EPROSIMA_LOG_ERROR(RTPS_LIVELINESS, "Finite liveliness lease duration but WLP not enabled");
        }
    }
    if (nullptr != listener)
    {
        listener->on_writer_discovery(this, WriterDiscoveryInfo::DISCOVERED_WRITER, wdata.guid(), &wdata);
    }
#ifdef FASTDDS_STATISTICS
    // notify monitor service so that the connectionlist for this entity
    // could be updated
    if (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin())
    {
        mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid);
    }
#endif //FASTDDS_STATISTICS
    return true;
}
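The core of StatelessReader::matched_writer_add is this: if the writer is already in matched_writers_, its QoS and related data are updated; otherwise a RemoteWriterInfo_t object is built to hold the information about the writer of the participant discovered through PDP, and it is added to matched_writers_ of the local discovery endpoint, the StatelessReader.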
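The update-or-add behaviour, including the return value convention visible in the listing (false when an existing writer was only updated or when there is no room, true when a new writer was added), can be sketched as follows. ReaderSketch and RemoteWriterInfo are hypothetical simplifications: GUIDs are plain strings and the only updated field is the ownership strength.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical, reduced version of RemoteWriterInfo_t.
struct RemoteWriterInfo
{
    std::string guid;
    std::uint32_t ownership_strength;
};

class ReaderSketch
{
public:

    explicit ReaderSketch(std::size_t max_writers)
        : max_writers_(max_writers)
    {
    }

    // Returns true only when a new writer entry was added.
    bool matched_writer_add(const RemoteWriterInfo& info)
    {
        for (RemoteWriterInfo& writer : matched_writers_)
        {
            if (writer.guid == info.guid)
            {
                // Already matched: refresh the stored data and report "not new".
                writer.ownership_strength = info.ownership_strength;
                return false;
            }
        }
        if (matched_writers_.size() >= max_writers_)
        {
            // No space left for another matched writer.
            return false;
        }
        matched_writers_.push_back(info);
        return true;
    }

    std::size_t matched_count() const
    {
        return matched_writers_.size();
    }

private:

    std::size_t max_writers_;
    std::vector<RemoteWriterInfo> matched_writers_;
};
```

Because false covers both "updated" and "rejected", a caller that needs to tell the two apart has to rely on other signals; in the listing above these are the log messages and the listener callback.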
bool StatelessWriter::matched_reader_add(const ReaderProxyData& data)
{
    using fastdds::rtps::network::external_locators::filter_remote_locators;
    std::unique_lock<RecursiveTimedMutex> guard(mp_mutex);
    std::unique_lock<LocatorSelectorSender> locator_selector_guard(locator_selector_);
    assert(data.guid() != c_Guid_Unknown);
    if (for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
            [this, &data](ReaderLocator& reader)
            {
                if (reader.remote_guid() == data.guid())
                {
                    EPROSIMA_LOG_WARNING(RTPS_WRITER, "Attempting to add existing reader, updating information.");
                    if (reader.update(data.remote_locators().unicast,
                            data.remote_locators().multicast,
                            data.m_expectsInlineQos))
                    {
                        filter_remote_locators(*reader.general_locator_selector_entry(),
                                m_att.external_unicast_locators, m_att.ignore_non_matching_locators);
                        update_reader_info(true);
                    }
                    return true;
                }
                return false;
            }
            ))
    {
        if (nullptr != mp_listener)
        {
            // call the listener without locks taken
            locator_selector_guard.unlock();
            guard.unlock();
            mp_listener->on_reader_discovery(this, ReaderDiscoveryInfo::CHANGED_QOS_READER, data.guid(), &data);
        }
#ifdef FASTDDS_STATISTICS
        // notify monitor service so that the connectionlist for this entity
        // could be updated
        if (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin())
        {
            mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid);
        }
#endif //FASTDDS_STATISTICS
        return false;
    }
    // Get a locator from the inactive pool (or create a new one if necessary and allowed)
    std::unique_ptr<ReaderLocator> new_reader;
    if (matched_readers_pool_.empty())
    {
        size_t max_readers = matched_readers_pool_.max_size();
        if (getMatchedReadersSize() + matched_readers_pool_.size() < max_readers)
        {
            const RemoteLocatorsAllocationAttributes& loc_alloc =
                    mp_RTPSParticipant->getRTPSParticipantAttributes().allocation.locators;
            new_reader.reset(new ReaderLocator(
                        this,
                        loc_alloc.max_unicast_locators,
                        loc_alloc.max_multicast_locators));
        }
        else
        {
            EPROSIMA_LOG_WARNING(RTPS_WRITER, "Couldn't add matched reader due to resource limits");
            return false;
        }
    }
    else
    {
        new_reader = std::move(matched_readers_pool_.back());
        matched_readers_pool_.pop_back();
    }
    // Add info of new datareader.
    new_reader->start(data.guid(),
            data.remote_locators().unicast,
            data.remote_locators().multicast,
            data.m_expectsInlineQos,
            is_datasharing_compatible_with(data));
    filter_remote_locators(*new_reader->general_locator_selector_entry(),
            m_att.external_unicast_locators, m_att.ignore_non_matching_locators);
    locator_selector_.locator_selector.add_entry(new_reader->general_locator_selector_entry());
    if (new_reader->is_local_reader())
    {
        matched_local_readers_.push_back(std::move(new_reader));
        EPROSIMA_LOG_INFO(RTPS_WRITER, "Adding reader " << data.guid() << " to " << this->m_guid.entityId
                                                        << " as local reader");
    }
    else if (new_reader->is_datasharing_reader())
    {
        matched_datasharing_readers_.push_back(std::move(new_reader));
        EPROSIMA_LOG_INFO(RTPS_WRITER, "Adding reader " << data.guid() << " to " << this->m_guid.entityId
                                                        << " as data sharing");
    }
    else
    {
        matched_remote_readers_.push_back(std::move(new_reader));
        EPROSIMA_LOG_INFO(RTPS_WRITER, "Adding reader " << data.guid() << " to " << this->m_guid.entityId
                                                        << " as remote reader");
    }
    update_reader_info(true);
    if (nullptr != mp_listener)
    {
        // call the listener without locks taken
        locator_selector_guard.unlock();
        guard.unlock();
        mp_listener->on_reader_discovery(this, ReaderDiscoveryInfo::DISCOVERED_READER, data.guid(), &data);
    }
#ifdef FASTDDS_STATISTICS
    // notify monitor service so that the connectionlist for this entity
    // could be updated
    if (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin())
    {
        mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid);
    }
#endif //FASTDDS_STATISTICS
    return true;
}
StatelessWriter::matched_reader_add works in a similar way. It first builds a ReaderLocator object, which holds the unicast and multicast Locator_t entries that PDP messages will be sent to, along with other required information. The ReaderLocator is then stored locally in one of matched_local_readers_, matched_datasharing_readers_ or matched_remote_readers_. Next, StatelessWriter::update_reader_info is called to update the LocatorSelectorSender information and to create the sender resources through RTPSParticipantImpl::createSenderResources. (For how sender resources are created, see: TODO.)
void StatelessWriter::update_reader_info(bool create_sender_resources)
{
    ...
    update_cached_info_nts(locator_selector_);
    if (addGuid)
    {
        compute_selected_guids(locator_selector_);
    }
    if (create_sender_resources)
    {
        RTPSParticipantImpl* part = mp_RTPSParticipant;
        locator_selector_.locator_selector.for_each([part](const Locator_t& loc)
                {
                    part->createSenderResources(loc);
                });
    }
}
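This completes the process of updating the PDP data from a received PDP message.
Updating EDP
PDPSimple::assign_low_level_remote_endpoints is mainly responsible for updating the EDP objects. It is implemented as follows: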
void PDPSimple::assign_low_level_remote_endpoints(const ParticipantProxyData& pdata, bool notify_secure_endpoints)
{
    if (mp_EDP != nullptr)
    {
        mp_EDP->assignRemoteEndpoints(pdata, notify_secure_endpoints);
    }
    ...
}
The part of interest here is the call to EDPSimple::assignRemoteEndpoints, which updates the EDP objects:
void EDPSimple::assignRemoteEndpoints(const ParticipantProxyData& pdata, bool assign_secure_endpoints)
{
    EPROSIMA_LOG_INFO(RTPS_EDP, "New DPD received, adding remote endpoints to our SimpleEDP endpoints");
    const NetworkFactory& network = mp_RTPSParticipant->network_factory();
    uint32_t endp = pdata.m_availableBuiltinEndpoints;
    uint32_t auxendp;
    bool use_multicast_locators = !mp_PDP->getRTPSParticipant()->getAttributes().builtin.avoid_builtin_multicast ||
            pdata.metatraffic_locators.unicast.empty();
    auto temp_reader_proxy_data = get_temporary_reader_proxies_pool().get();
    temp_reader_proxy_data->clear();
    temp_reader_proxy_data->m_expectsInlineQos = false;
    temp_reader_proxy_data->guid().guidPrefix = pdata.m_guid.guidPrefix;
    temp_reader_proxy_data->set_remote_locators(pdata.metatraffic_locators, network, use_multicast_locators);
    temp_reader_proxy_data->m_qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS;
    temp_reader_proxy_data->m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;
    auto temp_writer_proxy_data = get_temporary_writer_proxies_pool().get();
    temp_writer_proxy_data->clear();
    temp_writer_proxy_data->guid().guidPrefix = pdata.m_guid.guidPrefix;
    temp_writer_proxy_data->persistence_guid(pdata.get_persistence_guid());
    temp_writer_proxy_data->set_remote_locators(pdata.metatraffic_locators, network, use_multicast_locators);
    temp_writer_proxy_data->m_qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS;
    temp_writer_proxy_data->m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;
    auxendp = endp;
    auxendp &= DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER;
    if (auxendp != 0 && publications_reader_.first != nullptr) //Exist Pub Writer and i have pub reader
    {
        EPROSIMA_LOG_INFO(RTPS_EDP, "Adding SEDP Pub Writer to my Pub Reader");
        temp_writer_proxy_data->guid().entityId = c_EntityId_SEDPPubWriter;
        temp_writer_proxy_data->set_persistence_entity_id(c_EntityId_SEDPPubWriter);
        publications_reader_.first->matched_writer_add(*temp_writer_proxy_data);
    }
    auxendp = endp;
    auxendp &= DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR;
    if (auxendp != 0 && publications_writer_.first != nullptr) //Exist Pub Detector
    {
        EPROSIMA_LOG_INFO(RTPS_EDP, "Adding SEDP Pub Reader to my Pub Writer");
        temp_reader_proxy_data->guid().entityId = c_EntityId_SEDPPubReader;
        publications_writer_.first->matched_reader_add(*temp_reader_proxy_data);
    }
    auxendp = endp;
    auxendp &= DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER;
    if (auxendp != 0 && subscriptions_reader_.first != nullptr) //Exist Pub Announcer
    {
        EPROSIMA_LOG_INFO(RTPS_EDP, "Adding SEDP Sub Writer to my Sub Reader");
        temp_writer_proxy_data->guid().entityId = c_EntityId_SEDPSubWriter;
        temp_writer_proxy_data->set_persistence_entity_id(c_EntityId_SEDPSubWriter);
        subscriptions_reader_.first->matched_writer_add(*temp_writer_proxy_data);
    }
    auxendp = endp;
    auxendp &= DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR;
    if (auxendp != 0 && subscriptions_writer_.first != nullptr) //Exist Pub Announcer
    {
        EPROSIMA_LOG_INFO(RTPS_EDP, "Adding SEDP Sub Reader to my Sub Writer");
        temp_reader_proxy_data->guid().entityId = c_EntityId_SEDPSubReader;
        subscriptions_writer_.first->matched_reader_add(*temp_reader_proxy_data);
    }
#if HAVE_SECURITY
    auxendp = endp;
    auxendp &= DISC_BUILTIN_ENDPOINT_PUBLICATION_SECURE_ANNOUNCER;
    if (auxendp != 0 && publications_secure_reader_.first != nullptr && assign_secure_endpoints)
    {
        temp_writer_proxy_data->guid().entityId = sedp_builtin_publications_secure_writer;
        temp_writer_proxy_data->set_persistence_entity_id(sedp_builtin_publications_secure_writer);
        if (!mp_RTPSParticipant->security_manager().discovered_builtin_writer(
                    publications_secure_reader_.first->getGuid(), pdata.m_guid, *temp_writer_proxy_data,
                    publications_secure_reader_.first->getAttributes().security_attributes()))
        {
            EPROSIMA_LOG_ERROR(RTPS_EDP, "Security manager returns an error for writer " <<
                    publications_secure_reader_.first->getGuid());
        }
    }
    auxendp = endp;
    auxendp &= DISC_BUILTIN_ENDPOINT_PUBLICATION_SECURE_DETECTOR;
    if (auxendp != 0 && publications_secure_writer_.first != nullptr && assign_secure_endpoints)
    {
        temp_reader_proxy_data->guid().entityId = sedp_builtin_publications_secure_reader;
        if (!mp_RTPSParticipant->security_manager().discovered_builtin_reader(
                    publications_secure_writer_.first->getGuid(), pdata.m_guid, *temp_reader_proxy_data,
                    publications_secure_writer_.first->getAttributes().security_attributes()))
        {
            EPROSIMA_LOG_ERROR(RTPS_EDP, "Security manager returns an error for writer " <<
                    publications_secure_writer_.first->getGuid());
        }
    }
    auxendp = endp;
    auxendp &= DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_SECURE_ANNOUNCER;
    if (auxendp != 0 && subscriptions_secure_reader_.first != nullptr && assign_secure_endpoints)
    {
        temp_writer_proxy_data->guid().entityId = sedp_builtin_subscriptions_secure_writer;
        temp_writer_proxy_data->set_persistence_entity_id(sedp_builtin_subscriptions_secure_writer);
        if (!mp_RTPSParticipant->security_manager().discovered_builtin_writer(
                    subscriptions_secure_reader_.first->getGuid(), pdata.m_guid, *temp_writer_proxy_data,
                    subscriptions_secure_reader_.first->getAttributes().security_attributes()))
        {
            EPROSIMA_LOG_ERROR(RTPS_EDP, "Security manager returns an error for writer " <<
                    subscriptions_secure_reader_.first->getGuid());
        }
    }
    auxendp = endp;
    auxendp &= DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_SECURE_DETECTOR;
    if (auxendp != 0 && subscriptions_secure_writer_.first != nullptr && assign_secure_endpoints)
    {
        EPROSIMA_LOG_INFO(RTPS_EDP, "Adding SEDP Sub Reader to my Sub Writer");
        temp_reader_proxy_data->guid().entityId = sedp_builtin_subscriptions_secure_reader;
        if (!mp_RTPSParticipant->security_manager().discovered_builtin_reader(
                    subscriptions_secure_writer_.first->getGuid(), pdata.m_guid, *temp_reader_proxy_data,
                    subscriptions_secure_writer_.first->getAttributes().security_attributes()))
        {
            EPROSIMA_LOG_ERROR(RTPS_EDP, "Security manager returns an error for writer " <<
                    subscriptions_secure_writer_.first->getGuid());
        }
    }
#else
    static_cast<void>(assign_secure_endpoints);
#endif // if HAVE_SECURITY
}
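This code fills in two temporary objects, taken from ProxyPool<ReaderProxyData> and ProxyPool<WriterProxyData>, and uses them to update the two pairs of EDP endpoints: the WriterProxyData object updates publications_reader_ and subscriptions_reader_, and the ReaderProxyData object updates publications_writer_ and subscriptions_writer_. Only the remote guid and metatraffic_locators (the unicast address, by default <remote IP>:7410) are taken from the remote participant; the durability QoS is fixed to TRANSIENT_LOCAL_DURABILITY_QOS and the reliability QoS to RELIABLE_RELIABILITY_QOS.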
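The mask tests in EDPSimple::assignRemoteEndpoints follow one pattern: a remote built-in endpoint is paired with the local endpoint of the opposite role only if the remote participant advertises it in m_availableBuiltinEndpoints and the local endpoint exists. The sketch below models just that decision. LocalEdp and edp_pairings are hypothetical names, and the bit positions are assumed from the BuiltinEndpointSet definition in the RTPS specification.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Bit positions assumed from the RTPS specification's BuiltinEndpointSet.
constexpr std::uint32_t kPublicationAnnouncer  = 0x1u << 2;
constexpr std::uint32_t kPublicationDetector   = 0x1u << 3;
constexpr std::uint32_t kSubscriptionAnnouncer = 0x1u << 4;
constexpr std::uint32_t kSubscriptionDetector  = 0x1u << 5;

// Hypothetical description of which local SEDP endpoints exist.
struct LocalEdp
{
    bool publications_reader;
    bool publications_writer;
    bool subscriptions_reader;
    bool subscriptions_writer;
};

// List the pairings to set up for a remote participant advertising remote_mask.
std::vector<std::string> edp_pairings(std::uint32_t remote_mask, const LocalEdp& local)
{
    std::vector<std::string> pairings;
    if ((remote_mask & kPublicationAnnouncer) != 0 && local.publications_reader)
    {
        pairings.push_back("remote SEDPPubWriter -> local publications reader");
    }
    if ((remote_mask & kPublicationDetector) != 0 && local.publications_writer)
    {
        pairings.push_back("remote SEDPPubReader -> local publications writer");
    }
    if ((remote_mask & kSubscriptionAnnouncer) != 0 && local.subscriptions_reader)
    {
        pairings.push_back("remote SEDPSubWriter -> local subscriptions reader");
    }
    if ((remote_mask & kSubscriptionDetector) != 0 && local.subscriptions_writer)
    {
        pairings.push_back("remote SEDPSubReader -> local subscriptions writer");
    }
    return pairings;
}
```

A participant that only has the two writer-side endpoints locally, for example, would pair only with the remote detectors (readers), whatever the remote participant announces.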
bool StatefulReader::matched_writer_add(const WriterProxyData& wdata)
{
    assert(wdata.guid() != c_Guid_Unknown);
    ReaderListener* listener = nullptr;
    {
        std::unique_lock<RecursiveTimedMutex> guard(mp_mutex);
        if (!is_alive_)
        {
            return false;
        }
        listener = mp_listener;
        bool is_same_process = RTPSDomainImpl::should_intraprocess_between(m_guid, wdata.guid());
        bool is_datasharing = is_datasharing_compatible_with(wdata);
        for (WriterProxy* it : matched_writers_)
        {
            if (it->guid() == wdata.guid())
            {
                EPROSIMA_LOG_INFO(RTPS_READER, "Attempting to add existing writer, updating information");
                // If Ownership strength changes then update all history instances.
                if (EXCLUSIVE_OWNERSHIP_QOS == m_att.ownershipKind &&
                        it->ownership_strength() != wdata.m_qos.m_ownershipStrength.value)
                {
                    mp_history->writer_update_its_ownership_strength_nts(
                        it->guid(), wdata.m_qos.m_ownershipStrength.value);
                }
                it->update(wdata);
                if (!is_same_process)
                {
                    for (const Locator_t& locator : it->remote_locators_shrinked())
                    {
                        getRTPSParticipant()->createSenderResources(locator);
                    }
                }
                if (nullptr != listener)
                {
                    // call the listener without the lock taken
                    guard.unlock();
                    listener->on_writer_discovery(this, WriterDiscoveryInfo::CHANGED_QOS_WRITER, wdata.guid(), &wdata);
                }
#ifdef FASTDDS_STATISTICS
                // notify monitor service so that the connectionlist for this entity
                // could be updated
                if (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin())
                {
                    mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid);
                }
#endif //FASTDDS_STATISTICS
                return false;
            }
        }
        // Get a writer proxy from the inactive pool (or create a new one if necessary and allowed)
        WriterProxy* wp = nullptr;
        if (matched_writers_pool_.empty())
        {
            size_t max_readers = matched_writers_pool_.max_size();
            if (getMatchedWritersSize() + matched_writers_pool_.size() < max_readers)
            {
                const RTPSParticipantAttributes& part_att = mp_RTPSParticipant->getRTPSParticipantAttributes();
                wp = new WriterProxy(this, part_att.allocation.locators, proxy_changes_config_);
            }
            else
            {
                EPROSIMA_LOG_WARNING(RTPS_READER, "Maximum number of reader proxies (" << max_readers <<
                        ") reached for writer " << m_guid);
                return false;
            }
        }
        else
        {
            wp = matched_writers_pool_.back();
            matched_writers_pool_.pop_back();
        }
        SequenceNumber_t initial_sequence;
        add_persistence_guid(wdata.guid(), wdata.persistence_guid());
        initial_sequence = get_last_notified(wdata.guid());
        wp->start(wdata, initial_sequence, is_datasharing);
        if (!is_same_process)
        {
            for (const Locator_t& locator : wp->remote_locators_shrinked())
            {
                getRTPSParticipant()->createSenderResources(locator);
            }
        }
        if (is_datasharing)
        {
            if (datasharing_listener_->add_datasharing_writer(wdata.guid(),
                    m_att.durabilityKind == VOLATILE,
                    mp_history->m_att.maximumReservedCaches))
            {
                matched_writers_.push_back(wp);
                EPROSIMA_LOG_INFO(RTPS_READER, "Writer Proxy " << wdata.guid() << " added to " << this->m_guid.entityId
                                                               << " with data sharing");
            }
            else
            {
                EPROSIMA_LOG_ERROR(RTPS_READER, "Failed to add Writer Proxy " << wdata.guid()
                                                                              << " to " << this->m_guid.entityId
                                                                              << " with data sharing.");
                {
                    // Release reader's lock to avoid deadlock when waiting for event (requiring mutex) to finish
                    guard.unlock();
                    assert(!guard.owns_lock());
                    wp->stop();
                    guard.lock();
                }
                matched_writers_pool_.push_back(wp);
                return false;
            }
            // Intraprocess manages durability itself
            if (VOLATILE == m_att.durabilityKind)
            {
                std::shared_ptr<ReaderPool> pool = datasharing_listener_->get_pool_for_writer(wp->guid());
                SequenceNumber_t last_seq = pool->get_last_read_sequence_number();
                if (SequenceNumber_t::unknown() != last_seq)
                {
                    SequenceNumberSet_t sns(last_seq + 1);
                    send_acknack(wp, sns, wp, false);
                    wp->lost_changes_update(last_seq + 1);
                }
            }
            else
            {
                // simulate a notification to force reading of transient changes
                datasharing_listener_->notify(false);
            }
        }
        else
        {
            matched_writers_.push_back(wp);
            EPROSIMA_LOG_INFO(RTPS_READER, "Writer Proxy " << wp->guid() << " added to " << m_guid.entityId);
        }
    }
    if (liveliness_lease_duration_ < c_TimeInfinite)
    {
        auto wlp = this->mp_RTPSParticipant->wlp();
        if ( wlp != nullptr)
        {
            wlp->sub_liveliness_manager_->add_writer(
                wdata.guid(),
                liveliness_kind_,
                liveliness_lease_duration_);
        }
        else
        {
            EPROSIMA_LOG_ERROR(RTPS_LIVELINESS,
                    "Finite liveliness lease duration but WLP not enabled, cannot add writer");
        }
    }
    if (nullptr != listener)
    {
        listener->on_writer_discovery(this, WriterDiscoveryInfo::DISCOVERED_WRITER, wdata.guid(), &wdata);
    }
#ifdef FASTDDS_STATISTICS
    // notify monitor service so that the connectionlist for this entity
    // could be updated
    if (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin())
    {
        mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid);
    }
#endif //FASTDDS_STATISTICS
    return true;
}
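If the guid discovered through PDP is not found in StatefulReader::matched_writers_, a new WriterProxy object is created to hold the data needed during the EDP phase. Because EDP runs in reliable mode, the WriterProxy constructor creates the timers for the heartbeat response and the initial acknack: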
{
    //Create Events
    ResourceEvent& event_manager = reader_->getEventResource();
    auto heartbeat_lambda = [this]() -> bool
            {
                perform_heartbeat_response();
                return false;
            };
    auto acknack_lambda = [this]() -> bool
            {
                return perform_initial_ack_nack();
            };
    heartbeat_response_ = new TimedEvent(event_manager, heartbeat_lambda, 0);
    initial_acknack_ = new TimedEvent(event_manager, acknack_lambda, 0);
    clear();
    EPROSIMA_LOG_INFO(RTPS_READER, "Writer Proxy created in reader: " << reader_->getGuid().entityId);
}
Next, the sender resources are created.
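The way StatefulReader::matched_writer_add obtains a WriterProxy (reuse one from the inactive pool if possible, otherwise allocate a new one only while the configured limit allows it) is a pattern that also appears in the writer-side listings. A minimal sketch of that pattern follows. Proxy and ProxyPool are hypothetical types, and the limit check mirrors the "active + pooled < max" comparison in the listing.

```cpp
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical proxy object; the real WriterProxy carries much more state.
struct Proxy
{
    int generation = 0;
};

class ProxyPool
{
public:

    explicit ProxyPool(std::size_t max_total)
        : max_total_(max_total)
    {
    }

    // Reuse an inactive proxy if there is one; otherwise allocate a new one
    // while active_count + pooled stays below the limit. Returns nullptr when full.
    std::unique_ptr<Proxy> acquire(std::size_t active_count)
    {
        if (!inactive_.empty())
        {
            std::unique_ptr<Proxy> proxy = std::move(inactive_.back());
            inactive_.pop_back();
            return proxy;
        }
        if (active_count + inactive_.size() < max_total_)
        {
            return std::unique_ptr<Proxy>(new Proxy());
        }
        return nullptr;
    }

    // Return a proxy that is no longer matched so it can be reused later.
    void release(std::unique_ptr<Proxy> proxy)
    {
        inactive_.push_back(std::move(proxy));
    }

private:

    std::size_t max_total_;
    std::vector<std::unique_ptr<Proxy>> inactive_;
};
```

Reusing pooled proxies keeps the number of allocations bounded by the configured resource limits, which is why the listing reports a warning and returns false instead of growing past max_size().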
bool StatefulWriter::matched_reader_add(
        const ReaderProxyData& rdata)
{
    using fastdds::rtps::network::external_locators::filter_remote_locators;

    if (rdata.guid() == c_Guid_Unknown)
    {
        EPROSIMA_LOG_ERROR(RTPS_WRITER, "Reliable Writer need GUID_t of matched readers");
        return false;
    }

    std::unique_lock<RecursiveTimedMutex> guard(mp_mutex);
    std::unique_lock<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_);
    std::unique_lock<LocatorSelectorSender> guard_locator_selector_async(locator_selector_async_);

    // Check if it is already matched.
    if (for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
            [this, &rdata](ReaderProxy* reader)
            {
                if (reader->guid() == rdata.guid())
                {
                    EPROSIMA_LOG_INFO(RTPS_WRITER, "Attempting to add existing reader, updating information.");
                    if (reader->update(rdata))
                    {
                        filter_remote_locators(*reader->general_locator_selector_entry(),
                                m_att.external_unicast_locators, m_att.ignore_non_matching_locators);
                        filter_remote_locators(*reader->async_locator_selector_entry(),
                                m_att.external_unicast_locators, m_att.ignore_non_matching_locators);
                        update_reader_info(locator_selector_general_, true);
                        update_reader_info(locator_selector_async_, true);
                    }
                    return true;
                }
                return false;
            }))
    {
        if (nullptr != mp_listener)
        {
            // call the listener without locks taken
            guard_locator_selector_async.unlock();
            guard_locator_selector_general.unlock();
            guard.unlock();
            mp_listener->on_reader_discovery(this, ReaderDiscoveryInfo::CHANGED_QOS_READER, rdata.guid(), &rdata);
        }

#ifdef FASTDDS_STATISTICS
        // notify monitor service so that the connectionlist for this entity
        // could be updated
        if (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin())
        {
            mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid);
        }
#endif //FASTDDS_STATISTICS

        return false;
    }

    // Get a reader proxy from the inactive pool (or create a new one if necessary and allowed)
    ReaderProxy* rp = nullptr;
    if (matched_readers_pool_.empty())
    {
        size_t max_readers = matched_readers_pool_.max_size();
        if (getMatchedReadersSize() + matched_readers_pool_.size() < max_readers)
        {
            const RTPSParticipantAttributes& part_att = mp_RTPSParticipant->getRTPSParticipantAttributes();
            rp = new ReaderProxy(m_times, part_att.allocation.locators, this);
        }
        else
        {
            EPROSIMA_LOG_WARNING(RTPS_WRITER, "Maximum number of reader proxies (" << max_readers <<
                    ") reached for writer " << m_guid);
            return false;
        }
    }
    else
    {
        rp = matched_readers_pool_.back();
        matched_readers_pool_.pop_back();
    }

    // Add info of new datareader.
    rp->start(rdata, is_datasharing_compatible_with(rdata));
    filter_remote_locators(*rp->general_locator_selector_entry(),
            m_att.external_unicast_locators, m_att.ignore_non_matching_locators);
    filter_remote_locators(*rp->async_locator_selector_entry(),
            m_att.external_unicast_locators, m_att.ignore_non_matching_locators);
    locator_selector_general_.locator_selector.add_entry(rp->general_locator_selector_entry());
    locator_selector_async_.locator_selector.add_entry(rp->async_locator_selector_entry());

    if (rp->is_local_reader())
    {
        matched_local_readers_.push_back(rp);
        EPROSIMA_LOG_INFO(RTPS_WRITER, "Adding reader " << rdata.guid() << " to " << this->m_guid.entityId
                                                        << " as local reader");
    }
    else
    {
        if (rp->is_datasharing_reader())
        {
            matched_datasharing_readers_.push_back(rp);
            EPROSIMA_LOG_INFO(RTPS_WRITER, "Adding reader " << rdata.guid() << " to " << this->m_guid.entityId
                                                            << " as data sharing");
        }
        else
        {
            matched_remote_readers_.push_back(rp);
            EPROSIMA_LOG_INFO(RTPS_WRITER, "Adding reader " << rdata.guid() << " to " << this->m_guid.entityId
                                                            << " as remote reader");
        }
    }

    update_reader_info(locator_selector_general_, true);
    update_reader_info(locator_selector_async_, true);

    if (rp->is_datasharing_reader())
    {
        if (nullptr != mp_listener)
        {
            // call the listener without locks taken
            guard_locator_selector_async.unlock();
            guard_locator_selector_general.unlock();
            guard.unlock();
            mp_listener->on_reader_discovery(this, ReaderDiscoveryInfo::DISCOVERED_READER, rdata.guid(), &rdata);
        }

#ifdef FASTDDS_STATISTICS
        // notify monitor service so that the connectionlist for this entity
        // could be updated
        if (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin())
        {
            mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid);
        }
#endif //FASTDDS_STATISTICS

        return true;
    }

    bool is_reliable = rp->is_reliable();
    if (is_reliable)
    {
        SequenceNumber_t min_seq = get_seq_num_min();
        SequenceNumber_t last_seq = get_seq_num_max();
        RTPSMessageGroup group(mp_RTPSParticipant, this, rp->message_sender());

        // History not empty
        if (min_seq != SequenceNumber_t::unknown())
        {
            (void)last_seq;
            assert(last_seq != SequenceNumber_t::unknown());
            assert(min_seq <= last_seq);

            try
            {
                // Late-joiner
                if (TRANSIENT_LOCAL <= rp->durability_kind() &&
                        TRANSIENT_LOCAL <= m_att.durabilityKind)
                {
                    for (History::iterator cit = mp_history->changesBegin(); cit != mp_history->changesEnd(); ++cit)
                    {
                        // Holes are managed when deliver_sample(), sending GAP messages.
                        if (rp->rtps_is_relevant(*cit))
                        {
                            ChangeForReader_t changeForReader(*cit);

                            // If it is local, maintain in UNSENT status and add to flow controller.
                            if (rp->is_local_reader())
                            {
                                flow_controller_->add_old_sample(this, *cit);
                            }
                            // In other case, set as UNACKNOWLEDGED and expects the reader request them.
                            else
                            {
                                changeForReader.setStatus(UNACKNOWLEDGED);
                            }

                            rp->add_change(changeForReader, true, false);
                        }
                    }
                }
                else
                {
                    if (rp->is_local_reader())
                    {
                        intraprocess_gap(rp, min_seq, mp_history->next_sequence_number());
                    }
                    else
                    {
                        // Send a GAP of the whole history.
                        group.add_gap(min_seq, SequenceNumberSet_t(mp_history->next_sequence_number()), rp->guid());
                    }
                }

                // Always activate heartbeat period. We need a confirmation of the reader.
                // The state has to be updated.
                periodic_hb_event_->restart_timer(std::chrono::steady_clock::now() + std::chrono::hours(24));
            }
            catch (const RTPSMessageGroup::timeout&)
            {
                EPROSIMA_LOG_ERROR(RTPS_WRITER, "Max blocking time reached");
            }
        }

        if (rp->is_local_reader())
        {
            intraprocess_heartbeat(rp);
        }
        else
        {
            send_heartbeat_nts_(1u, group, disable_positive_acks_);
            group.flush_and_reset();
        }
    }
    else
    {
        // Acknowledged all for best-effort reader.
        rp->acked_changes_set(mp_history->next_sequence_number());
    }

    EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy " << rp->guid() << " added to " << this->m_guid.entityId << " with "
                                                   << rdata.remote_locators().unicast.size() << "(u)-"
                                                   << rdata.remote_locators().multicast.size() <<
            "(m) locators");

    if (nullptr != mp_listener)
    {
        // call the listener without locks taken
        guard_locator_selector_async.unlock();
        guard_locator_selector_general.unlock();
        guard.unlock();
        mp_listener->on_reader_discovery(this, ReaderDiscoveryInfo::DISCOVERED_READER, rdata.guid(), &rdata);
    }

#ifdef FASTDDS_STATISTICS
    // notify monitor service so that the connectionlist for this entity
    // could be updated
    if (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin())
    {
        mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid);
    }
#endif //FASTDDS_STATISTICS

    return true;
}
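The logic for updating readers in RTPSWriter is similar. If the reader found through PDP discovery is already in the StatefulWriter's set of matched readers, its information is updated; if it is not found, a ReaderProxy object is taken from the inactive pool or newly created.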
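The update-or-create flow can be illustrated with a much smaller model. The sketch below is not Fast DDS code: `Proxy`, `Matcher`, `MatchResult` and the `remove` method are hypothetical names introduced only for illustration, and the assumption that an unmatched proxy goes back to the pool is mine. It shows only the three decisions visible in matched_reader_add: update an already matched entry, reuse a proxy from the inactive pool, or allocate a new one while the limit allows it.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for ReaderProxy: just a GUID string and a QoS revision.
struct Proxy
{
    std::string guid;
    int qos_revision = 0;
};

enum class MatchResult
{
    Updated,   // reader was already matched, its information was refreshed
    Added,     // a proxy was started for a new reader
    Rejected   // the proxy limit was reached
};

class Matcher
{
public:

    explicit Matcher(std::size_t max_proxies)
        : max_proxies_(max_proxies)
    {
        assert(max_proxies_ > 0);
    }

    MatchResult add(const std::string& guid, int qos_revision)
    {
        // 1. Already matched: update in place.
        for (auto& p : matched_)
        {
            if (p->guid == guid)
            {
                p->qos_revision = qos_revision;
                return MatchResult::Updated;
            }
        }

        // 2. Take a proxy from the inactive pool, or allocate one if allowed.
        std::unique_ptr<Proxy> proxy;
        if (pool_.empty())
        {
            if (matched_.size() + pool_.size() >= max_proxies_)
            {
                return MatchResult::Rejected;
            }
            proxy.reset(new Proxy());
        }
        else
        {
            proxy = std::move(pool_.back());
            pool_.pop_back();
        }

        // 3. Start the proxy with the new reader's information.
        proxy->guid = guid;
        proxy->qos_revision = qos_revision;
        matched_.push_back(std::move(proxy));
        return MatchResult::Added;
    }

    // Assumption for this sketch: an unmatched proxy is returned to the pool.
    bool remove(const std::string& guid)
    {
        for (auto it = matched_.begin(); it != matched_.end(); ++it)
        {
            if ((*it)->guid == guid)
            {
                pool_.push_back(std::move(*it));
                matched_.erase(it);
                return true;
            }
        }
        return false;
    }

    std::size_t matched_count() const
    {
        return matched_.size();
    }

    std::size_t pooled_count() const
    {
        return pool_.size();
    }

private:

    std::size_t max_proxies_;
    std::vector<std::unique_ptr<Proxy>> matched_;
    std::vector<std::unique_ptr<Proxy>> pool_;
};
```

With a limit of two, adding the same GUID twice yields `Added` and then `Updated`, a third distinct GUID is `Rejected`, and after one `remove` the next `add` reuses the pooled proxy instead of allocating.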
EntityID
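The EntityIDs used by service discovery are fixed by the standard, and the callback functions use these fixed EntityIDs to invoke the appropriate listener object. They are defined in EntityId_t.hpp; the main ones are: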
const EntityId_t c_EntityId_Unknown = ENTITYID_UNKNOWN;
const EntityId_t c_EntityId_SPDPReader = ENTITYID_SPDP_BUILTIN_RTPSParticipant_READER;
const EntityId_t c_EntityId_SPDPWriter = ENTITYID_SPDP_BUILTIN_RTPSParticipant_WRITER;

const EntityId_t c_EntityId_SEDPPubWriter = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER;
const EntityId_t c_EntityId_SEDPPubReader = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER;
const EntityId_t c_EntityId_SEDPSubWriter = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER;
const EntityId_t c_EntityId_SEDPSubReader = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER;

const EntityId_t c_EntityId_RTPSParticipant = ENTITYID_RTPSParticipant;

const EntityId_t c_EntityId_WriterLiveliness = ENTITYID_P2P_BUILTIN_RTPSParticipant_MESSAGE_WRITER;
const EntityId_t c_EntityId_ReaderLiveliness = ENTITYID_P2P_BUILTIN_RTPSParticipant_MESSAGE_READER;

const EntityId_t participant_stateless_message_writer_entity_id = ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER;
const EntityId_t participant_stateless_message_reader_entity_id = ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_READER;

const EntityId_t c_EntityId_TypeLookup_request_writer = ENTITYID_TL_SVC_REQ_WRITER;
const EntityId_t c_EntityId_TypeLookup_request_reader = ENTITYID_TL_SVC_REQ_READER;
const EntityId_t c_EntityId_TypeLookup_reply_writer = ENTITYID_TL_SVC_REPLY_WRITER;
const EntityId_t c_EntityId_TypeLookup_reply_reader = ENTITYID_TL_SVC_REPLY_READER;
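To make the idea of dispatching on a fixed EntityID concrete, here is a minimal sketch. It does not use the Fast DDS `EntityId_t` type: the IDs are packed into a plain `uint32_t` (3-byte entityKey followed by a 1-byte entityKind), the numeric values are the well-known ones from the RTPS specification, and the constant names, `is_builtin` and `handler_for_reader` are hypothetical helpers made up for this illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Well-known EntityId values from the RTPS specification,
// packed as (entityKey << 8) | entityKind.
constexpr std::uint32_t kParticipant   = 0x000001C1;
constexpr std::uint32_t kSpdpWriter    = 0x000100C2;
constexpr std::uint32_t kSpdpReader    = 0x000100C7;
constexpr std::uint32_t kSedpPubWriter = 0x000003C2;
constexpr std::uint32_t kSedpPubReader = 0x000003C7;
constexpr std::uint32_t kSedpSubWriter = 0x000004C2;
constexpr std::uint32_t kSedpSubReader = 0x000004C7;

// The two most significant bits of entityKind are 11 for built-in entities.
constexpr bool is_builtin(std::uint32_t entity_id)
{
    return (entity_id & 0xC0u) == 0xC0u;
}

static_assert(is_builtin(kParticipant), "participant entity is built-in");
static_assert(is_builtin(kSpdpWriter), "SPDP writer is built-in");

// Hypothetical dispatcher: names the listener that would handle data
// arriving at the given reader. A real implementation would call the
// listener object instead of returning a label.
inline std::string handler_for_reader(std::uint32_t reader_entity_id)
{
    switch (reader_entity_id)
    {
        case kSpdpReader:
            return "PDP listener";
        case kSedpPubReader:
            return "EDP publications listener";
        case kSedpSubReader:
            return "EDP subscriptions listener";
        default:
            return is_builtin(reader_entity_id) ? "other built-in" : "user reader";
    }
}
```

Because the IDs are fixed, a receiver can route a discovery message by comparing the destination reader's EntityID alone, without any prior negotiation with the remote participant.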