sharding-jdbc处理流程源码分析

目录

前言

sharding-jdbc主要功能是分片,我们实现不同分片算法来进行分库分表,另外一个扩展点就是主键生成, 本文主要记录下sharding-jdbc执行流程和分片路由具体实现以及主键生成,在工作中方便排查问题。

主要记录三个问题:

1.sharding-jdbc执行流程

2.自定义分片算法是如何被sharding-jdbc框架调用的

3.主键是在何处何时生成

4.扩展机制spi

1. sharding-jdbc处理流程

操作数据库套路是:数据源获取数据库连接,数据库连接生成Statement,然后执行Statement,获取sql执行结果。

那么对于sharding来说

入口获取数据库连接就是ShardingDataSource.getConnection()

接着生成PreparedStatement:ShardingConnection.prepareStatement(String),生成ShardingPreparedStatement

对于增删改查就是ShardingPreparedStatement的execute()、executeUpdate()、executeQuery()、、

execute()为例:

//org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement.execute()
@Override
public boolean execute() throws SQLException {
    try {
        clearPrevious();//本地缓存清空
        shard();//路由,路由结果保存到this.routeResult。核心功能
        initPreparedStatementExecutor();//初始化执行器
        return preparedStatementExecutor.execute();//真实sql执行jdbc操作
    } finally {
        clearBatch();
    }
}

分析核心路由功能shard()

//org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement.shard()
private void shard() {
        routeResult = shardingEngine.shard(sql, getParameters());
    }

//org.apache.shardingsphere.core.BaseShardingEngine.shard(String, List<Object>)
public SQLRouteResult shard(final String sql, final List<Object> parameters) {
    List<Object> clonedParameters = cloneParameters(parameters);
    SQLRouteResult result = route(sql, clonedParameters);//路由核心实现
    result.getRouteUnits().addAll(HintManager.isDatabaseShardingOnly() ? convert(sql, clonedParameters, result) : rewriteAndConvert(sql, clonedParameters, result));//非hint,重写sql
    if (shardingProperties.getValue(ShardingPropertiesConstant.SQL_SHOW)) {
        boolean showSimple = shardingProperties.getValue(ShardingPropertiesConstant.SQL_SIMPLE);
        SQLLogger.logSQL(sql, showSimple, result.getSqlStatement(), result.getRouteUnits());//打印真实sql
    }
    return result;
}

//org.apache.shardingsphere.core.PreparedQueryShardingEngine.route(String, List<Object>)
@Override
protected SQLRouteResult route(final String sql, final List<Object> parameters) {
    return routingEngine.route(parameters);
}
//org.apache.shardingsphere.core.route.PreparedStatementRoutingEngine.route(List<Object>)
public SQLRouteResult route(final List<Object> parameters) {
    if (null == sqlStatement) {
        sqlStatement = shardingRouter.parse(logicSQL, true);//代码@1
    }
    return masterSlaveRouter.route(shardingRouter.route(logicSQL, parameters, sqlStatement));//代码@2
}

代码@1

//org.apache.shardingsphere.core.route.router.sharding.ParsingSQLRouter.parse(String, boolean) 
//解析sql
@Override
public SQLStatement parse(final String logicSQL, final boolean useCache) {
    parsingHook.start(logicSQL);//sharding-jdbc为开发预留的钩子,我们可以实现钩子接口在解析sql前后做一些扩展
    try {
        SQLStatement result = new SQLParsingEngine(databaseType, logicSQL, shardingRule, shardingMetaData.getTable(), parsingResultCache).parse(useCache);//代码@1.1,解析sql的核心
        parsingHook.finishSuccess(result, shardingMetaData.getTable());
        return result;
        // CHECKSTYLE:OFF
    } catch (final Exception ex) {
        // CHECKSTYLE:ON
        parsingHook.finishFailure(ex);
        throw ex;
    }
}

代码@1处解析sql比较复杂,只需要知道是解析sql,解析结果SQLStatement,这个是也不是我们的关注点,知道有个hook接口可以在sql解析前后进行扩展即可,比如通过该Hook可以用作计算sql执行时长。

知道增删改查对对应的SQLStatement如下:

对于insert来说SQLStatement是InsertStatement。DML

对于update delete语句来说SQLStatement是DMLStatement。DML

对于select语句来说SQLStatement是SelectStatement。 DQL

