TiDB实时同步数据到PostgreSQL(一) ---- 搭建kafka集群

本文涉及的产品
PolarClaw,2核4GB
简介: TiDB实时同步数据到PostgreSQL的第一篇,主要介绍kafka集群的搭建。

业务系统早期采用LNMP架构,随着业务发展,数据量逐渐增加,主要业务表数据数据量均已过亿,加之产品设计原因,无法界定什么数据是历史数据,无法做冷热分离;业务又比较复杂,经常需要多张大表join查询;由于一些业务的特殊性以及架构原因,分库分表也很困难。几年前将数据库从MySQL迁移到TiDB,由于使用云服务器,又因为种种原因无法大量增加硬件,经常出现个别大用户一个大查询导致数据库雪崩,虽然可以通过设置一些参数限制SQL执行时间,但这样又导致一些大用户连统计分析业务都无法正常完成。也考虑过使用TiFlash,但经过测试,同等硬件、同样的数据,对于我们大部分业务查询,PostgreSQL的性能更胜一筹,本着适合自己的才是最好的这一原则,最终选择将TiDB的数据实时同步到PostgreSQL,TiDB完成业务,由PostgreSQL完成财务、统计分析等查询业务。

TiDB还是非常不错的,通过TiCDC结合canal-json协议将数据库变更数据同步到kafka,再通过自行开发的同步软件消费kafka数据将数据同步到PostgreSQL中。所以万里长征的第一步就是要搭建kafka集群,曾经也想偷懒整个单机kafka,但发现单机kafka极其不稳定,经常莫明其妙就崩了,生产环境中是绝对不能容忍这种情况发生的,所以一定要集群!!!

这里使用三台服务器(192.168.0.1、192.168.0.2、192.168.0.3),分别安装zookeeper跟kafka,有条件的话建议zookeeper与kafka分开安装。这里使用kafka 2.7.0以及kafka自带的zookeeper 3.5.8为例说明kafka集群的搭建过程。

一、下载及解压kafka

(本例假设将kafka安装到/home/test/server/kafka目录下)

wget https://archive.apache.org/dist/kafka/2.7.0/kafka_2.13-2.7.0.tgz
tar zxvf kafka_2.13-2.7.0.tgz

二、配置zookeeper

cd /home/test/server/kafka/conf
cat zookeeper.properties
# the directory where the snapshot is stored.dataDir=/tmp/zookeeper/data
dataLogDir=/tmp/zookeeper/log
# the port at which the clients will connectclientPort=2181# disable the per-ip limit on the number of connections since this is a non-production configmaxClientCnxns=0# Disable the adminserver by default to avoid port conflicts.# Set the port to something non-conflicting if choosing to enable thisadmin.enableServer=false# admin.serverPort=8080tickTime=2000initLimit=10syncLimit=5server.0=192.168.0.1:2888:3888
server.1=192.168.0.2:2888:3888
server.2=192.168.0.3:2888:3888

这里主要设置dataDir、dataLogDir、clientPort、server.0、server.1、server.2几个选项,其他的先用默认值。其余两台服务器的设置相同。

然后分别在三台服务器的/tmp/zookeeper/data目录下创建myid文件做为节点标识(分别对应配置文件中的server.0、server.1、server.2):

echo"0" > /tmp/zookeeper/data/myid #192.168.0.1echo"1" > /tmp/zookeeper/data/myid #192.168.0.2echo"2" > /tmp/zookeeper/data/myid #192.168.0.3

三、启动zookeeper

cd /home/test/server/kafka
#先不带-daemon参数直接启动,观察控制台输出是否有错,服务是否正常启动bin/zookeeper-server-start.sh config/zookeeper.properties
#正常启动无误后,可使用如下命令bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

四、配置kafka

kafka的配置文件有三个,分别是:producer.properties、consumer.properties、server.properties

cat /home/test/server/kafka/config/server.properties
# The id of the broker. This must be set to a unique integer for each broker.broker.id=0#三台服务器的id值不能想同,分别取0、1、2# The address the socket server listens on. It will get the value returned from# java.net.InetAddress.getCanonicalHostName() if not configured.#   FORMAT:#     listeners = listener_name://host_name:port#   EXAMPLE:#     listeners = PLAINTEXT://your.host.name:9092listeners=PLAINTEXT://192.168.0.1:9092  #本机ip地址# A comma separated list of directories under which to store log fileslog.dirs=/tmp/kafka-logs  #消息队列数据会保存在这里,这里偷懒放在/tmp目录下,生产环境可别这样做!!!# The minimum age of a log file to be eligible for deletion due to agelog.retention.hours=168#这个值是队列数据保存时间
cat /home/test/server/kafka/config/producer.properties
# list of brokers used for bootstrapping knowledge about the rest of the cluster# format: host1:port1,host2:port2 ...bootstrap.servers=192.168.0.1:9092,192:168.0.2:9092,192.168.0.3:9092
cat /home/test/server/kafka/config/consumer.properties
# list of brokers used for bootstrapping knowledge about the rest of the cluster# format: host1:port1,host2:port2 ...bootstrap.servers=192.168.0.1:9092,192:168.0.2:9092,192.168.0.3:9092

五、启动kafka

cd /home/test/server/kafka
bin/kafka-server-start.sh -daemon config/server.properties

启动完成后可用通过ps -ef检查zookeeper、kafka是否正常运行。


六、kafka集群管理


前台启动broker

bin/kafka-server-start.sh <path>/server.properties

Ctrl + C 关闭

后台启动broker

bin/kafka-server-start.sh -daemon <path>/server.properties

关闭broker

bin/kafka-server-stop.sh


七、kafkaTopic管理


创建topic

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 3 --replication-factor 3 --topic topicname

删除topic

bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic topicname

查询topic列表

bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

查询topic详情

bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topicname

修改topic

bin/kafka-topics.sh --alter --bootstrap-server localhost:9092 --partitions 6 --topic topicname


八、Kafka Consumer-Groups管理


查询消费者组

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

查询消费者组详情

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group groupname

重设消费者组位移

最早处
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-earliest --execute
最新处
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-latest --execute
某个位置
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-offset 2000 --execute
调整到某个时间之后的最早位移
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-datetime 2019-09-15T00:00:00.000

删除消费者组

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group groupname


九、kafka脚本工具


producer脚本

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicname
参数含义:
--compression-codec lz4  压缩类型
--request-required-acks all acks的值
--timeout 3000  linger.ms的值
--message-send-max-retries 10   retries的值
--max-partition-memory-bytes batch.size值

consumer脚本

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning
指定groupid
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning
--consumer-property group.id=old-consumer-group
指定分区
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning
--partition 0

kafka-run-class脚本

kafka-run-class.sh kafka.tools.ConsoleConsumer   就是 kafka-console-consumer.sh
kafka-run-class.sh kafka.tools.ConsoleProducer   就是 kafka-console-producer.sh

获取topic当前消息数

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic topicname --time -1

--time -1表示最大位移 --time -2表示最早位移

查询_consumer_offsets

bin/kafka-simple-consumer-shell.sh --topic _consumer_offsets --partition 12 --broker-list localhost:9092 --formatter "kafka.coorfinator.GroupMetadataManager\$OffsetsMessageFormatter"


十、MirrorMaker


跨机房灾备工具

bin/kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --whitelist topicA|topicB


相关文章
|
7月前
|
安全 Oracle 关系型数据库
【赵渝强老师】基于PostgreSQL的MPP集群:Greenplum
Greenplum是基于PostgreSQL的MPP架构分布式数据库,由Master、Segment和Interconnect组成,支持海量数据并行处理。本文介绍其架构及集群安装配置全过程。
621 1
|
SQL 关系型数据库 PostgreSQL
【YashanDB 知识库】从 PostgreSQL 迁移到 YashanDB 如何进行数据行数比对
【YashanDB 知识库】从 PostgreSQL 迁移到 YashanDB 如何进行数据行数比对
|
SQL Oracle 关系型数据库
【YashanDB知识库】从PostgreSQL迁移到YashanDB如何进行数据行数比对
本文介绍了通过Oracle视图`v$sql`和`v$sql_plan`分析SQL性能的方法。首先,可通过`plan_hash_value`从`v$sql_plan`获取SQL执行计划,结合示例展示了具体查询方式。文章还创建了一个UDF函数`REPEAT`用于格式化输出,便于阅读复杂执行计划。最后,通过实例展示了如何根据`plan_hash_value`获取SQL文本及其内存中的执行计划,帮助优化性能问题。
|
存储 关系型数据库 数据库
【赵渝强老师】PostgreSQL的数据文件
PostgreSQL的物理存储结构主要包括数据文件、日志文件等。数据文件按oid命名,超过1G时自动拆分。通过查询数据库和表的oid,可定位到具体的数据文件。例如,查询数据库oid后,再查询特定表的oid及relfilenode,即可找到该表对应的数据文件位置。
391 1
|
存储 关系型数据库 数据库
【赵渝强老师】PostgreSQL的数据库集群
PostgreSQL的逻辑存储结构涵盖了数据库集群、数据库、表、索引、视图等对象,每个对象都有唯一的oid标识。数据库集群是由单个PostgreSQL实例管理的所有数据库集合,共享同一配置和资源。集群的数据存储在一个称为数据目录的单一目录中,可通过-D选项或PGDATA环境变量指定。
329 3
|
SQL 关系型数据库 MySQL
SQL Server、MySQL、PostgreSQL:主流数据库SQL语法异同比较——深入探讨数据类型、分页查询、表创建与数据插入、函数和索引等关键语法差异,为跨数据库开发提供实用指导
【8月更文挑战第31天】SQL Server、MySQL和PostgreSQL是当今最流行的关系型数据库管理系统,均使用SQL作为查询语言,但在语法和功能实现上存在差异。本文将比较它们在数据类型、分页查询、创建和插入数据以及函数和索引等方面的异同,帮助开发者更好地理解和使用这些数据库。尽管它们共用SQL语言,但每个系统都有独特的语法规则,了解这些差异有助于提升开发效率和项目成功率。
2124 0
|
SQL 关系型数据库 HIVE
实时计算 Flink版产品使用问题之如何将PostgreSQL数据实时入库Hive并实现断点续传
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
604 1

推荐镜像

更多