消息结构
消息头
消息头包含消息的识别信息和路由信息,消息头包含一些标准的属性如:JMSDestination,JMSMessageID等。
消息属性
如果需要除消息头字段以外的值,那么可以使用消息属性。这种新属性包含以下几种:
应用需要用到的属性;
消息头中原有的一些可选属性;
JMS Provider 需要用到的属性。
消息体
JMS定义的消息类型有TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage。
TextMessage: java.lang.String对象,如xml文件内容;
MapMessage: 名/值对的集合,名是String对象,值类型可以是Java任何基本类型;
BytesMessage: 字节流
StreamMessage:Java中的输入输出流
ObjectMessage:Java中的可序列化对象
消息域
1.点对点方式:
点对点消息传递域的特点如下:
• 每个消息只能有一个消费者。
• 消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,它都可以提取消息
2.发布/订阅消息传递域
发布/订阅消息传递域的特点如下:
• 每个消息可以有多个消费者。
• 生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅之后发布的消息。JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。
在点对点消息传递域中,目的地被成为队列(queue);在发布/订阅消息传递域中,目的地被成为主题(topic)。
消息首发时序图
队列实现
生产者:
创建一个连接
ConnectionFactory connectionFactory;
connectionFactory = new ActiveMQConnectionFactory();
通过connectionFactory 创建Connection,并启动它
connection = connectionFactory.createConnection();
Connection.start();
通过connection创建回话,设置是否支持事务和acknowledge标志
session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
通过destination创建MessgeProducer对象,同时设置持久化模式
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
发送消息到队列(queue)
封装TextMessage消息,使用MessageProceducer的send方法;
消费者:
通过connectionFactory 创建Connection,并启动它。这里和生产 者一致,
同理也要创建Session和Destination
创建MessageConsumer:
MessageConsumer = session.createConsumer(destination);
实现MessageListener接口,并实现OnMessage方法
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message arg0) {
在此方法中接受proceduer发送的消息进行处理
}
发布/订阅实现
上图展示了pub/sub模式的流程图,消息的具体实现过程和队列模式类似这里不再赘述;其区别如下:
Publish/Subscribe
Point-to-Point
消息公共接口
第一列中 的接口是PTP和Pub/Sub所共有的,
【ConnectionFactory】:建立连接工厂,创建一个消息传递的连接
ConnetionFactory cf = new ActiveMQConnectionFactory();
【Connection】:从连接工厂中获取到连接;
Connection conn = cf.createConnection();
【session】:客户端用Session 创建MessageProducer 和MessageConsumer对象。如果在Session 关闭时,有一些消息已经被收到,但还没有被签收(acknowledged),那么,当消费者下次连接到相同的队列时,这些消息还会被再次接收:
session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
【destination】:客户端用Session 创建Destination 对象。此处的目标为队列,队列由队列名识别;
destination = session.createQueue("test-queue" );
【MessageProducer】:客户端用MessageProducer 发送消息到队列
producer = session.createProducer(destination);
【MessageConsumer】:客户端用MessageConsumer 接收队列中的消息,如果用户在receive方法中设定了消息选择条件,那么不符合条件的消息会留在队列中,不会被接收到
producer = session. createConsumer(destination);
队列和发布/订阅的接口实现和上面类似;这里不再赘述。
消息通讯机制
内存的使用
activeMQ在内存使用方面非常慎重,任何消息只有在内存里有空闲区域时,才能放到内存里,之后才能发给消费者。当消息被消费者消耗掉了后,确认信息会发给activeMQ。Queue接收到这些确认消息后,会把那些被确认的消息所占用的内存释放掉。
消息的审查(audit)
为了防止消息的重复发送,activeMQ采用了一个审查机制,它负责审查某条消息是否重复。它是一个最近最久未使用算法(LRU)队列。每个队列元素它是一个bit数组,它的运行机制如下所示:
每个消息都知道它的上一个和下一个消息,当它自身被删除后,相应的关系会进行调整
但是activeMQ5.1版本的实现,问题就出在第三步的顺序读入。因为从文件中读入它有个先决条件,那就是必须要有可用的内存,如果没有可用的话,就放弃本次消息读入,并且应该放弃这次读取操作。但是5.1版本是继续往下读,这就导致顺序错乱,使得当内存可用的时候,读入的消息在进行审查的时候,发生错误,错误认为它们是重复消息。这就导致发送20W条消息,不能保证完全收到。
通过上图,我们以TCP协议来分析ActiveMQ的通讯机制
先明确以下概念:
客户(Client):消息生产者和消费者对于MQ都是客户。
消息中转器(Message Broker):它是MQ的核心,负责处理消息。
通过以下三个方面说明MQ如何通讯:
建立连接
关闭链接
心跳
建立链接:
activeMQ初始化时,通过TcpTransportServer类根据配置打开TCP侦听端口,客户通过该端口发起建立链接的动作。
把accept的Socket放入阻塞队列中。
另外一个线程Socket handler阻塞着等待队列中是否有新的Socket,如果有则取出来
生成一个TransportConnection的实例。TransportConnection类的主要作用是处理链路的状态信息,并实现CommandVisitor接口来完成各类消息的处理
TransportConnection会使用一个由多个TransportFilter实例组成的消息处理链条,负责对接收到的各类消息进行处理并发送相应的应答。这个链条的典型组成顺序:MutexTransport->WireFormatNegotiator->InactivityMonitor->TcpTransport。在这条链条中最后的一环就是TcpTransport类,它是实际和Client获取和发送数据的地方,该类的重要方法有run()和oneway(),一个负责读取,一个负责发送。
建链完成,可以进行通讯操作。
关闭链接:
activeMQ发现TCP链接的关闭,最关键的代码在TcpBufferedInputStream类中的 int n = in.read(buffer, position, buffer.length - position);
心跳:
为了更好的维护TCP链路的使用,activeMQ采用了心跳机制作为判断双方链路的健康情况。activeMQ使用的是双向心跳,也就是activeMQ的Broker和Client双方都进行相互心跳,但不管是Broker或Client心跳的具体处理情况是完全一样的,都在InactivityMonitor类中实现,下面具体介绍
心跳会产生两个线程“InactivityMonitor ReadCheck”和“InactivityMonitor WriteCheck”,它们都是Timer类型,都会隔一段固定时间被调用一次。ReadCheck线程主要调用的方法是readCheck(),当在等待时间内,有消息接收到,则该方法会返回true。