此模式也称为应用程序事件,尝试解决以下问题。
问题
如何可靠/原子更新数据库并发送消息/事件?
军队
- 2pc不是一个选项。 2pc代表两个阶段提交... Google:)。
- 如果必须发送数据库事务提交消息。相反,如果数据库回滚,则不得发送消息
- 必须按照服务发送的顺序将消息发送到消息代理。必须在更新相同聚合的多个服务实例中保留此顺序。
申请说明
为了更明确地说明我的示例,我将描述一个愚蠢的应用程序,哪种业务模型围绕猫旋转,好吗?所以当然我们有一个
我们的数据库中的表格为“猫”,该表存储了有关我们平台中注册的猫的基本信息。我们要解决的问题是
在我们在DB中对猫进行的每次更新中,例如,地址更改,名称更改,饥饿状态更改等...我们想让此通知到其他
服务或在这种情况下的消费者可能会使用此信息。一切都清楚,现在在部队部分阅读,您必须注意到这些要求并不那么疯狂。现在让我们去解决方案,为此模式提出建议。
解决方案
该模式提出的解决方案如下。除了桌子“猫”,还有有关猫的所有信息之外,我们还将有另一个桌子,它将是发件框(以我心爱的语言为“buzã³ndesalida”),让我们命名此表“ events”。在此表中,只有在CATS DB中的操作成功时,我们才会存储消息/事件。作为两个操作,将写入“猫”表和写入“事件”表的一个操作将发生在
中
同样的交易,这确保我们只有在两个操作成功时才会进行交易。
现在,在此写入“事件”表之后,我们将有一个从该表中读取的过程,并将其发布到消息经纪。
流程在下图中更好地描述。
好处
- 仅当数据库事务提交 时,保证消息将被发送时发送。
- 消息以应用程序发送的顺序发送到消息代理
缺点
- 开发人员可能会在更新数据库后忘记发布消息/事件。
- 如果消息继电器崩溃,您可能最终会发送多次相同的消息。这意味着消息消费者应该是愿意的,所以 第二次处理相同的输入时,第二次处理相同的消息的处理不会产生效果。阅读有关愿意的更多信息 在这个Stackoverflow question中。
实施示例
到目前为止,我们基本上一直在从Pattern: Transactional outbox,无耻的复制和粘贴中复制和粘贴,这是正确的。作为我几乎所有的文章,目的是真正弄脏我的手,并真正了解我没有的东西
线索。为了很好地掌握这种模式,我们需要对其进行编码,否则我会忘记它。我们开始做吧。我将使用Golang为此。
项目结构
code/
├── config
│ └── config.go
├── controllers
│ ├── cat
│ │ └── controller.go
│ └── model
│ ├── cat_converter.go
│ └── cat.go
├── database
│ ├── database.go
│ └── migrations
│ ├── 20221005203113_events_table.sql
│ └── 20221005203128_cat_table.sql
├── docker-compose.yaml
├── go.mod
├── go.sum
├── main.go
├── Makefile
├── middlewares
│ └── cors.go
├── msgrelay
│ └── msgrelay.go
├── README.md
├── repositories
│ ├── cat
│ │ └── repository.go
│ ├── event
│ │ └── repository.go
│ └── model
│ ├── cat.go
│ └── event.go
└── routes
└── routes.go
楷模
让我们在Golang声明我们的模型。
猫
这是我们要在平台中反映的猫的一些特征。
- 名称
- 颜色
- 重量
- 智力(以1-5的比例为1-5,因为只有1-10的规模,它们更聪明)
- 懒惰(以1-10的比例)
- 好奇心(以1-10的比例)
- 社交能力(以1-10的规模为1-10)
- 利己主义(以5-10的比例)
- Miau Power(从1-10起,他能多么响亮)
- 攻击力量(以1-10的比例)
根据这些要求,我们有以下结构:
文件:repository/model/cat.go
package model
type Cat struct {
ID string `gorm:"primaryKey"`
Name string `gorm:"column:name;size:100"`
Color string `gorm:"column:color;size:100"`
Weight float64 `gorm:"column:weight"`
Intelligence int `gorm:"column:intelligence"`
Laziness int `gorm:"column:laziness"`
Curiosity int `gorm:"column:curiosity"`
Sociability int `gorm:"column:sociability"`
Egoism int `gorm:"column:egoism"`
MiauPower int `gorm:"column:miau_power"`
Attack int `gorm:"column:attack"`
}
请记住,这是我们的主要资源,想要我们要执行更新操作机智。
发件箱
在此表中,我们将存储要发送给其他服务的消息/事件信息,这取决于您在这里想要的内容,但是您应该在
最少参考哪些资源已更改。在我们的情况下,我们只想存储资源的ID,资源类型以及触发的事件类型。
文件:repository/model/event.go
package model
type Event struct {
ID string `gorm:"primaryKey"`
Type string `gorm:"column:type"`
ResourceID string `gorm:"column:resource_id"`
ResourceType string `gorm:"column:resource_type"`
Published bool `gorm:"column:published"`
}
基本创建和更新操作
就我们的目的而言,最简单的情况如下,我创建了一只猫,然后更新猫,此更新操作应触发事件。所以我们会
只能创建这两个操作,随意创建Crud的另一部分,我很懒惰。
文件:repository/cat/repository.go
package cat
import (
"context"
"github.com/Gealber/outbox/repositories/model"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
type repository struct {
db *gorm.DB
}
func New(db *gorm.DB) *repository {
return &repository{db: db}
}
// Create creates a new cat :).
func (r *repository) Create(ctx context.Context, cat model.Cat) (*model.Cat, error) {
if err := r.db.Create(&cat).Error; err != nil {
return nil, err
}
return &cat, nil
}
// Update perform update operation to specified cat :).
func (r *repository) Update(ctx context.Context, id string, cat model.Cat) (*model.Cat, error) {
err := r.db.Transaction(func(tx *gorm.DB) error {
// update record in cats table.
result := tx.Model(&model.Cat{}).Clauses(clause.Returning{}).
Where("id = ?", id).
Updates(&cat)
if result.RowsAffected < 1 {
return gorm.ErrRecordNotFound
}
// create event to store.
event := model.Event{
Type: "update",
ResourceType: "cat",
ResourceID: id,
}
// write event in events table.
if err := tx.Model(&model.Event{}).Create(&event).Error; err != nil {
return err
}
return nil
})
if err != nil {
return nil, err
}
return &cat, nil
}
未完待续...
现在,这种模式中的部分之一是消息继电器。让我们在其他文章中讨论一下,我厌倦了打字:),像猫一样懒惰。
。代码
代码的一部分可以在此repository中找到,我之所以说部分是因为消息继电器使用投票发布者模式,这是不可扩展的。我会其他时间讨论。