Flink编程模型

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 笔记

(1)Flink处理的数据集类型


任何类型的数据都可以形成一种事件流。信用卡交易、传感器测量、机器日志、网站或移动应用程序上的用户交互记录,所有这些数据都形成一种流。


数据可以被作为 无界 或者 有界 流来处理。


30.png

有界数据集:

有界数据集具有时间边界,在处理过程中数据一定会在某个时间范围内起始和结束,有可能是一分钟,也有可能是一天内的交易数据。对有界数据集的数据处理方式被称为批计算。


无界数据集:

数据从一开始生成就一直持续不断地产生新的数据,因此数据是没有边界的,例如服务器的日志、传感器信号数据等。和批处理数据方式对应,对无界数据集的数据处理方式被称为流式数据处理。


统一数据处理:

Spark和Flink


(2)Flink核心编程接口


Flink 为流式/批式处理应用程序的开发提供了不同级别的抽象。

31.png


Flink根据数据集类型的不同将核心数据处理接口分为两大类型,一类是支持批计算的接口DataSet API,另外一类是支持流计算的接口DataStream API。同时Flink将数据处理接口抽象成四层,由上向下分别为SQL API、Table API、DataStream/DataSet API以及Stateful StreamProcessing API。用户可以根据需要选择任意一层抽象接口来开发Flink应用。


Flink SQL:

Flink提供了统一的SQL API的完成对批计算和流计算的处理,对SQL API还在逐步完善中。


Table API:

Table API将内存中的DataStream和DataSet数据集在原有的基础上增加Schema信息,将数据类型统一抽象成表的结构,然后通过Table API提供的接口处理对应的数据集。SQL API可以直接查询Table API中注册表的数据表。Table API构建在DataStream和DataSet之上的同时,提供了大量面向领域语言的编程接口。例如GroupByKey、 Join等操作符,提供给用户一种更加友好的处理数据集的方式。除此之外,Table API在转换为DataStream和DataSet的数据处理过程中,也应用了大量的优化规则对数据逻辑进行了优化。同时Table API中的table可以和DataStream和DataSet之间进行相互转换。


DataStream API 和DataSet API:

DataStream API和DataSet API主要面向具有开发经验的用户,用户可以使用DataStream API处理无界流数据,使用DataSet API处理批量数据。


Stateful Stream Process API:

Stateful Stream Process APIFlinkStatefu是中处理 Stream最底层的接口,用户可以使用 Stateful Stream Process接口操作状态、时间等底层数据。


(3)Flink编程结构


(3.1)ExecutionEnvironment

运行Flink程序的第一步就是获取相应的执行环境,执行环境决定了程序是在本地环境执行还是集群环境运行。批量处理和流处理分别使用不同的ExecutionEnvironment。有三种方式获取程序的执行环境


以官方提供的流处理案例:


获取ExecutionEnvironment方式(一):

默认的执行环境创建方式,它会根据上下文去创建正确的ExecutionEnvironment,如果你在IDE中执行程序或者将程序作为一个常规的Java/Scala程序执行,那么它将为你创建一个本地的环境,你的程序将在本地执行。如果你将你的程序打成jar包,并通过命令行调用它,那么Flink集群管理器将执行你的main方法并且getExecutionEnvironment()方法将为你的程序在集群中执行生成一个执行环境。所以getExecutionEnvironment方法在本地执行或者集群执行都可用这个方法。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

获取ExecutionEnvironment方式(二):

本地执行环境创建方式,当程序在IDE中运行时候,可以通过createLocalEnvironment创建基于本地的执行环境,可以指定并行度

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2);

获取ExecutionEnvironment方式(三):

创建远程执行环境。远程环境将程序发送到集群执行。适用于本地直接发送程序到集群上执行测试。

StreamExecutionEnvironment env = StreamExecutionEnvironment
.createRemoteEnvironment("bigdata-pro-m07", 8081,"WordCountJava.jar");

(3.2)初始化数据

readTextFile

socketTextStream

通过读取文件并转化为DataStream[String]数据集,这样就完成了从本地文件到分布式数据集的转换,同时在Flink中提供了多种从外部读取数据的连接器,包括批量和实时的数据连接器,能够将Flink系统和其他第三方系统连接,直接获取外部数据。


(3.3)执行转换操作

Flink中的Transformation操作都是通过不同的Operator来实现的,每个Operator内部通过实现Function接口完成数据处理逻辑的定义。在DataStream和DataSet中提供了大量的算子。如Map、FlatMap、Filter、KeyBy等。


Flink中定义Function的计算逻辑可以通过三种方式完成:


