Preface
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink is designed to run in all common cluster environments and to perform computations at in-memory speed and at any scale.
Advantages of Flink:
- Unified batch and stream processing.
- High throughput, low latency, and high performance at the same time.
- Support for event time (Event Time).
- Support for stateful computation.
- Support for highly flexible window (Window) operations (see the sketch after this list).
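The last three points can be made a bit more concrete. The sketch below is not part of the original WordCount walkthrough; it assumes Flink 1.11 or later (where WatermarkStrategy is available), and the class name and sample elements are made up for illustration. It assigns event-time timestamps and watermarks to a stream of (word, timestamp) pairs and counts words per key in 10-second tumbling event-time windows, touching event time, stateful computation, and windows at once.

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventTimeWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // hypothetical input: (word, event timestamp in milliseconds)
        DataStream<Tuple2<String, Long>> events = env.fromElements(
                Tuple2.of("hello", 1_000L), Tuple2.of("flink", 2_000L), Tuple2.of("hello", 12_000L));
        events
                // tell Flink where the event time lives and how much out-of-orderness to tolerate
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((e, ts) -> e.f1))
                .map(e -> Tuple2.of(e.f0, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(t -> t.f0)
                // 10-second tumbling windows driven by event time, not by arrival time
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .sum(1)
                .print();
        env.execute();
    }
}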
1. WordCount example
1.1 Java version (batch)
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> ds = env.readTextFile("./data/data.txt");
        // split each line into words
        FlatMapOperator<String, String> words = ds.flatMap((String lines, Collector<String> collection) -> {
            String[] arr = lines.split(" ");
            for (String s : arr) {
                collection.collect(s);
            }
        }).returns(Types.STRING);
        // count: map each word to (word, 1), group by the word, sum the counts
        MapOperator<String, Tuple2<String, Long>> wordsDS =
                words.map(word -> new Tuple2<>(word, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));
        wordsDS.groupBy(0).sum(1).print();
    }
}
Output:
(flink1,1)
(flink,1)
(hello,3)
(spark,1)
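A note on the .returns(...) calls above: Java erases the generic types of lambdas, so Flink cannot infer that the flatMap emits String values (or that the map emits Tuple2<String, Long>), and the output type has to be supplied explicitly. As a rough equivalent sketch, the same flatMap can be written as an anonymous FlatMapFunction (org.apache.flink.api.common.functions.FlatMapFunction), whose class signature preserves the generic information, so no type hint is needed:

// equivalent to the lambda version above; Flink can read <String, String> from the
// anonymous class signature, so no explicit .returns(Types.STRING) is required
FlatMapOperator<String, String> words = ds.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String line, Collector<String> out) {
        for (String s : line.split(" ")) {
            out.collect(s);
        }
    }
});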
1.2 Java streaming version
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. read the file
        DataStreamSource<String> ds = env.readTextFile("./data/data.txt");
        // 3. split each line into (word, 1) pairs
        SingleOutputStreamOperator<Tuple2<String, Long>> wordDs =
                ds.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
                    String[] words = line.split(" ");
                    for (String word : words) {
                        collector.collect(Tuple2.of(word, 1L));
                    }
                }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4. key by the word, sum the running counts, and print
        wordDs.keyBy(tp -> tp.f0).sum(1).print();
        // 5. trigger execution of the streaming job
        env.execute();
    }
}
Output (the number before ">" is the index of the parallel subtask that printed the record; because keyBy and sum keep running state, the count for a word is emitted again each time a new record for that word arrives):
1> (spark,1)
5> (hello,1)
13> (flink,1)
9> (flink1,1)
5> (hello,2)
5> (hello,3)
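Note that readTextFile still produces a bounded stream, so the job above finishes once the file has been processed. To watch the counts update on a genuinely unbounded input, a common variant, sketched here as an assumption rather than something from the original article, swaps the file source for a socket source (first run nc -lk 9999 locally and type lines of words into it):

// replaces the readTextFile source inside the same main method
DataStreamSource<String> socketDs = env.socketTextStream("localhost", 9999);
socketDs.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                collector.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG))
        .keyBy(tp -> tp.f0)
        .sum(1)
        .print();
env.execute();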
1.3 Scala version (batch)
// create the batch execution environment
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
// import the implicit conversions needed by the Scala API
import org.apache.flink.api.scala._
// load the file
val linesDS: DataSet[String] = env.readTextFile("./data/data.txt")
// split, count, and print
linesDS.flatMap(line => {
  line.split(" ")
}).map((_, 1)).groupBy(0).sum(1).print()
1.4 Scala streaming version
// create the streaming execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// import the implicit conversions needed by the Scala API
import org.apache.flink.streaming.api.scala._
// read the file
val ds: DataStream[String] = env.readTextFile("./data/data.txt")
// split, key by word, sum, and print
ds.flatMap(line => line.split(" "))
  .map((_, 1))
  .keyBy(_._1)
  .sum(1)
  .print()
// trigger execution of the streaming job
env.execute()
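Finally, tying back to the unified batch and stream processing point from the preface: since Flink 1.12 the DataSet API used in the batch examples is being phased out in favor of running the DataStream API in batch execution mode, so a single program can cover both cases. A minimal sketch of the switch (Java, everything else as in the streaming examples above):

// requires org.apache.flink.api.common.RuntimeExecutionMode
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// run this bounded (file) job with batch semantics; keep the default STREAMING mode for unbounded sources
env.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);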