重新发明车轮以了解Python Asyncio。
#python #并发性 #asyncio

在过去的几年中 我发现人们最近开始使用它(在实施时没有去过那里)在编码或调试时遇到了不同的问题。

在本文中,我们将在探索并发的基础之后,不使用asyncioasyncawait关键字。

这篇文章假设您已经熟悉Python,Python迭代器和发电机以及套接字编程。

什么是并发?

根据Wikipedia

并发计算是一种计算形式,在重叠时间段中同时执行几个计算

实际上,这意味着虽然我们的函数a()正在执行,但其他函数也可以执行也可以以交错的方式完成。
但是,我们的程序仍然一次执行一件事(因为我们的CPU核心能够)。

并行性(一次执行更多的事情)是一种特殊的并发形式,但我们今天不是在谈论它。

我们如何在计划中达成并发

编写并发程序的最简单方法是使用线程:您在线程中产生一个函数,它开始运行,任何时候出现机会时,我们的CPU都会在线程之间切换。

但是,线程编程等众所周知的问题,例如同步,内存使用,没有对上下文开关的控制,所有这些组合都会导致一定的可伸缩性限制(您可以找到有关在线搜索这些问题的有用资源和文章)。开发人员正在寻找更轻巧,可扩展的东西,可以与多线程(如果需要)结合使用,并带来。迭代器

与迭代器并发

我们如何与迭代器达到并发?有两个核心概念要考虑:

  • 交错。
  • 在重叠时间期间执行。

如果您考虑我们如何在不产卵线程的情况下交织不同的代码单位执行,您可能会发现您需要一种暂停/简历/简历该代码单位的方法。
查看迭代器的基本实现:

class ConcurrentUnit:
    def __init__(self, to: int):
        self._to = to
        self._i = -1

    def __iter__(self):
        return self

    def __next__(self):
        self._i += 1
        if self._i >= self._to:
            raise StopIteration
        return self._i

您已经知道一个循环只是一直致电.__next__,直到StopIteration升起。让我们滥用此操作以同时执行代码。

from typing import TypeVar

T = TypeVar('T')


class ConcurrentUnit:
    def __init__(self, to: int, return_value: T):
        self._to = to
        self._i = -1
        self._return_value = return_value

    def __iter__(self):
        return self

    def __next__(self):
        self._i += 1
        if self._i >= self._to:
            raise StopIteration(self._return_value)
        return self._i


if __name__ == '__main__':
    cu1 = ConcurrentUnit(5, 'cu1')
    cu2 = ConcurrentUnit(3, 'cu2')

    tasks = [cu1, cu2]

    while tasks:
        t = tasks.pop(0)
        try:
            step = next(t)
            print(step)
        except StopIteration as e:
            print(e.value)
        else:
            tasks.append(t)

如果运行该代码,则输出将是:

0
0
1
1
2
2
3
cu2
4
cu1

您可以看到我们的单位在重叠的时间内以交错的方式执行,因此,是的,即使没有任何好处,我们也写了并发代码。让我们详细看一下。

从行为的角度来看,ConcurrentUnit类应该非常容易理解,它正在模拟range(x)的用法(我已经省略了start以保持简单),但是它也具有带有通用类型注释的return_value参数,从执行中启用返回值。当由__next__提出时,return_value被绑定到StopIteration,我们需要手动处理它在try/except块中调用__next__(我们不能简单地使用会默默处理例外的循环)。

)。

在我们的主块中,我们创建了两个并发单元(它们可能会更多),然后将它们存储在列表中(就像我们将使用调度程序完成的,稍后再进行此操作),我们运行循环:< /p>

  • 首先,我们将第一个单元弹出列表。
  • 我们在try/except街区的设备上打电话给next,然后打印出结果。
  • 如果它提高了StopIteration,我们会得到return_value并将其打印出来。在这一点上,我们知道我们的单位已经完成。
  • 否则,我们知道我们的单位没有完成,因此我们将其附加到我们的列表中。

Python发电机

