实时数仓入门训练营:Hologres 数据导入/导出实践

简介: 《实时数仓入门训练营》由阿里云研究员王峰、阿里云高级产品专家刘一鸣等实时计算Flink版和 Hologres 的多名技术/产品一线专家齐上阵,合力搭建此次训练营的课程体系,精心打磨课程内容,直击当下同学们所遇到的痛点问题。由浅入深全方位解析实时数仓的架构、场景、以及实操应用,7 门精品课程帮助你 5 天时间从小白成长为大牛!

本文整理自直播《Hologres 数据导入/导出实践-王华峰(继儒)》
视频链接:https://developer.aliyun.com/learning/course/807/detail/13891

内容简要:
一、Hologres生态介绍
二、Hologres实时读写接口介绍
三、Hologres实时读写场景介绍
四、Demo演示
五、常见问题及未来展望

Hologres生态介绍

(一)Hologres生态

图片 1.png

Hologres是一款兼容PostgreSQL协议的实时交互式分析产品,也已经打通了大数据生态。以最常见的几个开源组件来说,如Apache Flink、Spark、Hive、Kafka等,Hologres都已经有了相关的Connector实现并进行了开源。

对于实时链路,用户依托Flink或者Spark,就可以将上游的比如埋点或业务数据等,以非常高的性能以及毫秒级的延迟导入Hologres。对于离线链路,Hologres也支持把外部系统的数据以非常简便的操作导入,反过来也支持再将数据备份回外部系统,比如阿里云的MaxComputer、OSS等。

当数据导入Hologres之后,因为Hologres本身兼容PostgreSQL协议,所以能使用各种现成的查询工具,无缝连接Hologres进行数据的展示、查询等。

(二)Dataworks数据集成支持输入

除了刚才提到的大数据场景之外,使用阿里云的Dataworks数据集成功能,我们还能将用户存储在传统数据库中的数据导入Hologres,实现方便高效的数据库整库实时镜像。

图片 2.png

如上图所示,当下Dataworks数据集成支持将MySQL的Binlog,SQLServer的CDC,Oracle的CDC实时镜像同步至Hologres。此外,Dataworks也支持将Kafka,还有阿里云Datahub的数据同步至Hologres。

图片 3.png

另外值得一提的是,Datahub这个产品自身也提供了直接将数据实时同步到Hologres的功能,这个功能叫Datahub Connector。使用这个功能用户就无需经过Flink或者其他组件,可以直接将数据导入到Hologres,对于无需ETL的数据同步是一个比较快捷的方式。

Hologres实时读写接口介绍

图片 4.png

Hologres实时读写实现原理

上图为整个Hologres实时读写实现原理架构图。

从上往下看,最上游是应用端,也就是会读写Hologres的各种客户端,比如说数据集成,Apache Flink、Spark等等。这些客户端通常会使用SQL接口,将读写数据的请求发送给Hologres,这些请求会经过一个负载均衡服务器,然后这些请求就会路由分发到一个叫做Frontend的节点。一个Hologres实例通常有多个Frontend节点,这样就可以支持非常高的QPS请求。Frontend节点主要是负责SQL的Parse、优化等功能。

经过一系列的处理之后,Frontend就会将用户的SQL请求转换成一个物理执行计划,然后这些物理执行计划就会被分发到后端的一个执行节点,执行真正的物理读写请求,最终写入的数据会持久化至分布式文件系统,比如阿里的Pangu系统或者开源的HDFS。

这里要特别强调的是,正常的SQL解析,然后通过Query的优化器优化生成最优执行计划,通常这部分的链路开销是比较大的,对于高QPS的读写场景,这往往会成为一个性能的瓶颈。

所以对于一些常见的SQL场景,这里我们列了几个 SQL,如下所示。

图片 -5.png

Fixed Plan

比如Insert into table values (),就是简单地插入一行或者几行。还有Insert into table values () on conflict do update,就是对数据进行几行的更新。Select * from table where pk = xxx和Delete from table where pk = xxx是根据主键去进行数据的查找或者删除。

对于这些常见的SQL,Hologres的Frontend做了一定的短路优化,略去了很多不必要的优化逻辑,直接生成最优的一个执行计划,并发送给后端的执行节点,这样就能提升整体的请求吞吐。

下面我们看一下,当物理执行计划发送到后端之后是如何处理的。

图片 5.png
图片 6.png

Hologres后端的整体存储引擎是基于Log Structured Merge Tree(LSM)来实现的,这里LSM能够把随机写变成顺序写,大大提升了数据写入的吞吐。

写请求首先会被写到Write Ahead Log,也就是传统的WAL文件中,一旦写入成功了,我们就认为这条记录永久写入成功了。之后,我们会把WAL日志Apply到Mem Table里面,Apply完成后,数据就对外可见了,可以进行查询,这中间的延迟通常在毫秒以内。

当Mem Table写满了之后,我们会有一个异步的线程,将Mem Table刷盘持久化,整体流程是一个比较标准化的LSM实现。

这里有别于其他LSM实现的存储系统,比如HBase,Hologres后端采用了全异步的实现,基于协程省去了操作系统内核线程开销,大大提升了系统CPU的利用率,使得Hologres的读写性能非常优异。

我们再回过头来看一下上面应用端的数据写入接口,现在Flink、Spark和Dataworks读写Hologres其实都使用了一个叫做Holo-Client的SDK。

图片 7.png

Holo-Client基于Jdbc实现,对读写holo最佳实践的封装,可以减轻数据集成开发工作量。
我们也对一些特定场景的SQL做了一定的优化,例如:

  • 数据写入
    1)攒批,基于jdbc reWriteBatchedInserts的实现原理;
    2)数据合并,相同主键的INSERT/DELETE在一个批次中会合并减少请求量;
    3)自动提交,支持基于批行数、批字节大小和最长提交间隔自动提交。
  • 数据点查
    1)提供异步点查接口;
    2)QPS高时自动转入攒批模式。
  • 数据Copy
    提供并发CopyIn的简易接口。
  • 异常处理
    对holo返回异常归类,正确在holo升级、扩容等场景下重试等待实例恢复。

我们非常推荐用户之后如果有读写holo的场景,就使用Holo-client这个SDK。

Hologres实时读写场景介绍

介绍完Hologres的读写接口的实现原理之后,接下来看一下基于Hologres读写接口能实现的几种常见的读写场景。

(一)实时写入场景

图片 8.png

图片 -9.png

第一种是最简单的实时写入场景,如上所示。

这里我们使用了一个Blink SQL的实例,其实就是生成一个随机数据源,然后将数据导入至Hologres。对于实时写入的场景,Hologres支持行存和列存这两种格式,还支持根据主键进行去重,这是相较于很多其他 OLAP系统的一个非常大的亮点。

另外,Hologres的实时写入还支持整行数据更新或者数据的局部更新。对于性能而言,Hologres导入即可见,拥有非常低的延迟,通常延迟在毫秒以内。经过我们自己的测试,以TPCH PartSupp表为例,我们后端单Core能达到2万左右的RPS,而且该性能可以随着实例的资源进行线性扩展。

(二)实时宽表Merge场景

然后接下来我们介绍一下实时宽表Merge的场景,这里其实是使用了holo的整行局部更新的功能。

图片 9.png

以上图为例,比如用户想将多个数据源的数据合并成一张宽表写入至Hologres。我们希望整张表最终有A|B|C|D|E|F六个列,然后有一部分数据,比如说A|B|C|D这四个列是在一个数据源里面,然后A|B|E|F是在另外一个数据源里,我们希望把这两个数据源的数据合并写入至Hologres的一张宽表。

常见的一种实现是我们会使用Flink的Join功能,就是使用两个流同时消费上述数据源,然后在Flink里面进行两个流的Join,进行数据的打宽,最后写入到Hologres里面。

但是这种场景的一个问题是Flink的Join开销通常非常大,因为它需要缓存非常多的状态,这对于整个作业的维护是一个非常大的开销。

下面我们来看一下Hologres是如何解决这个问题的。

图片 10.png

上文提到Hologres自身支持整合数据的局部更新功能,如上图所示,我们可以直接用两个流来直接写Hologres,而无需进行再做Flink内的Join。一个流比如A|B|C|D可以直接写Hologres,另外一个流A|B|E|F也可以直接写Hologres。因为这两个流的数据有相同的主键,所以当两行数据用相同的主键写入到Hologres的时候,Hologres内部会进行一个Merge,最终达到数据打宽的功能,省去了用户自己去写Flink Join,以及维护这么一个复杂作业的问题。

(三)实时维表Join场景

介绍完实时写入Hologres场景之后,下面我们来看一下实时读的场景。

实时读通常分为两种,第一种就是我们常见的Flink的实时维表Join场景,维表Join就是一个点查的实现。

图片 11.png

图片 12.png

这里Hologres的行存表通常可以替换HBase来实现Flink的维表功能,提供非常高的吞吐以及非常低的延迟。

(四)Hologres Binlog场景

实时读的第二种场景是Hologres Binlog场景。Binlog和MySQL的Binlog是一个类似的概念,使用Hologres Binlog我们就能实时消费Hologres单表的Change log,可以对每行数据的更新进行追踪记录。

现在实时计算 Flink 版的Hologres CDC Source,能实现表的实时镜像同步,甚至使用Flink + Hologres,能够实现ODS到DWD表的实时 ETL。

Hologres的Binlog功能默认是不开启的。

图片 13.png

上图是一个例子,列出了如何使用Hologres的Binlog,这里是一个建表的DDL。

可以看到我们有额外两个表的属性,一个叫做binlog.level,设置为replica,代表这张表会开启Hologres的Binlog功能,’binlog.ttl’就是代表Binlog数据的一个生命周期,下面我们使用Hologres Binlog看看能达到一个什么样的效果。

图片 14.png

由于Hologres是一个强Schema的数仓,所以我们甚至能够用SQL接口来查询Hologres的Binlog。如上所示,这里我们通过提供几个隐藏列:hg_binlog_lsn,hg_binlog_event_type,hg_binlog_timestamp_us,就能查询到Hologres的Binlog。

这里hg_binlog_lsn就是代表了每条Binlog生成的LSN序列号,然后hg_binlog_event_type是代表了Binlog的消息类型,它是代表Delete还是Insert的,或者是Before Update,或者是说After Update。这里的hg_binlog_timestamp_us代表这条Binlog生成的时间。

有了这几个隐藏列之后,用户就可以非常方便地使用SQL来进行Binlog的查询,进行数据的Debug。

Demo演示

(一)实时计算 Flink 版实时读写Hologres Demo

介绍完Hologres的读写场景之后,我们通过实际操作的Demo来看一下如何使用Flink来实时读写Hologres。

图片 15.png

如上图所示,首先,我们这里有两张Hologres的表,这两张表都会开启Binlog。我们假设这两者会有实时的写入,然后我们会写另外一个Flink任务去实施消费这两张表的Binlog,进行这两张表Binlog的Merge,甚至进行一定的group by计算操作,最终将这两张表的数据同步写入之后关Hologres的另一张结果表。

接下来进入演示,首先我们看一下Hologres建表的DDL,如下所示。

图片 -16.png


a表的建表DDL

图片 -17.png

b表的建表DDL

这两张表有两个相同的字段,分别叫id和create_time,之后会进行一个数据的聚合。每张表还会有一个不同的值,value_from_a是表a所特有的,value_from_b是表b所特有的。

图片 -18.png

结构表

最后我们会有一张结构表,这张结果表有a和b两张表共有的两个列,分别从a和b两张表得到了另外两个列a和b,我们希望将a和b的数据进行一个实时聚合,写入到Sink表里面。

我们看一下整个Flink的SQL。

图片 16.png

这里首先是分别声明了两张Hologres的source表,需要实时的消费Hologres两张表的Binlog。

需要注意的是,我们这里需要开启‘binlog’=‘true’这个参数来让Flink进行消费Hologres的Binlog以及开启CDC模式。

图片 16.png

结果表

然后我们来看一下结果表的声明,如上所示。

在这里需要注意的是,我们需要设置一个‘ignoreDelete’=‘false’,这样防止我们会忽略包括Delete或者beforeUpdate这种类型的数据,导致数据的不一致。

图片 18.png

我们看一下整个Flink计算逻辑的SQL,如上所示。

这里的逻辑其实比较简单,其实只是将两张表的结果union起来,然后进行一个group by id和create_time进行实时的sum,写入到Hologres的结果表。

这里的作业上线之后,我们可以直接启动运行该作业。

在启动的过程中,我们可以看一下当前Hologres这几张表的状态。

图片 19.png

可以看到当前Hologres这几张表都是一张空表,我们会对这几张表进行更新,然后看一下数据的同步的过程。

图片 20.png

首先往a表插入一条数据,可以看到a表的数据已经实时同步到结果表中。
接下来对b表数据进行一个更新。

图片 21.png

可以看到这两个流的数据已经实时更新到结果表,并进行了准确的数据聚合。

接下来我们再更新a表。

图片 22.png

可以看到对于源表a的实时更新,已经正确地反映到了结果表当中,Flink非常正确地计算出了两个流的结果。

我们甚至可以看一下这张sink表的binlog数据,因为我们这张结构表也同样开启了binlog的功能,如下所示。

图片 23.png

可以看到,我们拿到这张表所有的变更记录,和我们预期的效果保持了一致。

以上就是Flink实时读写Hologres的Demo。

(二)Dataworks实时同步Hologres Demo

接下来我们看一下使用Dataworks将PolarDB的数据实时同步到Hologres的Demo演示。

首先我们进入到数据集成,数据同步要进行一个数据源的添加,点击数据源添加。

图片 24.png

接着新增数据源,选择Hologres,填充完所有的信息之后,我们就可以进行一个数据的添加。

图片 25.png

新增数据源

接下来进行数据同步的演示。

图片 26.png

如上所示,首先这里已经有了一个 PolarDB的数据库,以及预先创建好了一张user_details表,可以看到这里已经有三条查询结果记录,之后我们希望把这张表的数据同步到Hologres当中。

然后我们返回到数据集成,点击一键实时同步至Hologres,如下所示。

图片 27.png

在基本配置中,数据源选择预先创建好的数据源PolarDB,之后选择需要同步的表user_details,然后点击下一步。

图片 28.png

图片 29.png

之后,我们会需要选择目标Hologres的数据源,添加后进行刷新,可以刷新出user_details这张表,然后可以配置这张表是否需要自动建表,还是用已经有的表,这里选择我们自动建表,然后点击下一步。

图片 30.png

在DDL消息处理规则中,我们可以配置各种各样的策略处理,根据需求配置好规则后选择下一步。

图片 31.png

接下来进行运行资源配置。对Dataworks数据进行实时同步,我们通常需要一个独享资源组,在这里我们已经完成了独享资源组的购买,然后选择各个同步功能所需要的资源组,完成配置并点击立即执行,等待作业的启动。

图片 32.png

可以看到PolarDB的数据已经实时同步到Hologres这张结构表当中。

图片 33.png

接下来可以对这张表再进行一定的更新,我们往这张user_details表里面重新插入一条1004数据,数据插入成功后可以看一下Hologres结构表。

图片 34.png

从后台可以看到,1004这条数据已经实时同步至Hologres,如下所示。

图片 35.png

通过上方的演示可以看到,使用Dataworks实时同步Hologres功能,我们可以非常便捷地将数据库中的数据同步到Hologres。

常见问题及未来展望

(一)实时计算 Flink 版 Hologres Connector常见问题

经过上述提到的关于Hologres应用场景以及几个Demo,接下来看一下在使用过程中通常会遇到什么问题。

Q: 作业启动失败,无法连接Hologres。
A: Hologres实例需要与Flink集群在同一Region,且使用VPC Endpoint。

Q: 实时写入结果表数据不符合预期。
A: 通常是由回撤引起,需要正确设置ignoreDelete参数。

