爆款云主机2核4G限时秒杀,88元/年起!
查看详情

活动

天翼云最新优惠活动,涵盖免费试用,产品折扣等,助您降本增效!
热门活动
  • 618智算钜惠季 爆款云主机2核4G限时秒杀,88元/年起!
  • 免费体验DeepSeek,上天翼云息壤 NEW 新老用户均可免费体验2500万Tokens,限时两周
  • 云上钜惠 HOT 爆款云主机全场特惠,更有万元锦鲤券等你来领!
  • 算力套餐 HOT 让算力触手可及
  • 天翼云脑AOne NEW 连接、保护、办公,All-in-One!
  • 中小企业应用上云专场 产品组合下单即享折上9折起,助力企业快速上云
  • 息壤高校钜惠活动 NEW 天翼云息壤杯高校AI大赛,数款产品享受线上订购超值特惠
  • 天翼云电脑专场 HOT 移动办公新选择,爆款4核8G畅享1年3.5折起,快来抢购!
  • 天翼云奖励推广计划 加入成为云推官,推荐新用户注册下单得现金奖励
免费活动
  • 免费试用中心 HOT 多款云产品免费试用,快来开启云上之旅
  • 天翼云用户体验官 NEW 您的洞察,重塑科技边界

智算服务

打造统一的产品能力,实现算网调度、训练推理、技术架构、资源管理一体化智算服务
智算云(DeepSeek专区)
科研助手
  • 算力商城
  • 应用商城
  • 开发机
  • 并行计算
算力互联调度平台
  • 应用市场
  • 算力市场
  • 算力调度推荐
一站式智算服务平台
  • 模型广场
  • 体验中心
  • 服务接入
智算一体机
  • 智算一体机
大模型
  • DeepSeek-R1-昇腾版(671B)
  • DeepSeek-R1-英伟达版(671B)
  • DeepSeek-V3-昇腾版(671B)
  • DeepSeek-R1-Distill-Llama-70B
  • DeepSeek-R1-Distill-Qwen-32B
  • Qwen2-72B-Instruct
  • StableDiffusion-V2.1
  • TeleChat-12B

应用商城

天翼云精选行业优秀合作伙伴及千余款商品,提供一站式云上应用服务
进入甄选商城进入云市场创新解决方案
办公协同
  • WPS云文档
  • 安全邮箱
  • EMM手机管家
  • 智能商业平台
财务管理
  • 工资条
  • 税务风控云
企业应用
  • 翼信息化运维服务
  • 翼视频云归档解决方案
工业能源
  • 智慧工厂_生产流程管理解决方案
  • 智慧工地
建站工具
  • SSL证书
  • 新域名服务
网络工具
  • 翼云加速
灾备迁移
  • 云管家2.0
  • 翼备份
资源管理
  • 全栈混合云敏捷版(软件)
  • 全栈混合云敏捷版(一体机)
行业应用
  • 翼电子教室
  • 翼智慧显示一体化解决方案

合作伙伴

天翼云携手合作伙伴,共创云上生态,合作共赢
天翼云生态合作中心
  • 天翼云生态合作中心
天翼云渠道合作伙伴
  • 天翼云代理渠道合作伙伴
天翼云服务合作伙伴
  • 天翼云集成商交付能力认证
天翼云应用合作伙伴
  • 天翼云云市场合作伙伴
  • 天翼云甄选商城合作伙伴
天翼云技术合作伙伴
  • 天翼云OpenAPI中心
  • 天翼云EasyCoding平台
天翼云培训认证
  • 天翼云学堂
  • 天翼云市场商学院
天翼云合作计划
  • 云汇计划
天翼云东升计划
  • 适配中心
  • 东升计划
  • 适配互认证

开发者

开发者相关功能入口汇聚
技术社区
  • 专栏文章
  • 互动问答
  • 技术视频
资源与工具
  • OpenAPI中心
开放能力
  • EasyCoding敏捷开发平台
培训与认证
  • 天翼云学堂
  • 天翼云认证
魔乐社区
  • 魔乐社区

支持与服务

为您提供全方位支持与服务,全流程技术保障,助您轻松上云,安全无忧
文档与工具
  • 文档中心
  • 新手上云
  • 自助服务
  • OpenAPI中心
定价
  • 价格计算器
  • 定价策略
基础服务
  • 售前咨询
  • 在线支持
  • 在线支持
  • 工单服务
  • 建议与反馈
  • 用户体验官
  • 服务保障
  • 客户公告
  • 会员中心
增值服务
  • 红心服务
  • 首保服务
  • 客户支持计划
  • 专家技术服务
  • 备案管家

了解天翼云

天翼云秉承央企使命,致力于成为数字经济主力军,投身科技强国伟大事业,为用户提供安全、普惠云服务
品牌介绍
  • 关于天翼云
  • 智算云
  • 天翼云4.0
  • 新闻资讯
  • 天翼云APP
基础设施
  • 全球基础设施
  • 信任中心
最佳实践
  • 精选案例
  • 超级探访
  • 云杂志
  • 分析师和白皮书
  • 天翼云·创新直播间
市场活动
  • 2025智能云生态大会
  • 2024智算云生态大会
  • 2023云生态大会
  • 2022云生态大会
  • 天翼云中国行
