Kafka's official definition: Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
In the big data field we commonly use Kafka to build stream-processing data pipelines that feed Spark or Flink.
To build the Kafka cluster we use Kafka version kafka_2.12-2.4.1 (Scala 2.12, Kafka 2.4.1) and Zookeeper version 3.6.3.
Zookeeper configuration
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/tmp/zookeeper/data
dataLogDir=/tmp/zookeeper/log
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
server.0=10.60.**.**:2888:3888
server.1=10.60.**.**:2888:3888
server.2=10.60.**.**:2888:3888
By default, the /tmp/zookeeper/data and /tmp/zookeeper/log directories do not exist on a Linux system, so they must be created first.
Next, create a myid file in the ${dataDir} directory (i.e. /tmp/zookeeper/data) and write a single number into it, e.g. 0. The myid file holds this server's ID, which must match the X in the corresponding server.X entry of zoo.cfg.
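A minimal sketch of these two steps, using the paths from zoo.cfg above (run on every node, writing that node's own ID, 0/1/2, into myid):
# create the snapshot and transaction-log directories referenced by zoo.cfg
mkdir -p /tmp/zookeeper/data /tmp/zookeeper/log
# write this node's ID into myid; it must match the server.X entry for this host
echo 0 > /tmp/zookeeper/data/myid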
Start Zookeeper
./zkServer.sh start
Check the Zookeeper service status
./zkServer.sh status
If all three nodes are deployed successfully, zkServer.sh status will report Mode: leader on one node and Mode: follower on the other two (jps should show a QuorumPeerMain process on every node).
Kafka configuration
Edit server.properties
# the broker id
broker.id=0
# where Kafka stores its data (log segments)
log.dirs=/usr/local/kafka/data
# the three Zookeeper nodes
zookeeper.connect=10.60.**.**:2181,10.60.**.**:2181,10.60.**.**:2181
Copy the configured Kafka installation to the other two servers and change broker.id to 1 and 2 on those nodes respectively, for example as sketched below.
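A rough sketch of this step, assuming password-less SSH, Kafka installed under /usr/local/kafka, and the masked IPs being the other two brokers:
# copy the installation to the other two servers
scp -r /usr/local/kafka root@10.60.**.**:/usr/local/
scp -r /usr/local/kafka root@10.60.**.**:/usr/local/
# then change the broker id on each target node
sed -i 's/^broker.id=0$/broker.id=1/' /usr/local/kafka/config/server.properties   # run on node 2
sed -i 's/^broker.id=0$/broker.id=2/' /usr/local/kafka/config/server.properties   # run on node 3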
Configure the KAFKA_HOME environment variable
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:${KAFKA_HOME}/bin
Start the Kafka brokers
nohup bin/kafka-server-start.sh config/server.properties &
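Alternatively, kafka-server-start.sh ships with a -daemon option that backgrounds the broker without needing nohup:
bin/kafka-server-start.sh -daemon config/server.properties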
Since a cluster may contain many servers, starting them one by one is tedious, so we can write one-click start and stop shell scripts (the start script is shown first, and a stop variant is sketched right after it).
Write the IPs of the servers to start into a kafkalist file, one per line.
cat /root/myshell/kafkalist | while read line
do
{
    echo $line
    ssh $line "source /etc/profile;export JMX_PORT=9988;nohup ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties >/dev/null 2>&1 &"
}&
wait
done
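The matching one-click stop script can reuse the same loop; a minimal sketch that simply calls kafka-server-stop.sh on every host listed in kafkalist:
cat /root/myshell/kafkalist | while read line
do
{
    echo $line
    ssh $line "source /etc/profile;${KAFKA_HOME}/bin/kafka-server-stop.sh"
}&
wait
done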
Example
Create a topic, then use Java to build a Kafka producer that writes data into it.
./bin/kafka-topics.sh --zookeeper 10.60.**.**:2181 --create --topic topic-demo --replication-factor 3 --partitions 4
Describe the topic
./bin/kafka-topics.sh --zookeeper 10.60.**.**:2181 --describe --topic topic-demo
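Since Kafka 2.2, kafka-topics.sh can also talk to the brokers directly instead of going through Zookeeper; the same describe would look like this (broker port 9092 assumed):
./bin/kafka-topics.sh --bootstrap-server 10.60.**.**:9092 --describe --topic topic-demo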
Java Producer:
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * 1. Create the Properties used to connect to Kafka
 * 2. Create a KafkaProducer object
 * 3. Call send() to write messages to the target topic; send() returns a Future wrapping the result
 * 4. Optionally call Future.get() to wait for the broker's response
 * 5. Close the producer
 */
public class KafkaProducerTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. Create the Properties used to connect to Kafka
        Properties props = new Properties();
        props.put("bootstrap.servers", "117.50.**.**:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. Create the KafkaProducer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        // 3. Send messages to the topic
        for (int i = 0; i < 10000000; ++i) {
            // Option 1: send synchronously and wait for the response
            // ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", null, i + "");
            // Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
            // future.get();
            // System.out.println("Message " + i + " written successfully!");

            // Option 2: send asynchronously with a callback
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic-demo", null, i + "");
            kafkaProducer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    // Check whether the send succeeded
                    if (exception == null) {
                        // success: print topic, partition id and offset
                        String topic = metadata.topic();
                        int partition = metadata.partition();
                        long offset = metadata.offset();
                        System.out.println("topic:" + topic + " partition:" + partition + " offset:" + offset);
                    } else {
                        // failure: print the error message and stack trace
                        System.out.println("Failed to produce message!");
                        System.out.println(exception.getMessage());
                        exception.printStackTrace();
                    }
                }
            });
        }

        // 4. Close the producer
        kafkaProducer.close();
    }
}
Consume the data
[root@master kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic topic-demo --from-beginning
441665
441666
441667
441668
441669
441670
441671
441672
441673
441674
441675
441676
441677
441678
441679
441680
441681
441682
441683
The output shows that the data has been written to the topic and can be consumed.