Flink--6、输出算子(连接到外部系统、文件、kafka、MySQL、自定义Sink)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
简介: Flink--6、输出算子(连接到外部系统、文件、kafka、MySQL、自定义Sink)

                                                                                 

                       星光下的赶路人star的个人主页

                  世间真正温煦的春色,都熨帖着大地,潜伏在深谷


文章目录



1、输出算子(Sink)


Flink作为数据处理框架,最终还是要把计算处理的结果写入外部储存,为外部应用提供支持。


1.1 连接到外部系统


Flink的DataStream API专门提供了向外部提供写入数据的方法:addSink。与addSource类似,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的。Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。

Flink1.12以前,Sink算子的创建是通过调用DataStream的.addSink()方法实现的。

stream.addSink(new SinkFunction(…));
• 1

addSink方法同样需要传入一个参数,实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。

Flink1.12开始,同样重构了Sink架构,

stream.sinkTo(…)
• 1

当然,Sink多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink,它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示,列出了Flink官方目前支持的第三方系统连接器:

我们可以看到,像Kafka之类流式系统,Flink提供了完美对接,source/sink两端都能连接,可读可写;而对于Elasticsearch、JDBC等数据存储系统,则只提供了输出写入的sink连接器。

除Flink官方之外,Apache Bahir框架,也实现了一些其他第三方系统与Flink的连接器。


1.2 输出到文件


Flink专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。

FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:


行编码: FileSink.forRowFormat(basePath,rowEncoder)。

批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。

public class SinkFile {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 每个目录中,都有 并行度个数的 文件在写入
        env.setParallelism(2);
        // 必须开启checkpoint,否则一直都是 .inprogress
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
                new GeneratorFunction<Long, String>() {
                    @Override
                    public String map(Long value) throws Exception {
                        return "Number:" + value;
                    }
                },
                Long.MAX_VALUE,
                RateLimiterStrategy.perSecond(1000),
                Types.STRING
        );
        DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");
        // 输出到文件系统
        FileSink<String> fieSink = FileSink
                // 输出行式存储的文件,指定路径、指定编码
                .<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))
                // 输出文件的一些配置: 文件名的前缀、后缀
                .withOutputFileConfig(
                        OutputFileConfig.builder()
                                .withPartPrefix("atguigu-")
                                .withPartSuffix(".log")
                                .build()
                )
                // 按照目录分桶:如下,就是每个小时一个目录
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
                // 文件滚动策略:  1分钟 或 1m
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(1))
                                .withMaxPartSize(new MemorySize(1024*1024))
                                .build()
                )
                .build();
        dataGen.sinkTo(fieSink);
        env.execute();
    }
}

1.3 输出到Kafka


(1)添加Kafka 连接器依赖

由于我们已经测试过从Kafka数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。

(2)启动Kafka集群

(3)编写输出到Kafka的示例代码

输出无key的record:

public class SinkKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 如果是精准一次,必须开启checkpoint(后续章节介绍)
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        SingleOutputStreamOperator<String> sensorDS = env
                .socketTextStream("hadoop102", 7777);
        /**
         * Kafka Sink:
         * TODO 注意:如果要使用 精准一次 写入Kafka,需要满足以下条件,缺一不可
         * 1、开启checkpoint(后续介绍)
         * 2、设置事务前缀
         * 3、设置事务超时时间:   checkpoint间隔 <  事务超时时间  < max的15分钟
         */
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                // 指定 kafka 的地址和端口
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                // 指定序列化器:指定Topic名称、具体的序列化
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("ws")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                // 写到kafka的一致性级别: 精准一次、至少一次
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // 如果是精准一次,必须设置 事务的前缀
                .setTransactionalIdPrefix("atguigu-")
                // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"")
                .build();
        sensorDS.sinkTo(kafkaSink);
        env.execute();
    }
}

自定义序列化器,实现带key的record:

public class SinkKafkaWithKey {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.setRestartStrategy(RestartStrategies.noRestart());
        SingleOutputStreamOperator<String> sensorDS = env
                .socketTextStream("hadoop102", 7777);
        /**
         * 如果要指定写入kafka的key,可以自定义序列化器:
         * 1、实现 一个接口,重写 序列化 方法
         * 2、指定key,转成 字节数组
         * 3、指定value,转成 字节数组
         * 4、返回一个 ProducerRecord对象,把key、value放进去
         */
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                .setRecordSerializer(
                        new KafkaRecordSerializationSchema<String>() {
                            @Nullable
                            @Override
                            public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
                                String[] datas = element.split(",");
                                byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
                                byte[] value = element.getBytes(StandardCharsets.UTF_8);
                                return new ProducerRecord<>("ws", key, value);
                            }
                        }
                )
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("atguigu-")
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                .build();
        sensorDS.sinkTo(kafkaSink);
        env.execute();
    }
}

运行代码,在Linux主机启动一个消费者,查看是否收到数据

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ws
• 1


1.4 输出到MySQL(JDBC)


(1)添加依赖

<!--mysql驱动 -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.1.0-1.17</version>
</dependency>

(2)启动MySQL,在test库下建表

