1. Hologres 概述与核心架构
阿里云实时数仓Hologres是一款兼容PostgreSQL协议的一站式实时数仓引擎。它采用可扩展的大规模并行处理(MPP)架构,支持PB级数据亚秒级交互式分析。Hologres的核心价值在于实现了湖仓存储一体、多模式计算一体、分析服务一体、Data+AI一体的统一数据平台。
在架构设计上,Hologres具备以下几个关键特性:
- 实时写入与即时查询:Hologres支持高并发实时写入与更新,数据写入后即可查询。写入延迟基本可以达到毫秒级别,写入支持整行和局部更新。
- 行列共存存储:Hologres支持一张表同时存储行存数据和列存数据,两份数据强一致。这使得中间层表既能作为Flink的源表,也能作为维表进行主键点查与维表Join。
- Binlog能力:Hologres提供Binlog能力,可用于驱动Flink进行实时计算,以此作为流式计算的上游。
- 资源强隔离:Hologres支持通过主从实例读写分离部署或计算组实例架构实现资源强隔离。计算组形态能够在共享同一数据的同时,实现读写计算资源的隔离。
- 兼容PostgreSQL生态:Hologres兼容PostgreSQL 11,提供JDBC/ODBC接口,支持对接第三方ETL和BI工具。
Hologres的典型应用场景包括实时大屏、用户画像分析、个性化推荐、实时风控、OLAP多维分析等。它与MaxCompute、Flink、DataWorks深度融合,提供离在线一体化全栈数仓解决方案。
2. 准备工作:开通实例与网络配置
在使用Hologres之前,需要先完成实例的开通与基础配置。
需要先登录阿里云控制台,点击:阿里云控制台
2.1 开通Hologres实例
登录阿里云控制台后,进入Hologres产品页面,点击开通实例。新用户通常可以享受一定时长的免费试用期。在实例创建页面,需要选择地域、实例规格、网络类型等参数。实例创建大约需要5-10分钟,待实例状态变为\"运行正常\"后即可使用。
2.2 获取连接信息
实例创建成功后,进入Hologres管理控制台,在左侧导航栏选择\"实例列表\",单击目标实例,在实例详情页的\"网络信息\"中获取网络地址(Endpoint)和端口(Port)。需要注意:
- 请根据应用运行所在的网络环境(VPC内网或公网)选择正确的网络地址,否则将无法正常连接。
- 端口默认为5432。
- 开通Hologres实例后,系统自动创建postgres数据库,但该数据库分配到的资源较少,建议新建数据库用于实际业务。
2.3 创建数据库与表
可以通过HoloWeb(Hologres自带的Web管理工具)或SQL客户端执行建库建表操作。以下是通过SQL创建数据库和表的示例:
-- 创建数据库 CREATE DATABASE my_warehouse; -- 切换到新数据库 \c my_warehouse; -- 创建订单明细表(列存模式,适合OLAP分析) CREATE TABLE orders ( order_id BIGINT PRIMARY KEY, user_id BIGINT, product_id BIGINT, order_amount DECIMAL(38, 2), order_status VARCHAR(20), create_time TIMESTAMP ) WITH ( orientation = 'column', distribution_key = 'order_id', partition_key = 'create_time' );
3. 通过 JDBC 连接 Hologres
Hologres兼容PostgreSQL有线协议,因此任何使用PostgreSQL JDBC驱动(42.3.2及以上版本)的工具或应用都可以连接Hologres。
3.1 添加Maven依赖
对于Java应用,在pom.xml中添加PostgreSQL JDBC驱动依赖:
<dependencies> <dependency> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> <version>42.3.2</version> </dependency> </dependencies>
3.2 构建连接字符串
JDBC连接字符串的格式如下:
jdbc:postgresql://<Endpoint>:<Port>/<DBNAME>?user=<ACCESS_ID>&password=<ACCESS_KEY>
各参数说明:
- Endpoint:Hologres实例的网络地址,从控制台获取。
- Port:端口号,默认为5432。
- DBNAME:数据库名称。
- ACCESS_ID:AccessKey ID,建议通过环境变量配置。
- ACCESS_KEY:AccessKey Secret,建议通过环境变量配置。
可选参数:
- ApplicationName:为连接打上应用名称标签,便于在慢查询列表中识别。
- reWriteBatchedInserts=true:将批量插入重写为单条多值INSERT语句,提高写入吞吐量。
- currentSchema:设置默认Schema。
3.3 Java代码示例
import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; public class HologresJDBCExample { public static void main(String[] args) { String endpoint = \"hgpostcn-cn-xxxxx-cn-hangzhou-vpc-st.hologres.aliyuncs.com\"; int port = 5432; String dbName = \"my_warehouse\"; String accessId = System.getenv(\"HOLOGRES_ACCESS_ID\"); String accessKey = System.getenv(\"HOLOGRES_ACCESS_KEY\"); String url = String.format(\"jdbc:postgresql://%s:%d/%s?user=%s&password=%s&reWriteBatchedInserts=true\", endpoint, port, dbName, accessId, accessKey); try (Connection conn = DriverManager.getConnection(url)) { // Hologres不支持事务内多条写入,需将autoCommit设为true conn.setAutoCommit(true); try (Statement stmt = conn.createStatement()) { // 查询示例 ResultSet rs = stmt.executeQuery(\"SELECT order_id, user_id, order_amount FROM orders LIMIT 10\"); while (rs.next()) { System.out.printf(\"order_id=%d, user_id=%d, amount=%.2f\\n\", rs.getLong(\"order_id\"), rs.getLong(\"user_id\"), rs.getDouble(\"order_amount\")); } // 批量插入示例 stmt.executeUpdate(\"INSERT INTO orders VALUES (1001, 201, 5001, 299.00, 'PAID', NOW())\"); } } catch (Exception e) { e.printStackTrace(); } } }
注意事项:Hologres不支持单事务内多条写入,因此需要将autoCommit设置为true。如果遇到\"ERROR: INSERT in transaction is not supported now\"错误,请显式调用conn.setAutoCommit(true)。
3.4 计算组实例的连接
对于计算组型实例,可以在连接字符串中显式指定计算组:
jdbc:postgresql://<Endpoint>:<Port>/<database_name>@<warehouse_name>
未显式指定计算组时,将使用用户默认的计算组连接实例。
4. 通过 Python Psycopg2 连接 Hologres
Hologres兼容PostgreSQL 11,因此可以通过Python的psycopg2库访问Hologres。
4.1 安装psycopg2
pip install psycopg2-binary
4.2 Python连接示例
import psycopg2 import os # 从环境变量获取认证信息 host = os.environ.get('HOLOGRES_ENDPOINT') port = int(os.environ.get('HOLOGRES_PORT', 5432)) database = os.environ.get('HOLOGRES_DATABASE') user = os.environ.get('HOLOGRES_ACCESS_ID') password = os.environ.get('HOLOGRES_ACCESS_KEY') # 建立连接 try: conn = psycopg2.connect( host=host, port=port, dbname=database, user=user, password=password, application_name='python_etl_job' ) cur = conn.cursor() # 建表 cur.execute(\"\"\" CREATE TABLE IF NOT EXISTS user_behavior ( user_id BIGINT, event_type VARCHAR(50), event_time TIMESTAMP, page_url VARCHAR(500) ) WITH ( orientation='column', distribution_key='user_id' ) \"\"\") conn.commit() # 批量插入数据 import time cur.execute(\"\"\" INSERT INTO user_behavior SELECT generate_series(1, 1000) AS user_id, 'click' AS event_type, NOW() AS event_time, '/product/' || generate_series(1, 1000) AS page_url \"\"\") conn.commit() # 查询数据 cur.execute(\"SELECT COUNT(*) FROM user_behavior\") count = cur.fetchone()[0] print(f\"Total records: {count}\") cur.close() conn.close() except Exception as e: print(f\"Error: {e}\")
psycopg2的连接方式也支持DSN字符串格式:
conn = psycopg2.connect(\"dbname=my_warehouse user=access_id password=access_key host=endpoint port=5432\")
5. 通过 Flink 实时写入 Hologres
Flink与Hologres的深度集成是构建实时数仓的核心路径。从开源Flink 1.11版本开始,Hologres Connector已开源并发布到Maven中央仓库。
5.1 添加Flink Connector依赖
在Flink项目的pom.xml中添加Hologres Connector依赖:
<dependency> <groupId>com.alibaba.hologres</groupId> <artifactId>hologres-connector-flink-1.15</artifactId> <version>1.4.0</version> <classifier>jar-with-dependencies</classifier> </dependency>
各Flink版本与Connector版本的对应关系:
| Flink版本 | Connector版本 |
| Flink 1.11 | hologres-connector-flink-1.11:1.0.1 |
| Flink 1.13 | hologres-connector-flink-1.13:1.3.2 |
| Flink 1.15 | hologres-connector-flink-1.15:1.4.1 |
| Flink 1.17 | hologres-connector-flink-1.17:1.4.1 |
5.2 Flink SQL 写入示例
通过Flink SQL将数据写入Hologres:
String createHologresTable = String.format( \"create table sink(\" + \" user_id bigint,\" + \" user_name string,\" + \" price decimal(38,2),\" + \" sale_timestamp timestamp\" + \") with (\" + \" 'connector'='hologres',\" + \" 'dbname' = '%s',\" + \" 'tablename' = '%s',\" + \" 'username' = '%s',\" + \" 'password' = '%s',\" + \" 'endpoint' = '%s'\" + \")\", database, tableName, userName, password, endPoint); tEnv.executeSql(createHologresTable); createScanTable(tEnv); tEnv.executeSql(\"insert into sink select * from source\");
5.3 实时数仓分层架构
基于Flink+Hologres搭建实时数仓的典型分层架构如下:
- ODS层(操作数据层):业务数据库(如MySQL)通过Flink CDC实时同步到Hologres。
- DWD层(明细数据层):将ODS层的多张表进行实时打宽,生成主题宽表。
- DWS层(汇总数据层):实时消费DWD层宽表的Binlog,事件驱动地聚合出用户维度和商户维度的指标表。
- ADS层(应用数据层):通过Hologres对外提供应用查询,支持百万级RPS的点查和秒级响应的OLAP分析。
该方案的核心优势包括:
- 高效更新与即时查询:Hologres支持每一层数据的高效更新和写入即可查。
- 数据分层复用:所有层级数据都可单独对外提供服务。
- 架构简化:基于Flink SQL构建实时ETL链路,所有层级数据统一存储在Hologres。
5.4 Flink Connector 高级特性
Hologres Flink Connector支持以下高级功能:
- 维表Join:支持百万级别的点查,可作为Flink维表使用。
- 局部更新:支持结果表的实时写入和局部更新,便于宽表合并。
- CDC DDL同步:上游增加列时,Hologres会自动添加相应列。
- 全增量一体化消费:源表支持全量数据读取和基于Binlog的增量消费。
- Flink Catalog:在Flink中创建Catalog,可将Hologres数据库中的所有表映射到Flink。
6. 通过 Spark 读写 Hologres
Hologres提供Spark Connector,支持在Spark集群中批量读写Hologres数据。
6.1 添加Spark Connector依赖
从Maven中央仓库下载hologres-connector-spark-3.x JAR包:
<dependency> <groupId>com.alibaba.hologres</groupId> <artifactId>hologres-connector-spark-3.x</artifactId> <version>1.5.2</version> </dependency>
6.2 Spark读写示例
Spark可以通过JDBC读取Hologres表中的数据:
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName(\"HologresSparkExample\") .getOrCreate() // 读取Hologres表 val jdbcUrl = \"jdbc:postgresql://endpoint:5432/my_warehouse?user=access_id&password=access_key\" val df = spark.read .format(\"jdbc\") .option(\"url\", jdbcUrl) .option(\"dbtable\", \"orders\") .option(\"driver\", \"org.postgresql.Driver\") .load() df.show(10) // 批量写入Hologres df.write .format(\"jdbc\") .option(\"url\", jdbcUrl) .option(\"dbtable\", \"orders_copy\") .option(\"driver\", \"org.postgresql.Driver\") .mode(\"append\") .save()
Hologres Connector相比原生JDBC写入有更好的性能优化,建议在Spark作业中使用Connector进行数据写入。
7. 通过 DataWorks 同步数据至 Hologres
DataWorks数据集成提供Hologres数据读取与写入的能力,支持离线同步、实时同步、全增量同步等多种数据同步场景。
7.1 MySQL整库实时同步至Hologres
DataWorks支持将MySQL整库数据全量+增量实时同步至Hologres。操作步骤如下:
- 登录DataWorks控制台,进入数据集成页面。
- 在左侧导航栏单击\"同步任务\",点击\"新建同步任务\"。
- 配置数据来源为MySQL,数据去向为Hologres,同步类型选择\"整库实时\"。
- 选择同步步骤:全量同步和增量同步。
- 配置网络与资源组,测试数据源连通性。
- 选择要同步的库表,支持通过正则表达式筛选。
- 配置目标表映射关系,确认后启动同步任务。
对于实时整库同步,系统会先将历史数据全量同步到Hologres,随后自动启动实时同步流程,将源端的变更实时同步到Hologres。
7.2 DataWorks上的Hologres任务开发
DataWorks提供数据开发(DataStudio)功能,支持Hologres SQL任务的开发与调度。HoloStudio能力已整合至DataStudio中。在DataWorks上进行Hologres任务开发时,需要注意:
- 需要购买相应DataWorks版本及所需资源组。
- 完成Hologres数据源的创建与绑定。
- DataWorks提供产品级与模块级的权限控制。
8. 对接 BI 工具进行可视化分析
Hologres兼容PostgreSQL生态,提供JDBC/ODBC接口,支持对接第三方ETL和BI工具。支持的BI工具包括:
- Quick BI:阿里云自研BI工具,与Hologres深度集成。
- Tableau:通过PostgreSQL JDBC驱动连接。
- Power BI:通过ODBC驱动连接。
- Superset:开源BI工具,支持PostgreSQL数据源。
在Quick BI中连接Hologres时,只需选择PostgreSQL数据源类型,填入Hologres实例的连接信息即可。Hologres查询的数据可以直接对接BI工具,进行多维分析和业务探索。
9. 表设计最佳实践
9.1 存储模式选择
Hologres支持列存和行存两种存储模式:
- 列存:适合高频分析的OLAP场景,减少全表扫描的I/O开销。
- 行存:适合高QPS点查场景。
建表时通过WITH子句的orientation参数指定:
CREATE TABLE orders ( order_id BIGINT, user_id BIGINT, amount DECIMAL(38,2) ) WITH (orientation = 'column');
9.2 分布键设计
分布键决定了数据在Hologres各Shard节点间的分布方式。设计原则:
- 选择分布比较均衡的字段(如订单ID、用户ID、事件ID)。
- 多个需要JOIN的表使用相同的分布键,使相关数据分片到同一个Shard,通过Local JOIN实现更高效率。
- 高频查询的过滤字段也可作为分布键,便于直接定位到包含所需数据的分片。
CREATE TABLE orders ( order_id BIGINT PRIMARY KEY, user_id BIGINT ) WITH (distribution_key = 'order_id');
9.3 分区策略
对高频查询字段(如时间、用户ID)进行分区,结合PARTITION BY实现分区剪枝,减少扫描数据量。分区键的选择应使每次查询结果尽可能存储在相同或最少的分区内。
CREATE TABLE orders ( order_id BIGINT, create_time TIMESTAMP ) WITH ( partition_key = 'create_time', partition_interval = 'day' );
分布键和分区键在建表时设定后无法修改,因此需要提前规划好。
10. 性能优化进阶
10.1 物化视图
Hologres的实时物化视图会对明细表数据进行预先聚合存储,查询时直接读取物化视图,减少计算量。与传统物化视图需要手动刷新不同,Hologres的物化视图支持自动更新,基表数据变更后会立即反映到视图中。
-- 创建实时物化视图 CREATE MATERIALIZED VIEW daily_order_summary AS SELECT DATE(create_time) AS order_date, COUNT(*) AS order_count, SUM(order_amount) AS total_amount FROM orders GROUP BY DATE(create_time);
10.2 Dynamic Table
Hologres推出了声明式数据处理架构Dynamic Table,业务可以根据需求设置不同的数据刷新策略,实现数据从基表到Dynamic Table的自动流转。Dynamic Table能够做到数据自动计算、结果更新,实现更高效、更低成本的数据自动流动与数仓分层。
10.3 计算资源管理
对于多团队共用Hologres的场景,推荐使用计算组型实例实现负载隔离:
- 主计算组(init_warehouse):数据中台团队使用,负责数据写入与加工。
- 只读计算组1:分析业务方使用。
- 只读计算组2:推荐业务方使用。
对于业务高峰期的资源诉求,可以使用分时弹性功能自动扩缩容。对于极端大任务(如大量历史数据回刷、复杂关联查询等),可以开启自适应Serverless Computing。
11. 安全管理与监控
11.1 权限管理
Hologres支持通过RAM进行账号授权管理。建议使用RAM子账号进行日常操作,避免主账号AccessKey泄露风险。在数据库层面,Hologres兼容PostgreSQL的权限体系,支持GRANT/REVOKE进行精细化权限控制。
11.2 监控告警
Hologres控制台提供丰富的监控指标,包括:
- CPU使用率、内存使用率
- 连接数
- QPS与查询延迟
- 存储空间使用量
可以配置云监控告警规则,在指标异常时及时通知运维人员。
11.3 常见问题排查
使用Hologres过程中可能遇到的常见问题及排查思路:
- 连接失败:检查网络类型是否匹配(VPC内网/公网),确认Endpoint和端口正确。
- OOM(内存溢出):检查是否有大查询或全表扫描,可开启自适应Serverless Computing或优化SQL。
- 写入性能下降:检查是否产生了大量小文件,可通过Flink的Repartition操作优化。
- 数据同步延迟:检查DataWorks资源组配置和网络带宽。
12. 总结
本文系统介绍了阿里云实时数仓Hologres的多种对接方式与使用最佳实践。Hologres作为一款兼容PostgreSQL协议的一站式实时数仓引擎,通过JDBC、Python Psycopg2、Flink、Spark、DataWorks等多种方式均可轻松接入。在实时数仓建设中,Flink+Hologres的组合能够实现ODS→DWD→DWS→ADS的完整分层架构,具备毫秒级写入、秒级查询、数据分层复用等核心优势。在表设计层面,合理选择存储模式、分布键和分区键是性能优化的基础;在进阶优化层面,物化视图、Dynamic Table和计算组隔离能够进一步提升系统的稳定性与效率。希望本文能够帮助读者快速上手Hologres,构建高效、稳定的实时数据架构。
常见问题解答
Q1:Hologres与PostgreSQL是什么关系?
A:Hologres兼容PostgreSQL 11的协议和语法,因此可以使用PostgreSQL的JDBC/ODBC驱动以及psycopg2等客户端工具进行连接。但Hologres是专门的实时数仓引擎,在架构和性能优化上与原生PostgreSQL有本质区别。
Q2:Flink写入Hologres时需要注意什么?
A:需要注意三点:一是根据Flink版本选择对应的Connector版本;二是Hologres作为结果表时不支持事务内多条写入,需将autoCommit设为true;三是对于有主键的表,Flink Connector会自动处理UPSERT语义,实现数据的更新与去重。
Q3:如何选择分布键和分区键?
A:分布键应选择分布均衡的字段(如订单ID、用户ID),多表JOIN时建议使用相同分布键以实现Local JOIN。分区键应选择高频查询的过滤字段(如日期),通过分区剪枝减少扫描数据量。两者在建表时设定后不可修改,需提前规划。
Q4:Hologres支持哪些数据导入方式?
A:Hologres支持多种数据导入方式:Flink实时写入、DataWorks数据同步(离线/实时/全增量)、Spark批量读写、JDBC批量INSERT、以及通过HoloWeb控制台上传CSV文件等。
Q5:Hologres如何对接BI工具?
A:Hologres兼容PostgreSQL生态,提供JDBC/ODBC接口,支持对接Quick BI、Tableau、Power BI、Superset等主流BI工具。只需在BI工具中选择PostgreSQL数据源类型,填入Hologres的连接信息即可。
Q6:Hologres如何进行成本优化?
A:成本优化可从以下几方面入手:合理选择实例规格避免资源浪费;使用计算组隔离实现负载分离,按需扩缩容;利用分时弹性应对业务高峰;对于冷数据可转为低频存储或归档存储;通过物化视图和Dynamic Table减少重复计算消耗。