一、MQTT協議簡介
MQTT(Message Queuing Telemetry Transport)是一種輕量級、基於發佈/訂閱模式的消息傳輸協議,具有開放、簡單和易於實現的特點。
MQTT協議可以用於物聯網、移動設備等多種場景,主要用於設備與設備之間的通訊,將設備產生的數據傳輸到雲平台或控制其他設備。
MQTT協議採用基於TCP/IP的傳輸方式,支持Quality of Service(QoS)0,1,2等多種服務質量級別,可以支持百萬級別的TCP/IP連接,是當前IoT領域最為流行的協議之一。
二、ActiveMQ簡介
ActiveMQ是一個開源的、基於Java的消息中間件,支持多種消息傳輸協議,例如TCP/IP、STOMP、WebSocket等。
ActiveMQ採用JMS(Java Message Service)作為消息API,可以與Java應用程序無縫集成,擁有高可伸縮性、高可用性、容錯性等優點。
三、ActiveMQ支持的MQTT版本
ActiveMQ從版本5.9.0開始支持MQTT 3.1.1版本。
在ActiveMQ中,MQTT消息協議是作為插件存在的,因此需要使用ActiveMQ的命令來進行啟用和管理。
四、使用ActiveMQ MQTT傳輸消息
我們可以通過ActiveMQ MQTT插件來實現基於MQTT協議的消息傳輸,下面是一個簡單的示例,展示如何使用ActiveMQ MQTT發送和接收消息。
1、啟用ActiveMQ MQTT插件
<transportConnectors>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?transport.defaultKeepAlive=60000&transport.closeAsync=true"/>
</transportConnectors>
在ActiveMQ的配置文件activemq.xml中,可以添加上述代碼來啟用MQTT插件。
2、使用ActiveMQ MQTT發送消息
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class MQTTSender {
public static void main(String[] args) {
String topic = "MQTT Examples";
String content = "MQTT Test";
int qos = 2;
String broker = "tcp://localhost:1883";
String clientId = "JavaMQTTSender";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
System.out.println("Connecting to broker: "+broker);
sampleClient.connect(connOpts);
System.out.println("Connected");
System.out.println("Publishing message: "+content);
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
sampleClient.publish(topic, message);
System.out.println("Message published");
sampleClient.disconnect();
System.out.println("Disconnected");
System.exit(0);
} catch(MqttException me) {
System.out.println("reason "+me.getReasonCode());
System.out.println("msg "+me.getMessage());
System.out.println("loc "+me.getLocalizedMessage());
System.out.println("cause "+me.getCause());
System.out.println("excep "+me);
me.printStackTrace();
}
}
}
上述代碼中,我們使用eclipse的paho.mqttv3包來實現消息發送。運行該代碼後,可以在ActiveMQ控制台查看推送的消息。
3、使用ActiveMQ MQTT接收消息
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MQTTSubscriber implements MqttCallback {
public static void main(String[] args) {
String topic = "MQTT Examples";
int qos = 2;
String broker = "tcp://localhost:1883";
String clientId = "JavaMQTTSubscriber";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
System.out.println("Connecting to broker: "+broker);
sampleClient.connect(connOpts);
System.out.println("Connected");
sampleClient.setCallback(new MQTTSubscriber());
sampleClient.subscribe(topic, qos);
System.out.println("Subscribing topic "+ topic +" with QoS "+qos);
} catch(MqttException me) {
System.out.println("reason "+me.getReasonCode());
System.out.println("msg "+me.getMessage());
System.out.println("loc "+me.getLocalizedMessage());
System.out.println("cause "+me.getCause());
System.out.println("excep "+me);
me.printStackTrace();
}
}
public void connectionLost(Throwable arg0) {
System.out.println("Connection lost");
}
public void deliveryComplete(IMqttDeliveryToken arg0) {
System.out.println("Delivery complete");
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("Message arrived:");
System.out.println(" Topic: "+ topic);
System.out.println(" Message: "+ new String(message.getPayload()));
}
}
上述代碼中,我們實現了一個MQTT client來接收消息。
五、總結
本文簡單介紹了MQTT協議和ActiveMQ,以及如何通過ActiveMQ實現MQTT消息傳輸。
通過學習本文,您可以了解到:
1. 什麼是MQTT協議,以及MQTT協議適用的場景
2. ActiveMQ支持的MQTT版本和如何啟用MQTT插件
3. 如何使用ActiveMQ MQTT發送和接收消息
原創文章,作者:OZARK,如若轉載,請註明出處:https://www.506064.com/zh-hk/n/334436.html