FlinkSQL 实时采集Kafka内容到MySQL(实战记录)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: FlinkSQL 实时采集Kafka内容到MySQL(实战记录)

01 引言

最近在做实时采集Kafka发布的内容到MySQL,本文记录一下关键的点,细节不再描述,希望能帮助到大家。

02 实现

2.1 添加依赖

在工程,除了添加基础的Flink环境依赖,还需要添加flink-connector-kafka的依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.13.6</version>
</dependency>

除此,因为FlinkKafka作为了Source,所以读取的字符串是有解析方式的,本文主要使用的是“json”的方式,因此还需要引入序列化包的,但是flink-connector-kafka已经自带了,所以没必要再引入。

ok,到这里如果我们写好FlinkSQL去启动,直接就会一闪而退了,为什么呢?因为我们缺少了’ kafka-clients-2.1.0.jar'这个包,但是也无需引入,因为在flink-connector-kafka里面已经自带了。

为什么要在这里特别提示 “序列化包”和“kafka-clients包呢”?因为如果我们采用Flink On Yarn的方式部署时,这两个包是需要放到HDFS的,如下:

2.2 Flink SQL

好了,到了关键的FlinkSQL了,该如何写呢?

首先看看Source,也就是我们的Kafka,如下:

CREATE TABLE t_student (
  id INT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'cdc_user',
  'properties.bootstrap.servers' = '10.194.166.92:9092',
  'properties.group.id' = 'flink-cdc-mysql-kafka',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
)

然后Sink输出,我这里需要输出到MySQL

CREATE TABLE t_student_copy (
      id INT,
      name STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
       'connector' = 'jdbc',
       'url' = 'jdbc:mysql://127.0.0.1:3306/big_data',
       'username' = 'root',
       'password' = '123456',
       'table-name' = 't_student_copy'
)

最后,使用INSERT INTO声明如何写入:

INSERT INTO t_student_copy(id,name) SELECT id,name FROM t_student

2.3 配置Kafka域名

还有一点需要注意的是,当我们跑Flink的程序的时候,会出现类似如下错误:

unable to connect broker…

这个时候,我们要在跑Flink的程序的服务器配置Kafka的域名,具体在hosts文件里配置:

vi /etc/hosts

ok,到这里,只要我们只要使用Kafka工具发送json格式的数据,Flink程序就能实时收到,并写入MySQL数据库。

03 文末

本文主要是记录Kafka如何实时写入到MySQL的一些坑点,完整源码就不贴出来了,希望能给大家一点启示并帮助到大家,谢谢大家的阅读,本文完!

附:KafkaTool的使用教程

目录
相关文章
|
29天前
|
SQL 关系型数据库 MySQL
【MySQL技术专题】「问题实战系列」深入探索和分析MySQL数据库的数据备份和恢复实战开发指南(8.0版本升级篇)
【MySQL技术专题】「问题实战系列」深入探索和分析MySQL数据库的数据备份和恢复实战开发指南(8.0版本升级篇)
96 0
|
2天前
|
安全 关系型数据库 MySQL
node实战——后端koa结合jwt连接mysql实现权限登录(node后端就业储备知识)
node实战——后端koa结合jwt连接mysql实现权限登录(node后端就业储备知识)
11 3
|
2天前
|
SQL 关系型数据库 MySQL
不允许你不知道的 MySQL 优化实战(一)
不允许你不知道的 MySQL 优化实战(一)
10 2
|
5天前
|
关系型数据库 MySQL 中间件
【MySQL实战笔记】07 | 行锁功过:怎么减少行锁对性能的影响?-02 死锁和死锁检测
【4月更文挑战第19天】在高并发环境下,死锁发生在多个线程间循环等待资源时,导致无限期等待。MySQL中,死锁可通过`innodb_lock_wait_timeout`参数设置超时或`innodb_deadlock_detect`开启死锁检测来解决。默认的50s超时可能不适用于在线服务,而频繁检测会消耗大量CPU。应对热点行更新引发的性能问题,可以暂时关闭死锁检测(风险是产生大量超时),控制并发度,或通过分散记录减少锁冲突,例如将数据分拆到多行以降低死锁概率。
19 1
|
8天前
|
SQL 关系型数据库 MySQL
Python与MySQL数据库交互:面试实战
【4月更文挑战第16天】本文介绍了Python与MySQL交互的面试重点,包括使用`mysql-connector-python`或`pymysql`连接数据库、执行SQL查询、异常处理、防止SQL注入、事务管理和ORM框架。易错点包括忘记关闭连接、忽视异常处理、硬编码SQL、忽略事务及过度依赖低效查询。通过理解这些问题和提供策略,可提升面试表现。
29 6
|
15天前
|
存储 关系型数据库 MySQL
【MySQL实战笔记】 04 | 深入浅出索引(上)-02
【4月更文挑战第9天】InnoDB数据库使用B+树作为索引模型,其中主键索引的叶子节点存储完整行数据,非主键索引则存储主键值。主键查询只需搜索一棵树,而非主键查询需两次搜索,因此推荐使用主键查询以提高效率。在插入新值时,B+树需要维护有序性,可能导致数据页分裂影响性能。自增主键在插入时可避免数据挪动和页分裂,且占用存储空间小,通常更为理想。然而,如果场景仅需唯一索引,可直接设为主键以减少查询步骤。
16 1
【MySQL实战笔记】 04 | 深入浅出索引(上)-02
|
17天前
|
存储 SQL 关系型数据库
【MySQL实战笔记】03.事务隔离:为什么你改了我还看不见?-02
【4月更文挑战第7天】数据库通过视图实现事务隔离,不同隔离级别如读未提交、读已提交、可重复读和串行化采用不同策略。以可重复读为例,MySQL使用多版本并发控制(MVCC),每个事务有其独立的视图。回滚日志在无更早视图时被删除。长事务可能导致大量存储占用,应避免。事务启动可显式用`begin`或设置`autocommit=0`,但后者可能意外开启长事务。建议使用`autocommit=1`并显式管理事务,若需减少交互,可使用`commit work and chain`。
30 5
|
19天前
|
SQL 存储 关系型数据库
【MySQL实战笔记】02.一条SQL更新语句是如何执行的-2
【4月更文挑战第5天】两阶段提交是为确保`redo log`和`binlog`逻辑一致,避免数据不一致。若先写`redo log`, crash后数据可能丢失,导致恢复后状态错误;若先写`binlog`,crash则可能导致重复事务,影响数据库一致性。一天一备相较于一周一备,能缩短“最长恢复时间”,但需权衡额外的存储成本。
16 1
|
29天前
|
SQL 关系型数据库 MySQL
【MySQL技术专题】「问题实战系列」深入探索和分析MySQL数据库的数据备份和恢复实战开发指南(数据恢复补充篇)(一)
【MySQL技术专题】「问题实战系列」深入探索和分析MySQL数据库的数据备份和恢复实战开发指南(数据恢复补充篇)
30 0
|
1月前
|
存储 Kubernetes 关系型数据库
KubeSphere 核心实战之一【在kubesphere平台上部署mysql】(实操篇 1/4)
KubeSphere 核心实战之一【在kubesphere平台上部署mysql】(实操篇 1/4)
48 0

热门文章

最新文章