使用阿里云消息队列
控制台地址:http://ons.console.aliyun.com/#/home/topic
(1)生成Producer ID
点击"申请发布"
示例代码:
- package com.alibaba.ons.demo;
- import java.util.Properties;
- import com.aliyun.openservices.ons.api.Message;
- import com.aliyun.openservices.ons.api.ONSFactory;
- import com.aliyun.openservices.ons.api.Producer;
- import com.aliyun.openservices.ons.api.PropertyKeyConst;
- import com.aliyun.openservices.ons.api.SendResult;
- /**
-  * Minimal ONS producer demo: creates a producer from the account credentials,
-  * sends a single message to the {@code com_hbjltv} topic and prints the send result.
-  */
- public class ProducerClient {
-     /**
-      * Sends one demo message and shuts the producer down again.
-      *
-      * @param args unused
-      */
-     public static void main(String[] args) {
-         Properties properties = new Properties();
-         properties.put(PropertyKeyConst.ProducerId, "PID_whuang");
-         properties.put(PropertyKeyConst.AccessKey, "请输入AccessKey");
-         properties.put(PropertyKeyConst.SecretKey, "请输入SecretKey");
-         Producer producer = ONSFactory.createProducer(properties);
-         // start() must be called before the first send; calling it once is enough.
-         producer.start();
-         try {
-             Message msg = new Message(
-                     // Message topic
-                     "com_hbjltv",
-                     // Message tag: comparable to a Gmail label; it sub-classifies messages so
-                     // that a consumer can specify a filter that is applied on the ONS server.
-                     "TagA",
-                     // Message body: arbitrary binary data that ONS does not touch. Producer and
-                     // consumer must agree on the (de)serialization, so the charset is spelled
-                     // out here instead of relying on the platform default.
-                     "Hello ONS".getBytes(java.nio.charset.Charset.forName("UTF-8")));
-             // Business key of the message; keep it as globally unique as possible so that a
-             // message that was not received can be looked up and re-sent from the ONS console.
-             // Leaving it unset does not affect normal sending and receiving.
-             msg.setKey("ORDERID_100");
-             // The send succeeded as long as no exception is thrown.
-             SendResult sendResult = producer.send(msg);
-             System.out.println(sendResult);
-         } finally {
-             // Release the producer before the application exits, even when send() threw.
-             producer.shutdown();
-         }
-     }
- }
(2)生成Consumer ID
点击"申请订阅"
示例代码:
- /**
-  * Minimal ONS consumer demo: subscribes to the {@code com_hbjltv} topic with the
-  * tag expression {@code "*"} and prints every message handed to the listener.
-  */
- public class ConsumerTest {
-     /**
-      * Creates the consumer, registers a printing listener and starts consuming.
-      *
-      * @param args unused
-      */
-     public static void main(String[] args) {
-         Properties config = new Properties();
-         config.put(PropertyKeyConst.ConsumerId, "CID_tv_mobile");
-         config.put(PropertyKeyConst.AccessKey, "请输入AccessKey");
-         config.put(PropertyKeyConst.SecretKey, "请输入SecretKey");
-
-         Consumer consumer = ONSFactory.createConsumer(config);
-
-         // Listener that prints each message and commits it.
-         MessageListener printingListener = new MessageListener() {
-             @Override
-             public Action consume(Message message, ConsumeContext context) {
-                 System.out.println("Receive: " + message);
-                 return Action.CommitMessage;
-             }
-         };
-
-         consumer.subscribe("com_hbjltv", "*", printingListener);
-         consumer.start();
-         System.out.println("Consumer Started");
-     }
- }
(3) clientId 的限制
阿里云消息队列对clientId的名称有严格限制:
(a)必须以申请的Consumer ID 开头,后面跟@@@,接着跟用于区分客户端的标志,
例如:
CID_tv_mobile@@@86458fd 是合法的
CID_tv_mobile@@86458fd 是非法的,因为只有两个@
(b)总长度不能超过23个字符
例如:
CID_tv_mobile@@@86458_A 是合法的,因为总长度正好是23个字符
CID_tv_mobile@@@86458_Ab 是非法的,因为总长度为24个字符,超过了23个字符
(4)在手机端(客户端)增加订阅逻辑
- package com.service;
- import java.security.InvalidKeyException;
- import java.security.NoSuchAlgorithmException;
- import org.eclipse.paho.client.mqttv3.MqttClient;
- import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
- import org.eclipse.paho.client.mqttv3.MqttException;
- import org.eclipse.paho.client.mqttv3.internal.MemoryPersistence;
- import android.app.Service;
- import android.content.Context;
- import android.content.Intent;
- import android.content.SharedPreferences;
- import android.os.IBinder;
- import com.common.util.SystemHWUtil;
- import com.dict.Constants3;
- import com.jianli.R;
- import com.push.PushCallback;
- import com.string.widget.util.ValueWidget;
- import com.util.MacSignature;
- import com.util.ShopUtil;
- /**
- * @author Dominik Obermaier
- */
- public class MQTTService extends Service {
- // public static final String BROKER_URL =
- // "tcp://broker.mqttdashboard.com:1883";
- // public static String BROKER_URL = "tcp://172.16.15.50:1883";
- public static String BROKER_URL_FORMAT = "tcp://%s:%s";
- // public static final String BROKER_URL = "tcp://test.mosquitto.org:1883";
- /*
- * In a real application, you should get an Unique Client ID of the device
- * and use this, see
- * http://android-developers.blogspot.de/2011/03/identifying
- * -app-installations.html
- */
- public static String clientId = null;
- /**
- * 不能含有英文句点,可以包含下划线
- */
- public static final String TOPIC = "com_hbjltv";
- private MqttClient mqttClient;
- // private String ip="182.92.80.122";
- /***
- * 是否连接上activeMQ
- */
- private boolean online = false;
- boolean isAliyun=false;
- public IBinder onBind(Intent intent) {
- return null;
- }
- @Override
- public void onCreate() {
- super.onCreate();
- }
- private MqttClient createMqttClient(String serverURL, String clientId) throws MqttException{
- return new MqttClient(serverURL, clientId,
- new MemoryPersistence());
- }
- /***
- *
- * @param serverURL
- * @param clientId
- * : 最大长度:23
- * @param isAllowOffline
- * @param username
- * @param password
- * @throws MqttException
- */
- private void connectAndSubscribe(String serverURL, String clientId,
- /* String topicFilter, */boolean isAllowOffline, String username,
- String password) throws MqttException {
- if(isAliyun){
- if(!ShopUtil.validateClientId(getApplicationContext(), clientId)){
- return;
- }
- }
- mqttClient = createMqttClient(serverURL, clientId);
- MqttConnectOptions options = new MqttConnectOptions();
- options.setCleanSession(!isAllowOffline);// mqtt receive offline message
- if (ValueWidget.isNullOrEmpty(username)) {
- username = null;
- }
- String sign=null;
- if(isAliyun){
- try {
- sign = MacSignature.macSignature(Constants3.CONSUMER_ID_TV, password);
- password=sign;
- } catch (InvalidKeyException e) {
- e.printStackTrace();
- } catch (NoSuchAlgorithmException e) {
- e.printStackTrace();
- }
- }
- if (ValueWidget.isNullOrEmpty(password)) {
- password = null;
- } else {
- options.setPassword(password.toCharArray());
- }
- options.setUserName(username);
- options.setConnectionTimeout(10);
- options.setKeepAliveInterval(10);
- if(null==mqttClient){//点击HOME键,过很长时间,再点击应用时,mqttClient为null
- mqttClient = createMqttClient(serverURL, clientId);
- }
- mqttClient.setCallback(new PushCallback(this));
- boolean isSuccess=false;
- mqttClient.connect(options);
- isSuccess=true;
- // Subscribe to all subtopics of homeautomation
- // mqttClient.subscribe(topicFilter);
- if(null==mqttClient){//点击HOME键,过很长时间,再点击应用时,mqttClient为null
- mqttClient = createMqttClient(serverURL, clientId);
- }
- if(isAliyun){
- final String p2ptopic = TOPIC+"/p2p/";
- //同时订阅两个topic,一个是基于标准mqtt协议的发布订阅模式,一个是扩展的点对点推送模式
- final String[] topicFilters=new String[]{TOPIC,p2ptopic};
- mqttClient.subscribe(topicFilters);
- }else{
- mqttClient.subscribe(new String[] { TOPIC, clientId });
- }
- }
- @Override
- public void onStart(Intent intent, int startId) {
- final boolean isRestart=intent.getBooleanExtra("isRestart", false);
- ShopUtil.logger2("restart MQTT service:"+isRestart);
- // super.onStart(intent, startId);
- // if (intent == null) {//重启服务时intent 确实为空
- // Log.d(Constants.LOG_TAG, "intent is null");
- // return;
- // }
- Context context = getApplicationContext();
- clientId = ShopUtil.getIMEI(context);
- // Bundle bundle=intent.getExtras();
- // String ip=bundle.getString(Constants.ACTIVEMQ_IP);
- // final String ip = context.getString(R.string.pushserver_ip);
- SharedPreferences preferences = getApplicationContext()
- .getSharedPreferences(Constants3.SHAREDPREFERENCES_NAME,
- Context.MODE_PRIVATE);
- final String ip ="mqtt.ons.aliyun.com";// preferences.getString("pushserver_ip", context.getString(R.string.pushserver_ip));
- final String port = preferences.getString("pushserver_port", "1883");
- isAliyun=SystemHWUtil.parse2Boolean(preferences.getString("is_aliyun_mq_ONS", "false"));
- // String topic=bundle.getString(Constants.ACTIVEMQ_TOPIC);
- System.out.println("push ip:"+ip);
- new Thread(new Runnable() {
- /****
- * 尝试连接的次数,为什么要尝试连接几次那?
- * (1)无wifi时启动,则肯定连接失败,所以尝试连接三次,只要在这个期间启动wifi就可以连接上activeMQ;<br />
- * (2)之前连接上,然后断开wifi,然后又启动wifi,<br />
- * 这时容易报 "Broker unavailable"异常,暂时不清楚原因,所以也需要继续尝试连接;<br />
- *
- */
- private int tryTime = 5;
- @Override
- public void run() {
- System.out.println(tryTime+","+mqttClient+","+isOnline() );
- while (tryTime > 0
- && (!isOnline() || mqttClient == null || (!mqttClient
- .isConnected())||isRestart)) {
- try {
- ShopUtil.logger2("start push service");
- ShopUtil.logger2("push server:"+ip);
- String prefix=Constants3.CONSUMER_ID_TV+"@@@";
- int remainingLength=23-prefix.length();
- String suffix=null;
- if(clientId.length()>remainingLength){
- suffix=clientId.substring(0,remainingLength);
- }else{
- suffix=clientId;
- }
- String clientId2=prefix+suffix;
- connectAndSubscribe(String.format(
- MQTTService.BROKER_URL_FORMAT, ip, port),
- clientId2, /* topic, */true, ""/*自己申请的access key*/, ""/*secret*/);
- ShopUtil.logger2("clientId:" + clientId2);
- ShopUtil.logger2("succeed to connect to activeMQ");
- setOnline(true);
- } catch (MqttException e) {
- setOnline(false);
- mqttClient=null;
- e.printStackTrace();
- ShopUtil.logger2("抛异常:"+e.getMessage());
- ShopUtil.logger2("ip:" + ip + " ,port:" + port);
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e1) {
- e1.printStackTrace();
- }
- }
- tryTime--;
- }
- }
- }).start();
- // new Thread(new Runnable() {
- // @Override
- // public void run() {
- // System.out.println("start:"+System.currentTimeMillis());
- // try {
- // Thread.sleep(10000);
- // } catch (InterruptedException e) {
- // e.printStackTrace();
- // }
- // while(true){
- // try {
- // Thread.sleep(10000);
- // if(mqttClient!=null&& !mqttClient.isConnected()){
- // System.out.println("disConnected:"+System.currentTimeMillis());
- // }
- // } catch (InterruptedException e) {
- // e.printStackTrace();
- // }
- // }
- // }
- // }).start();
- }
- @Override
- public void onDestroy() {
- setOnline(false);
- try {
- ShopUtil.logger2("MQTTService destory");
- mqttClient.disconnect(0);
- } catch (MqttException e) {
- // Toast.makeText(getApplicationContext(),
- // "Something went wrong!" + e.getMessage(), Toast.LENGTH_LONG)
- // .show();
- e.printStackTrace();
- }
- mqttClient = null;
- stopForeground(true);
- Intent intent = new Intent("com.dbjtech.waiqin.destroy");
- sendBroadcast(intent);
- }
- public boolean isOnline() {
- return online;
- }
- public void setOnline(boolean online) {
- this.online = online;
- }
- @Override
- public int onStartCommand(Intent intent, int flags, int startId) {
- flags = START_STICKY;
- return super.onStartCommand(intent, flags, startId);
- }
- }