Spark学习--3、WordCount案例、RDD序列化、RDD依赖关系、RDD持久化(二)

简介: Spark学习--3、WordCount案例、RDD序列化、RDD依赖关系、RDD持久化(二)

4、RDD持久化

4.1 RDD Cache缓存

1、RDD Cache缓存

(1)RDD通过Cache或者persist方法将前面的计算结果缓存

(2)默认情况下会把数据以序列化的形式缓存在JVM的堆内存中。

(3)但是并不是这个两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

2、创建包名com.zhm.spark.operator.cache

3、未使用缓存代码实现

package com.zhm.spark.operator.cache;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
/**
 * @ClassName Test01_no_cache
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/29 13:58
 * @Version 1.0
 */
public class Test01_no_cache {
    public static void main(String[] args) throws InterruptedException {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test01");
        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        //3、创建RDD数据集
        JavaRDD<String> sourceRDD = sparkContext.textFile("./input/word.txt");
        //4、炸裂每行数据
        JavaRDD<String> flatMapRDD = sourceRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.stream(s.split(" ")).iterator();
            }
        });
        //5、转换元素形式-->(word,1)
        JavaPairRDD<String, Integer> mapToPairRDD = flatMapRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                System.out.println("你看我出现几次?");
                return new Tuple2<>(s, 1);
            }
        });
        //6、执行两个Action算子,以触发两个Job
        mapToPairRDD.saveAsTextFile("output/1"+System.currentTimeMillis()+".txt");
        mapToPairRDD.saveAsTextFile("output/2"+System.currentTimeMillis()+".txt");
        //7、设置睡眠时间  以便查看localhost:4040
        Thread.sleep(1000000L);
        //x 关闭 sparkContext
        sparkContext.stop();
    }
}

4、运行结果:

结论:一共输出12次,我们的RDD中有12条数据,每个job都执行一个flatMapRDD.map都会输出,所以是8次,也就意味着我们的两个job都会从头开始计算,直到最终的结果。

查看任务的WebUI

5、使用缓存代码实现

package com.zhm.spark.operator.cache;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
/**
 * @ClassName Test02_has_cache
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/29 14:08
 * @Version 1.0
 */
public class Test02_has_cache {
    /**
     *自带缓存的算子:reduceByKey
     */
    public static void main(String[] args) throws InterruptedException {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test01");
        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        //3、创建RDD数据集
        JavaRDD<String> sourceRDD = sparkContext.textFile("./input/word.txt");
        //4、炸裂每行数据
        JavaRDD<String> flatMapRDD = sourceRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.stream(s.split(" ")).iterator();
            }
        });
        //5、转换元素形式-->(word,1)
        JavaPairRDD<String, Integer> mapToPairRDD = flatMapRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                System.out.println("你看我出现几次?");
                return new Tuple2<>(s, 1);
            }
        });
        //将mapToPairRDD进行缓存
        mapToPairRDD.cache();
        //6、执行两个Action算子,以触发两个Job
        mapToPairRDD.saveAsTextFile("output/1"+System.currentTimeMillis()+".txt");
        mapToPairRDD.saveAsTextFile("output/2"+System.currentTimeMillis()+".txt");
        //7、设置睡眠时间  以便查看localhost:4040
        Thread.sleep(1000000L);
        //x 关闭 sparkContext
        sparkContext.stop();
    }
}

注意:缓存的应用程序执行结束之后,缓存的目录也会被删除

6、运行结果

结论:我们对JavaPairRDD进行了缓存,那么也就第一个Job会从头到JavaPairRDD执行,而第二个则会从缓存中得到JavaPairRDD数据,继续自己的处理逻辑。

观察任务WebUI

不同于未使用缓存的任务,本次job中再map出对javaPairRDD进行了缓存,途中绿色的点就表示缓存,鼠标停留到绿色点处会有提示。

7、缓存相关源码解析

mapRdd.cache()
def cache(): this.type = persist()
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

注意:默认的存储级别都是仅在内存存储一份。在存储级别的末尾加上“_2”表示持久化的数据存为两份。SER:表示序列化。

