01 Introduction
In the previous posts we have already set up a basic Flink environment; interested readers can refer back to those posts. With the environment in place, this post moves on to Flink's getting-started examples.
02 Preparation
Before writing the first example, two concepts need to be introduced: the API levels and the programming model.
2.1 API
As covered in 《Flink教程(02)- Flink入门》, Flink is built from the following component stack:
- Physical deployment layer
- Runtime core layer
- APIs & Libraries
- Extension libraries
Flink offers APIs at several levels of abstraction: the higher the level, the more convenient it is to use; the lower the level, the more fine-grained control it exposes and the harder it is to work with.
Note: since Flink 1.12 supports unified stream/batch processing, the DataSet API is no longer recommended. We will therefore prefer the DataStream API, which handles both unbounded data (stream processing) and bounded data (batch processing).
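As a minimal sketch of that unification (assuming the Flink 1.12 DataStream API; the class name here is just a placeholder), the same program can be switched between stream and batch execution purely through the runtime mode:

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RuntimeModeDemo {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // STREAMING for unbounded input, BATCH for bounded input,
    // AUTOMATIC lets Flink decide based on the boundedness of the sources.
    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
    // ... define source/transformation/sink here, then call env.execute();
  }
}
```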
References:
- https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/batch/
- https://developer.aliyun.com/article/780123?spm=a2c6h.12873581.0.0.1e3e46ccbYFFrC
- https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html
2.2 Programming Model
As described in the DataStream API documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html), a Flink application is structured around three main parts: Source, Transformation, and Sink.
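A minimal skeleton of that three-part structure (the class name and sample data below are placeholders, not from the original post) might look like this:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AppSkeleton {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Source: where the data comes from
    DataStream<String> source = env.fromElements("hello flink", "hello world");

    // Transformation: how the data is processed
    DataStream<String> upper = source
        .map(value -> value.toUpperCase())
        .returns(Types.STRING);

    // Sink: where the result goes
    upper.print();

    // Nothing runs until execute() is called
    env.execute("skeleton");
  }
}
```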
03 Getting-Started Examples
The requirement: implement WordCount with Flink.
3.1 Project Setup
First, add the dependencies to the pom file (to make later demos easier, this pom already pulls in libraries that will be covered in future posts):
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.ylw</groupId>
  <artifactId>flink-demo</artifactId>
  <version>1.0.0</version>
  <description>Flink demo</description>

  <developers>
    <developer>
      <name>Yang Lin Wei</name>
      <url>https://yanglinwei.blog.csdn.net/</url>
    </developer>
  </developers>

  <!-- Repository locations: aliyun, apache and cloudera -->
  <repositories>
    <repository>
      <id>aliyun</id>
      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
      <id>apache</id>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
    </repository>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>

  <properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.12.0</flink.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Flink planner (the pre-1.9 one) -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Blink planner (the default since 1.11) -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-cep_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>-->
    <!-- Flink connectors -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-sql-connector-kafka_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-jdbc_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-csv</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-filesystem_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>-->
    <!--<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-jdbc_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>-->
    <!--<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-parquet_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>-->
    <!--<dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.9.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-avro</artifactId>
      <version>1.10.0</version>
    </dependency>-->
    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>flink-connector-redis_2.11</artifactId>
      <version>1.0</version>
      <exclusions>
        <exclusion>
          <artifactId>flink-streaming-java_2.11</artifactId>
          <groupId>org.apache.flink</groupId>
        </exclusion>
        <exclusion>
          <artifactId>flink-runtime_2.11</artifactId>
          <groupId>org.apache.flink</groupId>
        </exclusion>
        <exclusion>
          <artifactId>flink-core</artifactId>
          <groupId>org.apache.flink</groupId>
        </exclusion>
        <exclusion>
          <artifactId>flink-java</artifactId>
          <groupId>org.apache.flink</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-hive_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-metastore</artifactId>
      <version>2.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>2.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-shaded-hadoop-2-uber</artifactId>
      <version>2.7.5-10.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>2.1.0</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.38</version>
      <!--<version>8.0.20</version>-->
    </dependency>
    <!-- High-performance async components: Vert.x -->
    <dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-core</artifactId>
      <version>3.9.0</version>
    </dependency>
    <dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-jdbc-client</artifactId>
      <version>3.9.0</version>
    </dependency>
    <dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-redis-client</artifactId>
      <version>3.9.0</version>
    </dependency>
    <!-- Logging -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.44</version>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.2</version>
      <scope>provided</scope>
    </dependency>
    <!-- Reference: https://blog.csdn.net/f641385712/article/details/84109098 -->
    <!--<dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-collections4</artifactId>
      <version>4.4</version>
    </dependency>-->
    <!--<dependency>
      <groupId>org.apache.thrift</groupId>
      <artifactId>libfb303</artifactId>
      <version>0.9.3</version>
      <type>pom</type>
      <scope>provided</scope>
    </dependency>-->
    <!--<dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>28.2-jre</version>
    </dependency>-->
  </dependencies>

  <build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <plugins>
      <!-- Compiler plugin -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.5.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
          <!--<encoding>${project.build.sourceEncoding}</encoding>-->
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.18.1</version>
        <configuration>
          <useFile>false</useFile>
          <disableXmlReport>true</disableXmlReport>
          <includes>
            <include>**/*Test.*</include>
            <include>**/*Suite.*</include>
          </includes>
        </configuration>
      </plugin>
      <!-- Shade plugin (builds a fat jar containing all dependencies) -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <!-- zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer
                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <!-- Main class of the jar (optional) -->
                  <mainClass></mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
```
Then configure the log4j.properties file:
```properties
log4j.rootLogger=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```
3.2 Code Implementation
We follow the official getting-started example here: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html
Development steps:
- step1: prepare the environment - env
- step2: prepare the data - source
- step3: process the data - transformation
- step4: output the result - sink
- step5: trigger execution - execute
The execution environment can be created in any of the following three ways:
```java
getExecutionEnvironment()   // recommended
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)
```
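As a minimal sketch of how these factory methods are called on StreamExecutionEnvironment (the class name, host, port, and jar path below are hypothetical placeholders):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvDemo {
  public static void main(String[] args) {
    // Picks a local or cluster environment automatically, depending on how the job is launched.
    StreamExecutionEnvironment auto = StreamExecutionEnvironment.getExecutionEnvironment();

    // Always runs inside the current JVM - handy for IDE debugging.
    StreamExecutionEnvironment local = StreamExecutionEnvironment.createLocalEnvironment();

    // Submits to a remote JobManager; host, port and jar path are placeholders.
    StreamExecutionEnvironment remote = StreamExecutionEnvironment.createRemoteEnvironment(
        "jobmanager-host", 8081, "/path/to/flink-demo-1.0.0.jar");
  }
}
```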
3.2.1 DataSet-Based
```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Requirement: implement WordCount with Flink - DataSet
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 9:47 AM
 * <p>
 * Steps:
 * 1. prepare the environment - env
 * 2. prepare the data - source
 * 3. process the data - transformation
 * 4. output the result - sink
 * 5. trigger execution - execute (if print() is used, DataSet does not need execute(); DataStream does)
 */
public class WordCount1 {

  /**
   * The old batch API; no longer recommended.
   */
  public static void main(String[] args) throws Exception {
    // 1. prepare the environment - env
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // 2. prepare the data - source
    DataSet<String> lineDS = env.fromElements("ylw hadoop spark", "ylw hadoop spark", "ylw hadoop", "ylw");

    // 3. process the data - transformation
    // 3.1 split each line on spaces into individual words
    /*
    public interface FlatMapFunction<T, O> extends Function, Serializable {
        void flatMap(T value, Collector<O> out) throws Exception;
    }
    */
    DataSet<String> wordsDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public void flatMap(String value, Collector<String> out) throws Exception {
        // value is one line of input
        String[] words = value.split(" ");
        for (String word : words) {
          out.collect(word); // collect and emit each word
        }
      }
    });

    // 3.2 map each word to (word, 1)
    /*
    public interface MapFunction<T, O> extends Function, Serializable {
        O map(T value) throws Exception;
    }
    */
    DataSet<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
      @Override
      public Tuple2<String, Integer> map(String value) throws Exception {
        // value is a single word
        return Tuple2.of(value, 1);
      }
    });

    // 3.3 group by the word (key)
    // 0 means group by the field at tuple index 0, i.e. the word
    UnsortedGrouping<Tuple2<String, Integer>> groupedDS = wordAndOnesDS.groupBy(0);

    // 3.4 aggregate (sum) the counts within each group
    // 1 means sum the field at tuple index 1, i.e. the count
    DataSet<Tuple2<String, Integer>> aggResult = groupedDS.sum(1);

    // 3.5 sort
    DataSet<Tuple2<String, Integer>> result = aggResult.sortPartition(1, Order.DESCENDING).setParallelism(1);

    // 4. output the result - sink
    result.print();

    // 5. trigger execution - execute (not needed for DataSet when print() is used; DataStream always needs it)
    //env.execute(); // 'execute()', 'count()', 'collect()', or 'print()'.
  }
}
```
Run result:
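Running WordCount1 locally should print something close to the following (a sketch of the expected final counts, derived from the sample data; surrounding log output is omitted):

```text
(ylw,4)
(hadoop,3)
(spark,2)
```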
3.2.2 DataStream-Based
```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Requirement: implement WordCount with Flink - DataStream
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 10:02 AM
 * <p>
 * Steps:
 * 1. prepare the environment - env
 * 2. prepare the data - source
 * 3. process the data - transformation
 * 4. output the result - sink
 * 5. trigger execution - execute
 */
public class WordCount2 {

  /**
   * The new unified stream/batch API: supports both stream and batch processing.
   */
  public static void main(String[] args) throws Exception {
    // 1. prepare the environment - env
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
    //env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
    //env.setRuntimeMode(RuntimeExecutionMode.BATCH);

    // 2. prepare the data - source
    DataStream<String> linesDS = env.fromElements("ylw hadoop spark", "ylw hadoop spark", "ylw hadoop", "ylw");

    // 3. process the data - transformation
    // 3.1 split each line on spaces into individual words
    /*
    public interface FlatMapFunction<T, O> extends Function, Serializable {
        void flatMap(T value, Collector<O> out) throws Exception;
    }
    */
    DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public void flatMap(String value, Collector<String> out) throws Exception {
        // value is one line of input
        String[] words = value.split(" ");
        for (String word : words) {
          out.collect(word); // collect and emit each word
        }
      }
    });

    // 3.2 map each word to (word, 1)
    /*
    public interface MapFunction<T, O> extends Function, Serializable {
        O map(T value) throws Exception;
    }
    */
    DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
      @Override
      public Tuple2<String, Integer> map(String value) throws Exception {
        // value is a single word
        return Tuple2.of(value, 1);
      }
    });

    // 3.3 group by the word (key)
    // 0 means group by the field at tuple index 0, i.e. the word
    //KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);
    KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);

    // 3.4 aggregate (sum) the counts within each group
    // 1 means sum the field at tuple index 1, i.e. the count
    DataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);

    // 4. output the result - sink
    result.print();

    // 5. trigger execution - execute
    env.execute(); // DataStream requires execute()
  }
}
```
Run result:
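The exact console output of WordCount2 depends on the runtime mode and the parallelism. Roughly, you should see one final count per word, each line prefixed with the index of the printing subtask (shown as n> below, which will vary); in STREAMING mode, intermediate counts such as (ylw,1), (ylw,2), ... may appear before the final values:

```text
n> (hadoop,3)
n> (spark,2)
n> (ylw,4)
```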
3.2.3 Lambda Version
```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Requirement: implement WordCount with Flink - DataStream - using lambda expressions
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 10:05 AM
 * <p>
 * Steps:
 * 1. prepare the environment - env
 * 2. prepare the data - source
 * 3. process the data - transformation
 * 4. output the result - sink
 * 5. trigger execution - execute
 */
public class WordCount3 {

  public static void main(String[] args) throws Exception {
    // 1. prepare the environment - env
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
    //env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
    //env.setRuntimeMode(RuntimeExecutionMode.BATCH);

    // 2. prepare the data - source
    DataStream<String> linesDS = env.fromElements("ylw hadoop spark", "ylw hadoop spark", "ylw hadoop", "ylw");

    // 3. process the data - transformation
    // 3.1 split each line on spaces into individual words
    /*
    public interface FlatMapFunction<T, O> extends Function, Serializable {
        void flatMap(T value, Collector<O> out) throws Exception;
    }
    */
    // Lambda syntax: (parameters) -> { body }
    // A lambda expression is just a function, and a function is essentially an object.
    DataStream<String> wordsDS = linesDS.flatMap(
        (String value, Collector<String> out) -> Arrays.stream(value.split(" ")).forEach(out::collect)
    ).returns(Types.STRING);

    // 3.2 map each word to (word, 1)
    /*
    public interface MapFunction<T, O> extends Function, Serializable {
        O map(T value) throws Exception;
    }
    */
    /*DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(
        (String value) -> Tuple2.of(value, 1)
    ).returns(Types.TUPLE(Types.STRING, Types.INT));*/
    DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(
        (String value) -> Tuple2.of(value, 1),
        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
        })
    );

    // 3.3 group by the word (key)
    // 0 means group by the field at tuple index 0, i.e. the word
    //KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);
    //KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy((KeySelector<Tuple2<String, Integer>, String>) t -> t.f0);
    KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);

    // 3.4 aggregate (sum) the counts within each group
    // 1 means sum the field at tuple index 1, i.e. the count
    DataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);

    // 4. output the result - sink
    result.print();

    // 5. trigger execution - execute
    env.execute();
  }
}
```
Output: the same as the DataStream version in 3.2.2, since only the function syntax has changed.
3.2.4 Running on Yarn (to be verified)
step1: start Hadoop first:
```bash
cd /usr/local/Cellar/hadoop/3.3.1/sbin
./start-all.sh
```
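To confirm the daemons came up, one option (assuming a standard Hadoop install) is to check with jps:

```bash
jps
# Expect to see, among others: NameNode, DataNode, ResourceManager, NodeManager
```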
step2: writing to HDFS can run into permission problems; for this local test, open up the permissions as follows:
```bash
hadoop fs -chmod -R 777 /
```
step3: the code is as follows (note that it adds System.setProperty("HADOOP_USER_NAME", "root")):
```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Requirement: implement WordCount with Flink - DataStream - lambda version, adapted to run on Yarn
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 10:11 AM
 * <p>
 * Steps:
 * 1. prepare the environment - env
 * 2. prepare the data - source
 * 3. process the data - transformation
 * 4. output the result - sink
 * 5. trigger execution - execute (not needed for batch, needed for streaming)
 */
public class WordCount4 {

  public static void main(String[] args) throws Exception {
    // read parameters
    ParameterTool params = ParameterTool.fromArgs(args);
    String output = null;
    if (params.has("output")) {
      output = params.get("output");
    } else {
      output = "hdfs://127.0.0.1:8020/wordcount/output_" + System.currentTimeMillis();
    }

    // 1. prepare the environment - env
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
    //env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
    //env.setRuntimeMode(RuntimeExecutionMode.BATCH);

    // 2. prepare the data - source
    DataStream<String> linesDS = env.fromElements("ylw hadoop spark", "ylw hadoop spark", "ylw hadoop", "ylw");

    // 3. process the data - transformation
    DataStream<Tuple2<String, Integer>> result = linesDS
        .flatMap(
            (String value, Collector<String> out) -> Arrays.stream(value.split(" ")).forEach(out::collect)
        ).returns(Types.STRING)
        .map(
            (String value) -> Tuple2.of(value, 1)
        ).returns(Types.TUPLE(Types.STRING, Types.INT))
        //.keyBy(0);
        .keyBy((KeySelector<Tuple2<String, Integer>, String>) t -> t.f0)
        .sum(1);

    // 4. output the result - sink
    result.print();

    // if HDFS permission errors occur, run: hadoop fs -chmod -R 777 /
    System.setProperty("HADOOP_USER_NAME", "root"); // set the HDFS user name
    //result.writeAsText("hdfs://node1:8020/wordcount/output_" + System.currentTimeMillis()).setParallelism(1);
    result.writeAsText(output).setParallelism(1);

    // 5. trigger execution - execute
    env.execute();
  }
}
```
step4: package the project.
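With the shade plugin configured in the pom above, packaging is a standard Maven build from the project root, for example:

```bash
mvn clean package -DskipTests
```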
step5: the jar produced by a successful build:
step6: upload it to the designated path on the server (see: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html):
- Note: this demo runs on macOS with everything on the local machine, so the upload step is skipped here.
step7: run the jar with the flink command:
```bash
flink run -Dexecution.runtime-mode=BATCH \
  -m yarn-cluster \
  -yjm 1024 \
  -ytm 1024 \
  -c com.ylw.WordCount4 \
  <project-path>/flink-demo/target/original-flink-demo-1.0.0.jar \
  --output hdfs://127.0.0.1:8020/wordcount/output_xx
```
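Once the job finishes, one way to inspect the result (assuming the HDFS output path above, where output_xx is whatever you passed via --output) is:

```bash
hdfs dfs -ls hdfs://127.0.0.1:8020/wordcount/
hdfs dfs -cat hdfs://127.0.0.1:8020/wordcount/output_xx/*
```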
The submitted job can be observed in the web UI:
Alternatively, in Standalone mode the job can be submitted through the web UI.
04 Conclusion
This post walked through Flink's getting-started examples, based on both the DataSet API and the DataStream API. Thanks for reading, and that's all for this post!