1. Environment Preparation
1. Download from: https://flink.apache.org
The Hadoop integration package listed below the release on the download page must be placed into Flink's lib directory.
2. Upload the archive to the Linux server and extract it to the target directory
tar -zxvf flink-1.9.1-… -C apps/
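Assuming the Scala 2.11 binary release was downloaded (the exact filename is an assumption), the full command would look like:
tar -zxvf flink-1.9.1-bin-scala_2.11.tgz -C apps/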
2. Standalone Deployment
2.1 Edit conf/flink-conf.yaml
# 1. Hostname of the master (JobManager) node
jobmanager.rpc.address: hadoop01
# 2. Number of task slots per TaskManager
taskmanager.numberOfTaskSlots: 2
# 3. For a plain standalone setup, the ZooKeeper addresses do not need to be configured yet
2.2 Edit conf/slaves
# Worker nodes
hadoop02
hadoop03
2.3 Copy the installation to the other nodes
scp -r flink-1.9.1/ hadoop02:$PWD
scp -r flink-1.9.1/ hadoop03:$PWD
2.4 Start the cluster
bin/start-cluster.sh
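As a quick sanity check (assuming a standard Flink 1.9 standalone setup), running jps should show a StandaloneSessionClusterEntrypoint process on the master node and a TaskManagerRunner process on each worker.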
2.5 Verify access
http://hadoop01:8081 (Flink web UI)
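The web UI is backed by Flink's REST API, so the cluster can also be checked from the shell; a minimal example, assuming the default REST port 8081:
curl http://hadoop01:8081/overview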
2.6 Submitting a job jar through the web UI
Open the socket port in advance, before submitting the job in the web UI.
2.7 Submitting a job jar from the command line
# --hostname hadoop01 --port 8888 are arguments passed to the program
bin/flink run -m hadoop01:8081 -p 4 -c com.wang.Main /path/to/jar --hostname hadoop01 --port 8888
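Here -m is the JobManager address, -p the job parallelism, and -c the fully qualified main class; everything after the jar path is passed to the program's main() method as arguments.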
3. Project Integration [Maven 3.x + JDK 8 / Scala 2.11]
3.1 pom.xml dependencies
<dependencies>
    <!-- Jars required for Java programs -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.9.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.9.1</version>
        <scope>provided</scope>
    </dependency>
    <!-- Jars required for Scala programs -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.9.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.9.1</version>
        <scope>provided</scope>
    </dependency>
    <!-- Logging dependencies -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- Java compiler plugin -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <!-- Scala compiler plugin -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.1.6</version>
            <configuration>
                <scalaCompatVersion>2.11</scalaCompatVersion>
                <scalaVersion>2.11.12</scalaVersion>
                <encoding>UTF-8</encoding>
            </configuration>
            <executions>
                <execution>
                    <id>compile-scala</id>
                    <phase>compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>test-compile-scala</id>
                    <phase>test-compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- Jar packaging plugin (bundles all dependencies into a fat jar) -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.6</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <!-- Optionally set the jar's entry class -->
                        <mainClass>com.wang.flink.SocketWordCount</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
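The Flink dependencies use provided scope because the cluster's lib directory supplies them at runtime, which keeps the fat jar small; when running a job directly in the IDE, they may need to be switched to compile scope (or the IDE configured to include provided dependencies on the classpath).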
3.2 Real-Time Computation [DataStream]
1. Use socket communication to implement real-time word counting.
2. Open the socket port
nc -lk 8888
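Each line typed into the nc session is sent to the running job as one record, for example:
hello flink
hello world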
3. Java program
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamingWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Create the Source dataset
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8888);
        // 3. Transformations: split each line into words
        DataStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    // Emit each word to the collector
                    out.collect(word);
                }
            }
        });
        // 4. Pair each word with a count of 1
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });
        // 5. Group by word and sum the counts: word -> count
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumed = wordAndOne.keyBy(0).sum(1);
        // 6. Sink: here we only print to the console
        sumed.print();
        // 7. Start the job
        env.execute("StreamingWordCount");
    }
}
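With input like the above, the console output looks roughly as follows; the N> prefix is the index of the printing subtask and will vary (the values shown here are illustrative only):
2> (hello,1)
4> (flink,1)
2> (hello,2)
3> (world,1)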
4. Test results
5. Package and run on the cluster
Change the socket host and port in the program to command-line arguments, for example:
DataStream<String> lines = env.socketTextStream(args[0], Integer.parseInt(args[1]));
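The --hostname/--port style used in the submit command in 2.7 suggests named arguments instead of positional ones; a minimal sketch that parses them with Flink's built-in ParameterTool (the default values here are assumptions):

import org.apache.flink.api.java.utils.ParameterTool;

// Parse named arguments such as --hostname hadoop01 --port 8888
ParameterTool params = ParameterTool.fromArgs(args);
String hostname = params.get("hostname", "localhost"); // assumed default
int port = params.getInt("port", 8888);                // assumed default
DataStream<String> lines = env.socketTextStream(hostname, port);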
1) Submit via the web UI, opening the socket port in advance.
The test results are as follows:
6. Optimizing with Java 8 lambda expressions
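Note that unlike anonymous inner classes, lambdas do not retain their generic type information at runtime, so Flink cannot infer the Tuple2<String, Integer> output type on its own; the returns(...) hint in the snippet below supplies it.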
// Lambdas lose their generic types to erasure, so give Flink an explicit type hint
// (Types is org.apache.flink.api.common.typeinfo.Types)
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines
        .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
            Arrays.stream(line.split(" ")).forEach(w -> out.collect(Tuple2.of(w, 1)));
        })
        .returns(Types.TUPLE(Types.STRING, Types.INT));
3.3 Batch Computation [DataSet]
1. Prepare a file and put some data in it, for example:
flink flink spark hadoop
flink vue java
hdfs spark
2. Java program
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the input file
        DataSource<String> lines = env.readTextFile(args[0]);
        // 3. Split each line and flatten it into (word, 1) pairs
        FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        // 4. Batch aggregation uses groupBy(), not keyBy() as in streaming
        AggregateOperator<Tuple2<String, Integer>> sumed = wordAndOne.groupBy(0).sum(1);
        // 5. Save the result; the parallelism determines how many output files are written
        sumed.writeAsText(args[1]).setParallelism(2);
        // 6. Execute
        env.execute("BatchWordCount");
    }
}
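A submit command for the batch job might look like the following (the class name and paths are assumptions; note that writeAsText fails by default if the output path already exists):
bin/flink run -m hadoop01:8081 -c com.wang.flink.BatchWordCount /path/to/jar /input/words.txt /output/result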