分布式事务seata测试用例

项目地址:https://gitee.com/weicj0426/demo-seata

下载项目后按照如下操作

一、新建数据库

数据库初始化脚本(这里使用的是mysql8):./db 目录下,数据库字符集:utf8mb4 ,排序规则:utf8mb4_general_ci

  1. 创建数据库:seata 并执行 db/seata.sql 脚本
  2. 创建数据库:seata_account 并执行 db/seata_account.sql 脚本
  3. 创建数据库:seata_order 并执行 db/seata_order.sql 脚本
  4. 创建数据库:seata_storage 并执行 db/seata_storage.sql 脚本

二、启动nacos服务

在 nacos-2.1 目录下 进入 nacos/bin 目录,执行startup.sh启动nacos服务

1
.\startup.cmd -m standalone

三、启动seata服务

  1. 进入 seata/conf 目录下 application.yml 修改配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
server:
port: 7091

spring:
application:
name: seata-server

logging:
config: classpath:logback-spring.xml
file:
path: ${user.home}/logs/seata
extend:
logstash-appender:
destination: 127.0.0.1:4560
kafka-appender:
bootstrap-servers: 127.0.0.1:9092
topic: logback_to_logstash

console:
user:
username: seata
password: seata

seata:
security:
secretKey: seata # 请确保这里有一个具体的值
tokenValidityInMilliseconds: 1000000000
config:
# support: nacos 、 consul 、 apollo 、 zk 、 etcd3
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace:
group: SEATA_GROUP
username: nacos
password: nacos
##if use MSE Nacos with auth, mutex with username/password attribute
#access-key: ""
#secret-key: ""
data-id: seataServer.properties
consul:
server-addr: 127.0.0.1:8500
acl-token:
key: seata.properties
apollo:
appId: seata-server
apollo-meta: http://192.168.1.204:8801
apollo-config-service: http://192.168.1.204:8080
namespace: application
apollo-access-key-secret:
cluster: seata
zk:
server-addr: 127.0.0.1:2181
session-timeout: 6000
connect-timeout: 2000
username:
password:
node-path: /seata/seata.properties
etcd3:
server-addr: http://localhost:2379
key: seata.properties
registry:
# support: nacos 、 eureka 、 redis 、 zk 、 consul 、 etcd3 、 sofa
type: nacos
preferred-networks: 30.240.*
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
group: SEATA_GROUP
namespace:
cluster: default
username: nacos
password: nacos
##if use MSE Nacos with auth, mutex with username/password attribute
#access-key: ""
#secret-key: ""
eureka:
service-url: http://localhost:8761/eureka
application: default
weight: 1
redis:
server-addr: localhost:6379
db: 0
password:
cluster: default
timeout: 0
zk:
cluster: default
server-addr: 127.0.0.1:2181
session-timeout: 6000
connect-timeout: 2000
username: ""
password: ""
consul:
cluster: default
server-addr: 127.0.0.1:8500
acl-token:
etcd3:
cluster: default
server-addr: http://localhost:2379
sofa:
server-addr: 127.0.0.1:9603
application: default
region: DEFAULT_ZONE
datacenter: DefaultDataCenter
cluster: default
group: SEATA_GROUP
address-wait-time: 3000

server:
service-port: 8091 #If not configured, the default is '${server.port} + 1000'
max-commit-retry-timeout: -1
max-rollback-retry-timeout: -1
rollback-retry-timeout-unlock-enable: false
enable-check-auth: true
enable-parallel-request-handle: true
retry-dead-threshold: 130000
xaer-nota-retry-timeout: 60000
recovery:
handle-all-session-period: 1000
undo:
log-save-days: 7
log-delete-period: 86400000
session:
branch-async-queue-size: 5000 #branch async remove queue size
enable-branch-async-remove: false #enable to asynchronous remove branchSession
store:
# support: file 、 db 、 redis
mode: db
session:
mode: file
lock:
mode: file
file:
dir: sessionStore
max-branch-session-size: 16384
max-global-session-size: 512
file-write-buffer-cache-size: 16384
session-reload-read-size: 100
flush-disk-mode: async
db:
datasource: druid
db-type: mysql
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true
user: mysql
password: 123456
min-conn: 5
max-conn: 100
global-table: global_table
branch-table: branch_table
lock-table: lock_table
distributed-lock-table: distributed_lock
query-limit: 100
max-wait: 5000
redis:
mode: single
database: 0
min-conn: 1
max-conn: 10
password:
max-total: 100
query-limit: 100
single:
host: 127.0.0.1
port: 6379
sentinel:
master-name:
sentinel-hosts:
metrics:
enabled: false
registry-type: compact
exporter-list: prometheus
exporter-prometheus-port: 9898
transport:
rpc-tc-request-timeout: 30000
enable-tc-server-batch-send-response: false
shutdown:
wait: 3
thread-factory:
boss-thread-prefix: NettyBoss
worker-thread-prefix: NettyServerNIOWorker
boss-thread-size: 1


  1. 在 nacos 配置列表中添加 seataServer.properties 注意分组 SEATA_GROUP
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
store.mode=db
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.cj.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.user=root
store.db.password=123456
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.distributedLockTable=distributed_lock
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
  1. 在 nacos 配置列表中添加两个分组 service.vgroupMapping.default_tx_group,service.vgroupMapping.my_test_tx_group 注意分组 SEATA_GROUP
1
default
  1. 在 seata-1.5 目录下 进入 seata/bin 目录,执行startup.sh启动seata服务
1
.\seata-server.bat
  1. 启动成功后,在nacos中可以看到seata的配置信息

这里要注意:seata的配置信息是分组的,默认是SEATA_GROUP,如果需要修改,可以在启动脚本中修改

四、客户端配置

  1. 项目中需要添加seata依赖
1
2
3
4
5
6
<!-- SpringBoot Seata -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<version>2021.0.6.1</version>
</dependency>
  1. 在application.yml中配置seata
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
seata:
enabled: true
application-id: ${spring.application.name} #微服务应用名称
tx-service-group: my_test_tx_group #此处配置自定义的seata事务分组名称
enable-auto-data-source-proxy: true #开启数据库代理
registry:
type: nacos
nacos:
application: seata-server
server-addr: 127.0.0.1:8848 #注册中心nacos地址
group: SEATA_GROUP #分组名 对应服务器端配置
cluster: default #默认集群名
namespace: #命名空间 对应nacos中配置中心seata
config:
type: nacos
nacos:
server-addr: 127.0.0.1:8848 #配置中心nacos地址
group: SEATA_GROUP #分组名 对应服务器端配置
namespace: #命名空间 对应nacos中配置中心seata
service:
vgroup-mapping:
my_test_tx_group: default #事务分组

这里启动经常会因为 vgroup-mapping 配置问题导致无法使用,
这里的 my_test_tx_group 在 nacos中的 config是有配置的
另外 seata分组和命名空间在配置的时候需要对应好 不然会找不到

  1. 在需要分布式事务管理的service中添加注解@GlobalTransactional

OrderServiceImpl.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package com.example.demoorder.service.impl;

import com.demo.seata.entities.Order;
import com.demo.seata.resp.ResultData;
import com.demo.seata.service.OrderService;
import com.example.demoorder.api.AccountFeignApi;
import com.example.demoorder.api.StorageFeignApi;
import com.example.demoorder.mapper.OrderMapper;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;

