启程Pulsar:深入剖析高速启动引擎,揭秘消息中间件巨兽的诞生
文章目录
- 一、简析
- 二、何时、如何触发启动
- 三、Broker启动流程
- 四、总结
一、简析
Broker的启动流程框架基本如下
- 触发启动
- 初始化
- 读取配置、检测、赋值
- 启动
- Bookie启动
- Broker启动
- 启动Netty
- 启动后台监控任务
二、何时、如何触发启动
Broker的启动基本都是靠维护人员主动触发的,入口是Broker提供的脚本 bin/pulsar
、bin/pulsar-daemon
。常见的启动指令有 bin/pulsar standalone
、 bin/pulsar broker
、bin/pulsar-daemon start broker
等,今天就从bin/pulsar broker
流程进行探讨。先来看看bin/pulsar
的脚本逻辑
pulsar_help() {cat <<EOF
//在这里可以看到pulsar支持的操作,当然也可以通过bin/pulsar help 指令进行查看
Usage: pulsar <command>
where command is one of://启动Broker服务broker Run a broker server//启动bookie服务bookie Run a bookie serverzookeeper Run a zookeeper serverconfiguration-store Run a configuration-store serverdiscovery Run a discovery serverproxy Run a pulsar proxywebsocket Run a web socket proxy serverfunctions-worker Run a functions worker serversql-worker Run a sql worker serversql Run sql CLIstandalone Run a broker server with local bookies and local zookeeperautorecovery Run an autorecovery service....
}//如果执行的是bin/pulsar broker则会走到这里,可以清楚的看到最终会调用PulsarBrokerStarter类进行启动
if [ $COMMAND == "broker" ]; thenPULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-broker.log"}exec $JAVA $LOG4J2_SHUTDOWN_HOOK_DISABLED $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.PulsarBrokerStarter --broker-conf $PULSAR_BROKER_CONF $@
elif [ $COMMAND == "bookie" ]; thenPULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"bookkeeper.log"}exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.bookkeeper.server.Main --conf $PULSAR_BOOKKEEPER_CONF $@
三、Broker启动流程
通过脚本能够看到是通过PulsarBrokerStarter进行启动,因此现在就直接从它的main方法进行跟踪吧
这里的初始化和启动两条链路都值得看,首先跟踪下初始化Broker的链路
可以看到初始化的过程中分别做了 配置加载、初始化Broker、初始化Bookkeeper以及AutoRecoveryMain服务等。接下来就就看服务启动链路。
可以看到启动入口很简洁,就是启动上面初始化好的Broker、Bookkeeper、AutoRecoveryMain,这里先看Broker的启动流程也就是276行
可以看到这个方法非常大,里面内容非常丰富,一起来详细看看
简单总结下这个方法,它主要创建了以下几个对象
-
CoordinationServiceImpl对象,用于协调Broker选主
-
BrokerService对象,这个是启动Broker对象的核心
-
LoadManager对象,用于管理Broker对象的负载均衡
-
SchemaStorage对象,负责处理读写schema的请求
-
OffloadPoliciesImpl,负责分层存储操作
-
并启动WebService服务,负责对外提供http服务
-
WorkerService服务,负责处理function计算操作
这里面的BrokerService是最核心的,在这里进去看下它创建的逻辑
此方法主要做了下面几件事
- 创建Netty服务端,用于处理生产者/消费者/代理的TCP请求
- 创建定期检测服务
- 不活跃的检测
- 消息过期检测
- 压缩检测
- 消费者检测
- 初始化五个Map容器
- 维护Topic对象
- 维护集群副本复制的客户端
- 维护连接当前Broker的管理流
- 维护Topic归属信息
- 维护多层级的Topic信息?
- 启动DelayedDeliveryTrackerLoader跟踪延迟消息
- 启动Broker拦截器BrokerEntryMetadataInterceptors
- 启动限额管理对象BundlesQuotas
- 启动Netty服务端(此时Pulsar服务具备处理所有客户端请求能力)
- 启动定期检测服务
- 不活跃的检测
- 消息过期检测
- 压缩检测
- 消费者检测
- 消息积压检测
四、总结
PulsarService是Pulsar服务启动的核心类,其内置了七大重要的对象如下图
- BrokerService: 核心是启动Netty,处理客户端的TCP连接,同时通过多个Map容器维护例如Topic信息、Topic归属信息等等,除此之外还启动一批定时线程定期检测(消息过期、压缩、客户端活跃等)
- LoadManager: 负责处理Broker服务的负载均衡
- WebService: 对外提供HTTP服务,例如管理流的操作(元数据)等
- CoordinationService: 给Broker提供协调服务,例如Broker选主操作
- SchemaStorage: 提供schema相关的一切服务,常见的就是schema的读写
- OffloadPolicies: 提供分层存储,冷数据自动迁移到外部服务中
- WorkerService: 管理Worker实例,用来执行Function计算任务
以上就是Pulsar启动流程所做的事情,其中Bookkeeper的启动以及其余的功能例如CoordinationService、SchemaStorage等等都值得单独新开一篇文章进行讲解,这里就不混在一起讲了。