Apache Kafka-通过设置Consumer Group实现广播模式

简介: Apache Kafka-通过设置Consumer Group实现广播模式

20191116123525638.png

概述


传统的消息传递模式有2种:队列( queue) 和(publish-subscribe)

  • queue模式:多个consumer从服务器中读取数据,消息只会到达一个consumer
  • publish-subscribe模式:消息会被广播给所有的consumer

Kafka基于这2种模式提供了一种consumer的抽象概念: consumer group

  • queue模式:所有的consumer都位于同一个consumer group 下。
  • publish-subscribe模式:所有的consumer都有着自己唯一的consumer group


20210218124811789.png



说明: 由2个broker组成的kafka集群,某个主题总共有4个partition(P0-P3),分别位于不同的broker上。这个集群由2个Consumer Group消费, A有2个consumer instances ,B有4个。


通常一个topic会有几个consumer group,每个consumer group都是一个逻辑上的订阅者( logicalsubscriber )。每个consumer group由多个consumer instance组成,从而达到可扩展和容灾的功能。


广播模式的应用 ----> 应用里缓存了数据字典等配置表在内存中,可以通过 Kafka 广播消费,实现每个应用节点都消费消息,刷新本地内存的缓存。


Code


20210218135947378.png


POM依赖

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- 引入 Spring-Kafka 依赖 -->
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>


配置文件

 spring:
  # Kafka 配置项,对应 KafkaProperties 配置类
  kafka:
    bootstrap-servers: 192.168.126.140:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔
    # Kafka Producer 配置项
    producer:
      acks: 1 # 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。
      retries: 3 # 发送失败时,重试发送的次数
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # 消息的 key 的序列化
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 消息的 value 的序列化
    # Kafka Consumer 配置项
    consumer:
      auto-offset-reset: latest # 在广播订阅下,一般情况下,无需消费历史的消息,而是从订阅的 Topic 的队列的尾部开始消费即可
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: com.artisan.springkafka.domain
    # Kafka Consumer Listener 监听器配置
    listener:
      missing-topics-fatal: false # 消费监听接口监听的主题不存在时,默认会报错。所以通过设置为 false ,解决报错
logging:
  level:
    org:
      springframework:
        kafka: ERROR # spring-kafka
      apache:
        kafka: ERROR # kafka


auto-offset-reset: latest 广播模式,一般情况下,无需消费历史的消息,从订阅的 Topic 的队列的尾部开始消费即可


生产者

 package com.artisan.springkafka.producer;
import com.artisan.springkafka.constants.TOPIC;
import com.artisan.springkafka.domain.MessageMock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.Random;
import java.util.concurrent.ExecutionException;
/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/2/17 22:25
 * @mark: show me the code , change the world
 */
@Component
public class ArtisanProducerMock {
    @Autowired
    private KafkaTemplate<Object,Object> kafkaTemplate ;
    /**
     * 异步发送
     * @return
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public ListenableFuture<SendResult<Object, Object>> sendMsgASync() throws ExecutionException, InterruptedException {
        // 模拟发送的消息
        Integer id = new Random().nextInt(100);
        MessageMock messageMock = new MessageMock(id,"messageSendByAsync-" + id);
        // 异步发送消息
        ListenableFuture<SendResult<Object, Object>> result = kafkaTemplate.send(TOPIC.TOPIC, messageMock);
        return result ;
    }
}



消费者

 package com.artisan.springkafka.consumer;
import com.artisan.springkafka.domain.MessageMock;
import com.artisan.springkafka.constants.TOPIC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/2/17 22:33
 * @mark: show me the code , change the world
 */
@Component
public class ArtisanCosumerMockDiffConsumeGroup {
    private Logger logger = LoggerFactory.getLogger(getClass());
    private static final String CONSUMER_GROUP_PREFIX = "MOCK-B" ;
    @KafkaListener(topics = TOPIC.TOPIC ,groupId = CONSUMER_GROUP_PREFIX + TOPIC.TOPIC +  "-" + "#{T(java.util.UUID).randomUUID()})")
    public void onMessage(MessageMock messageMock){
        logger.info("【接受到消息][线程:{} 消息内容:{}]", Thread.currentThread().getName(), messageMock);
    }
}


注意: groupId 通过 Spring EL 表达式,在每个消费者分组的名字上配合 UUID 生成其后缀。这样,就能保证每个项目启动的消费者分组不同,从而达到广播消费的目的。


单元测试

 package com.artisan.springkafka.produceTest;
import com.artisan.springkafka.SpringkafkaApplication;
import com.artisan.springkafka.producer.ArtisanProducerMock;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.support.SendResult;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
 * @author 小工匠
 * * @version 1.0
 * @description: TODO
 * @date 2021/2/17 22:40
 * @mark: show me the code , change the world
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringkafkaApplication.class)
public class ProduceMockTest {
    private Logger logger = LoggerFactory.getLogger(getClass());
    @Autowired
    private ArtisanProducerMock artisanProducerMock;
    @Test
    public void testAsynSend() throws ExecutionException, InterruptedException {
        logger.info("开始发送 测试广播模式");
        artisanProducerMock.sendMsgASync().addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.info(" 发送异常{}]]", throwable);
            }
            @Override
            public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {
                logger.info("回调结果 Result =  topic:[{}] , partition:[{}], offset:[{}]",
                        objectObjectSendResult.getRecordMetadata().topic(),
                        objectObjectSendResult.getRecordMetadata().partition(),
                        objectObjectSendResult.getRecordMetadata().offset());
            }
        });
        // 阻塞等待,保证消费
        new CountDownLatch(1).await();
    }
}


启动多次单元测试, 观察消息的接受情况


测速结果


20210218140216389.png


可以看到不消费组下的 消费者(目前是一个消费组下一个消费者) 均收到了 这条消息,这就是广播模式


源码地址


https://github.com/yangshangwei/boot2/tree/master/springkafkaBroadCast

相关文章
|
3月前
|
消息中间件 安全 Kafka
Apache Kafka安全加固指南:保护你的消息传递系统
【10月更文挑战第24天】在现代企业环境中,数据的安全性和隐私保护至关重要。Apache Kafka作为一款广泛使用的分布式流处理平台,其安全性直接影响着业务的稳定性和用户数据的安全。作为一名资深的Kafka使用者,我深知加强Kafka安全性的重要性。本文将从个人角度出发,分享我在实践中积累的经验,帮助读者了解如何有效地保护Kafka消息传递系统的安全性。
178 7
|
3月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
137 5
|
3月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
109 5
|
3月前
|
消息中间件 Kafka
使用kafka consumer加载数据加载异常并且报source table and destination table are not same错误解决办法
使用kafka consumer加载数据加载异常并且报source table and destination table are not same错误解决办法
|
3月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
85 1
|
3月前
|
消息中间件 Ubuntu Java
Ubuntu系统上安装Apache Kafka
Ubuntu系统上安装Apache Kafka
|
3月前
|
消息中间件 监控 Kafka
Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面
随着大数据技术的发展,Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面,方便管理和监控 Kafka 集群。本文详细介绍了 Kafka Manager 的部署步骤和基本使用方法,包括配置文件的修改、启动命令、API 示例代码等,帮助你快速上手并有效管理 Kafka 集群。
73 0
|
20天前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
4月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
184 1
|
4月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
83 1

热门文章

最新文章

推荐镜像

更多