SpringBoot整合RabbitMQ

创建日期:2021/10/26   修改日期:2023/07/19   Java   SpringBoot   RabbitMQ

示例代码:


本文环境

  • SpringBoot 2.6.x
  • RabbitMQ 3.9.x

依赖

SpringBoot已经整合了RabbitMQ,使用时只需要如下依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-amqp</artifactId>
  8. </dependency>
  9. <dependency>
  10. <groupId>org.springframework.amqp</groupId>
  11. <artifactId>spring-rabbit-test</artifactId>
  12. <scope>test</scope>
  13. </dependency>

配置

配置连接也很简单,只要在application.yml添加如下配置即可

  1. spring:
  2. rabbitmq:
  3. host: 127.0.0.1 # 地址
  4. port: 5672 # 端口
  5. username: guest # 用户名
  6. password: guest # 密码

入门使用

下文中介绍RabbitMQ5种入门使用方法

  1. 使用默认交换机类型模式:
    1. "Hello World!"入门:最简单的消息发送与接收
    2. Work queues工作队列:分配任务(单生产多消费)(轮询模式)
  2. 使用指定交换机类型模式:根据交换机类型分为如下三种
    1. Publish/Subscribe发布/订阅:同时向许多消费者发送信息
    2. Routing路由:有选择地接收消息
    3. Topics主题:基于模式(主题)接收消息

Hello World入门

入门示例介绍一个生产者和一个消费者的情况。在下图中:

  • P是生产者,用于发送消息
  • C是消费者,用于接收消息
  • 红色的框是队列,是消息缓冲区

队列

在发送和接收消息之前,需要先声明队列。消费者只能监听已存在的队列,所以声明队列配置放在消费端。

此处声明了一个队列,名称为hello-world

  1. @Component
  2. public class HelloWorldConfig {
  3. /**
  4. * 入门队列配置
  5. */
  6. @Bean
  7. public Queue helloWorldQueue() {
  8. // 创建一个队列,并指定队列名称
  9. return new Queue("hello-world");
  10. }
  11. }

消费者

使用@RabbitListener注解创建一个监听器,用于指定监听哪个队列。使用@RabbitHandler指定方法接收数据,根据入参类型处理不同类型的数据。下文展示了处理不同类型的消息

  1. @Component
  2. @RabbitListener(
  3. // 指定要监听哪些队列(可指定多个)
  4. queues = "hello-world")
  5. public class HelloWorldReceiver {
  6. /**
  7. * 接收字符串
  8. */
  9. @RabbitHandler
  10. public void receive(String msg) {
  11. System.out.println("----Received String:" + msg);
  12. }
  13. /**
  14. * 接收数字
  15. */
  16. @RabbitHandler
  17. public void receive(Integer msg) {
  18. System.out.println("====Received Integer:" + msg);
  19. }
  20. /**
  21. * 接收实体
  22. */
  23. @RabbitHandler
  24. public void receive(User msg) {
  25. System.out.println("||||Received Entity:" + msg);
  26. }
  27. }

生产者

生产者使用RabbitTemplate发送消息,在ControllerService、或者其他类中使用@Autowired注解引入RabbitTemplate即可使用。使用convertAndSend方法自动将对象转换为消息并发送。下文中展示了发送不同类型的消息

  1. @RestController
  2. public class IndexController {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. /**
  6. * 测试用的标记序号
  7. */
  8. private static int i = 1;
  9. /**
  10. * 入门 生产者
  11. */
  12. @GetMapping("hello-world")
  13. public Integer helloWorld() {
  14. System.out.println("~~~~Sent:" + i);
  15. // 发送字符串
  16. rabbitTemplate.convertAndSend(
  17. // 发送到哪个队列,不填写该参数时默认为空队列
  18. "hello-world",
  19. // 具体消息内容
  20. i + "");
  21. // 发送数字
  22. rabbitTemplate.convertAndSend("hello-world", i);
  23. // 发送对象
  24. rabbitTemplate.convertAndSend("hello-world", new User(i, "TOM"));
  25. return i++;
  26. }
  27. }

结果

访问http://127.0.0.1:8080/hello-world调用生产者发送消息,看到控制台打印如下消息

  1. ~~~~Sent:1
  2. ----Received String:1
  3. ====Received Integer:1
  4. ||||Received Entity:User(id=1, name=TOM)

Work Queues工作队列

当消费者需要很长时间才能处理一条消息时,可以建立多个消费者分配处理任务。队列会将消息以轮询模式分配给消费者

不公平分发

RabbitMQ分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好。比方说有两个消费者在处理任务,其中有个消费者A处理任务的速度非常快,而另外一个消费者B处理速度却很慢,这个时候还是采用轮训分发的话就会导致处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活。但是RabbitMQ并不知道这种情况它依然很公平的进行分发。为了避免这种情况,可以设置参数spring.rabbitmq.listener.simple.refetch

  1. spring:
  2. rabbitmq:
  3. listener:
  4. type: simple # 默认
  5. simple:
  6. prefetch: 1 # 每个消费者未确认的消息最大数量

默认情况下prefetch的值为250,即消费者最多同时接收250条消息,并在消费一条或多条之后统一给RabbitMQ返回ack应答消息

队列

同入门示例,再建立一个队列

  1. @Component
  2. public class WorkQueuesConfig {
  3. /**
  4. * 工作队列配置
  5. */
  6. @Bean
  7. public Queue workQueue() {
  8. return new Queue("work-queues");
  9. }
  10. }

消费者

创建两个消费者

  1. @Component
  2. @RabbitListener(queues = "work-queues")
  3. public class WorkQueuesReceiver1 {
  4. @RabbitHandler
  5. public void receive(Integer msg) {
  6. try {
  7. Thread.sleep(1000);
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. System.out.println("---Received1:" + msg);
  12. }
  13. }
  1. @Component
  2. @RabbitListener(queues = "work-queues")
  3. public class WorkQueuesReceiver2 {
  4. @RabbitHandler
  5. public void receive(Integer msg) {
  6. try {
  7. Thread.sleep(5000);
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. System.out.println("===Received2:" + msg);
  12. }
  13. }

生产者

  1. @RestController
  2. public class IndexController {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. /**
  6. * 测试用的标记序号
  7. */
  8. private static int i = 1;
  9. /**
  10. * 工作队列 生产者
  11. */
  12. @GetMapping("work-queues")
  13. public Integer workQueues() {
  14. System.out.println("~~~~Sent:" + i);
  15. rabbitTemplate.convertAndSend("work-queues", i);
  16. return i++;
  17. }
  18. }

结果

多次访问http://127.0.0.1:8080/work-queues,看到如下结果

prefetch=250

消费者平均分配消息

  1. ~~~~Sent:1
  2. ~~~~Sent:2
  3. ~~~~Sent:3
  4. ~~~~Sent:4
  5. ---Received1:1
  6. ---Received1:3
  7. ===Received2:2
  8. ===Received2:4

prefetch=1

消费快的消费者消费更多消息

  1. ~~~~Sent:1
  2. ~~~~Sent:2
  3. ~~~~Sent:3
  4. ~~~~Sent:4
  5. ---Received1:1
  6. ---Received1:3
  7. ---Received1:4
  8. ===Received2:2

Publish/Subscribe发布/订阅

RabbitMQ消息传递模型的核心思想是生产者从不直接向队列发送任何消息。实际上,生产者经常甚至根本不知道消息是否会被传送到任何队列。

相反,生产者只能将消息发送到交换机exchange,交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

交换机总共有以下类型:

  • 直接direct
  • 主题topic
  • 标题headers
  • 扇出fanout

本教程的前面部分生产者并未设置交换机,但仍然能够将消息发送到队列。之前能实现的原因是因为使用的是默认交换,即空字符串"",详见convertAndSend源码

发布订阅图示如下:

  1. 扇出fanout:发布订阅模式需要使用扇出交换机,扇出交换机非常简单,它将收到的所有消息广播到它绑定的所有队列。
  2. 临时队列AnonymousQueue:每当连接到RabbitMQ时,需要一个全新的空队列,为此可以创建一个具有随机名称的队列,其次一旦断开了消费者的连接,队列将被自动删除。
  3. 绑定Binding:其实是交换机和队列之间的桥梁,它告诉交换机和哪个队列进行了绑定。

交换机和队列

  1. @Component
  2. public class PublishSubscribeConfig {
  3. /**
  4. * 声明一个扇出类型的交换机
  5. */
  6. @Bean
  7. public FanoutExchange fanout() {
  8. // 指定交换机名称为:fanout,可自定义
  9. return new FanoutExchange("fanout");
  10. }
  11. /**
  12. * 声明队列
  13. */
  14. @Bean
  15. public Queue autoDeleteQueue1() {
  16. return new AnonymousQueue();
  17. }
  18. /**
  19. * 声明一个绑定关系,将队列绑定到交换机
  20. */
  21. @Bean
  22. public Binding binding1(FanoutExchange fanout, Queue autoDeleteQueue1) {
  23. return BindingBuilder.bind(autoDeleteQueue1).to(fanout);
  24. }
  25. // 下同
  26. @Bean
  27. public Queue autoDeleteQueue2() {
  28. return new AnonymousQueue();
  29. }
  30. @Bean
  31. public Binding binding2(FanoutExchange fanout, Queue autoDeleteQueue2) {
  32. return BindingBuilder.bind(autoDeleteQueue2).to(fanout);
  33. }
  34. }

消费者

  1. @RabbitListener注解可以直接作用在方法上,并处理消息,不需要@RabbitHandler注解。适用于队列内消息对象类型只有一种时使用
  2. 上文中使用AnonymousQueue声明随机名称队列,所以注解内使用表达式获取队列名称
  1. @Component
  2. public class PublishSubscribeReceiver {
  3. @RabbitListener(queues = "#{autoDeleteQueue1.name}")
  4. public void receive1(Integer msg) {
  5. System.out.println("===Received1:" + msg);
  6. }
  7. @RabbitListener(queues = "#{autoDeleteQueue2.name}")
  8. public void receive2(Integer msg) {
  9. System.out.println("===Received2:" + msg);
  10. }
  11. }

生产者

生产者发送消息时需要指定交换机,但是不能指定队列,所以使用""

  1. @RestController
  2. public class IndexController {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. /**
  6. * 测试用的标记序号
  7. */
  8. private static int i = 1;
  9. /**
  10. * 发布订阅 生产者
  11. */
  12. @GetMapping("publish-subscribe")
  13. public Integer publishSubscribe() {
  14. System.out.println("~~~~Sent:" + i);
  15. rabbitTemplate.convertAndSend(
  16. // 指定交换机名称
  17. "fanout",
  18. // 不指定队列名称
  19. "", i);
  20. return i++;
  21. }
  22. }

结果

多次访问http://127.0.0.1:8080/publish-subscribe,看到如下结果

  1. ~~~~Sent:1
  2. ===Received2:1
  3. ===Received1:1
  4. ~~~~Sent:2
  5. ===Received2:2
  6. ===Received1:2
  7. ~~~~Sent:3
  8. ===Received1:3
  9. ===Received2:3

Routing路由

本节将添加一些特别的功能:比方说只让某个消费者订阅发布的部分消息。例如只把严重错误消息定向存储到日志文件,同时仍然能够在控制台上打印所有日志消息。

绑定

绑定是交换器和队列之间的关系。这可以简单地理解为:队列只对它绑定的交换机的消息感兴趣。绑定可以采用额外的绑定键参数。Spring AMQP使用了一个fluent API来让这种关系非常清晰。将交换机和队列传入BindingBuilder并简单地用绑定键将队列绑定到交换机,如下所示:

  1. @Bean
  2. public Binding binding(DirectExchange direct, Queue autoDeleteQueue3) {
  3. return BindingBuilder.bind(autoDeleteQueue3).to(direct).with("error");
  4. }

绑定键的含义取决于交换机类型。以前使用的扇形交换,完全忽略了它的价值。

直接交换

上一节中的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储哪些警告(warning)或信息(info)日志消息避免浪费磁盘空间。扇出这种交换类型并不能带来很大的灵活性,它只能进行无意识的广播,在这里将使用direct(直连)这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的routingKey队列中去。

在上面这张图中,可以看到X绑定了两个队列,绑定类型是direct。队列Q1绑定键为orange,队列Q2绑定键有两个:一个绑定键为black,另一个绑定键为green。在这种绑定情况下,生产者发布消息到exchange上,绑定键为orange的消息会被发布到队列Q1。绑定键为blackgreen的消息会被发布到队列Q2,其他消息类型的消息将被丢弃。

多重绑定

如果exchange的绑定类型是direct,但是它绑定的多个队列的key如果都相同,在这种情况下虽然绑定类型是direct但是它表现的就和fanout类似了,就跟广播差不多,如上图所示。

综合

直接交换多重绑定放在一起,如上图所示。

交换机和队列

  1. @Component
  2. public class RoutingConfig {
  3. /**
  4. * 声明一个直连类型的交换机
  5. */
  6. @Bean
  7. public DirectExchange direct() {
  8. return new DirectExchange("direct");
  9. }
  10. /**
  11. * 声明队列
  12. *
  13. * @return
  14. */
  15. @Bean
  16. public Queue autoDeleteQueue3() {
  17. return new AnonymousQueue();
  18. }
  19. /**
  20. * 声明一个绑定关系,将队列绑定到交换机,并指定要监听的 routingKey
  21. */
  22. @Bean
  23. public Binding binding3a(DirectExchange direct, Queue autoDeleteQueue3) {
  24. return BindingBuilder.bind(autoDeleteQueue3).to(direct).with("error");
  25. }
  26. // 下同
  27. @Bean
  28. public Queue autoDeleteQueue4() {
  29. return new AnonymousQueue();
  30. }
  31. @Bean
  32. public Binding binding4a(DirectExchange direct, Queue autoDeleteQueue4) {
  33. return BindingBuilder.bind(autoDeleteQueue4).to(direct).with("info");
  34. }
  35. @Bean
  36. public Binding binding4b(DirectExchange direct, Queue autoDeleteQueue4) {
  37. return BindingBuilder.bind(autoDeleteQueue4).to(direct).with("warning");
  38. }
  39. @Bean
  40. public Binding binding4c(DirectExchange direct, Queue autoDeleteQueue4) {
  41. return BindingBuilder.bind(autoDeleteQueue4).to(direct).with("error");
  42. }
  43. }

消费者

  1. @Component
  2. public class RoutingReceiver {
  3. @RabbitListener(queues = "#{autoDeleteQueue3.name}")
  4. public void receive1(String msg) {
  5. System.out.println("===Received1:" + msg);
  6. }
  7. @RabbitListener(queues = "#{autoDeleteQueue4.name}")
  8. public void receive2(String msg) {
  9. System.out.println("===Received2:" + msg);
  10. }
  11. }

生产者

  1. @RestController
  2. public class IndexController {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. /**
  6. * 路由 生产者
  7. */
  8. @GetMapping("routing")
  9. public void routing() {
  10. String[] keys = {"debug", "info", "warning", "error"};
  11. for (String key : keys) {
  12. // 发送四种类型的消息日志
  13. rabbitTemplate.convertAndSend("direct", key, key);
  14. System.out.println("~~~~Sent:" + key);
  15. }
  16. }
  17. }

结果

多次访问http://127.0.0.1:8080/routing,看到如下结果

  1. ~~~~Sent:debug
  2. ~~~~Sent:info
  3. ~~~~Sent:warning
  4. ~~~~Sent:error
  5. ===Received1:error
  6. ===Received2:info
  7. ===Received2:warning
  8. ===Received2:error

Topic主题

尽管使用直连交换机改进了系统,但是它仍然存在局限性:它不能基于多个标准进行路由。比如:接收的日志类型有info.baseinfo.advantage,某个队列只想接收info.base的消息,那这个时候直连就办不到了。这个时候就只能使用Topic(主题)类型

topic交换机的routing_key编写规则

topic交换机的routing_key不能随意写,必须满足一定的要求,它必须是一个单词列表,以.分隔开。这些单词可以是任意单词,比如说:stock.usd.nysenyse.vmwquick.orange.rabbit这种类型的。当然这个单词列表最多不能超过255个字节。

在这个规则列表中,有两个替换符:

  • *(星号)可以代替一个单词
  • #(井号)可以替代零个或多个单词

案例

上图是一个队列绑定关系图,他们之间数据接收情况如下:

  • quick.orange.rabbit被队列Q1 Q2接收到
  • lazy.orange.elephant被队列Q1 Q2接收到
  • quick.orange.fox被队列Q1接收到
  • lazy.brown.fox被队列Q2接收到
  • lazy.pink.rabbit虽然满足两个绑定但只被队列Q2接收一次
  • quick.brown.fox不匹配任何绑定不会被任何队列接收到会被丢弃
  • quick.orange.male.rabbit是四个单词不匹配任何绑定会被丢弃
  • lazy.orange.male.rabbit是四个单词但匹配Q2

当队列绑定关系是下列这种情况时需要引起注意

  1. 当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像fanout
  2. 如果队列绑定键当中没有#*出现,那么该队列绑定类型就是direct

交换机和队列

  1. @Component
  2. public class TopicConfig {
  3. /**
  4. * 声明一个主题类型的交换机
  5. */
  6. @Bean
  7. public TopicExchange topic() {
  8. return new TopicExchange("topic");
  9. }
  10. /**
  11. * 声明队列
  12. */
  13. @Bean
  14. public Queue autoDeleteQueue5() {
  15. return new AnonymousQueue();
  16. }
  17. @Bean
  18. public Queue autoDeleteQueue6() {
  19. return new AnonymousQueue();
  20. }
  21. /**
  22. * 声明绑定关系,将队列绑定到交换机,并指定要监听的 routingKey
  23. */
  24. @Bean
  25. public Binding binding5a(TopicExchange topic, Queue autoDeleteQueue5) {
  26. return BindingBuilder.bind(autoDeleteQueue5).to(topic).with("*.orange.*");
  27. }
  28. @Bean
  29. public Binding binding6a(TopicExchange topic, Queue autoDeleteQueue6) {
  30. return BindingBuilder.bind(autoDeleteQueue6).to(topic).with("*.*.rabbit");
  31. }
  32. @Bean
  33. public Binding binding6b(TopicExchange topic, Queue autoDeleteQueue6) {
  34. return BindingBuilder.bind(autoDeleteQueue6).to(topic).with("lazy.#");
  35. }
  36. }

消费者

  1. @Component
  2. public class TopicReceiver {
  3. @RabbitListener(queues = "#{autoDeleteQueue5.name}")
  4. public void receive1(String msg) {
  5. System.out.println("===Received1:" + msg);
  6. }
  7. @RabbitListener(queues = "#{autoDeleteQueue6.name}")
  8. public void receive2(String msg) {
  9. System.out.println("===Received2:" + msg);
  10. }
  11. }

生产者

  1. @RestController
  2. public class IndexController {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. /**
  6. * 主题 生产者
  7. */
  8. @GetMapping("topic")
  9. public void topic() {
  10. String[] keys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox", "lazy.brown.fox",
  11. "lazy.pink.rabbit", "quick.brown.fox", "quick.orange.male.rabbit", "lazy.orange.male.rabbit"};
  12. for (String key : keys) {
  13. rabbitTemplate.convertAndSend("topic", key, key);
  14. System.out.println("~~~~Sent:" + key);
  15. }
  16. }
  17. }

结果

多次访问http://127.0.0.1:8080/topic,看到如下结果

  1. ~~~~Sent:quick.orange.rabbit
  2. ~~~~Sent:lazy.orange.elephant
  3. ~~~~Sent:quick.orange.fox
  4. ~~~~Sent:lazy.brown.fox
  5. ~~~~Sent:lazy.pink.rabbit
  6. ~~~~Sent:quick.brown.fox
  7. ~~~~Sent:quick.orange.male.rabbit
  8. ~~~~Sent:lazy.orange.male.rabbit
  9. ===Received1:quick.orange.rabbit
  10. ===Received2:quick.orange.rabbit
  11. ===Received1:lazy.orange.elephant
  12. ===Received2:lazy.orange.elephant
  13. ===Received1:quick.orange.fox
  14. ===Received2:lazy.brown.fox
  15. ===Received2:lazy.pink.rabbit
  16. ===Received2:lazy.orange.male.rabbit

进阶使用1

如图,一条消息完整的流程分为:

  1. 生产者发布
  2. RabbitMQ缓存
  3. 消费者消费

以上每一个步骤都会出现消息丢失的情况,所以需要进行消息确认

Publisher Confirms发布确认

生产者在发送消息时,如果发送到错误的交换机,或者没有队列可以处理该消息,生产者应当知道消息未发送成功。需要对生产者进行配置。

队列、交换机

无需额外配置。这里额外介绍使用Builder创建队列和交换机以及使用new创建绑定关系

  1. @Component
  2. public class PublisherConfirmsConfig {
  3. /**
  4. * 声明交换机
  5. */
  6. @Bean
  7. public DirectExchange publisherConfirmsExchange() {
  8. // return new DirectExchange("direct");
  9. // 也可以使用Builder模式创建
  10. return ExchangeBuilder
  11. // 使用直连交换机
  12. .directExchange("publisher.confirms.exchange").build();
  13. }
  14. /**
  15. * 声明确认队列
  16. */
  17. @Bean
  18. public Queue publisherConfirmsQueue() {
  19. // return new AnonymousQueue();
  20. // 也可以使用Builder模式创建
  21. return QueueBuilder
  22. // 使用消息持久化,不使用nonDurable(final String name),使用随机队列名称
  23. .nonDurable()
  24. // 队列自动删除
  25. .autoDelete().build();
  26. }
  27. /**
  28. * 声明确认队列绑定关系
  29. */
  30. @Bean
  31. public Binding queueBinding(DirectExchange publisherConfirmsExchange, Queue publisherConfirmsQueue) {
  32. // return BindingBuilder.bind(publisherConfirmsQueue).to(publisherConfirmsExchange).with("key1");
  33. // 也可以使用 new 方法创建绑定关系
  34. return new Binding(publisherConfirmsQueue.getName(), Binding.DestinationType.QUEUE,
  35. publisherConfirmsExchange.getName(), "key1", null);
  36. }
  37. }

消费者

无需额外配置

  1. @Component
  2. public class PublisherConfirmsReceiver {
  3. @RabbitListener(queues = "#{publisherConfirmsQueue.name}")
  4. public void receiveMsg(String msg) {
  5. System.out.println("===Received:" + msg);
  6. }
  7. }

生产者

配置:生产者的yml需要添加如下配置

  1. spring:
  2. rabbitmq:
  3. publisher-confirm-type: correlated # 设置发布确认模式(针对交换机)
  4. publisher-returns: true # 设置发布退回(针对队列)

回调:新建回调类,编写回调方法并注入RabbitTemplate

  1. @Component
  2. public class RabbitTemplateCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
  3. /**
  4. * 交换机是否收到消息的一个回调方法
  5. *
  6. * @param correlationData
  7. * 消息相关数据
  8. * @param ack
  9. * 交换机是否收到消息
  10. */
  11. @Override
  12. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  13. String id = correlationData != null ? correlationData.getId() : "";
  14. if (ack) {
  15. System.out.println("交换机已经收到消息 id 为:" + id);
  16. } else {
  17. System.out.println("交换机还未收到消息 id 为:" + id + ",由于原因:" + cause);
  18. }
  19. }
  20. /**
  21. * 队列未接收到消息的时候的回调方法
  22. *
  23. * @param message
  24. * 返回的数据
  25. */
  26. @Override
  27. public void returnedMessage(ReturnedMessage message) {
  28. System.out.println("消息:" + new String(message.getMessage().getBody()) + "\t被交换机:" + message.getExchange()
  29. + "退回\t退回原因:" + message.getReplyText() + "\t路由key:" + message.getRoutingKey());
  30. }
  31. @Autowired
  32. private RabbitTemplate rabbitTemplate;
  33. /**
  34. * 在依赖注入 rabbitTemplate 之后再设置它的回调对象
  35. */
  36. @PostConstruct
  37. public void init() {
  38. rabbitTemplate.setConfirmCallback(this);
  39. rabbitTemplate.setReturnsCallback(this);
  40. // 同配置文件中的 spring.rabbitmq.publisher-returns=true
  41. // rabbitTemplate.setMandatory(true);
  42. }
  43. }

