本文介绍了从研究区块链项目中学到的一些经验教训,该团队通常对透明的分布式并发。我们的API服务器缩放到3个复制品,其中引入了我们的应用程序中的许多并发细微差别和竞赛条件。这篇文章提到了其中一个问题,我们用咨询锁解决了这些问题。所有示例均以Go编写。
介绍
这个故事来自一些项目,我们的团队乐观地为Kubernetes上的服务器设置了一个以上的复制品(Heck,这仍然发生了)。乍一看,这是一件好事,因为我们认为一旦任何一个复制品或服务器掉落,我们就可以始终拥有一些故障转移。这在许多利用Kubernetes的中小型项目中很常见。
但是,这希望该应用程序或多或少地意识到自己有多个实例。任何状态申请都需要知道所请求实体的当前状态。具有相同状态的多个应用程序的多个实例使我们陷入并发问题。不幸的是,这些GO项目并非旨在处理复制的状态工作负载。因此,他们成为比赛条件的受害者并写了争论。
并发设计
并发设计对于开始扩展的应用程序至关重要,这可以帮助提高性能和可扩展性。但是,为更分布式系统的设计并发系统并不那么琐碎,尤其是当我们可以在Kubernetes上使用GO的实例和各自数据库的GO组合时。这与您的平均并发设计相反,因为我们更专注于处理应用程序可伸缩性,而不是阻止/非阻止请求。
用分布式并发表明
处理并发是任何地鼠的第二天性。我们的问题分布得更多,但并不是很多,以至于我们将其称为最真实的分布式系统。但是,这确实意味着我们需要与您的普通宠物项目有所不同。有两种解决此问题的方法:
- 分布式消息传递 - 带有消息传递库,例如 Ergo 或 RabbitMQ ,我们可以在GO服务器之间创建应用程序级别的协议,以在哪些服务器之间进行交流,以便传达哪些服务器正在工作以及需要依次或并行完成哪些工作。
- 分布式锁定 - 使用诸如 Redis 的应用用于并行工作。
印刷精美
我们需要记住,这个问题最终比我们想象的要普遍。任何使用Kubernetes的现代系统最终都会导致团队na -Velly认为他们需要添加更多的复制品。添加更多图书馆或更多应用程序来补偿该项目的维护和技术债务表面。我们希望尽可能多地使用现有技术。
赢得胜利
幸运的是,这些项目中的一件事是使用PostgreSQL。除了Kafka和偶尔的Redis外,很少有常规尺寸的服务使用Postgres以外的其他任何东西。我们可以利用它来利用它,因为我们可以利用Postgres的某些特定应用特定功能来用作控制共享资源的机制。
处理与PostgreSQL的并发
*问题
在这个项目中,我们在API服务器中嵌入了一个Cronjob,作为Goroutines(不要问为什么)每次运行每03:00和15:00 UTC 。这些goroutines以代币和NFT的实时价格为基础,并通过我们的主钱包有效地更新智能合约的配置。
最初的假设是该API服务器只能具有1个副本实例,但是由于某种原因,我们决定使用3-这意味着我们的cronjob将有效运行3次。在普通应用中,我们可能会忽略它以冗余,但是每个电话要花费一定数量的汽油费,如果您的外观足够长的时间,它们会很快堆积。更不用说我们不能自动化我们的应用程序。否则,我们会自动破产。
咨询锁的使用
Postgres中的一个非常优雅的解决方案是咨询锁。咨询锁是一个应用程序级锁,在阻止/非阻止物质中处理共享资源。这些锁对我们的情况特别有用,因为我们可以使用它们在所有API服务器实例上标记作业。
执行
我们实际上使用github.com/robfig/cron来设置我们的cronjob goroutines。这是一种定义cron时间并将回调引用到它的优雅方法,尽管它偶尔会混淆,因为它可以选择数秒钟。我们将此库导入并使用:
import (
"github.com/robfig/cron"
...
)
func setupRouter(cfg config.Config, s repo.DBRepo) *echo.Echo {
c := cron.New()
c.AddFunc("0 0 3 * * *", h.UpdateContractConfigs)
c.AddFunc("0 0 15 * * *", h.UpdateContractConfigs)
...
}
我们的cronjob的UpdateContractConfigs
回调非常简单。我们创建了一个标签的基于事务的咨询锁,我们通过done
回调来控制该锁定。我们还在回调的上下文中应用了它,以防止其僵局。
Cronjob应始终重试,直到在我们给予的最多条件下成功。由于我们按时迫使我们实施此功能,因此我们通过使用goto
进行重试逻辑的牺牲方式。我不推荐它,但它令人惊讶地有效,并且看不到:
func (h *Handler) UpdateContractConfigs() {
ctx, cancel := context.WithTimeout(context.Background(), consts.AdvisoryLockTime*time.Second)
defer cancel()
tx, done := h.store.NewTransactionWithContext(ctx)
TryXactLock:
result, err := h.repo.Advisory.TryXactLock(tx, consts.AdvisoryCronjobNamespace, consts.AdvisoryLockContractConfig)
if err != nil {
zap.L().Sugar().Errorf("cannot claim advisory lock %v::%v : %v", consts.AdvisoryCronjobNamespace, consts.AdvisoryLockContractConfig, err)
goto TryXactLock
}
if !result.PgTryAdvisoryXactLock {
zap.L().Sugar().Errorf("cannot claim advisory lock %v::%v : %v", consts.AdvisoryCronjobNamespace, consts.AdvisoryLockContractConfig, err)
done(err)
return
}
...
}
我们有一张货币地图,我们在数据库中跟踪这些货币以及这些货币的相关配置,以根据实时价格数据进行映射。我们将它们分开,因为我们在开发人员,生产和环境中具有不同的配置。
CurrencyMap:
currencyMap, err := h.repo.CurrencyTranslation.GetAllMap(h.store)
if err != nil {
zap.L().Sugar().Errorf("h.repo.CurrencyTranslation.GetAllMap(): %v", err)
goto CurrencyMap
}
ConfigPriceUSD:
err = h.UpdateConfigPriceUSD(currencyMap)
if err != nil {
zap.L().Sugar().Errorf("h.UpdateContractSalaryConfig(): %v", err)
goto ConfigPriceUSD
}
使用我们所有的价格输入设置,我们在其他配置中使用此数据来更新下降率,宽松,费用,排行榜等。该代码与上面的goto
样式基本相同,并增加了睡眠计时器。
睡眠计时器有助于避免给我们给予区块链,并防止我们获得比率限制的。这也使我们无法通过网络进行比赛条件,这很少见,但以前发生过。
SalaryConfig:
err = h.UpdateContractSalaryConfig(currencyMap)
if err != nil {
zap.L().Sugar().Errorf("h.UpdateContractSalaryConfig(): %v", err)
goto SalaryConfig
}
time.Sleep(time.Second * 60)
锁定桌子的旁注
您可能已经注意到,您还可以使用咨询锁来帮助您在Postgres上的表格上插入插件和更新。可能不建议这样做,因为一个更好的选择是为您的桌子使用SERIALIZABLE
隔离级别,尤其是关于金融的任何事情。
另一件事要考虑
除了摆脱goto
s之外,最好将Cronjob作为带有标签的Queuupobs。这样,我们可以避免通过标记的锁来创建对动作的隐式跟踪,并通过标记的作业在队列中明确跟踪它们。
结论
设计用于分布式系统的并发系统可能有些棘手。自然,每个人都在决定是否应该缩放某些东西时会犯错。 Postgres以及其他支持应用程序级锁定的应用程序可以利用在阻止/非阻止物质中处理共享资源。老实说,我确实相信有更好的解决方案,但是写这项功能是一个有趣的经历。希望这使您了解哪些解决方案可用于类似问题。
参考
在
上关注我们网站:https://dwarves.foundation
不和谐:https://discord.gg/dwarvesv
粉丝页:https://www.facebook.com/dwarvesf
LinkedIn:https://www.linkedin.com/company/dwarvesf
替代:https://note.d.foundation/