使用MaxCompute连接访问Hologres开发实践

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 很多客户使用MaxCompute和Hologres的集成方案同时满足大规模离线分析、实时运营分析、交互式查询及在线Serving等多业务场景。MaxCompute和Hologres之间支持相互读写对方数据,能够消除不必要的数据冗余,形成有效的数据分层并支持离线/实时统一视图和联合分析。本文重点介绍了MaxCompute如何访问Hologres数据的开发实践。

使用MaxCompute连接访问Hologres开发实践

前言

MaxCompute(大数据计算服务)是阿里云Serverless、全托管、支持多种分析场景的云数据仓库服务,Hologres(交互式分析)是阿里云实时交互式分析产品。Hologres具备高并发地实时写入和查询数据的能力,同时支持数据不迁移高性能加速分析MaxCompute数据的能力、联邦分析Hologres实时数据与MaxCompute离线数据,实现离线实时一体化的数据仓库产品解决方案。很多客户使用MaxCompute和Hologres的集成方案同时满足大规模离线分析、实时运营分析、交互式查询及在线Serving等多业务场景。

在Hologres的产品官方文档也给出了"联邦分析实时数据和离线数据"的典型应用场景说明。如下图所示:

image.png

从MaxCompute和Hologres数据交互链路上看,包含2个部分:

  • Hologres能够"直接读"MaxCompute:Hologres借助IMPORT FOREIGN SCHEMA语法批量创建MaxCompute外表,在Hologres侧并高性能加速分析保存在MaxCompute中的数据,例如,Hologres中可进行当日实时消费与MaxCompute中的近30天的消费数据对比分析;具体操作使用参见Hologres产品文档相关内容
  • MaxCompute能够"直接读"Hologres:当日实时数据进入Hologres中,在Hologres侧进行实时运营分析。到T+1时,MaxCompute的ETL逻辑、数据模型加工需要使用当日新增数据,此时Hologres的数据作为MaxCompute数仓的ODS层,MaxCompute无需数据导入,"直接读"取Hologres对应的数据表,完成增量+全量数据的模型加工。

本文将主要介绍在MaxCompute、Holgres的组合方案下,MaxCompute直读Hologres数据源的开发实践。

MaxCompute访问Hologres数据源包含以下2种手段:

  • MaxCompute SQL以外部表方式访问Hologres数据源
  • MaxCompute Spark以JDBC方式直接访问Hologres数据源


MaxCompute SQL外部表方式读写Hologres

详细内容参见MaxCompute产品文档-Hologres外部表部分,以下仅作简要介绍。

首先,在MaxCompute创建Hologres外部表。

在MaxCompute创建Hologres外部表时,您需要在建表DDL语句中指定StorageHandler,并配置JDBC驱动机制参数实现访问MC-Hologres数据源。建表语句定义如下:

create external table <table_name>(
  <col1_name> <data_type>,
  <col2_name> <data_type>,
  ......
)
stored by '<com.aliyun.odps.jdbc.JdbcStorageHandler>'
location '<jdbc:postgresql://<accessid>:<accesskey>@<endpoint>:<port>/<database>?currentSchema=<schema>&preferQueryMode=simple&table=<holo_table_name>/>' 
tblproperties (
  'mcfed.mapreduce.jdbc.driver.class'='org.postgresql.Driver', 
  'odps.federation.jdbc.target.db.type'='holo',
  ['odps.federation.jdbc.colmapping'='<col1:column1,col2:column2,col3:column3,...>']
);


示例如下:

create external table if not exists my_table_holo_jdbc
(
 id bigint,
 name string
)
stored by 'com.aliyun.odps.jdbc.JdbcStorageHandler' 
LOCATION 'jdbc:postgresql://LTAI4FzxmCwzb4BJqFce****:hKZMEFjdLe8ngRT5qp75UYufvT****@hgprecn-cn-oew210utf003-cn-hangzhou-internal.MC-Hologres.aliyuncs.com:80/mc_test?currentSchema=public&preferQueryMode=simple&useSSL=false&table=holo/'
TBLPROPERTIES (
  'mcfed.mapreduce.jdbc.driver.class'='org.postgresql.Driver',
  'odps.federation.jdbc.target.db.type'='holo',
  'odps.federation.jdbc.colmapping'='id:id,name:name'
);

