「事件驱动架构」使用GoldenGate创建从Oracle到Kafka的CDC事件流(下)

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生网关 MSE Higress,422元/月
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
简介: 「事件驱动架构」使用GoldenGate创建从Oracle到Kafka的CDC事件流

步骤7/12:安装并运行Apache Kafka

从VM的桌面环境中打开Firefox并下载Apache Kafka(我使用的是kafka_2.11-2.1.1.tgz)。

现在,打开一个Linux shell并重置CLASSPATH环境变量(在BigDataLite-4.11虚拟机中设置的当前值会在Kafka中产生冲突):

declare -x CLASSPATH=""

从同一个Linux shell中,解压缩压缩包,启动ZooKeeper和Kafka:

cdtar zxvf Downloads/kafka_2.11-2.1.1.tgzcd kafka_2.11-2.1.1./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties./bin/kafka-server-start.sh -daemon config/server.properties

你可以通过启动“echo stats | nc localhost 2181”来检查ZooKeeper是否正常:

[oracle@bigdatalite ~]$ echo stats | nc localhost 2181Zookeeper version: 3.4.5-cdh5.13.1--1, built on 11/09/2017 16:28 GMTClients: /127.0.0.1:34997[1](queued=0,recved=7663,sent=7664) /0:0:0:0:0:0:0:1:17701[0](queued=0,recved=1,sent=0) Latency min/avg/max: 0/0/25Received: 8186Sent: 8194Connections: 2Outstanding: 0Zxid: 0x3fMode: standaloneNode count: 25

您可以检查Kafka是否与“echo dump | nc localhost 2181 | grep代理”(一个字符串/brokers/ids/0应该出现)

[oracle@bigdatalite ~]$ echo dump | nc localhost 2181 | grep brokers/brokers/ids/0

用于PoC的BigDataLite-4.11虚拟机已经在启动虚拟机时启动了一个较老的ZooKeeper实例。因此,请确保禁用了步骤1中描述的所有服务。

此外,当您打开一个新的Linux shell时,请注意在启动ZooKeeper和Kafka之前总是要重置CLASSPATH环境变量,这一点在步骤开始时已经解释过了。


步骤8/12:为大数据安装GoldenGate

同样,从这个页面下载Oracle GoldenGate for Big Data 12c只需要使用VM中安装的Firefox浏览器(我在Linux x86-64上使用Oracle GoldenGate for Big Data 12.3.2.1.1)。请注意,您需要一个(免费)Oracle帐户来获得它。

安装很容易,只是爆炸压缩包内的下载:

cd ~/Downloadsunzip OGG_BigData_Linux_x64_12.3.2.1.1.zipcd ..mkdir ogg-bd-poccd ogg-bd-poctar xvf ../Downloads/OGG_BigData_Linux_x64_12.3.2.1.1.tar

就这样,GoldenGate for Big Data 12c被安装在/home/oracle/ogg-bd-poc文件夹中。

同样,BigDataLite-4.11虚拟机已经在/u01/ogg-bd文件夹中安装了用于大数据的GoldenGate。但它是一个较旧的版本,连接Kafka的选项较少。

步骤9/12:启动GoldenGate for Big Data Manager

打开大数据大门

cd ~/ogg-bd-poc./ggsci

需要更改管理器端口,否则之前启动的与GoldenGate (classic)管理器的冲突将被引发。

因此,从大数据的GoldenGate来看,CLI运行:

create subdirsedit params mgr

一个vi实例将开始,只是写这个内容:

PORT 27801

然后保存内容,退出vi,返回CLI,我们终于可以启动GoldenGate for Big Data manager监听端口27081:


步骤10/12:创建数据泵(Data Pump)

现在,我们需要创建在GoldenGate世界中被称为数据泵的东西。数据泵是一个提取过程,它监视一个跟踪日志,并(实时地)将任何更改推到另一个由不同的(通常是远程的)GoldenGate实例管理的跟踪日志。

对于这个PoC,由GoldenGate (classic)管理的trail log aa将被泵送至GoldenGate管理的trail log bb进行大数据处理。

