Sharding-JDBC 源码之启动流程分析

Sharding-JDBC 是 ShardingSphere 开源的分布式数据库中间件产品之一,提供标准化的数据分片、分布式事务和数据库治理功能,可适用于如Java同构、异构语言、云原生等各种多样化的应用场景。
Sharding-JDBC 在 Java 的 JDBC 层提供额外服务,它使用客户端直连数据库,以jar包形式提供服务,无需额外部署和依赖,可理解为增强版的JDBC驱动,完全兼容JDBC和各种ORM框架

接下来,让我们由简入繁,逐渐拉来 Sharding-JDBC 的序幕。
首先基于 SpringBoot2.4 快速搭建一个 Demo,主要 jar 包依赖版本分别是:

  • io.shardingsphere.sharding-jdbc-spring-boot-starter:3.0.0.M1
  • com.alibaba.druid-spring-boot-starter:1.1.17
  • org.mybatis.spring.boot.mybatis-spring-boot-starter:2.1.4

配置 application.properties :

spring.application.name=shardingjdbcdemo
server.port=8080
spring.main.allow-bean-definition-overriding=true

#数据源名称
sharding.jdbc.datasource.names=ds0,ds1
#数据源 ds0 的配置
sharding.jdbc.datasource.ds0.type=com.alibaba.druid.pool.DruidDataSource
sharding.jdbc.datasource.ds0.driver-class-name=com.mysql.cj.jdbc.Driver
sharding.jdbc.datasource.ds0.url=jdbc\:mysql\://localhost\:3306/d_sharding_demo1?useUnicode\=true&characterEncoding\=UTF-8&autoReconnect\=true&failOverReadOnly\=false&allowMultiQueries\=true&serverTimezone=GMT
sharding.jdbc.datasource.ds0.username=root
sharding.jdbc.datasource.ds0.password=123456
sharding.jdbc.datasource.ds0.initialSize=1
sharding.jdbc.datasource.ds0.minIdle=1
sharding.jdbc.datasource.ds0.maxActive=50

#数据源 ds1 的配置
sharding.jdbc.datasource.ds1.type=com.alibaba.druid.pool.DruidDataSource
sharding.jdbc.datasource.ds1.driver-class-name=com.mysql.cj.jdbc.Driver
sharding.jdbc.datasource.ds1.url=jdbc\:mysql\://localhost\:3306/d_sharding_demo2?useUnicode\=true&characterEncoding\=UTF-8&autoReconnect\=true&failOverReadOnly\=false&allowMultiQueries\=true&serverTimezone=GMT
sharding.jdbc.datasource.ds1.username=root
sharding.jdbc.datasource.ds1.password=123456
sharding.jdbc.datasource.ds1.initialSize=1
sharding.jdbc.datasource.ds1.minIdle=1
sharding.jdbc.datasource.ds1.maxActive=50

#控制台展示 sql 语句
sharding.jdbc.config.sharding.props.sql.show=true

#实际节点
sharding.jdbc.config.sharding.tables.t_goods.actual-data-nodes=ds$->{0..1}.t_goods_$->{0..1}
#分表列
sharding.jdbc.config.sharding.tables.t_goods.table-strategy.inline.shardingColumn=goods_id
#分片策略
sharding.jdbc.config.sharding.tables.t_goods.table-strategy.inline.algorithm-expression=t_goods_$->{goods_id % 2}

#分库列
sharding.jdbc.config.sharding.tables.t_goods.databaseStrategy.standard.shardingColumn=type
#分库策略
sharding.jdbc.config.sharding.tables.t_goods.databaseStrategy.standard..preciseAlgorithmClassName=com.example.shardingjdbcdemo.algorithm.MyPreciseShardingAlgorithm

客户端测试程序

@Test
public void insert() {
    GoodsDTO order = new GoodsDTO();
    for (int i = 0; i < 1; i++) {
        order.setGoodsId(i + 1L);
        order.setGoodsName("商品" + (i + 1));
        order.setType(0);
        orderInfoMapper.insertGoods(order);
    }
}

源码分析

当执行上面的客户端程序时,Springboot 会去拉取 application.properties 中的配置,下面我们来看下 sharding-jdbc-spring-boot-starter 中的类,
Sharding-JDBC 源码之启动流程分析

SpringBootShardingRuleConfigurationProperties 获取配置
@ConfigurationProperties(prefix = "sharding.jdbc.config.sharding")
public class SpringBootShardingRuleConfigurationProperties extends YamlShardingRuleConfiguration {
}

@ConfigurationProperties 注解修饰的类会将配置文件中的属性名称匹配的的值绑定到类的属性字段上。

