【大数据开发运维解决方案】Kylin消费Kafka数据流式构建cube

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 文章开始之前先说明环境情况,这里kylin消费的kafka数据是从Oracle 数据库用Ogg For Bigdata以json格式将数据投递到kafka topic的,投递的时候,关于insert和update 之前的数据投递到名为 ZTVOUCHER_INS 的topic,而delete和update之后的数据投递到名为 ZTVOUCHER_DEL 的topic中,这里主要介绍kylin如何消费数据创建流式cube。

文章开始之前先说明环境情况,这里kylin消费的kafka数据是从Oracle 数据库用Ogg For Bigdata以json格式将数据投递到kafka topic的,投递的时候,关于insert和update 之前的数据投递到名为 ZTVOUCHER_INS 的topic,而delete和update之后的数据投递到名为 ZTVOUCHER_DEL 的topic中,这里主要介绍kylin如何消费数据创建流式cube。

一、源端做DML操作

1.源端表ztvoucher目前没有数据,现在做insert,并查询:


insert into ztvoucher (MANDT, GJAHR, BUKRS, BELNR, BUZEI, MONAT, BUDAT, HKONT, DMBTR, ZZ0014)
values ('666', '2222', '3432', '2200001414', '001', '01', '20190101', '9101000000', 100.00, '101');

1 row created.

SQL> commit;

Commit complete.

SQL> alter system switch logfile;

System altered.
SQL>  select * from ztvoucher;
MANDT    GJAHR    BUKRS    BELNR    BUZEI    MONAT    BUDAT    HKONT    DMBTR    ZZ0014
666    2222    3432    2200001414    001    01    20190101    9101000000    100.00    101

2.去kafka查看:

[root@hadoop kafka]# ./console.sh 
input topic:ZTVOUCHER_INS
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]
.{"table":"SCOTT.ZTVOUCHER_INS","op_type":"I","op_ts":"2019-05-22 16:18:58.353767","current_ts":"2019-05-22T16:19:11.352000","pos":"00000000080000012086","tokens":{"TKN-OP-TYPE":"INSERT"},"a
fter":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":100.00,"ZZ0014":"101"}}

发现源端做的insert已经在 topic:ZTVOUCHER_INS有了。
3.源端做update操作:

update ztvoucher set dmbtr=50 where mandt='666';
commit;
alter system switch logfile;

4.去kafka查看:
先看ZTOVOUCHER_INS 内容:

[root@hadoop kafka]# ./console.sh 
input topic:ZTVOUCHER_INS
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]
.{"table":"SCOTT.ZTVOUCHER_INS","op_type":"I","op_ts":"2019-05-22 16:18:58.353767","current_ts":"2019-05-22T16:19:11.352000","pos":"00000000080000012086","tokens":{"TKN-OP-TYPE":"INSERT"},"a
fter":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":100.00,"ZZ0014":"101"}}{"table":"SCOTT.ZTVOUCHER_INS","op_type":"I","op_ts":"2019-05-22 16:22:48.354189","current_ts":"2019-05-22T16:23:33.799000","pos":"00000000080000012613","tokens":{"TKN-OP-TYPE":"SQL COMPUPD
ATE"},"after":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":50.00,"ZZ0014":"101"}}

发现除了之前的insert操作,现在update之后的数据也进来了。
再看ZTVOUCHER_DEL:

[root@hadoop kafka]# ./console.sh 
input topic:ZTVOUCHER_DEL
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]
.{"table":"SCOTT.ZTVOUCHER_DEL","op_type":"I","op_ts":"2019-05-22 16:22:48.354189","current_ts":"2019-05-22T16:23:23.781000","pos":"00000000080000012345","tokens":{"TKN-OP-TYPE":"SQL COMPUPD
ATE"},"after":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":100.00,"ZZ0014":"101"}}

发现DEL的topic中也存入了update之前的数据。
5.源端做delete操作:

delete from ztvoucher where mandt='666';
commit;
alter system switch logfile;

