五、Spring 整合 RabbitMQ

需求:使用 Spring 整合 RabbitMQ

5.1、搭建生产者工程

1、创建模块,引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.2.12.RELEASE</version>
</dependency>

<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>


<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.2.12.RELEASE</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<source>1.8</source>
</configuration>
</plugin>
</plugins>
</build>

2、编写配置文件

resource 目录下创建配置文件 rabbitmq.properties,添加以下配置

1
2
3
4
5
rabbitmq.host=192.168.56.101
rabbitmq.port=5672
rabbitmq.username=admin
rabbitmq.password=admin
rabbitmq.virtual-host=/wuhu

创建 Spring 核心配置文件 spring-rabbitmq-producer.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:properties/rabbitmq.properties"/>

<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>

<!--定义持久化队列,不存在则自动创建;不绑定到交换机则绑定到默认交换机
默认交换机类型为direct,名字为:"",路由键为队列的名称
-->
<rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>

<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~广播;所有队列都能收到消息~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>

<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>

<!--定义广播类型交换机;并绑定上述两个队列-->
<rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding queue="spring_fanout_queue_1"/>
<rabbit:binding queue="spring_fanout_queue_2"/>
</rabbit:bindings>
</rabbit:fanout-exchange>

<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~通配符;*匹配一个单词,#匹配多个单词 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/>
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/>
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>

<rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding pattern="heima.*" queue="spring_topic_queue_star"/>
<rabbit:binding pattern="heima.#" queue="spring_topic_queue_well"/>
<rabbit:binding pattern="itcast.#" queue="spring_topic_queue_well2"/>
</rabbit:bindings>
</rabbit:topic-exchange>

<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>

3、创建测试类测试

  • 使用RabbitTemplate发送消息,简单模式的 routing key 必须和队列名称相同
1
2
3
4
5
6
7
8
9
10
11
12
13
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
//1 注入RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testHelloWorld() {
//2 发送消息
rabbitTemplate.convertAndSend("spring_queue","hello,world Spring...!");
}
}

启动 testHelloWorld方法

image-20210321162508832

  • 使用 RabbitTemplate 发送消息,发送消息类型为 Fanout 广播类型
1
2
3
4
@Test
public void testFanout() {
rabbitTemplate.convertAndSend("spring_fanout_exchange","","Spring fanout...!");
}

查看 RabbitMQ 控制台

image-20210321163720570

  • 使用 RabbitTemplate 发送消息,发送消息类型为 Topic 广播类型
1
2
3
4
5
@Test
public void testTopic() {
//这里的routing key 设置为 heima.hehe.haha 后,只有heima.#可以匹配到
rabbitTemplate.convertAndSend("spring_topic_exchange","heima.hehe.haha","Spring Topic...!");
}

运行方法后,可以看到只有well队列收到消息,这是因为其他两个队列的通配符不匹配

image-20210321164625313

三个队列的匹配规则如下

image-20210321164657913

5.2、搭建消费者工程

1、创建模块,引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.2.12.RELEASE</version>
</dependency>

<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>


<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.2.12.RELEASE</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<source>1.8</source>
</configuration>
</plugin>
</plugins>
</build>

2、编写配置文件

resource 目录下创建配置文件 rabbitmq.properties,添加以下配置

1
2
3
4
5
rabbitmq.host=192.168.56.101
rabbitmq.port=5672
rabbitmq.username=admin
rabbitmq.password=admin
rabbitmq.virtual-host=/wuhu

创建 Spring 核心配置文件 spring-rabbitmq-consumer.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>

<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>



<bean id="springQueueListener" class="com.hzx.rabbitmq.listener.SpringQueueListener"/>
<bean id="fanoutListener1" class="com.hzx.rabbitmq.listener.FanoutListener1"/>
<bean id="fanoutListener2" class="com.hzx.rabbitmq.listener.FanoutListener2"/>
<bean id="topicListenerStar" class="com.hzx.rabbitmq.listener.TopicListenerStar"/>
<bean id="topicListenerWell" class="com.hzx.rabbitmq.listener.TopicListenerWell"/>
<bean id="topicListenerWell2" class="com.hzx.rabbitmq.listener.TopicListenerWell2"/>

<rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
<rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
<rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/>
<rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/>
<rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/>
<rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/>
<rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/>
</rabbit:listener-container>
</beans>

3、创建消息监听器

创建的监听器必须和上面 bean 中配置的class路径一致,创建的监听器必须实现一个 MessageListener 接口,实现 onMessage 方法

  • 队列监听器

创建第一个监听器

1
2
3
4
5
6
public class SpringQueueListener implements MessageListener {
@Override
public void onMessage(Message message) {
System.out.println("收到的消息为:" + new String(message.getBody());
}
}
  • 广播队列监听器一

由于广播队列二的监听器代码差不多,所以不再赘述

1
2
3
4
5
6
public class FanoutListener1 implements MessageListener {
@Override
public void onMessage(Message message) {
System.out.println("广播队列一收到的消息为:" + new String(message.getBody()));
}
}
  • topic队列监听器一
1
2
3
4
5
6
public class TopicListenerStar implements MessageListener {
@Override
public void onMessage(Message message) {
System.out.println("topic队列一收到的消息为:" + new String(message.getBody()));
}
}

4、使用测试类进行测试

编写一个测试类,只需要加载Spring配置文件和监听器,消费者即可收到信息

1
2
3
4
5
6
7
8
9
10
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {
@Test
public void testListener() {
while(true) {

}
}
}

查看结果

image-20210321171600419

5、小结

消费者只需要在配置文件中使用bean标签配置监听器对象并创建监听器类即可。

监听器类必须实现 MessageListener 接口,重写 onMessage 方法

六、Spring Boot整合RabbitMQ

6.1、生产者

1、整合步骤

  • 创建生产者Spring Boot工程

  • 引入依赖坐标

  • 编写 配置文件 ,进行基本信息配置。

  • 定义交换机,队列及绑定关系的配置类

  • 注入RabbitTemplate ,调用方法完成消息发送

2、引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
</parent>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>

3、编写主启动类和配置类

下面给出配置类的代码

  • 在配置类中创建一个交换机对象

使用 ExchangeBuilder.topicExchange(交换机名)来创建一个通配符交换机,使用.durable(true)方法来持久化该交换机,最终使用build()方法创建该交换机。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "boot_topic_exchange";
public static final String QUEUE_NAME = "boot_topic_queue";
/***
* 1 交换机
* @return
*/
@Bean("bootExchange")
public Exchange bootExchange() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME)
.durable(true)
.build();
}
}
  • 在配置类中创建一个队列对象

使用QueueBuilder的durable方法持久化队列,使用build方法创建该队列

1
2
3
4
5
6
7
8
9
10
11
12
13
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "boot_topic_exchange";
public static final String QUEUE_NAME = "boot_topic_queue";
/***
* 2 Queue队列
* @return
*/
@Bean("bootQueue")
public Queue bootQueue() {
return QueueBuilder.durable(QUEUE_NAME).build();
}
}
  • 在配置类中绑定队列和交换机,并指定routing key

其中 bind 方法中填写要绑定的队列,to方法添加要绑定的交换机,with方法指定 routing key,noargs表示没有其他参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "boot_topic_exchange";
public static final String QUEUE_NAME = "boot_topic_queue";

/***
* 3 队列和交换机绑定关系,binding
* 3.1 需要知道是哪个队列
* 3.2 需要知道是哪个交换机
* 3.3 设置routing key
*/
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange) {
return BindingBuilder.bind(queue)
.to(exchange)
.with("boot.#")
.noargs();
}
}

  • 配置类全部代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.hzx.config;

import com.rabbitmq.client.AMQP;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "boot_topic_exchange";
public static final String QUEUE_NAME = "boot_topic_queue";
/***
* 1 交换机
* @return
*/
@Bean("bootExchange")
public Exchange bootExchange() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME)
.durable(true)
.build();
}

/***
* 2 Queue队列
* @return
*/
@Bean("bootQueue")
public Queue bootQueue() {
return QueueBuilder.durable(QUEUE_NAME).build();
}

/***
* 3 队列和交换机绑定关系,binding
* 3.1 需要知道是哪个队列
* 3.2 需要知道是哪个交换机
* 3.3 设置routing key
*/
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange) {
return BindingBuilder.bind(queue)
.to(exchange)
.with("boot.#")
.noargs();
}
}

4、编写测试类代码,发送消息

1
2
3
4
5
6
7
8
9
10
11
12
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
//1 注入RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;

//2 发送消息
public void testSend() {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.wuhu","Spring Boot With MQ 芜湖");
}
}
  • 运行上述的 testSend 方法,然后查看 MQ 控制台

image-20210321200801250

  • 在 Queues 中点击 getMessages 可以查看到我们发送的消息

image-20210321200945203

6.2、消费者

1、整合步骤

  • 创建工程
  • 引入依赖
  • 编写 配置文件,配置基本信息
  • 定义监听类,使用 @RabbitListener 注解完成队列监听

2、创建工程,引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
</parent>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>

3、创建监听器类

使用 @RabbitListener 注解完成队列监听

1
2
3
4
5
6
7
8
9
10
11
12
@Component
public class RabbitMQListener {
/***
* 使用@RabbitListener注解监听一个队列中的消息
* 然后将消息封装到方法参数 message 中
* @param message
*/
@RabbitListener(queues = "boot_topic_queue")
public void ListenQueue(Message message) {
System.out.println("接收到的消息为:" + message);
}
}

4、启动主启动类,查看结果

消费者端接收到了消息

image-20210321204808938

6.3、Spring Boot整合MQ–Fanout模式

创建一个生产者工程

1、通过 Spring Initializr 快速构建一个Spring Boot项目

  • 添加模块

image-20210322195551672

  • 引入依赖

image-20210322195630695

2、编写 yml 配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 配置RabbitMQ的基本信息,包括ip、端口、用户账号密码
spring:
rabbitmq:
# 配置IP
host: 192.168.56.101
# 端口
port: 5672
# 账号
username: admin
# 密码
password: admin
# 虚拟机
virtual-host: /
server:
port: 8080

3、编写 OrderService 服务类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;

/***
* 模拟用户下单
* @param userId
* @param productId
* @param nums
*/
public void makeOrder(String userId,String productId,int nums) {
//1 根据商品id查询库存是否充足
//2 保存订单
String orderId = UUID.randomUUID().toString();
System.out.println("订单生成成功:" + orderId);
//3 通过MQ完成消息的分发,参数1:交换机、参数2:路由key/queue名称、参数3:消息内容
String exchangeName = "fanout_order_exchange";
//广播模式下路由key为空
String routingKey = "";
//发送消息
rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);
}
}

4、编写配置类,创建队列与交换机

  • 创建 fanout 模式的交换机
1
2
3
4
5
6
7
8
9
/***
* 1 声明注册 fanout 模式的交换机
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
//创建一个名为 fanout_order_exchange 的交换机,持久化,不自动删除
return new FanoutExchange("fanout_order_exchange",true,false);
}
  • 创建三个队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/***
* 2 声明队列
* sms.fanout.queue
* email.fanout.queue
* duanxin.fanout.queue
*/
@Bean
public Queue smsQueue() {
return new Queue("sms.fanout.queue",true);
}

@Bean
public Queue emailQueue() {
return new Queue("email.fanout.queue",true);
}

@Bean
public Queue duanxinQueue() {
return new Queue("duanxin.fanout.queue",true);
}
  • 完成广播交换机与三个队列的绑定
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/***
* 3 完成队列和交换机的绑定
*/
@Bean
public Binding smsBinding() {
return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
}

@Bean
public Binding emailBinding() {
return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
}

@Bean
public Binding duanxinBinding() {
return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange());
}
  • 配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@Configuration
public class RabbitMQConfiguration {
/***
* 1 声明注册 fanout 模式的交换机
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
//创建一个名为 fanout_order_exchange 的交换机,持久化,不自动删除
return new FanoutExchange("fanout_order_exchange",true,false);
}

/***
* 2 声明队列
* sms.fanout.queue
* email.fanout.queue
* duanxin.fanout.queue
*/
@Bean
public Queue smsQueue() {
return new Queue("sms.fanout.queue",true);
}

@Bean
public Queue emailQueue() {
return new Queue("email.fanout.queue",true);
}

@Bean
public Queue duanxinQueue() {
return new Queue("duanxin.fanout.queue",true);
}


/***
* 3 完成队列和交换机的绑定
*/
@Bean
public Binding smsBinding() {
return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
}

@Bean
public Binding emailBinding() {
return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
}

@Bean
public Binding duanxinBinding() {
return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange());
}
}

5、在测试类中发送消息

1
2
3
4
5
6
7
@Autowired
private OrderService orderService;

@Test
void contextLoads() {
orderService.makeOrder("1","1",12);
}
  • 结果

image-20210322203454380

  • 查看控制台

可以看到创建的三个队列都有消息等待消费

image-20210322203540517

6、构建项目

使用 Spring Initializr 创建项目,引入依赖

7、编写 yml 配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 配置RabbitMQ的基本信息,包括ip、端口、用户账号密码
spring:
rabbitmq:
# 配置IP
host: 192.168.56.101
# 端口
port: 5672
# 账号
username: admin
# 密码
password: admin
# 虚拟机
virtual-host: /
server:
port: 8082

8、创建Consumer类,绑定队列接收消息

  • 创建 FanoutDuanxinConsumer 类,绑定 duanxin.fanout.queue 队列

使用 @RabbitListener 的queues属性绑定队列

1
2
3
4
5
6
7
8
@Service
@RabbitListener(queues = "duanxin.fanout.queue")
public class FanoutDuanxinConsumer {
@RabbitHandler
public void receiveMessage(String message) {
System.out.println("duanxin fanout ---接收到的广播信息为:" + message);
}
}

在需要接收消息的方法上使用 @RabbitHandler 注解,然后使用 message 参数接收消息

9、启动消费者主启动类,查看结果

可以看到收到了消息

image-20210322210801958

6.4、Spring Boot整合MQ–Direct模式

1、创建 DirectRabbitMQConfiguration 配置类

创建交换机和队列,完成绑定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@Configuration
public class DirectRabbitMQConfiguration {
/***
* 1 声明注册 direct 模式的交换机
* @return
*/
@Bean
public DirectExchange directExchange() {
//创建一个名为 direct_order_exchange 的交换机,持久化,不自动删除
return new DirectExchange("direct_order_exchange",true,false);
}

/***
* 2 声明队列
* sms.direct.queue
* email.direct.queue
* duanxin.direct.queue
*/
@Bean
public Queue smsDirectQueue() {
return new Queue("sms.direct.queue",true);
}

@Bean
public Queue emailDirectQueue() {
return new Queue("email.direct.queue",true);
}

@Bean
public Queue duanxinDirectQueue() {
return new Queue("duanxin.direct.queue",true);
}


/***
* 3 完成队列和交换机的绑定
*/
@Bean
public Binding smsBinding() {
return BindingBuilder.bind(smsDirectQueue()).to(directExchange()).with("sms");
}

@Bean
public Binding emailBinding() {
return BindingBuilder.bind(emailDirectQueue()).to(directExchange()).with("email");
}

@Bean
public Binding duanxinBinding() {
return BindingBuilder.bind(duanxinDirectQueue()).to(directExchange()).with("duanxin");
}
}

2、发送消息

给短信和email发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
public void makeOrderDirect(String userId,String productId,int nums) {
//1 根据商品id查询库存是否充足
//2 保存订单
String orderId = UUID.randomUUID().toString();
System.out.println("订单生成成功:" + orderId);
//3 通过MQ完成消息的分发,参数1:交换机、参数2:路由key/queue名称、参数3:消息内容
String exchangeName = "direct_order_exchange";
//广播模式下路由key为空
String routingKey = "sms";
//发送消息
rabbitTemplate.convertAndSend(exchangeName,"email",orderId);
rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);
}

3、查看结果

控制台

image-20210322212657794

rabbitMQ控制台

image-20210322212824711

4、创建Consumer类

email.direct.queue 队列为例

1
2
3
4
5
6
7
8
@Service
@RabbitListener(queues = "email.direct.queue")
public class DirectEmailConsumer {
@RabbitHandler
public void receiveMessage(String message) {
System.out.println("email direct ---接收到的信息为:" + message);
}
}

5、启动消费者主启动类,查看结果

可以看到只有 emailsms 收到了消息

image-20210322213248111

6.5、Spring Boot整合TTL

1、队列过期时间的配置

  • 创建一个配置类

这个配置类用于创建 TTL 用到的交换机、队列

使用Map来传递参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Configuration
public class TTLRabbitMQConfiguration {
@Bean
public DirectExchange directExchange() {
return new DirectExchange("ttl_direct_exchange",true,false);
}

@Bean
public Queue directTTLQueue() {
//设置队列过期时间
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl",5000);
return new Queue("ttl.direct.queue",true,false,false,args);
}
@Bean
public Binding directTTLBinding() {
return BindingBuilder.bind(directTTLQueue()).to(directExchange()).with("ttl");
}
}
  • 发送消息

在生产者端编写方法发送消息

1
2
3
4
5
6
7
8
9
10
11
12
public void makeOrderDirectTTL(String userId,String productId,int nums) {
//1 根据商品id查询库存是否充足
//2 保存订单
String orderId = UUID.randomUUID().toString();
System.out.println("订单生成成功:" + orderId);
//3 通过MQ完成消息的分发,参数1:交换机、参数2:路由key/queue名称、参数3:消息内容
String exchangeName = "ttl_direct_exchange";
//广播模式下路由key为空
String routingKey = "ttl";
//发送消息
rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);
}
  • 查看结果

可以看到在ttl.direct.queue队列中出现了一条消息,5s后消失

image-20210322215609134

2、消息过期时间的配置

  • 创建一个普通队列
1
2
3
4
5
6
7
8
9
@Bean
public Queue directTTLMessageQueue() {
return new Queue("ttlmsg.direct.queue",true,false,false);
}

@Bean
public Binding directTTLMessageBinding() {
return BindingBuilder.bind(directTTLMessageQueue()).to(directExchange()).with("ttlmsg");
}
  • 在发送消息时给消息设置过期时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void makeOrderDirectTTLMsg(String userId,String productId,int nums) {
//1 根据商品id查询库存是否充足
//2 保存订单
String orderId = UUID.randomUUID().toString();
System.out.println("订单生成成功:" + orderId);
//3 通过MQ完成消息的分发,参数1:交换机、参数2:路由key/queue名称、参数3:消息内容
String exchangeName = "ttl_direct_exchange";
//广播模式下路由key为空
String routingKey = "ttl";
//发送消息,这里需要给消息设置过期时间
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("5000");
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
}
};
rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId,messagePostProcessor);
}

6.6、Spring Boot整合死信队列

1、新增一个配置类,用于创建死信队列和死信交换机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/***
* 创建一个名为 dead_direct_exchange 的死信交换机
* @return
*/
@Bean
public DirectExchange deadDirectExchange() {
return new DirectExchange("dead_direct_exchange",true,false);
}

/***
* 创建一个名为 dead.direct.queue 的死信队列
* @return
*/
@Bean
public Queue deadQueue() {
return new Queue("dead.direct.queue",true);
}

2、绑定死信队列和死信交换机

1
2
3
4
@Bean
public Binding deadBinding() {
return BindingBuilder.bind(deadQueue()).to(deadDirectExchange()).with("dead");
}

3、让正常的TTL队列与死信交换机绑定关系

使用Map传入参数,绑定死信交换机

1
2
3
4
5
6
7
8
9
@Bean
public Queue directTTLQueue() {
//设置队列过期时间
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl",5000);
args.put("x-dead-letter-exchange","dead_direct_exchange");
args.put("x-dead-letter-routing-key","dead");
return new Queue("ttl.direct.queue",true,false,false,args);
}

6.7、小结

  • Spring Boot提供了快速整合 Rabbit MQ 的方式
  • 基本信息在yml配置,队列交互机以及绑定关系在配置类中使用Bean的方式配置
  • 生产端直接注入 RabbitTemplate 完成消息发送
  • 消费端直接使用 @RabbitListener 注解完成消息接收。

七、RabbitMQ 高级特性

7.1、消息的可靠投递

在使用 RabbitMQ 的时候,作为消息发送方式希望杜绝任何消息丢失或者投递失败的场景。 RabbitMQ 为我们提供了两种方式来控制消息的投递可靠性模式。

  • confirm 确认模式
  • return 退回模式

RabbitMQ 整个消息投递的路径为:

producer –> rabbitmq broker –> exchange –> queue –> consumer

  • confirm模式:消息从 producer 到 exchange 则会返回一个 confirmCallback(不管投递失败与否都会返回,但返回的值有所不同)
  • return模式:消息从 exchange–>queue 投递失败则会返回一个 returnCallback (只有投递失败才会返回)

我们可以利用上面的两个 callBack 控制消息的可靠性投递

1、confirm代码演示

  • 配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>

<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-confirms="true"/>

<rabbit:admin connection-factory="connectionFactory" />

<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" />

<!-- 配置消息的可靠性投递(在生产端) -->
<rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue>
<rabbit:direct-exchange name="test_exchange_confirm" >
<rabbit:bindings>
<rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>

</beans>
  • 创建测试类

确认模式开启步骤:

  1. 在connection-factory(配置文件)中开启 publisher-confirms=”true”
  2. rabbitTemplate中定义ConfirmCallBack回调函数(函数式编程)
  3. 发送消息,可以看到回调函数被执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;


/***
* 确认模式
* 步骤:
* 1 确认模式开启,在connection-factory(配置文件)中开启
* publisher-confirms="true"
* 2 在rabbitTemplate中定义ConfirmCallBack回调函数
*/
@Test
public void testConfirm() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("confirm方法被执行了...");
}
});

//3 发送消息
rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message confirm...");
}
}
  • 启动,查看结果

image-20210321214139793

2、confirm方法的参数说明

  • confirm方法
1
2
3
4
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm方法被执行了...");
}

参数说明

  • CorrelationData

相关配置信息

  • ack

关键参数,表示 exchange 交换机是否成功收到了消息

true表示成功,false表示失败

  • cause

失败的原因

  • 重新修改上面的代码,添加 ack 判断
1
2
3
4
5
6
7
8
9
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm方法被执行了...");
if(ack) {
System.out.println("信息成功投递到exchange中!" + cause);
} else {
System.out.println("信息无法正确投递到exchange中!失败原因:" + cause);
}
}
  • 重新执行结果如下

image-20210321215616593

如果投递失败,以后可能会进行一些处理,保证消息再次投递

3、return代码演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/***
* 回退模式:消息发送到exchange后,Exchange路由到Queue失败时才会执行
* ReturnCallBack
* 步骤:
* 1 开启回退模式
* publisher-returns="true"
* 2 设置ReturnCallBack
* 3 设置Exchange处理消息的模式
* 3.1 如果消息没有路由到Queue,则丢弃
* 3.2 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack
*/
@Test
public void testReturn() {
//设置交换机处理失败消息的模式:这里设置为返回
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("return 方法执行了...");
}
});


//3 发送消息
rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message confirm...");
}

4、return处理方式说明

  • 需要在connection-factory中开启回退模式
1
2
3
4
5
6
7
8
<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-confirms="true"
publisher-returns="true"/>
  • 设置交换机处理失败消息的模式,如果没有设置,那么默认丢弃信息,如果我们希望信息处理失败返回提示,那么我们需要设置交换机处理失败消息的方式
1
2
//设置交换机处理失败消息的模式:这里设置为返回
rabbitTemplate.setMandatory(true);
  • 将上述代码中的 routing Key 故意写错,重新启动程序查看结果

image-20210321222218150

5、return方法参数说明

