一、MQ基本概念

1.1、MQ概述

MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。用于分布式系统之间进行通信。

image-20210314160621807

使用MQ中间件

image-20210314160641024

小结

  • MQ即消息队列,是用于存储消息的中间件。
  • 分布式系统通信两种方式:直接远程调用 和 借助第三方完成间接通信
  • 发送方称为生产者,接收方称为消费者

1.2、MQ 的优势和劣势

1、优势

  • 应用解耦
  • 异步提速
  • 削峰填谷

2、劣势

  • 系统可用性降低
  • 系统复杂度提高
  • 一致性问题

3、MQ优势之应用解耦

系统的耦合性越高,容错性就越低,可维护性就越低。

  • 以下订单为例,在没有引入MQ之前,订单系统与库存系统、支付系统、物流系统耦合在一起。

如果与订单系统关联的某个系统(比如说库存系统)挂了,那么可能导致订单系统也跟着挂了,此时系统的容错低。

如果要新增一个与订单系统相关联的系统,那么此时只能修改订单系统的代码,此时系统的可维护性和可扩展性低。

image-20210314162029231

  • 在引入MQ中间件后,上面的流程图的变化如下图所示

image-20210314162544295

在引入MQ后,订单系统只需要发一条消息给MQ,然后让消费者(库存、支付、物流、X系统)从MQ中拿出消息进行消费即可,使用MQ使得应用间解耦,提示容错性和可维护性。

如果此时库存系统挂了,那么订单系统将不会受到影响,因为库存系统和订单系统之间是隔离的。

如果此时需要添加一个X系统,那么只需要让X系统从MQ中拿出消息进行消费即可,此时不需要修改订单系统代码,提升了系统的可维护性和可扩展性。

4、MQ优势之异步提速

如果应用流程中订单系统远程调用其他系统的时间均为300ms,访问数据库的时间为20ms,由于订单系统调用其他系统的行为是同步的(需要调完A再调用B),所以一个下单操作总共耗费的时间为 920ms,速度太慢。

image-20210314163256067

引入MQ后,订单系统只需要将消息放入MQ,然后操作自己的数据库之后即可给用户返回 下单成功 的提示,并不需要等其他系统消费完从MQ拿出的消息。

此时下单操作所耗费的时间为:25ms(写数据库花费20ms,发送消息花费5ms)

提升用户体验系统吞吐量(单位时间内处理请求的数目)

image-20210314163652543

5、MQ优势之削峰填谷

使用MQ后,可以提高系统稳定性。

如果一个A系统一秒钟最多处理1000个请求,如果此时请求量暴增至1秒5000个,那么A系统可能直接宕机,整个系统可能直接瘫痪,用户体验降低。

image-20210314164223206

在引入MQ后,即使请求量真的从1秒1000个瞬间暴增至1秒5000个,由于此时直接与用户对接的不是A系统,而是MQ首当其冲,所以此时我们令A系统每秒最多从MQ中拉取1000个请求,来保证A系统不会宕机。

image-20210314164343109

  • 削峰

在请求量暴增的情况下,MQ可以限制消费消息的速度,将高峰期产生的流量堆积在MQ中,这样以来,高峰就被削掉了

image-20210314164853758

  • 填谷

由于在“削峰”过程中,大量消息堆积在MQ中,所以在高峰期过后,消费消息的速度还是会在一段时间内维持高水平(在上面的场景中为1000),直到消费完积压得消息,这就是填谷。

  • 削峰填谷示意图

image-20210314165114086

6、MQ劣势之系统可用性降低

在引入MQ前,只需要保证组成分布式系统的各个系统没有问题,在引入MQ后还需要保证MQ没有问题。

系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。

如何保证MQ的高可用是我们引入MQ后必须思考的一个问题。

7、MQ劣势之系统复杂度提高

在引入MQ后,系统的复杂性提高,我们要考虑以下问题

  • 生产者发送的消息会不会被消费者重复消费?
  • 如何处理消息丢失的情况?
  • 如何保证消息传递的顺序性?

8、MQ劣势之一致性问题

A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?

