大数据开发笔记(八):Sparkstreaming

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: Spark Streaming接收Kafka、Flume、HDFS等各种来源的实时输入数据,可以使用诸如map、reduce、join等高级函数进行复杂算法的处理。最后还可以将处理结果存储到文件系统,处理结果保存到HDFS,数据库等。

一、Spark Streaming处理框架:


image.png


Spark Streaming接收Kafka、Flume、HDFS等各种来源的实时输入数据,可以使用诸如map、reduce、join等高级函数进行复杂算法的处理。最后还可以将处理结果存储到文件系统,处理结果保存到HDFS,数据库等。


二、SparkStreaming实时任务如何开发?


1. 数据的输入


1.1 socket(测试开发的时候使用起来很方便。)


1.2 HDFS(使用得很少)


1.3 Flume(也是很少)


1.4 自定义数据源(用得很少,我们公司里面没有出现过,但是不代表没有用。)


1.5 Kafka   真正企业里面使用的是kafka


2. 数据的处理:


企业里面怎么用?


2.1 RDD的那些算子


2.2 transform


2.3 updateStateByKey


2.4 mapWithState


2.5 Window窗口的计算


3. 数据的输出


3.1 print(测试的时候使用)


3.2 foreachRDD(允许用户对Dstream每一批数据对应的RDD本身做任意操作,企业里面也是使用的这个api)


这个就是真正项目上线的时候需要使用的API。


存入kafka,mysql,codis,reids,hbase


比如公司里面上班:


电梯:批处理,或者说是离线处理。


离线,数据量大


商场里面购物:


扶梯:实时处理,处理的是流数据


实时,每次处理的数据量不大。


三、spark组件类比:


image.png


SparkCore:核心计算引擎


1. 核心的抽象 RDD


2. 程序的入口


val conf=new SparkConf
val sc=new SparkContext(conf)


后面无非就是一些算子对RDD进行各种操作。


SparkStreaming


1. 核心的抽象 DStream(一个DStream包括多个RDD,加了时间维度(隔一定时间执行一套RDD),不同时间RDD变换)


2. 程序的入口


val conf=new SparkConf()
val ssc=new StremaingContext(conf,Seoncdss(1))

SparkSQL:


1. 核心的抽象 DataFrame/DataSet


2. 程序的入口


spark1.x:    val sqlContext=new SQLContext(conf)
spark2.x:      val spark=SparkSessionxxx


后面的操作无非就是对dataFream/dataset进行各种算子的操作


四、Sparkstreaming架构:


– Client:负责向Spark Streaming中灌入数据(flume kafka)


• 整个架构由3个模块组成:


– Master:记录Dstream之间的依赖关系或者血缘关系,并负责任务调度以生成新的RDD


– Worker:①从网络接收数据并存储到内存中  ②执行RDD计算

36264a89e61ea76ae9d393472f7fbdca_1eb2c44a6d307bd6768929b1e984fdfb.png

spark中driver=AM , executor=worker节点


五、SparkStreaming作业提交


• Network Input Tracker:跟踪每一个网络received数据,并且将其映射到相应的input Dstream上


• Job Scheduler:周期性的访问DStream Graph并生成Spark Job,将其交给Job Manager执行


• Job Manager:获取任务队列,并执行Spark任务

6356e7d37f61ec95bf33da74c9bd8d2c_db6e10e0c6125b965c218702137958b2.png


六、SparkStreaming窗口操作


Spark提供了一组窗口操作,通过滑动窗口技术对大规模数据的增量更新进行统计分析

9e9dcf598ce96b0630e7eeb2ee8f6418_ad11879bac478722a86b46c2a31e2f8a.png

• Window Operation:定时进行一定时间段内的数据处理(上图time 3 4 5  每个2秒,一共6秒)


• 任何基于窗口操作需要指定两个参数:


– 窗口总长度(window length)10s


– 滑动时间间隔(slide interval)  2s


执行代码前先启动nc -lk 9999

5561dce4ed0570b47bc30ae7dc460c47_daa8c1e5aa39367810e37e47886749ff.png

执行代码:整个窗口长度10s,每2秒打印一次


改代码:


1.上代码改为seconds(10),second(3),报错,必须为scc seconds的整数倍


2.改为seconds(9),second(2)也出错,如下图,也必须为上scc seconds整数倍


七、Sparkstreaming全局统计量


• 如果要使用updateStateByKey算子,就必须设置一个checkpoint目录,开启checkpoint机制


