【大数据开发运维解决方案】Kylin消费Kafka数据流式构建cube

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介: 文章开始之前先说明环境情况,这里kylin消费的kafka数据是从Oracle 数据库用Ogg For Bigdata以json格式将数据投递到kafka topic的,投递的时候,关于insert和update 之前的数据投递到名为 ZTVOUCHER_INS 的topic,而delete和update之后的数据投递到名为 ZTVOUCHER_DEL 的topic中,这里主要介绍kylin如何消费数据创建流式cube。

文章开始之前先说明环境情况,这里kylin消费的kafka数据是从Oracle 数据库用Ogg For Bigdata以json格式将数据投递到kafka topic的,投递的时候,关于insert和update 之前的数据投递到名为 ZTVOUCHER_INS 的topic,而delete和update之后的数据投递到名为 ZTVOUCHER_DEL 的topic中,这里主要介绍kylin如何消费数据创建流式cube。

一、源端做DML操作

1.源端表ztvoucher目前没有数据,现在做insert,并查询:


insert into ztvoucher (MANDT, GJAHR, BUKRS, BELNR, BUZEI, MONAT, BUDAT, HKONT, DMBTR, ZZ0014)
values ('666', '2222', '3432', '2200001414', '001', '01', '20190101', '9101000000', 100.00, '101');

1 row created.

SQL> commit;

Commit complete.

SQL> alter system switch logfile;

System altered.
SQL>  select * from ztvoucher;
MANDT    GJAHR    BUKRS    BELNR    BUZEI    MONAT    BUDAT    HKONT    DMBTR    ZZ0014
666    2222    3432    2200001414    001    01    20190101    9101000000    100.00    101

2.去kafka查看:

[root@hadoop kafka]# ./console.sh 
input topic:ZTVOUCHER_INS
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]
.{"table":"SCOTT.ZTVOUCHER_INS","op_type":"I","op_ts":"2019-05-22 16:18:58.353767","current_ts":"2019-05-22T16:19:11.352000","pos":"00000000080000012086","tokens":{"TKN-OP-TYPE":"INSERT"},"a
fter":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":100.00,"ZZ0014":"101"}}

发现源端做的insert已经在 topic:ZTVOUCHER_INS有了。
3.源端做update操作:

update ztvoucher set dmbtr=50 where mandt='666';
commit;
alter system switch logfile;

4.去kafka查看:
先看ZTOVOUCHER_INS 内容:

[root@hadoop kafka]# ./console.sh 
input topic:ZTVOUCHER_INS
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]
.{"table":"SCOTT.ZTVOUCHER_INS","op_type":"I","op_ts":"2019-05-22 16:18:58.353767","current_ts":"2019-05-22T16:19:11.352000","pos":"00000000080000012086","tokens":{"TKN-OP-TYPE":"INSERT"},"a
fter":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":100.00,"ZZ0014":"101"}}{"table":"SCOTT.ZTVOUCHER_INS","op_type":"I","op_ts":"2019-05-22 16:22:48.354189","current_ts":"2019-05-22T16:23:33.799000","pos":"00000000080000012613","tokens":{"TKN-OP-TYPE":"SQL COMPUPD
ATE"},"after":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":50.00,"ZZ0014":"101"}}

发现除了之前的insert操作,现在update之后的数据也进来了。
再看ZTVOUCHER_DEL:

[root@hadoop kafka]# ./console.sh 
input topic:ZTVOUCHER_DEL
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]
.{"table":"SCOTT.ZTVOUCHER_DEL","op_type":"I","op_ts":"2019-05-22 16:22:48.354189","current_ts":"2019-05-22T16:23:23.781000","pos":"00000000080000012345","tokens":{"TKN-OP-TYPE":"SQL COMPUPD
ATE"},"after":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":100.00,"ZZ0014":"101"}}

发现DEL的topic中也存入了update之前的数据。
5.源端做delete操作:

delete from ztvoucher where mandt='666';
commit;
alter system switch logfile;

6.去kafka查看:

[root@hadoop kafka]# ./console.sh 
input topic:ZTVOUCHER_DEL
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]
.{"table":"SCOTT.ZTVOUCHER_DEL","op_type":"I","op_ts":"2019-05-22 16:22:48.354189","current_ts":"2019-05-22T16:23:23.781000","pos":"00000000080000012345","tokens":{"TKN-OP-TYPE":"SQL COMPUPD
ATE"},"after":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":100.00,"ZZ0014":"101"}}{"table":"SCOTT.ZTVOUCHER_DEL","op_type":"I","op_ts":"2019-05-22 16:26:26.353705","current_ts":"2019-05-22T16:27:15.049000","pos":"00000000080000012857","tokens":{"TKN-OP-TYPE":"DELETE"},"a
fter":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":50.00,"ZZ0014":"101"}}

发现除了上面update之前的数据以外,还写入了刚做的delete操作的数据。
好了,现在数据都组织好了,现在去流式创建cube。

二、流式构建cube

流式构建cube官方连接(本人用的2.4版本):
http://kylin.apache.org/cn/docs24/tutorial/cube_streaming.html
流式构建cube需要一个类型为timestamp的时间列字段用来标识消息的时间,从前面两个topic中的json数据可以看到,op_ts字段满足这个要求。
1、用j'son数据定义一张表
先来构建

相关文章
|
27天前
|
SQL 数据可视化 大数据
从数据小白到大数据达人:一步步成为数据分析专家
从数据小白到大数据达人:一步步成为数据分析专家
210 92
|
1月前
|
分布式计算 Shell MaxCompute
odps测试表及大量数据构建测试
odps测试表及大量数据构建测试
|
25天前
|
存储 搜索推荐 大数据
数据大爆炸:解析大数据的起源及其对未来的启示
数据大爆炸:解析大数据的起源及其对未来的启示
89 15
数据大爆炸:解析大数据的起源及其对未来的启示
|
17天前
|
分布式计算 大数据 流计算
玩转数据:初学者的大数据处理工具指南
玩转数据:初学者的大数据处理工具指南
70 14
|
20天前
|
数据采集 存储 机器学习/深度学习
数据的秘密:如何用大数据分析挖掘商业价值
数据的秘密:如何用大数据分析挖掘商业价值
46 9
|
30天前
|
数据采集 存储 分布式计算
解密大数据:从零开始了解数据海洋
解密大数据:从零开始了解数据海洋
66 17
|
1月前
|
运维 监控 Cloud Native
构建深度可观测、可集成的网络智能运维平台
本文介绍了构建深度可观测、可集成的网络智能运维平台(简称NIS),旨在解决云上网络运维面临的复杂挑战。内容涵盖云网络运维的三大难题、打造云原生AIOps工具集的解决思路、可观测性对业务稳定的重要性,以及产品发布的亮点,包括流量分析NPM、网络架构巡检和自动化运维OpenAPI,助力客户实现自助运维与优化。
|
1月前
|
数据采集 机器学习/深度学习 DataWorks
DataWorks产品评测:大数据开发治理的深度体验
DataWorks产品评测:大数据开发治理的深度体验
113 1
|
2月前
|
数据采集 机器学习/深度学习 人工智能
基于AI的网络流量分析:构建智能化运维体系
基于AI的网络流量分析:构建智能化运维体系
204 13
|
2月前
|
Prometheus 运维 监控
Prometheus+Grafana+NodeExporter:构建出色的Linux监控解决方案,让你的运维更轻松
本文介绍如何使用 Prometheus + Grafana + Node Exporter 搭建 Linux 主机监控系统。Prometheus 负责收集和存储指标数据,Grafana 用于可视化展示,Node Exporter 则采集主机的性能数据。通过 Docker 容器化部署,简化安装配置过程。完成安装后,配置 Prometheus 抓取节点数据,并在 Grafana 中添加数据源及导入仪表盘模板,实现对 Linux 主机的全面监控。整个过程简单易行,帮助运维人员轻松掌握系统状态。
287 3