【Flink】Flink Java 统计词频 开发

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【1月更文挑战第26天】【Flink】Flink Java 统计词频 开发

在当今数据风暴的时代,实时数据处理已经成为众多企业关注的热点。Apache Flink作为一个高性能、可扩展的实时计算框架,在实时数据处理领域占据着举足轻重的地位。Flink底层是 以Java编写的,并为开发人员同时提供了完整的Java和Scala API。  

  1. 开发环境:

在开始之前,请确保你的开发环境已经安装了以下软件:

  • JDK 1.8或更高版本
  • Maven 3.x
  • Apache Flink 1.x

在准备好所有的开发环境之后,我们就可以开始开发自己的第一个Flink程序了。首先我 们要做的,就是在IDEA中搭建一个Flink项目的骨架。我们会使用Java项目中常见的Maven来进行依赖管理。

  1. Maven环境准备:

配置基础的Maven环境:

在项目的pom文件中,增加标签设置属性,然后增加标签引 入需要的依赖。我们需要添加的依赖最重要的就是 Flink 的相关组件,包括 flink-java、 flink-streaming-java,以及 flink-clients(客户端,也可以省略)。

<flink.version>1.13.0</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>

在属性中,我们定义了,这指代的是所依赖的Scala版本。这有一点 奇怪:Flink底层是Java,而且我们也只用Java API,为什么还会依赖Scala呢?这是因为Flink 的架构中使用了Akka来实现底层的分布式通信,而Akka是用Scala开发的。

搭好项目框架,接下来就是我们的核心工作——往里面填充代码。我们会用一个最简单的 示例来说明Flink代码怎样编写:统计一段文字中,每个单词出现的频次。这就是传说中的 WordCount程序——它是大数据领域非常经典的入门案例,地位等同于初学编程语言时的 Hello World。  

单词计数:使用批处理的方式统计词频:

对于批处理而言,输入的应该是收集好的数据集。这里我们可以将要统计的文字,写入一 个文本文档,然后读取这个文件处理数据就可以了

public class WorldCount {
    public static void main(String[] args) throws Exception {
        // 1.创建执行环境:
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //2.从文件中读取数据:
        DataSource<String> source = env.readTextFile("E:\\back_end\\demo\\src\\main\\resources\\data\\data_txt.txt");
        //3.将每个单词提取出来,进行分词,转换成为一个二元组:
        FlatMapOperator<String, Tuple2<String, Long>> wordAndTuple = source.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            // 将一行文本进行拆分,转换为二元组:
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
            //指定数据返回值类型:
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        //4.按照word进行分组:
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndTuple.groupBy(0);//给定元组索引位置
        //5.分组内进行聚合统计:
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);
        //6.打印输出:
        sum.print();
    }
}

单词计数:使用流处理的方式统计词频:

我们已经知道,用DataSet API可以很容易地实现批处理;与之对应,流处理当然可以用 DataStream API 来实现。对于Flink而言,流才是整个处理逻辑的底层核心,所以流批统一之 后的DataStream API 更加强大,可以直接处理批处理和流处理的所有场景。

对于流而言,我们会在获取输入数据后立即处理,这个过程是连续不断的。当然,有时我 们的输入数据可能会有尽头,这看起来似乎就成了一个有界流;但是它跟批处理是截然不同的 ——在输入结束之前,我们依然会认为数据是无穷无尽的,处理的模式也仍旧是连续逐个处理。  下面我们就针对不同类型的输入数据源,用具体的代码来实现流处理。

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        //1.创建一个流式的执行环境:
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.读取文件:
        DataStreamSource<String> source = executionEnvironment.readTextFile("src/main/resources/data/data_txt.txt");
        //3.转换计算:
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = source.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        //4.分组操作:
        KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);
        //5.求和:
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1);
        //6.输出:
        sum.print();
        //7.启动执行:
        executionEnvironment.execute();
    }
}

时时监听端口输入:

public class StreamWordCountTrue {
    public static void main(String[] args) throws Exception {
        //1.创建流式执行环境:
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        //从参数中提取主机名:
        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        String hostname = fromArgs.get("host");
        Integer port = fromArgs.getInt("port");
        //2.监听端口,时时获取数据:
        DataStreamSource<String> source = executionEnvironment.socketTextStream(hostname, port);
        //3.对获取到的文本流进行处理:
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = source.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        //4.分组操作:
        KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);
        //5.求和:
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1);
        //6.输出:
        sum.print();
        //7.启动执行:
        executionEnvironment.execute();
    }
}

