RabbitMQ
消息推送到接收的过程
Exchange
- Direct Exchange
- Fanout Exchange
- Topic Exchange
消息重复消费?
消费者消费消息出现异常后,消息会不断 requeue (重新入队),会导致 mq 压力剧增
Spring 的 retry 机制
- RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject,丢弃消息
- ImmediateRequeueMessageRecoverer:返回 nack,消息重新入队。
- RepublishMessageRecoverer:将失败消息投递到指定的交换机,
如何保证消息不丢失?
消息丢失的三种情况
-
生产者在将消息发送到 MQ 的时候丢失。 开启事务或开启消息确认机制
-
MQ 丢失数据,未持久化。 开启 RabbitMQ 持久化
-
消费者收到消息,还没消费数据就宕机了。 关闭 RabbitMQ 自动确认机制
-
消息持久化
- Exchange 持久化
- 创建队列时,将队列声明为持久化的。
durable=true
- 发送持久化消息。
delivery_mode=2
-
消息确认机制
- 发送消息给 MQ 时
- 启用发布者确认模式
- 发送消息
- 处理确认
- 发送消息给 MQ 时
如何保证消息不重复消费
- 确保消费者处理消息的操作是幂等的
- 使用消息确认机制,如果消费者消费失败,MQ 将该消息重新投递给其他消费者。
- 消息中添加唯一标识符,在处理之前先检查数据库或缓存是否已处理过该标识符的消息。
- 设置死信队列来处理无法正常消费的消息。(可以将那些重复投递超过特定次数的消息转移到死信队列,避免无限循环消费)
死信队列
什么消息会成为死信呢?
同时满足以下条件:
- 消息被否定确认,使用
channel.basicNack
或channel.basicReject
,并且此时requeue
被设置为false
- 消息在队列的存活时间超过设置的 TTL 时间。
- 消息通过设置
expiration
字段来设置 TTL - 队列通过设置
x-message-ttl
来设置 TTL
- 消息通过设置
- 消息队列的消息数量已经超过最大队列长度。
- 队列通过设置
x-max-length
来设置消息的最大数量,默认无限制。
- 队列通过设置
如何配置死信队列?
- 配置业务队列,绑定到业务交换机上
- 为业务队列配置死信交换机和路由 key
- 为死信交换机配置死信队列
延时队列
希望消息在一定时间之后再被处理
- 订单超时
- 用户注册成功之后,未在 24 小时内登录
死信队列实现
控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由到某个指定的交换机。
缺点
- MQ 只会检查队列中第一个消息是否过期
- 需要创建一个普通队列加一个死信队列