当前位置: 首页 > news >正文

【计网】实现reactor反应堆模型 --- 处理数据发回问题 ,异常处理问题

在这里插入图片描述

没有一颗星,
会因为追求梦想而受伤,
当你真心渴望某样东西时,
整个宇宙都会来帮忙。
--- 保罗・戈埃罗 《牧羊少年奇幻之旅》---

实现Reactor反应堆模型

  • 1 数据处理
  • 2 数据发回问题
  • 3 异常处理问题
  • 4 运行效果

1 数据处理

在上一篇文章中我们搭建起了Reactor反应堆模型的基础框架,可以实现对数据接受。那么接下来就需要对数据进行处理之后发回。
在这里插入图片描述

数据处理方面,需要使用到协议,我这里采取使用JSON串的形式,将之前网络计算器项目的协议结构直接拿过来使用。
【计网】从零开始掌握序列化 — 实现网络计算器项目

HandlerConnection模块中connection连接的缓冲区中会接受到新的数据。接收到数据之后HandlerConnection模块的工作就完成了,下面需要进入协议解析模块进行处理。直接调用HandlerConnection模块的回调函数_process,进入协议解析模块。

协议解析模块的逻辑很简单:

  1. 判断缓冲区中是否有完整报文
  2. 对完整报文进行协议解析
  3. 对解析出来的数据进行处理,得到应答报文
  4. 将应答报文发送回去
