Flink教程(06)- Flink批流一体API(Source示例)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: Flink教程(06)- Flink批流一体API(Source示例)

01 引言

在前面的博客,我们已经对Flink的原理有了一定的了解了,有兴趣的同学可以参阅下:

本文开始讲解Flink程序模型对应的代码,也就是Flink批流一体对应的API,分别对应为:SourceTransformationSlink,本文讲Source

02 Source

Source对应的就是Flink编程模型里面的Data Source数据源:

Flink官网,我们可以知道Source有如下几种类型:

转义为中文即:

  • File-based:基于文件的的Source
  • Socket-based: 基于Socket的Source
  • Collection-based: 基于集合的Source
  • Custom: 自定义Source

2.1 基于集合的Source

相关API(一般用于学习测试时编造数据时使用):

  • env.fromElements(可变参数);
  • env.fromColletion(各种集合);
  • env.generateSequence(开始,结束);
  • env.fromSequence(开始,结束)。

示例代码:

/**
 * 把本地的普通的Java集合/Scala集合变为分布式的Flink的DataStream集合!
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 2:55 下午
 * <p>
 * 1.env.fromElements(可变参数);
 * 2.env.fromColletion(各种集合);
 * 3.env.generateSequence(开始,结束);
 * 4.env.fromSequence(开始,结束);
 */
public class SourceDemo1 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //2.source
        // * 1.env.fromElements(可变参数);
        DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");
        // * 2.env.fromColletion(各种集合);
        DataStream<String> ds2 = env.fromCollection(Arrays.asList("hadoop", "spark", "flink"));
        // * 3.env.generateSequence(开始,结束);
        DataStream<Long> ds3 = env.generateSequence(1, 10);
        //* 4.env.fromSequence(开始,结束);
        DataStream<Long> ds4 = env.fromSequence(1, 10);
        //3.Transformation
        //4.sink
        ds1.print();
        ds2.print();
        ds3.print();
        ds4.print();
        //5.execute
        env.execute();
    }
}

运行结果:

2.2 基于文件的Source

相关API(一般用于学习测试):

  • env.readTextFile(本地/HDFS文件/文件夹/压缩文件)

示例代码:

/**
 * env.readTextFile(本地/HDFS文件/文件夹/压缩文件)
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 2:59 下午
 */
public class SourceDemo2 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //2.source
        // * 1.env.readTextFile(本地文件/HDFS文件);//压缩文件也可以
        DataStream<String> ds1 = env.readTextFile("data/input/words.txt");
        DataStream<String> ds2 = env.readTextFile("data/input/dir");
        DataStream<String> ds3 = env.readTextFile("hdfs://node1:8020//wordcount/input/words.txt");
        DataStream<String> ds4 = env.readTextFile("data/input/wordcount.txt.gz");
        //3.Transformation
        //4.sink
        ds1.print();
        ds2.print();
        ds3.print();
        ds4.print();
        //5.execute
        env.execute();
    }
}

2.3 基于Socket的Source

需求:在node1上使用nc -lk 9999 向指定端口发送数据(ncnetcat的简称,原本是用来设置路由器,我们可以利用它向某个端口发送数据),如果没有该命令可以下安装:

yum install -y nc

使用Flink编写流处理应用程序实时统计单词数量,代码如下:

/**
 * SocketSource
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 3:02 下午
 */
public class SourceDemo3 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //2.source
        DataStream<String> linesDS = env.socketTextStream("node1", 9999);
        //3.处理数据-transformation
        //|_____3.1每一行数据按照空格切分成一个个的单词组成一个集合
        DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                //value就是一行行的数据
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(word);//将切割处理的一个个的单词收集起来并返回
                }
            }
        });
        //|_____3.2对集合中的每个单词记为1
        DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                //value就是进来一个个的单词
                return Tuple2.of(value, 1);
            }
        });
        //|_____3.3对数据按照单词(key)进行分组
        //KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);
        KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);
        //|_____3.4对各个组内的数据按照数量(value)进行聚合就是求sum
        DataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);
        //4.输出结果-sink
        result.print();
        //5.触发执行-execute
        env.execute();
    }
}

2.4 自定义Source

