FlinkSQL 实时采集Kafka内容到MySQL(实战记录)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: 最近在做实时采集Kafka发布的内容到MySQL,本文记录一下关键的点,细节不再描述,希望能帮助到大家。

01 引言


最近在做实时采集Kafka发布的内容到MySQL,本文记录一下关键的点,细节不再描述,希望能帮助到大家。


02 实现


2.1 添加依赖


在工程,除了添加基础的Flink环境依赖,还需要添加flink-connector-kafka的依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.13.6</version>
</dependency>


除此,因为Flink把Kafka作为了Source,所以读取的字符串是有解析方式的,本文主要使用的是“json”的方式,因此还需要引入序列化包的,但是flink-connector-kafka已经自带了,所以没必要再引入。


ok,到这里如果我们写好FlinkSQL去启动,直接就会一闪而退了,为什么呢?因为我们缺少了’ kafka-clients-2.1.0.jar'这个包,但是也无需引入,因为在flink-connector-kafka里面已经自带了。

23ef8522f2c14ca7a1064665600018b1.png


2.2 Flink SQL


好了,到了关键的FlinkSQL了,该如何写呢?


首先看看Source,也就是我们的Kafka,如下:

CREATE TABLE t_student (
  id INT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'cdc_user',
  'properties.bootstrap.servers' = '10.194.166.92:9092',
  'properties.group.id' = 'flink-cdc-mysql-kafka',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
)


然后Sink输出,我这里需要输出到MySQL

CREATE TABLE t_student_copy (
      id INT,
      name STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
       'connector' = 'jdbc',
       'url' = 'jdbc:mysql://127.0.0.1:3306/big_data',
       'username' = 'root',
       'password' = '123456',
       'table-name' = 't_student_copy'
)


最后,使用INSERT INTO声明如何写入:

INSERT INTO t_student_copy(id,name) SELECT id,name FROM t_student


2.3 配置Kafka域名


还有一点需要注意的是,当我们跑Flink的程序的时候,会出现类似如下错误:

unable to connect broker…


这个时候,我们要在跑Flink的程序的服务器配置Kafka的域名,具体在hosts文件里配置

vi /etc/hosts


ok,到这里,只要我们只要使用Kafka工具发送json格式的数据,Flink程序就能实时收到,并写入MySQL数据库。

03 文末


本文主要是记录Kafka如何实时写入到MySQL的一些坑点,完整源码就不贴出来了,希望能给大家一点启示并帮助到大家,谢谢大家的阅读,本文完!


附:KafkaTool的使用教程:


目录
相关文章
|
关系型数据库 MySQL 数据库
记录学习MySql常用函数
记录学习MySql常用函数
188 16
|
关系型数据库 MySQL 数据库
mysql:添加时间列自动更新时间(用于记录创建时间和修改时间)
mysql:添加时间列自动更新时间(用于记录创建时间和修改时间)
1077 0
mysql:添加时间列自动更新时间(用于记录创建时间和修改时间)
|
存储 SQL 关系型数据库
mysql实现查询分组查询最后一次的记录
mysql实现查询分组查询最后一次的记录
209 0
mysql实现查询分组查询最后一次的记录
|
SQL IDE 关系型数据库
mysql——去重并获取最新记录
mysql——去重并获取最新记录
780 0
mysql——去重并获取最新记录
|
存储 SQL 关系型数据库
mysql实现查询分组查询最后一次的记录
mysql实现查询分组查询最后一次的记录
162 0
mysql实现查询分组查询最后一次的记录
|
存储 缓存 Cloud Native
|
存储 SQL 关系型数据库
【MySQL】数据库进阶之触发器内容详解
文章目录 1 触发器概述 2 触发器的基本操作 2.1 创建触发器 2.2 触发器操作实例 3 NEW与OLD 3.1 为什么需要NEW与OLD? 3.2 NEW与OLD实例 4 触发器的其他操作 5 触发器的注意事项
【MySQL】数据库进阶之触发器内容详解
|
消息中间件 分布式计算 监控
Flume+Kafka+Spark Streaming+MySQL实时日志分析
网络发展迅速的时代,越来越多人通过网络获取跟多的信息或通过网络作一番自己的事业,当投身于搭建属于自己的网站、APP或小程序时会发现,经过一段时间经营和维护发现浏览量和用户数量的增长速度始终没有提升。在对其进行设计改造时无从下手,当在不了解用户的浏览喜欢和个用户群体的喜好。虽然服务器日志中明确的记载了用户访浏览的喜好但是通过普通方式很难从大量的日志中及时有效的筛选出优质信息。Spark Streaming是一个实时的流计算框架,该技术可以对数据进行实时快速的分析,通过与Flume、Kafka的结合能够做到近乎零延迟的数据统计分析。
316 0
Flume+Kafka+Spark Streaming+MySQL实时日志分析
|
关系型数据库 MySQL 数据库
MySQL命令记录
MySQL编码 1.查看数据库编码格式 show create database <数据库名>; 2.查看数据表的编码格式 show create table <表名>; 3.创建数据库时指定数据库的字符集 create database <数据库名> character set utf8; 4.创建数据表时指定数据表的编码格式 create table tb_books( name varchar(45) not null, price double not null, bookCount int not null,
|
SQL 关系型数据库 MySQL
校招面试题有了!——记录一个有趣的mySql查询
## 前言 最近在做一个内部系统的报表统计功能,遇到了一个麻烦的查询场景,因为对sql语句确实不太熟练,在网上查了一些资料,最终找到了一个解法。具体场景和表结构并不复杂,对sql大佬们来说应该也是小case,不过细想确实有趣,特此记录下来。 ## 问题 已知表A有列:主键id、日期date和标签tag,其中tag是逗号分割的多值字符串(值的集合size不超过25个),如下如示例。请统计某段时间(例