一、分布式事务

1.1、分布式事务问题

==分布式系统环境下由不同的服务之间通过网络远程协作完成事务称为分布式事务。==

单体应用被拆分为微服务应用,原来的一个程序被拆分为多个完全独立的应用,这几个应用连接多个独立的数据源

以下单支付这个操作为例,如果仓储服务、订单服务、账户服务运行在三个独立的微服务中,且这三个独立的微服务各自连接不同的数据源。假设用户现在下单某件商品.

  • 仓储服务:对指定商品扣除仓储数量
  • 订单服务:根据用户需求创建订单
  • 账户服务:从用户余额中抽除金额

在上面的场景中,每个微服务内部数据的一致性可以通过 ==本地== 事务保证,但是全局数据的一致性无法保证

img

==在一次业务操作中,如果需要跨 多个数据源 或跨 多个系统 进行远程调用,那么就会产生分布式事务问题。==

Seata 用于保证全局数据一致性。

1.2、分布式事务基础理论 – CAP

0、场景模拟

如下图,是商品信息管理的执行流程

image-20210514192550158

整体执行流程如下:

  • 商品服务请求主数据库写入商品信息(添加、删除、修改商品)
  • 主数据库向商品服务响应写操作成功
  • 商品服务请求从数据库读取商品信息

1、什么是 CAP ?

CAP原则又称CAP定理,指的是在一个分布式系统中,【一致性】(Consistency)、【可用性】(Availability)、【分区容错性】(Partition tolerance)。

CAP 原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾。

2、C-Consistency

一致性是指写操作后的读操作可以读取到最新的数据状态,当数据分布在多个节点上时,从任意节点读取到的数据都是最新的状态。

  • ==在上面的场景中,商品信息的读写要满足一致性就是要实现如下目标:==
  1. 商品服务写入主数据库成功,则向从服务器查询新数据时也成功。

  2. 商品服务写入主数据库失败,则向从服务器查询新数据时也失败。

  • ==如何实现数据一致性?==
  1. 向主数据库进行写操作后要将数据同步到从数据库。
  2. 写入主数据库后,在向从数据库同步期间要将从数据库锁定,等待同步完成后再释放锁,以免在新数据库写入成功后,向从数据库查询到旧的数据。
  • 分布式系统一致性特点:
  1. 由于存在数据同步的过程,所以写操作的响应会有一定延迟

  2. 为了保证数据一致性,所以会对资源暂时进行锁定,数据同步完成后释放锁定资源

  3. 如果请求数据同步失败的结点会返回错误信息,但一定不会返回旧数据

2、A-Availability

可用性指任何事务操作都可以得到响应结果,且不会出现响应超时或响应错误。

追求==弱一致性==(不要求即时同步)和==最终一致性==(最后结果要求一致)

  • ==在上述场景中,可用性要求实现以下目标==
  1. 从数据库接收到数据查询请求的请求则立即能够响应数据查询结果
  2. 从数据库不允许出现响应超时或者响应错误
  • ==如何实现==
  1. 写入主数据库后要将数据同步到从数据库
  2. 由于要保证从数据库的可用性,不可将从数据库中的资源进行锁定
  3. 即使数据还未同步过来,从数据库也要返回待查询数据,哪怕是旧数据,如果没有旧数据,那么可以按照约定返回一个默认值,但不能返回错误或者响应超时
  • 特点

任何请求都有响应,且不会出现响应超时或响应错误

3、P-Partition tolerance

通常分布式系统的各结点都部署在不同的子网,也就是网络分区,不可避免地会出现由于网络问题而导致结点之间通信失败,此时仍可对外提供服务,这就叫==分区容忍性==

  • 上述场景中,商品服务读写满足以下目标:
  1. 主数据库向从数据库同步数据失败不影响读写操作。
  2. 其中一个结点挂掉不影响另一个结点对外提供服务。
  • 如何实现
  1. 尽量用异步取代同步操作,例如使用异步方式将数据从主数据库同步到从数据库,这样结点间可以有效的实现松耦合
  2. 添加从数据库结点,其中一个结点挂掉时,可以由其他结点对外提供服务。
  • 特点

分区容忍性是分布式系统具备的基本能力

4、CAP 组合方式

由于分布式系统默认具备分区容忍性,因此一致性与可用性不能共存,在满足分区容忍性的前提下,一致性与可用性存在矛盾,无法共存!

  • 一般来说,放弃一致性,追求分区容忍性和可用性是很多分布式系统的选择。(AP)

1.3、分布式事务基础理论 – BASE 理论

1、理解强一致性和最终一致性

