最近偶然接触到了JMS,故学习一下
1、下载&文档&视频
无疑官网是最好的选择(如果你英语可以的话),上面有最全的国外大牛的视频和博客讲解。http://activemq.apache.org/
首先通过http://www.apache.org/dyn/closer.cgi?path=/activemq/5.10.0/apache-activemq-5.10.0-bin.zip下载压缩包(最新版本5.10.0)
2、ActiveMQ优点
1.多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
2.完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
3.对spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
4.完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
5.通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
6.支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
7.从设计上保证了高性能的集群,客户端-服务器,点对点
8.支持Ajax
9.支持与Axis的整合
10.可以很容易得调用内嵌JMS provider,进行测试
3、启动服务
把下载好的压缩包解压至任意磁盘,打开目录。如下:
打开bin中的win64文件夹(我的是win7 64位系统)执行activemq.bat
启动后在浏览器中输入http://localhost:8161/admin/会出现提示框要求输入用户名和密码
输入admin - admin即可进入
这些都是默认配置,都可以在conf文件夹中进行配置。我们先用默认的进行测试就行了
下面的这个Queues你们是没有东西的,我这个my-queue是我之前用来测试的队列,没有删除。
4、编写测试代码
首先往项目中导入下载的文件夹中的jar包 activemq-all-5.10.0.jar
(1)ProducerTool类
- package com.activemq.producer;
-
- import javax.jms.Connection;
- import javax.jms.DeliveryMode;
- import javax.jms.Destination;
- import javax.jms.JMSException;
- import javax.jms.MessageProducer;
- import javax.jms.Session;
- import javax.jms.TextMessage;
-
- import org.apache.activemq.ActiveMQConnection;
- import org.apache.activemq.ActiveMQConnectionFactory;
-
- public class ProducerTool {
-
- private String user = ActiveMQConnection.DEFAULT_USER ;
-
- private String password = ActiveMQConnection.DEFAULT_PASSWORD ;
-
- private String url = ActiveMQConnection.DEFAULT_BROKER_URL ;
-
- private String subject = "benjamin" ;
-
- private Destination destination = null ;
-
- private Connection connection = null ;
-
- private Session session = null ;
-
- private MessageProducer producer = null ;
-
- //初始化
- private void initialize() throws Exception {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url) ;
- connection = connectionFactory.createConnection() ;
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) ;
- destination = session.createQueue(subject) ;
- producer = session.createProducer(destination) ;
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT) ;
- }
-
- //发送消息
- public void produceMessage(String message) throws Exception {
- initialize() ;
- TextMessage msg = session.createTextMessage(message) ;
- connection.start() ;
- System.out.println("Producer:->Sending message: " + message);
- producer.send(msg) ;
- System.out.println("Producer:->Message sent complete!");
- }
-
- //关闭连接
- public void close() throws JMSException {
- System.out.println("Producer:->Closing Connection");
- if(producer != null) {
- producer.close() ;
- }
- if(session != null) {
- session.close() ;
- }
- if(connection != null) {
- connection.close() ;
- }
- }
- }
(2)ConsumerTool类
- package com.activemq.consumer;
-
- import javax.jms.Connection;
- import javax.jms.Destination;
- import javax.jms.JMSException;
- import javax.jms.Message;
- import javax.jms.MessageConsumer;
- import javax.jms.MessageListener;
- import javax.jms.Session;
- import javax.jms.TextMessage;
-
- import org.apache.activemq.ActiveMQConnection;
- import org.apache.activemq.ActiveMQConnectionFactory;
-
- public class ConsumerTool implements MessageListener{
-
- private String user = ActiveMQConnection.DEFAULT_USER ;
-
- private String password = ActiveMQConnection.DEFAULT_PASSWORD ;
-
- private String url = ActiveMQConnection.DEFAULT_BROKER_URL ;
-
- private String subject = "benjamin" ;
-
- private Destination destination = null ;
-
- private Connection connection = null ;
-
- private Session session = null ;
-
- private MessageConsumer consumer = null ;
-
- //初始化
- public void initialize() throws JMSException {
- //连接工厂是用户创建连接的对象,这里使用的是ActiveMQ的ActiveMQConnectionFactory根据url,username和password创建连接工厂。
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url) ;
- //连接工厂创建一个jms connection
- connection = connectionFactory.createConnection() ;
- //是生产和消费的一个单线程上下文。会话用于创建消息的生产者,消费者和消息。会话提供了一个事务性的上下文。
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) ;
- //目的地是客户用来指定他生产消息的目标还有他消费消息的来源的对象,两种消息传递方式:点对点和发布/订阅
- destination = session.createQueue(subject) ;
- //会话创建消息的生产者将消息发送到目的地
- consumer = session.createConsumer(destination) ;
- }
-
- //消费消息
- public void consumeMessage() throws JMSException {
- initialize() ;
- connection.start() ;
-
- System.out.println("Consumer:->Begin listening...");
- //开始监听
- //如果不想监听,想去主动的接受消息,只需要改为receive()即可
- consumer.setMessageListener(this) ;
- // Message message = consumer.receive();
- }
-
- //关闭连接
- public void close() throws JMSException {
- System.out.println("consumer:->Closing Connection");
- if(consumer != null) {
- consumer.close() ;
- }
- if(session != null) {
- session.close() ;
- }
- if(connection != null) {
- connection.close() ;
- }
- }
-
- //消息处理函数
- @Override
- public void onMessage(Message message) {
- try {
- if(message instanceof TextMessage) {
- TextMessage txtMsg = (TextMessage) message ;
- String msg = txtMsg.getText() ;
- System.out.println("Consumer:->Received: " + msg);
- }else {
- System.out.println("Consumer:->Received: " + message);
- }
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- }
(3)测试TestJMS类
- package com.activemq.testjms;
-
- import javax.jms.JMSException;
-
- import org.apache.activemq.ActiveMQConnection;
-
- import com.activemq.consumer.ConsumerTool;
- import com.activemq.producer.ProducerTool;
-
- public class TestJMS {
- public static void main(String[] args) throws JMSException, Exception {
- ConsumerTool ct = new ConsumerTool() ;
- ProducerTool pt = new ProducerTool() ;
- System.out.println(ActiveMQConnection.DEFAULT_BROKER_URL + "-------------");
- //开始监听
- ct.consumeMessage() ;
-
- //延迟500毫秒后发送消息
- Thread.sleep(500) ;
- pt.produceMessage("Hello World!") ;
- pt.close() ;
-
- //延迟500毫秒后停止接收信息
- Thread.sleep(500) ;
- ct.close() ;
- }
- }
输出台会输出:
因为我们在程序中创建了名为“Benjamin”的队列,所以我们打开刚才的管理ActiveMQ的页面,可以看到Queue下面多了个队列。