The Flink 1.12 documentation explains DataSet data output (sinks) here:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/batch/#data-sinks
(1) Flink Sink: TextFile
Writes DataSet data to a file system as text using TextOutputFormat. The file system can be the local file system or HDFS; Flink decides which one based on the scheme prefix of the path the user specifies.
// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS");

// write DataSet to a file on an HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS");

// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE);
writeAsText writes the DataSet directly to the specified file. When using the write-style output methods, the user can also choose the write mode: OVERWRITE or NO_OVERWRITE. The former overwrites the target if it already exists; the latter (the default) only writes when the target does not yet exist and otherwise fails, as sketched below.
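A minimal sketch of the two modes, assuming textData is the DataSet&lt;String&gt; from the snippet above and the output path is only a placeholder:

import org.apache.flink.core.fs.FileSystem.WriteMode;

// OVERWRITE: replace the target file or directory if it already exists
textData.writeAsText("file:///tmp/flink/wordcount", WriteMode.OVERWRITE);

// NO_OVERWRITE (the default): the write fails if the target already exists
textData.writeAsText("file:///tmp/flink/wordcount", WriteMode.NO_OVERWRITE);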
Example code:
package com.aikfk.flink.dataset.sink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/8 11:14 PM
 */
public class TextSinkJava {
    public static void main(String[] args) throws Exception {

        // set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataSet<String> dateSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );

        /**
         * String -> flatMap() -> groupBy() -> reduceGroup() -> Tuple2
         */
        DataSet<Tuple2<String,Integer>> wordcount = dateSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // split each line into words and emit (word, 1)
                for (String word : s.split(" ")){
                    collector.collect(new Tuple2<>(word,1));
                }
            }
        }).groupBy(0).reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public void reduce(Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // sum the counts for each word
                String key = null;
                int count = 0;
                for (Tuple2<String, Integer> tuple2 : iterable){
                    key = tuple2.f0;
                    count = count + tuple2.f1;
                }
                collector.collect(new Tuple2<>(key,count));
            }
        });

        // write to a text file, overwriting any previous content
        wordcount.writeAsText("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/textfile",
                FileSystem.WriteMode.OVERWRITE);

        env.execute();
    }
}
Result:
(hadoop,1)
(hive,2)
(java,6)
(spark,2)
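Because the job runs with env.setParallelism(2), the output path actually becomes a directory holding one file per parallel subtask. A sketch of how a single output file could be produced instead, by setting the parallelism of the sink itself to 1 (same path and wordcount DataSet as above):

// only the sink runs with parallelism 1, so Flink writes a single file
wordcount.writeAsText("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/textfile",
        FileSystem.WriteMode.OVERWRITE)
        .setParallelism(1);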
(2) Flink Sink: CSV
This method writes the data set to the specified file system as a CSV file, and the output method accepts CSV-specific settings such as the row delimiter and the field delimiter. Note that writeAsCsv is only available for DataSets of Tuples.
dataSet.writeAsCsv(path, rowDelimiter, fieldDelimiter)

For example:

dataSet.writeAsCsv("file://path/file", "\n", ",")
Example code:
package com.aikfk.flink.dataset.sink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/8 11:14 PM
 */
public class CsvSinkJava {
    public static void main(String[] args) throws Exception {

        // set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataSet<String> dateSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );

        /**
         * String -> flatMap() -> groupBy() -> reduceGroup() -> Tuple2
         */
        DataSet<Tuple2<String,Integer>> wordcount = dateSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // split each line into words and emit (word, 1)
                for (String word : s.split(" ")){
                    collector.collect(new Tuple2<>(word,1));
                }
            }
        }).groupBy(0).reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public void reduce(Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // sum the counts for each word
                String key = null;
                int count = 0;
                for (Tuple2<String, Integer> tuple2 : iterable){
                    key = tuple2.f0;
                    count = count + tuple2.f1;
                }
                collector.collect(new Tuple2<>(key,count));
            }
        });

        // write to a CSV file with the given row delimiter and field delimiter
        wordcount.writeAsCsv("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/csvfile.csv",
                "\n", ",");

        env.execute();
    }
}
Result:
hadoop,1
hive,2
java,6
spark,2
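Like writeAsText, writeAsCsv also accepts a WriteMode as a fourth argument. A sketch, reusing the wordcount DataSet and output path from the example above:

// overwrite the CSV file if it already exists; "\n" is the row delimiter, "," the field delimiter
wordcount.writeAsCsv("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/csvfile.csv",
        "\n", ",", FileSystem.WriteMode.OVERWRITE);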
(3) Flink Sink: MySQL
Flink also supports writing the result set to an external data store, such as a MySQL database:
// write Tuple DataSet to a relational database
myResult.output(
    // build and configure OutputFormat
    JdbcOutputFormat.buildJdbcOutputFormat()
        .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
        .setDBUrl("jdbc:derby:memory:persons")
        .setQuery("insert into persons (name, age, height) values (?,?,?)")
        .finish()
);
Example code:
package com.aikfk.flink.dataset.sink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcOutputFormat;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/8 11:14 PM
 */
public class MySQLSinkJava {
    public static void main(String[] args) throws Exception {

        // set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataSet<String> dateSource = env.fromElements(
                "java java spark hive",
                "hive java java spark",
                "java java hadoop"
        );

        /**
         * String -> flatMap() -> groupBy() -> reduceGroup() -> Tuple2
         */
        DataSet<Tuple2<String,Integer>> wordcount = dateSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // split each line into words and emit (word, 1)
                for (String word : s.split(" ")){
                    collector.collect(new Tuple2<>(word,1));
                }
            }
        }).groupBy(0).reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public void reduce(Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // sum the counts for each word
                String key = null;
                int count = 0;
                for (Tuple2<String, Integer> tuple2 : iterable){
                    key = tuple2.f0;
                    count = count + tuple2.f1;
                }
                collector.collect(new Tuple2<>(key,count));
            }
        });

        /**
         * Tuple2 -> map() -> Row
         */
        DataSet<Row> wordcountMysql = wordcount.map(new MapFunction<Tuple2<String, Integer>, Row>() {
            @Override
            public Row map(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                // convert each (word, count) tuple into a JDBC-writable Row
                return Row.of(stringIntegerTuple2.f0,stringIntegerTuple2.f1);
            }
        });

        // write Tuple DataSet to a relational database
        wordcountMysql.output(
                // build and configure OutputFormat
                JdbcOutputFormat.buildJdbcOutputFormat()
                        .setDrivername("com.mysql.jdbc.Driver")
                        .setDBUrl("jdbc:mysql://bigdata-pro-m07:3306/flink?serverTimezone=GMT%2B8&useSSL=false")
                        .setUsername("root")
                        .setPassword("199911")
                        .setQuery("insert into wordcount (word, count) values (?,?)")
                        .finish()
        );

        env.execute();
    }
}
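The insert query above assumes that a wordcount table already exists in the flink database, and that the MySQL JDBC driver and the flink-connector-jdbc dependency are on the classpath. A possible table definition, where the column types are an assumption inferred from the insert statement rather than taken from the original setup:

-- hypothetical schema matching "insert into wordcount (word, count) values (?,?)"
CREATE TABLE wordcount (
    `word`  VARCHAR(255),
    `count` INT
);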
Result:
mysql> select * from wordcount;
+--------+-------+
| word   | count |
+--------+-------+
| hadoop |     1 |
| hive   |     2 |
| java   |     6 |
| spark  |     2 |
+--------+-------+
4 rows in set (0.01 sec)