爆款云主机2核4G限时秒杀,88元/年起!
查看详情

活动

天翼云最新优惠活动,涵盖免费试用,产品折扣等,助您降本增效!
热门活动
  • 618智算钜惠季 爆款云主机2核4G限时秒杀,88元/年起!
  • 免费体验DeepSeek,上天翼云息壤 NEW 新老用户均可免费体验2500万Tokens,限时两周
  • 云上钜惠 HOT 爆款云主机全场特惠,更有万元锦鲤券等你来领!
  • 算力套餐 HOT 让算力触手可及
  • 天翼云脑AOne NEW 连接、保护、办公,All-in-One!
  • 中小企业应用上云专场 产品组合下单即享折上9折起,助力企业快速上云
  • 息壤高校钜惠活动 NEW 天翼云息壤杯高校AI大赛,数款产品享受线上订购超值特惠
  • 天翼云电脑专场 HOT 移动办公新选择,爆款4核8G畅享1年3.5折起,快来抢购!
  • 天翼云奖励推广计划 加入成为云推官,推荐新用户注册下单得现金奖励
免费活动
  • 免费试用中心 HOT 多款云产品免费试用,快来开启云上之旅
  • 天翼云用户体验官 NEW 您的洞察,重塑科技边界

智算服务

打造统一的产品能力,实现算网调度、训练推理、技术架构、资源管理一体化智算服务
智算云(DeepSeek专区)
科研助手
  • 算力商城
  • 应用商城
  • 开发机
  • 并行计算
算力互联调度平台
  • 应用市场
  • 算力市场
  • 算力调度推荐
一站式智算服务平台
  • 模型广场
  • 体验中心
  • 服务接入
智算一体机
  • 智算一体机
大模型
  • DeepSeek-R1-昇腾版(671B)
  • DeepSeek-R1-英伟达版(671B)
  • DeepSeek-V3-昇腾版(671B)
  • DeepSeek-R1-Distill-Llama-70B
  • DeepSeek-R1-Distill-Qwen-32B
  • Qwen2-72B-Instruct
  • StableDiffusion-V2.1
  • TeleChat-12B

应用商城

天翼云精选行业优秀合作伙伴及千余款商品,提供一站式云上应用服务
进入甄选商城进入云市场创新解决方案
办公协同
  • WPS云文档
  • 安全邮箱
  • EMM手机管家
  • 智能商业平台
财务管理
  • 工资条
  • 税务风控云
企业应用
  • 翼信息化运维服务
  • 翼视频云归档解决方案
工业能源
  • 智慧工厂_生产流程管理解决方案
  • 智慧工地
建站工具
  • SSL证书
  • 新域名服务
网络工具
  • 翼云加速
灾备迁移
  • 云管家2.0
  • 翼备份
资源管理
  • 全栈混合云敏捷版(软件)
  • 全栈混合云敏捷版(一体机)
行业应用
  • 翼电子教室
  • 翼智慧显示一体化解决方案

合作伙伴

天翼云携手合作伙伴,共创云上生态,合作共赢
天翼云生态合作中心
  • 天翼云生态合作中心
天翼云渠道合作伙伴
  • 天翼云代理渠道合作伙伴
天翼云服务合作伙伴
  • 天翼云集成商交付能力认证
天翼云应用合作伙伴
  • 天翼云云市场合作伙伴
  • 天翼云甄选商城合作伙伴
天翼云技术合作伙伴
  • 天翼云OpenAPI中心
  • 天翼云EasyCoding平台
天翼云培训认证
  • 天翼云学堂
  • 天翼云市场商学院
天翼云合作计划
  • 云汇计划
天翼云东升计划
  • 适配中心
  • 东升计划
  • 适配互认证

开发者

开发者相关功能入口汇聚
技术社区
  • 专栏文章
  • 互动问答
  • 技术视频
资源与工具
  • OpenAPI中心
开放能力
  • EasyCoding敏捷开发平台
培训与认证
  • 天翼云学堂
  • 天翼云认证
魔乐社区
  • 魔乐社区

支持与服务

为您提供全方位支持与服务,全流程技术保障,助您轻松上云,安全无忧
文档与工具
  • 文档中心
  • 新手上云
  • 自助服务
  • OpenAPI中心
定价
  • 价格计算器
  • 定价策略
基础服务
  • 售前咨询
  • 在线支持
  • 在线支持
  • 工单服务
  • 建议与反馈
  • 用户体验官
  • 服务保障
  • 客户公告
  • 会员中心
增值服务
  • 红心服务
  • 首保服务
  • 客户支持计划
  • 专家技术服务
  • 备案管家

了解天翼云

天翼云秉承央企使命,致力于成为数字经济主力军,投身科技强国伟大事业,为用户提供安全、普惠云服务
品牌介绍
  • 关于天翼云
  • 智算云
  • 天翼云4.0
  • 新闻资讯
  • 天翼云APP
基础设施
  • 全球基础设施
  • 信任中心
最佳实践
  • 精选案例
  • 超级探访
  • 云杂志
  • 分析师和白皮书
  • 天翼云·创新直播间
市场活动
  • 2025智能云生态大会
  • 2024智算云生态大会
  • 2023云生态大会
  • 2022云生态大会
  • 天翼云中国行