Q: 实时写入性能慢。
A: 当前高QPS场景的列存表局部更新开销较大,建议换成整行更新或者行存写入。

Q: 维表查询性能较差,且Hologres实例CPU负载高。
A: 通常是由于使用了列存表作为维表,建议切换至行存表。

Q: 实时消费Binlog报错。
A: 通常是由于表没有开启Binlog导致,需要重建表。

(二)未来展望

接下来看一下整个Hologres在实时读写链路上的一个未来的规划和展望。

- Flink One-To-Many 维表Join

这是一个即将上线的功能,我们会在Flink实现一对多的维表Join功能,就不需要强制使用Hologres表的主键进行维表查询。
但需要注意的是,这种场景下面通常性能不会特别好,因为难以避免的查询会导致整表的扫描,使得延迟比较高,所以还是建议用户尽可能使用一对一的点查场景。

- 基于JDBC实时消费Hologres Binlog

当前Hologres Binlog实现是使用了内置接口,暂时没有对外透出。之后,我们会实现基于JDBC的接口实现让用户实时消费Hologres Binlog。

- Dataworks数据集成实时消费Hologres Binlog

当前数据集成并不支持消费Hologres数据,之后我们会支持使用Dataworks的数据集成,能够实时消费Hologres Binlog,这样就能将Hologres的数据实时镜像地同步到其他的数据库当中。

- 无连接限制的SQL读写

由于PostgreSQL的模型限制,当前Hologres整个实例的连接数有一定的限制,之后我们会提供一个无连接限制的SQL读写的功能。

上文提到Hologres的一些 connector和Holo-Client,都已经开源到Github上面,有需求的用户可以访问下方链接进行使用。

活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99元试用实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制T恤;另包3个月及以上还有85折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

image.png

