【Flink】Flink Java 统计词频 开发

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【1月更文挑战第26天】【Flink】Flink Java 统计词频 开发

在当今数据风暴的时代,实时数据处理已经成为众多企业关注的热点。Apache Flink作为一个高性能、可扩展的实时计算框架,在实时数据处理领域占据着举足轻重的地位。Flink底层是 以Java编写的,并为开发人员同时提供了完整的Java和Scala API。  

  1. 开发环境:

在开始之前,请确保你的开发环境已经安装了以下软件:

  • JDK 1.8或更高版本
  • Maven 3.x
  • Apache Flink 1.x

在准备好所有的开发环境之后,我们就可以开始开发自己的第一个Flink程序了。首先我 们要做的,就是在IDEA中搭建一个Flink项目的骨架。我们会使用Java项目中常见的Maven来进行依赖管理。

  1. Maven环境准备:

配置基础的Maven环境:

在项目的pom文件中,增加标签设置属性,然后增加标签引 入需要的依赖。我们需要添加的依赖最重要的就是 Flink 的相关组件,包括 flink-java、 flink-streaming-java,以及 flink-clients(客户端,也可以省略)。

<flink.version>1.13.0</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>

在属性中,我们定义了,这指代的是所依赖的Scala版本。这有一点 奇怪:Flink底层是Java,而且我们也只用Java API,为什么还会依赖Scala呢?这是因为Flink 的架构中使用了Akka来实现底层的分布式通信,而Akka是用Scala开发的。

搭好项目框架,接下来就是我们的核心工作——往里面填充代码。我们会用一个最简单的 示例来说明Flink代码怎样编写:统计一段文字中,每个单词出现的频次。这就是传说中的 WordCount程序——它是大数据领域非常经典的入门案例,地位等同于初学编程语言时的 Hello World。  

单词计数:使用批处理的方式统计词频:

对于批处理而言,输入的应该是收集好的数据集。这里我们可以将要统计的文字,写入一 个文本文档,然后读取这个文件处理数据就可以了

public class WorldCount {
    public static void main(String[] args) throws Exception {
        // 1.创建执行环境:
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //2.从文件中读取数据:
        DataSource<String> source = env.readTextFile("E:\\back_end\\demo\\src\\main\\resources\\data\\data_txt.txt");
        //3.将每个单词提取出来,进行分词,转换成为一个二元组:
        FlatMapOperator<String, Tuple2<String, Long>> wordAndTuple = source.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            // 将一行文本进行拆分,转换为二元组:
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
            //指定数据返回值类型:
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        //4.按照word进行分组:
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndTuple.groupBy(0);//给定元组索引位置
        //5.分组内进行聚合统计:
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);
        //6.打印输出:
        sum.print();
    }
}

单词计数:使用流处理的方式统计词频:

我们已经知道,用DataSet API可以很容易地实现批处理;与之对应,流处理当然可以用 DataStream API 来实现。对于Flink而言,流才是整个处理逻辑的底层核心,所以流批统一之 后的DataStream API 更加强大,可以直接处理批处理和流处理的所有场景。

对于流而言,我们会在获取输入数据后立即处理,这个过程是连续不断的。当然,有时我 们的输入数据可能会有尽头,这看起来似乎就成了一个有界流;但是它跟批处理是截然不同的 ——在输入结束之前,我们依然会认为数据是无穷无尽的,处理的模式也仍旧是连续逐个处理。  下面我们就针对不同类型的输入数据源,用具体的代码来实现流处理。

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        //1.创建一个流式的执行环境:
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.读取文件:
        DataStreamSource<String> source = executionEnvironment.readTextFile("src/main/resources/data/data_txt.txt");
        //3.转换计算:
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = source.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        //4.分组操作:
        KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);
        //5.求和:
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1);
        //6.输出:
        sum.print();
        //7.启动执行:
        executionEnvironment.execute();
    }
}

时时监听端口输入:

public class StreamWordCountTrue {
    public static void main(String[] args) throws Exception {
        //1.创建流式执行环境:
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        //从参数中提取主机名:
        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        String hostname = fromArgs.get("host");
        Integer port = fromArgs.getInt("port");
        //2.监听端口,时时获取数据:
        DataStreamSource<String> source = executionEnvironment.socketTextStream(hostname, port);
        //3.对获取到的文本流进行处理:
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = source.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        //4.分组操作:
        KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);
        //5.求和:
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1);
        //6.输出:
        sum.print();
        //7.启动执行:
        executionEnvironment.execute();
    }
}

