在我们的previous post中,我们宣布我们在GA中拥有Python SDK,并分享了一些历史,围绕着为什么我们建立自己的工作以及您需要进行的一些先决条件。现在,让我们深入研究,让您过去和跑步。出于本文的目的,我们将分解hello_activity.py示例中显示的单个文件并将其分为多个文件,以便我们更紧密地匹配您的应用程序的外观!
工作流程代码:workflow.py
我们要首先创建工作流,因为工人类要调用此功能。如果我们不先执行此操作,那么您的代码就不会运行,这也不是任何乐趣。
就像任何好的python文件一样,我们文件的顶部都具有我们需要做这件事的导入。我们还定义了一个python数据流,这将是我们用来将数据传递到工作流程的对象。通过将python数据流传递到工作流程中,我们可以从数据级别添加或删除字段,而不是修改调用工作流程的方式。这允许一些灵活性来进行以后的更改,而无需版本的工作流程。
在这种情况下,ComposeGreetingInput
允许我们将greeting
字符串和name
字符串传递到工作流程中。
from dataclasses import dataclass
from datetime import timedelta
from temporalio import activity, workflow
# Temporal strongly encourages using a single dataclass so
# that you can add fields in a backwards-compatible way.
@dataclass
class ComposeGreetingInput:
greeting: str
name: str
在ComposeGreetingInput
下方,我们将添加两个新的代码,首先是一个活动。
颞python SDK配备了为您工作的装饰器。在活动的情况下,它将此方法注册为可以由工人运行的有效活动(见下文)。一个活动会吸收一些数据,对其进行处理并可以返回值。 (实际上,它就像其他任何方法一样)。我们应该召集的一件事是,如果您要做任何不确定性的事情,例如生成UUID,您想在活动或活动所要求的方法中执行此操作。<<<<<<<<< /p>
compose_greeting
接收我们上面定义的ComposeGreetingInput
dataclass对象,将一些信息记录到终端,然后从传递的对象返回串联字符串。我们可以很容易地将数据推到数据库中,或完成了一些计算,或完成了一些计算甚至在这种方法中呼吁第三方API。
# Basic activity that logs and does string concatenation
@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
activity.logger.info("Running activity with parameter %s" % input)
return f"{input.greeting}, {input.name}!"
现在我们有一项活动,我们将添加工作流程。此代码是您的时间脚本的骨干。在此示例中,暂时性将为您提供的所有内容都在工作流程中。
我们使用workflow.defn
进行装饰以正确注册,因此临时工作者(见下文)知道这是有效的工作流程。
此示例类中的第一个方法是run
,但是有几件事要召唤:
run
方法用@workflow.run
进行了装饰,该方法告诉工人这是我们开始工作流程时运行的方法。它可用于设置工作流量变量,召集一个或多个活动以及更多的事情。要了解更多信息,请查看:
该方法本身是一种异步方法,因为暂时的python库在引擎盖下使用asyncio,并且所谓的方法需要异步运行。
WorkFlow run
方法期望Worker拨打
name
在该方法中,我们正在调用termals workflow.execute_activity
方法,该方法采用:
- 一个活动方法参考,在这种情况下为
compose_greeting
- 参数,在这种情况下,我们正在使用Dataclasses的功能传递“ Hello”和
name
的名称到我们之前创建的ComposeGreetingInput
Dataclass。 - 超时。在这种情况下,我们提供了
start_to_close_timeout
,它告诉时间服务器在活动开始时从10秒开始超时此活动。
# Basic workflow that logs and invokes an activity
@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
workflow.logger.info("Running workflow with parameter %s" % name)
return await workflow.execute_activity(
compose_greeting,
ComposeGreetingInput("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)
当您全部完成此文件时,它看起来像这样:
workflow.py
from dataclasses import dataclass
from datetime import timedelta
from temporalio import activity, workflow
@dataclass
class ComposeGreetingInput:
greeting: str
name: str
# Basic activity that logs and does string concatenation
@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
activity.logger.info("Running activity with parameter %s" % input)
return f"{input.greeting}, {input.name}!"
# Basic workflow that logs and invokes an activity
@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
workflow.logger.info("Running workflow with parameter %s" % name)
return await workflow.execute_activity(
compose_greeting,
ComposeGreetingInput("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)
现在我们有了工作流程代码,我们将其与工作者一起将其全部绑在一起,这是时间上使用的代码,因为在队列上显示任务时,该代码可与工作流程执行。在这种情况下,我们的示例非常简单,因此脚本将在创建工人后立即将任务添加到队列中。在我们的samples repo中,您会发现更多的深入示例,示例如何构建应用程序,在这些应用程序之外,这两个脚本可以使工作流程不同步地执行其他任务。您甚至可以使用query询问有关其自身的工作流程或为您提供的数据。
执行代码:worker.py
让我们设置脚本的顶部,以便我们拥有所有正确的导入和环境。这将是我们应用程序的骨架:
worker.py
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflow import GreetingWorkflow, compose_greeting
interrupt_event = asyncio.Event()
async def main():
print("Hello, world!")
if __name__ == "__main__":
asyncio.run(main())
此示例将在上面的状态下运行,但它要做的就是打印“你好,世界!”到您的终端。
为了利用暂时性,我们需要代码来与临时服务器进行交谈。我们通过客户端连接到服务器。我们的代码将通过此连接将活动,信号和查询发送到临时服务器内的时间任务队列。当执行队列上的这些任务时,它们将致力于工作流程历史记录,这允许暂时知道已运行了什么代码,剩下什么代码以及您的应用程序状态在任何给定时间时的状态!
创建客户
使用相同的默认Local -Host URL路径和临时服务器端口替换上面的main()
函数:
async def main():
# Uncomment the line below to see logging
# logging.basicConfig(level=logging.INFO)
# Start client
client = await Client.connect("localhost:7233")
注意:我们不使用http://localhost:7233
,而是localhost:7233
!
接下来,我们要添加我们称为工人的内容,这是实际上称我们为队列活动的工作流程代码的代码。
创建工人
在客户端下方的线路上,main()
内部,添加以下内容:
# Run a worker for the workflow
async with Worker(
client,
task_queue="hello-activity-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
):
print("Still saying ‘hello’ to you, world!")
运行此代码告诉临时服务器,通过传递以下信息,有一个工人准备处理任务:
-
client
-允许工人伸出手来说我在这里,临时服务器,给我工作! -
task_queue
-告诉临时服务器,我仅设置为从此队列处理任务 -
workflows
- python类参考的列表,称为Workflows
(见下文),专门为处理运行的activities
编写,以处理请求的task_queue
上的任务 -
activities
-可以在任务队列上处理任务的Python函数参考
此时,您的代码看起来像这样。这些代码所做的只是连接到临时服务器,并允许工作代码运行您的print
语句。我们还没有意识到暂时性的全部潜力。此代码将向您的终端打印Still saying ‘hello’ to you, world!
。运行此代码后,如果您转到Web UI URL(默认为koude36),则实际上不会在工作流列表中看到任何内容。
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflow import GreetingWorkflow, compose_greeting
interrupt_event = asyncio.Event()
async def main():
# Uncomment the line below to see logging
# logging.basicConfig(level=logging.INFO)
# Start client
client = await Client.connect("localhost:7233")
# Run a worker for the workflow
async with Worker(
client,
task_queue="hello-activity-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
):
print("Still saying ‘hello’ to you, world!")
if __name__ == "__main__":
asyncio.run(main())
要运行我们的GreetingWorkflow
,我们要求客户执行工作流程。然后,当我们运行脚本时,临时服务器将跟踪到目前为止的代码所做的事情,您将能够在Web UI中查看Premal为您所做的所有工作。
在上面的代码中,用以下内容替换打印语句:
# While the worker is running, use the client to run the workflow and
# print out its result.
result = await client.execute_workflow(
GreetingWorkflow.run,
"World",
id="hello-activity-workflow-id",
task_queue="hello-activity-task-queue",
)
print(f"Result: {result}")
此代码将允许工人告诉客户:
- 异步调用execute_workflow(即运行工作流
run
方法) - 将输入世界传递给
GreetingWorkflow.run
- 给工作流程标识符
hello-activity-workflow-id
- 将工作流生成的任务放在
hello-activity-task-queue
上(您可以从上面的几行中识别出较远的位置,当我们创建工作工人时)
运行全部
在您的终端中运行此操作:python worker.py
工人执行代码后,它将设置从compose_greeting
返回到result
的字符串,然后将其打印到终端。您最终应该在终端看到Hello, World!
。
在Web UI中,您会看到类似的内容,每次执行代码时都有一个工作流程:
单击其中一行的工作流ID,当您的工人执行工作流程代码时,您会看到代码和临时服务器发生的所有内容:
真实的谈话:时间是为应用程序构建的,该应用程序不仅是对您的终端打印文本更重要的应用程序,但是我们想给出一种非常简单的方法来向您展示不同的作品。在后续博客文章中,我们将向您展示一个实际应用程序,该应用程序使用暂时性将状态保持在工作流程中。
为了娱乐,如果您想做一些简单的事情,但这会给您带来更多功能,可以使用此版本的worker.py
:
从控制台中输入输入。
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflow import GreetingWorkflow, compose_greeting
interrupt_event = asyncio.Event()
async def main():
# Uncomment the line below to see logging
# logging.basicConfig(level=logging.INFO)
# Start client
client = await Client.connect("localhost:7233")
# Run a worker for the workflow
async with Worker(
client,
task_queue="hello-activity-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
):
print("What's your name?")
name = input()
# While the worker is running, use the client to run the workflow and
# print out its result.
result = await client.execute_workflow(
GreetingWorkflow.run,
name,
id="hello-activity-workflow-id",
task_queue="hello-activity-task-queue",
)
print(f"Result: {result}")
if __name__ == "__main__":
asyncio.run(main())
在我的情况下,输出看起来像这样:
此代码还导致Web UI中工作流的事件历史记录中的WorkflowExecutionStarted
事件略有不同。我们可以看到输入 - 马特(Mattâ)传递给了活动,而不是世界。
单击“工作流执行”在您的工作流程上以查看差异!
成功!
在那里,工作流和工人分解为某些领域,供您在应用程序中进行工作。希望这可以帮助您使用Python应用程序,并更好地了解如何使用时间。
想进一步迈出一步吗?查看我们的开发商倡导者的帖子,了解他们如何使用某些时间基础知识。有关完整教程,请查看Build a Temporal Application from scratch in Python。