Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(二)

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(二)

4、DStream转换

DStream上的操作和RDD类型,分为转换和输出两种类型,此外转换操作中还有一些比较特殊的原语,如:transform()以及各种Window相关的原语。

4.1 无状态转换操作

无状态转换操作就是把RDD转换操作应用到DStream每个批次上,每个批次相互独立,自己算自己的。

4.1.1 常规无状态转换操作

DStream的部分无状态转换操作列表

image.png

需要注意的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD批次组成,且无状态转换操作是分别应用到每个RDD(应该批次的数据)上。

4.2 窗口操作

4.2.1 WindowOperations

1、Window Operations:可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Streaming的允许状态。

2、参数:所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。

(1)窗口时长:计算内容的时间范围

(2)滑动步长:隔多久触发一次计算

注意:这两者都必须为采集批次大小的整数倍。

4.2.2 Window

1、基本语法

window(windowLength, slideInterval): 基于对源DStream窗口的批次进行计算返回一个新的DStream。

2、 需求:统计WordCount:3秒一个批次,窗口6秒,滑步3秒。

3、代码实现

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
/**
 * @ClassName Test03_SocketWindow
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/3 21:27
 * @Version 1.0
 */
public class Test03_SocketWindow {
    public static void main(String[] args) throws InterruptedException {
        //1、创建SparkConf配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test03_SocketWindow");
        //2、创建JavaStreamingContext对象,并且设置批次时间
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(3L));
        //3、编写业务逻辑
        // 3.1 对接数据源,获取数据
        JavaReceiverInputDStream<String> linesDStream = ssc.socketTextStream("hadoop102", 44443);
        // 3.2 切分数据并转换为kv
        JavaPairDStream<String, Integer> wordOneDStream = linesDStream.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
            @Override
            public Iterator<Tuple2<String, Integer>> call(String s) throws Exception {
                Iterator<String> iterator = Arrays.asList(s.split(" ")).iterator();
                ArrayList<Tuple2<String, Integer>> resultList = new ArrayList<>();
                while (iterator.hasNext()) {
                    resultList.add(new Tuple2<>(iterator.next(), 1));
                }
                return resultList.iterator();
            }
        });
        //3.3 添加窗口:窗口大小6s,滑动步长为3s
        JavaPairDStream<String, Integer> windowDStream = wordOneDStream.window(Duration.apply(6000L), Duration.apply(3000L));
        //3.4 对开窗的数据流进行窗口计算
        JavaPairDStream<String, Integer> reduceByKeyDStream = windowDStream.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        //3.5打印结果流
        reduceByKeyDStream.print();
        //4、启动并阻塞线程
        ssc.start();
        ssc.awaitTermination();
    }
}

运行结果:

因为添加了窗口,所以控制台打印了两次。

4、如果有多批数据进入窗口,最终也会通过window操作变成统一的RDD处理。

4.2.3 reduceByKeyAndWindow

1、基本语法

reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]):

当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值。

2、需求:统计WordCount:3秒一个批次,窗口6秒,滑步3秒。

3、代码编写

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
/**
 * @ClassName Test04_reduceByKeyAndWindow
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/4 9:29
 * @Version 1.0
 */
public class Test04_reduceByKeyAndWindow {
    public static void main(String[] args) throws InterruptedException {
        //1、创建sparkConf配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test04_reduceByKeyAndWindow");
        //2、创建JavaStreamingContext对象,并设置批次时间
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Duration.apply(3000L));
        //3、业务逻辑
        //3.1 对接数据源,获取数据
        JavaReceiverInputDStream<String> lineDStream = ssc.socketTextStream("hadoop102", 44442);
        //3.2 切分数据并转换为kv
        JavaPairDStream<String, Integer> flatMapToPairDStream = lineDStream.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
            @Override
            public Iterator<Tuple2<String, Integer>> call(String s) throws Exception {
                Iterator<String> iterator = Arrays.stream(s.split(" ")).iterator();
                ArrayList<Tuple2<String, Integer>> result = new ArrayList<>();
                while (iterator.hasNext()) {
                    result.add(new Tuple2<>(iterator.next(), 1));
                }
                return result.iterator();
            }
        });
        //3.3 对数据流直接开窗计算
        JavaPairDStream<String, Integer> resultDStream = flatMapToPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }, Duration.apply(6000L), Duration.apply(3000L));
        //3.4 打印结果流
        resultDStream.print();
        //4、启动并阻塞线程
        ssc.start();
        ssc.awaitTermination();
    }
}

