应用场景
按照搭建hadoop完全分布式集群博文搭建完hadoop集群后,发现hadoop完全分布式集群自带了HDFS,MapReduce,Yarn等基本的服务,一些其他的服务组件需要自己重新安装,比如Hive,Hbase,sqoop,zookeeper,spark等,这些组件集群模式都在前面相关博文中有介绍,今天我们需要安装另外一个组件,它就是分布式消息系统Kafka。
Kafka介绍
Kafka是由LinkedIn开发的一个分布式基于发布/订阅的消息系统,使用Scala编写,它以可水平扩展和高吞吐率而被广泛使用。
Kafka是什么?举个例子,生产者消费者,生产者生产鸡蛋,消费者消费鸡蛋,生产者生产一个鸡蛋,消费者就消费一个鸡蛋,假设消费者消费鸡蛋的时候噎住了(系统宕机了),生产者还在生产鸡蛋,那新生产的鸡蛋就丢失了。再比如生产者很强劲(大交易量的情况),生产者1秒钟生产100个鸡蛋,消费者1秒钟只能吃50个鸡蛋,那要不了一会,消费者就吃不消了(消息堵塞,最终导致系统超时),消费者拒绝再吃了,”鸡蛋“又丢失了,这个时候我们放个篮子在它们中间,生产出来的鸡蛋都放到篮子里,消费者去篮子里拿鸡蛋,这样鸡蛋就不会丢失了,都在篮子里,而这个篮子就是”kafka“。鸡蛋其实就是“数据流”,系统之间的交互都是通过“数据流”来传输的(就是tcp、http什么的),也称为报文,也叫“消息”。消息队列满了,其实就是篮子满了,”鸡蛋“放不下了,那赶紧多放几个篮子,其实就是kafka的扩容。
就类似微博,有人发布消息,有人消费消息,这就是一个Kafka的场景。
Kafka和其他主流分布式消息系统的对比
- 定义:
-
1、Java 和 scala都是运行在JVM上的语言。
2、erlang和最近比较火的和go语言一样是从代码级别就支持高并发的一种语言,所以RabbitMQ天生就有很高的并发性能,但是 有RabbitMQ严格按照AMQP进行实现,受到了很多限制。kafka的设计目标是高吞吐量,所以kafka自己设计了一套高性能但是不通用的协议,他也是仿照AMQP( Advanced Message Queuing Protocol 高级消息队列协议)设计的。
3、事物的概念:在数据库中,多个操作一起提交,要么操作全部成功,要么全部失败。举个例子, 在转账的时候付款和收款,就是一个事物的例子,你给一个人转账,你转成功,并且对方正常行收到款项后,这个操作才算成功,有一方失败,那么这个操作就是失败的。
对应消在息队列中,就是多条消息一起发送,要么全部成功,要么全部失败。3个中只有ActiveMQ支持,这个是因为,RabbitMQ和Kafka为了更高的性能,而放弃了对事物的支持 。
4、集群:多台服务器组成的整体叫做集群,这个整体对生产者和消费者来说,是透明的。其实对消费系统组成的集群添加一台服务器减少一台服务器对生产者和消费者都是无感之的。
5、负载均衡,对消息系统来说负载均衡是大量的生产者和消费者向消息系统发出请求消息,系统必须均衡这些请求使得每一台服务器的请求达到平衡,而不是大量的请求,落到某一台或几台,使得这几台服务器高负荷或超负荷工作,严重情况下会停止服务或宕机。
6、动态扩容是很多公司要求的技术之一,不支持动态扩容就意味着停止服务,这对很多公司来说是不可以接受的。
操作步骤
1. 安装Kafka之前需要安装zookeeper集群
2. 下载Kafka安装压缩包
Kafka2.11安装包下载地址,下载完毕后,上传到主节点的/opt目录下
Kafka其他版本安装包下载地址
3. 解压Kafka并更换目录名
# cd /opt
# tar -xzvf kafka_2.11-0.10.2.1.tgz
# mv kafka_2.11-0.10.2.1 kafka2.11
# chmod 777 -R /opt/kafka2.11 #为kafka目录进行授权
4. 配置环境变量
# vim /etc/profile
export KAFKA_HOME=/opt/kafka2.11
export PATH=$PATH:$KAFKA_HOME/bin #在最后添加这两行Kafka的配置
# source /etc/profile #使配置生效
5. 修改Kafka的server.properties配置
# cd /opt/kafka2.11/config
# vim server.properties
修改如下:
broker.id=1 #每台服务器的broker.id都不能相同
message.max.byte=5242880 #在log.retention.hours=168属性下加上如下三行配置
default.replication.factor=3
replica.fetch.max.bytes=5242880
zookeeper.connect=192.168.210.70:2181,192.168.210.71:2181,192.168.210.72:2181 #修改zookeeper引用外部的zookeeper
6. 将上述在其余节点也布置一下
# cd /opt
# scp -r kafka2.11 root@hadoop1:/opt/
# scp -r kafka2.11 root@hadoop2:/opt/
注意:修改环境变量,修改配置文件的broker.id
7. 启动服务
#从后台启动Kafka集群(3台都需要启动)
# cd /opt/kafka2.11/bin #进入到kafka的bin目录
# ./kafka-server-start.sh -daemon ../config/server.properties
主节点:
两个从节点: