Python 异步编程:从入门到实践
Python 的 asyncio 库为编写并发代码提供了一套完整的解决方案。本文将带你深入了解异步编程的核心概念和实际应用。
什么是异步编程?
异步编程是一种编程范式:程序在等待 I/O 操作(如网络请求、文件读写)完成时不会原地阻塞,而是转去执行其他任务,等 I/O 就绪后再回来继续执行。
同步 vs 异步
import time
import requests
# Synchronous approach: requests run one after another, so the total time is
# the sum of the individual request times.
def sync_fetch():
    """Fetch three JSON endpoints sequentially with ``requests``.

    Each ``requests.get`` call blocks until its response arrives, so the next
    request cannot start before the previous one has finished.

    Returns:
        list: the decoded JSON body of each response, in URL order.
    """
    endpoints = [
        'https://api.example.com/data1',
        'https://api.example.com/data2',
        'https://api.example.com/data3',
    ]
    # One blocking call per URL; the comprehension preserves request order.
    return [requests.get(endpoint).json() for endpoint in endpoints]
# Asynchronous approach: requests run concurrently, so the total time is
# roughly that of the slowest single request.
def async_fetch():
    """Placeholder for the asynchronous counterpart of ``sync_fetch``.

    Meant to be implemented with asyncio and aiohttp; a working version of
    that pattern appears in the aiohttp example later in this article.
    As written, this stub does nothing and returns ``None``.
    """
    return None
asyncio 核心概念
1. 协程(Coroutine)
协程是异步编程的基本单元,使用 async def 定义。注意:调用协程函数只会返回一个协程对象,必须通过 await 或 asyncio.run() 等方式交给事件循环,协程才会真正执行。
import asyncio
async def say_hello():
    """Print a greeting in two halves separated by a one-second async pause.

    ``await asyncio.sleep`` suspends this coroutine, standing in for a real
    asynchronous operation.
    """
    first_word, second_word = "Hello", "World"
    print(first_word)
    await asyncio.sleep(1)  # simulated async operation
    print(second_word)
# Run the coroutine to completion from ordinary synchronous code.
asyncio.run(say_hello())
2. 事件循环(Event Loop)
事件循环是 asyncio 的核心,负责调度和执行协程。
import asyncio
async def main():
    """Demo coroutine: print "Start", await a one-second sleep, print "End"."""
    delay_seconds = 1
    print("Start")
    await asyncio.sleep(delay_seconds)  # suspends main() while "waiting"
    print("End")
# Low-level approach: manage an event loop by hand.
# Calling asyncio.get_event_loop() when there is no current loop is
# deprecated (DeprecationWarning on Python 3.12, RuntimeError on 3.14+),
# so create the loop explicitly instead.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    # Close the loop even if main() raises, so its resources are released.
    loop.close()

# Or simply use asyncio.run() (Python 3.7+), which creates, runs and closes
# a fresh event loop for you.
asyncio.run(main())
3. 任务(Task)
任务(Task)用于并发执行多个协程:asyncio.create_task() 会把协程包装成任务并立即交给事件循环调度,随后可以用 asyncio.gather() 等待所有任务完成。
import asyncio
async def task1():
    """Finish after about one second and report completion."""
    duration = 1
    await asyncio.sleep(duration)
    print("Task 1 completed")
async def task2():
    """Finish after about two seconds and report completion."""
    duration = 2
    await asyncio.sleep(duration)
    print("Task 2 completed")
async def main():
    """Run task1() and task2() concurrently and wait for both to finish.

    Both coroutines are wrapped in Tasks before anything is awaited, so
    their sleeps (1 s and 2 s) overlap instead of running back to back.
    """
    # create_task schedules each coroutine right away, task1 first.
    scheduled = [asyncio.create_task(job()) for job in (task1, task2)]
    # Wait until every scheduled task has completed.
    await asyncio.gather(*scheduled)


