当前位置: 首页 > news >正文

深入拆解TomcatJetty——Tomcat如何实现IO多路复用

深入拆解Tomcat&Jetty

专栏地址: 极客时间-深入拆解Tomcat & Jetty

IO 多路复用

当用户线程发起 I/O 操作后,网络数据读取操作会经历两个步骤:

  • 用户线程等待内核将数据从网卡拷贝到内核空间。
  • 内核将数据从内核空间拷贝到用户空间。

IO 多路复用 是 Linux 五种 IO 模型中的一种,逻辑如下图:

image-20241025140303833

这时用户线程读取数据分为了两步:

  • 间断的发起 select 调用询问内核数据是否已经准备好
  • 在数据就绪后发起 read 系统调用,注意在数据从内核空间拷贝到用户空间时,线程依然是阻塞的

多路复用体现在一次 select 调用可以查询多个 数据通道(channel) 上的数据是否已经准备好

Tomcat 如何实现多路复用模型

对于 Java 的多路复用器的使用,无非是两步:

  • 创建一个 Selector,在它身上注册各种感兴趣的事件,然后调用 select 方法,等待感兴趣的事情发生。
  • 感兴趣的事情发生了,比如可以读了,这时便创建一个新的线程从 Channel 中读数据。

Tomcat 的 NioEndpoint 组件虽然实现比较复杂,但基本原理就是上面两步。它一共包含 LimitLatch、Acceptor、Poller、SocketProcessor 和 Executor 共 5 个组件,它们的工作过程如下图所示:

img

  • LimitLatch:连接控制器,它负责控制最大连接数,NIO 模式下默认是 10000,达到这个阈值后,连接请求被拒绝(这里的拒绝指的是应用层意义上的拒绝,操作系统依然会接收 socket 连接,直到等待队列满)。
  • Acceptor 跑在一个单独的线程里,它在一个死循环里调用 accept 方法来接收新连接,一旦有新的连接请求到来,accept 方法返回一个 Channel 对象,接着把 Channel 对象交给 Poller 去处理。
  • Poller 的本质是一个 Selector,也跑在单独线程里。Poller 在内部维护一个 Channel 数组,它在一个死循环里不断检测 Channel 的数据就绪状态,一旦有 Channel 可读,就生成一个 SocketProcessor 任务对象扔给 Executor 去处理。
  • Executor 就是线程池,负责运行 SocketProcessor 任务类,SocketProcessor 的 run 方法会调用 Http11Processor 来读取和解析请求数据。Http11Processor 是应用层协议的封装,它会调用容器获得响应,再把响应通过 Channel 写出。

LimitLatch(org.apache.tomcat.util.threads.LimitLatch)

部分核心代码如下:

