SpringBoot - 消息服务JMS(ActiveMQ)的整合与使用详解(附样例)
一、基本概念介绍
1,什么是消息队列?
- 消息队列(Message Queue)是一种进程间或者线程间的异步通信方式。
- 使用消息队列,消息生产者在产生消息后,会将消息保存在消息队列中,直到消息消费者来取走它,即消息的发送者和接收者不需要同时与消息队列交互。
- 使用消息队列可以有效实现服务的解耦,并提高系统的可靠性以及可扩展性。
- 目前,开源的消息队列服务非常多,如 Apache ActiveMQ、RabbitMQ 等,这些产品也就是常说的消息中间件。
2,什么是 JMS?
- JMS(Java Message Service)即 Java 消息服务,它通过统一 JAVA API 层面的标准,使得多个客户端可以通过 JMS 进行交互,大部分消息中间件提供商都对 JMS 提供支持。
- JMS 包括两种消息模型点对点和发布者/订阅者,同时 JMS 仅支持 Java 平台。
- JMS 和 ActiveMQ 的关系就象 JDBC 和 JDBC 驱动的关系。
3,什么是 ActiveMQ?
(1)Apache ActiveMQ 是一个开源的消息中间件:- 它不仅完全支持 JMSI.1 规范
- 而且支持多种编程语言, 例如 C、C++、C#、Delphi、Erlang、Adobe Flash、Haskell、Java、Javascript、perl、PHP、Pike、Python 和 Ruby 等
- 也支持多种协议,例如 OpenWire、REST、STOMP、WS-Notification、MQTT、XMPP 以及 AMQP
二、Spring Boot 整合 JMS(ActiveMQ)
1,添加依赖
首先编辑项目的 pom.xml 文件,添加 ActiveMQ 依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency>
2,配置 ActiveMQ
(1)首先在 application.properties 中配置 ActiveMQ 的连接信息:
# broker地址,默认端口是61616
spring.activemq.broker-url=tcp://192.168.60.133:61616
# 信任所有的包,这个配置是为了支持发送对象消息
spring.activemq.packages.trust-all=true
# ActiveMQ的用户名
spring.activemq.user=admin
# ActiveMQ的密码
spring.activemq.password=admin
# queue和topic不能同时使用,使用topic的时候,把下面这行解除注释
#spring.jms.pub-sub-domain=true
(2)接着在项目配置类中提供两个消息队列 Bean(分别对应 Queue 和 Topic 这两种模式),这些 Bean 实例由 ActiveMQ 提供:
Queue 模式与 Topic 模式主要区别就是是能否重复消费:
- Queue 为点对点模式:即使多个消费者,一个消息只会有一个消费者可以消费,消费后其它的消费者则不能消费此消息。当消费者不存在时,消息会一直保存,直到有消费消费。
- Topic 为发布订阅模式:允许多个消息消费者(订阅该 Topic)消费同一个消息。如果订阅者者不存在时,消息发出后也不会保存,直接丢失(不落地)
@Configuration public class JmsConfig { // Queue模式下的Destination @Bean public Queue queue(){ return new ActiveMQQueue("amq.queue"); } // Topic模式下的Destination @Bean public Topic topic(){ return new ActiveMQTopic("amq.topic"); } }
3,创建生产者
(1)这里我们使用 Spring 提供的 JMS 消息发送模版 JmsMessagingTemplate 进行消息发送,发送的是一个对象消息:@RestController public class ProducerController { // JMS 消息发送模版 @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Queue queue; @Autowired private Topic topic; // 发送Queue消息 @GetMapping("/sendQueueMsg") public void sendQueueMsg(Book book) { this.jmsMessagingTemplate.convertAndSend(this.queue, book); } // 发送Topic消息 @GetMapping("/sendTopicMsg") public void sendTopicMsg(Book book) { this.jmsMessagingTemplate.convertAndSend(this.topic, book); } }
(2)这里发送的消息对象 Book 是一个普通的 JavaBean,具体代码如下:
@Setter @Getter @ToString public class Book implements Serializable { private Integer id; private String name; }
4,创建消费者
在方法上添加 @JmsListener 注解表示该方法是一个消息消费者,destination 参数表示消息消费者订阅的消息 destination:@RestController public class ConsumerController { // 监听和读取queue消息 @JmsListener(destination="amq.queue") public void readActiveQueue(Book book) { System.out.println("接收到queue消息:" + book); } // 监听和读取topic消息 @JmsListener(destination="amq.topic") public void readActiveTopic(Book book) { System.out.println("接收到topic消息:" + book); } }
三、运行测试
1,Queue 模式
(1)启动项目,使用浏览器访问如下地址:
- http://localhost:8080/sendQueueMsg?id=123&name=活着
(2)可以看到控制台输出如下信息:

2,Topic 模式
(1)使用 Topic 模式时,我们需要先将 spring.jms.pub-sub-domain=true 这个配置放开:

(2)启动项目,使用浏览器访问如下地址:
- http://localhost:8080/sendTopicMsg?id=123&name=西游记
(3)可以看到控制台输出如下信息:
