Flink基础实操-计算单词出现次数

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink基础实操-计算单词出现次数


1.环境介绍


本次用到的环境有:

**JDK1.8

Flink 1.13.0

Oracle Linux7.4

**


2.Scala交互统计:


1.启动flink进入 scala 交互模式。

start-scala-shell.sh local


2.输入脚本,求词频。


val wordcount=benv.readTextFile(“file:///root/experiment/datas/wordcount.txt”).flatMap(_.split("\t")).map((_,1)).groupBy(0).sum(1)

wordcount.print


3.JAVA工程统计


打开IDEA集成开发工具:

选择 Create New Project菜单项,创建一个新的项目工程,在新的项目工程中选择窗口左侧的Maven菜单项,如图:

点击Next按钮,如下图:

输入框GroupId中填写experiment, 输入框ArtifactId中填写project,如下图:

点击Next按钮,如下图:

点击Finish按钮,如下图:

点击提示窗口中的close按钮,如下图:


在右下角弹出的对话框中,选择Enable Auto-Import(如未弹出该对话框请忽略此步骤)

更改pom.xml文件,如下图:


<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.binary.version>2.11</scala.binary.version>
<flink.version>1.13.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>

鼠标点击项目中Java文件夹,单击右键选择New,子菜单中选择Package,如下图所示:


弹出对话框,如下图:

窗口输入框中填写创建的包名flink,如下图:

点击按钮OK,如下图:



鼠标点击包flink,单击右键选择New,子菜单中选择java Class,如下图所示:

  1. 弹出窗口,如下图:

弹出窗口中填写类名,如下图:


创建的WordCountJava类中,输入代码:

package flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class WordCountJava {
public static void main(String[] args) throws Exception {
// 创建Flink运行的上下文环境
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 创建DataSet,这里我们的输入是一行一行的文本
DataSet<String> text = env.fromElements(
“Flink Spark Storm”,
“Flink Flink Flink”,
“Spark Spark Spark”,
“Storm Storm Storm”
);
// 通过Flink内置的转换函数进行计算
DataSet<Tuple2<String, Integer>> counts =
text.flatMap(new LineSplitter())
.groupBy(0)
.sum(1);
//结果打印
counts.printToErr();
}
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// 将文本分割
String[] tokens = value.toLowerCase().split("\\W+");
// 输出内容到控制台
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}
}

代码中点击鼠标右键选择Run运行程序

执行结果如下图

至此,Flink基础实操-计算单词出现次数

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
68 1
|
6月前
|
SQL 网络安全 API
实时计算 Flink版产品使用问题之使用ProcessTime进行窗口计算,并且有4台机器的时间提前了2个小时,会导致什么情况
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在进行DWS层的实时聚合计算时,遇到多次更新同一个字段的情况,该如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
8月前
|
流计算
Flink CDC里假设我做widow计算使用ProcessTime计算
【1月更文挑战第23天】【1月更文挑战第113篇】Flink CDC里假设我做widow计算使用ProcessTime计算
231 45
|
消息中间件 分布式计算 Kafka
将Apache Flink任务实时消费Kafka窗口的计算改为MaxCompute
将Apache Flink任务实时消费Kafka窗口的计算改为MaxCompute
149 6
|
消息中间件 并行计算 Java
10分钟了解Flink窗口计算
在有状态流处理中,时间在计算中起着重要的作用。比如,当进行时间序列分析、基于特定时间段进行聚合,或者进行事件时间去处理数据时,都与时间相关。接下来将重点介绍在使用实时Flink应用程序时应该考虑的跟时间相关的一些元素。
10分钟了解Flink窗口计算
|
存储 SQL 消息中间件
Apache Flink 不止于计算,数仓架构或兴起新一轮变革
在大数据实时化转型大趋势之下,Flink 不只能做一件事情。
Apache Flink 不止于计算,数仓架构或兴起新一轮变革
|
存储 消息中间件 关系型数据库
三、【计算】Exactly Once 语义在Flink中的实现(下) | 青训营笔记
三、【计算】Exactly Once 语义在Flink中的实现(下) | 青训营笔记
三、【计算】Exactly Once 语义在Flink中的实现(下) | 青训营笔记
|
SQL 运维 OLAP
二、【计算】流|批|OLAP一体 的Flink引擎(下) | 青训营笔记
二、【计算】流|批|OLAP一体 的Flink引擎(下) | 青训营笔记
二、【计算】流|批|OLAP一体 的Flink引擎(下) | 青训营笔记