Kafka开发环境搭建及应用

本文涉及的产品
云原生网关 MSE Higress,422元/月
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: Kafka开发环境搭建及应用


kafka的介绍

Kafka 本质上是一个 MQ(Message Queue),使用消息队列的好处?

  • 解耦:允许我们独立的扩展或修改队列两边的处理过程。
  • 可恢复性:即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
  • 缓冲:有助于解决生产消息和消费消息的处理速度不一致的情况。
  • 灵活性&峰值处理能力:不会因为突发的超负荷的请求而完全崩溃,消息队列能够使关键组件顶
    住突发的访问压力。
    异步通信:消息队列允许用户把消息放入队列但不立即处理它。
    典型应用: 原链接

Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地

Elasticsearch 是一个分布式、RESTful ⻛格的搜索和数据分析引擎,能够解决不断涌现出的各种用例。

作为 Elastic Stack 的核心,它集中存储您的数据,帮助您发现意料之中以及意料之外的情况。

架构

注意:版面原因这里没有画上zookeeper, broker都是由zookeeper管理。

Kafka 存储的消息来自任意多被称为 Producer 生产者的进程。数据从而可以被发布到不同的Topic 主题下的不同 Partition 分区。

在一个分区内,这些消息被索引并连同时间戳存储在一起。其它被称为 Consumer 消费者的进程可以从分区订阅消息。

Kafka 运行在一个由一台或多台服务器组成的集群上,并且分区可以跨集群结点分布。

下面给出 Kafka 一些重要概念,让大家对 Kafka 有个整体的认识和感知,后面还会详细的解析每一个概念的作用以及更深入的原理:

  • Producer:消息生产者,向 Kafka Broker 发消息的客户端。
  • Consumer:消息消费者,从 Kafka Broker 取消息的客户端。
  • Consumer Group:消费者组(CG),消费者组内每个消费者负责消费不同分区的数据,提高消费能力。一个分区只能由组内一个消费者消费,消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
  • Broker:一台 Kafka 机器就是一个 Broker。一个集群( kafka cluster )由多个 Broker 组成。一个 Broker 可以容纳多个 Topic。
  • Topic:可以理解为一个队列,Topic 将消息分类,生产者和消费者面向的是同一个 Topic。
  • Partition:为了实现扩展性,提高并发能力,一个非常大的 Topic 可以分布到多个 Broker(即服务器)上,一个 Topic 可以分为多个 Partition, 同一个topic在不同的分区的数据是不重复的, 每个 Partition 是一个有序的队列,其 表现形式就是一个一个的文件夹 。
  • Replication : 每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
  • Message:每一条发送的消息主体。
  • Leader:每个分区多个副本的“主”副本,生产者发送数据的对象,以及消费者消费数据的对象,都是 Leader。
  • Follower:每个分区多个副本的“从”副本,实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,某个 Follower 还会成为新的 Leader。
  • Offset:消费者消费的位置信息,监控数据消费到什么位置,当消费者挂掉再重新恢复的时候,可以从消费位置继续消费。
  • ZooKeeper:Kafka 集群能够正常工作,需要依赖于 ZooKeeper,ZooKeeper 帮助 Kafka存储和管理集群信息。

工作流程

Kafka集群将 Record 流存储在称为 Topic 的类别中,每个记录由一个键、一个值和一个时间戳组成。

Kafka 是一个分布式流平台,这到底是什么意思?

  • 发布和订阅记录流,类似于消息队列或企业消息传递系统。
  • 以容错的持久方式存储记录流。
  • 处理记录流。

Kafka 中消息是以 Topic 进行分类的,生产者生产消息,消费者消费消息,面向的都是同一个Topic。

Topic 是逻辑上的概念,而 Partition 是物理上的概念,每个 Partition 对应于一个 log 文件,该log 文件中存储的就是 Producer 生产的数据。

Producer 生产的数据会不断追加到该 log 文件末端,且每条数据都有自己的 Offset。

消费者组中的每个消费者,都会实时记录自己消费到了哪个 Offset,以便出错恢复时,从上次的位置继续消费。

日志默认在: /tmp/kafka-logs

存储机制

由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制。

