一、分布式事务 1.1、分布式事务问题 ==分布式系统环境下由不同的服务之间通过网络远程协作完成事务称为分布式事务。==
单体应用被拆分为微服务应用,原来的一个程序被拆分为多个完全独立的应用,这几个应用连接多个独立的数据源
以下单支付这个操作为例,如果仓储服务、订单服务、账户服务运行在三个独立的微服务中,且这三个独立的微服务各自连接不同的数据源。假设用户现在下单某件商品.
仓储服务:对指定商品扣除仓储数量 订单服务:根据用户需求创建订单 账户服务:从用户余额中抽除金额 在上面的场景中,每个微服务内部数据的一致性可以通过 ==本地== 事务保证 ,但是全局数据的一致性无法保证 。
==在一次业务操作中,如果需要跨 多个数据源 或跨 多个系统 进行远程调用,那么就会产生分布式事务问题。==
Seata
用于保证全局数据一致性。
1.2、分布式事务基础理论 – CAP 0、场景模拟 如下图,是商品信息管理的执行流程
整体执行流程如下:
商品服务请求主数据库写入商品信息(添加、删除、修改商品) 主数据库向商品服务响应写操作成功 商品服务请求从数据库读取商品信息 1、什么是 CAP ? CAP原则又称CAP定理,指的是在一个分布式系统中,【一致性】(Consistency)、【可用性】(Availability)、【分区容错性】(Partition tolerance)。
CAP 原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾。
2、C-Consistency 一致性是指写操作后的读操作可以读取到最新的数据状态,当数据分布在多个节点上时,从任意节点读取到的数据都是最新的状态。
==在上面的场景中,商品信息的读写要满足一致性就是要实现如下目标:== 商品服务写入主数据库成功,则向从服务器查询新数据时也成功。
商品服务写入主数据库失败,则向从服务器查询新数据时也失败。
向主数据库进行写操作后要将数据同步到从数据库。 写入主数据库后,在向从数据库同步期间要将从数据库锁定,等待同步完成后再释放锁,以免在新数据库写入成功后,向从数据库查询到旧的数据。 由于存在数据同步的过程,所以写操作的响应会有一定延迟
为了保证数据一致性,所以会对资源暂时进行锁定,数据同步完成后释放锁定资源
如果请求数据同步失败的结点会返回错误信息,但一定不会返回旧数据
2、A-Availability 可用性指任何事务操作都可以得到响应结果,且不会出现响应超时或响应错误。
追求==弱一致性==(不要求即时同步)和==最终一致性==(最后结果要求一致)
从数据库接收到数据查询请求的请求则立即能够响应数据查询结果 从数据库不允许出现响应超时或者响应错误 写入主数据库后要将数据同步到从数据库 由于要保证从数据库的可用性,不可将从数据库中的资源进行锁定 即使数据还未同步过来,从数据库也要返回待查询数据,哪怕是旧数据,如果没有旧数据,那么可以按照约定返回一个默认值,但不能返回错误或者响应超时 任何请求都有响应,且不会出现响应超时或响应错误
3、P-Partition tolerance 通常分布式系统的各结点都部署在不同的子网,也就是网络分区,不可避免地会出现由于网络问题而导致结点之间通信失败,此时仍可对外提供服务,这就叫==分区容忍性==
主数据库向从数据库同步数据失败不影响读写操作。 其中一个结点挂掉不影响另一个结点对外提供服务。 尽量用异步取代同步操作,例如使用异步方式将数据从主数据库同步到从数据库,这样结点间可以有效的实现松耦合 添加从数据库结点,其中一个结点挂掉时,可以由其他结点对外提供服务。 分区容忍性是分布式系统具备的基本能力
4、CAP 组合方式 由于分布式系统默认具备分区容忍性,因此一致性与可用性不能共存,在满足分区容忍性的前提下,一致性与可用性存在矛盾,无法共存!
一般来说,放弃一致性,追求分区容忍性和可用性是很多分布式系统的选择。(AP) 1.3、分布式事务基础理论 – BASE 理论 1、理解强一致性和最终一致性 CAP
理论告诉我们一个分布式系统最多只能同时满足一致性(Consistency
)、可用性(Availability
)和分区容忍性(Partition tolerance
)这三项中的两项。
==其中 AP 在实际应用中较多,AP 即舍弃一致性,保证可用性和分区容忍性,但是在实际生产中很多场景都要实现一致性==。
比如前边我们举的例子主数据库向从数据库同步数据,即使不要一致性,但是最终也要将数据同步成功来保证数据一致,==这种一致性和 CAP 中的一致性不同,CAP 中的一致性要求在任何时间查询每个结点数据都必须一致==,它强调的是==强一致性==,但是最终一致性是允许可以在一段时间内每个结点的数据不一致,但是经过一段时间每个结点的数据必须一致,它强调的是最终数据的一致性。
2、BASE 理论介绍 BASE是 BasicallyAvailable
(基本可用)、Softstate
(软状态)和Eventuallyconsistent
(最终一致性)三个短语的缩写。
==BASE理论是对CAP中AP的一个扩展==,通过牺牲强一致性来获得可用性 ,当==出现故障允许部分不可用但要保证核心功能可用==,允许数据在一段时间内是不一致的,但最终达到一致状态。满足BASE理论的事务,我们称之为“柔性事务”。
1.4、分布式事务解决方案 1.5、分阶段提交 1、DTP 和 XA 分布式事务解决手段之一,就是两阶段提交协议(2PC:Two-Phase Commit),1994 年,X/OPEN 组织定义了分布式事务处理的==DTP==模型。该模型定义了这样几个角色
应用程序(AP):我们的微服务模块 事务管理器(TM):全局事务管理者 资源管理器(RM):一般是数据库 通信资源管理器(CRM):是 TM 和 RM 间的通信中间件 在该模型中,一个分布式事务(全局事务)可以被拆分成许多个本地事务,运行在不同的AP和RM上。每个本地事务的 ACID 很好实现,但是全局事务必须保证其中包含的每一个本地事务都能同时成功,==若有一个本地事务失败,则所有其它事务都必须回滚。==但问题是,本地事务处理过程中,井不知道其它事务的运行状态。因此,就需要通过CRM来通知各个本地事务,同步事务执行的状态 。
因此,各个本地事务的通信必须有统一的标准,否则不同数据库之间就无法进行通信。XA 就是 DTP 模型中通信中间件与 TM 间联系的==接口规范==,定义了用于通知事务开始、提交、终止、回滚等接口,各个数据库产商都必须实现这个接口。
2、二阶段提交 ==二阶段提交协议==将全局事务拆分为两个阶段来执行: 阶段一:准备阶段 ,各个本地事务完成本地事务的准备工作
阶段二:执行阶段 ,各个本地事务根据上一阶段执行结果,进行提交或者回滚。
这个过程需要一个协调者,还有事务的参与者
正常情况(以仓储服务、订单服务和支付服务为例说明) 进入准备阶段,协调者(通信中间件)提示事务参与者开始事务 仓储服务减少库存,并表示自己的代码执行成功(不提交事务) 订单服务创建订单,并表示自己的代码执行成功(不执行事务) 支付服务调用支付接口,用户付款成功,此时支付服务表示自己的代码执行成功(不执行事务) 进入提交阶段,由于三个事务的参与者的代码均指向成功,所以协调者令三个服务提交自己的事务,此时全局事务执行成功
进入准备阶段,协调者(通信中间件)提示事务参与者开始事务 仓储服务减少库存,并表示自己的代码执行成功(不提交事务) 订单服务创建订单,并表示自己的代码执行成功(不执行事务) 支付服务调用支付接口,用户付款失败,此时支付服务告知自己的代码执行失败(不提交事务) 进入提交阶段,虽然前两个微服务的代码执行成功,但支付服务中用户支付失败,所以协调者令三个服务均丢弃自己的事务,进行回滚操作,此时全局事务回滚成功 协调者如果发现有一个或者多个事务参与者的返回结果是 disagree
,那么就认为事务执行失败,此时会让各个事务参与者回滚事务
单点故障问题 如果在提交阶段时协调者挂了,那么此时事务参与者不知道其他事务是否成功。
阻塞问题 在准备阶段、提交阶段中,每个事务参与者都会锁定自己的资源,并等待其他事务的执行结果,如果阻塞时间较长,资源锁定时间太久,那么执行效率就会降低。
1.6、TCC TCC 可以解决二阶段提交中的资源锁定和阻塞问题,减少资源锁定时间。
1、基本原理 本质上是一种补偿思路,事务运行过程包括三个方法
Try:资源的检测和预留 Confirm:执行的业务操作提交;要求 Try 成功 Confirm 一定要能成功。 Cancel:预留资源释放 执行分两个阶段:
准备阶段(Try):资源的检测和预留 执行阶段(Confirm / Cancel):根据上一步结果,执行下面的方法,如果上一步中所有事务参与者都成功,那么这里执行 Confirm ,否则执行 Cancel 。
2、优势 TCC 执行的每一个阶段都会执行本地事务并释放锁,并不需要等待其他事务的执行结果。而如果其他事务执行失败,最后不是回滚,而是进行补偿操作。这样就避免了资源的长期锁定和阻塞等待,执行效率比较高,属于性能比较好的分布式事务方式。
3、劣势 需要人为编写代码实现 Try 、 Confirm 和 Cancel,代码侵入较多
一个业务需要拆分为 3 个步骤,分别编写业务实现,业务编写较复杂
如果 Cancel 执行失败,那么资源可能无法释放,需要引入重试机制,而重试可能导致重复执行,此时需要保证重试的幂等性。
1.7、可靠消息服务 基本设计思想是将远程分布式事务拆分为一系列本地事务。
1、基本原理 一般分为事务的发起者 A 和事务的其他参与者 B :
事务发起者 A 执行本地事务 事务发起者 A 通过 MQ 将需要执行的事务信息发送给事务参与者 B 事务参与者 B 接收到消息后执行本地事务
2、注意事项 事务发起者 A 必须保证本地事务成功后,消息一定发送成功 MQ 必须保证消息正确投递和持久化保存 事务参与者 B 必须保证消息最终一定能消费,如果失败需要多次重试 事务 B 执行失败,会重试,但不会导致事务 A 回滚 1.8、AT AT 模式是 Seata
开源的一种无侵入的分布式事务解决方案。
可以看为对 TCC 或者二阶段提交模型的一种优化,解决了 TCC 中的代码侵入,编码复杂等问题。
在 AT 模式下,用户只需要关注自己 ==业务的 SQL== ,用户的 ==业务 SQL== 作为一阶段, Seata
框架会自动生产事务的二阶段提交和回滚操作。
1、基本原理 先看一张流程图
与 TCC 的执行流程相似,都是分两个阶段,不同的是第二阶段的提交回滚代码不需要我们手写,而是由 Seata
自动帮我们完成。也就是说,==我们写的代码与本地事务代码一样,无需手动处理分布式事务==
2、底层原理 在一阶段,Seata
会拦截 ==业务 SQL== ,首先解析 SQL 语义,找到 ==业务 SQL== 要更新的业务数据,在业务数据被更新前,将其保存为 before image
,然后执行 ==业务 SQL== 更新业务数据,在业务数据更新之后,再将其保存为 after image
,最后执行全局行锁,提交事务 ,由于以上操作全部在一个数据库事务内完成,所以保证了一阶段操作的原子性。
这里的 before image
和 after image
类似于数据库中的 undo 和 redo 日志,但其实是用数据库模拟的。
如果二阶段执行提交操作,那么由于 ==业务 SQL== 在一阶段已经提交到了数据库,所以 Seata
框架只需要将一阶段保存的快照数据和行锁删除,完成数据清理即可。
如果二阶段执行的是回滚操作,那么 Seata
需要回滚一阶段已经执行的 ==业务 SQL== ,还原业务数据。
回滚方式是使用 before image
还原业务数据;但在还原前需要检验数据脏写,对比数据库当前业务数据和 after image
中的业务数据,如果两份数据完全一致,证明没有脏写,可以还原业务数据,如果不一致证明存在==脏写==,此时需要人工介入
3、AT 概念说明 TC(Transaction Coordinator) ==事务协调者==,负责维护全局和分支事务的状态,驱动全局事务提交或回滚(TM之间的协调者)
==事务管理器==,定义全局事务的范围,开启全局事务、提交或回滚全局事务
==资源管理器==,管理分支事务处理的资源,与 TC 交谈以注册分支事务和报告分支事务的状态,并驱动事务提交或回滚。
4、AT 模式使用前提 基于支持本地 ACID 事务的关系型数据库。 Java 应用,通过 JDBC 访问数据库。 二、Seata
2.1、Seata
简介 1、是什么? Seata
是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。
如下图所示,Seata
中有三大模块,分别是 TM、RM 和 TC。 其中 TM 和 RM 是作为 Seata
的客户端与业务系统集成在一起,TC 作为 Seata
的服务端独立部署。
2、Seata
支持的事务模型 Seata
会有 4 种分布式事务解决方案,分别是 AT 模式、TCC 模式、Saga 模式和 XA 模式。其中最常用的是 AT
模式
2.2、分布式事务处理过程 1、分布式处理过程的一 ID + 三组件模型 全局唯一的事务 ID
TC(Transaction Coordinator) ==事务协调者==,负责维护全局和分支事务的状态,驱动全局事务提交或回滚(TM之间的协调者)
TM(Transaction Manager) ==事务管理器==,定义全局事务的范围,开启全局事务、提交或回滚全局事务
RM(Resource Manager) ==资源管理器==,管理分支事务处理的资源,与 TC 交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
2、处理过程 TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的事务ID,即 XID
XID
在微服务调用链路的上下文中传播;RM 向 TC 注册分支事务,将其纳入 XID
对应全局事务的管辖。 TM 向 TC 发射针对 XID
的全局提交或回滚决议。 TC 调度 XID
下管辖的全部分支事务完成提交或回滚请求 2.3、TC 协调者搭建 TC 即 seata-server
1、解压 seata-server-1.3.0.tar.gz
1 tar -zxvf seata-server-1.3.0.tar.gz
2、修改 file.conf
修改存储模式为数据库存储 DB ,同时在阿里云数据库中新建一个 seata
数据库。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 store { ## store mode: file、db、redis mode = "db" ## database store property db { datasource = "druid" ## mysql/oracle/postgresql/h2/oceanbase etc. dbType = "mysql" ## database store property db { ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc. datasource = "druid" ## mysql/oracle/postgresql/h2/oceanbase etc. dbType = "mysql" driverClassName = "com.mysql.jdbc.Driver" url = "jdbc:mysql://120.78.198.32:3306/seata" user = "mysql" password = "mysql" minConn = 5 maxConn = 30 globalTable = "global_table" branchTable = "branch_table" lockTable = "lock_table" queryLimit = 100 maxWait = 5000 } ... }
3、修改 registry.conf
指定注册中心为 nacos
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa type = "nacos" nacos { application = "seata-server" serverAddr = "120.78.198.32:8848" group = "SEATA_GROUP" namespace = "" cluster = "default" username = "nacos" password = "nacos" } ... }
4、在 seata
数据库中创建三张表 分别为 global_table
、 branch_table
和 lock_table
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 -- the table to store GlobalSession data drop table if exists `global_table`; create table `global_table` ( `xid` varchar(128) not null, `transaction_id` bigint, `status` tinyint not null, `application_id` varchar(32), `transaction_service_group` varchar(32), `transaction_name` varchar(128), `timeout` int, `begin_time` bigint, `application_data` varchar(2000), `gmt_create` datetime, `gmt_modified` datetime, primary key (`xid`), key `idx_gmt_modified_status` (`gmt_modified`, `status`), key `idx_transaction_id` (`transaction_id`) ); -- the table to store BranchSession data drop table if exists `branch_table`; create table `branch_table` ( `branch_id` bigint not null, `xid` varchar(128) not null, `transaction_id` bigint , `resource_group_id` varchar(32), `resource_id` varchar(256) , `lock_key` varchar(128) , `branch_type` varchar(8) , `status` tinyint, `client_id` varchar(64), `application_data` varchar(2000), `gmt_create` datetime, `gmt_modified` datetime, primary key (`branch_id`), key `idx_xid` (`xid`) ); -- the table to store lock data drop table if exists `lock_table`; create table `lock_table` ( `row_key` varchar(128) not null, `xid` varchar(96), `transaction_id` long , `branch_id` long, `resource_id` varchar(256) , `table_name` varchar(32) , `pk` varchar(36) , `gmt_create` datetime , `gmt_modified` datetime, primary key(`row_key`) );
5、后台启动 seata-server
切换到 seata/bin
目录下,使用以下命令后台启动
1 ./seata-server.sh -p 8091 -h ip -m db
可以看到 seata
启动成功,端口为 8091
此时可以在 nacos
服务列表中看到 seata-server
6、使用 nacos
作为 seata
的配置中心 修改 registry.conf
,将原来的 file 改为 nacos
,同时在 nacos
配置块中添加配置
1 2 3 4 5 6 7 8 9 10 11 12 config { # file、nacos 、apollo、zk、consul、etcd3 type = "nacos" nacos { serverAddr = "120.78.198.32:8848" namespace = "3985d454-42f8-4f3f-9b35-bb336253f4da" group = "SEATA_GROUP" username = "nacos" password = "nacos" } }
7、下载 nacos-config
脚本和 config.txt
放在 seata
目录下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 transport.type=TCP transport.server=NIO transport.heartbeat=true transport.enableClientBatchSendRequest=false transport.threadFactory.bossThreadPrefix=NettyBoss transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler transport.threadFactory.shareBossWorker=false transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector transport.threadFactory.clientSelectorThreadSize=1 transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread transport.threadFactory.bossThreadSize=1 transport.threadFactory.workerThreadSize=default transport.shutdown.wait=3 service.vgroupMapping.my_test_tx_group=default service.default.grouplist=120.78.198.32:8091 service.enableDegrade=false service.disableGlobalTransaction=false client.rm.asyncCommitBufferLimit=10000 client.rm.lock.retryInterval=10 client.rm.lock.retryTimes=30 client.rm.lock.retryPolicyBranchRollbackOnConflict=true client.rm.reportRetryCount=5 client.rm.tableMetaCheckEnable=false client.rm.sqlParserType=druid client.rm.reportSuccessEnable=false client.rm.sagaBranchRegisterEnable=false client.tm.commitRetryCount=5 client.tm.rollbackRetryCount=5 client.tm.degradeCheck=false client.tm.degradeCheckAllowTimes=10 client.tm.degradeCheckPeriod=2000 store.mode=db store.file.dir=file_store/data store.file.maxBranchSessionSize=16384 store.file.maxGlobalSessionSize=512 store.file.fileWriteBufferCacheSize=16384 store.file.flushDiskMode=async store.file.sessionReloadReadSize=100 store.db.datasource=druid store.db.dbType=mysql store.db.driverClassName=com.mysql.jdbc.Driver store.db.url=jdbc:mysql://120.78.198.32:3306/seata?useUnicode=true store.db.user=root store.db.password=password store.db.minConn=5 store.db.maxConn=30 store.db.globalTable=global_table store.db.branchTable=branch_table store.db.queryLimit=100 store.db.lockTable=lock_table store.db.maxWait=5000 store.redis.host=120.78.198.32 store.redis.port=6379 store.redis.maxConn=10 store.redis.minConn=1 store.redis.database=0 store.redis.password=null store.redis.queryLimit=100 server.recovery.committingRetryPeriod=1000 server.recovery.asynCommittingRetryPeriod=1000 server.recovery.rollbackingRetryPeriod=1000 server.recovery.timeoutRetryPeriod=1000 server.maxCommitRetryTimeout=-1 server.maxRollbackRetryTimeout=-1 server.rollbackRetryTimeoutUnlockEnable=false client.undo.dataValidation=true client.undo.logSerialization=jackson client.undo.onlyCareUpdateColumns=true server.undo.logSaveDays=7 server.undo.logDeletePeriod=86400000 client.undo.logTable=undo_log client.log.exceptionRate=100 transport.serialization=seata transport.compressor=none metrics.enabled=false metrics.registryType=compact metrics.exporterList=prometheus metrics.exporterPrometheusPort=9898
在 seata
目录下创建一个 script 目录,将该脚本放入其中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 #!/usr/bin/env bash while getopts ":h:p:g:t:u:w:" optdo case $opt in h) host=$OPTARG ;; p) port=$OPTARG ;; g) group=$OPTARG ;; t) tenant=$OPTARG ;; u) username=$OPTARG ;; w) password=$OPTARG ;; ?) echo " USAGE OPTION: $0 [-h host] [-p port] [-g group] [-t tenant] [-u username] [-w password] " exit 1 ;; esac done urlencode () { for ((i=0; i < ${#1} ; i++)) do char="${1:$i:1} " case $char in [a-zA-Z0-9.~_-]) printf $char ;; *) printf '%%%02X' "'$char " ;; esac done } if [[ -z ${host} ]]; then host=120.78.198.32 fi if [[ -z ${port} ]]; then port=8848 fi if [[ -z ${group} ]]; then group="SEATA_GROUP" fi if [[ -z ${tenant} ]]; then tenant="" fi if [[ -z ${username} ]]; then username="" fi if [[ -z ${password} ]]; then password="" fi nacosAddr=$host :$port contentType="content-type:application/json;charset=UTF-8" echo "set nacosAddr=$nacosAddr " echo "set group=$group " failCount=0 tempLog=$(mktemp -u) function addConfig () { curl -X POST -H "${contentType} " "http://$nacosAddr /nacos/v1/cs/configs?dataId=$(urlencode $1) &group=$group &content=$(urlencode $2) &tenant=$tenant &username=$username &password=$password " >"${tempLog} " 2>/dev/null if [[ -z $(cat "${tempLog} " ) ]]; then echo " Please check the cluster status. " exit 1 fi if [[ $(cat "${tempLog} " ) =~ "true" ]]; then echo "Set $1 =$2 successfully " else echo "Set $1 =$2 failure " (( failCount++ )) fi } count=0 for line in $(cat $(dirname "$PWD " )/config.txt | sed s/[[:space:]]//g); do (( count++ )) key=${line%%=*} value=${line#*=} addConfig "${key} " "${value} " done echo "=========================================================================" echo " Complete initialization parameters, total-count:$count , failure-count:$failCount " echo "=========================================================================" if [[ ${failCount} -eq 0 ]]; then echo " Init nacos config finished, please start seata-server. " else echo " init nacos config fail. " fi
8、执行 nacos-config.sh
脚本 切换到 seata/script
目录,然后使用以下命令执行脚本 1 2 3 $ cd script # -h 主机,你可以使用localhost,-p 端口号 你可以使用8848,-t 命名空间ID,-u 用户名,-p 密码 $ sh nacos-config.sh -h 120.78.198.32 -p 8848 -g SEATA_GROUP -t 3985d454-42f8-4f3f-9b35-bb336253f4da -u nacos -w nacos
此时查看 nacos
配置,可以看到 seata
命名空间中出现配置
2.4、RM
代码实现 1、创建订单和库存服务的数据库 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 -- 库存服务DB执行 CREATE TABLE `tab_storage` ( `id` bigint(11) NOT NULL AUTO_INCREMENT, `product_id` bigint(11) DEFAULT NULL COMMENT '产品id', `total` int(11) DEFAULT NULL COMMENT '总库存', `used` int(11) DEFAULT NULL COMMENT '已用库存', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; INSERT INTO `tab_storage` (`product_id`, `total`,`used`)VALUES ('1', '96', '4'); INSERT INTO `tab_storage` (`product_id`, `total`,`used`)VALUES ('2', '100','0'); -- 订单服务DB执行 CREATE TABLE `tab_order` ( `id` bigint(11) NOT NULL AUTO_INCREMENT, `user_id` bigint(11) DEFAULT NULL COMMENT '用户id', `product_id` bigint(11) DEFAULT NULL COMMENT '产品id', `count` int(11) DEFAULT NULL COMMENT '数量', `money` decimal(11,0) DEFAULT NULL COMMENT '金额', `status` int(1) DEFAULT NULL COMMENT '订单状态:0:创建中;1:已完成', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
2、在各业务数据库中加入 undo_log
表 1 2 3 4 5 6 7 8 9 10 11 12 13 CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, `ext` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
3、创建 seata
父工程,添加版本管理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 <parent > <artifactId > spring-boot-starter-parent</artifactId > <groupId > org.springframework.boot</groupId > <version > 2.3.2.RELEASE</version > <relativePath /> </parent > <properties > <mysql.version > 8.0.19</mysql.version > <springboot.version > 2.3.2.RELEASE</springboot.version > <java.version > 1.8</java.version > <mybatis-plus.version > 3.4.1</mybatis-plus.version > <seata.version > 1.3.0</seata.version > </properties > <dependencyManagement > <dependencies > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-dependencies</artifactId > <version > Hoxton.SR9</version > <type > pom</type > <scope > import</scope > </dependency > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-alibaba-dependencies</artifactId > <version > 2.2.1.RELEASE</version > <type > pom</type > <scope > import</scope > </dependency > <dependency > <groupId > com.baomidou</groupId > <artifactId > mybatis-plus-boot-starter</artifactId > <version > ${mybatis-plus.version}</version > </dependency > <dependency > <groupId > mysql</groupId > <artifactId > mysql-connector-java</artifactId > <version > ${mysql.version}</version > </dependency > </dependencies > </dependencyManagement >
4、创建 order-service 子工程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > com.baomidou</groupId > <artifactId > mybatis-plus-boot-starter</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > </dependency > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-discovery</artifactId > </dependency > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-config</artifactId > </dependency > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-seata</artifactId > <exclusions > <exclusion > <groupId > io.seata</groupId > <artifactId > seata-spring-boot-starter</artifactId > </exclusion > </exclusions > </dependency > <dependency > <groupId > io.seata</groupId > <artifactId > seata-spring-boot-starter</artifactId > <version > 1.3.0</version > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-openfeign</artifactId > </dependency > <dependency > <groupId > io.github.openfeign</groupId > <artifactId > feign-okhttp</artifactId > <version > 10.2.3</version > </dependency > </dependencies >
bootstrap.properties
,这个配置文件用于指定 nacos
地址
1 spring.cloud.nacos.server-addr =120.78.198.32:8848
bootstrap.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 server: port: 8010 mybatis-plus: mapper-locations: classpath:com/hzx/seata/order/mapper/xml/*.xml spring: application: name: order-service cloud: nacos: discovery: server-addr: 120.78 .198 .32 :8848 register-enabled: true namespace: 3985d454-42f8-4f3f-9b35-bb336253f4da config: server-addr: 120.78 .198 .32 :8848 enabled: true file-extension: yaml namespace: 3985d454-42f8-4f3f-9b35-bb336253f4da datasource: driver-class-name: com.mysql.cj.jdbc.Driver username: username password: password url: jdbc:mysql://120.78.198.32:3306/seata_order?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true seata: enabled: true application-id: order-service tx-service-group: my_test_tx_group config: type: nacos nacos: namespace: 3985d454-42f8-4f3f-9b35-bb336253f4da serverAddr: 120.78 .198 .32 :8848 group: SEATA_GROUP username: nacos password: nacos registry: type: nacos nacos: application: seata-server server-addr: 120.78 .198 .32 :8848 group: SEATA_GROUP namespace: 3985d454-42f8-4f3f-9b35-bb336253f4da username: nacos password: nacos
1 2 3 4 5 6 7 8 9 10 11 @Data @Accessors(chain = true) public class Order { @TableId(value = "id", type = IdType.AUTO) private Long id; private Long userId; private Long productId; private int count; private BigDecimal money; private int status; }
1 2 3 @Repository public interface OrderMapper extends BaseMapper <Order > {}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Slf4j @Service public class OrderServiceImpl extends ServiceImpl <OrderMapper , Order > implements OrderService { @Autowired private OrderMapper orderMapper; @Override @Transactional public Boolean create (Order order) { log.info("------------------>开始创建订单" ); int result = orderMapper.insert(order); log.info("------------------>创建订单结束" ); return result > 0 ; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @RestController @RequestMapping("order") public class OrderController { @Autowired private OrderService orderService; @GetMapping("create") public Boolean create (Long userId, Long productId) { Order order = new Order(); order.setCount(1 ) .setMoney(BigDecimal.valueOf(88 )) .setProductId(productId) .setStatus(0 ) .setUserId(userId); return orderService.create(order); } }
启动后,访问 http://localhost:8010/order/create ,可以看到数据库中多了一条数据,此时 order-service 搭建成功
依葫芦画瓢构建出第二个 RM
模块,即 storage-service
2.5、TM 代码实现 一般来说, TM 也是一个微服务,这个微服务需要调用其他微服务接口完成业务开发。
1、创建 business-service
微服务,引入依赖 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > com.baomidou</groupId > <artifactId > mybatis-plus-boot-starter</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > </dependency > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-discovery</artifactId > </dependency > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-config</artifactId > </dependency > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-seata</artifactId > <exclusions > <exclusion > <groupId > io.seata</groupId > <artifactId > seata-spring-boot-starter</artifactId > </exclusion > </exclusions > </dependency > <dependency > <groupId > io.seata</groupId > <artifactId > seata-spring-boot-starter</artifactId > <version > 1.3.0</version > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-openfeign</artifactId > </dependency > <dependency > <groupId > io.github.openfeign</groupId > <artifactId > feign-okhttp</artifactId > <version > 10.2.3</version > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > </dependency > </dependencies >
2、编写配置文件 需要添加 OpenFeign
相关的配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 server: port: 8000 mybatis-plus: mapper-locations: classpath:com/hzx/seata/business/mapper/xml/*.xml spring: application: name: business-service cloud: nacos: discovery: server-addr: 120.78 .198 .32 :8848 register-enabled: true namespace: 3985d454-42f8-4f3f-9b35-bb336253f4da server-addr: 120.78 .198 .32 :8848 seata: enabled: true application-id: business-service tx-service-group: my_test_tx_group config: type: nacos nacos: namespace: 3985d454-42f8-4f3f-9b35-bb336253f4da serverAddr: 120.78 .198 .32 :8848 group: SEATA_GROUP username: nacos password: nacos registry: type: nacos nacos: application: seata-server server-addr: 120.78 .198 .32 :8848 group: SEATA_GROUP namespace: 3985d454-42f8-4f3f-9b35-bb336253f4da username: nacos password: nacos feign: client: config: default: connect-timeout: 30000 read-timeout: 30000
3、编写主启动类 1 2 3 4 5 6 7 8 9 10 @EnableFeignClients @EnableDiscoveryClient @MapperScan("com.hzx.seata.business.mapper") @SpringBootApplication(scanBasePackages = "com.hzx.seata.business") public class SeataBusinessServiceApplication { public static void main (String[] args) { SpringApplication.run(SeataBusinessServiceApplication.class, args); } }
4、编写 OpenFeign
远程调用接口 用于调用 order-service
和 storage-service
微服务的接口。
当使用 Feign
时,如果发送的是 get 请求,那么需要在请求参数前加上 @RequestParam
注解修饰,Controller里面可以不加该注解修饰。
这里必须使用 @RequestParam
注解
1 2 3 4 5 6 @Service @FeignClient("order-service") public interface OrderClient { @GetMapping("order/create") Boolean create (@RequestParam("userId") Long userId,@RequestParam("productId") Long productId) ; }
1 2 3 4 5 6 @Service @FeignClient("storage-service") public interface StorageClient { @GetMapping("storage/change") Boolean changeStorage (@RequestParam("productId") Long productId, @RequestParam("used") Integer used) ; }
5、编写 business-service
中的业务方法 1 2 3 4 5 6 7 8 9 10 11 12 @RestController @RequestMapping("business") public class BusinessController { @Autowired private BusinessService businessService; @GetMapping("buy") public String buy (Long userId, Long productId) { businessService.buy(userId, productId); return "ok!" ; } }
这个方法用于开启全局事务,使用 @GlobalTransactional
注解,这个注解由 seata
提供
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Slf4j @Service public class BusinessServiceImpl implements BusinessService { @Autowired private OrderClient orderClient; @Autowired private StorageClient storageClient; @Override @GlobalTransactional public void buy (Long userId, Long productId) { log.info("--------------->buy方法开始<---------------" ); log.info("--------------->创建订单:开始<---------------" ); orderClient.create(userId, productId); log.info("--------------->创建订单:结束<---------------" ); log.info("--------------->修改库存:开始<---------------" ); storageClient.changeStorage(productId, 10 ); log.info("--------------->修改库存:结束<---------------" ); log.info("--------------->buy方法结束<---------------" ); } }
6、测试 将订单数据库原有的数据删去,并将库存数据库中的总额全部改为100,已使用额置为0
查看数据库,此时可以看到订单数据库中出现一条新纪录,且库存数据库中的库存发生变化
7、查看全局事务回滚是否生效 在 storage-service
微服务中模拟一个异常 StorageServiceImpl
1 2 3 4 5 6 7 8 @Override @Transactional public Boolean updateStorage (Long productId, Integer used) { log.info("---------------------------->模拟异常,检测全局事务回滚是否生效<----------------------------" ); int i = 100 / 0 ; int result = storageMapper.updateStorage(productId, used); return result > 0 ; }
如果全局事务不生效,那么由于先调用 order-service
微服务中的 create
接口后调用 storage-service
微服务的 update
接口,所以在 order
数据库中势必会多一条脏数据,如果全局事务回滚生效,那么 order
数据库中不会多出脏数据。 查看 order 数据库中现有的数据
重启微服务,继续访问 http://localhost:8000/business/buy?userId=1&productId=1 ,查看结果
可以看到上面的接口访问出错,我们现在看一下 order 数据库中的数据,可以看到仍然没有脏数据产生。
如果删除 business
微服务中的 @GlobalTransaction
注解,那么全局事务不生效,在 order
数据库中会多出一条脏数据,同时库存数据没有更新。 2.6、AT 模式原理解析 1、相关表解析 2、AT 如何做到对业务的无侵入? 由于二阶段原理已经在上面提及,所以这里不做赘述,单独根据此演示对一阶段进行分析
TM business-service.buy(Long, Long)
方法执行时,由于此接口的实现方法中有 @GlobalTransactional
注解,所以该 TM 会向 TC 发起全局事务,生成全局事务 ID
(即 XID
)
RM:order-service.create(Long, Long)
向 order
表中写入数据,在 Order 数据库中的 UNDO_LOG
记录回滚日志(Branch ID),通知 TC 操作结果
RM:storage-service.update(Long, Long)
向 storage
表中写入数据,在 Storage 数据库中的 UNDO_LOG
记录回滚日志(Branch ID),通知 TC 操作结果
RM 向表进行写操作的过程中,Seata
先是会拦截业务 SQL
,解析 SQL
语义,在业务数据被更新前,将其保存为 before image
,然后执行业务 SQL
,在业务数据更新之后,再将其保存为 after image
,最后生成行锁。以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。
3、UNDO_LOG 中的前置镜像 (before image) 和后置镜像 (after image) 保存在 UNDO_LOG
中的 rollback_info
中 这里以 order
数据库中的 undo_log 为例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 { "@class" : "io.seata.rm.datasource.undo.BranchUndoLog" , "xid" : "192.168.2.196:8091:104983180048351232" , "branchId" : 104983197731536896 , "sqlUndoLogs" : ["java.util.ArrayList" , [{ "@class" : "io.seata.rm.datasource.undo.SQLUndoLog" , "sqlType" : "INSERT" , "tableName" : "tab_order" , "beforeImage" : { "@class" : "io.seata.rm.datasource.sql.struct.TableRecords$EmptyTableRecords" , "tableName" : "tab_order" , "rows" : ["java.util.ArrayList" , []] }, "afterImage" : { "@class" : "io.seata.rm.datasource.sql.struct.TableRecords" , "tableName" : "tab_order" , "rows" : ["java.util.ArrayList" , [{ "@class" : "io.seata.rm.datasource.sql.struct.Row" , "fields" : ["java.util.ArrayList" , [{ "@class" : "io.seata.rm.datasource.sql.struct.Field" , "name" : "id" , "keyType" : "PRIMARY_KEY" , "type" : -5 , "value" : ["java.lang.Long" , 18 ] }, { "@class" : "io.seata.rm.datasource.sql.struct.Field" , "name" : "user_id" , "keyType" : "NULL" , "type" : -5 , "value" : ["java.lang.Long" , 1 ] }, { "@class" : "io.seata.rm.datasource.sql.struct.Field" , "name" : "product_id" , "keyType" : "NULL" , "type" : -5 , "value" : ["java.lang.Long" , 1 ] }, { "@class" : "io.seata.rm.datasource.sql.struct.Field" , "name" : "count" , "keyType" : "NULL" , "type" : 4 , "value" : null }, { "@class" : "io.seata.rm.datasource.sql.struct.Field" , "name" : "money" , "keyType" : "NULL" , "type" : 3 , "value" : ["java.math.BigDecimal" , 88 ] }, { "@class" : "io.seata.rm.datasource.sql.struct.Field" , "name" : "status" , "keyType" : "NULL" , "type" : 4 , "value" : null }]] }]] } }]] }