上面的代码非常奇怪,在迭代器类中包装函数的逻辑很快就会导致大意大利面代码。
幸运的是,对于我们来说,Python有发电机,这将使我们声明像迭代器一样行为的函数,此外,它们将return语句的价值绑定到StopIteration实例。

我们可以将上述代码转换为:

def concurrent_unit(to: int) -> Generator[str]:
    for i in range(to):
        yield i
    return f"run for {to} times"


if __name__ == '__main__':
    cu1 = concurrent_unit(5)
    cu2 = concurrent_unit(3)

    tasks = [cu1, cu2]

    while tasks:
        t = tasks.pop(0)
        try:
            step = next(t)
            print(step)
        except StopIteration as e:
            print(e.value)
        else:
            tasks.append(t)

,代码的行为将相同。

在我们继续前进之前,您需要了解一个重要的概念,这就是 Generator对象 Generator函数之间的区别
A Generator函数是一个函数,只需返回A Generator Object :它除了创建 Generator Object Object 之外,它不会执行任何代码。可以将它们识别为功能主体至少包含一个yield
与使用__next__执行代码到下一个yield语句的迭代协议相比,由此产生的生成器对象
此概念也适用于coroutines:async def函数是 coroutine函数,返回 coroutine对象时。
从现在开始,当我说 Generator 时,我可以参考函数或对象,上下文将清楚。

构建我们自己的并发库

在本节中,我们将使用发电机来开发并发库的基本。
首先,我们将定义一个Task对象,以缠绕发电机,并在我们的花花公子上提供抽象层。然后,我们将编写一个调度程序来处理任务执行。让我们深入研究。

from collections.abc import Generator
from typing import Any, TypeVar

T = TypeVar("T")


class Task:
    def __init__(
        self,
        generator: Generator[Any, Any, T],
        *,
        name: str | None = None,
        debug: bool = False,
    ) -> None:
        self._generator = generator
        self._name = name
        self._debug = debug
        self._result: T | None = None
        self._exception: Exception | None = None
        self._done = False

    def __repr__(self) -> str:
        return f"<Task: {self._name}, done: {self._done}>"

    def _step(self) -> None:
        if self._done:
            raise RuntimeError(f"{self}: Cannot step a done task")
        try:
            step = self._generator.send(None)
            if self._debug:
                print(f"{self}: {step}")
        except StopIteration as e:
            self._done = True
            self._result = e.value
        except Exception as e:
            self._done = True
            self._exception = e
            if self._debug:
                print(f"{self}: Exception: {e}")

    def result(self) -> T:
        return self._result

    def exception(self) -> Exception:
        return self._exception

    def done(self) -> bool:
        return self._done

我们的Task类存储一个发电机对象,并具有3个值得一看的属性:

  • _done指示该任务是否可以考虑完成。
  • _result指示发电机返回值,如果有。
  • _exception除了我们的发电机可能会升高的StopIteration以外的其他例外。

_step方法构建在迭代器之前使用的执行逻辑上:它代表我们任务的一个“ step” 。它在self._generator上调用nextgen.send(None)next(gen)相同),如果我们得到结果(包装在StopIteration错误中)或异常,则将其存储在相应的属性中。

您可能会问自己“为什么他只是存储例外而不是提出它?” 。在下一部分中,我将回答这个问题。到现在为止,继续为我们的任务构建调度程序:

from collections.abc import Callable, Generator
from typing import Any, TypeVar

from .tasks import Task

T = TypeVar("T")

class EventLoop:
    def __init__(self, *, debug: bool = False) -> None:
        self._debug = debug
        self._tasks: list[Task] = []
        self._tasks_counter: int = 0

    def create_task(
        self, generator: Generator[Any, Any, T], *, name: str | None = None
    ) -> Task:
        task = Task(
            generator,
            name=name or f"Task-{self._tasks_counter}",
            debug=self._debug,
        )
        self._tasks.append(task)
        self._tasks_counter += 1
        return task

    def run_until_complete(
        self,
        generator: Generator[Any, Any, T],
        *,
        task_name: str | None = None,
    ) -> T:
        main_task = self.create_task(generator, name=task_name)
        while not main_task._done:
            for task in self._tasks:
                task._step()
                if task._done:
                    self._tasks.remove(task)
        if main_task._exception:
            raise main_task._exception
        return main_task._result

