【计网】实现reactor反应堆模型 --- 处理数据发回问题 ,异常处理问题
没有一颗星,
会因为追求梦想而受伤,
当你真心渴望某样东西时,
整个宇宙都会来帮忙。
--- 保罗・戈埃罗 《牧羊少年奇幻之旅》---
实现Reactor反应堆模型
- 1 数据处理
- 2 数据发回问题
- 3 异常处理问题
- 4 运行效果
1 数据处理
在上一篇文章中我们搭建起了Reactor反应堆模型的基础框架,可以实现对数据接受。那么接下来就需要对数据进行处理之后发回。
数据处理方面,需要使用到协议,我这里采取使用JSON串的形式,将之前网络计算器项目的协议结构直接拿过来使用。
【计网】从零开始掌握序列化 — 实现网络计算器项目
在HandlerConnection
模块中connection
连接的缓冲区中会接受到新的数据。接收到数据之后HandlerConnection
模块的工作就完成了,下面需要进入协议解析模块进行处理。直接调用HandlerConnection
模块的回调函数_process
,进入协议解析模块。
协议解析模块的逻辑很简单:
- 判断缓冲区中是否有完整报文
- 对完整报文进行协议解析
- 对解析出来的数据进行处理,得到应答报文
- 将应答报文发送回去
#pragma once#include "Connection.hpp"
#include "InetAddr.hpp"
#include "Protocol.hpp"
#include "Log.hpp"
#include "NetCal.hpp"using namespace log_ns;class PackageParse
{
public:void Execute(Connection *conn){LOG(INFO, "service start!!!\n");while (true){// 1.报文解析std::string str = Decode(conn->Inbuffer()); // 通过去报头获取报文// std::cout << "str: " << str << std::endl;// 连接当前没有完整的报文! --- 直接退出if (str.empty())break;// 到这里说明有完整的报文!!!auto req = Factory::BuildRequestDefault();// 2.反序列化 得到 Requestreq->Deserialize(str);// auto res = Factory::BuildResponseDefault();// 3.业务处理auto res = cal.Calculator(req);// 4.进行序列化处理std::string ret;res->Serialize(&ret);std::cout << "ret: " << ret << std::endl;// 5.加入报头std::string package = Encode(ret);//std::cout << "package: \n"<< package << std::endl;// 6.发回数据// 直接进行发送 , 怎么知道写入条件不满足呢? 通过错误码errno是EAGAIN即可。conn->AppendOutbuffer(package);}// 到了这里 说明至少处理了一个请求 只是有一个应答// 进行发回数据if (!conn->Outbuffer().empty())conn->_handler_sender(conn); // 方法1:直接发回数据// 方法2:将写事件设置进入EPOLL就可以了 会自动进行读取}~PackageParse(){}private:NetCal cal; // 计算器
};
这里处理结束后,要将数据发回,但是我们还没有实现数据发回的逻辑,接下来我们来分析一下发回数据要怎么处理
2 数据发回问题
对于多进程与多线程的情况下,write更加简单,有多少发多少,直接进行阻塞式写入。
但是对于多路转接来说,write比较复杂:
- 当我们获得一个新的的fd时, 输入输出缓冲区默认都是空的
- 读事件就绪:本质就是输入缓冲区有了数据,有了新连接。
- 写事件就绪:不关心数据是什么,而是关心发送缓冲区中有没有空间,如果有空间,发送条件就是就绪的,否则不满足。
- 把一个
sockfd
托管给select poll epoll
,原因sockfd
上事件没有就绪,还是事件就绪了?当然是不就绪的时候托管给EPOLL。 - 默认sockfd新建的情况下,读事件不是就绪的,因为输入缓冲区没有数据,所以读事件要常添加到epoll中托管
- 默认sockfd新建的情况下,写事件是就绪的,因为输出缓冲区没有数据,所以写事件默认是直接写的
- 所以 只有当写入条件不满足时,我们才按需开启对sockfd的EPOLLOUT事件进行托管, 一直写,到缓冲区写满时数据还没有发完,就需要开启对写事件的关心!
- 对于写来说,当写入时出现条件不满足的情况时,后续剩余的数据,EPOLL会自动进行发送!
- 如果直接对一个sockfd设置EPOLLOUT关心,epoll就会大量的就绪,因为输出缓冲区不会第一时间写满!
未来如果发完了,对于EPOLLOUT事件的关心,就要被关闭
如果缓冲区没写满,数据也发完了 ,就不需要开启写事件关心
如果我们设置了对EPOLLOUT的关心,EPOLL对EPOLLOUT首次设置关心的时候默认会就绪一次!
这是根据以上准则整理出的代码:
void Sender(Connection *conn){errno = 0;//进行发送while(true){//进行发送数据ssize_t n = ::send(conn->Sockfd() ,conn->Outbuffer().c_str() , conn->Outbuffer().size() , 0);//发送成功if(n > 0){//向将读取成功的数据从缓冲区删除conn->DiscardOutbuffer(n);//判断是否读取完if(conn->Outbuffer().empty()) {break ;}}else if(n == 0){break ;}else{//通过errno判断错误类型if(errno == EWOULDBLOCK){//说明输出缓冲区满了break;}else if(errno == EINTR){//信号中断 继续发送continue;}else{//真的出错了LOG(ERROR , "send error , errno:%d\n" ,errno);//进入异常处理conn->_handler_excepter(conn);return ;}}}//到这里说明是写入条件不满足了if(!conn->Outbuffer().empty()){//EPOLL进行托管//开启对写事件的关心//LOG(DEBUG , "----------\n");conn->GetReactor()->EnableConnectionReadWrite(conn->Sockfd() , true , true); //发送完了呢?}else{//将写事件关闭conn->GetReactor()->EnableConnectionReadWrite(conn->Sockfd(), true , false);}}
这样我们就可以成功的将数据发回,并在发回条件不满足时,将写事件托管给EPOLL进行自动发送!发完之后需要将EPOLLOUT事件撤销。
这样写入的问题就解决了
3 异常处理问题
在写入和读取数据的过程中,所有的异常我们都是交给异常处理方法进行解决。而所以的异常,最终的都是要将连接中断,文件描述符关闭,解除EPOLL托管。
void Excepter(Connection *conn){//整个代码所以的逻辑异常 都在这里处理//删除连接conn->GetReactor()->DelConnection(conn->Sockfd());}
去除连接的代码在Reactor中:
void DelConnection(int sockfd){// 安全检测if (!IsConnExist(sockfd))return;LOG(INFO, "sockfd: %d quit , 服务器释放资源\n", sockfd);// 在内核中移除sockfd关心EnableConnectionReadWrite(sockfd, false, false);_epoller->DelEvent(sockfd);// Socketfd要进行关闭_conn[sockfd]->Close();// 在Reactor中移除Connection的关心delete _conn[sockfd];_conn.erase(sockfd);}
经过这个处理,出现异常的连接,就直接别删除了。
4 运行效果
截止目前为止,我们已经实现了:
- 通过Reactor托管Listener获取新连接
- EPOLL对新连接的读事件进行托管,获取数据
- 得到数据之后可以进行上层的协议解析与业务处理
- 数据处理之后,可以进行发回数据,发回条件不满足时,可以将写事件托管给Reactor进行自动处理
来看效果:
效果非常可以了!
下一篇文章我们来解决如何加入多线程与多进程,提高效率!