消息生产者
public class TopicProducer { public static void main(String[] args) throws JMSException { //1.创建连接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); /*4.获取session (参数1:是否启动事务, 参数2:消息确认模式)*/ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建主题对象 Topic topic = session.createTopic("test-topic"); //6.创建消息生产者 MessageProducer producer = session.createProducer(topic); //7.创建消息 TextMessage textMessage = session.createTextMessage("欢迎来到MQ世界!"); //8.发送消息 producer.send(textMessage); //9.关闭资源 producer.close(); session.close(); connection.close(); } }
消息2个消费者(消费者2代码同消费者1)
public class TopicConsumer1 { public static void main(String[] args) throws JMSException, IOException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); //5.创建主题对象 Topic topic = session.createTopic("test-topic"); //6.创建消息消费者 MessageConsumer consumer = session.createConsumer(topic); //7.监听消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("消费1--接收到消息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); //8.等待键盘输入 System.in.read(); //9.关闭资源 consumer.close(); session.close(); connection.close(); } }
运行查看测试结果
同时开启2个以上的消费者,再次运行生产者,观察每个消费者控制台的输出,会发现每个消费者会接收到消息。
总结:
发布订阅的模式: 默认的请情况下消息的内容不存在服务器,当生产者发送了一个消息,如果消费者之前没有订阅,就没了。
点对点的方式:默认的请情况下:将消息存储在服务器上,消费者随时来取,但是一旦一个消费者获取到了消息,这个消息就没有了。