Spark Streaming 流式计算实战

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: Spark Streaming 流式计算实战
这篇文章由一次平安夜的微信分享整理而来。在Stuq 做的分享, 原文内容。 

业务场景

这次分享会比较实战些。具体业务场景描述:
我们每分钟会有几百万条的日志进入系统,我们希望根据日志提取出时间以及用户名称,然后根据这两个信息形成
  • userName/year/month/day/hh/normal 
  • userName/year/month/day/hh/delay
路径,存储到HDFS中。如果我们发现日志产生的时间和到达的时间相差超过的一定的阈值,那么会放到 delay 目录,否则放在正常的 normal 目录。

Spark Streaming 与 Storm 适用场景分析

为什么这里不使用 Storm呢? 我们初期确实想过使用 Storm 去实现,然而使用 Storm 写数据到HDFS比较麻烦:
* Storm 需要持有大量的 HDFS 文件句柄。需要落到同一个文件里的记录是不确定什么时候会来的,你不能写一条就关掉,所以需要一直持有。
* 需要使用HDFS 的写文件的 append 模式,不断追加记录。
大量持有文件句柄以及在什么时候释放这些文件句柄都是一件很困难的事情。另外使用 HDFS 的追加内容模式也会有些问题。
后续我们就调研 Spark Streaming 。 Spark Streaming 有个好处,我可以攒个一分钟处理一次即可。这就意味着,我们可以隔一分钟(你当然也可以设置成五分钟,十分钟)批量写一次集群,HDFS 对这种形态的文件存储还是非常友好的。这样就很轻易的解决了 Storm 遇到的两个问题。
实时性也能得到保证,譬如我的 batch interval 设置为 一分钟 那么我们就能保证一分钟左右的延迟 ,事实上我们的业务场景是可以容忍半小时左右的。
当然,Spark 处理完数据后,如何落到集群是比较麻烦的一件事情,不同的记录是要写到不同的文件里面去的,没办法简单的 saveAsTextFile 就搞定。这个我们通过自定义 Partitioner 来解决,第三个环节会告诉大家具体怎么做。 
上面大家其实可以看到 Spark Streaming 和 Storm 都作为流式处理的一个解决方案,但是在不同的场景下,其实有各自适合的时候。 

Spark Streaming 与 Kafka 集成方案选型

我们的数据来源是Kafka ,我们之前也有应用来源于 HDFS文件系统监控的,不过建议都尽量对接 Kafka 。
Spark Streaming 对接Kafka 做数据接受的方案有两种:
*Receiver-based Approach 
*Direct Approach (No Receivers)
两个方案具体优劣我专门写了文章分析,大家晚点可以看看这个链接和 Spark Streaming 相关的文章。
我这里简单描述下:
*Receiver-based Approach 内存问题比较严重,因为她接受数据和处理数据是分开的。如果处理慢了,它还是不断的接受数据。容易把负责接受的节点给搞挂了。
* Direct Approach 是直接把 Kafka 的 partition 映射成 RDD 里的 partition 。 所以数据还是在 kafka 。只有在算的时候才会从 Kafka 里拿,不存在内存问题,速度也快。
所以建议都是用 Direct Approach 。具体调用方式是这样:
0193f0ce3517aa1f39a091dd0929cb67bcdd552f
还是很简单的,之后就可以像正常的 RDD 一样做处理了。

自定义 Partitioner 实现日志文件快速存储到 HDFS

