当前位置: > > > SpringBoot - Kafka的集成与使用详解5(生产者3:使用事务)

SpringBoot - Kafka的集成与使用详解5(生产者3:使用事务)

    Kafka 同数据库一样支持事务,当发生异常或者出现特定逻辑判断的时候可以进行回滚,确保消息监听器不会接收到一些错误的或者不需要的消息。Kafka 使用事务有两种方式,下面分别进行介绍。

五、生产者3:使用事务

1,使用 executeInTransaction 方法

(1)通常情况下,如果不声明事务,即使发送消息后面报错了,前面消息也已经发送成功:
@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // 发送消息
    @GetMapping("/test")
    public void test() {
        kafkaTemplate.send("topic1", "test executeInTransaction");
        throw new RuntimeException("fail");
    }
}

(2)我们可以使用 KafkaTemplateexecuteInTransaction 方法来声明事务。这种方式开启事务是不需要配置事务管理器的,也可以称为本地事务。
@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // 发送消息
    @GetMapping("/test")
    public void test() {
        // 声明事务:后面报错消息不会发出去
        kafkaTemplate.executeInTransaction(operations -> {
            operations.send("topic1","test executeInTransaction");
            throw new RuntimeException("fail");
        });
    }
}

2,使用 @Transactional 注解方式

(1)如果要使用注解方式开启事务,首先我们需要配置 KafkaTransactionManager,这个类就是 Kafka 提供给我们的事务管理类,我们需要使用生产者工厂来创建这个事务管理类。
注意:我们需要在 producerFactory 中开启事务功能,并设置 TransactionIdPrefixTransactionIdPrefix 是用来生成 Transactional.id 的前缀。
@Configuration
public class KafkaProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;
    @Value("${spring.kafka.producer.retries}")
    private int retries;
    @Value("${spring.kafka.producer.acks}")
    private String acks;
    @Value("${spring.kafka.producer.batch-size}")
    private int batchSize;
    @Value("${spring.kafka.producer.properties.linger.ms}")
    private int linger;
    @Value("${spring.kafka.producer.buffer-memory}")
    private int bufferMemory;


    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.ACKS_CONFIG, acks);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public ProducerFactory<String, Object> producerFactory() {
        DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(producerConfigs());
        factory.transactionCapable();
        factory.setTransactionIdPrefix("tran-");
        return factory;
    }

    @Bean
    public KafkaTransactionManager transactionManager() {
        KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory());
        return manager;
    }
}

(2)之后如果一个方法需要使用事务,我们只需要在该方法上添加 @Transactional 注解即可:
@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // 发送消息
    @GetMapping("/test")
    @Transactional
    public void test() {
        kafkaTemplate.send("topic1","test executeInTransaction");
        throw new RuntimeException("fail");
    }
}
评论0