8、SparkRDD的安全问题和解决方案

(1)问题:缓存有可能会丢失,或者储存于内存的数据由于内存不足而被删除

(2)RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行

(3)原理:

通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算所有的Partition。

9、自带缓存算子

(1)Spark会自动对一些Shuffle操作的中间数据做持久化操作(别人ReduceByKey)。

(2)这样做的目的是为了当一个节点Shuffle失败了避免重新计算整个输入。

(3)但是,在时间使用的时候,如果想重用数据,仍然建议调用persist或Cache。

(4)编写具有自带缓存算子的代码

package com.zhm.spark.operator.cache;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
/**
 * @ClassName Test03_OperatorWithCache
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/29 14:19
 * @Version 1.0
 */
public class Test03_OperatorWithCache {
    public static void main(String[] args) throws InterruptedException {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test01");
        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        //3、创建RDD数据集
        JavaRDD<String> sourceRDD = sparkContext.textFile("input/word.txt");
        //4、炸裂每行数据
        JavaRDD<String> flatMapRDD = sourceRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.stream(s.split(" ")).iterator();
            }
        });
        //5、转换元素形式-->(word,1)
        JavaPairRDD<String, Integer> mapToPairRDD = flatMapRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                System.out.println("你看我出现几次?");
                return new Tuple2<>(s, 1);
            }
        });
        //6、对mapToPairRDD按照key聚合
        JavaPairRDD<String, Integer> reduceByKeyRDD = mapToPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        //7、执行两个行动算子
        reduceByKeyRDD.saveAsTextFile("output/1"+System.currentTimeMillis()+".txt");
        reduceByKeyRDD.saveAsTextFile("output/2"+System.currentTimeMillis()+".txt");
        //8、设置睡眠时间  以便查看localhost:4040
        Thread.sleep(1000000L);
        //x 关闭 sparkContext
        sparkContext.stop();
    }
}

访问http://localhost:4040/jobs/页面,查看第一个和第二个job的DAG图。

4.2 RDD CheckPoint检查点

1、检查点:是将RDD中间结果写入磁盘

2、为什么要做检查点?

由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以就从检查点开始重做血缘,减少了开销。

3、检查点储存路径:Checkpoint的数据通常是储存在HDFS等容错、高可用的文件系统。

4、检查点数据储存格式为:二进制文件

5、检查点切断血缘:在Checkpoint的过程中,该RDD的所以依赖于父RDD的信息将全部被移除。

6、检查点触发时间:对RDD进行Checkpoint操作并不会马上执行,必须执行Action操作才能触发。但是检查点为了数据安全,会从血缘关系的最开始执行一遍

7、设置检查点步骤

(1)设置检查点数据储存路径:sc.setCheckpointDir(“./checkpoint1”)

(2)调用检查点方法:wordToOneRdd.checkpoint()

8、代码实现

package com.zhm.spark.operator.cache;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
/**
 * @ClassName Test04_checkPoint
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/29 14:26
 * @Version 1.0
 */
public class Test04_checkPoint {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test01");
        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        //设置检查点储存路径
        sparkContext.setCheckpointDir("output/checkPoint");
        //3、创建RDD数据集
        JavaRDD<String> sourceRDD = sparkContext.textFile("input/word.txt");
        //4、炸裂每行数据
        JavaRDD<String> flatMapRDD = sourceRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.stream(s.split(" ")).iterator();
            }
        });
        //5、转换元素形式-->(word,1)
        JavaPairRDD<String, Integer> mapToPairRDD = flatMapRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                System.out.println("你看我出现几次?");
                return new Tuple2<>(s, 1);
            }
        });
        //6、对mapToPairRDD设置检查点
        mapToPairRDD.checkpoint();
        //7、将数据储存到文件中
        mapToPairRDD.saveAsTextFile("output/checkPoint/wordCount_"+System.currentTimeMillis()+".txt");
        //x 关闭 sparkContext
        sparkContext.stop();
    }
}

