「事件驱动架构」使用GoldenGate创建从Oracle到Kafka的CDC事件流(上)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 「事件驱动架构」使用GoldenGate创建从Oracle到Kafka的CDC事件流

我们通过GoldenGate技术在Oracle DB和Kafka代理之间创建集成,该技术实时发布Kafka中的CDC事件流。

Oracle在其Oracle GoldenGate for Big Data套件中提供了一个Kafka连接处理程序,用于将CDC(更改数据捕获)事件流推送到Apache Kafka集群。

因此,对于给定的Oracle数据库,成功完成的业务事务中的任何DML操作(插入、更新、删除)都将转换为实时发布的Kafka消息。

这种集成对于这类用例非常有趣和有用:

  1. 如果遗留的单片应用程序使用Oracle数据库作为单一数据源,那么应该可以通过监视相关表的更改来创建实时更新事件流。换句话说,我们可以实现来自遗留应用程序的数据管道,而无需更改它们。
  2. 我们需要承认只有在数据库事务成功完成时才会发布Kafka消息。为了赋予这个特性,我们可以(始终以事务的方式)在一个由GoldenGate特别监视的表中编写Kafka消息,通过它的Kafka连接处理程序,将发布一个“插入”事件来存储原始的Kafka消息。

在本文中,我们将逐步说明如何通过GoldenGate技术实现PoC(概念验证)来测试Oracle数据库与Kafka之间的集成。

PoC的先决条件

我们将安装所有的东西在一个本地虚拟机,所以你需要:

  1. 安装Oracle VirtualBox(我在Oracle VirtualBox 5.2.20上测试过)
  2. 16 gb的RAM。
  3. 大约75GB的磁盘空间空闲。
  4. 最后但并非最不重要的是:了解vi。

PoC架构

本指南将创建一个单一的虚拟机有:

  1. Oracle数据库12c:要监视的表存储在其中。
  2. Oracle GoldenGate 12c(经典版本):将应用于监视表的业务事务实时提取,以中间日志格式(trail log)存储,并将其输送到另一个GoldenGate(用于大数据)实例管理的远程日志。
  3. Oracle GoldenGate for Big Data 12c:pumped的业务事务并将其复制到Kafka消息中。
  4. Apache Zookeeper/Apache Kafka实例:在这里发布Kafka消息中转换的业务事务。

换句话说,在某些Oracle表上应用的任何插入、更新和删除操作都将生成Kafka消息的CDC事件流,该事件流将在单个Kafka主题中发布。

下面是我们将要创建的架构和实时数据流:


步骤1/12:启动Oracle数据库

您可以自由地安装Oracle数据库和Oracle GoldenGate手动。但幸运的是……)Oracle共享了一些虚拟机,这些虚拟机已经安装了所有的东西,可以随时进行开发。

Oracle虚拟机可以在这里下载,你需要一个免费的Oracle帐户来获得它们。

我使用了Oracle Big Data Lite虚拟机(ver)。4.11),它包含了很多Oracle产品,包括:

  1. Oracle数据库12c第一版企业版(12.1.0.2)
  2. Oracle GoldenGate 12c (12.3.0.1.2)

从上述下载页面获取所有7-zip文件(约22GB),提取VM映像文件BigDataLite411。在Oracle VirtualBox中双击文件,打开导入向导。完成导入过程后,一个名为BigDataLite-4.11的VM将可用。


启动BigDataLite-4.11并使用以下凭证登录:

  1. 用户:oracle
  2. 密码:welcome1

一个舒适的Linux桌面环境将会出现。

双击桌面上的“开始/停止服务”图标,然后:

  1. 检查第一项ORCL (Oracle数据库12c)。
  2. 不要检查所有其他的东西(对PoC无用且有害)。
  3. 按回车确认选择。


最后,Oracle数据库将启动。

当您重新启动虚拟机时,Oracle数据库将自动启动。

与下载的虚拟机有关的其他有用信息:

  1. Oracle主文件夹($ORACLE_HOME)是/u01/app/ Oracle /product/12.1.0.2/dbhome_1
  2. GoldenGate (classic)安装在/u01/ogg中
  3. SQL Developer安装在/u01/sqldeveloper中。您可以从上面工具栏中的图标启动SQL Developer。
  4. Oracle数据库是作为多租户容器数据库(CDB)安装的。
  5. Oracle数据库监听端口是1521
  6. 根容器的Oracle SID是cdb
  7. PDB(可插拔数据库)的Oracle SID是orcl
  8. 所有Oracle数据库用户(SYS、SYSTEM等)的密码都是welcome1
  9. 连接到PDB数据库的tnsname别名是ORCL(参见$ORACLE_HOME/network/admin/tnsnames)。ora文件内容)。
  10. Java主文件夹($JAVA_HOME)是/usr/java/latest
  11. $JAVA_HOME中安装的Java开发工具包是JDK8更新151。

步骤2/12:在Oracle中启用归档日志

我们需要在Oracle中启用归档日志来使用GoldenGate (classic)。

从VM的Linux shell中启动SQL Plus作为SYS:

sqlplus sys/welcome1 as sysdba

然后从SQL + shell运行这个命令列表(我建议一次启动一个):

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;ALTER DATABASE FORCE LOGGING;ALTER SYSTEM SWITCH LOGFILE;ALTER SYSTEM SET ENABLE_GOLDENGATE_REPLICATION=TRUE;SHUTDOWN IMMEDIATE;STARTUP MOUNT;ALTER DATABASE ARCHIVELOG;ALTER DATABASE OPEN;

然后检查存档日志是否成功启用:

ARCHIVE LOG LIST;

输出应该是这样的:

Database log mode Archive ModeAutomatic archival EnabledArchive destination USE_DB_RECOVERY_FILE_DESTOldest online log sequence 527Next log sequence to archive 529Current log sequence 529


步骤3/12:创建一个ggadmin用户

需要为GoldenGate (classic)创建一个特殊的Oracle管理员用户。

同样,从VM的Linux shell中打开SQL Plus:

sqlplus sys/welcome1作为sysdba

并通过运行这个脚本创建ggadmin用户:

ALTER SESSION SET "_ORACLE_SCRIPT"=TRUE; CREATE USER ggadmin IDENTIFIED BY ggadmin;GRANT CREATE SESSION, CONNECT, RESOURCE, ALTER SYSTEM TO ggadmin;EXEC DBMS_GOLDENGATE_AUTH.GRANT_ADMIN_PRIVILEGE(grantee=>'ggadmin', privilege_type=>'CAPTURE', grant_optional_privileges=>'*');GRANT SELECT ANY DICTIONARY TO ggadmin;GRANT UNLIMITED TABLESPACE TO ggadmin;

步骤4/12 -创建ESHOP模式

我们将创建一个模式(ESHOP),其中只有两个表(CUSTOMER_ORDER和CUSTOMER_ORDER_ITEM),用于生成要推送到Kafka中的CDC事件流。

使用SQL Plus(或者,如果您愿意,也可以使用SQL Developer)连接orcl作为SID的Oracle PDB:

sqlplus sys/welcome1@ORCL as sysdba

运行这个脚本:

-- init session ALTER SESSION SET "_ORACLE_SCRIPT"=TRUE; -- create tablespace for eshop CREATE TABLESPACE eshop_tbs DATAFILE 'eshop_tbs.dat' SIZE 10M AUTOEXTEND ON;CREATE TEMPORARY TABLESPACE eshop_tbs_temp TEMPFILE 'eshop_tbs_temp.dat' SIZE 5M AUTOEXTEND ON; -- create user schema eshop, please note that the password is eshopCREATE USER ESHOP IDENTIFIED BY eshop DEFAULT TABLESPACE eshop_tbs TEMPORARY TABLESPACE eshop_tbs_temp; -- grant eshop user permissionsGRANT CREATE SESSION TO ESHOP;GRANT CREATE TABLE TO ESHOP;GRANT UNLIMITED TABLESPACE TO ESHOP;GRANT RESOURCE TO ESHOP;GRANT CONNECT TO ESHOP;GRANT CREATE VIEW TO ESHOP; -- create eshop sequencesCREATE SEQUENCE ESHOP.CUSTOMER_ORDER_SEQ START WITH 1 INCREMENT BY 1 NOCACHE NOCYCLE;CREATE SEQUENCE ESHOP.CUSTOMER_ORDER_ITEM_SEQ START WITH 1 INCREMENT BY 1 NOCACHE NOCYCLE; -- create eshop tablesCREATE TABLE ESHOP.CUSTOMER_ORDER ( ID NUMBER(19) PRIMARY KEY, CODE VARCHAR2(10), CREATED DATE, STATUS VARCHAR2(32), UPDATE_TIME TIMESTAMP); CREATE TABLE ESHOP.CUSTOMER_ORDER_ITEM ( ID NUMBER(19) PRIMARY KEY, ID_CUSTOMER_ORDER NUMBER(19), DESCRIPTION VARCHAR2(255), QUANTITY NUMBER(3), CONSTRAINT FK_CUSTOMER_ORDER FOREIGN KEY (ID_CUSTOMER_ORDER) REFERENCES ESHOP.CUSTOMER_ORDER (ID));

步骤5/12:初始化GoldenGate Classic

现在是时候在BigDataListe-4.11虚拟机中安装GoldenGate (classic)实例了。

从Linux shell运行:

cd /u01/ogg./ggsci

GoldenGate CLI(命令行界面)将启动:

Oracle GoldenGate Command Interpreter for OracleVersion 12.2.0.1.0 OGGCORE_12.2.0.1.0_PLATFORMS_151101.1925.2_FBOLinux, x64, 64bit (optimized), Oracle 12c on Nov 11 2015 03:53:23Operating system character set identified as UTF-8. Copyright (C) 1995, 2015, Oracle and/or its affiliates. All rights reserved. GGSCI (bigdatalite.localdomain) 1>

从GoldenGate CLI启动经理与以下命令:

start mgr

它将引导GoldenGate的主控制器进程(监听端口7810)。

现在创建一个凭据库来存储ggadmin用户凭据(并使用具有相同名称的别名来引用它们):

add credentialstorealter credentialstore add user ggadmin password ggadmin alias ggadmin

现在,通过使用刚才创建的ggadmin别名连接到Oracle数据库,并启用对存储在名为orcl的PDB中的eshop模式的附加日志:

dblogin useridalias ggadminadd schematrandata orcl.eshop


步骤6/12:制作金门果提取物

在此步骤中,我们将创建一个GoldenGate摘要,此过程将监视Oracle archive重做日志,以捕获与ESHOP表相关的数据库事务,并将此SQL修改流写入另一个名为trail log的日志文件中。

从GoldenGate CLI运行:

edit params exteshop

该命令将打开一个引用新空文件的vi实例。在vi编辑器中放入以下内容:

EXTRACT exteshopUSERIDALIAS ggadminEXTTRAIL ./dirdat/aaTABLE orcl.eshop.*;

保存内容并退出vi,以便返回GoldenGate CLI。

保存的内容将存储在/u01/ogg/dirprm/exteshop中。人口、难民和移民事务局文件。您也可以在外部编辑它的内容,而不需要再次从GoldenGate CLI运行“edit params exteshop”命令。

现在在Oracle中注册提取过程,从GoldenGate CLI运行以下命令:

dblogin useridalias ggadminregister extract exteshop database container (orcl)

最后一个命令的输出应该是这样的:

OGG-02003 Extract EXTESHOP successfully registered with database at SCN 13624423.

使用所示的SCN号来完成提取配置。从GoldenGate CLI:

add extract exteshop, integrated tranlog, scn 13624423add exttrail ./dirdat/aa, extract exteshop

现在我们可以启动名为exteshop的GoldenGate提取过程:

start exteshop

你可以使用以下命令中的on来检查进程的状态:

info exteshopview report exteshop

验证提取过程是否正常工作以完成此步骤。从Linux shell运行以下命令,用SQL Plus(或SQL Developer)连接到ESHOP模式:

sqlplus eshop / eshop@ORCL

创建一个模拟客户订单:

INSERT INTO CUSTOMER_ORDER (ID, CODE, CREATED, STATUS, UPDATE_TIME)VALUES (CUSTOMER_ORDER_SEQ.NEXTVAL, 'AAAA01', SYSDATE, 'DRAFT', SYSTIMESTAMP); INSERT INTO CUSTOMER_ORDER_ITEM (ID, ID_CUSTOMER_ORDER, DESCRIPTION, QUANTITY)VALUES (CUSTOMER_ORDER_ITEM_SEQ.NEXTVAL, CUSTOMER_ORDER_SEQ.CURRVAL, 'Toy Story', 1); COMMIT;

最后,从GoldenGate CLI跑出来:

stats exteshop

并验证前面的插入操作是否已计算在内。下面是stats命令输出的一个小示例:

Extracting from ORCL.ESHOP.CUSTOMER_ORDER to ORCL.ESHOP.CUSTOMER_ORDER: *** Total statistics since 2019-05-29 09:18:12 ***Total inserts 1.00Total updates 0.00Total deletes 0.00Total discards 0.00Total operations 1.00

检查提取过程是否正常工作的另一种方法是检查GoldenGate跟踪日志文件的时间戳。在Linux shell中运行“ls -l /u01/ogg/dirdat/”,并验证以“aa”开头的文件的时间戳已经更改。

