状态流:在Golang中组织分布式处理
#网络开发人员 #go #功能 #discuss

在这里,我想提出一些关于上一届article中提出的国家管理的想法。在功能编程中,它的习惯明确指定了状态。但是,这可能导致所谓的国家爆炸。有时,最好保持某种一般状态隐式并操作替代集。

考虑现实生活中的情况。用户订购一些产品。作为对我们服务的HTTP请求实施的内容。处理此查询,我们需要向会计服务提出请求,以创建记录并检查是否有可用的产品,并向金融服务提供另一个请求以提供付款。当然,我们需要将订单存储到数据库中。所有这些外部服务返回连续或不响应。这导致向其他服务的请求更新。例如,如果金融服务返回余额状态,我们应在会计服务中更新记录,为拒绝和免费的保留产品,并更新数据库中的订单状态。

因此,有三个单元:存储,簿记和财务。每个模块具有两种类型的功能:将订单应用于相应的服务和更新状态。它们形成流动。因此,单位互相发送消息。每个模块都有自己的状态,具体取决于外部服务的响应。我们可能会创建与每个单位状态相对应的一般状态。例如,簿记访问者,簿记簿,簿记簿,簿记内部和其他人的类比。但是整个单元网可以在两个状态下:成功或继续取消。这些状态被转化为单位的行动。服务的响应转变为流状态。如果簿记服务返回该产品,则流量的状态将取消,并将消息传播到模块。我们不需要明确存储流状态,我们只能在处理模块之间使用两种类型的信号。所以我们得到了状态流。

在图中显示了一些可能的情况

Flow diagram

看一下sample application。信息处理流本身表示为订单流结构

type OrderStatus string

const (
  OrderCreated             OrderStatus = "created"
  OrderSuccess             OrderStatus = "success"
  OrderInternalError       OrderStatus = "internal_error"
  OrderProductNotAvailable OrderStatus = "product_not_available"
  OrderBalanceNotEnough    OrderStatus = "balance_not_enough"
)

type OrderFlow struct {
  Config       *Config
  OrderStatus  OrderStatus
  OrderID      int
  UserID       uuid.UUID
  ProductID    int
  ProductPrice decimal.Decimal
}

此结构存储我们在答案中返回的订单信息和状态。单位通过StatusStream结构进行交流

type FlowStatus int

const (
  Proceed FlowStatus = 1
  Cancel  FlowStatus = 2
)

type StatusStream struct {
  Forward chan FlowStatus
  Back    chan FlowStatus
}

模块通常采用两个StatusStream结构。第一个提供了与先前模块的连接,第二个模块与下一个模块连接。通过向前频道单元将状态消息发送到下一个单元,然后通过后频道从下一个单元接收消息。

考虑簿记单元

type BookkepingUnit model.OrderFlow

func (f *BookkepingUnit) Process(previous *model.StatusStream, next *model.StatusStream) {
  status := <-previous.Forward
  if status == model.Cancel {
    next.Forward <- model.Cancel
    return
  }

...

resp, err := general.MakeHTTPRequest[ApplyRequest, ApplyResponse]("POST", f.Config.BookkepingApplyURL, &breq)
switch resp.Result.Status {
  case Success:
    next.Forward <- model.Proceed
  case ProductNotAvailable:
    f.OrderStatus = model.OrderProductNotAvailable
    next.Forward <- model.Cancel
    previous.Back <- model.Cancel
    go logger.LogUnit(logger.Info, f.Config.Name, nil,
      f.OrderID, unit, string(ProductNotAvailable))
    return
}

...

status = <-next.Back
if status == model.Cancel {
  previous.Back <- model.Cancel
  updateStatus = OrderCanceled
}
updresp, err := general.MakeHTTPRequest[interface{}, UpdateResponse]("PUT", url, nil)
if updresp.Status == general.StatusError {
  go logger.LogUnit(logger.Info, f.Config.Name, nil,
    f.OrderID, unit, string(model.OrderInternalError))
  if status == model.Proceed {
    next.Forward <- model.Cancel
    previous.Back <- model.Cancel
  }
}

从OrderFlow继承的模块,为逻辑分离使用类型重新定义。首先,单位等待上一个阶段的信号。然后,它要求外部服务。如果连续响应,则将收回信号发送到下一个单元。例如,如果响应被认为没有成功,例如,没有剩下产品,则取消信号会传播到上一个模块和下一个模块。根据从顺序返回的状态,更新请求是通过订单或订购状态进行的。如果我们在更新记录状态时遇到错误,我们还取消了订单并将取消信号发送给模块。

启动流是在流中进行的。进程函数

func Process(flow *model.OrderFlow) {
  repo := storage.NewRepository(general.NewPgsql(flow.Config.DBConnectionString))
  bu := (*bookkeeping.BookkepingUnit)(flow)
  fu := (*financials.FinancialsUnit)(flow)
  su := (*storage.StorageUnit)(flow)
  start := model.NewStatusStream()
  storage2bookkeeping := model.NewStatusStream()
  bookkeeping2financials := model.NewStatusStream()
  go su.Process(repo, start, storage2bookkeeping)
  go bu.Process(storage2bookkeeping, bookkeeping2financials)
  go fu.Process(bookkeeping2financials)
  start.Forward <- model.Proceed
  <-start.Back
}

在这里,我们创建单位和渠道,启动处理并等待处理完成。还要注意此操作的陈述风格。
在端点层中,进行了预处理和后处理。

func MakeOrderEndpoint(cfg *model.Config) general.Endpoint {
  return func(w http.ResponseWriter, r *http.Request) {
    w.Header().Add("Content-Type", "application/json")
    req, err := general.RequestDecode[model.OrderRequest](r)

...

    f := &model.OrderFlow{
      Config:       cfg,
      UserID:       req.UserID,
      ProductID:    req.ProductID,
      ProductPrice: req.ProductPrice,
    }
    flow.Process(f)
    res := model.OrderResult{
      OrderStatus: f.OrderStatus,
      OrderID:     f.OrderID,
    }
    resp := model.OrderResponse{
      Status: general.StatusOK,
      Result: &res,
    }

...

    json.NewEncoder(w).Encode(resp)

在此层中,我们对用户订单请求进行解码,形成orderflow对象,启动流程并制作DTO并将答案返回给用户。
我们需要向用户向订单的状态发送消息。因此,在每个单元中,制作了一些状态代数。服务响应被转化为订单状态。在罚款单元中,例如

switch resp.Result.Status {
  case Success:
    transactionID = *resp.Result.TransactionID
    previous.Back <- model.Proceed
  case BalanceNotEnough:
    f.OrderStatus = model.OrderBalanceNotEnough
    go logger.LogUnit(logger.Info, f.Config.Name, nil,
      f.OrderID, unit, string(model.OrderBalanceNotEnough))
    previous.Back <- model.Cancel
}

在模块中,我们记录错误,因此我们可以重建事件,如果出现问题。

golang具有用于分布式信息处理的强大工具。我在这里提出了状态流量的概念。它可以分支并成为状态的网络。提出的工具包也可以帮助解决这种情况。