异步编程入门指南¶
摘要¶
C10k问题 仍然是程序员需要解决的一个难题。通常,开发人员会通过 **线程**、**epoll** 或 **kqueue** 来处理大量的 I/O 操作,以避免软件等待耗时的任务。然而,由于数据共享和任务依赖性,开发易读且无错误的并发代码具有挑战性。即使一些强大的工具,如 Valgrind,可以帮助开发人员检测死锁或其他异步问题,但在软件规模扩大时,解决这些问题可能非常耗时。因此,许多编程语言(如 Python、Javascript 或 C++)致力于开发更好的库、框架或语法来帮助程序员正确地管理并发任务。本文主要关注异步编程模式背后的设计理念,而不是如何使用现代并行 API。
使用线程是开发人员分派任务而不阻塞主线程的一种更自然的方式。但是,线程可能会导致性能问题,例如锁定关键部分以执行一些原子操作。虽然在某些情况下使用事件循环可以提高性能,但由于回调问题(例如,回调地狱),编写易读的代码具有挑战性。幸运的是,像 Python 这样的编程语言引入了 async/await
的概念,以帮助开发人员编写易于理解且高性能的代码。下图展示了使用 async/await
处理套接字连接(类似于使用线程)的主要目标。

引言¶
处理 I/O 操作(如网络连接)是程序中最耗时的任务之一。以一个简单的 TCP 阻塞回显服务器为例(以下代码片段)。如果客户端成功连接到服务器但没有发送任何请求,它就会阻塞其他客户端的连接。即使客户端尽快发送数据,如果没有任何客户端尝试建立连接,服务器也无法处理其他请求。此外,处理多个请求效率低下,因为它浪费了大量时间等待来自硬件(如网络接口)的 I/O 响应。因此,并发套接字编程对于管理大量的请求变得不可避免。
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(("127.0.0.1", 5566))
s.listen(10)
while True:
conn, addr = s.accept()
msg = conn.recv(1024)
conn.send(msg)
防止服务器等待 I/O 操作的一个可能的解决方案是将任务分派到其他线程。以下示例展示了如何创建一个线程来同时处理连接。但是,创建大量线程可能会消耗所有计算能力而没有高吞吐量。更糟糕的是,应用程序可能会浪费时间等待锁来处理关键部分中的任务。虽然使用线程可以解决套接字服务器的阻塞问题,但其他因素(如 CPU 利用率)对于程序员克服 C10k 问题至关重要。因此,在不创建无限线程的情况下,事件循环是管理连接的另一种解决方案。
import threading
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(("127.0.0.1", 5566))
s.listen(10240)
def handler(conn):
while True:
msg = conn.recv(65535)
conn.send(msg)
while True:
conn, addr = s.accept()
t = threading.Thread(target=handler, args=(conn,))
t.start()
一个简单的事件驱动的套接字服务器包括三个主要组件:一个 I/O 多路复用模块(例如,select)、一个调度程序(循环)和回调函数(事件)。例如,以下服务器在一个循环中使用高级 I/O 多路复用 selectors 来检查 I/O 操作是否就绪。如果数据可供读取/写入,循环获取 I/O 事件并执行回调函数 accept
、read
或 write
来完成任务。
import socket
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from functools import partial
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(("127.0.0.1", 5566))
s.listen(10240)
s.setblocking(False)
sel = DefaultSelector()
def accept(s, mask):
conn, addr = s.accept()
conn.setblocking(False)
sel.register(conn, EVENT_READ, read)
def read(conn, mask):
msg = conn.recv(65535)
if not msg:
sel.unregister(conn)
return conn.close()
sel.modify(conn, EVENT_WRITE, partial(write, msg=msg))
def write(conn, mask, msg=None):
if msg:
conn.send(msg)
sel.modify(conn, EVENT_READ, read)
sel.register(s, EVENT_READ, accept)
while True:
events = sel.select()
for e, m in events:
cb = e.data
cb(e.fileobj, m)
虽然通过线程管理连接可能效率不高,但使用事件循环来调度任务的程序不容易阅读。为了提高代码的可读性,包括 Python 在内的许多编程语言引入了协程、future 或 async/await 等抽象概念来处理 I/O 多路复用。为了更好地理解编程术语并正确使用它们,以下部分将讨论这些概念是什么以及它们试图解决什么样的问题。
回调函数¶
回调函数用于在事件被调用时控制运行时的数据流。但是,保留当前回调函数的状态具有挑战性。例如,如果程序员想要在 TCP 服务器上实现握手,他/她可能需要在某个地方存储先前状态。
import socket
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from functools import partial
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(("127.0.0.1", 5566))
s.listen(10240)
s.setblocking(False)
sel = DefaultSelector()
is_hello = {}
def accept(s, mask):
conn, addr = s.accept()
conn.setblocking(False)
is_hello[conn] = False;
sel.register(conn, EVENT_READ, read)
def read(conn, mask):
msg = conn.recv(65535)
if not msg:
sel.unregister(conn)
return conn.close()
# check whether handshake is successful or not
if is_hello[conn]:
sel.modify(conn, EVENT_WRITE, partial(write, msg=msg))
return
# do a handshake
if msg.decode("utf-8").strip() != "hello":
sel.unregister(conn)
return conn.close()
is_hello[conn] = True
def write(conn, mask, msg=None):
if msg:
conn.send(msg)
sel.modify(conn, EVENT_READ, read)
sel.register(s, EVENT_READ, accept)
while True:
events = sel.select()
for e, m in events:
cb = e.data
cb(e.fileobj, m)
虽然变量 is_hello
有助于存储状态以检查握手是否成功,但代码变得更难理解。实际上,先前实现的概念很简单。它等同于以下代码片段(阻塞版本)。
def accept(s):
conn, addr = s.accept()
success = handshake(conn)
if not success:
conn.close()
def handshake(conn):
data = conn.recv(65535)
if not data:
return False
if data.decode('utf-8').strip() != "hello":
return False
conn.send(b"hello")
return True
为了将类似的结构从阻塞迁移到非阻塞,一个函数(或任务)需要在需要等待 I/O 操作时捕获当前状态,包括参数、变量和断点。此外,调度程序应该能够在 I/O 操作完成后重新进入函数并执行剩余的代码。与 C++ 等其他编程语言不同,Python 可以轻松实现上述概念,因为它的 **生成器** 可以保留所有状态,并通过调用内置函数 next()
来重新进入。通过利用生成器,可以在事件循环内以非阻塞的形式处理 I/O 操作(如前面的代码片段),这称为 *内联回调*。
事件循环¶
事件循环是一个调度程序,用于管理程序中的任务,而不是依赖于操作系统。以下代码片段展示了一个简单的事件循环如何异步处理套接字连接。实现的概念是将任务附加到一个 FIFO 作业队列中,并在 I/O 操作未就绪时注册一个 *选择器*。此外,一个 *生成器* 保留任务的状态,使其能够在 I/O 结果可用时执行其剩余作业,而无需回调函数。因此,通过观察事件循环的工作原理,将有助于理解 Python 生成器实际上是一种 *协程*。
# loop.py
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
class Loop(object):
def __init__(self):
self.sel = DefaultSelector()
self.queue = []
def create_task(self, task):
self.queue.append(task)
def polling(self):
for e, m in self.sel.select(0):
self.queue.append((e.data, None))
self.sel.unregister(e.fileobj)
def is_registered(self, fileobj):
try:
self.sel.get_key(fileobj)
except KeyError:
return False
return True
def register(self, t, data):
if not data:
return False
if data[0] == EVENT_READ:
if self.is_registered(data[1]):
self.sel.modify(data[1], EVENT_READ, t)
else:
self.sel.register(data[1], EVENT_READ, t)
elif data[0] == EVENT_WRITE:
if self.is_registered(data[1]):
self.sel.modify(data[1], EVENT_WRITE, t)
else:
self.sel.register(data[1], EVENT_WRITE, t)
else:
return False
return True
def accept(self, s):
conn, addr = None, None
while True:
try:
conn, addr = s.accept()
except BlockingIOError:
yield (EVENT_READ, s)
else:
break
return conn, addr
def recv(self, conn, size):
msg = None
while True:
try:
msg = conn.recv(1024)
except BlockingIOError:
yield (EVENT_READ, conn)
else:
break
return msg
def send(self, conn, msg):
size = 0
while True:
try:
size = conn.send(msg)
except BlockingIOError:
yield (EVENT_WRITE, conn)
else:
break
return size
def once(self):
self.polling()
unfinished = []
for t, data in self.queue:
try:
data = t.send(data)
except StopIteration:
continue
if self.register(t, data):
unfinished.append((t, None))
self.queue = unfinished
def run(self):
while self.queue or self.sel.get_map():
self.once()
通过将作业分配到事件循环中来处理连接,编程模式类似于使用线程来管理 I/O 操作,但使用用户级调度程序。此外,PEP 380 启用了生成器委托,它允许一个生成器等待其他生成器完成其作业。显然,以下代码片段比使用回调函数处理 I/O 操作更直观且更易读。
# foo.py
# $ python3 foo.py &
# $ nc localhost 5566
import socket
from selectors import EVENT_READ, EVENT_WRITE
# import loop.py
from loop import Loop
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(("127.0.0.1", 5566))
s.listen(10240)
s.setblocking(False)
loop = Loop()
def handler(conn):
while True:
msg = yield from loop.recv(conn, 1024)
if not msg:
conn.close()
break
yield from loop.send(conn, msg)
def main():
while True:
conn, addr = yield from loop.accept(s)
conn.setblocking(False)
loop.create_task((handler(conn), None))
loop.create_task((main(), None))
loop.run()
使用 yield from
语法与事件循环一起可以管理连接而不会阻塞主线程,这是 Python 3.5 之前 asyncio
模块的使用方式。但是,使用 yield from
语法是不明确的,因为它可能会让程序员陷入困境:为什么添加 @asyncio.coroutine
会使生成器变成协程?PEP 492 建议协程应该成为 Python 中一个独立的概念,而不是使用 yield from
来处理异步操作,这就是新的语法 async/await
被引入以增强异步编程可读性的原因。
什么是协程?¶
Python 文档将协程定义为子例程的广义形式。但是,这个定义不明确,阻碍了开发人员理解协程是什么。根据前面的讨论,事件循环负责调度生成器执行特定任务,这类似于将作业分派到线程。在这种情况下,生成器充当线程的角色,负责“例行工作”。显然,协程是一个术语,表示由程序中的事件循环而不是操作系统调度执行的任务。以下代码片段展示了 @coroutine
的作用。此装饰器主要将函数转换为生成器函数,并使用包装器 types.coroutine
来保持向后兼容性。
import asyncio
import inspect
import types
from functools import wraps
from asyncio.futures import Future
def coroutine(func):
"""Simple prototype of coroutine"""
if inspect.isgeneratorfunction(func):
return types.coroutine(func)
@wraps(func)
def coro(*a, **k):
res = func(*a, **k)
if isinstance(res, Future) or inspect.isgenerator(res):
res = yield from res
return res
return types.coroutine(coro)
@coroutine
def foo():
yield from asyncio.sleep(1)
print("Hello Foo")
loop = asyncio.get_event_loop()
loop.run_until_complete(loop.create_task(foo()))
loop.close()
结论¶
由于现代语法和库的支持,通过事件循环进行异步编程现在变得更加简单和易读。大多数编程语言(包括 Python)都实现了库来通过与新语法交互来管理任务调度。虽然新语法一开始看起来很神秘,但它们为程序员提供了一种在代码中开发逻辑结构的方法,就像使用线程一样。此外,在任务完成后无需调用回调函数,程序员无需担心如何将当前任务状态(如局部变量和参数)传递到其他回调中。因此,程序员将能够专注于开发他们的程序,而无需浪费大量时间来解决并发问题。