[Linux]:Reactor模式
1. Reactor 的概念
Reactor
反应器模式,也被称为分发者模式或通知者模式,是一种将就绪事件派发给对应服务处理程序的事件设计模式。
2. Reactor 模式的角色
在 Reactor
模式,一般存在五个关键角色:
角色 | 解释 |
---|---|
Handle(句柄) | 用于标识不同的事件,本质就是一个文件描述符。 |
Synchronous Event Demultiplexer(同步事件分离器) | 本质就是一个系统调用,用于等待事件的发生。对于 Linux 来说,同步事件分离器指的就是I/O多路复用,比如 select 、poll 、epoll 等。 |
Event Handler(事件处理器) | 由多个回调方法构成,这些回调方法构成了与应用相关的对于某个事件的处理反馈。 |
Concrete Event Handler(具体事件处理器) | 事件处理器中各个回调方法的具体实现。 |
Initiation Dispatcher(初始分发器) | 初始分发器实际上就是 Reactor 角色,初始分发器会通过同步事件分离器来等待事件的发生,当对应事件就绪时,就调用事件处理器,最后调用对应的回调方法来处理这个事件。 |
以下就是 Reactor
模式工作流程:
- 注册阶段:具体事件处理器向初始分发器(也就是
Reactor
)注册,表明希望在特定事件发生时得到通知,同时要传递内部的Handle
给初始分发器,Handle
用于向操作系统标识事件处理器。- 启动循环阶段:所有事件处理器注册完毕后,启动初始分发器的事件循环,它会合并各事件处理器的
Handle
,然后通过同步事件分离器等待相关事件发生。- 事件通知阶段:当某个事件处理器的
Handle
变为Ready
状态,同步事件分离器就会通知初始分发器。- 查找处理阶段:初始分发器以
Ready
状态的Handle
为关键信息,找到对应的事件处理器。- 响应事件阶段:初始分发器调用对应事件处理器中的回调方法来对发生的事件作出响应,完成整个事件处理流程,之后持续循环等待新的事件发生并重复上述步骤。
3. 服务器实现
下面我们来实现一个基于 ET 模式的 Reactor 服务器。基于ET(边缘触发)模式实现的 Reactor 服务器,核心在于利用 Reactor 模式高效处理I/O事件。
在其中,文件描述符作为句柄来标识I/O资源。I/O多路复用机制 epoll 充当同步事件分离器,负责监听多个文件描述符的I/O事件,精准判断事件何时发生。 事件处理器涵盖读回调、写回调和异常回调,而其具体实现就是具体事件处理器,分别针对读、写、异常这几类事件做具体逻辑处理。
服务器中的 TcpServer 里的 Dispatcher 函数扮演初始分发器角色,它通过调用 epoll_wait 等待I/O事件就绪。 一旦监测到事件就绪,比如读事件中监听套接字就绪就调 accept 获取连接、其他套接字就绪则调 recv 读数据;写事件就绪就向发送缓冲区写数据;异常事件就绪则直接关闭套接字。Dispatcher 函数会把就绪事件分发给对应的服务处理程序,以此完成整个事件处理流程,实现基于ET模式下对众多并发I/O事件的有效管理与处理。
3.1 准备工作
一般我们的服务器只允许存在一个,所以我们可以写一个防止拷贝的类(禁止赋值重载与拷贝构造),然后交给服务器类继承。
#pragma once
class nocopy
{
public:nocopy(){}nocopy(const nocopy &) = delete;const nocopy&operator=(const nocopy &) = delete;
};
然后我们在写服务器之前,先将套接字 Sock
与多路转接 epoll
的接口封装成单独的类。
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
#include <cstring>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <fcntl.h>// 定义一些错误码,方便在不同网络操作出错时统一处理
enum
{SocketErr = 1,BindErr,LinstenErr,NON_BLOCK_ERR
};// Sock类,用于封装一些基本的网络套接字操作
class Sock
{// 监听队列的最大长度,用于listen函数中const static int backlog = 10;public:Sock() {}~Sock() {}public:// 创建套接字void Socket(){// 创建一个基于IPv4、TCP协议的套接字_sockfd = socket(AF_INET, SOCK_STREAM, 0);if (_sockfd < 0){// 如果创建套接字失败,输出错误信息并以对应的错误码退出程序std::cerr << "socket error:" << strerror(errno) << std::endl;exit(SocketErr);}// 设置套接字选项,允许地址和端口复用,这样可以在服务器重启后快速重用之前的端口等资源int opt = 1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));}// 将套接字绑定到指定的IP地址和端口上void Bind(uint16_t port){// 初始化用于绑定的地址结构体,指定协议族为IPv4等信息struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY;if (bind(_sockfd, (struct sockaddr *)&local, sizeof(local)) < 0){// 如果绑定失败,输出错误信息并以对应的错误码退出程序std::cerr << "bind error:" << strerror(errno) << std::endl;exit(BindErr);}}// 使套接字进入监听状态,等待客户端连接请求void Listen(){if (listen(_sockfd, backlog) < 0){// 如果监听失败,输出错误信息并以对应的错误码退出程序std::cerr << "listen error:" << strerror(errno) << std::endl;exit(LinstenErr);}}// 接受客户端连接请求,返回新的套接字描述符用于与客户端通信,并获取客户端的IP地址和端口号int Accept(std::string *clientip, uint16_t *clientport){// 用于存放客户端地址信息的结构体struct sockaddr_in peer;socklen_t len = sizeof(peer);// 阻塞等待客户端连接,接受连接并返回与客户端通信的新套接字描述符int newfd = accept(_sockfd, (struct sockaddr *)&peer, &len);if (newfd < 0){// 如果接受连接失败,输出错误信息并返回 -1std::cout << "accept error:" << strerror(errno) << std::endl;return -1;}char ipstr[64];// 将网络字节序的IP地址转换为点分十进制的字符串形式inet_ntop(AF_INET, &peer.sin_addr, ipstr, sizeof(ipstr));*clientip = ipstr;// 将网络字节序的端口号转换为主机字节序,并赋值给对应的指针*clientport = ntohs(peer.sin_port);return newfd;}// 主动发起与指定IP和端口的连接bool Connect(const std::string &ip, const uint16_t &port){// 初始化用于连接的目标地址结构体,指定协议族、端口等信息struct sockaddr_in peer;memset(&peer, 0, sizeof(peer));peer.sin_family = AF_INET;peer.sin_port = htons(port);inet_pton(AF_INET, ip.c_str(), &peer.sin_addr);int n = connect(_sockfd, (struct sockaddr *)&peer, sizeof(peer));if (n == -1){// 如果连接失败,输出错误信息并返回false表示连接失败std::cerr << "connect to " << ip << ": " << port << " error" << std::endl;return false;}return true;}// 关闭套接字void Close(){close(_sockfd);}// 获取套接字描述符int Fd(){return _sockfd;}private:// 保存套接字描述符int _sockfd;
};
#pragma once
#include <iostream>
#include "nocopy.hpp"
#include <cerrno>
#include <cstring>
#include <sys/epoll.h>
#include <unistd.h>// Epoller类,用于封装epoll相关操作,实现高效的I/O事件多路复用
class Epoller
{// epoll实例创建时初始分配的文件描述符数量,这里设定为128static const int size = 128;public:Epoller(){// 创建一个epoll实例,用于监听多个文件描述符上的I/O事件_epfd = epoll_create(size);if (_epfd == -1){// 如果创建epoll实例失败,输出错误信息(包含具体的系统错误原因)std::cerr << "epoll_create error, " << strerror(errno) << std::endl;}else{// 如果创建成功,输出提示信息,显示创建的epoll实例对应的文件描述符值std::cout << "epoll_create success,_epfd is " << _epfd << std::endl;}}// 用于更新epoll实例中对某个文件描述符关注的I/O事件,或者删除对某个文件描述符的关注bool EpollerUpdate(int oper, int sock, uint32_t event){int n = 0;// 如果操作类型是EPOLL_CTL_DEL,表示要从epoll实例中删除对指定文件描述符的关注if (oper == EPOLL_CTL_DEL){// 调用epoll_ctl函数执行删除操作,传入相应参数(epoll实例描述符、操作类型、文件描述符、空指针表示不需要额外的事件数据)n = epoll_ctl(_epfd, oper, sock, nullptr);}else{// 当不是删除操作时,需要设置要关注的事件相关信息,并传递给epoll_ctl函数// 构造一个epoll_event结构体,用于描述要关注的事件以及关联的文件描述符struct epoll_event ev;ev.events = event;ev.data.fd = sock;// 调用epoll_ctl函数,传入相应参数(epoll实例描述符、操作类型、文件描述符、事件结构体指针)来添加或修改对文件描述符关注的事件n = epoll_ctl(_epfd, oper, sock, &ev);}// 根据epoll_ctl函数的返回值判断操作是否成功(返回0表示成功),并返回相应的布尔值结果return n == 0;}// 等待epoll实例上注册的文件描述符有I/O事件发生,阻塞一定时间(默认3000毫秒,可设置)后返回发生事件的相关信息int EpollerWait(struct epoll_event revents[], int num, int timeout = 3000){// 调用epoll_wait函数,传入相应参数(epoll实例描述符、用于接收事件的数组、数组大小、超时时间),等待事件发生并获取事件信息int n = epoll_wait(_epfd, revents, num, timeout);return n;}~Epoller(){// 在析构函数中,检查epoll实例的文件描述符是否合法(大于等于0表示有效),如果有效则关闭该文件描述符,释放相关资源if (_epfd >= 0){close(_epfd);}}private:// 保存epoll实例对应的文件描述符int _epfd;
};
3.2 Connection 类实现
我们首先需要定义一个Connection
类,旨在封装与网络连接相关的关键要素,包括网络通信所用的文件描述符 _sock
、针对不同事件(读 _recv_cb
、写 _send_cb
、异常 _except_cb
)的回调处理机制,以及用于数据暂存和与 TcpServer
关联的相关成员,以此来有效管理和处理网络连接中的各种情况。
同时也需要定义相关缓冲区成员以及操作函数,便于数据发送与接收。
- 缓冲区相关成员及操作函数:
_inbuffer
和_outbuffer
分别作为输入缓冲区和输出缓冲区,用于解决网络通信中数据读写的缓冲问题。
AppendInBuffer
函数可将从客户端读取到的数据追加到_inbuffer
中,以应对可能出现的粘包情况,待完整报文出现后再做处理;AppendOutBuffer
则用于将要发送给客户端的数据先暂存到_outbuffer
中,待底层 TCP 发送缓冲区有空间时再发送。并且提供了Inbuffer
和OutBuffer
函数来获取这两个缓冲区的引用,便于调试或其他相关操作。
- 与
TcpServer
关联的成员及设置函数:
_tcp_server_ptr
作为指向TcpServer
的弱指针,借助SetWeakPtr
函数进行赋值,它的存在便于在Connection
中根据连接情况快速定位到对应的TcpServer
对象,确保如业务处理函数向输出缓冲区递交数据后能 “提醒”TcpServer
进行后续处理等协作流程的顺利进行。
class Connection;
class TcpServer;// 读、写事件标志(含边缘触发模式)
uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);
// 全局缓冲区大小常量
const static int g_buffer_size = 128;
// 接收数据回调函数类型别名
using func_t = std::function<void(std::weak_ptr<Connection>)>;
// 异常处理回调函数类型别名
using except_func = std::function<void(std::weak_ptr<Connection>)>;class Connection
{
public:// 构造函数,传入文件描述符初始化Connection(int sock) : _sock(sock) {}// 设置各事件回调函数void SetHandler(func_t recv_cb, func_t send_cb, except_func except_cb) {_recv_cb = recv_cb;_send_cb = send_cb;_except_cb = except_cb;}// 获取文件描述符int SockFd() { return _sock; }// 追加数据到输入缓冲区void AppendInBuffer(const std::string &info) {_inbuffer += info;}// 追加数据到输出缓冲区void AppendOutBuffer(const std::string &info) {_outbuffer += info;}// 获取输入缓冲区(用于调试)std::string &Inbuffer() // for debug{return _inbuffer;}// 获取输出缓冲区std::string &OutBuffer(){return _outbuffer;}// 设置指向TcpServer的弱指针void SetWeakPtr(std::weak_ptr<TcpServer> tcp_server_ptr) {_tcp_server_ptr = tcp_server_ptr;}private:int _sock; // 关联的文件描述符std::string _inbuffer; // 输入缓冲区std::string _outbuffer; // 输出缓冲区
public:func_t _recv_cb; // 接收数据回调func_t _send_cb; // 发送数据回调except_func _except_cb; // 异常回调std::weak_ptr<TcpServer> _tcp_server_ptr; // 指向TcpServer弱指针std::string _ip;uint16_t _port;
};
3.3 TcpServer 类实现
以下是 TcpServer 实现的具体分析:
- 类继承与成员初始化:
TcpServer
类继承自std::enable_shared_from_this<TcpServer>
与nocopy
,定义了常量num
用于相关操作限定数量。构造函数接收端口号和消息处理回调函数,初始化端口、回调函数、运行状态以及用于管理epoll
操作的指针和套接字操作的指针等关键成员。- 服务器初始化:通过
InitServer
函数按顺序进行套接字创建、设置为非阻塞模式、绑定指定端口、进入监听状态,并将监听套接字添加到epoll
实例中关注读事件,关联接受连接回调函数,为接收客户端连接做准备。- 新连接添加:
AddConnection
函数负责创建Connection
对象,设置其指向当前TcpServer
的弱指针、各类回调函数以及连接相关的IP和端口信息,然后将该连接存入管理容器,并更新epoll
实例以关注对应事件。- 接受连接处理:
Accepter
回调函数在监听套接字读事件就绪时,循环尝试从监听套接字接受客户端连接,对不同错误情况(如资源暂时不可用、被中断等)分别处理,成功接受新连接后设置为非阻塞模式,并再次通过AddConnection
为新连接配置对应接收、发送、异常处理回调函数。- 数据接收与处理:
Recver
回调函数针对客户端数据读取,循环调用recv
函数从套接字读取数据到缓冲区,根据读取结果(成功、客户端关闭连接、各类错误等情况)分别将数据追加到输入缓冲区、调用异常回调或继续尝试读取等操作,读取完成后调用传入的消息处理回调函数做业务逻辑处理。- 数据发送与事件更新:
Sender
回调函数用于向客户端发送数据,循环从连接对应的输出缓冲区取数据发送,依据发送结果(成功、发送字节数为0、各类错误等情况)相应地更新输出缓冲区、结束循环或调用异常回调,最后根据输出缓冲区是否为空决定是否启用写事件来更新epoll
关注的事件。- 异常处理:
Excepter
回调函数在连接出现异常时,从epoll
实例移除对该连接的关注,关闭对应的套接字,并从连接管理容器中删除该连接记录。- 事件分发:
Dispatcher
函数调用epoll_wait
等待事件发生,遍历返回的事件信息,针对异常事件转换为读写事件处理,再依据读、写事件就绪且连接合法的情况,分别调用对应连接已设置的接收、发送回调函数进行处理。- 服务器主循环:
Loop
函数通过控制_quit
状态,不断调用Dispatcher
函数进行事件分发处理,实现服务器持续运行以响应客户端各类I/O事件的功能。
最后,析构函数负责在合适时机关闭监听套接字释放相关资源。整个 TcpServer
类实现了基于 epoll
的服务器功能,通过各函数协同工作有效管理客户端连接及处理各类I/O事件。
class TcpServer : public std::enable_shared_from_this<TcpServer>, public nocopy
{// 定义一个常量,用于表示一些操作涉及的数量(比如epoll_wait等操作中相关数组大小等可能会用到)static const int num = 64;public:// 构造函数,初始化TcpServer对象,接收端口号和处理消息的回调函数TcpServer(uint16_t port, func_t OnMessage): _port(port), _OnMessage(OnMessage), _quit(true),_epoller_ptr(new Epoller), _listensock_ptr(new Sock){}// 初始化服务器相关操作,创建套接字、设置为非阻塞、绑定端口、开始监听,并添加监听套接字到epoll关注事件中void InitServer(){// 创建套接字_listensock_ptr->Socket();// 设置套接字为非阻塞模式SetNonBlock(_listensock_ptr->Fd());// 将套接字绑定到指定端口_listensock_ptr->Bind(_port);// 使套接字进入监听状态,等待客户端连接_listensock_ptr->Listen();// 添加监听套接字到epoll实例中,关注读事件,关联接受连接的回调函数(Accepter)等AddConnection(_listensock_ptr->Fd(), EVENT_IN, std::bind(&TcpServer::Accepter, this, std::placeholders::_1),nullptr, nullptr);}// 添加新的连接相关信息到服务器管理中,包括创建Connection对象、设置回调、关联到TcpServer、添加到epoll关注等void AddConnection(int sock, uint32_t event, func_t recv_cb, func_t send_cb, except_func except_cb,const std::string &ip = "0.0.0.0", uint16_t port = 0){// 创建一个新的Connection对象,用于管理该连接的相关信息std::shared_ptr<Connection> new_connection(new Connection(sock));// 设置指向当前TcpServer对象的弱指针,建立关联new_connection->SetWeakPtr(shared_from_this());// 设置该连接对应的接收、发送、异常回调函数new_connection->SetHandler(recv_cb, send_cb, except_cb);new_connection->_ip = ip;new_connection->_port = port;// 将该连接以文件描述符为键,保存到连接管理的容器中(无序映射)_connections.insert(make_pair(sock, new_connection));// 更新epoll实例,添加对该文件描述符关注的事件_epoller_ptr->EpollerUpdate(EPOLL_CTL_ADD, sock, event);}// 接受客户端连接的回调函数,从监听套接字接受新连接,处理相关错误情况,并为新连接设置回调等void Accepter(std::weak_ptr<Connection> conn){auto connection = conn.lock();while (true){std::string clientip;uint16_t clientport;// 从监听套接字接受客户端连接,获取新的套接字描述符等信息int sock = _listensock_ptr->Accept(&clientip, &clientport);if (sock > 0){std::cout << "get a new client, get info sock is " << sock << std::endl;// 设置新套接字为非阻塞模式SetNonBlock(sock);// 添加新连接到服务器管理中,设置接收、发送、异常处理的回调函数(对应类内其他相关函数)AddConnection(sock, EVENT_IN, std::bind(&TcpServer::Recver, this, std::placeholders::_1),std::bind(&TcpServer::Sender, this, std::placeholders::_1),std::bind(&TcpServer::Excepter, this, std::placeholders::_1));}else{// 如果错误码表示资源暂时不可用(非阻塞模式下无连接可接受),跳出循环if (errno == EWOULDBLOCK){break;}// 如果错误码表示被中断,继续尝试接受连接else if (errno == EINTR){continue;}else{break;}}}}// 接收客户端数据的回调函数,循环读取数据到输入缓冲区,处理读取相关错误情况,最后调用消息处理回调void Recver(std::weak_ptr<Connection> conn){if (conn.expired()){return;}auto connection = conn.lock();int sock = connection->SockFd();while (true){char buffer[g_buffer_size];memset(buffer, 0, sizeof(buffer));// 从套接字读取数据到缓冲区ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0);if (n > 0){// 将读取到的数据追加到连接对应的输入缓冲区connection->AppendInBuffer(buffer);}else if (n == 0){// 如果读取到的字节数为0,表示客户端关闭连接,调用异常回调处理connection->_except_cb(connection);return;}else{// 如果错误码表示资源暂时不可用(非阻塞模式下无数据可读),跳出循环if (errno == EWOULDBLOCK){break;}// 如果错误码表示被中断,继续尝试读取else if (errno == EINTR){continue;}else{// 其他错误情况,调用异常回调处理connection->_except_cb(connection);return;}}}// 调用处理消息的回调函数,传递当前连接对象,进行业务逻辑处理_OnMessage(connection);}// 发送数据给客户端的回调函数,循环从输出缓冲区发送数据,处理发送相关错误情况,根据缓冲区状态更新epoll事件void Sender(std::weak_ptr<Connection> conn){if (conn.expired()){return;}auto connection = conn.lock();int sock = connection->SockFd();auto &outbuffer = connection->OutBuffer();while (true){// 从套接字发送输出缓冲区中的数据ssize_t n = send(sock, outbuffer.c_str(), outbuffer.size(), 0);if (n > 0){// 如果发送成功,更新输出缓冲区,移除已发送的数据部分outbuffer.erase(0, n);if (outbuffer.empty())break;}else if (n == 0){return;}else{// 如果错误码表示资源暂时不可用(非阻塞模式下缓冲区不可写),跳出循环if (errno == EWOULDBLOCK)break;// 如果错误码表示被中断,继续尝试发送else if (errno == EINTR)continue;else{// 其他错误情况,调用异常回调处理connection->_except_cb(connection);return;}}}// 根据输出缓冲区是否为空,决定是否启用写事件(更新epoll关注的事件)if (!outbuffer.empty()){EnableEvent(sock, true, true);}else{EnableEvent(sock, true, false);}}// 处理连接异常情况的回调函数,从epoll实例中移除对该连接的关注,关闭套接字,删除连接管理中的记录void Excepter(std::weak_ptr<Connection> conn){if (conn.expired()){return;}auto connection = conn.lock();int sock = connection->SockFd();_epoller_ptr->EpollerUpdate(EPOLL_CTL_DEL, sock, 0);close(sock);_connections.erase(sock);}// 根据传入参数启用或禁用指定套接字对应的读、写事件(通过更新epoll实例关注的事件来实现)void EnableEvent(int sock, bool readable, bool writeable){uint32_t events = 0;events |= ((readable? EPOLLIN : 0) | (writeable? EPOLLOUT : 0) | EPOLLET);_epoller_ptr->EpollerUpdate(EPOLL_CTL_MOD, sock, events);}// 判断给定的文件描述符对应的连接是否在服务器管理的连接容器中,用于检查连接是否合法存在bool IsConnectionSafe(int fd){auto iter = _connections.find(fd);if (iter == _connections.end()){return false;}return true;}// 事件分发函数,调用epoll_wait等待事件发生,根据事件类型调用相应的回调函数处理void Dispatcher(int timeout){int n = _epoller_ptr->EpollerWait(_revs, num, timeout);for (int i = 0; i < n; i++){uint32_t events = _revs[i].events;int sock = _revs[i].data.fd;// 如果有异常事件,转换为读写事件一并处理if (events & EPOLLERR)events |= (EPOLLIN | EPOLLOUT);if (events & EPOLLHUP)events |= (EPOLLIN | EPOLLOUT);if ((events & EPOLLIN) && IsConnectionSafe(sock)){if (_connections[sock]->_recv_cb){_connections[sock]->_recv_cb(_connections[sock]);}}if ((events & EPOLLOUT) && IsConnectionSafe(sock)){if (_connections[sock]->_send_cb){_connections[sock]->_send_cb(_connections[sock]);}}}}// 服务器主循环函数,设置服务器运行状态为非退出,不断调用Dispatcher进行事件分发处理void Loop(){_quit = false;while (!_quit){Dispatcher(-1);}_quit = true;}~TcpServer(){if (_listensock_ptr->Fd() >= 0)_listensock_ptr->Close();}private:// 指向Epoller对象的智能指针,用于管理epoll相关操作(如创建、更新、等待事件等)std::shared_ptr<Epoller> _epoller_ptr;// 指向Sock对象的智能指针,用于管理套接字相关操作(创建、绑定、监听等)std::shared_ptr<Sock> _listensock_ptr;// 无序映射容器,用于管理所有连接,以文件描述符为键,对应的Connection对象智能指针为值std::unordered_map<int, std::shared_ptr<Connection>> _connections;// 用于存储epoll_wait返回的事件信息的数组struct epoll_event _revs[num];uint16_t _port;bool _quit;func_t _OnMessage;
};
在 Accepter
获取到套接字后,将其添加到 Dispatcher
中时,仅添加了 EPOLLIN
(可读事件)以及 EPOLLET
( 边缘触发模式)这两个事件设置,这意味着让 epol
l只关心该套接字的读事件。 之所以做这样的设置,原因在于获取套接字这个阶段并没有需要发送的数据,所以不存在对写事件的需求,也就没必要让epoll去关注写事件了。
通常而言,读事件在这种情况下是常规会被设置的,以便能及时知晓是否有数据可读。而写事件与之不同,它遵循按需打开的原则,只有当有实际的数据需要向对应的套接字进行发送时,才会将写事件打开,并且在待发送的数据全部成功写入完毕后,又会立刻把写事件关闭,以此来更高效、精准地利用系统资源以及管理I/O事件。
在ET模式下,其通知机制具有特殊性,只有当数据发生变化时,才会向上层发出读写通知。鉴于此特性,为避免读写的数据出现丢失的情况,在进行读写操作时,需要尽可能一次性完成相应操作。 同时,若文件描述符处于阻塞状态,当遇到暂时无法继续读写(比如读时无数据可读、写时缓冲区已满等情况),就容易导致操作阻塞,进而使服务器卡死,无法正常运行其他业务逻辑。所以,为防止这类因阻塞而引发服务器卡死的情况出现,通常会将涉及读写操作的文件描述符设置为非阻塞状态。
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
void SetNonBlock(int fd)
{int fl = fcntl(fd, F_GETFD);if (fl < 0){std::cout << "fcntl error..." << std::endl;return;}fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}
IO系统调用函数存在一种特例情况,即当它出错返回且错误码被设置为 EINTR
时,意味着本次数据读取或数据写入操作在陷入内核后、尚未返回用户态之时,被信号中断了,也就是内核去处理其他信号了。而内核态在返回用户态之前,会检查未决信号集(也就是信号的 pending
位图),若其中存在未处理的信号,内核便会对该信号予以处理。所以为了防止信号对读写操作的干扰,我们需要对这种情况进行特判。
4. 测试
最后我们可以根据上层的具体需求提供特定的 _OnMessage
函数,我们这里就实现一个简单回声功能。
#include "TcpServer.hpp"
#include "Calculator.hpp"
#include <functional>
#include <memory>
void DefaultOnMessage(std::weak_ptr<Connection> conn)
{if (conn.expired())return;auto connection_ptr = conn.lock();Calculator calculator;std::string response_str = "echo $: ";std::cout << "上层得到了数据: " << connection_ptr->Inbuffer();response_str+=connection_ptr->Inbuffer();// response_str 发送出去connection_ptr->AppendOutBuffer(response_str);auto tcpserver = connection_ptr->_tcp_server_ptr.lock();tcpserver->Sender(connection_ptr);
}
int main(int argc, char *argv[])
{if (argc != 2){Usage(argv[0]);exit(1);}uint16_t port = std::stoi(argv[1]);std::shared_ptr<TcpServer> Reactor(new TcpServer(port, DefaultOnMessage));Reactor->InitServer();Reactor->Loop();return 0;
}