它将每个 Partition 分为多个 Segment,每个 Segment 对应两个文件:“.index” 索引文件和“.log” 数据文件。

这些文件位于同一文件下,该文件夹的命名规则为:topic 名-分区号。例如,test这个 topic 有三分分区,则其对应的文件夹为 test-0,test-1,test-2。

ls /tmp/kafka-logs/test-1
00000000000000009014.index
00000000000000009014.log
00000000000000009014.timeindex
leader-epoch-checkpoint

index 和 log 文件以当前 Segment 的第一条消息的 Offset 命名。下图为 index 文件和 log 文件

的结构示意图:

“.index” 文件存储大量的索引信息,“.log” 文件存储大量的数据,索引文件中的元数据指向对应数据文件中 Message 的物理偏移量。

查看索引:./kafka-dump-log.sh --files /tmp/kafka-logs/test-1/00000000000000000000.index

生产者

producer就是生产者,是数据的入口。Producer在写入数据的时候永远的找leader,不会直接将数据写入follower。

分区策略

分区原因:

  • 方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 Topic 又可以有多个 Partition 组成,因此可以以 Partition 为单位读写了。
  • 可以提高并发,因此可以以 Partition 为单位读写了。

分区原则:我们需要将 Producer 发送的数据封装成一个 ProducerRecord 对象。

该对象需要指定一些参数:

  • topic:string 类型,NotNull。
  • partition:int 类型,可选。
  • timestamp:long 类型,可选。
  • key:string 类型,可选。
  • value:string 类型,可选。
  • headers:array 类型,Nullable。
    指明 Partition 的情况下,直接将给定的 Value 作为 Partition 的值。
    没有指明 Partition 但有 Key 的情况下,将 Key 的 Hash 值与分区数取余得到 Partition 值。
    既没有 Partition 有没有 Key 的情况下,第一次调用时随机生成一个整数(后面每次调用都在这个整数上自增),将这个值与可用的分区数取余,得到 Partition 值,也就是常说的 Round-Robin轮询算法。

数据可靠性保证

为保证 Producer 发送的数据,能可靠地发送到指定的 Topic,Topic 的每个 Partition 收到Producer 发送的数据后,都需要向 Producer 发送 ACK(ACKnowledge 确认收到)。如果 Producer 收到 ACK,就会进行下一轮的发送,否则重新发送数据。

副本数据同步策略

何时发送 ACK?确保有 Follower 与 Leader 同步完成,Leader 再发送 ACK,这样才能保证Leader 挂掉之后,能在 Follower 中选举出新的 Leader 而不丢数据。

多少个 Follower 同步完成后发送 ACK?全部 Follower 同步完成,再发送 ACK。

方案 优点 缺点
半数以上完成同步,就发送ack 延迟低 选举新的leader时,容忍n台节点故障,需要2n+1个副本。
全部完成同步,才发送ack 选举新的leader时,容忍n台节点故障,需要n+1个副本。 延迟高。

ISR

采用第二种方案,所有 Follower 完成同步,Producer 才能继续发送数据,设想有一个 Follower因为某种原因出现故障,那 Leader 就要一直等到它完成同步。

这个问题怎么解决?Leader维护了一个动态的 in-sync replica set(ISR):和 Leader 保持同步的 Follower 集合。

当 ISR 集合中的 Follower 完成数据的同步之后,Leader 就会给 Follower 发送 ACK。

如果 Follower ⻓时间未向 Leader 同步数据,则该 Follower 将被踢出 ISR 集合,该时间阈值由replica.lag.time.max.ms 参数设定。Leader 发生故障后,就会从 ISR 中选举出新的 Leader。

ACK 应答机制

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等 ISR 中的 Follower 全部接受成功。

所以 Kafka 为用户提供了三种可靠性级别,用户根据可靠性和延迟的要求进行权衡,选择以下的配置。

ACK 参数配置:

  • 0:Producer 不等待 Broker 的 ACK,这提供了最低延迟,Broker 一收到数据还没有写入磁盘就已经返回,当 Broker 故障时有可能丢失数据。
  • 1:Producer 等待 Broker 的 ACK,Partition 的 Leader 落盘成功后返回 ACK,如果在Follower 同步成功之前 Leader 故障,那么将会丢失数据。
  • -1(all):Producer 等待 Broker 的 ACK,Partition 的 Leader 和 Follower 全部落盘成功后才返回 ACK。但是在 Broker 发送 ACK 时,Leader 发生故障,则会造成数据重复。