发送:发送消息时,额外携带CorrelationData对象并设置对象id,方便回调时知道是哪一条消息失败

  1. @RestController
  2. public class IndexController {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. /**
  6. * 发布确认生产者
  7. */
  8. @GetMapping("publisherConfirms")
  9. public void publisherConfirms() {
  10. rabbitTemplate.convertAndSend("publisher.confirms.exchange", "key1", "message", new CorrelationData("1"));
  11. rabbitTemplate.convertAndSend("publisher.confirms.exchange1", "key1", "message", new CorrelationData("2"));
  12. rabbitTemplate.convertAndSend("publisher.confirms.exchange", "key2", "message", new CorrelationData("3"));
  13. }
  14. }

结果

访问http://127.0.0.1:8080/publisherConfirms,看到如下结果

  1. 交换机已经收到消息 id 为:1
  2. 2021-10-23 20:48:49.177 ERROR 19532 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'publisher.confirms.exchange1' in vhost '/', class-id=60, method-id=40)
  3. ===Received:message
  4. 交换机还未收到消息 id 为:2,由于原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'publisher.confirms.exchange1' in vhost '/', class-id=60, method-id=40)
  5. 消息:message 被交换机:publisher.confirms.exchange退回 退回原因:NO_ROUTE 路由keykey2
  6. 交换机已经收到消息 id 为:3
  • 如果发送到了错误的交换机,系统会记录ERROR日志,且confirmackfalse
  • 如果发送到了错误的队列,系统不会有记录,配置的returnedMessage会收到消息,但是confirmacktrue

