Springboot集成高低版本kafka

简介: Springboot集成高低版本kafka

1、版本选择

springboot整合kafka的时候一定要根据自己springboot版本选择对应版本的kafka,两者版本对应关系可以直接查看官网

https://spring.io/projects/spring-kafka#overview

2、低版本springboot整合kafka

这里我SpringBoot版本是1.4.2.RELEASE版本,版本很低,官网显示的SpringBoot版本最低是1.5.x,可以使用1.3.x的版本,很明显我的这个不在官网给的范围内,然后我的spring版本是4.3.9.RELEASE,这里我在上面这个maven仓库spring-kafka地址里面看了一个1.3.0版本,如下:

直到我往下继续找,终于发现1.2.2.RELEASE这个版本是与我项目对应的。

刚好这个版本对应的spring版本是4.3.9.RELEASE与我项目的spring版本一致,于是我就使用了这个spring-kafka版本

这里之所以是在Java类里面写生产者和消费者配置,是因为springboot和kafka集成版本太低,不支持直接在application.yml里面配置,好像springboot高版本至少2.几的版本可以直接在application.yml里面配置,至于2.几的版本才支持我给忘记了

kafka生产者配置

这里是带用户名密码协议配置,最下面三个就是,协议类型为:SASL/SCRAM-SHA-256,如果你们那里的kafka配置没有设置这个,可以不需要配置最下面三个。企业开发一般需要进行认证才能发送消息。

@Configuration
@EnableKafka
public class KafkaProductConfig {
 
    //指定kafka 代理地址,多个地址用英文逗号隔开
    private String bootstrapServers="192.168.11.111:9092,192.168.11.112:9093";//本地测试kafka使用
    //消息重发次数,如果配置了事务,则不能为0,改为1
    private int retries=0;
    //每次批量发送消息的数量
    private String batchSize="16384";
    //默认值为0,意思就是说消息必须立即被发送,但这样会影响性能
    //一般设置10毫秒左右,这个消息发送完后会进入本地的一个batch,如果10毫秒内这个batch满了16kb就会随batch一起发送出去
    private String lingerMs="10";
    //生产者最大可发送的消息大小,内有多个batch,一旦满了,只有发送到kafka后才能空出位置,否则阻塞接收新消息
    private String bufferMemory="33554432";
    //指定消息key和消息体的编解码方式
    private String keySerializer="org.apache.kafka.common.serialization.StringSerializer";
    private String valueSerializer="org.apache.kafka.common.serialization.StringSerializer";
    //确认等级ack,kafka生产端最重要的选项,如果配置了事务,那必须是-1或者all
    //acks=0,生产者在成功写入消息之前不会等待任何来自服务器的响应
    //acks=1,只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应
    //acks=-1,表示分区leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为product请求成功。这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的
    private String acks="1";
 
 
    //协议类型,为SASL类型
    private String securityProtocol="SASL_PLAINTEXT";
    //协议
    private String saslMechanism="SCRAM-SHA-256";
    //用户名密码配置
    private String saslJaas="org.apache.kafka.common.security.scram.ScramLoginModule required username=root password=123456;";
 
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG,retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        props.put(ProducerConfig.ACKS_CONFIG, acks);
 
        //如果kafka配置文件没有设置用户名密码协议,注释掉(ps:有些企业会使用jks加密文件通讯,那kafka 配置还的有其他配置 可参考kakfa 专栏spring整合kakfa)
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
        props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
        props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaas);
 
       
        return new DefaultKafkaProducerFactory<>(props);
    }
 
 
    @Bean
    public KafkaTemplate<String, String> kafkaTestTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

kafka消费者配置

如果kafka配置文件没有配置用户名密码协议,认证后才能消费消息,可以将最下面的三个注释掉不使用。

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.util.ClassUtils;
 
import java.util.HashMap;
import java.util.Map;
 
@Configuration
@EnableKafka
public class KafkaConsumerConfig  {
    //指定kafka 代理地址,多个地址用英文逗号隔开
    private String bootstrapServers="192.168.11.111:9092,192.168.11.112:9093";//本地测试kafka使用
    //指定默认消费者group id,消费者监听到的也是这个
    private String groupId="test-consumer-group";//本地测试使用
    //消费者在读取一个没有offset的分区或者offset无效时的策略,默认earliest是从头读,latest不是从头读
    private String autoOffsetReset="earliest";
    //是否自动提交偏移量offset,默认为true,一般是false,如果为false,则auto-commit-interval属性就会无效
    private boolean  enableAutoCommit=true;
    //自动提交间隔时间,接收到消息后多久会提交offset,前提需要开启自动提交,也就是enable-auto-commit设置为true,默认单位是毫秒(ms),如果写10s,最后加载的显示值为10000ms,需要符合特定时间格式:1000ms,1S,1M,1H,1D(毫秒,秒,分,小时,天)
    private String autoCommitInterval="1000";
    //指定消息key和消息体的编解码方式
    private String keyDeserializerClass="org.apache.kafka.common.serialization.StringDeserializer";
    private String valueDeserializerClass ="org.apache.kafka.common.serialization.StringDeserializer";
    //批量消费每次最多消费多少条信息
    private String maxPollRecords="50";
    //协议类型,为SASL类型
    private String securityProtocol="SASL_PLAINTEXT";
    //协议
    private String saslMechanism="SCRAM-SHA-256";
    //用户名密码配置
    private String saslJaas="org.apache.kafka.common.security.scram.ScramLoginModule required username=root password=123456;";
 
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(false);//这里为true的时候,KafkaConsumer那里需要使用批量消费方法,不然报错
        return factory;
    }
 
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
 
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
        props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
        props.put(SaslConfigs.SASL_JAAS_CONFIG,saslJaas);
 
        return new DefaultKafkaConsumerFactory<>(props);
    }
}
 

发送消息给kafka的Controller代码

这里使用addCallback这个方法,是可以在生产者发送消息给kafka时,如果生产者配置有问题或者服务有问题,我可以直接看到接口返回结果,所以没有直接这样kafkaTemplate.send(“first”,data);写。

package com.gmcc.project.controllers.kafka;
 
import com.gmcc.project.core.utils.StringUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
 
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
 
//kafka生产者
@RestController
@RequestMapping("kafkaProducer")
public class KafkaProducerController {
 
    @Resource
    private KafkaTemplate<String,String> kafkaTestTemplate;
 
    //向kafka发送消息
    @RequestMapping(value = "/sendFileMd5", method = RequestMethod.POST)
    public Map<String, Object> sendFileMd5(@RequestParam(value = "fileMd5", required = false) String fileMd5,
                                           @RequestParam(value = "uuid", required = false) String uuid){
        Map<String, Object> returnMap = new HashMap<>();
        //写在success里面只会返回一次,第二次就给你返回一个空map对象
        returnMap.put("message", "发送消息成功!");
        returnMap.put("result", null);
        returnMap.put("status", "200");
        //非空判断
        if(StringUtils.isBlank(fileMd5)) {
            returnMap.put("message", "fileMd5不能为空!");
            returnMap.put("result", "");
            returnMap.put("status", "999");
            return returnMap;
        }
        if(StringUtils.isBlank(uuid)) {
            returnMap.put("message", "uuid不能为空!");
            returnMap.put("result", "");
            returnMap.put("status", "999");
            return returnMap;
        }
        try{
            //需要发送的消息
            String data="{\"file_md5\":\""+fileMd5+"\",\"uuid\":\""+uuid+"\",\"vendor\":\"etone\",\"model\":\"5g信令回放\"}";
            //pro环境使用topic为test_sample_get
            //本地测试使用,向topic为first发送消息
            kafkaTestTemplate.send("first",data).addCallback(success -> {
                // 消息发送到的topic
                String topic = success.getRecordMetadata().topic();
                // 消息发送到的分区
                int partition = success.getRecordMetadata().partition();
                // 消息在分区内的offset
                long offset = success.getRecordMetadata().offset();
                System.out.println("发送消息成功:"+data+",主题:"+topic+",分区:"+partition+",偏移量:"+offset);
            }, failure -> {
                returnMap.put("message", "发送消息失败:" + failure.getMessage());
                returnMap.put("result", null);
                returnMap.put("status", "500");
            });
        }catch (Exception e){
            returnMap.put("message", e.getMessage());
            returnMap.put("result", null);
            returnMap.put("status", "500");
        }
        return returnMap;
    }
}
 