相关文章
|
4天前
|
消息中间件 缓存 架构师
关于 Kafka 高性能架构,这篇说得最全面,建议收藏!
Kafka 是一个高吞吐量、高性能的消息中间件,关于 Kafka 高性能背后的实现,是大厂面试高频问题。本篇全面详解 Kafka 高性能背后的实现。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
关于 Kafka 高性能架构,这篇说得最全面,建议收藏!
|
8天前
|
存储 Oracle NoSQL
【赵渝强老师】Oracle的体系架构
Oracle数据库的核心在于其体系架构,主要包括数据库与实例、存储结构、进程结构和内存结构。数据库由物理文件组成,实例则是内存和进程的组合。存储结构分为逻辑和物理两部分,进程结构涉及多个后台进程如SMON、PMON、DBWn等,内存结构则包含SGA和PGA。掌握这些知识有助于更好地管理和优化Oracle数据库。
|
8天前
|
消息中间件 存储 负载均衡
【赵渝强老师】Kafka的体系架构
Kafka消息系统是一个分布式系统,包含生产者、消费者、Broker和ZooKeeper。生产者将消息发送到Broker,消费者从Broker中拉取消息并处理。主题按分区存储,每个分区有唯一的偏移量地址,确保消息顺序。Kafka支持负载均衡和容错。视频讲解和术语表进一步帮助理解。
|
1月前
|
消息中间件 NoSQL Kafka
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
61 5
|
1月前
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
61 4
|
3月前
|
消息中间件 负载均衡 Java
揭秘Kafka背后的秘密!Kafka 架构设计大曝光:深入剖析Kafka机制,带你一探究竟!
【8月更文挑战第24天】Apache Kafka是一款专为实时数据处理及流传输设计的高效率消息系统。其核心特性包括高吞吐量、低延迟及出色的可扩展性。Kafka采用分布式日志模型,支持数据分区与副本,确保数据可靠性和持久性。系统由Producer(消息生产者)、Consumer(消息消费者)及Broker(消息服务器)组成。Kafka支持消费者组,实现数据并行处理,提升整体性能。通过内置的故障恢复机制,即使部分节点失效,系统仍能保持稳定运行。提供的Java示例代码展示了如何使用Kafka进行消息的生产和消费,并演示了故障转移处理过程。
52 3
|
3月前
|
消息中间件 负载均衡 Kafka
Kafka 实现负载均衡与故障转移:深入分析 Kafka 的架构特点与实践
【8月更文挑战第24天】Apache Kafka是一款专为实时数据处理和流传输设计的高性能消息系统。其核心设计注重高吞吐量、低延迟与可扩展性,并具备出色的容错能力。Kafka采用分布式日志概念,通过数据分区及副本机制确保数据可靠性和持久性。系统包含Producer(消息生产者)、Consumer(消息消费者)和Broker(消息服务器)三大组件。Kafka利用独特的分区机制实现负载均衡,每个Topic可以被划分为多个分区,每个分区可以被复制到多个Broker上,确保数据的高可用性和可靠性。
70 2
|
3月前
|
消息中间件 负载均衡 Kafka
【解密Kafka背后的秘密!】为什么Kafka不需要读写分离?深入剖析Kafka架构,带你一探究竟!
【8月更文挑战第24天】Apache Kafka是一款专为高效实时数据处理与传输设计的消息系统,凭借其高吞吐量、低延迟及可扩展性在业界享有盛誉。不同于传统数据库常采用的读写分离策略,Kafka通过独特的分布式架构实现了无需读写分离即可满足高并发需求。其核心包括Producer(生产者)、Consumer(消费者)与Broker(代理),并通过分区复制、消费者组以及幂等性生产者等功能确保了系统的高效运行。本文通过分析Kafka的架构特性及其提供的示例代码,阐述了Kafka为何无需借助读写分离机制就能有效处理大量读写操作。
48 2
|
3月前
|
消息中间件 Kafka Java
Spring 框架与 Kafka 联姻,竟引发软件世界的革命风暴!事件驱动架构震撼登场!
【8月更文挑战第31天】《Spring 框架与 Kafka 集成:实现事件驱动架构》介绍如何利用 Spring 框架的强大功能与 Kafka 分布式流平台结合,构建灵活且可扩展的事件驱动系统。通过添加 Spring Kafka 依赖并配置 Kafka 连接信息,可以轻松实现消息的生产和消费。文中详细展示了如何设置 `KafkaTemplate`、`ProducerFactory` 和 `ConsumerFactory`,并通过示例代码说明了生产者发送消息及消费者接收消息的具体实现。这一组合为构建高效可靠的分布式应用程序提供了有力支持。
109 0
|
3月前
|
存储 运维 Oracle
常用的几种Oracle架构介绍
常用的几种Oracle架构介绍
52 0

推荐镜像

更多