读写框架

为避免混淆,以下所有描述中,"key"、"value"表示结构体中的键值对,“键”、“值”表示从用户端存储的“key-value”数据对。
etcd中数据的整体结构由两部分组成,keyIndex存储由键到revision的映射,用于从用户给定的键查找对应版本号;boltdb存储revision到[键,值]的映射,用于通过版本号查找指定的数据信息。
这种设计满足了etcd多版本的特性,每个键可以保留多个版本的value。

keyIndex的数据结构中包含:
- key []byte 用于存储键
- modified revision 存储当前键值的最新版本号
- generation 存储键的生命周期信息
- ver 存储该声明周期内键修改次数
- createed revision 存储generation创建时的版本号
- revs 存储每次修改键后的revision
- main 全局递增的版本号
- sub 一个事务内的子版本号
接口分析
put
//treeIndex的Put函数
func (ti *treeIndex) Put(key []byte, rev revision) {
keyi := &keyIndex{key: key}
ti.Lock()
defer ti.Unlock()
okeyi, ok := ti.tree.Get(keyi)//范围搜索,所以可以keyi与tree中的键不完全相等。
if !ok {
keyi.put(ti.lg, rev.main, rev.sub)
ti.tree.ReplaceOrInsert(keyi)
return
}
okeyi.put(ti.lg, rev.main, rev.sub)
}
//keyIndex的put函数
func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64){
//省略日志处理部分(给定版本如果小于最新版本则返回错误)
if len(ki.generations) == 0 {
ki.generations = append(ki.generations, generation{})
}
//如果是新建的key,则在generation中新建一条
g := &ki.generations[len(ki.generations)-1]
if len(g.revs) == 0 { // create a new key
keysGauge.Inc()
g.created = rev
}
//取出最新的一条generation,如果是新建key,则初始化rev
g.revs = append(g.revs, rev)
g.ver++
ki.modified = rev
//插入一条版本,修改次数+1,最新版本修改为rev
}
put函数向treeIndex中插入新写入的键,这里分为两种情况:
- 创建键:创建一个新的keyIndex,key赋值为键值;向新建的keyIndex插入版本相关信息;最后插入tree
- 由于新键,所以没有generation,所以新键并插入rev
- 复写键:找到对应的keyIndex,写入版本信息。
get
//treeIndex的unsafeGet函数(Get函数实际调用接口)
func (ti *treeIndex) unsafeGet(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
keyi := &keyIndex{key: key}//根据参数key设置初始化keyIndex结构体
if keyi = ti.keyIndex(keyi); keyi == nil {
return revision{}, revision{}, 0, ErrRevisionNotFound
}//判断是否存在该key
return keyi.get(ti.lg, atRev)//调用keyIndex的get函数
}
//keyIndex的get函数
func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
if ki.isEmpty() {
lg.Panic(
"'get' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)),
)
}//判空
g := ki.findGeneration(atRev)//根据atRev找到对应的generation,通过比对每个generation的第一个版本号和最后一个版本号确定atRev是否在当前generation中。
if g.isEmpty() {
return revision{}, revision{}, 0, ErrRevisionNotFound
}
//判空
n := g.walk(func(rev revision) bool { return rev.main > atRev })
if n != -1 {
return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil
}
//遍历g,找到第一个小于等于atRev的版本。返回该版本的相关信息
return revision{}, revision{}, 0, ErrRevisionNotFound
//未找到则返回空和错误
}
get的最上层输入是key和atRev,key用来在treeIndex寻找keyIndex,atRev用于在keyIndex中寻找符合条件的版本,最终得到的输出是key对应的最新版本号、创建版本号、修改次数信息,后续得到value依靠的是最新版本号去boltdb中读取。
这里需要注意的是,参数atRev并非是找到key的atRev版本,而是找到key小于且最接近atRev的版本。
since
func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision {
if ki.isEmpty() {
lg.Panic(
"'since' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)),
)
}
//判空
since := revision{rev, 0}
var gi int
// find the generations to start checking
for gi = len(ki.generations) - 1; gi > 0; gi-- {
g := ki.generations[gi]
if g.isEmpty() {
continue
}
if since.GreaterThan(g.created) {
break
}
}
//找到创建版本号小于rev的generation,从该g开始遍历
var revs []revision
var last int64
for ; gi < len(ki.generations); gi++ {
for _, r := range ki.generations[gi].revs {
if since.GreaterThan(r) {
continue
}
if r.main == last {
//在一个事务中对同一个键值对多次操作会导致出现多个版本main相同的rev,因此需要取子版本号sub最大的返回
revs[len(revs)-1] = r
continue
}
revs = append(revs, r)
last = r.main
}
}
return revs
}
这是一个未用到的函数接口,其引用仅在key_index_test.go中存在。该函数的作用是找到给定rev之后的所有版本,如果存在一个tx中对该键的多次操作导致的rev相同,则返回子版本sub最大的版本。
compact
func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevCompactionCompleted bool) (<-chan struct{}, error)
- 创建信道,用于通知compact结果;
- 创建一个调度任务:
- 上下文检查;
- 执行scheduleCompaction函数,获取返回的hash;
- 验证上次compact的hash是否计算完毕,如果计算完毕则存储当前hash结果,否则跳过(为了保证数据的正确性和连续性);
- 关闭信道。
- 将这个调度任务加入fifo队列;
- 返回信道和错误。
func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyValueHash, error)
- 压缩操作初始化,设置计时;
- 执行treeIndex的compact函数,获取需要保留的rev的map keep;
- 设置需要从内存中读取的数据的范围,[last,end],last为上次compact的版本,即现存的最小版本,end为命令中指定的版本compactMainRev+1;
- 进入循环处理batch:
- 从schema中取出一批Key,这些数据的格式为key:Key ,value:rev;
- 将取出的rev依次与keep中存储的rev进行比较,如果存在于keep中保留,否则删除(这一部分删除的是数据链中rev->value这部分)
- 对于保留的数据将写入新建的hash中
- 如果获取的keys的长度小于batchNum,说明已经将所有数据写入hash,返回h
- 否则将last设置为最后一个这一批最后一个rev,将这批事务提交,等待ticker。
func (ti *treeIndex) Compact(rev int64) map[revision]struct{}
- 创建available 类型为revision map,用于存储需要保留的key的rev
- 给tree上锁并克隆,之后所有的操作在克隆出来的tree上进行操作,目的是为了保证在compact期间不影响集群正常功能。
- 对克隆出来的tree上的每一个keyIndex做一下操作
- 判空。如果当前keyIndex为空,则报错;
- 调用doCompacy函数,获取需要压缩的genIdx和revIndex信息
- 定义一个比较函数f,逻辑为如果当前rev小于给定的atRev,则将其加入available中,并返回false,否则返回true;其目的在于通过倒序遍历generation中的revs,找到第一个小于等于atRev的rev,并保留该rev。
- 初始化genIdx和g,genIdx为g的指示器,g为当前遍历的代;找到当前keyIndex中存在大于等于atRev的rev的generation,则该generation之前的g都应该被删除。
- 调用walk(f),对当前g中的每个rev执行f操作;倒序遍历当前g,找到第一个小于等于atRev的rev,并返回其在g中的位置;如果不存在这样的rev,则返回-1;
- 返回genIdx和revIndex
- 如果genIdx指示的g不为空,则
- 如果revIndex不等于-1,即表示g中存在小于等于atRev的rev,则将revIndex之前的版本全部删除;
- 如果g.revs的长度为1并且当前g不是该key的最后一代,说明该key已被放置墓碑,则将available中的该key对应的rev删除,表示该rev不需要保留,并将该genIdx+1.
- 将keyIndex的generations根据当前的genIdx重新赋值。
- 返回需要保留的rev的map available。
keep
func (ki *keyIndex) keep(atRev int64, available map[revision]struct{}) {
if ki.isEmpty() {
return
}
//判空
genIdx, revIndex := ki.doCompact(atRev, available)
//调用docompact得到需要压缩的generation和rev
g := &ki.generations[genIdx]
if !g.isEmpty() {
//删除保留版本数组中被放置了墓碑的版本
if revIndex == len(g.revs)-1 && genIdx != len(ki.generations)-1 {
delete(available, g.revs[revIndex])
}
}
}
func (ti *treeIndex) Keep(rev int64) map[revision]struct{} {
available := make(map[revision]struct{})
ti.RLock()
defer ti.RUnlock()
ti.tree.Ascend(func(keyi *keyIndex) bool {
keyi.keep(rev, available)
return true
})
return available
}
keep函数用于返回按照atRev版本进行compact时需要保留的keyIndex中的rev,与compact函数逻辑基本相同,只是不对keyIndex进行删除。
treeIndex中的keep函数也与compact函数逻辑基本相同,只是不需要克隆treeIndex,只需遍历当前treeIndex。
Restore
func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int64) {
if len(ki.generations) != 0 {
lg.Panic(
"'restore' got an unexpected non-empty generations",
zap.Int("generations-size", len(ki.generations)),
)
}
//如果generations不为空,则报错不可恢复
ki.modified = modified//赋值最新版本
g := generation{created: created, ver: ver, revs: []revision{modified}}
ki.generations = append(ki.generations, g)//把给定恢复的generation插入
keysGauge.Inc()
}
restore将数据恢复到指定版本
tombstone
func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {
if ki.isEmpty() {
lg.Panic(
"'tombstone' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)),
)
}
//如果没有该key,则报错
if ki.generations[len(ki.generations)-1].isEmpty() {
return ErrRevisionNotFound
}
//防止重复置空
ki.put(lg, main, sub)
ki.generations = append(ki.generations, generation{})
keysGauge.Dec()
return nil
}
墓碑函数,用于指示该keyindex对应的key已经删除