Spark学习---2、SparkCore(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)

简介: Spark学习---2、SparkCore(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)

2.3.1.4 groupBy()分组

1、用法:groupBy(f) ,以元素为粒度对每个元素执行函数f。

2、函数f:

(1)函数f为用户自定义实现内容,返回值任意

(2) 函数返回值为算子groupBy返回值的key,元素为value。

(3)算子groupBy的返回值为新的重新分区的K—V类型RDD

3、功能说明:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。

4、案例说明:创建一个RDD,按照元素模以2的值进行分组。

5、代码实现

package com.zhm.spark.operator.value;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import java.util.Arrays;
/**
 * @ClassName StudyGroupBy
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/27 14:32
 * @Version 1.0
 */
public class StudyGroupBy {
    public static void main(String[] args) throws InterruptedException {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        //3、Todo 根据集合创建RDD
        JavaRDD<Integer> javaRDD = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
        //4、储存到文件查看groupby之前的情况
        javaRDD.saveAsTextFile("outputGroupByBefore");
        //5、对RDD执行groupBy操作,计算规则是value%2
        JavaPairRDD<Integer, Iterable<Integer>> integerIterableJavaPairRDD = javaRDD.groupBy(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer integer) throws Exception {
                return integer % 2;
            }
        });
        //6、类型可以容易修改
        JavaPairRDD<Boolean, Iterable<Integer>> booleanIterableJavaPairRDD = javaRDD.groupBy(new Function<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) throws Exception {
                return integer % 2 == 0;
            }
        });
        //7、输出结果
        System.out.println("---执行groupBy之后的RDD分区情况---");
        integerIterableJavaPairRDD.collect().forEach(System.out::println);
        booleanIterableJavaPairRDD.collect().forEach(System.out::println);
        //x 关闭 sparkContext
        Thread.sleep(1000000000L);
        sparkContext.stop();
    }
}

运行结果:

f50e546f81944fc796c365d06b319041.png


6、说明:

(1)groupBy会存在shuffle过程

(2)shuffle:将不同的分区数据进行打乱重组的过程

(3)shuffle一定会落盘。可以在local模式下执行程序,通过4040看效果。


e9643d9b64ff42e0a093315367b1e3ae.png2.3.1.5 distinct()去重

1、用法:distinct(numPartitions), 实现对RDD进行分布式去重。

2、参数numPartitions:指定去重后的RDD的分区个数。

3、功能说明:对内部的元素去重,并将去重后的元素放到新的RDD中。

4、代码实现

package com.zhm.spark.operator.value;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;
/**
 * @ClassName StudyDistinct
 * @Description 去重
 * @Author Zouhuiming
 * @Date 2023/6/27 14:44
 * @Version 1.0
 */
public class StudyDistinct {
    public static void main(String[] args) throws InterruptedException {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        //3、Todo 根据集合创建RDD
        JavaRDD<Integer> javaRDD = sparkContext.parallelize(Arrays.asList(1, 2, 3, 1, 2, 3, 4, 5, 6), 2);
        //4、使用distinct算子实现去重,底层使用分布式去重,慢但是不会OOM
        JavaRDD<Integer> distinctRDD = javaRDD.distinct();
        //5、收集打印
        distinctRDD.collect().forEach(System.out::println);
        Thread.sleep(100000000L);
        //6 关闭 sparkContext
        sparkContext.stop();
    }
}

运行结果:

6589dae3f37a4e02999590196ecc33fe.png

5、distinct会存在shuffle过程。

af3afb98c56e4bf8bea80e41ea2f0be9.png

2.3.1.6 sortBy()排序

1、用法:RDD.sortBy(f, ascending, numpartitions)

2、参数介绍:

(1)函数f: 对每个元素都执行函数f,返回值类型和元素中的类型一致

(2)ascending:数据类型为Boolean,默认是True,参数决定了排序后,RDD中的元素的排列顺序,即升序/降序

(3)numpartitions:排序后的RDD的分区数。

3、功能说明

该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为正序排列。默认排序后新产生的RDD的分区数与原RDD的分区数一致。Spark的排序结果是全局有序。

4、案例需求说明:创建一个RDD,按照数字大小分别实现正序和倒序排序

6656c40048a049df95b8b7d39d4c73c8.png

5、代码实现

package com.zhm.spark.operator.value;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import java.util.Arrays;
/**
 * @ClassName StudySortBy
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/27 14:48
 * @Version 1.0
 */