1.3、使用MQ应该满足什么条件?

  • 生产者不需要从消费者处获得反馈。

引入消息队列之前的远程调用,其接口的返回值应该为空,这才让明明下层的动作还没做完,上层却当动作做完了继续往后走,即所谓 异步 成为了可能。

举个例子:如果A调用完B后需要拿着B的返回结果作为参数再去调用C,那么这个时候引入MQ无法实现异步调用。

  • 容许短暂的不一致性。
  • 确实是用了有效果。

即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。

1.4、常见的MQ产品

目前业界有很多的 MQ 产品,例如 RabbitMQRocketMQActiveMQ、KafkaZeroMQMetaMQ等,也有直接使用 Redis 充当消息队列的案例,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及 MQ 产品特征,综合考虑。

  • 常见的MQ产品对比

image-20210314171817994

二、RabbitMQ

2.1、AMQP

AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。

2006年,AMQP 规范发布。类比HTTP。

image-20210314172848450

生产者发送消息给交换机,交换机根据各种各样的路由规则将消息发送到不同的容器中,再由消费者进行消费。

2.2、RabbitMQ简介

2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。

1、基础架构

RabbitMQ的基础架构如下图:

image-20210314173356306

交换机(Exchange)通过 Binding 和队列(Queue)一一对应。

2、RabbitMQ中的相关概念

  • Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker。

  • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等。

  • Connection:publisher/consumer 和 broker 之间的 TCP 连接。

  • Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。

  • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:

    1. direct (point-to-point)
    2. topic (publish-subscribe)
    3. fanout (multicast)
  • Queue:消息最终被送到这里等待 consumer 取走

  • Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

3、RabbitMQ的六种工作模式

RabbitMQ 提供了 6 种工作模式:

  • 简单模式
  • work queues
  • Publish/Subscribe (发布订阅模式)
  • Routing (路由模式)
  • Topics (主题模式)
  • RPC 远程调用模式 (远程调用,不太算 MQ;暂不作介绍)

官网中关于以上模式的介绍

image-20210314180250054

2.3、AMQP和JMS

MQ是消息通信的模型;实现MQ的大致有两种主流方式:AMQP、JMS。

1、AMQP

AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。

2、JMS

JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

JMS 是 JavaEE 规范中的一种,类比JDBC。

3 AMQP 与 JMS 区别

JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式

JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。

JMS规定了两种消息模式;而AMQP的消息模式更加丰富

4 再谈市场上常见的消息队列

ActiveMQ:基于JMS

ZeroMQ:基于C语言开发

RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好

RocketMQ:基于JMS,阿里巴巴产品

Kafka:类似MQ的产品;分布式消息系统,高吞吐量。

三、RabbitMQ 快速入门

3.1、入门程序

1、需求

使用RabbitMQ六种模式之中的简单模式完成消息传递

image-20210320210713517

2、步骤

  • 创建工程(消费者,生产者)
  • 分别添加依赖
  • 编写代码,令生产者发送消息,消费者接收消息。

3、引入依赖

在生产者和消费者模块中分别引入以下依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
</dependencies>


<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

4、编写生产者代码

步骤如下:

  • 创建连接工厂

  • 设置参数

    • 设置ip地址,默认只为 localhost
    • 设置port端口,默认为 5672
    • 设置虚拟机,默认为 /
    • 设置用户密码,默认值都为 admin
  • 创建连接 Connection

  • 创建 Channel

参数介绍
1 queque:队列名称
2 durable:是否持久化,如果不持久化,那么mq重启即失
3 exclusive:
是否独占(只能有一个消费者监听此队列)
当Connection关闭时,是否删除此队列
4 autoDelete:
是否自动删除,当没有Consumer时,自动删除队列
5 arguments:参数

  • 创建队列 Queue
  • 发送消息

参数:
1 exchange
交换机每次,简单模式下交换机会使用默认的
2 routingKey
路由名称
3 props:配置信息
4 body:发送消息的数据(byte数组)

  • 释放资源

