补偿动作,与萨加斯的完整早餐的一部分
#python #go #distributedsystems #designpatterns

早上!补偿动作模式可以说(有些人称其为补偿交易模式),但很容易消化。在分布式系统中,我们经常在与数据一致性问题作斗争。尽管微服务提高了可伸缩性,可检验性和单个团队代码速度,但它们引入了一个新问题:基本说明序列现在需要服务之间的通信,并且可以使整个系统在失败时处于不良状态。就像家庭成员的混乱单独制作早餐谷物一样,一些跨越服务的操作需要作为一个单元完成,类似于数据库交易。否则,一个人可能会倒出谷物,只是发现他们的孩子用完了最后的牛奶。有了多个早餐制作者,微服务是不可能保证跨服务多步操作的所有步骤(谷物,然后是牛奶)将在没有任何打ic的情况下运行。

补偿动作模式提供了交易 - 在此分布式系统混乱中为一系列操作提供了保证。该模式通过让操作的顺序成功地完成完成完成,或者面对失败,撤消了执行操作所做的变化,就可以模拟交易,仿佛从来没有发生过。远期进度或撤消称为长期交易。重要的是要注意,撤消状态的变化与将数据库滚动回到先前的快照状态不同。这是因为在执行我自己长期运行的交易的操作时,另一个服务或交易也可以成功修改数据库,并且在回滚时会丢失该状态。决定补偿给定操作的方式取决于特定情况。处理汇款?最好将资金放回原处。运输问题?也许只是保留库存。由于补偿方式取决于应用程序的特定需求,因此开发人员需要在计划中定义它。

总而言之,补偿措施(有时称为暂时说明中的交易补偿活动)是处理分布式分布式失败的设计模式,但相关服务。如果有:

  • 一系列跨越一项服务的操作,必须作为一个单位完成(全部或没有类似交易的语义),
  • 如果整个交易未能完成
  • ,则必须撤消某些操作

然后,补偿动作模式可以确保您在失败时保持一致状态!

补偿动作是著名的传奇设计模式的重要组成部分。补偿行动确保有办法倒退(撤消)并以一致的状态最终。传奇是一种实现故障弹性的设计模式,使用补偿,但也确保有一种方法可以向前(通过重试),并保持整个操作序列的作用,就像单个交易一样。

编码补偿措施

关于订购赔偿的快速记录

假设谷物类比的步骤和补偿如下:

getBowl()
addCereal()
addMilk()

a 幼稚编码步骤和补偿的方式可能看起来像这样:

// DON'T DO THIS
try {
   getBowl()
   addCompensation(putBowlAway)
   addCereal()
   addCompensation(putCerealBackInBox)
   addMilk()
} catch (Exception e) {
   compensate()
   throw e
}

假设getBowl失败。如果由于机柜中没有碗而失败,则上述代码看起来正确。但是,假设在执行getBowl时,您会从橱柜里拿出一碗碗,然后整个早晨都对蓝军上的事情感到分心。您的早餐工作流程(因此是getBowl失败),因为现在是午餐。好吧,现在我们有一个问题,因为我们跳起来执行compensate,没有注册的补偿,但是我们的碗在厨房柜台上!如果一个活动时间(在时间细节中,koude4 koude5 and koude6可以设置这样的限制),或者在您想补偿的事情之后崩溃,可能会发生这种情况。已执行。在时间场景中,如果用户在我们想要补偿的事情之后取消活动已经执行,这也会发生。所有这些问题*可以通过简单地通过以下方式重新排序您的步骤来解决:

try {
   addCompensation(putBowlAway)
   getBowl()
   addCompensation(putCerealBackInBox)
   addCereal()
   addMilk()
} catch (Exception e) {
   compensate()
   throw e
}

轻松。

在代码上!

如果您写了Java,则可以利用Temporal’s Saga library注册您喜欢作为补偿运行的功能,而Perimal将完成其余的工作:

// Inside your Workflow class
// You can set parallel compensations if appropriate with the Builder
Saga saga = new Saga(new Saga.Options.Builder().build());
try {
   saga.addCompensation(breakfastActivity::putBowlAway);
   breakfastActivity.getBowl();
   saga.addCompensation(breakfastActivity::putCerealBackInBox);
   breakfastActivity.addCereal();
   breakfastActivity.addMilk();
} catch (ActivityFailure e) {
   saga.compensate();
   throw e;
}

如果在addCereal()addMilk()期间发生故障,SAGA类将致电compensate(),所有注册的补偿功能将执行(putBowlAway()putCerealBackInBox())。完整的存储库是available on GitHub

如果您与打字稿,Python或Go合作,则需要自己跟踪薪酬,但这并不具有挑战性。您需要做的就是跟踪补偿功能列表,然后在失败时执行它们。

这是Python版本:

class Compensations:
   def __init__(self, parallel_compensations=False):
       self.parallel_compensations = parallel_compensations
       self.compensations = []

   def add(self, function: typing.Callable[..., typing.Awaitable[None]]):
       self.compensations.append(function)

   def __iadd__(self, function: typing.Callable[..., typing.Awaitable[None]]):
       self.add(function)
       return self

   async def compensate(self):
       async def run_compensation(
           compensation: typing.Callable[..., typing.Awaitable[None]]
       ) -> None:
           try:
               await workflow.execute_activity(
                   compensation,
                   start_to_close_timeout=time_delta,
                   retry_policy=common_retry_policy,
               )
           except:
               workflow.logger.exception("failed to compensate")

       if self.parallel_compensations:
           all_compensations = [run_compensation(c) for c in self.compensations]
           await asyncio.gather(*all_compensations)

       else:
           for f in reversed(self.compensations):
               await run_compensation(f)

