爆款云主机2核4G限时秒杀,88元/年起!
查看详情

活动

天翼云最新优惠活动,涵盖免费试用,产品折扣等,助您降本增效!
热门活动
  • 618智算钜惠季 爆款云主机2核4G限时秒杀,88元/年起!
  • 免费体验DeepSeek,上天翼云息壤 NEW 新老用户均可免费体验2500万Tokens,限时两周
  • 云上钜惠 HOT 爆款云主机全场特惠,更有万元锦鲤券等你来领!
  • 算力套餐 HOT 让算力触手可及
  • 天翼云脑AOne NEW 连接、保护、办公,All-in-One!
  • 中小企业应用上云专场 产品组合下单即享折上9折起,助力企业快速上云
  • 息壤高校钜惠活动 NEW 天翼云息壤杯高校AI大赛,数款产品享受线上订购超值特惠
  • 天翼云电脑专场 HOT 移动办公新选择,爆款4核8G畅享1年3.5折起,快来抢购!
  • 天翼云奖励推广计划 加入成为云推官,推荐新用户注册下单得现金奖励
免费活动
  • 免费试用中心 HOT 多款云产品免费试用,快来开启云上之旅
  • 天翼云用户体验官 NEW 您的洞察,重塑科技边界

智算服务

打造统一的产品能力,实现算网调度、训练推理、技术架构、资源管理一体化智算服务
智算云(DeepSeek专区)
科研助手
  • 算力商城
  • 应用商城
  • 开发机
  • 并行计算
算力互联调度平台
  • 应用市场
  • 算力市场
  • 算力调度推荐
一站式智算服务平台
  • 模型广场
  • 体验中心
  • 服务接入
智算一体机
  • 智算一体机
大模型
  • DeepSeek-R1-昇腾版(671B)
  • DeepSeek-R1-英伟达版(671B)
  • DeepSeek-V3-昇腾版(671B)
  • DeepSeek-R1-Distill-Llama-70B
  • DeepSeek-R1-Distill-Qwen-32B
  • Qwen2-72B-Instruct
  • StableDiffusion-V2.1
  • TeleChat-12B

应用商城

天翼云精选行业优秀合作伙伴及千余款商品,提供一站式云上应用服务
进入甄选商城进入云市场创新解决方案
办公协同
  • WPS云文档
  • 安全邮箱
  • EMM手机管家
  • 智能商业平台
财务管理
  • 工资条
  • 税务风控云
企业应用
  • 翼信息化运维服务
  • 翼视频云归档解决方案
工业能源
  • 智慧工厂_生产流程管理解决方案
  • 智慧工地
建站工具
  • SSL证书
  • 新域名服务
网络工具
  • 翼云加速
灾备迁移
  • 云管家2.0
  • 翼备份
资源管理
  • 全栈混合云敏捷版(软件)
  • 全栈混合云敏捷版(一体机)
行业应用
  • 翼电子教室
  • 翼智慧显示一体化解决方案

合作伙伴

天翼云携手合作伙伴,共创云上生态,合作共赢
天翼云生态合作中心
  • 天翼云生态合作中心
天翼云渠道合作伙伴
  • 天翼云代理渠道合作伙伴
天翼云服务合作伙伴
  • 天翼云集成商交付能力认证
天翼云应用合作伙伴
  • 天翼云云市场合作伙伴
  • 天翼云甄选商城合作伙伴
天翼云技术合作伙伴
  • 天翼云OpenAPI中心
  • 天翼云EasyCoding平台
天翼云培训认证
  • 天翼云学堂
  • 天翼云市场商学院
天翼云合作计划
  • 云汇计划
天翼云东升计划
  • 适配中心
  • 东升计划
  • 适配互认证

开发者

开发者相关功能入口汇聚
技术社区
  • 专栏文章
  • 互动问答
  • 技术视频
资源与工具
  • OpenAPI中心
开放能力
  • EasyCoding敏捷开发平台
培训与认证
  • 天翼云学堂
  • 天翼云认证
魔乐社区
  • 魔乐社区

支持与服务

为您提供全方位支持与服务,全流程技术保障,助您轻松上云,安全无忧
文档与工具
  • 文档中心
  • 新手上云
  • 自助服务
  • OpenAPI中心
定价
  • 价格计算器
  • 定价策略
基础服务
  • 售前咨询
  • 在线支持
  • 在线支持
  • 工单服务
  • 建议与反馈
  • 用户体验官
  • 服务保障
  • 客户公告
  • 会员中心
增值服务
  • 红心服务
  • 首保服务
  • 客户支持计划
  • 专家技术服务
  • 备案管家

了解天翼云

天翼云秉承央企使命,致力于成为数字经济主力军,投身科技强国伟大事业,为用户提供安全、普惠云服务
品牌介绍
  • 关于天翼云
  • 智算云
  • 天翼云4.0
  • 新闻资讯
  • 天翼云APP
基础设施
  • 全球基础设施
  • 信任中心
最佳实践
  • 精选案例
  • 超级探访
  • 云杂志
  • 分析师和白皮书
  • 天翼云·创新直播间
市场活动
  • 2025智能云生态大会
  • 2024智算云生态大会
  • 2023云生态大会
  • 2022云生态大会
  • 天翼云中国行
