ActiveMQ系列:ActiveMQ标准API结合JavaSE实战
上篇初步认识了ActiveMQ,有兴趣的可以移步:https://blog.csdn.net/qq_26975307/article/details/98875098,此篇不多逼逼,上手原生撸码(类比JDBC连接数据库)
1、JMS开发的基本步骤
2、两种消费方式
2.1、同步阻塞方式(receive())
订阅者或接收者调用MessageConsumer的receive()方法来接收消息,receive方法在能够接收到消息之前(或超时之、前)将一直阻塞。
2.2、异步非阻塞方式(监听器onMessage())
订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统自动调用监听器MessageListener的onMessage(Message message)方法。
3、点对点的消息传递域
3.1、特点
(1)每个消息只能有一个消费者,类似1对1的关系。好比个人快递自己领取自己的。
(2)消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,消费都可
以提取消息。好比我们的发送短信,发送者发送后不见得接收者会即收即看。
(3)消息被消费后队列中不会再存储,所以消费者不会消费到己经被消费掉的消息。
JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活
状态时发送的消息。
4、两大模式的比较
5、ActiveMQ在JavaSE中最简单的实现 —— 队列
5.1、创建生产者
package com.phubing; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * */ public class JmsProduce_Queue { public static final String ACTIVEMQ_URL = "tcp://192.168.177.130:61616"; public static final String QUEUE_NAME = "queue01"; public static void main(String[] args) throws Exception{ //1、创建链接工厂,按照给定的URL地址,采用默认用户名密码 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //2、通过链接工厂,获得链接connection并启动 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3、创建session会话 /** * 两个参数: 1、事务;2、签收 * */ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4、创建目的地(具体是队列,还是Topic) Queue queue = session.createQueue(QUEUE_NAME); //5、创建消息的生产者 MessageProducer producer = session.createProducer(queue); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //6、通过使用消息生产者生产消息并发送到MQ的队列中 for(int i=1;i<=3;i++){ //7、创建消息 //可以理解为最简单的字符串 TextMessage textMessage = session.createTextMessage("msg-" + i); /** * String消息设置属性 textMessage.setStringProperty("c01", "vip"); */ /** * 在每一条消息发送之前,可以设置消息的请求头属性 MapMessage mapMessage = session.createMapMessage(); mapMessage.setString("map-k1", "map-v1"); producer.send(mapMessage); */ //8、通过消息生产者发送给MQ producer.send(textMessage); } //9、关闭资源 producer.close(); session.close(); connection.close(); System.out.println("消息发送到MQ完成********"); } }
5.2、创建消费者
package com.phubing; import org.apache.activemq.ActiveMQConnectionFactory; import org.springframework.jms.JmsException; import javax.jms.*; /** * @ClassName JmsConsumer_Queue * @Description TODO * @Author phubing * @Date 2019-07-24 21:42 * @Version 1.0 **/ public class JmsConsumer_Queue { public static final String ACTIVEMQ_URL = "tcp://192.168.177.130:61616"; public static final String QUEUE_NAME = "queue01"; public static void main(String[] args) throws Exception { //1、创建链接工厂,按照给定的URL地址,采用默认用户名密码 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //2、通过链接工厂,获得链接connection并启动 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3、创建session会话 /** * 两个参数: 1、事务;2、签收 * */ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4、创建目的地(具体是队列,还是Topic) Queue queue = session.createQueue(QUEUE_NAME); //5、创建消费者 MessageConsumer consumer = session.createConsumer(queue); /** * 通过监听方式来消费消息 * */ //如果是接口,则可以直接new ,使用其匿名内部类 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if(null != message && message instanceof TextMessage){ TextMessage textMessage = (TextMessage) message; try { System.out.println("监听到的消息:"+textMessage.getText()); System.out.println("监听到的属性消息:"+textMessage.getStringProperty("c01")); } catch (JMSException e) { e.printStackTrace(); } }else{ } /* if(null != message && message instanceof MapMessage){ MapMessage mapMessage = (MapMessage) message; try { System.out.println("监听到的Map消息:"+mapMessage.getString("map-k1")); } catch (JMSException e) { e.printStackTrace(); } }else{ } */ } }); //press any key to exit(保证控制台不关闭,直到按下任一按键) System.in.read(); /* * 同步阻塞方式 * 订阅者或接受者调用MessageConsumer的receive方法来接收消息,receive方法能在接收到消息之前(或超时之前)将一直阻塞 * //6、接收消息 while(true){ //若consumer.receive()不指定超时时间,则会一直等待直到有消息进来 TextMessage message = (TextMessage) consumer.receive(3000L); if(null != message){ System.out.println("消费者接收到消息:"+message.getText()); }else { break; } } */ consumer.close(); session.close(); connection.close(); } }
启动并在浏览器和IDEA控制台查看消息的发送与消费情况
5.4、几个问题
先生产,启动1号消费者,再启动2号消费者,问题:2号消费者还能消费吗?
1号消费者不能,2号消费者不能,因为消息已被消费
先生产,只启动1号消费者,问题:1号消费者能消费吗?
能
先启动2个消费者,再生产,问题:消费情况如何?
两个消费者等候,生产者发出来的消息假设是同一队列,则平均分配
MQ挂了,那么消息的持久化和丢失情况分别如何?
看是否设置消息持久化(接下来会有写到)
消息默认的持久化模式?
默认使用持久化
6、ActiveMQ在JavaSE中最简单的实现 —— 主题
6.1、创建生产者
package com.phubing; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ClassName JmsProduce_Topic * @Description TODO * @Author phubing * @Date 2019-07-31 20:56 * @Version 1.0 **/ public class JmsProduce_Topic { public static final String ACTIVEMQ_URL = "tcp://192.168.177.130:61616"; public static final String TOPIC_NAME = "topic-phubing"; public static void main(String[] args) throws Exception{ //1、创建链接工厂,按照给定的URL地址,采用默认用户名密码 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //2、通过链接工厂,获得链接connection并启动 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3、创建session会话 /** * 两个参数: 1、事务;2、签收 * */ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4、创建目的地(具体是队列,还是Topic) Topic topic = session.createTopic(TOPIC_NAME); //5、创建消息的生产者 MessageProducer producer = session.createProducer(topic); //6、通过使用消息生产者生产消息并发送到MQ的队列中 for(int i=1;i<3;i++){ //7、创建消息 //可以理解为最简单的字符串 TextMessage textMessage = session.createTextMessage("topic-name-" + i); //8、通过消息生产者发送给MQ producer.send(textMessage); } //9、关闭资源 producer.close(); session.close(); connection.close(); System.out.println("TOPIC消息发送到MQ完成********"); } }
6.2、创建消费者
package com.phubing; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ClassName JmsConsumer_Topic * @Description TODO * @Author phubing * @Date 2019-07-31 20:56 * @Version 1.0 **/ public class JmsConsumer_Topic { public static final String ACTIVEMQ_URL = "tcp://192.168.177.130:61616"; public static final String TOPIC_NAME = "topic-phubing"; public static void main(String[] args) throws Exception{ //1、创建链接工厂,按照给定的URL地址,采用默认用户名密码 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //2、通过链接工厂,获得链接connection并启动 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3、创建session会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4、创建目的地(具体是队列,还是Topic) Topic topic = session.createTopic(TOPIC_NAME); //5、创建消费者 MessageConsumer consumer = session.createConsumer(topic); //6、如果是接口,则可以直接new ,使用其匿名内部类 consumer.setMessageListener((message) -> { if(null != message && message instanceof TextMessage){ TextMessage textMessage = (TextMessage) message; try { System.out.println("监听到的Topic消息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } }else{} }); System.in.read(); consumer.close(); session.close(); connection.close(); } }
切记,先启动消费者,再启动生产者(非要先生产再消费,你也可以看看什么现象)
6.3、几个问题
3个订阅者同时开启,再开启生产者,问:3个订阅者的接收消息情况如何?
每个订阅者都有生产者生产的所有消息,进队列是生产者生产的数量,出队列的是消费者数量*生产者生产的消息数量
(谁订阅谁收到,不订阅不打扰)
先启动生产者,再启动订阅者,现象如何?
不会收到之前生产的消息,进队列的是生产者生产的数量,出队列数量无(原来的消息已成为废消息)
未完待续........(下篇详细讲解下JMS)