基于Apache doris怎么构建数据中台(四)-数据接入系统

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
阿里云百炼推荐规格 ADB PostgreSQL,4核16GB 100GB 1个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 在开发数据模型时,我们必须有一个统一的平台,能够像流水线一样,把数据一步步加工成数据模型。这其中涉及到数据萃取、数据聚合、作业调度等。

基于Apache doris怎么构建数据中台(四)-数据接入系统


上一次我们讲解了数据资产,元数据管理,血缘关系等,这次我们开始将数据接入,怎么实现快速的数据接入


在开发数据模型时,我们必须有一个统一的平台,能够像流水线一样,把数据一步步加工成数据模型。这其中涉及到数据萃取、数据聚合、作业调度等。


主要是为了实现业务数据的快速接入,零代码实现,数据分析人员只需要通过UI进行简单的配置、提交任务即可完成数据的接入,并能实现对数据接入任务的管理及监控。


image.png

Mysql数据源数据接入


image.png


主要是为了完成针对Mysql数据的业务系统数据接入零代码实现,不需要开发人员接入,提供给数据分析人员使用,目的是为了业务数据快速接入,无需编码


数据接入系统我们通过自研的规则引擎,和接入任务整合,同时自动化完成了数据接入的ETL工作,规则可以通过页面进行可视化配置,这块我会在后面的质量模块介绍


  1. 通过UI界面添加数据接入任务的方式


  1. Mysql数据的采集是通过Canal 采集binlog完成


  1. 在界面上第一步是配置canal实例(canal实例的管理是通过cannal admin),除了kafka topic名称需要手工输入,其他信息尽可能不要让使用人员手工数据


  1. 第二步配置Flink Job任务信息,需要的kafka topic名称来源于上一步,业务表和数仓表的对应关系通过元数据选择方式完成,避免手工输入


  1. 第三步:提交任务,这时候完成canal实例创建,运行,Flink job任务提交运行


  1. 并在列表上监控canal实例及Flink job运行状态的监控


DataX 数据接入


要实现的内容基本和Mysql binlog的同步一样


只不过是Datax是为了实现非mysql数据的数据接入零代码完成


数据API方式数据接入


传统数据API方式的数据接入都需要进行代码开发对接才能完成,初步设想这块通过通用的代码生成器的方式实现针对常用API方式(WebService,RestFul API)零代码接入


图形化数据接入


我们目前支持Kafka,Mysql,datax数据零代码接入


Mysql :通过Canal采集业务数据库的binlog日志,将数据推送到指定的Kafka队列

其他DB:通过datax(全量和增量)定时的将数据,推送到指定的Kafka队列,这里我们对Datax做了改造,让Datax的数据格式和Canal格式一致,


数据接口:后端数据接收服务队数据进行转换(可配置)以后,形成和Canal一致的数据格式,推送到指定的Kafka队列


  1. 后端针对业务db,我们会通过元数据采集系统采集业务系统库表及字段的元数据信息,进入到元数据管理系


  1. 针对没有数据库表的,通过接口直接推送到Kafka的数据,我们在元数据管理里统一提供虚拟库表,通过这个完成数据的统一接入


  1. 在数据接入的时候,我们整合了我们自研的规则引擎,可以实现数据接入和ETL规则自动绑定,通过阿波罗配置系统进行统一下推到Flink Job里执行,


  1. 对于异常数据(不符合规则的),自动推送到指定的Kafka队列,后端有对应的服务进行处理,我们这里是通过Flink实现了一个轻量级的ETL,及数据入Doris的自动化工具


效果如下:

第一步选择要接入的数据表

image.png


第二步选择数据仓相关的信息,这一步会进行表字段映射检查及配置,目前要首先在数仓中建立相应的表,后续会自动化建表

image.png


第三步就是输入Flink Job名称进行提交了,整个就完成了


数据开发控制台


  1. 提供一个数据类似于HUE的SQL数据开发控制台,数据开发人员可以通过这个控制台进行sql的开发调试


  1. 生产环境这里delete,drop等操作要进行审批确认,才能进行,避免误操作,删除数据


  1. 可以将调试好的sql,添加到定时任务调度系统中,这里我们将海豚调度集成到我们数据中台中


零代码入仓的问题解答


很多朋友问到,我们这种方式会不会数据丢失,会不会数据重复,结合我们自己的场景,给我我们的解决方案


数据入到Doris数据仓库对应的表中,这里我们采用的是Flink实时消费KafKa的数据,然后通过Doris的 Stream Load完成


Flink消费Kafka数据我们支持两种方式:


  1. 指定Kafka Topic的Offset进行消费:kafka.offset


  1. 指定时间戳的方式:kafka.timestamp


数据丢失的问题


针对Flink Job失败,重启也是通过这两个参数,


  1. 如果你记录了失败的时间点的Kafka Offset,可以通过配置文件配置这个参数来重启Flink Job就行。这样不会造成数据丢失


  1. 如果没有记录这个offset,可通过指定


