Skip to content

asyncio.py

文件信息

  • 📄 原文件:03_asyncio.py
  • 🔤 语言:python

Python asyncio 异步编程 本文件介绍 Python 中的 asyncio 异步编程。

asyncio 是 Python 的异步 I/O 框架,使用协程实现并发。 适合 I/O 密集型任务,如网络请求、文件操作等。

完整代码

python
import asyncio
import time
from typing import List


async def main01_coroutine_basics():
    """
    ============================================================
                    1. 协程基础
    ============================================================
    """
    print("=" * 60)
    print("1. 协程基础")
    print("=" * 60)

    # 【定义协程】使用 async def
    async def hello():
        print("  Hello")
        await asyncio.sleep(0.1)  # 异步等待
        print("  World")
        return "完成"

    # 【运行协程】
    print("运行协程:")
    result = await hello()
    print(f"返回值: {result}")

    # 【协程对象】
    print(f"\n--- 协程对象 ---")
    coro = hello()  # 创建协程对象,不执行
    print(f"协程对象: {coro}")
    await coro  # 执行协程

    # 【await 关键字】
    print(f"\n--- await 关键字 ---")
    print("await 用于等待:")
    print("  - 协程 (coroutine)")
    print("  - Task 对象")
    print("  - Future 对象")
    print("  - 任何实现 __await__ 的对象")


async def main02_tasks():
    """
    ============================================================
                    2. 任务 (Task)
    ============================================================
    """
    print("\n" + "=" * 60)
    print("2. 任务 (Task)")
    print("=" * 60)

    async def fetch_data(name, delay):
        print(f"  开始获取 {name}")
        await asyncio.sleep(delay)
        print(f"  完成获取 {name}")
        return f"{name} 的数据"

    # 【创建任务】asyncio.create_task()
    print("创建任务并发执行:")
    start = time.perf_counter()

    task1 = asyncio.create_task(fetch_data("A", 0.2))
    task2 = asyncio.create_task(fetch_data("B", 0.1))
    task3 = asyncio.create_task(fetch_data("C", 0.15))

    # 等待所有任务完成
    result1 = await task1
    result2 = await task2
    result3 = await task3

    elapsed = time.perf_counter() - start
    print(f"总耗时: {elapsed:.2f}秒(并发执行)")
    print(f"结果: {result1}, {result2}, {result3}")

    # 【asyncio.gather】同时等待多个协程
    print(f"\n--- asyncio.gather ---")
    start = time.perf_counter()

    results = await asyncio.gather(
        fetch_data("X", 0.1),
        fetch_data("Y", 0.15),
        fetch_data("Z", 0.2),
    )

    elapsed = time.perf_counter() - start
    print(f"gather 耗时: {elapsed:.2f}秒")
    print(f"结果: {results}")

    # 【gather 处理异常】
    print(f"\n--- gather 处理异常 ---")

    async def maybe_fail(n):
        if n == 2:
            raise ValueError(f"任务 {n} 失败")
        await asyncio.sleep(0.05)
        return n

    # return_exceptions=True 返回异常而不是抛出
    results = await asyncio.gather(
        maybe_fail(1),
        maybe_fail(2),
        maybe_fail(3),
        return_exceptions=True
    )
    print(f"结果(含异常): {results}")


async def main03_wait_and_timeout():
    """
    ============================================================
                3. 等待和超时
    ============================================================
    """
    print("\n" + "=" * 60)
    print("3. 等待和超时")
    print("=" * 60)

    async def slow_task(n, delay):
        await asyncio.sleep(delay)
        return f"任务{n}完成"

    # 【asyncio.wait】更细粒度的控制
    print("--- asyncio.wait ---")

    tasks = [
        asyncio.create_task(slow_task(i, 0.1 * i))
        for i in range(1, 4)
    ]

    # 等待第一个完成
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )
    print(f"第一个完成: {[t.result() for t in done]}")
    print(f"待完成数: {len(pending)}")

    # 等待剩余任务
    if pending:
        done, _ = await asyncio.wait(pending)
        print(f"剩余完成: {[t.result() for t in done]}")

    # 【asyncio.wait_for】超时控制
    print(f"\n--- asyncio.wait_for 超时 ---")

    async def long_task():
        await asyncio.sleep(1)
        return "完成"

    try:
        result = await asyncio.wait_for(long_task(), timeout=0.1)
    except asyncio.TimeoutError:
        print("任务超时!")

    # 【asyncio.timeout】Python 3.11+ 上下文管理器
    print(f"\n--- asyncio.timeout (Python 3.11+) ---")
    try:
        async with asyncio.timeout(0.1):
            await asyncio.sleep(1)
    except asyncio.TimeoutError:
        print("上下文超时!")


