详解Kafka1-基础使用

简介: Kafka消息队列技术指南 本文系统介绍了Kafka消息队列的核心概念与应用实践。主要内容包括: 消息队列基础 两种模型:生产者-消费者模型和发布-订阅模型 应用场景:异步处理、系统解耦、流量削峰等 Kafka核心架构 重要组件:Broker、Zookeeper、Topic、Partition等 消费者组机制实现负载均衡 消息存储与分区策略 实践操作 集群搭建与环境配置 基准测试方法与性能指标 Java API编程示例(生产者/消费者) 事务编程实现原子操作 高级特性 生产者幂等性原理与配置 事务API与隔

 

1.消息队列基础概念

1.1 消息队列介绍

  • 消息队列——用于存放消息的组件
  • 可以将消息放入到队列中,也可以从消息队列中获取消息
  • 很多时候消息队列不是一个永久性的存储,是作为临时存储存在的(设定一个期限:设置消息在MQ中保存10天)
  • 消息队列中间件:消息队列的组件,例如:Kafka、Active MQ、RabbitMQ、RocketMQ、ZeroMQ

1.2 消息队列应用场景

  • 异步处理
  • 可以将一些比较耗时的操作放在其他系统中,通过消息队列将需要进行处理的消息进行存储,其他系统可以消费消息队列中的数据
  • 比较常见的:发送短信验证码、发送邮件

image.gif 编辑

  • 系统解耦
  • 原先一个微服务是通过接口(HTTP)调用另一个微服务,这时候耦合很严重,只要接口发生变化就会导致系统不可用
  • 使用消息队列可以将系统进行解耦合,现在第一个微服务可以将消息放入到消息队列中,另一个微服务可以从消息队列中把消息取出来进行处理。进行系统解耦 image.gif 编辑
  • 流量削峰
  • 因为消息队列是低延迟、高可靠、高吞吐的,可以应对大量并发

image.gif 编辑

  • 日志处理
  • 可以使用消息队列作为临时存储,或者一种通信管道

image.gif 编辑

1.3 消息队列两种模型

1.3.1 生产者,消费者模型

  • 生产者负责将消息生产到MQ中
  • 消费者负责从MQ中获取消息
  • 生产者和消费者是解耦的,可能是生产者一个程序、消费者是另外一个程序

image.gif 编辑

1.3.2 消息队列的模式

点对点:一个消费者消费一个消息

image.gif 编辑

消息发送者生产消息发送到消息队列中,然后消息接收者从消息队列中取出并且消费消息。消息被消费以后,消息队列中不再有存储,所以消息接收者不可能消费到已经被消费的消息。

点对点模式特点:

  1. 每个消息只有一个接收者(Consumer)(即一旦被消费,消息就不再在消息队列中)
  2. 发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息;
  3. 接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息;

发布订阅:多个消费者可以消费一个消息 image.gif 编辑

发布/订阅模式特点:

  1. 每个消息可以有多个订阅者;
  2. 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
  3. 为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行;

2.Kafka基础概念

2.1 简介

Kafka是由Apache软件基金会开发的一个开源流平台,由Scala和Java编写。Kafka的Apache官网是这样介绍Kakfa的:

Apache Kafka是一个分布式流平台。一个分布式的流平台应该包含3点关键的能力:

  1. 发布和订阅流数据流,类似于消息队列或者是企业消息传递系统
  2. 以容错的持久化方式存储数据流
  3. 处理数据流

我们重点关键三个部分的关键词:

  1. Publish and subscribe:发布与订阅
  2. Store:存储
  3. Process:处理

kafka的诞生,是为了解决linkedin的数据管道问题,起初linkedin采用了ActiveMQ来进行数据交换,大约是在2010年前后,那时的ActiveMQ还远远无法满足linkedin对数据传递系统的要求,经常由于各种缺陷而导致消息阻塞或者服务无法正常访问,为了能够解决这个问题,linkedin决定研发自己的消息传递系统,当时linkedin的首席架构师jay kreps便开始组织团队进行消息传递系统的研发。

2.1 Kafka应用场景

我们通常将Apache Kafka用在两类程序:

  1. 建立实时数据管道,以可靠地在系统或应用程序之间获取数据
  2. 构建实时流应用程序,以转换或响应数据流

image.gif 编辑

上图,我们可以看到:

  1. Producers:可以有很多的应用程序,将消息数据放入到Kafka集群中。
  2. Consumers:可以有很多的应用程序,将消息数据从Kafka集群中拉取出来。
  3. Connectors:Kafka的连接器可以将数据库中的数据导入到Kafka,也可以将Kafka的数据导出到数据库中。
  4. Stream Processors:流处理器可以Kafka中拉取数据,也可以将数据写入到Kafka中。

Kafka优势:

特性

ActiveMQ

RabbitMQ

Kafka

RocketMQ

所属社区/公司

Apache

Mozilla Public License

Apache

Apache/Ali

成熟度

成熟

成熟

成熟

比较成熟

生产者-消费者模式

支持

支持

支持

支持

发布-订阅

支持

支持

支持

支持

REQUEST-REPLY

支持