消息持久化

当消息发送到队列,但是未被消费时,需要将消息存储在磁盘中,以防止RabbitMQ服务宕机造成消息丢失

  • new Queue()方式:使用有参构造器时设置boolean durable参数,true为磁盘存储,false为内存存储
  • QueueBuilder方式:使用QueueBuilder.durable()设置为磁盘存储,QueueBuilder.nonDurable()设置为内存存储

设置消息持久化的队列,在RabbitMQ控制面板Features中会显示为D

ConsumerAcknowledgements消费者确认

ACK确认模式

通过spring.rabbitmq.listener.simple.acknowledge-modespring.rabbitmq.listener.direct.acknowledge-mode进行设置

  1. none不确认
    • 默认所有消息消费成功,队列会不断的向消费者推送消息
    • 因为RabbitMQ认为所有消息都被消费成功,所以消息存在丢失的危险
  2. auto自动确认(自动)
    • Spring依据消息处理逻辑是否抛出异常自动发送ack(无异常)或nack(异常)到server端。存在丢失消息的可能,如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。如果消息已经被处理,但后续代码抛出异常,使用Spring进行管理的话消费端业务逻辑会进行回滚,这也同样造成了实际意义的消息丢失
    • 使用自动确认模式时,需要考虑的另一件事是消费者过载
  3. manual手动确认
    • 手动确认则当消费者调用acknackreject几种方法进行确认,手动确认可以在业务失败后进行一些操作,如果消息未被ACK则会发送到下一个消费者
    • 手动确认模式可以使用prefetch,限制通道上未完成的(“正在进行中的”)发送的数量
    • 忘记ACK确认
      忘记通过basicAck返回确认信息是常见的错误。这个错误非常严重,将导致消费者客户端退出或者关闭后,消息会被退回RabbitMQ服务器,这会使RabbitMQ服务器内存爆满,而且RabbitMQ也不会主动删除这些被退回的消息。只要程序还在运行,没确认的消息就一直是Unacked状态,无法被RabbitMQ重新投递。RabbitMQ消息消费并没有超时机制,也就是说,程序不重启,消息就永远是Unacked状态。处理运维事件时不要忘了这些Unacked状态的消息。当程序关闭时(实际只要 消费者 关闭就行),消息会恢复为 Ready 状态。

