import os
import subprocess
from collections import Counter
def parse_cpulist(cpulist_str):
    """Expand a kernel-style CPU list string into a list of CPU ids.

    The input is a comma-separated sequence of single ids ("8") and
    inclusive ranges ("0-3"), e.g. "0-3,8,10-11".

    Args:
        cpulist_str: The CPU list text. Surrounding whitespace and empty
            entries are ignored, so an empty string yields an empty list.

    Returns:
        A list of ints in the order they appear in the input.

    Raises:
        ValueError: If an entry is neither an integer nor a "start-end" range.
    """
    cpus = []
    for part in cpulist_str.split(','):
        part = part.strip()
        if not part:
            # An empty cpulist (or a stray comma) would otherwise reach
            # int('') and raise; treat it as "no CPUs".
            continue
        if '-' in part:
            start, end = map(int, part.split('-'))
            cpus.extend(range(start, end + 1))
        else:
            cpus.append(int(part))
    return cpus
def get_nic_status(interface):
    """Return the operational state of a network interface.

    Reads ``/sys/class/net/<interface>/operstate`` directly instead of
    spawning ``cat``, so it does not depend on an external binary.

    Args:
        interface: Interface name as listed under ``/sys/class/net``.

    Returns:
        The stripped contents of the operstate file, or ``'unknown'`` when
        the file cannot be read (e.g. the interface does not exist).
    """
    try:
        with open(f'/sys/class/net/{interface}/operstate') as state_file:
            return state_file.read().strip()
    except OSError:
        return 'unknown'
def get_nic_numa_info(interface):
    """Look up the NUMA node of a network interface and that node's CPUs.

    Args:
        interface: Interface name as listed under ``/sys/class/net``.

    Returns:
        A dict with keys ``'interface'`` (the given name), ``'numa_node'``
        (int, ``-1`` when it cannot be determined) and ``'cpus'`` (list of
        CPU ids on that node, empty when unavailable).
    """
    # NUMA node the device is attached to; -1 means "could not determine".
    try:
        with open(f'/sys/class/net/{interface}/device/numa_node') as node_file:
            numa_node = int(node_file.read().strip())
    except (OSError, ValueError):
        numa_node = -1

    # CPU list of that node. There is no node directory for a negative id,
    # so skip the lookup entirely in that case.
    cpus = []
    if numa_node >= 0:
        try:
            with open(f'/sys/devices/system/node/node{numa_node}/cpulist') as cpulist_file:
                cpus = parse_cpulist(cpulist_file.read().strip())
        except (OSError, ValueError):
            # Unreadable or unparsable cpulist: report no CPUs rather than crash.
            cpus = []

    return {
        'interface': interface,
        'numa_node': numa_node,
        'cpus': cpus
    }
def get_all_nics():
    """Return the names of all entries under ``/sys/class/net``.

    Lists the directory directly instead of parsing ``ls`` output, which
    avoids returning ``['']`` for empty output and removes the dependency
    on an external binary.

    Returns:
        A sorted list of entry names, or an empty list when the directory
        cannot be read.
    """
    try:
        # Sorted to keep the stable, alphabetical order callers got from `ls`.
        return sorted(os.listdir('/sys/class/net'))
    except OSError:
        return []
def merge_cpus(nic_info_list):
    """Merge the CPU lists of several NIC info dicts into one list.

    Args:
        nic_info_list: Iterable of dicts that each have a ``'cpus'`` entry
            holding an iterable of CPU ids.

    Returns:
        A list of the distinct CPU ids in ascending order. Sorting makes the
        result deterministic; iterating a set directly gives no guaranteed
        order.
    """
    unique_cpus = set()
    for info in nic_info_list:
        unique_cpus.update(info['cpus'])
    return sorted(unique_cpus)
def allocate_processes_to_cpus(total_processes, total_cpus, included_cpus, ratio):
    """Assign process indices to CPU cores, split between two CPU pools.

    Processes are divided between the "included" CPUs and all remaining
    CPUs in ``range(total_cpus)`` in the proportion 1 : ``ratio``, and are
    then spread round-robin over the CPUs of their pool. The first
    ``total_processes // (ratio + 1)`` process indices go to the included
    pool, the rest to the other pool. If one pool is empty, every process
    is assigned to the other pool.

    A per-CPU process count and the resulting mapping are printed as a
    side effect.

    Args:
        total_processes: Number of processes to place; indices run from 0.
        total_cpus: Number of CPUs in the system; ids are ``0..total_cpus-1``.
        included_cpus: CPU ids forming the included pool.
        ratio: Share of the other pool relative to one share for the
            included pool. Must be non-negative.

    Returns:
        A dict mapping each process index to the CPU id it should run on.

    Raises:
        ValueError: If ``ratio`` is negative, or if there are processes to
            place but no CPU in either pool.
    """
    if ratio < 0:
        raise ValueError(f"ratio must be non-negative, got {ratio}")

    included_pool = list(included_cpus)
    # Set lookup keeps the exclusion filter linear in total_cpus.
    included_lookup = set(included_pool)
    excluded_pool = [cpu for cpu in range(total_cpus) if cpu not in included_lookup]

    if total_processes > 0 and not included_pool and not excluded_pool:
        raise ValueError("no CPUs available to allocate processes to")

    if not included_pool:
        # Nothing to prefer: everything goes to the remaining CPUs.
        included_processes = 0
    elif not excluded_pool:
        # Every CPU is in the included pool.
        included_processes = total_processes
    else:
        # 1 : ratio split means the included pool gets 1 / (ratio + 1).
        included_processes = total_processes // (ratio + 1)

    process_cpu_dict = {}
    for i in range(included_processes):
        process_cpu_dict[i] = included_pool[i % len(included_pool)]
    # Round-robin over the other pool, starting from its first CPU.
    for offset, i in enumerate(range(included_processes, total_processes)):
        process_cpu_dict[i] = excluded_pool[offset % len(excluded_pool)]

    # Report how many processes landed on each CPU.
    value_counts = Counter(process_cpu_dict.values())
    sorted_value_counts = sorted(value_counts.items())
    print(sorted_value_counts)  # list of (cpu, count) tuples ordered by cpu
    print(dict(sorted_value_counts))  # the same data as a dict
    print("开始\n\n",value_counts)
    print("\n\n")
    print(process_cpu_dict)
    print("\n\n")
    return process_cpu_dict
if __name__ == "__main__":
    # Gather NUMA details for every interface that reports itself as up.
    nic_info_list = []
    nics = get_all_nics()
    for nic in nics:
        status = get_nic_status(nic)
        if status != 'up':
            print(f"{nic} is {status}, skipping...")
            continue
        nic_info = get_nic_numa_info(nic)
        nic_info_list.append(nic_info)
        print(nic_info)

    # Summary of all interfaces that were kept.
    print("All UP NICs and their NUMA information:")
    for info in nic_info_list:
        print(info)

    # Union of the CPU lists of all kept interfaces.
    all_cpus = merge_cpus(nic_info_list)
    print("Merged CPUs:", all_cpus)

    # Allocation settings.
    total_processes = 600
    total_cpus = 128  # CPU ids 0-127
    included_cpus = all_cpus
    # ratio = n is meant to express "NIC-local CPUs : other CPUs = 1 : n".
    ratio = 3

    process_cpu_dict = allocate_processes_to_cpus(
        total_processes,
        total_cpus,
        included_cpus,
        ratio,
    )
    print(f"process_cpu_dict: {process_cpu_dict}")
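# The process_cpu_dict built above can be used to set the CPU affinity of
# worker processes: each key is a process index and each value is the CPU
# core that process should be bound to.
# Example: multiprocessing.Process(target=main_test, args=(process_cpu_dict[i],))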
def main_test(core_id):
    """Pin the calling process to a single CPU core.

    Uses ``psutil`` when it is installed; otherwise falls back to the
    standard-library ``os.sched_setaffinity``, which is only available on
    platforms that support it (e.g. Linux).

    Args:
        core_id: Id of the CPU core the current process should run on.
    """
    try:
        # Imported lazily so the module still loads without psutil.
        import psutil
    except ImportError:
        # pid 0 addresses the calling process.
        os.sched_setaffinity(0, {core_id})
    else:
        psutil.Process(os.getpid()).cpu_affinity([core_id])