Spring 数据库读写分离

读写分离常见有两种方式

  1 第一种方式比较常用就是定义2个数据库连接,一个是Master,另一个是Slave。更新数据时我们取Master,查询数据时取Slave。太过简单不做介绍。

  2 第二种方式是数据源动态切换,将数据源动态织入到程序中,达到动态选择读取主库还是从库的目的。主要使用的技术是:annotation,Spring AOP,反射。下面介绍这种方式

首先创建DatabaseConfiguration
package com.testdatasource.third.configuration.datasource;

import com.alibaba.druid.pool.DruidDataSource;
import com.baomidou.mybatisplus.MybatisConfiguration;
import com.baomidou.mybatisplus.entity.GlobalConfiguration;
import com.baomidou.mybatisplus.mapper.LogicSqlInjector;
import com.baomidou.mybatisplus.spring.MybatisSqlSessionFactoryBean;
import com.testdatasource.common.enums.DatasourceType;
import com.testdatasource.third.configuration.properties.DruidProperties;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.type.JdbcType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource; import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map; @Configuration
@EnableAutoConfiguration
public class DatabaseConfiguration {

    // Declares the read pool, the write pool, the routing data source that
    // chooses between them, and the MyBatis-Plus SqlSessionFactory built on
    // top of that routing data source.

    // NOTE(review): injected but not referenced anywhere in this class.
    @Autowired
    private ApplicationContext appContext;

    @Autowired
    private DruidProperties druidProperties;

    /**
     * Read-side connection pool, configured through {@code DruidProperties.configR}.
     *
     * @return a new Druid pool for the read side
     */
    @Bean(name = "readDatasource")
    @Primary
    @ConfigurationProperties(prefix = "read.datasource")
    public DataSource readDatasource() {
        DruidDataSource pool = new DruidDataSource();
        druidProperties.configR(pool);
        return pool;
    }

    /**
     * Write-side connection pool, configured through {@code DruidProperties.configW}.
     *
     * @return a new Druid pool for the write side
     */
    @Bean(name = "writeDatasource")
    @ConfigurationProperties(prefix = "write.datasource")
    public DataSource writeDatasource() {
        DruidDataSource pool = new DruidDataSource();
        druidProperties.configW(pool);
        return pool;
    }

    /**
     * Routing data source holding both pools, keyed by the {@code DatasourceType}
     * names. The read pool is the default target. The transaction manager
     * configuration also looks this bean up by its name.
     *
     * @param read  the read-side pool
     * @param write the write-side pool
     * @return the routing data source
     */
    @Bean
    public AbstractRoutingDataSource roundRobinDataSouceProxy(
            @Qualifier("readDatasource") DataSource read,
            @Qualifier("writeDatasource") DataSource write) {
        Map<Object, Object> poolsByKey = new HashMap<>();
        poolsByKey.put(DatasourceType.read.getName(), read);
        poolsByKey.put(DatasourceType.write.getName(), write);

        DynamicDataSource router = new DynamicDataSource();
        router.setDefaultTargetDataSource(read);
        router.setTargetDataSources(poolsByKey);
        return router;
    }

    /**
     * SqlSessionFactory wired to the routing data source (a {@code DynamicDataSource})
     * rather than to either pool directly.
     *
     * @param read  the read-side pool
     * @param write the write-side pool
     * @return the session factory produced by the MyBatis-Plus factory bean
     * @throws Exception if the mapper resources cannot be resolved or the factory cannot be built
     */
    @Bean
    public SqlSessionFactory sqlSessionFactory(
            @Qualifier("readDatasource") DataSource read,
            @Qualifier("writeDatasource") DataSource write) throws Exception {
        MybatisConfiguration mybatisSettings = new MybatisConfiguration();
        mybatisSettings.setJdbcTypeForNull(JdbcType.NULL);
        mybatisSettings.setMapUnderscoreToCamelCase(true);
        mybatisSettings.setCacheEnabled(false);

        MybatisSqlSessionFactoryBean factoryBean = new MybatisSqlSessionFactoryBean();
        factoryBean.setDataSource(this.roundRobinDataSouceProxy(read, write));
        PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        factoryBean.setMapperLocations(resolver.getResources("classpath:/mapper/*Mapper.xml"));
        factoryBean.setConfiguration(mybatisSettings);
        factoryBean.setGlobalConfig(globalConfiguration());
        return factoryBean.getObject();
    }

