SpringBoot - 集成MQTT教程2(订阅消息)
在之前的文章中我介绍了 SpringBoot 项目如何实现 MQTT 消息推送功能,本文接着介绍消息的订阅处理。其中 pom.xml 文件的依赖配置、以及 application.properties 文件的 MQTT 服务器配置同前文一样(点击查看),这里就不重复说明了。

(2)然后我们就可以在代码中通过这个 adapter 来添加或者删除 Topic:
三、实现 MQTT 消息的订阅
1,MqttReceiverConfig.java(MQTT 消息订阅配置类)
/** * MQTT配置,消费者 */ @Configuration public class MqttReceiverConfig { /** * 订阅的bean名称 */ public static final String CHANNEL_NAME_IN = "mqttInboundChannel"; // 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息 private static final byte[] WILL_DATA; static { WILL_DATA = "offline".getBytes(); } @Value("${mqtt.username}") private String username; @Value("${mqtt.password}") private String password; @Value("${mqtt.url}") private String url; @Value("${mqtt.receiver.clientId}") private String clientId; @Value("${mqtt.receiver.defaultTopic}") private String defaultTopic; /** * MQTT连接器选项 */ @Bean public MqttConnectOptions getReceiverMqttConnectOptions(){ MqttConnectOptions options = new MqttConnectOptions(); // 设置连接的用户名 if(!username.trim().equals("")){ options.setUserName(username); } // 设置连接的密码 options.setPassword(password.toCharArray()); // 设置连接的地址 options.setServerURIs(StringUtils.split(url, ",")); // 设置超时时间 单位为秒 options.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线 // 但这个方法并没有重连的机制 options.setKeepAliveInterval(20); return options; } /** * MQTT客户端 */ @Bean public MqttPahoClientFactory receiverMqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getReceiverMqttConnectOptions()); return factory; } /** * MQTT信息通道(消费者) */ @Bean(name = CHANNEL_NAME_IN) public MessageChannel mqttInboundChannel() { return new DirectChannel(); } /** * MQTT消息订阅绑定(消费者) */ @Bean public MessageProducer inbound() { // 可以同时消费(订阅)多个Topic MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( clientId, receiverMqttClientFactory(), StringUtils.split(defaultTopic, ",")); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); // 设置订阅通道 adapter.setOutputChannel(mqttInboundChannel()); return adapter; } /** * MQTT消息处理器(消费者) */ @Bean @ServiceActivator(inputChannel = CHANNEL_NAME_IN) public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); String msg = message.getPayload().toString(); System.out.println("\n--------------------START-------------------\n" + "接收到订阅消息:\ntopic:" + topic + "\nmessage:" + msg + "\n---------------------END--------------------"); } }; } }
2,测试运行
(1)项目启动后,我们使用 MQTTBox 对“hangge”这个主题,发送一条消息:

(2)可以看到 SprinBoot 项目这边成功接收到消息并打印出来:

附:动态设置监听的 Topic
(1)上面的样例中我们代码在初始化时就配置好监听主题。如果希望在运行过程中能够动态地新增或者删除订阅的 Topic,可以对 MQTT 消息订阅配置类稍作修改,将MqttPahoMessageDrivenChannelAdapter 对象定义成一个全局变量:
/** * MQTT配置,消费者 */ @Configuration public class MqttReceiverConfig { /** * 订阅的bean名称 */ public static final String CHANNEL_NAME_IN = "mqttInboundChannel"; public MqttPahoMessageDrivenChannelAdapter adapter; // 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息 private static final byte[] WILL_DATA; static { WILL_DATA = "offline".getBytes(); } @Value("${mqtt.username}") private String username; @Value("${mqtt.password}") private String password; @Value("${mqtt.url}") private String url; @Value("${mqtt.receiver.clientId}") private String clientId; @Value("${mqtt.receiver.defaultTopic}") private String defaultTopic; @Autowired private ReceiveDataController receiveDataController; /** * MQTT连接器选项 */ @Bean public MqttConnectOptions getReceiverMqttConnectOptions(){ MqttConnectOptions options = new MqttConnectOptions(); // 设置连接的用户名 if(!username.trim().equals("")){ options.setUserName(username); } // 设置连接的密码 options.setPassword(password.toCharArray()); // 设置连接的地址 options.setServerURIs(url.split(",")); // 设置超时时间 单位为秒 options.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线 // 但这个方法并没有重连的机制 options.setKeepAliveInterval(20); return options; } /** * MQTT客户端 */ @Bean public MqttPahoClientFactory receiverMqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getReceiverMqttConnectOptions()); return factory; } /** * MQTT信息通道(消费者) */ @Bean(name = CHANNEL_NAME_IN) public MessageChannel mqttInboundChannel() { return new DirectChannel(); } /** * MQTT消息订阅绑定(消费者) */ @Bean public MessageProducer inbound() { // 可以同时消费(订阅)多个Topic adapter = new MqttPahoMessageDrivenChannelAdapter( clientId, receiverMqttClientFactory(), defaultTopic.split(",")); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); // 设置订阅通道 adapter.setOutputChannel(mqttInboundChannel()); return adapter; } /** * MQTT消息处理器(消费者) */ @Bean @ServiceActivator(inputChannel = CHANNEL_NAME_IN) public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); String msg = message.getPayload().toString(); /**System.out.println("\n--------------------START-------------------\n" + "接收到订阅消息:\ntopic:" + topic + "\nmessage:" + msg + "\n---------------------END--------------------"); **/ EventBean event = JSON.parseObject(msg, EventBean.class); receiveDataController.receive(event); } }; } }
(2)然后我们就可以在代码中通过这个 adapter 来添加或者删除 Topic:
@RestController public class TestController { @Autowired MqttReceiverConfig mqttReceiverConfig; @GetMapping("/test") public void test() { // 添加一个或多个监听Topic mqttReceiverConfig.adapter.addTopic("topic1"); // 默认qos为1 mqttReceiverConfig.adapter.addTopic("topic2", 1); mqttReceiverConfig.adapter.addTopic("topic3", "topic4"); mqttReceiverConfig.adapter.addTopics(new String[]{"topic5", "topic6"},new int[]{1, 1}); // 删除一个或多个监听Topic mqttReceiverConfig.adapter.removeTopic("topic1"); mqttReceiverConfig.adapter.removeTopic("topic2", "topic3"); } }