Flink(一)【WordCount 快速入门】

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink(一)【WordCount 快速入门】

前言

       学完了 Hadoop、Spark,本想着先把 Kafka、Flume 这些工具先学完的,但想了想还是把核心的技术先学完最后再去把那些工具学学。

  最近心有点累哈哈哈,偷偷立个 flag,反正也没人看,明年的今天来这里还愿哈,愿望这种事情我是从来是不会说出来的,毕竟言以泄败,事以密成嘛。

那我隐晦低表达一下,摘录自《解忧杂货店》的一条句子:

       这是克朗对自己梦想的描述,其实他不是自不量力,而是假如放弃了这个梦想,他的生活就失去了光,他未来的几十年生活会枯燥无味,会活的没有一点激情。 

  就像一个曾经自己深爱过的姑娘一样,明明无法在一起,却还是始终记挂着,因为心里眼里只有她,所以别人在你眼中,都会黯然失色的,没有色彩的东西,又怎么能投入激情去爱呢?

   我的愿望有两个,在上面中有所体现,但我希望结果不要是遗憾,第一个愿望明年这会大概知道结果了,第二个愿望应该会晚一点,也许在2025年的春天,也许会更早一点...

API 环境搭建

添加依赖

pom.xml

<properties>
 <flink.version>1.13.0</flink.version>
 <java.version>1.8</java.version>
 <scala.binary.version>2.12</scala.binary.version>
 <slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<!-- 引入 Flink 相关依赖-->
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-java</artifactId>
 <version>${flink.version}</version>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-clients_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
</dependency>
<!-- 引入日志管理相关依赖-->
 <dependency>
 <groupId>org.slf4j</groupId>
 <artifactId>slf4j-api</artifactId>
 <version>${slf4j.version}</version>
 </dependency>
 <dependency>
 <groupId>org.slf4j</groupId>
 <artifactId>slf4j-log4j12</artifactId>
 <version>${slf4j.version}</version>
 </dependency>
 <dependency>
 <groupId>org.apache.logging.log4j</groupId>
 <artifactId>log4j-to-slf4j</artifactId>
 <version>2.14.0</version>
</dependency>
</dependencies>

log4j.properties

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

入门案例

0、数据准备

在 根目录下创建 words.txt

1. hello flink
2. hello java
3. hello spark
4. hello hadoop

1、批处理

批处理所用到的算子API 都继承自 DataSet,而新版的 Flink 已经做到了流批一体,这里只做演示,以后这类 API 应该是要被弃用了。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建一个执行批式数据处理环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2. 从文件中读取数据 String类型  批式数据处理环境得到的 DataSource 继承自 DataSet
        DataSource<String> lineDS = env.readTextFile("input/words.txt");
        // 3. 将每行数据转换成一个二元组类型
        // 输入类型: String 输出类型: Tuple2
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOne =
                // String lines: 输入数据行  Collector<Tuple2<String,Long>> out: 输出类型
                lineDS.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));  //使用 Java 泛型的时候, 由于泛型擦除的存在, 需要显示信息返回返回值类型
        // 4. 根据 word 分组
        UnsortedGrouping<Tuple2<String, Long>> wordGroup = wordAndOne.groupBy(0);   // 0 是索引位置
        // 5. 分组内进行聚合
        AggregateOperator<Tuple2<String, Long>> res = wordGroup.sum(1); // 1 也是索引位置
        // 6. 打印结果
        res.print();
    }
}

运行结果:

1. (hadoop,1)
2. (flink,1)
3. (hello,4)
4. (java,1)
5. (spark,1)
6. 
7. Process finished with exit code 0

因为现在已经是流批一体的框架了,所以提交 Flink 批处理任务需要用下面的语句:

$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

2、流处理

2.1、有界数据流处理

这里我们用离线数据(提前创建好的文件)用流处理API DataStream 的算子来做处理。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class BoundedStreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建一个流式的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        // 2. 流式数据处理环境得到的 DataSource 继承自 DataStream
        DataStreamSource<String> lineDS = env.readTextFile("input/words.txt");
        // 3. flatMap 打散数据 返回元组
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDS.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4. 根据 word 分组
        KeyedStream<Tuple2<String, Long>, String> wordGroupByKey = wordAndOne.keyBy(t -> t.f0);
        // 5. 根据键对索引为 1 处的值进行合并
        SingleOutputStreamOperator<Tuple2<String, Long>> res = wordGroupByKey.sum(1);
        // 6. 输出结果
        res.print();
        // 7. 执行
        env.execute();  // 这里我们的数据是有界的,但是真正开发环境是无界的,这里需要用execute方法等待新数据的到来
    }
}

运行结果:

1. 3> (java,1)
2. 13> (flink,1)
3. 1> (spark,1)
4. 5> (hello,1)
5. 5> (hello,2)
6. 5> (hello,3)
7. 5> (hello,4)
8. 15> (hadoop,1)

      我们可以发现,输出的单词的顺序是乱序的,因为集群模式下数据流不是在本地执行的,而是在多个节点中执行,所以也就无法保证先输入的单词最先输出。

   Idea下Flink API 会使用多线程来模拟集群下的多节点并行处理,而我们每行数据前面的 "编号>" 代表的就是线程的 id(对应 Flink 运行时占据的最小资源,也叫任务槽),默认使用当前电脑的所有 CPU 数。

 我们还可以发现,hello是同一个节点上处理的,这是因为我们在做分组的时候,把分组后的数据分到了同一个节点(子任务)上。

2.2、无界数据流处理

