mmmm123 发表于 2025-2-7 02:35:23

[python] asyncio库常见问题与实践案例

本文详细介绍了在使用asyncio库编写异步程序时常见的错误和问题,并进一步通过实践案例进行分析和讨论,以便在项目中更有效地应用asyncio库。有关asyncio库的详细介绍,可参考:Python 异步编程库 asyncio 使用指北。

目录

[*]1 asyncio程序的常见错误

[*]1.1 试图直接调用并运行协程
[*]1.2 主协程过早退出
[*]1.3 错误使用asyncio的低级API
[*]1.4 程序出现竞争条件或死锁问题

[*]1.4.1 竞争条件问题
[*]1.4.2 死锁问题


[*]2 asyncio程序的常见问题

[*]2.1 任务的等待、停止、结果获取

[*]2.1.1 如何等待任务
[*]2.1.2 何时停止任务
[*]2.1.3 如何获取任务的返回值

[*]2.2 如何在后台运行和等待任务

[*]2.2.1 如何在后台运行任务
[*]2.2.2 如何等待所有后台任务

[*]2.3 任务的延迟后运行和后续运行

[*]2.3.1 任务的延迟后运行
[*]2.3.2 任务的后续运行

[*]2.4 如何显示运行任务的进度

[*]2.4.1 基于回调函数的任务进度显示
[*]2.4.2 基于tqdm库的任务进度显示

[*]2.5 如何在asyncio中执行阻塞I/O或CPU密集型函数

[*]2.5.1 使用 asyncio.to_thread()
[*]2.5.2 使用 loop.run_in_executor()

[*]2.6 Python协程:操作系统原生支持吗

[*]3 应用实例

[*]3.1 在基于线程的程序中调用asyncio代码
[*]3.2 基于asyncio实现多核异步处理
[*]3.3 图片下载器
[*]3.4 生产者消费者模型

[*]4 参考


1 asyncio程序的常见错误

本节展示了在使用asyncio模块时,开发人员常遇到的一些常见错误示例。以下是四个最常见的异步编程错误:

[*]直接调用并运行协程。
[*]主协程过早退出。
[*]错误使用asyncio的低级API。
[*]程序出现竞争条件或死锁问题。
1.1 试图直接调用并运行协程

协程通常通过async def定义,如下所示:
# 自定义协程async def custom_coro():    print('hi there')若直接像函数一样调用该协程,通常不会执行预期的操作,而是创建一个协程对象。这种调用方式不会触发协程的执行:
# 错误:像函数一样调用协程custom_coro()# 这只是创建了一个协程对象,并不会执行此时,返回的是一个协程对象,而不是立即执行协程主体,这忽略协程必须在事件循环中运行。如果协程未被执行,系统将发出以下运行时警告:
sys:1: RuntimeWarning: coroutine 'custom_coro' was never awaited要正确执行协程,需要在asyncio事件循环中等待该对象。例如,使用asyncio.run()启动事件循环来执行协程:
# 正确:通过 asyncio.run() 运行协程import asyncioasyncio.run(custom_coro()) 另一种执行协程方法是通过await表达式在现有协程中挂起并调度其他协程。例如,定义一个新的协程,在其中调用 custom_coro():
# 正确:在协程中使用 await 调度另一个协程async def main():    await custom_coro() # 使用 asyncio.run 启动事件循环asyncio.run(main()) 1.2 主协程过早退出

在异步编程中,任务的执行可能无法按预期及时完成。通过asyncio.create_task()可以并行运行多个协程,但如果主协程提前退出,这些任务可能会被强制中止。为确保所有任务能够在主协程退出前完成,主协程应在无其他活动时显式等待剩余任务的完成。可以使用asyncio.all_tasks()来获取当前事件循环中的所有任务,并在移除主协程本身后,通过asyncio.wait()等待其他任务的执行结果。如果不移除当前协程,asyncio.wait会等待所有任务完成,包括当前协程,从而导致程序不退出(死锁)。示例如下:
import asyncioasync def task_1():    print("任务 1 开始")    await asyncio.sleep(2)    print("任务 1 完成")async def task_2():    print("任务 2 开始")    await asyncio.sleep(1)    print("任务 2 完成")async def main():    # 创建多个任务    task1 = asyncio.create_task(task_1())    task2 = asyncio.create_task(task_2())      # 获取所有正在运行的任务的集合    all_tasks = asyncio.all_tasks()      # 获取当前任务(即主协程)    current_task = asyncio.current_task()      # 从所有任务列表中删除当前任务    all_tasks.remove(current_task)      # 暂停直到所有任务完成    await asyncio.wait(all_tasks)# 运行主协程asyncio.run(main())代码运行结果为:
任务 1 开始任务 2 开始任务 2 完成任务 1 完成1.3 错误使用asyncio的低级API

asyncio提供了两类API:一类是面向应用程序开发者的高级API,另一类是面向框架开发者的低级API。低级API主要为高级API提供底层支持,如事件循环、传输协议等内部结构。在大多数情况下,推荐优先使用高级API,特别是在学习阶段。只有在需要实现特定功能时,才应考虑使用低级API。尽管学习低级API具有一定的价值,但不应在刚开始时就使用。建议先通过高级API熟悉异步编程的基本概念,进行应用开发,掌握核心知识后,再深入探讨技术细节。例如:
import asyncio# 高级API:推荐的用法async def hello_world():    print("你好,世界!")# 使用 asyncio.run 来启动事件循环def run_hello_world():    asyncio.run(hello_world())# 低级API:不推荐直接使用async def low_level_example():    loop = asyncio.get_event_loop()# 获取当前事件循环    task = loop.create_task(hello_world())# 创建任务    await task# 显式等待任务完成# 运行高级 API 示例print("使用 asyncio.run 运行:")run_hello_world()# 运行低级 API 示例print("\n使用低级 API 运行:")asyncio.run(low_level_example())1.4 程序出现竞争条件或死锁问题

竞争条件和死锁是并发编程中常见的错误。竞争条件发生在多个任务同时访问相同资源时,缺乏适当的控制可能导致数据错误或丢失。死锁则是指不同任务互相等待对方释放资源,最终导致所有任务无法继续执行。
许多Python开发者认为,使用asyncio协程可以避免这些问题,因为在任何时刻,事件循环中只有一个协程在执行。然而,协程在运行过程中可能会暂停和恢复,并且可能会访问共享资源。如果对这些资源没有适当的保护,就可能会引发竞争条件。此外,在协程同步资源时处理不当,也有可能导致死锁。因此,在编写asyncio程序时,确保协程的安全性至关重要。
1.4.1 竞争条件问题

