生产者代码
1.package com.vhukze.Producer; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ProducerTopic { //mq通讯地址 private static String url = "tcp://127.0.0.1:61616"; //队列名称 private static String topicName = "my_topic"; public static void main(String[] args) throws JMSException { //创建连接工厂 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); //创建连接 Connection connection = factory.createConnection(); //启动连接 connection.start(); //创建会话,参数1,设置是否需要事务方式提交,参数2,消息方式 默认采用自动接收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建目标(队列) Topic topic = session.createTopic(topicName); //创建生产者 MessageProducer producer = session.createProducer(topic); for (int i = 0; i <= 10 ; i++) { //创建消息 TextMessage textMessage = session.createTextMessage("消息内容" + i); //发送消息 producer.send(textMessage); } connection.close(); } }
消费者代码
package com.vhukze.consumer; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ConsumerTopic { //mq通讯地址 private static String url = "tcp://127.0.0.1:61616"; //队列名称 private static String topicName = "my_topic"; public static void main(String[] args) throws JMSException { System.out.println("消费者1"); //创建连接工厂 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); //创建连接 Connection connection = factory.createConnection(); //启动连接 connection.start(); //创建会话,参数1,设置是否需要事务方式提交,参数2,消息方式 默认采用自动接收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建目标(队列) Topic topic = session.createTopic(topicName); //创建消费者 MessageConsumer consumer = session.createConsumer(topic); //启动监听消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; System.out.println("消费消息:"+textMessage.getText()); }catch (Exception e){ e.printStackTrace(); } } }); } }