SSM(十七) MQ应用(中)

简介: 写这篇文章的起因是由于之前的一篇关于Kafka异常消费,当时为了解决问题不得不使用临时的方案。 总结起来归根结底还是对Kafka不熟悉导致的,加上平时工作的需要,之后就花些时间看了Kafka相关的资料。

以及生产者的配置文件:


#集群地址,可以多个
bootstrap.servers=10.19.13.51:9094
acks=all
retries=0
batch.size=16384
auto.commit.interval.ms=1000
linger.ms=0
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
block.on.buffer.full=true


具体的配置说明详见此处:kafka.apache.org/0100/docume…


流程非常简单,其实就是一些API的调用。


消息发完之后可以通过以下命令查看队列内的情况:


sh kafka-consumer-groups.sh --bootstrap-server localhost:9094 --describe --group group1


其中的lag便是队列里的消息数量。


Kafka消费者


有了生产者自然也少不了消费者,这里首先针对单线程消费:


/**
 * Function:kafka官方消费
 *
 * @author crossoverJie
 *         Date: 2017/10/19 01:11
 * @since JDK 1.8
 */
public class KafkaOfficialConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaOfficialConsumer.class);
    /**
     * 日志文件地址
     */
    private static String logPath;
    /**
     * 主题名称
     */
    private static String topic;
    /**
     * 消费配置文件
     */
    private static String consumerProPath ;
    /**
     * 初始化参数校验
     * @return
     */
    private static boolean initCheck() {
        topic = System.getProperty("topic") ;
        logPath = System.getProperty("log_path") ;
        consumerProPath = System.getProperty("consumer_pro_path") ;
        if (StringUtil.isEmpty(topic) || logPath.isEmpty()) {
            LOGGER.error("system property topic ,consumer_pro_path, log_path is required !");
            return true;
        }
        return false;
    }
    /**
     * 初始化kafka配置
     * @return
     */
    private static KafkaConsumer<String, String> initKafkaConsumer() {
        KafkaConsumer<String, String> consumer = null;
        try {
            FileInputStream inputStream = new FileInputStream(new File(consumerProPath)) ;
            Properties properties = new Properties();
            properties.load(inputStream);
            consumer = new KafkaConsumer<String, String>(properties);
            consumer.subscribe(Arrays.asList(topic));
        } catch (IOException e) {
            LOGGER.error("加载consumer.props文件出错", e);
        }
        return consumer;
    }
    public static void main(String[] args) {
        if (initCheck()){
            return;
        }
        int totalCount = 0 ;
        long totalMin = 0L ;
        int count = 0;
        KafkaConsumer<String, String> consumer = initKafkaConsumer();
        long startTime = System.currentTimeMillis() ;
        //消费消息
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(200);
            if (records.count() <= 0){
                continue ;
            }
            LOGGER.debug("本次获取:"+records.count());
            count += records.count() ;
            long endTime = System.currentTimeMillis() ;
            LOGGER.debug("count=" +count) ;
            if (count >= 10000 ){
                totalCount += count ;
                LOGGER.info("this consumer {} record,use {} milliseconds",count,endTime-startTime);
                totalMin += (endTime-startTime) ;
                startTime = System.currentTimeMillis() ;
                count = 0 ;
            }
            LOGGER.debug("end totalCount={},min={}",totalCount,totalMin);
            /*for (ConsumerRecord<String, String> record : records) {
                record.value() ;
                JsonNode msg = null;
                try {
                    msg = mapper.readTree(record.value());
                } catch (IOException e) {
                    LOGGER.error("消费消息出错", e);
                }
                LOGGER.info("kafka receive = "+msg.toString());
            }*/
        }
    }
}


配合以下启动参数:


-Dlog_path=/log/consumer.log -Dtopic=test -Dconsumer_pro_path=consumer.properties


其中采用了轮询的方式获取消息,并且记录了消费过程中的数据。


消费者采用的配置:


bootstrap.servers=192.168.1.2:9094
group.id=group1
# 自动提交
enable.auto.commit=true
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# fast session timeout makes it more fun to play with failover
session.timeout.ms=10000
# These buffer sizes seem to be needed to avoid consumer switching to
# a mode where it processes one bufferful every 5 seconds with multiple
# timeouts along the way.  No idea why this happens.
fetch.min.bytes=50000
receive.buffer.bytes=262144
max.partition.fetch.bytes=2097152


为了简便我采用的是自动提交offset


消息存放机制


谈到offset就必须得谈谈Kafka的消息存放机制.


Kafka的消息不会因为消费了就会立即删除,所有的消息都会持久化到日志文件,并配置有过期时间,到了时间会自动删除过期数据,并且不会管其中的数据是否被消费过。

由于这样的机制就必须的有一个标志来表明哪些数据已经被消费过了,offset(偏移量)就是这样的作用,它类似于指针指向某个数据,当消费之后offset就会线性的向前移动,这样一来的话消息是可以被任意消费的,只要我们修改offset的值即可。


消费过程中还有一个值得注意的是:


同一个consumer group(group.id相等)下只能有一个消费者可以消费,这个刚开始确实会让很多人踩坑。



相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件 Java RocketMQ
消息队列 MQ产品使用合集之当SpringBoot应用因网络不通而启动失败时,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 开发工具
【Azure Event Hub】原生应用中使用RabbitMQ,是否可以不改动代码的情况下直接转换为使用Event Hub呢?
【Azure Event Hub】原生应用中使用RabbitMQ,是否可以不改动代码的情况下直接转换为使用Event Hub呢?
|
4月前
|
数据采集 监控 物联网
MQTT协议在智能制造中的应用案例与效益分析
【6月更文挑战第8天】MQTT协议在智能制造中的应用案例与效益分析
121 1
|
4月前
|
消息中间件 Arthas 监控
消息队列 MQ产品使用合集之每次重置reconsumeTimes就无法达到死信阈值,重试次数是否就要应用方控制
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ产品使用合集之每次重置reconsumeTimes就无法达到死信阈值,重试次数是否就要应用方控制
|
4月前
|
JavaScript Java 测试技术
基于ssm+vue.js+uniapp小程序的代驾应用系统附带文章和源代码部署视频讲解等
基于ssm+vue.js+uniapp小程序的代驾应用系统附带文章和源代码部署视频讲解等
194 21
|
4月前
|
消息中间件 监控 数据安全/隐私保护
RabbitMQ 技术详解与应用指南
**RabbitMQ** 是一个开源消息代理,基于 AMQP 实现,用于应用程序间轻量、可靠的消息传递。本文档详细介绍了 RabbitMQ 的基础,包括**消息、队列、交换机、绑定、路由键和消费者**等概念,以及其**高可靠性、高性能、灵活性、可扩展性和易用性**等特性。RabbitMQ 使用生产者-消费者模型,消息通过交换机路由到队列,消费者接收并处理。文中还涵盖了安装配置的基本步骤和常见应用场景,如**异步处理、消息推送、系统解耦、流量削峰和日志收集**。
323 2
|
4月前
|
JavaScript Java 测试技术
基于ssm+vue.js+uniapp小程序的服装品牌的推广及应用网站附带文章和源代码部署视频讲解等
基于ssm+vue.js+uniapp小程序的服装品牌的推广及应用网站附带文章和源代码部署视频讲解等
46 4
|
4月前
|
Java 测试技术 数据安全/隐私保护
基于ssm+vue.js+uniapp小程序的《数据库原理及应用》课程平台附带文章和源代码部署视频讲解等
基于ssm+vue.js+uniapp小程序的《数据库原理及应用》课程平台附带文章和源代码部署视频讲解等
33 0
基于ssm+vue.js+uniapp小程序的《数据库原理及应用》课程平台附带文章和源代码部署视频讲解等
|
5月前
|
JavaScript Java 测试技术
基于ssm+vue.js的绿色农产品推广应用网站附带文章和源代码设计说明文档ppt
基于ssm+vue.js的绿色农产品推广应用网站附带文章和源代码设计说明文档ppt
50 4
|
4月前
|
消息中间件 Java Spring
Spring Boot与RabbitMQ的集成应用
Spring Boot与RabbitMQ的集成应用