在Docker跑通Flink分布式版本的WordCount

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 在Docker跑通Flink分布式版本的WordCount

前言

前文我们介绍了,使用Docker快速部署Flink分布式集群,,这一把我们研究一下怎么自己撸一个WordCount上去跑起来。

官网例子的问题

大家发现我的风格或多或少是因为引导大家怎么去入门到熟悉的过程,所以我希望传递给大家一些学习的办法。我是比较大家直接看官网源码的例子的,而且官网刻意给出来本身也就是这个作用,WordCount非常重要,这是一个麻雀虽小五脏俱全的例子,这个例子熟悉了,其实就是入门了。熟悉的第一步特征就是可以试着自己跑起来,第二步就是去改写了,前面我们已经运行过了,这一把我们试着去改一改。

是这样,官网的例子呢,默认是帮你跑批模式,里面是做了一个WordCountData静态类作为批的输入,然后就跑完就结束了。实时程序码,我们还是希望源源不断的输入来一个计算一个嘛,那才有感觉。第二点就是里面倒是有输入一个,计算一个的例子,但是这个例子是需要基于Socket通信的输入,就是其他额外的动作比较多,在容器中操作起来会麻烦一些,也是搞得很容易放弃的,比较难受。

所以我们基于这种程序做一些改造,我们希望程序自己去生成数据,源源不断产生就行,然后后面可以简单的帮我做计算,这样给我们一种丝滑体验。

flink工程构建

命令行构建maven工程

mvn --settings /Users/zhuxuemin/.m2/settings_aliyun.xml archetype:generate -DgroupId=com.blog.flink -DartifactId=docker-app-flink -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
• 1

导入idea,然后改吧改吧,测试代码什么的直接删除。注意仓库调整好,还是用阿里巴巴的就行

<mirror>
           <id>alimaven</id>
           <name>aliyun maven</name>
           <url>http://maven.aliyun.com/nexus/content/repositories/central/</url>
           <mirrorOf>central</mirrorOf>
</mirror>

我们来开始定义pom.xml,这里注意了,我们里面用啥版本,和我们容器上一致就行,低版本虽然也是可以兼容的,但是代码风格有差别的,直接干到最新就行,学习嘛。去哪里可以抄呢,还记得我们的命令行上面其实是带的。

那java版本是多少呢,自然是看文档啦,官网文档在这里

我们一看很清楚的

Java8目前是标注了过时了, 是建议搞到11

Java11呢,明显字那么多,就是当前支持的啦

Java17呢,实验性支持。

所以有的老铁说那我直接干到最新行不行,现在JDK都干到了21去了,太新太旧肯定都不合适的,官网都有的

使是所以我们的版本就定了基调了,

<properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <flink.version>1.18.1</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
  </properties>

那引入哪些包呢,这里其实去源码里面抄就行的,就是对应WordCount里面就有,不过记得把源码搞到对应的版本。

怎么操作呢,从github上面拉取源码之后一般是master分支,然后我们找到远程的大版本分支就行

git branch -a
 
 remotes/origin/HEAD -> origin/master
 remotes/origin/blink
 ...这里好多分支
 remotes/origin/release-1.16
 remotes/origin/release-1.17
 remotes/origin/release-1.18. --这个就是我们要的

找到1.18分支,然后

git checkout release-1.18

然后找到对应的文件,就可以开始抄了

这里是要啥补啥,不断测试的时候就删减

<dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope> <!--本地运行时注释,打包时打开-->
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-datagen</artifactId>
      <version>${flink.version}</version>
    </dependency>
  </dependencies>

WordCount编写

其实整个代码就是需要改造输入源,因为我不需要做输入来源的选择,所以对参数判断的逻辑就干掉了。

总共三个类,第一个是WordSource,我准备了一些句子,算是做数据准备

public class WordSource {
    static String[]  messages=new String[]{
            "How are you?",
            "Nice to meet you.",
            "How is it going?",
            "How's everything with you?",
            "Hi!Are you having fun",
            "Nice to see you again.",
            "How have you been?"
    };
    public static int length(){
        return messages.length;
    }
}

第二个就是分词器,就是把上面的每一句话打散成一个个单词,从源码里直接Copy:

public  final class Tokenizer
        implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");
        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<>(token, 1));
            }
        }
    }
}

第三个类是主类,首先我们改写了源码中的数据源,通过DataGeneratorSource自动生成的方式,随机从我们WordSource里面的消息获取句子,每次取一句,源头就构建好了

第二步就是计算,WordCount的核心逻辑了

后面就run起来,没啥逻辑了,改写之后的代码比较清爽,是不是感觉更加适合入门了。

public class WordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        //自定义发射数据源
        DataGeneratorSource<String> wordSource=new DataGeneratorSource<String>(new GeneratorFunction<Long, String>() {
            Random random=new Random();
            public String map(Long aLong) throws Exception {
                return WordSource.messages[random.nextInt(WordSource.length())];
            }
        },Long.MAX_VALUE, RateLimiterStrategy.perSecond(10), Types.STRING);
        //计算
        env.fromSource(wordSource, WatermarkStrategy.noWatermarks(),"wordMessageSource")
        .flatMap(new Tokenizer()).name("tokenizer")
                .keyBy((KeySelector<Tuple2<String, Integer>, String>) value -> value.f0)
                .sum(1).name("counter")
                .print().name("print-sink");
        env.execute();
    }
}

这种程序是本机就可以运行的,先确保本机的输出符合预期:

Docker里面的Flink集群跑起来

注意了,发布之前其实需要打包的,部署失败的时候其实是有日志提示的,按照下面这个方式去找到错误信息。

这种错误是不是感觉特亲切。