1
2
3
4
5
6
7
8
9
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return 方法执行了...");
System.out.println("消息为:" + new String(message.getBody()));
System.out.println("错误码为:" + replyCode);
System.out.println("错误文本为:" + replyText);
System.out.println("交换机为:" + exchange);
System.out.println("路由键为:" + routingKey);
}
  • message:消息对象
  • replyCode:错误码
  • replyText:错误信息
  • exchange:交换机
  • routingKey:路由键

再次执行上面产生错误的代码

image-20210321222916150

6、消息可靠投递小结

  • 设置ConnectionFactory的publisher-confirms=”true” 开启 确认模式。
  • 使用rabbitTemplate.setConfirmCallback设置回调函数。

当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。

  • 设置ConnectionFactory的publisher-returns=”true” 开启 回退 模式。
  • 使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage
  • 在RabbitMQ中也提供了事务机制,但是性能较差,此处不做讲解。

使用channel下列方法,完成事务控制:

  1. txSelect(), 用于将当前channel设置成transaction模式
  2. txCommit(),用于提交事务
  3. txRollback(),用于回滚事务

7.2、Consumer Ack

1、ack说明

ack 指的是 Acknowledge,确认。表示消费端收到消息后的确认方式。

有三种确认方式:

  • 自动确认:acknowledge = “none”

收到消息后直接进行确认,并不管消息的处理结果如何,如果消费者在处理这条消息进行业务处理的过程中出现异常,那么就相当于这个消息没有发挥作用(相当于消息丢失了)

  • 手动确认:acknowledge = “manual”

消费者收到消息后并不直接确认,这个时候可以等待业务处理,如果业务处理成功,那么直接告诉 Broker 接收成功。

如果业务处理失败,可以进行一些其他的处理(比如让Broker再发一次消息)。

  • 根据异常情况确认:acknowledge = “auto”,这种方式使用过于麻烦,我们不做讲解

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。

如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

2、代码演示

在消费者模块中引入编写配置文件

  • spring-rabbitmq-consumer.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>

<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>

<!-- 定义包扫描,这样我们只需要在listener包下类上添加@Conponent注解即可 -->
<context:component-scan base-package="com.hzx.listener" />

<rabbit:listener-container connection-factory="connectionFactory" >
<rabbit:listener ref="ackListener" queue-names="test_queue_confirm" />
</rabbit:listener-container>
</beans>
  • 创建一个监听器类 AckListener
1
2
3
4
5
6
7
8
@Component
public class AckListener implements MessageListener {
@Override
public void onMessage(Message message) {

System.out.println("接收到的消息为:" + new String(message.getBody()));
}
}
  • 创建一个测试类加载配置文件
1
2
3
4
5
6
7
8
9
10
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {
@Test
public void testAck() {
while(true) {

}
}
}
  • 启动方法,查看结果

image-20210321225552003

上面情况演示的就是自动签收

3、设置手动签收(消费者端)

  • rabbitMQ 的 监听器容器中配置手动签收
1
2
3
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
<rabbit:listener ref="ackListener" queue-names="test_queue_confirm" />
</rabbit:listener-container>
  • 让监听器不再实现 MessageListener 接口,而是实现 ChannelAwareMessageListener 接口(这个接口是前者的子接口),并重写 onMessage方法,和上一个接口的 onMessage 方法不同的是,这个 onMessage 方法有一个 Channel 对象,可以用于消息的手动签收
1
2
3
4
@Override
public void onMessage(Message message, Channel channel) throws Exception {

}
  • 如果消息成功处理,调用 Channel 对象的 basicAck() 进行签收;如果消息处理失败,则调用 Channel 对象的 basicNack() 进行拒收,此时可以让broker重新发送消息给 consumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//获取消息的deliveryTag
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//1 接收消息
System.out.println("收到的消息为:" + new String(message.getBody()));
//2 处理业务逻辑
System.out.println("处理业务逻辑...");
//3 手动签收
//第二个参数:签收多条消息
channel.basicAck(deliveryTag,true);
} catch (IOException e) {
//4 出现异常时拒绝签收,让消息重新发送
//第三个参数requeue:重回队列,如果设置为true,则消息重回到queue,broker会重新发送该消息给消费者
channel.basicNack(deliveryTag,true,true);
}
}

4、测试手动签收(成功)

修改生产者端的代码,使之能正确发送消息

1
2
//3 发送消息
rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","测试手动签收...");

先启动消费者监听消息,然后启动生产者发送消息,可以看到消费端接收到了消息

image-20210322104227341

5、测试手动签收(失败)

修改监听器代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(1000);
//获取消息的deliveryTag
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//1 接收消息
System.out.println("收到的消息为:" + new String(message.getBody()));
//2 处理业务逻辑
System.out.println("处理业务逻辑...");
int i = 3/0;
//3 手动签收
//第二个参数:签收多条消息
channel.basicAck(deliveryTag,true);
System.out.println("信息签收成功!");
} catch (Exception e) {
System.out.println("信息签收失败,正在重新发送!");
//4 出现异常时拒绝签收,让消息重新发送
//第三个参数requeue:重回队列,如果设置为true,则消息重回到queue,broker会重新发送该消息给消费者
channel.basicNack(deliveryTag,true,true);
}
}

重新启动消费者,然后启动生产者发送消息

执行结果如下

image-20210322104853086

6、Consumer Ack 小结

  • rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认
  • 如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息
  • 如果出现异常,则在catch中调用 basicNackbasicReject,拒绝消息,让MQ重新发送消息。

7.3、消息可靠性总结

  • 持久化
    • exchange要持久化
    • queue要持久化
    • message要持久化
  • 生产方确认Confirm
  • 消费方确认Ack
  • Broker高可用

7.4、消费端限流

1、消费端限流步骤

  • 确保ack机制为手动签收

acknowledge=”manual”

1
2
3
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<rabbit:listener ref="qosListener" queue-names="test_queue_confirm" />
</rabbit:listener-container>
  • listener-container配置属性 prefetch = num

表示消费端一次最多从MQ中拉取num条消息进行消费,直到手动确认这些消息消费完毕后,才会继续拉取

