消息驱动微服务框架Spring Cloud Stream使用详解2(基本用法)
本文通过一个简单的样例演示如何通过 Spring Cloud Stream 实现消息的发送和接收。Spring Cloud Stream 默认提供了三个通道接口:Sink、Source 和 Processor,如果当前应用最多只需一个输入通道、一个输出通道,那么直接使用默认通道接口即可,无需再自定义通道接口。
二、基本用法
1,添加依赖
(1)创建一个基础的 Spring Boot 工程,编辑 pom.xml 文件,引入 Spring Cloud Stream 对 RabbitMQ 的支持:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Hoxton.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
(2)在 application.properties 中配置 RebbitMQ 的连接信息:
spring.rabbitmq.host=192.168.60.133 spring.rabbitmq.port=5672 spring.rabbitmq.username=hangge spring.rabbitmq.password=123
2,创建消息的消费者
(1)我们创建一个用于接收来自 RabbitMQ 消息的消费者 SinkReceiver:
(1)@EnableBinding 注解用来指定一个或多个定义了 @Input 或 @Output 注解的接口,以此实现对消息通道(Channel)的绑定:
- @EnableBinding(Sink.class) 表示绑定了 Sink 接口。
- @StreamListener(Sink.INPUT) 表示将 receive 方法注册为 input 消息通道的监听处理器,当接收到消息时,该方法机会作出对应的响应动作。
@EnableBinding(Sink.class) public class SinkReceiver { @StreamListener(Sink.INPUT) public void reveive(Object payload) { System.out.println("收到:" + payload); } }
(1)三个通道接口说明:
- Sink 和 Source 中分别通过 @Input 和 @Output 注解定义了输入通道和输出通道。
- Processor 通过继承 Source 和 Sink 的方式同时定义了一个输入通道和一个输出通道。
- 因为指定具体的值,Sink 和 Source 的消息通道名称分别为 input 和 output
public interface Sink { String INPUT = "input"; @Input("input") SubscribableChannel input(); } public interface Source { String OUTPUT = "output"; @Output("output") MessageChannel output(); } public interface Processor extends Source, Sink { }
(3)启动程序,我们可以在 RabbitMQ 控制台的 Exchanges 中看到增加了一个名为 input 的交换器:
(2)我们还可以注入消息通道对象来使用,比如下面我们注入 Source 接口中定义的名为 output 的消息输出通道,代码效果同上面是一样的:
(4)同时在 Queues 中有一个前缀为 input.anonymous. 的对列:
(5)我们可以进入 input 交换器的管理页面,通过 Publish message 功能来发送一条(由于该交换器与 input.anonymous.xxxx 队列已经绑定,所以消息会转发到对应队列上):
(6)而在应用程序的控制台这边可以看到如下内容,说明消费者已经成功接收到消息队列中消息对象。
3,创建消息的生产者
(1)我们创建一个 Controller 接口,在里面注入默认的 Source 输出通道接口实例,并调用它的发送消息方法。
@RestController public class HelloController { @Autowired private Source source; @GetMapping("/test") public void test(){ // 将需要发送的消息封装为Message对象 Message message = MessageBuilder .withPayload("welcome to hangge.com") .build(); // 发送Message对象 source.output().send(message); } }
注意:很多时候我们在一个微服务应用中可能会创建多个不同名的 MessageChannel 实例,这样通过 @Autowired 注入时,要注意参数命名需要与通道同名才能被正确注入,或者也可以使用 @Qualifier 注解来特别指定具体实例的名称,该名称需要与定义 MessageChannel 的 @Output 中的 value 参数一致,这样才能被正确注入。
@RestController public class HelloController { @Autowired private MessageChannel output; @GetMapping("/test") public void test(){ // 将需要发送的消息封装为Message对象 Message message = MessageBuilder .withPayload("welcome to hangge.com") .build(); // 发送Message对象 output.send(message); } }
(3)然后对之前的消费者 SinkReceiver 做一些修改,在 @EnableBinding 注解中增加对 SinkSender 接口的指定,使得 Spring Cloud Stream 能创建出对应的实例。
我们知道 Processor 接口是继承 Sink 和 Processor 接口的,因此 @EnableBinding 注解直接指定 Processor 接口也是可以的:
- @EnableBinding(Processor.class)
@EnableBinding({Sink.class, Source.class}) public class SinkReceiver { @StreamListener(Sink.INPUT) public void reveive(Object payload) { System.out.println("收到:" + payload); } }
(4)启动应用调用 /test 接口发送消息,会发现消费者并没有接收到消息。这是因为 Sink 和 Source 的消息通道名称分别为 input 和 output,如果没有指定主题的话,那么主题即为通道名称。在 RabbitMQ 这边则对应的是 Exchange 名称:
(5)我们在 application.properties 文件中将这两个通道都设置成同一个主题,比如:publish
spring.cloud.stream.bindings.input.destination=publish spring.cloud.stream.bindings.output.destination=publish
(6)再次启动应用,发现 RabbitMQ 这边多了个名为 publish 的 Exchange 交换器:
(7)再次调用应用的 /test 接口发送消息,会发现消费者已经能成功接收到消息了。
附:改用 Kafka 作为消息中间件
上面的样例我们都是使用 RabbitMQ 这个消息中间件,如果想替换成 Kafka 十分简单。由于 Spring Cloud Stream 的特性,我们只需修改一些配置,代码完全不需要修改。
(1)首先编辑 pom.xml 文件,增加引入 Spring Cloud Stream 对 Kafka 的支持:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Hoxton.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
(2)然后在 application.properties 中配置 Kafka 的连接信息即可:
spring.cloud.stream.kafka.binder.brokers=192.168.60.133:9092
(3)启动后我们通过命令查看 Kafka 中的 Topic,可以发现里面多个了名为 publish 的 Topic:
(4)调用 /test 接口发送消息,会发现消费者也能成功接收到消息了。