生产者的具体代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//1 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2 设置参数
//2.1 设置ip地址,默认值为localhost
factory.setHost("192.168.56.101");
//2.2 设置端口,默认值为5672
factory.setPort(5672);
//2.3 设置虚拟机,默认值为"/"
factory.setVirtualHost("/wuhu");
//2.4 设置用户,默认为 guest
factory.setUsername("admin");
//2.5 设置密码,默认为 guest
factory.setPassword("admin");
//3 创建连接
Connection connection = factory.newConnection();
//4 创建Channel
Channel channel = connection.createChannel();
//5 创建队列Queue
/*
参数介绍
1 queque:队列名称
2 durable:是否持久化,如果不持久化,那么mq重启即失
3 exclusive:
是否独占(只能有一个消费者监听此队列)
当Connection关闭时,是否删除此队列
4 autoDelete:
是否自动删除,当没有Consumer时,自动删除队列
5 arguments:参数
*/
//如果没有名字为hello_world的队列,那么会自动创建该队列,否则不会创建
channel.queueDeclare("hello_world",true,false,false,null);
//6 发送消息
/*
参数:
1 exchange
交换机每次,简单模式下交换机会使用默认的
2 routingKey
路由名称
3 props:配置信息
4 body:发送消息的数据(byte数组)
*/
String data = "hello rabbitMQ";
//这里如果使用默认的交换机,那么routingKey必须和交换机名称一致
//即同样为hello_world
channel.basicPublish("","hello_world",null,data.getBytes());
//7 释放资源(先开启的后关闭)
channel.close();
connection.close();

运行代码, 可以看到有一个名为 hello_world 的队列被创建

image-20210320220220877

点击队列,可以看到有一个消息已经就绪,等待被消费

image-20210320220337441

5、编写消费者代码

步骤如下:

  • 创建连接工厂

  • 设置参数

    • 设置ip地址,默认只为 localhost
    • 设置port端口,默认为 5672
    • 设置虚拟机,默认为 /
    • 设置用户密码,默认值都为 admin
  • 创建连接 Connection

  • 创建 Channel

参数介绍
1 queque:队列名称
2 durable:是否持久化,如果不持久化,那么mq重启即失
3 exclusive:
是否独占(只能有一个消费者监听此队列)
当Connection关闭时,是否删除此队列
4 autoDelete:
是否自动删除,当没有Consumer时,自动删除队列
5 arguments:参数

  • 创建队列 Queue
  • 创建回调 DefaultConsumer 对象

重写 DefaultConsumerhandleDelivery 方法,消费者收到消息后会自动调用该方法

参数说明

/*
这里的 DefaultConsumer 对象为回调对象,
我们可以重写回调方法 handleDelivery
收到消息后会自动执行该方法

参数说明:
1 consumerTag:标识
2 envelope:获取信息的对象,比如说交换器、路由key
3 properties:配置信息
4 body:数据
*/

  • 消费者接收消息

/*
参数:
1 queue:队列名称
2 autoACK:是否自动确认
3 callback:回调对象
*/

  • 消费者不需要关闭连接释放资源,因为消费者需要监听端口

消费者完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
 //1 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2 设置参数
//2.1 设置ip地址,默认值为localhost
factory.setHost("192.168.56.101");
//2.2 设置端口,默认值为5672
factory.setPort(5672);
//2.3 设置虚拟机,默认值为"/"
factory.setVirtualHost("/wuhu");
//2.4 设置用户,默认为 guest
factory.setUsername("admin");
//2.5 设置密码,默认为 guest
factory.setPassword("admin");
//3 创建连接
Connection connection = factory.newConnection();
//4 创建Channel
Channel channel = connection.createChannel();
//5 创建队列Queue
/*
参数介绍
1 queque:队列名称
2 durable:是否持久化,如果不持久化,那么mq重启即失
3 exclusive:
是否独占(只能有一个消费者监听此队列)
当Connection关闭时,是否删除此队列
4 autoDelete:
是否自动删除,当没有Consumer时,自动删除队列
5 arguments:参数
*/
//如果没有名字为hello_world的队列,那么会自动创建该队列,否则不会创建
channel.queueDeclare("hello_world",true,false,false,null);

