searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

如何让数据统计效率提升

2024-05-13 03:13:06
28
0

背景

在数据统计中,难免有大量数据需要进行网络API获取和IO写入操作,如果网络API请求返回数据较慢,如1秒才返回,如果数据量大了,比如成千上万的数据,全部进行串行执行那就需要将近半小时甚至几小时的时间,效率太低。

示例代码

import openpyxl

def get_state_diagram_devoloper_name_by_id(id, workItemTypeKey, product):
    devoloper_name = []
    if id:
        formatted_url = urls[product]["STATE_DIAGRAM"].format(id, workItemTypeKey)
        # 发送API请求
        response = requests.get(formatted_url, headers=headers)
        data = json.loads(response.text)
        if "msgId" in data["code"] and data["code"]["msgId"] == "SUCCESS":
            # 执行必要处理...
            return devoloper_name

def write_workload_csv(data_requirement, titles_list, output_path):
    # 创建一个新的Excel工作簿
    wb = openpyxl.Workbook()
	data_sheet_requirement = wb.create_sheet('详细需求数据')
	# 设置表头
	data_sheet_requirement.append(["类型", "编号", "标题", "提测人", "创建人", "提测日期", "状态", "小组名称"])
    for item in data_requirement:
        values = []
        devoloper_name = api_requests.get_state_diagram_devoloper_name_by_id(get_value_by_title_name(item, "System_Id"), 
                                                                            get_value_by_title_name(item, "System_WorkItemType_key"), 
                                                                            get_value_by_title_name(item, "workspaceKey"))
        values.append(devoloper_name)
        # 其他数据写入操作...
        for title in titles_list:
            values.append(get_value_by_title_name(item, title))
    
        # 在excel表写入数据
        data_sheet_requirement.append(values)
        
    # 保存工作簿
    wb.save(output_path)

以上代码在get_state_diagram_devoloper_name_by_id函数中要进行http请求,如请求需要1秒后才有返回结果,如果有1000个id串行调用函数需要就需要15分钟,这样效率十分低下,所以有必要考虑进入并发编程。

 

Python中的并发编程

在Python中,处理大量并发任务时,多线程(threading)和异步编程(asyncio)是两种常用的方法。然而,直接使用thread模块进行多线程编程可能会遇到一些问题,如全局解释器锁(GIL)的限制和线程管理的复杂性。为了更高效地利用多线程进行并发处理,Python标准库提供了concurrent.futures模块,其中的ThreadPoolExecutor类是一个强大的工具,用于简化多线程编程。

ThreadPoolExecutor简介

ThreadPoolExecutor是concurrent.futures模块中的一个类,它提供了一个高层次的接口来异步执行可调用对象。这个类会管理一个线程池,并允许你提交任务到线程池中进行异步执行。通过submit()方法,你可以提交一个可调用对象(如函数)和任何数量的参数,然后submit()会返回一个Future对象,你可以使用它来检查任务的状态或获取任务的返回值。

ThreadPoolExecutor的优势

  1. 简化线程管理:ThreadPoolExecutor提供了map和submit方法,自动管理线程池的大小和生命周期,不需要手动创建、启动或销毁线程,减少了重复创建线程的开销。
  2. 避免GIL的限制:虽然GIL仍然限制了Python线程对CPU密集型任务的并行执行,但对于I/O密集型任务(如网络请求、文件读写等),多线程仍然可以显著提高程序的吞吐量。
  3. 异步并发执行:通过使用ThreadPoolExecutor,提交任务后,线程池会处理这些任务的并发执行,而不需要等待一个任务完成后再开始下一个任务。

 

threading.Thread模块简介

threading.Thread是Python中更传统的多线程实现。它允许程序员直接控制线程的创建和管理。每个Thread对象代表一个执行线程。

 

threading.Thread优势

细粒度控制:线程的创建、启动和同步可以由程序员直接控制。

  1. 资源共享:多个线程可以共享进程资源,如内存和文件句柄。
  2. 同步机制:提供了各种同步机制,如锁、事件、条件变量和信号量。

 

ThreadPoolExecutor与thread模块的区别

thread模块是Python中用于多线程编程的低级接口。相比之下,ThreadPoolExecutor提供了更高级、更易于使用的接口。以下是它们之间的一些主要区别:

线程管理:thread模块要求开发人员手动管理线程的生命周期,而ThreadPoolExecutor则自动管理线程池。

GIL的影响:两者都受到GIL的限制,但ThreadPoolExecutor通过异步执行I/O密集型任务来更好地利用GIL。

错误处理:ThreadPoolExecutor提供了更强大的错误处理能力,通过Future.exception()方法检查任务是否引发了异常。

并发控制:ThreadPoolExecutor允许限制线程池的大小,从而控制并发任务的数量。

性能:在执行大量短生命周期的任务时,ThreadPoolExecutor可以提供更好的性能,因为它重用了线程,减少了线程创建和销毁的开销。

同步:threading.Thread提供了更多的同步原语,对于需要细粒度控制的并发任务,它可能是更好的选择。

 

代码优化

在这里,因为不需要对并发进行细粒度控制,所以选择使用更简单高效的ThreadPoolExecutor进行多线程处理。

import openpyxl
import concurrent.futures

def get_state_diagram_devoloper_name_by_id(id, workItemTypeKey, product):
    devoloper_name = []
    if id:
        formatted_url = urls[product]["STATE_DIAGRAM"].format(id, workItemTypeKey)
        # 发送API请求
        response = requests.get(formatted_url, headers=headers)
        data = json.loads(response.text)
        if "msgId" in data["code"] and data["code"]["msgId"] == "SUCCESS":
            # 执行必要处理...
            return devoloper_name

def process_requirement(item, titles_list):
    values = []
    devoloper_name= api_requests.get_state_diagram_devoloper_name_by_id(get_value_by_title_name(item, "System_Id"), 
                                                                        get_value_by_title_name(item, "System_WorkItemType_key"), 
                                                                        get_value_by_title_name(item, "workspaceKey"))
    values.append(devoloper_name)
    # 写入其他统计字段数据
    for title in titles_list:
        values.append(get_value_by_title_name(item, title))
    return values

def write_workload_csv2(data_requirement, output_path):
    wb = openpyxl.Workbook()
    data_sheet_requirement = wb.create_sheet('详细需求数据')
    if data_requirement:
        # 设置表头
        data_sheet_requirement.append(["类型", "编号", "标题", "提测人", "创建人", "提测日期", "状态", "小组名称"])
        # 开始多线程请求接口  data_requirement:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [executor.submit(process_requirement, item) for item in data_requirement]
            for future in concurrent.futures.as_completed(futures):
                result = future.result()
                if result:
                    data_sheet_requirement.append(result)

    # 保存工作簿
    wb.save(output_path)

代码加入多线程优化后,通过submit函数提交并发处理请求,在这里就是ThreadPoolExecutor被用于并发处理一组需求数据。每个需求数据被提交为一个任务到线程池中,然后异步执行。当所有任务都完成时,将结果写入Excel文件。

如请求http接口需要1秒后才有返回结果,有1000个id并行调用函数需要1分钟,比原来15分钟的效率提升15倍。

 

结论

在Python中,concurrent.futures.ThreadPoolExecutor()是一个强大的工具,它简化了并发编程,特别是在处理IO密集型任务时。它的易用性和性能优势使其成为多线程编程的首选。然而,对于需要更细粒度控制的场景,threading.Thread仍然是一个有用的选择。

在实际应用中,选择哪种方法取决于具体的需求和场景。理解这两种方法的优势和局限,可以帮助开发者更有效地利用Python进行并发编程。

0条评论
作者已关闭评论
丽如
1文章数
0粉丝数
丽如
1 文章 | 0 粉丝
丽如
1文章数
0粉丝数
丽如
1 文章 | 0 粉丝
原创

