ActiveMQ点对点消息传递

简介: 上篇文章中详细介绍了ActiveMQ。本文继续介绍ActiveMQ的具体操作

上篇文章中详细介绍了ActiveMQ。本文继续介绍ActiveMQ的具体操作

ActiveMQ

处理对象消息

1.定义消息载体对象

/**
 * Order Bean
 * 定义消息载体类型. 即要在ActiveMQ中传递的数据实体类型.
 * 消息载体对象必须实现接口java.io.Serializable, 因为消息需要在网络中传递,要求必须可序列化
 * @author dengp
 *
 */
public class Order implements Serializable{
  private static final long serialVersionUID = 1L;
  private String id;
  private String nick;
  private Long price;
  private Date createTime;
  public String getId() {
    return id;
  }
  public void setId(String id) {
    this.id = id;
  }
  public String getNick() {
    return nick;
  }
  public void setNick(String nick) {
    this.nick = nick;
  }
  public Long getPrice() {
    return price;
  }
  public void setPrice(Long price) {
    this.price = price;
  }
  public Date getCreateTime() {
    return createTime;
  }
  public void setCreateTime(Date createTime) {
    this.createTime = createTime;
  }
  public static long getSerialversionuid() {
    return serialVersionUID;
  }
  @Override
  public String toString() {
    return "Order [id=" + id + ", nick=" + nick + ", price=" + price + ", createTime=" + createTime + "]";
  }
}


2.定义消息生产者

/**
 * ActiveMQ中的生产者(Producer)
 * @author dengp
 *
 */
public class OrderProducer {
  public void sendhello2ActiveMq(Order messageObject) {
    ConnectionFactory factory = null;
    Connection conn = null;
    Session session = null;
    Destination destination = null;
    MessageProducer producer = null;
    Message message = null;
    try {
      factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616");
      // 创建链接对象
      conn = factory.createConnection();
      // 启动连接对象
      conn.start();
      session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
      // 创建目的地,目的地的命名既是队列的命令
      destination = session.createQueue("MQ-Hello-Object");
      producer = session.createProducer(destination);
      // 创建消息对象. 此消息是对象消息, 其中保存数据为对象.
      message = session.createObjectMessage(messageObject);
      // 发送消息
      producer.send(message);
    } catch (Exception e) {
      e.printStackTrace();
      System.out.println("访问ActiveMQ服务发生错误!!");
    } finally {
      try {
        // 回收消息发送者资源
        if (null != producer)
          producer.close();
      } catch (JMSException e) {
        e.printStackTrace();
      }
      try {
        // 回收会话资源
        if (null != session)
          session.close();
      } catch (JMSException e) {
        e.printStackTrace();
      }
      try {
        // 回收链接资源
        if (null != conn)
          conn.close();
      } catch (JMSException e) {
        e.printStackTrace();
      }
    }
  }
}


3.定义消息消费者

/**
 * ActiveMQ中的消费者(Consumer)
 * @author dengp
 *
 */
public class OrderConsumer {
  public void reciveOrderFormActiveMq() {
    ConnectionFactory factory = null;
    Connection conn = null;
    Session session = null;
    Destination destination = null;
    MessageConsumer consumer = null;
    Message message = null;
    try {
      factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616");
      conn = factory.createConnection();
      conn.start();
      session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
      destination = session.createQueue("MQ-Hello-Object");
      // 创建消息消费者, 创建的消息消费者与某目的地对应, 即方法参数目的地.
      consumer = session.createConsumer(destination);
      // 从ActiveMQ中获取消息
      message = consumer.receive();
      Object obj = ((ObjectMessage)message).getObject();
      System.out.println("ActiveMQ获取的消息是:"+obj);
    } catch (Exception e) {
      e.printStackTrace();
      System.out.println("访问ActiveMQ服务发生错误!!");
    } finally {
      try {
        // 回收消息发送者资源
        if (null != consumer)
          consumer.close();
      } catch (JMSException e) {
        e.printStackTrace();
      }
      try {
        // 回收会话资源
        if (null != session)
          session.close();
      } catch (JMSException e) {
        e.printStackTrace();
      }
      try {
        // 回收链接资源
        if (null != conn)
          conn.close();
      } catch (JMSException e) {
        e.printStackTrace();
      }
    }
  }
}


4.测试

生产者测试

public static void main(String[] args) {
  OrderProducer pro = new OrderProducer();
  Order order = new Order();
  order.setId("100");
  order.setNick("波波烤鸭");
  order.setPrice(9999l);
  order.setCreateTime(new Date());
  pro.sendhello2ActiveMq(order);
}


20190216192926598.png

消费者测试

获取到了相关的信息

20190216193138589.png20190216193216119.png

实现队列服务监听

1.观察者模式

1.1事件源

