【大数据计算引擎】流式计算引擎Flink1

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 【大数据计算引擎】流式计算引擎Flink

Flink

1.Flink核心概念简介

1.1.什么是Flink

Apache Flink是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。

官网:https://flink.apache.org/zh/flink-architecture.html

有谁再用?

Apache Flink 为全球许多公司和企业的关键业务提供支持。在这个页面上,我们展示了一些著名的 Flink 用户,他们在生产中运行着有意思的用例,并提供了展示更详细信息的链接。

在项目的 wiki 页面中有一个 谁在使用 Flink 的页面,展示了更多的 Flink 用户。请注意,该列表并不全面。我们只添加明确要求列出的用户。

大厂一般做实时数仓建设、实时数据监控、实时反作弊风控、画像系统等。


6c9e8635a87b4516bedaf26bd1b20c85.jpg


558d2cbd7bea429081a53dc178a36fa3.jpg


e4ec1f8657c3435fb12faca70503630a.jpg

1.2.无界数据和有界数据简介

(1)无界流:有定义流的开始,但没有定义流的结束。他们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都达到在处理,因为输入是无限的,在任何时候输入都不会完成。

处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,一边能够推断结果的完整性

(2)有界流:有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后在进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理。


ec4ded3484a24f5aa5628dfec856d920.jpg

Apache Flink 擅长处理无界和有界数据集 精确的时间控制和状态化使得 Flink 的运行时(runtime)能够运行任何处理无界流的应用。有界流则由一些专为固定大小数据集特殊设计的算法和数据结构进行内部处理,产生了出色的性能。

1.3.Flink优化利用内存性能

有状态的 Flink 程序针对本地状态访问进行了优化。任务的状态始终保留在内存中,如果状态大小超过可用内存,则会保存在能高效访问的磁盘数据结构中。

任务通过访问本地(通常在内存中)状态来进行所有的计算,从而产生非常低的处理延迟。Flink 通过定期和异步地对本地状态进行持久化存储来保证故障场景下精确一次的状态一致性。

82e263d3765d4cafb609f66cfa7066e9.jpg

1.4.Flink常见的应用场景

(1)事件驱动型应用

什么是事件驱动型应用?

事件驱动型应用是一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。

事件驱动型应用是在计算存储分离的传统应用基础上进化而来。在传统架构中,应用需要读写远程事务型数据库。

相反,事件驱动型应用是基于状态化流处理来完成。在该设计中,数据和计算不会分离,应用只需访问本地(内存或磁盘)即可获取数据。系统容错性的实现依赖于定期向远程持久化存储写入 checkpoint。下图描述了传统应用和事件驱动型应用架构的区别。


21c6146273b0420eb4b9ce454b527229.jpg


事件驱动型应用的优势?

事件驱动型应用无须查询远程数据库,本地数据访问使得它具有更高的吞吐和更低的延迟。而由于定期向远程持久化存储的 checkpoint 工作可以异步、增量式完成,因此对于正常事件处理的影响甚微。事件驱动型应用的优势不仅限于本地数据访问。

传统分层架构下,通常多个应用会共享同一个数据库,因而任何对数据库自身的更改(例如:由应用更新或服务扩容导致数据布局发生改变)都需要谨慎协调。反观事件驱动型应用,由于只需考虑自身数据,因此在更改数据表示或服务扩容时所需的协调工作将大大减少。

典型的事件驱动型应用实例

(2)数据分析应用

什么是数据分析应用?

数据分析任务需要从原始数据中提取有价值的信息和指标。传统的分析方式通常是利用批查询,或将事件记录下来并基于此有限数据集构建应用来完成。为了得到最新数据的分析结果,必须先将它们加入分析数据集并重新执行查询或运行应用,随后将结果写入存储系统或生成报告。

