五、Spring 整合 RabbitMQ
需求:使用 Spring 整合 RabbitMQ
5.1、搭建生产者工程
1、创建模块,引入依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| <dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.2.12.RELEASE</version> </dependency>
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.1.8.RELEASE</version> </dependency>
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency>
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>5.2.12.RELEASE</version> <scope>test</scope> </dependency> </dependencies>
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> <configuration> <source>1.8</source> <source>1.8</source> </configuration> </plugin> </plugins> </build>
|
2、编写配置文件
在 resource 目录下创建配置文件 rabbitmq.properties
,添加以下配置
1 2 3 4 5
| rabbitmq.host=192.168.56.101 rabbitmq.port=5672 rabbitmq.username=admin rabbitmq.password=admin rabbitmq.virtual-host=/wuhu
|
创建 Spring
核心配置文件 spring-rabbitmq-producer.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <context:property-placeholder location="classpath:properties/rabbitmq.properties"/>
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/> <rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>
<rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>
<rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>
<rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true"> <rabbit:bindings> <rabbit:binding queue="spring_fanout_queue_1"/> <rabbit:binding queue="spring_fanout_queue_2"/> </rabbit:bindings> </rabbit:fanout-exchange>
<rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/> <rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/> <rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>
<rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true"> <rabbit:bindings> <rabbit:binding pattern="heima.*" queue="spring_topic_queue_star"/> <rabbit:binding pattern="heima.#" queue="spring_topic_queue_well"/> <rabbit:binding pattern="itcast.#" queue="spring_topic_queue_well2"/> </rabbit:bindings> </rabbit:topic-exchange>
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> </beans>
|
3、创建测试类测试
- 使用
RabbitTemplate
发送消息,简单模式的 routing key 必须和队列名称相同
1 2 3 4 5 6 7 8 9 10 11 12 13
| @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml") public class ProducerTest { @Autowired private RabbitTemplate rabbitTemplate;
@Test public void testHelloWorld() { rabbitTemplate.convertAndSend("spring_queue","hello,world Spring...!"); } }
|
启动 testHelloWorld
方法

- 使用
RabbitTemplate
发送消息,发送消息类型为 Fanout
广播类型
1 2 3 4
| @Test public void testFanout() { rabbitTemplate.convertAndSend("spring_fanout_exchange","","Spring fanout...!"); }
|
查看 RabbitMQ
控制台

- 使用
RabbitTemplate
发送消息,发送消息类型为 Topic
广播类型
1 2 3 4 5
| @Test public void testTopic() { rabbitTemplate.convertAndSend("spring_topic_exchange","heima.hehe.haha","Spring Topic...!"); }
|
运行方法后,可以看到只有well队列收到消息,这是因为其他两个队列的通配符不匹配

三个队列的匹配规则如下

