SpringBoot - Kafka的集成与使用详解8(消费者3:并发、批量消费)
由于 Kafka 的写性能非常高,因此项目经常会碰到 Kafka 消息队列拥堵的情况。遇到这种情况,我们可以通过并发消费、批量消费的方法进行解决。
(2)接着对消费者监听这边代码稍作修改,改成使用 List 来接收:
(3)我们一次性发送的 23 条数据测试一下:
(4)控制台输出内容如下:
(2)配置完毕后,消费者监听这边不需要修改:
(3)上面我们设置 concurrency 为 3,也就是将会启动 3 条线程进行监听。而由于我们创建的 topic 有 4 个 partition(分区),意味着将有 2 条线程都是分配到 1 个 partition,还有 1 条线程分配到 2 个 partition。我们可以通过日志看到每条线程分配到的 partition。
八、消费者3:并发、批量消费
1,批量消费
(1)首先我们在项目 application.properties 文件中添加如下配置,一个设置启用批量消费,一个设置批量消费每次最多消费多少条消息记录。
注意:这里设置 max-poll-records 是 5,并不是说如果没有达到 5 条消息,我们就一直等待。而是说一次 poll 最多返回的记录数为 5。
# 设置批量消费
spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=5
(2)接着对消费者监听这边代码稍作修改,改成使用 List 来接收:
@Component public class KafkaConsumer { // 消费监听 @KafkaListener(topics = {"topic3"}) public void listen1(List<String> data) { System.out.println("收到"+ data.size() + "条消息:"); System.out.println(data); } }
- 如果使用 ConsumerRecord 类接收,则也是使用 List 来接收:
@Component public class KafkaConsumer { // 消费监听 @KafkaListener(topics = {"topic3"}) public void listen1(List<ConsumerRecord<String, Object>> records) { System.out.println("收到"+ records.size() + "条消息:"); System.out.println(records); } }
- 如果使用注解方式获取消息头、消息体,则也是使用 List 来接收:
@Component public class KafkaConsumer { // 消费监听 @KafkaListener(topics = {"topic3"}) public void listen2(@Payload List<String> data, @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<String> keys, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) List<Long> tss) { System.out.println("收到"+ data.size() + "条消息:"); System.out.println(data); System.out.println(topics); System.out.println(partitions); System.out.println(keys); System.out.println(tss); } }
(3)我们一次性发送的 23 条数据测试一下:
@RestController public class KafkaProducer { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; // 发送消息 @GetMapping("/test") public void test() { for (int i = 0; i < 23; i++) { kafkaTemplate.send("topic3", "message-" + i); } } }
(4)控制台输出内容如下:
2,并发消费
(1)为了加快消费,我们可以提高并发数,比如下面配置我们将并发设置为 3:注意:并发量根据实际分区数决定,必须小于等于分区数,否则会有线程一直处于空闲状态。
# 并发数设为3
spring.kafka.listener.concurrency=3
(2)配置完毕后,消费者监听这边不需要修改:
@Component public class KafkaConsumer { // 消费监听 @KafkaListener(topics = {"topic3"}) public void listen1(String data) { System.out.println(data); } }
- 并发消费和批量消费可以结合同时使用的,消费者监听使用 List 来接收:
@Component public class KafkaConsumer { // 消费监听 @KafkaListener(topics = {"topic3"}) public void listen1(List<String> data) { System.out.println("收到"+ data.size() + "条消息:"); System.out.println(data); } }
(3)上面我们设置 concurrency 为 3,也就是将会启动 3 条线程进行监听。而由于我们创建的 topic 有 4 个 partition(分区),意味着将有 2 条线程都是分配到 1 个 partition,还有 1 条线程分配到 2 个 partition。我们可以通过日志看到每条线程分配到的 partition。