当前位置: > > > SpringBoot - 动态创建多个Kafka消费分组、多个KafkaTemplate教程

SpringBoot - 动态创建多个Kafka消费分组、多个KafkaTemplate教程

    有时我们程序中的 Kafka 消费者或生产者并不是一开始就定好的,而是需要在程序运行过程中根据情况(比如从数据库读取配置)动态地创建多个消费者、消费者分组(consumer group)进行数据消费,或者动态创建多个生产者(KafkaTemplate)往不同的目标地址生产数据。下面通过样例演示这些功能如何实现。

一、动态创建消费者分组

1,创建监听类

    首先我创建一个监听类 MyListener,里面定义一个监听方法并添加 @KafkaListener 注解,方法内容将收到的消息以及当前的 group id 打印出来。
注意:监听类 MyListener 不需要的增加 @Component 注解,因为接下来我们需要动态地进行创建。
public class MyListener {
  @KafkaListener(topics = "test_topic")
  public void listen(@Payload String data, @Header(KafkaHeaders.GROUP_ID) String groupId) {
    System.out.println(groupId + ":" +data);
  }
}

2,动态创建消费分组

(1)为演示动态创建,这里我定义一个 Controller,当调用其中的 /create 接口时,会动态创建三个消费者分组:
注意MyListener 这个 Bean 需要添加 @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) 注解,这样每次请求它的实例,spring 会给返回一个新的实例。
@RestController
@RequestMapping("consumer")
public class ConsumerController {

  /**
   * 应用程序上行文
   */
  @Autowired
  ApplicationContext context;


  /**
   * 监听器容器工厂
   */
  @Autowired
  ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory;


  /**
   * 所有@KafkaListener这个注解所标注的方法都会被注册在这里面中
   */
  @Autowired
  KafkaListenerEndpointRegistry registry;

  /**
   * 创建消费者分组
   */
  @GetMapping("/create")
  public void create() {
    //动态创建三个消费者分组
    String[] groupIds = {"group-0", "group-1", "group-2"};

    for (String groupId : groupIds) {
      // 初始化当前消费者分组的配置
      Map<String, Object> consumerProps = new HashMap<>();
      consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.60.4:9092");
      consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
      consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
      consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringDeserializer");
      consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringDeserializer");
      // 设置监听器容器工厂
      containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
      // 获取监听类实例
      context.getBean(MyListener.class);
    }
  }

  /**
   * 停止所有消费监听
   */
  @GetMapping("/stop")
  public void stop() {
    registry.getListenerContainers().forEach(container -> {
      //System.out.println(container.getGroupId());
      //System.out.println(container.getListenerId());
      container.stop();
    });
  }

  /**
   * 获取监听类实例
   */
  @Bean
  @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  public MyListener listener() {
    return new MyListener();
  }
}

  • 由于本样例中三个消费者配置只有 group id 不一样,其他都是一样的,我们也可以把公共配置写在 application.properties 配置文件中:
spring.kafka.bootstrap-servers=192.168.60.4:9092
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

  • 然后加载这个默认配置,再覆盖设置 group id 即可:
@RestController
@RequestMapping("consumer")
public class ConsumerController {

  /**
   * 应用程序上行文
   */
  @Autowired
  ApplicationContext context;

  /**
   * 消费者工厂
   */
  @Autowired
  ConsumerFactory<Object, Object> cf;

  /**
   * 监听器容器工厂
   */
  @Autowired
  ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory;


  /**
   * 所有@KafkaListener这个注解所标注的方法都会被注册在这里面中
   */
  @Autowired
  KafkaListenerEndpointRegistry registry;

  /**
   * 创建消费者分组
   */
  @GetMapping("/create")
  public void create() {
    //动态创建三个消费者分组
    String[] groupIds = {"group-0", "group-1", "group-2"};

    for (String groupId : groupIds) {
      // 初始化当前消费者分组的配置
      Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
      consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
      // 设置监听器容器工厂
      containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
      // 获取监听类实例
      context.getBean(MyListener.class);
    }
  }

  /**
   * 停止所有消费监听
   */
  @GetMapping("/stop")
  public void stop() {
    registry.getListenerContainers().forEach(container -> {
      //System.out.println(container.getGroupId());
      //System.out.println(container.getListenerId());
      container.stop();
    });
  }

  /**
   * 获取监听类实例
   */
  @Bean
  @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  public MyListener listener() {
    return new MyListener();
  }
}

3,运行测试

(1)程序启动后,首先访问 /consumer/create 接口初始化三个消费者分组:

(2)接着往 test 这个 topic 发送一条消息,可以看到三个消费者分组实例都收到了数据:

二、动态创建多个 KafkaTemplate

(1)为演示动态创建,这里我定义一个 Controller,当调用其中的 /send 接口时,会动态创建三个 KafkaTemplate,并发送消息: 
@RestController
@RequestMapping("producer")
public class ProducerController {

  @GetMapping("/send")
  public void send() {

    // 动态创建三个生产者
    List<KafkaTemplate> kafkaTemplates = new ArrayList<>();
    String[] servers = {"192.168.60.4:9092", "192.168.60.5:9092", "192.168.60.6:9092"};
    for (String server : servers) {
      Map<String, Object> producerProps = new HashMap<>();
      producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
      producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer");
      producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer");
      DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(producerProps);
      KafkaTemplate kafkaTemplate = new KafkaTemplate(pf, true);
      kafkaTemplates.add(kafkaTemplate);
    }

    //遍历三个生产者发送消息
    for (KafkaTemplate kafkaTemplate:kafkaTemplates) {
      kafkaTemplate.send("test", "welcome to hangge.com");
    }
  }
}

  • 由于本样例中三个生产者配置只有服务器地址(bootstrap-servers)不一样,其他都是一样的,我们可以把公共配置写在 application.properties 配置文件中:
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

  • 然后加载这个默认配置,再覆盖设置 bootstrap-servers 即可:
@RestController
@RequestMapping("producer")
public class ProducerController {

  /**
   * 生产者工厂
   */
  @Autowired
  ProducerFactory<Object, Object> pf;

  @GetMapping("/send")
  public void send() {

    // 动态创建三个生产者
    List<KafkaTemplate> kafkaTemplates = new ArrayList<>();
    String[] servers = {"192.168.60.4:9092", "192.168.60.5:9092", "192.168.60.6:9092"};
    for (String server : servers) {
      Map<String, Object> producerProps = new HashMap<>(pf.getConfigurationProperties());
      producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
      DefaultKafkaProducerFactory dpf = new DefaultKafkaProducerFactory<>(producerProps);
      KafkaTemplate kafkaTemplate = new KafkaTemplate(dpf, true);
      kafkaTemplates.add(kafkaTemplate);
    }

    //遍历三个生产者发送消息
    for (KafkaTemplate kafkaTemplate:kafkaTemplates) {
      kafkaTemplate.send("test", "welcome to hangge.com");
    }
  }
}
评论0