消息应答的方法

  1. Channel.basicAck(用于肯定确认)RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
  2. Channel.basicNack(用于否定确认)
  3. Channel.basicReject(用于否定确认)与Channel.basicNack相比少一个参数

multipletruefalse

  • true代表批量应答channel上未应答的消息:比如channel上有传送tag的消息5,6,7,8当前tag8那么此时5-8的这些还未应答的消息都会被确认收到消息应答
  • false同上面相比只会应答tag=8的消息,5,6,7这三个消息依然不会被确认收到消息应答

队列

  1. @Component
  2. public class ConsumerAcknowledgementsConfig {
  3. /**
  4. * 声明确认队列
  5. */
  6. @Bean
  7. public Queue consumerAcknowledgementsQueue() {
  8. return new Queue("consumer.acknowledgements");
  9. }
  10. }

消费者

yml配置

yml

  1. spring:
  2. rabbitmq:
  3. listener:
  4. type: simple # 默认
  5. simple:
  6. acknowledge-mode: manual

手动应答

  1. @Component
  2. public class ConsumerAcknowledgementsReceiver {
  3. @RabbitListener(queues = "consumer.acknowledgements")
  4. public void receiveMsg(Channel channel, Message message, Integer msg) {
  5. System.out.println("===Received:start:" + msg);
  6. try {
  7. System.out.println("sleeping");
  8. Thread.sleep(5000);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. System.out.println("===Received:end:" + msg);
  13. // 手动ACK
  14. // 默认情况下如果一个消息被消费者正确接收则会被从队列中移除
  15. // 如果一个队列没被任何消费者订阅,那么这个队列中的消息会被缓存
  16. // 当有消费者订阅时则会立即发送,当消息被消费者正确接收时,就会被从队列中移除
  17. try {
  18. // 手动ack应答
  19. // 告诉服务器收到这条消息已经被消费了,可以在队列中删掉
  20. // 否则消息服务器以为这条消息没处理掉,后续还会在发
  21. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  22. } catch (IOException e) {
  23. e.printStackTrace();
  24. // 丢弃这条消息
  25. try {
  26. // 消息重新入队
  27. channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
  28. // 消息丢弃
  29. // channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
  30. // 多一个批量参数
  31. // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
  32. } catch (IOException e1) {
  33. e1.printStackTrace();
  34. }
  35. }
  36. }
  37. }

生产者

  1. @RestController
  2. public class IndexController {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. /**
  6. * 测试用的标记序号
  7. */
  8. private static int i = 1;
  9. /**
  10. * 消费确认生产者
  11. */
  12. @GetMapping("consumerAcknowledgements")
  13. public Integer consumerAcknowledgements() {
  14. rabbitTemplate.convertAndSend("consumer.acknowledgements", i);
  15. return i++;
  16. }
  17. }

进阶使用2

死信队列

死信:就是无法被消费的消息。一般来说,生产者将消息投递到交换机或者直接到队列,消费者从队列取出消息进行消费,但某些时候由于特定的原因导致队列中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中;还有比如用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

死信来源

  • 消息 TTL 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

队列和交换机

  1. @Component
  2. public class DeadExchangeConfig {
  3. /**
  4. * 声明交换机
  5. */
  6. @Bean
  7. public DirectExchange deadExchange() {
  8. return new DirectExchange("dead.exchange");
  9. }
  10. @Bean
  11. public DirectExchange normalExchange() {
  12. return new DirectExchange("normal.exchange");
  13. }
  14. /**
  15. * 声明队列
  16. */
  17. @Bean
  18. public Queue deadQueue() {
  19. return new Queue("dead.queue");
  20. }
  21. @Bean
  22. public Queue normalQueue() {
  23. // 绑定死信队列信息
  24. Map<String, Object> params = new HashMap<>();
  25. // 设置死信 exchange 参数 key 是固定值
  26. params.put("x-dead-letter-exchange", "dead.exchange");
  27. // 设置死信 routing-key 参数 key 是固定值
  28. params.put("x-dead-letter-routing-key", "key2");
  29. // 设置队列长度限制
  30. params.put("x-max-length", 5);
  31. return new Queue("normal.queue", true, true, false, params);
  32. }
  33. /**
  34. * 声明队列和交换机绑定关系
  35. */
  36. @Bean
  37. public Binding normalQueueBinding(DirectExchange normalExchange, Queue normalQueue) {
  38. return BindingBuilder.bind(normalQueue).to(normalExchange).with("key1");
  39. }
  40. @Bean
  41. public Binding deadQueueBinding(DirectExchange deadExchange, Queue deadQueue) {
  42. return BindingBuilder.bind(deadQueue).to(deadExchange).with("key2");
  43. }
  44. }

消费者

消费者可以暂时不启用,以观察消息的进入死信交换机和队列

  1. // @Component
  2. public class DeadExchangeReceiver {
  3. @RabbitListener(queues = "dead.queue")
  4. public void receiveMsg(String msg) {
  5. System.out.println("Dead===Received:" + msg);
  6. }
  7. }

生产者

发送消息时,需要设置超时时间

  1. @RestController
  2. public class IndexController {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. /**
  6. * 死信队列生产者
  7. */
  8. @GetMapping("deadQueue")
  9. public void deadQueue() {
  10. for (int j = 0; j < 10; j++) {
  11. rabbitTemplate.convertAndSend("normal.exchange", "key1", j,
  12. // 设置消息过期时间(单位:毫秒)
  13. correlationData -> {
  14. correlationData.getMessageProperties().setExpiration("10000");
  15. return correlationData;
  16. });
  17. }
  18. }
  19. }

结果

访问http://127.0.0.1:8080/deadQueue,然后查看RabbitMQ控制台

  • 5条消息进入普通队列,另外5条消息因为队列长度不够,进入死信队列
  • 当超过10秒之后,因普通队列无消费者,所有消息进入死信队列

延迟交换机

延时队列就是用来存放需要在指定时间被处理的元素的队列。

延迟队列需要安装rabbitmq_delayed_message_exchange插件

在这里新增了一个队列delayed.queue,一个自定义交换机delayed.exchange,绑定关系如下:

交换机和队列

  1. @Component
  2. public class DelayedExchangeConfig {
  3. /**
  4. * 延迟交换机
  5. */
  6. @Bean
  7. public CustomExchange delayedExchange() {
  8. Map<String, Object> args = new HashMap<>();
  9. // 自定义交换机的类型
  10. args.put("x-delayed-type", "direct");
  11. return new CustomExchange("delayed.exchange", "x-delayed-message", true, false, args);
  12. }
  13. /**
  14. * 队列
  15. */
  16. @Bean
  17. public Queue delayedQueue() {
  18. return new Queue("delayed.queue");
  19. }
  20. /**
  21. * 绑定关系
  22. */
  23. @Bean
  24. public Binding bindingDelayedQueue(Queue delayedQueue, CustomExchange delayedExchange) {
  25. return BindingBuilder.bind(delayedQueue).to(delayedExchange).with("delayed.routingKey").noargs();
  26. }
  27. }

消费者

  1. @Component
  2. public class DelayedExchangeReceiver {
  3. @RabbitListener(queues = "delayed.queue")
  4. public void receiveDelayedQueue(Integer message) {
  5. System.out.println("当前时间:" + LocalDateTime.now() + "\t收到延时队列的消息:" + message);
  6. }
  7. }

生产者

  1. @RestController
  2. public class IndexController {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. /**
  6. * 测试用的标记序号
  7. */
  8. private static int i = 1;
  9. /**
  10. * 延时队列生产者
  11. */
  12. @GetMapping("delayedQueue/{delayTime}")
  13. public Integer delayedQueue(@PathVariable Integer delayTime) {
  14. rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingKey", i,
  15. // 设置消息延时时间
  16. correlationData -> {
  17. correlationData.getMessageProperties().setDelay(delayTime);
  18. return correlationData;
  19. });
  20. System.out.println("当前时间:" + LocalDateTime.now() + "\t发送延时队列的消息:" + i + "\t延时" + delayTime + "毫秒");
  21. return i++;
  22. }
  23. }