看起来很熟悉?我们的事件循环具有创建新任务对象的方法,一个用于运行它们的方法。
run_until_complete接收一个发电机,从中创建一个任务(main_task),然后运行所有计划的任务,直到main_task完成为止。执行逻辑与我们使用迭代器的第一个POC没有什么不同:我们通过self._tasks迭代,运行“一个步骤” ,任何任务完成时,我们都会从列表中删除它。

def concurrent_unit(to: int) -> Generator[str]:
    for i in range(to):
        yield i
    return f"run for {to} times"


if __name__ == '__main__':
    loop = EventLoop(debug=True)

    t1 = loop.create_task(concurrent_unit(2))
    t2 = loop.create_task(concurrent_unit(3))

    loop.run_until_complete(concurrent_unit(5))

# output
# <Task: Task-0, done: False>: 0
# <Task: Task-1, done: False>: 0
# <Task: Task-2, done: False>: 0
# <Task: Task-0, done: False>: 1
# <Task: Task-1, done: False>: 1
# <Task: Task-2, done: False>: 1
# <Task: Task-2, done: False>: 2
# <Task: Task-1, done: False>: 2
# <Task: Task-2, done: False>: 3
# <Task: Task-2, done: False>: 4

酷,但是等待呢?

正如我之前说过的,我们不允许使用asyncawait关键字,那么我们将如何在这里实现相同的功能?
到目前为止,我们了解到Coroutines只是迭代器(或更好的发电机),那么我们如何等待迭代器?如果您想到 对于loops ,那么您是对的。让我们看下面的示例:

def concurrent_unit(to: int) -> Generator[str]:
    for i in range(to):
        yield i
    return f"run for {to} times"


if __name__ == '__main__':
    loop = EventLoop(debug=True)

    def main():
        t1 = loop.create_task(concurrent_unit(2))
        t2 = loop.create_task(concurrent_unit(3))
        yield 'a'

    loop.run_until_complete(main())

# output
# <Task: Task-0, done: False>: a
# <Task: Task-1, done: False>: 0
# <Task: Task-2, done: False>: 0
# <Task: Task-2, done: False>: 1

您可以看到t1t2尚未完成,这是因为main()在他们面前完成。如果您查看run_until_complete源代码,您会发现,当完成main_task时,我们将退出WARE循环,无论是否仍然有Undement 任务。虽然这是预期的行为,但我们需要一种方法来等待在继续之前完成特定任务,我们将使用循环进行此操作:

def concurrent_unit(to: int) -> Generator[str]:
    for i in range(to):
        yield i
    return f"run for {to} times"


if __name__ == '__main__':
    loop = EventLoop(debug=True)

    def main():
        t1 = loop.create_task(concurrent_unit(2))
        t2 = loop.create_task(concurrent_unit(3))
        for step in concurrent_unit(5):
            yield step

    loop.run_until_complete(main(), task_name='main_task')

# output
# <Task: main_task, done: False>: 0
# <Task: Task-1, done: False>: 0
# <Task: Task-2, done: False>: 0
# <Task: main_task, done: False>: 1
# <Task: Task-1, done: False>: 1
# <Task: Task-2, done: False>: 1
# <Task: main_task, done: False>: 2
# <Task: main_task, done: False>: 3
# <Task: Task-2, done: False>: 2
# <Task: main_task, done: False>: 4

这次完成所有任务。
在进行错误之前,我们必须再次感谢Python和Generators:我们可以使用yield from语法:yield from genfor x in gen: yield x相同。从现在开始,我们将使用yield from作为我们的await(这就是Python在引擎盖下也是如此)。

    def main():
        t1 = loop.create_task(concurrent_unit(2))
        t2 = loop.create_task(concurrent_unit(3))
        yield from concurrent_unit(5)