如何让数据统计效率提升

2024-05-13 03:13:06
28
0

背景

在数据统计中,难免有大量数据需要进行网络API获取和IO写入操作,如果网络API请求返回数据较慢,如1秒才返回,如果数据量大了,比如成千上万的数据,全部进行串行执行那就需要将近半小时甚至几小时的时间,效率太低。

示例代码

import openpyxl

def get_state_diagram_devoloper_name_by_id(id, workItemTypeKey, product):
    devoloper_name = []
    if id:
        formatted_url = urls[product]["STATE_DIAGRAM"].format(id, workItemTypeKey)
        # 发送API请求
        response = requests.get(formatted_url, headers=headers)
        data = json.loads(response.text)
        if "msgId" in data["code"] and data["code"]["msgId"] == "SUCCESS":
            # 执行必要处理...
            return devoloper_name

def write_workload_csv(data_requirement, titles_list, output_path):
    # 创建一个新的Excel工作簿
    wb = openpyxl.Workbook()
	data_sheet_requirement = wb.create_sheet('详细需求数据')
	# 设置表头
	data_sheet_requirement.append(["类型", "编号", "标题", "提测人", "创建人", "提测日期", "状态", "小组名称"])
    for item in data_requirement:
        values = []
        devoloper_name = api_requests.get_state_diagram_devoloper_name_by_id(get_value_by_title_name(item, "System_Id"), 
                                                                            get_value_by_title_name(item, "System_WorkItemType_key"), 
                                                                            get_value_by_title_name(item, "workspaceKey"))
        values.append(devoloper_name)
        # 其他数据写入操作...
        for title in titles_list:
            values.append(get_value_by_title_name(item, title))
    
        # 在excel表写入数据
        data_sheet_requirement.append(values)
        
    # 保存工作簿
    wb.save(output_path)

以上代码在get_state_diagram_devoloper_name_by_id函数中要进行http请求,如请求需要1秒后才有返回结果,如果有1000个id串行调用函数需要就需要15分钟,这样效率十分低下,所以有必要考虑进入并发编程。

 

Python中的并发编程

在Python中,处理大量并发任务时,多线程(threading)和异步编程(asyncio)是两种常用的方法。然而,直接使用thread模块进行多线程编程可能会遇到一些问题,如全局解释器锁(GIL)的限制和线程管理的复杂性。为了更高效地利用多线程进行并发处理,Python标准库提供了concurrent.futures模块,其中的ThreadPoolExecutor类是一个强大的工具,用于简化多线程编程。

ThreadPoolExecutor简介

ThreadPoolExecutor是concurrent.futures模块中的一个类,它提供了一个高层次的接口来异步执行可调用对象。这个类会管理一个线程池,并允许你提交任务到线程池中进行异步执行。通过submit()方法,你可以提交一个可调用对象(如函数)和任何数量的参数,然后submit()会返回一个Future对象,你可以使用它来检查任务的状态或获取任务的返回值。

ThreadPoolExecutor的优势

  1. 简化线程管理:ThreadPoolExecutor提供了map和submit方法,自动管理线程池的大小和生命周期,不需要手动创建、启动或销毁线程,减少了重复创建线程的开销。
  2. 避免GIL的限制:虽然GIL仍然限制了Python线程对CPU密集型任务的并行执行,但对于I/O密集型任务(如网络请求、文件读写等),多线程仍然可以显著提高程序的吞吐量。
  3. 异步并发执行:通过使用ThreadPoolExecutor,提交任务后,线程池会处理这些任务的并发执行,而不需要等待一个任务完成后再开始下一个任务。

 

threading.Thread模块简介

threading.Thread是Python中更传统的多线程实现。它允许程序员直接控制线程的创建和管理。每个Thread对象代表一个执行线程。

 

threading.Thread优势

