Flink(四)【DataStream API - Source算子】

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink(四)【DataStream API - Source算子】

前言

       今天开始学习 DataStream 的 API ,这一块是 Flink 的核心部分,我们不去学习 DataSet 的 API 了,因为从 Flink 12 开始已经实现了流批一体, DataSet 已然是被抛弃了。忘记提了,从这里开始,我开始换用 Flink 17 了。

一个 Flink 程序,其实就是对 DataStream 的各种转换。具体来说,代码基本上都由以下几部分构成:

  1. 获取执行环境(execution environment)
  2. 读取数据源(source)
  3. 定义基于数据的转换操作(transformations)
  1. 定义计算结果的输出位置(sink)
  2. 触发程序执行(execute)

其中,获取环境和触发执行,都可以认为是针对执行环境的操作。

1、执行环境(Execution Environment)

不同的环境,代码的提交运行的过程会有所不同。这就要求我们在提交作业执行计算时,首先必须获取当前 Flink 的运行环境,从而建立起与 Flink 框架之间的联系。只有获取了环境上下文信息,才能将具体的任务调度到不同的 TaskManager 执行。

1.1、创建执行环境

1、getExecutionEnvironment

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

这是最简单高效的一种方式了,它可以自己根据环境的信息去判断。

我们也可以给它传递一个 Configuration 对象作为参数,这样我们可以设置运行时的一些配置,比如端口号等。

Configuration conf = new Configuration();
        conf.set(RestOptions.BIND_PORT,"8082");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

这里我们设置端口号为 8082 ,这样我们在默认的 8081 端口就无法访问 Web UI 了,只能通过 8082 端口来访问。

2、createLocalEnvironment

这种方式了解即可,它是用来创建一个本地的模拟集群环境。

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

3、createRemoteEnvironment

这种方式同样了解即可,因为配置起来比较繁琐,我们既然是在集群下运行了,一般都是把代码打包成 jar 去执行,不会把配置信息写死的。

StreamExecutionEnvironment.createRemoteEnvironment("hadoop102",8081,"/opt/module/xxx.jar");

1.2、执行模式(Execution Mode)

默认的执行模式就是 Streaming 模式。

1、batch 模式

env.setRuntimeMode(RuntimeExecutionMode.BATCH);

2、streaming 模式

env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

3、自动模式

前两种方式都过于死板,打包后的程序都不能修改,所以我们一般不明确指定执行模式到底是 流处理 还是 批处理,而是执行时通过命令行来配置:

bin/flink run -Dexecution.runtime-mode=BATCH ...

1.3、触发程序执行

默认执行方式

       Flink 是事件驱动型的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”(lazy execution)。所以我们需要显式地调用执行环境的 execute()方法,来触发程序执行。execute()方法将一直等待作业完成,然后返回一个执行结果(JobExecutionResult)但是这个返回对象我们一般不怎么用,而且这个返回结果在程序运行完才会返回。

 默认 env.execute() 触发生成一个 Flink Job。

env.execute();

异步执行方式

  极少情况下,可能我们一套代码中有两部分处理逻辑,比如 env.execute() 之后,又进行了一些操作然后再进行 execute() ,但在 main 线程中是会阻塞的,这就需要启动一个异步的 execute() 方法。

     executeAsync() 会触发执行多个 Flink Job。

1. env.execute();
2. 
3. // 其他处理代码...
4. 
5. env.executeAsync();

2、源算子(Source)

2.1、准备工作

写一个 Java Bean,注意类的属性序列化问题(这里我们的属性都是一些基本类型,Flink 是支持对它进行序列化的),Flink 会把这样的类作为一种特殊的 POJO 数据类型来对待,方便数据的解析和序列化。

import java.util.Objects;
public class WaterSensor {
    public String id;
    public Long ts;
    public Integer vc;
    public WaterSensor(){}
    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        WaterSensor that = (WaterSensor) o;
        return Objects.equals(id, that.id) && Objects.equals(ts, that.ts) && Objects.equals(vc, that.vc);
    }
    @Override
    public int hashCode() {
        return Objects.hash(id, ts, vc);
    }
    @Override
    public String toString() {
        return "WaterSensor{" +
                "id='" + id + '\'' +
                ", ts=" + ts +
                ", vc=" + vc +
                '}';
    }
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public Long getTs() {
        return ts;
    }
    public void setTs(Long ts) {
        this.ts = ts;
    }
    public Integer getVc() {
        return vc;
    }
    public void setVc(Integer vc) {
        this.vc = vc;
    }
}

2.2、从集合中读取

