JMS 消息队列接口基本使用指南

简介: JMS 消息队列接口基本使用指南

概述

介绍

JMS(Java Message Service)即 Java 消息服务应用程序接口,是一个 Java 平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java 消息服务是一个与具体平台无关的 API,绝大多数 MOM 提供商都对JMS 提供支持。

简短来说,JMS 是一种与厂商无关的 API,是 sun 公司为了统一厂商的接口规范,而定义出的一组api接口,用来访问消息收发系统消息。它类似于 JDBC(Java Database Connectivity),提供了应用程序之间异步通信的功能。


JMS 体系结构

  • JMS 提供者(JMS 的实现者,比如 activemq、jbossmq、tonglinkmq 等)
  • JMS 客户(使用提供者发送消息的程序或对象,例如在 12306 中,负责发送一条购票消息到处理队列中,用来解决购票高峰问题,那么,发送消息到队列的程序和从队列获取消息的程序都叫做客户)
  • JMS 生产者(producer、sender):负责创建并发送消息的客户
  • JMS 消费者(customer、listener):负责接收并处理消息的客户
  • JMS 消息(message):在 JMS 客户之间传递数据的对象
  • JMS 队列(queue):一个容纳那些被发送的等待阅读的消息的区域
  • JMS 主题(topic):一种支持发送消息给多个订阅者的机制


JMS 对象模型

  • 连接工厂(connectionFactory)客户端使用 JNDI 查找连接工厂,然后利用连接工厂创建一个 JMS 连接
  • JMS 连接:表示 JMS 客户端和服务器端之间的一个活动的连接,是由客户端通过调用连接工厂的方法建立的
  • JMS 会话:session 标识 JMS 客户端和服务端的会话状态。会话建立在 JMS 连接上,标识客户与服务器之间的一个会话进程。
  • JMS 目的(Destination): 又称为消息队列,是实际的消息源
  • 生产者和消费者
  • 消息类型:分为队列类型(优先先进先出)以及订阅类型


消息监听器

MessageListener

