基于Apache doris怎么构建数据中台(四)-数据接入系统

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
实时计算 Flink 版,5000CU*H 3个月
阿里云百炼推荐规格 ADB PostgreSQL,4核16GB 100GB 1个月
简介: 在开发数据模型时,我们必须有一个统一的平台,能够像流水线一样,把数据一步步加工成数据模型。这其中涉及到数据萃取、数据聚合、作业调度等。

基于Apache doris怎么构建数据中台(四)-数据接入系统


上一次我们讲解了数据资产,元数据管理,血缘关系等,这次我们开始将数据接入,怎么实现快速的数据接入


在开发数据模型时,我们必须有一个统一的平台,能够像流水线一样,把数据一步步加工成数据模型。这其中涉及到数据萃取、数据聚合、作业调度等。


主要是为了实现业务数据的快速接入,零代码实现,数据分析人员只需要通过UI进行简单的配置、提交任务即可完成数据的接入,并能实现对数据接入任务的管理及监控。


image.png

Mysql数据源数据接入


image.png


主要是为了完成针对Mysql数据的业务系统数据接入零代码实现,不需要开发人员接入,提供给数据分析人员使用,目的是为了业务数据快速接入,无需编码


数据接入系统我们通过自研的规则引擎,和接入任务整合,同时自动化完成了数据接入的ETL工作,规则可以通过页面进行可视化配置,这块我会在后面的质量模块介绍


  1. 通过UI界面添加数据接入任务的方式


  1. Mysql数据的采集是通过Canal 采集binlog完成


  1. 在界面上第一步是配置canal实例(canal实例的管理是通过cannal admin),除了kafka topic名称需要手工输入,其他信息尽可能不要让使用人员手工数据


  1. 第二步配置Flink Job任务信息,需要的kafka topic名称来源于上一步,业务表和数仓表的对应关系通过元数据选择方式完成,避免手工输入


  1. 第三步:提交任务,这时候完成canal实例创建,运行,Flink job任务提交运行


  1. 并在列表上监控canal实例及Flink job运行状态的监控


DataX 数据接入


要实现的内容基本和Mysql binlog的同步一样


只不过是Datax是为了实现非mysql数据的数据接入零代码完成


数据API方式数据接入


传统数据API方式的数据接入都需要进行代码开发对接才能完成,初步设想这块通过通用的代码生成器的方式实现针对常用API方式(WebService,RestFul API)零代码接入


图形化数据接入


我们目前支持Kafka,Mysql,datax数据零代码接入


Mysql :通过Canal采集业务数据库的binlog日志,将数据推送到指定的Kafka队列

其他DB:通过datax(全量和增量)定时的将数据,推送到指定的Kafka队列,这里我们对Datax做了改造,让Datax的数据格式和Canal格式一致,


数据接口:后端数据接收服务队数据进行转换(可配置)以后,形成和Canal一致的数据格式,推送到指定的Kafka队列


  1. 后端针对业务db,我们会通过元数据采集系统采集业务系统库表及字段的元数据信息,进入到元数据管理系


  1. 针对没有数据库表的,通过接口直接推送到Kafka的数据,我们在元数据管理里统一提供虚拟库表,通过这个完成数据的统一接入


  1. 在数据接入的时候,我们整合了我们自研的规则引擎,可以实现数据接入和ETL规则自动绑定,通过阿波罗配置系统进行统一下推到Flink Job里执行,


  1. 对于异常数据(不符合规则的),自动推送到指定的Kafka队列,后端有对应的服务进行处理,我们这里是通过Flink实现了一个轻量级的ETL,及数据入Doris的自动化工具


效果如下:

第一步选择要接入的数据表

image.png


第二步选择数据仓相关的信息,这一步会进行表字段映射检查及配置,目前要首先在数仓中建立相应的表,后续会自动化建表

image.png


第三步就是输入Flink Job名称进行提交了,整个就完成了


数据开发控制台


  1. 提供一个数据类似于HUE的SQL数据开发控制台,数据开发人员可以通过这个控制台进行sql的开发调试


  1. 生产环境这里delete,drop等操作要进行审批确认,才能进行,避免误操作,删除数据


  1. 可以将调试好的sql,添加到定时任务调度系统中,这里我们将海豚调度集成到我们数据中台中


零代码入仓的问题解答


很多朋友问到,我们这种方式会不会数据丢失,会不会数据重复,结合我们自己的场景,给我我们的解决方案


数据入到Doris数据仓库对应的表中,这里我们采用的是Flink实时消费KafKa的数据,然后通过Doris的 Stream Load完成


Flink消费Kafka数据我们支持两种方式:


  1. 指定Kafka Topic的Offset进行消费:kafka.offset


  1. 指定时间戳的方式:kafka.timestamp


数据丢失的问题


针对Flink Job失败,重启也是通过这两个参数,


  1. 如果你记录了失败的时间点的Kafka Offset,可以通过配置文件配置这个参数来重启Flink Job就行。这样不会造成数据丢失


  1. 如果没有记录这个offset,可通过指定


