【Flink】Flink Java 统计词频 开发

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 【1月更文挑战第26天】【Flink】Flink Java 统计词频 开发

在当今数据风暴的时代,实时数据处理已经成为众多企业关注的热点。Apache Flink作为一个高性能、可扩展的实时计算框架,在实时数据处理领域占据着举足轻重的地位。Flink底层是 以Java编写的,并为开发人员同时提供了完整的Java和Scala API。  

  1. 开发环境:

在开始之前,请确保你的开发环境已经安装了以下软件:

  • JDK 1.8或更高版本
  • Maven 3.x
  • Apache Flink 1.x

在准备好所有的开发环境之后,我们就可以开始开发自己的第一个Flink程序了。首先我 们要做的,就是在IDEA中搭建一个Flink项目的骨架。我们会使用Java项目中常见的Maven来进行依赖管理。

  1. Maven环境准备:

配置基础的Maven环境:

在项目的pom文件中,增加标签设置属性,然后增加标签引 入需要的依赖。我们需要添加的依赖最重要的就是 Flink 的相关组件,包括 flink-java、 flink-streaming-java,以及 flink-clients(客户端,也可以省略)。

<flink.version>1.13.0</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>

在属性中,我们定义了,这指代的是所依赖的Scala版本。这有一点 奇怪:Flink底层是Java,而且我们也只用Java API,为什么还会依赖Scala呢?这是因为Flink 的架构中使用了Akka来实现底层的分布式通信,而Akka是用Scala开发的。

搭好项目框架,接下来就是我们的核心工作——往里面填充代码。我们会用一个最简单的 示例来说明Flink代码怎样编写:统计一段文字中,每个单词出现的频次。这就是传说中的 WordCount程序——它是大数据领域非常经典的入门案例,地位等同于初学编程语言时的 Hello World。  

单词计数:使用批处理的方式统计词频:

对于批处理而言,输入的应该是收集好的数据集。这里我们可以将要统计的文字,写入一 个文本文档,然后读取这个文件处理数据就可以了

public class WorldCount {
    public static void main(String[] args) throws Exception {
        // 1.创建执行环境:
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //2.从文件中读取数据:
        DataSource<String> source = env.readTextFile("E:\\back_end\\demo\\src\\main\\resources\\data\\data_txt.txt");
        //3.将每个单词提取出来,进行分词,转换成为一个二元组:
        FlatMapOperator<String, Tuple2<String, Long>> wordAndTuple = source.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            // 将一行文本进行拆分,转换为二元组:
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
            //指定数据返回值类型:
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        //4.按照word进行分组:
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndTuple.groupBy(0);//给定元组索引位置
        //5.分组内进行聚合统计:
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);
        //6.打印输出:
        sum.print();
    }
}

单词计数:使用流处理的方式统计词频:

我们已经知道,用DataSet API可以很容易地实现批处理;与之对应,流处理当然可以用 DataStream API 来实现。对于Flink而言,流才是整个处理逻辑的底层核心,所以流批统一之 后的DataStream API 更加强大,可以直接处理批处理和流处理的所有场景。

对于流而言,我们会在获取输入数据后立即处理,这个过程是连续不断的。当然,有时我 们的输入数据可能会有尽头,这看起来似乎就成了一个有界流;但是它跟批处理是截然不同的 ——在输入结束之前,我们依然会认为数据是无穷无尽的,处理的模式也仍旧是连续逐个处理。  下面我们就针对不同类型的输入数据源,用具体的代码来实现流处理。

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        //1.创建一个流式的执行环境:
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.读取文件:
        DataStreamSource<String> source = executionEnvironment.readTextFile("src/main/resources/data/data_txt.txt");
        //3.转换计算:
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = source.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        //4.分组操作:
        KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);
        //5.求和:
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1);
        //6.输出:
        sum.print();
        //7.启动执行:
        executionEnvironment.execute();
    }
}

时时监听端口输入:

public class StreamWordCountTrue {
    public static void main(String[] args) throws Exception {
        //1.创建流式执行环境:
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        //从参数中提取主机名:
        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        String hostname = fromArgs.get("host");
        Integer port = fromArgs.getInt("port");
        //2.监听端口,时时获取数据:
        DataStreamSource<String> source = executionEnvironment.socketTextStream(hostname, port);
        //3.对获取到的文本流进行处理:
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = source.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        //4.分组操作:
        KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);
        //5.求和:
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1);
        //6.输出:
        sum.print();
        //7.启动执行:
        executionEnvironment.execute();
    }
}

IDEA配置输入参数:

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
1月前
|
消息中间件 人工智能 Java
抖音微信爆款小游戏大全:免费休闲/竞技/益智/PHP+Java全筏开源开发
本文基于2025年最新行业数据,深入解析抖音/微信爆款小游戏的开发逻辑,重点讲解PHP+Java双引擎架构实战,涵盖技术选型、架构设计、性能优化与开源生态,提供完整开源工具链,助力开发者从理论到落地打造高留存、高并发的小游戏产品。
|
1月前
|
存储 Java 关系型数据库
Java 项目实战基于面向对象思想的汽车租赁系统开发实例 汽车租赁系统 Java 面向对象项目实战
本文介绍基于Java面向对象编程的汽车租赁系统技术方案与应用实例,涵盖系统功能需求分析、类设计、数据库设计及具体代码实现,帮助开发者掌握Java在实际项目中的应用。
53 0
|
2月前
|
安全 Java 数据库
Java 项目实战病人挂号系统网站设计开发步骤及核心功能实现指南
本文介绍了基于Java的病人挂号系统网站的技术方案与应用实例,涵盖SSM与Spring Boot框架选型、数据库设计、功能模块划分及安全机制实现。系统支持患者在线注册、登录、挂号与预约,管理员可进行医院信息与排班管理。通过实际案例展示系统开发流程与核心代码实现,为Java Web医疗项目开发提供参考。
130 2
|
2月前
|
JavaScript 安全 前端开发
Java开发:最新技术驱动的病人挂号系统实操指南与全流程操作技巧汇总
本文介绍基于Spring Boot 3.x、Vue 3等最新技术构建现代化病人挂号系统,涵盖技术选型、核心功能实现与部署方案,助力开发者快速搭建高效、安全的医疗挂号平台。
166 3
|
2月前
|
移动开发 Cloud Native 安全
Java:跨平台之魂,企业级开发的磐石
Java:跨平台之魂,企业级开发的磐石
|
2月前
|
安全 Oracle Java
JAVA高级开发必备·卓伊凡详细JDK、JRE、JVM与Java生态深度解析-形象比喻系统理解-优雅草卓伊凡
JAVA高级开发必备·卓伊凡详细JDK、JRE、JVM与Java生态深度解析-形象比喻系统理解-优雅草卓伊凡
226 0
JAVA高级开发必备·卓伊凡详细JDK、JRE、JVM与Java生态深度解析-形象比喻系统理解-优雅草卓伊凡
|
3月前
|
并行计算 Java API
Java List 集合结合 Java 17 新特性与现代开发实践的深度解析及实战指南 Java List 集合
本文深入解析Java 17中List集合的现代用法,结合函数式编程、Stream API、密封类、模式匹配等新特性,通过实操案例讲解数据处理、并行计算、响应式编程等场景下的高级应用,帮助开发者提升集合操作效率与代码质量。
164 1
|
3月前
|
安全 Java API
Java 17 及以上版本核心特性在现代开发实践中的深度应用与高效实践方法 Java 开发实践
本项目以“学生成绩管理系统”为例,深入实践Java 17+核心特性与现代开发技术。采用Spring Boot 3.1、WebFlux、R2DBC等构建响应式应用,结合Record类、模式匹配、Stream优化等新特性提升代码质量。涵盖容器化部署(Docker)、自动化测试、性能优化及安全加固,全面展示Java最新技术在实际项目中的应用,助力开发者掌握现代化Java开发方法。
147 1
|
3月前
|
IDE Java API
Java 17 新特性与微服务开发的实操指南
本内容涵盖Java 11至Java 17最新特性实战,包括var关键字、字符串增强、模块化系统、Stream API、异步编程、密封类等,并提供图书管理系统实战项目,帮助开发者掌握现代Java开发技巧与工具。
202 1