天翼云
  • 活动
  • 智算服务
  • 产品
  • 解决方案
  • 应用商城
  • 合作伙伴
  • 开发者
  • 支持与服务
  • 了解天翼云
      • 文档
      • 控制中心
      • 备案
      • 管理中心

      ceph:消息通信机制小记--研读

      首页 知识中心 软件开发 文章详情页

      ceph:消息通信机制小记--研读

      2025-02-27 09:35:36 阅读次数:13

      events,gt,lt,线程

      分布式存储系统,需要一个稳定的网络通信机制,来实现客户端和服务端的消息通信。ceph有三种消息通信框架:simple,xio和async。目前只研究了async。

      Messenger的创建

      消息的收发和处理都是异步的,所以必须需要单独的模块来处理。所有模块(mon,osd, mds等)在启动阶段都会创建一个Messenger。既然是async的通信框架,那必须有单独的线程来处理消息收发,所以在创建Messenger时必须启动多个线程。以ceph-mon为例,创建Messenger

      1. mon是运行在服务器上的进程,进程的主体在ceph-mon.cc中的main函数开始,并且申请一些用于服务的线程。比如这里用于接收客户端连接的线程Accepter。如果配对连接成功后,会把连接成功的socket交给Messager来处理。
      Messenger *msgr = Messenger::create(g_ceph_context, public_msgr_type,entity_name_t::MON(rank), 
      "mon", 0, Messenger::HAS_MANY_CONNECTIONS)
      

      Messenger::create函数如下,根据public_msgr_type的配置创建对应类型的Messager并返回:

      Messenger *Messenger::create(CephContext *cct, const string &type, entity_name_t name, string lname,
      			     uint64_t nonce, uint64_t cflags)
      { // name = entity_name_t::MON(rank), lname = "mon"
        ...
        else if (r == 1 || type.find("async") != std::string::npos)
          return new AsyncMessenger(cct, name, type, std::move(lname), nonce);
        ...
      }
      

      AsyncMessenger中的几个重要成员

      class AsyncMessenger : public SimplePolicyMessenger {
      private:
        NetworkStack *stack;                    // 起线程
        std::vector<Processor*> processors;     // 主要用来监听连接
        DispatchQueue dispatch_queue;           // 用来分发消息
        ceph::unordered_map<entity_addr_t, AsyncConnectionRef> conns  // 保存已建立的连接
        ...
      }
      

      接下来就是new AsyncMessenger的过程。

      AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
                                     const std::string &type, string mname, uint64_t _nonce)
        : SimplePolicyMessenger(cct, name,mname, _nonce), dispatch_queue(cct, this, mname), ...
      { 
        ...
        StackSingleton *single;
        // 创建一个StackSingleton的单例
        cct->lookup_or_create_singleton_object<StackSingleton>(single, "AsyncMessenger::NetworkStack::"+transport_type);
        single->ready(transport_type);
        stack = single->stack.get();
        stack->start();
        // 获取worker
        local_worker = stack->get_worker();
        local_connection = new AsyncConnection(cct, this, &dispatch_queue, local_worker);
        init_local_connection();
        reap_handler = new C_handle_reap(this);
        unsigned processor_num = 1;
        if (stack->support_local_listen_table())
          processor_num = stack->get_num_worker();
        for (unsigned i = 0; i < processor_num; ++i)
          processors.push_back(new Processor(this, stack->get_worker(i), cct));
      }

      说明:AsyncMessenger初始化的过程很简单,先将几个重要的成员初始化。

      这里面比较重要的是stack的初始化,先是cct->lookup_or_create_singleton_object<StackSingleton>创建一个StackSingleton的单例,StackSingleton如下:

      struct StackSingleton {
        CephContext *cct;
        std::shared_ptr<NetworkStack> stack;
        StackSingleton(CephContext *c): cct(c) {}
        void ready(std::string &type) {
          if (!stack)
            stack = NetworkStack::create(cct, type);
        }
      }
      

      然后通过single->ready --> NetworkStack::create来创建NetwrokStack,流程如下

      ceph:消息通信机制小记--研读

       

      创建Worker和Epoll句柄

      NetworkStack::create

      std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c, const string &t)
      {
        if (t == "posix")
          return std::make_shared<PosixNetworkStack>(c, t);
        ...
      }
      

      NetworkStack和子类PosixNetworkStack中几个重要成员如下:

      class NetworkStack : public CephContext::ForkWatcher {
        std::string type;                               // type = "posix"
        unsigned num_workers = 0;                       // num_workers = 3
        ...
        std::function<void ()> add_thread(unsigned i);
       protected:
        CephContext *cct;
        vector<Worker*> workers;                       // 存着三个PosixWorker
        ...
      }
      
      class PosixNetworkStack : public NetworkStack {
        vector<int> coreids;
        vector<std::thread> threads;                 // threads存着三个add_thread()中的匿名函数
        ...
      }

      NetworkStack中的workers用来保存多个Worker,每个Worker都会创建一个Epoll(大多的网络编程中,都会使用基于事件通知的异步网络IO方式来实现,比如Epoll和Kqueue,ceph的网络模块使用的是Epoll)。在NetworkStack的构造函数中,会创建三个Worker

      NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c)
      {
        const uint64_t InitEventNumber = 5000;
        num_workers = cct->_conf->ms_async_op_threads;        // num_workers = 3
        for (unsigned i = 0; i < num_workers; ++i) {
          Worker *w = create_worker(cct, type, i);
          w->center.init(InitEventNumber, i, type);
          workers.push_back(w);
        }
        cct->register_fork_watcher(this);
      }

      NetworkStack::create_worker如下

      Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned i)
      {
        if (type == "posix")
          return new PosixWorker(c, i);
        ...
      } 

      Worker和PosixWorker中几个重要成员如下:

      class Worker : public Thread {
        ...
        EventCenter center;
        ...
      }
      
      class PosixWorker : public Worker {
        NetHandler net;
        ...
      }
      

      EventCenter(事件中心)是一个处理事件的数据结构,相当于一个事件处理的容器。它本身并不真正去处理事件,通过回调函数的方式来完成事件的处理。同样,如何获取需要处理的事件也不是事件中心来完成的,它只负责处理,具体对需要处理的事件的获取是通过EventDriver来完成的。EventDriver是一个接口类,其实现主要是由EpollDriver、KqueueDriver和SelectDriver三个类操作的。Ceph支持多种操作系统的使用,如果使用的是Linux操作系统,使用EpollDriver,如果是BSD,使用KqueueDriver,如果都不是的情况下再使用SelectDriver(系统定义为最坏状况下)。EpollDriver封装了epoll的接口,事件驱动的执行主要依赖于epoll的方式,其中主要有三个函数:epoll_create,创建epoll句柄; epoll_ctl,将被监听的描述符fd添加到epoll句柄或从epoll句柄中删除或者对监听事件进行修改;epoll_wait,等待事件触发(观察就绪列表里面有没有数据,并进行提取和清空就绪列表,非常高效)。

      class EpollDriver : public EventDriver {
        int epfd;
        struct epoll_event *events;
        CephContext *cct;
        int size;
        ...
      }

      启动线程

      在AsyncMessenger的构造函数中创建Worker后,就该启动Worker中的线程,在启动线程的过程中,重要的有两点:1,加入监听事件;2,等待并处理事件。

      从stack->start()开始,代码如下

      void NetworkStack::start()
      {
        ...
        for (unsigned i = 0; i < num_workers; ++i) {
          if (workers[i]->is_init())
            continue;
          std::function<void ()> thread = add_thread(i);   // add_thread返回一个匿名函数
          spawn_worker(i, std::move(thread));
        }
        ...
      }
      

      遍历workers,执行spawn_worker,在spawn_worker中执行std::thread(func)启动线程。

        void spawn_worker(unsigned i, std::function<void ()> &&func) override {
          threads.resize(i+1);
          threads[i] = std::thread(func);
        }

      线程执行的函数func如下

      [this, w]() {
            char tp_name[16];
            sprintf(tp_name, "msgr-worker-%u", w->id);
            ceph_pthread_setname(pthread_self(), tp_name);
            const uint64_t EventMaxWaitUs = 30000000;
            w->center.set_owner();         // 加入监听事件
            w->initialize();
            w->init_done();
            while (!w->done) {
              ceph::timespan dur;
              int r = w->center.process_events(EventMaxWaitUs, &dur);   // 等待并处理事件
              ...
            }
            ...
        }

      加入监听事件

      在EventCenter::init过程中,会创建一个管道,并将管道r/w两个文件描述符赋值给EventCenter中notify_receive_fd和notify_send_fd,如下

      int EventCenter::init(int n, unsigned i, const std::string &t)
      { ...
        int fds[2];
        if (pipe(fds) < 0) {
          lderr(cct) << __func__ << " can't create notify pipe" << dendl;
          return -errno;
        }
        notify_receive_fd = fds[0];
        notify_send_fd = fds[1];
        ...
      }
      

      所有的监听事件都有对应的处理函数:读和写,封装在FileEvent中。

       struct FileEvent {
          int mask;
          EventCallbackRef read_cb;
          EventCallbackRef write_cb;
        }
      

      w->center.set_owner()将notify_receive_fd加入epoll队列,并且fd和对应的FileEvent都存在file_events中。这样的话,如果监听到有事件,获取到fd后,就可以从file_events中拿出回调去处理事件。

      vector<FileEvent> file_events;
      

      EventCenter::set_owner的流程如下

      ceph:消息通信机制小记--研读

      代码如下

      void EventCenter::set_owner()
      {
        owner = pthread_self();
        if (!global_centers) {
          cct->lookup_or_create_singleton_object<EventCenter::AssociatedCenters>(
              global_centers, "AsyncMessenger::EventCenter::global_center::"+type);
          global_centers->centers[idx] = this;
          if (driver->need_wakeup()) {
            notify_handler = new C_handle_notify(this, cct);
            int r = create_file_event(notify_receive_fd, EVENT_READABLE, notify_handler);
            assert(r == 0);
          }
        }
      }

      notify_handler就是notify_receive_fd的事件回调函数 对象,EventCenter::create_file_event将notify_receive_fd加入epoll的监听队列中,并且注册回调函数 对象,如下:

      int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)
      {
        int r = 0;
        ...
        EventCenter::FileEvent *event = _get_file_event(fd);
        r = driver->add_event(fd, event->mask, mask);
        event->mask |= mask;
        if (mask & EVENT_READABLE) { event->read_cb = ctxt; }
        if (mask & EVENT_WRITABLE) { event->write_cb = ctxt; }
        ...
        return 0;
      }

      #undef dout_prefix
      #define dout_prefix *_dout << "EventCallback "
      class C_handle_notify : public EventCallback
      {
          EventCenter *center;
          CephContext *cct;

      public:
          C_handle_notify(EventCenter *c, CephContext *cc): center(c), cct(cc) {}
          void do_request(int fd_or_id)
       {
              char c[256];
              int r = 0;
              do
              {
                  r = read(fd_or_id, c, sizeof(c));  //fd_or_id如果已经设置为阻塞模式就会阻塞等待数据,如果没有设置为阻塞就不会阻塞
                  if (r < 0)
                  {
                      if (errno != EAGAIN)
                          ldout(cct, 1) << __func__ << " read notify pipe failed: " << cpp_strerror(errno) << dendl;
                  }
              } while (r > 0);
          }
      };

      管道是一种最基本的IPC机制,作用于有血缘关系的进程之间,完成数据传递。这里监听管道读事件描述符的作用是:

      一般IO复用是使用 one loop per thread 的模型,阻塞等待的一般都是可读事件,但是监听可写需实时添加(如添加到epoll_base)。

      如果遇到wait线程在等待可读事件,并一直阻塞下去,此时有一个线程想添加监听可写事件,这时需要唤醒wait,(执行wait判断,并执行while 内wait之后的代码)添加可写事件(到监听队列)。(wait在监听)直接唤醒wait的方式就是可读事件,那么只需要给wait一个可读事件即可。因此epoll需要多监听一个fd作为唤醒wait的专用fd,可以用pipe, 可以用其他的eventfd,需要唤醒是向该fd写入数据使得wait监听到即可。这里使用pipe,开启一个pipe,将读端的fd加入监听队列(红黑树),那么当需要唤醒使wait返回时,只需要向pipe写端写入数据即可。

      void EventCenter::wakeup()
      {
          // No need to wake up since we never sleep
          if (!pollers.empty() || !driver->need_wakeup())
              return ;

          ldout(cct, 2) << __func__ << dendl;
          char buf = 'c';
          // wake up "event_wait"
          int n = write(notify_send_fd, &buf, sizeof(buf)); //通过向管道中写入数据 唤醒监听该管道的wait
          if (n < 0)
          {
              ldout(cct, 1) << __func__ << " write notify pipe failed: " << cpp_strerror(errno) << dendl;
              ceph_abort();
          }
      }

      等待并处理事件

      EventCenter::process_events是在while中循环,process_events中处理的事件有三类

      • time_events:定时事件,比如connection的定时函数AsyncConnection::tick。
      • external_events:外部事件,比如要发送消息时,send_message就是外部事件。
      • 可读事件:epoll监听的事件,比如socket连接的对端发来的消息。

      EventCenter::process_events函数中分为两部分:超时监听,回调事件注册函数,函数中超时变量一般设置为30秒(并不固定,与external_events和time_events有关)。如果有external_events,则超时时间为0,先去处理external_events;如果有time_events,根据定时的时间和30秒来确定超时时间。超时时间确定后,就开始等待。

      int EventCenter::process_events(int timeout_microseconds,  ceph::timespan *working_dur)
      { // timeout_microseconds = 30,000,000
        struct timeval tv;
        int numevents;
        bool trigger_time = false;
        auto now = clock_type::now();
        auto it = time_events.begin();
        bool blocking = pollers.empty() && !external_num_events.load();   
        // If exists external events or poller, don't block
        if (!blocking) {                                              // 如果有外部事件
          if (it != time_events.end() && now >= it->first)
            trigger_time = true;
          tv.tv_sec = 0;
          tv.tv_usec = 0;
        } else {
          clock_type::time_point shortest;
          shortest = now + std::chrono::microseconds(timeout_microseconds);           
          if (it != time_events.end() && shortest >= it->first) {                     
            shortest = it->first;
            trigger_time = true;
            if (shortest > now) {
              timeout_microseconds = std::chrono::duration_cast<std::chrono::microseconds>(
                  shortest - now).count();
            } else {
              shortest = now;
              timeout_microseconds = 0;
            }
          }
          tv.tv_sec = timeout_microseconds / 1000000;          //  tv.tv_sec = 30
          tv.tv_usec = timeout_microseconds % 1000000;         // tv.tv_usec = 0
        }
        vector<FiredFileEvent> fired_events;
        numevents = driver->event_wait(fired_events, &tv);              // 等待事件触发
        ...
      }
       
      

      在EpollDriver::event_wait中,如果有就绪事件,则将fd和事件类型(EVENT_READABLE/EVENT_WRITABLE)保存在fired_events。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24

       

      int EpollDriver::event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tvp)
      {
        int retval, numevents = 0;

        retval = epoll_wait(epfd, events, size,
                            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
        if (retval > 0) {
          int j;

          numevents = retval;
          fired_events.resize(numevents);
          for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = events + j;

            if (e->events & EPOLLIN) mask |= EVENT_READABLE;
            if (e->events & EPOLLOUT) mask |= EVENT_WRITABLE;
            if (e->events & EPOLLERR) mask |= EVENT_READABLE|EVENT_WRITABLE;
            if (e->events & EPOLLHUP) mask |= EVENT_READABLE|EVENT_WRITABLE;
            fired_events[j].fd = e->data.fd;      //fired_events 相当于epoll的events
            fired_events[j].mask = mask;

          }
        }
        return numevents;

      process_events线程原本在wait处阻塞,此时fired_events中有了数据唤醒event_wait(numevents = driver->event_wait(fired_events, &tv);),接下来就是处理事件,即回调事件注册函数。

      int EventCenter::process_events(int timeout_microseconds,  ceph::timespan *working_dur) 
      {  
        ...
        numevents = driver->event_wait(fired_events, &tv);<---------唤醒event_wait
        for (int j = 0; j < numevents; j++) {
          int rfired = 0;
          FileEvent *event;
          EventCallbackRef cb;
          event = _get_file_event(fired_events[j].fd);
          // 回调监听事件函数
          if (event->mask & fired_events[j].mask & EVENT_READABLE) {  
            rfired = 1;
            cb = event->read_cb;
            cb->do_request(fired_events[j].fd);
          }
          if (event->mask & fired_events[j].mask & EVENT_WRITABLE) {
            if (!rfired || event->read_cb != event->write_cb) {
              cb = event->write_cb;
              cb->do_request(fired_events[j].fd);
            }
          }
        }
        ...
        if (external_num_events.load()) {                       // 处理外部事件
          external_lock.lock();
          deque<EventCallbackRef> cur_process;
          cur_process.swap(external_events);
          external_num_events.store(0);
          external_lock.unlock();
          numevents += cur_process.size();
          while (!cur_process.empty()) {
            EventCallbackRef e = cur_process.front();
            e->do_request(0);
            cur_process.pop_front();
          }
        }
        ...
        return numevents;
      }

       

      服务端bind

      服务端和客户端是通过socket来通信,服务端的建立过程还是那几步:

      socket() –> bind() –> listen() –> accept() –> recv/send

      其中socket() –> bind() –> listen()可以看做是Processor的bind过程(最终封装在PosixWorker::listen函数中/RDMAWorker/DPDKWorker)。socket的流程基本上没什么可记录的,网上一大堆。但是Processor::bind并不是直接去调用PosixWorker::listen,因为AsyncMessenger::bind --> Processor::bind这个过程是由主线程调用的,Processor有属于自己的线程,这里它得切换到自己线程上去执行。

      AsyncMessenger::bind

      int AsyncMessenger::bind(const entity_addr_t &bind_addr)
      { 
        // bind to a socket
        set<int> avoid_ports;
        entity_addr_t bound_addr;
        unsigned i = 0;
        for (auto &&p : processors) {      // processors中只有一个元素
          int r = p->bind(bind_addr, avoid_ports, &bound_addr);
          ...
          ++i;
        }
        _finish_bind(bind_addr, bound_addr);
        return 0;
      }
      

      Processor::bind

      int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports, entity_addr_t* bound_addr)
      { 
        ...
        /* bind to port */
        int r = -1;
        for (int i = 0; i < conf->ms_bind_retry_count; i++) {        // conf->ms_bind_retry_count = 6
          if (i > 0) { sleep(conf->ms_bind_retry_delay); }  // conf->ms_bind_retry_delay = 6,如果先前一次bind失败,每个循环睡眠6秒
          if (listen_addr.get_port()) {      // ceph_mon中端口固定为6789
            worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() {
              r = worker->listen(listen_addr, opts, &listen_socket);}, 
                                     false);
            if (r < 0) continue;
          } 
        ...
        return 0;
      }
      

      worker->center.submit_to就是去判断目前所在的线程是否是EventCenter自己的(直属)线程,如果是就直接执行worker->listen,反之,则去唤醒自己的线程去执行。(AsyncMessenger::bind --> Processor::bind这个过程是由主线程调用的,Processor有属于自己的线程,这里它得切换到自己线程上去执行。)

      流程如下

      ceph:消息通信机制小记--研读

      代码如下

      template <typename func>
      void submit_to(int i, func &&f, bool nowait = false)
      {
          assert(i < MAX_EVENTCENTER && global_centers);
          EventCenter *c = global_centers->centers[i];
          assert(c);
          
          if (!nowait && c->in_thread())     // c->in_thread()就是判断是否是自己的线程
          {
              f();
              return ;
          }
          
          if (nowait)
          {
              ...
          }
          else
          {
              C_submit_event<func> event(std::move(f), false);   // 创建回调类
              c->dispatch_event_external(&event);
              event.wait();
          }
      };

      EventCenter::dispatch_event_external(e)就是去唤醒epoll_wait然后去执行回调函数EventCallback:: do_request(event->do_request)。(EventCenter::dispatch_event_external将回调函数e放入 external_events,wakeup (通过向epoll监听的管道写入数据的方式))

      void EventCenter::dispatch_event_external(EventCallbackRef e)
      {
        external_lock.lock();
        external_events.push_back(e);         // 将事件加入external_events队列                   
        bool wake = !external_num_events.load();  // external_num_events为0,所以wake就是true
        uint64_t num = ++external_num_events;  // num = ++external_num_events = 1
        external_lock.unlock();
        if (!in_thread() && wake)
          wakeup();                      // 去唤醒epoll_wait
      }
      

      EventCenter::wakeup

      void EventCenter::wakeup()
      { 
        ...
        char buf = 'c';
        // wake up "event_wait"
        int n = write(notify_send_fd, &buf, sizeof(buf));     // 往管道中写数据,来唤醒epoll_wait
        if (n < 0) { ... }
        }
      }
      

      epoll_wait被唤醒后,先去读管道数据,管道中的数据没有特别的意义(只是个触发引子)。接着就是去处理external_events中的事件

      int EventCenter::process_events(int timeout_microseconds,  ceph::timespan *working_dur)
      {
          ...
          if (external_num_events.load())
          {
              external_lock.lock();
              deque<EventCallbackRef> cur_process;
              cur_process.swap(external_events);              // 获取external_events
              external_num_events.store(0);
              external_lock.unlock();
              numevents += cur_process.size();
              while (!cur_process.empty())
              {
                  EventCallbackRef e = cur_process.front();
                  e->do_request(0);                         // 执行回调
                  cur_process.pop_front();
              }
          }
          ...
          return numevents;
      }
      

      e->do_request(0)就是去执行回调,流程如下

      ceph:消息通信机制小记--研读

      PosixWorker::listen

      int PosixWorker::listen(entity_addr_t &sa, const SocketOptions &opt, ServerSocket *sock)
      {
        int listen_sd = net.create_socket(sa.get_family(), true); // 创建socket
        // 设置为非阻塞,即它在读取不到数据时结果为-1,并且设置errno为EAGAIN,而不会阻塞等待
        int r = net.set_nonblock(listen_sd);
        // 设置FD_CLOEXEC标志位,即子进程在执行exec后关闭子进程中的fd           
        net.set_close_on_exec(listen_sd);                
        // 设置TCP_NODELAY标志。Nginx指令tcp_nodelay作用于socket参数TCP_NODELAY。
        // 在这之前,我们先说说nagle缓存算法,有些应用程序在网络通讯的时候会发送很少的字节,比如说一个字节,那么再加TCP协议本身,实际上发的要41个字节,这样的效率是很低的。这时候nagle算法就应运而生了,它将要发送的数据存放在缓存里,当积累到一定量或一定时间,再将它们发送出去。  
        // 这里TCP_NODELAY就是nagle启用与否的开关,所以下面的指令(tcp_nodelay on)的效果就是禁用nagle算法,也即不缓存数据。
        r = net.set_socket_options(listen_sd, opt.nodelay, opt.rcbuf_size);
        // 给listen_sd绑定地址
        r = ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len());
        // 监听listen_sd, 待处理的客户连接队列为512
        r = ::listen(listen_sd, cct->_conf->ms_tcp_listen_backlog);            // cct->_conf->ms_tcp_listen_backlog = 512
        *sock = ServerSocket( std::unique_ptr<PosixServerSocketImpl>(new PosixServerSocketImpl(net, listen_sd)));
        return 0;
      }

       

      服务端listen

      bind完后,就得去listen socket的fd,从Monitor::init开始,流程如下

      ceph:消息通信机制小记--研读

      Monitor::init

      int Monitor::init()
      {
        ...
        // i'm ready!
        messenger->add_dispatcher_tail(this);
        ...
        return 0;
      }
      

      将mon加入dispatchers队列

      void add_dispatcher_tail(Dispatcher *d) { 
          bool first = dispatchers.empty();
          dispatchers.push_back(d);
          if (d->ms_can_fast_dispatch_any())
            fast_dispatchers.push_back(d);
          if (first)                 // first为true
            ready();
        }
      

      AsyncMessenger::ready

      void AsyncMessenger::ready()
      {
        stack->ready();
        for (auto &&p : processors) 
          p->start();
        dispatch_queue.start();
      }
      

      Processor::start就开始去监听

      void Processor::start()
      {
        // start thread
        if (listen_socket) {
          worker->center.submit_to(worker->center.get_id(), [this]() {
            worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE, listen_handler); }, false);
        }
      }

      还是一样,去唤醒Processor中的线程,执行EventCenter::create_file_event,将socket fd加入epoll事件中,并注册回调事件,执行pro->accept(),如果有连接来,就去accept,建立socket连接。

      listen_handler(new C_processor_accept(this));
      
      class Processor::C_processor_accept : public EventCallback {
        Processor *pro;
       public:
        ...
        void do_request(uint64_t id) override {
          pro->accept();
        }
      };
      

      在AsyncMessenger::ready中还创建了2个线程

      void DispatchQueue::start()
      {
        assert(!stop);
        assert(!dispatch_thread.is_started());
        dispatch_thread.create("ms_dispatch");
        local_delivery_thread.create("ms_local");
      }
      

       

      处理连接

      如果有连接过来,唤醒epoll_wait,建立连接,在EventCenter::process_events执行listen_socket对应的回调函数Processor::accept

      void Processor::accept()
      {
        SocketOptions opts;
        opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
        opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;
        opts.priority = msgr->get_socket_priority();            
        while (true) {
          entity_addr_t addr;
          ConnectedSocket cli_socket;
          Worker *w = worker;
          // msgr->get_stack()->support_local_listen_table()为false
          if (!msgr->get_stack()->support_local_listen_table())               
            w = msgr->get_stack()->get_worker();                              //选一个负载较小的worker
          int r = listen_socket.accept(&cli_socket, opts, &addr, w);
          if (r == 0) {
            msgr->add_accept(w, std::move(cli_socket), addr);
            continue;
          } else { ... }
        }
      }

      Processor::accept可以分为两个过程:建立连接和将cli_socket加入epoll监听。

      在建立连接之前先去选一个当前负载最小的worker,这个worker与新建的Connection绑定,该连接上的IO事件都会由此worker来处理。listen_socket.accept最终调用的是PosixServerSocketImpl::accept

      int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) 
        { 
            sockaddr_storage ss;
            socklen_t slen = sizeof(ss);
            // 接受连接,_fd是非阻塞的,非阻塞倾听socket,在有没有连接时都accept立即返回,
            int sd = ::accept(_fd, (sockaddr*)&ss, &slen);  
            if (sd < 0) {  // 非阻塞模式下,没有连接时,返回值是-1,并且错误码是EAGAIN or EWOULDBLOCK  
              return -errno;   
            }
            handler.set_close_on_exec(sd);                  
            int r = handler.set_nonblock(sd);   // 设置为非阻塞,
            r = handler.set_socket_options(sd, opt.nodelay, opt.rcbuf_size);
            out->set_sockaddr((sockaddr*)&ss);               // 填充addr
            handler.set_priority(sd, opt.priority, out->get_family());
            std::unique_ptr<PosixConnectedSocketImpl> csi(new PosixConnectedSocketImpl(handler, *out, sd, true));
            *sock = ConnectedSocket(std::move(csi));
            return 0;
         }
      }

      连接建立完后,需要创建AsyncConnection。socket连接建立了,并不代表可以发消息,AsyncConnection可以看作socket的上层,连接的创建和删除、数据的读写指令、连接的重建、消息的处理等都是在这个类中进行。

      void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr)
      {
        lock.Lock();
        AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
        conn->accept(std::move(cli_socket), addr);
        accepting_conns.insert(conn);
        lock.Unlock();
      }

      AsyncConnection::accept唤醒AsyncConnection绑定的worker去执行read_handler事件

      void AsyncConnection::accept(ConnectedSocket socket, entity_addr_t &addr)
      {
        std::lock_guard<std::mutex> l(lock);
        cs = std::move(socket);
        socket_addr = addr;
        state = STATE_ACCEPTING;
        // rescheduler connection in order to avoid lock dep
        center->dispatch_event_external(read_handler);
      }
      

      read_handler定义如下

      read_handler = new C_handle_read(this);
      ...
      class C_handle_read : public EventCallback {
        AsyncConnectionRef conn;
       public:
        ...
        void do_request(uint64_t fd_or_id) override {
          conn->process();
        }
      };
      

      AsyncConnection::process中主要做三件事:

      • 将fd加入epoll,并注册read_handle到events中
      • 将服务端和客户端的addr发送给客户端
      • 将connect的状态转换为STATE_ACCEPTING_WAIT_BANNER_ADDR

      这之后就涉及到AsyncConnection的状态转换。

      AsyncConnection的状态转换

      AsyncConnection有很多个状态,可以简单分为connect,accept,open, standby,closed,wait。socket连接建立后,还需要将AsyncConnection的状态转换为STATE_OPEN,才可以正常发送消息,STATE_CONNECTING/STATE_ACCEPTING状态切换到SATE_OPEN的过程中,会去做一些校验,这个过程很有必要。如下是状态切换的流程图。

      ceph:消息通信机制小记--研读

       

      消息发送过程

      先码...

      AsyncMessenger启动接收线程

      ceph通信(3)——AsyncMessenger启动接收线程_ceph asyncmessenger 的本地连接-CSDN博客

      OSD::init
      |---client_messenger->add_dispatcher_head(this) // Messenger.h void add_dispatcher_head(Dispatcher *d) 
          |---bool first = dispatchers.empty()  // 初始化时dispatcher为空,list<Dispatcher*> dispatchers
          |---dispatchers.push_front(d);
          |---ready() // void AsyncMessenger::ready()
              |---stack->ready() // PosixNetworkStack : public NetworkStack 的ready
              |---for (auto &&p : processors)
                  |---p->start();  // void Processor::start()  启动work
                      |---worker->center.submit_to(worker->center.get_id(), 
                                 [this]() { worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE, listen_handler); }, false);
                          // 执行EventCenter::create_file_event
                          |---EventCenter::FileEvent *event = _get_file_event(fd);
                          |---r = driver->add_event(fd, event->mask, mask);  // int EpollDriver::add_event(int fd, int cur_mask, int add_mask)
                              |---if (epoll_ctl(epfd, op, fd, &ee) == -1)   // 唤醒 EventCenter::process_events 
                          |---if (mask & EVENT_READABLE)  
                              |---event->read_cb = ctxt  // 读回调
                          |---if (mask & EVENT_WRITABLE) 
                              |---event->write_cb = ctxt // 写回调
              |---dispatch_queue.start()  // DispatchQueue::start()
                  |---dispatch_thread.create("ms_dispatch")  // 启动线程 DispatchQueue::entry()
                      |---while (true) {
                          |---while (!mqueue.empty()) {
                              |---if (qitem.is_code())
                                  |---local_delivery_cond.Wait(local_delivery_lock);  
                              |---else
                                  |---Message *m = qitem.get_message();
                                  |---msgr->ms_deliver_dispatch(m)  // Messenger.h  ms_deliver_dispatch
                                      |---for (list<Dispatcher*>::iterator p = dispatchers.begin()
                                          |---if ((*p)->ms_dispatch(m))  // OSD::ms_dispatch(Message *m) 对应的类处理函数的入口
                                  |---post_dispatch(m, msize);
                                      |---dispatch_throttle_release(msize)
                                          |---cond.front()->SignalOne();   //唤醒线程
                          |---cond.Wait(lock)
                  |---local_delivery_thread.create("ms_local")  // 启动线程 DispatchQueue::run_local_delivery()
                      |---while (true) {
                          |---if (local_messages.empty()) {
                              |---local_delivery_cond.Wait(local_delivery_lock)
                          |---fast_preprocess(m);
                              |---msgr->ms_fast_preprocess(m)
                                  |---for (list<Dispatcher*>::iterator p = fast_dispatchers.begin()
                                      |---(*p)->ms_fast_preprocess(m)   // OSD::ms_fast_preprocess(Message *m)
                          |---if (can_fast_dispatch(m))
                              |---msgr->ms_fast_dispatch(m)
                                  |---for (list<Dispatcher*>::iterator p = fast_dispatchers.begin()
                                      |---if ((*p)->ms_can_fast_dispatch(m)) 
                                          |---(*p)->ms_fast_dispatch(m)   // OSD::ms_fast_dispatch(Message *m)
                              |---post_dispatch(m, msize)
                          |---else
                              |---enqueue(m, priority, 0);  // 加入队列

      AsyncMessenger类图

      ceph——AsyncMessenger类图_ceph asyncmessenger类视图-CSDN博客

      ceph:消息通信机制小记--研读

      Send IO

      消息发送过程可以分为几个关键流程:

      –s1–>AsyncMessenger::send_message(Message m)是Msg模块提供给上层提交IO请求的接口,IO请求封装在Message中。
      –s2–>AsyncMessenger::submit_message()根据目标地址获取AsyncConnection对象,将Message传入该AsyncConnection对象中继续处理
      –s3–>AsyncConnection::send_message(m)是IO请求在AsyncConnection实例中的入口,该函数主要将Message放在优先级队列AsyncConnection::out_q,并调用dispatch_event_external()将write_handler入队。
      –s4–>当Worker线程又被唤醒并执行这个write_handler时,就会调用AsyncConnection::handle_write(),该函数主要是从out_q中依照优先级将一个Message出队,并通过AsyncConnection::write_message()把Mesage中的payload、middle、data都拷贝到AsyncConnection::outcoming_bl中,并调用AsyncConnection::_try_send()
      –s5–>AsyncConnection::_try_send()作为Send IO在Async层的最后一个接口,主要是将outcoming_bl向下层传递,最终会到达RDMAStack,调用其中的RDMAConnectedSocketImpl::send()。
      –s6–>RDMAStack内部会维持一个pending_bl,将RDMAConnectedSocketImpl::send()上层传入的outcoming_bl”接”在其后,调用的Chunk::write()启动发送
      –s7–>ibv_post_send()是verbs中发送数据的接口,发送过程至此结束。

      Receive IO

      在RDMA协议中,收发完成(!=成功)接收消息首先会在CQ(Completion Queue)中放入CQE(CQ Entry)来通知上层有事件完成,verbs标准支持轮询和通知两种机制,Ceph RDMAStack使用的是轮询机制

      –r1–>RDMADispatcher::polling线程不断的调用ibv_poll_cq()来轮询事件是否完成。
      –r2–>如果发现一个底层读事件完成,就会通过conn = get_conn_lockless(response->qp_num)来获取其Connection对象,进而找到其关联的Worker和EventCenter,写EventCenter对象的notify_fd,由于该fd已经注册在file_events中,此举将唤醒Worker线程。
      –r3–>EventCenter::Worker被唤醒后回调notify_fd的readcb,其中核心函数是AsyncConnection::process();
      –r4–>AsyncConnection::process()线程会申请一块buffer用于接收收到的数据,这个buffer最终会被封装到bufferlist中并进一步被封装为Message供上层使用。
      –r5–>AsyncConnection本身有一个读缓冲区:recv_buf,该buffer只给AsyncConnection::read_util()使用,until,顾名思义,就是该接口一定会读到想读取的长度,该接口首先试图从recv_buf中获取请求的数据,当recv_buf已有数据不能满足请求时,就要依情况从底层读取,对于recv_buf可以承载的数据长度,会调用底层接口先将recv_buf填满,再将从中读取所需;对于recv_buf不足的数据长度,直接从底层获取,从底层读取的接口是read_bulk()
      –r6–>AsyncConnection::read_bulk()最终会带着传入的buffer,层层调用,直到RDMAStack中RDMAConnectedSocketImpl::read()–>Chunk::read()–>memcpy()将获取的数据填充到为Message准备号的buffer中
      –r7–>AsyncConnection::process()经过层层调用已为Message的构造封装好了数据,接下来就只是构造一个Message,入dispatch_queue,交给上层处理。

      版权声明:本文内容来自第三方投稿或授权转载,原文地址:https://blog.csdn.net/bandaoyu/article/details/112307552,作者:bandaoyu,版权归原作者所有。本网站转在其作品的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如因作品内容、版权等问题需要同本网站联系,请发邮件至ctyunbbs@chinatelecom.cn沟通。

      上一篇:National _C_C++_A\\3.埃及分数

      下一篇:【数据库】时序数据库InfluxDB 性能测试和为什么时序数据库更快、时序数据库应用场景

      相关文章

      2025-05-19 09:04:22

      loki仿函数原理

      loki仿函数原理

      2025-05-19 09:04:22
      lt , void
      2025-05-16 09:15:17

      多源BFS问题(1)_01矩阵

      多源BFS问题(1)_01矩阵

      2025-05-16 09:15:17
      lt , 矩阵 , 遍历
      2025-05-16 09:15:17

      BFS解决拓扑排序(1)_课程表

      BFS解决拓扑排序(1)_课程表

      2025-05-16 09:15:17
      lt , 课程 , 队列
      2025-05-16 09:15:17

      BFS解决最短路问题(4)_为高尔夫比赛砍树

      BFS解决最短路问题(4)_为高尔夫比赛砍树

      2025-05-16 09:15:17
      BFS , lt , 复杂度 , 算法
      2025-05-16 09:15:17

      Linux系统基础-多线程超详细讲解(5)_单例模式与线程池

      Linux系统基础-多线程超详细讲解(5)_单例模式与线程池

      2025-05-16 09:15:17
      单例 , 线程 , 队列
      2025-05-16 09:15:10

      52.介绍AOP有几种实现方式

      52.介绍AOP有几种实现方式

      2025-05-16 09:15:10
      gt , lt , Spring
      2025-05-16 09:15:10

      C语言练习之猜名次-----A选手说:B第二,我第三;B选手说:我第二,E第四;C选手说:我第一,D第二;D选手说:C最后,我第三;E选手说:我第四,A第一;

      C语言练习之猜名次-----A选手说:B第二,我第三;B选手说:我第二,E第四;C选手说:我第一,D第二;D选手说:C最后,我第三;E选手说:我第四,A第一;

      2025-05-16 09:15:10
      amp , lt , 排名
      2025-05-14 10:07:38

      超级好用的C++实用库之互斥锁

      互斥锁是一种用于多线程编程的同步机制,其主要目的是确保在并发执行环境中,同一时间内只有一个线程能够访问和修改共享资源。

      2025-05-14 10:07:38
      CHP , Lock , 互斥 , 线程 , 释放 , 锁定
      2025-05-14 10:03:13

      超级好用的C++实用库之线程基类

      在C++中,线程是操作系统能够进行运算调度的最小单位。一个进程可以包含多个线程,这些线程共享进程的资源,比如:内存空间和系统资源,但它们有自己的指令指针、堆栈和局部变量等。

      2025-05-14 10:03:13
      Linux , void , Windows , 函数 , 操作系统 , 线程
      2025-05-14 10:02:48

      互斥锁解决redis缓存击穿

      在高并发系统中,Redis 缓存是一种常见的性能优化方式。然而,缓存击穿问题也伴随着高并发访问而来。

      2025-05-14 10:02:48
      Redis , 互斥 , 数据库 , 线程 , 缓存 , 请求
      查看更多
      推荐标签

      作者介绍

      天翼云小翼
      天翼云用户

      文章

      33561

      阅读量

      5237850

      查看更多

      最新文章

      Linux系统基础-多线程超详细讲解(5)_单例模式与线程池

      2025-05-16 09:15:17

      C语言练习之猜名次-----A选手说:B第二,我第三;B选手说:我第二,E第四;C选手说:我第一,D第二;D选手说:C最后,我第三;E选手说:我第四,A第一;

      2025-05-16 09:15:10

      52.介绍AOP有几种实现方式

      2025-05-16 09:15:10

      超级好用的C++实用库之互斥锁

      2025-05-14 10:07:38

      超级好用的C++实用库之线程基类

      2025-05-14 10:03:13

      互斥锁解决redis缓存击穿

      2025-05-14 10:02:48

      查看更多

      热门文章

      Java线程同步synchronized wait notifyAll

      2023-04-18 14:15:05

      Android Priority Job Queue (Job Manager):线程任务的容错重启机制(二)

      2024-09-25 10:13:46

      操作系统中的线程种类

      2023-04-24 11:27:18

      Android Priority Job Queue (Job Manager):多重不同Job并发执行并在前台获得返回结果(四)

      2023-04-13 09:54:33

      实现远程线程DLL注入

      2023-05-04 08:57:15

      【Java并发编程】之十:使用wait/notify/notifyAll实现线程间通信的几点重要说明

      2023-04-24 11:25:19

      查看更多

      热门标签

      java Java python 编程开发 代码 开发语言 算法 线程 Python html 数组 C++ 元素 javascript c++
      查看更多

      相关产品

      弹性云主机

      随时自助获取、弹性伸缩的云服务器资源

      天翼云电脑(公众版)

      便捷、安全、高效的云电脑服务

      对象存储

      高品质、低成本的云上存储服务

      云硬盘

      为云上计算资源提供持久性块存储

      查看更多

      随机文章

      Java 应用的资源管理:连接池与线程池

      线程工具类(根据电脑逻辑处理器个数控制同时运行的线程个数)

      “头”和“段”里有什么? ——WEB开发系列04

      操作系统中的线程种类

      【epoll】epoll多路复用和Reactor设计思想---编辑中

      Java线程中的run()和start()区别

      • 7*24小时售后
      • 无忧退款
      • 免费备案
      • 专家服务
      售前咨询热线
      400-810-9889转1
      关注天翼云
      • 旗舰店
      • 天翼云APP
      • 天翼云微信公众号
      服务与支持
      • 备案中心
      • 售前咨询
      • 智能客服
      • 自助服务
      • 工单管理
      • 客户公告
      • 涉诈举报
      账户管理
      • 管理中心
      • 订单管理
      • 余额管理
      • 发票管理
      • 充值汇款
      • 续费管理
      快速入口
      • 天翼云旗舰店
      • 文档中心
      • 最新活动
      • 免费试用
      • 信任中心
      • 天翼云学堂
      云网生态
      • 甄选商城
      • 渠道合作
      • 云市场合作
      了解天翼云
      • 关于天翼云
      • 天翼云APP
      • 服务案例
      • 新闻资讯
      • 联系我们
      热门产品
      • 云电脑
      • 弹性云主机
      • 云电脑政企版
      • 天翼云手机
      • 云数据库
      • 对象存储
      • 云硬盘
      • Web应用防火墙
      • 服务器安全卫士
      • CDN加速
      热门推荐
      • 云服务备份
      • 边缘安全加速平台
      • 全站加速
      • 安全加速
      • 云服务器
      • 云主机
      • 智能边缘云
      • 应用编排服务
      • 微服务引擎
      • 共享流量包
      更多推荐
      • web应用防火墙
      • 密钥管理
      • 等保咨询
      • 安全专区
      • 应用运维管理
      • 云日志服务
      • 文档数据库服务
      • 云搜索服务
      • 数据湖探索
      • 数据仓库服务
      友情链接
      • 中国电信集团
      • 189邮箱
      • 天翼企业云盘
      • 天翼云盘
      ©2025 天翼云科技有限公司版权所有 增值电信业务经营许可证A2.B1.B2-20090001
      公司地址:北京市东城区青龙胡同甲1号、3号2幢2层205-32室
      • 用户协议
      • 隐私政策
      • 个人信息保护
      • 法律声明
      备案 京公网安备11010802043424号 京ICP备 2021034386号