Flink的DataSource三部曲之一:直接API

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 《Flink的DataSource三部曲》系列通过实战熟悉和了解flink的数据源,从内置到自定义逐步上手。

欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos

  • 本文是《Flink的DataSource三部曲》系列的第一篇,该系列旨在通过实战学习和了解Flink的DataSource,为以后的深入学习打好基础,由以下三部分组成:
  1. 直接API:即本篇,除了准备环境和工程,还学习了StreamExecutionEnvironment提供的用来创建数据来的API;
  2. 内置connector:StreamExecutionEnvironment的addSource方法,入参可以是flink内置的connector,例如kafka、RabbitMQ等;
  3. 自定义:StreamExecutionEnvironment的addSource方法,入参可以是自定义的SourceFunction实现类;

关于Flink的DataSource

官方对DataSource的解释:Sources are where your program reads its input from,即DataSource是应用的数据来源,如下图的两个红框所示:

20200412160322256.png

DataSource类型

对于常见的文本读入、kafka、RabbitMQ等数据来源,可以直接使用Flink提供的API或者connector,如果这些满足不了需求,还可以自己开发,下图是我按照自己的理解梳理的:

20200412160348802.png

环境和版本

  • 熟练掌握内置DataSource的最好办法就是实战,本次实战的环境和版本如下:
  1. JDK:1.8.0_211
  2. Flink:1.9.2
  3. Maven:3.6.0
  4. 操作系统:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
  5. IDEA:2018.3.5 (Ultimate Edition)

    源码下载

名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议

这个git项目中有多个文件夹,本章的应用在flinkdatasourcedemo文件夹下,如下图红框所示:

20200412160553465.png

环境和版本

  • 本次实战的环境和版本如下:
  1. JDK:1.8.0_211
  2. Flink:1.9.2
  3. Maven:3.6.0
  4. 操作系统:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
  5. IDEA:2018.3.5 (Ultimate Edition)

    创建工程

  • 在控制台执行以下命令就会进入创建flink应用的交互模式,按提示输入gourpId和artifactId,就会创建一个flink应用(我输入的groupId是com.bolingcavalry,artifactId是flinkdatasourcedemo):

    mvn \
    archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.9.2
    
  • 现在maven工程已生成,用IDEA导入这个工程,如下图:
    20200412160709222.png

  • 以maven的类型导入:

20200412160722435.png

  • 导入成功的样子:

20200412160734110.png

  • 项目创建成功,可以开始写代码实战了;

    辅助类Splitter

    实战中有个功能常用到:将字符串用空格分割,转成Tuple2类型的集合,这里将此算子做成一个公共类Splitter.java,代码如下:
package com.bolingcavalry;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;

public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
   
   
    @Override
    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
   
   

        if(StringUtils.isNullOrWhitespaceOnly(s)) {
   
   
            System.out.println("invalid line");
            return;
        }

        for(String word : s.split(" ")) {
   
   
            collector.collect(new Tuple2<String, Integer>(word, 1));
        }
    }
}

准备完毕,可以开始实战了,先从最简单的Socket开始。

Socket DataSource

  • Socket DataSource的功能是监听指定IP的指定端口,读取网络数据;
  • 在刚才新建的工程中创建一个类Socket.java:
package com.bolingcavalry.api;

import com.bolingcavalry.Splitter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class Socket {
   
   
    public static void main(String[] args) throws Exception {
   
   
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //监听本地9999端口,读取字符串
        DataStream<String> socketDataStream = env.socketTextStream("localhost", 9999);

        //每五秒钟一次,将当前五秒内所有字符串以空格分割,然后统计单词数量,打印出来
        socketDataStream
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1)
                .print();

        env.execute("API DataSource demo : socket");
    }
}

从上述代码可见,StreamExecutionEnvironment.socketTextStream就可以创建Socket类型的DataSource,在控制台执行命令nc -lk 9999,即可进入交互模式,此时输出任何字符串再回车,都会将字符串传输到本机9999端口;

  • 在IDEA上运行Socket类,启动成功后再回到刚才执行nc -lk 9999的控制台,输入一些字符串再回车,可见Socket的功能已经生效:

20200412161153535.png

集合DataSource(generateSequence)

  • 基于集合的DataSource,API如下图所示:

20200412161228924.png

  • 先试试最简单的generateSequence,创建指定范围内的数字型的DataSource:
package com.bolingcavalry.api;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GenerateSequence {
   
   
    public static void main(String[] args) throws Exception {
   
   
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //并行度为1
        env.setParallelism(1);

        //通过generateSequence得到Long类型的DataSource
        DataStream<Long> dataStream = env.generateSequence(1, 10);

        //做一次过滤,只保留偶数,然后打印
        dataStream.filter(new FilterFunction<Long>() {
   
   
            @Override
            public boolean filter(Long aLong) throws Exception {
   
   
                return 0L==aLong.longValue()%2L;
            }
        }).print();

        env.execute("API DataSource demo : collection");
    }
}
  • 运行时会打印偶数:

20200412163652386.png

