KAFKA集群搭建

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介:

一、简介  

Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统,使用Scala编写,它以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如Cloudera、Apache Storm、Spark都支持与Kafka集成。

  Kafka适合做什么? 官方文档介绍,它通常被使用在两大类应用中:

  1. 搭建实时数据流管道,在系统或应用之间可靠的获取数据

  2. 搭建对数据流进行转换或相应的实时流应用程序、

  为了了解Kafka具体如何实现这些功能, 首先理解几个概念:

  1. Kafka是作为集群,运行在一台或多台服务器上的.

  2. Kafka集群用主题(topics)来分类别储存数据流(records).

  3. 每个记录(record)由一个键(key),一个值(value)和一个时间戳(timestamp)组成

  Kafka有4个核心APIs:

  1. Producer API负责生产数据流,允许应用程序将记录流发布到一个或多个Kafka主题(topics).

  2. Consumer API负责使用数据流,允许应用程序订阅一个或多个主题并处理为其生成的数据流.

  3. Streams API负责处理或转化数据流,允许应用程序充当数据流处理器的角色, 处理来自一个或多个主题的输入数据流,并产生输出数据流到一个或多个输出主题,一次来有效地将输入流转换成输出流.

  4. Connector API负责将数据流与其他应用或系统结合,允许搭建建和运行可重复使用的生产者或消费者,将Kafka数据主题与现有应用程序或数据系统相连接的。 例如,关系数据库的连接器可能会将表的每个更改的事件,都捕获为一个数据流.

二、环境准备

  kafka集群的搭建是建立在jdk和zookeeper集群环境之上的;文中环境在Ubuntu1404系统上搭建;

1)安装JAVA

1
2
3
4
5
6
7
# add-apt-repository ppa:webupd8team/java 
# apt-get update
# apt-get install oracle-java8-installer 
# java -version //检验Java版本
java version  "1.8.0_111"
Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)

2)安装ZOOKEEPER集群

  zookeeper集群搭建可以参考前一篇博文:zookeeper集群搭建,也可以使用单机版的zookeeper,实际生产环境最好使用集群保证高可用性。

3)机器列表,/etc/hosts文件内容如下:

1
2
3
4
5
6
7
8
9
10
11
172.30.100.1 kafka-001
172.30.100.2 kafka-002
172.30.100.3 kafka-003
172.30.100.4 kafka-004
172.30.100.5 kafka-005
 
172.30.100.21 zookeeper-001
172.30.100.22 zookeeper-002
172.30.100.23 zookeeper-003
172.30.100.24 zookeeper-004
172.30.100.25 zookeeper-005

三、Kafka集群搭建

1)下载相应版本kafka软件包:

  官方0.10.2.0版本下载链接如下:

http://mirrors.hust.edu.cn/apache/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz

这里需要注意kafka的版本,下载之前一定要确保你的client端是否支持当前kafka server的版本,否则就得重新安装。

这里给出官方提供的其他几个下载链接,包括其他版本的都在下面的地址里能够找到。

https://www.apache.org/dyn/closer.cgi#verify

2)配置kafka集群

  下载完之后解压,并配置:

1
2
3
tar  -xzf kafka_2.11-0.10.2.0.tgz
cd  kafka_2.11-0.10.2.0
> vim config /server .properties

kafka的配置文件示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#唯一标识在集群中的ID,要求是正数。
broker. id =1
delete.topic. enable = true
# 监听地址和端口号
listeners=PLAINTEXT: //172 .30.100.1:9092
# 处理网络请求的最大线程数
num.network.threads=9
# 处理磁盘I/O的线程数
num.io.threads=16
# # leader中进行复制的线程数,增大这个数值会增加relipca的IO
num.replica.fetchers=3
#配置log的文件目录,前提确保目录存在
log. dirs = /data/kafka-logs
# 每个topic的分区个数,更多的partition会产生更多的segment file
num.partitions=2
# 配置zookeeper服务的地址
zookeeper.connect=zookeeper-001:2181,zookeeper-002:2181,zookeeper-003:2181,zookeeper-004:2181,zookeeper-005:2181

这里注意的地方:listeners一定要配置成为IP地址;如果配置为localhost或服务器的hostname,在使用java发送数据时就会抛出异 常:org.apache.kafka.common.errors.TimeoutException: Batch Expired 。因为在没有配置advertised.host.name 的情况下,Kafka并没有像官方文档宣称的那样改为广播我们配置的host.name,而是广播了主机配置的hostname。远端的客户端并没有配置 hosts,所以自然是连接不上kafka。

  复制server.properties到其他kafka节点的机器上,并修改broker.id,listeners。

3)服务的启动和停止

  启动服务:

1
bin /kafka-server-start .sh -daemon config /server .properties

注意一定要加-daemon选项,不然终端退出,服务会随之退出。  

  停止服务:

1
  bin /kafka-server-stop .sh

四、Kafka集群测试

1)创建topic:

1
> bin /kafka-topics .sh --create --zookeeper zookeeper-001:2181 --replication-factor 3 --partitions 2 --topic  test

这里的--zookeeper可以随机指定一个zookeeper的地址。

检测topic是否创建成功:

1
2
> bin /kafka-topics .sh --list --zookeeper zookeeper-001:2181
test

查看topic描述信息:

1
2
3
4
5
6
7
  bin /kafka-topics .sh --describe --zookeeper zookeeper-001:2181 --topic  test
