与Python中AWS API的异步相互作用简介
#aws #python #asyncio

世界是异步,就像最近proclaimed at re:invent 2022的亚马逊CTO Werner Vogels博士。他的意思是,即使我们大多数人编写同步代码,尤其是在使用Python等语言工作时,异步的系统和操作本质和计算机科学都无处不在。今天,我想探索Python中有关与AWS服务的相互作用的异步计算的当前状态。我将解释基本的构建块以及如何使用它们通过示例与AWS服务进行交互。

编写异步代码的目标之一是改善资源利用率。特别是从处理器的角度来看,诸如与网络上的存储设备或服务进行交互之类的I/O操作非常慢。在等待Web服务的响应时,处理器can perform a lot of meaningful work。它也在操作系统级别上做到这一点,但是在我们自己的Python代码中,情况通常不是这种情况。

默认情况下,python中的大多数I/O操作都是 blocking 操作,这意味着它们会阻止程序执行的流动,直到完成为止。尤其是当我们的代码打电话给第三方API(如AWS)时,如果我们在等待响应时不做一些事情,我们会在桌面上留出很多宝贵的计算时间。但是我们该怎么做呢?答案在于标准库的asyncio模块。还有其他方法,例如(多)threadingmultiprocessing,我们将不会进入这里。如果您不熟悉Asyncio,我强烈建议您You check out this article by realpython.com。这是该主题的绝佳概述。

我将对Python的基本构建块提供高度简化的解释。有关更多详细信息,请阅读上面提到的文章。自Python 3.4以来,在Python中编写异步代码是可能的,尽管该功能成熟到Python 3.7。这两个最重要的关键字是asyncawait。我们在函数之前使用async将其定义为异步。在引擎盖下,这意味着在等待事情发生时可以暂停。它还允许我们在功能正文中使用await关键字,该功能正文不允许在async函数外部使用。

基本上,您可以调用已定义为异步的函数,并将收到所谓的等待作为即时响应。该功能不会立即执行,但是当有可用的计算时间时。调用异步功能后,您可以继续定期的程序流。如果您需要函数调用的结果,则您将await the等待,它将阻止您的代码直到执行其他函数。

这是一个示例函数,除了等待随机时间外,没有多大操作。您通常会调用第三方API或从存储中读取文件。

import asyncio
import random
import time
import typing


async def sleepy_boi(name: str) -> int:
    """Sleeps for a bit and returns the number of seconds it slept for."""
    # In practice, this would be I/O operations
    sleep_for_s = random.randint(1, 5)
    await asyncio.sleep(sleep_for_s)
    print(f"Sleepy Boi {name} woke up after {sleep_for_s}s")
    return sleep_for_s

我们需要另一个难题,看看这如何使我们受益。 asyncio.gather函数允许您传递多个等待设备,并且可以等待所有这些功能完成计算。这是上面的示例的扩展,该示例接收到启动的异步操作的数量,然后根据需要多次调用sleepy_boi

async def sleepy_bois(number_of_bois: int) -> typing.List[int]:
    """Calls many sleepy bois"""

    return await asyncio.gather(
        *[sleepy_boi(f"boi_{i}") for i in range(number_of_bois)]
    )

如果您密切关注,您已经注意到我们只能在定义为async的函数中使用await,那么我们如何等待 first 异步函数?使用asyncio.run(sleepy_bois(argument))调用了第一个异步函数,该功能将隐含地等待该功能。

def main():
    """Main function that triggers the async magic."""

    start = time.perf_counter()

    result = asyncio.run(sleepy_bois(5))

    runtime = time.perf_counter() - start
    print(f"It took {runtime:.3f}s to execute, the total wait time was {sum(result)}s.")



if __name__ == "__main__":
    main()

这是一堆代码;让我们执行它以查看会发生什么。

$ python async_example.py
Sleepy Boi boi_4 woke up after 1s
Sleepy Boi boi_0 woke up after 4s
Sleepy Boi boi_2 woke up after 4s
Sleepy Boi boi_3 woke up after 4s
Sleepy Boi boi_1 woke up after 5s
It took 5.005s to execute, the total wait time was 18s.

您可以看到,即使总的等待时间为18秒,五个呼叫大约在5秒内完成。这是异步执行的力量。

听起来很棒。如何将其与Boto3一起使用?好吧...你不能直接。 Boto3中没有本机支持,但幸运的是,第三方包裹在Boto3周围包裹以使其成为可能。它被称为koude11,已经存在了一段时间。它几乎支持boto3所做的一切,但是语法略有不同,因为所有异步和等待。

您可以使用PIP:pip install aioboto3安装模块,然后查看docs以获取一些使用信息。老实说,该文档可以使用更多的工作,但这并不那么棘手。让我向您展示如何与现实世界的示例一起使用。

设想

在最近的一个项目中,我们具有一个lambda函数,该函数基于配置执行了一些计算(大约是通用的)。它是从带有配置标识符的步骤函数触发的,下载配置,然后从S3获取一些数据,执行转换并将其存储在S3。

中。

不幸的是,我们在部署过程中遇到了问题,因为某些配置与Lambda功能当前迭代的功能不符。由于这种深入的配置验证是非平凡的,因此我们选择了以下方法。我们在lambda事件中添加了一个可选的system_test标志,如果是这样,则该函数将执行所有操作,除了存储输出数据外,因此不会更改任何状态 1

System Check Architecture

要执行此系统测试,我们需要使用每个配置标识符同步触发lambda函数(Invocation类型RequestResponse)并检查输出。后来,这将通过具有地图状态和一些并行性的步骤函数来完成。此问题的短期解决方案涉及将成为部署管道的一部分的脚本。

在伪代码中,此脚本非常微不足道:

all_configurations = list_configurations()
for configuration in all_configurations:
    response = invoke_lambda(configuration.identifier)
    report_result(response)

问题在于,由于没有并行化,这是非常效率的。由于这主要是I/O限制,即我们正在等待Lambda功能做出响应,因此异步方法可以使我们受益。在看看这一点之前,让我向您展示一个虚拟lambda功能,您可以使用:

import random
import time

def lambda_handler(event, context):
    print(event)
    sleep_time = random.randint(1, 10)
    print(f"Sleeping for {sleep_time}s")
    time.sleep(sleep_time)

    if random.randint(1, 100) <= 15:
        # 15% chance of errors
        raise ValueError("Something went wrong!")

    return {
        "configuration_identifier": event.get("configuration_identifier"),
        "slept_for": sleep_time,
        "system_test_successful": True
    }

此lambda函数在一到十之间的随机数量。它在所有调用中大约15%失败,模拟了损坏的配置。在所有其他情况下,它都会返回处理成功的信息。让我们现在看一下实施。如果您有兴趣,可以找到complete code on Github

主函数设置日志记录并生成一些虚拟配置标识符,然后使用asyncio.run()来启动系统测试以调用异步功能。测试完成后,它将记录有关结果的一些信息。

def main() -> None:
    """Generates configurations for the system test, triggers it, and reports the result."""

    LOGGER.addHandler(logging.StreamHandler(sys.stdout))
    LOGGER.setLevel(logging.INFO)

    configuration_identifiers = [f"config_{n}" for n in range(40)]

    start_time = time.perf_counter()
    results = asyncio.run(run_system_test_for_configurations(configuration_identifiers))
    runtime_in_s = time.perf_counter() - start_time

    LOGGER.info("The system test took %.3fs", runtime_in_s)

    counts = collections.Counter(results)
    LOGGER.info(
        "%s of %s passed the test - %s failed",
        counts[True],
        len(configuration_identifiers),
        counts[False],
    )


if __name__ == "__main__":
    main()

run_system_test_for_configurations异步函数呼叫run_system_test,用于在参数中收到的每种配置,并使用asyncio.gather等待所有结果并将它们返回单个列表中。

async def run_system_test_for_configurations(
    configuration_identifiers: typing.List[str],
) -> typing.List[bool]:
    """Schedules system tests for all configurations and waits until they're done."""

    return await asyncio.gather(
        *[
            run_system_test(config_identifier)
            for config_identifier in configuration_identifiers
        ]
    )

run_system_test是魔术发生的地方。它实例化了一个aioboto3.Session,您可以用它来获取从boto3中使用的异步客户端或资源对象。然后,它使用此会话来创建lambda客户端并调用lambda函数。注意调用之前的await。这会导致它暂停,直到API调用返回响应。响应可用后,我们检查是否发生了错误,请相应地报告并返回False。否则,我们会检索有效负载,这是任何异步操作,并在返回功能结果之前对其进行调试。

async def run_system_test(configuration_identifier: str) -> bool:
    """Invoke the lambda function with the configuration identifier and parse the result"""

    session = aioboto3.Session()

    async with session.client("lambda") as lambda_client:

        response = await lambda_client.invoke(
            FunctionName=LAMBDA_NAME,
            InvocationType="RequestResponse",
            Payload=json.dumps(
                {
                    "configuration_identifier": configuration_identifier,
                    "system_test": True,
                }
            ),
            LogType="Tail",
        )

    if "FunctionError" in response:
        await format_error(configuration_identifier, response)
        return False

    LOGGER.info("Configuration %s PASSED the system test", configuration_identifier)
    payload: dict = json.loads(await response["Payload"].read())
    LOGGER.debug("Payload %s", json.dumps(payload))

    return payload.get("system_test_successful", False)

有一些奇怪的地方可以将您的头缠绕在这里。首先,我们需要一个会话来创建客户端。您可以在boto3中类似地执行此操作,但是通常使用隐式会话。对于AIOBOTO3,会议是强制性的。另一个意外的行为是await response["Payload"].read()。我一开始就被这被绊倒了,但是如果您认为asyncio的全部要点是使I/O操作异步,那是有道理的。 API调用返回StreamingBody数据类型,并且读取它是I/O。

的一种形式

如果运行脚本,我们将找到与上面纯Python示例相似的结果。我们能够在大约14秒内测试40个配置,并且此过程应该很好地扩展。

$ python aio_boto3.py
[...]
Configuration config_28 PASSED the system test
Configuration config_36 FAILED the system test
The error message is: Unhandled
============================== Log output ==============================
START RequestId: 4356aa76-9d26-4601-ace2-f6ded8cf8d0c Version: $LATEST
{'configuration_identifier': 'config_36', 'system_test': True}
Sleeping for 8s
    raise ValueError("Something went wrong!")2, in lambda_handler
END RequestId: 4356aa76-9d26-4601-ace2-f6ded8cf8d0c
REPORT RequestId: 4356aa76-9d26-4601-ace2-f6ded8cf8d0c  Duration: 8012.19 ms    Billed Duration: 8013 ms        Memory Size: 4096 MB    Max Memory Used: 70 MB  Init Duration: 471.54 ms

============================== End of log ==============================
Configuration config_35 PASSED the system test
Configuration config_12 PASSED the system test
Configuration config_7 PASSED the system test
Configuration config_31 PASSED the system test
Configuration config_10 PASSED the system test
The system test took 13.927s
35 of 40 passed the test - 5 failed

今天就这样。我已经解释了异步计算背后的动机以及如何在Python中使用。我还向您展示了如何使用它的真实示例。

希望您从这个博客中获得了一些东西,我期待您的问题,反馈和疑虑。

maurice

P.S。:Check out the code on Github

Lan Pham上的标题照片Unsplash


  1. 是的,这不是一个非常有效的实现。它使用了相当多的计算能力。然而,这是一个非常有价值的后部署后检查,因为它是带有实际数据的深入检查。这是务实的。