作为一支Trentyol索引团队,我们的体系结构最终基于基于事件的系统,最终应该是一致的。因此,即使这个词似乎使人感到宽慰,也很难确保系统中的每个信息都已成功处理。
我们的事件系统依赖于Apache Kafka。当我们需要在事件流中进行性能时,它为我们提供了强大的能力。我们的消息携带无效(每日150m+),应适用于Trentyol含量(300m+)。这种无效可能是关于股票,促销或我们的30个活动之一。
在活动世界中,即使在一些不一致的州,我们也需要协调我们的信息。重试和延迟消息是我们最终的事件生命周期中的关键策略。我们有10多名消费者应该应用此策略。因此,我们创建了一个名为Kafka Cronsumer的开源库,以便于实施。
kafka cronsumer如何工作−
如库名称所建议的那样,它根据给定的cron表达式消耗异常主题中的事件。
例如,我们可以将CRON表达式指定为*/20 * * * *
,这意味着每20分钟运行一次。我们还可以设置一个duration
15分钟(15m)代表我们在固定持续时间内积极消费事件的例外。
- 如果消费者在食用消息时会遇到错误,它会增加消息重试计数,并产生处理下一个迭代(下一个工作时间)的异常主题。
- 如果超过
maxRetry
值,则将一条消息丢弃或移至死信。 - 在每次迭代中,每条消息仅处理一次。因此,如果消费者遇到我们的消费者开始时间后产生的消息,它会暂停并等待下一个迭代。
作为概述,它的作品如下所示。
如何使用它? ð
首先,我们需要设置所需的配置值。我们还可以指定KAFKA消费者和生产者相关的配置,但不是强制性的。
consumer:
groupId: "exception-consumer"
topic: "exception"
maxRetry: 3
concurrency: 1
cron: "*/20 * * * *"
duration: 15m
我们定义了consumeFn
函数,该功能描述了如何消耗异常消息。
之后,我们使用这些配置值和consumeFn
和运行初始化了kafka cronsumer。它开始工作如上所述。
func main() {
// ..
var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
fmt.Printf("consumer > Message received: %s\n", string(message.Value))
return nil
}
c := cronsumer.New(kafkaConfig, consumeFn)
c.Run()
}
您可以在this directory上找到许多现成的示例。
什么时候使用? ð
- 基于迭代的后退策略适用
- 可以以最终一致的状态处理消息
- Max Retry超过消息可以忽略
- 提高消费者的弹性 - 通过并发提高消费者绩效
什么时候避免?
- 应按顺序处理消息
- 肯定应该处理消息(如果超过最大重试,我们丢弃消息)
- 应该投入消息(我们使用自动命令间隔来提高性能)
- 带有TTL的消息(时间是直播的)
感谢您阅读这篇文章。欢迎所有反馈。我们喜欢分享我们的知识。如果您想与我们分享,可以申请我们的open positions。
共同作者:
@abdulsametileri