Flink DataSet Data Sources

Summary: notes on reading text, JSON, CSV, and MySQL data sources with Flink's DataSet API.

(1) Flink Source: Text data source



ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// read text file from local files system
DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");
// read text file from an HDFS running at nnHost:nnPort
DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");

The path can point to a file on the local file system or on HDFS.

Text data source:

java java java hive
hive hadoop hive spark
java python

Test code:

package com.aikfk.dataset;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * @author caizhengjie
 * @description word count over a text file source
 * @date 2021/3/7 11:10 AM
 */
public class TextSourceJava {
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // read the text data source
        DataSet<String> dataSet = env.readTextFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/wordcount");
        // flatMap: split each line into (word, 1) tuples
        DataSet<Tuple2<String, Integer>> wordcounts = dataSet.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")){
                    collector.collect(new Tuple2<>(word,1));
                }
            }
        })
        // specify the grouping key (tuple field 0)
        .groupBy(0)
        .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                return new Tuple2<String, Integer>(t1.f0,t1.f1 + t2.f1);
            }
        });
        wordcounts.print();
    }
}

Run result:

(hadoop,1)
(java,4)
(python,1)
(spark,1)
(hive,3)


(2) Flink Source: JSON data source


// a JSON file is read line by line as plain text and then parsed
DataSet<String> jsonLines = env.readTextFile("file:///path/to/my/jsonfile");

The path can point to a file on the local file system or on HDFS.

To parse the JSON, add the jackson-databind dependency (a standalone parsing check is sketched after the POJO class below):

<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
  <version>2.12.2</version>
</dependency>

JSON data source:

{"word": "java","count": 1}
{"word": "hive","count": 11}
{"word": "spark","count": 21}
{"word": "hadoop","count": 3}
{"word": "java","count": 12}
{"word": "hive","count": 12}

POJO class (Flink requires a public no-argument constructor and public fields, or getters and setters, for a class to be treated as a POJO type):

package com.aikfk.dataset;
/**
 * @author caizhengjie
 * @description word count POJO with a public no-arg constructor and public fields
 * @date 2021/3/7 12:55 PM
 */
public class WordCountPOJO {
    public String word;
    public int count;
    public WordCountPOJO() {
    }
    public WordCountPOJO(String word, int count) {
        this.word = word;
        this.count = count;
    }
    @Override
    public String toString() {
        return "WordCount{" +
                "word='" + word + '\'' +
                ", count=" + count +
                '}';
    }
}
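
Before wiring jackson into the Flink job, the field mapping can be checked in isolation. The sketch below is a hypothetical standalone check (not part of the original notes): it parses a single JSON line of the form shown above into WordCountPOJO, relying on jackson's default detection of public fields and the no-argument constructor.

package com.aikfk.dataset;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Hypothetical standalone check that jackson maps the JSON keys onto WordCountPOJO.
 */
public class JsonLineCheck {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // jackson fills the public fields via the no-argument constructor
        WordCountPOJO wc = mapper.readValue("{\"word\": \"java\",\"count\": 1}", WordCountPOJO.class);
        System.out.println(wc);   // prints: WordCount{word='java', count=1}
    }
}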

Test code:

package com.aikfk.dataset;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
/**
 * @author caizhengjie
 * @description word count over a JSON file source
 * @date 2021/3/7 11:54 AM
 */
public class JsonSourceJava {
    final static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // read the JSON data source (each line is one JSON record)
        DataSet<WordCountPOJO> dataSet = env.readTextFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/wordcount.json")
                .map(new MapFunction<String, WordCountPOJO>() {
                    @Override
                    public WordCountPOJO map(String line) throws Exception {
                        WordCountPOJO wordCountPOJO = OBJECT_MAPPER.readValue(line,WordCountPOJO.class);
                        return wordCountPOJO;
                    }
                })
                .groupBy("word")
                .reduce(new ReduceFunction<WordCountPOJO>() {
                    @Override
                    public WordCountPOJO reduce(WordCountPOJO t1, WordCountPOJO t2) throws Exception {
                        return new WordCountPOJO(t1.word,t1.count + t2.count);
                    }
                });
        dataSet.print();
    }
}

Run result:

WordCount{word='hadoop', count=3}
WordCount{word='java', count=13}
WordCount{word='spark', count=21}
WordCount{word='hive', count=23}


(3) Flink Source: CSV data source


ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// read a CSV file with three fields
DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                         .types(Integer.class, String.class, Double.class);
// read a CSV file with five fields, taking only two of them
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                         .includeFields("10010")  // take the first and the fourth field
                         .types(String.class, Double.class);
// read a CSV file with three fields into a POJO (Person.class) with corresponding fields
DataSet<Person> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                         .pojoType(Person.class, "name", "age", "zipcode");
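
The readers above assume the default comma field delimiter and no header row. If the file used another delimiter or started with a header line, the reader could be configured before calling types() — a hedged sketch using the CsvReader options fieldDelimiter and ignoreFirstLine (the file path is a placeholder):

// read a semicolon-delimited CSV file, skipping its header line
DataSet<Tuple3<Integer, String, Double>> customCsv = env.readCsvFile("hdfs:///the/CSV/file")
                         .fieldDelimiter(";")
                         .ignoreFirstLine()
                         .types(Integer.class, String.class, Double.class);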

CSV data source:

java,12,1
java,10,1
hive,4,1
spark,2,2
spark,3,2
hive,3,1

Test code:

package com.aikfk.dataset;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
/**
 * @author caizhengjie
 * @description word count over a CSV file source, read in three different ways
 * @date 2021/3/7 2:04 PM
 */
public class CsvSourceJava {
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // read the CSV data (option 1: map each row to a POJO)
        DataSet<WordCountPOJO> dataSet1 = env.readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/wordcount.csv")
                .pojoType(WordCountPOJO.class,"word","count")
                .groupBy("word")
                .reduce(new ReduceFunction<WordCountPOJO>() {
                    @Override
                    public WordCountPOJO reduce(WordCountPOJO t1, WordCountPOJO t2) throws Exception {
                        return new WordCountPOJO(t1.word, t1.count + t2.count);
                    }
                });
        dataSet1.print();
        /**
         * WordCount{word='java', count=22}
         * WordCount{word='spark', count=5}
         * WordCount{word='hive', count=7}
         */
        // read the CSV data (option 2: map each row to a Tuple3 with all three fields)
        DataSet<Tuple3<String,Integer,Integer>> dataSet2 = env.readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/wordcount.csv")
                .types(String.class,Integer.class,Integer.class)
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple3<String, Integer, Integer>>() {
                    @Override
                    public Tuple3<String, Integer, Integer> reduce(Tuple3<String, Integer, Integer> t1, Tuple3<String, Integer, Integer> t2) throws Exception {
                        return new Tuple3<>(t1.f0,t1.f1 + t2.f1,t2.f2);
                    }
                });
        dataSet2.print();
        /**
         * (java,22,1)
         * (spark,5,2)
         * (hive,7,1)
         */
        // read the CSV data (option 3: map each row to a Tuple2, keeping only two of the three fields)
        DataSet<Tuple2<String,Integer>> dataSet3 = env.readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/wordcount.csv")
                // keep only the first and second fields (in the mask, 1 means include the field, 0 means skip it)
                .includeFields("110")
                .types(String.class,Integer.class)
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                        return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
                    }
                });
        dataSet3.print();
        /**
         * (java,22)
         * (spark,5)
         * (hive,7)
         */
    }
}


(4) Flink Source: MySQL data source


Step 1: add the flink-connector-jdbc dependency

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>

Step 2: add the mysql-connector-java dependency

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.38</version>
</dependency>

Step 3: create the MySQL table

create table wordcount(
word varchar(100) not null,
count int not null);
insert into wordcount(word,count) values("spark",1);
insert into wordcount(word,count) values("spark",10);
insert into wordcount(word,count) values("java",2);
insert into wordcount(word,count) values("java",3);
insert into wordcount(word,count) values("hive",5);

Step 4: write the program

// Read data from a relational database using the JDBC input format
DataSet<Tuple2<String, Integer>> dbData =
    env.createInput(
      JdbcInputFormat.buildJdbcInputFormat()
                     .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                     .setDBUrl("jdbc:derby:memory:persons")
                     .setQuery("select name, age from persons")
                     .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
                     .finish()
    );

Test code:

package com.aikfk.dataset;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.types.Row;
/**
 * @author caizhengjie
 * @description word count over a MySQL table read via JdbcInputFormat
 * @date 2021/3/7 5:08 PM
 */
public class MySQLSourceJava {
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // read the MySQL data source via the JDBC input format
        DataSet<Row> dbData = env.createInput(
                        JdbcInputFormat.buildJdbcInputFormat()
                                .setDrivername("com.mysql.jdbc.Driver")
                                .setDBUrl("jdbc:mysql://bigdata-pro-m07:3306/flink?serverTimezone=GMT%2B8&useSSL=false")
                                .setUsername("root")
                                .setPassword("199911")
                                .setQuery("select  * from wordcount")
                                .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
                                .finish()
                );
        /**
         * The DataSet elements are instances of the POJO class
         */
        DataSet<WordCountPOJO> dataSet = dbData.map(new MapFunction<Row, WordCountPOJO>() {
            @Override
            public WordCountPOJO map(Row row) throws Exception {
                return new WordCountPOJO(String.valueOf(row.getField(0)), (Integer) row.getField(1));
            }
        })
        .groupBy("word")
        .reduce(new ReduceFunction<WordCountPOJO>() {
            @Override
            public WordCountPOJO reduce(WordCountPOJO t1, WordCountPOJO t2) throws Exception {
                return new WordCountPOJO(t1.word,t1.count + t2.count);
            }
        });
        dataSet.print();
        /**
         * WordCount{word='java', count=5}
         * WordCount{word='spark', count=11}
         * WordCount{word='hive', count=5}
         */
//        /**
//         * Variant where the DataSet elements are Tuple2 instances
//         */
//        DataSet<Tuple2<String,Integer>> dataSet = dbData.map(new MapFunction<Row, Tuple2<String, Integer>>() {
//            @Override
//            public Tuple2<String, Integer> map(Row row) throws Exception {
//                return new Tuple2<>(String.valueOf(row.getField(0)), (Integer) row.getField(1));
//            }
//        })
//        .groupBy(0)
//        .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
//            @Override
//            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
//                return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
//            }
//        });
    }
}

Run result:

WordCount{word='java', count=5}
WordCount{word='spark', count=11}
WordCount{word='hive', count=5}