跳到主要内容

RabbitMQ

安装

curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.deb.sh | sudo bash

sudo apt-insatll rabbitmq-server

docker

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.11-management

tencent cloud

docker run -d --name rabbitmq3.12 -p 5672:5672 -p 15672:15672 --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost  \ 
-e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=adminwangzhy rabbitmq:3.12-management

默认用户 guest/guest

rabbitmqctl add_user root root
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
rabbitmqctl set_user_tags root adminstrator

生产者与消费者模型

product 生产者,创建并投递消息

消息 payload

  • 消息体 :业务逻辑数据,例如 JSON
  • 标签 label : 交换器的名称和一个路由键

consumer:消费者,接收消息

broker:消息中间件的服务节点

queue:队列,RabbitMQ 内部对象,存储消息。订阅者均摊(不是每个,消费者不会接收到所有的消息)

Exchange

exchange:交换器。生产者将消息发生到交换器上,然后交换器将消息路由到队列上。

Exchange 类型

  • default exchange:是一个特殊的 direct exchange
  • fanout:路由到所有与交换器绑定的队列,忽略 routing key 。
  • direct:路由到 routingkey 与 bindingkey 完全匹配的队列
  • topic:routingkey 与 bindingkey 相似匹配的队列
  • headers:根据消息的 headers 属性来匹配。(性能差,基本用不上)

Exchange 重要的属性

  • name
  • durability 持久性
  • Auto-delete 自动删除
  • Arguments 参数,通过插件和 broker-specific features

Queue

在使用队列之前,必须声明它。如果队列不存在,则声明一个队列将导致创建它。如果队列已经存在并且其属性与声明中的属性相同,则该声明将无效。当现有队列属性与声明中的属性不同时,将引发代码为 406 (PRECONDITION_FAILED) 的通道级异常

  • name
  • durable:持久化
  • exclusive:仅由一个连接使用,当该连接关闭时队列将被删除
  • auto-delete
  • arguments

Queue Name

Queue Durability

queues can be declared as durable or transient.

Bindings

Bindings are rules that exchanges use (among other things) to route messages to queues. Bingings 是 exchanges 用来路由message 到 队列的规则。

The purpose of the routing key is to select certain messages published to an exchange to be routed to the bound queue. 路由键的目的是选择发布到交换器的某些消息以路由到绑定队列。

Consumers

  • push API :subscribe to a queue
  • pull API 不推荐

Message Acknowledgements

消息确认

when should the broker remove messages from queues?

代理应该什么时候从队列中移除消息?

  1. After broker sends a message to an application (using either basic.deliver or basic.get-ok method).
    • 在 borker 向应用程序发送消息后(使用 basic.deliver 或 basic.get-ok 方法)。
    • 自动确认机制
  2. After the application sends back an acknowledgement (using the basic.ack method).
    • 在应用程序发回确认后(使用 basic.ack 方法)。
    • 显式确认机制
      • 可以在收到消息之后
      • 在处理之前将其持久化到数据存储之后
      • 在完全处理消息之后

If a consumer dies without sending an acknowledgement, the broker will redeliver it to another consumer or, if none are available at the time, the broker will wait until at least one consumer is registered for the same queue before attempting redelivery. 如果消费者在没有发送确认消息的情况下死亡,代理会将其重新交付给另一个消费者,或者如果当时没有可用的消费者,代理将等到至少有一个消费者在同一队列中注册,然后再尝试重新交付。

Rejecting Messages

When a consumer application receives a message, processing of that message may or may not succeed. An application can indicate to the broker that message processing has failed (or cannot be accomplished at the time) by rejecting a message. When rejecting a message, an application can ask the broker to discard or requeue it. When there is only one consumer on a queue, make sure you do not create infinite message delivery loops by rejecting and requeueing a message from the same consumer over and over again.
当消费者应用程序收到一条消息时,对该消息的处理可能成功也可能不成功。应用程序可以通过拒绝消息来向代理指示消息处理失败(或当时无法完成)。拒绝消息时,应用程序可以要求代理丢弃或重新排队。当队列中只有一个消费者时,请确保您不会通过一遍又一遍地拒绝来自同一消费者的消息并将其重新排队来创建无限的消息传递循环。

Negative Acknowledgements

basic.reject

Prefetching Messages

Message Attributes and Payload

消息属性

  • Content type
  • Content encoding
  • Routing key
  • Delivery mode
  • Message priority
  • Message publishing timestamp
  • Expiration period
  • Publisher application id

Connections

Channels

Virtual Hosts

虚拟主机

