四、ActiveMQ
4.1、下载安装(Win)
Ps1:直接启动:找到 activemq.bat 启动(推荐管理员方式运行)。
Ps2:使用服务启动:找到 InstallService.bat 启动(推荐管理员方式运行)。4.2、队列模式
AppProducer 类
packagecom.myimooc.jms.queue; importjavax.jms.Connection; importjavax.jms.ConnectionFactory; importjavax.jms.Destination; importjavax.jms.JMSException; importjavax.jms.MessageProducer; importjavax.jms.Session; importjavax.jms.TextMessage; importorg.apache.activemq.ActiveMQConnectionFactory; /*** App 生产者-队列模式*/publicclassAppProducer { // 指定ActiveMQ服务的地址privatestaticfinalStringURL="tcp://127.0.0.1:61616"; // 指定队列的名称privatestaticfinalStringQUEUE_NAME="queue-test"; publicstaticvoidmain(String[] args) throwsJMSException { // 1.创建ConnectionFactoryConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(URL); // 2.创建ConnectionConnectionconnection=connectionFactory.createConnection(); // 3.启动连接connection.start(); // 4.创建会话(第一个参数:是否在事务中处理)Sessionsession=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5. 创建一个目标Destinationdestination=session.createQueue(QUEUE_NAME); // 6.创建一个生产者MessageProducerproducer=session.createProducer(destination); for (inti=0; i<100; i++) { // 7.创建消息TextMessagetextMessage=session.createTextMessage("test"+i); // 8.发布消息producer.send(textMessage); System.out.println("消息发送:"+textMessage.getText()); } // 9.关闭连接connection.close(); } }
AppConsumer 类
packagecom.myimooc.jms.queue; importjavax.jms.Connection; importjavax.jms.ConnectionFactory; importjavax.jms.Destination; importjavax.jms.JMSException; importjavax.jms.Message; importjavax.jms.MessageConsumer; importjavax.jms.MessageListener; importjavax.jms.Session; importjavax.jms.TextMessage; importorg.apache.activemq.ActiveMQConnectionFactory; /*** App 消费者-队列模式*/publicclassAppConsumer { /** 指定ActiveMQ服务的地址 */privatestaticfinalStringURL="tcp://127.0.0.1:61616"; /** 指定队列的名称 */privatestaticfinalStringQUEUE_NAME="queue-test"; publicstaticvoidmain(String[] args) throwsJMSException { // 1.创建ConnectionFactoryConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(URL); // 2.创建ConnectionConnectionconnection=connectionFactory.createConnection(); // 3.启动连接connection.start(); // 4.创建会话(第一个参数:是否在事务中处理)Sessionsession=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5.创建一个目标Destinationdestination=session.createQueue(QUEUE_NAME); // 6.创建一个消费者MessageConsumerconsumer=session.createConsumer(destination); // 7.创建一个监听器consumer.setMessageListener(newMessageListener() { publicvoidonMessage(Messagemessage) { TextMessagetextMessage= (TextMessage)message; try { System.out.println("接收消息:"+textMessage.getText()); } catch (JMSExceptione) { System.out.println("接收消息异常:"); e.printStackTrace(); } } }); // 8.关闭连接//connection.close(); } }
JMS队列模式:
消息发布出去后,只要有消费者消费就算OK,不存在消费者要先订阅(启动监听)的问题。
4.3、主题模式
AppProducer 类
packagecom.myimooc.jms.topic; importjavax.jms.Connection; importjavax.jms.ConnectionFactory; importjavax.jms.Destination; importjavax.jms.JMSException; importjavax.jms.MessageProducer; importjavax.jms.Session; importjavax.jms.TextMessage; importorg.apache.activemq.ActiveMQConnectionFactory; /*** App 生产者-主题模式*/publicclassAppProducer { /** 指定ActiveMQ服务的地址 */privatestaticfinalStringURL="tcp://127.0.0.1:61616"; /** 指定主题的名称 */privatestaticfinalStringTOPIC_NAME="topic-test"; publicstaticvoidmain(String[] args) throwsJMSException { // 1.创建ConnectionFactoryConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(URL); // 2.创建ConnectionConnectionconnection=connectionFactory.createConnection(); // 3.启动连接connection.start(); // 4.创建会话(第一个参数:是否在事务中处理)Sessionsession=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5. 创建一个目标Destinationdestination=session.createTopic(TOPIC_NAME); // 6.创建一个生产者MessageProducerproducer=session.createProducer(destination); for (inti=0; i<100; i++) { // 7.创建消息TextMessagetextMessage=session.createTextMessage("test"+i); // 8.发布消息producer.send(textMessage); System.out.println("消息发送:"+textMessage.getText()); } // 9.关闭连接connection.close(); } }
AppConsumer 类
packagecom.myimooc.jms.topic; importjavax.jms.Connection; importjavax.jms.ConnectionFactory; importjavax.jms.Destination; importjavax.jms.JMSException; importjavax.jms.Message; importjavax.jms.MessageConsumer; importjavax.jms.MessageListener; importjavax.jms.Session; importjavax.jms.TextMessage; importorg.apache.activemq.ActiveMQConnectionFactory; /*** App 消费者-主题模式*/publicclassAppConsumer { /** 指定ActiveMQ服务的地址 */privatestaticfinalStringURL="tcp://127.0.0.1:61616"; /** 指定主题的名称 */privatestaticfinalStringTOPIC_NAME="topic-test"; publicstaticvoidmain(String[] args) throwsJMSException { // 1.创建ConnectionFactoryConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(URL); // 2.创建ConnectionConnectionconnection=connectionFactory.createConnection(); // 3.启动连接connection.start(); // 4.创建会话(第一个参数:是否在事务中处理)Sessionsession=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5.创建一个目标Destinationdestination=session.createTopic(TOPIC_NAME); // 6.创建一个消费者MessageConsumerconsumer=session.createConsumer(destination); // 7.创建一个监听器consumer.setMessageListener(newMessageListener() { publicvoidonMessage(Messagemessage) { TextMessagetextMessage= (TextMessage)message; try { System.out.println("接收消息:"+textMessage.getText()); } catch (JMSExceptione) { System.out.println("接收消息异常:"); e.printStackTrace(); } } }); // 8.关闭连接//connection.close(); } }
JMS主题模式:
“订阅者先订阅,发布者后发布消息 ---导致--> 订阅者才能收到消息"
就个人理解,先启动订阅者就是先于发布者监听目标队列,其次再由发布者向目标队列发送消息,这样订阅者才会收到信息。如果在订阅前先发布消息再订阅,那么之前的消息收不到,订阅之后的消息还能收到。