2、在消费端编写 QosListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/***
* Consumer 限流机制
* 步骤:
* 1 确保ack机制为手动签收
* acknowledge="manual"
* 2 在listener-container配置属性 prefetch = num
* 表示消费端一次最多从MQ中拉取 num 条消息进行消费
* 直到手动确认这些消息消费完毕后,才会继续拉取
* prefetch="1"
*/
@Component
public class QosListener implements ChannelAwareMessageListener {

@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("接收到的消息为:" + new String(message.getBody()));
System.out.println("处理业务逻辑...");
channel.basicAck(deliveryTag,true);
System.out.println("--------------------------------------------");
}
}

3、编写生产者端代码,发送多条消息

1
2
3
4
5
6
7
@Test
public void testSend() {
for (int i = 0; i < 10; i++) {
//发送多条消息
rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","测试手动签收...");
}
}

同时注释掉监听器中用于签收消息的代码,这样在第一条消息没有被签收的情况下,消费者不会从MQ中再次拉取消息消费,所以控制台中只有一条消息被打印

1
//channel.basicAck(deliveryTag,true);

4、测试

先启动消费者,然后启动生产者

可以看到,消费者只拉取了一条消息进行消费,没有处理生产者中发送的其余9条消息

image-20210322112310933

如果添加签收,那么消费者会拉取一条,消费一条,签收一条,然后再进行拉取

image-20210322112842161

5、消费端限流小结

  • 在 < rabbit:listener-container > 中配置 prefetch属性设置消费端一次拉取多少消息
  • 消费端的确认模式一定为手动确认。acknowledge=”manual”

7.5、TTL

1、概念说明

  • TTL 全称 Time To Live(存活时间/过期时间)。
  • 当消息到达存活时间后,还没有被消费,会被自动清除。
  • RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

image-20210322113201168

用户在订单系统下订单后,订单系统会发送一个消息给支付系统,如果下订单的三十分钟内支付系统还没有取走消息进行消费(用户没有付款)的话,那么这条消息会从消息队列中被清除。

2、代码实现

在配置文件中配置一个新的队列,设置队列内消息过期时间(TTL)为10s

1
2
3
4
5
6
7
8
9
10
11
12
13
<!-- ttl -->
<rabbit:queue name="test-queue-ttl" id="test-queue-ttl">
<rabbit:queue-arguments>
<!-- x-message-ttl指队列的过期时间 -->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
</rabbit:queue-arguments>
</rabbit:queue>

<rabbit:topic-exchange name="test_exchange_ttl" >
<rabbit:bindings>
<rabbit:binding pattern="ttl.#" queue="test-queue-ttl"/>
</rabbit:bindings>
</rabbit:topic-exchange>

编写代码,发送消息

1
2
3
4
5
6
7
@Test
public void testTtl() {
for (int i = 0; i < 10; i++) {
//发送消息
rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.hehe","message ttl...");
}
}

此时查看控制台,可以看到 test_queue_ttl 队列中有十条消息

image-20210322141344751

十秒后再查看,发现队列中的消息全部消失,这是由于消息的存活时间到期

image-20210322141448263

3、设置消息单独过期

需要创建一个 MessagePostProcessor 对象来指定,可以使用匿名内部类的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Test
public void testTtl() {
for (int i = 0; i < 10; i++) {
/***
* 消息的后处理对象,用于设置一些消息的参数信息
*/
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//1 设置message的信息,设置消息过期时间,单位为毫秒
message.getMessageProperties().setExpiration("5000");
//2 返回该信息
return message;
}
};
//发送消息
rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.hehe","message ttl...",messagePostProcessor);
}
}

此时重新发送消息,可以看到消息在五秒钟后过期

4、过期时间说明

  • 如果既设置了队列的过期时间,也设置了消息的过期时间,那么以时间段的为准
  • 队列过期后,会将队列中所有消息全部移除
  • 消息过期后,只有消息在队列顶端,才会判断是否过期(移除掉)

5、TTL小结

  • 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
  • 设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。
  • 如果两者都进行了设置,以时间短的为准。

7.6、死信队列

1、概念说明

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

image-20210322143915389

生产者将一个带有过期时间的消息投送到交换机,由交换机发送到与其绑定的队列中,当消息的过期时间到了之后,这个消息就成为了 死信 ,这个时候,存放死信的队列可以将死信发送到死信交换机,死信交换机可以将该死信发送给与其绑定的队列,这样可以让死信为其他消费者所消费。

我们需要思考两个问题:

  • 队列如何绑定死信交换机?
  • 消息如何成为死信?

2、消息成为死信的三种情况

  • 队列消息长度到达限制(队列最多能存储10个消息,那么第十一个消息就会成为死信);
  • 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
  • 原队列存在消息过期设置,消息到达超时时间未被消费;

3、队列绑定死信交换机

给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key

image-20210322145019954

4、死信队列演示步骤

  • 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
1
2
3
4
5
6
7
8
9
<!--
1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
-->
<rabbit:queue name="test_queue_dlx" id="test_queue_dlx"/>
<rabbit:topic-exchange name="test_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"/>
</rabbit:bindings>
</rabbit:topic-exchange>
  • 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
1
2
3
4
5
6
7
8
9
<!--
2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
-->
<rabbit:queue name="queue_dlx" id="queue_dlx"/>
<rabbit:topic-exchange name="exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.#" queue="queue_dlx"/>
</rabbit:bindings>
</rabbit:topic-exchange>
  • 正常队列绑定死信交换机,设置两个参数
  1. x-dead-letter-exchange:死信交换机名称
  2. x-dead-letter-routing-key:发送给死信交换机的routing key

为上面的正常队列设置参数,绑定死信交换机

1
2
3
4
5
6
7
8
9
 <rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
<!-- 3 正常队列绑定死信交换机 -->
<rabbit:queue-arguments>
<!-- 3.1 x-dead-letter-exchange:死信交换机名称 -->
<entry key="x-dead-letter-exchange" value="exchange_dlx" />
<!-- 3.2 x-dead-letter-routing-key:发送给死信交换机的routing key -->
<entry key="x-dead-letter-routing-key" value="dlx.hehe" />
</rabbit:queue-arguments>
</rabbit:queue>
  • 让正常队列中的消息成为死信

