searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

etcd client接口代码分析

2023-08-30 03:25:20
48
0
1.cluster 
 
cluster结构体
 
type cluster struct { remote pb.ClusterClient callOpts []grpc.CallOption } ​
remotecluster的api集合,包含成员增删改查的功能调用接口:
type ClusterClient interface {
    
    MemberAdd(ctx context.Context, in *MemberAddRequest, opts ...grpc.CallOption) (*MemberAddResponse, error)
    
    MemberRemove(ctx context.Context, in *MemberRemoveRequest, opts ...grpc.CallOption) (*MemberRemoveResponse, error)
    
    MemberUpdate(ctx context.Context, in *MemberUpdateRequest, opts ...grpc.CallOption) (*MemberUpdateResponse, error)
​
    MemberList(ctx context.Context, in *MemberListRequest, opts ...grpc.CallOption) (*MemberListResponse, error)
    
    MemberPromote(ctx context.Context, in *MemberPromoteRequest, opts ...grpc.CallOption) (*MemberPromoteResponse, error)
}
该成员为一个接口,etcd提供两种实现方式。callOpts表示连接设置,该接口具有两个方法,分别为beforeafter
该成员为一个接口,etcd提供两种实现方式。callOpts表示连接设置,该接口具有两个方法,分别为before和after
type CallOption interface {
    before(*callInfo) error//在发送数据之前需要执行的检查
    after(*callInfo, *csAttempt)//在发送数据之后要执行的操作
}
根据数据类型的不同重写CallOption接口可以对不同数据执行不同操作。
 
接口
 
type Cluster interface包含以下函数:
MemberList(ctx context.Context) (*MemberListResponse, error)
返回当前cluster的成员列表
MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error)
cluster中添加一个新成员
MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error)
cluster中添加一个无投票权的成员
MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error)
删除一个成员
MemberPromote(ctx context.Context, id uint64) (*MemberPromoteResponse, error)
将无投票权成员晋升为可投票节点
这些函数调用type ClusterClient interface接口实现的方法,直接调用底层gRPC的方法向服务器传输指令。MemberAddMemberAddAsLearner稍有不同,这两个函数共用一个接口,通过一个bool变量控制是否有投票权,然后调用gRPC方法。
 
函数
func NewCluster(c *Client) Cluster //调用RetryClusterClient为原本不是集群中的节点新建一个集群 //根据c中的callOps配置连接设置
func NewClusterFromClusterClient(remote pb.ClusterClient, c *Client) Cluster //将节点c添加进现有集群中,现有集群对应api为remote
 
2.op 
 
type opType intop使用int作为类型,支持四种类型
const (
    //defalut op的optype值为0,表示无效
    tRange opType = iota + 1
    tPut
    tDeleteRange
    tTxn
)
 
op结构体
 
type Op struct {
    t   opType
    key []byte
    end []byte
​
    // 用于查询
    limit        int64  
    sort         *SortOption
    serializable bool
    keysOnly     bool
    countOnly    bool
    minModRev    int64
    maxModRev    int64
    minCreateRev int64
    maxCreateRev int64
​
    // 用于查询和watch
    rev int64
​
    // 用于查询、watch和删除
    prevKV bool
​
    // 用于 watch
    //通常禁用,如果不禁用,则当watch请求超过下值时对watch请求进行拆分
    // "--max-request-bytes" flag value + 512-byte
    fragment bool
​
    // 用于put
    ignoreValue bool
    ignoreLease bool
​
    // 用于进度更新
    progressNotify bool
    // 用于创建事件
    createdNotify bool
    // 用于watch的筛选器
    filterPut    bool
    filterDelete bool
​
    // 用于put
    val     []byte
    leaseID LeaseID
​
    // 用于事务三元组if then else
    cmps    []Cmp
    thenOps []Op
    elseOps []Op
​
    isOptsWithFromKey bool
    isOptsWithPrefix  bool
}
 
字段
解释
类型
用途
limit
y用于限制get请求返回结果的数量,如果值为0表示无限制。
int64
range
sort
用于选取get请求返回结果的排列方式
*SortOption
range
serializable
串行化读(默认为线性读liner)
bool
range
keysOnly
控制get请求只返回key而不返回值
bool
range
countOnly
控制get请求只返回key的计数而不返回值和key
bool
range
minModRev
用于过滤修改版本号小于给定值的key
int64
range
maxModRev
用于过滤修改版本号大于给定值的key
int64
range
minCreateRev
用于过滤创建版本号小于给定值的key
int64
range
maxCreateRev
用于过滤创建版本号大于给定值的key
int64
range
rev
用于get请求特定版本号的key或从给定的版本号开始watch
int64
watch range
prevKV
用于返回上一个值,如果上一个值已经持久化则不返回
bool
range watch delete
fragment
通常禁用,如果不禁用,则当watch请求超过下值时对watch请求进行拆分"--max-request-bytes" flag value + 512-byte
bool
watch
ignoreValue
使用key的当前值更新当前key,用于刷新版本
bool
put
ignoreLease
使用key的当前lease更新当前key,即不用指定lease
bool
put
progressNotify
在客户端没有事件发送时也定时发送心跳
bool
进度更新
createdNotify
创建事件
bool
创健
filterPut
过滤掉put事件
bool
watch
filterDelete
过滤掉删除事件
bool
watch
val
[]byte
put
leaseID
租约
LeaseID
put
cmps
txn执行的条件
[]Cmp
txn
thenOps
cmps比较结果为true执行的操作集合
[]Op
txn
elseOps
cmps比较结果为false执行的操作集合
[]Op
txn
isOptsWithFromKey
范围操作开关
bool
txn
isOptsWithPrefix
前缀操作开关
bool
txn
 
3.KV 
 
接口
 
