ActiveMQ --- 整合篇

简介: 之前说到了activeMQ的一些基本用法,本文将介绍activeMQ如何与spring以及spring boot整合。

一、activeMQ与spring整合


1、需要的依赖:

<dependency>
     <groupId>com.fasterxml.jackson.core</groupId>
     <artifactId>jackson-databind</artifactId>
     <version>2.9.9.3</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>4.3.23.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.15.9</version>
</dependency>


2、applicationContext.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd">
    <!-- 1、开启自动扫描 -->
    <context:component-scan base-package="com.zhu.study"/>
    <!-- 2、配置连接 -->
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://192.168.x.xx:61616"/>
            </bean>
        </property>
        <property name="maxConnections" value="100"/>
    </bean>
    <!-- 3、配置目的地,队列 -->
    <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="spring-active-queue"/>
    </bean>
    <!-- 配置目的地,主题 -->
    <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="spring-active-topic"/>
    </bean>
    <!-- 4、配置spring提供的jms模板 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory"/>
        <property name="defaultDestination" ref="destinationQueue"/>
        <!--<property name="defaultDestination" ref="destinationTopic"/>-->
        <property name="messageConverter">
            <!-- 做消息类型转换的 -->
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>
</beans>


配置很简单,就是四步:


  • 开启注解扫描;
  • 配置连接;
  • 配置目的地;
  • 配置spring提供的jmsTemplate.


如果想将目的地由queue换成topic,只需要在配置jmsTemplate的时候,将defaultDestination指向你上面配置的topic即可,然后启动时先启动消费者,其他任何地方不用改。


3、生产者:

@Service
public class Produce {
    @Autowired
    private JmsTemplate jmsTemplate;
    public static void main(String[] args){
        // 1、加载配置
        ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
        // 2、获取bean
        Produce produce = context.getBean(Produce.class);
        // 3、调用jmsTemplate发送消息
        /*produce.jmsTemplate.send(new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage message = session.createTextMessage("spring整合activeMQ");
                return message;
            }
        });*/
        // lambda方式:编程口诀(拷贝小括号,写死右箭头,落地大括号)
        produce.jmsTemplate.send((Session session) -> {
            TextMessage message = session.createTextMessage("spring整合activeMQ");
            return message;
        });
        System.out.println("activemq send success!");
    }
}


4、消费者:

@Service
public class Consumer {
    @Autowired
    private JmsTemplate jmsTemplate;
    public static void main(String[] args){
        // 1、加载配置
        ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
        // 2、获取bean
        Consumer consumer = context.getBean(Consumer.class);
        // 3、消费消息
        String result = (String)consumer.jmsTemplate.receiveAndConvert();
        System.out.println("消费者收到消息:" + result);
    }
}


这样就搞定了。不过上面说了,如果把目的地换成topic要先启动消费者。其实还可以配置监听程序,这样就不需要手动启动消费者了,消费者会一直处于待命状态。先写一个监听的类。


5、监听程序:

@Component
public class MyMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        if (null != message && message instanceof TextMessage){
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("从activemq收到消息:" + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}


然后在applicationContext.xml中配置:

<!-- 5、配置监听程序 -->
 <bean id="jmsListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsFactory"/>
        <property name="destination" ref="destinationTopic"/>
        <property name="messageListener" ref="myMessageListener"/>
 </bean>


这样就只需启动生产者即可了,监听程序会自动监听,收到消息就会进行消费。


二、activeMQ与springboot整合


首先新建一个springboot项目,用来编写生产者代码。


1、需要的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>


2、application.yml:

server:
  port: 6666
spring:
  activemq:
    broker-url: tcp://192.168.x.xx:61616
    user: admin
    password: admin
  # false = Queue;  true = Topic
  jms:
    pub-sub-domain: false
#队列名称
myqueue: boot-activemq


上面配置的是队列,要用主题的话,把上面的false改成true。


3、配置类:

@Component
@EnableJms // 这个注解必不可少
public class ConfigBean {
    @Value("${myqueue}")
    private String myQueue;
    // 1、创建队列
    @Bean
    public Queue queue(){
        return new ActiveMQQueue(myQueue);
    }
}


这个配置类主要就是创建了一个队列,队列名从配置文件中读取。


4、生产者:


这里主要有两种生产模式,一种是触发投递,一种是定时投递。触发投递就是程序启动后,满足某个条件才会去调用发送消息的方法;定时投递就是相当于一个定时任务。

@Component
public class Produce {
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    @Autowired
    private Queue queue;
    // 触发投递
    public void produceMsg(){
        String message = "springboot整合activemq成功";
        jmsMessagingTemplate.convertAndSend(queue,message);
        System.out.println("消息触发投递成功");
    }
    // 定时生产,每隔5秒中向MQ发送一次消息(还需在启动类上加上@EnableScheduling注解)
    @Scheduled(fixedDelay = 5000)
    public void produceMsgScheduled(){
        String message = "定时投递消息";
        jmsMessagingTemplate.convertAndSend(queue,message);
        System.out.println("消息定时投递成功");
    }
}


注意定时投递需要在启动类上加@EnableScheduling注解!要测试定时投递,直接运行spring boot的启动类就好了,就可以看到每隔5秒“消息定时投递成功”就会被打印一次。要测试触发投递,就需要我们手动地去调用produceMsg方法,可以写个如下的测试类:

@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqspringbootApplicationTests {
    @Autowired
    private Produce produce;
    @Test
    public void contextLoads() {
        produce.produceMsg();
    }
}


运行这个测试类就可以看到会打印出“消息触发投递成功”,然后程序就会停止。


5、消费者:


我们知道消费消息有两种方式,一种是用receive方法,还有就是监听。用receive方法和spring中的一样,这里讲如何配置监听。

@Component
public class Queue_Consumer {
    @JmsListener(destination = "${myqueue}")
    public void recevice(TextMessage message) throws  Exception{
        System.out.println("消费者收到消息:" + message.getText());
    }
}


没错,就是这么简单!在spring中还需要我们自己新建监听类,然后配置到配置文件中,在springboot中,一个注解就搞定了!


6、发布订阅:


上面用的是队列,主题的使用方法如下:


  • 生产者和消费者项目配置文件中的pub-sub-domain的值改为true。


  • 生产者的配置类中,new的不是队列,而是主题了,如下:

@Bean
public Topic topic(){
     return new ActiveMQTopic(myTopic);
}


  • 生产消息的时候注入主题,而非队列,其他无异。


可以看出,springboot整合activemq比spring整个它简单很多!




相关文章
|
2月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
5月前
|
消息中间件 存储 安全
从0开始回顾Kafka---系列二
生产者 1、 Kafka 中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么? 分区器 ● 消息经过序列化之后就需要确定它发往的分区,如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分区器的作用,因为 partition 代表的就是所要发往的分区号。 ● 如果消息 ProducerRecord 中没有指定 partition 字段,那么就需要依赖分区器,根据 key 这个字段来计算 partition 的值。分区器的作用就是为消息分配分区。 序列化器 ● 生产者需要用序列化器(Serializer)把对象转换成字节数组才能通
|
5月前
|
消息中间件 存储 缓存
从0开始回顾Kafka---系列三
消费者只能拉取到这个 offset 之前的消息。
|
5月前
|
消息中间件 存储 容灾
从0开始回顾Kafka---系列一
2、 Kafka有哪些优点和缺点? 优点: 1. 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒。 2. 可扩展性:kafka集群支持水平扩展。 3. 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。 4. 容错性:允许集群中节点故障(若副本数量为n,则允许n-1个节点故障)。 5. 高并发:支持数千个客户端同时读写。 缺点: 1. 同步收发消息的响应时延比较高,因为当客户端发送一条消息的时候,Kafka 并不会立即发送出去,而是要等一会儿攒一批再发送。 2. Kafka 不太适合在线业务场景,由于是批量发送,所以数据达不到真正的实时。 3.
|
5月前
|
消息中间件 Java
RabbitMQ【应用 01】SpringBoot集成RabbitMQ及设置RabbitMQ启动总开关
RabbitMQ【应用 01】SpringBoot集成RabbitMQ及设置RabbitMQ启动总开关
293 0
|
5月前
|
消息中间件 安全 Linux
2022年3月13日安装和启动ActiveMQ遇到问题
2022年3月13日安装和启动ActiveMQ遇到问题
|
消息中间件 Java Kafka
springboot整合kafka和zookeeper简易示例(win平台)
springboot整合kafka和zookeeper简易示例(win平台)
120 0
|
消息中间件 Java
SpringBoot整合ActiveMq实现Queue和Topic两种模式
SpringBoot整合ActiveMq实现Queue和Topic两种模式
131 4
SpringBoot整合ActiveMq实现Queue和Topic两种模式
|
消息中间件 存储 Dubbo
ActiveMQ --- 入门篇
ActiveMQ --- 入门篇
ActiveMQ --- 入门篇
|
消息中间件 网络协议 NoSQL
消息队列---RabbitMQ深入研究(含Springboot+RabbitMQ整合)(一)
消息队列---RabbitMQ深入研究(含Springboot+RabbitMQ整合)
251 0
消息队列---RabbitMQ深入研究(含Springboot+RabbitMQ整合)(一)