IDEA配置输入参数:

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2天前
|
存储 搜索推荐 Java
探索安卓开发中的自定义视图:打造个性化UI组件Java中的异常处理:从基础到高级
【8月更文挑战第29天】在安卓应用的海洋中,一个独特的用户界面(UI)能让应用脱颖而出。自定义视图是实现这一目标的强大工具。本文将通过一个简单的自定义计数器视图示例,展示如何从零开始创建一个具有独特风格和功能的安卓UI组件,并讨论在此过程中涉及的设计原则、性能优化和兼容性问题。准备好让你的应用与众不同了吗?让我们开始吧!
|
9天前
|
IDE Java 开发工具
快速上手指南:如何用Spring Boot开启你的Java开发之旅?
【8月更文挑战第22天】Spring Boot由Pivotal团队开发,简化了Spring应用的创建过程。本文详述了从零开始搭建Spring Boot项目的步骤:首先确保安装了新版JDK、Maven/Gradle及IDE如IntelliJ IDEA或Eclipse;接着访问Spring Initializr网站(start.spring.io),选择所需依赖(如Web模块)并生成项目;最后,使用IDE打开生成的项目,添加`@SpringBootApplication`注解及main方法来启动应用。通过这些步骤,即便是新手也能快速上手,专注于业务逻辑的实现。
25 1
|
11天前
|
Java 持续交付 项目管理
Maven是一款基于Apache许可的项目管理和构建自动化工具,在Java开发中极为流行。
Maven是一款基于Apache许可的项目管理和构建自动化工具,在Java开发中极为流行。它采用项目对象模型(POM)来描述项目,简化构建流程。Maven提供依赖管理、标准构建生命周期、插件扩展等功能,支持多模块项目及版本控制。在Java Web开发中,Maven能够自动生成项目结构、管理依赖、自动化构建流程并运行多种插件任务,如代码质量检查和单元测试。遵循Maven的最佳实践,结合持续集成工具,可以显著提升开发效率和项目质量。
29 1
|
11天前
|
Java API 数据库
详细介绍如何使用Spring Boot简化Java Web开发过程。
Spring Boot简化Java Web开发,以轻量级、易用及高度可定制著称。通过预设模板和默认配置,开发者可迅速搭建Spring应用。本文通过创建RESTful API示例介绍其快速开发流程:从环境准备、代码编写到项目运行及集成数据库等技术,展现Spring Boot如何使Java Web开发变得更高效、简洁。
32 1
|
13天前
|
分布式计算 Java API
Java 8带来了流处理与函数式编程等新特性,极大提升了开发效率
Java 8带来了流处理与函数式编程等新特性,极大提升了开发效率。流处理采用声明式编程模型,通过filter、map等操作简化数据集处理,提高代码可读性。Lambda表达式支持轻量级函数定义,配合Predicate、Function等接口,使函数式编程无缝融入Java。此外,Optional类及新日期时间API等增强功能,让开发者能更优雅地处理潜在错误,编写出更健壮的应用程序。
20 1
|
1天前
|
IDE Java Linux
探索安卓开发:从基础到进阶的旅程Java中的异常处理:从基础到高级
【8月更文挑战第30天】在这个数字时代,移动应用已经成为我们日常生活中不可或缺的一部分。安卓系统由于其开放性和灵活性,成为了开发者的首选平台之一。本文将带领读者踏上一段从零开始的安卓开发之旅,通过深入浅出的方式介绍安卓开发的基础知识、核心概念以及进阶技巧。我们将一起构建一个简单的安卓应用,并探讨如何优化代码以提高性能和应用的用户体验。无论你是初学者还是有一定经验的开发者,这篇文章都将为你提供宝贵的知识和启发。
|
2天前
|
前端开发 安全 Java
在Java服务器端开发的浩瀚宇宙中,Servlet与JSP犹如两颗璀璨的明星,它们联袂登场,共同编织出动态网站的绚丽篇章。
在Java服务器端开发的浩瀚宇宙中,Servlet与JSP犹如两颗璀璨的明星,它们联袂登场,共同编织出动态网站的绚丽篇章。
5 0
|
7天前
|
Java 编译器 开发工具
JDK vs JRE:面试大揭秘,一文让你彻底解锁Java开发和运行的秘密!
【8月更文挑战第24天】JDK(Java Development Kit)与JRE(Java Runtime Environment)是Java环境中两个核心概念。JDK作为开发工具包,不仅包含JRE,还提供编译器等开发工具,支持Java程序的开发与编译;而JRE仅包含运行Java程序所需的组件如JVM和核心类库。一个简单的&quot;Hello, World!&quot;示例展示了两者用途:需借助JDK编译程序,再利用JRE或JDK中的运行环境执行。因此,开发者应基于实际需求选择安装JDK或JRE。
30 0
|
11天前
|
数据可视化 IDE Java
Java8的Stream流太难用了?看看JDFrame如何简化开发
【8月更文挑战第21天】在Java的世界里,Java 8引入的Stream API无疑是一场革命,它极大地提升了集合处理的表达能力和简洁性。然而,对于许多开发者而言,尤其是那些刚从旧版本Java迁移过来的开发者,Stream API的复杂性和抽象性可能会让人感到困惑和挫败。今天,我们就来探讨如何通过JDFrame这样的框架或工具,来简化Java 8 Stream的使用,提升开发效率。
22 0
|
12天前
|
SQL 缓存 测试技术
实时计算 Flink版产品使用问题之如何实现滚动窗口统计用户不重复的总数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
下一篇
云函数