kafka

本文涉及的产品
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
实时计算 Flink 版,5000CU*H 3个月
简介: kafka

一、启动kafka集群

(1)先启动Zookeeper集群,然后启动Kafka。

[atguigu@hadoop102   kafka]$ zk.sh start

(2)依次在hadoop102hadoop103hadoop104节点上启动Kafka。

[atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties

[atguigu@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties

[atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties

注意:配置文件的路径要能够到server.properties

8关闭集群

[atguigu@hadoop102 kafka]$ bin/kafka-server-stop.sh

[atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh

[atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh

集群启停脚本

1)在/home/atguigu/bin目录下创建文件kf.sh脚本文件

cd /root/bin

[atguigu@hadoop102 bin]$ vim kf.sh

脚本如下:
#! /bin/bash
case $1 in
"start"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo " --------启动 $i Kafka-------"
        ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
    done
};;
"stop"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo " --------停止 $i Kafka-------"
        ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
    done
};;
esac

2)添加执行权限

[atguigu@hadoop102 bin]$ chmod +x kf.sh

3)启动集群命令

[atguigu@hadoop102 ~]$ kf.sh start

4)停止集群命令

[atguigu@hadoop102 ~]$ kf.sh stop

注意:停止Kafka集群时,一定要等Kafka所有节点进程全部停止后再停止Zookeeper集群。因为Zookeeper集群当中记录着Kafka集群相关信息,Zookeeper集群一旦先停止,Kafka集群就没有办法再获取停止进程的信息,只能手动杀死Kafka进程了。

image.png

配置环境变量

(1)在/etc/profile文件中增加kafka环境变量配置

[atguigu@hadoop102 module]$ sudo vim /etc/profile.d/my_env.sh

增加如下内容:

#KAFKA_HOME

export KAFKA_HOME=/opt/module/kafka

export PATH=$PATH:$KAFKA_HOME/bin

(2)刷新一下环境变量。

[atguigu@hadoop102 module]$ source /etc/profile

(3)分发环境变量文件到其他节点,并source。

[atguigu@hadoop102 module]$ sudo /home/atguigu/bin/xsync /etc/profile.d/my_env.sh

[atguigu@hadoop103 module]$ source /etc/profile

[atguigu@hadoop104 module]$ source /etc/profile

实时计算中,基本计算模式:数据源持续不断生成数据,计算系统持续不断处理数据,这其中的一个隐含之意:数据源写入数据的顺序,要与计算系统读取数据的顺序保持一致
kafka是一个消息缓存系统,主要应用于大数据流式计算的数据缓冲,在其中起的作用有两点:数据源和计算引擎之间的解耦,以及这两者之间的削峰填谷,但因为kafka是一个集群,无法100%确保读写的前后顺序严格一致,它可以保证分区内的数据读写顺序一致
kafka中数据的分类隔离概念:topic(主题)
数据会以topic来进行底层的分割,然后topic内,会被分割成若干个partition,每个partition都可以有多个分区,
partititon的多个副本中,大家地位各不相同,其中一定有一个副本learder
其他的可以是follower,也可以是observer

二、kafka基础架构

image.png

1.主题命令行操作

kafka-topics.sh

参数

描述

--bootstrap-server <String: server toconnect to>

连接的Kafka Broker主机名称和端口号。

--topic <String: topic>

操作的topic名称。

--create

创建主题。

--delete

删除主题。

--alter

修改主题。

--list

查看所有主题。

--describe

查看主题详细描述。

--partitions <Integer: # of partitions>

设置分区数。

--replication-factor<Integer: replication factor>

设置分区副本。

--config <String: name=value>

更新系统默认的配置。


1.查看first主题的详情

 kafka-topics.sh --bootstrap-server node1:9092 --describe --topic first

2.查看当前服务器中的所有topic

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list

3.创建first topic

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first

4.修改分区数(注意:分区数只能增加,不能减少)

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3

5.删除topic(学生自己演示)

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first

2.生产者命令行操作

bin/kafka-console-producer.sh

参数

描述

--bootstrap-server <String: server toconnect to>

连接的Kafka Broker主机名称和端口号。

--topic <String: topic>

操作的topic名称。

