大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
简介: 大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(已更完)

Flink(正在更新!)

章节内容

上节我们完成了如下的内容:


Sink 的基本概念等内容

Sink的相关信息 配置与使用

Sink案例写入Redis

JDBC Sink

在 Apache Flink 中,通过 JDBC Sink,可以将处理后的数据写入到 MySQL 数据库中。这对于将实时处理的数据持久化或与其他系统进行集成非常有用。


Flink JDBC Sink 简介

Flink 提供了 JdbcSink,它是基于 JDBC 协议的 Sink,可以将数据写入各种关系型数据库,包括 MySQL。在使用 JDBC Sink 时,需要提供数据库连接信息和 SQL 语句,通过这些信息,Flink 将数据流中的记录插入或更新到 MySQL 表中。


Flink 到 MySQL 的基本步骤

将数据流写入 MySQL 的步骤主要包括以下几点:


依赖库配置:确保在项目中引入了 Flink 和 MySQL 相关的依赖库,通常需要配置 Maven 或 Gradle。

定义数据源和数据流:创建并处理数据流。

配置 JDBC Sink:提供数据库的连接信息和插入 SQL 语句。

启动任务:将数据流写入 MySQL。

优化建议

在实际项目中,向 MySQL 插入大量数据时,应考虑以下优化策略:


批量插入:通过 JdbcExecutionOptions 配置批量插入,可以大幅提升写入性能。

连接池:对于高并发的写入操作,建议使用连接池来减少数据库连接开销。

索引优化:为插入的表配置合适的索引,可以提高查询性能,但在大量写入时,索引可能会降低- 插入速度,因此需要权衡。

数据分片:对于非常大规模的数据,可以考虑将数据分片并行写入不同的 MySQL 实例或分区表中。

案例:流数据下沉到MySQL

添加依赖

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.28</version>
</dependency>

编写代码

一个Person的类,对应MySQL中的一张表的字段。

模拟几条数据流,写入到 MySQL中。

package icu.wzk;


import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class SinkSqlTest {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Person> data = env.getJavaEnv().fromElements(
                new Person("wzk", 18, 1),
                new Person("icu", 20, 1),
                new Person("wzkicu", 13, 2)
        );
        data.addSink(new MySqlSinkFunction());

        env.execute();
    }

    public static class MySqlSinkFunction extends RichSinkFunction<Person> {

        private PreparedStatement preparedStatement = null;

        private Connection connection = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            String url = "jdbc:mysql://h122.wzk.icu:3306/flink-test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC";
            String username = "hive";
            String password = "hive@wzk.icu";
            connection = DriverManager.getConnection(url, username, password);
            String sql = "INSERT INTI PERSON(name, age, sex) VALUES(?, ?, ?)";
            preparedStatement = connection.prepareStatement(sql);
        }

        @Override
        public void invoke(Person value, Context context) throws Exception {
            preparedStatement.setString(1, value.getName());
            preparedStatement.setInt(2, value.getAge());
            preparedStatement.setInt(3, value.getSex());
            preparedStatement.executeUpdate();
        }

        @Override
        public void close() throws Exception {
            if (null != connection) {
                connection.close();
            }
            if (null != preparedStatement) {
                preparedStatement.close();
            }
        }
    }

    public static class Person {
        private String name;
        private Integer age;
        private Integer sex;

        public Person() {

        }

        public Person(String name, Integer age, Integer sex) {
            this.name = name;
            this.age = age;
            this.sex = sex;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Integer getAge() {
            return age;
        }

        public void setAge(Integer age) {
            this.age = age;
        }

        public Integer getSex() {
            return sex;
        }

        public void setSex(Integer sex) {
            this.sex = sex;
        }
    }
}

数据库配置

我们新建一张表出来,person表,里边有我们需要的字段。

运行代码

我们运行代码,等待运行结束。

查看结果

查看数据库中的数据,我们可以看到刚才模拟的数据已经成功写入了。

案例:写入到Kafka

编写代码

package icu.wzk;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class SinkKafkaTest {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> data = env.socketTextStream("localhost", 9999, '\n', 0);
        String brokerList = "h121.wzk.icu:9092";
        String topic = "flink_test";
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(brokerList, topic, new SimpleStringSchema());
        data.addSink(producer);
        env.execute("SinkKafkaTest");
    }

}

运行代码

启动一个 nc

nc -lk 9999
• 1

我们通过回车的方式,可以发送数据。

Java 程序中等待

查看结果

我们登录到服务器查看信息

./kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic flink_test -

可以看到刚才的数据已经写入了:

目录
相关文章
|
2月前
|
分布式计算 监控 大数据
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
87 0
|
6天前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
258 2
探索Flink动态CEP:杭州银行的实战案例
|
12天前
|
数据处理 数据安全/隐私保护 流计算
Flink 三种时间窗口、窗口处理函数使用及案例
Flink 是处理无界数据流的强大工具,提供了丰富的窗口机制。本文介绍了三种时间窗口(滚动窗口、滑动窗口和会话窗口)及其使用方法,包括时间窗口的概念、窗口处理函数的使用和实际案例。通过这些机制,可以灵活地对数据流进行分析和计算,满足不同的业务需求。
97 27
|
8天前
|
监控 关系型数据库 MySQL
Flink CDC MySQL同步MySQL错误记录
在使用Flink CDC同步MySQL数据时,常见的错误包括连接错误、权限错误、表结构变化、数据类型不匹配、主键冲突和
48 16
|
4天前
|
消息中间件 运维 大数据
道旅科技借助云消息队列 Kafka 版加速旅游大数据创新发展
阿里云云消息队列 Kafka 版 Serverless 系列凭借其卓越的弹性能力,为道旅科技提供了灵活高效的数据流处理解决方案。无论是应对突发流量还是规划长期资源需求,该方案均能帮助企业实现资源动态调整和成本优化,同时保障业务的高可用性和连续性。
|
2月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
54 3
|
2月前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
42 2
|
2月前
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
62 1
|
2月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
62 0
|
2月前
|
SQL 消息中间件 分布式计算
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
71 0

热门文章

最新文章