基于 Spring Boot 的多连接 MQTT 客户端

张开发
2026/4/17 11:40:51 15 分钟阅读

分享文章

基于 Spring Boot 的多连接 MQTT 客户端
Multi-MQTT: 基于 Spring Boot 的多连接 MQTT 客户端实现项目概述Multi-MQTT 是一个基于Spring Boot 4和Spring Integration MQTT构建的轻量级 MQTT 消息客户端解决方案。该项目展示了如何在企业级 Java 应用中优雅地集成 MQTT 协议实现多服务器连接、自动重连、事件监听等核心功能。技术栈技术版本说明Java21支持虚拟线程Spring Boot4.0.0应用框架Spring Integration MQTT7.0.4MQTT 集成模块Eclipse Paho1.2.5MQTT 客户端库Lombok1.18.42代码简化核心特性1. 多服务器 URI 配置支持配置多个 MQTT Broker 地址实现高可用连接mqtt:uris:-wss://primary-broker:8084-wss://backup-broker:80842. 自动重连机制内置自动重连功能网络中断后可自动恢复连接mqttConnectOptions.setAutomaticReconnect(true);mqttConnectOptions.setKeepAliveInterval(2);// 2 秒心跳3. 完整的事件监听监听 MQTT 生命周期中的所有关键事件MqttSubscribedEvent- 订阅成功MqttMessageSentEvent- 消息发送MqttMessageDeliveredEvent- 消息送达确认MqttConnectionFailedEvent- 连接失败4. 动态 Client ID每次启动生成唯一 Client ID避免并发冲突StringclientIdmqttConfig.getClientId()-System.currentTimeMillis();5. QoS 服务质量保障支持 QoS 1 级别的消息传递确保消息至少送达一次。架构设计项目结构multi-mqtt/ ├── src/main/java/com/multi/mqtt/ │ ├── MultiMqttApplication.java # 应用入口 │ ├── config/ │ │ ├── MqttConfig.java # 配置属性类 │ │ └── MqttClientConfig.java # MQTT 客户端配置 │ └── listener/ │ └── MqttListener.java # 事件监听器 └── src/main/resources/ ├── application.yml # 主配置文件 └── application-dev.yml # 开发环境配置组件交互图┌─────────────────┐ │ MqttConfig │ ← 配置属性 (ConfigurationProperties) └────────┬────────┘ │ ▼ ┌─────────────────┐ │ MqttClientConfig│ ← Bean 配置 (Configuration) └────────┬────────┘ │ ┌────┴────┐ ▼ ▼ ┌─────────┐ ┌──────────────┐ │ inbound │ │ mqttInputChannel │ │ (订阅) │ │ (消息通道) │ └────┬────┘ └───────┬──────┘ │ │ └──────┬───────┘ ▼ ┌─────────────┐ │ handler() │ ← 消息处理器 └─────────────┘快速开始依赖配置dependencies!-- Spring Boot Web --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependency!-- Spring Integration MQTT --dependencygroupIdorg.springframework.integration/groupIdartifactIdspring-integration-mqtt/artifactIdversion7.0.4/version/dependency!-- Eclipse Paho Client --dependencygroupIdorg.eclipse.paho/groupIdartifactIdorg.eclipse.paho.client.mqttv3/artifactIdversion1.2.5/version/dependency/dependencies配置文件server:port:8080mqtt:uris:-wss://your-broker-address:8084username:your-usernamepassword:your-passwordclient-id:java-mqtt-clienttopics:-mqtt-topic/#运行项目mvn spring-boot:run核心代码解析1. 配置属性类 (MqttConfig.java)ConfigurationConfigurationProperties(prefixmqtt)DatapublicclassMqttConfig{privateStringusername;privateStringpassword;privateString[]uris;// 支持多服务器privateStringclientId;privateString[]topics;}2. 客户端工厂配置 (MqttClientConfig.java)BeanpublicMqttPahoClientFactorymqttClientFactory(){DefaultMqttPahoClientFactoryfactorynewDefaultMqttPahoClientFactory();MqttConnectOptionsoptionsnewMqttConnectOptions();options.setUserName(mqttConfig.getUsername());options.setPassword(mqttConfig.getPassword().toCharArray());options.setServerURIs(mqttConfig.getUris());// 多服务器 URIoptions.setKeepAliveInterval(2);// 心跳间隔options.setAutomaticReconnect(true);// 自动重连factory.setConnectionOptions(options);returnfactory;}3. 事件监听器 (MqttListener.java)ComponentSlf4jpublicclassMqttListener{EventListenerpublicvoidhandleEvent(MqttIntegrationEventevent){if(eventinstanceofMqttConnectionFailedEvent){log.error([MQTT]-connection failed,((MqttConnectionFailedEvent)event).getCause());}elseif(eventinstanceofMqttSubscribedEvent){log.info([MQTT]-subscribed successfully);}// ... 其他事件处理}}扩展建议1. 添加消息发布功能取消注释MqttClientConfig.java中的发布相关代码即可启用消息发布能力。2. 自定义消息转换器DefaultPahoMessageConverterconverternewDefaultPahoMessageConverter();converter.setPayloadType(String.class);// 或自定义类型adapter.setConverter(converter);3. 多主题订阅管理支持通配符订阅topic/#- 订阅 topic 下所有层级topic/- 订阅 topic 下一级所有主题4. SSL/TLS 安全连接配置中使用wss://协议已启用加密传输。最佳实践生产环境使用持久化 Client ID- 当前使用时间戳生成临时 ID生产环境应使用固定 ID 并配合 Clean Session合理设置 KeepAlive 间隔- 根据网络环境调整建议 60-120 秒监控连接状态- 通过事件监听器实现告警机制消息堆积处理- 考虑使用消息队列缓冲大量消息参考资料Spring Integration MQTT 官方文档Eclipse Paho MQTT C 客户端MQTT 协议规范LicenseMIT License本文档适用于技术分享与学习参考。作者w | 创建日期2026-04-07

更多文章