//6 消费者接收消息
/*
参数:
1 queue:队列名称
2 autoACK:是否自动确认
3 callback:回调对象
*/
/*
这里的DefaultConsumer对象为回调对象,
我们可以重写回调方法handleDelivery,
收到消息后会自动执行该方法
参数说明:
1 consumerTag:标识
2 envelope:获取信息的对象,比如说交换器、路由key
3 properties:配置信息
4 body:数据
*/
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("envelope:" + envelope.getExchange());
System.out.println("routingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
};
//这里队列的名称必须和生产者创建的队列名称相同
channel.basicConsume("hello_world",true,consumer);

//消费者不需要关闭资源,需要一直监听
  • 启动消费者,查看结果

image-20210320223307615

3.2、小结

上述的入门案例中其实使用的是以下的简单方式

image-20210320223411565

在上图的模型中,有以下概念

  • P:生产者,也就是要发送消息的程序
  • C:消费者,消息的接收者,会一直等待消息到来
  • queue:消息队列,图中红色部分,类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从中取出消息

四、RabbitMQ工作模式

4.1、Work queues工作队列模式

1、模式说明

image-20210320224013389

Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。

注意,工作队列模式的消费者间是竞争关系,即最后只有一个消费者可以从消息队列中取到消息。

2、应用场景

对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

3、编写生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//1 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2 设置参数
//2.1 设置ip地址,默认值为localhost
factory.setHost("192.168.56.101");
//2.2 设置端口,默认值为5672
factory.setPort(5672);
//2.3 设置虚拟机,默认值为"/"
factory.setVirtualHost("/wuhu");
//2.4 设置用户,默认为 guest
factory.setUsername("admin");
//2.5 设置密码,默认为 guest
factory.setPassword("admin");
//3 创建连接
Connection connection = factory.newConnection();
//4 创建Channel
Channel channel = connection.createChannel();
//5 创建队列Queue
channel.queueDeclare("work_queues",true,false,false,null);
for (int i = 0; i < 10; i++) {
String data = "hello rabbitMQ + i";
//6 发送消息
channel.basicPublish("","work_queues",null,data.getBytes());
}
//7 释放资源(先开启的后关闭)
channel.close();
connection.close();

4、编写两个消费者

这里给出其中一个的代码,第二个和第一个代码一致

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//1 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2 设置参数
//2.1 设置ip地址,默认值为localhost
factory.setHost("192.168.56.101");
//2.2 设置端口,默认值为5672
factory.setPort(5672);
//2.3 设置虚拟机,默认值为"/"
factory.setVirtualHost("/wuhu");
//2.4 设置用户,默认为 guest
factory.setUsername("admin");
//2.5 设置密码,默认为 guest
factory.setPassword("admin");
//3 创建连接
Connection connection = factory.newConnection();
//4 创建Channel
Channel channel = connection.createChannel();
//5 创建队列Queue
//如果没有名字为hello_world的队列,那么会自动创建该队列,否则不会创建
channel.queueDeclare("work_queues",true,false,false,null);

//6 消费者接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
}
};
//这里队列的名称必须和生产者创建的队列名称相同
channel.basicConsume("work_queues",true,consumer);

//消费者不需要关闭资源,需要一直监听

5、测试

先启动两个消费者,查看工作窗口中的连接情况

image-20210320230335776

启动生产者,查看消息被消费情况

  • 消费者1

image-20210320230830722

  • 消费者2

image-20210320230901377

6、小结

  • 在一个队列中如果有多个消费者,那么消费者之间对同一个消息队列中的消息的关系是竞争关系。

  • Work Queues 对于任务过重或任务较多情况,使用工作队列可以提高任务处理的速度

例如:短信服务部署有多个,只需要有一个节点发送成功即可。

4.2、Pub/Sub订阅发布模式

在Pub/Sub之后的工作模式中,一条消息可以被多个消费者消费。

1、模式说明

image-20210321124903796