正如我在上述代码引入一些陷阱之前所述。由于我们无法等待任务,因此我们无法等待任务,实际上您可能已经注意到我没有从concurrent_unit(5)中产生任务。要拥有一定的一致性,我们必须找到一种方法来完成yield from任务。
我们可以编写一个辅助功能,该功能接收任务对象并一直调用_step直到完成为止,但这也会与事件循环拨打_step相抵触。我们可以使Task成为定义__iter____next__的迭代器,并且可以使用(您可以将yield from与迭代器一起使用)。但是,发电机通常比迭代器快(我不会潜入它,如果您对该主题感兴趣,您可以在Google上找到有用的资源搜索),因此我选择在任务接口,一个生成器函数上编写新方法只是屈服于事件循环,直到完成任务为止。

class Task:
    def __init__(
        self,
        generator: Generator[Any, Any, T],
        *,
        name: str | None = None,
        debug: bool = False,
    ) -> None:
        self._generator = generator
        self._name = name
        self._debug = debug
        self._result: T | None = None
        self._exception: Exception | None = None
        self._done = False

    def __repr__(self) -> str:
        return f"<Task: {self._name}, done: {self._done}>"

    def _step(self) -> None:
        if self._done:
            raise RuntimeError(f"{self}: Cannot step a done task")
        try:
            step = self._generator.send(None)
            if self._debug:
                print(f"{self}: {step}")
        except StopIteration as e:
            self._done = True
            self._result = e.value
        except Exception as e:
            self._done = True
            self._exception = e
            if self._debug:
                print(f"{self}: Exception: {e}")

    def result(self) -> T:
        return self._result

    def exception(self) -> Exception:
        return self._exception

    def done(self) -> bool:
        return self._done

    def wait(self) -> T:
        while not self._done:
            yield
        if self._exception:
            raise self._exception
        return self._result

现在我们可以使用新的等待逻辑来重构我们的示例:

if __name__ == '__main__':
    loop = EventLoop(debug=True)

    def main():
        t1 = loop.create_task(concurrent_unit(2))
        t2 = loop.create_task(concurrent_unit(3))
        t3 = loop.create_task(concurrent_unit(5))
        # yield from task.wait() will either raise
        # task._exception (if it's not None) or
        # return task._result.
        # This means that exceptions do not propagate
        # until the task is awaited.
        result = yield from t3.wait()
        yield 'a'
        yield 'b'

    loop.run_until_complete(main(), task_name='main_task')

# <Task: main_task, done: False>: None
# <Task: Task-1, done: False>: 0
# <Task: Task-2, done: False>: 0
# <Task: Task-3, done: False>: 0
# <Task: main_task, done: False>: None
# <Task: Task-1, done: False>: 1
# <Task: Task-2, done: False>: 1
# <Task: Task-3, done: False>: 1
# <Task: main_task, done: False>: None
# <Task: Task-3, done: False>: 2
# <Task: main_task, done: False>: None
# <Task: Task-2, done: False>: 2
# <Task: Task-3, done: False>: 3
# <Task: main_task, done: False>: None
# <Task: main_task, done: False>: None
# <Task: Task-3, done: False>: 4
# <Task: main_task, done: False>: None
# <Task: main_task, done: False>: a
# <Task: main_task, done: False>: b

您可以看到,我们正在等待任务,从其wait方法产生。没有这种方法,只要我们需要检索任务的结果(值或异常),我们就应该访问相关属性。
正如Python Asyncio所做的那样,我们通过等待来传播例外:抓到它们后,我们不会立即重新启动,但是当任务等待时。

