Flink1.9.1部署整合standalone集群【离线计算DataSet/实时计算DataStream】

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink1.9.1部署整合standalone集群【离线计算DataSet/实时计算DataStream】

一、环境准备


1.下载地址:https://flink.apache.org


2020091915113814.png

下面那个hadoop的整合包要放到flink的lib中去


2.上传到linux中去,并解压到相关目录

tar -zxvf flink-1.9.1-… apps/


二、standalone部署


2.1 修改conf中的flink-conf.yaml

# 1.主节点的主机名
jobmanager.rpc.address: hadoop01  
# 2.节点的资源槽数
taskmanager.numberOfTaskSlots: 2 
# 3.单机的话,暂时不用配置zookeeper的地址

2.2 修改conf中的slaves


# 设置从节点
hadoop02
hadoop03


2.3 拷贝到其他节点


scp -r flink-1.9.1/ hadoop02:$PWD
scp -r flink-1.9.1/ hadoop03:$PWD


2.4 启动集群


bin/start-cluster.sh


2.5 测试访问


hadoop01:8081

20200919152934679.png



2.6 页面提交程序jar包

20200919153816760.png


提前开设端口,在上面的提交之前

20200919154152935.png


2.7 命令行窗口提交程序jar包

# --hostname hadoop01  --port 8888 为参数
bin/flink run -m hadoop01 -p 4 -c com.wang.Main.class /jar路径  --hostname hadoop01  --port 8888


三、项目整合【Maven3.x+Jdk8/Scala-2.11】


3.1 pom依赖

<dependencies>
    <!--如果是java程序 java所需要的jar-->
    <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.9.1</version>
            <scope>provided</scope>
        </dependency>
    <!--如果是scala程序 scala所需要的jar-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.9.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.9.1</version>
            <scope>provided</scope>
        </dependency>
        <!--依赖日志-->
    <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
</dependencies>
<build>
        <plugins>
            <!-- 编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <!-- scala编译插件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.6</version>
                <configuration>
                    <scalaCompatVersion>2.11</scalaCompatVersion>
                    <scalaVersion>2.11.12</scalaVersion>
                    <encoding>UTF-8</encoding>
                </configuration>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test-compile-scala</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- 打jar包插件(会包含所有依赖) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- 可以设置jar包的入口类(可选) -->
                            <mainClass>com.wang.flink.SocketWordCount</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

3.2 实时计算【DataStream】


1.利用socket通信实现实时的单词计数计算。


2.开启socket端口号


nc -lk 8888


3.java程序

public class StreamingWordCount {
    public static void main(String[] args) throws Exception {
        // 1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2.创建数据集Source
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8888);
        // 3.数据转换Transformations
        DataStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    // 输出到收集器
                    out.collect(word);
                }
            }
        });
        // 4.单词和1组合
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });
        // 5. 分组聚合 单词:次数
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumed = wordAndOne.keyBy(0).sum(1);
        // 6.Sink 数据下沉
        // 这里只打印到控制台
        sumed.print();
        // 7.启动
        env.execute("StreamingWordCount");
    }
}

3.测试结果


20200919163941892.png


4.打包到集群运行

将程序中的socket通信的主机地址和端口号改为参数形式如:

DataStream<String> lines = env.socketTextStream(args[0], Integer.parseInt(args[1]));

1)web页面提交,提前开启socket端口

20200919165946723.png


测试结果如下:


20200919170100208.png

20200919170134792.png


5.java8 lambda表达式的优化

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
            Arrays.stream(line.split(" ")).forEach(w -> {
                out.collect(Tuple2.of(w, 1));
            });
        });

3.3 离线计算【DataSet】


1.整理一个文件,放入一些数据如

flink flink spark hadoop

flink vue java

hdfs spark

2.java程序

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // 1.获取配置
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2.读取数据
        DataSource<String> lines = env.readTextFile(args[0]);
        // 3.切分压平
        FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        // 4.离线计算 分组聚合groupBy(),而不是实时计算的keyBy()
        AggregateOperator<Tuple2<String, Integer>> sumed = wordAndOne.groupBy(0).sum(1);
        // 5.保存数据 设置并行度 数据几个文件
        sumed.writeAsText(args[1]).setParallelism(2);
    // 6.执行
    env.execute("BatchWordCount");
    }
}


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1天前
|
Oracle Java 关系型数据库
实时计算 Flink版操作报错合集之本地打成jar包,运行报错,idea运行不报错,是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
10 0
|
1天前
|
消息中间件 Oracle 关系型数据库
实时计算 Flink版操作报错合集之在连接Oracle 19c时报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
16 0
|
1天前
|
消息中间件 Kubernetes Java
实时计算 Flink版操作报错合集之写入 Kafka 报错 "Failed to send data to Kafka: Failed to allocate memory within the configured max blocking time 60000 ms",该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
11 0
|
1天前
|
SQL 关系型数据库 数据库
实时计算 Flink版操作报错合集之遇到报错:"An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering task failover to ensure consistency." ,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
10 0
|
1天前
|
消息中间件 Oracle 关系型数据库
实时计算 Flink版操作报错合集之报错io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot. 是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
13 0
|
1天前
|
SQL 存储 关系型数据库
实时计算 Flink版操作报错合集之向Hudi写入数据时遇到错误如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
14 0
|
1天前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
19 0
|
消息中间件 Java 关系型数据库
Flink实战(五) - DataStream API编程(下)
Flink实战(五) - DataStream API编程(下)
165 0
Flink实战(五) - DataStream API编程(下)
|
消息中间件 监控 Java
Flink实战(五) - DataStream API编程(上)
Flink实战(五) - DataStream API编程(上)
373 0
Flink实战(五) - DataStream API编程(上)
|
1天前
|
Oracle 关系型数据库 数据库
实时计算 Flink版操作报错合集之执行Flink job,报错“Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: One or more required options are missing”,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
10 0