【大数据开发运维解决方案】Kylin消费Kafka数据流式构建cube

本文涉及的产品
MSE Nacos 企业版免费试用,1600元额度,限量50份
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 文章开始之前先说明环境情况,这里kylin消费的kafka数据是从Oracle 数据库用Ogg For Bigdata以json格式将数据投递到kafka topic的,投递的时候,关于insert和update 之前的数据投递到名为 ZTVOUCHER_INS 的topic,而delete和update之后的数据投递到名为 ZTVOUCHER_DEL 的topic中,这里主要介绍kylin如何消费数据创建流式cube。

文章开始之前先说明环境情况,这里kylin消费的kafka数据是从Oracle 数据库用Ogg For Bigdata以json格式将数据投递到kafka topic的,投递的时候,关于insert和update 之前的数据投递到名为 ZTVOUCHER_INS 的topic,而delete和update之后的数据投递到名为 ZTVOUCHER_DEL 的topic中,这里主要介绍kylin如何消费数据创建流式cube。

一、源端做DML操作

1.源端表ztvoucher目前没有数据,现在做insert,并查询:


insert into ztvoucher (MANDT, GJAHR, BUKRS, BELNR, BUZEI, MONAT, BUDAT, HKONT, DMBTR, ZZ0014)
values ('666', '2222', '3432', '2200001414', '001', '01', '20190101', '9101000000', 100.00, '101');

1 row created.

SQL> commit;

Commit complete.

SQL> alter system switch logfile;

System altered.
SQL>  select * from ztvoucher;
MANDT    GJAHR    BUKRS    BELNR    BUZEI    MONAT    BUDAT    HKONT    DMBTR    ZZ0014
666    2222    3432    2200001414    001    01    20190101    9101000000    100.00    101
AI 代码解读

2.去kafka查看:

[root@hadoop kafka]# ./console.sh 
input topic:ZTVOUCHER_INS
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]
.{"table":"SCOTT.ZTVOUCHER_INS","op_type":"I","op_ts":"2019-05-22 16:18:58.353767","current_ts":"2019-05-22T16:19:11.352000","pos":"00000000080000012086","tokens":{"TKN-OP-TYPE":"INSERT"},"a
fter":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":100.00,"ZZ0014":"101"}}
AI 代码解读

发现源端做的insert已经在 topic:ZTVOUCHER_INS有了。
3.源端做update操作:

update ztvoucher set dmbtr=50 where mandt='666';
commit;
alter system switch logfile;
AI 代码解读

4.去kafka查看:
先看ZTOVOUCHER_INS 内容:

[root@hadoop kafka]# ./console.sh 
input topic:ZTVOUCHER_INS
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]
.{"table":"SCOTT.ZTVOUCHER_INS","op_type":"I","op_ts":"2019-05-22 16:18:58.353767","current_ts":"2019-05-22T16:19:11.352000","pos":"00000000080000012086","tokens":{"TKN-OP-TYPE":"INSERT"},"a
fter":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":100.00,"ZZ0014":"101"}}{"table":"SCOTT.ZTVOUCHER_INS","op_type":"I","op_ts":"2019-05-22 16:22:48.354189","current_ts":"2019-05-22T16:23:33.799000","pos":"00000000080000012613","tokens":{"TKN-OP-TYPE":"SQL COMPUPD
ATE"},"after":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":50.00,"ZZ0014":"101"}}
AI 代码解读

发现除了之前的insert操作,现在update之后的数据也进来了。
再看ZTVOUCHER_DEL:

[root@hadoop kafka]# ./console.sh 
input topic:ZTVOUCHER_DEL
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]
.{"table":"SCOTT.ZTVOUCHER_DEL","op_type":"I","op_ts":"2019-05-22 16:22:48.354189","current_ts":"2019-05-22T16:23:23.781000","pos":"00000000080000012345","tokens":{"TKN-OP-TYPE":"SQL COMPUPD
ATE"},"after":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":100.00,"ZZ0014":"101"}}
AI 代码解读

发现DEL的topic中也存入了update之前的数据。
5.源端做delete操作:

delete from ztvoucher where mandt='666';
commit;
alter system switch logfile;
AI 代码解读

6.去kafka查看:

[root@hadoop kafka]# ./console.sh 
input topic:ZTVOUCHER_DEL
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]
.{"table":"SCOTT.ZTVOUCHER_DEL","op_type":"I","op_ts":"2019-05-22 16:22:48.354189","current_ts":"2019-05-22T16:23:23.781000","pos":"00000000080000012345","tokens":{"TKN-OP-TYPE":"SQL COMPUPD
ATE"},"after":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":100.00,"ZZ0014":"101"}}{"table":"SCOTT.ZTVOUCHER_DEL","op_type":"I","op_ts":"2019-05-22 16:26:26.353705","current_ts":"2019-05-22T16:27:15.049000","pos":"00000000080000012857","tokens":{"TKN-OP-TYPE":"DELETE"},"a
fter":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":50.00,"ZZ0014":"101"}}
AI 代码解读

发现除了上面update之前的数据以外,还写入了刚做的delete操作的数据。
好了,现在数据都组织好了,现在去流式创建cube。

二、流式构建cube

流式构建cube官方连接(本人用的2.4版本):
http://kylin.apache.org/cn/docs24/tutorial/cube_streaming.html
流式构建cube需要一个类型为timestamp的时间列字段用来标识消息的时间,从前面两个topic中的json数据可以看到,op_ts字段满足这个要求。
1、用j'son数据定义一张表
先来构建

目录
打赏
0
0
0
0
137
分享
相关文章
大数据新视界 --大数据大厂之Hive与大数据融合:构建强大数据仓库实战指南
本文深入介绍 Hive 与大数据融合构建强大数据仓库的实战指南。涵盖 Hive 简介、优势、安装配置、数据处理、性能优化及安全管理等内容,并通过互联网广告和物流行业案例分析,展示其实际应用。具有专业性、可操作性和参考价值。
大数据新视界 --大数据大厂之Hive与大数据融合:构建强大数据仓库实战指南
智能运维,由你定义:SAE自定义日志与监控解决方案
通过引入 Sidecar 容器的技术,SAE 为用户提供了更强大的自定义日志与监控解决方案,帮助用户轻松实现日志采集、监控指标收集等功能。未来,SAE 将会支持 istio 多租场景,帮助用户更高效地部署和管理服务网格。
324 51
大数据新视界--大数据大厂之MySQL数据库课程设计:MySQL集群架构负载均衡故障排除与解决方案
本文深入探讨 MySQL 集群架构负载均衡的常见故障及排除方法。涵盖请求分配不均、节点无法响应、负载均衡器故障等现象,介绍多种负载均衡算法及故障排除步骤,包括检查负载均衡器状态、调整算法、诊断修复节点故障等。还阐述了预防措施与确保系统稳定性的方法,如定期监控维护、备份恢复策略、团队协作与知识管理等。为确保 MySQL 数据库系统高可用性提供全面指导。
指挥学校大数据系统解决方案
本系统集成九大核心平台,包括中心化指挥、数据处理、学生信息、反校园欺凌大数据、智慧课堂、学生行为综合、数据交换及其他外部系统云平台。通过这些平台,系统实现对学生行为、课堂表现、校园安全等多维度的实时监控与数据分析,为教育管理、执法机关、心理辅导等提供强有力的数据支持。特别地,反校园欺凌平台利用多种传感器和智能设备,确保及时发现并处理校园霸凌事件,保障学生权益。同时,系统还涵盖超市、食堂、图书馆、消防安全等辅助云平台,全面提升校园智能化管理水平。
美的楼宇科技基于阿里云 EMR Serverless Spark 构建 LakeHouse 湖仓数据平台
美的楼宇科技基于阿里云 EMR Serverless Spark 建设 IoT 数据平台,实现了数据与 AI 技术的有效融合,解决了美的楼宇科技设备数据量庞大且持续增长、数据半结构化、数据价值缺乏深度挖掘的痛点问题。并结合 EMR Serverless StarRocks 搭建了 Lakehouse 平台,最终实现不同场景下整体性能提升50%以上,同时综合成本下降30%。
455 58
高科技生命体征探测器、情绪感受器以及传感器背后的大数据平台在健康监测、生命体征检测领域的设想与系统构建
本系统由健康传感器、大数据云平台和脑机接口设备组成。传感器内置生命体征感应器、全球无线定位、人脸识别摄像头等,搜集超出现有科学认知的生命体征信息。云平台整合大数据、云计算与AI,处理并传输数据至接收者大脑芯片,实现实时健康监测。脑机接口设备通过先进通讯技术,实现对健康信息的实时感知与反馈,确保身份验证与数据安全。
idc机房智能运维解决方案
华汇数据中心一体化智能运维方案应运而生,以“自主可控、精准洞察、智能决策”三大核心能力,助力企业实现运维效率提升与综合成本下降的数字化转型目标。
249 24
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
489 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问