SpringBoot - Kafka的集成与使用详解10(消费者5:消息过滤器)
消息过滤器可以让消息在抵达监听容器前被拦截,过滤器根据系统业务逻辑去筛选出需要的数据交由 KafkaListener 处理,不需要的消息则会过滤掉。
(2)看一下监听器的消费情况,可以发现监听器只消费了偶数(奇数已经被过滤器过滤掉了):
十、消费者5:消息过滤器
1,配置消息过滤器
配置消息过滤器十分简单,只需要为监听容器工厂配置一个 RecordFilterStrategy(消息过滤策略),返回 true 的时候消息将会被抛弃,返回 false 时,消息则能正常抵达监听容器。
@Configuration public class KafkaInitialConfiguration { // 监听器工厂 @Autowired private ConsumerFactory consumerFactory; // 配置一个消息过滤策略 @Bean public ConcurrentKafkaListenerContainerFactory myFilterContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory); // 被过滤的消息将被丢弃 factory.setAckDiscarded(true); // 消息过滤策略(将消息转换为long类型,判断是奇数还是偶数,把所有奇数过滤,监听器只接收偶数) factory.setRecordFilterStrategy(new RecordFilterStrategy() { @Override public boolean filter(ConsumerRecord consumerRecord) { long data = Long.parseLong((String) consumerRecord.value()); if (data % 2 == 0) { return false; } //返回true将会被丢弃 return true; } }); return factory; } }
2,使用消息过滤器
带有消息过滤策略的容器工厂注册好之后,将这个容器工厂的 BeanName 放到 @KafkaListener 注解的 containerFactory 属性里面。这样消息在抵达监听器之前,会先进行过滤。
@Component public class KafkaConsumer { // 消费监听 @KafkaListener(topics = {"topic3"}, containerFactory = "myFilterContainerFactory") public void listen1(String data) { System.out.println(data); } }
3,开始测试
(1)编写测试方法,连续发送 5 条消息(消息内容分别是 0 到 4 这个 5 个数字):@RestController public class KafkaProducer { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; // 发送消息 @GetMapping("/test") public void test() { for (int i = 0; i < 5; i++) { kafkaTemplate.send("topic3", String.valueOf(i)); } } }
(2)看一下监听器的消费情况,可以发现监听器只消费了偶数(奇数已经被过滤器过滤掉了):