Spark Streaming与Spark SQL结合操作详解

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Spark Streaming最强大的地方在于,可以与Spark Core、Spark SQL整合使用,之前已经通 过transform、foreachRDD等算子看到,如何将DStream中的RDD使用Spark Core执行批处理操 作。现在就来看看,如何将DStream中的RDD与Spark SQL结合起来使用。

Spark Streaming最强大的地方在于,可以与Spark Core、Spark SQL整合使用,之前已经通 过transform、foreachRDD等算子看到,如何将DStream中的RDD使用Spark Core执行批处理操 作。现在就来看看,如何将DStream中的RDD与Spark SQL结合起来使用。


案例:每隔10秒,统计最近60秒的,每个种类的每个商品的点击次数,然后统计出每个种类 top3热门的商品。


数据输入模型:

--------------------------
姓名    商品分类    商品名称
alex  bigdata   hadoop
alex  bigdata   spark
jack  bigdata   flink
alex  bigdata   hadoop
alex  language  java
pony  language  python
jone  language  java
lili  bigdata   hadoop
--------------------------

第一步:mapToPair

第一步:mapToPair
<bigdata_hadoop,1>
<bigdata_spark,1>
<bigdata_flink,1>
...

第二步:reduceByKeyAndWindow

<bigdata_hadoop,5>
<bigdata_spark,2>
<bigdata_flink,4>
...

第三步:foreachRDD

foreachRDD -> DataFrame -> registerTempView -> sql

Java语言实现:

package com.kfk.spark.common;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/14
 * @time : 8:23 下午
 */
public class CommStreamingContext {
    public static JavaStreamingContext getJssc(){
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("CommStreamingContext");
        return new JavaStreamingContext(conf, Durations.seconds(2));
    }
}
package com.kfk.spark.top_hot_product_project;
import com.kfk.spark.common.CommSparkSession;
import com.kfk.spark.common.CommStreamingContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/19
 * @time : 1:36 下午
 */
public class TopHotProduct {
    public static void main(String[] args) throws InterruptedException {
        JavaStreamingContext jssc = CommStreamingContext.getJssc();
        /**
         * 输入数据源模型
         * --------------------------
         * 姓名   商品分类    商品名称
         * alex   bigdata   hadoop
         * alex   bigdata   spark
         * jack   bigdata   flink
         * alex   bigdata   hadoop
         * alex   language  java
         * pony   language  python
         * jone   language  java
         * lili   bigdata   hadoop
         * --------------------------
         */
        JavaReceiverInputDStream<String> inputDstream = jssc.socketTextStream("bigdata-pro-m04",9999);
        /**
         * mapToPair
         * <bigdata_hadoop,1>
         * <bigdata_spark,1>
         * <bigdata_flink,1>
         */
        JavaPairDStream<String, Integer> pairDstream = inputDstream.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String lines) throws Exception {
                String[] line = lines.split(" ");
                return new Tuple2<>(line[1] + "_" + line[2], 1);
            }
        });
        /**
         * reduceByKeyAndWindow
         * <bigdata_hadoop,5>
         * <bigdata_spark,2>
         * <bigdata_flink,4>
         */
        JavaPairDStream<String, Integer> windowPairStream = pairDstream.reduceByKeyAndWindow(
                new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1+v2;
            }
        }, Durations.seconds(60),Durations.seconds(10));
        /**
         * foreachRDD
         */
        windowPairStream.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
            @Override
            public void call(JavaPairRDD<String, Integer> stringIntegerJavaPairRdd) throws Exception {
                // 转换成RDD[Row]
                JavaRDD<Row> countRowRdd = stringIntegerJavaPairRdd.map(new Function<Tuple2<String, Integer>, Row>() {
                    @Override
                    public Row call(Tuple2<String, Integer> tuple) throws Exception {
                        String category = tuple._1.split("_")[0];
                        String product = tuple._1.split("_")[1];
                        Integer productCount = tuple._2;
                        return RowFactory.create(category,product,productCount);
                    }
                });
                List<StructField> structFields = new ArrayList<StructField>();
                structFields.add(DataTypes.createStructField("category",DataTypes.StringType,true));
                structFields.add(DataTypes.createStructField("product",DataTypes.StringType,true));
                structFields.add(DataTypes.createStructField("productCount",DataTypes.IntegerType,true));
                // 构造Schema
                StructType structType = DataTypes.createStructType(structFields);
                // 创建SparkSession
                SparkSession spark = CommSparkSession.getSparkSession();
                // 通过rdd和scheme创建DataFrame
                Dataset<Row> df = spark.createDataFrame(countRowRdd,structType);
                // 创建临时表
                df.createOrReplaceTempView("product_click");
                /**
                 * 统计每个种类下点击次数排名前3名的商品
                 */
                spark.sql("select category,product,productCount from product_click " +
                        "order by productCount desc limit 3").show();
                /**
                 * +--------+-------+------------+
                 * |category|product|productCount|
                 * +--------+-------+------------+
                 * | bigdata| hadoop|           4|
                 * | bigdata|  kafka|           2|
                 * | bigdata|  flink|           2|
                 * +--------+-------+------------+
                 */
            }
        });
        jssc.start();
        jssc.awaitTermination();
    }
}