SQLStatement是个逻辑sql。

类关系图如下:

sharding-jdbc处理流程源码分析

代码@2

masterSlaveRouter是读写分离路由,不使用的情况下,可以忽略。

分片的路由核心实现在shardingRouter.route(logicSQL, parameters, sqlStatement),下面分析这个

//org.apache.shardingsphere.core.route.router.sharding.ParsingSQLRouter.route(String, List<Object>, SQLStatement)
public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
        Optional<GeneratedKey> generatedKey = sqlStatement instanceof InsertStatement
                ? GeneratedKey.getGenerateKey(shardingRule, parameters, (InsertStatement) sqlStatement) : Optional.<GeneratedKey>absent();//代码@2.1
        SQLRouteResult result = new SQLRouteResult(sqlStatement, generatedKey.orNull());//代码@2.2
        OptimizeResult optimizeResult = OptimizeEngineFactory.newInstance(shardingRule, sqlStatement, parameters, generatedKey.orNull()).optimize();//代码@2.3
        if (generatedKey.isPresent()) {
            setGeneratedKeys(result, generatedKey.get());//代码@2.4
        }
        boolean needMerge = false;
        if (sqlStatement instanceof SelectStatement) {
            needMerge = isNeedMergeShardingValues((SelectStatement) sqlStatement);//代码@2.5
        }
        if (needMerge) {
            checkSubqueryShardingValues(sqlStatement, optimizeResult.getShardingConditions());
            mergeShardingValues(optimizeResult.getShardingConditions());//代码@2.6
        }
        RoutingResult routingResult = RoutingEngineFactory.newInstance(shardingRule, shardingMetaData.getDataSource(), sqlStatement, optimizeResult).route();//代码@2.7
        if (sqlStatement instanceof SelectStatement && null != ((SelectStatement) sqlStatement).getLimit() && !routingResult.isSingleRouting()) {
            result.setLimit(getProcessedLimit(parameters, (SelectStatement) sqlStatement));//代码@2.8
        }
        if (needMerge) {
            Preconditions.checkState(1 == routingResult.getTableUnits().getTableUnits().size(), "Must have one sharding with subquery.");//代码@2.9
        }
        result.setRoutingResult(routingResult);//代码@2.10
        result.setOptimizeResult(optimizeResult);
        return result;//代码@2.11
    }

从上面路由核心代码可以看出,ShardingRouter是解析和路由的核心接口,其实现类为ParsingSQLRouter,它使用四个引擎对sql进行解析、解析和重写,这四个引擎为:

  • SQLParsingEngine
    解析sql,返回SQLStatement作为解析的结果。

  • OptimizeEngine
    对SQLStatement进行优化,返回ShardingConditions对象。

  • RoutingEngine
    根据库表分片配置以及ShardingConditions找到目标库表,返回RoutingResult对象。

  • SQLRewriteEngine
    根据路由结果重写sql。

代码@2.1处:如果是insert,则生成分布式主键,GeneratedKey封装了分片键和分布式主键值。比如,insert语句,这里使用Snowflake算法生成分布式主键。

代码@2.2处:创建sql路由结果对象SQLRouteResult,封装SQLStatement和分布式主键对象GeneratedKey。此时SQLRouteResult只是包含了sql语句和主键值,并没有生成实际待执行sql。

代码@2.3处:使用OptimizeEngine对SQLStatement进行优化,返回OptimizeResult对象。该对象在重写sql时候用到,作用就是对SQLStatement进行优化,返回ShardingConditions对象。

代码@2.4处:保存生成的分布式主键。

代码@2.5处:select语句是否需要合并结果

代码@2.6处:需要合并查询结果,则合并

代码@2.7处:使用不同的RoutingEngine生成路由结果RoutingResult。比如标准分片是StandardRoutingEngine、复合分片是ComplexRoutingEngine、广播是DatabaseBroadcastRoutingEngine、不分片是DefaultDatabaseRoutingEngine等。 这里是核心代码,总体功能就是路由,找到实际的数据源和真实表

sharding-jdbc处理流程源码分析

代码@2.8处:select语句设置limit。既然分库分表了,通常也就不使用分页了。

代码@2.9处:预检,需要合并结果,需要分片键在查询结果上。

代码@2.10处:把路由结果RoutingResult、优化结果OptimizeResult保存到SQLRouteResult。

