POM
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.2</version> </dependency>
Service.java
package com.vipsoft.mqtt; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttMessage; import java.util.Scanner; public class Service { public static void main(String[] args) throws Exception { String host = "tcp://172.16.3.88:1883"; String topic = "VipSoft_MQTT"; String clientId = "server_id"; // clientId不能重复这个是server的id //新建mqtt连接 MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); //新建mqtt客户端 MqttClient client = new MqttClient(host, clientId); client.connect(options); //新建mqtt消息 MqttMessage message = new MqttMessage(); @SuppressWarnings("resource") Scanner scanner = new Scanner(System.in); System.out.println("请输入要发送的内容:"); while (true) { String MsgMessage= scanner.nextLine(); message.setPayload(MsgMessage.getBytes()); client.publish(topic, message); } } }
Client.java
package com.vipsoft.mqtt; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; public class Client { public static void main(String[] args) throws Exception { String host = "tcp://172.16.3.88:1883"; String topic = "VipSoft_MQTT"; String clientId = "client_id"; // 1.设置mqtt连接属性 MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); // 2.实例化mqtt客户端 MqttClient client = new MqttClient(host, clientId); // 3.连接 client.connect(options); //这里的setCallback需要新建一个Callback类并实现 MqttCallback 这个类 client.setCallback(new PushCallback()); while (true) { client.subscribe(topic, 2); } } }
PushCallback.java
package com.vipsoft.mqtt; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Date; /** * 发布消息的回调类 * * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。 * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。 * 在回调中,将它用来标识已经启动了该回调的哪个实例。 * 必须在回调类中实现三个方法: * * public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。 * * public void connectionLost(Throwable cause)在断开连接时调用。 * * public void deliveryComplete(MqttDeliveryToken token)) * 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。 * 由 MqttClient.connect 激活此回调。 * */ public class PushCallback implements MqttCallback { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Override public void connectionLost(Throwable cause) { // 连接丢失后进行重连 System.out.println("连接断开,可以做重连"); logger.info("掉线时间:{}", new Date()); } @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { // subscribe后得到的消息会执行到这里面 // System.out.println(message); System.out.println("接收消息主题 : " + topic); System.out.println("接收消息Qos : " + message.getQos()); System.out.println("接收消息内容 : " + new String(message.getPayload())); } }