SpringBootConfiguration 获取数据源及分库分表策略
@Configuration
@EnableConfigurationProperties({SpringBootShardingRuleConfigurationProperties.class, SpringBootMasterSlaveRuleConfigurationProperties.class})
public class SpringBootConfiguration implements EnvironmentAware {
    @Autowired
    private SpringBootShardingRuleConfigurationProperties shardingProperties;
    
    /**
     * 1. 实现 EnvironmentAware 接口,获取配置信息
     * 2. 根据配置文件信息获取数据源配置
     */
    @Override
    public void setEnvironment(final Environment environment) {
        setDataSourceMap(environment);
    }
    
    @SuppressWarnings("unchecked")
    private void setDataSourceMap(final Environment environment) {
        String prefix = "sharding.jdbc.datasource.";
        String dataSources = environment.getProperty(prefix + "names");
        // 获取数据源并存入 dataSourceMap 中
        for (String each : dataSources.split(",")) {
            try {
                Map<String, Object> dataSourceProps = PropertyUtil.handle(environment, prefix + each, Map.class);
                Preconditions.checkState(!dataSourceProps.isEmpty(), "Wrong datasource properties!");
                DataSource dataSource = DataSourceUtil.getDataSource(dataSourceProps.get("type").toString(), dataSourceProps);
                dataSourceMap.put(each, dataSource);
            } catch (final ReflectiveOperationException ex) {
                throw new ShardingException("Can't find datasource type!", ex);
            }
        }
    }
    
    @Bean
    public DataSource dataSource() throws SQLException {
        // 获取数据源信息
        return null == masterSlaveProperties.getMasterDataSourceName() 
                ? ShardingDataSourceFactory.createDataSource(dataSourceMap, shardingProperties.getShardingRuleConfiguration(), shardingProperties.getConfigMap(), shardingProperties.getProps())
                : MasterSlaveDataSourceFactory.createDataSource(dataSourceMap, masterSlaveProperties.getMasterSlaveRuleConfiguration(), masterSlaveProperties.getConfigMap());
    }
  • @EnableConfigurationProperties 可以使被 @ConfigurationProperties 注解的类生效;
  • 实现 EnvironmentAware 接口,可以通过 Environment 获取配置文件中的配置信息;
  • @Bean 注解的 dataSource 方法会去真正的解析并封装配置数据,即通过 shardingProperties.getShardingRuleConfiguration()
接下来看如何获取路由规则配置
public class YamlShardingRuleConfiguration {
    
    /**
     * ...... 省略部分属性定义
     */
     private Map<String, YamlTableRuleConfiguration> tables = new LinkedHashMap<>();
    
