当前位置: > > > SpringBoot - 集成MQTT教程2(订阅消息)

SpringBoot - 集成MQTT教程2(订阅消息)

    在之前的文章中我介绍了 SpringBoot 项目如何实现 MQTT 消息推送功能,本文接着介绍消息的订阅处理。其中 pom.xml 文件的依赖配置、以及 application.properties 文件的 MQTT 服务器配置同前文一样(点击查看),这里就不重复说明了。

三、实现 MQTT 消息的订阅

1,MqttReceiverConfig.java(MQTT 消息订阅配置类)

/**
 * MQTT配置,消费者
 */
@Configuration
public class MqttReceiverConfig {
    /**
     * 订阅的bean名称
     */
    public static final String CHANNEL_NAME_IN = "mqttInboundChannel";

    // 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
    private static final byte[] WILL_DATA;
    static {
        WILL_DATA = "offline".getBytes();
    }

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.url}")
    private String url;

    @Value("${mqtt.receiver.clientId}")
    private String clientId;

    @Value("${mqtt.receiver.defaultTopic}")
    private String defaultTopic;

    /**
     * MQTT连接器选项
     */
    @Bean
    public MqttConnectOptions getReceiverMqttConnectOptions(){
        MqttConnectOptions options = new MqttConnectOptions();
        // 设置连接的用户名
        if(!username.trim().equals("")){
            options.setUserName(username);
        }
        // 设置连接的密码
        options.setPassword(password.toCharArray());
        // 设置连接的地址
        options.setServerURIs(StringUtils.split(url, ","));
        // 设置超时时间 单位为秒
        options.setConnectionTimeout(10);
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
        // 但这个方法并没有重连的机制
        options.setKeepAliveInterval(20);
        return options;
    }

    /**
     * MQTT客户端
     */
    @Bean
    public MqttPahoClientFactory receiverMqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getReceiverMqttConnectOptions());
        return factory;
    }

    /**
     * MQTT信息通道(消费者)
     */
    @Bean(name = CHANNEL_NAME_IN)
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel();
    }


    /**
     * MQTT消息订阅绑定(消费者)
     */
    @Bean
    public MessageProducer inbound() {
        // 可以同时消费(订阅)多个Topic
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        clientId, receiverMqttClientFactory(),
                        StringUtils.split(defaultTopic, ","));
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        // 设置订阅通道
        adapter.setOutputChannel(mqttInboundChannel());
        return adapter;
    }

    /**
     * MQTT消息处理器(消费者)
     */
    @Bean
    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
                String msg = message.getPayload().toString();
                System.out.println("\n--------------------START-------------------\n" +
                        "接收到订阅消息:\ntopic:" + topic + "\nmessage:" + msg +
                        "\n---------------------END--------------------");
            }
        };
    }
}

2,测试运行

(1)项目启动后,我们使用 MQTTBox 对“hangge”这个主题,发送一条消息:

(2)可以看到 SprinBoot 项目这边成功接收到消息并打印出来:

附:动态设置监听的 Topic

(1)上面的样例中我们代码在初始化时就配置好监听主题。如果希望在运行过程中能够动态地新增或者删除订阅的 Topic,可以对 MQTT 消息订阅配置类稍作修改,将MqttPahoMessageDrivenChannelAdapter 对象定义成一个全局变量:
/**
 * MQTT配置,消费者
 */
@Configuration
public class MqttReceiverConfig {
    /**
     * 订阅的bean名称
     */
    public static final String CHANNEL_NAME_IN = "mqttInboundChannel";

    public MqttPahoMessageDrivenChannelAdapter adapter;

    // 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
    private static final byte[] WILL_DATA;
    static {
        WILL_DATA = "offline".getBytes();
    }

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.url}")
    private String url;

    @Value("${mqtt.receiver.clientId}")
    private String clientId;

    @Value("${mqtt.receiver.defaultTopic}")
    private String defaultTopic;

    @Autowired
    private ReceiveDataController receiveDataController;

    /**
     * MQTT连接器选项
     */
    @Bean
    public MqttConnectOptions getReceiverMqttConnectOptions(){
        MqttConnectOptions options = new MqttConnectOptions();
        // 设置连接的用户名
        if(!username.trim().equals("")){
            options.setUserName(username);
        }
        // 设置连接的密码
        options.setPassword(password.toCharArray());
        // 设置连接的地址
        options.setServerURIs(url.split(","));
        // 设置超时时间 单位为秒
        options.setConnectionTimeout(10);
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
        // 但这个方法并没有重连的机制
        options.setKeepAliveInterval(20);
        return options;
    }

    /**
     * MQTT客户端
     */
    @Bean
    public MqttPahoClientFactory receiverMqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getReceiverMqttConnectOptions());
        return factory;
    }

    /**
     * MQTT信息通道(消费者)
     */
    @Bean(name = CHANNEL_NAME_IN)
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel();
    }

    /**
     * MQTT消息订阅绑定(消费者)
     */
    @Bean
    public MessageProducer inbound() {
        // 可以同时消费(订阅)多个Topic
        adapter = new MqttPahoMessageDrivenChannelAdapter(
                        clientId, receiverMqttClientFactory(),
                        defaultTopic.split(","));
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        // 设置订阅通道
        adapter.setOutputChannel(mqttInboundChannel());
        return adapter;
    }

    /**
     * MQTT消息处理器(消费者)
     */
    @Bean
    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
                String msg = message.getPayload().toString();
                /**System.out.println("\n--------------------START-------------------\n" +
                        "接收到订阅消息:\ntopic:" + topic + "\nmessage:" + msg +
                        "\n---------------------END--------------------");
                 **/
                EventBean event = JSON.parseObject(msg, EventBean.class);
                receiveDataController.receive(event);
            }
        };
    }
}

(2)然后我们就可以在代码中通过这个 adapter 来添加或者删除 Topic
@RestController
public class TestController {
    @Autowired
    MqttReceiverConfig mqttReceiverConfig;

    @GetMapping("/test")
    public void test() {
        // 添加一个或多个监听Topic
        mqttReceiverConfig.adapter.addTopic("topic1"); // 默认qos为1
        mqttReceiverConfig.adapter.addTopic("topic2", 1);
        mqttReceiverConfig.adapter.addTopic("topic3", "topic4");
        mqttReceiverConfig.adapter.addTopics(new String[]{"topic5", "topic6"},new int[]{1, 1});
        // 删除一个或多个监听Topic
        mqttReceiverConfig.adapter.removeTopic("topic1");
        mqttReceiverConfig.adapter.removeTopic("topic2", "topic3");
    }
}
评论0