consumer.setStartFromTimestamp(timestamp);这个时间就是在配置文件中配置的时间戳 ,这样无论是通过offset还是从指定的时间开始消费Kafka数据,都不会造成数据丢失


数据重复问题


因为我们这个是在数据接入层使用的,数据是进入到数据仓ODS层,在这一层我们采用的是Doris Unique Key模型,就算数据重复入库,会覆盖原先的数据,不会存在数据重复问


下一讲开始讲解数据质量管理




相关实践学习
阿里云百炼xAnalyticDB PostgreSQL构建AIGC应用
通过该实验体验在阿里云百炼中构建企业专属知识库构建及应用全流程。同时体验使用ADB-PG向量检索引擎提供专属安全存储,保障企业数据隐私安全。
AnalyticDB PostgreSQL 企业智能数据中台:一站式管理数据服务资产
企业在数据仓库之上可构建丰富的数据服务用以支持数据应用及业务场景;ADB PG推出全新企业智能数据平台,用以帮助用户一站式的管理企业数据服务资产,包括创建, 管理,探索, 监控等; 助力企业在现有平台之上快速构建起数据服务资产体系
目录
相关文章
|
3月前
|
前端开发 机器人 API
前端大模型入门(一):用 js+langchain 构建基于 LLM 的应用
本文介绍了大语言模型(LLM)的HTTP API流式调用机制及其在前端的实现方法。通过流式调用,服务器可以逐步发送生成的文本内容,前端则实时处理并展示这些数据块,从而提升用户体验和实时性。文章详细讲解了如何使用`fetch`发起流式请求、处理响应流数据、逐步更新界面、处理中断和错误,以及优化用户交互。流式调用特别适用于聊天机器人、搜索建议等应用场景,能够显著减少用户的等待时间,增强交互性。
801 2
|
2月前
|
消息中间件 安全 Kafka
Apache Kafka安全加固指南:保护你的消息传递系统
【10月更文挑战第24天】在现代企业环境中,数据的安全性和隐私保护至关重要。Apache Kafka作为一款广泛使用的分布式流处理平台,其安全性直接影响着业务的稳定性和用户数据的安全。作为一名资深的Kafka使用者,我深知加强Kafka安全性的重要性。本文将从个人角度出发,分享我在实践中积累的经验,帮助读者了解如何有效地保护Kafka消息传递系统的安全性。
148 7
|
2月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
113 5
|
2月前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
125 4
|
2月前
|
存储 数据挖掘 数据处理
巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践
随着数据湖技术的发展,企业纷纷探索其优化潜力。本文分享了巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践。Paimon 支持流式和批处理,提供高性能、统一的数据访问和流批一体的优势。通过示例代码和实践经验,展示了如何高效处理实时数据,解决了数据一致性和故障恢复等挑战。
139 61
|
1月前
|
弹性计算 自然语言处理 数据库
通过阿里云Milvus和LangChain快速构建LLM问答系统
本文介绍如何通过整合阿里云Milvus、阿里云DashScope Embedding模型与阿里云PAI(EAS)模型服务,构建一个由LLM(大型语言模型)驱动的问题解答应用,并着重演示了如何搭建基于这些技术的RAG对话系统。
|
2月前
|
JSON 数据可视化 NoSQL
基于LLM Graph Transformer的知识图谱构建技术研究:LangChain框架下转换机制实践
本文介绍了LangChain的LLM Graph Transformer框架,探讨了文本到图谱转换的双模式实现机制。基于工具的模式利用结构化输出和函数调用,简化了提示工程并支持属性提取;基于提示的模式则为不支持工具调用的模型提供了备选方案。通过精确定义图谱模式(包括节点类型、关系类型及其约束),显著提升了提取结果的一致性和可靠性。LLM Graph Transformer为非结构化数据的结构化表示提供了可靠的技术方案,支持RAG应用和复杂查询处理。
194 2
基于LLM Graph Transformer的知识图谱构建技术研究:LangChain框架下转换机制实践
|
2月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
72 1
|
2月前
|
消息中间件 Java Kafka
初识Apache Kafka:搭建你的第一个消息队列系统
【10月更文挑战第24天】在数字化转型的浪潮中,数据成为了企业决策的关键因素之一。而高效的数据处理能力,则成为了企业在竞争中脱颖而出的重要武器。在这个背景下,消息队列作为连接不同系统和服务的桥梁,其重要性日益凸显。Apache Kafka 是一款开源的消息队列系统,以其高吞吐量、可扩展性和持久性等特点受到了广泛欢迎。作为一名技术爱好者,我对 Apache Kafka 产生了浓厚的兴趣,并决定亲手搭建一套属于自己的消息队列系统。
93 2
初识Apache Kafka:搭建你的第一个消息队列系统
|
2月前
|
消息中间件 Ubuntu Java
Ubuntu系统上安装Apache Kafka
Ubuntu系统上安装Apache Kafka

推荐镜像

更多