我想提一下关于python中await的最后一件事:对此的普遍误解是,当我们await await或一个任务时,我们告诉事件循环到”但是,从上一个示例的输出中可以看到,提出“ ”,在我们的yield from(以及so await)语句之前安排的任务仍在与已久的任务(t3)中交织在一起。
await真正讲述的是事件循环(这是一个近似值,因为事件循环不知道它)是:“ 在我等待的任务完成之前,请勿执行当前的任务。同时,您仍然可以运行其他计划的任务“其中当前任务main,而任务我正在等待在我们当前的上下文中是t3。<
同样,该句子描述了await行为,但这并不是真的,因为任务无法控制事件循环的作用。实际上,我们正在照顾防止在t3完成之前执行当前任务,而不是对事件循环进行说明。

运行阻止代码

有时您可能需要使用阻止功能(无法屈服于事件循环的功能)。您可以使用线程运行此类功能,以避免阻止事件循环的执行,而最有效的方法之一就是使用ThreadPool。
由于我们只是在探索核心概念,因此我们不会自己实现线程池,但是我们只使用一种方法来在新线程中产生可可。您可以通过自己的搜索或通过Python concurrent.futures.thread源代码来了解有关ThreadPools搜索的更多信息。

我们可以修改事件循环实现以处理一组工作线程:

class EventLoop:
    def __init__(self, *, debug: bool = False) -> None:
        self._debug = debug
        self._tasks: list[Task] = []
        self._tasks_counter: int = 0
        self._workers: set[threading.Thread] = set()

    def _spawn(self, callable: Callable[..., Any]):
        thread = threading.Thread(target=callable)
        thread.start()
        self._workers.add(thread)

    ...

上述代码将起作用,但是有两个问题:

  • 每当我们想使用参数和关键字参数时,我们都必须依靠functools.partial
  • 我们需要一种方法来检索执行结果。

要解决这些问题,我们可以编写一个封装所有属性和我们需要的逻辑的类,并更新EventLoop._spawn签名以匹配它:

class _Work:
    def __init__(
        self, fn: Callable[..., T], /, *args, **kwargs
    ) -> None:
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.result: T | None = None
        self.exception: Exception | None = None

    def run(self) -> None:
        try:
            result = self.fn(*self.args, **self.kwargs)
        except Exception as e:
            self.exception = e
        else:
            self.result = result

您可能已经注意到已经出现了一个模式:我们有resultexception,我们可以编写像Task这样的wait Generator方法,以与非阻止代码互操作。


要清洁,让我们将常见的等待逻辑放在基本界面中:

T = TypeVar('T')


class Waitable(ABC, Generic[T]):
    @abstractmethod
    def wait(self) -> Generator[Any, Any, T]:
        ...

然后,我们可以使Task_WorkWaitable继承。
但是,如果您开始考虑该逻辑的所有可能应用程序,则可能会提出一个更好的解决方案。虽然Task是一个特殊情况,但_Work的用例可能会在将来重新出现,我们应该为此构建可重复使用的界面:

class Waiter(Waitable):
    def __init__(self) -> None:
        self._result: T | None = None
        self._exception: Exception | None = None
        self._done: bool = False

    def __repr__(self) -> str:
        return f'<Waiter: done: {self.done()}>'

    def done(self) -> bool:
        return self._done

    def result(self) -> T:
        return self._result

    def exception(self) -> Exception:
        return self._exception

    def set_result(self, result: T) -> None:
        if self._done:
            raise RuntimeError('Waiter is already done')
        self._done = True
        self._result = result

    def set_exception(self, exception: Exception) -> None:
        if self._done:
            raise RuntimeError('Waiter is already done')
        self._done = True
        self._exception = exception

    def wait(self) -> Generator[Any, Any, T]:
        while not self.done():
            yield
        if self._exception:
            raise self._exception
        return self._result

Waiter可能类似于您的python Future对象..再一次,你是对的。我们定义了一个对象,该对象是可以通过其他功能设置的将来结果的占位符(值或异常)。这也是同步的基本基础。

让我们在_Work类中使用Waiter