9、执行结果:

访问http://localhost:4040/jobs/

10、Checkpoint对血缘的一些

package com.zhm.spark.operator.cache;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
/**
 * @ClassName Test05_checkPoint_printLineage
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/29 15:12
 * @Version 1.0
 */
public class Test05_checkPoint_printLineage {
    public static void main(String[] args) throws InterruptedException {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test01");
        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        //设置检查点储存路径
        sparkContext.setCheckpointDir("output/checkpoint1");
        //3、创建RDD数据集
        JavaRDD<String> sourceRDD = sparkContext.textFile("input/word.txt");
        //4、炸裂每行数据
        JavaRDD<String> flatMapRDD = sourceRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.stream(s.split(" ")).iterator();
            }
        });
        //5、转换元素形式-->(word,1)
        JavaPairRDD<String, Integer> mapToPairRDD = flatMapRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                System.out.println("你看我出现几次?");
                return new Tuple2<>(s, 1);
            }
        });
        //6、在检查点之前答应血缘
        System.out.println("在检查点之前打印血缘: ");
        System.out.println(mapToPairRDD.toDebugString());
        //7、对mapToPairRDD设置检查点
        mapToPairRDD.checkpoint();
        //8、将数据储存到文件中
        mapToPairRDD.saveAsTextFile("output/checkPoint1/wordCount"+System.currentTimeMillis()+".txt");
        //9、在检查点之后打印血缘
        System.out.println("在检查点之后打印血缘: ");
        System.out.println(mapToPairRDD.toDebugString());
        Thread.sleep(10000000);
        //x 关闭 sparkContext
        sparkContext.stop();
    }
}

运行结果:

结论:血缘关系被切断了,因为Checkpoint机制是储存的数据很安全了,不用保留血缘依赖。

11、Checkpoint对数据的影响

package com.zhm.spark.operator.cache;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
/**
 * @ClassName Test06_check_influence_data
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/29 15:21
 * @Version 1.0
 */
public class Test06_check_influence_data {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test01");
        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        //设置检查点储存路径
        sparkContext.setCheckpointDir("output/checkpoint3");
        //3、创建RDD数据集
        JavaRDD<String> sourceRDD = sparkContext.textFile("input/word.txt");
        //4、炸裂每行数据
        JavaRDD<String> flatMapRDD = sourceRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.stream(s.split(" ")).iterator();
            }
        });
        //5、转换元素形式-->(word,当前时间)
        JavaPairRDD<String, Long> mapToPairRDD = flatMapRDD.mapToPair(new PairFunction<String, String, Long>() {
            @Override
            public Tuple2<String, Long> call(String s) throws Exception {
                System.out.println("你看我出现几次?");
                return new Tuple2<>(s, System.currentTimeMillis());
            }
        });
        //6、checkpoint前收集打印
        System.out.println("checkpoint前收集打印:");
        mapToPairRDD.collect().forEach(System.out::println);
        //7、对mapToPairRDD设置检查点
        mapToPairRDD.checkpoint();
        //6、checkpoint后收集打印
        System.out.println("checkpoint前收集打印:");
        mapToPairRDD.collect().forEach(System.out::println);
        //x 关闭 sparkContext
        sparkContext.stop();
    }
}

运行结果:

现象:由于Checkpoint要从头再执行一遍,这种与时间相关的就会造成数据不一致。

12、Checkpoint检查点+Cache缓存

package com.zhm.spark.operator.cache;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
/**
 * @ClassName Test07_checkPoint_with_cache
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/29 15:27
 * @Version 1.0
 */
