爆款云主机2核4G限时秒杀,88元/年起!
查看详情

活动

天翼云最新优惠活动,涵盖免费试用,产品折扣等,助您降本增效!
热门活动
  • 618智算钜惠季 爆款云主机2核4G限时秒杀,88元/年起!
  • 免费体验DeepSeek,上天翼云息壤 NEW 新老用户均可免费体验2500万Tokens,限时两周
  • 云上钜惠 HOT 爆款云主机全场特惠,更有万元锦鲤券等你来领!
  • 算力套餐 HOT 让算力触手可及
  • 天翼云脑AOne NEW 连接、保护、办公,All-in-One!
  • 中小企业应用上云专场 产品组合下单即享折上9折起,助力企业快速上云
  • 息壤高校钜惠活动 NEW 天翼云息壤杯高校AI大赛,数款产品享受线上订购超值特惠
  • 天翼云电脑专场 HOT 移动办公新选择,爆款4核8G畅享1年3.5折起,快来抢购!
  • 天翼云奖励推广计划 加入成为云推官,推荐新用户注册下单得现金奖励
免费活动
  • 免费试用中心 HOT 多款云产品免费试用,快来开启云上之旅
  • 天翼云用户体验官 NEW 您的洞察,重塑科技边界

智算服务

打造统一的产品能力,实现算网调度、训练推理、技术架构、资源管理一体化智算服务
智算云(DeepSeek专区)
科研助手
  • 算力商城
  • 应用商城
  • 开发机
  • 并行计算
算力互联调度平台
  • 应用市场
  • 算力市场
  • 算力调度推荐
一站式智算服务平台
  • 模型广场
  • 体验中心
  • 服务接入
智算一体机
  • 智算一体机
大模型
  • DeepSeek-R1-昇腾版(671B)
  • DeepSeek-R1-英伟达版(671B)
  • DeepSeek-V3-昇腾版(671B)
  • DeepSeek-R1-Distill-Llama-70B
  • DeepSeek-R1-Distill-Qwen-32B
  • Qwen2-72B-Instruct
  • StableDiffusion-V2.1
  • TeleChat-12B

应用商城

天翼云精选行业优秀合作伙伴及千余款商品,提供一站式云上应用服务
进入甄选商城进入云市场创新解决方案
办公协同
  • WPS云文档
  • 安全邮箱
  • EMM手机管家
  • 智能商业平台
财务管理
  • 工资条
  • 税务风控云
企业应用
  • 翼信息化运维服务
  • 翼视频云归档解决方案
工业能源
  • 智慧工厂_生产流程管理解决方案
  • 智慧工地
建站工具
  • SSL证书
  • 新域名服务
网络工具
  • 翼云加速
灾备迁移
  • 云管家2.0
  • 翼备份
资源管理
  • 全栈混合云敏捷版(软件)
  • 全栈混合云敏捷版(一体机)
行业应用
  • 翼电子教室
  • 翼智慧显示一体化解决方案

合作伙伴

天翼云携手合作伙伴,共创云上生态,合作共赢
天翼云生态合作中心
  • 天翼云生态合作中心
天翼云渠道合作伙伴
  • 天翼云代理渠道合作伙伴
天翼云服务合作伙伴
  • 天翼云集成商交付能力认证
天翼云应用合作伙伴
  • 天翼云云市场合作伙伴
  • 天翼云甄选商城合作伙伴
天翼云技术合作伙伴
  • 天翼云OpenAPI中心
  • 天翼云EasyCoding平台
天翼云培训认证
  • 天翼云学堂
  • 天翼云市场商学院
天翼云合作计划
  • 云汇计划
天翼云东升计划
  • 适配中心
  • 东升计划
  • 适配互认证

开发者

开发者相关功能入口汇聚
技术社区
  • 专栏文章
  • 互动问答
  • 技术视频
资源与工具
  • OpenAPI中心
开放能力
  • EasyCoding敏捷开发平台
培训与认证
  • 天翼云学堂
  • 天翼云认证
魔乐社区
  • 魔乐社区

支持与服务

为您提供全方位支持与服务,全流程技术保障,助您轻松上云,安全无忧
文档与工具
  • 文档中心
  • 新手上云
  • 自助服务
  • OpenAPI中心
定价
  • 价格计算器
  • 定价策略
基础服务
  • 售前咨询
  • 在线支持
  • 在线支持
  • 工单服务
  • 建议与反馈
  • 用户体验官
  • 服务保障
  • 客户公告
  • 会员中心
增值服务
  • 红心服务
  • 首保服务
  • 客户支持计划
  • 专家技术服务
  • 备案管家

了解天翼云

天翼云秉承央企使命,致力于成为数字经济主力军,投身科技强国伟大事业,为用户提供安全、普惠云服务
品牌介绍
  • 关于天翼云
  • 智算云
  • 天翼云4.0
  • 新闻资讯
  • 天翼云APP
基础设施
  • 全球基础设施
  • 信任中心