方法一:设置队列的过期时间

1
2
3
4
5
6
7
8
<rabbit:queue-arguments>
<!-- 3.1 x-dead-letter-exchange:死信交换机名称 -->
<entry key="x-dead-letter-exchange" value="exchange_dlx" />
<!-- 3.2 x-dead-letter-routing-key:发送给死信交换机的routing key -->
<entry key="x-dead-letter-routing-key" value="dlx.hehe" />
<!-- 4.1 设置队列的过期时间 ttl -->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
</rabbit:queue-arguments>

方法二:设置队列的最大长度

1
2
3
4
5
6
7
8
<rabbit:queue-arguments>
<!-- 3.1 x-dead-letter-exchange:死信交换机名称 -->
<entry key="x-dead-letter-exchange" value="exchange_dlx" />
<!-- 3.2 x-dead-letter-routing-key:发送给死信交换机的routing key -->
<entry key="x-dead-letter-routing-key" value="dlx.hehe" />
<!-- 4.2 设置队列的长度限制 max-length -->
<entry key="x-max-length" value="10" value-type="java.lang.Integer" />
</rabbit:queue-arguments>
  • 发送测试死信消息
  1. 测试过期时间的死信消息
1
2
3
4
5
6
7
8
9
10
11
//1 测试过期时间,信息消息
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//1 设置message的信息,设置消息过期时间,单位为毫秒
message.getMessageProperties().setExpiration("5000");
//2 返回该信息
return message;
}
};
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.wuhu","这是一条消息,它会成为死信,到达死信队列中...",messagePostProcessor);

启动程序后,正常队列中会出现一条信息,在存活5s后,这条消息会过期,然后经由死信交换机跑到死信队列。

查看控制台,发现正常队列中有一条消息

image-20210322151923962

5s后,发现正常队列的消息消失了,而死信队列中出现了一条信息

image-20210322152145353

在死信队列中查看该消息的详细信息

image-20210322152438057

  1. 测试长度限制后,消息死信
1
2
3
4
5
6
7
@Test
public void testDlx() {
//2 测试消息个数超过队列长度后,消息成为死信
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.wuhu","这是一条消息,超过10后的消息会成为死信,当前为第:" + (i + 1) + "条");
}
}

启动程序后, 有十条消息会进入正常队列,而剩下的十条消息会进入死信队列,然后死信队列中会拥有11条消息;

等到队列过期后,正常队列中的剩下十条消息也全变为死信,此时死信队列有21条消息

image-20210322153049236

队列过期后,死信队列有21条消息

image-20210322153112939

  1. 拒收消息使消息成为死信

在消费者模块中编写一个 DlxListener 监听器类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component
public class DlxListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(1000);
//获取消息的deliveryTag
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//1 接收消息
System.out.println("收到的消息为:" + new String(message.getBody()));
//2 处理业务逻辑
System.out.println("处理业务逻辑...");
//手动模拟一个异常
int i = 3/0;
channel.basicAck(deliveryTag,true);
System.out.println("信息签收成功!");
} catch (Exception e) {
System.out.println("出现异常,拒绝接收");
channel.basicNack(deliveryTag,true,false);
}
}
}

在配置文件中配置此监听器

1
2
3
4
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<!-- 令监听器监听正常队列 -->
<rabbit:listener ref="dlxListener" queue-names="test_queue_dlx" />
</rabbit:listener-container>

启动消费者,然后使用生产者发送消息

1
2
3
4
@Test
public void testDlx() {
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.wuhu","这是一条消息,它会被拒收,然后进入死信队列...");
}

发送消息后,死信队列中的消息数目会直接变为22

image-20210322154348127

idea控制台输出如下

image-20210322154408776

  • 上述完整代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<!--
1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
-->
<rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
<!-- 3 正常队列绑定死信交换机 -->
<rabbit:queue-arguments>
<!-- 3.1 x-dead-letter-exchange:死信交换机名称 -->
<entry key="x-dead-letter-exchange" value="exchange_dlx" />
<!-- 3.2 x-dead-letter-routing-key:发送给死信交换机的routing key -->
<entry key="x-dead-letter-routing-key" value="dlx.hehe" />
<!-- 4.1 设置队列的过期时间 ttl -->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
<!-- 4.2 设置队列的长度限制 max-length -->
<entry key="x-max-length" value="10" value-type="java.lang.Integer" />
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"/>
</rabbit:bindings>
</rabbit:topic-exchange>

<!--
2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
-->
<rabbit:queue name="queue_dlx" id="queue_dlx"/>
<rabbit:topic-exchange name="exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.#" queue="queue_dlx"/>
</rabbit:bindings>
</rabbit:topic-exchange>

5、小结

  • 死信交换机和死信队列和普通的没有区别
  • 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
  • 消息成为死信的三种情况:
  1. 队列消息长度到达限制;
  2. 消费者拒接消费消息,并且不重回队列;
  3. 原队列存在消息过期设置,消息到达超时时间未被消费;

7.7、延迟队列

1、概念说明

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

  • 需求:
  1. 下单后,30分钟未支付,取消订单,回滚库存。

  2. 新用户注册成功7天后,发送短信问候。

  • 实现方式:
  1. 定时器(不优雅)
  2. 延迟队列

image-20210322155500743

很可惜,在RabbitMQ中并未直接提供延迟队列的功能

不过我们可以使用 TTL + 死信队列 组合的方式来实现延迟队列的效果

2、RabbitMQ实现延迟队列

RabbitMQ 使用 TTL + 死信队列 组合的方式来实现延迟队列的效果

订单系统向交换机发送一条存活时长为30min的消息后,消息在正常队列中存活30min,在30min后,消息从正常队列跑到死信交换机,死信交换机又将消息发送给死信队列,此时,绑定死信队列的库存系统就可以在订单系统发送消息的30min后拿到消息。

这样和延迟队列的效果一致。

image-20210322160120416

以下是关于延迟队列的实现

3、使用 TTL + 死信队列 实现延迟队列

  • 定义正常的交换机(order_exchange)和队列(order_queue)