public class StudySortBy {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        //3、Todo
        JavaRDD<Integer> javaRDD = sparkContext.parallelize(Arrays.asList(1, 3, 2, 9, 6, 5, 3), 2);
        //4、使用sortBy算子对javaRDD进行排序(泛型->以谁作为标准排序,true->为正序)
        JavaRDD<Integer> javaRDD1 = javaRDD.sortBy(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer integer) throws Exception {
                return integer;
            }
        }, true, 2);
        //5、收集输出
        javaRDD1.collect().forEach(System.out::println);
        //x 关闭 sparkContext
        sparkContext.stop();
    }
}

运行结果:


ee72d85def9e4e519960d95df4dc07d8.png

2.3.2 Key-Value类型

创建包名:com.zhm.spark.operator.keyvalue

2.3.2.1 mapToPair()

1、用法:RDD.mapToPair(f) ,对父RDD中的每条记录都执行函数f,得到新的记录<k, v>

2、作用:将Value类型转换为key-Value类型

3、代码实现


package com.zhm.spark.operator.keyvalue;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
/**
 * @ClassName StudyMapToPair
 * @Description 将不是kv类型的RDD转换为kv类型的RDD
 * @Author Zouhuiming
 * @Date 2023/6/27 14:52
 * @Version 1.0
 */
public class StudyMapToPair {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        //3、Todo
        JavaRDD<Integer> javaRDD = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
        //4、使用mapToPair算子对javaRDD转换为kv类型的RDD
        JavaPairRDD<Integer, Integer> pairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
            @Override
            public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
                return new Tuple2<>(integer, integer);
            }
        });
        //5、收集输出
        System.out.println("------由v型RDD转换得到的kv型RDD------");
        pairRDD.collect().forEach(System.out::println);
        //Todo 由集合直接创建KV型RDD
        JavaPairRDD<Integer, Integer> integerIntegerJavaPairRDD = sparkContext.parallelizePairs(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 2), new Tuple2<>(3, 3), new Tuple2<>(4, 5)));
        //6、收集输出
        System.out.println("-----由集合直接创建KV型RDD-----");
        integerIntegerJavaPairRDD.collect().forEach(System.out::println);
     sparkContext.stop();
    }
}

运行结果:

e5f878b76bc94265a7a8ac957ecb8ee2.png

2.3.2.2 mapValues()只对Value进行操作


1、用法:newRDD = oldRdd.mapValues(func)

2、参数函数func:自定义实现的函数,仅对oldRdd中(k,v)数据的v作用。

3、功能说明:针对于(K,V)形式的类型只对V进行操作

4、需求说明:创建一个pairRDD,并将value添加字符尾缀" Fighting"

5、代码实现

package com.zhm.spark.operator.keyvalue;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;
import java.util.Arrays;
/**
 * @ClassName StudyMapValues
 * @Description 对kv类型的RDD的v进行操作
 * @Author Zouhuiming
 * @Date 2023/6/27 15:01
 * @Version 1.0
 */
public class StudyMapValues {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        //3、Todo
        JavaPairRDD<Integer, Integer> javaPairRDD = sparkContext.parallelizePairs(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 2), new Tuple2<>(3, 3),
                new Tuple2<>(4, 4), new Tuple2<>(5, 5)), 2);
        //4、为kv型RDD的v拼接尾缀"Fighting"
        JavaPairRDD<Integer, String> resultRDD = javaPairRDD.mapValues(new Function<Integer, String>() {
            @Override
            public String call(Integer integer) throws Exception {
                return integer + " Fighting";
            }
        });
        //5、打印收集
        resultRDD.collect().forEach(System.out::println);
        //6 关闭 sparkContext
        sparkContext.stop();
    }
}

运行结果:

7e5505c4105a4f7389d2cecfdd026062.png

2.3.2.3 groupByKey()按照K重新分组

1、用法:KVRDD.groupByKey();

2、功能说明

groupByKey对每个key进行操作,但只生成一个结果集,并不进行聚合。

该操作可以指定分区器或者分区数(默认使用HashPartitioner)

3、需求说明

统计单词出现次数

ea4632491dd4485cbec9111147bec966.png

4、代码实现

package com.zhm.spark.operator.keyvalue;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
/**
 * @ClassName StudyGroupByKey
 * @Description 分组聚合
 * @Author Zouhuiming
 * @Date 2023/6/27 15:07
 * @Version 1.0
 */
public class StudyGroupByKey {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        //3、Todo
        JavaRDD<String> javaRDD = sparkContext.parallelize(Arrays.asList("a", "a", "a", "b", "b", "b", "b", "a"), 2);
        //4、根据JavaRDD创建KVRDD
        JavaPairRDD<String, Integer> pairRDD = javaRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });
        //5、聚合相同的key
        JavaPairRDD<String, Iterable<Integer>> groupByKeyRDD = pairRDD.groupByKey();
        //6、收集并输出RDD内容
        System.out.println("-----查看groupByKeyRDD的内容------");
        groupByKeyRDD.collect().forEach(System.out::println);
        sparkContext.stop();
    }
}