天翼云
  • 活动
  • 智算服务
  • 产品
  • 解决方案
  • 应用商城
  • 合作伙伴
  • 开发者
  • 支持与服务
  • 了解天翼云
      • 文档
      • 控制中心
      • 备案
      • 管理中心

      【ceph】AsyncMessenger源码分析--new

      首页 知识中心 软件开发 文章详情页

      【ceph】AsyncMessenger源码分析--new

      2025-02-21 08:58:00 阅读次数:16

      函数,消息,调用

      1、Messenger的生命周期

      【ceph】AsyncMessenger源码分析--new​

       

      不同的角色(osd,mon,mds...)会启动相应的守护进程,如OSD,通过 ceph_这个文件来启动守护进程,首先进入的是main()函数;如图所示,以OSD为例描述了消息模块的生命周期,相关代码和说明如下:

      在main()函数中首先创建多个Messenger用于不同的通信,然后对注册的Messenger进行bind,绑定后start消息模块进行工作,消息模块start后init OSD。

      int main(int argc, const char **argv){
      
      ……
      //创建多个Messenger用于不同的通信
        Messenger *ms_public = Messenger::create(g_ceph_context, public_msg_type,
                             entity_name_t::OSD(whoami), "client",
                             getpid(),
                             Messenger::HAS_HEAVY_TRAFFIC |
                             Messenger::HAS_MANY_CONNECTIONS);
        Messenger *ms_cluster = Messenger::create(……);
        Messenger *ms_hb_back_client = Messenger::create(……);
        Messenger *ms_hb_front_client = Messenger::create(……);
        Messenger *ms_hb_back_server = Messenger::create(……);
        Messenger *ms_hb_front_server = Messenger::create(……);
        Messenger *ms_objecter = Messenger::create(……);
      
      ……
      //然后对注册的Messenger进行bind
        if (ms_hb_front_server->bindv(hb_front_addrs) < 0)……
        if (ms_hb_front_client->client_bind(hb_front_addrs.front()) < 0)……
        if (ms_hb_back_server->bindv(hb_back_addrs) < 0)……
        if (ms_hb_back_client->client_bind(hb_back_addrs.front()) < 0)……
      ……
      
      
        osd = new OSD(g_ceph_context,
                      store,
                      whoami,
                      ms_cluster,
                      ms_public,
                      ms_hb_front_client,
                      ms_hb_back_client,
                      ms_hb_front_server,
                      ms_hb_back_server,
                      ms_objecter,
                      &mc,
                      data_path,
                      journal_path);
      
        int err = osd->pre_init();
        
      //绑定后start消息模块进行工作
        ms_public->start();
        ms_hb_front_client->start();
        ms_hb_back_client->start();
        ms_hb_front_server->start();
        ms_hb_back_server->start();
        ms_cluster->start();
        ms_objecter->start();
      
        // 消息模块start后init OSD //start osd
        err = osd->init();
      ……
      
        ms_public->wait();
        ms_hb_front_client->wait();
        ms_hb_back_client->wait();
        ms_hb_front_server->wait();
        ms_hb_back_server->wait();
        ms_cluster->wait();
        ms_objecter->wait();
      ……
      // done
        delete osd;
        delete ms_public;
        delete ms_hb_front_client;
        delete ms_hb_back_client;
        delete ms_hb_front_server;
        delete ms_hb_back_server;
        delete ms_cluster;
        delete ms_objecter;
      
      }

      在OSD的初始化中让Messenger处于ready状态,即准备工作状态。当消息模块工作结束后处于wait状态,如果需要的话则删除注册的Messenger。这就是消息模块大致的生命周期,下面详细描述一下每个过程的操作。


      在main()函数中注册了6个Messenger的实例,如下表所示。

      编号    Messenger实例名称    作用
      1    *ms_public    用来处理OSD和Client之间的消息
      2    *ms_cluster    用来处理OSD和集群之间的消息
      3    *ms_hbclient    用来处理OSD和其它OSD保持心跳的消息
      4    *ms_hb_back_server    用来处理OSD接收心跳消息
      5    *ms_hb_front_server    用来处理OSD发送心跳消息
      6    *ms_objecter    用来处理OSD和Objecter之间的消息

      Messenger是一个接口类(Interface Class,纯虚函数类既一个包含一个或多个纯虚函数的类),根据不同的需求对其进行实现,async就是AsyncMessenger。
      初始化消息模块后调用AsyncMessenger的bind函数()进行绑定,调用的参数是配置文件中的g_conf->public_addr和g_conf->cluster_addr等。

      AsyncMessenger的bind()函数执行的是Processor::bind()。在Processor的bind函数中真正完成了绑定,Processor的bind函数有两个参数,一个是addr,另一个是port。

      在Processor的bind函数中主要进行的操作有:
      1) 根据bind_addr得出socket的参数family;
      2) 创建socket,family参数根据步骤1)获取;
      3) 将socket设置为非阻塞;
      4) 绑定需要监听的端口;
      5) 获取绑定的socket的name;
      6) 监听端口。


      开启消息模块。

      在步骤2中create了Messenger(AsyncMessenger),还需要开启它的服务来工作,AsyncMessnger在start函数中调用WorkPool的start来具体执行AsyncMessenger的开启工作。


      启动OSD ( err = osd->init();)

      在这个之前有一个pre启动(int err = osd->pre_init()),在这个后面还有一个final启动(osd->final_init();)。执行OSD启动的是文件中的init函数,调用Messenger的add_dispatcher_head()函数将响应的消息的分拣器实例(dispatchers)加入到dispatchers的链表中。

      ……  
      // i'm ready!
        client_messenger->add_dispatcher_tail(&mgrc);
        client_messenger->add_dispatcher_tail(this);
        cluster_messenger->add_dispatcher_head(this);
      
        hb_front_client_messenger->add_dispatcher_head(&heartbeat_dispatcher);
        hb_back_client_messenger->add_dispatcher_head(&heartbeat_dispatcher);
        hb_front_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
        hb_back_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
      ……

      在add_dispatcher_head()函数中如果(添加的)是链表的第一个元素,则执行ready函数。

      // 调用AsyncMessenger::ready()启动AsyncMessenger。 
       void add_dispatcher_head(Dispatcher *d) {
         ……
          if (first)
            ready();  // Messenger::ready()
        }

      ready函数的具体执行是由AsyncMessenger的ready来实现(AsyncMessenger.ready内调用stack->ready(),这个stack就是在AsyncMessenger实例化的时候根据参数type=rdma/tcp创建的.

      通过WorkerPool来获取worker,然后启动事件处理中心来处理事件。启动worker线程(processors在创建AsyncMessenger的时候new的),并通知事件处理中心可以开始工作了,主要是事件的create和处理。

      void AsyncMessenger::ready(){
      ……
        for (auto &&p : processors)
          p->start();
      …… }

      这个时候AsyncMessenger的机制已经基本全部启动完成,可以进行正常的工作。

      Messenger进入wait状态,等待stop条件变量,一但stop添加变量满足,就完成清理工作,关闭所有的连接。执行完wait操作后,删除之前创建的Messenger。
       

      2、Messenger的初始化

      Messenger模块的初始化流程如下图所示。

      【ceph】AsyncMessenger源码分析--new​

      上图表示了消息模块初始化时一些关键函数的调用流程,和代码流程大体上是一致的。

      下面以OSD为例来描述消息模块的初始化流程。

      Messenger::create()用来创建多个Messenger,

      使用的是工厂模式,如果配置是async模式(传如参数是async),返回AsyncMessenger对象:

      //src/msg/
      ms_public = Messenger *Messenger::create(CephContext *cct, const string &type, entity_name_t name, ring lname,uint64_t nonce){
        ......
        else if ((r == 1 || type == "async") &&
      	   cct->check_experimental_feature_enabled("ms-type-async"))
          return new AsyncMessenger(cct, name, lname, nonce);
        ......
        return NULL;
      }

      在main()函数中调用AsyncMessenger::bind()绑定IP地址

      //然后对注册的Messenger进行bind
        if (ms_hb_front_server->bindv(hb_front_addrs) < 0)……
        if (ms_hb_front_client->client_bind(hb_front_addrs.front()) < 0)……
        if (ms_hb_back_server->bindv(hb_back_addrs) < 0)……
        if (ms_hb_back_client->client_bind(hb_back_addrs.front()) < 0)……
      ……

      在OSD模块的初始化函数init()中,调用add_dispatcher_head()或者add_dispatcher_tail()函数,执行如下操作:

      将OSD创建的所有Dispatcher添加到Messenger中定义的dispatchers队里中;
      调用AsyncMessenger::ready()启动AsyncMessenger。

      ……  
      // i'm ready!
        client_messenger->add_dispatcher_tail(&mgrc);
        client_messenger->add_dispatcher_tail(this);
        cluster_messenger->add_dispatcher_head(this);
      
        hb_front_client_messenger->add_dispatcher_head(&heartbeat_dispatcher);
        hb_back_client_messenger->add_dispatcher_head(&heartbeat_dispatcher);
        hb_front_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
        hb_back_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
      ……
      // 调用AsyncMessenger::ready()启动AsyncMessenger。 
       void add_dispatcher_head(Dispatcher *d) {
         ……
          if (first)
            ready();  // Messenger::ready()
        }

      Messenger模块的初始化主要启动两个模块:

      一个是EventerCenter(事件中心),事件中心的启动流程在下面小节中详细描述。

      另一个启动的是AsyncMessenger,调用AsyncMessenger::ready()执行Processor::start(Worker *w)进行具体的初始化工作。(processors是在实例化AsyncMessenger的时候new的,每个processors配一个woker)

      void AsyncMessenger::ready(){
      ……
        for (auto &&p : processors)
          p->start();
      …… }

      创建Messenger的时候,创建一个单例Stack,Stack内创建wokers,Messenger内创建processors,每个processor配一个woker。每个woker有一个center

      在EventCenter::create_file_event()中创建文件事件,调用EpollDriver::add_event(),执行epoll_ctl,启动向epoll注册事件,事件中心的EventCenter::process_events()在等待事件的产生。

      同时Processor::accept()也被执行来准备接收连接。

      至此,整个消息模块初始化完毕。

      【ceph】AsyncMessenger源码分析--new​

      在AsyncMessenger网络模块中,采用事件驱动模型,在事件驱动模型中有一个事件处理中心用来处理注册的事件。

      本节主要描述事件中心(EventCenter)的初始化。

      首先,在OSD守护进程中启动Messenger,由于Messenger是消息处理的一个接口,具体执行是由其子类来完成的,即AsyncMessenger::start()。

      AsyncMessenger执行启动时完成了工作线程池的启动——WorkerPool::start(),工作线程池根据配置参数ms_async_op_threads(默认值是2)创建对应数量的工作线程Worker。工作线程的作用就是处理事件,在工作线程中定义了一个事件中心EventCenter,事件的具体执行由EventCenter来完成,具体执行函数是EventCenter::process_events(),在函数中主要有以下三个操作:

      • 调用EpollDriver的event_wait()函数执行epoll_wait,即epoll的主循环,返回需要处理的事件数量,系统根据epoll_wait返回的值来处理事件;用一个for循环来处理epoll_wait返回的事件。
      • 调用FileEvent *_get_file_event()函数创建一个文件事件,根据文件事件的mask判定操作是读还是写,然后调用相应的回调函数进行处理;
      • 看一下外部事件容器中是否有事件需要处理,如果有,使用一个while循环来处理外部事件,具体处理过程还是调用事件的回调函数。

      至此,AsyncMessenger和事件中心(EventCenter)都已经启动完毕并且完成了初始化,消息中心也已经进入了工作状态,等待事件的到来并处理。下一节描述消息的接收。

       3.消息的发送


      如图下图所示为消息发送的基本流程。

      Messenger调用send_message()发送消息,由于Messenger是一个抽象类,具体执行是由AsyncMessenger::send_message():

      AsyncMessenger::send_message()

      --AsyncMessenger::_send_message()根据目的地址发送消息m。

      在AsyncMessenger::_send_message()中先寻找创建的连接conn,然后通过conn调用AsyncMessenger::submit_message()来提交消息,在函数中根据之前寻找的conn建立了连接。

      接着调用AsyncConnection::send_message()发送消息(放到到out_q队列,向EventCenter放回调函数,唤醒线程处理),代码如下(A------>B段):

      但是现在消息的形式是Message,如果通过网络进行发送出去,需要一个转换,即将Message转换成网络层可以识别的bufferlist的形式,这个过程是在AsyncConnection::handle_write()中通过AsyncConnection::write_message()来完成的,handle_write()将消息放到sent这个专门存放消息的链表中,标识哪些是需要发送的消息。然后write_message()中取出m的header、footer以及数据部分等放入complete_bl中,complete_bl是一个bufferlist,调用AsyncConnection::_try_send()把complete_bl中携带的数据发送出去。代码如下(B------>C段):

      #---------------------------------------------------------A 
      Messenger::send_message(Message *m, dest)
      AsyncMessenger::send_message(Message *m, dest)
      --|AsyncMessenger::_send_message(Message *m, dest)
      ----|conn=_lookup_conn(dest.addr) 
      //AsyncMessenger::conns <-accpet_conn()<-bind<-accpeting_conns[]<-add_accept() or create_connect
      ----|AsyncMessenger::submit_message(Message *m,conn,dest,...)
      ------|conn->send_message(m) # AsyncConnection::send_message(Message *m)
      --------|out_q[priority].emplace_back(std::move(bl),m)  #放入队列
      --------|EventCenter::dispatch_event_external(write_handler)        #回调操作(write_handler= new C_handle_write(this))放入event中心,wakeup线程执行
      ----------|external_event.push_back(write_handler)
      ----------|wakeup()
                   |
      #---------------------------------------------------------B
                   |
       w->center.process_events
                   |
               cb = event->read_cb;
               cb->do_request()
                   |C_handle_write
                   |
      			 |
      --|write_handler = new C_handle_write(this)
      C_handle_write::do_request(int fd)
      --|conn->handle_write()   # AsyncConnection::handle_write()
      ----|sent.push_back(m);
      ----|bufferlist data;m=_get_next_outgoing(&data); #从out_q中取出message同时转一个网络层可以识别的bufferlist的形式的data
      ----|AsyncConnection::write_message(m,data,more)
      ------|AsyncConnection::outcoming_bl <--bl
      ------|AsyncConnection::_try_send(bool more)
      #---------------------------------------------------------C

      AsyncConnection::_try_send调用 cs->send(outcoming_bl,more),

       cs->send(outcoming_bl,more)调用_csi->send(outcoming_bl,more) ,也就是cs的implement,RDMA的话就是RDMAConnectedSocketImpl::send(outcoming_bl,more),如下代码:

      ------|AsyncConnection::_try_send(bool more)
      --------|AsyncConnection::connectedSocket cs->send(outcoming_bl,more)
      ----------|connectedSocket::_csi->send(outcoming_bl,more)  #std::unique_ptr<ConnectedSocketImpl> _csi;
      ConnectedSocketImpl:: virtual ssizet_t send(bl,more) <=== RDMAConnectedSocketImpl::send
      ----------|RDMAConnectedSocketImpl::send(outcoming_bl,more)#----------------------------------------------------RDMA send 入口
      
      ------------|RDMAConnectedSocketImpl::pending_bl <--pending_bl.claim_append(bl);
      ------------|RDMAConnectedSocketImpl::submit(bool more)
      --------------|r = post_work_request(tx_buffers);
      --------------|while (current_buffer != tx_buffers.end()) {
                  isge[current_sge].addr = reinterpret_cast<uint64_t>((*current_buffer)->buffer);
                  isge[current_sge].length = (*current_buffer)->get_offset();
                  isge[current_sge].lkey = (*current_buffer)->mr->lkey;
                  ……
                  iswr[current_swr].wr_id = reinterpret_cast<uint64_t>(*current_buffer);
                  iswr[current_swr].next = NULL;
                  iswr[current_swr].sg_list = &isge[current_sge];
                  iswr[current_swr].num_sge = 1;
                  iswr[current_swr].opcode = IBV_WR_SEND;
                  iswr[current_swr].send_flags = IBV_SEND_SIGNALED;
                  ……
                 }
      ------------|ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)
      

       以上是源码,我们修改后的:

      ------|AsyncConnection::_try_send(bool more)
      --------|AsyncConnection::connectedSocket cs->send(outcoming_bl,more)
      ----------|connectedSocket::_csi->send(outcoming_bl,more)  #std::unique_ptr<ConnectedSocketImpl> _csi;
      ConnectedSocketImpl:: virtual ssizet_t send(bl,more) <=== RDMAConnectedSocketImpl::send
      ----------|RDMAConnectedSocketImpl::send(outcoming_bl,more)#----------------------------------------------------RDMA send 入口
      
      ------------|RDMAConnectedSocketImpl::pending_bl <--pending_bl.claim_append(bl);
      ------------|RDMAConnectedSocketImpl::_submit_by_write(more)
      --------------|RDMAConnectedSocketImpl::_sbnmit_send_and_write(more,is_worker)
                     or
      			  |RDMAConnectedSocketImpl::write_data_to_raddrs(more,is_worker)
                    or
                    |RDMAConnectedSocketImpl::_submit_send_data(is_worker)
      ----------------|msg = get_send_msg_worker()/get_send_msg_polling()
      ----------------|RDMAConnectedSocketImpl::post_send_msg(msg)
      ------------------|pending_bl-->msg.data-->wr.id=&msg or pending_bl-->write_res->bl-->send_bl[i]
      ------------------|ibv_post_send(qp,&wr,&bad_wr)
                      |
      				|
      				qp
      				|
      				|
       
       
      AsyncConnection::AsyncConnection()
      AsyncMessenger::accpet_conn(Worker *w,ConnectedSocket cli_socket, addr,flag)
      

       下面这个图是TCP的,不太一样, 

       【ceph】AsyncMessenger源码分析--new​

      上述流程为消息发送的大体脉络流程,下面详细描述消息发送的各个过程。


      如图下图1所示,

      在AsyncMessenger::_send_message()

      根据目的地址调用AsyncConnectionRef _lookup_conn()寻找连接,然后调用AsyncMessenger::submit_message()执行消息的提交:

      首先判断连接是否已经建立:

      1、连接存在:

            直接调用AsyncConnection::send_message()函数执行消息的发送。

      2、连接不存在:

      判断消息所需的连接是否是本地连接

      1)连接不存在,是本地连接

           直接调用本地连接实例的AsyncConnection::send_message()函数来发送消息。

      2)连接不存在,不是本地连接

           需要根据消息发送的目的地址和连接的类型创建一个新的连接,然后用这个新的连接调用AsyncConnection::send_message()函数发送消息。

       图1:
      【ceph】AsyncMessenger源码分析--new​

       

      在AsyncConnection::send_message()函数中准备消息的发送过程如图下图2所示。

      首先判断消息的连接是否本地连接,如果是本地连接将消息放到local_messages链表中,然后直接调用AsyncConnection::local_deliver(),在函数中对local_messages进行一个判断,如果是空的结束本地传送,否则从local_messages链表中取出消息,设置一下当前的连接状态,设置接收的时间戳等信息,然后判断消息是需要快速派送还是正常途径派送,根据判断的结果执行相应的操作。

      如果不是本地消息,对消息进行判断是否需要快速派送,如果是快速派送的消息执行AsyncConnection::prepare_send_message(),将消息中的数据添加到bufferlist中。

      如果正常派送的消息,判断是否需要对消息进行一个处理,处理也是通过AsyncConnection::prepare_send_message()执行的,消息处理完成后调用AsyncConnection::write_message()进行写消息。

      图2:
      【ceph】AsyncMessenger源码分析--new​

      4、消息的接收

      消息接收的过程

      在消息模块初始化中创建了一个线程,循环执行EventCenter::process_events()处理EventCenter中的事件。

      消息模块启动时,EventCenter::process_events()的循环中没有事件需要处理(EventCenter中的还没有事件),直到Processor::start()函数执行EventCenter::create_file_event()创建文件事件放入事件中心EventCenter中。

      消息的处理也是借由事件来进行的,在EventCenter定义了两个回调指针——read_cb和write_cb,专门用来处理消息的读和写,具体执行由回调子类来实现。事件到达事件中心后,EventCenter::process_events从中取出事件,执行事件自带的回调函数,比如

      AsyncConnection::process()处理读操作,---->处理接收到的信息封装成上层需要的message

      AsyncConnection::handle_write()处理写操作--->处理要发送的信息。

      消息接收的过程:
      【ceph】AsyncMessenger源码分析--new​

       

      消息接收之前,事件中心(处理线程)已经启动了,也就是说启动了EventCenter::process_events,这时候还没有事件放入到事件中心去处理,可通过EventCenter::create_file_event()向事件中心中放入file事件,所以消息的接收也是以事件的形式操作的,然后调用相应的接收模块来接收消息。

      在Processor::start()调用EventCenter::create_file_event()时传递了两个参数,一个是mask的值EVENT_READABLE,告诉事件中心处理消息的读,另一个是回调指针的实例C_processor_accept,即当事件到达时,EventCenter::process_events回调C_processor_accept中的函数来处理该事件。

      EventCenter::create_file_event()接收事件操作是C_processor_accept实例。在EventCenter::create_file_event()函数中对mask进行判定,如果是EVENT_READABLE,调用相应的回调函数来处理事件,此时的回调类是C_processor_accept,执行的回调操作是接收连接,具体来说就是Processor::accept(),一方面调用标准socket函数accept接收连接,另一方面调用AsyncConnectionRef AsyncMessenger::add_accept()处理连接。

      AsyncMessenger::add_accept()处理请求时先创建了一个连接AsyncConnection,然后调用AsyncConnection的accept()来接收消息,先将state的值置为STATE_ACCEPTING,然后创建一个接收消息的事件让事件中心处理。事件的maks是EVENT_READABLE,回调操作是read_handler,如果这个时候创建事件这个操作被锁住了,则将消息读操作放入[外部事件]容器中,等到事件中心处理事件的函数解锁以后会去处理外部事件容器,也会继续处理消息的读操作。

      下面主要描述消息的接收状态是如何工作的(STATE_ACCEPTING)。
       

      消息接收初始工作流程

      【ceph】AsyncMessenger源码分析--new​

      消息的处理有两个途径

      一个是根据事件的mask判定事件是否为EVENT_READABLE,如果Yes将read_cb回调指针指向传入的回调操作,即read_handler。

      另一个处理途径是当前create_file_event()正在执行别的操作被锁住了,则通过dispatch_event_external()函数将回调操作read_handler放入external_events中,事件处理中心有一个循环在轮询external_events,一旦发现有回调操作放入,则调用相应的回调函数来处理。

      这两个途径最后都是通过执行read_handler的回调函数来完成消息的读操作。

      read_handler的回调操作是调用AsyncConnection::process()处理。在process()中有一个switch操作,根据之前accept()接收的state的值找到相应的执行体。

      default值进入AsyncConnection::_process_connection()处理。在_process_connection()中新建了一个bufferlist,把CEPH_BANNER添加到bl中,CEPH_BANNER是一个字符串,标识这个消息是ceph的数据。然后通过get_myaddr()获取一个Messenger实例的地址encode到bl中。接着将socket_addr也encode到bl中,调用try_send()执行消息的发送准备工作,try_send()函数执行完成后返回剩余没发送的消息字节的长度,

      如果返回的值为0,说明消息发送完毕,将stated的值置为STATE_ACCEPTING_WAIT_BANNER_ADDR,

      如果返回的值大于0,说明消息没有发送完成,将state的值置为STATE_WAIT_SEND。

      启动消息的接收以后,消息的接收状态是STATE_ACCEPTING,在这个状态中对消息进行了一些简单的处理,然后将state的值置为STATE_ACCEPTING_WAIT_BANNER_ADDR。类似于TCP的三次握手过程。每个接收状态下消息模块都会对消息进行一些简单的处理操作,比如Open消息,读取其中的头部、中间部分、数据部分,最后读取数据等,下面主要介绍消息接收状态(state)的转换过程,由于每个状态下对消息进行了相应的处理,

      直到STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH状态时消息接收完毕。
       

      消息接收状态转换图

      【ceph】AsyncMessenger源码分析--new​

       

      握手状态


      STATE_ACCEPTING

      如图所示为消息的状态转换图,开始建立连接以后消息的接收状态是STATE_ACCEPTING,在STATE_ACCEPTING状态下将消息加上了CEPH_BANNER这个标识,然后调用try_send()发送出去,成功以后将state的值置为STATE_ACCEPTING_WAIT_BANNER_ADDR。

      STATE_ACCEPTING_WAIT_BANNER_ADDR

      在STATE_ACCEPTING_WAIT_BANNER_ADDR状态下将CEPH_BANNER和peer_addr读取到state_buffer中,如果成功了将peer_addr加入到addr_bl,addr_bl是一个专门存放地址的链表。有一种可能是对端不清楚他们拥有哪些IP地址,因此需要将socket绑定绑定的IP地址告诉peer_addr,然后根据peer_addr的值调用set_peer_addr()函数建立连接,完成以后将state的值置为STATE_ACCEPTING_WAIT_CONNECT_MSG。

      STATE_ACCEPTING_WAIT_CONNECT_MSG

      在STATE_ACCEPTING_WAIT_CONNECT_MSG状态下首先读取connect_msg到state_buffer中,
      connect_msg是消息连接使用的一种数据结构,里面有一些标识和认证信息。从连接中读出消息的标识后清除这些标识,然后将state的值置为STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH。

      STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH

      在STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH状态下首先还是读取消息中的认证信息,然后放到authorizer_bl中,authorizer_bl是一个专门存放标识的bufferlist。然后根据authorizer_bl和authorizer_reply的值调用handle_connect_msg来处理连接。

      在AsyncConnection::handle_connect_msg()中首先根据peer_addr判断连接是否存在,如果连接是存在的可以进行后续的操作,对连接进行一些处理,然后调用AsyncConnection::_reply_accept()将回复信息发送给对端,发送信息的时候有一个flag,如果可以接受消息了则将CEPH_MSGR_TAG_SEQ作为flag回复,然后将state的值置为STATE_ACCEPTING_WAIT_SEQ。

      STATE_ACCEPTING_WAIT_SEQ

      在STATE_ACCEPTING_WAIT_SEQ状态下将确认信息读取到state_buffer中,然后根据确认信息对消息进行优先级的设置,如果是高优先级的消息先处理。最后将state的值置为STATE_ACCEPTING_READY,即可以接受消息了。

      STATE_ACCEPTING_READY

      在STATE_ACCEPTING_READY状态下主要操作是打印accept完成的信息,然后将用于连接的数据结构connect_msg清空,最后把state的值置为STATE_OPEN。

      处理消息状态


      STATE_OPEN

      在STATE_OPEN状态下首先读出标识信息tag,如果tag是CEPH_MSGR_TAG_MSG,即读取的是消息的标识,将state的值置为STATE_OPEN_MESSAGE_HEADER,否则进行一些其它的处理。

      STATE_OPEN_MESSAGE_HEADER

      在STATE_OPEN_MESSAGE_HEADER状态下读出消息的头部,然后进行一些类似CRC的校验工作,如果收到的是坏的消息中断当前的操作,返回错误信息,如果没有问题,将state的值置为STATE_OPEN_MESSAGE_THROTTLE_MESSAGE,进行下一步的消息读取操作。

      STATE_OPEN_MESSAGE_THROTTLE_MESSAGE

      在STATE_OPEN_MESSAGE_THROTTLE_MESSAGE中对消息进行判断,如果阻塞了则创建一个时间的事件来等待处理,如果正常状态则将state的值置为STATE_OPEN_MESSAGE_THROTTLE_BYTES。

      STATE_OPEN_MESSAGE_THROTTLE_BYTES

      在STATE_OPEN_MESSAGE_THROTTLE_BYTES状态下计算一下当前收到的消息头部的操作,然后加上时间戳,最后将state的值置为STATE_OPEN_MESSAGE_READ_FRONT。

      STATE_OPEN_MESSAGE_READ_FRONT

      在STATE_OPEN_MESSAGE_READ_FRONT状态下调用read_until()函数将消息的头部(不同于之前的头部校验信息,这个是数据的front部分)读到front中,front是在AsyncConnection中定义的一个bufferlist的结构,专门用于存放消息的头部。完成以后将state的值置为STATE_OPEN_MESSAGE_READ_MIDDLE。

      STATE_OPEN_MESSAGE_READ_MIDDLE

      在STATE_OPEN_MESSAGE_READ_MIDDLE状态下和读取头部数据一样,调用read_until()函数将消息的中间部分读取到middle中,middle也是在AsyncConnection中定义的一个bufferlist的结构,专门用于存放消息的中间部分。完成以后将state的值置为STATE_OPEN_MESSAGE_READ_DATA_PREPARE。

      STATE_OPEN_MESSAGE_READ_DATA_PREPARE

      在STATE_OPEN_MESSAGE_READ_DATA_PREPARE状态下进行的是读取消息数据部分的准备工作,比如判断接收消息中数据部分的数据结构是不是足够容纳数据,如果现有的申请的接收数据的结构的空间大小不能容纳数据,则重新申请空间大小给其使用,如果可以则不用操作,最后将state的值置为STATE_OPEN_MESSAGE_READ_DATA,真正接收消息中的数据部分。

      STATE_OPEN_MESSAGE_READ_DATA

      在STATE_OPEN_MESSAGE_READ_DATA状态下用一个while循环来读取消息携带的数据,直到消息中没有数据可读才跳出循环,在循环中将消息读取到data中,data是在AsyncConnection中定义的一个bufferlist的结构,专门用于存放消息的数据部分。如果一次没有读完则终端当前的操作,等待下一次继续读取数据,最后将将state的值置为STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH,准备读取消息的尾部然后将消息分发出去。

      STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH

      在STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH状态下主要读取消息的尾部,然后对读取到的消息进行处理后分发出去让注册的Dispatcher来处理,关于接收到的消息是如何处理的在下一节中主要分析。
       

      【ceph】AsyncMessenger源码分析--new​

       

      如图所示,本小节主要描述消息的处理。

      经过一系列的状态变换,消息通信的接受端读出了消息中的包含的信息,但是读出的数据大都放在bufferlist中,如果将接收到的消息分发出去,Dispatcher无法处理bufferlist,因此需要一个将bufferlist中的数据封装成消息Message的过程。然后将封装的Message发送给Dispatcher处理。

      在STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH状态下首先是从消息中读出尾部footer,然后将之前读取到的current_header、front、middle、data一起封装成消息,执行过程是调用Message *decode_message()来完成的,在函数中首先进行CRC校验,如果没有问题根据header中的type定义该类型的消息实例,然后调用Message::set_header()将header封装到消息实例中,调用Message::set_footer()将fooer封装到消息实例中,调用Message:: set_payload()将front封装到消息实例中,调用Message:: set_middle()将middle封装到消息实例中,调用Message:: set_data()将data封装到消息实例中,至此完成了消息的封装。下一步就可以将封装好的消息分发出去了。

      封装完消息以后调用Message::set_connection()将当前的连接添加到消息的连接中,然后执行Messenger::ms_fast_preprocess()对消息的分发进行一个预处理,具体执行时注册的Dispatcher来操作的,比如OSD。

      预处理完成以后对当前的消息进行一个判断,即当前的消息是不是需要快速派送的消息,如果需要快速派送,调用Messenger::ms_fast_dispatch(),从fast_dispatchers链表中选择注册的Dispatcher,对消息进行快速派发。如果不需要快速派发则调用正常派送流程,调用事件中心创建一个派送消息的事件,创建事件的时候新建一个消息派送类的实例。EventCenter中有一个线程在循环等待处理放入事件中心的事件,当发现需要派送消息时调用消息派送类的回调函数来执行消息的具体派送,函数是Messenger::ms_deliver_dispatch(),Messenger从dispatchers链表中选择注册的Dispatcher对消息进行普通派发。
       RDMA

      Infiniband 初始化的时候,会

      max_send_wr=max_qp_wr > ms_async_rdma_send_buffers?ms_async_rdma_send_buffers:max_qp_wr ;

      max_recv_wr=ms_async_rdma_receive_buffers;

      Cluster* channel;//RECV

      Cluster* send;// SEND

      申请max_send_wr*ms_async_rdma_buffer_size 的空间给发送的send->chunk

      申请max_recv_wr*ms_async_rdma_buffer_size 的空间给接收的channel->chunk

      Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num)
        : cct(cct), lock("IB lock"), device_name(device_name), port_num(port_num)
      {
      }
      
      void Infiniband::init()
      {
        Mutex::Locker l(lock);
      
        if (initialized)
          return;
      
        device_list = new DeviceList(cct);
        initialized = true;
      
        device = device_list->get_device(device_name.c_str());
        device->binding_port(cct, port_num);
        assert(device);
        ib_physical_port = device->active_port->get_port_num();
        pd = new ProtectionDomain(cct, device);
        assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);
      
        max_recv_wr = device->device_attr->max_srq_wr;
        if (max_recv_wr > cct->_conf->ms_async_rdma_receive_buffers) {
          max_recv_wr = cct->_conf->ms_async_rdma_receive_buffers;
          ldout(cct, 1) << __func__ << " assigning: " << max_recv_wr << " receive buffers" << dendl;
        } else {
          ldout(cct, 1) << __func__ << " using the max allowed receive buffers: " << max_recv_wr << dendl;
        }
      
        max_send_wr = device->device_attr->max_qp_wr;
        if (max_send_wr > cct->_conf->ms_async_rdma_send_buffers) {
          max_send_wr = cct->_conf->ms_async_rdma_send_buffers;
          ldout(cct, 1) << __func__ << " assigning: " << max_send_wr << " send buffers"  << dendl;
        } else {
          ldout(cct, 1) << __func__ << " using the max allowed send buffers: " << max_send_wr << dendl;
        }
      
        ldout(cct, 1) << __func__ << " device allow " << device->device_attr->max_cqe
                      << " completion entries" << dendl;
      
        memory_manager = new MemoryManager(device, pd,
                                           cct->_conf->ms_async_rdma_enable_hugepage);
        memory_manager->register_rx_tx(
            cct->_conf->ms_async_rdma_buffer_size, max_recv_wr, max_send_wr);
      
        srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT);
        post_channel_cluster();
      
        dispatcher->polling_start();
      }
      
      版权声明:本文内容来自第三方投稿或授权转载,原文地址:https://blog.csdn.net/bandaoyu/article/details/119728808,作者:bandaoyu,版权归原作者所有。本网站转在其作品的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如因作品内容、版权等问题需要同本网站联系,请发邮件至ctyunbbs@chinatelecom.cn沟通。

      上一篇:【质量】代码22种异味[变差的表征]和解决方法

      下一篇:【C++】join()和detach|不join()也不detach()的后果

      相关文章

      2025-05-19 09:04:44

      js小题2:构造函数介绍与普通函数对比

      js小题2:构造函数介绍与普通函数对比

      2025-05-19 09:04:44
      new , 关键字 , 函数 , 对象 , 构造函数
      2025-05-19 09:04:30

      【Canvas技法】辐射式多道光影的实现

      【Canvas技法】辐射式多道光影的实现

      2025-05-19 09:04:30
      代码 , 函数 , 实现
      2025-05-19 09:04:22

      外设驱动库开发笔记54:外设库驱动设计改进的思考

      外设驱动库开发笔记54:外设库驱动设计改进的思考

      2025-05-19 09:04:22
      使用 , 函数 , 初始化 , 定义 , 对象
      2025-05-19 09:04:14

      C语言字符函数和字符串函数--(超全超详细)

      C语言字符函数和字符串函数--(超全超详细)

      2025-05-19 09:04:14
      函数 , 字符 , 字符串
      2025-05-16 09:15:24

      如何将一串数字用函数的方法倒过来(C语言)

      如何将一串数字用函数的方法倒过来(C语言)

      2025-05-16 09:15:24
      函数 , 数字 , 数组
      2025-05-14 10:33:31

      计算机小白的成长历程——习题演练(函数篇)

      计算机小白的成长历程——习题演练(函数篇)

      2025-05-14 10:33:31
      函数 , 字符串 , 数组 , 知识点 , 编写 , 迭代 , 递归
      2025-05-14 10:33:31

      【数据结构】第一章——绪论(2)

      【数据结构】第一章——绪论(2)

      2025-05-14 10:33:31
      函数 , 实现 , 打印 , 理解 , 算法 , 输入 , 输出
      2025-05-14 10:33:25

      30天拿下Rust之高级类型

      Rust作为一门系统编程语言,以其独特的内存管理方式和强大的类型系统著称。其中,高级类型的应用,为Rust的开发者提供了丰富的编程工具和手段,使得开发者可以更加灵活和高效地进行编程。

      2025-05-14 10:33:25
      Rust , type , 代码 , 函数 , 类型 , 返回
      2025-05-14 10:33:16

      30天拿下Python之文件操作

      Python是一种高级编程语言,它提供了许多内置函数和模块来处理文件操作,主要包括:打开文件、读取文件、写入文件、关闭文件、获取目录列表等。

      2025-05-14 10:33:16
      Python , 使用 , 函数 , 文件 , 权限 , 目录
      2025-05-14 10:33:16

      C++ 11新特性之tuple

      在C++编程语言的发展历程中,C++ 11标准引入了许多开创性的新特性,极大地提升了开发效率与代码质量。其中,tuple(元组)作为一种强大的容器类型,为处理多个不同类型的值提供了便捷的手段。

      2025-05-14 10:33:16
      std , 元素 , 函数 , 初始化 , 模板 , 类型
      查看更多
      推荐标签

      作者介绍

      天翼云小翼
      天翼云用户

      文章

      33561

      阅读量

      5264671

      查看更多

      最新文章

      【Canvas技法】辐射式多道光影的实现

      2025-05-19 09:04:30

      外设驱动库开发笔记54:外设库驱动设计改进的思考

      2025-05-19 09:04:22

      C语言字符函数和字符串函数--(超全超详细)

      2025-05-19 09:04:14

      如何将一串数字用函数的方法倒过来(C语言)

      2025-05-16 09:15:24

      30天拿下Rust之高级类型

      2025-05-14 10:33:25

      30天拿下Python之文件操作

      2025-05-14 10:33:16

      查看更多

      热门文章

      Python 函数调用父类详解

      2023-04-23 09:44:31

      C#8.0新语法

      2023-02-07 10:34:04

      游戏编程之六 游戏编程的特点

      2024-09-25 10:13:46

      实现远程线程DLL注入

      2023-05-04 08:57:15

      Python----map,filter,reduce,zip,lambda的使用方法

      2023-04-28 02:17:08

      Java学习之方法调用过程图解(理解)

      2023-04-06 06:35:24

      查看更多

      热门标签

      java Java python 编程开发 代码 开发语言 算法 线程 Python html 数组 C++ 元素 javascript c++
      查看更多

      相关产品

      弹性云主机

      随时自助获取、弹性伸缩的云服务器资源

      天翼云电脑(公众版)

      便捷、安全、高效的云电脑服务

      对象存储

      高品质、低成本的云上存储服务

      云硬盘

      为云上计算资源提供持久性块存储

      查看更多

      随机文章

      Java中的RPC远程过程调用技术详解

      Java中Lambda表达式的应用

      STL:哈希表和unordered系列容器的封装

      编译和连接

      编写和理解int main()函数的几个重要注意事项

      二级考试C语言基础知识精讲概述(一)

      • 7*24小时售后
      • 无忧退款
      • 免费备案
      • 专家服务
      售前咨询热线
      400-810-9889转1
      关注天翼云
      • 旗舰店
      • 天翼云APP
      • 天翼云微信公众号
      服务与支持
      • 备案中心
      • 售前咨询
      • 智能客服
      • 自助服务
      • 工单管理
      • 客户公告
      • 涉诈举报
      账户管理
      • 管理中心
      • 订单管理
      • 余额管理
      • 发票管理
      • 充值汇款
      • 续费管理
      快速入口
      • 天翼云旗舰店
      • 文档中心
      • 最新活动
      • 免费试用
      • 信任中心
      • 天翼云学堂
      云网生态
      • 甄选商城
      • 渠道合作
      • 云市场合作
      了解天翼云
      • 关于天翼云
      • 天翼云APP
      • 服务案例
      • 新闻资讯
      • 联系我们
      热门产品
      • 云电脑
      • 弹性云主机
      • 云电脑政企版
      • 天翼云手机
      • 云数据库
      • 对象存储
      • 云硬盘
      • Web应用防火墙
      • 服务器安全卫士
      • CDN加速
      热门推荐
      • 云服务备份
      • 边缘安全加速平台
      • 全站加速
      • 安全加速
      • 云服务器
      • 云主机
      • 智能边缘云
      • 应用编排服务
      • 微服务引擎
      • 共享流量包
      更多推荐
      • web应用防火墙
      • 密钥管理
      • 等保咨询
      • 安全专区
      • 应用运维管理
      • 云日志服务
      • 文档数据库服务
      • 云搜索服务
      • 数据湖探索
      • 数据仓库服务
      友情链接
      • 中国电信集团
      • 189邮箱
      • 天翼企业云盘
      • 天翼云盘
      ©2025 天翼云科技有限公司版权所有 增值电信业务经营许可证A2.B1.B2-20090001
      公司地址:北京市东城区青龙胡同甲1号、3号2幢2层205-32室
      • 用户协议
      • 隐私政策
      • 个人信息保护
      • 法律声明
      备案 京公网安备11010802043424号 京ICP备 2021034386号