    /**
     * MyBatis-Plus global settings: logic-delete injector and values, id type,
     * underscore column naming and refresh flag.
     *
     * @return the global configuration
     */
    @Bean
    public GlobalConfiguration globalConfiguration() {
        GlobalConfiguration global = new GlobalConfiguration(new LogicSqlInjector());
        global.setLogicDeleteValue("-1");
        global.setLogicNotDeleteValue("1");
        global.setIdType(0);
        global.setDbColumnUnderline(true);
        global.setRefresh(true);
        return global;
    }
}

创建 DataSourceContextHolder

package com.testdatasource.third.configuration.datasource;

import com.testdatasource.common.enums.DatasourceType;

/**
 * Holds the name of the data source selected for the current thread.
 */
public class DataSourceContextHolder {

    /** Name of the default data source (the read side). */
    public static final String DEFAULT_DS = DatasourceType.read.getName();

    /** Per-thread slot for the selected data-source name. */
    private static final ThreadLocal<String> CURRENT_NAME = new ThreadLocal<>();

    /**
     * Selects a data source for the current thread.
     *
     * @param dbType name of the data source to use
     */
    public static void setDB(String dbType) {
        CURRENT_NAME.set(dbType);
    }

    /**
     * Returns the data-source name selected for the current thread.
     *
     * @return the stored name, or {@code null} when nothing has been set
     */
    public static String getDB() {
        String selected = CURRENT_NAME.get();
        return selected;
    }

    /** Removes the current thread's selection. */
    public static void clearDB() {
        CURRENT_NAME.remove();
    }
}

创建 DynamicDataSource

package com.testdatasource.third.configuration.datasource;

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

/**
 * Routing data source whose lookup key is the name that
 * {@code DataSourceContextHolder} holds for the current thread.
 */
public class DynamicDataSource extends AbstractRoutingDataSource {

    /**
     * Supplies the lookup key for the current call.
     *
     * @return the value of {@code DataSourceContextHolder.getDB()}
     */
    @Override
    protected Object determineCurrentLookupKey() {
        final String selectedName = DataSourceContextHolder.getDB();
        return selectedName;
    }
}

创建 MybatisPlusConfig

package com.testdatasource.third.configuration.datasource;

import com.baomidou.mybatisplus.plugins.PaginationInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; /**
* ClassName:MybatisPlusConfig
* @Description : mybatis分页插件
* @version
*/
@Configuration
public class MybatisPlusConfig {

    /**
     * Registers the MyBatis-Plus pagination interceptor with local-page mode
     * switched on.
     *
     * @return the configured interceptor
     */
    @Bean
    public PaginationInterceptor paginationInterceptor() {
        PaginationInterceptor interceptor = new PaginationInterceptor();
        // Per the original author's note, this turns on PageHelper support.
        interceptor.setLocalPage(true);
        return interceptor;
    }
}

创建 MyDataSourceTransactionManagerAutoConfiguration

package com.testdatasource.third.configuration.datasource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement; import javax.sql.DataSource; /**
*
* 多数据源事务
*/
@Configuration
@EnableTransactionManagement
public class MyDataSourceTransactionManagerAutoConfiguration extends DataSourceTransactionManagerAutoConfiguration {

    @Autowired
    private ApplicationContext appContext;

    /**
     * Custom transaction manager built on the bean named
     * {@code "roundRobinDataSouceProxy"}.
     *
     * <p>Original author's note: MyBatis joins Spring transaction management
     * without extra configuration as long as the session factory bean and the
     * {@code DataSourceTransactionManager} reference the same data source;
     * otherwise transaction management does not take effect.
     *
     * @return the transaction manager registered as {@code "transactionManager"}
     */
    @Bean(name = "transactionManager")
    public DataSourceTransactionManager transactionManagers() {
        Object routingBean = appContext.getBean("roundRobinDataSouceProxy");
        DataSource routingDataSource = (DataSource) routingBean;
        return new DataSourceTransactionManager(routingDataSource);
    }
}

数据源枚举类

package com.testdatasource.common.enums;

/**
 * Identifies which side of the read/write split a data source serves.
 *
 * <p>Each constant carries a string label, returned by {@link #getName()}.
 */
public enum DatasourceType {
    write("write"),
    read("read");

    /** Label for this constant; see {@link #getName()}. */
    private String name;

