在接收KAFKA消息后,在使用KAFKA进行事件驱动的应用程序时,您的应用程序需要对其进行操作。对于此博客文章,让我们称此部分为“处理消息”。
卡夫卡又是什么?
但是,在我们深入研究之前,让我们确保我们都在同一页面上,并刷新我们的记忆。
Apache Kafka是一个超强大的分布式流式流平台,可实时处理,存储和分析大量数据流。它的构建是为了处理大量,高吞吐量和低延迟数据流。
在Kafka中,消费者是一个读取一个或多个主题消息的应用程序。消费者订阅一个或多个主题,并消费这些主题上产生的消息。消费者跟踪他们消耗的消息的抵销,以便在失败或重新启动时可以接收到停止的位置。
消费者也可以成为消费者组的一部分,在该组中,该组中的每个消费者都被分配了一个主题的分区子集,从而可以并行消耗数据。
与示例一起工作
现在我们都在同一页面上,让我们创建一个示例。想象一下,您为一家名为ACME的公司工作,并且您有一个KAFKA主题,每次创建新客户时都会收到新消息。该客户需要收到一封电子邮件,说他们的帐户是完全创建的,他们需要验证他们的电子邮件。
当我们阅读类似“当某事发生”之类的东西时,请始终考虑事件!在卡夫卡的活动是消息!
我们将创建一个将:
的微服务- 从Kafka接收此新客户消息。
- 编写一个不错的电子邮件。
- 将其发送给客户。
- 向另一个主题添加新消息,说该消息已发送!
让我们的图表以获取更多可见性:
检索在哪里?
很酷,但是,这是一篇关于重试的博客文章吗?所有这些都在哪里?
好吧,处理的任何部分都会出错,我们不希望我们的客户错过他们的帐户创建电子邮件,对吗?因此,让我们谈谈一些可能出错的事情,以及我们如何通过重试处理它们:
- 电子邮件模板数据库可能会关闭。没问题,我们将一直试图连接直到备份为止。
- 模板在其内部持续存在可能是无效的。我们将检查此信息并提醒团队在需要时进行更新。
- SMTP服务器可能会关闭或删除消息。我们将继续尝试发送电子邮件,直到通过。
- 该应用程序可能会严重构成SMTP消息,而SMTP服务器拒绝它。我们将捕获此错误并修复构图,然后再尝试。
- 将消息传达给其他Kafka主题可能会在Kafka的服务器端返回错误。我们将继续尝试直到成功发送为止。
提到的某些错误可能需要解决方案的代码或数据修改,例如更新模板或纠正应用程序撰写SMTP消息的方式。这些类型的错误可能不适合重试,因为它们可能需要手动干预。
另一方面,可以通过重试操作来解决其他错误,例如临时数据库停机时间,片状消息生产或超载SMTP服务器。
解锁卡夫卡消费者重试
作为我们几个微服务的每日挑战,卡夫卡中缺乏内置重试机制可能会令人沮丧。我已经尝试了各种软件架构来重试消息处理,但是它们都不是完美的解决方案 - 但最容易处理的解决方案。
在潜入解决方案之前,让我在以前的尝试中分享一些背景。
AWS SQS:一个不错的选择,但是有限制
作为AWS的强有力的拥护者,我在考虑重试时的第一个想法是AWS SQS。
亚马逊简单队列服务(SQS)是一款完全管理的消息队列服务,可实现微服务,分布式系统和无服务器应用程序的分离和缩放。它具有内置的重试功能,例如向后算法,在移至死语队列之前的最大重试限制,并支持256 kb的最大消息大小。
虽然SQS是重试的绝佳选择,但在消息传递大小方面,它跌幅不足。 Kafka消息的大小最多可达到1 MB,甚至可以超过它。
但是您告诉我“好吧,亚历克斯,但这没什么大不了的”,这有点是,因为我们没有很好的控制消息大小的方法,它可以增长和破坏您的应用程序,而无需您意识到!到那时,您将丢失消息,或者您的手上将发生大型生产事件。
因此,尽管SQS是一个不错的选择,但它并不是我们基于Kafka的微服务的最佳选择。但是请放心,我们找到了一个更好的解决方案。
卡夫卡
是的,仅此而已,因为SQS不是一个合适的解决方案,为什么不使用kafka本身?
我试图创建一个内部应用程序KAFKA主题,该主题只能由应用程序使用,该应用程序将在其中向消息推出消息,并且在失败的情况下,它将再次使用新的计数属性来重新加入消息。标题。让我示意一下,以使其更加清晰:
尽管这种方法确实有效,但它具有一些重要的缺点。首先,设置Kafka主题并不像SQ那样简单,创建多个主题可能会有些麻烦,并且与代码中的所有消费者和生产者打交道可能会变得非常混乱。
此外,Kafka不支持内置的重试。这是因为KAFKA协议是为高通量和低延迟数据流而设计的,并重试为系统增加了开销和复杂性。
数据库
绝望的时代要求采取绝望的措施,对
作为最后的度假胜地,我转向使用数据库进行消息重试。现代SQL数据库具有可以利用表作为队列的锁定机制。
令我惊讶的是,这种方法奏效了!但是,这不是一个完美的解决方案。数据库的设计不是作为队列起作用,并且以这种方式使用它们可能是一个伸展。这种方法有一些弊端:
- 数据库是为了使用交易而构建的,可以使它们能够使其表现出色,但是它们永远不会像Kafka这样的专业工具。这很容易成为系统中的瓶颈。
- 管理所有这些代码很容易成为问题,并且在整个公司中共享这并不是一个容易的模式。
所以现在怎么办?记忆!
尝试了上述所有解决方案并对结果不满意后,我决定研究其他公司和框架如何处理重试。
对我来说突出的一个框架是Spring,这是一个开源Java框架,为构建现代企业应用程序提供了全面的编程和配置模型。
春天
Spring中的一个模块之一是Spring Kafka,它提供了几种处理消息时处理重试的方法。其中包括使用弹簧重试库中的RetryTemplate
和RetryCallback
接口来定义重试策略,使用@KafkaListener
注释以人均方式配置重试行为,或使用aop使用RetryInterceptor
。
Spring Kafka允许开发人员选择最适合其用例的方法,无论是全球重试策略还是更细粒度的每方法重试配置。
自从我不使用Java以来已经很长时间了,春季代码远非易于理解(这是非常高级的东西),但是在深入研究Spring Kafka框架之后,我发现它实现了错误处理程序使用一种策略来处理消费者抛出的例外。
当不是BatchListenerFailedException
的异常(Spring知道是不可重试的)时,错误处理程序将从内存中重试批次记录。这样可以防止在扩展重试序列中进行消费者的重新平衡,并提供更优雅的解决方案。
我诚实认为,春季Kafka框架提供了一种强大而灵活的处理方法,可以考虑任何企业应用程序。
它为开发人员提供了一系列选项,并允许对重试行为进行更细粒度的控制,这是任何希望为其消息传递系统实施重试机制的组织的绝佳选择。
在GO中重新创建Spring Kafka的重试机制
我目前与(和Loveð)一起工作,就我的研究而言,我没有发现春季kafka和重试框架的任何重写,所以,让我们构建自己的吧!
在这里,我想指出与Spring Kafka的方法相比,我的实现的一个关键区别。我感谢暂停消费者并在发生灾难性失败的情况下回到批次的先前记录的想法。但是,我不认为这是必要的,并且选择以不同的方式实施。
我的方法包括:
- 创建一个从主题读取的消费者。
- 创建处理重试的GO频道。
- 如果消息未能处理,则将其发送到重试频道。
- 主要消费者继续处理主要主题,而其他消息则被重述。
- 如果筋疲力尽,该消息将发送到持续的DLQ进行以后调查。
â€this重要的是,该方法假设您的应用程序并不强烈依赖消息的顺序。如果是这种情况,您将需要在重试消息时停止消费者。但是,这种方法允许在尝试重试时在主题上取得进展。
这是代码
纯文本说明
kafka_retry_dlq
软件包提供了一种机制,可以在处理错误时使用重试机制从Kafka主题中食用消息。
类型
-
ProcessRetryHandler
:必须通过处理消息处理的类型来实现的接口。它具有以下方法:-
Process(context.Context, kafka.Message) error
:处理消息,如果处理失败,则返回错误。 -
MoveToDLQ(context.Context, kafka.Message)
:将消息移至死信队列。 -
MaxRetries() int
:返回消息的最大重试。 -
Backoff() backoff.BackOff
:返回退缩消息的退缩策略。
-
变量
-
retryQueue
:一个带有1000条消息的缓冲区的kafka.Message
频道。
职能
-
NewConsumerWithRetry(ctx context.Context, brokers []string, topic string, partition int, handler ProcessRetryHandler)
:创建具有重试机制的新的Kafka消费者。- 它创建了一个新的Kafka读者,其中包括提供的经纪人,主题,分区和缓冲区大小。
- 它启动了一个goroutine,该goroutine读取来自kafka主题的消息,并使用提供的
handler
进行处理。 - 如果消息的处理返回错误,则该消息将发送到
retryQueue
频道。 - 它启动了另一个goroutine,它会倾听
retryQueue
频道并使用提供的handler
处理消息,直到达到最大重试或成功处理消息。 。
- 如果达到了最大的检索数,则使用
handler
的MoveToDLQ
方法移动到死信队列。
综上所述
检索通过允许它们处理意外的故障和错误来提高事件驱动系统的弹性。但是,至关重要的是,还必须将重试与指标,服务级别目标(SLO)和服务级别协议(SLA)结合起来,以确保系统始终处于错误状态。目的是仅在特殊情况下重试应用程序,而Reties与SLO和SLA的结合有助于保证这一点。