activeMQ 推送之mqtt客户端-阿里云开发者社区

开发者社区> 开发与运维> 正文

activeMQ 推送之mqtt客户端

简介:

使用activeMQ进行Android推送

activeMQ下载地址:http://activemq.apache.org/download.html

下载后是一个压缩包:apache-activemq-5.9.0-bin.zip

启动方式:

解压缩,进入apache-activemq-5.9.0-bin\apache-activemq-5.9.0\bin,双击activemq.bat,即可启动activeMQ服务

 启动之后:

 android客户端推送采用mqtt(paho-mqtt-client-1.0.1.jar),依赖包见附件

 

但是为了测试,我写了一个swing图形界面,充当手机客户端,依赖的jar包仍然是paho-mqtt-client-1.0.1.jar.界面如下:

使用方法:点击[启动]按钮,开始接收推送消息
 对应的主类是:
MqttSwing,用于接收推送消息.

 

我还写了一个发送推送消息的swing图形界面,充当推送后管系统,界面如下:

使用方法:点击[连接]按钮,才可以发送推送消息
 对应的主类:PusherApp,用于发送推送消息.

核心代码介绍如下.

客户端连接activeMQ,建立连接(只有建立连接,才能接收到推送消息)

方法名:connect,做了两件事:(1)建立连接;(2)订阅主题(topic)

 

Java代码  收藏代码
  1. /*** 
  2.      * 客户端和activeMQ服务器建立连接 
  3.      * @param BROKER_URL 
  4.      * @param clientId : 用于标识客户端,相当于ios中的device token 
  5.      * @param TOPIC 
  6.      * @param isCleanSession :false--可以接受离线消息; 
  7.      * @return 是否启动成功  
  8.      */  
  9.     private boolean connect(String BROKER_URL,String clientId,String TOPIC,boolean isCleanSession){  
  10.         try {  
  11.             ComponentUtil.appendResult(resultTextPane, "connect time:"+TimeHWUtil.getCurrentMiniuteSecond(), true);  
  12.             mqttClient = new MqttClient(BROKER_URL, clientId, new MemoryPersistence());  
  13.             MqttConnectOptions options= new MqttConnectOptions();  
  14.             options.setCleanSession(isCleanSession);//mqtt receive offline message  
  15.             ComponentUtil.appendResult(resultTextPane, "isCleanSession:"+isCleanSession, true);  
  16.             options.setKeepAliveInterval(30);  
  17.             //推送回调类,在此类中处理消息,用于消息监听  
  18.             mqttClient.setCallback(new MyCallBack(MqttSwing.this));  
  19.             boolean isSuccess=false;  
  20.             try {  
  21.                 mqttClient.connect(options);//CLIENT ID CAN NOT BE SAME  
  22.                 isSuccess=true;  
  23.             } catch (Exception e) {  
  24.                 if(isPrintException){  
  25.                     e.printStackTrace();  
  26.                 }  
  27.             }  
  28.             if(!isSuccess){  
  29.                 String message="连接失败,请检查client id是否重复了 或者activeMQ是否启动";  
  30.                 ComponentUtil.appendResult(resultTextPane, message, true);  
  31.                 GUIUtil23.warningDialog(message);  
  32.                 return false;  
  33.             }else{  
  34.             //Subscribe to topics   
  35.                 mqttClient.subscribe(new String[]{TOPIC,clientId});  
  36.                  
  37.                 System.out.println("topic:"+TOPIC+",  "+(clientId));  
  38.                 ComponentUtil.appendResult(resultTextPane, "TOPIC:"+TOPIC+",  "+(clientId), true);  
  39.             }  
  40.   
  41.         } catch (MqttException e) {  
  42.             if(isPrintException){  
  43.             e.printStackTrace();}  
  44.             GUIUtil23.errorDialog(e.getMessage());  
  45.             return false;  
  46.         }  
  47.         return true;  
  48.     }  

 

 

推送消息到来时的回调类:MyCallBack

 

