在kafka connect 同步 mysql 主从数据库

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
云数据库 RDS MySQL Serverless,价值2615元额度,1个月
简介: 在kafka connect 同步 mysql 主从数据库

在kafka connect 同步 mysql 主从数据库

下载以下文件,解压,放置到kafka的libs目录

kafka-connect-jdbc-4.1.1

从这里选择适合的mysql connector

mysql-connector-java-8.0.16.jar

将里面的jar文件提取出来,也放到kafka的libs目录

在config目录下创建 connect-mysql-source.properties

创建 A数据库源表person

CREATE TABLE `person` (
  `pid` int(11) NOT NULL AUTO_INCREMENT,
  `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`pid`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

创建 B数据库目标表kafkaperson

CREATE TABLE `kafkaperson` (
  `pid` int(11) NOT NULL AUTO_INCREMENT,
  `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`pid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

connect-mysql-source.properties 内容为

name=mysql-a-source-person
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/a_db?user=root&password=root
# incrementing  自增
mode=incrementing
# 自增字段  pid
incrementing.column.name=pid
# 白名单表  person
table.whitelist=person
# topic前缀   mysql-kafka-
topic.prefix=mysql-kafka-

connect-mysql-sink.properties 内容

name=mysql-a-sink-person
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#kafka的topic名称
topics=mysql-kafka-person
# 配置JDBC链接
connection.url=jdbc:mysql://127.0.0.1:3306/b_db?user=root&password=root
# 不自动创建表,如果为true,会自动创建表,表名为topic名称
auto.create=false
# upsert model更新和插入
insert.mode=upsert
# 下面两个参数配置了以pid为主键更新
pk.mode = record_value
pk.fields = pid
#表名为kafkatable
table.name.format=kafkaperson

启动kafka

参考 kafka安装

如果报 The server time zone value ” is  unrecognized or represents more than one time 。。。

以命令行进入mysql

mysql> show variables like '%time_zone%';
+------------------+--------+
| Variable_name    | Value  |
+------------------+--------+
| system_time_zone |        |
| time_zone        | SYSTEM |
+------------------+--------+
2 rows in set

如果输出结果是system

设置time_zone即可

1. mysql> set global time_zone='+8:00';
2. Query OK, 0 rows affected;稍微有点延迟、并且只有添加才会同步,更新、删除都不行。


相关文章
|
9天前
|
关系型数据库 MySQL 分布式数据库
《MySQL 简易速速上手小册》第6章:MySQL 复制和分布式数据库(2024 最新版)
《MySQL 简易速速上手小册》第6章:MySQL 复制和分布式数据库(2024 最新版)
45 2
|
6天前
|
SQL 存储 关系型数据库
数据库开发之mysql前言以及详细解析
数据库开发之mysql前言以及详细解析
14 0
|
3天前
|
缓存 NoSQL 关系型数据库
在Python Web开发过程中:数据库与缓存,MySQL和NoSQL数据库的主要差异是什么?
MySQL与NoSQL的主要区别在于数据结构、查询语言和可扩展性。MySQL是关系型数据库,依赖预定义的数据表结构,使用SQL进行复杂查询,适合垂直扩展。而NoSQL提供灵活的存储方式(如JSON、哈希表),无统一查询语言,支持横向扩展,适用于处理大规模、非结构化数据和高并发场景。选择哪种取决于应用需求、数据模型及扩展策略。
13 0
|
11天前
|
SQL 关系型数据库 MySQL
MySQL环境搭建——“MySQL数据库”
MySQL环境搭建——“MySQL数据库”
|
11天前
|
SQL NoSQL 关系型数据库
初识MySQL数据库——“MySQL数据库”
初识MySQL数据库——“MySQL数据库”
|
13天前
|
关系型数据库 MySQL 数据库
数据库基础(mysql)
数据库基础(mysql)
|
13天前
|
SQL 关系型数据库 数据库
【后端面经】【数据库与MySQL】SQL优化:如何发现SQL中的问题?
【4月更文挑战第12天】数据库优化涉及硬件升级、操作系统调整、服务器/引擎优化和SQL优化。SQL优化目标是减少磁盘IO和内存/CPU消耗。`EXPLAIN`命令用于检查SQL执行计划,关注`type`、`possible_keys`、`key`、`rows`和`filtered`字段。设计索引时考虑外键、频繁出现在`where`、`order by`和关联查询中的列,以及区分度高的列。大数据表改结构需谨慎,可能需要停机、低峰期变更或新建表。面试中应准备SQL优化案例,如覆盖索引、优化`order by`、`count`和索引提示。优化分页查询时避免大偏移量,可利用上一批的最大ID进行限制。
39 3
|
25天前
|
SQL 数据可视化 关系型数据库
轻松入门MySQL:深入探究MySQL的ER模型,数据库设计的利器与挑战(22)
轻松入门MySQL:深入探究MySQL的ER模型,数据库设计的利器与挑战(22)
106 0
|
25天前
|
存储 关系型数据库 MySQL
轻松入门MySQL:数据库设计之范式规范,优化企业管理系统效率(21)
轻松入门MySQL:数据库设计之范式规范,优化企业管理系统效率(21)
|
25天前
|
关系型数据库 MySQL 数据库
轻松入门MySQL:精准查询,巧用WHERE与HAVING,数据库查询如虎添翼(7)
轻松入门MySQL:精准查询,巧用WHERE与HAVING,数据库查询如虎添翼(7)