基于OGG实现ORACLE同步至KAFKA实施方案

一、背景

本文基于Oracle OGG,介绍一种将Oracle数据库的数据实时同步到Kafka消息队列的方法。Kafka是一种高效的消息队列实现,通过订阅kafka的消息队列,下游系统可以实时获取在线Oracle系统的数据变更情况,实现从OLTP系统中获取数据变更,实时同步到下游业务系统。

二、环境介绍

1、组件版本

组件

版本

操作系统

IP地址

描述

源端Oracle

11.2.0.4.0

Red Hat 6.8

192.168.140.186

源端Oracle数据库

源端OGG

12.1.2.1.0

Red Hat 6.8

192.168.140.186

源端OGG,用于抽取源端Oracle数据变更,并将变更日志发送到目标端

目标端OGG

12.3.2.1.1

Red Hat 7.5

192.168.83.227、228、229

目标端OGG,接受源端发送的Oracle事务变更日志,并将变更推送到kafka消息队列

目标端kafka

2.12-1.1.0

Red Hat 7.5

192.168.83.227、228、229

消息队列,接收目标端OGG推送过来的数据

 

2、整体架构图

 基于OGG实现ORACLE同步至KAFKA实施方案

3、名词解释

1)   OGG Manager:OGG Manager用于配置和管理其它OGG组件,配置数据抽取、数据推送、数据复制,启动和停止相关组件,查看相关组件的运行情况。

2)   数据抽取(Extract):抽取源端数据库的变更(DML, DDL)。数据抽取主要分如下2种类型:

a.本地抽取:从本地数据库捕获增量变更数据,写入到本地Trail文件

b.初始数据抽取:从数据库表中导出全量数据,用于初次数据加载

3)   数据推送(Data Pump):Data Pump是一种特殊的数据抽取(Extract)类型,从本地Trail文件中读取数据,并通过网络将数据发送到目标端OGG

4)   4.Trail文件:数据抽取从源端数据库抓取到的事物变更信息会写入到Trail文件。

5)   数据接收(Collector):数据接收程序运行在目标端机器,用于接收Data Pump发送过来的Trail日志,并将数据写入到本地Trail文件。

6)   数据复制(Replicat):数据复制运行在目标端机器,从Trail文件读取数据变更,并将变更数据应用到目标端数据存储系统。本案例中,数据复制将数据推送到kafka消息队列。

7)   检查点(Checkpoint):检查点用于记录数据库事物变更。

 

二、操作步骤

1、源端安装配置(192.168.140.186)

(1)源端安装了ORACLE 版本11.2.0.4

     alter system set enable_goldengate_replication=TRUE;

     Alter database add supplemental log data;

     alter database force logging;

     Select supplemental_log_data_min from v$database;

     Create user ogg identified by "Qwer!234" Default tablespace users temporary tablespace temp  profile DEFAULT;

Grant connect to ogg;

grant resource to ogg;

grant dba to ogg;

grant alter session to ogg;

grant create session to ogg;

grant select any dictionary to ogg;

grant select any table to ogg;

grant insert any table to ogg;

grant delete any table to ogg;

grant update any table to ogg;

grant alter any table to ogg;

grant create table to ogg;

grant lock any table to ogg;

grant flashback any table to ogg;

Grant unlimited tablespace to ogg;

(2)源端安装了OGG版本12.1.2.1.0

     --创建用户

     $useradd -g oinstall gg

     --设置gg环境变量

      将oracle的环境变量拷贝至gg用户

     --创建目录

      Mkdir /oracle/gg

      Chown oracle:oinstall /oracle/gg

     --解压安装

     Unzip 121210_fbo_ggs_Linux_x64_shiphome.zip

     vi /oracle/gg/fbo_ggs_Linux_x64_shiphome/Disk1/response/oggcore.rsp

      INSTALL_OPTION=ORA11g

      SOFTWARE_LOCATION=/oracle/gg

      START_MANAGER=false

     cd /oracle/gg/fbo_ggs_Linux_x64_shiphome/Disk1

./runInstaller -responseFile /oracle/gg/fbo_ggs_Linux_x64_shiphome/Disk1/response/oggcore.rsp -silent  -ignoreSysPrereqs  -ignorePrereq –local

--创建子目录

./ggsci

create SUBDIRS

start mgr

 

(3)源端配置OGG

/*网络截取资料

GGSCI (zwjfdb3) 7> view param EZWJFBOR

EXTRACT EZWJFBOR

SETENV (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")

SETENV (ORACLE_HOME = "/u01/app/oracle/product/11.2.0/db_1")

SETENV (ORACLE_SID = "zwjfdb3")

--捕获 truncate 操作

gettruncates

--定义discardfile文件位置,如果处理中有记录出错会写入到此文件中

DISCARDFILE ./dirrpt/ezwjfbor.dsc, APPEND, MEGABYTES 1024

--动态解析表名

DYNAMICRESOLUTION

--获取更新之前数据

GETUPDATEBEFORES

--当抽取进程遇到一个没有使用的字段时只生成一个警告,进程会继续执行而不会被异常终止(abend)

DBOPTIONS  ALLOWUNUSEDCOLUMN

--每隔30分钟报告一次从程序开始到现在的抽取进程或者复制进程的事物记录数,并汇报进程的统计信息

REPORTCOUNT EVERY 30 MINUTES, RATE

--每隔3分钟检查一下大事务,超过2小时还没结束的进行报告

WARNLONGTRANS 2h,CHECKINTERVAL 3m

--不会从闪回日志中获取数据

FETCHOPTIONS NOUSESNAPSHOT

USERID xxxxxx,PASSWORD xxxxxx

EXTTRAIL ./dirdat/zb

TABLE xx.xx;

TABLE xx.xx;

 

#添加抽取进程

GGSCI (zwjfdb3) 11> add extract EZWJFBOR,TRANLOG, BEGIN NOW

EXTRACT added.

#定义trail文件

GGSCI (zwjfdb3) 12> ADD EXTTRAIL ./dirdat/zb,EXTRACT EZWJFBOR, MEGABYTES 200

EXTTRAIL added.

 

#pump extract进程

GGSCI (zwjfdb3) 8> view param PZWJFBOR

EXTRACT PZWJFBOR

SETENV (NLS_LANG = "AMERICAN_AMERICA.AL32UTF8")

PASSTHRU

DYNAMICRESOLUTION

RMTHOST xx.xx.xx.xx,MGRPOT 7809

RMTTRAIL ./dirdat/zb

TABLE xx.xx;

TABLE xx.xx;

#添加pump捕获组

GGSCI (zwjfdb3) 23> ADD EXTRACT PZWJFBOR,EXTTRAILSOURCE ./dirdat/zb

EXTRACT added.

#定义pump trail文件

GGSCI (zwjfdb3) 25> ADD RMTTRAIL ./dirdat/zb,EXTRACT PZWJFBOR, MEGABYTES 200

RMTTRAIL added.

*/

1、ORACLE创建测试表

create tablespace TBSDATA datafile size 1G;

create user linq identified by "Qwer!234" default tablespace TBSDATA;

grant connect to linq;

grant resource to linq;

create table linq.test_ogg(id number ,name varchar2(200),primary key(id));

    2、配置MGR

>edit params mgr

    PORT 7809

PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 2

   3、配置捕获进程
    #一定要记得同步之前要开启表的全补充日志
#alter table linq.test_ogg add supplemental log data (all) columns;

>dblogin userid ogg@ORCL,password Qwer!234

    >add extract E_LINQ,tranlog,begin now

>ADD EXTTRAIL ./dirdat/lq, EXTRACT E_LINQ

>add trandata linq.test_ogg

>info  trandata linq.test_ogg

>edit params E_LINQ

  extract E_LINQ      

--动态解析表名

dynamicresolution

SETENV (ORACLE_SID = "newdb")

SETENV (NLS_LANG = "AMERICAN_AMERICA.ZHS16GBK")

--SETENV (NLS_LANG = "american_america.AL32UTF8")

userid ogg@ORCL,password Qwer!234

--获取更新之前全部字段数据(如果注释,before显示空,需与UPDATERECORDFORMAT COMPACT一起配置)

GETUPDATEBEFORES

--获取删除前非主键字段(如果注释,before只会显示主键)

NOCOMPRESSDELETES

--获取更新前非主键字段(目前没发现用途)

NOCOMPRESSUPDATES

--获取更新的前后镜像信息(如果注释,before显示空,需与GETUPDATEBEFORES一起配置)

UPDATERECORDFORMAT COMPACT

--定义discardfile文件位置,如果处理中有记录出错会写入到此文件中

discardfile ./dirrpt/E_LINQ.dsc,purge,megabytes 20000

