详解Kafka1-基础使用

简介: Kafka消息队列技术指南 本文系统介绍了Kafka消息队列的核心概念与应用实践。主要内容包括: 消息队列基础 两种模型:生产者-消费者模型和发布-订阅模型 应用场景:异步处理、系统解耦、流量削峰等 Kafka核心架构 重要组件:Broker、Zookeeper、Topic、Partition等 消费者组机制实现负载均衡 消息存储与分区策略 实践操作 集群搭建与环境配置 基准测试方法与性能指标 Java API编程示例(生产者/消费者) 事务编程实现原子操作 高级特性 生产者幂等性原理与配置 事务API与隔

 

1.消息队列基础概念

1.1 消息队列介绍

  • 消息队列——用于存放消息的组件
  • 可以将消息放入到队列中,也可以从消息队列中获取消息
  • 很多时候消息队列不是一个永久性的存储,是作为临时存储存在的(设定一个期限:设置消息在MQ中保存10天)
  • 消息队列中间件:消息队列的组件,例如:Kafka、Active MQ、RabbitMQ、RocketMQ、ZeroMQ

1.2 消息队列应用场景

  • 异步处理
  • 可以将一些比较耗时的操作放在其他系统中,通过消息队列将需要进行处理的消息进行存储,其他系统可以消费消息队列中的数据
  • 比较常见的:发送短信验证码、发送邮件

image.gif 编辑

  • 系统解耦
  • 原先一个微服务是通过接口(HTTP)调用另一个微服务,这时候耦合很严重,只要接口发生变化就会导致系统不可用
  • 使用消息队列可以将系统进行解耦合,现在第一个微服务可以将消息放入到消息队列中,另一个微服务可以从消息队列中把消息取出来进行处理。进行系统解耦 image.gif 编辑
  • 流量削峰
  • 因为消息队列是低延迟、高可靠、高吞吐的,可以应对大量并发

image.gif 编辑

  • 日志处理
  • 可以使用消息队列作为临时存储,或者一种通信管道

image.gif 编辑

1.3 消息队列两种模型

1.3.1 生产者,消费者模型

  • 生产者负责将消息生产到MQ中
  • 消费者负责从MQ中获取消息
  • 生产者和消费者是解耦的,可能是生产者一个程序、消费者是另外一个程序

image.gif 编辑

1.3.2 消息队列的模式

点对点:一个消费者消费一个消息

image.gif 编辑

消息发送者生产消息发送到消息队列中,然后消息接收者从消息队列中取出并且消费消息。消息被消费以后,消息队列中不再有存储,所以消息接收者不可能消费到已经被消费的消息。

点对点模式特点:

  1. 每个消息只有一个接收者(Consumer)(即一旦被消费,消息就不再在消息队列中)
  2. 发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息;
  3. 接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息;

发布订阅:多个消费者可以消费一个消息 image.gif 编辑

发布/订阅模式特点:

  1. 每个消息可以有多个订阅者;
  2. 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
  3. 为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行;

2.Kafka基础概念

2.1 简介

Kafka是由Apache软件基金会开发的一个开源流平台,由Scala和Java编写。Kafka的Apache官网是这样介绍Kakfa的:

Apache Kafka是一个分布式流平台。一个分布式的流平台应该包含3点关键的能力:

  1. 发布和订阅流数据流,类似于消息队列或者是企业消息传递系统
  2. 以容错的持久化方式存储数据流
  3. 处理数据流

我们重点关键三个部分的关键词:

  1. Publish and subscribe:发布与订阅
  2. Store:存储
  3. Process:处理

kafka的诞生,是为了解决linkedin的数据管道问题,起初linkedin采用了ActiveMQ来进行数据交换,大约是在2010年前后,那时的ActiveMQ还远远无法满足linkedin对数据传递系统的要求,经常由于各种缺陷而导致消息阻塞或者服务无法正常访问,为了能够解决这个问题,linkedin决定研发自己的消息传递系统,当时linkedin的首席架构师jay kreps便开始组织团队进行消息传递系统的研发。

2.1 Kafka应用场景

我们通常将Apache Kafka用在两类程序:

  1. 建立实时数据管道,以可靠地在系统或应用程序之间获取数据
  2. 构建实时流应用程序,以转换或响应数据流

image.gif 编辑