type KV interface {
    Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
    Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
    Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
    Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
    Do(ctx context.Context, op Op) (OpResponse, error)
    Txn(ctx context.Context) Txn
}
type KV interface接口包含以下函数
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
函数用来向数据库中添加键值对,支持字符串格式和字符数组格式。
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
用于查询数据库中指定key的value。
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
函数删除指定key。
可以使用WithRange(end)删除[key, end)中的所有key;
Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
用于将key现在rev的value和上一个rev的value进行比较。
Do(ctx context.Context, op Op) (OpResponse, error)
函数用于执行上述所有函数的操作。
do函数是延时操作,即函数执行并不会立即向数据库请求或写入数据。例如put操作:
func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
case tPut:
//...
        var resp *pb.PutResponse
        r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
        resp, err = kv.remote.Put(ctx, r, kv.callOpts...)
        if err == nil {
            return OpResponse{put: (*PutResponse)(resp)}, nil
        }
//...
 }
函数首先根据op构建request结构体,将对应的key、value、lease等信息写入结构体;然后定义对应操作的response结构体用于接受请求结果。然后调用kvclient的remote接口中的put方法,试图与服务器建立连接。
func (c *kVClient) Put(ctx context.Context, in *PutRequest, opts ...grpc.CallOption) (*PutResponse, error) {
    out := new(PutResponse)
    err := c.cc.Invoke(ctx, "/etcdserverpb.KV/Put", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}
到remote中的put函数,将request中的数据通过invoke函数调用服务器上注册的对应服务,继而将数据写入数据库。
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
    cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
    if err != nil {
        return err
    }
    if err := cs.SendMsg(req); err != nil {
        return err
    }
    return cs.RecvMsg(reply)
}
invoke函数建立客户端和服务器的流连接,发送request数据给服务器并接受来自服务器的response。
Txn(ctx context.Context)
Txn用于创建一个事务。
 
4.lease 
 
一个lease可以关联ETCD集群中的一个或多个key。当租约过期或者被撤销时,关联的key会被自动删除。
 
lease接口
 
type Lease interface接口包含以下函数:
Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
创建一个新的lease
Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
撤销一个存在的租约
TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
检索给定id租约的相关信息
Leases(ctx context.Context) (*LeaseLeasesResponse, error)
返回当前存在的所有租约
KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
使给定id的租约无期限,永久有效如果信道拥堵,该函数的响应有可能会被丢弃,但KeepAlive请求会被服务器接受如果被KeepAlive的数据因某些情况中止,函数会返回ErrKeepAliveHalted并包含错误原因类似于多次调用KeepAliveOnce,每次函数执行都会返回一个response
KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
用于续订一次租约,相当于keepAlive创建的信道返回一次消息。
Close() error
函数释放所有与etcd服务器相连的有关lease的资源。
 
lessor结构体
type lessor struct {
    mu sync.Mutex 
​
    donec   chan struct{}
    loopErr error
​
    remote pb.LeaseClient
​
    stream       pb.Lease_LeaseKeepAliveClient
    streamCancel context.CancelFunc
​
    stopCtx    context.Context
    stopCancel context.CancelFunc
​
    keepAlives map[LeaseID]*keepAlive
​
    firstKeepAliveTimeout time.Duration
​
    firstKeepAliveOnce sync.Once
​
    callOpts []grpc.CallOption
​
    lg *zap.Logger
}
lessor结构体用于保存所有lease相关的信息,每个连接服务器的client对应一个lessor管理租约相关信息。
 
字段
解释
类型
mu
同步锁
sync.Mutex
donec
recvKeepAliveLoop停止标志
chan struct{}
loopErr
recvKeepAliveLoop停止错误原因
error
remote
rpc中与服务器通信的接口
pb.LeaseClient
stream
keepalivelease与服务器通信接口
pb.Lease_LeaseKeepAliveClient
streamCancel
keepalive取消时context
context.CancelFunc
stopCtx
 
context.Context
stopCancel
 
context.CancelFunc
keepAlives
b保存活跃的keepalive连接
map[LeaseID]*keepAlive
firstKeepAliveTimeout
 
time.Duration
firstKeepAliveOnce
 
sync.Once
callOpts
通信配置
[]grpc.CallOption
lg
日志
*zap.Logger
 
keepAlive结构体
 
type keepAlive struct {
    chs  []chan<- *LeaseKeepAliveResponse
    ctxs []context.Context
    deadline time.Time
    nextKeepAlive time.Time
    donec chan struct{}
}
字段
解释
类型
chs
keepAlive的租约的保活信道
[]chan<- *LeaseKeepAliveResponse
ctxs
用来保存活跃租约的ctx
[]context.Context
deadline
租约保活信道关闭所需的无响应时间
time.Time
nextKeepAlive
两条保活消息之间的间隔
time.Time
donec
租约撤销时的通知信道
chan struct{}
 
函数
 
NewLeaseNewLeaseFromLeaseClient为需要lease服务的client创建一个lessor,lessor用于调用lease接口定义的函数,并存储该client中有关租约的所有信息。该函数由client调用。
Grant函数由lessor实现,向etcd服务器发送注册lease请求并创建responce结构体接收响应,如果服务器端未出错,则根据返回的resp创建leaseGrantResponce结构体返回给client。
Revoke函数由lessor实现,向etcd服务器发送撤销lease请求并接收响应,如果未出错,则将resp转为leaseRevokeResponse结构体返回client。
TimeToLive函数由lessor实现,用来查看lease存续的相关信息,向etcd服务器发送请求并接受响应,随后根据接收到的响应信息构建包括leaseID,lease创建\续约时间以及服务器设定的租约存续时间等信息,并返回给client。
leases函数是由lessor实现,用来查看当前存续的所有lease信息,根据服务器返回的信息构建LeaseStatus结构体用于存储所有lease的信息。
KeepAlive是一个多路复用的租约保活机制,可以同时将服务器返回的LeaseKeepAliveResponse发送到多个channel中。这个信道在创建租约时创建,读取行为由客户端定义。该函数具体方法解释如下。
1. 创建一个带缓存的*LeaseKeepAliveResponse channel,用于接收服务端的response。 2. 获取 lessor 的锁,通过select监听donec,如果这个channel被关闭,说明keepALive任务已停止,返回一个 ErrKeepAliveHalted 3. 判断当前的keepAlives map中是否有 `ID`的映射,如果没有,就创建一个新的`keepAlive` 对象,将ch和ctx传进去,并创建映射;如果有,就将 ch 和 ctx 直接加入到该 keepAlive`对象的 chs 和 ctxs slice中 4. 释放 lessor 的锁,并启动一个 keepAliveCtxCloser 协程,用于在 ctx 取消或租约过期、撤销时,关闭 ch 和清除 keepAlives map 中对应的 keepAlive 对象。 5. 如果这个方法是首次调用,会启动recvKeepAliveLoopdeadlineLoop协程,分别用于处理KeepALive响应;监测租约是否过期并执行清理操作 ​ 输入为ctx和租约id,返回值为返回LeaseKeepAliveResponse租约保活响应的信道;如果该信道被关闭,则返回对应的一个error表示租约已经结束。
keepAliveOnce函数用于执行一次续约操作,创建与服务器进行keepAlive通信的rpc服务,发送keepAliveRequest并接受回应,只需约一次。返回值区别于keepAliveLeaseKeepAliveResponse
recvKeepAliveLoop函数在keepAlive函数第一次启动时执行,用于循环接收来自服务器的keepAliveResponse,这也是keepAlivekeepAliveOnce的区别之处。
recvKeepAlive用于处理单次的keepAliveResponse,包括对租约进行更新等操作。
deadlibeLoop用于清理长时间未收到keepAliveResonse的租约对应的chan,每秒检查一次所有租约的deadline时间是否超时,超时则关闭。
sendKeepAliveLoop用于在给定的流的整个生命周期中持续发送keepAliveRequest
close用于关闭lessor提供的租约服务,通过关闭donec使得lessor执行的其他协程检测到关闭命令并结束运行。
 
5.txn 
 
该模块负责进行要求强一致性的数据读写操作,txn结构体实现了四个方法:
方法
用途
If
将比较条件加入txn结构体中
Then
将比较条件为true时要执行的操作加入txn结构体
Else
将比较条件为false时要执行的操作加入txn结构体
Commit
将组织好的txn结构体包装成TxnRequest结构体发送给服务器并返回服务器的响应
If、Else、Then三个函数并非执行函数,而是数据组织函数,因此这三个函数可以分段执行。
 
6.watch 
 
watch监视机制可以使得用户针对存储在etcd中特定范围的数据变化进行实时检视,在watch过程中,当对应数据发生变化时,etcd会根据watch记录追溯到用户,对变更事件进行同步。常见的使用场景有分布式锁和配置中心。
 
watcher结构体
watcher是客户端的监听回调模块,内置了用于与grpc服务端建立长连接的remote;并通过一个map类型字段streams记录了多组通过ctxKey映射和服务端间通信的长连接代理对象watchGrpcStream。
字段
类型
解释
remote
pb.WatchClient
与服务端建立长连接
callOpts
[]grpc.CallOption
监视条件
mu
sync.Mutex
同步锁
streams
map[string]*watchGrpcStream
长连接代理对象的映射map
lg
*zap.Logger
日志
 
watchGrpcStream结构体
 
watchGrpcStream是对etcd客户端和etcd服务端之间grpc长连接的抽象,同时也用以处理应用放创建/删除watch请求以及处理服务端watch回调事件的中枢模块。
字段
类型
解释
owner
*watcher
所有者
remote
pb.WatchClient
与服务端建立长连接
ctxKey
string
表示该watchGrpcStream的密钥
substreams
map[int64]*watcherStream
建立watchID与对应子处理流watcherStream之间的映射
resuming
[]*watcherStream
保留使用该流的所有watch
reqc
watchStreamRequest
面向客户端的信道
respc
*pb.WatchResponse
面向服务端的信道
 
watcherStream结构体
 
watcherStream 是对某个特定的watch的处理流的抽象。
字段
类型
解释
initReq
watchRequest
创建watch时的传递请求参数
outc
chan WatchResponse
将watch回调事件推到上层endpointManager中时使用的chan
recvc
chan *WatchResponse
用于接收来自watchGrpcStream分配的watch回调事件的chan
buf
[]*WatchResponse
用于缓存 watch 回调事件的缓冲区
 
watch创建链路过程分析(客户端)
 
watcher.Watch支持的context
类别
用途
context.WithBackground() / context.WithCancel()
最常用的ctx,设置此ctx,watch会保证底层连接创建成功,也就是会不断重试,直到连接成功或者ETCD服务端返回严重错误,或者调用ctx的cancel()方法,也可以强行结束watch
context.WithTimeout()
这个就是Timeout时间后,自动帮你调用cancel(),也就是timeout时间后,不管你的Watch流是正常还是异常,都会强行结束,一般不能用这种context
clientv3.WithRequireLeader(ctx)
在ctx中加入 RequireLeader 标志,这个标志处理的问题是:若ETCD服务集群发生网络分区,且你的客户端Watch流正好连接少数一方的节点,那Watch就卡住了,不会有响应,白白浪费资源。 加入这个标志,Watch在发现自己连接的节点是少数一方节点时,尝试重新连接到其它的节点,这样就自动的处理了分区带来的不可用问题
应用方通过etcd客户端向服务端发出创建watch请求:
 
 
endpointManager.NewWatchChannel方法是watch的入口,实现对endpointManager指定target范围的数据进行监听。
在该方法中,
  • 调用Client.Get方法,获取target数据范围的历史变更记录,允许调用方获得建立watch监听之前的历史变更记录。
  • 创建一个用于向调用方传递watch回调事件的upch(最上层信道),并将该信道返回给调用方
  • 异步启动endpointManager.watch方法,持续监听更底层提供的watch chan,监听到回调事件后将其推送给upch通知调用方。
endpointManager.watch该方法是endpointManager为一个watch启动的监听协程。
  • 通过m.client.Watch方法,获取到watch对应的watch channel
  • 通过基于for+select模型持续监听watch channel,接收来自底层的watch回调事件后,会把事件推送到upch中供上层接收。
watcher.Watch该方法为监听请求创建必要的资源,向服务器提交创建watcher的请求并等待返回watch channel返回给上层用户。每次调用watch都会创建一个watchRequest,经过层层流转发送到etcd服务端,并在服务端与客户端之间建立一条gRpcstream,多个watch共用该stream。
  • 如果长连接代理对象
watcherGrpcStream未初始化,调用watcher.newWatcherGrpcStream方法完成其初始化。watcherGrpcStream本身是有生命周期的,在watcher.newWatcherGrpcStream方法中,会异步开启一个协程负责watcherGrpcStream的运行。(创建或使用缓存的watchGrpcStream
  • 将本次创建/删除watch的请求送入
watcherGrpcStream的reqc中,watcherGrpcStream运行协程处理后发送给服务器(将WatchRequest发送到watchGrpcStream)。
  • 当创建watch请求处理完成后,服务器返回的
watch channel会通过该watch对象对应的watcherSubStream中的watchRequest.retc中传出。在此之前,watcher.Watch会持续监听watchRequest.retc直到返回了watch channelwatchGrpcStream会为WatchRequest生成chan WatchResponse用于接收结果并通过chan chan WatchResponse返回给用户)。
watchGrpcStream.run该方法用于创建管理watcher client的协程。
watchGrpcStream的运行协程中:
  • 通过watchGrpcStream.newWatchClient方法,创建与etcd服务端之间的通信长连接
  • 通过for+select模型,持续轮询处理来自上层的请求和来自下层的响应。
  • 持续从reqc中接收来自上层的请求,没遇到一个创建watch的请求,会调用watchGrpcStream.serveSubstream方法,异步启动一个服务于这个watch的subStream
  • 持续从respc中接收来自etcd服务端的响应事件,并调用watch.dispatchEvent方法,将事件分发给对应的watch的watch subStream
wartchGrpcStream.newWatchClient方法主要完成两项任务:
  • 同步调用watchGrpcStream.openWatchClient->watchClient.Watch方法链,建立和etcd服务端之间的通信长连接。
  • 异步启动watchGrpcStream.serveWatchClient方法,创建出一个接收协程,持续轮询grpc长连接,接收处理etcd服务端返回的响应.
当etcd服务端完成watch的处理后,对应的响应便会在接收协程中通过pb.Watch_WatchClient.Recv方法接收到,并将其投递到watchGrpcStream的respc中,供watchGrpcStream的运行协程接收处理。
watchGrpcStream.serveSubstream
当某个watch对应的subStream通过recvc接收到create类型的watchResponse,标志着服务端已经完成了对watch创建请求的处理,subStream会把watchGrpcStream方法中提前创建好的outc分发到subStream.initReq.retc中,这是endpointManager所需要的watch channel
watch回调链路过程分析(客户端)
当应用方设置的watch监听到数据的变化时,etcd服务端会通过创建过程中建立的grpc长连接将watch回调事件发送到etcd客户端。etcd客户端在接收到watch回调事件后,会一步步自底向上将该事件交付到对应的watch应用方手中。
watch回调事件的起点为etcd客户端接收协程serveWatchClient开始,以endpointManager通过对应的watch subStreamoutc(watch channel)接收到watch回调事件,并将其投递到upch中供应用放消费为终点。
watchGrpcStream.serveWatchClient
在常驻的接收协程watchGrpcStream.serveWatchClient的轮询过程中,如果通过grpc长连接接收到来自etcd客户端的watch回调事件,则会将其分发到watchGrpcStreamrespc中,供运行协程watchGrpcStream.run进行消费处理。
watchGrpcStream.run
watchGrpcStream的运行协程通过从resp接收到来自etcd服务端的回调事件后,会根据事件所属的watch维度对事件进行聚合。
case pbresp := <-w.respc:
            if cur == nil || pbresp.Created || pbresp.Canceled {
                cur = pbresp
            } else if cur != nil && cur.WatchId == pbresp.WatchId {
                // merge new events
                cur.Events = append(cur.Events, pbresp.Events...)
                // update "Fragment" field; last response with "Fragment" == false
                cur.Fragment = pbresp.Fragment
如果cur==pbresp,即新收到的响应与上一次收到的响应所属同一个watch,则事件合并。
然后提供过watchGrpcStrem.dispatchEvent->watchFrpcStream.unicastReponse的方法链路将事件分发到对应的watch subStream的recvc当中。
其中,watchGrpcStrem.dispatchEvent方法用来将WatchResponse中的事件抽离出来并将pb.WatchResponse格式的响应转化成clientV3.WatchResponse格式的响应。并根据watchID分辨该响应是属于进度提醒还是watch响应,继而选择要调用的方法。watchGrpcStrem.unicastResponse方法用于检查响应对应的watchsubStream是否存在,如果存在,则将响应推送到该subStreamrecvc中,否则返回失败。
watchGrpcStream.serveSubstream
watch subStream的运行协程中,会通过recvc接收到watch回调事件,然后依据watch维度进行聚合,将事件追加到缓冲区watcherStream.buf之中,之后每轮循环都会将buf中的事件推送到outc(watch channel)当中,供上层endpointManager接收处理。
endpointManager.watch方法的监听协程在接收到来自watch channelwatch回调事件后,会将其分发到upch中,供应用方接收处理。
 
0条评论
0 / 1000
z****n
2文章数
0粉丝数
z****n
2 文章 | 0 粉丝
z****n
2文章数
0粉丝数
z****n
2 文章 | 0 粉丝
原创

etcd client接口代码分析

2023-08-30 03:25:20
48
0
1.cluster 
 
cluster结构体
 
type cluster struct { remote pb.ClusterClient callOpts []grpc.CallOption } ​
remotecluster的api集合,包含成员增删改查的功能调用接口:
type ClusterClient interface {
    
    MemberAdd(ctx context.Context, in *MemberAddRequest, opts ...grpc.CallOption) (*MemberAddResponse, error)
    
    MemberRemove(ctx context.Context, in *MemberRemoveRequest, opts ...grpc.CallOption) (*MemberRemoveResponse, error)
    
    MemberUpdate(ctx context.Context, in *MemberUpdateRequest, opts ...grpc.CallOption) (*MemberUpdateResponse, error)
​
    MemberList(ctx context.Context, in *MemberListRequest, opts ...grpc.CallOption) (*MemberListResponse, error)
    
    MemberPromote(ctx context.Context, in *MemberPromoteRequest, opts ...grpc.CallOption) (*MemberPromoteResponse, error)
}
该成员为一个接口,etcd提供两种实现方式。callOpts表示连接设置,该接口具有两个方法,分别为beforeafter
该成员为一个接口,etcd提供两种实现方式。callOpts表示连接设置,该接口具有两个方法,分别为before和after
type CallOption interface {
    before(*callInfo) error//在发送数据之前需要执行的检查
    after(*callInfo, *csAttempt)//在发送数据之后要执行的操作
}
根据数据类型的不同重写CallOption接口可以对不同数据执行不同操作。
 
接口
 
type Cluster interface包含以下函数:
MemberList(ctx context.Context) (*MemberListResponse, error)
返回当前cluster的成员列表
MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error)
cluster中添加一个新成员
MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error)
cluster中添加一个无投票权的成员
MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error)
删除一个成员
MemberPromote(ctx context.Context, id uint64) (*MemberPromoteResponse, error)
将无投票权成员晋升为可投票节点
这些函数调用type ClusterClient interface接口实现的方法,直接调用底层gRPC的方法向服务器传输指令。MemberAddMemberAddAsLearner稍有不同,这两个函数共用一个接口,通过一个bool变量控制是否有投票权,然后调用gRPC方法。
 
函数
func NewCluster(c *Client) Cluster //调用RetryClusterClient为原本不是集群中的节点新建一个集群 //根据c中的callOps配置连接设置
func NewClusterFromClusterClient(remote pb.ClusterClient, c *Client) Cluster //将节点c添加进现有集群中,现有集群对应api为remote
 
2.op 
 
type opType intop使用int作为类型,支持四种类型
const (
    //defalut op的optype值为0,表示无效
    tRange opType = iota + 1
    tPut
    tDeleteRange
    tTxn
)
 
op结构体
 
type Op struct {
    t   opType
    key []byte
    end []byte
​
    // 用于查询
    limit        int64  
    sort         *SortOption
    serializable bool
    keysOnly     bool
    countOnly    bool
    minModRev    int64
    maxModRev    int64
    minCreateRev int64
    maxCreateRev int64
​
    // 用于查询和watch
    rev int64
​
    // 用于查询、watch和删除
    prevKV bool
​
    // 用于 watch
    //通常禁用,如果不禁用,则当watch请求超过下值时对watch请求进行拆分
    // "--max-request-bytes" flag value + 512-byte
    fragment bool
​
    // 用于put
    ignoreValue bool
    ignoreLease bool
​
    // 用于进度更新
    progressNotify bool
    // 用于创建事件
    createdNotify bool
    // 用于watch的筛选器
    filterPut    bool
    filterDelete bool
​
    // 用于put
    val     []byte
    leaseID LeaseID
​
    // 用于事务三元组if then else
    cmps    []Cmp
    thenOps []Op
    elseOps []Op
​
    isOptsWithFromKey bool
    isOptsWithPrefix  bool
}
 
字段
解释
类型
用途
limit
y用于限制get请求返回结果的数量,如果值为0表示无限制。
int64
range
sort
用于选取get请求返回结果的排列方式
*SortOption
range
serializable
串行化读(默认为线性读liner)
bool
range
keysOnly
控制get请求只返回key而不返回值
bool
range
countOnly
控制get请求只返回key的计数而不返回值和key
bool
range
minModRev
用于过滤修改版本号小于给定值的key
int64
range
maxModRev
用于过滤修改版本号大于给定值的key
int64
range
minCreateRev
用于过滤创建版本号小于给定值的key
int64
range
maxCreateRev
用于过滤创建版本号大于给定值的key
int64
range
rev
用于get请求特定版本号的key或从给定的版本号开始watch
int64
watch range
prevKV
用于返回上一个值,如果上一个值已经持久化则不返回
bool
range watch delete
fragment
通常禁用,如果不禁用,则当watch请求超过下值时对watch请求进行拆分"--max-request-bytes" flag value + 512-byte
bool
watch
ignoreValue
使用key的当前值更新当前key,用于刷新版本
bool
put
ignoreLease
使用key的当前lease更新当前key,即不用指定lease
bool
put
progressNotify
在客户端没有事件发送时也定时发送心跳
bool
进度更新
createdNotify
创建事件
bool
创健
filterPut
过滤掉put事件
bool
watch
filterDelete
过滤掉删除事件
bool
watch
val
[]byte
put
leaseID
租约
LeaseID
put
cmps
txn执行的条件
[]Cmp
txn
thenOps
cmps比较结果为true执行的操作集合
[]Op
txn
elseOps
cmps比较结果为false执行的操作集合
[]Op
txn
isOptsWithFromKey
范围操作开关
bool
txn
isOptsWithPrefix
前缀操作开关
bool
txn
 
3.KV 
 
接口
 
type KV interface {
    Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
    Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
    Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
    Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
    Do(ctx context.Context, op Op) (OpResponse, error)
    Txn(ctx context.Context) Txn
}
type KV interface接口包含以下函数
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
函数用来向数据库中添加键值对,支持字符串格式和字符数组格式。
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
用于查询数据库中指定key的value。
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
函数删除指定key。
可以使用WithRange(end)删除[key, end)中的所有key;
Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
用于将key现在rev的value和上一个rev的value进行比较。
Do(ctx context.Context, op Op) (OpResponse, error)
函数用于执行上述所有函数的操作。
do函数是延时操作,即函数执行并不会立即向数据库请求或写入数据。例如put操作:
func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
case tPut:
//...
        var resp *pb.PutResponse
        r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
        resp, err = kv.remote.Put(ctx, r, kv.callOpts...)
        if err == nil {
            return OpResponse{put: (*PutResponse)(resp)}, nil
        }
//...
 }
函数首先根据op构建request结构体,将对应的key、value、lease等信息写入结构体;然后定义对应操作的response结构体用于接受请求结果。然后调用kvclient的remote接口中的put方法,试图与服务器建立连接。
func (c *kVClient) Put(ctx context.Context, in *PutRequest, opts ...grpc.CallOption) (*PutResponse, error) {
    out := new(PutResponse)
    err := c.cc.Invoke(ctx, "/etcdserverpb.KV/Put", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}
到remote中的put函数,将request中的数据通过invoke函数调用服务器上注册的对应服务,继而将数据写入数据库。
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
    cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
    if err != nil {
        return err
    }
    if err := cs.SendMsg(req); err != nil {
        return err
    }
    return cs.RecvMsg(reply)
}
invoke函数建立客户端和服务器的流连接,发送request数据给服务器并接受来自服务器的response。
Txn(ctx context.Context)
Txn用于创建一个事务。
 
4.lease 
 
一个lease可以关联ETCD集群中的一个或多个key。当租约过期或者被撤销时,关联的key会被自动删除。
 
lease接口
 
type Lease interface接口包含以下函数:
Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
创建一个新的lease
Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
撤销一个存在的租约
TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
检索给定id租约的相关信息
Leases(ctx context.Context) (*LeaseLeasesResponse, error)
返回当前存在的所有租约
KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
使给定id的租约无期限,永久有效如果信道拥堵,该函数的响应有可能会被丢弃,但KeepAlive请求会被服务器接受如果被KeepAlive的数据因某些情况中止,函数会返回ErrKeepAliveHalted并包含错误原因类似于多次调用KeepAliveOnce,每次函数执行都会返回一个response
KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
用于续订一次租约,相当于keepAlive创建的信道返回一次消息。
Close() error
函数释放所有与etcd服务器相连的有关lease的资源。
 
lessor结构体
type lessor struct {
    mu sync.Mutex 
​
    donec   chan struct{}
    loopErr error
​
    remote pb.LeaseClient
​
    stream       pb.Lease_LeaseKeepAliveClient
    streamCancel context.CancelFunc
​
    stopCtx    context.Context
    stopCancel context.CancelFunc
​
    keepAlives map[LeaseID]*keepAlive
​
    firstKeepAliveTimeout time.Duration
​
    firstKeepAliveOnce sync.Once
​
    callOpts []grpc.CallOption
​
    lg *zap.Logger
}
lessor结构体用于保存所有lease相关的信息,每个连接服务器的client对应一个lessor管理租约相关信息。
 
字段
解释
类型
mu
同步锁
sync.Mutex
donec
recvKeepAliveLoop停止标志
chan struct{}
loopErr
recvKeepAliveLoop停止错误原因
error
remote
rpc中与服务器通信的接口
pb.LeaseClient
stream
keepalivelease与服务器通信接口
pb.Lease_LeaseKeepAliveClient
streamCancel
keepalive取消时context
context.CancelFunc
stopCtx
 
context.Context
stopCancel
 
context.CancelFunc
keepAlives
b保存活跃的keepalive连接
map[LeaseID]*keepAlive
firstKeepAliveTimeout
 
time.Duration
firstKeepAliveOnce
 
sync.Once
callOpts
通信配置
[]grpc.CallOption
lg
日志
*zap.Logger
 
keepAlive结构体
 
type keepAlive struct {
    chs  []chan<- *LeaseKeepAliveResponse
    ctxs []context.Context
    deadline time.Time
    nextKeepAlive time.Time
    donec chan struct{}
}
字段
解释
类型
chs
keepAlive的租约的保活信道
[]chan<- *LeaseKeepAliveResponse
ctxs
用来保存活跃租约的ctx
[]context.Context
deadline
租约保活信道关闭所需的无响应时间
time.Time
nextKeepAlive
两条保活消息之间的间隔
time.Time
donec
租约撤销时的通知信道
chan struct{}
 
函数
 
NewLeaseNewLeaseFromLeaseClient为需要lease服务的client创建一个lessor,lessor用于调用lease接口定义的函数,并存储该client中有关租约的所有信息。该函数由client调用。
Grant函数由lessor实现,向etcd服务器发送注册lease请求并创建responce结构体接收响应,如果服务器端未出错,则根据返回的resp创建leaseGrantResponce结构体返回给client。
Revoke函数由lessor实现,向etcd服务器发送撤销lease请求并接收响应,如果未出错,则将resp转为leaseRevokeResponse结构体返回client。
TimeToLive函数由lessor实现,用来查看lease存续的相关信息,向etcd服务器发送请求并接受响应,随后根据接收到的响应信息构建包括leaseID,lease创建\续约时间以及服务器设定的租约存续时间等信息,并返回给client。
leases函数是由lessor实现,用来查看当前存续的所有lease信息,根据服务器返回的信息构建LeaseStatus结构体用于存储所有lease的信息。
KeepAlive是一个多路复用的租约保活机制,可以同时将服务器返回的LeaseKeepAliveResponse发送到多个channel中。这个信道在创建租约时创建,读取行为由客户端定义。该函数具体方法解释如下。
1. 创建一个带缓存的*LeaseKeepAliveResponse channel,用于接收服务端的response。 2. 获取 lessor 的锁,通过select监听donec,如果这个channel被关闭,说明keepALive任务已停止,返回一个 ErrKeepAliveHalted 3. 判断当前的keepAlives map中是否有 `ID`的映射,如果没有,就创建一个新的`keepAlive` 对象,将ch和ctx传进去,并创建映射;如果有,就将 ch 和 ctx 直接加入到该 keepAlive`对象的 chs 和 ctxs slice中 4. 释放 lessor 的锁,并启动一个 keepAliveCtxCloser 协程,用于在 ctx 取消或租约过期、撤销时,关闭 ch 和清除 keepAlives map 中对应的 keepAlive 对象。 5. 如果这个方法是首次调用,会启动recvKeepAliveLoopdeadlineLoop协程,分别用于处理KeepALive响应;监测租约是否过期并执行清理操作 ​ 输入为ctx和租约id,返回值为返回LeaseKeepAliveResponse租约保活响应的信道;如果该信道被关闭,则返回对应的一个error表示租约已经结束。
keepAliveOnce函数用于执行一次续约操作,创建与服务器进行keepAlive通信的rpc服务,发送keepAliveRequest并接受回应,只需约一次。返回值区别于keepAliveLeaseKeepAliveResponse
recvKeepAliveLoop函数在keepAlive函数第一次启动时执行,用于循环接收来自服务器的keepAliveResponse,这也是keepAlivekeepAliveOnce的区别之处。
recvKeepAlive用于处理单次的keepAliveResponse,包括对租约进行更新等操作。
deadlibeLoop用于清理长时间未收到keepAliveResonse的租约对应的chan,每秒检查一次所有租约的deadline时间是否超时,超时则关闭。
sendKeepAliveLoop用于在给定的流的整个生命周期中持续发送keepAliveRequest
close用于关闭lessor提供的租约服务,通过关闭donec使得lessor执行的其他协程检测到关闭命令并结束运行。
 