5.2、搭建消费者工程
1、创建模块,引入依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| <dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.2.12.RELEASE</version> </dependency>
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.1.8.RELEASE</version> </dependency>
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency>
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>5.2.12.RELEASE</version> <scope>test</scope> </dependency> </dependencies>
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> <configuration> <source>1.8</source> <source>1.8</source> </configuration> </plugin> </plugins> </build>
|
2、编写配置文件
在 resource 目录下创建配置文件 rabbitmq.properties
,添加以下配置
1 2 3 4 5
| rabbitmq.host=192.168.56.101 rabbitmq.port=5672 rabbitmq.username=admin rabbitmq.password=admin rabbitmq.virtual-host=/wuhu
|
创建 Spring
核心配置文件 spring-rabbitmq-consumer.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <context:property-placeholder location="classpath:rabbitmq.properties"/>
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/>
<bean id="springQueueListener" class="com.hzx.rabbitmq.listener.SpringQueueListener"/> <bean id="fanoutListener1" class="com.hzx.rabbitmq.listener.FanoutListener1"/> <bean id="fanoutListener2" class="com.hzx.rabbitmq.listener.FanoutListener2"/> <bean id="topicListenerStar" class="com.hzx.rabbitmq.listener.TopicListenerStar"/> <bean id="topicListenerWell" class="com.hzx.rabbitmq.listener.TopicListenerWell"/> <bean id="topicListenerWell2" class="com.hzx.rabbitmq.listener.TopicListenerWell2"/>
<rabbit:listener-container connection-factory="connectionFactory" auto-declare="true"> <rabbit:listener ref="springQueueListener" queue-names="spring_queue"/> <rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/> <rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/> <rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/> <rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/> <rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/> </rabbit:listener-container> </beans>
|
3、创建消息监听器
创建的监听器必须和上面 bean 中配置的class路径一致,创建的监听器必须实现一个 MessageListener
接口,实现 onMessage
方法
创建第一个监听器
1 2 3 4 5 6
| public class SpringQueueListener implements MessageListener { @Override public void onMessage(Message message) { System.out.println("收到的消息为:" + new String(message.getBody()); } }
|
由于广播队列二的监听器代码差不多,所以不再赘述
1 2 3 4 5 6
| public class FanoutListener1 implements MessageListener { @Override public void onMessage(Message message) { System.out.println("广播队列一收到的消息为:" + new String(message.getBody())); } }
|
1 2 3 4 5 6
| public class TopicListenerStar implements MessageListener { @Override public void onMessage(Message message) { System.out.println("topic队列一收到的消息为:" + new String(message.getBody())); } }
|
4、使用测试类进行测试
编写一个测试类,只需要加载Spring配置文件和监听器,消费者即可收到信息
1 2 3 4 5 6 7 8 9 10
| @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml") public class ConsumerTest { @Test public void testListener() { while(true) { } } }
|
查看结果

5、小结
消费者只需要在配置文件中使用bean标签配置监听器对象并创建监听器类即可。
监听器类必须实现 MessageListener
接口,重写 onMessage
方法
六、Spring Boot整合RabbitMQ
6.1、生产者
1、整合步骤
2、引入依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.1.RELEASE</version> </parent>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> </dependencies>
|
3、编写主启动类和配置类
下面给出配置类的代码
使用 ExchangeBuilder
.topicExchange(交换机名)来创建一个通配符交换机,使用.durable(true)方法来持久化该交换机,最终使用build()方法创建该交换机。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Configuration public class RabbitMQConfig { public static final String EXCHANGE_NAME = "boot_topic_exchange"; public static final String QUEUE_NAME = "boot_topic_queue";
@Bean("bootExchange") public Exchange bootExchange() { return ExchangeBuilder.topicExchange(EXCHANGE_NAME) .durable(true) .build(); } }
|
使用QueueBuilder
的durable方法持久化队列,使用build方法创建该队列
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Configuration public class RabbitMQConfig { public static final String EXCHANGE_NAME = "boot_topic_exchange"; public static final String QUEUE_NAME = "boot_topic_queue";
@Bean("bootQueue") public Queue bootQueue() { return QueueBuilder.durable(QUEUE_NAME).build(); } }
|
- 在配置类中绑定队列和交换机,并指定routing key
其中 bind 方法中填写要绑定的队列,to方法添加要绑定的交换机,with方法指定 routing key,noargs表示没有其他参数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Configuration public class RabbitMQConfig { public static final String EXCHANGE_NAME = "boot_topic_exchange"; public static final String QUEUE_NAME = "boot_topic_queue";
@Bean public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange) { return BindingBuilder.bind(queue) .to(exchange) .with("boot.#") .noargs(); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| package com.hzx.config;
import com.rabbitmq.client.AMQP; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class RabbitMQConfig { public static final String EXCHANGE_NAME = "boot_topic_exchange"; public static final String QUEUE_NAME = "boot_topic_queue";
@Bean("bootExchange") public Exchange bootExchange() { return ExchangeBuilder.topicExchange(EXCHANGE_NAME) .durable(true) .build(); }
@Bean("bootQueue") public Queue bootQueue() { return QueueBuilder.durable(QUEUE_NAME).build(); }
@Bean public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange) { return BindingBuilder.bind(queue) .to(exchange) .with("boot.#") .noargs(); } }
|
4、编写测试类代码,发送消息
1 2 3 4 5 6 7 8 9 10 11 12
| @SpringBootTest @RunWith(SpringRunner.class) public class ProducerTest { @Autowired private RabbitTemplate rabbitTemplate;
public void testSend() { rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.wuhu","Spring Boot With MQ 芜湖"); } }
|
- 运行上述的
testSend
方法,然后查看 MQ 控制台

- 在 Queues 中点击
getMessages
可以查看到我们发送的消息

6.2、消费者
1、整合步骤
- 创建工程
- 引入依赖
- 编写 配置文件,配置基本信息
- 定义监听类,使用
@RabbitListener
注解完成队列监听
2、创建工程,引入依赖
1 2 3 4 5 6 7 8 9 10 11 12
| <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.1.RELEASE</version> </parent>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
|
3、创建监听器类
使用 @RabbitListener
注解完成队列监听
1 2 3 4 5 6 7 8 9 10 11 12
| @Component public class RabbitMQListener {
@RabbitListener(queues = "boot_topic_queue") public void ListenQueue(Message message) { System.out.println("接收到的消息为:" + message); } }
|
4、启动主启动类,查看结果
消费者端接收到了消息

6.3、Spring Boot整合MQ–Fanout模式
创建一个生产者工程
1、通过 Spring Initializr
快速构建一个Spring Boot项目


2、编写 yml
配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| spring: rabbitmq: host: 192.168.56.101 port: 5672 username: admin password: admin virtual-host: / server: port: 8080
|
3、编写 OrderService
服务类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate;
public void makeOrder(String userId,String productId,int nums) { String orderId = UUID.randomUUID().toString(); System.out.println("订单生成成功:" + orderId); String exchangeName = "fanout_order_exchange"; String routingKey = ""; rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId); } }
|
4、编写配置类,创建队列与交换机
1 2 3 4 5 6 7 8 9
|
@Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanout_order_exchange",true,false); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
@Bean public Queue smsQueue() { return new Queue("sms.fanout.queue",true); }
@Bean public Queue emailQueue() { return new Queue("email.fanout.queue",true); }
@Bean public Queue duanxinQueue() { return new Queue("duanxin.fanout.queue",true); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
@Bean public Binding smsBinding() { return BindingBuilder.bind(smsQueue()).to(fanoutExchange()); }
@Bean public Binding emailBinding() { return BindingBuilder.bind(emailQueue()).to(fanoutExchange()); }
@Bean public Binding duanxinBinding() { return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange()); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| @Configuration public class RabbitMQConfiguration {
@Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanout_order_exchange",true,false); }
@Bean public Queue smsQueue() { return new Queue("sms.fanout.queue",true); }
@Bean public Queue emailQueue() { return new Queue("email.fanout.queue",true); }
@Bean public Queue duanxinQueue() { return new Queue("duanxin.fanout.queue",true); }
@Bean public Binding smsBinding() { return BindingBuilder.bind(smsQueue()).to(fanoutExchange()); }
@Bean public Binding emailBinding() { return BindingBuilder.bind(emailQueue()).to(fanoutExchange()); }
@Bean public Binding duanxinBinding() { return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange()); } }
|
5、在测试类中发送消息
1 2 3 4 5 6 7
| @Autowired private OrderService orderService;
@Test void contextLoads() { orderService.makeOrder("1","1",12); }
|

可以看到创建的三个队列都有消息等待消费

6、构建项目
使用 Spring Initializr
创建项目,引入依赖
7、编写 yml
配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| spring: rabbitmq: host: 192.168.56.101 port: 5672 username: admin password: admin virtual-host: / server: port: 8082
|
8、创建Consumer类,绑定队列接收消息
- 创建
FanoutDuanxinConsumer
类,绑定 duanxin.fanout.queue
队列
使用 @RabbitListener
的queues属性绑定队列
1 2 3 4 5 6 7 8
| @Service @RabbitListener(queues = "duanxin.fanout.queue") public class FanoutDuanxinConsumer { @RabbitHandler public void receiveMessage(String message) { System.out.println("duanxin fanout ---接收到的广播信息为:" + message); } }
|
在需要接收消息的方法上使用 @RabbitHandler
注解,然后使用 message 参数接收消息
9、启动消费者主启动类,查看结果
可以看到收到了消息

6.4、Spring Boot整合MQ–Direct模式
1、创建 DirectRabbitMQConfiguration
配置类
创建交换机和队列,完成绑定
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| @Configuration public class DirectRabbitMQConfiguration {
@Bean public DirectExchange directExchange() { return new DirectExchange("direct_order_exchange",true,false); }
@Bean public Queue smsDirectQueue() { return new Queue("sms.direct.queue",true); }
@Bean public Queue emailDirectQueue() { return new Queue("email.direct.queue",true); }
@Bean public Queue duanxinDirectQueue() { return new Queue("duanxin.direct.queue",true); }
@Bean public Binding smsBinding() { return BindingBuilder.bind(smsDirectQueue()).to(directExchange()).with("sms"); }
@Bean public Binding emailBinding() { return BindingBuilder.bind(emailDirectQueue()).to(directExchange()).with("email"); }
@Bean public Binding duanxinBinding() { return BindingBuilder.bind(duanxinDirectQueue()).to(directExchange()).with("duanxin"); } }
|
2、发送消息
给短信和email发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13
| public void makeOrderDirect(String userId,String productId,int nums) { String orderId = UUID.randomUUID().toString(); System.out.println("订单生成成功:" + orderId); String exchangeName = "direct_order_exchange"; String routingKey = "sms"; rabbitTemplate.convertAndSend(exchangeName,"email",orderId); rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId); }
|
3、查看结果
控制台