• 这样的话才能把每个key对应的state除了在内存中有,那么是不是也要checkpoint一份


• 因为你要长期保存一份key的state的话,那么spark streaming是要求必须用checkpoint的,以便于在内存数据丢失的时候,可以从checkpoint中恢复数据

f245f5b7ff39d9898fe7bfaccaf3a375_cf33506edda52db21b3309fb35548d0b.png

左10s统计结果,右再过2s统计结果,最后全局合并统计updatestatebykey,要开启checkpoint且先nc -lp 9999

eee5064085fbf7cd358df9084e6ebd5a_99ba8159f9cfde13fbd88f5fd4b0fceb.png

3dd79075addac3d3b1b092ce0cbc2830_a96b6310695e5cee3b4c2b90ca7911a7.png

再输入7个a后

3360a51cb02219f96dfe32cca0a0e6f5_23fea564a44d30defd5b6309d264640b.png


Sparkstreaming容错性分析

(RDD容错靠血缘关系DAG,sparkstreaming靠WAL)


• 实时的流式处理系统必须是7*24运行的,同时可以从各种各样的系统错误中恢复,在设计之初,Spark Streaming就支持driver和worker节点的错误恢复。


1. Worker容错:spark和rdd的保证worker节点的容错性。spark streaming构建在spark之上,所以它的worker节点也是同样的容错机制


•2.Driver容错:依赖WAL(WriteAheadLog)持久化日志


– 启动WAL需要做如下的配置


– 1:给streamingContext设置checkpoint的目录,该目录必须是HADOOP支持的文件系统hdfs,用来保存WAL和


做Streaming的checkpoint


– 2:spark.streaming.receiver.writeAheadLog.enable 设置为true; receiver才有WAL


Sparkstreaming中WAL简介

• Spark应用分布式运行的,如果driver进程挂了,所有的executor进程将不可用,保存在这些进程所


持有内存中的数据将会丢失。为了避免这些数据的丢失,Spark Streaming中引入了一个WAL.


• WAL在文件系统和数据库中用于数据操作的持久化,先把数据写到一个持久化的日志中,然后对数


据做操作,如果操作过程中系统挂了,恢复的时候可以重新读取日志文件再次进行操作。


• 如果WAL 启用了,所有接收到的数据会保存到一个日志文件中去(HDFS), 这样保存接收数据的持


久性,此外,如果只有在数据写入到log中之后接收器才向数据源确认,这样drive重启后那些保存在


内存中但是没有写入到log中的数据将会重新发送,这两点保证的数据的无丢失。


WAL工作原理

driver=AM  ,  executor=worker节点          


block+文件数据(代码)WAL   两部分结合

49faa900f01f88b0c9642e92eb83f9de_95b4375d1bcf9fbca8530e793517f0ca.png

1. 蓝色的箭头表示接收的数据:


– 接收器把数据流打包成块,存储在executor的内存中,如果开启了WAL,将会把数据写入到存在容错文


件系统的日志文件中


2. 青色的箭头表示提醒driver:


– 接收到的数据块的元信息发送给driver中的StreamingContext, 这些元数据包括:executor内存中数据


块的引用ID和日志文件中数据块的偏移信息


3. 红色箭头表示处理数据:


– 每一个批处理间隔,StreamingContext使用块信息用来生成RDD和jobs. SparkContext执行这些job用


于处理executor内存中的数据块


4. 黄色箭头表示checkpoint这些计算:


– 以便于恢复。流式处理会周期的被checkpoint到文件中

085ed1565aa9dd99ce9baf89575c5d1e_b30463ac401f6a45e1e9d5d3824c17cf.png


Sparkstreaming消费kafka

Spark Streaming 接受数据的方式有两种: 只有receive有wal,direct不需要


• Receiver-based Approach:offset存储在zookeeper,由Receiver维护,Spark获取数据存入executor中,调用


Kafka高阶API

99c2002a790c51adc90e9ebc9c0e0adf_03f7b9cbdc2aba3c2d4770ebcac82465.png

Direct Approach (No Receivers):offset自己存储和维护,由Spark维护,且可以从每个分区读取数据,调用Kafka低阶API

f4ae7af8562d0aefae614bca8439e5ba_1607bb3a5da15471aa85729eab347ee9.png


SparkstreamingonKafkaDirect

1. Direct的方式是会直接操作kafka底层的元数据信息


2. 由于直接操作的是kafka,kafka就相当于底层的文件系统(对应receiver的executor内存)。


