Kafka主要特点:
- 同时为发布和订阅提供高吞吐量。据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)。
- 可进行持久化操作。将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。通过将数据持久化到硬盘以及replication防止数据丢失。
- 分布式系统,易于向外扩展。所有的producer、broker和consumer都会有多个,均为分布式的。无需停机即可扩展机器。
- 消息被处理的状态是在consumer端维护,而不是由server端维护。当失败时能自动平衡。
- 支持online和offline的场景。
几个基本的消息系统术语:
- Kafka将消息以topic为单位进行归纳。
- 将向Kafka topic发布消息的程序成为producers.
- 将预订topics并消费消息的程序成为consumer.
- Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker.
producers通过网络将消息发送到Kafka集群,集群向消费者提供消息,如下图所示:
创建路径:
mkdir /kafka
## 登陆到zk任一节点执行
su -l admin
/opt/dtstack/zookeeper/bin/zkCli.sh
create /kafka ''
解压安装包:
tar -zxvf kafka_2.11-0.9.0.1.tgz
mv kafka_2.11-0.9.0.1 /opt/dtstack/
ln -s /opt/dtstack/kafka_2.11-0.9.0.1 /opt/dtstack/kafka
chown -R admin.admin /opt/dtstack/kafka_2.11-0.9.0.1/
创建数据目录:
mkdir -p /data/kafka/logs
chown -R admin.admin /data/kafka/
修改kafka-server的配置文件:
vim /opt/dtstack/kafka/config/server.properties
broker.id=1
listeners=PLAINTEXT://:9092
advertised.host.name=1.2.3.4
log.dirs=/data/kafka/logs
num.partitions=2
auto.create.topics.enable=true
log.retention.hours=168
zookeeper.connect=1.2.3.4:2181/kafka
开启服务:
bin/zookeeper-server-start.sh config/zookeeper.properties &
/bin/kafka-server-start.sh &
创建一个名为dt_all_log的topic:(有三个partitions)
bin/kafka-topics.sh --create --zookeeper 1.2.3.4:2181/kafka --replication-factor 2 --partitions 3 --topic dt_all_log
查看所有topic分区情况:
bin/kafka-topics.sh --zookeeper 1.2.3.4:2181/kafka
查看指定topic的分区情况:
bin/kafka-topics.sh --zookeeper 1.2.3.4:2181/kafka --describe --topic dt_all_log
为Topic增加 partition数目:
bin/kafka-add-partitions.sh --topic dt_all_log --partition 2 --zookeeper 1.2.3.4:2181 (为topic
dt_all_log
增加2个分区)
删除topic(慎用):
只会删除zookeeper中的元数据,消息文件须手动删除
bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic test666 --zookeeper 1.2.3.4:2181