TiDB实时同步数据到PostgreSQL(一) ---- 搭建kafka集群

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: TiDB实时同步数据到PostgreSQL的第一篇,主要介绍kafka集群的搭建。

业务系统早期采用LNMP架构,随着业务发展,数据量逐渐增加,主要业务表数据数据量均已过亿,加之产品设计原因,无法界定什么数据是历史数据,无法做冷热分离;业务又比较复杂,经常需要多张大表join查询;由于一些业务的特殊性以及架构原因,分库分表也很困难。几年前将数据库从MySQL迁移到TiDB,由于使用云服务器,又因为种种原因无法大量增加硬件,经常出现个别大用户一个大查询导致数据库雪崩,虽然可以通过设置一些参数限制SQL执行时间,但这样又导致一些大用户连统计分析业务都无法正常完成。也考虑过使用TiFlash,但经过测试,同等硬件、同样的数据,对于我们大部分业务查询,PostgreSQL的性能更胜一筹,本着适合自己的才是最好的这一原则,最终选择将TiDB的数据实时同步到PostgreSQL,TiDB完成业务,由PostgreSQL完成财务、统计分析等查询业务。

TiDB还是非常不错的,通过TiCDC结合canal-json协议将数据库变更数据同步到kafka,再通过自行开发的同步软件消费kafka数据将数据同步到PostgreSQL中。所以万里长征的第一步就是要搭建kafka集群,曾经也想偷懒整个单机kafka,但发现单机kafka极其不稳定,经常莫明其妙就崩了,生产环境中是绝对不能容忍这种情况发生的,所以一定要集群!!!

这里使用三台服务器(192.168.0.1、192.168.0.2、192.168.0.3),分别安装zookeeper跟kafka,有条件的话建议zookeeper与kafka分开安装。这里使用kafka 2.7.0以及kafka自带的zookeeper 3.5.8为例说明kafka集群的搭建过程。

一、下载及解压kafka

(本例假设将kafka安装到/home/test/server/kafka目录下)

wget https://archive.apache.org/dist/kafka/2.7.0/kafka_2.13-2.7.0.tgz
tar zxvf kafka_2.13-2.7.0.tgz

二、配置zookeeper

cd /home/test/server/kafka/conf
cat zookeeper.properties
# the directory where the snapshot is stored.dataDir=/tmp/zookeeper/data
dataLogDir=/tmp/zookeeper/log
# the port at which the clients will connectclientPort=2181# disable the per-ip limit on the number of connections since this is a non-production configmaxClientCnxns=0# Disable the adminserver by default to avoid port conflicts.# Set the port to something non-conflicting if choosing to enable thisadmin.enableServer=false# admin.serverPort=8080tickTime=2000initLimit=10syncLimit=5server.0=192.168.0.1:2888:3888
server.1=192.168.0.2:2888:3888
server.2=192.168.0.3:2888:3888

这里主要设置dataDir、dataLogDir、clientPort、server.0、server.1、server.2几个选项,其他的先用默认值。其余两台服务器的设置相同。

然后分别在三台服务器的/tmp/zookeeper/data目录下创建myid文件做为节点标识(分别对应配置文件中的server.0、server.1、server.2):

echo"0" > /tmp/zookeeper/data/myid #192.168.0.1echo"1" > /tmp/zookeeper/data/myid #192.168.0.2echo"2" > /tmp/zookeeper/data/myid #192.168.0.3

三、启动zookeeper

cd /home/test/server/kafka
#先不带-daemon参数直接启动,观察控制台输出是否有错,服务是否正常启动bin/zookeeper-server-start.sh config/zookeeper.properties
#正常启动无误后,可使用如下命令bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

四、配置kafka

kafka的配置文件有三个,分别是:producer.properties、consumer.properties、server.properties

cat /home/test/server/kafka/config/server.properties
# The id of the broker. This must be set to a unique integer for each broker.broker.id=0#三台服务器的id值不能想同,分别取0、1、2# The address the socket server listens on. It will get the value returned from# java.net.InetAddress.getCanonicalHostName() if not configured.#   FORMAT:#     listeners = listener_name://host_name:port#   EXAMPLE:#     listeners = PLAINTEXT://your.host.name:9092listeners=PLAINTEXT://192.168.0.1:9092  #本机ip地址# A comma separated list of directories under which to store log fileslog.dirs=/tmp/kafka-logs  #消息队列数据会保存在这里,这里偷懒放在/tmp目录下,生产环境可别这样做!!!# The minimum age of a log file to be eligible for deletion due to agelog.retention.hours=168#这个值是队列数据保存时间
cat /home/test/server/kafka/config/producer.properties
# list of brokers used for bootstrapping knowledge about the rest of the cluster# format: host1:port1,host2:port2 ...bootstrap.servers=192.168.0.1:9092,192:168.0.2:9092,192.168.0.3:9092
cat /home/test/server/kafka/config/consumer.properties
# list of brokers used for bootstrapping knowledge about the rest of the cluster# format: host1:port1,host2:port2 ...bootstrap.servers=192.168.0.1:9092,192:168.0.2:9092,192.168.0.3:9092

五、启动kafka

cd /home/test/server/kafka
bin/kafka-server-start.sh -daemon config/server.properties

启动完成后可用通过ps -ef检查zookeeper、kafka是否正常运行。


六、kafka集群管理


前台启动broker

bin/kafka-server-start.sh <path>/server.properties

Ctrl + C 关闭

后台启动broker

bin/kafka-server-start.sh -daemon <path>/server.properties

关闭broker

bin/kafka-server-stop.sh


七、kafkaTopic管理


创建topic

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 3 --replication-factor 3 --topic topicname

删除topic

bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic topicname

查询topic列表

bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

查询topic详情

bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topicname

修改topic

bin/kafka-topics.sh --alter --bootstrap-server localhost:9092 --partitions 6 --topic topicname


八、Kafka Consumer-Groups管理


查询消费者组

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

查询消费者组详情

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group groupname

重设消费者组位移

最早处
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-earliest --execute
最新处
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-latest --execute
某个位置
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-offset 2000 --execute
调整到某个时间之后的最早位移
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-datetime 2019-09-15T00:00:00.000

删除消费者组

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group groupname


九、kafka脚本工具


producer脚本

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicname
参数含义:
--compression-codec lz4  压缩类型
--request-required-acks all acks的值
--timeout 3000  linger.ms的值
--message-send-max-retries 10   retries的值
--max-partition-memory-bytes batch.size值

consumer脚本

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning
指定groupid
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning
--consumer-property group.id=old-consumer-group
指定分区
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning
--partition 0

kafka-run-class脚本

kafka-run-class.sh kafka.tools.ConsoleConsumer   就是 kafka-console-consumer.sh
kafka-run-class.sh kafka.tools.ConsoleProducer   就是 kafka-console-producer.sh

获取topic当前消息数

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic topicname --time -1

--time -1表示最大位移 --time -2表示最早位移

查询_consumer_offsets

bin/kafka-simple-consumer-shell.sh --topic _consumer_offsets --partition 12 --broker-list localhost:9092 --formatter "kafka.coorfinator.GroupMetadataManager\$OffsetsMessageFormatter"


十、MirrorMaker


跨机房灾备工具

bin/kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --whitelist topicA|topicB


目录
相关文章
|
2月前
|
存储 关系型数据库 分布式数据库
PolarDB常见问题之PolarDB冷存数据到OSS之后恢复失败如何解决
PolarDB是阿里云推出的下一代关系型数据库,具有高性能、高可用性和弹性伸缩能力,适用于大规模数据处理场景。本汇总囊括了PolarDB使用中用户可能遭遇的一系列常见问题及解答,旨在为数据库管理员和开发者提供全面的问题指导,确保数据库平稳运行和优化使用体验。
|
20天前
|
消息中间件 存储 算法
深入了解Kafka的数据持久化机制
深入了解Kafka的数据持久化机制
33 0
|
2天前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之在使用 DataWorks 数据集成同步 PostgreSQL 数据库中的 Geometry 类型数据如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
9 0
|
14天前
|
SQL 关系型数据库 MySQL
关系型数据库插入数据的语句
使用SQL的`INSERT INTO`语句向关系型数据库的`students`表插入数据。例如,插入一个`id`为1,`name`为&#39;张三&#39;,`age`为20的记录:`INSERT INTO students (id, name, age) VALUES (1, &#39;张三&#39;, 20)。如果`id`自增,则可简化为`INSERT INTO students (name, age) VALUES (&#39;张三&#39;, 20)`。
21 2
|
14天前
|
SQL 存储 Oracle
关系型数据库查询数据的语句
本文介绍了关系型数据库中的基本SQL查询语句,包括选择所有或特定列、带条件查询、排序、分组、过滤分组、表连接、限制记录数及子查询。SQL还支持窗口函数、存储过程等高级功能,是高效管理数据库的关键。建议深入学习SQL及相应数据库系统文档。
10 2
|
17天前
|
消息中间件 存储 Kafka
【Kafka】Kafka 的日志保留期与数据清理策略
【4月更文挑战第13天】【Kafka】Kafka 的日志保留期与数据清理策略
|
17天前
|
消息中间件 存储 缓存
【Kakfa】Kafka 的Topic中 Partition 数据是怎么存储到磁盘的?
【4月更文挑战第13天】【Kakfa】Kafka 的Topic中 Partition 数据是怎么存储到磁盘的?
|
21天前
|
人工智能 Cloud Native 算法
数据之势丨AI时代,云原生数据库的最新发展趋势与进展
AI与云数据库的深度结合是数据库发展的必然趋势,基于AI能力的加持,云数据库未来可以实现更快速的查询和决策,帮助企业更好地利用海量数据进行业务创新和决策优化。
数据之势丨AI时代,云原生数据库的最新发展趋势与进展
|
23天前
|
消息中间件 Kafka API
【Kafka】kafka 如何不消费重复数据?
【4月更文挑战第7天】【Kafka】kafka 如何不消费重复数据?
|
2月前
|
关系型数据库 MySQL OLAP
PolarDB +AnalyticDB Zero-ETL :免费同步数据到ADB,享受数据流通新体验
Zero-ETL是阿里云瑶池数据库提供的服务,旨在简化传统ETL流程的复杂性和成本,提高数据实时性。降低数据同步成本,允许用户快速在AnalyticDB中对PolarDB数据进行分析,降低了30%的数据接入成本,提升了60%的建仓效率。 Zero-ETL特性包括免费的PolarDB MySQL联邦分析和PolarDB-X元数据自动同步,提供一体化的事务处理和数据分析,并能整合多个数据源。用户只需简单配置即可实现数据同步和实时分析。