TiDB实时同步数据到PostgreSQL(一) ---- 搭建kafka集群

本文涉及的产品
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: TiDB实时同步数据到PostgreSQL的第一篇,主要介绍kafka集群的搭建。

业务系统早期采用LNMP架构,随着业务发展,数据量逐渐增加,主要业务表数据数据量均已过亿,加之产品设计原因,无法界定什么数据是历史数据,无法做冷热分离;业务又比较复杂,经常需要多张大表join查询;由于一些业务的特殊性以及架构原因,分库分表也很困难。几年前将数据库从MySQL迁移到TiDB,由于使用云服务器,又因为种种原因无法大量增加硬件,经常出现个别大用户一个大查询导致数据库雪崩,虽然可以通过设置一些参数限制SQL执行时间,但这样又导致一些大用户连统计分析业务都无法正常完成。也考虑过使用TiFlash,但经过测试,同等硬件、同样的数据,对于我们大部分业务查询,PostgreSQL的性能更胜一筹,本着适合自己的才是最好的这一原则,最终选择将TiDB的数据实时同步到PostgreSQL,TiDB完成业务,由PostgreSQL完成财务、统计分析等查询业务。

TiDB还是非常不错的,通过TiCDC结合canal-json协议将数据库变更数据同步到kafka,再通过自行开发的同步软件消费kafka数据将数据同步到PostgreSQL中。所以万里长征的第一步就是要搭建kafka集群,曾经也想偷懒整个单机kafka,但发现单机kafka极其不稳定,经常莫明其妙就崩了,生产环境中是绝对不能容忍这种情况发生的,所以一定要集群!!!

这里使用三台服务器(192.168.0.1、192.168.0.2、192.168.0.3),分别安装zookeeper跟kafka,有条件的话建议zookeeper与kafka分开安装。这里使用kafka 2.7.0以及kafka自带的zookeeper 3.5.8为例说明kafka集群的搭建过程。

一、下载及解压kafka

(本例假设将kafka安装到/home/test/server/kafka目录下)

wget https://archive.apache.org/dist/kafka/2.7.0/kafka_2.13-2.7.0.tgz
tar zxvf kafka_2.13-2.7.0.tgz

二、配置zookeeper

cd /home/test/server/kafka/conf
cat zookeeper.properties
# the directory where the snapshot is stored.dataDir=/tmp/zookeeper/data
dataLogDir=/tmp/zookeeper/log
# the port at which the clients will connectclientPort=2181# disable the per-ip limit on the number of connections since this is a non-production configmaxClientCnxns=0# Disable the adminserver by default to avoid port conflicts.# Set the port to something non-conflicting if choosing to enable thisadmin.enableServer=false# admin.serverPort=8080tickTime=2000initLimit=10syncLimit=5server.0=192.168.0.1:2888:3888
server.1=192.168.0.2:2888:3888
server.2=192.168.0.3:2888:3888

这里主要设置dataDir、dataLogDir、clientPort、server.0、server.1、server.2几个选项,其他的先用默认值。其余两台服务器的设置相同。

然后分别在三台服务器的/tmp/zookeeper/data目录下创建myid文件做为节点标识(分别对应配置文件中的server.0、server.1、server.2):

echo"0" > /tmp/zookeeper/data/myid #192.168.0.1echo"1" > /tmp/zookeeper/data/myid #192.168.0.2echo"2" > /tmp/zookeeper/data/myid #192.168.0.3

三、启动zookeeper

cd /home/test/server/kafka
#先不带-daemon参数直接启动,观察控制台输出是否有错,服务是否正常启动bin/zookeeper-server-start.sh config/zookeeper.properties
#正常启动无误后,可使用如下命令bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

四、配置kafka

kafka的配置文件有三个,分别是:producer.properties、consumer.properties、server.properties

cat /home/test/server/kafka/config/server.properties
# The id of the broker. This must be set to a unique integer for each broker.broker.id=0#三台服务器的id值不能想同,分别取0、1、2# The address the socket server listens on. It will get the value returned from# java.net.InetAddress.getCanonicalHostName() if not configured.#   FORMAT:#     listeners = listener_name://host_name:port#   EXAMPLE:#     listeners = PLAINTEXT://your.host.name:9092listeners=PLAINTEXT://192.168.0.1:9092  #本机ip地址# A comma separated list of directories under which to store log fileslog.dirs=/tmp/kafka-logs  #消息队列数据会保存在这里,这里偷懒放在/tmp目录下,生产环境可别这样做!!!# The minimum age of a log file to be eligible for deletion due to agelog.retention.hours=168#这个值是队列数据保存时间
cat /home/test/server/kafka/config/producer.properties
# list of brokers used for bootstrapping knowledge about the rest of the cluster# format: host1:port1,host2:port2 ...bootstrap.servers=192.168.0.1:9092,192:168.0.2:9092,192.168.0.3:9092
cat /home/test/server/kafka/config/consumer.properties
# list of brokers used for bootstrapping knowledge about the rest of the cluster# format: host1:port1,host2:port2 ...bootstrap.servers=192.168.0.1:9092,192:168.0.2:9092,192.168.0.3:9092

五、启动kafka

cd /home/test/server/kafka
bin/kafka-server-start.sh -daemon config/server.properties

启动完成后可用通过ps -ef检查zookeeper、kafka是否正常运行。


六、kafka集群管理


前台启动broker

bin/kafka-server-start.sh <path>/server.properties

Ctrl + C 关闭

后台启动broker

bin/kafka-server-start.sh -daemon <path>/server.properties

关闭broker

bin/kafka-server-stop.sh


七、kafkaTopic管理


创建topic

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 3 --replication-factor 3 --topic topicname

删除topic

bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic topicname

查询topic列表

bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

查询topic详情

bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topicname

修改topic

bin/kafka-topics.sh --alter --bootstrap-server localhost:9092 --partitions 6 --topic topicname


八、Kafka Consumer-Groups管理


查询消费者组

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

查询消费者组详情

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group groupname

重设消费者组位移

最早处
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-earliest --execute
最新处
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-latest --execute
某个位置
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-offset 2000 --execute
调整到某个时间之后的最早位移
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-datetime 2019-09-15T00:00:00.000

删除消费者组

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group groupname


九、kafka脚本工具


producer脚本

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicname
参数含义:
--compression-codec lz4  压缩类型
--request-required-acks all acks的值
--timeout 3000  linger.ms的值
--message-send-max-retries 10   retries的值
--max-partition-memory-bytes batch.size值

consumer脚本

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning
指定groupid
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning
--consumer-property group.id=old-consumer-group
指定分区
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning
--partition 0

kafka-run-class脚本

kafka-run-class.sh kafka.tools.ConsoleConsumer   就是 kafka-console-consumer.sh
kafka-run-class.sh kafka.tools.ConsoleProducer   就是 kafka-console-producer.sh

获取topic当前消息数

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic topicname --time -1

--time -1表示最大位移 --time -2表示最早位移

查询_consumer_offsets

bin/kafka-simple-consumer-shell.sh --topic _consumer_offsets --partition 12 --broker-list localhost:9092 --formatter "kafka.coorfinator.GroupMetadataManager\$OffsetsMessageFormatter"


十、MirrorMaker


跨机房灾备工具

bin/kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --whitelist topicA|topicB


相关文章
|
22天前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
57 4
|
1月前
|
消息中间件 监控 数据可视化
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
53 2
|
20天前
|
消息中间件 存储 Prometheus
Kafka集群如何配置高可用性
Kafka集群如何配置高可用性
|
1月前
|
消息中间件 分布式计算 监控
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
65 6
|
3月前
|
消息中间件 Kafka 测试技术
【Kafka揭秘】Leader选举大揭秘!如何打造一个不丢失消息的强大Kafka集群?
【8月更文挑战第24天】Apache Kafka是一款高性能分布式消息系统,利用分区机制支持数据并行处理。每个分区含一个Leader处理所有读写请求,并可有多个副本确保数据安全与容错。关键的Leader选举机制保障了系统的高可用性和数据一致性。选举发生于分区创建、Leader故障或被手动移除时。Kafka提供多种选举策略:内嵌机制自动选择最新数据副本为新Leader;Unclean选举快速恢复服务但可能丢失数据;Delayed Unclean选举则避免短暂故障下的Unclean选举;Preferred选举允许基于性能或地理位置偏好指定特定副本为首选Leader。
78 5
|
3月前
|
消息中间件 监控 Java
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
|
3月前
|
消息中间件 监控 Java
【Kafka节点存活大揭秘】如何让Kafka集群时刻保持“心跳”?探索Broker、Producer和Consumer的生死关头!
【8月更文挑战第24天】在分布式系统如Apache Kafka中,确保节点的健康运行至关重要。Kafka通过Broker、Producer及Consumer间的交互实现这一目标。文章介绍Kafka如何监测节点活性,包括心跳机制、会话超时与故障转移策略。示例Java代码展示了Producer如何通过定期发送心跳维持与Broker的连接。合理配置这些机制能有效保障Kafka集群的稳定与高效运行。
76 2
|
3月前
|
SQL 关系型数据库 MySQL
SQL Server、MySQL、PostgreSQL:主流数据库SQL语法异同比较——深入探讨数据类型、分页查询、表创建与数据插入、函数和索引等关键语法差异,为跨数据库开发提供实用指导
【8月更文挑战第31天】SQL Server、MySQL和PostgreSQL是当今最流行的关系型数据库管理系统,均使用SQL作为查询语言,但在语法和功能实现上存在差异。本文将比较它们在数据类型、分页查询、创建和插入数据以及函数和索引等方面的异同,帮助开发者更好地理解和使用这些数据库。尽管它们共用SQL语言,但每个系统都有独特的语法规则,了解这些差异有助于提升开发效率和项目成功率。
376 0
|
4月前
|
消息中间件 Kafka
kafka 集群环境搭建
kafka 集群环境搭建
69 8
|
3月前
|
SQL 关系型数据库 HIVE
实时计算 Flink版产品使用问题之如何将PostgreSQL数据实时入库Hive并实现断点续传
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。