SpringBoot - Kafka的集成与使用详解5(生产者3:使用事务)
Kafka 同数据库一样支持事务,当发生异常或者出现特定逻辑判断的时候可以进行回滚,确保消息监听器不会接收到一些错误的或者不需要的消息。Kafka 使用事务有两种方式,下面分别进行介绍。
五、生产者3:使用事务
1,使用 executeInTransaction 方法
(1)通常情况下,如果不声明事务,即使发送消息后面报错了,前面消息也已经发送成功:
@RestController public class KafkaProducer { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; // 发送消息 @GetMapping("/test") public void test() { kafkaTemplate.send("topic1", "test executeInTransaction"); throw new RuntimeException("fail"); } }
(2)我们可以使用 KafkaTemplate 的 executeInTransaction 方法来声明事务。这种方式开启事务是不需要配置事务管理器的,也可以称为本地事务。
@RestController public class KafkaProducer { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; // 发送消息 @GetMapping("/test") public void test() { // 声明事务:后面报错消息不会发出去 kafkaTemplate.executeInTransaction(operations -> { operations.send("topic1","test executeInTransaction"); throw new RuntimeException("fail"); }); } }
2,使用 @Transactional 注解方式
(1)如果要使用注解方式开启事务,首先我们需要配置 KafkaTransactionManager,这个类就是 Kafka 提供给我们的事务管理类,我们需要使用生产者工厂来创建这个事务管理类。
注意:我们需要在 producerFactory 中开启事务功能,并设置 TransactionIdPrefix,TransactionIdPrefix 是用来生成 Transactional.id 的前缀。
@Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String servers; @Value("${spring.kafka.producer.retries}") private int retries; @Value("${spring.kafka.producer.acks}") private String acks; @Value("${spring.kafka.producer.batch-size}") private int batchSize; @Value("${spring.kafka.producer.properties.linger.ms}") private int linger; @Value("${spring.kafka.producer.buffer-memory}") private int bufferMemory; public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.ACKS_CONFIG, acks); props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.LINGER_MS_CONFIG, linger); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } public ProducerFactory<String, Object> producerFactory() { DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(producerConfigs()); factory.transactionCapable(); factory.setTransactionIdPrefix("tran-"); return factory; } @Bean public KafkaTransactionManager transactionManager() { KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory()); return manager; } }
(2)之后如果一个方法需要使用事务,我们只需要在该方法上添加 @Transactional 注解即可:
@RestController public class KafkaProducer { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; // 发送消息 @GetMapping("/test") @Transactional public void test() { kafkaTemplate.send("topic1","test executeInTransaction"); throw new RuntimeException("fail"); } }