activeMQ 推送之mqtt客户端

简介:

使用activeMQ进行Android推送

activeMQ下载地址:http://activemq.apache.org/download.html

下载后是一个压缩包:apache-activemq-5.9.0-bin.zip

启动方式:

解压缩,进入apache-activemq-5.9.0-bin\apache-activemq-5.9.0\bin,双击activemq.bat,即可启动activeMQ服务

 启动之后:

 android客户端推送采用mqtt(paho-mqtt-client-1.0.1.jar),依赖包见附件

 

但是为了测试,我写了一个swing图形界面,充当手机客户端,依赖的jar包仍然是paho-mqtt-client-1.0.1.jar.界面如下:

使用方法:点击[启动]按钮,开始接收推送消息
 对应的主类是:
MqttSwing,用于接收推送消息.

 

我还写了一个发送推送消息的swing图形界面,充当推送后管系统,界面如下:

使用方法:点击[连接]按钮,才可以发送推送消息
 对应的主类:PusherApp,用于发送推送消息.

核心代码介绍如下.

客户端连接activeMQ,建立连接(只有建立连接,才能接收到推送消息)

方法名:connect,做了两件事:(1)建立连接;(2)订阅主题(topic)

 

Java代码   收藏代码
  1. /*** 
  2.      * 客户端和activeMQ服务器建立连接 
  3.      * @param BROKER_URL 
  4.      * @param clientId : 用于标识客户端,相当于ios中的device token 
  5.      * @param TOPIC 
  6.      * @param isCleanSession :false--可以接受离线消息; 
  7.      * @return 是否启动成功  
  8.      */  
  9.     private boolean connect(String BROKER_URL,String clientId,String TOPIC,boolean isCleanSession){  
  10.         try {  
  11.             ComponentUtil.appendResult(resultTextPane, "connect time:"+TimeHWUtil.getCurrentMiniuteSecond(), true);  
  12.             mqttClient = new MqttClient(BROKER_URL, clientId, new MemoryPersistence());  
  13.             MqttConnectOptions options= new MqttConnectOptions();  
  14.             options.setCleanSession(isCleanSession);//mqtt receive offline message  
  15.             ComponentUtil.appendResult(resultTextPane, "isCleanSession:"+isCleanSession, true);  
  16.             options.setKeepAliveInterval(30);  
  17.             //推送回调类,在此类中处理消息,用于消息监听  
  18.             mqttClient.setCallback(new MyCallBack(MqttSwing.this));  
  19.             boolean isSuccess=false;  
  20.             try {  
  21.                 mqttClient.connect(options);//CLIENT ID CAN NOT BE SAME  
  22.                 isSuccess=true;  
  23.             } catch (Exception e) {  
  24.                 if(isPrintException){  
  25.                     e.printStackTrace();  
  26.                 }  
  27.             }  
  28.             if(!isSuccess){  
  29.                 String message="连接失败,请检查client id是否重复了 或者activeMQ是否启动";  
  30.                 ComponentUtil.appendResult(resultTextPane, message, true);  
  31.                 GUIUtil23.warningDialog(message);  
  32.                 return false;  
  33.             }else{  
  34.             //Subscribe to topics   
  35.                 mqttClient.subscribe(new String[]{TOPIC,clientId});  
  36.                  
  37.                 System.out.println("topic:"+TOPIC+",  "+(clientId));  
  38.                 ComponentUtil.appendResult(resultTextPane, "TOPIC:"+TOPIC+",  "+(clientId), true);  
  39.             }  
  40.   
  41.         } catch (MqttException e) {  
  42.             if(isPrintException){  
  43.             e.printStackTrace();}  
  44.             GUIUtil23.errorDialog(e.getMessage());  
  45.             return false;  
  46.         }  
  47.         return true;  
  48.     }  

 

 

推送消息到来时的回调类:MyCallBack

 

Java代码   收藏代码
  1. package com.mqtt.hw.callback;  
  2.   
  3. import org.apache.commons.lang.StringEscapeUtils;  
  4. import org.eclipse.paho.client.mqttv3.MqttCallback;  
  5. import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;  
  6. import org.eclipse.paho.client.mqttv3.MqttMessage;  
  7. import org.eclipse.paho.client.mqttv3.MqttTopic;  
  8.   
  9. import com.mqtt.hw.MqttSwing;  
  10. import com.time.util.TimeHWUtil;  
  11.   
  12. public class MyCallBack implements MqttCallback {  
  13.     private MqttSwing mqttSwing;  
  14.       
  15.       
  16.       
  17.     public MyCallBack(MqttSwing mqttSwing) {  
  18.         super();  
  19.         this.mqttSwing = mqttSwing;  
  20.     }  
  21.   
  22.     @Override  
  23.     public void connectionLost(Throwable cause) {  
  24.           
  25.     }  
  26.   
  27.     @Override  
  28.     public void messageArrived(MqttTopic topic, MqttMessage message)  
  29.             throws Exception {  
  30.         System.out.println("messageArrived...."+TimeHWUtil.getCurrentMiniuteSecond());  
  31.         String messageStr=StringEscapeUtils.unescapeHtml(new String(message.getPayload()));  
  32.         System.out.println("message:"+messageStr);  
  33.         this.mqttSwing.receiveMessage(messageStr);  
  34.         //使窗口处于激活状态  
  35.   
  36.     }  
  37.   
  38.     @Override  
  39.     public void deliveryComplete(MqttDeliveryToken token) {  
  40.           
  41.     }  
  42.   
  43. }  

 

 