5.txn 
 
该模块负责进行要求强一致性的数据读写操作,txn结构体实现了四个方法:
方法
用途
If
将比较条件加入txn结构体中
Then
将比较条件为true时要执行的操作加入txn结构体
Else
将比较条件为false时要执行的操作加入txn结构体
Commit
将组织好的txn结构体包装成TxnRequest结构体发送给服务器并返回服务器的响应
If、Else、Then三个函数并非执行函数,而是数据组织函数,因此这三个函数可以分段执行。
 
6.watch 
 
watch监视机制可以使得用户针对存储在etcd中特定范围的数据变化进行实时检视,在watch过程中,当对应数据发生变化时,etcd会根据watch记录追溯到用户,对变更事件进行同步。常见的使用场景有分布式锁和配置中心。
 
watcher结构体
watcher是客户端的监听回调模块,内置了用于与grpc服务端建立长连接的remote;并通过一个map类型字段streams记录了多组通过ctxKey映射和服务端间通信的长连接代理对象watchGrpcStream。
字段
类型
解释
remote
pb.WatchClient
与服务端建立长连接
callOpts
[]grpc.CallOption
监视条件
mu
sync.Mutex
同步锁
streams
map[string]*watchGrpcStream
长连接代理对象的映射map
lg
*zap.Logger
日志
 