消费者消费代码

package com.gmcc.project.controllers.kafka;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
 
@Component
public class KafkaConsumer {
 
    //逐条消费
    @KafkaListener(topics = "first")
    //@KafkaListener(topics = "test_sample_return")
    public void onMessage(ConsumerRecord<?,?> record){
 
        try{
            //消费的哪个topic、partition的消息,打印出消息内容
            System.out.println("消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
        }catch (Exception e){
            e.printStackTrace();
        }
    }
 
    //批量消费方法
    /*@KafkaListener(topics = "first")
    public void onMessage(List<ConsumerRecord<?,?>> records){
        System.out.println("消费数量="+records.size());
        for(ConsumerRecord<?,?> record:records){
            //消费的哪个topic、partition的消息,打印出消息内容
            System.out.println("消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
        }
    }*/
}
 

3、高版本springboot整合kafka

这里我的SpringBoot版本是2.6.2版本,spring-kafka版本是2.8.1版本。符合官网给的版本推荐

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

application.yml

这里之所以可以在application.yml直接配置kafka,是因为springboot和spring-kafka版本很高。这里生产者配置和消费者配置都在里面

server:
  port: 8080
 
spring:
  kafka:
    # 指定kafka 代理地址,多个地址用英文逗号隔开
    bootstrap-servers: 192.168.11.111:9092
    #初始化生产者配置
    producer:
      #消息重发次数,如果配置了事务,则不能为0,改为1
      retries: 0
      # 每次批量发送消息的数量
      batch-size: 16384
      #生产者最大可发送的消息大小,内有多个batch,一旦满了,只有发送到kafka后才能空出位置,否则阻塞接收新消息
      buffer-memory: 33554432
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      #确认等级ack,kafka生产端最重要的选项,如果配置了事务,那必须是-1或者all
      #acks=0,生产者在成功写入消息之前不会等待任何来自服务器的响应
      #acks=1,只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应
      #acks=-1,表示分区leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为product请求成功。这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的
      acks: all
      #配置事务,名字随便起
      #transaction-id-prefix: hbz-transaction-
 
    #初始化消费者配置
    consumer:
      # 指定默认消费者group id,消费者监听到的也是这个
      group-id: test-consumer-group
      #消费者在读取一个没有offset的分区或者offset无效时的策略,默认earliest是从头读,latest不是从头读
      auto-offset-reset: earliest
      #是否自动提交偏移量offset,默认为true,一般是false,如果为false,则auto-commit-interval属性就会无效
      enable-auto-commit: true
      #自动提交间隔时间,接收到消息后多久会提交offset,前提需要开启自动提交,也就是enable-auto-commit设置为true,默认单位是毫秒(ms),如果写10s,最后加载的显示值为10000ms,需要符合特定时间格式:1000ms,1S,1M,1H,1D(毫秒,秒,分,小时,天)
      auto-commit-interval: 1000
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer
      value-serializer: org.apache.kafka.common.serialization.StringDeserializer
      #批量消费每次最多消费多少条信息
      max-poll-records: 50
 
