FlinkSQL 实时采集Kafka内容到MySQL(实战记录)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: FlinkSQL 实时采集Kafka内容到MySQL(实战记录)

01 引言

最近在做实时采集Kafka发布的内容到MySQL,本文记录一下关键的点,细节不再描述,希望能帮助到大家。

02 实现

2.1 添加依赖

在工程,除了添加基础的Flink环境依赖,还需要添加flink-connector-kafka的依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.13.6</version>
</dependency>

除此,因为FlinkKafka作为了Source,所以读取的字符串是有解析方式的,本文主要使用的是“json”的方式,因此还需要引入序列化包的,但是flink-connector-kafka已经自带了,所以没必要再引入。

ok,到这里如果我们写好FlinkSQL去启动,直接就会一闪而退了,为什么呢?因为我们缺少了’ kafka-clients-2.1.0.jar'这个包,但是也无需引入,因为在flink-connector-kafka里面已经自带了。

为什么要在这里特别提示 “序列化包”和“kafka-clients包呢”?因为如果我们采用Flink On Yarn的方式部署时,这两个包是需要放到HDFS的,如下:

2.2 Flink SQL

好了,到了关键的FlinkSQL了,该如何写呢?

首先看看Source,也就是我们的Kafka,如下:

CREATE TABLE t_student (
  id INT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'cdc_user',
  'properties.bootstrap.servers' = '10.194.166.92:9092',
  'properties.group.id' = 'flink-cdc-mysql-kafka',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
)

然后Sink输出,我这里需要输出到MySQL

CREATE TABLE t_student_copy (
      id INT,
      name STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
       'connector' = 'jdbc',
       'url' = 'jdbc:mysql://127.0.0.1:3306/big_data',
       'username' = 'root',
       'password' = '123456',
       'table-name' = 't_student_copy'
)

最后,使用INSERT INTO声明如何写入:

INSERT INTO t_student_copy(id,name) SELECT id,name FROM t_student

2.3 配置Kafka域名

还有一点需要注意的是,当我们跑Flink的程序的时候,会出现类似如下错误:

unable to connect broker…

这个时候,我们要在跑Flink的程序的服务器配置Kafka的域名,具体在hosts文件里配置:

vi /etc/hosts

ok,到这里,只要我们只要使用Kafka工具发送json格式的数据,Flink程序就能实时收到,并写入MySQL数据库。

03 文末

本文主要是记录Kafka如何实时写入到MySQL的一些坑点,完整源码就不贴出来了,希望能给大家一点启示并帮助到大家,谢谢大家的阅读,本文完!

附:KafkaTool的使用教程

目录
相关文章
|
2月前
|
缓存 关系型数据库 MySQL
MySQL索引策略与查询性能调优实战
在实际应用中,需要根据具体的业务需求和查询模式,综合运用索引策略和查询性能调优方法,不断地测试和优化,以提高MySQL数据库的查询性能。
225 66
|
2月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
108 5
|
3月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
260 0
|
2月前
|
监控 关系型数据库 MySQL
数据库优化:MySQL索引策略与查询性能调优实战
【10月更文挑战第27天】本文深入探讨了MySQL的索引策略和查询性能调优技巧。通过介绍B-Tree索引、哈希索引和全文索引等不同类型,以及如何创建和维护索引,结合实战案例分析查询执行计划,帮助读者掌握提升查询性能的方法。定期优化索引和调整查询语句是提高数据库性能的关键。
413 1
|
3月前
|
NoSQL 关系型数据库 MySQL
MySQL与Redis协同作战:优化百万数据查询的实战经验
【10月更文挑战第13天】 在处理大规模数据集时,传统的关系型数据库如MySQL可能会遇到性能瓶颈。为了提升数据处理的效率,我们可以结合使用MySQL和Redis,利用两者的优势来优化数据查询。本文将分享一次实战经验,探讨如何通过MySQL与Redis的协同工作来优化百万级数据统计。
134 5
|
3月前
|
架构师 关系型数据库 MySQL
MySQL最左前缀优化原则:深入解析与实战应用
【10月更文挑战第12天】在数据库架构设计与优化中,索引的使用是提升查询性能的关键手段之一。其中,MySQL的最左前缀优化原则(Leftmost Prefix Principle)是复合索引(Composite Index)应用中的核心策略。作为资深架构师,深入理解并掌握这一原则,对于平衡数据库性能与维护成本至关重要。本文将详细解读最左前缀优化原则的功能特点、业务场景、优缺点、底层原理,并通过Java示例展示其实现方式。
149 1
|
3月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
62 3
|
2月前
|
监控 关系型数据库 MySQL
数据库优化:MySQL索引策略与查询性能调优实战
【10月更文挑战第26天】数据库作为现代应用系统的核心组件,其性能优化至关重要。本文主要探讨MySQL的索引策略与查询性能调优。通过合理创建索引(如B-Tree、复合索引)和优化查询语句(如使用EXPLAIN、优化分页查询),可以显著提升数据库的响应速度和稳定性。实践中还需定期审查慢查询日志,持续优化性能。
189 0
|
4月前
|
监控 关系型数据库 MySQL
zabbix agent集成percona监控MySQL的插件实战案例
这篇文章是关于如何使用Percona监控插件集成Zabbix agent来监控MySQL的实战案例。
98 2
zabbix agent集成percona监控MySQL的插件实战案例