在项目中导入依赖坐标
<!--使用消息队列,导入依赖坐标--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.2</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency>
使用队列queue的情况
producer端
public static void main(String[] args) { ConnectionFactory connect = new ActiveMQConnectionFactory("tcp://192.168.0.100:61616"); try { //创建连接对象 Connection connection = connect.createConnection(); connection.start(); //第一个值表示是否使用事务,如果选择true,第二个值相当于选择0 Session session = connection.createSession(true, Session.SESSION_TRANSACTED); //使用队列queue Queue testqueue = session.createQueue("boss drink"); //创建提供者 MessageProducer producer = session.createProducer(testqueue); TextMessage textMessage=new ActiveMQTextMessage(); textMessage.setText("我渴了,我要喝水!"); producer.setDeliveryMode(DeliveryMode.PERSISTENT); producer.send(textMessage); session.commit();// 事务型消息,必须提交后才生效 connection.close(); } catch (JMSException e) { e.printStackTrace(); } }
consumer端1
public static void main(String[] args) { // 连接 ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.0.100:61616"); try { Connection connection = connect.createConnection(); connection.start(); //第一个值表示是否使用事务,如果选择true,第二个值相当于选择0 Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Destination testqueue = session.createQueue("boss drink"); MessageConsumer consumer = session.createConsumer(testqueue); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if(message instanceof TextMessage){ try { String text = ((TextMessage) message).getText(); System.out.println(text+",员工1马上拿起杯子打水。。。"); //session.rollback(); session.commit(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }); }catch (Exception e){ e.printStackTrace();; } }
consumer端2
public static void main(String[] args) { // 连接 ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.0.100:61616"); try { Connection connection = connect.createConnection(); connection.start(); //第一个值表示是否使用事务,如果选择true,第二个值相当于选择0 Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Destination testqueue = session.createQueue("boss drink"); MessageConsumer consumer = session.createConsumer(testqueue); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if(message instanceof TextMessage){ try { String text = ((TextMessage) message).getText(); System.out.println(text+",员工2马上拿起杯子打水。。。"); //session.rollback(); session.commit(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }); }catch (Exception e){ e.printStackTrace();; } }
使用topic话题的情况:
producer端:
public static void main(String[] args) { ConnectionFactory connect = new ActiveMQConnectionFactory("tcp://192.168.0.100:61616"); try { //创建连接对象 Connection connection = connect.createConnection(); connection.start(); //第一个值表示是否使用事务,如果选择true,第二个值相当于选择0 Session session = connection.createSession(true, Session.SESSION_TRANSACTED); //使用topic话题(订阅) Topic topic = session.createTopic("boss speak"); //创建提供者 MessageProducer producer = session.createProducer(topic); TextMessage textMessage=new ActiveMQTextMessage(); textMessage.setText("我们要为中国的伟大复兴而努力奋斗!"); producer.setDeliveryMode(DeliveryMode.PERSISTENT); producer.send(textMessage); session.commit();// 事务型消息,必须提交后才生效 connection.close(); } catch (JMSException e) { e.printStackTrace(); } }
consumer端1:
public static void main(String[] args) { // 连接 ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.0.100:61616"); try { Connection connection = connect.createConnection(); //设置客户端id connection.setClientID("userOne"); connection.start(); //第一个值表示是否使用事务,如果选择true,第二个值相当于选择0 Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("boss speak"); //在消费端标记,Session.createDurableSubscriber(Topic topic, String name) //是发布-订阅持久化的接收端的设置。 //参数 topic -> 与发送端的topic 对应,建立连接 //参数 name -> 为订阅者的标识(相当于id) MessageConsumer consumer =session.createDurableSubscriber(topic,"userOne"); //MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if(message instanceof TextMessage){ try { String text = ((TextMessage) message).getText(); System.out.println(text+",员工1这个月工资不要了。。。"); //session.rollback(); session.commit(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }); }catch (Exception e){ e.printStackTrace();; } }
consumer端2:
public static void main(String[] args) { // 连接 ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.0.100:61616"); try { Connection connection = connect.createConnection(); connection.start(); //第一个值表示是否使用事务,如果选择true,第二个值相当于选择0 Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //Destination :目标 Destination topic = session.createTopic("boss speak"); MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if(message instanceof TextMessage){ try { String text = ((TextMessage) message).getText(); System.out.println(text+",员工2周末主动来加班。。。"); //session.rollback(); session.commit(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }); }catch (Exception e){ e.printStackTrace();; } }
关于事务控制
持久化与非持久化
通过producer.setDeliveryMode(DeliveryMode.PERSISTENT) 进行设置
持久化的好处就是当activemq宕机的话,消息队列中的消息不会丢失。非持久化会丢失。但是会消耗一定的性能。
与springboot整合
在application.properties文件中加入spring.activemq.broker-url=tcp://mq.server.com:61616
配置类ActiveMQConfig:项目启动的时候加载并执行里面所有的方法
@Configuration public class ActiveMQConfig { @Value("${spring.activemq.broker-url:disabled}") String brokerURL ; @Value("${activemq.listener.enable:disabled}") String listenerEnable; @Bean public ActiveMQUtil getActiveMQUtil() throws JMSException { if(brokerURL.equals("disabled")){ return null; } ActiveMQUtil activeMQUtil=new ActiveMQUtil(); activeMQUtil.init(brokerURL); return activeMQUtil; } //定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂 @Bean(name = "jmsQueueListener") public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory ) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); if(!listenerEnable.equals("true")){ return null; } factory.setConnectionFactory(activeMQConnectionFactory); //设置并发数 factory.setConcurrency("5"); //重连间隔时间 factory.setRecoveryInterval(5000L); factory.setSessionTransacted(false); factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); return factory; } @Bean public ActiveMQConnectionFactory activeMQConnectionFactory ( ){ /* if((url==null||url.equals(""))&&!brokerURL.equals("disabled")){ url=brokerURL; }*/ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory( brokerURL); return activeMQConnectionFactory; } }
工具类ActiveMQUtil
public class ActiveMQUtil { PooledConnectionFactory pooledConnectionFactory=null; public ConnectionFactory init(String brokerUrl) { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl); //加入连接池 pooledConnectionFactory=new PooledConnectionFactory(factory); //出现异常时重新连接 pooledConnectionFactory.setReconnectOnException(true); // pooledConnectionFactory.setMaxConnections(5); pooledConnectionFactory.setExpiryTimeout(10000); return pooledConnectionFactory; } public ConnectionFactory getConnectionFactory(){ return pooledConnectionFactory; } }
案例:
controller:
@RequestMapping("/alipay/callback/return") public String callBackReturn(HttpServletRequest request,Map<String,String> paramsMap){// 页面同步反转的回调 String out_trade_no = request.getParameter("out_trade_no"); String trade_no = request.getParameter("trade_no"); String sign = request.getParameter("sign"); try { boolean b = AlipaySignature.rsaCheckV1(paramsMap,AlipayConfig.alipay_public_key,AlipayConfig.charset,AlipayConfig.sign_type);// 对支付宝回调签名的校验 } catch (AlipayApiException e) { e.printStackTrace(); } // 修改支付信息 PaymentInfo paymentInfo = new PaymentInfo(); paymentInfo.setPaymentStatus("已支付"); paymentInfo.setCallbackContent(request.getQueryString()); paymentInfo.setOutTradeNo(out_trade_no); paymentInfo.setAlipayTradeNo(trade_no); paymentInfo.setCallbackTime(new Date()); //这里使用Queue队列 // 发送系统消息,出发并发商品支付业务服务O2O消息队列 paymentService.sendPaymentSuccess(paymentInfo.getOutTradeNo(),paymentInfo.getPaymentStatus(),trade_no); paymentService.updatePayment(paymentInfo); return "finish"; }
servcieimpl:
servcieimpl: @Ov