Requirement: synchronize structured data from an Oracle database to Kafka and feed it into the big data platform.
Goal: use Oracle GoldenGate to replicate data from the Oracle database to Kafka.
Steps:
Basic information:
Kafka cluster address: kafka_host:9092
Kafka topic: goldengate
Oracle table to be synchronized: fund
I. Oracle source database preparation
II. Oracle GoldenGate setup on the source
III. GoldenGate for Kafka configuration
I. Oracle source database preparation:
export TMPDIR=/oracle/app/db/xttsdir
export OGG_HOME=/oradata/ogg
Prepare the Oracle source database:
CREATE USER ogg IDENTIFIED BY ogg4test DEFAULT TABLESPACE USERS ;
GRANT CONNECT,RESOURCE,DBA TO ogg;
Check that the instance is running on an spfile (required for ALTER SYSTEM ... SCOPE=BOTH):
SQL> show parameter pfile
SQL> alter system set enable_goldengate_replication=TRUE scope=both;
Set the archive log destination to /oradata/archive:
SQL> alter system set log_archive_dest_1='location=/oradata/archive' scope=both;
SQL> shutdown immediate
SQL> startup mount
SQL> alter database archivelog;
SQL> archive log list;
SQL> alter database open;
SQL> select force_logging, supplemental_log_data_min from v$database;
SQL> alter database force logging;
SQL> alter database add supplemental log data;
SQL> alter system archive log current;
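After reopening the database, the settings can be verified; both flags should now report YES (the output layout below is illustrative):
SQL> select force_logging, supplemental_log_data_min from v$database;
FORCE_LOGGING  SUPPLEMENTAL_LOG_DATA_MIN
-------------  -------------------------
YES            YES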
II. Oracle GoldenGate setup on the source:
1. Install the GoldenGate software with the graphical installer: ./runInstaller
./ggsci
GGSCI (test01) 1> create subdirs
Creating subdirectories under current directory /oradata/ogg
Parameter file /oradata/ogg/dirprm: already exists.
Report file /oradata/ogg/dirrpt: already exists.
Checkpoint file /oradata/ogg/dirchk: already exists.
Process status files /oradata/ogg/dirpcs: already exists.
SQL script files /oradata/ogg/dirsql: already exists.
Database definitions files /oradata/ogg/dirdef: already exists.
Extract data files /oradata/ogg/dirdat: already exists.
Temporary files /oradata/ogg/dirtmp: already exists.
Credential store files /oradata/ogg/dircrd: already exists.
Masterkey wallet files /oradata/ogg/dirwlt: already exists.
Dump files /oradata/ogg/dirdmp: already exists.
2. Configure the OGG MGR (Manager) process:
GGSCI>edit param mgr
PORT 7809
PURGEOLDEXTRACTS ./dirdat/et* , USECHECKPOINTS, MINKEEPHOURS 72
AUTORESTART ER *, RETRIES 3, WAITMINUTES 10, RESETMINUTES 60
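After saving the parameter file, the Manager can be started and checked (a minimal sketch; the prompt shown is illustrative). On success, info mgr reports that the Manager is running on port 7809:
GGSCI (test01) > start mgr
GGSCI (test01) > info mgr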
3. Enable supplemental logging (TRANDATA) for the Oracle table FUND to be synchronized:
GGSCI (test01) 7> DBLOGIN USERID ogg, PASSWORD ogg4test
Successfully logged into database.
GGSCI (test01 as ogg@zjtest1) 8> ADD TRANDATA funder.FUND
2018-05-17 15:25:19 INFO OGG-15132 Logging of supplemental redo data enabled for table funder.FUND.
2018-05-17 15:25:19 INFO OGG-15133 TRANDATA for scheduling columns has been added on table funder.FUND.
2018-05-17 15:25:20 INFO OGG-15135 TRANDATA for instantiation CSN has been added on table funder.FUND.
GGSCI (test01) 9> add extract eapps, tranlog, begin now, threads 1
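The local trail that eapps writes to also needs to be registered (it is referenced as EXTTRAIL ./dirdat/et in the parameter file below); a sketch of the command, with the 500 MB trail size chosen arbitrarily here:
GGSCI (test01) > add exttrail ./dirdat/et, extract eapps, megabytes 500
EXTTRAIL added.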
4. Configure the extract (eapps) and data-pump (papps) processes:
Configure the parameters for eapps:
GGSCI (test01 )> edit param eapps
extract eapps
USERID ogg, PASSWORD ogg4test
DISCARDFILE ./dirrpt/eapps.dsc, APPEND
DISCARDROLLOVER AT 01:00 ON SUNDAY
EXTTRAIL ./dirdat/et
TRANLOGOPTIONS DBLOGREADER
STATOPTIONS REPORTFETCH
REPORTCOUNT every 10 minutes, RATE
REPORTROLLOVER AT 01:00 ON SUNDAY
TABLE funder.FUND;
GGSCI (test01) > add extract papps, exttrailsource ./dirdat/et
EXTRACT added.
GGSCI (test01) > add rmttrail ./dirdat/rt, extract papps, megabytes 500
RMTTRAIL added.
Edit the papps (data pump) parameter file:
GGSCI (test01) >edit params papps
extract papps
RMTHOST 192.168.1.22, MGRPORT 7809
PASSTHRU
RMTTRAIL ./dirdat/rt
TABLE funder.FUND;
GGSCI (test01) > start papps
Sending START request to MANAGER ...
EXTRACT papps starting
GGSCI (test01) > info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
EXTRACT RUNNING eapps 00:00:00 00:00:01
EXTRACT RUNNING papps 00:00:00 00:00:18
GGSCI (test01) >
GGSCI (test01) > start eapps
Sending START request to MANAGER ...
EXTRACT eapps starting
GGSCI (test01)> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
EXTRACT RUNNING eapps 00:00:00 00:02:53
III. GoldenGate for Kafka configuration
1. Install the OGG for Big Data Adapters:
Extract ggs_Adapters_Linux_x64 directly into the /oracle/app/ogg4bigdata directory (no installer is required).
$ ls -lrt
drwxr-xr-x 10 oracle oinstall 4096 Aug 29 2017 ogg4bigdata
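A sketch of the extraction step itself, assuming the download is a zip archive named ggs_Adapters_Linux_x64.zip (use the actual file name of the Adapters package; if the zip contains a tar file, extract that as well):
$ mkdir -p /oracle/app/ogg4bigdata
$ cd /oracle/app/ogg4bigdata
$ unzip /tmp/ggs_Adapters_Linux_x64.zip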
2. Configure the OGG processes:
GGSCI > create subdirs
Configure the Manager and start the mgr process:
GGSCI > edit params mgr
PORT 7809
PURGEOLDEXTRACTS ./dirdat/rt* , USECHECKPOINTS, MINKEEPHOURS 72
AUTORESTART ER *, RETRIES 3, WAITMINUTES 10, RESETMINUTES 60
GGSCI > start mgr
3. Configure the Kafka replicat process:
GGSCI> add replicat rkafka, exttrail ./dirdat/rt, begin now
GGSCI>edit params rkafka
REPLICAT rkafka
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP funder.*, TARGET funder.*;
4. Edit kafka.props (it must be placed in the dirprm directory of the OGG for Big Data home, /oracle/app/ogg4bigdata/dirprm):
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=./dirprm/custom_kafka_producer.properties
#All change records are written to the fixed topic name goldengate
gg.handler.kafkahandler.topicMappingTemplate=goldengate
#Template used to resolve the Kafka message key
gg.handler.kafkahandler.keyMappingTemplate=id
gg.handler.kafkahandler.format=delimitedtext
gg.handler.kafkahandler.SchemaTopicName=goldengate
gg.handler.kafkahandler.BlockingSend=false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=tx
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
#Sample gg.classpath for ApacheKafka
gg.classpath=dirprm/:/oracle/app/ogg4bigdata/ggjava/resources/lib/*:/oracle/app/ogg4bigdata/kafka/libs/*
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
5. Create custom_kafka_producer.properties (referenced above as ./dirprm/custom_kafka_producer.properties):
bootstrap.servers=kafka_host:9092
acks=1
reconnect.backoff.ms=1000
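With kafka.props and custom_kafka_producer.properties in place under dirprm, the replicat can be started and checked (a minimal sketch; the status output below is illustrative):
GGSCI > start rkafka
GGSCI > info all
Program     Status      Group       Lag at Chkpt    Time Since Chkpt
MANAGER     RUNNING
REPLICAT    RUNNING     RKAFKA      00:00:00        00:00:05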
Testing:
Insert a row into the Oracle table and, at the same time, consume from the Kafka topic to confirm that the change records arrive (in the delimited-text format configured above); a sketch is shown below.
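A minimal test sketch; the columns of funder.FUND are not listed above, so ID and NAME are hypothetical, and kafka-console-consumer.sh is assumed to be available in the Kafka installation's bin directory:
SQL> insert into funder.fund (id, name) values (1, 'test');
SQL> commit;
$ kafka-console-consumer.sh --bootstrap-server kafka_host:9092 --topic goldengate --from-beginning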