当前位置: 首页 > news >正文

netty之是如何做好服务准备的

写在前面

本文来看下netty是如何来启动服务,准备接收外部连接的。

1:正文

为了够好的理解和学习netty启动服务的过程,可以参考这篇文章看下Java NIO是如何来启动服务的。
我们使用netty源码的io.netty.example.echo.EchoServer来进行debug学习。

1.1:创建selector

在Java NIO中通过selector来完成监听通道事件的工作,如下:
在这里插入图片描述
执行代码EventLoopGroup bossGroup = new NioEventLoopGroup(1);,经过如下的代码之后会执行到创建selector的位置:
在这里插入图片描述

//io.netty.channel.nio.NioEventLoop#openSelector
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,EventLoopTaskQueueFactory queueFactory) {// ...// 打开selector,会通过不同平台的selectorprovider来进行实际的处理,从而适配不同操作系统底层的IO模型final SelectorTuple selectorTuple = openSelector();// 创建后赋值到成员变量以备使用this.selector = selectorTuple.selector;this.unwrappedSelector = selectorTuple.unwrappedSelector;
}

1.2:创建ServerSocketChannel并注册到EventLoop中的Selector

从代码ChannelFuture f = b.bind(PORT).sync();开始执行,其中的bind方法,执行到如下代码:

// io.netty.bootstrap.AbstractBootstrap#doBind
private ChannelFuture doBind(final SocketAddress localAddress) {/*** 做以下的事情:* 1:创建ServerSocketChannel* 2:初始化* 3:将ServerSocketChannel注册到NioEventLoop中的Selector* 这里返回future,说明不是立即完成的,而是异步的,异步的原因是注册的工作是在EventLoop的线程中执行的,* 我认为这里就对应了reactor线程模型的主从多线程模型中的mainReactor的线程了。参考:https://dongyunqi.blog.csdn.net/article/details/142661665*/final ChannelFuture regFuture = initAndRegister();
}

initAndRegister:

final ChannelFuture initAndRegister() {Channel channel = null;try {// 工厂+泛型+反射方式创建对应的ServerSocketChannel(BIO的,OIO的,AIO的等)channel = channelFactory.newChannel();// 为有连接进入,创建socketchannel做准备init(channel);} catch (Throwable t) {// ...}// 完成socketchannel注册到selectorChannelFuture regFuture = config().group().register(channel);// ...
}

config().group().register(channel);最终执行到:

// io.netty.channel.nio.AbstractNioChannel#doRegister
protected void doRegister() throws Exception {boolean selected = false;for (;;) {try {// this:作为SelectionKey的attachment很重要,注册的op_code是0,似乎不是个有效的op_code,因为SelectionKey类中无该opcode// 其实就是java NIO 中执行代码:serverSocketChannel.register(selector, 0, this);selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {// ...}}
}

这样就完成serversocketchannel注册到selector的工作了,但是注意此时注册的op_code是0,而不是16(public static final int OP_ACCEPT = 1 << 4;),这里没有绑定的原因是此时serversocketchannel还没有绑定到端口,所以我们继续看是如何绑定到端口的。

1.3:ServerSocketChannel绑定端口并注册op_accept

书接上文,在注册成功后,会执行如下代码完成端口的绑定工作:

// io.netty.bootstrap.AbstractBootstrap#doBind
private ChannelFuture doBind(final SocketAddress localAddress) {// ...// 立马注册完成了,一般不会if (regFuture.isDone()) {// ...doBind0(regFuture, channel, localAddress, promise);return promise;} else {// Registration future is almost always fulfilled already, but just in case it's not.final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);// 在regfuture中注册一个监听器,当initAndRegister方法注册完成时会触发调用regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {// 判断是否出错Throwable cause = future.cause();if (cause != null) {// ...} else {// ...// 可能执行这里,reg done的话也可能执行上面的doBind0,不论如何都是同一个方法,执行的实际不同而已doBind0(regFuture, channel, localAddress, promise);}}});return promise;}
}

通过执行doBind0方法就就可以完成绑定端口号的工作了:

private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {// This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up// the pipeline in its channelRegistered() implementation.// 在eventloop的线程中执行绑定channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});
}

执行到:

// io.netty.channel.AbstractChannel#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {// pipeline是个串行化的操作,就是个责任链return pipeline.bind(localAddress, promise);
}

因为pipelien上有很多的handler,所以我们直接找到最重要的handler来看绑定端口的核心逻辑:
在这里插入图片描述
执行到如下代码:

// io.netty.channel.DefaultChannelPipeline.HeadContext#bind
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {unsafe.bind(localAddress, promise);
}

最终执行到:

// io.netty.channel.socket.nio.NioServerSocketChannel#doBind
protected void doBind(SocketAddress localAddress) throws Exception {// 真正的通过Java NIO完成端口绑定的地方if (PlatformDependent.javaVersion() >= 7) {javaChannel().bind(localAddress, config.getBacklog());} else {javaChannel().socket().bind(localAddress, config.getBacklog());}
}

这样子端口的绑定工作就完成了,但是依然还是没有注册op_accept事件,所以还是无法监听连接请求,接着来看。
在doBind绑定端口成功后,执行后续代码:

// io.netty.channel.AbstractChannel.AbstractUnsafe#bind
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {// ...boolean wasActive = isActive();try {doBind(localAddress); // 完成端口的绑定,基于Java NIO的API完成} catch (Throwable t) {safeSetFailure(promise, t);closeIfClosed();return;}// 必须是不激活 -》 激活 状态,绑定端口成功就激活了if (!wasActive && isActive()) {invokeLater(new Runnable() {@Overridepublic void run() {pipeline.fireChannelActive();}});}// ...
}

重点看pipeline.fireChannelActive();,我们还是重点看HeadContext的fireChannelActive方法,如下:

// io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
public void channelActive(ChannelHandlerContext ctx) {ctx.fireChannelActive();// 会完成op事件的注册readIfIsAutoRead();
}

readIfIsAutoRead方法执行到:

// io.netty.channel.AbstractChannel#read
public Channel read() {pipeline.read();return this;
}

又是pipeline,我们老套路执行到:

// io.netty.channel.DefaultChannelPipeline.HeadContext#read
public void read(ChannelHandlerContext ctx) {unsafe.beginRead();
}

最终执行到:

// io.netty.channel.nio.AbstractNioChannel#doBeginRead
protected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}readPending = true;// 完成op事件的注册,对于serversocketchannel就是op_acceptfinal int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {logger.info("注册op事件:{}", (interestOps | readInterestOp));selectionKey.interestOps(interestOps | readInterestOp);}
}