Scala语言实现:

package com.kfk.spark.top_hot_product_project
import com.kfk.spark.common.{CommSparkSessionScala, CommStreamingContextScala}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.streaming.Seconds
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/19
 * @time : 3:59 下午
 */
object TopHotProductScala {
    def main(args: Array[String]): Unit = {
        val jssc = CommStreamingContextScala.getJssc
        /**
         * 输入数据源模型
         * --------------------------
         * 姓名   商品分类    商品名称
         * alex   bigdata   hadoop
         * alex   bigdata   spark
         * jack   bigdata   flink
         * alex   bigdata   hadoop
         * alex   language  java
         * pony   language  python
         * jone   language  java
         * lili   bigdata   hadoop
         * --------------------------
         */
        val inputDstream = jssc.socketTextStream("bigdata-pro-m04", 9999)
        /**
         * mapToPair
         * <bigdata_hadoop,1>
         * <bigdata_spark,1>
         * <bigdata_flink,1>
         */
        val pairDStream = inputDstream.map(x => (x.split(" ")(1) + "_" + x.split(" ")(2),1))
        /**
         * reduceByKeyAndWindow
         * <bigdata_hadoop,5>
         * <bigdata_spark,2>
         * <bigdata_flink,4>
         */
        val windowDStream = pairDStream.reduceByKeyAndWindow((x:Int,y:Int) => x+y,Seconds(60),Seconds(10))
        windowDStream.foreachRDD(x => {
            // 转换成RDD[Row]
            val rowRDD = x.map(tuple => {
                val category = tuple._1.split("_")(0)
                val product = tuple._1.split("_")(1)
                val productCount = tuple._2
                Row(category,product,productCount)
            })
            // 构造Schema
            val structType = StructType(Array(
                StructField("category",StringType,true),
                StructField("product",StringType,true),
                StructField("productCount",IntegerType,true)
            ))
            // 创建SparkSession
            val spark = CommSparkSessionScala.getSparkSession()
            // 通过rdd和scheme创建DataFrame
            val df = spark.createDataFrame(rowRDD,structType)
            // 创建临时表
            df.createOrReplaceTempView("product_click")
            /**
             * 统计每个种类下点击次数排名前3名的商品
             */
            spark.sql("select category,product,productCount from product_click "
                    + "order by productCount desc limit 3").show()
            /**
             * +--------+-------+------------+
             * |category|product|productCount|
             * +--------+-------+------------+
             * | bigdata| hadoop|           4|
             * | bigdata|  kafka|           2|
             * | bigdata|  flink|           2|
             * +--------+-------+------------+
             */
        })
        jssc.start()
        jssc.awaitTermination()
    }
}

测试数据源:

alex bigdata hadoop
alex bigdata spark
jack bigdata flink
alex bigdata hadoop
alex language java
pony language python
jone language java
lili bigdata hadoop
lili bigdata hive
lili bigdata hbase
lili bigdata hadoop
lili bigdata spark
lili bigdata flink
lili bigdata kafka
lili bigdata kafka

运行结果:

+--------+-------+------------+
|category|product|productCount|
+--------+-------+------------+
| bigdata| hadoop|           4|
| bigdata|  kafka|           2|
| bigdata|  flink|           2|
+--------+-------+------------+
相关文章
|
1月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
40 0
|
1月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
88 0
|
12天前
|
分布式计算 流计算 Spark
【赵渝强老师】Spark Streaming中的DStream
本文介绍了Spark Streaming的核心概念DStream,即离散流。DStream通过时间间隔将连续的数据流转换为一系列不连续的RDD,再通过Transformation进行转换,实现流式数据的处理。文中以MyNetworkWordCount程序为例,展示了DStream生成RDD的过程,并附有视频讲解。
|
12天前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
56 0
|
1月前
|
SQL 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
29 0
|
1月前
|
存储 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
45 0
|
1月前
|
SQL 分布式计算 大数据
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
32 0
|
1月前
|
SQL 分布式计算 大数据
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(二)
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(二)
29 0
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
31 0
下一篇
无影云桌面