consumer.setStartFromTimestamp(timestamp);这个时间就是在配置文件中配置的时间戳 ,这样无论是通过offset还是从指定的时间开始消费Kafka数据,都不会造成数据丢失


数据重复问题


因为我们这个是在数据接入层使用的,数据是进入到数据仓ODS层,在这一层我们采用的是Doris Unique Key模型,就算数据重复入库,会覆盖原先的数据,不会存在数据重复问


下一讲开始讲解数据质量管理




相关实践学习
AnalyticDB PostgreSQL 企业智能数据中台:一站式管理数据服务资产
企业在数据仓库之上可构建丰富的数据服务用以支持数据应用及业务场景;ADB PG推出全新企业智能数据平台,用以帮助用户一站式的管理企业数据服务资产,包括创建, 管理,探索, 监控等; 助力企业在现有平台之上快速构建起数据服务资产体系
目录
相关文章
|
11天前
|
人工智能 缓存 监控
使用LangChain4j构建Java AI智能体:让大模型学会使用工具
AI智能体是大模型技术的重要演进方向,它使模型能够主动使用工具、与环境交互,以完成复杂任务。本文详细介绍如何在Java应用中,借助LangChain4j框架构建一个具备工具使用能力的AI智能体。我们将创建一个能够进行数学计算和实时信息查询的智能体,涵盖工具定义、智能体组装、记忆管理以及Spring Boot集成等关键步骤,并展示如何通过简单的对话界面与智能体交互。
241 1
|
14天前
|
人工智能 Java API
构建基于Java的AI智能体:使用LangChain4j与Spring AI实现RAG应用
当大模型需要处理私有、实时的数据时,检索增强生成(RAG)技术成为了核心解决方案。本文深入探讨如何在Java生态中构建具备RAG能力的AI智能体。我们将介绍新兴的Spring AI项目与成熟的LangChain4j框架,详细演示如何从零开始构建一个能够查询私有知识库的智能问答系统。内容涵盖文档加载与分块、向量数据库集成、语义检索以及与大模型的最终合成,并提供完整的代码实现,为Java开发者开启构建复杂AI智能体的大门。
438 58
存储 人工智能 机器人
35 0
|
1月前
|
人工智能 安全 数据库
构建可扩展的 AI 应用:LangChain 与 MCP 服务的集成模式
本文以LangChain和文件系统服务器为例,详细介绍了MCP的配置、工具创建及调用流程,展现了其“即插即用”的模块化优势,为构建复杂AI应用提供了强大支持。
|
2月前
|
机器学习/深度学习 算法 大数据
构建数据中台,为什么“湖仓一体”成了大厂标配?
在大数据时代,数据湖与数据仓库各具优势,但单一架构难以应对复杂业务需求。湖仓一体通过融合数据湖的灵活性与数据仓的规范性,实现数据分层治理、统一调度,既能承载海量多源数据,又能支撑高效分析决策,成为企业构建数据中台、推动智能化转型的关键路径。
|
3月前
|
数据采集 存储 分布式计算
一文读懂数据中台架构,高效构建企业数据价值
在数字化时代,企业面临数据分散、难以统一管理的问题。数据中台架构通过整合、清洗和管理数据,打破信息孤岛,提升决策效率。本文详解其核心组成、搭建步骤及常见挑战,助力企业高效用数。
1092 24
|
5月前
|
SQL 存储 OLAP
数据外置提速革命:轻量级开源SPL如何用文件存储实现MPP级性能?
传统交易型数据库在分析计算中常遇性能瓶颈,将数据迁至OLAP数据仓库虽可缓解,但成本高、架构复杂。SPL通过轻量级列存文件存储历史数据,提供强大计算能力,大幅简化架构并提升性能。它优化了列式存储、数据压缩与多线程并行处理,在常规及复杂计算场景中均表现优异,甚至单机性能超越集群。实际案例中,SPL在250亿行数据的时空碰撞问题上,仅用6分钟完成ClickHouse集群30分钟的任务。
数据外置提速革命:轻量级开源SPL如何用文件存储实现MPP级性能?
|
5月前
|
SQL 机器学习/深度学习 监控
构建数据中枢:数据中台指标体系如何赋能企业运营
杭州奥零数据科技有限公司成立于2023年,专注于数据中台业务,维护开源项目AllData并提供商业版解决方案。AllData提供数据集成、存储、开发、治理及BI展示等一站式服务,支持AI大模型应用,助力企业高效利用数据价值。
|
5月前
|
存储 机器学习/深度学习 人工智能
使用 LangChain + Higress + Elasticsearch 构建 RAG 应用
本文介绍了如何利用LangChain、Higress和Elasticsearch快速构建RAG(检索增强生成)应用,实现企业知识的智能检索与问答。首先通过LangChain解析Markdown文档并写入Elasticsearch,接着部署Higress AI网关并配置ai-search插件以整合私有知识库与在线搜索功能。最后,通过实际案例展示了RAG查询流程及结果更新机制,确保内容准确性和时效性。文章还提供了相关参考资料以便进一步学习。
561 38

热门文章

最新文章

推荐镜像

更多