ActiveMQ
MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。
特点:
1、支持多种语言编写客户端
2、对spring的支持,很容易和spring整合
3、支持多种传输协议:TCP,SSL,NIO,UDP等
4、支持AJAX
消息形式:
1、点对点(queue)
2、一对多(topic)
安装启动
官网下载压缩包,解压到相应目录 , 这里解压到/opt
启动mq , ./bin/activemq start
关闭mq, ./bin/activemq stop
访问 :127.0.0.1:8161/admin/ 账号admin 密码admin
采用61616端口提供JMS服务,采用8161提供管理控制台服务
通信协议:TCP
生产者生产消息
package com.steak.activemq.test; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class mqTest { private static final StringACTIVE_URL ="tcp://127.0.0.1:61616"; private static final StringQUEUE ="queue"; public static void main(String[] args)throws JMSException { //创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory =new ActiveMQConnectionFactory(ACTIVE_URL); //通过连接工厂,获得连接 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //创建session Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); //创建目的地 Queue queue = session.createQueue(QUEUE); //创建消息的生产者 MessageProducer messageProducer = session.createProducer(queue); //通过使用messageProducer生产消息发送到MQ队列里 for (int i =0 ; i <3 ; i++){ //创建消息 TextMessage textMessage = session.createTextMessage("消息 "+i); //通过messageProducer发送消息 messageProducer.send(textMessage); } //关闭资源 messageProducer.close(); session.close(); connection.close(); System.out.println("消息发送完成"); } }
启动消费者发送消息,此时等待消费的消息为3条,消费者为0个,进队的消息为3条,出队消息为0条
Number Of Pending Messages:等待消费的消息
Number of Conumers : 消费者数量
Message Enqueued : 进队消息数
Message Dequeued : 出队消息数
消费者消费消息
package com.steak.activemq.test; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class Consumer { private static final StringACTIVE_URL ="tcp://127.0.0.1:61616"; private static final StringQUEUE ="queue"; public static void main(String[] args)throws JMSException { //创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory =new ActiveMQConnectionFactory(ACTIVE_URL); //通过连接工厂,获得连接 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //创建session Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); //创建目的地 Queue queue = session.createQueue(QUEUE); //创建消费者 MessageConsumer messageConsumer = session.createConsumer(queue); //通过同步阻塞接受消息 while (true){ //接受消息的类型要和发送消息的一样 , TextMessage textMessage = (TextMessage) messageConsumer.receive(); if (null != textMessage){ System.out.println("消息 "+textMessage.getText()); }else { break; } } messageConsumer.close(); session.close(); connection.close(); } }
此时消费者有一个,并且消费了3条消息
消费者接收消息时可以一直等待(耗费系统资源),也可以设置时间,
当为receive()(同步阻塞)时,消费者一直阻塞等待消费消息
为recevice(long varl)(异步阻塞)时,当超时后,消费者就消失
通过异步监听方式来接收消息
//通过异步监听方式来接收消息 messageConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if (null != message && messageinstanceof TextMessage){ TextMessage textMessage = (TextMessage) message; try { System.out.println("消息 "+((TextMessage) message).getText()); }catch (JMSException e) { e.printStackTrace(); } } } }); System.in.read(); //保证控制台不关闭,作用是消费完消息以后才能关闭 messageConsumer.close(); session.close(); connection.close(); }
消费者消费情况
1,先启动生产者,再启动多个消费者,一定是第一个消费者消费完所有消息,后面的都消费不到
2,先启动多个消费者,在启动生产者,消息基本是平均消费的,消息个数是基数的时候也是一个的差异
主体模式
当为(主题模式)Topic时,所有消费者收到的消息都是一样的,前提是要先订阅,订阅后才能收到消息,消息是无状态的,发送消息后就什么都不管了
主题模式
如:两人订阅
生产者发布消息,此时发布了三条消息,但是为主题模式,有两个消费者,所以一共消费6条