Python 异步编程:从入门到实践

Python 的 asyncio 库为编写并发代码提供了一套完整的解决方案。本文将带你深入了解异步编程的核心概念和实际应用。

什么是异步编程?

异步编程是一种编程范式,允许程序在等待 I/O 操作(如网络请求、文件读写)完成的期间不被阻塞,而是转去执行其他任务。

同步 vs 异步

import time
import requests

# Synchronous version: requests run one after another, so the total time is
# the sum of the individual request times.
def sync_fetch(urls=None, timeout=10):
    """Fetch each URL in turn with ``requests`` and return the decoded JSON bodies.

    Args:
        urls: URLs to fetch. Defaults to the three example endpoints.
        timeout: Per-request timeout in seconds. ``requests`` applies no
            timeout by default, so without one a stalled server would block
            this call forever.

    Returns:
        A list of decoded JSON payloads, in the same order as ``urls``.

    Raises:
        requests.exceptions.Timeout: if a request exceeds ``timeout``.
    """
    if urls is None:
        urls = ['https://api.example.com/data1',
                'https://api.example.com/data2',
                'https://api.example.com/data3']

    results = []
    for url in urls:
        # Blocks until this response arrives (or the timeout expires) before
        # the next request is even started.
        response = requests.get(url, timeout=timeout)
        results.append(response.json())
    return results

# Asynchronous version: requests run concurrently, so the total time is
# roughly that of the slowest request.
def async_fetch():
    """Placeholder for the asynchronous counterpart of sync_fetch().

    NOTE(review): as written this is a plain (non-async) function whose body
    is just ``pass``, so calling it returns None and fetches nothing.
    """
    # To be implemented with asyncio and aiohttp.
    pass

asyncio 核心概念

1. 协程(Coroutine)

协程是异步编程的基本单元,使用 async def 定义。

import asyncio

async def say_hello():
    """Print "Hello", yield to the event loop for one second, then print "World"."""
    greeting, farewell = "Hello", "World"
    print(greeting)
    # Simulated asynchronous work: the loop is free to run other tasks here.
    await asyncio.sleep(1)
    print(farewell)

# Run the coroutine: asyncio.run() drives say_hello() on an event loop until it finishes.
asyncio.run(say_hello())

2. 事件循环(Event Loop)

事件循环是 asyncio 的核心,负责调度和执行协程。

import asyncio

async def main():
    """Print "Start", pause for one second without blocking the loop, then print "End"."""
    delay_seconds = 1
    print("Start")
    await asyncio.sleep(delay_seconds)
    print("End")

# Low-level approach: create, run and close an event loop by hand.
# asyncio.get_event_loop() is no longer suitable here. With no running loop it
# emits a DeprecationWarning on Python 3.12+ and raises RuntimeError on 3.14+,
# so create the loop explicitly and always close it, even if main() fails.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

# Or, preferably, use asyncio.run() (Python 3.7+), which creates and closes
# the loop for you.
asyncio.run(main())

3. 任务(Task)

任务用于并发执行多个协程:asyncio.create_task() 会把协程包装成任务并交给事件循环调度。因此下例中两个任务的等待是重叠的,总耗时约 2 秒(由较慢的任务决定),而不是 1 + 2 = 3 秒。

import asyncio

async def task1():
    """Sleep for one second without blocking the loop, then announce completion."""
    duration = 1
    await asyncio.sleep(duration)
    print("Task 1 completed")

async def task2():
    """Sleep for two seconds without blocking the loop, then announce completion."""
    duration = 2
    await asyncio.sleep(duration)
    print("Task 2 completed")

async def main():
    """Schedule task1() and task2() as concurrent tasks and wait for both.

    Each coroutine is handed to the event loop via create_task(), so the two
    sleeps overlap. Total runtime is about the longer sleep (~2 s), not the
    sum of both.
    """
    pending = [asyncio.create_task(coro) for coro in (task1(), task2())]
    await asyncio.gather(*pending)

# Entry point: run main() on an event loop until both tasks have finished.
asyncio.run(main())

实战:异步 HTTP 请求

使用 aiohttp 进行异步 HTTP 请求:

import asyncio
import aiohttp
import time

