跳到主要内容

RabbitMQ

消息推送到接收的过程

Exchange

  • Direct Exchange
  • Fanout Exchange
  • Topic Exchange

消息重复消费?

消费者消费消息出现异常后,消息会不断 requeue (重新入队),会导致 mq 压力剧增

Spring 的 retry 机制

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject,丢弃消息
  • ImmediateRequeueMessageRecoverer:返回 nack,消息重新入队。
  • RepublishMessageRecoverer:将失败消息投递到指定的交换机,

如何保证消息不丢失?

消息丢失的三种情况

  1. 生产者在将消息发送到 MQ 的时候丢失。 开启事务或开启消息确认机制

  2. MQ 丢失数据,未持久化。 开启 RabbitMQ 持久化

  3. 消费者收到消息,还没消费数据就宕机了。 关闭 RabbitMQ 自动确认机制

  4. 消息持久化

    • Exchange 持久化
    • 创建队列时,将队列声明为持久化的。 durable=true
    • 发送持久化消息。 delivery_mode=2
  5. 消息确认机制

    • 发送消息给 MQ 时
      • 启用发布者确认模式
      • 发送消息
      • 处理确认

如何保证消息不重复消费

  1. 确保消费者处理消息的操作是幂等的
  2. 使用消息确认机制,如果消费者消费失败,MQ 将该消息重新投递给其他消费者。
  3. 消息中添加唯一标识符,在处理之前先检查数据库或缓存是否已处理过该标识符的消息。
  4. 设置死信队列来处理无法正常消费的消息。(可以将那些重复投递超过特定次数的消息转移到死信队列,避免无限循环消费)

死信队列

什么消息会成为死信呢?

同时满足以下条件:

  1. 消息被否定确认,使用 channel.basicNackchannel.basicReject,并且此时 requeue 被设置为 false
  2. 消息在队列的存活时间超过设置的 TTL 时间。
    • 消息通过设置 expiration 字段来设置 TTL
    • 队列通过设置 x-message-ttl 来设置 TTL
  3. 消息队列的消息数量已经超过最大队列长度。
    • 队列通过设置 x-max-length 来设置消息的最大数量,默认无限制。

如何配置死信队列?

  1. 配置业务队列,绑定到业务交换机上
  2. 为业务队列配置死信交换机和路由 key
  3. 为死信交换机配置死信队列

延时队列

希望消息在一定时间之后再被处理

  1. 订单超时
  2. 用户注册成功之后,未在 24 小时内登录

死信队列实现

控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由到某个指定的交换机。

缺点

  1. MQ 只会检查队列中第一个消息是否过期
  2. 需要创建一个普通队列加一个死信队列

插件实现方式

rabbitmq_delayed_message_exchange 插件

	// 创建延时交换机
@Bean
public Exchange delayExchange() {
return ExchangeBuilder.directExchange("delay.exchange").delayed().durable(true).build();
}

// 创建队列
@Bean
public Queue delayQueue() {
return QueueBuilder.durable("delay.queue").build();
}

// 创建延时交换机和队列的绑定关系
@Bean
public Binding delayBinding(@Qualifier("delayExchange") Exchange exchange,
@Qualifier("delayQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("delay").noargs();
}

@RabbitListener(queues = "delay.queue")
public void receiveA(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("延迟队列收到消息:{}", msg);
}