asyncio.run(main())
实战:异步 HTTP 请求
使用 aiohttp 进行异步 HTTP 请求:
import asyncio
import aiohttp
import time
async def fetch_url(session, url):
    """GET *url* using *session* and return the decoded JSON body.

    Args:
        session: an open ``aiohttp.ClientSession`` (``fetch_all`` shares one
            session across all requests).
        url: the address to request.

    Raises:
        aiohttp.ClientResponseError: if the server answers with a 4xx/5xx
            status. Without this check, an error payload would be decoded and
            returned as if it were real data.
    """
    async with session.get(url) as response:
        response.raise_for_status()
        return await response.json()
async def fetch_all(urls):
    """Fetch every URL in *urls* concurrently and return their JSON bodies.

    A single ``ClientSession`` is opened for the whole batch and closed once
    all requests are done. ``asyncio.gather`` returns results in the same
    order as *urls*.
    """
    async with aiohttp.ClientSession() as session:
        pending = (fetch_url(session, link) for link in urls)
        return await asyncio.gather(*pending)
async def main():
    """Fetch three sample posts concurrently, then print elapsed time and count."""
    urls = [
        'https://jsonplaceholder.typicode.com/posts/1',
        'https://jsonplaceholder.typicode.com/posts/2',
        'https://jsonplaceholder.typicode.com/posts/3',
    ]
    # perf_counter() is a monotonic, high-resolution clock intended for
    # measuring durations; time.time() follows the wall clock and can jump.
    start = time.perf_counter()
    results = await fetch_all(urls)
    print(f"耗时: {time.perf_counter() - start:.2f}秒")
    print(f"获取到 {len(results)} 条数据")


asyncio.run(main())
异步上下文管理器
import asyncio
class DatabaseConnection:
    """Toy async context manager that simulates opening and closing a DB connection.

    ``async with DatabaseConnection(dsn) as db`` awaits ``__aenter__`` on
    entry and ``__aexit__`` on exit. No real database is contacted: each step
    only sleeps for 0.1 s and prints a status line.
    """

    def __init__(self, connection_string):
        self.connection_string = connection_string
        # Status text while "connected"; None before entry and after exit.
        self.connection = None

    async def __aenter__(self):
        # Stand-in for an asynchronous connect call.
        await asyncio.sleep(0.1)
        status = f"Connected to {self.connection_string}"
        self.connection = status
        print(status)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Stand-in for an asynchronous close call. Returning None (falsy)
        # lets any exception raised inside the ``async with`` body propagate.
        await asyncio.sleep(0.1)
        print("Connection closed")
        self.connection = None
async def main():
    """Open a simulated connection with ``async with`` and show it in use."""
    connection_manager = DatabaseConnection("postgresql://localhost/db")
    async with connection_manager as db:
        print(f"Using: {db.connection}")


asyncio.run(main())
异步迭代器
import asyncio
class AsyncDataSource:
    """Async iterator that yields the items of *data* one at a time.

    Each step awaits a 0.1 s sleep to simulate fetching the next item from
    an asynchronous source. The instance is its own iterator and never
    resets its position, so it can be consumed only once.
    """

    def __init__(self, data):
        self.data = data
        # Position of the next item to hand out.
        self.index = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.index < len(self.data):
            # Simulated asynchronous fetch.
            await asyncio.sleep(0.1)
            item = self.data[self.index]
            self.index += 1
            return item
        raise StopAsyncIteration
async def main():
    """Consume an AsyncDataSource with ``async for`` and print each item."""
    async for value in AsyncDataSource([1, 2, 3, 4, 5]):
        print(f"Received: {value}")


asyncio.run(main())
错误处理
import asyncio
async def risky_operation():
    """Simulate an async operation that fails after one second.

    Raises:
        ValueError: always, once the simulated work finishes.
    """
    await asyncio.sleep(1)
    failure = ValueError("Something went wrong")
    raise failure
