这篇文章介绍了持久执行的概念,该由条纹,Netflix,Coinbase,Snap和许多其他人使用,以解决分布式系统中的广泛问题。然后,它显示了使用我们的Typescript/JavaScript SDK编写耐用代码的简单简单。
分布式系统
当构建以支持交易的单个数据库为支持的请求响应巨石时,我们没有许多分布式系统问题。我们可以拥有简单的故障模式,并容易保持准确的状态:
- 如果客户端可以到达服务器,则客户端重新验证。
- 如果客户端到达服务器,但是服务器可以到达数据库,则服务器会响应错误,并且客户端会重新检索。
- 如果服务器到达数据库,但是交易失败,服务器会响应错误,并且客户端重新验证。
- 如果交易成功,但是服务器在响应客户端之前就降低了为了判断更新是否已应用),并且服务器向客户端报告了该操作已经执行的。
我们一旦引入了国家生活的第二名,无论是使用自己的数据库还是外部API的服务,处理失败和保持一致性(所有数据存储的准确性)都会变得更加复杂。例如,如果我们的服务器必须为信用卡充电并更新数据库,则我们不能再编写简单的代码:
function handleRequest() {
paymentAPI.chargeCard()
database.insertOrder()
return 200
}
如果第一步(充电卡)成功,但是第二步(将订单添加到数据库)失败,则系统最终处于不一致的状态;我们为他们的卡充电,但数据库中没有记录。为了保持一致性,我们可能要重试第二步,直到我们可以到达数据库为止。但是,运行我们代码的过程也可能会失败,在这种情况下,我们不知道第一步发生了。要解决此问题,我们需要做三件事:
- 坚持订单详细信息
- 坚持我们完成的程序的哪个步骤
- 运行一个工作过程,该过程检查数据库是否不完整订单,并继续下一步
,加上持续的重试状态并为每个步骤添加超时,这是很多编写代码,并且很容易错过某些边缘案例或故障模式(请参阅the full, scalable architecture)。如果我们不必编写和调试所有这些代码,我们就可以更快,更可靠地构建东西。我们不必,因为我们可以使用持久的执行。
执行耐用
耐用的执行系统以持续的每个步骤来运行我们的代码。如果运行代码的进程或容器死亡,则代码会自动在另一个状态完整的过程中继续运行,包括呼叫堆栈和本地变量。
持久的执行可确保执行代码到完成,无论硬件多么可靠或下游服务脱机多长时间。自动进行重试和超时,并且当代码不做任何事情时(例如,在sleep(‘1 month’)
语句中等待时)。
持久的执行使实施分布式系统模式(例如事件驱动的体系结构,任务队列,sagas,circuit breakers和transactional outboxes)变得琐碎或不必要。它是在更高级别的抽象上进行的编程,您不必担心服务器崩溃或网络问题等瞬态故障。它打开了:
的新可能性- 将状态存储在本地变量而不是数据库中,因为局部变量是自动为我们存储的
- 编写一个月睡眠的代码,因为我们不必担心开始睡眠的过程下个月仍在那里,或者在持续时间内被绑架的资源
- 函数可以永远运行,并且我们可以与我们互动(将命令发送到或查询数据)
耐用执行系统的一些示例包括Azure耐用功能,Amazon SWF,Uber Cadence,Infinitic和Temporal(我在哪里工作)。我认为暂时性是这些选择中最好的。
耐用的JavaScript
现在,我们在分布式系统中越来越稳定,并且执行什么是持久的执行,让我们看一个实际的例子。我构建了此食品交付应用程序,以显示耐用代码的样子以及它解决了什么问题:
不怪我的徽标,这就是稳定扩散为您提供耐用的送货应用徽标时所给您的徽标。 ðÖ
该应用具有四个主要功能: 当我们从菜单订购一个项目时,它会出现在交付驱动程序站点(drive.temporal.menu)中,并且驱动程序可以将订单标记为拾取的订单,然后将其标记为交付。 所有这些功能都可以在耐用的JavaScript或Typescript的单个函数中实现。我们将使用后者,我推荐打字稿,我们的库被命名为Typescript SDK,但它以JavaScript为javaScript发布给NPM,可以在任何node.js项目中使用。 让我们看一下此应用程序的代码。我们会看到一些API路线,但大部分都越过名为 用户单击订单按钮时,React前端呼叫由TRPC后端定义的
创建订单
order
的单个耐用功能。如果您想运行该应用程序或在计算机上查看代码,则将下载并设置该项目:
npx @temporalio/create@latest --sample food-delivery
createOrder
突变。 createOrder
API路由处理程序通过启动耐用的order
函数来创建订单。耐用的函数称为 workflows @temporalio/client
中的客户端实例,该实例已添加到ctx.temporal
下的TRPC上下文中。路由处理程序接收了经过验证的input
(带有productId
编号和orderId
字符串的对象),并调用ctx.temporal.workflow.start
启动order
workflow,将input.productId
作为参数提供:
import { initTRPC } from '@trpc/server'
import { z } from 'zod'
import { taskQueue } from 'common'
import { Context } from 'common/trpc-context'
import { order } from 'workflows'
const t = initTRPC.context<Context>().create()
export const appRouter = t.router({
createOrder: t.procedure
.input(z.object({ productId: z.number(), orderId: z.string() }))
.mutation(async ({ input, ctx }) => {
await ctx.temporal.workflow.start(order, {
workflowId: input.orderId,
args: [input.productId],
taskQueue,
})
return 'Order received and persisted!'
}),
order
函数开始验证输入,设置初始状态并为客户收费:
type OrderState = 'Charging card' | 'Paid' | 'Picked up' | 'Delivered' | 'Refunding'
export async function order(productId: number): Promise<void> {
const product = getProductById(productId)
if (!product) {
throw ApplicationFailure.create({ message: `Product ${productId} not found` })
}
let state: OrderState = 'Charging card'
let deliveredAt: Date
try {
await chargeCustomer(product)
} catch (err) {
const message = `Failed to charge customer for ${product.name}. Error: ${errorMessage(err)}`
await sendPushNotification(message)
throw ApplicationFailure.create({ message })
}
state = 'Paid'
任何可能失败的功能都会自动重述。在这种情况下,chargeCustomer
和sendPushNotification
都与目前可能正在下降或可能返回暂时错误消息(例如暂时不可用的服务)交谈。这是可配置的)。这些功能还可能会抛出不可退回的错误,例如卡片下降,在这种情况下,它们将被重述。取而代之的是,该错误将被从chargeCustomer(product)
中丢弃并被捕获块捕获。客户收到一条通知,说他们的付款方式失败了,我们扔了一个ApplicationFailure
以使order
工作流程失败。
获取订单状态
下一个代码需要一些背景:正常功能可以长时间运行,因为它们会在等待事情发生时占用资源,在某个时候他们当我们部署新代码并且旧容器被关闭时,LL会死。耐用功能可以任意时间长度,原因有两个:
- 他们在等待某件事时就不会占用资源。
- 是否关闭了运行它们的过程并不重要,因为另一个过程将无缝执行。
因此,尽管某些耐用功能在短时间内运行,例如成功的汇款功能 - 某些运行时间更长,例如我们的订单功能,该功能在交付订单时结束,客户功能持续客户的寿命。
能够与长期运行的功能进行交互是有用的,因此,时间提供了我们所谓的信号用于将数据发送到功能中的内容,并且 Queries 用于获取数据来自功能的数据。驱动程序站点通过通过此API路线向订单函数发送查询来显示每个订单的状态:
getOrderStatus: t.procedure
.input(z.string())
.query(({ input: orderId, ctx }) => ctx.temporal.workflow.getHandle(orderId).query(getStatusQuery)),
它获得了订单函数的特定实例(称为 workflow执行),发送getStatusQuery
并返回结果。 getStatusQuery
在订单文件中定义并在订单函数中处理:
import { defineQuery, setHandler } from '@temporalio/workflow'
export const getStatusQuery = defineQuery<OrderStatus>('getStatus')
export async function order(productId: number): Promise<void> {
let state: OrderState = 'Charging card'
let deliveredAt: Date
// …
setHandler(getStatusQuery, () => {
return { state, deliveredAt, productId }
})
当订单函数接收getStatusQuery
时,将传递给setHandler
的函数被调用,该函数返回局部变量的值。在致电chargeCustomer
成功之后,该州更改为’Paid’
,并且一直在调查getStatusQuery
的驱动程序站点将获得更新的状态。它显示“接卡”按钮。
接订单
当驱动程序敲击按钮以将订单标记为拾取的订单时,该站点将pickUp
突变发送到API服务器,该突变将pickedUpSignal
发送到订单功能:
apps/driver/pages/api/[trpc].ts
pickUp: t.procedure
.input(z.string())
.mutation(async ({ input: orderId, ctx }) =>
ctx.temporal.workflow.getHandle(orderId).signal(pickedUpSignal)
),
订单函数通过更新状态来处理信号:
export const pickedUpSignal = defineSignal('pickedUp')
export async function order(productId: number): Promise<void> {
// …
setHandler(pickedUpSignal, () => {
if (state === 'Paid') {
state = 'Picked up'
}
})
与此同时,在向客户收取费用后,该功能一直在等待拾音器发生:
:import { condition } from '@temporalio/workflow'
export async function order(productId: number): Promise<void> {
// …
try {
await chargeCustomer(product)
} catch (err) {
// …
}
state = 'Paid'
const notPickedUpInTime = !(await condition(() => state === 'Picked up', '1 min'))
if (notPickedUpInTime) {
state = 'Refunding'
await refundAndNotify(
product,
'⚠️ No drivers were available to pick up your order. Your payment has been refunded.'
)
throw ApplicationFailure.create({ message: 'Not picked up in time' })
}
await condition(() => state === 'Picked up', '1 min')
最多等待1分钟,以使该州更改为Picked up
。如果一分钟没有变化,它将返回错误,我们退还客户。 (要么我们对厨师和送货驱动程序的速度有很高的标准,要么我们希望演示应用程序的用户能够看到所有故障模式ð。)
送货
类似地,通过“交付”按钮发送了一个deliveredSignal
,如果驾驶员在接送后一分钟内没有完整的交货,则将退还客户。
export const deliveredSignal = defineSignal('delivered')
export async function order(productId: number): Promise<void> {
setHandler(deliveredSignal, () => {
if (state === 'Picked up') {
state = 'Delivered'
deliveredAt = new Date()
}
})
// …
await sendPushNotification('🚗 Order picked up')
const notDeliveredInTime = !(await condition(() => state === 'Delivered', '1 min'))
if (notDeliveredInTime) {
state = 'Refunding'
await refundAndNotify(product, '⚠️ Your driver was unable to deliver your order. Your payment has been refunded.')
throw ApplicationFailure.create({ message: 'Not delivered in time' })
}
await sendPushNotification('✅ Order delivered!')
如果交货成功,该功能将等待一分钟才能让客户吃饭并要求他们对经验进行评分。
await sleep('1 min') // this could also be hours or even months
await sendPushNotification(`✍️ Rate your meal. How was the ${product.name.toLowerCase()}?`)
}
最终推送通知后,订单函数的执行结束,工作流执行成功完成。即使该功能已经完成,我们仍然可以发送查询,因为Perimal具有保存功能的最终状态。我们可以通过在订单交付后一分钟来刷新页面来测试:getStatusQuery
仍然有效,并且已显示为状态:
概括
我们看到了如何使用单个耐用函数实现多步订单流。该功能可以在存在故障的情况下完成,包括:
- 网络,数据存储或下游服务的临时问题
- 运行功能失败的过程
- 基本的时间服务或数据库降低
这解决了我们许多分布式系统的关注,这意味着:
- 我们可以使用本地变量,而不是将状态保存到数据库中。
- 我们不需要在数据库中设置计时器以进行应用程序逻辑,例如取消订单过长或用于重试和计时暂时函数(例如
chargeCustomer
)的内置功能。</li>。 - 我们不需要建立一个调查的工作队列,要么是为要么进行下一步或捡起失败流程删除的未完成任务。
在下一篇文章中,我们将查看更多的交付应用程序代码,并了解时间范围如何为我们提供持久的执行。要收到通知,您可以在Twitter或LinkedIn上关注我们。
如果您有任何疑问,我很乐意为您提供帮助!暂时的使命是帮助开发人员,我个人也为此感到高兴。我是在Twitter上的@lorendsr,我回答(和upvoteð)任何用koude33标记的stackoverflow问题,而@loren在community Slack-上。
。了解更多
要了解更多信息,我推荐以下资源:
- Video: Intro to Temporal and using the TypeScript SDK
- 一些common use cases
- 打字稿SDK文档:t.mp/ts
- 打字稿API参考:t.mp/ts-api
- TypeScript tutorials
有关我们的打字稿SDK的更多博客文章:
- Using Temporal as a Node.js Task Queue
- Caching API Requests with Long-Lived Workflows
- Express middleware that creates a REST API for your Workflows
- 1.0.0 release of the TS SDK
- How we use V8 isolates to enforce Workflow determinism
感谢杰西卡·韦斯特(Jessica West),布莱恩·霍根(Brian Hogan),阿米莉亚·芒果(Amelia Mango)和吉姆·沃克(Jim Walker)阅读此帖子的草稿。