结果

访问http://127.0.0.1:8080/delayedQueue/5000,输出如下

  1. 当前时间:2021-10-26T23:08:20.173754 发送延时队列的消息:1 延时5000毫秒
  2. 当前时间:2021-10-26T23:08:25.176630500 收到延时队列的消息:1

备份交换机

备份交换机可以理解为RabbitMQ中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为Fanout,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。

交换机和队列

  1. @Component
  2. public class BackupExchangeConfig {
  3. /**
  4. * 声明确认交换机
  5. */
  6. @Bean
  7. public DirectExchange confirmExchange() {
  8. ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange("confirm.exchange").durable(true)
  9. // 设置该交换机的备份交换机
  10. .withArgument("alternate-exchange", "backup.exchange");
  11. return exchangeBuilder.build();
  12. }
  13. /**
  14. * 声明备份交换机
  15. */
  16. @Bean
  17. public FanoutExchange backupExchange() {
  18. return new FanoutExchange("backup.exchange");
  19. }
  20. /**
  21. * 声明队列
  22. */
  23. @Bean
  24. public Queue confirmQueue() {
  25. return QueueBuilder.durable("confirm.queue").build();
  26. }
  27. @Bean
  28. public Queue backQueue() {
  29. return QueueBuilder.durable("backup.queue").build();
  30. }
  31. @Bean
  32. public Queue warningQueue() {
  33. return QueueBuilder.durable("warning.queue").build();
  34. }
  35. /**
  36. * 声明绑定关系
  37. */
  38. @Bean
  39. public Binding confirmBinding(Queue confirmQueue, DirectExchange confirmExchange) {
  40. return BindingBuilder.bind(confirmQueue).to(confirmExchange).with("key1");
  41. }
  42. @Bean
  43. public Binding warningBinding(Queue warningQueue, FanoutExchange backupExchange) {
  44. return BindingBuilder.bind(warningQueue).to(backupExchange);
  45. }
  46. @Bean
  47. public Binding backupBinding(Queue backQueue, FanoutExchange backupExchange) {
  48. return BindingBuilder.bind(backQueue).to(backupExchange);
  49. }
  50. }