上图,我们可以看到:

  1. Producers:可以有很多的应用程序,将消息数据放入到Kafka集群中。
  2. Consumers:可以有很多的应用程序,将消息数据从Kafka集群中拉取出来。
  3. Connectors:Kafka的连接器可以将数据库中的数据导入到Kafka,也可以将Kafka的数据导出到数据库中。
  4. Stream Processors:流处理器可以Kafka中拉取数据,也可以将数据写入到Kafka中。

Kafka优势:

特性

ActiveMQ

RabbitMQ

Kafka

RocketMQ

所属社区/公司

Apache

Mozilla Public License

Apache

Apache/Ali

成熟度

成熟

成熟

成熟

比较成熟

生产者-消费者模式

支持

支持

支持

支持

发布-订阅

支持

支持

支持

支持

REQUEST-REPLY

支持

支持

-

支持

API完备性

低(静态配置)

多语言支持

支持JAVA优先

语言无关

支持,JAVA优先

支持

单机呑吐量

万级(最差)

万级

十万级

十万级(最高)

消息延迟

-

微秒级

毫秒级

-

可用性

高(主从)

高(主从)

非常高(分布式)

消息丢失

-

理论上不会丢失

-

消息重复

-

可控制

理论上会有重复

-

事务

支持

不支持

支持

支持

文档的完备性

提供快速入门

首次部署难度

-

在大数据技术领域,一些重要的组件、框架都支持Apache Kafka,不论成成熟度、社区、性能、可靠性,Kafka都是非常有竞争力的一款产品。

Apache Kafka这么多年的发展,目前也有一个较庞大的生态圈。

Kafka生态圈官网地址:https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem

3.Kafka环境搭建

3.1 搭建Kafka集群

1. 将Kafka的安装包上传到虚拟机,并解压

cd /export/software/
tar -xvzf kafka_2.12-2.4.1.tgz -C ../server/
cd /export/server/kafka_2.12-2.4.1/

