世界是异步,就像最近proclaimed at re:invent 2022的亚马逊CTO Werner Vogels博士。他的意思是,即使我们大多数人编写同步代码,尤其是在使用Python等语言工作时,异步的系统和操作本质和计算机科学都无处不在。今天,我想探索Python中有关与AWS服务的相互作用的异步计算的当前状态。我将解释基本的构建块以及如何使用它们通过示例与AWS服务进行交互。
编写异步代码的目标之一是改善资源利用率。特别是从处理器的角度来看,诸如与网络上的存储设备或服务进行交互之类的I/O操作非常慢。在等待Web服务的响应时,处理器can perform a lot of meaningful work。它也在操作系统级别上做到这一点,但是在我们自己的Python代码中,情况通常不是这种情况。
默认情况下,python中的大多数I/O操作都是 blocking 操作,这意味着它们会阻止程序执行的流动,直到完成为止。尤其是当我们的代码打电话给第三方API(如AWS)时,如果我们在等待响应时不做一些事情,我们会在桌面上留出很多宝贵的计算时间。但是我们该怎么做呢?答案在于标准库的asyncio模块。还有其他方法,例如(多)threading和multiprocessing,我们将不会进入这里。如果您不熟悉Asyncio,我强烈建议您You check out this article by realpython.com。这是该主题的绝佳概述。
我将对Python的基本构建块提供高度简化的解释。有关更多详细信息,请阅读上面提到的文章。自Python 3.4以来,在Python中编写异步代码是可能的,尽管该功能成熟到Python 3.7。这两个最重要的关键字是async
和await
。我们在函数之前使用async
将其定义为异步。在引擎盖下,这意味着在等待事情发生时可以暂停。它还允许我们在功能正文中使用await
关键字,该功能正文不允许在async
函数外部使用。
基本上,您可以调用已定义为异步的函数,并将收到所谓的等待作为即时响应。该功能不会立即执行,但是当有可用的计算时间时。调用异步功能后,您可以继续定期的程序流。如果您需要函数调用的结果,则您将await
the等待,它将阻止您的代码直到执行其他函数。
这是一个示例函数,除了等待随机时间外,没有多大操作。您通常会调用第三方API或从存储中读取文件。
import asyncio
import random
import time
import typing
async def sleepy_boi(name: str) -> int:
"""Sleeps for a bit and returns the number of seconds it slept for."""
# In practice, this would be I/O operations
sleep_for_s = random.randint(1, 5)
await asyncio.sleep(sleep_for_s)
print(f"Sleepy Boi {name} woke up after {sleep_for_s}s")
return sleep_for_s
我们需要另一个难题,看看这如何使我们受益。 asyncio.gather
函数允许您传递多个等待设备,并且可以等待所有这些功能完成计算。这是上面的示例的扩展,该示例接收到启动的异步操作的数量,然后根据需要多次调用sleepy_boi
:
async def sleepy_bois(number_of_bois: int) -> typing.List[int]:
"""Calls many sleepy bois"""
return await asyncio.gather(
*[sleepy_boi(f"boi_{i}") for i in range(number_of_bois)]
)
如果您密切关注,您已经注意到我们只能在定义为async
的函数中使用await
,那么我们如何等待 first 异步函数?使用asyncio.run(sleepy_bois(argument))
调用了第一个异步函数,该功能将隐含地等待该功能。
def main():
"""Main function that triggers the async magic."""
start = time.perf_counter()
result = asyncio.run(sleepy_bois(5))
runtime = time.perf_counter() - start
print(f"It took {runtime:.3f}s to execute, the total wait time was {sum(result)}s.")
if __name__ == "__main__":
main()
这是一堆代码;让我们执行它以查看会发生什么。
$ python async_example.py
Sleepy Boi boi_4 woke up after 1s
Sleepy Boi boi_0 woke up after 4s
Sleepy Boi boi_2 woke up after 4s
Sleepy Boi boi_3 woke up after 4s
Sleepy Boi boi_1 woke up after 5s
It took 5.005s to execute, the total wait time was 18s.
您可以看到,即使总的等待时间为18秒,五个呼叫大约在5秒内完成。这是异步执行的力量。
听起来很棒。如何将其与Boto3一起使用?好吧...你不能直接。 Boto3中没有本机支持,但幸运的是,第三方包裹在Boto3周围包裹以使其成为可能。它被称为koude11,已经存在了一段时间。它几乎支持boto3所做的一切,但是语法略有不同,因为所有异步和等待。
您可以使用PIP:pip install aioboto3
安装模块,然后查看docs以获取一些使用信息。老实说,该文档可以使用更多的工作,但这并不那么棘手。让我向您展示如何与现实世界的示例一起使用。
设想
在最近的一个项目中,我们具有一个lambda函数,该函数基于配置执行了一些计算(大约是通用的)。它是从带有配置标识符的步骤函数触发的,下载配置,然后从S3获取一些数据,执行转换并将其存储在S3。
中。不幸的是,我们在部署过程中遇到了问题,因为某些配置与Lambda功能当前迭代的功能不符。由于这种深入的配置验证是非平凡的,因此我们选择了以下方法。我们在lambda事件中添加了一个可选的system_test
标志,如果是这样,则该函数将执行所有操作,除了存储输出数据外,因此不会更改任何状态 1 。
要执行此系统测试,我们需要使用每个配置标识符同步触发lambda函数(Invocation类型RequestResponse
)并检查输出。后来,这将通过具有地图状态和一些并行性的步骤函数来完成。此问题的短期解决方案涉及将成为部署管道的一部分的脚本。
在伪代码中,此脚本非常微不足道:
all_configurations = list_configurations()
for configuration in all_configurations:
response = invoke_lambda(configuration.identifier)
report_result(response)
问题在于,由于没有并行化,这是非常效率的。由于这主要是I/O限制,即我们正在等待Lambda功能做出响应,因此异步方法可以使我们受益。在看看这一点之前,让我向您展示一个虚拟lambda功能,您可以使用:
import random
import time
def lambda_handler(event, context):
print(event)
sleep_time = random.randint(1, 10)
print(f"Sleeping for {sleep_time}s")
time.sleep(sleep_time)
if random.randint(1, 100) <= 15:
# 15% chance of errors
raise ValueError("Something went wrong!")
return {
"configuration_identifier": event.get("configuration_identifier"),
"slept_for": sleep_time,
"system_test_successful": True
}
此lambda函数在一到十之间的随机数量。它在所有调用中大约15%失败,模拟了损坏的配置。在所有其他情况下,它都会返回处理成功的信息。让我们现在看一下实施。如果您有兴趣,可以找到complete code on Github。
主函数设置日志记录并生成一些虚拟配置标识符,然后使用asyncio.run()
来启动系统测试以调用异步功能。测试完成后,它将记录有关结果的一些信息。
def main() -> None:
"""Generates configurations for the system test, triggers it, and reports the result."""
LOGGER.addHandler(logging.StreamHandler(sys.stdout))
LOGGER.setLevel(logging.INFO)
configuration_identifiers = [f"config_{n}" for n in range(40)]
start_time = time.perf_counter()
results = asyncio.run(run_system_test_for_configurations(configuration_identifiers))
runtime_in_s = time.perf_counter() - start_time
LOGGER.info("The system test took %.3fs", runtime_in_s)
counts = collections.Counter(results)
LOGGER.info(
"%s of %s passed the test - %s failed",
counts[True],
len(configuration_identifiers),
counts[False],
)
if __name__ == "__main__":
main()
run_system_test_for_configurations
异步函数呼叫run_system_test
,用于在参数中收到的每种配置,并使用asyncio.gather
等待所有结果并将它们返回单个列表中。
async def run_system_test_for_configurations(
configuration_identifiers: typing.List[str],
) -> typing.List[bool]:
"""Schedules system tests for all configurations and waits until they're done."""
return await asyncio.gather(
*[
run_system_test(config_identifier)
for config_identifier in configuration_identifiers
]
)
run_system_test
是魔术发生的地方。它实例化了一个aioboto3.Session
,您可以用它来获取从boto3中使用的异步客户端或资源对象。然后,它使用此会话来创建lambda客户端并调用lambda函数。注意调用之前的await
。这会导致它暂停,直到API调用返回响应。响应可用后,我们检查是否发生了错误,请相应地报告并返回False
。否则,我们会检索有效负载,这是任何异步操作,并在返回功能结果之前对其进行调试。
async def run_system_test(configuration_identifier: str) -> bool:
"""Invoke the lambda function with the configuration identifier and parse the result"""
session = aioboto3.Session()
async with session.client("lambda") as lambda_client:
response = await lambda_client.invoke(
FunctionName=LAMBDA_NAME,
InvocationType="RequestResponse",
Payload=json.dumps(
{
"configuration_identifier": configuration_identifier,
"system_test": True,
}
),
LogType="Tail",
)
if "FunctionError" in response:
await format_error(configuration_identifier, response)
return False
LOGGER.info("Configuration %s PASSED the system test", configuration_identifier)
payload: dict = json.loads(await response["Payload"].read())
LOGGER.debug("Payload %s", json.dumps(payload))
return payload.get("system_test_successful", False)
有一些奇怪的地方可以将您的头缠绕在这里。首先,我们需要一个会话来创建客户端。您可以在boto3中类似地执行此操作,但是通常使用隐式会话。对于AIOBOTO3,会议是强制性的。另一个意外的行为是await response["Payload"].read()
。我一开始就被这被绊倒了,但是如果您认为asyncio
的全部要点是使I/O操作异步,那是有道理的。 API调用返回StreamingBody
数据类型,并且读取它是I/O。
如果运行脚本,我们将找到与上面纯Python示例相似的结果。我们能够在大约14秒内测试40个配置,并且此过程应该很好地扩展。
$ python aio_boto3.py
[...]
Configuration config_28 PASSED the system test
Configuration config_36 FAILED the system test
The error message is: Unhandled
============================== Log output ==============================
START RequestId: 4356aa76-9d26-4601-ace2-f6ded8cf8d0c Version: $LATEST
{'configuration_identifier': 'config_36', 'system_test': True}
Sleeping for 8s
raise ValueError("Something went wrong!")2, in lambda_handler
END RequestId: 4356aa76-9d26-4601-ace2-f6ded8cf8d0c
REPORT RequestId: 4356aa76-9d26-4601-ace2-f6ded8cf8d0c Duration: 8012.19 ms Billed Duration: 8013 ms Memory Size: 4096 MB Max Memory Used: 70 MB Init Duration: 471.54 ms
============================== End of log ==============================
Configuration config_35 PASSED the system test
Configuration config_12 PASSED the system test
Configuration config_7 PASSED the system test
Configuration config_31 PASSED the system test
Configuration config_10 PASSED the system test
The system test took 13.927s
35 of 40 passed the test - 5 failed
今天就这样。我已经解释了异步计算背后的动机以及如何在Python中使用。我还向您展示了如何使用它的真实示例。
希望您从这个博客中获得了一些东西,我期待您的问题,反馈和疑虑。
maurice
P.S。:Check out the code on Github
-
是的,这不是一个非常有效的实现。它使用了相当多的计算能力。然而,这是一个非常有价值的后部署后检查,因为它是带有实际数据的深入检查。这是务实的。