使用Streamsets将Oracle数据实时同步到MySQL中

相关环境:

Oracle 11g:11.2.0.1.0 

MySQL:8.0.22

前期准备:

1、打开Oracle的logminer

a.在SQL Shell中,以具有DBA的用户身份登录数据库:

sqlplus /nolog;

conn / as sysdba;

b.检查数据库日志记录模式:

select log_mode from v$database;

如果查询结果是ARCHIVELOG,则以下操作都不用执行,如果命令返回NOARCHIVELOG,请继续执行以下步骤:

c.关闭数据库:

shutdown immediate;

d.启动并挂载数据库:

startup mount;

f.启用存档,打开数据库,并使其可写:

alter database archivelog;
alter database open read write;

使用Streamsets将Oracle数据实时同步到MySQL中

这里要注意,如果是Linux中通过docker容器安装的Oracle要进入容器中进行修改,外部直接sqlplus连接的话会报错。

docker exec -it oracle11g bash

2、启用补充日志

a.要验证是否为数据库启用了补充日志记录,请在SQL Shell中运行以下命令:

SELECT supplemental_log_data_min, supplemental_log_data_pk, supplemental_log_data_all FROM v$database;

使用Streamsets将Oracle数据实时同步到MySQL中

b.状态都为no则执行以下指令:

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
ALTER SYSTEM SWITCH LOGFILE;

使用Streamsets将Oracle数据实时同步到MySQL中

3、创建新Oracle用户并授权

开启补充日志之后为创建新的用户帐户用于Oracle CDC客户端源,并为用户 streamsets 授权,

CREATE USER streamsets IDENTIFIED BY streamsets;
GRANT create session, alter session, execute_catalog_role, select any dictionary, select any transaction, select any table to streamsets;
GRANT select on GV_$DATABASE to streamsets;
GRANT select on V_$ARCHIVED_LOG to streamsets;
GRANT select on V_$LOGMNR_CONTENTS to streamsets;
GRANT select on v_$logmnr_parameters to streamsets;

使用Streamsets将Oracle数据实时同步到MySQL中

后面测试创建表后,插入数据报错:

ORA-01950: no privileges on tablespace 'USERS'

因为没有Resource权限,所以需要执行:

GRANT Resource to streamsets;

GRANT select on <db>.<table> TO <user name>;

Streamsets管道配置:

1、在Streamsets中创建新的管道

a.通过 192.168.XXX.XXX:18630 打开Streamsets

默认账号密码为admin - admin

b.登录后在左边创建新的管道

使用Streamsets将Oracle数据实时同步到MySQL中

2、编辑新建的管道

因为业务是将Oracle的数据实时同步到MySQL中,所以在Origins中选择Oracle CDC Client,在目的地Destinations中选择JDBC Producer。然后将两者相连:

使用Streamsets将Oracle数据实时同步到MySQL中

3、导入所需的jdbc驱动包

在StreamSets界面中,点击右上角的Package Manager,然后右边的框拉到最下面选择External Libraries导入外部的jar包:

使用Streamsets将Oracle数据实时同步到MySQL中

如果导入报错时,解决步骤可以看这里:导入外部库

4、配置 Oracle CDC Client

注意:
A.这里的要注意一下 Oracle CDC Client支持处理以下数据库中的数据:
    Oracle 11g,12c,18c和19c
    Oracle Real Application Clusters(RAC)12c,18c和19c
B.需要在Oracle终端启动LogMiner,总结数据库活动,使用这些日志来生成记录。LogMiner要求数据库是打开的,可写的,并且在启用归档的情况下处于ARCHIVELOG模式。

a.首先在Oracle 11g中创建一个学生表

命名为student:

使用Streamsets将Oracle数据实时同步到MySQL中

然后插入一些测试数据:

使用Streamsets将Oracle数据实时同步到MySQL中

b.编辑Oracle CDC中的内容

使用Streamsets将Oracle数据实时同步到MySQL中

Tables 配置为ORACLE DATABASE中的定义的测试表STREAMSETS.student,注意ORACEL大小写敏感。

Initial Change配置默认是 From Latest Change,实时数据同步只需要捕获变化数据。

Operation配置为Hbase对应支持的CRUD操作类型,匹配ORACLE Database中实时变化数据的增删改操作事务操作。

使用Streamsets将Oracle数据实时同步到MySQL中

这里改一下时区。

c.编辑JDBC配置内容

JDBC Connection String中输入jdbc的连接信息:

jdbc:oracle:thin:@192.168.105.77:helowinXDB

使用Streamsets将Oracle数据实时同步到MySQL中

d.编辑Credentials

填写数据库的用户名与密码:

使用Streamsets将Oracle数据实时同步到MySQL中

5、配置 JDBC Producer 组件

a.编辑General Property

Required Fields:必填字段,是必须存在于记录中以允许其进入处理阶段的字段。当记录不包含所有必填字段时,将根据为管道配置的错误处理对其进行处理。您可以为任何处理器,执行程序和大多数目标阶段定义必需的字段。

Preconditions:必须评估为TRUE以允许记录进入处理阶段的条件。单击"+"以创建其他前提条件。根据为阶段配置的错误处理,处理不满足所有前提条件的记录。

On Record Error:阶段的错误记录处理:

  •     Discard - 丢弃记录。
  •     Send to Error - 将记录发送到管道以进行错误处理。
  •     Stop Pipeline- 停止管道。

b.编辑JDBC

使用Streamsets将Oracle数据实时同步到MySQL中

JDBC Connection String: 用于连接数据库的连接字符串,各数据库的连接配置使用以下格式

  •       Mysql: jdbc:mysql://xxxxxxxxxx:3306/数据库名
  •       Oracle: jdbc:oracle:thin:@xxxxxxxxxx:1521:服务名
  •       PostgreSQL- jdbc:postgresql://<host>:<port>/<database_name>
  •       SQL server: jdbc:sqlserver://xxxxxxxxxx:1433;databaseName=xxx

Schema Name :要使用的可选数据库或模式名称。在数据库需要完全限定的表名时使用。提示: 默认情况下,Oracle会对模式,表和列名称使用全部大写。仅当名称周围使用引号创建架构,表或列时,名称可以是低位或大小写。

Table Name :要使用的数据库表名称。使用数据库所需的表名格式。提示: 默认情况下,Oracle会对模式,表和列名称使用全部大写。仅当名称周围使用引号创建架构,表或列时,名称可以是低位或大小写。

Field to Column Mapping:用于覆盖默认字段到列映射。默认情况下,字段将写入同名的列。覆盖映射时,可以定义参数化值,以便在将字符值写入列之前将SQL函数应用于字段值。例如,要将字段值转换为整数,请为参数化值输入以下内容:

CAST(? AS INTEGER)

使用Streamsets将Oracle数据实时同步到MySQL中

Enclose Object Names:写入数据库时​​,将数据库或模式名称,表名称和列名称括在引号中。允许使用区分大小写的名称或带有特殊字符的名称。未启用时,目标使用的JDBC驱动程序将确定名称的提交方式。Oracle JDBC驱动程序默认将名称提交为全部大写。此外,Oracle默认使用模式,表和列名称的全部大写。仅当名称周围使用引号创建架构,表或列时,名称可以是低位或大小写。

Change Log Format:变更捕获数据的格式。处理更改捕获数据时使用。

Default Operation:如果未设置sdc.operation.type记录标头属性,则执行的默认增删改操作。

Unsupported Operation Handling :不支持在sdc.operation.type记录标头属性中定义的CRUD操作类型时要采取的操作:

  • Discard - 丢弃记录。
  • Send to Error - 将记录发送到管道以进行错误处理。
  • Use Default Operation - 使用默认操作将记录写入目标系统。

Use Multi-Row Operation:确定阶段如何处理记录。选择以一次启用插入和删除多个记录。在启用此选项之前,请验证数据库是否支持阶段使用的多行SQL语句。默认情况下,该阶段执行单行操作。

Max Cache Size Per Batch (Entries):定义多行插入的预准备语句中允许的参数数量。使用-1可禁用参数限制。默认值为-1。

Rollback Batch On Error:当批次中发生错误时,回滚整个批次。

Additional JDBC Configuration Propertie:要使用的其他JDBC配置属性。要添加属性,请单击“ + ”并定义JDBC属性名称和值,使用JDBC所期望的属性名称和值。