warnlongtrans 2h,checkinterval 3m

exttrail ./dirdat/lq

PURGEOLDEXTRACTS ./dirdat/lq*,usecheckpoints, minkeepdays 3

TRANLOGOPTIONS DBLOGREADER

numfiles 3000

allocfiles 200

table linq.test_ogg;

   4、配置传输进程

>add extract P_LINQ,exttrailsource ./dirdat/lq

> ADD EXTTRAIL ./dirdat/lq, EXTRACT P_LINQ

>add rmttrail ./dirdat/lq,extract P_LINQ

>edit params P_LINQ

extract P_LINQ

SETENV (NLS_LANG = "AMERICAN_AMERICA.ZHS16GBK")

passthru

dynamicresolution

rmthost 192.168.73.227,mgrport 7809 ,compress

rmttrail ./dirdat/lq

numfiles 3000

table linq.test_ogg;

 

5、配置define定义文件

> edit param test_ogg

defsfile ./dirdef/linq.test_ogg

userid ogg@ORCL,password Qwer!234

table linq.test_ogg;

在OGG主目录下执行:

./defgen paramfile dirprm/test_ogg.prm

基于OGG实现ORACLE同步至KAFKA实施方案  
注:目标端安装后,将生成的./dirdef/linq.test_ogg发送的目标端ogg目录下的dirdef里

2、目标端安装配置(192.168.73.227)

(1)目标端安装kafka,版本2.12-1.1.0,已安装
      
(2)目标端安装OGG,版本12.3.2.1.1
  groupadd gg
  useradd -g gg -G gg gg
  passwd gg
su - gg
unzip
unzip OGG_BigData_Linux_x64_12.3.2.1.1.zip
tar xf OGG_BigData_Linux_x64_12.3.2.1.1.tar -C /data/gg/
  
  vi /home/gg/.bash_profile
export OGG_HOME=/data/gg 
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$OGG_HOME/lib 
export PATH=$OGG_HOME:$PATH
. .bash_profile
 
测试ggsci

基于OGG实现ORACLE同步至KAFKA实施方案


 
创建子目录create subdirs

基于OGG实现ORACLE同步至KAFKA实施方案


(3)定义文件拷贝
从源端192.168.140.128的定义文件/oracle/gg/dirdef/linq.test_ogg拷贝
到目标端/data/gg/dirdef/linq.test_ogg
scp /oracle/gg/dirdef/linq.test_ogg gg@192.168.83.227:/data/gg/dirdef/
(4)kafka创建主题
kafka-topics.sh --create --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --replication-factor 3 --partitions 3 --topic testogg2
kafka-topics.sh --describe --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic testogg2
(5)开启kafka进程
--83.227开启
  kafka-server-start.sh -daemon /data/kfdata/kafka/config/server.properties
(6)配置管理器mgr
>edit param mgr 
PORT 7809 
DYNAMICPORTLIST 7810-7909 
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
 
(7)配置checkpoint
>edit  param  ./GLOBALS
CHECKPOINTTABLE test_ogg.checkpoint
(8)配置replicate进程
>edit param rekafka
REPLICAT rekafka
sourcedefs /data/gg/dirdef/linq.test_ogg 
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props 
REPORTCOUNT EVERY 1 MINUTES, RATE 
GROUPTRANSOPS 10000 
MAP linq.test_ogg, TARGET linq.test_ogg;
说明:REPLICATE rekafka定义rep进程名称;sourcedefs即在4.6中在源服务器上做的表映射文件;TARGETDB LIBFILE即定义kafka一些适配性的库文件以及配置文件,配置文件位于OGG主目录下的dirprm/kafka.props;REPORTCOUNT即复制任务的报告生成频率;GROUPTRANSOPS为以事务传输时,事务合并的单位,减少IO操作;MAP即源端与目标端的映射关系。
(9)配置kafka.props(备注不能配进去)
cd /data/gg/dirprm/
vi kafka.props
/*
gg.handlerlist=kafkahandler //handler类型 
gg.handler.kafkahandler.type=kafka gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //kafka相关配置 
gg.handler.kafkahandler.topicMappingTemplate=test_ogg //kafka的topic名称,可先手动创建好,默认创建的话partition数只有1
gg.handler.kafkahandler.format=json //传输文件的格式,支持json,xml等 
gg.handler.kafkahandler.format.includePrimaryKeys=true --包含主键
gg.handler.kafkahandler.mode=op //OGG for Big Data中传输模式,即op为一次SQL传输一次,tx为一次事务传输一次 
gg.classpath=dirprm/:/data/kfdata/kafka/libs/*:/data/gg/:/data/gg/lib/*
*/
无备注版:
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka 
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=test_ogg
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.format.includePrimaryKeys=true 
gg.handler.kafkahandler.mode=op
gg.classpath=dirprm/:/data/kfdata/kafka/libs/*:/data/gg/:/data/gg/lib/*
 
vi custom_kafka_producer.properties
/*
bootstrap.servers=192.168.83.227:9092 //kafkabroker的地址 
acks=1 
compression.type=gzip //压缩类型 
reconnect.backoff.ms=1000 //重连延时 
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 
batch.size=102400 
linger.ms=10000
*/
无备注版:
bootstrap.servers=192.168.83.227:9092
acks=1 
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 
batch.size=102400 
linger.ms=10000
(10)添加trail文件到replicate进程
add replicat rekafka exttrail /data/gg/dirdat/lq,checkpointtable test_ogg.checkpoint
(11)开启源端与目标端ogg
  