最佳实践
  • 精选案例
  • 超级探访
  • 云杂志
  • 分析师和白皮书
  • 天翼云·创新直播间
市场活动
  • 2025智能云生态大会
  • 2024智算云生态大会
  • 2023云生态大会
  • 2022云生态大会
  • 天翼云中国行
天翼云
  • 活动
  • 智算服务
  • 产品
  • 解决方案
  • 应用商城
  • 合作伙伴
  • 开发者
  • 支持与服务
  • 了解天翼云
      • 文档
      • 控制中心
      • 备案
      • 管理中心

      【ceph 】ceph messenger的Dispatcher模块分析

      首页 知识中心 其他 文章详情页

      【ceph 】ceph messenger的Dispatcher模块分析

      2025-02-25 08:55:16 阅读次数:9

      gt,消息

      Dispatcher模块

      dispatcher 是消息分发中心,所有收到的消息都经由该模块,并由该模块转发给相应的处理模块(moncliet、mdsclient、osd等)。

      如图:

         创建messenger后,把处理消息的dispatcher add 到dispatcher list里面,当messenger接收到消息msg之后,把msg放入msg的队列,由消息处理线程取出来,遍历dispatcher list里面dispatcher,在dispatcher里面的switch比对msg-type,如果刚好有可以处理该msg的分支就处理,dispatcher返回ture,结束。如果该dispatcher里面的switch可以处理该msg的分支,就到default,dispatcher返回false。msg接着会被交给下一个dispatcher去处理。直到被处理,或dispatcher遍历完。

      【ceph 】ceph messenger的Dispatcher模块分析

              Dispatcher类是一个基类,里面设计封装了应用同Messenger交互的接口(每个具体的Dispatcher派生类自行去实现更具体的消息处理,一般都是再根据消息类型来分开处理消息)。Dispatcher并不是所有的接口封装都是为了转发消息,它更深层次的含义是提供一个应用层和底层的通信接口,而这个接口的桥梁是Messenger消息管理器。

      当底层有消息到来时,Messenger会将消息转给dispatcher对于的ms_*系列的接口,最常用的是ms_dispatch接口,因此你可以看到像monitor,osd这些应用的核心消息处理都在ms_dispatch接口里面实现。

       

      bool OSD::ms_dispatch(Message *m)
      {
       ……
        _dispatch(m);

      ……
        return true;
      }

       

       

       

      void OSD::_dispatch(Message *m)
      {
       
        switch (m->get_type()) {
          // -- don't need OSDMap --
          // map and replication
        case CEPH_MSG_OSD_MAP:
          handle_osd_map(static_cast<MOSDMap*>(m));
          break;
        case MSG_MON_GET_PURGED_SNAPS_REPLY:
          handle_get_purged_snaps_reply(static_cast<MMonGetPurgedSnapsReply*>(m));
          break;
          // osd
        case MSG_OSD_SCRUB:
          handle_scrub(static_cast<MOSDScrub*>(m));
          break;
        case MSG_COMMAND:
          handle_command(static_cast<MCommand*>(m));
          return;
          // -- need OSDMap --
        case MSG_OSD_PG_CREATE:
          {
            OpRequestRef op = op_tracker.create_request<OpRequest, Message*>(m);
            if (m->trace)
              op->osd_trace.init("osd op", &trace_endpoint, &m->trace);
            // no map?  starting up?
            if (!get_osdmap()) {
              dout(7) << "no OSDMap, not booted" << dendl;
          logger->inc(l_osd_waiting_for_map);
              waiting_for_osdmap.push_back(op);
          op->mark_delayed("no osdmap");
              break;
            }
            // need OSDMap
            dispatch_op(op);
          }
        }
      }

      Dispatcher的使用

      最简单的方式就是应用本身作为Dispatcher的派生类,如此,Messenger便是直接通过应用关联,比如Monitor、osd、mgr都是应用组件本身作为Dispatcher的派生类。

      class Monitor : public Dispatcher

      class MDSDaemon : public Dispatcher 

      申请一个Dispatcher的派生类实例,做为应用的模块注册给Messenger,比如RadosClient里面会注册各个Client给Messenger,而这些Client都是Dispatcher的派生类。

      class MDSDaemon : public Dispatcher

      {……}

      int MDSDaemon::init()
      {

      ……

        messenger->add_dispatcher_tail(&beacon);
        messenger->add_dispatcher_tail(this); #this 就是MDSDaemon 它自己

      ……
       }

      bool MDSDaemon::ms_dispatch(const ref_t<Message> &m)
      {
      ……
      }

       

      例子分析:

      Dipatcher类是消息分发的接口,OSD、MON、等类都继承该类,并实现了Dipatcher的消息分发接口

      class OSD : public Dispatcher,public md_config_obs_t
      {
          /** OSD **/
      }
      
      class Monitor : public Dispatcher,public md_config_obs_t
      {
      public:
      
          // me
          string name;
      }

      在OSD::init()函数中把  处理不同消息  的Dipatcher加入到Messenger实例中 

      // i'm ready!
      client_messenger->add_dispatcher_head(this);
      cluster_messenger->add_dispatcher_head(this);
      hbclient_messenger->add_dispatcher_head(&heartbeat_dispatcher);
      hb_front_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
      hb_back_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
      objecter_messenger->add_dispatcher_head(service.objecter);

      Messenger::add_dispatcher_head(Dispatcher *d) 把Dipatcher放入链表Messenger::list<Dispatcher*> dispatchers中,并调用ready(),SimpleMessenger::ready()重写了基类的ready,

      void add_dispatcher_head(Dispatcher *d)
      {
          bool first = dispatchers.empty();
          dispatchers.push_front(d);
          if (d->ms_can_fast_dispatch_any())
              fast_dispatchers.push_front(d);
          if (first)
              ready();
      }

      在ready函数中调用DispatchQueue::start, start()函数启动DispatchQueue::DispatchThread和DispatchQueue::LocalDeliveryThread线程类,最终调用DispatchQueue::entry()和DispatchQueue::run_local_delivery。

      void DispatchQueue::start()
      {
          assert(!stop);
          assert(!dispatch_thread.is_started());
          dispatch_thread.create("ms_dispatch");    //调用Thread::create->Thread::try_create->Thread::_entry_func->Thread::entry_wrapper->DispatchThread::entry
          local_delivery_thread.create("ms_local");
      }
      class DispatchThread : public Thread
      {
          DispatchQueue *dq;
      public:
          explicit DispatchThread(DispatchQueue *dq) : dq(dq) {}
          void *entry()
          {
              dq->entry();
              return 0;
          }
      } dispatch_thread;

      在DispatchQueue::entry()中调用根据不同的命令码qitem.get_code调用不同的Messenger类中的处理函数:

      void DispatchQueue::entry()
      {
          .
          .

          switch (qitem.get_code())
          {
          case D_BAD_REMOTE_RESET:
              msgr->ms_deliver_handle_remote_reset(qitem.get_connection());
              break;
          case D_CONNECT:
              msgr->ms_deliver_handle_connect(qitem.get_connection());
              break;
          case D_ACCEPT:
              msgr->ms_deliver_handle_accept(qitem.get_connection());
              break;
          case D_BAD_RESET:
              msgr->ms_deliver_handle_reset(qitem.get_connection());
              break;
          default:
              assert(0);
          }
      }
      else
      {
          Message *m = qitem.get_message();
          if (stop)
          {
              ldout(cct, 10) << " stop flag set, discarding " << m << " " << *m << dendl;
              m->put();
          }
          else
          {
              uint64_t msize = pre_dispatch(m);
              msgr->ms_deliver_dispatch(m);
              post_dispatch(m, msize);
          }
      }
      .
      .

      }

      在Messenger::ms_deliver_dispatch中最终遍历注册到链表的Dipatcher,找到可以处理到来的消息的 Dipatcher调用Dipatcher继承类的ms_dispatch进行处理:

      void ms_deliver_dispatch(Message *m)
      {
          m->set_dispatch_stamp(ceph_clock_now(cct));
          for (list<Dispatcher *>::iterator p = dispatchers.begin();
                  p != dispatchers.end();
                  ++p)
          {
              if ((*p)->ms_dispatch(m))     //在Dispatcher继承类中进行处理
                  return;
          }
          lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from "
                               << m->get_source_inst() << dendl;
          assert(!cct->_conf->ms_die_on_unhandled_msg);
          m->put();
      }
      bool MgrClient::ms_dispatch(const ref_t<Message>& m)
      {
        std::lock_guard l(lock);
      
        switch(m->get_type()) {
        case MSG_MGR_MAP:
          return handle_mgr_map(ref_cast<MMgrMap>(m));
        case MSG_MGR_CONFIGURE:
          return handle_mgr_configure(ref_cast<MMgrConfigure>(m));
        case MSG_MGR_CLOSE:
          return handle_mgr_close(ref_cast<MMgrClose>(m));
        case MSG_COMMAND_REPLY:
          if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) {
            MCommandReply *c = static_cast<MCommandReply*>(m.get());
            handle_command_reply(c->get_tid(), c->get_data(), c->rs, c->r);
            return true;
          } else {
            return false;
          }
        case MSG_MGR_COMMAND_REPLY:
          if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) {
            MMgrCommandReply *c = static_cast<MMgrCommandReply*>(m.get());
            handle_command_reply(c->get_tid(), c->get_data(), c->rs, c->r);
            return true;
          } else {
            return false;
          }
        default:
          ldout(cct, 30) << "Not handling " << *m << dendl; 
          return false;
        }
      }

      创建 osd_dispatcher/mon_dispatcher ==>add_dispatcher_

                                                                        ==>ready()

                                                                             ==>start()

                                                                                   ==>DispatchThread

                                                                                         ==>entry()

                                                                                   ==>LocalDeliveryThread

                                                                                         ==>run_local_delivery()

       

      ms_fast_dispatch和ms_dispatch的区别

      fast_preprocess

      d) 调用函数read_message()来接收消息,当本函数返回后,就完成了接收消息

      2) 调用函数in_q->fast_preprocess(m)预处理消息

      3) 调用函数in_q->can_fast_dispatch(m),如果可以进行fast_dispatch,就in_q->fast_dispatch(m)处理。fast_dispatch并不把消息加入到mqueue里,而是直接调用msgr->ms_fast_dispatch()函数,并最终调用注册的fast_dispatcher来进行处理。

      4) 如果不能fast_dispatch,就调用函数in_q->enqueue(m, m->get_priority(), conn_id)把接收到的消息加入到DispatchQueue的mqueue队列里,由DispatchQueue的分发线程调用ms_dispatch处理。

      ms_fast_dispatch和ms_dispatch两种处理的区别在于:ms_dispatch是由DispatchQueue的线程处理的,它是一个单线程;ms_fast_dispatch函数是由Pipe接收线程直接调用处理的,因此性能比前者好。

      ceph网络通信 | Ivanzz

      Ceph 数据IO全栈流程-源码分析_Darren_Wen的技术博客_51CTO博客

        bool ms_can_fast_dispatch(const Message *m) const override {
          switch (m->get_type()) {
          case CEPH_MSG_PING:
          case CEPH_MSG_OSD_OP:
          case CEPH_MSG_OSD_BACKOFF:
          case MSG_OSD_SCRUB2:
          case MSG_OSD_FORCE_RECOVERY:
          case MSG_MON_COMMAND:
          case MSG_OSD_PG_CREATE2:
          case MSG_OSD_PG_QUERY:
          case MSG_OSD_PG_QUERY2:
        ……
            return true;
          default:
            return false;
          }
        }


      bool DispatchQueue::can_fast_dispatch(const cref_t<Message> &m) const
      {
        return msgr->ms_can_fast_dispatch(m);
      }

      void DispatchQueue::fast_dispatch(const ref_t<Message>& m)
      {
        uint64_t msize = pre_dispatch(m);
        msgr->ms_fast_dispatch(m);
        post_dispatch(m, msize);
      }
       

      void DispatchQueue::run_local_delivery()
      {
        std::unique_lock l{local_delivery_lock};
        while (true) {
          if (stop_local_delivery)
            break;
          if (local_messages.empty()) {
            local_delivery_cond.wait(l);
            continue;
          }
          auto p = std::move(local_messages.front());
          local_messages.pop();
          l.unlock();
          const ref_t<Message>& m = p.first;
          int priority = p.second;
          fast_preprocess(m);
          if (can_fast_dispatch(m)) {
            fast_dispatch(m);
          } else {
            enqueue(m, priority, 0);
          }
          l.lock();
        }
      }

       

       

      Dispatcher处理线程

      dispatcher 是消息分发中心,所有收到的消息都经由该模块,并由该模块转发给相应的处理模块(moncliet、mdsclient、osd等)。

      其实现方式比较简单,就是把所有的模块及其处理消息的方法 handle 注册到分发中心,具体函数为 add_dispatcher_head/tail(),这样就向 dispatcher_queue 中添加了指定模块。后续在分发消息时,对 dispatcher_queue 进行轮询,直到有一个处理模块能够处理该消息,通过 message->get_type() 来指定消息的处理函数。所有的消息分发都在 dispatcher 线程中完成。

      在 add_dispatcher_head() 和 add_dispatcher_tail() 函数中,都做了 dispatcher 队列是否为空的判断(通过 dispatchers.empty() == true)。如果判定结果为空,说明需要重新创建 dispatcher 线程并绑定服务端地址,加入事件中心监听端口,具体方法在 ready() 中。

        void add_dispatcher_head(Dispatcher *d) {
          bool first = dispatchers.empty();
          dispatchers.push_front(d);
          if (d->ms_can_fast_dispatch_any())
            fast_dispatchers.push_front(d);
          if (first)
            ready();
        }
      

       

       add_dispatche_* 中调用了 AsyncMessenger::ready() 方法。下面给出AsyncMessenger::ready()方法代码:

      p->start()(Processor::start())方法中监听 EVENT_READABLE 事件,并把事件提交到 EventCenter 事件中心,由上文介绍的 msgr-worker-x 线程去轮询事件中心的队列,监听端口是否收到消息。收到的消息则由 dispatcher 线程分发给指定的处理程序,其分发消息的接口为 ms_dispatch() 和 ms_fast_dispatch()。

      dispatch_queue.start() 中开启了消息分发线程,分别为处理外部消息的 ms_dispatch 线程和处理本地消息的 ms_local 线程。相应的,它们有各自的优先级队列(注意:分发消息的队列时有优先级的,优先级越高,发送时机越早),分别是存储外部消息的 mqueue 和本地消息队列的 local_messages。消息队列的添加方式也有两种:mqueue.enqueue() 和 local_queue.emplace()。

      void AsyncMessenger::ready()
      {
        ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl;
      
        stack->ready();
        //绑定端口
        if (pending_bind) {
          int err = bindv(pending_bind_addrs);
          if (err) {
            lderr(cct) << __func__ << " postponed bind failed" << dendl;
            ceph_abort();
          }
        }
      
        Mutex::Locker l(lock);
        //调用 worker 线程,监听端口
        for (auto &&p : processors)  
          p->start();
        //开启 ms_dispatcher 和 ms_locla 线程
        dispatch_queue.start();
      }
      
      void DispatchQueue::start()
      {
        ceph_assert(!stop);
        ceph_assert(!dispatch_thread.is_started());
        //开启 ms_dispatch 和 ms_local 线程
        dispatch_thread.create("ms_dispatch");
        local_delivery_thread.create("ms_local");
      }


      链接:https:///p/58956728dadc

       

      Messenger和Dispatcher两大角色

       

      ceph通信模块的角色主要分为Messenger和Dispatcher两大角色

       

      使用ceph通信模块来收发消息
      * 发送消息

      发消息比较简单,应用只需将消息内容按照需要的消息类型(定义在messaging/*或自定义)进行封装后调用Messenger的send_message即可。

       
      接收消息
      Messenger是如何将消息转给应用的或者说是如何管理?

      Messenger设计了两个分发器管理成员:dispatchers和fast_dispatchers,用来处理不同类型的请求处理。应用层则按需求将不同的分发器注册给Messenger,进而Messenger接收到底层来的消息时,会将消息分发给已经注册的两个dispatchers。


      设计fast_dispatchers的目的就是为了让有些消息能够省去底层的一层流程(如放入队列),直接到达应用。

      Dispatcher类是一个基类,里面设计封装了应用同Messenger交互的接口(每个具体的Dispatcher派生类自行去实现更具体的消息处理,一般都是再根据消息类型来分开处理消息)。Dispatcher并不是所有的接口封装都是为了转发消息,它更深层次的含义是提供一个应用层和底层的通信接口,而这个接口的桥梁是Messenger消息管理器。


      当底层有消息到来时,Messenger会将消息转给dispatcher对于的ms_*系列的接口,最常用的是ms_dispatch接口,因此你可以看到像monitor,osd这些应用的核心消息处理都在ms_dispatch接口里面实现。

       

      Dispatcher的使用

      最简单的方式就是应用本身作为Dispatcher的派生类,如此,Messenger便是直接通过应用关联,比如Monitor、osd、mgr都是应用组件本身作为Dispatcher的派生类。

      class Monitor : public Dispatcher

      申请一个Dispatcher的派生类实例,做为应用的模块注册给Messenger,比如RadosClient里面会注册各个Client给Messenger,而这些Client都是Dispatcher的派生类。

      消息类型

      ceph的消息基类是:Message,Message里面设计了一个type成员,用来区分不同的消息类型,不同的消息模块可以通过type来构造,而这些type定义在Message.h中

      switch(m->get_type()) {

      case MSG_MGR_MAP:
          return handle_mgr_map(ref_cast<MMgrMap>(m));
        case MSG_MGR_CONFIGURE:
          return handle_mgr_configure(ref_cast<MMgrConfigure>(m));

      ……}

       type定义在Message.h中

      Message.h

      ……

      // monitor internal
      #define MSG_MON_SCRUB              64
      #define MSG_MON_ELECTION           65
      #define MSG_MON_PAXOS              66
      #define MSG_MON_PROBE              67
      #define MSG_MON_JOIN               68
      #define MSG_MON_SYNC           69
      #define MSG_MON_PING               140

      ……

       

      技巧:比如你想要看某个消息是谁发的,那么你只需要去查看这个消息类型对应的消息模块有哪些,然后再查到谁在使用这个消息模块来封装消息,进而就可以找到发送这个消息的地方。

       

      例子
      以ceph-mon为例子,Monitor类继承自Dispatcher

      class Monitor : public Dispatcher,
      实现它的ms_dispatcher方法,这个方法里面实现了mon的消息处理

      注册给Messenger,add_dispatcher_tail方法就是将当前应用添加到dispatcher列表中

      messenger->add_dispatcher_tail(this); 
       Messenger收到消息转给dipatcher

      void ms_deliver_dispatch(Message *m)
      {
          m->set_dispatch_stamp(ceph_clock_now());
          for (list<Dispatcher *>::iterator p = dispatchers.begin();
                  p != dispatchers.end();
                  ++p)
          {
              if ((*p)->ms_dispatch(m))
                  return;

          }
          lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from "
                                          << m->get_source_inst() << dendl;
          assert(!cct->_conf->ms_die_on_unhandled_msg);
          m->put();

      }
      ceph monitor处理消息

      bool ms_dispatch(Message *m) override
      {
          lock.Lock();
          _ms_dispatch(m);
          lock.Unlock();
          return true;

      }

       

      bool MgrClient::ms_dispatch2(const ref_t<Message>& m)
      {
        std::lock_guard l(lock);
      
        switch(m->get_type()) {
        case MSG_MGR_MAP:
          return handle_mgr_map(ref_cast<MMgrMap>(m));
        case MSG_MGR_CONFIGURE:
          return handle_mgr_configure(ref_cast<MMgrConfigure>(m));
        case MSG_MGR_CLOSE:
          return handle_mgr_close(ref_cast<MMgrClose>(m));
        case MSG_COMMAND_REPLY:
          if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) {
            MCommandReply *c = static_cast<MCommandReply*>(m.get());
            handle_command_reply(c->get_tid(), c->get_data(), c->rs, c->r);
            return true;
          } else {
            return false;
          }
        case MSG_MGR_COMMAND_REPLY:
          if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) {
            MMgrCommandReply *c = static_cast<MMgrCommandReply*>(m.get());
            handle_command_reply(c->get_tid(), c->get_data(), c->rs, c->r);
            return true;
          } else {
            return false;
          }
        default:
          ldout(cct, 30) << "Not handling " << *m << dendl; 
          return false;
        }
      }

       

      未整理内容;

      Ceph网络模块使用案例:OSD心跳检测机制 - 灰信网(软件开发博客聚合)

      ceph的组件主要包括,OSD,monitor,mgr,osdclient,client等,这些模块内部的通信,以及模块间的通信都使用了,后面我们直接把这些组件称为网络模块的使用者。使用者在使用网络模块的时候,主要涉及到2个角色,一个是messenger,一个是dispatcher。messenger相当于一个消息管理器(网络模块的核心,消息的发送和接收都是messenger通过底层的类实现),dispatcher相当于一个消息的处理器。

       

      -messenger

      • ① 将dispatcher移交给它的信息发送给其它节点(节点,这里节点是一个逻辑单元,比如osd.0就算是一个节点,osd.1算一个新的节点)处理
      • ② 从其它节点的messenger获取消息移交给本节点的dispatcher。

      -dispatcher

      • ① 对接收到的消息(有messenger移交给它)进行处理
      • ② 把需要发送的消息移交给本节点的messenger

      每个ceph组件都会注册多个messenger和多个dispatcher用于处理不同类型的消息。以OSD组件为例,OSD的守护进程启动的时候会注册7个messenger来管理消息,每个messenger的用途不一样。这部分代码在ceph osd守护进程启动中 /src/ceph_。 每一个OSD都有一个守护进程(OSD deamon)。这个deamon负责完成OSD的所有逻辑功能,包括与monitor和其他OSD(事实上是其他OSD的deamon)通信以维护更新系 统状态,与其他OSD共同完成数据的存储和维护,与client通信完成各种数据对象操作等等。

       

      例子:一个OSD模块会注册7个messenger和2个dispatcher。

       

       

      编号

      Messenger实例名称

      作用

      1

      *ms_public

      用来处理OSD和Client之间的消息

      2

      *ms_cluster

      用来处理OSD和集群之间的消息

      3

      *ms_hb_front_client

      用来向其它OSD发送心跳的消息

      4

      *ms_hb_back_client

      用来向其它OSD发送心跳的消息

      5

      *ms_hb_back_server

      用来接收其他OSD的心跳消息

      6

      *ms_hb_front_server

      用来接收其他OSD的心跳消息

      7

      *ms_objecter

      用来处理OSD和Objecter之间的消息

       

      编号

      Dispatcher实例名称

      作用

      1

      *OSD

      可以处理部分osd节点的消息

      2

      *heartbeat_dispatcher

      处理心跳连接

      1 消息模块的使用框架

      1.1 发送消息

      本节描述ceph中的应用(osd、mgr、monitor等)是如何使用网络模块发送消息的。在实际的应用中,有两种常见的发送消息的方式,当然这两种方式只是看起来有些不同,的底层实现都是相同,都是调用AsyncConnection::send_message(Message *m)把消息发送出去。

      1)方式一:使用AsyncMessenger::send_message(Message *m, const entity_inst_t& dest))

      其中Message *m是要发送的消息,dest是目的地址。

      send_message会首先去判断自己和目标地址之前是不是已经存在链接Connection,如果没有就创建一个,conn->send_message(m)发送消息

      -----------------------------------------------------------

        conn->send_message(m)

        -----------------------------------------------------------

      2) 方式二:

      ① 首先要通过Messenger类,获取对应的Connection:

        ------------------------------------------------------------

        conn = messenger->get_connection(dest_server);

        ------------------------------------------------------------

        get_connection过程是这样的,如果dest.addr是my_inst.addr,就直接返回local_connection。

        如果链接不存在就新建一个。

      ② 当获得一个Connection之后,就可以调用Connection的发送函数来发送消息。

        -----------------------------------------------------------

        conn->send_message(m)

        -----------------------------------------------------------

      具体发送的实现过程依赖于选择的消息模式,simple、async等实现方式都不同。在另一个文章里我会讲到具体实现过程,这里不多做解释。

      1.2 消息的接收

      消息的接收过程简言之就是通过监听socket判断是否有消息到来,如果有就接收。这个过程是个很复杂的过程,涉及到了连接建立、错误处理等等。具体的实现依赖于选择的消息模式,比如,SimpleMessenger是使用一个read线程来实现;AsyncMessenger是使用基于事件的机制实现。接收的过程对应用层都是透明的,本章不做解释。

      1.3 消息的处理

      消息接收完成后,就进入消息的处理。首先判断消息m是否可以fast_dispatch,如果可以,调用注册fast_dispatcher函数处理消息。如果不能fast_dispatch,调用函数in_q->enqueue,将接收到的消息加入到DispatchQueue的mqueue队列中,排队等待处理。

      2 ceph OSD心跳检测与网络模块 

      下面我们具体举一个OSD心跳检测的例子来讲解,通过心跳检测机制来了解网络模块的使用。在ceph中需要通过心跳检测来判断OSD是不是在线,因为这部分的功能比较简单独立。

      2.1 Messenger & Dispatcher的注册

      在OSD模块注册的7个Messenger和2个Dispatcher中,4个Messenger都和心跳检测相关,一个heartbeat_dispatcher用来处理心跳连接。

       

      编号

      Messenger实例名称

      作用

      1

      *ms_public

      用来处理OSD和Client之间的消息

      2

      *ms_cluster

      用来处理OSD和集群之间的消息

      3

      *ms_hb_front_client

      用来向其它OSD发送心跳的消息

      4

      *ms_hb_back_client

      用来向其它OSD发送心跳的消息

      5

      *ms_hb_back_server

      用来接收其他OSD的心跳消息

      6

      *ms_hb_front_server

      用来接收其他OSD的心跳消息

      7

      *ms_objecter

      用来处理OSD和Objecter之间的消息

      对应代码在ceph_中

      -------------------------------------------------------------------------------------------------------

      -HeartBeatMessenger

      在ceph守护进程启动过程中(ceph-),创建了4个messenger用于心跳检测。

       

      在ceph守护进程创建osd的时候将这些messenger传给了osd, 注意这些messenger在osd中被重命名了。

      重命名为了:

      hb_front_client_messenger

      hb_back_client_messenger

      hb_front_server_messenger

      hb_back_server_messenger

       

      ---------------------------------------------------------------------------------------------------

       

      编号

      Dispatcher实例名称

      作用

      1

      *OSD

      可以处理部分osd节点的消息

      2

      *heartbeat_dispatcher

      处理心跳连接

      对应代码在中

      -------------------------------------------------------------------------------------------------------

        hb_front_client_messenger->add_dispatcher_head(&heartbeat_dispatcher);

        hb_back_client_messenger->add_dispatcher_head(&heartbeat_dispatcher);

        hb_front_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);

        hb_back_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);

      -------------------------------------------------------------------------------------------------------

      版权声明:本文内容来自第三方投稿或授权转载,原文地址:https://blog.csdn.net/bandaoyu/article/details/111698898,作者:bandaoyu,版权归原作者所有。本网站转在其作品的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如因作品内容、版权等问题需要同本网站联系,请发邮件至ctyunbbs@chinatelecom.cn沟通。

      上一篇:linux管道pipe详解

      下一篇:flask celery python 每月定时任务

      相关文章

      2025-05-16 09:15:10

      52.介绍AOP有几种实现方式

      52.介绍AOP有几种实现方式

      2025-05-16 09:15:10
      gt , lt , Spring
      2025-05-13 09:50:48

      函数索引测试

      函数索引测试

      2025-05-13 09:50:48
      emp , gt , SQL
      2025-05-13 09:50:38

      ORA-00823异常处理

      ORA-00823异常处理

      2025-05-13 09:50:38
      gt , max , size , SQL
      2025-05-13 09:50:38

      磁盘组扩容测试01

      磁盘组扩容测试01

      2025-05-13 09:50:38
      gt , ORCL , SQL
      2025-05-13 09:50:28

      添加控制文件—场景(DG备库)—采用Switchover方式

      添加控制文件—场景(DG备库)—采用Switchover方式

      2025-05-13 09:50:28
      database , gt , mode , SQL
      2025-05-13 09:50:28

      添加控制文件—场景(磁盘组+RAC)

      添加控制文件—场景(磁盘组+RAC)

      2025-05-13 09:50:28
      gt , ORCL
      2025-05-09 09:21:53

      Oracle 11g系列:约束

      约束是每个数据库必不可少的一部分,约束的目的在于保存数据的完整性。数据完整性是指数据的精确性和可靠性。数据库约束主要包括:主键约束、外键约束、唯一性约束、检查约束和默认值约束。

      2025-05-09 09:21:53
      gt , 主键 , 删除 , 外键 , 约束
      2025-05-09 09:21:53

      优化查询性能

      优化查询性能

      2025-05-09 09:21:53
      gt , JOIN , lt , 生成
      2025-05-09 09:21:53

      Oracle 11g系列:视图

      视图是数据库中特有的对象,视图用于存储查询,但不会存储数据(物化视图除外)。这是视图和数据表的重要区别。Oracle中有4种视图:关系视图、内嵌视图、对象视图和物化视图。

      2025-05-09 09:21:53
      gt , 对象 , 查询 , 视图
      2025-05-07 09:08:54

      【Linux】>`, `>>`, `2>`, `|`, `uniq`, `comm`, `diff`, `history`, `Tab补全`, `Ctrl+R`

      【Linux】>`, `>>`, `2>`, `|`, `uniq`, `comm`, `diff`, `history`, `Tab补全`, `Ctrl+R`

      2025-05-07 09:08:54
      diff , gt , uniq
      查看更多
      推荐标签

      作者介绍

      天翼云小翼
      天翼云用户

      文章

      33561

      阅读量

      5274346

      查看更多

      最新文章

      ORA-00823异常处理

      2025-05-13 09:50:38

      添加控制文件—场景(磁盘组+RAC)

      2025-05-13 09:50:28

      添加控制文件—场景(DG备库)—采用Switchover方式

      2025-05-13 09:50:28

      优化查询性能

      2025-05-09 09:21:53

      【Linux】>`, `>>`, `2>`, `|`, `uniq`, `comm`, `diff`, `history`, `Tab补全`, `Ctrl+R`

      2025-05-07 09:08:54

      MFC中如何使用定时器(SetTimer)

      2025-05-06 09:21:03

      查看更多

      热门文章

      四、消息认证码、认证加密和重放攻击

      2023-06-16 06:04:11

      【RocketMQ入门到精通】— RocketMQ初级特性能力 | Message Order,RocketMQ的消息可以是有序的哦

      2023-07-04 07:00:45

      RocketMQ初级特性能力 | Message Priority,RocketMQ消息的优先级?没玩过吧!

      2023-07-06 09:42:03

      RocketMQ-术语详解(通俗易懂)

      2023-07-04 07:00:45

      shell 管道命令 &、&&、||、>、>>(精)

      2024-05-31 08:13:27

      java.lang.IllegalArgumentException: Could not resolve placeholder ‘basePath‘ in value ““

      2024-09-24 06:31:08

      查看更多

      热门标签

      linux java python javascript 数组 前端 docker Linux vue 函数 shell git 节点 容器 示例
      查看更多

      相关产品

      弹性云主机

      随时自助获取、弹性伸缩的云服务器资源

      天翼云电脑(公众版)

      便捷、安全、高效的云电脑服务

      对象存储

      高品质、低成本的云上存储服务

      云硬盘

      为云上计算资源提供持久性块存储

      查看更多

      随机文章

      处理特殊格式的GET传参

      /etc/security/limits.conf的相关说明

      shell 管道命令 &、&&、||、>、>>(精)

      【leetcode】位运算 - 比特位计数

      四、消息认证码、认证加密和重放攻击

      HTML表格、表单标签

      • 7*24小时售后
      • 无忧退款
      • 免费备案
      • 专家服务
      售前咨询热线
      400-810-9889转1
      关注天翼云
      • 旗舰店
      • 天翼云APP
      • 天翼云微信公众号
      服务与支持
      • 备案中心
      • 售前咨询
      • 智能客服
      • 自助服务
      • 工单管理
      • 客户公告
      • 涉诈举报
      账户管理
      • 管理中心
      • 订单管理
      • 余额管理
      • 发票管理
      • 充值汇款
      • 续费管理
      快速入口
      • 天翼云旗舰店
      • 文档中心
      • 最新活动
      • 免费试用
      • 信任中心
      • 天翼云学堂
      云网生态
      • 甄选商城
      • 渠道合作
      • 云市场合作
      了解天翼云
      • 关于天翼云
      • 天翼云APP
      • 服务案例
      • 新闻资讯
      • 联系我们
      热门产品
      • 云电脑
      • 弹性云主机
      • 云电脑政企版
      • 天翼云手机
      • 云数据库
      • 对象存储
      • 云硬盘
      • Web应用防火墙
      • 服务器安全卫士
      • CDN加速
      热门推荐
      • 云服务备份
      • 边缘安全加速平台
      • 全站加速
      • 安全加速
      • 云服务器
      • 云主机
      • 智能边缘云
      • 应用编排服务
      • 微服务引擎
      • 共享流量包
      更多推荐
      • web应用防火墙
      • 密钥管理
      • 等保咨询
      • 安全专区
      • 应用运维管理
      • 云日志服务
      • 文档数据库服务
      • 云搜索服务
      • 数据湖探索
      • 数据仓库服务
      友情链接
      • 中国电信集团
      • 189邮箱
      • 天翼企业云盘
      • 天翼云盘
      ©2025 天翼云科技有限公司版权所有 增值电信业务经营许可证A2.B1.B2-20090001
      公司地址:北京市东城区青龙胡同甲1号、3号2幢2层205-32室
      • 用户协议
      • 隐私政策
      • 个人信息保护
      • 法律声明
      备案 京公网安备11010802043424号 京ICP备 2021034386号