一、背景
本文基于Oracle OGG,介绍一种将Oracle数据库的数据实时同步到Kafka消息队列的方法。Kafka是一种高效的消息队列实现,通过订阅kafka的消息队列,下游系统可以实时获取在线Oracle系统的数据变更情况,实现从OLTP系统中获取数据变更,实时同步到下游业务系统。
二、环境介绍
1、组件版本
组件 |
版本 |
操作系统 |
IP地址 |
描述 |
源端Oracle |
11.2.0.4.0 |
Red Hat 6.8 |
192.168.140.186 |
源端Oracle数据库 |
源端OGG |
12.1.2.1.0 |
Red Hat 6.8 |
192.168.140.186 |
源端OGG,用于抽取源端Oracle数据变更,并将变更日志发送到目标端 |
目标端OGG |
12.3.2.1.1 |
Red Hat 7.5 |
192.168.83.227、228、229 |
目标端OGG,接受源端发送的Oracle事务变更日志,并将变更推送到kafka消息队列 |
目标端kafka |
2.12-1.1.0 |
Red Hat 7.5 |
192.168.83.227、228、229 |
消息队列,接收目标端OGG推送过来的数据 |
2、整体架构图
3、名词解释
1) OGG Manager:OGG Manager用于配置和管理其它OGG组件,配置数据抽取、数据推送、数据复制,启动和停止相关组件,查看相关组件的运行情况。
2) 数据抽取(Extract):抽取源端数据库的变更(DML, DDL)。数据抽取主要分如下2种类型:
a.本地抽取:从本地数据库捕获增量变更数据,写入到本地Trail文件
b.初始数据抽取:从数据库表中导出全量数据,用于初次数据加载
3) 数据推送(Data Pump):Data Pump是一种特殊的数据抽取(Extract)类型,从本地Trail文件中读取数据,并通过网络将数据发送到目标端OGG
4) 4.Trail文件:数据抽取从源端数据库抓取到的事物变更信息会写入到Trail文件。
5) 数据接收(Collector):数据接收程序运行在目标端机器,用于接收Data Pump发送过来的Trail日志,并将数据写入到本地Trail文件。
6) 数据复制(Replicat):数据复制运行在目标端机器,从Trail文件读取数据变更,并将变更数据应用到目标端数据存储系统。本案例中,数据复制将数据推送到kafka消息队列。
7) 检查点(Checkpoint):检查点用于记录数据库事物变更。
二、操作步骤
1、源端安装配置(192.168.140.186)
(1)源端安装了ORACLE 版本11.2.0.4
alter system set enable_goldengate_replication=TRUE;
Alter database add supplemental log data;
alter database force logging;
Select supplemental_log_data_min from v$database;
Create user ogg identified by "Qwer!234" Default tablespace users temporary tablespace temp profile DEFAULT;
Grant connect to ogg;
grant resource to ogg;
grant dba to ogg;
grant alter session to ogg;
grant create session to ogg;
grant select any dictionary to ogg;
grant select any table to ogg;
grant insert any table to ogg;
grant delete any table to ogg;
grant update any table to ogg;
grant alter any table to ogg;
grant create table to ogg;
grant lock any table to ogg;
grant flashback any table to ogg;
Grant unlimited tablespace to ogg;
(2)源端安装了OGG版本12.1.2.1.0
--创建用户
$useradd -g oinstall gg
--设置gg环境变量
将oracle的环境变量拷贝至gg用户
--创建目录
Mkdir /oracle/gg
Chown oracle:oinstall /oracle/gg
--解压安装
Unzip 121210_fbo_ggs_Linux_x64_shiphome.zip
vi /oracle/gg/fbo_ggs_Linux_x64_shiphome/Disk1/response/oggcore.rsp
INSTALL_OPTION=ORA11g
SOFTWARE_LOCATION=/oracle/gg
START_MANAGER=false
cd /oracle/gg/fbo_ggs_Linux_x64_shiphome/Disk1
./runInstaller -responseFile /oracle/gg/fbo_ggs_Linux_x64_shiphome/Disk1/response/oggcore.rsp -silent -ignoreSysPrereqs -ignorePrereq –local
--创建子目录
./ggsci
create SUBDIRS
start mgr
(3)源端配置OGG
/*网络截取资料
GGSCI (zwjfdb3) 7> view param EZWJFBOR
EXTRACT EZWJFBOR
SETENV (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
SETENV (ORACLE_HOME = "/u01/app/oracle/product/11.2.0/db_1")
SETENV (ORACLE_SID = "zwjfdb3")
--捕获 truncate 操作
gettruncates
--定义discardfile文件位置,如果处理中有记录出错会写入到此文件中
DISCARDFILE ./dirrpt/ezwjfbor.dsc, APPEND, MEGABYTES 1024
--动态解析表名
DYNAMICRESOLUTION
--获取更新之前数据
GETUPDATEBEFORES
--当抽取进程遇到一个没有使用的字段时只生成一个警告,进程会继续执行而不会被异常终止(abend)
DBOPTIONS ALLOWUNUSEDCOLUMN
--每隔30分钟报告一次从程序开始到现在的抽取进程或者复制进程的事物记录数,并汇报进程的统计信息
REPORTCOUNT EVERY 30 MINUTES, RATE
--每隔3分钟检查一下大事务,超过2小时还没结束的进行报告
WARNLONGTRANS 2h,CHECKINTERVAL 3m
--不会从闪回日志中获取数据
FETCHOPTIONS NOUSESNAPSHOT
USERID xxxxxx,PASSWORD xxxxxx
EXTTRAIL ./dirdat/zb
TABLE xx.xx;
TABLE xx.xx;
#添加抽取进程
GGSCI (zwjfdb3) 11> add extract EZWJFBOR,TRANLOG, BEGIN NOW
EXTRACT added.
#定义trail文件
GGSCI (zwjfdb3) 12> ADD EXTTRAIL ./dirdat/zb,EXTRACT EZWJFBOR, MEGABYTES 200
EXTTRAIL added.
#pump extract进程
GGSCI (zwjfdb3) 8> view param PZWJFBOR
EXTRACT PZWJFBOR
SETENV (NLS_LANG = "AMERICAN_AMERICA.AL32UTF8")
PASSTHRU
DYNAMICRESOLUTION
RMTHOST xx.xx.xx.xx,MGRPOT 7809
RMTTRAIL ./dirdat/zb
TABLE xx.xx;
TABLE xx.xx;
#添加pump捕获组
GGSCI (zwjfdb3) 23> ADD EXTRACT PZWJFBOR,EXTTRAILSOURCE ./dirdat/zb
EXTRACT added.
#定义pump trail文件
GGSCI (zwjfdb3) 25> ADD RMTTRAIL ./dirdat/zb,EXTRACT PZWJFBOR, MEGABYTES 200
RMTTRAIL added.
*/
1、ORACLE创建测试表
create tablespace TBSDATA datafile size 1G;
create user linq identified by "Qwer!234" default tablespace TBSDATA;
grant connect to linq;
grant resource to linq;
create table linq.test_ogg(id number ,name varchar2(200),primary key(id));
2、配置MGR
>edit params mgr
PORT 7809
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 2
3、配置捕获进程
#
一定要记得同步之前要开启表的全补充日志
#alter table
linq.test_oggadd supplemental log data (all) columns;
>dblogin userid ogg@ORCL,password Qwer!234
>add extract E_LINQ,tranlog,begin now
>ADD EXTTRAIL ./dirdat/lq, EXTRACT E_LINQ
>add trandata linq.test_ogg
>info trandata linq.test_ogg
>edit params E_LINQ
extract E_LINQ
--动态解析表名
dynamicresolution
SETENV (ORACLE_SID = "newdb")
SETENV (NLS_LANG = "AMERICAN_AMERICA.ZHS16GBK")
--SETENV (NLS_LANG = "american_america.AL32UTF8")
userid ogg@ORCL,password Qwer!234
--获取更新之前全部字段数据(如果注释,before显示空,需与UPDATERECORDFORMAT COMPACT一起配置)
GETUPDATEBEFORES
--获取删除前非主键字段(如果注释,before只会显示主键)
NOCOMPRESSDELETES
--获取更新前非主键字段(目前没发现用途)
NOCOMPRESSUPDATES
--获取更新的前后镜像信息(如果注释,before显示空,需与GETUPDATEBEFORES一起配置)
UPDATERECORDFORMAT COMPACT
--定义discardfile文件位置,如果处理中有记录出错会写入到此文件中
discardfile ./dirrpt/E_LINQ.dsc,purge,megabytes 20000
warnlongtrans 2h,checkinterval 3m
exttrail ./dirdat/lq
PURGEOLDEXTRACTS ./dirdat/lq*,usecheckpoints, minkeepdays 3
TRANLOGOPTIONS DBLOGREADER
numfiles 3000
allocfiles 200
table linq.test_ogg;
4、配置传输进程
>add extract P_LINQ,exttrailsource ./dirdat/lq
> ADD EXTTRAIL ./dirdat/lq, EXTRACT P_LINQ
>add rmttrail
./dirdat/lq,extract
P_LINQ
>edit params P_LINQ
extract P_LINQ
SETENV (NLS_LANG = "AMERICAN_AMERICA.ZHS16GBK")
passthru
dynamicresolution
rmthost 192.168.73.227,mgrport 7809 ,compress
rmttrail ./dirdat/lq
numfiles 3000
table linq.test_ogg;
5、配置define定义文件
> edit param test_ogg
defsfile ./dirdef/linq.test_ogg
userid ogg@ORCL,password Qwer!234
table linq.test_ogg;
在OGG主目录下执行:
./defgen paramfile dirprm/
test_ogg.prm
注:目标端安装后,将生成的./dirdef/linq.test_ogg发送的目标端ogg目录下的dirdef里
2、目标端安装配置(192.168.73.227)
(1)目标端安装kafka,版本
2.12-1.1.0,已安装
(2)目标端安装OGG,版本12.3.2.1.1
groupadd gg
useradd -g gg -G gg gg
passwd gg
su - gg
unzip
unzip OGG_BigData_Linux_x64_12.3.2.1.1.zip
tar xf OGG_BigData_Linux_x64_12.3.2.1.1.tar -C /data/gg/
vi /home/gg/.bash_profile
export OGG_HOME=/data/gg
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$OGG_HOME/lib
export PATH=$OGG_HOME:$PATH
. .bash_profile
测试ggsci
创建子目录create subdirs
(3)定义文件拷贝
从源端192.168.140.128的定义文件/oracle/gg/dirdef/linq.test_ogg拷贝
到目标端/data/gg/dirdef/linq.test_ogg
scp /oracle/gg/dirdef/linq.test_ogg gg@192.168.83.227:/data/gg/dirdef/
(4)kafka创建主题
kafka-topics.sh --create --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --replication-factor 3 --partitions 3 --topic testogg2
kafka-topics.sh --describe --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic testogg2
(5)开启kafka进程
--83.227开启
kafka-server-start.sh -daemon /data/kfdata/kafka/config/server.properties
(6)配置管理器mgr
>edit param mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
(7)配置checkpoint
>edit param ./GLOBALS
CHECKPOINTTABLE test_ogg.checkpoint
(8)配置replicate进程
>edit param rekafka
REPLICAT rekafka
sourcedefs /data/gg/dirdef/linq.test_ogg
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP linq.test_ogg, TARGET linq.test_ogg;
说明:REPLICATE rekafka定义rep进程名称;sourcedefs即在4.6中在源服务器上做的表映射文件;TARGETDB LIBFILE即定义kafka一些适配性的库文件以及配置文件,配置文件位于OGG主目录下的dirprm/kafka.props;REPORTCOUNT即复制任务的报告生成频率;GROUPTRANSOPS为以事务传输时,事务合并的单位,减少IO操作;MAP即源端与目标端的映射关系。
(9)配置kafka.props(备注不能配进去)
cd /data/gg/dirprm/
vi kafka.props
/*
gg.handlerlist=kafkahandler //handler类型
gg.handler.kafkahandler.type=kafka gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //kafka相关配置
gg.handler.kafkahandler.topicMappingTemplate=test_ogg //kafka的topic名称,可先手动创建好,默认创建的话partition数只有1
gg.handler.kafkahandler.format=json //传输文件的格式,支持json,xml等
gg.handler.kafkahandler.format.includePrimaryKeys=true --包含主键
gg.handler.kafkahandler.mode=op //OGG for Big Data中传输模式,即op为一次SQL传输一次,tx为一次事务传输一次
gg.classpath=dirprm/:/data/kfdata/kafka/libs/*:/data/gg/:/data/gg/lib/*
*/
无备注版:
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=test_ogg
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.format.includePrimaryKeys=true
gg.handler.kafkahandler.mode=op
gg.classpath=dirprm/:/data/kfdata/kafka/libs/*:/data/gg/:/data/gg/lib/*
vi custom_kafka_producer.properties
/*
bootstrap.servers=192.168.83.227:9092 //kafkabroker的地址
acks=1
compression.type=gzip //压缩类型
reconnect.backoff.ms=1000 //重连延时
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000
*/
无备注版:
bootstrap.servers=192.168.83.227:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000
(10)添加trail文件到replicate进程
add replicat rekafka exttrail /data/gg/dirdat/lq,checkpointtable test_ogg.checkpoint
(11)开启源端与目标端ogg
略
(12)测试
1、源端数据入库
conn linq/Qwer!234
insert into test_ogg values(2,‘go‘);
commit;
2、查看目标端kafka主题是否创建
kafka-topics.sh --list --zookeeper localhost:2181
3、进行消费测试
kafka-console-consumer.sh--bootstrap-server
192.168.83.227
:9092 --from-beginning --topictest_ogg
(用kafka端口9092,consumer的信息将会存放在kafka之中,推荐)
或:
kafka-console-consumer.sh --zookeeper192.168.83.227
:2181 --from-beginning --topictest_ogg
(用zookeeper端口2181,consumer的信息将会存放在zk之中)
测试结果如下(中文也是支持的):
[root@kafka3 data]# kafka-console-consumer.sh --bootstrap-server 192.168.83.227:9092 --from-beginning --topic test_ogg
{"table":"LINQ.TEST_OGG","op_type":"I","op_ts":"2020-03-06 17:12:03.648091","current_ts":"2020-03-06T17:12:10.737000","pos":"00000000000000001432","primary_keys":["ID"],"after":{"ID":2,"NAME":"go"}}
{"table":"LINQ.TEST_OGG","op_type":"I","op_ts":"2020-03-06 17:21:23.652647","current_ts":"2020-03-06T17:21:31.368000","pos":"00000000000000001569","primary_keys":["ID"],"after":{"ID":3,"NAME":"ok"}}
{"table":"LINQ.TEST_OGG","op_type":"U","op_ts":"2020-03-06 17:22:10.653057","current_ts":"2020-03-06T17:22:17.411000","pos":"00000000000000001701","primary_keys":["ID"],"before":{"ID":3,"NAME":"ok"},"after":{"ID":3,"NAME":"ok3"}}
{"table":"LINQ.TEST_OGG","op_type":"D","op_ts":"2020-03-06 17:22:58.653488","current_ts":"2020-03-06T17:23:05.454000","pos":"00000000000000001858","primary_keys":["ID"],"before":{"ID":1,"NAME":"begin"}}
{"table":"LINQ.TEST_OGG","op_type":"I","op_ts":"2020-03-07 19:21:07.411700","current_ts":"2020-03-07T19:21:12.465000","pos":"00000000000000001994","primary_keys":["ID"],"after":{"ID":1,"NAME":"begin"}}
{"table":"LINQ.TEST_OGG","op_type":"I","op_ts":"2020-03-10 15:23:20.354624","current_ts":"2020-03-10T15:23:27.371000","pos":"00000000000000002130","primary_keys":["ID"],"after":{"ID":4,"NAME":"linq"}}
{"table":"LINQ.TEST_OGG","op_type":"U","op_ts":"2020-03-10 15:30:19.357882","current_ts":"2020-03-10T15:30:25.697000","pos":"00000000000000002266","primary_keys":["ID"],"before":{"ID":4,"NAME":"linq"},"after":{"ID":4,"NAME":"林勤"}}