目录
前言
第一步下载源码
第二步修改源码
1、Oraclewriter
2、WriterUtil
2.1、修改getWriteTemplate方法
2.2、新增onMergeIntoDoString与getStrings方法
3、CommonRdbmsWriter
3.1、修改startWriteWithConnection
3.2、修改doBatchInsert
3.3、修改fillPreparedStatement
第三步打包
第四步脚本修改
修改后jar包地址
前言
目前 DataX 更新到 datax_v202309 版本,仍然不支持 Oracle 写入时的 update 模式,要通过 DataX 实现该功能只能修改源码。
原理:Oracle 不支持类似 MySQL 的 REPLACE INTO 和 INSERT … ON DUPLICATE KEY UPDATE 语法,所以 oraclewriter 只支持 insert 配置项。要实现此功能,需要利用 Oracle 的 MERGE 语句,先来看下 MERGE 语法。
MERGE INTO [target-table] A USING [source-table sql] B
ON([conditional expression] and [...]...)
WHEN MATCHED THEN
[UPDATE sql]
WHEN NOT MATCHED THEN
[INSERT sql]
第一步下载源码
地址:datax_v202309。
第二步修改源码
一共修改3个文件
1、Oraclewriter
找到 OracleWriter 中校验 writeMode 的代码(即只要配置了 writeMode 就抛出异常的那一段),直接注释掉就行。
2、WriterUtil
2.1、修改getWriteTemplate方法
/**
 * Builds the SQL template used to write one record.
 *
 * <p>The returned string still contains a {@code %s} placeholder for the table name.
 * Depending on the arguments it is one of:
 * <ul>
 *   <li>{@code <writeMode> INTO %s (...) VALUES(...)} when neither {@code forceUseUpdate}
 *       is set nor {@code writeMode} starts with "update";</li>
 *   <li>for MySql / Tddl: {@code INSERT INTO %s (...) VALUES(...)} followed by whatever
 *       {@code onDuplicateKeyUpdateString(columnHolders)} returns;</li>
 *   <li>for Oracle: the MERGE prefix returned by {@code onMergeIntoDoString} followed by
 *       an {@code INSERT (...) VALUES(...)} clause.</li>
 * </ul>
 *
 * @param columnHolders  column names, joined with "," into the column list
 * @param valueHolders   value placeholders, joined with "," into the VALUES list
 * @param writeMode      configured write mode; after trimming and lower-casing it must start
 *                       with "insert", "replace" or "update"
 * @param dataBaseType   target database type
 * @param forceUseUpdate when true the update-style template is produced regardless of
 *                       {@code writeMode}
 * @return the SQL template
 * @throws DataXException if {@code writeMode} is not one of the accepted modes, or if an
 *                        update-style template is requested for a database other than
 *                        MySql, Tddl or Oracle
 */
public static String getWriteTemplate(List<String> columnHolders, List<String> valueHolders, String writeMode, DataBaseType dataBaseType, boolean forceUseUpdate) {
    // Normalize once, with a fixed locale so that e.g. "INSERT" is still recognised when the
    // JVM default locale has special casing rules for 'I'.
    String normalizedMode = writeMode.trim().toLowerCase(java.util.Locale.ROOT);
    boolean update = normalizedMode.startsWith("update");
    boolean isWriteModeLegal = normalizedMode.startsWith("insert")
            || normalizedMode.startsWith("replace")
            || update;
    if (!isWriteModeLegal) {
        throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE,
                String.format("您所配置的 writeMode:%s 错误. 因为DataX 目前仅支持replace,update 或 insert 方式. 请检查您的配置并作出修改.", writeMode));
    }

    String joinedColumns = StringUtils.join(columnHolders, ",");
    String joinedValues = StringUtils.join(valueHolders, ",");

    if (!forceUseUpdate && !update) {
        // Plain insert/replace: the configured writeMode is used verbatim as the SQL verb.
        // (The former "update -> replace" fallback lived in this branch but could never
        // run, because this branch is only reached when writeMode is not an update.)
        return new StringBuilder().append(writeMode)
                .append(" INTO %s (").append(joinedColumns)
                .append(") VALUES(").append(joinedValues)
                .append(")").toString();
    }

    // Update-style write on MySql / Tddl.
    if (dataBaseType == DataBaseType.MySql || dataBaseType == DataBaseType.Tddl) {
        return new StringBuilder()
                .append("INSERT INTO %s (").append(joinedColumns)
                .append(") VALUES(").append(joinedValues)
                .append(")")
                .append(onDuplicateKeyUpdateString(columnHolders))
                .toString();
    }

    // Update-style write on Oracle: MERGE prefix + INSERT clause for the NOT MATCHED case.
    if (dataBaseType == DataBaseType.Oracle) {
        return onMergeIntoDoString(writeMode, columnHolders, valueHolders) + "INSERT (" +
                joinedColumns +
                ") VALUES(" + joinedValues + ")";
    }

    throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE,
            String.format("当前数据库不支持 writeMode:%s 模式.", writeMode));
}
2.2、新增onMergeIntoDoString与getStrings方法
代码作用:对Oracle进行update的MERGE拼接
/**
 * Builds the leading part of an Oracle {@code MERGE} statement for an update-style write.
 *
 * <p>The key columns are obtained from {@code getStrings(merge)}. Columns of
 * {@code columnHolders} that are key columns go into the {@code SELECT ? AS col ... FROM DUAL}
 * source row and the {@code ON (...)} condition; all remaining columns go into the
 * {@code UPDATE SET col = ?} list. The result ends with {@code " WHEN NOT MATCHED THEN "} so
 * the caller can append the INSERT clause, and it still contains a {@code %s} placeholder for
 * the table name.
 *
 * <p>Placeholder order in the returned text is: key columns first, then non-key columns, each
 * group in {@code columnHolders} order. Key matching is an exact (case-sensitive) string
 * comparison.
 *
 * @param merge         the configured write mode, e.g. {@code update(f_year,f_code)}
 * @param columnHolders column names to write
 * @param valueHolders  not used by this method; kept so the signature stays unchanged
 * @return the MERGE prefix
 * @throws IllegalArgumentException if none of {@code columnHolders} is a key column, because
 *                                  the ON condition would be empty
 */
public static String onMergeIntoDoString(String merge, List<String> columnHolders, List<String> valueHolders) {
    // Wrap the key array once instead of once per column.
    List<String> keyColumns = Arrays.asList(getStrings(merge));

    StringBuilder selectList = new StringBuilder();
    StringBuilder onCondition = new StringBuilder();
    StringBuilder updateSet = new StringBuilder();
    for (String columnHolder : columnHolders) {
        if (keyColumns.contains(columnHolder)) {
            if (selectList.length() > 0) {
                selectList.append(",");
                onCondition.append(" AND ");
            }
            selectList.append("?").append(" AS ").append(columnHolder);
            onCondition.append("TMP.").append(columnHolder)
                    .append(" = ")
                    .append("A.").append(columnHolder);
        } else {
            if (updateSet.length() > 0) {
                updateSet.append(",");
            }
            updateSet.append(columnHolder).append(" = ").append("?");
        }
    }

    if (onCondition.length() == 0) {
        // Without any key column the statement would contain "ON ( )", which is not valid SQL.
        throw new IllegalArgumentException(String.format(
                "您所配置的 writeMode:%s 错误. 其中指定的主键列均不在写入列 %s 中, 无法生成 MERGE 语句的 ON 条件. 请检查您的配置并作出修改.",
                merge, columnHolders));
    }

    StringBuilder sb = new StringBuilder();
    sb.append("MERGE INTO %s A USING ( SELECT ");
    sb.append(selectList);
    sb.append(" FROM DUAL ) TMP ON (");
    sb.append(onCondition);
    sb.append(" )");
    if (updateSet.length() > 0) {
        sb.append(" WHEN MATCHED THEN UPDATE SET ");
        sb.append(updateSet);
    }
    // When every column is a key column there is nothing to update; the WHEN MATCHED clause
    // is left out entirely instead of emitting an empty "UPDATE SET" list.
    sb.append(" WHEN NOT MATCHED THEN ");
    return sb.toString();
}
/**
 * Extracts the key column names from an update-style write mode such as
 * {@code update(f_year,f_code)}.
 *
 * <p>Only a leading {@code update} keyword (matched case-insensitively, after trimming) is
 * stripped, so column names that themselves contain "update" (e.g. {@code update_time}) are
 * preserved. Parentheses and spaces are removed and the remainder is split on ",".
 *
 * @param merge the configured write mode
 * @return the comma-separated parts of the write mode; for an input without a column list
 *         this is a single-element array holding whatever text remains
 */
