SpringBoot - 集成MQTT教程1(发布消息)
MQTT 协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议。具体介绍可以查看我之前写的文章(点击跳转)。
本文演示 SpringBoot 项目中如何集成 MQTT,并实现消息的发送。
一、安装配置
1,依赖配置
编辑项目的 pom.xml 文件,添加如下依赖:
<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
2,配置 MQTT 服务器基本信息
编辑项目的 application.properties 文件,增加 MQTT 服务器配置信息:
# 用户名(这里为空)
mqtt.username=
# 密码(这里为空)
mqtt.password=
# 推送信息的连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:1883,tcp://192.168.60.133:1883
mqtt.url=tcp://192.168.60.133:1883
# 生产者连接服务器默认客户端ID
mqtt.sender.clientId=mqttProducer
# 默认的推送主题,实际可在调用接口时指定
mqtt.sender.defaultTopic=hangge
# 消费者连接服务器默认客户端ID(这里使用随机数)
mqtt.receiver.clientId=${random.value}
# 默认的接收主题,实际可在调用接口时指定
mqtt.receiver.defaultTopic=hangge
二、实现 MQTT 消息的发送
1,MqttSenderConfig.java(MQTT 消息推送配置类)
/** * MQTT配置,生产者 */ @Configuration public class MqttSenderConfig { /** * 发布的bean名称 */ public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel"; // 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息 private static final byte[] WILL_DATA; static { WILL_DATA = "offline".getBytes(); } @Value("${mqtt.username}") private String username; @Value("${mqtt.password}") private String password; @Value("${mqtt.url}") private String url; @Value("${mqtt.sender.clientId}") private String clientId; @Value("${mqtt.sender.defaultTopic}") private String defaultTopic; /** * MQTT连接器选项 */ @Bean public MqttConnectOptions getSenderMqttConnectOptions(){ MqttConnectOptions options=new MqttConnectOptions(); // 设置连接的用户名 if(!username.trim().equals("")){ options.setUserName(username); } // 设置连接的密码 options.setPassword(password.toCharArray()); // 设置连接的地址 options.setServerURIs(StringUtils.split(url, ",")); // 设置超时时间 单位为秒 options.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线 // 但这个方法并没有重连的机制 options.setKeepAliveInterval(20); // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。 options.setWill("willTopic", WILL_DATA, 2, false); return options; } /** * MQTT客户端 */ @Bean public MqttPahoClientFactory senderMqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getSenderMqttConnectOptions()); return factory; } /** * MQTT信息通道(生产者) */ @Bean(name = CHANNEL_NAME_OUT) public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } /** * MQTT消息处理器(生产者) */ @Bean @ServiceActivator(inputChannel = CHANNEL_NAME_OUT) public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( clientId, senderMqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(defaultTopic); return messageHandler; } }
2,IMqttSender(消息推送接口类)
/** * MQTT生产者消息发送接口 */ @Component @MessagingGateway(defaultRequestChannel = MqttSenderConfig.CHANNEL_NAME_OUT) public interface IMqttSender { /** * 发送信息到MQTT服务器 * * @param data 发送的文本 */ void sendToMqtt(String data); /** * 发送信息到MQTT服务器 * * @param topic 主题 * @param payload 消息主体 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); /** * 发送信息到MQTT服务器 * * @param topic 主题 * @param qos 对消息处理的几种机制。 * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。 * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。 * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 * @param payload 消息主体 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); }
3,HelloController.java(测试类)
@RestController public class HelloController { /** * 注入发送MQTT的Bean */ @Autowired private IMqttSender iMqttSender; // 发送自定义消息内容(使用默认主题) @RequestMapping("/test1/{data}") public void test1(@PathVariable("data") String data) { iMqttSender.sendToMqtt(data); } // 发送自定义消息内容,且指定主题 @RequestMapping("/test2/{topic}/{data}") public void test2(@PathVariable("topic") String topic, @PathVariable("data") String data) { iMqttSender.sendToMqtt(topic, data); } }
4,测试运行
(1)项目启动后,使用 MQTTBox 订阅“hangge”这个主题,然后使用浏览器访问 http://localhost:8080/test1/abcd1234 可以看到 MQTTBox 这边可以成功接收到发布的消息:

(2)接着我们访问 http://localhost:8080/test2/china/abcd1234,这次除了发送自定消息外还指定了主题(china,而不是使用默认主题),MQTTBox 这边订阅该主题并显示结果:
