searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

etcd索引treeIndex接口分析

2023-09-04 03:09:17
13
0
读写框架
 
为避免混淆,以下所有描述中,"key"、"value"表示结构体中的键值对,“键”、“值”表示从用户端存储的“key-value”数据对。
etcd中数据的整体结构由两部分组成,keyIndex存储由键到revision的映射,用于从用户给定的键查找对应版本号;boltdb存储revision到[键,值]的映射,用于通过版本号查找指定的数据信息。
这种设计满足了etcd多版本的特性,每个键可以保留多个版本的value。
 
 
keyIndex的数据结构中包含:
  • key []byte 用于存储键
  • modified revision 存储当前键值的最新版本号
  • generation 存储键的生命周期信息
    • ver 存储该声明周期内键修改次数
    • createed revision 存储generation创建时的版本号
    • revs 存储每次修改键后的revision
      • main 全局递增的版本号
      • sub 一个事务内的子版本号
接口分析
put
//treeIndex的Put函数
func (ti *treeIndex) Put(key []byte, rev revision) {
    keyi := &keyIndex{key: key}

    ti.Lock()
    defer ti.Unlock()
    okeyi, ok := ti.tree.Get(keyi)//范围搜索,所以可以keyi与tree中的键不完全相等。
    if !ok {
        keyi.put(ti.lg, rev.main, rev.sub)
        ti.tree.ReplaceOrInsert(keyi)
        return
    }
    okeyi.put(ti.lg, rev.main, rev.sub)
}
//keyIndex的put函数
func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64){
 //省略日志处理部分(给定版本如果小于最新版本则返回错误)  
  if len(ki.generations) == 0 {
        ki.generations = append(ki.generations, generation{})
    }
    //如果是新建的key,则在generation中新建一条
    
    g := &ki.generations[len(ki.generations)-1]
    if len(g.revs) == 0 { // create a new key
        keysGauge.Inc()
        g.created = rev
    }
    //取出最新的一条generation,如果是新建key,则初始化rev
    g.revs = append(g.revs, rev)
    g.ver++
    ki.modified = rev
    //插入一条版本,修改次数+1,最新版本修改为rev
}
 
put函数向treeIndex中插入新写入的键,这里分为两种情况:
  • 创建键:创建一个新的keyIndex,key赋值为键值;向新建的keyIndex插入版本相关信息;最后插入tree
    • 由于新键,所以没有generation,所以新键并插入rev
  • 复写键:找到对应的keyIndex,写入版本信息。
get
//treeIndex的unsafeGet函数(Get函数实际调用接口)
func (ti *treeIndex) unsafeGet(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
    keyi := &keyIndex{key: key}//根据参数key设置初始化keyIndex结构体
    if keyi = ti.keyIndex(keyi); keyi == nil {
        return revision{}, revision{}, 0, ErrRevisionNotFound
    }//判断是否存在该key
    return keyi.get(ti.lg, atRev)//调用keyIndex的get函数
}

//keyIndex的get函数
func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
    if ki.isEmpty() {
        lg.Panic(
            "'get' got an unexpected empty keyIndex",
            zap.String("key", string(ki.key)),
        )
    }//判空
    g := ki.findGeneration(atRev)//根据atRev找到对应的generation,通过比对每个generation的第一个版本号和最后一个版本号确定atRev是否在当前generation中。
    if g.isEmpty() {
        return revision{}, revision{}, 0, ErrRevisionNotFound
    }
    //判空
    n := g.walk(func(rev revision) bool { return rev.main > atRev })
    if n != -1 {
        return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil
    }
    //遍历g,找到第一个小于等于atRev的版本。返回该版本的相关信息
    return revision{}, revision{}, 0, ErrRevisionNotFound
    //未找到则返回空和错误
}
 
get的最上层输入是key和atRev,key用来在treeIndex寻找keyIndex,atRev用于在keyIndex中寻找符合条件的版本,最终得到的输出是key对应的最新版本号、创建版本号、修改次数信息,后续得到value依靠的是最新版本号去boltdb中读取。
这里需要注意的是,参数atRev并非是找到key的atRev版本,而是找到key小于且最接近atRev的版本。
 
since
func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision {
    if ki.isEmpty() {
        lg.Panic(
            "'since' got an unexpected empty keyIndex",
            zap.String("key", string(ki.key)),
        )
    }
    //判空
    since := revision{rev, 0}
    var gi int
    // find the generations to start checking
    for gi = len(ki.generations) - 1; gi > 0; gi-- {
        g := ki.generations[gi]
        if g.isEmpty() {
            continue
        }
        if since.GreaterThan(g.created) {
            break
        }
    }
    //找到创建版本号小于rev的generation,从该g开始遍历
    var revs []revision
    var last int64
    for ; gi < len(ki.generations); gi++ {
        for _, r := range ki.generations[gi].revs {
            if since.GreaterThan(r) {
                continue
            }
            if r.main == last {
                //在一个事务中对同一个键值对多次操作会导致出现多个版本main相同的rev,因此需要取子版本号sub最大的返回
                revs[len(revs)-1] = r
                continue
            }
            revs = append(revs, r)
            last = r.main
        }
    }
    return revs
}
这是一个未用到的函数接口,其引用仅在key_index_test.go中存在。该函数的作用是找到给定rev之后的所有版本,如果存在一个tx中对该键的多次操作导致的rev相同,则返回子版本sub最大的版本。
 
compact
 
func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevCompactionCompleted bool) (<-chan struct{}, error)
  1. 创建信道,用于通知compact结果;
  2. 创建一个调度任务:
    1. 上下文检查;
    2. 执行scheduleCompaction函数,获取返回的hash;
    3. 验证上次compact的hash是否计算完毕,如果计算完毕则存储当前hash结果,否则跳过(为了保证数据的正确性和连续性);
    4. 关闭信道。
  3. 将这个调度任务加入fifo队列;
  4. 返回信道和错误。
func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyValueHash, error)
  1. 压缩操作初始化,设置计时;
  2. 执行treeIndex的compact函数,获取需要保留的rev的map keep;
  3. 设置需要从内存中读取的数据的范围,[last,end],last为上次compact的版本,即现存的最小版本,end为命令中指定的版本compactMainRev+1;
  4. 进入循环处理batch:
    1. 从schema中取出一批Key,这些数据的格式为key:Key ,value:rev;
    2. 将取出的rev依次与keep中存储的rev进行比较,如果存在于keep中保留,否则删除(这一部分删除的是数据链中rev->value这部分)
    3. 对于保留的数据将写入新建的hash中
  5. 如果获取的keys的长度小于batchNum,说明已经将所有数据写入hash,返回h
  6. 否则将last设置为最后一个这一批最后一个rev,将这批事务提交,等待ticker。
 
func (ti *treeIndex) Compact(rev int64) map[revision]struct{}
  1. 创建available 类型为revision map,用于存储需要保留的key的rev
  2. 给tree上锁并克隆,之后所有的操作在克隆出来的tree上进行操作,目的是为了保证在compact期间不影响集群正常功能。
  3. 对克隆出来的tree上的每一个keyIndex做一下操作
    1. 判空。如果当前keyIndex为空,则报错;
    2. 调用doCompacy函数,获取需要压缩的genIdx和revIndex信息
      1. 定义一个比较函数f,逻辑为如果当前rev小于给定的atRev,则将其加入available中,并返回false,否则返回true;其目的在于通过倒序遍历generation中的revs,找到第一个小于等于atRev的rev,并保留该rev。
      2. 初始化genIdx和g,genIdx为g的指示器,g为当前遍历的代;找到当前keyIndex中存在大于等于atRev的rev的generation,则该generation之前的g都应该被删除。
      3. 调用walk(f),对当前g中的每个rev执行f操作;倒序遍历当前g,找到第一个小于等于atRev的rev,并返回其在g中的位置;如果不存在这样的rev,则返回-1;
      4. 返回genIdx和revIndex
    3. 如果genIdx指示的g不为空,则
      1. 如果revIndex不等于-1,即表示g中存在小于等于atRev的rev,则将revIndex之前的版本全部删除;
      2. 如果g.revs的长度为1并且当前g不是该key的最后一代,说明该key已被放置墓碑,则将available中的该key对应的rev删除,表示该rev不需要保留,并将该genIdx+1.
    4. 将keyIndex的generations根据当前的genIdx重新赋值。
  4. 返回需要保留的rev的map available。
keep
func (ki *keyIndex) keep(atRev int64, available map[revision]struct{}) {
    if ki.isEmpty() {
        return
    }
    //判空
    genIdx, revIndex := ki.doCompact(atRev, available)
    //调用docompact得到需要压缩的generation和rev
    g := &ki.generations[genIdx]
    if !g.isEmpty() {
        //删除保留版本数组中被放置了墓碑的版本
        if revIndex == len(g.revs)-1 && genIdx != len(ki.generations)-1 {
            delete(available, g.revs[revIndex])
        }
    }
}

func (ti *treeIndex) Keep(rev int64) map[revision]struct{} {
    available := make(map[revision]struct{})
    ti.RLock()
    defer ti.RUnlock()
    ti.tree.Ascend(func(keyi *keyIndex) bool {
        keyi.keep(rev, available)
        return true
    })
    return available
}
keep函数用于返回按照atRev版本进行compact时需要保留的keyIndex中的rev,与compact函数逻辑基本相同,只是不对keyIndex进行删除。
treeIndex中的keep函数也与compact函数逻辑基本相同,只是不需要克隆treeIndex,只需遍历当前treeIndex。
Restore
func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int64) {
    if len(ki.generations) != 0 {
        lg.Panic(
            "'restore' got an unexpected non-empty generations",
            zap.Int("generations-size", len(ki.generations)),
        )
    }
    //如果generations不为空,则报错不可恢复

    ki.modified = modified//赋值最新版本
    g := generation{created: created, ver: ver, revs: []revision{modified}}
    ki.generations = append(ki.generations, g)//把给定恢复的generation插入
    keysGauge.Inc()
}
restore将数据恢复到指定版本
tombstone
func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {
    if ki.isEmpty() {
        lg.Panic(
            "'tombstone' got an unexpected empty keyIndex",
            zap.String("key", string(ki.key)),
        )
    }
    //如果没有该key,则报错
    if ki.generations[len(ki.generations)-1].isEmpty() {
        return ErrRevisionNotFound
    }
    //防止重复置空
    ki.put(lg, main, sub)
    ki.generations = append(ki.generations, generation{})
    keysGauge.Dec()
    return nil
}
墓碑函数,用于指示该keyindex对应的key已经删除
0条评论
0 / 1000
z****n
2文章数
0粉丝数
z****n
2 文章 | 0 粉丝
z****n
2文章数
0粉丝数
z****n
2 文章 | 0 粉丝
原创

etcd索引treeIndex接口分析

2023-09-04 03:09:17
13
0
读写框架
 
为避免混淆,以下所有描述中,"key"、"value"表示结构体中的键值对,“键”、“值”表示从用户端存储的“key-value”数据对。
etcd中数据的整体结构由两部分组成,keyIndex存储由键到revision的映射,用于从用户给定的键查找对应版本号;boltdb存储revision到[键,值]的映射,用于通过版本号查找指定的数据信息。
这种设计满足了etcd多版本的特性,每个键可以保留多个版本的value。
 
 
keyIndex的数据结构中包含:
  • key []byte 用于存储键
  • modified revision 存储当前键值的最新版本号
  • generation 存储键的生命周期信息
    • ver 存储该声明周期内键修改次数
    • createed revision 存储generation创建时的版本号
    • revs 存储每次修改键后的revision
      • main 全局递增的版本号
      • sub 一个事务内的子版本号
接口分析
put
//treeIndex的Put函数
func (ti *treeIndex) Put(key []byte, rev revision) {
    keyi := &keyIndex{key: key}

    ti.Lock()
    defer ti.Unlock()
    okeyi, ok := ti.tree.Get(keyi)//范围搜索,所以可以keyi与tree中的键不完全相等。
    if !ok {
        keyi.put(ti.lg, rev.main, rev.sub)
        ti.tree.ReplaceOrInsert(keyi)
        return
    }
    okeyi.put(ti.lg, rev.main, rev.sub)
}
//keyIndex的put函数
func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64){
 //省略日志处理部分(给定版本如果小于最新版本则返回错误)  
  if len(ki.generations) == 0 {
        ki.generations = append(ki.generations, generation{})
    }
    //如果是新建的key,则在generation中新建一条
    
    g := &ki.generations[len(ki.generations)-1]
    if len(g.revs) == 0 { // create a new key
        keysGauge.Inc()
        g.created = rev
    }
    //取出最新的一条generation,如果是新建key,则初始化rev
    g.revs = append(g.revs, rev)
    g.ver++
    ki.modified = rev
    //插入一条版本,修改次数+1,最新版本修改为rev
}
 
put函数向treeIndex中插入新写入的键,这里分为两种情况:
  • 创建键:创建一个新的keyIndex,key赋值为键值;向新建的keyIndex插入版本相关信息;最后插入tree
    • 由于新键,所以没有generation,所以新键并插入rev
  • 复写键:找到对应的keyIndex,写入版本信息。
get
//treeIndex的unsafeGet函数(Get函数实际调用接口)
func (ti *treeIndex) unsafeGet(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
    keyi := &keyIndex{key: key}//根据参数key设置初始化keyIndex结构体
    if keyi = ti.keyIndex(keyi); keyi == nil {
        return revision{}, revision{}, 0, ErrRevisionNotFound
    }//判断是否存在该key
    return keyi.get(ti.lg, atRev)//调用keyIndex的get函数
}

//keyIndex的get函数
func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
    if ki.isEmpty() {
        lg.Panic(
            "'get' got an unexpected empty keyIndex",
            zap.String("key", string(ki.key)),
        )
    }//判空
    g := ki.findGeneration(atRev)//根据atRev找到对应的generation,通过比对每个generation的第一个版本号和最后一个版本号确定atRev是否在当前generation中。
    if g.isEmpty() {
        return revision{}, revision{}, 0, ErrRevisionNotFound
    }
    //判空
    n := g.walk(func(rev revision) bool { return rev.main > atRev })
    if n != -1 {
        return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil
    }
    //遍历g,找到第一个小于等于atRev的版本。返回该版本的相关信息
    return revision{}, revision{}, 0, ErrRevisionNotFound
    //未找到则返回空和错误
}
 
get的最上层输入是key和atRev,key用来在treeIndex寻找keyIndex,atRev用于在keyIndex中寻找符合条件的版本,最终得到的输出是key对应的最新版本号、创建版本号、修改次数信息,后续得到value依靠的是最新版本号去boltdb中读取。
这里需要注意的是,参数atRev并非是找到key的atRev版本,而是找到key小于且最接近atRev的版本。
 
since
func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision {
    if ki.isEmpty() {
        lg.Panic(
            "'since' got an unexpected empty keyIndex",
            zap.String("key", string(ki.key)),
        )
    }
    //判空
    since := revision{rev, 0}
    var gi int
    // find the generations to start checking
    for gi = len(ki.generations) - 1; gi > 0; gi-- {
        g := ki.generations[gi]
        if g.isEmpty() {
            continue
        }
        if since.GreaterThan(g.created) {
            break
        }
    }
    //找到创建版本号小于rev的generation,从该g开始遍历
    var revs []revision
    var last int64
    for ; gi < len(ki.generations); gi++ {
        for _, r := range ki.generations[gi].revs {
            if since.GreaterThan(r) {
                continue
            }
            if r.main == last {
                //在一个事务中对同一个键值对多次操作会导致出现多个版本main相同的rev,因此需要取子版本号sub最大的返回
                revs[len(revs)-1] = r
                continue
            }
            revs = append(revs, r)
            last = r.main
        }
    }
    return revs
}
这是一个未用到的函数接口,其引用仅在key_index_test.go中存在。该函数的作用是找到给定rev之后的所有版本,如果存在一个tx中对该键的多次操作导致的rev相同,则返回子版本sub最大的版本。
 
compact
 
func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevCompactionCompleted bool) (<-chan struct{}, error)
  1. 创建信道,用于通知compact结果;
  2. 创建一个调度任务:
    1. 上下文检查;
    2. 执行scheduleCompaction函数,获取返回的hash;
    3. 验证上次compact的hash是否计算完毕,如果计算完毕则存储当前hash结果,否则跳过(为了保证数据的正确性和连续性);
    4. 关闭信道。
  3. 将这个调度任务加入fifo队列;
  4. 返回信道和错误。
func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyValueHash, error)
  1. 压缩操作初始化,设置计时;
  2. 执行treeIndex的compact函数,获取需要保留的rev的map keep;
  3. 设置需要从内存中读取的数据的范围,[last,end],last为上次compact的版本,即现存的最小版本,end为命令中指定的版本compactMainRev+1;
  4. 进入循环处理batch:
    1. 从schema中取出一批Key,这些数据的格式为key:Key ,value:rev;
    2. 将取出的rev依次与keep中存储的rev进行比较,如果存在于keep中保留,否则删除(这一部分删除的是数据链中rev->value这部分)
    3. 对于保留的数据将写入新建的hash中
  5. 如果获取的keys的长度小于batchNum,说明已经将所有数据写入hash,返回h
  6. 否则将last设置为最后一个这一批最后一个rev,将这批事务提交,等待ticker。
 
func (ti *treeIndex) Compact(rev int64) map[revision]struct{}
  1. 创建available 类型为revision map,用于存储需要保留的key的rev
  2. 给tree上锁并克隆,之后所有的操作在克隆出来的tree上进行操作,目的是为了保证在compact期间不影响集群正常功能。
  3. 对克隆出来的tree上的每一个keyIndex做一下操作
    1. 判空。如果当前keyIndex为空,则报错;
    2. 调用doCompacy函数,获取需要压缩的genIdx和revIndex信息
      1. 定义一个比较函数f,逻辑为如果当前rev小于给定的atRev,则将其加入available中,并返回false,否则返回true;其目的在于通过倒序遍历generation中的revs,找到第一个小于等于atRev的rev,并保留该rev。
      2. 初始化genIdx和g,genIdx为g的指示器,g为当前遍历的代;找到当前keyIndex中存在大于等于atRev的rev的generation,则该generation之前的g都应该被删除。
      3. 调用walk(f),对当前g中的每个rev执行f操作;倒序遍历当前g,找到第一个小于等于atRev的rev,并返回其在g中的位置;如果不存在这样的rev,则返回-1;
      4. 返回genIdx和revIndex
    3. 如果genIdx指示的g不为空,则
      1. 如果revIndex不等于-1,即表示g中存在小于等于atRev的rev,则将revIndex之前的版本全部删除;
      2. 如果g.revs的长度为1并且当前g不是该key的最后一代,说明该key已被放置墓碑,则将available中的该key对应的rev删除,表示该rev不需要保留,并将该genIdx+1.
    4. 将keyIndex的generations根据当前的genIdx重新赋值。
  4. 返回需要保留的rev的map available。
keep
func (ki *keyIndex) keep(atRev int64, available map[revision]struct{}) {
    if ki.isEmpty() {
        return
    }
    //判空
    genIdx, revIndex := ki.doCompact(atRev, available)
    //调用docompact得到需要压缩的generation和rev
    g := &ki.generations[genIdx]
    if !g.isEmpty() {
        //删除保留版本数组中被放置了墓碑的版本
        if revIndex == len(g.revs)-1 && genIdx != len(ki.generations)-1 {
            delete(available, g.revs[revIndex])
        }
    }
}

func (ti *treeIndex) Keep(rev int64) map[revision]struct{} {
    available := make(map[revision]struct{})
    ti.RLock()
    defer ti.RUnlock()
    ti.tree.Ascend(func(keyi *keyIndex) bool {
        keyi.keep(rev, available)
        return true
    })
    return available
}
keep函数用于返回按照atRev版本进行compact时需要保留的keyIndex中的rev,与compact函数逻辑基本相同,只是不对keyIndex进行删除。
treeIndex中的keep函数也与compact函数逻辑基本相同,只是不需要克隆treeIndex,只需遍历当前treeIndex。
Restore
func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int64) {
    if len(ki.generations) != 0 {
        lg.Panic(
            "'restore' got an unexpected non-empty generations",
            zap.Int("generations-size", len(ki.generations)),
        )
    }
    //如果generations不为空,则报错不可恢复

    ki.modified = modified//赋值最新版本
    g := generation{created: created, ver: ver, revs: []revision{modified}}
    ki.generations = append(ki.generations, g)//把给定恢复的generation插入
    keysGauge.Inc()
}
restore将数据恢复到指定版本
tombstone
func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {
    if ki.isEmpty() {
        lg.Panic(
            "'tombstone' got an unexpected empty keyIndex",
            zap.String("key", string(ki.key)),
        )
    }
    //如果没有该key,则报错
    if ki.generations[len(ki.generations)-1].isEmpty() {
        return ErrRevisionNotFound
    }
    //防止重复置空
    ki.put(lg, main, sub)
    ki.generations = append(ki.generations, generation{})
    keysGauge.Dec()
    return nil
}
墓碑函数,用于指示该keyindex对应的key已经删除
文章来自个人专栏
文章 | 订阅
0条评论
0 / 1000
请输入你的评论
0
0