【大数据开发运维解决方案】Kylin消费Kafka数据流式构建cube

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 文章开始之前先说明环境情况,这里kylin消费的kafka数据是从Oracle 数据库用Ogg For Bigdata以json格式将数据投递到kafka topic的,投递的时候,关于insert和update 之前的数据投递到名为 ZTVOUCHER_INS 的topic,而delete和update之后的数据投递到名为 ZTVOUCHER_DEL 的topic中,这里主要介绍kylin如何消费数据创建流式cube。

文章开始之前先说明环境情况,这里kylin消费的kafka数据是从Oracle 数据库用Ogg For Bigdata以json格式将数据投递到kafka topic的,投递的时候,关于insert和update 之前的数据投递到名为 ZTVOUCHER_INS 的topic,而delete和update之后的数据投递到名为 ZTVOUCHER_DEL 的topic中,这里主要介绍kylin如何消费数据创建流式cube。

一、源端做DML操作

1.源端表ztvoucher目前没有数据,现在做insert,并查询:


insert into ztvoucher (MANDT, GJAHR, BUKRS, BELNR, BUZEI, MONAT, BUDAT, HKONT, DMBTR, ZZ0014)
values ('666', '2222', '3432', '2200001414', '001', '01', '20190101', '9101000000', 100.00, '101');

1 row created.

SQL> commit;

Commit complete.

SQL> alter system switch logfile;

System altered.
SQL>  select * from ztvoucher;
MANDT    GJAHR    BUKRS    BELNR    BUZEI    MONAT    BUDAT    HKONT    DMBTR    ZZ0014
666    2222    3432    2200001414    001    01    20190101    9101000000    100.00    101

2.去kafka查看:

[root@hadoop kafka]# ./console.sh 
input topic:ZTVOUCHER_INS
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]
.{"table":"SCOTT.ZTVOUCHER_INS","op_type":"I","op_ts":"2019-05-22 16:18:58.353767","current_ts":"2019-05-22T16:19:11.352000","pos":"00000000080000012086","tokens":{"TKN-OP-TYPE":"INSERT"},"a
fter":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":100.00,"ZZ0014":"101"}}

发现源端做的insert已经在 topic:ZTVOUCHER_INS有了。
3.源端做update操作:

update ztvoucher set dmbtr=50 where mandt='666';
commit;
alter system switch logfile;

4.去kafka查看:
先看ZTOVOUCHER_INS 内容:

[root@hadoop kafka]# ./console.sh 
input topic:ZTVOUCHER_INS
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]
.{"table":"SCOTT.ZTVOUCHER_INS","op_type":"I","op_ts":"2019-05-22 16:18:58.353767","current_ts":"2019-05-22T16:19:11.352000","pos":"00000000080000012086","tokens":{"TKN-OP-TYPE":"INSERT"},"a
fter":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":100.00,"ZZ0014":"101"}}{"table":"SCOTT.ZTVOUCHER_INS","op_type":"I","op_ts":"2019-05-22 16:22:48.354189","current_ts":"2019-05-22T16:23:33.799000","pos":"00000000080000012613","tokens":{"TKN-OP-TYPE":"SQL COMPUPD
ATE"},"after":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":50.00,"ZZ0014":"101"}}

发现除了之前的insert操作,现在update之后的数据也进来了。
再看ZTVOUCHER_DEL:

[root@hadoop kafka]# ./console.sh 
input topic:ZTVOUCHER_DEL
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]
.{"table":"SCOTT.ZTVOUCHER_DEL","op_type":"I","op_ts":"2019-05-22 16:22:48.354189","current_ts":"2019-05-22T16:23:23.781000","pos":"00000000080000012345","tokens":{"TKN-OP-TYPE":"SQL COMPUPD
ATE"},"after":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":100.00,"ZZ0014":"101"}}

发现DEL的topic中也存入了update之前的数据。
5.源端做delete操作:

delete from ztvoucher where mandt='666';
commit;
alter system switch logfile;

6.去kafka查看:

[root@hadoop kafka]# ./console.sh 
input topic:ZTVOUCHER_DEL
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]
.{"table":"SCOTT.ZTVOUCHER_DEL","op_type":"I","op_ts":"2019-05-22 16:22:48.354189","current_ts":"2019-05-22T16:23:23.781000","pos":"00000000080000012345","tokens":{"TKN-OP-TYPE":"SQL COMPUPD
ATE"},"after":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":100.00,"ZZ0014":"101"}}{"table":"SCOTT.ZTVOUCHER_DEL","op_type":"I","op_ts":"2019-05-22 16:26:26.353705","current_ts":"2019-05-22T16:27:15.049000","pos":"00000000080000012857","tokens":{"TKN-OP-TYPE":"DELETE"},"a
fter":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":50.00,"ZZ0014":"101"}}

发现除了上面update之前的数据以外,还写入了刚做的delete操作的数据。
好了,现在数据都组织好了,现在去流式创建cube。

二、流式构建cube

流式构建cube官方连接(本人用的2.4版本):
http://kylin.apache.org/cn/docs24/tutorial/cube_streaming.html
流式构建cube需要一个类型为timestamp的时间列字段用来标识消息的时间,从前面两个topic中的json数据可以看到,op_ts字段满足这个要求。
1、用j'son数据定义一张表
先来构建

相关文章
|
2月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
167 0
|
19天前
|
运维 监控 关系型数据库
数据库管理中的自动化运维:挑战与解决方案
数据库管理中的自动化运维:挑战与解决方案
|
22天前
|
存储 运维 安全
Spring运维之boot项目多环境(yaml 多文件 proerties)及分组管理与开发控制
通过以上措施,可以保证Spring Boot项目的配置管理在专业水准上,并且易于维护和管理,符合搜索引擎收录标准。
36 2
|
1月前
|
运维 Serverless 数据处理
Serverless架构通过提供更快的研发交付速度、降低成本、简化运维、优化资源利用、提供自动扩展能力、支持实时数据处理和快速原型开发等优势,为图像处理等计算密集型应用提供了一个高效、灵活且成本效益高的解决方案。
Serverless架构通过提供更快的研发交付速度、降低成本、简化运维、优化资源利用、提供自动扩展能力、支持实时数据处理和快速原型开发等优势,为图像处理等计算密集型应用提供了一个高效、灵活且成本效益高的解决方案。
81 1
|
2月前
|
运维 Serverless 数据处理
Serverless架构通过提供更快的研发交付速度、降低成本、简化运维、优化资源利用、提供自动扩展能力、支持实时数据处理和快速原型开发等优势,为图像处理等计算密集型应用提供了一个高效、灵活且成本效益高的解决方案。
Serverless架构通过提供更快的研发交付速度、降低成本、简化运维、优化资源利用、提供自动扩展能力、支持实时数据处理和快速原型开发等优势,为图像处理等计算密集型应用提供了一个高效、灵活且成本效益高的解决方案。
61 3
|
2月前
|
运维 Java Linux
【运维基础知识】掌握VI编辑器:提升你的Java开发效率
本文详细介绍了VI编辑器的常用命令,包括模式切换、文本编辑、搜索替换及退出操作,帮助Java开发者提高在Linux环境下的编码效率。掌握这些命令,将使你在开发过程中更加得心应手。
38 2
|
2月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
44 3
|
2月前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
33 2
|
2月前
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
56 1
|
2月前
|
存储 运维 监控
实时计算Flink版在稳定性、性能、开发运维、安全能力等等跟其他引擎及自建Flink集群比较。
实时计算Flink版在稳定性、性能、开发运维和安全能力等方面表现出色。其自研的高性能状态存储引擎GeminiStateBackend显著提升了作业稳定性,状态管理优化使性能提升40%以上。核心性能较开源Flink提升2-3倍,资源利用率提高100%。提供一站式开发管理、自动化运维和丰富的监控告警功能,支持多语言开发和智能调优。安全方面,具备访问控制、高可用保障和全链路容错能力,确保企业级应用的安全与稳定。
48 0