下载安装地址:archive.apache.org/dist/activemq
Maven依赖:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>5.3.6</version> </dependency>
- 注意:不要直接导入activemq-all这个依赖不让会有以来冲突,这个包太大了,可以分开导入
MQ的产品种类和对比
编辑
现有主流的MQ:
- kafka
编程语言:scala。
大数据领域的主流MQ。
- rabbitmq
编程语言:erlang
基于erlang语言,不好修改底层,不要查找问题的原因,不建议选用。
- rocketmq
编程语言:java
适用于大型项目。适用于集群。
- activemq
编程语言:java
适用于中小型项目。
MQ的主要作用
- 解耦,解除各个系统之间的耦合性
- 削峰,抵御大量的流量,保护主要业务
- 异步,调用者无需等待
MQ的两种模式:
- Queue模式:一对一
- topic主题模式:一对多编辑
编辑
JMS编码规范
编辑
编辑
队列消息Queue总结
同步阻塞方式(receive):
订阅者或接收者抵用MessageConsumer的receive()方法来接收消息,receive方法在能接收到消息之前(或超时之前)将一直阻塞。
异步非阻塞方式(监听器onMessage()):
订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统会自动调用监听器MessageListener的onMessage(Message message)方法。
队列消息Queue特点
编辑
Queue模式代码
gitee地址:https://gitee.com/mantianx/Mq_Sub/tree/master/ActiveMq_Pro/src/main/java/icu/look/smile/basic/queue
Topic
在发布订阅消息传递域中,目的地被称为主题(topic)
发布/订阅消息传递域的特点如下:
(1)生产者将消息发布到topic中,每个消息可以有多个消费者,属于1:N的关系;
(2)生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息。
(3)生产者生产时,topic不保存消息它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者。
Topic模式代码
gitee地址:https://gitee.com/mantianx/Mq_Sub/tree/master/ActiveMq_Pro/src/main/java/icu/look/smile/basic/topic
Topic和Queue的对比
编辑
JMS规范
- jms是什么?
什么是Java消息服务?
Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序开发。在JavaEE中,当两个应用程序使用JMS进行通信时,它们之间不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。
消息头
JMS的消息头有哪些属性:
JMSDestination:消息目的地
JMSDeliveryMode:消息持久化模式
JMSExpiration:消息过期时间,设为0表示永不过期
JMSPriority:消息的优先级,0~4普通,5~9加急,默认4
JMSMessageID:消息的唯一标识符。后面我们会介绍如何解决幂等性。
说明: 消息的生产者可以set这些属性,消息的消费者可以get这些属性。
这些属性在send方法里面也可以设置,可以用雪花算法。
消息体
编辑
消息属性
如果需要除消息头字段之外的值,那么可以使用消息属性。他是识别/去重/重点标注等操作,非常有用的方法。
他们是以属性名和属性值对的形式制定的。可以将属性是为消息头得扩展,属性指定一些消息头没有包括的附加信息,比如可以在属性里指定消息选择器。消息的属性就像可以分配给一条消息的附加消息头一样。它们允许开发者添加有关消息的不透明附加信息。它们还用于暴露消息选择器在消息过滤时使用的数据。
- 示例代码:
for (int i = 1; i < 4 ; i++) { TextMessage textMessage = session.createTextMessage("topic_name--" + i); // 调用Message的set*Property()方法,就能设置消息属性。根据value的数据类型的不同,有相应的API。 textMessage.setStringProperty("From","ZhangSan@qq.com"); textMessage.setByteProperty("Spec", (byte) 1); textMessage.setBooleanProperty("Invalide",true); messageProducer.send(textMessage); }
if (null != message && message instanceof TextMessage){ TextMessage textMessage = (TextMessage)message; try { System.out.println("消息体:"+textMessage.getText()); System.out.println("消息属性:"+textMessage.getStringProperty("From")); System.out.println("消息属性:"+textMessage.getByteProperty("Spec")); System.out.println("消息属性:"+textMessage.getBooleanProperty("Invalide")); }catch (JMSException e) { } }
消息的持久和非持久
- Queue的持久和非持久编辑
- 注意:默认是持久
- Topic的持久和非持久
gitee地址:https://gitee.com/mantianx/Mq_Sub/tree/master/ActiveMq_Pro/src/main/java/icu/look/smile/basic/deliverytopic
消息的签收机制
- 签收的几种方式
- 自动签收(AUTO_ACKNOWLEDGE):该方式是默认的。该种方式,无需我们程序做任何操作,框架会帮我们自动签收收到的消息。
- 手动签收(CLIENT_ACKNOWLEDGE):手动签收。该种方式,需要我们手动调用Message.acknowledge(),来签收消息。如果不签收消息,该消息会被我们反复消费,只到被签收。
- 允许重复消息(DUPS_OK_ACKNOWLEDGE):多线程或多个消费者同时消费到一个消息,因为线程不安全,可能会重复消费。该种方式很少使用到。
- 事务下的签收(SESSION_TRANSACTED):开始事务的情况下,可以使用该方式。该种方式很少使用到。
- 事务和签收的关系
- 在事务性会话中,当一个事务被成功提交则消息被自动签收。如果事务回滚,则消息会被再次传送。事务优先于签收,开始事务后,签收机制不再起任何作用。
- 非事务性会话中,消息何时被确认取决于创建会话时的应答模式。
- 生产者事务开启,只有commit后才能将全部消息变为已消费。
- 事务偏向生产者,签收偏向消费者。也就是说,生产者使用事务更好点,消费者使用签收机制更好点。
- 例子:
- DeliveryTopicConsumer:
package icu.look.smile.basic.deliverytopic.consumer; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.io.IOException; import java.util.Properties; /* * @Author gcq * @Description : * topic持久化 * @Date 16:38 2c021/5/7 **/ @SuppressWarnings("all") public class DeliveryTopicConsumer { private static ActiveMQConnectionFactory factory = null; private static Connection connection = null; private static Session session = null; private static TopicSubscriber durableSubscriber = null; public static void main(String[] args) { Properties properties = new Properties(); try { properties.load(DeliveryTopicConsumer.class.getClassLoader().getResourceAsStream("mq-detail.properties")); factory = new ActiveMQConnectionFactory( (String)properties.get("BROCK_USERNAME") , (String)properties.get("BROCK_PASSWORD"), (String) properties.get("BROCK_URL") ); connection = factory.createConnection(); // ClienID一定要在session之前创建connection之后 connection.setClientID("clinet1"); //ture提交事务,结合commit session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("topic6"); durableSubscriber = session.createDurableSubscriber(topic, "reamaker2..."); connection.start(); durableSubscriber.setMessageListener((message) -> { if (null != message & message instanceof TextMessage){ try { System.out.println("detail:"+((TextMessage)message).getText()); message.acknowledge(); session.commit(); } catch (JMSException e) { e.printStackTrace(); try { session.rollback(); } catch (JMSException jmsException) { jmsException.printStackTrace(); } } } }); } catch (IOException | JMSException e) { e.printStackTrace(); }finally { if (connection != null ){ try { System.in.read(); durableSubscriber.close(); session.close(); connection.close(); } catch (JMSException | IOException e) { e.printStackTrace(); } } } } }
- DeliveryTopicProducer:
package icu.look.smile.basic.deliverytopic.producer; import icu.look.smile.basic.deliverytopic.consumer.DeliveryTopicConsumer; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.io.IOException; import java.util.Date; import java.util.Properties; import java.util.concurrent.TimeUnit; /* * @Author gcq * @Description : * topic持久化 * @Date 16:38 2021/5/7 **/ public class DeliveryTopicProducer { private static ActiveMQConnectionFactory factory = null; private static Connection connection = null; private static Session session = null; private static MessageProducer producer =null; public static void main(String[] args) throws JMSException { Properties properties = new Properties(); try { properties.load(DeliveryTopicConsumer.class.getClassLoader().getResourceAsStream("mq-detail.properties")); factory = new ActiveMQConnectionFactory( (String) properties.get("BROCK_USERNAME"), (String) properties.get("BROCK_PASSWORD"), (String) properties.get("BROCK_URL") ); connection = factory.createConnection(); session = connection.createSession(true, Session.SESSION_TRANSACTED); Topic topic = session.createTopic("topic6"); producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.PERSISTENT); connection.start(); while (true){ try { TextMessage textMessage = session.createTextMessage("msg:" + new Date() + "new!!!"); producer.send(textMessage); session.commit(); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); session.rollback(); } } } catch (IOException | JMSException e) { e.printStackTrace(); session.rollback(); }finally { if (producer != null){ try { producer.close(); session.close(); connection.close(); } catch ( JMSException e) { e.printStackTrace(); } } } } }
JMS点对点总结
点对点模型是基于队列的,生产者发消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。和我们平时给朋友发送短信类似。
如果在Session关闭时有部分消息己被收到但还没有被签收(acknowledged),那当消费者下次连接到相同的队列时,这些消息还会被再次接收
队列可以长久地保存消息直到消费者收到消息。消费者不需要因为担心消息会丢失而时刻和队列保持激活的连接状态,充分体现了异步传输模式的优势
- JMS的发布订阅总结
JMS Pub/Sub 模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作topic。
主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者(subscribe)从主题订阅消息。
主题使得消息订阅者和消息发布者保持互相独立不需要解除即可保证消息的传送
- 非持久订阅
非持久订阅只有当客户端处于激活状态,也就是和MQ保持连接状态才能收发到某个主题的消息。
如果消费者处于离线状态,生产者发送的主题消息将会丢失作废,消费者永远不会收到。
一句话:先订阅注册才能接受到发布,只给订阅者发布消息。
- 持久订阅
客户端首先向MQ注册一个自己的身份ID识别号,当这个客户端处于离线时,生产者会为这个ID保存所有发送到主题的消息,当客户再次连接到MQ的时候,会根据消费者的ID得到所有当自己处于离线时发送到主题的消息
当持久订阅状态下,不能恢复或重新派送一个未签收的消息。
持久订阅才能恢复或重新派送一个未签收的消息。
- 非持久和持久化订阅如何选择
当所有的消息必须被接收,则用持久化订阅。当消息丢失能够被容忍,则用非持久订阅。
ActiveMQ的Broker(嵌入式mq)
- 相当于一个ActiveMQ服务器实例。说白了,Broker其实就是实现了用代码的形式启动ActiveMQ将MQ嵌入到Java代码中,以便随时用随时启动,在用的时候再去启动这样能节省了资源,也保证了可用性。这种方式,我们实际开发中很少采用,因为他缺少太多了东西,如:日志,数据存储等等。
- maven依赖坐标:
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.10.1</version> </dependency>
- Broker来启动类:
package icu.look.smile.broker; import org.apache.activemq.broker.BrokerService; /* * @Author gcq * @Description : * activemq 服务器 * @Date 9:05 2021/5/8 **/ public class EmActiveMqServer { public static void main(String[] args) throws Exception { BrokerService brokerService = new BrokerService(); brokerService.setPopulateJMSXUserID(true); brokerService.addConnector("tcp://localhost:61616"); brokerService.start(); } }
Spring整合ActiveMQ
gitee地址:https://gitee.com/mantianx/Mq_Sub/tree/master/ActiveMq_Spring
SpringBoot整合ActiveMQ【Queue】
gitee地址:https://gitee.com/mantianx/Mq_Sub/tree/master/ActiveMq_SpringBoot