MessageListener 是最原始的消息监听器,它是 JMS 规范中定义的一个接口。其中定义了一个用于处理接收到的消息的 onMessage() 方法,该方法只接收一个 Message 参数。

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class ConsumerMessageListener implements MessageListener {
    public void onMessage(Message message) {
        // 若生产者发送的是一个纯文本消息,可以直接进行强制转换,或者直接把onMessage方法的参数改成Message的子类TextMessage
        TextMessage textMsg = (TextMessage) message;
        System.out.println("接收到一个纯文本消息。");
        try {
            System.out.println("消息内容是:" + textMsg.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}


SessionAwareMessageListener

SessionAwareMessageListener 是 Spring 提供的,它不是标准的 JMS MessageListener。

MessageListener 的设计只是纯粹用来接收消息的,假如在使用 MessageListener 处理接收到的消息时需要发送一个消息通知对方已经收到这个消息了,那么这个时候就需要在代码里面去重新获取一个 Connection 或 Session。而 SessionAwareMessageListener 的设计就是为了方便在接收到消息后发送一个回复的消息,它同样提供了一个处理接收到的消息的 onMessage() 方法,但是这个方法可以同时接收两个参数,一个是表示当前接收到的消息Message,另一个就是可以用来发送消息的 Session 对象。

使用 SessionAwareMessageListener 监听器,可以在监听并消费了消息后,不用重新获取一个 Connection 或 Session,而是直接向原 Connection 或 Session 的某一个队列发送消息。

代码示例:

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.springframework.jms.listener.SessionAwareMessageListener;
public class ConsumerSessionAwareMessageListener implements SessionAwareMessageListener {
    private Destination destination;
    public void onMessage(TextMessage message, Session session) throws JMSException {
        System.out.println("收到一条消息");
        System.out.println("消息内容是:" + message.getText());
        MessageProducer producer = session.createProducer(destination);
        Message textMessage = session.createTextMessage("ConsumerSessionAwareMessageListener。。。");
        producer.send(textMessage);
    }
    public Destination getDestination() {
        returndestination;
    }
    public void setDestination(Destination destination) {
        this.destination = destination;
    }
}

说明:定义了一个 SessionAwareMessageListener,在这个 Listener 中在接收到了一个消息之后,利用对应的 Session 创建了一个到 destination 的生产者和对应的消息,然后利用创建好的生产者发送对应的消息。


MessageListenerAdapter

MessageListenerAdapter 类实现了 MessageListener 接口和 SessionAwareMessageListener 接口,它的主要作用是将接收到的消息进行类型转换,然后通过反射的形式把它交给一个普通的 Java 类进行处理。

  • MessageListenerAdapter 会把接收到的消息做如下转换:
  • TextMessage 转换为 String 对象
  • BytesMessage 转换为 byte 数组
  • MapMessage 转换为 Map 对象
  • ObjectMessage 转换为对应的 Serializable 对象
  • 代码示例:
// 目标处理器类
public class ConsumerListener {  
    public void handleMessage(String message) {  
        System.out.println("ConsumerListener通过handleMessage接收到一个纯文本消息,消息内容是:" + message);  
    }  
    public void receiveMessage(String message) {  
        System.out.println("ConsumerListener通过receiveMessage接收到一个纯文本消息,消息内容是:" + message);  
    }  
}  
<!-- 消息监听适配器 -->  
<bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  
    <property name="delegate">  
        <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>  
    </property>  
    <property name="defaultListenerMethod" value="receiveMessage"/>  
</bean>  
<!-- 消息监听适配器对应的监听容器 -->  
<bean id="messageListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
    <property name="connectionFactory" ref="connectionFactory"/>  
    <property name="destination" ref="adapterQueue"/>  
    <!-- 使用MessageListenerAdapter来作为消息监听器 -->  
    <property name="messageListener" ref="messageListenerAdapter"/>
</bean> 
  • 注意:
  • MessageListenerAdapter 会把接收到的消息做一个类型转换,然后利用反射把它交给真正的目标处理器:一个普通的 Java 类(ConsumerListener)进行处理。
    如果真正的目标处理器是一个 MessageListener 或者是一个 SessionAwareMessageListener,那么 Spring 将直接使用接收到的Message 对象作为参数调用它们的 onMessage 方法,而不会再利用反射去进行调用。
    故在定义一个 MessageListenerAdapter 的时候就需要为它指定这样一个目标类。这个目标类可以通过 MessageListenerAdapter 的构造方法参数指定,也可以通过它的 delegate 属性来指定。
  • MessageListenerAdapter 另外一个主要的功能是可以通过 MessageListenerAdapter 注入的 handleMessage 方法自动的发送返回消息。当用于处理接收到的消息的方法(默认是 handleMessage)的返回值不为空(null或者void)的时候,Spring 会自动将它封装为一个 JMS Message,然后自动进行回复。这个回复消息将发送到的地址主要有两种方式可以指定:
  • 可以通过发送的 Message 的 setJMSReplyTo 方法指定该消息对应的回复消息的目的地
  • 通过 MessageListenerAdapter 的 defaultResponseDestination 属性来指定


基本使用

依赖

<!-- jms -->
<dependency>
    <groupId>javax.jms</groupId>
    <artifactId>javax.jms-api</artifactId>
</dependency>
<!-- spring jms -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
</dependency>
<!-- tonglinkMq jms api -->
<dependency>
    <groupId>com.tongtech.tlq</groupId>
    <artifactId>TongJMS-without-atomikos</artifactId>
    <version>8.1.0-SNAPSHOT</version>
</dependency>


SpringBoot 集成 jms

jms 配置类

import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.JmsTemplate;
import org.tongtech.tmqi.ConnectionFactory;
@EnableJms  // 声明对 JMS 注解的支持
@Configuration
public class TestCreator {
    private String host;
    private Integer port;
    private String queueManager;
    private String channel;
    private String username;
    private String password;
    private int ccsid;
    private String queueName;
    private long receiveTimeout;
  // 配置连接工厂(tonglinkMq)
    @Bean
    public ConnectionFactory connectionFactory() throws JMSException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setProperty("tmqiAddressList", "tlq://127.0.0.1:10024");
        connectionFactory.setProperty("tmqiDefaultUsername", "admin");
        connectionFactory.setProperty("tmqiDefaultPassword", "123456");
        return connectionFactory;
    }
  // 配置缓存连接工厂 不配置该类则每次与MQ交互都需要重新创建连接,大幅降低速度。
    @Bean
    @Primary
    public CachingConnectionFactory cachingConnectionFactory(ConnectionFactory connectionFactory) {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setTargetConnectionFactory(connectionFactory);
        cachingConnectionFactory.setSessionCacheSize(500);
        cachingConnectionFactory.setReconnectOnException(true);
        return cachingConnectionFactory;
    }
  // 配置DefaultJmsListenerContainerFactory, 用@JmsListener注解来监听队列消息时,尤其存在多个监听的时候,通过实例化配置DefaultJmsListenerContainerFactory来控制消息分发
    @Bean(name = "jmsQueueListenerCF")
    public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(CachingConnectionFactory cachingConnectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(cachingConnectionFactory);
        // 设置连接数。如果对消息消费有顺序要求,这里建议设置为"1-1"
        // 注:使用同一个监听工厂类监听多个队列时,连接数需大于等于监听队列数
        factory.setConcurrency("3-10"); // 下限-上限
        // 重连间隔时间
        factory.setRecoveryInterval(1000L);
        // factory.setPubSubDomain(true); // 支持发布订阅功能(topic)
        // factory.setConcurrency("1");   // topic 模式,并发必须设置为1,不然一条消息可能会被消费多次
        return factory;
    }
  // 配置JMS模板,实例化jmsTemplate后,可以在方法中通过@autowired的方式注入模板,用方法调用发送/接收消息
    // 注:如果只是接收消息,可以不配置此步
    @Bean
    public JmsTemplate jmsQueueTemplate(CachingConnectionFactory cachingConnectionFactory) {
        JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
        jmsTemplate.setReceiveTimeout(receiveTimeout); // 设置超时时间
        // jmsTemplate.setPubSubDomain(true); // 开启发布订阅功能(topic)
        return jmsTemplate;
    }
}


发送消息

public class jmsUtil {
    @Autowired
    private JmsTemplate jmsQueueTemplate;
    /**
     * 发送原始消息 Message
     */
    public void send(){
        jmsQueueTemplate.send("queue1", new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("我是原始消息");
            }
        });
    }
    /**
     * 发送消息自动转换成原始消息
     * 注:关于消息转换,还可以通过实现MessageConverter接口来自定义转换内容
     */
    public void convertAndSend(){
        jmsQueueTemplate.convertAndSend("queue1", "我是自动转换的消息");
    }
}


