flink jdbc分库分表

flink jdbc分库分表实现方式

前言

在flink提供的jdbc-connector中,只支持单表的数据同步,但是在日常任务中,在随着业务量的增大,单表记录数过多,会导致数据查询效率降低,因此会将表进行拆分,使一个业务表对应多个分表。如order拆分为1024张分表:order -> order_0000~order_1023。显然对现有flink jdbc插件并不适用这种情况,下面我们将会对flink插件进行改造,达到一个flink table映射多个物理表的效果。

流程梳理

需求

  • 版本 : flink-1.11.3
  • 目标:
    • 基于 flink-jdbc-connector进行改造,在创建一个flink table情况下,同步业务表order对应的所有物理分表(order_0000~order_1023)的数据
    • 兼容现有flink-jdbc-connector配置(包括数据拆分)

分析

jdbc自带拆分配置

我们在创建flink jdbc同步作业时,一般是通过下面的来声明一个table。在可选配置中,有jdbc的分区字段和分区数选项,其结果是将一个sql,在指定数据固定范围内(scan.partition.lower-bound,scan.partition.upper-bound)根据拆分字段(scan.partition.column)和数量(scan.partition.num),将sql进行等步长拆分。

-- 在 Flink SQL 中注册一张 MySQL 表 'users'
CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'users',
);

  • 可选配置(数据分区)
scan.partition.column:用于将输入进行分区的列名
scan.partition.num:分区数。
scan.partition.lower-bound:第一个分区的最小值。
scan.partition.upper-bound:最后一个分区的最大值。

例如:同步mysql 中order表,映射到mysql查询sql为,根据id拆分范围 1~100,数量2,其最后sql共拆分为两个。每个sql可能由不同的线程(或者说task)去执行。

拆分前

select id,name from order

拆分后

select id,name from order where id between 1 and 50
select id,name from order where id between 51 and 100

其目的一方面是将大sql进行拆分,减少对数据表的锁操作,另一方面是结合flink配置的并发度,并发同步数据,增大同步效率。

结合flink-jdbc-connector数据拆分,进行表拆分

