前言
在上文章SpringCloud整合Seata分布式事务(上)之基础环境搭建中我们已经搭建好基础的环境了,pom依赖和yml配置也完成了,在本文中,我们主要完成三个微服务订单服务:cloudalibaba-seata-order-service2001
、库存服务:cloudalibaba-seata-storage-service2002
和余额服务:cloudalibaba-seata-account-service2003
的功能实现。
整个订单服务的包结构如下所示,三个服务的包结构基本都是这样,就不一一贴出了。
当然为避免篇幅,实体类都使用lombok插件简化getter、setter等方法,maven依赖中添加后别忘了在idea中下载插件。
创建好订单模块之后,我们先建立POJO(也可以叫domain、entity等),也就是建立与数据库表映射的实体类。
订单实体类:
package com.dl.pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order
{
private Long id; //主键id
private Long userId; //用户id
private Long productId; //产品id
private Integer count; //数量
private BigDecimal money; //金额
private Integer status; //订单状态:0:创建中;1:已完结
}
接下来就从dao层开始编写,编写mapper接口以及相应的xml文件,为了快速实现xml和mapper接口之间跳转,使用一个Free Mybatis plugin插件。
安装好之后,编写了mapper和xml就可以利用这个小箭头快速跳转了。
订单dao接口:
package com.dl.dao;
import com.dl.pojo.Order;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
@Mapper //采用mybatis的原生搭配注解
public interface OrderDao {
//1 新建订单
int create(Order order);
//2 修改订单状态,从零改为1
int update(@Param("userId") Long userId,@Param("status") Integer status);
}
创建好dao之后,我们在resources目录下的mapper文件夹中创建对应的xml文件OrderMapper.xml:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.dl.dao.OrderDao">
<resultMap id="BaseResultMap" type="com.dl.pojo.Order">
<id column="id" property="id" jdbcType="BIGINT"/>
<result column="user_id" property="userId" jdbcType="BIGINT"/>
<result column="product_id" property="productId" jdbcType="BIGINT"/>
<result column="count" property="count" jdbcType="INTEGER"/>
<result column="money" property="money" jdbcType="DECIMAL"/>
<result column="status" property="status" jdbcType="INTEGER"/>
</resultMap>
<insert id="create">
insert into t_order (id,user_id,product_id,count,money,status)
values (null,#{userId},#{productId},#{count},#{money},0);
</insert>
<update id="update">
update t_order set status = 1
where user_id=#{userId} and status = #{status};
</update>
</mapper>
然后编写service业务逻辑层,在这里主要包含三个接口,一个订单创建接口,另外两个接口为远程调用库存服务和余额服务使用的接口。
订单接口:
package com.dl.service;
import com.dl.pojo.Order;
public interface OrderService {
int create(Order order);
}
在远程调用时候我们使用了openfeign来进行远程调用,底层实际上还是封装了ribbon再下面就是使用httpclient,订单创建好之后需要进行库存扣减,根据产品id和产品数量进行库存扣减,远程调用库存服务接口如下:
package com.dl.service;
import com.dl.response.CommonResult;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
@FeignClient(value = "seata-storage-service") //value为库存服务的application.name服务名(yml中有配置的)
public interface StorageService {
//扣减库存,调用库存服务
@PostMapping(value = "/storage/decrease")
CommonResult decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count);
}
最后还需要扣减账户余额,因此远程调用账户余额服务,远程调用扣减账户余额接口如下:
package com.dl.service;
import com.dl.response.CommonResult;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import java.math.BigDecimal;
@FeignClient(value = "seata-account-service") //远程调用账户余额服务
public interface AccountService {
//扣减账户
@PostMapping(value = "/account/decrease")
CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money);
}
然后我们再继续完善订单创建的实现类,主要包含三个逻辑,订单创建–库存扣减—账户余额扣减,其中每一个环节出现问题都应该进行整体事务的回滚,因此使用Seata提供的注解@GlobalTransactional
来保证事务回滚,比如库存扣减失败、账户余额不足扣减失败,远程调用超时等等,相当于在这儿开启了全局事务,每个本地事务我们还是通过@Transactional
来保证。
package com.dl.service.impl;
import com.dl.dao.OrderDao;
import com.dl.pojo.Order;
import com.dl.service.AccountService;
import com.dl.service.OrderService;
import com.dl.service.StorageService;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderDao orderDao;
@Autowired
private AccountService accountService;
@Autowired
private StorageService storageService;
/**
* 创建订单->调用库存服务扣减库存->调用账户服务扣减账户余额->修改订单状态
* 下订单->扣库存->减余额->改状态,在一个分布式事务中
* @param order
*/
@GlobalTransactional(rollbackFor = Exception.class)
@Override
public int create(Order order) {
log.info("------------开始创建订单!------------");
int result = orderDao.create(order);
log.info("------------调用库存服务开始扣减库存!------------");
storageService.decrease(order.getProductId(),order.getCount());
log.info("------------扣减库存结束!------------");
log.info("------------调用库存服务开始扣减账户!------------");
accountService.decrease(order.getUserId(),order.getMoney());
log.info("------------扣减账户余额结束!------------");
log.info("------------开始修改订单状态!------------");
result=orderDao.update(order.getUserId(),0); //0:创建中,1:创建完成
log.info("------------订单状态修改结束!------------");
log.info("------------订单交易结束!------------");
return result;
}
}
同时,由于Seata需要和数据库打交道,因此我们需要取消数据源的自动配置,将数据源的代理权给Seata,首先添加config配置类如下:
package com.dl.config;
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.transaction.SpringManagedTransactionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import javax.sql.DataSource;
/**
* 我们知道,对于事务的处理,最重要的是要拿到数据源,因为通过数据源我们可以控制事务什么时候回滚或提交,
* 所以数据源我们需要让seata来代理,在我们的启动注解上排除自动加载的数据源@SpringBootApplication
* (exclude = {DataSourceAutoConfiguration.class})并配置seata数据源代理
*/
@Configuration
public class DataSourceProxyConfig {
@Value("${mybatis.mapperLocations}")
private String mapperLocations;
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource druidDataSource(){
return new DruidDataSource();
}
/**
* 创建DataSourceProxy
*/
@Bean
public DataSourceProxy dataSourceProxy(DataSource dataSource) {
return new DataSourceProxy(dataSource);
}
/**
* 将原有的DataSource对象替换为DataSourceProxy
*/
@Bean(name = "sqlSessionFactory")
public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSourceProxy);
sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
return sqlSessionFactoryBean.getObject();
}
}
然后在主启动类上去掉数据源的自动配置如下(其余两个服务也需要做这个配置):
package com.dl;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
@EnableDiscoveryClient
@EnableFeignClients
//取消数据源的自动创建,而是使用自己定义的,Seata需要拿到数据源的代理权
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
@MapperScan({"com.dl.dao"})
public class SeataOrder2001Application {
public static void main(String[] args) {
SpringApplication.run(SeataOrder2001Application.class,args);
}
}
最后就是controller了,在这里提供给用户一个下单接口就ok,同时为了统一响应,先定义一个简单的响应结果类:
package com.dl.response;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
public class CommonResult<T>{
private Integer code;
private String message;
private T data;
public CommonResult(Integer code, String message, T data){
this.code=code;
this.message=message;
this.data=data;
}
public static <T> CommonResult<T>createBySuccessMsg(String message,T data){
return new CommonResult<T>(200,message,data);
}
public static <T> CommonResult<T>createByErrorMsg(String message,T data){
return new CommonResult<T>(404,message,data);
}
}
订单服务controller的代码:
package com.dl.controller;
import com.dl.pojo.Order;
import com.dl.response.CommonResult;
import com.dl.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class OrderController {
@Autowired
private OrderService orderService;
@RequestMapping(value="/order/create")
public CommonResult<String> create(Order order){
int result = orderService.create(order);
if(result>0){
return CommonResult.createBySuccessMsg("订单交易成功!",null);
}else{
return CommonResult.createByErrorMsg("订单交易失败!",null);
}
}
}
在库存服务中,对应的实体类为:
package com.dl.pojo;
import lombok.Data;
@Data
public class Storage {
private Long id;
/**
* 产品id
*/
private Long productId;
/**
* 总库存
*/
private Integer total;
/**
* 已用库存
*/
private Integer used;
/**
* 剩余库存
*/
private Integer residue;
}
库存服务扣减库存dao:
package com.dl.dao;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
@Mapper
public interface StorageDao {
//扣减库存
int decrease(@Param("productId") Long productId, @Param("count") Integer count);
}
对应的mapper文件StorageMapper.xml:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.dl.dao.StorageDao">
<resultMap id="BaseResultMap" type="com.dl.pojo.Storage">
<id column="id" property="id" jdbcType="BIGINT"/>
<result column="product_id" property="productId" jdbcType="BIGINT"/>
<result column="total" property="total" jdbcType="INTEGER"/>
<result column="used" property="used" jdbcType="INTEGER"/>
<result column="residue" property="residue" jdbcType="INTEGER"/>
</resultMap>
<update id="decrease">
UPDATE
t_storage
SET
used = used + #{count},residue = residue - #{count}
WHERE
product_id = #{productId} AND residue>#{count}
</update>
</mapper>
业务逻辑层的扣减库存接口:
package com.dl.service;
import org.springframework.web.bind.annotation.RequestParam;
public interface StorageService {
int decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count);
}
对应实现类:
package com.dl.service.impl;
import com.dl.dao.StorageDao;
import com.dl.service.StorageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
@Slf4j
public class StorageServiceImpl implements StorageService {
@Autowired
private StorageDao storageDao;
@Transactional(rollbackFor = Exception.class) //保证本地事务回滚
@Override
public int decrease(Long productId, Integer count) {
log.info("---------------库存服务开始扣减库存---------------");
return storageDao.decrease(productId,count); //这里直接返回的,就没有写扣减结束的日志了
}
}
在controller中提供一个接口供外部服务调用:
package com.dl.controller;
import com.dl.response.CommonResult;
import com.dl.service.StorageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class StorageController {
@Autowired
private StorageService storageService;
/**
* 扣减库存
*/
@RequestMapping("/storage/decrease")
public CommonResult<String> decrease(Long productId, Integer count) {
int result = storageService.decrease(productId, count);
if(result>0){
return CommonResult.createBySuccessMsg("扣减库存成功!",null);
}else{
return CommonResult.createByErrorMsg("扣减库失败!",null);
}
}
}
服务名通过配置文件配置的,且我们都是以nacos作服务发现的,因此调用方就能直接通过nacos获取这个服务。
至于config和response以及主启动类配置,三个服务中都是保持一致的。
账户余额服务
账户实体类:
package com.dl.pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Account {
private Long id;
/**
* 用户id
*/
private Long userId;
/**
* 总额度
*/
private BigDecimal total;
/**
* 已用额度
*/
private BigDecimal used;
/**
* 剩余额度
*/
private BigDecimal residue;
}
dao接口:
package com.dl.dao;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.math.BigDecimal;
@Mapper
public interface AccountDao {
int decrease(@Param("userId") Long userId, @Param("money") BigDecimal money);
}
对应的AccountMapper.xml文件:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.dl.dao.AccountDao">
<resultMap id="BaseResultMap" type="com.dl.pojo.Account">
<id column="id" property="id" jdbcType="BIGINT"/>
<result column="user_id" property="userId" jdbcType="BIGINT"/>
<result column="total" property="total" jdbcType="DECIMAL"/>
<result column="used" property="used" jdbcType="DECIMAL"/>
<result column="residue" property="residue" jdbcType="DECIMAL"/>
</resultMap>
<update id="decrease">
UPDATE t_account
SET
residue = residue - #{money},used = used + #{money}
WHERE
user_id = #{userId} AND residue>#{money}
</update>
</mapper>
service业务逻辑层
service接口:
package com.dl.service;
import org.springframework.web.bind.annotation.RequestParam;
import java.math.BigDecimal;
public interface AccountService {
/**
* 扣减账户余额
* @param userId 用户id
* @param money 金额
*/
int decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money);
}
对应的实现类:
package com.dl.service.impl;
import com.dl.dao.AccountDao;
import com.dl.service.AccountService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;
@Service
@Slf4j
public class AccountServiceImpl implements AccountService {
@Autowired
private AccountDao accountDao;
@Transactional(rollbackFor = Exception.class)
@Override
public int decrease(Long userId, BigDecimal money) {
log.info("---------------账户服务开始进行账户金额扣减--------------");
return accountDao.decrease(userId,money);
}
}
最后就是controller:
package com.dl.controller;
import com.dl.response.CommonResult;
import com.dl.service.AccountService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.math.BigDecimal;
@RestController
public class AccountController {
@Autowired
private AccountService accountService;
@RequestMapping("/account/decrease")
public CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money){
int result = accountService.decrease(userId,money);
if(result>0){
return CommonResult.createBySuccessMsg("账户扣减成功",null);
}else{
return CommonResult.createByErrorMsg("账户扣减失败!",null);
}
}
}
至此就完成了三个服务就完成了基础实现了,然后我们利用接口进行测试:http://localhost:2001/order/create?userId=1&productId=1&count=10&money=100
在测试之前保证Nacos和Seata服务启动,然后启动2002、2003库存和账户余额服务以及2001订单服务,就能使用上面接口进行测试了,进行正常测试发现业务功能正常,也可以进行异常测试,比如我们使用远程调用,调用接口有一定响应时间,超时则失败,可以手动关掉库存或账户服务出现异常之后发现其余服务已经本地提交的能正常回滚,至此完成Cloud整合Seata分布式事务测试。