searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

使用unix domain socket和prototbuf 实现跨语言进程通信

2023-11-30 05:46:40
32
0

前言

       Unix domain socket(也称为 Unix 域套接字)是一种用于同一台主机上进程间通信(IPC)的机制。与常规网络套接字不同,Unix domain socket 不依赖于网络协议,并且只能用于在同一台机器上的进程之间通信, 这使得 Unix socket 比网络套接字更快和更有效。

       protobuf(Google Protocol Buffers)是Google提供一个高效的协议数据交换格式工具库, 具有比JSON提高3到5倍的时间和空间效率。

       在最近的一个项目中,使用了两种不同的语言来实现业务应用和运维工具:业务应用使用C语言实现,运维工具为了开发效率使用了golang来实现。 由于C语言没有原生的gRPC库实现,所以业务应用和运维工具直接使用unix domain socket进行交互,数据传输使用protobuf 进行序列化和反序列化。本文将以一个简单的通信为例介绍如何使用unix domain socket和protobuf 实现跨语言的进程通信。

工作模式

        Unix domain socket 工作模式为C/S ( Client/Server, 客户端/服务端) 模式,客户端和服务端使用固定的socket文件进行通信。在本项目中,业务应用作为服务端,接收来自客户端的请求,客户端连接服务端发送请求,展示服务端返回的请求结果。

消息头格式

       为了更好的做消息控制,约定了请求和响应使用固定的报文头。

struct sk_request_msg {

    char traceid[TRACEID_LEN];

    uint32_t version;

    uint16_t op_id;

    uint16_t op_type;

    size_t len;

    char data[0];

};

struct sk_reply_msg {

    char errstr[ERRSTR_LEN];

    uint32_t version;

    uint16_t op_id;

    uint16_t op_type;

    int errcode;

    uint32_t padding;

    size_t len;

    char data[0];

};

        注: 在消息中,data 部分即是使用protobuf序列化过后的的数据内容,这里需要注意的是消息头使用64bit对齐,因此在回复消息中可以看到一个32bit的padding字段。

        相应的golang 定义如下

type Request struct {

    TraceId [64]byte

    Version uint32

    ID uint16

    Type uint16

    Len uint64

    Data []byte

}

 type Response struct {

    ErrMsg []byte

    Version uint32

    ID uint16

    Type uint16

    ErrCode int32

    Rsvd uint32

    Len uint64

    Data []byte

}

        针对发送消息,需要对消息头也进行序列化,因此针对请求需要实现完整的序列化方法

// Marshal serialization for socket data

func (r *Request) Marshal() ([]byte, error) {

    data := make([]byte, 0, RequestHDRLen+r.Len)

    wt := bytes.Buffer{}

    err := binary.Write(&wt, binary.LittleEndian, r.TraceId)

    if err != nil {

        return []byte{}, err

    }

    data = append(data, wt.Bytes()...)

 

    buf := make([]byte, 4)

    binary.LittleEndian.PutUint32(buf, uint32(r.Version))

    data = append(data, buf...)

 

    buf1 := make([]byte, 2)

    binary.LittleEndian.PutUint16(buf1, uint16(r.ID))

    data = append(data, buf1...)

 

    buf2 := make([]byte, 2)

    binary.LittleEndian.PutUint16(buf2, uint16(r.Type))

    data = append(data, buf2...)

 

    buf3 := make([]byte, 8)

    binary.LittleEndian.PutUint64(buf3, uint64(r.Len))

    data = append(data, buf3...)

 

    wd := bytes.Buffer{}

    err = binary.Write(&wd, binary.LittleEndian, r.Data)

    if err != nil {

        return []byte{}, err

    }

 

    data = append(data, wd.Bytes()...)

    return data, nil

}

         针对回复消息要进行反序列化

// Unmarshal deserialization from binary

func (r *Response) Unmarshal(d []byte) error {

    if len(d) < ResponseHDRLen {

        return ErrInvalPkt

    }

    r.ErrMsg = d[:64]

    r.Version = binary.LittleEndian.Uint32(d[64:])

    r.ID = binary.LittleEndian.Uint16(d[68:])

    r.Type = binary.LittleEndian.Uint16(d[70:])

    r.ErrCode = int32(binary.LittleEndian.Uint32(d[72:]))

    r.Len = binary.LittleEndian.Uint64(d[80:])

    r.Data = d[88 : r.Len+88]

 

    return nil

}

         注: 当前实现未对大端机型进行适配, 如要在大端机型上运行,需要按需进行适配。

Protobuf定义请求

syntax = "proto3";

// [START go_declaration]

option  go_package = "./proto";

// [END go_declaration]

enum Magic {

    UNKNOWN = 0;

    CTYUN = 0x27102711;

};

message Version {

    Magic magic = 1;

    string version = 2;

}

service TestService {

    rpc ShowVersion(Version) returns (Version);

}

       使用proto工具生成对应的c和golang定义:

OBJS := $(shell find . -name '*.proto' | sort)

 

