操作手册
【实践】基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
场景简介
在阿里云大数据服务中,利用MaxCompute服务进行海量历史推送数据的存储以及查询,同时利用实时计算Flink进行实时广告曝光数据的收集,并同历史数据一起汇聚在阿里云实时交互式查询工具Hologres中,最后利用DataV进行动态展示。
在本实验中,模拟一个简单的广告点击场景。通过搭建Hologres实时交互平台,将存储在MaxCompute中的批量推送数据和通过Flink实时收集的点击数据进行汇聚。从Hologres的查询结果,可以动态的调整推送的目标人群。本场景中,会聚合点击的手机操作系统信息,调整推送对于iOS和Android的倾向性。
背景知识
Hologres是阿里巴巴自主研发的一款交互式分析产品,兼容PostgreSQL 11协议,与大数据生态无缝连接,支持高并发和低延时地分析处理PB级数据。
随着收集数据的方式不断丰富,企业信息化程度越来越高,企业掌握的数据量呈TB、PB或EB级别增长。同时,数据中台的快速推进,使数据应用主要为数据支撑、用户画像、实时圈人及广告精准投放等核心业务服务。高可靠和低延时地数据服务成为企业数字化转型的关键。
Hologres致力于低成本和高性能地大规模计算型存储和强大的查询能力,为您提供海量数据的实时数据仓库解决方案和实时交互式查询服务。
阿里云实时计算Flink版(Alibaba Cloud Realtime Compute for Apache Flink,Powered by Ververica)是阿里云基于Apache Flink构建的企业级、高性能实时大数据处理系统,由Apache Flink创始团队官方出品,拥有全球统一商业化品牌,完全兼容开源Flink API,提供丰富的企业级增值功能。
MaxCompute是面向分析的企业级SaaS模式云数据仓库,以Serverless架构提供快速、全托管的在线数据仓库服务,消除了传统数据平台在资源扩展性和弹性方面的限制,最小化用户运维投入,使您可以经济并高效的分析处理海量数据。数以万计的企业正基于MaxCompute进行数据计算与分析,将数据高效转换为业务洞察。
DataV数据可视化是使用可视化应用的方式来分析并展示庞杂数据的产品。DataV旨让更多的人看到数据可视化的魅力,帮助非专业的工程师通过图形化的界面轻松搭建专业水准的可视化应用,满足您会议展览、业务监控、风险预警、地理信息分析等多种业务的展示需求。
本实验整体架构说明
场景描述
在阿里云大数据服务中,利用MaxCompute服务进行海量历史推送数据的存储以及查询,同时利用实时计算Flink进行实时广告曝光数据的收集,并同历史数据一起汇聚在阿里云实时交互式查询工具Hologres中,最后利用DataV进行动态展示。
本最佳实践通过一个实际的demo来演示如何建立MaxCompute集群,使用Flink收集实时数据,并利用Hologres进行汇聚查询。
系统架构
相关视频
场景相关视频,请观看基于Hologres轻松玩转一站式实时仓库。
前提条件
云起实验室将在您的账号下开通本次实操资源,资源按量付费,需要您自行承担本次实操的云资源费用。
本实验预计两个小时产生费用25元。如果您调整了资源规格、使用时长,或执行了本方案以外的操作,可能导致费用发生变化,请以控制台显示的实际价格和最终账单为准。
进入实操前,请确保阿里云账号满足以下条件:
创建实验资源
在实验页面,勾选我已阅读并同意《阿里云云起实践平台服务协议》和我已授权阿里云云起实践平台创建、读取及释放实操相关资源后,单击开始实操。
创建资源需要10分钟左右的时间,请您耐心等待。
在云产品资源列表,您可以查看本场景涉及的云产品资源信息。
创建Hologres数据库
本步骤指导您如何登录Hologres控制台,并创建数据库。
在云产品资源列表的实时数仓区域,单击管理。
在实例详情页面,单击登录实例,进入HoloWeb。
在元数据管理页签,单击新建库。
在新增数据库对话框中,数据库名称设置为
db4demo
,然后单击确认。
创建Hologres数据表
在顶部菜单栏中,单击SQL编辑器。
在左侧Query查询区域,单击图标新建SQL查询。
将如下命令复制并粘贴至临时Query查询页签中,单击运行。
BEGIN ; DROP TABLE IF EXISTS log_pv; CREATE TABLE IF NOT EXISTS log_pv ( date_time DATE NOT NULL, day_time TEXT , rmid TEXT NOT NULL, rmid_hash TEXT, ver TEXT, publisher_id TEXT, ip TEXT, bc TEXT, camp TEXT, message TEXT ); SET hg_experimental_enable_shard_count_cap=off; CALL set_table_property('log_pv', 'distribution_key', 'rmid'); CALL set_table_property('log_pv', 'segment_key', 'date_time'); CALL set_table_property('log_pv', 'clustering_key', 'rmid'); CALL set_table_property('log_pv', 'shard_count', '20'); COMMIT ;
执行成功后会返回执行成功结果,如果未显示下图所示结果请排查并重新执行。
创建阿里云AccessKey
在本实验完成后,若不再使用,建议参考实验手册步骤及时删除阿里云AccessKey。
前往AccessKey管理。
在不建议使用云账号AccessKey对话框,阅读创建主账号AccessKey的风险,如果必须要创建主账号AccessKey,则勾选我确认知晓云账号AccessKey安全风险,然后单击继续使用云账号AccessKey。
在AccessKey页面,单击创建AccessKey。
根据界面提示完成安全验证。
在创建云账号AccessKey对话框,再次阅读创建主账号AccessKey的风险及主账号AccessKey使用限制,如果确定要创建主账号AccessKey,则勾选我确认知晓云账号AccessKey安全风险,然后单击继续使用云账号AccessKey。
在创建AccessKey对话框,保存AccessKey ID和AccessKey Secret,然后勾选我已保存好AccessKey Secret,最后单击确定。
创建Flink数据处理作业
在云产品资源列表的实时计算Flink版区域,单击管理。
在实时计算控制台页面,找到您的工作空间,单击右侧操作列下的控制台。
说明您可在云产品资源列表中查看到实时计算Flink版的实例ID。
在左侧导航栏中,选择新建 > SQL。
在新建作业草稿对话框中,选择SQL基础模板 > 空白的流作业草稿,单击下一步。
在空白的流作业草稿创建页中,自定义文件名称,引擎版本选择vvr-8.0.9-flink-1.17,单击创建。
说明注意所选择的引擎版本,在创建Session集群时会使用到。
将如下SQL复制到作业开发文本编辑区,并参考下方说明修改SQL中的三个参数值。
配置项
说明
配置项
说明
endpoint
Hologres实例的VPC地址,该地址可以在Hologres的实例详情 > 网络信息中查看到。
username
填写您创建的AccessKey ID。
password
填写您创建的AccessKey Secret。
CREATE TEMPORARY TABLE randomSource ( date_time DATE, day_time VARCHAR, rmid VARCHAR, rmid_hash VARCHAR, ver VARCHAR, publisher_id VARCHAR, ip VARCHAR, bc VARCHAR, camp VARCHAR, message VARCHAR ) WITH ('connector' = 'datagen'); CREATE TEMPORARY TABLE pv_test ( date_time DATE, day_time VARCHAR, rmid VARCHAR, rmid_hash VARCHAR, ver VARCHAR, publisher_id VARCHAR, ip VARCHAR, bc VARCHAR, camp VARCHAR, message VARCHAR ) WITH ( 'connector' = 'hologres', 'endpoint' = '', 'username' = '', 'password' = '', 'dbname' = 'db4demo', 'tablename' = 'log_pv' ); BEGIN STATEMENT SET; INSERT INTO pv_test SELECT CAST ('2019-04-22' as DATE), '12:00:00', '000991', '87934707160EC0397EBFE739BB0085F7', '0', '6', '175.18.164.59', 'XJIAfnFzxs', 'e1006', 'a' FROM randomSource; INSERT INTO pv_test SELECT CAST ('2019-04-22' as DATE), '12:00:00', '001229', '87934707160EC0397EBFE739BB0085F7', '0', '6', '175.18.164.59', 'XJIAfmhLiq', 'e101', 'c' FROM randomSource; END;
创建并配置Session集群。
在左侧导航栏中,选择运维中心 > Session管理。
在Session集群页面,单击创建Session集群。
在创建Session集群页面,参考如下说明进行配置,然后单击创建Session集群,配置说明如下。
名称:自定义填写集群名称,例如
qpflink
。状态:选择RUNNING。
引擎版本:选择vvr-8.0.9-flink-1.17。
在集群总览页签,当Session集群状态(页面上方集群名称旁边)从启动中变为运行中后,您可以进入后续步骤。
在左侧导航栏中,选择概览 > SQL开发。
在作业开发页面,单击右上角调试。
在调试对话框中,选择您刚创建的Session集群,单击下一步。
在调试对话框中,单击确定。
执行完毕后,在作业开发页面,单击部署。
在部署新版本对话框中,单击确定。
立即启动SQL作业。
在作业开发页面,单击前往运维。
在部署详情维页签,单击启动。
在作业启动面板,单击启动。
返回至HoloWeb页面,单击元数据管理,在public下拉列表中双击log_pv,最后单击数据预览,即可看到作业运行的结果。
说明若未看到数据,请您等待1分钟后刷新页面。
创建MaxCompute
开通云原生大数据计算服务MaxCompute和DataWorks。
说明本步骤操作会同时开通MaxCompute和DataWorks,若已开通相关产品可跳过此步骤。
在开通页面单击立即开通MaxCompute。
本实验实例地域为华东1(杭州),规格选择标准版,DataWorks版本选择基础版,单击创建服务关联角色后勾选服务协议,单击确认订单并支付。
在弹出的资源校验对话框中,可以看到两个产品分别为MaxCompute和DataWorks后订购资源。
购买Quota。
说明若您的阿里云账户在杭州地域已开通按量付费的Quota,请您跳过此步骤。
在左侧导航栏选择配额(Quota)管理,在Quota管理页面单击新购Quota。
在购买页,按如下参数配置后,单击立即购买并开通。
商品类型:按量付费。
地域:本实验以华东1(杭州)为例。
规格类型:标准版。
服务关联角色:创建所需角色。
在Quota管理页面,请您耐心等到1~10分钟,并刷新页面,当页面出现您创建的按量付费Quota时,再进行下一步骤。
创建项目。
在左侧导航栏中,单击项目管理。
在项目管理页面,单击新建项目。
在新建项目对话框中,自定义项目名称,计算资源付费类型选择按量付费,默认Quota选择您创建的Quota,单击确定。
返回如下页面,表示项目创建成功。
安全设置
资源创建完成后,为了保护您阿里云主账号上资源的安全,请您重置云服务器ECS的登录密码和设置安全组端口。
重置云服务器ECS的登录密码。
在云产品资源列表的ECS云服务器区域,单击管理。
在实例详情页签的基本信息区域,单击重置密码。
在重置实例密码对话框中,设置新密码和确认密码,重置密码的方式选择在线重置密码,配置SSH密码登录策略选择开启,单击确认修改。
返回如下结果,表示ECS实例root用户的登录密码重置成功。
安装MaxCompute客户端(odpscmd)
本步骤指导您如何在云服务器ECS上安装MaxCompute客户端odpscmd。
连接ECS并安装Java环境。
在云产品资源列表的ECS云服务器区域,单击远程连接。
在登录实例对话框中,输入用户自定义密码,单击确定。
执行如下命令,安装openjdk及unzip工具。
yum install -y java-1.8.0-openjdk.x86_64 unzip
安装MaxCompute客户端odpscmd。
在Web Terminal操作界面,执行如下命令,下载odpscmd安装包。
wget https://developer-labfileapp.oss-cn-hangzhou.aliyuncs.com/MaxCompute/odpscmd_public.zip
执行如下命令,解压下载的安装文件,得到bin、conf、lib、plugins四个文件夹。
unzip odpscmd_public.zip
执行如下命令,用vim编辑当前conf文件夹中的odps_config.ini文件,对客户端进行配置。
vim conf/odps_config.ini
按
i
键进入编辑模式,参考如下信息修改配置文件。配置说明:
project_name:填写MaxCompute的项目名。
access_id与access_key:填写您创建的AccessKey ID和AccessKey Secret。
end_point:填写
http://service.cn-hangzhou.maxcompute.aliyun.com/api
。
修改完成后编辑按
Esc
,输入:wq
并回车,保存并退出vim配置。
执行如下命令,测试并运行odpscmd。
./bin/odpscmd
返回如下页面,表示配置并运行成功。然后输入
quit;
命令并回车,退出odpscmd。
向MaxCompute中导入数据
准备原始数据。
执行如下命令下载并解压原始数据
wget https://developer-labfileapp.oss-cn-hangzhou.aliyuncs.com/MaxCompute/176.zip unzip 176.zip unzip 176/push_354.zip
执行odpscmd。
执行如下命令,启动odpscmd.
./bin/odpscmd
在odpscmd中执行如下命令,打开MaxCompute 2.0数据类型。
set odps.sql.type.system.odps2=true;
执行如下code语句,创建push_data表。
CREATE TABLE IF NOT EXISTS push_data(raw VARCHAR(2000));
执行如下code语句,进行原始数据导入。
tunnel upload /root/push_354 push_data -fd='\u0001';
执行如下code语句,创建log_push_raw表。
CREATE TABLE IF NOT EXISTS log_push_raw ( DATE_time DATE, hour VARCHAR(10) NOT NULL, rmid VARCHAR(80), rmid_hash VARCHAR(32), ver VARCHAR(10), publisher_id VARCHAR(10), ip VARCHAR(20), bc_c VARCHAR(20), c_bc VARCHAR(20), message VARCHAR(1500) ) PARTITIONED BY (pure_DATE int); ALTER TABLE log_push_raw ADD PARTITION (pure_DATE='20190422');
执行如下code语句,对原始数据进行清洗,导入临时表log_push_raw中。
INSERT OVERWRITE TABLE log_push_raw PARTITION (pure_DATE=20190422) SELECT cast( a[1] as DATE ), nvl(a[2],'N/A'), nvl(a[3],'N/A'), nvl(a[4],'N/A'), nvl(a[5],'N/A'), nvl(a[6],'N/A'), nvl(a[7],'N/A'), nvl(a[8],'N/A'), nvl(a[9],'N/A'), nvl(a[10],'N/A') FROM ( SELECT split(concat(SUBSTRING_INDEX(SUBSTRING_INDEX(REGEXP_REPLACE(raw,'\\s+','/'),"{",1),'/',10),"/",REGEXP_SUBSTR(raw, "\{.*\}")),"/") FROM push_data ) as dt(a);
执行如下code语句,创建用于存储最终清洗后的数据的数据表log_push。
CREATE TABLE IF NOT EXISTS log_push ( DATE_time DATE, hour VARCHAR(10), rmid VARCHAR(10), rmid_hash VARCHAR(10), ver VARCHAR(10), publisher_id VARCHAR(10), ip VARCHAR(10), bc_c VARCHAR(10), c_bc VARCHAR(10), error VARCHAR(10), rmip VARCHAR(10), r_x VARCHAR(10), reqId VARCHAR(10), site_channelid VARCHAR(10), ad_type VARCHAR(10), app_id VARCHAR(10), app_name1 VARCHAR(10), app_name VARCHAR(10), app_ver VARCHAR(10), tag_id VARCHAR(10), cid VARCHAR(10), mdid VARCHAR(10), meid VARCHAR(10), os VARCHAR(10), device_make VARCHAR(10), blTime VARCHAR(10), user_yearsold VARCHAR(10), user_gender VARCHAR(10), device_connectiontype VARCHAR(10), device_type VARCHAR(10), mtype VARCHAR(10), iswifi VARCHAR(10), muid VARCHAR(10), plat VARCHAR(10), rtpd VARCHAR(10), s_cachedata VARCHAR(10), chaoliang1 VARCHAR(10), cmrd VARCHAR(10), device_model VARCHAR(10) ) PARTITIONED BY ( pure_DATE INT ); ALTER TABLE log_push ADD PARTITION (pure_DATE=20190422);
执行如下code语句,插入用户数据。
INSERT INTO TABLE log_push PARTITION (pure_DATE=20190422) SELECT DATE_time,hour,rmid,rmid_hash,ver,publisher_id,ip,bc_c,c_bc, get_json_object(message,'$.err') as error, get_json_object(message,'$.rmip') as rmip, get_json_object(message,'$.r_x') as r_x, get_json_object(message,'$.reqId') as reqId, get_json_object(message,'$.site_channelid') as site_channelid, get_json_object(message,'$.ad_type') as ad_type, get_json_object(message,'$.app_id') as app_id, get_json_object(message,'$.app_name1') as app_name1, get_json_object(message,'$.app_name') as app_name, get_json_object(message,'$.app_ver') as app_ver, get_json_object(message,'$.tag') as tag_id, get_json_object(message,'$.cid') as cid, get_json_object(message,'$.mdid') as mdid, get_json_object(message,'$.meid') as meid, get_json_object(message,'$.os') as os, get_json_object(message,'$.device_make') as device_make, get_json_object(message,'$.blTime') as blTime, get_json_object(message,'$.user_yearsold') as user_yearsold, get_json_object(message,'$.user_gender') as user_gender, get_json_object(message,'$.device_connectiontype') as device_connectiontype, get_json_object(message,'$.device_type') as device_type, get_json_object(message,'$.mtype') as mtype, get_json_object(message,'$.iswifi') as iswifi, get_json_object(message,'$.muid') as muid, get_json_object(message,'$.plat') as plat, get_json_object(message,'$.rtpd') as rtpd, get_json_object(message,'$.s_cachedata') as s_cachedata, get_json_object(message,'$.chaoliang1') as chaoliang1, get_json_object(message,'$.cmrd') as cmrd, get_json_object(message,'$.device_model') as device_model FROM log_push_raw WHERE pure_DATE=20190422;
将log_push数据聚合在Hologres中
返回至HoloWeb页签。在顶部菜单栏中,选择MaxCompute加速 > 创建外部表。
在新建外部表页面,参考如下说明进行配置,然后单击提交。
选择加速方式:选择单表加速。
MaxCompure源表
项目名:选择您创建的MaxCompure的项目名。
表名:输入log_push。
Hologres目标表
Schema Name:public。
表名:输入push_history。
查询Flink和MaxCompute数据推送匹配效果。
在HoloWeb页面顶部菜单栏,单击SQL编辑器。
在临时Query查询页签,输入以下code语句,然后单击运行,系统会收集外表的统计信息。
ANALYZE public.push_history;
在临时Query查询页签,输入以下code语句,然后单击运行。您就可以查询到Flink在实时收集曝光数据的情况下,存在于MaxCompute中的历史推送数据进行匹配,查看推送的效果了。
SELECT a.rmid ,COUNT(*) ,b.os FROM log_pv AS a JOIN push_history AS b ON a.rmid = b.rmid GROUP BY a.rmid ,b.os;
(可选)创建可视化大屏项目
本实验不提供DatatV数据可视化产品,如需使用,请使用您的阿里云账号自行购买该产品或体验DataV产品试用机会。
登录DataV控制台。
根据页面指引,选择您需要开通的版本。
重要如果您的阿里云账号有DataV试用版本资格,请您先领取试用版本。
如果您的阿里云账号没有DataV试用版本资格,请您开通公共云版本或本地部署版本。其中,公共云版本中的个人版不支持Hologres数据源接入,请勿开通此版本。公共云版本或本地部署版本的费用问题,详情请参见计费方式。
若您已开通DataV,请您跳过此步骤。
创建Hologres数据源。
将数据所在的Hologres实例和数据库创建为DataV的数据源,详情请参见DataV。
在左侧导航栏中,选择数据准备 > 数据源。
在数据源页面,单击新建数据源。
在添加数据源面板中,根据如下说明配置参数信息新增Hologres数据源,并单击确定。
参数
说明
参数
说明
类型
实时数仓Hologres
网络
内网
华东2
VPC ID
登录Hologres控制台,在实例列表页面,单击目标实例ID,在实例详情页面的网络信息区域,获取指定VPC的VPC ID。
VPC实例ID
登录Hologres控制台,在实例列表页面,单击目标实例ID,在实例详情页面的网络信息区域,获取指定VPC的VPC实例ID。
名称
自定义名称。
域名
登录Hologres控制台,在实例列表页面,单击目标实例ID,在实例详情页面的网络信息区域,获取指定VPC的VPC域名,填写时需要去掉
:80
。用户名
输入创建的AccessKey ID。
密码
输入创建的AccessKey Secret。
端口
登录Hologres控制台,在实例列表页面,单击目标实例ID,在实例详情页面的网络信息区域,获取指定VPC的VPC域名中获取端⼝。例如:80。
数据库
输入上述已创建的Hologres数据库名称,本实验以
holo_tutorial
为例。
创建DataV可视页面并展示。
在左侧导航栏,单击未分组。
在未分组页面,单击创建PC端看板。
选择空白画板,单击创建项目。
在创建数据大屏对话框中,输入数据大屏名称,单击创建。
单击全部资产>图标>柱形图。
单击选中画布中的基本柱状图组件,然后单击右侧数据图标,最后单击配置数据源。
设置数据源。
数据源类型:选择数据库。
选择已有数据源:下拉选择Hologres数据库。
SOL:将以下code语句拷贝到SQL语句框中。
SELECT a.rmid ,COUNT(*) ,b.os FROM log_pv AS a JOIN push_history AS b ON a.rmid = b.rmid GROUP BY a.rmid ,b.os;
关闭设置数据源页面,填写字段映射信息,将x和y分别映射为os和count字段。
单击配置图标,然后单击柱子颜色,关闭视觉映射。
配置完成后,当实时广告的曝光数据命中了推送时,您就可以看到所需要的一些可视化的数据,从而帮助您进行商业决策。
清理及后续
在完成实验后,如果无需继续使用资源,请根据以下步骤,先删除相关资源后,再结束实操,否则资源会持续运行产生费用。
删除MaxCompute项目。
在左侧导航栏中,单击项目管理。
在项目管理页面,找到您创建的项目,单击其右侧操作列下的新建项目。
删除阿里云AccessKey。
前往AccessKey管理。
在不建议使用云账号AccessKey对话框,阅读创建主账号AccessKey的风险,如果必须要创建主账号AccessKey,则勾选我确认知晓云账号AccessKey安全风险,然后单击继续使用云账号AccessKey。
在AccessKey页面,找到目标AccessKey,单击右侧操作列下的禁用。
在禁用对话框中,单击禁用。
在AccessKey页面,找到目标AccessKey,单击右侧操作列下的删除。
在删除对话框中,输入当前AccessKey的ID,单击删除。
删除DataV数据源。
登录DataV控制台。
在左侧导航栏中,选择数据准备 > 数据源。
在数据源页面,找到创建的数据源并删除。
删除相关资源后,单击结束实操。在结束实操对话框中,单击确定。
在完成实验后,如果需要继续使用资源,您可跳过释放相关资源的操作,直接单击结束实操。在结束实操对话框中,单击确定。请随时关注账户扣费情况,避免发生欠费。
【实践】基于Hologres轻松玩转一站式实时仓库
• 进入实操前,请确保您已通过 实名认证 且 账户余额 充足
• 资源开通在个人账号,默认时长480分钟
• 云资源产生的费用需您自行承担,云起实践平台不会向您征收额外费用
• 实操结束后,您可以选择继续付费保留资源,或参考手册自动/手动释放资源