Flink 1.12 official documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/
(1) Setting up the development environment in IDEA
Base environment:
IntelliJ IDEA 2021
apache-maven-3.5.4
jdk1.8.0_271
scala-2.11.12
Configure the pom.xml file:
https://flink.apache.org/zh/downloads.html#section-9
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.12.1</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>

    <!-- Java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
(2) WordCount program in Java
package com.aikfk.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Streaming WordCount: reads lines from a socket and counts words in
 * 5-second tumbling processing-time windows.
 *
 * @author caizhengjie
 * @date 2021/3/5 3:07 PM
 */
public class WordCountJava {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read lines from the socket, split them into (word, 1) pairs,
        // then sum the counts per word inside each 5-second window.
        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("bigdata-pro-m07", 9999)
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            // Emit (word, 1) for every space-separated word in the line.
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
Run result (the N> prefix on each line is the index of the parallel subtask that printed the record):
3> (java,2)
2> (hive,2)
3> (java,1)
2> (hive,1)
1> (spark,2)
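Because the 5-second processing-time window only fires while the job keeps receiving socket data, it can be handy to sanity-check the splitting and counting logic inside the IDE without nc or a cluster. The sketch below is a minimal local variant (the class name WordCountLocalTest and the sample sentences are made up for illustration): it reuses the Splitter from WordCountJava on a bounded in-memory source and drops the window, so it simply prints a running count per word.

package com.aikfk.flink;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Hypothetical local smoke test: replaces the socket source with a fixed
 * collection so the pipeline can be run directly in the IDE.
 */
public class WordCountLocalTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Bounded source instead of socketTextStream; no window, so sum(1)
        // emits an updated running count every time a word appears.
        DataStream<Tuple2<String, Integer>> counts = env
                .fromElements("java hive spark", "java hive", "spark")
                .flatMap(new WordCountJava.Splitter())   // reuse the Splitter defined above
                .keyBy(value -> value.f0)
                .sum(1);

        counts.print();

        env.execute("WordCount Local Test");
    }
}

Running this in the IDE should print incremental counts such as (java,1) followed by (java,2), which is enough to confirm the splitting and keying logic before packaging the windowed job.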
(3) WordCount program in Scala
package com.aikfk.flink

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object WindowWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.socketTextStream("bigdata-pro-m07", 9999)

    // Split each line into lowercase words and count per word
    // within a 5-second tumbling processing-time window.
    val counts = text
      .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .sum(1)

    counts.print()

    env.execute("Window Stream WordCount")
  }
}
Run result:
3> (java,2)
2> (hive,2)
3> (java,1)
2> (hive,1)
1> (spark,2)
(4) Packaging the program and deploying it for a test run
When building the jar in IDEA, pay special attention here: do not keep the default location for MANIFEST.MF; change its path to the src directory.
Start the cluster:
bin/start-cluster.sh
Run the jar:
bin/flink run -c com.aikfk.flink.WordCountJava /opt/jars/WordCountJava.jar
Job has been submitted with JobID eff5617c683e2ea65bd498b4c4d062a8
Start the nc service (the socket on port 9999 that the job reads from):
nc -lk 9999
Check the result in the TaskManager's .out file (under the Flink log directory):
cat flink-root-taskexecutor-0-bigdata-pro-m07.out
(def,1)
(fdfr,1)
(sdijo,1)
(re,1)
(sedwvf,1)
(f,1)
(wedfwb,1)
Check the web UI (the Flink dashboard, by default on port 8081 of the JobManager host):