SpringBoot - Kafka的集成与使用详解6(消费者1:指定topic、partition、offset)
六、消费者1:指定 topic、partition、offset
1,使用 topics 指定 topic
(1)监听器主要是使用 @KafkaListenter 注解即可,而通过 topics 参数设置监听的 topic(可监听多个,用逗号隔开):
其他参数介绍:id(消费者 ID)、 groupId(消费组 ID)
@Component public class KafkaConsumer { // 消费监听 @KafkaListener(id = "consumer1",groupId = "my-group1", topics = {"topic1","topic2"}) public void listen1(String data) { System.out.println(data); } }
(2)下面分别往这两个 topic 的两个分区发送消息,可以看到消费者这边都能够收到:
@RestController public class KafkaProducer { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; // 发送消息 @GetMapping("/test") public void test() { kafkaTemplate.send("topic1", 0, "key1", "message1"); kafkaTemplate.send("topic1", 1, "key2", "message2"); kafkaTemplate.send("topic2", 0, "key3", "message3"); kafkaTemplate.send("topic2", 1, "key4", "message4"); } }
2,使用 topicPartitions 指定 topic、parition、offset
(1)topicPartitions 可配置更加详细的监听信息,比如下面代码同样是同时监听 topic1 和 topic2,不同在于这次:- 监听 topic1 的 0 号分区
- 监听 topic2 的 0 号和 1 号分区(其中 1 号分区的初始偏移量为 100)
注意:topics 和 topicPartitions 不能同时使用。
@Component public class KafkaConsumer { // 消费监听 @KafkaListener(id = "consumer1",groupId = "my-group1",topicPartitions = { @TopicPartition(topic = "topic1", partitions = { "0" }), @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) }) public void listen1(String data) { System.out.println(data); } }
(2)下面分别往这两个 topic 的两个分区发送消息,可以看到消费者这边只会收到指定的消息:
@RestController public class KafkaProducer { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; // 发送消息 @GetMapping("/test") public void test() { kafkaTemplate.send("topic1", 0, "key1", "message1"); kafkaTemplate.send("topic1", 1, "key2", "message2"); kafkaTemplate.send("topic2", 0, "key3", "message3"); kafkaTemplate.send("topic2", 1, "key4", "message4"); } }