支持

-

支持

API完备性

低(静态配置)

多语言支持

支持JAVA优先

语言无关

支持,JAVA优先

支持

单机呑吐量

万级(最差)

万级

十万级

十万级(最高)

消息延迟

-

微秒级

毫秒级

-

可用性

高(主从)

高(主从)

非常高(分布式)

消息丢失

-

理论上不会丢失

-

消息重复

-

可控制

理论上会有重复

-

事务

支持

不支持

支持

支持

文档的完备性

提供快速入门

首次部署难度

-

在大数据技术领域,一些重要的组件、框架都支持Apache Kafka,不论成成熟度、社区、性能、可靠性,Kafka都是非常有竞争力的一款产品。

Apache Kafka这么多年的发展,目前也有一个较庞大的生态圈。

Kafka生态圈官网地址:https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem

3.Kafka环境搭建

3.1 搭建Kafka集群

1. 将Kafka的安装包上传到虚拟机,并解压

cd /export/software/
tar -xvzf kafka_2.12-2.4.1.tgz -C ../server/
cd /export/server/kafka_2.12-2.4.1/

image.gif

    2.修改 server.properties

    cd /export/server/kafka_2.12-2.4.1/config
    vim server.properties
    # 指定broker的id
    broker.id=0
    # 指定Kafka数据的位置
    log.dirs=/export/server/kafka_2.12-2.4.1/data
    # 配置zk的三个节点
    zookeeper.connect=node1.itcast.cn:2181,node2.itcast.cn:2181,node3.itcast.cn:2181

    image.gif

    3.将安装好的kafka复制到另外两台服务器

    cd /export/server
    scp -r kafka_2.12-2.4.1/ node2.itcast.cn:$PWD
    scp -r kafka_2.12-2.4.1/ node3.itcast.cn:$PWD
    修改另外两个节点的broker.id分别为1和2
    ---------node2.itcast.cn--------------
    cd /export/server/kafka_2.12-2.4.1/config
    vim erver.properties
    broker.id=1
    --------node3.itcast.cn--------------
    cd /export/server/kafka_2.12-2.4.1/config
    vim server.properties
    broker.id=2

    image.gif

    4.配置KAFKA_HOME环境变量

    vim /etc/profile
    export KAFKA_HOME=/export/server/kafka_2.12-2.4.1
    export PATH=:$PATH:${KAFKA_HOME}
    分发到各个节点
    scp /etc/profile node2.itcast.cn:$PWD
    scp /etc/profile node3.itcast.cn:$PWD
    每个节点加载环境变量
    source /etc/profile

    image.gif

    5.启动服务器

    # 启动ZooKeeper
    nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
    # 启动Kafka
    cd /export/server/kafka_2.12-2.4.1
    nohup bin/kafka-server-start.sh config/server.properties &
    # 测试Kafka集群是否启动成功
    bin/kafka-topics.sh --bootstrap-server node1.itcast.cn:9092 --list

    image.gif

      注意:

      • Kafka集群是必须要有ZooKeeper的
      • 每一个Kafka的节点都需要修改broker.id(每个节点的标识,不能重复)
      • log.dir数据存储目录需要配置

      3.2 目录结构分析

      目录名称

      说明

      bin

      Kafka的所有执行脚本都在这里。例如:启动Kafka服务器、

      创建Topic、生产者、消费者程序等等

      config

      Kafka的所有配置文件

      libs

      运行Kafka所需要的所有JAR包

      logs

      Kafka的所有日志文件,如果Kafka出现一些问题,需要到

      该目录中去查看异常信息

      site-docs

      Kafka的网站帮助文件

      3.3 Kafka一键启动脚本

      为了方便将来进行一键启动、关闭Kafka,我们可以编写一个shell脚本来操作。将来只要执行一次该脚本就可以快速启动/关闭Kafka。

      1. 在节点1中创建 /export/onekey 目录
      cd /export/onekey
      1. image.gif
      2. 准备slave配置文件,用于保存要启动哪几个节点上的kafka
      node1.itcast.cn
      node2.itcast.cn
      node3.itcast.cn
      1. image.gif
      2. 编写start-kafka.sh脚本
      vim start-kafka.sh
      cat /export/onekey/slave | while read line
      do
      {
       echo $line
       ssh $line "source /etc/profile;export JMX_PORT=9988;nohup ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties >/dev/nul* 2>&1 & "
      }&
      wait
      done
      1. image.gif
      2. 编写stop-kafka.sh脚本
      vim stop-kafka.sh
      cat /export/onekey/slave | while read line
      do
      {
       echo $line
       ssh $line "source /etc/profile;jps |grep Kafka |cut -d' ' -f1 |xargs kill -s 9"
      }&
      wait
      done
      1. image.gif
      2. 给start-kafka.sh、stop-kafka.sh配置执行权限
      chmod u+x start-kafka.sh
      chmod u+x stop-kafka.sh
      1. image.gif
      2. 执行一键启动、一键关闭
      ./start-kafka.sh
      ./stop-kafka.sh
      1. image.gif

      3.4 基础操作 image.gif 编辑

      创建topic:

      创建一个topic(主题)。Kafka中所有的消息都是保存在主题中,要生产消息到Kafka,首先必须要有一个确定的主题。

      # 创建名为test的主题
      bin/kafka-topics.sh --create --bootstrap-server node1.itcast.cn:9092 --topic test
      # 查看目前Kafka中的主题
      bin/kafka-topics.sh --list --bootstrap-server node1.itcast.cn:9092

      image.gif

      生产消息到Kafka:

      使用Kafka内置的测试程序,生产一些消息到Kafka的test主题中。

      bin/kafka-console-producer.sh --broker-list node1.itcast.cn:9092 --topic test

      image.gif

      从Kafka消费消息:

      使用下面的命令来消费 test 主题中的消息:

      bin/kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic test --from-beginning

      image.gif

      3.5 使用Kafka Tools操作Kafka

      连接Kafka集群:

      安装Kafka Tools后启动Kafka

      image.gif 编辑 image.gif 编辑 image.gif 编辑

      创建topic:

      image.gif 编辑 image.gif 编辑 image.gif 编辑

      3.6 Kafka基准测试

      基准测试(benchmark testing)是一种测量和评估软件性能指标的活动。我们可以通过基准测试,了解到软件、硬件的性能水平。主要测试负载的执行时间、传输速度、吞吐量、资源占用率等。

      基于1个分区1个副本的基准测试

      测试步骤:

      1. 启动Kafka集群
      2. 创建一个1个分区1个副本的topic: benchmark
      3. 同时运行生产者、消费者基准测试程序
      4. 观察结果

      创建topic:

      bin/kafka-topics.sh --zookeeper node1.itcast.cn:2181 --create --topic benchmark --partitions 1 --replication-factor 1

      image.gif

      生产消息基准测试:

      在生产环境中,推荐使用生产5000W消息,这样会性能数据会更准确些。

      bin/kafka-producer-perf-test.sh --topic benchmark --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1.itcast.cn:9092,node2.itcast.cn:9092,node3.itcast.cn:9092 acks=1

      image.gif

      bin/kafka-producer-perf-test.sh 
      --topic topic的名字
      --num-records 总共指定生产数据量(默认5000W)
      --throughput  指定吞吐量——限流(-1不指定)
      --record-size   record数据大小(字节)
      --producer-props bootstrap.servers=192.168.1.20:9092,192.168.1.21:9092,192.168.1.22:9092 acks=1 指定Kafka集群地址,ACK模式

      image.gif

      测试结果:

      吞吐量

      93092.533979 records/sec

      每秒9.3W条记录

      吞吐速率

      (88.78 MB/sec)

      每秒约89MB数据

      平均延迟时间

      346.62 ms avg latency

      最大延迟时间

      1003.00 ms max latency

      消费消息基准测试:

      bin/kafka-consumer-perf-test.sh --broker-list node1.itcast.cn:9092,node2.itcast.cn:9092,node3.itcast.cn:9092 --topic benchmark --fetch-size 1048576 --messages 5000000

      image.gif

      bin/kafka-consumer-perf-test.sh
      --broker-list 指定kafka集群地址
      --topic 指定topic的名称
      --fetch-size 每次拉取的数据大小
      --messages 总共要消费的消息个数

      image.gif

      data.consumed.in.MB

      共计消费的数据

      4768.3716MB

      MB.sec

      每秒消费的数量

      445.6006

      每秒445MB

      data.consumed.in.nMsg

      共计消费的数量

      5000000

      nMsg.sec

      每秒的数量

      467246.0518

      每秒46.7W条

      4.Java编程操作Kafka

      4.1 同步生产消息到Kafka中

      需求:

      我们将编写Java程序,将1-100的数字消息写入到Kafka中。

      导入依赖:

      <repositories><!-- 代码库 -->
          <repository>
              <id>central</id>
              <url>http://maven.aliyun.com/nexus/content/groups/public//</url>
              <releases>
                  <enabled>true</enabled>
              </releases>
              <snapshots>
                  <enabled>true</enabled>
                  <updatePolicy>always</updatePolicy>
                  <checksumPolicy>fail</checksumPolicy>
              </snapshots>
          </repository>
      </repositories>
      <dependencies>
          <!-- kafka客户端工具 -->
          <dependency>
              <groupId>org.apache.kafka</groupId>
              <artifactId>kafka-clients</artifactId>
              <version>2.4.1</version>
          </dependency>
          <!-- 工具类 -->
          <dependency>
              <groupId>org.apache.commons</groupId>
              <artifactId>commons-io</artifactId>
              <version>1.3.2</version>
          </dependency>
          <!-- SLF桥接LOG4J日志 -->
          <dependency>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-log4j12</artifactId>
              <version>1.7.6</version>
          </dependency>
          <!-- SLOG4J日志 -->
          <dependency>
              <groupId>log4j</groupId>
              <artifactId>log4j</artifactId>
              <version>1.2.16</version>
          </dependency>
      </dependencies>
      <build>
          <plugins>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-compiler-plugin</artifactId>
                  <version>3.7.0</version>
                  <configuration>
                      <source>1.8</source>
                      <target>1.8</target>
                  </configuration>
              </plugin>
          </plugins>
      </build>

      image.gif

      导入log4j.properties:

      将log4j.properties配置文件放入到resources文件夹中

      log4j.rootLogger=INFO,stdout
      log4j.appender.stdout=org.apache.log4j.ConsoleAppender 
      log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
      log4j.appender.stdout.layout.ConversionPattern=%5p - %m%n

      image.gif

      开发步骤:

      可以参考以下方式来编写第一个Kafka示例程序

      参考以下文档:http://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

      1. 创建用于连接Kafka的Properties配置
      Properties props = new Properties();
      props.put("bootstrap.servers", "192.168.88.100:9092");
      props.put("acks", "all");
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      1. image.gif
      2. 创建一个生产者对象KafkaProducer
      3. 调用send发送1-100消息到指定Topic test,并获取返回值Future,该对象封装了返回值
      4. 再调用一个Future.get()方法等待响应
      5. 关闭生产者

      参考代码:

      public class KafkaProducerTest {
          public static void main(String[] args) {
              // 1. 创建用于连接Kafka的Properties配置
              Properties props = new Properties();
              props.put("bootstrap.servers", "192.168.88.100:9092");
              props.put("acks", "all");
              props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              // 2. 创建一个生产者对象KafkaProducer
              KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
              // 3. 调用send发送1-100消息到指定Topic test
              for(int i = 0; i < 100; ++i) {
                  try {
                      // 获取返回值Future,该对象封装了返回值
                      Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("test", null, i + ""));
                      // 调用一个Future.get()方法等待响应
                      future.get();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  } catch (ExecutionException e) {
                      e.printStackTrace();
                  }
              }
              // 5. 关闭生产者
              producer.close();
          }
      }

      image.gif

      4.2 从Kafka的topic中消费消息

      需求:

      从 test topic中,将消息都消费,并将记录的offset、key、value打印出来

      开发步骤:

      1. 创建Kafka消费者配置
      Properties props = new Properties();
      props.setProperty("bootstrap.servers", "node1.itcast.cn:9092");
      props.setProperty("group.id", "test");
      props.setProperty("enable.auto.commit", "true");
      props.setProperty("auto.commit.interval.ms", "1000");
      props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

      image.gif

      1. 创建Kafka消费者
      2. 订阅要消费的主题
      3. 使用一个while循环,不断从Kafka的topic中拉取消息
      4. 将将记录(record)的offset、key、value都打印出来

      代码:

      • group.id:消费者组的概念,可以在一个消费组中包含多个消费者。如果若干个消费者的group.id是一样的,表示它们就在一个组中,一个组中的消费者是共同消费Kafka中topic的数据。
      • Kafka是一种拉消息模式的消息队列,在消费者中会有一个offset,表示从哪条消息开始拉取数据
      • kafkaConsumer.poll:Kafka的消费者API是一批一批数据的拉取
      /**
       * 消费者程序
       *
       * 1.创建Kafka消费者配置
       * Properties props = new Properties();
       * props.setProperty("bootstrap.servers", "node1.itcast.cn:9092");
       * props.setProperty("group.id", "test");
       * props.setProperty("enable.auto.commit", "true");
       * props.setProperty("auto.commit.interval.ms", "1000");
       * props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
       * props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
       *
       * 2.创建Kafka消费者
       * 3.订阅要消费的主题
       * 4.使用一个while循环,不断从Kafka的topic中拉取消息
       * 5.将将记录(record)的offset、key、value都打印出来
       */
      public class KafkaConsumerTest {
          public static void main(String[] args) {
              // 1.创建Kafka消费者配置
              Properties props = new Properties();
              props.setProperty("bootstrap.servers", "node1.itcast.cn:9092");
              // 消费者组(可以使用消费者组将若干个消费者组织到一起),共同消费Kafka中topic的数据
              // 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的
              props.setProperty("group.id", "test");
              // 自动提交offset
              props.setProperty("enable.auto.commit", "true");
              // 自动提交offset的时间间隔
              props.setProperty("auto.commit.interval.ms", "1000");
              // 拉取的key、value数据的
              props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              // 2.创建Kafka消费者
              KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
              // 3. 订阅要消费的主题
              // 指定消费者从哪个topic中拉取数据
              kafkaConsumer.subscribe(Arrays.asList("test"));
              // 4.使用一个while循环,不断从Kafka的topic中拉取消息
              while(true) {
                  // Kafka的消费者一次拉取一批的数据
                  ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
                  // 5.将将记录(record)的offset、key、value都打印出来
                  for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                      // 主题
                      String topic = consumerRecord.topic();
                      // offset:这条消息处于Kafka分区中的哪个位置
                      long offset = consumerRecord.offset();
                      // key\value
                      String key = consumerRecord.key();
                      String value = consumerRecord.value();
                      System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);
                  }
              }
          }
      }

      image.gif

      4.3 生产者使用异步方式生产消息

      如果我们想获取生产者消息是否成功,或者成功生产消息到Kafka中后,执行一些其他动作。此时,可以很方便地使用带有回调函数来发送消息。

      需求:

      1. 在发送消息出现异常时,能够及时打印出异常信息
      2. 在发送消息成功时,打印Kafka的topic名字、分区id、offset

      代码实现:

      • 使用匿名内部类实现Callback接口,该接口中表示Kafka服务器响应给客户端,会自动调用onCompletion方法
      • metadata:消息的元数据(属于哪个topic、属于哪个partition、对应的offset是什么)
      • exception:这个对象Kafka生产消息封装了出现的异常,如果为null,表示发送成功,如果不为null,表示出现异常。
      public class KafkaProducerTest {
          public static void main(String[] args) {
              // 1. 创建用于连接Kafka的Properties配置
              Properties props = new Properties();
              props.put("bootstrap.servers", "node1.itcast.cn:9092");
              props.put("acks", "all");
              props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              // 2. 创建一个生产者对象KafkaProducer
              KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
              // 3. 调用send发送1-100消息到指定Topic test
              for(int i = 0; i < 100; ++i) {
                  // 一、同步方式
                  // 获取返回值Future,该对象封装了返回值
                  // Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("test", null, i + ""));
                  // 调用一个Future.get()方法等待响应
                  // future.get();
                  // 二、带回调函数异步方式
                  producer.send(new ProducerRecord<String, String>("test", null, i + ""), new Callback() {
                      @Override
                      public void onCompletion(RecordMetadata metadata, Exception exception) {
                          if(exception != null) {
                              System.out.println("发送消息出现异常");
                          }
                          else {
                              String topic = metadata.topic();
                              int partition = metadata.partition();
                              long offset = metadata.offset();
                              System.out.println("发送消息到Kafka中的名字为" + topic + "的主题,第" + partition + "分区,第" + offset + "条数据成功!");
                          }
                      }
                  });
              }
              // 5. 关闭生产者
              producer.close();
          }
      }

      image.gif

      5.Kafka架构

      5.1 Kafka重要组件

      5.1.1 broker

      image.gif 编辑

      1. 一个Kafka的集群通常由多个broker组成,这样才能实现负载均衡、以及容错
      2. broker是无状态(Sateless)的,它们是通过ZooKeeper来维护集群状态
      3. 一个Kafka的broker每秒可以处理数十万次读写,每个broker都可以处理TB消息而不影响性能

      5.1.2 Zookeeper

      image.gif 编辑

      1. ZK用来管理和协调broker,并且存储了Kafka的元数据(例如:有多少topic、partition、consumer)
      2. ZK服务主要用于通知生产者和消费者Kafka集群中有新的broker加入、或者Kafka集群中出现故障的broker。

      PS:Kafka正在逐步想办法将ZooKeeper剥离,维护两套集群成本较高,社区提出KIP-500就是要替换掉ZooKeeper的依赖。“Kafka on Kafka”——Kafka自己来管理自己的元数据

      5.1.3 producer

      生产者负责将数据推送给broker的topic

      5.1.4 consumer

      消费者负责从broker的topic中拉取数据,并自己进行处理

      5.1.5 consumer group image.gif 编辑

      1. consumer group是kafka提供的可扩展且具有容错性的消费者机制
      2. 一个消费者组可以包含多个消费者
      3. 一个消费者组有一个唯一的ID(group Id)
      4. 组内的消费者一起消费主题的所有分区数据

      5.1.6 分区(Partitions)

      image.gif 编辑

      在Kafka集群中,主题被分为多个分区。

      5.1.7 副本(Replicas)

      image.gif 编辑

      1. 副本可以确保某个服务器出现故障时,确保数据依然可用
      2. 在Kafka中,一般都会设计副本的个数>1

      5.1.8 主题(Topic)

      image.gif 编辑

      1. 主题是一个逻辑概念,用于生产者发布数据,消费者拉取数据
      2. Kafka中的主题必须要有标识符,而且是唯一的,Kafka中可以有任意数量的主题,没有数量上的限制
      3. 在主题中的消息是有结构的,一般一个主题包含某一类消息
      4. 一旦生产者发送消息到主题中,这些消息就不能被更新(更改)

      5.1.9 偏移量(offset)

      image.gif 编辑

      1. offset记录着下一条将要发送给Consumer的消息的序号
      2. 默认Kafka将offset存储在ZooKeeper中
      3. 在一个分区中,消息是有顺序的方式存储着,每个在分区的消费都是有一个递增的id。这个就是偏移量offset
      4. 偏移量在分区中才是有意义的。在分区之间,offset是没有任何意义的

      5.2 消费者组

      Kafka支持有多个消费者同时消费一个主题中的数据。我们接下来,给大家演示,启动两个消费者共同来消费 test 主题的数据。

      1. 首先,修改生产者程序,让生产者每3秒生产1-100个数字。
      // 3. 发送1-100数字到Kafka的test主题中
      while(true) {
          for (int i = 1; i <= 100; ++i) {
              // 注意:send方法是一个异步方法,它会将要发送的数据放入到一个buffer中,然后立即返回
              // 这样可以让消息发送变得更高效
              producer.send(new ProducerRecord<>("test", i + ""));
          }
          Thread.sleep(3000);
      }

      image.gif

        2. 接下来,同时运行两个消费者。

        image.gif 编辑

        3.同时运行两个消费者,我们发现,只有一个消费者程序能够拉取到消息。想要让两个消费者同时消费消息,必须要给test主题,添加一个分区。

        # 设置 test topic为2个分区
        bin/kafka-topics.sh --zookeeper 192.168.88.100:2181 -alter --partitions 2 --topic test

        image.gif

          重新运行生产者、两个消费者程序,我们就可以看到两个消费者都可以消费Kafka Topic的数据了

          6.Kafka生产者幂等性

          6.1 概念

          拿http举例来说,一次或多次请求,得到地响应是一致的(网络超时等问题除外),换句话说,就是执行多次操作与执行一次操作的影响是一样的。

          image.gif 编辑

          如果,某个系统是不具备幂等性的,如果用户重复提交了某个表格,就可能会造成不良影响。例如:用户在浏览器上点击了多次提交订单按钮,会在后台生成多个一模一样的订单。 image.gif 编辑

          在生产者生产消息时,如果出现retry时,有可能会一条消息被发送了多次,如果Kafka不具备幂等性的,就有可能会在partition中保存多条一模一样的消息。

          配置幂等性:

          props.put("enable.idempotence",true);

          image.gif

          6.2 幂等性原理

          为了实现生产者的幂等性,Kafka引入了 Producer ID(PID)和 Sequence Number的概念。

          1. PID:每个Producer在初始化时,都会分配一个唯一的PID,这个PID对用户来说,是透明的。
          2. Sequence Number:针对每个生产者(对应PID)发送到指定主题分区的消息都对应一个从0开始递增的Sequence Number。

          image.gif 编辑

          7.Kafka事务

          7.1 简介

          Kafka事务是2017年Kafka 0.11.0.0引入的新特性。类似于数据库的事务。Kafka事务指的是生产者生产消息以及消费者提交offset的操作可以在一个原子操作中,要么都成功,要么都失败。尤其是在生产者、消费者并存时,事务的保障尤其重要。(consumer-transform-producer模式)

          image.gif 编辑


          7.2 事务操作API

          Producer接口中定义了以下5个事务相关方法:

          1. initTransactions(初始化事务):要使用Kafka事务,必须先进行初始化操作
          2. beginTransaction(开始事务):启动一个Kafka事务
          3. sendOffsetsToTransaction(提交偏移量):批量地将分区对应的offset发送到事务中,方便后续一块提交
          4. commitTransaction(提交事务):提交事务
          5. abortTransaction(放弃事务):取消事务

          7.3 事务相关属性配置

          7.3.1 生产者:

          // 配置事务的id,开启了事务会默认开启幂等性
          props.put("transactional.id", "first-transactional");

          image.gif

          7.3.2 消费者:

          // 1. 消费者需要设置隔离级别
          props.put("isolation.level","read_committed");
          //  2. 关闭自动提交
           props.put("enable.auto.commit", "false");

          image.gif

          7.4 事务编程

          需求:

          在Kafka的topic 「ods_user」中有一些用户数据,数据格式如下:

          姓名,性别,出生日期
          张三,1,1980-10-09
          李四,0,1985-11-01

          image.gif

          我们需要编写程序,将用户的性别转换为男、女(1-男,0-女),转换后将数据写入到topic 「dwd_user」中。要求使用事务保障,要么消费了数据同时写入数据到 topic,提交offset。要么全部失败。

          启动生产者控制台程序模拟数据:

          # 创建名为ods_user和dwd_user的主题
          bin/kafka-topics.sh --create --bootstrap-server node1.itcast.cn:9092 --topic ods_user
          bin/kafka-topics.sh --create --bootstrap-server node1.itcast.cn:9092 --topic dwd_user
          # 生产数据到 ods_user
          bin/kafka-console-producer.sh --broker-list node1.itcast.cn:9092 --topic ods_user
          # 从dwd_user消费数据
          bin/kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic dwd_user --from-beginning  --isolation-level read_committed

          image.gif

          编写创建消费者代码:

          编写一个方法 createConsumer,该方法中返回一个消费者,订阅「ods_user」主题。注意:需要配置事务隔离级别、关闭自动提交。

          实现步骤:

          1. 创建Kafka消费者配置
          Properties props = new Properties();
           props.setProperty("bootstrap.servers", "node1.itcast.cn:9092");
           props.setProperty("group.id", "ods_user");
           props.put("isolation.level","read_committed");
           props.setProperty("enable.auto.commit", "false");
           props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          1. image.gif
          2. 创建消费者,并订阅 ods_user 主题

          编写创建生产者代码:

          编写一个方法 createProducer,返回一个生产者对象。注意:需要配置事务的id,开启了事务会默认开启幂等性。

          1. 创建生产者配置
          Properties props = new Properties();
          props.put("bootstrap.servers", "node1.itcast.cn:9092");
          props.put("transactional.id", "dwd_user");
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          1. image.gif

          编写代码消费并生产数据:

          实现步骤:

          1. 调用之前实现的方法,创建消费者、生产者对象
          2. 生产者调用initTransactions初始化事务
          3. 编写一个while死循环,在while循环中不断拉取数据,进行处理后,再写入到指定的topic
          1. 生产者开启事务
          2. 消费者拉取消息
          3. 遍历拉取到的消息,并进行预处理(将1转换为男,0转换为女)
          4. 生产消息到dwd_user topic中
          5. 提交偏移量到事务中
          6. 提交事务
          7. 捕获异常,如果出现异常,则取消事务
          public static void main(String[] args) {
                  Consumer<String, String> consumer = createConsumer();
                  Producer<String, String> producer = createProducer();
                  // 初始化事务
                  producer.initTransactions();
                  while(true) {
                      try {
                          // 1. 开启事务
                          producer.beginTransaction();
                          // 2. 定义Map结构,用于保存分区对应的offset
                          Map<TopicPartition, OffsetAndMetadata> offsetCommits = new HashMap<>();
                          // 2. 拉取消息
                          ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
                          for (ConsumerRecord<String, String> record : records) {
                              // 3. 保存偏移量
                              offsetCommits.put(new TopicPartition(record.topic(), record.partition()),
                                      new OffsetAndMetadata(record.offset() + 1));
                              // 4. 进行转换处理
                              String[] fields = record.value().split(",");
                              fields[1] = fields[1].equalsIgnoreCase("1") ? "男":"女";
                              String message = fields[0] + "," + fields[1] + "," + fields[2];
                              // 5. 生产消息到dwd_user
                              producer.send(new ProducerRecord<>("dwd_user", message));
                          }
                          // 6. 提交偏移量到事务
                          producer.sendOffsetsToTransaction(offsetCommits, "ods_user");
                          // 7. 提交事务
                          producer.commitTransaction();
                      } catch (Exception e) {
                          // 8. 放弃事务
                          producer.abortTransaction();
                      }
                  }
              }

          image.gif

          往之前启动的console-producer中写入消息进行测试,同时检查console-consumer是否能够接收到消息: image.gif 编辑

          逐个测试一下消息:

          张三,1,1980-10-09
          李四,0,1985-11-01

          image.gif

          模拟异常事务:

          // 3. 保存偏移量
          offsetCommits.put(new TopicPartition(record.topic(), record.partition()),
                  new OffsetAndMetadata(record.offset() + 1));
          // 4. 进行转换处理
          String[] fields = record.value().split(",");
          fields[1] = fields[1].equalsIgnoreCase("1") ? "男":"女";
          String message = fields[0] + "," + fields[1] + "," + fields[2];
          // 模拟异常
          int i = 1/0;
          // 5. 生产消息到dwd_user
          producer.send(new ProducerRecord<>("dwd_user", message));

          image.gif

          启动程序一次,抛出异常。

          再启动程序一次,还是抛出异常。

          直到我们处理该异常为止。

          我们发现,可以消费到消息,但如果中间出现异常的话,offset是不会被提交的,除非消费、生产消息都成功,才会提交事务。

            相关文章
            |
            21天前
            |
            安全 NoSQL Java
            基于JWT+SpringSecurity整合一个单点认证授权机制
            本文介绍了基于JWT和SpringSecurity的授权认证机制架构设计。系统采用RBAC权限模型,通过5张表描述用户-角色-权限关系。认证流程包含登录验证、IP检查、密码匹配等环节,使用JWT生成token并保存用户信息到Redis。授权部分利用@PreAuthorize注解和PermissionService实现权限校验,支持单权限、多权限及角色验证。整体架构通过过滤器链实现无状态认证,兼顾安全性和灵活性,为开发者提供了完整的认证授权解决方案。
            |
            21天前
            |
            消息中间件 JavaScript 前端开发
            详解事件循环与浏览器渲染机制
            摘要:浏览器采用多进程架构,渲染主线程通过事件循环机制处理HTML解析、样式计算、布局等任务。异步机制避免主线程阻塞,任务按优先级在微队列、交互队列等不同队列中调度。JS执行会阻碍渲染,因其与渲染任务共享主线程。渲染流程包含解析、样式计算、布局、分层等阶段,最终由合成线程和GPU完成绘制。transform效率高因其仅影响合成阶段,不涉及主线程。reflow是布局重计算,repaint是绘制指令更新,两者均影响性能。
            |
            21天前
            |
            Linux API 云计算
            零基础保姆级|阿里云计算巢+MacOS/Linux/Windows11部署OpenClaw 技能集成+大模型配置全流程
            2026年,AI自动化框架OpenClaw(原Clawdbot)凭借云端+本地双部署、多模型兼容与Skills插件化扩展能力,成为个人与团队实现复杂任务自动化的核心工具。阿里云计算巢提供OpenClaw官方一键部署方案,无需手动配置环境,5分钟即可完成云端部署;本地则支持MacOS、Linux、Windows11全系统部署,搭配阿里云千问、免费Coding Plan大模型API,再通过Skills扩展能力,可实现从信息查询、文件处理到流程自动化的全场景能力。
            968 15
            |
            21天前
            |
            人工智能 Linux API
            OpenClaw 阿里云+本地多系统部署全攻略,大模型配置+planning-with-files技能实战解析
            在基于OpenClaw(Clawdbot,开发者昵称“龙虾”)进行AI协作的过程中,开发者常面临这样的问题:面对复杂任务时,与AI的多轮对话极易丢失上下文,最终输出结果偏离预期,甚至出现逻辑混乱的情况。2026年,planning-with-files技能的出现彻底解决了这一痛点,该技能通过文件化的规划方式,将复杂任务的执行过程沉淀为标准化文档,让AI协作具备可追溯、可恢复、可管控的特性。本文将完整梳理2026年OpenClaw在阿里云及本地MacOS、Linux、Windows11系统的部署流程,详解阿里云千问大模型与免费Coding Plan API的配置方法,深度解析planning-w
            649 3
            |
            21天前
            |
            人工智能 自然语言处理 Java
            大模型应用开发5-SpringAIalibaba实战
            本文介绍了SpringAIAlibaba开源项目,该项目基于SpringAI构建,为阿里云通义系列模型提供Java开发实践。主要内容包括: 基础使用:配置模型API、依赖引入、调用示例,支持同步和流式调用; 多种集成方式:对接本地Ollama模型、ChatClient高级API、SSE流式输出; 核心功能实现:提示词模板、结构化输出、持久化内存、文本生成图片/语音; 高级能力:向量数据库、RAG增强检索、工具调用(Tool Calling); MCP协议:标准化工具调用方案,实现服务端工具共享;
            |
            21天前
            |
            存储 监控 前端开发
            大文件上传下载处理方案-断点续传,秒传,分片,合并
            本文介绍了大文件上传下载的断点续传技术方案。上传方面,通过前端将大文件分块(如5MB/块),后端使用MinIO存储分块并合并,实现断点续传和秒传功能。下载方面,采用Range请求分片下载,前端合并分片触发下载。技术要点包括:1)前端分块计算MD5;2)后端MinIO存储管理;3)分片校验与合并;4)进度监控和异常处理。该方案解决了大文件传输中断问题,提升用户体验,适用于视频等大文件传输场景,完整代码示例包含前后端实现。
            |
            21天前
            |
            机器学习/深度学习 存储 人工智能
            大模型应用开发1-认识大模型
            摘要: 本文系统介绍了大模型的基础概念、本地部署及API调用方法。首先阐述了AI及神经网络的基本原理,重点解析了Transformer架构及其在大语言模型(LLM)中的应用。其次详细对比了三种模型使用方案(开放API/云部署/本地部署)的优缺点,并以Ollama为例演示了本地部署流程,包括模型管理、交互指令和GPU加速配置。最后说明了大模型API调用规范,列举了主流大模型产品及其应用场景,强调大模型在自然语言处理、内容生成等领域的优势,以及与传统编程结合开发智能应用的可能性。全文涵盖技术原理到实践操作,为大
            |
            21天前
            |
            消息中间件 网络协议 Java
            深入剖析Java通信架构下的三种IO模式1
            本文介绍了Java网络编程中的三种I/O模型(BIO、NIO、AIO)及其实际应用。BIO采用同步阻塞模式,每个连接对应一个线程,适用于连接数较少场景;NIO通过多路复用实现非阻塞,适合高并发短连接;AIO基于操作系统异步I/O,适用于长连接重操作。文章详细讲解了BIO模式的实现原理,包括基本通信、多客户端处理、伪异步优化等,并通过一个即时通讯项目案例(支持登录、群聊、私聊等功能)展示了BIO的实际应用。随着JDK版本迭代,NIO和AIO提供了更高性能的网络通信方案。
            |
            21天前
            |
            存储 人工智能 NoSQL
            大模型应用开发3-LangChain4j实战
            本文介绍了LangChain4j框架的使用方法,主要包括以下内容:1. 基础配置:创建SpringBoot项目并配置OpenAI聊天模型;2. AIServices工具类:简化模型调用,支持流式和阻塞式两种调用方式;3. 会话记忆功能:实现多轮对话记忆,支持会话隔离和Redis持久化存储;4. RAG检索增强:通过向量数据库存储和检索专业领域知识,提升大模型回答质量;5. Tools工具:通过Function Calling机制实现业务功能调用。文章详细讲解了每个功能的实现步骤,包括代码示例和配置方法,帮助
            |
            21天前
            |
            Java 调度 Spring
            基于自定义线程池手写一个异步任务管理器
            我们在后端执行某些耗时逻辑操作时往往会导致长时间的线程阻塞,在这种情况之下,我们往往会引一条异步线程去处理这些异步任务,如果每次都创建新的线程来处理这些任务,不仅会增加代码冗余,还可能造成线程管理混乱,影响系统性能。在我们的Spring框架中是自带异步任务处理机制的,比如我们使用@Async 注解可以处理一些简单的异步任务,但这样确实无法精确去控制线程池资源,也无法灵活去管理任务调度,由此,我们可以去自行设计一个高效的自定义异步任务管理器去统一调度处理我们的自定义任务。
            下一篇
            开通oss服务