Flink处理函数实战之二:ProcessFunction类

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 学习和使用Flink处理函数

欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos

Flink处理函数实战系列链接

  1. 深入了解ProcessFunction的状态操作(Flink-1.10)
  2. ProcessFunction
  3. KeyedProcessFunction类
  4. ProcessAllWindowFunction(窗口处理)
  5. CoProcessFunction(双流处理)

关于处理函数(Process Function)

如下图,在常规的业务开发中,SQL、Table API、DataStream API比较常用,处于Low-level的Porcession相对用得较少,从本章开始,我们一起通过实战来熟悉处理函数(Process Function),看看这一系列的低级算子可以带给我们哪些能力?
在这里插入图片描述

关于ProcessFunction类

处理函数有很多种,最基础的应该ProcessFunction类,来看看它的类图,可见有RichFunction的特性open、close,然后自己有两个重要的方法processElement和onTimer:
在这里插入图片描述
常用特性如下所示:

  1. 处理单个元素;
  2. 访问时间戳;
  3. 旁路输出;

接下来写两个应用体验上述功能;

版本信息

  1. 开发环境操作系统:MacBook Pro 13寸, macOS Catalina 10.15.3
  2. 开发工具:IDEA ULTIMATE 2018.3
  3. JDK:1.8.0_211
  4. Maven:3.6.0
  5. Flink:1.9.2

    源码下载

    如果您不想写代码,整个系列的源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):
名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议

这个git项目中有多个文件夹,本章的应用在flinkstudy文件夹下,如下图红框所示:
在这里插入图片描述

创建工程

执行以下命令创建一个flink-1.9.2的应用工程:

mvn \
archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.2

按提示输入groupId:com.bolingcavalry,architectid:flinkdemo

第一个demo

第一个demo用来体验以下两个特性:

  1. 处理单个元素;
  2. 访问时间戳;

创建Simple.java,内容如下:

package com.bolingcavalry.processfunction;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

public class Simple {
   
   
    public static void main(String[] args) throws Exception {
   
   
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 并行度为1
        env.setParallelism(1);

        // 设置数据源,一共三个元素
        DataStream<Tuple2<String,Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
   
   
            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
   
   
                for(int i=1; i<4; i++) {
   
   

                    String name = "name" + i;
                    Integer value = i;
                    long timeStamp = System.currentTimeMillis();

                    // 将将数据和时间戳打印出来,用来验证数据
                    System.out.println(String.format("source,%s, %d, %d\n",
                            name,
                            value,
                            timeStamp));

                    // 发射一个元素,并且戴上了时间戳
                    ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, value), timeStamp);

                    // 为了让每个元素的时间戳不一样,每发射一次就延时10毫秒
                    Thread.sleep(10);
                }
            }

            @Override
            public void cancel() {
   
   

            }
        });


        // 过滤值为奇数的元素
        SingleOutputStreamOperator<String> mainDataStream = dataStream
                .process(new ProcessFunction<Tuple2<String, Integer>, String>() {
   
   
                    @Override
                    public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
   
   
                        // f1字段为奇数的元素不会进入下一个算子
                        if(0 == value.f1 % 2) {
   
   
                            out.collect(String.format("processElement,%s, %d, %d\n",
                                    value.f0,
                                    value.f1,
                                    ctx.timestamp()));
                        }
                    }
                });

        // 打印结果,证明每个元素的timestamp确实可以在ProcessFunction中取得
        mainDataStream.print();

        env.execute("processfunction demo : simple");
    }
}

这里对上述代码做个介绍:

  1. 创建一个数据源,每个10毫秒发出一个元素,一共三个,类型是Tuple2,f0是个字符串,f1是整形,每个元素都带时间戳;
  2. 数据源发出元素时,提前把元素的f0、f1、时间戳打印出来,和后面的数据核对是否一致;
  3. 在后面的处理中,创建了ProcessFunction的匿名子类,里面可以处理上游发来的每个元素,并且还能取得每个元素的时间戳(这个能力很重要),然后将f1字段为奇数的元素过滤掉;
  4. 最后将ProcessFunction处理过的数据打印出来,验证处理结果是否符合预期;

直接执行Simple类,结果如下,可见过滤和提取时间戳都成功了:
在这里插入图片描述