    /**
     * @param name label to associate with the constant
     */
    DatasourceType(String name) {
        // Store the constructor argument. The previous code called the
        // built-in name() here, silently ignoring the argument; that only
        // worked because every label happened to equal its constant name.
        this.name = name;
    }

    /**
     * @return the label of this constant
     */
    public String getName() {
        return this.name;
    }

    /**
     * Replaces the label of this constant.
     *
     * <p>NOTE(review): enum constants are shared JVM-wide, so this changes the
     * label for every user of the constant. Kept only for backward
     * compatibility; confirm whether any caller needs it.
     *
     * @param name the new label
     */
    public void setName(String name) {
        this.name = name;
    }
}

数据源配置类

import com.alibaba.druid.pool.DruidDataSource;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component; import java.sql.SQLException; /**
* ClassName:DruidProperties
*
* @Description :
* druid连接池配置文件属性,说明:这个类中包含了许多默认配置,若这些配置符合您的情况,您可以不用管,若不符合,建议不要修改本类,建议直接在"application.yml"中配置即可
* </p>
*/
@Component
@ConfigurationProperties(prefix = "spring.datasource")
public class DruidProperties {

    // Connection settings for the read side.
    @Value("${spring.datasource.read_url}")
    private String rurl;
    @Value("${spring.datasource.read_username}")
    private String rusername;
    @Value("${spring.datasource.read_password}")
    private String rpassword;

    // Connection settings for the write side.
    @Value("${spring.datasource.write_url}")
    private String wurl;
    @Value("${spring.datasource.write_username}")
    private String wusername;
    @Value("${spring.datasource.write_password}")
    private String wpassword;

    // Pool settings applied to both sides.
    private String driverClassName = "com.mysql.jdbc.Driver";
    private Integer initialSize = 2;
    private Integer minIdle = 1;
    private Integer maxActive = 20;
    private Integer maxWait = 60000;
    private Integer timeBetweenEvictionRunsMillis = 60000;
    private Integer minEvictableIdleTimeMillis = 300000;
    private String validationQuery = "SELECT 1 FROM DUAL";
    private Boolean testWhileIdle = true;
    private Boolean testOnBorrow = false;
    private Boolean testOnReturn = false;
    private Boolean poolPreparedStatements = true;
    private Integer maxPoolPreparedStatementPerConnectionSize = 20;
    private String filters = "stat";

    /**
     * Configures {@code dataSource} as the read-side pool: read URL, user name
     * and password, followed by the shared pool settings.
     *
     * @param dataSource the pool to configure
     */
    public void configR(DruidDataSource dataSource) {
        dataSource.setUrl(rurl);
        dataSource.setUsername(rusername);
        // The configured password is passed through AESUtils.AESDecode before use.
        dataSource.setPassword(AESUtils.AESDecode(null, rpassword));
        applySharedSettings(dataSource);
    }

    /**
     * Configures {@code dataSource} as the write-side pool: write URL, user
     * name and password, followed by the shared pool settings.
     *
     * @param dataSource the pool to configure
     */
    public void configW(DruidDataSource dataSource) {
        dataSource.setUrl(wurl);
        dataSource.setUsername(wusername);
        // The configured password is passed through AESUtils.AESDecode before use.
        dataSource.setPassword(AESUtils.AESDecode(null, wpassword));
        applySharedSettings(dataSource);
    }

    /**
     * Applies the driver and pool settings that are the same for both sides.
     * Extracted so the read and write configuration cannot drift apart.
     */
    private void applySharedSettings(DruidDataSource dataSource) {
        dataSource.setDriverClassName(driverClassName);
        dataSource.setInitialSize(initialSize); // initial number of connections
        dataSource.setMinIdle(minIdle); // minimum idle connections
        dataSource.setMaxActive(maxActive); // maximum number of connections
        dataSource.setMaxWait(maxWait); // maximum wait time
        // Interval, in milliseconds, between checks for idle connections to close.
        dataSource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
        // Minimum time, in milliseconds, a connection stays in the pool.
        dataSource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        dataSource.setValidationQuery(validationQuery);
        dataSource.setTestWhileIdle(testWhileIdle);
        dataSource.setTestOnBorrow(testOnBorrow);
        dataSource.setTestOnReturn(testOnReturn);
        // Enable the prepared-statement cache and set its size per connection.
        dataSource.setPoolPreparedStatements(poolPreparedStatements);
        dataSource.setMaxPoolPreparedStatementPerConnectionSize(maxPoolPreparedStatementPerConnectionSize);
        try {
            dataSource.setFilters(filters);
        } catch (SQLException e) {
            // Best effort, as before: a filter failure is reported but does
            // not abort configuration of the pool.
            e.printStackTrace();
        }
    }