Others

routing key/ binding key

routing key:路由键。需要与交换器类型和 binding key 联合才能生效

binding key:绑定。将交换器与队列关联起来

AMQP

  • Module Layer :定义供客户端调用的命令
  • Session Layer :将客户端命令发送给服务器,将服务器应答返回给客户端
  • Transport Layer :传输数据。

mandatory / immediaate

mandatory

  • true:无法找到符合条件的队列时,会把消息返回给生产者
  • false:直接丢弃

immediate:RabbitMQ3.0 开始不支持

备份交换机 Alternate Exchange

Map<String,Object> args = new HashMap<>();
args.put("alternate-exchange","my_alternate_exchange");
channel.exchangeDeclare("normal_exchange_name","direct",true,false,args);

优先级高于 mandatory 参数。同时设置 alternate-exchange 和 mandatory 的情况下,mandatory 参数会失效。

过期时间 TTL

设置消息的 ttl 的两种方式

  • 在创建队列的时候设置: x-message-ttl
  • 对每条消息单独设置:message-ttl

如果消息队列和消息都设置了 ttl,则 ttl 取最小值。

如果消息的生存时间超过了 ttl 值,就会变成死信。dead message

ttl = 0,表示如果不能找到消费者,就直接将消息丢弃。

设置队列的 ttl

  • x-expires 参数

如果队列在ttl 过期之后,会将队列删除。

死信队列 Dead-Letter-Exchange

消息成为死信的几种情况

  • 消息被拒绝 (Basic.reject || Basic.nack) && requeue=false
  • 消息过期 ttl
  • 队列达到最大长度

给队列指定死信交换机:"x-dead-letter-exchange" 指定路由键:"x-dead-letter-routing-key",如果不指定的话,采用原来的值。

延时队列插件

安装插件

  1. 官网下载 rabbitmq_delayed_message_exchange 插件: https://www.rabbitmq.com/community-plugins
  2. 复制到 docker 容器的 /plugins 目录中。
    • docker cp rabbitmq_delayed_message_exchange-3.12.0.ez rabbitmq:/plugins
  3. 启用插件。
    • docker exec -it rabbitmq bash
    • rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  4. 查看插件是否安装成功
    • rabbitmq-plugins list | grep delayed
    • RabbitMQ 管理页面可以新建一个 x-delayed-messageExchange

RabbitMQ 的作用

  1. 流量削峰
  2. 应用解耦
  3. 异步处理

概念

  1. 生产者
  2. 交换机
  3. 队列
  4. 消费者

交换机1--队列N

七大模式

  1. Hello World
  2. Work Queues
  3. Publish/Subscribe
  4. Routing
    1. 匹配与 routing-key 相同的 queue
  5. Topics
    1. * : 匹配一个单词
    2. # :匹配 0 或多个单词
  6. RPC
  7. Publisher Confirms

消息模式

  • 请求/回复
    • producer -> message -> request exchange -> request queue -> consumer -> response exchange -> response queue -> producer
  • 点对点
    • binding-key (直接匹配 queue)
  • 发布-订阅
    • Fanout (广播)
  • 消息分发
    • Topic (类似于关键字匹配)
    • Header

可靠性

  1. 如何保证消息的传递

使用接受确认机制。发信方会收到 Broker 发送的接收确认,而 Broker 会收到收信方的接收确认。

  1. Broker 如何解决单点故障问题?
  • 持久化:设置队列为 Durable 以及消息为 Persisten
  • 队列镜像 Mirrored Queue

死信队列

  • TTL 过期
  • 队列满了
  • 消息被拒绝 (basic.reject || basic.nack) && requeue == false

延迟队列

插件

发布确认

幂等性问题

优先级队列

惰性队列

x-queue-mode = lazy

集群

Spring Boot 整合 RabbitMQ

application.properties

# 发送者开启 confirm 确认机制
spring.rabbitmq.publisher-confirms=true
# 发送者开启 return 确认机制
spring.rabbitmq.publisher-returns=true
####################################################
# 设置消费端手动 ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 是否支持重试
spring.rabbitmq.listener.simple.retry.enabled=true

Exchange

1、创建 exchange

代码执行之后

重启 rabbitmq 之后(停止执行代码)

durable=true 将 exchange 持久化。 autoDelete=true 在 exchange 没有绑定 queue 时,会自动将 exchange 删除。

durable 优先级高于 autoDelete。或者说,autoDelete 只在 durable=false 时生效。

Queue

durable:持久化
exclusive:只允许声明这个 queue 的连接使用
autoDelete:自动删除