async def main04_semaphore_lock():
    """
    ============================================================
                4. 异步同步原语
    ============================================================
    """
    print("\n" + "=" * 60)
    print("4. 异步同步原语")
    print("=" * 60)

    # 【asyncio.Lock】异步锁
    print("--- asyncio.Lock ---")

    lock = asyncio.Lock()
    shared_resource = []

    async def safe_append(item):
        async with lock:
            print(f"  获取锁,添加 {item}")
            shared_resource.append(item)
            await asyncio.sleep(0.05)
            print(f"  释放锁")

    await asyncio.gather(*[safe_append(i) for i in range(3)])
    print(f"结果: {shared_resource}")

    # 【asyncio.Semaphore】限制并发数
    print(f"\n--- asyncio.Semaphore ---")

    semaphore = asyncio.Semaphore(2)  # 最多2个并发

    async def limited_task(n):
        async with semaphore:
            print(f"  任务 {n} 开始")
            await asyncio.sleep(0.1)
            print(f"  任务 {n} 结束")

    await asyncio.gather(*[limited_task(i) for i in range(5)])

    # 【asyncio.Event】事件
    print(f"\n--- asyncio.Event ---")

    event = asyncio.Event()

    async def waiter(name):
        print(f"  {name} 等待事件")
        await event.wait()
        print(f"  {name} 收到事件!")

    async def setter():
        await asyncio.sleep(0.1)
        print("  设置事件!")
        event.set()

    await asyncio.gather(
        waiter("A"),
        waiter("B"),
        setter()
    )

    # 【asyncio.Queue】异步队列
    print(f"\n--- asyncio.Queue ---")

    queue: asyncio.Queue = asyncio.Queue()

    async def producer():
        for i in range(3):
            await queue.put(i)
            print(f"  生产: {i}")
            await asyncio.sleep(0.05)

    async def consumer():
        while True:
            try:
                item = await asyncio.wait_for(queue.get(), timeout=0.2)
                print(f"  消费: {item}")
                queue.task_done()
            except asyncio.TimeoutError:
                break

    await asyncio.gather(producer(), consumer())


async def main05_async_generators():
    """
    ============================================================
                5. 异步生成器和迭代器
    ============================================================
    """
    print("\n" + "=" * 60)
    print("5. 异步生成器和迭代器")
    print("=" * 60)

    # 【异步生成器】async def + yield
    async def async_range(start, stop):
        for i in range(start, stop):
            await asyncio.sleep(0.05)
            yield i

    print("异步生成器:")
    async for num in async_range(0, 5):
        print(f"  {num}")

    # 【异步列表推导】
    print(f"\n异步列表推导:")
    result = [x async for x in async_range(0, 3)]
    print(f"结果: {result}")

    # 【异步上下文管理器】
    print(f"\n--- 异步上下文管理器 ---")

    class AsyncResource:
        async def __aenter__(self):
            print("  获取资源")
            await asyncio.sleep(0.05)
            return self

        async def __aexit__(self, exc_type, exc_val, exc_tb):
            print("  释放资源")
            await asyncio.sleep(0.05)
            return False

        async def do_something(self):
            print("  使用资源")

    async with AsyncResource() as resource:
        await resource.do_something()


async def main06_practical_examples():
    """
    ============================================================
                6. 实际应用示例
    ============================================================
    """
    print("\n" + "=" * 60)
    print("6. 实际应用示例")
    print("=" * 60)

    # 【模拟并发 HTTP 请求】
    print("--- 模拟并发 HTTP 请求 ---")

    async def fetch_url(url: str) -> dict:
        """模拟 HTTP 请求"""
        delay = 0.1 + (hash(url) % 10) / 100
        await asyncio.sleep(delay)
        return {"url": url, "status": 200}

    urls = [
        "https://api.example.com/users",
        "https://api.example.com/posts",
        "https://api.example.com/comments",
        "https://api.example.com/albums",
    ]

    start = time.perf_counter()
    results = await asyncio.gather(*[fetch_url(url) for url in urls])
    elapsed = time.perf_counter() - start

    print(f"并发请求 {len(urls)} 个 URL,耗时: {elapsed:.2f}秒")
    for r in results:
        print(f"  {r['url']}: {r['status']}")

    # 【限速请求】
    print(f"\n--- 限速请求(使用 Semaphore)---")

    semaphore = asyncio.Semaphore(2)  # 最多2个并发

    async def rate_limited_fetch(url: str) -> dict:
        async with semaphore:
            return await fetch_url(url)

    start = time.perf_counter()
    results = await asyncio.gather(*[rate_limited_fetch(url) for url in urls])
    elapsed = time.perf_counter() - start

    print(f"限速请求耗时: {elapsed:.2f}秒")

    # 【生产者消费者模式】
    print(f"\n--- 生产者消费者模式 ---")

    async def producer(queue: asyncio.Queue, items: List[int]):
        for item in items:
            await queue.put(item)
            print(f"  生产: {item}")
            await asyncio.sleep(0.02)
        await queue.put(None)  # 结束信号

    async def consumer(queue: asyncio.Queue, name: str):
        while True:
            item = await queue.get()
            if item is None:
                await queue.put(None)  # 传递结束信号
                break
            print(f"  {name} 消费: {item}")
            await asyncio.sleep(0.05)

    queue: asyncio.Queue = asyncio.Queue()
    await asyncio.gather(
        producer(queue, list(range(5))),
        consumer(queue, "Consumer-1"),
        consumer(queue, "Consumer-2"),
    )


