searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

bRPC使用分享

2023-09-25 08:34:25
279
0

背景

什么是 RPC?
RPC 把网络交互类比为“client端访问server端上的函数”:
  • client向server发送request后开始等待
  • server收到请求后,处理、回复client
  • client又再度恢复并根据response做出反应

 

RPC 框架三大要素:
  • Call ID 映射
  • 序列化和反序列化
  • 网络传输

 

希望解决什么问题?
 

 

当分布式数据库在进行数据重分布时会存在下面的问题:

  • SharedQueue 存在的问题:
        单节点连接数(进程数):D(重分布个数)*N(节点数)*P(并发数),处理复杂查询时资源消耗过高
  • 用RPC替换:
        单节点连接数(进程数):N(节点数)*C(常量)

 所以用 bRPC 代替 SharedQueue 能够大大降低数据重分布的资源消耗过高的问题。

 

Protobuf service

Protocal Buffers (简称protobuf) 是谷歌的一项技术,用于数据序列化、反序列化
比如同步demo中 syn_echo.proto 文件内容如下:
syntax = "proto2"; // 声明pb版本
package synEcho;   // 定义包名为synEcho, 生成代码时包名会作为命名空间

option cc_generic_services = true;  // 启用C++通用服务支持

// 定义请求消息
message EchoRequest {
    required string message = 1;    // 表示第1字段
};

message EchoResponse {
    required string message = 1;
};

service EchoService {
    rpc Echo(EchoRequest) returns (EchoResponse);
};

 

  • package 语句:将相关的消息和服务放置在同一个包中,在生成代码时,包名也会被用作命名空间
  • option:protobuf 编译器 protoc 根据 protobuf service 中选择的语言(option cc_generic_services = true)生成 rpc 服务的接口和 stub 存根
  • message: RPC 的请求(request)和回复(response)定义为 protobuf 的 message:
    • 字段修饰符(Field Modifiers):指定字段的修饰符,可以是 required、optional 或 repeated。required 表示字段在每个消息中都必须存在,optional 表示字段可以存在也可以不存在,而 repeated 表示字段可以重复出现,即可以包含多个相同类型的值
    • 字段类型(Field Type):指定字段的类型,可以是标量类型(例如整数、浮点数、布尔值、字符串等)或消息类型(嵌套的 Protocol Buffers 消息)
    • 字段名称(Field Name):字段的名称,用于在源代码中引用字段
    • 字段标识号(Field Number):为每个字段分配的唯一标识号,用于在二进制序列化和反序列化时识别和定位字段
  • service:用于定义服务接口,其中包含一个或多个远程过程调用(RPC)方法。在服务器的业务代码中,EchoService 为基类,其中的 Echo 等价于成员函数,需要在源代码中定义。
 
当编写完proto文件,通过protoc --cpp_out=./ ./xxx.proto来生成 xxx.pb.cc 和 xxx.pb.h 文件,在业务源代码中需要 include xxx.pb.h。

同步 bRPC

客户端

在 bRPC 中没有 client 对应的实体,取而代之的是brpc::Channel。可以把 Channel 视作 Client。
  1、建立 Channel 及初始化
 
brpc::Channel channel;
    
brpc::ChannelOptions options;
options.protocol = FLAGS_protocol;
options.connection_type = FLAGS_connection_type;
options.timeout_ms = FLAGS_timeout_ms;
options.max_retry = FLAGS_max_retry;

// Init 连接服务器, 如果指定LD连接服务器集群; 否则连接单台服务器
if(channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) {
    LOG(ERROR) << "Fail to initialize channel";
    return -1;
}
   Init 函数分为连接一台服务器和连接服务集群:
  • 连接一台服务器:
// options为NULL时取默认值
int Init(EndPoint server_addr_and_port, const ChannelOptions* options);
int Init(const char* server_addr_and_port, const ChannelOptions* options);
int Init(const char* server_addr, int port, const ChannelOptions* options);
     这类初始化连接的服务器有固定的 IP 地址,不需要命名服务和负载均衡,创建起来相对轻量。
  • 连接服务集群
int Init(const char* naming_service_url,         
         const char* load_balancer_name,         
         const ChannelOptions* options);
   这类 Channel 需要定期从naming_service_url指定的命名服务中获得服务器列表,并通过load_balancer_name指定的负载均衡算法选择出一台机器发送请求。
  • naming_service_url:ip:port 或 域名:port
  • load_balancer_name:指定负载均衡算法,为 NULL 时等同于连接单台 server 的 Init
 
2、创建 stub
synEcho::EchoService_Stub stub(&channel);
  一般不直接调用 Channel.CallMethod,而是通过 protobuf XXX_Stub 更像是“调用函数”:
 
XXX_Stub stub(&channel);
stub.some_method(controller, request, response, done);
3、远程过程调用
while(!brpc::IsAskedToQuit()) {
    synEcho::EchoRequest request;
    synEcho::EchoResponse response;
    brpc::Controller cntl;

    request.set_message("synEcho test from client");

    cntl.request_attachment().append(FLAGS_attachment);

    stub.Echo(&cntl, &request, &response, NULL);
    if(!cntl.Failed()) {
        LOG(INFO) << "Receive response from" << cntl.remote_side()
                  << " to " << cntl.local_side()
                  << ": " << response.message() << "(attached="
                  << cntl.response_attachment() << ")"
                  << " latency=" << cntl.latency_us() << "us";
    }
    else {
        LOG(WARNING) << cntl.ErrorText();
    }
    sleep(1);
}
 brpc::Controller:是一个用于控制和管理 RPC 调用的对象。它包含了与 RPC 调用相关的状态和控制信息。
  •     Controller 的主要作用如下:
    • 处理 attachment:attachment 由用户自定义,不经过protobuf的序列化,在应用层上通过编码和解码操作来处理 attachment 的转换。
    • 错误处理:Controller 提供了 Failed() 方法用于检查 RPC 调用是否失败。如果调用失败,可以通过调用 ErrorText() 方法获取错误信息
    • 超时设置:Controller 允许设置 RPC 调用的超时时间,通过调用 SetTimeoutMs() 方法可以设置超时时间,单位为毫秒。
  • 同步访问:stub.some_method(controller, request, response, NULL);
    • CallMethod会阻塞直到收到server端返回response或者发生错误
 

服务端