public static String[] getStrings(String merge) {
    final String prefix = "update";
    String spec = merge.trim();
    if (spec.regionMatches(true, 0, prefix, 0, prefix.length())) {
        spec = spec.substring(prefix.length());
    }
    spec = spec.replace("(", "")
            .replace(")", "")
            .replace(" ", "");
    return spec.split(",");
}
3、CommonRdbmsWriter
3.1、修改startWriteWithConnection
// 替换原先的代码块
/**
 * Reads records from {@code recordReceiver} and writes them in batches over
 * {@code connection}.
 *
 * <p>For an Oracle target whose write mode starts with "update", the column list used to
 * fetch column metadata is: key columns, then non-key columns, then all configured columns
 * again. For every other case it is just the configured columns. A batch is flushed through
 * {@code doBatchInsert} once it reaches {@code batchSize} records or {@code batchByteSize}
 * bytes; any remainder is flushed after the reader is exhausted. The connection is closed
 * in the {@code finally} block.
 *
 * @param recordReceiver      source of records
 * @param taskPluginCollector collector stored on this task
 * @param connection          connection to write to; closed before this method returns
 * @throws DataXException wrapping any exception raised while reading or writing, including
 *                        a mismatch between a record's column count and {@code columnNumber}
 */
public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) {
    this.taskPluginCollector = taskPluginCollector;

    List<String> metaColumns = new LinkedList<>();
    boolean oracleUpdate = this.dataBaseType == DataBaseType.Oracle
            && writeMode.trim().toLowerCase().startsWith("update");
    if (oracleUpdate) {
        String[] keyArray = WriterUtil.getStrings(this.writeMode);
        // Key columns first ...
        for (String column : this.columns) {
            if (Arrays.asList(keyArray).contains(column)) {
                metaColumns.add(column);
            }
        }
        // ... then everything that is not a key column.
        for (String column : this.columns) {
            if (!Arrays.asList(keyArray).contains(column)) {
                metaColumns.add(column);
            }
        }
    }
    metaColumns.addAll(this.columns);

    // Column metadata of the target table, used to convert values to the target column types.
    this.resultSetMetaData = DBUtil.getColumnMetaData(connection, this.table, StringUtils.join(metaColumns, ","));

    // Build the SQL statement used for writing.
    calcWriteRecordSql();

    List<Record> pending = new ArrayList<Record>(this.batchSize);
    int pendingBytes = 0;
    try {
        for (Record record = recordReceiver.getFromReader();
             record != null;
             record = recordReceiver.getFromReader()) {
            if (record.getColumnNumber() != this.columnNumber) {
                // The reader delivered a different number of columns than the writer expects.
                throw DataXException
                        .asDataXException(
                                DBUtilErrorCode.CONF_ERROR,
                                String.format(
                                        "列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",
                                        record.getColumnNumber(),
                                        this.columnNumber));
            }

            pending.add(record);
            pendingBytes += record.getMemorySize();

            boolean batchFull = pending.size() >= batchSize || pendingBytes >= batchByteSize;
            if (batchFull) {
                doBatchInsert(connection, pending);
                pending.clear();
                pendingBytes = 0;
            }
        }

        // Flush whatever is left after the reader is exhausted.
        if (!pending.isEmpty()) {
            doBatchInsert(connection, pending);
        }
    } catch (Exception e) {
        throw DataXException.asDataXException(
                DBUtilErrorCode.WRITE_DATA_ERROR, e);
    } finally {
        pending.clear();
        DBUtil.closeDBResources(null, null, connection);
    }
}
3.2、修改doBatchInsert
/**
 * Writes {@code buffer} as a single JDBC batch and commits it.
 *
 * <p>For an Oracle target whose write mode starts with "update" (the same condition
 * {@code startWriteWithConnection} uses when it builds the column metadata), every record is
 * first rewritten in place so that its columns follow the placeholder order of the MERGE
 * statement: key columns, non-key columns, then all columns in configured order.
 *
 * <p>The rewrite is done for the whole buffer before any JDBC call, so that when a
 * {@link SQLException} triggers the row-by-row fallback, every record handed to
 * {@code doOneInsert} has the same layout.
 * NOTE(review): {@code doOneInsert} is not visible here; this assumes it binds the records
 * as they are — confirm.
 *
 * @param connection connection to write to; auto-commit is switched off
 * @param buffer     records to write; for Oracle update mode they are modified in place
 * @throws SQLException   if rolling back after a failed batch fails, or as declared by
 *                        {@code doOneInsert}
 * @throws DataXException wrapping any non-SQL exception raised while writing
 */