方法一:通过创建Class实现Function接口1.png

方法二:通过创建匿名内部类实现Function接口

2.png

方法三:通过实现RichFunction


(3.4)分区key指定

分区的目的是将相同key的value放在同一个partition中


根据字段位置指定(Flink1.12不推荐)

3.jpeg

官方1.12版本中使用的这种方式:

4.png


根据字段名称指定(Flink1.12不推荐)

使用字段名称需要DataStream中的数据结构类型必须是Tuple类或者P0J0类


通过key选择器指定


基于POJO类型通过key选择器指定key分区代码示例:

package com.aikfk.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/5 3:07 下午
 */
public class WordCountJava3 {
    public static void main(String[] args) throws Exception {
        // 准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<WordCount> dataStream = env.socketTextStream("bigdata-pro-m07",9999)
                .flatMap(new FlatMapFunction<String, WordCount>() {
                    @Override
                    public void flatMap(String line, Collector<WordCount> collector) throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words){
                            collector.collect(new WordCount(word,1));
                        }
                    }
                })
                // 将相同key的value放在同一个partition(按照key选择器指定)
                .keyBy(new KeySelector<WordCount, Object>() {
                    @Override
                    public Object getKey(WordCount wordCount) throws Exception {
                        return wordCount.word;
                    }
                })
                .reduce(new ReduceFunction<WordCount>() {
                    @Override
                    public WordCount reduce(WordCount t1, WordCount t2) throws Exception {
                        return new WordCount(t1.word , t1.count + t2.count);
                    }
                });
        dataStream.print();
        env.execute("Window WordCount");
    }
    /**
     * POJO类
     */
    public static class WordCount{
        public String word;
        public int count;
        public WordCount() {
        }
        public WordCount(String word, int count) {
            this.word = word;
            this.count = count;
        }
        @Override
        public String toString() {
            return "WordCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}

运行结果:

2> WordCount{word='hive', count=1}
3> WordCount{word='java', count=1}
2> WordCount{word='hive', count=2}
2> WordCount{word='hive', count=3}
3> WordCount{word='java', count=2}
2> WordCount{word='hive', count=4}
15> WordCount{word='hadoop', count=1}

(3.5)输出结果

输出到控制台

输出到文件

输出到外部存储


(3.6)程序触发

应用的执行,需要调用ExecutionEnvironment的Execute()方法来触发应用程序的执行,DataStream流式应用需要显性的指定execute()方法来运行程序,如果不调用程序则不会执行;对于DataSet API输出算子中已经包含了对execute()方法的调用,则不需要显性调用execute()方法;


(4)Flink支持的数据类型


原生数据类型

env.fromElements(3,1,3,4);
env.fromElements("spark", "hbase", "java");
  • Java Tuples类型
env.fromElements(new Tuple2<String, Integer>("spark", 1),
new Tuple2<String, Integer>("java", 1));
  • Scala Case Class类型
case class WordCount(word : String, count : Int)
  • P0J0类型(具有默认构造函数)
  • 特殊数据类型
env.fromElements(Map("name" -> "beo"),
Map("name" -> "henry"))
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
打赏
0
0
0
0
12
分享
相关文章
Flink 流批一体在模型特征场景的使用
本文整理自B站资深开发工程师张杨老师在 Flink Forward Asia 2023 中 AI 特征工程专场中的分享。
77739 5
Flink 流批一体在模型特征场景的使用
Flink窗口与状态编程开发(一)
Flink窗口与状态编程开发(一)
大数据-107 Flink 基本概述 适用场景 框架特点 核心组成 生态发展 处理模型 组件架构
大数据-107 Flink 基本概述 适用场景 框架特点 核心组成 生态发展 处理模型 组件架构
160 0
实时计算 Flink版产品使用合集之怎样导数据使starrocks支持主键模型delete的配置
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
Flink CDC-sql怎样导数据使starrocks支持主键模型delete的配置吗?目前只能更新和插入,但是删除不行
Flink CDC-sql怎样导数据使starrocks支持主键模型delete的配置吗?目前只能更新和插入,但是删除不行
265 1
flink-sql(table api 编程)
table api 基本使用 tableEnvironment 和 streamTableEnvironment 注册表,临时表,持久表 Table api 和 table sql 混用 table api 和 datastream 混用 table api 的输入和输出(kafka) kafka的高级特性option
flink-sql(table api 编程)
Flink SQL 核心概念剖析与编程案例实战
本文使用了 Docker 镜像快速安装一些基础组件,zk 和 kafka,并通过案例的方式,剖析了 SQL 的概念与详细的使用方式

热门文章

最新文章