2.4.1 案例 - 随机生成数据

Flink还提供了数据源接口,我们实现该接口就可以实现自定义数据源,不同的接口有不同的功能,分类如下:

  • SourceFunction:非并行数据源(并行度只能=1)
  • RichSourceFunction:多功能非并行数据源(并行度只能=1)
  • ParallelSourceFunction:并行数据源(并行度能够>=1)
  • RichParallelSourceFunction:多功能并行数据源(并行度能够>=1) ,Kafka数据源使用的就是该接口。

需求:每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)

要求:

  • 随机生成订单ID(UUID)
  • 随机生成用户ID(0-2)
  • 随机生成订单金额(0-100)
  • 时间戳为当前系统时间

示例代码:

/**
 * 自定义Source
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 3:08 下午
 * Flink还提供了数据源接口,我们实现该接口就可以实现自定义数据源,不同的接口有不同的功能,分类如下:
 * SourceFunction:非并行数据源(并行度只能=1)
 * RichSourceFunction:多功能非并行数据源(并行度只能=1)
 * ParallelSourceFunction:并行数据源(并行度能够>=1)
 * RichParallelSourceFunction:多功能并行数据源(并行度能够>=1)--后续学习的Kafka数据源使用的就是该接口
 */
public class SourceDemo4 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //2.Source
        DataStream<Order> orderDS = env
                .addSource(new MyOrderSource())
                .setParallelism(2);
        //3.Transformation
        //4.Sink
        orderDS.print();
        //5.execute
        env.execute();
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        private String id;
        private Integer userId;
        private Integer money;
        private Long createTime;
    }
    public static class MyOrderSource extends RichParallelSourceFunction<Order> {
        private Boolean flag = true;
        @Override
        public void run(SourceContext<Order> ctx) throws Exception {
            Random random = new Random();
            while (flag) {
                Thread.sleep(1000);
                String id = UUID.randomUUID().toString();
                int userId = random.nextInt(3);
                int money = random.nextInt(101);
                long createTime = System.currentTimeMillis();
                ctx.collect(new Order(id, userId, money, createTime));
            }
        }
        //取消任务/执行cancle命令的时候执行
        @Override
        public void cancel() {
            flag = false;
        }
    }
}
• 65
• 66

运行结果如下:

2.4.2 案例 - MySQL

需求:实际开发中,经常会实时接收一些数据,要和MySQL中存储的一些规则进行匹配,那么这时候就可以使用Flink自定义数据源从MySQL中读取数据。

那么现在先完成一个简单的需求:

  • MySQL中实时加载数据;
  • 要求MySQL中的数据有变化,也能被实时加载出来。

首先准备数据:

CREATE TABLE `t_student` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `name` varchar(255) DEFAULT NULL,
    `age` int(11) DEFAULT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;
INSERT INTO `t_student` VALUES ('1', 'jack', '18');
INSERT INTO `t_student` VALUES ('2', 'tom', '19');
INSERT INTO `t_student` VALUES ('3', 'rose', '20');
INSERT INTO `t_student` VALUES ('4', 'tom', '19');
INSERT INTO `t_student` VALUES ('5', 'jack', '18');
INSERT INTO `t_student` VALUES ('6', 'rose', '20');

代码实现如下:

/**
 * 简单的需求:
 * 从MySQL中实时加载数据
 * 要求MySQL中的数据有变化,也能被实时加载出来
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 3:17 下午
 */
public class SourceDemo5 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStream<Student> studentDS = env.addSource(new MySQLSource()).setParallelism(1);
        //3.Transformation
        //4.Sink
        studentDS.print();
        //5.execute
        env.execute();
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
    public static class MySQLSource extends RichParallelSourceFunction<Student> {
        private Connection conn = null;
        private PreparedStatement ps = null;
        @Override
        public void open(Configuration parameters) throws Exception {
            //加载驱动,开启连接
            //Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/big_data", "root", "123456");
            String sql = "select id,name,age from t_student";
            ps = conn.prepareStatement(sql);
        }
        private boolean flag = true;
        @Override
        public void run(SourceContext<Student> ctx) throws Exception {
            while (flag) {
                ResultSet rs = ps.executeQuery();
                while (rs.next()) {
                    int id = rs.getInt("id");
                    String name = rs.getString("name");
                    int age = rs.getInt("age");
                    ctx.collect(new Student(id, name, age));
                }
                TimeUnit.SECONDS.sleep(5);
            }
        }
        @Override
        public void cancel() {
            flag = false;
        }
        @Override
        public void close() throws Exception {
            if (conn != null) conn.close();
            if (ps != null) ps.close();
        }
    }
}