async def fetch_url(session, url):
    """GET ``url`` using ``session`` and return the decoded JSON body.

    Args:
        session: An open aiohttp ClientSession, shared across requests.
        url: The URL to request.

    Returns:
        The JSON-decoded response body.

    Raises:
        aiohttp.ClientResponseError: if the server answers with an HTTP error
            status (>= 400), instead of trying to decode the error body as JSON.
    """
    async with session.get(url) as response:
        # Surface 4xx/5xx responses as exceptions rather than silently
        # returning (or failing to parse) the error body.
        response.raise_for_status()
        return await response.json()

async def fetch_all(urls):
    """Fetch every URL concurrently over one shared aiohttp session.

    Returns the decoded JSON payloads in the same order as ``urls``, because
    asyncio.gather() preserves the order of its arguments.
    """
    async with aiohttp.ClientSession() as session:
        pending = (fetch_url(session, u) for u in urls)
        results = await asyncio.gather(*pending)
    return results

async def main():
    """Fetch three sample posts concurrently, then report elapsed time and count."""
    urls = [
        'https://jsonplaceholder.typicode.com/posts/1',
        'https://jsonplaceholder.typicode.com/posts/2',
        'https://jsonplaceholder.typicode.com/posts/3',
    ]

    # perf_counter() is monotonic and high-resolution, which makes it the right
    # clock for measuring elapsed time. time.time() can jump if the system
    # clock is adjusted.
    start = time.perf_counter()
    results = await fetch_all(urls)
    print(f"耗时: {time.perf_counter() - start:.2f}秒")
    print(f"获取到 {len(results)} 条数据")

# Entry point: run the concurrent-fetch demo.
asyncio.run(main())

异步上下文管理器

import asyncio

class DatabaseConnection:
    """Toy async context manager that simulates opening and closing a DB connection.

    ``async with DatabaseConnection(dsn) as db`` "connects" on entry, setting
    ``db.connection`` to a status string, and "disconnects" on exit. Nothing
    real is opened; each step just sleeps for 0.1 s.
    """

    def __init__(self, connection_string):
        self.connection_string = connection_string
        self.connection = None  # stays None until __aenter__ runs

    async def __aenter__(self):
        # Simulate the asynchronous connect step.
        await asyncio.sleep(0.1)
        status = f"Connected to {self.connection_string}"
        self.connection = status
        print(status)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Simulate the asynchronous disconnect step. Returning None (falsy)
        # lets any exception raised inside the ``async with`` body propagate.
        await asyncio.sleep(0.1)
        self.connection = None
        print("Connection closed")

async def main():
    """Open a DatabaseConnection in an ``async with`` block and print its status."""
    connection_manager = DatabaseConnection("postgresql://localhost/db")
    async with connection_manager as db:
        print(f"Using: {db.connection}")

# Entry point: run the async-context-manager demo.
asyncio.run(main())

异步迭代器

import asyncio

class AsyncDataSource:
    """Async iterator that hands out the items of ``data`` one at a time.

    Each step sleeps for 0.1 s to simulate fetching the item asynchronously.
    ``__aiter__`` returns ``self``, so an instance can be consumed only once.
    """

    def __init__(self, data):
        self.data = data
        self.index = 0  # position of the next item to return

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.index < len(self.data):
            # Simulate fetching the next item asynchronously.
            await asyncio.sleep(0.1)
            item = self.data[self.index]
            self.index += 1
            return item
        raise StopAsyncIteration

async def main():
    """Consume an AsyncDataSource of 1..5 with ``async for``, printing each item."""
    async for value in AsyncDataSource([1, 2, 3, 4, 5]):
        print(f"Received: {value}")

# Entry point: run the async-iterator demo.
asyncio.run(main())

错误处理

import asyncio

async def risky_operation():
    """Simulate a failing async operation: wait one second, then raise ValueError."""
    await asyncio.sleep(1)
    error = ValueError("Something went wrong")
    raise error

async def main():
    """Demonstrate error handling for one awaited coroutine and for gather().

    First, awaits risky_operation() directly and catches its ValueError.
    Then runs two tasks under gather(return_exceptions=True) and reports
    whether each one failed or succeeded.
    """
    # A single coroutine: a plain try/except around the await is enough.
    try:
        await risky_operation()
    except ValueError as e:
        print(f"Caught error: {e}")

    # Multiple tasks: with return_exceptions=True, gather() places each raised
    # exception into the result list instead of propagating the first one.
    tasks = [
        asyncio.create_task(risky_operation()),
        asyncio.create_task(asyncio.sleep(1)),
    ]

    results = await asyncio.gather(*tasks, return_exceptions=True)
    for result in results:
        # Check BaseException, not Exception. A cancelled task comes back as
        # asyncio.CancelledError, which derives from BaseException (Python
        # 3.8+) and would otherwise be misreported as a success.
        if isinstance(result, BaseException):
            print(f"Task failed: {result}")
        else:
            print(f"Task succeeded: {result}")

