rocketmq-netty通信设计-request和response
1、NettyRemotingServer启动分析
org.apache.rocketmq.remoting.netty.NettyRemotingServer#start
public void start() {this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});prepareSharableHandlers();ServerBootstrap childHandler =this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog()).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_KEEPALIVE, false).childOption(ChannelOption.TCP_NODELAY, true).localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});
ServerBootstrap是netty作为服务端的启动入口。首先绑定了两个线程池。也就是boss线程池、worker线程池。
这里分析,也就是eventLoopGroupBoss和eventLoopGroupSelector
ServerBootstrap启动分析
查看初始化源码
if (useEpoll()) {this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});
} else {this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});
}
不同平台,实现方式不一样。linux上会使用Epoll,其他会使用java提供的跨平台的NIO
eventLoopGroupBoss默认线程数是1
eventLoopGroupSelector默认线程数是3
再看defaultEventExecutorGroup。
.childHandler()方式指定了channelHandle的线程池。看线程名称NettyServerCodecThread_,可猜测该线程池主要功能是做编码解码。
- .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
目的是做TSL的加解密 - .addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
encoder是做编码。将本地msg转成byte[],传输到远端
NettyDecoder是做解码,将远端byte[]解码成本地msg。这里比较关键。将请求体转换成RemotingCommand类。详情看org.apache.rocketmq.remoting.netty.NettyDecoder#decode
connectionManageHandler 看代码可知,是channel有特定事件时打印日志。注册、取消注册、激活、取消激活等
serverHandler就是业务的request、response处理类。处理RemotingCommand消息
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {processMessageReceived(ctx, msg);}}
查看NettyServerHandler 的处理流程
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;final int opaque = cmd.getOpaque();if (pair != null) {Runnable run = new Runnable() {@Overridepublic void run() {try {String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());doBeforeRpcHooks(remoteAddr, cmd);final RemotingResponseCallback callback = new RemotingResponseCallback() {@Overridepublic void callback(RemotingCommand response) {doAfterRpcHooks(remoteAddr, cmd, response);if (!cmd.isOnewayRPC()) {if (response != null) {response.setOpaque(opaque);response.markResponseType();response.setSerializeTypeCurrentRPC(cmd.getSerializeTypeCurrentRPC());try {ctx.writeAndFlush(response);} catch (Throwable e) {log.error("process request over, but response failed", e);log.error(cmd.toString());log.error(response.toString());}} else {}}}};if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();processor.asyncProcessRequest(ctx, cmd, callback);} else {NettyRequestProcessor processor = pair.getObject1();RemotingCommand response = processor.processRequest(ctx, cmd);callback.callback(response);}} catch (Throwable e) {log.error("process request exception", e);log.error(cmd.toString());if (!cmd.isOnewayRPC()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,RemotingHelper.exceptionSimpleDesc(e));response.setOpaque(opaque);ctx.writeAndFlush(response);}}}};if (pair.getObject1().rejectRequest()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,"[REJECTREQUEST]system busy, start flow control for a while");response.setOpaque(opaque);ctx.writeAndFlush(response);return;}try {final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);pair.getObject2().submit(requestTask);} catch (RejectedExecutionException e) {if ((System.currentTimeMillis() % 10000) == 0) {log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())+ ", too many requests and system thread pool busy, RejectedExecutionException "+ pair.getObject2().toString()+ " request code: " + cmd.getCode());}
对于request,会根据cmd.getCode()来获取对应的NettyRequestProcessor。找不到的话,会找默认的。对于nameserver,它就是设置了一个默认,处理所有的请求。对于broker,有多个NettyRequestProcessor。
NettyRequestProcessor注册到org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processorTable时,会绑定一个线程池
Pair<NettyRequestProcessor, ExecutorService>。RemotingCommand任务提交给该线程池处理
/*** This container holds all processors per request code, aka, for each incoming request, we may look up the* responding processor in this map to handle the request.*/protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
总结服务端启动后netty的线程模型
默认情况下:
eventLoopGroupBoss线程池1个线程,负责channel的链接
eventLoopGroupSelector线程池3个线程,负责IO处理
defaultEventExecutorGroup线程池8个线程,负责请求的RemotingCommand编解码
NettyRequestProcessor绑定的ExecutorService,默认8个线程,负责处理RemotingCommand的业务处理
证据如下。以下是nameServer的启动线程截图。RemotingExecutorThread是NettyRequestProcessor绑定的
2、NettyRemotingClient启动分析
查看启动代码
org.apache.rocketmq.remoting.netty.NettyRemotingClient#start
public void start() {this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());}});Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, false).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();if (nettyClientConfig.isUseTLS()) {if (null != sslContext) {pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));log.info("Prepend SSL handler");} else {log.warn("Connections are insecure as SSLContext is null!");}}pipeline.addLast(defaultEventExecutorGroup,new NettyEncoder(),new NettyDecoder(),new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),new NettyConnectManageHandler(),new NettyClientHandler());}});if (nettyClientConfig.getClientSocketSndBufSize() > 0) {log.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());}if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {log.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());}if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) {log.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark());handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()));}this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingClient.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);if (this.channelEventListener != null) {this.nettyEventExecutor.start();}}
从Bootstrap的创建可知,和Server是很类似的。
区别是Bootstrap没有selector线程池。业务处理类是NettyClientHandler
3、Product发送消息,从namesrv获取topic的路由信息
Product发送端分析、编码
用rocketmq源码的例子,稍作变更:
org/apache/rocketmq/example/quickstart/Producer.java
producer.start();for (int i = 0; i < MESSAGE_COUNT; i++) {try {/** Create a message instance, specifying topic, tag and message body.*/Message msg = new Message(TOPIC /* Topic */,TAG /* Tag */,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);/** Call send message to deliver message to one of brokers.*/SendResult sendResult = producer.send(msg);
producer.send(msg)最终会调用如下方法
org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessageSync
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeSyncImpl
private SendResult sendMessageSync(final String addr,final String brokerName,final Message msg,final long timeoutMillis,final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);assert response != null;return this.processSendResponse(brokerName, msg, response, addr);
}
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,final long timeoutMillis)throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {final int opaque = request.getOpaque();try {final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);this.responseTable.put(opaque, responseFuture);final SocketAddress addr = channel.remoteAddress();channel.writeAndFlush(request).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {responseFuture.setSendRequestOK(true);return;} else {responseFuture.setSendRequestOK(false);}responseTable.remove(opaque);responseFuture.setCause(f.cause());responseFuture.putResponse(null);log.warn("send a request command to channel <" + addr + "> failed.");}});RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);if (null == responseCommand) {if (responseFuture.isSendRequestOK()) {throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,responseFuture.getCause());} else {throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());}}return responseCommand;} finally {this.responseTable.remove(opaque);}}
将msg包装成RemotingCommand,channel是namesrv服务端的远程地址
源码所见,channel.writeAndFlush(request)。将请求发送到namesrv
发送前,还需要做编码处理。下图是debug下,编码的处理。请求是105
org.apache.rocketmq.common.protocol.RequestCode#GET_ROUTEINFO_BY_TOPIC
public static final int GET_ROUTEINFO_BY_TOPIC = 105;
org.apache.rocketmq.remoting.netty.NettyEncoder#encode
namesrv端解码,消息处理
接收到105请求。namesrv做解码操作。消息体转换成RemotingCommand处理
org.apache.rocketmq.remoting.netty.NettyDecoder#decode
io.netty.handler.codec.LengthFieldBasedFrameDecoder#decode
org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler#channelRead0处理
class NettyServerHandler extends SimpleChannelInboundHandler {
@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {processMessageReceived(ctx, msg);}
}
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processMessageReceived
通用的一个请求命令处理
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;final int opaque = cmd.getOpaque();if (pair != null) {Runnable run = new Runnable() {@Overridepublic void run() {try {String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());doBeforeRpcHooks(remoteAddr, cmd);final RemotingResponseCallback callback = new RemotingResponseCallback() {@Overridepublic void callback(RemotingCommand response) {doAfterRpcHooks(remoteAddr, cmd, response);if (!cmd.isOnewayRPC()) {if (response != null) {response.setOpaque(opaque);response.markResponseType();response.setSerializeTypeCurrentRPC(cmd.getSerializeTypeCurrentRPC());try {ctx.writeAndFlush(response);} catch (Throwable e) {log.error("process request over, but response failed", e);log.error(cmd.toString());log.error(response.toString());}} else {}}}};if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();processor.asyncProcessRequest(ctx, cmd, callback);} else {NettyRequestProcessor processor = pair.getObject1();RemotingCommand response = processor.processRequest(ctx, cmd);callback.callback(response);}} catch (Throwable e) {log.error("process request exception", e);log.error(cmd.toString());if (!cmd.isOnewayRPC()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,RemotingHelper.exceptionSimpleDesc(e));response.setOpaque(opaque);ctx.writeAndFlush(response);}}}};if (pair.getObject1().rejectRequest()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,"[REJECTREQUEST]system busy, start flow control for a while");response.setOpaque(opaque);ctx.writeAndFlush(response);return;}try {final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);pair.getObject2().submit(requestTask);} catch (RejectedExecutionException e) {if ((System.currentTimeMillis() % 10000) == 0) {log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())+ ", too many requests and system thread pool busy, RejectedExecutionException "+ pair.getObject2().toString()+ " request code: " + cmd.getCode());}if (!cmd.isOnewayRPC()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,"[OVERLOAD]system busy, start flow control for a while");response.setOpaque(opaque);ctx.writeAndFlush(response);}}} else {String error = " request type " + cmd.getCode() + " not supported";final RemotingCommand response =RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);response.setOpaque(opaque);ctx.writeAndFlush(response);log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);}}
processRequestCommand就会通用的处理请求,转发到NettyRequestProcessor处理,
并且用绑定的ExecutorService线程池去执行任务---->pair.getObject2().submit(requestTask);
响应结果,会调用callback处理
ctx.writeAndFlush(response);
namesrv启动时注册NettyRequestProcessor、ExecutorService
org.apache.rocketmq.namesrv.NamesrvController#registerProcessor
private void registerProcessor() {if (namesrvConfig.isClusterTest()) {this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),this.remotingExecutor);} else {this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);}}
remotingExecutor的初始化
this.remotingExecutor =Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
如上所说,processRequestCommand找到DefaultRequestProcessor,执行到以下位置
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest
再贴一下处理topic获取routInfo源码
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getRouteInfoByTopic
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);final GetRouteInfoRequestHeader requestHeader =(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());if (topicRouteData != null) {if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {String orderTopicConf =this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,requestHeader.getTopic());topicRouteData.setOrderTopicConf(orderTopicConf);}byte[] content;Boolean standardJsonOnly = requestHeader.getAcceptStandardJsonOnly();if (request.getVersion() >= Version.V4_9_4.ordinal() || (null != standardJsonOnly && standardJsonOnly)) {content = topicRouteData.encode(SerializerFeature.BrowserCompatible,SerializerFeature.QuoteFieldNames, SerializerFeature.SkipTransientField,SerializerFeature.MapSortField);} else {content = RemotingSerializable.encode(topicRouteData);}response.setBody(content);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}response.setCode(ResponseCode.TOPIC_NOT_EXIST);response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));return response;}
总结
通过以上三个大的小节,已经梳理完成rocketmq使用netty的处理过程。
举了Product发送消息的例子,组装成RemotingCommand命令,经过编码,变成byte[]数组,发送到namesrv
namesrv接收到byte[]数组,解码成RemotingCommand。再交给NettyRequestProcessor处理。根据code,选择对应的请求类型处理
响应结果也组装成RemotingCommand命令,经过编码,变成byte[]数组,返回到Product
后续
之前的文章,已分析完成 consumer、namesrv、Product。剩余broker,最复杂的放到最后分析。
后续就是broker的分析文章。