IDEA配置输入参数:

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
9天前
|
监控 JavaScript 前端开发
《理解 WebSocket:Java Web 开发的实时通信技术》
【4月更文挑战第4天】WebSocket是Java Web实时通信的关键技术,提供双向持久连接,实现低延迟、高效率的实时交互。适用于聊天应用、在线游戏、数据监控和即时通知。开发涉及服务器端实现、客户端连接及数据协议定义,注意安全、错误处理、性能和兼容性。随着实时应用需求增加,WebSocket在Java Web开发中的地位将更加重要。
|
10天前
|
Java 开发工具 流计算
flink最新master代码编译出现Java Runtime Environment 问题
在尝试编译Flink源码时遇到Java运行时环境致命错误:EXCEPTION_ACCESS_VIOLATION。问题出现在JVM.dll+0x88212。使用的是Java 11.0.28和Java HotSpot(TM) 64-Bit Server VM。系统为Windows客户端,没有生成核心dump文件。错误日志保存在hs_err_pid39364.log和replay_pid39364.log。要解决这个问题,建议检查JDK版本兼容性,更新JDK或参照错误报告文件提交Bug至http://bugreport.java.com/bugreport/crash.jsp。
|
3天前
|
运维 NoSQL 算法
Java开发-深入理解Redis Cluster的工作原理
综上所述,Redis Cluster通过数据分片、节点发现、主从复制、数据迁移、故障检测和客户端路由等机制,实现了一个分布式的、高可用的Redis解决方案。它允许数据分布在多个节点上,提供了自动故障转移和读写分离的功能,适用于需要大规模、高性能、高可用性的应用场景。
9 0
|
4天前
|
人工智能 小程序 Java
JAVA开发智慧学校系统源码+人脸电子班牌布局
智慧校园是通过利用物联网,大数据技术来改变师生和校园资源相互交互的方式,以便提高交互的明确性、灵活性和响应速度,从而实现智慧化服务和管理的校园模式。
|
10天前
|
XML JSON JavaScript
使用JSON和XML:数据交换格式在Java Web开发中的应用
【4月更文挑战第3天】本文比较了JSON和XML在Java Web开发中的应用。JSON是一种轻量级、易读的数据交换格式,适合快速解析和节省空间,常用于API和Web服务。XML则提供更强的灵活性和数据描述能力,适合复杂数据结构。Java有Jackson和Gson等库处理JSON,JAXB和DOM/SAX处理XML。选择格式需根据应用场景和需求。
|
10天前
|
前端开发 Java API
构建RESTful API:Java中的RESTful服务开发
【4月更文挑战第3天】本文介绍了在Java环境中构建RESTful API的重要性及方法。遵循REST原则,利用HTTP方法处理资源,实现CRUD操作。在Java中,常用框架如Spring MVC简化了RESTful服务开发,包括定义资源、设计表示层、实现CRUD、考虑安全性、文档和测试。通过Spring MVC示例展示了创建RESTful服务的步骤,强调了其在现代Web服务开发中的关键角色,有助于提升互操作性和用户体验。
构建RESTful API:Java中的RESTful服务开发
|
15天前
|
存储 安全 Java
【Java技术专题】「Guava开发指南」手把手教你如何进行使用Guava工具箱进行开发系统实战指南(不可变集合篇)
【Java技术专题】「Guava开发指南」手把手教你如何进行使用Guava工具箱进行开发系统实战指南(不可变集合篇)
24 1
|
15天前
|
Java API Apache
【Java技术专题】「Guava开发指南」手把手教你如何进行使用Guava工具箱进行开发系统实战指南(基础编程篇)
【Java技术专题】「Guava开发指南」手把手教你如何进行使用Guava工具箱进行开发系统实战指南(基础编程篇)
43 0
|
20天前
|
Java 关系型数据库 MySQL
Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
【2月更文挑战第33天】Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
32 2
|
2月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
476 5

热门文章

最新文章