运行结果:

03 文末

本文主要讲解Flink批流一体API中的Source用法,谢谢大家的阅读,本文完!

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
21天前
|
JSON API 数据格式
python 使用 Stable Diffusion API 生成图片示例
本文提供了一个使用Python调用Stable Diffusion API生成图片的示例程序,包括启动API设置、发送POST请求、保存生成的图片和JSON数据,以及如何通过API调用特定模型的说明。
python 使用 Stable Diffusion API 生成图片示例
|
1月前
|
SQL 关系型数据库 MySQL
实时数仓 Hologres操作报错合集之Flink CTAS Source(Mysql) 表字段从可空改为非空的原因是什么
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
27天前
|
JavaScript API PHP
一言API搭建教程:搭建属于自己的文言API接口
这篇文章介绍了如何搭建一个属于自己的文言API接口。文章首先介绍了准备工作,包括代码编辑器和两个文件的创建。然后详细说明了如何将代码复制到php文件中并上传至网站根目录。最后给出了一个示例代码来调用文言API接口。整个过程非常简单。
34 1
|
1月前
|
JSON 前端开发 API
【淘系】商品详情属性解析(属性规格详情图sku等json数据示例返回参考),淘系API接口系列
在淘宝(或天猫)平台上,商品详情属性(如属性规格、详情图、SKU等)是商家在发布商品时设置的,用于描述商品的详细信息和不同规格选项。这些信息对于消费者了解商品特性、进行购买决策至关重要。然而,直接通过前端页面获取这些信息的结构化数据(如JSON格式)并非直接暴露给普通用户或开发者,因为这涉及到平台的商业机密和数据安全。 不过,淘宝平台提供了丰富的API接口(如淘宝开放平台API),允许有资质的开发者或合作伙伴通过编程方式获取商品信息。这些API接口通常需要注册开发者账号、申请应用密钥(App Key)和秘钥(App Secret),并遵守淘宝的API使用协议。
|
1月前
|
资源调度 关系型数据库 MySQL
【Flink on YARN + CDC 3.0】神操作!看完这篇教程,你也能成为数据流处理高手!从零开始,一步步教会你在Flink on YARN模式下如何配置Debezium CDC 3.0,让你的数据库变更数据瞬间飞起来!
【8月更文挑战第15天】随着Apache Flink的普及,企业广泛采用Flink on YARN部署流处理应用,高效利用集群资源。变更数据捕获(CDC)工具在现代数据栈中至关重要,能实时捕捉数据库变化并转发给下游系统处理。本文以Flink on YARN为例,介绍如何在Debezium CDC 3.0中配置MySQL连接器,实现数据流处理。首先确保YARN上已部署Flink集群,接着安装Debezium MySQL连接器并配置Kafka Connect。最后,创建Flink任务消费变更事件并提交任务到Flink集群。通过这些步骤,可以构建出从数据库变更到实时处理的无缝数据管道。
71 2
UnityWebRequest教程☀️2021,你还在使用过时的 www API吗?
UnityWebRequest教程☀️2021,你还在使用过时的 www API吗?
|
1月前
|
开发框架 .NET API
在IIS上部署ASP.NET Core Web API和Blazor Wasm详细教程
在IIS上部署ASP.NET Core Web API和Blazor Wasm详细教程
117 3
|
22天前
|
SQL Shell API
python Django教程 之 模型(数据库)、自定义Field、数据表更改、QuerySet API
python Django教程 之 模型(数据库)、自定义Field、数据表更改、QuerySet API
|
24天前
|
API 网络架构 C++
【Azure Key Vault】使用REST API调用Azure Key Vault Secret的示例步骤
【Azure Key Vault】使用REST API调用Azure Key Vault Secret的示例步骤