一步一步从初级到高级的介绍一下asyncio 的用法

asyncio是 Python 用来处理异步编程的库,简单说就是让程序在“等东西”的时候不傻乎乎干等着,而是去干点别的活儿,提高效率。

初级:啥是异步?怎么跑起来?

1. 理解异步的核心:别干等

想象你在煮饭:水开了要等 10 分钟才能下面条。同步编程就像你傻站那儿盯着锅等 10 分钟啥也不干;异步编程呢,就是水烧着的时候你去刷个碗、切个菜,等水开了再回来下面条。asyncio 就是帮你实现这种“不干等”的工具。

2. 最简单的起步:跑个 Hello World

Python 自带 asyncio,不用装啥,直接开干。咱先写个最简单的异步代码:

import asyncio

async def say_hello():
    print("Hello")
    await asyncio.sleep(1)  # 假装等1秒,就像烧水
    print("World")

asyncio.run(say_hello())
  • async def:定义一个异步函数,告诉 Python 这函数可以“暂停”。
  • await:遇到 await 就先暂停,等它后面的东西(比如 sleep)干完再继续。
  • asyncio.run():启动异步程序的入口。

跑一下,输出是:

Hello
(等1秒)
World

这就是异步的基本味儿:程序不会卡死,能“暂停”再“继续”。

3. 多干点活儿:跑多个任务

光一个任务没啥意思,异步的牛逼之处是能同时干多个事。比如:

import asyncio

async def task1():
    print("任务1开始")
    await asyncio.sleep(2)
    print("任务1结束")

async def task2():
    print("任务2开始")
    await asyncio.sleep(1)
    print("任务2结束")

async def main():
    await asyncio.gather(task1(), task2())  # 一起跑

asyncio.run(main())

输出:

任务1开始
任务2开始
任务2结束  # 1秒后
任务1结束  # 2秒后

asyncio.gather():把多个任务塞一块儿跑,类似于“烧水的时候切菜”。

任务1等2秒,任务2等1秒,但它们是一起开始的,总时间只花了2秒,而不是3秒(2+1)。

中级:整点实用的活儿

1. 加点交互:模拟网络请求

现实中,异步最常用在等网络请求(比如爬网页、调接口)。咱用 sleep 模拟一下延迟:

import asyncio

async def fetch_data(name, delay):
    print(f"{name} 开始请求数据")
    await asyncio.sleep(delay)  # 模拟网络延迟
    print(f"{name} 数据拿到啦")
    return f"{name} 的结果"

async def main():
    results = await asyncio.gather(
        fetch_data("网页A", 2),
        fetch_data("网页B", 1),
        fetch_data("网页C", 3)
    )
    print("所有结果:", results)

asyncio.run(main())

输出:

网页A 开始请求数据
网页B 开始请求数据
网页C 开始请求数据
网页B 数据拿到啦  # 1秒
网页A 数据拿到啦  # 2秒
网页C 数据拿到啦  # 3秒
所有结果: ['网页A 的结果', '网页B 的结果', '网页C 的结果']

这里模拟了三个“请求”,每个耗时不同,但总耗时是3秒(最长的那个),而不是6秒(2+1+3)。

2. 加个超时:别等太久

有时候网络太慢,咱得加个“等不及就放弃”的逻辑:

import asyncio

async def slow_task():
    await asyncio.sleep(5)
    return "终于好了"

async def main():
    try:
        result = await asyncio.wait_for(slow_task(), timeout=2)  # 只等2秒
        print(result)
    except asyncio.TimeoutError:
        print("等不及了,超时了!")

asyncio.run(main())

输出:

等不及了,超时了!

asyncio.wait_for():设置超时,超过时间就抛异常。

高级:整复杂点的活儿

1. 任务调度:手动控制谁先跑

有时候你不想全扔一块儿跑,而是手动控制任务。可以用 asyncio.create_task():

import asyncio

async def task1():
    await asyncio.sleep(1)
    print("任务1完成")

async def task2():
    await asyncio.sleep(2)
    print("任务2完成")