# Entry point: run the error-handling demo.
asyncio.run(main())

超时控制

import asyncio

async def slow_operation():
    """Simulate a slow async operation: sleep five seconds, then return "Completed"."""
    delay = 5
    await asyncio.sleep(delay)
    outcome = "Completed"
    return outcome

async def main():
    """Run slow_operation() with a 2-second limit, printing its result or a timeout notice."""
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2)
    except asyncio.TimeoutError:
        print("Operation timed out")
    else:
        print(result)

# Entry point: run the timeout demo.
asyncio.run(main())

与同步代码集成

import asyncio
from concurrent.futures import ThreadPoolExecutor

def blocking_io(seconds=2):
    """Simulate a blocking I/O call by sleeping the current thread.

    Args:
        seconds: How long to block, in seconds. Defaults to 2, matching the
            original fixed delay.

    Returns:
        The string "Blocking result".
    """
    import time  # local import keeps this snippet self-contained

    # time.sleep() blocks the whole thread. Called directly from a coroutine it
    # would stall the event loop, which is why it should be run in a worker
    # thread instead.
    time.sleep(seconds)
    return "Blocking result"

async def main():
    """Run the blocking blocking_io() off the event loop, in two different ways.

    Both approaches execute the blocking call in a worker thread, so the event
    loop stays free to run other coroutines meanwhile.
    """
    # Inside a coroutine, get_running_loop() is the preferred API: it always
    # returns the loop running this coroutine. get_event_loop() is a legacy
    # API whose implicit loop-creation behaviour is deprecated.
    loop = asyncio.get_running_loop()

    # Run the blocking code in an explicitly managed thread pool.
    with ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, blocking_io)
        print(result)

    # Or use asyncio.to_thread (Python 3.9+), which runs the function in a
    # separate thread without managing a pool yourself.
    result = await asyncio.to_thread(blocking_io)
    print(result)

# Entry point: run the sync/async integration demo.
asyncio.run(main())

最佳实践

  1. 始终使用 await - 忘记 await 时只会得到一个协程对象,协程体根本不会执行(Python 通常会给出 "coroutine ... was never awaited" 的 RuntimeWarning)
  2. 避免在异步函数中使用阻塞操作 - 使用线程池或专门的异步库
  3. 合理设置并发数 - 使用 asyncio.Semaphore 限制并发
import asyncio

async def limited_fetch(sem, session, url):
    """GET ``url`` and return the body as text, holding ``sem`` for the duration.

    Sharing one semaphore across many calls caps how many requests are in
    flight at the same time.
    """
    # Multiple items in one ``async with`` are entered left to right, so the
    # semaphore is acquired before the request is issued.
    async with sem, session.get(url) as response:
        return await response.text()

async def main(urls=None, max_concurrency=10):
    """Fetch many URLs concurrently while capping the number of in-flight requests.

    Args:
        urls: URLs to fetch. The original example used the placeholder ``[...]``
            (a list containing Ellipsis), which is not a usable URL. Pass a real
            list here. Defaults to no URLs.
        max_concurrency: Maximum number of simultaneous requests (default 10).

    Returns:
        The response bodies as text, in the same order as ``urls``.
    """
    import aiohttp  # this snippet used aiohttp without importing it

    if urls is None:
        urls = []  # fill in your (potentially large) list of URLs
    sem = asyncio.Semaphore(max_concurrency)  # at most this many requests at once

    async with aiohttp.ClientSession() as session:
        tasks = [limited_fetch(sem, session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    return results

# Entry point: run the concurrency-limited fetch demo.
asyncio.run(main())

总结

Python 的 asyncio 提供了一套强大的异步编程工具:

  • 协程 - 使用 async def 定义
  • 事件循环 - 调度和执行协程
  • 任务 - 并发执行多个协程
  • 异步上下文管理器 - __aenter__ 和 __aexit__
  • 异步迭代器 - __aiter__ 和 __anext__

掌握这些概念,你就能编写出高性能的异步 Python 程序。


本文首发于技术博客,转载请注明出处。