    public String getRurl() {
        return rurl;
    }

    public void setRurl(String rurl) {
        this.rurl = rurl;
    }

    public String getRusername() {
        return rusername;
    }

    public void setRusername(String rusername) {
        this.rusername = rusername;
    }

    public String getRpassword() {
        return rpassword;
    }

    public void setRpassword(String rpassword) {
        this.rpassword = rpassword;
    }

    public String getWurl() {
        return wurl;
    }

    public void setWurl(String wurl) {
        this.wurl = wurl;
    }

    public String getWusername() {
        return wusername;
    }

    public void setWusername(String wusername) {
        this.wusername = wusername;
    }

    public String getWpassword() {
        return wpassword;
    }

    public void setWpassword(String wpassword) {
        this.wpassword = wpassword;
    }

    public String getDriverClassName() {
        return driverClassName;
    }

    public void setDriverClassName(String driverClassName) {
        this.driverClassName = driverClassName;
    }

    public Integer getInitialSize() {
        return initialSize;
    }

    public void setInitialSize(Integer initialSize) {
        this.initialSize = initialSize;
    }

    public Integer getMinIdle() {
        return minIdle;
    }

    public void setMinIdle(Integer minIdle) {
        this.minIdle = minIdle;
    }

    public Integer getMaxActive() {
        return maxActive;
    }

    public void setMaxActive(Integer maxActive) {
        this.maxActive = maxActive;
    }

    public Integer getMaxWait() {
        return maxWait;
    }

    public void setMaxWait(Integer maxWait) {
        this.maxWait = maxWait;
    }

    public Integer getTimeBetweenEvictionRunsMillis() {
        return timeBetweenEvictionRunsMillis;
    }

    public void setTimeBetweenEvictionRunsMillis(Integer timeBetweenEvictionRunsMillis) {
        this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
    }

    public Integer getMinEvictableIdleTimeMillis() {
        return minEvictableIdleTimeMillis;
    }

    public void setMinEvictableIdleTimeMillis(Integer minEvictableIdleTimeMillis) {
        this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;
    }

    public String getValidationQuery() {
        return validationQuery;
    }

    public void setValidationQuery(String validationQuery) {
        this.validationQuery = validationQuery;
    }

    public Boolean getTestWhileIdle() {
        return testWhileIdle;
    }

    public void setTestWhileIdle(Boolean testWhileIdle) {
        this.testWhileIdle = testWhileIdle;
    }

    public Boolean getTestOnBorrow() {
        return testOnBorrow;
    }

    public void setTestOnBorrow(Boolean testOnBorrow) {
        this.testOnBorrow = testOnBorrow;
    }

    public Boolean getTestOnReturn() {
        return testOnReturn;
    }

    public void setTestOnReturn(Boolean testOnReturn) {
        this.testOnReturn = testOnReturn;
    }

    public Boolean getPoolPreparedStatements() {
        return poolPreparedStatements;
    }

    public void setPoolPreparedStatements(Boolean poolPreparedStatements) {
        this.poolPreparedStatements = poolPreparedStatements;
    }

    public Integer getMaxPoolPreparedStatementPerConnectionSize() {
        return maxPoolPreparedStatementPerConnectionSize;
    }

    public void setMaxPoolPreparedStatementPerConnectionSize(Integer maxPoolPreparedStatementPerConnectionSize) {
        this.maxPoolPreparedStatementPerConnectionSize = maxPoolPreparedStatementPerConnectionSize;
    }

    public String getFilters() {
        return filters;
    }

    public void setFilters(String filters) {
        this.filters = filters;
    }
}

annotation类,用于强制指定方法走写库或者读库

package com.testdatasource.common.annotation;

import com.testdatasource.common.enums.DatasourceType;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy; /**
* ClassName:DS
* @Description :自定义切换数据源注解
* @version
*/
// Target is written fully qualified because the import list of this snippet
// brings in ElementType, Retention and RetentionPolicy but not
// java.lang.annotation.Target; the bare @Target did not compile.
@java.lang.annotation.Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DS {
    /**
     * Data source the annotated method should use.
     *
     * @return the requested data source type; {@code DatasourceType.write} when omitted
     */
    DatasourceType value() default DatasourceType.write;
}

