开篇
这篇文章的目的是讲解RM Executor模块当中一些通用的方法,这些方法在各个Executor的父类当中实现的,各个子类Executor模块都会复用,因此抽取出来统一的进行讲解。
个人是认为抽取通用的内容放在一篇文章讲解完后可以针对每类Executor讲解特有的功能,这样能够有更好的理解。这篇文章讲解Executor的父类BaseTransactionalExecutor。
类依赖图
说明:
- 着重讲解BaseTransactionalExecutor抽象父类。
BaseTransactionalExecutor通用方法介绍
public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor {
protected StatementProxy<S> statementProxy;
protected StatementCallback<T, S> statementCallback;
protected SQLRecognizer sqlRecognizer;
private TableMeta tableMeta;
public BaseTransactionalExecutor(StatementProxy<S> statementProxy,
StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {
this.statementProxy = statementProxy;
this.statementCallback = statementCallback;
this.sqlRecognizer = sqlRecognizer;
}
}
说明:
- BaseTransactionalExecutor的核心变量SQLRecognizer sqlRecognizer。
- SQLRecognizer sqlRecognizer的是通过构造函数进行赋值的。
- SQLRecognizer是由SQLVisitorFactory的工厂方法返回生成的。
public class SQLVisitorFactory {
public static SQLRecognizer get(String sql, String dbType) {
List<SQLStatement> asts = SQLUtils.parseStatements(sql, dbType);
if (asts == null || asts.size() != 1) {
throw new UnsupportedOperationException("Unsupported SQL: " + sql);
}
SQLRecognizer recognizer = null;
SQLStatement ast = asts.get(0);
if (JdbcConstants.MYSQL.equalsIgnoreCase(dbType)) {
if (ast instanceof SQLInsertStatement) {
recognizer = new MySQLInsertRecognizer(sql, ast);
} else if (ast instanceof SQLUpdateStatement) {
recognizer = new MySQLUpdateRecognizer(sql, ast);
} else if (ast instanceof SQLDeleteStatement) {
recognizer = new MySQLDeleteRecognizer(sql, ast);
} else if (ast instanceof SQLSelectStatement) {
if (((SQLSelectStatement)ast).getSelect().getQueryBlock().isForUpdate()) {
recognizer = new MySQLSelectForUpdateRecognizer(sql, ast);
}
}
} else {
throw new UnsupportedOperationException("Just support MySQL by now!");
}
return recognizer;
}
}
说明:
- SQLRecognizer 根据不同的SQL类型生成的。
- MySQLInsertRecognizer MySQLUpdateRecognizer MySQLDeleteRecognizer MySQLSelectForUpdateRecognizer。
- SQLUtils.parseStatements作为druid开源包提供的功能负责返回SQLStatement对象。
- SQLStatement是根据不同SQL生产的SQL会话对象包含SQL含有的一些通用信息。
public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor {
protected String buildWhereConditionByPKs(List<Field> pkRows) throws SQLException {
StringBuffer whereConditionAppender = new StringBuffer();
for (int i = 0; i < pkRows.size(); i++) {
Field field = pkRows.get(i);
whereConditionAppender.append(getColumnNameInSQL(field.getName()) + " = ?");
if (i < (pkRows.size() - 1)) {
whereConditionAppender.append(" OR ");
}
}
return whereConditionAppender.toString();
}
}
说明:
- 根据primary key生成where条件语句。
- 根据主键的个数构建 select x from table where c1=? OR c2=?格式。
public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor {
protected String getColumnNameInSQL(String columnName) {
String tableAlias = sqlRecognizer.getTableAlias();
if (tableAlias == null) {
return columnName;
} else {
return tableAlias + "." + columnName;
}
}
}
说明:
- 获取SQL当中的列名,通过sqlRecognizer获取别名。
- 如果有别名那么列名就是 tableAlias.columnName,否则就是columnName。
public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor {
protected String getFromTableInSQL() {
String tableName = sqlRecognizer.getTableName();
String tableAlias = sqlRecognizer.getTableAlias();
if (tableAlias == null) {
return tableName;
} else {
return tableName + " " + tableAlias;
}
}
}
说明:
- 获取SQL当中的表名,通过sqlRecognizer获取别名。
- 如果有别名那么列名就是 tableName tableAlias,否则就是tableName。
- 类似select * from tableName tableAlias。
public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor {
protected TableMeta getTableMeta() {
return getTableMeta(sqlRecognizer.getTableName());
}
protected TableMeta getTableMeta(String tableName) {
if (tableMeta != null) {
return tableMeta;
}
tableMeta = TableMetaCache.getTableMeta(statementProxy.getConnectionProxy().getDataSourceProxy(), tableName);
return tableMeta;
}
}
说明:
- 获取Table的Meta数据,通过TableMetaCache.getTableMeta()操作实现。
- TableMetaCache.getTableMeta()是实现Cache功能的元数据获取功能。
public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor {
protected String buildLockKey(TableRecords rowsIncludingPK) {
if (rowsIncludingPK.size() == 0) {
return null;
}
StringBuilder sb = new StringBuilder();
sb.append(rowsIncludingPK.getTableMeta().getTableName());
sb.append(":");
boolean flag = false;
for (Field field : rowsIncludingPK.pkRows()) {
if (flag) {
sb.append(",");
} else {
flag = true;
}
sb.append(field.getValue());
}
return sb.toString();
}
}
说明:
- buildLockKey实现锁定key的生成逻辑,表名+主键列名的拼接。
- buildLockKey的实现逻辑:tableName:pkName1,pkName2.
public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor {
protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
if (beforeImage.getRows().size() == 0 && afterImage.getRows().size() == 0) {
return;
}
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
String lockKeys = buildLockKey(lockKeyRecords);
connectionProxy.appendLockKey(lockKeys);
SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
connectionProxy.appendUndoLog(sqlUndoLog);
}
protected SQLUndoLog buildUndoItem(TableRecords beforeImage, TableRecords afterImage) {
SQLType sqlType = sqlRecognizer.getSQLType();
String tableName = sqlRecognizer.getTableName();
SQLUndoLog sqlUndoLog = new SQLUndoLog();
sqlUndoLog.setSqlType(sqlType);
sqlUndoLog.setTableName(tableName);
sqlUndoLog.setBeforeImage(beforeImage);
sqlUndoLog.setAfterImage(afterImage);
return sqlUndoLog;
}
}
说明:
- prepareUndoLog负责生产待回滚记录的日志,按照执行前镜像和执行后镜像对比生成。
- 回滚日志的存储对象是类SQLUndoLog。
public class TableMetaCache {
public static TableMeta getTableMeta(DataSourceProxy dataSourceProxy, String tableName) {
return getTableMeta(dataSourceProxy.getTargetDataSource(), tableName);
}
public static TableMeta getTableMeta(final DruidDataSource druidDataSource, final String tableName) {
String dataSourceKey = druidDataSource.getUrl();
TableMeta tmeta = null;
final String key = dataSourceKey + "." + tableName;
try {
tmeta = TABLE_META_CACHE.get(key, new Callable<TableMeta>() {
@Override
public TableMeta call() throws Exception {
return fetchSchema(druidDataSource, tableName);
}
});
} catch (ExecutionException e) {
}
if (tmeta == null) {
try {
tmeta = fetchSchema(druidDataSource, tableName);
} catch (SQLException e) {
}
}
return tmeta;
}
private static TableMeta fetchSchema(DruidDataSource druidDataSource, String tableName) throws SQLException {
return fetchSchemeInDefaultWay(druidDataSource, tableName);
}
private static TableMeta fetchSchemeInDefaultWay(DruidDataSource druidDataSource, String tableName)
throws SQLException {
Connection conn = null;
java.sql.Statement stmt = null;
java.sql.ResultSet rs = null;
try {
conn = druidDataSource.getConnection();
stmt = conn.createStatement();
StringBuffer sb = new StringBuffer("SELECT * FROM " + tableName + " LIMIT 1");
rs = stmt.executeQuery(sb.toString());
ResultSetMetaData rsmd = rs.getMetaData();
DatabaseMetaData dbmd = conn.getMetaData();
return resultSetMetaToSchema(rsmd, dbmd, tableName);
} catch (Exception e) {
if (e instanceof SQLException) {
throw ((SQLException)e);
}
throw new SQLException("Failed to fetch schema of " + tableName, e);
} finally {
}
}
private static TableMeta resultSetMetaToSchema(ResultSetMetaData rsmd, DatabaseMetaData dbmd, String tableName)
throws SQLException {
String schemaName = rsmd.getSchemaName(1);
String catalogName = rsmd.getCatalogName(1);
TableMeta tm = new TableMeta();
tm.setTableName(tableName);
java.sql.ResultSet rs1 = dbmd.getColumns(catalogName, schemaName, tableName, "%");
while (rs1.next()) {
ColumnMeta col = new ColumnMeta();
col.setTableCat(rs1.getString("TABLE_CAT"));
col.setTableSchemaName(rs1.getString("TABLE_SCHEM"));
col.setTableName(rs1.getString("TABLE_NAME"));
col.setColumnName(rs1.getString("COLUMN_NAME"));
col.setDataType(rs1.getInt("DATA_TYPE"));
col.setDataTypeName(rs1.getString("TYPE_NAME"));
col.setColumnSize(rs1.getInt("COLUMN_SIZE"));
col.setDecimalDigits(rs1.getInt("DECIMAL_DIGITS"));
col.setNumPrecRadix(rs1.getInt("NUM_PREC_RADIX"));
col.setNullAble(rs1.getInt("NULLABLE"));
col.setRemarks(rs1.getString("REMARKS"));
col.setColumnDef(rs1.getString("COLUMN_DEF"));
col.setSqlDataType(rs1.getInt("SQL_DATA_TYPE"));
col.setSqlDatetimeSub(rs1.getInt("SQL_DATETIME_SUB"));
col.setCharOctetLength(rs1.getInt("CHAR_OCTET_LENGTH"));
col.setOrdinalPosition(rs1.getInt("ORDINAL_POSITION"));
col.setIsNullAble(rs1.getString("IS_NULLABLE"));
col.setIsAutoincrement(rs1.getString("IS_AUTOINCREMENT"));
tm.getAllColumns().put(col.getColumnName(), col);
}
java.sql.ResultSet rs2 = dbmd.getIndexInfo(catalogName, schemaName, tableName, false, true);
String indexName = "";
while (rs2.next()) {
indexName = rs2.getString("INDEX_NAME");
String colName = rs2.getString("COLUMN_NAME");
ColumnMeta col = tm.getAllColumns().get(colName);
if (tm.getAllIndexes().containsKey(indexName)) {
IndexMeta index = tm.getAllIndexes().get(indexName);
index.getValues().add(col);
} else {
IndexMeta index = new IndexMeta();
index.setIndexName(indexName);
index.setNonUnique(rs2.getBoolean("NON_UNIQUE"));
index.setIndexQualifier(rs2.getString("INDEX_QUALIFIER"));
index.setIndexName(rs2.getString("INDEX_NAME"));
index.setType(rs2.getShort("TYPE"));
index.setOrdinalPosition(rs2.getShort("ORDINAL_POSITION"));
index.setAscOrDesc(rs2.getString("ASC_OR_DESC"));
index.setCardinality(rs2.getInt("CARDINALITY"));
index.getValues().add(col);
if ("PRIMARY".equalsIgnoreCase(indexName) || indexName.equalsIgnoreCase(
rsmd.getTableName(1) + "_pkey")) {
index.setIndextype(IndexType.PRIMARY);
} else if (index.isNonUnique() == false) {
index.setIndextype(IndexType.Unique);
} else {
index.setIndextype(IndexType.Normal);
}
tm.getAllIndexes().put(indexName, index);
}
}
IndexMeta index = tm.getAllIndexes().get(indexName);
if (index.getIndextype().value() != 0) {
if ("H2 JDBC Driver".equals(dbmd.getDriverName())) {
if (indexName.length() > 11 && "PRIMARY_KEY".equalsIgnoreCase(indexName.substring(0, 11))) {
index.setIndextype(IndexType.PRIMARY);
}
} else if (dbmd.getDriverName() != null && dbmd.getDriverName().toLowerCase().indexOf("postgresql") >= 0) {
if ((tableName + "_pkey").equalsIgnoreCase(indexName)) {
index.setIndextype(IndexType.PRIMARY);
}
}
}
return tm;
}
}
说明:
- TableMetaCache提供获取表数据的功能,包括缓存功能。
- 元数据功能包括数据库本身元数据和返回结果元数据。
- 通过DataSource->Connection->Statement的逻辑执行SQL语句获取ResultSetMetaData结果元数据。
- 通过Connection获取DatabaseMetaData dbmd数据库元数据