异步编程入门指南

摘要

C10k问题 仍然是程序员需要解决的一个难题。通常,开发人员会通过 **线程**、**epoll** 或 **kqueue** 来处理大量的 I/O 操作,以避免软件等待耗时的任务。然而,由于数据共享和任务依赖性,开发易读且无错误的并发代码具有挑战性。即使一些强大的工具,如 Valgrind,可以帮助开发人员检测死锁或其他异步问题,但在软件规模扩大时,解决这些问题可能非常耗时。因此,许多编程语言(如 Python、Javascript 或 C++)致力于开发更好的库、框架或语法来帮助程序员正确地管理并发任务。本文主要关注异步编程模式背后的设计理念,而不是如何使用现代并行 API。

使用线程是开发人员分派任务而不阻塞主线程的一种更自然的方式。但是,线程可能会导致性能问题,例如锁定关键部分以执行一些原子操作。虽然在某些情况下使用事件循环可以提高性能,但由于回调问题(例如,回调地狱),编写易读的代码具有挑战性。幸运的是,像 Python 这样的编程语言引入了 async/await 的概念,以帮助开发人员编写易于理解且高性能的代码。下图展示了使用 async/await 处理套接字连接(类似于使用线程)的主要目标。

../_images/event-loop-vs-thread.png

引言

处理 I/O 操作(如网络连接)是程序中最耗时的任务之一。以一个简单的 TCP 阻塞回显服务器为例(以下代码片段)。如果客户端成功连接到服务器但没有发送任何请求,它就会阻塞其他客户端的连接。即使客户端尽快发送数据,如果没有任何客户端尝试建立连接,服务器也无法处理其他请求。此外,处理多个请求效率低下,因为它浪费了大量时间等待来自硬件(如网络接口)的 I/O 响应。因此,并发套接字编程对于管理大量的请求变得不可避免。

import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(("127.0.0.1", 5566))
s.listen(10)

while True:
    conn, addr = s.accept()
    msg = conn.recv(1024)
    conn.send(msg)

防止服务器等待 I/O 操作的一个可能的解决方案是将任务分派到其他线程。以下示例展示了如何创建一个线程来同时处理连接。但是,创建大量线程可能会消耗所有计算能力而没有高吞吐量。更糟糕的是,应用程序可能会浪费时间等待锁来处理关键部分中的任务。虽然使用线程可以解决套接字服务器的阻塞问题,但其他因素(如 CPU 利用率)对于程序员克服 C10k 问题至关重要。因此,在不创建无限线程的情况下,事件循环是管理连接的另一种解决方案。

import threading
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(("127.0.0.1", 5566))
s.listen(10240)

def handler(conn):
    while True:
        msg = conn.recv(65535)
        conn.send(msg)

while True:
    conn, addr = s.accept()
    t = threading.Thread(target=handler, args=(conn,))
    t.start()

一个简单的事件驱动的套接字服务器包括三个主要组件:一个 I/O 多路复用模块(例如,select)、一个调度程序(循环)和回调函数(事件)。例如,以下服务器在一个循环中使用高级 I/O 多路复用 selectors 来检查 I/O 操作是否就绪。如果数据可供读取/写入,循环获取 I/O 事件并执行回调函数 acceptreadwrite 来完成任务。

import socket

from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from functools import partial

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(("127.0.0.1", 5566))
s.listen(10240)
s.setblocking(False)

sel = DefaultSelector()

def accept(s, mask):
    conn, addr = s.accept()
    conn.setblocking(False)
    sel.register(conn, EVENT_READ, read)

def read(conn, mask):
    msg = conn.recv(65535)
    if not msg:
        sel.unregister(conn)
        return conn.close()
    sel.modify(conn, EVENT_WRITE, partial(write, msg=msg))

def write(conn, mask, msg=None):
    if msg:
        conn.send(msg)
    sel.modify(conn, EVENT_READ, read)

sel.register(s, EVENT_READ, accept)
while True:
    events = sel.select()
    for e, m in events:
        cb = e.data
        cb(e.fileobj, m)

