当前位置: > > > 消息驱动微服务框架Spring Cloud Stream使用详解3(自定义通道接口、消息反馈)

消息驱动微服务框架Spring Cloud Stream使用详解3(自定义通道接口、消息反馈)

    在前文中我们使用 Spring Cloud Stream 自带的 Sink Source 通道接口来进行消息的接收和发送。但如果业务复杂,即一个应用中需有多个输入通道,或者多个输出通道(对应不同主题),那么就需要我们自定义接口来实现。

三、自定义消息通道接口

1,接口定义

首先我们自定义一个消息通道接口 MyProcessor,里面包含两个输出通道,以及两个输入通道:
public interface MyProcessor {

    String MESSAGE_OUTPUT = "message_output";

    String MESSAGE_INPUT = "message_input";

    String LOG_OUTPUT = "log_output";

    String LOG_INPUT = "log_input";

    @Output(MESSAGE_OUTPUT)
    MessageChannel messageOutput();

    @Input(MESSAGE_INPUT)
    SubscribableChannel messageInput();

    @Output(LOG_OUTPUT)
    MessageChannel logOutput();

    @Input(LOG_INPUT)
    SubscribableChannel logInput();
}

2,设置主题

    编辑项目的 applicaiton.properties 文件,添加如下配置,让 message_output message_inputlog_output log_input 通道的主题两两相同(即指向同一个 Exchange 交换器)
spring.cloud.stream.bindings.message_output.destination=hangge.message
spring.cloud.stream.bindings.message_input.destination=hangge.message
spring.cloud.stream.bindings.log_output.destination=hangge.log
spring.cloud.stream.bindings.log_input.destination=hangge.log

3,创建消息消费者

    在该服务中我们分别监听并处理 message_input 和 log_input 这两个通道的消息。同时我们还通过 @SendToprocessMessage 这个处理方法返回的内容以消息的方式发送到 log_output 通道中。
@EnableBinding(MyProcessor.class)
public class MyProcessorReceiver {
    /**
     * 通过 MyProcessor.MESSAGE_INPUT 接收消息
     * 然后通过 @SendTo 将处理后的消息发送到 MyProcessor.LOG_OUTPUT
     */
    @StreamListener(MyProcessor.MESSAGE_INPUT)
    @SendTo(MyProcessor.LOG_OUTPUT)
    public String processMessage(String message) {
        System.out.println("接收到消息:" + message);
        DateTimeFormatter formatter2 = DateTimeFormatter.ofPattern("yyyy年MM月dd日hh:mm:ss");
        String now = LocalDateTime.now().format(formatter2);
        return now + "接收到1条消息";
    }

    /**
     * 接收来自 MyProcessor.LOG_INPUT 的消息
     * 也就是通过上面的 @SendTo 发送来的日志消息,
     * 因为 MyProcessor.LOG_OUTPUT 和 MyProcessor.LOG_INPUT 是指向同一 exchange
     */
    @StreamListener(MyProcessor.LOG_INPUT)
    public void processLog(String message) {
        System.out.println("接收到日志:" + message);
    }
}

4,创建消息生产者

最后创建一个 Controller 用来发送消息:
@RestController
public class HelloController {

    @Autowired
    private MyProcessor myProcessor;

    @GetMapping("/test")
    public void test(){
        // 将需要发送的消息封装为Message对象
        Message message = MessageBuilder
                .withPayload("welcome to hangge.com")
                .build();
        // 发送Message对象
        myProcessor.messageOutput().send(message);
    }
}

5,运行测试

(1)启动应用,从 RabbitMQ 控制台中可以看到增加了两个分别名叫 hangge.log hangge.messageexchange 交换器:

(2)访问 /test 接口发送消息,可以看到控制台输出如下消息,说明消息的生产和消费都成功了。整个流程就是:
  • 原始消息发送到名为 hangge.messagesexchange
  • 消费者从名为 hangge.messagesexchange 接收原始消息,然后生成日志消息发送到 hangge.logexchange
  • 消费者从名为 hangge.logexchange 接收日志消息
评论0