当前位置: 首页 > news >正文

FastDDS服务发现之PDP的收发

目录

  • PDP发送
  • PDP接收
    • EDP更新
  • EntityID

通过FastDDS服务发现之PDP和EDP的创建这一节内容,可以了解服务发现的概念,机制和PDP/EDP中各类对象的创建,本文详细介绍Simple PDP发送数据,接收数据和处理报文的流程。

PDP发送

通过在RTPSParticipantImpl::enable中调用BuiltinProtocols::enable函数,开始服务发现的PDP的报文发送

void BuiltinProtocols::enable()
{if (nullptr != mp_PDP){mp_PDP->enable();mp_PDP->announceParticipantState(true);mp_PDP->resetParticipantAnnouncement();}
}

mp_PDP->enable()中主要实现分为三部分:

  1. 创建一个定时器,用于周期性发送PDP报文
bool PDP::enable()
{...resend_participant_info_event_ = new TimedEvent(mp_RTPSParticipant->getEventResource(),[&]() -> bool{announceParticipantState(false);set_next_announcement_interval();return true;},0);set_initial_announcement_interval();...
}

通过函数set_initial_announcement_interval()设置初始发送周期为100ms,按这个周期连续发送initial_announcements_.count个报文后,周期重新设定为m_discovery.discovery_config.leaseDuration_announcementperiod,默认是3s
《TODO:抓包示例》
2. 发现自己

bool PDP::enable()
{...// Notify "self-discovery"getRTPSParticipant()->on_entity_discovery(mp_RTPSParticipant->getGuid(),get_participant_proxy_data(mp_RTPSParticipant->getGuid().guidPrefix)->m_properties);...
}

这里调用

bool StatisticsParticipantImpl::are_statistics_writers_enabled(uint32_t checked_enabled_writers)
{return (enabled_writers_mask_ & checked_enabled_writers);
}

默认enabled_writers_mask_ 为0,需要实际并没有做什么操作(TODO)
3. 分配组播和单播地址:mp_PDP->enable()中继续调用builtin_endpoints_->enable_pdp_readers函数,这个函数最终调到RTPSParticipantImpl::assignEndpointListenResources