然后,注册您的自定义补偿部分看起来与Java示例非常相似:


@workflow.defn
class BreakfastWorkflow:
   @workflow.run
   async def run(self, parallel_compensations) -> None:
       compensations = Compensations(parallel_compensations=parallel_compensations)
       try:
           compensations += put_bowl_away
           await workflow.execute_activity(
               get_bowl,
               start_to_close_timeout=time_delta,
               retry_policy=common_retry_policy,
           )
           compensations += put_cereal_back_in_box
           await workflow.execute_activity(
               add_cereal,
               start_to_close_timeout=time_delta,
               retry_policy=common_retry_policy,
           )
           await workflow.execute_activity(
               add_milk,
               start_to_close_timeout=time_delta,
               retry_policy=common_retry_policy,
           )
       except Exception:
           task = asyncio.create_task(compensations.compensate())
           # Ensure the compensations run in the face of cancelation.
           await asyncio.shield(task)
           raise

完整的存储库是up on GitHub

这是打字稿版本:

type Compensation = () => Promise<void>

async function compensate(compensations: Compensation[], compensateInParallel = false) {
   if (compensateInParallel) {
     compensations.map(comp => comp().catch(err => console.error(`failed to compensate: $error`)))
   }
    for (const comp of compensations) {
     try {
       await comp()
     } catch (err) {
       console.error(`failed to compensate: ${err}`)
     }
   }
 }

export async function breakfastWorkflow(compensateInParallel = false): Promise<void> {
 const compensations: Compensation[] = []
 try {
   compensations.unshift(putBowlAway)
   await getBowl()
   compensations.unshift(putCerealBackInBox)
   await addCereal()
   await addMilk()
 } catch (err) {
   await compensate(compensations, compensateInParallel)
   throw err
 }
}

Complete repository on GitHub.

在GO中,我们可以使用defer关键字来检查功能是否正常中止或是否需要运行赔偿:

type Compensations []any

func (s *Compensations) AddCompensation(activity any) {
   *s = append(*s, activity)
}

func (s Compensations) Compensate(ctx workflow.Context, inParallel bool) {
   if !inParallel {
       for i := len(s) - 1; i >= 0; i-- {
           errCompensation := workflow.ExecuteActivity(ctx, s[i]).Get(ctx, nil)
           if errCompensation != nil {
               workflow.GetLogger(ctx).Error("Executing compensation failed", "Error", errCompensation)
           }
       }
   } else {
       selector := workflow.NewSelector(ctx)
       for i := 0; i < len(s); i++ {
           execution := workflow.ExecuteActivity(ctx, s[i])
           selector.AddFuture(execution, func(f workflow.Future) {
               if errCompensation := f.Get(ctx, nil); errCompensation != nil {
                   workflow.GetLogger(ctx).Error("Executing compensation failed", "Error", errCompensation)
               }
           })
       }
       for range s {
           selector.Select(ctx)
       }

   }
}

func BreakfastWorkflow(ctx workflow.Context, parallelCompensations bool) (err error) {
   // Omitted for brevity: set activity options and retry policy here.

   var compensations Compensations

   defer func() {
       // Defer is at the top so that it is executed regardless of which step might fail.
       if err != nil {
           // activity failed, and workflow context is canceled
           disconnectedCtx, _ := workflow.NewDisconnectedContext(ctx)
           compensations.Compensate(disconnectedCtx, parallelCompensations)
       }
   }()

   compensations.AddCompensation(PutBowlAway)
   err = workflow.ExecuteActivity(ctx, GetBowl).Get(ctx, nil)
   if err != nil {
       return err
   }

   compensations.AddCompensation(PutCerealBackInBox)
   err = workflow.ExecuteActivity(ctx, AddCereal).Get(ctx, nil)
   if err != nil {
       return err
   }

   err = workflow.ExecuteActivity(ctx, AddMilk).Get(ctx, nil)

   return err
}

GO代码的完整存储库是available on GitHub

*精美的印刷品

您应该意识到的一些时间特定的配置或操作可能会阻止您的薪酬按照您的任务执行:如果您设置超时或在 workfrows上重新验证,您的补偿可能不会有机会在工作流程之前运行(因此,如果您想确保赔偿运行,请不要设置这些限制)。此外,终止 reset 不允许工作流代码执行任何finallydefer语句,因此也避免使用这些语句。如果避免使用这些方案,则可以确保赔偿正确完成。

概括

补偿动作(或补偿交易)是一种分布式系统设计模式,用于模拟在多个数据库中分布的操作的原子执行。如果其中一个分布式操作失败,则通过补偿措施取消其影响。补偿是较大的传奇设计模式的组成部分,在下一篇文章中,我将更详细地详细介绍。

本文中提到的所有代码的完整项目是available on GitHub

看到我的一位同事Dominik Tornow,给出了基于这些想法的Sagas的简介,请查看our YouTube video