【大数据环境准备】(十一)Flink安装

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink安装

一、安装

1、下载地址

https://archive.apache.org/dist/flink/ 
本文选择的版本是:flink-1.13.0-bin-scala_2.12.tgz

2、解压文件

[centos@hadoop10 data]$ tar -zxvf flink-1.13.0-bin-scala_2.12.tgz -C /data/module/
[centos@hadoop10 module]$ mv flink-1.13.0 flink

3、启动

[centos@hadoop10 module]$ ./flink/bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host hadoop10.
Starting taskexecutor daemon on host hadoop10.
[centos@hadoop10 module]$ jps
66328 TaskManagerRunner
66059 StandaloneSessionClusterEntrypoint

[centos@hadoop10 module]$ ./flink/bin/stop-cluster.sh 
Stopping taskexecutor daemon (pid: 66328) on host hadoop10.
Stopping standalonesession daemon (pid: 66059) on host hadoop10.

4、集群配置

(1)进入conf目录下,修改flink-conf.yaml文件,修改jobmanager.rpc.address参数为hadoop10,如下所示:

$ cd conf/
$ vim flink-conf.yaml
# JobManager节点地址.
jobmanager.rpc.address: hadoop10

这就指定了hadoop10节点服务器为JobManager节点

(2)修改workers文件,将另外两台节点服务器添加为本Flink集群的TaskManager节点,具体修改如下:

$ vim workers 
hadoop11
hadoop12

这样就指定了hadoop11和hadoop12为TaskManager节点。

(3)另外,在flink-conf.yaml文件中还可以对集群中的JobManager和TaskManager组件进行优化配置,主要配置项如下:

  • memory.process.size:对JobManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1600M,可以根据集群规模进行适当调整。
  • memory.process.size:对TaskManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1600M,可以根据集群规模进行适当调整。
  • numberOfTaskSlots:对每个TaskManager能够分配的Slot数量进行配置,默认为1,可根据TaskManager所在的机器能够提供给Flink的CPU数量决定。所谓Slot就是TaskManager中具体运行一个任务所分配的计算资源。
  • default:Flink任务执行的默认并行度,优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量。

5、分发

[centos@hadoop10 data]$ ./xsync /data/module/flink/

6、启动

[centos@hadoop10 bin]$ ./start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host hadoop10.
Starting taskexecutor daemon on host hadoop11.
Starting taskexecutor daemon on host hadoop12.

7、访问ui

启动成功后,同样可以访问http://hadoop10:8081对flink集群和任务进行监控管理,如图3-3所示。http://192.168.31.10:8081/#/overview

Untitled

二、向集群提交作业

1、新建flink 项目

2、POM 配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>gmall-flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>20</maven.compiler.source>
        <maven.compiler.target>20</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <flink.version>1.13.0</flink.version>
        <scala.version>2.12</scala.version>
        <hadoop.version>3.1.3</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>

        <!--如果保存检查点到hdfs上,需要引入此依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>

        <!--Flink默认使用的是slf4j记录日志,相当于一个日志的接口,我们这里使用log4j作为具体的日志实现-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                    <exclude>org.apache.hadoop:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <!-- 打包时不复制META-INF下的签名文件,避免报非法签名文件的SecurityExceptions异常-->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>

                            <transformers combine.children="append">
                                <!-- The service transformer is needed to merge META-INF/services files -->
                                <!-- connector和format依赖的工厂类打包时会相互覆盖,需要使用ServicesResourceTransformer解决-->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

3、log4j.properties 配置

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{
   yyyy-MM-dd HH:mm:ss} %10p (%c:%M) - %m%n

log4j.rootLogger=error,stdout

4、1. 程序打包

(1)在我们编写的Flink入门程序的pom.xml文件中添加打包插件的配置,具体如下:

<build>
     <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
</build>

(2)插件配置完毕后,可以使用IDEA的Maven工具执行package命令,出现如下提示即表示打包成功。

Untitled

4.2. 在WebUI上提交作业

(1)任务打包完成后,我们打开Flink的WEB UI页面,在右侧导航栏点击“Submit New Job”,然后点击按钮“+ Add New”,选择要上传运行的JAR包,如图3-4所示

Untitled

(2)点击该JAR包,出现任务配置页面,进行相应配置。
主要配置程序入口主类的全类名,任务运行的并行度,任务运行所需的配置参数和保存点路径等,,配置完成后,即可点击按钮“Submit”,将任务提交到集群运行

相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
4月前
|
SQL 分布式计算 大数据
请问本地安装了大数据计算MaxCompute studio,如何验证联通性及基本DDL操作呢?
请问本地安装了大数据计算MaxCompute studio,如何验证联通性及基本DDL操作呢?
27 0
|
4月前
|
大数据 Docker 容器
大数据 安装指南-----利用docker
大数据 安装指南-----利用docker
42 0
|
1月前
|
消息中间件 Kafka 流计算
如果有多个版本的Flink CDC在同一环境中运行,可能会导致Debezium版本冲突
【2月更文挑战第30天】如果有多个版本的Flink CDC在同一环境中运行,可能会导致Debezium版本冲突
20 2
|
1月前
|
分布式计算 网络安全 流计算
Flink【环境搭建 01】(flink-1.9.3 集群版安装、配置、验证)
【2月更文挑战第15天】Flink【环境搭建 01】(flink-1.9.3 集群版安装、配置、验证)
71 0
|
2月前
|
SQL 大数据 API
大数据技术之Flink---day01概述、快速上手
大数据技术之Flink---day01概述、快速上手
76 4
|
2月前
|
SQL 并行计算 大数据
【大数据技术攻关专题】「Apache-Flink零基础入门」手把手+零基础带你玩转大数据流式处理引擎Flink(基础加强+运行原理)
关于Flink服务的搭建与部署,由于其涉及诸多实战操作而理论部分相对较少,小编打算采用一个独立的版本和环境来进行详尽的实战讲解。考虑到文字描述可能无法充分展现操作的细节和流程,我们决定以视频的形式进行分析和介绍。因此,在本文中,我们将暂时不涉及具体的搭建和部署步骤。
497 3
【大数据技术攻关专题】「Apache-Flink零基础入门」手把手+零基础带你玩转大数据流式处理引擎Flink(基础加强+运行原理)
|
3月前
|
缓存 Java 大数据
CDH大数据环境参数优化指南
CDH大数据环境参数优化指南
|
3月前
|
弹性计算 大数据 调度
Flink中Mesos在大数据领域中使用较多
【1月更文挑战第19天】【1月更文挑战第92篇】Flink中Mesos在大数据领域中使用较多
34 1
|
4月前
|
Shell Apache 流计算
Apache Flink教程----1.安装初体验
Apache Flink教程----1.安装初体验
39 0
|
4月前
|
分布式计算 大数据 Hadoop
Python大数据之PySpark(二)PySpark安装
Python大数据之PySpark(二)PySpark安装
222 0

热门文章

最新文章