1、实现生成的 Service 接口
// 实现proto文件中服务接口的远程过程调用(RPC)方法
namespace synEcho {
class EchoServiceIml : public EchoService {
public:
    EchoServiceIml() {}
    virtual ~EchoServiceIml() {}
    virtual void Echo(google::protobuf::RpcController* controller,
                       const ::synEcho::EchoRequest* request,
                       synEcho::EchoResponse* response,
                       google::protobuf::Closure* done) {
        
        brpc::ClosureGuard done_guard(done);

        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);

        LOG(INFO) << "Received request from " << cntl->remote_side()
            << " to " << cntl->local_side()
            << ": " << request->message()
            << " (attached=" << cntl->request_attachment() << ")";
        
        // server 回复消息
        response->set_message("synEcho from server");

        // 是否有attachment
        if(FLAGS_echo_attachment) {
            cntl->response_attachment().append(FLAGS_attachment);
        }
    }
};
}
当客户端发来请求,函数Echo会被调用,其中参数done由框架创建,递给服务回调,包含了调用服务回调后的后续动作,包括检查response正确性,序列化,打包,发送等逻辑。不管成功失败,done->Run()必须在请求处理完成后被用户调用一次。为了确保 done->Run() 会被调用,使用使用 ClosureGuard 确保done->Run()被调用,RAII机制(ClosureGuard析构函数内调用了done->Run())
2、创建server并加入Service
brpc::Server server;

synEcho::EchoServiceIml echo_service_iml;
// Add the service into server
if(server.AddService(&echo_service_iml, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
    LOG(ERROR) << "Fail to add service";
    return -1;
}
默认构造后的Server不包含任何服务,也不会对外提供服务,仅仅是一个对象,需要加入 Service:
  int AddService(google::protobuf::Service* service, ServiceOwnership ownership);
  ownership:
  • SERVER_OWNS_SERVICE,Server在析构时会一并删除Service
  • SERVER_DOESNT_OWN_SERVICE,不会删除 Service
3、监听端口
一个server只能监听一个端口,需要监听N个端口就起N个Server
butil::EndPoint point;
if(!FLAGS_listen_addr.empty()) {
    if(butil::str2endpoint(FLAGS_listen_addr.c_str(), &point) < 0) {
        LOG(ERROR) << "Invalid listen address:" << FLAGS_listen_addr;
    }
}
else {
    point = butil::EndPoint(butil::IP_ANY, FLAGS_port);
}
4、启动
brpc::ServerOptions options;
options.idle_timeout_sec = FLAGS_idle_timeout_sec;
if(server.Start(point, &options) != 0) {
    LOG(ERROR) << "Fail to start server";
    return -1;
}
  Start 有四个重载:
int Start(const char* ip_and_port_str, const ServerOptions* opt);
int Start(EndPoint ip_and_port, const ServerOptions* opt);
int Start(int port, const ServerOptions* opt);
int Start(const char *ip_str, PortRange port_range, const ServerOptions *opt);  // r32009后增加, 在可用端口范围中找到第一个可用的端口

异步 bRPC

客户端

1、建立 Channel 及初始化
2、创建 stub
3、远程过程调用
  给CallMethod传递一个额外的回调对象done,CallMethod在发出request后就结束了,而不是在RPC结束后。当server端返回response或发生错误(包括超时)时,done->Run()会被调用。对RPC的后续处理应该写在done->Run()里,而不是CallMethod后,所以:
  • Channel/request 可以定义在栈上
  • response/controller 仍可能被框架及done->Run() 使用,它们一般得创建在堆上,并在done->Run()中删除
void HandleEchoResponse(brpc::Controller* cntl, asyEcho::EchoResponse* response) {
    // std::unique_ptr 确保在return之前删除 cntl/response 对象
    std::unique_ptr<brpc::Controller> cntl_guard(cntl);
    std::unique_ptr<asyEcho::EchoResponse> response_guard(response);

    if(cntl->Failed()) {
        LOG(WARNING) << "Fail to send EchoRequest, " << cntl->ErrorText();
        return;
    }
    LOG(INFO) << "Received response from " << cntl->remote_side()
        << ": " << response->message()
        << " latency=" << cntl->latency_us() << "us";
}


// CallMethod结束不意味着RPC结束, response/controller仍可能被框架及done->Run()使用, 创建在堆上
asyEcho::EchoResponse* response = new asyEcho::EchoResponse();
brpc::Controller* cntl = new brpc::Controller();

// Channel/request 可以定义在栈上
asyEcho::EchoRequest request;
request.set_message("asyEcho test from client");

google::protobuf::Closure* done = brpc::NewCallback(&HandleEchoResponse, cntl, response);
stub.Echo(cntl, &request, response, done);

 

服务端

1、实现生成的 Service 接口
// 实现proto文件中服务接口的远程过程调用(RPC)方法
namespace asyEcho{
class EchoServiceIml : public EchoService {
public:
    EchoServiceIml() {}
    virtual ~EchoServiceIml() {}
    virtual void Echo(::google::protobuf::RpcController* controller,
                       const ::asyEcho::EchoRequest* request,
                       ::asyEcho::EchoResponse* response,
                       ::google::protobuf::Closure* done) {
        
        brpc::ClosureGuard done_guard(done);

        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);

        LOG(INFO) << "Receive request from " << cntl->remote_side()
            << ": " << request->message();

        // server 回复消息
        response->set_message("asyEcho from server");
    }
};
  Service 和 Channel 都可以使用 done 来表达后续的操作,但它们完全不同:
  • Service 的 done 由框架创建,用户处理请求后调用done把response发回给client
  • Channel 的 done 由用户创建,待 RPC 结束后被框架调用以执行用户的后续代码
  如果采用异步Service,退出服务回调时请求未处理完成,done->Run()不应被调用,done应被保存下来供以后调用,但是异步Service会因各种原因跳出回调,如果不使用ClosureGuard,一些分支很可能会在return前忘记done->Run(),所以也建议在异步service中使用done_guard,但与同步Service不同的是,为了避免正常脱离函数时done->Run()也被调用,调用done_guard.release()来释放其中的done
2、创建server并加入Service
3、监听端口
4、启动

 

流式 bRPC

如果需要发送大量数据,client/server 之间虽然可以通过多次 RPC 把数据切分后传输过去,但存在如下问题:
  • 如果这些RPC是并行的,无法保证接收端有序地收到数据,拼接数据的逻辑复杂。
  • 如果这些RPC是串行的,每次传递都得等待一次网络RTT+处理数据的延时。
流式 RPC 能够让大块数据以流水线的方式在client/server之间传递。 输入端可以源源不断的往Stream中写入消息, 接收端会按输入端写入顺序收到消息。
Streaming RPC 保证:
  • 有消息边界
  • 接收消息的顺序和发送消息的顺序严格一致
  • 全双工
  • 支持流控
  • 提供超时提醒
 
proto文件:
syntax = "proto2";
package StreamExpample;

option cc_generic_services = true;

message StreamRequest {
    required string message = 1;
};

message StreamResponse {
    required string message = 1;
};

service StreamService {
    rpc StreamFun(StreamRequest) returns (StreamResponse);
};

 

客户端

1、建立 Channel 及初始化
  Stream RPC 必须使用baidu_std协议
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_BAIDU_STD;
2、建立 Stream
  Stream 都由 Client 端建立:
  • Client 先在本地创建一个 Stream
  • 通过一次 PRC(必须使用baidu_std协议)与指定 Service 建立一个 Stream
  • 如果 Service 在收到请求之后选择接受这个 Stream, 那在 response 返回 Client 后 Stream 就会建立成功
   bRPC 用 StreamId 代表一个 Stream,对 Stream 的读写、关闭都作用在这个 Id 上。
  int StreamCreate(StreamId* request_stream, Controller &cntl, const StreamOptions* options);
struct StreamOptions {
    // 对端允许的未消费数据的最大大小。
    // 如果 max_buf_size <= 0,则表示缓冲区大小没有限制。默认值为 2097152(2M)
    int max_buf_size; 
    // 表示在最后一次调用 on_received_messages 或 on_idle_timeout 完成后
    // 如果在至少 idle_timeout_ms 毫秒内没有数据可用,则通知用户。
    // 如果 idle_timeout_ms 的值为 -1,则表示没有设置超时时间。默认值为 -1。
    long idle_timeout_ms;     
    // 表示传递给 handler->on_received_messages 的最大批量消息数。默认值为 128
    size_t messages_in_batch; 
    // 处理输入消息的处理程序。
    // 如果 handler 为 NULL,则表示远程端不允许写入任何消息,写入操作将返回 EBADF 错误
    // 默认值为 NULL
    StreamInputHandler* handler;
};
brpc::Controller cntl;
brpc::StreamId stream;
if(brpc::StreamCreate(&stream, cntl, NULL) != 0) {
    LOG(ERROR) << "Fail to create stream";
    return -1;
}

StreamExpample::StreamRequest request;
StreamExpample::StreamResponse response;
request.set_message("Client to connect stream");
stub.StreamFun(&cntl, &request, &response, NULL);
if(cntl.Failed()) {
    LOG(ERROR) << "Fail to connect stream";
    return -1;
}

 

3、写入 Stream
  int StreamWrite(StreamId stream_id, const butil::IOBuf &message);
  功能描述:Write |message| into |stream_id|. 远程端的处理程序将按写入顺序接收消息
while(!brpc::IsAskedToQuit()) {
    butil::IOBuf msg1;
    msg1.append("abcdefghijklmnopqrstuvwxyz");
    brpc::StreamWrite(stream, msg1);

    butil::IOBuf msg2;
    msg2.append("1234567890");
    brpc::StreamWrite(stream, msg2);
        
    sleep(2);
}

 

4、关闭 Stream
  int StreamClose(StreamId stream_id);
CHECK_EQ(0, brpc::StreamClose(stream));
LOG(INFO) << "Client is going to quit";

 

服务端

  1、实现生成的 Service 接口
// 实现proto文件中服务接口的远程过程调用(RPC)方法
namespace StreamExpample{
class StreamServiceIml : public StreamService {
public:
    StreamServiceIml() {}
    virtual ~StreamServiceIml() {}

    virtual void StreamFun(::google::protobuf::RpcController* controller,
                        const ::StreamExpample::StreamRequest* request,
                        ::StreamExpample::StreamResponse* response,
                        ::google::protobuf::Closure* done) {
        brpc::ClosureGuard done_guard(done);
        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);

        brpc::StreamOptions stream_options;
        stream_options.handler = &_StreamReceiver;
        if(brpc::StreamAccept(&_streamid, *cntl, &stream_options) != 0) {
            cntl->SetFailed("Fail to accept stream");
            return;
        }
        response->set_message("Server accepted stream");
    }

private:
    brpc::StreamId _streamid;
    StreamReceiver _StreamReceiver;
};
}
 
  在实现的 Service 接口中,包含接受Stream读取Stream的实现:
  • 接受 Stream:Service 在收到Client的Stream连接,通过 StreamAccept 接受: int StreamAccept(StreamId* response_stream, Controller &cntl, const StreamOptions* options); 接受后 Server 端对应产生的 Stream 存放在 response_stream 中,Server 通过这个 Stream 向 Client 发送数据
  • 读取 Stream:在建立或接受一个 Stream 的时候,可以继承 StreamInputHandler 并把这个 handler 填入 StreamOptions 中,通过这个 handler,可以处理对端的写入数据、连接关闭、idle timeout