1
2
3
4
5
6
7
<!-- 1 定义正常的交换机(order_exchange)和队列(order_queue) -->
<rabbit:queue id="order_queue" name="order_queue" />
<rabbit:topic-exchange name="order_exchange" id="order_exchange" >
<rabbit:bindings>
<rabbit:binding pattern="order.#" queue="order_queue"/>
</rabbit:bindings>
</rabbit:topic-exchange>
  • 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)
1
2
3
4
5
6
7
<!-- 2 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx) -->
<rabbit:queue id="order_queue_dlx" name="order_queue_dlx" />
<rabbit:topic-exchange name="order_exchange_dlx" id="order_exchange_dlx" >
<rabbit:bindings>
<rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"/>
</rabbit:bindings>
</rabbit:topic-exchange>
  • 绑定,设置正常队列过期时间为30min
1
2
3
4
5
6
7
8
<rabbit:queue id="order_queue" name="order_queue">
<!-- 3 绑定,设置正常队列过期时间为30min -->
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="order_exchange_dlx" />
<entry key="x-dead-letter-routing-key" value="dlx.order.cancel" />
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
</rabbit:queue-arguments>
</rabbit:queue>
  • 全部配置为:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<!-- 1 定义正常的交换机(order_exchange)和队列(order_queue) -->
<rabbit:queue id="order_queue" name="order_queue">
<!-- 3 绑定,设置正常队列过期时间为30min -->
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="order_exchange_dlx" />
<entry key="x-dead-letter-routing-key" value="dlx.order.cancel" />
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="order_exchange" id="order_exchange" >
<rabbit:bindings>
<rabbit:binding pattern="order.#" queue="order_queue"/>
</rabbit:bindings>
</rabbit:topic-exchange>

<!-- 2 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx) -->
<rabbit:queue id="order_queue_dlx" name="order_queue_dlx" />
<rabbit:topic-exchange name="order_exchange_dlx" id="order_exchange_dlx" >
<rabbit:bindings>
<rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"/>
</rabbit:bindings>
</rabbit:topic-exchange>
  • 编写生产者代码
1
2
3
4
5
6
7
8
9
10
@Test
public void testDelay() throws InterruptedException {
//1 订单系统在下单完成后,发送消息
rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息:id = 1,time = 16点26分");
//2 打印一个倒计时
for (int i = 10; i > 0; i--) {
System.out.println(i + "...");
Thread.sleep(1000);
}
}
  • 在消费者模块编写一个OrderListener监听器类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Component
public class OrderListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(1000);
//获取消息的deliveryTag
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("接收到的消息为:" + new String(message.getBody()));
System.out.println("处理业务逻辑...");
System.out.println("根据订单id查询其状态...");
System.out.println("判断状态是否为已支付...");
System.out.println("取消订单,回滚库存...");
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
channel.basicNack(deliveryTag,true,false);
}
}
}

配置 OrderListener 监听器,注意这里必须监听死信队列,如果监听正常队列的话,那么订单系统一发出消息,消费者就接收到了

1
2
3
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<rabbit:listener ref="orderListener" queue-names="order_queue_dlx" />
</rabbit:listener-container>
  • 启动消费者,然后启动生产者发送消息

可以看到,在生产者发送完消息后,消费者无法直接看到消息,只有在生产者发送消息的十秒后,消费者才能收到消息。

image-20210322164335674

4、小结

  • 延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。
  • RabbitMQ没有提供延迟队列功能,但是可以使用 : TTL + 死信队列 来实现延迟队列效果。

7.8、日志与监控

1、概述

RabbitMQ默认日志存放路径: /var/log/rabbitmq/rabbit@xxx.log
日志包含了RabbitMQ的版本号、Erlang的版本号、RabbitMQ服务节点名称、cookie的hash值、RabbitMQ配置文件地址、内存限制、磁盘限制、默认账户guest的创建以及权限配置等等。

image-20210322164632418

2、常用命令

  • 查看队列

rabbitmqctl list_queues

  • 查看exchanges

rabbitmqctl list_exchanges

  • 查看用户

rabbitmqctl list_users

  • 查看连接

rabbitmqctl list_connections

  • 查看消费者信息

rabbitmqctl list_consumers

  • 查看环境变量

rabbitmqctl environment

  • 查看未被确认的队列

rabbitmqctl list_queues name messages_unacknowledged

  • 查看单个队列的内存使用

rabbitmqctl list_queues name memory

  • 查看准备就绪的队列

rabbitmqctl list_queues name messages_ready

7.9、消息追踪

1、概念说明

在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于RabbitMQ而言,可能是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位。

在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能来实现消息追踪。

2、消息追踪-Firehose

firehose的机制是将生产者投递给rabbitmq的消息,rabbitmq投递给消费者的消息按照指定的格式发送到默认的exchange上。这个默认的exchange的名称为amq.rabbitmq.trace,它是一个topic类型的exchange。发送到这个exchange上的消息的routing key为 publish.exchangenamedeliver.queuename。其中exchangenamequeuename为实际exchange和queue的名称,分别对应生产者投递到exchange的消息,和消费者从queue上获取的消息。

注意:打开 trace 会影响消息写入功能,适当打开后请关闭。

  1. rabbitmqctl trace_on:开启Firehose命令
  2. rabbitmqctl trace_off:关闭Firehose命令

3、消息追踪-rabbitmq_tracing

rabbitmq_tracing和Firehose在实现上如出一辙,只不过rabbitmq_tracing的方式比Firehose多了一层GUI的包装,更容易使用和管理。

启用插件:rabbitmq-plugins enable rabbitmq_tracing

八、RabbitMQ 应用问题

8.1、消息可靠性保障–消息补偿机制

image-20210322185933727

8.2、消息幂等性保障

1、什么是幂等性

幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。

在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。

2、实现–乐观锁机制

比如说用户发了两条消息,消息的version都是一,且此时数据库的记录的version也为1;

当数据消费完一条消息后,将数据库的记录的version改为2,此时收到第二条消息,由于此时数据库记录的version为2,与消息的version不和,所以不会第二条消息不会起实质性作用。

image-20210322191006079