大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(已更完)

Flink(正在更新!)

章节内容

上节完成了如下的内容:


Flink YARN 模式

YARN模式下申请资源

YARN模式下提交任务

bd13341d95f6b43a8e7ccf524000989a_2ca4b50887d642ab91eb019f03f107af.png DataStream API

主要分为3块:

● DataSource:程序的数据源输入,可以通过StreamExecutionEnvironment.addSource为程序添加数据源

● Transformation:具体的操作,它对一个或者多个输入源进行计算处理,比如Map、FlatMap、Filter操作等

● Sink:程序的输出,它可以把Transformation处理之后的数据输出到指定的存储介质中


Flink针对DataStream提供了大量已经实现的DataSource(数据源接口)。

下面来进行分析。


基于文件

readTextFile(path):读取本地文件,文件遵循TextInputFormat逐行读取规则并返回

如果你是本地IDEA要读取HDFS,那你需要额外的依赖:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-hadoop-compatibility_2.11</artifactId>
  <version>1.11.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.9.2</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-hdfs</artifactId>
  <version>2.9.2</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.9.2</version>
</dependency>

基于Socket

socketTextStream:从Socket中读取数据,元素可以通过一个分割符号分开。


基于集合

fromCollection:通过Java的Collection集合创建一个数据流,集合中的所有元素必须是相同类型的。

如果满足一下条件,Flink将数据类型识别为POJO类型(并允许“按名称”字段引用)


该类是共有且独立的(没有非静态内部类)

该类有共有的无参构造方法

类(及父类)中所有的不被static、transient修饰的属性要么是公有的且不被final修饰,要么是包含公有的Getter和Setter方法,这些方法遵循JavaBean的命名规范。

编写代码

编写的代码如下:

package icu.wzk;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class StreamFromCollection {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        List<People> peopleList = new ArrayList<>();
        peopleList.add(new People("wzk", 18));
        peopleList.add(new People("icu", 15));
        peopleList.add(new People("wzkicu", 10));

        DataStreamSource<People> data = env.getJavaEnv().fromCollection(peopleList);
        SingleOutputStreamOperator<People> filtered = data.filter(new FilterFunction<People>() {
            @Override
            public boolean filter(People value) throws Exception {
                return value.getAge() > 15;
            }
        });
        filtered.print();
        env.execute("StreamFromCollection");
    }

    public static class People {

        private String name;
        private Integer age;

        public People() {

        }

        public People(String name, Integer age) {
            this.name = name;
            this.age = age;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Integer getAge() {
            return age;
        }

        public void setAge(Integer age) {
            this.age = age;
        }
    }

}

运行结果

运行结果如下图所示:

toString

我们可以通过重写 People 的 toString() 方法,来打印内容:

@Override
public String toString() {
    return "name: " + this.name + ", age: " + this.age;
}

重新运行

重新运行可以看到:

自定义输入

可以使用 StreamExecutionEnvironment.addSource()将一个数据源添加到程序中。

Flink提供了许多预先实现的源函数,但是也可以编写自己的自定义源,方法是非并行源:implements SourceFunction,或者为并行源 implements ParallelSourceFuction接口,或者 extends RichParallelSourceFunction

Flink也提供了一些内置的 Connector(连接器),如下表列了几个主要的:

d12d79b72b7eca1e72956d3975f0e753_c45ee2d9194d4fe093e44cdf13ceed4b.png

Kafka连接器

添加依赖

我们需要继续添加依赖:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.11.1</version>
</dependency>

编写代码

package icu.wzk;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class StreamFromKafka {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 配置信息
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "h121.wzk.icu:9092");

        // Kafka
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "flink_test",
                new SimpleStringSchema(),
                properties
        );
        DataStreamSource<String> data = env.getJavaEnv().addSource(consumer);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = data
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] words = value.split(" ");
                        for (String word: words) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                });
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne
                .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
                    @Override
                    public Object getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .sum(1);
        result.print();
        env.execute("StreamFromKafka");
    }

}

启动Kafka

我们需要启动 Kafka 的服务来进行测试,之前章节我们已经配置和启动过Kafka了,这里就是直接启动了。

cd /opt/servers/kafka_2.12-2.7.2/bin
./kafka-server-start.sh ../config/server.properties

启动结果如下图所示:

创建主题

cd /opt/servers/kafka_2.12-2.7.2/bin/
./kafka-topics.sh --create --zookeeper h121.wzk.icu:2181 --replication-factor 1 --partition 1 --topic flink_test

生产消息

cd /opt/servers/kafka_2.12-2.7.2/bin/
./kafka-console-producer.sh --bootstrap-server h121.wzk.icu:9092 --topic flink_test
# 我们等Java程序启动后,产生几条消息

运行代码

观察控制台可以看到:

3> (hello,1)
5> (world,1)
3> (hello,2)
5> (world,2)
3> (hello,3)
3> (hello,4)
2> (hello!,1)
2> (hello!,2)
...

运行的截图如下所示:

目录
相关文章
|
1月前
|
消息中间件 分布式计算 大数据
大数据-113 Flink DataStreamAPI 程序输入源 自定义输入源 非并行源与并行源
大数据-113 Flink DataStreamAPI 程序输入源 自定义输入源 非并行源与并行源
44 0
|
1月前
|
Java 流计算
利用java8 的 CompletableFuture 优化 Flink 程序
本文探讨了Flink使用avatorscript脚本语言时遇到的性能瓶颈,并通过CompletableFuture优化代码,显著提升了Flink的QPS。文中详细介绍了avatorscript的使用方法,包括自定义函数、从Map中取值、使用Java工具类及AviatorScript函数等,帮助读者更好地理解和应用avatorscript。
利用java8 的 CompletableFuture 优化 Flink 程序
|
1月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
134 0
|
1月前
|
分布式计算 监控 大数据
大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction
大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction
46 0
|
1月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
130 0
|
1月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
1月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
46 1
|
3月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
271 9
|
3月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
67 3
|
3月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
135 0