推送者与activeMQ建立连接:

 

Java代码   收藏代码
  1. /** 
  2.      * 初始化connection和session 
  3.      *  
  4.      * @throws Exception 
  5.      */  
  6.     private void init(/* String mqIp,boolean transacted */throws Exception {  
  7.         if (!DialogUtil.verifyTFEmpty(serverIpTextField, "服务器ip")) {  
  8.             return;  
  9.         }  
  10.         String transactedStr = transactedTextField.getText();  
  11.         boolean transacted = false;  
  12.         if (ValueWidget.isNullOrEmpty(transactedStr)) {  
  13.             transacted = false;  
  14.         } else {  
  15.             transacted = Boolean.parseBoolean(transactedStr);  
  16.         }  
  17.         String message = "transacted:" + transacted;  
  18.         ComponentUtil.appendResult(resultTextArea, message, true);  
  19.         System.out.println(message);  
  20.         String brokerUrl = String.format(BROKER_URL,  
  21.                 serverIpTextField.getText());  
  22.         // 创建链接工厂  
  23.         TopicConnectionFactory factory = new ActiveMQConnectionFactory(  
  24.                 ActiveMQConnection.DEFAULT_USER,  
  25.                 ActiveMQConnection.DEFAULT_PASSWORD, brokerUrl);  
  26.         ComponentUtil.appendResult(resultTextArea, "url:" + brokerUrl, true);  
  27.         // 通过工厂创建一个连接  
  28.           
  29.         connection = factory.createTopicConnection();  
  30.         // 启动连接  
  31.         connection.start();  
  32.         ComponentUtil.appendResult(resultTextArea, "启动connection 成功"true);  
  33.         // 创建一个session会话 transacted  
  34.         session = connection.createTopicSession(  
  35.                 transacted /* Boolean.FALSE */, Session.AUTO_ACKNOWLEDGE);  
  36.   
  37.     }  

 

 

项目源代码见附件mqtt_swing.zip

手机android客户端(测试推送)见附件android-mqtt-push-master.zip

也可以从 https://github.com/tokudu/AndroidPushNotificationsDemo

 

  下载

详细配置参阅附件mqtt推送详解.zip

 依赖的jar包

io0007-find_progess-0.0.8.4-SNAPSHOT.jar

io0007-find_progess-0.0.8.4-SNAPSHOT-sources.jar

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件 开发工具 RocketMQ
消息队列 MQ产品使用合集之如何关闭客户端的日志记录
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 存储 监控
消息队列 MQ使用问题之客户端重启后仍然出现broker接收消息不均匀,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 存储 监控
ActiveMQ、RocketMQ、RabbitMQ、Kafka 的区别
【10月更文挑战第24天】ActiveMQ、RocketMQ、RabbitMQ 和 Kafka 都有各自的特点和优势,在不同的应用场景中发挥着重要作用。在选择消息队列时,需要根据具体的需求、性能要求、扩展性要求等因素进行综合考虑,选择最适合的消息队列技术。同时,随着技术的不断发展和演进,这些消息队列也在不断地更新和完善,以适应不断变化的应用需求。
119 1
|
6月前
|
消息中间件 安全 PHP
消息队列 MQ使用问题之如何获取PHP客户端代码
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
安全 网络性能优化
MQTT 客户端 MQTT.fx 使用说明
MQTT 客户端 MQTT.fx 使用说明
460 0
|
6月前
|
消息中间件 JavaScript Linux
消息队列 MQ操作报错合集之客户端在启动时遇到了连接错误,是什么原因
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 存储 负载均衡
消息队列 MQ使用问题之如何在grpc客户端中设置负载均衡器
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 NoSQL Kafka
消息中间件(RocketMQ、RabbitMQ、ActiveMQ、Redis、kafka、ZeroMQ)以及之间的区别
消息中间件(RocketMQ、RabbitMQ、ActiveMQ、Redis、kafka、ZeroMQ)以及之间的区别
|
7月前
|
消息中间件 Serverless 网络性能优化
消息队列 MQ产品使用合集之客户端和服务器之间的保活心跳检测间隔是怎么设置的
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。