在kafka connect 同步 mysql 主从数据库

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,高可用系列 2核4GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 在kafka connect 同步 mysql 主从数据库

在kafka connect 同步 mysql 主从数据库

下载以下文件,解压,放置到kafka的libs目录

kafka-connect-jdbc-4.1.1

从这里选择适合的mysql connector

mysql-connector-java-8.0.16.jar

将里面的jar文件提取出来,也放到kafka的libs目录

在config目录下创建 connect-mysql-source.properties

创建 A数据库源表person

CREATE TABLE `person` (
  `pid` int(11) NOT NULL AUTO_INCREMENT,
  `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`pid`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

创建 B数据库目标表kafkaperson

CREATE TABLE `kafkaperson` (
  `pid` int(11) NOT NULL AUTO_INCREMENT,
  `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`pid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

connect-mysql-source.properties 内容为

name=mysql-a-source-person
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/a_db?user=root&password=root
# incrementing  自增
mode=incrementing
# 自增字段  pid
incrementing.column.name=pid
# 白名单表  person
table.whitelist=person
# topic前缀   mysql-kafka-
topic.prefix=mysql-kafka-

connect-mysql-sink.properties 内容

name=mysql-a-sink-person
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#kafka的topic名称
topics=mysql-kafka-person
# 配置JDBC链接
connection.url=jdbc:mysql://127.0.0.1:3306/b_db?user=root&password=root
# 不自动创建表,如果为true,会自动创建表,表名为topic名称
auto.create=false
# upsert model更新和插入
insert.mode=upsert
# 下面两个参数配置了以pid为主键更新
pk.mode = record_value
pk.fields = pid
#表名为kafkatable
table.name.format=kafkaperson

启动kafka

参考 kafka安装

如果报 The server time zone value ” is  unrecognized or represents more than one time 。。。

以命令行进入mysql

mysql> show variables like '%time_zone%';
+------------------+--------+
| Variable_name    | Value  |
+------------------+--------+
| system_time_zone |        |
| time_zone        | SYSTEM |
+------------------+--------+
2 rows in set

如果输出结果是system

设置time_zone即可

1. mysql> set global time_zone='+8:00';
2. Query OK, 0 rows affected;稍微有点延迟、并且只有添加才会同步,更新、删除都不行。


目录
打赏
0
0
0
0
8
分享
相关文章
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
本文聚焦 MySQL 集群架构中的负载均衡算法,阐述其重要性。详细介绍轮询、加权轮询、最少连接、加权最少连接、随机、源地址哈希等常用算法,分析各自优缺点及适用场景。并提供 Java 语言代码实现示例,助力直观理解。文章结构清晰,语言通俗易懂,对理解和应用负载均衡算法具有实用价值和参考价值。
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
大数据新视界 --面向数据分析师的大数据大厂之 MySQL 基础秘籍:轻松创建数据库与表,踏入大数据殿堂
本文详细介绍了在 MySQL 中创建数据库和表的方法。包括安装 MySQL、用命令行和图形化工具创建数据库、选择数据库、创建表(含数据类型介绍与选择建议、案例分析、最佳实践与注意事项)以及查看数据库和表的内容。文章专业、严谨且具可操作性,对数据管理有实际帮助。
大数据新视界 --面向数据分析师的大数据大厂之 MySQL 基础秘籍:轻松创建数据库与表,踏入大数据殿堂
MySQL下载安装全攻略!小白也能轻松上手,从此数据库不再难搞!
这是一份详细的MySQL安装与配置教程,适合初学者快速上手。内容涵盖从下载到安装的每一步操作,包括选择版本、设置路径、配置端口及密码等。同时提供基础操作指南,如数据库管理、数据表增删改查、用户权限设置等。还介绍了备份恢复、图形化工具使用和性能优化技巧,帮助用户全面掌握MySQL的使用方法。附带常见问题解决方法,保姆级教学让你无忧入门!
MySQL下载安装全攻略!小白也能轻松上手,从此数据库不再难搞!
MySQL与Clickhouse数据库:探讨日期和时间的加法运算。
这一次的冒险就到这儿,期待你的再次加入,我们一起在数据库的世界中找寻下一个宝藏。
44 9
【赵渝强老师】OceanBase数据库从零开始:MySQL模式
《OceanBase数据库从零开始:MySQL模式》是一门包含11章的课程,涵盖OceanBase分布式数据库的核心内容。从体系架构、安装部署到租户管理、用户安全,再到数据库对象操作、事务与锁机制,以及应用程序开发、备份恢复、数据迁移等方面进行详细讲解。此外,还涉及连接路由管理和监控诊断等高级主题,帮助学员全面掌握OceanBase数据库的使用与管理。
大数据新视界--大数据大厂之MySQL数据库课程设计:MySQL集群架构负载均衡故障排除与解决方案
本文深入探讨 MySQL 集群架构负载均衡的常见故障及排除方法。涵盖请求分配不均、节点无法响应、负载均衡器故障等现象,介绍多种负载均衡算法及故障排除步骤,包括检查负载均衡器状态、调整算法、诊断修复节点故障等。还阐述了预防措施与确保系统稳定性的方法,如定期监控维护、备份恢复策略、团队协作与知识管理等。为确保 MySQL 数据库系统高可用性提供全面指导。
【YashanDB知识库】原生mysql驱动配置连接崖山数据库
【YashanDB知识库】原生mysql驱动配置连接崖山数据库
【YashanDB知识库】原生mysql驱动配置连接崖山数据库
docker拉取MySQL后数据库连接失败解决方案
通过以上方法,可以解决Docker中拉取MySQL镜像后数据库连接失败的常见问题。关键步骤包括确保容器正确启动、配置正确的环境变量、合理设置网络和权限,以及检查主机防火墙设置等。通过逐步排查,可以快速定位并解决连接问题,确保MySQL服务的正常使用。
506 82
大数据新视界--大数据大厂之MySQL数据库课程设计:MySQL 数据库 SQL 语句调优方法详解(2-1)
本文深入介绍 MySQL 数据库 SQL 语句调优方法。涵盖分析查询执行计划,如使用 EXPLAIN 命令及理解关键指标;优化查询语句结构,包括避免子查询、减少函数使用、合理用索引列及避免 “OR”。还介绍了索引类型知识,如 B 树索引、哈希索引等。结合与 MySQL 数据库课程设计相关文章,强调 SQL 语句调优重要性。为提升数据库性能提供实用方法,适合数据库管理员和开发人员。

热门文章

最新文章

推荐镜像

更多