class _Work:
    def __init__(
        self, waiter: Waiter, fn: Callable[..., T], /, *args, **kwargs
    ) -> None:
        self.waiter = waiter
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self) -> None:
        try:
            result = self.fn(*self.args, **self.kwargs)
        except Exception as e:
            self.waiter.set_exception(e)
        else:
            self.waiter.set_result(result)

现在更新事件循环实现:

class EventLoop:
    def __init__(self, *, debug: bool = False) -> None:
        self._debug = debug
        self._tasks: list[Task] = []
        self._tasks_counter: int = 0
        self._workers: set[threading.Thread] = set()

    def _spawn(self, work: _Work):
        thread = threading.Thread(target=work.run)
        thread.start()
        self._workers.add(thread)

    def run_in_thread(self, fn: Callable[..., T], /, *args, **kwargs) -> Waiter[T]:
        waiter = Waiter()
        work = _Work(waiter, fn, *args, **kwargs)
        self._spawn(work)
        return waiter

    ...

让我们尝试一下:

if __name__ == '__main__':
    # set it to `True` to better understand the behavior
    loop = EventLoop(debug=False)

    def blockingf(i: int) -> int:
        time.sleep(1)
        return f'BLOCKING finished after {i} seconds'

    def genf(i: int) -> Generator[Any, Any, str]:
        for j in range(i):
            yield i
        return f'non-blocking finished after {i} iterations'

    def main():
        t1 = loop.create_task(genf(3))
        t2 = loop.create_task(genf(2))

        w1 = loop.run_in_thread(blockingf, 2)
        res_blocking = yield from w1.wait()

        res1 = yield from t1.wait()
        res2 = yield from t2.wait()

        print(res1, res2, res_blocking, sep='\n')

    loop.run_until_complete(main())

# output
# non-blocking finished after 3 iterations
# non-blocking finished after 2 iterations
# BLOCKING finished after 2 seconds

我们已成功将阻止代码与“ non Blocking” 代码。
在下一节中,我们将使用到目前为止开发的内容构建并发网络服务。

插座和选择器

在这篇文章的开头,我要求您了解有关套接字编程的知识。
如果您曾经使用Python建立了TCP服务,则最终与如何处理并发连接的旧问题保持联系。
Python插座编程Howto有一个有关非阻止插座和select的部分(我建议您阅读它,go here)。
现在,我们将使用selectors使用selectors,这是select的高级接口。
如果您想阅读有关选择器的更多信息,则可以浏览文档,但是出于本文的目的,可以理解我们可以使用选择器:

  • 注册一个套接字对象,等待读取事件,将数据关联到它(任何对象,甚至是可呼叫)。
  • 更新事件类型或注册套接字的数据。
  • 取消注册的插座。
  • 致电selector.select获取即将插座的列表(其中还有其他信息,例如事件类型和我们相关的数据)。我们可以肯定地断言selector.select返回的套接字对象上的调用方法不会阻止。

首先,我们需要实现在事件循环中注册/取消注册插座的方法:

class EventLoop:
    def __init__(self, *, debug: bool = False) -> None:
        self._debug = debug
        self._tasks: list[Task] = []
        self._tasks_counter: int = 0
        self._workers: set[threading.Thread] = set()
        self._selector = selectors.DefaultSelector()

    ...

    def _create_waiter(self) -> Waiter:
        return Waiter()

    def add_reader(self, fd: int, callback: Callable[..., None]) -> None:
        try:
            self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_READ, callback)

    def remove_reader(self, fd: int) -> None:
        try:
            self._selector.unregister(fd)
        except KeyError:
            pass

    def add_writer(self, fd: int, callback: Callable[..., None]) -> None:
        try:
            self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_WRITE, callback)

    def remove_writer(self, fd: int) -> None:
        try:
            self._selector.unregister(fd)
        except KeyError:
            pass

selectors.DefaultSelector返回了您平台的最佳选择器实现,而_create_waiter只是一种便利方法。
然后,我们有可以注册插座的方法(一个用于阅读事件,一个用于写入事件),而将其注册。

这样,我们可以定义构建TCP服务器所需的非阻滞方法。对于本文的范围,我们只需要3:socket.acceptsocket.recvsocket.sendall

class EventLoop:

    ...

    def _sock_recv(
        self, sock: socket.socket, nbytes: int, waiter: Waiter
    ) -> None:
        try:
            result = sock.recv(nbytes)
        except (BlockingIOError, InterruptedError):
            return
        except Exception as e:
            waiter.set_exception(e)
        else:
            waiter.set_result(result)

    def sock_recv(
        self, sock: socket.socket, nbytes: int
    ) -> Generator[Any, Any, bytes]:
        waiter = self._create_waiter()
        self.add_reader(
            sock.fileno(),
            functools.partial(self._sock_recv, sock, nbytes, waiter),
        )
        res = yield from waiter.wait()
        return res

    def _sock_sendall(
        self, sock: socket.socket, data: bytes, waiter: Waiter
    ) -> None:
        try:
            result = sock.sendall(data)
        except (BlockingIOError, InterruptedError):
            return
        except Exception as e:
            waiter.set_exception(e)
        else:
            waiter.set_result(result)

    def sock_sendall(
        self, sock: socket.socket, data: bytes
    ) -> Generator[Any, Any, None]:
        waiter = self._create_waiter()
        self.add_writer(
            sock.fileno(),
            functools.partial(self._sock_sendall, sock, data, waiter),
        )
        res = yield from waiter.wait()
        return res

    def _sock_accept(self, sock: socket.socket, waiter: Waiter) -> None:
        try:
            result = sock.accept()
        except (BlockingIOError, InterruptedError):
            return
        except Exception as e:
            waiter.set_exception(e)
        else:
            waiter.set_result(result)

    def sock_accept(
        self, sock: socket.socket
    ) -> Generator[Any, Any, tuple[socket.socket, Any]]:
        waiter = self._create_waiter()
        self.add_reader(
            sock.fileno(), functools.partial(self._sock_accept, sock, waiter)
        )
        res = yield from waiter.wait()
        return res

    def process_events(
        self,
        events: list[tuple[selectors.SelectorKey, type[selectors.EVENT_READ]]],
    ) -> None:
        for key, mask in events:
            fileobj, callback = key.fileobj, key.data
            callback()
            if mask & selectors.EVENT_READ:
                self.remove_reader(fileobj)
            if mask & selectors.EVENT_WRITE:
                self.remove_writer(fileobj)

    def run_until_complete(
        self,
        generator: Generator[Any, Any, T],
        *,
        task_name: str | None = None,
    ) -> T:
        main_task = self.create_task(generator, name=task_name)
        while not main_task._done:
            ready = self._selector.select(0)
            if ready:
                self.process_events(ready)
            for task in self._tasks:
                task._step()
                if task._done:
                    self._tasks.remove(task)
        if main_task._exception:
            raise main_task._exception
        return main_task._result

    def close(self) -> None:
        for thread in self._workers:
            thread.join()
        self._selector.close()

要了解该代码的情况,让我们看一下_sock_recvsock_recv
前者采用插座和一个Waiter对象,试图运行socket.recv并在服务员上存储结果或任何例外。
后者是一个生成器函数,可创建服务员对象并调用add_reader注册套接字和_sock_recv作为回调(我们使用functools.partial来绑定参数以使其简单)。然后,它等待服务员(从其wait方法屈服)并返回结果。我们知道,Waiter.wait只能屈服直到服务员完成(这意味着_sock_recv已致电set_resultset_exception)。

您可以在run_until_complete中看到,在每次迭代中,我们都会收到我们已在add_readeradd_writer注册的现成插座列表。正如我们之前说的那样

所以如果我们写:

...
data = yield from loop.sock_recv(sock_obj, 1024)
...