以下示例代码模拟了两个异步任务并行增加共享变量counter,每个任务循环10000次对counter进行递增操作。通过awaitasyncio.sleep(0)来模拟上下文切换,确保两个任务能够交替执行。然而,由于未使用同步机制(如锁),会导致竞态条件。因此,最终的counter值可能小于预期的20000,而不是20000,因为两个任务可能在读取和更新counter的值时发生冲突,导致多个协程可能重复更新相同的数据:
import asyncio# 共享资源counter = 0async def increment():    global counter    for _ in range(10000):      temp = counter      temp += 1      await asyncio.sleep(0)# 让出控制权,模拟上下文切换      counter = tempasync def main():    tasks =     await asyncio.gather(*tasks)    print("最终计数器的值:", counter)# 运行 asyncio 程序asyncio.run(main())代码运行结果为:
最终计数器的值: 10000为了解决这个问题,可以使用 asyncio.Lock 来同步对共享资源 counter 的访问。然而,由于asyncio.Lock与asyncio.run之间的事件循环可能不匹配,通常会在某些环境中(如特定的 IDE 或脚本运行环境)出现问题。原因在于asyncio.run 创建并管理一个新的事件循环,而锁 (asyncio.Lock) 可能会被不同的事件循环使用,从而导致不一致。为避免这种情况,可以显式创建并使用一个事件循环,如下所示:
import asyncio# 共享资源counter = 0# 创建锁lock = asyncio.Lock()async def increment():    global counter    for _ in range(10000):      async with lock:# 确保在修改 counter 时,只有一个任务可以访问            temp = counter            temp += 1            await asyncio.sleep(0)# 让出控制权,模拟上下文切换            counter = tempasync def main():    tasks =     await asyncio.gather(*tasks)    print("最终计数器的值:", counter)# 显式创建事件循环并运行loop = asyncio.get_event_loop()loop.run_until_complete(main())代码运行结果为:
最终计数器的值: 200001.4.2 死锁问题

死锁介绍
死锁(Deadlock)是并发编程中的一种常见问题,它发生在多个任务之间的资源争用中,导致所有任务都陷入无法继续执行的僵局。即使在Python中使用asyncio协程框架,资源竞争和同步问题也可能导致死锁的发生,尤其是在协程需要同步资源(如锁)时。如果同步机制设计不当,容易引发死锁。
死锁的特征如下:

[*]循环等待:多个任务之间相互等待对方释放资源,从而形成一个循环等待的关系。例如,任务1等待任务2释放资源,而任务2又在等待任务1释放资源,形成闭环。
[*]不可抢占:每个任务持有的资源(如锁)不能被其他任务强制抢占。只有在任务主动释放资源时,其他任务才能获取该资源。
[*]持有资源且等待:任务持有某些资源(如锁),同时又在等待其他资源的释放。由于任务在持有资源的情况下无法继续执行,导致系统中的任务无法前进。
以下代码中的死锁是典型的循环等待问题,所有相关任务陷入相互等待的死循环,无法继续执行:
import asyncio# 创建两个共享锁lock1 = asyncio.Lock()lock2 = asyncio.Lock()async def task1():    print("任务1:尝试获取锁1")    await lock1.acquire()# 获取锁1    print("任务1:已获取锁1,尝试获取锁2")    await asyncio.sleep(1)# 模拟一些操作    await lock2.acquire()# 获取锁2    print("任务1:已获取锁2")      # 释放锁    lock1.release()    lock2.release()async def task2():    print("任务2:尝试获取锁2")    await lock2.acquire()# 获取锁2    print("任务2:已获取锁2,尝试获取锁1")    await asyncio.sleep(1)# 模拟一些操作    await lock1.acquire()# 获取锁1    print("任务2:已获取锁1")      # 释放锁    lock1.release()    lock2.release()async def main():    # 启动两个任务    await asyncio.gather(task1(), task2())# 创建事件循环并运行loop = asyncio.get_event_loop()loop.run_until_complete(main())代码运行结果如下,由于两个任务都被挂起,程序无法退出,且永远不会打印出"任务1:已获取锁2"或"任务2:已获取锁1":
任务1:尝试获取锁1任务1:已获取锁1,尝试获取锁2任务2:尝试获取锁2任务2:已获取锁2,尝试获取锁1...asyncio中死锁的避免
在使用asyncio时,为了避免死锁,可以采取以下几种方法:

[*]锁的顺序管理:确保所有任务按照相同的顺序获取锁,以防止发生相互等待的情况。
[*]尝试获取锁:使用asyncio.Lock的acquire方法并设置超时时间,避免任务长时间处于等待锁的状态。
[*]使用async with:通过async with语句来管理锁,这样可以确保在任务完成后自动释放锁,避免因忘记释放锁而引发问题。
根据这一思路,前面死锁的案例解决示例代码如下:
import asyncio# 创建两个共享锁lock1 = asyncio.Lock()lock2 = asyncio.Lock()async def task1():    print("任务1:尝试获取锁1")    async with lock1:# 使用async with获取锁,自动释放      print("任务1:已获取锁1,尝试获取锁2")      await asyncio.sleep(1)# 模拟一些操作                print("任务1:尝试获取锁2")      async with lock2:# 使用async with获取锁,自动释放            print("任务1:已获取锁2")async def task2():    print("任务2:尝试获取锁1")    async with lock1:# 使用async with获取锁,自动释放      print("任务2:已获取锁1,尝试获取锁2")      await asyncio.sleep(1)# 模拟一些操作                print("任务2:尝试获取锁2")      async with lock2:# 使用async with获取锁,自动释放            print("任务2:已获取锁2")async def main():    # 启动两个任务    await asyncio.gather(task1(), task2())# 创建事件循环并运行loop = asyncio.get_event_loop()loop.run_until_complete(main())代码运行结果如下,可以看到两个任务避免了死锁:
任务1:尝试获取锁1任务1:已获取锁1,尝试获取锁2任务2:尝试获取锁1任务1:尝试获取锁2任务1:已获取锁2任务2:已获取锁1,尝试获取锁2任务2:尝试获取锁2任务2:已获取锁22 asyncio程序的常见问题

在使用asyncio编写异步程序时,开发者可能会遇到一系列常见问题,这些问题涉及到任务的管理、执行流程、性能优化等多个方面。以下是一些常见的问题和挑战:

[*]任务的等待、停止、结果获取
[*]如何在后台运行和等待任务
[*]任务的延迟后运行和后续运行
[*]如何显示运行任务的进度
[*]如何在asyncio中执行阻塞I/O或CPU密集型函数
[*]Python协程:操作系统原生支持吗
2.1 任务的等待、停止、结果获取

2.1.1 如何等待任务