集合DataSource(fromElements+fromCollection)

  • fromElements和fromCollection就在一个类中试了吧,创建FromCollection类,里面是这两个API的用法:
package com.bolingcavalry.api;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class FromCollection {
   
   
    public static void main(String[] args) throws Exception {
   
   
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //并行度为1
        env.setParallelism(1);

        //创建一个List,里面有两个Tuple2元素
        List<Tuple2<String, Integer>> list = new ArrayList<>();
        list.add(new Tuple2("aaa", 1));
        list.add(new Tuple2("bbb", 1));

        //通过List创建DataStream
        DataStream<Tuple2<String, Integer>> fromCollectionDataStream = env.fromCollection(list);

        //通过多个Tuple2元素创建DataStream
        DataStream<Tuple2<String, Integer>> fromElementDataStream = env.fromElements(
                new Tuple2("ccc", 1),
                new Tuple2("ddd", 1),
                new Tuple2("aaa", 1)
        );

        //通过union将两个DataStream合成一个
        DataStream<Tuple2<String, Integer>> unionDataStream = fromCollectionDataStream.union(fromElementDataStream);

        //统计每个单词的数量
        unionDataStream
                .keyBy(0)
                .sum(1)
                .print();

        env.execute("API DataSource demo : collection");
    }
}
  • 运行结果如下:

20200412163756277.png

文件DataSource

  • 下面的ReadTextFile类会读取绝对路径的文本文件,并对内容做单词统计:
package com.bolingcavalry.api;

import com.bolingcavalry.Splitter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadTextFile {
   
   
    public static void main(String[] args) throws Exception {
   
   
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度为1
        env.setParallelism(1);

        //用txt文件作为数据源
        DataStream<String> textDataStream = env.readTextFile("file:///Users/zhaoqin/temp/202003/14/README.txt", "UTF-8");

        //统计单词数量并打印出来
        textDataStream
                .flatMap(new Splitter())
                .keyBy(0)
                .sum(1)
                .print();

        env.execute("API DataSource demo : readTextFile");
    }
}
  • 请确保代码中的绝对路径下存在名为README.txt文件,运行结果如下:

20200412163837210.png

  • 打开StreamExecutionEnvironment.java源码,看一下刚才使用的readTextFile方法实现如下,原来是调用了另一个同名方法,该方法的第三个参数确定了文本文件是一次性读取完毕,还是周期性扫描内容变更,而第四个参数就是周期性扫描的间隔时间:

    public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
         
         
          Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(filePath), "The file path must not be null or blank.");
    
          TextInputFormat format = new TextInputFormat(new Path(filePath));
          format.setFilesFilter(FilePathFilter.createDefaultFilter());
          TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
          format.setCharsetName(charsetName);
    
          return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
      }
    
  • 上面的FileProcessingMode是个枚举,源码如下:

    @PublicEvolving
    public enum FileProcessingMode {
         
         
    
      /** Processes the current contents of the path and exits. */
      PROCESS_ONCE,
    
      /** Periodically scans the path for new data. */
      PROCESS_CONTINUOUSLY
    }
    
  • 另外请关注readTextFile方法的filePath参数,这是个URI类型的字符串,除了本地文件路径,还可以是HDFS的地址:hdfs://host:port/file/path

  • 至此,通过直接API创建DataSource的实战就完成了,后面的章节我们继续学习内置connector方式的DataSource;

    欢迎关注阿里云开发者社区博客:程序员欣宸

    学习路上,你不孤单,欣宸原创一路相伴...

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
14天前
|
SQL 关系型数据库 API
实时计算 Flink版产品使用问题之如何使用stream api
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
12天前
|
Kubernetes Oracle 关系型数据库
实时计算 Flink版操作报错合集之用dinky在k8s上提交作业,会报错:Caused by: org.apache.flink.table.api.ValidationException:,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
102 0
|
22天前
|
SQL 存储 API
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(5)
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】
|
22天前
|
SQL 消息中间件 Java
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(4)
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】
|
22天前
|
SQL Java API
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(3)
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】
|
消息中间件 存储 SQL
Flink 数据源 DataSource是这个样子的?(三)
因为本篇文章中,有个 Kafka 数据源的 Demo,在一开始解答小伙伴有可能的困惑:
Flink 数据源 DataSource是这个样子的?(三)
|
消息中间件 存储 缓存
Flink 数据源 DataSource是这个样子的?(二)
因为本篇文章中,有个 Kafka 数据源的 Demo,在一开始解答小伙伴有可能的困惑:
Flink 数据源 DataSource是这个样子的?(二)
|
消息中间件 Kafka API
Flink 数据源 DataSource是这个样子的?(一)
因为本篇文章中,有个 Kafka 数据源的 Demo,在一开始解答小伙伴有可能的困惑:
Flink 数据源 DataSource是这个样子的?(一)
|
12天前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
629 0
|
12天前
|
存储 SQL 关系型数据库
实时计算 Flink版操作报错合集之按时间恢复时,报错:在尝试读取binlog时发现所需的binlog位置不再可用,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
536 0