注意点1 ,设备证书的 SN 码如何获取 ,也就是绑定设备需要的
也就是这个
方法
查看设备证书 SN openssl x509 -noout -text -in device-1.crt
就是这个 ,去掉中间的冒号
代码实现
pom.xml 配置
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.1</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.61</version></dependency><dependency><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId><version>1.13</version></dependency>
证书准备
根证书root
物联网平台根证书,可以从官网文档中下载 https://help.aliyun.com/document_detail/73742.html
设备证书
设备私钥
代码实现
/* * Copyright © 2019 Alibaba. All rights reserved. */ package com.aliyun.iot.demo; import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.security.KeyFactory; import java.security.KeyStore; import java.security.PrivateKey; import java.security.cert.Certificate; import java.security.cert.CertificateFactory; import java.security.spec.PKCS8EncodedKeySpec; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.TrustManagerFactory; import org.apache.commons.codec.binary.Base64; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import com.alibaba.fastjson.JSONObject; /** * mqtt客户端直连阿里云物联网平台,基于eclipse paho开发 <br> * 基于X.509认证接入 https://help.aliyun.com/document_detail/140588.html */ public class IotMqttClientWithAuthByX509 { // ===================需要用户填写的参数开始=========================== // 站点id,根据实际站点获取对应id,https://help.aliyun.com/document_detail/40654.html private static String regionId = "cn-shanghai"; // 目前只支持上海,也就是华东2节点 // ===================需要用户填写的参数结束=========================== // 设备证书 private String certPath = ""; // 设备私钥,无密码 private String privateKeyPath = ""; // 密码固定为空 private String privateKeyPassword = ""; // X.509认证返回信息的topic,无需创建,无需订阅,直接使用 private static final String AUTH_TOPIC = "/ext/auth/identity/response"; // 设备productKey,用于接受平台下发的productKey,无需填写 private static String productKey = ""; // 设备deviceName,用于接受平台下发的deviceName,无需填写 private static String deviceName = ""; // mqtt客户端 private MqttClient sampleClient = null; /** * 建立mqtt连接 * * @param certPath 证书路径 * @param privateKeyPath 私钥路径 * @param privateKeyPassword 私钥密码,目前固定为空 */ public void connect(String certPath, String privateKeyPath, String privateKeyPassword) { this.certPath = certPath; this.privateKeyPath = privateKeyPath; this.privateKeyPassword = privateKeyPassword; // 接入域名 String broker = "ssl://x509.itls." + regionId + ".aliyuncs.com:1883"; // 表示客户端ID,建议使用设备的MAC地址或SN码,64字符内 String clientId = "."; // 只能securemode=2表示使用TLS String clientOpts = "|securemode=2|"; // mqtt接入客户端ID String mqttClientId = clientId + clientOpts; // 建立mqtt连接,使用证书认证,所以不需要username和password connect(broker, mqttClientId, "", ""); } /** * 建立mqtt连接 * * @param serverURL 连接服务器地址 * @param clientId mqtt接入客户端ID * @param username mqtt接入用户名 * @param password mqtt接入密码 */ protected void connect(String serverURL, String clientId, String username, String password) { try { MemoryPersistence persistence = new MemoryPersistence(); sampleClient = new MqttClient(serverURL, clientId, persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setMqttVersion(4);// MQTT 3.1.1 connOpts.setUserName(username);// 用户名 connOpts.setPassword(password.toCharArray());// 密码 connOpts.setSocketFactory(createSSLSocket()); // 使用TLS,需要下载根证书root.crt,设置securemode=2 connOpts.setCleanSession(false); // 不清理离线消息,qos=1的消息在设备离线期间会保存在云端 connOpts.setAutomaticReconnect(false); // 作为demo关闭自动重连,生产环境强烈建议开启自动重连 connOpts.setKeepAliveInterval(300); // 设置心跳,建议300秒 // 先设置回调,如果是先connect后设置回调,可能会导致消息到达时回调还没准备好,这样消息就丢失了 sampleClient.setCallback(new MqttCallback() { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { // 只处理X.509认证返回信息 if (AUTH_TOPIC.equals(topic)) { JSONObject json = JSONObject .parseObject(new String(message.getPayload(), StandardCharsets.UTF_8)); productKey = json.getString("productKey"); deviceName = json.getString("deviceName"); } else { // 其他下行消息处理,强烈建议另起线程处理,以免回调堵塞 } } @Override public void deliveryComplete(IMqttDeliveryToken token) { } @Override public void connectionLost(Throwable cause) { } }); System.out.println("Connecting to broker: " + serverURL); sampleClient.connect(connOpts); System.out.print("Connected: clientId=" + clientId); System.out.println(",username=" + username + ",password=" + password); } catch (MqttException e) { System.out.print("connect failed: clientId=" + clientId); System.out.println(",username=" + username + ",password=" + password); System.out.println("reason " + e.getReasonCode()); System.out.println("msg " + e.getMessage()); System.out.println("loc " + e.getLocalizedMessage()); System.out.println("cause " + e.getCause()); System.out.println("excep " + e); e.printStackTrace(); } catch (Exception e) { System.out.print("connect exception: clientId=" + clientId); System.out.println(",username=" + username + ",password=" + password); System.out.println("msg " + e.getMessage()); e.printStackTrace(); } } /** * 发布消息,默认qos=0 * * @param topic 发布消息的topic * @param payload 发布的消息内容 */ public void publish(String topic, String payload) { byte[] content = payload.getBytes(StandardCharsets.UTF_8); publish(topic, 0, content); } /** * 发布消息 * * @param topic 发布消息的topic * @param qos 消息等级,平台支持qos=0和qos=1,不支持qos=2 * @param payload 发布的消息内容 */ public void publish(String topic, int qos, byte[] payload) { MqttMessage message = new MqttMessage(payload); message.setQos(qos); try { sampleClient.publish(topic, message); System.out.println("Message published: topic=" + topic + ",qos=" + qos); } catch (MqttException e) { System.out.println("publish failed: topic=" + topic + ",qos=" + qos); System.out.println("reason " + e.getReasonCode()); System.out.println("msg " + e.getMessage()); System.out.println("loc " + e.getLocalizedMessage()); System.out.println("cause " + e.getCause()); System.out.println("excep " + e); e.printStackTrace(); } } protected SSLSocketFactory createSSLSocket() throws Exception { // CA根证书,可以从官网下载 https://help.aliyun.com/document_detail/73742.html // 设备证书,可以从控制台设备信息下载 // CA certificate is used to authenticate server InputStream in = IotMqttClientWithAuthByX509.class.getResourceAsStream("/root.crt"); CertificateFactory cf = CertificateFactory.getInstance("X.509"); Certificate ca = cf.generateCertificate(in); in.close(); KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); keyStore.load(null, null); keyStore.setCertificateEntry("ca", ca); TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); tmf.init(keyStore); // client key and certificates are sent to server so it can authenticate us InputStream certIn = IotMqttClientWithAuthByX509.class.getResourceAsStream(certPath); CertificateFactory certCf = CertificateFactory.getInstance("X.509"); Certificate certCa = certCf.generateCertificate(certIn); certIn.close(); KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); ks.load(null, null); ks.setCertificateEntry("certificate", certCa); PrivateKey privateKey = getPrivateKey(privateKeyPath); ks.setKeyEntry("private-key", privateKey, privateKeyPassword.toCharArray(), new Certificate[] { certCa }); KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); kmf.init(ks, privateKeyPassword.toCharArray()); SSLContext context = SSLContext.getInstance("TLSV1.2"); context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); SSLSocketFactory socketFactory = context.getSocketFactory(); return socketFactory; } private PrivateKey getPrivateKey(String path) throws Exception { byte[] buffer = Base64.decodeBase64(getPem(path)); PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(buffer); KeyFactory keyFactory = KeyFactory.getInstance("RSA"); return keyFactory.generatePrivate(keySpec); } private String getPem(String path) throws Exception { InputStream in = IotMqttClientWithAuthByX509.class.getResourceAsStream(path); BufferedReader br = new BufferedReader(new InputStreamReader(in)); String readLine = null; StringBuilder sb = new StringBuilder(); while ((readLine = br.readLine()) != null) { if (readLine.charAt(0) == '-') { continue; } else { sb.append(readLine); sb.append('\r'); } } in.close(); return sb.toString(); } /** * 1、设备证书从控制台设备列表页面下载,下载包包含cer和key两个文件 <br> * 2、私钥是pkcs1格式的,java原生ssl需要转成pkcs8格式使用 <br> * 3、mqttClientId连接参数只需要也必需要填写 |securemode=2|,表示只能使用TLS <br> * 4、mqttUsername和mqttPassword填写"",因为使用证书认证就不需要这两个参数了 <br> * 5、连接成功后会下发pk+dn信息到/ext/auth/identity/response,用于组装topic进行消息收发 <br> * * @param args */ public static void main(String[] args) { IotMqttClientWithAuthByX509 client = new IotMqttClientWithAuthByX509(); // 填写设备证书路径信息 client.connect("您的设备证书路径", "您的证书私钥路径", ""); // 连接之后,睡两秒,保证pk+dn已经下发,不然消息收发是topic的pk+dn字段是空的 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } // 发送消息验证连接是否成功 String updateTopic = "/" + productKey + "/" + deviceName + "/user/update"; client.publish(updateTopic, "hello mqtt with X.509 auth"); } }