CREATE TABLE `ws` (
  `id` varchar(100) NOT NULL,
  `ts` bigint(20) DEFAULT NULL,
  `vc` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

(3)输出到MySQL的示例代码

public class SinkMySQL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction());
        /**
         * TODO 写入mysql
         * 1、只能用老的sink写法: addsink
         * 2、JDBCSink的4个参数:
         *    第一个参数: 执行的sql,一般就是 insert into
         *    第二个参数: 预编译sql, 对占位符填充值
         *    第三个参数: 执行选项 ---》 攒批、重试
         *    第四个参数: 连接选项 ---》 url、用户名、密码
         */
        SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink(
                "insert into ws values(?,?,?)",
                new JdbcStatementBuilder<WaterSensor>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
                        //每收到一条WaterSensor,如何去填充占位符
                        preparedStatement.setString(1, waterSensor.getId());
                        preparedStatement.setLong(2, waterSensor.getTs());
                        preparedStatement.setInt(3, waterSensor.getVc());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withMaxRetries(3) // 重试次数
                        .withBatchSize(100) // 批次的大小:条数
                        .withBatchIntervalMs(3000) // 批次的时间
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                        .withUsername("root")
                        .withPassword("000000")
                        .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
                        .build()
        );
        sensorDS.addSink(jdbcSink);
        env.execute();
    }
}

4)运行代码,用客户端连接MySQL,查看是否成功写入数据。


1.4 自定义Sink输出


如果我们想将数据存储到我们自己的存储设备中,而Flink并没有提供可以直接使用的连接器,就只能自定义Sink进行输出了。与Source类似,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。

stream.addSink(new MySinkFunction<String>());
• 1

在实现SinkFunction的时候,需要重写的一个关键方法invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。

这种方式比较通用,对于任何外部存储系统都有效;不过自定义Sink想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。

                                                                                     

                                                                        您的支持是我创作的无限动力

                                                                                     

                      希望我能为您的未来尽绵薄之力

                                                                                     

                    如有错误,谢谢指正若有收获,谢谢赞美

目录
打赏
0
0
1
0
60
分享
相关文章
docker环境安装kafka/Flink/clickhouse镜像
通过上述步骤和示例,您可以系统地了解如何使用Docker Compose安装和配置Kafka、Flink和ClickHouse,并进行基本的验证操作。希望这些内容对您的学习和工作有所帮助。
53 28
MySQL底层概述—4.InnoDB数据文件
本文介绍了InnoDB表空间文件结构及其组成部分,包括表空间、段、区、页和行。表空间是最高逻辑层,包含多个段;段由若干个区组成,每个区包含64个连续的页,页用于存储多条行记录。文章还详细解析了Page结构,分为通用部分(文件头与文件尾)、数据记录部分和页目录部分。此外,文中探讨了行记录格式,包括四种行格式(Redundant、Compact、Dynamic和Compressed),重点介绍了Compact行记录格式及其溢出机制。最后,文章解释了不同行格式的特点及应用场景,帮助理解InnoDB存储引擎的工作原理。
MySQL底层概述—4.InnoDB数据文件
MySQL补充性文件
通过以上内容,您可以全面了解和掌握 MySQL 补充性文件的配置、查看及其作用,从而提升数据库管理的效率和质量。
64 36
【赵渝强老师】MySQL InnoDB的数据文件与重做日志文件
本文介绍了MySQL InnoDB存储引擎中的数据文件和重做日志文件。数据文件包括`.ibd`和`ibdata`文件,用于存放InnoDB数据和索引。重做日志文件(redo log)确保数据的可靠性和事务的持久性,其大小和路径可由相关参数配置。文章还提供了视频讲解和示例代码。
204 11
【赵渝强老师】MySQL InnoDB的数据文件与重做日志文件
数据库数据恢复—MYSQL数据库文件损坏的数据恢复案例
mysql数据库文件ibdata1、MYI、MYD损坏。 故障表现:1、数据库无法进行查询等操作;2、使用mysqlcheck和myisamchk无法修复数据库。
解决MySQL删除/var/lib/mysql下的所有文件后无法启动的问题
删除 `/var/lib/mysql` 下的所有文件后,需要重新初始化数据目录,确保正确的权限设置,并重新启动 MySQL 服务。通过按照上述步骤操作,可以解决 MySQL 无法启动的问题,并恢复数据库的正常运行。初始化数据目录后,别忘了配置安全设置,并根据需要恢复备份数据。这些步骤不仅能够恢复 MySQL 的正常运行,还能确保数据库的安全性和完整性。
158 2
MySQL导入.sql文件后数据库乱码问题
本文分析了导入.sql文件后数据库备注出现乱码的原因,包括字符集不匹配、备注内容编码问题及MySQL版本或配置问题,并提供了详细的解决步骤,如检查和统一字符集设置、修改客户端连接方式、检查MySQL配置等,确保导入过程顺利。
【赵渝强老师】MySQL的参数文件
MySQL启动时会读取配置文件my.cnf来确定数据库文件位置及初始化参数。该文件分为Server和Client两部分,包含动态与静态参数。动态参数可在运行中通过命令修改,而静态参数需修改my.cnf并重启服务生效。文中还提供了相关代码示例和视频教程。
125 6
【赵渝强老师】MySQL的全量日志文件
MySQL全量日志记录所有操作的SQL语句,默认禁用。启用后,可通过`show variables like %general_log%检查状态,使用`set global general_log=ON`临时开启,执行查询并查看日志文件以追踪SQL执行详情。
PACS系统 中 dicom 文件在mysql 8.0 数据库中的 存储和读取(pydicom 库使用)
PACS系统 中 dicom 文件在mysql 8.0 数据库中的 存储和读取(pydicom 库使用)
91 2
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等