class StreamInputHandler {
public:    
    // 当接收到消息后被调用
    virtual int on_received_messages(StreamId id, 
                                     butil::IOBuf *const messages[], 
                                     size_t size) = 0; 
    // 当Stream上长时间没有数据交互后被调用
    virtual void on_idle_timeout(StreamId id) = 0; 
    // 当Stream被关闭时被调用
    virtual void on_closed(StreamId id) = 0;
};
 
    比如该例子中:
 
class StreamReceiver : public brpc::StreamInputHandler {
public:
    virtual int on_received_messages(brpc::StreamId id, 
                                     butil::IOBuf *const messages[], 
                                     size_t size) {
        std::ostringstream os;
        for (size_t i = 0; i < size; ++i) {
            os << "msg[" << i << "]=" << *messages[i];
        }
        LOG(INFO) << "Received from Stream=" << id << ": " << os.str();
        return 0;
    }

    virtual void on_idle_timeout(brpc::StreamId id) {
        LOG(INFO) << "Stream=" << id << " has no data transmission for a while";
    }

    virtual void on_closed(brpc::StreamId id) {
        LOG(INFO) << "Stream=" << id << " is closed";
    }
};
 
  2、加入 service:
brpc::Server server;

StreamExpample::StreamServiceIml stream_servive_iml;
if(server.AddService(&stream_servive_iml, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
    LOG(ERROR) << "Fail to add service";
    return -1;
}
 
  3、启动:
 
brpc::ServerOptions options;
options.idle_timeout_sec = FLAGS_idle_timeout_s;
if(server.Start(FLAGS_port, &options) != 0) {
    LOG(ERROR) << "Fail to start server";
    return -1;
}
 

Attachment+读写数据缓冲区

对于数据重分布,每次发送数据包的大小相对较大,对于bRPC有 Attachment 和 stream API。但是 Stream API 会独占连接,所以选择 Attachment 方式,直接收发二进制数据,然后写进缓冲区,让其他线程处理。
异步client+attachment,同步server+share memory
Client:
#include <brpc/callback.h>
#include <gflags/gflags.h>
#include <butil/logging.h>
#include <brpc/channel.h>
#include "buffer_demo.pb.h"

DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in src/brpc/options.proto");
DEFINE_string(connection_type, "pooled", "single, pooled, short");
DEFINE_int32(timeout_ms, 100, "对应Channel上所有RPC的总超时, 单位毫秒");
DEFINE_int32(max_retry, 3, "该Channel上所有RPC的最大重试次数, 默认为3, 0表示不重试");
DEFINE_string(server, "0.0.0.0:8000", "服务器的 IP");
DEFINE_string(load_balancer, "", "负载均衡的算法, 为空则等价于连接单机server");

int count = 0;

void HandleResponse(brpc::Controller* cntl, BufferDemo::Response* response) {
    // std::unique_ptr 确保在return之前删除 cntl/response 对象
    std::unique_ptr<brpc::Controller> cntl_guard(cntl);
    std::unique_ptr<BufferDemo::Response> response_guard(response);

    if(cntl->Failed()) {
        LOG(WARNING) << "Fail to send EchoRequest, " << cntl->ErrorText();
        return;
    }
}

int main() {
    brpc::Channel channel;
    brpc::ChannelOptions options;
    options.protocol = FLAGS_protocol;
    options.connection_type = FLAGS_connection_type;
    options.timeout_ms = FLAGS_timeout_ms;
    options.max_retry = FLAGS_max_retry;
    if(channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) {
        LOG(ERROR) << "Fail to init channel";
        return -1;
    }

    BufferDemo::BufferService_Stub stub(&channel);

    while(!brpc::IsAskedToQuit()) {
        // CallMethod结束不意味着RPC结束, response/controller仍可能被框架及done->Run()使用, 创建在堆上
        BufferDemo::Response* response = new BufferDemo::Response();
        brpc::Controller* cntl = new brpc::Controller();

        // Channel/request 可以定义在栈上
        BufferDemo::Request request;
        cntl->request_attachment().append("count = " + std::to_string(count++) + ", ");
        cntl->request_attachment().append("Buffer attachment from client");

        /*
         * append_user_data函数测试, 可以使用
        */
        // std::string message = "buffer append_user_data attachment from client";
        // void* dataPtr = reinterpret_cast<void*>(&message[0]);
        // size_t dataSize = message.size();
        // // 向附件中追加用户数据,并指定删除器函数
        // int result = cntl->request_attachment().append_user_data(dataPtr, dataSize, [](void*) {});

        google::protobuf::Closure* done = brpc::NewCallback(&HandleResponse, cntl, response);
        stub.BufferFun(cntl, &request, response, done);

        sleep(5);
    }

    LOG(INFO) << "Client is going to quit";
    return 0;
}
 
Server:
 
#include <butil/iobuf.h>
#include <gflags/gflags.h>
#include <brpc/log.h>
#include <brpc/server.h>
#include "buffer_demo.pb.h"
#include <brpc/stream.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/wait.h>
#include <unistd.h>
#include <iostream>

#define SHARED_MEMORY_KEY 1234
#define SHARED_MEMORY_SIZE 1024

char* sharedMemory;
int shmid;

DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
             "read/write operations during the last `idle_timeout_s'");