可以通过直接等待asyncio.Task对象来等待任务的完成:
# 等待任务完成await task也同时创建并等待任务完成。例如:
# 创建并等待任务完成await asyncio.create_task(custom_coro())与协程不同,任务可以多次等待而不会引发错误。以下是一个演示如何多次等待同一任务的示例,在此例中,await task两次都能成功执行,因为task已经完成并保存了返回值:
import asyncioasync def other_coro():    await asyncio.sleep(1)    return "任务完成"async def main():    # 将协程包装在任务中并安排其执行    task = asyncio.create_task(other_coro())      # 第一次等待任务并获取返回值    value1 = await task    print(value1)      # 再次等待任务(任务已经完成)    value2 = await task    print(value2)# 运行主协程asyncio.run(main())2.1.2 何时停止任务

可以通过asyncio.Task对象的cancel()方法取消任务。若任务被成功取消,cancel()方法返回True,否则返回False。例如:
# 取消任务was_cancelled = task.cancel()2.1.3 如何获取任务的返回值

在Python中创建一个asyncio任务后,有两种方法可以从 asyncio.Task 中检索返回值:

[*]等待任务(使用 await)。
[*]调用 result() 方法。
基于await函数,等待任务时,调用者会挂起,直到任务完成并返回结果。如果任务已完成,返回值会立即提供。以下代码展示了如何等待任务并获取其返回值:
import asyncioasync def other_coro():    await asyncio.sleep(1)    return "任务完成"async def main():    # 将协程包装在任务中并安排其执行    task = asyncio.create_task(other_coro())      # 等待任务完成并获取返回值    value = await task    print(value)# 运行主协程asyncio.run(main())也可以通过调用 asyncio.Task 对象的 result() 方法获取任务的返回值。此时要求任务已完成。如果任务未完成,调用 result() 会引发 InvalidStateError 异常。如果任务被取消,则会引发 CancelledError 异常。以下是一个使用 result() 方法的例子:
import asyncioasync def other_coro():    await asyncio.sleep(1)    return "任务完成"async def main():    task = asyncio.create_task(other_coro())      # 等待任务完成    await task      try:      # 获取任务的返回值      value = task.result()      print(value)    except asyncio.InvalidStateError:      print("任务尚未完成")    except asyncio.CancelledError:      print("任务已取消")# 运行主协程asyncio.run(main())2.2 如何在后台运行和等待任务

2.2.1 如何在后台运行任务

通过 asyncio.create_task()可以将协程封装为Task对象,并在后台执行。创建的任务对象会立即返回,且不会阻塞调用者的执行。为了确保任务能够开始执行,可以使用 await asyncio.sleep(0) 暂停片刻。之所以使用 await asyncio.sleep(0),是因为新创建的任务并不会立刻开始执行。事件循环负责管理多个任务,它会根据调度策略决定哪个任务优先执行。通过 await asyncio.sleep(0) 暂时让出执行权,使得事件循环有机会调度并执行刚刚创建的任务。这样,await asyncio.sleep(0) 确保了任务在创建后能尽早开始执行,同时不会阻塞主协程的其他操作。示例代码如下:
import asyncioasync def other_coroutine():    print("开始执行 other_coroutine")    await asyncio.sleep(2)    print("other_coroutine 执行完毕")async def main():    # 创建并调度任务    task = asyncio.create_task(other_coroutine())      # 暂停片刻以确保任务开始执行    await asyncio.sleep(0)      print("主协程正在执行")      # 等待任务完成    await task    print("任务执行完毕")# 运行主协程asyncio.run(main())此外,后台任务可以在程序运行时执行,不会妨碍主程序的结束。如果主程序没有其他待执行的任务,而后台任务仍在进行中,那么需要确保程序在后台任务完成后才会完全退出。
2.2.2 如何等待所有后台任务

在使用asyncio时,可能需要等待多个独立的任务完成。比如,当多个任务同时运行时,有时想要等待所有任务完成,但又不想一直阻塞当前正在运行的任务。为了实现这个功能,可以通过以下步骤:

[*]获取所有当前任务:使用asyncio.all_tasks()可以获取到当前事件循环中的所有任务。
[*]排除当前任务:通过asyncio.current_task()获取当前正在运行的任务,并将其从任务集合中移除。这样可以避免等待当前任务自己。
[*]等待所有剩余任务完成:使用asyncio.wait()来等待所有任务完成,直到它们都执行完毕。
示例代码如下:
import asyncioasync def example_coroutine(name):    # 这是一个模拟任务的协程,睡眠 1 秒钟    await asyncio.sleep(1)    print(f"任务 {name} 完成。")async def main():    # 创建多个协程任务    tasks =       # 获取所有正在运行的任务    all_tasks = asyncio.all_tasks()      # 获取当前正在运行的任务(即 main 协程)    current_task = asyncio.current_task()      # 从任务集合中移除当前任务    all_tasks.remove(current_task)      # 等待所有其他任务完成    await asyncio.wait(all_tasks)# 启动事件循环并执行主协程asyncio.run(main())2.3 任务的延迟后运行和后续运行

2.3.1 任务的延迟后运行

想要实现任务的延迟后运行,可以通过开发一个自定义的包装协程,使其在延迟指定时间后执行目标协程。该包装协程接受两个参数:目标协程和延迟时间(单位为秒)。它会先休眠指定的延迟时间,然后执行传入的目标协程。
以下代码展示了如何通过自定义包装协程 delay,在指定的延迟时间后执行目标协程。delay 协程通过 asyncio.sleep() 实现延时,随后再执行传入的目标协程。可以在不同场景中使用该方法,如直接挂起协程或将任务安排为独立执行:
import asyncio# 延迟几秒后启动另一个协程的包装协程async def delay(coro, seconds):    """    延迟指定时间(秒)后执行目标协程。    参数:    coro: 要执行的目标协程    seconds: 延迟时间,单位为秒    """    # 暂停指定时间(以秒为单位)    await asyncio.sleep(seconds)    # 执行目标协程    await coro# 示例目标协程async def my_coroutine():    print("目标协程开始执行")    # 模拟一些工作    await asyncio.sleep(2)    print("目标协程执行完成")# 使用包装协程时,可以创建协程对象并直接等待,或将其作为任务独立执行# 1. 调用者可以挂起并调度延迟后的协程async def main():    print("延迟10秒后执行目标协程:")    await delay(my_coroutine(), 10)    print("目标协程已经完成执行")# 2. 或者调用者可以安排延迟协程独立运行async def schedule_task():    print("将目标协程安排为独立任务,延迟10秒后执行")    task = asyncio.create_task(delay(my_coroutine(), 10))    await task# 等待任务完成    print("任务已完成")# 运行示例if __name__ == "__main__":    asyncio.run(main())# 运行主协程    # 或者运行独立任务的调度    # asyncio.run(schedule_task())2.3.2 任务的后续运行

在asyncio中,触发后续任务的方式主要有三种:

[*]通过已完成的任务本身调度后续任务
[*]通过任务发起方调度后续任务
[*]使用回调函数自动调度后续任务
逐一分析这三种方式:
1. 通过已完成的任务本身调度后续任务
已完成的任务可以触发后续任务的调度,通常依赖于某些状态检查来决定是否应该发起后续任务。任务调度可以通过asyncio.create_task()来完成。示例代码展示了运行指定任务后直接调度后续任务:
import asyncioasync def task():    print("任务开始执行。")    await asyncio.sleep(2)# 模拟任务执行    print("任务执行完成。")    await followup_task()# 在任务完成后直接调度后续任务async def followup_task():    print("正在执行后续任务。")    await asyncio.sleep(2)# 模拟后续任务执行    print("后续任务执行完成。")# 启动事件循环,执行任务async def main():    await task()asyncio.run(main())2. 通过任务发起方调度后续任务
任务发起方可以根据实际需要决定是否继续启动后续任务。在启动第一个任务时,可以保留 asyncio.Task 对象,通过检查任务的结果或状态,来判断是否启动后续任务。任务发起方还可以选择等待后续任务完成,也可以选择不等待。示例代码如下:
import asyncioasync def task():    # 模拟一个任务    await asyncio.sleep(1)    return True# 假设任务成功完成,返回Trueasync def followup_task():    # 模拟后续任务    await asyncio.sleep(1)    print("后续任务执行")async def main():    # 发起并等待第一个任务    task_1 = asyncio.create_task(task())      # 等待第一个任务完成    result = await task_1      # 检查任务结果    if result:      # 发起后续任务      await followup_task()# 运行主程序asyncio.run(main())3. 使用回调函数自动调度后续任务
在任务发起时,可以为其注册一个回调函数。该回调函数会在任务完成后自动执行。回调函数接收一个 asyncio.Task 对象作为参数,但它不会等待后续任务的执行。因为回调函数通常是普通的Python函数,无法进行异步操作。示例代码:
import asyncio# 定义回调函数def callback(task):    # 安排并启动后续任务    # 注意:这里不能直接使用 await,需通过 create_task 调度异步任务    asyncio.create_task(followup())# 定义第一个异步任务async def work():    print("工作任务正在执行...")    await asyncio.sleep(2)# 模拟一些异步操作    print("工作任务完成!")# 定义后续异步任务async def followup():    print("后续任务正在执行...")    await asyncio.sleep(1)# 模拟一些异步操作    print("后续任务完成!")# 创建事件循环并运行任务async def main():    # 发起任务并注册回调函数    task = asyncio.create_task(work())    task.add_done_callback(callback)    # 等待任务完成    await task    # 确保后续任务完成    await asyncio.sleep(1)# 等待回调任务完成的时间# 执行事件循环asyncio.run(main())2.4 如何显示运行任务的进度

2.4.1 基于回调函数的任务进度显示

每个任务的回调函数可用于显示进度。asyncio.Task 对象支持注册回调函数,这些函数会在任务完成时被调用,无论是正常完成还是以异常结束。回调函数是普通函数而非协程,且接受与其关联的 asyncio.Task 对象作为参数。通过为所有任务注册相同的回调函数,可以统一报告任务进度:
import asyncio# 回调函数,用于显示任务完成的进度,区分任务def progress(task):    task_name = task.get_name()# 获取任务的名称    print(f"任务 {task_name} 完成。")async def example_task(n, task_name):    """模拟一个异步任务,表示处理n秒的任务,并设置任务名称"""    await asyncio.sleep(n)    return task_nameasync def main():    # 定义多个异步任务并添加回调函数    tasks = []    for i in range(1, 6):      task_name = f"Task-{i}"# 为每个任务分配一个唯一名称      task = asyncio.create_task(example_task(i, task_name))# 创建任务,模拟不同的执行时间      task.set_name(task_name)# 设置任务名称      # 为任务添加回调函数,回调函数会在相应任务执行完毕时被调用      task.add_done_callback(progress)          tasks.append(task)    # 等待所有任务完成    await asyncio.gather(*tasks)# 运行主程序asyncio.run(main())2.4.2 基于tqdm库的任务进度显示

使用tqdm库显示任务总体进度
以下代码演示了如何结合tqdm库和asyncio库,来展示异步任务的总体执行进度:
import asynciofrom tqdm.asyncio import tqdmasync def example_task(n, task_name):    """模拟一个异步任务,表示处理 n 秒的任务,并设置任务名称"""    await asyncio.sleep(n)# 模拟任务处理时间    return task_name# 返回任务名称async def main():    # 定义多个异步任务并使用 tqdm 显示进度    tasks = []    total_tasks = 5# 总任务数    task_durations = # 每个任务的持续时间(秒)    # 使用 tqdm 创建进度条,`total` 为任务的数量    progress_bar = tqdm(total=total_tasks, desc="已完成任务数", ncols=100)    # 创建任务    for i, n in enumerate(task_durations):      task_name = f"Task-{i+1}"# 为每个任务分配一个唯一名称      task = asyncio.create_task(example_task(n, task_name))# 创建任务,模拟不同的执行时间      tasks.append(task)    # 等待任务完成并更新进度条    for task in asyncio.as_completed(tasks):      await task# 等待每个任务完成      progress_bar.update(1)# 每完成一个任务,更新进度条    progress_bar.close()# 关闭进度条# 运行主程序asyncio.run(main())使用tqdm库为多个任务设置单独进度条
以下示例代码演示了如何使用asyncio并行执行多个异步任务,同时通过tqdm库为每个任务单独显示进度条:
import asynciofrom tqdm.asyncio import tqdm async def example_task(n, task_name, progress_bar):    """模拟一个异步任务,表示处理 n 秒的任务,并设置任务名称"""    for _ in range(n):# 每秒更新一次进度      await asyncio.sleep(1)# 模拟任务处理时间      progress_bar.update(1)# 更新当前任务的进度    return task_name# 返回任务名称async def main():    # 定义多个异步任务并使用 tqdm 显示进度    tasks = []    total_tasks = 5# 总任务数    task_durations = # 每个任务的持续时间(秒)    # 创建进度条并为每个任务单独设置    progress_bars = []    for i, n in enumerate(task_durations):      task_name = f"Task-{i+1}"# 为每个任务分配一个唯一名称      progress_bar = tqdm(total=n, desc=task_name, ncols=100, position=i)# 创建任务对应的进度条      progress_bars.append(progress_bar)      task = asyncio.create_task(example_task(n, task_name, progress_bar))# 创建任务      tasks.append(task)    # 等待任务完成    await asyncio.gather(*tasks)# 使用 asyncio.gather 同时等待所有任务完成    # 关闭所有进度条    for progress_bar in progress_bars:      progress_bar.close()# 运行主程序asyncio.run(main())2.5 如何在asyncio中执行阻塞I/O或CPU密集型函数

