Java客户端连接配置 本文为您介绍分布式消息服务MQTT客户端连接配置。 引入依赖//版本号按需调整 org.eclipse.paho org.eclipse.paho.client.mqttv3 1.2.5 连接mqtt云消息服务的示例代码如下: import java.security.SecureRandom; import java.security.cert.X509Certificate; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class ConnTest { // 填入您在mqtt控制台创建的ACL账号密码。 private static final String USERNAME "yourusername"; private static final String AUTHPASSWORD "yourpassword"; // 是否使用tls加密传输 private static final Boolean isTls true; public static void main(String[] args) { // 填写mqtt云消息服务的接入点。接入点分为tls以及非tls两种接入。tls接入格式为:ssl://{ip}:8085 String broker "tcp://localhost:1883"; // 指定连接客户端的id,该id可用于查询连接会话信息以及设备轨迹信息。 String clientId "ctgmqttclienttest"; MemoryPersistence persistence new MemoryPersistence(); try { final MqttClient myClient getMqttClient(broker, clientId, persistence); MqttConnectOptions connOpts new MqttConnectOptions(); connOpts.setCleanSession(true); connOpts.setUserName(USERNAME); connOpts.setPassword(AUTHPASSWORD.toCharArray()); // 设置心跳间隔(这里示例为2分钟) connOpts.setKeepAliveInterval(120); // 设置自动重连 connOpts.setAutomaticReconnect(true); // 设置tls相关配置(可选) // 目前暂未支持自动配置ssl证书,默认的ssl证书需要客户端进行默认证书信任。不影响正常的tls链路加密 if (isTls) { SSLContext sslContext SSLContext.getInstance("TLS"); // 默认信任服务端ssl证书 sslContext.init(null, new TrustManager[]{new X509TrustManager() { public void checkClientTrusted(X509Certificate[] chain, String authType) { } public void checkServerTrusted(X509Certificate[] chain, String authType) { } public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; } }}, new SecureRandom()); // 可以按照自定义的方式进行ssl证书的主机名验证 connOpts.setHttpsHostnameVerificationEnabled(false); connOpts.setSSLHostnameVerifier((hostname, session) > true); connOpts.setSocketFactory(sslContext.getSocketFactory()); } System.out.println("Connecting to broker: " + broker); myClient.connect(connOpts); System.out.println("Connected"); // 这里编写您的消息收发逻辑 myClient.disconnect(); System.out.println("Disconnected"); System.exit(0); } catch (MqttException me) { // 打印详细的错误信息。 System.out.println("reason " + me.getReasonCode()); System.out.println("msg " + me.getMessage()); System.out.println("cause " + me.getCause()); System.out.println("excep " + me); me.printStackTrace(); } catch (Exception e) { throw new RuntimeException(e); } } private static MqttClient getMqttClient(String broker, String clientId, MemoryPersistence persistence) throws MqttException { final MqttClient myClient new MqttClient(broker, clientId, persistence); myClient.setCallback(new MqttCallbackExtended() { @Override public void connectComplete(boolean reconnect, String serverURI) { // 连接建立成功 } @Override public void connectionLost(Throwable cause) { // 连接丢失,建议记录日志,做好监控 } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { // 收到消息的回调,这里不要进行阻塞操作,以免卡住导致连接断开 } @Override public void deliveryComplete(IMqttDeliveryToken token) { // 成功发送消息到服务端 } }); return myClient; } }