在kafka connect 同步 mysql 主从数据库

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: 在kafka connect 同步 mysql 主从数据库

在kafka connect 同步 mysql 主从数据库

下载以下文件,解压,放置到kafka的libs目录

kafka-connect-jdbc-4.1.1

从这里选择适合的mysql connector

mysql-connector-java-8.0.16.jar

将里面的jar文件提取出来,也放到kafka的libs目录

在config目录下创建 connect-mysql-source.properties

创建 A数据库源表person

CREATE TABLE `person` (
  `pid` int(11) NOT NULL AUTO_INCREMENT,
  `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`pid`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

创建 B数据库目标表kafkaperson

CREATE TABLE `kafkaperson` (
  `pid` int(11) NOT NULL AUTO_INCREMENT,
  `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`pid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

connect-mysql-source.properties 内容为

name=mysql-a-source-person
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/a_db?user=root&password=root
# incrementing  自增
mode=incrementing
# 自增字段  pid
incrementing.column.name=pid
# 白名单表  person
table.whitelist=person
# topic前缀   mysql-kafka-
topic.prefix=mysql-kafka-

connect-mysql-sink.properties 内容

name=mysql-a-sink-person
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#kafka的topic名称
topics=mysql-kafka-person
# 配置JDBC链接
connection.url=jdbc:mysql://127.0.0.1:3306/b_db?user=root&password=root
# 不自动创建表,如果为true,会自动创建表,表名为topic名称
auto.create=false
# upsert model更新和插入
insert.mode=upsert
# 下面两个参数配置了以pid为主键更新
pk.mode = record_value
pk.fields = pid
#表名为kafkatable
table.name.format=kafkaperson

启动kafka

参考 kafka安装

如果报 The server time zone value ” is  unrecognized or represents more than one time 。。。

以命令行进入mysql

mysql> show variables like '%time_zone%';
+------------------+--------+
| Variable_name    | Value  |
+------------------+--------+
| system_time_zone |        |
| time_zone        | SYSTEM |
+------------------+--------+
2 rows in set

如果输出结果是system

设置time_zone即可

1. mysql> set global time_zone='+8:00';
2. Query OK, 0 rows affected;稍微有点延迟、并且只有添加才会同步,更新、删除都不行。


相关文章
|
17天前
|
SQL 数据可视化 关系型数据库
轻松入门MySQL:深入探究MySQL的ER模型,数据库设计的利器与挑战(22)
轻松入门MySQL:深入探究MySQL的ER模型,数据库设计的利器与挑战(22)
|
13天前
|
存储 关系型数据库 MySQL
MySQL基础入门:数据库操作全攻略
MySQL基础入门:数据库操作全攻略
44 0
|
13天前
|
关系型数据库 MySQL 数据库
卸载云服务器上的 MySQL 数据库
卸载云服务器上的 MySQL 数据库
30 0
|
1天前
|
关系型数据库 MySQL 分布式数据库
《MySQL 简易速速上手小册》第6章:MySQL 复制和分布式数据库(2024 最新版)
《MySQL 简易速速上手小册》第6章:MySQL 复制和分布式数据库(2024 最新版)
19 2
|
3天前
|
SQL 关系型数据库 MySQL
MySQL环境搭建——“MySQL数据库”
MySQL环境搭建——“MySQL数据库”
|
3天前
|
SQL NoSQL 关系型数据库
初识MySQL数据库——“MySQL数据库”
初识MySQL数据库——“MySQL数据库”
|
6天前
|
关系型数据库 MySQL 数据库
数据库基础(mysql)
数据库基础(mysql)
|
6天前
|
SQL 关系型数据库 数据库
【后端面经】【数据库与MySQL】SQL优化:如何发现SQL中的问题?
【4月更文挑战第12天】数据库优化涉及硬件升级、操作系统调整、服务器/引擎优化和SQL优化。SQL优化目标是减少磁盘IO和内存/CPU消耗。`EXPLAIN`命令用于检查SQL执行计划,关注`type`、`possible_keys`、`key`、`rows`和`filtered`字段。设计索引时考虑外键、频繁出现在`where`、`order by`和关联查询中的列,以及区分度高的列。大数据表改结构需谨慎,可能需要停机、低峰期变更或新建表。面试中应准备SQL优化案例,如覆盖索引、优化`order by`、`count`和索引提示。优化分页查询时避免大偏移量,可利用上一批的最大ID进行限制。
32 3
|
6天前
|
存储 关系型数据库 MySQL
【后端面经】【数据库与MySQL】为什么MySQL用B+树而不用B树?-02
【4月更文挑战第11天】数据库索引使用规则:`AND`用`OR`不用,正用反不用,范围中断。索引带来空间和内存代价,包括额外磁盘空间、内存占用和数据修改时的维护成本。面试中可能涉及B+树、聚簇索引、覆盖索引等知识点。MySQL采用B+树,因其利于范围查询和内存效率。数据库不使用索引可能因`!=`、`LIKE`、字段区分度低、特殊表达式或全表扫描更快。索引与NULL值处理在不同数据库中有差异,MySQL允许NULL在索引中的使用。
11 3
|
8天前
|
关系型数据库 MySQL 数据库连接
Django(四):Django项目部署数据库及服务器配置详解(MySQL)
Django(四):Django项目部署数据库及服务器配置详解(MySQL)
30 11

热门文章

最新文章