2)发送消息
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
>hello world
>atguigu  atguigu

3.消费者命令行操作

1)查看操作消费者命令参数

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh

 

参数

描述

--bootstrap-server <String: server toconnect to>

连接的Kafka Broker主机名称和端口号。

--topic <String: topic>

操作的topic名称。

--from-beginning

从头开始消费。

--group <String: consumer group id>

指定消费者组名称。

2)消费消息

(1)消费first主题中的数据。

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

2把主题中所有的数据都读取出来(包括历史数据)。

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first

问题:

first的partition 0的leader副本在哪台broker    1

同一台broker是否可以管理多个不同的partititon的leader副本

image.png

image.png

topic 的分区数量,以及每个分区的副本数量,以及每个副本所在的 broker

节点,以及每个分区的 leader 副本所在 broker 节点,以及每个分区的 ISR 副本列表;

ISRin sync replicas 同步副本(当然也包含 leader 自身 replica.lag.time.max.ms =10000

OSRout of sync replicas 失去同步的副本(该副本上次请求 leader 同步数据距现在的时间间隔超

出配置阈值)

p1的leader在哪个broker    2

image.png

image.png

image.png

image.png

spark和mr的容错能力体现在哪里?

task失败,会被重试,如果在yarn上运行的话,就是去重新申请container,来重跑失败的那个task
rdd的血缘,checkpoint,shuffle数据落盘

image.png

image.png

image.png

image.png

image.png

kafka的读写操作,实际开发中,是通过各类更上层的组件实现、

而这些组件去读写kafka数据时,用的当然是kafka的java api操作,比如fink,spaarkstreaming,flume

image.png

image.png

1.构造生产者,可以持续不断产生数据

2.哪些参数必须得有?

bootstrap.server

key.serivalizer

value.serivalizer

其他的可选

3.kafka的生产者发送用户数据时,是可以使用jdk的序列化框架序列化用户数据吗

不可以,序列化工具类,需要实现kafka所提供的序列化工具接口:org.apache.kafka.common.serialization.Serializer

image.png

  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw exception to the consumer if no previous offset is found for the consumer's group
  • anything else: throw exception to the consumer.

Note that altering partition numbers while setting this config to latest may cause message delivery loss since producers could start to send messages to newly added partitions (i.e. no initial offsets exist yet) before consumers reset their offsets.

image.png

image.png

kafka消费者的起始消费位置有两种决定机制:
1.手动设置了起始位置,它肯定从你指定的位置开始
2.如果你没有手动指定起始位置,它去找消费组之前所记录的偏移量开始
3.如果之前的位置也获取不到,就看参数:auto.offset所指定的重置策略
Iterable 可迭代的
iterable是迭代器的再封装,叫:可迭代的
实现了iterble的对象,可以用增强for循环去遍历迭代
也可以从对象上取到iterator,来用iterator hasnext来迭代


image.png

image.png


image.png

image.png

image.png








相关文章
|
消息中间件 存储 Kafka
Kafka详解
当今数字化世界中,数据的流动变得至关重要。为了满足不断增长的数据需求,企业需要强大而可靠的数据处理工具。Apache Kafka就是这样一个工具,它在数据流处理领域表现出色。本文将详细介绍Apache Kafka,探讨它的核心概念、用途以及如何使用它来构建强大的数据流应用。
|
8月前
|
消息中间件 存储 分布式计算
|
4月前
|
消息中间件 缓存 算法
kafka(三)
kafka(三)
|
4月前
|
消息中间件 存储 缓存
kafka(一)
kafka(一)
|
4月前
|
消息中间件 存储 算法
kafka(二)
kafka(二)
|
5月前
|
消息中间件 Kafka
kafka里的acks是什么
【8月更文挑战第3天】kafka里的acks是什么
247 0
|
8月前
|
消息中间件 存储 分布式计算
kafka 详细介绍
kafka 详细介绍
|
8月前
|
消息中间件 存储 Java
玩转Kafka—初步使用
玩转Kafka—初步使用
57 0
|
8月前
|
消息中间件 Kafka Linux
kafka
Exception in thread “main” joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option
720 0
|
消息中间件 分布式计算 Java
浅谈kafka 一
浅谈kafka 一