/**
* @author weicj
* @since 2025.02.10
*/
@Slf4j
@Service
public class OrderServiceImpl implements OrderService {

@Resource
private OrderMapper orderMapper;
@Resource//订单微服务通过OpenFeign去调用库存微服务
private StorageFeignApi storageFeignApi;
@Resource//订单微服务通过OpenFeign去调用账户微服务
private AccountFeignApi accountFeignApi;


@Override
@Transactional
@GlobalTransactional // 重点 第一个开启事务的需要添加seata全局事务注解
public ResultData create(Order order) {

//xid检查
String xid = RootContext.getXID();

//1. 新建订单
log.info("==================>开始新建订单"+"\t"+"xid_order:" +xid);
//订单状态status:0:创建中;1:已完结
order.setStatus(0);
int result = orderMapper.insert(order);

//插入订单成功后获得插入mysql的实体对象
Order orderFromDB = null;
if(result > 0)
{
orderFromDB = orderMapper.selectById(order.getId());
//orderFromDB = orderMapper.selectByPrimaryKey(order.getId());
log.info("-------> 新建订单成功,orderFromDB info: "+orderFromDB);
System.out.println();
//2. 扣减库存
log.info("-------> 订单微服务开始调用Storage库存,做扣减count");
storageFeignApi.decrease(orderFromDB.getProductId(), orderFromDB.getCount());
log.info("-------> 订单微服务结束调用Storage库存,做扣减完成");
System.out.println();
//3. 扣减账号余额
log.info("-------> 订单微服务开始调用Account账号,做扣减money");
accountFeignApi.decrease(orderFromDB.getUserId(), orderFromDB.getMoney());
log.info("-------> 订单微服务结束调用Account账号,做扣减完成");
System.out.println();
//4. 修改订单状态
//订单状态status:0:创建中;1:已完结
log.info("-------> 修改订单状态");
orderFromDB.setStatus(1);

int updateResult = orderMapper.updateById(orderFromDB);

log.info("-------> 修改订单状态完成"+"\t"+updateResult);
log.info("-------> orderFromDB info: "+orderFromDB);
}
System.out.println();
log.info("==================>结束新建订单"+"\t"+"xid_order:" +xid);
return ResultData.success("订单创建成功");
}
}

StorageServiceImpl.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.example.demostorage.service.impl;

import com.demo.seata.entities.Storage;
import com.demo.seata.resp.ResultData;
import com.demo.seata.service.StorageService;
import com.example.demostorage.mapper.StorageMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;

/**
* @author weicj
* @since 2025.02.10
*/
@Slf4j
@Service
public class StorageServiceImpl implements StorageService {

@Resource
private StorageMapper storageMapper;

/**
* 扣减库存
* !!! 事务传播特性设置为 REQUIRES_NEW 开启新的事务 重要!!!!一定要使用REQUIRES_NEW
*/
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public ResultData decrease(Long productId, Integer count) {
log.info("------->storage-service中扣减库存开始");
// storageMapper.decrease(productId,count);

Storage storage = storageMapper.selectById(productId);

if (storage.getUsed() + count > storage.getTotal()) {
return ResultData.fail("库存不足");
}

storage.setUsed(storage.getUsed() + count);
storage.setResidue(storage.getTotal() - storage.getUsed());
storageMapper.updateById(storage);

log.info("------->storage-service中扣减库存结束");
return ResultData.success();
}

}

AccountServiceImpl.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.example.demoaccount.service.impl;

import com.demo.seata.entities.Account;
import com.demo.seata.resp.ResultData;
import com.demo.seata.service.AccountService;
import com.example.demoaccount.mapper.AccountMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.concurrent.TimeUnit;

/**
* @author weicj
* @since 2025.02.10
*/
@Slf4j
@Service
public class AccountServiceImpl implements AccountService {


@Resource
private AccountMapper accountMapper;
/**
* 扣减账户余额
* !!! 事务传播特性设置为 REQUIRES_NEW 开启新的事务 重要!!!!一定要使用REQUIRES_NEW
*/
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public ResultData decrease(Long userId, BigDecimal money) {
log.info("------->account-service中扣减账户余额开始");

// accountMapper.decrease(userId,money);
Account account = accountMapper.selectById(userId);
account.setUsed(account.getUsed().add(money));
account.setResidue(account.getResidue().subtract(money));
accountMapper.updateById(account);

myTimeOut();
// int age = 10/0;
log.info("------->account-service中扣减账户余额结束");
return ResultData.success();
}

/**
* 模拟超时异常,全局事务回滚
*/
private static void myTimeOut()
{
try { TimeUnit.SECONDS.sleep(65); } catch (InterruptedException e) { e.printStackTrace(); }
}

}

五、测试

  1. 启动nacos、seata、订单、库存、账户服务
  2. 调用订单服务,可以看到库存和账户服务都执行了,但是订单服务回滚了,库存和账户服务都回滚了
1
http://localhost:18092/order/create?userId=1&productId=1&count=10&money=100