DEFINE_int32(port, 8000, "TCP Port of this server");

// 实现proto文件中服务接口的远程过程调用(RPC)方法, 这里是存入到共享内存中
namespace BufferDemo{
class BufferServiceIml : public BufferService {
public:
    BufferServiceIml() {}
    virtual ~BufferServiceIml() {}

    virtual void BufferFun(::google::protobuf::RpcController* controller,
                        const ::BufferDemo::Request* request,
                        ::BufferDemo::Response* response,
                        ::google::protobuf::Closure* done) {
        brpc::ClosureGuard done_guard(done);
        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);

        std::string attachment = cntl->request_attachment().to_string();
        strncpy(sharedMemory, attachment.c_str(), SHARED_MEMORY_SIZE);  
    }
};
}

int main() {
    // 创建共享内存
    shmid = shmget(SHARED_MEMORY_KEY, SHARED_MEMORY_SIZE, IPC_CREAT | 0666);
    if (shmid == -1) {
        std::cerr << "Failed to create shared memory" << std::endl;
        return -1;
    }

    // 附加共享内存
    sharedMemory = (char*)shmat(shmid, NULL, 0);
    if (sharedMemory == (char*)-1) {
        std::cerr << "Failed to attach shared memory" << std::endl;
        return -1;
    }

    brpc::Server server;
    BufferDemo::BufferServiceIml buffer_service_iml;
    if(server.AddService(&buffer_service_iml, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
        LOG(ERROR) << "Fail to add service";
        return -1;
    }

    brpc::ServerOptions options;
    options.idle_timeout_sec = FLAGS_idle_timeout_s;
    if(server.Start(FLAGS_port, &options) != 0) {
        LOG(ERROR) << "Fail to start server";
        return -1;
    }

    // 创建子进程
    pid_t pid = fork();
    if (pid < 0) {
        perror("fork");
        exit(1);
    } else if (pid == 0) {
        // 子进程用于读取共享内存数据并清空
        while (1) {
            // 检查共享内存是否有数据
            if (strlen(sharedMemory) > 0) {
                // 读取共享内存中的数据
                printf("Child Process read data from buffer: %s\n", sharedMemory);
                // 清空共享内存
                memset(sharedMemory, 0, SHARED_MEMORY_SIZE);
            }
        }
    }

    server.RunUntilAskedToQuit();
    // 等待子进程结束
    wait(NULL);
    // 分离共享内存
    shmdt(sharedMemory);
    // 删除共享内存
    shmctl(shmid, IPC_RMID, NULL);
    return 0;
}

运行结果如下图:

 客户端:

服务端:

0条评论
0 / 1000
z****n
4文章数
0粉丝数
z****n
4 文章 | 0 粉丝
z****n
4文章数
0粉丝数
z****n
4 文章 | 0 粉丝
原创

bRPC使用分享

2023-09-25 08:34:25
279
0

背景

什么是 RPC?
RPC 把网络交互类比为“client端访问server端上的函数”:
  • client向server发送request后开始等待
  • server收到请求后,处理、回复client
  • client又再度恢复并根据response做出反应

 

RPC 框架三大要素:
  • Call ID 映射
  • 序列化和反序列化
  • 网络传输

 

希望解决什么问题?
 

 

当分布式数据库在进行数据重分布时会存在下面的问题:

  • SharedQueue 存在的问题:
        单节点连接数(进程数):D(重分布个数)*N(节点数)*P(并发数),处理复杂查询时资源消耗过高
  • 用RPC替换:
        单节点连接数(进程数):N(节点数)*C(常量)

 所以用 bRPC 代替 SharedQueue 能够大大降低数据重分布的资源消耗过高的问题。

 

Protobuf service

Protocal Buffers (简称protobuf) 是谷歌的一项技术,用于数据序列化、反序列化
比如同步demo中 syn_echo.proto 文件内容如下:
syntax = "proto2"; // 声明pb版本
package synEcho;   // 定义包名为synEcho, 生成代码时包名会作为命名空间

option cc_generic_services = true;  // 启用C++通用服务支持

// 定义请求消息
message EchoRequest {
    required string message = 1;    // 表示第1字段
};

message EchoResponse {
    required string message = 1;
};

service EchoService {
    rpc Echo(EchoRequest) returns (EchoResponse);
};

 

  • package 语句:将相关的消息和服务放置在同一个包中,在生成代码时,包名也会被用作命名空间
  • option:protobuf 编译器 protoc 根据 protobuf service 中选择的语言(option cc_generic_services = true)生成 rpc 服务的接口和 stub 存根
  • message: RPC 的请求(request)和回复(response)定义为 protobuf 的 message:
    • 字段修饰符(Field Modifiers):指定字段的修饰符,可以是 required、optional 或 repeated。required 表示字段在每个消息中都必须存在,optional 表示字段可以存在也可以不存在,而 repeated 表示字段可以重复出现,即可以包含多个相同类型的值
    • 字段类型(Field Type):指定字段的类型,可以是标量类型(例如整数、浮点数、布尔值、字符串等)或消息类型(嵌套的 Protocol Buffers 消息)
    • 字段名称(Field Name):字段的名称,用于在源代码中引用字段
    • 字段标识号(Field Number):为每个字段分配的唯一标识号,用于在二进制序列化和反序列化时识别和定位字段
  • service:用于定义服务接口,其中包含一个或多个远程过程调用(RPC)方法。在服务器的业务代码中,EchoService 为基类,其中的 Echo 等价于成员函数,需要在源代码中定义。
 
当编写完proto文件,通过protoc --cpp_out=./ ./xxx.proto来生成 xxx.pb.cc 和 xxx.pb.h 文件,在业务源代码中需要 include xxx.pb.h。

同步 bRPC

客户端

在 bRPC 中没有 client 对应的实体,取而代之的是brpc::Channel。可以把 Channel 视作 Client。
  1、建立 Channel 及初始化
 
brpc::Channel channel;
    
brpc::ChannelOptions options;
options.protocol = FLAGS_protocol;
options.connection_type = FLAGS_connection_type;
options.timeout_ms = FLAGS_timeout_ms;
options.max_retry = FLAGS_max_retry;

// Init 连接服务器, 如果指定LD连接服务器集群; 否则连接单台服务器
if(channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) {
    LOG(ERROR) << "Fail to initialize channel";
    return -1;
}
   Init 函数分为连接一台服务器和连接服务集群:
  • 连接一台服务器:
// options为NULL时取默认值
int Init(EndPoint server_addr_and_port, const ChannelOptions* options);
int Init(const char* server_addr_and_port, const ChannelOptions* options);
int Init(const char* server_addr, int port, const ChannelOptions* options);
     这类初始化连接的服务器有固定的 IP 地址,不需要命名服务和负载均衡,创建起来相对轻量。
  • 连接服务集群
int Init(const char* naming_service_url,         
         const char* load_balancer_name,         
         const ChannelOptions* options);
   这类 Channel 需要定期从naming_service_url指定的命名服务中获得服务器列表,并通过load_balancer_name指定的负载均衡算法选择出一台机器发送请求。
  • naming_service_url:ip:port 或 域名:port
  • load_balancer_name:指定负载均衡算法,为 NULL 时等同于连接单台 server 的 Init
 
2、创建 stub
synEcho::EchoService_Stub stub(&channel);
  一般不直接调用 Channel.CallMethod,而是通过 protobuf XXX_Stub 更像是“调用函数”:
 
XXX_Stub stub(&channel);
stub.some_method(controller, request, response, done);
3、远程过程调用
while(!brpc::IsAskedToQuit()) {
    synEcho::EchoRequest request;
    synEcho::EchoResponse response;
    brpc::Controller cntl;

    request.set_message("synEcho test from client");

    cntl.request_attachment().append(FLAGS_attachment);

    stub.Echo(&cntl, &request, &response, NULL);
    if(!cntl.Failed()) {
        LOG(INFO) << "Receive response from" << cntl.remote_side()
                  << " to " << cntl.local_side()
                  << ": " << response.message() << "(attached="
                  << cntl.response_attachment() << ")"
                  << " latency=" << cntl.latency_us() << "us";
    }
    else {
        LOG(WARNING) << cntl.ErrorText();
    }
    sleep(1);
}
 brpc::Controller:是一个用于控制和管理 RPC 调用的对象。它包含了与 RPC 调用相关的状态和控制信息。
  •     Controller 的主要作用如下:
    • 处理 attachment:attachment 由用户自定义,不经过protobuf的序列化,在应用层上通过编码和解码操作来处理 attachment 的转换。
    • 错误处理:Controller 提供了 Failed() 方法用于检查 RPC 调用是否失败。如果调用失败,可以通过调用 ErrorText() 方法获取错误信息
    • 超时设置:Controller 允许设置 RPC 调用的超时时间,通过调用 SetTimeoutMs() 方法可以设置超时时间,单位为毫秒。
  • 同步访问:stub.some_method(controller, request, response, NULL);
    • CallMethod会阻塞直到收到server端返回response或者发生错误
 

服务端

1、实现生成的 Service 接口
// 实现proto文件中服务接口的远程过程调用(RPC)方法
namespace synEcho {
class EchoServiceIml : public EchoService {
public:
    EchoServiceIml() {}
    virtual ~EchoServiceIml() {}
    virtual void Echo(google::protobuf::RpcController* controller,
                       const ::synEcho::EchoRequest* request,
                       synEcho::EchoResponse* response,
                       google::protobuf::Closure* done) {
        
        brpc::ClosureGuard done_guard(done);

        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);

        LOG(INFO) << "Received request from " << cntl->remote_side()
            << " to " << cntl->local_side()
            << ": " << request->message()
            << " (attached=" << cntl->request_attachment() << ")";
        
        // server 回复消息
        response->set_message("synEcho from server");

        // 是否有attachment
        if(FLAGS_echo_attachment) {
            cntl->response_attachment().append(FLAGS_attachment);
        }
    }
};
}
当客户端发来请求,函数Echo会被调用,其中参数done由框架创建,递给服务回调,包含了调用服务回调后的后续动作,包括检查response正确性,序列化,打包,发送等逻辑。不管成功失败,done->Run()必须在请求处理完成后被用户调用一次。为了确保 done->Run() 会被调用,使用使用 ClosureGuard 确保done->Run()被调用,RAII机制(ClosureGuard析构函数内调用了done->Run())
2、创建server并加入Service
brpc::Server server;