6.去kafka查看:

[root@hadoop kafka]# ./console.sh 
input topic:ZTVOUCHER_DEL
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]
.{"table":"SCOTT.ZTVOUCHER_DEL","op_type":"I","op_ts":"2019-05-22 16:22:48.354189","current_ts":"2019-05-22T16:23:23.781000","pos":"00000000080000012345","tokens":{"TKN-OP-TYPE":"SQL COMPUPD
ATE"},"after":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":100.00,"ZZ0014":"101"}}{"table":"SCOTT.ZTVOUCHER_DEL","op_type":"I","op_ts":"2019-05-22 16:26:26.353705","current_ts":"2019-05-22T16:27:15.049000","pos":"00000000080000012857","tokens":{"TKN-OP-TYPE":"DELETE"},"a
fter":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":50.00,"ZZ0014":"101"}}

发现除了上面update之前的数据以外,还写入了刚做的delete操作的数据。
好了,现在数据都组织好了,现在去流式创建cube。

二、流式构建cube

流式构建cube官方连接(本人用的2.4版本):
http://kylin.apache.org/cn/docs24/tutorial/cube_streaming.html
流式构建cube需要一个类型为timestamp的时间列字段用来标识消息的时间,从前面两个topic中的json数据可以看到,op_ts字段满足这个要求。
1、用j'son数据定义一张表
先来构建

