当前位置: 首页 > news >正文

在JAVA中使用Paho MQTT客户端

1.在maven里面配置好依赖

<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.2</version>
</dependency>

2.创建APP类

package com.leo;import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class App {public static void main(String[] args) {String subTopic = "topic";  //主题String pubTopic = "topic";String content = "Hello World66+6";  //内容int qos = 2;String broker = "tcp://ip:1883";    //ip是自己服务器的ipString clientId = "emqx_test";MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(broker, clientId, persistence);// MQTT 连接选项MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setUserName("name");   //用户名connOpts.setPassword("12345".toCharArray());   //密码// 保留会话connOpts.setCleanSession(true);// 设置回调client.setCallback(new OnMessageCallback());// 建立连接System.out.println("Connecting to broker: " + broker);client.connect(connOpts);System.out.println("Connected");System.out.println("Publishing message: " + content);// 订阅client.subscribe(subTopic);// 消息发布所需参数MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);client.publish(pubTopic, message);System.out.println("Message published");// 主循环,保持程序运行while (true) {try {Thread.sleep(1000); // 每秒检查一次} catch (InterruptedException e) {e.printStackTrace();}}} catch (MqttException me) {System.out.println("reason " + me.getReasonCode());System.out.println("msg " + me.getMessage());System.out.println("loc " + me.getLocalizedMessage());System.out.println("cause " + me.getCause());System.out.println("excep " + me);me.printStackTrace();}}static class OnMessageCallback implements MqttCallback {@Overridepublic void connectionLost(Throwable cause) {System.out.println("Connection lost: " + cause.getMessage());}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println("Message arrived: " + new String(message.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {System.out.println("Delivery complete for token: " + token);}}
}

3.(可选)创建独立的回调消息处理类 OnMessageCallback(第 2 步的 App 中已经包含了一个同名的静态内部类;如果要改用下面这个独立的类,需要去掉内部类,并保证包名一致或添加对应的 import)

package io.emqx;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;public class OnMessageCallback implements MqttCallback {public void connectionLost(Throwable cause) {// 连接丢失后,一般在这里面进行重连System.out.println("连接断开,可以做重连");}public void messageArrived(String topic, MqttMessage message) throws Exception {// subscribe后得到的消息会执行到这里面System.out.println("接收消息主题:" + topic);System.out.println("接收消息Qos:" + message.getQos());System.out.println("接收消息内容:" + new String(message.getPayload()));}public void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" + token.isComplete());}
}

4.结果展示

至此第一部分的功能已经完成了,以下是扩展功能

5.整合到springboot

创建一个 MqttService 类

package com.leo.service;import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Service;@Service
public class MqttService {public void startMqttClient() {String subTopic = "topic";  //订阅的主题String pubTopic = "topic";String content = "Hello World66+6";int qos = 2;String broker = "tcp://ip:1883";  //你自己的ipString clientId = "emqx_test";MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(broker, clientId, persistence);// MQTT 连接选项MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setUserName("8266");connOpts.setPassword("12345".toCharArray());// 保留会话connOpts.setCleanSession(true);// 设置回调client.setCallback(new OnMessageCallback());// 建立连接System.out.println("Connecting to broker: " + broker);client.connect(connOpts);System.out.println("Connected");System.out.println("Publishing message: " + content);// 订阅client.subscribe(subTopic);// 消息发布所需参数MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);client.publish(pubTopic, message);System.out.println("Message published");// 主循环,保持程序运行while (true) {try {Thread.sleep(1000); // 每秒检查一次} catch (InterruptedException e) {e.printStackTrace();}}} catch (MqttException me) {System.out.println("reason " + me.getReasonCode());System.out.println("msg " + me.getMessage());System.out.println("loc " + me.getLocalizedMessage());System.out.println("cause " + me.getCause());System.out.println("excep " + me);me.printStackTrace();}}static class OnMessageCallback implements MqttCallback {@Overridepublic void connectionLost(Throwable cause) {System.out.println("Connection lost: " + cause.getMessage());}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println("Message arrived: " +"Topic: " + topic +", ID: " + message.getId() +", QoS: " + message.getQos() +", Class: " + message.getClass().getName() +", Payload: " + new String(message.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {System.out.println("Delivery complete for token: " + token);}}
}

在 Spring Boot 应用中调用服务

在 SprintBootDemo1Application 中调用 MqttService 的 startMqttClient 方法。由于 startMqttClient 内部有一个保持程序运行的循环,会一直阻塞调用它的线程,为了不阻塞 Spring Boot 的启动过程,需要在 run 方法中另起一个新的线程来运行 MQTT 客户端逻辑。

package com.leo;import com.leo.service.MqttService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class SprintBootDemo1Application implements CommandLineRunner {@Autowiredprivate MqttService mqttService;public static void main(String[] args) {SpringApplication.run(SprintBootDemo1Application.class, args);}@Overridepublic void run(String... args) throws Exception {// 在新线程中启动 MQTT 客户端new Thread(() -> mqttService.startMqttClient()).start();}
}

就ok了


http://www.mrgr.cn/news/63431.html

相关文章:

  • vue elemnt-ui自定义时间日期选择
  • 使用网页版Jupyter Notebook和VScode打开.ipynb文件
  • 再次梳理ISP的大致流程
  • 左值引用(Lvalue Reference)和右值引用(Rvalue Reference)详解
  • 自动化测试
  • dolphinscheduler2.0.9升级3.1.9版本问题记录
  • ArkTS基础
  • Excel函数学习记录
  • Matlab中国三大自然分区
  • 智慧园区有哪些优势
  • Java解析word中的表格或者文本
  • 揭秘云计算 | 1、云从哪里来?
  • Redis(2):内存模型
  • 物联网设备如何助力实现高效远程老人监护
  • batc和mini-batch
  • Java面试题十五
  • 基于springboot的Java学习论坛平台
  • prometheus 快速入门
  • python enum用法
  • opencv - py_imgproc - py_grabcut GrabCut 算法提取前景
  • JavaScript 实战技巧:让你成为前端高手的必备知识3(进阶版)
  • 【环境问题】pycharm远程服务器文件路径问题
  • 【前端】项目中遇到的问题汇总(长期更新)
  • 热点扫描:人工智能专利布局背后的商业博弈
  • Java思想
  • 拒绝无效发稿!软文推广这样精选媒体,一不小心省下百万宣发费用!媒介盒子分享