和 Spark 一样,集合类型我们一般只在测试的时候使用。

主要方法就是 fromCollection 或者 fromElements 。

import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
public class CollectionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        WaterSensor sensor1 = new WaterSensor("1",1L,1);
        WaterSensor sensor2 = new WaterSensor("2",2L,2);
        // 从集合读取数据
        DataStreamSource<WaterSensor> source = env
//                .fromElements(sensor1,sensor2); //直接填写元素
                .fromCollection(Arrays.asList(sensor1,sensor2));   // 从集合读取数据
        source.print();
        env.execute();
    }
}

2.3、从文件中读取

读取文件,需要添加文件连接器依赖:

    <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>

新的 Source 读取语法:

env.fromSource(Source的实现类,Watermark,source名称)

示例:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FileSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 从文件中读取
        FileSource<String> fileSource = FileSource.forRecordStreamFormat(
                new TextLineInputFormat(),
                new Path("input/words.txt")
        ).build();
        env.fromSource(fileSource, WatermarkStrategy.noWatermarks(),"fileSource").print();
        env.execute();
    }
}

2.4、从 Socket 读取数据

这种方式同样常用于模拟流数据,稳定性较差,通常用来测试。

DataStream<String> stream = env.socketTextStream("localhost", 9999);

2.5、从 Kafka 读取数据

实际开发也是用 Kafka 来读取的,我们的实时流数据都是由 Kafka 来做收集和传输的。

导入依赖:

<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-kafka</artifactId>
 <version>${flink.version}</version>
</dependency>

案例

package com.lyh.source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class KafkaSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 从 Kafka 读取
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")    //指定kafka地址和端口
                .setGroupId("lyh")  // 指定消费者组id
                .setTopics("like")  // 指定消费的topic,可以是多个用List<String>
                .setValueOnlyDeserializer(new SimpleStringSchema()) // 指定反序列化器 因为kafka是生产者 flink作为消费者要反序列化
                .setStartingOffsets(OffsetsInitializer.latest())    // 指定flink消费kafka的策略
                .build();
        env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(),"kafkaSource")
                .print();
        env.execute();
    }
    /*
     *  kafka 消费者的参数: 
     *      auto.reset.offsets:
     *          earliest: 如果有offset,从offset继续消费;如果没有 就从 最早 消费
     *          latest  : 如果有offset,从offset继续消费;如果没有 就从 最新 消费
     * flink 的 kafkaSource offset消费者策略: offsetsInitializer,默认是 earliest
     *      earliest: 一定从 最早 消费 (不管有没有offset) 
     *      latest  : 一定从 最新 消费 (不管有没有offset)
     */
}

启动 Kafka 集群(需要先启动 zookeeper)

使用命令行生产者生产消息:

kafka-console-producer.sh --broker-list hadoop102:9092 --topic like

2.6、从数据生成器读取数据

导入依赖:

 <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-datagen</artifactId>
            <version>${flink.version}</version>
        </dependency>

案例

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
public class DataGeneratorDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        /**
         * 数据生成器参数说明:
         *  1. GeneratorFunction接口,需要重写 map 方法,输入类型必须是Long
         *  2. Long类型, 自动生成的数字序列(从0自增)的最大值
         *  3. 限速策略, 比如每秒生成几条数据
         *  4. 返回的数据类型
         */
        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
                new GeneratorFunction<Long, String>() {
                    @Override
                    public String map(Long value) throws Exception {
                        return "number: " + value;
                    }
                },
                10L,
                RateLimiterStrategy.perSecond(1),
                Types.STRING
        );
        env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(),"dataGenerator")
                .print();
        env.execute();
    }
}

运行效果:

number: 0
number: 1
number: 2
number: 3
number: 4
number: 5
number: 6
number: 7
number: 8
number: 9
Process finished with exit code 0

如果想达到无界流的效果,直接给数据生成器的第二个参数传一个 Long.MAX_VALUE。

假如我们的第二个参数设置为100(意味着从0自增到99)。如果并行度为3,那么第二个线程将从100的1/3处(即34)开始累加,第三个线程将从100的2/3(即67)开始累加。

Flink 支持的数据类型

       这里主要说泛型类型和类型提示,别的类型比如我们基本的数据类型及其包装类型和String(引用类型)、基本类型数组、对象数组、复合数据类型(Flink 内置的 Tuple0~Tuple25),辅助类型Option、Either、List、Map等,还有 POJO 类型,Flink 的 TypeInfomation 类型都已经为我们封装好了,它为每个数据类型生成了特定的序列化、反序列化器和比较器。

泛型

