netty之是如何做好服务准备的
写在前面
本文来看下netty是如何来启动服务,准备接收外部连接的。
1:正文
为了够好的理解和学习netty启动服务的过程,可以参考这篇文章看下Java NIO是如何来启动服务的。
我们使用netty源码的io.netty.example.echo.EchoServer来进行debug学习。
1.1:创建selector
在Java NIO中通过selector来完成监听通道事件的工作,如下:
执行代码EventLoopGroup bossGroup = new NioEventLoopGroup(1);
,经过如下的代码之后会执行到创建selector的位置:
//io.netty.channel.nio.NioEventLoop#openSelector
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,EventLoopTaskQueueFactory queueFactory) {// ...// 打开selector,会通过不同平台的selectorprovider来进行实际的处理,从而适配不同操作系统底层的IO模型final SelectorTuple selectorTuple = openSelector();// 创建后赋值到成员变量以备使用this.selector = selectorTuple.selector;this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
1.2:创建ServerSocketChannel并注册到EventLoop中的Selector
从代码ChannelFuture f = b.bind(PORT).sync();
开始执行,其中的bind方法,执行到如下代码:
// io.netty.bootstrap.AbstractBootstrap#doBind
private ChannelFuture doBind(final SocketAddress localAddress) {/*** 做以下的事情:* 1:创建ServerSocketChannel* 2:初始化* 3:将ServerSocketChannel注册到NioEventLoop中的Selector* 这里返回future,说明不是立即完成的,而是异步的,异步的原因是注册的工作是在EventLoop的线程中执行的,* 我认为这里就对应了reactor线程模型的主从多线程模型中的mainReactor的线程了。参考:https://dongyunqi.blog.csdn.net/article/details/142661665*/final ChannelFuture regFuture = initAndRegister();
}
initAndRegister:
final ChannelFuture initAndRegister() {Channel channel = null;try {// 工厂+泛型+反射方式创建对应的ServerSocketChannel(BIO的,OIO的,AIO的等)channel = channelFactory.newChannel();// 为有连接进入,创建socketchannel做准备init(channel);} catch (Throwable t) {// ...}// 完成socketchannel注册到selectorChannelFuture regFuture = config().group().register(channel);// ...
}
config().group().register(channel);
最终执行到:
// io.netty.channel.nio.AbstractNioChannel#doRegister
protected void doRegister() throws Exception {boolean selected = false;for (;;) {try {// this:作为SelectionKey的attachment很重要,注册的op_code是0,似乎不是个有效的op_code,因为SelectionKey类中无该opcode// 其实就是java NIO 中执行代码:serverSocketChannel.register(selector, 0, this);selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {// ...}}
}
这样就完成serversocketchannel注册到selector的工作了,但是注意此时注册的op_code是0,而不是16(public static final int OP_ACCEPT = 1 << 4;),这里没有绑定的原因是此时serversocketchannel还没有绑定到端口,所以我们继续看是如何绑定到端口的。
1.3:ServerSocketChannel绑定端口并注册op_accept
书接上文,在注册成功后,会执行如下代码完成端口的绑定工作:
// io.netty.bootstrap.AbstractBootstrap#doBind
private ChannelFuture doBind(final SocketAddress localAddress) {// ...// 立马注册完成了,一般不会if (regFuture.isDone()) {// ...doBind0(regFuture, channel, localAddress, promise);return promise;} else {// Registration future is almost always fulfilled already, but just in case it's not.final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);// 在regfuture中注册一个监听器,当initAndRegister方法注册完成时会触发调用regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {// 判断是否出错Throwable cause = future.cause();if (cause != null) {// ...} else {// ...// 可能执行这里,reg done的话也可能执行上面的doBind0,不论如何都是同一个方法,执行的实际不同而已doBind0(regFuture, channel, localAddress, promise);}}});return promise;}
}
通过执行doBind0方法就就可以完成绑定端口号的工作了:
private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up// the pipeline in its channelRegistered() implementation.// 在eventloop的线程中执行绑定channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});
}
执行到:
// io.netty.channel.AbstractChannel#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {// pipeline是个串行化的操作,就是个责任链return pipeline.bind(localAddress, promise);
}
因为pipelien上有很多的handler,所以我们直接找到最重要的handler来看绑定端口的核心逻辑:
执行到如下代码:
// io.netty.channel.DefaultChannelPipeline.HeadContext#bind
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {unsafe.bind(localAddress, promise);
}
最终执行到:
// io.netty.channel.socket.nio.NioServerSocketChannel#doBind
protected void doBind(SocketAddress localAddress) throws Exception {// 真正的通过Java NIO完成端口绑定的地方if (PlatformDependent.javaVersion() >= 7) {javaChannel().bind(localAddress, config.getBacklog());} else {javaChannel().socket().bind(localAddress, config.getBacklog());}
}
这样子端口的绑定工作就完成了,但是依然还是没有注册op_accept事件,所以还是无法监听连接请求,接着来看。
在doBind绑定端口成功后,执行后续代码:
// io.netty.channel.AbstractChannel.AbstractUnsafe#bind
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {// ...boolean wasActive = isActive();try {doBind(localAddress); // 完成端口的绑定,基于Java NIO的API完成} catch (Throwable t) {safeSetFailure(promise, t);closeIfClosed();return;}// 必须是不激活 -》 激活 状态,绑定端口成功就激活了if (!wasActive && isActive()) {invokeLater(new Runnable() {@Overridepublic void run() {pipeline.fireChannelActive();}});}// ...
}
重点看pipeline.fireChannelActive();
,我们还是重点看HeadContext的fireChannelActive方法,如下:
// io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
public void channelActive(ChannelHandlerContext ctx) {ctx.fireChannelActive();// 会完成op事件的注册readIfIsAutoRead();
}
readIfIsAutoRead方法执行到:
// io.netty.channel.AbstractChannel#read
public Channel read() {pipeline.read();return this;
}
又是pipeline,我们老套路执行到:
// io.netty.channel.DefaultChannelPipeline.HeadContext#read
public void read(ChannelHandlerContext ctx) {unsafe.beginRead();
}
最终执行到:
// io.netty.channel.nio.AbstractNioChannel#doBeginRead
protected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}readPending = true;// 完成op事件的注册,对于serversocketchannel就是op_acceptfinal int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {logger.info("注册op事件:{}", (interestOps | readInterestOp));selectionKey.interestOps(interestOps | readInterestOp);}
}
这样就注册op_accept成功,serversocketchannel就可以等待accept客户端的连接了。
2:流程总结
主要做了如下的几件事:
1:创建selectorfinal SelectorTuple selectorTuple = openSelector();this.selector = selectorTuple.selector;
2:创建serversocketchannelchannel = channelFactory.newChannel();
3:serversocketchannel注册到selectorselectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
4:绑定端口javaChannel().bind(localAddress, config.getBacklog());
5:注册op_accept事件selectionKey.interestOps(interestOps | readInterestOp);
3:两个线程
其中一个线程是启动服务的线程,如果是main函数启动则就是main线程,还有一个就是选出的eventloop中的线程,二者分别完成不同的工作,如下:
主线程:创建selctor,serversocketchannel,从event loop group中选择一个event loop。
eventloop线程:注册serversocketchannel到selector,绑定端口,注册op_accept事件。
写在后面
参考文章列表
实现一个基于nio的discard server 。