环境准备
| 环境 | 版本 |
|---|---|
| JDK | 11 |
| Seata | 1.4.2 |
| Dubbo | 2.7.2 |
| Zookeeper | |
| MybatisPlus | |
sql脚本
seata服务使用的数据库脚本seata库
/*
Navicat Premium Data Transfer
Source Server : l-mysql
Source Server Type : MySQL
Source Server Version : 50733
Source Host : 10.211.55.4:3306
Source Schema : seata
Target Server Type : MySQL
Target Server Version : 50733
File Encoding : 65001
Date: 07/05/2021 14:49:55
*/
-- Session setup for the import: utf8mb4 connection charset, and foreign-key
-- checks switched off while the tables are dropped and recreated
-- (they are switched back on at the end of this script).
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for branch_table
-- Primary key: branch_id. Secondary index idx_xid on xid.
-- ----------------------------
DROP TABLE IF EXISTS `branch_table`;
CREATE TABLE `branch_table` (
    `branch_id`         BIGINT(20)    NOT NULL,
    `xid`               VARCHAR(128)  NOT NULL,
    `transaction_id`    BIGINT(20)    DEFAULT NULL,
    `resource_group_id` VARCHAR(32)   DEFAULT NULL,
    `resource_id`       VARCHAR(256)  DEFAULT NULL,
    `lock_key`          VARCHAR(128)  DEFAULT NULL,
    `branch_type`       VARCHAR(8)    DEFAULT NULL,
    `status`            TINYINT(4)    DEFAULT NULL,
    `client_id`         VARCHAR(64)   DEFAULT NULL,
    `application_data`  VARCHAR(2000) DEFAULT NULL,
    `gmt_create`        DATETIME      DEFAULT NULL,
    `gmt_modified`      DATETIME      DEFAULT NULL,
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- ----------------------------
-- Table structure for global_table
-- Primary key: xid. Secondary indexes on (gmt_modified, status) and on transaction_id.
-- ----------------------------
DROP TABLE IF EXISTS `global_table`;
CREATE TABLE `global_table` (
    `xid`                       VARCHAR(128)  NOT NULL,
    `transaction_id`            BIGINT(20)    DEFAULT NULL,
    `status`                    TINYINT(4)    NOT NULL,
    `application_id`            VARCHAR(32)   DEFAULT NULL,
    `transaction_service_group` VARCHAR(32)   DEFAULT NULL,
    `transaction_name`          VARCHAR(128)  DEFAULT NULL,
    `timeout`                   INT(11)       DEFAULT NULL,
    `begin_time`                BIGINT(20)    DEFAULT NULL,
    `application_data`          VARCHAR(2000) DEFAULT NULL,
    `gmt_create`                DATETIME      DEFAULT NULL,
    `gmt_modified`              DATETIME      DEFAULT NULL,
    PRIMARY KEY (`xid`),
    KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- ----------------------------
-- Table structure for lock_table
-- Primary key: row_key. Secondary index idx_branch_id on branch_id.
--
-- transaction_id / branch_id are BIGINT (not MEDIUMTEXT) so they have the same
-- type as the matching columns of branch_table, global_table and undo_log in
-- this script, compare without implicit string conversion, and can be indexed.
-- xid is VARCHAR(128), the same width as xid in branch_table and global_table,
-- so a value that fits there is not truncated or rejected here.
-- NOTE(review): this follows the stock Seata server schema as far as I recall
-- it — confirm against the mysql.sql shipped with your Seata version.
-- ----------------------------
DROP TABLE IF EXISTS `lock_table`;
CREATE TABLE `lock_table` (
    `row_key`        VARCHAR(128) NOT NULL,
    `xid`            VARCHAR(128) DEFAULT NULL,
    `transaction_id` BIGINT(20)   DEFAULT NULL,
    `branch_id`      BIGINT(20)   NOT NULL,
    `resource_id`    VARCHAR(256) DEFAULT NULL,
    `table_name`     VARCHAR(32)  DEFAULT NULL,
    `pk`             VARCHAR(36)  DEFAULT NULL,
    `gmt_create`     DATETIME     DEFAULT NULL,
    `gmt_modified`   DATETIME     DEFAULT NULL,
    PRIMARY KEY (`row_key`),
    KEY `idx_branch_id` (`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- ----------------------------
-- Table structure for undo_log
-- Primary key: auto-increment id. (xid, branch_id) is unique via ux_undo_log.
--
-- Charset is utf8mb4 (was utf8, i.e. MySQL's 3-byte utf8mb3) so that it agrees
-- with `SET NAMES utf8mb4` and with every other table created by this script.
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
    `id`            BIGINT(20)   NOT NULL AUTO_INCREMENT,
    `branch_id`     BIGINT(20)   NOT NULL,
    `xid`           VARCHAR(100) NOT NULL,
    `context`       VARCHAR(128) NOT NULL,
    `rollback_info` LONGBLOB     NOT NULL,
    `log_status`    INT(11)      NOT NULL,
    `log_created`   DATETIME     NOT NULL,
    `log_modified`  DATETIME     NOT NULL,
    `ext`           VARCHAR(100) DEFAULT NULL,
    PRIMARY KEY (`id`),
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Re-enable the foreign-key checks that were disabled at the top of this script.
SET FOREIGN_KEY_CHECKS = 1;
业务服务使用的sql脚本test库
/*
Navicat Premium Data Transfer
Source Server : l-mysql
Source Server Type : MySQL
Source Server Version : 50733
Source Host : 10.211.55.4:3306
Source Schema : test
Target Server Type : MySQL
Target Server Version : 50733
File Encoding : 65001
Date: 07/05/2021 14:51:42
*/
-- Session setup for the import: utf8mb4 connection charset, and foreign-key
-- checks switched off while the tables are dropped and recreated
-- (they are switched back on at the end of this script).
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for branch_table
-- Primary key: branch_id. Secondary index idx_xid on xid.
-- NOTE(review): same definition as branch_table in the `seata` schema script;
-- presumably only the Seata server needs it — confirm it is required in `test`.
-- ----------------------------
DROP TABLE IF EXISTS `branch_table`;
CREATE TABLE `branch_table` (
    `branch_id`         BIGINT(20)    NOT NULL,
    `xid`               VARCHAR(128)  NOT NULL,
    `transaction_id`    BIGINT(20)    DEFAULT NULL,
    `resource_group_id` VARCHAR(32)   DEFAULT NULL,
    `resource_id`       VARCHAR(256)  DEFAULT NULL,
    `lock_key`          VARCHAR(128)  DEFAULT NULL,
    `branch_type`       VARCHAR(8)    DEFAULT NULL,
    `status`            TINYINT(4)    DEFAULT NULL,
    `client_id`         VARCHAR(64)   DEFAULT NULL,
    `application_data`  VARCHAR(2000) DEFAULT NULL,
    `gmt_create`        DATETIME      DEFAULT NULL,
    `gmt_modified`      DATETIME      DEFAULT NULL,
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- ----------------------------
-- Table structure for global_table
-- Primary key: xid. Secondary indexes on (gmt_modified, status) and on transaction_id.
-- NOTE(review): same definition as global_table in the `seata` schema script;
-- presumably only the Seata server needs it — confirm it is required in `test`.
-- ----------------------------
DROP TABLE IF EXISTS `global_table`;
CREATE TABLE `global_table` (
    `xid`                       VARCHAR(128)  NOT NULL,
    `transaction_id`            BIGINT(20)    DEFAULT NULL,
    `status`                    TINYINT(4)    NOT NULL,
    `application_id`            VARCHAR(32)   DEFAULT NULL,
    `transaction_service_group` VARCHAR(32)   DEFAULT NULL,
    `transaction_name`          VARCHAR(128)  DEFAULT NULL,
    `timeout`                   INT(11)       DEFAULT NULL,
    `begin_time`                BIGINT(20)    DEFAULT NULL,
    `application_data`          VARCHAR(2000) DEFAULT NULL,
    `gmt_create`                DATETIME      DEFAULT NULL,
    `gmt_modified`              DATETIME      DEFAULT NULL,
    PRIMARY KEY (`xid`),
    KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- ----------------------------
-- Table structure for lock_table
-- Primary key: row_key. Secondary index idx_branch_id on branch_id.
--
-- transaction_id / branch_id are BIGINT (not MEDIUMTEXT) so they have the same
-- type as the matching columns of branch_table, global_table and undo_log in
-- this script, compare without implicit string conversion, and can be indexed.
-- xid is VARCHAR(128), the same width as xid in branch_table and global_table,
-- so a value that fits there is not truncated or rejected here.
-- NOTE(review): this follows the stock Seata server schema as far as I recall
-- it — confirm against the mysql.sql shipped with your Seata version.
-- ----------------------------
DROP TABLE IF EXISTS `lock_table`;
CREATE TABLE `lock_table` (
    `row_key`        VARCHAR(128) NOT NULL,
    `xid`            VARCHAR(128) DEFAULT NULL,
    `transaction_id` BIGINT(20)   DEFAULT NULL,
    `branch_id`      BIGINT(20)   NOT NULL,
    `resource_id`    VARCHAR(256) DEFAULT NULL,
    `table_name`     VARCHAR(32)  DEFAULT NULL,
    `pk`             VARCHAR(36)  DEFAULT NULL,
    `gmt_create`     DATETIME     DEFAULT NULL,
    `gmt_modified`   DATETIME     DEFAULT NULL,
    PRIMARY KEY (`row_key`),
    KEY `idx_branch_id` (`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- ----------------------------
-- Table structure for order_account
-- Business table: one row links an account_id to an order_id.
-- AUTO_INCREMENT=67 is the counter value carried over from the dumped database.
-- ----------------------------
DROP TABLE IF EXISTS `order_account`;
CREATE TABLE `order_account` (
    `order_account_id` BIGINT(20) NOT NULL AUTO_INCREMENT,
    `account_id`       BIGINT(20) DEFAULT NULL,
    `order_id`         BIGINT(20) DEFAULT NULL,
    `is_deleted`       INT(1)     DEFAULT NULL,
    PRIMARY KEY (`order_account_id`)
) ENGINE=InnoDB AUTO_INCREMENT=67 DEFAULT CHARSET=utf8mb4;
-- ----------------------------
-- Table structure for order_item
-- Business table keyed by the auto-increment order_id.
-- AUTO_INCREMENT=100 is the counter value carried over from the dumped database.
-- ----------------------------
DROP TABLE IF EXISTS `order_item`;
CREATE TABLE `order_item` (
    `order_id` BIGINT(20) NOT NULL AUTO_INCREMENT,
    `type`     INT(1)     DEFAULT NULL,
    PRIMARY KEY (`order_id`)
) ENGINE=InnoDB AUTO_INCREMENT=100 DEFAULT CHARSET=utf8mb4;
-- ----------------------------
-- Table structure for undo_log
-- Primary key: auto-increment id. (xid, branch_id) is unique via ux_undo_log.
-- The client configuration in this document points client.undo.logTable at
-- this table name.
--
-- Charset is utf8mb4 (was utf8, i.e. MySQL's 3-byte utf8mb3) so that it agrees
-- with `SET NAMES utf8mb4` and with every other table created by this script.
-- AUTO_INCREMENT=7 is the counter value carried over from the dumped database.
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
    `id`            BIGINT(20)   NOT NULL AUTO_INCREMENT,
    `branch_id`     BIGINT(20)   NOT NULL,
    `xid`           VARCHAR(100) NOT NULL,
    `context`       VARCHAR(128) NOT NULL,
    `rollback_info` LONGBLOB     NOT NULL,
    `log_status`    INT(11)      NOT NULL,
    `log_created`   DATETIME     NOT NULL,
    `log_modified`  DATETIME     NOT NULL,
    `ext`           VARCHAR(100) DEFAULT NULL,
    PRIMARY KEY (`id`),
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8mb4;

-- Re-enable the foreign-key checks that were disabled at the top of this script.
SET FOREIGN_KEY_CHECKS = 1;
SEATA服务安装启动
修改registry.conf配置项
# Service registry used by the Seata server. Only the stanza named by `type`
# (here: zk) is the selected one; the other stanzas are left at their sample values.
registry {
  # supported types: file, nacos, eureka, redis, zk, consul, etcd3, sofa
  type = "zk"

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = ""
    cluster = "default"
    username = ""
    password = ""
  }

  eureka {
    serviceUrl = "http://localhost:8761/eureka"
    application = "default"
    weight = "1"
  }

  redis {
    serverAddr = "localhost:6379"
    db = 0
    password = ""
    cluster = "default"
    timeout = 0
  }

  # Selected registry: ZooKeeper at 10.211.55.4:2181
  zk {
    cluster = "default"
    serverAddr = "10.211.55.4:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }

  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
    aclToken = ""
  }

  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }

  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }

  file {
    name = "file.conf"
  }
}
# Configuration source used by the Seata server. Only the stanza named by `type`
# (here: zk) is the selected one; the other stanzas are left at their sample values.
config {
  # supported types: file, nacos, apollo, zk, consul, etcd3
  type = "zk"

  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = ""
    password = ""
    dataId = "seataServer.properties"
  }

  consul {
    serverAddr = "127.0.0.1:8500"
    aclToken = ""
  }

  apollo {
    appId = "seata-server"
    ## apolloConfigService will cover apolloMeta
    apolloMeta = "http://192.168.1.204:8801"
    apolloConfigService = "http://192.168.1.204:8080"
    namespace = "application"
    apolloAccesskeySecret = ""
    cluster = "seata"
  }

  # Selected config source: ZooKeeper at 10.211.55.4:2181
  zk {
    serverAddr = "10.211.55.4:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
    nodePath = "/seata/seata.properties"
  }

  etcd3 {
    serverAddr = "http://localhost:2379"
  }

  file {
    name = "file.conf"
  }
}
修改项目:
registry.type = "zk"
registry.zk.serverAddr = "10.211.55.4:2181"
config.type = "zk"
config.zk.serverAddr = "10.211.55.4:2181"
启动seata服务:
# Start the Seata server in the background, detached from the terminal (nohup ... &).
# -p 8091 matches the port in service.default.grouplist (10.211.55.4:8091).
# NOTE(review): -n 2 is presumably the server node id — confirm against seata-server.sh usage.
nohup sh bin/seata-server.sh -p 8091 -n 2 &
把Seata配置同步到zk中
zk-config.properties
该文件放在项目的resources目录下,配置内容如下,需要写个脚本把这些配置项初始化到zk中。
# ---- transport.*: transport-layer settings (protocol type, thread name prefixes and thread pool sizes) ----
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
# ---- service.*: each vgroupMapping.<transaction group> is mapped to the name "default";
# ---- service.default.grouplist gives the server address for that name ----
service.vgroupMapping.my_test_tx_group=default
service.vgroupMapping.account-web-group=default
service.vgroupMapping.account-service-app-group=default
service.vgroupMapping.order-service-app-group=default
service.default.grouplist=10.211.55.4:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
# ---- client.rm.*: client-side settings under the "rm" prefix ----
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
# ---- client.tm.*: client-side settings under the "tm" prefix ----
# NOTE(review): defaultGlobalTransactionTimeout / degradeCheckPeriod look like milliseconds — confirm.
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
# ---- store.*: session storage settings ----
# NOTE(review): store.mode is "file", so presumably the store.db.* settings below
# (and the tables created in the `seata` schema) are not used until this is
# switched to "db" — confirm which mode is intended.
store.mode=file
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
# ---- store.db.*: JDBC connection to the `seata` schema ----
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://10.211.55.4:3306/seata?useUnicode=true
# Placeholders: replace with your own database account. Real credentials must
# not be published in documentation or committed to version control.
store.db.user=username
store.db.password=password
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
# ---- store.redis.*: Redis connection settings ----
store.redis.host=10.211.55.4
store.redis.port=6379
store.redis.maxConn=10
store.redis.minConn=1
store.redis.database=0
store.redis.password=null
store.redis.queryLimit=100
# ---- server.recovery.* / server.max*: retry periods and retry timeouts on the server side ----
# NOTE(review): the *RetryPeriod values look like milliseconds; -1 presumably means "no limit" — confirm.
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
# ---- undo-log settings; client.undo.logTable names the undo_log table created in the business schema ----
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.log.exceptionRate=100
# ---- wire format: serialization and compression ----
transport.serialization=seata
transport.compressor=none
# ---- metrics.*: metrics are disabled (metrics.enabled=false) ----
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898
修改配置:
# Address (host:port) of the Seata server for the name "default"
service.default.grouplist=10.211.55.4:8091
# JDBC URL and account of the `seata` schema (username/password are placeholders)
store.db.url=jdbc:mysql://10.211.55.4:3306/seata?useUnicode=true
store.db.user=username
store.db.password=password
# account-web transaction group: account-web-group, group name: default
service.vgroupMapping.account-web-group=default
# account service transaction group: account-service-app-group, group name: default
service.vgroupMapping.account-service-app-group=default
# order service transaction group: order-service-app-group, group name: default
service.vgroupMapping.order-service-app-group=default
初始化seata配置脚本
import io.seata.config.zk.DefaultZkSerializer;
import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.CreateMode;
import org.springframework.util.ResourceUtils;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Set;
@Slf4j
public class ZkDataInit {
private static volatile ZkClient zkClient;
public static void main(String[] args) {
if (zkClient == null) {
ZkSerializer zkSerializer = new DefaultZkSerializer();
zkClient = new ZkClient("10.211.55.4:2181", 6000, 2000, zkSerializer);
}
if (!zkClient.exists("/seata")) {
zkClient.createPersistent("/seata", true);
}
//获取key对应的value值
Properties properties = new Properties();
// 使用ClassLoader加载properties配置文件生成对应的输入流
// 使用properties对象加载输入流
try {
File file = ResourceUtils.getFile("classpath:zk-config.properties");
InputStream in = new FileInputStream(file);
properties.load(in);
Set<Object> keys = properties.keySet();//返回属性key的集合
for (Object key : keys) {
boolean b = putConfig(key.toString(), properties.get(key).toString());
log.info(key.toString() + "=" + properties.get(key) + "result=" + b);
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* @param dataId
* @param content
* @return
*/
public static boolean putConfig(final String dataId, final String content) {
Boolean flag = false;
String path = "/seata/" + dataId;
if (!zkClient.exists(path)) {
zkClient.create(path, content, CreateMode.PERSISTENT);
flag = true;
} else {
zkClient.writeData(path, content);
flag = true;
}
return flag;
}
}
![image](https://i.loli.net/2021/05/07/GkZ4ytr1iRjPe5A.png)
连接zk,查看初始化情况。
业务服务整合Seata
依赖包
<!-- Seata client library; version 1.4.2 matches the Seata version listed at the top of this document -->
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-all</artifactId>
    <version>1.4.2</version>
</dependency>
resources目录下添加配置文件registry.conf
# Service registry used by the business service: ZooKeeper at 10.211.55.4:2181.
registry {
  # supported types: file, nacos, eureka, redis, zk, consul, etcd3, sofa
  type = "zk"

  zk {
    cluster = "default"
    serverAddr = "10.211.55.4:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
}
# Configuration source used by the business service: ZooKeeper at 10.211.55.4:2181.
config {
  # supported types: file, nacos, apollo, zk, consul, etcd3
  type = "zk"

  zk {
    serverAddr = "10.211.55.4:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
    nodePath = "/seata/seata.properties"
  }
}
配置类
package com.enterprise.order.config;
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import io.seata.spring.annotation.GlobalTransactionScanner;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.sql.DataSource;
/**
 * Data source wiring for the order service: a Druid pool wrapped in Seata's
 * {@link DataSourceProxy}, plus the {@link GlobalTransactionScanner} bean.
 */
@Configuration
public class DataSourceConfig {

    /**
     * The underlying Druid pool. Properties under the {@code spring.datasource}
     * prefix are bound onto the returned instance.
     *
     * @return a new, not yet configured {@link DruidDataSource}
     */
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource() {
        return new DruidDataSource();
    }

    /**
     * Seata proxy around the Druid pool, registered as the primary data source
     * under the bean name {@code dataSource}.
     *
     * @param druidDataSource the data source to wrap
     * @return the proxied data source
     */
    @Primary
    @Bean("dataSource")
    public DataSourceProxy dataSource(DataSource druidDataSource) {
        DataSourceProxy proxy = new DataSourceProxy(druidDataSource);
        return proxy;
    }

    /**
     * Global transaction scanner. It takes two arguments: the application name
     * and the transaction group.
     *
     * @return the scanner for application "order-service-app" in group "order-service-app-group"
     */
    @Bean
    public GlobalTransactionScanner globalTransactionScanner() {
        final String applicationId = "order-service-app";
        final String txServiceGroup = "order-service-app-group";
        return new GlobalTransactionScanner(applicationId, txServiceGroup);
    }
}
mybatisplus扫描mapper接口配置类
package com.enterprise.order.config;
import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * MyBatis-Plus configuration: mapper scanning and the pagination interceptor.
 *
 * <p>The scan pattern uses {@code **} so that it matches mapper packages directly
 * under {@code com.enterprise.order.dal} (e.g. {@code com.enterprise.order.dal.mapper},
 * the package scanned by the startup class) as well as ones nested in a
 * sub-package. The previous pattern {@code dal.*.mapper*} required exactly one
 * intermediate package and therefore could not match {@code dal.mapper}.
 *
 * <p>NOTE(review): the startup class scans {@code com.enterprise.order.dal.mapper}
 * too, so mappers there are found by both scans — confirm that only a
 * duplicate-definition warning results, or keep just one {@code @MapperScan}.
 */
@Configuration
@MapperScan("com.enterprise.order.dal.**.mapper*")
public class MybatisPlusConfig {

    /**
     * Registers the MyBatis-Plus pagination interceptor with its default settings.
     *
     * @return a new {@link PaginationInterceptor}
     */
    @Bean
    public PaginationInterceptor paginationInterceptor() {
        return new PaginationInterceptor();
    }
}
启动类
/**
 * Spring Boot entry point of the order service.
 *
 * <p>Dubbo is enabled, mappers in {@code com.enterprise.order.dal.mapper} are
 * scanned, and {@code DataSourceAutoConfiguration} is excluded (presumably
 * because the project defines its own data source beans).
 */
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
@MapperScan("com.enterprise.order.dal.mapper")
@EnableDubbo
public class OrderServiceApp {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String... args) {
        SpringApplication.run(OrderServiceApp.class, args);
    }
}
业务代码添加分布式事务
/**
 * Creates an order inside a global transaction: logs the current XID, notifies
 * the account center, creates the order and returns it in a success response.
 *
 * @param orderReq the order request
 * @return a success response carrying the created order
 */
@Override
@GlobalTransactional
public CommonRes<OrderRes> createOrder(OrderReq orderReq) {
    String xid = RootContext.getXID();
    log.info("全局事务id :" + xid);
    // Tell the account center that order placement has started
    accountClient.notifyAccountOrder();
    // Create the order data and return it to the caller
    return CommonRes.success(orderService.createOrder(orderReq));
}
account服务同order服务相同的配置,启动order服务,观察seata服务打印的日志
10:58:09.740 INFO --- [rverHandlerThread_1_2_500] i.s.c.r.processor.server.RegRmProcessor : RM register success,message:RegisterRMRequest{resourceIds='jdbc:mysql://10.211.55.4:3306/test', applicationId='order-service-app', transactionServiceGroup='order-service-app-group'},channel:[id: 0x28dca215, L:/10.211.55.4:8091 - R:/10.211.55.2:52633],client version:1.4.2
错误:
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: config type can not be null
无法注入globalTransactionScanner对象,原因是resources目录下没有配置registry.conf文件。
接口测试
16:34:46.680 [DubboServerHandler-10.211.55.2:20881-thread-3] INFO com.enterprise.order.rpc.OrderRpcImpl - 全局事务id :10.211.55.4:8091:18145212904984580
![image](https://i.loli.net/2021/05/07/x2PLqiQAfuedEtl.png)
回滚日志
16:34:46.680 [DubboServerHandler-10.211.55.2:20881-thread-3] INFO com.enterprise.order.rpc.OrderRpcImpl - 全局事务id :10.211.55.4:8091:18145212904984580
16:34:47.043 [DubboServerHandler-10.211.55.2:20881-thread-3] DEBUG i.s.r.d.undo.AbstractUndoLogManager - Flushing UNDO LOG: {"@class":"io.seata.rm.datasource.undo.BranchUndoLog","xid":"10.211.55.4:8091:18145212904984580","branchId":18145212904984582,"sqlUndoLogs":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.undo.SQLUndoLog","sqlType":"INSERT","tableName":"order_item","beforeImage":{"@class":"io.seata.rm.datasource.sql.struct.TableRecords$EmptyTableRecords","tableName":"order_item","rows":["java.util.ArrayList",[]]},"afterImage":{"@class":"io.seata.rm.datasource.sql.struct.TableRecords","tableName":"order_item","rows":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Row","fields":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"order_id","keyType":"PRIMARY_KEY","type":-5,"value":["java.lang.Long",101]},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"type","keyType":"NULL","type":4,"value":1}]]}]]}}]]}
16:34:47.046 [DubboServerHandler-10.211.55.2:20881-thread-3] DEBUG c.e.order.dal.mapper.OrderMapper.insert - <== Updates: 1
16:34:47.048 [DubboServerHandler-10.211.55.2:20881-thread-3] DEBUG org.mybatis.spring.SqlSessionUtils - Closing non transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@3c17d29a]
总结:
- seata数据库和业务数据脚本。
- 下载seata,修改registry.conf文件,启动seata服务。
- 同步seata配置到zk(刷脚本)。
- 项目整合,resources目录下添加registry.conf配置。
- 项目整合,seata代理业务数据源DataSourceProxy。
- 项目整合,注入配置全局事务扫描器GlobalTransactionScanner。
- 业务代码使用全局事务注解。