天翼云
  • 活动
  • 智算服务
  • 产品
  • 解决方案
  • 应用商城
  • 合作伙伴
  • 开发者
  • 支持与服务
  • 了解天翼云
      • 文档
      • 控制中心
      • 备案
      • 管理中心

      Reactor模式

      首页 知识中心 服务器 文章详情页

      Reactor模式

      2023-05-22 08:06:30 阅读次数:140

      linux,Reactor,服务器

       

      Reactor模式

      Reactor模式的定义

      Reactor反应器模式,也叫做分发者模式或通知者模式,是一种将就绪事件派发给对应服务处理程序的事件设计模式。
      Reactor模式

      Reactor模式的角色构成

      Reactor主要由以下五个角色构成:

      角色 解释
      Handle(句柄) 用于标识不同的事件,本质就是一个文件描述符。
      Sychronous Event Demultiplexer(同步事件分离器) 本质就是一个系统调用,用于等待事件的发生。对于Linux来说,同步事件分离器指的就是I/O多路复用,比如select、poll、epoll等。
      Event Handler(事件处理器) 由多个回调方法构成,这些回调方法构成了与应用相关的对于某个事件的处理反馈。
      Concrete Event Handler(具体事件处理器) 事件处理器中各个回调方法的具体实现。
      Initiation Dispatcher(初始分发器) 初始分发器实际上就是Reactor角色,初始分发器会通过同步事件分离器来等待事件的发生,当对应事件就绪时就调用事件处理器,最后调用对应的回调方法来处理这个事件。

      Reactor模式的工作流程

      Reactor模式的工作流程如下:

      1. 当应用向初始分发器注册具体事件处理器时,应用会标识出该事件处理器希望初始分发器在某个事件发生时向其通知,该事件与Handle关联。
      2. 初始分发器会要求每个事件处理器向其传递内部的Handle,该Handle向操作系统标识了事件处理器。
      3. 当所有的事件处理器注册完毕后,应用会启动初始分发器的事件循环,这时初始分发器会将每个事件处理器的Handle合并起来,并使用同步事件分离器等待这些事件的发生。
      4. 当某个事件处理器的Handle变为Ready状态时,同步事件分离器会通知初始分发器。
      5. 初始分发器会将Ready状态的Handle作为key,来寻找其对应的事件处理器。
      6. 初始分发器会调用其对应事件处理器当中对应的回调方法来响应该事件。

      epoll ET服务器(Reactor模式)

      如果在此之前没有了解过Reactor模式,相信在看了Reactor模式的工作流程后一定是一头雾水,下面我们实现一个Reactor模式下的epoll ET服务器,来感受一下Reactor模式。

      设计思路

      epoll ET服务器

      在epoll ET服务器中,我们需要处理如下几种事件:

      • 读事件:如果是监听套接字的读事件就绪则调用accept函数获取底层的连接,如果是其他套接字的读事件就绪则调用recv函数读取客户端发来的数据。
      • 写事件:写事件就绪则将待发送的数据写入到发送缓冲区当中。
      • 异常事件:当某个套接字的异常事件就绪时我们不做过多处理,直接关闭该套接字。

      当epoll ET服务器监测到某一事件就绪后,就会将该事件交给对应的服务处理程序进行处理。

      Reactor模式的五个角色

      在这个epoll ET服务器中,Reactor模式中的五个角色对应如下:

      • 句柄:文件描述符。
      • 同步事件分离器:I/O多路复用epoll。
      • 事件处理器:包括读回调、写回调和异常回调。
      • 具体事件处理器:读回调、写回调和异常回调的具体实现。
      • 初始分发器:Reactor类当中的Dispatcher函数。

      Dispatcher函数要做的就是调用epoll_wait函数等待事件发生,当有事件发生后就将就绪的事件派发给对应的服务处理程序即可。

      EventItem类

      • 在Reactor的工作流程中说到,在注册事件处理器时需要将其与Handle关联,本质上就是需要将读回调、写回调和异常回调与某个文件描述符关联起来。
      • 这样做的目的就是为了当某个文件描述符上的事件就绪时可以找到其对应的各种回调函数,进而执行对应的回调方法来处理该事件。

      所以我们可以设计一个EventItem类,该类当中的成员就包括一个文件描述符,以及该文件描述符对应的各种回调函数,此外还有一些其他成员,后面实现的时候再做详细论述。

      Reactor类

      • 在Reactor的工作流程中说到,当所有事件处理器注册完毕后,会使用同步事件分离器等待这些事件发生,当某个事件处理器的Handle变为Ready状态时,同步事件分离器会通知初始分发器,然后初始分发器会将Ready状态的Handle作为key来寻找其对应的事件处理器,并调用该事件处理器中对应的回调方法来响应该事件。
      • 本质就是当事件注册完毕后,会调用epoll_wait函数来等待这些事件发生,当某个事件就绪时epoll_wait函数会告知调用方,然后调用方就根据就绪的文件描述符来找到其对应的各种回调函数,并调用对应的回调函数进行事件处理。

      对此我们可以设计一个Reactor类。

      • 该类当中有一个成员函数叫做Dispatcher,这个函数就是所谓的初始分发器,在该函数内部会调用epoll_wait函数等待事件的发生,当事件发生后会告知Dispatcher已经就绪的事件。
      • 当事件就绪后需要根据就绪的文件描述符来找到其对应的各种回调函数,由于我们会将每个文件描述符及其对应的各种回调都封装到一个EventItem结构当中,所以实际我们就是需要根据文件描述符找到其对应的EventItem结构。
      • 我们可以使用C++ STL当中的unordered_map,来建立各个文件描述符与其对应的EventItem结构之间的映射,这个unordered_map可以作为Reactor类的一个成员变量,当需要找某个文件描述符的EventItem结构时就可以通过该成员变量找到。
      • 当然,Reactor类当中还需要提供成员函数AddEvent和DelEvent,用于向Dispatcher当中注册和删除事件。

      此外,在Reactor类当中还有一些其他成员,后面实现的时候再做详细论述。

      epoll ET服务器的工作流程

      这个epoll ET服务器在Reactor模式下的工作流程如下:

      • 首先epoll ET服务器需要进行套接字的创建、绑定和监听。
      • 然后定义一个Reactor对象并初始化,初始化时要做的就是创建epoll模型。
      • 紧接着需要为监听套接字创建对应的EventItem结构,并调用Reactor类中提供的AddEvent函数将监听套接字添加到epoll模型中,并建立监听套接字与其对应的EventItem结构之间的映射关系。
      • 之后就可以不断调用Reactor类中的Dispatcher函数进行事件派发。

      在事件处理过程中,会不断向Dispatcher当中新增或删除事件,而每个事件就绪时都会自动调用其对应的回调函数进行处理,所以我们要做的就是不断调用Dispatcher函数进行事件派发即可。

      EventItem结构

      EventItem结构中除了包含文件描述符和其对应的读回调、写回调和异常回调之外,还包含一个输入缓冲区inbuffer、一个输出缓冲区outbuffer以及一个回指指针R。

      • 当某个文件描述符的读事件就绪时,我们会调用recv函数读取客户端发来的数据,但我们并不能保证我们读取到了一个完整的报文,因此需要将读取到的数据暂时存放到该文件描述符对应的inbuffer当中,当inbuffer当中可以分离出一个完整的报文后再将其分离出来进行数据处理,这里的inbuffer本质就是用来解决粘包问题的。
      • 当处理完一个报文请求后,需要将响应数据发送给客户端,但我们并不能保证底层TCP的发送缓冲区中有足够的空间供我们写入,因此需要将要发送的数据暂时存放到该文件描述符对应的outbuffer当中,当底层TCP的发送缓冲区中有空间,即写事件就绪时,再依次发送outbuffer当中的数据。
      • EventItem结构当中设置回指指针R,便于快速找到我们定义的Reactor对象,因为后续我们需要根据EventItem结构找到这个Reactor对象。比如当连接事件就绪时,需要调用Reactor类当中的AddEvent函数将其添加到Dispatcher当中。

      此外,EventItem结构当中需要提供一个管理回调的成员函数,便于外部对EventItem结构当中的各种回调进行设置。

      代码如下:

      typedef int(*callback_t)(EventItem*);
      
      class EventItem{
      public:
      	int _sock; //文件描述符
      	Reactor* _R; //回指指针
      
      	callback_t _recv_handler; //读回调
      	callback_t _send_handler; //写回调
      	callback_t _error_handler; //异常回调
      
      	std::string _inbuffer; //输入缓冲区
      	std::string _outbuffer; //输出缓冲区
      public:
      	EventItem()
      		: _sock(-1)
      		, _R(nullptr)
      		, _recv_handler(nullptr)
      		, _send_handler(nullptr)
      		, _error_handler(nullptr)
      	{}
      	//管理回调
      	void ManageCallbacks(callback_t recv_handler, callback_t send_handler, callback_t error_handler)
      	{
      		_recv_handler = recv_handler;
      		_send_handler = send_handler;
      		_error_handler = error_handler;
      	}
      	~EventItem()
      	{}
      };
      

      Reactor类

      在Reactor类当中有一个unordered_map成员,用于建立文件描述符和与其对应的EventItem结构之间的映射,还有一个epfd成员,该成员是epoll模型对应的文件描述符。

      • 在初始化Reactor对象的时候就可以调用epoll_create函数创建epoll模型,并将该epoll模型对应的文件描述符用epfd成员记录下来,便于后续使用。
      • Reactor对象在析构的时候,需要调用close函数将该epoll模型进行关闭。

      代码如下:

      #define SIZE 256
      
      class Reactor{
      private:
      	int _epfd; //epoll模型
      	std::unordered_map<int, EventItem> _event_items; //建立sock与EventItem结构的映射
      public:
      	Reactor()
      		: _epfd(-1)
      	{}
      	void InitReactor()
      	{
      		//创建epoll模型
      		_epfd = epoll_create(SIZE);
      		if (_epfd < 0){
      			std::cerr << "epoll_create error" << std::endl;
      			exit(5);
      		}
      	}
      	~Reactor()
      	{
      		if (_epfd >= 0){
      			close(_epfd);
      		}
      	}
      };
      

      Dispatcher函数(事件分派器)

      Reactor类当中的Dispatcher函数就是之前所说的初始分发器,这里我们更形象的将其称之为事件分派器。

      • 事件分派器要做的就是调用epoll_wait函数等待事件发生。
      • 当某个文件描述符上的事件发生后,先通过unordered_map找到该文件描述符对应的EventItem结构,然后调用EventItem结构当中对应的回调函数对该事件进行处理即可。

      代码如下:

      #define MAX_NUM 64
      
      class Reactor{
      private:
      	int _epfd; //epoll模型
      	std::unordered_map<int, EventItem> _event_items; //建立sock与EventItem结构的映射
      public:
      	//事件分派器
      	void Dispatcher(int timeout)
      	{
      		struct epoll_event revs[MAX_NUM];
      		int num = epoll_wait(_epfd, revs, MAX_NUM, timeout);
      		for (int i = 0; i < num; i++){
      			int sock = revs[i].data.fd; //就绪的文件描述符
      			if ((revs[i].events&EPOLLERR) || (revs[i].events&EPOLLHUP)){ //异常事件就绪(优先处理)
      				if (_event_items[sock]._error_handler)
      					_event_items[sock]._error_handler(&_event_items[sock]); //调用异常回调
      			}
      			if (revs[i].events&EPOLLIN){ //读事件就绪
      				if (_event_items[sock]._recv_handler)
      					_event_items[sock]._recv_handler(&_event_items[sock]); //调用读回调
      			}
      			if (revs[i].events&EPOLLOUT){ //写事件就绪
      				if (_event_items[sock]._send_handler)
      					_event_items[sock]._send_handler(&_event_items[sock]); //调用写回调
      			}
      		}
      	}
      };
      

      说明一下:

      • 这里没有用switch或if语句对epoll_wait函数的返回值进行判断,而是借用for循环对其返回值进行了判断。
      • 如果epoll_wait的返回值为-1则说明epoll_wait函数调用失败,此时不会进入到for循环内部进行事件处理。
      • 如果epoll_wait的返回值为0则说明epoll_wait函数超时返回,此时也不会进入到for循环内部进行事件处理。
      • 如果epoll_wait的返回值大于0则说明epoll_wait函数调用成功,此时才会进入到for循环内部调用对应的回调函数对事件进行处理。
      • 事件处理时最好先对异常事件进行处理,因此代码中将异常事件的判断放在了最前面。

      AddEvent函数

      Reactor类当中的AddEvent函数是用于进行事件注册的。

      • 在注册事件时需要传入一个文件描述符和一个事件集合,表示需要监视哪个文件描述符上的哪些事件。
      • 还需要传入该文件描述符对应的EventItem结构,表示当该文件描述符上的事件就绪后应该执行的回调方法。
      • 在AddEvent函数内部要做的就是,调用epoll_ctl函数将该文件描述符及其对应的事件集合注册到epoll模型当中,然后建立该文件描述符与其对应的EventItem结构的映射关系。

      代码如下:

      class Reactor{
      private:
      	int _epfd; //epoll模型
      	std::unordered_map<int, EventItem> _event_items; //建立sock与EventItem结构的映射
      public:
      	void AddEvent(int sock, uint32_t event, const EventItem& item)
      	{
      		struct epoll_event ev;
      		ev.data.fd = sock;
      		ev.events = event;
      		
      		if (epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &ev) < 0){ //将该文件描述符添加到epoll模型当中
      			std::cerr << "epoll_ctl add error, fd: " << sock << std::endl;
      		}
      		else{
      			//建立sock与EventItem结构的映射关系
      			_event_items.insert({ sock, item });
      			std::cout << "添加: " << sock << " 到epoll模型中,成功" << std::endl;
      		}
      	}
      };
      

      DelEvent函数

      Reactor类当中的DelEvent函数是用于进行事件删除的。

      • 在删除事件时只需要传入一个文件描述符即可。
      • 在DelEvent函数内部要做的就是,调用epoll_ctl函数将该文件描述符从epoll模型中删除,并取消该文件描述符与其对应的EventItem结构的映射关系。

      代码如下:

      class Reactor{
      private:
      	int _epfd; //epoll模型
      	std::unordered_map<int, EventItem> _event_items; //建立sock与EventItem结构的映射
      public:
      	void DelEvent(int sock)
      	{
      		if (epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr) < 0){ //将该文件描述符从epoll模型中删除
      			std::cerr << "epoll_ctl del error, fd: " << sock << std::endl;
      		}
      		else{
      			//取消sock与EventItem结构的映射关系
      			_event_items.erase(sock);
      			std::cout << "从epoll模型中删除: " << sock << ",成功" << std::endl;
      		}
      	}
      };
      

      EnableReadWrite函数

      Reactor类当中的EnableReadWrite函数,用于使能或使能某个文件描述符的读写事件。

      • 调用EnableReadWrite函数时需要传入一个文件描述符,表示需要设置的是哪个文件描述符对应的事件。
      • 还需要传入两个bool值,分别表示需要使能还是使能读写事件。
      • EnableReadWrite函数内部会调用epoll_ctl函数修改将该文件描述符的监听事件。

      代码如下:

      class Reactor{
      private:
      	int _epfd; //epoll模型
      	std::unordered_map<int, EventItem> _event_items; //建立sock与EventItem结构的映射
      public:
      	void EnableReadWrite(int sock, bool read, bool write){
      		struct epoll_event ev;
      		ev.data.fd = sock;
      		ev.events = (read ? EPOLLIN : 0) | (write ? EPOLLOUT : 0) | EPOLLET;
      		if (epoll_ctl(_epfd, EPOLL_CTL_MOD, sock, &ev) < 0){ //修改该文件描述符所需要监视的事件
      			std::cerr << "epoll_ctl mod error, fd: " << sock << std::endl;
      		}
      	}
      };
      

      回调函数

      下面我们就可以实现一些回调函数,这里主要实现四个回调函数。

      • accepter:当连接事件到来时可以调用该回调函数获取底层建立好的连接。
      • recver:当读事件就绪时可以调用该回调函数读取客户端发来的数据并进行处理。
      • sender:当写事件就绪时可以调用该回调函数向客户端发送响应数据。
      • errorer:当异常事件就绪时可以调用该函数将对应的文件描述符进行关闭。

      当我们为某个文件描述符创建EventItem结构时,就可以调用EventItem类提供的ManageCallbacks函数,将这些回调函数到EventItem结构当中。

      • 我们会将监听套接字对应的EventItem结构当中的recv_handler设置为accepter,因为监听套接字的读事件就绪就意味着连接事件就绪了,而监听套接字一般只关心读事件,因此监听套接字对应的send_handler和error_handler可以设置为nullptr。
      • 当Dispatcher监测到监听套接字的读事件就绪时,会调用监听套接字对应的EventItem结构当中的recv_handler回调,此时就会调用accepter回调获取底层建立好的连接。
      • 而对于与客户端建立连接的套接字,我们会将其对应的EventItem结构当中的recv_handler、send_handler和error_handler分别设置为这里的recver、sender和error。
      • 当Dispatcher监测到这些套接字的事件就绪时,就会调用其对应的EventItem结构当中对应的回调函数,也就是这里的recver、sender和error。

      accepter回调

      accepter回调用于处理连接事件,其工作流程如下:

      1. 调用accept函数获取底层建立好的连接。
      2. 将获取到的套接字设置为非阻塞,并为其创建EventItem结构,填充EventItem结构当中的各个字段,并注册该套接字相关的回调方法。
      3. 将该套接字及其对应需要关心的事件注册到Dispatcher当中。

      下一次Dispatcher在进行事件派发时就会帮我们关注该套接字对应的事件,当事件就绪时就会执行该套接字对应的EventItem结构中对应的回调方法。

      代码如下:

      int accepter(EventItem* item)
      {
      	while (true){
      		struct sockaddr_in peer;
      		memset(&peer, 0, sizeof(peer));
      		socklen_t len = sizeof(peer);
      		int sock = accept(item->_sock, (struct sockaddr*)&peer, &len);
      		if (sock < 0){
      			if (errno == EAGAIN || errno == EWOULDBLOCK){ //并没有读取出错,只是底层没有连接了
      				return 0;
      			}
      			else if (errno == EINTR){ //读取的过程被信号中断了
      				continue;
      			}
      			else{ //获取连接失败
      				std::cerr << "accept error" << std::endl;
      				return -1;
      			}
      		}
      		SetNonBlock(sock); //将该套接字设置为非阻塞
      		//构建EventItem结构
      		EventItem sock_item;
      		sock_item._sock = sock;
      		sock_item._R = item->_R;
      		sock_item.ManageCallbacks(recver, sender, errorer); //注册回调方法
      		
      		Reactor* R = item->_R;
      		R->AddEvent(sock, EPOLLIN | EPOLLET, sock_item); //将该套接字及其对应的事件注册到Dispatcher中
      	}
      	return 0;
      }
      

      需要注意的是,因为这里实现的ET模式下的epoll服务器,因此在获取底层连接时需要循环调用accept函数进行读取,并且监听套接字必须设置为非阻塞。

      • 因为ET模式下只有当底层建立的连接从无到有或是从有到多时才会通知上层,如果没有一次性将底层建立好的连接全部获取,并且此后再也没有建立好的连接,那么底层没有读取完的连接就相当于丢失了,所以需要循环多次调用accept函数获取底层建立好的连接。
      • 循环调用accept函数也就意味着,当底层连接全部被获取后再调用accept函数,此时就会因为底层已经没有连接了而被阻塞住,因此需要将监听套接字设置为非阻塞,这样当底层没有连接时accept就会返回,而不会被阻塞住。

      accept获取到的新的套接字也需要设置为非阻塞,就是为了避免将来循环调用recv、send等函数时被阻塞。

      设置文件描述符为非阻塞

      设置文件描述符为非阻塞时,需要先调用fcntl函数获取该文件描述符对应的文件状态标记,然后在该文件状态标记的基础上添加非阻塞标记O_NONBLOCK,最后调用fcntl函数对该文件描述符的状态标记进行设置即可。

      代码如下:

      //设置文件描述符为非阻塞
      bool SetNonBlock(int sock)
      {
      	int fl = fcntl(sock, F_GETFL);
      	if (fl < 0){
      		std::cerr << "fcntl error" << std::endl;
      		return false;
      	}
      	fcntl(sock, F_SETFL, fl | O_NONBLOCK);
      	return true;
      }
      

      监听套接字设置为非阻塞后,当底层连接不就绪时,accept函数会以出错的形式返回,因此当调用accept函数的返回值小于0时,需要继续判断错误码。

      • 如果错误码为EAGAIN或EWOULDBLOCK,说明本次出错返回是因为底层已经没有可获取的连接了,此时底层连接全部获取完毕,这时我们可以返回0,表示本次accepter调用成功。
      • 如果错误码为EINTR,说明本次调用accept函数获取底层连接时被信号中断了,这时还应该继续调用accept函数进行获取。
      • 除此之外,才说明accept函数是真正调用失败了,这时我们可以返回-1,表示本次accepter调用失败。

      accept、recv和send等IO系统调用为什么会被信号中断?

      IO系统调用函数出错返回并且将错误码设置为EINTR,表明本次在进行数据读取或数据写入之前被信号中断了,也就是说IO系统调用在陷入内核,但并没有返回用户态的时候内核跑去处理其他信号了。

      • 在内核态返回用户态之前会检查信号的pending位图,也就是未决信号集,如果pending位图中有未处理的信号,那么内核就会对该信号进行处理。
      • 但IO系统调用函数在进行IO操作之前就被信号中断了,这实际上是一个特例,因为IO过程分为“等”和“拷贝”两个步骤,而一般“等”的过程比较漫长,而在这个过程中我们的执行流其实是处于闲置的状态的,因此在“等”的过程中如果有信号产生,内核就会立即进行信号的处理。

      写事件是按需打开的

      这里调用accept获取上来的套接字在添加到Dispatcher中时,只添加了EOPLLIN和EPOLLET事件,也就是说只让epoll帮我们关心该套接字的读事件。

      • 这里之所以没有添加写事件,是因为当前我们并没有要发送的数据,因此没有必要让epoll帮我们关心写事件。
      • 一般读事件是经常会被设置的,而写事件则是按序打开的,只有当我们有数据要发送时才会将写事件打开,并且在数据全部写入完毕后又会立即将写事件关闭。

      recver回调

      recver回调用于处理读事件,其工作流程如下:

      1. 循环调用recv函数读取数据,并将读取到的数据添加到该套接字对应EventItem结构的inbuffer当中。
      2. 对inbuffer当中的数据进行切割,将完整的报文切割出来,剩余的留在inbuffer当中。
      3. 对切割出来的完整报文进行反序列化。
      4. 业务处理。
      5. 业务处理后形成响应报文。
      6. 将响应报头添加到对应EventItem结构的outbuffer当中,并打开写事件。

      下一次Dispatcher在进行事件派发时就会帮我们关注该套接字的写事件,当写事件就绪时就会执行该套接字对应的EventItem结构中写回调方法,进而将outbuffer中的响应数据发送给客户端。

      代码如下:

      int recver(EventItem* item)
      {
      	if (item->_sock < 0) //该文件描述符已经被关闭
      		return -1;
      	//1、数据读取
      	if (recver_helper(item->_sock, &(item->_inbuffer)) < 0){ //读取失败
      		item->_error_handler(item);
      		return -1;
      	}
      
      	//2、报文切割
      	std::vector<std::string> datagrams;
      	StringUtil::Split(item->_inbuffer, &datagrams, "X");
      
      	for (auto s : datagrams){
      		//3、反序列化
      		struct data d;
      		StringUtil::Deserialize(s, &d._x, &d._y, &d._op);
      
      		//4、业务处理
      		int result = 0;
      		switch (d._op)
      		{
      			case '+':
      				result = d._x + d._y;
      				break;
      			case '-':
      				result = d._x - d._y;
      				break;
      			case '*':
      				result = d._x * d._y;
      				break;
      			case '/':
      				if (d._y == 0){
      					std::cerr << "Error: div zero!" << std::endl;
      					continue; //继续处理下一个报文
      				}
      				else{
      					result = d._x / d._y;
      				}
      				break;
      			case '%':
      				if (d._y == 0){
      					std::cerr << "Error: mod zero!" << std::endl;
      					continue; //继续处理下一个报文
      				}
      				else{
      					result = d._x % d._y;
      				}
      				break;
      			default:
      				std::cerr << "operation error!" << std::endl;
      				continue; //继续处理下一个报文
      		}
      
      		//5、形成响应报文
      		std::string response;
      		response += std::to_string(d._x);
      		response += d._op;
      		response += std::to_string(d._y);
      		response += "=";
      		response += std::to_string(result);
      		response += "X"; //报文与报文之间的分隔符
      		
      		//6、将响应报文添加到outbuffer中
      		item->_outbuffer += response;
      		if (!item->_outbuffer.empty())
      			item->_R->EnableReadWrite(item->_sock, true, true); //打开写事件
      	}
      	return 0;
      }
      

      一、数据读取

      我们可以将循环调用recv函数读取数据的过程封装成一个recver_helper函数。

      • recver_helper函数要做的就是循环调用recv函数将读取到的数据添加到inbuffer当中。
      • 当recv函数的返回值小于0时同样需要进一步判断错误码,如果错误码为EAGAIN或EWOULDBLOCK则说明底层数据读取完毕了,如果错误码为EINTR则说明读取过程被信号中断了,此时还需要继续调用recv函数进行读取,否则就是读取出错了。
      • 当读取出错时直接调用该套接字对应的error_handler回调,最终就会调用到下面将要实现的errorer回调,在我们会在errorer回调当中将该套接字进行关闭。

      代码如下:

      int recver_helper(int sock, std::string* out)
      {
      	while (true){
      		char buffer[128];
      		ssize_t size = recv(sock, buffer, sizeof(buffer)-1, 0);
      		if (size < 0){
      			if (errno == EAGAIN || errno == EWOULDBLOCK){ //数据读取完毕
      				return 0;
      			}
      			else if (errno == EINTR){ //被信号中断,继续尝试读取
      				continue;
      			}
      			else{ //读取出错
      				return -1;
      			}
      		}
      		else if (size == 0){ //对端连接关闭
      			return -1;
      		}
      		//读取成功
      		buffer[size] = '\0';
      		*out += buffer; //将读取到的数据添加到该套接字对应EventItem结构的inbuffer中
      	}
      }
      

      二、报文切割

      报文切割本质就是为了防止粘包问题,而粘包问题实际是涉及到协议定制的。

      • 因为我们需要根据协议知道如何将各个报文进行分离,比如UDP分离报文采用的就是定长报头+自描述字段。
      • 我们的目的是演示整个数据处理的过程,为了简单起见就不进行过于复杂的协议定制了,这里我们就以“X”作为各个报文之间的分隔符,每个报文的最后都会以一个“X”作为报文结束的标志。
      • 因此现在要做的就是以“X”作为分隔符对inbuffer当中的字符串进行切割,这里将这个过程封装成一个Split函数并放到一个StringUtil工具类当中。
      • Split函数要做的就是对inbuffer当中的字符串进行切割,将切割出来的一个个报文放到vector当中,对于最后无法切出完整报文的数据就留在inbuffer当中即可。

      代码如下:

      class StringUtil{
      public:
      	static void Split(std::string& in, std::vector<std::string>* out, std::string sep)
      	{
      		int start = 0;
      		size_t pos = in.find(sep, start);
      		while (pos != std::string::npos){
      			out->push_back(in.substr(start, pos - start));
      			start = pos + sep.size();
      			pos = in.find(sep, start);
      		}
      		in = in.substr(start);
      	}
      };
      

      三、反序列化

      在数据发送之前需要进行序列化encode,接收到数据之后需要对数据进行反序列化decode。

      • 序列化就是将对象的状态信息转换为可以存储或传输的形式(字节序列)的过程。
      • 反序列化就是把字节序列恢复为原对象的过程。

      实际反序列化也是与协议定制相关的,假设这里的epoll服务器向客户端提供的就是计算服务,客户端向服务器发来的都是需要服务器计算的计算表达式,因此可以用一个结构体来描述这样一个计算表达式,结构体当中包含两个操作数x和y,以及一个操作符op。

      struct data{
      	int _x;
      	int _y;
      	char _op;
      };
      

      此时这里所谓的反序列化就是将一个计算表达式转换成这样一个结构体,

      • 因此现在要做的就是将形如“1+2”这样的计算表达式转换成一个结构体,该结构体当中的x成员的值就是1,y的值就是2,op的值就是‘+’,这里将这个过程封装成一个Deserialize函数并放到StringUtil工具类当中。
      • Deserialize函数要做的工作其实也很简单,就是在传入的字符串当中找到操作符op,此时操作符左边的就是操作数x,右边的就是操作数y。

      代码如下:

      class StringUtil{
      public:
      	static void Deserialize(std::string& in, int* x, int* y, char* op)
      	{
      		size_t pos = 0;
      		for (pos = 0; pos < in.size(); pos++){
      			if (in[pos] == '+' || in[pos] == '-' || in[pos] == '*' || in[pos] == '/' || in[pos] == '%')
      				break;
      		}
      		if (pos < in.size()){
      			std::string left = in.substr(0, pos);
      			std::string right = in.substr(pos + 1);
      
      			*x = atoi(left.c_str());
      			*y = atoi(right.c_str());
      			*op = in[pos];
      		}
      		else{
      			*op = -1;
      		}
      	}
      };
      

      说明一下: 实际在做项目时不需要我们自己进行序列化和反序列化,我们一般会直接用JSON或XML这样的序列化反序列化工具。

      四、业务处理

      业务处理就是服务器拿到客户端发来的数据后,对数据进行数据分析,最终拿到客户端想要的资源。

      • 我们这里要做的业务处理非常简单,就是用反序列化后的数据进行数据计算,此时得到的计算结果就是客户端想要的。

      五、形成响应报文

      在业务处理后我们已经拿到了客户端想要的数据,现在我们要做的就是形成响应报文,由于我们这里规定每个报文都以“X”作为报文结束的标志,因此在形成响应报文的时候,就需要在每一个计算结果后面都添加上一个“X”,表示这是之前某一个请求报文的响应报文,因为协议制定后就需要双方遵守。

      六、将响应报文添加到outbuffer中

      响应报文构建完后需要将其添加到该套接字对应的outbuffer中,并打开该套接字的写事件,此后当写事件就绪时就会将outbuffer当中的数据发送出去。

      sender回调

      sender回调用于处理写事件,其工作流程如下:

      1. 循环调用send函数发送数据,并将发送出去的数据从该套接字对应EventItem结构的outbuffer中删除。
      2. 如果循环调用send函数后该套接字对应的outbuffer当中的数据被全部发送,此时就需要将该套接字对应的写事件关闭,因为已经没有要发送的数据了,如果outbuffer当中的数据还有剩余,那么该套接字对应的写事件就应该继续打开。

      代码如下:

      int sender(EventItem* item)
      {
      	if (item->_sock < 0) //该文件描述符已经被关闭
      		return -1;
      
      	int ret = sender_helper(item->_sock, item->_outbuffer);
      	if (ret == 0){ //全部发送成功,不再关心写事件
      		item->_R->EnableReadWrite(item->_sock, true, false);
      	}
      	else if (ret == 1){ //没有发送完毕,还需要继续关心写事件
      		item->_R->EnableReadWrite(item->_sock, true, true);
      	}
      	else{ //写入出错
      		item->_error_handler(item);
      	}
      	return 0;
      }
      

      我们可以将循环调用send函数发送数据的过程封装成一个sender_helper函数。

      • sender_helper函数要做的就是循环调用send函数将outbuffer中的数据发送出去。
      • 当send函数的返回值小于0时也需要进一步判断错误码,如果错误码为EAGAIN或EWOULDBLOCK则说明底层TCP发送缓冲区已经被写满了,这时需要将已经发送的数据从outbuffer中移除。
      • 如果错误码为EINTR则说明发送过程被信号中断了,此时还需要继续调用send函数进行发送,否则就是发送出错了。
      • 当发送出错时也直接调用该套接字对应的error_handler回调,最终就会调用到下面将要实现的errorer回调,在我们会在errorer回调当中将该套接字进行关闭。
      • 如果最终outbuffer当中的数据全部发送成功,则将outbuffer清空即可。

      代码如下:

      int sender_helper(int sock, std::string& in)
      {
      	size_t total = 0; //累加已经发送的字节数
      	while (true){
      		ssize_t size = send(sock, in.c_str() + total, in.size() - total, 0);
      		if (size < 0){
      			if (errno == EAGAIN || errno == EWOULDBLOCK){ //底层发送缓冲区已经没有空间了
      				in.erase(0, total); //将已经发送的数据移出outbuffer
      				return 1; //缓冲区写满,没写完
      			}
      			else if (errno == EINTR){ //被信号中断,继续尝试写入
      				continue;
      			}
      			else{ //写入出错
      				return -1;
      			}
      		}
      		total += size;
      		if (total >= in.size()){
      			in.clear(); //清空outbuffer
      			return 0; //全部写入完毕
      		}
      	}
      }
      

      errorer回调

      errorer回调用于处理异常事件。

      • 对于异常事件就绪的套接字我们这里不做其他过多的处理,简单的调用close函数将该套接字关闭即可。
      • 但是在关闭该套接字之前,需要先调用DelEvent函数将该套接字从epoll模型中删除,并取消该套接字与其对应的EventItem结构的映射关系。
      • 由于在Dispatcher当中是先处理的异常事件,为了避免该套接字被关闭后继续进行读写操作,然后因为读写操作失败再次调用errorer回调重复关闭该文件描述符,因此在关闭该套接字后将其EventItem当中的文件描述符值设置为-1。
      • 在调用recver和sender回调执行读写操作之前,都会判断该EventItem结构当中的文件描述符值是否有效,如果无效则不会进行后续操作。

      代码如下:

      int errorer(EventItem* item)
      {
      	item->_R->DelEvent(item->_sock); //将该文件描述符从epoll模型中删除,并取消该文件描述符与其EventItem结构的映射关系
      	close(item->_sock); //关闭该文件描述符
      
      	item->_sock = -1; //防止关闭后继续执行读写回调
      	return 0;
      }
      

      套接字相关

      这里可以编写一个Socket类,对套接字相关的接口进行一定程度的封装,为了让外部能够直接调用Socket类当中封装的函数,于是将这些函数定义成了静态成员函数。

      代码如下:

      class Socket{
      public:
      	//创建套接字
      	static int SocketCreate()
      	{
      		int sock = socket(AF_INET, SOCK_STREAM, 0);
      		if (sock < 0){
      			std::cerr << "socket error" << std::endl;
      			exit(2);
      		}
      		//设置端口复用
      		int opt = 1;
      		setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
      		return sock;
      	}
      	//绑定
      	static void SocketBind(int sock, int port)
      	{
      		struct sockaddr_in local;
      		memset(&local, 0, sizeof(local));
      		local.sin_family = AF_INET;
      		local.sin_port = htons(port);
      		local.sin_addr.s_addr = INADDR_ANY;
      		
      		socklen_t len = sizeof(local);
      
      		if (bind(sock, (struct sockaddr*)&local, len) < 0){
      			std::cerr << "bind error" << std::endl;
      			exit(3);
      		}
      	}
      	//监听
      	static void SocketListen(int sock, int backlog)
      	{
      		if (listen(sock, backlog) < 0){
      			std::cerr << "listen error" << std::endl;
      			exit(4);
      		}
      	}
      };
      

      运行epoll ET服务器

      运行我们的epoll ET服务器的步骤如下:

      • 首先需要进行的就是套接字的创建、绑定和监听,因为是ET模式下的epoll服务器,因此监听套接字创建出来后需要将其设置为非阻塞。
      • 然后就可以实例化一个Reactor对象,并对其进行初始化,也就是创建epoll模型。
      • 紧接着需要为监听套接字定义一个EventItem结构,填充EventItem结构当中的各个字段,并将accepter回调设置为监听套接字的读回调方法。
      • 然后调用AddEvent函数将监听套接字及其需要关系的事件添加到Dispatcher当中,该过程包括将监听套接字注册到epoll模型中,以及建立监听套接字与其对应EventItem结构的映射。
      • 最后就可以循环调用Reactor类当中的Dispatcher函数进行事件派发了。

      代码如下:

      #include "app_interface.hpp"
      #include "reactor.hpp"
      #include "socket.hpp"
      #include "util.hpp"
      #include <string>
      
      #define BACK_LOG 5
      
      static void Usage(std::string proc)
      {
      	std::cout << "Usage: " << proc << " port" << std::endl;
      }
      int main(int argc, char* argv[])
      {
      	if (argc != 2){
      		Usage(argv[0]);
      		exit(1);
      	}
      	int port = atoi(argv[1]);
      
      	//服务器监听套接字的创建、绑定和监听
      	int listen_sock = Socket::SocketCreate();
      	SetNonBlock(listen_sock); //将监听套接字设置为非阻塞
      	Socket::SocketBind(listen_sock, port);
      	Socket::SocketListen(listen_sock, BACK_LOG);
      	
      	//创建Reactor,并初始化
      	Reactor R;
      	R.InitReactor();
      
      	//创建监听套接字对应的EventItem结构            
      	EventItem item;
      	item._sock = listen_sock;
      	item._R = &R;
      	item.ManageCallbacks(accepter, nullptr, nullptr); //监听套接字只需要关心读事件
      	
      	//将监听套接字托管给Dispatcher
      	R.AddEvent(listen_sock, EPOLLIN | EPOLLET, item);
      	
      	//循环进行事件派发
      	int timeout = 1000;
      	while (true){
      		R.Dispatcher(timeout);
      	}
      	return 0;
      }
      

      运行服务器后可以看到3号文件描述符被添加到了epoll模型中,这里的3号文件描述符其实就是监听套接字。
      Reactor模式
      当客户端连接服务器后,在服务器端会显示5号文件描述符被添加到了epoll模型当中,因为4号文件描述符已经被epoll模型使用了。
      Reactor模式
      此时客户端就可以向服务器发送一些简单计算任务,这些计算任务之间用“X”隔开,服务器收到计算请求并处理后就会将计算结果发送给客户端,这些计算结果之间也是用“X”隔开的。
      Reactor模式
      此外,由于使用了多路转接技术,虽然当前的epoll服务器是一个单进程的服务器,但它却可以同时为多个客户端提供服务。
      Reactor模式
      当客户端退出后服务器端也会将对应的文件描述符从epoll模型中删除。
      Reactor模式

      接入线程池

      单进程epoll服务器存在的问题

      因为当前的epoll服务器的业务处理逻辑比较简单,所以单进程的epoll服务器看起来没什么压力,但如果服务器的业务处理逻辑比较复杂,那么某些客户端发来的数据请求就可能长时间得不到响应,因为这时epoll服务器需要花费大量时间进行业务处理,而在这个过程中服务器无法为其他客户端提供服务。

      解决思路

      可以在当前服务器的基础上接入线程池,当recver回调读取完数据并完成报文的切割和反序列化后,就可以将其构建成一个任务然后放到线程池的任务队列中,然后服务器就可以继续进行事件派发,而不需要将时间耗费到业务处理上面,而放到任务队列当中的任务,则由线程池当中的若干个线程进行处理。

      接入线程池

      在博主的另一篇博客当中详细介绍并实现了线程池,这里直接将线程池的代码接入到当前的epoll ET服务器,因此下面只会讲解接入线程池的方法,如果对线程池的实现有疑问的可以去阅读那篇博客。

      线程池的代码如下:

      #pragma once
      
      #include <iostream>
      #include <unistd.h>
      #include <queue>
      #include <pthread.h>
      
      #define NUM 5
      
       //线程池
      template<class T>
      class ThreadPool
      {
      public:
      	//提供一个全局访问点
      	static ThreadPool* GetInstance()
      	{
      		return &_sInst;
      	}
      private:
      	bool IsEmpty()
      	{
      		return _task_queue.size() == 0;
      	}
      	void LockQueue()
      	{
      		pthread_mutex_lock(&_mutex);
      	}
      	void UnLockQueue()
      	{
      		pthread_mutex_unlock(&_mutex);
      	}
      	void Wait()
      	{
      		pthread_cond_wait(&_cond, &_mutex);
      	}
      	void WakeUp()
      	{
      		pthread_cond_signal(&_cond);
      	}
      public:
      	~ThreadPool()
      	{
      		pthread_mutex_destroy(&_mutex);
      		pthread_cond_destroy(&_cond);
      	}
      	//线程池中线程的执行例程
      	static void* Routine(void* arg)
      	{
      		pthread_detach(pthread_self());
      		ThreadPool* self = (ThreadPool*)arg;
      		//不断从任务队列获取任务进行处理
      		while (true){
      			self->LockQueue();
      			while (self->IsEmpty()){
      				self->Wait();
      			}
      			T task;
      			self->Pop(task);
      			self->UnLockQueue();
      
      			task.Run(); //处理任务
      		}
      	}
      	void ThreadPoolInit()
      	{
      		pthread_t tid;
      		for (int i = 0; i < _thread_num; i++){
      			pthread_create(&tid, nullptr, Routine, this); //注意参数传入this指针
      		}
      	}
      	//往任务队列塞任务(主线程调用)
      	void Push(const T& task)
      	{
      		LockQueue();
      		_task_queue.push(task);
      		UnLockQueue();
      		WakeUp();
      	}
      	//从任务队列获取任务(线程池中的线程调用)
      	void Pop(T& task)
      	{
      		task = _task_queue.front();
      		_task_queue.pop();
      	}
      private:
      	ThreadPool(int num = NUM) //构造函数私有
      		: _thread_num(num)
      	{
      		pthread_mutex_init(&_mutex, nullptr);
      		pthread_cond_init(&_cond, nullptr);
      	}
      	ThreadPool(const ThreadPool&) = delete; //防拷贝
      
      	std::queue<T> _task_queue; //任务队列
      	int _thread_num; //线程池中线程的数量
      	pthread_mutex_t _mutex;
      	pthread_cond_t _cond;
      
      	static ThreadPool<T> _sInst;
      };
      
      template<class T>
      ThreadPool<T> ThreadPool<T>::_sInst;
      

      在服务器开始进行事件派发之前需要对线程池进行初始化:

      //初始化线程池
      ThreadPool<Task>::GetInstance()->ThreadPoolInit();
      

      设计任务类

      线程池有了之后需要定义出一个任务类,该任务类当中需要提供一个Run方法,这个Run方法就是线程池中的若干线程池从任务队列当中拿到任务后会执行的方法。

      • 在任务类中包含两个成员变量,成员变量d就是反序列化后用于进行业务处理的数据,成员变量item就是该套接字的EventItem结构,因为数据处理完后需要将形成的响应报文添加到该套接字对应outbuffer当中。
      • Run方法中处理数据的逻辑与之前的一样,只是现在将那部分代码放到了Run方法中。

      代码如下:

      #pragma once
      
      #include <iostream>
      #include "reactor.hpp"
      #include "comm.hpp"
      
      //任务类
      class Task
      {
      private:
      	struct data _d;
      	EventItem* _item;
      public:
      	Task(struct data d, EventItem* item)
      		: _d(d), _item(item)
      	{}
      	Task() //提供默认构造
      	{}
      	~Task()
      	{}
      	//处理任务的方法
      	void Run()
      	{
      		//4、业务处理
      		int result = 0;
      		switch (_d._op)
      		{
      		case '+':
      			result = _d._x + _d._y;
      			break;
      		case '-':
      			result = _d._x - _d._y;
      			break;
      		case '*':
      			result = _d._x * _d._y;
      			break;
      		case '/':
      			if (_d._y == 0){
      				std::cerr << "Error: div zero!" << std::endl;
      				return;
      			}
      			else{
      				result = _d._x / _d._y;
      			}
      			break;
      		case '%':
      			if (_d._y == 0){
      				std::cerr << "Error: mod zero!" << std::endl;
      				return;
      			}
      			else{
      				result = _d._x % _d._y;
      			}
      			break;
      		default:
      			std::cerr << "operation error!" << std::endl;
      			return;
      		}
      		std::cout << "thread[" << pthread_self() << "]:" << _d._x << _d._op << _d._y << "=" << result << std::endl;
      
      		//5、形成响应报文
      		std::string response;
      		response += std::to_string(_d._x);
      		response += _d._op;
      		response += std::to_string(_d._y);
      		response += "=";
      		response += std::to_string(result);
      		response += "X"; //报文与报文之间的分隔符
      		
      		//6、将响应报文添加到outbuffer中
      		_item->_outbuffer += response;
      		if (!_item->_outbuffer.empty())
      			_item->_R->EnableReadWrite(_item->_sock, true, true); //打开写事件
      	}
      };
      

      此时recver回调函数中在读取数据、报文切割、反序列化后就可以构建出一个任务对象,然后将该任务放到任务队列当中就行了。

      int recver(EventItem* item)
      {
      	if (item->_sock < 0) //该文件描述符已经被关闭
      		return -1;
      
      	//1、数据读取
      	if (recver_helper(item->_sock, &(item->_inbuffer)) < 0){ //读取失败
      		item->_error_handler(item);
      		return -1;
      	}
      
      	//2、报文切割
      	std::vector<std::string> datagrams;
      	StringUtil::Split(item->_inbuffer, &datagrams, "X");
      
      	for (auto s : datagrams){
      		//3、反序列化
      		struct data d;
      		StringUtil::Deserialize(s, &d._x, &d._y, &d._op);
      
      		Task t(d, item); //构建任务
      		ThreadPool<Task>::GetInstance()->Push(t); //将任务push到线程池的任务队列中
      	}
      	return 0;
      }
      

      至此线程池便接入完毕了,由于我们在任务被处理后将处理对应任务线程的ID进行了打印,因此现在客户端向服务器发送的计算请求在被处理后,可以在服务器端看到处理该任务的线程的ID。
      Reactor模式

      版权声明:本文内容来自第三方投稿或授权转载,原文地址:https://blog.csdn.net/chenlong_cxy/article/details/126387589,作者:2021dragon,版权归原作者所有。本网站转在其作品的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如因作品内容、版权等问题需要同本网站联系,请发邮件至ctyunbbs@chinatelecom.cn沟通。

      上一篇:在IDEA中配置SFTP远程Linux服务器系统

      下一篇:解决nginx代理web服务器获取用户(访问者)真实IP的问题

      相关文章

      2025-05-19 09:05:01

      项目更新到公网服务器的操作步骤

      项目更新到公网服务器的操作步骤

      2025-05-19 09:05:01
      公网 , 数据库 , 文件 , 更新 , 服务器
      2025-05-19 09:04:53

      查看RISC-V版本的gcc中默认定义的宏

      查看RISC-V版本的gcc中默认定义的宏

      2025-05-19 09:04:53
      c++ , linux
      2025-05-19 09:04:44

      FinalShell 配置SSH密钥登陆

      FinalShell 配置SSH密钥登陆

      2025-05-19 09:04:44
      密钥 , 服务器 , 配置
      2025-05-14 10:33:16

      30天拿下Python之使用网络

      Python网络编程覆盖的范围非常广,包括:套接字编程、socketserver、HTTP和Web开发、异步编程和asyncio等。

      2025-05-14 10:33:16
      Json , TCP , 客户端 , 接字 , 服务器 , 示例 , 连接
      2025-05-14 09:51:21

      python 在创建socket之后建立心跳机制

      在Python中,建立心跳机制通常用于维持客户端和服务器之间的长连接,确保连接活跃性。心跳机制通常是通过定时发送心跳包(一种小型的、特定格式的数据包)来实现的,如果在预定时间内没有收到对方的心跳响应,则认为连接可能已经中断。

      2025-05-14 09:51:21
      发送 , 客户端 , 服务器 , 服务器端 , 示例
      2025-05-13 09:53:23

      在Java、Java Web中放置图片、视频、音频、图像文件的方法

      在Java软件中放置图片,通常涉及将图片文件(如JPEG、PNG等)作为资源包含在我们的项目中,并在代码中通过适当的方式引用这些资源。这可以通过多种方式实现,但最常见的是在Java桌面应用(如Swing或JavaFX)或Web应用(如Servlet/JSP)中。

      2025-05-13 09:53:23
      JSP , URL , Web , 图片 , 服务器
      2025-05-09 09:21:53

      WebAPI 和 webservice的区别

      WebAPI 和 webservice的区别

      2025-05-09 09:21:53
      HTTP , 协议 , 客户端 , 服务器 , 请求
      2025-05-09 08:51:09

      git学习(1)(简单概述、代码版本控制方式(集中/分布))

      git学习(1)(简单概述、代码版本控制方式(集中/分布))

      2025-05-09 08:51:09
      git , 一个 , 代码 , 共享 , 开发人员 , 服务器 , 版本
      2025-05-06 09:19:12

      redis高可用集群搭建

      redis高可用集群搭建

      2025-05-06 09:19:12
      master , redis , 服务器 , 节点 , 集群
      2025-05-06 09:19:00

      基于javaWeb+jsp人力资源管理系统(含文档)

      基于javaWeb+jsp人力资源管理系统(含文档)

      2025-05-06 09:19:00
      download , 数据库 , 文档 , 服务器 , 管理
      查看更多
      推荐标签

      作者介绍

      天翼云小翼
      天翼云用户

      文章

      33561

      阅读量

      5251681

      查看更多

      最新文章

      项目更新到公网服务器的操作步骤

      2025-05-19 09:05:01

      FinalShell 配置SSH密钥登陆

      2025-05-19 09:04:44

      redis高可用集群搭建

      2025-05-06 09:19:12

      基于javaWeb+jsp人力资源管理系统(含文档)

      2025-05-06 09:19:00

      【Linux 从基础到进阶】Apache服务器搭建与优化

      2025-05-06 09:18:38

      【Linux 从基础到进阶】FTP/SFTP 服务器搭建与管理

      2025-05-06 08:30:38

      查看更多

      热门文章

      linux篇-linux iptables配置

      2023-03-16 06:47:52

      Linux运维小技巧---每日收集所有服务器信息并归档到指定服务器

      2023-03-16 07:49:58

      用ftp服务器进行yum源的设置

      2023-04-21 03:04:35

      Confluence 6 配置服务器基础地址

      2023-04-23 09:34:48

      linux中常见工具安装问题集锦

      2023-05-05 10:12:49

      linux环境日志排查,cat命令关键字查找、最近1000条、定位到指定位置

      2022-12-28 07:22:30

      查看更多

      热门标签

      服务器 linux 虚拟机 Linux 数据库 运维 网络 日志 数据恢复 java python 配置 nginx centos mysql
      查看更多

      相关产品

      弹性云主机

      随时自助获取、弹性伸缩的云服务器资源

      天翼云电脑(公众版)

      便捷、安全、高效的云电脑服务

      对象存储

      高品质、低成本的云上存储服务

      云硬盘

      为云上计算资源提供持久性块存储

      查看更多

      随机文章

      Linux命令之查看命令类型type

      linux nohup命令如何使用?

      你在服务器上的一举一动,我可都看着!linux超骚技巧三分钟Get

      Linux脚本练习之script083-nginx日志分析之查询某个IP的详细访问情况

      搞死人的windows地址监听

      成功解决:CentOS 7中如何配置修改Vim

      • 7*24小时售后
      • 无忧退款
      • 免费备案
      • 专家服务
      售前咨询热线
      400-810-9889转1
      关注天翼云
      • 旗舰店
      • 天翼云APP
      • 天翼云微信公众号
      服务与支持
      • 备案中心
      • 售前咨询
      • 智能客服
      • 自助服务
      • 工单管理
      • 客户公告
      • 涉诈举报
      账户管理
      • 管理中心
      • 订单管理
      • 余额管理
      • 发票管理
      • 充值汇款
      • 续费管理
      快速入口
      • 天翼云旗舰店
      • 文档中心
      • 最新活动
      • 免费试用
      • 信任中心
      • 天翼云学堂
      云网生态
      • 甄选商城
      • 渠道合作
      • 云市场合作
      了解天翼云
      • 关于天翼云
      • 天翼云APP
      • 服务案例
      • 新闻资讯
      • 联系我们
      热门产品
      • 云电脑
      • 弹性云主机
      • 云电脑政企版
      • 天翼云手机
      • 云数据库
      • 对象存储
      • 云硬盘
      • Web应用防火墙
      • 服务器安全卫士
      • CDN加速
      热门推荐
      • 云服务备份
      • 边缘安全加速平台
      • 全站加速
      • 安全加速
      • 云服务器
      • 云主机
      • 智能边缘云
      • 应用编排服务
      • 微服务引擎
      • 共享流量包
      更多推荐
      • web应用防火墙
      • 密钥管理
      • 等保咨询
      • 安全专区
      • 应用运维管理
      • 云日志服务
      • 文档数据库服务
      • 云搜索服务
      • 数据湖探索
      • 数据仓库服务
      友情链接
      • 中国电信集团
      • 189邮箱
      • 天翼企业云盘
      • 天翼云盘
      ©2025 天翼云科技有限公司版权所有 增值电信业务经营许可证A2.B1.B2-20090001
      公司地址:北京市东城区青龙胡同甲1号、3号2幢2层205-32室
      • 用户协议
      • 隐私政策
      • 个人信息保护
      • 法律声明
      备案 京公网安备11010802043424号 京ICP备 2021034386号