因此,如果您关闭它,请回到来自Linux shell的GoldenGate(经典)CLI:

cd /u01/ogg./ggsci

来自GoldenGate(经典)CLI:

edit params pmpeshop

并在vi中加入以下内容:

EXTRACT pmpeshopUSERIDALIAS ggadminSETENV (ORACLE_SID='orcl')-- GoldenGate for Big Data address/port:RMTHOST localhost, MGRPORT 27801RMTTRAIL ./dirdat/bbPASSTHRU-- The "tokens" part it is useful for writing in the Kafka messages-- the Transaction ID and the database Change Serial NumberTABLE orcl.eshop.*, tokens(txid = @GETENV('TRANSACTION', 'XID'), csn = @GETENV('TRANSACTION', 'CSN'));

保存内容并退出vi。

正如已经解释的提取器,保存的内容将存储在/u01/ogg/dirprm/pmpeshop中。人口、难民和移民事务局文件。

现在我们要注册并启动数据泵,从GoldenGate CLI:

dblogin useridalias ggadminadd extract pmpeshop, exttrailsource ./dirdat/aa begin nowadd rmttrail ./dirdat/bb extract pmpeshopstart pmpeshop

通过从CLI运行以下命令之一来检查数据泵的状态:

info pmpeshopview report pmpeshop

你甚至可以在金门大数据的dirdat文件夹中查看trail log bb是否已经创建:

[oracle@bigdatalite dirdat]$ ls -l ~/ogg-bd-poc/dirdattotal 0-rw-r-----. 1 oracle oinstall 0 May 30 13:22 bb000000000[oracle@bigdatalite dirdat]$

那检查泵送过程呢?来自Linux shell:

sqlplus eshop/eshop@ORCL

执行这个SQL脚本创建一个新的模拟客户订单:

INSERT INTO CUSTOMER_ORDER (ID, CODE, CREATED, STATUS, UPDATE_TIME)VALUES (CUSTOMER_ORDER_SEQ.NEXTVAL, 'AAAA02', SYSDATE, 'SHIPPING', SYSTIMESTAMP); INSERT INTO CUSTOMER_ORDER_ITEM (ID, ID_CUSTOMER_ORDER, DESCRIPTION, QUANTITY)VALUES (CUSTOMER_ORDER_ITEM_SEQ.NEXTVAL, CUSTOMER_ORDER_SEQ.CURRVAL, 'Inside Out', 1); COMMIT;

现在从GoldenGate(经典)CLI运行:

stats pmpeshop

用于检查插入操作是否正确计数(在输出的一部分下面):

GGSCI (bigdatalite.localdomain as ggadmin@cdb/CDB$ROOT) 11> stats pmpeshop Sending STATS request to EXTRACT PMPESHOP ... Start of Statistics at 2019-05-30 14:49:00. Output to ./dirdat/bb: Extracting from ORCL.ESHOP.CUSTOMER_ORDER to ORCL.ESHOP.CUSTOMER_ORDER: *** Total statistics since 2019-05-30 14:01:56 ***Total inserts 1.00Total updates 0.00Total deletes 0.00Total discards 0.00Total operations 1.00

此外,您还可以验证GoldenGate中存储的用于测试泵过程的大数据的跟踪日志的时间戳。事务提交后,从Linux shell运行:“ln -l ~/og -bd-poc/dirdat”,并检查最后一个以“bb”作为前缀的文件的时间戳。


步骤11/12:将事务发布到Kafka

最后,我们将在GoldenGate中为BigData创建一个副本流程,以便在Kafka主题中发布泵出的业务事务。replicat将从trail日志bb读取事务中的插入、更新和删除操作,并将它们转换为JSON编码的Kafka消息。

因此,创建一个名为eshop_kafkaconnect的文件。文件夹/home/oracle/ogg-bd- pocd /dirprm中的属性包含以下内容:

