asyncio 概述
asyncio 是 Python 3.4 之后引入的一个标准库,用于编写单线程并发网络代码。它使用事件循环(event loop)来管理 I/O 操作,从而避免了传统的多线程或多进程带来的复杂性和性能开销。asyncio 的主要特点是它支持异步 I/O 操作、任务(task)调度和协程(coroutine)。
asyncio 原理性解析
1 事件循环
事件循环是 asyncio 的核心,它负责监听 I/O 事件、执行回调函数以及调度协程。事件循环通常通过 asyncio.get_event_loop()
获取,并使用 run_until_complete()
或 create_task()
来执行协程。但通常我们使用 asyncio.run()
来启动最顶层的协程,并自动创建和关闭事件循环。
1 2 3 4 5 6 7
| import asyncio
async def main(): loop = asyncio.get_running_loop() print(loop)
asyncio.run(main())
|
2 协程
协程是一种轻量级的线程,能够在单线程内实现非阻塞的 I/O 操作。在 asyncio 中,协程通过 async def
定义,并使用 await
关键字来挂起和恢复执行。协程可以看作是一段可以暂停和恢复的代码块,它允许程序在等待 I/O 操作完成时不阻塞其他代码的执行。
1 2 3 4 5 6 7 8 9
| import asyncio
async def hello_world(): print("Hello") await asyncio.sleep(1) print("World")
asyncio.run(hello_world())
|
3 任务
任务是 asyncio 中协程的封装,它可以被提交到事件循环中执行。任务通过 asyncio.create_task()
或 asyncio.ensure_future()
创建,并返回一个 Future
对象。任务在事件循环中运行,当协程执行完毕时,任务也随之完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| import asyncio
async def task(name, delay): await asyncio.sleep(delay) print(f"Task {name} completed")
async def main(): tasks = [asyncio.create_task(task(f"Task-{i}", i)) for i in range(3)]
await asyncio.gather(*tasks)
asyncio.run(main())
|
asyncio 实战
1 异步 I/O 操作
asyncio 提供了丰富的异步 I/O 操作函数,如 asyncio.open_connection()
、asyncio.read()
和 asyncio.write()
等。这些函数都是基于协程的,能够在等待 I/O 操作时不阻塞事件循环。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13
| import asyncio async def fetch_data(host, port): reader, writer = await asyncio.open_connection(host, port) data = await reader.read(100) print(f'Received {data}') writer.close() await writer.wait_closed() async def main(): await asyncio.gather(fetch_data('example.com', 80), fetch_data('google.com', 80)) asyncio.run(main())
|
一个抓取网站的示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| import asyncio import sys
async def fetch_http(host, port, path="/"): reader, writer = await asyncio.open_connection(host, port)
request = f"GET {path} HTTP/1.1\r\nHost: {host}\r\n\r\n".encode()
writer.write(request) await writer.drain()
response = b"" while True: data = await reader.read(1024) if not data: break response += data
writer.close() await writer.wait_closed()
return response
async def main(): host = "example.com" port = 80
response = await fetch_http(host, port)
print(response.decode())
if __name__ == "__main__": asyncio.run(main())
|
另外一种使用aiohttp的写法:
1 2 3 4 5 6 7 8 9 10 11 12 13
| import aiohttp import asyncio
async def fetch(session, url): async with session.get(url) as response: return await response.text()
async def main(): async with aiohttp.ClientSession() as session: content = await fetch(session, 'http://example.com') print(content)
asyncio.run(main())
|
2 任务调度
asyncio 允许在事件循环中调度多个任务,通过 asyncio.gather()
或 await
关键字实现。这样可以在单线程内实现并发的 I/O 操作,提高程序的执行效率。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import asyncio async def task1(): print('Task 1 started') await asyncio.sleep(2) print('Task 1 finished') return 'Task 1 result' async def task2(): print('Task 2 started') await asyncio.sleep(1) print('Task 2 finished') return 'Task 2 result' async def main(): tasks = [task1(), task2()] done, pending = await asyncio.wait(tasks) for task in done: print(f'Task result: {task.result()}') asyncio.run(main())
|
3 异步 WebSocket 服务器
使用 aiohttp
可以创建一个异步 WebSocket 服务器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| import aiohttp import asyncio
async def websocket_handler(request): ws = aiohttp.web.WebSocketResponse() await ws.prepare(request)
async for msg in ws: if msg.type == aiohttp.WSMsgType.TEXT: await ws.send_str("Hello, {}".format(msg.data)) else: break
return ws
app = aiohttp.web.Application() app.add_routes([aiohttp.web.get('/ws', websocket_handler)])
aiohttp.web.run_app(app)
|
注意事项与最佳实践
1 避免阻塞操作
在使用 asyncio 时,应尽量避免在协程中执行阻塞操作,如同步 I/O、CPU 密集型计算等。这些操作会阻塞事件循环,导致其他协程无法及时执行。对于需要执行阻塞操作的场景,可以考虑使用线程池或进程池来执行。
2 合理使用异步库
asyncio 提供了丰富的异步 I/O 操作函数,但并非所有库都支持异步操作。在选择第三方库时,应优先考虑支持异步操作的库,以充分利用 asyncio 的优势。
3 调试与异常处理
由于 asyncio 的异步特性,调试和异常处理相对复杂。建议使用专门的异步调试工具,并合理处理协程中的异常,确保程序的健壮性。