d716dafc2b51f7ee896209c64579a2ce41322625
经过处理完成后 ,我们拿到了logs 对象 。
到这一步位置,日志的每条记录其实是一个 tuple(path,line)  也就是每一条记录都会被标记上一个路径。那么现在要根据路径,把每条记录都写到对应的目录去该怎么做呢?
一开始想到的做法是这样:
676553464115e605a087ec2693c4b1841d4a7e62
首先收集到所有的路径。接着 for 循环 paths ,然后过滤再进行存储,类似这样:
8d33cde81d974e45c6ce9b404f999f3bf11a69d0
这里我们一般会把 rdd 给 cache 住,这样每次都直接到内存中过滤就行了。但如果 path 成百上千个呢? 而且数据量一分钟至少几百万,save 到磁盘也是需要时间的。所以这种方案肯定是不可行的。
我当时还把 paths 循环给并行化了,然而当前情况是 CPU 处理慢了,所以有改善,但是仍然达不到要求。
这个时候你可能会想,要是我能把每个路径的数据都事先收集起来,得到几个大的集合,然后把这些集合并行的写入到 HDFS 上就好了。事实上,后面我实施的方案也确实是这样的。所谓集合的概念,其实就是 Partition 的概念。而且这在Spark 中也是易于实现的,而实现的方式就是利用自定义 Partioner 。具体的方式如下:
cb19375e7da7296b6fcbdd64dcb8930870b35c8f
通过上面的代码,我们就得到了路径和 partiton id 的对应关系。接着遍历 partition 就行了。对应的 a 是分区号,b 则是分区的数据迭代器。接着做对应的写入操作就行。这些分区写入都是在各个 Executor 上执行的,并不是在 Driver 端,所以足够快。
我简单解释下代码 ,首先我把收集到的路径 zipWithIndex 这样就把路径和数字一一对应了 ;接着我新建了一个匿名类 实现了 Partitioner 。numPartitions 显然就是我们的路径集合的大小,遇到一个 key (其实就是个路径)时,则调用路径和数字的映射关系 ,然后就把所有的数据根据路径 hash 到不同的 partition 了 。接着遍历 partition 就行了,对应的 a 是分区号,b 则是分区的数据迭代器。接着做对应的写入操作就行。这些分区写入都是在各个 Executor 上执行的,并不是在 Driver 端,所以足够快。我们在测试集群上五分钟大概 1000-2000w 数据,90颗核,180G 内存,平均处理时间大概是2分钟左右。内存可以再降降  我估计 100G 足够了  。

在演示场景中,Spark Streaming 如何保证数据的完整性,不丢,不重