Flink 支持所有的 Java 类和 Scala 类。但如果没有按照 POJO 类型的要求来定义,就会被 Flink 当作泛型类来处理。Flink 会把泛型类型当作黑盒,无法获取它们内部的属性;它们也不是由 Flink 本身序列化的,而是由 Kryo 序列化的。

在这些类型中,元组类型和 POJO 类型最为灵活,因为它们支持创建复杂类型。而相比之下,POJO 还支持在键(key)的定义中直接使用字段名,这会让我们的代码可读性大大增加。所以,在项目实践中,往往会将流处理程序中的元素类型定为 Flink 的 POJO 类型。

Flink 对 POJO 类型的要求如下:

⚫ 类是公共的(public)和独立的(standalone,也就是说没有非静态的内部类);

⚫ 类有一个公共的无参构造方法;

⚫ 类中的所有字段是 public 且非 final 的;或者有一个公共的 getter 和 setter 方法,这些方法需要符合 Java bean 的命名规范。所以我们上面的 WaterSensor,就是我们创建的符合 Flink POJO 定义的数据类型。

类型提示(Type Hints)

Flink 还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于 Java 中泛型擦除的存在,在某些特殊情况下(比如 Lambda 表达式中),自动提取的信息是不够精细的,它是不可靠的;这时就需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。

为了解决这类问题,Java API 提供了专门的“类型提示”(type hints)。回忆一下之前的 word count 流处理程序,我们在将 String 类型的每个词转换成(word,count)二元组后,就明确地用 returns 指定了返回的类型。因为对于 map 里传入的 Lambda 表达式,系统只能推断出返回的是 Tuple2 类型,而无法得到 Tuple2<String, Long>。只有显式地告诉系统当前的返回类型,才能正确地解析出完整数据。

下面给出两种写法:

DataStreamSource<String> lineDS = env.socketTextStream("hadoop102",9999);
        // 3. flatMap 打散数据 返回元组
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDS.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        })
                //.returns(Types.TUPLE(Types.STRING, Types.LONG));
                .returns(new TypeHint<Tuple2<String, Long>>() {});  //也可以这样写

这是一种比较简单的场景,二元组的两个元素都是基本数据类型。那如果元组中的一个元素又有泛型,该怎么处理呢?

Flink 专门提供了 TypeHint 类,它可以捕获泛型的类型信息,并且一直记录下来,为运行时提供足够的信息。我们同样可以通过.returns()方法,明确地指定转换之后的 DataStream 里元素的类型。

returns(new TypeHint<Tuple2<Integer, SomeType>>(){})


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
SQL 关系型数据库 MySQL
实时数仓 Hologres操作报错合集之Flink CTAS Source(Mysql) 表字段从可空改为非空的原因是什么
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
4月前
|
存储 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何在一个任务中使用多个source
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在自定义RichSinkFunction中,如何获取source的schema
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
SQL 分布式计算 测试技术
概述Flink API中的4个层次
【7月更文挑战第14天】Flink的API分为4个层次:核心底层API(如ProcessFunction)、DataStream/DataSet API、Table API和SQL。
|
5月前
|
SQL 关系型数据库 API
实时计算 Flink版产品使用问题之如何使用stream api
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
Kubernetes Oracle 关系型数据库
实时计算 Flink版操作报错合集之用dinky在k8s上提交作业,会报错:Caused by: org.apache.flink.table.api.ValidationException:,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
251 0
|
1天前
|
JSON API 数据格式
店铺所有商品列表接口json数据格式示例(API接口)
当然,以下是一个示例的JSON数据格式,用于表示一个店铺所有商品列表的API接口响应
|
11天前
|
编解码 监控 API
直播源怎么调用api接口
调用直播源的API接口涉及开通服务、添加域名、获取API密钥、调用API接口、生成推流和拉流地址、配置直播源、开始直播、监控管理及停止直播等步骤。不同云服务平台的具体操作略有差异,但整体流程简单易懂。
|
24天前
|
人工智能 自然语言处理 PyTorch
Text2Video Huggingface Pipeline 文生视频接口和文生视频论文API
文生视频是AI领域热点,很多文生视频的大模型都是基于 Huggingface的 diffusers的text to video的pipeline来开发。国内外也有非常多的优秀产品如Runway AI、Pika AI 、可灵King AI、通义千问、智谱的文生视频模型等等。为了方便调用,这篇博客也尝试了使用 PyPI的text2video的python库的Wrapper类进行调用,下面会给大家介绍一下Huggingface Text to Video Pipeline的调用方式以及使用通用的text2video的python库调用方式。