Operations on Datasets and DataFrames in Structured Streaming

Summary: study notes

Creating streaming Datasets and DataFrames

Approach 1: using a JavaBean

Java implementation:

package com.kfk.spark.structuredstreaming;
import java.sql.Date;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/21
 * @time : 10:38 PM
 */
public class DeviceData {
    // device: string, type: string, signal: double, time: DateType
    private String device;
    private String deviceType;
    private double signal;
    private Date deviceTime;
    public String getDevice() {
        return device;
    }
    public void setDevice(String device) {
        this.device = device;
    }
    public String getDeviceType() {
        return deviceType;
    }
    public void setDeviceType(String deviceType) {
        this.deviceType = deviceType;
    }
    public double getSignal() {
        return signal;
    }
    public void setSignal(double signal) {
        this.signal = signal;
    }
    public Date getDeviceTime() {
        return deviceTime;
    }
    public void setDeviceTime(Date deviceTime) {
        this.deviceTime = deviceTime;
    }
    public DeviceData(String device, String deviceType, double signal, Date deviceTime) {
        this.device = device;
        this.deviceType = deviceType;
        this.signal = signal;
        this.deviceTime = deviceTime;
    }
    public DeviceData(){
    }
}
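Note that Spark's Java bean encoders require a public class with a no-argument constructor plus getters and setters, which is why DeviceData defines both constructors.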
The streaming application below parses each socket line into a DeviceData bean:

package com.kfk.spark.structuredstreaming;
import com.kfk.spark.common.CommSparkSession;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import java.sql.Date;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/21
 * @time : 10:35 PM
 */
public class StruStreamingDFOper {
    public static void main(String[] args) throws StreamingQueryException {
        SparkSession spark = CommSparkSession.getSparkSession();
        // input table
        Dataset<String> dflines = spark.readStream()
                .format("socket")
                .option("host", "bigdata-pro-m04")
                .option("port", 9999)
                .load().as(Encoders.STRING());
        // Convert each input line to a typed Dataset using the JavaBean encoder
        Dataset<DeviceData> dfdevice = dflines.map(new MapFunction<String, DeviceData>() {
            @Override
            public DeviceData call(String value) throws Exception {
                String[] lines = value.split(",");
                return new DeviceData(lines[0],lines[1],Double.parseDouble(lines[2]), Date.valueOf(lines[3]));
            }
        }, Encoders.bean(DeviceData.class));
        // result table: filter first, then project and aggregate
        Dataset<Row> dffinal = dfdevice.where("signal > 10")
                .select("device", "deviceType")
                .groupBy("deviceType")
                .count();
        // output
        StreamingQuery query = dffinal.writeStream()
                .outputMode("update")
                .format("console")
                .start();
        query.awaitTermination();
    }
}
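The CommSparkSession helper is not shown in these notes; a minimal sketch of what it might look like, assuming a local master and an arbitrary app name:

package com.kfk.spark.common;
import org.apache.spark.sql.SparkSession;

public class CommSparkSession {
    public static SparkSession getSparkSession() {
        // master URL and app name are placeholders; adjust for your cluster
        return SparkSession.builder()
                .master("local[2]")
                .appName("StructuredStreamingDemo")
                .getOrCreate();
    }
}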

Approach 2: constructing the schema explicitly

package com.kfk.spark.structuredstreaming;
import com.kfk.spark.common.CommSparkSession;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.sql.Date;
import java.util.ArrayList;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/21
 * @time : 10:35 PM
 */
public class StruStreamingDFOper2 {
    public static void main(String[] args) throws StreamingQueryException {
        SparkSession spark = CommSparkSession.getSparkSession();
        // input table
        Dataset<String> dflines = spark.readStream()
                .format("socket")
                .option("host", "bigdata-pro-m04")
                .option("port", 9999)
                .load().as(Encoders.STRING());
        List<StructField> fields = new ArrayList<StructField>();
        StructField device = DataTypes.createStructField("device",DataTypes.StringType,true);
        StructField deviceType = DataTypes.createStructField("deviceType",DataTypes.StringType,true);
        StructField signal = DataTypes.createStructField("signal",DataTypes.DoubleType,true);
        StructField deviceTime = DataTypes.createStructField("deviceTime",DataTypes.DateType,true);
        fields.add(device);
        fields.add(deviceType);
        fields.add(signal);
        fields.add(deviceTime);
        // Build the schema
        StructType schema = DataTypes.createStructType(fields);
        // Convert each line to a Dataset<Row> using the schema
        Dataset<Row> dfdevice = dflines.map(new MapFunction<String, Row>() {
            @Override
            public Row call(String value) throws Exception {
                String[] lines = value.split(",");
                return RowFactory.create(lines[0],lines[1],Double.parseDouble(lines[2]), Date.valueOf(lines[3]));
            }
        }, RowEncoder.apply(schema));
        // result table: filter first, then project and aggregate
        Dataset<Row> dffinal = dfdevice.where("signal > 10")
                .select("device", "deviceType")
                .groupBy("deviceType")
                .count();
        // output
        StreamingQuery query = dffinal.writeStream()
                .outputMode("update")
                .format("console")
                .start();
        query.awaitTermination();
    }
}
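Both versions use outputMode("update"), which writes only the aggregate rows that changed in each micro-batch; "complete" would re-emit the entire result table every batch, while "append" is not allowed for an aggregation without a watermark.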

Scala implementation:

package com.kfk.spark.structuredstreaming
import com.kfk.spark.common.CommSparkSessionScala
import java.sql.Date
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/22
 * @time : 7:31 PM
 */
object StruStreamingDFOperScala {
    case class DeviceData(device: String, deviceType: String, signal: Double, deviceTime: Date)
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        import spark.implicits._
        // input table
        val socketDF = spark
                .readStream
                .format("socket")
                .option("host", "bigdata-pro-m04")
                .option("port", 9999)
                .load().as[String]
        // Build the schema
        val userSchema = new StructType()
                .add("device", "string")
                .add("deviceType", "string")
                .add("signal", "double")
                .add("deviceTime", "date")
        // Convert each line to a Row that matches the schema
        val df_device = socketDF.map(x => {
            val lines = x.split(",")
            // signal must be a Double and deviceTime a java.sql.Date to match the schema
            Row(lines(0), lines(1), lines(2).toDouble, Date.valueOf(lines(3)))
        })(RowEncoder(userSchema))
        // Convert to a typed Dataset via the case class
        val df : Dataset[DeviceData] = df_device.as[DeviceData]
        // result table: filter on signal > 10 as in the Java version, so the output below matches
        val df_final = df.where("signal > 10").groupBy("deviceType").count()
        // output
        val query = df_final.writeStream
                .outputMode("update")
                .format("console")
                .start()
        query.awaitTermination()
    }
}

Test data (sent to the socket source, e.g. by running nc -lk 9999 on bigdata-pro-m04):

aa,ss,100,2020-12-21
bb,ss,47,2020-12-21
cc,ss,3,2020-12-21
dd,ss,46,2020-12-21

Output:

-------------------------------------------
Batch: 1
-------------------------------------------
+----------+-----+
|deviceType|count|
+----------+-----+
|        ss|    3|
+----------+-----+
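Only three of the four input rows satisfy signal > 10 (the cc row with signal 3 is filtered out), so the count for deviceType ss is 3.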

A streaming Dataset or DataFrame can also be registered as a temporary view so that the analysis can be written in SQL:

package com.kfk.spark.structuredstreaming;
import com.kfk.spark.common.CommSparkSession;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.sql.Date;
import java.util.ArrayList;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/21
 * @time : 10:35 PM
 */
public class StruStreamingDFOper3 {
    public static void main(String[] args) throws StreamingQueryException {
        SparkSession spark = CommSparkSession.getSparkSession();
        // input table
        Dataset<String> dflines = spark.readStream()
                .format("socket")
                .option("host", "bigdata-pro-m04")
                .option("port", 9999)
                .load().as(Encoders.STRING());
        List<StructField> fields = new ArrayList<StructField>();
        StructField device = DataTypes.createStructField("device",DataTypes.StringType,true);
        StructField deviceType = DataTypes.createStructField("deviceType",DataTypes.StringType,true);
        StructField signal = DataTypes.createStructField("signal",DataTypes.DoubleType,true);
        StructField deviceTime = DataTypes.createStructField("deviceTime",DataTypes.DateType,true);
        fields.add(device);
        fields.add(deviceType);
        fields.add(signal);
        fields.add(deviceTime);
        // Build the schema
        StructType schema = DataTypes.createStructType(fields);
        // Convert each line to a Dataset<Row> using the schema
        Dataset<Row> dfdevice = dflines.map(new MapFunction<String, Row>() {
            @Override
            public Row call(String value) throws Exception {
                String[] lines = value.split(",");
                return RowFactory.create(lines[0],lines[1],Double.parseDouble(lines[2]), Date.valueOf(lines[3]));
            }
        }, RowEncoder.apply(schema));
        // Register a temporary view
        dfdevice.createOrReplaceTempView("device");
        // result table
        Dataset<Row> dffinal = spark.sql("select * from device");
        // output
        StreamingQuery query = dffinal.writeStream()
                .outputMode("append")
                .format("console")
                .start();
        query.awaitTermination();
    }
}
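Because the temporary view accepts arbitrary SQL, the aggregation from the earlier examples could also be written as a query. A sketch (note that with an aggregation the output mode above would need to change from append to update or complete):

Dataset<Row> dffinal = spark.sql(
        "SELECT deviceType, COUNT(*) AS cnt FROM device WHERE signal > 10 GROUP BY deviceType");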