前言
Sharding-JDBC是一个开源的分布式数据库中间件,它无需额外部署和依赖,完全兼容JDBC和各种ORM框架。Sharding-JDBC作为面向开发的微服务云原生基础类库,完整地实现了分库分表、读写分离和分布式主键功能,并初步实现了柔性事务。
以2.0.3为例maven包依赖如下
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven build descriptor for the sharding-jdbc demo project; the artifact is packaged as a jar. -->
<modelVersion>4.0.0</modelVersion>
<!-- Project coordinates. -->
<groupId>com.dongpeng</groupId>
<artifactId>sharding-jdbc</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<name>sharding-jdbc</name>
<url>http://maven.apache.org</url>
<properties>
<!-- Source files are read as UTF-8. -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Sharding-JDBC core library, version 2.0.3 (the version the article walks through). -->
<dependency>
<groupId>io.shardingjdbc</groupId>
<artifactId>sharding-jdbc-core</artifactId>
<version>2.0.3</version>
</dependency>
<!-- Google Guava. -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>23.0</version>
</dependency>
<!-- c3p0; the demo class builds its data sources with ComboPooledDataSource from this artifact. -->
<dependency>
<groupId>com.mchange</groupId>
<artifactId>c3p0</artifactId>
<version>0.9.5.2</version>
</dependency>
<!-- MySQL JDBC driver; the demo class sets com.mysql.jdbc.Driver as the driver class. -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.46</version>
</dependency>
<!-- Logging: log4j 1.2.17 together with the SLF4J API and the slf4j-log4j12 artifact. -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
</dependencies>
<build>
<!-- Both src/main/java and src/main/resources are registered as resource directories. -->
<resources>
<resource>
<directory>src/main/java</directory>
</resource>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
<plugins>
<!-- Compile with source and target level 1.8 and UTF-8 encoding. -->
<!-- NOTE(review): no version element is declared for this plugin; consider pinning one. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
简单的分库demo介绍如下
package com.dongpeng.sharding.jdbc;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import javax.sql.DataSource;
import com.mchange.v2.c3p0.ComboPooledDataSource;
import io.shardingjdbc.core.api.ShardingDataSourceFactory;
import io.shardingjdbc.core.api.config.ShardingRuleConfiguration;
import io.shardingjdbc.core.api.config.TableRuleConfiguration;
import io.shardingjdbc.core.api.config.strategy.InlineShardingStrategyConfiguration;
/**
 * Minimal database-sharding demo for Sharding-JDBC.
 *
 * <p>Registers two MySQL data sources under the names {@code db_0} and {@code db_1},
 * configures the logic table {@code t_order} to be routed between them by
 * {@code user_id % 2}, and runs a single query through the sharding data source.
 *
 * @author Admin
 */
public class ShardingDataDemo {

    /**
     * Builds the sharding data source and prints the {@code user_id} column of every
     * {@code t_order} row whose {@code user_id} equals 1.
     *
     * @param args unused
     * @throws Exception if the driver class cannot be set or any JDBC call fails
     */
    public static void main(String[] args) throws Exception {
        Map<String, DataSource> dataSourceMap = new HashMap<>();

        ComboPooledDataSource dataSource1 = new ComboPooledDataSource();
        dataSource1.setDriverClass("com.mysql.jdbc.Driver"); // loads the jdbc driver
        dataSource1.setJdbcUrl("jdbc:mysql://localhost:3306/db_0?useSSL=false");
        dataSource1.setUser("root");
        dataSource1.setPassword("root");
        dataSourceMap.put("db_0", dataSource1);

        ComboPooledDataSource dataSource2 = new ComboPooledDataSource();
        dataSource2.setDriverClass("com.mysql.jdbc.Driver"); // loads the jdbc driver
        dataSource2.setJdbcUrl("jdbc:mysql://localhost:3306/db_1?useSSL=false");
        dataSource2.setUser("root");
        dataSource2.setPassword("root");
        // Register the second pool under "db_1". The earlier code registered dataSource1
        // here, so both shard names pointed at the db_0 database.
        dataSourceMap.put("db_1", dataSource2);

        // Database sharding rule for the logic table t_order: the actual nodes are
        // db_0.t_order_0 and db_1.t_order_0, and the database is chosen by user_id % 2.
        TableRuleConfiguration orderTableRuleConfig = new TableRuleConfiguration();
        orderTableRuleConfig.setLogicTable("t_order");
        orderTableRuleConfig.setActualDataNodes("db_${0..1}.t_order_0");
        orderTableRuleConfig.setDatabaseShardingStrategyConfig(
                new InlineShardingStrategyConfiguration("user_id", "db_${user_id % 2}"));

        // Overall sharding rule configuration holding the single table rule above.
        ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
        shardingRuleConfig.getTableRuleConfigs().add(orderTableRuleConfig);

        Properties properties = new Properties();
        properties.put("sql.show", "true");

        DataSource dataSource = ShardingDataSourceFactory.createDataSource(
                dataSourceMap, shardingRuleConfig, new HashMap<String, Object>(), properties);

        // try-with-resources closes the result set, statement and connection in reverse
        // order even when the query throws.
        try (Connection connection = dataSource.getConnection();
             PreparedStatement statement =
                     connection.prepareStatement("select * from t_order where user_id=?")) {
            statement.setInt(1, 1);
            try (ResultSet rs = statement.executeQuery()) {
                while (rs.next()) {
                    System.out.println(rs.getString("user_id"));
                }
            }
        }
    }
}
几个重要的类
ShardingRuleConfiguration
ShardingDataSourceFactory
ShardingDataSource
ShardingConnection
ShardingPreparedStatement
ShardingStatement
源码分析
ShardingRuleConfiguration
分片的规则配置类主要的属性如下
// Name of the data source used when no sharding rule applies (per the article text below).
private String defaultDataSourceName;
// Sharding rules for individual tables; starts as an empty list.
private Collection<TableRuleConfiguration> tableRuleConfigs = new LinkedList<>();
// Binding table groups; starts as an empty list.
// NOTE(review): the expected string format of each group is not shown here; confirm against the docs.
private Collection<String> bindingTableGroups = new LinkedList<>();
// NOTE(review): presumably the database sharding strategy used when a table rule sets none; confirm.
private ShardingStrategyConfiguration defaultDatabaseShardingStrategyConfig;
// NOTE(review): presumably the table sharding strategy used when a table rule sets none; confirm.
private ShardingStrategyConfiguration defaultTableShardingStrategyConfig;
// NOTE(review): presumably the key generator class name used when a table rule sets none; confirm.
private String defaultKeyGeneratorClass;
// Master-slave (read/write splitting) rule configurations; starts as an empty list.
private Collection<MasterSlaveRuleConfiguration> masterSlaveRuleConfigs = new LinkedList<>();
defaultDataSourceName指定没有配置分片规则的表所使用的默认数据源
tableRuleConfigs 配置指定表的分片规则
TableRuleConfiguration的属性如下
// Logic table name, e.g. t_order when the physical tables are t_order_0 and t_order_1.
private String logicTable;
// Data node expression such as db_${0..1}.t_order_0; the article states it is parsed by InlineExpressionParser.
private String actualDataNodes;
// Sharding strategy configuration that selects the database.
private ShardingStrategyConfiguration databaseShardingStrategyConfig;
// Sharding strategy configuration that selects the table.
private ShardingStrategyConfiguration tableShardingStrategyConfig;
// Column that holds the generated distributed primary key.
private String keyGeneratorColumnName;
// Class used to generate ids.
private String keyGeneratorClass;
// Logic index setting (described in the article as the logical sharding index position).
// NOTE(review): exact semantics are not shown here; confirm against the docs.
private String logicIndex;
logicTable配置逻辑表比如数据库中有t_order_0,t_order_1两张表,逻辑表设置为t_order
actualDataNodes配置真实数据节点(格式为 数据源名.表名),使用InlineExpressionParser作为解析器,配置如 db_${0..1}.t_order_0
databaseShardingStrategyConfig配置数据库的分片规则
tableShardingStrategyConfig 表的分片规则
两个分片规则都实现了ShardingStrategyConfiguration接口,用于构建ShardingStrategy分片类,以下是sharding-jdbc提供的分片配置类,具体实现分片规则可以查看源码
keyGeneratorColumnName配置分布式主键
keyGeneratorClass配置id生成类
logicIndex配置逻辑分片索引位置
还有properties,map的方式配置一些信息这个可参考文档,如sql.show在properties中配置等等
ShardingDataSourceFactory
ShardingDataSourceFactory是ShardingDataSource的工厂类,用于创建ShardingDataSource,支持通过yaml文件、自定义配置等多种方式创建,如下
/**
 * Creates a sharding data source from programmatic configuration.
 *
 * @param dataSourceMap data sources keyed by name, passed to {@code shardingRuleConfig.build}
 * @param shardingRuleConfig sharding rule configuration that is built against {@code dataSourceMap}
 * @param configMap config map handed to the {@code ShardingDataSource} constructor
 * @param props properties handed to the {@code ShardingDataSource} constructor
 * @return sharding data source
 * @throws SQLException SQL exception
 */
public static DataSource createDataSource(
        final Map<String, DataSource> dataSourceMap,
        final ShardingRuleConfiguration shardingRuleConfig,
        final Map<String, Object> configMap,
        final Properties props) throws SQLException {
    final DataSource result = new ShardingDataSource(shardingRuleConfig.build(dataSourceMap), configMap, props);
    return result;
}
/**
 * Creates a sharding data source from a yaml file.
 *
 * <p>The file is unmarshalled into a {@code YamlShardingConfiguration}. The sharding rule
 * is obtained from it with an empty data source map, and the config map and properties
 * are read from the same configuration.
 *
 * @param yamlFile yaml file for rule configuration of databases and tables sharding with data sources
 * @return sharding data source
 * @throws SQLException SQL exception
 * @throws IOException IO exception
 */
public static DataSource createDataSource(final File yamlFile) throws SQLException, IOException {
    final YamlShardingConfiguration yamlConfig = unmarshal(yamlFile);
    final DataSource result = new ShardingDataSource(
            yamlConfig.getShardingRule(Collections.<String, DataSource>emptyMap()),
            yamlConfig.getShardingRule().getConfigMap(),
            yamlConfig.getShardingRule().getProps());
    return result;
}
ShardingDataSource
主要用于初始化构建dataSource的一些配置信息封装成ShardingContext提供给ShardingConnection使用
ShardingConnection
继承自AbstractConnectionAdapter,同时实现了Connection接口,封装了PreparedStatement和Statement的创建方法,分别由ShardingPreparedStatement和ShardingStatement类来实现。其中最主要的是获取Connection的方法,如下
/**
 * Returns the connection for the named data source, reusing a cached one when present.
 *
 * <p>The cache is consulted first under {@code dataSourceName}. When the configured data
 * source is a {@code MasterSlaveDataSource}, the concrete data source is chosen from
 * {@code sqlType} and the cache is consulted again under that data source's own name.
 * A newly opened connection is cached and has the recorded method invocations replayed
 * on it before being returned.
 *
 * @param dataSourceName name of the data source in the sharding rule
 * @param sqlType SQL type passed to the master-slave data source to pick the concrete data source
 * @return cached or newly opened connection
 * @throws SQLException SQL exception
 */
public Connection getConnection(final String dataSourceName, final SQLType sqlType) throws SQLException {
    if (getCachedConnections().containsKey(dataSourceName)) {
        return getCachedConnections().get(dataSourceName);
    }
    DataSource target = shardingContext.getShardingRule().getDataSourceMap().get(dataSourceName);
    Preconditions.checkState(null != target, "Missing the rule of %s in DataSourceRule", dataSourceName);
    // A plain data source is cached under the requested name; a master-slave one is
    // cached under the name of the concrete data source it resolves to.
    String cacheKey = dataSourceName;
    if (target instanceof MasterSlaveDataSource) {
        NamedDataSource routed = ((MasterSlaveDataSource) target).getDataSource(sqlType);
        cacheKey = routed.getName();
        if (getCachedConnections().containsKey(cacheKey)) {
            return getCachedConnections().get(cacheKey);
        }
        target = routed.getDataSource();
    }
    Connection result = target.getConnection();
    getCachedConnections().put(cacheKey, result);
    replayMethodsInvocation(result);
    return result;
}
这个方法的作用是:如果拿到的数据源是MasterSlaveDataSource,则需要先进行读写分离的判断,并最终返回用于执行SQL的Connection
ShardingPreparedStatement和ShardingStatement
具体的执行类,都有对应的route方法封装最终都交由SQLRouter来解析sql获取最终的路由信息,最后执行相应的sql