使用阿里云消息队列
控制台地址:http://ons.console.aliyun.com/#/home/topic

(1)生成Producer ID
点击"申请发布"

示例代码:
- package com.alibaba.ons.demo;
-
- import java.util.Properties;
-
- import com.aliyun.openservices.ons.api.Message;
- import com.aliyun.openservices.ons.api.ONSFactory;
- import com.aliyun.openservices.ons.api.Producer;
- import com.aliyun.openservices.ons.api.PropertyKeyConst;
- import com.aliyun.openservices.ons.api.SendResult;
-
- /**
-  * Minimal ONS producer demo: sends one message and prints the send result.
-  */
- public class ProducerClient {
-
-     /**
-      * Creates a producer from hard-coded properties, sends a single message
-      * and shuts the producer down again.
-      *
-      * @param args command-line arguments (unused)
-      */
-     public static void main(String[] args) {
-         Properties properties = new Properties();
-         properties.put(PropertyKeyConst.ProducerId, "PID_whuang");
-         properties.put(PropertyKeyConst.AccessKey, "请输入AccessKey");
-         properties.put(PropertyKeyConst.SecretKey, "请输入SecretKey");
-         Producer producer = ONSFactory.createProducer(properties);
-
-         // The producer is started before the first send.
-         producer.start();
-         try {
-             Message msg = new Message(
-                     // Topic the message is published to.
-                     "com_hbjltv",
-                     // Message tag.
-                     "TagA",
-                     // Message body; the charset is explicit so the bytes do not
-                     // depend on the platform default encoding.
-                     "Hello ONS".getBytes(java.nio.charset.StandardCharsets.UTF_8)
-             );
-
-             // Key attached to the message.
-             msg.setKey("ORDERID_100");
-
-             SendResult sendResult = producer.send(msg);
-             System.out.println(sendResult);
-         } finally {
-             // Shut down even when send() throws, so the producer is not leaked.
-             producer.shutdown();
-         }
-     }
- }
(2)生成Consumer ID
点击"申请订阅"

示例代码:
- /**
-  * Minimal ONS consumer demo: subscribes to the "com_hbjltv" topic and prints
-  * every message handed to the listener.
-  */
- public class ConsumerTest {
-
-     /**
-      * Builds a consumer from hard-coded properties, registers a printing
-      * listener and starts consuming.
-      *
-      * @param args command-line arguments (unused)
-      */
-     public static void main(String[] args) {
-         Properties config = new Properties();
-         config.put(PropertyKeyConst.ConsumerId, "CID_tv_mobile");
-         config.put(PropertyKeyConst.AccessKey, "请输入AccessKey");
-         config.put(PropertyKeyConst.SecretKey, "请输入SecretKey");
-
-         Consumer mqConsumer = ONSFactory.createConsumer(config);
-
-         // Listener that prints the message and then commits it.
-         MessageListener printingListener = new MessageListener() {
-             @Override
-             public Action consume(Message message, ConsumeContext context) {
-                 System.out.println("Receive: " + message);
-                 return Action.CommitMessage;
-             }
-         };
-
-         // "*" is the tag expression (presumably "all tags" - confirm against the ONS docs).
-         mqConsumer.subscribe("com_hbjltv", "*", printingListener);
-         mqConsumer.start();
-         System.out.println("Consumer Started");
-     }
- }
(3) clientId 的限制
阿里云消息队列对clientId的名称有严格限制:
(a)必须以申请的 Consumer ID 开头,后面紧跟 @@@,再接用于区分不同客户端的标识。
例如:
CID_tv_mobile@@@86458fd 是合法的
CID_tv_mobile@@86458fd 是非法的,因为只有两个@
(b)总长度不能超过23个字符
例如:
CID_tv_mobile@@@86458_A 是合法的(总长度正好是23个字符)
CID_tv_mobile@@@86458_Ab 是非法的,因为总长度为24个字符,超过了23个字符的限制

(4)在手机端(客户端)增加订阅逻辑
- package com.service;
-
- import java.security.InvalidKeyException;
- import java.security.NoSuchAlgorithmException;
-
- import org.eclipse.paho.client.mqttv3.MqttClient;
- import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
- import org.eclipse.paho.client.mqttv3.MqttException;
- import org.eclipse.paho.client.mqttv3.internal.MemoryPersistence;
-
- import android.app.Service;
- import android.content.Context;
- import android.content.Intent;
- import android.content.SharedPreferences;
- import android.os.IBinder;
-
- import com.common.util.SystemHWUtil;
- import com.dict.Constants3;
- import com.jianli.R;
- import com.push.PushCallback;
- import com.string.widget.util.ValueWidget;
- import com.util.MacSignature;
- import com.util.ShopUtil;
-
-
-
-
- /**
-  * Android service that connects to the MQTT push broker on a background
-  * thread and subscribes to the push topics.
-  *
-  * <p>The connection is attempted up to five times. The {@code mqttClient}
-  * and {@code online} fields are written by that background thread and read
-  * from the service lifecycle callbacks, hence they are {@code volatile}.
-  */
- public class MQTTService extends Service {
-
-     /** Broker URL format; the arguments are host and port. */
-     public static String BROKER_URL_FORMAT = "tcp://%s:%s";
-
-     /** Device identifier; assigned from {@code ShopUtil.getIMEI} in {@link #onStart}. */
-     public static String clientId = null;
-
-     /** Topic every client subscribes to. */
-     public static final String TOPIC = "com_hbjltv";
-
-     /** Upper bound applied to the total length of the broker client id. */
-     private static final int MAX_CLIENT_ID_LENGTH = 23;
-
-     /** Current client, or null when no connection attempt has succeeded. */
-     private volatile MqttClient mqttClient;
-
-     /** True after the last connection attempt succeeded. */
-     private volatile boolean online = false;
-
-     /** Whether the Aliyun-specific client-id check, signing and topics are used. */
-     boolean isAliyun=false;
-
-     /** Binding is not supported; always returns null. */
-     @Override
-     public IBinder onBind(Intent intent) {
-         return null;
-     }
-
-     @Override
-     public void onCreate() {
-         super.onCreate();
-     }
-
-     /** Creates a client for the given broker URL that uses in-memory persistence. */
-     private MqttClient createMqttClient(String serverURL, String clientId) throws MqttException{
-         return new MqttClient(serverURL, clientId,
-                 new MemoryPersistence());
-     }
-
-     /**
-      * Builds the broker client id: consumer id, "@@@", then as much of the
-      * device id as fits within {@link #MAX_CLIENT_ID_LENGTH} characters.
-      *
-      * @param deviceId device identifier; null is treated as empty
-      * @return the client id to connect with
-      */
-     private static String buildBrokerClientId(String deviceId) {
-         String prefix = Constants3.CONSUMER_ID_TV + "@@@";
-         // Never negative, so substring() cannot throw if the prefix alone is too long.
-         int remainingLength = Math.max(0, MAX_CLIENT_ID_LENGTH - prefix.length());
-         // NOTE(review): ShopUtil.getIMEI may return null on some devices - confirm.
-         String suffix = deviceId == null ? "" : deviceId;
-         if (suffix.length() > remainingLength) {
-             suffix = suffix.substring(0, remainingLength);
-         }
-         return prefix + suffix;
-     }
-
-     /** Returns true when the service is marked online and the client reports a live connection. */
-     private boolean isConnected() {
-         // Snapshot the field: another thread may set it to null at any time.
-         MqttClient client = mqttClient;
-         return isOnline() && client != null && client.isConnected();
-     }
-
-     /**
-      * Creates a new client, connects it and subscribes to the push topics.
-      *
-      * @param serverURL      broker URL
-      * @param clientId       client id to connect with
-      * @param isAllowOffline when true the session is not a clean session
-      * @param username       user name; null or empty means none
-      * @param password       password; in Aliyun mode it is used as the signing key
-      * @return true when connected and subscribed; false when, in Aliyun mode,
-      *         {@code ShopUtil.validateClientId} rejected the client id and no
-      *         connection was attempted
-      * @throws MqttException when creating, connecting or subscribing fails
-      */
-     private boolean connectAndSubscribe(String serverURL, String clientId,
-             boolean isAllowOffline, String username,
-             String password) throws MqttException {
-
-         if (isAliyun && !ShopUtil.validateClientId(getApplicationContext(), clientId)) {
-             return false;
-         }
-         MqttClient client = createMqttClient(serverURL, clientId);
-         mqttClient = client;
-
-         MqttConnectOptions options = new MqttConnectOptions();
-         options.setCleanSession(!isAllowOffline);
-         if (ValueWidget.isNullOrEmpty(username)) {
-             username = null;
-         }
-         if (isAliyun) {
-             try {
-                 // The password sent to the broker is the signature of the consumer id.
-                 password = MacSignature.macSignature(Constants3.CONSUMER_ID_TV, password);
-             } catch (InvalidKeyException e) {
-                 e.printStackTrace();
-             } catch (NoSuchAlgorithmException e) {
-                 e.printStackTrace();
-             }
-         }
-         if (!ValueWidget.isNullOrEmpty(password)) {
-             options.setPassword(password.toCharArray());
-         }
-         options.setUserName(username);
-         options.setConnectionTimeout(10);
-         options.setKeepAliveInterval(10);
-
-         client.setCallback(new PushCallback(this));
-         client.connect(options);
-
-         if (isAliyun) {
-             final String p2ptopic = TOPIC+"/p2p/";
-             client.subscribe(new String[] { TOPIC, p2ptopic });
-         } else {
-             client.subscribe(new String[] { TOPIC, clientId });
-         }
-         return true;
-     }
-
-     /**
-      * Reads the push settings and starts a background thread that connects
-      * to the broker, retrying up to five times.
-      *
-      * @param intent  start intent; may be null when the system recreates a
-      *                sticky service, in which case "isRestart" is false
-      * @param startId start request id (unused)
-      */
-     @Override
-     public void onStart(Intent intent, int startId) {
-         final boolean isRestart = intent != null
-                 && intent.getBooleanExtra("isRestart", false);
-         ShopUtil.logger2("restart MQTT service:"+isRestart);
-
-         Context context = getApplicationContext();
-         clientId = ShopUtil.getIMEI(context);
-
-         SharedPreferences preferences = context
-                 .getSharedPreferences(Constants3.SHAREDPREFERENCES_NAME,
-                         Context.MODE_PRIVATE);
-         final String ip ="mqtt.ons.aliyun.com";
-         final String port = preferences.getString("pushserver_port", "1883");
-         isAliyun=SystemHWUtil.parse2Boolean(preferences.getString("is_aliyun_mq_ONS", "false"));
-
-         System.out.println("push ip:"+ip);
-         new Thread(new Runnable() {
-
-             /** Remaining connection attempts. */
-             private int tryTime = 5;
-
-             /**
-              * Forces one reconnect on restart. Cleared after the first
-              * successful connect so a restart does not reconnect on every
-              * remaining attempt.
-              */
-             private boolean forceReconnect = isRestart;
-
-             @Override
-             public void run() {
-                 System.out.println(tryTime+","+mqttClient+","+isOnline() );
-                 while (tryTime > 0 && (forceReconnect || !isConnected())) {
-                     try {
-                         ShopUtil.logger2("start push service");
-                         ShopUtil.logger2("push server:"+ip);
-                         String clientId2 = buildBrokerClientId(clientId);
-                         boolean connected = connectAndSubscribe(String.format(
-                                 MQTTService.BROKER_URL_FORMAT, ip, port),
-                                 clientId2, true, "", "");
-                         if (!connected) {
-                             // The same id would be rejected again, so retrying is pointless.
-                             setOnline(false);
-                             ShopUtil.logger2("invalid clientId:" + clientId2);
-                             break;
-                         }
-                         ShopUtil.logger2("clientId:" + clientId2);
-                         ShopUtil.logger2("succeed to connect to activeMQ");
-                         setOnline(true);
-                         forceReconnect = false;
-                     } catch (MqttException e) {
-                         setOnline(false);
-                         mqttClient=null;
-                         e.printStackTrace();
-                         ShopUtil.logger2("抛异常:"+e.getMessage());
-                         ShopUtil.logger2("ip:" + ip + " ,port:" + port);
-                         try {
-                             // Wait before the next attempt.
-                             Thread.sleep(10000);
-                         } catch (InterruptedException e1) {
-                             // Preserve the interrupt and stop retrying.
-                             Thread.currentThread().interrupt();
-                             break;
-                         }
-                     }
-                     tryTime--;
-                 }
-             }
-         }).start();
-     }
-
-     /**
-      * Marks the service offline, disconnects the client if there is one and
-      * broadcasts the "com.dbjtech.waiqin.destroy" intent.
-      */
-     @Override
-     public void onDestroy() {
-         setOnline(false);
-
-         // The field is null when no attempt succeeded or the last one failed.
-         MqttClient client = mqttClient;
-         mqttClient = null;
-         ShopUtil.logger2("MQTTService destory");
-         if (client != null) {
-             try {
-                 client.disconnect(0);
-             } catch (MqttException e) {
-                 e.printStackTrace();
-             }
-         }
-         stopForeground(true);
-         Intent intent = new Intent("com.dbjtech.waiqin.destroy");
-         sendBroadcast(intent);
-     }
-
-     /** Returns whether the last connection attempt succeeded. */
-     public boolean isOnline() {
-         return online;
-     }
-
-     /** Records whether the service currently considers itself connected. */
-     public void setOnline(boolean online) {
-         this.online = online;
-     }
-
-     /**
-      * Delegates to the default implementation (which dispatches to
-      * {@link #onStart}) and asks the system to keep the service sticky.
-      *
-      * @return always {@code START_STICKY}
-      */
-     @Override
-     public int onStartCommand(Intent intent, int flags, int startId) {
-         // Assigning to the flags parameter has no effect on stickiness;
-         // the return value is what the system honours.
-         super.onStartCommand(intent, flags, startId);
-         return START_STICKY;
-     }
- }