第二个demo

第二个demo是实现旁路输出(Side Outputs),对于一个DataStream来说,可以通过旁路输出将数据输出到其他算子中去,而不影响原有的算子的处理,下面来演示旁路输出:

创建SideOutput类:

package com.bolingcavalry.processfunction;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.ArrayList;
import java.util.List;

public class SideOutput {
   
   
    public static void main(String[] args) throws Exception {
   
   
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 并行度为1
        env.setParallelism(1);

        // 定义OutputTag
        final OutputTag<String> outputTag = new OutputTag<String>("side-output"){
   
   };

        // 创建一个List,里面有两个Tuple2元素
        List<Tuple2<String, Integer>> list = new ArrayList<>();
        list.add(new Tuple2("aaa", 1));
        list.add(new Tuple2("bbb", 2));
        list.add(new Tuple2("ccc", 3));

        //通过List创建DataStream
        DataStream<Tuple2<String, Integer>> fromCollectionDataStream = env.fromCollection(list);

        //所有元素都进入mainDataStream,f1字段为奇数的元素进入SideOutput
        SingleOutputStreamOperator<String> mainDataStream = fromCollectionDataStream
                .process(new ProcessFunction<Tuple2<String, Integer>, String>() {
   
   
                    @Override
                    public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
   
   

                        //进入主流程的下一个算子
                        out.collect("main, name : " + value.f0 + ", value : " + value.f1);

                        //f1字段为奇数的元素进入SideOutput
                        if(1 == value.f1 % 2) {
   
   
                            ctx.output(outputTag, "side, name : " + value.f0 + ", value : " + value.f1);
                        }
                    }
                });

        // 禁止chanin,这样可以在页面上看清楚原始的DAG
        mainDataStream.disableChaining();

        // 取得旁路数据
        DataStream<String> sideDataStream = mainDataStream.getSideOutput(outputTag);

        mainDataStream.print();
        sideDataStream.print();

        env.execute("processfunction demo : sideoutput");
    }
}

这里对上述代码做个介绍:

  1. 数据源是个集合,类型是Tuple2,f0字段是字符串,f1字段是整形;
  2. ProcessFunction的匿名子类中,将每个元素的f0和f1拼接成字符串,发给主流程算子,再将f1字段为奇数的元素发到旁路输出;
  3. 数据源发出元素时,提前把元素的f0、f1、时间戳打印出来,和后面的数据核对是否一致;
  4. 将主流程和旁路输出的元素都打印出来,验证处理结果是否符合预期;

执行SideOutput看结果,如下图,main前缀的都是主流程算子,一共三条记录,side前缀的是旁路输出,只有f1字段为奇数的两条记录,符合预期:
在这里插入图片描述
上面的操作都是在IDEA上执行的,还可以将flink单独部署,再将上述工程构建成jar,提交到flink的jobmanager,可见DAG如下:
在这里插入图片描述
至此,处理函数中最简单的ProcessFunction类的学习和实战就完成了,接下来的文章我们会尝试更多了类型的处理函数;

欢迎关注阿里云开发者社区:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
SQL Java 关系型数据库
Flink DataSet API迁移到DataStream API实战
本文介绍了作者的Flink项目从DataSet API迁移到DataStream API的背景、方法和遇到的问题以及解决方案。
181 3
|
29天前
|
消息中间件 资源调度 Java
实时计算 Flink版产品使用问题之拉取代码没有这个类,但是在下载的jar包中有这个类,是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
存储 流计算
|
1月前
|
传感器 流计算
|
1月前
|
消息中间件 SQL 分布式计算
|
28天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在执行SQL语句时遇到了类找不到,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
104 0
|
2月前
|
SQL JSON 监控
实时计算 Flink版产品使用合集之直接将 JSON 字符串解析为数组的内置函数如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
28天前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
923 0
|
28天前
|
存储 SQL 关系型数据库
实时计算 Flink版操作报错合集之按时间恢复时,报错:在尝试读取binlog时发现所需的binlog位置不再可用,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
718 0
|
28天前
|
消息中间件 资源调度 Java
实时计算 Flink版操作报错合集之遇到了缺少包的错误,已经添加了相应的 jar 包,仍然出现同样的报错,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
664 2