如何构建、部署运行Flink程序。

简介: 如何构建、部署运行Flink程序

一、构建Flink程序


构建一个Flink程序有两种方式


方式一:构建 maven 工程,导入流式应用依赖包


<!-- 基础依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
 <!-- DataStream -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.11.3</version>
  <scope>provided</scope>
</dependency>

方式二:基础环境构建直接使用快捷命令【推荐在Mac或者Linux上使用】


curl https://flink.apache.org/q/quickstart.sh | bash -s 1.11.3
* -s 构建 flink 版本

Flink程序一般的开发步骤

构建完成Flink程序之后就可以开发程序了,开发一个Flink程序的一般步骤:


Obtain an execution environment,(构建流执行环境)

Load/create the initial data,(加载初始化的数据)

Specify transformations on this data,(指定此数据的转换)

Specify where to put the results of your computations,(指定计算结果的放置位置)

Trigger the program execution(触发程序执行)


二、快速上手Flink程序


批处理案例:


//批处理 (DataSet) 支持离线数据
public class WordCount {
    public static void main(String[] args)  throws Exception{
        //创建执行环境
        ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
        //从文件中读取数据
        String inputPath="text.txt";
        DataSet<String> inputDataSet = env.readTextFile(inputPath);
        //对数据集进行处理
       DataSet<Tuple2<String,Integer>>  resultSet = inputDataSet.flatMap(new MyflatMapper())
                //按照第一个位置对word分组
                .groupBy(0)
                //将第二个位置上对数据求和
                .sum(1);
        resultSet.print();
    }
    //自定义类实现FlatMapFunction
    public static class MyflatMapper implements FlatMapFunction<String,Tuple2<String,Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            //按空格分词
            String[] words=value.split(" ");
            //遍历所有ord,包成二元组
            for(String word:words){
                out.collect(new Tuple2<>(word,1));
            }
        }
    }
}

本地运行结果展示:

11.png


流处理案例:

//流处理 (DataStream)支持实时数据
public class StreamWordCount {
    /**
     * @author ZhaoPan
     * @createTime 2022/3/2
     * @description
     */
    public static void main(String[] args) throws Exception {
        //创建流处理环境
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度 相当于8个线程
        //env.setParallelism(2);
        //从文件中读取数据
    String inputPath="text.txt";
        DataStream<String> inputDataSream = env.readTextFile(inputPath);
        //基于数据流进行转换计算
        DataStream<Tuple2<String, Integer>> resultStream = inputDataSream.flatMap(new WordCount.MyflatMapper())
                .keyBy(0)
                .sum(1);
        resultStream.print();
        //执行任务
        env.execute();
    }
}

本地运行结果:


22.png


三、运行部署Flink程序


此处介绍两种部署Flink程序的方式:


方式一:Standalone 模式 单机【本地测试推荐】【重点】

1、官网下载 flink 包:https://flink.apache.org/downloads.html#update-policy-for-old-releases


33.png


2、解压 flink-1.10.2-bin-scala_2.12 进入到 conf 目录,修改配置

44.png


# jobmanager节点可用的内存大小。
jobmanager.heap.size: 1024m
# The heap size for the TaskManager JVM
# taskmanager节点可用的内存代大小。
taskmanager.heap.size: 1024m
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
# 每台机器可用的cpu数量
taskmanager.numberOfTaskSlots: 1
# The parallelism used for programs that did not specify and other parallelism.
# 默认情况下任务的并行度
parallelism.default: 1
slot 和 parallelism 总结:
1、slot 是静态的概念,是指 taskmanager 具有的并发执行能力
2、parallelism 是动态的概念,是指程序运行实际使用的并发能力
3、设置合适的 parallelism 来提高运算效率(kafka 应用一般和 partition 一一对应或成倍数关系配置)

flink从 1.8.0 版本开始,移除了对 hadoop 版本的依赖,在客户端包中需要提前将 hadoop 依赖添加到 flink 客户端 lib/ 目录下

注意:此处下载完flink对应的tar包后,还需要下载hadoop的jar包,最后将jar包放入lib目录


55.png

3、启动


进入bin目录 键入 ./start-cluster.sh

66.png

4、访问


注:我这里是将服务部署在自己的服务器上,访问的时候通过IP+端口访问,本地的话就是localhost:8081


http://IP地址:8081


至此就可以访问到如下前端页面,可以对 flink 集群和任务进行监控管理。

77.png

5、提交任务


后台命令方式提交:bin/flink run -h

打成jar包,前端提交

88.png

方式二:Yarn


以 Yarn 模式部署 Flink 任务时,要求 Flink 是有 Hadoop 支持的版本,Hadoop环境需要保证版本在 2.2 以上,并且集群中安装有 HDFS 服务。


Flink 提供了两种在 yarn 上运行的模式,分别为 Session-Cluster 和 Per-Job-Cluster模式。


模式一:yarn-session


