(1) Flink Source: Text data source
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// read text file from the local file system
DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");

// read text file from an HDFS running at nnHost:nnPort
DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");
The path can point to a local file or to an HDFS file.
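readTextFile also accepts a directory path. As a small sketch (the nested-directory path here is only an assumption for illustration), recursive traversal of subdirectories can be switched on through an input-format parameter:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class RecursiveTextSourceSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // enable recursive enumeration of nested input files
        Configuration parameters = new Configuration();
        parameters.setBoolean("recursive.file.enumeration", true);

        // pass the configuration to the text input format and read all nested files
        DataSet<String> lines = env.readTextFile("file:///path/with/nested/files")
                .withParameters(parameters);

        lines.print();
    }
}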
Text data source:
java java java hive hive hadoop hive spark java python
Test code:
package com.aikfk.dataset;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/7 11:10 AM
 */
public class TextSourceJava {
    public static void main(String[] args) throws Exception {

        // set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // read the text source
        DataSet<String> dataSet = env.readTextFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/wordcount");

        // flatMap: split every line into (word, 1) tuples
        DataSet<Tuple2<String, Integer>> wordcounts = dataSet.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        })
        // specify the grouping key
        .groupBy(0)
        .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                return new Tuple2<String, Integer>(t1.f0, t1.f1 + t2.f1);
            }
        });

        wordcounts.print();
    }
}
Result:
(hadoop,1)
(java,4)
(python,1)
(spark,1)
(hive,3)
(2) Flink Source: JSON data source
// a JSON file is read line by line as plain text; each line is then parsed into a POJO
DataSet<String> jsonLines = env.readTextFile("file:///path/to/my/jsonfile");
The path can again point to a local file or to an HDFS file.
To parse the JSON, add the jackson-databind dependency:
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.12.2</version>
</dependency>
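As a minimal standalone sketch of what jackson-databind does here, the following parses one JSON line into the WordCountPOJO class defined further below (the class name JsonParseSketch is only for illustration):

import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonParseSketch {
    public static void main(String[] args) throws Exception {
        // Jackson maps the JSON fields onto the public fields of WordCountPOJO
        ObjectMapper mapper = new ObjectMapper();
        WordCountPOJO wc = mapper.readValue("{\"word\": \"java\",\"count\": 1}", WordCountPOJO.class);
        System.out.println(wc);   // prints: WordCount{word='java', count=1}
    }
}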
JSON data source:
{"word": "java","count": 1}
{"word": "hive","count": 11}
{"word": "spark","count": 21}
{"word": "hadoop","count": 3}
{"word": "java","count": 12}
{"word": "hive","count": 12}
POJO class:
package com.aikfk.dataset;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/7 12:55 PM
 */
public class WordCountPOJO {

    public String word;
    public int count;

    public WordCountPOJO() {
    }

    public WordCountPOJO(String word, int count) {
        this.word = word;
        this.count = count;
    }

    @Override
    public String toString() {
        return "WordCount{" +
                "word='" + word + '\'' +
                ", count=" + count +
                '}';
    }
}
Test code:
package com.aikfk.dataset;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/7 11:54 AM
 */
public class JsonSourceJava {

    final static ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    public static void main(String[] args) throws Exception {

        // set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // read the JSON source: each line is one JSON object, parsed into a POJO
        DataSet<WordCountPOJO> dataSet = env.readTextFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/wordcount.json")
                .map(new MapFunction<String, WordCountPOJO>() {
                    @Override
                    public WordCountPOJO map(String line) throws Exception {
                        return OBJECT_MAPPER.readValue(line, WordCountPOJO.class);
                    }
                })
                .groupBy("word")
                .reduce(new ReduceFunction<WordCountPOJO>() {
                    @Override
                    public WordCountPOJO reduce(WordCountPOJO t1, WordCountPOJO t2) throws Exception {
                        return new WordCountPOJO(t1.word, t1.count + t2.count);
                    }
                });

        dataSet.print();
    }
}
Result:
WordCount{word='hadoop', count=3}
WordCount{word='java', count=13}
WordCount{word='spark', count=21}
WordCount{word='hive', count=23}
(3) Flink Source: CSV data source
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// read a CSV file with three fields
DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
        .types(Integer.class, String.class, Double.class);

// read a CSV file with five fields, taking only two of them
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
        .includeFields("10010")  // take the first and the fourth field
        .types(String.class, Double.class);

// read a CSV file with three fields into a POJO (Person.class) with corresponding fields
DataSet<Person> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
        .pojoType(Person.class, "name", "age", "zipcode");
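The CSV reader also exposes a few common parsing options. The following is only a sketch, assuming a semicolon-delimited file with a header row (the path and delimiter are not part of the original example):

package com.aikfk.dataset;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class CsvOptionsSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // assumed: a semicolon-delimited file with a header line
        DataSet<Tuple2<String, Integer>> csvInput = env.readCsvFile("/path/to/wordcount_with_header.csv")
                .fieldDelimiter(";")   // the default delimiter is ","
                .ignoreFirstLine()     // skip the header row
                .types(String.class, Integer.class);

        csvInput.print();
    }
}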
CSV data source:
java,12,1
java,10,1
hive,4,1
spark,2,2
spark,3,2
hive,3,1
Test code:
package com.aikfk.dataset;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/7 2:04 PM
 */
public class CsvSourceJava {
    public static void main(String[] args) throws Exception {

        // set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // read CSV data (approach 1: map each row to a POJO)
        DataSet<WordCountPOJO> dataSet1 = env.readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/wordcount.csv")
                .pojoType(WordCountPOJO.class, "word", "count")
                .groupBy("word")
                .reduce(new ReduceFunction<WordCountPOJO>() {
                    @Override
                    public WordCountPOJO reduce(WordCountPOJO t1, WordCountPOJO t2) throws Exception {
                        return new WordCountPOJO(t1.word, t1.count + t2.count);
                    }
                });

        dataSet1.print();
        /**
         * WordCount{word='java', count=22}
         * WordCount{word='spark', count=5}
         * WordCount{word='hive', count=7}
         */

        // read CSV data (approach 2: map each row to a Tuple with all three fields)
        DataSet<Tuple3<String, Integer, Integer>> dataSet2 = env.readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/wordcount.csv")
                .types(String.class, Integer.class, Integer.class)
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple3<String, Integer, Integer>>() {
                    @Override
                    public Tuple3<String, Integer, Integer> reduce(Tuple3<String, Integer, Integer> t1, Tuple3<String, Integer, Integer> t2) throws Exception {
                        return new Tuple3<>(t1.f0, t1.f1 + t2.f1, t2.f2);
                    }
                });

        dataSet2.print();
        /**
         * (java,22,1)
         * (spark,5,2)
         * (hive,7,1)
         */

        // read CSV data (approach 3: map each row to a Tuple, keeping only two of the three fields)
        DataSet<Tuple2<String, Integer>> dataSet3 = env.readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/wordcount.csv")
                // keep only the first and second fields (1 means include the field, 0 means skip it)
                .includeFields("110")
                .types(String.class, Integer.class)
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                        return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
                    }
                });

        dataSet3.print();
        /**
         * (java,22)
         * (spark,5)
         * (hive,7)
         */
    }
}
(4) Flink Source: MySQL data source
Step 1: Add the flink-connector-jdbc dependency
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
Step 2: Add the mysql-connector-java dependency
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
</dependency>
Step 3: Create the MySQL table
create table wordcount(
    word varchar(100) not null,
    count int not null);

insert into wordcount(word,count) values("spark",1);
insert into wordcount(word,count) values("spark",10);
insert into wordcount(word,count) values("java",2);
insert into wordcount(word,count) values("java",3);
insert into wordcount(word,count) values("hive",5);
Step 4: Write the program
// Read data from a relational database using the JDBC input format
DataSet<Row> dbData = env.createInput(
        JdbcInputFormat.buildJdbcInputFormat()
                .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                .setDBUrl("jdbc:derby:memory:persons")
                .setQuery("select name, age from persons")
                .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
                .finish()
);
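For larger tables, the input format can also split the query across parallel readers. The following is only a sketch, reusing the MySQL connection settings from the test code below and assuming this connector version provides org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider as well as a numeric id column in the table (neither assumption is part of the original example):

// Sketch only: parallel reads over a numeric key range, under the assumptions stated above.
DataSet<Row> dbData = env.createInput(
        JdbcInputFormat.buildJdbcInputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://bigdata-pro-m07:3306/flink?serverTimezone=GMT%2B8&useSSL=false")
                .setUsername("root")
                .setPassword("199911")
                // the two ? placeholders are filled by the parameters provider
                .setQuery("select word, count from wordcount where id between ? and ?")
                // hypothetical id range 1..1000, read in splits of 200 rows each
                .setParametersProvider(new JdbcNumericBetweenParametersProvider(1, 1000).ofBatchSize(200))
                .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
                .finish()
);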
Test code:
package com.aikfk.dataset;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.types.Row;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/7 5:08 PM
 */
public class MySQLSourceJava {
    public static void main(String[] args) throws Exception {

        // set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // read the MySQL source
        DataSet<Row> dbData = env.createInput(
                JdbcInputFormat.buildJdbcInputFormat()
                        .setDrivername("com.mysql.jdbc.Driver")
                        .setDBUrl("jdbc:mysql://bigdata-pro-m07:3306/flink?serverTimezone=GMT%2B8&useSSL=false")
                        .setUsername("root")
                        .setPassword("199911")
                        .setQuery("select * from wordcount")
                        .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
                        .finish()
        );

        /**
         * The elements of this DataSet are POJOs
         */
        DataSet<WordCountPOJO> dataSet = dbData.map(new MapFunction<Row, WordCountPOJO>() {
            @Override
            public WordCountPOJO map(Row row) throws Exception {
                return new WordCountPOJO(String.valueOf(row.getField(0)), (Integer) row.getField(1));
            }
        })
        .groupBy("word")
        .reduce(new ReduceFunction<WordCountPOJO>() {
            @Override
            public WordCountPOJO reduce(WordCountPOJO t1, WordCountPOJO t2) throws Exception {
                return new WordCountPOJO(t1.word, t1.count + t2.count);
            }
        });

        dataSet.print();
        /**
         * WordCount{word='java', count=5}
         * WordCount{word='spark', count=11}
         * WordCount{word='hive', count=5}
         */

//        /**
//         * The elements of this DataSet are Tuples
//         */
//        DataSet<Tuple2<String, Integer>> dataSet = dbData.map(new MapFunction<Row, Tuple2<String, Integer>>() {
//            @Override
//            public Tuple2<String, Integer> map(Row row) throws Exception {
//                return new Tuple2<>(String.valueOf(row.getField(0)), (Integer) row.getField(1));
//            }
//        })
//        .groupBy(0)
//        .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
//            @Override
//            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
//                return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
//            }
//        });
    }
}
Result:
WordCount{word='java', count=5}
WordCount{word='spark', count=11}
WordCount{word='hive', count=5}