Spark Streaming之foreachRDD操作详解

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 笔记

DStream中的所有计算,都是由output操作触发的,比如print()。如果没有任何output操作, 那么,压根儿就不会执行定义的计算逻辑。


此外,即使你使用了foreachRDD output操作,也必须在里面对RDD执行action操作,才能触 发对每一个batch的计算逻辑。否则,光有foreachRDD output操作,在里面没有对RDD执行 action操作,也不会触发任何逻辑。1.png通常在foreachRDD中,都会创建一个Connection,比如JDBC Connection,然后通过Connection将数据写入外部存储。


误区一:在RDD的foreach操作外部,创建Connection

这种方式是错误的,因为它会导致Connection对象被序列化后传输到每个Task中。而这种Connection对象, 实际上一般是不支持序列化的,也就无法被传输。

dstream.foreachRDD( rdd => {
  val connection = createNewConnection() 
  rdd.foreach( record => {
    connection.send(record) 
  } 
})

误区二:在RDD的foreach操作内部,创建Connection

这种方式是可以的,但是效率低下。因为它会导致对于RDD中的每一条数据,都创建一个Connection对象。 而通常来说,Connection的创建,是很消耗性能的。

dstream.foreachRDD( rdd => {
  rdd.foreach( record => {
    val connection = createNewConnection() 
    connection.send(record) 
    connection.close() 
  })
})

合理方式一:使用RDD的foreachPartition操作,并且在该操作内部,创建Connection对象

这样就相当于为RDD的每个partition创建一个Connection对象,节省资源的多了。

dstream.foreachRDD( rdd => {
  rdd.foreachPartition( partitionOfRecords => {
    val connection = createNewConnection()
    partitionOfRecords.foreach( record => {
      connection.send(record)
      connection.close() 
    })
  })
})

合理方式二:自己手动封装一个静态连接池,使用RDD的foreachPartition操作,并且在该操作内部,从 静态连接池中,通过静态方法,获取到一个连接,使用之后再还回去。

这样的话,甚至在多个RDD的 partition之间,也可以复用连接了。而且可以让连接池采取懒创建的策略,并且空闲一段时间后,将其释放掉。

dstream.foreachRDD( rdd => {
  rdd.foreachPartition( partitionOfRecords => {
    val connection = ConnectionPool.getConnection() 
    partitionOfRecords.foreach( record => {
      connection.send(record)
    )}
    ConnectionPool.returnConnection(connection) 
  })
})

案例:改写UpdateStateByKeyWordCount,将每次统计出来的全局的单词计数,写入一份,到MySQL数 据库中。

首先封装一个静态连接池:

package com.kfk.spark.common;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.LinkedList;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/18
 * @time : 7:26 下午
 */
public class ConnectionPool {
    private static LinkedList<Connection> connectionQueue;
    /**
     * 加载驱动
     */
    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
    /**
     * 获取连接,多线程访问并发控制
     * @return
     */
    public synchronized static Connection getConnection(){
        try {
            if (connectionQueue == null){
                connectionQueue = new LinkedList<Connection>();
                for (int i = 0;i < 10;i++){
                    Connection conn = DriverManager.getConnection(
                            "jdbc:mysql://bigdata-pro-m04:3306/spark?useSSL=false",
                            "root",
                            "199911"
                    );
                    connectionQueue.push(conn);
                }
            }
        } catch (Exception e){
            e.printStackTrace();
        }
        return connectionQueue.poll();
    }
    /**
     * 还回去一个连接
     * @param conn
     */
    public static void returnConnection(Connection conn){
        connectionQueue.push(conn);
    }
}

编写实现功能代码:

package com.kfk.spark.common;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/14
 * @time : 8:23 下午
 */
public class CommStreamingContext {
    public static JavaStreamingContext getJssc(){
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("CommStreamingContext");
        return new JavaStreamingContext(conf, Durations.seconds(2));
    }
}
package com.kfk.spark.foreachrdd_project;
import com.kfk.spark.common.CommStreamingContext;
import com.kfk.spark.common.ConnectionPool;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.sql.Connection;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/18
 * @time : 7:49 下午
 */
public class ForeachPersistMySQL {
    public static void main(String[] args) throws InterruptedException {
        JavaStreamingContext jssc = CommStreamingContext.getJssc();
        // 要使用UpdateStateByKey算子就必须设置一个Checkpoint目录,开启Checkpoint机制
        // 以便于内存数据丢失时,可以从Checkpoint中恢复数据
        jssc.checkpoint("hdfs://bigdata-pro-m04:9000/user/caizhengjie/datas/sparkCheckpoint");
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("bigdata-pro-m04",9999);
        // flatmap
        JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        // map
        JavaPairDStream<String,Integer> pair =  words.mapToPair(word -> new Tuple2<>(word,1));
        // 通过spark来维护一份每个单词的全局统计次数
        JavaPairDStream<String,Integer> wordcount = pair.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
            @Override
            public Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception {
                Integer newValues = 0;
                if (state.isPresent()){
                    newValues = state.get();
                }
                for (Integer value : values){
                    newValues += value;
                }
                return Optional.of(newValues);
            }
        });
        // foreachRDD
        wordcount.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
            @Override
            public void call(JavaPairRDD<String, Integer> stringIntegerJavaPairRdd) throws Exception {
                stringIntegerJavaPairRdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {
                    @Override
                    public void call(Iterator<Tuple2<String, Integer>> tuple2Iterator) throws Exception {
                        Tuple2<String, Integer> wordcount = null;
                        Connection conn = ConnectionPool.getConnection();
                        while (tuple2Iterator.hasNext()){
                            wordcount = tuple2Iterator.next();
                            String sql = "insert into spark.wordcount(word,count) values('"+wordcount._1+"', '"+wordcount._2+"')";
                            Statement statement = conn.createStatement();
                            statement.executeUpdate(sql);
                        }
                        ConnectionPool.returnConnection(conn);
                    }
                });
            }
        });
        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }
}





相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
2月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
45 0
|
2月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
101 0
|
1月前
|
分布式计算 流计算 Spark
【赵渝强老师】Spark Streaming中的DStream
本文介绍了Spark Streaming的核心概念DStream,即离散流。DStream通过时间间隔将连续的数据流转换为一系列不连续的RDD,再通过Transformation进行转换,实现流式数据的处理。文中以MyNetworkWordCount程序为例,展示了DStream生成RDD的过程,并附有视频讲解。
|
2月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
59 0
|
2月前
|
SQL 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
49 0
|
2月前
|
存储 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
52 0
|
2月前
|
SQL 分布式计算 大数据
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
39 0
|
分布式计算 大数据 Spark
Spark 操作算子本质、RDD 容错_1 | 学习笔记
快速学习 Spark 操作算子本质、RDD 容错_1
125 0
Spark 操作算子本质、RDD 容错_1 | 学习笔记
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
130 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
72 0
下一篇
DataWorks