深入了解ActiveMQ MQTT消息通信

一、MQTT协议简介

MQTT(Message Queuing Telemetry Transport)是一种轻量级、基于发布/订阅模式的消息传输协议,具有开放、简单和易于实现的特点。

MQTT协议可以用于物联网、移动设备等多种场景,主要用于设备与设备之间的通讯,将设备产生的数据传输到云平台或控制其他设备。

MQTT协议采用基于TCP/IP的传输方式,支持Quality of Service(QoS)0,1,2等多种服务质量级别,可以支持百万级别的TCP/IP连接,是当前IoT领域最为流行的协议之一。

二、ActiveMQ简介

ActiveMQ是一个开源的、基于Java的消息中间件,支持多种消息传输协议,例如TCP/IP、STOMP、WebSocket等。

ActiveMQ采用JMS(Java Message Service)作为消息API,可以与Java应用程序无缝集成,拥有高可伸缩性、高可用性、容错性等优点。

三、ActiveMQ支持的MQTT版本

ActiveMQ从版本5.9.0开始支持MQTT 3.1.1版本。

在ActiveMQ中,MQTT消息协议是作为插件存在的,因此需要使用ActiveMQ的命令来进行启用和管理。

四、使用ActiveMQ MQTT传输消息

我们可以通过ActiveMQ MQTT插件来实现基于MQTT协议的消息传输,下面是一个简单的示例,展示如何使用ActiveMQ MQTT发送和接收消息。

1、启用ActiveMQ MQTT插件


  <transportConnectors>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?transport.defaultKeepAlive=60000&transport.closeAsync=true"/>
  </transportConnectors>

在ActiveMQ的配置文件activemq.xml中,可以添加上述代码来启用MQTT插件。

2、使用ActiveMQ MQTT发送消息


  import org.eclipse.paho.client.mqttv3.MqttClient;
  import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  import org.eclipse.paho.client.mqttv3.MqttException;
  import org.eclipse.paho.client.mqttv3.MqttMessage;
   
  public class MQTTSender {
   
    public static void main(String[] args) {
      String topic        = "MQTT Examples";
      String content      = "MQTT Test";
      int qos             = 2;
      String broker       = "tcp://localhost:1883";
      String clientId     = "JavaMQTTSender";
      MemoryPersistence persistence = new MemoryPersistence();
   
      try {
        MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        System.out.println("Connecting to broker: "+broker);
        sampleClient.connect(connOpts);
        System.out.println("Connected");
        System.out.println("Publishing message: "+content);
        MqttMessage message = new MqttMessage(content.getBytes());
        message.setQos(qos);
        sampleClient.publish(topic, message);
        System.out.println("Message published");
        sampleClient.disconnect();
        System.out.println("Disconnected");
        System.exit(0);
      } catch(MqttException me) {
        System.out.println("reason "+me.getReasonCode());
        System.out.println("msg "+me.getMessage());
        System.out.println("loc "+me.getLocalizedMessage());
        System.out.println("cause "+me.getCause());
        System.out.println("excep "+me);
        me.printStackTrace();
      }
    }
   
  }

上述代码中,我们使用eclipse的paho.mqttv3包来实现消息发送。运行该代码后,可以在ActiveMQ控制台查看推送的消息。

3、使用ActiveMQ MQTT接收消息


  import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  import org.eclipse.paho.client.mqttv3.MqttCallback;
  import org.eclipse.paho.client.mqttv3.MqttClient;
  import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  import org.eclipse.paho.client.mqttv3.MqttException;
  import org.eclipse.paho.client.mqttv3.MqttMessage;
  import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
   
  public class MQTTSubscriber implements MqttCallback {
   
    public static void main(String[] args) {
      String topic        = "MQTT Examples";
      int qos             = 2;
      String broker       = "tcp://localhost:1883";
      String clientId     = "JavaMQTTSubscriber";
      MemoryPersistence persistence = new MemoryPersistence();
   
      try {
        MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        System.out.println("Connecting to broker: "+broker);
        sampleClient.connect(connOpts);
        System.out.println("Connected");
        sampleClient.setCallback(new MQTTSubscriber());
        sampleClient.subscribe(topic, qos);
        System.out.println("Subscribing topic "+ topic +" with QoS "+qos);
      } catch(MqttException me) {
        System.out.println("reason "+me.getReasonCode());
        System.out.println("msg "+me.getMessage());
        System.out.println("loc "+me.getLocalizedMessage());
        System.out.println("cause "+me.getCause());
        System.out.println("excep "+me);
        me.printStackTrace();
      }
    }
   
    public void connectionLost(Throwable arg0) {
      System.out.println("Connection lost");
    }
   
    public void deliveryComplete(IMqttDeliveryToken arg0) {
      System.out.println("Delivery complete");
    }
   
    public void messageArrived(String topic, MqttMessage message) throws Exception {
      System.out.println("Message arrived:");
      System.out.println("  Topic: "+ topic);
      System.out.println("  Message: "+ new String(message.getPayload()));
    }
  }

上述代码中,我们实现了一个MQTT client来接收消息。

五、总结

本文简单介绍了MQTT协议和ActiveMQ,以及如何通过ActiveMQ实现MQTT消息传输。

通过学习本文,您可以了解到:

1. 什么是MQTT协议,以及MQTT协议适用的场景

2. ActiveMQ支持的MQTT版本和如何启用MQTT插件

3. 如何使用ActiveMQ MQTT发送和接收消息

原创文章,作者:OZARK,如若转载,请注明出处:https://www.506064.com/n/334436.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
OZARKOZARK
上一篇 2025-02-05 13:05
下一篇 2025-02-05 13:05

相关推荐

  • MQTT使用教程

    MQTT是一种轻量级的消息传输协议,适用于物联网领域中的设备与云端、设备与设备之间的数据传输。本文将介绍使用MQTT实现设备与云端数据传输的方法和注意事项。 一、准备工作 在使用M…

    编程 2025-04-29
  • RabbitMQ和Yii2的消息队列应用

    本文将探讨RabbitMQ和Yii2之间的消息队列应用。从概念、安装和配置、使用实例等多个方面详细讲解,帮助读者了解和掌握RabbitMQ和Yii2的消息队列应用。 一、Rabbi…

    编程 2025-04-29
  • ROS线程发布消息异常解决方法

    针对ROS线程发布消息异常问题,我们可以从以下几个方面进行分析和解决。 一、检查ROS代码是否正确 首先,我们需要检查ROS代码是否正确。可能会出现的问题包括: 是否正确初始化RO…

    编程 2025-04-28
  • 使用Python发送微信消息给别人

    问题:如何使用Python发送微信消息给别人? 一、配置微信开发者平台 首先,要想发送微信消息,需要在微信开发者平台中进行配置,来获取对应的授权信息。具体步骤如下: 1、登录微信公…

    编程 2025-04-28
  • 跨域通信浮标——实现客户端之间的跨域通信

    本文将介绍跨域通信浮标的使用方法,该浮标可以实现客户端之间的跨域通信,解决了浏览器同源策略的限制,让开发者能够更加方便地进行跨域通信。 一、浮标的原理 跨域通信浮标的原理是基于浮动…

    编程 2025-04-27
  • 通信专业Python和Java的开发技巧

    本文旨在介绍通信专业Python和Java的开发技巧,为读者提供实用且可操作的思路和方法。 一、Python在通信领域中的应用 Python是一种优秀的程序设计语言,因其易学易用、…

    编程 2025-04-27
  • 通过验证后如何看验证消息

    验证消息通常告诉用户某些操作是否成功或失败,它对于用户体验和操作流程都非常重要。当用户通过一项操作之后,获取到相应的验证消息能够帮助用户更好的了解操作结果,从而采取相应的行动和决策…

    编程 2025-04-27
  • 深入解析Vue3 defineExpose

    Vue 3在开发过程中引入了新的API `defineExpose`。在以前的版本中,我们经常使用 `$attrs` 和` $listeners` 实现父组件与子组件之间的通信,但…

    编程 2025-04-25
  • 深入理解byte转int

    一、字节与比特 在讨论byte转int之前,我们需要了解字节和比特的概念。字节是计算机存储单位的一种,通常表示8个比特(bit),即1字节=8比特。比特是计算机中最小的数据单位,是…

    编程 2025-04-25
  • 深入理解Flutter StreamBuilder

    一、什么是Flutter StreamBuilder? Flutter StreamBuilder是Flutter框架中的一个内置小部件,它可以监测数据流(Stream)中数据的变…

    编程 2025-04-25

发表回复

登录后才能评论