(12)测试
1、源端数据入库
conn linq/Qwer!234
insert into test_ogg values(2,‘go‘);
commit;
2、查看目标端kafka主题是否创建
kafka-topics.sh --list --zookeeper localhost:2181

基于OGG实现ORACLE同步至KAFKA实施方案


3、进行消费测试
kafka-console-consumer.sh --bootstrap-server 192.168.83.227:9092 --from-beginning --topic test_ogg
(用kafka端口9092,consumer的信息将会存放在kafka之中,推荐)
或:
kafka-console-consumer.sh --zookeeper 192.168.83.227:2181 --from-beginning --topic test_ogg
(用zookeeper端口2181,consumer的信息将会存放在zk之中)
测试结果如下(中文也是支持的):

基于OGG实现ORACLE同步至KAFKA实施方案

[root@kafka3 data]# kafka-console-consumer.sh --bootstrap-server 192.168.83.227:9092 --from-beginning --topic test_ogg
{"table":"LINQ.TEST_OGG","op_type":"I","op_ts":"2020-03-06 17:12:03.648091","current_ts":"2020-03-06T17:12:10.737000","pos":"00000000000000001432","primary_keys":["ID"],"after":{"ID":2,"NAME":"go"}}
{"table":"LINQ.TEST_OGG","op_type":"I","op_ts":"2020-03-06 17:21:23.652647","current_ts":"2020-03-06T17:21:31.368000","pos":"00000000000000001569","primary_keys":["ID"],"after":{"ID":3,"NAME":"ok"}}
{"table":"LINQ.TEST_OGG","op_type":"U","op_ts":"2020-03-06 17:22:10.653057","current_ts":"2020-03-06T17:22:17.411000","pos":"00000000000000001701","primary_keys":["ID"],"before":{"ID":3,"NAME":"ok"},"after":{"ID":3,"NAME":"ok3"}}
{"table":"LINQ.TEST_OGG","op_type":"D","op_ts":"2020-03-06 17:22:58.653488","current_ts":"2020-03-06T17:23:05.454000","pos":"00000000000000001858","primary_keys":["ID"],"before":{"ID":1,"NAME":"begin"}}
{"table":"LINQ.TEST_OGG","op_type":"I","op_ts":"2020-03-07 19:21:07.411700","current_ts":"2020-03-07T19:21:12.465000","pos":"00000000000000001994","primary_keys":["ID"],"after":{"ID":1,"NAME":"begin"}}
{"table":"LINQ.TEST_OGG","op_type":"I","op_ts":"2020-03-10 15:23:20.354624","current_ts":"2020-03-10T15:23:27.371000","pos":"00000000000000002130","primary_keys":["ID"],"after":{"ID":4,"NAME":"linq"}}
{"table":"LINQ.TEST_OGG","op_type":"U","op_ts":"2020-03-10 15:30:19.357882","current_ts":"2020-03-10T15:30:25.697000","pos":"00000000000000002266","primary_keys":["ID"],"before":{"ID":4,"NAME":"linq"},"after":{"ID":4,"NAME":"林勤"}}

基于OGG实现ORACLE同步至KAFKA实施方案

上一篇:linux预习5 终端,sql


下一篇:jdbc增删改查