基于Apache doris怎么构建数据中台(四)-数据接入系统

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
阿里云百炼推荐规格 ADB PostgreSQL,4核16GB 100GB 1个月
简介: 在开发数据模型时,我们必须有一个统一的平台,能够像流水线一样,把数据一步步加工成数据模型。这其中涉及到数据萃取、数据聚合、作业调度等。

基于Apache doris怎么构建数据中台(四)-数据接入系统


上一次我们讲解了数据资产,元数据管理,血缘关系等,这次我们开始将数据接入,怎么实现快速的数据接入


在开发数据模型时,我们必须有一个统一的平台,能够像流水线一样,把数据一步步加工成数据模型。这其中涉及到数据萃取、数据聚合、作业调度等。


主要是为了实现业务数据的快速接入,零代码实现,数据分析人员只需要通过UI进行简单的配置、提交任务即可完成数据的接入,并能实现对数据接入任务的管理及监控。


image.png

Mysql数据源数据接入


image.png


主要是为了完成针对Mysql数据的业务系统数据接入零代码实现,不需要开发人员接入,提供给数据分析人员使用,目的是为了业务数据快速接入,无需编码


数据接入系统我们通过自研的规则引擎,和接入任务整合,同时自动化完成了数据接入的ETL工作,规则可以通过页面进行可视化配置,这块我会在后面的质量模块介绍


  1. 通过UI界面添加数据接入任务的方式


  1. Mysql数据的采集是通过Canal 采集binlog完成


  1. 在界面上第一步是配置canal实例(canal实例的管理是通过cannal admin),除了kafka topic名称需要手工输入,其他信息尽可能不要让使用人员手工数据


  1. 第二步配置Flink Job任务信息,需要的kafka topic名称来源于上一步,业务表和数仓表的对应关系通过元数据选择方式完成,避免手工输入


  1. 第三步:提交任务,这时候完成canal实例创建,运行,Flink job任务提交运行


  1. 并在列表上监控canal实例及Flink job运行状态的监控


DataX 数据接入


要实现的内容基本和Mysql binlog的同步一样


只不过是Datax是为了实现非mysql数据的数据接入零代码完成


数据API方式数据接入


传统数据API方式的数据接入都需要进行代码开发对接才能完成,初步设想这块通过通用的代码生成器的方式实现针对常用API方式(WebService,RestFul API)零代码接入


图形化数据接入


我们目前支持Kafka,Mysql,datax数据零代码接入


Mysql :通过Canal采集业务数据库的binlog日志,将数据推送到指定的Kafka队列

其他DB:通过datax(全量和增量)定时的将数据,推送到指定的Kafka队列,这里我们对Datax做了改造,让Datax的数据格式和Canal格式一致,


数据接口:后端数据接收服务队数据进行转换(可配置)以后,形成和Canal一致的数据格式,推送到指定的Kafka队列


  1. 后端针对业务db,我们会通过元数据采集系统采集业务系统库表及字段的元数据信息,进入到元数据管理系


  1. 针对没有数据库表的,通过接口直接推送到Kafka的数据,我们在元数据管理里统一提供虚拟库表,通过这个完成数据的统一接入


  1. 在数据接入的时候,我们整合了我们自研的规则引擎,可以实现数据接入和ETL规则自动绑定,通过阿波罗配置系统进行统一下推到Flink Job里执行,


  1. 对于异常数据(不符合规则的),自动推送到指定的Kafka队列,后端有对应的服务进行处理,我们这里是通过Flink实现了一个轻量级的ETL,及数据入Doris的自动化工具


效果如下:

第一步选择要接入的数据表

image.png


第二步选择数据仓相关的信息,这一步会进行表字段映射检查及配置,目前要首先在数仓中建立相应的表,后续会自动化建表

image.png


第三步就是输入Flink Job名称进行提交了,整个就完成了


数据开发控制台


  1. 提供一个数据类似于HUE的SQL数据开发控制台,数据开发人员可以通过这个控制台进行sql的开发调试


  1. 生产环境这里delete,drop等操作要进行审批确认,才能进行,避免误操作,删除数据


  1. 可以将调试好的sql,添加到定时任务调度系统中,这里我们将海豚调度集成到我们数据中台中


零代码入仓的问题解答


很多朋友问到,我们这种方式会不会数据丢失,会不会数据重复,结合我们自己的场景,给我我们的解决方案


数据入到Doris数据仓库对应的表中,这里我们采用的是Flink实时消费KafKa的数据,然后通过Doris的 Stream Load完成


Flink消费Kafka数据我们支持两种方式:


  1. 指定Kafka Topic的Offset进行消费:kafka.offset


  1. 指定时间戳的方式:kafka.timestamp


数据丢失的问题


针对Flink Job失败,重启也是通过这两个参数,


  1. 如果你记录了失败的时间点的Kafka Offset,可以通过配置文件配置这个参数来重启Flink Job就行。这样不会造成数据丢失


  1. 如果没有记录这个offset,可通过指定


consumer.setStartFromTimestamp(timestamp);这个时间就是在配置文件中配置的时间戳 ,这样无论是通过offset还是从指定的时间开始消费Kafka数据,都不会造成数据丢失


数据重复问题


因为我们这个是在数据接入层使用的,数据是进入到数据仓ODS层,在这一层我们采用的是Doris Unique Key模型,就算数据重复入库,会覆盖原先的数据,不会存在数据重复问


下一讲开始讲解数据质量管理




相关实践学习
AnalyticDB PostgreSQL 企业智能数据中台:一站式管理数据服务资产
企业在数据仓库之上可构建丰富的数据服务用以支持数据应用及业务场景;ADB PG推出全新企业智能数据平台,用以帮助用户一站式的管理企业数据服务资产,包括创建, 管理,探索, 监控等; 助力企业在现有平台之上快速构建起数据服务资产体系
相关文章
|
2月前
|
人工智能 缓存 监控
使用LangChain4j构建Java AI智能体:让大模型学会使用工具
AI智能体是大模型技术的重要演进方向,它使模型能够主动使用工具、与环境交互,以完成复杂任务。本文详细介绍如何在Java应用中,借助LangChain4j框架构建一个具备工具使用能力的AI智能体。我们将创建一个能够进行数学计算和实时信息查询的智能体,涵盖工具定义、智能体组装、记忆管理以及Spring Boot集成等关键步骤,并展示如何通过简单的对话界面与智能体交互。
913 1
|
2月前
|
SQL 人工智能 数据挖掘
Apache Doris 4.0 AI 能力揭秘(二):为企业级应用而生的 AI 函数设计与实践
Apache Doris 4.0 原生集成 LLM 函数,将大语言模型能力深度融入 SQL 引擎,实现文本处理智能化与数据分析一体化。通过十大函数,支持智能客服、内容分析、金融风控等场景,提升实时决策效率。采用资源池化管理,保障数据一致性,降低传输开销,毫秒级完成 AI 分析。结合缓存复用、并行执行与权限控制,兼顾性能、成本与安全,推动数据库向 AI 原生演进。
282 0
Apache Doris 4.0 AI 能力揭秘(二):为企业级应用而生的 AI 函数设计与实践
|
2月前
|
人工智能 Java API
构建基于Java的AI智能体:使用LangChain4j与Spring AI实现RAG应用
当大模型需要处理私有、实时的数据时,检索增强生成(RAG)技术成为了核心解决方案。本文深入探讨如何在Java生态中构建具备RAG能力的AI智能体。我们将介绍新兴的Spring AI项目与成熟的LangChain4j框架,详细演示如何从零开始构建一个能够查询私有知识库的智能问答系统。内容涵盖文档加载与分块、向量数据库集成、语义检索以及与大模型的最终合成,并提供完整的代码实现,为Java开发者开启构建复杂AI智能体的大门。
1408 58
|
2月前
|
存储 自然语言处理 分布式计算
Apache Doris 3.1 正式发布:半结构化分析全面升级,湖仓一体能力再跃新高
Apache Doris 3.1 正式发布!全面升级半结构化分析,支持 VARIANT 稀疏列与模板化 Schema,提升湖仓一体能力,增强 Iceberg/Paimon 集成,优化存储引擎与查询性能,助力高效数据分析。
476 4
Apache Doris 3.1 正式发布:半结构化分析全面升级,湖仓一体能力再跃新高
存储 人工智能 机器人
99 0
|
2月前
|
人工智能 安全 数据库
构建可扩展的 AI 应用:LangChain 与 MCP 服务的集成模式
本文以LangChain和文件系统服务器为例,详细介绍了MCP的配置、工具创建及调用流程,展现了其“即插即用”的模块化优势,为构建复杂AI应用提供了强大支持。
|
3月前
|
存储 分布式计算 Apache
湖仓一体:小米集团基于 Apache Doris + Apache Paimon 实现 6 倍性能飞跃
小米通过将 Apache Doris(数据库)与 Apache Paimon(数据湖)深度融合,不仅解决了数据湖分析的性能瓶颈,更实现了 “1+1>2” 的协同效应。在这些实践下,小米在湖仓数据分析场景下获得了可观的业务收益。
782 9
湖仓一体:小米集团基于 Apache Doris + Apache Paimon 实现 6 倍性能飞跃
|
3月前
|
人工智能 运维 监控
智能运维与数据治理:基于 Apache Doris 的 Data Agent 解决方案
本文基于 Apache Doris 数据运维治理 Agent 展开讨论,如何让 AI 成为 Doris 数据运维工程师和数据治理专家的智能助手,并在某些场景下实现对人工操作的全面替代。这种变革不仅仅是技术层面的进步,更是数据运维治理思维方式的根本性转变:从“被动响应”到“主动预防”,从“人工判断”到“智能决策”,从“孤立处理”到“协同治理”。
616 11
智能运维与数据治理:基于 Apache Doris 的 Data Agent 解决方案
|
1月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
330 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
2月前
|
人工智能 运维 Java
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
本文基于Apache Flink PMC成员宋辛童在Community Over Code Asia 2025的演讲,深入解析Flink Agents项目的技术背景、架构设计与应用场景。该项目聚焦事件驱动型AI智能体,结合Flink的实时处理能力,推动AI在工业场景中的工程化落地,涵盖智能运维、直播分析等典型应用,展现其在AI发展第四层次——智能体AI中的重要意义。
1116 27
Flink Agents:基于Apache Flink的事件驱动AI智能体框架

热门文章

最新文章

推荐镜像

更多