相关文章
|
2天前
|
消息中间件 运维 Kubernetes
构建高效自动化运维体系:Ansible与Kubernetes的融合实践
【5月更文挑战第9天】随着云计算和微服务架构的普及,自动化运维成为确保系统可靠性和效率的关键。本文将深入探讨如何通过Ansible和Kubernetes的集成,构建一个强大的自动化运维体系。我们将分析Ansible的配置管理功能以及Kubernetes容器编排的优势,并展示如何将二者结合,以实现持续部署、快速扩展和高效管理现代云原生应用。文章还将涵盖实际案例,帮助读者理解在真实环境下如何利用这些工具优化运维流程。
|
2天前
|
运维 Kubernetes Cloud Native
构建高效云原生运维体系:Kubernetes最佳实践
【5月更文挑战第9天】 在动态和快速演变的云计算环境中,高效的运维是确保应用稳定性与性能的关键。本文将深入探讨在Kubernetes环境下,如何通过一系列最佳实践来构建一个高效且响应灵敏的云原生运维体系。文章不仅涵盖了容器化技术的选择与优化、自动化部署、持续集成/持续交付(CI/CD)流程的整合,还讨论了监控、日志管理以及灾难恢复策略的重要性。这些实践旨在帮助运维团队有效应对微服务架构下的复杂性,确保系统可靠性及业务的连续性。
|
3天前
|
运维 Kubernetes 监控
构建高效自动化运维体系:基于Ansible的策略与实践
【5月更文挑战第8天】 在当今IT基础设施管理领域,自动化不再是一个选择,而是必要的步骤。随着复杂性的增加和变更的频繁性,自动化工具如Ansible提供了一种高效、可靠的解决方案来简化配置管理和多节点部署。本文将探讨如何利用Ansible构建一个高效的自动化运维体系,涵盖其核心原理、策略设计以及在实际环境中的应用。我们将分析Ansible与其他自动化工具的不同之处,并提供一些最佳实践,以帮助运维专家提升他们的工作效率和系统稳定性。
|
4天前
|
运维 负载均衡 持续交付
构建高效自动化运维体系:Ansible与Docker的协同实践
【5月更文挑战第7天】 在当今快速迭代的软件开发环境中,自动化运维成为确保部署效率和一致性的关键。本文将探讨如何通过结合Ansible和Docker技术,构建一个高效的自动化运维体系,旨在提升运维效率,减少人为错误,并实现持续集成与持续部署(CI/CD)的流程自动化。文章详细阐述了Ansible的配置管理机制、Docker容器化的优势,以及二者在实际运维场景中的结合应用,为读者提供一套可行的自动化运维解决方案。
|
5天前
|
运维 Kubernetes Devops
构建高效自动化运维体系:DevOps与容器化技术融合实践
【5月更文挑战第6天】随着企业IT架构的复杂化以及快速迭代的市场需求,传统的运维模式已难以满足高效率和高质量的交付标准。本文将探讨如何通过结合DevOps理念和容器化技术来构建一个高效的自动化运维体系,旨在实现持续集成、持续部署和自动化管理,提升系统的可靠性、可维护性和敏捷性。
|
9天前
|
存储 运维 Kubernetes
构建高效自动化运维体系:Ansible与Kubernetes的协同实践
【5月更文挑战第2天】随着云计算和微服务架构的兴起,自动化运维成为保障系统稳定性与效率的关键。本文将深入探讨如何利用Ansible作为配置管理工具,结合Kubernetes容器编排能力,共同打造一个高效、可靠的自动化运维体系。通过剖析二者的整合策略及具体操作步骤,为读者提供一套提升运维效率、降低人为错误的实用解决方案。
|
10天前
|
机器学习/深度学习 运维 持续交付
构建高效自动化运维体系:Ansible与Docker的完美结合构建高效机器学习模型的五大技巧
【4月更文挑战第30天】 在当今快速发展的云计算和微服务架构时代,自动化运维已成为维持系统稳定性和提高效率的关键。本文将探讨如何通过结合Ansible和Docker技术构建一个高效的自动化运维体系。文章不仅介绍了Ansible与Docker的基本原理和优势,还详细阐述了如何整合这两种技术以简化部署流程、加强版本控制,并提高整体运维效率。通过案例分析,我们将展示这一组合在实际环境中的应用效果,以及它如何帮助企业实现持续集成和持续部署(CI/CD)的目标。 【4月更文挑战第30天】 在数据驱动的时代,构建一个高效的机器学习模型是获取洞察力和预测未来趋势的关键步骤。本文将分享五种实用的技巧,帮助数
|
10天前
|
敏捷开发 运维 测试技术
构建高效自动化运维体系:基于容器技术的持续集成与持续部署实践
【4月更文挑战第30天】在数字化转型的浪潮中,企业对软件交付速度和质量的要求日益提高。自动化运维作为提升效率、确保稳定性的关键手段,其重要性不言而喻。本文将探讨如何利用容器技术构建一个高效的自动化运维体系,实现从代码提交到产品上线的持续集成(CI)与持续部署(CD)。通过分析现代容器技术与传统虚拟化的差异,阐述容器化带来的轻量化、快速部署及易于管理的优势,并结合实例讲解如何在实际环境中搭建起一套完善的CI/CD流程。
|
10天前
|
运维 Cloud Native 持续交付
构建高效弹性的云原生运维体系
【4月更文挑战第30天】 随着云计算的广泛应用和微服务架构的普及,传统的运维模式已难以满足快速迭代和高可用性的需求。本文旨在探讨如何构建一个高效而弹性的云原生运维体系,以应对动态变化的服务需求。通过引入自动化工具、容器化技术、微服务治理及持续集成/持续部署(CI/CD)流程等现代运维实践,实现系统的稳定性与敏捷性兼备。文中不仅阐述了相关技术要点,还提供了具体的实施步骤和策略,为运维人员在转型过程中提供参考。
|
10天前
|
人工智能 运维 自然语言处理
构建高效自动化运维体系:DevOps与AI的融合之路
【4月更文挑战第30天】在数字化转型的大潮中,企业IT基础设施的复杂性日益增加,传统的运维模式已难以满足快速变化的业务需求。本文深入探讨了如何通过融合DevOps和人工智能(AI)技术构建一个高效、自动化的运维体系。文章首先概述了现代运维面临的挑战,接着分析了DevOps的核心理念以及AI如何在故障预测、智能决策支持等方面提升运维效率。最后,本文提出了一个具体的实施框架,并讨论了在推进过程中可能遇到的挑战及应对策略。