相关环境:
Oracle 11g:11.2.0.1.0
MySQL:8.0.22
前期准备:
1、打开Oracle的logminer
a.在SQL Shell中,以具有DBA的用户身份登录数据库:
sqlplus /nolog;
conn / as sysdba;
b.检查数据库日志记录模式:
select log_mode from v$database;
如果查询结果是ARCHIVELOG,则以下操作都不用执行,如果命令返回NOARCHIVELOG,请继续执行以下步骤:
c.关闭数据库:
shutdown immediate;
d.启动并挂载数据库:
startup mount;
f.启用存档,打开数据库,并使其可写:
alter database archivelog;
alter database open read write;
这里要注意,如果是Linux中通过docker容器安装的Oracle要进入容器中进行修改,外部直接sqlplus连接的话会报错。
docker exec -it oracle11g bash
2、启用补充日志
a.要验证是否为数据库启用了补充日志记录,请在SQL Shell中运行以下命令:
SELECT supplemental_log_data_min, supplemental_log_data_pk, supplemental_log_data_all FROM v$database;
b.状态都为no则执行以下指令:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
ALTER SYSTEM SWITCH LOGFILE;
3、创建新Oracle用户并授权
开启补充日志之后为创建新的用户帐户用于Oracle CDC客户端源,并为用户 streamsets 授权,
CREATE USER streamsets IDENTIFIED BY streamsets;
GRANT create session, alter session, execute_catalog_role, select any dictionary, select any transaction, select any table to streamsets;
GRANT select on GV_$DATABASE to streamsets;
GRANT select on V_$ARCHIVED_LOG to streamsets;
GRANT select on V_$LOGMNR_CONTENTS to streamsets;
GRANT select on v_$logmnr_parameters to streamsets;
后面测试创建表后,插入数据报错:
ORA-01950: no privileges on tablespace 'USERS'
因为没有Resource权限,所以需要执行:
GRANT Resource to streamsets;
GRANT select on <db>.<table> TO <user name>;
Streamsets管道配置:
1、在Streamsets中创建新的管道
a.通过 192.168.XXX.XXX:18630 打开Streamsets
默认账号密码为admin - admin
b.登录后在左边创建新的管道
2、编辑新建的管道
因为业务是将Oracle的数据实时同步到MySQL中,所以在Origins中选择Oracle CDC Client,在目的地Destinations中选择JDBC Producer。然后将两者相连:
3、导入所需的jdbc驱动包
在StreamSets界面中,点击右上角的Package Manager,然后右边的框拉到最下面选择External Libraries导入外部的jar包:
如果导入报错时,解决步骤可以看这里:导入外部库
4、配置 Oracle CDC Client
注意:
A.这里的要注意一下 Oracle CDC Client支持处理以下数据库中的数据:
Oracle 11g,12c,18c和19c
Oracle Real Application Clusters(RAC)12c,18c和19c
B.需要在Oracle终端启动LogMiner,总结数据库活动,使用这些日志来生成记录。LogMiner要求数据库是打开的,可写的,并且在启用归档的情况下处于ARCHIVELOG模式。
a.首先在Oracle 11g中创建一个学生表
命名为student:
然后插入一些测试数据:
b.编辑Oracle CDC中的内容
Tables 配置为ORACLE DATABASE中的定义的测试表STREAMSETS.student,注意ORACEL大小写敏感。
Initial Change配置默认是 From Latest Change,实时数据同步只需要捕获变化数据。
Operation配置为Hbase对应支持的CRUD操作类型,匹配ORACLE Database中实时变化数据的增删改操作事务操作。
这里改一下时区。
c.编辑JDBC配置内容
JDBC Connection String中输入jdbc的连接信息:
jdbc:oracle:thin:@192.168.105.77:helowinXDB
d.编辑Credentials
填写数据库的用户名与密码:
5、配置 JDBC Producer 组件
a.编辑General Property
Required Fields:必填字段,是必须存在于记录中以允许其进入处理阶段的字段。当记录不包含所有必填字段时,将根据为管道配置的错误处理对其进行处理。您可以为任何处理器,执行程序和大多数目标阶段定义必需的字段。
Preconditions:必须评估为TRUE以允许记录进入处理阶段的条件。单击"+"以创建其他前提条件。根据为阶段配置的错误处理,处理不满足所有前提条件的记录。
On Record Error:阶段的错误记录处理:
- Discard - 丢弃记录。
- Send to Error - 将记录发送到管道以进行错误处理。
- Stop Pipeline- 停止管道。
b.编辑JDBC
JDBC Connection String: 用于连接数据库的连接字符串,各数据库的连接配置使用以下格式
- Mysql: jdbc:mysql://xxxxxxxxxx:3306/数据库名
- Oracle: jdbc:oracle:thin:@xxxxxxxxxx:1521:服务名
- PostgreSQL- jdbc:postgresql://<host>:<port>/<database_name>
- SQL server: jdbc:sqlserver://xxxxxxxxxx:1433;databaseName=xxx
Schema Name :要使用的可选数据库或模式名称。在数据库需要完全限定的表名时使用。提示: 默认情况下,Oracle会对模式,表和列名称使用全部大写。仅当名称周围使用引号创建架构,表或列时,名称可以是低位或大小写。
Table Name :要使用的数据库表名称。使用数据库所需的表名格式。提示: 默认情况下,Oracle会对模式,表和列名称使用全部大写。仅当名称周围使用引号创建架构,表或列时,名称可以是低位或大小写。
Field to Column Mapping:用于覆盖默认字段到列映射。默认情况下,字段将写入同名的列。覆盖映射时,可以定义参数化值,以便在将字符值写入列之前将SQL函数应用于字段值。例如,要将字段值转换为整数,请为参数化值输入以下内容:
CAST(? AS INTEGER)
Enclose Object Names:写入数据库时,将数据库或模式名称,表名称和列名称括在引号中。允许使用区分大小写的名称或带有特殊字符的名称。未启用时,目标使用的JDBC驱动程序将确定名称的提交方式。Oracle JDBC驱动程序默认将名称提交为全部大写。此外,Oracle默认使用模式,表和列名称的全部大写。仅当名称周围使用引号创建架构,表或列时,名称可以是低位或大小写。
Change Log Format:变更捕获数据的格式。处理更改捕获数据时使用。
Default Operation:如果未设置sdc.operation.type记录标头属性,则执行的默认增删改操作。
Unsupported Operation Handling :不支持在sdc.operation.type记录标头属性中定义的CRUD操作类型时要采取的操作:
- Discard - 丢弃记录。
- Send to Error - 将记录发送到管道以进行错误处理。
- Use Default Operation - 使用默认操作将记录写入目标系统。
Use Multi-Row Operation:确定阶段如何处理记录。选择以一次启用插入和删除多个记录。在启用此选项之前,请验证数据库是否支持阶段使用的多行SQL语句。默认情况下,该阶段执行单行操作。
Max Cache Size Per Batch (Entries):定义多行插入的预准备语句中允许的参数数量。使用-1可禁用参数限制。默认值为-1。
Rollback Batch On Error:当批次中发生错误时,回滚整个批次。
Additional JDBC Configuration Propertie:要使用的其他JDBC配置属性。要添加属性,请单击“ + ”并定义JDBC属性名称和值,使用JDBC所期望的属性名称和值。
c.编辑Credentials
填写数据库的用户名与密码:
d.如果jdbc版本低于4.0,则需要配置Legacy Drivers
JDBC Driver Class Name:JDBC驱动程序的类名。早于版本4.0的JDBC版本必须填写。
Connection Health Test Query :可选查询,用于测试连接的运行状况。仅当JDBC版本低于4.0时建议使用。
e.编辑Advanced
Maximum Pool Size:要创建的最大连接数,默认值为1,建议值为1。
Minimum Idle Connections:要创建和维护的最小连接数。要定义固定连接池,请将其设置为与“Maximum Pool Size”相同的值,默认值为1。
Connection Timeout (Seconds):等待连接的最长时间。在表达式中使用时间常量来定义时间增量,默认值为30秒。
Idle Timeout (Seconds):允许连接空闲的最长时间。在表达式中使用时间常量来定义时间增量,使用0可以避免删除任何空闲连接,默认值为10分钟.
Max Connection Lifetime (Seconds):连接的最长寿命。在表达式中使用时间常量来定义时间增量,使用0设置无最大生命周期,默认值为30分钟
Transaction Isolation:用于连接数据库的事务隔离级别。默认值是为数据库设置的默认事务隔离级别。您可以通过将级别设置为以下任何一项来覆盖数据库默认值:
- 读取已提交
- 读取未提交
- 可重复阅读
序列化
Init Query:在该组件第一次连接到数据库之后立即执行的SQL查询
6、启动管道流:
问题解决:
报错一:
启动时候报错了 OraclORA-12505, TNS:listener does not currently know of SID given in connect descriptor
jdbc连接数据库的时候,需要使用数据库的sid_name,而不是数据库的services_name.而使用plsql连接数据库的时候,只需要数据库的services_name即可,所以修改连接字符串中的services_name 为sid_name.
查询sid_name的语句:
select INSTANCE_NAME from v$instance;
在JDBC中修改即可:
报错二:
我Oracle数据修改以后,并没有在MySQL数据库中同步到数据,然后在错误中可以看到错误信息:
具体的报错内容如下:
JDBC_405 - Error while generating records: java.util.concurrent.ExecutionException: com.streamsets.pipeline.api.StageException: JDBC_54 - Column: 'XM' does not exist in table: 'Schema = 'STREAMSETS', Table = 'STUDENT''. This is likely due to a DDL being performed on this table
java.util.concurrent.ExecutionException: com.streamsets.pipeline.api.StageException: JDBC_54 - Column: 'XM' does not exist in table: 'Schema = 'STREAMSETS', Table = 'STUDENT''. This is likely due to a DDL being performed on this table
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.addRecordsToQueue(OracleCDCSource.java:1151)
at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.generateRecords(OracleCDCSource.java:742)
at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.lambda$startGeneratorThread$5(OracleCDCSource.java:463)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.streamsets.pipeline.api.StageException: JDBC_54 - Column: 'XM' does not exist in table: 'Schema = 'STREAMSETS', Table = 'STUDENT''. This is likely due to a DDL being performed on this table
at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.objectToField(OracleCDCSource.java:1926)
at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.generateRecord(OracleCDCSource.java:930)
at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.lambda$addRecordsToQueue$11(OracleCDCSource.java:1135)
... 4 more
将Oracle与MySQL的数据表中的字段都改为大写,则成功执行:
在Oracle中新增一条数据后:
MySQL中实时新增一条数据: