背景
在数据统计中,难免有大量数据需要进行网络API获取和IO写入操作,如果网络API请求返回数据较慢,如1秒才返回,如果数据量大了,比如成千上万的数据,全部进行串行执行那就需要将近半小时甚至几小时的时间,效率太低。
示例代码
import openpyxl
def get_state_diagram_devoloper_name_by_id(id, workItemTypeKey, product):
devoloper_name = []
if id:
formatted_url = urls[product]["STATE_DIAGRAM"].format(id, workItemTypeKey)
# 发送API请求
response = requests.get(formatted_url, headers=headers)
data = json.loads(response.text)
if "msgId" in data["code"] and data["code"]["msgId"] == "SUCCESS":
# 执行必要处理...
return devoloper_name
def write_workload_csv(data_requirement, titles_list, output_path):
# 创建一个新的Excel工作簿
wb = openpyxl.Workbook()
data_sheet_requirement = wb.create_sheet('详细需求数据')
# 设置表头
data_sheet_requirement.append(["类型", "编号", "标题", "提测人", "创建人", "提测日期", "状态", "小组名称"])
for item in data_requirement:
values = []
devoloper_name = api_requests.get_state_diagram_devoloper_name_by_id(get_value_by_title_name(item, "System_Id"),
get_value_by_title_name(item, "System_WorkItemType_key"),
get_value_by_title_name(item, "workspaceKey"))
values.append(devoloper_name)
# 其他数据写入操作...
for title in titles_list:
values.append(get_value_by_title_name(item, title))
# 在excel表写入数据
data_sheet_requirement.append(values)
# 保存工作簿
wb.save(output_path)
以上代码在get_state_diagram_devoloper_name_by_id函数中要进行http请求,如请求需要1秒后才有返回结果,如果有1000个id串行调用函数需要就需要15分钟,这样效率十分低下,所以有必要考虑进入并发编程。
Python中的并发编程
在Python中,处理大量并发任务时,多线程(threading)和异步编程(asyncio)是两种常用的方法。然而,直接使用thread模块进行多线程编程可能会遇到一些问题,如全局解释器锁(GIL)的限制和线程管理的复杂性。为了更高效地利用多线程进行并发处理,Python标准库提供了concurrent.futures模块,其中的ThreadPoolExecutor类是一个强大的工具,用于简化多线程编程。
ThreadPoolExecutor简介
ThreadPoolExecutor是concurrent.futures模块中的一个类,它提供了一个高层次的接口来异步执行可调用对象。这个类会管理一个线程池,并允许你提交任务到线程池中进行异步执行。通过submit()方法,你可以提交一个可调用对象(如函数)和任何数量的参数,然后submit()会返回一个Future对象,你可以使用它来检查任务的状态或获取任务的返回值。
ThreadPoolExecutor的优势
- 简化线程管理:ThreadPoolExecutor提供了map和submit方法,自动管理线程池的大小和生命周期,不需要手动创建、启动或销毁线程,减少了重复创建线程的开销。
- 避免GIL的限制:虽然GIL仍然限制了Python线程对CPU密集型任务的并行执行,但对于I/O密集型任务(如网络请求、文件读写等),多线程仍然可以显著提高程序的吞吐量。
- 异步并发执行:通过使用ThreadPoolExecutor,提交任务后,线程池会处理这些任务的并发执行,而不需要等待一个任务完成后再开始下一个任务。
threading.Thread模块简介
threading.Thread是Python中更传统的多线程实现。它允许程序员直接控制线程的创建和管理。每个Thread对象代表一个执行线程。
threading.Thread的优势
细粒度控制:线程的创建、启动和同步可以由程序员直接控制。
- 资源共享:多个线程可以共享进程资源,如内存和文件句柄。
- 同步机制:提供了各种同步机制,如锁、事件、条件变量和信号量。
ThreadPoolExecutor与thread模块的区别
thread模块是Python中用于多线程编程的低级接口。相比之下,ThreadPoolExecutor提供了更高级、更易于使用的接口。以下是它们之间的一些主要区别:
线程管理:thread模块要求开发人员手动管理线程的生命周期,而ThreadPoolExecutor则自动管理线程池。
GIL的影响:两者都受到GIL的限制,但ThreadPoolExecutor通过异步执行I/O密集型任务来更好地利用GIL。
错误处理:ThreadPoolExecutor提供了更强大的错误处理能力,通过Future.exception()方法检查任务是否引发了异常。
并发控制:ThreadPoolExecutor允许限制线程池的大小,从而控制并发任务的数量。
性能:在执行大量短生命周期的任务时,ThreadPoolExecutor可以提供更好的性能,因为它重用了线程,减少了线程创建和销毁的开销。
同步:threading.Thread提供了更多的同步原语,对于需要细粒度控制的并发任务,它可能是更好的选择。
代码优化
在这里,因为不需要对并发进行细粒度控制,所以选择使用更简单高效的ThreadPoolExecutor进行多线程处理。
import openpyxl
import concurrent.futures
def get_state_diagram_devoloper_name_by_id(id, workItemTypeKey, product):
devoloper_name = []
if id:
formatted_url = urls[product]["STATE_DIAGRAM"].format(id, workItemTypeKey)
# 发送API请求
response = requests.get(formatted_url, headers=headers)
data = json.loads(response.text)
if "msgId" in data["code"] and data["code"]["msgId"] == "SUCCESS":
# 执行必要处理...
return devoloper_name
def process_requirement(item, titles_list):
values = []
devoloper_name= api_requests.get_state_diagram_devoloper_name_by_id(get_value_by_title_name(item, "System_Id"),
get_value_by_title_name(item, "System_WorkItemType_key"),
get_value_by_title_name(item, "workspaceKey"))
values.append(devoloper_name)
# 写入其他统计字段数据
for title in titles_list:
values.append(get_value_by_title_name(item, title))
return values
def write_workload_csv2(data_requirement, output_path):
wb = openpyxl.Workbook()
data_sheet_requirement = wb.create_sheet('详细需求数据')
if data_requirement:
# 设置表头
data_sheet_requirement.append(["类型", "编号", "标题", "提测人", "创建人", "提测日期", "状态", "小组名称"])
# 开始多线程请求接口 data_requirement:
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(process_requirement, item) for item in data_requirement]
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result:
data_sheet_requirement.append(result)
# 保存工作簿
wb.save(output_path)
代码加入多线程优化后,通过submit函数提交并发处理请求,在这里就是ThreadPoolExecutor被用于并发处理一组需求数据。每个需求数据被提交为一个任务到线程池中,然后异步执行。当所有任务都完成时,将结果写入Excel文件。
如请求http接口需要1秒后才有返回结果,有1000个id并行调用函数需要1分钟,比原来15分钟的效率提升15倍。
结论
在Python中,concurrent.futures.ThreadPoolExecutor()是一个强大的工具,它简化了并发编程,特别是在处理IO密集型任务时。它的易用性和性能优势使其成为多线程编程的首选。然而,对于需要更细粒度控制的场景,threading.Thread仍然是一个有用的选择。
在实际应用中,选择哪种方法取决于具体的需求和场景。理解这两种方法的优势和局限,可以帮助开发者更有效地利用Python进行并发编程。