创建的Hologres外部表将映射到指定的Hologres实例下db的某张表。

其次,创建完成后,使用MaxCompute SQL查询该外部表以获得Hologres表的数据

命令示例如下:


--访问MC-Hologres外部表需要添加如下属性。
set odps.sql.split.hive.bridge=true;
set odps.sql.hive.compatible=true;
--查询MC-Hologres外部表数据。
select * from my_table_holo_jdbc limit 10;

MaxCompute SQL同时可以写入数据到Hologres外部表,实现将MaxCompute加工后的消费数据直接导入Hologres,借助Hologres高性能存储的分析引擎,获得最佳的分析体验。

--访问MC-Hologres外部表需要添加如下属性。
set odps.sql.split.hive.bridge=true;
set odps.sql.hive.compatible=true;
--向MC-Hologres外部表插入数据。
insert into my_table_holo_jdbc values (12,'alice');
--查询MC-Hologres外部表数据。
select * from my_table_holo_jdbc;


MaxCompute Spark使用JDBC连接访问Hologres

MaxCompute原生集成Apache Spark分析引擎,借助Spark不仅可以直接分析MaxCompute数据,MaxCompute Spark还可以使用JDBC方式连接Hologres数据源。习惯使用Spark的用户,可以在Spark代码中实现更灵活的业务逻辑。

笔者分别使用以下3种提交模式验证Spark如何访问Hologres。关于MaxCompute Spark支持的几种模式,具体请参考MaxCompute产品官方文档,本文不作展开。

本地(Local)提交模式

熟悉MaxCompute Spark的用户经常使用这个模式做本地测试,验证代码逻辑是否正确。本文主要是用于验证、跑通Spark的JDBC方式能够正常访问Hologres数据源。


样例代码(PySpark):

spark=SparkSession \
    .builder \
    .appName("MC spark") \
    .getOrCreate()