image.gif

    2.修改 server.properties

    cd /export/server/kafka_2.12-2.4.1/config
    vim server.properties
    # 指定broker的id
    broker.id=0
    # 指定Kafka数据的位置
    log.dirs=/export/server/kafka_2.12-2.4.1/data
    # 配置zk的三个节点
    zookeeper.connect=node1.itcast.cn:2181,node2.itcast.cn:2181,node3.itcast.cn:2181

    image.gif

    3.将安装好的kafka复制到另外两台服务器

    cd /export/server
    scp -r kafka_2.12-2.4.1/ node2.itcast.cn:$PWD
    scp -r kafka_2.12-2.4.1/ node3.itcast.cn:$PWD
    修改另外两个节点的broker.id分别为1和2
    ---------node2.itcast.cn--------------
    cd /export/server/kafka_2.12-2.4.1/config
    vim erver.properties
    broker.id=1
    --------node3.itcast.cn--------------
    cd /export/server/kafka_2.12-2.4.1/config
    vim server.properties
    broker.id=2

    image.gif

    4.配置KAFKA_HOME环境变量

    vim /etc/profile
    export KAFKA_HOME=/export/server/kafka_2.12-2.4.1
    export PATH=:$PATH:${KAFKA_HOME}
    分发到各个节点
    scp /etc/profile node2.itcast.cn:$PWD
    scp /etc/profile node3.itcast.cn:$PWD
    每个节点加载环境变量
    source /etc/profile

    image.gif

    5.启动服务器

    # 启动ZooKeeper
    nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
    # 启动Kafka
    cd /export/server/kafka_2.12-2.4.1
    nohup bin/kafka-server-start.sh config/server.properties &
    # 测试Kafka集群是否启动成功
    bin/kafka-topics.sh --bootstrap-server node1.itcast.cn:9092 --list

    image.gif

      注意:

      • Kafka集群是必须要有ZooKeeper的
      • 每一个Kafka的节点都需要修改broker.id(每个节点的标识,不能重复)
      • log.dir数据存储目录需要配置

      3.2 目录结构分析

      目录名称

      说明

      bin

      Kafka的所有执行脚本都在这里。例如:启动Kafka服务器、

      创建Topic、生产者、消费者程序等等

      config

      Kafka的所有配置文件

      libs

      运行Kafka所需要的所有JAR包

      logs

      Kafka的所有日志文件,如果Kafka出现一些问题,需要到

      该目录中去查看异常信息

      site-docs

      Kafka的网站帮助文件

      3.3 Kafka一键启动脚本

      为了方便将来进行一键启动、关闭Kafka,我们可以编写一个shell脚本来操作。将来只要执行一次该脚本就可以快速启动/关闭Kafka。

      1. 在节点1中创建 /export/onekey 目录
      cd /export/onekey
      1. image.gif
      2. 准备slave配置文件,用于保存要启动哪几个节点上的kafka
      node1.itcast.cn
      node2.itcast.cn
      node3.itcast.cn
      1. image.gif
      2. 编写start-kafka.sh脚本
      vim start-kafka.sh
      cat /export/onekey/slave | while read line
      do
      {
       echo $line
       ssh $line "source /etc/profile;export JMX_PORT=9988;nohup ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties >/dev/nul* 2>&1 & "
      }&
      wait
      done
      1. image.gif
      2. 编写stop-kafka.sh脚本
      vim stop-kafka.sh
      cat /export/onekey/slave | while read line
      do
      {
       echo $line
       ssh $line "source /etc/profile;jps |grep Kafka |cut -d' ' -f1 |xargs kill -s 9"
      }&
      wait
      done
      1. image.gif
      2. 给start-kafka.sh、stop-kafka.sh配置执行权限
      chmod u+x start-kafka.sh
      chmod u+x stop-kafka.sh
      1. image.gif
      2. 执行一键启动、一键关闭
      ./start-kafka.sh
      ./stop-kafka.sh
      1. image.gif

      3.4 基础操作 image.gif 编辑

      创建topic:

      创建一个topic(主题)。Kafka中所有的消息都是保存在主题中,要生产消息到Kafka,首先必须要有一个确定的主题。

      # 创建名为test的主题
      bin/kafka-topics.sh --create --bootstrap-server node1.itcast.cn:9092 --topic test
      # 查看目前Kafka中的主题
      bin/kafka-topics.sh --list --bootstrap-server node1.itcast.cn:9092

      image.gif

      生产消息到Kafka:

      使用Kafka内置的测试程序,生产一些消息到Kafka的test主题中。

      bin/kafka-console-producer.sh --broker-list node1.itcast.cn:9092 --topic test

      image.gif

      从Kafka消费消息:

      使用下面的命令来消费 test 主题中的消息:

      bin/kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic test --from-beginning

      image.gif

      3.5 使用Kafka Tools操作Kafka

      连接Kafka集群:

      安装Kafka Tools后启动Kafka

      image.gif 编辑 image.gif 编辑 image.gif 编辑

      创建topic:

      image.gif 编辑 image.gif 编辑 image.gif 编辑

      3.6 Kafka基准测试

      基准测试(benchmark testing)是一种测量和评估软件性能指标的活动。我们可以通过基准测试,了解到软件、硬件的性能水平。主要测试负载的执行时间、传输速度、吞吐量、资源占用率等。

      基于1个分区1个副本的基准测试

      测试步骤:

      1. 启动Kafka集群
      2. 创建一个1个分区1个副本的topic: benchmark
      3. 同时运行生产者、消费者基准测试程序
      4. 观察结果

      创建topic:

      bin/kafka-topics.sh --zookeeper node1.itcast.cn:2181 --create --topic benchmark --partitions 1 --replication-factor 1

      image.gif

      生产消息基准测试:

      在生产环境中,推荐使用生产5000W消息,这样会性能数据会更准确些。

      bin/kafka-producer-perf-test.sh --topic benchmark --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1.itcast.cn:9092,node2.itcast.cn:9092,node3.itcast.cn:9092 acks=1

      image.gif

      bin/kafka-producer-perf-test.sh 
      --topic topic的名字
      --num-records 总共指定生产数据量(默认5000W)
      --throughput  指定吞吐量——限流(-1不指定)
      --record-size   record数据大小(字节)
      --producer-props bootstrap.servers=192.168.1.20:9092,192.168.1.21:9092,192.168.1.22:9092 acks=1 指定Kafka集群地址,ACK模式

      image.gif

      测试结果:

      吞吐量

      93092.533979 records/sec

      每秒9.3W条记录

      吞吐速率

      (88.78 MB/sec)

      每秒约89MB数据

      平均延迟时间

      346.62 ms avg latency

      最大延迟时间

      1003.00 ms max latency

      消费消息基准测试:

      bin/kafka-consumer-perf-test.sh --broker-list node1.itcast.cn:9092,node2.itcast.cn:9092,node3.itcast.cn:9092 --topic benchmark --fetch-size 1048576 --messages 5000000

      image.gif

      bin/kafka-consumer-perf-test.sh
      --broker-list 指定kafka集群地址
      --topic 指定topic的名称
      --fetch-size 每次拉取的数据大小
      --messages 总共要消费的消息个数

      image.gif

      data.consumed.in.MB

      共计消费的数据

      4768.3716MB

      MB.sec

      每秒消费的数量

      445.6006

      每秒445MB

      data.consumed.in.nMsg

      共计消费的数量

      5000000

      nMsg.sec

      每秒的数量

      467246.0518

      每秒46.7W条

      4.Java编程操作Kafka

      4.1 同步生产消息到Kafka中

      需求:

      我们将编写Java程序,将1-100的数字消息写入到Kafka中。

      导入依赖:

      <repositories><!-- 代码库 -->
          <repository>
              <id>central</id>
              <url>http://maven.aliyun.com/nexus/content/groups/public//</url>
              <releases>
                  <enabled>true</enabled>
              </releases>
              <snapshots>
                  <enabled>true</enabled>
                  <updatePolicy>always</updatePolicy>
                  <checksumPolicy>fail</checksumPolicy>
              </snapshots>
          </repository>
      </repositories>
      <dependencies>
          <!-- kafka客户端工具 -->
          <dependency>
              <groupId>org.apache.kafka</groupId>
              <artifactId>kafka-clients</artifactId>
              <version>2.4.1</version>
          </dependency>
          <!-- 工具类 -->
          <dependency>
              <groupId>org.apache.commons</groupId>
              <artifactId>commons-io</artifactId>
              <version>1.3.2</version>
          </dependency>
          <!-- SLF桥接LOG4J日志 -->
          <dependency>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-log4j12</artifactId>
              <version>1.7.6</version>
          </dependency>
          <!-- SLOG4J日志 -->
          <dependency>
              <groupId>log4j</groupId>
              <artifactId>log4j</artifactId>
              <version>1.2.16</version>
          </dependency>
      </dependencies>
      <build>
          <plugins>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-compiler-plugin</artifactId>
                  <version>3.7.0</version>
                  <configuration>
                      <source>1.8</source>
                      <target>1.8</target>
                  </configuration>
              </plugin>
          </plugins>
      </build>

      image.gif

      导入log4j.properties:

      将log4j.properties配置文件放入到resources文件夹中

      log4j.rootLogger=INFO,stdout
      log4j.appender.stdout=org.apache.log4j.ConsoleAppender 
      log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
      log4j.appender.stdout.layout.ConversionPattern=%5p - %m%n

      image.gif

      开发步骤:

      可以参考以下方式来编写第一个Kafka示例程序

      参考以下文档:http://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

      1. 创建用于连接Kafka的Properties配置
      Properties props = new Properties();
      props.put("bootstrap.servers", "192.168.88.100:9092");
      props.put("acks", "all");
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      1. image.gif
      2. 创建一个生产者对象KafkaProducer
      3. 调用send发送1-100消息到指定Topic test,并获取返回值Future,该对象封装了返回值
      4. 再调用一个Future.get()方法等待响应
      5. 关闭生产者

      参考代码:

      public class KafkaProducerTest {
          public static void main(String[] args) {
              // 1. 创建用于连接Kafka的Properties配置
              Properties props = new Properties();
              props.put("bootstrap.servers", "192.168.88.100:9092");
              props.put("acks", "all");
              props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              // 2. 创建一个生产者对象KafkaProducer
              KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
              // 3. 调用send发送1-100消息到指定Topic test
              for(int i = 0; i < 100; ++i) {
                  try {
                      // 获取返回值Future,该对象封装了返回值
                      Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("test", null, i + ""));
                      // 调用一个Future.get()方法等待响应
                      future.get();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  } catch (ExecutionException e) {
                      e.printStackTrace();
                  }
              }
              // 5. 关闭生产者
              producer.close();
          }
      }

      image.gif

      4.2 从Kafka的topic中消费消息

      需求:

      从 test topic中,将消息都消费,并将记录的offset、key、value打印出来

      开发步骤:

      1. 创建Kafka消费者配置
      Properties props = new Properties();
      props.setProperty("bootstrap.servers", "node1.itcast.cn:9092");
      props.setProperty("group.id", "test");
      props.setProperty("enable.auto.commit", "true");
      props.setProperty("auto.commit.interval.ms", "1000");
      props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

      image.gif

      1. 创建Kafka消费者
      2. 订阅要消费的主题
      3. 使用一个while循环,不断从Kafka的topic中拉取消息
      4. 将将记录(record)的offset、key、value都打印出来

      代码:

      • group.id:消费者组的概念,可以在一个消费组中包含多个消费者。如果若干个消费者的group.id是一样的,表示它们就在一个组中,一个组中的消费者是共同消费Kafka中topic的数据。
      • Kafka是一种拉消息模式的消息队列,在消费者中会有一个offset,表示从哪条消息开始拉取数据
      • kafkaConsumer.poll:Kafka的消费者API是一批一批数据的拉取
      /**
       * 消费者程序
       *
       * 1.创建Kafka消费者配置
       * Properties props = new Properties();
       * props.setProperty("bootstrap.servers", "node1.itcast.cn:9092");
       * props.setProperty("group.id", "test");
       * props.setProperty("enable.auto.commit", "true");
       * props.setProperty("auto.commit.interval.ms", "1000");
       * props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
       * props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
       *
       * 2.创建Kafka消费者
       * 3.订阅要消费的主题
       * 4.使用一个while循环,不断从Kafka的topic中拉取消息
       * 5.将将记录(record)的offset、key、value都打印出来
       */
      public class KafkaConsumerTest {
          public static void main(String[] args) {
              // 1.创建Kafka消费者配置
              Properties props = new Properties();
              props.setProperty("bootstrap.servers", "node1.itcast.cn:9092");
              // 消费者组(可以使用消费者组将若干个消费者组织到一起),共同消费Kafka中topic的数据
              // 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的
              props.setProperty("group.id", "test");
              // 自动提交offset
              props.setProperty("enable.auto.commit", "true");
              // 自动提交offset的时间间隔
              props.setProperty("auto.commit.interval.ms", "1000");
              // 拉取的key、value数据的
              props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              // 2.创建Kafka消费者
              KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
              // 3. 订阅要消费的主题
              // 指定消费者从哪个topic中拉取数据
              kafkaConsumer.subscribe(Arrays.asList("test"));
              // 4.使用一个while循环,不断从Kafka的topic中拉取消息
              while(true) {
                  // Kafka的消费者一次拉取一批的数据
                  ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
                  // 5.将将记录(record)的offset、key、value都打印出来
                  for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                      // 主题
                      String topic = consumerRecord.topic();
                      // offset:这条消息处于Kafka分区中的哪个位置
                      long offset = consumerRecord.offset();
                      // key\value
                      String key = consumerRecord.key();
                      String value = consumerRecord.value();
                      System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);
                  }
              }
          }
      }

      image.gif

      4.3 生产者使用异步方式生产消息

      如果我们想获取生产者消息是否成功,或者成功生产消息到Kafka中后,执行一些其他动作。此时,可以很方便地使用带有回调函数来发送消息。

      需求:

      1. 在发送消息出现异常时,能够及时打印出异常信息
      2. 在发送消息成功时,打印Kafka的topic名字、分区id、offset

      代码实现:

      • 使用匿名内部类实现Callback接口,该接口中表示Kafka服务器响应给客户端,会自动调用onCompletion方法
      • metadata:消息的元数据(属于哪个topic、属于哪个partition、对应的offset是什么)
      • exception:这个对象Kafka生产消息封装了出现的异常,如果为null,表示发送成功,如果不为null,表示出现异常。
      public class KafkaProducerTest {
          public static void main(String[] args) {
              // 1. 创建用于连接Kafka的Properties配置
              Properties props = new Properties();
              props.put("bootstrap.servers", "node1.itcast.cn:9092");
              props.put("acks", "all");
              props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              // 2. 创建一个生产者对象KafkaProducer
              KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
              // 3. 调用send发送1-100消息到指定Topic test
              for(int i = 0; i < 100; ++i) {
                  // 一、同步方式
                  // 获取返回值Future,该对象封装了返回值
                  // Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("test", null, i + ""));
                  // 调用一个Future.get()方法等待响应
                  // future.get();
                  // 二、带回调函数异步方式
                  producer.send(new ProducerRecord<String, String>("test", null, i + ""), new Callback() {
                      @Override
                      public void onCompletion(RecordMetadata metadata, Exception exception) {
                          if(exception != null) {
                              System.out.println("发送消息出现异常");
                          }
                          else {
                              String topic = metadata.topic();
                              int partition = metadata.partition();
                              long offset = metadata.offset();
                              System.out.println("发送消息到Kafka中的名字为" + topic + "的主题,第" + partition + "分区,第" + offset + "条数据成功!");
                          }
                      }
                  });
              }
              // 5. 关闭生产者
              producer.close();
          }
      }

      image.gif

      5.Kafka架构

      5.1 Kafka重要组件

      5.1.1 broker

      image.gif 编辑

      1. 一个Kafka的集群通常由多个broker组成,这样才能实现负载均衡、以及容错
      2. broker是无状态(Sateless)的,它们是通过ZooKeeper来维护集群状态
      3. 一个Kafka的broker每秒可以处理数十万次读写,每个broker都可以处理TB消息而不影响性能

      5.1.2 Zookeeper

      image.gif 编辑

      1. ZK用来管理和协调broker,并且存储了Kafka的元数据(例如:有多少topic、partition、consumer)
      2. ZK服务主要用于通知生产者和消费者Kafka集群中有新的broker加入、或者Kafka集群中出现故障的broker。

      PS:Kafka正在逐步想办法将ZooKeeper剥离,维护两套集群成本较高,社区提出KIP-500就是要替换掉ZooKeeper的依赖。“Kafka on Kafka”——Kafka自己来管理自己的元数据

      5.1.3 producer

      生产者负责将数据推送给broker的topic

      5.1.4 consumer

      消费者负责从broker的topic中拉取数据,并自己进行处理

      5.1.5 consumer group image.gif 编辑

      1. consumer group是kafka提供的可扩展且具有容错性的消费者机制
      2. 一个消费者组可以包含多个消费者
      3. 一个消费者组有一个唯一的ID(group Id)
      4. 组内的消费者一起消费主题的所有分区数据

      5.1.6 分区(Partitions)

      image.gif 编辑

      在Kafka集群中,主题被分为多个分区。

      5.1.7 副本(Replicas)

      image.gif 编辑

      1. 副本可以确保某个服务器出现故障时,确保数据依然可用
      2. 在Kafka中,一般都会设计副本的个数>1

      5.1.8 主题(Topic)

      image.gif 编辑

      1. 主题是一个逻辑概念,用于生产者发布数据,消费者拉取数据
      2. Kafka中的主题必须要有标识符,而且是唯一的,Kafka中可以有任意数量的主题,没有数量上的限制
      3. 在主题中的消息是有结构的,一般一个主题包含某一类消息
      4. 一旦生产者发送消息到主题中,这些消息就不能被更新(更改)

      5.1.9 偏移量(offset)

      image.gif 编辑

      1. offset记录着下一条将要发送给Consumer的消息的序号
      2. 默认Kafka将offset存储在ZooKeeper中
      3. 在一个分区中,消息是有顺序的方式存储着,每个在分区的消费都是有一个递增的id。这个就是偏移量offset
      4. 偏移量在分区中才是有意义的。在分区之间,offset是没有任何意义的

      5.2 消费者组

      Kafka支持有多个消费者同时消费一个主题中的数据。我们接下来,给大家演示,启动两个消费者共同来消费 test 主题的数据。

      1. 首先,修改生产者程序,让生产者每3秒生产1-100个数字。
      // 3. 发送1-100数字到Kafka的test主题中
      while(true) {
          for (int i = 1; i <= 100; ++i) {
              // 注意:send方法是一个异步方法,它会将要发送的数据放入到一个buffer中,然后立即返回
              // 这样可以让消息发送变得更高效
              producer.send(new ProducerRecord<>("test", i + ""));
          }
          Thread.sleep(3000);
      }

      image.gif

        2. 接下来,同时运行两个消费者。

        image.gif 编辑

        3.同时运行两个消费者,我们发现,只有一个消费者程序能够拉取到消息。想要让两个消费者同时消费消息,必须要给test主题,添加一个分区。

        # 设置 test topic为2个分区
        bin/kafka-topics.sh --zookeeper 192.168.88.100:2181 -alter --partitions 2 --topic test

        image.gif

          重新运行生产者、两个消费者程序,我们就可以看到两个消费者都可以消费Kafka Topic的数据了

          6.Kafka生产者幂等性

          6.1 概念

          拿http举例来说,一次或多次请求,得到地响应是一致的(网络超时等问题除外),换句话说,就是执行多次操作与执行一次操作的影响是一样的。

          image.gif 编辑

          如果,某个系统是不具备幂等性的,如果用户重复提交了某个表格,就可能会造成不良影响。例如:用户在浏览器上点击了多次提交订单按钮,会在后台生成多个一模一样的订单。 image.gif 编辑

          在生产者生产消息时,如果出现retry时,有可能会一条消息被发送了多次,如果Kafka不具备幂等性的,就有可能会在partition中保存多条一模一样的消息。

          配置幂等性:

          props.put("enable.idempotence",true);

          image.gif

          6.2 幂等性原理

          为了实现生产者的幂等性,Kafka引入了 Producer ID(PID)和 Sequence Number的概念。

          1. PID:每个Producer在初始化时,都会分配一个唯一的PID,这个PID对用户来说,是透明的。
          2. Sequence Number:针对每个生产者(对应PID)发送到指定主题分区的消息都对应一个从0开始递增的Sequence Number。

          image.gif 编辑

          7.Kafka事务

          7.1 简介

          Kafka事务是2017年Kafka 0.11.0.0引入的新特性。类似于数据库的事务。Kafka事务指的是生产者生产消息以及消费者提交offset的操作可以在一个原子操作中,要么都成功,要么都失败。尤其是在生产者、消费者并存时,事务的保障尤其重要。(consumer-transform-producer模式)

          image.gif 编辑


          7.2 事务操作API

          Producer接口中定义了以下5个事务相关方法:

          1. initTransactions(初始化事务):要使用Kafka事务,必须先进行初始化操作
          2. beginTransaction(开始事务):启动一个Kafka事务
          3. sendOffsetsToTransaction(提交偏移量):批量地将分区对应的offset发送到事务中,方便后续一块提交
          4. commitTransaction(提交事务):提交事务
          5. abortTransaction(放弃事务):取消事务

          7.3 事务相关属性配置

          7.3.1 生产者:

          // 配置事务的id,开启了事务会默认开启幂等性
          props.put("transactional.id", "first-transactional");

          image.gif

          7.3.2 消费者:

          // 1. 消费者需要设置隔离级别
          props.put("isolation.level","read_committed");
          //  2. 关闭自动提交
           props.put("enable.auto.commit", "false");

          image.gif

          7.4 事务编程

          需求:

          在Kafka的topic 「ods_user」中有一些用户数据,数据格式如下:

          姓名,性别,出生日期
          张三,1,1980-10-09
          李四,0,1985-11-01

          image.gif

          我们需要编写程序,将用户的性别转换为男、女(1-男,0-女),转换后将数据写入到topic 「dwd_user」中。要求使用事务保障,要么消费了数据同时写入数据到 topic,提交offset。要么全部失败。

          启动生产者控制台程序模拟数据:

          # 创建名为ods_user和dwd_user的主题
          bin/kafka-topics.sh --create --bootstrap-server node1.itcast.cn:9092 --topic ods_user
          bin/kafka-topics.sh --create --bootstrap-server node1.itcast.cn:9092 --topic dwd_user
          # 生产数据到 ods_user
          bin/kafka-console-producer.sh --broker-list node1.itcast.cn:9092 --topic ods_user
          # 从dwd_user消费数据
          bin/kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic dwd_user --from-beginning  --isolation-level read_committed

          image.gif

          编写创建消费者代码:

          编写一个方法 createConsumer,该方法中返回一个消费者,订阅「ods_user」主题。注意:需要配置事务隔离级别、关闭自动提交。

          实现步骤:

          1. 创建Kafka消费者配置
          Properties props = new Properties();
           props.setProperty("bootstrap.servers", "node1.itcast.cn:9092");
           props.setProperty("group.id", "ods_user");
           props.put("isolation.level","read_committed");
           props.setProperty("enable.auto.commit", "false");
           props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          1. image.gif
          2. 创建消费者,并订阅 ods_user 主题

          编写创建生产者代码:

          编写一个方法 createProducer,返回一个生产者对象。注意:需要配置事务的id,开启了事务会默认开启幂等性。

          1. 创建生产者配置
          Properties props = new Properties();
          props.put("bootstrap.servers", "node1.itcast.cn:9092");
          props.put("transactional.id", "dwd_user");
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          1. image.gif

          编写代码消费并生产数据:

          实现步骤:

          1. 调用之前实现的方法,创建消费者、生产者对象
          2. 生产者调用initTransactions初始化事务
          3. 编写一个while死循环,在while循环中不断拉取数据,进行处理后,再写入到指定的topic
          1. 生产者开启事务
          2. 消费者拉取消息
          3. 遍历拉取到的消息,并进行预处理(将1转换为男,0转换为女)
          4. 生产消息到dwd_user topic中
          5. 提交偏移量到事务中
          6. 提交事务
          7. 捕获异常,如果出现异常,则取消事务
          public static void main(String[] args) {
                  Consumer<String, String> consumer = createConsumer();
                  Producer<String, String> producer = createProducer();
                  // 初始化事务
                  producer.initTransactions();
                  while(true) {
                      try {
                          // 1. 开启事务
                          producer.beginTransaction();
                          // 2. 定义Map结构,用于保存分区对应的offset
                          Map<TopicPartition, OffsetAndMetadata> offsetCommits = new HashMap<>();
                          // 2. 拉取消息
                          ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
                          for (ConsumerRecord<String, String> record : records) {
                              // 3. 保存偏移量
                              offsetCommits.put(new TopicPartition(record.topic(), record.partition()),
                                      new OffsetAndMetadata(record.offset() + 1));
                              // 4. 进行转换处理
                              String[] fields = record.value().split(",");
                              fields[1] = fields[1].equalsIgnoreCase("1") ? "男":"女";
                              String message = fields[0] + "," + fields[1] + "," + fields[2];
                              // 5. 生产消息到dwd_user
                              producer.send(new ProducerRecord<>("dwd_user", message));
                          }
                          // 6. 提交偏移量到事务
                          producer.sendOffsetsToTransaction(offsetCommits, "ods_user");
                          // 7. 提交事务
                          producer.commitTransaction();
                      } catch (Exception e) {
                          // 8. 放弃事务
                          producer.abortTransaction();
                      }
                  }
              }

          image.gif

          往之前启动的console-producer中写入消息进行测试,同时检查console-consumer是否能够接收到消息: image.gif 编辑

          逐个测试一下消息:

          张三,1,1980-10-09
          李四,0,1985-11-01

          image.gif

          模拟异常事务:

          // 3. 保存偏移量
          offsetCommits.put(new TopicPartition(record.topic(), record.partition()),
                  new OffsetAndMetadata(record.offset() + 1));
          // 4. 进行转换处理
          String[] fields = record.value().split(",");
          fields[1] = fields[1].equalsIgnoreCase("1") ? "男":"女";
          String message = fields[0] + "," + fields[1] + "," + fields[2];
          // 模拟异常
          int i = 1/0;
          // 5. 生产消息到dwd_user
          producer.send(new ProducerRecord<>("dwd_user", message));

          image.gif

          启动程序一次,抛出异常。

          再启动程序一次,还是抛出异常。

          直到我们处理该异常为止。

          我们发现,可以消费到消息,但如果中间出现异常的话,offset是不会被提交的,除非消费、生产消息都成功,才会提交事务。

            相关文章
            |
            6天前
            |
            人工智能 JSON 机器人
            让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
            本文带你零成本玩转OpenClaw:学生认证白嫖6个月阿里云服务器,手把手配置飞书机器人、接入免费/高性价比AI模型(NVIDIA/通义),并打造微信公众号“全自动分身”——实时抓热榜、AI选题拆解、一键发布草稿,5分钟完成热点→文章全流程!
            10861 75
            让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
            |
            6天前
            |
            人工智能 IDE API
            2026年国内 Codex 安装教程和使用教程:GPT-5.4 完整指南
            Codex已进化为AI编程智能体,不仅能补全代码,更能理解项目、自动重构、执行任务。本文详解国内安装、GPT-5.4接入、cc-switch中转配置及实战开发流程,助你从零掌握“描述需求→AI实现”的新一代工程范式。(239字)
            3756 129
            |
            1天前
            |
            人工智能 Kubernetes 供应链
            深度解析:LiteLLM 供应链投毒事件——TeamPCP 三阶段后门全链路分析
            阿里云云安全中心和云防火墙已在第一时间上线相关检测与拦截策略!
            1304 5
            |
            2天前
            |
            人工智能 自然语言处理 供应链
            【最新】阿里云ClawHub Skill扫描:3万个AI Agent技能中的安全度量
            阿里云扫描3万+AI Skill,发现AI检测引擎可识别80%+威胁,远高于传统引擎。
            1249 2
            |
            12天前
            |
            人工智能 JavaScript API
            解放双手!OpenClaw Agent Browser全攻略(阿里云+本地部署+免费API+网页自动化场景落地)
            “让AI聊聊天、写代码不难,难的是让它自己打开网页、填表单、查数据”——2026年,无数OpenClaw用户被这个痛点困扰。参考文章直击核心:当AI只能“纸上谈兵”,无法实际操控浏览器,就永远成不了真正的“数字员工”。而Agent Browser技能的出现,彻底打破了这一壁垒——它给OpenClaw装上“上网的手和眼睛”,让AI能像真人一样打开网页、点击按钮、填写表单、提取数据,24小时不间断完成网页自动化任务。
            2650 6