消息中间件系列教程(03) -ActiveMQ -点对点&发布订阅模式

简介: 消息中间件系列教程(03) -ActiveMQ -点对点&发布订阅模式

引言

代码已提交至Github,有兴趣的同学可以下载来看看:https://github.com/ylw-github/SpringBoot-ActiveMQ-Demo

在上一篇博客《ActiveMQ -安装&入门案例》中,里面的入门案例其实是ActiveMQ点对点的模式了,本文主要来讲解一下Active的发布订阅模式。

1.生产者

1.添加mave依赖:

<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.0.1.RELEASE</version>
</parent>
<!-- 管理依赖 -->
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-dependencies</artifactId>
      <version>Finchley.M7</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <!-- SpringBoot整合Web组件 -->
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <!-- SpringBoot Activemq -->
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
  </dependency>
</dependencies>
<!-- 注意: 这里必须要添加, 否者各种依赖有问题 -->
<repositories>
  <repository>
    <id>spring-milestones</id>
    <name>Spring Milestones</name>
    <url>https://repo.spring.io/libs-milestone</url>
    <snapshots>
      <enabled>false</enabled>
    </snapshots>
  </repository>
</repositories>

2.配置application.yml文件:

server:
  port: 8080
spring:
  activemq:
    broker-url: tcp://192.168.162.131:61616
    user: admin
    password: admin
  kafka:
    template:
      default-topic:
queue: springboot-queue
topic: spring-topic

3.主题生产者TopicProducer:

@Component
public class TopicProducer {
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    @Autowired
    private Topic topic;
    @Scheduled(fixedDelay = 5000)
    public void send() {
        String msg = System.currentTimeMillis() + "";
        System.out.println("采用发布订阅方式,生产者向消费者发送内容:" + msg);
        jmsMessagingTemplate.convertAndSend(topic, msg);
    }
}

4.主题配置:

import javax.jms.Topic;
@Component
public class TopicConfig {
    @Value("${topic}")
    private String topicName;
    @Bean
    public Topic topic() {
        return new ActiveMQTopic(topicName);
    }
}

5.启动类:

@SpringBootApplication
@EnableScheduling
public class App {
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}

2.消费者

1.添加maven依赖:

<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.0.1.RELEASE</version>
</parent>
<!-- 管理依赖 -->
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-dependencies</artifactId>
      <version>Finchley.M7</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <!-- SpringBoot整合Web组件 -->
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <!-- SpringBoot Activemq -->
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
  </dependency>
  <!--fastjson -->
  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.49</version>
  </dependency>
</dependencies>
<!-- 注意: 这里必须要添加, 否者各种依赖有问题 -->
<repositories>
  <repository>
    <id>spring-milestones</id>
    <name>Spring Milestones</name>
    <url>https://repo.spring.io/libs-milestone</url>
    <snapshots>
      <enabled>false</enabled>
    </snapshots>
  </repository>
</repositories>

2.配置文件:

server:
  port: 8081
spring:
  activemq:
    broker-url: tcp://192.168.162.131:61616
    user: admin
    password: admin
  #### 开启发布订阅
  jms:
    pub-sub-domain: true 
topic: spring-topic    
queue: springboot-queue

3.主题消费者:

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class TopicConsumer {
    @JmsListener(destination = "spring-topic")
    public void receive(String msg) {
        System.out.println("发布与订阅消费者接受,生产者内容:" + msg);
    }
}

3.测试

step1.启动ActiveMQ服务:

/usr/local/apache-activemq-5.15.10/bin/activemq start

step2.浏览器访问:http://192.168.162.131:8161/admin(账号:admin、密码:admin),可以看到ActiveMQ服务开启成功。

step3.启动生产者和消费者项目,在生产者控制台可以看到:

消费者控制台可以看到:

目录
相关文章
|
2月前
|
消息中间件 网络协议 Java
消息中间件-ActiveMQ
消息中间件-ActiveMQ
|
5月前
|
消息中间件 存储 监控
Java一分钟之-ActiveMQ:消息中间件
【6月更文挑战第11天】Apache ActiveMQ是广泛使用的开源消息中间件,支持JMS和多种消息协议。本文介绍了ActiveMQ的基础知识,包括消息队列和主题模型,以及持久化和高可用性配置。同时,提出了三个常见问题:配置不当、消息堆积和网络错误,并给出了相应的解决策略。通过Java示例代码展示了如何使用ActiveMQ发送和接收消息。正确配置、管理消息处理和持续监控是确保ActiveMQ高效运行的关键。
147 2
|
4月前
|
消息中间件 NoSQL Kafka
消息中间件(RocketMQ、RabbitMQ、ActiveMQ、Redis、kafka、ZeroMQ)以及之间的区别
消息中间件(RocketMQ、RabbitMQ、ActiveMQ、Redis、kafka、ZeroMQ)以及之间的区别
|
消息中间件 Java Maven
消息中间件系列教程(07) -RabbitMQ -案例代码(点对点队列模式)
消息中间件系列教程(07) -RabbitMQ -案例代码(点对点队列模式)
71 1
|
消息中间件 存储 缓存
消息中间件系列教程(06) -RabbitMQ -五种队列形式
消息中间件系列教程(06) -RabbitMQ -五种队列形式
98 1
|
消息中间件 存储 分布式计算
消息中间件系列教程(19) -Kafka-简介
消息中间件系列教程(19) -Kafka-简介
115 0
|
消息中间件 数据库
消息中间件系列教程(18) -RabbitMQ-基于RabbitMQ解决分布式事务(思想)
消息中间件系列教程(18) -RabbitMQ-基于RabbitMQ解决分布式事务(思想)
82 0
|
消息中间件 Java Maven
消息中间件系列教程(12) -RabbitMQ-消息确认机制
消息中间件系列教程(12) -RabbitMQ-消息确认机制
79 0
|
消息中间件 存储 Java
消息中间件系列教程(09) -RabbitMQ -案例代码(发布订阅模式)
消息中间件系列教程(09) -RabbitMQ -案例代码(发布订阅模式)
63 0
|
消息中间件 Java Maven
消息中间件系列教程(08) -RabbitMQ -案例代码(工作队列模式)
消息中间件系列教程(08) -RabbitMQ -案例代码(工作队列模式)
60 0