运行结果:

因为添加了窗口,所以控制台打印了两次。

5、DStream输出

DStream通常将数据输出到外部数据库或屏幕上。

DStream与RDD中的惰性求值类似,如果一个DStream及派生出的DStream都没有执行输出操作,那么这些DStream就都不会被求值。如果StreamingContext中没有设定输出操作,整个Context就都不启动。

1、输出操作API如下:

image.png

在企业开发操作中通常采用foreachRDD(),它用来对DStream中的RDD进行任意计算。可以让我们访问任意的RDD。在foreachRDD()中,可以重用在Spark中实现的所以行动算子操作(action算子)。

2、案例需求:将端口数据流进行窗口计算,并将结果输出到mysql的test_result库中的result表中。

3、编写数据库连接工具类JDBCUtil

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
/**
 * @ClassName JDBCUtil
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/4 9:55
 * @Version 1.0
 */
public class JDBCUtil {
    public static Connection connection;
    public static Connection getConnection(String user,String password){
        try {
            //1、加载驱动类
            Class.forName("com.mysql.jdbc.Driver");
            connection= DriverManager.getConnection("jdbc:mysql://hadoop102:3306/test_result?useSSL=false&useUnicode=true&characterEncoding=UTF-8",
                    user,
                    password);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
        return connection;
    }
}

4、编写主类Test05_save对端口数据进行窗口计算并存储到MySQL中

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
/**
 * @ClassName Test05_Dave
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/4 9:58
 * @Version 1.0
 */
public class Test05_Save {
    /**
     *① 连接不能写在Driver层面(序列化)
     * ② 如果写在foreach则每个RDD中的每一条数据都创建,得不偿失;
     * ③ 增加foreachPartition,在分区创建(获取)
     */
    public static void main(String[] args) throws InterruptedException {
        //1、创建SparkConf配置对象
        SparkConf conf = new SparkConf();
        conf.setMaster("local[*]");
        conf.setAppName("Test05_Save");
        //2、创建JavaStreamContext对象,并设置批次时间
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Duration.apply(3000L));
        //3、读取数据开始业务逻辑计算
        //3.1 对接数据源
        JavaReceiverInputDStream<String> lineDStream = ssc.socketTextStream("hadoop102", 44442);
        //3.2 切分并转换为kv
        JavaPairDStream<String, Integer> flatMapToPairDStream = lineDStream.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
            @Override
            public Iterator<Tuple2<String, Integer>> call(String s) throws Exception {
                Iterator<String> iterator = Arrays.stream(s.split(" ")).iterator();
                ArrayList<Tuple2<String, Integer>> resultList = new ArrayList<>();
                while (iterator.hasNext()) {
                    resultList.add(new Tuple2<>(iterator.next(), 1));
                }
                return resultList.iterator();
            }
        });
        //3.3 按照key聚合value
        JavaPairDStream<String, Integer> resultDStream = flatMapToPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }, Duration.apply(6000L), Duration.apply(3000L));
        //3.4 将数据保存到mysql中
        resultDStream.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
            @Override
            public void call(JavaPairRDD<String, Integer> stringIntegerJavaPairRDD) throws Exception {
                stringIntegerJavaPairRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {
                    @Override
                    public void call(Iterator<Tuple2<String, Integer>> tuple2Iterator) throws Exception {
                        //TODO 在此处可创建JDBC连接,可在每个分区创建一个连接后,同一个分区重复使用
                        //3.4.1 创建数据库连接对象
                        Connection connection = JDBCUtil.getConnection("root", "123456");
                        //3.4.2 编写sql语句
                        String sql="insert into result values(?,?);";
                        //3.4.3 预编译sql语句
                        PreparedStatement preparedStatement = connection.prepareStatement(sql);
                        //3.4.4 结果数据集储存到MySQL中
                        while (tuple2Iterator.hasNext()){
                            //3.4.5 获取数据
                            Tuple2<String, Integer> valueKV = tuple2Iterator.next();
                            preparedStatement.setString(1,valueKV._1);
                            preparedStatement.setInt(2,valueKV._2);
                            //3.4.6 执行sql
                            preparedStatement.execute();
                        }
                        //3.4.7关闭资源
                        connection.close();
                    }
                });
            }
        });
        //4、启动并阻塞线程
        ssc.start();
        ssc.awaitTermination();
    }
}

运行结果:

输入

输出(MySQL中的数据)

注意:因为是开窗了所以有两份。

5、总结

(1)连接不能写在Driver层面(序列化)

(2)如果写在foreach则每个RDD中的每条数据都创建

(3)增加foreachPartition,在分区创建

6、优雅关闭

流式任务需要7*24小时执行,但是有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所以配置优雅的关闭就显得至关重要了。

关闭方式:创建子线程,在子线程中监听指定路径,如果出现目标文件夹(stopSpark)就控制内部程序关闭。

1、主程序

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
/**
 * @ClassName Test06_close
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/4 10:46
 * @Version 1.0
 */
public class Test06_close {
    public static void main(String[] args) throws InterruptedException {
        //1、创建SparkConf对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test06_close");
        //2、创建JavaStreamingContext对象并设置批次时间
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Duration.apply(3000L));
        //3、业务逻辑
        //3.1 对接数据源
        JavaReceiverInputDStream<String> lineDStream = ssc.socketTextStream("hadoop102", 44441);
        //3.2 切分并转换为kv
        JavaPairDStream<String, Integer> flatMapToPairDStream = lineDStream.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
            @Override
            public Iterator<Tuple2<String, Integer>> call(String s) throws Exception {
                Iterator<String> iterator = Arrays.stream(s.split(" ")).iterator();
                ArrayList<Tuple2<String, Integer>> resultList = new ArrayList<>();
                while (iterator.hasNext()) {
                    resultList.add(new Tuple2<>(iterator.next(), 1));
                }
                return resultList.iterator();
            }
        });
        //3.3 按照key聚合value
        JavaPairDStream<String, Integer> resultDStream = flatMapToPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }, Duration.apply(6000L), Duration.apply(3000L));
        //3.4 将结果保存到mysql中
        resultDStream.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
            @Override
            public void call(JavaPairRDD<String, Integer> stringIntegerJavaPairRDD) throws Exception {
                stringIntegerJavaPairRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {
                    @Override
                    public void call(Iterator<Tuple2<String, Integer>> tuple2Iterator) throws Exception {
                        //TODO 在此处可创建JDBC连接,可在每个分区创建一个JDBC连接对象后,重复使用
                        //3.4.1 创建数据库对象
                        Connection connection = JDBCUtil.getConnection("root", "123456");
                        //3.4.2 编写sql语句
                        String sql="insert into result values(?,?);";
                        //3.4.3 预编译sql语句
                        PreparedStatement preparedStatement = connection.prepareStatement(sql);
                        //3.4.4 将结果集储存到MySQL中
                        while (tuple2Iterator.hasNext()){
                            //3.4.5 获取数据
                            Tuple2<String, Integer> next = tuple2Iterator.next();
                            //3.4.6 向sql语句传参
                            preparedStatement.setString(1, next._1);
                            preparedStatement.setInt(2, next._2);
                            //3.4.7 执行sql语句
                            preparedStatement.execute();
                        }
                        connection.close();
                    }
                });
            }
        });
        //TODO 启动监控线程
        new Thread(new MonitorStopByDir(ssc)).start();
        //4、启动并阻塞线程
        ssc.start();
        ssc.awaitTermination();
    }
}

