业务系统早期采用LNMP架构,随着业务发展,数据量逐渐增加,主要业务表数据数据量均已过亿,加之产品设计原因,无法界定什么数据是历史数据,无法做冷热分离;业务又比较复杂,经常需要多张大表join查询;由于一些业务的特殊性以及架构原因,分库分表也很困难。几年前将数据库从MySQL迁移到TiDB,由于使用云服务器,又因为种种原因无法大量增加硬件,经常出现个别大用户一个大查询导致数据库雪崩,虽然可以通过设置一些参数限制SQL执行时间,但这样又导致一些大用户连统计分析业务都无法正常完成。也考虑过使用TiFlash,但经过测试,同等硬件、同样的数据,对于我们大部分业务查询,PostgreSQL的性能更胜一筹,本着适合自己的才是最好的这一原则,最终选择将TiDB的数据实时同步到PostgreSQL,TiDB完成业务,由PostgreSQL完成财务、统计分析等查询业务。
TiDB还是非常不错的,通过TiCDC结合canal-json协议将数据库变更数据同步到kafka,再通过自行开发的同步软件消费kafka数据将数据同步到PostgreSQL中。所以万里长征的第一步就是要搭建kafka集群,曾经也想偷懒整个单机kafka,但发现单机kafka极其不稳定,经常莫明其妙就崩了,生产环境中是绝对不能容忍这种情况发生的,所以一定要集群!!!
这里使用三台服务器(192.168.0.1、192.168.0.2、192.168.0.3),分别安装zookeeper跟kafka,有条件的话建议zookeeper与kafka分开安装。这里使用kafka 2.7.0以及kafka自带的zookeeper 3.5.8为例说明kafka集群的搭建过程。
一、下载及解压kafka
(本例假设将kafka安装到/home/test/server/kafka目录下)
wget https://archive.apache.org/dist/kafka/2.7.0/kafka_2.13-2.7.0.tgz tar zxvf kafka_2.13-2.7.0.tgz
二、配置zookeeper
cd /home/test/server/kafka/conf cat zookeeper.properties # the directory where the snapshot is stored.dataDir=/tmp/zookeeper/data dataLogDir=/tmp/zookeeper/log # the port at which the clients will connectclientPort=2181# disable the per-ip limit on the number of connections since this is a non-production configmaxClientCnxns=0# Disable the adminserver by default to avoid port conflicts.# Set the port to something non-conflicting if choosing to enable thisadmin.enableServer=false# admin.serverPort=8080tickTime=2000initLimit=10syncLimit=5server.0=192.168.0.1:2888:3888 server.1=192.168.0.2:2888:3888 server.2=192.168.0.3:2888:3888
这里主要设置dataDir、dataLogDir、clientPort、server.0、server.1、server.2几个选项,其他的先用默认值。其余两台服务器的设置相同。
然后分别在三台服务器的/tmp/zookeeper/data目录下创建myid文件做为节点标识(分别对应配置文件中的server.0、server.1、server.2):
echo"0" > /tmp/zookeeper/data/myid #192.168.0.1echo"1" > /tmp/zookeeper/data/myid #192.168.0.2echo"2" > /tmp/zookeeper/data/myid #192.168.0.3
三、启动zookeeper
cd /home/test/server/kafka #先不带-daemon参数直接启动,观察控制台输出是否有错,服务是否正常启动bin/zookeeper-server-start.sh config/zookeeper.properties #正常启动无误后,可使用如下命令bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
四、配置kafka
kafka的配置文件有三个,分别是:producer.properties、consumer.properties、server.properties
cat /home/test/server/kafka/config/server.properties # The id of the broker. This must be set to a unique integer for each broker.broker.id=0#三台服务器的id值不能想同,分别取0、1、2# The address the socket server listens on. It will get the value returned from# java.net.InetAddress.getCanonicalHostName() if not configured.# FORMAT:# listeners = listener_name://host_name:port# EXAMPLE:# listeners = PLAINTEXT://your.host.name:9092listeners=PLAINTEXT://192.168.0.1:9092 #本机ip地址# A comma separated list of directories under which to store log fileslog.dirs=/tmp/kafka-logs #消息队列数据会保存在这里,这里偷懒放在/tmp目录下,生产环境可别这样做!!!# The minimum age of a log file to be eligible for deletion due to agelog.retention.hours=168#这个值是队列数据保存时间
cat /home/test/server/kafka/config/producer.properties # list of brokers used for bootstrapping knowledge about the rest of the cluster# format: host1:port1,host2:port2 ...bootstrap.servers=192.168.0.1:9092,192:168.0.2:9092,192.168.0.3:9092
cat /home/test/server/kafka/config/consumer.properties # list of brokers used for bootstrapping knowledge about the rest of the cluster# format: host1:port1,host2:port2 ...bootstrap.servers=192.168.0.1:9092,192:168.0.2:9092,192.168.0.3:9092
五、启动kafka
cd /home/test/server/kafka bin/kafka-server-start.sh -daemon config/server.properties
启动完成后可用通过ps -ef检查zookeeper、kafka是否正常运行。
六、kafka集群管理
前台启动broker
bin/kafka-server-start.sh <path>/server.properties
Ctrl + C 关闭
后台启动broker
bin/kafka-server-start.sh -daemon <path>/server.properties
关闭broker
bin/kafka-server-stop.sh
七、kafkaTopic管理
创建topic
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 3 --replication-factor 3 --topic topicname
删除topic
bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic topicname
查询topic列表
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
查询topic详情
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topicname
修改topic
bin/kafka-topics.sh --alter --bootstrap-server localhost:9092 --partitions 6 --topic topicname
八、Kafka Consumer-Groups管理
查询消费者组
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
查询消费者组详情
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group groupname
重设消费者组位移
最早处 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-earliest --execute 最新处 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-latest --execute 某个位置 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-offset 2000 --execute 调整到某个时间之后的最早位移 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-datetime 2019-09-15T00:00:00.000
删除消费者组
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group groupname
九、kafka脚本工具
producer脚本
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicname 参数含义: --compression-codec lz4 压缩类型 --request-required-acks all acks的值 --timeout 3000 linger.ms的值 --message-send-max-retries 10 retries的值 --max-partition-memory-bytes batch.size值
consumer脚本
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning 指定groupid bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning --consumer-property group.id=old-consumer-group 指定分区 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning --partition 0
kafka-run-class脚本
kafka-run-class.sh kafka.tools.ConsoleConsumer 就是 kafka-console-consumer.sh kafka-run-class.sh kafka.tools.ConsoleProducer 就是 kafka-console-producer.sh
获取topic当前消息数
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic topicname --time -1
--time -1表示最大位移 --time -2表示最早位移
查询_consumer_offsets
bin/kafka-simple-consumer-shell.sh --topic _consumer_offsets --partition 12 --broker-list localhost:9092 --formatter "kafka.coorfinator.GroupMetadataManager\$OffsetsMessageFormatter"
十、MirrorMaker
跨机房灾备工具
bin/kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --whitelist topicA|topicB