实时数仓入门训练营:Hologres 数据导入/导出实践-阿里云开发者社区

开发者社区> 阿里云实时计算Flink> 正文
登录阅读全文

实时数仓入门训练营:Hologres 数据导入/导出实践

简介: 《实时数仓入门训练营》由阿里云研究员王峰、阿里云高级产品专家刘一鸣等实时计算Flink版和 Hologres 的多名技术/产品一线专家齐上阵,合力搭建此次训练营的课程体系,精心打磨课程内容,直击当下同学们所遇到的痛点问题。由浅入深全方位解析实时数仓的架构、场景、以及实操应用,7 门精品课程帮助你 5 天时间从小白成长为大牛!

本文整理自直播《Hologres 数据导入/导出实践-王华峰(继儒)》
视频链接:https://developer.aliyun.com/learning/course/807/detail/13891

内容简要:
一、Hologres生态介绍
二、Hologres实时读写接口介绍
三、Hologres实时读写场景介绍
四、Demo演示
五、常见问题及未来展望

Hologres生态介绍

(一)Hologres生态

图片 1.png

Hologres是一款兼容PostgreSQL协议的实时交互式分析产品,也已经打通了大数据生态。以最常见的几个开源组件来说,如Apache Flink、Spark、Hive、Kafka等,Hologres都已经有了相关的Connector实现并进行了开源。

对于实时链路,用户依托Flink或者Spark,就可以将上游的比如埋点或业务数据等,以非常高的性能以及毫秒级的延迟导入Hologres。对于离线链路,Hologres也支持把外部系统的数据以非常简便的操作导入,反过来也支持再将数据备份回外部系统,比如阿里云的MaxComputer、OSS等。

当数据导入Hologres之后,因为Hologres本身兼容PostgreSQL协议,所以能使用各种现成的查询工具,无缝连接Hologres进行数据的展示、查询等。

(二)Dataworks数据集成支持输入

除了刚才提到的大数据场景之外,使用阿里云的Dataworks数据集成功能,我们还能将用户存储在传统数据库中的数据导入Hologres,实现方便高效的数据库整库实时镜像。

图片 2.png

如上图所示,当下Dataworks数据集成支持将MySQL的Binlog,SQLServer的CDC,Oracle的CDC实时镜像同步至Hologres。此外,Dataworks也支持将Kafka,还有阿里云Datahub的数据同步至Hologres。

图片 3.png

另外值得一提的是,Datahub这个产品自身也提供了直接将数据实时同步到Hologres的功能,这个功能叫Datahub Connector。使用这个功能用户就无需经过Flink或者其他组件,可以直接将数据导入到Hologres,对于无需ETL的数据同步是一个比较快捷的方式。

Hologres实时读写接口介绍

图片 4.png

Hologres实时读写实现原理

上图为整个Hologres实时读写实现原理架构图。

从上往下看,最上游是应用端,也就是会读写Hologres的各种客户端,比如说数据集成,Apache Flink、Spark等等。这些客户端通常会使用SQL接口,将读写数据的请求发送给Hologres,这些请求会经过一个负载均衡服务器,然后这些请求就会路由分发到一个叫做Frontend的节点。一个Hologres实例通常有多个Frontend节点,这样就可以支持非常高的QPS请求。Frontend节点主要是负责SQL的Parse、优化等功能。

经过一系列的处理之后,Frontend就会将用户的SQL请求转换成一个物理执行计划,然后这些物理执行计划就会被分发到后端的一个执行节点,执行真正的物理读写请求,最终写入的数据会持久化至分布式文件系统,比如阿里的Pangu系统或者开源的HDFS。

这里要特别强调的是,正常的SQL解析,然后通过Query的优化器优化生成最优执行计划,通常这部分的链路开销是比较大的,对于高QPS的读写场景,这往往会成为一个性能的瓶颈。

所以对于一些常见的SQL场景,这里我们列了几个 SQL,如下所示。

图片 -5.png

Fixed Plan

比如Insert into table values (),就是简单地插入一行或者几行。还有Insert into table values () on conflict do update,就是对数据进行几行的更新。Select * from table where pk = xxx和Delete from table where pk = xxx是根据主键去进行数据的查找或者删除。

对于这些常见的SQL,Hologres的Frontend做了一定的短路优化,略去了很多不必要的优化逻辑,直接生成最优的一个执行计划,并发送给后端的执行节点,这样就能提升整体的请求吞吐。

下面我们看一下,当物理执行计划发送到后端之后是如何处理的。

图片 5.png
图片 6.png

Hologres后端的整体存储引擎是基于Log Structured Merge Tree(LSM)来实现的,这里LSM能够把随机写变成顺序写,大大提升了数据写入的吞吐。

写请求首先会被写到Write Ahead Log,也就是传统的WAL文件中,一旦写入成功了,我们就认为这条记录永久写入成功了。之后,我们会把WAL日志Apply到Mem Table里面,Apply完成后,数据就对外可见了,可以进行查询,这中间的延迟通常在毫秒以内。

当Mem Table写满了之后,我们会有一个异步的线程,将Mem Table刷盘持久化,整体流程是一个比较标准化的LSM实现。

这里有别于其他LSM实现的存储系统,比如HBase,Hologres后端采用了全异步的实现,基于协程省去了操作系统内核线程开销,大大提升了系统CPU的利用率,使得Hologres的读写性能非常优异。

我们再回过头来看一下上面应用端的数据写入接口,现在Flink、Spark和Dataworks读写Hologres其实都使用了一个叫做Holo-Client的SDK。

图片 7.png

Holo-Client基于Jdbc实现,对读写holo最佳实践的封装,可以减轻数据集成开发工作量。
我们也对一些特定场景的SQL做了一定的优化,例如:

  • 数据写入
    1)攒批,基于jdbc reWriteBatchedInserts的实现原理;
    2)数据合并,相同主键的INSERT/DELETE在一个批次中会合并减少请求量;
    3)自动提交,支持基于批行数、批字节大小和最长提交间隔自动提交。
  • 数据点查
    1)提供异步点查接口;
    2)QPS高时自动转入攒批模式。
  • 数据Copy
    提供并发CopyIn的简易接口。
  • 异常处理
    对holo返回异常归类,正确在holo升级、扩容等场景下重试等待实例恢复。

我们非常推荐用户之后如果有读写holo的场景,就使用Holo-client这个SDK。

Hologres实时读写场景介绍

介绍完Hologres的读写接口的实现原理之后,接下来看一下基于Hologres读写接口能实现的几种常见的读写场景。

(一)实时写入场景

图片 8.png

图片 -9.png

第一种是最简单的实时写入场景,如上所示。

这里我们使用了一个Blink SQL的实例,其实就是生成一个随机数据源,然后将数据导入至Hologres。对于实时写入的场景,Hologres支持行存和列存这两种格式,还支持根据主键进行去重,这是相较于很多其他 OLAP系统的一个非常大的亮点。

另外,Hologres的实时写入还支持整行数据更新或者数据的局部更新。对于性能而言,Hologres导入即可见,拥有非常低的延迟,通常延迟在毫秒以内。经过我们自己的测试,以TPCH PartSupp表为例,我们后端单Core能达到2万左右的RPS,而且该性能可以随着实例的资源进行线性扩展。

(二)实时宽表Merge场景

然后接下来我们介绍一下实时宽表Merge的场景,这里其实是使用了holo的整行局部更新的功能。

图片 9.png

以上图为例,比如用户想将多个数据源的数据合并成一张宽表写入至Hologres。我们希望整张表最终有A|B|C|D|E|F六个列,然后有一部分数据,比如说A|B|C|D这四个列是在一个数据源里面,然后A|B|E|F是在另外一个数据源里,我们希望把这两个数据源的数据合并写入至Hologres的一张宽表。

常见的一种实现是我们会使用Flink的Join功能,就是使用两个流同时消费上述数据源,然后在Flink里面进行两个流的Join,进行数据的打宽,最后写入到Hologres里面。

但是这种场景的一个问题是Flink的Join开销通常非常大,因为它需要缓存非常多的状态,这对于整个作业的维护是一个非常大的开销。

下面我们来看一下Hologres是如何解决这个问题的。

图片 10.png

上文提到Hologres自身支持整合数据的局部更新功能,如上图所示,我们可以直接用两个流来直接写Hologres,而无需进行再做Flink内的Join。一个流比如A|B|C|D可以直接写Hologres,另外一个流A|B|E|F也可以直接写Hologres。因为这两个流的数据有相同的主键,所以当两行数据用相同的主键写入到Hologres的时候,Hologres内部会进行一个Merge,最终达到数据打宽的功能,省去了用户自己去写Flink Join,以及维护这么一个复杂作业的问题。

(三)实时维表Join场景

介绍完实时写入Hologres场景之后,下面我们来看一下实时读的场景。

实时读通常分为两种,第一种就是我们常见的Flink的实时维表Join场景,维表Join就是一个点查的实现。

图片 11.png

图片 12.png

这里Hologres的行存表通常可以替换HBase来实现Flink的维表功能,提供非常高的吞吐以及非常低的延迟。

