This is a record of my own learning journey. To fellow travelers who come across it: may your heart have something to strive for, and may you not fear that the road is long and hard;
May each of us, like the stars, calmly and unhurriedly keep walking our own path toward the goals we have set;
One last thought: a gentleman does not hide his shortcomings; when he does not know, he asks, and when he cannot do, he learns.
If you find this write-up helpful, a follow, like, or bookmark would be greatly appreciated. Thank you!
1. Spark Streaming Overview
1.1 What Is Spark Streaming
Spark Streaming is used for processing streaming data.
(1) Spark Streaming supports many input sources, such as Kafka, Flume, and HDFS.
(2) The input data can be processed with Spark's high-level primitives such as map, reduce, join, and window.
(3) The results can be written to many destinations, such as HDFS and databases.
1.2 Spark Streaming Architecture
1.2.1 What Is a DStream
Spark Core ==> RDD
Spark SQL ==> DataFrame, DataSet
Spark Streaming uses a discretized stream, called a DStream, as its abstraction.
A DStream is the sequence of data received over time.
Within a DStream, the data received in each time interval exists as an RDD, and the DStream is the sequence formed by these RDDs (hence the name "discretized").
So, simply put, a DStream is a wrapper around RDDs for real-time data processing scenarios.
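To make the "sequence of RDDs" idea concrete, here is a minimal, self-contained sketch (not from the original post; the class name and sample data are made up for illustration). It builds a DStream from a queue of RDDs and uses foreachRDD to show that each micro-batch surfaces as an ordinary RDD:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;

public class DStreamAsRddSequence {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("DStreamAsRddSequence");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1L));

        // Each RDD pushed into this queue becomes one micro-batch of the DStream.
        Queue<JavaRDD<Integer>> rddQueue = new LinkedList<>();
        rddQueue.add(ssc.sparkContext().parallelize(Arrays.asList(1, 2, 3)));
        rddQueue.add(ssc.sparkContext().parallelize(Arrays.asList(4, 5, 6)));

        JavaDStream<Integer> dstream = ssc.queueStream(rddQueue);

        // foreachRDD exposes the plain RDD behind every batch interval.
        dstream.foreachRDD((rdd, time) ->
                System.out.println("batch " + time + " -> " + rdd.collect()));

        ssc.start();
        ssc.awaitTermination();
    }
}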
1.2.2 Architecture Diagrams
1. Overall architecture diagram
2. Spark Streaming architecture diagram
1.2.3 Backpressure Mechanism
1. Before Spark 1.5:
Users could limit the Receiver's data ingest rate by setting the static configuration property spark.streaming.receiver.maxRate.
Advantage: capping the receive rate adapts ingestion to the current processing capacity and prevents out-of-memory errors, although it also introduces other problems.
Drawback: when the producer generates data faster than maxRate and the cluster could also process faster than maxRate, the cap causes problems such as reduced resource utilization.
2. Spark 1.5 and later:
Spark Streaming can dynamically control the data ingest rate to match the cluster's processing capacity.
Backpressure (Spark Streaming Backpressure):
Based on the job execution feedback reported by the JobScheduler, Spark dynamically adjusts the Receiver's ingest rate.
Whether backpressure is enabled is controlled by the property spark.streaming.backpressure.enabled; the default value is false, i.e. disabled.
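As a hedged illustration (the class name and the values are arbitrary and only meant to show where the properties go), both rate-control settings mentioned above are ordinary SparkConf entries set before the streaming context is created:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class RateControlDemo {
    public static void main(String[] args) {
        // Illustrative values only; tune them for your own workload.
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("RateControlDemo")
                // Static cap (pre-1.5 style): maximum records per second per receiver.
                .set("spark.streaming.receiver.maxRate", "1000")
                // Spark 1.5 and later: adapt the ingest rate dynamically (default: false).
                .set("spark.streaming.backpressure.enabled", "true");

        // The rest of the application is built on this conf as usual.
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(3L));
    }
}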
1.3 Spark Streaming Features
1. Easy to use
2. Fault tolerant
3. Easy to integrate into the Spark ecosystem
2. Getting Started with DStream
2.1 WordCount in Practice
1. Requirement: use the netcat tool to continuously send data to a port (the code below connects to hadoop102:44444), read the port data with Spark Streaming, and count how many times each distinct word appears.
2. Add the dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.3.0</version>
    </dependency>
</dependencies>
3. Write the code
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
 * @ClassName Test01_wordCount
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/3 20:49
 * @Version 1.0
 */
public class Test01_wordCount {
    public static void main(String[] args) throws InterruptedException {
        // TODO Step 1: create the SparkConf object
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test01_wordCount");

        // TODO Step 2: create the JavaStreamingContext object and set the batch interval
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(3L));

        // TODO Step 3: read the data and start the business logic
        // 1. Connect to the data source and receive data
        JavaReceiverInputDStream<String> lineDStream = ssc.socketTextStream("hadoop102", 44444);

        // 2. Split each line into words
        JavaDStream<String> flatMapDStream = lineDStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });

        // 3. Map word -> (word, 1)
        JavaPairDStream<String, Integer> javaPairDStream = flatMapDStream.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });

        // 4. Count the occurrences of each word
        JavaPairDStream<String, Integer> reduceByKeyDStream = javaPairDStream.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // 5. Print the results
        reduceByKeyDStream.print();

        // TODO Step 4: start the job and block the main thread
        ssc.start();
        ssc.awaitTermination();
    }
}
4. Change the logging level
If you do not want the program to print large amounts of log output at runtime, add a log4j2.properties file to the resources folder with the following logging configuration.
# Set everything to be logged to the console
rootLogger.level = ERROR
rootLogger.appenderRef.stdout.ref = console

# In the pattern layout configuration below, we specify an explicit `%ex` conversion
# pattern for logging Throwables. If this was omitted, then (by default) Log4J would
# implicitly add an `%xEx` conversion pattern which logs stacktraces with additional
# class packaging information. That extra information can sometimes add a substantial
# performance overhead, so we disable it in our default logging config.
# For more information, see SPARK-39361.
appender.console.type = Console
appender.console.name = console
appender.console.target = SYSTEM_ERR
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex

# Set the default spark-shell/spark-sql log level to WARN. When running the
# spark-shell/spark-sql, the log level for these classes is used to overwrite
# the root logger's log level, so that the user can have different defaults
# for the shell and regular Spark apps.
logger.repl.name = org.apache.spark.repl.Main
logger.repl.level = warn

logger.thriftserver.name = org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
logger.thriftserver.level = warn

# Settings to quiet third party logs that are too verbose
logger.jetty1.name = org.sparkproject.jetty
logger.jetty1.level = warn
logger.jetty2.name = org.sparkproject.jetty.util.component.AbstractLifeCycle
logger.jetty2.level = error
logger.replexprTyper.name = org.apache.spark.repl.SparkIMain$exprTyper
logger.replexprTyper.level = info
logger.replSparkILoopInterpreter.name = org.apache.spark.repl.SparkILoop$SparkILoopInterpreter
logger.replSparkILoopInterpreter.level = info
logger.parquet1.name = org.apache.parquet
logger.parquet1.level = error
logger.parquet2.name = parquet
logger.parquet2.level = error

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
logger.RetryingHMSHandler.name = org.apache.hadoop.hive.metastore.RetryingHMSHandler
logger.RetryingHMSHandler.level = fatal
logger.FunctionRegistry.name = org.apache.hadoop.hive.ql.exec.FunctionRegistry
logger.FunctionRegistry.level = error

# For deploying Spark ThriftServer
# SPARK-34128: Suppress undesirable TTransportException warnings involved in THRIFT-4805
appender.console.filter.1.type = RegexFilter
appender.console.filter.1.regex = .*Thrift error occurred during processing of message.*
appender.console.filter.1.onMatch = deny
appender.console.filter.1.onMismatch = neutral
5. Start the program and send data through netcat (start nc first, then start the program in IDEA):
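For example, assuming netcat is installed on hadoop102 (the host and port the code above connects to; exact flags depend on your netcat variant), start the listener first and then launch the program from IDEA:

nc -lk 44444

Words typed into the netcat session should then appear in the IDEA console as (word, count) pairs roughly every 3 seconds.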
2.2 WordCount Explained
In Spark Streaming, DStream is the basic abstraction; it represents both the input data stream and the result stream produced by the operators. Spark Streaming still processes streaming data with a batch-oriented (micro-batch) approach: internally, each batch of data is wrapped as an RDD, a DStream is a sequence of such RDDs, and the Spark engine then applies the transformations to these RDDs. The computation of each batch is independent of every other batch.
3. Kafka Data Sources
3.1 Choosing an API
1. ReceiverAPI: a dedicated Executor receives the data and then sends it to other Executors for computation.
Problem: the Executor that receives the data and the Executors that compute on it run at different speeds; in particular, when the receiving Executor is faster than the computing Executors, the computing nodes can run out of memory.
2. DirectAPI: the computing Executors actively consume the Kafka data themselves, so the rate is controlled by the consumers.
3.2 Consuming a Kafka Data Source
1. Requirement: use Spark Streaming to read data from a Kafka topic and print the output to the console.
2. Add the dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>3.3.0</version>
    </dependency>
</dependencies>
3. Write the code
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;

/**
 * @ClassName Test02_kafkaDirectAuto
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/3 21:05
 * @Version 1.0
 */
public class Test02_kafkaDirectAuto {
    public static void main(String[] args) throws InterruptedException {
        // 1. Create the configuration object
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test02_kafkaDirectAuto");

        // 2. Create the JavaStreamingContext object
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(3L));

        // 3. Business logic
        // 4. Define the Kafka topics to consume
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");

        // 5. Define the Kafka consumer configuration and create the consumer strategy
        HashMap<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "ssID");
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        ConsumerStrategy<Object, Object> consumerStrategy = ConsumerStrategies.Subscribe(topics, kafkaParams);

        // 6. Connect to Kafka
        JavaInputDStream<ConsumerRecord<Object, Object>> kafkaDStream =
                KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent(), consumerStrategy);

        // 7. Convert the data structure: consumerRecord ==> consumerRecord.value
        JavaDStream<String> lineDStream = kafkaDStream.map(new Function<ConsumerRecord<Object, Object>, String>() {
            @Override
            public String call(ConsumerRecord<Object, Object> v1) throws Exception {
                return v1.value().toString();
            }
        });

        // 8. Split each line into words
        JavaDStream<String> flatMapDStream = lineDStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.stream(s.split(" ")).iterator();
            }
        });

        // 9. Convert the data structure: word -> (word, 1)
        JavaPairDStream<String, Integer> mapToPairDStream = flatMapDStream.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });

        // 10. Aggregate the values by key
        JavaPairDStream<String, Integer> reduceByKeyDStream = mapToPairDStream.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // 11. Print the result stream
        reduceByKeyDStream.print();

        // x. Start the job and block the main thread
        ssc.start();
        ssc.awaitTermination();
    }
}
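To feed the job some test data, one option (assuming a standard Kafka installation on the brokers configured above; the script name and flags below follow recent Kafka releases) is Kafka's console producer:

kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first

Each line produced to the first topic is split into words, and the per-batch word counts are printed to the console every 3 seconds.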
Run results