searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Linux - Sbitmap

2025-07-31 03:00:11
0
0

Introduction

概述

传统bitmap是一个整体全局变量,这就会带来如下问题:

  • 并发访问需要加锁保护,竞争非常激烈时锁的开销不可忽略
  • 由于是共享变量,每一次write操作内部都会有大量MESI消息,且会造成cache颠簸

sbitmap(scalable bitmap), 重点在于scalable, 那么如何可扩展呢?核心思想是:将整个bitmap进行拆分,将全局竞争变为局部竞争。这是一种通用的思路,例如 Tree RCU 对 Classic RCU 的取代。

bitmap和sbitmap各有优劣。sbitmapp消耗了更多内存资源,且获取一个bit使用的指令也更多,适用于多核竞争激烈的场景;故在竞争可控的场景下,建议还是选择bitmap。


‍‍

Sbitmap实现

概述

本章重点介绍sbitmap的实现,对应源码:

linux-6.6/include/linux/bitmap.h
linux-6.6/lib/bitmap.c

数据结构

/*
 * sbitmap: scalable bitmap, 本质是bitmap,如何scalab呢?
 * 传统bitmap是整体,并发访问时需要一把全局锁。当竞争变激烈时,会造成性能严重下滑。
 * sbitmap将bitmap切割为小块map,这样可以大大减少竞争,特别是在per-cpu的竞争中。同时考虑cacheline(如何理解?)
 * depth表示总的bit数量; 1<<shift表示每一块bit的数量; map_nr表示小块map的数量
 */
struct sbitmap {
	unsigned int depth; //总bit的数量
	unsigned int shift; //每个map管理的bit数量, 不能大于一个word(unsigned long)
	unsigned int map_nr; //map的数量 = depth / (1 << shift)
	struct sbitmap_word *map; //bitmap区域

	/* round_robin只是影响单个map查询是从offset=0开始(false),还是offset=hint开始(true).
	 * 在实际代码中会影响hint, warp两个变量,但本质是一件事: 若从offset=hint开始查询,
	 * 没找到需要wrap. 故代码里面感觉两者可以合并
	 */
	bool round_robin;

	/* per_cpu的变量,用于记录上一次该cpu获取和释放到bit的信息nrbit, 加快bit的获取
	 * get:下一次的起点是nrbit+1;  put:下一次的起点是上次释放的位置
	 * 从get接口来看,若round_robin=false, alloc_hint好像也没有任何作用?
	 */
	unsigned int __percpu *alloc_hint;
};

/*
 * 当free时,归还者仅标记对应cleard即可,不会操作word. 仅当word已经满时会将cleard记录的归还.
 * 这样保证了word仅被get动作改变, 错开的get和free操作, 金少竞争。
 * 为什么 word 不标记为cacheline对齐?
 */
struct sbitmap_word {
	unsigned long word;
	unsigned long cleared ____cacheline_aligned_in_smp;
} ____cacheline_aligned_in_smp;

API

/* 分配+初始化一个sbmitmap内部资源 */
//alloc_hin=false, 不能使用sbitmap_get()
int sbitmap_init_node(struct sbitmap *sb, unsigned int depth, int shift,
		      gfp_t flags, int node, bool round_robin, bool alloc_hint);
void sbitmap_free(struct sbitmap *sb);

/* 获取/释放 对应的bit */
int sbitmap_get(struct sbitmap *sb); //仅支持hint的模式分配
void sbitmap_put(struct sbitmap *sb, unsigned int bitnr)

bool sbitmap_any_bit_set(const struct sbitmap *sb);

实现流程

这里重点分析get主要流程,其他操作可以从该流程推导出

sbitmap_get()
|-update_alloc_hint_before_get() //获取上个cpu的hint
|-__sbitmap_get() //获取到哪个bit是空闲的, 首先找到map,然后找到map中对应的bit. RR为true表示要从offset=hint开始查询
|-update_alloc_hint_after_get() //更新该cpu本次访问的hint

代码流程

int sbitmap_get(struct sbitmap *sb)
{
	...
	depth = READ_ONCE(sb->depth);
	hint = update_alloc_hint_before_get(sb, depth); //读取sb->alloc_hint
	nr = __sbitmap_get(sb, hint);
	update_alloc_hint_after_get(sb, depth, hint, nr); //将nr+1写入到sb->alloc_hint,下次获取就可以快速的找到
	...
}

/*
 * 分配的核心逻辑:首先找到对应的map,然后从map中找到合适的bit
 * round_robin会用到hint? 非RR从0开始
 */
static int __sbitmap_get(struct sbitmap *sb, unsigned int alloc_hint)
{
	unsigned int index;

	//找到对应的map: alloc_hint>>sb->shift
	index = SB_NR_TO_INDEX(sb, alloc_hint);
	if (sb->round_robin)
		alloc_hint = SB_NR_TO_BIT(sb, alloc_hint); //找到map对应的bit
	else
		alloc_hint = 0; //从bit0开始分配

	return sbitmap_find_bit(sb, UINT_MAX, index, alloc_hint, !sb->round_robin);
}


static int sbitmap_find_bit(struct sbitmap *sb, unsigned int depth, unsigned int index, unsigned int alloc_hint, bool wrap)
{
	unsigned int i;
	int nr = -1;

	/* 最外层for仅控制次数,i并不控制从哪个map开始分配, 由index指定.
	 * hint仅适用于最开始预期map中查找,当切换到下一个map,默认从0开始,这也满足rr策略.
	 * wrap由底层逻辑处理
	 */
	for (i = 0; i < sb->map_nr; i++) {
		nr = sbitmap_find_bit_in_word(&sb->map[index], min_t(unsigned int, __map_depth(sb, index), depth), alloc_hint, wrap);

		if (nr != -1) {
			nr += index << sb->shift; //转换为用户视角需要的nr(用户只关心整体bitmap顺序)
			break;
		}

		//++index表示跳转到下一个map
		alloc_hint = 0; //在预期外的map分配bit时,采用默认为0的地方开始搜寻
		if (++index >= sb->map_nr)
			index = 0;
	}

	return nr;
}

static int sbitmap_find_bit_in_word(struct sbitmap_word *map,
				    unsigned int depth,
				    unsigned int alloc_hint,
				    bool wrap)
{
	int nr;

	do {
		nr = __sbitmap_get_word(&map->word, depth, alloc_hint, wrap);
		if (nr != -1)
			break;
		if (!sbitmap_deferred_clear(map)) //若没要找到,需要将map->cleared已经释放的归还到map->word
			break;
	} while (1);

	return nr;
}


static int __sbitmap_get_word(unsigned long *word, unsigned long depth, unsigned int hint, bool wrap)
{
	int nr;

	/* don't wrap if starting from 0 */
	//为了保证完整查询了bitmap, wrap仅在在hint非零时才有意义,因为要回滚。从0开始查询没有必要回滚
	//wrap在查询中核心就是如果找到了从hint开始没有找到,就从投开始在查询一遍
	wrap = wrap && hint;

	while (1) {
		nr = find_next_zero_bit(word, depth, hint);
		if (unlikely(nr >= depth)) { //从offset开始查询,没有找到
			/*
			 * We started with an offset, and we didn't reset the
			 * offset to 0 in a failure case, so start from 0 to
			 * exhaust the map.
			 */
			if (hint && wrap) { //从offset=0重新查询一遍
				hint = 0;
				continue;
			}
			return -1;
		}

		if (!test_and_set_bit_lock(nr, word))
			break;

		//set失败了,该bit被其他人抢占,此时继续从下一个bit查询
		hint = nr + 1;
		if (hint >= depth - 1)
			hint = 0;
	}

	return nr;
}

案例

内核中直接使用sbitmap的模块并不多,由 blk的hctx 和 scsi模块

scsi sbitmap

scsi的利用方式比较简单,由于alloc_hin=true, 可以使用sbitmap_get()

struct scsi_device {
	...
	struct sbitmap budget_map;
	...
}

int scsi_realloc_sdev_budget_map(struct scsi_device *sdev, unsigned int depth)
{
	...
	//由于
	ret = sbitmap_init_node(&sdev->budget_map, scsi_device_max_queue_depth(sdev),
				new_shift, GFP_KERNEL, sdev->request_queue->node, false, true);
	...
}

int scsi_dev_queue_ready(struct request_queue *q, struct scsi_device *sdev)
{
	...
	token = sbitmap_get(&sdev->budget_map);
	...
}

hctx sbitmap

htctx的alloc_hin=false, 不能使用sbitmap_get(),所以从其他角度来使用

struct blk_mq_hw_ctx *blk_mq_alloc_hctx(struct request_queue *q, struct blk_mq_tag_set *set, int node)
{
	...
	sbitmap_init_node(&hctx->ctx_map, nr_cpu_ids, ilog2(8), gfp, node, false, false);
	...
}


static void blk_mq_hctx_mark_pending(struct blk_mq_hw_ctx *hctx,
				     struct blk_mq_ctx *ctx)
{
	const int bit = ctx->index_hw[hctx->type];

	if (!sbitmap_test_bit(&hctx->ctx_map, bit))
		sbitmap_set_bit(&hctx->ctx_map, bit);
}
0条评论
0 / 1000
c****k
3文章数
1粉丝数
c****k
3 文章 | 1 粉丝
c****k
3文章数
1粉丝数
c****k
3 文章 | 1 粉丝
原创

Linux - Sbitmap

2025-07-31 03:00:11
0
0

Introduction

概述

传统bitmap是一个整体全局变量,这就会带来如下问题:

  • 并发访问需要加锁保护,竞争非常激烈时锁的开销不可忽略
  • 由于是共享变量,每一次write操作内部都会有大量MESI消息,且会造成cache颠簸

sbitmap(scalable bitmap), 重点在于scalable, 那么如何可扩展呢?核心思想是:将整个bitmap进行拆分,将全局竞争变为局部竞争。这是一种通用的思路,例如 Tree RCU 对 Classic RCU 的取代。

bitmap和sbitmap各有优劣。sbitmapp消耗了更多内存资源,且获取一个bit使用的指令也更多,适用于多核竞争激烈的场景;故在竞争可控的场景下,建议还是选择bitmap。


‍‍

Sbitmap实现

概述

本章重点介绍sbitmap的实现,对应源码:

linux-6.6/include/linux/bitmap.h
linux-6.6/lib/bitmap.c

数据结构

/*
 * sbitmap: scalable bitmap, 本质是bitmap,如何scalab呢?
 * 传统bitmap是整体,并发访问时需要一把全局锁。当竞争变激烈时,会造成性能严重下滑。
 * sbitmap将bitmap切割为小块map,这样可以大大减少竞争,特别是在per-cpu的竞争中。同时考虑cacheline(如何理解?)
 * depth表示总的bit数量; 1<<shift表示每一块bit的数量; map_nr表示小块map的数量
 */
struct sbitmap {
	unsigned int depth; //总bit的数量
	unsigned int shift; //每个map管理的bit数量, 不能大于一个word(unsigned long)
	unsigned int map_nr; //map的数量 = depth / (1 << shift)
	struct sbitmap_word *map; //bitmap区域

	/* round_robin只是影响单个map查询是从offset=0开始(false),还是offset=hint开始(true).
	 * 在实际代码中会影响hint, warp两个变量,但本质是一件事: 若从offset=hint开始查询,
	 * 没找到需要wrap. 故代码里面感觉两者可以合并
	 */
	bool round_robin;

	/* per_cpu的变量,用于记录上一次该cpu获取和释放到bit的信息nrbit, 加快bit的获取
	 * get:下一次的起点是nrbit+1;  put:下一次的起点是上次释放的位置
	 * 从get接口来看,若round_robin=false, alloc_hint好像也没有任何作用?
	 */
	unsigned int __percpu *alloc_hint;
};

/*
 * 当free时,归还者仅标记对应cleard即可,不会操作word. 仅当word已经满时会将cleard记录的归还.
 * 这样保证了word仅被get动作改变, 错开的get和free操作, 金少竞争。
 * 为什么 word 不标记为cacheline对齐?
 */
struct sbitmap_word {
	unsigned long word;
	unsigned long cleared ____cacheline_aligned_in_smp;
} ____cacheline_aligned_in_smp;

API

/* 分配+初始化一个sbmitmap内部资源 */
//alloc_hin=false, 不能使用sbitmap_get()
int sbitmap_init_node(struct sbitmap *sb, unsigned int depth, int shift,
		      gfp_t flags, int node, bool round_robin, bool alloc_hint);
void sbitmap_free(struct sbitmap *sb);

/* 获取/释放 对应的bit */
int sbitmap_get(struct sbitmap *sb); //仅支持hint的模式分配
void sbitmap_put(struct sbitmap *sb, unsigned int bitnr)

bool sbitmap_any_bit_set(const struct sbitmap *sb);

实现流程

这里重点分析get主要流程,其他操作可以从该流程推导出

sbitmap_get()
|-update_alloc_hint_before_get() //获取上个cpu的hint
|-__sbitmap_get() //获取到哪个bit是空闲的, 首先找到map,然后找到map中对应的bit. RR为true表示要从offset=hint开始查询
|-update_alloc_hint_after_get() //更新该cpu本次访问的hint

代码流程

int sbitmap_get(struct sbitmap *sb)
{
	...
	depth = READ_ONCE(sb->depth);
	hint = update_alloc_hint_before_get(sb, depth); //读取sb->alloc_hint
	nr = __sbitmap_get(sb, hint);
	update_alloc_hint_after_get(sb, depth, hint, nr); //将nr+1写入到sb->alloc_hint,下次获取就可以快速的找到
	...
}

/*
 * 分配的核心逻辑:首先找到对应的map,然后从map中找到合适的bit
 * round_robin会用到hint? 非RR从0开始
 */
static int __sbitmap_get(struct sbitmap *sb, unsigned int alloc_hint)
{
	unsigned int index;

	//找到对应的map: alloc_hint>>sb->shift
	index = SB_NR_TO_INDEX(sb, alloc_hint);
	if (sb->round_robin)
		alloc_hint = SB_NR_TO_BIT(sb, alloc_hint); //找到map对应的bit
	else
		alloc_hint = 0; //从bit0开始分配

	return sbitmap_find_bit(sb, UINT_MAX, index, alloc_hint, !sb->round_robin);
}


static int sbitmap_find_bit(struct sbitmap *sb, unsigned int depth, unsigned int index, unsigned int alloc_hint, bool wrap)
{
	unsigned int i;
	int nr = -1;

	/* 最外层for仅控制次数,i并不控制从哪个map开始分配, 由index指定.
	 * hint仅适用于最开始预期map中查找,当切换到下一个map,默认从0开始,这也满足rr策略.
	 * wrap由底层逻辑处理
	 */
	for (i = 0; i < sb->map_nr; i++) {
		nr = sbitmap_find_bit_in_word(&sb->map[index], min_t(unsigned int, __map_depth(sb, index), depth), alloc_hint, wrap);

		if (nr != -1) {
			nr += index << sb->shift; //转换为用户视角需要的nr(用户只关心整体bitmap顺序)
			break;
		}

		//++index表示跳转到下一个map
		alloc_hint = 0; //在预期外的map分配bit时,采用默认为0的地方开始搜寻
		if (++index >= sb->map_nr)
			index = 0;
	}

	return nr;
}

static int sbitmap_find_bit_in_word(struct sbitmap_word *map,
				    unsigned int depth,
				    unsigned int alloc_hint,
				    bool wrap)
{
	int nr;

	do {
		nr = __sbitmap_get_word(&map->word, depth, alloc_hint, wrap);
		if (nr != -1)
			break;
		if (!sbitmap_deferred_clear(map)) //若没要找到,需要将map->cleared已经释放的归还到map->word
			break;
	} while (1);

	return nr;
}


static int __sbitmap_get_word(unsigned long *word, unsigned long depth, unsigned int hint, bool wrap)
{
	int nr;

	/* don't wrap if starting from 0 */
	//为了保证完整查询了bitmap, wrap仅在在hint非零时才有意义,因为要回滚。从0开始查询没有必要回滚
	//wrap在查询中核心就是如果找到了从hint开始没有找到,就从投开始在查询一遍
	wrap = wrap && hint;

	while (1) {
		nr = find_next_zero_bit(word, depth, hint);
		if (unlikely(nr >= depth)) { //从offset开始查询,没有找到
			/*
			 * We started with an offset, and we didn't reset the
			 * offset to 0 in a failure case, so start from 0 to
			 * exhaust the map.
			 */
			if (hint && wrap) { //从offset=0重新查询一遍
				hint = 0;
				continue;
			}
			return -1;
		}

		if (!test_and_set_bit_lock(nr, word))
			break;

		//set失败了,该bit被其他人抢占,此时继续从下一个bit查询
		hint = nr + 1;
		if (hint >= depth - 1)
			hint = 0;
	}

	return nr;
}

案例

内核中直接使用sbitmap的模块并不多,由 blk的hctx 和 scsi模块

scsi sbitmap

scsi的利用方式比较简单,由于alloc_hin=true, 可以使用sbitmap_get()

struct scsi_device {
	...
	struct sbitmap budget_map;
	...
}

int scsi_realloc_sdev_budget_map(struct scsi_device *sdev, unsigned int depth)
{
	...
	//由于
	ret = sbitmap_init_node(&sdev->budget_map, scsi_device_max_queue_depth(sdev),
				new_shift, GFP_KERNEL, sdev->request_queue->node, false, true);
	...
}

int scsi_dev_queue_ready(struct request_queue *q, struct scsi_device *sdev)
{
	...
	token = sbitmap_get(&sdev->budget_map);
	...
}

hctx sbitmap

htctx的alloc_hin=false, 不能使用sbitmap_get(),所以从其他角度来使用

struct blk_mq_hw_ctx *blk_mq_alloc_hctx(struct request_queue *q, struct blk_mq_tag_set *set, int node)
{
	...
	sbitmap_init_node(&hctx->ctx_map, nr_cpu_ids, ilog2(8), gfp, node, false, false);
	...
}


static void blk_mq_hctx_mark_pending(struct blk_mq_hw_ctx *hctx,
				     struct blk_mq_ctx *ctx)
{
	const int bit = ctx->index_hw[hctx->type];

	if (!sbitmap_test_bit(&hctx->ctx_map, bit))
		sbitmap_set_bit(&hctx->ctx_map, bit);
}
文章来自个人专栏
文章 | 订阅
0条评论
0 / 1000
请输入你的评论
0
0