SpringBoot - 动态创建多个Kafka消费分组、多个KafkaTemplate教程
有时我们程序中的 Kafka 消费者或生产者并不是一开始就定好的,而是需要在程序运行过程中根据情况(比如从数据库读取配置)动态地创建多个消费者、消费者分组(consumer group)进行数据消费,或者动态创建多个生产者(KafkaTemplate)往不同的目标地址生产数据。下面通过样例演示这些功能如何实现。
一、动态创建消费者分组
1,创建监听类
首先我创建一个监听类 MyListener,里面定义一个监听方法并添加 @KafkaListener 注解,方法内容将收到的消息以及当前的 group id 打印出来。
注意:监听类 MyListener 不需要的增加 @Component 注解,因为接下来我们需要动态地进行创建。
public class MyListener { @KafkaListener(topics = "test_topic") public void listen(@Payload String data, @Header(KafkaHeaders.GROUP_ID) String groupId) { System.out.println(groupId + ":" +data); } }
2,动态创建消费分组
(1)为演示动态创建,这里我定义一个 Controller,当调用其中的 /create 接口时,会动态创建三个消费者分组:
注意:MyListener 这个 Bean 需要添加 @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) 注解,这样每次请求它的实例,spring 会给返回一个新的实例。
@RestController @RequestMapping("consumer") public class ConsumerController { /** * 应用程序上行文 */ @Autowired ApplicationContext context; /** * 监听器容器工厂 */ @Autowired ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory; /** * 所有@KafkaListener这个注解所标注的方法都会被注册在这里面中 */ @Autowired KafkaListenerEndpointRegistry registry; /** * 创建消费者分组 */ @GetMapping("/create") public void create() { //动态创建三个消费者分组 String[] groupIds = {"group-0", "group-1", "group-2"}; for (String groupId : groupIds) { // 初始化当前消费者分组的配置 Map<String, Object> consumerProps = new HashMap<>(); consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.60.4:9092"); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // 设置监听器容器工厂 containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps)); // 获取监听类实例 context.getBean(MyListener.class); } } /** * 停止所有消费监听 */ @GetMapping("/stop") public void stop() { registry.getListenerContainers().forEach(container -> { //System.out.println(container.getGroupId()); //System.out.println(container.getListenerId()); container.stop(); }); } /** * 获取监听类实例 */ @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public MyListener listener() { return new MyListener(); } }
- 由于本样例中三个消费者配置只有 group id 不一样,其他都是一样的,我们也可以把公共配置写在 application.properties 配置文件中:
spring.kafka.bootstrap-servers=192.168.60.4:9092 spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
- 然后加载这个默认配置,再覆盖设置 group id 即可:
@RestController @RequestMapping("consumer") public class ConsumerController { /** * 应用程序上行文 */ @Autowired ApplicationContext context; /** * 消费者工厂 */ @Autowired ConsumerFactory<Object, Object> cf; /** * 监听器容器工厂 */ @Autowired ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory; /** * 所有@KafkaListener这个注解所标注的方法都会被注册在这里面中 */ @Autowired KafkaListenerEndpointRegistry registry; /** * 创建消费者分组 */ @GetMapping("/create") public void create() { //动态创建三个消费者分组 String[] groupIds = {"group-0", "group-1", "group-2"}; for (String groupId : groupIds) { // 初始化当前消费者分组的配置 Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties()); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); // 设置监听器容器工厂 containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps)); // 获取监听类实例 context.getBean(MyListener.class); } } /** * 停止所有消费监听 */ @GetMapping("/stop") public void stop() { registry.getListenerContainers().forEach(container -> { //System.out.println(container.getGroupId()); //System.out.println(container.getListenerId()); container.stop(); }); } /** * 获取监听类实例 */ @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public MyListener listener() { return new MyListener(); } }
3,运行测试
(1)程序启动后,首先访问 /consumer/create 接口初始化三个消费者分组:(2)接着往 test 这个 topic 发送一条消息,可以看到三个消费者分组实例都收到了数据:
二、动态创建多个 KafkaTemplate
(1)为演示动态创建,这里我定义一个 Controller,当调用其中的 /send 接口时,会动态创建三个 KafkaTemplate,并发送消息:
@RestController @RequestMapping("producer") public class ProducerController { @GetMapping("/send") public void send() { // 动态创建三个生产者 List<KafkaTemplate> kafkaTemplates = new ArrayList<>(); String[] servers = {"192.168.60.4:9092", "192.168.60.5:9092", "192.168.60.6:9092"}; for (String server : servers) { Map<String, Object> producerProps = new HashMap<>(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(producerProps); KafkaTemplate kafkaTemplate = new KafkaTemplate(pf, true); kafkaTemplates.add(kafkaTemplate); } //遍历三个生产者发送消息 for (KafkaTemplate kafkaTemplate:kafkaTemplates) { kafkaTemplate.send("test", "welcome to hangge.com"); } } }
- 由于本样例中三个生产者配置只有服务器地址(bootstrap-servers)不一样,其他都是一样的,我们可以把公共配置写在 application.properties 配置文件中:
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
- 然后加载这个默认配置,再覆盖设置 bootstrap-servers 即可:
@RestController @RequestMapping("producer") public class ProducerController { /** * 生产者工厂 */ @Autowired ProducerFactory<Object, Object> pf; @GetMapping("/send") public void send() { // 动态创建三个生产者 List<KafkaTemplate> kafkaTemplates = new ArrayList<>(); String[] servers = {"192.168.60.4:9092", "192.168.60.5:9092", "192.168.60.6:9092"}; for (String server : servers) { Map<String, Object> producerProps = new HashMap<>(pf.getConfigurationProperties()); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server); DefaultKafkaProducerFactory dpf = new DefaultKafkaProducerFactory<>(producerProps); KafkaTemplate kafkaTemplate = new KafkaTemplate(dpf, true); kafkaTemplates.add(kafkaTemplate); } //遍历三个生产者发送消息 for (KafkaTemplate kafkaTemplate:kafkaTemplates) { kafkaTemplate.send("test", "welcome to hangge.com"); } } }