1.环境介绍
本次用到的环境有:
**JDK1.8
Flink 1.13.0
Oracle Linux7.4
**
2.Scala交互统计:
1.启动flink进入 scala 交互模式。
start-scala-shell.sh local
2.输入脚本,求词频。
val wordcount=benv.readTextFile(“file:///root/experiment/datas/wordcount.txt”).flatMap(_.split("\t")).map((_,1)).groupBy(0).sum(1)
wordcount.print
3.JAVA工程统计
打开IDEA集成开发工具:
选择 Create New Project菜单项,创建一个新的项目工程,在新的项目工程中选择窗口左侧的Maven菜单项,如图:
点击Next按钮,如下图:
输入框GroupId中填写experiment, 输入框ArtifactId中填写project,如下图:
点击Next按钮,如下图:
点击Finish按钮,如下图:
点击提示窗口中的close按钮,如下图:
在右下角弹出的对话框中,选择Enable Auto-Import(如未弹出该对话框请忽略此步骤)
更改pom.xml文件,如下图:
<properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <encoding>UTF-8</encoding> <scala.binary.version>2.11</scala.binary.version> <flink.version>1.13.0</flink.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> </dependencies>
鼠标点击项目中Java文件夹,单击右键选择New,子菜单中选择Package,如下图所示:
弹出对话框,如下图:
窗口输入框中填写创建的包名flink,如下图:
点击按钮OK,如下图:
鼠标点击包flink,单击右键选择New,子菜单中选择java Class,如下图所示:
- 弹出窗口,如下图:
弹出窗口中填写类名,如下图:
创建的WordCountJava类中,输入代码:
package flink; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; public class WordCountJava { public static void main(String[] args) throws Exception { // 创建Flink运行的上下文环境 final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 创建DataSet,这里我们的输入是一行一行的文本 DataSet<String> text = env.fromElements( “Flink Spark Storm”, “Flink Flink Flink”, “Spark Spark Spark”, “Storm Storm Storm” ); // 通过Flink内置的转换函数进行计算 DataSet<Tuple2<String, Integer>> counts = text.flatMap(new LineSplitter()) .groupBy(0) .sum(1); //结果打印 counts.printToErr(); } public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { // 将文本分割 String[] tokens = value.toLowerCase().split("\\W+"); // 输出内容到控制台 for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } } } } }
代码中点击鼠标右键选择Run运行程序
执行结果如下图
至此,Flink基础实操-计算单词出现次数