# File: /home/oracle/ogg-bd-poc/dirprm/eshop_kafkaconnect.properties# ----------------------------------------------------------- # address/port of the Kafka brokerbootstrap.servers=localhost:9092acks=1 #JSON Converter Settingskey.converter=org.apache.kafka.connect.json.JsonConverterkey.converter.schemas.enable=falsevalue.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter.schemas.enable=false #Adjust for performancebuffer.memory=33554432batch.size=16384linger.ms=0 # This property fix a start-up error as explained by Oracle Support here:# https://support.oracle.com/knowledge/Middleware/2455697_1.htmlconverter.type=key

在同一个文件夹中,创建一个名为eshop_kc的文件。具有以下内容的道具:

# File: /home/oracle/ogg-bd-poc/dirprm/eshop_kc.props# ---------------------------------------------------gg.handlerlist=kafkaconnect #The handler propertiesgg.handler.kafkaconnect.type=kafkaconnectgg.handler.kafkaconnect.kafkaProducerConfigFile=eshop_kafkaconnect.propertiesgg.handler.kafkaconnect.mode=tx #The following selects the topic name based only on the schema namegg.handler.kafkaconnect.topicMappingTemplate=CDC-${schemaName} #The following selects the message key using the concatenated primary keysgg.handler.kafkaconnect.keyMappingTemplate=${primaryKeys} #The formatter propertiesgg.handler.kafkaconnect.messageFormatting=opgg.handler.kafkaconnect.insertOpKey=Igg.handler.kafkaconnect.updateOpKey=Ugg.handler.kafkaconnect.deleteOpKey=Dgg.handler.kafkaconnect.truncateOpKey=Tgg.handler.kafkaconnect.treatAllColumnsAsStrings=falsegg.handler.kafkaconnect.iso8601Format=falsegg.handler.kafkaconnect.pkUpdateHandling=abendgg.handler.kafkaconnect.includeTableName=truegg.handler.kafkaconnect.includeOpType=truegg.handler.kafkaconnect.includeOpTimestamp=truegg.handler.kafkaconnect.includeCurrentTimestamp=truegg.handler.kafkaconnect.includePosition=truegg.handler.kafkaconnect.includePrimaryKeys=truegg.handler.kafkaconnect.includeTokens=true goldengate.userexit.writers=javawriterjavawriter.stats.display=TRUEjavawriter.stats.full=TRUE gg.log=log4jgg.log.level=INFO gg.report.time=30sec # Apache Kafka Classpath# Put the path of the "libs" folder inside the Kafka home pathgg.classpath=/home/oracle/kafka_2.11-2.1.1/libs/* javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=.:ggjava/ggjava.jar:./dirprm

如果关闭,重启大数据CLI的GoldenGate:

cd ~/ogg-bd-poc./ggsci

and start to create a replicat from the CLI with:

edit params repeshop

in vi put this content:

REPLICAT repeshopTARGETDB LIBFILE libggjava.so SET property=dirprm/eshop_kc.propsGROUPTRANSOPS 1000MAP orcl.eshop.*, TARGET orcl.eshop.*;

然后保存内容并退出vi。现在将replicat与trail log bb关联,并使用以下命令启动replicat进程,以便从GoldenGate启动大数据CLI:

add replicat repeshop, exttrail ./dirdat/bbstart repeshop

Check that the replicat is live and kicking with one of these commands:

info repeshopview report repeshop

Now, connect to the ESHOP schema from another Linux shell:

sqlplus eshop/eshop@ORCL

and commit something:

INSERT INTO CUSTOMER_ORDER (ID, CODE, CREATED, STATUS, UPDATE_TIME)VALUES (CUSTOMER_ORDER_SEQ.NEXTVAL, 'AAAA03', SYSDATE, 'DELIVERED', SYSTIMESTAMP); INSERT INTO CUSTOMER_ORDER_ITEM (ID, ID_CUSTOMER_ORDER, DESCRIPTION, QUANTITY)VALUES (CUSTOMER_ORDER_ITEM_SEQ.NEXTVAL, CUSTOMER_ORDER_SEQ.CURRVAL, 'Cars 3', 2); COMMIT;

