打开Eclipse
创建Maven工程
项目创建完毕 需要配置一下 pom.xml
在pom.xml中 添加ActiveMQ架包
添加一个com.tong文件夹 在里面创建一个名为HelloWordProducer的类
如果javax.jms 报错可以在pom.xml里面添加架包
编写消息的生产者
package com.tong; import javax.jms.ConnectionFactory; import javax.jms.Connection; import javax.jms.Session; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Message; import org.apache.*; import org.apache.activemq.ActiveMQConnectionFactory; public class HelloWordProducer { /*** * 生产消息 */ public void sendHelloWordActiveMQ(String msgTest) { //定义链接工厂 ConnectionFactory connectionFactory=null; //定义链接对象 Connection connection=null; //定义会话 Session session=null; //定义目的地 Destination destination=null; //定义消息的发送者 MessageProducer producer=null; //定义消息 Message messag=null; try { /*** * username:访问ActiveMQ服务的用户名 用户密码。 默认的为admin。用户名可以通过jetty-name.properties文件进行修改 * password:访问ActiveMQ服务的用户名 用户密码。 默认的为admin。用户名可以通过jetty-name.properties文件进行修改 * brokerURL:访问ActiveMQ服务的路径地址。路径地址结构为:协议名称://主机地址:端口号 */ connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.100.133:61616"); //创建连接对象 connection =connectionFactory.createConnection(); //启动连接对象 connection.start(); /** * transacted:是否使用事务:true|false * true:使用事务 当设置次变量值.Session.SESSION_TRANSACTED * false:不适用事务,设置次变量 则acknowledge参数必须设置 * * acknowledgeMode : * Session.AUTO_ACKNOWLEDGE:自动消息确认机制 * Seesion.CLIENT_ACKNOWLEDGE:客户端确认机制 * Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息队列 */ session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE); //创建目的地,目的地名称即队列名称。消息的消费者需要此名称访问对应的队列 destination=session.createQueue("helloword-destination"); //创建消息的生产者 producer=session.createProducer(destination); //创建消息对象 messag =session.createTextMessage(msgTest); //发送消息 producer.send(messag); }catch(Exception e) { e.printStackTrace(); }finally { //回收消息发送者资源 if(producer!=null) { try { producer.close(); }catch(Exception e) { e.printStackTrace(); } } if(session!=null) { try { session.close(); }catch(Exception e) { e.printStackTrace(); } } if(connection!=null) { try { connection.close(); }catch(Exception e) { e.printStackTrace(); } } } } }
编写消息的消费者
创建新的Maven项目
pom.xml 文件和消息生产者 一致
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tong</groupId> <artifactId>mq-consumer</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency> <!-- https://mvnrepository.com/artifact/javax.jms/javax.jms-api --> <dependency> <groupId>javax.jms</groupId> <artifactId>javax.jms-api</artifactId> <version>2.0.1</version> </dependency> <!-- MQ --> <!-- https://mvnrepository.com/artifact/org.springframework/spring-jms --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>5.0.6.RELEASE</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-core --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency> </dependencies> </project>
package com.tong; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class HelloworldConsumer { /*** * 生产消息 */ public void sendHelloWordActiveMQ() { //定义链接工厂 ConnectionFactory connectionFactory=null; //定义链接对象 Connection connection=null; //定义会话 Session session=null; //定义目的地 Destination destination=null; //定义消息的消费者 MessageConsumer consumer=null; //定义消息 Message messag=null; try { /*** * username:访问ActiveMQ服务的用户名 用户密码。 默认的为admin。用户名可以通过jetty-name.properties文件进行修改 * password:访问ActiveMQ服务的用户名 用户密码。 默认的为admin。用户名可以通过jetty-name.properties文件进行修改 * brokerURL:访问ActiveMQ服务的路径地址。路径地址结构为:协议名称://主机地址:端口号 */ connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.100.133:61616"); //创建连接对象 connection =connectionFactory.createConnection(); //启动连接对象 connection.start(); /** * transacted:是否使用事务:true|false * true:使用事务 当设置次变量值.Session.SESSION_TRANSACTED * false:不适用事务,设置次变量 则acknowledge参数必须设置 * * acknowledgeMode : * Session.AUTO_ACKNOWLEDGE:自动消息确认机制 * Seesion.CLIENT_ACKNOWLEDGE:客户端确认机制 * Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息队列 */ session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE); //创建目的地,目的地名称即队列名称。消息的消费者需要此名称访问对应的队列 destination=session.createQueue("helloword-destination"); //创建消息的消费者 consumer=session.createConsumer(destination); //创建消息对象 messag = consumer.receive(); //处理消息 String msg= ((TextMessage)messag).getText(); System.out.println("从ActiveMQ服务中获取文本信息"+msg); }catch(Exception e) { e.printStackTrace(); }finally { //回收消息发送者资源 if(consumer!=null) { try { consumer.close(); }catch(Exception e) { e.printStackTrace(); } } if(session!=null) { try { session.close(); }catch(Exception e) { e.printStackTrace(); } } if(connection!=null) { try { connection.close(); }catch(Exception e) { e.printStackTrace(); } } } } }
创建Test.java当启动类
package com.tong; import com.tong.HelloWordProducer; public class Test { public static void main(String[] args) { HelloWordProducer producer=new HelloWordProducer(); producer.sendHelloWordActiveMQ("HelloWord"); } }
处理对象消息
关键必须实现 Serializable 接口
package com.tong; import java.io.Serializable; public class Users implements Serializable { private int userid; private String username; private int userage; @Override public String toString() { return "Users [userid=" + userid + ", username=" + username + ", userage=" + userage + "]"; } public int getUserid() { return userid; } public void setUserid(int userid) { this.userid = userid; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public int getUserage() { return userage; } public void setUserage(int userage) { this.userage = userage; } }
生产者
package com.tong; import javax.jms.ConnectionFactory; import javax.jms.Connection; import javax.jms.Session; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Message; import org.apache.*; import org.apache.activemq.ActiveMQConnectionFactory; public class HelloWordProducer2 { /*** * 生产消息 */ public void sendHelloWordActiveMQ(Users user) { //定义链接工厂 ConnectionFactory connectionFactory=null; //定义链接对象 Connection connection=null; //定义会话 Session session=null; //定义目的地 Destination destination=null; //定义消息的发送者 MessageProducer producer=null; //定义消息 Message messag=null; try { /*** * username:访问ActiveMQ服务的用户名 用户密码。 默认的为admin。用户名可以通过jetty-name.properties文件进行修改 * password:访问ActiveMQ服务的用户名 用户密码。 默认的为admin。用户名可以通过jetty-name.properties文件进行修改 * brokerURL:访问ActiveMQ服务的路径地址。路径地址结构为:协议名称://主机地址:端口号 */ connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.100.133:61616"); //创建连接对象 connection =connectionFactory.createConnection(); //启动连接对象 connection.start(); /** * transacted:是否使用事务:true|false * true:使用事务 当设置次变量值.Session.SESSION_TRANSACTED * false:不适用事务,设置次变量 则acknowledge参数必须设置 * * acknowledgeMode : * Session.AUTO_ACKNOWLEDGE:自动消息确认机制 * Seesion.CLIENT_ACKNOWLEDGE:客户端确认机制 * Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息队列 */ session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE); //创建目的地,目的地名称即队列名称。消息的消费者需要此名称访问对应的队列 destination=session.createQueue("my-users"); //创建消息的生产者 producer=session.createProducer(destination); //创建消息对象 messag =session.createObjectMessage(user); //发送消息 producer.send(messag); }catch(Exception e) { e.printStackTrace(); }finally { //回收消息发送者资源 if(producer!=null) { try { producer.close(); }catch(Exception e) { e.printStackTrace(); } } if(session!=null) { try { session.close(); }catch(Exception e) { e.printStackTrace(); } } if(connection!=null) { try { connection.close(); }catch(Exception e) { e.printStackTrace(); } } } } }
消费者
package com.tong; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class HelloworldConsumer2 { /*** * 生产消息 */ public void sendHelloWordActiveMQ() { //定义链接工厂 ConnectionFactory connectionFactory=null; //定义链接对象 Connection connection=null; //定义会话 Session session=null; //定义目的地 Destination destination=null; //定义消息的消费者 MessageConsumer consumer=null; //定义消息 Message messag=null; try { /*** * username:访问ActiveMQ服务的用户名 用户密码。 默认的为admin。用户名可以通过jetty-name.properties文件进行修改 * password:访问ActiveMQ服务的用户名 用户密码。 默认的为admin。用户名可以通过jetty-name.properties文件进行修改 * brokerURL:访问ActiveMQ服务的路径地址。路径地址结构为:协议名称://主机地址:端口号 */ connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.100.133:61616"); //创建连接对象 connection =connectionFactory.createConnection(); //启动连接对象 connection.start(); /** * transacted:是否使用事务:true|false * true:使用事务 当设置次变量值.Session.SESSION_TRANSACTED * false:不适用事务,设置次变量 则acknowledge参数必须设置 * * acknowledgeMode : * Session.AUTO_ACKNOWLEDGE:自动消息确认机制 * Seesion.CLIENT_ACKNOWLEDGE:客户端确认机制 * Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息队列 */ session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE); //创建目的地,目的地名称即队列名称。消息的消费者需要此名称访问对应的队列 destination=session.createQueue("my-users"); //创建消息的消费者 consumer=session.createConsumer(destination); //创建消息对象 messag = consumer.receive(); //处理消息 ObjectMessage objMessage=(ObjectMessage) messag; Users users=(Users)objMessage.getObject(); System.out.println(users); }catch(Exception e) { e.printStackTrace(); }finally { //回收消息发送者资源 if(consumer!=null) { try { consumer.close(); }catch(Exception e) { e.printStackTrace(); } } if(session!=null) { try { session.close(); }catch(Exception e) { e.printStackTrace(); } } if(connection!=null) { try { connection.close(); }catch(Exception e) { e.printStackTrace(); } } } } }
测试启动类
package com.tong; import com.tong.HelloWordProducer; public class Test { public static void main(String[] args) { // HelloWordProducer producer=new HelloWordProducer(); // producer.sendHelloWordActiveMQ("HelloWord"); Users users=new Users(); users.setUserid(1); users.setUsername("童小纯"); users.setUserage(20); HelloWordProducer2 producer=new HelloWordProducer2(); producer.sendHelloWordActiveMQ(users); } }