代码@2.11处:返回sql路由结果对象SQLRouteResult,该对象封装了路由结果,知道要到哪个真实库去执行哪个真实表。

核心代码@2.7处分析

RoutingEngineFactory.newInstance()根据不同的分片规则采用对应的RoutingEngine生成路由结果RoutingResult,以标准分片路由为例

//org.apache.shardingsphere.core.route.type.standard.StandardRoutingEngine.route()
@Override
public RoutingResult route() {
    return generateRoutingResult(getDataNodes(shardingRule.getTableRule(logicTableName)));//1.getTableRule根据逻辑表获取TableRule,2.getDataNodes根据TableRule和分片算法获取真实的数据源和真实表Collection<DataNode>
}
//shardingRule.getTableRule(logicTableName)根据逻辑表从分片规则ShardingRule获取表规则TableRule,TableRule信息封装的较多,有逻辑表、全部数据源等

//org.apache.shardingsphere.core.route.type.standard.StandardRoutingEngine.getDataNodes(TableRule)
//获取真实节点,真实数据源和真实表。DataNode封装了真实数据源和真实表
private Collection<DataNode> getDataNodes(final TableRule tableRule) {
    if (shardingRule.isRoutingByHint(tableRule)) {//hint路由
        return routeByHint(tableRule);
    }
    if (isRoutingByShardingConditions(tableRule)) {//条件路由,即非hint路由
        return routeByShardingConditions(tableRule);
    }
    return routeByMixedConditions(tableRule);
}

private Collection<DataNode> routeByShardingConditions(final TableRule tableRule) {
    return optimizeResult.getShardingConditions().getShardingConditions().isEmpty() ? route(tableRule, Collections.<RouteValue>emptyList(), Collections.<RouteValue>emptyList())
        : routeByShardingConditionsWithCondition(tableRule);
}

private Collection<DataNode> route(final TableRule tableRule, final List<RouteValue> databaseShardingValues, final List<RouteValue> tableShardingValues) {
    Collection<String> routedDataSources = routeDataSources(tableRule, databaseShardingValues);//获取真实数据源
    Collection<DataNode> result = new LinkedList<>();
    for (String each : routedDataSources) {
        result.addAll(routeTables(tableRule, each, tableShardingValues));//获取真实表
    }
    return result;
}

//根据分片键获取数据源
private Collection<String> routeDataSources(final TableRule tableRule, final List<RouteValue> databaseShardingValues) {
    Collection<String> availableTargetDatabases = tableRule.getActualDatasourceNames();
    if (databaseShardingValues.isEmpty()) {
        return availableTargetDatabases;
    }
    Collection<String> result = new LinkedHashSet<>(shardingRule.getDatabaseShardingStrategy(tableRule).doSharding(availableTargetDatabases, databaseShardingValues));//这里通过分片策略调用自定义的分片算法
    Preconditions.checkState(!result.isEmpty(), "no database route info");
    return result;
}

//根据分片键获取DataNode,即数据源+真实表
private Collection<DataNode> routeTables(final TableRule tableRule, final String routedDataSource, final List<RouteValue> tableShardingValues) {
    Collection<String> availableTargetTables = tableRule.getActualTableNames(routedDataSource);
    Collection<String> routedTables = new LinkedHashSet<>(tableShardingValues.isEmpty() ? availableTargetTables
                                                          : shardingRule.getTableShardingStrategy(tableRule).doSharding(availableTargetTables, tableShardingValues));//这里通过分片策略调用自定义的分片算法
    Preconditions.checkState(!routedTables.isEmpty(), "no table route info");
    Collection<DataNode> result = new LinkedList<>();
    for (String each : routedTables) {
        result.add(new DataNode(routedDataSource, each));
    }
    return result;
}

可以看到route()方法是入口,此方法首先通过ShardingRule获取到逻辑表所对应的TableRule对象,在sharding-jdbc启动阶段,TableRule保存了逻辑表对应的实际的库表关系集合,接着根据库和表的ShardingStrategy的类型走了三个不同的方法:routeByHint()、routeByShardingConditions()、routeByMixedConditions(),不管走哪个方法最终都会执行到含有三个参数的route()方法,此方法先调用routeDataSources()方法路由数据源(库),接着调用routeTables()方法路由表,路由库表的方法也很简单:

从TableRule中获取可用的库表集合。
从TableRule中获取库表的分片策略ShardingStrategy对象。
执行ShardingStrategy持有的分片算法ShardingAlgorithm的doSharding()方法返回路由到的库表。
路由的结果以RoutingResult的形式返回,接着调用SQLRewriteEngine重写sql,因为此时sql中的表还只是逻辑表名,并不是具体的哪个表,接着生成SQLUnit,并最终以SQLRouteResult形式返回路由结果。

重点是个SQLRouteResult,关系较复杂,类图封装关系如下

sharding-jdbc处理流程源码分析

使用xmind画出的处理流程

sharding-jdbc处理流程源码分析

思维导图地址:https://gitee.com/yulewo123/mdpicture/blob/master/document/sharding-jdbc%E6%89%A7%E8%A1%8C%E6%B5%81%E7%A8%8B.xmind

总结:

sharding-jdbc的处理流程核心就是路由,即根据分片键以及算法从从TableRule.actualDataNodes获取真实库表对象DataNode。那么TableRule是怎么来的呢?是ShardingRule根据逻辑表获取,而ShardingRule是核心,在sharding-jdbc启动时候就创建完成。

路由获取后,就可以重写sql,然后通过jdbc执行sql到真实的数据源执行真实sql。

关键debug点记录如下,工作中遇到问题,方便快速回顾debug定为问题

org.apache.shardingsphere.core.route.router.sharding.ParsingSQLRouter.route(String, List, SQLStatement) 路由核心

2.sharding-jdbc的扩展点

sharding-jdbc设计采用jdk的spi进行扩展,所有扩展注册都会调用org.apache.shardingsphere.core.spi.NewInstanceServiceLoader.register(Class<T>),因此跟踪这个方法调用如下

sharding-jdbc处理流程源码分析

前面三个是在启动过程进行注册,后面五个是在首次运行过程中进行注册。

列举出sharding的spi扩展点

启动过程

org.apache.shardingsphere.spi.masterslave.MasterSlaveLoadBalanceAlgorithm 读写分离扩展,可以扩展使用什么算法来选择对应的数据源。

org.apache.shardingsphere.spi.encrypt.ShardingEncryptor 加密方式扩展,扩展加密方式

interface org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator 分布式主键生成,可以通过实行这个接口增加分布式主键生成算法

首次运行

interface org.apache.shardingsphere.core.execute.hook.RootInvokeHook 根调用钩子,具体就是在创建shardingConnection时候就启用,关闭时候完成,可以用于统计一个sharding从开始执行到执行结束耗时,可用于监控

interface org.apache.shardingsphere.core.parse.hook.ParsingHook sql解析钩子,比如可以用于统计sql解析耗时等

interface org.apache.shardingsphere.core.parse.spi.ShardingParseEngine 解析引擎,sharding-jdbc已经针对每个数据库类型增加了对应的解析引擎

interface org.apache.shardingsphere.core.rewrite.hook.RewriteHook 重写sql钩子

interface org.apache.shardingsphere.core.execute.hook.SQLExecutionHook sql执行钩子,可以计算sql执行时长,用于监控

以上这些spi接口,断点打在这些spi接口的子类hook上即可,观测到调用。

比如添加一个自定义打印sql执行耗时:

public class CustomShardingSQLExecutionHook implements SQLExecutionHook {

	@Override
	public void start(RouteUnit routeUnit, DataSourceMetaData dataSourceMetaData, boolean isTrunkThread,
			Map<String, Object> shardingExecuteDataMap) {
		// TODO Auto-generated method stub
		System.err.println("start");//控制台红色打印
	}

	@Override
	public void finishSuccess() {
		// TODO Auto-generated method stub
		System.err.println("finishSuccess");

	}

	@Override
	public void finishFailure(Exception cause) {
		// TODO Auto-generated method stub
		System.err.println("finishFailure");
	}
}
//同时在resources下创建META-INF/services/org.apache.shardingsphere.core.execute.hook.SQLExecutionHook,内容如下
//com.zyj.sharding.hook.CustomShardingSQLExecutionHook
//这样自定义的sql执行钩子就生效了,就是jdk的spi写法。

参考 https://www.jianshu.com/p/4cb5b2b68f8e

上一篇:springboot整合sharding-jdbc操作


下一篇:java-Zookeeper用于分配分片索引