大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,1000CU*H 3个月
云数据库 RDS PostgreSQL,高可用系列 2核4GB
简介: 大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(已更完)

Flink(正在更新!)

章节内容

上节我们完成了如下的内容:


Sink 的基本概念等内容

Sink的相关信息 配置与使用

Sink案例写入Redis

JDBC Sink

在 Apache Flink 中,通过 JDBC Sink,可以将处理后的数据写入到 MySQL 数据库中。这对于将实时处理的数据持久化或与其他系统进行集成非常有用。


Flink JDBC Sink 简介

Flink 提供了 JdbcSink,它是基于 JDBC 协议的 Sink,可以将数据写入各种关系型数据库,包括 MySQL。在使用 JDBC Sink 时,需要提供数据库连接信息和 SQL 语句,通过这些信息,Flink 将数据流中的记录插入或更新到 MySQL 表中。


Flink 到 MySQL 的基本步骤

将数据流写入 MySQL 的步骤主要包括以下几点:


依赖库配置:确保在项目中引入了 Flink 和 MySQL 相关的依赖库,通常需要配置 Maven 或 Gradle。

定义数据源和数据流:创建并处理数据流。

配置 JDBC Sink:提供数据库的连接信息和插入 SQL 语句。

启动任务:将数据流写入 MySQL。

优化建议

在实际项目中,向 MySQL 插入大量数据时,应考虑以下优化策略:


批量插入:通过 JdbcExecutionOptions 配置批量插入,可以大幅提升写入性能。

连接池:对于高并发的写入操作,建议使用连接池来减少数据库连接开销。

索引优化:为插入的表配置合适的索引,可以提高查询性能,但在大量写入时,索引可能会降低- 插入速度,因此需要权衡。

数据分片:对于非常大规模的数据,可以考虑将数据分片并行写入不同的 MySQL 实例或分区表中。

案例:流数据下沉到MySQL

添加依赖

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.28</version>
</dependency>

编写代码

一个Person的类,对应MySQL中的一张表的字段。

模拟几条数据流,写入到 MySQL中。

package icu.wzk;


import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class SinkSqlTest {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Person> data = env.getJavaEnv().fromElements(
                new Person("wzk", 18, 1),
                new Person("icu", 20, 1),
                new Person("wzkicu", 13, 2)
        );
        data.addSink(new MySqlSinkFunction());

        env.execute();
    }

    public static class MySqlSinkFunction extends RichSinkFunction<Person> {

        private PreparedStatement preparedStatement = null;

        private Connection connection = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            String url = "jdbc:mysql://h122.wzk.icu:3306/flink-test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC";
            String username = "hive";
            String password = "hive@wzk.icu";
            connection = DriverManager.getConnection(url, username, password);
            String sql = "INSERT INTI PERSON(name, age, sex) VALUES(?, ?, ?)";
            preparedStatement = connection.prepareStatement(sql);
        }

        @Override
        public void invoke(Person value, Context context) throws Exception {
            preparedStatement.setString(1, value.getName());
            preparedStatement.setInt(2, value.getAge());
            preparedStatement.setInt(3, value.getSex());
            preparedStatement.executeUpdate();
        }

        @Override
        public void close() throws Exception {
            if (null != connection) {
                connection.close();
            }
            if (null != preparedStatement) {
                preparedStatement.close();
            }
        }
    }

    public static class Person {
        private String name;
        private Integer age;
        private Integer sex;

        public Person() {

        }

        public Person(String name, Integer age, Integer sex) {
            this.name = name;
            this.age = age;
            this.sex = sex;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Integer getAge() {
            return age;
        }

        public void setAge(Integer age) {
            this.age = age;
        }

        public Integer getSex() {
            return sex;
        }

        public void setSex(Integer sex) {
            this.sex = sex;
        }
    }
}

数据库配置

我们新建一张表出来,person表,里边有我们需要的字段。

运行代码

我们运行代码,等待运行结束。

查看结果

查看数据库中的数据,我们可以看到刚才模拟的数据已经成功写入了。

案例:写入到Kafka

编写代码

package icu.wzk;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class SinkKafkaTest {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> data = env.socketTextStream("localhost", 9999, '\n', 0);
        String brokerList = "h121.wzk.icu:9092";
        String topic = "flink_test";
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(brokerList, topic, new SimpleStringSchema());
        data.addSink(producer);
        env.execute("SinkKafkaTest");
    }

}

运行代码

启动一个 nc

nc -lk 9999
• 1

我们通过回车的方式,可以发送数据。

Java 程序中等待

查看结果

我们登录到服务器查看信息

./kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic flink_test -

可以看到刚才的数据已经写入了:

目录
相关文章
|
8月前
|
存储 关系型数据库 MySQL
大数据新视界--大数据大厂之MySQL 数据库课程设计:开启数据宇宙的传奇之旅
本文全面剖析数据库课程设计 MySQL,展现其奇幻魅力与严峻挑战。通过实际案例凸显数据库设计重要性,详述数据安全要点及学习目标。深入阐述备份与恢复方法,并分享优秀实践项目案例。为开发者提供 MySQL 数据库课程设计的全面指南,助力提升数据库设计与管理能力,保障数据安全稳定。
大数据新视界--大数据大厂之MySQL 数据库课程设计:开启数据宇宙的传奇之旅
|
7月前
|
负载均衡 算法 关系型数据库
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
本文聚焦 MySQL 集群架构中的负载均衡算法,阐述其重要性。详细介绍轮询、加权轮询、最少连接、加权最少连接、随机、源地址哈希等常用算法,分析各自优缺点及适用场景。并提供 Java 语言代码实现示例,助力直观理解。文章结构清晰,语言通俗易懂,对理解和应用负载均衡算法具有实用价值和参考价值。
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
|
9月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
897 0
|
7月前
|
存储 关系型数据库 MySQL
大数据新视界 --面向数据分析师的大数据大厂之 MySQL 基础秘籍:轻松创建数据库与表,踏入大数据殿堂
本文详细介绍了在 MySQL 中创建数据库和表的方法。包括安装 MySQL、用命令行和图形化工具创建数据库、选择数据库、创建表(含数据类型介绍与选择建议、案例分析、最佳实践与注意事项)以及查看数据库和表的内容。文章专业、严谨且具可操作性,对数据管理有实际帮助。
大数据新视界 --面向数据分析师的大数据大厂之 MySQL 基础秘籍:轻松创建数据库与表,踏入大数据殿堂
|
8月前
|
关系型数据库 MySQL 数据安全/隐私保护
大数据新视界--大数据大厂之MySQL 数据库课程设计:数据安全深度剖析与未来展望
本文深入探讨数据库课程设计 MySQL 的数据安全。以医疗、电商、企业案例,详述用户管理、数据加密、备份恢复及网络安全等措施,结合数据安全技术发展趋势,与《大数据新视界 -- 大数据大厂之 MySQL 数据库课程设计》紧密关联,为 MySQL 数据安全提供全面指南。
大数据新视界--大数据大厂之MySQL 数据库课程设计:数据安全深度剖析与未来展望
|
8月前
|
负载均衡 算法 关系型数据库
大数据新视界--大数据大厂之MySQL数据库课程设计:MySQL集群架构负载均衡故障排除与解决方案
本文深入探讨 MySQL 集群架构负载均衡的常见故障及排除方法。涵盖请求分配不均、节点无法响应、负载均衡器故障等现象,介绍多种负载均衡算法及故障排除步骤,包括检查负载均衡器状态、调整算法、诊断修复节点故障等。还阐述了预防措施与确保系统稳定性的方法,如定期监控维护、备份恢复策略、团队协作与知识管理等。为确保 MySQL 数据库系统高可用性提供全面指导。
|
8月前
|
SQL 关系型数据库 MySQL
大数据新视界--大数据大厂之MySQL数据库课程设计:MySQL 数据库 SQL 语句调优方法详解(2-1)
本文深入介绍 MySQL 数据库 SQL 语句调优方法。涵盖分析查询执行计划,如使用 EXPLAIN 命令及理解关键指标;优化查询语句结构,包括避免子查询、减少函数使用、合理用索引列及避免 “OR”。还介绍了索引类型知识,如 B 树索引、哈希索引等。结合与 MySQL 数据库课程设计相关文章,强调 SQL 语句调优重要性。为提升数据库性能提供实用方法,适合数据库管理员和开发人员。
|
8月前
|
关系型数据库 MySQL 大数据
大数据新视界--大数据大厂之MySQL 数据库课程设计:MySQL 数据库 SQL 语句调优的进阶策略与实际案例(2-2)
本文延续前篇,深入探讨 MySQL 数据库 SQL 语句调优进阶策略。包括优化索引使用,介绍多种索引类型及避免索引失效等;调整数据库参数,如缓冲池、连接数和日志参数;还有分区表、垂直拆分等其他优化方法。通过实际案例分析展示调优效果。回顾与数据库课程设计相关文章,强调全面认识 MySQL 数据库重要性。为读者提供综合调优指导,确保数据库高效运行。
|
10月前
|
Java 关系型数据库 MySQL
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
2374 45
|
10月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
745 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成

热门文章

最新文章

推荐镜像

更多