ActiveMQ(三)消息机制

简介:

消息结构

消息头

消息头包含消息的识别信息和路由信息,消息头包含一些标准的属性如:JMSDestination,JMSMessageID等。 

消息属性

如果需要除消息头字段以外的值,那么可以使用消息属性。这种新属性包含以下几种:

  • 应用需要用到的属性;

  • 消息头中原有的一些可选属性;

  • JMS Provider 需要用到的属性。 

消息体

JMS定义的消息类型有TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage。

TextMessage:  java.lang.String对象,如xml文件内容;

MapMessage:   名/值对的集合,名是String对象,值类型可以是Java任何基本类型;

BytesMessage: 字节流

StreamMessage:Java中的输入输出流

ObjectMessage:Java中的可序列化对象

消息域

1.点对点方式:

点对点消息传递域的特点如下:

• 每个消息只能有一个消费者。

• 消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,它都可以提取消息

2.发布/订阅消息传递域

发布/订阅消息传递域的特点如下:

• 每个消息可以有多个消费者。 

• 生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅之后发布的消息。JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。


在点对点消息传递域中,目的地被成为队列(queue);在发布/订阅消息传递域中,目的地被成为主题(topic)。

消息首发时序图

135525_MOfn_1767531.jpg

队列实现

生产者:

  • 创建一个连接

ConnectionFactory connectionFactory; 
connectionFactory = new ActiveMQConnectionFactory();
  • 通过connectionFactory 创建Connection,并启动它

connection = connectionFactory.createConnection()Connection.start();
  • 通过connection创建回话,设置是否支持事务和acknowledge标志

session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
  • 通过destination创建MessgeProducer对象,同时设置持久化模式

producer.setDeliveryMode(DeliveryMode.PERSISTENT);
  • 发送消息到队列(queue)

封装TextMessage消息,使用MessageProceducer的send方法;

消费者:

  • 通过connectionFactory 创建Connection,并启动它。这里和生产 者一致,

  • 同理也要创建Session和Destination

  • 创建MessageConsumer:

MessageConsumer = session.createConsumer(destination);
  • 实现MessageListener接口,并实现OnMessage方法

consumer.setMessageListener(new MessageListener() {
public void onMessage(Message arg0) {
    在此方法中接受proceduer发送的消息进行处理
}

发布/订阅实现

135916_7im3_1767531.png

上图展示了pub/sub模式的流程图,消息的具体实现过程和队列模式类似这里不再赘述;其区别如下:

Publish/Subscribe

140449_rxOh_1767531.png

Point-to-Point

140449_CWkt_1767531.png

消息公共接口

141624_UNQI_1767531.png

第一列中 的接口是PTP和Pub/Sub所共有的,

【ConnectionFactory】:建立连接工厂,创建一个消息传递的连接

ConnetionFactory  cf =  new ActiveMQConnectionFactory();

【Connection】:从连接工厂中获取到连接;

Connection conn = cf.createConnection();

【session】:客户端用Session 创建MessageProducer 和MessageConsumer对象。如果在Session 关闭时,有一些消息已经被收到,但还没有被签收(acknowledged),那么,当消费者下次连接到相同的队列时,这些消息还会被再次接收:

session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);

【destination】:客户端用Session 创建Destination 对象。此处的目标为队列,队列由队列名识别;

destination = session.createQueue("test-queue" );

【MessageProducer】:客户端用MessageProducer 发送消息到队列

producer = session.createProducer(destination);

【MessageConsumer】:客户端用MessageConsumer 接收队列中的消息,如果用户在receive方法中设定了消息选择条件,那么不符合条件的消息会留在队列中,不会被接收到

producer = session. createConsumer(destination);

队列和发布/订阅的接口实现和上面类似;这里不再赘述。

消息通讯机制

141819_ec2x_1767531.png

内存的使用

activeMQ在内存使用方面非常慎重,任何消息只有在内存里有空闲区域时,才能放到内存里,之后才能发给消费者。当消息被消费者消耗掉了后,确认信息会发给activeMQ。Queue接收到这些确认消息后,会把那些被确认的消息所占用的内存释放掉。

消息的审查(audit)

为了防止消息的重复发送,activeMQ采用了一个审查机制,它负责审查某条消息是否重复。它是一个最近最久未使用算法(LRU)队列。每个队列元素它是一个bit数组,它的运行机制如下所示:

每个消息都知道它的上一个和下一个消息,当它自身被删除后,相应的关系会进行调整

但是activeMQ5.1版本的实现,问题就出在第三步的顺序读入。因为从文件中读入它有个先决条件,那就是必须要有可用的内存,如果没有可用的话,就放弃本次消息读入,并且应该放弃这次读取操作。但是5.1版本是继续往下读,这就导致顺序错乱,使得当内存可用的时候,读入的消息在进行审查的时候,发生错误,错误认为它们是重复消息。这就导致发送20W条消息,不能保证完全收到。

通过上图,我们以TCP协议来分析ActiveMQ的通讯机制

先明确以下概念:

客户(Client):消息生产者和消费者对于MQ都是客户。

消息中转器(Message Broker):它是MQ的核心,负责处理消息。

通过以下三个方面说明MQ如何通讯:

  • 建立连接

  • 关闭链接

  • 心跳

建立链接:

  1. activeMQ初始化时,通过TcpTransportServer类根据配置打开TCP侦听端口,客户通过该端口发起建立链接的动作。

  2. 把accept的Socket放入阻塞队列中。 

  3. 另外一个线程Socket handler阻塞着等待队列中是否有新的Socket,如果有则取出来

  4. 生成一个TransportConnection的实例。TransportConnection类的主要作用是处理链路的状态信息,并实现CommandVisitor接口来完成各类消息的处理

  5. TransportConnection会使用一个由多个TransportFilter实例组成的消息处理链条,负责对接收到的各类消息进行处理并发送相应的应答。这个链条的典型组成顺序:MutexTransport->WireFormatNegotiator->InactivityMonitor->TcpTransport。在这条链条中最后的一环就是TcpTransport类,它是实际和Client获取和发送数据的地方,该类的重要方法有run()和oneway(),一个负责读取,一个负责发送。 

  6. 建链完成,可以进行通讯操作。 

关闭链接:

activeMQ发现TCP链接的关闭,最关键的代码在TcpBufferedInputStream类中的 int n = in.read(buffer, position, buffer.length - position);

心跳:

为了更好的维护TCP链路的使用,activeMQ采用了心跳机制作为判断双方链路的健康情况。activeMQ使用的是双向心跳,也就是activeMQ的Broker和Client双方都进行相互心跳,但不管是Broker或Client心跳的具体处理情况是完全一样的,都在InactivityMonitor类中实现,下面具体介绍

心跳会产生两个线程“InactivityMonitor ReadCheck”和“InactivityMonitor WriteCheck”,它们都是Timer类型,都会隔一段固定时间被调用一次。ReadCheck线程主要调用的方法是readCheck(),当在等待时间内,有消息接收到,则该方法会返回true。

目录
相关文章
|
5月前
|
消息中间件 Java Maven
消息中间件系列教程(12) -RabbitMQ-消息确认机制
消息中间件系列教程(12) -RabbitMQ-消息确认机制
44 0
|
5月前
|
消息中间件 Java Maven
消息中间件系列教程(03) -ActiveMQ -点对点&发布订阅模式
消息中间件系列教程(03) -ActiveMQ -点对点&发布订阅模式
48 0
|
3月前
|
消息中间件 存储 Java
RabbitMQ是如何实现消息传递的?
RabbitMQ是如何实现消息传递的?
42 0
|
9月前
|
消息中间件
|
8月前
|
消息中间件 Java API
可靠消息传递的选择:深入了解 Apache ActiveMQ 消息队列
在分布式系统中,可靠的消息传递是实现异步通信、解耦和数据同步的关键。Apache ActiveMQ,作为一款开源的消息队列系统,为开发者提供了一个强大的工具来实现可靠的消息传递。本文将为您详细介绍 Apache ActiveMQ 的核心概念、特性以及在分布式架构中的应用。
107 0
|
存储 消息中间件 缓存
ActiveMQ系列:ActiveMQ的持久化机制
为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制。ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的 就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等再试图将消息发送给接收者,成功则将消息从存储中删除,失败则继续尝试发送。
194 0
ActiveMQ系列:ActiveMQ的持久化机制
|
消息中间件
springbooot整合ActiveMQ实现消息队列和主题
springbooot整合ActiveMQ实现消息队列和主题
174 0
|
消息中间件 XML 开发框架
|
消息中间件 Java Spring
消息中间件系列三、JMS和activeMQ的简单使用
一、JMS 1、什么是JMS   JMS(JAVA Message Service,java消息服务)本质是API,Java平台消息中间件的规范,java应用程序之间进行消息交换。并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。
4061 1
|
存储 消息中间件 监控
消息中间件系列四、认识AMQP和RabbiyMq的简单使用
AMQP AMQP(advanced message queuing protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制 。
3308 0