原理:在 yarn 中初始化一个 flink 集群,开辟指定的资源,以后提交任务都向这里提交。这个 flink 集群会常驻在 yarn集群中,除非手工停止。当资源不足时,后提交的任务会进入等待,直到有任务结束释放资源

适用场景:适合规模小执行时间短的作业

99.png

部署运行步骤:


1、启动 yarn-session


bin/yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d
参数解读:
-n(--container):TaskManager的数量。
-s(--slots):每个TaskManager的slot数量,默认一个slot一个core,默认每个taskmanager的slot的个数为1,有时可以多一些taskmanager,做冗余。
-jm:JobManager的内存(单位MB)。
-tm:每个taskmanager的内存(单位MB)。
-nm:yarn 的appName(现在yarn的ui上的名字)。
-d:后台执行。

2、启动任务

./bin/flink run ./examples/streaming/TopSpeedWindowing.jar

模式二:yarn-cluster【日常使用频次最高方式】


原理:提交任务的时候创建新的 Application,用来运行程序,如果没有任务就不用创建

适用场景:大型批任务,复杂性高、数据量大流式任务

00.png

启动任务


./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1


相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
消息中间件 存储 传感器
369 0
|
SQL 存储 API
Flink Materialized Table:构建流批一体 ETL
Flink Materialized Table:构建流批一体 ETL
276 3
|
SQL 存储 HIVE
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
903 2
|
SQL 存储 HIVE
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
本文整理自鹰角网络大数据开发工程师朱正军在Flink Forward Asia 2024上的分享,主要涵盖四个方面:鹰角数据平台架构、数据湖选型、湖仓一体建设及未来展望。文章详细介绍了鹰角如何构建基于Paimon的数据湖,解决了Hudi入湖的痛点,并通过Trino引擎和Ranger权限管理实现高效的数据查询与管控。此外,还探讨了湖仓一体平台的落地效果及未来技术发展方向,包括Trino与Paimon的集成增强、StarRocks的应用以及Paimon全面替换Hive的计划。
1666 1
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
|
SQL 存储 API
Flink Materialized Table:构建流批一体 ETL
本文整理自阿里云智能集团 Apache Flink Committer 刘大龙老师在2024FFA流批一体论坛的分享,涵盖三部分内容:数据工程师用户故事、Materialized Table 构建流批一体 ETL 及 Demo。文章通过案例分析传统 Lambda 架构的挑战,介绍了 Materialized Table 如何简化流批处理,提供统一 API 和声明式 ETL,实现高效的数据处理和维护。最后展示了基于 Flink 和 Paimon 的实际演示,帮助用户更好地理解和应用这一技术。
962 7
Flink Materialized Table:构建流批一体 ETL
|
SQL 监控 关系型数据库
用友畅捷通在Flink上构建实时数仓、挑战与最佳实践
本文整理自用友畅捷通数据架构师王龙强在FFA2024上的分享,介绍了公司在Flink上构建实时数仓的经验。内容涵盖业务背景、数仓建设、当前挑战、最佳实践和未来展望。随着数据量增长,公司面临数据库性能瓶颈及实时数据处理需求,通过引入Flink技术逐步解决了数据同步、链路稳定性和表结构差异等问题,并计划在未来进一步优化链路稳定性、探索湖仓一体架构以及结合AI技术推进数据资源高效利用。
899 25
用友畅捷通在Flink上构建实时数仓、挑战与最佳实践
|
存储 关系型数据库 BI
实时计算UniFlow:Flink+Paimon构建流批一体实时湖仓
实时计算架构中,传统湖仓架构在数据流量管控和应用场景支持上表现良好,但在实际运营中常忽略细节,导致新问题。为解决这些问题,提出了流批一体的实时计算湖仓架构——UniFlow。该架构通过统一的流批计算引擎、存储格式(如Paimon)和Flink CDC工具,简化开发流程,降低成本,并确保数据一致性和实时性。UniFlow还引入了Flink Materialized Table,实现了声明式ETL,优化了调度和执行模式,使用户能灵活调整新鲜度与成本。最终,UniFlow不仅提高了开发和运维效率,还提供了更实时的数据支持,满足业务决策需求。
|
Java 流计算
利用java8 的 CompletableFuture 优化 Flink 程序
本文探讨了Flink使用avatorscript脚本语言时遇到的性能瓶颈,并通过CompletableFuture优化代码,显著提升了Flink的QPS。文中详细介绍了avatorscript的使用方法,包括自定义函数、从Map中取值、使用Java工具类及AviatorScript函数等,帮助读者更好地理解和应用avatorscript。
300 2
利用java8 的 CompletableFuture 优化 Flink 程序
|
分布式计算 监控 大数据
大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction
大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction
268 0
|
消息中间件 分布式计算 大数据
大数据-113 Flink DataStreamAPI 程序输入源 自定义输入源 非并行源与并行源
大数据-113 Flink DataStreamAPI 程序输入源 自定义输入源 非并行源与并行源
206 0