async def main():
    t1 = asyncio.create_task(task1())  # 创建任务,马上跑
    t2 = asyncio.create_task(task2())
    print("任务都安排上了")
    await t1  # 等任务1完成
    print("任务1好了,继续干别的")
    await t2  # 再等任务2

asyncio.run(main())

输出:

任务都安排上了
任务1完成  # 1秒
任务1好了,继续干别的
任务2完成  # 2秒

create_task():把任务扔到后台跑,马上返回一个“任务对象”,你可以随时 await 它。

2. 队列和生产者-消费者模式

异步还能玩“流水线”,比如一个任务生产数据,另一个消费数据:

import asyncio
import random

async def producer(queue):
    for i in range(5):
        await asyncio.sleep(1)
        item = f"货物{i}"
        print(f"生产了 {item}")
        await queue.put(item)  # 放进队列

async def consumer(queue):
    while True:
        item = await queue.get()  # 从队列拿东西
        print(f"消费了 {item}")
        await asyncio.sleep(random.uniform(0.5, 1.5))  # 随机耗时
        queue.task_done()  # 标记任务完成

async def main():
    queue = asyncio.Queue()  # 创建队列
    await asyncio.gather(producer(queue), consumer(queue))

asyncio.run(main())

输出(随机时间会有变化):

生产了 货物0
消费了 货物0
生产了 货物1
消费了 货物1
生产了 货物2

asyncio.Queue():异步队列,生产者放东西,消费者拿东西,适合多人协作的场景。

3. 异常处理:别崩了

异步任务多了,崩了咋办?加个异常处理:

import asyncio

async def risky_task():
    await asyncio.sleep(1)
    raise ValueError("我崩了!")

async def main():
    try:
        await asyncio.gather(risky_task(), return_exceptions=True)  # 捕获异常
    except ValueError as e:
        print(f"抓到错误:{e}")

asyncio.run(main())

输出:

抓到错误:我崩了!

return_exceptions=True:让 gather 不直接抛异常,而是返回错误对象。

高级用法 1:事件循环(Event Loop)手动操控

asyncio.run() 其实是个“懒人封装”,背后真正的“大脑”是事件循环(Event Loop)。高级场景下,你可能需要自己掌控这个循环。

啥是事件循环? 事件循环就像个“任务调度员”,负责安排谁跑、谁暂停、啥时候继续。默认 asyncio.run() 会帮你启动和关闭它,但你也可以手动操作。

手动跑循环

import asyncio

async def say_hi():
    print("Hi")
    await asyncio.sleep(1)
    print("Bye")

loop = asyncio.get_event_loop()  # 拿到事件循环
loop.run_until_complete(say_hi())  # 跑一个任务
loop.close()  # 关掉循环

geteventloop():拿到当前的事件循环。 rununtilcomplete():跑某个异步任务直到结束。

为啥要手动? 比如你想在一个程序里跑多个阶段的任务,或者在循环里塞点非异步的代码:

import asyncio

async def task(name):
    print(f"{name} 开始")
    await asyncio.sleep(1)
    print(f"{name} 结束")

loop = asyncio.get_event_loop()
loop.run_until_complete(task("第一波"))  # 第一阶段
print("中间干点别的")
loop.run_until_complete(task("第二波"))  # 第二阶段
loop.close()

输出:

第一波 开始
第一波 结束
中间干点别的
第二波 开始
第二波 结束

注意:一个循环只能用一次,关了就不能再开。想多次跑,得用 asyncio.neweventloop() 创建新的。

高级点:自定义调度

可以用 callsoon 或 calllater 在循环里插队跑普通函数:

import asyncio

def normal_func():
    print("我是一个普通函数")

async def main():
    loop = asyncio.get_running_loop()  # 获取当前循环
    loop.call_later(1, normal_func)  # 1秒后跑普通函数
    await asyncio.sleep(2)  # 等2秒,确保看到效果

asyncio.run(main())

输出:

(等1秒)
我是一个普通函数
(再等1秒程序结束)

高级用法 2:异步上下文管理器

有时候你需要初始化和清理资源(比如数据库连接),异步版本的 with 语句能帮你。

自定义异步上下文

import asyncio

class AsyncResource:
    async def __aenter__(self):
        print("资源初始化")
        await asyncio.sleep(1)  # 模拟异步初始化
        return self

    async def __aexit__(self, exc_type, exc, tb):
        print("资源清理")
        await asyncio.sleep(1)  # 模拟异步清理

async def main():
    async with AsyncResource() as r:
        print("使用资源中")
        await asyncio.sleep(1)

asyncio.run(main())

输出:

资源初始化
(等1秒)
使用资源中
(等1秒)
资源清理
(等1秒)

aenter:进入时异步初始化。 aexit:退出时异步清理。

实际用处 比如异步文件操作(用 aiofiles 库):

import aiofiles
import asyncio

async def write_file():
    async with aiofiles.open("test.txt", "w") as f:
        await f.write("异步写入文件")
    print("写完了")

asyncio.run(write_file())

aiofiles 是异步文件操作库,搭配 asyncio 用,避免阻塞。

高级用法 3:信号处理与任务取消

异步程序跑着跑着,可能得中途停下来,或者响应外部信号(比如 Ctrl+C)。

取消任务

import asyncio

async def long_task():
    try:
        print("开始长任务")
        await asyncio.sleep(10)
        print("长任务结束")
    except asyncio.CancelledError:
        print("任务被取消了!")
        raise  # 抛出异常,确保清理

async def main():
    task = asyncio.create_task(long_task())
    await asyncio.sleep(2)  # 等2秒
    task.cancel()  # 取消任务
    try:
        await task  # 等任务被取消
    except asyncio.CancelledError:
        print("取消确认")

asyncio.run(main())

输出:

开始长任务
(等2秒)
任务被取消了!
取消确认

task.cancel():取消某个任务。 CancelledError:捕获取消时的异常。

响应信号 比如优雅地停止程序:

import asyncio
import signal

async def worker():
    while True:
        print("我在干活")
        await asyncio.sleep(1)

def stop_loop(loop):
    print("收到停止信号,关机啦")
    loop.stop()

async def main():
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGINT, stop_loop, loop)  # 捕获 Ctrl+C
    await worker()

asyncio.run(main())

跑起来后按 Ctrl+C:

我在干活
我在干活
^C收到停止信号,关机啦

高级用法 4:异步迭代器与生成器

异步还能玩“边等边拿”的迭代器,适合处理流式数据。

异步迭代器

import asyncio

class AsyncCounter:
    def __init__(self, stop):
        self.current = 0
        self.stop = stop

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.stop:
            raise StopAsyncIteration
        await asyncio.sleep(1)  # 模拟异步等待
        self.current += 1
        return self.current - 1

async def main():
    async for num in AsyncCounter(3):
        print(f"拿到数字:{num}")

asyncio.run(main())

输出:

(等1秒)
拿到数字:0
(等1秒)
拿到数字:1
(等1秒)
拿到数字:2

async for:异步迭代,边等边处理。

异步生成器 更简单点的写法:

import asyncio

async def async_gen():
    for i in range(3):
        await asyncio.sleep(1)
        yield i  # 异步生成值

async def main():
    async for num in async_gen():
        print(f"生成:{num}")

asyncio.run(main())

输出跟上面一样。异步生成器用 yield 产出值,适合流式处理大数据。

高级用法 5:线程与异步混用

有时候你得跑点阻塞的同步代码(比如第三方库不支持异步),可以用 runinexecutor 扔到线程里。

import asyncio
import time

def blocking_task():
    time.sleep(2)  # 模拟阻塞
    return "阻塞任务完成"

async def main():
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, blocking_task)  # 扔到默认线程池
    print(result)

asyncio.run(main())

输出:

(等2秒)
阻塞任务完成

runinexecutor(None, 函数):把同步函数扔到线程池跑,不卡主循环。

文档信息

版权声明:可自由转载(请注明转载出处)-非商用-非衍生

发表时间:2025年7月1日 09:53