rabbitMQ
控制台

4、创建Consumer类
以 email.direct.queue
队列为例
1 2 3 4 5 6 7 8
| @Service @RabbitListener(queues = "email.direct.queue") public class DirectEmailConsumer { @RabbitHandler public void receiveMessage(String message) { System.out.println("email direct ---接收到的信息为:" + message); } }
|
5、启动消费者主启动类,查看结果
可以看到只有 email
和 sms
收到了消息

6.5、Spring Boot整合TTL
1、队列过期时间的配置
这个配置类用于创建 TTL 用到的交换机、队列
使用Map来传递参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Configuration public class TTLRabbitMQConfiguration { @Bean public DirectExchange directExchange() { return new DirectExchange("ttl_direct_exchange",true,false); }
@Bean public Queue directTTLQueue() { Map<String, Object> args = new HashMap<>(); args.put("x-message-ttl",5000); return new Queue("ttl.direct.queue",true,false,false,args); } @Bean public Binding directTTLBinding() { return BindingBuilder.bind(directTTLQueue()).to(directExchange()).with("ttl"); } }
|
在生产者端编写方法发送消息
1 2 3 4 5 6 7 8 9 10 11 12
| public void makeOrderDirectTTL(String userId,String productId,int nums) { String orderId = UUID.randomUUID().toString(); System.out.println("订单生成成功:" + orderId); String exchangeName = "ttl_direct_exchange"; String routingKey = "ttl"; rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId); }
|
可以看到在ttl.direct.queue
队列中出现了一条消息,5s后消失