public class Test07_checkPoint_with_cache {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test01");
        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        //设置检查点储存路径
        sparkContext.setCheckpointDir("output/checkpoint4");
        //3、创建RDD数据集
        JavaRDD<String> sourceRDD = sparkContext.textFile("input/word.txt");
        //4、炸裂每行数据
        JavaRDD<String> flatMapRDD = sourceRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.stream(s.split(" ")).iterator();
            }
        });
        //5、转换元素形式-->(word,当前时间)
        JavaPairRDD<String, Long> mapToPairRDD = flatMapRDD.mapToPair(new PairFunction<String, String, Long>() {
            @Override
            public Tuple2<String, Long> call(String s) throws Exception {
                System.out.println("你看我出现几次?");
                return new Tuple2<>(s, System.currentTimeMillis());
            }
        });
        //缓存
        mapToPairRDD.cache();
        //6、checkpoint前收集打印
        System.out.println("checkpoint前收集打印:");
        mapToPairRDD.collect().forEach(System.out::println);
        //7、对mapToPairRDD设置检查点
        mapToPairRDD.checkpoint();
        //6、checkpoint后收集打印
        System.out.println("checkpoint后收集打印:");
        mapToPairRDD.collect().forEach(System.out::println);
        //x 关闭 sparkContext
        sparkContext.stop();
    }
}

运行结果:

结论:保持了数据的一致性

4.3缓存和检查点的区别

1、Cache缓存只是将数据保存起来,不切断血缘依赖.Checkpoint检查点切断血缘依赖

2、Cache缓存的数据通常储存在磁盘、内存等地方,可靠性低。Checkpoint的数据通常储存在HDFS等容错、高可用的文件系统,可靠性高。

3、建议对Checkpoint的RDD使用Cache缓存,这样Checkpoint的job只需要从Cache缓存中读取数据即可,否者需要再从头计算一次RDD。

4、如果使用完了缓存,可以通过unpersist方法是否缓存。

4.4. 检查点储存到HDFS集群

注意:如果检查点数据储存到HDFS集群,要注意配置访问集群的用户名。否者会报访问权限异常。

package com.zhm.spark.operator.cache;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
/**
 * @ClassName Test08_checkpoint_hdfs
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/29 15:30
 * @Version 1.0
 */
public class Test08_checkpoint_hdfs {
    public static void main(String[] args) {
        //0、设置hadoop用户
        System.setProperty("HADOOP_USER_NAME","zhm");
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test01");
        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        //设置检查点储存路径
        sparkContext.setCheckpointDir("hdfs://hadoop102:8020/sparkCheckPoint");
        //3、创建RDD数据集
        JavaRDD<String> sourceRDD = sparkContext.textFile("input/word.txt");
        //4、炸裂每行数据
        JavaRDD<String> flatMapRDD = sourceRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.stream(s.split(" ")).iterator();
            }
        });
        //5、转换元素形式-->(word,1)
        JavaPairRDD<String, Integer> mapToPairRDD = flatMapRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                System.out.println("你看我出现几次?");
                return new Tuple2<>(s, 1);
            }
        });
        //6、对mapToPairRDD设置检查点
        mapToPairRDD.checkpoint();
        //7、将数据储存到文件中
        mapToPairRDD.saveAsTextFile("output/checkPoint5/wordCount_"+System.currentTimeMillis()+".txt");
        //x 关闭 sparkContext
        sparkContext.stop();
    }
}


相关文章
|
1月前
|
存储 分布式计算 并行计算
【赵渝强老师】Spark中的RDD
RDD(弹性分布式数据集)是Spark的核心数据模型,支持分布式并行计算。RDD由分区组成,每个分区由Spark Worker节点处理,具备自动容错、位置感知调度和缓存机制等特性。通过创建RDD,可以指定分区数量,并实现计算函数、依赖关系、分区器和优先位置列表等功能。视频讲解和示例代码进一步详细介绍了RDD的组成和特性。
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
72 0
|
2月前
|
分布式计算 算法 Spark
spark学习之 GraphX—预测社交圈子
spark学习之 GraphX—预测社交圈子
53 0
|
2月前
|
设计模式 数据采集 分布式计算
企业spark案例 —出租车轨迹分析
企业spark案例 —出租车轨迹分析
108 0
|
2月前
|
分布式计算 Scala Spark
educoder的spark算子学习
educoder的spark算子学习
22 0
|
2月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
59 0
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
141 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
45 0
|
2月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
103 0
|
1月前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
89 6