watchGrpcStream结构体
 
watchGrpcStream是对etcd客户端和etcd服务端之间grpc长连接的抽象,同时也用以处理应用放创建/删除watch请求以及处理服务端watch回调事件的中枢模块。
字段
类型
解释
owner
*watcher
所有者
remote
pb.WatchClient
与服务端建立长连接
ctxKey
string
表示该watchGrpcStream的密钥
substreams
map[int64]*watcherStream
建立watchID与对应子处理流watcherStream之间的映射
resuming
[]*watcherStream
保留使用该流的所有watch
reqc
watchStreamRequest
面向客户端的信道
respc
*pb.WatchResponse
面向服务端的信道
 
watcherStream结构体
 
watcherStream 是对某个特定的watch的处理流的抽象。
字段
类型
解释
initReq
watchRequest
创建watch时的传递请求参数
outc
chan WatchResponse
将watch回调事件推到上层endpointManager中时使用的chan
recvc
chan *WatchResponse
用于接收来自watchGrpcStream分配的watch回调事件的chan
buf
[]*WatchResponse
用于缓存 watch 回调事件的缓冲区
 
watch创建链路过程分析(客户端)
 
watcher.Watch支持的context
类别
用途
context.WithBackground() / context.WithCancel()
最常用的ctx,设置此ctx,watch会保证底层连接创建成功,也就是会不断重试,直到连接成功或者ETCD服务端返回严重错误,或者调用ctx的cancel()方法,也可以强行结束watch
context.WithTimeout()
这个就是Timeout时间后,自动帮你调用cancel(),也就是timeout时间后,不管你的Watch流是正常还是异常,都会强行结束,一般不能用这种context
clientv3.WithRequireLeader(ctx)
在ctx中加入 RequireLeader 标志,这个标志处理的问题是:若ETCD服务集群发生网络分区,且你的客户端Watch流正好连接少数一方的节点,那Watch就卡住了,不会有响应,白白浪费资源。 加入这个标志,Watch在发现自己连接的节点是少数一方节点时,尝试重新连接到其它的节点,这样就自动的处理了分区带来的不可用问题
应用方通过etcd客户端向服务端发出创建watch请求:
 
 
endpointManager.NewWatchChannel方法是watch的入口,实现对endpointManager指定target范围的数据进行监听。
在该方法中,
  • 调用Client.Get方法,获取target数据范围的历史变更记录,允许调用方获得建立watch监听之前的历史变更记录。
  • 创建一个用于向调用方传递watch回调事件的upch(最上层信道),并将该信道返回给调用方
  • 异步启动endpointManager.watch方法,持续监听更底层提供的watch chan,监听到回调事件后将其推送给upch通知调用方。
