深入了解ActiveMQ MQTT消息通信

一、MQTT協議簡介

MQTT(Message Queuing Telemetry Transport)是一種輕量級、基於發布/訂閱模式的消息傳輸協議,具有開放、簡單和易於實現的特點。

MQTT協議可以用於物聯網、移動設備等多種場景,主要用於設備與設備之間的通訊,將設備產生的數據傳輸到雲平台或控制其他設備。

MQTT協議採用基於TCP/IP的傳輸方式,支持Quality of Service(QoS)0,1,2等多種服務質量級別,可以支持百萬級別的TCP/IP連接,是當前IoT領域最為流行的協議之一。

二、ActiveMQ簡介

ActiveMQ是一個開源的、基於Java的消息中間件,支持多種消息傳輸協議,例如TCP/IP、STOMP、WebSocket等。

ActiveMQ採用JMS(Java Message Service)作為消息API,可以與Java應用程序無縫集成,擁有高可伸縮性、高可用性、容錯性等優點。

三、ActiveMQ支持的MQTT版本

ActiveMQ從版本5.9.0開始支持MQTT 3.1.1版本。

在ActiveMQ中,MQTT消息協議是作為插件存在的,因此需要使用ActiveMQ的命令來進行啟用和管理。

四、使用ActiveMQ MQTT傳輸消息

我們可以通過ActiveMQ MQTT插件來實現基於MQTT協議的消息傳輸,下面是一個簡單的示例,展示如何使用ActiveMQ MQTT發送和接收消息。

1、啟用ActiveMQ MQTT插件


  <transportConnectors>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?transport.defaultKeepAlive=60000&transport.closeAsync=true"/>
  </transportConnectors>

在ActiveMQ的配置文件activemq.xml中,可以添加上述代碼來啟用MQTT插件。

2、使用ActiveMQ MQTT發送消息


  import org.eclipse.paho.client.mqttv3.MqttClient;
  import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  import org.eclipse.paho.client.mqttv3.MqttException;
  import org.eclipse.paho.client.mqttv3.MqttMessage;
   
  public class MQTTSender {
   
    public static void main(String[] args) {
      String topic        = "MQTT Examples";
      String content      = "MQTT Test";
      int qos             = 2;
      String broker       = "tcp://localhost:1883";
      String clientId     = "JavaMQTTSender";
      MemoryPersistence persistence = new MemoryPersistence();
   
      try {
        MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        System.out.println("Connecting to broker: "+broker);
        sampleClient.connect(connOpts);
        System.out.println("Connected");
        System.out.println("Publishing message: "+content);
        MqttMessage message = new MqttMessage(content.getBytes());
        message.setQos(qos);
        sampleClient.publish(topic, message);
        System.out.println("Message published");
        sampleClient.disconnect();
        System.out.println("Disconnected");
        System.exit(0);
      } catch(MqttException me) {
        System.out.println("reason "+me.getReasonCode());
        System.out.println("msg "+me.getMessage());
        System.out.println("loc "+me.getLocalizedMessage());
        System.out.println("cause "+me.getCause());
        System.out.println("excep "+me);
        me.printStackTrace();
      }
    }
   
  }

上述代碼中,我們使用eclipse的paho.mqttv3包來實現消息發送。運行該代碼後,可以在ActiveMQ控制台查看推送的消息。

3、使用ActiveMQ MQTT接收消息


  import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  import org.eclipse.paho.client.mqttv3.MqttCallback;
  import org.eclipse.paho.client.mqttv3.MqttClient;
  import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  import org.eclipse.paho.client.mqttv3.MqttException;
  import org.eclipse.paho.client.mqttv3.MqttMessage;
  import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
   
  public class MQTTSubscriber implements MqttCallback {
   
    public static void main(String[] args) {
      String topic        = "MQTT Examples";
      int qos             = 2;
      String broker       = "tcp://localhost:1883";
      String clientId     = "JavaMQTTSubscriber";
      MemoryPersistence persistence = new MemoryPersistence();
   
      try {
        MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        System.out.println("Connecting to broker: "+broker);
        sampleClient.connect(connOpts);
        System.out.println("Connected");
        sampleClient.setCallback(new MQTTSubscriber());
        sampleClient.subscribe(topic, qos);
        System.out.println("Subscribing topic "+ topic +" with QoS "+qos);
      } catch(MqttException me) {
        System.out.println("reason "+me.getReasonCode());
        System.out.println("msg "+me.getMessage());
        System.out.println("loc "+me.getLocalizedMessage());
        System.out.println("cause "+me.getCause());
        System.out.println("excep "+me);
        me.printStackTrace();
      }
    }
   
    public void connectionLost(Throwable arg0) {
      System.out.println("Connection lost");
    }
   
    public void deliveryComplete(IMqttDeliveryToken arg0) {
      System.out.println("Delivery complete");
    }
   
    public void messageArrived(String topic, MqttMessage message) throws Exception {
      System.out.println("Message arrived:");
      System.out.println("  Topic: "+ topic);
      System.out.println("  Message: "+ new String(message.getPayload()));
    }
  }

上述代碼中,我們實現了一個MQTT client來接收消息。

五、總結

本文簡單介紹了MQTT協議和ActiveMQ,以及如何通過ActiveMQ實現MQTT消息傳輸。

通過學習本文,您可以了解到:

1. 什麼是MQTT協議,以及MQTT協議適用的場景

2. ActiveMQ支持的MQTT版本和如何啟用MQTT插件

3. 如何使用ActiveMQ MQTT發送和接收消息

原創文章,作者:OZARK,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/334436.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
OZARK的頭像OZARK
上一篇 2025-02-05 13:05
下一篇 2025-02-05 13:05

相關推薦

  • MQTT使用教程

    MQTT是一種輕量級的消息傳輸協議,適用於物聯網領域中的設備與雲端、設備與設備之間的數據傳輸。本文將介紹使用MQTT實現設備與雲端數據傳輸的方法和注意事項。 一、準備工作 在使用M…

    編程 2025-04-29
  • RabbitMQ和Yii2的消息隊列應用

    本文將探討RabbitMQ和Yii2之間的消息隊列應用。從概念、安裝和配置、使用實例等多個方面詳細講解,幫助讀者了解和掌握RabbitMQ和Yii2的消息隊列應用。 一、Rabbi…

    編程 2025-04-29
  • ROS線程發布消息異常解決方法

    針對ROS線程發布消息異常問題,我們可以從以下幾個方面進行分析和解決。 一、檢查ROS代碼是否正確 首先,我們需要檢查ROS代碼是否正確。可能會出現的問題包括: 是否正確初始化RO…

    編程 2025-04-28
  • 使用Python發送微信消息給別人

    問題:如何使用Python發送微信消息給別人? 一、配置微信開發者平台 首先,要想發送微信消息,需要在微信開發者平台中進行配置,來獲取對應的授權信息。具體步驟如下: 1、登錄微信公…

    編程 2025-04-28
  • 跨域通信浮標——實現客戶端之間的跨域通信

    本文將介紹跨域通信浮標的使用方法,該浮標可以實現客戶端之間的跨域通信,解決了瀏覽器同源策略的限制,讓開發者能夠更加方便地進行跨域通信。 一、浮標的原理 跨域通信浮標的原理是基於浮動…

    編程 2025-04-27
  • 通信專業Python和Java的開發技巧

    本文旨在介紹通信專業Python和Java的開發技巧,為讀者提供實用且可操作的思路和方法。 一、Python在通信領域中的應用 Python是一種優秀的程序設計語言,因其易學易用、…

    編程 2025-04-27
  • 通過驗證後如何看驗證消息

    驗證消息通常告訴用戶某些操作是否成功或失敗,它對於用戶體驗和操作流程都非常重要。當用戶通過一項操作之後,獲取到相應的驗證消息能夠幫助用戶更好的了解操作結果,從而採取相應的行動和決策…

    編程 2025-04-27
  • 深入解析Vue3 defineExpose

    Vue 3在開發過程中引入了新的API `defineExpose`。在以前的版本中,我們經常使用 `$attrs` 和` $listeners` 實現父組件與子組件之間的通信,但…

    編程 2025-04-25
  • 深入理解byte轉int

    一、位元組與比特 在討論byte轉int之前,我們需要了解位元組和比特的概念。位元組是計算機存儲單位的一種,通常表示8個比特(bit),即1位元組=8比特。比特是計算機中最小的數據單位,是…

    編程 2025-04-25
  • 深入理解Flutter StreamBuilder

    一、什麼是Flutter StreamBuilder? Flutter StreamBuilder是Flutter框架中的一個內置小部件,它可以監測數據流(Stream)中數據的變…

    編程 2025-04-25

發表回復

登錄後才能評論