3. 加工数据
您本步骤将指导您如何通过DataWorks计算和分析已采集的数据。
- 新建三张数据表,分别为数据运营层表(ods_log_info_d)、数据仓库层表(dw_user_info_all_d)和数据产品层表(rpt_user_info_d)。
1.1 在临时查询页面的左侧导航中,单击数据开发。
1.2 在数据开发页面,选择业务流程>MaxCompute,右键单击表,单击新建表。
1.3 在新建表对话框中,表名输入为ods_log_info_d,单击新建。
1.4 在表ods_log_info_d的编辑页面上方,单击DDL。
1.5 在DDL模式对话框中,输入如下创建数据运营层表的建表语句,单击生成表结构。
CREATE TABLE IF NOT EXISTS ods_log_info_d ( ip STRING COMMENT 'ip', uid STRING COMMENT 'uid', time STRING COMMENT 'timeyyyymmddhh:mi:ss', status STRING COMMENT 'status', bytes STRING COMMENT 'bytes', region STRING COMMENT 'region', method STRING COMMENT 'method', url STRING COMMENT 'url', protocol STRING COMMENT 'protocol', referer STRING COMMENT 'referer', device STRING COMMENT 'device', identity STRING COMMENT 'identity' ) PARTITIONED BY ( dt STRING );
1.6 在确认操作对话框中,单击确认。
1.7 在表ods_log_info_d的编辑页面,中文名输入为数据运营层表,单击提交到生产环境。
1.8 在提交到生产环境对话框中,单击确认。
1.9 重复1.2~1.8的步骤,根据如下两个建表语句,新建dw_user_info_all_d表和rpt_user_info_d表,中文名分别输入为数据仓库层表和数据产品层表,然后单击提交到生产环境。
CREATE TABLE IF NOT EXISTS dw_user_info_all_d ( uid STRING COMMENT 'uid', gender STRING COMMENT 'gender', age_range STRING COMMENT 'age_range', zodiac STRING COMMENT 'zodiac', region STRING COMMENT 'region', device STRING COMMENT 'device', identity STRING COMMENT 'identity', method STRING COMMENT 'method', url STRING COMMENT 'url', referer STRING COMMENT 'referer', time STRING COMMENT 'timeyyyymmddhh:mi:ss' ) PARTITIONED BY ( dt STRING );
CREATE TABLE IF NOT EXISTS rpt_user_info_d ( uid STRING COMMENT 'uid', region STRING COMMENT 'uid', device STRING COMMENT 'device', pv BIGINT COMMENT 'pv', gender STRING COMMENT 'gender', age_range STRING COMMENT 'age_range', zodiac STRING COMMENT 'zodiac' ) PARTITIONED BY ( dt STRING );
- 设计业务流程。
2.1 在数据开发页面左侧,双击您的业务流程。
2.2 在业务流程开发面板,单击ODPS SQL并拖拽至右侧的编辑页面。
2.3 在新建节点对话框中,节点名称输入为ods_log_info_d,单击提交。
2.4 在业务流程开发面板,单击ODPS SQL并拖拽至右侧的编辑页面。
2.5 在新建节点对话框中,节点名称输入为dw_user_info_all_d,单击提交。
2.6 在业务流程开发面板,单击ODPS SQL并拖拽至右侧的编辑页面。
2.7 在新建节点对话框中,节点名称输入为rpt_user_info_d,单击提交。
2.8 在右侧的编辑页面,通过拖拽连线,配置如下图所示的依赖关系。
- 创建用户自定义函数。
3.1 复制下方地址,在Chromium浏览器打开新页签,粘贴并访问,下载ip2region.jar。
https://docs-aliyun.cn-hangzhou.oss.aliyun-inc.com/assets/attach/85298/cn_zh/1532163718650/ip2region.jar?spm=a2c4g.11186623.0.0.43df4d0dwSRLzd&file=ip2region.jar
3.2 切换回数据开发页面,选择业务流程>您的业务流程>MaxCompute,右键单击资源,选择新建资源>JAR。
3.3 在新建资源对话框中,单击点击上传。
3.4 在打开文件面板中,单击下载目录,选择ip2region.jar,单击选择。
3.5 在新建资源对话框中,单击新建。
3.6 在上方工具栏中,单击图标提交。
3.7 在提交新版本对话框中,单击提交。
3.8 在数据开发页面,选择业务流程>您的业务流程>MaxCompute,右键单击函数,单击新建函数。
3.9 在新建函数对话框中,函数名称输入为getregion,单击新建。
3.10 在注册函数页签,配置如下参数,其他配置保持默认,单击图标保存。
参数说明:
- 类名:输入org.alidata.odps.udf.Ip2Region。
- 资源列表:输入ip2region.jar。
- 描述:输入IP地址转换地域。
- 命令格式:输入getregion(‘ip’)。
- 参数说明:输入IP地址。
3.11在注册函数页签,单击图标提交。
3.12 在提交新版本对话框中,单击确定。
- 配置ODPS SQL节点。
4.1 在数据开发页面,选择业务流程>您的业务流程>MaxCompute>数据开发,双击ods_log_info_d。
4.2 在ods_log_info_d节点编辑页面,输入如下SQL语句,单击图标保存。
INSERT OVERWRITE TABLE ods_log_info_d PARTITION (dt=${bdp.system.bizdate}) SELECT ip , uid , time , status , bytes , getregion(ip) AS region --使用自定义UDF通过IP得到地域。 , regexp_substr(request, '(^[^ ]+ )') AS method --通过正则把request差分为3个字段。 , regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') AS url , regexp_substr(request, '([^ ]+$)') AS protocol , regexp_extract(referer, '^[^/]+://([^/]+){1}') AS referer --通过正则清晰refer,得到更精准的URL。 , CASE WHEN TOLOWER(agent) RLIKE 'android' THEN 'android' --通过agent得到终端信息和访问形式。 WHEN TOLOWER(agent) RLIKE 'iphone' THEN 'iphone' WHEN TOLOWER(agent) RLIKE 'ipad' THEN 'ipad' WHEN TOLOWER(agent) RLIKE 'macintosh' THEN 'macintosh' WHEN TOLOWER(agent) RLIKE 'windows phone' THEN 'windows_phone' WHEN TOLOWER(agent) RLIKE 'windows' THEN 'windows_pc' ELSE 'unknown' END AS device , CASE WHEN TOLOWER(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler' WHEN TOLOWER(agent) RLIKE 'feed' OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') RLIKE 'feed' THEN 'feed' WHEN TOLOWER(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)' AND agent RLIKE '^[Mozilla|Opera]' AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') NOT RLIKE 'feed' THEN 'user' ELSE 'unknown' END AS identity FROM ( SELECT SPLIT(col, '##@@')[0] AS ip , SPLIT(col, '##@@')[1] AS uid , SPLIT(col, '##@@')[2] AS time , SPLIT(col, '##@@')[3] AS request , SPLIT(col, '##@@')[4] AS status , SPLIT(col, '##@@')[5] AS bytes , SPLIT(col, '##@@')[6] AS referer , SPLIT(col, '##@@')[7] AS agent FROM ods_raw_log_d WHERE dt = ${bdp.system.bizdate} ) a;
4.3 在数据开发页面,选择业务流程>您的业务流程>MaxCompute>数据开发,双击dw_user_info_all_d。
4.4 在dw_user_info_all_d节点的编辑页面,输入如下SQL语句,单击图标保存。
INSERT OVERWRITE TABLE dw_user_info_all_d PARTITION (dt='${bdp.system.bizdate}') SELECT COALESCE(a.uid, b.uid) AS uid , b.gender , b.age_range , b.zodiac , a.region , a.device , a.identity , a.method , a.url , a.referer , a.time FROM ( SELECT * FROM ods_log_info_d WHERE dt = ${bdp.system.bizdate} ) a LEFT OUTER JOIN ( SELECT * FROM ods_user_info_d WHERE dt = ${bdp.system.bizdate} ) b ON a.uid = b.uid;
4.5 在数据开发页面,选择业务流程>您的业务流程>MaxCompute>数据开发,双击rpt_user_info_d。
4.6 在rpt_user_info_d节点的编辑页面,输入如下SQL语句,单击图标保存。
INSERT OVERWRITE TABLE rpt_user_info_d PARTITION (dt='${bdp.system.bizdate}') SELECT uid , MAX(region) , MAX(device) , COUNT(0) AS pv , MAX(gender) , MAX(age_range) , MAX(zodiac) FROM dw_user_info_all_d WHERE dt = ${bdp.system.bizdate} GROUP BY uid;
- 提交业务流程。
5.1 在数据开发页面左侧,双击您的业务流程。
5.2 在业务流程的编辑页面上方菜单栏中,单击提交图标,提交业务流程中已配置完成的节点。
5.3 在提交对话框中,选择所有的节点,备注输入提交业务流程,选择忽略输入输出不一致的告警,单击提交。
返回如下页面, 等待所有节点提交成功,单击图标关闭即可。
- 运行业务流程。
6.1 在业务流程的编辑页面上方菜单栏中,单击图标运行。
返回如下页面,等待3~5分钟,所有节点显示为后,表示任务运行完成。
6.2 在左侧导航栏中,单击临时查询。
6.3 在临时查询面板,右键单击临时查询,选择新建节点>ODPS SQL。
6.4 在新建节点对话框中,单击提交。
6.5 在SQL查询页签,输入如下SQL语句,单击图标运行,查看rpt_user_info_d数据情况,确认数据产出。
说明 :SQL语句中字段dt中的${bdp.system.bizdate}表示业务日期。例如,任务运行的日期为20180717,则业务日期为20180716,即任务运行日期的前一天。
select * from rpt_user_info_d where dt=${bdp.system.bizdate} limit 10;
6.6 在参数对话框中,单击确定。
6.7 在MaxCompute计算成本估计对话框中,单击运行。
返回如下结果,表示数据已产出。
- 在生产环境运行任务。
7.1在页面上方,单击运维中心。
7.2 在运维中心页面,选择周期任务运维>周期任务。
7.3 在周期任务页面,单击workshop_start业务流程。
7.4 在DAG图中,右键单击workshop_start节点,选择补数据>当前节点及下游节点。
7.5 在补数据对话框中,选择所有任务,单击确定。
7.6 在确认框对话框中,单击确认。
返回如下页面,单击刷新,等待3~5分钟,直至所有任务运行成功即可。