endpointManager.watch该方法是endpointManager为一个watch启动的监听协程。
  • 通过m.client.Watch方法,获取到watch对应的watch channel
  • 通过基于for+select模型持续监听watch channel,接收来自底层的watch回调事件后,会把事件推送到upch中供上层接收。
watcher.Watch该方法为监听请求创建必要的资源,向服务器提交创建watcher的请求并等待返回watch channel返回给上层用户。每次调用watch都会创建一个watchRequest,经过层层流转发送到etcd服务端,并在服务端与客户端之间建立一条gRpcstream,多个watch共用该stream。
  • 如果长连接代理对象
watcherGrpcStream未初始化,调用watcher.newWatcherGrpcStream方法完成其初始化。watcherGrpcStream本身是有生命周期的,在watcher.newWatcherGrpcStream方法中,会异步开启一个协程负责watcherGrpcStream的运行。(创建或使用缓存的watchGrpcStream
  • 将本次创建/删除watch的请求送入
watcherGrpcStream的reqc中,watcherGrpcStream运行协程处理后发送给服务器(将WatchRequest发送到watchGrpcStream)。
  • 当创建watch请求处理完成后,服务器返回的
watch channel会通过该watch对象对应的watcherSubStream中的watchRequest.retc中传出。在此之前,watcher.Watch会持续监听watchRequest.retc直到返回了watch channelwatchGrpcStream会为WatchRequest生成chan WatchResponse用于接收结果并通过chan chan WatchResponse返回给用户)。
watchGrpcStream.run该方法用于创建管理watcher client的协程。
watchGrpcStream的运行协程中:
  • 通过watchGrpcStream.newWatchClient方法,创建与etcd服务端之间的通信长连接
  • 通过for+select模型,持续轮询处理来自上层的请求和来自下层的响应。
  • 持续从reqc中接收来自上层的请求,没遇到一个创建watch的请求,会调用watchGrpcStream.serveSubstream方法,异步启动一个服务于这个watch的subStream
  • 持续从respc中接收来自etcd服务端的响应事件,并调用watch.dispatchEvent方法,将事件分发给对应的watch的watch subStream