可靠性指标

没有一个中间件能够做到百分之百的完全可靠,可靠性更多的还是基于几个9的衡量指标,比如4个9、5个9。软件系统的可靠性只能够无限去接近100%,但不可能达到100%。所以kafka如何是实现最大可能的可靠性呢?

  • 分区副本,你可以创建更多的分区来提升可靠性,但是分区数过多也会带来性能上的开销,一般来说,3个副本就能满足对大部分场景的可靠性要求
  • acks,生产者发送消息的可靠性,也就是我要保证我这个消息一定是到了broker并且完成了多副本的持久化,但这种要求也同样会带来性能上的开销。它有几个可选项
    - 1,生产者把消息发送到leader副本,leader副本在成功写入到本地日志之后就告诉生产者消息提交成功,但是如果isr集合中的follower副本还没来得及同步leader副本的消息,leader挂了,就会造成消息丢失
    - -1,消息不仅仅写入到leader副本,并且被ISR集合中所有副本同步完成之后才告诉生产者已经提交成功,这个时候即使leader副本挂了也不会造成数据丢失。
    - 0:表示producer不需要等待broker的消息确认。这个选项时延最小但同时⻛险最大(因为当server宕机时,数据将会丢失)。
  • 保障消息到了broker之后,消费者也需要有一定的保证,因为消费者也可能出现某些问题导致消息没有消费到。
  • enable.auto.commit默认为true,也就是自动提交offset,自动提交是批量执行的,有一个时间窗口,这种方式会带来重复提交或者消息丢失的问题,所以对于高可靠性要求的程序,要使用手动提交。 对于高可靠要求的应用来说,宁愿重复消费也不应该因为消费异常而导致消息丢失。

消费者

消费方式

Consumer 采用 Pull(拉取)模式从 Broker 中读取数据。

Pull 模式则可以根据 Consumer 的消费能力以适当的速率消费消息。Pull 模式不足之处是,如果Kafka 没有数据,消费者可能会陷入循环中,一直返回空数据。

因为消费者从 Broker 主动拉取数据,需要维护一个⻓轮询,针对这一点, Kafka 的消费者在消费数据时会传入一个时⻓参数 timeout。

如果当前没有数据可供消费,Consumer 会等待一段时间之后再返回,这段时⻓即为 timeout。

分区分配策略

一个 Consumer Group 中有多个 Consumer,一个 Topic 有多个 Partition,所以必然会涉及到Partition 的分配问题,即确定哪个 Partition 由哪个 Consumer 来消费。

Kafka 有三种分配策略:

  1. RoundRobin
  2. Range,默认为Range
  3. Sticky

当消费者组内消费者发生变化时,会触发分区分配策略(方法重新分配)。

这里主要讲Range、RoundRobin。

Range(默认策略)

Range 方式是按照主题来分的,不会产生轮询方式的消费混乱问题。

假设我们有10个分区,3个消费者,排完序的分区将会是0,1,2,3,4,5,6,7,8,9;消费者线程排完序将会是C1-0,C2-0,C3-0。然后将partitions的个数除于消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。在我们的例子里面,我们有10个分 区,3个消费者线程, 10/3 = 3,而且除不尽,那么消费者线程 C1-0将会多消费一个分区

结果看起来是这样的:

C1-0 将消费 0, 1, 2, 3 分区

C2-0将消费 4,5,6分区

C3-0将消费 7,8,9分区

假如我们有11个分区,那么最后分区分配的结果看起来是这样的:

C1-0将消费 0,1,2,3分区

C2-0将消费 4,5,6,7分区

C3-0 将消费 8, 9, 10 分区

假如我们有2个主题(T1和T2),分别有10个分区,那么最后分区分配的结果看起来是这样的:

C1-0 将消费 T1主题的 0, 1, 2, 3 分区以及 T2主题的 0, 1, 2, 3分区

C2-0将消费 T1主题的 4,5,6分区以及 T2主题的 4,5,6分区

