Flink教程(08)- Flink批流一体API(Sink示例)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: Flink教程(08)- Flink批流一体API(Sink示例)

01 引言

在前面的博客,我们已经对Flink的程序模型里的Transformation使用有了一定的了解了,有兴趣的同学可以参阅下:

本文开始继续讲解Flink程序模型对里面的Sink

02 Sink

贴上一张官方对于sink的描述:

可以看到sink分为如下几类:

  • writeAsText():基于文件的sink
  • print()/printToErr():基于控制台的sink
  • addSink:自定义的sink

2.1 基于控制台和文件的Sink

2.1.1 API

API如下:

  • ds.print :直接输出到控制台
  • ds.printToErr(): 直接输出到控制台,用红色
  • ds.writeAsText(“本地/HDFS的path”,WriteMode.OVERWRITE).setParallelism(1)

注意:在输出到path的时候,可以在前面设置并行度,如果

  • 并行度>1,则path为目录
  • 并行度=1,则path为文件名

2.1.2 示例代码

/**
 * @author : YangLinWei
 * @createTime: 2022/3/7 4:29 下午
 * <p>
 * 1.ds.print 直接输出到控制台
 * 2.ds.printToErr() 直接输出到控制台,用红色
 * 3.ds.collect 将分布式数据收集为本地集合
 * 4.ds.setParallelism(1).writeAsText("本地/HDFS的path",WriteMode.OVERWRITE)
 */
public class SinkDemo01 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.source
        //DataStream<String> ds = env.fromElements("hadoop", "flink");
        DataStream<String> ds = env.readTextFile("data/input/words.txt");
        //3.transformation
        //4.sink
        ds.print();
        ds.printToErr();
        ds.writeAsText("data/output/test", FileSystem.WriteMode.OVERWRITE).setParallelism(2);
        //注意:
        //Parallelism=1为文件
        //Parallelism>1为文件夹
        //5.execute
        env.execute();
    }
}

可以看到生成了两个文件:

2.2 自定义Sink

需求:将Flink集合中的数据通过自定义Sink保存到MySQL;

示例代码如下:

/**
 * sink-custom
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 4:34 下午
 */
public class SinkDemo02 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStream<Student> studentDS = env.fromElements(new Student(null, "jim", 18));
        //3.Transformation
        //4.Sink
        studentDS.addSink(new MySQLSink());
        //5.execute
        env.execute();
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
    public static class MySQLSink extends RichSinkFunction<Student> {
        private Connection conn = null;
        private PreparedStatement ps = null;
        @Override
        public void open(Configuration parameters) throws Exception {
            //加载驱动,开启连接
            //Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/big_data", "root", "123456");
            String sql = "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)";
            ps = conn.prepareStatement(sql);
        }
        @Override
        public void invoke(Student value, Context context) throws Exception {
            //给ps中的?设置具体值
            ps.setString(1, value.getName());
            ps.setInt(2, value.getAge());
            //执行sql
            ps.executeUpdate();
        }
        @Override
        public void close() throws Exception {
            if (conn != null) conn.close();
            if (ps != null) ps.close();
        }
    }
}

查看数据库,可以看到添加了一条数据:

04 文末

本文主要讲解Flink批流一体API中的Sink用法,谢谢大家的阅读,本文完!

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
10天前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
本文整理自阿里云智能集团 Flink PMC Member & Committer 徐榜江(雪尽)在 FFA 2024 分论坛的分享,涵盖四大主题:Flink CDC、YAML API、Transform + AI 和 Community。文章详细介绍了 Flink CDC 的发展历程及其优势,特别是 YAML API 的设计与实现,以及如何通过 Transform 和 AI 模型集成提升数据处理能力。最后,分享了社区动态和未来规划,欢迎更多开发者加入开源社区,共同推动 Flink CDC 的发展。
313 12
Flink CDC YAML:面向数据集成的 API 设计
|
4天前
|
XML 数据挖掘 API
1688商品详情数据示例参考,1688API接口系列
在成长的路上,我们都是同行者。这篇关于详情API接口的文章,希望能帮助到您。期待与您继续分享更多API接口的知识,请记得关注Anzexi58哦!
|
1月前
|
JSON 前端开发 搜索推荐
关于商品详情 API 接口 JSON 格式返回数据解析的示例
本文介绍商品详情API接口返回的JSON数据解析。最外层为`product`对象,包含商品基本信息(如id、name、price)、分类信息(category)、图片(images)、属性(attributes)、用户评价(reviews)、库存(stock)和卖家信息(seller)。每个字段详细描述了商品的不同方面,帮助开发者准确提取和展示数据。具体结构和字段含义需结合实际业务需求和API文档理解。
|
29天前
|
JSON 缓存 API
解析电商商品详情API接口系列,json数据示例参考
电商商品详情API接口是电商平台的重要组成部分,提供了商品的详细信息,支持用户进行商品浏览和购买决策。通过合理的API设计和优化,可以提升系统性能和用户体验。希望本文的解析和示例能够为开发者提供参考,帮助构建高效、可靠的电商系统。
39 12
|
4月前
|
消息中间件 分布式计算 大数据
大数据-121 - Flink Time Watermark 详解 附带示例详解
大数据-121 - Flink Time Watermark 详解 附带示例详解
106 0
|
3天前
|
JSON 安全 API
亚马逊商品详情API接口的使用方法和示例返回
亚马逊商品详情API接口是亚马逊为开发者提供的一个重要工具,它允许开发者通过编程方式获取亚马逊平台上的商品详细信息。以下是该接口的使用方法和示例返回的简要说明
|
1月前
|
XML JSON API
淘宝商品详情(item get)API接口系列,示例说明参考
淘宝商品详情(item_get)API接口是淘宝开放平台(Taobao Open Platform)提供的一个重要接口,允许开发者通过HTTP请求获取淘宝商品的详细信息。以下是对该接口系列的示例说明参考
|
2月前
|
JSON API 数据安全/隐私保护
淘宝评论API接口操作步骤详解,代码示例参考
淘宝评论API接口是淘宝开放平台提供的一项服务,通过该接口,开发者可以访问商品的用户评价和评论。这些评论通常包括评分、文字描述、图片或视频等内容。商家可以利用这些信息更好地了解消费者的需求和偏好,优化产品和服务。同时,消费者也可以从这些评论中获得准确的购买参考,做出更明智的购买决策。
|
2月前
|
API Python
【Azure Developer】分享一段Python代码调用Graph API创建用户的示例
分享一段Python代码调用Graph API创建用户的示例
68 11
|
3月前
|
API 开发工具 开发者
探究亚马逊国际获得AMAZON商品详情 API 接口功能、作用与实际应用示例
亚马逊提供的Amazon Product Advertising API或Selling Partner API,使开发者能编程访问亚马逊商品数据,包括商品标题、描述、价格等。支持跨境电商和数据分析,提供商品搜索和详情获取等功能。示例代码展示了如何使用Python和boto3库获取特定商品信息。使用时需遵守亚马逊政策并注意可能产生的费用。

热门文章

最新文章