「事件驱动架构」使用GoldenGate创建从Oracle到Kafka的CDC事件流(下)

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 「事件驱动架构」使用GoldenGate创建从Oracle到Kafka的CDC事件流

步骤7/12:安装并运行Apache Kafka

从VM的桌面环境中打开Firefox并下载Apache Kafka(我使用的是kafka_2.11-2.1.1.tgz)。

现在,打开一个Linux shell并重置CLASSPATH环境变量(在BigDataLite-4.11虚拟机中设置的当前值会在Kafka中产生冲突):

declare -x CLASSPATH=""

从同一个Linux shell中,解压缩压缩包,启动ZooKeeper和Kafka:

cdtar zxvf Downloads/kafka_2.11-2.1.1.tgzcd kafka_2.11-2.1.1./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties./bin/kafka-server-start.sh -daemon config/server.properties

你可以通过启动“echo stats | nc localhost 2181”来检查ZooKeeper是否正常:

[oracle@bigdatalite ~]$ echo stats | nc localhost 2181Zookeeper version: 3.4.5-cdh5.13.1--1, built on 11/09/2017 16:28 GMTClients: /127.0.0.1:34997[1](queued=0,recved=7663,sent=7664) /0:0:0:0:0:0:0:1:17701[0](queued=0,recved=1,sent=0) Latency min/avg/max: 0/0/25Received: 8186Sent: 8194Connections: 2Outstanding: 0Zxid: 0x3fMode: standaloneNode count: 25

您可以检查Kafka是否与“echo dump | nc localhost 2181 | grep代理”(一个字符串/brokers/ids/0应该出现)

[oracle@bigdatalite ~]$ echo dump | nc localhost 2181 | grep brokers/brokers/ids/0

用于PoC的BigDataLite-4.11虚拟机已经在启动虚拟机时启动了一个较老的ZooKeeper实例。因此,请确保禁用了步骤1中描述的所有服务。

此外,当您打开一个新的Linux shell时,请注意在启动ZooKeeper和Kafka之前总是要重置CLASSPATH环境变量,这一点在步骤开始时已经解释过了。


步骤8/12:为大数据安装GoldenGate

同样,从这个页面下载Oracle GoldenGate for Big Data 12c只需要使用VM中安装的Firefox浏览器(我在Linux x86-64上使用Oracle GoldenGate for Big Data 12.3.2.1.1)。请注意,您需要一个(免费)Oracle帐户来获得它。

安装很容易,只是爆炸压缩包内的下载:

cd ~/Downloadsunzip OGG_BigData_Linux_x64_12.3.2.1.1.zipcd ..mkdir ogg-bd-poccd ogg-bd-poctar xvf ../Downloads/OGG_BigData_Linux_x64_12.3.2.1.1.tar

就这样,GoldenGate for Big Data 12c被安装在/home/oracle/ogg-bd-poc文件夹中。

同样,BigDataLite-4.11虚拟机已经在/u01/ogg-bd文件夹中安装了用于大数据的GoldenGate。但它是一个较旧的版本,连接Kafka的选项较少。

步骤9/12:启动GoldenGate for Big Data Manager

打开大数据大门

cd ~/ogg-bd-poc./ggsci

需要更改管理器端口,否则之前启动的与GoldenGate (classic)管理器的冲突将被引发。

因此,从大数据的GoldenGate来看,CLI运行:

create subdirsedit params mgr

一个vi实例将开始,只是写这个内容:

PORT 27801

然后保存内容,退出vi,返回CLI,我们终于可以启动GoldenGate for Big Data manager监听端口27081:


步骤10/12:创建数据泵(Data Pump)

现在,我们需要创建在GoldenGate世界中被称为数据泵的东西。数据泵是一个提取过程,它监视一个跟踪日志,并(实时地)将任何更改推到另一个由不同的(通常是远程的)GoldenGate实例管理的跟踪日志。

对于这个PoC,由GoldenGate (classic)管理的trail log aa将被泵送至GoldenGate管理的trail log bb进行大数据处理。

因此,如果您关闭它,请回到来自Linux shell的GoldenGate(经典)CLI:

cd /u01/ogg./ggsci

来自GoldenGate(经典)CLI:

edit params pmpeshop

并在vi中加入以下内容:

EXTRACT pmpeshopUSERIDALIAS ggadminSETENV (ORACLE_SID='orcl')-- GoldenGate for Big Data address/port:RMTHOST localhost, MGRPORT 27801RMTTRAIL ./dirdat/bbPASSTHRU-- The "tokens" part it is useful for writing in the Kafka messages-- the Transaction ID and the database Change Serial NumberTABLE orcl.eshop.*, tokens(txid = @GETENV('TRANSACTION', 'XID'), csn = @GETENV('TRANSACTION', 'CSN'));

保存内容并退出vi。

正如已经解释的提取器,保存的内容将存储在/u01/ogg/dirprm/pmpeshop中。人口、难民和移民事务局文件。

现在我们要注册并启动数据泵,从GoldenGate CLI:

dblogin useridalias ggadminadd extract pmpeshop, exttrailsource ./dirdat/aa begin nowadd rmttrail ./dirdat/bb extract pmpeshopstart pmpeshop

通过从CLI运行以下命令之一来检查数据泵的状态:

info pmpeshopview report pmpeshop

你甚至可以在金门大数据的dirdat文件夹中查看trail log bb是否已经创建:

[oracle@bigdatalite dirdat]$ ls -l ~/ogg-bd-poc/dirdattotal 0-rw-r-----. 1 oracle oinstall 0 May 30 13:22 bb000000000[oracle@bigdatalite dirdat]$

那检查泵送过程呢?来自Linux shell:

sqlplus eshop/eshop@ORCL

执行这个SQL脚本创建一个新的模拟客户订单:

INSERT INTO CUSTOMER_ORDER (ID, CODE, CREATED, STATUS, UPDATE_TIME)VALUES (CUSTOMER_ORDER_SEQ.NEXTVAL, 'AAAA02', SYSDATE, 'SHIPPING', SYSTIMESTAMP); INSERT INTO CUSTOMER_ORDER_ITEM (ID, ID_CUSTOMER_ORDER, DESCRIPTION, QUANTITY)VALUES (CUSTOMER_ORDER_ITEM_SEQ.NEXTVAL, CUSTOMER_ORDER_SEQ.CURRVAL, 'Inside Out', 1); COMMIT;

现在从GoldenGate(经典)CLI运行:

stats pmpeshop

用于检查插入操作是否正确计数(在输出的一部分下面):

GGSCI (bigdatalite.localdomain as ggadmin@cdb/CDB$ROOT) 11> stats pmpeshop Sending STATS request to EXTRACT PMPESHOP ... Start of Statistics at 2019-05-30 14:49:00. Output to ./dirdat/bb: Extracting from ORCL.ESHOP.CUSTOMER_ORDER to ORCL.ESHOP.CUSTOMER_ORDER: *** Total statistics since 2019-05-30 14:01:56 ***Total inserts 1.00Total updates 0.00Total deletes 0.00Total discards 0.00Total operations 1.00

此外,您还可以验证GoldenGate中存储的用于测试泵过程的大数据的跟踪日志的时间戳。事务提交后,从Linux shell运行:“ln -l ~/og -bd-poc/dirdat”,并检查最后一个以“bb”作为前缀的文件的时间戳。


步骤11/12:将事务发布到Kafka

最后,我们将在GoldenGate中为BigData创建一个副本流程,以便在Kafka主题中发布泵出的业务事务。replicat将从trail日志bb读取事务中的插入、更新和删除操作,并将它们转换为JSON编码的Kafka消息。

因此,创建一个名为eshop_kafkaconnect的文件。文件夹/home/oracle/ogg-bd- pocd /dirprm中的属性包含以下内容:

# File: /home/oracle/ogg-bd-poc/dirprm/eshop_kafkaconnect.properties# ----------------------------------------------------------- # address/port of the Kafka brokerbootstrap.servers=localhost:9092acks=1 #JSON Converter Settingskey.converter=org.apache.kafka.connect.json.JsonConverterkey.converter.schemas.enable=falsevalue.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter.schemas.enable=false #Adjust for performancebuffer.memory=33554432batch.size=16384linger.ms=0 # This property fix a start-up error as explained by Oracle Support here:# https://support.oracle.com/knowledge/Middleware/2455697_1.htmlconverter.type=key

在同一个文件夹中,创建一个名为eshop_kc的文件。具有以下内容的道具:

# File: /home/oracle/ogg-bd-poc/dirprm/eshop_kc.props# ---------------------------------------------------gg.handlerlist=kafkaconnect #The handler propertiesgg.handler.kafkaconnect.type=kafkaconnectgg.handler.kafkaconnect.kafkaProducerConfigFile=eshop_kafkaconnect.propertiesgg.handler.kafkaconnect.mode=tx #The following selects the topic name based only on the schema namegg.handler.kafkaconnect.topicMappingTemplate=CDC-${schemaName} #The following selects the message key using the concatenated primary keysgg.handler.kafkaconnect.keyMappingTemplate=${primaryKeys} #The formatter propertiesgg.handler.kafkaconnect.messageFormatting=opgg.handler.kafkaconnect.insertOpKey=Igg.handler.kafkaconnect.updateOpKey=Ugg.handler.kafkaconnect.deleteOpKey=Dgg.handler.kafkaconnect.truncateOpKey=Tgg.handler.kafkaconnect.treatAllColumnsAsStrings=falsegg.handler.kafkaconnect.iso8601Format=falsegg.handler.kafkaconnect.pkUpdateHandling=abendgg.handler.kafkaconnect.includeTableName=truegg.handler.kafkaconnect.includeOpType=truegg.handler.kafkaconnect.includeOpTimestamp=truegg.handler.kafkaconnect.includeCurrentTimestamp=truegg.handler.kafkaconnect.includePosition=truegg.handler.kafkaconnect.includePrimaryKeys=truegg.handler.kafkaconnect.includeTokens=true goldengate.userexit.writers=javawriterjavawriter.stats.display=TRUEjavawriter.stats.full=TRUE gg.log=log4jgg.log.level=INFO gg.report.time=30sec # Apache Kafka Classpath# Put the path of the "libs" folder inside the Kafka home pathgg.classpath=/home/oracle/kafka_2.11-2.1.1/libs/* javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=.:ggjava/ggjava.jar:./dirprm

如果关闭,重启大数据CLI的GoldenGate:

cd ~/ogg-bd-poc./ggsci

and start to create a replicat from the CLI with:

edit params repeshop

in vi put this content:

REPLICAT repeshopTARGETDB LIBFILE libggjava.so SET property=dirprm/eshop_kc.propsGROUPTRANSOPS 1000MAP orcl.eshop.*, TARGET orcl.eshop.*;

然后保存内容并退出vi。现在将replicat与trail log bb关联,并使用以下命令启动replicat进程,以便从GoldenGate启动大数据CLI:

add replicat repeshop, exttrail ./dirdat/bbstart repeshop

Check that the replicat is live and kicking with one of these commands:

info repeshopview report repeshop

Now, connect to the ESHOP schema from another Linux shell:

sqlplus eshop/eshop@ORCL

and commit something:

INSERT INTO CUSTOMER_ORDER (ID, CODE, CREATED, STATUS, UPDATE_TIME)VALUES (CUSTOMER_ORDER_SEQ.NEXTVAL, 'AAAA03', SYSDATE, 'DELIVERED', SYSTIMESTAMP); INSERT INTO CUSTOMER_ORDER_ITEM (ID, ID_CUSTOMER_ORDER, DESCRIPTION, QUANTITY)VALUES (CUSTOMER_ORDER_ITEM_SEQ.NEXTVAL, CUSTOMER_ORDER_SEQ.CURRVAL, 'Cars 3', 2); COMMIT;

From the GoldenGate for Big Data CLI, check that the INSERT operation was counted for the replicat process by running:

stats repeshop

And (hurrah!) we can have a look inside Kafka, as the Linux shell checks that the topic named CDC-ESHOP was created:

cd ~/kafka_2.11-2.1.1/bin./kafka-topics.sh --list --zookeeper localhost:2181

and from the same folder run the following command for showing the CDC events stored in the topic:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic CDC-ESHOP --from-beginning

You should see something like:

[oracle@bigdatalite kafka_2.11-2.1.1]$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic CDC-ESHOP --from-beginning {"table":"ORCL.ESHOP.CUSTOMER_ORDER","op_type":"I","op_ts":"2019-05-31 04:24:34.000327","current_ts":"2019-05-31 04:24:39.637000","pos":"00000000020000003830","primary_keys":["ID"],"tokens":{"txid":"9.32.6726","csn":"13906131"},"before":null,"after":{"ID":11.0,"CODE":"AAAA03","CREATED":"2019-05-31 04:24:34","STATUS":"DELIVERED","UPDATE_TIME":"2019-05-31 04:24:34.929950000"}}{"table":"ORCL.ESHOP.CUSTOMER_ORDER_ITEM","op_type":"I","op_ts":"2019-05-31 04:24:34.000327","current_ts":"2019-05-31 04:24:39.650000","pos":"00000000020000004074","primary_keys":["ID"],"tokens":{"txid":"9.32.6726","csn":"13906131"},"before":null,"after":{"ID":11.0,"ID_CUSTOMER_ORDER":11.0,"DESCRIPTION":"Cars 3","QUANTITY":2}}

For a better output, install jq:

sudo yum -y install jq./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic CDC-ESHOP --from-beginning | jq .

and here is how will appear the JSON events:

{ "table": "ORCL.ESHOP.CUSTOMER_ORDER", "op_type": "I", "op_ts": "2019-05-31 04:24:34.000327", "current_ts": "2019-05-31 04:24:39.637000", "pos": "00000000020000003830", "primary_keys": [ "ID" ], "tokens": { "txid": "9.32.6726", "csn": "13906131" }, "before": null, "after": { "ID": 11, "CODE": "AAAA03", "CREATED": "2019-05-31 04:24:34", "STATUS": "DELIVERED", "UPDATE_TIME": "2019-05-31 04:24:34.929950000" }}{ "table": "ORCL.ESHOP.CUSTOMER_ORDER_ITEM", "op_type": "I", "op_ts": "2019-05-31 04:24:34.000327", "current_ts": "2019-05-31 04:24:39.650000", "pos": "00000000020000004074", "primary_keys": [ "ID" ], "tokens": { "txid": "9.32.6726", "csn": "13906131" }, "before": null, "after": { "ID": 11, "ID_CUSTOMER_ORDER": 11, "DESCRIPTION": "Cars 3", "QUANTITY": 2 }}

现在打开Kafka -console-consumer.sh进程,并在ESHOP上执行其他一些数据库事务,以便实时打印发送给Kafka的CDC事件流。

以下是一些用于更新和删除操作的JSON事件示例:

// Generated with: UPDATE CUSTOMER_ORDER SET STATUS='DELIVERED' WHERE ID=8; { "table": "ORCL.ESHOP.CUSTOMER_ORDER", "op_type": "U", "op_ts": "2019-05-31 06:22:07.000245", "current_ts": "2019-05-31 06:22:11.233000", "pos": "00000000020000004234", "primary_keys": [ "ID" ], "tokens": { "txid": "14.6.2656", "csn": "13913689" }, "before": { "ID": 8, "CODE": null, "CREATED": null, "STATUS": "SHIPPING", "UPDATE_TIME": null }, "after": { "ID": 8, "CODE": null, "CREATED": null, "STATUS": "DELIVERED", "UPDATE_TIME": null }} // Generated with: DELETE CUSTOMER_ORDER_ITEM WHERE ID=3;{ "table": "ORCL.ESHOP.CUSTOMER_ORDER_ITEM", "op_type": "D", "op_ts": "2019-05-31 06:25:59.000916", "current_ts": "2019-05-31 06:26:04.910000", "pos": "00000000020000004432", "primary_keys": [ "ID" ], "tokens": { "txid": "14.24.2651", "csn": "13913846" }, "before": { "ID": 3, "ID_CUSTOMER_ORDER": 1, "DESCRIPTION": "Toy Story", "QUANTITY": 1 }, "after": null}

恭喜你!你完成了PoC:


步骤12/12:使用PoC

GoldenGate中提供的Kafka Connect处理程序有很多有用的选项,可以根据需要定制集成。点击这里查看官方文件。

例如,您可以选择为CDC流中涉及的每个表创建不同的主题,只需在eshop_kc.props中编辑此属性:

gg.handler.kafkaconnect.topicMappingTemplate=CDC-${schemaName}-${tableName}

更改后重新启动replicat,从GoldenGate for Big Data CLI:

stop repeshopstart repeshop

您可以在“~/og -bd-poc/AdapterExamples/big-data/kafka_connect”文件夹中找到其他配置示例。

结论

在本文中,我们通过GoldenGate技术在Oracle数据库和Kafka代理之间创建了一个完整的集成。CDC事件流以Kafka实时发布。

