一、JMS (JAVA Message Service)
1、 JMS基本概念
JMS(JAVA Message Service,java消息服务)是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。JMS(JAVA Message Service,java消息服务)API是一个消息服务的标准或者说是规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。
2、 JMS五种不同的消息正文格式
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
- StreamMessage -- Java原始值的数据流
- MapMessage--一套名称-值对
- TextMessage--一个字符串对象
- ObjectMessage--一个序列化的 Java对象
- BytesMessage--一个字节的数据流
3、 JMS两种消息模型
3.1 点到点(P2P)模型
使用队列(Queue)作为消息通信载体;满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。比如:我们生产者发送100条消息的话,两个消费者来消费一般情况下两个消费者会按照消息发送的顺序各自消费一半(也就是你一个我一个的消费。)
3.2. 发布/订阅(Pub/Sub)模型
4、 JMS编码接口之间的关系
- ConnectionFactory:创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。
- Connection:Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。
- Session:Session是操作消息的接口。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。
- MessageProducer:消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。
- MessageConsumer :消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。
- Destination:Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。
- MessageListener:消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。
二、消息队列ActiveMQ
1、什么是ActiveMQ
- 首先你得了解什么是MOM:MOM(Message Oriented Middleware)面向消息的中间件,分布式系统的集成,指的是利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
- 然后你得知道什么是JMS:`JMS(Java Message Service)Java消息服务,应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
- 最后才是ActiveMQ:Apache下的一个非常流行的消息中间件,使用JAVA支持的JMS Provider实现,所以和JAVA程序完全兼容,开发java项目中间件首选。当然ActiveMQ不仅仅支持JAVA,在C++、Dotnet、Python、Php、Ruby、Websocket等多种客户端都可以提供良好的服务。
2、ActiveMQ的使用场景
其实也就是为什么要使用MQ。
- 异步通信 不需要即时处理的业务,将其放去消息队列中,在需要处理的时候直接去队列中取出来,达到了生产者和消费者不用互相了解对方,生产者只需要专注于生产,消费者专注于消费。
- 解耦 降低工程之间的耦合程度,从设计角度来讲,达到低耦合高内聚的目的。当应用需要维护的时候,不同应用可以独立的扩展或修改,只需要遵循同样的接口约束即可。
- 冗余 消息队列可以对队列中的消息进行持久化处理,防止数据丢失。很多消息队列都采用“插入-获取-删除”的模式,只有当处理数据的过程成功并且返回提示,才会进行消息的删除,否则消息将一直保存在队列之中。
- 过载保护 在请求量突发的高峰期间,为了让系统保持正常工作,又不想每时每刻都按最大峰值投入资源。使用消息队列就可以让关键组件顶住突发压力,不至于让整个系统崩溃。
- 保证有序 消息队列可以对消息进行优先级设定,然后根据优先级来对消息进行排序,达到重要数据优先处理。
- 缓冲 消息队列有助于控制和优化数据流经过系统的速度。以调节系统响应时间。
- 数据流处理 大数据业务需要对数据流进行分析,在消息队列中进行处理是最好不过的。
3、ActiveMQ原理剖析
- 两种运行模型
- PTP点对点通信:使用queue作为信息载体,满足生产者与消费者模式,一个消息只能被一个消费者使用,没有被消费的消息可以持久保持在queue 中等待被消费。
- Pub/Sub发布订阅模式:使用Topic主题作为通信载体,类似于广播模式,在消息广播期间,所有的订阅者都可以接受到广播消息,在一条消息广播之后才订阅的用户是收不到该条消息的。
- ActiveMQ的组成模块
- Broker:消息服务器,作为server提供消息核心服务。
- Producer:消息生产者,业务的发起方,负责生产消息传输给broker。
- Consumer:消息消费者,业务的处理方,负责从broker获取消息并进行业务逻辑处理。
- Topic:主题,发布订阅模式下的消息统一汇集地,不同生产者向topic发送消息,由MQ服务器分发到不同的订阅者,实现消息的广播。
- Queue:队列,PTP模式下,特定生产者向特定queue发送消息,消费者订阅特定的 queue完成指定消息的接收。
- Message:消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务 数据,实现消息的传输。
- ActiveMQ的常用协议
- AMQP协议 AMQP即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消 息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。
- MQTT协议 MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时 通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网 物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协 议。
- STOMP协议 STOMP(Streaming Text Orientated Message Protocol)是流文本定向消息协议,是一种为 MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议。STOMP提 供一个可互操作的连接格式,允许客户端与任意STOMP消息代理(Broker)进行交互。
- OPENWIRE协议 ActiveMQ特有的协议,官方描述如下 OpenWire is our cross language Wire Protocol to allow native access to ActiveMQ from a number of different languages and platforms. The Java OpenWire transport is the default transport in ActiveMQ 4.x or later. For other languages see the following...
对于ActiveMQ的上述协议,每种协议端口都不一样,可以自行修改。