SpringBoot - Kafka的集成与使用详解9(消费者4:异常处理器)
通常来说 KafkaListener 要做的事只是监听 Topic 中的数据并消费,如果在 KafkaListener 中还需要对异常进行 try catch 捕获并处理的话,则会显得代码块非常臃肿不利于维护。
(2)当然我们也可以使用下面这种 lambda 表达式写法,简化代码:
(2)可以看到异常处理器已经能正常使用了:
好在 spring-kafka 为我们提供了专门的异常处理器(ConsumerAwareListenerErrorHandler),通过异常处理器,我们可以处理 consumer 在消费时发生的异常。
九、消费者4:异常处理器
1,注册异常处理器
(1)注册一个异常处理器就是新建一个 ConsumerAwareListenerErrorHandler 类型的异常处理方法,使用 @Bean 注入(BeanName 默认就是方法名):
@Configuration public class KafkaInitialConfiguration { //异常处理器 @Bean public ConsumerAwareListenerErrorHandler myConsumerAwareErrorHandler() { return new ConsumerAwareListenerErrorHandler() { @Override public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) { System.out.println("--- 发生消费异常 ---"); System.out.println(message.getPayload()); System.out.println(exception); return null; } }; } }
(2)当然我们也可以使用下面这种 lambda 表达式写法,简化代码:
@Configuration public class KafkaInitialConfiguration { //异常处理器 @Bean public ConsumerAwareListenerErrorHandler myConsumerAwareErrorHandler() { return (message, exception, consumer) -> { System.out.println("--- 发生消费异常 ---"); System.out.println(message.getPayload()); System.out.println(exception); return null; }; } }
2,使用异常处理器
异常处理器注册好之后,将这个异常处理器的 BeanName 放到 @KafkaListener 注解的 errorHandler 属性里面,当监听抛出异常的时候,则会自动调用异常处理器。@Component public class KafkaConsumer { // 消费监听 @KafkaListener(topics = {"topic3"}, errorHandler = "myConsumerAwareErrorHandler") public void listen1(String data) throws Exception { throw new Exception("模拟一个异常"); } }
3,开始测试
(1)编写测试方法,发送一条消息:
@RestController public class KafkaProducer { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; // 发送消息 @GetMapping("/test") public void test() { kafkaTemplate.send("topic3", "hangge.com"); } }
(2)可以看到异常处理器已经能正常使用了:
附:批量消费异常处理器
批量消费异常处理器和之前单消息消费异常处理器差不多,上面异常处理器代码可以完全不用改动直接使用,只不过传递过来的数据都是 List 集合方式: