Introduction
概述
传统bitmap是一个整体全局变量,这就会带来如下问题:
- 并发访问需要加锁保护,竞争非常激烈时锁的开销不可忽略
- 由于是共享变量,每一次write操作内部都会有大量MESI消息,且会造成cache颠簸
sbitmap(scalable bitmap), 重点在于scalable, 那么如何可扩展呢?核心思想是:将整个bitmap进行拆分,将全局竞争变为局部竞争。这是一种通用的思路,例如 Tree RCU 对 Classic RCU 的取代。
bitmap和sbitmap各有优劣。sbitmapp消耗了更多内存资源,且获取一个bit使用的指令也更多,适用于多核竞争激烈的场景;故在竞争可控的场景下,建议还是选择bitmap。
Sbitmap实现
概述
本章重点介绍sbitmap的实现,对应源码:
linux-6.6/include/linux/bitmap.h
linux-6.6/lib/bitmap.c
数据结构
/*
* sbitmap: scalable bitmap, 本质是bitmap,如何scalab呢?
* 传统bitmap是整体,并发访问时需要一把全局锁。当竞争变激烈时,会造成性能严重下滑。
* sbitmap将bitmap切割为小块map,这样可以大大减少竞争,特别是在per-cpu的竞争中。同时考虑cacheline(如何理解?)
* depth表示总的bit数量; 1<<shift表示每一块bit的数量; map_nr表示小块map的数量
*/
struct sbitmap {
unsigned int depth; //总bit的数量
unsigned int shift; //每个map管理的bit数量, 不能大于一个word(unsigned long)
unsigned int map_nr; //map的数量 = depth / (1 << shift)
struct sbitmap_word *map; //bitmap区域
/* round_robin只是影响单个map查询是从offset=0开始(false),还是offset=hint开始(true).
* 在实际代码中会影响hint, warp两个变量,但本质是一件事: 若从offset=hint开始查询,
* 没找到需要wrap. 故代码里面感觉两者可以合并
*/
bool round_robin;
/* per_cpu的变量,用于记录上一次该cpu获取和释放到bit的信息nrbit, 加快bit的获取
* get:下一次的起点是nrbit+1; put:下一次的起点是上次释放的位置
* 从get接口来看,若round_robin=false, alloc_hint好像也没有任何作用?
*/
unsigned int __percpu *alloc_hint;
};
/*
* 当free时,归还者仅标记对应cleard即可,不会操作word. 仅当word已经满时会将cleard记录的归还.
* 这样保证了word仅被get动作改变, 错开的get和free操作, 金少竞争。
* 为什么 word 不标记为cacheline对齐?
*/
struct sbitmap_word {
unsigned long word;
unsigned long cleared ____cacheline_aligned_in_smp;
} ____cacheline_aligned_in_smp;
API
/* 分配+初始化一个sbmitmap内部资源 */
//alloc_hin=false, 不能使用sbitmap_get()
int sbitmap_init_node(struct sbitmap *sb, unsigned int depth, int shift,
gfp_t flags, int node, bool round_robin, bool alloc_hint);
void sbitmap_free(struct sbitmap *sb);
/* 获取/释放 对应的bit */
int sbitmap_get(struct sbitmap *sb); //仅支持hint的模式分配
void sbitmap_put(struct sbitmap *sb, unsigned int bitnr)
bool sbitmap_any_bit_set(const struct sbitmap *sb);
实现流程
这里重点分析get主要流程,其他操作可以从该流程推导出
sbitmap_get()
|-update_alloc_hint_before_get() //获取上个cpu的hint
|-__sbitmap_get() //获取到哪个bit是空闲的, 首先找到map,然后找到map中对应的bit. RR为true表示要从offset=hint开始查询
|-update_alloc_hint_after_get() //更新该cpu本次访问的hint
代码流程
int sbitmap_get(struct sbitmap *sb)
{
...
depth = READ_ONCE(sb->depth);
hint = update_alloc_hint_before_get(sb, depth); //读取sb->alloc_hint
nr = __sbitmap_get(sb, hint);
update_alloc_hint_after_get(sb, depth, hint, nr); //将nr+1写入到sb->alloc_hint,下次获取就可以快速的找到
...
}
/*
* 分配的核心逻辑:首先找到对应的map,然后从map中找到合适的bit
* round_robin会用到hint? 非RR从0开始
*/
static int __sbitmap_get(struct sbitmap *sb, unsigned int alloc_hint)
{
unsigned int index;
//找到对应的map: alloc_hint>>sb->shift
index = SB_NR_TO_INDEX(sb, alloc_hint);
if (sb->round_robin)
alloc_hint = SB_NR_TO_BIT(sb, alloc_hint); //找到map对应的bit
else
alloc_hint = 0; //从bit0开始分配
return sbitmap_find_bit(sb, UINT_MAX, index, alloc_hint, !sb->round_robin);
}
static int sbitmap_find_bit(struct sbitmap *sb, unsigned int depth, unsigned int index, unsigned int alloc_hint, bool wrap)
{
unsigned int i;
int nr = -1;
/* 最外层for仅控制次数,i并不控制从哪个map开始分配, 由index指定.
* hint仅适用于最开始预期map中查找,当切换到下一个map,默认从0开始,这也满足rr策略.
* wrap由底层逻辑处理
*/
for (i = 0; i < sb->map_nr; i++) {
nr = sbitmap_find_bit_in_word(&sb->map[index], min_t(unsigned int, __map_depth(sb, index), depth), alloc_hint, wrap);
if (nr != -1) {
nr += index << sb->shift; //转换为用户视角需要的nr(用户只关心整体bitmap顺序)
break;
}
//++index表示跳转到下一个map
alloc_hint = 0; //在预期外的map分配bit时,采用默认为0的地方开始搜寻
if (++index >= sb->map_nr)
index = 0;
}
return nr;
}
static int sbitmap_find_bit_in_word(struct sbitmap_word *map,
unsigned int depth,
unsigned int alloc_hint,
bool wrap)
{
int nr;
do {
nr = __sbitmap_get_word(&map->word, depth, alloc_hint, wrap);
if (nr != -1)
break;
if (!sbitmap_deferred_clear(map)) //若没要找到,需要将map->cleared已经释放的归还到map->word
break;
} while (1);
return nr;
}
static int __sbitmap_get_word(unsigned long *word, unsigned long depth, unsigned int hint, bool wrap)
{
int nr;
/* don't wrap if starting from 0 */
//为了保证完整查询了bitmap, wrap仅在在hint非零时才有意义,因为要回滚。从0开始查询没有必要回滚
//wrap在查询中核心就是如果找到了从hint开始没有找到,就从投开始在查询一遍
wrap = wrap && hint;
while (1) {
nr = find_next_zero_bit(word, depth, hint);
if (unlikely(nr >= depth)) { //从offset开始查询,没有找到
/*
* We started with an offset, and we didn't reset the
* offset to 0 in a failure case, so start from 0 to
* exhaust the map.
*/
if (hint && wrap) { //从offset=0重新查询一遍
hint = 0;
continue;
}
return -1;
}
if (!test_and_set_bit_lock(nr, word))
break;
//set失败了,该bit被其他人抢占,此时继续从下一个bit查询
hint = nr + 1;
if (hint >= depth - 1)
hint = 0;
}
return nr;
}
案例
内核中直接使用sbitmap的模块并不多,由 blk的hctx 和 scsi模块
scsi sbitmap
scsi的利用方式比较简单,由于alloc_hin=true, 可以使用sbitmap_get()
struct scsi_device {
...
struct sbitmap budget_map;
...
}
int scsi_realloc_sdev_budget_map(struct scsi_device *sdev, unsigned int depth)
{
...
//由于
ret = sbitmap_init_node(&sdev->budget_map, scsi_device_max_queue_depth(sdev),
new_shift, GFP_KERNEL, sdev->request_queue->node, false, true);
...
}
int scsi_dev_queue_ready(struct request_queue *q, struct scsi_device *sdev)
{
...
token = sbitmap_get(&sdev->budget_map);
...
}
hctx sbitmap
htctx的alloc_hin=false, 不能使用sbitmap_get(),所以从其他角度来使用
struct blk_mq_hw_ctx *blk_mq_alloc_hctx(struct request_queue *q, struct blk_mq_tag_set *set, int node)
{
...
sbitmap_init_node(&hctx->ctx_map, nr_cpu_ids, ilog2(8), gfp, node, false, false);
...
}
static void blk_mq_hctx_mark_pending(struct blk_mq_hw_ctx *hctx,
struct blk_mq_ctx *ctx)
{
const int bit = ctx->index_hw[hctx->type];
if (!sbitmap_test_bit(&hctx->ctx_map, bit))
sbitmap_set_bit(&hctx->ctx_map, bit);
}