【Flink】Flink Java 统计词频 开发

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【1月更文挑战第26天】【Flink】Flink Java 统计词频 开发

在当今数据风暴的时代,实时数据处理已经成为众多企业关注的热点。Apache Flink作为一个高性能、可扩展的实时计算框架,在实时数据处理领域占据着举足轻重的地位。Flink底层是 以Java编写的,并为开发人员同时提供了完整的Java和Scala API。  

  1. 开发环境:

在开始之前,请确保你的开发环境已经安装了以下软件:

  • JDK 1.8或更高版本
  • Maven 3.x
  • Apache Flink 1.x

在准备好所有的开发环境之后,我们就可以开始开发自己的第一个Flink程序了。首先我 们要做的,就是在IDEA中搭建一个Flink项目的骨架。我们会使用Java项目中常见的Maven来进行依赖管理。

  1. Maven环境准备:

配置基础的Maven环境:

在项目的pom文件中,增加标签设置属性,然后增加标签引 入需要的依赖。我们需要添加的依赖最重要的就是 Flink 的相关组件,包括 flink-java、 flink-streaming-java,以及 flink-clients(客户端,也可以省略)。

<flink.version>1.13.0</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>

在属性中,我们定义了,这指代的是所依赖的Scala版本。这有一点 奇怪:Flink底层是Java,而且我们也只用Java API,为什么还会依赖Scala呢?这是因为Flink 的架构中使用了Akka来实现底层的分布式通信,而Akka是用Scala开发的。

搭好项目框架,接下来就是我们的核心工作——往里面填充代码。我们会用一个最简单的 示例来说明Flink代码怎样编写:统计一段文字中,每个单词出现的频次。这就是传说中的 WordCount程序——它是大数据领域非常经典的入门案例,地位等同于初学编程语言时的 Hello World。  

单词计数:使用批处理的方式统计词频:

对于批处理而言,输入的应该是收集好的数据集。这里我们可以将要统计的文字,写入一 个文本文档,然后读取这个文件处理数据就可以了

public class WorldCount {
    public static void main(String[] args) throws Exception {
        // 1.创建执行环境:
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //2.从文件中读取数据:
        DataSource<String> source = env.readTextFile("E:\\back_end\\demo\\src\\main\\resources\\data\\data_txt.txt");
        //3.将每个单词提取出来,进行分词,转换成为一个二元组:
        FlatMapOperator<String, Tuple2<String, Long>> wordAndTuple = source.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            // 将一行文本进行拆分,转换为二元组:
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
            //指定数据返回值类型:
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        //4.按照word进行分组:
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndTuple.groupBy(0);//给定元组索引位置
        //5.分组内进行聚合统计:
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);
        //6.打印输出:
        sum.print();
    }
}

单词计数:使用流处理的方式统计词频:

我们已经知道,用DataSet API可以很容易地实现批处理;与之对应,流处理当然可以用 DataStream API 来实现。对于Flink而言,流才是整个处理逻辑的底层核心,所以流批统一之 后的DataStream API 更加强大,可以直接处理批处理和流处理的所有场景。

对于流而言,我们会在获取输入数据后立即处理,这个过程是连续不断的。当然,有时我 们的输入数据可能会有尽头,这看起来似乎就成了一个有界流;但是它跟批处理是截然不同的 ——在输入结束之前,我们依然会认为数据是无穷无尽的,处理的模式也仍旧是连续逐个处理。  下面我们就针对不同类型的输入数据源,用具体的代码来实现流处理。

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        //1.创建一个流式的执行环境:
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.读取文件:
        DataStreamSource<String> source = executionEnvironment.readTextFile("src/main/resources/data/data_txt.txt");
        //3.转换计算:
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = source.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        //4.分组操作:
        KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);
        //5.求和:
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1);
        //6.输出:
        sum.print();
        //7.启动执行:
        executionEnvironment.execute();
    }
}

时时监听端口输入:

public class StreamWordCountTrue {
    public static void main(String[] args) throws Exception {
        //1.创建流式执行环境:
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        //从参数中提取主机名:
        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        String hostname = fromArgs.get("host");
        Integer port = fromArgs.getInt("port");
        //2.监听端口,时时获取数据:
        DataStreamSource<String> source = executionEnvironment.socketTextStream(hostname, port);
        //3.对获取到的文本流进行处理:
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = source.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        //4.分组操作:
        KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);
        //5.求和:
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1);
        //6.输出:
        sum.print();
        //7.启动执行:
        executionEnvironment.execute();
    }
}