相比 Work_Queues和简单模式,订阅发布模式多了一个 Exchange 角色,而且过程略有变化:

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给交换机X
  • C:消费者,消息的接收者,会一直等待消息到来
  • Queue:消息队列,接收消息,缓存消息
  • Exchange:交换机X,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交某个特别队列、递交给所有队列、或是将消息丢弃,具体如何操作,取决于交换机的类型。交换机有以下三种类型
    • Fanout:广播,将消息交给所有绑定到交换机的队列。
    • Direct:定向,将消息交给符合指定 routing key 的队列。
    • Topic:通配符,将消息交给符合 routing pattern (路由模式) 的队列。

Exchange (交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合 路由规则的队列,那么消息会丢失!

2、需求

生产者生产一条消息,然后将这条消息分发给 C1 和 C2,这条消息是一条日志,C1会将消息打印到控制台,C2将日志存储数据库。

3、生产者代码

步骤如下:

  • 创建连接工厂
  • 设置参数
  • 创建连接
  • 创建Channel
  • 创建交换机

参数说明:

1 exchange:交换机名称
2 type:交换机类型(建议使用枚举类型)
DIRECT(“direct”):定向
FANOUT(“fanout”):广播
TOPIC(“topic”):通配符方式
HEADERS(“headers”):参数匹配
3 durable:是否持久化
4 autoDelete:是否自动删除
5 internal:内部使用,一般设置为false
6 arguments:参数

  • 创建队列
  • 绑定交换机和队列

参数说明:
1 queue:队列的名称
2 exchange:交换机名称
3 routing key:路由键,绑定规则
如果交换机类型为 fanout,那么routing key设置为””
这样交换机会把消息发给所有队列

  • 发送消息
  • 释放资源

完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
//1 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2 设置参数
//2.1 设置ip地址,默认值为localhost
factory.setHost("192.168.56.101");
//2.2 设置端口,默认值为5672
factory.setPort(5672);
//2.3 设置虚拟机,默认值为"/"
factory.setVirtualHost("/wuhu");
//2.4 设置用户,默认为 guest
factory.setUsername("admin");
//2.5 设置密码,默认为 guest
factory.setPassword("admin");
//3 创建连接
Connection connection = factory.newConnection();
//4 创建Channel
Channel channel = connection.createChannel();
//5 创建交换机
/***
* 参数说明
* 1 exchange:交换机名称
* 2 type:交换机类型(建议使用枚举类型)
* DIRECT("direct"):定向
* FANOUT("fanout"):广播
* TOPIC("topic"):通配符方式
* HEADERS("headers"):参数匹配
* 3 durable:是否持久化
* 4 autoDelete:是否自动删除
* 5 internal:内部使用,一般设置为false
* 6 arguments:参数
*/
String exchangeName = "test_fanout";
//这样就创建了一个名为test_fanout的交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
//6 创建队列,这里需要创建两个队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7 绑定队列和交换机
/***
* 参数说明:
* 1 queue:队列的名称
* 2 exchange:交换机名称
* 3 routing key:路由键,绑定规则
* 如果交换机类型为 fanout,那么routing key设置为""
* 这样交换机会把消息发给所有队列
*/
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");
//8 发送消息
String data = "日志信息如下:用户[张三]调用了findAll方法...日志级别:info...";
channel.basicPublish(exchangeName,"",null,data.getBytes());
//9 释放资源
channel.close();
connection.close();

4、消费者代码

消费者不需要创建队列,只需要绑定生产者创建的队列即可。

下面只给出消费者2的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
    //1 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2 设置参数
//2.1 设置ip地址,默认值为localhost
factory.setHost("192.168.56.101");
//2.2 设置端口,默认值为5672
factory.setPort(5672);
//2.3 设置虚拟机,默认值为"/"
factory.setVirtualHost("/wuhu");
//2.4 设置用户,默认为 guest
factory.setUsername("admin");
//2.5 设置密码,默认为 guest
factory.setPassword("admin");
//3 创建连接
Connection connection = factory.newConnection();
//4 创建Channel
Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
//5 消费者接收消息
/*
参数:
1 queue:队列名称
2 autoACK:是否自动确认
3 callback:回调对象
*/
/*
这里的DefaultConsumer对象为回调对象,
我们可以重写回调方法handleDelivery,
收到消息后会自动执行该方法
参数说明:
1 consumerTag:标识
2 envelope:获取信息的对象,比如说交换器、路由key
3 properties:配置信息
4 body:数据
*/
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
System.out.println("消费者一将日志信息保存到数据库...");
System.out.println("----------------------------------------");
}
};
//这里队列的名称必须和生产者创建的队列名称相同
channel.basicConsume(queue2Name,true,consumer);

