背景
什么是 RPC?
RPC 把网络交互类比为“client端访问server端上的函数”:
-
client向server发送request后开始等待
-
server收到请求后,处理、回复client
-
client又再度恢复并根据response做出反应
RPC 框架三大要素:
-
Call ID 映射
-
序列化和反序列化
-
网络传输
希望解决什么问题?
当分布式数据库在进行数据重分布时会存在下面的问题:
-
SharedQueue 存在的问题:单节点连接数(进程数):D(重分布个数)*N(节点数)*P(并发数),处理复杂查询时资源消耗过高
-
用RPC替换:单节点连接数(进程数):N(节点数)*C(常量)
所以用 bRPC 代替 SharedQueue 能够大大降低数据重分布的资源消耗过高的问题。
Protobuf service
Protocal Buffers (简称protobuf) 是谷歌的一项技术,用于数据序列化、反序列化
比如同步demo中 syn_echo.proto 文件内容如下:
syntax = "proto2"; // 声明pb版本
package synEcho; // 定义包名为synEcho, 生成代码时包名会作为命名空间
option cc_generic_services = true; // 启用C++通用服务支持
// 定义请求消息
message EchoRequest {
required string message = 1; // 表示第1字段
};
message EchoResponse {
required string message = 1;
};
service EchoService {
rpc Echo(EchoRequest) returns (EchoResponse);
};
-
package 语句:将相关的消息和服务放置在同一个包中,在生成代码时,包名也会被用作命名空间
-
option:protobuf 编译器 protoc 根据 protobuf service 中选择的语言(option cc_generic_services = true)生成 rpc 服务的接口和 stub 存根
-
message: RPC 的请求(request)和回复(response)定义为 protobuf 的 message:
-
字段修饰符(Field Modifiers):指定字段的修饰符,可以是 required、optional 或 repeated。
required
表示字段在每个消息中都必须存在,optional
表示字段可以存在也可以不存在,而repeated
表示字段可以重复出现,即可以包含多个相同类型的值 -
字段类型(Field Type):指定字段的类型,可以是标量类型(例如整数、浮点数、布尔值、字符串等)或消息类型(嵌套的 Protocol Buffers 消息)
-
字段名称(Field Name):字段的名称,用于在源代码中引用字段
-
字段标识号(Field Number):为每个字段分配的唯一标识号,用于在二进制序列化和反序列化时识别和定位字段
-
-
service:用于定义服务接口,其中包含一个或多个远程过程调用(RPC)方法。在服务器的业务代码中,EchoService 为基类,其中的 Echo 等价于成员函数,需要在源代码中定义。
当编写完proto文件,通过
protoc --cpp_out=./ ./xxx.proto
来生成 xxx.pb.cc 和 xxx.pb.h 文件,在业务源代码中需要 include xxx.pb.h。同步 bRPC
客户端
在 bRPC 中没有 client 对应的实体,取而代之的是
brpc::Channel
。可以把 Channel 视作 Client。 1、建立 Channel 及初始化
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = FLAGS_protocol;
options.connection_type = FLAGS_connection_type;
options.timeout_ms = FLAGS_timeout_ms;
options.max_retry = FLAGS_max_retry;
// Init 连接服务器, 如果指定LD连接服务器集群; 否则连接单台服务器
if(channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) {
LOG(ERROR) << "Fail to initialize channel";
return -1;
}
Init 函数分为连接一台服务器和连接服务集群:
-
连接一台服务器:
// options为NULL时取默认值
int Init(EndPoint server_addr_and_port, const ChannelOptions* options);
int Init(const char* server_addr_and_port, const ChannelOptions* options);
int Init(const char* server_addr, int port, const ChannelOptions* options);
这类初始化连接的服务器有固定的 IP 地址,不需要命名服务和负载均衡,创建起来相对轻量。
-
连接服务集群
int Init(const char* naming_service_url,
const char* load_balancer_name,
const ChannelOptions* options);
这类 Channel 需要定期从
naming_service_url
指定的命名服务中获得服务器列表,并通过load_balancer_name
指定的负载均衡算法选择出一台机器发送请求。
-
naming_service_url
:ip:port 或 域名:port -
load_balancer_name
:指定负载均衡算法,为 NULL 时等同于连接单台 server 的 Init
2、创建 stub
synEcho::EchoService_Stub stub(&channel);
一般不直接调用 Channel.CallMethod,而是通过 protobuf XXX_Stub 更像是“调用函数”:
XXX_Stub stub(&channel);
stub.some_method(controller, request, response, done);
3、远程过程调用
while(!brpc::IsAskedToQuit()) {
synEcho::EchoRequest request;
synEcho::EchoResponse response;
brpc::Controller cntl;
request.set_message("synEcho test from client");
cntl.request_attachment().append(FLAGS_attachment);
stub.Echo(&cntl, &request, &response, NULL);
if(!cntl.Failed()) {
LOG(INFO) << "Receive response from" << cntl.remote_side()
<< " to " << cntl.local_side()
<< ": " << response.message() << "(attached="
<< cntl.response_attachment() << ")"
<< " latency=" << cntl.latency_us() << "us";
}
else {
LOG(WARNING) << cntl.ErrorText();
}
sleep(1);
}
brpc::Controller:是一个用于控制和管理 RPC 调用的对象。它包含了与 RPC 调用相关的状态和控制信息。
-
Controller
的主要作用如下:-
处理 attachment:attachment 由用户自定义,不经过protobuf的序列化,在应用层上通过编码和解码操作来处理 attachment 的转换。
-
错误处理:
Controller
提供了Failed()
方法用于检查 RPC 调用是否失败。如果调用失败,可以通过调用ErrorText()
方法获取错误信息 -
超时设置:
Controller
允许设置 RPC 调用的超时时间,通过调用SetTimeoutMs()
方法可以设置超时时间,单位为毫秒。
-
-
同步访问:
stub.some_method(controller, request, response, NULL);
-
CallMethod会阻塞直到收到server端返回response或者发生错误
-
服务端
1、实现生成的 Service 接口
// 实现proto文件中服务接口的远程过程调用(RPC)方法
namespace synEcho {
class EchoServiceIml : public EchoService {
public:
EchoServiceIml() {}
virtual ~EchoServiceIml() {}
virtual void Echo(google::protobuf::RpcController* controller,
const ::synEcho::EchoRequest* request,
synEcho::EchoResponse* response,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
LOG(INFO) << "Received request from " << cntl->remote_side()
<< " to " << cntl->local_side()
<< ": " << request->message()
<< " (attached=" << cntl->request_attachment() << ")";
// server 回复消息
response->set_message("synEcho from server");
// 是否有attachment
if(FLAGS_echo_attachment) {
cntl->response_attachment().append(FLAGS_attachment);
}
}
};
}
当客户端发来请求,函数Echo会被调用,其中参数done由框架创建,递给服务回调,包含了调用服务回调后的后续动作,包括检查response正确性,序列化,打包,发送等逻辑。不管成功失败,done->Run()必须在请求处理完成后被用户调用一次。为了确保 done->Run() 会被调用,使用使用 ClosureGuard 确保done->Run()被调用,RAII机制(ClosureGuard析构函数内调用了done->Run())
2、创建server并加入Service
brpc::Server server;
synEcho::EchoServiceIml echo_service_iml;
// Add the service into server
if(server.AddService(&echo_service_iml, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
LOG(ERROR) << "Fail to add service";
return -1;
}
默认构造后的Server不包含任何服务,也不会对外提供服务,仅仅是一个对象,需要加入 Service:
int AddService
(
google
::
protobuf
::
Service
*
service
,
ServiceOwnership ownership
);
ownership:
-
SERVER_OWNS_SERVICE,Server在析构时会一并删除Service
-
SERVER_DOESNT_OWN_SERVICE,不会删除 Service
3、监听端口
一个server只能监听一个端口,需要监听N个端口就起N个Server
butil::EndPoint point;
if(!FLAGS_listen_addr.empty()) {
if(butil::str2endpoint(FLAGS_listen_addr.c_str(), &point) < 0) {
LOG(ERROR) << "Invalid listen address:" << FLAGS_listen_addr;
}
}
else {
point = butil::EndPoint(butil::IP_ANY, FLAGS_port);
}
4、启动
brpc::ServerOptions options;
options.idle_timeout_sec = FLAGS_idle_timeout_sec;
if(server.Start(point, &options) != 0) {
LOG(ERROR) << "Fail to start server";
return -1;
}
Start 有四个重载:
int Start(const char* ip_and_port_str, const ServerOptions* opt);
int Start(EndPoint ip_and_port, const ServerOptions* opt);
int Start(int port, const ServerOptions* opt);
int Start(const char *ip_str, PortRange port_range, const ServerOptions *opt); // r32009后增加, 在可用端口范围中找到第一个可用的端口
异步 bRPC
客户端
1、建立 Channel 及初始化
2、创建 stub
3、远程过程调用
给CallMethod传递一个额外的回调对象done,CallMethod在发出request后就结束了,而不是在RPC结束后。当server端返回response或发生错误(包括超时)时,done->Run()会被调用。对RPC的后续处理应该写在done->Run()里,而不是CallMethod后,所以:
-
Channel/request 可以定义在栈上
-
response/controller 仍可能被框架及done->Run() 使用,它们一般得创建在堆上,并在done->Run()中删除
void HandleEchoResponse(brpc::Controller* cntl, asyEcho::EchoResponse* response) {
// std::unique_ptr 确保在return之前删除 cntl/response 对象
std::unique_ptr<brpc::Controller> cntl_guard(cntl);
std::unique_ptr<asyEcho::EchoResponse> response_guard(response);
if(cntl->Failed()) {
LOG(WARNING) << "Fail to send EchoRequest, " << cntl->ErrorText();
return;
}
LOG(INFO) << "Received response from " << cntl->remote_side()
<< ": " << response->message()
<< " latency=" << cntl->latency_us() << "us";
}
// CallMethod结束不意味着RPC结束, response/controller仍可能被框架及done->Run()使用, 创建在堆上
asyEcho::EchoResponse* response = new asyEcho::EchoResponse();
brpc::Controller* cntl = new brpc::Controller();
// Channel/request 可以定义在栈上
asyEcho::EchoRequest request;
request.set_message("asyEcho test from client");
google::protobuf::Closure* done = brpc::NewCallback(&HandleEchoResponse, cntl, response);
stub.Echo(cntl, &request, response, done);
服务端
1、实现生成的 Service 接口
// 实现proto文件中服务接口的远程过程调用(RPC)方法
namespace asyEcho{
class EchoServiceIml : public EchoService {
public:
EchoServiceIml() {}
virtual ~EchoServiceIml() {}
virtual void Echo(::google::protobuf::RpcController* controller,
const ::asyEcho::EchoRequest* request,
::asyEcho::EchoResponse* response,
::google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
LOG(INFO) << "Receive request from " << cntl->remote_side()
<< ": " << request->message();
// server 回复消息
response->set_message("asyEcho from server");
}
};
Service 和 Channel 都可以使用 done 来表达后续的操作,但它们完全不同:
-
Service 的 done 由框架创建,用户处理请求后调用done把response发回给client
-
Channel 的 done 由用户创建,待 RPC 结束后被框架调用以执行用户的后续代码
如果采用异步Service,退出服务回调时请求未处理完成,done->Run()不应被调用,done应被保存下来供以后调用,但是异步Service会因各种原因跳出回调,如果不使用ClosureGuard,一些分支很可能会在return前忘记done->Run(),所以也建议在异步service中使用done_guard,但与同步Service不同的是,为了避免正常脱离函数时done->Run()也被调用,调用done_guard.release()来释放其中的done
2、创建server并加入Service
3、监听端口
4、启动
流式 bRPC
如果需要发送大量数据,client/server 之间虽然可以通过多次 RPC 把数据切分后传输过去,但存在如下问题:
-
如果这些RPC是并行的,无法保证接收端有序地收到数据,拼接数据的逻辑复杂。
-
如果这些RPC是串行的,每次传递都得等待一次网络RTT+处理数据的延时。
流式 RPC 能够让大块数据以流水线的方式在client/server之间传递。 输入端可以源源不断的往Stream中写入消息, 接收端会按输入端写入顺序收到消息。
Streaming RPC 保证:
-
有消息边界
-
接收消息的顺序和发送消息的顺序严格一致
-
全双工
-
支持流控
-
提供超时提醒
proto文件:
syntax = "proto2";
package StreamExpample;
option cc_generic_services = true;
message StreamRequest {
required string message = 1;
};
message StreamResponse {
required string message = 1;
};
service StreamService {
rpc StreamFun(StreamRequest) returns (StreamResponse);
};
客户端
1、建立 Channel 及初始化
Stream RPC 必须使用baidu_std协议
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_BAIDU_STD;
2、建立 Stream
Stream 都由 Client 端建立:
-
Client 先在本地创建一个 Stream
-
通过一次 PRC(必须使用baidu_std协议)与指定 Service 建立一个 Stream
-
如果 Service 在收到请求之后选择接受这个 Stream, 那在 response 返回 Client 后 Stream 就会建立成功
bRPC 用 StreamId 代表一个 Stream,对 Stream 的读写、关闭都作用在这个 Id 上。
int StreamCreate(StreamId* request_stream, Controller &cntl, const StreamOptions* options);
struct StreamOptions {
// 对端允许的未消费数据的最大大小。
// 如果 max_buf_size <= 0,则表示缓冲区大小没有限制。默认值为 2097152(2M)
int max_buf_size;
// 表示在最后一次调用 on_received_messages 或 on_idle_timeout 完成后
// 如果在至少 idle_timeout_ms 毫秒内没有数据可用,则通知用户。
// 如果 idle_timeout_ms 的值为 -1,则表示没有设置超时时间。默认值为 -1。
long idle_timeout_ms;
// 表示传递给 handler->on_received_messages 的最大批量消息数。默认值为 128
size_t messages_in_batch;
// 处理输入消息的处理程序。
// 如果 handler 为 NULL,则表示远程端不允许写入任何消息,写入操作将返回 EBADF 错误
// 默认值为 NULL
StreamInputHandler* handler;
};
brpc::Controller cntl;
brpc::StreamId stream;
if(brpc::StreamCreate(&stream, cntl, NULL) != 0) {
LOG(ERROR) << "Fail to create stream";
return -1;
}
StreamExpample::StreamRequest request;
StreamExpample::StreamResponse response;
request.set_message("Client to connect stream");
stub.StreamFun(&cntl, &request, &response, NULL);
if(cntl.Failed()) {
LOG(ERROR) << "Fail to connect stream";
return -1;
}
3、写入 Stream
int StreamWrite(StreamId stream_id, const butil::IOBuf &message);
功能描述:Write |message| into |stream_id|. 远程端的处理程序将按写入顺序接收消息
while(!brpc::IsAskedToQuit()) {
butil::IOBuf msg1;
msg1.append("abcdefghijklmnopqrstuvwxyz");
brpc::StreamWrite(stream, msg1);
butil::IOBuf msg2;
msg2.append("1234567890");
brpc::StreamWrite(stream, msg2);
sleep(2);
}
4、关闭 Stream
int StreamClose
(
StreamId stream_id
);
CHECK_EQ(0, brpc::StreamClose(stream));
LOG(INFO) << "Client is going to quit";
服务端
1、实现生成的 Service 接口
// 实现proto文件中服务接口的远程过程调用(RPC)方法
namespace StreamExpample{
class StreamServiceIml : public StreamService {
public:
StreamServiceIml() {}
virtual ~StreamServiceIml() {}
virtual void StreamFun(::google::protobuf::RpcController* controller,
const ::StreamExpample::StreamRequest* request,
::StreamExpample::StreamResponse* response,
::google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
brpc::StreamOptions stream_options;
stream_options.handler = &_StreamReceiver;
if(brpc::StreamAccept(&_streamid, *cntl, &stream_options) != 0) {
cntl->SetFailed("Fail to accept stream");
return;
}
response->set_message("Server accepted stream");
}
private:
brpc::StreamId _streamid;
StreamReceiver _StreamReceiver;
};
}
在实现的 Service 接口中,包含接受Stream和读取Stream的实现:
-
接受 Stream:Service 在收到Client的Stream连接,通过 StreamAccept 接受:
int StreamAccept(StreamId* response_stream, Controller &cntl, const StreamOptions* options);
接受后 Server 端对应产生的 Stream 存放在 response_stream 中,Server 通过这个 Stream 向 Client 发送数据 -
读取 Stream:在建立或接受一个 Stream 的时候,可以继承 StreamInputHandler 并把这个 handler 填入 StreamOptions 中,通过这个 handler,可以处理对端的写入数据、连接关闭、idle timeout
class StreamInputHandler {
public:
// 当接收到消息后被调用
virtual int on_received_messages(StreamId id,
butil::IOBuf *const messages[],
size_t size) = 0;
// 当Stream上长时间没有数据交互后被调用
virtual void on_idle_timeout(StreamId id) = 0;
// 当Stream被关闭时被调用
virtual void on_closed(StreamId id) = 0;
};
比如该例子中:
class StreamReceiver : public brpc::StreamInputHandler {
public:
virtual int on_received_messages(brpc::StreamId id,
butil::IOBuf *const messages[],
size_t size) {
std::ostringstream os;
for (size_t i = 0; i < size; ++i) {
os << "msg[" << i << "]=" << *messages[i];
}
LOG(INFO) << "Received from Stream=" << id << ": " << os.str();
return 0;
}
virtual void on_idle_timeout(brpc::StreamId id) {
LOG(INFO) << "Stream=" << id << " has no data transmission for a while";
}
virtual void on_closed(brpc::StreamId id) {
LOG(INFO) << "Stream=" << id << " is closed";
}
};
2、加入 service:
brpc::Server server;
StreamExpample::StreamServiceIml stream_servive_iml;
if(server.AddService(&stream_servive_iml, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
LOG(ERROR) << "Fail to add service";
return -1;
}
3、启动:
brpc::ServerOptions options;
options.idle_timeout_sec = FLAGS_idle_timeout_s;
if(server.Start(FLAGS_port, &options) != 0) {
LOG(ERROR) << "Fail to start server";
return -1;
}
Attachment+读写数据缓冲区
对于数据重分布,每次发送数据包的大小相对较大,对于bRPC有 Attachment 和 stream API。但是 Stream API 会独占连接,所以选择 Attachment 方式,直接收发二进制数据,然后写进缓冲区,让其他线程处理。
异步client+attachment,同步server+share memory
Client:
#include <brpc/callback.h>
#include <gflags/gflags.h>
#include <butil/logging.h>
#include <brpc/channel.h>
#include "buffer_demo.pb.h"
DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in src/brpc/options.proto");
DEFINE_string(connection_type, "pooled", "single, pooled, short");
DEFINE_int32(timeout_ms, 100, "对应Channel上所有RPC的总超时, 单位毫秒");
DEFINE_int32(max_retry, 3, "该Channel上所有RPC的最大重试次数, 默认为3, 0表示不重试");
DEFINE_string(server, "0.0.0.0:8000", "服务器的 IP");
DEFINE_string(load_balancer, "", "负载均衡的算法, 为空则等价于连接单机server");
int count = 0;
void HandleResponse(brpc::Controller* cntl, BufferDemo::Response* response) {
// std::unique_ptr 确保在return之前删除 cntl/response 对象
std::unique_ptr<brpc::Controller> cntl_guard(cntl);
std::unique_ptr<BufferDemo::Response> response_guard(response);
if(cntl->Failed()) {
LOG(WARNING) << "Fail to send EchoRequest, " << cntl->ErrorText();
return;
}
}
int main() {
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = FLAGS_protocol;
options.connection_type = FLAGS_connection_type;
options.timeout_ms = FLAGS_timeout_ms;
options.max_retry = FLAGS_max_retry;
if(channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) {
LOG(ERROR) << "Fail to init channel";
return -1;
}
BufferDemo::BufferService_Stub stub(&channel);
while(!brpc::IsAskedToQuit()) {
// CallMethod结束不意味着RPC结束, response/controller仍可能被框架及done->Run()使用, 创建在堆上
BufferDemo::Response* response = new BufferDemo::Response();
brpc::Controller* cntl = new brpc::Controller();
// Channel/request 可以定义在栈上
BufferDemo::Request request;
cntl->request_attachment().append("count = " + std::to_string(count++) + ", ");
cntl->request_attachment().append("Buffer attachment from client");
/*
* append_user_data函数测试, 可以使用
*/
// std::string message = "buffer append_user_data attachment from client";
// void* dataPtr = reinterpret_cast<void*>(&message[0]);
// size_t dataSize = message.size();
// // 向附件中追加用户数据,并指定删除器函数
// int result = cntl->request_attachment().append_user_data(dataPtr, dataSize, [](void*) {});
google::protobuf::Closure* done = brpc::NewCallback(&HandleResponse, cntl, response);
stub.BufferFun(cntl, &request, response, done);
sleep(5);
}
LOG(INFO) << "Client is going to quit";
return 0;
}
Server:
#include <butil/iobuf.h>
#include <gflags/gflags.h>
#include <brpc/log.h>
#include <brpc/server.h>
#include "buffer_demo.pb.h"
#include <brpc/stream.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/wait.h>
#include <unistd.h>
#include <iostream>
#define SHARED_MEMORY_KEY 1234
#define SHARED_MEMORY_SIZE 1024
char* sharedMemory;
int shmid;
DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s'");
DEFINE_int32(port, 8000, "TCP Port of this server");
// 实现proto文件中服务接口的远程过程调用(RPC)方法, 这里是存入到共享内存中
namespace BufferDemo{
class BufferServiceIml : public BufferService {
public:
BufferServiceIml() {}
virtual ~BufferServiceIml() {}
virtual void BufferFun(::google::protobuf::RpcController* controller,
const ::BufferDemo::Request* request,
::BufferDemo::Response* response,
::google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
std::string attachment = cntl->request_attachment().to_string();
strncpy(sharedMemory, attachment.c_str(), SHARED_MEMORY_SIZE);
}
};
}
int main() {
// 创建共享内存
shmid = shmget(SHARED_MEMORY_KEY, SHARED_MEMORY_SIZE, IPC_CREAT | 0666);
if (shmid == -1) {
std::cerr << "Failed to create shared memory" << std::endl;
return -1;
}
// 附加共享内存
sharedMemory = (char*)shmat(shmid, NULL, 0);
if (sharedMemory == (char*)-1) {
std::cerr << "Failed to attach shared memory" << std::endl;
return -1;
}
brpc::Server server;
BufferDemo::BufferServiceIml buffer_service_iml;
if(server.AddService(&buffer_service_iml, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
LOG(ERROR) << "Fail to add service";
return -1;
}
brpc::ServerOptions options;
options.idle_timeout_sec = FLAGS_idle_timeout_s;
if(server.Start(FLAGS_port, &options) != 0) {
LOG(ERROR) << "Fail to start server";
return -1;
}
// 创建子进程
pid_t pid = fork();
if (pid < 0) {
perror("fork");
exit(1);
} else if (pid == 0) {
// 子进程用于读取共享内存数据并清空
while (1) {
// 检查共享内存是否有数据
if (strlen(sharedMemory) > 0) {
// 读取共享内存中的数据
printf("Child Process read data from buffer: %s\n", sharedMemory);
// 清空共享内存
memset(sharedMemory, 0, SHARED_MEMORY_SIZE);
}
}
}
server.RunUntilAskedToQuit();
// 等待子进程结束
wait(NULL);
// 分离共享内存
shmdt(sharedMemory);
// 删除共享内存
shmctl(shmid, IPC_RMID, NULL);
return 0;
}
运行结果如下图:
客户端:
服务端: