kafka异常c {r}onsumerð¥ð
#go #kafka #cron

作为一支Trentyol索引团队,我们的体系结构最终基于基于事件的系统,最终应该是一致的。因此,即使这个词似乎使人感到宽慰,也很难确保系统中的每个信息都已成功处理。

我们的事件系统依赖于Apache Kafka。当我们需要在事件流中进行性能时,它为我们提供了强大的能力。我们的消息携带无效(每日150m+),应适用于Trentyol含量(300m+)。这种无效可能是关于股票,促销或我们的30个活动之一。

在活动世界中,即使在一些不一致的州,我们也需要协调我们的信息。重试和延迟消息是我们最终的事件生命周期中的关键策略。我们有10多名消费者应该应用此策略。因此,我们创建了一个名为Kafka Cronsumer的开源库,以便于实施。

Figure 1: Show me the code


kafka cronsumer如何工作−

如库名称所建议的那样,它根据给定的cron表达式消耗异常主题中的事件。

例如,我们可以将CRON表达式指定为*/20 * * * *,这意味着每20分钟运行一次。我们还可以设置一个duration 15分钟(15m)代表我们在固定持续时间内积极消费事件的例外。

  • 如果消费者在食用消息时会遇到错误,它会增加消息重试计数,并产生处理下一个迭代(下一个工作时间)的异常主题。
  • 如果超过maxRetry值,则将一条消息丢弃或移至死信。
  • 在每次迭代中,每条消息仅处理一次。因此,如果消费者遇到我们的消费者开始时间后产生的消息,它会暂停并等待下一个迭代。

作为概述,它的作品如下所示。

Figure 2: Kafka Exception Cronsumer timeline overview


如何使用它? ð

首先,我们需要设置所需的配置值。我们还可以指定KAFKA消费者和生产者相关的配置,但不是强制性的。

consumer:
  groupId: "exception-consumer"
  topic: "exception"
  maxRetry: 3
  concurrency: 1
  cron: "*/20 * * * *"
  duration: 15m

我们定义了consumeFn函数,该功能描述了如何消耗异常消息。

之后,我们使用这些配置值和consumeFn和运行初始化了kafka cronsumer。它开始工作如上所述。

func main() {
        // ..
    var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
        fmt.Printf("consumer > Message received: %s\n", string(message.Value))
        return nil
    }

    c := cronsumer.New(kafkaConfig, consumeFn)
    c.Run()
}

您可以在this directory上找到许多现成的示例。


什么时候使用? ð

  • 基于迭代的后退策略适用
  • 可以以最终一致的状态处理消息
  • Max Retry超过消息可以忽略
  • 提高消费者的弹性 - 通过并发提高消费者绩效

什么时候避免?

  • 应按顺序处理消息
  • 肯定应该处理消息(如果超过最大重试,我们丢弃消息)
  • 应该投入消息(我们使用自动命令间隔来提高性能)
  • 带有TTL的消息(时间是直播的)

感谢您阅读这篇文章。欢迎所有反馈。我们喜欢分享我们的知识。如果您想与我们分享,可以申请我们的open positions


共同作者:
@abdulsametileri