3. 由于底层是直接读数据,没有所谓的Receiver,直接是周期性(Batch Intervel)的查询kafka,


处理数据的时候,我们会使用基于kafka原生的Consumer api来获取kafka中特定范围(offset


范围)中的数据。


4. 读取多个kafka partition,Spark也会创建RDD的partition ,这个时候RDD的partition和


kafka的partition是一致的。


5. 不需要开启wal机制,从数据零丢失的角度来看,极大的提升了效率,还至少能节省一倍的磁盘


空间。从kafka获取数据,比从hdfs获取数据,因为zero copy的方式,速度肯定更快。


Direct与 Receiver对比

• 从容错角度:


– Receiver(高层次的消费者API):在失败的情况下,有些数据很有可能会被处理不止一次。 接收到的数


据被可靠地保存到WAL中,但是还没有来得及更新Zookeeper中Kafka偏移量。导致数据不一致性:


Streaming知道数据被接收,但Kafka认为数据还没被接收。这样系统恢复正常时,Kafka会再一次发送这


些数据。at least once


– Direct(低层次消费者API):给出每个batch区间需要读取的偏移量位置,每个batch的Job被运行时,


对应偏移量的数据从Kafka拉取,偏移量信息也被可靠地存储(checkpoint),在从失败中恢复可以直接


读取这些偏移量信息。exactly once


Direct API消除了需要使用WAL的Receivers的情况,而且确保每个Kafka记录仅被接收一次并被高效地接收


。这就使得我们可以将Spark Streaming和Kafka很好地整合在一起。总体来说,这些特性使得流处理管道拥


有高容错性,高效性,而且很容易地被使用。


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
4月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之如何开发ODPS Spark任务
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
5月前
|
SQL 存储 分布式计算
ODPS开发大全:入门篇(3)
ODPS开发大全:入门篇
217 19
|
5月前
|
SQL 分布式计算 资源调度
ODPS开发大全:进阶篇(1)
ODPS开发大全:进阶篇
463 13
|
2月前
|
大数据 网络安全 数据安全/隐私保护
大数据-03-Hadoop集群 免密登录 超详细 3节点云 分发脚本 踩坑笔记 SSH免密 集群搭建(二)
大数据-03-Hadoop集群 免密登录 超详细 3节点云 分发脚本 踩坑笔记 SSH免密 集群搭建(二)
138 5
|
2月前
|
XML 大数据 网络安全
大数据-03-Hadoop集群 免密登录 超详细 3节点云 分发脚本 踩坑笔记 SSH免密 集群搭建(一)
大数据-03-Hadoop集群 免密登录 超详细 3节点云 分发脚本 踩坑笔记 SSH免密 集群搭建(一)
72 4
|
3月前
|
SQL 分布式计算 大数据
代码编码原则和规范大数据开发
此文档详细规定了SQL代码的编写规范,包括代码的清晰度,执行效率,以及注释的必要性。它强调所有SQL关键字需统一使用大写或小写,并禁止使用select *操作。此外,还规定了代码头部的信息模板,字段排列方式,INSERT, SELECT子句的格式,运算符的使用,CASE语句编写规则,查询嵌套规范,表别名定义,以及SQL注释的添加方法。这些规则有助于提升代码的可读性和可维护性。
63 0
|
3月前
|
SQL 分布式计算 大数据
大数据开发SQL代码编码原则和规范
这段SQL编码原则强调代码的功能完整性、清晰度、执行效率及可读性,通过统一关键词大小写、缩进量以及禁止使用模糊操作如select *等手段提升代码质量。此外,SQL编码规范还详细规定了代码头部信息、字段与子句排列、运算符前后间隔、CASE语句编写、查询嵌套、表别名定义以及SQL注释的具体要求,确保代码的一致性和维护性。
107 0
|
5月前
|
SQL 分布式计算 MaxCompute
SQL开发问题之对于ODPS中的UNION操作,执行计划的问题如何解决
SQL开发问题之对于ODPS中的UNION操作,执行计划的问题如何解决
|
5月前
|
SQL 分布式计算 MaxCompute
ODPS开发大全:入门篇(2)
ODPS开发大全:入门篇
139 14
|
5月前
|
存储 分布式计算 MaxCompute
构建NLP 开发问题之如何支持其他存储介质(如 HDFS、ODPS Volumn)在 transformers 框架中
构建NLP 开发问题之如何支持其他存储介质(如 HDFS、ODPS Volumn)在 transformers 框架中