1.cluster
cluster结构体
type cluster struct { remote pb.ClusterClient callOpts []grpc.CallOption }
remote
为cluster
的api集合,包含成员增删改查的功能调用接口:type ClusterClient interface {
MemberAdd(ctx context.Context, in *MemberAddRequest, opts ...grpc.CallOption) (*MemberAddResponse, error)
MemberRemove(ctx context.Context, in *MemberRemoveRequest, opts ...grpc.CallOption) (*MemberRemoveResponse, error)
MemberUpdate(ctx context.Context, in *MemberUpdateRequest, opts ...grpc.CallOption) (*MemberUpdateResponse, error)
MemberList(ctx context.Context, in *MemberListRequest, opts ...grpc.CallOption) (*MemberListResponse, error)
MemberPromote(ctx context.Context, in *MemberPromoteRequest, opts ...grpc.CallOption) (*MemberPromoteResponse, error)
}
该成员为一个接口,etcd提供两种实现方式。
callOpts
表示连接设置,该接口具有两个方法,分别为before
和after
该成员为一个接口,etcd提供两种实现方式。callOpts表示连接设置,该接口具有两个方法,分别为before和after
type CallOption interface {
before(*callInfo) error//在发送数据之前需要执行的检查
after(*callInfo, *csAttempt)//在发送数据之后要执行的操作
}
根据数据类型的不同重写
CallOption
接口可以对不同数据执行不同操作。接口
type Cluster interface
包含以下函数:MemberList(ctx context.Context) (*MemberListResponse, error)
返回当前
cluster
的成员列表MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error)
向
cluster
中添加一个新成员MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error)
向
cluster
中添加一个无投票权的成员MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error)
删除一个成员
MemberPromote(ctx context.Context, id uint64) (*MemberPromoteResponse, error)
将无投票权成员晋升为可投票节点
这些函数调用
type ClusterClient interface
接口实现的方法,直接调用底层gRPC的方法向服务器传输指令。MemberAdd
和MemberAddAsLearne
r稍有不同,这两个函数共用一个接口,通过一个bool变量控制是否有投票权,然后调用gRPC方法。函数
func NewCluster(c *Client) Cluster //调用RetryClusterClient为原本不是集群中的节点新建一个集群 //根据c中的callOps配置连接设置
func NewClusterFromClusterClient(remote pb.ClusterClient, c *Client) Cluster //将节点c添加进现有集群中,现有集群对应api为remote
2.op
type opType intop
使用int
作为类型,支持四种类型const (
//defalut op的optype值为0,表示无效
tRange opType = iota + 1
tPut
tDeleteRange
tTxn
)
op结构体
type Op struct {
t opType
key []byte
end []byte
// 用于查询
limit int64
sort *SortOption
serializable bool
keysOnly bool
countOnly bool
minModRev int64
maxModRev int64
minCreateRev int64
maxCreateRev int64
// 用于查询和watch
rev int64
// 用于查询、watch和删除
prevKV bool
// 用于 watch
//通常禁用,如果不禁用,则当watch请求超过下值时对watch请求进行拆分
// "--max-request-bytes" flag value + 512-byte
fragment bool
// 用于put
ignoreValue bool
ignoreLease bool
// 用于进度更新
progressNotify bool
// 用于创建事件
createdNotify bool
// 用于watch的筛选器
filterPut bool
filterDelete bool
// 用于put
val []byte
leaseID LeaseID
// 用于事务三元组if then else
cmps []Cmp
thenOps []Op
elseOps []Op
isOptsWithFromKey bool
isOptsWithPrefix bool
}
字段
|
解释
|
类型
|
用途
|
limit
|
y用于限制get请求返回结果的数量,如果值为0表示无限制。
|
int64
|
range
|
sort
|
用于选取get请求返回结果的排列方式
|
*SortOption
|
range
|
serializable
|
串行化读(默认为线性读liner)
|
bool
|
range
|
keysOnly
|
控制get请求只返回key而不返回值
|
bool
|
range
|
countOnly
|
控制get请求只返回key的计数而不返回值和key
|
bool
|
range
|
minModRev
|
用于过滤修改版本号小于给定值的key
|
int64
|
range
|
maxModRev
|
用于过滤修改版本号大于给定值的key
|
int64
|
range
|
minCreateRev
|
用于过滤创建版本号小于给定值的key
|
int64
|
range
|
maxCreateRev
|
用于过滤创建版本号大于给定值的key
|
int64
|
range
|
rev
|
用于get请求特定版本号的key或从给定的版本号开始watch
|
int64
|
watch range
|
prevKV
|
用于返回上一个值,如果上一个值已经持久化则不返回
|
bool
|
range watch delete
|
fragment
|
通常禁用,如果不禁用,则当watch请求超过下值时对watch请求进行拆分"--max-request-bytes" flag value + 512-byte
|
bool
|
watch
|
ignoreValue
|
使用key的当前值更新当前key,用于刷新版本
|
bool
|
put
|
ignoreLease
|
使用key的当前lease更新当前key,即不用指定lease
|
bool
|
put
|
progressNotify
|
在客户端没有事件发送时也定时发送心跳
|
bool
|
进度更新
|
createdNotify
|
创建事件
|
bool
|
创健
|
filterPut
|
过滤掉put事件
|
bool
|
watch
|
filterDelete
|
过滤掉删除事件
|
bool
|
watch
|
val
|
值
|
[]byte
|
put
|
leaseID
|
租约
|
LeaseID
|
put
|
cmps
|
txn执行的条件
|
[]Cmp
|
txn
|
thenOps
|
cmps比较结果为true执行的操作集合
|
[]Op
|
txn
|
elseOps
|
cmps比较结果为false执行的操作集合
|
[]Op
|
txn
|
isOptsWithFromKey
|
范围操作开关
|
bool
|
txn
|
isOptsWithPrefix
|
前缀操作开关
|
bool
|
txn
|
3.KV
接口
type KV interface {
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
Do(ctx context.Context, op Op) (OpResponse, error)
Txn(ctx context.Context) Txn
}
type KV interface
接口包含以下函数Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
函数用来向数据库中添加键值对,支持字符串格式和字符数组格式。
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
用于查询数据库中指定key的value。
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
函数删除指定key。
可以使用
WithRange(end)
删除[key, end)中的所有key;Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
用于将key现在rev的value和上一个rev的value进行比较。
Do(ctx context.Context, op Op) (OpResponse, error)
函数用于执行上述所有函数的操作。
do函数是延时操作,即函数执行并不会立即向数据库请求或写入数据。例如put操作:
func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
case tPut:
//...
var resp *pb.PutResponse
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
resp, err = kv.remote.Put(ctx, r, kv.callOpts...)
if err == nil {
return OpResponse{put: (*PutResponse)(resp)}, nil
}
//...
}
函数首先根据op构建request结构体,将对应的key、value、lease等信息写入结构体;然后定义对应操作的response结构体用于接受请求结果。然后调用kvclient的remote接口中的put方法,试图与服务器建立连接。
func (c *kVClient) Put(ctx context.Context, in *PutRequest, opts ...grpc.CallOption) (*PutResponse, error) {
out := new(PutResponse)
err := c.cc.Invoke(ctx, "/etcdserverpb.KV/Put", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
到remote中的put函数,将request中的数据通过invoke函数调用服务器上注册的对应服务,继而将数据写入数据库。
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
if err != nil {
return err
}
if err := cs.SendMsg(req); err != nil {
return err
}
return cs.RecvMsg(reply)
}
invoke函数建立客户端和服务器的流连接,发送request数据给服务器并接受来自服务器的response。
Txn(ctx context.Context)
Txn用于创建一个事务。
4.lease
一个lease可以关联ETCD集群中的一个或多个key。当租约过期或者被撤销时,关联的key会被自动删除。
lease接口
type Lease interface接口包含以下函数:
Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
创建一个新的lease
Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
撤销一个存在的租约
TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
检索给定id租约的相关信息
Leases(ctx context.Context) (*LeaseLeasesResponse, error)
返回当前存在的所有租约
KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
使给定id的租约无期限,永久有效如果信道拥堵,该函数的响应有可能会被丢弃,但KeepAlive请求会被服务器接受如果被KeepAlive的数据因某些情况中止,函数会返回ErrKeepAliveHalted并包含错误原因类似于多次调用KeepAliveOnce,每次函数执行都会返回一个response
KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
用于续订一次租约,相当于keepAlive创建的信道返回一次消息。
Close() error
函数释放所有与etcd服务器相连的有关lease的资源。
lessor结构体
type lessor struct {
mu sync.Mutex
donec chan struct{}
loopErr error
remote pb.LeaseClient
stream pb.Lease_LeaseKeepAliveClient
streamCancel context.CancelFunc
stopCtx context.Context
stopCancel context.CancelFunc
keepAlives map[LeaseID]*keepAlive
firstKeepAliveTimeout time.Duration
firstKeepAliveOnce sync.Once
callOpts []grpc.CallOption
lg *zap.Logger
}
lessor结构体用于保存所有lease相关的信息,每个连接服务器的client对应一个lessor管理租约相关信息。
字段
|
解释
|
类型
|
mu
|
同步锁
|
sync.Mutex
|
donec
|
recvKeepAliveLoop停止标志
|
chan struct{}
|
loopErr
|
recvKeepAliveLoop停止错误原因
|
error
|
remote
|
rpc中与服务器通信的接口
|
pb.LeaseClient
|
stream
|
keepalivelease与服务器通信接口
|
pb.Lease_LeaseKeepAliveClient
|
streamCancel
|
keepalive取消时context
|
context.CancelFunc
|
stopCtx
|
context.Context
|
|
stopCancel
|
context.CancelFunc
|
|
keepAlives
|
b保存活跃的keepalive连接
|
map[LeaseID]*keepAlive
|
firstKeepAliveTimeout
|
time.Duration
|
|
firstKeepAliveOnce
|
sync.Once
|
|
callOpts
|
通信配置
|
[]grpc.CallOption
|
lg
|
日志
|
*zap.Logger
|
keepAlive结构体
type keepAlive struct {
chs []chan<- *LeaseKeepAliveResponse
ctxs []context.Context
deadline time.Time
nextKeepAlive time.Time
donec chan struct{}
}
字段
|
解释
|
类型
|
chs
|
keepAlive的租约的保活信道
|
[]chan<- *LeaseKeepAliveResponse
|
ctxs
|
用来保存活跃租约的ctx
|
[]context.Context
|
deadline
|
租约保活信道关闭所需的无响应时间
|
time.Time
|
nextKeepAlive
|
两条保活消息之间的间隔
|
time.Time
|
donec
|
租约撤销时的通知信道
|
chan struct{}
|
函数
NewLease
和NewLeaseFromLeaseClient
为需要lease服务的client创建一个lessor,lessor用于调用lease接口定义的函数,并存储该client中有关租约的所有信息。该函数由client调用。Grant函数由lessor实现,向etcd服务器发送注册lease请求并创建responce结构体接收响应,如果服务器端未出错,则根据返回的resp创建
leaseGrantResponce
结构体返回给client。Revoke函数由lessor实现,向etcd服务器发送撤销lease请求并接收响应,如果未出错,则将resp转为
leaseRevokeResponse
结构体返回client。TimeToLive函数由lessor实现,用来查看lease存续的相关信息,向etcd服务器发送请求并接受响应,随后根据接收到的响应信息构建包括leaseID,lease创建\续约时间以及服务器设定的租约存续时间等信息,并返回给client。
leases函数是由lessor实现,用来查看当前存续的所有lease信息,根据服务器返回的信息构建
LeaseStatus
结构体用于存储所有lease的信息。KeepAlive是一个多路复用的租约保活机制,可以同时将服务器返回的
LeaseKeepAliveResponse
发送到多个channel中。这个信道在创建租约时创建,读取行为由客户端定义。该函数具体方法解释如下。1. 创建一个带缓存的
*LeaseKeepAliveResponse channel
,用于接收服务端的response。 2. 获取 lessor 的锁,通过select监听donec
,如果这个channel被关闭,说明keepALive
任务已停止,返回一个 ErrKeepAliveHalted
3. 判断当前的keepAlives map
中是否有 `ID`的映射,如果没有,就创建一个新的`keepAlive` 对象,将ch和ctx传进去,并创建映射;如果有,就将 ch 和 ctx 直接加入到该 keepAlive
`对象的 chs 和 ctxs slice中 4. 释放 lessor 的锁,并启动一个 keepAliveCtxCloser 协程,用于在 ctx 取消或租约过期、撤销时,关闭 ch 和清除 keepAlives map 中对应的 keepAlive 对象。 5. 如果这个方法是首次调用,会启动recvKeepAliveLoop
和deadlineLoop
协程,分别用于处理KeepALive响应;监测租约是否过期并执行清理操作 输入为ctx和租约id,返回值为返回LeaseKeepAliveResponse
租约保活响应的信道;如果该信道被关闭,则返回对应的一个error表示租约已经结束。keepAliveOnce函数用于执行一次续约操作,创建与服务器进行keepAlive通信的rpc服务,发送keepAliveRequest并接受回应,只需约一次。返回值区别于
keepAlive
为LeaseKeepAliveResponse
。recvKeepAliveLoop
函数在keepAlive
函数第一次启动时执行,用于循环接收来自服务器的keepAliveResponse
,这也是keepAlive
和keepAliveOnce
的区别之处。recvKeepAlive
用于处理单次的keepAliveResponse
,包括对租约进行更新等操作。deadlibeLoop
用于清理长时间未收到keepAliveResonse
的租约对应的chan,每秒检查一次所有租约的deadline时间是否超时,超时则关闭。sendKeepAliveLoop
用于在给定的流的整个生命周期中持续发送keepAliveRequest
。close用于关闭lessor提供的租约服务,通过关闭donec使得lessor执行的其他协程检测到关闭命令并结束运行。
5.txn
该模块负责进行要求强一致性的数据读写操作,txn结构体实现了四个方法:
方法
|
用途
|
If
|
将比较条件加入txn结构体中
|
Then
|
将比较条件为true时要执行的操作加入txn结构体
|
Else
|
将比较条件为false时要执行的操作加入txn结构体
|
Commit
|
将组织好的txn结构体包装成TxnRequest结构体发送给服务器并返回服务器的响应
|
If、Else、Then三个函数并非执行函数,而是数据组织函数,因此这三个函数可以分段执行。
6.watch
watch监视机制可以使得用户针对存储在etcd中特定范围的数据变化进行实时检视,在watch过程中,当对应数据发生变化时,etcd会根据watch记录追溯到用户,对变更事件进行同步。常见的使用场景有分布式锁和配置中心。
watcher结构体
watcher是客户端的监听回调模块,内置了用于与grpc服务端建立长连接的remote;并通过一个map类型字段streams记录了多组通过ctxKey映射和服务端间通信的长连接代理对象watchGrpcStream。
字段
|
类型
|
解释
|
remote
|
pb.WatchClient
|
与服务端建立长连接
|
callOpts
|
[]grpc.CallOption
|
监视条件
|
mu
|
sync.Mutex
|
同步锁
|
streams
|
map[string]*watchGrpcStream
|
长连接代理对象的映射map
|
lg
|
*zap.Logger
|
日志
|
watchGrpcStream结构体
watchGrpcStream是对etcd客户端和etcd服务端之间grpc长连接的抽象,同时也用以处理应用放创建/删除watch请求以及处理服务端watch回调事件的中枢模块。
字段
|
类型
|
解释
|
owner
|
*watcher
|
所有者
|
remote
|
pb.WatchClient
|
与服务端建立长连接
|
ctxKey
|
string
|
表示该watchGrpcStream的密钥
|
substreams
|
map[int64]*watcherStream
|
建立watchID与对应子处理流watcherStream之间的映射
|
resuming
|
[]*watcherStream
|
保留使用该流的所有watch
|
reqc
|
watchStreamRequest
|
面向客户端的信道
|
respc
|
*pb.WatchResponse
|
面向服务端的信道
|
watcherStream结构体
watcherStream 是对某个特定的watch的处理流的抽象。
字段
|
类型
|
解释
|
initReq
|
watchRequest
|
创建watch时的传递请求参数
|
outc
|
chan WatchResponse
|
将watch回调事件推到上层endpointManager中时使用的chan
|
recvc
|
chan *WatchResponse
|
用于接收来自watchGrpcStream分配的watch回调事件的chan
|
buf
|
[]*WatchResponse
|
用于缓存 watch 回调事件的缓冲区
|
watch创建链路过程分析(客户端)
watcher.Watch支持的context
类别
|
用途
|
context.WithBackground() / context.WithCancel()
|
最常用的ctx,设置此ctx,watch会保证底层连接创建成功,也就是会不断重试,直到连接成功或者ETCD服务端返回严重错误,或者调用ctx的cancel()方法,也可以强行结束watch
|
context.WithTimeout()
|
这个就是Timeout时间后,自动帮你调用cancel(),也就是timeout时间后,不管你的Watch流是正常还是异常,都会强行结束,一般不能用这种context
|
clientv3.WithRequireLeader(ctx)
|
在ctx中加入 RequireLeader 标志,这个标志处理的问题是:若ETCD服务集群发生网络分区,且你的客户端Watch流正好连接少数一方的节点,那Watch就卡住了,不会有响应,白白浪费资源。 加入这个标志,Watch在发现自己连接的节点是少数一方节点时,尝试重新连接到其它的节点,这样就自动的处理了分区带来的不可用问题
|
应用方通过etcd客户端向服务端发出创建watch请求:

endpointManager.NewWatchChannel
方法是watch的入口,实现对endpointManager
指定target范围的数据进行监听。在该方法中,
- 调用
Client.Get
方法,获取target数据范围的历史变更记录,允许调用方获得建立watch监听之前的历史变更记录。 - 创建一个用于向调用方传递watch回调事件的upch(最上层信道),并将该信道返回给调用方
- 异步启动
endpointManager.watch
方法,持续监听更底层提供的watch chan,监听到回调事件后将其推送给upch通知调用方。
endpointManager.watch
该方法是endpointManager
为一个watch启动的监听协程。- 通过
m.client.Watch
方法,获取到watch对应的watch channel
- 通过基于for+select模型持续监听watch channel,接收来自底层的watch回调事件后,会把事件推送到upch中供上层接收。
watcher.Watch
该方法为监听请求创建必要的资源,向服务器提交创建watcher的请求并等待返回watch channel
返回给上层用户。每次调用watch都会创建一个watchRequest
,经过层层流转发送到etcd服务端,并在服务端与客户端之间建立一条gRpcstream,多个watch共用该stream。- 如果长连接代理对象
watcherGrpcStream
未初始化,调用watcher.newWatcherGrpcStream
方法完成其初始化。watcherGrpcStream
本身是有生命周期的,在watcher.newWatcherGrpcStream
方法中,会异步开启一个协程负责watcherGrpcStream
的运行。(创建或使用缓存的watchGrpcStream
)- 将本次创建/删除watch的请求送入
watcherGrpcStream
的reqc中,watcherGrpcStream
运行协程处理后发送给服务器(将WatchRequest
发送到watchGrpcStream
)。- 当创建watch请求处理完成后,服务器返回的
watch channel
会通过该watch对象对应的watcherSubStream
中的watchRequest.retc
中传出。在此之前,watcher.Watch
会持续监听watchRequest.retc
直到返回了watch channel
(watchGrpcStream
会为WatchRequest
生成chan WatchResponse
用于接收结果并通过chan chan WatchResponse
返回给用户)。watchGrpcStream.run
该方法用于创建管理watcher client
的协程。在
watchGrpcStream
的运行协程中:- 通过
watchGrpcStream.newWatchClient
方法,创建与etcd服务端之间的通信长连接
- 通过for+select模型,持续轮询处理来自上层的请求和来自下层的响应。
- 持续从reqc中接收来自上层的请求,没遇到一个创建watch的请求,会调用
watchGrpcStream.serveSubstream
方法,异步启动一个服务于这个watch的subStream - 持续从respc中接收来自etcd服务端的响应事件,并调用
watch.dispatchEvent
方法,将事件分发给对应的watch的watch subStream
。
wartchGrpcStream.newWatchClient
方法主要完成两项任务:- 同步调用
watchGrpcStream.openWatchClient->watchClient.Watch
方法链,建立和etcd服务端之间的通信长连接。
- 异步启动
watchGrpcStream.serveWatchClient
方法,创建出一个接收协程,持续轮询grpc长连接,接收处理etcd服务端返回的响应.
当etcd服务端完成watch的处理后,对应的响应便会在接收协程中通过
pb.Watch_WatchClient.Recv
方法接收到,并将其投递到watchGrpcStream
的respc中,供watchGrpcStream
的运行协程接收处理。watchGrpcStream.serveSubstream
当某个watch对应的subStream通过recvc接收到create类型的
watchResponse
,标志着服务端已经完成了对watch创建请求的处理,subStream
会把watchGrpcStream
方法中提前创建好的outc分发到subStream.initReq.retc
中,这是endpointManager
所需要的watch channel
。watch回调链路过程分析(客户端)
当应用方设置的watch监听到数据的变化时,etcd服务端会通过创建过程中建立的grpc长连接将watch回调事件发送到etcd客户端。etcd客户端在接收到watch回调事件后,会一步步自底向上将该事件交付到对应的watch应用方手中。
watch回调事件的起点为etcd客户端接收协程serveWatchClient开始,以endpointManager通过对应的
watch subStream
的outc(watch channel)
接收到watch回调事件,并将其投递到upch中供应用放消费为终点。watchGrpcStream.serveWatchClient
在常驻的接收协程
watchGrpcStream.serveWatchClient
的轮询过程中,如果通过grpc长连接接收到来自etcd客户端的watch回调事件,则会将其分发到watchGrpcStream
的respc
中,供运行协程watchGrpcStream.run
进行消费处理。watchGrpcStream.run
在
watchGrpcStream
的运行协程通过从resp接收到来自etcd服务端的回调事件后,会根据事件所属的watch维度对事件进行聚合。case pbresp := <-w.respc:
if cur == nil || pbresp.Created || pbresp.Canceled {
cur = pbresp
} else if cur != nil && cur.WatchId == pbresp.WatchId {
// merge new events
cur.Events = append(cur.Events, pbresp.Events...)
// update "Fragment" field; last response with "Fragment" == false
cur.Fragment = pbresp.Fragment
如果
cur==pbresp
,即新收到的响应与上一次收到的响应所属同一个watch,则事件合并。然后提供过
watchGrpcStrem.dispatchEvent->watchFrpcStream.unicastReponse
的方法链路将事件分发到对应的watch subStream
的recvc当中。其中,
watchGrpcStrem.dispatchEvent
方法用来将WatchResponse
中的事件抽离出来并将pb.WatchResponse
格式的响应转化成clientV3.WatchResponse
格式的响应。并根据watchID分辨该响应是属于进度提醒还是watch响应,继而选择要调用的方法。watchGrpcStrem.unicastResponse
方法用于检查响应对应的watch
的subStream
是否存在,如果存在,则将响应推送到该subStream
的recvc
中,否则返回失败。watchGrpcStream.serveSubstream
watch subStream
的运行协程中,会通过recvc
接收到watch
回调事件,然后依据watch维度进行聚合,将事件追加到缓冲区watcherStream.buf
之中,之后每轮循环都会将buf中的事件推送到outc(watch channel)
当中,供上层endpointManager
接收处理。endpointManager.watch
方法的监听协程在接收到来自watch channel
的watch
回调事件后,会将其分发到upch
中,供应用方接收处理。