广告点击数实时统计:Spark StructuredStreaming + Redis Streams

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
简介: 实时统计广告点击数:Spark StructuredStreaming + Redis Stream业务场景介绍某广告公司在网页上投递动态图片广告,广告的展现形式是根据热点图片动态生成的。为了收入的最大化,需要统计每个广告的点击数来决定哪些广告可以投放的更长时间,哪些需要及时更换。

实时统计广告点击数:Spark StructuredStreaming + Redis Stream

业务场景介绍

某广告公司在网页上投递动态图片广告,广告的展现形式是根据热点图片动态生成的。为了收入的最大化,需要统计每个广告的点击数来决定哪些广告可以投放的更长时间,哪些需要及时更换。大部分的广告生命周期很短,实时获取广告的点击数可以让我们快速确定哪些广告对业务是关键的。所以我们理想的解决方案是有流处理数据的能力,可以统计所有广告的点击量以及统计实时的点击量。

业务数据流

来看下我们业务数据链路。
_1

广告点击数据通过手机或者电脑的网页传递到“数据提取”,提取后的数据经过“数据处理”计算实时的点击数,最后存储到数据库,使用“数据查询”用于统计分析,统计每个广告的点击总数。
根据我们的数据特点,整个数据链路的数据输入输出如下:

输入

针对每个点击事件我们使用asset id以及cost 两个字段来表示一个广告信息,例如:

asset [asset id] cost [actual cost]
asset aksh1hf98qwdst9q7 cost 39
asset aksh1hf98qwdst9q8 cost 19

输出

经过上图中步骤2:数据处理后,我们把结果集存储到一个数据表中,数据表可以用于上图步骤3使用Sql查询,例如:

select asset, count from clicks order by count desc

asset            count
-----------------     -----
aksh1hf98qwdst9q7    2392
aksh1hf98qwdst9q8    2010
aksh1hf98qwdst9q6    1938

解决方案

基于以上诉求选择StructuredStreaming + Redis Stream作为解决方案。先介绍下方案中涉及到的组件。

  • Spark StructuredStreaming是Spark在2.0后推出的基于Spark SQL上的一种实时处理流数据的框架。处理时延可达毫秒级别。
  • Redis Stream是在Redis 5.0后引入的一种新的数据结构,可高速收集、存储和分布式处理数据,处理时延可达亚毫秒级别。
  • Spark-Redis 连接器提供了Spark对接Redis的桥梁。通过Spark-Redis 连接器, StructuredStreaming可以使用Redis Stream作为数据源,经过Spark处理后数据再写回Redis。

数据处理流

现在让我们看下如何使用StructuredStreaming + Redis Stream
_1
通过上图可以看到点击数据首先存储到Redis Stream,然后通过StructuredStreaming消费数据、处理聚合数据,再把处理的结果入库到Redis,最后通过Spark Sql查询Redis进行统计分析。下面分别看下每个步骤:

数据提取:

Redis Stream是Redis内置的数据结构,具备每秒百万级别的读写能力,另外存储的数据可以根据时间自动排序。Spark-Redis连接器支持使用Redis Stream作为数据源,非常适用这个场景,把Redis Stream数据对接到Spark 引擎。

数据处理:

Spark的StructuredStreaming 非常适合此场景的数据处理部分,Spark-Redis连接器可以获取Redis Stream的数据转换成Spark的DataFrames。在StructuredStreaming处理流数据的过程中,可以对微批次数据或者整体数据进行查询。数据的处理结果可以通过自定义的“writer”输出到不同的目的地,本场景中我们直接把数据输出到Redis的Hash数据结构。

数据查询:

Spark-Redis连接器可以把Redis的数据结构映射成Spark的DataFrames,然后我们把DataFrames创建成一个临时表,表的字段映射Redis的Hash数据结构。借助Redis的亚毫米级的延迟,使用Spark-SQL进行实时的数据查询。

开发步骤

通过下面实例介绍下开发的步骤

Redis Stream存储数据

Redis Streams 是一个append-only的数据结构。部署Redis Streams后使用redis-cli向Redis发送数据。
redis-cli使用方法可参考redis-cli连接。下面的命令是Redis向Stream clicks发送数据。

XADD clicks MAXLEN ~ 1000000 * asset aksh1hf98qw7tt9q7 cost 29

_2

数据处理

在StructuredStreaming中把数据处理步骤分成3个子步骤。
a. 从Redis Stream读取、处理数据。
b. 存储数据到Redis。
c. 运行StructuredStreaming程序。
_3

从Redis Stream读取、处理数据

在Spark中读取Redis Stream数据需要确定如何去连接Redis,以及Redis Stream的schema信息。这里使用Spark-Redis连接器,需要创建一个SparkSession并带上Redis的连接信息。

val spark = SparkSession
      .builder()
      .appName("StructuredStreaming on Redis")
      .config("spark.redis.host", redisHost)
      .config("spark.redis.port", redisPort)
      .config("spark.redis.auth", redisPassword)
      .getOrCreate()

在Spark中构建schema,我们给流数据命名为“clicks”,并且需要设置参数“stream.kes”的值为“clicks”。由于Redis Stream中的数据包含两个字段:“asset”和“cost”,所以我们要创建StructType映射这两个字段。

val clicks = spark
      .readStream
      .format("redis")
      .option("stream.keys", redisTableName)
      .schema(StructType(Array(
        StructField("asset", StringType),
        StructField("cost", LongType)
      )))
      .load()

在这里统计下每个asset的点击次数,可以创建一个DataFrames根据asset汇聚数据。

val bypass = clicks.groupBy("asset").count()

最后一个步骤启动StructuredStreaming。

val query = bypass
      .writeStream
      .outputMode("update")
      .foreach(clickWriter)
      .start()
存储数据到Redis

我们通过自定义的ClickForeachWriter向Redis写数据。ClickForeachWriter继承自FroeachWriter,使用Redis的Java客户端Jedis连接到Redis。

class ClickForeachWriter(redisHost: String, redisPort: String, redisPassword: String) extends ForeachWriter[Row] {

  var jedis: Jedis = _

  def connect() = {
    val shardInfo: JedisShardInfo = new JedisShardInfo(redisHost, redisPort.toInt)
    shardInfo.setPassword(redisPassword)
    jedis = new Jedis(shardInfo)
  }

  override def open(partitionId: Long, version: Long): Boolean = {
    true
  }

  override def process(value: Row): Unit = {

    val asset = value.getString(0)
    val count = value.getLong(1)
    if (jedis == null) {
      connect()
    }

    jedis.hset("click:" + asset, "asset", asset)
    jedis.hset("click:" + asset, "count", count.toString)
    jedis.expire("click:" + asset, 300)

  }

  override def close(errorOrNull: Throwable): Unit = {}
}
运行StructuredStreaming程序。

程序完成打包后,可以通过Spark控制台提交任务,运行Spark StructuredStreaming任务。

--class com.aliyun.spark.redis.StructuredStremingWithRedisStream
--jars /spark_on_redis/ali-spark-redis-2.3.1-SNAPSHOT_2.3.2-1.0-SNAPSHOT.jar,/spark_on_redis/commons-pool2-2.0.jar,/spark_on_redis/jedis-3.0.0-20181113.105826-9.jar
--driver-memory 1G 
--driver-cores 1
--executor-cores 1
--executor-memory 2G
--num-executors 1
--name spark_on_polardb
/spark_on_redis/structuredstreaming-0.0.1-SNAPSHOT.jar
xxx1 6379 xxx2 clicks

参数说明:

  • xxx1: Redis的内网连接地址(host)。
  • 6379:Redis的端口号(port)。
  • xxx2: Redis的登陆密码。
  • clicks: Redis的Stream名称

数据查询

数据查询使用Spark-SQL创建表读取Redis Hash数据库。这里使用Spark控制台的“交互式查询”,输入如下语句:

CREATE TABLE IF NOT EXISTS clicks(asset STRING, count INT) 
USING org.apache.spark.sql.redis 
OPTIONS (
'host' 'xxx1',
'port' '6379',
'auth' 'xxx2',
'table' 'click'
)

参数说明:

  • xxx1: Redis的内网连接地址(host)。
  • 6379:Redis的端口号(port)。
  • xxx2: Redis的登陆密码。
  • click: Redis的Hash 表名称。

然后运行查询语句:

select * from clicks;

例如下图:
Snip20190523_3
Spark-SQL通过Spark-Redis连接器直接查询Redis数据,统计了广告的点击数。

小结

本文主要介绍了Spark如何把Redis作为数据源以及Spark的StructuredStreaming与Redis Stream的结合。更多Spark介绍请参考:X-Pack Spark 分析引擎;更多Redis介绍请参考:云数据库 Redis 版

参考列表

本文的代码可参考Spark样例代码

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
消息中间件 分布式计算 Kafka
195 Spark Streaming整合Kafka完成网站点击流实时统计
195 Spark Streaming整合Kafka完成网站点击流实时统计
97 0
|
8月前
|
存储 监控 安全
带你读《Apache Doris 案例集》——07查询平均提速700% ,奇安信基于 Apache Doris 升级日志安全分析系统(1)
带你读《Apache Doris 案例集》——07查询平均提速700% ,奇安信基于 Apache Doris 升级日志安全分析系统(1)
256 1
|
6月前
|
存储 NoSQL 算法
如何借助Redis更高效统计UV?——Hyperloglog篇
Redis的HyperLogLog数据类型是用于近似计算大规模数据集中不重复元素基数的工具,它以低空间开销(约12KB)提供高精度的估算(误差率约0.81%)。通过`pfadd`添加元素,`pfcount`统计数量,`pfmerge`合并多个HyperLogLog,实现去重计数。尽管内部存储为字符串,但它是概率数据结构,适合高效UV统计和其他大数据场景。
83 0
|
8月前
|
存储 监控 NoSQL
|
8月前
|
存储 Java 关系型数据库
【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)
【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)
250 1
【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)
|
8月前
|
存储 SQL 关系型数据库
带你读《Apache Doris 案例集》——08秒级数据写入,毫秒查询响应,天眼查基于 Apache Doris 构建统一实时数仓(2)
带你读《Apache Doris 案例集》——08秒级数据写入,毫秒查询响应,天眼查基于 Apache Doris 构建统一实时数仓(2)
522 0
|
8月前
|
SQL 存储 运维
带你读《Apache Doris 案例集》——08秒级数据写入,毫秒查询响应,天眼查基于 Apache Doris 构建统一实时数仓(1)
带你读《Apache Doris 案例集》——08秒级数据写入,毫秒查询响应,天眼查基于 Apache Doris 构建统一实时数仓(1)
457 0
|
8月前
|
消息中间件 分布式计算 大数据
【大数据技术】Spark+Flume+Kafka实现商品实时交易数据统计分析实战(附源码)
【大数据技术】Spark+Flume+Kafka实现商品实时交易数据统计分析实战(附源码)
280 0
|
存储 NoSQL JavaScript
实战!Redis 巧用数据类型实现亿级数据统计!
实战!Redis 巧用数据类型实现亿级数据统计!
|
存储 缓存 NoSQL
每日一面 - Redis程序设计中,上百万的新闻,如何实时展示最热点的top10条呢
每日一面 - Redis程序设计中,上百万的新闻,如何实时展示最热点的top10条呢
每日一面 - Redis程序设计中,上百万的新闻,如何实时展示最热点的top10条呢

相关产品

  • 云数据库 Tair(兼容 Redis)