FlinkSQL 实时采集Kafka内容到MySQL(实战记录)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: 最近在做实时采集Kafka发布的内容到MySQL,本文记录一下关键的点,细节不再描述,希望能帮助到大家。

01 引言


最近在做实时采集Kafka发布的内容到MySQL,本文记录一下关键的点,细节不再描述,希望能帮助到大家。


02 实现


2.1 添加依赖


在工程,除了添加基础的Flink环境依赖,还需要添加flink-connector-kafka的依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.13.6</version>
</dependency>


除此,因为Flink把Kafka作为了Source,所以读取的字符串是有解析方式的,本文主要使用的是“json”的方式,因此还需要引入序列化包的,但是flink-connector-kafka已经自带了,所以没必要再引入。


ok,到这里如果我们写好FlinkSQL去启动,直接就会一闪而退了,为什么呢?因为我们缺少了’ kafka-clients-2.1.0.jar'这个包,但是也无需引入,因为在flink-connector-kafka里面已经自带了。

23ef8522f2c14ca7a1064665600018b1.png


2.2 Flink SQL


好了,到了关键的FlinkSQL了,该如何写呢?


首先看看Source,也就是我们的Kafka,如下:

CREATE TABLE t_student (
  id INT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'cdc_user',
  'properties.bootstrap.servers' = '10.194.166.92:9092',
  'properties.group.id' = 'flink-cdc-mysql-kafka',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
)


然后Sink输出,我这里需要输出到MySQL

CREATE TABLE t_student_copy (
      id INT,
      name STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
       'connector' = 'jdbc',
       'url' = 'jdbc:mysql://127.0.0.1:3306/big_data',
       'username' = 'root',
       'password' = '123456',
       'table-name' = 't_student_copy'
)


最后,使用INSERT INTO声明如何写入:

INSERT INTO t_student_copy(id,name) SELECT id,name FROM t_student


2.3 配置Kafka域名


还有一点需要注意的是,当我们跑Flink的程序的时候,会出现类似如下错误:

unable to connect broker…


这个时候,我们要在跑Flink的程序的服务器配置Kafka的域名,具体在hosts文件里配置

vi /etc/hosts


ok,到这里,只要我们只要使用Kafka工具发送json格式的数据,Flink程序就能实时收到,并写入MySQL数据库。

03 文末


本文主要是记录Kafka如何实时写入到MySQL的一些坑点,完整源码就不贴出来了,希望能给大家一点启示并帮助到大家,谢谢大家的阅读,本文完!


附:KafkaTool的使用教程:


目录
相关文章
|
关系型数据库 MySQL 数据库
MySQL命令记录
MySQL编码 1.查看数据库编码格式 show create database <数据库名>; 2.查看数据表的编码格式 show create table <表名>; 3.创建数据库时指定数据库的字符集 create database <数据库名> character set utf8; 4.创建数据表时指定数据表的编码格式 create table tb_books( name varchar(45) not null, price double not null, bookCount int not null,
|
关系型数据库 MySQL 数据库
记录学习MySql常用函数
记录学习MySql常用函数
167 16
|
关系型数据库 MySQL 数据库
mysql:添加时间列自动更新时间(用于记录创建时间和修改时间)
mysql:添加时间列自动更新时间(用于记录创建时间和修改时间)
773 0
mysql:添加时间列自动更新时间(用于记录创建时间和修改时间)
|
SQL 关系型数据库 MySQL
校招面试题有了!——记录一个有趣的mySql查询
## 前言 最近在做一个内部系统的报表统计功能,遇到了一个麻烦的查询场景,因为对sql语句确实不太熟练,在网上查了一些资料,最终找到了一个解法。具体场景和表结构并不复杂,对sql大佬们来说应该也是小case,不过细想确实有趣,特此记录下来。 ## 问题 已知表A有列:主键id、日期date和标签tag,其中tag是逗号分割的多值字符串(值的集合size不超过25个),如下如示例。请统计某段时间(例
|
存储 SQL 关系型数据库
mysql实现查询分组查询最后一次的记录
mysql实现查询分组查询最后一次的记录
180 0
mysql实现查询分组查询最后一次的记录
|
前端开发 关系型数据库 MySQL
三言两语记录mysql for update锁
另一个连接B 发起普通select请求,正常返回结果 3.连接B 发起select for update请求,由于第一个步骤的事务还没有结束,所以不能获取,会一直堵塞,直到超时 或者锁被释放后返回
201 0
|
运维 关系型数据库 MySQL
mysql安装记录
mysql安装记录
114 0
|
SQL IDE 关系型数据库
mysql——去重并获取最新记录
mysql——去重并获取最新记录
705 0
mysql——去重并获取最新记录
|
存储 SQL 关系型数据库
mysql实现查询分组查询最后一次的记录
mysql实现查询分组查询最后一次的记录
130 0
mysql实现查询分组查询最后一次的记录
|
存储 缓存 Cloud Native