C3-0将消费 T1主题的 7,8,9分区以及 T2主题的 7,8,9分区

可以看出,C1-0 消费者线程比其他消费者线程多消费了2个分区,这就是Range strategy的一个很明显的弊端

即是,如下图所示,Consumer0、Consumer1 同时订阅了主题 A 和 B,可能造成消息分配不对等问题,当消费者组内订阅的主题越多,分区分配可能越不均衡。

RoundRobin

RoundRobin 轮询方式将分区所有作为一个整体进行 Hash 排序,消费者组内分配分区个数最大差别为 1,是按照组来分的,可以解决多个消费者消费数据不均衡的问题。

轮询分区策略是把所有partition和所有consumer线程都列出来,然后按照hashcode进行排序。最后通过轮询算法分配partition给消费线程。如果所有consumer实例的订阅是相同的,那么partition会均匀 分布。

在我们的例子里面,假如按照 hashCode排序完的topic-partitions组依次为T1-5,T1-3,T1-0,T1-8,T1-2,T1-1,T1-4,T1-7,T1-6,T1-9,我们的消费者线程排序为C1-0,C1-1,C2-0,C2-1,最后分区分配的结果为:

C1-0将消费 T1-5,T1-2,T1-6分区;

C1-1将消费 T1-3,T1-1,T1-9分区;

C2-0将消费 T1-0,T1-4分区;

C2-1将消费 T1-8,T1-7分区;

但是,当消费者组内订阅不同主题时,可能造成消费混乱,如下图所示,Consumer0 订阅主题A,Consumer1 订阅主题 B。

将 A、B 主题的分区排序后分配给消费者组,TopicB 分区中的数据可能分配到 Consumer0 中。

使用轮询分区策略必须满足两个条件

  1. 每个主题的消费者实例具有相同数量的流
  2. 每个消费者订阅的主题必须是相同的

kafka开发环境

安装Java环境

下载linux下的安装包

登陆网址: link.

下载完成后,Linux默认下载位置在当前目录下的Download或下载文件夹下,通过命令cd ~/Downloads或cd ~/下载即可查看到对应的文件。

解压安装包jdk-8u202-linux-x64.tar.gz

tar -zxvf jdk-8u291-linux-x64.tar.gz

解压后的文件夹为jdk1.8.0_291

进入文件夹和查看文件

cd jdk1.8.0_291
ls

可以看到bin目录

将解压后的文件移到/usr/lib目录下

在/usr/bin目录下新建jdk目录

sudo mkdir /usr/lib/jdk

将解压的jdk文件移动到新建的/usr/lib/jdk目录下来

sudo mv jdk1.8.0_291 /usr/lib/jdk/

执行命令后可到 usr/lib/jdk 目录下查看是否移动成功。

配置java环境变量

这里是将环境变量配置在etc/profile,即为所有用户配置JDK环境。

使用命令打开/etc/profile文件

sudo vim /etc/profile

在末尾添加以下几行文字:

#set java env
export JAVA_HOME=/usr/lib/jdk/jdk1.8.0_291
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH

执行命令使修改立即生效

在终端输入,出现版本号说明安装成功。

java -version

Kafka的安装部署

下载kafka

wget https://archive.apache.org/dist/kafka/2.0.0/kafka_2.11-2.0.0.tgz

安装kafka

我们下载的kafka是已经编译好的程序,只需要解压即可得到执行程序。

tar -zxvf kafka_2.11-2.0.0.tgz

进入kafka目录,以及查看对应的文件和目录

cd kafka_2.11-2.0.0
ls

bin:为执行程序

config:为配置文件

libs:为库文件

配置和启动zookeeper

下载的kafka程序里自带了zookeeper,kafka自带的Zookeeper程序脚本与配置文件名与原生Zookeeper稍有不同。

kafka自带的Zookeeper程序使用bin/zookeeper-server-start.sh,以及bin/zookeeper-server-stop.sh来启动和停止Zookeeper。

kafka依赖于zookeeper来做master选举一起其他数据的维护。

  • 启动zookeeper:zookeeper-server-start.sh
  • 停止zookeeper:zookeeper-server-stop.sh

在config目录下,存在一些配置文件