wartchGrpcStream.newWatchClient方法主要完成两项任务:
  • 同步调用watchGrpcStream.openWatchClient->watchClient.Watch方法链,建立和etcd服务端之间的通信长连接。
  • 异步启动watchGrpcStream.serveWatchClient方法,创建出一个接收协程,持续轮询grpc长连接,接收处理etcd服务端返回的响应.
当etcd服务端完成watch的处理后,对应的响应便会在接收协程中通过pb.Watch_WatchClient.Recv方法接收到,并将其投递到watchGrpcStream的respc中,供watchGrpcStream的运行协程接收处理。
watchGrpcStream.serveSubstream
当某个watch对应的subStream通过recvc接收到create类型的watchResponse,标志着服务端已经完成了对watch创建请求的处理,subStream会把watchGrpcStream方法中提前创建好的outc分发到subStream.initReq.retc中,这是endpointManager所需要的watch channel
watch回调链路过程分析(客户端)
当应用方设置的watch监听到数据的变化时,etcd服务端会通过创建过程中建立的grpc长连接将watch回调事件发送到etcd客户端。etcd客户端在接收到watch回调事件后,会一步步自底向上将该事件交付到对应的watch应用方手中。
watch回调事件的起点为etcd客户端接收协程serveWatchClient开始,以endpointManager通过对应的watch subStreamoutc(watch channel)接收到watch回调事件,并将其投递到upch中供应用放消费为终点。
watchGrpcStream.serveWatchClient
在常驻的接收协程watchGrpcStream.serveWatchClient的轮询过程中,如果通过grpc长连接接收到来自etcd客户端的watch回调事件,则会将其分发到watchGrpcStreamrespc中,供运行协程watchGrpcStream.run进行消费处理。
watchGrpcStream.run
watchGrpcStream的运行协程通过从resp接收到来自etcd服务端的回调事件后,会根据事件所属的watch维度对事件进行聚合。
case pbresp := <-w.respc:
            if cur == nil || pbresp.Created || pbresp.Canceled {
                cur = pbresp
            } else if cur != nil && cur.WatchId == pbresp.WatchId {
                // merge new events
                cur.Events = append(cur.Events, pbresp.Events...)
                // update "Fragment" field; last response with "Fragment" == false
                cur.Fragment = pbresp.Fragment
如果cur==pbresp,即新收到的响应与上一次收到的响应所属同一个watch,则事件合并。
然后提供过watchGrpcStrem.dispatchEvent->watchFrpcStream.unicastReponse的方法链路将事件分发到对应的watch subStream的recvc当中。
其中,watchGrpcStrem.dispatchEvent方法用来将WatchResponse中的事件抽离出来并将pb.WatchResponse格式的响应转化成clientV3.WatchResponse格式的响应。并根据watchID分辨该响应是属于进度提醒还是watch响应,继而选择要调用的方法。watchGrpcStrem.unicastResponse方法用于检查响应对应的watchsubStream是否存在,如果存在,则将响应推送到该subStreamrecvc中,否则返回失败。
watchGrpcStream.serveSubstream
watch subStream的运行协程中,会通过recvc接收到watch回调事件,然后依据watch维度进行聚合,将事件追加到缓冲区watcherStream.buf之中,之后每轮循环都会将buf中的事件推送到outc(watch channel)当中,供上层endpointManager接收处理。
endpointManager.watch方法的监听协程在接收到来自watch channelwatch回调事件后,会将其分发到upch中,供应用方接收处理。
 
文章来自个人专栏
文章 | 订阅
0条评论
0 / 1000
请输入你的评论
1
1