SpringBoot - 消息服务AMQP(RabbitMQ)的整合与使用详解(附样例)
一、基本概念介绍
1,什么是消息队列?
- 消息队列(Message Queue)是一种进程间或者线程间的异步通信方式。
- 使用消息队列,消息生产者在产生消息后,会将消息保存在消息队列中,直到消息消费者来取走它,即消息的发送者和接收者不需要同时与消息队列交互。
- 使用消息队列可以有效实现服务的解耦,并提高系统的可靠性以及可扩展性。
- 目前,开源的消息队列服务非常多,如 Apache ActiveMQ、RabbitMQ 等,这些产品也就是常说的消息中间件。
2,什么是 AMQP?
- AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个线路层的协议规范,而不是 APl 规范(例如 JMS)。
- 由于 AMQP 是一个线路层协议规范,因此它天然就是跨平台的,就像 SMTP、HTTP 等协议一样,只要开发者按照规范的格式发送数据,任何平台都可以通过 AMQP 进行消息交互。
- 像目前流行的 StormMQ、RabbitMQ 等都实现了 AMQP。
3,什么是 RabbitMQ?
- RabbitMQ 是一个实现了 AMQP 的开源消息中间件,使用高性能的 Erlang 编写。
- RabbitMQ 具有可靠性、支持多种协议、高可用、支持消息集群以及多语言客户端等特点,在分布式系统中存储转发消息,具有不错的性能表现。
- 在 RabbitMQ 中,所有的消息生产者提交的消息都会交由 Exchange 进行再分配,Exchange 会根据不同的消息策略将消息分发到不同的 Queue 中。
- RabbitMQ 一共提供 4 中不同的 Exchange 策略,分别是 Direct、Fanout、Topic 以及 Header,这 4 种策略中,前3种的使用频率较高,第 4 种的使用频率较低。
二、RabbitMQ 的整合配置
1,添加依赖
首先编辑项目的 pom.xml 文件,添加 AMQP 自动化配置依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2,配置 RebbitMQ
接下来在 application.properties 中配置 RebbitMQ 的连接信息:spring.rabbitmq.host=192.168.60.133
spring.rabbitmq.port=5672
spring.rabbitmq.username=hangge
spring.rabbitmq.password=123
三、使用样例1(Direnct 策略)
1,Direnct 策略介绍
(1)DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上。
例如:消息队列名为“hello-queue”,则 routingKey 为“hello-queue”的消息就会被该消息队列接收。
(2)简单来说,Direct 交换机就是完全根据 Key 进行投递。2,样例代码
(1)首先进行路由配置:
注意:如果使用 DirectExchange,可以只配置一个 Queue 的实例即可。后面 DirectExchange 和 Binding 的配置可以省略调。
@Configuration public class RabbitDirectConfig { // 提供一个消息队列 Queue @Bean Queue queue() { return new Queue("hello-queue"); } // 提供一个 DirectExchange @Bean DirectExchange directExchange() { // 三个参数分别是名字、重启后是否依然有效、长期未使用时是否删除 return new DirectExchange("hangge-direct", true, false); } // 创建一个Binding对象,将Exchange和Queue绑定在一起 @Bean Binding binding() { return BindingBuilder.bind(queue()) .to(directExchange()).with("direct"); } }
(2)接下来配置一个消费者:
@Component public class DirectReceiver { // @RabbitListener 注解的方法是一个消息消费方法,方法参数就是所接收到的消息。 @RabbitListener(queues = "hello-queue") public void handler1(String msg) { System.out.println("DirectReceiver:" + msg); } }
(3)最后创建一个 Controller,并注入一个 RabbitTemplate 对象来进行消息发送:
@RestController public class HelloController { @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/hello") public void hello() { rabbitTemplate.convertAndSend("hello-queue", "hello direct!"); } }
(4)项目启动,访问 http://localhost:8080/hello 地址发送一条消息,可以看到控制台这边成功接收到消息并打印出来:

四、使用样例2(Fanout 策略)
1,Fanout 策略介绍
(1)FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue,在这种策略中,routingkey 将不起任何作用。
(2)简单来说,Fanout 交换机不需要任何 Key,它采取广播的模式,一个消息进来时,投递到与该交换机绑定的所有队列。
2,样例代码
(1)首先进行路由配置,这次我们创建两个 Queue(使用不同的 routingkey),但它们都绑定到同一个 FanoutExchange 上:
@Configuration public class RabbitFanoutConfig { public final static String FANOUTNAME = "hangge-fanout"; // 提供一个 FanoutExchange @Bean FanoutExchange fanoutExchange() { // 三个参数分别是名字、重启后是否依然有效、长期未使用时是否删除 return new FanoutExchange(FANOUTNAME, true, false); } // 提供两个消息队列 Queue @Bean Queue queueOne() { return new Queue("queue-one"); } @Bean Queue queueTwo() { return new Queue("queue-two"); } // 将这两个 Queue 都绑定到 FanoutExchange 上 @Bean Binding bindingOne() { return BindingBuilder.bind(queueOne()).to(fanoutExchange()); } @Bean Binding bindingTwo() { return BindingBuilder.bind(queueTwo()).to(fanoutExchange()); } }
(2)接下来创建两个消费者,它们分别消费两个消息队列中的消息:
@Component public class FanoutReceiver { @RabbitListener(queues = "queue-one") public void handler1(String message) { System.out.println("FanoutReceiver:handler1:" + message); } @RabbitListener(queues = "queue-two") public void handler2(String message) { System.out.println("FanoutReceiver:handler2:" + message); } }
(3)最后创建一个 Controller,并注入一个 RabbitTemplate 对象来进行消息发送:
注意:这里发送消息时不需要 routingkey,指定 exchange 即可,routingkey 直接传一个 null。
@RestController public class HelloController { @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/hello") public void hello() { rabbitTemplate.convertAndSend(RabbitFanoutConfig.FANOUTNAME, null, "hello fanout!"); } }
(4)项目启动,访问 http://localhost:8080/hello 地址发送一条消息,可以看到所有和这个 FanoutExchange 绑定的 Queue 都收到了消息。

五、使用样例3(Topic 策略)
1,Topic 策略介绍
(1)TopicExchange 是比较复杂也比较灵活的一种路由策略:- 在 TopicExchange 中,Queue 通过 routingkey 绑定到 TopicExchange 上。
- 当消息到达 TopicExchange 后,TopicExchange 根据消息的 rountingkey 将消息路由到一个或者多个 Queue 上。
2,样例代码
(1)首先进行路由配置,这次我们创建三个 Queue,并绑定到同一个 TopicExchange 上(使用不同的规则):@Configuration public class RabbitTopicConfig { public final static String TOPICNAME = "hangge-topic"; // 提供一个 TopicExchange @Bean TopicExchange topicExchange() { // 三个参数分别是名字、重启后是否依然有效、长期未使用时是否删除 return new TopicExchange(TOPICNAME, true, false); } // 提供三个消息队列 Queue @Bean Queue xiaomi() { return new Queue("xiaomi"); } @Bean Queue huawei() { return new Queue("huawei"); } @Bean Queue phone() { return new Queue("phone"); } // 将三个 Queue 都绑定到 TopicExchange 上 @Bean Binding xiaomiBinding() { // 凡是 routingkey 以 "xiaomi" 开头的消息,都被路由到 xiaomi 这个 Queue 上 return BindingBuilder.bind(xiaomi()).to(topicExchange()) .with("xiaomi.#"); } @Bean Binding huaweiBinding() { // 凡是 routingkey 以 "huawei" 开头的消息,都被路由到 huiwei 这个 Queue 上 return BindingBuilder.bind(huawei()).to(topicExchange()) .with("huawei.#"); } @Bean Binding phoneBinding() { // 凡是 routingkey 中包含 "phone" 的消息,都被路由到 phone 这个 Queue 上 return BindingBuilder.bind(phone()).to(topicExchange()) .with("#.phone.#"); } }
(2)接下来针对三个 Queue 创建三个消费者:
@Component public class TopicReceiver { @RabbitListener(queues = "phone") public void handler1(String message) { System.out.println("PhoneReceiver:"+ message); } @RabbitListener(queues = "xiaomi") public void handler2(String message) { System.out.println("XiaoMiReceiver:"+message); } @RabbitListener(queues = "huawei") public void handler3(String message) { System.out.println("HuaWeiReceiver:"+message); } }
(3)最后创建一个 Controller,并注入一个 RabbitTemplate 对象来进行消息发送:
@RestController public class HelloController { @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/hello") public void hello() { rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME, "xiaomi.news", "(1)小米新闻.."); rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME, "xiaomi.phone", "(2)小米手机.."); rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME, "phone.news", "(3)手机新闻.."); } }
(4)项目启动,访问 http://localhost:8080/hello 地址发送 3 条消息,可以看到:
- 第一条消息被路由到名为“xiaomi”的 Queue 上。
- 第二条消息被路由到名为“xiaomi”以及名为“phone”的 Queue 上。
- 第三条消息被路由到名为“phone”的 Queue 上。

六、使用样例4(Header 策略)
1,Header 策略介绍
2,样例代码
(1)首先进行路由配置,这次我们创建两个 Queue,并绑定到同一个 HeadersExchange 上(使用不同的 Header 匹配规则):注意:
除了下面用到的 whereAny 和 where 这两个匹配方法,还有个 whereAll 方法,该方法表示消息的所有的 Header 都要匹配。
@Configuration public class RabbitHeaderConfig { public final static String HEADERNAME = "hangge-header"; // 提供一个 HeadersExchange @Bean HeadersExchange headersExchange() { // 三个参数分别是名字、重启后是否依然有效、长期未使用时是否删除 return new HeadersExchange(HEADERNAME, true, false); } // 提供两个消息队列 Queue @Bean Queue queueName() { return new Queue("name-queue"); } @Bean Queue queueAge() { return new Queue("age-queue"); } // 将两个 Queue 都绑定到 HeadersExchange 上 @Bean Binding bindingName() { Map<String, Object> map = new HashMap<>(); map.put("name", "hangge"); return BindingBuilder.bind(queueName()) .to(headersExchange()) .whereAny(map) // 消息只要有一个Header匹配上map中的key/value,便路由到这个Queue上 .match(); } @Bean Binding bindingAge() { return BindingBuilder.bind(queueAge()) .to(headersExchange()) .where("age") //只要消息Header中包含age(无论值多少),便路由到这个Queue上 .exists(); } }
(2)接下来创建两个消息消费者。注意:这里的参数用 byte 数组接收:
@Component public class HeaderReceiver { @RabbitListener(queues = "name-queue") public void handler1(byte[] msg) { System.out.println("HeaderReceiver:name:" + new String(msg, 0, msg.length)); } @RabbitListener(queues = "age-queue") public void handler2(byte[] msg) { System.out.println("HeaderReceiver:age:" + new String(msg, 0, msg.length)); } }
(3)最后创建一个 Controller,并注入一个 RabbitTemplate 对象发送两条消息:
@RestController public class HelloController { @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/hello") public void hello() { Message nameMsg = MessageBuilder .withBody("hello header! name-queue".getBytes()) .setHeader("name", "hangge").build(); rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, nameMsg); Message ageMsg = MessageBuilder .withBody("hello header! age-queue".getBytes()) .setHeader("age", "99").build(); rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, ageMsg); } }
(4)项目启动,访问 http://localhost:8080/hello 地址发送 2 条消息,可以看到不同 header 的消息被不同的 Queue 中:
