使用activeMQ进行Android推送
activeMQ下载地址:http://activemq.apache.org/download.html
下载后是一个压缩包:apache-activemq-5.9.0-bin.zip
启动方式:
解压缩,进入apache-activemq-5.9.0-bin\apache-activemq-5.9.0\bin,双击activemq.bat,即可启动activeMQ服务
启动之后:
android客户端推送采用mqtt(paho-mqtt-client-1.0.1.jar),依赖包见附件
但是为了测试,我写了一个swing图形界面,充当手机客户端,依赖的jar包仍然是paho-mqtt-client-1.0.1.jar.界面如下:
使用方法:点击[启动]按钮,开始接收推送消息
对应的主类是:MqttSwing,用于接收推送消息.
我还写了一个发送推送消息的swing图形界面,充当推送后管系统,界面如下:
使用方法:点击[连接]按钮,才可以发送推送消息
对应的主类:PusherApp,用于发送推送消息.
核心代码介绍如下.
客户端连接activeMQ,建立连接(只有建立连接,才能接收到推送消息)
方法名:connect,做了两件事:(1)建立连接;(2)订阅主题(topic)
- /***
- * 客户端和activeMQ服务器建立连接
- * @param BROKER_URL
- * @param clientId : 用于标识客户端,相当于ios中的device token
- * @param TOPIC
- * @param isCleanSession :false--可以接受离线消息;
- * @return 是否启动成功
- */
- private boolean connect(String BROKER_URL,String clientId,String TOPIC,boolean isCleanSession){
- try {
- ComponentUtil.appendResult(resultTextPane, "connect time:"+TimeHWUtil.getCurrentMiniuteSecond(), true);
- mqttClient = new MqttClient(BROKER_URL, clientId, new MemoryPersistence());
- MqttConnectOptions options= new MqttConnectOptions();
- options.setCleanSession(isCleanSession);//mqtt receive offline message
- ComponentUtil.appendResult(resultTextPane, "isCleanSession:"+isCleanSession, true);
- options.setKeepAliveInterval(30);
- //推送回调类,在此类中处理消息,用于消息监听
- mqttClient.setCallback(new MyCallBack(MqttSwing.this));
- boolean isSuccess=false;
- try {
- mqttClient.connect(options);//CLIENT ID CAN NOT BE SAME
- isSuccess=true;
- } catch (Exception e) {
- if(isPrintException){
- e.printStackTrace();
- }
- }
- if(!isSuccess){
- String message="连接失败,请检查client id是否重复了 或者activeMQ是否启动";
- ComponentUtil.appendResult(resultTextPane, message, true);
- GUIUtil23.warningDialog(message);
- return false;
- }else{
- //Subscribe to topics
- mqttClient.subscribe(new String[]{TOPIC,clientId});
- System.out.println("topic:"+TOPIC+", "+(clientId));
- ComponentUtil.appendResult(resultTextPane, "TOPIC:"+TOPIC+", "+(clientId), true);
- }
- } catch (MqttException e) {
- if(isPrintException){
- e.printStackTrace();}
- GUIUtil23.errorDialog(e.getMessage());
- return false;
- }
- return true;
- }
推送消息到来时的回调类:MyCallBack
- package com.mqtt.hw.callback;
- import org.apache.commons.lang.StringEscapeUtils;
- import org.eclipse.paho.client.mqttv3.MqttCallback;
- import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
- import org.eclipse.paho.client.mqttv3.MqttMessage;
- import org.eclipse.paho.client.mqttv3.MqttTopic;
- import com.mqtt.hw.MqttSwing;
- import com.time.util.TimeHWUtil;
- public class MyCallBack implements MqttCallback {
- private MqttSwing mqttSwing;
- public MyCallBack(MqttSwing mqttSwing) {
- super();
- this.mqttSwing = mqttSwing;
- }
- @Override
- public void connectionLost(Throwable cause) {
- }
- @Override
- public void messageArrived(MqttTopic topic, MqttMessage message)
- throws Exception {
- System.out.println("messageArrived...."+TimeHWUtil.getCurrentMiniuteSecond());
- String messageStr=StringEscapeUtils.unescapeHtml(new String(message.getPayload()));
- System.out.println("message:"+messageStr);
- this.mqttSwing.receiveMessage(messageStr);
- //使窗口处于激活状态
- }
- @Override
- public void deliveryComplete(MqttDeliveryToken token) {
- }
- }
推送者与activeMQ建立连接:
- /**
- * 初始化connection和session
- *
- * @throws Exception
- */
- private void init(/* String mqIp,boolean transacted */) throws Exception {
- if (!DialogUtil.verifyTFEmpty(serverIpTextField, "服务器ip")) {
- return;
- }
- String transactedStr = transactedTextField.getText();
- boolean transacted = false;
- if (ValueWidget.isNullOrEmpty(transactedStr)) {
- transacted = false;
- } else {
- transacted = Boolean.parseBoolean(transactedStr);
- }
- String message = "transacted:" + transacted;
- ComponentUtil.appendResult(resultTextArea, message, true);
- System.out.println(message);
- String brokerUrl = String.format(BROKER_URL,
- serverIpTextField.getText());
- // 创建链接工厂
- TopicConnectionFactory factory = new ActiveMQConnectionFactory(
- ActiveMQConnection.DEFAULT_USER,
- ActiveMQConnection.DEFAULT_PASSWORD, brokerUrl);
- ComponentUtil.appendResult(resultTextArea, "url:" + brokerUrl, true);
- // 通过工厂创建一个连接
- connection = factory.createTopicConnection();
- // 启动连接
- connection.start();
- ComponentUtil.appendResult(resultTextArea, "启动connection 成功", true);
- // 创建一个session会话 transacted
- session = connection.createTopicSession(
- transacted /* Boolean.FALSE */, Session.AUTO_ACKNOWLEDGE);
- }
项目源代码见附件mqtt_swing.zip
手机android客户端(测试推送)见附件android-mqtt-push-master.zip
也可以从 https://github.com/tokudu/AndroidPushNotificationsDemo
下载
详细配置参阅附件mqtt推送详解.zip
依赖的jar包
io0007-find_progess-0.0.8.4-SNAPSHOT.jar
io0007-find_progess-0.0.8.4-SNAPSHOT-sources.jar