4、DStream转换
DStream上的操作和RDD类型,分为转换和输出两种类型,此外转换操作中还有一些比较特殊的原语,如:transform()以及各种Window相关的原语。
4.1 无状态转换操作
无状态转换操作就是把RDD转换操作应用到DStream每个批次上,每个批次相互独立,自己算自己的。
4.1.1 常规无状态转换操作
DStream的部分无状态转换操作列表
需要注意的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD批次组成,且无状态转换操作是分别应用到每个RDD(应该批次的数据)上。
4.2 窗口操作
4.2.1 WindowOperations
1、Window Operations:可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Streaming的允许状态。
2、参数:所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。
(1)窗口时长:计算内容的时间范围
(2)滑动步长:隔多久触发一次计算
注意:这两者都必须为采集批次大小的整数倍。
4.2.2 Window
1、基本语法
window(windowLength, slideInterval): 基于对源DStream窗口的批次进行计算返回一个新的DStream。
2、 需求:统计WordCount:3秒一个批次,窗口6秒,滑步3秒。
3、代码实现
import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; /** * @ClassName Test03_SocketWindow * @Description TODO * @Author Zouhuiming * @Date 2023/7/3 21:27 * @Version 1.0 */ public class Test03_SocketWindow { public static void main(String[] args) throws InterruptedException { //1、创建SparkConf配置对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test03_SocketWindow"); //2、创建JavaStreamingContext对象,并且设置批次时间 JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(3L)); //3、编写业务逻辑 // 3.1 对接数据源,获取数据 JavaReceiverInputDStream<String> linesDStream = ssc.socketTextStream("hadoop102", 44443); // 3.2 切分数据并转换为kv JavaPairDStream<String, Integer> wordOneDStream = linesDStream.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() { @Override public Iterator<Tuple2<String, Integer>> call(String s) throws Exception { Iterator<String> iterator = Arrays.asList(s.split(" ")).iterator(); ArrayList<Tuple2<String, Integer>> resultList = new ArrayList<>(); while (iterator.hasNext()) { resultList.add(new Tuple2<>(iterator.next(), 1)); } return resultList.iterator(); } }); //3.3 添加窗口:窗口大小6s,滑动步长为3s JavaPairDStream<String, Integer> windowDStream = wordOneDStream.window(Duration.apply(6000L), Duration.apply(3000L)); //3.4 对开窗的数据流进行窗口计算 JavaPairDStream<String, Integer> reduceByKeyDStream = windowDStream.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); //3.5打印结果流 reduceByKeyDStream.print(); //4、启动并阻塞线程 ssc.start(); ssc.awaitTermination(); } }
运行结果:
因为添加了窗口,所以控制台打印了两次。
4、如果有多批数据进入窗口,最终也会通过window操作变成统一的RDD处理。
4.2.3 reduceByKeyAndWindow
1、基本语法
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]):
当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值。
2、需求:统计WordCount:3秒一个批次,窗口6秒,滑步3秒。
3、代码编写
import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; /** * @ClassName Test04_reduceByKeyAndWindow * @Description TODO * @Author Zouhuiming * @Date 2023/7/4 9:29 * @Version 1.0 */ public class Test04_reduceByKeyAndWindow { public static void main(String[] args) throws InterruptedException { //1、创建sparkConf配置对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test04_reduceByKeyAndWindow"); //2、创建JavaStreamingContext对象,并设置批次时间 JavaStreamingContext ssc = new JavaStreamingContext(conf, Duration.apply(3000L)); //3、业务逻辑 //3.1 对接数据源,获取数据 JavaReceiverInputDStream<String> lineDStream = ssc.socketTextStream("hadoop102", 44442); //3.2 切分数据并转换为kv JavaPairDStream<String, Integer> flatMapToPairDStream = lineDStream.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() { @Override public Iterator<Tuple2<String, Integer>> call(String s) throws Exception { Iterator<String> iterator = Arrays.stream(s.split(" ")).iterator(); ArrayList<Tuple2<String, Integer>> result = new ArrayList<>(); while (iterator.hasNext()) { result.add(new Tuple2<>(iterator.next(), 1)); } return result.iterator(); } }); //3.3 对数据流直接开窗计算 JavaPairDStream<String, Integer> resultDStream = flatMapToPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }, Duration.apply(6000L), Duration.apply(3000L)); //3.4 打印结果流 resultDStream.print(); //4、启动并阻塞线程 ssc.start(); ssc.awaitTermination(); } }
运行结果:
因为添加了窗口,所以控制台打印了两次。
5、DStream输出
DStream通常将数据输出到外部数据库或屏幕上。
DStream与RDD中的惰性求值类似,如果一个DStream及派生出的DStream都没有执行输出操作,那么这些DStream就都不会被求值。如果StreamingContext中没有设定输出操作,整个Context就都不启动。
1、输出操作API如下:
在企业开发操作中通常采用foreachRDD(),它用来对DStream中的RDD进行任意计算。可以让我们访问任意的RDD。在foreachRDD()中,可以重用在Spark中实现的所以行动算子操作(action算子)。
2、案例需求:将端口数据流进行窗口计算,并将结果输出到mysql的test_result库中的result表中。
3、编写数据库连接工具类JDBCUtil
import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; /** * @ClassName JDBCUtil * @Description TODO * @Author Zouhuiming * @Date 2023/7/4 9:55 * @Version 1.0 */ public class JDBCUtil { public static Connection connection; public static Connection getConnection(String user,String password){ try { //1、加载驱动类 Class.forName("com.mysql.jdbc.Driver"); connection= DriverManager.getConnection("jdbc:mysql://hadoop102:3306/test_result?useSSL=false&useUnicode=true&characterEncoding=UTF-8", user, password); } catch (ClassNotFoundException e) { throw new RuntimeException(e); } catch (SQLException e) { throw new RuntimeException(e); } return connection; } }
4、编写主类Test05_save对端口数据进行窗口计算并存储到MySQL中
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2; import java.sql.Connection; import java.sql.PreparedStatement; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; /** * @ClassName Test05_Dave * @Description TODO * @Author Zouhuiming * @Date 2023/7/4 9:58 * @Version 1.0 */ public class Test05_Save { /** *① 连接不能写在Driver层面(序列化) * ② 如果写在foreach则每个RDD中的每一条数据都创建,得不偿失; * ③ 增加foreachPartition,在分区创建(获取) */ public static void main(String[] args) throws InterruptedException { //1、创建SparkConf配置对象 SparkConf conf = new SparkConf(); conf.setMaster("local[*]"); conf.setAppName("Test05_Save"); //2、创建JavaStreamContext对象,并设置批次时间 JavaStreamingContext ssc = new JavaStreamingContext(conf, Duration.apply(3000L)); //3、读取数据开始业务逻辑计算 //3.1 对接数据源 JavaReceiverInputDStream<String> lineDStream = ssc.socketTextStream("hadoop102", 44442); //3.2 切分并转换为kv JavaPairDStream<String, Integer> flatMapToPairDStream = lineDStream.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() { @Override public Iterator<Tuple2<String, Integer>> call(String s) throws Exception { Iterator<String> iterator = Arrays.stream(s.split(" ")).iterator(); ArrayList<Tuple2<String, Integer>> resultList = new ArrayList<>(); while (iterator.hasNext()) { resultList.add(new Tuple2<>(iterator.next(), 1)); } return resultList.iterator(); } }); //3.3 按照key聚合value JavaPairDStream<String, Integer> resultDStream = flatMapToPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }, Duration.apply(6000L), Duration.apply(3000L)); //3.4 将数据保存到mysql中 resultDStream.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() { @Override public void call(JavaPairRDD<String, Integer> stringIntegerJavaPairRDD) throws Exception { stringIntegerJavaPairRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() { @Override public void call(Iterator<Tuple2<String, Integer>> tuple2Iterator) throws Exception { //TODO 在此处可创建JDBC连接,可在每个分区创建一个连接后,同一个分区重复使用 //3.4.1 创建数据库连接对象 Connection connection = JDBCUtil.getConnection("root", "123456"); //3.4.2 编写sql语句 String sql="insert into result values(?,?);"; //3.4.3 预编译sql语句 PreparedStatement preparedStatement = connection.prepareStatement(sql); //3.4.4 结果数据集储存到MySQL中 while (tuple2Iterator.hasNext()){ //3.4.5 获取数据 Tuple2<String, Integer> valueKV = tuple2Iterator.next(); preparedStatement.setString(1,valueKV._1); preparedStatement.setInt(2,valueKV._2); //3.4.6 执行sql preparedStatement.execute(); } //3.4.7关闭资源 connection.close(); } }); } }); //4、启动并阻塞线程 ssc.start(); ssc.awaitTermination(); } }
运行结果:
输入
输出(MySQL中的数据)
注意:因为是开窗了所以有两份。
5、总结
(1)连接不能写在Driver层面(序列化)
(2)如果写在foreach则每个RDD中的每条数据都创建
(3)增加foreachPartition,在分区创建
6、优雅关闭
流式任务需要7*24小时执行,但是有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所以配置优雅的关闭就显得至关重要了。
关闭方式:创建子线程,在子线程中监听指定路径,如果出现目标文件夹(stopSpark)就控制内部程序关闭。
1、主程序
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2; import java.sql.Connection; import java.sql.PreparedStatement; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; /** * @ClassName Test06_close * @Description TODO * @Author Zouhuiming * @Date 2023/7/4 10:46 * @Version 1.0 */ public class Test06_close { public static void main(String[] args) throws InterruptedException { //1、创建SparkConf对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test06_close"); //2、创建JavaStreamingContext对象并设置批次时间 JavaStreamingContext ssc = new JavaStreamingContext(conf, Duration.apply(3000L)); //3、业务逻辑 //3.1 对接数据源 JavaReceiverInputDStream<String> lineDStream = ssc.socketTextStream("hadoop102", 44441); //3.2 切分并转换为kv JavaPairDStream<String, Integer> flatMapToPairDStream = lineDStream.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() { @Override public Iterator<Tuple2<String, Integer>> call(String s) throws Exception { Iterator<String> iterator = Arrays.stream(s.split(" ")).iterator(); ArrayList<Tuple2<String, Integer>> resultList = new ArrayList<>(); while (iterator.hasNext()) { resultList.add(new Tuple2<>(iterator.next(), 1)); } return resultList.iterator(); } }); //3.3 按照key聚合value JavaPairDStream<String, Integer> resultDStream = flatMapToPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }, Duration.apply(6000L), Duration.apply(3000L)); //3.4 将结果保存到mysql中 resultDStream.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() { @Override public void call(JavaPairRDD<String, Integer> stringIntegerJavaPairRDD) throws Exception { stringIntegerJavaPairRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() { @Override public void call(Iterator<Tuple2<String, Integer>> tuple2Iterator) throws Exception { //TODO 在此处可创建JDBC连接,可在每个分区创建一个JDBC连接对象后,重复使用 //3.4.1 创建数据库对象 Connection connection = JDBCUtil.getConnection("root", "123456"); //3.4.2 编写sql语句 String sql="insert into result values(?,?);"; //3.4.3 预编译sql语句 PreparedStatement preparedStatement = connection.prepareStatement(sql); //3.4.4 将结果集储存到MySQL中 while (tuple2Iterator.hasNext()){ //3.4.5 获取数据 Tuple2<String, Integer> next = tuple2Iterator.next(); //3.4.6 向sql语句传参 preparedStatement.setString(1, next._1); preparedStatement.setInt(2, next._2); //3.4.7 执行sql语句 preparedStatement.execute(); } connection.close(); } }); } }); //TODO 启动监控线程 new Thread(new MonitorStopByDir(ssc)).start(); //4、启动并阻塞线程 ssc.start(); ssc.awaitTermination(); } }
2、编辑监听类
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.spark.streaming.StreamingContextState; import org.apache.spark.streaming.api.java.JavaStreamingContext; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; /** * @ClassName MonitorStopByDir * @Description TODO * @Author Zouhuiming * @Date 2023/7/4 10:58 * @Version 1.0 */ public class MonitorStopByDir implements Runnable{ private JavaStreamingContext ssc=null; public MonitorStopByDir(JavaStreamingContext ssc) { this.ssc = ssc; } /** * 监控程序:当指定的hdfs根目录下出现stopSpark文件夹,就停止ssc */ @Override public void run() { try { //1、获取hdfs文件对象 FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop102:8020"), new Configuration(), "zhm"); //2、轮询 while (true){ Thread.sleep(5000); boolean isStop = fileSystem.exists(new Path("/stopSpark")); if (isStop){ if (ssc.getState()== StreamingContextState.ACTIVE){ ssc.stop(true,true); System.exit(0); } } } } catch (IOException e) { throw new RuntimeException(e); } catch (URISyntaxException e) { throw new RuntimeException(e); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
3、测试
(1)发送数据
(2)启动Hadoop集群,并且创建目录/stopSpark
(3)IDEA控制台
恭喜大家看完了小编的博客,希望你能够有所收获,我一直相信躬身自问和沉思默想会充实我们的头脑,希望大家看完之后可以多想想喽,编辑不易,求关注、点赞、收藏(Thanks♪(・ω・)ノ)。