zookeeper.properties
server.properties

所以我们可以通过下面的脚本来启动zk服务,当然,也可以自己独立搭建zk的集群来实现。这里我们直接使用kafka自带的zookeeper。

启动zookeeper

sh zookeeper-server-start.sh -daemon ../config/zookeeper.properties

默认端口为:2181,可以通过命令lsof -i:2181 查看zookeeper是否启动成功。

启动和停止kafka

  • 修改server.properties(在config目录), 增加zookeeper的配置
zookeeper.connect=localhost:2181
  • 启动kafka
sh kafka-server-start.sh -daemon ../config/server.properties

默认启动端口9092

  • 停止kafka
sh kafka-server-stop.sh -daemon ../config/server.properties

kafka的基本操作

创建topic

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Replication-factor 表示该topic需要在不同的broker中保存几份,这里设置成1,表示在两个broker中保存两份Partitions分区数

查看topic

sh kafka-topics.sh --list --zookeeper localhost:2181

查看topic属性

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

消费消息

sh kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning

发送消息

sh kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

kafka-topics.sh 使用方式

围绕创建、修改、删除以及查看等功能。

查看帮助–help

/bin目录下的每一个脚本工具,都有着众多的参数选项,不可能所有命令都记得住,这些脚本都可以使

用 --help 参数来打印列出其所需的参数信息。

$ sh kafka-topics.sh --help

下面我们挑选其中使用最为频繁且重要的参数进行说明,以及其中一些坑进行标明。

副本数量不能大于broker的数量

kafka 创建主题的时候其副本数量不能大于broker的数量,否则创建主题 topic 失败.

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic test1

创建主题–create

创建主题时候,有3个参数是必填的,分别是 --partitions(分区数量)、 --topic(主题名) 、 --replication-factor(复制系数), 同时还需使用 --create 参数表明本次操作是想要创建一个主题操作。

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1

返回:

Created topic “test1”.

此时主题 test1 就已经创建了。另外在创建主题的时候,还可以附加以下两个选项:–if-not-exists 和–if-exists . 第一个参数表明仅当该主题不存在时候,创建; 第二个参数表明当修改或删除这个主题时候,仅在该主题存在的时候去执行操作。

查看broker上所有的主题 --list

sh kafka-topics.sh --list --zookeeper localhost:2181

返回:test1

其中test1便为我们创建的主题。

查看指定主题 topic 的详细信息 --describe

该参数会将该主题的所有信息一一列出打印出来,比如分区数量、副本系数、领导者等待。

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1

返回:

Topic:test1 PartitionCount:1 ReplicationFactor:1 Configs:

Topic: test1

Partition: 0

Leader: 0 Replicas: 0

Isr: 0

修改主题信息 --alter(增加主题分区数量)

sh kafka-topics.sh --zookeeper localhost:2181 --topic test1 --alter -- partitions 2
WARNING: If partitions are increased for a topic that has a key, the
partition logic or ordering of the messages will be affected
Adding partitions succeeded!

可以看到已经成功的将主题的分区数量从1修改为了2。

**如果去修改一个不存在的topic信息会怎么样?**比如修改主题 test2,当前这主题是不存在的。

sh kafka-topics.sh --zookeeper localhost:2181 --topic test2 --alter --
partitions 2
Error while executing topic command : Topic test2 does not exist on ZK path
localhost:2181
[2021-07-12 17:28:59,253] ERROR java.lang.IllegalArgumentException: Topic
test2 does not exist on ZK path localhost:2181
at kafka.admin.TopicCommand$.alterTopic(TopicCommand.scala:123)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:65)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
(kafka.admin.TopicCommand$)

注意:不要使用 --alter 去尝试减少分区的数量,如果非要减少分区的数量,只能删除整个主题topic, 然后重新创建

删除主题 topic --delete

sh kafka-topics.sh --zookeeper localhost:2181 --delete --topic test1
Topic test1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

日志信息提示,主题 test1已经被标记删除状态,但是若delete.topic.enable 没有设置为 true , 则将

不会有任何作用。

启动生产者:sh kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test1

启动消费者:sh kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test1 --from-beginning

发现此时还是可以发送消息和接收消息。

