交易量。第一部分
#go #microservices #patterns

此模式也称为应用程序事件,尝试解决以下问题。

问题

如何可靠/原子更新数据库并发送消息/事件?

军队

  1. 2pc不是一个选项。 2pc代表两个阶段提交... Google:)。
  2. 如果必须发送数据库事务提交消息。相反,如果数据库回滚,则不得发送消息
  3. 必须按照服务发送的顺序将消息发送到消息代理。必须在更新相同聚合的多个服务实例中保留此顺序。

申请说明

为了更明确地说明我的示例,我将描述一个愚蠢的应用程序,哪种业务模型围绕猫旋转,好吗?所以当然我们有一个
我们的数据库中的表格为“猫”,该表存储了有关我们平台中注册的猫的基本信息。我们要解决的问题是
在我们在DB中对猫进行的每次更新中,例如,地址更改,名称更改,饥饿状态更改等...我们想让此通知到其他
服务或在这种情况下的消费者可能会使用此信息。一切都清楚,现在在部队部分阅读,您必须注意到这些要求并不那么疯狂。现在让我们去解决方案,为此模式提出建议。

解决方案

该模式提出的解决方案如下。除了桌子“猫”,还有有关猫的所有信息之外,我们还将有另一个桌子,它将是发件框(以我心爱的语言为“buzã³ndesalida”),让我们命名此表“ events”。在此表中,只有在CATS DB中的操作成功时,我们才会存储消息/事件。作为两个操作,将写入“猫”表和写入“事件”表的一个操作将发生在
中 同样的交易,这确保我们只有在两个操作成功时才​​会进行交易。
现在,在此写入“事件”表之后,我们将有一个从该表中读取的过程,并将其发布到消息经纪。

流程在下图中更好地描述。

Pattern diagram

好处

  1. 仅当数据库事务提交
  2. 时,保证消息将被发送时发送。
  3. 消息以应用程序发送的顺序发送到消息代理

缺点

  1. 开发人员可能会在更新数据库后忘记发布消息/事件。
  2. 如果消息继电器崩溃,您可能最终会发送多次相同的消息。这意味着消息消费者应该是愿意的,所以 第二次处理相同的输入时,第二次处理相同的消息的处理不会产生效果。阅读有关愿意的更多信息 在这个Stackoverflow question中。

实施示例

到目前为止,我们基本上一直在从Pattern: Transactional outbox,无耻的复制和粘贴中复制和粘贴,这是正确的。作为我几乎所有的文章,目的是真正弄脏我的手,并真正了解我没有的东西
线索。为了很好地掌握这种模式,我们需要对其进行编码,否则我会忘记它。我们开始做吧。我将使用Golang为此。

项目结构

code/
├── config
│   └── config.go
├── controllers
│   ├── cat
│   │   └── controller.go
│   └── model
│       ├── cat_converter.go
│       └── cat.go
├── database
│   ├── database.go
│   └── migrations
│       ├── 20221005203113_events_table.sql
│       └── 20221005203128_cat_table.sql
├── docker-compose.yaml
├── go.mod
├── go.sum
├── main.go
├── Makefile
├── middlewares
│   └── cors.go
├── msgrelay
│   └── msgrelay.go
├── README.md
├── repositories
│   ├── cat
│   │   └── repository.go
│   ├── event
│   │   └── repository.go
│   └── model
│       ├── cat.go
│       └── event.go
└── routes
    └── routes.go

楷模

让我们在Golang声明我们的模型。

这是我们要在平台中反映的猫的一些特征。

  • 名称
  • 颜色
  • 重量
  • 智力(以1-5的比例为1-5,因为只有1-10的规模,它们更聪明)
  • 懒惰(以1-10的比例)
  • 好奇心(以1-10的比例)
  • 社交能力(以1-10的规模为1-10)
  • 利己主义(以5-10的比例)
  • Miau Power(从1-10起,他能多么响亮)
  • 攻击力量(以1-10的比例)

根据这些要求,我们有以下结构:

文件:repository/model/cat.go


package model

type Cat struct {
    ID           string  `gorm:"primaryKey"`
    Name         string  `gorm:"column:name;size:100"`
    Color        string  `gorm:"column:color;size:100"`
    Weight       float64 `gorm:"column:weight"`
    Intelligence int     `gorm:"column:intelligence"`
    Laziness     int     `gorm:"column:laziness"`
    Curiosity    int     `gorm:"column:curiosity"`
    Sociability  int     `gorm:"column:sociability"`
    Egoism       int     `gorm:"column:egoism"`
    MiauPower    int     `gorm:"column:miau_power"`
    Attack       int     `gorm:"column:attack"`
}

请记住,这是我们的主要资源,想要我们要执行更新操作机智。

发件箱

在此表中,我们将存储要发送给其他服务的消息/事件信息,这取决于您在这里想要的内容,但是您应该在
最少参考哪些资源已更改。在我们的情况下,我们只想存储资源的ID,资源类型以及触发的事件类型。

文件:repository/model/event.go


package model

type Event struct {
    ID           string `gorm:"primaryKey"`
    Type         string `gorm:"column:type"`
    ResourceID   string `gorm:"column:resource_id"`
    ResourceType string `gorm:"column:resource_type"`
    Published    bool   `gorm:"column:published"`
}

基本创建和更新操作

就我们的目的而言,最简单的情况如下,我创建了一只猫,然后更新猫,此更新操作应触发事件。所以我们会
只能创建这两个操作,随意创建Crud的另一部分,我很懒惰。

文件:repository/cat/repository.go


package cat

import (
    "context"

    "github.com/Gealber/outbox/repositories/model"
    "gorm.io/gorm"
    "gorm.io/gorm/clause"
)

type repository struct {
    db *gorm.DB
}

func New(db *gorm.DB) *repository {
    return &repository{db: db}
}

// Create creates a new cat :).
func (r *repository) Create(ctx context.Context, cat model.Cat) (*model.Cat, error) {
    if err := r.db.Create(&cat).Error; err != nil {
        return nil, err
    }

    return &cat, nil
}

// Update perform update operation to specified cat :).
func (r *repository) Update(ctx context.Context, id string, cat model.Cat) (*model.Cat, error) {
    err := r.db.Transaction(func(tx *gorm.DB) error {
        // update record in cats table.
        result := tx.Model(&model.Cat{}).Clauses(clause.Returning{}).
            Where("id = ?", id).
            Updates(&cat)

        if result.RowsAffected < 1 {
            return gorm.ErrRecordNotFound
        }

        // create event to store.
        event := model.Event{
            Type:         "update",
            ResourceType: "cat",
            ResourceID:   id,
        }

        // write event in events table.
        if err := tx.Model(&model.Event{}).Create(&event).Error; err != nil {
            return err
        }

        return nil
    })

    if err != nil {
        return nil, err
    }

    return &cat, nil
}

未完待续...

现在,这种模式中的部分之一是消息继电器。让我们在其他文章中讨论一下,我厌倦了打字:),像猫一样懒惰。

代码

代码的一部分可以在此repository中找到,我之所以说部分是因为消息继电器使用投票发布者模式,这是不可扩展的。我会其他时间讨论。

参考书目

  1. Transactional outbox