    /**
     * 将 YamlShardingRuleConfiguration 转为 ShardingRuleConfiguration
     */
    public ShardingRuleConfiguration getShardingRuleConfiguration() {
        ShardingRuleConfiguration result = new ShardingRuleConfiguration();
        result.setDefaultDataSourceName(defaultDataSourceName);
        // 遍历从配置文件中绑定的 tables 对象
        for (Entry<String, YamlTableRuleConfiguration> entry : tables.entrySet()) {
            YamlTableRuleConfiguration tableRuleConfig = entry.getValue();
            tableRuleConfig.setLogicTable(entry.getKey());
            // 将构建完的 tableRuleConfig 添加到 tableRuleConfigs 中
            result.getTableRuleConfigs().add(tableRuleConfig.build());
        }
        // 获取绑定表
        result.getBindingTableGroups().addAll(bindingTables);
        // 获取默认分库策略
        if (null != defaultDatabaseStrategy) {
            result.setDefaultDatabaseShardingStrategyConfig(defaultDatabaseStrategy.build());
        }
        // 获取默认分表策略
        if (null != defaultTableStrategy) {
            result.setDefaultTableShardingStrategyConfig(defaultTableStrategy.build());
        }
        // 获取默认主键生成器
        if (null != defaultKeyGeneratorClassName) {
            result.setDefaultKeyGenerator(KeyGeneratorFactory.newInstance(defaultKeyGeneratorClassName));
        }
        // 获取主从路由配置
        Collection<MasterSlaveRuleConfiguration> masterSlaveRuleConfigs = new LinkedList<>();
        for (Entry<String, YamlMasterSlaveRuleConfiguration> entry : masterSlaveRules.entrySet()) {
            YamlMasterSlaveRuleConfiguration each = entry.getValue();
            each.setName(entry.getKey());
            masterSlaveRuleConfigs.add(entry.getValue().getMasterSlaveRuleConfiguration());
        }
        result.setMasterSlaveRuleConfigs(masterSlaveRuleConfigs);
        return result;
    }
}

在上面的 getShardingRuleConfiguration 中,关键是在 tables 的遍历上面,会逐一构建各个表的路由规则。

循环调用 tableRuleConfig.build() 构建表的规则配置:
    public TableRuleConfiguration build() {
        Preconditions.checkNotNull(logicTable, "Logic table cannot be null.");
        // 表配置对象
        TableRuleConfiguration result = new TableRuleConfiguration();
        result.setLogicTable(logicTable);
        result.setActualDataNodes(actualDataNodes);
        // 设置分库策略
        if (null != databaseStrategy) {
            // 构建分库策略
            result.setDatabaseShardingStrategyConfig(databaseStrategy.build());
        }
        // 设置分表策略
        if (null != tableStrategy) {
            result.setTableShardingStrategyConfig(tableStrategy.build());
        }
        if (!Strings.isNullOrEmpty(keyGeneratorClassName)) {
            result.setKeyGenerator(KeyGeneratorFactory.newInstance(keyGeneratorClassName));
        }
        result.setKeyGeneratorColumnName(keyGeneratorColumnName);
        result.setLogicIndex(logicIndex);
        return result;
    }

在构建表配置时,会去封装逻辑表名,实际节点以及分库分表策略。

构建分库分表策略databaseStrategy.build()
    public ShardingStrategyConfiguration build() {
        int shardingStrategyConfigCount = 0;
        ShardingStrategyConfiguration result = null;
        // 首先获取标准策略配置
        if (null != standard) {
            shardingStrategyConfigCount++;
            // 只创建精确分片算法
            if (null == standard.getRangeAlgorithmClassName()) {
                result = new StandardShardingStrategyConfiguration(standard.getShardingColumn(),
                        ShardingAlgorithmFactory.newInstance(standard.getPreciseAlgorithmClassName(), PreciseShardingAlgorithm.class));
            } else {
                // 创建包含范围和精确的分片算法
                result = new StandardShardingStrategyConfiguration(standard.getShardingColumn(),
                        ShardingAlgorithmFactory.newInstance(standard.getPreciseAlgorithmClassName(), PreciseShardingAlgorithm.class),
                        ShardingAlgorithmFactory.newInstance(standard.getRangeAlgorithmClassName(), RangeShardingAlgorithm.class));
            }
            
        }
        // 复杂分片算法
        if (null != complex) {
            shardingStrategyConfigCount++;
            result = new ComplexShardingStrategyConfiguration(complex.getShardingColumns(), ShardingAlgorithmFactory.newInstance(complex.getAlgorithmClassName(), ComplexKeysShardingAlgorithm.class));
        }
        // 内联表达式分片算法
        if (null != inline) {
            shardingStrategyConfigCount++;
            result = new InlineShardingStrategyConfiguration(inline.getShardingColumn(), inline.getAlgorithmExpression());
        }
        // 暗示分片算法,需要根据业务场景手工设置
        if (null != hint) {
            shardingStrategyConfigCount++;
            result = new HintShardingStrategyConfiguration(ShardingAlgorithmFactory.newInstance(hint.getAlgorithmClassName(), HintShardingAlgorithm.class));
        }
        if (null != none) {
            shardingStrategyConfigCount++;
            result = new NoneShardingStrategyConfiguration();
        }
        // 断言分库或分表最多只能含有一个算法,配置多个分片算法将抛出异常
        Preconditions.checkArgument(shardingStrategyConfigCount <= 1, "Only allowed 0 or 1 sharding strategy configuration.");
        return result;
    }

通过以上步骤,就已经将分库分表的所有配置信息封装完成,接下来将会初始化 ShardingDataSource,在初始化 ShardingDataSource 时会校验数据库表字段索引完成性及创建 ShardingContext

校验数据库完成性

回到 SpringBootConfigurationdataSource 方法上

    @Bean
    public DataSource dataSource() throws SQLException {
        return null == masterSlaveProperties.getMasterDataSourceName() 
                ? ShardingDataSourceFactory.createDataSource(dataSourceMap, shardingProperties.getShardingRuleConfiguration(), shardingProperties.getConfigMap(), shardingProperties.getProps())
                : MasterSlaveDataSourceFactory.createDataSource(dataSourceMap, masterSlaveProperties.getMasterSlaveRuleConfiguration(), masterSlaveProperties.getConfigMap());
    }

在执行 ShardingDataSourceFactory.createDataSource(dataSourceMap, shardingProperties.getShardingRuleConfiguration(), shardingProperties.getConfigMap(), shardingProperties.getProps()) 时会去初始化 ShardingDataSource,接下来看下 ShardingDataSource 的构造函数:

    public ShardingDataSource(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final Map<String, Object> configMap, final Properties props) throws SQLException {
        super(dataSourceMap.values());
        if (!configMap.isEmpty()) {
            ConfigMapContext.getInstance().getShardingConfig().putAll(configMap);
        }
        shardingProperties = new ShardingProperties(null == props ? new Properties() : props);
        // 从环境变量中获取工作线程数,默认是虚拟机的处理器数量
        int executorSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE);
        // 创建执行引擎线程池
        executorEngine = new ExecutorEngine(executorSize);
        ShardingMetaData shardingMetaData = new JDBCShardingMetaData(dataSourceMap, shardingRule, getDatabaseType());
        // 初始化 ShardingMetaData 并进行校验
        shardingMetaData.init(shardingRule);
        boolean showSQL = shardingProperties.getValue(ShardingPropertiesConstant.SQL_SHOW);
        // 创建上下文
        shardingContext = new ShardingContext(dataSourceMap, shardingRule, getDatabaseType(), executorEngine, shardingMetaData, showSQL);
    }

ShardingDataSource 构造函数中会进行一系列的初始化,其中包括 init 方法:

    public void init(final ShardingRule shardingRule) throws SQLException {
        tableMetaDataMap = new HashMap<>(shardingRule.getTableRules().size(), 1);
        for (TableRule each : shardingRule.getTableRules()) {
            refresh(each, shardingRule);
        }
    }

init 方法会循环遍历 tableRules,获取表的元数据,具体实现在 refresh 中,refresh 方法会调用 getTableMetaData 方法来获取表元数据并进行表结构对比:

    private TableMetaData getTableMetaData(final String logicTableName, final List<DataNode> actualDataNodes,
                                           final ShardingDataSourceNames shardingDataSourceNames, final Map<String, Connection> connectionMap) throws SQLException {
        Collection<ColumnMetaData> result = null;
        for (DataNode each : actualDataNodes) {
            Collection<ColumnMetaData> columnMetaDataList = getColumnMetaDataList(each, shardingDataSourceNames, connectionMap);
            // 将第一次循环获得的 columnMetaDataList 设置给 result            
            if (null == result) {
                result = columnMetaDataList;
            }
            // 并以第一次的作为参照与其他分表进行对比
            if (!result.equals(columnMetaDataList)) {
                // 当表结构不一致时抛出异常
                throw new ShardingException(getErrorMsgOfTableMetaData(logicTableName, result, columnMetaDataList));
            }
        }
        return new TableMetaData(result);
    }

那么是根据什么判断表结构不一致的呢?我们看下 ColumnMetaData 有哪些元素便知:

public final class ColumnMetaData {
    
    private final String columnName;
    
    private final String columnType;
    
    private final String keyType;
}
  1. ColumnMetaData 的数据结构上,我们看到,只有当列名称、列类型(包括长度)、索引类型完全一致时,才认为表结构是完成的。
  2. 在获得表的元数据后,将元数据封装在ShardingMetaData.tableMetaDataMap 中并依次返,至此 ShardingDataSource 创建完成,Sharding-JDBC 的启动流程已全部完成。

总结

  1. Sharding-JDBC 启动时主要是拉取分库分表的配置,包括数据源、分片策略,并校验配置是否有误;
  2. 同时,会校验逻辑表在实际节点的表结构是否一致,通过列名称、列类型与 Key 类型进行判断,全部相同时才认为表结构一致;
  3. 封装表元数据并创建 ShardingDataSource 数据源对象。

扩展:Mysql 表结构中的 Key 有哪些及含义是什么?

Key 的含义:表示该列是否被索引

Key 类型 含义
“” 该列没有索引或属于组合索引中的辅助索引
PRI 该列是主键索引或是组合主键索引之一的列
UNI 该列是唯一索引的第一列(唯一索引允许空值)
MUL 该列是非唯一索引的第一列,也就是普通索引
  • 如果多个 Key 类型应用于表的给定列,则 Key 按 PRIUNIMUL; 的顺序显示优先级最高的一个;
  • 如果一个唯一索引不能包含空值并且表中没有主键,那么它可以显示为PRI
  • 如果多个列组成一个复合唯一索引,则唯一索引可能显示为MUL,尽管这些列的组合是唯一的,但每列仍然可以包含给定值的多个引用。
上一篇:sharding-proxy总揽


下一篇:[总结] 数据库分库分表sharding-jdbc 实践