IDEA配置输入参数:

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
16天前
|
前端开发 JavaScript Java
基于Java+Springboot+Vue开发的服装商城管理系统
基于Java+Springboot+Vue开发的服装商城管理系统(前后端分离),这是一项为大学生课程设计作业而开发的项目。该系统旨在帮助大学生学习并掌握Java编程技能,同时锻炼他们的项目设计与开发能力。通过学习基于Java的服装商城管理系统项目,大学生可以在实践中学习和提升自己的能力,为以后的职业发展打下坚实基础。
50 2
基于Java+Springboot+Vue开发的服装商城管理系统
|
14天前
|
前端开发 JavaScript Java
基于Java+Springboot+Vue开发的大学竞赛报名管理系统
基于Java+Springboot+Vue开发的大学竞赛报名管理系统(前后端分离),这是一项为大学生课程设计作业而开发的项目。该系统旨在帮助大学生学习并掌握Java编程技能,同时锻炼他们的项目设计与开发能力。通过学习基于Java的大学竞赛报名管理系统项目,大学生可以在实践中学习和提升自己的能力,为以后的职业发展打下坚实基础。
32 3
基于Java+Springboot+Vue开发的大学竞赛报名管理系统
|
15天前
|
前端开发 JavaScript Java
基于Java+Springboot+Vue开发的蛋糕商城管理系统
基于Java+Springboot+Vue开发的蛋糕商城管理系统(前后端分离),这是一项为大学生课程设计作业而开发的项目。该系统旨在帮助大学生学习并掌握Java编程技能,同时锻炼他们的项目设计与开发能力。通过学习基于Java的蛋糕商城管理系统项目,大学生可以在实践中学习和提升自己的能力,为以后的职业发展打下坚实基础。
40 3
基于Java+Springboot+Vue开发的蛋糕商城管理系统
|
15天前
|
前端开发 JavaScript Java
基于Java+Springboot+Vue开发的美容预约管理系统
基于Java+Springboot+Vue开发的美容预约管理系统(前后端分离),这是一项为大学生课程设计作业而开发的项目。该系统旨在帮助大学生学习并掌握Java编程技能,同时锻炼他们的项目设计与开发能力。通过学习基于Java的美容预约管理系统项目,大学生可以在实践中学习和提升自己的能力,为以后的职业发展打下坚实基础。
25 3
基于Java+Springboot+Vue开发的美容预约管理系统
|
16天前
|
前端开发 JavaScript Java
基于Java+Springboot+Vue开发的房产销售管理系统
基于Java+Springboot+Vue开发的房产销售管理系统(前后端分离),这是一项为大学生课程设计作业而开发的项目。该系统旨在帮助大学生学习并掌握Java编程技能,同时锻炼他们的项目设计与开发能力。通过学习基于Java的房产销售管理系统项目,大学生可以在实践中学习和提升自己的能力,为以后的职业发展打下坚实基础。
34 3
基于Java+Springboot+Vue开发的房产销售管理系统
|
15天前
|
存储 网络协议 Java
Java NIO 开发
本文介绍了Java NIO(New IO)及其主要组件,包括Channel、Buffer和Selector,并对比了NIO与传统IO的优势。文章详细讲解了FileChannel、SocketChannel、ServerSocketChannel、DatagramChannel及Pipe.SinkChannel和Pipe.SourceChannel等Channel实现类,并提供了示例代码。通过这些示例,读者可以了解如何使用不同类型的通道进行数据读写操作。
Java NIO 开发
|
4天前
|
安全 Java API
Java 泛型在安卓开发中的应用
在Android开发中,Java泛型广泛应用于集合类、自定义泛型类/方法、数据绑定、适配器及网络请求等场景,有助于实现类型安全、代码复用和提高可读性。例如,结合`ArrayList`使用泛型可避免类型转换错误;自定义泛型类如`ApiResponse&lt;T&gt;`可处理不同类型API响应;RecyclerView适配器利用泛型支持多种视图数据;Retrofit结合泛型定义响应模型,明确数据类型。然而,需注意类型擦除导致的信息丢失问题。合理使用泛型能显著提升代码质量和应用健壮性。
|
17天前
|
人工智能 开发框架 Java
重磅发布!AI 驱动的 Java 开发框架:Spring AI Alibaba
随着生成式 AI 的快速发展,基于 AI 开发框架构建 AI 应用的诉求迅速增长,涌现出了包括 LangChain、LlamaIndex 等开发框架,但大部分框架只提供了 Python 语言的实现。但这些开发框架对于国内习惯了 Spring 开发范式的 Java 开发者而言,并非十分友好和丝滑。因此,我们基于 Spring AI 发布并快速演进 Spring AI Alibaba,通过提供一种方便的 API 抽象,帮助 Java 开发者简化 AI 应用的开发。同时,提供了完整的开源配套,包括可观测、网关、消息队列、配置中心等。
735 9
|
2天前
|
存储 分布式计算 Java
Stream很好,Map很酷,但答应我别用toMap():Java开发中的高效集合操作
在Java的世界里,Stream API和Map集合无疑是两大强大的工具,它们极大地简化了数据处理和集合操作的复杂度。然而,在享受这些便利的同时,我们也应当警惕一些潜在的陷阱,尤其是当Stream与Map结合使用时。本文将深入探讨Stream与Map的优雅用法,并特别指出在使用toMap()方法时需要注意的问题,旨在帮助大家在工作中更高效、更安全地使用这些技术。
9 0
|
2天前
|
JSON 安全 前端开发
第二次面试总结 - 宏汉科技 - Java后端开发
本文是作者对宏汉科技Java后端开发岗位的第二次面试总结,面试结果不理想,主要原因是Java基础知识掌握不牢固,文章详细列出了面试中被问到的技术问题及答案,包括字符串相关函数、抽象类与接口的区别、Java创建线程池的方式、回调函数、函数式接口、反射以及Java中的集合等。
10 0