上一篇制作完CA之后如何接入平台

注意点1 ,设备证书的 SN 码如何获取 ,也就是绑定设备需要的

也就是这个

上一篇制作完CA之后如何接入平台

方法

查看设备证书 SN openssl x509 -noout -text -in device-1.crt

就是这个 ,去掉中间的冒号

上一篇制作完CA之后如何接入平台



代码实现


pom.xml 配置

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.1</version>
</dependency>

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.61</version>
</dependency>

<dependency>
    <groupId>commons-codec</groupId>
    <artifactId>commons-codec</artifactId>
    <version>1.13</version>
</dependency>


证书准备


根证书root

物联网平台根证书,可以从官网文档中下载 https://help.aliyun.com/document_detail/73742.html

设备证书

上一篇制作完CA之后如何接入平台

设备私钥

上一篇制作完CA之后如何接入平台


代码实现





/*   
 * Copyright © 2019 Alibaba. All rights reserved.
 */
package com.aliyun.iot.demo;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.security.KeyFactory;
import java.security.KeyStore;
import java.security.PrivateKey;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.security.spec.PKCS8EncodedKeySpec;

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;

import org.apache.commons.codec.binary.Base64;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import com.alibaba.fastjson.JSONObject;

/**
 * mqtt客户端直连阿里云物联网平台,基于eclipse paho开发 <br>
 * 基于X.509认证接入 https://help.aliyun.com/document_detail/140588.html
 */
public class IotMqttClientWithAuthByX509 {

    // ===================需要用户填写的参数开始===========================
    // 站点id,根据实际站点获取对应id,https://help.aliyun.com/document_detail/40654.html
    private static String regionId = "cn-shanghai"; // 目前只支持上海,也就是华东2节点
    // ===================需要用户填写的参数结束===========================

    // 设备证书
    private String certPath = "";
    // 设备私钥,无密码
    private String privateKeyPath = "";
    // 密码固定为空
    private String privateKeyPassword = "";
    // X.509认证返回信息的topic,无需创建,无需订阅,直接使用
    private static final String AUTH_TOPIC = "/ext/auth/identity/response";
    // 设备productKey,用于接受平台下发的productKey,无需填写
    private static String productKey = "";
    // 设备deviceName,用于接受平台下发的deviceName,无需填写
    private static String deviceName = "";

    // mqtt客户端
    private MqttClient sampleClient = null;

    /**
     * 建立mqtt连接
     * 
     * @param certPath 证书路径
     * @param privateKeyPath 私钥路径
     * @param privateKeyPassword 私钥密码,目前固定为空
     */
    public void connect(String certPath, String privateKeyPath, String privateKeyPassword) {

        this.certPath = certPath;
        this.privateKeyPath = privateKeyPath;
        this.privateKeyPassword = privateKeyPassword;

        // 接入域名
        String broker = "ssl://x509.itls." + regionId + ".aliyuncs.com:1883";

        // 表示客户端ID,建议使用设备的MAC地址或SN码,64字符内
        String clientId = ".";

        // 只能securemode=2表示使用TLS
        String clientOpts = "|securemode=2|";

        // mqtt接入客户端ID
        String mqttClientId = clientId + clientOpts;

        // 建立mqtt连接,使用证书认证,所以不需要username和password
        connect(broker, mqttClientId, "", "");
    }

    /**
     * 建立mqtt连接
     * 
     * @param serverURL 连接服务器地址
     * @param clientId mqtt接入客户端ID
     * @param username mqtt接入用户名
     * @param password mqtt接入密码
     */
    protected void connect(String serverURL, String clientId, String username, String password) {
        try {
            MemoryPersistence persistence = new MemoryPersistence();
            sampleClient = new MqttClient(serverURL, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setMqttVersion(4);// MQTT 3.1.1
            connOpts.setUserName(username);// 用户名
            connOpts.setPassword(password.toCharArray());// 密码
            connOpts.setSocketFactory(createSSLSocket()); // 使用TLS,需要下载根证书root.crt,设置securemode=2
            connOpts.setCleanSession(false); // 不清理离线消息,qos=1的消息在设备离线期间会保存在云端
            connOpts.setAutomaticReconnect(false); // 作为demo关闭自动重连,生产环境强烈建议开启自动重连
            connOpts.setKeepAliveInterval(300); // 设置心跳,建议300秒
            // 先设置回调,如果是先connect后设置回调,可能会导致消息到达时回调还没准备好,这样消息就丢失了
            sampleClient.setCallback(new MqttCallback() {

                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    // 只处理X.509认证返回信息
                    if (AUTH_TOPIC.equals(topic)) {
                        JSONObject json = JSONObject
                                .parseObject(new String(message.getPayload(), StandardCharsets.UTF_8));
                        productKey = json.getString("productKey");
                        deviceName = json.getString("deviceName");
                    } else {
                        // 其他下行消息处理,强烈建议另起线程处理,以免回调堵塞
                    }
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                }

                @Override
                public void connectionLost(Throwable cause) {
                }
            });
            System.out.println("Connecting to broker: " + serverURL);
            sampleClient.connect(connOpts);
            System.out.print("Connected: clientId=" + clientId);
            System.out.println(",username=" + username + ",password=" + password);
        } catch (MqttException e) {
            System.out.print("connect failed: clientId=" + clientId);
            System.out.println(",username=" + username + ",password=" + password);
            System.out.println("reason " + e.getReasonCode());
            System.out.println("msg " + e.getMessage());
            System.out.println("loc " + e.getLocalizedMessage());
            System.out.println("cause " + e.getCause());
            System.out.println("excep " + e);
            e.printStackTrace();
        } catch (Exception e) {
            System.out.print("connect exception: clientId=" + clientId);
            System.out.println(",username=" + username + ",password=" + password);
            System.out.println("msg " + e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * 发布消息,默认qos=0
     * 
     * @param topic 发布消息的topic
     * @param payload 发布的消息内容
     */
    public void publish(String topic, String payload) {
        byte[] content = payload.getBytes(StandardCharsets.UTF_8);
        publish(topic, 0, content);
    }

    /**
     * 发布消息
     * 
     * @param topic 发布消息的topic
     * @param qos 消息等级,平台支持qos=0和qos=1,不支持qos=2
     * @param payload 发布的消息内容
     */
    public void publish(String topic, int qos, byte[] payload) {
        MqttMessage message = new MqttMessage(payload);
        message.setQos(qos);
        try {
            sampleClient.publish(topic, message);
            System.out.println("Message published: topic=" + topic + ",qos=" + qos);
        } catch (MqttException e) {
            System.out.println("publish failed: topic=" + topic + ",qos=" + qos);
            System.out.println("reason " + e.getReasonCode());
            System.out.println("msg " + e.getMessage());
            System.out.println("loc " + e.getLocalizedMessage());
            System.out.println("cause " + e.getCause());
            System.out.println("excep " + e);
            e.printStackTrace();
        }
    }

    protected SSLSocketFactory createSSLSocket() throws Exception {

        // CA根证书,可以从官网下载 https://help.aliyun.com/document_detail/73742.html
        // 设备证书,可以从控制台设备信息下载

        // CA certificate is used to authenticate server
        InputStream in = IotMqttClientWithAuthByX509.class.getResourceAsStream("/root.crt");
        CertificateFactory cf = CertificateFactory.getInstance("X.509");
        Certificate ca = cf.generateCertificate(in);
        in.close();
        KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
        keyStore.load(null, null);
        keyStore.setCertificateEntry("ca", ca);
        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(keyStore);

        // client key and certificates are sent to server so it can authenticate us
        InputStream certIn = IotMqttClientWithAuthByX509.class.getResourceAsStream(certPath);
        CertificateFactory certCf = CertificateFactory.getInstance("X.509");
        Certificate certCa = certCf.generateCertificate(certIn);
        certIn.close();
        KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
        ks.load(null, null);
        ks.setCertificateEntry("certificate", certCa);
        PrivateKey privateKey = getPrivateKey(privateKeyPath);
        ks.setKeyEntry("private-key", privateKey, privateKeyPassword.toCharArray(), new Certificate[] { certCa });
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(ks, privateKeyPassword.toCharArray());

        SSLContext context = SSLContext.getInstance("TLSV1.2");
        context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
        SSLSocketFactory socketFactory = context.getSocketFactory();
        return socketFactory;
    }

    private PrivateKey getPrivateKey(String path) throws Exception {
        byte[] buffer = Base64.decodeBase64(getPem(path));
        PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(buffer);
        KeyFactory keyFactory = KeyFactory.getInstance("RSA");
        return keyFactory.generatePrivate(keySpec);
    }

    private String getPem(String path) throws Exception {
        InputStream in = IotMqttClientWithAuthByX509.class.getResourceAsStream(path);
        BufferedReader br = new BufferedReader(new InputStreamReader(in));
        String readLine = null;
        StringBuilder sb = new StringBuilder();
        while ((readLine = br.readLine()) != null) {
            if (readLine.charAt(0) == '-') {
                continue;
            } else {
                sb.append(readLine);
                sb.append('\r');
            }
        }
        in.close();
        return sb.toString();
    }

    /**
     * 1、设备证书从控制台设备列表页面下载,下载包包含cer和key两个文件 <br>
     * 2、私钥是pkcs1格式的,java原生ssl需要转成pkcs8格式使用 <br>
     * 3、mqttClientId连接参数只需要也必需要填写 |securemode=2|,表示只能使用TLS <br>
     * 4、mqttUsername和mqttPassword填写"",因为使用证书认证就不需要这两个参数了 <br>
     * 5、连接成功后会下发pk+dn信息到/ext/auth/identity/response,用于组装topic进行消息收发 <br>
     * 
     * @param args
     */
    public static void main(String[] args) {

        IotMqttClientWithAuthByX509 client = new IotMqttClientWithAuthByX509();

        // 填写设备证书路径信息
        client.connect("您的设备证书路径", "您的证书私钥路径", "");

        // 连接之后,睡两秒,保证pk+dn已经下发,不然消息收发是topic的pk+dn字段是空的
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 发送消息验证连接是否成功
        String updateTopic = "/" + productKey + "/" + deviceName + "/user/update";
        client.publish(updateTopic, "hello mqtt with X.509 auth");
    }
}


上一篇:物联网平台 如何自己制作CA证书


下一篇:RHEL6.3配置文件共享(2) autofs服务