synEcho::EchoServiceIml echo_service_iml;
// Add the service into server
if(server.AddService(&echo_service_iml, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
    LOG(ERROR) << "Fail to add service";
    return -1;
}
默认构造后的Server不包含任何服务,也不会对外提供服务,仅仅是一个对象,需要加入 Service:
  int AddService(google::protobuf::Service* service, ServiceOwnership ownership);
  ownership:
  • SERVER_OWNS_SERVICE,Server在析构时会一并删除Service
  • SERVER_DOESNT_OWN_SERVICE,不会删除 Service
3、监听端口
一个server只能监听一个端口,需要监听N个端口就起N个Server
butil::EndPoint point;
if(!FLAGS_listen_addr.empty()) {
    if(butil::str2endpoint(FLAGS_listen_addr.c_str(), &point) < 0) {
        LOG(ERROR) << "Invalid listen address:" << FLAGS_listen_addr;
    }
}
else {
    point = butil::EndPoint(butil::IP_ANY, FLAGS_port);
}
4、启动
brpc::ServerOptions options;
options.idle_timeout_sec = FLAGS_idle_timeout_sec;
if(server.Start(point, &options) != 0) {
    LOG(ERROR) << "Fail to start server";
    return -1;
}
  Start 有四个重载:
int Start(const char* ip_and_port_str, const ServerOptions* opt);
int Start(EndPoint ip_and_port, const ServerOptions* opt);
int Start(int port, const ServerOptions* opt);
int Start(const char *ip_str, PortRange port_range, const ServerOptions *opt);  // r32009后增加, 在可用端口范围中找到第一个可用的端口

异步 bRPC

客户端

1、建立 Channel 及初始化
2、创建 stub
3、远程过程调用
  给CallMethod传递一个额外的回调对象done,CallMethod在发出request后就结束了,而不是在RPC结束后。当server端返回response或发生错误(包括超时)时,done->Run()会被调用。对RPC的后续处理应该写在done->Run()里,而不是CallMethod后,所以:
  • Channel/request 可以定义在栈上
  • response/controller 仍可能被框架及done->Run() 使用,它们一般得创建在堆上,并在done->Run()中删除
void HandleEchoResponse(brpc::Controller* cntl, asyEcho::EchoResponse* response) {
    // std::unique_ptr 确保在return之前删除 cntl/response 对象
    std::unique_ptr<brpc::Controller> cntl_guard(cntl);
    std::unique_ptr<asyEcho::EchoResponse> response_guard(response);

    if(cntl->Failed()) {
        LOG(WARNING) << "Fail to send EchoRequest, " << cntl->ErrorText();
        return;
    }
    LOG(INFO) << "Received response from " << cntl->remote_side()
        << ": " << response->message()
        << " latency=" << cntl->latency_us() << "us";
}


// CallMethod结束不意味着RPC结束, response/controller仍可能被框架及done->Run()使用, 创建在堆上
asyEcho::EchoResponse* response = new asyEcho::EchoResponse();
brpc::Controller* cntl = new brpc::Controller();

// Channel/request 可以定义在栈上
asyEcho::EchoRequest request;
request.set_message("asyEcho test from client");

google::protobuf::Closure* done = brpc::NewCallback(&HandleEchoResponse, cntl, response);
stub.Echo(cntl, &request, response, done);

 

服务端

1、实现生成的 Service 接口
// 实现proto文件中服务接口的远程过程调用(RPC)方法
namespace asyEcho{
class EchoServiceIml : public EchoService {
public:
    EchoServiceIml() {}
    virtual ~EchoServiceIml() {}
    virtual void Echo(::google::protobuf::RpcController* controller,
                       const ::asyEcho::EchoRequest* request,
                       ::asyEcho::EchoResponse* response,
                       ::google::protobuf::Closure* done) {
        
        brpc::ClosureGuard done_guard(done);

        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);

        LOG(INFO) << "Receive request from " << cntl->remote_side()
            << ": " << request->message();

        // server 回复消息
        response->set_message("asyEcho from server");
    }
};
  Service 和 Channel 都可以使用 done 来表达后续的操作,但它们完全不同:
  • Service 的 done 由框架创建,用户处理请求后调用done把response发回给client
  • Channel 的 done 由用户创建,待 RPC 结束后被框架调用以执行用户的后续代码
  如果采用异步Service,退出服务回调时请求未处理完成,done->Run()不应被调用,done应被保存下来供以后调用,但是异步Service会因各种原因跳出回调,如果不使用ClosureGuard,一些分支很可能会在return前忘记done->Run(),所以也建议在异步service中使用done_guard,但与同步Service不同的是,为了避免正常脱离函数时done->Run()也被调用,调用done_guard.release()来释放其中的done
2、创建server并加入Service
3、监听端口
4、启动

 

流式 bRPC

如果需要发送大量数据,client/server 之间虽然可以通过多次 RPC 把数据切分后传输过去,但存在如下问题:
  • 如果这些RPC是并行的,无法保证接收端有序地收到数据,拼接数据的逻辑复杂。
  • 如果这些RPC是串行的,每次传递都得等待一次网络RTT+处理数据的延时。
流式 RPC 能够让大块数据以流水线的方式在client/server之间传递。 输入端可以源源不断的往Stream中写入消息, 接收端会按输入端写入顺序收到消息。
Streaming RPC 保证:
  • 有消息边界
  • 接收消息的顺序和发送消息的顺序严格一致
  • 全双工
  • 支持流控
  • 提供超时提醒
 
proto文件:
syntax = "proto2";
package StreamExpample;

option cc_generic_services = true;

message StreamRequest {
    required string message = 1;
};

message StreamResponse {
    required string message = 1;
};

service StreamService {
    rpc StreamFun(StreamRequest) returns (StreamResponse);
};

 

客户端

1、建立 Channel 及初始化
  Stream RPC 必须使用baidu_std协议
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_BAIDU_STD;
2、建立 Stream
  Stream 都由 Client 端建立:
  • Client 先在本地创建一个 Stream
  • 通过一次 PRC(必须使用baidu_std协议)与指定 Service 建立一个 Stream
  • 如果 Service 在收到请求之后选择接受这个 Stream, 那在 response 返回 Client 后 Stream 就会建立成功
   bRPC 用 StreamId 代表一个 Stream,对 Stream 的读写、关闭都作用在这个 Id 上。
  int StreamCreate(StreamId* request_stream, Controller &cntl, const StreamOptions* options);
struct StreamOptions {
    // 对端允许的未消费数据的最大大小。
    // 如果 max_buf_size <= 0,则表示缓冲区大小没有限制。默认值为 2097152(2M)
    int max_buf_size; 
    // 表示在最后一次调用 on_received_messages 或 on_idle_timeout 完成后
    // 如果在至少 idle_timeout_ms 毫秒内没有数据可用,则通知用户。
    // 如果 idle_timeout_ms 的值为 -1,则表示没有设置超时时间。默认值为 -1。
    long idle_timeout_ms;     
    // 表示传递给 handler->on_received_messages 的最大批量消息数。默认值为 128
    size_t messages_in_batch; 
    // 处理输入消息的处理程序。
    // 如果 handler 为 NULL,则表示远程端不允许写入任何消息,写入操作将返回 EBADF 错误
    // 默认值为 NULL
    StreamInputHandler* handler;
};
brpc::Controller cntl;
brpc::StreamId stream;
if(brpc::StreamCreate(&stream, cntl, NULL) != 0) {
    LOG(ERROR) << "Fail to create stream";
    return -1;
}

StreamExpample::StreamRequest request;
StreamExpample::StreamResponse response;
request.set_message("Client to connect stream");
stub.StreamFun(&cntl, &request, &response, NULL);
if(cntl.Failed()) {
    LOG(ERROR) << "Fail to connect stream";
    return -1;
}

 

3、写入 Stream
  int StreamWrite(StreamId stream_id, const butil::IOBuf &message);
  功能描述:Write |message| into |stream_id|. 远程端的处理程序将按写入顺序接收消息
while(!brpc::IsAskedToQuit()) {
    butil::IOBuf msg1;
    msg1.append("abcdefghijklmnopqrstuvwxyz");
    brpc::StreamWrite(stream, msg1);

    butil::IOBuf msg2;
    msg2.append("1234567890");
    brpc::StreamWrite(stream, msg2);
        
    sleep(2);
}

 

4、关闭 Stream
  int StreamClose(StreamId stream_id);
CHECK_EQ(0, brpc::StreamClose(stream));
LOG(INFO) << "Client is going to quit";

 

服务端

  1、实现生成的 Service 接口
// 实现proto文件中服务接口的远程过程调用(RPC)方法
namespace StreamExpample{
class StreamServiceIml : public StreamService {
public:
    StreamServiceIml() {}
    virtual ~StreamServiceIml() {}

    virtual void StreamFun(::google::protobuf::RpcController* controller,
                        const ::StreamExpample::StreamRequest* request,
                        ::StreamExpample::StreamResponse* response,
                        ::google::protobuf::Closure* done) {
        brpc::ClosureGuard done_guard(done);
        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);

        brpc::StreamOptions stream_options;
        stream_options.handler = &_StreamReceiver;
        if(brpc::StreamAccept(&_streamid, *cntl, &stream_options) != 0) {
            cntl->SetFailed("Fail to accept stream");
            return;
        }
        response->set_message("Server accepted stream");
    }

private:
    brpc::StreamId _streamid;
    StreamReceiver _StreamReceiver;
};
}
 
  在实现的 Service 接口中,包含接受Stream读取Stream的实现:
  • 接受 Stream:Service 在收到Client的Stream连接,通过 StreamAccept 接受: int StreamAccept(StreamId* response_stream, Controller &cntl, const StreamOptions* options); 接受后 Server 端对应产生的 Stream 存放在 response_stream 中,Server 通过这个 Stream 向 Client 发送数据
  • 读取 Stream:在建立或接受一个 Stream 的时候,可以继承 StreamInputHandler 并把这个 handler 填入 StreamOptions 中,通过这个 handler,可以处理对端的写入数据、连接关闭、idle timeout
