在Docker跑通Flink分布式版本的WordCount

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 在Docker跑通Flink分布式版本的WordCount

前言

前文我们介绍了,使用Docker快速部署Flink分布式集群,,这一把我们研究一下怎么自己撸一个WordCount上去跑起来。

官网例子的问题

大家发现我的风格或多或少是因为引导大家怎么去入门到熟悉的过程,所以我希望传递给大家一些学习的办法。我是比较大家直接看官网源码的例子的,而且官网刻意给出来本身也就是这个作用,WordCount非常重要,这是一个麻雀虽小五脏俱全的例子,这个例子熟悉了,其实就是入门了。熟悉的第一步特征就是可以试着自己跑起来,第二步就是去改写了,前面我们已经运行过了,这一把我们试着去改一改。

是这样,官网的例子呢,默认是帮你跑批模式,里面是做了一个WordCountData静态类作为批的输入,然后就跑完就结束了。实时程序码,我们还是希望源源不断的输入来一个计算一个嘛,那才有感觉。第二点就是里面倒是有输入一个,计算一个的例子,但是这个例子是需要基于Socket通信的输入,就是其他额外的动作比较多,在容器中操作起来会麻烦一些,也是搞得很容易放弃的,比较难受。

所以我们基于这种程序做一些改造,我们希望程序自己去生成数据,源源不断产生就行,然后后面可以简单的帮我做计算,这样给我们一种丝滑体验。

flink工程构建

命令行构建maven工程

mvn --settings /Users/zhuxuemin/.m2/settings_aliyun.xml archetype:generate -DgroupId=com.blog.flink -DartifactId=docker-app-flink -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
• 1

导入idea,然后改吧改吧,测试代码什么的直接删除。注意仓库调整好,还是用阿里巴巴的就行

<mirror>
           <id>alimaven</id>
           <name>aliyun maven</name>
           <url>http://maven.aliyun.com/nexus/content/repositories/central/</url>
           <mirrorOf>central</mirrorOf>
</mirror>

我们来开始定义pom.xml,这里注意了,我们里面用啥版本,和我们容器上一致就行,低版本虽然也是可以兼容的,但是代码风格有差别的,直接干到最新就行,学习嘛。去哪里可以抄呢,还记得我们的命令行上面其实是带的。

那java版本是多少呢,自然是看文档啦,官网文档在这里

我们一看很清楚的

Java8目前是标注了过时了, 是建议搞到11

Java11呢,明显字那么多,就是当前支持的啦

Java17呢,实验性支持。

所以有的老铁说那我直接干到最新行不行,现在JDK都干到了21去了,太新太旧肯定都不合适的,官网都有的

使是所以我们的版本就定了基调了,

<properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <flink.version>1.18.1</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
  </properties>

那引入哪些包呢,这里其实去源码里面抄就行的,就是对应WordCount里面就有,不过记得把源码搞到对应的版本。

怎么操作呢,从github上面拉取源码之后一般是master分支,然后我们找到远程的大版本分支就行

git branch -a
 
 remotes/origin/HEAD -> origin/master
 remotes/origin/blink
 ...这里好多分支
 remotes/origin/release-1.16
 remotes/origin/release-1.17
 remotes/origin/release-1.18. --这个就是我们要的

找到1.18分支,然后

git checkout release-1.18

然后找到对应的文件,就可以开始抄了

这里是要啥补啥,不断测试的时候就删减

<dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope> <!--本地运行时注释,打包时打开-->
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-datagen</artifactId>
      <version>${flink.version}</version>
    </dependency>
  </dependencies>

WordCount编写

其实整个代码就是需要改造输入源,因为我不需要做输入来源的选择,所以对参数判断的逻辑就干掉了。

总共三个类,第一个是WordSource,我准备了一些句子,算是做数据准备

public class WordSource {
    static String[]  messages=new String[]{
            "How are you?",
            "Nice to meet you.",
            "How is it going?",
            "How's everything with you?",
            "Hi!Are you having fun",
            "Nice to see you again.",
            "How have you been?"
    };
    public static int length(){
        return messages.length;
    }
}

第二个就是分词器,就是把上面的每一句话打散成一个个单词,从源码里直接Copy:

public  final class Tokenizer
        implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");
        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<>(token, 1));
            }
        }
    }
}

第三个类是主类,首先我们改写了源码中的数据源,通过DataGeneratorSource自动生成的方式,随机从我们WordSource里面的消息获取句子,每次取一句,源头就构建好了

第二步就是计算,WordCount的核心逻辑了

后面就run起来,没啥逻辑了,改写之后的代码比较清爽,是不是感觉更加适合入门了。

public class WordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        //自定义发射数据源
        DataGeneratorSource<String> wordSource=new DataGeneratorSource<String>(new GeneratorFunction<Long, String>() {
            Random random=new Random();
            public String map(Long aLong) throws Exception {
                return WordSource.messages[random.nextInt(WordSource.length())];
            }
        },Long.MAX_VALUE, RateLimiterStrategy.perSecond(10), Types.STRING);
        //计算
        env.fromSource(wordSource, WatermarkStrategy.noWatermarks(),"wordMessageSource")
        .flatMap(new Tokenizer()).name("tokenizer")
                .keyBy((KeySelector<Tuple2<String, Integer>, String>) value -> value.f0)
                .sum(1).name("counter")
                .print().name("print-sink");
        env.execute();
    }
}

这种程序是本机就可以运行的,先确保本机的输出符合预期:

Docker里面的Flink集群跑起来

注意了,发布之前其实需要打包的,部署失败的时候其实是有日志提示的,按照下面这个方式去找到错误信息。

这种错误是不是感觉特亲切。