事件发生的源头。 监听器监听的具体位置。

1.2事件

具体触发的事件。 如: 单击事件, 双击事件 等。

其中必然包含事件源信息。

1.3监听器

处理事件的代码逻辑。

Java观察者模式(Observer)


2.定义生成者代码

package com.dpb.observe;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * ActiveMQ中的生产者(Producer)
 * @author dengp
 *
 */
public class MyProducer {
  public void sendhello2ActiveMq(String messageText) {
    // 连接工厂,用于创建Connection对象
    ConnectionFactory factory = null;
    // activeMQ 连接对象
    Connection conn = null;
    // 一次和ActiveMQ的持久会话对象
    Session session = null;
    // 目的地
    Destination destination = null;
    // 消息发送者
    MessageProducer producer = null;
    // 封装消息的对象
    Message message = null;
    try {
      /*
       * 创建链接工厂 ActiveMQConnectionFactory -由ActiveMQ实现的ConnectionFactory接口实现类. 
       * 构造方法: public ActiveMQConnectionFactory(String userName, String password,
       * String brokerURL) 
       * userName - 访问ActiveMQ服务的用户名,用户名可以通过jetty-realm.properties配置文件配置. 
       * password - 访问ActiveMQ服务的密码,密码可以通过jetty-realm.properties配置文件配置. 
       * brokerURL -访问ActiveMQ服务的路径地址. 路径结构为 - 协议名://主机地址:端口号 此链接基于TCP/IP协议.
       */
      factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616");
      // 创建链接对象
      conn = factory.createConnection();
      // 启动连接对象
      conn.start();
      /*
       * 创建会话对象 
       * 方法 - connection.createSession(boolean transacted, int acknowledgeMode); 
       * transacted - 是否使用事务, 
       * 可选值为true|false 
       * true - 使用事务, 当设置此变量值, 则acknowledgeMode参数无效, 
       * 建议传递的acknowledgeMode参数值为 Session.SESSION_TRANSACTED 
       * false - 不使用事务, 设置此变量值,则acknowledgeMode参数必须设置. 
       * acknowledgeMode - 消息确认机制, 可选值为:
       * Session.AUTO_ACKNOWLEDGE - 自动确认消息机制 
       * Session.CLIENT_ACKNOWLEDGE -客户端确认消息机制 
       * Session.DUPS_OK_ACKNOWLEDGE - 有副本的客户端确认消息机制
       */
      session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
      // 创建目的地,目的地的命名既是队列的命令
      destination = session.createQueue("MQ-Hello-observe");
      // 创建消息生成者, 创建的消息生成者与某目的地对应, 即方法参数目的地.
      producer = session.createProducer(destination);
      // 创建消息对象,创建一个文本消息对象。此消息对象中保存要传递的文本数据.
      message = session.createTextMessage(messageText);
      // 发送消息
      producer.send(message);
    } catch (Exception e) {
      e.printStackTrace();
      System.out.println("访问ActiveMQ服务发生错误!!");
    } finally {
      try {
        // 回收消息发送者资源
        if (null != producer)
          producer.close();
      } catch (JMSException e) {
        e.printStackTrace();
      }
      try {
        // 回收会话资源
        if (null != session)
          session.close();
      } catch (JMSException e) {
        e.printStackTrace();
      }
      try {
        // 回收链接资源
        if (null != conn)
          conn.close();
      } catch (JMSException e) {
        e.printStackTrace();
      }
    }
  }
}


3.定义消费者代码

package com.dpb.observe;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * ActiveMQ中的消费者(Consumer)
 * @author dengp
 *
 */
