1. Local environment
OS: Ubuntu 12.04. A Java environment is required; for the installation procedure see: http://blog.csdn.net/u014388408/article/details/50587438
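A minimal way to satisfy the Java requirement on Ubuntu 12.04 (this sketch assumes OpenJDK 7 from the standard repositories is acceptable; the linked article covers an alternative JDK setup):

sudo apt-get update
sudo apt-get install -y openjdk-7-jdk
java -version    # should report a 1.7.x runtime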
2. ZooKeeper cluster setup
(1) Download and install ZooKeeper
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.3.6/zookeeper-3.3.6.tar.gz
Then extract it:
tar -xvf zookeeper-3.3.6.tar.gz -C /opt/amqbroker    (replace /opt/amqbroker with your target directory)
Edit the configuration file:
Rename conf/zoo_sample.cfg to zoo.cfg.
Set the contents of zoo.cfg to:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/amqbroker/zookeeper/data
dataLogDir=/opt/amqbroker/zookeeper/log
clientPort=2181
server.1=192.168.0.100:2888:3888
server.2=192.168.0.101:2888:3888
server.3=192.168.0.102:2888:3888

(ZooKeeper server IDs must be numeric, hence server.1, server.2, server.3.)
Then create a file named myid in the /opt/amqbroker/zookeeper/data directory and write this machine's server ID into it. For example, on the machine configured as server.1=192.168.0.100:2888:3888, write the character "1" into myid, then save and exit.
Configure the other two machines identically, except that each writes its own server ID (2 and 3 respectively) into its myid file.
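With the configuration in place, create the data directories, write myid, and start ZooKeeper on each node (a sketch for the first node; use ids 2 and 3 on the others, and adjust paths if your layout differs):

# on 192.168.0.100 (server.1)
mkdir -p /opt/amqbroker/zookeeper/data /opt/amqbroker/zookeeper/log
echo 1 > /opt/amqbroker/zookeeper/data/myid

cd /opt/amqbroker/zookeeper-3.3.6
bin/zkServer.sh start     # start the ZooKeeper daemon
bin/zkServer.sh status    # once all three are up: one "leader", two "follower"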
3. Kafka installation
Download:
wget http://apache.fayea.com/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
Extract:
tar -xvf kafka_2.11-0.9.0.1.tgz
Go into the kafka_2.11-0.9.0.1/config/ directory.
vi server.properties and change the following entries (broker.id must be a unique integer on each broker; host.name and advertised.host.name should be that broker's own address):

broker.id=11
host.name=192.168.0.100
port=9092
advertised.host.name=192.168.0.100
advertised.port=9092

Then point every broker at the same ZooKeeper ensemble:

zookeeper.connect=192.168.0.100:2181,192.168.0.101:2181,192.168.0.102:2181
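The other two brokers use the same file with their own id and addresses; a minimal sed sketch for the second broker (the ids 12 and 13 here are arbitrary choices, any unique integers will do):

# on 192.168.0.101; use broker.id=13 and 192.168.0.102 on the third machine
sed -i 's/^broker.id=.*/broker.id=12/' server.properties
sed -i 's/^host.name=.*/host.name=192.168.0.101/' server.properties
sed -i 's/^advertised.host.name=.*/advertised.host.name=192.168.0.101/' server.properties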
Then start Kafka; apply the configuration above (adjusted per broker) on the other two servers and start Kafka on each of them as well:
bin/kafka-server-start.sh config/server.properties
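kafka-server-start.sh runs in the foreground, so for a long-running test it is convenient to start each broker as a daemon and then confirm that all three registered with ZooKeeper (a sketch using the scripts shipped with Kafka 0.9):

bin/kafka-server-start.sh -daemon config/server.properties

# list the broker ids currently registered in ZooKeeper;
# with all three brokers up this prints the three configured ids
bin/zookeeper-shell.sh 192.168.0.100:2181 ls /brokers/ids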
Next, create a topic with two partitions and two replicas.
bin/kafka-topics.sh --create --zookeeper 192.168.0.100:2181,192.168.0.101:2181,192.168.0.102:2181 --replication-factor 2 --partitions 2 --topic test
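To double-check that the topic was created:

bin/kafka-topics.sh --list --zookeeper 192.168.0.100:2181,192.168.0.101:2181,192.168.0.102:2181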
Check the status of the topic (test):
bin/kafka-topics.sh --describe --zookeeper 192.168.0.100:2181,192.168.0.101:2181,192.168.0.102:2181 --topic test
Output:
Topic:test  PartitionCount:2  ReplicationFactor:2  Configs:
    Topic: test  Partition: 0  Leader: 158  Replicas: 0,158  Isr: 158
    Topic: test  Partition: 1  Leader: 111  Replicas: 111,0  Isr: 111
The topic (test) has two partitions, 0 and 1.
For partition 0: the leader is the broker with id 158; Replicas (the replica list) is 0,158; Isr (in-sync replicas, the replicas currently caught up with the leader) is 158.
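Before moving on to the Java client, the cluster can be smoke-tested with the console tools shipped with Kafka 0.9 (run the two commands in separate terminals):

# terminal 1: every line typed here becomes one message
bin/kafka-console-producer.sh --broker-list 192.168.0.100:9092 --topic test

# terminal 2: should echo back the lines typed above
bin/kafka-console-consumer.sh --zookeeper 192.168.0.100:2181 --topic test --from-beginning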
4. Testing message production and consumption with a Java program
(1) Utility class
package com.kafka;

import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

public class KafkaUtil {
    private static KafkaProducer<String, String> kp;
    private static KafkaConsumer<String, String> kc;

    // Lazily build a single shared producer for the three-broker cluster.
    public static KafkaProducer<String, String> getProducer() {
        if (kp == null) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.0.100:9092,192.168.0.101:9092,192.168.0.102:9092");
            props.put("acks", "1");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            kp = new KafkaProducer<String, String>(props);
        }
        return kp;
    }

    // Lazily build a single shared consumer; offsets are auto-committed once per second.
    public static KafkaConsumer<String, String> getConsumer() {
        if (kc == null) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.0.100:9092,192.168.0.101:9092,192.168.0.102:9092");
            props.put("group.id", "0");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            kc = new KafkaConsumer<String, String>(props);
        }
        return kc;
    }
}
(2) Producer class
package com.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaTest {
    public static void main(String[] args) throws Exception {
        Producer<String, String> producer = KafkaUtil.getProducer();
        int i = 0;
        while (true) {
            // Send one keyed message per second to the "test" topic.
            ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>("test", String.valueOf(i), "this is message" + i);
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();    // metadata is null when the send failed
                    } else {
                        System.out.println("message send to partition " + metadata.partition()
                                + ", offset: " + metadata.offset());
                    }
                }
            });
            i++;
            Thread.sleep(1000);
        }
    }
}
(3) Consumer class
package com.kafka;

import java.util.Arrays;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaTest1 {
    public static void main(String[] args) throws Exception {
        KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer();
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            // Poll for up to 1 second and print every record received.
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("fetched from partition " + record.partition()
                        + ", offset: " + record.offset()
                        + ", message: " + record.value());
            }
        }
    }
}
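To compile and run these classes, the kafka-clients jar must be on the classpath; the jars under the Kafka distribution's libs/ directory are sufficient (a sketch, assuming Kafka was extracted to /opt/amqbroker and the sources live under com/kafka/ in the current directory):

javac -cp "/opt/amqbroker/kafka_2.11-0.9.0.1/libs/*" com/kafka/*.java
java -cp "/opt/amqbroker/kafka_2.11-0.9.0.1/libs/*:." com.kafka.KafkaTest     # producer
java -cp "/opt/amqbroker/kafka_2.11-0.9.0.1/libs/*:." com.kafka.KafkaTest1    # consumer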
Run the producer class; the output looks like this:
message send to partition 0, offset: 306
message send to partition 0, offset: 307
message send to partition 0, offset: 308
message send to partition 0, offset: 309
message send to partition 0, offset: 310
message send to partition 0, offset: 311
message send to partition 0, offset: 312
message send to partition 0, offset: 313
Then run the consumer class; you should see the same messages printed on its side.
5. Kafka cluster high-availability test
(1) Check the current replicas and their status
bin/kafka-topics.sh --describe --zookeeper 192.168.0.100:2181,192.168.0.101:2181,192.168.0.102:2181 --topic test
Output:
Topic:test  PartitionCount:2  ReplicationFactor:2  Configs:
    Topic: test  Partition: 0  Leader: 158  Replicas: 0,158  Isr: 158
    Topic: test  Partition: 1  Leader: 111  Replicas: 111,0  Isr: 111
Now stop one broker, e.g. the one with id 158, and check the replica status again: the Leader for its partitions switches to another broker, because the cluster elects a new leader through its leader-election mechanism.
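A sketch of the failover test itself (kafka-server-stop.sh ships with Kafka 0.9; alternatively, kill the broker's JVM process on that machine):

# on the machine hosting the partition leader (broker 158 in the output above)
bin/kafka-server-stop.sh

# from any machine: Leader should now show a surviving broker,
# and Isr shrinks to the replicas that are still in sync
bin/kafka-topics.sh --describe --zookeeper 192.168.0.100:2181 --topic test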