X-Pack Spark对接阿里云日志服务LogHub

简介: 概述 X-Pack Spark分析引擎是基于Spark提供的复杂分析、流式处理、机器学习的能力。Spark分析引擎可以对接阿里云的多种数据源,例如:云HBase数据库、MongoDB、Phoenix等,同时也支持对接阿里云日志服务LogHub。

概述

X-Pack Spark分析引擎是基于Spark提供的复杂分析、流式处理、机器学习的能力。Spark分析引擎可以对接阿里云的多种数据源,例如:云HBase数据库、MongoDB、Phoenix等,同时也支持对接阿里云日志服务LogHub。阿里云日志服务(Log Service,简称LOG)是针对实时日志数据的一站式服务,提供日志类数据采集、消费、投递及查询分析功能,全面提升海量日志处理和分析能力。

场景介绍

某一款销售平台的APP,针对用户在APP中打开首页、搜索、商品详细页以及最终下订单购买商品等操作,操作所产生的事件均记录到阿里云日志系统中。现需要对APP的用户的行为数据做一些统计分析,每天、每周出详细的运营数据、以及给用户提供在线查询账单等。

如何实现

通过阿里云的日志服务+X-Pack Spark+云HBase完成这些诉求。先看下整理的数据流图:

_


由上图可见数据流程为:用户通过LogHub对接APP的日志数->Spark Streming 对接LogHub同步数到HBase(Phoenix)->在线数据同步到Spark离线数仓->离线数仓批量计算输出运营数据等。
APP日中包含用户的使用APP所产生的事件信息,下面以一个简单的例子说明下每一个步骤的实现。

LogHub对接APP日志

假设APP的日志产生在某机器的目录文件中,通过LogHub可以对接机器的文件,读取解析日志。假设日志的字段信息如下:

event_time: long #事件产生的时间戳
user_id: string #用户ID,唯一值。
device_id: String #设备id,APP使用的设备。
event_name: String #事件名称,如:首页、搜索、明细页、购买
prod_id: String #商品ID。
stay_times: int #停留时间。

上述信息在APP的日志中使用逗号分隔符,所以在LogHub配置指定采集模式时选择逗号分隔。

SparkStreaming 对接APP

SparkStreaming 对接APP可以使用X-Pack Connectors中对接LogHub的插件。SparkStreaming对接LogHub可以设置每个1分钟同步一次数据到Phoenix。
同步数据之前需要在Phoenix中创建一张表,如下:

CREATE TABLE IF NOT EXISTS user_event (
   event_time BIGINT NOT NULL,
   user_id VARCHAR NOT NULL,
   device_id VARCHAR,
   event_name VARCHAR,
   prod_id VARCHAR
   CONSTRAINT my_pk PRIMARY KEY (event_time, user_id)
  );

Phoenix表user_event使用user_event和user_id作为组合主键,主要是为了使用user_id进行运营明细查询,时间信息方便按照时间范围同步数据到Spark。
SparkStreaming同步LogHub数据到Phoenix的代码主要逻辑如下:

val loghubStream = LoghubUtils.createStream(
        ssc,
        loghubProject,
        logStore,
        loghubGroupName,
        endpoint,
        numReceiver,
        accessKeyId,
        accessKeySecret,
        StorageLevel.MEMORY_AND_DISK)

      loghubStream.foreachRDD { rdd =>
        rdd.foreachPartition { pt =>
          // 获取Phoenix的链接
          val phoenixConn = DriverManager.getConnection("jdbc:phoenix:" + zkAddress)
          val statment = phoenixConn.createStatement()
          var i = 0
          while (pt.hasNext) {
            val value = pt.next()
            //获取的LogHub的数据是json格式的,需要进行转换
            val valueFormatted = JSON.parseObject(new String(value))
            //构造phonenix 插入语句
            val insetSql = s"upsert into $phoenixTableName values(" +
              s"${valueFormatted.getLong("event_time")}," +
              s"'${valueFormatted.getString("user_id").trim}'," +
              s"'${valueFormatted.getString("device_id").trim}'," +
              s"'${valueFormatted.getString("event_name").trim}'," +
              s"'${valueFormatted.getString("prod_id").trim}')"
            statment.execute(insetSql)
            i = i + 1
            // 每隔batchSize行提交一次commit到Phoenix。
            if (i % batchSize == 0) {
              phoenixConn.commit()
              println(s"====finish upsert $i rows====")
            }
          }
          phoenixConn.commit()
          println(s"==last==finish upsert $i rows====")
          phoenixConn.close()
          }
      }