虽然 Spark Streaming 是作为一个24 * 7 不间断运行的程序来设计的,但是程序都会 crash ,那如果 crash 了,会不会导致数据丢失?会不会启动后重复消费?
关于这个,我也有专门的文章阐述( https://yq.aliyun.com/articles/60120),
我这里直接给出结论:
* 使用 Direct Approach 模式
* 启用 checkPoint 机制
做到上面两步,就可以保证数据至少被消费一次。
那如何保证不重复消费呢?
这个需要业务自己来保证。简单来说,业务有两种:
* 幂等的
* 自己保证事务
所谓幂等操作就是重复执行不会产生问题,如果是这种场景下,你不需要额外做任何工作。但如果你的应用场景是不允许数据被重复执行的,那只能通过业务自身的逻辑代码来解决了。
以当前场景为例,就是典型的幂等 ,因为可以做写覆盖 ,
8a95de8d680c824a1e41be21bb0f6feaf06331fc
具体代码如上 ,那如何保证写覆盖呢? 
文件名我采用了 job batch time 和 partition 的 id 作为名称。这样,如果假设系统从上一次失败的 job 重新跑的时候,相同的内容会被覆盖写,所以就不会存在重复的问题。

回顾

我们每分钟会有几百万条的日志进入系统,我们希望根据日志提取出时间以及用户名称,然后根据这两个信息形成  
  • userName/year/month/day/hh/normal 
  • userName/year/month/day/hh/delay 
路径,存储到HDFS中。如果我们发现日志产生的时间和到达的时间相差超过的一定的阈值,那么会放到 delay 目录,否则放在正常的 normal 目录。
我们作了四个方面的分析: 
  1. Spark Streaming 与 Storm 适用场景分析 ;
  2. Spark Streaming 与 Kafka 集成方案选型,我们推荐Direct Approach 方案 ;
  3. 自定义 Partitioner 实现日志文件快速存储到HDFS ;
  4. Spark Streaming 如何保证数据的完整性,不丢,不重 。

Q&A

Q1. spark streaming 可以直接在后面连上 elasticsearch 么?
A1. 可以的。透露下,我马上也要做类似的实践。
Q2. 公司选用 storm 是由于它可以针对每条日志只做一次处理,spark streaming 可以做到么?
A2.  spark streaming 是按时间周期的, 需要攒一段时间,再一次性对获得的所有数据做处理
Q3. 什么是文件句柄?
A3. HDFS 写入 你需要持有对应的文件的 client 。不可能来一条数据,就重新常见一个链接,然后用完就关掉。
Q4. Spark 分析流数据,分析好的数据怎么存到 mysql 比较好?
A4. 我没有这个实践过存储到 MySQL。一般数据量比较大,所以对接的会是 Reids/HBase/HDFS。
Q5. 有没有尝试过将数据写入 hive?
A5. 没有。但没有问题的。而且 Spark Streaming 里也可以使用 Spark SQL 。我不知道这会不会有帮助。
Q6. 幂等是什么?
A6. 就是反复操作不会有副作用。
Q7. 可不可以分享一下 spark 完整的应用场景?
A7. 这个有点太大。 目前 spark 覆盖了离线计算,数据分析,机器学习,图计算,流式计算等多个领域,目标也是一个通用的数据平台,所以一般你想到的都能用 spark 解决。
Q8. 如何理解日志产生时间和到达时间相差超过一定的阈值?
A8. 每条日志都会带上自己产生的时间。同时,如果这条日志到我们的系统太晚了,我们就认为这属于延时日志。
Q9. 目前这套体系稳定性如何?会不会有经常d节点的情况?
A9. 稳定性确实有待商榷 建议小范围尝试。
Q10. Spark Streaming 内部是如何设计并解决 storm 存在的两个问题的?老师能分析一下细节吗?
A10. 这和 Spark Streaming 的设计是相关的。微批处理模式使得我们可以一个周期打开所有文件句柄,然后直接写入几千万条数据,然后关闭。第二个是使用 partition 并行加快写入速度。
Q11. 如何应对网络抖动导致阻塞?
A11. Spark 本身有重试机制,还有各种超时机制。
Q12. 怎样保证消息的及时性?
A12. 依赖于数据源,kafka,Spark Streaming 是否处理能力充足,没有 delay . 所有环节都会影响消息的及时性。
Q13. 实际运用中,分析完的数据,本身有很大的结构关系,有时又需要对数据二次补充,处理完的数据量不大,该选哪种存储方式?
A13. 能用分布式存储的就用分布式存储。可以不做更新的,尽量不做更新。我一般推荐对接到 HBase 。
Q14. Streaming 字面是流的意思,倒是课程中提到对日志有延迟的考虑,是 Spark  Streaming 是自定一个周期,处理周期到达的数据集合,通俗讲感觉像批处理,不是每条记录不一定要有时间戳?
A14. 你理解对了。每条记录没有时间戳。如果有,也是日志自己带的。Spark Streaming 并不会给每条记录带上时间。
Q16. storm 避免重复是依赖 zookeeper,Spark Streaming 靠什么记录处理到哪行呢?
A16. 通过 checkpoint 机制,自己维护了 zookeeper 的偏移量。
Q17. 请问一下 Spark Streaming 处理日志数据的压测结果如何呢?
Q17. 刚刚说了,在我们的测试集群里, 1000-2000w 条记录,平均处理时间大约2分钟,90颗核,180G 内存。没有任何调优参数。理论内存可以继续降低,,因为不 cache 数据 。
Q18. AMQ 与他们之间区别和联系?
A18. AMQ 也是消息队列? Spark Streaming 支持相当多的消息队列。
Q19. 国内 spark 集群部署在哪些云上?
A19. 没有用过云。
Q21. zookeeper 目前 hbase 都不想依赖它了,因为会导致系统的不稳定,请问老师怎么看?
A21. 还好吧,产生问题主要是 client 太多。比如 hbase 依赖 zookeeper,所有使用 hbase 的,都需要先和 zookeeper 建立连接,这对 zookeeper 产生较大的压力。其他的系统也类似。如果共享 zookeeper 集群,那么它的连接数会成为一个瓶颈。
目录
相关文章
|
30天前
|
分布式计算 Apache 数据安全/隐私保护
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
35 1
|
1月前
|
分布式计算 资源调度 测试技术
“Spark Streaming异常处理秘籍:揭秘如何驯服实时数据流的猛兽,守护你的应用稳如泰山,不容错过!”
【8月更文挑战第7天】Spark Streaming 是 Apache Spark 中的关键组件,用于实时数据流处理。部署时可能遭遇数据问题、资源限制或逻辑错误等异常。合理处理这些异常对于保持应用稳定性至关重要。基础在于理解其异常处理机制,通过 DSC 将数据流切分为 RDD。对于数据异常,可采用 try-catch 结构捕获并处理;资源层面异常需优化 Spark 配置,如调整内存分配;逻辑异常则需加强单元测试及集成测试。结合监控工具,可全面提升应用的健壮性和可靠性。
52 3
|
3月前
|
分布式计算 Java Scala
如何处理 Spark Streaming 的异常情况?
【6月更文挑战第16天】如何处理 Spark Streaming 的异常情况?
168 56
|
2月前
|
分布式计算 监控 数据处理
Spark Streaming:解锁实时数据处理的力量
【7月更文挑战第15天】Spark Streaming作为Spark框架的一个重要组成部分,为实时数据处理提供了高效、可扩展的解决方案。通过其微批处理的工作模式和强大的集成性、容错性特性,Spark Streaming能够轻松应对各种复杂的实时数据处理场景。然而,在实际应用中,我们还需要根据具体需求和资源情况进行合理的部署和优化,以确保系统的稳定性和高效性。
|
2月前
|
分布式计算 Apache Spark
|
3月前
|
分布式计算 监控 大数据
spark实战:实现分区内求最大值,分区间求和以及获取日志文件固定日期的请求路径
spark实战:实现分区内求最大值,分区间求和以及获取日志文件固定日期的请求路径
|
4月前
|
SQL 分布式计算 关系型数据库
Spark 分析计算连续三周登录的用户数
本文介绍了如何使用窗口函数`range between`来查询`login_time`为2022-03-10的用户最近连续三周的登录数。首先在MySQL中创建`log_data`表并插入数据,接着定义需求为找出该日期前连续三周活跃的用户数。通过Spark SQL,分步骤实现:1)确定统计周期,2)筛选符合条件的数据,3)计算用户连续登录状态。在初始实现中出现错误,因未考虑日期在周中的位置,修正后正确计算出活跃用户数。
|
3月前
|
分布式计算 Serverless 数据处理
Serverless Spark计算服务
Serverless Spark计算服务
|
2月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
110 1
Spark快速大数据分析PDF下载读书分享推荐
|
1月前
|
分布式计算 资源调度 大数据
【决战大数据之巅】:Spark Standalone VS YARN —— 揭秘两大部署模式的恩怨情仇与终极对决!
【8月更文挑战第7天】随着大数据需求的增长,Apache Spark 成为关键框架。本文对比了常见的 Spark Standalone 与 YARN 部署模式。Standalone 作为自带的轻量级集群管理服务,易于设置,适用于小规模或独立部署;而 YARN 作为 Hadoop 的资源管理系统,支持资源的统一管理和调度,更适合大规模生产环境及多框架集成。我们将通过示例代码展示如何在这两种模式下运行 Spark 应用程序。
123 3