在编程中,“阻塞调用”指的是某些操作(例如读取文件、等待网络请求或执行数据库查询等)需要一定时间才能完成。在执行这些操作时,程序会暂停,无法继续处理其他任务,这就是“阻塞”。另外,CPU密集型操作也可能会导致程序阻塞。因此,为了在异步环境中仍然能够处理阻塞调用,asyncio模块提供了两种方法来在异步程序中执行阻塞调用:

[*]asyncio.to_thread() :此方法简化了线程管理流程,特别适合处理大多数I/O密集型任务。它允许将阻塞调用委派给一个线程,从而避免阻塞主事件循环。
[*]loop.run_in_executor() :此方法提供了更高的灵活性,支持使用自定义的执行器,比如线程池或进程池。这适用于需要精细控制执行环境的场景。
这两种方法均可有效地将阻塞调用转为异步任务,以下逐一分析这两种方式:
2.5.1 使用 asyncio.to_thread()

asyncio.to_thread() 是一个高级 API,适用于大多数应用场景。它会将指定的函数和参数提交到一个独立的线程中执行,并返回一个可等待的协程。这样,阻塞操作就可以在后台线程池中执行,而不会阻塞事件循环。需要注意的是,任务并不会立即执行,而是会等待事件循环空闲时再开始执行。由于 asyncio.to_thread() 会在后台创建一个 ThreadPoolExecutor 来处理阻塞任务,因此它特别适合 I/O 密集型的操作。示例代码如下:
import asyncioimport timedef blocking_task(task_id):    # 模拟一个耗时的阻塞操作    time.sleep(2)    return f"任务 {task_id} 完成"# 同步执行多个任务def sync_main():    start_time = time.time()      # 顺序执行多个阻塞任务    results =       end_time = time.time()      for result in results:      print(result)      print(f"同步任务执行时间: {end_time - start_time:.4f} 秒")# 异步运行多个阻塞任务async def async_main():    start_time = time.time()      # 使用 asyncio.to_thread 来并发运行多个阻塞任务    tasks =     results = await asyncio.gather(*tasks)      end_time = time.time()      for result in results:      print(result)      print(f"异步任务执行时间: {end_time - start_time:.4f} 秒")# 执行同步任务print("同步执行开始:")sync_main()# 执行异步任务print("\n异步执行开始:")asyncio.run(async_main())以上代码展示了同步执行阻塞任务与异步执行阻塞任务的对比。通过使用asyncio.to_thread(),I/O 密集型操作的处理被委托给独立的线程池,从而避免了阻塞事件循环,显著提升了异步任务的效率:

[*]同步执行:在 sync_main() 中,多个阻塞任务按顺序逐一执行,每个任务需等待前一个任务完成后才能开始,整体执行时间为所有任务总时间(即 5 * 2 秒)。
[*]异步执行:在 async_main() 中,多个阻塞任务并发执行。尽管每个任务仍然是阻塞的,但它们在后台线程中并行处理,因此总执行时间仅为单个任务的执行时间(即约 2 秒)。
代码运行结果如下:
同步执行开始:任务 0 完成任务 1 完成任务 2 完成任务 3 完成任务 4 完成同步任务执行时间: 10.0317 秒异步执行开始:任务 0 完成任务 1 完成任务 2 完成任务 3 完成任务 4 完成异步任务执行时间: 2.0089 秒2.5.2 使用 loop.run_in_executor()

loop.run_in_executor()是asyncio提供的低级API,需先获取事件循环(例如,使用asyncio.get_running_loop())。该函数允许指定执行器(默认是ThreadPoolExecutor)以及要执行的函数。
与asyncio.to_thread()相比,run_in_executor()提供了更大的灵活性,支持使用自定义执行器,而不仅限于线程池。此外,调用该函数后,任务会立即开始执行,无需等待返回的可等待对象来触发任务的启动。
示例代码如下:
import asyncioimport time# 定义一个需要执行的阻塞任务def task():    print("任务开始")    time.sleep(2)    print("任务结束")# 在单独的线程中执行函数async def main():    # 获取事件循环    loop = asyncio.get_running_loop()    # 使用run_in_executor来将task函数异步执行在线程池中    # None 表示使用默认的线程池执行器    await loop.run_in_executor(None, task)# 执行主任务asyncio.run(main())如果希望使用进程池,可以创建一个自定义的执行器并传递给 run_in_executor()。在这种情况下,调用者需要负责管理执行器的生命周期,使用完后要手动关闭。示例代码如下:
import asynciofrom concurrent.futures import ProcessPoolExecutorimport time# 定义一个耗时的任务def task(name):    print(f"任务 {name} 开始")    time.sleep(2)# 模拟一个阻塞的操作    print(f"任务 {name} 完成")    return f"来自 {name} 的结果"# 使用自定义的执行器来运行任务async def main():    # 创建一个进程池    with ProcessPoolExecutor() as executor:      # 获取当前的事件循环      loop = asyncio.get_running_loop()      # 使用 run_in_executor 来在进程池中执行任务      results = await asyncio.gather(            loop.run_in_executor(executor, task, "A"),            loop.run_in_executor(executor, task, "B"),            loop.run_in_executor(executor, task, "C")      )      # 打印所有任务的结果      for result in results:            print(result)# 启动 asyncio 事件循环并执行 mainif __name__ == "__main__":    asyncio.run(main())2.6 Python协程:操作系统原生支持吗

异步编程和协程并不总是解决程序中所有并发问题的最佳方案。Python 中的协程是由软件管理的,它们通过asyncio事件循环来执行和调度。与操作系统提供的线程和进程不同,协程并不由操作系统直接支持,而是通过Python的软件框架来实现的。在这个意义上,Python中的协程并不是“原生”的。它们并不像线程或进程那样具有独立的执行上下文,反而是在同一个线程内通过协作式调度来切换任务。
此外,Python的GIL(全局解释器锁)用来保护解释器内部的状态,防止多个线程同时访问和修改解释器的数据。而asyncio的事件循环是单线程运行的,这意味着所有的协程都在同一个线程里执行。由于协程本身是通过事件循环调度的,而不是通过多线程或多进程并行执行,因此,尽管Python中的多线程模型受到GIL的限制,协程在处理 I/O 密集型任务时能够有效避免GIL的影响,从而提高并发性能。这也是为什么在处理大量I/O操作时,使用asyncio和协程能够带来较好的性能表现。
然而,协程并不适用于所有类型的并发任务。例如,对于计算密集型任务,使用线程或进程模型可能更为合适,因为协程并不会突破GIL的限制,计算密集型任务依然会在单个CPU核心上串行执行。因此,在选择是否使用协程时,需要根据任务的特性做出权衡。