#pragma once#include "Connection.hpp"
#include "InetAddr.hpp"
#include "Protocol.hpp"
#include "Log.hpp"
#include "NetCal.hpp"using namespace log_ns;class PackageParse
{
public:void Execute(Connection *conn){LOG(INFO, "service start!!!\n");while (true){// 1.报文解析std::string str = Decode(conn->Inbuffer()); // 通过去报头获取报文// std::cout << "str: " << str << std::endl;// 连接当前没有完整的报文! --- 直接退出if (str.empty())break;// 到这里说明有完整的报文!!!auto req = Factory::BuildRequestDefault();// 2.反序列化 得到 Requestreq->Deserialize(str);// auto res = Factory::BuildResponseDefault();// 3.业务处理auto res = cal.Calculator(req);// 4.进行序列化处理std::string ret;res->Serialize(&ret);std::cout << "ret: " << ret << std::endl;// 5.加入报头std::string package = Encode(ret);//std::cout << "package: \n"<< package << std::endl;// 6.发回数据// 直接进行发送 , 怎么知道写入条件不满足呢? 通过错误码errno是EAGAIN即可。conn->AppendOutbuffer(package);}// 到了这里 说明至少处理了一个请求 只是有一个应答// 进行发回数据if (!conn->Outbuffer().empty())conn->_handler_sender(conn); // 方法1:直接发回数据// 方法2:将写事件设置进入EPOLL就可以了 会自动进行读取}~PackageParse(){}private:NetCal cal; // 计算器
};

这里处理结束后,要将数据发回,但是我们还没有实现数据发回的逻辑,接下来我们来分析一下发回数据要怎么处理

2 数据发回问题

对于多进程与多线程的情况下,write更加简单,有多少发多少,直接进行阻塞式写入

但是对于多路转接来说,write比较复杂:

  • 当我们获得一个新的的fd时, 输入输出缓冲区默认都是空的
  • 读事件就绪:本质就是输入缓冲区有了数据,有了新连接。
  • 写事件就绪:不关心数据是什么,而是关心发送缓冲区中有没有空间,如果有空间,发送条件就是就绪的,否则不满足。
  • 把一个sockfd 托管给 select poll epoll,原因sockfd上事件没有就绪,还是事件就绪了?当然是不就绪的时候托管给EPOLL
  • 默认sockfd新建的情况下,读事件不是就绪的,因为输入缓冲区没有数据,所以读事件要常添加到epoll中托管
  • 默认sockfd新建的情况下,写事件是就绪的,因为输出缓冲区没有数据,所以写事件默认是直接写的
  • 所以 只有当写入条件不满足时,我们才按需开启对sockfd的EPOLLOUT事件进行托管, 一直写,到缓冲区写满时数据还没有发完,就需要开启对写事件的关心!
  • 对于写来说,当写入时出现条件不满足的情况时,后续剩余的数据,EPOLL会自动进行发送!
  • 如果直接对一个sockfd设置EPOLLOUT关心,epoll就会大量的就绪,因为输出缓冲区不会第一时间写满!
    未来如果发完了,对于EPOLLOUT事件的关心,就要被关闭
    如果缓冲区没写满,数据也发完了 ,就不需要开启写事件关心
    如果我们设置了对EPOLLOUT的关心,EPOLL对EPOLLOUT首次设置关心的时候默认会就绪一次!

这是根据以上准则整理出的代码:

void Sender(Connection *conn){errno = 0;//进行发送while(true){//进行发送数据ssize_t n = ::send(conn->Sockfd() ,conn->Outbuffer().c_str() , conn->Outbuffer().size() , 0);//发送成功if(n > 0){//向将读取成功的数据从缓冲区删除conn->DiscardOutbuffer(n);//判断是否读取完if(conn->Outbuffer().empty()) {break ;}}else if(n == 0){break ;}else{//通过errno判断错误类型if(errno == EWOULDBLOCK){//说明输出缓冲区满了break;}else if(errno == EINTR){//信号中断 继续发送continue;}else{//真的出错了LOG(ERROR , "send error , errno:%d\n" ,errno);//进入异常处理conn->_handler_excepter(conn);return ;}}}//到这里说明是写入条件不满足了if(!conn->Outbuffer().empty()){//EPOLL进行托管//开启对写事件的关心//LOG(DEBUG , "----------\n");conn->GetReactor()->EnableConnectionReadWrite(conn->Sockfd() , true , true); //发送完了呢?}else{//将写事件关闭conn->GetReactor()->EnableConnectionReadWrite(conn->Sockfd(), true , false);}}

这样我们就可以成功的将数据发回,并在发回条件不满足时,将写事件托管给EPOLL进行自动发送!发完之后需要将EPOLLOUT事件撤销。

这样写入的问题就解决了

3 异常处理问题

在写入和读取数据的过程中,所有的异常我们都是交给异常处理方法进行解决。而所以的异常,最终的都是要将连接中断,文件描述符关闭,解除EPOLL托管。

    void Excepter(Connection *conn){//整个代码所以的逻辑异常 都在这里处理//删除连接conn->GetReactor()->DelConnection(conn->Sockfd());}

去除连接的代码在Reactor中:

	void DelConnection(int sockfd){// 安全检测if (!IsConnExist(sockfd))return;LOG(INFO, "sockfd: %d quit , 服务器释放资源\n", sockfd);// 在内核中移除sockfd关心EnableConnectionReadWrite(sockfd, false, false);_epoller->DelEvent(sockfd);// Socketfd要进行关闭_conn[sockfd]->Close();// 在Reactor中移除Connection的关心delete _conn[sockfd];_conn.erase(sockfd);}

经过这个处理,出现异常的连接,就直接别删除了。

4 运行效果

截止目前为止,我们已经实现了:

  1. 通过Reactor托管Listener获取新连接
  2. EPOLL对新连接的读事件进行托管,获取数据
  3. 得到数据之后可以进行上层的协议解析与业务处理
  4. 数据处理之后,可以进行发回数据,发回条件不满足时,可以将写事件托管给Reactor进行自动处理

来看效果:
在这里插入图片描述

效果非常可以了!

下一篇文章我们来解决如何加入多线程与多进程,提高效率!


http://www.mrgr.cn/news/69627.html

相关文章:

  • 数据结构——快速排序
  • 世界坐标系、相机坐标系、图像物理坐标系、像素平面坐标系
  • Mac的极速文件搜索工具,高效管理文件
  • 自动化爬虫DrissionPage
  • 制作图片木马
  • opencv常用api
  • 【Linux】【线程操作与同步】汇总整理
  • 鸿蒙next版开发:ArkTS组件通用属性(图形变换)
  • AndroidStudio-视图基础
  • 链表的使用
  • 《Agent 工作流 2025》
  • AI与OCR:数字档案馆图像扫描与文字识别技术实现与项目案例
  • 01_docker安装
  • STM32各模块
  • Elasticsearch 实战应用:高效搜索与数据分析
  • 网络编程中非阻塞的实现方式
  • 540. 有序数组中的单一元素
  • SimpleMemory 博客园主题美化
  • 如何自己实现事件的订阅和发布呢?
  • 基于SpringBoot+Vue音乐播放和推荐系统【提供源码+答辩PPT+参考文档+项目部署】
  • PostgreSQL 用户登录失败账号锁定
  • 基于SpringBoot的“生鲜交易系统”的设计与实现(源码+数据库+文档+PPT)
  • numpy np.logical_not函数介绍
  • LLMs在供应链投毒检测中的应用
  • Python中的动态属性管理:使用`__getattr__`和`__setattr__`实现灵活的数据访问
  • 文章解读与仿真程序复现思路——电网技术EI\CSCD\北大核心《基于数据-模型混合驱动方法的多类型移动应急资源优化调度策略 》