借助一些先进的流处理引擎,还可以实时地进行数据分析。和传统模式下读取有限数据集不同,流式查询或应用会接入实时事件流,并随着事件消费持续产生和更新结果。这些结果数据可能会写入外部数据库系统或以内部状态的形式维护。仪表展示应用可以相应地从外部数据库读取数据或直接查询应用的内部状态。

3c51664048e24c03811984974b74e202.jpg


流式分析应用的优势?

和批量分析相比,由于流式分析省掉了周期性的数据导入和查询过程,因此从事件中获取指标的延迟更低。不仅如此,批量查询必须处理那些由定期导入和输入有界性导致的人工数据边界,而流式查询则无须考虑该问题。

另一方面,流式分析会简化应用抽象。批量查询的流水线通常由多个独立部件组成,需要周期性地调度提取数据和执行查询。如此复杂的流水线操作起来并不容易,一旦某个组件出错将会影响流水线的后续步骤。

。而流式分析应用整体运行在 Flink 之类的高端流处理系统之上,涵盖了从数据接入到连续结果计算的所有步骤,因此可以依赖底层引擎提供的故障恢复机制。

Flink 如何支持数据分析类应用?

Flink 为持续流式分析和批量分析都提供了良好的支持。

具体而言,它内置了一个符合 ANSI 标准的 SQL 接口,将批、流查询的语义统一起来。无论是在记录事件的静态数据集上还是实时事件流上,相同 SQL 查询都会得到一致的结果。同时 Flink 还支持丰富的用户自定义函数,允许在 SQL 中执行定制化代码。如果还需进一步定制逻辑,可以利用 Flink DataStream API 和 DataSet API 进行更低层次的控制。此外,Flink 的 Gelly 库为基于批量数据集的大规模高性能图分析提供了算法和构建模块支持。

(3)数据管道应用

什么是数据管道?

提取-转换-加载(ETL)是一种在存储系统之间进行数据转换和迁移的常用方法。ETL 作业通常会周期性地触发,将数据从事务型数据库拷贝到分析型数据库或数据仓库。

数据管道和 ETL 作业的用途相似,都可以转换、丰富数据,并将其从某个存储系统移动到另一个。但数据管道是以持续流模式运行,而非周期性触发。因此它支持从一个不断生成数据的源头读取记录,并将它们以低延迟移动到终点。例如:数据管道可以用来监控文件系统目录中的新文件,并将其数据写入事件日志;另一个应用可能会将事件流物化到数据库或增量构建和优化查询索引。

a583f41eaa624e9395a70259f5d27091.jpg

数据管道的优势?

和周期性 ETL 作业相比,持续数据管道可以明显降低将数据移动到目的端的延迟。此外,由于它能够持续消费和发送数据,因此用途更广,支持用例更多。

Flink 如何支持数据管道应用?

很多常见的数据转换和增强操作可以利用 Flink 的 SQL 接口(或 Table API)及用户自定义函数解决。如果数据管道有更高级的需求,可以选择更通用的 DataStream API 来实现。Flink 为多种数据存储系统(如:Kafka、Kinesis、Elasticsearch、JDBC数据库系统等)内置了连接器。同时它还提供了文件系统的连续型数据源及数据汇,可用来监控目录变化和以时间分区的方式写入文件。

典型的数据管道应用实例

2.Flink案例入门

2.1.创建Maven项目整合Flink

  • 创建maven项目
<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.13.1</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.16</version>
    </dependency>
    <!--flink客户端-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!--scala版本-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!--flink-java版本-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!--streamingጱscala版本-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!--streamingጱjava版本-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!--日志输出-->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
        </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
    <!--json依赖包-->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.44</version>
    </dependency>