细粒度控制:线程的创建、启动和同步可以由程序员直接控制。

  1. 资源共享:多个线程可以共享进程资源,如内存和文件句柄。
  2. 同步机制:提供了各种同步机制,如锁、事件、条件变量和信号量。

 

ThreadPoolExecutor与thread模块的区别

thread模块是Python中用于多线程编程的低级接口。相比之下,ThreadPoolExecutor提供了更高级、更易于使用的接口。以下是它们之间的一些主要区别:

线程管理:thread模块要求开发人员手动管理线程的生命周期,而ThreadPoolExecutor则自动管理线程池。

GIL的影响:两者都受到GIL的限制,但ThreadPoolExecutor通过异步执行I/O密集型任务来更好地利用GIL。

错误处理:ThreadPoolExecutor提供了更强大的错误处理能力,通过Future.exception()方法检查任务是否引发了异常。

并发控制:ThreadPoolExecutor允许限制线程池的大小,从而控制并发任务的数量。

性能:在执行大量短生命周期的任务时,ThreadPoolExecutor可以提供更好的性能,因为它重用了线程,减少了线程创建和销毁的开销。

同步:threading.Thread提供了更多的同步原语,对于需要细粒度控制的并发任务,它可能是更好的选择。

 

代码优化

在这里,因为不需要对并发进行细粒度控制,所以选择使用更简单高效的ThreadPoolExecutor进行多线程处理。

import openpyxl
import concurrent.futures

def get_state_diagram_devoloper_name_by_id(id, workItemTypeKey, product):
    devoloper_name = []
    if id:
        formatted_url = urls[product]["STATE_DIAGRAM"].format(id, workItemTypeKey)
        # 发送API请求
        response = requests.get(formatted_url, headers=headers)
        data = json.loads(response.text)
        if "msgId" in data["code"] and data["code"]["msgId"] == "SUCCESS":
            # 执行必要处理...
            return devoloper_name

def process_requirement(item, titles_list):
    values = []
    devoloper_name= api_requests.get_state_diagram_devoloper_name_by_id(get_value_by_title_name(item, "System_Id"), 
                                                                        get_value_by_title_name(item, "System_WorkItemType_key"), 
                                                                        get_value_by_title_name(item, "workspaceKey"))
    values.append(devoloper_name)
    # 写入其他统计字段数据
    for title in titles_list:
        values.append(get_value_by_title_name(item, title))
    return values

def write_workload_csv2(data_requirement, output_path):
    wb = openpyxl.Workbook()
    data_sheet_requirement = wb.create_sheet('详细需求数据')
    if data_requirement:
        # 设置表头
        data_sheet_requirement.append(["类型", "编号", "标题", "提测人", "创建人", "提测日期", "状态", "小组名称"])
        # 开始多线程请求接口  data_requirement:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [executor.submit(process_requirement, item) for item in data_requirement]
            for future in concurrent.futures.as_completed(futures):
                result = future.result()
                if result:
                    data_sheet_requirement.append(result)

    # 保存工作簿
    wb.save(output_path)

代码加入多线程优化后,通过submit函数提交并发处理请求,在这里就是ThreadPoolExecutor被用于并发处理一组需求数据。每个需求数据被提交为一个任务到线程池中,然后异步执行。当所有任务都完成时,将结果写入Excel文件。

如请求http接口需要1秒后才有返回结果,有1000个id并行调用函数需要1分钟,比原来15分钟的效率提升15倍。

 

结论

在Python中,concurrent.futures.ThreadPoolExecutor()是一个强大的工具,它简化了并发编程,特别是在处理IO密集型任务时。它的易用性和性能优势使其成为多线程编程的首选。然而,对于需要更细粒度控制的场景,threading.Thread仍然是一个有用的选择。

在实际应用中,选择哪种方法取决于具体的需求和场景。理解这两种方法的优势和局限,可以帮助开发者更有效地利用Python进行并发编程。

文章来自个人专栏
文章 | 订阅
0条评论
作者已关闭评论
作者已关闭评论
2
1