5、运行结果:


8ca2e1c0d2a84b268678764dbd193275.png

2.3.2.4 reduceByKey()按照K聚合V

1、用法:KVRDD.reduceByKey(f);

2、功能说明:将RDD[K,V]中的元素按照相同的K的V进行聚合。存在多种重载形式,可设置新RDD的分区数。

3、需求说明:统计单词出现次数

5492e44701f14a9394f0fc9fb350af06.png

4、代码实现

package com.zhm.spark.operator.keyvalue;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
/**
 * @ClassName StudyReduceByKey
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/27 15:15
 * @Version 1.0
 */
public class StudyReduceByKey {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        //3、Todo
        JavaRDD<String> javaRDD = sparkContext.parallelize(Arrays.asList("a", "a", "a", "b", "b", "b", "b", "a"), 2);
        //4、根据JavaRDD创建KVRDD
        JavaPairRDD<String, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });
        //5、聚合相同的key,统计单词出现的次数
        JavaPairRDD<String, Integer> result = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
        //6、收集并输出RDD的内容
        System.out.println("查看--result--的内容");
        result.collect().forEach(System.out::println);
        //x 关闭 sparkContext
        sparkContext.stop();
    }
}

运行结果:


9aef8fdfb0354d3ea699c407e2cad8f7.png

2.3.2.5 reduceByKey和groupByKey区别

1、educeByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[K,V]。

2、 groupByKey:按照key进行分组,直接进行shuffle。

3、在不影响业务逻辑的前提下,优先选用reduceByKey。求和操作不影响业务逻辑,求平均值影响业务逻辑。影响业务逻辑时建议先对数据类型进行转换再合并。

2.3.2.6 sortByKey()按照K进行排序

1、用法:kvRDD.sortByKey(true/false)

2、功能说明:在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD。

3、参数说明:true:为升序排列,false为降序排列

4、需求说明:创建一个pairRDD,按照key的正序和倒序进行排序


bbb3fa5652ea4db8863897565a7e36f6.png

5、代码实现

package com.zhm.spark.operator.keyvalue;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
/**
 * @ClassName StudySortByKey
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/27 15:47
 * @Version 1.0
 */
public class StudySortByKey {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        //3、Todo 创建kvRDD
        JavaPairRDD<Integer, String> javaPairRDD = sparkContext.parallelizePairs(Arrays.asList(new Tuple2<>(4, "d"), new Tuple2<>(3, "c"), new Tuple2<>(1, "a"),
                new Tuple2<>(2, "b")));
        //4、收集输出
        System.out.println("排序前:");
        javaPairRDD.collect().forEach(System.out::println);
        //5、对RDD按照key进行排序
        JavaPairRDD<Integer, String> sortByKeyRDD = javaPairRDD.sortByKey(true);
        //收集输出
        System.out.println("排序后:");
        sortByKeyRDD.collect().forEach(System.out::println);
        //x 关闭 sparkContext
        sparkContext.stop();
    }
}

运行结果:

47f2631250ea4123b5b48fd86a44e676.png


2.4 Action行动算子

行动算子是触发了整个作业的执行。因为转换算子都是懒加载,并不会立即执行。

创建包名:com.zhm.spark.operator.action

2.4.1 collect():以数组的形式返回数据集

1、用法:RDD.collect();

2、功能说明:在驱动程序中,以数组Array的形式返回数据集的所有元素。


50ce2050a4974b82beaf5bc47a067136.png


注意:所以的数据都会被拉取到Driver端,慎用

3、需求说明:创建一个RDD,并将RDD内容收集到Driver端打印(代码实现在最后面)

2.4.2 count()返回RDD中元素个数

1、用法:RDD.count(),返回值为Long类型

2、功能说明:返回RDD中元素的个数


3a461ef6af054b4999f6d252387b7acb.png

3、需求说明:创建一个RDD,统计该RDD的条数(代码实现在最后面)

2.4.3 first()返回RDD中的第一个元素

1、用法:RDD.first(), 返回值类型是元素类型

2、功能说明:返回RDD中的第一个元素


a197676cd04a4dc7b756e8af0531202f.png

3、需求说明:创建一个RDD,返回该RDD中的第一个元素(代码实现在最后面)

2.4.4 take()返回由RDD前n个元素组成的数组

1、RDD.take(int num), 返回值为RDD中元素类型的List列表