2、消息过期时间的配置
1 2 3 4 5 6 7 8 9
| @Bean public Queue directTTLMessageQueue() { return new Queue("ttlmsg.direct.queue",true,false,false); }
@Bean public Binding directTTLMessageBinding() { return BindingBuilder.bind(directTTLMessageQueue()).to(directExchange()).with("ttlmsg"); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public void makeOrderDirectTTLMsg(String userId,String productId,int nums) { String orderId = UUID.randomUUID().toString(); System.out.println("订单生成成功:" + orderId); String exchangeName = "ttl_direct_exchange"; String routingKey = "ttl"; MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("5000"); message.getMessageProperties().setContentEncoding("UTF-8"); return message; } }; rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId,messagePostProcessor); }
|
6.6、Spring Boot整合死信队列
1、新增一个配置类,用于创建死信队列和死信交换机
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
@Bean public DirectExchange deadDirectExchange() { return new DirectExchange("dead_direct_exchange",true,false); }
@Bean public Queue deadQueue() { return new Queue("dead.direct.queue",true); }
|
2、绑定死信队列和死信交换机
1 2 3 4
| @Bean public Binding deadBinding() { return BindingBuilder.bind(deadQueue()).to(deadDirectExchange()).with("dead"); }
|
3、让正常的TTL队列与死信交换机绑定关系
使用Map传入参数,绑定死信交换机
1 2 3 4 5 6 7 8 9
| @Bean public Queue directTTLQueue() { Map<String, Object> args = new HashMap<>(); args.put("x-message-ttl",5000); args.put("x-dead-letter-exchange","dead_direct_exchange"); args.put("x-dead-letter-routing-key","dead"); return new Queue("ttl.direct.queue",true,false,false,args); }
|
6.7、小结
- Spring Boot提供了快速整合 Rabbit MQ 的方式
- 基本信息在
yml
配置,队列交互机以及绑定关系在配置类中使用Bean的方式配置 - 生产端直接注入
RabbitTemplate
完成消息发送 - 消费端直接使用
@RabbitListener
注解完成消息接收。
七、RabbitMQ 高级特性
7.1、消息的可靠投递
在使用 RabbitMQ 的时候,作为消息发送方式希望杜绝任何消息丢失或者投递失败的场景。 RabbitMQ 为我们提供了两种方式来控制消息的投递可靠性模式。
RabbitMQ 整个消息投递的路径为:
producer –> rabbitmq broker
–> exchange –> queue –> consumer
- confirm模式:消息从 producer 到 exchange 则会返回一个
confirmCallback
(不管投递失败与否都会返回,但返回的值有所不同) - return模式:消息从 exchange–>queue 投递失败则会返回一个
returnCallback
(只有投递失败才会返回)
我们可以利用上面的两个 callBack
控制消息的可靠性投递
1、confirm代码演示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <context:property-placeholder location="classpath:rabbitmq.properties"/>
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}" publisher-confirms="true"/>
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" />
<rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue> <rabbit:direct-exchange name="test_exchange_confirm" > <rabbit:bindings> <rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange>
</beans>
|
确认模式开启步骤:
- 在connection-factory(配置文件)中开启 publisher-confirms=”true”
- 在
rabbitTemplate
中定义ConfirmCallBack
回调函数(函数式编程) - 发送消息,可以看到回调函数被执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml") public class ProducerTest { @Autowired private RabbitTemplate rabbitTemplate;
@Test public void testConfirm() { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println("confirm方法被执行了..."); } });
rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message confirm..."); } }
|

2、confirm方法的参数说明
1 2 3 4
| @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("confirm方法被执行了..."); }
|
参数说明
相关配置信息
关键参数,表示 exchange
交换机是否成功收到了消息
true表示成功,false表示失败
失败的原因
1 2 3 4 5 6 7 8 9
| @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("confirm方法被执行了..."); if(ack) { System.out.println("信息成功投递到exchange中!" + cause); } else { System.out.println("信息无法正确投递到exchange中!失败原因:" + cause); } }
|

如果投递失败,以后可能会进行一些处理,保证消息再次投递
3、return代码演示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
@Test public void testReturn() { rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { System.out.println("return 方法执行了..."); } });
rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message confirm..."); }
|
4、return处理方式说明
- 需要在connection-factory中开启回退模式
1 2 3 4 5 6 7 8
| <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}" publisher-confirms="true" publisher-returns="true"/>
|
- 设置交换机处理失败消息的模式,如果没有设置,那么默认丢弃信息,如果我们希望信息处理失败返回提示,那么我们需要设置交换机处理失败消息的方式
1 2
| rabbitTemplate.setMandatory(true);
|
- 将上述代码中的 routing Key 故意写错,重新启动程序查看结果

5、return方法参数说明
1 2 3 4 5 6 7 8 9
| @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("return 方法执行了..."); System.out.println("消息为:" + new String(message.getBody())); System.out.println("错误码为:" + replyCode); System.out.println("错误文本为:" + replyText); System.out.println("交换机为:" + exchange); System.out.println("路由键为:" + routingKey); }
|
- message:消息对象
replyCode
:错误码replyText
:错误信息- exchange:交换机
routingKey
:路由键
再次执行上面产生错误的代码

