Python SDK:潜入工人和工作流程
#编程 #python #temporal #pythonsdk

在我们的previous post中,我们宣布我们在GA中拥有Python SDK,并分享了一些历史,围绕着为什么我们建立自己的工作以及您需要进行的一些先决条件。现在,让我们深入研究,让您过去和跑步。出于本文的目的,我们将分解hello_activity.py示例中显示的单个文件并将其分为多个文件,以便我们更紧密地匹配您的应用程序的外观!

工作流程代码:workflow.py

我们要首先创建工作流,因为工人类要调用此功能。如果我们不先执行此操作,那么您的代码就不会运行,这也不是任何乐趣。

就像任何好的python文件一样,我们文件的顶部都具有我们需要做这件事的导入。我们还定义了一个python数据流,这将是我们用来将数据传递到工作流程的对象。通过将python数据流传递到工作流程中,我们可以从数据级别添加或删除字段,而不是修改调用工作流程的方式。这允许一些灵活性来进行以后的更改,而无需版本的工作流程。

在这种情况下,ComposeGreetingInput允许我们将greeting字符串和name字符串传递到工作流程中。

from dataclasses import dataclass
from datetime import timedelta
from temporalio import activity, workflow

# Temporal strongly encourages using a single dataclass so 
# that you can add fields in a backwards-compatible way.
@dataclass
class ComposeGreetingInput:
    greeting: str
    name: str

ComposeGreetingInput下方,我们将添加两个新的代码,首先是一个活动。

颞python SDK配备了为您工作的装饰器。在活动的情况下,它将此方法注册为可以由工人运行的有效活动(见下文)。一个活动会吸收一些数据,对其进行处理并可以返回值。 (实际上,它就像其他任何方法一样)。我们应该召集的一件事是,如果您要做任何不确定性的事情,例如生成UUID,您想在活动或活动所要求的方法中执行此操作。<<<<<<<<< /p>

compose_greeting接收我们上面定义的ComposeGreetingInput dataclass对象,将一些信息记录到终端,然后从传递的对象返回串联字符串。我们可以很容易地将数据推到数据库中,或完成了一些计算,或完成了一些计算甚至在这种方法中呼吁第三方API。

# Basic activity that logs and does string concatenation
@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
    activity.logger.info("Running activity with parameter %s" % input)
    return f"{input.greeting}, {input.name}!"

现在我们有一项活动,我们将添加工作流程。此代码是您的时间脚本的骨干。在此示例中,暂时性将为您提供的所有内容都在工作流程中。
我们使用workflow.defn进行装饰以正确注册,因此临时工作者(见下文)知道这是有效的工作流程。
此示例类中的第一个方法是run,但是有几件事要召唤:
run方法用@workflow.run进行了装饰,该方法告诉工人这是我们开始工作流程时运行的方法。它可用于设置工作流量变量,召集一个或多个活动以及更多的事情。要了解更多信息,请查看:

该方法本身是一种异步方法,因为暂时的python库在引擎盖下使用asyncio,并且所谓的方法需要异步运行。
WorkFlow run方法期望Worker拨打

调用字符串name

在该方法中,我们正在调用termals workflow.execute_activity方法,该方法采用:

  • 一个活动方法参考,在这种情况下为compose_greeting
  • 参数,在这种情况下,我们正在使用Dataclasses的功能传递“ Hello”和name的名称到我们之前创建的ComposeGreetingInput Dataclass。
  • 超时。在这种情况下,我们提供了start_to_close_timeout,它告诉时间服务器在活动开始时从10秒开始超时此活动。
# Basic workflow that logs and invokes an activity
@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        workflow.logger.info("Running workflow with parameter %s" % name)
        return await workflow.execute_activity(
            compose_greeting,
            ComposeGreetingInput("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )

当您全部完成此文件时,它看起来像这样:

workflow.py

from dataclasses import dataclass
from datetime import timedelta
from temporalio import activity, workflow

@dataclass
class ComposeGreetingInput:
    greeting: str
    name: str

# Basic activity that logs and does string concatenation
@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
    activity.logger.info("Running activity with parameter %s" % input)
    return f"{input.greeting}, {input.name}!"

# Basic workflow that logs and invokes an activity
@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        workflow.logger.info("Running workflow with parameter %s" % name)
        return await workflow.execute_activity(
            compose_greeting,
            ComposeGreetingInput("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )

现在我们有了工作流程代码,我们将其与工作者一起将其全部绑在一起,这是时间上使用的代码,因为在队列上显示任务时,该代码可与工作流程执行。在这种情况下,我们的示例非常简单,因此脚本将在创建工人后立即将任务添加到队列中。在我们的samples repo中,您会发现更多的深入示例,示例如何构建应用程序,在这些应用程序之外,这两个脚本可以使工作流程不同步地执行其他任务。您甚至可以使用query询问有关其自身的工作流程或为您提供的数据。

执行代码:worker.py

让我们设置脚本的顶部,以便我们拥有所有正确的导入和环境。这将是我们应用程序的骨架:

worker.py

import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflow import GreetingWorkflow, compose_greeting

interrupt_event = asyncio.Event()

async def main():
    print("Hello, world!")

if __name__ == "__main__":
   asyncio.run(main())

此示例将在上面的状态下运行,但它要做的就是打印“你好,世界!”到您的终端。

为了利用暂时性,我们需要代码来与临时服务器进行交谈。我们通过客户端连接到服务器。我们的代码将通过此连接将活动,信号和查询发送到临时服务器内的时间任务队列。当执行队列上的这些任务时,它们将致力于工作流程历史记录,这允许暂时知道已运行了什么代码,剩下什么代码以及您的应用程序状态在任何给定时间时的状态!

创建客户

使用相同的默认Local -Host URL路径和临时服务器端口替换上面的main()函数:

async def main():
   # Uncomment the line below to see logging
   # logging.basicConfig(level=logging.INFO)

   # Start client
   client = await Client.connect("localhost:7233")

注意:我们不使用http://localhost:7233,而是localhost:7233

接下来,我们要添加我们称为工人的内容,这是实际上称我们为队列活动的工作流程代码的代码。

创建工人

在客户端下方的线路上,main()内部,添加以下内容:

   # Run a worker for the workflow
   async with Worker(
       client,
       task_queue="hello-activity-task-queue",
       workflows=[GreetingWorkflow],
       activities=[compose_greeting],
   ):
       print("Still saying ‘hello’ to you, world!")

运行此代码告诉临时服务器,通过传递以下信息,有一个工人准备处理任务:

  • client-允许工人伸出手来说我在这里,临时服务器,给我工作!
  • task_queue-告诉临时服务器,我仅设置为从此队列处理任务
  • workflows- python类参考的列表,称为Workflows(见下文),专门为处理运行的activities编写,以处理请求的task_queue上的任务
  • activities-可以在任务队列上处理任务的Python函数参考

此时,您的代码看起来像这样。这些代码所做的只是连接到临时服务器,并允许工作代码运行您的print语句。我们还没有意识到暂时性的全部潜力。此代码将向您的终端打印Still saying ‘hello’ to you, world!。运行此代码后,如果您转到Web UI URL(默认为koude36),则实际上不会在工作流列表中看到任何内容。

import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflow import GreetingWorkflow, compose_greeting

interrupt_event = asyncio.Event()

async def main():
   # Uncomment the line below to see logging
   # logging.basicConfig(level=logging.INFO)

   # Start client
   client = await Client.connect("localhost:7233")

   # Run a worker for the workflow
   async with Worker(
       client,
       task_queue="hello-activity-task-queue",
       workflows=[GreetingWorkflow],
       activities=[compose_greeting],
   ):
       print("Still saying ‘hello’ to you, world!")

if __name__ == "__main__":
   asyncio.run(main())

要运行我们的GreetingWorkflow,我们要求客户执行工作流程。然后,当我们运行脚本时,临时服务器将跟踪到目前为止的代码所做的事情,您将能够在Web UI中查看Premal为您所做的所有工作。

在上面的代码中,用以下内容替换打印语句:

       # While the worker is running, use the client to run the workflow and
       # print out its result.
       result = await client.execute_workflow(
           GreetingWorkflow.run,
           "World",
           id="hello-activity-workflow-id",
           task_queue="hello-activity-task-queue",
       )
       print(f"Result: {result}")

此代码将允许工人告诉客户:

  • 异步调用execute_workflow(即运行工作流run方法)
  • 将输入世界传递给GreetingWorkflow.run
  • 给工作流程标识符hello-activity-workflow-id
  • 将工作流生成的任务放在hello-activity-task-queue上(您可以从上面的几行中识别出较远的位置,当我们创建工作工人时)

运行全部

在您的终端中运行此操作:python worker.py

工人执行代码后,它将设置从compose_greeting返回到result的字符串,然后将其打印到终端。您最终应该在终端看到Hello, World!

在Web UI中,您会看到类似的内容,每次执行代码时都有一个工作流程:

Recent Workflows

单击其中一行的工作流ID,当您的工人执行工作流程代码时,您会看到代码和临时服务器发生的所有内容:

Workflow Detail

真实的谈话:时间是为应用程序构建的,该应用程序不仅是对您的终端打印文本更重要的应用程序,但是我们想给出一种非常简单的方法来向您展示不同的作品。在后续博客文章中,我们将向您展示一个实际应用程序,该应用程序使用暂时性将状态保持在工作流程中。

为了娱乐,如果您想做一些简单的事情,但这会给您带来更多功能,可以使用此版本的worker.py
从控制台中输入输入。

import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflow import GreetingWorkflow, compose_greeting

interrupt_event = asyncio.Event()

async def main():
   # Uncomment the line below to see logging
   # logging.basicConfig(level=logging.INFO)

   # Start client
   client = await Client.connect("localhost:7233")

   # Run a worker for the workflow
   async with Worker(
       client,
       task_queue="hello-activity-task-queue",
       workflows=[GreetingWorkflow],
       activities=[compose_greeting],
   ):
       print("What's your name?")
       name = input()

       # While the worker is running, use the client to run the workflow and
       # print out its result.
       result = await client.execute_workflow(
           GreetingWorkflow.run,
           name,
           id="hello-activity-workflow-id",
           task_queue="hello-activity-task-queue",
       )
       print(f"Result: {result}")

if __name__ == "__main__":
   asyncio.run(main())

在我的情况下,输出看起来像这样:

Hello, Matt

此代码还导致Web UI中工作流的事件历史记录中的WorkflowExecutionStarted事件略有不同。我们可以看到输入 - 马特(Mattâ)传递给了活动,而不是世界。

单击“工作流执行”在您的工作流程上以查看差异!

Workflow Execution Started

成功!

在那里,工作流和工人分解为某些领域,供您在应用程序中进行工作。希望这可以帮助您使用Python应用程序,并更好地了解如何使用时间。

想进一步迈出一步吗?查看我们的开发商倡导者的帖子,了解他们如何使用某些时间基础知识。有关完整教程,请查看Build a Temporal Application from scratch in Python