这里我们使用 netcat 来模拟产生数据流

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class UnBoundedStreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建一个流式的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        // 2. 流式数据处理环境得到的 DataSource 继承自 DataStream
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host");
        Integer port = parameterTool.getInt("port");
        DataStreamSource<String> lineDS = env.socketTextStream(host,port);
        // 3. flatMap 打散数据 返回元组
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDS.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4. 根据 word 分组
        KeyedStream<Tuple2<String, Long>, String> wordGroupByKey = wordAndOne.keyBy(t -> t.f0);
        // 5. 根据键对索引为 1 处的值进行合并
        SingleOutputStreamOperator<Tuple2<String, Long>> res = wordGroupByKey.sum(1);
        // 6. 输出结果
        res.print();
        // 7. 执行
        env.execute();  // 这里我们的数据是有界的,但是真正开发环境是无界的,这里需要用execute方法等待新数据的到来
    }
}

运行结果:

       可以看到,处理是相当快的,毕竟数据量很小,但是会想到 SparkStreaming 的处理过程,我们之前用 SparkStreaming 的时候还需要设置 Reciver 的接收间隔,而我们的 Flink 则是真正的实时处理。

总结

       Flink 的学习终于开始了,还是一样的要求,不照搬视频课件内容,每行代码要有自己的思考,每行博客也要是自己思考的总结。

还有,最近感觉愈发词穷,该多看书了,以后养成每次博客加一条书摘的习惯。





相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
存储 缓存 资源调度
想了解流计算,你必须得看一眼,实现Flink on Yarn的三种部署方式,并运行wordcount
想了解流计算,你必须得看一眼,实现Flink on Yarn的三种部署方式,并运行wordcount
650 0
想了解流计算,你必须得看一眼,实现Flink on Yarn的三种部署方式,并运行wordcount
|
5月前
|
消息中间件 资源调度 Kafka
2021年最新最全Flink系列教程_Flink快速入门(概述,安装部署)(一)(JianYi收藏)
2021年最新最全Flink系列教程_Flink快速入门(概述,安装部署)(一)(JianYi收藏)
60 0
|
分布式计算 资源调度 Hadoop
Flink快速入门--安装与示例运行
flink是一款开源的大数据流式处理框架,他可以同时批处理和流处理,具有容错性、高吞吐、低延迟等优势,本文简述flink在windows和linux中安装步骤,和示例程序的运行。
1900 0
Flink快速入门--安装与示例运行
|
SQL 消息中间件 Java
Flink 入门程序 WordCount 和 SQL 实现
我们右键运行时相当于在本地启动了一个单机版本。生产中都是集群环境,并且是高可用的,生产上提交任务需要用到flink run 命令,指定必要的参数。本课时我们主要介绍 Flink 的入门程序以及 SQL 形式的实现。 上一课时已经讲解了 Flink 的常用应用场景和架构模型设计,这一课时我们将会从一个最简单的 WordCount 案例作为切入点,并且同时使用 SQL 方式进行实现,为后面的实战课程打好基础。
308 0
Flink 入门程序 WordCount 和 SQL 实现
|
存储 弹性计算 网络安全
实时计算Flink —— 独享模式快速入门
已开通地域 目前共享模式仅在华东一(杭州),华东二(上海),华南一(深圳),华北二(北京)四个区域开通。如果对其他地域有开通需求,请联系阿里云技术支持。 准备工作 实时计算独享模式开通时,实时计算会为您在您的VPC内创建安全组以及申请弹性网卡。
2098 0
|
运维 数据库 流计算
实时计算Flink —— 快速入门 —— 步骤四:生产运维
启动业务 在数据运维页面,选中作业名称(如bj_dim_join),点击启动,流式作业即可启动。选择启动位点。 注意:实时计算作业启动时候需要您指定启动时间。实际上就是从源头数据存储的指定时间点开始读取数据。
1353 0
|
存储 关系型数据库 流计算
实时计算Flink > 快速入门 —— 步骤三:数据开发,作业上线
本页目录 新建作业 开发储存 用RDS为数据的维表或结果表 编写业务逻辑的SQL 作业调试StreamSQL 作业上线 说明: 实时计算 Flink按照标准的阿里云产品规格提供账号信息支持,即一个项目隶属于一个项目所有者,多人协作模型必须使用主子账号完成。
2257 0
|
存储 关系型数据库 数据库
实时计算Flink > 快速入门 —— 步骤二:注册上下游存储
本页目录 登录阿里云账号 注册上游存储 注册下游存储 注册上下游存储操作步骤如下。 登录阿里云账号 登录阿里云账号。 注册上游存储 登录DataHub 登录DataHub控制台。 创建DataHub源表 为简化问题,我们将源源不断的数据抽象简化为如下二维表。
1420 0
|
流计算
实时计算Flink > 快速入门 —— 步骤一:购买订单,绑定订单创建项目
本文档主要介绍如何购买订单,绑定订单,创建项目。 操作详情 请您参看如何购买。 注意: 在您开通实时计算服务时,主账号需要授予一个名称为AliyunStreamDefaultRole的系统默认角色给实时计算的服务账号。
1456 0
|
SQL 存储 监控
实时计算Flink——快速入门概述
本页目录 安全监控背景介绍 安全监控业务架构图 作业操作流程 文本提供了一个安全监控作业案例,来帮助您了解实时计算开发流式作业。 安全监控背景介绍 随着科技的进步,数据的安全越来越被世人所关注,安全实时监控报警尤为重要。
1594 0