//消费者不需要关闭资源,需要一直监听
}

5、测试

先启动生产者,然后启动两个消费者,发现两个消费者的控制台都输出了对应的日志信息。

  • 消费者1

image-20210321135557577

  • 消费者2

image-20210321135534003

  • rabbitmq控制台

image-20210321135639849

6、小结

  • 交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。

  • 生产者需要创建一个 fanout 类型的交换机

注意:fanout 类型的交换机不需要指定 routing key

  • Pub/Sub的交换机会将消息发送给每一个与此交换机绑定的队列。

7、与WorkQueues工作模式的区别

  • 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。

  • 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。

  • 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机 。

4.3、Routing路由模式

1、模式说明

  • 队列和交换机的绑定不能是任意绑定了,而是要指定一个 Routing Key(路由key)
  • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 Routing Key
  • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 Routing Key 和消息的 Routing Key 完全一致时,才会接收到消息。

image-20210321141048174

图解:

  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
  • C1:消费者,其所在队列指定了需要routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

2、需求

只有当日志级别为 error 时,才将日志信息存储到数据库(同时输出到控制台)。

当日志级别在 error 以下时,只将日志输出到控制台。

3、生产者

  • 在创建交换机时需要指定交换机的类型为 DIRECT
1
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
  • 在绑定队列和交换机时需要指定 Routing Key

如果需要给一个队列绑定多个 Routing Key,那么可以绑定多次

1
2
3
4
5
6
//队列一的绑定
channel.queueBind(queue1Name,exchangeName,"error");
//队列二的绑定:info、error和warning
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"warning");
channel.queueBind(queue2Name,exchangeName,"error");
  • 完整代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
//1 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2 设置参数
//2.1 设置ip地址,默认值为localhost
factory.setHost("192.168.56.101");
//2.2 设置端口,默认值为5672
factory.setPort(5672);
//2.3 设置虚拟机,默认值为"/"
factory.setVirtualHost("/wuhu");
//2.4 设置用户,默认为 guest
factory.setUsername("admin");
//2.5 设置密码,默认为 guest
factory.setPassword("admin");
//3 创建连接
Connection connection = factory.newConnection();
//4 创建Channel
Channel channel = connection.createChannel();
//5 创建交换机
/***
* 参数说明
* 1 exchange:交换机名称
* 2 type:交换机类型(建议使用枚举类型)
* DIRECT("direct"):定向
* FANOUT("fanout"):广播
* TOPIC("topic"):通配符方式
* HEADERS("headers"):参数匹配
* 3 durable:是否持久化
* 4 autoDelete:是否自动删除
* 5 internal:内部使用,一般设置为false
* 6 arguments:参数
*/
String exchangeName = "test_direct";
//这样就创建了一个名为test_direct的交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
//6 创建队列,这里需要创建两个队列
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7 绑定队列和交换机
/***
* 参数说明:
* 1 queue:队列的名称
* 2 exchange:交换机名称
* 3 routing key:路由键,绑定规则
* 如果交换机类型为 fanout,那么routing key设置为""
* 这样交换机会把消息发给所有队列
*/
//队列一的绑定
channel.queueBind(queue1Name,exchangeName,"error");
//队列二的绑定:info、error和warning
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"warning");
channel.queueBind(queue2Name,exchangeName,"error");
//8 发送消息
String data = "日志信息如下:用户[张三]调用了findAll方法...日志级别:info...";
channel.basicPublish(exchangeName,"info",null,data.getBytes());
//9 释放资源
channel.close();
connection.close();
  • 启动,查看 RabbitMQ控制台