    #监听器设置
    listener:
      #消费端监听的topic不存在时,项目启动会报错(关掉)
      missing-topics-fatal: false
      #设置消费类型 批量消费batch,单条消费single
      type: batch
      #指定容器的线程数,提高并发量,默认为1
      #concurrency: 3
      #手动提交偏移量,当enable-auto-commit为true自动提交时,不需要设置改属性
      #ack-mode: manual
 

其他配置参考

###########【Kafka集群】###########
spring.kafka.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093
###########【初始化生产者配置】###########
# 重试次数
spring.kafka.producer.retries=0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 提交延时
spring.kafka.producer.properties.linger.ms=0
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
# 生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 自定义分区器
# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
###########【初始化消费者配置】###########
# 默认的消费组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
# 设置批量消费
# spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
# spring.kafka.consumer.max-poll-records=50

生成者

 
@RestController
public class KafkaProducer {
    String topic = "first";
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // 发送消息
    @GetMapping("/kafka/normal/{message}")
    public void sendMessage1(@PathVariable("message") String normalMessage) {
        kafkaTemplate.send(topic , normalMessage);
    }
}

消费者

package com.project.kafkademo.kafkaconsumer;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
 
import java.util.List;
 
@Component
public class KafkaConsumer {
 
    //消费监听,topics=监听的主题名,groupId=分组,consumer.properties里面的group.id配置
    //如果在这里直接写groupId="test-consumer-group"会导致application.yml里面设置的group-id不起效
    //最终会被这里的设置直接覆盖掉,所以这里不应该加groupId="test-consumer-group"这个属性
    //@KafkaListener(topics = "first",groupId="test-consumer-group")
    //这样写的话,application.yml里面设置的group-id就会生效,监控的就是application.yml里面的了
    //逐条消费
    /*@KafkaListener(topics = "first")
    public void onMessage(ConsumerRecord<?,?> record){
        //消费的哪个topic、partition的消息,打印出消息内容
        System.out.println("消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
    }*/
 
    //批量消费,用List批量接收消息,ConsumerRecord<?,?>只能单条消费消息
    /*@KafkaListener(topics = "first")
    public void onMessage(List<ConsumerRecord<?,?>> records){
        System.out.println("消费数量="+records.size());
        for(ConsumerRecord<?,?> record:records){
            //消费的哪个topic、partition的消息,打印出消息内容
            System.out.println("消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
        }
    }*/
 
    //批量消费,ConsumerRecords<?,?>用于批量消费消息
    @KafkaListener(topics = "first")
    public void onMessage(ConsumerRecords<?,?> records){
        System.out.println("消费数量="+records.count());
        for(ConsumerRecord<?,?> record:records){
            //消费的哪个topic、partition(哪个分区)的消息,打印出消息内容
            System.out.println("消费:"+record.topic()+"-"+record.partition()+"-"+record.key()+"-"+record.value());
        }
    }
}
 


相关文章
|
4月前
|
安全 Java Apache
微服务——SpringBoot使用归纳——Spring Boot中集成 Shiro——Shiro 身份和权限认证
本文介绍了 Apache Shiro 的身份认证与权限认证机制。在身份认证部分,分析了 Shiro 的认证流程,包括应用程序调用 `Subject.login(token)` 方法、SecurityManager 接管认证以及通过 Realm 进行具体的安全验证。权限认证部分阐述了权限(permission)、角色(role)和用户(user)三者的关系,其中用户可拥有多个角色,角色则对应不同的权限组合,例如普通用户仅能查看或添加信息,而管理员可执行所有操作。
153 0
|
4月前
|
安全 Java 数据安全/隐私保护
微服务——SpringBoot使用归纳——Spring Boot中集成 Shiro——Shiro 三大核心组件
本课程介绍如何在Spring Boot中集成Shiro框架,主要讲解Shiro的认证与授权功能。Shiro是一个简单易用的Java安全框架,用于认证、授权、加密和会话管理等。其核心组件包括Subject(认证主体)、SecurityManager(安全管理员)和Realm(域)。Subject负责身份认证,包含Principals(身份)和Credentials(凭证);SecurityManager是架构核心,协调内部组件运作;Realm则是连接Shiro与应用数据的桥梁,用于访问用户账户及权限信息。通过学习,您将掌握Shiro的基本原理及其在项目中的应用。
153 0
|
1月前
|
前端开发
SpringBoot2.3.1集成Knife4j接口文档
SpringBoot2.3.1集成Knife4j接口文档
140 44
|
1月前
|
缓存 安全 Java
Shiro简介及SpringBoot集成Shiro(狂神说视频简易版)
Shiro简介及SpringBoot集成Shiro(狂神说视频简易版)
96 6
|
2月前
|
缓存 Java 数据库
SpringBoot集成Ehcache缓存使用指南
以上是SpringBoot集成Ehcache缓存的基本操作指南,帮助你在实际项目中轻松实现缓存功能。当然,Ehcache还有诸多高级特性,通过学习和实践,你可以更好地发挥它的威力。
130 20
|
3月前
|
Java 开发工具 Spring
【Azure Application Insights】为Spring Boot应用集成Application Insight SDK
本文以Java Spring Boot项目为例,详细说明如何集成Azure Application Insights SDK以收集和展示日志。内容包括三步配置:1) 在`pom.xml`中添加依赖项`applicationinsights-runtime-attach`和`applicationinsights-core`;2) 在main函数中调用`ApplicationInsights.attach()`;3) 配置`applicationinsights.json`文件。同时提供问题排查建议及自定义日志方法示例,帮助用户顺利集成并使用Application Insights服务。
|
3月前
|
消息中间件 Java Kafka
Spring Boot整合kafka
本文简要记录了Spring Boot与Kafka的整合过程。首先通过Docker搭建Kafka环境,包括Zookeeper和Kafka服务的配置文件。接着引入Spring Kafka依赖,并在`application.properties`中配置生产者和消费者参数。随后创建Kafka配置类,定义Topic及重试机制。最后实现生产者发送消息和消费者监听消息的功能,支持手动ACK确认。此方案适用于快速构建基于Spring Boot的Kafka消息系统。
182 7
|
4月前
|
消息中间件 存储 Java
微服务——SpringBoot使用归纳——Spring Boot中集成ActiveMQ——ActiveMQ安装
本教程介绍ActiveMQ的安装与基本使用。首先从官网下载apache-activemq-5.15.3版本,解压后即可完成安装,非常便捷。启动时进入解压目录下的bin文件夹,根据系统选择win32或win64,运行activemq.bat启动服务。通过浏览器访问`http://127.0.0.1:8161/admin/`可进入管理界面,默认用户名密码为admin/admin。ActiveMQ支持两种消息模式:点对点(Queue)和发布/订阅(Topic)。前者确保每条消息仅被一个消费者消费,后者允许多个消费者同时接收相同消息。
109 0
微服务——SpringBoot使用归纳——Spring Boot中集成ActiveMQ——ActiveMQ安装
|
4月前
|
消息中间件 Java 微服务
微服务——SpringBoot使用归纳——Spring Boot中集成ActiveMQ——发布/订阅消息的生产和消费
本文详细讲解了Spring Boot中ActiveMQ的发布/订阅消息机制,包括消息生产和消费的具体实现方式。生产端通过`sendMessage`方法发送订阅消息,消费端则需配置`application.yml`或自定义工厂以支持topic消息监听。为解决点对点与发布/订阅消息兼容问题,可通过设置`containerFactory`实现两者共存。最后,文章还提供了测试方法及总结,帮助读者掌握ActiveMQ在异步消息处理中的应用。
139 0
|
4月前
|
消息中间件 网络协议 Java
微服务——SpringBoot使用归纳——Spring Boot中集成ActiveMQ——ActiveMQ集成
本文介绍了在 Spring Boot 中集成 ActiveMQ 的详细步骤。首先通过引入 `spring-boot-starter-activemq` 依赖并配置 `application.yml` 文件实现基本设置。接着,创建 Queue 和 Topic 消息类型,分别使用 `ActiveMQQueue` 和 `ActiveMQTopic` 类完成配置。随后,利用 `JmsMessagingTemplate` 实现消息发送功能,并通过 Controller 和监听器实现点对点消息的生产和消费。最后,通过浏览器访问测试接口验证消息传递的成功性。
124 0