c.编辑Credentials

填写数据库的用户名与密码:

使用Streamsets将Oracle数据实时同步到MySQL中
d.如果jdbc版本低于4.0,则需要配置Legacy Drivers

JDBC Driver Class Name:JDBC驱动程序的类名。早于版本4.0的JDBC版本必须填写。
Connection Health Test Query :可选查询,用于测试连接的运行状况。仅当JDBC版本低于4.0时建议使用。

e.编辑Advanced

Maximum Pool Size:要创建的最大连接数,默认值为1,建议值为1。

Minimum Idle Connections:要创建和维护的最小连接数。要定义固定连接池,请将其设置为与“Maximum Pool Size”相同的值,默认值为1。

Connection Timeout (Seconds):等待连接的最长时间。在表达式中使用时间常量来定义时间增量,默认值为30秒。

Idle Timeout (Seconds):允许连接空闲的最长时间。在表达式中使用时间常量来定义时间增量,使用0可以避免删除任何空闲连接,默认值为10分钟.

Max Connection Lifetime (Seconds):连接的最长寿命。在表达式中使用时间常量来定义时间增量,使用0设置无最大生命周期,默认值为30分钟

Transaction Isolation:用于连接数据库的事务隔离级别。默认值是为数据库设置的默认事务隔离级别。您可以通过将级别设置为以下任何一项来覆盖数据库默认值:

  • 读取已提交
  • 读取未提交
  • 可重复阅读

序列化
Init Query:在该组件第一次连接到数据库之后立即执行的SQL查询

使用Streamsets将Oracle数据实时同步到MySQL中

6、启动管道流:

使用Streamsets将Oracle数据实时同步到MySQL中

问题解决:

报错一:

启动时候报错了 OraclORA-12505, TNS:listener does not currently know of SID given in connect descriptor

jdbc连接数据库的时候,需要使用数据库的sid_name,而不是数据库的services_name.而使用plsql连接数据库的时候,只需要数据库的services_name即可,所以修改连接字符串中的services_name 为sid_name.
查询sid_name的语句:

select INSTANCE_NAME from v$instance;

使用Streamsets将Oracle数据实时同步到MySQL中

在JDBC中修改即可:

使用Streamsets将Oracle数据实时同步到MySQL中

报错二:

我Oracle数据修改以后,并没有在MySQL数据库中同步到数据,然后在错误中可以看到错误信息:

使用Streamsets将Oracle数据实时同步到MySQL中

具体的报错内容如下:

使用Streamsets将Oracle数据实时同步到MySQL中

JDBC_405 - Error while generating records: java.util.concurrent.ExecutionException: com.streamsets.pipeline.api.StageException: JDBC_54 - Column: 'XM' does not exist in table: 'Schema = 'STREAMSETS', Table = 'STUDENT''. This is likely due to a DDL being performed on this table
java.util.concurrent.ExecutionException: com.streamsets.pipeline.api.StageException: JDBC_54 - Column: 'XM' does not exist in table: 'Schema = 'STREAMSETS', Table = 'STUDENT''. This is likely due to a DDL being performed on this table
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.addRecordsToQueue(OracleCDCSource.java:1151)
    at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.generateRecords(OracleCDCSource.java:742)
    at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.lambda$startGeneratorThread$5(OracleCDCSource.java:463)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.streamsets.pipeline.api.StageException: JDBC_54 - Column: 'XM' does not exist in table: 'Schema = 'STREAMSETS', Table = 'STUDENT''. This is likely due to a DDL being performed on this table
    at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.objectToField(OracleCDCSource.java:1926)
    at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.generateRecord(OracleCDCSource.java:930)
    at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.lambda$addRecordsToQueue$11(OracleCDCSource.java:1135)
    ... 4 more

将Oracle与MySQL的数据表中的字段都改为大写,则成功执行:

使用Streamsets将Oracle数据实时同步到MySQL中

在Oracle中新增一条数据后:

使用Streamsets将Oracle数据实时同步到MySQL中

MySQL中实时新增一条数据:

使用Streamsets将Oracle数据实时同步到MySQL中

 

 

 

上一篇:SDC设计约束(4)——其他SDC命令


下一篇:硬盘故障恢复实践