CAP 理论告诉我们一个分布式系统最多只能同时满足一致性(Consistency)、可用性(Availability)和分区容忍性(Partition tolerance)这三项中的两项。

==其中 AP 在实际应用中较多,AP 即舍弃一致性,保证可用性和分区容忍性,但是在实际生产中很多场景都要实现一致性==。

比如前边我们举的例子主数据库向从数据库同步数据,即使不要一致性,但是最终也要将数据同步成功来保证数据一致,==这种一致性和 CAP 中的一致性不同,CAP 中的一致性要求在任何时间查询每个结点数据都必须一致==,它强调的是==强一致性==,但是最终一致性是允许可以在一段时间内每个结点的数据不一致,但是经过一段时间每个结点的数据必须一致,它强调的是最终数据的一致性。

2、BASE 理论介绍

BASE是 BasicallyAvailable (基本可用)、Softstate (软状态)和Eventuallyconsistent (最终一致性)三个短语的缩写。

==BASE理论是对CAP中AP的一个扩展==,通过牺牲强一致性来获得可用性,当==出现故障允许部分不可用但要保证核心功能可用==,允许数据在一段时间内是不一致的,但最终达到一致状态。满足BASE理论的事务,我们称之为“柔性事务”。

1.4、分布式事务解决方案

  • XA
  • TCC
  • 可靠消息最终一致
  • TA

1.5、分阶段提交

1、DTP 和 XA

分布式事务解决手段之一,就是两阶段提交协议(2PC:Two-Phase Commit),1994 年,X/OPEN 组织定义了分布式事务处理的==DTP==模型。该模型定义了这样几个角色

  • 应用程序(AP):我们的微服务模块
  • 事务管理器(TM):全局事务管理者
  • 资源管理器(RM):一般是数据库
  • 通信资源管理器(CRM):是 TM 和 RM 间的通信中间件

在该模型中,一个分布式事务(全局事务)可以被拆分成许多个本地事务,运行在不同的AP和RM上。每个本地事务的 ACID 很好实现,但是全局事务必须保证其中包含的每一个本地事务都能同时成功,==若有一个本地事务失败,则所有其它事务都必须回滚。==但问题是,本地事务处理过程中,井不知道其它事务的运行状态。因此,就需要通过CRM来通知各个本地事务,同步事务执行的状态

因此,各个本地事务的通信必须有统一的标准,否则不同数据库之间就无法进行通信。XA 就是 DTP 模型中通信中间件与 TM 间联系的==接口规范==,定义了用于通知事务开始、提交、终止、回滚等接口,各个数据库产商都必须实现这个接口。

2、二阶段提交

  • ==二阶段提交协议==将全局事务拆分为两个阶段来执行:
  1. 阶段一:准备阶段,各个本地事务完成本地事务的准备工作

  2. 阶段二:执行阶段,各个本地事务根据上一阶段执行结果,进行提交或者回滚。

这个过程需要一个协调者,还有事务的参与者

  • 正常情况(以仓储服务、订单服务和支付服务为例说明)
  1. 进入准备阶段,协调者(通信中间件)提示事务参与者开始事务
  2. 仓储服务减少库存,并表示自己的代码执行成功(不提交事务)
  3. 订单服务创建订单,并表示自己的代码执行成功(不执行事务)
  4. 支付服务调用支付接口,用户付款成功,此时支付服务表示自己的代码执行成功(不执行事务)
  5. 进入提交阶段,由于三个事务的参与者的代码均指向成功,所以协调者令三个服务提交自己的事务,此时全局事务执行成功

image-20210514205157105

  • 全局事务情况
  1. 进入准备阶段,协调者(通信中间件)提示事务参与者开始事务
  2. 仓储服务减少库存,并表示自己的代码执行成功(不提交事务)
  3. 订单服务创建订单,并表示自己的代码执行成功(不执行事务)
  4. 支付服务调用支付接口,用户付款失败,此时支付服务告知自己的代码执行失败(不提交事务)
  5. 进入提交阶段,虽然前两个微服务的代码执行成功,但支付服务中用户支付失败,所以协调者令三个服务均丢弃自己的事务,进行回滚操作,此时全局事务回滚成功

协调者如果发现有一个或者多个事务参与者的返回结果是 disagree ,那么就认为事务执行失败,此时会让各个事务参与者回滚事务

image-20210514205401600

  • 二阶段提交的问题
  1. 单点故障问题

如果在提交阶段时协调者挂了,那么此时事务参与者不知道其他事务是否成功。

  1. 阻塞问题

在准备阶段、提交阶段中,每个事务参与者都会锁定自己的资源,并等待其他事务的执行结果,如果阻塞时间较长,资源锁定时间太久,那么执行效率就会降低。

1.6、TCC

TCC 可以解决二阶段提交中的资源锁定和阻塞问题,减少资源锁定时间。

1、基本原理

本质上是一种补偿思路,事务运行过程包括三个方法

  • Try:资源的检测和预留
  • Confirm:执行的业务操作提交;要求 Try 成功 Confirm 一定要能成功。
  • Cancel:预留资源释放

执行分两个阶段:

  • 准备阶段(Try):资源的检测和预留
  • 执行阶段(Confirm / Cancel):根据上一步结果,执行下面的方法,如果上一步中所有事务参与者都成功,那么这里执行 Confirm ,否则执行 Cancel 。

image-20210514210836280

2、优势

TCC 执行的每一个阶段都会执行本地事务并释放锁,并不需要等待其他事务的执行结果。而如果其他事务执行失败,最后不是回滚,而是进行补偿操作。这样就避免了资源的长期锁定和阻塞等待,执行效率比较高,属于性能比较好的分布式事务方式。

3、劣势

  • 代码侵入

需要人为编写代码实现 Try 、 Confirm 和 Cancel,代码侵入较多

  • 开发成本高

一个业务需要拆分为 3 个步骤,分别编写业务实现,业务编写较复杂

  • 安全性考虑

如果 Cancel 执行失败,那么资源可能无法释放,需要引入重试机制,而重试可能导致重复执行,此时需要保证重试的幂等性。

1.7、可靠消息服务

基本设计思想是将远程分布式事务拆分为一系列本地事务。

1、基本原理

一般分为事务的发起者 A 和事务的其他参与者 B :

  • 事务发起者 A 执行本地事务
  • 事务发起者 A 通过 MQ 将需要执行的事务信息发送给事务参与者 B
  • 事务参与者 B 接收到消息后执行本地事务

image-20210514213129584

2、注意事项

  • 事务发起者 A 必须保证本地事务成功后,消息一定发送成功
  • MQ 必须保证消息正确投递和持久化保存
  • 事务参与者 B 必须保证消息最终一定能消费,如果失败需要多次重试
  • 事务 B 执行失败,会重试,但不会导致事务 A 回滚

1.8、AT

AT 模式是 Seata 开源的一种无侵入的分布式事务解决方案。

可以看为对 TCC 或者二阶段提交模型的一种优化,解决了 TCC 中的代码侵入,编码复杂等问题。

在 AT 模式下,用户只需要关注自己 ==业务的 SQL== ,用户的 ==业务 SQL== 作为一阶段, Seata 框架会自动生产事务的二阶段提交和回滚操作。

1、基本原理

先看一张流程图

image-20210514220436394

与 TCC 的执行流程相似,都是分两个阶段,不同的是第二阶段的提交回滚代码不需要我们手写,而是由 Seata 自动帮我们完成。也就是说,==我们写的代码与本地事务代码一样,无需手动处理分布式事务==

2、底层原理

  • 一阶段

在一阶段,Seata 会拦截 ==业务 SQL== ,首先解析 SQL 语义,找到 ==业务 SQL== 要更新的业务数据,在业务数据被更新前,将其保存为 before image ,然后执行 ==业务 SQL== 更新业务数据,在业务数据更新之后,再将其保存为 after image ,最后执行全局行锁,提交事务,由于以上操作全部在一个数据库事务内完成,所以保证了一阶段操作的原子性。

这里的 before imageafter image 类似于数据库中的 undo 和 redo 日志,但其实是用数据库模拟的。

image-20210514221916691

  • 二阶段提交

如果二阶段执行提交操作,那么由于 ==业务 SQL== 在一阶段已经提交到了数据库,所以 Seata 框架只需要将一阶段保存的快照数据和行锁删除,完成数据清理即可。

  • 二阶段回滚

如果二阶段执行的是回滚操作,那么 Seata 需要回滚一阶段已经执行的 ==业务 SQL== ,还原业务数据。

回滚方式是使用 before image 还原业务数据;但在还原前需要检验数据脏写,对比数据库当前业务数据和 after image 中的业务数据,如果两份数据完全一致,证明没有脏写,可以还原业务数据,如果不一致证明存在==脏写==,此时需要人工介入

image-20210514222742794

3、AT 概念说明

  • TC(Transaction Coordinator)

==事务协调者==,负责维护全局和分支事务的状态,驱动全局事务提交或回滚(TM之间的协调者)

  • TM(Transaction Manager)

==事务管理器==,定义全局事务的范围,开启全局事务、提交或回滚全局事务

  • RM(Resource Manager)

==资源管理器==,管理分支事务处理的资源,与 TC 交谈以注册分支事务和报告分支事务的状态,并驱动事务提交或回滚。

4、AT 模式使用前提

  • 基于支持本地 ACID 事务的关系型数据库。
  • Java 应用,通过 JDBC 访问数据库。

二、Seata

2.1、Seata 简介

1、是什么?

Seata 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。

如下图所示,Seata 中有三大模块,分别是 TM、RM 和 TC。 其中 TM 和 RM 是作为 Seata 的客户端与业务系统集成在一起,TC 作为 Seata 的服务端独立部署。

image-20210514225602184

2、Seata 支持的事务模型

Seata 会有 4 种分布式事务解决方案,分别是 AT 模式、TCC 模式、Saga 模式和 XA 模式。其中最常用的是 AT 模式

image-20210514225643280

2.2、分布式事务处理过程

1、分布式处理过程的一 ID + 三组件模型

  • ==Transaction ID== :XID

全局唯一的事务 ID

  • 三组件:即上述 AT 模式提到的三个组件
  1. TC(Transaction Coordinator)

==事务协调者==,负责维护全局和分支事务的状态,驱动全局事务提交或回滚(TM之间的协调者)

  1. TM(Transaction Manager)

==事务管理器==,定义全局事务的范围,开启全局事务、提交或回滚全局事务

  1. RM(Resource Manager)

==资源管理器==,管理分支事务处理的资源,与 TC 交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

2、处理过程

  • TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的事务ID,即 XID
  • XID 在微服务调用链路的上下文中传播;
  • RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务的管辖。
  • TM 向 TC 发射针对 XID 的全局提交或回滚决议。
  • TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求

2.3、TC 协调者搭建

TC 即 seata-server

1、解压 seata-server-1.3.0.tar.gz

1
tar -zxvf seata-server-1.3.0.tar.gz

image-20210516120748369

2、修改 file.conf

修改存储模式为数据库存储 DB ,同时在阿里云数据库中新建一个 seata 数据库。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
store {
## store mode: file、db、redis
mode = "db"
## database store property
db {
datasource = "druid"
## mysql/oracle/postgresql/h2/oceanbase etc.
dbType = "mysql"
## database store property
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
datasource = "druid"
## mysql/oracle/postgresql/h2/oceanbase etc.
dbType = "mysql"
driverClassName = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://120.78.198.32:3306/seata"
user = "mysql"
password = "mysql"
minConn = 5
maxConn = 30
globalTable = "global_table"
branchTable = "branch_table"
lockTable = "lock_table"
queryLimit = 100
maxWait = 5000
}
...
}

3、修改 registry.conf

指定注册中心为 nacos

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"

nacos {
application = "seata-server"
serverAddr = "120.78.198.32:8848"
group = "SEATA_GROUP"
namespace = ""
cluster = "default"
username = "nacos"
password = "nacos"
}
...
}

4、在 seata 数据库中创建三张表

分别为 global_tablebranch_tablelock_table

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
-- the table to store GlobalSession data
drop table if exists `global_table`;
create table `global_table` (
`xid` varchar(128) not null,
`transaction_id` bigint,
`status` tinyint not null,
`application_id` varchar(32),
`transaction_service_group` varchar(32),
`transaction_name` varchar(128),
`timeout` int,
`begin_time` bigint,
`application_data` varchar(2000),
`gmt_create` datetime,
`gmt_modified` datetime,
primary key (`xid`),
key `idx_gmt_modified_status` (`gmt_modified`, `status`),
key `idx_transaction_id` (`transaction_id`)
);

-- the table to store BranchSession data
drop table if exists `branch_table`;
create table `branch_table` (
`branch_id` bigint not null,
`xid` varchar(128) not null,
`transaction_id` bigint ,
`resource_group_id` varchar(32),
`resource_id` varchar(256) ,
`lock_key` varchar(128) ,
`branch_type` varchar(8) ,
`status` tinyint,
`client_id` varchar(64),
`application_data` varchar(2000),
`gmt_create` datetime,
`gmt_modified` datetime,
primary key (`branch_id`),
key `idx_xid` (`xid`)
);

-- the table to store lock data
drop table if exists `lock_table`;
create table `lock_table` (
`row_key` varchar(128) not null,
`xid` varchar(96),
`transaction_id` long ,
`branch_id` long,
`resource_id` varchar(256) ,
`table_name` varchar(32) ,
`pk` varchar(36) ,
`gmt_create` datetime ,
`gmt_modified` datetime,
primary key(`row_key`)
);

5、后台启动 seata-server

切换到 seata/bin 目录下,使用以下命令后台启动

1
./seata-server.sh -p 8091 -h ip -m db 

可以看到 seata 启动成功,端口为 8091

image-20210516130737643

此时可以在 nacos 服务列表中看到 seata-server

image-20210516130504961

6、使用 nacos 作为 seata 的配置中心

修改 registry.conf ,将原来的 file 改为 nacos ,同时在 nacos 配置块中添加配置

1
2
3
4
5
6
7
8
9
10
11
12
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "nacos"

nacos {
serverAddr = "120.78.198.32:8848"
namespace = "3985d454-42f8-4f3f-9b35-bb336253f4da"
group = "SEATA_GROUP"
username = "nacos"
password = "nacos"
}
}

image-20210516131624758

7、下载 nacos-config 脚本和 config.txt

  • config.txt

放在 seata 目录下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
service.vgroupMapping.my_test_tx_group=default
service.default.grouplist=120.78.198.32:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
store.mode=db
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://120.78.198.32:3306/seata?useUnicode=true
store.db.user=root
store.db.password=password
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
store.redis.host=120.78.198.32
store.redis.port=6379
store.redis.maxConn=10
store.redis.minConn=1
store.redis.database=0
store.redis.password=null
store.redis.queryLimit=100
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.log.exceptionRate=100
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898
  • nacos-config.sh

seata 目录下创建一个 script 目录,将该脚本放入其中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#!/usr/bin/env bash
# Copyright 1999-2019 Seata.io Group.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at、
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

while getopts ":h:p:g:t:u:w:" opt
do
case $opt in
h)
host=$OPTARG
;;
p)
port=$OPTARG
;;
g)
group=$OPTARG
;;
t)
tenant=$OPTARG
;;
u)
username=$OPTARG
;;
w)
password=$OPTARG
;;
?)
echo " USAGE OPTION: $0 [-h host] [-p port] [-g group] [-t tenant] [-u username] [-w password] "
exit 1
;;
esac
done