public class MyConsumer {
  public void reciveHelloFormActiveMq() {
    // 连接工厂,用于创建Connection对象
    ConnectionFactory factory = null;
    // activeMQ 连接对象
    Connection conn = null;
    // 一次和ActiveMQ的持久会话对象
    Session session = null;
    // 目的地
    Destination destination = null;
    // 消息消费者
    MessageConsumer consumer = null;
    // 封装消息的对象
    Message message = null;
    try {
      /*
       * 创建链接工厂 ActiveMQConnectionFactory -由ActiveMQ实现的ConnectionFactory接口实现类. 
       * 构造方法: public ActiveMQConnectionFactory(String userName, String password,
       * String brokerURL) 
       * userName - 访问ActiveMQ服务的用户名,用户名可以通过jetty-realm.properties配置文件配置. 
       * password - 访问ActiveMQ服务的密码,密码可以通过jetty-realm.properties配置文件配置. 
       * brokerURL -访问ActiveMQ服务的路径地址. 路径结构为 - 协议名://主机地址:端口号 此链接基于TCP/IP协议.
       */
      factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616");
      // 创建链接对象
      conn = factory.createConnection();
      // 启动连接对象
      conn.start();
      /*
       * 创建会话对象 
       * 方法 - connection.createSession(boolean transacted, int acknowledgeMode); 
       * transacted - 是否使用事务, 
       * 可选值为true|false 
       * true - 使用事务, 当设置此变量值, 则acknowledgeMode参数无效, 
       * 建议传递的acknowledgeMode参数值为 Session.SESSION_TRANSACTED 
       * false - 不使用事务, 设置此变量值,则acknowledgeMode参数必须设置. 
       * acknowledgeMode - 消息确认机制, 可选值为:
       * Session.AUTO_ACKNOWLEDGE - 自动确认消息机制 
       * Session.CLIENT_ACKNOWLEDGE -客户端确认消息机制 
       * Session.DUPS_OK_ACKNOWLEDGE - 有副本的客户端确认消息机制
       */
      session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
      // 创建目的地,目的地的命名既是队列的命令
      destination = session.createQueue("MQ-Hello-observe");
      // 创建消息消费者, 创建的消息消费者与某目的地对应, 即方法参数目的地.
      consumer = session.createConsumer(destination);
      // 监听ActiveMQ服务中的消息,当发现消息的时候,自动处理
      consumer.setMessageListener(new MessageListener() {
        /**
         * 当用消息到来的时候触发该方法,在该方法中处理消息
         */
        @Override
        public void onMessage(Message message) {
          TextMessage textMessage = (TextMessage) message;
          String messageString = null;
          try {
            messageString = textMessage.getText();
          } catch (JMSException e) {
            e.printStackTrace();
            messageString = "处理消息失败!!";
          }
          System.out.println("处理的消息内容是 : " + messageString);
        }
      });
      // 阻塞进程
      System.in.read();
    } catch (Exception e) {
      e.printStackTrace();
      System.out.println("访问ActiveMQ服务发生错误!!");
    } finally {
      try {
        // 回收消息发送者资源
        if (null != consumer)
          consumer.close();
      } catch (JMSException e) {
        e.printStackTrace();
      }
      try {
        // 回收会话资源
        if (null != session)
          session.close();
      } catch (JMSException e) {
        e.printStackTrace();
      }
      try {
        // 回收链接资源
        if (null != conn)
          conn.close();
      } catch (JMSException e) {
        e.printStackTrace();
      }
    }
  }
}


4.测试

生产者

public static void main(String[] args) {
  MyProducer pro = new MyProducer();
  pro.sendhello2ActiveMq("你好啊...listener");
}

20190216194226777.png

消费者

public static void main(String[] args) {
  MyConsumer con = new MyConsumer();
  con.reciveHelloFormActiveMq();
}


20190216194247635.png20190216194324770.png

web页面中可以看到还在线的消费者。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 存储 开发者
实现AMQP的高效消息传递机制
【8月更文第28天】高级消息队列协议 (AMQP) 是一个为消息中间件设计的开放标准应用层协议。它为消息传递系统提供了标准化的方法,从而确保了高性能和可靠性。本文将详细介绍AMQP中的一些关键特性,并通过示例代码展示如何利用这些特性。
103 2
|
4月前
|
消息中间件 存储 Java
使用RabbitMQ实现可靠的消息传递机制
使用RabbitMQ实现可靠的消息传递机制
|
5月前
|
消息中间件 存储 中间件
中间件消息队列协议异步通信
【6月更文挑战第5天】
47 2
|
消息中间件 Java Maven
消息中间件系列教程(03) -ActiveMQ -点对点&发布订阅模式
消息中间件系列教程(03) -ActiveMQ -点对点&发布订阅模式
95 0
|
消息中间件 Java Maven
ActiveMQ向消息队列存入消息(点对点模式)
创建一个maven工程, 引入ActiveMQ的依赖
|
6月前
|
消息中间件 存储 Java
RabbitMQ是如何实现消息传递的?
RabbitMQ是如何实现消息传递的?
116 0
|
6月前
|
消息中间件
消息系统:点对点&发布订阅?
消息系统:点对点&发布订阅?
78 0
05JMS点对点模式
05JMS点对点模式
40 0
EMQ如何实现点对点消息和发布订阅消息?
EMQ(Erlang MQTT Broker)通过 MQTT 协议实现了点对点消息和发布订阅消息两种消息传递模式。
453 2
|
消息中间件 Java API
可靠消息传递的选择:深入了解 Apache ActiveMQ 消息队列
在分布式系统中,可靠的消息传递是实现异步通信、解耦和数据同步的关键。Apache ActiveMQ,作为一款开源的消息队列系统,为开发者提供了一个强大的工具来实现可靠的消息传递。本文将为您详细介绍 Apache ActiveMQ 的核心概念、特性以及在分布式架构中的应用。
200 0
下一篇
无影云桌面