jdbcDF=spark.read.format("jdbc"). \
options(
url='jdbc:postgresql://hgpostcn-cn-xxx-cn-shanghai.hologres.aliyuncs.com:80/test_holo',
dbtable='test',
user='xxx',# e.g.Access_idpassword='xxx', # e.g.Secret_keydriver='org.postgresql.Driver'). \
load()
jdbcDF.printSchema()

这里使用Spark的JDBC连接方式,通过postgresql驱动连接Hologres,访问test_holo数据库下的test表,打印出该表的schema信息。由于是本地测试,选择使用公网方式连接Hologres,其中url为hologres实例的公网访问域名。

使用spark-submit方式提交作业:

#本地Spark访问Holospark-submit --master local  --driver-class-path /drivers/postgresql-42.2.16.jar  --jars  /path/postgresql-42.2.16.jar   /path/read_holo.py

Postgresql的JDBC驱动可访问pg驱动官网下载

提交后查看spark打印日志:

image.png

打印的test表的schema信息与Holo中创建的test表schema一致,访问成功。其他关于JDBC数据源表的数据处理可以参考Apache Spark文档,本文主要介绍如何打通访问,其他处理逻辑不再展开。

MaxCompute集群(yarn-cluster)提交模式

样例代码(PySpark):

spark=SparkSession \
    .builder \
    .appName("MC spark") \
    .getOrCreate()
jdbcDF=spark.read.format("jdbc"). \
options(
url='jdbc:postgresql://hgpostcn-cn-xxx-cn-shanghai-internal.hologres.aliyuncs.com:80/test_holo',
dbtable='test',
user='xxx',# e.g.Access_idpassword='xxx', # e.g.Secret_keydriver='org.postgresql.Driver'). \
load()
jdbcDF.printSchema()

由于是集群提交模式,代码需要上传到MaxCompute云上集群运行,在集群中MaxCompute通过Hologres的经典网络域名以内网方式访问,这里不能使用公网地址。


配置MaxCompute Spark客户端(为MaxCompute适配进行了定制改造,相关介绍和下载,详见产品文档spark-defaults.conf,增加spark.hadoop.odps.cupid.trusted.services.access.list参数项,添加需要访问的目标hologres实例的经典网络域名地址。增加该配置,主要是为了在MaxCompute安全运行沙箱环境中开启到对应Hologres实例的网络策略,否则MaxCompute集群默认无法访问外部服务。

# OdpsAccount Info Settingspark.hadoop.odps.project.name=your_maxc_projectspark.hadoop.odps.access.id=xxxspark.hadoop.odps.access.key=xxx# endpointspark.hadoop.odps.end.point=http://service.cn.maxcompute.aliyun.com/apispark.hadoop.odps.runtime.end.point=http://service.cn.maxcompute.aliyun-inc.com/api#Access holo instancespark.hadoop.odps.cupid.trusted.services.access.list=hgprecn-cn-xxx-cn-shanghai-internal.hologres.aliyuncs.com:80


使用spark-submit方式提交作业:

#本地Spark访问Holospark-submit --master yarn-cluster  --driver-class-path /drivers/postgresql-42.2.16.jar  --jars  /path/postgresql-42.2.16.jar   /path/read_holo.py

提交后查看spark打印日志,作业正常完成会打印作业的Logview以及Spark-UI的Job View链接地址,可供开发者进一步诊断作业。

image.png

查看MaxCompute作业信息Logview、Spark-UI的Job View,验证作业执行成功。

查看日志有无报错,同时打开logview链接,查看作业执行状态。

image.png

查看作业状态为Success,同时点击master-0下的worker的StdOut输出按钮。

image.png

这里是spark代码中jdbcDF.printSchema()的返回结果,与预期一致,验证完成。


MaxCompute Spark同时提供Spark web-ui进行作业诊断,打开日志中的job view链接地址即可访问。

image.png

点击driver的stdout按钮,可查看应用的打印输出是否符合业务预期:

image.png

DataWorks提交模式

更多的用户使用DataWorks作为MaxCompute作业调度系统,通过DataWorks使用者也能够很方便地提交Spark作业到MaxCompute中实现访问Hologres的逻辑。

第一步,登录阿里云控制台,进入dataworks指定的工作空间,进入项目对应的数据开发模块。

image.png

第二步,创建/调整业务流程

1.在业务流程中的MaxCompute节点下,上传postgresql jdbc jar文件,以便spark程序引用驱动。注意,这里要选择使用文件(File)资源类型。

image.png

2.在资源节点下,上传python代码。本文提交了pyspark代码用于测试。

image.png

3.在业务流程画布中选择odps spark节点并填写spark作业的参数信息

  • 业务流程画布中选择MaxCompute引擎支持的ODPS Spark节点

image.png

  • 双击画布中ODPS Spark节点图标,编辑该任务节点,填写任务信息

本文使用了pyspark,因此该节点选择python语言,同时在"选择主python资源"选项中,选择刚才上传的python文件资源。

其中在配置项中为该任务添加hologres目标地址的网络白名单,地址仍然使用Hologres实例的经典网络域名,如图所示:

配置项:spark.hadoop.odps.cupid.trusted.services.access.list

配置项取值:hgpostcn-cn-xxx-cn-shanghai-internal.hologres.aliyuncs.com:80

同时,在"选择file资源"选项中,选择刚才上传到资源中的postgres的驱动jar文件。

image.png

点击保存并提交。

4.在dataworks运行ODPS Spark节点任务进行验证

点击"运行节点",在dataworks页面下方将打印作业日志,其中包含MaxCompute作业的诊断信息Logview链接地址。

image.png

查看日志有无报错,同时打开logview链接,查看作业执行状态。

image.png

查看作业状态为Success,同时点击master-0下的worker的StdOut输出按钮。

image.png

这里是spark代码中jdbcDF.printSchema()的返回结果,与预期一致,验证完成。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
一站式大数据开发治理平台DataWorks初级课程
DataWorks 从 2009 年开始,十ー年里一直支持阿里巴巴集团内部数据中台的建设,2019 年双 11 稳定支撑每日千万级的任务调度。每天阿里巴巴内部有数万名数据和算法工程师正在使用DataWorks,承了阿里巴巴 99%的据业务构建。本课程主要介绍了阿里巴巴大数据技术发展历程与 DataWorks 几大模块的基本能力。 课程目标 &nbsp;通过讲师的详细讲解与实际演示,学员可以一边学习一边进行实际操作,可以深入了解DataWorks各大模块的使用方式和具体功能,让学员对DataWorks数据集成、开发、分析、运维、安全、治理等方面有深刻的了解,加深对阿里云大数据产品体系的理解与认识。 适合人群 &nbsp;企业数据仓库开发人员 &nbsp;大数据平台开发人员 &nbsp;数据分析师 &nbsp;大数据运维人员 &nbsp;对于大数据平台、数据中台产品感兴趣的开发者
目录
相关文章
|
3月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之如何开发ODPS Spark任务
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
1月前
|
SQL 分布式计算 数据挖掘
加速数据分析:阿里云Hologres在实时数仓中的应用实践
【10月更文挑战第9天】随着大数据技术的发展,企业对于数据处理和分析的需求日益增长。特别是在面对海量数据时,如何快速、准确地进行数据查询和分析成为了关键问题。阿里云Hologres作为一个高性能的实时交互式分析服务,为解决这些问题提供了强大的支持。本文将深入探讨Hologres的特点及其在实时数仓中的应用,并通过具体的代码示例来展示其实际应用。
163 0
|
3月前
|
SQL 分布式计算 数据库
畅捷通基于Flink的实时数仓落地实践
本文整理自畅捷通总架构师、阿里云MVP专家郑芸老师在 Flink Forward Asia 2023 中闭门会上的分享。
8314 15
畅捷通基于Flink的实时数仓落地实践
|
3月前
|
SQL 消息中间件 OLAP
OneSQL OLAP实践问题之实时数仓中数据的分层如何解决
OneSQL OLAP实践问题之实时数仓中数据的分层如何解决
55 1
|
2月前
|
SQL 分布式计算 大数据
代码编码原则和规范大数据开发
此文档详细规定了SQL代码的编写规范,包括代码的清晰度,执行效率,以及注释的必要性。它强调所有SQL关键字需统一使用大写或小写,并禁止使用select *操作。此外,还规定了代码头部的信息模板,字段排列方式,INSERT, SELECT子句的格式,运算符的使用,CASE语句编写规则,查询嵌套规范,表别名定义,以及SQL注释的添加方法。这些规则有助于提升代码的可读性和可维护性。
46 0
|
2月前
|
SQL 分布式计算 大数据
大数据开发SQL代码编码原则和规范
这段SQL编码原则强调代码的功能完整性、清晰度、执行效率及可读性,通过统一关键词大小写、缩进量以及禁止使用模糊操作如select *等手段提升代码质量。此外,SQL编码规范还详细规定了代码头部信息、字段与子句排列、运算符前后间隔、CASE语句编写、查询嵌套、表别名定义以及SQL注释的具体要求,确保代码的一致性和维护性。
84 0
|
4月前
|
SQL 分布式计算 MaxCompute
SQL开发问题之对于ODPS中的UNION操作,执行计划的问题如何解决
SQL开发问题之对于ODPS中的UNION操作,执行计划的问题如何解决
|
4月前
|
存储 分布式计算 MaxCompute
构建NLP 开发问题之如何支持其他存储介质(如 HDFS、ODPS Volumn)在 transformers 框架中
构建NLP 开发问题之如何支持其他存储介质(如 HDFS、ODPS Volumn)在 transformers 框架中
|
3月前
|
数据可视化
Echarts数据可视化开发| 智慧数据平台
Echarts数据可视化开发| 智慧数据平台
|
3月前
|
数据可视化
Echarts数据可视化大屏开发| 大数据分析平台
Echarts数据可视化大屏开发| 大数据分析平台

相关产品

  • 云原生大数据计算服务 MaxCompute