urlencode() {
for ((i=0; i < ${#1}; i++))
do
char="${1:$i:1}"
case $char in
[a-zA-Z0-9.~_-]) printf $char ;;
*) printf '%%%02X' "'$char" ;;
esac
done
}

if [[ -z ${host} ]]; then
host=120.78.198.32
fi
if [[ -z ${port} ]]; then
port=8848
fi
if [[ -z ${group} ]]; then
group="SEATA_GROUP"
fi
if [[ -z ${tenant} ]]; then
tenant=""
fi
if [[ -z ${username} ]]; then
username=""
fi
if [[ -z ${password} ]]; then
password=""
fi

nacosAddr=$host:$port
contentType="content-type:application/json;charset=UTF-8"

echo "set nacosAddr=$nacosAddr"
echo "set group=$group"

failCount=0
tempLog=$(mktemp -u)
function addConfig() {
curl -X POST -H "${contentType}" "http://$nacosAddr/nacos/v1/cs/configs?dataId=$(urlencode $1)&group=$group&content=$(urlencode $2)&tenant=$tenant&username=$username&password=$password" >"${tempLog}" 2>/dev/null
if [[ -z $(cat "${tempLog}") ]]; then
echo " Please check the cluster status. "
exit 1
fi
if [[ $(cat "${tempLog}") =~ "true" ]]; then
echo "Set $1=$2 successfully "
else
echo "Set $1=$2 failure "
(( failCount++ ))
fi
}

count=0
for line in $(cat $(dirname "$PWD")/config.txt | sed s/[[:space:]]//g); do
(( count++ ))
key=${line%%=*}
value=${line#*=}
addConfig "${key}" "${value}"
done

echo "========================================================================="
echo " Complete initialization parameters, total-count:$count , failure-count:$failCount "
echo "========================================================================="

if [[ ${failCount} -eq 0 ]]; then
echo " Init nacos config finished, please start seata-server. "
else
echo " init nacos config fail. "
fi

8、执行 nacos-config.sh 脚本

  • 切换到 seata/script 目录,然后使用以下命令执行脚本
1
2
3
$ cd script
# -h 主机,你可以使用localhost,-p 端口号 你可以使用8848,-t 命名空间ID,-u 用户名,-p 密码
$ sh nacos-config.sh -h 120.78.198.32 -p 8848 -g SEATA_GROUP -t 3985d454-42f8-4f3f-9b35-bb336253f4da -u nacos -w nacos

image-20210516135427006

  • 此时查看 nacos 配置,可以看到 seata 命名空间中出现配置

image-20210516140213444

2.4、RM 代码实现

1、创建订单和库存服务的数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
-- 库存服务DB执行
CREATE TABLE `tab_storage` (
`id` bigint(11) NOT NULL AUTO_INCREMENT,
`product_id` bigint(11) DEFAULT NULL COMMENT '产品id',
`total` int(11) DEFAULT NULL COMMENT '总库存',
`used` int(11) DEFAULT NULL COMMENT '已用库存',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
INSERT INTO `tab_storage` (`product_id`, `total`,`used`)VALUES ('1', '96', '4');
INSERT INTO `tab_storage` (`product_id`, `total`,`used`)VALUES ('2', '100','0');

-- 订单服务DB执行
CREATE TABLE `tab_order` (
`id` bigint(11) NOT NULL AUTO_INCREMENT,
`user_id` bigint(11) DEFAULT NULL COMMENT '用户id',
`product_id` bigint(11) DEFAULT NULL COMMENT '产品id',
`count` int(11) DEFAULT NULL COMMENT '数量',
`money` decimal(11,0) DEFAULT NULL COMMENT '金额',
`status` int(1) DEFAULT NULL COMMENT '订单状态:0:创建中;1:已完成',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

2、在各业务数据库中加入 undo_log

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

3、创建 seata 父工程,添加版本管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<parent>
<artifactId>spring-boot-starter-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.3.2.RELEASE</version>
<relativePath/>
</parent>
<properties>
<mysql.version>8.0.19</mysql.version>
<springboot.version>2.3.2.RELEASE</springboot.version>
<java.version>1.8</java.version>
<mybatis-plus.version>3.4.1</mybatis-plus.version>
<seata.version>1.3.0</seata.version>
</properties>

<dependencyManagement>
<dependencies>
<!--SpringCloud-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Hoxton.SR9</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<!--Spring Alibaba Cloud-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.2.1.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

4、创建 order-service 子工程

  • 引入依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!--nacos注册中心和配置中心-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!--seata-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<!--移除掉该starter中自带的依赖,该依赖版本较低-->
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--单独添加seata 1.3.0的依赖-->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.3.0</version>
</dependency>

<!--openfeign-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-okhttp</artifactId>
<version>10.2.3</version>
</dependency>
</dependencies>
  • 配置文件

bootstrap.properties ,这个配置文件用于指定 nacos 地址

1
spring.cloud.nacos.server-addr=120.78.198.32:8848

bootstrap.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
server:
port: 8010
mybatis-plus:
mapper-locations: classpath:com/hzx/seata/order/mapper/xml/*.xml
spring:
application:
name: order-service
cloud:
nacos:
discovery:
server-addr: 120.78.198.32:8848
register-enabled: true
namespace: 3985d454-42f8-4f3f-9b35-bb336253f4da
config:
server-addr: 120.78.198.32:8848
enabled: true
file-extension: yaml
namespace: 3985d454-42f8-4f3f-9b35-bb336253f4da
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
username: username
password: password
url: jdbc:mysql://120.78.198.32:3306/seata_order?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true
seata:
enabled: true
application-id: order-service
# 事务群组(可以每个应用独立取名,也可以使用相同的名字),要与服务端nacos-config.txt中service.vgroup_mapping的后缀对应
tx-service-group: my_test_tx_group
config:
type: nacos
nacos:
namespace: 3985d454-42f8-4f3f-9b35-bb336253f4da
serverAddr: 120.78.198.32:8848
# 需要server端(registry和config)、nacos配置client端(registry和config)保持一致
group: SEATA_GROUP
username: nacos
password: nacos
registry:
type: nacos
nacos:
# 需要和server端保持一致,即server在nacos中的名称,默认为seata-server
application: seata-server
server-addr: 120.78.198.32:8848
group: SEATA_GROUP
namespace: 3985d454-42f8-4f3f-9b35-bb336253f4da
username: nacos
password: nacos
  • 实体类
1
2
3
4
5
6
7
8
9
10
11
@Data
@Accessors(chain = true)
public class Order {
@TableId(value = "id", type = IdType.AUTO)
private Long id;
private Long userId;
private Long productId;
private int count;
private BigDecimal money;
private int status;
}
  • OrderMapper
1
2
3
@Repository
public interface OrderMapper extends BaseMapper<Order> {
}
  • OrderServiceImpl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Slf4j
@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements OrderService {
@Autowired
private OrderMapper orderMapper;


@Override
@Transactional
public Boolean create(Order order) {
log.info("------------------>开始创建订单");
int result = orderMapper.insert(order);
log.info("------------------>创建订单结束");
return result > 0;
}
}
  • OrderController
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RestController
@RequestMapping("order")
public class OrderController {
@Autowired
private OrderService orderService;

@GetMapping("create")
public Boolean create(Long userId, Long productId) {
Order order = new Order();
order.setCount(1)
.setMoney(BigDecimal.valueOf(88))
.setProductId(productId)
.setStatus(0)
.setUserId(userId);
return orderService.create(order);
}
}
  • 测试

启动后,访问 http://localhost:8010/order/create ,可以看到数据库中多了一条数据,此时 order-service 搭建成功

  • 依葫芦画瓢构建出第二个 RM 模块,即 storage-service

2.5、TM 代码实现

一般来说, TM 也是一个微服务,这个微服务需要调用其他微服务接口完成业务开发。

1、创建 business-service 微服务,引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!--nacos注册中心和配置中心-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!--seata-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<!--移除掉该starter中自带的依赖,该依赖版本较低-->
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--单独添加seata 1.3.0的依赖-->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.3.0</version>
</dependency>

<!--openfeign-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-okhttp</artifactId>
<version>10.2.3</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>

2、编写配置文件

需要添加 OpenFeign 相关的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
server:
port: 8000
mybatis-plus:
mapper-locations: classpath:com/hzx/seata/business/mapper/xml/*.xml
spring:
application:
name: business-service
cloud:
nacos:
discovery:
server-addr: 120.78.198.32:8848
register-enabled: true
namespace: 3985d454-42f8-4f3f-9b35-bb336253f4da
server-addr: 120.78.198.32:8848
seata:
enabled: true
application-id: business-service
# 事务群组(可以每个应用独立取名,也可以使用相同的名字),要与服务端nacos-config.txt中service.vgroup_mapping的后缀对应
tx-service-group: my_test_tx_group
config:
type: nacos
nacos:
namespace: 3985d454-42f8-4f3f-9b35-bb336253f4da
serverAddr: 120.78.198.32:8848
# 需要server端(registry和config)、nacos配置client端(registry和config)保持一致
group: SEATA_GROUP
username: nacos
password: nacos
registry:
type: nacos
nacos:
# 需要和server端保持一致,即server在nacos中的名称,默认为seata-server
application: seata-server
server-addr: 120.78.198.32:8848
group: SEATA_GROUP
namespace: 3985d454-42f8-4f3f-9b35-bb336253f4da
username: nacos
password: nacos
# feign组件超时设置,用于查看seata数据库中的临时数据内容
feign:
client:
config:
default:
connect-timeout: 30000
read-timeout: 30000

3、编写主启动类

1
2
3
4
5
6
7
8
9
10
@EnableFeignClients
@EnableDiscoveryClient
@MapperScan("com.hzx.seata.business.mapper")
@SpringBootApplication(scanBasePackages = "com.hzx.seata.business")
public class SeataBusinessServiceApplication {

public static void main(String[] args) {
SpringApplication.run(SeataBusinessServiceApplication.class, args);
}
}

4、编写 OpenFeign 远程调用接口

用于调用 order-servicestorage-service 微服务的接口。

当使用 Feign 时,如果发送的是 get 请求,那么需要在请求参数前加上 @RequestParam 注解修饰,Controller里面可以不加该注解修饰。

  • OrderClient

这里必须使用 @RequestParam 注解

1
2
3
4
5
6
@Service
@FeignClient("order-service")
public interface OrderClient {
@GetMapping("order/create")
Boolean create(@RequestParam("userId") Long userId,@RequestParam("productId") Long productId);
}
  • StorageClient
1
2
3
4
5
6
@Service
@FeignClient("storage-service")
public interface StorageClient {
@GetMapping("storage/change")
Boolean changeStorage(@RequestParam("productId")Long productId, @RequestParam("used")Integer used);
}

5、编写 business-service 中的业务方法

  • BusinessController
1
2
3
4
5
6
7
8
9
10
11
12
@RestController
@RequestMapping("business")
public class BusinessController {
@Autowired
private BusinessService businessService;

@GetMapping("buy")
public String buy(Long userId, Long productId) {
businessService.buy(userId, productId);
return "ok!";
}
}
  • BusinessServiceImpl

这个方法用于开启全局事务,使用 @GlobalTransactional 注解,这个注解由 seata 提供

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Slf4j
@Service
public class BusinessServiceImpl implements BusinessService {
@Autowired
private OrderClient orderClient;

@Autowired
private StorageClient storageClient;

@Override
@GlobalTransactional
public void buy(Long userId, Long productId) {
log.info("--------------->buy方法开始<---------------");
log.info("--------------->创建订单:开始<---------------");
orderClient.create(userId, productId);
log.info("--------------->创建订单:结束<---------------");
log.info("--------------->修改库存:开始<---------------");
storageClient.changeStorage(productId, 10);
log.info("--------------->修改库存:结束<---------------");
log.info("--------------->buy方法结束<---------------");
}
}

6、测试

  • 将订单数据库原有的数据删去,并将库存数据库中的总额全部改为100,已使用额置为0

image-20210516182059914

image-20210516182417695

  • 查看数据库,此时可以看到订单数据库中出现一条新纪录,且库存数据库中的库存发生变化

image-20210516182713891

7、查看全局事务回滚是否生效

  • storage-service 微服务中模拟一个异常

StorageServiceImpl

1
2
3
4
5
6
7
8
@Override
@Transactional
public Boolean updateStorage(Long productId, Integer used) {
log.info("---------------------------->模拟异常,检测全局事务回滚是否生效<----------------------------");
int i = 100 / 0;
int result = storageMapper.updateStorage(productId, used);
return result > 0;
}
  • 如果全局事务不生效,那么由于先调用 order-service 微服务中的 create 接口后调用 storage-service 微服务的 update 接口,所以在 order 数据库中势必会多一条脏数据,如果全局事务回滚生效,那么 order 数据库中不会多出脏数据。

查看 order 数据库中现有的数据

image-20210516183453979

重启微服务,继续访问 http://localhost:8000/business/buy?userId=1&productId=1 ,查看结果

image-20210516183548034

可以看到上面的接口访问出错,我们现在看一下 order 数据库中的数据,可以看到仍然没有脏数据产生。

image-20210516183626546

  • 如果删除 business 微服务中的 @GlobalTransaction 注解,那么全局事务不生效,在 order 数据库中会多出一条脏数据,同时库存数据没有更新。

2.6、AT 模式原理解析

1、相关表解析

  • global_table全局事务,每当有一个全局事务发起后,就会在该表中记录全局事务的 ID

  • branch_table分支事务,记录每一个分支事务的 ID,分支事务操作的哪个数据库等信息

  • lock_table全局锁

2、AT 如何做到对业务的无侵入?

由于二阶段原理已经在上面提及,所以这里不做赘述,单独根据此演示对一阶段进行分析

  1. TM

business-service.buy(Long, Long) 方法执行时,由于此接口的实现方法中有 @GlobalTransactional 注解,所以该 TM 会向 TC 发起全局事务,生成全局事务 ID(即 XID

  1. RM:order-service.create(Long, Long)

order 表中写入数据,在 Order 数据库中的 UNDO_LOG 记录回滚日志(Branch ID),通知 TC 操作结果

  1. RM:storage-service.update(Long, Long)

storage 表中写入数据,在 Storage 数据库中的 UNDO_LOG 记录回滚日志(Branch ID),通知 TC 操作结果

  • RM 向表进行写操作的过程中,Seata 先是会拦截业务 SQL,解析 SQL 语义,在业务数据被更新前,将其保存为 before image ,然后执行业务 SQL ,在业务数据更新之后,再将其保存为 after image ,最后生成行锁。以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。

image-20210516185240159

3、UNDO_LOG 中的前置镜像 (before image) 和后置镜像 (after image)

  • 保存在 UNDO_LOG 中的 rollback_info

这里以 order 数据库中的 undo_log 为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
{
"@class": "io.seata.rm.datasource.undo.BranchUndoLog",
"xid": "192.168.2.196:8091:104983180048351232",
"branchId": 104983197731536896,
"sqlUndoLogs": ["java.util.ArrayList", [{
"@class": "io.seata.rm.datasource.undo.SQLUndoLog",
"sqlType": "INSERT",
"tableName": "tab_order",
"beforeImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords$EmptyTableRecords",
"tableName": "tab_order",
"rows": ["java.util.ArrayList", []]
},
"afterImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords",
"tableName": "tab_order",
"rows": ["java.util.ArrayList", [{
"@class": "io.seata.rm.datasource.sql.struct.Row",
"fields": ["java.util.ArrayList", [{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "id",
"keyType": "PRIMARY_KEY",
"type": -5,
"value": ["java.lang.Long", 18]
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "user_id",
"keyType": "NULL",
"type": -5,
"value": ["java.lang.Long", 1]
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "product_id",
"keyType": "NULL",
"type": -5,
"value": ["java.lang.Long", 1]
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "count",
"keyType": "NULL",
"type": 4,
"value": null
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "money",
"keyType": "NULL",
"type": 3,
"value": ["java.math.BigDecimal", 88]
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "status",
"keyType": "NULL",
"type": 4,
"value": null
}]]
}]]
}
}]]
}
  • 参数说明

image-20210516191421061

  • 事务回滚时,使用分支ID和全局事务ID进行回滚