6、消息可靠投递小结
- 设置
ConnectionFactory
的publisher-confirms=”true” 开启 确认模式。 - 使用
rabbitTemplate.setConfirmCallback
设置回调函数。
当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。
- 设置
ConnectionFactory
的publisher-returns=”true” 开启 回退 模式。 - 使用
rabbitTemplate.setReturnCallback
设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)
参数,则会将消息退回给producer。并执行回调函数returnedMessage
。 - 在RabbitMQ中也提供了事务机制,但是性能较差,此处不做讲解。
使用channel下列方法,完成事务控制:
- txSelect(), 用于将当前channel设置成transaction模式
txCommit()
,用于提交事务txRollback()
,用于回滚事务
7.2、Consumer Ack
1、ack说明
ack
指的是 Acknowledge
,确认。表示消费端收到消息后的确认方式。
有三种确认方式:
- 自动确认:acknowledge = “none”
收到消息后直接进行确认,并不管消息的处理结果如何,如果消费者在处理这条消息进行业务处理的过程中出现异常,那么就相当于这个消息没有发挥作用(相当于消息丢失了)
- 手动确认:acknowledge = “manual”
消费者收到消息后并不直接确认,这个时候可以等待业务处理,如果业务处理成功,那么直接告诉 Broker
接收成功。
如果业务处理失败,可以进行一些其他的处理(比如让Broker再发一次消息)。
- 根据异常情况确认:acknowledge = “auto”,这种方式使用过于麻烦,我们不做讲解
其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck()
,手动签收,如果出现异常,则调用channel.basicNack()
方法,让其自动重新发送消息。
2、代码演示
在消费者模块中引入编写配置文件
spring-rabbitmq-consumer.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <context:property-placeholder location="classpath:rabbitmq.properties"/>
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/>
<context:component-scan base-package="com.hzx.listener" /> <rabbit:listener-container connection-factory="connectionFactory" > <rabbit:listener ref="ackListener" queue-names="test_queue_confirm" /> </rabbit:listener-container> </beans>
|
1 2 3 4 5 6 7 8
| @Component public class AckListener implements MessageListener { @Override public void onMessage(Message message) {
System.out.println("接收到的消息为:" + new String(message.getBody())); } }
|
1 2 3 4 5 6 7 8 9 10
| @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml") public class ConsumerTest { @Test public void testAck() { while(true) {
} } }
|

上面情况演示的就是自动签收
3、设置手动签收(消费者端)
- 在
rabbitMQ
的 监听器容器中配置手动签收
1 2 3
| <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"> <rabbit:listener ref="ackListener" queue-names="test_queue_confirm" /> </rabbit:listener-container>
|
- 让监听器不再实现
MessageListener
接口,而是实现 ChannelAwareMessageListener
接口(这个接口是前者的子接口),并重写 onMessage
方法,和上一个接口的 onMessage
方法不同的是,这个 onMessage
方法有一个 Channel
对象,可以用于消息的手动签收
1 2 3 4
| @Override public void onMessage(Message message, Channel channel) throws Exception { }
|
- 如果消息成功处理,调用 Channel 对象的
basicAck()
进行签收;如果消息处理失败,则调用 Channel 对象的 basicNack()
进行拒收,此时可以让broker重新发送消息给 consumer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println("收到的消息为:" + new String(message.getBody())); System.out.println("处理业务逻辑..."); channel.basicAck(deliveryTag,true); } catch (IOException e) { channel.basicNack(deliveryTag,true,true); } }
|
4、测试手动签收(成功)
修改生产者端的代码,使之能正确发送消息
1 2
| rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","测试手动签收...");
|
先启动消费者监听消息,然后启动生产者发送消息,可以看到消费端接收到了消息

5、测试手动签收(失败)
修改监听器代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Override public void onMessage(Message message, Channel channel) throws Exception { Thread.sleep(1000); long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println("收到的消息为:" + new String(message.getBody())); System.out.println("处理业务逻辑..."); int i = 3/0; channel.basicAck(deliveryTag,true); System.out.println("信息签收成功!"); } catch (Exception e) { System.out.println("信息签收失败,正在重新发送!"); channel.basicNack(deliveryTag,true,true); } }
|
重新启动消费者,然后启动生产者发送消息
执行结果如下