相关实践学习
基于Hologres轻量实时的高性能OLAP分析
本教程基于GitHub Archive公开数据集,通过DataWorks将GitHub中的项⽬、行为等20多种事件类型数据实时采集至Hologres进行分析,同时使用DataV内置模板,快速搭建实时可视化数据大屏,从开发者、项⽬、编程语⾔等多个维度了解GitHub实时数据变化情况。
相关文章
|
12月前
|
存储 消息中间件 OLAP
基于 Flink+Paimon+Hologres 搭建淘天集团湖仓一体数据链路
本文整理自淘天集团高级数据开发工程师朱奥在Flink Forward Asia 2024的分享,围绕实时数仓优化展开。内容涵盖项目背景、核心策略、解决方案、项目价值及未来计划五部分。通过引入Paimon和Hologres技术,解决当前流批存储不统一、实时数据可见性差等痛点,实现流批一体存储与高效近实时数据加工。项目显著提升了数据时效性和开发运维效率,降低了使用门槛与成本,并规划未来在集团内推广湖仓一体架构,探索更多技术创新场景。
1919 3
基于 Flink+Paimon+Hologres 搭建淘天集团湖仓一体数据链路
|
10月前
|
运维 算法 机器人
阿里云AnalyticDB具身智能方案:破解机器人仿真数据、算力与运维之困
本文将介绍阿里云瑶池旗下的云原生数据仓库AnalyticDB MySQL推出的全托管云上仿真解决方案,方案采用云原生架构,为开发者提供从开发环境、仿真计算到数据管理的全链路支持。
|
10月前
|
SQL 分布式计算 DataWorks
破界·融合·进化:解码DataWorks与Hologres的湖仓一体实践
基于阿里云DataWorks与实时数仓Hologres,提供统一的大数据开发治理平台与全链路实时分析能力。DataWorks支持多行业数据集成与管理,Hologres实现海量数据的实时写入与高性能查询分析,二者深度融合,助力企业构建高效、实时的数据驱动决策体系,加速数字化升级。
|
存储 监控 数据挖掘
京东物流基于Flink & StarRocks的湖仓建设实践
本文整理自京东物流高级数据开发工程师梁宝彬在Flink Forward Asia 2024的分享,聚焦实时湖仓的探索与建设、应用实践、问题思考及未来展望。内容涵盖京东物流通过Flink和Paimon等技术构建实时湖仓体系的过程,解决复杂业务场景下的数据分析挑战,如多维OLAP分析、大屏监控等。同时,文章详细介绍了基于StarRocks的湖仓一体方案,优化存储成本并提升查询效率,以及存算分离的应用实践。最后,对未来数据服务的发展方向进行了展望,计划推广长周期数据存储服务和原生数据湖建设,进一步提升数据分析能力。
1258 1
京东物流基于Flink & StarRocks的湖仓建设实践
|
存储 SQL 运维
中国联通网络资源湖仓一体应用实践
本文分享了中国联通技术专家李晓昱在Flink Forward Asia 2024上的演讲,介绍如何借助Flink+Paimon湖仓一体架构解决传统数仓处理百亿级数据的瓶颈。内容涵盖网络资源中心概况、现有挑战、新架构设计及实施效果。新方案实现了数据一致性100%,同步延迟从3小时降至3分钟,存储成本降低50%,为通信行业提供了高效的数据管理范例。未来将深化流式数仓与智能运维融合,推动数字化升级。
661 0
中国联通网络资源湖仓一体应用实践
|
存储 消息中间件 分布式计算
Hologres实时数仓在B站游戏的建设与实践
本文介绍了B站游戏业务中实时数据仓库的构建与优化过程。为满足日益增长的数据实时性需求,采用了Hologres作为核心组件优化传统Lambda架构,实现了存储层面的流批一体化及离线-实时数据的无缝衔接。文章详细描述了架构选型、分层设计(ODS、DWD、DIM、ADS)及关键技术挑战的解决方法,如高QPS点查、数据乱序重写等。目前,该实时数仓已广泛应用于运营分析、广告投放等多个场景,并计划进一步完善实时指标体系、扩展明细层应用及研发数据实时解析能力。
Hologres实时数仓在B站游戏的建设与实践
|
存储 分布式计算 MaxCompute
Hologres实时湖仓能力入门实践
本文由武润雪(栩染)撰写,介绍Hologres 3.0版本作为一体化实时湖仓平台的升级特性。其核心能力包括湖仓存储一体、多模式计算一体、分析服务一体及Data+AI一体,极大提升数据开发效率。文章详细解析了两种湖仓架构:MaxCompute + Hologres实现离线实时一体化,以及Hologres + DLF + OSS构建开放湖仓架构,并深入探讨元数据抽象、权限互通等重点功能,同时提供具体使用说明与Demo演示。
|
9月前
|
SQL 存储 运维
Apache Doris 在菜鸟的大规模湖仓业务场景落地实践
本文介绍了 Apache Doris 在菜鸟的大规模落地的实践经验,菜鸟为什么选择 Doris,以及 Doris 如何在菜鸟从 0 开始,一步步的验证、落地,到如今上万核的规模,服务于各个业务线,Doris 已然成为菜鸟 OLAP 数据分析的最优选型。
546 2
Apache Doris 在菜鸟的大规模湖仓业务场景落地实践
|
10月前
|
分布式计算 Serverless OLAP
实时数仓Hologres V3.1版本发布,Serverless型实例从零开始构建OLAP系统
Hologres推出Serverless型实例,支持按需计费、无需独享资源,适合新业务探索分析。高性能查询内表及MaxCompute/OSS外表,弹性扩展至512CU,性能媲美主流开源产品。新增Dynamic Table升级、直读架构优化及ChatBI解决方案,助力高效数据分析。
实时数仓Hologres V3.1版本发布,Serverless型实例从零开始构建OLAP系统
|
10月前
|
SQL DataWorks 关系型数据库
DataWorks+Hologres:打造企业级实时数仓与高效OLAP分析平台
本方案基于阿里云DataWorks与实时数仓Hologres,实现数据库RDS数据实时同步至Hologres,并通过Hologres高性能OLAP分析能力,完成一站式实时数据分析。DataWorks提供全链路数据集成与治理,Hologres支持实时写入与极速查询,二者深度融合构建离在线一体化数仓,助力企业加速数字化升级。