监听接收消息

采用注解 @JmsListener 来设置监听方法

@Slf4j
@Component
// 此处继承MessageListenerAdapter非必需。但若只使用@JmsListener注解监听,可能会出现监听消息获取不及时或者获取不到消息的情况,加上继承MessageListenerAdapter后便不会出现
public class MdxpMessageListener extends MessageListenerAdapter {
    /**
     * 消息队列监听器
     * destination 队列地址,此处使用静态变量,支持配置化详见下文
     * containerFactory 监听器容器工厂(包含配置源), 若存在2个以上的监听容器工厂,需进行指定
     */
    @Override
    @JmsListener(destination = "TEST_QUEUE",containerFactory = "jmsQueueListenerCF")
    public void onMessage(Message message) {
        // JmsListener收到消息后,会自动封装成自己特有的数据格式,需要自行根据消息类型解析原始消息
        String msgText = ""; 
        double d = 0; 
        try { 
            if (msg instanceof TextMessage) {    
                msgText = ((TextMessage) msg).getText(); 
            } else if (msg instanceof StreamMessage) {    
                msgText = ((StreamMessage) msg).readString();    
                d = ((StreamMessage) msg).readDouble();    
            } else if (msg instanceof BytesMessage) {    
                byte[] block = new byte[1024];    
                ((BytesMessage) msg).readBytes(block);    
                msgText = String.valueOf(block);    
            } else if (msg instanceof MapMessage) {    
                msgText = ((MapMessage) msg).getString("name");    
            }
            log.info("接收消息={}", msgText);
        } catch (JMSException e) { 
            log.error("消息接收异常!", e);
        }
    }
    @JmsListener(destination = "TEST_QUEUE2",containerFactory = "jmsQueueListenerCF")
    // @Payload是消费者接受生产者发送的队列消息,将队列中的json字符串变成对象的注解,注意填充类需要实现序列化接口
    public void messageListener2(@payload User user){
        log.info("message={}", user)
    }
}


@JmsListener 注解 destination 支持配置化

注入配置读取类

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * 队列名称配置
 * 这里切记要@Data,或手动set和get
 */
@Component
@Data
public class QueueNameConfig {
    @Value("${ibmmq.queue-test}")
    private String testQueue;
}

队列监听类

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
import javax.jms.Message;
import javax.jms.TextMessage;
/**
 * MQ消费者
 */
@Component
@Slf4j
public class ReceiveMessage extends MessageListenerAdapter {
    /**
     * destination:监听的队列名称,使用SpEL表达式写入
     * containerFactory:监听的工厂类,为配置类中所配置的名字
     */
    @Override
    @JmsListener(destination = "#{@queueNameConfig.testQueue}", containerFactory = "jmsListenerContainerFactory")
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;  //转换成文本消息
        try {
            String text = textMessage.getText();
            log.info("接收信息:{}", text);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


javax 原生 jms

public class jmstest {
    public static void main(String[] args) throws Exception { 
        // 配置工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setProperty("tmqiAddressList", "tlq://127.0.0.1:10024");
        connectionFactory.setProperty("tmqiDefaultUsername", "admin");
        connectionFactory.setProperty("tmqiDefaultPassword", "123456");
        // 获取连接和会话
        Connection mqConn = connectionFactory.createConnection(); 
        // 创建会话。CLIENT_ACKNOWLEDGE:手动应答,AUTO_ACKNOWLEDGE:自动应答
        Session mqSession = mqConn.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE); 
        // 创建队列
        Queue queuemq = Session.createQueue(queueName);
        // 获取消费者
        MessageConsumer consumer = mqSession.createConsumer(mqSession.createQueue(queueName)); 
        // 设置监听器
        consumer.setMessageListener(new MessageListener() { 
            public void onMessage(Message msg) { 
                // JmsListener收到消息后,会自动封装成自己特有的数据格式,需要自行根据消息类型解析原始消息
                String msgText = ""; 
                double d = 0; 
                try { 
                    if (msg instanceof TextMessage) {
                        msgText = ((TextMessage) msg).getText(); 
                    } else if (msg instanceof StreamMessage) {    
                        msgText = ((StreamMessage) msg).readString();    
                        d = ((StreamMessage) msg).readDouble();    
                    } else if (msg instanceof BytesMessage) {    
                        byte[] block = new byte[1024];    
                        ((BytesMessage) msg).readBytes(block);    
                        msgText = String.valueOf(block);    
                    } else if (msg instanceof MapMessage) {    
                        msgText = ((MapMessage) msg).getString("name");    
                    }
                    log.info("接收消息={}", msgText);
                    // 手动应答
                  textMessage.acknowledge();
                } catch (JMSException e) { 
                    log.error("消息接收异常!", e);
                }
            }
        }); 
        // 启动连接
        mqConn.start(); 
    }
    // 获取生产者
    MessageProducer producer = mqSession.createProducer(mqSession.createQueue(queueName)); 
    // topic(广播)模式
    // Topic topic = Session.createTopic(queueName);
    // MessageProducer producer = mqSession.createProducer(topic); 
    producer.setDeliveryMode(DeliveryMOde.NON_PERSISTENT);
    producer.send(mqSession.createTexttMessage("这是一条消息"));
    // 关闭资源
    producer.close();
    // 断开连接
    connection.close();
}
相关文章
|
消息中间件 Java Kafka
Kafka是什么,JMS是什么,常见的类JMS消息服务器,为什么需要消息队列(来自学习笔记)
Kafka是什么,JMS是什么,常见的类JMS消息服务器,为什么需要消息队列(来自学习笔记) 1、Kafka是什么  Apache Kafka是一个开源消息系统,由Scala写成。是由Apache软件基金会开发的一个开源消息系统项目。
3375 0
|
消息中间件 缓存 Linux
消息队列接口API(posix 接口和 system v接口)
消息队列 posix API 消息队列(也叫做报文队列)能够克服早期unix通信机制的一些缺点。信号这种通信方式更像\"即时\"的通信方式,它要求接受信号的进程在某个时间范围内对信号做出反应,因此该信号最多在接受信号进程的生命周期内才有意义,信号所传递的信息是接近于随进程持续的概念(process-persistent);管道及有名管道则是典型的随进程持续IPC,并且,只能传送无格式的字节流无疑会给应用程序开发带来不便,另外,它的缓冲区大小也受到限制消息队列就是一个消息的链表。
1747 0
|
6月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
4月前
|
消息中间件
手撸MQ消息队列——循环数组
队列是一种常用的数据结构,类似于栈,但采用先进先出(FIFO)的原则。生活中常见的排队场景就是队列的应用实例。在数据结构中,队列通常用数组实现,包括入队(队尾插入元素)和出队(队头移除元素)两种基本操作。本文介绍了如何用数组实现队列,包括定义数组长度、维护队头和队尾下标(front 和 tail),并通过取模运算解决下标越界问题。此外,还讨论了队列的空与满状态判断,以及并发和等待机制的实现。通过示例代码展示了队列的基本操作及优化方法,确保多线程环境下的正确性和高效性。
58 0
手撸MQ消息队列——循环数组
|
5月前
|
消息中间件 存储 缓存
一个用过消息队列的人,竟不知为何要用 MQ?
一个用过消息队列的人,竟不知为何要用 MQ?
197 1