在Docker跑通Flink分布式版本的WordCount

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 在Docker跑通Flink分布式版本的WordCount

前言

前文我们介绍了,使用Docker快速部署Flink分布式集群,,这一把我们研究一下怎么自己撸一个WordCount上去跑起来。

官网例子的问题

大家发现我的风格或多或少是因为引导大家怎么去入门到熟悉的过程,所以我希望传递给大家一些学习的办法。我是比较大家直接看官网源码的例子的,而且官网刻意给出来本身也就是这个作用,WordCount非常重要,这是一个麻雀虽小五脏俱全的例子,这个例子熟悉了,其实就是入门了。熟悉的第一步特征就是可以试着自己跑起来,第二步就是去改写了,前面我们已经运行过了,这一把我们试着去改一改。

是这样,官网的例子呢,默认是帮你跑批模式,里面是做了一个WordCountData静态类作为批的输入,然后就跑完就结束了。实时程序码,我们还是希望源源不断的输入来一个计算一个嘛,那才有感觉。第二点就是里面倒是有输入一个,计算一个的例子,但是这个例子是需要基于Socket通信的输入,就是其他额外的动作比较多,在容器中操作起来会麻烦一些,也是搞得很容易放弃的,比较难受。

所以我们基于这种程序做一些改造,我们希望程序自己去生成数据,源源不断产生就行,然后后面可以简单的帮我做计算,这样给我们一种丝滑体验。

flink工程构建

命令行构建maven工程

mvn --settings /Users/zhuxuemin/.m2/settings_aliyun.xml archetype:generate -DgroupId=com.blog.flink -DartifactId=docker-app-flink -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
• 1

导入idea,然后改吧改吧,测试代码什么的直接删除。注意仓库调整好,还是用阿里巴巴的就行

<mirror>
           <id>alimaven</id>
           <name>aliyun maven</name>
           <url>http://maven.aliyun.com/nexus/content/repositories/central/</url>
           <mirrorOf>central</mirrorOf>
</mirror>

我们来开始定义pom.xml,这里注意了,我们里面用啥版本,和我们容器上一致就行,低版本虽然也是可以兼容的,但是代码风格有差别的,直接干到最新就行,学习嘛。去哪里可以抄呢,还记得我们的命令行上面其实是带的。

那java版本是多少呢,自然是看文档啦,官网文档在这里

我们一看很清楚的

Java8目前是标注了过时了, 是建议搞到11

Java11呢,明显字那么多,就是当前支持的啦

Java17呢,实验性支持。

所以有的老铁说那我直接干到最新行不行,现在JDK都干到了21去了,太新太旧肯定都不合适的,官网都有的

使是所以我们的版本就定了基调了,

<properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <flink.version>1.18.1</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
  </properties>

那引入哪些包呢,这里其实去源码里面抄就行的,就是对应WordCount里面就有,不过记得把源码搞到对应的版本。

怎么操作呢,从github上面拉取源码之后一般是master分支,然后我们找到远程的大版本分支就行

git branch -a
 
 remotes/origin/HEAD -> origin/master
 remotes/origin/blink
 ...这里好多分支
 remotes/origin/release-1.16
 remotes/origin/release-1.17
 remotes/origin/release-1.18. --这个就是我们要的

找到1.18分支,然后

git checkout release-1.18

然后找到对应的文件,就可以开始抄了

这里是要啥补啥,不断测试的时候就删减

<dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope> <!--本地运行时注释,打包时打开-->
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-datagen</artifactId>
      <version>${flink.version}</version>
    </dependency>
  </dependencies>

WordCount编写

其实整个代码就是需要改造输入源,因为我不需要做输入来源的选择,所以对参数判断的逻辑就干掉了。

总共三个类,第一个是WordSource,我准备了一些句子,算是做数据准备

public class WordSource {
    static String[]  messages=new String[]{
            "How are you?",
            "Nice to meet you.",
            "How is it going?",
            "How's everything with you?",
            "Hi!Are you having fun",
            "Nice to see you again.",
            "How have you been?"
    };
    public static int length(){
        return messages.length;
    }
}

第二个就是分词器,就是把上面的每一句话打散成一个个单词,从源码里直接Copy:

public  final class Tokenizer
        implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");
        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<>(token, 1));
            }
        }
    }
}

第三个类是主类,首先我们改写了源码中的数据源,通过DataGeneratorSource自动生成的方式,随机从我们WordSource里面的消息获取句子,每次取一句,源头就构建好了

第二步就是计算,WordCount的核心逻辑了

后面就run起来,没啥逻辑了,改写之后的代码比较清爽,是不是感觉更加适合入门了。

public class WordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        //自定义发射数据源
        DataGeneratorSource<String> wordSource=new DataGeneratorSource<String>(new GeneratorFunction<Long, String>() {
            Random random=new Random();
            public String map(Long aLong) throws Exception {
                return WordSource.messages[random.nextInt(WordSource.length())];
            }
        },Long.MAX_VALUE, RateLimiterStrategy.perSecond(10), Types.STRING);
        //计算
        env.fromSource(wordSource, WatermarkStrategy.noWatermarks(),"wordMessageSource")
        .flatMap(new Tokenizer()).name("tokenizer")
                .keyBy((KeySelector<Tuple2<String, Integer>, String>) value -> value.f0)
                .sum(1).name("counter")
                .print().name("print-sink");
        env.execute();
    }
}

这种程序是本机就可以运行的,先确保本机的输出符合预期:

Docker里面的Flink集群跑起来

注意了,发布之前其实需要打包的,部署失败的时候其实是有日志提示的,按照下面这个方式去找到错误信息。

这种错误是不是感觉特亲切。

Caused by: java.lang.NoClassDefFoundError: org/apache/flink/connector/datagen/source/GeneratorFunction
  at java.lang.Class.getDeclaredMethods0(Native Method) ~[?:?]
  at java.lang.Class.privateGetDeclaredMethods(Unknown Source) ~[?:?]
  at java.lang.Class.getMethodsRecursive(Unknown Source) ~[?:?]
  at java.lang.Class.getMethod0(Unknown Source) ~[?:?]
  at java.lang.Class.getMethod(Unknown Source) ~[?:?]
  at org.apache.flink.client.program.PackagedProgram.hasMainMethod(PackagedProgram.java:307) ~[flink-dist-1.18.1.jar:1.18.1]
  ... 54 more

那如何打包呢,当然直接百度一下没问题的,当然我还是会建议直接去源码中去拿,源码中其实是一个程序里面包含了很多个示例程序,相当于支持了一个应用工程中多个打包程序的定制输出,这其实是直接在我们实际开发中也是碰到的。

这样子的话,我们有个参考,定义一份属于我们的打包文件,我把关键部分放出来,附上我的注释,这样子方便大家对着改

<plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <execution>
            <id>WordCount</id>
            <!-- 和 package 阶段绑定 -->
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <shadeTestJar>false</shadeTestJar>
          <!-- 最后jar包文件生成时候的名字 -->
          <finalName>WordCount</finalName> 
        <filters>
         <!-- 这里控制哪些依赖的jar包需要包含进来,注意:里面其实是写maven坐标 -->
          <filter>
            <artifact>org.apache.flink:flink-connector-datagen</artifact>
            <includes>
              <include>org/apache/flink/connector/datagen/source/**</include>
            </includes>
          </filter>
        </filters>
        </configuration>
      </plugin>

打包:运行

-DskipTests=true package

那个最丰满的,名字又是那么显眼的,一下子就认出来了。

接下来我们部署上去,首先还是从flink的UI上面部署,

上传之后就看到了一直运行的job了。

以命令行方式提交

这里先给一个小妙招,我们在Docker里面运行的时候需要每次找到对应的容器ID再执行代码比较麻烦,我们可以通过一些查询条件去获取到jobmanager对应的容器ID,再执行里面的flink命令,那么就变成下面这样子,比如说我要知道flink运行了哪些作业,执行下面的命令:

docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) flink list

可以看到我们刚才提交的命令

Waiting for response...
------------------ Running/Restarting Jobs -------------------
03.03.2024 07:50:15 : e356b3168e95ec2716c0830380f6c57b : Flink Streaming Job (RUNNING)
--------------------------------------------------------------

由于我们对jobmanager的容器id下面会重复使用,而且比较长,我们定义一个变量代替,这样就方便使用

FLINK_JOBMANAGER=$(docker ps --filter name=jobmanager --format={{.ID}})

可以输出这个结果:

echo $FLINK_JOBMANAGER
cd422928babc

接下来,我们先拆解动作,我们首先把宿主机器上面的jar包复制到我们jobmanager容器中:

docker cp /Users/zhuxuemin/docker-app-flink/target/WordCount.jar $FLINK_JOBMANAGER:/opt/WordCount.jar

可以验证查看

docker exec $FLINK_JOBMANAGER ls /opt
flink
java
WordCount.jar

下一步,我们可以在容器内部实施提交了,还比较丝滑

docker exec -it $FLINK_JOBMANAGER flink \
 run -c com.blog.flink.WordCount \
 /opt/WordCount.jar

其实这样就可以了。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
8天前
|
Cloud Native 安全 Docker
云上攻防-云原生篇&Docker安全&系统内核&版本&CDK自动利用&容器逃逸
云上攻防-云原生篇&Docker安全&系统内核&版本&CDK自动利用&容器逃逸
|
10天前
|
应用服务中间件 网络安全 nginx
docker 搭建 最新版本的 gitlab,使用HTTPS访问,以及gitlab的基础使用讲解
docker 搭建 最新版本的 gitlab,使用HTTPS访问,以及gitlab的基础使用讲解
|
27天前
|
分布式计算 Hadoop 大数据
分布式计算框架比较:Hadoop、Spark 与 Flink
【5月更文挑战第31天】Hadoop是大数据处理的开创性框架,专注于大规模批量数据处理,具有高扩展性和容错性。然而,它在实时任务上表现不足。以下是一个简单的Hadoop MapReduce的WordCount程序示例,展示如何统计文本中单词出现次数。
80 0
|
29天前
|
流计算 Docker 容器
在docker中玩flink时候记录一些组合命令
在docker中玩flink时候记录一些组合命令
19 0
|
29天前
|
Java 大数据 流计算
使用Docker快速部署Flink分布式集群
使用Docker快速部署Flink分布式集群
155 0
|
1月前
|
NoSQL Java 关系型数据库
【Redis系列笔记】分布式锁
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。 分布式锁的核心思想就是让大家都使用同一把锁,只要大家使用的是同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路
440 2
|
2天前
|
NoSQL Redis
redis分布式锁redisson
底层会尝试去加锁,如果加锁失败,会睡眠,自旋加锁,直到获取到锁为止。
9 1
|
13天前
|
NoSQL 算法 Java
探讨redis分布式锁
探讨redis分布式锁
18 1
|
20天前
|
缓存 NoSQL 安全
玩转Redis!非常强大的Redisson分布式集合,少写60%代码
Redisson是Java的Redis客户端,提供实时数据平台服务,简化了分布式环境下的数据管理。它包含RList、RSet、RMap等分布式集合,支持ConcurrentMap和Set接口,确保线程安全和数据一致性。例如,RMap实现了本地缓存和监听器功能,允许数据监听和本地加速读取。此外,还提供了RSet的排序和去重功能,以及RQueue和RBlockingQueue等队列实现,支持阻塞操作。通过Redisson,开发者能轻松处理分布式系统的数据同步和操作。
|
1月前
|
监控 NoSQL 算法
探秘Redis分布式锁:实战与注意事项
本文介绍了Redis分区容错中的分布式锁概念,包括利用Watch实现乐观锁和使用setnx防止库存超卖。乐观锁通过Watch命令监控键值变化,在事务中执行修改,若键值被改变则事务失败。Java代码示例展示了具体实现。setnx命令用于库存操作,确保无超卖,通过设置锁并检查库存来更新。文章还讨论了分布式锁存在的问题,如客户端阻塞、时钟漂移和单点故障,并提出了RedLock算法来提高可靠性。Redisson作为生产环境的分布式锁实现,提供了可重入锁、读写锁等高级功能。最后,文章对比了Redis、Zookeeper和etcd的分布式锁特性。
246 16
探秘Redis分布式锁:实战与注意事项