在Docker跑通Flink分布式版本的WordCount

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 在Docker跑通Flink分布式版本的WordCount

前言

前文我们介绍了,使用Docker快速部署Flink分布式集群,,这一把我们研究一下怎么自己撸一个WordCount上去跑起来。

官网例子的问题

大家发现我的风格或多或少是因为引导大家怎么去入门到熟悉的过程,所以我希望传递给大家一些学习的办法。我是比较大家直接看官网源码的例子的,而且官网刻意给出来本身也就是这个作用,WordCount非常重要,这是一个麻雀虽小五脏俱全的例子,这个例子熟悉了,其实就是入门了。熟悉的第一步特征就是可以试着自己跑起来,第二步就是去改写了,前面我们已经运行过了,这一把我们试着去改一改。

是这样,官网的例子呢,默认是帮你跑批模式,里面是做了一个WordCountData静态类作为批的输入,然后就跑完就结束了。实时程序码,我们还是希望源源不断的输入来一个计算一个嘛,那才有感觉。第二点就是里面倒是有输入一个,计算一个的例子,但是这个例子是需要基于Socket通信的输入,就是其他额外的动作比较多,在容器中操作起来会麻烦一些,也是搞得很容易放弃的,比较难受。

所以我们基于这种程序做一些改造,我们希望程序自己去生成数据,源源不断产生就行,然后后面可以简单的帮我做计算,这样给我们一种丝滑体验。

flink工程构建

命令行构建maven工程

mvn --settings /Users/zhuxuemin/.m2/settings_aliyun.xml archetype:generate -DgroupId=com.blog.flink -DartifactId=docker-app-flink -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
• 1

导入idea,然后改吧改吧,测试代码什么的直接删除。注意仓库调整好,还是用阿里巴巴的就行

<mirror>
           <id>alimaven</id>
           <name>aliyun maven</name>
           <url>http://maven.aliyun.com/nexus/content/repositories/central/</url>
           <mirrorOf>central</mirrorOf>
</mirror>

我们来开始定义pom.xml,这里注意了,我们里面用啥版本,和我们容器上一致就行,低版本虽然也是可以兼容的,但是代码风格有差别的,直接干到最新就行,学习嘛。去哪里可以抄呢,还记得我们的命令行上面其实是带的。

那java版本是多少呢,自然是看文档啦,官网文档在这里

我们一看很清楚的

Java8目前是标注了过时了, 是建议搞到11

Java11呢,明显字那么多,就是当前支持的啦

Java17呢,实验性支持。

所以有的老铁说那我直接干到最新行不行,现在JDK都干到了21去了,太新太旧肯定都不合适的,官网都有的

使是所以我们的版本就定了基调了,

<properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <flink.version>1.18.1</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
  </properties>

那引入哪些包呢,这里其实去源码里面抄就行的,就是对应WordCount里面就有,不过记得把源码搞到对应的版本。

怎么操作呢,从github上面拉取源码之后一般是master分支,然后我们找到远程的大版本分支就行

git branch -a
 
 remotes/origin/HEAD -> origin/master
 remotes/origin/blink
 ...这里好多分支
 remotes/origin/release-1.16
 remotes/origin/release-1.17
 remotes/origin/release-1.18. --这个就是我们要的

找到1.18分支,然后

git checkout release-1.18

然后找到对应的文件,就可以开始抄了

这里是要啥补啥,不断测试的时候就删减

<dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope> <!--本地运行时注释,打包时打开-->
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-datagen</artifactId>
      <version>${flink.version}</version>
    </dependency>
  </dependencies>

WordCount编写

其实整个代码就是需要改造输入源,因为我不需要做输入来源的选择,所以对参数判断的逻辑就干掉了。

总共三个类,第一个是WordSource,我准备了一些句子,算是做数据准备

public class WordSource {
    static String[]  messages=new String[]{
            "How are you?",
            "Nice to meet you.",
            "How is it going?",
            "How's everything with you?",
            "Hi!Are you having fun",
            "Nice to see you again.",
            "How have you been?"
    };
    public static int length(){
        return messages.length;
    }
}

第二个就是分词器,就是把上面的每一句话打散成一个个单词,从源码里直接Copy:

public  final class Tokenizer
        implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");
        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<>(token, 1));
            }
        }
    }
}

第三个类是主类,首先我们改写了源码中的数据源,通过DataGeneratorSource自动生成的方式,随机从我们WordSource里面的消息获取句子,每次取一句,源头就构建好了

第二步就是计算,WordCount的核心逻辑了

后面就run起来,没啥逻辑了,改写之后的代码比较清爽,是不是感觉更加适合入门了。

public class WordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        //自定义发射数据源
        DataGeneratorSource<String> wordSource=new DataGeneratorSource<String>(new GeneratorFunction<Long, String>() {
            Random random=new Random();
            public String map(Long aLong) throws Exception {
                return WordSource.messages[random.nextInt(WordSource.length())];
            }
        },Long.MAX_VALUE, RateLimiterStrategy.perSecond(10), Types.STRING);
        //计算
        env.fromSource(wordSource, WatermarkStrategy.noWatermarks(),"wordMessageSource")
        .flatMap(new Tokenizer()).name("tokenizer")
                .keyBy((KeySelector<Tuple2<String, Integer>, String>) value -> value.f0)
                .sum(1).name("counter")
                .print().name("print-sink");
        env.execute();
    }
}

这种程序是本机就可以运行的,先确保本机的输出符合预期:

Docker里面的Flink集群跑起来