虽然通过线程管理连接可能效率不高,但使用事件循环来调度任务的程序不容易阅读。为了提高代码的可读性,包括 Python 在内的许多编程语言引入了协程、future 或 async/await 等抽象概念来处理 I/O 多路复用。为了更好地理解编程术语并正确使用它们,以下部分将讨论这些概念是什么以及它们试图解决什么样的问题。

回调函数

回调函数用于在事件被调用时控制运行时的数据流。但是,保留当前回调函数的状态具有挑战性。例如,如果程序员想要在 TCP 服务器上实现握手,他/她可能需要在某个地方存储先前状态。

import socket

from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from functools import partial

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(("127.0.0.1", 5566))
s.listen(10240)
s.setblocking(False)

sel = DefaultSelector()
is_hello = {}

def accept(s, mask):
    conn, addr = s.accept()
    conn.setblocking(False)
    is_hello[conn] = False;
    sel.register(conn, EVENT_READ, read)

def read(conn, mask):
    msg = conn.recv(65535)
    if not msg:
        sel.unregister(conn)
        return conn.close()

    # check whether handshake is successful or not
    if is_hello[conn]:
        sel.modify(conn, EVENT_WRITE, partial(write, msg=msg))
        return

    # do a handshake
    if msg.decode("utf-8").strip() != "hello":
        sel.unregister(conn)
        return conn.close()

    is_hello[conn] = True

def write(conn, mask, msg=None):
    if msg:
        conn.send(msg)
    sel.modify(conn, EVENT_READ, read)

sel.register(s, EVENT_READ, accept)
while True:
    events = sel.select()
    for e, m in events:
        cb = e.data
        cb(e.fileobj, m)

虽然变量 is_hello 有助于存储状态以检查握手是否成功,但代码变得更难理解。实际上,先前实现的概念很简单。它等同于以下代码片段(阻塞版本)。

def accept(s):
    conn, addr = s.accept()
    success = handshake(conn)
    if not success:
        conn.close()

def handshake(conn):
    data = conn.recv(65535)
    if not data:
        return False
    if data.decode('utf-8').strip() != "hello":
        return False
    conn.send(b"hello")
    return True

为了将类似的结构从阻塞迁移到非阻塞,一个函数(或任务)需要在需要等待 I/O 操作时捕获当前状态,包括参数、变量和断点。此外,调度程序应该能够在 I/O 操作完成后重新进入函数并执行剩余的代码。与 C++ 等其他编程语言不同,Python 可以轻松实现上述概念,因为它的 **生成器** 可以保留所有状态,并通过调用内置函数 next() 来重新进入。通过利用生成器,可以在事件循环内以非阻塞的形式处理 I/O 操作(如前面的代码片段),这称为 *内联回调*。

事件循环

事件循环是一个调度程序,用于管理程序中的任务,而不是依赖于操作系统。以下代码片段展示了一个简单的事件循环如何异步处理套接字连接。实现的概念是将任务附加到一个 FIFO 作业队列中,并在 I/O 操作未就绪时注册一个 *选择器*。此外,一个 *生成器* 保留任务的状态,使其能够在 I/O 结果可用时执行其剩余作业,而无需回调函数。因此,通过观察事件循环的工作原理,将有助于理解 Python 生成器实际上是一种 *协程*。

# loop.py

from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

class Loop(object):
    def __init__(self):
        self.sel = DefaultSelector()
        self.queue = []

    def create_task(self, task):
        self.queue.append(task)

    def polling(self):
        for e, m in self.sel.select(0):
            self.queue.append((e.data, None))
            self.sel.unregister(e.fileobj)

    def is_registered(self, fileobj):
        try:
            self.sel.get_key(fileobj)
        except KeyError:
            return False
        return True

    def register(self, t, data):
        if not data:
            return False

        if data[0] == EVENT_READ:
            if self.is_registered(data[1]):
                self.sel.modify(data[1], EVENT_READ, t)
            else:
                self.sel.register(data[1], EVENT_READ, t)
        elif data[0] == EVENT_WRITE:
            if self.is_registered(data[1]):
                self.sel.modify(data[1], EVENT_WRITE, t)
            else:
                self.sel.register(data[1], EVENT_WRITE, t)
        else:
            return False

        return True

    def accept(self, s):
        conn, addr = None, None
        while True:
            try:
                conn, addr = s.accept()
            except BlockingIOError:
                yield (EVENT_READ, s)
            else:
                break
        return conn, addr

    def recv(self, conn, size):
        msg = None
        while True:
            try:
                msg = conn.recv(1024)
            except BlockingIOError:
                yield (EVENT_READ, conn)
            else:
                break
        return msg

    def send(self, conn, msg):
        size = 0
        while True:
            try:
                size = conn.send(msg)
            except BlockingIOError:
                yield (EVENT_WRITE, conn)
            else:
                break
        return size

    def once(self):
        self.polling()
        unfinished = []
        for t, data in self.queue:
            try:
                data = t.send(data)
            except StopIteration:
                continue

            if self.register(t, data):
                unfinished.append((t, None))

        self.queue = unfinished

    def run(self):
        while self.queue or self.sel.get_map():
            self.once()

通过将作业分配到事件循环中来处理连接,编程模式类似于使用线程来管理 I/O 操作,但使用用户级调度程序。此外,PEP 380 启用了生成器委托,它允许一个生成器等待其他生成器完成其作业。显然,以下代码片段比使用回调函数处理 I/O 操作更直观且更易读。

# foo.py
# $ python3 foo.py &
# $ nc localhost 5566

import socket

from selectors import EVENT_READ, EVENT_WRITE

# import loop.py
from loop import Loop

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(("127.0.0.1", 5566))
s.listen(10240)
s.setblocking(False)

loop = Loop()

def handler(conn):
    while True:
        msg = yield from loop.recv(conn, 1024)
        if not msg:
            conn.close()
            break
        yield from loop.send(conn, msg)

def main():
    while True:
        conn, addr = yield from loop.accept(s)
        conn.setblocking(False)
        loop.create_task((handler(conn), None))

loop.create_task((main(), None))
loop.run()

使用 yield from 语法与事件循环一起可以管理连接而不会阻塞主线程,这是 Python 3.5 之前 asyncio 模块的使用方式。但是,使用 yield from 语法是不明确的,因为它可能会让程序员陷入困境:为什么添加 @asyncio.coroutine 会使生成器变成协程?PEP 492 建议协程应该成为 Python 中一个独立的概念,而不是使用 yield from 来处理异步操作,这就是新的语法 async/await 被引入以增强异步编程可读性的原因。

什么是协程?

Python 文档将协程定义为子例程的广义形式。但是,这个定义不明确,阻碍了开发人员理解协程是什么。根据前面的讨论,事件循环负责调度生成器执行特定任务,这类似于将作业分派到线程。在这种情况下,生成器充当线程的角色,负责“例行工作”。显然,协程是一个术语,表示由程序中的事件循环而不是操作系统调度执行的任务。以下代码片段展示了 @coroutine 的作用。此装饰器主要将函数转换为生成器函数,并使用包装器 types.coroutine 来保持向后兼容性。

import asyncio
import inspect
import types

from functools import wraps
from asyncio.futures import Future

def coroutine(func):
    """Simple prototype of coroutine"""
    if inspect.isgeneratorfunction(func):
        return types.coroutine(func)

    @wraps(func)
    def coro(*a, **k):
        res = func(*a, **k)
        if isinstance(res, Future) or inspect.isgenerator(res):
            res = yield from res
        return res
    return types.coroutine(coro)

@coroutine
def foo():
    yield from asyncio.sleep(1)
    print("Hello Foo")

loop = asyncio.get_event_loop()
loop.run_until_complete(loop.create_task(foo()))
loop.close()

结论

由于现代语法和库的支持,通过事件循环进行异步编程现在变得更加简单和易读。大多数编程语言(包括 Python)都实现了库来通过与新语法交互来管理任务调度。虽然新语法一开始看起来很神秘,但它们为程序员提供了一种在代码中开发逻辑结构的方法,就像使用线程一样。此外,在任务完成后无需调用回调函数,程序员无需担心如何将当前任务状态(如局部变量和参数)传递到其他回调中。因此,程序员将能够专注于开发他们的程序,而无需浪费大量时间来解决并发问题。

参考文献

  1. asyncio — 异步 I/O

  2. PEP 342 - 通过增强生成器实现协程

  3. PEP 380 - 子生成器委托语法

  4. PEP 492 - 使用 async 和 await 语法实现协程