Java代码  收藏代码
  1. package com.mqtt.hw.callback;  
  2.   
  3. import org.apache.commons.lang.StringEscapeUtils;  
  4. import org.eclipse.paho.client.mqttv3.MqttCallback;  
  5. import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;  
  6. import org.eclipse.paho.client.mqttv3.MqttMessage;  
  7. import org.eclipse.paho.client.mqttv3.MqttTopic;  
  8.   
  9. import com.mqtt.hw.MqttSwing;  
  10. import com.time.util.TimeHWUtil;  
  11.   
  12. public class MyCallBack implements MqttCallback {  
  13.     private MqttSwing mqttSwing;  
  14.       
  15.       
  16.       
  17.     public MyCallBack(MqttSwing mqttSwing) {  
  18.         super();  
  19.         this.mqttSwing = mqttSwing;  
  20.     }  
  21.   
  22.     @Override  
  23.     public void connectionLost(Throwable cause) {  
  24.           
  25.     }  
  26.   
  27.     @Override  
  28.     public void messageArrived(MqttTopic topic, MqttMessage message)  
  29.             throws Exception {  
  30.         System.out.println("messageArrived...."+TimeHWUtil.getCurrentMiniuteSecond());  
  31.         String messageStr=StringEscapeUtils.unescapeHtml(new String(message.getPayload()));  
  32.         System.out.println("message:"+messageStr);  
  33.         this.mqttSwing.receiveMessage(messageStr);  
  34.         //使窗口处于激活状态  
  35.   
  36.     }  
  37.   
  38.     @Override  
  39.     public void deliveryComplete(MqttDeliveryToken token) {  
  40.           
  41.     }  
  42.   
  43. }  

 

 

推送者与activeMQ建立连接:

 

Java代码  收藏代码
  1. /** 
  2.      * 初始化connection和session 
  3.      *  
  4.      * @throws Exception 
  5.      */  
  6.     private void init(/* String mqIp,boolean transacted */throws Exception {  
  7.         if (!DialogUtil.verifyTFEmpty(serverIpTextField, "服务器ip")) {  
  8.             return;  
  9.         }  
  10.         String transactedStr = transactedTextField.getText();  
  11.         boolean transacted = false;  
  12.         if (ValueWidget.isNullOrEmpty(transactedStr)) {  
  13.             transacted = false;  
  14.         } else {  
  15.             transacted = Boolean.parseBoolean(transactedStr);  
  16.         }  
  17.         String message = "transacted:" + transacted;  
  18.         ComponentUtil.appendResult(resultTextArea, message, true);  
  19.         System.out.println(message);  
  20.         String brokerUrl = String.format(BROKER_URL,  
  21.                 serverIpTextField.getText());  
  22.         // 创建链接工厂  
  23.         TopicConnectionFactory factory = new ActiveMQConnectionFactory(  
  24.                 ActiveMQConnection.DEFAULT_USER,  
  25.                 ActiveMQConnection.DEFAULT_PASSWORD, brokerUrl);  
  26.         ComponentUtil.appendResult(resultTextArea, "url:" + brokerUrl, true);  
  27.         // 通过工厂创建一个连接  
  28.           
  29.         connection = factory.createTopicConnection();  
  30.         // 启动连接  
  31.         connection.start();  
  32.         ComponentUtil.appendResult(resultTextArea, "启动connection 成功"true);  
  33.         // 创建一个session会话 transacted  
  34.         session = connection.createTopicSession(  
  35.                 transacted /* Boolean.FALSE */, Session.AUTO_ACKNOWLEDGE);  
  36.   
  37.     }  

 

 

项目源代码见附件mqtt_swing.zip

手机android客户端(测试推送)见附件android-mqtt-push-master.zip

也可以从 https://github.com/tokudu/AndroidPushNotificationsDemo

 

  下载

详细配置参阅附件mqtt推送详解.zip

 依赖的jar包

io0007-find_progess-0.0.8.4-SNAPSHOT.jar

io0007-find_progess-0.0.8.4-SNAPSHOT-sources.jar

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
开发与运维
使用钉钉扫一扫加入圈子
+ 订阅

集结各类场景实战经验,助你开发运维畅行无忧

其他文章