6、Consumer Ack 小结
- 在
rabbit:listener-container
标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认 - 如果在消费端没有出现异常,则调用
channel.basicAck(deliveryTag,false);
方法确认签收消息 - 如果出现异常,则在catch中调用
basicNack
或 basicReject
,拒绝消息,让MQ重新发送消息。
7.3、消息可靠性总结
- 持久化
- exchange要持久化
- queue要持久化
- message要持久化
- 生产方确认Confirm
- 消费方确认Ack
- Broker高可用
7.4、消费端限流
1、消费端限流步骤
acknowledge=”manual”
1 2 3
| <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1"> <rabbit:listener ref="qosListener" queue-names="test_queue_confirm" /> </rabbit:listener-container>
|
- 在
listener-container
配置属性 prefetch = num
表示消费端一次最多从MQ中拉取num
条消息进行消费,直到手动确认这些消息消费完毕后,才会继续拉取
2、在消费端编写 QosListener
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
|
@Component public class QosListener implements ChannelAwareMessageListener {
@Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); System.out.println("接收到的消息为:" + new String(message.getBody())); System.out.println("处理业务逻辑..."); channel.basicAck(deliveryTag,true); System.out.println("--------------------------------------------"); } }
|
3、编写生产者端代码,发送多条消息
1 2 3 4 5 6 7
| @Test public void testSend() { for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","测试手动签收..."); } }
|
同时注释掉监听器中用于签收消息的代码,这样在第一条消息没有被签收的情况下,消费者不会从MQ中再次拉取消息消费,所以控制台中只有一条消息被打印
4、测试
先启动消费者,然后启动生产者
可以看到,消费者只拉取了一条消息进行消费,没有处理生产者中发送的其余9条消息

如果添加签收,那么消费者会拉取一条,消费一条,签收一条,然后再进行拉取

5、消费端限流小结
- 在 <
rabbit:listener-container
> 中配置 prefetch属性设置消费端一次拉取多少消息 - 消费端的确认模式一定为手动确认。acknowledge=”manual”
7.5、TTL
1、概念说明
- TTL 全称 Time To Live(存活时间/过期时间)。
- 当消息到达存活时间后,还没有被消费,会被自动清除。
RabbitMQ
可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

用户在订单系统下订单后,订单系统会发送一个消息给支付系统,如果下订单的三十分钟内支付系统还没有取走消息进行消费(用户没有付款)的话,那么这条消息会从消息队列中被清除。
2、代码实现
在配置文件中配置一个新的队列,设置队列内消息过期时间(TTL)为10s
1 2 3 4 5 6 7 8 9 10 11 12 13
| <rabbit:queue name="test-queue-ttl" id="test-queue-ttl"> <rabbit:queue-arguments> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" /> </rabbit:queue-arguments> </rabbit:queue>
<rabbit:topic-exchange name="test_exchange_ttl" > <rabbit:bindings> <rabbit:binding pattern="ttl.#" queue="test-queue-ttl"/> </rabbit:bindings> </rabbit:topic-exchange>
|
编写代码,发送消息
1 2 3 4 5 6 7
| @Test public void testTtl() { for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.hehe","message ttl..."); } }
|
此时查看控制台,可以看到 test_queue_ttl
队列中有十条消息

十秒后再查看,发现队列中的消息全部消失,这是由于消息的存活时间到期

3、设置消息单独过期
需要创建一个 MessagePostProcessor
对象来指定,可以使用匿名内部类的方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Test public void testTtl() { for (int i = 0; i < 10; i++) {
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("5000"); return message; } }; rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.hehe","message ttl...",messagePostProcessor); } }
|
此时重新发送消息,可以看到消息在五秒钟后过期
4、过期时间说明
- 如果既设置了队列的过期时间,也设置了消息的过期时间,那么以时间段的为准
- 队列过期后,会将队列中所有消息全部移除
- 消息过期后,只有消息在队列顶端,才会判断是否过期(移除掉)
5、TTL小结
- 设置队列过期时间使用参数:
x-message-ttl
,单位:ms
(毫秒),会对整个队列消息统一过期。 - 设置消息过期时间使用参数:expiration。单位:
ms
(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。 - 如果两者都进行了设置,以时间短的为准。
7.6、死信队列
1、概念说明
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

生产者将一个带有过期时间的消息投送到交换机,由交换机发送到与其绑定的队列中,当消息的过期时间到了之后,这个消息就成为了 死信 ,这个时候,存放死信的队列可以将死信发送到死信交换机,死信交换机可以将该死信发送给与其绑定的队列,这样可以让死信为其他消费者所消费。
我们需要思考两个问题:
2、消息成为死信的三种情况
- 队列消息长度到达限制(队列最多能存储10个消息,那么第十一个消息就会成为死信);
- 消费者拒接消费消息,
basicNack/basicReject
,并且不把消息重新放入原目标队列,requeue=false; - 原队列存在消息过期设置,消息到达超时时间未被消费;
3、队列绑定死信交换机
给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key

4、死信队列演示步骤
- 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
1 2 3 4 5 6 7 8 9
|
<rabbit:queue name="test_queue_dlx" id="test_queue_dlx"/> <rabbit:topic-exchange name="test_exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"/> </rabbit:bindings> </rabbit:topic-exchange>
|
- 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
1 2 3 4 5 6 7 8 9
|
<rabbit:queue name="queue_dlx" id="queue_dlx"/> <rabbit:topic-exchange name="exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="dlx.#" queue="queue_dlx"/> </rabbit:bindings> </rabbit:topic-exchange>
|
- x-dead-letter-exchange:死信交换机名称
- x-dead-letter-routing-key:发送给死信交换机的routing key
为上面的正常队列设置参数,绑定死信交换机
1 2 3 4 5 6 7 8 9
| <rabbit:queue name="test_queue_dlx" id="test_queue_dlx"> <rabbit:queue-arguments> <entry key="x-dead-letter-exchange" value="exchange_dlx" />
<entry key="x-dead-letter-routing-key" value="dlx.hehe" /> </rabbit:queue-arguments> </rabbit:queue>
|
方法一:设置队列的过期时间
1 2 3 4 5 6 7 8
| <rabbit:queue-arguments> <entry key="x-dead-letter-exchange" value="exchange_dlx" /> <entry key="x-dead-letter-routing-key" value="dlx.hehe" /> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" /> </rabbit:queue-arguments>
|
方法二:设置队列的最大长度
1 2 3 4 5 6 7 8
| <rabbit:queue-arguments> <entry key="x-dead-letter-exchange" value="exchange_dlx" /> <entry key="x-dead-letter-routing-key" value="dlx.hehe" /> <entry key="x-max-length" value="10" value-type="java.lang.Integer" /> </rabbit:queue-arguments>
|
- 测试过期时间的死信消息
1 2 3 4 5 6 7 8 9 10 11
| MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("5000"); return message; } }; rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.wuhu","这是一条消息,它会成为死信,到达死信队列中...",messagePostProcessor);
|
启动程序后,正常队列中会出现一条信息,在存活5s后,这条消息会过期,然后经由死信交换机跑到死信队列。
查看控制台,发现正常队列中有一条消息

5s后,发现正常队列的消息消失了,而死信队列中出现了一条信息

在死信队列中查看该消息的详细信息

- 测试长度限制后,消息死信
1 2 3 4 5 6 7
| @Test public void testDlx() { for (int i = 0; i < 20; i++) { rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.wuhu","这是一条消息,超过10后的消息会成为死信,当前为第:" + (i + 1) + "条"); } }
|
启动程序后, 有十条消息会进入正常队列,而剩下的十条消息会进入死信队列,然后死信队列中会拥有11条消息;
等到队列过期后,正常队列中的剩下十条消息也全变为死信,此时死信队列有21条消息

队列过期后,死信队列有21条消息

- 拒收消息使消息成为死信
在消费者模块中编写一个 DlxListener
监听器类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Component public class DlxListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { Thread.sleep(1000); long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println("收到的消息为:" + new String(message.getBody())); System.out.println("处理业务逻辑..."); int i = 3/0; channel.basicAck(deliveryTag,true); System.out.println("信息签收成功!"); } catch (Exception e) { System.out.println("出现异常,拒绝接收"); channel.basicNack(deliveryTag,true,false); } } }
|
在配置文件中配置此监听器
1 2 3 4
| <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1"> <rabbit:listener ref="dlxListener" queue-names="test_queue_dlx" /> </rabbit:listener-container>
|
启动消费者,然后使用生产者发送消息
1 2 3 4
| @Test public void testDlx() { rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.wuhu","这是一条消息,它会被拒收,然后进入死信队列..."); }
|
发送消息后,死信队列中的消息数目会直接变为22

idea控制台输出如下

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
|
<rabbit:queue name="test_queue_dlx" id="test_queue_dlx"> <rabbit:queue-arguments> <entry key="x-dead-letter-exchange" value="exchange_dlx" /> <entry key="x-dead-letter-routing-key" value="dlx.hehe" /> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" /> <entry key="x-max-length" value="10" value-type="java.lang.Integer" /> </rabbit:queue-arguments> </rabbit:queue> <rabbit:topic-exchange name="test_exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"/> </rabbit:bindings> </rabbit:topic-exchange>
<rabbit:queue name="queue_dlx" id="queue_dlx"/> <rabbit:topic-exchange name="exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="dlx.#" queue="queue_dlx"/> </rabbit:bindings> </rabbit:topic-exchange>
|
5、小结
- 死信交换机和死信队列和普通的没有区别
- 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
- 消息成为死信的三种情况:
- 队列消息长度到达限制;
- 消费者拒接消费消息,并且不重回队列;
- 原队列存在消息过期设置,消息到达超时时间未被消费;
7.7、延迟队列
1、概念说明
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
下单后,30分钟未支付,取消订单,回滚库存。
新用户注册成功7天后,发送短信问候。
- 定时器(不优雅)
- 延迟队列

很可惜,在RabbitMQ
中并未直接提供延迟队列的功能
不过我们可以使用 TTL
+ 死信队列
组合的方式来实现延迟队列的效果
2、RabbitMQ
实现延迟队列
RabbitMQ
使用 TTL
+ 死信队列
组合的方式来实现延迟队列的效果
订单系统向交换机发送一条存活时长为30min的消息后,消息在正常队列中存活30min,在30min后,消息从正常队列跑到死信交换机,死信交换机又将消息发送给死信队列,此时,绑定死信队列的库存系统就可以在订单系统发送消息的30min后拿到消息。
这样和延迟队列的效果一致。

以下是关于延迟队列的实现
3、使用 TTL + 死信队列 实现延迟队列
- 定义正常的交换机(order_exchange)和队列(order_queue)
1 2 3 4 5 6 7
| <rabbit:queue id="order_queue" name="order_queue" /> <rabbit:topic-exchange name="order_exchange" id="order_exchange" > <rabbit:bindings> <rabbit:binding pattern="order.#" queue="order_queue"/> </rabbit:bindings> </rabbit:topic-exchange>
|
- 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)
1 2 3 4 5 6 7
| <rabbit:queue id="order_queue_dlx" name="order_queue_dlx" /> <rabbit:topic-exchange name="order_exchange_dlx" id="order_exchange_dlx" > <rabbit:bindings> <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"/> </rabbit:bindings> </rabbit:topic-exchange>
|
1 2 3 4 5 6 7 8
| <rabbit:queue id="order_queue" name="order_queue"> <rabbit:queue-arguments> <entry key="x-dead-letter-exchange" value="order_exchange_dlx" /> <entry key="x-dead-letter-routing-key" value="dlx.order.cancel" /> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" /> </rabbit:queue-arguments> </rabbit:queue>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| <rabbit:queue id="order_queue" name="order_queue"> <rabbit:queue-arguments> <entry key="x-dead-letter-exchange" value="order_exchange_dlx" /> <entry key="x-dead-letter-routing-key" value="dlx.order.cancel" /> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" /> </rabbit:queue-arguments> </rabbit:queue> <rabbit:topic-exchange name="order_exchange" id="order_exchange" > <rabbit:bindings> <rabbit:binding pattern="order.#" queue="order_queue"/> </rabbit:bindings> </rabbit:topic-exchange>
<rabbit:queue id="order_queue_dlx" name="order_queue_dlx" /> <rabbit:topic-exchange name="order_exchange_dlx" id="order_exchange_dlx" > <rabbit:bindings> <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"/> </rabbit:bindings> </rabbit:topic-exchange>
|
1 2 3 4 5 6 7 8 9 10
| @Test public void testDelay() throws InterruptedException { rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息:id = 1,time = 16点26分"); for (int i = 10; i > 0; i--) { System.out.println(i + "..."); Thread.sleep(1000); } }
|
- 在消费者模块编写一个OrderListener监听器类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Component public class OrderListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { Thread.sleep(1000); long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println("接收到的消息为:" + new String(message.getBody())); System.out.println("处理业务逻辑..."); System.out.println("根据订单id查询其状态..."); System.out.println("判断状态是否为已支付..."); System.out.println("取消订单,回滚库存..."); channel.basicAck(deliveryTag,true); } catch (Exception e) { channel.basicNack(deliveryTag,true,false); } } }
|
配置 OrderListener
监听器,注意这里必须监听死信队列,如果监听正常队列的话,那么订单系统一发出消息,消费者就接收到了
1 2 3
| <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1"> <rabbit:listener ref="orderListener" queue-names="order_queue_dlx" /> </rabbit:listener-container>
|
可以看到,在生产者发送完消息后,消费者无法直接看到消息,只有在生产者发送消息的十秒后,消费者才能收到消息。