切面类DynamicDataSourceAspect 用于辅助自定义切面注解切换数据源

package com.testdatasource.third.aspect;

import com.testdatasource.common.annotation.DS;
import com.testdatasource.third.configuration.datasource.DataSourceContextHolder;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component; import java.lang.reflect.Method; /**
* ClassName:DynamicDataSourceAspect
*
* @Description : 自定义切面注解切换数据源
* @version
*/
@Aspect
@Component
@Order(value = 2)
public class DynamicDataSourceAspect {

    /**
     * Per-thread stack of the data-source names that were selected before each
     * intercepted call, so the selection can be restored when the call ends.
     * Entries may be {@code null} (nothing was selected), which is why the
     * stack is a LinkedList: ArrayDeque rejects null elements.
     */
    private static final ThreadLocal<java.util.Deque<String>> PREVIOUS_SELECTION = new ThreadLocal<>();

    /** Matches methods annotated with {@code @DS}. */
    @Pointcut("@annotation(com.testdatasource.common.annotation.DS)")
    public void ds() {
    }

    /**
     * Selects the data source named by the method's {@code @DS} annotation
     * before the method runs, remembering whatever was selected before.
     * Falls back to {@code DataSourceContextHolder.DEFAULT_DS} when the method
     * cannot be resolved on the target class or carries no annotation.
     *
     * @param point the intercepted join point
     */
    @Before("ds()")
    public void beforeSwitchDS(JoinPoint point) {
        // Resolve the method on the target object's class by name and parameter types.
        Class<?> targetClass = point.getTarget().getClass();
        String methodName = point.getSignature().getName();
        Class<?>[] parameterTypes = ((MethodSignature) point.getSignature()).getParameterTypes();

        String dataSource = DataSourceContextHolder.DEFAULT_DS;
        try {
            Method method = targetClass.getMethod(methodName, parameterTypes);
            DS annotation = method.getAnnotation(DS.class);
            if (annotation != null) {
                // Use the data-source name carried by the annotation.
                dataSource = annotation.value().getName();
            }
        } catch (NoSuchMethodException | SecurityException e) {
            // Best effort, as before: report the failure and keep the default.
            e.printStackTrace();
        }

        java.util.Deque<String> stack = PREVIOUS_SELECTION.get();
        if (stack == null) {
            stack = new java.util.LinkedList<>();
            PREVIOUS_SELECTION.set(stack);
        }
        stack.push(DataSourceContextHolder.getDB());

        // Switch the data source for the current thread.
        DataSourceContextHolder.setDB(dataSource);
    }

    /**
     * Restores the selection that was active before the intercepted method.
     * Clearing unconditionally, as the previous code did, wiped the outer
     * method's selection whenever one {@code @DS} method called another.
     *
     * @param point the intercepted join point
     */
    @After("ds()")
    public void afterSwitchDS(JoinPoint point) {
        java.util.Deque<String> stack = PREVIOUS_SELECTION.get();
        String previous = null;
        if (stack != null) {
            if (!stack.isEmpty()) {
                previous = stack.pop();
            }
            if (stack.isEmpty()) {
                // Outermost call finished: leave nothing behind on the thread.
                PREVIOUS_SELECTION.remove();
            }
        }

        if (previous == null) {
            DataSourceContextHolder.clearDB();
        } else {
            DataSourceContextHolder.setDB(previous);
        }
    }
}

yml配置

spring:
  datasource:
    type: com.alibaba.druid.pool.xa.DruidXADataSource
    read_url: jdbc:mysql://localhost:3306/test?autoReconnect=true&useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&pinGlobalTxToPhysicalConnection=true
    read_username: root
    # 注意:DruidProperties 中密码会先经过 AESUtils.AESDecode 处理再设置到连接池,请按其要求填写(通常应为密文)
    read_password: root
    write_url: jdbc:mysql://localhost:3306/test?autoReconnect=true&useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&pinGlobalTxToPhysicalConnection=true
    write_username: root
    write_password: root

好了 这样配置就没问题了。

上一篇:Java流程控制01-用户交互Scanner


下一篇:kafka 创建消费者报错 consumer zookeeper is not a recognized option