image-20210321142722924

可以看到,队列2有一条消息待发送,而队列1没有消息。

这是因为我们发送消息时使用的 Routing Key 为info,故 Routing Key 为error的队列无法收到消息。

4、消费者代码

这里只给出保存到消费者1的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//1 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2 设置参数
//2.1 设置ip地址,默认值为localhost
factory.setHost("192.168.56.101");
//2.2 设置端口,默认值为5672
factory.setPort(5672);
//2.3 设置虚拟机,默认值为"/"
factory.setVirtualHost("/wuhu");
//2.4 设置用户,默认为 guest
factory.setUsername("admin");
//2.5 设置密码,默认为 guest
factory.setPassword("admin");
//3 创建连接
Connection connection = factory.newConnection();
//4 创建Channel
Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
//5 消费者接收消息
/*
参数:
1 queue:队列名称
2 autoACK:是否自动确认
3 callback:回调对象
*/
/*
这里的DefaultConsumer对象为回调对象,
我们可以重写回调方法handleDelivery,
收到消息后会自动执行该方法
参数说明:
1 consumerTag:标识
2 envelope:获取信息的对象,比如说交换器、路由key
3 properties:配置信息
4 body:数据
*/
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
System.out.println("消费者一将日志信息保存到数据库...");
System.out.println("----------------------------------------");
}
};
//这里队列的名称必须和生产者创建的队列名称相同
channel.basicConsume(queue1Name,true,consumer);

//消费者不需要关闭资源,需要一直监听

5、启动两个消费者,查看结果

如果此时生产者发送的日志级别(routing key)为 info,那么只有消费者二可以接收到此消息

如果此时生产者发送的日志级别(routing key)为 error,那么消费者一和消费者二都可以接收到此消息

  • 当发送的日志级别(routing key)为 info
1
2
3
//8 发送消息
String data = "日志信息如下:用户[张三]调用了findAll方法...日志级别:info...";
channel.basicPublish(exchangeName,"info",null,data.getBytes());

消费者一

image-20210321144244543

消费者二

image-20210321144325191

  • 当生产者发送的日志级别(routing key 为 error
1
2
3
//8 发送消息
String data = "日志信息如下:用户[张三]调用了batchDelete方法...日志级别:error...";
channel.basicPublish(exchangeName,"error",null,data.getBytes());

消费者一

image-20210321144809814

消费者二

image-20210321144827854

6、小结

Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。

4.4、Topic 通配符模式

1、模式说明

image-20210321145318552

image-20210321145340635

图解:

  • 红色Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到
  • 黄色Queue:绑定的是 #.news,因此凡是以 .news 结尾的 routing key 都会被匹配

所以在上面的第二张图中,routing keyusa.news 的消息会被转发到 第一个队列和第二个队列中。

routing keyusa.weather 的消息会被转发到 第一个队列和第三个队列中。

routing keyeurope.news 的消息会被转发到 第二个队列和第四个队列中。

routing keyeurope.weather 的消息会被转发到 第三个队列和第四个队列中。

  • 通配符说明

“*”可以匹配一个标识符(代替一个单词)

“#”可以匹配0个或多个标识符(代替0个或多个单词)

2、需求

除了 error 级别的日志,我还希望将订单系统中的所有日志都记录到数据库中

3、生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
//1 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2 设置参数
//2.1 设置ip地址,默认值为localhost
factory.setHost("192.168.56.101");
//2.2 设置端口,默认值为5672
factory.setPort(5672);
//2.3 设置虚拟机,默认值为"/"
factory.setVirtualHost("/wuhu");
//2.4 设置用户,默认为 guest
factory.setUsername("admin");
//2.5 设置密码,默认为 guest
factory.setPassword("admin");
//3 创建连接
Connection connection = factory.newConnection();
//4 创建Channel
Channel channel = connection.createChannel();
//5 创建交换机
/***
* 参数说明
* 1 exchange:交换机名称
* 2 type:交换机类型(建议使用枚举类型)
* DIRECT("direct"):定向
* FANOUT("fanout"):广播
* TOPIC("topic"):通配符方式
* HEADERS("headers"):参数匹配
* 3 durable:是否持久化
* 4 autoDelete:是否自动删除
* 5 internal:内部使用,一般设置为false
* 6 arguments:参数
*/
String exchangeName = "test_topic";
//这样就创建了一个名为test_topic的交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
//6 创建队列,这里需要创建两个队列
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7 绑定队列和交换机
/***
* 参数说明:
* 1 queue:队列的名称
* 2 exchange:交换机名称
* 3 routing key:路由键,绑定规则
* 如果交换机类型为 fanout,那么routing key设置为""
* 这样交换机会把消息发给所有队列
*/
//队列一的绑定,系统的名称,日志的级别
//需求,所有error日志和订单系统的日志都记录入数据库
channel.queueBind(queue1Name,exchangeName,"#.error");
channel.queueBind(queue1Name,exchangeName,"order.*");
//队列二的绑定:什么日志都打印到控制台
channel.queueBind(queue2Name,exchangeName,"*.*");
//8 发送消息
String data = "日志信息如下:用户[张三]调用了batchDelete方法...日志级别:error...";
channel.basicPublish(exchangeName,"order.info",null,data.getBytes());
//9 释放资源
channel.close();
connection.close();
  • 启动生产者,查看RabbitMQ控制台

image-20210321152029637

4、消费者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
 //1 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2 设置参数
//2.1 设置ip地址,默认值为localhost
factory.setHost("192.168.56.101");
//2.2 设置端口,默认值为5672
factory.setPort(5672);
//2.3 设置虚拟机,默认值为"/"
factory.setVirtualHost("/wuhu");
//2.4 设置用户,默认为 guest
factory.setUsername("admin");
//2.5 设置密码,默认为 guest
factory.setPassword("admin");
//3 创建连接
Connection connection = factory.newConnection();
//4 创建Channel
Channel channel = connection.createChannel();
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
//5 消费者接收消息
/*
参数:
1 queue:队列名称
2 autoACK:是否自动确认
3 callback:回调对象
*/
/*
这里的DefaultConsumer对象为回调对象,
我们可以重写回调方法handleDelivery,
收到消息后会自动执行该方法
参数说明:
1 consumerTag:标识
2 envelope:获取信息的对象,比如说交换器、路由key
3 properties:配置信息
4 body:数据
*/
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
System.out.println("消费者二将日志信息打印到控制台...");
System.out.println("----------------------------------------");
}
};
//这里队列的名称必须和生产者创建的队列名称相同
channel.basicConsume(queue2Name,true,consumer);

//消费者不需要关闭资源,需要一直监听

上面只给出消费者2的代码

5、测试

  • 当生产者发送的消息的 routing key 为 order.info 时
1
2
3
//8 发送消息
String data = "日志信息如下:用户[张三]调用了batchDelete方法...日志级别:error...";
channel.basicPublish(exchangeName,"order.info",null,data.getBytes());

查看消费者1

image-20210321152858577

消费者2

image-20210321152917266

  • 当生产者发送消息的 routing key 为 “goods.info” 时
1
2
3
//8 发送消息
String data = "日志信息如下:用户[张三]调用了batchDelete方法...日志级别:error...";
channel.basicPublish(exchangeName,"goods.info",null,data.getBytes());

消费者1

image-20210321153122385

消费者2

image-20210321153154041

  • 当生产者发送消息的 routing key 为 “goods.error” 时
1
2
3
//8 发送消息
String data = "日志信息如下:用户[张三]调用了batchDelete方法...日志级别:error...";
channel.basicPublish(exchangeName,"goods.error",null,data.getBytes());

消费者1

image-20210321153255138

消费者2

image-20210321153327988

6、小结

Topic主题模式可以实现 Publish/Subscribe发布与订阅模式Routing路由模式 的功能;只是Topic在配置routing key 的时候可以使用通配符,显得更加灵活。

4.5、模式总结

RabbitMQ工作模式:

1、简单模式

一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)

2、工作队列模式

一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)

3、发布订阅模式

需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列

4、路由模式

需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

5、通配符模式

需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列