zookeeper+kafka 集群和高可用

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
简介: zookeeper+kafka 集群和高可用

1、本机环境

操作系统:ubuntu 12.04
    需安装:
     java的环境,安装过程可参考:
         http://blog.csdn.net/u014388408/article/details/50587438

2、 Zookeeper集群搭建

(1)下载zookeeper安装

wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.3.6/zookeeper-3.3.6.tar.gz

然后解压:

tar -xvf  zookeeper-3.3.6.tar.gz -C /opt/amqbroker(需要解压的路径)

修改配置文件:

zoo_sample.cfg 修改文件名为 zoo.cfg

修改zoo.cfg 配置文件内容为

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/amqbroker/zookeeper/data
dataLogDir=/opt/amqbroker/zookeeper/log
clientPort=2181
server.one=192.168.0.100:2888:3888
server.two=192.168.0.101:2888:3888
server.three=192.168.0.102:2888:3888

然后在/opt/amqbroker/zookeeper/data目录下创建myid文件,在文件中写入当前机器的id,例如配置中server.one=192.168.0.100:2888:3888,在myid文件写入字符 “one” 保存退出。

另外2台机器和这台机器的配置一样,myid 写各自服务器的id名。

3、Kafka 安装

下载地址: wget http://apache.fayea.com/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz 
解压: tar -xvf kafka_2.11-0.9.0.1.tgz

进入kafka_2.11-0.9.0.1.tgz/config/目录下

vi server.properties 修改一下几处:
broker.id=11               //注意,id名,各个服务器配置一个不相同的名字。
hostname.name=192.168.0.100
port=9092
advertised.host.name=192.168.0.100
advertised.port=9092
以及集群的配置
zookeeper.connect=192.168.0.100:2181,192.168.0.101:2181,192.168.0.102:2181

然后在启动kafka,两外2台服务器也采用以上配置,分别启动kafka。

bin/kafka-server-start.sh config/server.properties

接下来创建两个分区,两个副本的Topic。

bin/kafka-topics.sh --create --zookeeper 192.168.0.100:2181,192.168.0.101:2181,192.168.0.102:2181 --replication-factor 2 --partitions 2 --topic kafkatest

查看Topic(test)的状态

bin/kafka-topics.sh --describe --zookeeper 10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181 --topic test

输出结果:

Topic:test  PartitionCount:2    ReplicationFactor:2 Configs:
Topic: test Partition: 0    Leader: 158 Replicas: 0,158 Isr: 158
Topic: test Partition: 1    Leader: 111 Replicas: 111,0 Isr: 111

Topoc(test)有两个分区 0,1

分区0:处于leader服务器的是broker的id为158
      Replicas(副本)为0,158两台服务器。
      Isr(in-sync replicas): 副本列表,158

4、用JAVA程序来测试消息的生产和消费。

(1)util工具类

package com.kafka;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
public class KafkaUtil {  
    private static KafkaProducer<String, String> kp;  
    private static KafkaConsumer<String, String> kc;  
    public static KafkaProducer<String, String> getProducer() {  
        if (kp == null) {  
            Properties props = new Properties();  
            props.put("bootstrap.servers", "192.168.0.100:9092,192.168.0.101:9092,192.168.0.102:9092");  
            props.put("acks", "1");  
            props.put("retries", 0);  
            props.put("batch.size", 16384);  
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
            kp = new KafkaProducer<String, String>(props);  
        }  
        return kp;  
    }  
    public static KafkaConsumer<String, String> getConsumer() {  
        if(kc == null) {  
            Properties props = new Properties();  
            props.put("bootstrap.servers", "192.168.0.100:9092,192.168.0.101:9092,192.168.0.102:9092");  
            props.put("group.id", "0");  
            props.put("enable.auto.commit", "true");  
            props.put("auto.commit.interval.ms", "1000");  
            props.put("session.timeout.ms", "30000");  
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
            kc = new KafkaConsumer<String, String>(props);  
        }  
        return kc;  
    }  
}

(2)Producer类

package com.kafka;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class KafkaTest {
    public static void main(String[] args) throws Exception{
    Producer<String, String> producer = KafkaUtil.getProducer();
        int i = 0;
        while(true) {
            ProducerRecord<String, String> record = new    ProducerRecord<String, String>("test", String.valueOf(i), "this is message"+i);
   producer.send(record, new Callback() {
   public void onCompletion(RecordMetadata metadata, Exception e) {
       if (e != null)
        e.printStackTrace();
        System.out.println("message send to partition " + metadata.partition() + ", offset: " + metadata.offset());
                }
            });
            i++;
            Thread.sleep(1000);
        }
    }
}

(3)Consumer 类

package com.kafka;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class KafkaTest1 {
     public static void main(String[] args) throws Exception{  
            KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer();  
            consumer.subscribe(Arrays.asList("test"));  
while(true) {  
ConsumerRecords<String, String> records = consumer.poll(1000);  
       for(ConsumerRecord<String, String> record : records) {  
          System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value());  
                }  
            }  
        }  
}

运行 producer端 类,结果如下:

message send to partition 0, offset: 306
message send to partition 0, offset: 307
message send to partition 0, offset: 308
message send to partition 0, offset: 309
message send to partition 0, offset: 310
message send to partition 0, offset: 311
message send to partition 0, offset: 312
message send to partition 0, offset: 313

然后运行 Consumer端 类,能看到打印的消息即可。

5、 Kafka集群高可用性测试

(1)查看当前副本及状态

bin/kafka-topics.sh --describe --zookeeper 10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181 --topic test

输出结果

Topic:test  PartitionCount:2    ReplicationFactor:2 Configs:
    Topic: test Partition: 0    Leader: 158 Replicas: 0,158 Isr: 158
    Topic: test Partition: 1    Leader: 111 Replicas: 111,0 Isr: 111

然后停掉一个broker服务器,如158,在查看当前副本状态,Leader状态会变成其他borker服务器,会通过一种选举策略生成一个新的Leader。

相关文章
|
3月前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
144 4
|
4月前
|
消息中间件 运维 算法
Kafka 为什么要抛弃 Zookeeper?
本文探讨了Kafka为何逐步淘汰ZooKeeper。长久以来,ZooKeeper作为Kafka的核心组件,负责集群管理和协调任务。然而,随着Kafka的发展,ZooKeeper带来的复杂性增加、性能瓶颈及一致性问题日益凸显。为解决这些问题,Kafka引入了KRaft,这是一种基于Raft算法的内置元数据管理方案,不仅简化了部署流程,还提升了系统的一致性和扩展性。本文详细分析了这一转变背后的原因及其带来的优势,并展望了Kafka未来的发展方向。
325 1
|
4月前
|
消息中间件 监控 数据可视化
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
206 2
|
2月前
|
消息中间件 Java Kafka
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
本文介绍了Kafka集群的搭建过程,涵盖从虚拟机安装到集群测试的详细步骤。首先规划了集群架构,包括三台Kafka Broker节点,并说明了分布式环境下的服务进程配置。接着,通过VMware导入模板机并克隆出三台虚拟机(kafka-broker1、kafka-broker2、kafka-broker3),分别设置IP地址和主机名。随后,依次安装JDK、ZooKeeper和Kafka,并配置相应的环境变量与启动脚本,确保各组件能正常运行。最后,通过编写启停脚本简化集群的操作流程,并对集群进行测试,验证其功能完整性。整个过程强调了自动化脚本的应用,提高了部署效率。
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
|
2月前
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
2月前
|
消息中间件 存储 Kafka
2024最全Kafka集群方案汇总
Apache Kafka 是一个高吞吐量、可扩展、可靠的分布式消息系统,广泛应用于数据驱动的应用场景。Kafka 支持集群架构,具备高可用性和容错性。其核心组件包括 Broker(服务器实例)、Topic(消息分类)、Partition(有序消息序列)、Producer(消息发布者)和 Consumer(消息消费者)。每个分区有 Leader 和 Follower,确保数据冗余和高可用。Kafka 2.8+ 引入了不依赖 Zookeeper 的 KRaft 协议,进一步简化了集群管理。常见的集群部署方案包括单节点和多节点集群,后者适用于生产环境以确保高可用性。
104 0
|
4月前
|
消息中间件 监控 Ubuntu
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
138 3
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
|
3月前
|
消息中间件 存储 Prometheus
Kafka集群如何配置高可用性
Kafka集群如何配置高可用性
|
4月前
|
消息中间件 分布式计算 监控
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
144 6
|
4月前
|
分布式计算 Java Hadoop
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
103 1

热门文章

最新文章