而此次需求是将一个逻辑表映射到多个物理表,其实也是对数据拆分,那么我们就可以根据flink-jdbc-connector的数据分区就行修改,最终效果是将数据由查询order,拆分为查询 order_0000~order_1023。如果用户配置了数据分区,那么再将拆分后的表进行再拆分。如果是同上面的。拆分方案,那么最终执行sql分片为1024*2个。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RLIHqRaO-1635432944409)(evernotecid://8C4A8129-FDA9-4EC4-8DCC-EF481E5C8EE9/appyinxiangcom/10436148/ENResource/p643)]

实现分表

  • flink-jdbc-connector数据拆分属性原理

在flink-jdbc-connector包中,提供JdbcParameterValuesProvider接口,这个接口被JdbcInputFormat用来计算要运行的并行查询列表(即拆分)。每个查询将使用由每个JdbcParameterValuesProvider实现提供的矩阵行进行参数化。

public interface JdbcParameterValuesProvider {

	/** Returns the necessary parameters array to use for query in parallel a table. */
	Serializable[][] getParameterValues();
}

其中getParameterValues()的返回值:Serializable[x][y]x值即为SQL拆分的数据,每个x对应的y个元素的一维数组,包含的是每个sql的变量信息,例如上述根据id进行拆分数量为2

  1. Serializable[][]的数据组成为:
//结构 :x=0~1
//Serializable[x] = {{id_min},{id_max}}
 Serializable[0] = {1,50}
 Serializable[1] = {51,100}
  1. sql模板:
select id,name from order where id between ? and ?
  • 那么对于分表来说,其变量相当于又增加了一个table_name,这样我们只需要改动两个地方,就可以实现分表的效果:
  1. 在生成Serializable[][]时,新增维度:table_name
    Serializable[][]的数据元素数为:1024*2

//结构 :x=0~2047
//Serializable[x] = {"order_{0000~1023}",{id_min},{id_max}}
 Serializable[0] = {"order_0000",1,50}
 Serializable[1] = {"order_0000",51,100}
 Serializable[2] = {"order_0001",1,50}
 Serializable[3] = {"order_0001",51,100}
 ...
 Serializable[2047] = {"order_1023",51,100}
  1. sql模板新增变量:table_name
select id,name from ${table_name} where id between ? and ?

实现分库分表

从上面的分析可看出,实现分库只要在分表的基础上再加以推导改造:

2库(10.1.1.2、10.1.1.2) 4个schema(order_00order_03),1024个表(order_0000order_1023),最终拆解如下:

  1. Serializable[][]的数据组成为:
//结构:x=0~2047
//Serializable[x] = {"{db_url}","{schema_name}","order_{0000~1023}",{id_min},{id_max}}

 Serializable[0] = {"jdbc://10.1.1.2","order_00","order_0000",1,50}
 ...
 Serializable[2047] = {"jdbc://10.1.1.3","order_03","order_1023",1,50}
  1. connection数据库连接:
    {db_url},{schema_name},这四个变量动态创建connection,
  2. sql模板:
    使用当前创建的connection,来切换数据源(分库)
select id,name from {table_name} where id between {id_min} and {id_max}

实战

上述分析,牵扯到一个重要的变量数据:Serializable[][],改造的流程是:

  1. 改造flink-jdbc-connection配置
  • url:支持多个数据库配置,并在schema支持正则表达式动态匹配数据库中的schema
  • table-name:表名支持正则匹配,可同时匹配多个表
   'url' = 'jdbc:mysql://localhost:3306/order_([0-9]{1,}),jdbc:mysql://localhost:3306/order_([0-9]{1,})',
   'table-name' = 'order_([0-9]{1,})',

  1. 解析url、table-name,匹配数据库中的schema、物理表。(代码实现省略)

    1. 查询数据库中所有schema
    2. 通过正则匹配schema
    3. 查询匹配schema下面的table
    4. 通过正则匹配表
    5. 返回数据库url与table的对应关系:List<TableItem>
  2. 新增JdbcMultiTableProvider,基于原有数据分片结果,根据分库分表,对Serializable[][]二次拆分

public class JdbcMultiTableProvider implements JdbcParameterValuesProvider {
    //匹配的数据库连接与table的对应关系
	private List<TableItem> tables;
    //原jdbc数据分片配置后的拆分结果
	private Serializable[][] partition;

	public JdbcMultiTableProvider(List<TableItem> tables) {
		this.tables = tables;
	}

	/**
	 * @return 返回拆分后的分片和数据块的对应关系,Serializable[partition][parameterValues]
	 * 启动partition为分片索引,parameterValues为每个分片对应的数据参数。
	 */
	@Override
	public Serializable[][] getParameterValues() {
		int tableNum = tables.stream().mapToInt(item -> item.getTable().size()).sum();
		int splitCount = partition == null ? tableNum : tableNum * partition.length;
		int paramLength = partition == null ? 2 : 4;
		Serializable[][] parameters = new Serializable[splitCount][paramLength];
		int splitIndex = 0;

		for (TableItem tableItem : tables) {
			for (String table : tableItem.getTable()) {
				if (partition != null) {
					for (Serializable[] serializables : partition) {
						parameters[splitIndex][0] = tableItem.getUrl();
						parameters[splitIndex][1] = table;
						//数据分片配置
						parameters[splitIndex][2] = serializables[0];
						parameters[splitIndex][3] = serializables[1];
						splitIndex++;
					}
				} else {
					parameters[splitIndex][0] = tableItem.getUrl();
					parameters[splitIndex][1] = table;
					splitIndex++;
				}
			}
		}
		return parameters;
	}
	
	public JdbcParameterValuesProvider withPartition(JdbcNumericBetweenParametersProvider jdbcNumericBetweenParametersProvider) {
		if (null == jdbcNumericBetweenParametersProvider) {
			return this;
		}
		this.partition = jdbcNumericBetweenParametersProvider.getParameterValues();
		return this;
	}

	public static class TableItem {
		private String url;
		private List<String> table;
        //get/set..
	}
}

  1. 改造JdbcDynamicTableSource,生成基于分库分表的JdbcRowDataInputFormat对象。
	@Override
	@SuppressWarnings("unchecked")
	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
		final JdbcRowDataInputFormat.Builder builder = JdbcRowDataInputFormat.builder()
			.setDrivername(options.getDriverName())
			.setDBUrl(options.getDbURL())
			.setUsername(options.getUsername().orElse(null))
			.setPassword(options.getPassword().orElse(null));

		if (readOptions.getFetchSize() != 0) {
			builder.setFetchSize(readOptions.getFetchSize());
		}
		final JdbcDialect dialect = options.getDialect();
        JdbcNumericBetweenParametersProvider jdbcNumericBetweenParametersProvider = null;
		//数据分片配置
		if (readOptions.getPartitionColumnName().isPresent()) {
			long lowerBound = readOptions.getPartitionLowerBound().get();
			long upperBound = readOptions.getPartitionUpperBound().get();
			int numPartitions = readOptions.getNumPartitions().get();
			jdbcNumericBetweenParametersProvider = new JdbcNumericBetweenParametersProvider(lowerBound, upperBound).ofBatchNum(numPartitions);
		}
        //根据table分片
		List<TableItem>  tableItems = options.getTables();
		builder.setParametersProvider(new JdbcMultiTableProvider(tableItems)
				.withPartition(jdbcNumericBetweenParametersProvider, physicalSchema, readOptions.getPartitionColumnName().orElse(null)));

		final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
		builder.setRowConverter(dialect.getRowConverter(rowType));
		builder.setRowDataTypeInfo((TypeInformation<RowData>) runtimeProviderContext
			.createTypeInformation(physicalSchema.toRowDataType()));

		return InputFormatProvider.of(builder.build());
	}
  1. 改造JdbcRowDataInputFormat,在open(InputSplit inputSplit)中,初始化Connection、statement、以及sql查询模板。
    JdbcRowDataInputFormat整个生命周期中,每个并行实例调用一次openInputFormat()(对应关闭当前并行实例的方法:closeInputFormat())。每次切换分片,都会调用一次open(InputSplit inputSplit)(对应关闭当前数据分片方法:close()),inputSplit的值对应Serializable[x][y]中x的值递增,并且每个并行实例不会重复执行,比如有1024个分表,每个表2个数据分片,那么inputSplit.getSplitNumber()值的范围是:[0~2047]。JdbcRowDataInputFormat对象持有Serializable[x][y],并且根据open(InputSplit inputSplit)来定位当前JdbcRowDataInputFormat处理哪个分区的数据,从而达到数据分区根据并发度,并发查询的效果。

JdbcRowDataInputFormat.class

@Override
public void open(InputSplit inputSplit) throws IOException {
		try {
			//分库,分表逻辑
			Object[] params = parameterValues[inputSplit.getSplitNumber()];
			//初始化数据库连接,url= params[0].toString();
			initConnect(params);
			String url = params[0].toString();
			final JdbcDialect dialect = RdbsDialects.get(url).get();
			//数据查询模板,String table = params[1].toString();
			String queryTemplate = queryTemplate(params, dialect);
			statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
          
			if (inputSplit != null && parameterValues != null) {
			//从index=2 开始为数据分片配置
				for (int i = 2; i < parameterValues[inputSplit.getSplitNumber()].length; i++) {
					Object param = parameterValues[inputSplit.getSplitNumber()][i];
					if (param instanceof String) {
						statement.setString(i - 1, (String) param);
					} else if (param instanceof Long) {
						statement.setLong(i - 1, (Long) param);
					} else if (param instanceof Integer) {
						statement.setInt(i - 1, (Integer) param);
					...
						//extends with other types if needed
						throw new IllegalArgumentException("open() failed. Parameter " + i + " of type " + param.getClass() + " is not handled (yet).");
					}
				}
				if (LOG.isDebugEnabled()) {
					LOG.debug(String.format("Executing '%s' with parameters %s", queryTemplate, Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()])));
				}
			}
			resultSet = statement.executeQuery();
			hasNext = resultSet.next();
		} catch (SQLException se) {
			throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
		}

}

基于上述步骤改造后,就可以实现从flink-jdbc-connector source端单库单表,到分库分表的改造。

上一篇:实体类Serializable 接口的作用和必要性


下一篇:Serializable 序列化和反序列化,文字不多,代码为主,自学用,谨慎借鉴