(四)Hologres Binlog场景

实时读的第二种场景是Hologres Binlog场景。Binlog和MySQL的Binlog是一个类似的概念,使用Hologres Binlog我们就能实时消费Hologres单表的Change log,可以对每行数据的更新进行追踪记录。

现在实时计算 Flink 版的Hologres CDC Source,能实现表的实时镜像同步,甚至使用Flink + Hologres,能够实现ODS到DWD表的实时 ETL。

Hologres的Binlog功能默认是不开启的。

图片 13.png

上图是一个例子,列出了如何使用Hologres的Binlog,这里是一个建表的DDL。

可以看到我们有额外两个表的属性,一个叫做binlog.level,设置为replica,代表这张表会开启Hologres的Binlog功能,’binlog.ttl’就是代表Binlog数据的一个生命周期,下面我们使用Hologres Binlog看看能达到一个什么样的效果。

图片 14.png

由于Hologres是一个强Schema的数仓,所以我们甚至能够用SQL接口来查询Hologres的Binlog。如上所示,这里我们通过提供几个隐藏列:hg_binlog_lsn,hg_binlog_event_type,hg_binlog_timestamp_us,就能查询到Hologres的Binlog。

这里hg_binlog_lsn就是代表了每条Binlog生成的LSN序列号,然后hg_binlog_event_type是代表了Binlog的消息类型,它是代表Delete还是Insert的,或者是Before Update,或者是说After Update。这里的hg_binlog_timestamp_us代表这条Binlog生成的时间。

有了这几个隐藏列之后,用户就可以非常方便地使用SQL来进行Binlog的查询,进行数据的Debug。

Demo演示

(一)实时计算 Flink 版实时读写Hologres Demo

介绍完Hologres的读写场景之后,我们通过实际操作的Demo来看一下如何使用Flink来实时读写Hologres。

图片 15.png

如上图所示,首先,我们这里有两张Hologres的表,这两张表都会开启Binlog。我们假设这两者会有实时的写入,然后我们会写另外一个Flink任务去实施消费这两张表的Binlog,进行这两张表Binlog的Merge,甚至进行一定的group by计算操作,最终将这两张表的数据同步写入之后关Hologres的另一张结果表。

接下来进入演示,首先我们看一下Hologres建表的DDL,如下所示。

图片 -16.png


a表的建表DDL

图片 -17.png

b表的建表DDL

这两张表有两个相同的字段,分别叫id和create_time,之后会进行一个数据的聚合。每张表还会有一个不同的值,value_from_a是表a所特有的,value_from_b是表b所特有的。

图片 -18.png

结构表

最后我们会有一张结构表,这张结果表有a和b两张表共有的两个列,分别从a和b两张表得到了另外两个列a和b,我们希望将a和b的数据进行一个实时聚合,写入到Sink表里面。

我们看一下整个Flink的SQL。

图片 16.png

这里首先是分别声明了两张Hologres的source表,需要实时的消费Hologres两张表的Binlog。

需要注意的是,我们这里需要开启‘binlog’=‘true’这个参数来让Flink进行消费Hologres的Binlog以及开启CDC模式。

图片 16.png

结果表

然后我们来看一下结果表的声明,如上所示。

在这里需要注意的是,我们需要设置一个‘ignoreDelete’=‘false’,这样防止我们会忽略包括Delete或者beforeUpdate这种类型的数据,导致数据的不一致。

图片 18.png

我们看一下整个Flink计算逻辑的SQL,如上所示。

这里的逻辑其实比较简单,其实只是将两张表的结果union起来,然后进行一个group by id和create_time进行实时的sum,写入到Hologres的结果表。

这里的作业上线之后,我们可以直接启动运行该作业。

在启动的过程中,我们可以看一下当前Hologres这几张表的状态。

图片 19.png

可以看到当前Hologres这几张表都是一张空表,我们会对这几张表进行更新,然后看一下数据的同步的过程。

图片 20.png

首先往a表插入一条数据,可以看到a表的数据已经实时同步到结果表中。
接下来对b表数据进行一个更新。

图片 21.png

可以看到这两个流的数据已经实时更新到结果表,并进行了准确的数据聚合。

接下来我们再更新a表。

图片 22.png

可以看到对于源表a的实时更新,已经正确地反映到了结果表当中,Flink非常正确地计算出了两个流的结果。

我们甚至可以看一下这张sink表的binlog数据,因为我们这张结构表也同样开启了binlog的功能,如下所示。

图片 23.png

可以看到,我们拿到这张表所有的变更记录,和我们预期的效果保持了一致。

