这是该系列的第2部分,“构建Node.js中的可靠分布式系统”。在part 1中,我们介绍了什么是持久的执行,其好处以及耐用功能的外观。在这篇文章中,我们将研究时间范围如何提供持久的执行。
一个不会失败,可以永远持续并且不需要将数据存储在数据库中的函数?听起来像魔术。必须只能使用一小部分语言,或者它仅适用于专门的硬件。但是实际上,这只是JavaScriptâ您可以使用整个语言,并且可以在任何可以运行Node.js的服务器上运行。
那么这一切如何工作?您可以查看How Temporal Works图,该图用GO代码解释了该过程。在这篇文章中,我们将使用该系列中的previous post的打字稿代码进行整个过程。
客户端 - 服务器工作人员
开始,临时应用程序有三个部分:客户端,服务器和工人。
客户端和工作人员都连接到具有数据库并维护状态的服务器。客户说“启动order()
耐用函数”,“向delivered
信号发送”和“终止功能”之类的话。该工人是一个长期运行的node.js进程,它具有我们的代码,并对任务的服务器进行轮询。任务看起来像“运行order()
耐用函数”或“运行正常的sendPushNotification()
函数”。工人运行代码后,它将结果报告回服务器。
在我们的交付应用程序中,我们在koude4中创建客户端,然后在我们的下一个函数中使用它:
-
koude5是temporal.menu的API,它是
- 启动
order
工作流程。 - 查询其状态的
order
工作流程。
- 启动
-
koude8是drive.temporal.menu的API,它
- 将
pickedUp
和delivered
信号发送到order
工作流程。 - 获取所有
order
工作流和查询的列表。
- 将
我们在koude13中创建工人:
import { NativeConnection, Worker } from '@temporalio/worker'
import * as activities from 'activities'
import { taskQueue } from 'common'
import { namespace, getConnectionOptions } from 'common/lib/temporal-connection'
async function run() {
const connection = await NativeConnection.connect(getConnectionOptions())
const worker = await Worker.create({
workflowsPath: require.resolve('../../../packages/workflows/'),
activities,
connection,
namespace,
taskQueue,
})
await worker.run()
}
run().catch((err) => {
console.error(err)
process.exit(1)
})
我们将工人传递给我们的代码:
- 通往我们Workflows的道路,它与Webpack和run in isolated contexts捆绑在一起,以确保determinism。
- 我们导入的活动 <我们的
order
工作流程所调用的不耐用函数:koude3, koude16, and koude17。
并设置Render以自动构建和部署到main
:
运行每个部分
在生产中,我们的Web应用程序及其无服务器功能已部署到Vercel,我们长期运行的工作过程被部署以渲染,并且他们都与暂时云托管的服务器实例进行了交谈。该服务器是与数据库(SQL或Cassandra)和Elasticsearch一起使用的open source服务群集。您还可以托管yourself的所有内容,也可以节省大量时间,并在paying the experts托管它的可靠性和规模上更加安心来托管它ð。
在开发中,我们可以在本地运行所有三个部分。首先,我们安装了具有服务器的开发版本的临时CLI:
- Homebrew:
brew install temporal
- cURL:
curl -sSf https://temporal.download/cli.sh | sh
- 手册:下载并提取latest release,然后将其添加到您的路径中。
我们使用:
启动服务器
temporal server start-dev
在另一个终端(具有Node V16或更高的终端)运行:
npx @temporalio/create@latest ./temporal-delivery --sample food-delivery
cd ./temporal-delivery
npm run dev
dev
脚本运行两个下一个。JSWeb应用程序和工人。菜单在koude22上运行,我们可以单击“订单”,驱动程序门户位于koude23,我们可以在其中标记我们订购的项目并交付的项目。完成此操作后,我们可以在两个Web应用程序中看到该订单的“已交付”状态。我们还可以在koude24上查看服务器Web UI中相应的工作流执行的状态:
我们可以看到其状态已完成,当我们单击它时,我们会看到事件历史记录'在执行order()
函数期间发生的事件列表:
第一个事件始终是WorkflowExecutionStarted
,其中包含正在启动的工作流的类型,在这种情况下是order
工作流。我们将在下一部分中更多地查看事件。
在“查询”选项卡中,我们可以选择getStatus
查询,该查询(假设我们的工人仍在运行)将向order
函数发送查询,该函数响应订单已交付,交付时间以及哪个项目是交付:
事件顺序
现在让我们看一下订单期间幕后发生的事情。
开始订单
当我们单击“订单”按钮时,API处理程序使用临时客户端将开始命令发送到服务器:
服务器将WorkflowExecutionStarted
事件保存到事件历史记录并返回。它还创建了一个WorkflowTaskScheduled
事件,该事件导致“工作流任务”(一个运行order()
函数的指令)被添加到任务队列中,该工人正在对此进行轮询。
工作人员接收任务,服务器添加了WorkflowTaskStarted
事件,在这种情况下,工人执行任务,调用order(3)
。 koude6 function一直运行直到达到这一行:
const { chargeCustomer, refundOrder, sendPushNotification } = proxyActivities<typeof activities>({ ... })
export async function order(productId: number): Promise<void> {
...
await chargeCustomer(product)
通话活动
当功能调用chargeCustomer
活动时,工人告诉服务器:
服务器添加了WorkflowTaskCompleted
事件(工作流执行并不仅仅完成“运行order()
函数并查看发生的事情”的初始任务)和带有活动类型和参数的ActivityTaskScheduled
事件:
然后,服务器将活动任务添加到任务队列中的活动任务(指令运行活动函数),该工作队员拾取了该任务:
在开发中,我们只运行一个工程流程,因此它正在完成所有任务,但是在生产中,我们有足够的工人来处理我们的负载,其中任何一个都可以接受活动任务。不仅是运行order
函数的一个。
工人遵循活动任务说明,运行chargeCustomer()
函数:
称为paymentService.charge()
:
如果函数引发错误,则工人将其报告回服务器:
,服务器安排重试。 default初始间隔为1秒,因此在1秒内,活动任务将添加回队列以供工人接收。
如果功能成功完成,则工作人员报告成功(在这种情况下没有返回值,但没有)返回服务器:
服务器添加了ActivityTaskStarted
和ActivityTaskCompleted
事件。现在该活动已经完成,order()
函数可以继续执行,因此服务器添加了另一个WorkflowTaskScheduled
事件。它还向队列添加了相应的工作流程任务,该工人拾取了该任务(这时服务器添加了另一个WorkflowTaskStarted
事件)。
第二个工作流任务
如果工人仍然具有订单函数的执行上下文,则可以解决chargeCustomer(product)
Promise,并且该函数将继续执行。如果工人没有执行上下文(因为它是从缓存中驱逐的,以便为另一个工作流程腾出空间,请参见WorkerOptions.maxCachedWorkflows或崩溃或重新启动的过程),则该工人从服务器中获取事件历史记录,创建一个新的分离株,然后再次调用该功能:
这次,当功能击中await chargeCustomer(product)
线时,该工人从事件7,ActivityTaskCompleted
中知道chargeCustomer
已经运行,因此它立即解决了承诺,而不是向服务器发送“呼叫活动”命令。该功能继续运行,直到下一个await
:
const notPickedUpInTime = !(await condition(() => state === 'Picked up', '1 min'))
condition()
将等到状态变为Picked up
或1分钟。当被调用时,工人告诉服务器设置计时器:
服务器将WorkflowTaskCompleted
和TimerStarted
事件添加到事件历史记录中,并在数据库中设置一个计时器。当计时器关闭时,将添加TimerFired
事件以及队列上的WorkflowTaskScheduled
和Workflow任务,告诉工人1分钟已经开始,此时工人将知道解决condition()
Promise。
发送信号
但就我们而言,这没有发生。相反,在一分钟开始之前,我们单击了驱动程序门户中的“拾取”按钮,sent a koude9 Signal到工作流程:
服务器然后添加了两个事件:带信号信息的WorkflowExecutionSignaled
事件,以及另一个WorkflowTaskScheduled
事件。然后将工作流任务添加到队列中,并带有信号信息,该信号信息是由工人拾取的:
工人然后运行pickedUp
信号处理程序:
setHandler(pickedUpSignal, () => {
if (state === 'Paid') {
state = 'Picked up'
}
})
处理程序将状态更改为Picked up
,并调用了信号处理程序后,工人运行所有condition()
功能。现在,() => state === 'Picked up'
将返回true
,因此工作者将解决condition()
Promise并继续执行,以查看函数下一步的操作,这将确定它发送到服务器的下一个命令。
事件历史
一起,我们涵盖的活动历史的一部分是:
事件历史记录是启用持久执行的核心:工作流程所做和发送的所有内容的日志。它允许我们CTRL-C我们的工作过程,重新启动它,打开UI,选择我们已完成的order
工作流程,转到UI的“查询”选项卡,然后发送getStatus
查询,服务器将在该Queue上进行排队拾取的工人,该工人将在缓存中没有工作流程,因此它将从服务器中获取事件历史记录,然后调用order()
函数,立即解决与历史记录最初结果的任何异步功能,然后调用getStatus
处理程序功能:
setHandler(getStatusQuery, () => {
return { state, deliveredAt, productId }
})
由于已运行了整个函数和所有信号处理程序,因此{ state, deliveredAt, productId }
变量都将从最初执行该函数时具有最终值,并将返回服务器,将其返回到客户端,将它们返回到UI以显示在我们的屏幕上:
概括
我们检查了持久的代码如何在Hood下工作:
- 写函数无法完成执行(因为当工作过程死亡时,下一个掌握任务的工作者将获得函数的事件历史记录,并使用事件重新运行代码,直到它处于同一代码中状态)。
- 重试可能具有瞬态故障的功能(如果
chargeCustomer
活动无法达到付款服务,服务器会自动安排另一个活动任务)。
最好的部分是,这些故障对应用程序开发人员是透明的。我们只是编写我们的工作流程和活动代码,并可靠地处理它。 ð
在下一篇文章中,您将学习更多耐用功能可以做的事情。要收到通知,您可以在Twitter或LinkedIn上关注我们。另外,查看我们的新Temporal 101 course!
ð直到下次!
Loren
感谢Brian Hogan,Roey Berman,Patrick Rachford和Dail Magee Jr的阅读草稿。