OpenHarmony5.0分布式系统源码实现分析—软总线
一、引言
OpenHarmony 作为一款面向万物互联的操作系统,其分布式软总线(Distributed SoftBus)是实现设备间高效通信和协同的核心技术之一。分布式软总线通过构建一个虚拟的总线网络,使得不同设备能够无缝连接、通信和协同工作。本文将深入分析 OpenHarmony 5.0 中分布式软总线的实现原理,包括其架构设计、核心功能、实现机制以及实际应用。(只包含了软总线的一些核心内容,对于整体的框架还在整理中,例如分布式数据如何管理的、分布式的任务调度等等好多东西还没理清楚)
二、分布式软总线的架构设计
(一)架构层次
分布式软总线的架构可以分为以下几个层次:
- 应用层:提供给开发者使用的接口,用于实现设备间的通信和协同。
- 服务层:包括设备发现、连接管理、数据传输等功能模块。
- 协议层:定义了设备间通信的协议和数据格式。
- 传输层:负责设备间的数据传输,支持多种传输介质(如 Wi-Fi、蓝牙等)。
- 设备层:管理设备的硬件资源和网络接口。
在IPC Kit中也经常用Proxy表示服务请求方(客户端-Client),Stub表示服务提供方(服务端-Server)
软总线通常采用客户端-服务端(Client-Server)模型,在使用时,请求Client端进程可获取Server端所在进程的代理(Proxy),并通过此代理读写数据来实现进程间的数据通信,更具体的讲,首先客户端会建立一个服务端的代理对象,这个代理对象具备和服务端一样的功能,若想访问服务端中的某一个方法,只需访问代理对象中对应的方法即可,代理对象会将请求发送给服务端;然后服务端处理接受到的请求,处理完之后通过驱动返回处理结果给代理对象;最后代理对象将请求结果进一步返回给客户端。
如下图所示:
通常,Stub会先注册系统能力(System Ability)到系统能力管理者(System Ability Manager,缩写SAMgr)中,SAMgr负责管理这些SA并向Client提供相关的接口。Client要和某个具体的SA通信,必须先从SAMgr中获取该SA的代理Proxy对象,然后使用代理Proxy对象和SA通信。在整个通信过程中,如果使用的是IPC通信,则依赖的是Binder驱动,使用的是RPC通信,则依赖的是软总线驱动。
(二)核心组件
分布式软总线的核心组件包括:
- 设备发现模块:负责发现网络中的其他设备,并获取设备信息。
- 连接管理模块:负责建立、维护和断开设备间的连接。
- 数据传输模块:负责设备间的数据传输,支持多种数据类型(如字节流、消息等)。
- 安全认证模块:通过 HiChain 模块实现设备间的认证和加密通信。
分布式软总线组件主要代码目录即内容概述如下:
//foundation/communication/dsoftbus
├── adapter # 适配层代码为3类系统(轻量、小型和标准系统)提供不同丰富程度和不同能力的软总线特性
├── components # 依赖组件代码,主要存放软总线依赖的组件包含mbedtls和nstackx
├── core # 核心代码,对系统提供服务的服务器
│ ├── adapter # 适配层代码,不同的系统编译输出的目标文件和部署路径是不同的
│ ├── authentication # 认证代码
│ ├── bus_center # 组网代码
│ ├── common # 公共代码包括json工具、消息处理机制(message_handler)、队列管理(queue)、安全管理(security)、软总线属性配置(softbus_property)、日志管理(log)等
│ ├── connection # 连接代码
│ ├── discovery # 发现代码
│ ├── frame # 框架代码
│ └── transmission # 传输代码
├── interfaces # 对外接口代码
├── sdk # 运行业务进程代码,对外提供服务的客户端与系统提供服务的服务器(softbus_server)进行进程间通信,借助samgr进程的saManager服务向softbus_server发布服务、发现服务。
│ ├── bus_center # 组网模块代码
│ ├── discovery # 发现模块代码主要包含3个部分,service:为分布式服务接入或退出组网提供支持的一组接口;manager:管理发现的数据结构和生命周期,并为service接口的功能实现提供支持;ipc:为service接口的功能实现提供一个代理(proxy),以便远程访问softbus_server的服务
│ ├── frame # 框架模块代码包含三类系统,会对整个sdk的4个功能模块(框架、组网、发现和传输)进行初始化
│ └── transmission # 传输模块代码包含三部分:session:合并了service和manager的实现,为分布式服务提供会话相关的数据结构和会话生命周期管理接口的实现。trans_channel:为不同的会话类型提供具体的实现。ipc:为service接口的功能实现提供一个代理(proxy)以便远程访问softbus_server
├── tests # 测试代码
└── tools # 工具代码
- softbus_server:在本地通内,为设备的分布式虚拟化、分布式数据管理的实现等提供基础的通信能力服务(在有线网络、无线完了、蓝牙等通信媒介的基础上,为分布式设备的RPC提供支持),代码目录为foundation\communication\dsoftbus\core\frame\standard\init
- samgr:标准系统的samgr组件和dmsfwk组件的实现代码在\foundation\systemabilitymgr目录中。待分析总结
三、分布式软总线的核心功能
(一)设备发现
设备发现是分布式软总线的基础功能之一。通常通过广播机制(如 mDNS 或 UDP 广播)实现设备发现,设备可以自动发现网络中的其他设备,并获取其设备信息。设备发现的过程如下:
- 订阅设备信息:设备通过调用
StartDiscovery
接口,向网络中广播订阅请求。 - 接收设备信息:当其他设备收到订阅请求后,会将自身的设备信息发送给订阅者。
- 回调通知:订阅者通过回调函数接收设备信息,并进行更新设备信息。
- 订阅设备信息并设置回调
//它的作用是启动设备发现功能
int StartDiscovery(const char *packageName, const SubscribeInfo *info, const IDiscoveryCallback *cb)|--> if (InitSoftBus(packageName) != SOFTBUS_OK) {//初始化 SoftBus 客户端|--> if (CheckPackageName(packageName) != SOFTBUS_OK) {//校验服务包名称|--> if (SubscribeInfoCheck(info) != SOFTBUS_OK) {//校验订阅信息|--> int32_t ret = StartDiscoveryInner(packageName, info, cb);//调用核心发现功能|--> DfxRecordDiscServerEnd(SERVER_DISCOVERY, ret, packageName);//将返回值记录到日志中,并返回给调用者return ret;
后续流程
- 内核层处理: Binder 驱动将数据拷贝到目标进程的共享内存中。详细内容参考协议栈部分
- 服务端接收: 服务端通过
OnTransaction
处理接收到的事务数据。
- 接口说明
表1 Native侧IPC接口
类/接口 | 方法名 | 描述 |
---|---|---|
IRemoteBroker | sptr<IRemoteObject> AsObject() | 返回通信对象。Stub端返回RemoteObject对象本身,Proxy端返回代理对象。 |
IRemoteStub | virtual int OnRemoteRequest(uint32_t code, MessageParcel &data, MessageParcel &reply, MessageOption &option) | 请求处理方法,派生类需要重写该方法用来处理Proxy的请求并返回结果。 |
IRemoteProxy | Remote()->SendRequest(code, data, reply, option) | 消息发送方法,业务的Proxy类需要从IRemoteProxy类派生,该方法用来向对端发送消息。 |
- 接收设备信息
(二)连接管理
连接管理是分布式软总线的重要功能之一。通过连接管理,设备可以建立、维护和断开与其他设备的连接。连接管理的过程如下:
- 创建会话服务器:设备通过调用
CreateSessionServer
接口,创建一个会话服务器。 - 打开会话:设备通过调用
OpenSession
接口,向目标设备发起连接请求。 - 接收连接请求:目标设备接收到连接请求后,通过回调函数通知应用层。
- 关闭会话:设备通过调用
CloseSession
接口,关闭与目标设备的连接。
-
创建会话服务器和接收连接请求
调用函数
CreateSessionServer
:创建会话服务器,监听连接请求。
//foundation\communication\dsoftbus\interfaces\kits\transport\session.h
//foundation\communication\dsoftbus\sdk\transmission\session\src\client_trans_session_service.c
//pkgName:应用程序包名 sessionName:会话名称 listener:会话监听器,用于处理连接事件
int CreateSessionServer(const char *pkgName, const char *sessionName, const ISessionListener *listener);|-->if (InitSoftBus(pkgName) != SOFTBUS_OK) {初始化软总线环境|-->if (AddClientPkgName(pkgName) != SOFTBUS_OK) {//将包名添加到客户端列表(g_pkgNameList)中|--> if (SoftBusTimerInit() != SOFTBUS_OK) {//初始化 SoftBus 的定时器模块|-->if (ClientModuleInit() != SOFTBUS_OK) {//初始化客户端模块包含事件(g_observerList)、总线(g_busCenterClient)、发现(g_discInfo)、连接、传输等|-->int ret = ClientAddSessionServer(SEC_TYPE_CIPHERTEXT, pkgName, sessionName, listener)//在客户端添加会话服务器|-->ClientSessionServer *server = GetNewSessionServer(type, sessionName, pkgName, listener)//创建客户端会话服务器(分配内存并赋初值)|-->ListAdd(&g_clientSessionServerList->list, &server->node)//添加客户端会话服务器的节点至g_clientSessionServerList|-->ret = ServerIpcCreateSessionServer(pkgName, sessionName);//通过 IPC(进程间通信)在服务器端创建会话服务器|-->sptr<TransServerProxy> proxy = RetryGetProxy();//函数获取一个代理对象(TransServerProxy)|-->return proxy->CreateSessionServer(pkgName, sessionName);//通过 IPC 机制与服务器端通信,请求创建一个会话服务器|-->sptr<IRemoteObject> remote = GetSystemAbility()//获取系统服务的远程对象|-->MessageParcel data;//4. 构造请求数据|-->if (!data.WriteInterfaceToken(GetDescriptor()))//写入接口令牌(InterfaceToken),用于标识调用的接口|-->int32_t ret = remote->SendRequest(SERVER_CREATE_SESSION_SERVER, data, reply, option);//向系统服务发送请求|-->if (!reply.ReadInt32(serverRet)) {//从响应数据中读取结果
创建会话服务器的主要流程如上为了便于理解可以结合下图的类图做分析
- 打开会话
调用 OpenSession
接口,向目标设备发起连接请求。
//foundation\communication\dsoftbus\interfaces\kits\transport\session.h
//foundation\communication\dsoftbus\sdk\transmission\ipc\standard\src\trans_server_proxy_standard.cpp
int OpenSession(const char *mySessionName, const char *peerSessionName, const char *peerNetworkId,const char *groupId, const SessionAttribute* attr)|-->SessionAttribute *tmpAttr = BuildParamSessionAttribute(attr)//构建会话属性(分配SessionAttribute内存,并初始化)|-->ret = ClientAddSession(¶m, &sessionId, &isEnabled);//在客户端添加会话|-->session = CreateNewSession(param);//根据 param 创建一个新的会话(SessionInfo)|-->ret = AddSession(param->sessionName, session);//将新创建的会话添加到客户端管理模块(g_clientSessionServerList)|-->ret = ServerIpcOpenSession(¶m, &transInfo);//将会话参数传递给服务器端|-->sptr<TransServerProxy> proxy = RetryGetProxy();// 获取服务器端的代理对象|-->int ret = proxy->OpenSession(param, info);//调用代理对象的 OpenSession 方法,将 SessionParam 和 TransInfo 传递给服务器端|-->ret = ClientSetChannelBySessionId(sessionId, &transInfo);//将通道信息与会话 ID 关联|-->return sessionId
为了便于理解可以结合下图的类图做分析
- 关闭会话
调用 CloseSession
接口,关闭与目标设备的连接。
//foundation\communication\dsoftbus\interfaces\kits\transport\session.h
//foundation\communication\dsoftbus\sdk\transmission\session\src\client_trans_session_service.c
void CloseSession(int sessionId)|-->ret = ClientGetChannelBySessionId(sessionId, &channelId, &type, NULL);//根据会话 ID 获取对应的通道信息,从g_clientSessionServerList中获取channelId和type|-->AddSessionStateClosing();//将当前会话的状态标记为“关闭中”|-->ret = ClientTransCloseChannel(channelId, type);//关闭与会话关联的通道,最终会根据不同的类型通过ipc总线发送对应关闭的消息|-->ret = ClientDeleteSession(sessionId);//从会话管理模块(g_clientSessionServerList)中删除指定的会话
- 移除会话服务器
RemoveSessionServer
:移除会话服务器。
//foundation\communication\dsoftbus\interfaces\kits\transport\session.h
//foundation\communication\dsoftbus\sdk\transmission\session\src\client_trans_session_service.c
int RemoveSessionServer(const char *pkgName, const char *sessionName)|-->int32_t ret = ServerIpcRemoveSessionServer(pkgName, sessionName);//通过 IPC 向服务器端发送请求,移除会话服务器|-->ret = ClientDeleteSessionServer(SEC_TYPE_CIPHERTEXT, sessionName);//在客户端删除会话服务器|-->DeleteFileListener(sessionName);|-->AnonymizeFree(tmpName);//清理与会话服务器相关的资源
(三)数据传输
数据传输是分布式软总线的核心功能之一。通过数据传输,设备可以将数据发送给其他设备。数据传输的过程如下:
- 发送数据:设备通过调用
SendBytes
或SendMessage
接口,将数据发送给目标设备。 - 接收数据:目标设备接收到数据后,通过回调函数通知应用层。在CreateSessionServer函数时传入的listener参数为回调函数。
-
发送数据
-
SendBytes
:发送字节数据。//foundation\communication\dsoftbus\interfaces\kits\transport\session.h //foundation\communication\dsoftbus\sdk\transmission\session\src\client_trans_message_service.c //`sessionId`:会话ID。`data`:待发送的数据。`len`:数据长度。 int SendBytes(int sessionId, const void *data, unsigned int len)|-->int ret = CheckPermissionState(sessionId);//检查g_clientSessionServerList中的当前会话是否有权限发送数据|-->ret = ClientGetChannelBySessionId(sessionId, &channelId, &channelType, &enableStatus);//通过 sessionId 从g_clientSessionServerList中获取与该会话相关的通道信息,包括通道 ID(channelId)、通道类型(channelType)和会话启用状态(enableStatus|-->ret = ClientGetChannelBusinessTypeBySessionId(sessionId, &businessType);//获取当前会话的业务类型(businessType)包含消息、字节、数据流等|-->UpdateChannelStatistics(sessionId, len);//更新通道的统计信息(例如发送的数据量)|-->return ClientTransChannelSendBytes(channelId, channelType, data, len);//通过指定的通道发送数据|-->switch (channelType) {|-->case CHANNEL_TYPE_AUTH:ret = TransAuthChannelSendBytes(channelId, data, len);//通过认证通道发送数据(g_serverProxy->SendMessage)|-->case CHANNEL_TYPE_PROXY:ret = TransProxyChannelSendBytes(channelId, data, len);//通过代理通道发送数据(g_serverProxy->SendMessage)|-->case CHANNEL_TYPE_TCP_DIRECT:ret = TransTdcSendBytes(channelId, data, len);//通过 TCP 直连通道发送数据(send)
发送数据接口除了SendBytes外还有SendMessage和SendStream,大体的流程差不多,就不再单独说明了。
示例
为了更好的理解从oh的源码中我们可以看到在transmission模块中包含一个demo示例,其中包含一个sendbytes_message_demo.c的文件,如下(简化了一下):
- 设备A:
#include <stdio.h>
#include "session.h"
#include "softbus_config_type.h"const char *g_pkgNameA = "dms"; // Application bundle name of device A
const char *g_sessionNameA = "ohos.distributedschedule.dms.test"; // Session name of device A// Network ID generated by device B after devices A and B are networked
const char *g_networkidB = "ABCDEF00ABCDEF00ABCDEF00ABCDEF00ABCDEF00ABCDEF00ABCDEF00ABCDEF00";
const char *g_groupid = "TEST_GROUP_ID"; // Group ID
static SessionAttribute g_sessionAttr = {.dataType = TYPE_BYTES, // Session type
};// Notify that the session is set up successfully.
static int OnSessionOpened(int sessionId, int result)
{printf("session opened,sesison id = %d\r\n", sessionId);return 0;
}// Notify that the session is closed.
static void OnSessionClosed(int sessionId)
{printf("session closed, session id = %d\r\n", sessionId);
}// Notify that the byte data is received.
static void OnBytesReceived(int sessionId, const void *data, unsigned int len)
{printf("session bytes received, session id = %d\r\n", sessionId);
}// Notify that the message is received.
static void OnMessageReceived(int sessionId, const void *data, unsigned int len)
{printf("session msg received, session id = %d\r\n", sessionId);
}static ISessionListener g_sessionlistenerA = {.OnSessionOpened = OnSessionOpened,.OnSessionClosed = OnSessionClosed,.OnBytesReceived = OnBytesReceived,.OnMessageReceived = OnMessageReceived,
};int main(void)
{/** 1. Device A calls CreateSessionServer() to create a session server based on the application bundle name and* session name, and registers the callbacks for session opened, session closed, byte received, and message* received.*/int ret = CreateSessionServer(g_pkgNameA, g_sessionNameA, &g_sessionlistenerA);printf("create session server result = %d\n", ret);/** 2. Device A calls OpenSession() to open a session based on the local session name,* peer session name, and peer network ID and determine the session channel based on the session type.* When the session is opened, a callback will be invoked to notify devices A and B.* A session ID is returned for subsequent data sending.*/int sessionId = OpenSession(g_sessionNameA, g_sessionNameB, g_networkidB, g_groupid, &g_sessionAttr);printf("open session result = %d\n", sessionId);/* 3. Device A calls SendBytes() to send byte data or calls SendMessage() to send messages to device B. */const char *data = "testdata";uint32_t len = strlen(data);ret = SendBytes(sessionId, data, len);printf("send bytes result = %d\n", ret);ret = SendMessage(sessionId, data, len);printf("send message result = %d\n", ret);/* 4. After data transmission is complete, device A calls CloseSession() to close the session* and instructs device B to close the session.*/CloseSession(sessionId);printf("SOFTBUS_OK");/* 5. After the session is closed, devices A and B call RemoveSessionServer() to remove the session server. */ret = RemoveSessionServer(g_pkgNameA, g_sessionNameA);printf("remove session server result = %d\n", ret);
}
- 设备B
#include <stdio.h>
#include "session.h"const char *g_pkgNameB = "dmsB"; // Application bundle name of device B
const char *g_sessionNameB = "ohos.distributedschedule.dms.testB"; // Session name of device Bstatic int OnSessionOpened(int sessionId, int result)
{printf("session opened,sesison id = %d\r\n", sessionId);return 0;
}static void OnSessionClosed(int sessionId)
{printf("session closed, session id = %d\r\n", sessionId);
}static void OnBytesReceived(int sessionId, const void *data, unsigned int len)
{printf("session bytes received, session id = %d\r\n", sessionId);
}static void OnMessageReceived(int sessionId, const void *data, unsigned int len)
{printf("session msg received, session id = %d\r\n", sessionId);
}static ISessionListener g_sessionlistenerB = {.OnSessionOpened = OnSessionOpened,.OnSessionClosed = OnSessionClosed,.OnBytesReceived = OnBytesReceived,.OnMessageReceived = OnMessageReceived,
};int main(void)
{/** 1. Device B calls CreateSessionServer() to create a session server based on* the application bundle name and session name, and registers the callbacks for* session opened, session closed, byte received, and message received.*/int ret = CreateSessionServer(g_pkgNameB, g_sessionNameB, &g_sessionlistenerB);printf("create session server result = %d\n", ret);/** 2. Upon receiving the session open notification via OnSessionOpened(), device B waits for device A to send data.* When receiving data, device B returns the receiving status via OnBytesReceived() or OnMessageReceived().*//* 3. When the data is received, device B closes the session and removes the session server. */ret = RemoveSessionServer(g_pkgNameB, g_sessionNameB);printf("remove session server result = %d\n", ret);
}
四、分布式软总线的实现机制
(一)协议栈
分布式软总线的协议栈包括多个层次,从下到上依次为:
- 物理层:负责设备间的物理连接,支持多种传输介质(如 Wi-Fi、蓝牙等)。
- 链路层:负责设备间的链路建立和维护。
- 网络层:负责设备间的寻址和路由。
- 传输层:负责设备间的数据传输,支持多种传输协议(如 TCP、UDP 等)。
- 应用层:负责设备间的应用层通信,支持多种数据格式(如字节流、消息等)。
(二)安全机制
分布式软总线的安全机制通过 HiChain 模块实现。HiChain 模块为 OpenHarmony 提供设备认证能力,支持通过点对点认证方式创建可信群组。在设备连接过程中,软总线会调用 HiChain 模块的认证接口,与对端设备进行认证操作。只有通过认证的设备才能加入可信群组,从而保障设备间通信的安全性。
详细的可参考源码中的说明文档"device-dev\security\security-guidelines-overall.md"
五、分布式软总线的发布与连接
此部分内容与设备发现模块比较容易混淆,有此困惑的可以参考这篇博文
(一)设备发现与连接
设备发现与连接是分布式软总线的基本应用场景之一。通过设备发现,设备可以自动发现网络中的其他设备,并获取其设备信息。然后,设备可以通过连接管理模块,建立与其他设备的连接。例如,在智能家居场景中,用户可以通过手机发现家中的智能设备,并与之建立连接。
- 发布流程
- 上层应用需要对外发布自身能力时,调用服务发布接口发布自身能力。
//foundation\communication\dsoftbus\interfaces\kits\bus_center\softbus_bus_center.h
//foundation\communication\dsoftbus\sdk\bus_center\service\src\client_bus_center.c
int32_t PublishLNN(const char *pkgName, const PublishInfo *info, const IPublishCb *cb)|-->int32_t ret = CommonInit(pkgName);//进行公共初始化操作,包括资源分配、环境检查等|-->if (PublishInfoCheck(info) != SOFTBUS_OK) {//验证 info 参数的合法性,确保发布信息符合要求|-->ret = PublishLNNInner(pkgName, info, cb);//执行实际的发布操作,可能涉及网络配置、广播发送等|-->g_busCenterClient.publishCb = *cb;//在后续的发布过程中使用这些回调函数来通知调用者发布状态的变化|-->int32_t ret = ServerIpcPublishLNN(pkgName, info);//通过 IPC机制将发布请求发送到服务端进行处理|-->int32_t ret = g_serverProxy->PublishLNN(pkgName, info)|-->if (AddDiscPublishMsg(pkgName, info) != SOFTBUS_OK) {//将发布消息的相关信息添加到记录中,用于后续的状态管理或调试
在梳理代码时可参考以下类图:
上层应用不再需要对外发布自身能力时,调用StopPublishLNN接口注销服务。
- 发现流程
- 上层应用需要发现特定能力设备时,调用发现接口启动发现。
//foundation\communication\dsoftbus\interfaces\kits\bus_center\softbus_bus_center.h
//foundation\communication\dsoftbus\sdk\bus_center\service\src\client_bus_center.c
int32_t RefreshLNN(const char *pkgName, const SubscribeInfo *info, const IRefreshCallback *cb);|-->int32_t ret = CommonInit(pkgName);//公共初始化操作,包括资源分配、环境检查等|-->if (SubscribeInfoCheck(info) != SOFTBUS_OK) {//验证 info 参数的合法性,确保订阅信息符合要求|-->ret = RefreshLNNInner(pkgName, info, cb);//执行实际的刷新操作,可能涉及重新发送发现请求、更新设备列表等|-->g_busCenterClient.refreshCb = *cb;//刷新过程中使用这些回调函数来通知调用者刷新状态的变化|-->int32_t ret = ServerIpcRefreshLNN(pkgName, info);//通过 IPC机制将刷新请求发送到服务端进行处理|-->if (AddDiscSubscribeMsg(pkgName, info) != SOFTBUS_OK) {//将订阅消息的相关信息添加到记录中
在梳理代码时可参考以下类图:
上层应用不再需要发现时,调用StopRefreshLNN接口停止设备发现。
(二)设备连接
-
发起组网请求,携带组网连接地址信息,并且提供组网执行结果回调函数。
// 组网连接地址 typedef struct {ConnectionAddrType type;union {struct BrAddr {char brMac[BT_MAC_LEN];} br;struct BleAddr {char bleMac[BT_MAC_LEN];uint8_t udidHash[UDID_HASH_LEN];} ble;struct IpAddr {char ip[IP_STR_MAX_LEN];uint16_t port; } ip;} info;char peerUid[MAX_ACCOUNT_HASH_LEN]; } ConnectionAddr;// 组网连接地址类型 typedef enum {CONNECTION_ADDR_WLAN = 0,CONNECTION_ADDR_BR,CONNECTION_ADDR_BLE,CONNECTION_ADDR_ETH,CONNECTION_ADDR_MAX } ConnectionAddrType;// 组网请求执行结果回调 typedef void (*OnJoinLNNResult)(ConnectionAddr *addr, const char *networkId, int32_t retCode);
//foundation\communication\dsoftbus\interfaces\kits\bus_center\softbus_bus_center.h
//foundation\communication\dsoftbus\sdk\bus_center\service\src\client_bus_center.c
int32_t JoinLNN(const char *pkgName, ConnectionAddr *target, OnJoinLNNResult cb)|-->int32_t ret = CommonInit(pkgName);//进行公共初始化操作,包括资源分配、环境检查等|-->ret = JoinLNNInner(pkgName, target, cb);//执行实际的加入操作,可能涉及通过网络协议向目标设备发送加入请求|-->if (FindJoinLNNCbItem(target, cb) != NULL) {//检查是否已存在相同的加入请求|-->rc = ServerIpcJoinLNN(pkgName, target, sizeof(*target));//通过 IPC 机制向服务端发送加入请求|-->int32_t ret = g_serverProxy->JoinLNN(pkgName, addr, addrTypeLen)|-->rc = AddJoinLNNCbItem(target, cb);//将加入请求的回调信息添加到链表中,以便后续通知
-
等待组网结果,JoinLNN()返回成功表示软总线接受了组网请求,组网结果通过回调函数通知业务;组网回调函数中addr参数内容和JoinLNN()的入参互相匹配;retCode如果为0,表示组网成功,此时networkId为有效值,后续传输、退网等接口均需使用该参数;retCode如果不为0,表示组网失败,此时networkId为无效值。
-
使用传输相关接口进行数据传输。
-
发送退网请求,携带组网成功后返回的networkId,并且提供退网执行结果回调。
// 退网执行结果回调 typedef void (*OnLeaveLNNResult)(const char *networkId, int32_t retCode);// 退网请求 int32_t LeaveLNN(const char *pkgName, const char *networkId, OnLeaveLNNResult cb);
-
等待退网完成,OnLeaveLNNResult()的networkId和退网请求接口中的networkId互相匹配;retCode为0表示退网成功,否则退网失败。退网成功后,networkId变为无效值,后续不应该被继续使用。
-
使用节点(即设备)注册和注销接口,监听网络中节点状态变化等事件。
// 事件掩码 #define EVENT_NODE_STATE_ONLINE 0x1 #define EVENT_NODE_STATE_OFFLINE 0x02 #define EVENT_NODE_STATE_INFO_CHANGED 0x04 #define EVENT_NODE_STATUS_CHANGED 0x08 #define EVENT_NODE_STATE_MASK 0xF// 节点信息 typedef struct {char networkId[NETWORK_ID_BUF_LEN];char deviceName[DEVICE_NAME_BUF_LEN];uint16_t deviceTypeId; } NodeBasicInfo;// 节点状态事件回调 typedef struct {uint32_t events; // 组网事件掩码void (*onNodeOnline)(NodeBasicInfo *info); // 节点上线事件回调void (*onNodeOffline)(NodeBasicInfo *info); // 节点下线事件回调void (*onNodeBasicInfoChanged)(NodeBasicInfoType type, NodeBasicInfo *info); // 节点信息变化事件回调void (*onNodeStatusChanged)(NodeStatusType type, NodeStatus *status); // 设备运行状态变化事件回调 } INodeStateCb;// 注册节点状态事件回调 int32_t RegNodeDeviceStateCb(const char *pkgName, INodeStateCb *callback);// 注销节点状态事件回调 int32_t UnregNodeDeviceStateCb(INodeStateCb *callback);
六、总结
OpenHarmony 5.0 的分布式软总线通过构建一个虚拟的总线网络,实现了设备间的高效通信和协同。其架构设计合理,核心功能强大,实现机制先进,实际应用广泛。分布式软总线为 OpenHarmony 的分布式特性提供了强大的支持,为万物互联提供了坚实的技术基础。以上内容基于 OpenHarmony 5.0 分布式软总线的相关技术文档和代码实现进行分析,只涵盖了软总线的一些核心内容,对于整体的框架还在整理中。。
参考资料
zh-cn\application-dev\ipc
zh-cn\readme\分布式软总线子系统.md
简析OpenHarmony软总线能力
HarmonyOS 分布式软总线架构组成 分布式软总线 鸿蒙