Flink DataSet Data Sources

Summary: notes

(1) Flink Source: Text data source



ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// read text file from local files system
DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");
// read text file from an HDFS running at nnHost:nnPort
DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");

The path passed to readTextFile can point to a local file or to a file on HDFS.

Text data source:

java java java hive
hive hadoop hive spark
java python

Test code:

package com.aikfk.dataset;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * @author caizhengjie
 * @description Word count over a text file source
 * @date 2021/3/7 11:10 AM
 */
public class TextSourceJava {
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // read the text data source
        DataSet<String> dataSet = env.readTextFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/wordcount");
        // flatMap: split each line into words and emit (word, 1)
        DataSet<Tuple2<String, Integer>> wordcounts = dataSet.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")){
                    collector.collect(new Tuple2<>(word,1));
                }
            }
        })
        // group by the word (tuple field 0)
        .groupBy(0)
        .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                return new Tuple2<String, Integer>(t1.f0,t1.f1 + t2.f1);
            }
        });
        wordcounts.print();
    }
}

Output:

(hadoop,1)
(java,4)
(python,1)
(spark,1)
(hive,3)
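
To check the counts above without starting a Flink job, the flatMap, groupBy, reduce chain can be mimicked in plain Java. This is only a sketch of what the pipeline computes, not of how Flink executes it; the class name LocalWordCount and the method count are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LocalWordCount {
    // Mimics flatMap (split lines into words) followed by groupBy(0) + reduce (sum per word).
    public static Map<String, Integer> count(List<String> lines) {
        // TreeMap keeps the words sorted, so the printed order is deterministic
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                // merge plays the role of the ReduceFunction: add 1 to the running sum
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "java java java hive",
                "hive hadoop hive spark",
                "java python");
        // prints {hadoop=1, hive=3, java=4, python=1, spark=1}
        System.out.println(count(lines));
    }
}
```

Unlike this sketch, Flink does not guarantee any ordering of the printed groups, which is why the job output above lists the words in a different order.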


(2) Flink Source: JSON data source


// readTextFile returns the raw lines; each line is parsed into a WordCountPOJO in a later map step
DataSet<String> jsonLines = env.readTextFile("file:///path/to/my/jsonfile");

The path can point to a local file or to a file on HDFS.

Parsing the JSON lines requires the jackson-databind dependency:

<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
  <version>2.12.2</version>
</dependency>

JSON data source (one JSON object per line):

{"word": "java","count": 1}
{"word": "hive","count": 11}
{"word": "spark","count": 21}
{"word": "hadoop","count": 3}
{"word": "java","count": 12}
{"word": "hive","count": 12}

POJO class:

package com.aikfk.dataset;
/**
 * @author caizhengjie
 * @description POJO holding a word and its count
 * @date 2021/3/7 12:55 PM
 */
public class WordCountPOJO {
    public String word;
    public int count;
    public WordCountPOJO() {
    }
    public WordCountPOJO(String word, int count) {
        this.word = word;
        this.count = count;
    }
    @Override
    public String toString() {
        return "WordCount{" +
                "word='" + word + '\'' +
                ", count=" + count +
                '}';
    }
}

Test code:

package com.aikfk.dataset;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
/**
 * @author caizhengjie
 * @description Sum the counts per word from a JSON-lines file source
 * @date 2021/3/7 11:54 AM
 */
public class JsonSourceJava {
    // shared Jackson mapper; it is static, so it is not serialized with the MapFunction
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // read the JSON data source
        DataSet<WordCountPOJO> dataSet = env.readTextFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/wordcount.json")
                .map(new MapFunction<String, WordCountPOJO>() {
                    @Override
                    public WordCountPOJO map(String line) throws Exception {
                        // parse one JSON line into a WordCountPOJO
                        return OBJECT_MAPPER.readValue(line, WordCountPOJO.class);
                    }
                })
                .groupBy("word")
                .reduce(new ReduceFunction<WordCountPOJO>() {
                    @Override
                    public WordCountPOJO reduce(WordCountPOJO t1, WordCountPOJO t2) throws Exception {
                        return new WordCountPOJO(t1.word,t1.count + t2.count);
                    }
                });
        dataSet.print();
    }
}

Output:

WordCount{word='hadoop', count=3}
WordCount{word='java', count=13}
WordCount{word='spark', count=21}
WordCount{word='hive', count=23}


(3) Flink Source: CSV data source


ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// read a CSV file with three fields
DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                         .types(Integer.class, String.class, Double.class);
// read a CSV file with five fields, taking only two of them
DataSet<Tuple2<String, Double>> csvSubset = env.readCsvFile("hdfs:///the/CSV/file")
                         .includeFields("10010")  // take the first and the fourth field
                         .types(String.class, Double.class);
// read a CSV file with three fields into a POJO (Person.class) with corresponding fields
DataSet<Person> csvPersons = env.readCsvFile("hdfs:///the/CSV/file")
                         .pojoType(Person.class, "name", "age", "zipcode");

CSV data source:

java,12,1
java,10,1
hive,4,1
spark,2,2
spark,3,2
hive,3,1

Test code:

package com.aikfk.dataset;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
/**
 * @author caizhengjie
 * @description Read a CSV file source as POJOs and as tuples
 * @date 2021/3/7 2:04 PM
 */
public class CsvSourceJava {
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // read the CSV data (approach 1: map each row to a POJO)
        DataSet<WordCountPOJO> dataSet1 = env.readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/wordcount.csv")
                .pojoType(WordCountPOJO.class,"word","count")
                .groupBy("word")
                .reduce(new ReduceFunction<WordCountPOJO>() {
                    @Override
                    public WordCountPOJO reduce(WordCountPOJO t1, WordCountPOJO t2) throws Exception {
                        return new WordCountPOJO(t1.word, t1.count + t2.count);
                    }
                });
        dataSet1.print();
        /**
         * WordCount{word='java', count=22}
         * WordCount{word='spark', count=5}
         * WordCount{word='hive', count=7}
         */
        // read the CSV data (approach 2: map each row to a Tuple3 holding all three fields)
        DataSet<Tuple3<String,Integer,Integer>> dataSet2 = env.readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/wordcount.csv")
                .types(String.class,Integer.class,Integer.class)
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple3<String, Integer, Integer>>() {
                    @Override
                    public Tuple3<String, Integer, Integer> reduce(Tuple3<String, Integer, Integer> t1, Tuple3<String, Integer, Integer> t2) throws Exception {
                        return new Tuple3<>(t1.f0,t1.f1 + t2.f1,t2.f2);
                    }
                });
        dataSet2.print();
        /**
         * (java,22,1)
         * (spark,5,2)
         * (hive,7,1)
         */
        // read the CSV data (approach 3: map each row to a Tuple2, keeping two of the three fields)
        DataSet<Tuple2<String,Integer>> dataSet3 = env.readCsvFile("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/wordcount.csv")
                // keep only the first and second fields (1 = keep the field, 0 = skip it)
                .includeFields("110")
                .types(String.class,Integer.class)
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                        return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
                    }
                });
        dataSet3.print();
        /**
         * (java,22)
         * (spark,5)
         * (hive,7)
         */
    }
}
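
The includeFields mask used above can be hard to read at a glance, so here is a plain-Java sketch of how such a mask selects columns from one CSV line. It is a simplification for illustration, not Flink's parser: it only understands '0' and '1', it assumes that columns beyond the end of the mask are dropped, and the names FieldMaskSketch and select are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class FieldMaskSketch {
    // Keeps column i when the mask character at position i is '1'.
    // Assumption: columns past the end of the mask are dropped.
    public static List<String> select(String mask, String csvLine) {
        String[] columns = csvLine.split(",");
        List<String> kept = new ArrayList<>();
        for (int i = 0; i < mask.length() && i < columns.length; i++) {
            if (mask.charAt(i) == '1') {
                kept.add(columns[i]);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // mask "110" on a three-column row keeps the word and the count
        System.out.println(select("110", "java,12,1"));    // prints [java, 12]
        // mask "10010" on a five-column row keeps the first and fourth columns
        System.out.println(select("10010", "a,b,c,d,e"));  // prints [a, d]
    }
}
```

The number of '1' characters in the mask has to match the number of classes passed to types(...), which is why approach 3 pairs "110" with two classes.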


(4) Flink Source: MySQL data source


Step 1: add the flink-connector-jdbc dependency

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>

Step 2: add the mysql-connector-java dependency

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.38</version>
</dependency>

Step 3: create the MySQL table

create table wordcount(
  word varchar(100) not null,
  count int not null);
insert into wordcount(word,count) values('spark',1);
insert into wordcount(word,count) values('spark',10);
insert into wordcount(word,count) values('java',2);
insert into wordcount(word,count) values('java',3);
insert into wordcount(word,count) values('hive',5);

Step 4: write the program

// Read data from a relational database using the JDBC input format.
// JdbcInputFormat emits Row records, so the DataSet is typed as Row.
DataSet<Row> dbData =
    env.createInput(
      JdbcInputFormat.buildJdbcInputFormat()
                     .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                     .setDBUrl("jdbc:derby:memory:persons")
                     .setQuery("select name, age from persons")
                     .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
                     .finish()
    );

Test code:

package com.aikfk.dataset;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.types.Row;
/**
 * @author caizhengjie
 * @description Read rows from MySQL through JdbcInputFormat and sum the counts per word
 * @date 2021/3/7 5:08 PM
 */
public class MySQLSourceJava {
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // read the MySQL data source
        DataSet<Row> dbData = env.createInput(
                        JdbcInputFormat.buildJdbcInputFormat()
                                .setDrivername("com.mysql.jdbc.Driver")
                                .setDBUrl("jdbc:mysql://bigdata-pro-m07:3306/flink?serverTimezone=GMT%2B8&useSSL=false")
                                .setUsername("root")
                                .setPassword("199911")
                                .setQuery("select  * from wordcount")
                                .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
                                .finish()
                );
        /**
         * The elements of the DataSet are POJOs
         */
        DataSet<WordCountPOJO> dataSet = dbData.map(new MapFunction<Row, WordCountPOJO>() {
            @Override
            public WordCountPOJO map(Row row) throws Exception {
                return new WordCountPOJO(String.valueOf(row.getField(0)), (Integer) row.getField(1));
            }
        })
        .groupBy("word")
        .reduce(new ReduceFunction<WordCountPOJO>() {
            @Override
            public WordCountPOJO reduce(WordCountPOJO t1, WordCountPOJO t2) throws Exception {
                return new WordCountPOJO(t1.word,t1.count + t2.count);
            }
        });
        dataSet.print();
        /**
         * WordCount{word='java', count=5}
         * WordCount{word='spark', count=11}
         * WordCount{word='hive', count=5}
         */
//        /**
//         * The elements of the DataSet are Tuples
//         */
//        DataSet<Tuple2<String,Integer>> dataSet = dbData.map(new MapFunction<Row, Tuple2<String, Integer>>() {
//            @Override
//            public Tuple2<String, Integer> map(Row row) throws Exception {
//                return new Tuple2<>(String.valueOf(row.getField(0)), (Integer) row.getField(1));
//            }
//        })
//        .groupBy(0)
//        .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
//            @Override
//            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
//                return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
//            }
//        });
    }
}

Output:

WordCount{word='java', count=5}
WordCount{word='spark', count=11}
WordCount{word='hive', count=5}