SparkStreaming同步数据到Phoenix后,可以对Phoenix数据库进行用户明细查询。例如:

# 查询用户user_id_1006所有浏览明细。
select * from user_event where user_id = 'user_id_1006';

同步到Spark离线数仓

Phoenix在线数据库适合明细查询,如果需要进行统计、离线计算需要用到Spark数仓。Phoenix同步数据到Spark数仓实质就是在Spark上创建表,然后把数据同步一份到Spark表中。
本文用Sql表示下同步的逻辑,这里假设数据每天同步一次到Spark 。
Spark 中建表、同步的方法如下:

#在Spark中创建Parquet格式表:user_event_parquet,使用dt作为分区字段。
create table user_event_parquet(
    event_time long,
    user_id string,
    device_id string,
    event_name string,
    prod_id string, 
    dt string
) using parquet
partitioned by(dt);

#  在Spark中创建表user_event_phoenix映射Phoenix数据库的表。
CREATE TABLE user_event_phoenix USING org.apache.phoenix.spark
OPTIONS (
'zkUrl' 'hb-xx-master3-001.hbase.rds.aliyuncs.com:2181,hb-xx-master1-001.hbase.rds.aliyuncs.com:2181,hb-xx-master2-001.hbase.rds.aliyuncs.com:2181',
'table' 'user_event'
);

# 向Parquet表:user_event_parquet插入一天:2019-01-01的数据
insert into user_event_parquet select EVENT_TIME,USER_ID,DEVICE_ID,EVENT_NAME,PROD_ID,'2019-01-01' from user_event_phoenix where EVENT_TIME >=1546272000 and EVENT_TIME < 1546358400

离线数仓批量计算

数据同步到Spark可以对Spark数据做统计分析预算,例如:

#统计每天的访问数
select dt, count(*) from user_event_parquet group by dt
#统计前十的访问
select dt, count(*) total from user_event_parquet group by dt order by total desc limit 10
#统计前100个用户的访问数
select dt,user_id, count(*) total  from user_event_parquet group by dt,user_id order by total desc limit 100

计算的结果可以回写到业务数据库,供业务查询、出报表等。

小结

本文简单介绍了Spark如何对接LogHub以及如何同步数据等常用的操作。参考链接如下:

相关实践学习
通过日志服务实现云资源OSS的安全审计
本实验介绍如何通过日志服务实现云资源OSS的安全审计。
目录
相关文章
|
11月前
|
分布式计算 运维 搜索推荐
立马耀:通过阿里云 Serverless Spark 和 Milvus 构建高效向量检索系统,驱动个性化推荐业务
蝉妈妈旗下蝉选通过迁移到阿里云 Serverless Spark 及 Milvus,解决传统架构性能瓶颈与运维复杂性问题。新方案实现离线任务耗时减少40%、失败率降80%,Milvus 向量检索成本降低75%,支持更大规模数据处理,查询响应提速。
572 57
|
9月前
|
人工智能 分布式计算 DataWorks
一体系数据平台的进化:基于阿里云 EMR Serverless Spark 的持续演进
本文介绍了一体系汽配供应链平台如何借助阿里云EMR Serverless Spark实现从传统Hadoop平台向云原生架构的迁移。通过融合高质量零部件供应与创新互联网科技,一体系利用EMR Serverless Spark和DataWorks构建高效数据分析体系,解决大规模数据处理瓶颈。方案涵盖实时数据集成、Lakehouse搭建、数仓分层设计及BI/ML应用支持,显著提升数据处理性能与业务响应速度,降低运维成本,为数字化转型奠定基础。最终实现研发效率提升、运维压力减轻,并推动AI技术深度整合,迈向智能化云原生数据平台。
305 4
|
9月前
|
分布式计算 运维 监控
Fusion 引擎赋能:流利说如何用阿里云 Serverless Spark 实现数仓计算加速
本文介绍了流利说与阿里云合作,利用EMR Serverless Spark优化数据处理的全过程。流利说是科技驱动的教育公司,通过AI技术提升用户英语水平。原有架构存在资源管理、成本和性能等痛点,采用EMR Serverless Spark后,实现弹性资源管理、按需计费及性能优化。方案涵盖数据采集、存储、计算到查询的完整能力,支持多种接入方式与高效调度。迁移后任务耗时减少40%,失败率降低80%,成本下降30%。未来将深化合作,探索更多行业解决方案。
651 1
|
存储 分布式计算 物联网
美的楼宇科技基于阿里云 EMR Serverless Spark 构建 LakeHouse 湖仓数据平台
美的楼宇科技基于阿里云 EMR Serverless Spark 建设 IoT 数据平台,实现了数据与 AI 技术的有效融合,解决了美的楼宇科技设备数据量庞大且持续增长、数据半结构化、数据价值缺乏深度挖掘的痛点问题。并结合 EMR Serverless StarRocks 搭建了 Lakehouse 平台,最终实现不同场景下整体性能提升50%以上,同时综合成本下降30%。
952 58
|
10月前
|
自然语言处理 监控 安全
阿里云发布可观测MCP!支持自然语言查询和分析多模态日志
阿里云可观测官方发布了Observable MCP Server,提供了一系列访问阿里云可观测各产品的工具能力,包含阿里云日志服务SLS、阿里云应用实时监控服务ARMS等,支持用户通过自然语言形式查询
1444 0
阿里云发布可观测MCP!支持自然语言查询和分析多模态日志
|
12月前
|
存储 消息中间件 缓存
MiniMax GenAI 可观测性分析 :基于阿里云 SelectDB 构建 PB 级别日志系统
基于阿里云SelectDB,MiniMax构建了覆盖国内及海外业务的日志可观测中台,总体数据规模超过数PB,日均新增日志写入量达数百TB。系统在P95分位查询场景下的响应时间小于3秒,峰值时刻实现了超过10GB/s的读写吞吐。通过存算分离、高压缩比算法和单副本热缓存等技术手段,MiniMax在优化性能的同时显著降低了建设成本,计算资源用量降低40%,热数据存储用量降低50%,为未来业务的高速发展和技术演进奠定了坚实基础。
541 1
MiniMax GenAI 可观测性分析 :基于阿里云 SelectDB 构建 PB 级别日志系统
|
域名解析 应用服务中间件 网络安全
阿里云个人博客外网访问中断应急指南:从安全组到日志的七步排查法
1. 检查安全组配置:确认阿里云安全组已开放HTTP/HTTPS端口,添加规则允许目标端口(如80/443),授权对象设为`0.0.0.0/0`。 2. 本地防火墙设置:确保服务器防火墙未阻止外部流量,Windows启用入站规则,Linux检查iptables或临时关闭防火墙测试。 3. 验证Web服务状态:检查Apache/Nginx/IIS是否运行并监听所有IP,使用命令行工具确认监听状态。 4. 测试网络连通性:使用外部工具和内网工具测试服务器端口是否开放,排除本地可访问但外网不可的问题。 5. 排查DNS解析:确认域名A记录指向正确公网IP,使用`ping/nslookup`验证解析正
460 2
|
机器学习/深度学习 分布式计算 大数据
阿里云 EMR Serverless Spark 在微财机器学习场景下的应用
面对机器学习场景下的训练瓶颈,微财选择基于阿里云 EMR Serverless Spark 建立数据平台。通过 EMR Serverless Spark,微财突破了单机训练使用的数据规模瓶颈,大幅提升了训练效率,解决了存算分离架构下 Shuffle 稳定性和性能困扰,为智能风控等业务提供了强有力的技术支撑。
612 15