Structured Streaming之Event-Time的Window操作

简介: 笔记

基于Event-Time的Window操作

详细工作流程:10.png11.png12.png

方式一:

package com.kfk.spark.structuredstreaming;
import com.kfk.spark.common.CommSparkSession;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import scala.Tuple2;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/22
 * @time : 9:44 下午
 */
public class EventTimeWindow {
    public static void main(String[] args) throws StreamingQueryException {
        SparkSession spark = CommSparkSession.getSparkSession();
        // input table
        Dataset<Row> dflines = spark.readStream()
                .format("socket")
                .option("host", "bigdata-pro-m04")
                .option("port", 9999)
                .option("includeTimestamp",true)
                .load();
        /**
         * 输入数据:spark storm hive
         * 接受到数据模型:
         * 第一步:words -> spark storm hive,  timestamp -> 2019-09-09 12:12:12
         *        eg: tuple -> (spark storm hive , 2019-09-09 12:12:12)
         *
         * 第二步:split()操作
         *        对tuple中的key进行split操作
         *
         * 第三步:flatMap()操作
         * 返回类型 -> list(tuple(spark,2019-09-09 12:12:12),
         *               tuple(storm,2019-09-09 12:12:12),
         *               tuple(hive,2019-09-09 12:12:12))
         */
        Dataset<Row> words =  dflines.as(Encoders.tuple(Encoders.STRING(),Encoders.TIMESTAMP()))
                .flatMap(new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() {
                    @Override
                    public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> stringTimestampTuple2) throws Exception {
                        List<Tuple2<String, Timestamp>> result = new ArrayList<>();
                        for (String word : stringTimestampTuple2._1.split(" ")){
                            result.add(new Tuple2<>(word,stringTimestampTuple2._2));
                        }
                        return result.iterator();
                    }
                },Encoders.tuple(Encoders.STRING(),Encoders.TIMESTAMP())).toDF("word","wordtime");
        // result table
        Dataset<Row> windowedCounts = words.groupBy(
                functions.window(words.col("wordtime"),
                        "10 minutes",
                        "5 minutes"),
                words.col("word")
        ).count();
        StreamingQuery query = windowedCounts.writeStream()
                .outputMode("update")
                .format("console")
                .option("truncate","false")
                .start();
        query.awaitTermination();
    }
}

方式二:

package com.kfk.spark.structuredstreaming;
import java.sql.Timestamp;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/22
 * @time : 9:25 下午
 */
public class EventData {
    //timestamp: Timestamp,
    //word: String
    private Timestamp wordtime;
    private String word;
    public Timestamp getWordtime() {
        return wordtime;
    }
    public void setWordtime(Timestamp wordtime) {
        this.wordtime = wordtime;
    }
    public String getWord() {
        return word;
    }
    public void setWord(String word) {
        this.word = word;
    }
    public EventData() {
    }
    public EventData(Timestamp wordtime, String word) {
        this.wordtime = wordtime;
        this.word = word;
    }
}
package com.kfk.spark.structuredstreaming;
import com.kfk.spark.common.CommSparkSession;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import java.sql.Timestamp;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/22
 * @time : 9:44 下午
 */
public class EventTimeWindow2 {
    public static void main(String[] args) throws StreamingQueryException {
        SparkSession spark = CommSparkSession.getSparkSession();
        // input table
        Dataset<String> dflines = spark.readStream()
                .format("socket")
                .option("host", "bigdata-pro-m04")
                .option("port", 9999)
                .load().as(Encoders.STRING());
        /**
         *  数据源:2019-03-29 16:40:00,hadoop
         *        2019-03-29 16:40:10,storm
         *
         *  // 根据javaBean转换为dataset
         */
        Dataset<EventData> dfeventdata = dflines.map(new MapFunction<String, EventData>() {
            @Override
            public EventData call(String value) throws Exception {
                String[] lines  = value.split(",");
                return new EventData(Timestamp.valueOf(lines[0]),lines[1]);
            }
        }, ExpressionEncoder.javaBean(EventData.class));
        // result table
        Dataset<Row> windowedCounts = dfeventdata.groupBy(
                functions.window(dfeventdata.col("wordtime"),
                        "10 minutes",
                        "5 minutes"),
                dfeventdata.col("word")
        ).count();
        StreamingQuery query = windowedCounts.writeStream()
                .outputMode("update")
                .format("console")
                .option("truncate","false")
                .start();
        query.awaitTermination();
    }
}


相关文章
|
8月前
|
前端开发 API 开发者
harmonyOS基础- 快速弄懂HarmonyOS ArkTs基础组件、布局容器(前端视角篇)
本文由黑臂麒麟(6年前端经验)撰写,介绍ArkTS开发中的常用基础组件与布局组件。基础组件包括Text、Image、Button等,支持样式设置如字体颜色、大小和加粗等,并可通过Resource资源引用统一管理样式。布局组件涵盖Column、Row、List、Grid和Tabs等,支持灵活的主轴与交叉轴对齐方式、分割线设置及滚动事件监听。同时,Tabs组件可实现自定义样式与页签切换功能。内容结合代码示例,适合初学者快速上手ArkTS开发。参考华为开发者联盟官网基础课程。
621 75
harmonyOS基础- 快速弄懂HarmonyOS ArkTs基础组件、布局容器(前端视角篇)
PTA-查询水果价格
该程序展示一个水果菜单,包含苹果、梨、桔子和葡萄的单价,以及退出选项。用户输入编号选择查询水果,连续查询超5次或输入0则退出。输入样例1:查询桔子、错误输入、退出,输出相应价格及0价;输入样例2:连续查询多个水果直至超过限制,显示对应价格。代码通过列表和循环实现查询功能,根据用户输入输出价格。
239 0
|
算法 安全 网络安全
Diffie-Hellman (DH) 算法的工作原理
【8月更文挑战第23天】
2015 0
|
小程序 JavaScript 数据库
微信小程序轮播图实现(超简单)
本文介绍了微信小程序使用内置swiper组件实现轮播图的简单方法。对于普通开发,示例代码展示了在wxml中配置swiper组件和嵌套image标签,以及相应的wxss样式设置。采用云开发时,wxml利用wx:for绑定数据列表,js部分展示如何从云端数据库获取数据并设置到list中。只需替换image的src为实际图片路径,即可完成轮播图功能。
993 0
|
前端开发
字体族[font-family]
字体族[font-family]。
90 2
|
存储 程序员
存储器-分段存储管理方式
存储器-分段存储管理方式
716 0
EMQ
|
监控 安全 算法
使用 SSL/TLS 加强 MQTT 通信安全
本文将着重介绍 TLS 以及它如何保证 MQTT 通信的完整性、机密性和真实性。
EMQ
1569 0
|
数据库 网络虚拟化 数据安全/隐私保护
|
机器学习/深度学习 存储 人工智能
那些加入抗击新冠病毒大军的AI组织和公司
发展到今天,人工智能会对新型冠状病毒的防疫产生广泛的影响吗?我认为是的,而且,这次的影响涉及到全球范围。
|
虚拟化 芯片 开发工具
《深入学习VMware vSphere 6》——2.4 在普通PC中安装VMware ESXi的注意事项
之后会进入VMware ESXi安装程序,在“Select a Disk to Install or Upgrade”对话框中选择要安装ESXi的磁盘。虽然当前主机支持RAID,但这只是南桥芯片组支持的RAID,相当于“软”RAID,所以ESXi会“跳过”这个RAID,直接识别出每个硬盘,如图2-4-13所示。
8442 0
下一篇
开通oss服务