当前位置: 首页 > news >正文

『 Linux 』高级IO (二) - 多路转接

文章目录

    • 前情提要
    • 新连接的获取
    • 新连接的添加
    • 不同事件的处理
    • select 的缺点
    • poll( )
      • SelectServer 改为 PollServer
    • Epoll多路转接方案
      • Epoll 原理
      • 深入了解Epoll接口
      • Epoll的优势
    • select( )/poll( )完整代码(供参考)


前情提要

在博客『 Linux 』高级IO (一)中介绍了五种IO模型;

  • 阻塞式IO
  • 非阻塞式IO
  • 信号驱动IO
  • IO多路转接
  • 异步IO

并且介绍了select()如何进行多路转接;

同时为了验证select()的功能以及基础使用,实现了一个SelectServer服务器;

但并没有将该服务器的功能进行完善实现;

服务器代码内容主要为如下:

/*	SelectServer.hpp	*/class SelectServer
{
public:SelectServer(uint16_t port = defaultport) : _port(port) {}bool Init(){_listensock.Socket();_listensock.Bind(_port);_listensock.Listen();return true;}void Start(){int listensock = _listensock.GetFd();fd_set rfds;               // 读文件描述符集for (;;){// 不能直接accept 本质上accept就表示检测并获取listensock上的事件// 新连接的到来等价于读事件的就绪FD_ZERO(&rfds);            // 清空描述符集FD_SET(listensock, &rfds); // 将文件描述符写入至读文件描述符集中struct timeval timeout = {5, 0};// 设置timeout时间int n = select(listensock + 1, &rfds, nullptr, nullptr, &timeout);switch (n){case 0:printf("Time out, timeout: %ld . %ld\n", timeout.tv_sec, timeout.tv_usec);break;case -1:printf("Select Error\n");break;default:printf("%d fd Even Ready, timeout: %ld . %ld\n", n, timeout.tv_sec, timeout.tv_usec);sleep(2);break;}}}~SelectServer(){_listensock.Close();}private:NetSocket _listensock;uint16_t _port;
};

当前代码中SelectServer服务器主要分为两个部分;

Init()的初始化部分与Start()的运行部分;

  • Init()

    该部分主要用于初始化,主要为创建套接字,绑定以及设置为监听状态;

    由于在以往的博客中实现过对应的socket()接口的封装;

    故在Init()中直接调用相关成员函数;

  • Start()

    当套接字被设置为监听状态后不能直接调用accept();

    因为本质accept()就是一种阻塞式等待客户端与服务器建立连接,即读事件的发生;

    而本服务器为SelectServer,旨在采用select()实现服务器,故不需要使用accept();

    在上一篇博客提到select()等待的方式有三种,分别为 timeout时间戳” , “非阻塞式” 以及 “阻塞式” 三种方式;

    而代码中所采用的方式为 timeout时间戳” ;

    select()可以等待多个文件描述符的读事件本质原因是该函数需要使用对应的内置数据结构 —— fd_set类型的数据结构(位图);

    通过fd_set类型位图配套的位图操作接口与select()函数使得操作系统能知道用户所关心的文件描述符;

在当前代码下,已经能够实现对新来的连接进行识别并提示给用户;

由于位图fds中只有监听套接字一个,所以无论此处监听套接字监听到了多少新连接这里也只会是1;

因为位图fds目前只关心该位图中唯一的套接字是否有读事件发生;

同时此处的循环通知与select()有关,当使用select()使操作系统关心对应文件描述符的事件是否就绪后,只要事件存在未被处理,那么事件将一直处于就绪状态,故将会一直通知上层某个描述符中某事件已经就绪;


新连接的获取

根据select()函数的返回值来看,当返回值<0时表示该函数调用失败,当返回值==0时表示timeout时间戳结束,只有当返回值>0时才表示事件已经就绪;

class SelectServer
{
public:void Start(){int listensock = _listensock.GetFd();fd_set rfds;               // 读文件描述符集for (;;){// 不能直接accept 本质上accept就表示检测并获取listensock上的事件// 新连接的到来等价于读事件的就绪FD_ZERO(&rfds);            // 清空描述符集FD_SET(listensock, &rfds); // 将文件描述符写入至读文件描述符集中struct timeval timeout = {0, 0}; // 非阻塞// struct timeval timeout = {5, 0}; // 超时事件int n = select(listensock + 1, &rfds, nullptr, nullptr, /*&timeout*/nullptr);switch (n){case 0:printf("Time out, timeout: %ld . %ld\n", timeout.tv_sec, timeout.tv_usec);break;case -1:printf("Select Error\n");break;default:printf("%d fd Even Ready, timeout: %ld . %ld\n", n, timeout.tv_sec, timeout.tv_usec);HandlerEvent(rfds); // 通过select()函数判断位图中是否有读事件发生 // 若是有读事件发生则调用对应的读事件发生 HandlerEvent() sleep(2);break;}}}void HandlerEvent(fd_set& rfds){if(FD_ISSET(_listensock.GetFd(),&rfds)){  // 判断事件是否就绪std::string clientip;uint16_t clientport;int sock = _listensock.Accept(&clientip,&clientport); // 获取新连接的IP与端口if(sock<0) return;lg(INFO,"Accept sucessm %s: %d",clientip.c_str(),clientport);}}private:NetSocket _listensock;uint16_t _port;
};

故在代码中采用switch()case:语句,通过不同的返回值来进行不同的操作;

select()返回值>0时则表示该次select()已经有"返回值"个读事件已经就绪;

就绪后调用void HandlerEvent(fd_set& rfds)函数进行事件处理;

在这里的事件处理函数中,主要是调用accept()获取具体连接;

在以往的代码中,通常直接使用accept()时将会阻塞,必须通过创建一个新的执行流来完成不同工作以至于提高效率;

而在该处使用accept()并不会阻塞;

原因是该处的accept()处于事件处理函数之中, 而事件处理函数的调用是读事件已经就绪的情况才会调用,当读事件已经就绪,即监听套接字已经监听到了一个新的连接时才会调用HandlerEvent()函数,该函数中的accept()直接将就绪的连接获取即可;

以往直接调用accept()获取新连接会被阻塞的原因是,accept()本质上就充当了既负责监听又负责获取连接两个步骤的执行人,即阻塞式IO,而使用了select()后等待新连接与获取连接两个操作不再单由accept()完成,select()负责等待,accept()负责获取(拷贝),这也意味着accept()函数只需要将已有的连接获取上来即可;

从图片来看,当新连接被accept()获取后则视为事件被处理(accept()获取新连接后取消监听就绪状态,本质上是清理或消费监听队列中的事件);

accept()获取了这个新的连接后并不能马上对该描述符直接进行读写,本质是因为读写所调用的read()/write()同样是一种阻塞式IO;

而为了防止阻塞式IO的阻塞降低效率,必须同样的通过select()来对事件进行检查,因此还需要一个重要步骤,即将accept()获取上来的新连接(套接字描述符)添加到select()中的位图中;


新连接的添加

select()能同时等待多个描述符的读写事件就绪的本质原因是得益于其对应的fd_set类型(操作系统所提供的一种文件描述符位图类型,其比特位的位置永远表示文件描述符的编号);

int main()
{std::cout<<"fd_set bits num : "<<sizeof(fd_set) * 8<<std::endl;return 0;
}

这是一个用来计算该位图类型共能够存放文件描述符个数的代码(位图单位为bit,1byte == 8bits);

最终结果不同的系统不同机器可能计算出的结果不同,本机计算结果为1024,这说明在当前机器中select()最多可以同时等待1024个文件描述符的事件;

为了使得套接字信息能进行传递,通常在使用select()时还会额外使用一个辅助数组,这个辅助数组用于将accept()所获取的套接字传递至select()部分使得一个select()可以同时等待监听套接字与读写套接字;

static const int fd_num_max = (sizeof(fd_set)*8); // 具体的位图可监听数量class SelectServer
{
public:SelectServer(uint16_t port = defaultport) : _port(port) {   for(int i = 0 ;i<fd_num_max;++i){ // 对辅助数组进行初始化_fd_array[i] = -1; // 初始化为 -1}}
private:NetSocket _listensock;uint16_t _port;int _fd_array[fd_num_max]; // 辅助数组 用于传递文件描述符
};

在这次代码修改中为SelectServer服务器添加了一个辅助数组,并在构造函数中对辅助数组进行初始化为-1(在本代码中只关心读事件,故该辅助数组用于辅助读事件的观察);

在上文中提过;

int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);

select()中除了nfds参数以外,其他参数都是输入输出型参数;

其输入输出型参数则表示当select()函数调用完成后,除了nfds参数以外的其他参数中所传参数指针对应的值都会发生改变;

如假设你需要等待[3,7]文件描述符中的读事件,则需要设置对应的位图为1111 1000,而当select()等待成功后只有3,4文件描述符就绪时对应的位图结构中位图占位则会修改为0001 1000;

这种变化是不可控的,完全根据select()的结果而定,也可能因为所设置timeout结束从而返回一个空位图;

因此位图需要被重复设置,以防止未就绪的位图也被清理;

而重复设置位图则表示需要一个结构来保存对应位图的状态从而方便对位图进行重新设置,而这个结构则为上文中所设置的辅助数组;

本质上而言辅助数组既可以完成描述符的传递,也可以用来进行对原有位图状态的保存;

static const int fd_num_max = (sizeof(fd_set) * 8); // 具体的位图可监听数量static const int defaultfd = -1;class SelectServer
{
public:void Start(){int listensock = _listensock.GetFd();_fd_array[0] = listensock;for (;;){fd_set rfds;    // 读文件描述符集FD_ZERO(&rfds); // 清空描述符集 提前进行初始化int maxfd = _fd_array[0];for (int i = 0; i < fd_num_max; ++i) { // 此处为暴力遍历 可以使用滑动窗口算法 来减少遍历范围if (_fd_array[i] = defaultfd)continue;FD_SET(_fd_array[i], &rfds);maxfd = (maxfd < _fd_array[i] ? _fd_array[i] : maxfd);}lg(INFO,"The current maxfd is : %d",maxfd);struct timeval timeout = {5, 0}; // 超时事件// int n = select(listensock + 1, &rfds, nullptr, nullptr, /*&timeout*/ nullptr);int n = select(maxfd + 1, &rfds, nullptr, nullptr, &timeout);switch (n){case 0:printf("Time out, timeout: %ld . %ld\n", timeout.tv_sec, timeout.tv_usec);break;case -1:printf("Select Error\n");break;default:printf("%d fd Even Ready, timeout: %ld . %ld\n", n, timeout.tv_sec, timeout.tv_usec);HandlerEvent(rfds); // 通过select()函数判断位图中是否有读事件发生// 若是有读事件发生则调用对应的读事件发生 HandlerEvent()sleep(2);break;}}}private:NetSocket _listensock;uint16_t _port;int _fd_array[fd_num_max]; // 辅助数组 用于传递文件描述符
};

在这段代码中添加了一个循环,这个循环是用于对辅助数组的遍历,将辅助数组中对应的文件描述符通过FD_SET()函数设置进位图中;

这里的遍历采用的是暴力遍历的方法,也可以使用滑动窗口的方法来区分数组中已占用和未占用的区域从而减少遍历次数;

在调用select()时需要最大描述符+1,故在循环FD_SET()时不断更新当前最大描述符大小;

假设这是第一次循环,那么整个位图以及辅助数组中只会存在监听套接字对应的描述符,当select()在第一次调用时返回值为1则表示当前监听套接字有读事件就绪,此时应调用HandlerEvent()函数并使用accept()获取新连接;

而新的连接同样需要添加到select()对应的位图当中,而管理位图对应位置套接字的结构为辅助数组,因此当获取到一个新的连接时将连接添加到辅助数组即可;

class SelectServer
{
public:void HandlerEvent(fd_set &rfds){if (FD_ISSET(_listensock.GetFd(), &rfds)){ std::string clientip;uint16_t clientport;int sock = _listensock.Accept(&clientip, &clientport); if (sock < 0)return;lg(INFO, "Accept sucessm %s: %d , sockfd: %d", clientip.c_str(), clientport, sock);// 将获取到的套接字添加到辅助数组里int pos = 1; // 从 1 开始 , 0 已经被监听套接字占用for(;pos<fd_num_max;++pos){if(_fd_array[pos] != defaultfd) continue;// 数组该位置被占用break;}if(pos == fd_num_max){lg(WARNING,"server is full, close %d. ",sock);// 服务器满载(位图已经被用完)close(sock);}else{_fd_array[pos] = sock; // 增加}}}private:NetSocket _listensock;uint16_t _port;int _fd_array[fd_num_max]; // 辅助数组 用于传递文件描述符
};

遍历辅助数组,找到未被占用的地方将新的连接添加到该处,若是辅助数组中所有位置都被占满则表示服务器满载,无法添加新的连接,需要关闭新的连接;

当然,该处的HandlerEvent()函数每次只处理一个新连接,原因是本质这里是调用accept()进行连接获取,而accept()获取连接是一次调用获取一个新连接,故代码逻辑也是以一次调用获取一个连接为基准;

从结果可以看出,该程序目前可以正常获取新的连接并将新连接更新至辅助数组当中;


不同事件的处理

当前情况下程序能够有效的获取新连接,将连接存放至辅助数组并将连接设置进对应的位图结构当中;

select()最终所返回的结果中就绪的事件不仅仅只有监听套接字获取新连接的事件,同样的还有读写套接字中的可读事件是否就绪,因此需要在HandlerEvent()事件处理函数当中对两种事件进行区分;

当然此处只甄别监听套接字监听连接事件与读写套接字中可读事件主要是因为该程序主要只进行读事件的关注,如果该程序同时注意写事件的话同样也需要进行甄别;

void HandlerEvent(fd_set &rfds){for (int i = 0; i < fd_num_max; ++i){int fd = _fd_array[i]; // 取出辅助数组中存储的描述符if (fd == defaultfd)continue; // 如果为默认fd则直接进行下一次循环// if (FD_ISSET(_listensock.GetFd(), &rfds))if (FD_ISSET(fd, &rfds)){                                  // 判断事件是否就绪if (fd == _listensock.GetFd()) // 如果描述符为监听套接字 则进行新连接的获取{// ....}else{ // 表示其他文件描述符对应的读事件就绪 可以直接进行读取// ....}}}DebugPrint();}

如代码所示,在原有的基础上又套上了一层循环,这个循环是用来遍历辅助数组中的文件描述符;

当遍历到默认描述符-1则表示遍历位置并未存储描述符,继续向下遍历;

当遍历数组对应位置不为-1时则表示该位置存储了一个有效的文件描述符,随机对该位置描述符进行判断,判断结果为两种:

  • 属于监听套接字描述符
  • 不属于监听套接字描述符

若是属于监听套接字描述符则进行上文中添加新的描述符的操作;

若是不属于监听套接字则表示其他描述符的读事件就绪,可以对数据进行读取;

static const uint16_t defaultport = 8050;static const int fd_num_max = (sizeof(fd_set) * 8); // 具体的位图可监听数量static const int defaultfd = -1;class SelectServer
{
public:void HandlerEvent(fd_set &rfds){for (int i = 0; i < fd_num_max; ++i){int fd = _fd_array[i]; // 取出辅助数组中存储的描述符if (fd == defaultfd)continue; // 如果为默认fd则直接进行下一次循环// if (FD_ISSET(_listensock.GetFd(), &rfds))if (FD_ISSET(fd, &rfds)){                                  // 判断事件是否就绪if (fd == _listensock.GetFd()) // 如果描述符为监听套接字 则进行新连接的获取{std::string clientip;uint16_t clientport;int sock = _listensock.Accept(&clientip, &clientport); // 获取新连接的IP与端口if (sock < 0)continue;lg(INFO, "Accept sucessm %s: %d , sockfd: %d", clientip.c_str(), clientport, sock);// 将获取到的套接字添加到辅助数组里int pos = 1; // 从 1 开始 , 0 已经被监听套接字占用for (; pos < fd_num_max; ++pos){if (_fd_array[pos] != defaultfd)continue;break;}if (pos == fd_num_max){lg(WARNING, "server is full, close %d. ", sock);close(sock);}else{_fd_array[pos] = sock;}}else{ // 如果不为监听套接字则表示其他描述符的读事件已经就绪// 该程序只关心读事件char inbuff[1024];ssize_t n = read(fd, inbuff, sizeof(inbuff) - 1);// 直接调用read可能存在信息获取不完整等问题if (n > 0){ // 读取成功inbuff[n] = 0;printf("Get a massage for %d fd: %s\n", fd, inbuff);}else if (n == 0){ // 连接被关闭lg(INFO, "Fd %d connect closed", fd);close(fd);_fd_array[i] = defaultfd;}else{ // 读取失败lg(WARNING, "Fd %d recv error", fd);close(fd);_fd_array[i] = defaultfd;}}}}DebugPrint();}private:NetSocket _listensock;uint16_t _port;int _fd_array[fd_num_max]; // 辅助数组 用于传递文件描述符
};

在这段代码中还存在一个小问题,即读取时的问题,此处的读取所调用的函数为read(),而实际上在网络通信过程中可能所传递的数据并不是一个完整的数据,所以这里直接调用read()也可能存在数据读取不完整等问题(此处忽略);

当然调用read()后将会有三种结果:

  • 返回值>0

    表示有数据被成功读取;

  • 返回值==0

    表示对端套接字关闭;

  • 返回值<0

    表示读取失败;

将根据不同的结果进行不同的操作;

如图所示,当使用telnet工具使用环回地址连接程序后套接字列表正在逐渐增加;

当客户端向服务器发送消息时服务端能够处理并将数据进行显示;

当客户端主动向服务器断开连接时服务器打印出对端断开的信息;


select 的缺点

本质上select()已经是一个多路转接的方案,即通过同时等待多个描述符的事件就绪从而大大提升效率,但相比其他的多路转接方案还是有一定的局限性;

首先:

  • 等待的描述符个数具有上限

    一台机器所能使用的文件描述符根据不同的机器其上限不同但一定是有上限的;

    如当前机器为一台Ubuntu的机器,使用ulimit -n命令查看当前机器上可使用的文件描述符限制为最大65535;

    $ ulimit -n
    65535
    

    select()进行等待多种事件就绪的方式为内置的位图结构,这种位图结构的大小是已经定下的,在上文中验证过为1024(不同机器有所差异);

    可以得出无论如何运用,单纯使用位图结构对文件描述符进行等待操作一定是存在限制的;

    这个限制与进程和系统无关,本身是一种接口的限制,也是一种缺点;

  • 输出型参数过多

    select()的函数声明如下:

    int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
    

    该函数的参数除了第一个参数nfds以外,其他的四个参数都为输入输出型参数;

    这会导致两个问题:

    • 数据拷贝频率变高

      输入输出型参数表示,每次在进行函数调用时都将进行将用户数据拷贝进内核;

      由将内核数据拷贝回用户;

      不停的进行用户层与内核层的数据拷贝将会降低整体效率;

    • 每次需要对所关心的文件描述符进行重置

      不仅如此,输入输出型参数表示每次内核层数据拷贝回用户层时都会覆盖原有用户层的数据;

      当用户需要对事件未就绪的描述符进行等待时需要重新设置参数,同样的也会降低效率并且维护成本较高;

  • 多次遍历

    同时,select()在实现中,无论是在用户态还是内核态,检查文件描述符的状态都需要进行遍历;

    当调用该函数时,用户态所填写的一张或多张位图将会由用户态拷贝至内核态;

    内核将逐一检查这些文件描述符是否就绪(遍历);

    具体的将通过遍历的方式逐一根据实际的底层设备驱动程序接口询问设备状态;

    检测完成后内核将就绪描述符更新回位图并返回到用户态;

    多次的遍历将会降低整体效率;

为了修正上述select多路转接方案的短板,poll被引入;


poll( )

poll()是用来修正select()多路转接方案短板的一种方案,其重点是解决select()中的等待描述符具有上限与需要每次重置所等待的文件描述符两个问题;

SYNOPSIS#include <poll.h>int poll(struct pollfd *fds, nfds_t nfds, int timeout);RETURN VALUEOn success, poll() returns a nonnegative value which is the number of elements in the pollfds  whoserevents  fields  have been set to a nonzero value (indicating an event or an error).  A return valueof zero indicates that the system call timed out before any file descriptors became read.On error, -1 is returned, and errno is set to indicate the cause of the error.

该函数的返回值(n)与select()的返回值相同;

  • n > 0

    表示有n个事件就绪;

  • n < 0

    表示调用失败;

  • n == 0

    表示timeout事件就绪(超时);

其参数如下:

  • int timeout

    timeoutselect()不同;

    select()中的timeout过于复杂,为结构体且存在微秒与秒,而polltimeoutint类型,单位为毫秒;

    其存在三种情况:

    • timeout < 0

      表示设置poll多路转接的方案为永久阻塞阻塞,直至事件就绪;

    • timeout > 0

      表示设置poll多路转接方案为设置timeout时间为传入参数时间(单位为毫秒);

      timeout传入参数为1000,则一秒过后返回(1s = 1000ms);

    • timeout == 0

      表示设置poll多路转接方案为非阻塞方式;

  • struct pollfd *fds

    该参数的类型struct pollfd是为poll()所提供的一种类型;

    该结构体的定义如下:

               struct pollfd {int   fd;         /* file descriptor */short events;     /* requested events */short revents;    /* returned events */};
    

    其中int fd表示用户需要关心的描述符,events表示用户告诉内核需要关心的文件描述符的具体事件,revents表示内核告诉用户具体事件是否就绪;

    有效的将两者进行了分离从而避免内核态拷贝回用户态时覆盖用户态的内容;

    而实际上这里同样采用了位图的方式,与select()不同的是,这个位图采用shrot的类型,即16位位图,通过位操作进行选项设置从而进行传参;

    事件描述是否可作为输入参数是否可作为输出检测
    POLLIN 数据可读(例如普通数据、流式套接字和优先级数据到达)。
    POLLRDNORM 普通数据可读(与 POLLIN 等价,主要为了 POSIX 兼容)。
    POLLRDBAND 优先级带宽数据可读(较为少用,在 Linux 平台通常无实际作用)。
    POLLPRI 紧急数据可读(如 TCP 带外数据或特殊状态发生)。
    POLLOUT 数据可写写入不会阻塞(例如缓冲区足够大时可写入或管道可以接受写操作)。
    POLLWRNORM 普通写入数据可用(与 POLLOUT 等价,主要为了 POSIX 兼容)。
    POLLWRBAND 优先级带宽数据可写(较为少用)。
    POLLERR 错误,比如对管道的写入端关闭、文件描述符无效等,仅在 revents 中返回,表示已发生错误。
    POLLHUP 挂起或关闭,例如套接字连接的对端已关闭,仅在 revents 中返回。
    POLLRDHUP 流式套接字的对端关闭连接或关闭写半通道,仅在 Linux 2.6.17 及更高版本支持,需定义 _GNU_SOURCE 宏。
    POLLNVAL 文件描述符未打开或非法请求,仅在 revents 中返回。

    这些标志实际上都为宏,本质上是通过short类型转化为对应位图的标志位,通过这些标志位来进行参数的传递;

    而这里传递的参数为指针struct pollfd *,这表明所传递的并不单单是一个结构体,而是段连续的空间,或者可以说是结构体数组,而该数组的元素个数即为参数nfds_t nfds;

  • nfds_t nfds

    结构体数组中元素个数;


SelectServer 改为 PollServer

通过上文中对SelectServer缺点的描述以及poll()的介绍,可以直接将对应的SelectServer修改为PollServer;

只需要修改与select()相关位置即可;

  • PollServer类定义与初始化

    static const int fd_num_max = 64; // 具体的可监听数量static const int defaultfd = -1;static const int non_event = 0;class PollServer
    {
    public:PollServer(uint16_t port = defaultport) : _port(port){for (int i = 0; i < fd_num_max; ++i){_event_fds[i].fd = defaultfd; // 初始化_event_fds[i].events = non_event;_event_fds[i].revents = non_event;}}bool Init(){_listensock.Socket();_listensock.Bind(_port);_listensock.Listen();return true;}void Start(){ }~PollServer(){_listensock.Close();}private:struct pollfd _event_fds[fd_num_max]; // 此处为用户自行定义NetSocket _listensock;uint16_t _port;
    };
    

    整体与SelectServer相差不大,在SelectServer服务器中,由于需要重复对位图进行设置(所关心事件套接字),因此需要维护一个辅助数组用来存储需要设置进位图的数组;

    而对PollServer而言同样需要一个额外的数组,但是这个数组并不是一个辅助数组,是本身需要传递给poll()函数的一个数组;

    这个数组用来代替select()的位图传递用户到内核及内核到用户的数据信息;

    在代码的开头处同样设置了几个常量,分别为:

    • static const int fd_num_max = 64

      表示关心的文件描述符最大限制是多少,此处为64个(可根据用户需求进行更改);

    • static const int defaultfd = -1

      表示未被占用时,默认所存放的描述符;

    • static const int non_event = 0

      表示没有事件需要关心;

    在上文中提到,所传入的数组中每一个元素都为struct pollfd类型;

               struct pollfd {int   fd;         /* file descriptor */short events;     /* requested events */short revents;    /* returned events */};
    

    定义了对应的结构体数组后需要对数组进行初始化,在构造函数中将每个结构体对象中的fd初始化为-1,eventsrevents则设置为0;

    PollServer(uint16_t port = defaultport) : _port(port){for (int i = 0; i < fd_num_max; ++i){_event_fds[i].fd = defaultfd; // 初始化_event_fds[i].events = non_event;_event_fds[i].revents = non_event;}}
    

    对应的Init()初始化函数则不需要进行修改,同样是进行套接字的创建,绑定与监听;

    同样的析构函数则是close()对应的监听套接字描述符即可;

同样的函数主要分为两个部分,一个是初始化部分,一个是启动/运行部分;

其中初始化的工作由构造函数与Init()负责,而运行部分则为Start()部分负责;

  • Start()

    SelectServer服务器中,由于使用select()时其大多数参数为输入输出型参数,当函数调用完成后,对应由用户输入的参数将会被内核所覆盖,因此需要重复对参数进行设置;

    PollServer服务器使用poll,其将用户到内核与内核到用户的数据进行区分,无需循环对参数属性进行设置;

    class PollServer
    {
    public:void Start(){_event_fds[0].fd = _listensock.GetFd(); // 设置关心监听套接字_event_fds[0].events = POLLIN;          // 设置监听套接字关心读事件 (监听将要来的连接)// revents 可设可不设 原因为 revents 为内核给用户的数据信息int timeout = 3000;for (;;){int n = poll(_event_fds, fd_num_max, timeout /* timeout可硬编码 也可以设置变量*/);switch (n){case 0:printf("Time out\n");break;case -1:printf("Poll Error\n");break;default:printf("%d fd Even Ready\n",n);Dispatcher(); // 若是有读事件发生则调用对应的读事件发生 Dispatcher()sleep(2);break;}}}private:struct pollfd _event_fds[fd_num_max];NetSocket _listensock;uint16_t _port;
    };
    

    首次poll()只需设置监听套接字即可,原因为第一次poll()时只有监听套接字需要关心读事件是否就绪,后续的描述符的添加为处理事件处;

    循环调用poll()判断是否有读事件就绪,通常poll()返回值存在三种情况(上文中有描述),需要根据三种情况进行不同处理,其中当返回值>0时表示所关心的文件描述符有事件已经就绪,需要对数据进行处理;

  • Dispatcher()事件派发器

    事件派发器主要是用于根据所关心文件描述符的情况进行不同的操作;

    在当前程序中将会出现两种情况:

    • 事件就绪文件描述符为监听套接字描述符
    • 事件就绪文件描述符为读写描述符

    当就绪的描述符为监听套接字描述符时表示套接字监听到了一个新的连接,需要获取新的连接,并将新的连接设置进poll()中以后续能够以读写文件描述符的方式关心该描述符读事件是否就绪;

    当就绪的描述符为读写描述符(此程序只关心读事件)时表示读事件就绪,可以调用类似read()函数将数据由套接字描述符中拷贝至用户;

    class PollServer
    {
    public:void Dispatcher() // 事件派发器 将不同的事件派发给不同的模块{for (int i = 0; i < fd_num_max; ++i){int fd = _event_fds[i].fd; // 取出数组中存储的描述符if (fd == defaultfd)continue; // 如果为默认fd则直接进行下一次循环if (_event_fds[i].revents & POLLIN) // 通过按位与的方式判断是否就绪{                                  if (fd == _listensock.GetFd()) // 如果描述符为监听套接字 则进行新连接的获取{Accepter();}else{ // 如果不为监听套接字则表示其他描述符的读事件已经就绪Recver(fd, i);}}}}
    private:struct pollfd _event_fds[fd_num_max];NetSocket _listensock;uint16_t _port;
    };
  • 连接管理器Accepter()

    当监听套接字监听到了一个新的连接后则需要对该连接进行处理;

    主要的工作为:

    • 调用accept()获取新连接
    • 遍历struct pollfd *数组找到空位
    • 将新连接添加至数组中并设置读事件为关心事件
    class PollServer
    {
    public:void Accepter(){ // 连接管理器std::string clientip;uint16_t clientport;int sock = _listensock.Accept(&clientip, &clientport); // 获取新连接的IP与端口if (sock < 0){lg(WARNING, "Accept error");return;}lg(INFO, "Accept sucess %s: %d , sockfd: %d", clientip.c_str(), clientport, sock);int pos = 1; // 从 1 开始 , 0 已经被监听套接字占用for (; pos < fd_num_max; ++pos){if (_event_fds[pos].fd != defaultfd)continue;break;}if (pos == fd_num_max){lg(WARNING, "server is full, close %d. ", sock);close(sock);// 可配合动态数组 如直接使用vector使得进行扩容}else{_event_fds[pos].fd = sock;_event_fds[pos].events = POLLIN; // 直接设置所关心事件DebugPrint();}}private:struct pollfd _event_fds[fd_num_max];NetSocket _listensock;uint16_t _port;
    };
    

    select()不同的是,select()的位图使用的是定长,即无论如何都为固定的长度;

    poll()根据用户所设置的大小可以突破原有的限制,甚至可以使用变长容器,如vector,或者new开辟一块空间使得能够实现动态增长(此处为固定,即当服务器满载时关闭新的描述符);

  • 信息获取Recver()

    若读事件就绪描述符非监听套接字描述符则为读写套接字描述符,此时可以直接调用recv()/read()等函数进行信息获取;

    class PollServer
    {
    public:void Recver(int fd, int pos){// 该程序只关心读事件char inbuff[1024];ssize_t n = read(fd, inbuff, sizeof(inbuff) - 1);// 直接调用read可能存在信息获取不完整等问题if (n > 0){ // 读取成功inbuff[n] = 0;printf("Get a massage for %d fd: %s\n", fd, inbuff);}else if (n == 0){ // 连接被关闭lg(INFO, "Fd %d connect closed", fd);close(fd);_event_fds[pos].fd = defaultfd;_event_fds[pos].events = non_event;}else{ // 读取失败lg(WARNING, "Fd %d recv error", fd);close(fd);_event_fds[pos].fd = defaultfd;_event_fds[pos].events = non_event;}}private:struct pollfd _event_fds[fd_num_max];NetSocket _listensock;uint16_t _port;
    };
    

    当然这里直接读取同样存在一个问题,即数据读取可能不完整;

运行程序并测试;

从结果可以看出,当有新连接到来时将会被获取,同时数组中的套接字描述符也在增多(连接被获取并被添加至数组中);

当服务器接收到消息时将消息获取并进行显示;

当连接被关闭时服务器也将关闭对应的连接;

对于select()多路转接方案而言,poll()多路转接方案能够有效提高多路转接效率,尤其针对select()的重复设置所关心事件就绪描述符与关心描述符有限;

此处的有限主要针对在于接口的设置上,在select()中所关心的描述符的大小为默认大小,通常情况为1024,而poll()实际在于用户设置的大小限制,不限于接口上,其最大甚至可以同时关心65535个文件描述符;


Epoll多路转接方案

poll()多路转接方案相比select()方案而言,无论是效率上还是限制上要好了不少,从上文可以得知,select()方案的多路转接可以很方便的改成poll()方案的多路转接;

说明实际上这两种多路转接方案还是十分相似;

如"遍历";

select()而言,其需要进行多次遍历,但poll()方案也好不了多少,因为遍历不仅分用户态遍历也分内核态遍历;

况且poll()方案就select()而言,其提供了更大范围,甚至是没有上限的数组,因此其需要遍历的范围也将变得更大,就如上述代码而言,每次套接字描述符事件就绪后无非是监听到了新的连接或是某个描述符获取到了新的信息,无论是哪种,实际的处理方式都是对一个描述符进行处理;

而每次处理一个描述符时都要对描述符进行一次遍历,若是描述符已经变得很大,那么每次套接字描述符的处理时的遍历都将成为一笔不菲的开销;

这是一种效率问题,因此为了解决这个效率问题,Epoll多路转接方案被引出;

epoll(Extended Poll), 为了处理大批量句柄而做了改进的poll;

其主要有三个核心函数:

  • epoll_create()

    int epoll_create(int size);
    

    该函数表示创建一个epoll实例,并返回一个文件描述符,用于后续对该实例进行操作;

    其参数size用于提示内核分配所需的资源大小,而在现代系统中已经被忽略;

  • epoll_wait()

    int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
    

    该函数表示等待事件发生,并返回已触发的事件列表;

    其参数为如下:

    • int epfd

      该参数表示需要传递的epoll实例的文件描述符,即epoll_create函数的返回值;

    • struct epoll_event *events

      用于存储触发事件的数组,这个数组与poll相同是个结构体数组,其对应的结构体结构为如下:

      struct epoll_event {uint32_t events;    // 感兴趣的事件类型,如 EPOLLIN、EPOLLOUTepoll_data_t data;  // 联合体,可以存储用户数据。
      };typedef union epoll_data {void *ptr;int fd;uint32_t u32;uint64_t u64;
      } epoll_data_t;
      

      其中data的类型为联合体类型epoll_data,通常用来保存用户级的数据;

      events是一种以位图形式表现的,可以通过设置不同的标记位使得传递用户所感兴趣的事件类型;

    • int maxevents

      表示数组的最大容量,限制每次返回的最大事件数;

    • int timeout

      表示等待事件,单位为毫秒,与poll中的timeout相同,此处不做赘述;

    该函数返回值(n)类型为int类型,其返回值存在三种情况:

    • n > 0

      表示已经事件就绪的描述符有n个,此时用户应该处理events数组中的前n个事件;

    • n == 0

      表示没有任何事件就绪,返回原因可能为timeout所设置的时间已到;

    • n < 0

      表示函数调用失败,可根据errno配合来查明失败原因;

  • epoll_ctl()

    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    

    该函数表示对epoll实例进行管理和控制,包括添加,修改或删除文件描述符上的事件;

    参数如下:

    • int epfd

      该参数表示epoll实例的文件描述符;

    • int op

      表示要执行的操作,操作可以是以下之一:

      EPOLL_CTL_ADD: 添加新的事件到epoll实例当中;

      EPOLL_CTL_MOD: 修改已有文件描述符对应的事件;

      EPOLL_CTL_DEL: 从epoll实例中删除某个文件描述符的事件监控;

    • int fd

      表示目标文件描述符;

    • struct epoll_event *event

      表示指向epoll_event结构体的指针,定义了需要监听的事件以及其他参数;

    该函数的返回值有两种情况,当返回值== 0时表示函数调用成功,< 0时则表示函数调用失败;


Epoll 原理

通常情况下,硬件层到用户层由下至上分别为,硬件,硬件驱动,操作系统,系统调用接口以及用户;

此处硬件以网卡为例;

操作系统在硬件层面上本质是通过硬件中断的方式从而了解到对应硬件上已经有数据可以被读取;

当网卡获取到数据时,将会向CPU发出一个硬件中断,当硬件中断产生时将使操作系统调用网卡驱动将网卡中的数据由数据链路层拷贝至操作系统内核;

这是数据由数据链路层转入至操作系统,而实际上数据最终需要拷贝至文件缓冲区中;

而操作系统为了支持Epoll提供了三种机制:

  • 操作系统在自身内部维护了一棵红黑树

    这棵红黑树中每个节点结构类似如下:

    struct rb_node{int fd; // 内核需要关心的fduint32_t event; // 要关心的事件// ... 其他链接字段等
    };
    

    其中存在内核需要关心的文件描述符,以及对应描述符中需要关心的事件,对应的事件在位图event中以标记位的形式存在;

  • 操作系统在自身内部维护了一个就绪队列

    这个就绪队列可能是一个链表或者其他结构,当红黑树中的某些节点的事件就绪后,将该红黑树节点链入该就绪队列当中(一个节点可以是两个不同数据结构的节点,两种数据结构不冲突);

    其中就绪队列中的节点可能如下:

    struct list_node{int fd; // 已经就绪的fduint32_t event; // 已经就绪的事件
    };
    
  • 网卡允许操作系统注册一些回调机制

    通常情况下操作系统中还会提供一个回调函数;

    网卡以硬件中断的方式将数据拷贝至网卡驱动层,当网卡驱动层数据就绪后将会自动调用回调函数;

    该回调函数大致做以下操作:

    • 将数据向上交付

    • 将数据交付给TCP的接受队列

    • 查找红黑树

      判断红黑树中是否存在对应的文件描述符,该处主要判断是否需要关心对应事件,如EPOLLIN\EPOLLOUT;

      如果关心则将对应节点链入就绪队列中;

    而用户只需关心就绪队列即可;

    而上述的三种机制共同组成一个Epoll模型描述(被整合为一块);

    Epoll模型只是一种描述,在操作系统中将会存在若干个Epoll模型实例,这些实例将通过"先描述后组织"的方式进行管理;

    而实际上Epoll的原理为,当用户使用Epoll模型时,操作系统将会为Epoll模型构建一个struct file结构体,并以文件的形式来管理Epoll模型(Linux中一切皆文件),故epoll_create()函数将返回一个文件描述符;


深入了解Epoll接口

通过上文对Epoll模型原理的解释可以得知Epoll模型对应接口本质如下:

  • epoll_create()

    实际上在调用epoll_create()函数时本质上是实例化出一个Epoll模型,并为该Epoll模型构建对应的struct file结构体挂接到文件描述符表中并返回对应的文件描述符;

  • epoll_ctl()

    该函数是用来设置需要关心的文件描述符与对应需要关心的事件的,而这个本质上由Epoll模型中的红黑树管理;

    因此而函数epoll_ctl(),实际上是对实例化出的Epoll模型中的红黑树进行增删改等操作;

  • epoll_wait()

    该函数用于等待并返回对应就绪事件,本质上是关心对应的就绪队列;


Epoll的优势

Epoll的优势主要体现在较于另外两种多路转接方案(SelectPoll)的效率大大提高;

其在检测就绪事件的时间复杂度为O(1)(只需判断就绪队列是否为空);

在获取就绪事件的时间复杂度为O(n)(只需遍历就绪队列);

其次其较于Select而言,也没有fdevent的限制;

同时其操作系统自行构建与维护的红黑树代替了poll,select中用户维护的数组;

用户无需再维护一个新的数据结构,只需根据epoll所提供的接口使用即可;


select( )/poll( )完整代码(供参考)

  • SelectServer服务器

    Pro24/IO/AdvancedIO/SelectServer · 半介莽夫/My_Linux - 码云 - 开源中国 (gitee.com)

  • PollServer服务器

    Pro24/IO/AdvancedIO/PollServer · 半介莽夫/My_Linux - 码云 - 开源中国 (gitee.com)


http://www.mrgr.cn/news/82443.html

相关文章:

  • JAVA类和对象练习
  • Mac M2基于MySQL 8.4.3搭建(伪)主从集群
  • C++例程:使用其I/O模拟IIC接扣(2)
  • JAVA异常处理练习
  • 从索尼爱立信手机打印短信的简单方法
  • Ubuntu静态IP地址
  • SQL字符串截取函数——Left()、Right()、Substring()用法详解
  • 计算机网络 (21)网络层的几个重要概念
  • AI数据标注师理论部分考试题库 - 500题
  • Spring AOP 扫盲
  • React Router 用法概览
  • C# 附加到进程中,发现断点不是实的断点
  • 中国联通首次推出一套量化大模型的新标准
  • 【YOLOv8老鼠检测】
  • USB 驱动开发 --- Gadget 驱动框架梳理
  • 动态规划<八> 完全背包问题及其余背包问题
  • 国内Ubuntu环境Docker部署CosyVoice
  • 国内Ubuntu环境Docker部署Stable Diffusion入坑记录
  • 多模态论文笔记——Coca
  • 多模态论文笔记——CogVLM和CogVLM2(副)
  • redis的集群模式与ELK基础
  • 如何从文档创建 RAG 评估数据集
  • .Net Core配置系统
  • U8G2库使用案例(stm32)
  • 计算机网络原理(谢希仁第八版)第4章课后习题答案
  • Java-list均分分割到多个子列表