2、功能说明:返回一个由RDD的前n个元素组成的数组

25409de2a4344e6b8c33b791ecdfcb1d.png

3、需求说明:创建一个RDD,取出前3个元素(代码实现在最后面)

2.4.5 countByKey()统计每种key的个数

1、用法:pairRDD.countByKey(), 返回值类型为Map<[RDD中key的类型] , Long>

2、功能说明:统计每种key的个数


de3eca03254f441e8b7d825ea3a8a8bd.png

3、需求说明:创建一个PairRDD,统计每种key的个数(代码实现在最后面)

2.4.6 save相关算子

1、saveAsTextFile(path)

(1)功能:将RDD保存成Text文件

(2)功能说明:将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本。

2、 saveAsObjectFile(path)

(1)功能:序列化成对象保存到文件

(2)功能说明:用于将RDD中的元素序列化成对象,存储到文件中。

(代码实现在最后面)

2.4.7 foreach()遍历RDD中每一个元素

1、功能:遍历RDD中的每个元素,并依次应用函数


44022f50c49a4e09aaabca1fbeaf6bf1.png

2、需求说明:创建一个RDD,对每个元素进行打印

2.4.8 所有的代码实现
package com.zhm.spark.operator.action;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.Arrays;
/**
 * @ClassName TestAll
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/28 13:47
 * @Version 1.0
 */
public class TestAll {
    public static void main(String[] args) {
        //设置往HDFS储存数据的用户名
        System.setProperty("HADOOP_USER_NAME","zhm");
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("TestAll");
        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        //3、获取RDD
//        JavaRDD<Integer> javaRDD = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7));
        JavaPairRDD<Integer, String> javaPairRDD = sparkContext.parallelizePairs(Arrays.asList(new Tuple2<>(1, "z"), new Tuple2<>(2, "h"), new Tuple2<>(3, "m"),
                new Tuple2<>(1, "zhm")
        ),2);
        //4、collect():以数组的形式返回数据集
        //注意:所有的数据都会被拉取到Driver端,慎用
        System.out.println("-------------collect测试-------------");
        javaPairRDD.collect().forEach(System.out::println);
        //5、count():返回RDD中元素个数
        System.out.println("-------------count测试-------------");
        System.out.println(javaPairRDD.collect());
        //6、first():返回RDD中第一个元素
        System.out.println("-------------first测试-------------");
        System.out.println(javaPairRDD.first());
        //7、take():返回RDD前n个元素组成的数组
        System.out.println("-------------take测试-------------");
        javaPairRDD.take(3).forEach(System.out::println);
        //8、countByKey统计每种key的个数
        System.out.println("-------------countByKey测试-------------");
        System.out.println(javaPairRDD.countByKey());
        //9、save相关的算子
        //以文本格式储存数据
        javaPairRDD.saveAsTextFile("outputText");
        //以对象储存数据
        javaPairRDD.saveAsObjectFile("outputObject");
        //10、foreach():遍历RDD中每一个元素
        javaPairRDD.foreach(new VoidFunction<Tuple2<Integer, String>>() {
            @Override
            public void call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                System.out.println(integerStringTuple2._1+":"+integerStringTuple2._2);
            }
        });
        //x 关闭 sparkContext
        sparkContext.stop();
    }
}

运行结果


612cdc047a7f4f6bbf0d94ac6073ab24.png

相关文章
|
1月前
|
存储 分布式计算 并行计算
【赵渝强老师】Spark中的RDD
RDD(弹性分布式数据集)是Spark的核心数据模型,支持分布式并行计算。RDD由分区组成,每个分区由Spark Worker节点处理,具备自动容错、位置感知调度和缓存机制等特性。通过创建RDD,可以指定分区数量,并实现计算函数、依赖关系、分区器和优先位置列表等功能。视频讲解和示例代码进一步详细介绍了RDD的组成和特性。
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
74 0
|
2月前
|
分布式计算 算法 Spark
spark学习之 GraphX—预测社交圈子
spark学习之 GraphX—预测社交圈子
59 0
|
2月前
|
分布式计算 Scala Spark
educoder的spark算子学习
educoder的spark算子学习
24 0
|
2月前
|
存储 分布式计算 算法
大数据-105 Spark GraphX 基本概述 与 架构基础 概念详解 核心数据结构
大数据-105 Spark GraphX 基本概述 与 架构基础 概念详解 核心数据结构
61 0
|
2月前
|
消息中间件 分布式计算 Kafka
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
38 0
|
2月前
|
消息中间件 分布式计算 Kafka
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
50 0
|
2月前
|
存储 SQL 分布式计算
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(一)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(一)
54 0
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
143 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
48 0