注意了,发布之前其实需要打包的,部署失败的时候其实是有日志提示的,按照下面这个方式去找到错误信息。

这种错误是不是感觉特亲切。

Caused by: java.lang.NoClassDefFoundError: org/apache/flink/connector/datagen/source/GeneratorFunction
  at java.lang.Class.getDeclaredMethods0(Native Method) ~[?:?]
  at java.lang.Class.privateGetDeclaredMethods(Unknown Source) ~[?:?]
  at java.lang.Class.getMethodsRecursive(Unknown Source) ~[?:?]
  at java.lang.Class.getMethod0(Unknown Source) ~[?:?]
  at java.lang.Class.getMethod(Unknown Source) ~[?:?]
  at org.apache.flink.client.program.PackagedProgram.hasMainMethod(PackagedProgram.java:307) ~[flink-dist-1.18.1.jar:1.18.1]
  ... 54 more

那如何打包呢,当然直接百度一下没问题的,当然我还是会建议直接去源码中去拿,源码中其实是一个程序里面包含了很多个示例程序,相当于支持了一个应用工程中多个打包程序的定制输出,这其实是直接在我们实际开发中也是碰到的。

这样子的话,我们有个参考,定义一份属于我们的打包文件,我把关键部分放出来,附上我的注释,这样子方便大家对着改

<plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <execution>
            <id>WordCount</id>
            <!-- 和 package 阶段绑定 -->
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <shadeTestJar>false</shadeTestJar>
          <!-- 最后jar包文件生成时候的名字 -->
          <finalName>WordCount</finalName> 
        <filters>
         <!-- 这里控制哪些依赖的jar包需要包含进来,注意:里面其实是写maven坐标 -->
          <filter>
            <artifact>org.apache.flink:flink-connector-datagen</artifact>
            <includes>
              <include>org/apache/flink/connector/datagen/source/**</include>
            </includes>
          </filter>
        </filters>
        </configuration>
      </plugin>

打包:运行

-DskipTests=true package

那个最丰满的,名字又是那么显眼的,一下子就认出来了。

接下来我们部署上去,首先还是从flink的UI上面部署,

上传之后就看到了一直运行的job了。

以命令行方式提交

这里先给一个小妙招,我们在Docker里面运行的时候需要每次找到对应的容器ID再执行代码比较麻烦,我们可以通过一些查询条件去获取到jobmanager对应的容器ID,再执行里面的flink命令,那么就变成下面这样子,比如说我要知道flink运行了哪些作业,执行下面的命令:

docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) flink list

可以看到我们刚才提交的命令

Waiting for response...
------------------ Running/Restarting Jobs -------------------
03.03.2024 07:50:15 : e356b3168e95ec2716c0830380f6c57b : Flink Streaming Job (RUNNING)
--------------------------------------------------------------

由于我们对jobmanager的容器id下面会重复使用,而且比较长,我们定义一个变量代替,这样就方便使用

FLINK_JOBMANAGER=$(docker ps --filter name=jobmanager --format={{.ID}})

可以输出这个结果:

echo $FLINK_JOBMANAGER
cd422928babc

接下来,我们先拆解动作,我们首先把宿主机器上面的jar包复制到我们jobmanager容器中:

docker cp /Users/zhuxuemin/docker-app-flink/target/WordCount.jar $FLINK_JOBMANAGER:/opt/WordCount.jar

可以验证查看

docker exec $FLINK_JOBMANAGER ls /opt
flink
java
WordCount.jar

下一步,我们可以在容器内部实施提交了,还比较丝滑

docker exec -it $FLINK_JOBMANAGER flink \
 run -c com.blog.flink.WordCount \
 /opt/WordCount.jar

其实这样就可以了。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
2月前
|
分布式计算 Hadoop 大数据
分布式计算框架比较:Hadoop、Spark 与 Flink
【5月更文挑战第31天】Hadoop是大数据处理的开创性框架,专注于大规模批量数据处理,具有高扩展性和容错性。然而,它在实时任务上表现不足。以下是一个简单的Hadoop MapReduce的WordCount程序示例,展示如何统计文本中单词出现次数。
87 0
|
2月前
|
流计算 Docker 容器
在docker中玩flink时候记录一些组合命令
在docker中玩flink时候记录一些组合命令
20 0
|
2月前
|
Java 大数据 流计算
使用Docker快速部署Flink分布式集群
使用Docker快速部署Flink分布式集群
185 0
|
17天前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
775 0
|
17天前
|
存储 SQL 关系型数据库
实时计算 Flink版操作报错合集之按时间恢复时,报错:在尝试读取binlog时发现所需的binlog位置不再可用,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
682 0
|
17天前
|
消息中间件 资源调度 Java
实时计算 Flink版操作报错合集之遇到了缺少包的错误,已经添加了相应的 jar 包,仍然出现同样的报错,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
658 2
|
17天前
|
SQL JSON 数据库
实时计算 Flink版操作报错合集之写入Hudi时,遇到从 COW(Copy-On-Write)表类型转换为 MOR(Merge-On-Read)表类型时报字段错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
17天前
|
监控 Oracle 关系型数据库
实时计算 Flink版操作报错合集之在配置连接时,添加了scan.startup.mode参数后,出现报错。是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
748 0
|
17天前
|
消息中间件 Oracle 关系型数据库
实时计算 Flink版操作报错合集之连接RabbitMQ时遇到Could not find any factory for identifier 'rabbitmq' that implements 'org.apache.flink.table.factories.DynamicTableFactory'错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
370 0
|
17天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之CDC任务在异常后整个record sent从0初始化开始,是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
458 0