async def main():
    """Show two ways of handling errors raised by coroutines.

    1. Awaiting a coroutine directly: its exception propagates out of the
       ``await`` and can be caught with an ordinary ``try/except``.
    2. ``asyncio.gather(..., return_exceptions=True)``: exceptions are placed
       in the result list (in task order) instead of being raised.
    """
    try:
        await risky_operation()
    except ValueError as e:
        print(f"Caught error: {e}")

    # Handle errors from several tasks at once.
    tasks = [
        asyncio.create_task(risky_operation()),
        asyncio.create_task(asyncio.sleep(1)),
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for result in results:
        # Check BaseException, not Exception: a cancelled task appears here as
        # asyncio.CancelledError, which derives from BaseException (3.8+) and
        # would otherwise be reported as a success.
        if isinstance(result, BaseException):
            print(f"Task failed: {result}")
        else:
            print(f"Task succeeded: {result}")


asyncio.run(main())
超时控制
import asyncio
async def slow_operation():
    """Return "Completed" after a simulated five-second async wait."""
    outcome = "Completed"
    await asyncio.sleep(5)
    return outcome
async def main():
    """Run slow_operation() with a 2-second limit and report the outcome.

    ``asyncio.wait_for`` cancels the wrapped coroutine when the timeout
    expires and raises ``asyncio.TimeoutError``, which has been an alias of
    the built-in ``TimeoutError`` since Python 3.11.
    """
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2)
    except asyncio.TimeoutError:
        print("Operation timed out")
    else:
        print(result)


asyncio.run(main())
与同步代码集成
import asyncio
from concurrent.futures import ThreadPoolExecutor
def blocking_io():
    """Simulate blocking I/O: sleep for two seconds, then return a string.

    ``time.sleep`` blocks the calling thread, so this should not be called
    directly from a coroutine; run it in a worker thread instead.
    """
    import time

    pause_seconds = 2
    time.sleep(pause_seconds)
    return "Blocking result"
async def main():
    """Run blocking_io() off the event loop in two ways and print each result.

    1. ``loop.run_in_executor`` with an explicit ``ThreadPoolExecutor``.
    2. ``asyncio.to_thread`` (Python 3.9+), which runs the function in a
       separate thread for you.
    """
    # Inside a coroutine, get_running_loop() is the recommended way to obtain
    # the current loop; the asyncio docs discourage get_event_loop() here.
    loop = asyncio.get_running_loop()
    # Run the blocking call in a worker thread so the event loop stays free.
    with ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, blocking_io)
        print(result)
    # Or use asyncio.to_thread (Python 3.9+).
    result = await asyncio.to_thread(blocking_io)
    print(result)


asyncio.run(main())
最佳实践
- 始终使用 `await` - 忘记 await 会导致协程根本不会执行(Python 会给出 "coroutine ... was never awaited" 警告)
- 避免在异步函数中使用阻塞操作 - 使用线程池或专门的异步库
- 合理设置并发数 - 使用 `asyncio.Semaphore` 限制并发
import asyncio

import aiohttp


async def limited_fetch(sem, session, url):
    """GET *url* while holding *sem*, and return the response body as text.

    The semaphore is held for the whole request, including reading the body,
    so no more requests are in flight than the semaphore's initial value.
    """
    async with sem:
        async with session.get(url) as response:
            return await response.text()


async def main():
    """Fetch many URLs concurrently, with at most 10 requests in flight."""
    sem = asyncio.Semaphore(10)  # at most 10 concurrent requests
    # A large batch of URLs (example endpoints; replace with your own).
    urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 101)]
    async with aiohttp.ClientSession() as session:
        tasks = [limited_fetch(sem, session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    print(f"获取到 {len(results)} 条数据")


asyncio.run(main())
总结
Python 的 asyncio 提供了一套强大的异步编程工具:
- 协程 - 使用 `async def` 定义
- 事件循环 - 调度和执行协程
- 任务 - 并发执行多个协程
- 异步上下文管理器 - `__aenter__` 和 `__aexit__`
- 异步迭代器 - `__aiter__` 和 `__anext__`
掌握这些概念,你就能编写出高效的 I/O 密集型异步 Python 程序(对于 CPU 密集型任务,asyncio 本身并不能带来加速)。
本文首发于技术博客,转载请注明出处。