如果要支持能够删除主题的操作,则需要在 /bin 的同级目录 /config目录下的文件server.properties

中,修改配置delete.topic.enable=true(如果置为false,则kafka broker 是不允许删除主题的)。

需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。

重启kafka

停止:sh kafka-server-stop.sh -daemon …/config/server.properties

启动:sh kafka-server-start.sh -daemon …/config/server.properties

再次删除

sh kafka-topics.sh --zookeeper localhost:2181 --delete --topic test1
相关文章
|
3月前
|
消息中间件 分布式计算 监控
Python面试:消息队列(RabbitMQ、Kafka)基础知识与应用
【4月更文挑战第18天】本文探讨了Python面试中RabbitMQ与Kafka的常见问题和易错点,包括两者的基础概念、特性对比、Python客户端使用、消息队列应用场景及消息可靠性保证。重点讲解了消息丢失与重复的避免策略,并提供了实战代码示例,帮助读者提升在分布式系统中使用消息队列的能力。
103 2
|
3月前
|
消息中间件 监控 大数据
Kafka消息队列架构与应用场景探讨:面试经验与必备知识点解析
【4月更文挑战第9天】本文详尽探讨了Kafka的消息队列架构,包括Broker、Producer、Consumer、Topic和Partition等核心概念,以及消息生产和消费流程。此外,还介绍了Kafka在微服务、实时数据处理、数据管道和数据仓库等场景的应用。针对面试,文章解析了Kafka与传统消息队列的区别、实际项目挑战及解决方案,并展望了Kafka的未来发展趋势。附带Java Producer和Consumer的代码示例,帮助读者巩固技术理解,为面试做好准备。
148 0
|
9天前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
28 3
|
22天前
|
消息中间件 分布式计算 NoSQL
EMR-Kafka Connect:高效数据迁移的革新实践与应用探索
Kafka Connect是Kafka官方提供的一个可扩展的数据传输框架,它允许用户以声明式的方式在Kafka与其他数据源之间进行数据迁移,无需编写复杂的数据传输代码。
|
2月前
|
消息中间件 存储 缓存
高性能、高可靠性!Kafka的技术优势与应用场景全解析
**Kafka** 是一款高吞吐、高性能的消息系统,擅长日志收集、消息传递和用户活动跟踪。其优点包括:零拷贝技术提高传输效率,顺序读写优化磁盘性能,持久化保障数据安全,分布式架构支持扩展,以及客户端状态维护确保可靠性。在实际应用中,Kafka常用于日志聚合、解耦生产者与消费者,以及实时用户行为分析。
114 3
|
1月前
|
消息中间件 Java Kafka
Spring Boot与Kafka的集成应用
Spring Boot与Kafka的集成应用
|
2月前
|
消息中间件 运维 Serverless
Serverless 应用引擎产品使用合集之如何触发kafka
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
2月前
|
消息中间件 Java Kafka
Spring Boot与Kafka的集成应用
Spring Boot与Kafka的集成应用
|
2月前
|
消息中间件 存储 大数据
深度分析:Apache Kafka及其在大数据处理中的应用
Apache Kafka是高吞吐、低延迟的分布式流处理平台,常用于实时数据流、日志收集和事件驱动架构。与RabbitMQ(吞吐量有限)、Pulsar(多租户支持但生态系统小)和Amazon Kinesis(托管服务,成本高)对比,Kafka在高吞吐和持久化上有优势。适用场景包括实时处理、数据集成、日志收集和消息传递。选型需考虑吞吐延迟、持久化、协议支持等因素,使用时注意资源配置、数据管理、监控及安全性。
|
3月前
|
消息中间件 存储 传感器
Kafka消息队列原理及应用详解
【5月更文挑战第6天】Apache Kafka是高性能的分布式消息队列,常用于实时数据管道和流应用。它提供高性能、持久化、分布式和可伸缩的消息处理,支持解耦、异步通信和流量控制。Kafka的核心概念包括Broker、Topic、Partition、Producer、Consumer和Consumer Group。其特点是高吞吐、低延迟、数据持久化、分布式架构和容错性。常见应用包括实时数据流处理、日志收集、消息传递和系统间数据交换。

热门文章

最新文章