Caused by: java.lang.NoClassDefFoundError: org/apache/flink/connector/datagen/source/GeneratorFunction
  at java.lang.Class.getDeclaredMethods0(Native Method) ~[?:?]
  at java.lang.Class.privateGetDeclaredMethods(Unknown Source) ~[?:?]
  at java.lang.Class.getMethodsRecursive(Unknown Source) ~[?:?]
  at java.lang.Class.getMethod0(Unknown Source) ~[?:?]
  at java.lang.Class.getMethod(Unknown Source) ~[?:?]
  at org.apache.flink.client.program.PackagedProgram.hasMainMethod(PackagedProgram.java:307) ~[flink-dist-1.18.1.jar:1.18.1]
  ... 54 more

那如何打包呢,当然直接百度一下没问题的,当然我还是会建议直接去源码中去拿,源码中其实是一个程序里面包含了很多个示例程序,相当于支持了一个应用工程中多个打包程序的定制输出,这其实是直接在我们实际开发中也是碰到的。

这样子的话,我们有个参考,定义一份属于我们的打包文件,我把关键部分放出来,附上我的注释,这样子方便大家对着改

<plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <execution>
            <id>WordCount</id>
            <!-- 和 package 阶段绑定 -->
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <shadeTestJar>false</shadeTestJar>
          <!-- 最后jar包文件生成时候的名字 -->
          <finalName>WordCount</finalName> 
        <filters>
         <!-- 这里控制哪些依赖的jar包需要包含进来,注意:里面其实是写maven坐标 -->
          <filter>
            <artifact>org.apache.flink:flink-connector-datagen</artifact>
            <includes>
              <include>org/apache/flink/connector/datagen/source/**</include>
            </includes>
          </filter>
        </filters>
        </configuration>
      </plugin>

打包:运行

-DskipTests=true package

那个最丰满的,名字又是那么显眼的,一下子就认出来了。

接下来我们部署上去,首先还是从flink的UI上面部署,

上传之后就看到了一直运行的job了。

以命令行方式提交

这里先给一个小妙招,我们在Docker里面运行的时候需要每次找到对应的容器ID再执行代码比较麻烦,我们可以通过一些查询条件去获取到jobmanager对应的容器ID,再执行里面的flink命令,那么就变成下面这样子,比如说我要知道flink运行了哪些作业,执行下面的命令:

docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) flink list

可以看到我们刚才提交的命令

Waiting for response...
------------------ Running/Restarting Jobs -------------------
03.03.2024 07:50:15 : e356b3168e95ec2716c0830380f6c57b : Flink Streaming Job (RUNNING)
--------------------------------------------------------------

由于我们对jobmanager的容器id下面会重复使用,而且比较长,我们定义一个变量代替,这样就方便使用

FLINK_JOBMANAGER=$(docker ps --filter name=jobmanager --format={{.ID}})

可以输出这个结果:

echo $FLINK_JOBMANAGER
cd422928babc

接下来,我们先拆解动作,我们首先把宿主机器上面的jar包复制到我们jobmanager容器中:

docker cp /Users/zhuxuemin/docker-app-flink/target/WordCount.jar $FLINK_JOBMANAGER:/opt/WordCount.jar

可以验证查看

docker exec $FLINK_JOBMANAGER ls /opt
flink
java
WordCount.jar

下一步,我们可以在容器内部实施提交了,还比较丝滑

docker exec -it $FLINK_JOBMANAGER flink \
 run -c com.blog.flink.WordCount \
 /opt/WordCount.jar

其实这样就可以了。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
24天前
|
机器人 网络安全 数据安全/隐私保护
autMan奥特曼机器人-对接Docker版本NTQQ详细教程
本文介绍了如何在服务器上搭建NTQQ机器人,通过官方NTQQ对接各框架,实现QQ登录的稳定运行。文章提到了需要准备一台服务器和相应的软件,并详细描述了通过SSH链接服务器、创建文件夹和配置文件、编辑配置文件地址端口、运行容器等步骤。同时,文章还介绍了VNC连接的使用和配置,以及使用watchtower进行NTQQ的更新。文章总结起来就是在服务器上搭建NTQQ机器人,实现QQ登录的稳定性和自动登录功能,同时提供了更新和维护的方法。
57 3
autMan奥特曼机器人-对接Docker版本NTQQ详细教程
|
1月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
63 3
|
2月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
1月前
|
消息中间件 NoSQL Kafka
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
38 4
|
2月前
|
Docker 容器
如何查看docker版本|12
如何查看docker版本|12
36 2
|
1月前
|
SQL 分布式计算 大数据
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
46 0
|
1月前
|
大数据 流计算
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)
45 0
|
1月前
|
NoSQL MongoDB Docker
求助,有没有大神可以找到arm64架构下mongodb的3.6.8版本的docker镜像?
在Docker Hub受限的情况下,寻求适用于ARM架构的docker镜像资源或拉取链接,以便在x86架构上获取;内网中的机器为ARM架构,因此优先请求适合ARM的Docker镜像或Dockerfile,非常感激您的帮助。
|
1月前
|
Kubernetes Cloud Native 流计算
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
73 0
|
3月前
|
关系型数据库 MySQL Linux
一文教会你如何在Linux系统中使用Docker安装Mysql 5.7版本 【详细过程+图解】
这篇文章提供了在Linux系统中使用Docker安装Mysql 5.7版本的详细过程和图解,包括安装指定版本、创建实例、启动、使用Navicat连接测试、文件挂载与端口映射、进入容器、配置文件修改以及重新启动容器等步骤。
一文教会你如何在Linux系统中使用Docker安装Mysql 5.7版本 【详细过程+图解】