3 应用实例

3.1 在基于线程的程序中调用asyncio代码

直接调用同步I/O代码
以下代码实现了一个简单的Tkinter应用,点击按钮后,程序会发起一个同步HTTP请求(GET 请求)。在每60毫秒的刷新周期中,程序会根据当前状态更新显示的文本。然而,当点击按钮时,request_remote方法中的 requests.get会发起一个同步请求,这会阻塞主线程,从而导致界面卡顿或无响应。如下代码,App.QUERYING_STATE状态相关信息不会显示出来:
import tkinter as tkimport requestsclass App(tk.Tk):    INIT_STATE = 0         # 初始化状态    QUERYING_STATE = 1   # 请求中状态    RESULT_STATE = 2       # 请求结果状态    def __init__(self):      super().__init__()      self.status_code = 0         # HTTP请求返回的状态码      self._refresh_ms = 60          # 刷新间隔时间(毫秒)      self.state = App.INIT_STATE    # 初始状态      self._button = None            # 按钮      self._label = None             # 标签      self.render_elements()         # 渲染界面元素      self.after(self._refresh_ms, self.refresh)# 设置定时刷新,定时调用refresh方法    def render_elements(self):      """ 设置界面布局,渲染UI元素 """      self.geometry("400x200")# 设置窗口大小      self._button = tk.Button(self, text="请求状态码", command=self.request_remote)# 创建按钮,点击时调用request_remote方法      self._label = tk.Label(self, text="")# 创建标签,初始为空      self._button.pack()# 将按钮添加到窗口中      self._label.pack()   # 将标签添加到窗口中    def request_remote(self):      """ 发起同步HTTP请求 """      self.state = App.QUERYING_STATE# 设置状态为请求中      response = requests.get("https://www.example.com")# 发起GET请求,获取响应      self.status_code = response.status_code# 获取响应返回的状态码      self.state = App.RESULT_STATE# 设置状态为结果状态,表示请求已完成    def refresh(self):      """ 每60毫秒刷新一次UI内容 """      self.update_label()# 更新标签内容      self.after(self._refresh_ms, self.refresh)# 设置下次刷新时间(每60毫秒刷新一次)    def update_label(self):      """ 根据应用状态更新标签内容 """      if self.state == App.INIT_STATE:            self._label.config(text="这里将显示状态码。")# 初始状态下提示文字      elif self.state == App.QUERYING_STATE:            self._label.config(text="正在查询远程...")# 请求中状态时显示提示文字      elif self.state == App.RESULT_STATE:            self._label.config(text=f"返回的状态码是: {self.status_code}")# 请求结果状态时显示返回的状态码    def start(self):      self.mainloop()# 启动Tkinter事件循环,进入GUI界面def main():    app = App()# 创建应用实例    app.start()# 启动应用if __name__ == "__main__":    main()I/O请求的异步调用
可以将requests包替换为aiohttp包,实现I/O请求的异步调用。aiohttp和requests都是Python中常用的HTTP客户端库,但requests适用于同步场景,简单易用,aiohttp则适用于异步并发的场景,能够处理大量并行请求。具体区别如下:

[*]同步vs异步:


[*]requests是一个同步库,意味着每次发送请求时,程序会等待响应回来后才继续执行。适用于一些简单的、串行的HTTP请求场景。
[*]aiohttp是一个异步库,基于Python的asyncio模块,能够在发送HTTP请求时非阻塞地继续执行其他任务。适用于需要大量并发请求或长时间等待的异步场景。

[*]性能:


[*]requests由于是同步的,处理大量请求时容易出现性能瓶颈,因为每个请求必须等待前一个请求完成。
[*]aiohttp通过异步I/O处理,可以在等待响应时同时发起其他请求,极大提高了并发性能,尤其在处理大量HTTP请求时。

[*]用法:


[*]requests用法简单,适合初学者和一般同步的任务。
[*]aiohttp需要使用async和await,适合需要并发或异步操作的任务。
在上述示例代码中,为了替代requests模块的同步请求,可以创建一个继承自App类的AppAsync类,并利用aiohttp和asyncio库实现异步请求。通过async_request方法异步发起HTTP请求:
import aiohttpimport asyncioclass AppAsync(App):    async def async_request(self):      """         异步发起HTTP请求,使用aiohttp库来实现I/O请求的异步调用。      """      async with aiohttp.ClientSession() as session:# 创建一个aiohttp会话对象            async with session.get("https://www.example.com") as response:# 发起GET请求                self.status_code = response.status# 获取响应状态码                self.state = App.RESULT_STATE# 更新应用状态    def __int__(self):      super().__init__()    def request_remote(self):      """ 使用asyncio.run来调用异步请求代码 """      self.state = self.QUERYING_STATE# 设置状态为请求中      asyncio.run(self.async_request())# 异步发起HTTP请求def main():    app = AppAsync()# 创建应用实例    app.start()# 启动应用if __name__ == "__main__":    main()# 运行主程序然而AppAsync类中的asyncio.run(self.async_request())会阻塞Tkinter的主线程,因为asyncio.run()会一直运行,直到异步任务完成。同时Tkinter自身有一个事件循环(mainloop()),与asyncio需要的事件循环冲突。如果在Tkinter内创建新事件循环,可能会导致Tkinter关闭或中断后出现问题。
将asyncio与线程结合
为了解决asyncio事件循环阻塞的问题,可以使用一个单独的守护线程,并在守护线程中运行事件循环,这样asyncio的事件循环就不会阻塞主线程。重写AppAsync类示例如下:
import aiohttpimport asyncioimport threadingclass AppAsync(App):    def __init__(self):      super().__init__()      self._loop_thread = threading.Thread(target=self.run_asyncio_loop, daemon=True)      self._loop_thread.start()# 启动事件循环线程    async def async_request(self):      """         异步发起HTTP请求,使用aiohttp库来实现I/O请求的异步调用。      """      async with aiohttp.ClientSession() as session:# 创建一个aiohttp会话对象            async with session.get("https://www.example.com") as response:# 发起GET请求                self.status_code = response.status# 获取响应状态码                self.state = App.RESULT_STATE# 更新应用状态    def request_remote(self):      """ 使用异步请求,在事件循环中执行 """      self.state = App.QUERYING_STATE# 设置状态为请求中      asyncio.run_coroutine_threadsafe(self.async_request(), self._loop)# 调用异步请求并与当前事件循环进行交互    def run_asyncio_loop(self):      """ 运行asyncio事件循环 """      self._loop = asyncio.new_event_loop()# 创建新的事件循环      asyncio.set_event_loop(self._loop)# 设置当前线程的事件循环      self._loop.run_forever()# 启动事件循环def main():    app = AppAsync()# 创建应用实例    app.start()# 启动应用if __name__ == "__main__":    main()# 运行主程序示例代码运行时,App.QUERYING_STATE状态相关信息会显示出来,AppAsync类主要的改动点如下:

[*]AppAsync类的构造函数:

[*]增加了一个新的线程来运行asyncio事件循环,避免在Tkinter线程中阻塞。
[*]使用threading.Thread启动一个守护线程,执行run_asyncio_loop方法,确保事件循环在后台运行。
[*]在创建线程时设置为守护线程。这样即使主线程退出,守护线程也会自动结束。

[*]run_asyncio_loop方法:

[*]在一个单独的线程中启动新的asyncio事件循环。
[*]使用asyncio.set_event_loop设置当前线程的事件循环,并调用loop.run_forever()来保持事件循环持续运行。

[*]request_remote方法:

[*]使用asyncio.run_coroutine_threadsafe将异步请求任务提交给后台事件循环执行,用于在非主线程中安全地执行协程。

3.2 基于asyncio实现多核异步处理

单核异步处理
asyncio的并发机制是基于协作式多任务(协程),它不会并行地使用多个CPU核心来加速计算,所有的任务都是在单个核心上轮流执行的。以下代码模拟了1000个爬虫任务,并使用单核异步来执行:
import randomimport asyncioimport time# 模拟爬虫任务,执行时会有随机的延迟async def fake_crawlers():    # 随机生成一个0.2到1.0秒之间的延迟,保留两位小数    io_delay = round(random.uniform(0.2, 1.0), 2)    await asyncio.sleep(io_delay)    result = 0    # 随机生成100,000到500,000之间的数字,用于模拟计算密集型任务    # 这段代码耗时大约0.2秒到0.5秒之间    for i in range(random.randint(100000, 500000)):      result += i    return result# 主程序入口,负责创建并执行多个爬虫任务async def main():    # time.monotonic()是用于测量时间间隔的可靠方法,它不受系统时间更改的影响    start = time.monotonic()    tasks = # 模拟创建1000个任务    await asyncio.gather(*tasks)# 等待所有任务完成    # 输出所有任务完成的时间    print(f"所有任务已完成,耗时 {time.monotonic() - start:.2f} 秒")    # 启动程序asyncio.run(main())代码运行结果如下:
所有任务已完成,耗时 8.51 秒多核异步处理
要实现多核异步处理,可以将异步编程和多进程池结合起来使用。具体来说,主程序会把任务分成多个批次,每个批次由不同的进程来处理。每个进程内部,多个任务又是通过异步方式并行执行的。这样一来,计算密集型的任务可以通过多进程并行处理,而每个进程内部的I/O操作则可以通过asyncio来异步管理,从而大幅提高整体效率。示例代码如下,代码将1000个任务分布到10个子进程中并行执行,每个子进程执行100个模拟的爬虫任务:
import randomimport asyncio#import timefrom concurrent.futures import ProcessPoolExecutor# 模拟爬虫任务,执行时会有随机的延迟async def fake_crawlers():    # 随机生成一个0.2到1.0秒之间的延迟,保留两位小数    io_delay = round(random.uniform(0.2, 1.0), 2)    await asyncio.sleep(io_delay)    result = 0    # 随机生成100,000到500,000之间的数字,用于模拟阻塞任务    # 这段代码耗时大约0.2秒到0.5秒之间    for i in range(random.randint(100000, 500000)):      result += i    return result# 并发查询任务,通过起始和结束索引分配任务async def query_concurrently(begin_idx: int, end_idx: int):    """ 启动并发任务,通过起始和结束序列号 """    tasks = []      # 根据给定的索引范围(从 begin_idx 到 end_idx),创建并发任务    for _ in range(begin_idx, end_idx, 1):      tasks.append(asyncio.create_task(fake_crawlers()))      # 等待所有任务完成,并返回每个任务的结果    results = await asyncio.gather(*tasks)    return results # 批量任务执行函数,使用子进程池并行执行任务def run_batch_tasks(batch_idx: int, step: int):    """ 在子进程中执行批量任务 """    # 计算当前批次任务的起始和结束索引    begin = batch_idx * step + 1# 当前批次任务的起始索引    end = begin + step# 当前批次任务的结束索引    # 使用 asyncio.run() 启动异步任务并获取结果    results =     return results# 主函数,分批次将任务分配到子进程中执行async def main():    """ 将任务分批次分配到子进程中执行 """    start = time.monotonic()      loop = asyncio.get_running_loop()# 获取当前运行的事件循环    # 创建进程池执行器,用于将任务分配到多个子进程中执行    with ProcessPoolExecutor() as executor:      # 启动多个批次任务,并行执行。每个批次执行 100个任务,共启动10个批次      tasks =    # 等待所有子进程任务完成,并将结果汇总    results =     # 输出所有任务完成的时间    print(f"所有任务已完成,耗时 {time.monotonic() - start:.2f} 秒")# 程序入口if __name__ == "__main__":    asyncio.run(main())代码运行结果如下:
所有任务已完成,耗时 1.83 秒3.3 图片下载器

若经常需要从互联网下载文件,可以使用aiohttp库来实现任务的自动化。下面提供了一个简单的脚本,用于从指定URL下载文件:
建立本地图片服务器
为了提供图片下载链接,以下代码展示了如何使用FastAPI框架创建一个简单的Web应用程序,用于上传、管理和访问图片:
import osfrom fastapi import FastAPI, File, UploadFilefrom fastapi.responses import FileResponsefrom fastapi.staticfiles import StaticFilesimport uvicornapp = FastAPI()# 配置图片存储目录UPLOAD_DIR = "./uploaded_images"if not os.path.exists(UPLOAD_DIR):    os.makedirs(UPLOAD_DIR)# 将图片目录挂载为静态文件目录app.mount("/images", StaticFiles(directory=UPLOAD_DIR), name="images")# 上传图片接口@app.post("/upload/")async def upload_image(file: UploadFile = File(...)):    try:      # 定义图片保存路径      file_path = os.path.join(UPLOAD_DIR, file.filename)                # 保存图片到本地      with open(file_path, "wb") as f:            f.write(file.file.read())      # 返回图片的访问 URL      image_url = f"http://127.0.0.1:8000/images/{file.filename}"      return {"image_url": image_url}      except Exception as e:      return {"error": str(e)}# 获取所有上传图片的链接@app.get("/images_list/")async def list_images():    try:      # 获取目录下的所有文件      files = os.listdir(UPLOAD_DIR)      image_urls =       return {"image_urls": image_urls}    except Exception as e:      return {"error": str(e)}# 获取单个图片@app.get("/image/{image_name}")async def get_image(image_name: str):    try:      file_path = os.path.join(UPLOAD_DIR, image_name)      if os.path.exists(file_path):            return FileResponse(file_path)      else:            return {"error": "Image not found"}    except Exception as e:      return {"error": str(e)}# 启动 FastAPI 服务器if __name__ == "__main__":    uvicorn.run(app, host="127.0.0.1", port=8000)该代码实现了一个图片上传和访问服务,包含以下三个主要接口:

[*]服务器启动后,会监听本地地址127.0.0.1的8000端口。
[*]客户端可以通过以下方式与服务器进行交互:

[*]访问http://127.0.0.1:8000/upload/上传图片,并获取返回的图片 URL。
[*]访问http://127.0.0.1:8000/images_list/查看所有已上传图片的 URL。
[*]访问http://127.0.0.1:8000/images/{image_name} 来查看特定图片。

注意,所有上传和保存的图片都会保存在本地的uploaded_images文件夹中。
图片下载
以下代码利用了aiohttp、asyncio和aiofiles库,通过异步方式从API获取图片URL列表,并将图片下载到指定目录。借助这些库的结合,代码能够高效地处理HTTP请求、文件下载和文件操作,同时确保主程序的执行不被阻塞:
import aiohttp# 导入 aiohttp 库,用于异步 HTTP 请求import asyncio# 导入 asyncio 库,用于管理异步任务import aiofiles# 导入 aiofiles 库,用于异步文件操作import os # 获取图片 URL 列表的异步函数async def get_image_urls(api_url):    try:      # 使用 aiohttp 启动一个异步 HTTP 会话      async with aiohttp.ClientSession() as session:            # 异步发送 GET 请求以获取 API 返回的数据            async with session.get(api_url) as response:                # 如果响应状态码是 200 (请求成功)                if response.status == 200:                  # 将响应内容解析为 JSON 格式                  data = await response.json()                  # 从 JSON 数据中提取图片 URL 列表,若没有则返回空列表                  return data.get("image_urls", [])                else:                  # 如果请求失败,打印错误信息                  print(f"从 {api_url} 获取图片列表失败。状态码: {response.status}")                  return []    except Exception as e:      # 如果发生任何异常,打印错误信息      print(f"获取图片列表时发生错误: {e}")      return []# 下载文件的异步函数async def download_file(url, save_directory):    try:      # 使用 aiohttp 启动异步 HTTP 会话      async with aiohttp.ClientSession() as session:            # 异步发送 GET 请求以获取文件内容            async with session.get(url) as response:                # 如果响应状态码是 200 (请求成功)                if response.status == 200:                  # 确保保存文件的目录存在,若不存在则创建                  os.makedirs(save_directory, exist_ok=True)                                        # 从 URL 中提取文件名                  filename = os.path.join(save_directory, url.split('/')[-1])                  # 异步打开文件以进行写入操作                  async with aiofiles.open(filename, 'wb') as file:                        # 读取响应内容                        content = await response.read()                        # 将内容写入本地文件                        await file.write(content)                  print(f"已下载 {filename}")                else:                  # 如果下载失败,打印错误信息                  print(f"下载 {url} 失败。状态码: {response.status}")    except Exception as e:      # 如果发生任何异常,打印错误信息      print(f"下载 {url} 时发生错误: {e}")# 根据获取的图片 URL 列表进行下载的异步函数async def download_images(api_url, save_directory):    # 调用 get_image_urls 函数获取图片 URL 列表    image_urls = await get_image_urls(api_url)      # 如果没有获取到图片 URL,则打印提示并返回    if not image_urls:      print("没有找到需要下载的图片。")      return      # 为每个图片 URL 创建一个下载任务    tasks =       # 使用 asyncio.gather 并行执行所有下载任务    await asyncio.gather(*tasks)# 启动事件循环,开始下载图片if __name__ == "__main__":    # API 地址,提供图片 URL 列表    api_url = "http://127.0.0.1:8000/images_list/"    # 指定保存下载图片的目录    save_directory = "downloads"      # 获取事件循环并运行下载任务    loop = asyncio.get_event_loop()    loop.run_until_complete(download_images(api_url, save_directory))3.4 生产者消费者模型

生产者-消费者模型(Producer-Consumer Model)是一种经典的并发编程模式,旨在解决多个任务之间生产和消费的协调问题,从而确保资源得到合理利用并保证数据按顺序处理。该模型通过生产者和消费者两个角色,模拟共享资源的生产和消费过程。以下代码实现了一个基本的生产者-消费者模型,采用了asyncio进行异步任务处理:
import asynciofrom asyncio import Queuefrom typing import List# 生产者函数,负责将物品添加到队列async def produce_items(queue: Queue, items: List, producer_name: str):    for item in items:      await queue.put(item)# 将物品放入队列      print(f"{producer_name} 添加物品:{item}")      await asyncio.sleep(0.5)# 模拟生产过程中的等待时间    print(f"{producer_name} 完成所有物品的生产")# 消费者函数,负责从队列中取出并处理物品async def consume_items(queue: Queue, consumer_name: str):    while True:      item = await queue.get()# 阻塞直到获取到一个物品      if item is None:# 使用None作为结束信号            queue.task_done()# 标记任务完成            break# 退出循环      print(f"{consumer_name} 处理物品:{item}")      await asyncio.sleep(1)# 模拟处理物品的时间      queue.task_done()# 标记任务完成# 主函数,负责启动多个生产者和消费者任务async def main():    queue = Queue()# 创建一个队列    items_to_produce = ['A','B','C','D']# 需要生产的物品列表      # 创建产者任务(例如3个生产者)    producer_tasks = [      asyncio.create_task(produce_items(queue, items_to_produce, f"生产者_{i}"))      for i in range(3)      ]      # 创建消费者任务(例如2个消费者)    consumer_tasks = [      asyncio.create_task(consume_items(queue, f"消费者_{i}"))      for i in range(2)    ]      # 等待所有生产者任务完成    await asyncio.gather(*producer_tasks)      # 生产者完成后,发送 None 给消费者,通知它们退出    for _ in consumer_tasks:      await queue.put(None)      # 等待队列中的所有任务处理完成    await queue.join()      # 等待所有消费者任务完成    await asyncio.gather(*consumer_tasks)if __name__ == '__main__':    # 运行主函数    asyncio.run(main())4 参考


[*]Python异步编程库asyncio使用指北
[*]Python Asyncio: The Complete Guide
[*]Combining Traditional Thread-Based Code and Asyncio in Python
[*]Harnessing Multi-Core Power with Asyncio in Python
[*]21 Simple Python Scripts That Will Automate Your Daily Tasks
[*]Asyncio:一个异步的 Python 并发编程库!
页: [1]
查看完整版本: [python] asyncio库常见问题与实践案例