以上就是Flink实时读写Hologres的Demo。

(二)Dataworks实时同步Hologres Demo

接下来我们看一下使用Dataworks将PolarDB的数据实时同步到Hologres的Demo演示。

首先我们进入到数据集成,数据同步要进行一个数据源的添加,点击数据源添加。

图片 24.png

接着新增数据源,选择Hologres,填充完所有的信息之后,我们就可以进行一个数据的添加。

图片 25.png

新增数据源

接下来进行数据同步的演示。

图片 26.png

如上所示,首先这里已经有了一个 PolarDB的数据库,以及预先创建好了一张user_details表,可以看到这里已经有三条查询结果记录,之后我们希望把这张表的数据同步到Hologres当中。

然后我们返回到数据集成,点击一键实时同步至Hologres,如下所示。

图片 27.png

在基本配置中,数据源选择预先创建好的数据源PolarDB,之后选择需要同步的表user_details,然后点击下一步。

图片 28.png

图片 29.png

之后,我们会需要选择目标Hologres的数据源,添加后进行刷新,可以刷新出user_details这张表,然后可以配置这张表是否需要自动建表,还是用已经有的表,这里选择我们自动建表,然后点击下一步。

图片 30.png

在DDL消息处理规则中,我们可以配置各种各样的策略处理,根据需求配置好规则后选择下一步。

图片 31.png

接下来进行运行资源配置。对Dataworks数据进行实时同步,我们通常需要一个独享资源组,在这里我们已经完成了独享资源组的购买,然后选择各个同步功能所需要的资源组,完成配置并点击立即执行,等待作业的启动。

图片 32.png

可以看到PolarDB的数据已经实时同步到Hologres这张结构表当中。

图片 33.png

接下来可以对这张表再进行一定的更新,我们往这张user_details表里面重新插入一条1004数据,数据插入成功后可以看一下Hologres结构表。

图片 34.png

从后台可以看到,1004这条数据已经实时同步至Hologres,如下所示。

图片 35.png

通过上方的演示可以看到,使用Dataworks实时同步Hologres功能,我们可以非常便捷地将数据库中的数据同步到Hologres。

常见问题及未来展望

(一)实时计算 Flink 版 Hologres Connector常见问题

经过上述提到的关于Hologres应用场景以及几个Demo,接下来看一下在使用过程中通常会遇到什么问题。

Q: 作业启动失败,无法连接Hologres。
A: Hologres实例需要与Flink集群在同一Region,且使用VPC Endpoint。

Q: 实时写入结果表数据不符合预期。
A: 通常是由回撤引起,需要正确设置ignoreDelete参数。

Q: 实时写入性能慢。
A: 当前高QPS场景的列存表局部更新开销较大,建议换成整行更新或者行存写入。

Q: 维表查询性能较差,且Hologres实例CPU负载高。
A: 通常是由于使用了列存表作为维表,建议切换至行存表。

Q: 实时消费Binlog报错。
A: 通常是由于表没有开启Binlog导致,需要重建表。

(二)未来展望

接下来看一下整个Hologres在实时读写链路上的一个未来的规划和展望。

- Flink One-To-Many 维表Join

这是一个即将上线的功能,我们会在Flink实现一对多的维表Join功能,就不需要强制使用Hologres表的主键进行维表查询。
但需要注意的是,这种场景下面通常性能不会特别好,因为难以避免的查询会导致整表的扫描,使得延迟比较高,所以还是建议用户尽可能使用一对一的点查场景。

- 基于JDBC实时消费Hologres Binlog

当前Hologres Binlog实现是使用了内置接口,暂时没有对外透出。之后,我们会实现基于JDBC的接口实现让用户实时消费Hologres Binlog。

- Dataworks数据集成实时消费Hologres Binlog

当前数据集成并不支持消费Hologres数据,之后我们会支持使用Dataworks的数据集成,能够实时消费Hologres Binlog,这样就能将Hologres的数据实时镜像地同步到其他的数据库当中。

- 无连接限制的SQL读写

由于PostgreSQL的模型限制,当前Hologres整个实例的连接数有一定的限制,之后我们会提供一个无连接限制的SQL读写的功能。

上文提到Hologres的一些 connector和Holo-Client,都已经开源到Github上面,有需求的用户可以访问下方链接进行使用。

活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99元试用实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制T恤;另包3个月及以上还有85折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

image.png

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:

一套基于Apache Flink构建的一站式、高性能实时大数据处理平台,广泛适用于流式数据处理、离线数据处理、DataLake计算等场景。

官方博客
最新文章
相关文章
链接