Caused by: java.lang.NoClassDefFoundError: org/apache/flink/connector/datagen/source/GeneratorFunction
  at java.lang.Class.getDeclaredMethods0(Native Method) ~[?:?]
  at java.lang.Class.privateGetDeclaredMethods(Unknown Source) ~[?:?]
  at java.lang.Class.getMethodsRecursive(Unknown Source) ~[?:?]
  at java.lang.Class.getMethod0(Unknown Source) ~[?:?]
  at java.lang.Class.getMethod(Unknown Source) ~[?:?]
  at org.apache.flink.client.program.PackagedProgram.hasMainMethod(PackagedProgram.java:307) ~[flink-dist-1.18.1.jar:1.18.1]
  ... 54 more

那如何打包呢,当然直接百度一下没问题的,当然我还是会建议直接去源码中去拿,源码中其实是一个程序里面包含了很多个示例程序,相当于支持了一个应用工程中多个打包程序的定制输出,这其实是直接在我们实际开发中也是碰到的。

这样子的话,我们有个参考,定义一份属于我们的打包文件,我把关键部分放出来,附上我的注释,这样子方便大家对着改

<plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <execution>
            <id>WordCount</id>
            <!-- 和 package 阶段绑定 -->
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <shadeTestJar>false</shadeTestJar>
          <!-- 最后jar包文件生成时候的名字 -->
          <finalName>WordCount</finalName> 
        <filters>
         <!-- 这里控制哪些依赖的jar包需要包含进来,注意:里面其实是写maven坐标 -->
          <filter>
            <artifact>org.apache.flink:flink-connector-datagen</artifact>
            <includes>
              <include>org/apache/flink/connector/datagen/source/**</include>
            </includes>
          </filter>
        </filters>
        </configuration>
      </plugin>

打包:运行

-DskipTests=true package

那个最丰满的,名字又是那么显眼的,一下子就认出来了。

接下来我们部署上去,首先还是从flink的UI上面部署,

上传之后就看到了一直运行的job了。

以命令行方式提交

这里先给一个小妙招,我们在Docker里面运行的时候需要每次找到对应的容器ID再执行代码比较麻烦,我们可以通过一些查询条件去获取到jobmanager对应的容器ID,再执行里面的flink命令,那么就变成下面这样子,比如说我要知道flink运行了哪些作业,执行下面的命令:

docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) flink list

可以看到我们刚才提交的命令

Waiting for response...
------------------ Running/Restarting Jobs -------------------
03.03.2024 07:50:15 : e356b3168e95ec2716c0830380f6c57b : Flink Streaming Job (RUNNING)
--------------------------------------------------------------

由于我们对jobmanager的容器id下面会重复使用,而且比较长,我们定义一个变量代替,这样就方便使用

FLINK_JOBMANAGER=$(docker ps --filter name=jobmanager --format={{.ID}})

可以输出这个结果:

echo $FLINK_JOBMANAGER
cd422928babc

接下来,我们先拆解动作,我们首先把宿主机器上面的jar包复制到我们jobmanager容器中:

docker cp /Users/zhuxuemin/docker-app-flink/target/WordCount.jar $FLINK_JOBMANAGER:/opt/WordCount.jar

可以验证查看

docker exec $FLINK_JOBMANAGER ls /opt
flink
java
WordCount.jar

下一步,我们可以在容器内部实施提交了,还比较丝滑

docker exec -it $FLINK_JOBMANAGER flink \
 run -c com.blog.flink.WordCount \
 /opt/WordCount.jar

其实这样就可以了。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
2月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
89 3
|
4月前
|
存储 缓存 监控
分布式链路监控系统问题之kywalking在后期维护过程中可能会遇到中间件版本升级的问题如何解决
分布式链路监控系统问题之kywalking在后期维护过程中可能会遇到中间件版本升级的问题如何解决
|
2月前
|
消息中间件 NoSQL Kafka
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
68 4
|
2月前
|
Kubernetes Cloud Native 流计算
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
92 3
|
4月前
|
分布式数据库 流计算 Docker
实时计算 Flink版操作报错合集之在Docker上启动JobManager(JM)时遇到报错,,该怎么处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
存储 Java 流计算
Flink 分布式快照,神秘机制背后究竟隐藏着怎样的惊人奥秘?快来一探究竟!
【8月更文挑战第26天】Flink是一款开源框架,支持有状态流处理与批处理任务。其核心功能之一为分布式快照,通过“检查点(Checkpoint)”机制确保系统能在故障发生时从最近的一致性状态恢复,实现可靠容错。Flink通过JobManager触发检查点,各节点暂停接收新数据并保存当前状态至稳定存储(如HDFS)。采用“异步屏障快照(Asynchronous Barrier Snapshotting)”技术,插入特殊标记“屏障(Barrier)”随数据流传播,在不影响整体流程的同时高效完成状态保存。例如可在Flink中设置每1000毫秒进行一次检查点并指定存储位置。
84 0
|
5月前
|
Java Scala 流计算
实时计算 Flink版产品使用问题之Docker镜像中的Java路径和容器内的Java路径不一致,是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
分布式计算 Hadoop 大数据
分布式计算框架比较:Hadoop、Spark 与 Flink
【5月更文挑战第31天】Hadoop是大数据处理的开创性框架,专注于大规模批量数据处理,具有高扩展性和容错性。然而,它在实时任务上表现不足。以下是一个简单的Hadoop MapReduce的WordCount程序示例,展示如何统计文本中单词出现次数。
222 0
|
7月前
|
流计算 Docker 容器
在docker中玩flink时候记录一些组合命令
在docker中玩flink时候记录一些组合命令
45 0
|
2月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
下一篇
DataWorks