X-Pack Spark对接阿里云日志服务LogHub

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
云原生数据仓库AnalyticDB MySQL版,基础版 8ACU 100GB 1个月
简介: 概述 X-Pack Spark分析引擎是基于Spark提供的复杂分析、流式处理、机器学习的能力。Spark分析引擎可以对接阿里云的多种数据源,例如:云HBase数据库、MongoDB、Phoenix等,同时也支持对接阿里云日志服务LogHub。

概述

X-Pack Spark分析引擎是基于Spark提供的复杂分析、流式处理、机器学习的能力。Spark分析引擎可以对接阿里云的多种数据源,例如:云HBase数据库、MongoDB、Phoenix等,同时也支持对接阿里云日志服务LogHub。阿里云日志服务(Log Service,简称LOG)是针对实时日志数据的一站式服务,提供日志类数据采集、消费、投递及查询分析功能,全面提升海量日志处理和分析能力。

场景介绍

某一款销售平台的APP,针对用户在APP中打开首页、搜索、商品详细页以及最终下订单购买商品等操作,操作所产生的事件均记录到阿里云日志系统中。现需要对APP的用户的行为数据做一些统计分析,每天、每周出详细的运营数据、以及给用户提供在线查询账单等。

如何实现

通过阿里云的日志服务+X-Pack Spark+云HBase完成这些诉求。先看下整理的数据流图:

_


由上图可见数据流程为:用户通过LogHub对接APP的日志数->Spark Streming 对接LogHub同步数到HBase(Phoenix)->在线数据同步到Spark离线数仓->离线数仓批量计算输出运营数据等。
APP日中包含用户的使用APP所产生的事件信息,下面以一个简单的例子说明下每一个步骤的实现。

LogHub对接APP日志

假设APP的日志产生在某机器的目录文件中,通过LogHub可以对接机器的文件,读取解析日志。假设日志的字段信息如下:

event_time: long #事件产生的时间戳
user_id: string #用户ID,唯一值。
device_id: String #设备id,APP使用的设备。
event_name: String #事件名称,如:首页、搜索、明细页、购买
prod_id: String #商品ID。
stay_times: int #停留时间。
AI 代码解读

上述信息在APP的日志中使用逗号分隔符,所以在LogHub配置指定采集模式时选择逗号分隔。

SparkStreaming 对接APP

SparkStreaming 对接APP可以使用X-Pack Connectors中对接LogHub的插件。SparkStreaming对接LogHub可以设置每个1分钟同步一次数据到Phoenix。
同步数据之前需要在Phoenix中创建一张表,如下:

CREATE TABLE IF NOT EXISTS user_event (
   event_time BIGINT NOT NULL,
   user_id VARCHAR NOT NULL,
   device_id VARCHAR,
   event_name VARCHAR,
   prod_id VARCHAR
   CONSTRAINT my_pk PRIMARY KEY (event_time, user_id)
  );
AI 代码解读

Phoenix表user_event使用user_event和user_id作为组合主键,主要是为了使用user_id进行运营明细查询,时间信息方便按照时间范围同步数据到Spark。
SparkStreaming同步LogHub数据到Phoenix的代码主要逻辑如下:

val loghubStream = LoghubUtils.createStream(
        ssc,
        loghubProject,
        logStore,
        loghubGroupName,
        endpoint,
        numReceiver,
        accessKeyId,
        accessKeySecret,
        StorageLevel.MEMORY_AND_DISK)

      loghubStream.foreachRDD { rdd =>
        rdd.foreachPartition { pt =>
          // 获取Phoenix的链接
          val phoenixConn = DriverManager.getConnection("jdbc:phoenix:" + zkAddress)
          val statment = phoenixConn.createStatement()
          var i = 0
          while (pt.hasNext) {
            val value = pt.next()
            //获取的LogHub的数据是json格式的,需要进行转换
            val valueFormatted = JSON.parseObject(new String(value))
            //构造phonenix 插入语句
            val insetSql = s"upsert into $phoenixTableName values(" +
              s"${valueFormatted.getLong("event_time")}," +
              s"'${valueFormatted.getString("user_id").trim}'," +
              s"'${valueFormatted.getString("device_id").trim}'," +
              s"'${valueFormatted.getString("event_name").trim}'," +
              s"'${valueFormatted.getString("prod_id").trim}')"
            statment.execute(insetSql)
            i = i + 1
            // 每隔batchSize行提交一次commit到Phoenix。
            if (i % batchSize == 0) {
              phoenixConn.commit()
              println(s"====finish upsert $i rows====")
            }
          }
          phoenixConn.commit()
          println(s"==last==finish upsert $i rows====")
          phoenixConn.close()
          }
      }
AI 代码解读

SparkStreaming同步数据到Phoenix后,可以对Phoenix数据库进行用户明细查询。例如:

# 查询用户user_id_1006所有浏览明细。
select * from user_event where user_id = 'user_id_1006';
AI 代码解读

同步到Spark离线数仓

Phoenix在线数据库适合明细查询,如果需要进行统计、离线计算需要用到Spark数仓。Phoenix同步数据到Spark数仓实质就是在Spark上创建表,然后把数据同步一份到Spark表中。
本文用Sql表示下同步的逻辑,这里假设数据每天同步一次到Spark 。
Spark 中建表、同步的方法如下:

#在Spark中创建Parquet格式表:user_event_parquet,使用dt作为分区字段。
create table user_event_parquet(
    event_time long,
    user_id string,
    device_id string,
    event_name string,
    prod_id string, 
    dt string
) using parquet
partitioned by(dt);

#  在Spark中创建表user_event_phoenix映射Phoenix数据库的表。
CREATE TABLE user_event_phoenix USING org.apache.phoenix.spark
OPTIONS (
'zkUrl' 'hb-xx-master3-001.hbase.rds.aliyuncs.com:2181,hb-xx-master1-001.hbase.rds.aliyuncs.com:2181,hb-xx-master2-001.hbase.rds.aliyuncs.com:2181',
'table' 'user_event'
);

# 向Parquet表:user_event_parquet插入一天:2019-01-01的数据
insert into user_event_parquet select EVENT_TIME,USER_ID,DEVICE_ID,EVENT_NAME,PROD_ID,'2019-01-01' from user_event_phoenix where EVENT_TIME >=1546272000 and EVENT_TIME < 1546358400
AI 代码解读

离线数仓批量计算

数据同步到Spark可以对Spark数据做统计分析预算,例如:

#统计每天的访问数
select dt, count(*) from user_event_parquet group by dt
#统计前十的访问
select dt, count(*) total from user_event_parquet group by dt order by total desc limit 10
#统计前100个用户的访问数
select dt,user_id, count(*) total  from user_event_parquet group by dt,user_id order by total desc limit 100
AI 代码解读

计算的结果可以回写到业务数据库,供业务查询、出报表等。

小结

本文简单介绍了Spark如何对接LogHub以及如何同步数据等常用的操作。参考链接如下:

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
打赏
0
0
0
0
1705
分享
相关文章
网络安全视角:从地域到账号的阿里云日志审计实践
日志审计的必要性在于其能够帮助企业和组织落实法律要求,打破信息孤岛和应对安全威胁。选择 SLS 下日志审计应用,一方面是选择国家网络安全专用认证的日志分析产品,另一方面可以快速帮助大型公司统一管理多组地域、多个账号的日志数据。除了在日志服务中存储、查看和分析日志外,还可通过报表分析和告警配置,主动发现潜在的安全威胁,增强云上资产安全。
170 13
阿里云 EMR Serverless Spark 在微财机器学习场景下的应用
面对机器学习场景下的训练瓶颈,微财选择基于阿里云 EMR Serverless Spark 建立数据平台。通过 EMR Serverless Spark,微财突破了单机训练使用的数据规模瓶颈,大幅提升了训练效率,解决了存算分离架构下 Shuffle 稳定性和性能困扰,为智能风控等业务提供了强有力的技术支撑。
153 15
美的楼宇科技基于阿里云 EMR Serverless Spark 构建 LakeHouse 湖仓数据平台
美的楼宇科技基于阿里云 EMR Serverless Spark 建设 IoT 数据平台,实现了数据与 AI 技术的有效融合,解决了美的楼宇科技设备数据量庞大且持续增长、数据半结构化、数据价值缺乏深度挖掘的痛点问题。并结合 EMR Serverless StarRocks 搭建了 Lakehouse 平台,最终实现不同场景下整体性能提升50%以上,同时综合成本下降30%。
网络安全视角:从地域到账号的阿里云日志审计实践
网络安全视角:从地域到账号的阿里云日志审计实践
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
网络安全视角:从地域到账号的阿里云日志审计实践
日志审计的必要性在于其能够帮助企业和组织落实法律要求,打破信息孤岛和应对安全威胁。选择 SLS 下日志审计应用,一方面是选择国家网络安全专用认证的日志分析产品,另一方面可以快速帮助大型公司统一管理多组地域、多个账号的日志数据。除了在日志服务中存储、查看和分析日志外,还可通过报表分析和告警配置,主动发现潜在的安全威胁,增强云上资产安全。
阿里云DTS踩坑经验分享系列|SLS同步至ClickHouse集群
作为强大的日志服务引擎,SLS 积累了用户海量的数据。为了实现数据的自由流通,DTS 开发了以 SLS 为源的数据同步插件。目前,该插件已经支持将数据从 SLS 同步到 ClickHouse。通过这条高效的同步链路,客户不仅能够利用 SLS 卓越的数据采集和处理能力,还能够充分发挥 ClickHouse 在数据分析和查询性能方面的优势,帮助企业显著提高数据查询速度,同时有效降低存储成本,从而在数据驱动决策和资源优化配置上取得更大成效。
198 9
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
阿里云日志服务的傻瓜式极易预测模型
预测服务有助于提前规划,减少资源消耗和成本。阿里云日志服务的AI预测服务简化了数学建模,仅需SQL操作即可预测未来指标,具备高准确性,并能处理远期预测。此外,通过ScheduledSQL功能,可将预测任务自动化,定时执行并保存结果。
138 3
阿里云国际监控查询流量、用量查询流量与日志统计流量有差异?
阿里云国际监控查询流量、用量查询流量与日志统计流量有差异?