我们正在执行以下操作:

  1. 我们创建了一个新的服务员,我们将loop._sock_recv(sock_obj, 1024, waiter)安排在loop._selector.select会告诉我们SOCK_OBJ一旦准备就绪。
  2. 我们在等待服务员。正如我们之前学到的,sock_recv直到完成yield from waiter.wait()之前才继续前进,但是其他计划的任务仍在运行。
  3. 在某个时候,在run_until_complete中的Wile循环的迭代中,选择器将为我们提供sock_obj和计划的回调(_sock_recv带有步骤1的签名)。 process_events将运行将在我们服务员身上设置结果的回调。
  4. sock_recv获得了waiter的结果(所以它退出了yield from waiter.wait()行)并返回。

我们现在准备构建并发的TCP服务。在下一部分中,我们将通过到目前为止所学到的知识来构建回声服务。

并发回声服务

在这一点上,我们的服务的实施应直接。

loop = EventLoop()


def process_client(client: socket.socket, address: tuple[str, int]) -> None:
    print('New client:', address)
    try:
        while True:
            data = yield from loop.sock_recv(client, 1024)
            print(address, data)
            if not data:
                break
            yield from loop.sock_sendall(client, data)
    finally:
        client.close()
        print('Client closed:', address)


def main1():
    server = socket.create_server(
        ('127.0.0.1', 1234), family=socket.AF_INET, backlog=5, reuse_port=True
    )
    server.setblocking(False)
    while True:
        client, address = yield from loop.sock_accept(server)
        client.setblocking(False)
        loop.create_task(process_client(client, address))


loop.run_until_complete(main1())

在我们的主要功能中,我们创建“服务器” 套接字并将其设置为非阻滞。然后,我们输入一个段循环,并接受loop.sock_accept的新连接。每当我们得到一个时,也将其设置为非障碍物,并从process_client开始一项新任务以处理它。
process_client本身,只要在loop.sock_recv上等待数据,只要获得任何数据,就可以将数据传输回客户。

让我们运行它。我将使用两个带有Telnet的终端连接并发送数据。
输出应该是:

New client: ('127.0.0.1', 39698)
New client: ('127.0.0.1', 39700)
('127.0.0.1', 39700) b'hello world 1\r\n'
('127.0.0.1', 39698) b'hello world 2\r\n'
('127.0.0.1', 39698) b'I go now\r\n'
('127.0.0.1', 39698) b''
Client closed: ('127.0.0.1', 39698)
('127.0.0.1', 39700) b'me too\r\n'
('127.0.0.1', 39700) b''
Client closed: ('127.0.0.1', 39700)

结论

最终,我们既不使用异步也不直接等待并发服务,但是我们已经用类似的基本实现代替了它们,以了解其背后的核心概念。
这篇文章的目的不是要提供更好的性能或更清洁的python asyncio,而是为了 dimistify asyncawait and await for asyncio初学者, re-Building 已经存在功能。
借助与并发编程的基本构建块,您可以实现任何类型的并发功能。考虑异步队列。它将像:
一样简单

class Queue(Generic[T]):
    def __init__(self, max_size: int = -1) -> None:
        self._max_size = max_size
        self._queue: deque[T] = deque()

    def __repr__(self) -> str:
        return f"<Queue max_size={self._max_size} size={len(self._queue)}>"

    def qsize(self) -> int:
        return len(self._queue)

    def empty(self) -> bool:
        return not self._queue

    def full(self) -> bool:
        if self._max_size < 0:
            return False
        return len(self._queue) >= self._max_size

    def put(self, item: T) -> Generator[Any, Any, None]:
        while self.full():
            yield
        self._queue.append(item)

    def get(self) -> Generator[Any, Any, T]:
        while self.empty():
            yield
        return self._queue.pop()

    def put_nowait(self, item: T) -> None:
        if self.full():
            raise RuntimeError(f"{self} is full")
        self._queue.append(item)

    def get_nowait(self) -> T:
        if self.empty():
            raise RuntimeError(f"{self} is empty")
        return self._queue.pop()

还是取消任务呢?您可以挑战自己以最有效的方式实施它(并更新EventLoop.close以取消剩余任务)。

我希望这次旅行对您有用,我迫不及待地想在评论中阅读您的反馈。