Topic:CloudMonitor  PartitionCount:5    ReplicationFactor:3 Configs:
     Topic:  test     Partition: 0   Leader: 2  Replicas: 2,3,4    Isr: 2,3,4
     Topic:  test     Partition: 1   Leader: 3  Replicas: 3,4,5    Isr: 3,4,5
     Topic:  test     Partition: 2   Leader: 4  Replicas: 4,1,2    Isr: 4,1,2
     Topic:  test     Partition: 3   Leader: 5  Replicas: 5,2,3    Isr: 5,3,2
     Topic:  test     Partition: 4   Leader: 1  Replicas: 1,3,4    Isr: 1,3,4

2)生产消息:

1
2
3
> bin /kafka-console-producer .sh --broker-list localhost:9092 --topic  test
This is a message
This is another message

3)消费消息:

1
2
3
> bin /kafka-console-consumer .sh --bootstrap-server localhost:9092 --topic  test  --from-beginning
This is a message
This is another message

会看到刚才生产的两条messsage。

结尾:

  结尾附上几个常用Message Queue的对比,摘自网络:

RabbitMQ

  RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量级,更适合于企业级的开发。同时实现了Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。

Redis

  Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。

ZeroMQ

  ZeroMQ号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZeroMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够

应用成功的挑战。ZeroMQ具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演这个服务器角色。你只需要简单的引用ZeroMQ程序库,可以使用NuGet安装,然后你就可以愉快的在应用程序之间发送消息了。但是ZeroMQ仅提供非持久性的队列,也就是说如果宕机,数据将会丢失。其中,Twitter的Storm 0.9.0以前的版本中默认使用ZeroMQ作为数据流的传输(Storm从0.9版本开始同时支持ZeroMQ和Netty作为传输模块)。

ActiveMQ

  ActiveMQ是Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。

Kafka/Jafka

  Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制统一了在线和离线的消息处理。Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。



      本文转自Jx战壕  51CTO博客,原文链接:http://blog.51cto.com/xujpxm/1934487,如需转载请自行联系原作者




相关文章
|
2月前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
112 4
|
3月前
|
消息中间件 监控 数据可视化
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
141 2
|
19天前
|
消息中间件 Java Kafka
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
本文介绍了Kafka集群的搭建过程,涵盖从虚拟机安装到集群测试的详细步骤。首先规划了集群架构,包括三台Kafka Broker节点,并说明了分布式环境下的服务进程配置。接着,通过VMware导入模板机并克隆出三台虚拟机(kafka-broker1、kafka-broker2、kafka-broker3),分别设置IP地址和主机名。随后,依次安装JDK、ZooKeeper和Kafka,并配置相应的环境变量与启动脚本,确保各组件能正常运行。最后,通过编写启停脚本简化集群的操作流程,并对集群进行测试,验证其功能完整性。整个过程强调了自动化脚本的应用,提高了部署效率。
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
|
23天前
|
消息中间件 存储 Kafka
2024最全Kafka集群方案汇总
Apache Kafka 是一个高吞吐量、可扩展、可靠的分布式消息系统,广泛应用于数据驱动的应用场景。Kafka 支持集群架构,具备高可用性和容错性。其核心组件包括 Broker(服务器实例)、Topic(消息分类)、Partition(有序消息序列)、Producer(消息发布者)和 Consumer(消息消费者)。每个分区有 Leader 和 Follower,确保数据冗余和高可用。Kafka 2.8+ 引入了不依赖 Zookeeper 的 KRaft 协议,进一步简化了集群管理。常见的集群部署方案包括单节点和多节点集群,后者适用于生产环境以确保高可用性。
52 0
|
2月前
|
消息中间件 存储 Prometheus
Kafka集群如何配置高可用性
Kafka集群如何配置高可用性
|
3月前
|
消息中间件 分布式计算 监控
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
123 6
|
5月前
|
消息中间件 Kafka 测试技术
【Kafka揭秘】Leader选举大揭秘!如何打造一个不丢失消息的强大Kafka集群?
【8月更文挑战第24天】Apache Kafka是一款高性能分布式消息系统,利用分区机制支持数据并行处理。每个分区含一个Leader处理所有读写请求,并可有多个副本确保数据安全与容错。关键的Leader选举机制保障了系统的高可用性和数据一致性。选举发生于分区创建、Leader故障或被手动移除时。Kafka提供多种选举策略:内嵌机制自动选择最新数据副本为新Leader;Unclean选举快速恢复服务但可能丢失数据;Delayed Unclean选举则避免短暂故障下的Unclean选举;Preferred选举允许基于性能或地理位置偏好指定特定副本为首选Leader。
115 5
|
5月前
|
消息中间件 监控 Java
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
|
5月前
|
消息中间件 监控 Java
【Kafka节点存活大揭秘】如何让Kafka集群时刻保持“心跳”?探索Broker、Producer和Consumer的生死关头!
【8月更文挑战第24天】在分布式系统如Apache Kafka中,确保节点的健康运行至关重要。Kafka通过Broker、Producer及Consumer间的交互实现这一目标。文章介绍Kafka如何监测节点活性,包括心跳机制、会话超时与故障转移策略。示例Java代码展示了Producer如何通过定期发送心跳维持与Broker的连接。合理配置这些机制能有效保障Kafka集群的稳定与高效运行。
121 2
|
6月前
|
消息中间件 Kafka
kafka 集群环境搭建
kafka 集群环境搭建
79 8

相关实验场景

更多