protected void doBatchInsert(Connection connection, List<Record> buffer)
        throws SQLException
{
    PreparedStatement preparedStatement = null;
    try {
        if (this.dataBaseType == DataBaseType.Oracle
                && this.writeMode.trim().toLowerCase().startsWith("update")) {
            List<String> keyColumns = Arrays.asList(WriterUtil.getStrings(this.writeMode));
            for (Record record : buffer) {
                reorderForOracleMerge(record, keyColumns);
            }
        }

        connection.setAutoCommit(false);
        preparedStatement = connection
                .prepareStatement(this.writeRecordSql);
        for (Record record : buffer) {
            preparedStatement = fillPreparedStatement(
                    preparedStatement, record);
            preparedStatement.addBatch();
        }
        preparedStatement.executeBatch();
        connection.commit();
    }
    catch (SQLException e) {
        LOG.warn("回滚此次写入, 采用每次写入一行方式提交. 因为: {}", e.getMessage());
        connection.rollback();
        doOneInsert(connection, buffer);
    }
    catch (Exception e) {
        throw DataXException.asDataXException(
                DBUtilErrorCode.WRITE_DATA_ERROR, e);
    }
    finally {
        DBUtil.closeDBResources(preparedStatement, null);
    }
}

/**
 * Rewrites {@code record} in place into the bind order of the Oracle MERGE statement:
 * values of key columns, values of non-key columns, then all values in configured order.
 * The record therefore ends up with twice as many columns as are configured.
 */
