前言
sharding-jdbc主要功能是分片,我们实现不同分片算法来进行分库分表,另外一个扩展点就是主键生成, 本文主要记录下sharding-jdbc执行流程和分片路由具体实现以及主键生成,在工作中方便排查问题。
主要记录三个问题:
1.sharding-jdbc执行流程
2.自定义分片算法是如何被sharding-jdbc框架调用的
3.主键是在何处何时生成
4.扩展机制spi
1. sharding-jdbc处理流程
操作数据库套路是:数据源获取数据库连接,数据库连接生成Statement,然后执行Statement,获取sql执行结果。
那么对于sharding来说
入口获取数据库连接就是ShardingDataSource.getConnection()
接着生成PreparedStatement:ShardingConnection.prepareStatement(String)
,生成ShardingPreparedStatement
对于增删改查就是ShardingPreparedStatement的execute()、executeUpdate()、executeQuery()、、
以execute()
为例:
//org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement.execute()
@Override
public boolean execute() throws SQLException {
try {
clearPrevious();//本地缓存清空
shard();//路由,路由结果保存到this.routeResult。核心功能
initPreparedStatementExecutor();//初始化执行器
return preparedStatementExecutor.execute();//真实sql执行jdbc操作
} finally {
clearBatch();
}
}
分析核心路由功能shard()
//org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement.shard()
private void shard() {
routeResult = shardingEngine.shard(sql, getParameters());
}
//org.apache.shardingsphere.core.BaseShardingEngine.shard(String, List<Object>)
public SQLRouteResult shard(final String sql, final List<Object> parameters) {
List<Object> clonedParameters = cloneParameters(parameters);
SQLRouteResult result = route(sql, clonedParameters);//路由核心实现
result.getRouteUnits().addAll(HintManager.isDatabaseShardingOnly() ? convert(sql, clonedParameters, result) : rewriteAndConvert(sql, clonedParameters, result));//非hint,重写sql
if (shardingProperties.getValue(ShardingPropertiesConstant.SQL_SHOW)) {
boolean showSimple = shardingProperties.getValue(ShardingPropertiesConstant.SQL_SIMPLE);
SQLLogger.logSQL(sql, showSimple, result.getSqlStatement(), result.getRouteUnits());//打印真实sql
}
return result;
}
//org.apache.shardingsphere.core.PreparedQueryShardingEngine.route(String, List<Object>)
@Override
protected SQLRouteResult route(final String sql, final List<Object> parameters) {
return routingEngine.route(parameters);
}
//org.apache.shardingsphere.core.route.PreparedStatementRoutingEngine.route(List<Object>)
public SQLRouteResult route(final List<Object> parameters) {
if (null == sqlStatement) {
sqlStatement = shardingRouter.parse(logicSQL, true);//代码@1
}
return masterSlaveRouter.route(shardingRouter.route(logicSQL, parameters, sqlStatement));//代码@2
}
代码@1
//org.apache.shardingsphere.core.route.router.sharding.ParsingSQLRouter.parse(String, boolean)
//解析sql
@Override
public SQLStatement parse(final String logicSQL, final boolean useCache) {
parsingHook.start(logicSQL);//sharding-jdbc为开发预留的钩子,我们可以实现钩子接口在解析sql前后做一些扩展
try {
SQLStatement result = new SQLParsingEngine(databaseType, logicSQL, shardingRule, shardingMetaData.getTable(), parsingResultCache).parse(useCache);//代码@1.1,解析sql的核心
parsingHook.finishSuccess(result, shardingMetaData.getTable());
return result;
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
parsingHook.finishFailure(ex);
throw ex;
}
}
代码@1处解析sql比较复杂,只需要知道是解析sql,解析结果SQLStatement,这个是也不是我们的关注点,知道有个hook接口可以在sql解析前后进行扩展即可,比如通过该Hook可以用作计算sql执行时长。
知道增删改查对对应的SQLStatement如下:
对于insert来说SQLStatement是InsertStatement。DML
对于update delete语句来说SQLStatement是DMLStatement。DML
对于select语句来说SQLStatement是SelectStatement。 DQL
SQLStatement是个逻辑sql。
类关系图如下:
代码@2
masterSlaveRouter是读写分离路由,不使用的情况下,可以忽略。
分片的路由核心实现在shardingRouter.route(logicSQL, parameters, sqlStatement)
,下面分析这个
//org.apache.shardingsphere.core.route.router.sharding.ParsingSQLRouter.route(String, List<Object>, SQLStatement)
public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
Optional<GeneratedKey> generatedKey = sqlStatement instanceof InsertStatement
? GeneratedKey.getGenerateKey(shardingRule, parameters, (InsertStatement) sqlStatement) : Optional.<GeneratedKey>absent();//代码@2.1
SQLRouteResult result = new SQLRouteResult(sqlStatement, generatedKey.orNull());//代码@2.2
OptimizeResult optimizeResult = OptimizeEngineFactory.newInstance(shardingRule, sqlStatement, parameters, generatedKey.orNull()).optimize();//代码@2.3
if (generatedKey.isPresent()) {
setGeneratedKeys(result, generatedKey.get());//代码@2.4
}
boolean needMerge = false;
if (sqlStatement instanceof SelectStatement) {
needMerge = isNeedMergeShardingValues((SelectStatement) sqlStatement);//代码@2.5
}
if (needMerge) {
checkSubqueryShardingValues(sqlStatement, optimizeResult.getShardingConditions());
mergeShardingValues(optimizeResult.getShardingConditions());//代码@2.6
}
RoutingResult routingResult = RoutingEngineFactory.newInstance(shardingRule, shardingMetaData.getDataSource(), sqlStatement, optimizeResult).route();//代码@2.7
if (sqlStatement instanceof SelectStatement && null != ((SelectStatement) sqlStatement).getLimit() && !routingResult.isSingleRouting()) {
result.setLimit(getProcessedLimit(parameters, (SelectStatement) sqlStatement));//代码@2.8
}
if (needMerge) {
Preconditions.checkState(1 == routingResult.getTableUnits().getTableUnits().size(), "Must have one sharding with subquery.");//代码@2.9
}
result.setRoutingResult(routingResult);//代码@2.10
result.setOptimizeResult(optimizeResult);
return result;//代码@2.11
}
从上面路由核心代码可以看出,ShardingRouter是解析和路由的核心接口,其实现类为ParsingSQLRouter,它使用四个引擎对sql进行解析、解析和重写,这四个引擎为:
-
SQLParsingEngine
解析sql,返回SQLStatement作为解析的结果。 -
OptimizeEngine
对SQLStatement进行优化,返回ShardingConditions对象。 -
RoutingEngine
根据库表分片配置以及ShardingConditions找到目标库表,返回RoutingResult对象。 -
SQLRewriteEngine
根据路由结果重写sql。
代码@2.1处:如果是insert,则生成分布式主键,GeneratedKey封装了分片键和分布式主键值。比如,insert语句,这里使用Snowflake算法生成分布式主键。
代码@2.2处:创建sql路由结果对象SQLRouteResult,封装SQLStatement和分布式主键对象GeneratedKey。此时SQLRouteResult只是包含了sql语句和主键值,并没有生成实际待执行sql。
代码@2.3处:使用OptimizeEngine对SQLStatement进行优化,返回OptimizeResult对象。该对象在重写sql时候用到,作用就是对SQLStatement进行优化,返回ShardingConditions对象。
代码@2.4处:保存生成的分布式主键。
代码@2.5处:select语句是否需要合并结果
代码@2.6处:需要合并查询结果,则合并
代码@2.7处:使用不同的RoutingEngine生成路由结果RoutingResult。比如标准分片是StandardRoutingEngine、复合分片是ComplexRoutingEngine、广播是DatabaseBroadcastRoutingEngine、不分片是DefaultDatabaseRoutingEngine等。 这里是核心代码,总体功能就是路由,找到实际的数据源和真实表
代码@2.8处:select语句设置limit。既然分库分表了,通常也就不使用分页了。
代码@2.9处:预检,需要合并结果,需要分片键在查询结果上。
代码@2.10处:把路由结果RoutingResult、优化结果OptimizeResult保存到SQLRouteResult。
代码@2.11处:返回sql路由结果对象SQLRouteResult,该对象封装了路由结果,知道要到哪个真实库去执行哪个真实表。
核心代码@2.7处分析
RoutingEngineFactory.newInstance()根据不同的分片规则采用对应的RoutingEngine生成路由结果RoutingResult,以标准分片路由为例
//org.apache.shardingsphere.core.route.type.standard.StandardRoutingEngine.route()
@Override
public RoutingResult route() {
return generateRoutingResult(getDataNodes(shardingRule.getTableRule(logicTableName)));//1.getTableRule根据逻辑表获取TableRule,2.getDataNodes根据TableRule和分片算法获取真实的数据源和真实表Collection<DataNode>
}
//shardingRule.getTableRule(logicTableName)根据逻辑表从分片规则ShardingRule获取表规则TableRule,TableRule信息封装的较多,有逻辑表、全部数据源等
//org.apache.shardingsphere.core.route.type.standard.StandardRoutingEngine.getDataNodes(TableRule)
//获取真实节点,真实数据源和真实表。DataNode封装了真实数据源和真实表
private Collection<DataNode> getDataNodes(final TableRule tableRule) {
if (shardingRule.isRoutingByHint(tableRule)) {//hint路由
return routeByHint(tableRule);
}
if (isRoutingByShardingConditions(tableRule)) {//条件路由,即非hint路由
return routeByShardingConditions(tableRule);
}
return routeByMixedConditions(tableRule);
}
private Collection<DataNode> routeByShardingConditions(final TableRule tableRule) {
return optimizeResult.getShardingConditions().getShardingConditions().isEmpty() ? route(tableRule, Collections.<RouteValue>emptyList(), Collections.<RouteValue>emptyList())
: routeByShardingConditionsWithCondition(tableRule);
}
private Collection<DataNode> route(final TableRule tableRule, final List<RouteValue> databaseShardingValues, final List<RouteValue> tableShardingValues) {
Collection<String> routedDataSources = routeDataSources(tableRule, databaseShardingValues);//获取真实数据源
Collection<DataNode> result = new LinkedList<>();
for (String each : routedDataSources) {
result.addAll(routeTables(tableRule, each, tableShardingValues));//获取真实表
}
return result;
}
//根据分片键获取数据源
private Collection<String> routeDataSources(final TableRule tableRule, final List<RouteValue> databaseShardingValues) {
Collection<String> availableTargetDatabases = tableRule.getActualDatasourceNames();
if (databaseShardingValues.isEmpty()) {
return availableTargetDatabases;
}
Collection<String> result = new LinkedHashSet<>(shardingRule.getDatabaseShardingStrategy(tableRule).doSharding(availableTargetDatabases, databaseShardingValues));//这里通过分片策略调用自定义的分片算法
Preconditions.checkState(!result.isEmpty(), "no database route info");
return result;
}
//根据分片键获取DataNode,即数据源+真实表
private Collection<DataNode> routeTables(final TableRule tableRule, final String routedDataSource, final List<RouteValue> tableShardingValues) {
Collection<String> availableTargetTables = tableRule.getActualTableNames(routedDataSource);
Collection<String> routedTables = new LinkedHashSet<>(tableShardingValues.isEmpty() ? availableTargetTables
: shardingRule.getTableShardingStrategy(tableRule).doSharding(availableTargetTables, tableShardingValues));//这里通过分片策略调用自定义的分片算法
Preconditions.checkState(!routedTables.isEmpty(), "no table route info");
Collection<DataNode> result = new LinkedList<>();
for (String each : routedTables) {
result.add(new DataNode(routedDataSource, each));
}
return result;
}
可以看到route()方法是入口,此方法首先通过ShardingRule获取到逻辑表所对应的TableRule对象,在sharding-jdbc启动阶段,TableRule保存了逻辑表对应的实际的库表关系集合,接着根据库和表的ShardingStrategy的类型走了三个不同的方法:routeByHint()、routeByShardingConditions()、routeByMixedConditions(),不管走哪个方法最终都会执行到含有三个参数的route()方法,此方法先调用routeDataSources()方法路由数据源(库),接着调用routeTables()方法路由表,路由库表的方法也很简单:
从TableRule中获取可用的库表集合。
从TableRule中获取库表的分片策略ShardingStrategy对象。
执行ShardingStrategy持有的分片算法ShardingAlgorithm的doSharding()方法返回路由到的库表。
路由的结果以RoutingResult的形式返回,接着调用SQLRewriteEngine重写sql,因为此时sql中的表还只是逻辑表名,并不是具体的哪个表,接着生成SQLUnit,并最终以SQLRouteResult形式返回路由结果。
重点是个SQLRouteResult,关系较复杂,类图封装关系如下
使用xmind画出的处理流程
思维导图地址:https://gitee.com/yulewo123/mdpicture/blob/master/document/sharding-jdbc%E6%89%A7%E8%A1%8C%E6%B5%81%E7%A8%8B.xmind
总结:
sharding-jdbc的处理流程核心就是路由,即根据分片键以及算法从从TableRule.actualDataNodes获取真实库表对象DataNode。那么TableRule是怎么来的呢?是ShardingRule根据逻辑表获取,而ShardingRule是核心,在sharding-jdbc启动时候就创建完成。
路由获取后,就可以重写sql,然后通过jdbc执行sql到真实的数据源执行真实sql。
关键debug点记录如下,工作中遇到问题,方便快速回顾debug定为问题
org.apache.shardingsphere.core.route.router.sharding.ParsingSQLRouter.route(String, List