当前位置: > > > SpringBoot - Kafka的集成与使用详解6(消费者1:指定topic、partition、offset)

SpringBoot - Kafka的集成与使用详解6(消费者1:指定topic、partition、offset)

六、消费者1:指定 topic、partition、offset 

1,使用 topics 指定 topic

(1)监听器主要是使用 @KafkaListenter 注解即可,而通过 topics 参数设置监听的 topic(可监听多个,用逗号隔开):
其他参数介绍id(消费者 ID)、 groupId(消费组 ID
@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(id = "consumer1",groupId = "my-group1", topics = {"topic1","topic2"})
    public void listen1(String data) {
        System.out.println(data);
    }
}

(2)下面分别往这两个 topic 的两个分区发送消息,可以看到消费者这边都能够收到:
@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // 发送消息
    @GetMapping("/test")
    public void test() {
        kafkaTemplate.send("topic1", 0, "key1", "message1");
        kafkaTemplate.send("topic1", 1, "key2", "message2");
        kafkaTemplate.send("topic2", 0, "key3", "message3");
        kafkaTemplate.send("topic2", 1, "key4", "message4");
    }
}

2,使用 topicPartitions 指定 topic、parition、offset

(1)topicPartitions 可配置更加详细的监听信息,比如下面代码同样是同时监听 topic1 topic2,不同在于这次:
  • 监听 topic1 0 号分区
  • 监听 topic2 0 号和 1 号分区(其中 1 号分区的初始偏移量为 100
注意topics topicPartitions 不能同时使用。
@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(id = "consumer1",groupId = "my-group1",topicPartitions = {
            @TopicPartition(topic = "topic1", partitions = { "0" }),
            @TopicPartition(topic = "topic2", partitions = "0",
                    partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
    })
    public void listen1(String data) {
        System.out.println(data);
    }
}

(2)下面分别往这两个 topic 的两个分区发送消息,可以看到消费者这边只会收到指定的消息:
@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // 发送消息
    @GetMapping("/test")
    public void test() {
        kafkaTemplate.send("topic1", 0, "key1", "message1");
        kafkaTemplate.send("topic1", 1, "key2", "message2");
        kafkaTemplate.send("topic2", 0, "key3", "message3");
        kafkaTemplate.send("topic2", 1, "key4", "message4");
    }
}
评论0