当前位置: > > > SpringBoot - Kafka的集成与使用详解12(消费者7:动态开启、关闭监听)

SpringBoot - Kafka的集成与使用详解12(消费者7:动态开启、关闭监听)

    默认情况下,当项目启动时,监听器就开始工作(监听消费发送到指定 topic 的消息)。如果我们不想让监听器立即工作,想在程序运行的过程中能够动态地开启、关闭监听器,可以借助 KafkaListenerEndpointRegistry 实现,下面通过样例进行演示。

十二、消费者7:动态启动、停止监听器

1,动态地开启、关闭监听器

(1)消费者这边代码没有什么特别的,主要是设置了个消费者 ID(监听器 ID),后面要根据这个 ID 来开启、关闭监听:
@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(id = "myListener1", topics = {"topic1"})
    public void listen1(String data) {
        System.out.println("监听器收到消息:" + data);
    }
}

(2)然后我定义两个 controller 接口分别通过 KafkaListenerEndpointRegistry 来控制监听器的开启、关闭:
注意:
  • KafkaListenerEndpointRegistry SpringIO 中已经被注册为 Bean,直接注入使用即可。
  • 还需要注意一下启动监听容器的方法,resume 是恢复的意思不是启动的意思。所以我们需要判断容器是否运行,如果运行则调用 resume 方法,否则调用 start 方法。
@RestController
public class KafkaProducer {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // 发送消息
    @GetMapping("/test")
    public void test() {
        System.out.println("监听器发送消息!");
        kafkaTemplate.send("topic1", "1条测试消息");
    }

    // 开启监听
    @GetMapping("/start")
    public void start() {
        System.out.println("开启监听");
        //判断监听容器是否启动,未启动则将其启动
        if (!registry.getListenerContainer("myListener1").isRunning()) {
            registry.getListenerContainer("myListener1").start();
        }
        registry.getListenerContainer("myListener1").resume();
    }

    // 关闭监听
    @GetMapping("/stop")
    public void stop() {
        System.out.println("关闭监听");
        //判断监听容器是否启动,未启动则将其启动
        registry.getListenerContainer("myListener1").pause();
    }
}

(3)开始测试一下:
  • 启动项目,调用 /test 接口发送一条消息,监听器成功接收到消息。
  • 调用 /stop 接口关闭监听,再次发送一条消息,监听器不会收到消息。
  • 调用 /start 接口开启监听,监听器收到之前发送的消息。
  • 由于监听已经开启,再次发送一条消息,监听器成功接收到消息。

2,禁止监听器自启动

(1)默认情况下,当项目启动的时候,监听器就开始工作。如果想要禁止监听器自启动,首先我们定义一个不自动启动的监听容器工厂:
@Configuration
public class KafkaInitialConfiguration {

    // 监听器工厂
    @Autowired
    private ConsumerFactory consumerFactory;

    // 监听器容器工厂(设置禁止KafkaListener自启动)
    @Bean
    public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
        ConcurrentKafkaListenerContainerFactory container =
                new ConcurrentKafkaListenerContainerFactory();
        container.setConsumerFactory(consumerFactory);
        //禁止自动启动
        container.setAutoStartup(false);
        return container;
    }
}

(2)然后将这个容器工厂的 BeanName 放到 @KafkaListener 注解的 containerFactory 属性里面。这样该监听器就不会自动启动了。
@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(id = "myListener1", topics = {"topic1"},
            containerFactory = "delayContainerFactory")
    public void listen1(String data) {
        System.out.println("监听器收到消息:" + data);
    }
}

(3)开始测试一下:
  • 启动项目,由于监听器默认没有启动,调用 /test 接口发送一条消息,监听器不会收到消息。
  • 调用 /start 接口开启监听,监听器收到之前发送的消息。
  • 由于监听已经开启,再次发送一条消息,监听器成功接收到消息。

附:定时启动、停止监听器

    有时我们需要监听器在我们指定的时间点开始工作,或者在我们指定的时间点停止工作。比如:项目需要利用 Kafka 做数据持久化的功能,由于用户活跃的时间为早上 10 点至晚上 12 点,那在这个时间段做一个大数据量的持久化可能会影响数据库性能导致用户体验降低,我们可以选择在用户活跃度低的时间段去做持久化的操作,也就是晚上 12 点后到第二条的早上 10 点前。这个可以结合定时任务来实现。

(1)首先在项目启动类上添加 @EnableScheduling 注解开启定时任务:
@SpringBootApplication
@EnableScheduling
public class KtestApplication {
    public static void main(String[] args) {
        SpringApplication.run(KtestApplication.class, args);
    }
}

(2)下面代码我们创建两个定时任务,一个用来在指定时间点启动 myListener1 这个监听器,另一个在指定时间点停止 myListener1 这个监听器:
@Component
public class MyKafkaSchedule {
    @Autowired
    private KafkaListenerEndpointRegistry registry;

    //定时器,每天凌晨0点开启监听
    @Scheduled(cron = "0 0 0 * * ?")
    public void startListener() {
        System.out.println("开启监听");
        //判断监听容器是否启动,未启动则将其启动
        if (!registry.getListenerContainer("myListener1").isRunning()) {
            registry.getListenerContainer("myListener1").start();
        }
        registry.getListenerContainer("myListener1").resume();
    }

    //定时器,每天早上10点关闭监听
    @Scheduled(cron = "0 0 10 * * ?")
    public void shutDownListener() {
        System.out.println("关闭监听");
        registry.getListenerContainer("myListener1").pause();
    }
}

(3)最后记得禁止这个监听器自启动(关于禁止自动启动的监听容器工厂见文章第 2 点内容):
@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(id = "myListener1", topics = {"topic1"},
            containerFactory = "delayContainerFactory")
    public void listen1(String data) {
        System.out.println("监听器收到消息:" + data);
    }
}
评论0