4、小结
- 延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。
- RabbitMQ没有提供延迟队列功能,但是可以使用 : TTL + 死信队列 来实现延迟队列效果。
7.8、日志与监控
1、概述
RabbitMQ默认日志存放路径: /var/log/rabbitmq/rabbit@xxx.log
日志包含了RabbitMQ的版本号、Erlang的版本号、RabbitMQ服务节点名称、cookie的hash值、RabbitMQ配置文件地址、内存限制、磁盘限制、默认账户guest的创建以及权限配置等等。

2、常用命令
rabbitmqctl
list_queues
rabbitmqctl
list_exchanges
rabbitmqctl
list_users
rabbitmqctl
list_connections
rabbitmqctl
list_consumers
rabbitmqctl
environment
rabbitmqctl
list_queues name messages_unacknowledged
rabbitmqctl
list_queues name memory
rabbitmqctl
list_queues name messages_ready
7.9、消息追踪
1、概念说明
在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于RabbitMQ而言,可能是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位。
在RabbitMQ中可以使用Firehose和rabbitmq_tracing
插件功能来实现消息追踪。
2、消息追踪-Firehose
firehose的机制是将生产者投递给rabbitmq
的消息,rabbitmq
投递给消费者的消息按照指定的格式发送到默认的exchange上。这个默认的exchange的名称为amq.rabbitmq.trace
,它是一个topic类型的exchange。发送到这个exchange上的消息的routing key为 publish.exchangename
和 deliver.queuename
。其中exchangename
和queuename
为实际exchange和queue的名称,分别对应生产者投递到exchange的消息,和消费者从queue上获取的消息。
注意:打开 trace 会影响消息写入功能,适当打开后请关闭。
rabbitmqctl trace_on
:开启Firehose命令rabbitmqctl trace_off
:关闭Firehose命令
3、消息追踪-rabbitmq_tracing
rabbitmq_tracing和Firehose在实现上如出一辙,只不过rabbitmq_tracing的方式比Firehose多了一层GUI的包装,更容易使用和管理。
启用插件:rabbitmq-plugins enable rabbitmq_tracing
八、RabbitMQ 应用问题
8.1、消息可靠性保障–消息补偿机制

8.2、消息幂等性保障
1、什么是幂等性
幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。
在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。
2、实现–乐观锁机制
比如说用户发了两条消息,消息的version都是一,且此时数据库的记录的version也为1;
当数据消费完一条消息后,将数据库的记录的version改为2,此时收到第二条消息,由于此时数据库记录的version为2,与消息的version不和,所以不会第二条消息不会起实质性作用。
