searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

python学习笔记-task取消和子协程调用

2023-09-09 10:38:42
6
0

1 利用future.loop取消事件循环

run_forever不会停止,那么loop.run_until_complete是怎样停止的。

asyncio/base_envents.py

def run_until_complete(self, future):
    """运行直到Future完成。
    如果参数是一个协程,它会被包装成一个Task。
    警告:不要多次调用run_until_complete(),否则会包装成不同的Task,这可能会导致问题。
    返回Future的结果,或者引发它的异常。
    """
    self._check_closed()  # 检查事件循环是否已关闭
​
    new_task = not futures.isfuture(future)  # 检查future是否是一个新任务
    future = tasks.ensure_future(future, loop=self)  # 确保将future包装成一个任务
    if new_task:
        # 如果future是一个新任务,那么不需要记录“销毁挂起任务”的消息
        future._log_destroy_pending = False
​
    future.add_done_callback(_run_until_complete_cb)  # 添加一个回调函数,用于在Future完成时执行
    try:
        self.run_forever()  # 运行事件循环直到完成
    except:
        if new_task and future.done() and not future.cancelled():
            # 如果协程引发了BaseException,消耗异常以避免记录警告,调用者无法访问本地任务。
            future.exception()
        raise
    finally:
        future.remove_done_callback(_run_until_complete_cb)  # 移除回调函数
    if not future.done():
        raise RuntimeError('事件循环在Future完成之前停止。')
​
    return future.result()  # 返回Future的结果

其中在future结束后回调_run_until_complete_cb

def _run_until_complete_cb(fut):
    exc = fut._exception  # 获取Future对象的异常信息

    if (isinstance(exc, BaseException)
    and not isinstance(exc, Exception)):
        # 如果异常是BaseException的子类但不是Exception的子类,
        # 这通常表示一个不应该停止事件循环的情况(例如,SystemExit)。
        # 在这种情况下,不需要停止事件循环。
        return

    fut._loop.stop()  # 停止事件循环

可以看到run_until_complete的主要逻辑也是run_forever,通过对Future添加回调函数,实现在执行完成后停止的效果。实际上loop是future的一个属性,这样的话在任何一个future或者task当中, loop都可以被停止掉。(同时,future也注册到loop中形成了一个环状,容易引起循环引用)

2 取消future/task

import asyncio

# 定义一个异步函数 get_html,用于模拟下载网页
async def get_html(sleep_times):
    print("waiting")
    await asyncio.sleep(sleep_times)
    print("done after {}s".format(sleep_times))

if __name__ == "__main__":
    # 创建三个不同的协程任务,分别休眠 2 秒、3 秒和 3 秒
    task1 = get_html(2)
    task2 = get_html(3)
    task3 = get_html(3)

    # 将任务添加到任务列表中
    tasks = [task1, task2, task3]

    # 获取事件循环对象
    loop = asyncio.get_event_loop()
    try:
        # 启动协程任务的执行,等待所有任务完成
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt as e:
        # 处理用户中断程序的情况
        all_tasks = asyncio.Task.all_tasks()
        for task in all_tasks:
            # 尝试取消每个任务
            print("cancel task")
            print(task.cancel())
        # 停止事件循环
        loop.stop()
        # 保持事件循环处于运行状态,等待任务完成,没有这一句会抛异常
        loop.run_forever()
    finally:
        # 关闭事件循环
        loop.close()

这段代码演示了如果用户中断程序,如何取消任务并在取消后关闭事件循环。这里使用 asyncio.Task.all_tasks方法获得了全部的task,那么这是如何实现的。

asyncio\tasks.py

@classmethod
def all_tasks(cls, loop=None):
    """    
    返回一个事件循环中的所有任务的集合。
    默认情况下,返回当前事件循环中的所有任务。
    """
    if loop is None:
        loop = events.get_event_loop()
    return {t for t in cls._all_tasks if t._loop is loop}

3 携程嵌套

以python 官方文档的例子描述嵌套协程的执行流程

import asyncio

async def compute(x, y):
     print("Compute {} + {}...".format(x, y))
     await asyncio.sleep(2.0)
     return x + y

async def print_sum(x, y):
     result = await compute(x, y)
     print("{} + {} = {}".format(x, y, result))
     
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

对应的时序图如下:

img

这里有两个注意的点:

  1. Task和compute之间建立的一个通道,跳过print_sum(委托方)直接从 compute返回给Task;
  2. 通过StopIteration获得rerurn的值。
0条评论
0 / 1000