RocketMQ-最佳实践&积压消息如何快速处理?

admin 2025年2月10日11:58:15评论11 views字数 2003阅读6分40秒阅读模式

1、RocketMQ最佳实践一些做法

(1)合理分配Topic、Tag

一个应用尽可能用一个Topic,消息子类型用tags标识.

(2)使用Key加快消息索引

Key可以参与消息过滤,建议每个消息key分配一个业务层面的唯一标识码。可以配合Tag进行更精确的消息过滤,Broker端会为每个消息创建一个哈希索引。

(3)关注错误消息重试
  • 消费者端如果处理消息失败,Broker会重新投送消息。重试时,RocketMQ会为每个消费者组创建一个对应的重试队列。

  • 超过重试次数,消息将不再投递,转为进入死信队列。重试次数默认16次,可以定制,时间间隔均为2小时。

  • 消息最大重试次数的设置对相同GroupID下的所有Consumer实例有效,最后启动的Consumer会覆盖之前启动的Consumer的配置。

(4)手动处理死信队列

要处理死信队列,就得先了解什么是死信队列?有什么特征?

  • 一个死信队列对应一个ConsumeGroup,不是对应某个消费者实例。如果一个ConsumeGroup没有产生死信队列,RocketMQ就不会为它创建相应的死信队列。

  • 一个死信队列包含这个ConsumeGroup里的所有死信消息,不区分该消息属于哪个Topic。死信队列中的消息不再会被消费者正常消费。

  • 死信队列的有效期跟正常消息相同,默认3天,对应broker.conf中的fileReservedTime属性。超过有效期的消息都会被删除,不管消息是否消费过。

(5)消费者端进行幂等控制

消息幂等有三种实现语义

  • at most once 最多一次: 每条消息最多只会被消费一次,用异步发送、sendOneWay等方式可以保证。

  • at least once 至少一次:  每条消息至少会被消费一次,同步发送、事务消息等方式能够保证。

  • exactly once 刚刚好一次: 每条消息都只会被消费一次,需要由业务系统自行保证消息的幂等性。

消息重复的3个场景

① 发送时

  • 当一条消息已经被成功发送到服务端,并完成持久化,

  • 出现网络闪断或客户端宕机,导致服务端对客户端应答失败,

  • 生产者意识到消息发送失败并尝试再次发送消息,

  • 消费者会收到两条相同内容和Message ID的消息。

② 投递时

  • 消息已经投递到消费者,并完成业务处理,

  • 当客户端给服务端反馈应答时网络闪断,

  • 为了保证消息至少被消费一次,服务端会在网络恢复后,再次尝试投递已被处理过的消息,

  • 消费者会收到两条相同内容和Message ID的消息。

③ 负载均衡时

  • 当Broker或客户端重启、扩容或缩容时,

  • 会触发负载均衡,消费者可能会收到重复消息。

处理方式或方案

需要在业务上自行来保证消息消费的幂等性,使用业务唯一标识作为消息的Key进行传递。

————我是分割线————

关于积压消息如何快速处理问题?第一步确认是否有消息积压?

2、第一步确认是否有消息积压?

正常情况下,消息生产速度和消费速度整体上是平衡的。如果部分消费者系统出现故障,会造成大量的消息积累。这类问题通常在实际工作中会出现得比较隐蔽。还有一种情况就是业务突然爆发性增长,也会出现消息积压。

有三种方式可以查看

(1)最简单的方式,可以通过web控制台,就能直接看到消息的积压情况。就是RocketMQ Dashboard的界面,如何搭建可以参考之前写过的文章《RocketMQ-管控利器【Dashboard 2.0】服务搭建》。

(2)通过mqadmin指令在后台检查各个Topic的消息延迟情况。

(3)查看 ${storePathRootDir}/config 目录下落地一系列的json文件,可以用来跟踪消息积压情况。

对于积压消息的处理,就看消息是否重要了。如果不重要,可以在管理控制台直接使用跳过堆积的功能;如果很重要,且是消费者端处理能力不够,造成消息大量积压,则需要设计一套消息转移方案来处理。

3、消息积压转移的方案

具体要采用哪种方案,要看Topic下的MessageQueue配置是否足够多?

2.1、Topic下的MessageQueue配置足够多
  • 每个Consumer实际上会分配多个MessageQueue消费,可以通过增加Consumer的服务节点数量加快消息的消费。

  • 等积压消息消费完了,再恢复成正常情况。

  • 最极限的情况是把Consumer的节点个数设置成跟MessageQueue的个数相同。

2.2、Topic下的MessageQueue配置不够
  • 可以创建一个新的Topic,配置足够多的MessageQueue。

  • 把所有消费者节点的目标Topic转向新的Topic,紧急上线一组新的消费者,只负责消费旧Topic的消息,转储到新的Topic。

  • 在新的Topic上,可以通过增加消费者个数提高消费速度,之后再根据情况恢复成正常情况。

4、我的公众号

敬请关注我的公众号:大象只为你,持续更新技术知识......

原文始发于微信公众号(大象只为你):RocketMQ-最佳实践&积压消息如何快速处理?

免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉。
  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2025年2月10日11:58:15
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   RocketMQ-最佳实践&积压消息如何快速处理?http://cn-sec.com/archives/3719764.html
                  免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉.

发表评论

匿名网友 填写信息