为了简单起见,我们使用了一个已经全部安装的虚拟机,但是您可以在不同的主机上免费安装用于大数据的GoldenGate和Kafka。

请在评论中告诉我您对这种集成的潜力(或限制)的看法。

相关文章
|
7天前
|
消息中间件 缓存 架构师
关于 Kafka 高性能架构,这篇说得最全面,建议收藏!
Kafka 是一个高吞吐量、高性能的消息中间件,关于 Kafka 高性能背后的实现,是大厂面试高频问题。本篇全面详解 Kafka 高性能背后的实现。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
关于 Kafka 高性能架构,这篇说得最全面,建议收藏!
|
12天前
|
消息中间件 存储 负载均衡
【赵渝强老师】Kafka的体系架构
Kafka消息系统是一个分布式系统,包含生产者、消费者、Broker和ZooKeeper。生产者将消息发送到Broker,消费者从Broker中拉取消息并处理。主题按分区存储,每个分区有唯一的偏移量地址,确保消息顺序。Kafka支持负载均衡和容错。视频讲解和术语表进一步帮助理解。
|
1月前
|
消息中间件 Java Kafka
Flink-07 Flink Java 3分钟上手 滚动窗口 事件驱动 Kafka TumblingWindow GlobalWindow CountWindow
Flink-07 Flink Java 3分钟上手 滚动窗口 事件驱动 Kafka TumblingWindow GlobalWindow CountWindow
37 7
|
1月前
|
消息中间件 NoSQL Kafka
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
62 5
|
1月前
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
66 4
|
3月前
|
消息中间件 负载均衡 Java
揭秘Kafka背后的秘密!Kafka 架构设计大曝光:深入剖析Kafka机制,带你一探究竟!
【8月更文挑战第24天】Apache Kafka是一款专为实时数据处理及流传输设计的高效率消息系统。其核心特性包括高吞吐量、低延迟及出色的可扩展性。Kafka采用分布式日志模型,支持数据分区与副本,确保数据可靠性和持久性。系统由Producer(消息生产者)、Consumer(消息消费者)及Broker(消息服务器)组成。Kafka支持消费者组,实现数据并行处理,提升整体性能。通过内置的故障恢复机制,即使部分节点失效,系统仍能保持稳定运行。提供的Java示例代码展示了如何使用Kafka进行消息的生产和消费,并演示了故障转移处理过程。
52 3
|
3月前
|
消息中间件 负载均衡 Kafka
Kafka 实现负载均衡与故障转移:深入分析 Kafka 的架构特点与实践
【8月更文挑战第24天】Apache Kafka是一款专为实时数据处理和流传输设计的高性能消息系统。其核心设计注重高吞吐量、低延迟与可扩展性,并具备出色的容错能力。Kafka采用分布式日志概念,通过数据分区及副本机制确保数据可靠性和持久性。系统包含Producer(消息生产者)、Consumer(消息消费者)和Broker(消息服务器)三大组件。Kafka利用独特的分区机制实现负载均衡,每个Topic可以被划分为多个分区,每个分区可以被复制到多个Broker上,确保数据的高可用性和可靠性。
77 2
|
3月前
|
消息中间件 负载均衡 Kafka
【解密Kafka背后的秘密!】为什么Kafka不需要读写分离?深入剖析Kafka架构,带你一探究竟!
【8月更文挑战第24天】Apache Kafka是一款专为高效实时数据处理与传输设计的消息系统,凭借其高吞吐量、低延迟及可扩展性在业界享有盛誉。不同于传统数据库常采用的读写分离策略,Kafka通过独特的分布式架构实现了无需读写分离即可满足高并发需求。其核心包括Producer(生产者)、Consumer(消费者)与Broker(代理),并通过分区复制、消费者组以及幂等性生产者等功能确保了系统的高效运行。本文通过分析Kafka的架构特性及其提供的示例代码,阐述了Kafka为何无需借助读写分离机制就能有效处理大量读写操作。
49 2
|
3月前
|
消息中间件 存储 Java
图解Kafka:Kafka架构演化与升级!
图解Kafka:Kafka架构演化与升级!
89 0
图解Kafka:Kafka架构演化与升级!
|
3月前
|
消息中间件 存储 SQL
Kafka架构及其原理
Kafka架构及其原理
100 1

推荐镜像

更多
下一篇
无影云桌面