Java应用学习(五)-RabbitMQ学习笔记(一)
一、MQ基本概念
1.1、MQ概述
MQ全称 Message Queue(消息队列)
,是在消息的传输过程中保存消息的容器。多
用于分布式系统之间进行通信。
使用MQ中间件
小结
- MQ即消息队列,是用于存储消息的中间件。
- 分布式系统通信两种方式:直接远程调用 和 借助第三方完成间接通信
- 发送方称为生产者,接收方称为消费者。
1.2、MQ 的优势和劣势
1、优势
- 应用解耦
- 异步提速
- 削峰填谷
2、劣势
- 系统可用性降低
- 系统复杂度提高
- 一致性问题
3、MQ优势之应用解耦
系统的耦合性越高,容错性就越低,可维护性就越低。
- 以下订单为例,在没有引入MQ之前,订单系统与库存系统、支付系统、物流系统耦合在一起。
如果与订单系统关联的某个系统(比如说库存系统)挂了,那么可能导致订单系统也跟着挂了,此时系统的容错低。
如果要新增一个与订单系统相关联的系统,那么此时只能修改订单系统的代码,此时系统的可维护性和可扩展性低。
- 在引入MQ中间件后,上面的流程图的变化如下图所示
在引入MQ后,订单系统只需要发一条消息给MQ,然后让消费者(库存、支付、物流、X系统)从MQ中拿出消息进行消费即可,使用MQ使得应用间解耦,提示容错性和可维护性。
如果此时库存系统挂了,那么订单系统将不会受到影响,因为库存系统和订单系统之间是隔离的。
如果此时需要添加一个X系统,那么只需要让X系统从MQ中拿出消息进行消费即可,此时不需要修改订单系统代码,提升了系统的可维护性和可扩展性。
4、MQ优势之异步提速
如果应用流程中订单系统远程调用其他系统的时间均为300ms,访问数据库的时间为20ms,由于订单系统调用其他系统的行为是同步的(需要调完A再调用B),所以一个下单操作总共耗费的时间为
920ms
,速度太慢。
引入MQ后,订单系统只需要将消息放入MQ,然后操作自己的数据库之后即可给用户返回
下单成功
的提示,并不需要等其他系统消费完从MQ拿出的消息。此时下单操作所耗费的时间为:
25ms
(写数据库花费20ms,发送消息花费5ms)提升用户体验和系统吞吐量(单位时间内处理请求的数目)
5、MQ优势之削峰填谷
使用MQ后,可以提高系统稳定性。
如果一个A系统一秒钟最多处理1000个请求,如果此时请求量暴增至1秒5000个,那么A系统可能直接宕机,整个系统可能直接瘫痪,用户体验降低。
在引入MQ后,即使请求量真的从1秒1000个瞬间暴增至1秒5000个,由于此时直接与用户对接的不是A系统,而是MQ首当其冲,所以此时我们令A系统每秒最多从MQ中拉取1000个请求,来保证A系统不会宕机。
- 削峰
在请求量暴增的情况下,MQ可以限制消费消息的速度,将高峰期产生的流量堆积在MQ中,这样以来,高峰就被削掉了
- 填谷
由于在“削峰”过程中,大量消息堆积在MQ中,所以在高峰期过后,消费消息的速度还是会在一段时间内维持高水平(在上面的场景中为1000),直到消费完积压得消息,这就是填谷。
- 削峰填谷示意图
6、MQ劣势之系统可用性降低
在引入MQ前,只需要保证组成分布式系统的各个系统没有问题,在引入MQ后还需要保证MQ没有问题。
系统引入的外部依赖越多,系统稳定性越差。一旦
MQ
宕机,就会对业务造成影响。如何保证MQ的高可用是我们引入MQ后必须思考的一个问题。
7、MQ劣势之系统复杂度提高
在引入MQ后,系统的复杂性提高,我们要考虑以下问题
- 生产者发送的消息会不会被消费者重复消费?
- 如何处理消息丢失的情况?
- 如何保证消息传递的顺序性?
8、MQ劣势之一致性问题
A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?
1.3、使用MQ应该满足什么条件?
- 生产者不需要从消费者处获得反馈。
引入消息队列之前的远程调用,其接口的返回值应该为空,这才让明明下层的动作还没做完,上层却当动作做完了继续往后走,即所谓 异步 成为了可能。
举个例子:如果A调用完B后需要拿着B的返回结果作为参数再去调用C,那么这个时候引入MQ无法实现异步调用。
- 容许短暂的不一致性。
- 确实是用了有效果。
即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。
1.4、常见的MQ产品
目前业界有很多的 MQ 产品,例如
RabbitMQ
、RocketMQ
、ActiveMQ、Kafka
、ZeroMQ
、MetaMQ
等,也有直接使用Redis
充当消息队列的案例,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及 MQ 产品特征,综合考虑。
- 常见的MQ产品对比
二、RabbitMQ
2.1、AMQP
AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议)
,是一个网络协议,是应用层协议
的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
2006年,AMQP 规范发布。类比HTTP。
生产者发送消息给交换机,交换机根据各种各样的路由规则将消息发送到不同的容器中,再由消费者进行消费。
2.2、RabbitMQ简介
2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。
Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。
1、基础架构
RabbitMQ的基础架构如下图:
交换机(Exchange)通过
Binding
和队列(Queue)一一对应。
2、RabbitMQ中的相关概念
Broker:
接收和分发消息的应用,RabbitMQ Server就是 Message Broker。
Virtual host:
出于多租户和安全因素设计的,把AMQP
的基本组件划分到一个虚拟的分组中,类似于网络中的namespace
概念。当多个不同的用户使用同一个RabbitMQ server
提供的服务时,可以划分出多个vhost
,每个用户在自己的vhost
创建exchange/queue
等。Connection:
publisher/consumer 和 broker 之间的 TCP 连接。Channel:
如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。Exchange:
message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,
分发消息到queue 中去。常用的类型有:direct (point-to-point)
topic (publish-subscribe)
fanout (multicast)
Queue:
消息最终被送到这里等待 consumer 取走Binding:
exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
3、RabbitMQ的六种工作模式
RabbitMQ 提供了 6 种工作模式:
- 简单模式
- work queues
- Publish/Subscribe (发布订阅模式)
- Routing (路由模式)
- Topics (主题模式)
- RPC 远程调用模式 (远程调用,不太算 MQ;暂不作介绍)
官网中关于以上模式的介绍
2.3、AMQP和JMS
MQ是消息通信的模型;
实现MQ的大致有两种主流方式:AMQP、JMS。
1、AMQP
AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。
2、JMS
JMS即Java消息服务(JavaMessage Service)应用程序接口,
是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。JMS 是
JavaEE
规范中的一种,类比JDBC。
3 AMQP 与 JMS 区别
JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
JMS规定了两种消息模式;而AMQP的消息模式更加丰富
4 再谈市场上常见的消息队列
ActiveMQ
:基于JMS
ZeroMQ
:基于C语言开发
RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
RocketMQ
:基于JMS,阿里巴巴产品
Kafka:类似MQ的产品;分布式消息系统,高吞吐量。
三、RabbitMQ 快速入门
3.1、入门程序
1、需求
使用RabbitMQ六种模式之中的简单模式完成消息传递
2、步骤
- 创建工程(消费者,生产者)
- 分别添加依赖
- 编写代码,令生产者发送消息,消费者接收消息。
3、引入依赖
在生产者和消费者模块中分别引入以下依赖
1 | <dependencies> |
4、编写生产者代码
步骤如下:
创建连接工厂
设置参数
- 设置
ip
地址,默认只为localhost
- 设置
port
端口,默认为 5672 - 设置虚拟机,默认为 /
- 设置用户密码,默认值都为
admin
- 设置
创建连接 Connection
创建 Channel
参数介绍
1 queque:队列名称
2 durable:是否持久化,如果不持久化,那么mq重启即失
3 exclusive:
是否独占(只能有一个消费者监听此队列)
当Connection关闭时,是否删除此队列
4 autoDelete:
是否自动删除,当没有Consumer时,自动删除队列
5 arguments:参数
- 创建队列 Queue
- 发送消息
参数:
1 exchange
交换机每次,简单模式下交换机会使用默认的
2 routingKey
路由名称
3 props:配置信息
4 body:发送消息的数据(byte数组)
- 释放资源
生产者的具体代码如下
1 | //1 创建连接工厂 |
运行代码, 可以看到有一个名为 hello_world 的队列被创建
点击队列,可以看到有一个消息已经就绪,等待被消费
5、编写消费者代码
步骤如下:
创建连接工厂
设置参数
- 设置
ip
地址,默认只为localhost
- 设置
port
端口,默认为 5672 - 设置虚拟机,默认为 /
- 设置用户密码,默认值都为
admin
- 设置
创建连接 Connection
创建 Channel
参数介绍
1 queque:队列名称
2 durable:是否持久化,如果不持久化,那么mq重启即失
3 exclusive:
是否独占(只能有一个消费者监听此队列)
当Connection关闭时,是否删除此队列
4 autoDelete:
是否自动删除,当没有Consumer时,自动删除队列
5 arguments:参数
- 创建队列 Queue
- 创建回调
DefaultConsumer
对象
重写
DefaultConsumer
的handleDelivery
方法,消费者收到消息后会自动调用该方法参数说明
/*
这里的DefaultConsumer
对象为回调对象,
我们可以重写回调方法handleDelivery
,
收到消息后会自动执行该方法参数说明:
1consumerTag
:标识
2 envelope:获取信息的对象,比如说交换器、路由key
3 properties:配置信息
4 body:数据
*/
- 消费者接收消息
/*
参数:
1 queue:队列名称
2 autoACK:是否自动确认
3 callback:回调对象
*/
- 消费者不需要关闭连接释放资源,因为消费者需要监听端口
消费者完整代码如下:
1 | //1 创建连接工厂 |
- 启动消费者,查看结果
3.2、小结
上述的入门案例中其实使用的是以下的简单方式
在上图的模型中,有以下概念
- P:生产者,也就是要发送消息的程序
- C:消费者,消息的接收者,会一直等待消息到来
- queue:消息队列,图中红色部分,类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从中取出消息
四、RabbitMQ工作模式
4.1、Work queues工作队列模式
1、模式说明
Work Queues
与入门程序的简单模式
相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。注意,工作队列模式的消费者间是竞争关系,即最后只有一个消费者可以从消息队列中取到消息。
2、应用场景
对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
3、编写生产者代码
1 | //1 创建连接工厂 |
4、编写两个消费者
这里给出其中一个的代码,第二个和第一个代码一致
1 | //1 创建连接工厂 |
5、测试
先启动两个消费者,查看工作窗口中的连接情况
启动生产者,查看消息被消费情况
- 消费者1
- 消费者2
6、小结
在一个队列中如果有多个消费者,那么消费者之间对同一个消息队列中的消息的关系是竞争关系。
Work Queues 对于任务过重或任务较多情况,使用工作队列可以提高任务处理的速度
例如:短信服务部署有多个,只需要有一个节点发送成功即可。
4.2、Pub/Sub订阅发布模式
在Pub/Sub之后的工作模式中,一条消息可以被多个消费者消费。
1、模式说明
相比 Work_Queues和简单模式,订阅发布模式多了一个 Exchange 角色,而且过程略有变化:
- P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给交换机X
- C:消费者,消息的接收者,会一直等待消息到来
- Queue:消息队列,接收消息,缓存消息
- Exchange:交换机X,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交某个特别队列、递交给所有队列、或是将消息丢弃,具体如何操作,取决于交换机的类型。交换机有以下三种类型
- Fanout:广播,将消息交给所有绑定到交换机的队列。
- Direct:定向,将消息交给符合指定 routing key 的队列。
- Topic:通配符,将消息交给符合 routing pattern (路由模式) 的队列。
Exchange (交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合 路由规则的队列,那么消息会丢失!
2、需求
生产者生产一条消息,然后将这条消息分发给 C1 和 C2,这条消息是一条日志,C1会将消息打印到控制台,C2将日志存储数据库。
3、生产者代码
步骤如下:
- 创建连接工厂
- 设置参数
- 创建连接
- 创建Channel
- 创建交换机
参数说明:
1 exchange:交换机名称
2 type:交换机类型(建议使用枚举类型)
DIRECT(“direct”):定向
FANOUT(“fanout”):广播
TOPIC(“topic”):通配符方式
HEADERS(“headers”):参数匹配
3 durable:是否持久化
4 autoDelete:是否自动删除
5 internal:内部使用,一般设置为false
6 arguments:参数
- 创建队列
- 绑定交换机和队列
参数说明:
1 queue:队列的名称
2 exchange:交换机名称
3 routing key:路由键,绑定规则
如果交换机类型为 fanout,那么routing key设置为””
这样交换机会把消息发给所有队列
- 发送消息
- 释放资源
完整代码如下:
1 | //1 创建连接工厂 |
4、消费者代码
消费者不需要创建队列,只需要绑定生产者创建的队列即可。
下面只给出消费者2的代码
1 | //1 创建连接工厂 |
5、测试
先启动生产者,然后启动两个消费者,发现两个消费者的控制台都输出了对应的日志信息。
- 消费者1
- 消费者2
- rabbitmq控制台
6、小结
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
生产者需要创建一个 fanout 类型的交换机
注意:fanout 类型的交换机不需要指定 routing key
- Pub/Sub的交换机会将消息发送给每一个与此交换机绑定的队列。
7、与WorkQueues
工作模式的区别
工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机 。
4.3、Routing路由模式
1、模式说明
- 队列和交换机的绑定不能是任意绑定了,而是要指定一个 Routing Key(路由key)
- 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 Routing Key
- Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 Routing Key 和消息的 Routing Key 完全一致时,才会接收到消息。
图解:
- P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
- X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
- C1:消费者,其所在队列指定了需要routing key 为 error 的消息
- C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
2、需求
只有当日志级别为 error 时,才将日志信息存储到数据库(同时输出到控制台)。
当日志级别在 error 以下时,只将日志输出到控制台。
3、生产者
- 在创建交换机时需要指定交换机的类型为 DIRECT
1 | channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null); |
- 在绑定队列和交换机时需要指定 Routing Key
如果需要给一个队列绑定多个 Routing Key,那么可以绑定多次
1 | //队列一的绑定 |
- 完整代码
1 | //1 创建连接工厂 |
- 启动,查看 RabbitMQ控制台
可以看到,队列2有一条消息待发送,而队列1没有消息。
这是因为我们发送消息时使用的 Routing Key 为info,故 Routing Key 为error的队列无法收到消息。
4、消费者代码
这里只给出保存到消费者1的代码
1 | //1 创建连接工厂 |
5、启动两个消费者,查看结果
如果此时生产者发送的日志级别(routing key)为 info,那么只有消费者二可以接收到此消息
如果此时生产者发送的日志级别(routing key)为 error,那么消费者一和消费者二都可以接收到此消息
- 当发送的日志级别(routing key)为 info时
1 | //8 发送消息 |
消费者一
消费者二
- 当生产者发送的日志级别(routing key 为 error 时
1 | //8 发送消息 |
消费者一
消费者二
6、小结
Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。
4.4、Topic 通配符模式
1、模式说明
图解:
- 红色Queue:绑定的是
usa.#
,因此凡是以usa.
开头的routing key
都会被匹配到 - 黄色Queue:绑定的是
#.news
,因此凡是以.news
结尾的routing key
都会被匹配
所以在上面的第二张图中,
routing key
为 usa.news 的消息会被转发到 第一个队列和第二个队列中。
routing key
为 usa.weather 的消息会被转发到 第一个队列和第三个队列中。
routing key
为 europe.news 的消息会被转发到 第二个队列和第四个队列中。
routing key
为 europe.weather 的消息会被转发到 第三个队列和第四个队列中。
- 通配符说明
“*”可以匹配一个标识符(代替一个单词)
“#”可以匹配0个或多个标识符(代替0个或多个单词)
2、需求
除了 error 级别的日志,我还希望将订单系统中的所有日志都记录到数据库中
3、生产者代码
1 | //1 创建连接工厂 |
- 启动生产者,查看RabbitMQ控制台
4、消费者代码
1 | //1 创建连接工厂 |
上面只给出消费者2的代码
5、测试
- 当生产者发送的消息的 routing key 为 order.info 时
1 | //8 发送消息 |
查看消费者1
消费者2
- 当生产者发送消息的 routing key 为 “goods.info” 时
1 | //8 发送消息 |
消费者1
消费者2
- 当生产者发送消息的 routing key 为 “goods.error” 时
1 | //8 发送消息 |
消费者1
消费者2
6、小结
Topic主题模式可以实现
Publish/Subscribe发布与订阅模式
和Routing路由模式
的功能;只是Topic在配置routing key 的时候可以使用通配符,显得更加灵活。
4.5、模式总结
RabbitMQ工作模式:
1、简单模式
一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
2、工作队列模式
一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
3、发布订阅模式
需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列
4、路由模式
需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
5、通配符模式
需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列