消费者

  1. @Component
  2. public class BackupExchangeReceiver {
  3. @RabbitListener(queues = "confirm.queue")
  4. public void receiveConfirmMsg(Integer message) {
  5. System.out.println("收到一般消息" + message);
  6. }
  7. @RabbitListener(queues = "warning.queue")
  8. public void receiveWarningMsg(Integer message) {
  9. System.out.println("报警发现不可路由消息:" + message);
  10. }
  11. }

生产者

  1. @RestController
  2. public class IndexController {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. /**
  6. * 测试用的标记序号
  7. */
  8. private static int i = 1;
  9. /**
  10. * 备份交换机生产者
  11. */
  12. @GetMapping("backupExchange")
  13. public Integer backupExchange() {
  14. // 让消息绑定一个 id 值
  15. CorrelationData correlationData1 = new CorrelationData(UUID.randomUUID().toString());
  16. rabbitTemplate.convertAndSend("confirm.exchange", "key1", i, correlationData1);
  17. System.out.println("发送消息 id 为:" + correlationData1.getId() + "\t内容为:" + i);
  18. CorrelationData correlationData2 = new CorrelationData(UUID.randomUUID().toString());
  19. rabbitTemplate.convertAndSend("confirm.exchange", "key2", i, correlationData2);
  20. System.out.println("发送消息 id 为:" + correlationData2.getId() + "\t内容为:" + i);
  21. return i++;
  22. }
  23. }

结果

访问http://127.0.0.1:8080/delayedExchange,结果如下:

  1. 发送消息 id 为:9d993eb3-dcf3-424f-9d1b-bc4a2518bb8c 内容为:2
  2. 发送消息 id 为:0e3a2ad3-4a54-4c3e-a866-cb96bc59fd61 内容为:2
  3. 收到一般消息2
  4. 报警发现不可路由消息:2

263

全部评论