我们的任务是从9个国家 /地区收集数据,并将其标准化为特定格式,之后我们必须将其提供给其他团队以进行进一步处理。这要求我们每天处理数千条消息,并对它们执行数据库操作。在某些情况下,我们获得了完整的数据转储,将发布率提高到每秒4000至5000条消息。
问题的定义
我们面临的是,消息发布率很高,导致兔子队列超过其长度限制。我们最初的解决方案是通过添加更多的消费者来扩展,但是即使有20多个消费者,该系统也很难处理大量消息。虽然DB操作的批处理处理和创建正确的索引有所帮助,但并不能阻止队列最终完整。
另外,处理数据导入/导出脱机是不可行的,因为这将导致数据不一致,因为数据每天更新。
最后,该代码旨在通过使用异步方法来处理并发过程。
添加redis层
为了解决问题,我们探索了各种解决方案,并决定在RabbitMQ和下游工人之间引入重新层。通过将REDIS作为具有较大队列容量的缓冲区引入,这些消息将从RabbitMQ转移到Redis,然后直接被Redis工人吞噬。这种方法有助于减轻兔子队列的问题,并允许更稳定的消息处理系统。通过采用这种方法,我们能够优化消息系统的性能并提高数据处理管道的整体效率。
该解决方案使我们能够利用RabbitMQ作为接收消息的可靠手段,而实际的消息消耗和处理是通过REDIS进行的。 Redis充当中介缓冲区,我们能够处理大量消息而不承担兔子队的负担。
这种方法还使我们能够分批处理消息,这有助于提高数据库操作的整体效率。通过分批处理消息,我们最大程度地减少了所需的单个数据库操作的数量,这又有助于减少处理时间并提高整体系统性能。
总的来说,这种方法被证明是管理大量消息的有效解决方案,并帮助我们实现了优化消息处理和改善系统性能的目标。
技术实施
我们的团队依靠Helm和Kubernetes的强大组合来在GCP环境中部署我们的服务。我们的部署方法主要集中于部署消费者实例,这些实例是专门用于处理消息处理和数据库操作的
构建的。至于RabbitMQ,它是由基础设施团队部署在根级的。这意味着公司内的每个团队每个环境都使用相同的RabbitMQ。这种方法允许在不同的团队和服务之间提高效率和一致性。
我们选择使用Mongo Atlas作为数据库解决方案,因为它可以提供可扩展的群集解决方案。这意味着我们可以根据所处理的数据量轻松地扩展或向下扩展,以确保我们的服务能够处理大量消息而无需任何性能问题。
凭借Mongo Atlas的功能和灵活性,我们能够构建和部署能够有效处理我们的消息处理工作流的复杂需求的强大服务。
以前,我们用头盔和K8部署了Redis独立的。但是,我们经常遇到由于溢出或代码逻辑故障而导致REDIS崩溃的问题。这不仅引起了Redis本身的问题,而且还影响了KEDA的表现,Keda对于大规模应用并不理想。
结果,我们已经独立部署了Redis,而是开始使用GCP中的MemoryStore。 MemoryStore一直在按预期进行表现,并帮助我们避免了Redis所遇到的问题。总体而言,我们发现内存店是大规模应用程序的更可靠的解决方案。
简单的建筑概述将如下所示:
如前所述,这些消息将源自多个来源,最初发送到RabbitMQ。但是,由于每秒发表的消息量大多,队列很容易变得不知所措。
结果,而不是直接消耗消息,而是将其转发给Redis。然后将消息分批从Redis处理并保存到Mongo中。
结论
总而言之,我们的团队成功完成了从九个不同国家收集和标准化数据的任务,并有效地将其提供给其他团队以进行进一步处理。此任务要求我们每天处理大量消息并执行复杂的数据库操作,包括管理大大提高发布率的完整数据转储。