From the GoldenGate for Big Data CLI, check that the INSERT operation was counted for the replicat process by running:

stats repeshop

And (hurrah!) we can have a look inside Kafka, as the Linux shell checks that the topic named CDC-ESHOP was created:

cd ~/kafka_2.11-2.1.1/bin./kafka-topics.sh --list --zookeeper localhost:2181

and from the same folder run the following command for showing the CDC events stored in the topic:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic CDC-ESHOP --from-beginning

You should see something like:

[oracle@bigdatalite kafka_2.11-2.1.1]$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic CDC-ESHOP --from-beginning {"table":"ORCL.ESHOP.CUSTOMER_ORDER","op_type":"I","op_ts":"2019-05-31 04:24:34.000327","current_ts":"2019-05-31 04:24:39.637000","pos":"00000000020000003830","primary_keys":["ID"],"tokens":{"txid":"9.32.6726","csn":"13906131"},"before":null,"after":{"ID":11.0,"CODE":"AAAA03","CREATED":"2019-05-31 04:24:34","STATUS":"DELIVERED","UPDATE_TIME":"2019-05-31 04:24:34.929950000"}}{"table":"ORCL.ESHOP.CUSTOMER_ORDER_ITEM","op_type":"I","op_ts":"2019-05-31 04:24:34.000327","current_ts":"2019-05-31 04:24:39.650000","pos":"00000000020000004074","primary_keys":["ID"],"tokens":{"txid":"9.32.6726","csn":"13906131"},"before":null,"after":{"ID":11.0,"ID_CUSTOMER_ORDER":11.0,"DESCRIPTION":"Cars 3","QUANTITY":2}}

For a better output, install jq:

sudo yum -y install jq./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic CDC-ESHOP --from-beginning | jq .

and here is how will appear the JSON events:

{ "table": "ORCL.ESHOP.CUSTOMER_ORDER", "op_type": "I", "op_ts": "2019-05-31 04:24:34.000327", "current_ts": "2019-05-31 04:24:39.637000", "pos": "00000000020000003830", "primary_keys": [ "ID" ], "tokens": { "txid": "9.32.6726", "csn": "13906131" }, "before": null, "after": { "ID": 11, "CODE": "AAAA03", "CREATED": "2019-05-31 04:24:34", "STATUS": "DELIVERED", "UPDATE_TIME": "2019-05-31 04:24:34.929950000" }}{ "table": "ORCL.ESHOP.CUSTOMER_ORDER_ITEM", "op_type": "I", "op_ts": "2019-05-31 04:24:34.000327", "current_ts": "2019-05-31 04:24:39.650000", "pos": "00000000020000004074", "primary_keys": [ "ID" ], "tokens": { "txid": "9.32.6726", "csn": "13906131" }, "before": null, "after": { "ID": 11, "ID_CUSTOMER_ORDER": 11, "DESCRIPTION": "Cars 3", "QUANTITY": 2 }}

现在打开Kafka -console-consumer.sh进程,并在ESHOP上执行其他一些数据库事务,以便实时打印发送给Kafka的CDC事件流。

以下是一些用于更新和删除操作的JSON事件示例:

// Generated with: UPDATE CUSTOMER_ORDER SET STATUS='DELIVERED' WHERE ID=8; { "table": "ORCL.ESHOP.CUSTOMER_ORDER", "op_type": "U", "op_ts": "2019-05-31 06:22:07.000245", "current_ts": "2019-05-31 06:22:11.233000", "pos": "00000000020000004234", "primary_keys": [ "ID" ], "tokens": { "txid": "14.6.2656", "csn": "13913689" }, "before": { "ID": 8, "CODE": null, "CREATED": null, "STATUS": "SHIPPING", "UPDATE_TIME": null }, "after": { "ID": 8, "CODE": null, "CREATED": null, "STATUS": "DELIVERED", "UPDATE_TIME": null }} // Generated with: DELETE CUSTOMER_ORDER_ITEM WHERE ID=3;{ "table": "ORCL.ESHOP.CUSTOMER_ORDER_ITEM", "op_type": "D", "op_ts": "2019-05-31 06:25:59.000916", "current_ts": "2019-05-31 06:26:04.910000", "pos": "00000000020000004432", "primary_keys": [ "ID" ], "tokens": { "txid": "14.24.2651", "csn": "13913846" }, "before": { "ID": 3, "ID_CUSTOMER_ORDER": 1, "DESCRIPTION": "Toy Story", "QUANTITY": 1 }, "after": null}

恭喜你!你完成了PoC:


步骤12/12:使用PoC

GoldenGate中提供的Kafka Connect处理程序有很多有用的选项,可以根据需要定制集成。点击这里查看官方文件。

例如,您可以选择为CDC流中涉及的每个表创建不同的主题,只需在eshop_kc.props中编辑此属性:

gg.handler.kafkaconnect.topicMappingTemplate=CDC-${schemaName}-${tableName}

更改后重新启动replicat,从GoldenGate for Big Data CLI:

stop repeshopstart repeshop

您可以在“~/og -bd-poc/AdapterExamples/big-data/kafka_connect”文件夹中找到其他配置示例。

结论

在本文中,我们通过GoldenGate技术在Oracle数据库和Kafka代理之间创建了一个完整的集成。CDC事件流以Kafka实时发布。

为了简单起见,我们使用了一个已经全部安装的虚拟机,但是您可以在不同的主机上免费安装用于大数据的GoldenGate和Kafka。

请在评论中告诉我您对这种集成的潜力(或限制)的看法。

目录
打赏
0
0
0
0
111
分享
相关文章
docker arm架构部署kafka要点
本内容介绍了基于 Docker 的容器化解决方案,包含以下部分: 1. **Docker 容器管理**:通过 Portainer 可视化管理工具实现对主节点和代理节点的统一管理。 2. **Kafka 可视化工具**:部署 Kafka-UI 以图形化方式监控和管理 Kafka 集群,支持动态配置功能, 3. **Kafka 安装与配置**:基于 Bitnami Kafka 镜像,提供完整的 Kafka 集群配置示例,涵盖 KRaft 模式、性能调优参数及数据持久化设置,适用于高可用生产环境。 以上方案适合 ARM64 架构,为用户提供了一站式的容器化管理和消息队列解决方案。
ApsaraMQ Serverless 能力再升级,事件驱动架构赋能 AI 应用
本文整理自2024年云栖大会阿里云智能集团高级技术专家金吉祥的演讲《ApsaraMQ Serverless 能力再升级,事件驱动架构赋能 AI 应用》。
246 38
事件驱动架构是一种编程范式
【10月更文挑战第7天】事件驱动架构是一种编程范式
154 65
关于 Kafka 高性能架构,这篇说得最全面,建议收藏!
Kafka 是一个高吞吐量、高性能的消息中间件,关于 Kafka 高性能背后的实现,是大厂面试高频问题。本篇全面详解 Kafka 高性能背后的实现。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
关于 Kafka 高性能架构,这篇说得最全面,建议收藏!
事件驱动架构的实现方式?
【10月更文挑战第7天】事件驱动架构的实现方式?
112 7
Flink-07 Flink Java 3分钟上手 滚动窗口 事件驱动 Kafka TumblingWindow GlobalWindow CountWindow
Flink-07 Flink Java 3分钟上手 滚动窗口 事件驱动 Kafka TumblingWindow GlobalWindow CountWindow
82 7
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
135 5
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
149 4
【赵渝强老师】Kafka的体系架构
Kafka消息系统是一个分布式系统,包含生产者、消费者、Broker和ZooKeeper。生产者将消息发送到Broker,消费者从Broker中拉取消息并处理。主题按分区存储,每个分区有唯一的偏移量地址,确保消息顺序。Kafka支持负载均衡和容错。视频讲解和术语表进一步帮助理解。

热门文章

最新文章

推荐镜像

更多
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等