async def main07_error_handling():
    """
    ============================================================
                7. 错误处理
    ============================================================
    """
    print("\n" + "=" * 60)
    print("7. 错误处理")
    print("=" * 60)

    # 【单个任务异常】
    print("--- 单个任务异常 ---")

    async def risky_task(n):
        if n == 2:
            raise ValueError(f"任务 {n} 出错")
        await asyncio.sleep(0.05)
        return f"任务 {n} 成功"

    try:
        result = await risky_task(2)
    except ValueError as e:
        print(f"捕获异常: {e}")

    # 【gather 中的异常】
    print(f"\n--- gather 中的异常 ---")

    results = await asyncio.gather(
        risky_task(1),
        risky_task(2),
        risky_task(3),
        return_exceptions=True
    )

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"  任务 {i+1} 失败: {result}")
        else:
            print(f"  任务 {i+1} 成功: {result}")

    # 【TaskGroup (Python 3.11+)】
    print(f"\n--- TaskGroup (Python 3.11+) ---")

    async def safe_task(n):
        await asyncio.sleep(0.05)
        return f"任务 {n} 完成"

    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(safe_task(1))
            task2 = tg.create_task(safe_task(2))
        print(f"TaskGroup 结果: {task1.result()}, {task2.result()}")
    except* ValueError as eg:
        print(f"TaskGroup 异常组: {eg.exceptions}")


async def main08_patterns():
    """
    ============================================================
                8. 常用模式
    ============================================================
    """
    print("\n" + "=" * 60)
    print("8. 常用模式")
    print("=" * 60)

    # 【批量处理】
    print("--- 批量处理 ---")

    async def process_item(item):
        await asyncio.sleep(0.02)
        return item * 2

    async def batch_process(items, batch_size=3):
        results = []
        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]
            batch_results = await asyncio.gather(*[process_item(x) for x in batch])
            results.extend(batch_results)
            print(f"  处理批次 {i // batch_size + 1}: {batch_results}")
        return results

    items = list(range(10))
    results = await batch_process(items, batch_size=3)
    print(f"最终结果: {results}")

    # 【重试模式】
    print(f"\n--- 重试模式 ---")

    attempt_count = 0

    async def unreliable_task():
        nonlocal attempt_count
        attempt_count += 1
        if attempt_count < 3:
            raise ConnectionError(f"尝试 {attempt_count} 失败")
        return "成功!"

    async def retry(coro_func, max_retries=3, delay=0.1):
        for attempt in range(max_retries):
            try:
                return await coro_func()
            except Exception as e:
                print(f"  尝试 {attempt + 1} 失败: {e}")
                if attempt < max_retries - 1:
                    await asyncio.sleep(delay)
        raise Exception("重试次数用尽")

    result = await retry(unreliable_task, max_retries=5)
    print(f"最终结果: {result}")

    # 【超时重试】
    print(f"\n--- 超时重试 ---")

    async def slow_or_fast():
        import random
        delay = random.uniform(0.05, 0.2)
        await asyncio.sleep(delay)
        return f"耗时 {delay:.2f}秒"

    async def with_timeout_retry(coro_func, timeout=0.1, max_retries=3):
        for attempt in range(max_retries):
            try:
                return await asyncio.wait_for(coro_func(), timeout=timeout)
            except asyncio.TimeoutError:
                print(f"  尝试 {attempt + 1} 超时")
        raise asyncio.TimeoutError("重试超时")

    try:
        result = await with_timeout_retry(slow_or_fast, timeout=0.15, max_retries=5)
        print(f"结果: {result}")
    except asyncio.TimeoutError:
        print("所有尝试都超时")


async def main():
    """主函数"""
    await main01_coroutine_basics()
    await main02_tasks()
    await main03_wait_and_timeout()
    await main04_semaphore_lock()
    await main05_async_generators()
    await main06_practical_examples()
    await main07_error_handling()
    await main08_patterns()


if __name__ == "__main__":
    asyncio.run(main())

💬 讨论

使用 GitHub 账号登录后即可参与讨论

基于 MIT 许可发布