private void reorderForOracleMerge(Record record, List<String> keyColumns) {
    int columnCount = this.columns.size();
    // Collect every value before writing anything back, because setColumn overwrites slots
    // that are still needed as input.
    List<Column> bindOrder = new ArrayList<>(2 * columnCount);
    for (int j = 0; j < columnCount; j++) {
        if (keyColumns.contains(this.columns.get(j))) {
            bindOrder.add(record.getColumn(j));
        }
    }
    for (int j = 0; j < columnCount; j++) {
        if (!keyColumns.contains(this.columns.get(j))) {
            bindOrder.add(record.getColumn(j));
        }
    }
    for (int j = 0; j < columnCount; j++) {
        bindOrder.add(record.getColumn(j));
    }
    for (int j = 0; j < bindOrder.size(); j++) {
        record.setColumn(j, bindOrder.get(j));
    }
}
3.3、修改fillPreparedStatement
/**
 * Binds every column of {@code record} to {@code preparedStatement}.
 *
 * <p>The loop runs over the record's own column count. For each index the SQL type code is
 * taken from the middle element of {@code resultSetMetaData} and the type name from its
 * right element, and both are passed to {@code fillPreparedStatementColumnType} together
 * with the column value.
 *
 * @param preparedStatement statement to bind values on
 * @param record            record whose columns are bound, in index order
 * @return the statement returned by the last {@code fillPreparedStatementColumnType} call,
 *         or {@code preparedStatement} itself when the record has no columns
 * @throws SQLException as declared by {@code fillPreparedStatementColumnType}
 */
protected PreparedStatement fillPreparedStatement(PreparedStatement preparedStatement, Record record)
        throws SQLException
{
    PreparedStatement statement = preparedStatement;
    int index = 0;
    while (index < record.getColumnNumber()) {
        int sqlType = this.resultSetMetaData.getMiddle().get(index);
        String sqlTypeName = this.resultSetMetaData.getRight().get(index);
        statement = fillPreparedStatementColumnType(
                statement, index, sqlType, sqlTypeName, record.getColumn(index));
        index++;
    }
    return statement;
}
第三步打包
1、只需要在 IDEA 里面打包修改过的两个模块(oraclewriter 和 plugin-rdbms-util)就可以
2、打包成功后获取两个jar包
3、将包替换到datax的插件里面
将oraclewriter-0.0.1-SNAPSHOT.jar替换到datax\plugin\writer\oraclewriter
将plugin-rdbms-util-0.0.1-SNAPSHOT.jar替换到datax\plugin\writer\oraclewriter\libs
第四步脚本修改
{
"job": {
"setting": {
"speed": {
"byte": 1048576
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "${r_username}",
"password": "${r_password}",
"connection": [
{
"querySql": ["SELECT f_year,f_code,f_name,f_order FROM tableName"],
"jdbcUrl": ["${r_jdbcUrl}"]
}
]
}
},
"writer": {
"name": "oraclewriter",
"parameter": {
"writeMode": "update(f_year,f_code)",
"username": "${w_username}",
"password": "${w_password}",
"column": [
"f_year","f_code","f_name","f_order"
],
"session": [],
"preSql": [],
"connection": [
{
"jdbcUrl": "${w_jdbcUrl}",
"table": ["tableName"]
}
]
}
}
}
]
}
}
参数 "writeMode": "update(f_year,f_code)" 括号里面的 f_year,f_code 就是主键(用于匹配已有记录的列)。注意列名上不要加转义引号 \",
写成 update(\"f_year\",\"f_code\") 这样是拼不上 SQL 的,这个问题调试了好久才解决。
这时候运行就成功了
参考文章:
DataX 二次开发支持 Oracle 更新数据 https://blog.****.net/xch_yang/article/details/128250190?utm_medium=distribute.pc_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-0-128250190-blog-106881907.235%5Ev43%5Epc_blog_bottom_relevance_base8&spm=1001.2101.3001.4242.1&utm_relevant_index=3
Datax oracle 支持增量并且支持全量更新 https://blog.****.net/weixin_41250031/article/details/122615271?spm=1001.2101.3001.6650.5&utm_medium=distribute.pc_relevant.none-task-blog-2~default~CTRLIST~Rate-5-122615271-blog-129723622.235%5Ev43%5Epc_blog_bottom_relevance_base8&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2~default~CTRLIST~Rate-5-122615271-blog-129723622.235%5Ev43%5Epc_blog_bottom_relevance_base8&utm_relevant_index=7
修改后jar包地址
懒得修改可以直接下载两个jar替换到你们的datax对应目录。
https://download.****.net/download/qq_36802726/89046154