</dependencies>
新建log4j.properties,日志文件
### 配置appender名称
log4j.rootLogger = debugFile, errorFile
### debug级别以上的日志到:src/logs/debug.log
log4j.appender.debugFile = org.apache.log4j.DailyRollingFileAppender
log4j.appender.debugFile.File = src/logs/flink.log
log4j.appender.debugFile.Append = true
### Threshold属性指定输出等级
log4j.appender.debugFile.Threshold = info
log4j.appender.debugFile.layout = org.apache.log4j.PatternLayout
log4j.appender.debugFile.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %n%m%n
### error级别以上的日志 src/logs/error.log
log4j.appender.errorFile = org.apache.log4j.DailyRollingFileAppender
log4j.appender.errorFile.File = src/logs/error.log
log4j.appender.errorFile.Append = true
log4j.appender.errorFile.Threshold = error
log4j.appender.errorFile.layout = org.apache.log4j.PatternLayout
log4j.appender.errorFile.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %n%m%n

2.2.Tuple数据类型操作

/**
 * @author lixiang
 */
public class Test {
    private static List<String> list = new ArrayList<>();
    static {
        list.add("SpringBoot,Docker");
        list.add("Netty,SpringCloud");
        list.add("Flink,Linux");
    }
    public static void test1() {
        Tuple3<Integer, String, Integer> tuple3 = Tuple3.of(1, "lixiang", 23);
        System.out.println(tuple3.f0);
        System.out.println(tuple3.f1);
        System.out.println(tuple3.f2);
    }
    public static void test2() {
        List<String> collect = list.stream().map(obj -> obj + "拼接").collect(Collectors.toList());
        System.out.println(collect);
    }
    public static void test3() {
        List<String> collect = list.stream().flatMap(obj -> Arrays.stream(obj.split(","))).collect(Collectors.toList());
        System.out.println(collect);
    }
}
public static void main(String[] args) {
     test1();
     test2();
     test3();
}

685e6f3a543748cf8e5fd4ffcaa98412.jpg

2.3.Flink流式处理案例

/**
 * @author lixiang
 */
public class FlinkDemo {
    public static void main(String[] args) throws Exception {
        //构建执行任务环境以及任务的启动入口,存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度
        env.setParallelism(1);
        //相同类型元素的数据流
        DataStream<String> stringDataStream = env.fromElements("Java,SpringBoot,SpringCloud", "Java,Linux,Docker");
        stringDataStream.print("处理前");
        //FlatMapFunction<Strings,String>,key是输入类型,value是Collector响应的收集的类型,看源码的注释也是DataStream<String>里面泛型类型
        DataStream<String> flatMapDataStream = stringDataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] arr = value.split(",");
                for (String s : arr) {
                    out.collect(s);
                }
            }
        });
        flatMapDataStream.print("处理后");
        //DataStream需要execute,可以取个名称
        env.execute("data stream job");
    }
}

fe52de077c6c4d33892010eba587f03f.jpg

可以设置多个线程并行执行

//设置并行度
env.setParallelism(3);


52627464e0284306a1d4425299c1cb3b.jpg

2.4.Flink运行流程解析

(1)Flink和Blink关系

  • 2019年Flink的母公司被阿里全资收购
  • 阿里进行高度定制并取名为Blink (加了很多特性 )
  • 阿里巴巴官方说明:Blink不会单独作为一个开源项目运作,而是Flink的一部分
  • 都在不断演进中,对比其他流式计算框架(老到新)
  • Storm 只支持流处理
  • Spark Streaming (流式处理,其实是micro-batch微批处理,本质还是批处理)
  • Flink 支持流批一体

(2)算子Operator

  • 将一个或多个DataStream转换成新的DataStream,可以将多个转换组合成复杂的数据流拓扑
  • Source和Sink是数据输入和数据输出的特殊算子,重点是transformation类的算子
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

(3)Flink在生产环境中的用法

  • flink可以本地idea执行模拟多线程执行,但不能读取配置文件,适合本地调试
  • 可以提交到远程搭建的flink集群
  • getExecutionEnvironment() 是flink封装好的方式可以自动判断运行模式,更方便开发,
  • 如果程序是独立调用的,此方法返回本地执行环境;
  • 如果从命令行客户端调用程序以提交到集群,则返回此集群的执行环境,是最常用的一种创建执行环境的方式