bool RTPSParticipantImpl::assignEndpointListenResources(Endpoint* endp)
{//Tag the endpoint with the ReceiverResourcesbool valid = true;//UNICASTassignEndpoint2LocatorList(endp, endp->getAttributes().unicastLocatorList);//MULTICASTassignEndpoint2LocatorList(endp, endp->getAttributes().multicastLocatorList);return valid;
}
bool RTPSParticipantImpl::assignEndpoint2LocatorList(Endpoint* endp,LocatorList_t& list)
{for (auto lit = list.begin(); lit != list.end(); ++lit){//Iteration of all Locators within the Locator list passed down as argumentstd::lock_guard<std::mutex> guard(m_receiverResourcelistMutex);//Check among ReceiverResources whether the locator is supported or notfor (auto it = m_receiverResourcelist.begin(); it != m_receiverResourcelist.end(); ++it){if (it->Receiver->SupportsLocator(*lit)){it->mp_receiver->associateEndpoint(endp);}}return true;
}

默认单播地址为“本地ip:7410”,组播地址为“239.255.0.1:7400”assignEndpoint2LocatorList中会通过判断locator,具体来说是判断locator中的kind和port,这一步是通过调用ReceiverResource的lambda来判断:

LocatorMapsToManagedChannel = [&transport, locator](const Locator_t& locatorToCheck) -> bool
{return locator.kind == locatorToCheck.kind && transport.DoInputLocatorsMatch(locator, locatorToCheck);
};

如果判断结果为true, assignEndpoint2LocatorList中继续调用 MessageReceiver::associateEndpoint函数,会将PDP对象的{EntityID:vector<RTPSReader*>}添加到MessageReceiver::associated_readers_中, 收到PDP报文后的处理流程同subscriber一致,收到消息后查找对应的RTPSReader并进行下一步处理。

oid MessageReceiver::associateEndpoint(Endpoint* to_add)
{...const auto reader = dynamic_cast<RTPSReader*>(to_add);const auto entityId = reader->getGuid().entityId;// search for set of readers by entity IDconst auto readers = associated_readers_.find(entityId);if (readers == associated_readers_.end()){auto vec = std::vector<RTPSReader*>();vec.push_back(reader);associated_readers_.emplace(entityId, vec);}...
}

{entityid: PDPreader} 添加到associated_readers_用于接收收到其他participant的服务发现消息。这样就完成了reader对这几个地址的监听。

mp_PDP->announceParticipantState(true):完成在mp_PDP->enable()的三部分后,开始调用PDPSimple::announceParticipantState发送第一个PDP数据包,具体报文的组装发送在PDP::announceParticipantState中实现。
随后调用mp_PDP->resetParticipantAnnouncement()开启定时器,开始周期性发送。

PDP接收

PDPListener用于监听和接收PDP报文,接收流程同其他DataReader的接收流程,具体可以参考 FastDDS之UDP通信,UDP层收到PDP消息后调用PDPListener::onNewCacheChangeAdded开始处理PDP报文

void PDPListener::onNewCacheChangeAdded(RTPSReader* reader,const CacheChange_t* const change_in)
{...if (change->kind == ALIVE){// 如果是自己,则直接返回,因为在创建之初就已经添加了if (guid == parent_pdp_->getRTPSParticipant()->getGuid())...CDRMessage_t msg(change->serializedPayload);temp_participant_data_.clear();if (temp_participant_data_.readFromCDRMessage(&msg, true, parent_pdp_->getRTPSParticipant()->network_factory(),parent_pdp_->getRTPSParticipant()->has_shm_transport(), true, change_in->vendor_id)){// After correctly reading itchange->instanceHandle = temp_participant_data_.m_key;guid = temp_participant_data_.m_guid;if (parent_pdp_->getRTPSParticipant()->is_participant_ignored(guid.guidPrefix)){return;}// Filter locatorsconst auto& pattr = parent_pdp_->getRTPSParticipant()->getAttributes();fastdds::rtps::network::external_locators::filter_remote_locators(temp_participant_data_,pattr.builtin.metatraffic_external_unicast_locators, pattr.default_external_unicast_locators,pattr.ignore_non_matching_locators);// Check if participant already exists (updated info)ParticipantProxyData* pdata = nullptr;bool already_processed = false;for (ParticipantProxyData* it : parent_pdp_->participant_proxies_){if (guid == it->m_guid){pdata = it;if (it->m_sample_identity.writer_guid() == change->writerGUID &&it->m_sample_identity.sequence_number() == change->sequenceNumber){already_processed = true;}break;}}// Only process the DATA(p) if it is not a repeated oneif (!already_processed){temp_participant_data_.m_sample_identity.writer_guid(change->writerGUID);temp_participant_data_.m_sample_identity.sequence_number(change->sequenceNumber);process_alive_data(pdata, temp_participant_data_, writer_guid, reader, lock);}}}...//Remove change form history.parent_pdp_->builtin_endpoints_->remove_from_pdp_reader_history(change);
}

主要分三种情况:

  1. 如果发现了自己(通过对比guid: guid == parent_pdp_->getRTPSParticipant()->getGuid())不做任何处理直接返回,因为自身的ParticipantProxyData已经添加到participant_proxies_中了,细节请参考FastDDS服务发现之PDP和EDP的创建
  2. 如果是第一次发现某一个其他Participant或者该Participant的Sample没有被处理过(TODO:在什么情况下没有被处理过),PDP的数据会保存到ParticipantProxyData的对象中,调用PDPListener::process_alive_data进行下一步处理
  3. 如果这个Participant已经被发现和接收处理了(通过already_processed判断,already_processed是由guidSampleIdentitywriter_guidsequence_number一致才可以置为true,TODO:判断条件),也不做任何处理退出
  4. 上述三种情况完成处理后都会调用remove_from_pdp_reader_history从datareader的history中删除CacheChange。
void PDPListener::process_alive_data(ParticipantProxyData* old_data,ParticipantProxyData& new_data,GUID_t& writer_guid,RTPSReader* reader,std::unique_lock<std::recursive_mutex>& lock)
{GUID_t participant_guid = new_data.m_guid;if (old_data == nullptr){// Create a new one when not foundold_data = parent_pdp_->createParticipantProxyData(new_data, writer_guid);if (old_data != nullptr){// Copy proxy to be passed forward before releasing PDP mutexParticipantProxyData old_data_copy(*old_data);reader->getMutex().unlock();lock.unlock();parent_pdp_->assignRemoteEndpoints(&old_data_copy);}else{reader->getMutex().unlock();lock.unlock();}}else{old_data->updateData(new_data);old_data->isAlive = true;reader->getMutex().unlock();EPROSIMA_LOG_INFO(RTPS_PDP_DISCOVERY, "Update participant "<< old_data->m_guid << " at "<< "MTTLoc: " << old_data->metatraffic_locators<< " DefLoc:" << old_data->default_locators);if (parent_pdp_->updateInfoMatchesEDP()){parent_pdp_->mp_EDP->assignRemoteEndpoints(*old_data, true);}// Copy proxy to be passed forward before releasing PDP mutexParticipantProxyData old_data_copy(*old_data);lock.unlock();RTPSParticipantListener* listener = parent_pdp_->getRTPSParticipant()->getListener();if (listener != nullptr){bool should_be_ignored = false;{std::lock_guard<std::mutex> cb_lock(parent_pdp_->callback_mtx_);ParticipantDiscoveryInfo info(old_data_copy);info.status = ParticipantDiscoveryInfo::CHANGED_QOS_PARTICIPANT;listener->onParticipantDiscovery(parent_pdp_->getRTPSParticipant()->getUserRTPSParticipant(),std::move(info),should_be_ignored);}if (should_be_ignored){parent_pdp_->getRTPSParticipant()->ignore_participant(participant_guid.guidPrefix);}}}#ifdef FASTDDS_STATISTICS//! Addition or update of a participant proxy should trigger//! a connections update on the local participant connection listif (nullptr != parent_pdp_->getRTPSParticipant()->get_connections_observer()){parent_pdp_->getRTPSParticipant()->get_connections_observer()->on_local_entity_connections_change(parent_pdp_->getRTPSParticipant()->getGuid());}
#endif //FASTDDS_STATISTICS// Take again the reader lockreader->getMutex().lock();
}

PDPListener::process_alive_data中的处理分为两种情况:parent_pdp_->participant_proxies_中没有ParticipantProxyData和接收到PDP报文相同的guid(对应if(old_data == nullptr)分支);parent_pdp_->participant_proxies_中找到ParticipantProxyData和接收到PDP报文相同的guid。
第一种情况:通过调用parent_pdp_->createParticipantProxyData创建一个ParticipantProxyData对象用来保存新发现的participant的对象信息,再调用PDPSimple::assignRemoteEndpointsPDPSimple::assignRemoteEndpoints中调用PDPSimple::match_pdp_remote_endpointsPDPSimple::assign_low_level_remote_endpoints

void PDPSimple::match_pdp_remote_endpoints(const ParticipantProxyData& pdata,bool notify_secure_endpoints)
{
#if !HAVE_SECURITYstatic_cast<void>(notify_secure_endpoints);
#endif // !HAVE_SECURITYauto endpoints = static_cast<fastdds::rtps::SimplePDPEndpoints*>(builtin_endpoints_.get());const NetworkFactory& network = mp_RTPSParticipant->network_factory();bool use_multicast_locators = !mp_RTPSParticipant->getAttributes().builtin.avoid_builtin_multicast ||pdata.metatraffic_locators.unicast.empty();const uint32_t endp = pdata.m_availableBuiltinEndpoints;// Default to values for non-secure endpointsauto reliability_kind = BEST_EFFORT_RELIABILITY_QOS;uint32_t pdp_reader_mask = DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR;uint32_t pdp_writer_mask = DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER;EntityId_t reader_entity_id = c_EntityId_SPDPReader;EntityId_t writer_entity_id = c_EntityId_SPDPWriter;RTPSReader* reader = endpoints->reader.reader_;RTPSWriter* writer = endpoints->writer.writer_;if (0 != (endp & pdp_writer_mask)){auto temp_writer_data = get_temporary_writer_proxies_pool().get();temp_writer_data->clear();temp_writer_data->guid().guidPrefix = pdata.m_guid.guidPrefix;temp_writer_data->guid().entityId = writer_entity_id;temp_writer_data->persistence_guid(pdata.get_persistence_guid());temp_writer_data->set_persistence_entity_id(writer_entity_id);temp_writer_data->set_remote_locators(pdata.metatraffic_locators, network, use_multicast_locators);temp_writer_data->m_qos.m_reliability.kind = reliability_kind;temp_writer_data->m_qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS;{reader->matched_writer_add(*temp_writer_data);}}if (0 != (endp & pdp_reader_mask)){auto temp_reader_data = get_temporary_reader_proxies_pool().get();temp_reader_data->clear();temp_reader_data->m_expectsInlineQos = false;temp_reader_data->guid().guidPrefix = pdata.m_guid.guidPrefix;temp_reader_data->guid().entityId = reader_entity_id;temp_reader_data->set_remote_locators(pdata.metatraffic_locators, network, use_multicast_locators);temp_reader_data->m_qos.m_reliability.kind = reliability_kind;temp_reader_data->m_qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS;{writer->matched_reader_add(*temp_reader_data);}if (BEST_EFFORT_RELIABILITY_QOS == reliability_kind){endpoints->writer.writer_->unsent_changes_reset();}}
}

这段代码主要设置了ProxyPool<WriterProxyData>ProxyPool<ReaderProxyData>这两个l临时对象用于存放PDP发现的Participant中的RTPSWriterRTPSReader对象,其中guidprefixpersistence_guidmetatraffic_locators都来自PDP报文中的值,其他都是固定赋值。然后使用这两个临时对象通过调用StatelessReader::matched_writer_add/StatelessWriter::matched_reader_add更新当前PDP的RTPSReaderRTPSWriter

bool StatelessReader::matched_writer_add(const WriterProxyData& wdata)
{ReaderListener* listener = nullptr;{std::unique_lock<RecursiveTimedMutex> guard(mp_mutex);listener = mp_listener;for (RemoteWriterInfo_t& writer : matched_writers_){if (writer.guid == wdata.guid()){EPROSIMA_LOG_INFO(RTPS_READER, "Attempting to add existing writer, updating information");if (EXCLUSIVE_OWNERSHIP_QOS == m_att.ownershipKind &&writer.ownership_strength != wdata.m_qos.m_ownershipStrength.value){mp_history->writer_update_its_ownership_strength_nts(writer.guid, wdata.m_qos.m_ownershipStrength.value);}writer.ownership_strength = wdata.m_qos.m_ownershipStrength.value;if (nullptr != listener){// call the listener without the lock takenguard.unlock();listener->on_writer_discovery(this, WriterDiscoveryInfo::CHANGED_QOS_WRITER, wdata.guid(),&wdata);}#ifdef FASTDDS_STATISTICS// notify monitor service so that the connectionlist for this entity// could be updatedif (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin()){mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid);}
#endif //FASTDDS_STATISTICSreturn false;}}bool is_same_process = RTPSDomainImpl::should_intraprocess_between(m_guid, wdata.guid());bool is_datasharing = is_datasharing_compatible_with(wdata);RemoteWriterInfo_t info;info.guid = wdata.guid();info.persistence_guid = wdata.persistence_guid();info.has_manual_topic_liveliness = (MANUAL_BY_TOPIC_LIVELINESS_QOS == wdata.m_qos.m_liveliness.kind);info.is_datasharing = is_datasharing;info.ownership_strength = wdata.m_qos.m_ownershipStrength.value;if (is_datasharing){if (datasharing_listener_->add_datasharing_writer(wdata.guid(),m_att.durabilityKind == VOLATILE,mp_history->m_att.maximumReservedCaches)){EPROSIMA_LOG_INFO(RTPS_READER, "Writer Proxy " << wdata.guid() << " added to " << this->m_guid.entityId<< " with data sharing");}else{EPROSIMA_LOG_ERROR(RTPS_READER, "Failed to add Writer Proxy " << wdata.guid()<< " to " << this->m_guid.entityId<< " with data sharing.");return false;}}if (matched_writers_.emplace_back(info) == nullptr){EPROSIMA_LOG_WARNING(RTPS_READER, "No space to add writer " << wdata.guid() << " to reader " << m_guid);if (is_datasharing){datasharing_listener_->remove_datasharing_writer(wdata.guid());}return false;}EPROSIMA_LOG_INFO(RTPS_READER, "Writer " << wdata.guid() << " added to reader " << m_guid);add_persistence_guid(info.guid, info.persistence_guid);m_acceptMessagesFromUnkownWriters = false;// Intraprocess manages durability itselfif (is_datasharing && !is_same_process && m_att.durabilityKind != VOLATILE){// simulate a notification to force reading of transient changes// this has to be done after the writer is added to the matched_writers or the processing may faildatasharing_listener_->notify(false);}}if (liveliness_lease_duration_ < c_TimeInfinite){auto wlp = mp_RTPSParticipant->wlp();if ( wlp != nullptr){wlp->sub_liveliness_manager_->add_writer(wdata.guid(),liveliness_kind_,liveliness_lease_duration_);}else{EPROSIMA_LOG_ERROR(RTPS_LIVELINESS, "Finite liveliness lease duration but WLP not enabled");}}if (nullptr != listener){listener->on_writer_discovery(this, WriterDiscoveryInfo::DISCOVERED_WRITER, wdata.guid(), &wdata);}#ifdef FASTDDS_STATISTICS// notify monitor service so that the connectionlist for this entity// could be updatedif (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin()){mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid);}
#endif //FASTDDS_STATISTICSreturn true;
}

StatelessReader::matched_writer_add的核心就是如果已经存在于matched_writers_中则更新qos等数据,如果不存在则构造一个RemoteWriterInfo_t对象,用于保存PDP发现的participant中的writer的信息,并将其添加到当前服务发现的端点StatelessReadermatched_writers_中。

bool StatelessWriter::matched_reader_add(const ReaderProxyData& data)
{using fastdds::rtps::network::external_locators::filter_remote_locators;std::unique_lock<RecursiveTimedMutex> guard(mp_mutex);std::unique_lock<LocatorSelectorSender> locator_selector_guard(locator_selector_);assert(data.guid() != c_Guid_Unknown);if (for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,[this, &data](ReaderLocator& reader){if (reader.remote_guid() == data.guid()){EPROSIMA_LOG_WARNING(RTPS_WRITER, "Attempting to add existing reader, updating information.");if (reader.update(data.remote_locators().unicast,data.remote_locators().multicast,data.m_expectsInlineQos)){filter_remote_locators(*reader.general_locator_selector_entry(),m_att.external_unicast_locators, m_att.ignore_non_matching_locators);update_reader_info(true);}return true;}return false;})){if (nullptr != mp_listener){// call the listener without locks takenlocator_selector_guard.unlock();guard.unlock();mp_listener->on_reader_discovery(this, ReaderDiscoveryInfo::CHANGED_QOS_READER, data.guid(), &data);}#ifdef FASTDDS_STATISTICS// notify monitor service so that the connectionlist for this entity// could be updatedif (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin()){mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid);}
#endif //FASTDDS_STATISTICSreturn false;}// Get a locator from the inactive pool (or create a new one if necessary and allowed)std::unique_ptr<ReaderLocator> new_reader;if (matched_readers_pool_.empty()){size_t max_readers = matched_readers_pool_.max_size();if (getMatchedReadersSize() + matched_readers_pool_.size() < max_readers){const RemoteLocatorsAllocationAttributes& loc_alloc =mp_RTPSParticipant->getRTPSParticipantAttributes().allocation.locators;new_reader.reset(new ReaderLocator(this,loc_alloc.max_unicast_locators,loc_alloc.max_multicast_locators));}else{EPROSIMA_LOG_WARNING(RTPS_WRITER, "Couldn't add matched reader due to resource limits");return false;}}else{new_reader = std::move(matched_readers_pool_.back());matched_readers_pool_.pop_back();}// Add info of new datareader.new_reader->start(data.guid(),data.remote_locators().unicast,data.remote_locators().multicast,data.m_expectsInlineQos,is_datasharing_compatible_with(data));filter_remote_locators(*new_reader->general_locator_selector_entry(),m_att.external_unicast_locators, m_att.ignore_non_matching_locators);locator_selector_.locator_selector.add_entry(new_reader->general_locator_selector_entry());if (new_reader->is_local_reader()){matched_local_readers_.push_back(std::move(new_reader));EPROSIMA_LOG_INFO(RTPS_WRITER, "Adding reader " << data.guid() << " to " << this->m_guid.entityId<< " as local reader");}else if (new_reader->is_datasharing_reader()){matched_datasharing_readers_.push_back(std::move(new_reader));EPROSIMA_LOG_INFO(RTPS_WRITER, "Adding reader " << data.guid() << " to " << this->m_guid.entityId<< " as data sharing");}else{matched_remote_readers_.push_back(std::move(new_reader));EPROSIMA_LOG_INFO(RTPS_WRITER, "Adding reader " << data.guid() << " to " << this->m_guid.entityId<< " as remote reader");}update_reader_info(true);if (nullptr != mp_listener){// call the listener without locks takenlocator_selector_guard.unlock();guard.unlock();mp_listener->on_reader_discovery(this, ReaderDiscoveryInfo::DISCOVERED_READER, data.guid(), &data);}#ifdef FASTDDS_STATISTICS// notify monitor service so that the connectionlist for this entity// could be updatedif (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin()){mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid);}
#endif //FASTDDS_STATISTICSreturn true;
}

StatelessWriter::matched_reader_add的更新类似,先构造一个ReaderLocator对象,ReaderLocator中保存了unicast和multicast的Locator_t用于发送向这些Locator_t发送PDP报文,还有其他需要的信息,将ReaderLocator对象保存在本地(分别对应matched_local_readers_matched_datasharing_readers_matched_remote_readers_),接着通过调用StatelessWriter::update_reader_info更新LocatorSelectorSender信息,并调用RTPSParticipantImpl::createSenderResources创建Sender Resource。sender resource的创建可以参考这里:TODO)

void StatelessWriter::update_reader_info(bool create_sender_resources)
{...update_cached_info_nts(locator_selector_);if (addGuid){compute_selected_guids(locator_selector_);}if (create_sender_resources){RTPSParticipantImpl* part = mp_RTPSParticipant;locator_selector_.locator_selector.for_each([part](const Locator_t& loc){part->createSenderResources(loc);});}
}

以上完成了通过PDP报文更新PDP数据的过程了。

EDP更新

PDPSimple::assign_low_level_remote_endpoints主要用于EDP对象的更新,实现如下:

void PDPSimple::assign_low_level_remote_endpoints(const ParticipantProxyData& pdata,bool notify_secure_endpoints)
{if (mp_EDP != nullptr){mp_EDP->assignRemoteEndpoints(pdata, notify_secure_endpoints);}...
}

这里主要关注通过调用EDPSimple::assignRemoteEndpoints实现对EDP对象的更新:

void EDPSimple::assignRemoteEndpoints(const ParticipantProxyData& pdata,bool assign_secure_endpoints)
{EPROSIMA_LOG_INFO(RTPS_EDP, "New DPD received, adding remote endpoints to our SimpleEDP endpoints");const NetworkFactory& network = mp_RTPSParticipant->network_factory();uint32_t endp = pdata.m_availableBuiltinEndpoints;uint32_t auxendp;bool use_multicast_locators = !mp_PDP->getRTPSParticipant()->getAttributes().builtin.avoid_builtin_multicast ||pdata.metatraffic_locators.unicast.empty();auto temp_reader_proxy_data = get_temporary_reader_proxies_pool().get();temp_reader_proxy_data->clear();temp_reader_proxy_data->m_expectsInlineQos = false;temp_reader_proxy_data->guid().guidPrefix = pdata.m_guid.guidPrefix;temp_reader_proxy_data->set_remote_locators(pdata.metatraffic_locators, network, use_multicast_locators);temp_reader_proxy_data->m_qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS;temp_reader_proxy_data->m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;auto temp_writer_proxy_data = get_temporary_writer_proxies_pool().get();temp_writer_proxy_data->clear();temp_writer_proxy_data->guid().guidPrefix = pdata.m_guid.guidPrefix;temp_writer_proxy_data->persistence_guid(pdata.get_persistence_guid());temp_writer_proxy_data->set_remote_locators(pdata.metatraffic_locators, network, use_multicast_locators);temp_writer_proxy_data->m_qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS;temp_writer_proxy_data->m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;auxendp = endp;auxendp &= DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER;if (auxendp != 0 && publications_reader_.first != nullptr) //Exist Pub Writer and i have pub reader{EPROSIMA_LOG_INFO(RTPS_EDP, "Adding SEDP Pub Writer to my Pub Reader");temp_writer_proxy_data->guid().entityId = c_EntityId_SEDPPubWriter;temp_writer_proxy_data->set_persistence_entity_id(c_EntityId_SEDPPubWriter);publications_reader_.first->matched_writer_add(*temp_writer_proxy_data);}auxendp = endp;auxendp &= DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR;if (auxendp != 0 && publications_writer_.first != nullptr) //Exist Pub Detector{EPROSIMA_LOG_INFO(RTPS_EDP, "Adding SEDP Pub Reader to my Pub Writer");temp_reader_proxy_data->guid().entityId = c_EntityId_SEDPPubReader;publications_writer_.first->matched_reader_add(*temp_reader_proxy_data);}auxendp = endp;auxendp &= DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER;if (auxendp != 0 && subscriptions_reader_.first != nullptr) //Exist Pub Announcer{EPROSIMA_LOG_INFO(RTPS_EDP, "Adding SEDP Sub Writer to my Sub Reader");temp_writer_proxy_data->guid().entityId = c_EntityId_SEDPSubWriter;temp_writer_proxy_data->set_persistence_entity_id(c_EntityId_SEDPSubWriter);subscriptions_reader_.first->matched_writer_add(*temp_writer_proxy_data);}auxendp = endp;auxendp &= DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR;if (auxendp != 0 && subscriptions_writer_.first != nullptr) //Exist Pub Announcer{EPROSIMA_LOG_INFO(RTPS_EDP, "Adding SEDP Sub Reader to my Sub Writer");temp_reader_proxy_data->guid().entityId = c_EntityId_SEDPSubReader;subscriptions_writer_.first->matched_reader_add(*temp_reader_proxy_data);}#if HAVE_SECURITYauxendp = endp;auxendp &= DISC_BUILTIN_ENDPOINT_PUBLICATION_SECURE_ANNOUNCER;if (auxendp != 0 && publications_secure_reader_.first != nullptr && assign_secure_endpoints){temp_writer_proxy_data->guid().entityId = sedp_builtin_publications_secure_writer;temp_writer_proxy_data->set_persistence_entity_id(sedp_builtin_publications_secure_writer);if (!mp_RTPSParticipant->security_manager().discovered_builtin_writer(publications_secure_reader_.first->getGuid(), pdata.m_guid, *temp_writer_proxy_data,publications_secure_reader_.first->getAttributes().security_attributes())){EPROSIMA_LOG_ERROR(RTPS_EDP, "Security manager returns an error for writer " <<publications_secure_reader_.first->getGuid());}}auxendp = endp;auxendp &= DISC_BUILTIN_ENDPOINT_PUBLICATION_SECURE_DETECTOR;if (auxendp != 0 && publications_secure_writer_.first != nullptr && assign_secure_endpoints){temp_reader_proxy_data->guid().entityId = sedp_builtin_publications_secure_reader;if (!mp_RTPSParticipant->security_manager().discovered_builtin_reader(publications_secure_writer_.first->getGuid(), pdata.m_guid, *temp_reader_proxy_data,publications_secure_writer_.first->getAttributes().security_attributes())){EPROSIMA_LOG_ERROR(RTPS_EDP, "Security manager returns an error for writer " <<publications_secure_writer_.first->getGuid());}}auxendp = endp;auxendp &= DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_SECURE_ANNOUNCER;if (auxendp != 0 && subscriptions_secure_reader_.first != nullptr && assign_secure_endpoints){temp_writer_proxy_data->guid().entityId = sedp_builtin_subscriptions_secure_writer;temp_writer_proxy_data->set_persistence_entity_id(sedp_builtin_subscriptions_secure_writer);if (!mp_RTPSParticipant->security_manager().discovered_builtin_writer(subscriptions_secure_reader_.first->getGuid(), pdata.m_guid, *temp_writer_proxy_data,subscriptions_secure_reader_.first->getAttributes().security_attributes())){EPROSIMA_LOG_ERROR(RTPS_EDP, "Security manager returns an error for writer " <<subscriptions_secure_reader_.first->getGuid());}}auxendp = endp;auxendp &= DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_SECURE_DETECTOR;if (auxendp != 0 && subscriptions_secure_writer_.first != nullptr && assign_secure_endpoints){EPROSIMA_LOG_INFO(RTPS_EDP, "Adding SEDP Sub Reader to my Sub Writer");temp_reader_proxy_data->guid().entityId = sedp_builtin_subscriptions_secure_reader;if (!mp_RTPSParticipant->security_manager().discovered_builtin_reader(subscriptions_secure_writer_.first->getGuid(), pdata.m_guid, *temp_reader_proxy_data,subscriptions_secure_writer_.first->getAttributes().security_attributes())){EPROSIMA_LOG_ERROR(RTPS_EDP, "Security manager returns an error for writer " <<subscriptions_secure_writer_.first->getGuid());}}
#elsestatic_cast<void>(assign_secure_endpoints);
#endif // if HAVE_SECURITY}

这段代码中通过设置ProxyPool<ReaderProxyData>ProxyPool<WriterProxyData>两个临时对象,用于更新EDP的两对Endpoints端点,其中ProxyPool<WriterProxyData>用于更新publications_reader_subscriptions_reader_ProxyPool<ReaderProxyData>用于更新publications_writer_subscriptions_writer_。这里只更新对端的guid,和metatraffic_locators(单播地址,默认对端ip:7410),durability qos为TRANSIENT_LOCAL_DURABILITY_QOS,reliability qos为RELIABLE_RELIABILITY_QOS

bool StatefulReader::matched_writer_add(const WriterProxyData& wdata)
{assert(wdata.guid() != c_Guid_Unknown);ReaderListener* listener = nullptr;{std::unique_lock<RecursiveTimedMutex> guard(mp_mutex);if (!is_alive_){return false;}listener = mp_listener;bool is_same_process = RTPSDomainImpl::should_intraprocess_between(m_guid, wdata.guid());bool is_datasharing = is_datasharing_compatible_with(wdata);for (WriterProxy* it : matched_writers_){if (it->guid() == wdata.guid()){EPROSIMA_LOG_INFO(RTPS_READER, "Attempting to add existing writer, updating information");// If Ownership strength changes then update all history instances.if (EXCLUSIVE_OWNERSHIP_QOS == m_att.ownershipKind &&it->ownership_strength() != wdata.m_qos.m_ownershipStrength.value){mp_history->writer_update_its_ownership_strength_nts(it->guid(), wdata.m_qos.m_ownershipStrength.value);}it->update(wdata);if (!is_same_process){for (const Locator_t& locator : it->remote_locators_shrinked()){getRTPSParticipant()->createSenderResources(locator);}}if (nullptr != listener){// call the listener without the lock takenguard.unlock();listener->on_writer_discovery(this, WriterDiscoveryInfo::CHANGED_QOS_WRITER, wdata.guid(), &wdata);}#ifdef FASTDDS_STATISTICS// notify monitor service so that the connectionlist for this entity// could be updatedif (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin()){mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid);}
#endif //FASTDDS_STATISTICSreturn false;}}// Get a writer proxy from the inactive pool (or create a new one if necessary and allowed)WriterProxy* wp = nullptr;if (matched_writers_pool_.empty()){size_t max_readers = matched_writers_pool_.max_size();if (getMatchedWritersSize() + matched_writers_pool_.size() < max_readers){const RTPSParticipantAttributes& part_att = mp_RTPSParticipant->getRTPSParticipantAttributes();wp = new WriterProxy(this, part_att.allocation.locators, proxy_changes_config_);}else{EPROSIMA_LOG_WARNING(RTPS_READER, "Maximum number of reader proxies (" << max_readers << \") reached for writer " << m_guid);return false;}}else{wp = matched_writers_pool_.back();matched_writers_pool_.pop_back();}SequenceNumber_t initial_sequence;add_persistence_guid(wdata.guid(), wdata.persistence_guid());initial_sequence = get_last_notified(wdata.guid());wp->start(wdata, initial_sequence, is_datasharing);if (!is_same_process){for (const Locator_t& locator : wp->remote_locators_shrinked()){getRTPSParticipant()->createSenderResources(locator);}}if (is_datasharing){if (datasharing_listener_->add_datasharing_writer(wdata.guid(),m_att.durabilityKind == VOLATILE,mp_history->m_att.maximumReservedCaches)){matched_writers_.push_back(wp);EPROSIMA_LOG_INFO(RTPS_READER, "Writer Proxy " << wdata.guid() << " added to " << this->m_guid.entityId<< " with data sharing");}else{EPROSIMA_LOG_ERROR(RTPS_READER, "Failed to add Writer Proxy " << wdata.guid()<< " to " << this->m_guid.entityId<< " with data sharing.");{// Release reader's lock to avoid deadlock when waiting for event (requiring mutex) to finishguard.unlock();assert(!guard.owns_lock());wp->stop();guard.lock();}matched_writers_pool_.push_back(wp);return false;}// Intraprocess manages durability itselfif (VOLATILE == m_att.durabilityKind){std::shared_ptr<ReaderPool> pool = datasharing_listener_->get_pool_for_writer(wp->guid());SequenceNumber_t last_seq = pool->get_last_read_sequence_number();if (SequenceNumber_t::unknown() != last_seq){SequenceNumberSet_t sns(last_seq + 1);send_acknack(wp, sns, wp, false);wp->lost_changes_update(last_seq + 1);}}else{// simulate a notification to force reading of transient changesdatasharing_listener_->notify(false);}}else{matched_writers_.push_back(wp);EPROSIMA_LOG_INFO(RTPS_READER, "Writer Proxy " << wp->guid() << " added to " << m_guid.entityId);}}if (liveliness_lease_duration_ < c_TimeInfinite){auto wlp = this->mp_RTPSParticipant->wlp();if ( wlp != nullptr){wlp->sub_liveliness_manager_->add_writer(wdata.guid(),liveliness_kind_,liveliness_lease_duration_);}else{EPROSIMA_LOG_ERROR(RTPS_LIVELINESS,"Finite liveliness lease duration but WLP not enabled, cannot add writer");}}if (nullptr != listener){listener->on_writer_discovery(this, WriterDiscoveryInfo::DISCOVERED_WRITER, wdata.guid(), &wdata);}#ifdef FASTDDS_STATISTICS// notify monitor service so that the connectionlist for this entity// could be updatedif (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin()){mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid);}
#endif //FASTDDS_STATISTICSreturn true;
}

如果在StatefulReader::matched_writers_中没有查找到PDP发现的guid则创建一个新的WriterProxy对象EDP阶段需要的数据。因为EDP是reliable模式,在WriterProxy的构造中会创建heartbeatacknack的定时器。

{//Create EventsResourceEvent& event_manager = reader_->getEventResource();auto heartbeat_lambda = [this]() -> bool{perform_heartbeat_response();return false;};auto acknack_lambda = [this]() -> bool{return perform_initial_ack_nack();};heartbeat_response_ = new TimedEvent(event_manager, heartbeat_lambda, 0);initial_acknack_ = new TimedEvent(event_manager, acknack_lambda, 0);clear();EPROSIMA_LOG_INFO(RTPS_READER, "Writer Proxy created in reader: " << reader_->getGuid().entityId);
}

接着创建发送资源sender resource。

bool StatefulWriter::matched_reader_add(const ReaderProxyData& rdata)
{using fastdds::rtps::network::external_locators::filter_remote_locators;if (rdata.guid() == c_Guid_Unknown){EPROSIMA_LOG_ERROR(RTPS_WRITER, "Reliable Writer need GUID_t of matched readers");return false;}std::unique_lock<RecursiveTimedMutex> guard(mp_mutex);std::unique_lock<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_);std::unique_lock<LocatorSelectorSender> guard_locator_selector_async(locator_selector_async_);// Check if it is already matched.if (for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,[this, &rdata](ReaderProxy* reader){if (reader->guid() == rdata.guid()){EPROSIMA_LOG_INFO(RTPS_WRITER, "Attempting to add existing reader, updating information.");if (reader->update(rdata)){filter_remote_locators(*reader->general_locator_selector_entry(),m_att.external_unicast_locators, m_att.ignore_non_matching_locators);filter_remote_locators(*reader->async_locator_selector_entry(),m_att.external_unicast_locators, m_att.ignore_non_matching_locators);update_reader_info(locator_selector_general_, true);update_reader_info(locator_selector_async_, true);}return true;}return false;})){if (nullptr != mp_listener){// call the listener without locks takenguard_locator_selector_async.unlock();guard_locator_selector_general.unlock();guard.unlock();mp_listener->on_reader_discovery(this, ReaderDiscoveryInfo::CHANGED_QOS_READER, rdata.guid(), &rdata);}#ifdef FASTDDS_STATISTICS// notify monitor service so that the connectionlist for this entity// could be updatedif (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin()){mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid);}
#endif //FASTDDS_STATISTICSreturn false;}// Get a reader proxy from the inactive pool (or create a new one if necessary and allowed)ReaderProxy* rp = nullptr;if (matched_readers_pool_.empty()){size_t max_readers = matched_readers_pool_.max_size();if (getMatchedReadersSize() + matched_readers_pool_.size() < max_readers){const RTPSParticipantAttributes& part_att = mp_RTPSParticipant->getRTPSParticipantAttributes();rp = new ReaderProxy(m_times, part_att.allocation.locators, this);}else{EPROSIMA_LOG_WARNING(RTPS_WRITER, "Maximum number of reader proxies (" << max_readers <<") reached for writer " << m_guid);return false;}}else{rp = matched_readers_pool_.back();matched_readers_pool_.pop_back();}// Add info of new datareader.rp->start(rdata, is_datasharing_compatible_with(rdata));filter_remote_locators(*rp->general_locator_selector_entry(),m_att.external_unicast_locators, m_att.ignore_non_matching_locators);filter_remote_locators(*rp->async_locator_selector_entry(),m_att.external_unicast_locators, m_att.ignore_non_matching_locators);locator_selector_general_.locator_selector.add_entry(rp->general_locator_selector_entry());locator_selector_async_.locator_selector.add_entry(rp->async_locator_selector_entry());if (rp->is_local_reader()){matched_local_readers_.push_back(rp);EPROSIMA_LOG_INFO(RTPS_WRITER, "Adding reader " << rdata.guid() << " to " << this->m_guid.entityId<< " as local reader");}else{if (rp->is_datasharing_reader()){matched_datasharing_readers_.push_back(rp);EPROSIMA_LOG_INFO(RTPS_WRITER, "Adding reader " << rdata.guid() << " to " << this->m_guid.entityId<< " as data sharing");}else{matched_remote_readers_.push_back(rp);EPROSIMA_LOG_INFO(RTPS_WRITER, "Adding reader " << rdata.guid() << " to " << this->m_guid.entityId<< " as remote reader");}}update_reader_info(locator_selector_general_, true);update_reader_info(locator_selector_async_, true);if (rp->is_datasharing_reader()){if (nullptr != mp_listener){// call the listener without locks takenguard_locator_selector_async.unlock();guard_locator_selector_general.unlock();guard.unlock();mp_listener->on_reader_discovery(this, ReaderDiscoveryInfo::DISCOVERED_READER, rdata.guid(), &rdata);}#ifdef FASTDDS_STATISTICS// notify monitor service so that the connectionlist for this entity// could be updatedif (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin()){mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid);}
#endif //FASTDDS_STATISTICSreturn true;}bool is_reliable = rp->is_reliable();if (is_reliable){SequenceNumber_t min_seq = get_seq_num_min();SequenceNumber_t last_seq = get_seq_num_max();RTPSMessageGroup group(mp_RTPSParticipant, this, rp->message_sender());// History not emptyif (min_seq != SequenceNumber_t::unknown()){(void)last_seq;assert(last_seq != SequenceNumber_t::unknown());assert(min_seq <= last_seq);try{// Late-joinerif (TRANSIENT_LOCAL <= rp->durability_kind() &&TRANSIENT_LOCAL <= m_att.durabilityKind){for (History::iterator cit = mp_history->changesBegin(); cit != mp_history->changesEnd(); ++cit){// Holes are managed when deliver_sample(), sending GAP messages.if (rp->rtps_is_relevant(*cit)){ChangeForReader_t changeForReader(*cit);// If it is local, maintain in UNSENT status and add to flow controller.if (rp->is_local_reader()){flow_controller_->add_old_sample(this, *cit);}// In other case, set as UNACKNOWLEDGED and expects the reader request them.else{changeForReader.setStatus(UNACKNOWLEDGED);}rp->add_change(changeForReader, true, false);}}}else{if (rp->is_local_reader()){intraprocess_gap(rp, min_seq, mp_history->next_sequence_number());}else{// Send a GAP of the whole history.group.add_gap(min_seq, SequenceNumberSet_t(mp_history->next_sequence_number()), rp->guid());}}// Always activate heartbeat period. We need a confirmation of the reader.// The state has to be updated.periodic_hb_event_->restart_timer(std::chrono::steady_clock::now() + std::chrono::hours(24));}catch (const RTPSMessageGroup::timeout&){EPROSIMA_LOG_ERROR(RTPS_WRITER, "Max blocking time reached");}}if (rp->is_local_reader()){intraprocess_heartbeat(rp);}else{send_heartbeat_nts_(1u, group, disable_positive_acks_);group.flush_and_reset();}}else{// Acknowledged all for best-effort reader.rp->acked_changes_set(mp_history->next_sequence_number());}EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy " << rp->guid() << " added to " << this->m_guid.entityId << " with "<< rdata.remote_locators().unicast.size() << "(u)-"<< rdata.remote_locators().multicast.size() <<"(m) locators");if (nullptr != mp_listener){// call the listener without locks takenguard_locator_selector_async.unlock();guard_locator_selector_general.unlock();guard.unlock();mp_listener->on_reader_discovery(this, ReaderDiscoveryInfo::DISCOVERED_READER, rdata.guid(), &rdata);}#ifdef FASTDDS_STATISTICS// notify monitor service so that the connectionlist for this entity// could be updatedif (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin()){mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid);}
#endif //FASTDDS_STATISTICSreturn true;
}

RTPSWriter中更新reader的逻辑类似,PDP的发现如果在StatefulWriter的匹配的reader集合中没有找到则新创建一个ReaderProxy对象.

EntityID

服务发现的EntityID是标准固定,回调函数中通过固定EntityID来调用不同的监听对象。这些固定的Entity ID定义在EntityId_t.hpp中,主要有以下:

const EntityId_t c_EntityId_Unknown = ENTITYID_UNKNOWN;
const EntityId_t c_EntityId_SPDPReader = ENTITYID_SPDP_BUILTIN_RTPSParticipant_READER;
const EntityId_t c_EntityId_SPDPWriter = ENTITYID_SPDP_BUILTIN_RTPSParticipant_WRITER;const EntityId_t c_EntityId_SEDPPubWriter = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER;
const EntityId_t c_EntityId_SEDPPubReader = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER;
const EntityId_t c_EntityId_SEDPSubWriter = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER;
const EntityId_t c_EntityId_SEDPSubReader = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER;const EntityId_t c_EntityId_RTPSParticipant = ENTITYID_RTPSParticipant;const EntityId_t c_EntityId_WriterLiveliness = ENTITYID_P2P_BUILTIN_RTPSParticipant_MESSAGE_WRITER;
const EntityId_t c_EntityId_ReaderLiveliness = ENTITYID_P2P_BUILTIN_RTPSParticipant_MESSAGE_READER;const EntityId_t participant_stateless_message_writer_entity_id = ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER;
const EntityId_t participant_stateless_message_reader_entity_id = ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_READER;const EntityId_t c_EntityId_TypeLookup_request_writer = ENTITYID_TL_SVC_REQ_WRITER;
const EntityId_t c_EntityId_TypeLookup_request_reader = ENTITYID_TL_SVC_REQ_READER;
const EntityId_t c_EntityId_TypeLookup_reply_writer = ENTITYID_TL_SVC_REPLY_WRITER;
const EntityId_t c_EntityId_TypeLookup_reply_reader = ENTITYID_TL_SVC_REPLY_READER;

http://www.mrgr.cn/news/70441.html

相关文章:

  • 全面介绍软件安全测试分类,安全测试方法、安全防护技术、安全测试流程
  • 打假官方咨询(续)
  • 1.每日SQL----2024/11/7
  • react-router-dom 库作用
  • 【基于轻量型架构的WEB开发】课程 作业4 AOP
  • C++线程
  • 如何防止技术泄密?企业的机密管控必需掌握的十个小窍门,守护数据安全无死角!【科普篇】
  • 产品设计理念:10个案例分享
  • Java异步编程CompletableFuture(串行,并行,批量执行)
  • 无人机动力测试台如何快速外接第三方传感器
  • 使用自定义LLM:RAGAs评估
  • Spring的XML配置:从“啊这...“到“啊,就这...“ --手写Spring第六篇了
  • 香港SEO服务器备份解决方案及注意事项
  • 紫光展锐携手上赞随身Wi-Fi,让5G触手可及
  • 【智谱开放平台-注册/登录安全分析报告】
  • 线代的几何意义(3)——行列式与矩阵的逆
  • 智能检测技术与传感器(热电传感器四个定律)
  • 安装阿里巴巴的Dragonwell(替代JDK)
  • C++的起源与发展
  • 5. 类加载子系统
  • 多模态融合-决策层融合
  • Git - 命令杂谈 - fetch与push
  • 如何在本地文件系统中预览 Vue 项目?
  • AVL 树的模拟实现(入门必看,图文并茂)
  • linux 下调试 mpu6050 三轴加速度
  • 某《财富》世界500强制造企业基于大模型实现财税智能问数