JMS始终在JavaEE五花八门的协议里,WebService满天飞的时候占一位置,是因为:
它可以把不影响用户执行结果又比较耗时的任务(比如发邮件通知管理员)异步的扔给JMS 服务端去做,而尽快的把屏幕返还给用户。服务端能够多线程排队响应高并发的请求。可以在Java世界里达到最高的解耦。客户端与服务端无需直连,甚至无需知晓对方是谁、在哪里、有多少人,只要对流过的信息作响应就行了,在企业应用环境复杂时作用明显。
JMS(Java Message Service)即 java消息服务 用于分布式系统得消息通信,JMS是J2EE技术中的一个部分,Sun公司定义了JMS的标准接口,即javax.jms.*。JMS服务使得分布式系统的信息通信松散连接的,发送信息的客户端只需要负责发送信息,接收信息的客户端接收信息,两个客户端之间没有必要是同时可用的,甚至发送客户端都没有必要知道客户端的信息,只要负责发送到接收的服务端就可以,JMS可以说是低耦合的。同时JMS API 做到了以下2点:
异步的,服务端可以发送信息到一个客户端,客户端不需要为了收到信息而请求信息。
异步的,服务端可以发送信息到一个客户端,客户端不需要为了收到信息而请求信息。
可靠的,JMS API保证了服务端所有发送的信息最少发送一次和只发送一次,下面介绍一些JMS API 的基本知识
JMS API 基本组成
JMS provider服务者,是一个消息系统通过实现JMS API 接口,用于管理和控制信息。l
l JMS client 客户端 用于发送和接收信息,通常是用在java程序中用java编写的
Messagesl 用于在客户端信息通信的对象
Administered Objects 由JMSl provider为了client创建的对象,通常是connectionFactory和destination
l JMS client 客户端 用于发送和接收信息,通常是用在java程序中用java编写的
Messagesl 用于在客户端信息通信的对象
Administered Objects 由JMSl provider为了client创建的对象,通常是connectionFactory和destination
Messages 通信方式
JMS通信方式分为点对点通信,和发布/订阅方式
JMS通信方式分为点对点通信,和发布/订阅方式
点对点方式(point-to-point)
点对点的消息发送方式主要建立在 Message Queue,Sender,reciever上,Message Queue 存贮消息,Sneder 发送消息,receive接收消息,具体的信息就是 Sender Client发送Message 到Queue ,而 receiver Cliernt 从Queue中接收消息和发送消息已接受到Quere,确认消息接收。在使用点对点方式时需要注意,一条消息只有一个接收端,消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行
点对点的消息发送方式主要建立在 Message Queue,Sender,reciever上,Message Queue 存贮消息,Sneder 发送消息,receive接收消息,具体的信息就是 Sender Client发送Message 到Queue ,而 receiver Cliernt 从Queue中接收消息和发送消息已接受到Quere,确认消息接收。在使用点对点方式时需要注意,一条消息只有一个接收端,消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行
发布/订阅 方式(publish/subscriber Messaging)
发布/订阅方式用于多接收客户端的方式
作为发布订阅的方式,可能存在多个接收客户端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。
发布/订阅方式用于多接收客户端的方式
作为发布订阅的方式,可能存在多个接收客户端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。
作为subscriber ,在接收消息时有两种方法,destination的receive方法,和实现message listener 接口的onMessage 方法。
基础知识这里说的很少,具体的可以参照java.sun.com.的JMS API 指南
ActiveMQ的特性:
完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,也是Apache Geronimo默认的JMS provider。
POJO withdout EJB Container,不需要实现EJB繁琐复杂的Message Bean接口和配置。
Spring Base,可以使用Spring的各种特性如IOC、AOP 。
Effective,基于Jencks的JCA Container实现 pool connection,control transactions and manage security。
下面是它的特性列表
1.多种语言和协议编写客户端
语言: Java, C, C++, C#, Ruby, Perl, Python, PHP
应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
2.完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
3.对Spring的支持,ActiveMQ可以很容得内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
4.通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,
可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
5.支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
6.支持通过JDBC和journal提供高速的消息持久化
7.从设计上保证了高性能的集群,客户端-服务器,点对点
8.支持Ajax
9.支持与Axis的整合
10.可以很容易得调用内嵌JMS provider,进行测试
1.多种语言和协议编写客户端
语言: Java, C, C++, C#, Ruby, Perl, Python, PHP
应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
2.完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
3.对Spring的支持,ActiveMQ可以很容得内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
4.通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,
可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
5.支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
6.支持通过JDBC和journal提供高速的消息持久化
7.从设计上保证了高性能的集群,客户端-服务器,点对点
8.支持Ajax
9.支持与Axis的整合
10.可以很容易得调用内嵌JMS provider,进行测试
作为一个JMS client 都需要以下的组成
Administered Objects:ConnectionFactory,Destination
Connection;
Session
Message Producers/Message Cosumers
Messages
Administered Objects:ConnectionFactory,Destination
Connection;
Session
Message Producers/Message Cosumers
Messages
Hello实例
=================================Hello.java===========================
import java.io.Serializable;
/**Hello.java用来传递JAVA对象
* Author: cjp
* Date: 2005-11-8
* Time: 22:24:02
*/
public class Hello implements Serializable {
private String id;
private Hello hello;
private PointList pointList;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Hello getHello() {
return hello;
}
public void setHello(Hello hello) {
this.hello = hello;
}
}
=========================SpringTest .java========================
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.test.AbstractDependencyInjectionSpringContextTests;
import javax.jms.*;
/**
*发送JMS消息
*/
public class SpringTest extends AbstractDependencyInjectionSpringContextTests
{
protected String[] getConfigLocations()
{
return new String[]{"file:D:\\wosame\\test\\com\\wosame\\room\\jms\\jms.xml"};
}
public void testSendMessage() throws Exception
{
JmsTemplate jmsTemplate = (JmsTemplate) applicationContext.getBean("jmsTemplate");
jmsTemplate.send(new MessageCreator()
{
public Message createMessage(Session session) throws JMSException
{
ObjectMessage message=session.createObjectMessage();
Hello hello=new Hello();
hello.setId("test");
message.setObject(hello);
return message;
}
});
}
}
================================HelloMDP .java==================================
/**
处理JMS消息
*/
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.*;
public class HelloMDP implements MessageListener
{
protected Log log = LogFactory.getLog(HelloMDP.class);
public void onMessage(Message message)
{
try
{
ObjectMessage objMessage = (ObjectMessage) message;
Hello hello= (Hello) objMessage.getObject();
System.out.println("hello.getId() = " + hello.getId());
} catch (JMSException e)
{
log.error("Parse failed", e);
}
}
}
================================jms.xml==================================
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN"
" http://www.springframework.org/dtd/spring-beans.dtd">
<beans>
<!--嵌入式的JMS连接,也就是跟随JVM一起启动,可以参看activemq的文档-->
<bean id="connectionFactory" class="org.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="vm://localhost"/>
<property name="useEmbeddedBroker" value="true"/>
</bean>
<!--消息监听器,也就是消息的具体的处理器-->
<bean id="HelloMDP" class="HelloMDP"/>
<!--jms监听需要JTA容器的支持-->
<bean id="activeMQContainer" class="org.activemq.jca.JCAContainer">
<property name="workManager">
<bean id="workManager" class="org.activemq.work.SpringWorkManager"/>
</property>
<property name="resourceAdapter">
<bean id="activeMQResourceAdapter" class="org.activemq.ra.ActiveMQResourceAdapter">
<property name="serverUrl" value="vm://localhost"/>
</bean>
</property>
</bean>
<!--消息的消费者,也就是将监听器与具体的队列关联-->
<bean id="HelloQueueConsumer" factory-method="addConnector" factory-bean="activeMQContainer">
<property name="activationSpec">
<bean class="org.activemq.ra.ActiveMQActivationSpec">
<property name="destination" value="Hello.Queue"/>
<property name="destinationType" value="javax.jms.Queue"/>
</bean>
</property>
<property name="ref" value="HelloMDP"/>
</bean>
<!--spring的JMS template,用来发送JMS消息到指定的队列-->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="defaultDestinationName" value="Hello.Queue"/>
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
</beans>
import java.io.Serializable;
/**Hello.java用来传递JAVA对象
* Author: cjp
* Date: 2005-11-8
* Time: 22:24:02
*/
public class Hello implements Serializable {
private String id;
private Hello hello;
private PointList pointList;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Hello getHello() {
return hello;
}
public void setHello(Hello hello) {
this.hello = hello;
}
}
=========================SpringTest .java========================
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.test.AbstractDependencyInjectionSpringContextTests;
import javax.jms.*;
/**
*发送JMS消息
*/
public class SpringTest extends AbstractDependencyInjectionSpringContextTests
{
protected String[] getConfigLocations()
{
return new String[]{"file:D:\\wosame\\test\\com\\wosame\\room\\jms\\jms.xml"};
}
public void testSendMessage() throws Exception
{
JmsTemplate jmsTemplate = (JmsTemplate) applicationContext.getBean("jmsTemplate");
jmsTemplate.send(new MessageCreator()
{
public Message createMessage(Session session) throws JMSException
{
ObjectMessage message=session.createObjectMessage();
Hello hello=new Hello();
hello.setId("test");
message.setObject(hello);
return message;
}
});
}
}
================================HelloMDP .java==================================
/**
处理JMS消息
*/
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.*;
public class HelloMDP implements MessageListener
{
protected Log log = LogFactory.getLog(HelloMDP.class);
public void onMessage(Message message)
{
try
{
ObjectMessage objMessage = (ObjectMessage) message;
Hello hello= (Hello) objMessage.getObject();
System.out.println("hello.getId() = " + hello.getId());
} catch (JMSException e)
{
log.error("Parse failed", e);
}
}
}
================================jms.xml==================================
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN"
" http://www.springframework.org/dtd/spring-beans.dtd">
<beans>
<!--嵌入式的JMS连接,也就是跟随JVM一起启动,可以参看activemq的文档-->
<bean id="connectionFactory" class="org.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="vm://localhost"/>
<property name="useEmbeddedBroker" value="true"/>
</bean>
<!--消息监听器,也就是消息的具体的处理器-->
<bean id="HelloMDP" class="HelloMDP"/>
<!--jms监听需要JTA容器的支持-->
<bean id="activeMQContainer" class="org.activemq.jca.JCAContainer">
<property name="workManager">
<bean id="workManager" class="org.activemq.work.SpringWorkManager"/>
</property>
<property name="resourceAdapter">
<bean id="activeMQResourceAdapter" class="org.activemq.ra.ActiveMQResourceAdapter">
<property name="serverUrl" value="vm://localhost"/>
</bean>
</property>
</bean>
<!--消息的消费者,也就是将监听器与具体的队列关联-->
<bean id="HelloQueueConsumer" factory-method="addConnector" factory-bean="activeMQContainer">
<property name="activationSpec">
<bean class="org.activemq.ra.ActiveMQActivationSpec">
<property name="destination" value="Hello.Queue"/>
<property name="destinationType" value="javax.jms.Queue"/>
</bean>
</property>
<property name="ref" value="HelloMDP"/>
</bean>
<!--spring的JMS template,用来发送JMS消息到指定的队列-->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="defaultDestinationName" value="Hello.Queue"/>
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
</beans>
例子二 MQ源码
import java.util.Arrays;
import java.util.Date;
import java.util.Date;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.IndentPrinter;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.IndentPrinter;
/**
* A simple tool for publishing messages
*
* @version $Revision: 1.2 $
*/
public class ProducerTool {
* A simple tool for publishing messages
*
* @version $Revision: 1.2 $
*/
public class ProducerTool {
private Destination destination;
private int messageCount = 10;
private long sleepTime = 0L;
private boolean verbose = true;
private int messageSize = 255;
private long timeToLive;
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "TOOL.DEFAULT";
private boolean topic = false;
private boolean transacted = false;
private boolean persistent = false;
private int messageCount = 10;
private long sleepTime = 0L;
private boolean verbose = true;
private int messageSize = 255;
private long timeToLive;
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "TOOL.DEFAULT";
private boolean topic = false;
private boolean transacted = false;
private boolean persistent = false;
public static void main(String[] args) {
ProducerTool producerTool = new ProducerTool();
String[] unknonwn = CommnadLineSupport.setOptions(producerTool, args);
if( unknonwn.length > 0 ) {
System.out.println("Unknown options: "+Arrays.toString(unknonwn));
System.exit(-1);
}
producerTool.run();
}
ProducerTool producerTool = new ProducerTool();
String[] unknonwn = CommnadLineSupport.setOptions(producerTool, args);
if( unknonwn.length > 0 ) {
System.out.println("Unknown options: "+Arrays.toString(unknonwn));
System.exit(-1);
}
producerTool.run();
}
public void run() {
Connection connection=null;
try {
System.out.println("Connecting to URL: " + url);
System.out.println("Publishing a Message with size " + messageSize+ " to " + (topic ? "topic" : "queue" + ": " + subject);
System.out.println("Using " + (persistent ? "persistent" : "non-persistent" + " messages" ;
System.out.println("Sleeping between publish " + sleepTime + " ms" ;
if (timeToLive != 0) {
System.out.println("Messages time to live " + timeToLive + " ms" ;
}
// Create the connection.
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
connection = connectionFactory.createConnection();
connection.start();
// Create the session
Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
if (topic) {
destination = session.createTopic(subject);
} else {
destination = session.createQueue(subject);
}
// Create the producer.
MessageProducer producer = session.createProducer(destination);
if (persistent) {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
} else {
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
if (timeToLive != 0)
producer.setTimeToLive(timeToLive);
// Start sending messages
sendLoop(session, producer);
Connection connection=null;
try {
System.out.println("Connecting to URL: " + url);
System.out.println("Publishing a Message with size " + messageSize+ " to " + (topic ? "topic" : "queue" + ": " + subject);
System.out.println("Using " + (persistent ? "persistent" : "non-persistent" + " messages" ;
System.out.println("Sleeping between publish " + sleepTime + " ms" ;
if (timeToLive != 0) {
System.out.println("Messages time to live " + timeToLive + " ms" ;
}
// Create the connection.
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
connection = connectionFactory.createConnection();
connection.start();
// Create the session
Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
if (topic) {
destination = session.createTopic(subject);
} else {
destination = session.createQueue(subject);
}
// Create the producer.
MessageProducer producer = session.createProducer(destination);
if (persistent) {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
} else {
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
if (timeToLive != 0)
producer.setTimeToLive(timeToLive);
// Start sending messages
sendLoop(session, producer);
System.out.println("Done." ;
// Use the ActiveMQConnection interface to dump the connection stats.
ActiveMQConnection c = (ActiveMQConnection) connection;
c.getConnectionStats().dump(new IndentPrinter());
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
} finally {
try {
connection.close();
} catch (Throwable ignore) {
}
}
}
// Use the ActiveMQConnection interface to dump the connection stats.
ActiveMQConnection c = (ActiveMQConnection) connection;
c.getConnectionStats().dump(new IndentPrinter());
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
} finally {
try {
connection.close();
} catch (Throwable ignore) {
}
}
}
protected void sendLoop(Session session, MessageProducer producer)
throws Exception {
throws Exception {
for (int i = 0; i < messageCount || messageCount == 0; i++) {
TextMessage message = session
.createTextMessage(createMessageText(i));
.createTextMessage(createMessageText(i));
if (verbose) {
String msg = message.getText();
if (msg.length() > 50) {
msg = msg.substring(0, 50) + "...";
}
System.out.println("Sending message: " + msg);
}
String msg = message.getText();
if (msg.length() > 50) {
msg = msg.substring(0, 50) + "...";
}
System.out.println("Sending message: " + msg);
}
producer.send(message);
if (transacted) {
session.commit();
}
if (transacted) {
session.commit();
}
Thread.sleep(sleepTime);
}
}
private String createMessageText(int index) {
StringBuffer buffer = new StringBuffer(messageSize);
buffer.append("Message: " + index + " sent at: " + new Date());
if (buffer.length() > messageSize) {
return buffer.substring(0, messageSize);
}
for (int i = buffer.length(); i < messageSize; i++) {
buffer.append(' ');
}
return buffer.toString();
}
StringBuffer buffer = new StringBuffer(messageSize);
buffer.append("Message: " + index + " sent at: " + new Date());
if (buffer.length() > messageSize) {
return buffer.substring(0, messageSize);
}
for (int i = buffer.length(); i < messageSize; i++) {
buffer.append(' ');
}
return buffer.toString();
}
public void setPersistent(boolean durable) {
this.persistent = durable;
}
public void setMessageCount(int messageCount) {
this.messageCount = messageCount;
}
public void setMessageSize(int messageSize) {
this.messageSize = messageSize;
}
public void setPassword(String pwd) {
this.password = pwd;
}
public void setSleepTime(long sleepTime) {
this.sleepTime = sleepTime;
}
public void setSubject(String subject) {
this.subject = subject;
}
public void setTimeToLive(long timeToLive) {
this.timeToLive = timeToLive;
}
public void setTopic(boolean topic) {
this.topic = topic;
}
public void setQueue(boolean queue) {
this.topic = !queue;
}
public void setTransacted(boolean transacted) {
this.transacted = transacted;
}
public void setUrl(String url) {
this.url = url;
}
public void setUser(String user) {
this.user = user;
}
public void setVerbose(boolean verbose) {
this.verbose = verbose;
}
}
本文转自elbertchen 51CTO博客,原文链接:http://blog.51cto.com/linkyou/283240,如需转载请自行联系原作者