一、启动kafka集群
(1)先启动Zookeeper集群,然后启动Kafka。
[atguigu@hadoop102 kafka]$ zk.sh start
(2)依次在hadoop102、hadoop103、hadoop104节点上启动Kafka。
[atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
[atguigu@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
[atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
注意:配置文件的路径要能够到server.properties。
8)关闭集群
[atguigu@hadoop102 kafka]$ bin/kafka-server-stop.sh
[atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh
[atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh
集群启停脚本
1)在/home/atguigu/bin目录下创建文件kf.sh脚本文件
cd /root/bin
[atguigu@hadoop102 bin]$ vim kf.sh
脚本如下: #! /bin/bash case $1 in "start"){ for i in hadoop102 hadoop103 hadoop104 do echo " --------启动 $i Kafka-------" ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties" done };; "stop"){ for i in hadoop102 hadoop103 hadoop104 do echo " --------停止 $i Kafka-------" ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh " done };; esac
2)添加执行权限
[atguigu@hadoop102 bin]$ chmod +x kf.sh
3)启动集群命令
[atguigu@hadoop102 ~]$ kf.sh start
4)停止集群命令
[atguigu@hadoop102 ~]$ kf.sh stop
注意:停止Kafka集群时,一定要等Kafka所有节点进程全部停止后再停止Zookeeper集群。因为Zookeeper集群当中记录着Kafka集群相关信息,Zookeeper集群一旦先停止,Kafka集群就没有办法再获取停止进程的信息,只能手动杀死Kafka进程了。
配置环境变量
(1)在/etc/profile文件中增加kafka环境变量配置
[atguigu@hadoop102 module]$ sudo vim /etc/profile.d/my_env.sh
增加如下内容:
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
(2)刷新一下环境变量。
[atguigu@hadoop102 module]$ source /etc/profile
(3)分发环境变量文件到其他节点,并source。
[atguigu@hadoop102 module]$ sudo /home/atguigu/bin/xsync /etc/profile.d/my_env.sh
[atguigu@hadoop103 module]$ source /etc/profile
[atguigu@hadoop104 module]$ source /etc/profile
实时计算中,基本计算模式:数据源持续不断生成数据,计算系统持续不断处理数据,这其中的一个隐含之意:数据源写入数据的顺序,要与计算系统读取数据的顺序保持一致 kafka是一个消息缓存系统,主要应用于大数据流式计算的数据缓冲,在其中起的作用有两点:数据源和计算引擎之间的解耦,以及这两者之间的削峰填谷,但因为kafka是一个集群,无法100%确保读写的前后顺序严格一致,它可以保证分区内的数据读写顺序一致
kafka中数据的分类隔离概念:topic(主题) 数据会以topic来进行底层的分割,然后topic内,会被分割成若干个partition,每个partition都可以有多个分区, partititon的多个副本中,大家地位各不相同,其中一定有一个副本learder 其他的可以是follower,也可以是observer
二、kafka基础架构
1.主题命令行操作
kafka-topics.sh
参数 |
描述 |
--bootstrap-server <String: server toconnect to> |
连接的Kafka Broker主机名称和端口号。 |
--topic <String: topic> |
操作的topic名称。 |
--create |
创建主题。 |
--delete |
删除主题。 |
--alter |
修改主题。 |
--list |
查看所有主题。 |
--describe |
查看主题详细描述。 |
--partitions <Integer: # of partitions> |
设置分区数。 |
--replication-factor<Integer: replication factor> |
设置分区副本。 |
--config <String: name=value> |
更新系统默认的配置。 |
1.查看first主题的详情
kafka-topics.sh --bootstrap-server node1:9092 --describe --topic first
2.查看当前服务器中的所有topic
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
3.创建first topic
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first
4.修改分区数(注意:分区数只能增加,不能减少)
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
5.删除topic(学生自己演示)
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first
2.生产者命令行操作
bin/kafka-console-producer.sh
参数 |
描述 |
--bootstrap-server <String: server toconnect to> |
连接的Kafka Broker主机名称和端口号。 |
--topic <String: topic> |
操作的topic名称。 |
2)发送消息 [atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first >hello world >atguigu atguigu
3.消费者命令行操作
1)查看操作消费者命令参数
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh
参数 |
描述 |
--bootstrap-server <String: server toconnect to> |
连接的Kafka Broker主机名称和端口号。 |
--topic <String: topic> |
操作的topic名称。 |
--from-beginning |
从头开始消费。 |
--group <String: consumer group id> |
指定消费者组名称。 |
2)消费消息
(1)消费first主题中的数据。
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
(2)把主题中所有的数据都读取出来(包括历史数据)。
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
问题:
first的partition 0的leader副本在哪台broker 1
同一台broker是否可以管理多个不同的partititon的leader副本
,
topic 的分区数量,以及每个分区的副本数量,以及每个副本所在的 broker
节点,以及每个分区的 leader 副本所在 broker 节点,以及每个分区的 ISR 副本列表;
ISR: in sync replicas 同步副本(当然也包含 leader 自身 replica.lag.time.max.ms =10000
OSR:out of sync replicas 失去同步的副本(该副本上次请求 leader 同步数据距现在的时间间隔超
出配置阈值)
p1的leader在哪个broker 2
spark和mr的容错能力体现在哪里?
task失败,会被重试,如果在yarn上运行的话,就是去重新申请container,来重跑失败的那个task rdd的血缘,checkpoint,shuffle数据落盘
kafka的读写操作,实际开发中,是通过各类更上层的组件实现、
而这些组件去读写kafka数据时,用的当然是kafka的java api操作,比如fink,spaarkstreaming,flume
1.构造生产者,可以持续不断产生数据
2.哪些参数必须得有?
bootstrap.server
key.serivalizer
value.serivalizer
其他的可选
3.kafka的生产者发送用户数据时,是可以使用jdk的序列化框架序列化用户数据吗
不可以,序列化工具类,需要实现kafka所提供的序列化工具接口:org.apache.kafka.common.serialization.Serializer
- earliest: automatically reset the offset to the earliest offset
- latest: automatically reset the offset to the latest offset
- none: throw exception to the consumer if no previous offset is found for the consumer's group
- anything else: throw exception to the consumer.
Note that altering partition numbers while setting this config to latest may cause message delivery loss since producers could start to send messages to newly added partitions (i.e. no initial offsets exist yet) before consumers reset their offsets.
kafka消费者的起始消费位置有两种决定机制: 1.手动设置了起始位置,它肯定从你指定的位置开始 2.如果你没有手动指定起始位置,它去找消费组之前所记录的偏移量开始 3.如果之前的位置也获取不到,就看参数:auto.offset所指定的重置策略 Iterable 可迭代的 iterable是迭代器的再封装,叫:可迭代的 实现了iterble的对象,可以用增强for循环去遍历迭代 也可以从对象上取到iterator,来用iterator hasnext来迭代