Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
Kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外Kafka集群有多个Kafka实例组成,每个实例(server)成为broker。无论是Kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。
一、消息队列
1、为什么要有消息队列
2、什么是消息队列
MQ(Message Quene) : 翻译为 消息队列
,通过典型的 生产者和消费者
模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦。别名为 消息中间件
通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
- 消息 Message
网络中的两台计算机或者两个通讯设备之间传递的数据。例如说:文本、音乐、视频等内容。
- 队列 Queue
一种特殊的线性表(数据元素首尾相接),特殊之处在于只允许在首部删除元素和在尾部追加元素(FIFO)。入队、出队。
- 消息队列 MQ
消息+队列,保存消息的队列。消息的传输过程中的容器;主要提供生产、消费接口供外部调用做数据的存储和获取。
3、消息中间件的分类
当今市面上有很多主流的消息中间件,如老牌的 ActiveMQ 、 RabbitMQ ,炙手可热的Kafka ,阿里巴巴自主开发 RocketMQ 等。
Redis 基于Key-Value对的NoSQL数据库,同时支持MQ功能,可做轻量级队列服务使用。就入队操作而言,Redis对短消息(小于10kb)的性能比RabbitMQ好,长消息性能比RabbitMQ差
Kafka/Jafka 高性能跨语言的分布式发布/订阅消息系统,数据持久化,全分布式,同时支持在线和离线处理
RabbitMQ Erlang编写,支持多协议AMQP,XMPP,SMTP,STOMP。支持负载均衡、数据持久化。同时支持Peer-to-Peer和发布/订阅模式。
RabbitMQ前期我已经写过博客:
🍅基础版本:RabbitMQ安装以及消息模型使用攻略
🍅SpringBoot版本:SpringBoot中使用RabbitMQ
4、消息队列的分类
MQ主要分为两类:点对点(p2p),发布订阅(Pub/Sub
)
- Peer-to-Peer 一般基于Pull或者Polling接收数据 发送到队列中的消息被一个而且仅仅一个接收者所接受,即使有多个接收者在同一个队列中侦听同一消息 即支持异步“即发即收”的消息传递方式,也支持同步请求/应答传送方式
- 发布订阅 发布到同一个主题的消息,可被多个订阅者所接收 发布/订阅即可基于Push消费数据,也可基于Pull或者Polling消费数据 解耦能力比P2P模型更强
5、p2p和发布订阅MQ的比较
- 共同点
消息生产者生产消息发送到queue中,然后消息消费者从queue中读取并且消费消息。 - 不同点
1. P2P模型包括:消息队列(Queue),发送者(Sender)、接收者(Recevier) 一个生产者生产的消息只有一个消费者(Consumer)(即一旦被消费,消息就不在消息队列中)。比如说打电话。 2.pub/Sub包含:消息队列(Queue)、主题(Topic)、发布者(Publisher)、订阅者(Subscriber) 每个消息可以有多个消费者,彼此互不影响。比如我发布一个微博:关注我的人都能够看到。
6、消息队列的使用场景
- 解耦 :各系统之间通过消息系统这个统一的接口交换数据,无须了解彼此的存在
- **冗余 **:部分消息系统具有消息持久化能力,可规避消息处理前丢失的风险
- 扩展 :消息系统是统一的数据接口,各系统可独立扩展
- 峰值处理能力 消息系统可顶住峰值流量,业务系统可根据处理能力从消息系统中获取并处理对应量的请求
- **可恢复性 **系统中部分键失效并不会影响整个系统,它恢复会仍然可从消息系统中获取并处理数据
- **异步通信 **在不需要立即处理请求的场景下,可以将请求放入消息系统,合适的时候再处理
二、KafKa简介
1、什么是KafKa
Kafka是分布式的发布——订阅消息系统,它最初由LinkedIn(领英)公司发布,使用Scala语言编写,与2010年12月份开源,成为Apache的顶级项目。Kafka是一个高吞吐量的、持久性的、分布式发布订阅消息系统。它主要用于处理活跃live的数据(登录、浏览、点击、分享、喜欢等用户行为产生的数据)。
三大特点:
- 高吞吐量
可以满足每秒百万级别消息的生产和消费——生产消费。
- 持久性
有一套完善的消息存储机制,确保数据的高效安全的持久化——中间存储。
- 分布式
基于分布式的扩展和容错机制;Kafka的数据都会复制到几台服务器上。当某一台故障失效时,生产者和消费者转而使用其它的机器——整体
- 健壮性
2、设计目标
- 高吞吐率 在廉价的商用机器上单机可支持每秒100万条消息的读写
- 消息持久化 所有消息均被持久化到磁盘,无消息丢失,支持消息重放
- 完全分布式 Producer,Broker,Consumer均支持水平扩展
- 同时适应在线流处理和离线批处理
3、Kafka核心的概念
(1)一个消息队列需要哪些部分呢?
生产、消费、消息类别、存储等等。 对于kafka而言,kafka服务就像是一个大的水池。不断的生产、存储、消费着各种类别的消息。
(2)那么kafka由何组成呢?
- Kafka服务:
1. Topic:主题,Kafka处理的消息的不同分类。 2. Broker:消息服务器代理,Kafka集群中的一个kafka服务节点称为一个broker,主要存储消息数据。存在硬 盘中。每个topic都是有分区的。 3. Partition:Topic物理上的分组(part划分),一个topic在broker中被分为1个或者多个partition,分区在创建topic的时候 指定。 4. Message:消息,是通信的基本单位,每个消息都属于一个partition
Kafka服务相关:
1. Producer:消息和数据的生产者,向Kafka的一个topic发布消息。 2. Consumer:消息和数据的消费者,定于topic并处理其发布的消息。 3. Zookeeper:协调kafka的正常运行。
三、KafKa的分布式安装(部署三台)
1、版本下载
- 安装包:http://archive.apache.org/dist/kafka/1.1.1/kafka_2.11-1.1.1.tgz
- 源码包:http://archive.apache.org/dist/kafka/1.1.1/kafka-1.1.1-src.tgz
2、安装Kafka的相关前提
(1)安装jdk
- 前提:由于Kafka是用Scala语言开发的,运行在JVM上,因此在安装Kafka之前需要先安装JDK。
yum install java-1.8.0-openjdk* -y
(2)安装Zookeeper
- kafka之前的版本依赖zookeeper,所以需要先安装zookeeper,
现在kafka最新版本不在依赖于zk了!!!
但是我们目前的kafka是需要zookeeper的 - centos7安装zookeeper3.4.12集群
- 将某台的zookper移动到另外一台linux上
scp -r /usr/soft/apache-zookeeper-3.6.3/ 192.168.5.130:/usr/soft/
集群配置
dataDir=/usr/soft/apache-zookeeper-3.6.3/data dataLogDir=/usr/soft/apache-zookeeper-3.6.3/log server.1=192.168.5.128:2888:3888 server.2=192.168.5.130:2888:3888 server.3=192.168.5.131:2888:3888
3、安装过程
- 从官网下载一个Kafka稳定版本,这里采用的是Kafka 2.11-1.1.1版本 Apache Kafka
- 解压文件:tar -zxvf kafka_2.11-1.1.1.tgz
- (1)解压
[root@localhost soft]# tar -zxvf kafka_2.11-1.1.1.tgz -C /usr/apps/
2)重命名
mv kafka_2.11-1.1.1 /usr/apps/kafka
(3)添加环境变量(这个文件最初是没有的)
- 参考文章:Linux-使用 /etc/profile.d/ 去配置 Hadoop的环境变量
- 在
/etc/profile.d/
中添加 my_env.sh文件, 此sh文件名称随意噢
vim /etc/profile.d/hadoop-etc.sh
export KAFKA_HOME=/usr/apps/kafka export PATH=$PATH:$KAFKA_HOME/bin
- 更新
source /etc/profile.d/hadoop-etc.sh
(4)配置
- 修改/usr/apps/kafka/config/server.properties
1. vim /usr/apps/kafka/config/server.properties
## 当前kafka实例的id,必须为整数,一个集群中不可重复 broker.id=0 ## 生产到kafka中的数据存储的目录,目录需要手动创建 log.dirs=/usr/apps/data/kafka ## kafka数据在zk中的存储目录 zookeeper.connect=1192.168.5.128:2181 #设置zookeeper的连接端口 zookeeper.connect=192.168.5.128:2181,192.168.5.130:2181,192.168.5.131:2181 #设置zookeeper的连接超时时间 zookeeper.connection.timeout.ms=6000
- 在bin/kafka-server-start.sh文件中,设置服务器可用内存大小,内存不足时,启动会报:error='Cannot allocate memory'
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
分发安装包到另外两个虚拟机
1. scp -r /usr/apps/kafka/ 192.168.5.130:/usr/apps/kafka 2. scp -r /usr/apps/kafka/ 192.168.5.131:/usr/apps/kafka
修改192.168.5.130,192.168.5.131节点kafka配置文件conf/server.properties里面的broker.id和listeners的值分别为去1,2
四、Kafka集群启动
1、执行脚本启动zookeeper
- 所有zookeeper节点都需要执行
zkServer.sh start
- 显示以下信息表示启动正常
2、后台启动Kafka集群服务
- 所有Kafka节点都需要执行
[root@localhost kafka]# ./bin/kafka-server-start.sh config/server.properties &
- 启动成功