(4)Flink 部署方式是灵活,主要是对Flink计算时所需资源的管理方式不同

  • Local 本地部署,直接启动进程,适合调试使用
  • Standalone Cluster集群部署,flink自带集群模式
  • On Yarn 计算资源统一由Hadoop YARN管理资源进行调度,按需使用提高集群的资源利用率,生产环境
  • c5f02920a1974901a0e3ed4362f07a76.jpg

2.5.Flink可视化控制台

(1)增加maven依赖

<!--Flink web ui-->
<dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-runtime-web_${scala.version}</artifactId>
     <version>${flink.version}</version>
</dependency>

访问方式:ip:8081

(2)代码开发

/**
 * flink UI demo
 * @author lixiang
 */
public class FlinkUI {
    public static void main(String[] args) throws Exception {
        //构建执行任务环境以及任务的启动入口
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        //设置并行度
        env.setParallelism(1);
        //监听192.168.139.80:8888输送过来的数据流
        DataStreamSource<String> stream = env.socketTextStream("192.168.139.80", 8888);
        //流处理
        SingleOutputStreamOperator<String> streamOperator = stream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] split = value.split(",");
                for (String s : split) {
                    out.collect(s);
                }
            }
        });
        streamOperator.print("处理后");
        //执行任务
        env.execute("data stream job");
    }
}

54e442dcc5e04b1682374e6c5cca6cba.jpg

9ad06488d7824712b3a6ff4d0662eabc.jpg


18baf31544484967b322932833ceacd2.jpg

  • nc命令介绍
  • Linux nc命令用于设置网络路由的
  • nc -lk 8888
  • 开启 监听模式,用于指定nc将处于监听模式, 等待客户端来链接指定的端口

win | linux 需要安装

  • win 百度搜索博文参考不同系统安装
  • yum install -y netcat
  • yum install -y nc

3.Flink部署和整体架构讲解

3.1.Flink运行流程简介

(1)运行流程

  • 用户提交Flink程序到JobClient
  • JobClient的解析、优化提交到JobManager
  • TaskManager运行task,并上报信息给JobManager

3.2.Flink组件角色介绍


941313ba7dad4ade97529db527a55d5d.jpg

(1)Flink是一个分布式系统,需要有效分配和管理计算资源才能执行流应用程序。

  • 运行时由两种类型的进程组成:
  • 一个JobManager
  • 一个或者多个TaskManager
  • 2)什么是JobManager
  • 协调Flink应用程序的分布式执行的功能
  • 它决定何时调度下一个task
  • 对完成的task或执行失败做出反应
  • 协调checkpoint、并且协调从失败中恢复等等

(3)什么是TaskManager

  • 负责计算的worker,还有上报内存,任务运行情况给JobManager等
  • 至少有一个TaskManager,也称为worker执行作业流的task,并且缓存和交换数据流
  • 在TaskManager中资源调度的最小单位是task slot

(4)JobManager进程由三个不同的组件组成

  • ResourceManager
  • 负责Flink集群中的资源提供、回收、分配,它管理task slots
  • Dispatcher
  • 提供一个REST接口,用来提交Flink应用程序执行
  • 为每一个提交的作业启动一个新的JobManager
  • 运行Flink WebUI用来提供作业执行信息
  • JobMaster
  • 负责管理单个JobGraph的执行,Flink集群中可以同时运行多个作业,每个作业都有自己的JobManager
  • 至少有一个JobManager,高可用(HA)设置中可能有多个JobManager,其中一个始终是Leader,其他则是standby
  • (5)TaskManager中task slot的数量表示并发处理task的数量
  • 一个task slot中可以执行多个算子,里面有多个线程
  • 算子opetator
  • source
  • transformation
  • sink
  • 对于分布式执行,Flink将算子的subtasks链接成tasks,每个task由一个线程执行
  • 图中source和map算子组成一个算子链,作为一个task运行在一个线程上
  • 将算子链接成task是个有用的优化:它减少线程间切换、缓冲的开销,并且减少延迟的时增加整体吞吐量

c320d1e4a8c048b2b1d806bfd3bff53e.jpg

820859b288a44d5e85670ef41ded74a6.jpg

(6)Task Slots任务槽

  • Task Slot是Flink中的任务执行器,每个Task Slot可以运行多个subtask,每个subtask会以单独的线程来运行
  • 每个worker(TaskManager)是一个JVM进程,可以在单独的线程中执行一个(1个solt)或多个subtask
  • 为了控制一个TaskManager中接受多少个task,就是为了所谓的task slots(至少一个)
  • 每个task slots代表TaskManager中资源的固定子集
  • 注意:
  • 所有Task Slot平均分配TaskManager的内存,TaskSlot没有CPU隔离
  • 当前TaskSlot独占内存空间,作业间互不影响
  • 一个TaskManager进程里面有多少个TaskSlot就意味着多少个并发
  • Task Solt数量建议是cpu的核数,独占内存,共享CPU
  • ff6f1175973f4f28b942d7d0347745fc.jpg

5个subtask执行,因此有5个并行线程

  • Task正好封装了一个Operator或者Operator Chain的parallel instance。
  • Sub-Task强调的是同一个Operator或者Operator Chain具有多个并行的Task。
  • 图中source和map算子组成一个算子链,作为一个task运行在一个线程上
  • 算子连接成一个task他减少线程间切换、缓冲的开销,并减少延迟的同事增加整体吞吐量。

071d16ec418c405b9a8bf21d08c2fc06.jpg

  • Task Slot是Flink中的任务执行器,每个Task Slot可以运行多个Task即subtask,每个subtask会以单独的线程来运行。
  • Flink算子之间可以通过【一对一】模式或者【重新分发】模式传输数据。

3.3.Flink并行度和优先级概念

(1)Flink是分布式流式计算框架

  • 程序在多个节点并行执行,所以就有并行度Parallelism
  • DataStream就像是有向无环图(DAG),每一个数据流(DataStream)以一个或者多个source开始,以一个或者多个sink结束

(2)流程

  • 一个数据流(stream)包含一个或者多个分区,在不同的线程、物理机里并行执行
  • 每一个算子(operator)包含一个或者多个子任务(subtask),子任务在不同的线程、物理机里并行执行
  • 一个算子的子任务subtask的个数就是并行度(parallelism)
  • (3)并行度的调整配置
  • Flink流程序中不同的算子可能具有不同的并行度,可以在多个地方配置,有不同的优先级
  • Flink并行度配置级别(高到低)
  • 算子
  • map(xxxx).setParallelism(2)
  • 全局env
  • env.setParallelism(2)
  • 客户端cli
  • ./bin/flink run -p 2 xxx.jar
  • Flink配置文件
  • /conf/flink-conf.yaml的parallelism.default默认值
  • 本地IDEA运行,并行度默认为cpu核数

(4)一个很重要的区分TaskSolt和parallelism并行度配置

  • taskslot是静态的概念,是指taskmanager具有的并发执行能力
  • parallelism是动态的概念,是指程序运行时实际使用的并发能力

(5)Flink有3种运行模式

env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
  • STRAMING 流处理
  • BATCH 批处理
  • AUTOMATCH 根据cource类型自动选择运行模式,基本就是使用这个

4.Flink里的Source Operator实战

4.1.Source Operator速览

(1)Flink的API层级为流式/批式处理应用程序的开发提供了不同级别的抽象

  • 第一层是最底层的抽象为有状态实时流处理,抽象实现是ProcessFunction,
  • 用于底层处理
  • 第二层抽象是Core APIs,许多应用程序不需要用到上述最底层抽象的API,而是使用Core APIs进行开发
  • 例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等,此层API中处理的数据类型在每种编程语言中都有对应的类。
  • 第三层抽象是Table API。是以表Table为中心的声明式编程API,Table API使用起来很简洁但是表达能力差
  • 类似数据库中关系模型中的操作,
  • 比如select、project、join、group by和aggregate等
  • 允许用户在编写应用程序时将Table API与DataStream/DataSet API混合使用
  • 第四层最顶层抽象是SQL,这层程序表达式上都类似于Table API,
  • 但是器程序实现都是SQL查询表达式
  • SQL抽象与Table API抽象之间的关联是非常紧密的


d0018bde9d76426c8ef5792e9d1632ed.jpg

(2)Flink编程模型


b65fe38d5a2e4df9bc7ebe850b161341.jpg

(3)Source来源

  • 元素集合
  • env.fromElements
  • env.fromColletion
  • env.fromSequence(start,end);
  • 文件/文件系统
  • env.readTextFile(本地文件);
  • env.readTextFile(HDFS文件);
  • 基于Socket
  • env.socketTextStream(“ip”, 8888)
  • 自定义Source,实现接口自定义数据源,rich相关的api更丰富
  • 并行度为1
  • SourceFunction
  • RichSourceFunction
  • 并行度大于1
  • ParallelSourceFunction
  • RichParallelSourceFunction

(4)Connectors与第三方系统进行对接(用于source或者sink都可以)

  • Flink本身提供Connector例如kafka、RabbitMQ、ES等
  • 注意:Flink程序打包一定要将相应的connetor相关类打包进去,不然就会失败

(5)Apache Bahir连接器

  • 里面也有kafka、RabbitMQ、ES的连接器更多

4.2.元素集合类型Source

元素集合

  • env.fromElements
  • env.fromColletion
  • env.fromSequence(start,end)

代码实战

/**
 * @author lixiang
 */
public class FlinkSourceDemo {
    public static void main(String[] args) throws Exception {
        //构建执行任务环境以及任务的启动入口
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> ds1 = env.fromElements("java,springboot", "kafka,redis", "openstack,k8s,docker");
        ds1.print("ds1");
        DataStreamSource<String> ds2 = env.fromCollection(Arrays.asList("hive", "hadoop", "hbase", "rabbitmq", "java"));
        ds2.print("ds2");
        DataStreamSource<Long> ds3 = env.fromSequence(0, 10);
        ds3.print("ds3");
        //执行任务
        env.execute("data job");
    }
}

d899ecb23cc845929ff2dce2f6a3fdf5.jpg

4.3.文件/文件系统Source

文件/文件系统

  • env.readTextFile(本地文件)
  • env.readTextFile(HDFS文件)
/**
 * @author lixiang
 */
public class FlinkSourceDemo2 {
    public static void main(String[] args) throws Exception {
        //构建执行任务环境以及任务的启动的入口
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> textDS = env.readTextFile("E:\\软件资料\\log.txt");
        //DataStream<String> hdfsDS = env.readTextFile("hdfs://lixiang:8010/file/log/words.txt");
        textDS.print("textDS");
        env.execute("text job");
    }
}


38af6fd0adcd4653930e29c9ab7f807d.jpg

4.4.基于Socket的Source

基于Socket

  • env.socketTextStream(“ip”,8888)
/**
 * flink UI demo
 * @author lixiang
 */
public class FlinkUI {
    public static void main(String[] args) throws Exception {
        //构建执行任务环境以及任务的启动入口
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        //设置并行度
        env.setParallelism(1);
        //监听192.168.139.80:8888输送过来的数据流
        DataStreamSource<String> stream = env.socketTextStream("192.168.139.80", 8888);
        //流处理
        SingleOutputStreamOperator<String> streamOperator = stream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] split = value.split(",");
                for (String s : split) {
                    out.collect(s);
                }
            }
        });
        streamOperator.print("处理后");
        //执行任务
        env.execute("data stream job");
    }
}

09052237a1594e23bf123a04f977cc59.jpg

1018cfdb7c7a42f7841eb30cfabdcc59.jpg

4.5.自定义Source

自定义Source,实现接口自定义数据源

  • 并行度为1
  • SourceFunction
  • RichSourceFunction
  • 并行度大于1
  • ParallelSourceFunction
  • RichParallelSourceFunction
  • Rich相关的api更丰富,多了Open和Close方法,用于初始化连接,关闭等

设置实体VideoOrder

/**
 * 订单实体类
 * @author lixiang
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class VideoOrder
{
    private String tradeNo;
    private String title;
    private int money;
    private int userId;
    private Date createTime;
}

设置VideoOrderSource

/**
 * @author lixiang
 */
public class VideoOrderSource extends RichParallelSourceFunction<VideoOrder>
{
    private volatile Boolean flag = true;
    private Random random = new Random();
    private static List<String> list = new ArrayList<>();
    static
    {
        list.add("SpringBoot2.x课程");
        list.add("Linux入到到精通");
        list.add("Flink流式技术课程");
        list.add("Kafka流式处理消息平台");
        list.add("微服务SpringCloud教程");
    }
    @Override
    public void run(SourceContext<VideoOrder> sourceContext) throws Exception {
        int x = 0;
        while (flag)
        {
            Thread.sleep(1000);
            String id = UUID.randomUUID().toString();
            int userId = random.nextInt(10);
            int money = random.nextInt(100);
            int videoNum = random.nextInt(list.size());
            String title = list.get(videoNum);
            sourceContext.collect(new VideoOrder(id, title, money, userId, new Date()));
            x++;
            if (x == 10)
            {
                cancel();
            }
        }
    }
    /**
     * 取消任务
     */
    @Override
    public void cancel()
    {
        flag = false;
    }
}

编写Flink任务

/**
 * @author lixiang
 */
public class FlinkMainSource {
    public static void main(String[] args) throws Exception {
        //构建执行任务环境以及任务的启动入口
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<VideoOrder> source = env.addSource(new VideoOrderSource());
        source.print("接入的数据");
        SingleOutputStreamOperator<Integer> streamOperator = source.flatMap(new FlatMapFunction<VideoOrder, Integer>() {
            @Override
            public void flatMap(VideoOrder value, Collector<Integer> out) throws Exception {
                out.collect(value.getMoney());
            }
        });
        streamOperator.print("处理后");
        //流程启动
        env.execute("custom source job");
    }
}


cba706426b874504b6551a644dec5408.jpg

4.6.并行度调整结合WebUI

(1)开启WebUI

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

(2)设置不同并行度

e12147462ffb40d59a1c59f3c30966d2.jpg

  • 数据流中最大的并行度,就是算子链中最大算子的数量,比如source 2个并行度,filter 4个,sink 4个,最大就是4


ca5ec31aff0b444492fc5aa18b2341bc.jpg


相关文章
|
1天前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
11 1
|
1天前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
10 0
|
1天前
|
分布式计算 监控 大数据
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
14 0
|
1天前
|
SQL 消息中间件 分布式计算
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
11 0
|
1天前
|
分布式计算 监控 大数据
大数据-129 - Flink CEP 详解 Complex Event Processing - 复杂事件处理
大数据-129 - Flink CEP 详解 Complex Event Processing - 复杂事件处理
11 0
|
大数据 流计算
大数据—Flink深入学习
1.容错机制
134 0
|
21天前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
3月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
749 7
阿里云实时计算Flink在多行业的应用和实践
|
2月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
22天前
|
存储 运维 监控
阿里云实时计算Flink版的评测
阿里云实时计算Flink版的评测
50 15

热门文章

最新文章