SpringBoot - Kafka的集成与使用详解11(消费者6:消息转发)
在实际开发中,我们常常需要使用转发功能实现业务解耦。比如:应用 A 从 topic1 获取到消息,经过处理后转发到 topic2。应用 B 监听 topic2 获取消息再次进行处理。
(2)编写测试方法,发送 1 条消息:
(3)控制台输出如下,可以看到消息成功处理并转发了:
十一、消费者6:消息转发
(1)Spring-Kafka 只需要通过一个 @SendTo 注解即可以实现消息的转发,被注解方法的 return 值即转发的消息内容:
@Component public class KafkaConsumer { // 消费监听 @KafkaListener(topics = {"topic1"}) @SendTo("topic2") public String listen1(String data) { System.out.println("业务A收到消息:" + data); return data + "(已处理)"; } // 消费监听 @KafkaListener(topics = {"topic2"}) public void listen2(String data) { System.out.println("业务B收到消息:" + data); } }
(2)编写测试方法,发送 1 条消息:
@RestController public class KafkaProducer { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; // 发送消息 @GetMapping("/test") public void test() { kafkaTemplate.send("topic1", "1条测试消息"); } }
(3)控制台输出如下,可以看到消息成功处理并转发了: