架构师之路-如何建立高可用消息中间件kafka

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
性能测试 PTS,5000VUM额度
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
简介: Kafka 一、熟悉kafka l  Server-1 broker其实就是kafka的server,因为producer和consumer都要去连它。Broker主要还是做存储用。

Kafka

一、熟悉kafka






l  Server-1 broker其实就是kafka的server,因为producer和consumer都要去连它。Broker主要还是做存储用。


l  Server-2是zookeeper的server端,zookeeper的具体作用你可以去官网查,在这里你可以先想象,它维持了一张表,记录了各个节点的IP、端口等信息(以后还会讲到,它里面还存了kafka的相关信息)。


l  Server-3、4、5他们的共同之处就是都配置了zkClient,更明确的说,就是运行前必须配置zookeeper的地址,道理也很简单,这之间的连接都是需要zookeeper来进行分发的。


l  Server-1和Server-2的关系,他们可以放在一台机器上,也可以分开放,zookeeper也可以配集群。目的是防止某一台挂了。


简单说下整个系统运行的顺序:

1. 启动zookeeper的server

2. 启动kafka的server

3. Producer如果生产了数据,会先通过zookeeper找到broker,然后将数据存放进broker

4.  Consumer如果要消费数据,会先通过zookeeper找对应的broker,然后消费。

 

Kafka 分布式消息队列 类似产品有JBoss、MQ


一、由Linkedln 开源,使用scala开发,有如下几个特点:

(1)高吞吐

(2)分布式

(3)支持多语言客户端 (C++、Java)


二、组成: 客户端是 producer 和 consumer,提供一些API,服务器端是Broker,客户端提供可以向Broker内发布消息、消费消息,服务器端提供消息的存储等功能

Kafka 特点是支持分区、分布式、可拓展性强


三、Kafka 的消息分几个层次

(1)Topic 一类主题

(2)Partition 默认每个消息有2个分区,创建Topic可以指定分区数,1天有 1亿行可以分8个分区,如果每天几十万行就一个分区吧

(3)Message 是每个消息


四、数据处理流程

1.生产者 生产消息、将消息发布到指定的topic分区

2.kafka 集群接收到producer发过来的消息后,将其持久化到硬盘,可以指定时长,而不关注消息是否被消费

3.consumer从kafka集群pull或push方式,并控制获取消息的offset偏移量,consumer重启时需要根据offset开始再次消费数据,consumer自己维护offset


五、kafka如何实现高吞吐量

1.充分利用磁盘的顺序读写
2.数据批量发送
3.数据压缩
4.Topic划分多个partition


六、kafka 如何实现load balance &HA

1)producer 根据用户指定的算法,将消息发送到指定的partition
2)存在多个partition,每个partition存在多个副本replica,每个replica分布在不同的broker节点上
3)每个partition需要选取lead partition,leader partition负责读写,并由zookeeper负责fail over 快速失败
4)通过zookeeper管理broker与consumer的动态加入与离开


七、扩容

当需要增加broker节点时,新增的broker会向zookeeper注册,而producer及consumer会根据zookeeper上的watcher感知这些变化,并及时作出调整

 

副本分配逻辑规则如下:

  • 在Kafka集群中,每个Broker都有均等分配Partition的Leader机会。

  • 上述图Broker      Partition中,箭头指向为副本,以Partition-0为例:broker1中parition-0为Leader,Broker2中Partition-0为副本。

  • 上述图种每个Broker(按照BrokerId有序)依次分配主Partition,下一个Broker为副本,如此循环迭代分配,多副本都遵循此规则。

 

副本分配算法如下:

  • 将所有N      Broker和待分配的i个Partition排序.

  • 将第i个Partition分配到第(i mod      n)个Broker上.

  • 将第i个Partition的第j个副本分配到第((i +      j) mod n)个Broker上.

 

二、安装zookeeper,并配置集群

准备三台机器做集群

服务器

IP地址

端口

服务器1

172.16.0.41

2181/2881/3881

服务器2

172.16.0.42

2182/2882/3882

服务器3

172.16.0.43

2183/2883/3883


2.1配置java环境

将jdk-7u79-linux-x64上传到三台服务器安装配置。

给三台服务器分别创建java文件夹。

将jdk 放到java文件夹下并解压,然后删掉压缩文件。

配置jdk全局变量。

#vi /etc/profile

export JAVA_HOME=/usr/local/java/jdk1.7.0_79

export JRE_HOME=/usr/local/java/jdk1.7.0_79/jre

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib:$CLASSPATH

export PATH=$JAVA_HOME/bin:$PATH


2.2 修改操作系统的/etc/hosts文件,添加IP与主机名映射:

   # zookeeper cluster servers

172.16.0.41 edu-zk-01

172.16.0.42 edu-zk-02

172.16.0.43 edu-zk-03


2.3下载zookeeper-3.4.7.tar.gz 到/home/zy/zookeeper目录

# mkdir -p /usr/local/zookeeper

# cd / usr/local/zookeeper/

# wget http://apache.fayea.com/zookeeper/zookeeper-3.4.7/zookeeper-3.4.7.tar.gz


2.4 解压zookeeper安装包,并对节点重民名

#tar -zxvf zookeeper-3.4.7.tar.gz

服务器1:

#mv zookeeper-3.4.7 node-01

服务器2:
    #mv zookeeper-3.4.7 node-02

服务器3:

#mv zookeeper-3.4.7 node-03


2.5 在zookeeper的各个节点下 创建数据和日志目录

#cd /usr/local/zookeeper

#mkdir data

#mkdir logs


2.6 重命名配置文件

    将zookeeper/node-0X/conf目录下的zoo_sample.cfg文件拷贝一份,命名为zoo.cfg:

#cp zoo_sample.cfg zoo.cfg


2.7 修改zoo.cfg 配置文件

三台服务器做同样配置:zookeeper/node-01的配置(/usr/local/zookeeper/node-01/conf/zoo.cfg)如下:




参数说明:

tickTime=2000

tickTime这个时间是作为Zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个tickTime时间就会发送一个心跳。


initLimit=10

initLimit这个配置项是用来配置Zookeeper接受客户端(这里所说的客户端不是用户连接Zookeeper服务器的客户端,而是Zookeeper服务器集群中连接到Leader的Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过10个心跳的时间(也就是tickTime)长度后Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是10*2000=20 秒。


syncLimit=5

syncLimit这个配置项标识Leader与Follower之间发送消息,请求和应答时间长度,最长不能超过多少个tickTime的时间长度,总的时间长度就是5*2000=10秒。

dataDir=/usr/local/zookeeper/node-01/data

dataDir顾名思义就是Zookeeper保存数据的目录,默认情况下Zookeeper将写数据的日志文件也保存在这个目录里。

clientPort=2181

clientPort这个端口就是客户端(应用程序)连接Zookeeper服务器的端口,Zookeeper会监听这个端口接受客户端的访问请求。

server.A=B:C:D

server.1=edu-zk-01:2881:3881

server.2=edu-zk-02:2882:3882

server.3=edu-zk-03:2883:3883

A是一个数字,表示这个是第几号服务器;

B是这个服务器的IP地址(或者是与IP地址做了映射的主机名);

C第一个端口用来集群成员的信息交换,表示这个服务器与集群中的Leader服务器交换信息的端口;

D是在leader挂掉时专门用来进行选举leader所用的端口。

注意:如果是伪集群的配置方式,不同的 Zookeeper 实例通信端口号不能一样,所以要给它们分配不同的端口号。


2.8 创建myid文件

在dataDir=/usr/local/zookeeper/node-0X/data 下创建myid文件

编辑myid文件,并在对应的IP的机器上输入对应的编号。如在node-01上,myid文件内容就是1,node-02上就是2,node-03上就是3:

#vi /usr/local/zookeeper/node-01/data/myid## 值为1

#vi /usr/local/zookeeper/node-02/data/myid## 值为2

#vi /usr/local/zookeeper/node-03/data/myid## 值为3


2.9 启动测试zookeeper

(1)进入/usr/local/zookeeper/node-0X/bin目录下执行:

#/usr/local/zookeeper/node-01/bin/zkServer.sh start

#/usr/local/zookeeper/node-02/bin/zkServer.sh start

#/usr/local/zookeeper/node-03/bin/zkServer.sh start




(2)输入jps命令查看进程:




其中,QuorumPeerMain是zookeeper进程,说明启动正常

(3)查看状态:

   # /usr/local/zookeeper/node-01/bin/zkServer.sh status




(4)查看zookeeper服务输出信息:

 由于服务信息输出文件在/usr/local/zookeeper/node-0X/bin/zookeeper.out

$ tail -500f zookeeper.out




三、KAFKA集群配置

利用安装zookeeper的三台服务器做KAFKA集群,也可以新建三个虚拟机去操作。

服务器

IP地址

端口

服务器1

172.16.0.41

9092

服务器2

172.16.0.42

9092

服务器3

172.16.0.43

9092

 

4.1 下载 kafka_2.9.2-0.8.1

分别在三台服务器创建kafka目录并且下载kafka压缩包

#mkdir /usr/local/kafka

#tar –zxvf kafka_2.9.2-0.8.1.tar.gz


4.2 创建log文件夹

#mkdir /usr/local/kafka/kafkalogs


4.3 配置kafka

#cd /usr/local/kafka/kafka_2.9.2-0.8.1/config

#vi server.properties  修改项如下:

broker.id=0      //当前机器在集群中的唯一标识

port=9092       //kafka对外提供服务的tcp端口

host.name=172.16.0.41    //主机IP地址

log.dirs=/usr/local/kafka/kafkalogs    //log存放目录

message.max.byte=5048576     //kafka一条消息容纳的消息最大为多少

default.replication.factor=2   //每个分区默认副本数量

replica.fetch.max.bytes=5048576  

zookeeper.connect=172.16.0.41:2181,172.16.0.42:2182,172.16.0.43:2183


4.4 启动kafka

# ./kafka-server-start.sh  -daemon ../config/server.properties   //后台启动运行


4.5 问题解决

[root@master ~]#  /export/kafka/bin/kafka-console-producer.sh  --broker-list 10.14.2.201:9092,10.14.2.202:9092,10.14.2.203:9092,10.14.2.204:9092    --topic test

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".

 SLF4J: Defaulting to no-operation (NOP) logger implementation

 SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

 

 # /export/kafka/bin/kafka-console-consumer.sh  --zookeeper   10.14.2.201:2181,10.14.2.202:2181,10.14.2.203:2181,10.14.2.204:2181  --topic test --from-beginning

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".

 SLF4J: Defaulting to no-operation (NOP) logger implementation

 SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.


解决方法:

 下载slf4j-1.7.6.zip

 http://www.slf4j.org/dist/slf4j-1.7.6.zip

 

解压

 unzip slf4j-1.7.6.zip

 把slf4j-nop-1.7.6.jar 包复制到kafka libs目录下面

 cd  slf4j-1.7.6

 cp slf4j-nop-1.7.6.jar  /export/kafka/libs/


四、KAFKA集群验证

5.1 创建topic

#./kafka-topics.sh --create --zookeeper 172.16.0.42:2182 --replication-factor 1 --partitions 1 --topic test


5.2 查看topic

# ./kafka-topics.sh --list --zookeeper 172.16.0.42:2182


5.3 开启发送者并发送消息

#./kafka-console-producer.sh --broker-list 172.16.0.41:9092 --topic test




5.4 开启消费者并接收消息

#./kafka-console-consumer.sh --zookeeper 172.16.0.42:2182 --topic test --from-beginning




更多参考内容:http://www.roncoo.com/article/index

相关文章
|
2月前
|
存储 SQL 关系型数据库
Mysql高可用架构方案
本文阐述了Mysql高可用架构方案,介绍了 主从模式,MHA模式,MMM模式,MGR模式 方案的实现方式,没有哪个方案是完美的,开发人员在选择何种方案应用到项目中也没有标准答案,合适的才是最好的。
249 3
Mysql高可用架构方案
|
3月前
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
1月前
|
消息中间件 缓存 监控
go高并发之路——消息中间件kafka
本文介绍了高并发业务中的流量高峰应对措施,重点讲解了Kafka消息中间件的使用,包括常用的Go语言库sarama及其版本问题,以及Kafka的版本选择建议。文中还详细解释了Kafka生产者的四种分区策略:轮询、随机、按Key和指定分区,并提供了相应的代码示例。
go高并发之路——消息中间件kafka
|
5月前
|
存储 Cloud Native 关系型数据库
PolarDB 高可用架构设计与实践
【8月更文第27天】 在现代互联网应用中,数据库作为核心的数据存储层,其稳定性和可靠性尤为重要。阿里云的 PolarDB 作为一款云原生的关系型数据库服务,提供了高可用、高性能和自动化的特性,适用于各种规模的应用。本文将详细介绍 PolarDB 的高可用架构设计,并探讨其实现数据安全性和业务连续性的关键技术。
141 0
|
2月前
|
Kubernetes 关系型数据库 MySQL
Kubernetes入门:搭建高可用微服务架构
【10月更文挑战第25天】在快速发展的云计算时代,微服务架构因其灵活性和可扩展性备受青睐。本文通过一个案例分析,展示了如何使用Kubernetes将传统Java Web应用迁移到Kubernetes平台并改造成微服务架构。通过定义Kubernetes服务、创建MySQL的Deployment/RC、改造Web应用以及部署Web应用,最终实现了高可用的微服务架构。Kubernetes不仅提供了服务发现和负载均衡的能力,还通过各种资源管理工具,提升了系统的可扩展性和容错性。
162 3
|
3月前
|
消息中间件 中间件 Kafka
解锁Kafka等消息队列中间件的测试之道
在这个数字化时代,分布式系统和消息队列中间件(如Kafka、RabbitMQ)已成为日常工作的核心组件。本次公开课由前字节跳动资深专家KK老师主讲,深入解析消息队列的基本原理、架构及测试要点,涵盖功能、性能、可靠性、安全性和兼容性测试,并探讨其主要应用场景,如应用解耦、异步处理和限流削峰。课程最后设有互动答疑环节,助你全面掌握消息队列的测试方法。
|
3月前
|
消息中间件 存储 Kafka
面试题:Kafka如何保证高可用?有图有真相
面试题:Kafka如何保证高可用?有图有真相
|
5月前
|
运维 监控 关系型数据库
【一文搞懂PGSQL】7. PostgreSQL + repmgr + witness 高可用架构
该文档介绍了如何构建基于PostgreSQL的高可用架构,利用repmgr进行集群管理和故障转移,并引入witness节点增强网络故障检测能力。repmgr是一款轻量级的开源工具,支持一键部署、自动故障转移及分布式节点管理。文档详细描述了环境搭建步骤,包括配置postgresql参数、安装与配置repmgr、注册集群节点以及配置witness节点等。此外,还提供了故障手动与自动切换的方法及常用命令,确保集群稳定运行。
|
5月前
|
JSON API 网络架构
Django 后端架构开发:DRF 高可用API设计与核心源码剖析
Django 后端架构开发:DRF 高可用API设计与核心源码剖析
119 1
|
5月前
|
监控 安全 中间件
Python Django 后端架构开发: 中间件架构设计
Python Django 后端架构开发: 中间件架构设计
58 1

相关产品

  • 云消息队列 Kafka 版