耐用的执行方式
#javascript #typescript #node #distributedsystems

这是该系列的第2部分,“构建Node.js中的可靠分布式系统”。在part 1中,我们介绍了什么是持久的执行,其好处以及耐用功能的外观。在这篇文章中,我们将研究时间范围如何提供持久的执行。

一个不会失败,可以永远持续并且不需要将数据存储在数据库中的函数?听起来像魔术。必须只能使用一小部分语言,或者它仅适用于专门的硬件。但是实际上,这只是JavaScriptâ您可以使用整个语言,并且可以在任何可以运行Node.js的服务器上运行。

那么这一切如何工作?您可以查看How Temporal Works图,该图用GO代码解释了该过程。在这篇文章中,我们将使用该系列中的previous post的打字稿代码进行整个过程。

客户端 - 服务器工作人员

开始,临时应用程序有三个部分:客户端,服务器和工人。

client-server-worker

客户端和工作人员都连接到具有数据库并维护状态的服务器。客户说“启动order()耐用函数”,“向delivered信号发送”和“终止功能”之类的话。该工人是一个长期运行的node.js进程,它具有我们的代码,并对任务的服务器进行轮询。任务看起来像“运行order()耐用函数”或“运行正常的sendPushNotification()函数”。工人运行代码后,它将结果报告回服务器。

在我们的交付应用程序中,我们在koude4中创建客户端,然后在我们的下一个函数中使用它:

  • koude5temporal.menu的API,它是
    • 启动order工作流程。
    • 查询其状态的order工作流程。
  • koude8drive.temporal.menu的API,它
    • pickedUpdelivered信号发送到order工作流程。
    • 获取所有order工作流和查询的列表。

我们在koude13中创建工人:

import { NativeConnection, Worker } from '@temporalio/worker'
import * as activities from 'activities'
import { taskQueue } from 'common'
import { namespace, getConnectionOptions } from 'common/lib/temporal-connection'

async function run() {
 const connection = await NativeConnection.connect(getConnectionOptions())
 const worker = await Worker.create({
   workflowsPath: require.resolve('../../../packages/workflows/'),
   activities,
   connection,
   namespace,
   taskQueue,
 })

 await worker.run()
}

run().catch((err) => {
 console.error(err)
 process.exit(1)
})

我们将工人传递给我们的代码:

并设置Render以自动构建和部署到main

render

运行每个部分

在生产中,我们的Web应用程序及其无服务器功能已部署到Vercel,我们长期运行的工作过程被部署以渲染,并且他们都与暂时云托管的服务器实例进行了交谈。该服务器是与数据库(SQL或Cassandra)和Elasticsearch一起使用的open source服务群集。您还可以托管yourself的所有内容,也可以节省大量时间,并在paying the experts托管它的可靠性和规模上更加安心来托管它ð。

在开发中,我们可以在本地运行所有三个部分。首先,我们安装了具有服务器的开发版本的临时CLI:

  • Homebrew:brew install temporal
  • cURL: curl -sSf https://temporal.download/cli.sh | sh
  • 手册:下载并提取latest release,然后将其添加到您的路径中。

我们使用:
启动服务器

temporal server start-dev

在另一个终端(具有Node V16或更高的终端)运行:

npx @temporalio/create@latest ./temporal-delivery --sample food-delivery
cd ./temporal-delivery
npm run dev

dev脚本运行两个下一个。JSWeb应用程序和工人。菜单在koude22上运行,我们可以单击“订单”,驱动程序门户位于koude23,我们可以在其中标记我们订购的项目并交付的项目。完成此操作后,我们可以在两个Web应用程序中看到该订单的“已交付”状态。我们还可以在koude24上查看服务器Web UI中相应的工作流执行的状态:

workflow-list

我们可以看到其状态已完成,当我们单击它时,我们会看到事件历史记录'在执行order()函数期间发生的事件列表:

event-history

第一个事件始终是WorkflowExecutionStarted,其中包含正在启动的工作流的类型,在这种情况下是order工作流。我们将在下一部分中更多地查看事件。

在“查询”选项卡中,我们可以选择getStatus查询,该查询(假设我们的工人仍在运行)将向order函数发送查询,该函数响应订单已交付,交付时间以及哪个项目是交付:

query

事件顺序

现在让我们看一下订单期间幕后发生的事情。

开始订单

当我们单击“订单”按钮时,API处理程序使用临时客户端将开始命令发送到服务器:

koude5

start

服务器将WorkflowExecutionStarted事件保存到事件历史记录并返回。它还创建了一个WorkflowTaskScheduled事件,该事件导致“工作流任务”(一个运行order()函数的指令)被添加到任务队列中,该工人正在对此进行轮询。

workflow-task

工作人员接收任务,服务器添加了WorkflowTaskStarted事件,在这种情况下,工人执行任务,调用order(3)koude6 function一直运行直到达到这一行:

const { chargeCustomer, refundOrder, sendPushNotification } = proxyActivities<typeof activities>({ ... })

export async function order(productId: number): Promise<void> {
 ...

 await chargeCustomer(product)

通话活动

当功能调用chargeCustomer活动时,工人告诉服务器:

call-activity

服务器添加了WorkflowTaskCompleted事件(工作流执行并不仅仅完成“运行order()函数并查看发生的事情”的初始任务)和带有活动类型和参数的ActivityTaskScheduled事件:

activity-task-scheduled

然后,服务器将活动任务添加到任务队列中的活动任务(指令运行活动函数),该工作队员拾取了该任务:

get-activity-task

在开发中,我们只运行一个工程流程,因此它正在完成所有任务,但是在生产中,我们有足够的工人来处理我们的负载,其中任何一个都可以接受活动任务。不仅是运行order函数的一个。

工人遵循活动任务说明,运行chargeCustomer()函数:

koude43

称为paymentService.charge()

koude45

如果函数引发错误,则工人将其报告回服务器:

activity-failed

,服务器安排重试。 default初始间隔为1秒,因此在1秒内,活动任务将添加回队列以供工人接收。

如果功能成功完成,则工作人员报告成功(在这种情况下没有返回值,但没有)返回服务器:

activity-completed

服务器添加了ActivityTaskStartedActivityTaskCompleted事件。现在该活动已经完成,order()函数可以继续执行,因此服务器添加了另一个WorkflowTaskScheduled事件。它还向队列添加了相应的工作流程任务,该工人拾取了该任务(这时服务器添加了另一个WorkflowTaskStarted事件)。

第二个工作流任务

如果工人仍然具有订单函数的执行上下文,则可以解决chargeCustomer(product) Promise,并且该函数将继续执行。如果工人没有执行上下文(因为它是从缓存中驱逐的,以便为另一个工作流程腾出空间,请参见WorkerOptions.maxCachedWorkflows或崩溃或重新启动的过程),则该工人从服务器中获取事件历史记录,创建一个新的分离株,然后再次调用该功能:

get-history

这次,当功能击中await chargeCustomer(product)线时,该工人从事件7,ActivityTaskCompleted中知道chargeCustomer已经运行,因此它立即解决了承诺,而不是向服务器发送“呼叫活动”命令。该功能继续运行,直到下一个await

koude56

const notPickedUpInTime = !(await condition(() => state === 'Picked up', '1 min'))

condition()将等到状态变为Picked up或1分钟。当被调用时,工人告诉服务器设置计时器:

set-timer

服务器将WorkflowTaskCompletedTimerStarted事件添加到事件历史记录中,并在数据库中设置一个计时器。当计时器关闭时,将添加TimerFired事件以及队列上的WorkflowTaskScheduled和Workflow任务,告诉工人1分钟已经开始,此时工人将知道解决condition() Promise。

发送信号

但就我们而言,这没有发生。相反,在一分钟开始之前,我们单击了驱动程序门户中的“拾取”按钮,sent a koude9 Signal到工作流程:

send-signal

服务器然后添加了两个事件:带信号信息的WorkflowExecutionSignaled事件,以及另一个WorkflowTaskScheduled事件。然后将工作流任务添加到队列中,并带有信号信息,该信号信息是由工人拾取的:

get-signal

工人然后运行pickedUp信号处理程序:

setHandler(pickedUpSignal, () => {
 if (state === 'Paid') {
   state = 'Picked up'
 }
})

koude56

处理程序将状态更改为Picked up,并调用了信号处理程序后,工人运行所有condition()功能。现在,() => state === 'Picked up'将返回true,因此工作者将解决condition() Promise并继续执行,以查看函数下一步的操作,这将确定它发送到服务器的下一个命令。

事件历史

一起,我们涵盖的活动历史的一部分是:

just-history

事件历史记录是启用持久执行的核心:工作流程所做和发送的所有内容的日志。它允许我们CTRL-C我们的工作过程,重新启动它,打开UI,选择我们已完成的order工作流程,转到UI的“查询”选项卡,然后发送getStatus查询,服务器将在该Queue上进行排队拾取的工人,该工人将在缓存中没有工作流程,因此它将从服务器中获取事件历史记录,然后调用order()函数,立即解决与历史记录最初结果的任何异步功能,然后调用getStatus处理程序功能:

setHandler(getStatusQuery, () => {
 return { state, deliveredAt, productId }
})

koude56

由于已运行了整个函数和所有信号处理程序,因此{ state, deliveredAt, productId }变量都将从最初执行该函数时具有最终值,并将返回服务器,将其返回到客户端,将它们返回到UI以显示在我们的屏幕上:

query

概括

我们检查了持久的代码如何在Hood下工作:

  • 写函数无法完成执行(因为当工作过程死亡时,下一个掌握任务的工作者将获得函数的事件历史记录,并使用事件重新运行代码,直到它处于同一代码中状态)。
  • 重试可能具有瞬态故障的功能(如果chargeCustomer活动无法达到付款服务,服务器会自动安排另一个活动任务)。

最好的部分是,这些故障对应用程序开发人员是透明的。我们只是编写我们的工作流程和活动代码,并可靠地处理它。 ð

在下一篇文章中,您将学习更多耐用功能可以做的事情。要收到通知,您可以在TwitterLinkedIn上关注我们。另外,查看我们的新Temporal 101 course

ð直到下次!
Loren

感谢Brian Hogan,Roey Berman,Patrick Rachford和Dail Magee Jr的阅读草稿。