Synchronizing Oracle Data to Kafka (Oracle GoldenGate for Kafka)
Kingdee Cloud Community - Cloud Community user O7l87531
Edited 2020-09-16 10:24:01. Reproduction without the author's permission is prohibited.

Requirement: synchronize structured data from an Oracle database to Kafka and feed it into the big data platform.

Goal: use Oracle GoldenGate to synchronize data from the Oracle database to Kafka.

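Basic information:

        Kafka cluster address: kafka_host:9092

        Kafka topic: goldengate

        Oracle table to synchronize: fund (schema funder, i.e. funder.FUND)

Steps:

I. Prepare the Oracle source database

II. Prepare GoldenGate on the source

III. Configure GoldenGate for Kafka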



I. Prepare the Oracle source database:
export TMPDIR=/oracle/app/db/xttsdir
export OGG_HOME=/oradata/ogg

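Prepare the Oracle source database. Create the OGG user and enable GoldenGate replication. Then turn on archivelog mode, force logging and minimal supplemental logging:
CREATE USER ogg IDENTIFIED BY ogg4test DEFAULT TABLESPACE USERS;
GRANT CONNECT,RESOURCE,DBA TO ogg;
SQL>show parameter pfile
SQL>alter system set enable_goldengate_replication=TRUE scope=both;
Set the archive log destination to /oradata/archive: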
SQL>alter system set log_archive_dest_1='location=/oradata/archive' scope=both;
SQL>shutdown immediate
SQL>startup mount
SQL>alter database archivelog;
SQL>archive log list;
SQL>alter database open;
SQL>select force_logging,supplemental_log_data_min from v$database;
SQL>alter database force logging;
SQL>alter database add supplemental log data;
SQL>alter system archive log current;
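The query above only reports the current state of the database. The Python below is a minimal sketch that turns one V$DATABASE row into the statements from this section that still need to run. It assumes the row was fetched with `SELECT log_mode, force_logging, supplemental_log_data_min FROM v$database` and passed in as a dict. That query adds `log_mode` to the original one. The function name is hypothetical, and the code does not connect to Oracle.

```python
def missing_prerequisites(row):
    """Return the source-side statements still needed, given a V$DATABASE row.

    `row` is assumed (hypothetically) to be a dict keyed by upper-case column
    names, e.g. the result of:
      SELECT log_mode, force_logging, supplemental_log_data_min FROM v$database
    """
    todo = []
    # Archivelog mode can only be switched while the database is mounted, not open.
    if row.get("LOG_MODE") != "ARCHIVELOG":
        todo += [
            "shutdown immediate",
            "startup mount",
            "alter database archivelog;",
            "alter database open;",
        ]
    if row.get("FORCE_LOGGING") != "YES":
        todo.append("alter database force logging;")
    # Minimal supplemental logging reports YES (or IMPLICIT) once it is in effect.
    if row.get("SUPPLEMENTAL_LOG_DATA_MIN") not in ("YES", "IMPLICIT"):
        todo.append("alter database add supplemental log data;")
    return todo


if __name__ == "__main__":
    current = {"LOG_MODE": "ARCHIVELOG", "FORCE_LOGGING": "NO",
               "SUPPLEMENTAL_LOG_DATA_MIN": "NO"}
    for stmt in missing_prerequisites(current):
        print(stmt)
```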


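II. Prepare GoldenGate on the source:
1. Install the GoldenGate software with the graphical installer (./runInstaller). Then start GGSCI and create the working subdirectories:
./ggsci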
GGSCI (test01) 1> create subdirs

Creating subdirectories under current directory /oradata/ogg

Parameter file                 /oradata/ogg/dirprm: already exists.
Report file                    /oradata/ogg/dirrpt: already exists.
Checkpoint file                /oradata/ogg/dirchk: already exists.
Process status files           /oradata/ogg/dirpcs: already exists.
SQL script files               /oradata/ogg/dirsql: already exists.
Database definitions files     /oradata/ogg/dirdef: already exists.
Extract data files             /oradata/ogg/dirdat: already exists.
Temporary files                /oradata/ogg/dirtmp: already exists.
Credential store files         /oradata/ogg/dircrd: already exists.
Masterkey wallet files         /oradata/ogg/dirwlt: already exists.
Dump files                     /oradata/ogg/dirdmp: already exists.
2. Configure the OGG Manager (mgr) process:
GGSCI>edit param mgr
PORT 7809
PURGEOLDEXTRACTS ./dirdat/et* , USECHECKPOINTS, MINKEEPHOURS 72
AUTORESTART ER *, RETRIES 3, WAITMINUTES 10, RESETMINUTES 60
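Here is a rough model of the PURGEOLDEXTRACTS line. With USECHECKPOINTS, Manager deletes a trail file matching ./dirdat/et* only after the checkpoints of every process reading it show that the file has been fully processed. MINKEEPHOURS 72 keeps each file for at least 72 hours regardless. The Python below is a simplified sketch of that rule only. The `TrailFile` type and its fields are hypothetical, and the real Manager logic has more options (for example MINKEEPFILES).

```python
from dataclasses import dataclass
from fnmatch import fnmatch


@dataclass
class TrailFile:
    name: str          # e.g. "et000000012" under ./dirdat (hypothetical sample names)
    age_hours: float   # hours since the file was last modified
    processed: bool    # True if every reader's checkpoint has moved past this file


def purge_candidates(files, pattern="et*", min_keep_hours=72):
    """Simplified model of: PURGEOLDEXTRACTS <pattern>, USECHECKPOINTS, MINKEEPHOURS n."""
    return [
        f.name
        for f in files
        if fnmatch(f.name, pattern)          # only files with the trail prefix
        and f.processed                      # USECHECKPOINTS: never purge unread data
        and f.age_hours >= min_keep_hours    # MINKEEPHOURS: keep at least n hours
    ]


if __name__ == "__main__":
    trail = [
        TrailFile("et000000001", 100, True),   # old and fully read -> purge
        TrailFile("et000000002", 10, True),    # fully read but too recent -> keep
        TrailFile("et000000003", 100, False),  # old but not yet read -> keep
        TrailFile("rt000000001", 100, True),   # different prefix -> ignored
    ]
    print(purge_candidates(trail))  # ['et000000001']
```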
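3. Add the Oracle table FUND for synchronization. Log in to the database, enable table-level supplemental logging with ADD TRANDATA, and then add the primary extract eapps: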
GGSCI (test01) 7> DBLOGIN USERID ogg, PASSWORD ogg4test
Successfully logged into database.

GGSCI (test01 as ogg@zjtest1) 8> ADD TRANDATA funder.FUND
2018-05-17 15:25:19  INFO    OGG-15132  Logging of supplemental redo data enabled for table funder.FUND.
2018-05-17 15:25:19  INFO    OGG-15133  TRANDATA for scheduling columns has been added on table funder.FUND.
2018-05-17 15:25:20  INFO    OGG-15135  TRANDATA for instantiation CSN has been added on table funder.FUND.
GGSCI (test01) 9> add ext eapps, tranlog, begin now, threads 1
GGSCI (test01) > add exttrail ./dirdat/et, ext eapps
4. Configure the extract and data pump processes:
Parameters for the primary extract eapps:
GGSCI (test01 )> edit param eapps
extract eapps
USERID ogg, PASSWORD ogg4test
DISCARDFILE ./dirrpt/eapps.dsc, APPEND
DISCARDROLLOVER AT 01:00 ON SUNDAY
EXTTRAIL ./dirdat/et
TRANLOGOPTIONS DBLOGREADER
STATOPTIONS REPORTFETCH
REPORTCOUNT every 10 minutes, RATE
REPORTROLLOVER AT 01:00 ON SUNDAY
TABLE funder.FUND;
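If several tables need to be captured, you can generate the eapps parameter file instead of typing it. The Python below is a minimal sketch around a hypothetical helper, `render_extract_params`. It emits the same parameters as above, with one TABLE line per table. GGSCI stores a group's parameters as dirprm/<group>.prm, so the output would go to dirprm/eapps.prm under $OGG_HOME.

```python
def render_extract_params(group, user, password, trail, tables):
    """Build an Extract parameter file equivalent to the eapps example above.

    Hypothetical helper: the fixed lines are copied from this article's eapps.prm.
    """
    lines = [
        f"extract {group}",
        f"USERID {user}, PASSWORD {password}",
        f"DISCARDFILE ./dirrpt/{group}.dsc, APPEND",
        "DISCARDROLLOVER AT 01:00 ON SUNDAY",
        f"EXTTRAIL {trail}",
        "TRANLOGOPTIONS DBLOGREADER",
        "STATOPTIONS REPORTFETCH",
        "REPORTCOUNT every 10 minutes, RATE",
        "REPORTROLLOVER AT 01:00 ON SUNDAY",
    ]
    # One TABLE statement per captured table; each ends with a semicolon.
    lines += [f"TABLE {t};" for t in tables]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    text = render_extract_params("eapps", "ogg", "ogg4test", "./dirdat/et", ["funder.FUND"])
    print(text, end="")
```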

GGSCI (test01) > add ext papps, exttrailsource ./dirdat/et
EXTRACT added.
GGSCI (test01) > add rmttrail ./dirdat/rt, ext papps, megabytes 500
RMTTRAIL added.
Edit the papps parameters:

GGSCI (test01) >edit params papps
extract papps
RMTHOST 192.168.1.22, MGRPORT 7809
PASSTHRU
RMTTRAIL ./dirdat/rt
TABLE funder.FUND;
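papps runs in PASSTHRU mode and can only forward what eapps wrote to ./dirdat/et. Its TABLE list should therefore be a subset of eapps's list. The Python below is a small sketch of that check. It pulls the TABLE entries out of two parameter files and reports tables that the pump names but the extract never captures. The regex parsing is a simplification: it skips `--` comment lines, compares names in upper case, and does not handle wildcards or statements split across lines.

```python
import re

# "TABLE <owner.name>" at the start of a line; TABLEEXCLUDE etc. do not match.
TABLE_RE = re.compile(r"^\s*TABLE\s+([^;,\s]+)", re.IGNORECASE)


def table_entries(param_text):
    """Return the upper-cased object names from TABLE statements in a .prm file."""
    names = set()
    for line in param_text.splitlines():
        if line.strip().startswith("--"):   # GoldenGate parameter-file comment
            continue
        m = TABLE_RE.match(line)
        if m:
            names.add(m.group(1).upper())
    return names


def uncaptured_pump_tables(extract_prm, pump_prm):
    """Tables listed in the pump that the primary extract never writes to the trail."""
    return sorted(table_entries(pump_prm) - table_entries(extract_prm))


if __name__ == "__main__":
    eapps = "extract eapps\nEXTTRAIL ./dirdat/et\nTABLE funder.FUND;\n"
    papps = "extract papps\nPASSTHRU\nRMTTRAIL ./dirdat/rt\nTABLE funder.FUND;\n"
    print(uncaptured_pump_tables(eapps, papps))  # []
```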

GGSCI (test01) > start papps
Sending START request to MANAGER ...
EXTRACT papps starting
GGSCI (test01) > info all

Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING                                           
EXTRACT     RUNNING     eapps     00:00:00      00:00:01    
EXTRACT     RUNNING     papps     00:00:00      00:00:18    


GGSCI (test01) >
GGSCI (test01) > start eapps

Sending START request to MANAGER ...
EXTRACT eapps starting

GGSCI (test01)> info all
Program     Status      Group       Lag at Chkpt  Time Since Chkpt
MANAGER     RUNNING                                           
EXTRACT     RUNNING     eapps     00:00:00      00:02:53    
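To automate the check, you can scan the `info all` output for groups that are not RUNNING. The Python below is a sketch that assumes the whitespace-separated layout shown above, where MANAGER rows have no group name. The ABENDED row in the sample is made up for illustration. If a group does abend, GGSCI's `view report <group>` shows its report file.

```python
def not_running(info_all_output):
    """Return (program, group, status) for each `info all` row that is not RUNNING."""
    problems = []
    for line in info_all_output.splitlines():
        fields = line.split()
        # Data rows start with a program type; skip the header and blank lines.
        if not fields or fields[0] not in ("MANAGER", "EXTRACT", "REPLICAT"):
            continue
        status = fields[1]
        group = fields[2] if len(fields) > 2 else ""   # MANAGER rows have no group
        if status != "RUNNING":
            problems.append((fields[0], group, status))
    return problems


if __name__ == "__main__":
    sample = """
Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING
EXTRACT     RUNNING     eapps     00:00:00      00:00:01
EXTRACT     ABENDED     papps     00:00:00      00:00:18
"""
    print(not_running(sample))  # [('EXTRACT', 'papps', 'ABENDED')]
```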


III. Configure GoldenGate for Kafka

  1. Install the OGG for Big Data adapters:

    Unzip ggs_Adapters_Linux_x64 directly into /oracle/app/ogg4bigdata. No installer is needed:

    $ ls -lrt

    drwxr-xr-x 10 oracle oinstall 4096 Aug 29  2017 ogg4bigdata

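  2. Configure the OGG processes. Run ggsci from /oracle/app/ogg4bigdata on the Kafka-side host, which is 192.168.1.22 in the RMTHOST setting above:

    GGSCI >create subdirs

    Configure the Manager and start the mgr process. On this host the incoming trail is ./dirdat/rt, so the purge rule targets rt*:

    GGSCI >edit params mgr

    PORT 7809

    PURGEOLDEXTRACTS ./dirdat/rt*, USECHECKPOINTS, MINKEEPHOURS 72

    AUTORESTART ER *, RETRIES 3, WAITMINUTES 10, RESETMINUTES 60

       GGSCI > start mgr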

   3. Configure the Kafka replicat process (rkafka)

       GGSCI> add replicat rkafka,exttrail dirdat/rt,begin now

       GGSCI>edit params rkafka
       REPLICAT rkafka
       TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
       REPORTCOUNT EVERY 1 MINUTES, RATE
       GROUPTRANSOPS 10000
       MAP funder.*, TARGET funder.*;
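The three processes form a chain linked by trail names:

- eapps writes ./dirdat/et on the source.
- papps reads that trail and writes the remote trail ./dirdat/rt.
- rkafka reads dirdat/rt on the Big Data host.

The Python below is a sketch of a consistency check for that chain. It assumes the trail paths are copied by hand from the ADD commands and parameter files above. `os.path.normpath` makes ./dirdat/rt and dirdat/rt compare equal, since both are relative to the OGG home. The function name and tuple layout are hypothetical.

```python
import os


def broken_links(chain):
    """chain: ordered list of (group, input_trail, output_trail); None means no trail.

    Returns (upstream, downstream) pairs whose output and input trails differ.
    """
    def norm(path):
        return os.path.normpath(path) if path else None

    bad = []
    for (up, _, up_out), (down, down_in, _) in zip(chain, chain[1:]):
        if norm(up_out) != norm(down_in):
            bad.append((up, down))
    return bad


if __name__ == "__main__":
    chain = [
        ("eapps", None, "./dirdat/et"),           # reads redo, writes the local trail
        ("papps", "./dirdat/et", "./dirdat/rt"),  # pump: local trail -> remote trail
        ("rkafka", "dirdat/rt", None),            # replicat on the Big Data host
    ]
    print(broken_links(chain))  # []
```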

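   4. Edit kafka.props. Place it in the dirprm directory, here /data/ogg/dirprm, because the replicat loads it through the relative path dirprm/kafka.props.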

gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=./dirprm/custom_kafka_producer.properties
#The following sends every operation to the fixed topic goldengate
gg.handler.kafkahandler.topicMappingTemplate=goldengate
#The following selects the message key using the concatenated primary keys
gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}
gg.handler.kafkahandler.format=delimitedtext
gg.handler.kafkahandler.SchemaTopicName=goldengate
gg.handler.kafkahandler.BlockingSend=false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=tx

goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE

gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec

#Sample gg.classpath for Apache Kafka
gg.classpath=dirprm/:/oracle/app/ogg4bigdata/ggjava/resources/lib/*:/oracle/app/ogg4bigdata/kafka/libs/*
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
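kafka.props is a plain Java-style properties file, so typos are easy to miss. One example is two JVM options run together (-Xmx512m-Xms32m). The Python below is a minimal sketch that reads such a file and flags a few problems. For each handler, it requires the keys type, format and KafkaProducerConfigFile. That key set is a choice made for this example, not an official validation list. The reader also ignores line continuations and escapes.

```python
def parse_properties(text):
    """Minimal .properties reader: key=value or key:value, '#' and '!' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        # Split on the first '=' or ':'; spaces around the separator are ignored.
        for i, ch in enumerate(line):
            if ch in "=:":
                props[line[:i].strip()] = line[i + 1:].strip()
                break
    return props


def check_kafka_props(props):
    """Return a list of problems found in a parsed kafka.props (sketch only)."""
    problems = []
    handlers = [h.strip() for h in props.get("gg.handlerlist", "").split(",") if h.strip()]
    if not handlers:
        problems.append("gg.handlerlist is empty")
    for name in handlers:
        for suffix in ("type", "format", "KafkaProducerConfigFile"):
            key = f"gg.handler.{name}.{suffix}"
            if key not in props:
                problems.append(f"missing {key}")
    # JVM options must be whitespace-separated; "-Xmx512m-Xms32m" is one broken token.
    for token in props.get("javawriter.bootoptions", "").split():
        if token.count("-X") > 1:
            problems.append(f"fused JVM options: {token}")
    return problems
```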

       5. Edit custom_kafka_producer.properties. It goes in ./dirprm, as referenced by KafkaProducerConfigFile:

bootstrap.servers=kafka_host:9092
acks=1
reconnect.backoff.ms=1000
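The handler passes these settings to the Kafka producer it creates. The Python below is a sketch that sanity-checks the two values most often mistyped. It splits bootstrap.servers into host/port pairs and accepts only the acks values the Kafka producer documents: 0, 1, all and -1. The helper name is hypothetical, and the code does not contact any broker.

```python
def check_producer_config(props):
    """props: dict from custom_kafka_producer.properties. Returns (brokers, problems)."""
    problems = []
    brokers = []
    for entry in props.get("bootstrap.servers", "").split(","):
        entry = entry.strip()
        if not entry:
            continue
        # Split on the last ':' so the port is whatever follows it.
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            problems.append(f"bad bootstrap entry: {entry!r}")
        else:
            brokers.append((host, int(port)))
    if not brokers:
        problems.append("no valid broker in bootstrap.servers")
    acks = props.get("acks")
    if acks is not None and acks not in ("0", "1", "all", "-1"):
        problems.append(f"unsupported acks value: {acks!r}")
    return brokers, problems


if __name__ == "__main__":
    print(check_producer_config({"bootstrap.servers": "kafka_host:9092",
                                 "acks": "1", "reconnect.backoff.ms": "1000"}))
```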

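Testing:

Insert data into the Oracle table as follows:

At the same time, monitor the goldengate topic in Kafka to confirm that the data arrives. With gg.handler.kafkahandler.format=delimitedtext the messages are delimited text rather than JSON. Set format=json if you need JSON output.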
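You can watch the topic with the console consumer that ships with Kafka, for example `kafka-console-consumer.sh --bootstrap-server kafka_host:9092 --topic goldengate --from-beginning`. The Python below is a sketch for splitting one delimitedtext record into fields. It rests on two assumptions. First, the formatter uses its default field delimiter, ASCII 0x01. Second, the leading metadata fields are operation type, table name, operation timestamp, current timestamp and trail position. Check both against your actual messages, because they change with the formatter settings. The ID/NAME columns and the sample record are hypothetical. Kafka delivers bytes, so decode each message (e.g. UTF-8) before calling the function.

```python
DEFAULT_DELIMITER = "\x01"  # assumed default field delimiter of the delimitedtext formatter
META_FIELDS = ["op_type", "table", "op_ts", "current_ts", "pos"]  # assumed field order


def decode_record(message, columns, delimiter=DEFAULT_DELIMITER):
    """Split one delimitedtext message into (metadata dict, column-value dict).

    `columns` lists the table's column names in order; the message itself
    carries only the values (assumption: column names are not included).
    """
    fields = message.rstrip("\n").split(delimiter)
    meta = dict(zip(META_FIELDS, fields[:len(META_FIELDS)]))
    values = fields[len(META_FIELDS):]
    if len(values) != len(columns):
        raise ValueError(f"expected {len(columns)} column values, got {len(values)}")
    return meta, dict(zip(columns, values))


if __name__ == "__main__":
    # Hypothetical insert into FUNDER.FUND with columns ID and NAME.
    msg = ("I\x01FUNDER.FUND\x012020-09-16 02:24:01.000000\x01"
           "2020-09-16T10:24:05.123000\x0100000000000000001234\x011\x01Alpha Fund\n")
    print(decode_record(msg, ["ID", "NAME"]))
```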