这样就注册op_accept成功,serversocketchannel就可以等待accept客户端的连接了。

2:流程总结

主要做了如下的几件事:

1:创建selectorfinal SelectorTuple selectorTuple = openSelector();this.selector = selectorTuple.selector;
2:创建serversocketchannelchannel = channelFactory.newChannel();
3:serversocketchannel注册到selectorselectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
4:绑定端口javaChannel().bind(localAddress, config.getBacklog());
5:注册op_accept事件selectionKey.interestOps(interestOps | readInterestOp);

3:两个线程

其中一个线程是启动服务的线程,如果是main函数启动则就是main线程,还有一个就是选出的eventloop中的线程,二者分别完成不同的工作,如下:

主线程:创建selctor,serversocketchannel,从event loop group中选择一个event loop。
eventloop线程:注册serversocketchannel到selector,绑定端口,注册op_accept事件。

写在后面

参考文章列表

实现一个基于nio的discard server 。


http://www.mrgr.cn/news/65250.html

相关文章:

  • PMP知识体系
  • 闪存学习_3:闪存SW(软件,software)
  • IMS 注册慢问题分析
  • Throughput
  • 常见的函数求导公式以及复合函数的求导公式
  • Linux命令行速查手册:快速参考与实践
  • 魔改Transformer!9种提速又提效的模型优化方案分享!
  • 【前端基础】盒子模型
  • Python实现Taran算法
  • 个人开发者没有公司或企业信息,如何注册成为商家开发调试小程序,在不同的小程序平台使用企业号的功能,例如:没有商户号,个人怎样接入微信支付?
  • 19种RAG结构
  • 「Mac畅玩鸿蒙与硬件18」鸿蒙UI组件篇8 - 高级动画效果与缓动控制
  • 如何建立一套完善的六西格玛黑带培训体系?
  • java的动态代理
  • OTFS基带通信系统(脉冲导频,信道估计,MP解调算法)
  • Linux 常用命令整理大全及命令使用心得
  • 薄膜与胶带展同期论坛:新质生产力下的薄膜与胶带工艺与材料之美
  • 风险分析方法-敏感性分析
  • leetcode刷题记录(二十)——383. 赎金信
  • 管家婆财贸ERP BB090.销售单指定客户控制超期应收款