all: $(OBJS)

     # go install google.golang.org/protobuf/cmd/protoc-gen-go@latest

    for i in $(OBJS);                                  \

    do                                                 \

        protoc-c --c_out=. $$i;                        \

        protoc --go_out=../pkg $$i;                    \

    done

服务实现

        在消息头里面定义了op_type 和op_id, 这两个字段主要用于路由不同的操作到对应的回调函数,这里仅列出回调接口内的关键代码实现

    Version version = VERSION__INIT, *verp = NULL;

    char ver_str[VERSION_LEN] = {0};

    uint8_t *request = (uint8_t *)conf;

    uint8_t *reply = NULL;

    size_t sz_version = 0;

 

    verp = version__unpack(NULL, size, request);

    if (verp == NULL) {

        return ERROR_INVPKT;

    }

 

    if (verp->magic != MAGIC__CTYUN) {

        return ERROR _INVAL;

    }

    version__free_unpacked(verp, NULL);

 

    memset(ver_str, 0, VERSION_LEN);

    sprintf(ver_str, "version: %s (%s), build on %s",

            VERSION, COMMIT, BUILD_DATE);

    version.magic = MAGIC__CTYUN;

    version.version = ver_str;

 

    reply = rte_zmalloc(NULL, version__get_packed_size(&version),

                        RTE_CACHE_LINE_SIZE);

    if (reply == NULL) {

        return ERROR_NOMEM;

    }

    sz_version = version__pack(&version, reply);

       其中request 是客户端发送请求序列化过后的数据,因此在服务端首先使用自动生成的version__unpack接口进行反序列化,得到相应的消息结构,对消息的幻数进行校验,校验通过后,构造相应消息,然后使用version__pack序列化回复消息。

       在路由函数调完这个操作回调后,将消息封装上回复消息头,然后通过socket发还给客户端。 其大体实现代码如下:

    memset(&ctl_addr, 0, sizeof(struct sockaddr_un));

    ctl_len = sizeof(ctl_addr);

 

    // socket 侦听

    ctl_fd = accept(srv_fd, (struct sockaddr*)&ctl_addr, &ctl_len);

    if (ctl_fd < 0) {

        return ERROR_IO;

    }

 

    ret = sk_msg_recv(ctl_fd, &msg);

 

    if (ret == ERROR_OK) {

        // 消息路由,调用对应回调

        …

 

        // 构造回复消息头

        memset(&reply_hdr, 0, sizeof(reply_hdr));

        reply_hdr.version = SK_VERSION;

        reply_hdr.id = msg->id;

        reply_hdr.type = msg->type;

        reply_hdr.errcode = ret;

        strncpy(reply_hdr.errstr, strerror(ret), ERRSTR_LEN - 1);

        reply_hdr.len = reply_data_len;

 

        // 发送回复消息

        ret = sk_msg_send(ctl_fd, &reply_hdr, reply_data, reply_data_len);

    }

    sk_msg_free(msg);

客户端实现

        在客户端使用golang 开发,调用过程为先将请求消息通过protobuf 进行序列化,然后封装好消息头,连接Unix域套接字接口发送消息,然后读取回复,解析消息头并反序列化具体的消息内容:

func (c *Client) Call(ctx context.Context, rid, rt uint16, req, resp proto.Message) error {

    d, err := pb.Marshal(req)

    if err != nil {

        return err

    }

 

    request := Request{

        Version: SKVersion,

        ID:      rid,

        Type:    rt,

        Len:     uint64(len(d)),

        Data:    d,

        TraceId:uuid.NewV4().Bytes(),

    }

 

    data, err := request.Marshal()

    if err != nil {

        return err

    }

 

    conn, err := net.DialTimeout("unix", "/var/run/ctl.sock", c.option.Timeout)

    if err != nil {

        return err

    }

    defer conn.Close()

    err = conn.SetDeadline(c.option.Deadline)

    if err != nil {

        return err

    }

 

    _, err = conn.Write(data)

    if err != nil {

        return err

    }

 

    buff, err := io.ReadAll(conn)

    if err != nil {

        return err

    }

 

    var response Response

    err = response.Unmarshal(buff)

    if err != nil {

        return err

    }

 

    if response.ErrCode != 0 {

        return errors.New(string(response.ErrMsg))

    }

 

    if rt == SKGet {

        err = pb.Unmarshal(response.Data, resp)

        if err != nil {

            return err

        }

    }

    return nil

}

        命令行调用上面的call 接口

    req := &pb.Version{

        Magic: pb.Magic_CTYUN,

    }

    resp := &pb.Version{}

 

    err := c.uc.Call(ctx, SKGetVersion, SKGet, req, resp)

    if err != nil {   

        fmt.Println(err)

        os.Exit(1)

    }

 

    fmt.Println(string(resp.Version))

        运行代码进行测试,可以得到如下结果

结论

       Unix 域套接字是一种用于同一台主机上高效的进程间通信(IPC)的机制,同时借助protobuf 消息序列化及反序列化,可以实现应用间的高效通信,而且实现了跨语言的变成开发,实现业务应用和运维工具在不同的团队进行独立维护。

0条评论
0 / 1000