class StreamInputHandler {
public:    
    // 当接收到消息后被调用
    virtual int on_received_messages(StreamId id, 
                                     butil::IOBuf *const messages[], 
                                     size_t size) = 0; 
    // 当Stream上长时间没有数据交互后被调用
    virtual void on_idle_timeout(StreamId id) = 0; 
    // 当Stream被关闭时被调用
    virtual void on_closed(StreamId id) = 0;
};
 
    比如该例子中:
 
class StreamReceiver : public brpc::StreamInputHandler {
public:
    virtual int on_received_messages(brpc::StreamId id, 
                                     butil::IOBuf *const messages[], 
                                     size_t size) {
        std::ostringstream os;
        for (size_t i = 0; i < size; ++i) {
            os << "msg[" << i << "]=" << *messages[i];
        }
        LOG(INFO) << "Received from Stream=" << id << ": " << os.str();
        return 0;
    }

    virtual void on_idle_timeout(brpc::StreamId id) {
        LOG(INFO) << "Stream=" << id << " has no data transmission for a while";
    }

    virtual void on_closed(brpc::StreamId id) {
        LOG(INFO) << "Stream=" << id << " is closed";
    }
};
 
  2、加入 service:
brpc::Server server;

StreamExpample::StreamServiceIml stream_servive_iml;
if(server.AddService(&stream_servive_iml, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
    LOG(ERROR) << "Fail to add service";
    return -1;
}
 
  3、启动:
 
brpc::ServerOptions options;
options.idle_timeout_sec = FLAGS_idle_timeout_s;
if(server.Start(FLAGS_port, &options) != 0) {
    LOG(ERROR) << "Fail to start server";
    return -1;
}
 

Attachment+读写数据缓冲区

对于数据重分布,每次发送数据包的大小相对较大,对于bRPC有 Attachment 和 stream API。但是 Stream API 会独占连接,所以选择 Attachment 方式,直接收发二进制数据,然后写进缓冲区,让其他线程处理。
异步client+attachment,同步server+share memory
Client:
#include <brpc/callback.h>
#include <gflags/gflags.h>
#include <butil/logging.h>
#include <brpc/channel.h>
#include "buffer_demo.pb.h"

DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in src/brpc/options.proto");
DEFINE_string(connection_type, "pooled", "single, pooled, short");
DEFINE_int32(timeout_ms, 100, "对应Channel上所有RPC的总超时, 单位毫秒");
DEFINE_int32(max_retry, 3, "该Channel上所有RPC的最大重试次数, 默认为3, 0表示不重试");
DEFINE_string(server, "0.0.0.0:8000", "服务器的 IP");
DEFINE_string(load_balancer, "", "负载均衡的算法, 为空则等价于连接单机server");

int count = 0;

void HandleResponse(brpc::Controller* cntl, BufferDemo::Response* response) {
    // std::unique_ptr 确保在return之前删除 cntl/response 对象
    std::unique_ptr<brpc::Controller> cntl_guard(cntl);
    std::unique_ptr<BufferDemo::Response> response_guard(response);

    if(cntl->Failed()) {
        LOG(WARNING) << "Fail to send EchoRequest, " << cntl->ErrorText();
        return;
    }
}

int main() {
    brpc::Channel channel;
    brpc::ChannelOptions options;
    options.protocol = FLAGS_protocol;
    options.connection_type = FLAGS_connection_type;
    options.timeout_ms = FLAGS_timeout_ms;
    options.max_retry = FLAGS_max_retry;
    if(channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) {
        LOG(ERROR) << "Fail to init channel";
        return -1;
    }

    BufferDemo::BufferService_Stub stub(&channel);

    while(!brpc::IsAskedToQuit()) {
        // CallMethod结束不意味着RPC结束, response/controller仍可能被框架及done->Run()使用, 创建在堆上
        BufferDemo::Response* response = new BufferDemo::Response();
        brpc::Controller* cntl = new brpc::Controller();

        // Channel/request 可以定义在栈上
        BufferDemo::Request request;
        cntl->request_attachment().append("count = " + std::to_string(count++) + ", ");
        cntl->request_attachment().append("Buffer attachment from client");

        /*
         * append_user_data函数测试, 可以使用
        */
        // std::string message = "buffer append_user_data attachment from client";
        // void* dataPtr = reinterpret_cast<void*>(&message[0]);
        // size_t dataSize = message.size();
        // // 向附件中追加用户数据,并指定删除器函数
        // int result = cntl->request_attachment().append_user_data(dataPtr, dataSize, [](void*) {});

        google::protobuf::Closure* done = brpc::NewCallback(&HandleResponse, cntl, response);
        stub.BufferFun(cntl, &request, response, done);

        sleep(5);
    }

    LOG(INFO) << "Client is going to quit";
    return 0;
}
 
Server:
 
#include <butil/iobuf.h>
#include <gflags/gflags.h>
#include <brpc/log.h>
#include <brpc/server.h>
#include "buffer_demo.pb.h"
#include <brpc/stream.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/wait.h>
#include <unistd.h>
#include <iostream>

#define SHARED_MEMORY_KEY 1234
#define SHARED_MEMORY_SIZE 1024

char* sharedMemory;
int shmid;

DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
             "read/write operations during the last `idle_timeout_s'");
DEFINE_int32(port, 8000, "TCP Port of this server");

// 实现proto文件中服务接口的远程过程调用(RPC)方法, 这里是存入到共享内存中
namespace BufferDemo{
class BufferServiceIml : public BufferService {
public:
    BufferServiceIml() {}
    virtual ~BufferServiceIml() {}

    virtual void BufferFun(::google::protobuf::RpcController* controller,
                        const ::BufferDemo::Request* request,
                        ::BufferDemo::Response* response,
                        ::google::protobuf::Closure* done) {
        brpc::ClosureGuard done_guard(done);
        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);

        std::string attachment = cntl->request_attachment().to_string();
        strncpy(sharedMemory, attachment.c_str(), SHARED_MEMORY_SIZE);  
    }
};
}

int main() {
    // 创建共享内存
    shmid = shmget(SHARED_MEMORY_KEY, SHARED_MEMORY_SIZE, IPC_CREAT | 0666);
    if (shmid == -1) {
        std::cerr << "Failed to create shared memory" << std::endl;
        return -1;
    }

    // 附加共享内存
    sharedMemory = (char*)shmat(shmid, NULL, 0);
    if (sharedMemory == (char*)-1) {
        std::cerr << "Failed to attach shared memory" << std::endl;
        return -1;
    }

    brpc::Server server;
    BufferDemo::BufferServiceIml buffer_service_iml;
    if(server.AddService(&buffer_service_iml, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
        LOG(ERROR) << "Fail to add service";
        return -1;
    }

    brpc::ServerOptions options;
    options.idle_timeout_sec = FLAGS_idle_timeout_s;
    if(server.Start(FLAGS_port, &options) != 0) {
        LOG(ERROR) << "Fail to start server";
        return -1;
    }

    // 创建子进程
    pid_t pid = fork();
    if (pid < 0) {
        perror("fork");
        exit(1);
    } else if (pid == 0) {
        // 子进程用于读取共享内存数据并清空
        while (1) {
            // 检查共享内存是否有数据
            if (strlen(sharedMemory) > 0) {
                // 读取共享内存中的数据
                printf("Child Process read data from buffer: %s\n", sharedMemory);
                // 清空共享内存
                memset(sharedMemory, 0, SHARED_MEMORY_SIZE);
            }
        }
    }

    server.RunUntilAskedToQuit();
    // 等待子进程结束
    wait(NULL);
    // 分离共享内存
    shmdt(sharedMemory);
    // 删除共享内存
    shmctl(shmid, IPC_RMID, NULL);
    return 0;
}

运行结果如下图:

 客户端:

服务端:

文章来自个人专栏
文章 | 订阅
0条评论
0 / 1000
请输入你的评论
0
0