public class LimitLatch {private class Sync extends AbstractQueuedSynchronizer {@Overrideprotected int tryAcquireShared() {long newCount = count.incrementAndGet();if (newCount > limit) {count.decrementAndGet();return -1;} else {return 1;}}@Overrideprotected boolean tryReleaseShared(int arg) {count.decrementAndGet();return true;}}private final Sync sync;private final AtomicLong count;private volatile long limit;//线程调用这个方法来获得接收新连接的许可,线程可能被阻塞public void countUpOrAwait() throws InterruptedException {sync.acquireSharedInterruptibly(1);}//调用这个方法来释放一个连接许可,那么前面阻塞的线程可能被唤醒public long countDown() {sync.releaseShared(0);long result = getCount();return result;}}
  • 内部类 Sync 继承了 AQS,AQS 是 Java 并发包中的一个核心类,它在内部维护一个状态和一个线程队列,可以用来控制线程什么时候挂起,什么时候唤醒。
  • 用户线程通过调用 LimitLatch 的 countUpOrAwait 方法来拿到锁,如果暂时无法获取,这个线程会被阻塞到 AQS 的队列中。而这个方法实际会调用拓展类所重写的 tryAcquireShared 方法,它的实现逻辑是如果当前连接数 count 小于 limit,线程能获取锁,返回 1,否则返回 -1。
  • 如果用户线程被阻塞到了 AQS 的队列,同样是由 Sync 内部类决定唤醒,Sync 重写了 AQS 的 tryReleaseShared() 方法,其实就是当一个连接请求处理完了,这时又可以接收一个新连接了,这样前面阻塞的线程将会被唤醒。

Acceptor(org.apache.tomcat.util.net.Acceptor)

Acceptor 实现了 Runnable 接口,因此可以跑在单独线程里。

一个端口号只能对应一个 ServerSocketChannel,因此这个 ServerSocketChannel 是在多个 Acceptor 线程之间共享的,它是 Endpoint 的属性,由 Endpoint 完成初始化和端口绑定。初始化过程如下:

// org.apache.tomcat.util.net.NioEndpoint.initServerSocket()
serverSock = ServerSocketChannel.open();
// getAcceptCount() Acceptor负责从ACCEPT队列中取出连接,当Acceptor处理不过来时,连接就堆积在ACCEPT队列中,默认100
serverSock.socket().bind(addr, getAcceptCount());
serverSock.configureBlocking(true);

ServerSocketChannel 通过 accept() 接受新的连接,accept() 方法返回获得 SocketChannel 对象,然后将 SocketChannel 对象封装在一个 PollerEvent 对象中,并将 PollerEvent 对象压入 Poller 的 Queue 里,这是个典型的“生产者 - 消费者”模式,Acceptor 与 Poller 线程之间通过 Queue 通信。

NioEndpoint.start->startInternal()->startAcceptorThread() {new Thread(acceptor, threadName).start()} -> Acceptor.run(){ socket = endpoint.serverSocketAccept(); endpoint.setSocketOptions(socket); } ->NioEndpoint.setSocketOptions(socke){ NioSocketWrapper socketWrapper = new Nio2SocketWrapper(channel, this); poller.register(); } ->NioEndpoint.register(socketWrapper){ addEvent(new PollerEvent(socketWrapper, OP_REGISTER)); } ->NioEndpoint.events.offer(event){ Poller.events.offer(event); }

Poller(org.apache.tomcat.util.net.NioEndpoint.Poller)

Poller 本质是一个 Selector,它内部维护一个 Queue,这个 Queue 定义如下:

private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();

SynchronizedQueue 的方法比如 offer、poll、size 和 clear 方法,都使用了 synchronized 关键字进行修饰,用来保证同一时刻只有一个 Acceptor 线程对 Queue 进行读写。同时有多个 Poller 线程在运行(Tomcat9只有一个线程在运行,NioEndpoint#startInternal()),每个 Poller 线程都有自己的 Queue。每个 Poller 线程可能同时被多个 Acceptor 线程调用来注册 PollerEvent。同样 Poller 的个数可以通过 pollers 参数配置。

  • Poller 不断的通过内部的 Selector 对象向内核查询 Channel 的状态,一旦可读就生成任务类 SocketProcessor 交给 Executor 去处理。
  • Poller 的另一个重要任务是循环遍历检查自己所管理的 SocketChannel 是否已经超时,如果有超时就关闭这个 SocketChannel。

SocketProcessor(org.apache.tomcat.util.net.NioEndpoint.SocketProcessor)

Poller 会创建 SocketProcessor 任务类交给线程池处理,而 SocketProcessor 实现了 Runnable 接口,(这里是 SocketProcessorBase 实现了 Runnable 接口,在 run 方法里调用了抽象方法 doRun,SocketProcessor 继承了它并重写了 doRun 方法),用来定义 Executor 中线程所执行的任务,主要就是调用 Http11Processor 组件来处理请求。Http11Processor 读取 Channel 的数据来生成 ServletRequest 对象(Http11Processor#service())。

这里请注意:Http11Processor 并不是直接读取 Channel 的。这是因为 Tomcat 支持同步非阻塞 I/O 模型和异步 I/O 模型,在 Java API 中,相应的 Channel 类也是不一样的,比如有 AsynchronousSocketChannel 和 SocketChannel,为了对 Http11Processor 屏蔽这些差异,Tomcat 设计了一个包装类叫作 SocketWrapper,Http11Processor 只调用 SocketWrapper 的方法去读写数据。

Executor

Executor 是 Tomcat 定制版的线程池,它负责创建真正干活的工作线程,干什么活呢?就是执行 SocketProcessor 的 run 方法,也就是解析请求并通过容器来处理请求,最终会调用到 Servlet。

如何实现高并发

高并发就是能快速地处理大量的请求,需要合理设计线程模型让 CPU 忙起来,尽量不要让线程阻塞,因为一阻塞,CPU 就闲下来了。另外就是有多少任务,就用相应规模的线程数去处理。我们注意到 NioEndpoint 要完成三件事情:接收连接、检测 I/O 事件以及处理请求,那么最核心的就是把这三件事情分开,用不同规模的线程数去处理,比如用专门的线程组去跑 Acceptor,并且 Acceptor 的个数可以配置;用专门的线程组去跑 Poller,Poller 的个数也可以配置;最后具体任务的执行也由专门的线程池来处理,也可以配置线程池的大小。

这其中比较核心的就是把检测IO事件这一操作由少量selector集中处理,避免大量线程占用cpu时间在轮询IO事件上

Java中自身的NIO到底是同步非阻塞,还是IO多路复用

NIO API 可以不用 Selector,就是同步非阻塞。使用了 Selector 就是 IO 多路复用

Tomcat 该组件虽然是叫 NioEndpoint,但使用了 Selector,所以其实是 IO 多路复用

如何理解 IO 操作模型中的同步异步,阻塞非阻塞

同步异步:

  • 同步可以理解为线程在请求 IO 数据后是直接返回,还是阻塞等待数据从网卡(或者其他地方)拷贝到内存空间再到用户空间
  • 异步可以理解为线程在请求 IO 数据后直接返回,但是在请求时注册了一个回调函数,内核将数据准备好后通过回调函数通知

阻塞和非阻塞主要是看发起I/O操作时,内核空间没有数据可读时,线程是否会阻塞等待,直到有数据到来

  • 阻塞:调用 read() 时,如果内核空间中没有数据可读,线程就让出 cpu 阻塞等待,直到内核把数据拷贝到用户空间,唤醒线程,read()调用返回
  • 非阻塞:调用 read() 时,如果内核空间没有可读数据,线程立刻返回,直到再次调用read(),内核空间有数据可读时,阻塞等待内核把数据拷贝到 read() 函数指定的buff中,唤醒线程,read()调用返回

关于 IO 的几篇文章推荐:

【1】https://time.geekbang.org/column/article/100307 Tomcat如何实现IO多路复用

【2】https://time.geekbang.org/column/article/103959 内核如何阻塞与唤醒进程

【3】https://mp.weixin.qq.com/s/LYbJxorhsyoWWtP6OR6-eQ 一顿饭的事儿,搞懂Linux5种IO模型


http://www.mrgr.cn/news/58419.html

相关文章:

  • 扫雷游戏的分析、设计与代码实现详解
  • manictime整合两个数据库的数据
  • RabbitMQ常见问题持续汇总
  • 【Unity】仓库逻辑:拾取物体进仓库和扔掉物品
  • 工具_docsify
  • 【高录用|24-25年EI会议推荐】计算机科学、通信、图像、人工智能、算法、应用技术、电子信息工程等多领域征稿参会
  • 获取每个访客的第一条访问日志(获取网站的UV)
  • 「 自动化测试 」面试题..
  • 请简述同步和异步的区别。
  • 【嵌入式】全面解析温度传感器:PT1000、热电偶、热敏电阻与红外传感器的原理与应用
  • 【密码学】隐语HEU同态加密算法解读
  • 5G NR NARFCN计算SSB中心频率MATLAB实现
  • 『 Linux 』网络传输层 - UDP
  • Python自动化测试+邮件推送+企业微信推送+Jenkins
  • css绘制s型(grid)
  • DDD重构-实体与限界上下文重构
  • 使用mock进行接口测试教程
  • 数据库之旅:从MySQL起航,领略数据的海洋
  • 基于KV260的基础视频链路通路(MIPI+Demosaic+VDMA)
  • C语言的书写
  • java编译[WARNING]告警处理
  • 内存中划分的四个主要区域
  • 为什么使用 toFixed 方法的结果不一致呢?
  • 什么品牌的台灯护眼比较好?五款性能与品质兼并的护眼台灯分享
  • 2024年双十一有什么好物推荐?盘点2024双十一爆款好物分享
  • Nature 正刊丨阻断翻译的mRNA ADP核糖基转移酶的抗病毒防御