上篇文章中详细介绍了ActiveMQ。本文继续介绍ActiveMQ的具体操作
ActiveMQ
处理对象消息
1.定义消息载体对象
/** * Order Bean * 定义消息载体类型. 即要在ActiveMQ中传递的数据实体类型. * 消息载体对象必须实现接口java.io.Serializable, 因为消息需要在网络中传递,要求必须可序列化 * @author dengp * */ public class Order implements Serializable{ private static final long serialVersionUID = 1L; private String id; private String nick; private Long price; private Date createTime; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getNick() { return nick; } public void setNick(String nick) { this.nick = nick; } public Long getPrice() { return price; } public void setPrice(Long price) { this.price = price; } public Date getCreateTime() { return createTime; } public void setCreateTime(Date createTime) { this.createTime = createTime; } public static long getSerialversionuid() { return serialVersionUID; } @Override public String toString() { return "Order [id=" + id + ", nick=" + nick + ", price=" + price + ", createTime=" + createTime + "]"; } }
2.定义消息生产者
/** * ActiveMQ中的生产者(Producer) * @author dengp * */ public class OrderProducer { public void sendhello2ActiveMq(Order messageObject) { ConnectionFactory factory = null; Connection conn = null; Session session = null; Destination destination = null; MessageProducer producer = null; Message message = null; try { factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616"); // 创建链接对象 conn = factory.createConnection(); // 启动连接对象 conn.start(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建目的地,目的地的命名既是队列的命令 destination = session.createQueue("MQ-Hello-Object"); producer = session.createProducer(destination); // 创建消息对象. 此消息是对象消息, 其中保存数据为对象. message = session.createObjectMessage(messageObject); // 发送消息 producer.send(message); } catch (Exception e) { e.printStackTrace(); System.out.println("访问ActiveMQ服务发生错误!!"); } finally { try { // 回收消息发送者资源 if (null != producer) producer.close(); } catch (JMSException e) { e.printStackTrace(); } try { // 回收会话资源 if (null != session) session.close(); } catch (JMSException e) { e.printStackTrace(); } try { // 回收链接资源 if (null != conn) conn.close(); } catch (JMSException e) { e.printStackTrace(); } } } }
3.定义消息消费者
/** * ActiveMQ中的消费者(Consumer) * @author dengp * */ public class OrderConsumer { public void reciveOrderFormActiveMq() { ConnectionFactory factory = null; Connection conn = null; Session session = null; Destination destination = null; MessageConsumer consumer = null; Message message = null; try { factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616"); conn = factory.createConnection(); conn.start(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("MQ-Hello-Object"); // 创建消息消费者, 创建的消息消费者与某目的地对应, 即方法参数目的地. consumer = session.createConsumer(destination); // 从ActiveMQ中获取消息 message = consumer.receive(); Object obj = ((ObjectMessage)message).getObject(); System.out.println("ActiveMQ获取的消息是:"+obj); } catch (Exception e) { e.printStackTrace(); System.out.println("访问ActiveMQ服务发生错误!!"); } finally { try { // 回收消息发送者资源 if (null != consumer) consumer.close(); } catch (JMSException e) { e.printStackTrace(); } try { // 回收会话资源 if (null != session) session.close(); } catch (JMSException e) { e.printStackTrace(); } try { // 回收链接资源 if (null != conn) conn.close(); } catch (JMSException e) { e.printStackTrace(); } } } }
4.测试
生产者测试
public static void main(String[] args) { OrderProducer pro = new OrderProducer(); Order order = new Order(); order.setId("100"); order.setNick("波波烤鸭"); order.setPrice(9999l); order.setCreateTime(new Date()); pro.sendhello2ActiveMq(order); }
消费者测试
获取到了相关的信息
实现队列服务监听
1.观察者模式
1.1事件源
事件发生的源头。 监听器监听的具体位置。
1.2事件
具体触发的事件。 如: 单击事件, 双击事件 等。
其中必然包含事件源信息。
1.3监听器
处理事件的代码逻辑。
2.定义生成者代码
package com.dpb.observe; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; /** * ActiveMQ中的生产者(Producer) * @author dengp * */ public class MyProducer { public void sendhello2ActiveMq(String messageText) { // 连接工厂,用于创建Connection对象 ConnectionFactory factory = null; // activeMQ 连接对象 Connection conn = null; // 一次和ActiveMQ的持久会话对象 Session session = null; // 目的地 Destination destination = null; // 消息发送者 MessageProducer producer = null; // 封装消息的对象 Message message = null; try { /* * 创建链接工厂 ActiveMQConnectionFactory -由ActiveMQ实现的ConnectionFactory接口实现类. * 构造方法: public ActiveMQConnectionFactory(String userName, String password, * String brokerURL) * userName - 访问ActiveMQ服务的用户名,用户名可以通过jetty-realm.properties配置文件配置. * password - 访问ActiveMQ服务的密码,密码可以通过jetty-realm.properties配置文件配置. * brokerURL -访问ActiveMQ服务的路径地址. 路径结构为 - 协议名://主机地址:端口号 此链接基于TCP/IP协议. */ factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616"); // 创建链接对象 conn = factory.createConnection(); // 启动连接对象 conn.start(); /* * 创建会话对象 * 方法 - connection.createSession(boolean transacted, int acknowledgeMode); * transacted - 是否使用事务, * 可选值为true|false * true - 使用事务, 当设置此变量值, 则acknowledgeMode参数无效, * 建议传递的acknowledgeMode参数值为 Session.SESSION_TRANSACTED * false - 不使用事务, 设置此变量值,则acknowledgeMode参数必须设置. * acknowledgeMode - 消息确认机制, 可选值为: * Session.AUTO_ACKNOWLEDGE - 自动确认消息机制 * Session.CLIENT_ACKNOWLEDGE -客户端确认消息机制 * Session.DUPS_OK_ACKNOWLEDGE - 有副本的客户端确认消息机制 */ session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建目的地,目的地的命名既是队列的命令 destination = session.createQueue("MQ-Hello-observe"); // 创建消息生成者, 创建的消息生成者与某目的地对应, 即方法参数目的地. producer = session.createProducer(destination); // 创建消息对象,创建一个文本消息对象。此消息对象中保存要传递的文本数据. message = session.createTextMessage(messageText); // 发送消息 producer.send(message); } catch (Exception e) { e.printStackTrace(); System.out.println("访问ActiveMQ服务发生错误!!"); } finally { try { // 回收消息发送者资源 if (null != producer) producer.close(); } catch (JMSException e) { e.printStackTrace(); } try { // 回收会话资源 if (null != session) session.close(); } catch (JMSException e) { e.printStackTrace(); } try { // 回收链接资源 if (null != conn) conn.close(); } catch (JMSException e) { e.printStackTrace(); } } } }
3.定义消费者代码
package com.dpb.observe; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * ActiveMQ中的消费者(Consumer) * @author dengp * */ public class MyConsumer { public void reciveHelloFormActiveMq() { // 连接工厂,用于创建Connection对象 ConnectionFactory factory = null; // activeMQ 连接对象 Connection conn = null; // 一次和ActiveMQ的持久会话对象 Session session = null; // 目的地 Destination destination = null; // 消息消费者 MessageConsumer consumer = null; // 封装消息的对象 Message message = null; try { /* * 创建链接工厂 ActiveMQConnectionFactory -由ActiveMQ实现的ConnectionFactory接口实现类. * 构造方法: public ActiveMQConnectionFactory(String userName, String password, * String brokerURL) * userName - 访问ActiveMQ服务的用户名,用户名可以通过jetty-realm.properties配置文件配置. * password - 访问ActiveMQ服务的密码,密码可以通过jetty-realm.properties配置文件配置. * brokerURL -访问ActiveMQ服务的路径地址. 路径结构为 - 协议名://主机地址:端口号 此链接基于TCP/IP协议. */ factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616"); // 创建链接对象 conn = factory.createConnection(); // 启动连接对象 conn.start(); /* * 创建会话对象 * 方法 - connection.createSession(boolean transacted, int acknowledgeMode); * transacted - 是否使用事务, * 可选值为true|false * true - 使用事务, 当设置此变量值, 则acknowledgeMode参数无效, * 建议传递的acknowledgeMode参数值为 Session.SESSION_TRANSACTED * false - 不使用事务, 设置此变量值,则acknowledgeMode参数必须设置. * acknowledgeMode - 消息确认机制, 可选值为: * Session.AUTO_ACKNOWLEDGE - 自动确认消息机制 * Session.CLIENT_ACKNOWLEDGE -客户端确认消息机制 * Session.DUPS_OK_ACKNOWLEDGE - 有副本的客户端确认消息机制 */ session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建目的地,目的地的命名既是队列的命令 destination = session.createQueue("MQ-Hello-observe"); // 创建消息消费者, 创建的消息消费者与某目的地对应, 即方法参数目的地. consumer = session.createConsumer(destination); // 监听ActiveMQ服务中的消息,当发现消息的时候,自动处理 consumer.setMessageListener(new MessageListener() { /** * 当用消息到来的时候触发该方法,在该方法中处理消息 */ @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; String messageString = null; try { messageString = textMessage.getText(); } catch (JMSException e) { e.printStackTrace(); messageString = "处理消息失败!!"; } System.out.println("处理的消息内容是 : " + messageString); } }); // 阻塞进程 System.in.read(); } catch (Exception e) { e.printStackTrace(); System.out.println("访问ActiveMQ服务发生错误!!"); } finally { try { // 回收消息发送者资源 if (null != consumer) consumer.close(); } catch (JMSException e) { e.printStackTrace(); } try { // 回收会话资源 if (null != session) session.close(); } catch (JMSException e) { e.printStackTrace(); } try { // 回收链接资源 if (null != conn) conn.close(); } catch (JMSException e) { e.printStackTrace(); } } } }
4.测试
生产者
public static void main(String[] args) { MyProducer pro = new MyProducer(); pro.sendhello2ActiveMq("你好啊...listener"); }
消费者
public static void main(String[] args) { MyConsumer con = new MyConsumer(); con.reciveHelloFormActiveMq(); }
web页面中可以看到还在线的消费者。