2、编辑监听类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.streaming.StreamingContextState;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
 * @ClassName MonitorStopByDir
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/4 10:58
 * @Version 1.0
 */
public class MonitorStopByDir implements Runnable{
    private JavaStreamingContext ssc=null;
    public MonitorStopByDir(JavaStreamingContext ssc) {
        this.ssc = ssc;
    }
    /**
     * 监控程序:当指定的hdfs根目录下出现stopSpark文件夹,就停止ssc
     */
    @Override
    public void run() {
        try {
            //1、获取hdfs文件对象
            FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop102:8020"), new Configuration(), "zhm");
            //2、轮询
            while (true){
                Thread.sleep(5000);
                boolean isStop = fileSystem.exists(new Path("/stopSpark"));
                if (isStop){
                    if (ssc.getState()== StreamingContextState.ACTIVE){
                        ssc.stop(true,true);
                        System.exit(0);
                    }
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

3、测试

(1)发送数据

(2)启动Hadoop集群,并且创建目录/stopSpark

(3)IDEA控制台

恭喜大家看完了小编的博客,希望你能够有所收获,我一直相信躬身自问和沉思默想会充实我们的头脑,希望大家看完之后可以多想想喽,编辑不易,求关注、点赞、收藏(Thanks♪(・ω・)ノ)

相关文章
|
1月前
|
分布式计算 API Spark
Spark学习--day05、SparkCore电商网站实操、SparkCore-工程代码
Spark学习--day05、SparkCore电商网站实操、SparkCore-工程代码
66 11
|
1月前
|
消息中间件 分布式计算 Kafka
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
52 5
|
1月前
|
消息中间件 Java Kafka
Kafka【环境搭建 01】kafka_2.12-2.6.0 单机版安装+参数配置及说明+添加到service服务+开机启动配置+验证+chkconfig配置说明(一篇入门kafka)
【2月更文挑战第19天】Kafka【环境搭建 01】kafka_2.12-2.6.0 单机版安装+参数配置及说明+添加到service服务+开机启动配置+验证+chkconfig配置说明(一篇入门kafka)
55 1
|
3天前
|
分布式计算 大数据 数据处理
[AIGC大数据基础] Spark 入门
[AIGC大数据基础] Spark 入门
|
1月前
|
SQL 分布式计算 Java
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
107 1
|
1月前
|
分布式计算 Spark 索引
Spark学习---day07、Spark内核(Shuffle、任务执行)
Spark学习---day07、Spark内核(源码提交流程、任务执行)
41 2
|
1月前
|
分布式计算 监控 Java
Spark学习---day06、Spark内核(源码提交流程、任务执行)
Spark学习---day06、Spark内核(源码提交流程、任务执行)
41 2
|
消息中间件 分布式计算 大数据
【Spark Summit East 2017】 使用Kafka Connect和Spark Streaming构建实时数据管道
本讲义出自Ewen Cheslack Postava在Spark Summit East 2017上的演讲,主要介绍了面对使用Spark Streaming构建大规模实时数据管道的挑战,Kafka项目最近推出了新的工具—— Kafka Connect,该工具将帮助简化从Kafka导入和导出数据,Ewen Cheslack Postava分享了如何使用Kafka Connect和Spark Streaming构建实时数据管道。
2224 0
|
3月前
|
机器学习/深度学习 SQL 分布式计算
Apache Spark 的基本概念和在大数据分析中的应用
介绍 Apache Spark 的基本概念和在大数据分析中的应用
161 0
|
15天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。