有悲欢离合,月有阴晴圆缺,此事古难全。

——苏轼《水调歌头》

1. JMS 和 ActiveMQ 介绍

1.1 JMS 是啥

百度百科的解释:

JMS 即 Java 消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java 消息服务是一个与具体平台无关的 API,绝大多数 MOM 提供商都对 JMS 提供支持。

JMS 只是接口,不同的提供商或者开源组织对其有不同的实现,ActiveMQ 就是其中之一,它支持JMS,是 Apache 推出的。JMS 中有几个对象模型:

连接工厂:ConnectionFactory

JMS连接:Connection

JMS会话:Session

JMS目的:Destination

JMS生产者:Producer

JMS消费者:Consumer

JMS消息两种类型:点对点和发布/订阅。

可以看出 JMS 实际上和 JDBC 有点类似,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。本文主要使用 ActiveMQ。

1.2 ActiveMQ

ActiveMQ 是 Apache 的一个能力强劲的开源消息总线。ActiveMQ 完全支持JMS1.1和J2EE 1.4规范,尽管 JMS 规范出台已经是很久的事情了,但是 JMS 在当今的 Java EE 应用中间仍然扮演着特殊的地位。ActiveMQ 用在异步消息的处理上,所谓异步消息即消息发送者无需等待消息接收者的处理以及返回,甚至无需关心消息是否发送成功。

异步消息主要有两种目的地形式,队列(queue)和主题(topic),队列用于点对点形式的消息通信,主题用于发布/订阅式的消息通信。本章节主要来学习一下在 Spring Boot 中如何使用这两种形式的消息。

2. ActiveMQ安装

使用 ActiveMQ 首先需要去官网下载,官网地址为:http://activemq.apache.org/

本课程使用的版本是 apache-activemq-5.15.3,下载后解压缩会有一个名为 apache-activemq-5.15.3的文件夹,没错,这就安装好了,非常简单,开箱即用。打开文件夹会看到里面有个 activemq-all-5.15.3.jar ,这个 jar 我们是可以加进工程里的,但是使用 maven 的话,这个 jar 我们不需要。在使用 ActiveMQ 之前,首先得先启动,刚才解压后的目录中有个 bin 目录,里面有 win32 和 win64两个目录,根据自己电脑选择其中一个打开运行里面的 activemq.bat 即可启动 ActiveMQ。

消息生产者生产消息发布到queue中,然后消息消费者从queue中取出,并且消费消息。这里需要注意:消息被消费者消费以后,queue中不再有存储,所以消息消费者不可消费到已经被消费的消息。Queue支持存在多个消息消费者,但是对一个消息而言,只会有一个消费者可以消费启动完成后,在浏览器中输入 http://127.0.0.1:8161/admin/ 来访问 ActiveMQ 的服务器,用户名和密码是 admin/admin。如下:

image-20201216134216543

我们可以看到有 Queues 和 Topics 这两个选项,这两个选项分别是点对点消息和发布/订阅消息的查看窗口。何为点对点消息和发布/订阅消息呢?

点对点消息:消息生产者生产消息发布到 queue 中,然后消息消费者从 queue 中取出,并且消费消息。这里需要注意:消息被消费者消费以后,queue 中不再有存储,所以消息消费者不可消费到已经被消费的消息。

Queue 支持存在多个消息消费者,但是对一个消息而言,只会有一个消费者可以消费。

发布/订阅消息:消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。下面分析具体的实现方式。

3. ActiveMQ集成

3.1 依赖导入和配置

在 Spring Boot 中集成 ActiveMQ 需要导入如下 starter 依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

然后在 application.yml 配置文件中,对 activemq 做一下配置:

1
2
3
4
5
6
7
8
spring:
activemq:
# activemq url
broker-url: tcp://localhost:61616
in-memory: true
pool:
   # 如果此处设置为true,需要添加activemq-pool的依赖包,否则会自动配置失败,无法注入JmsMessagingTemplate
   enabled: false

3.2 Queue 和 Topic 的创建

首先我们需要创建两种消息 Queue 和 Topic,这两种消息的创建,我们放到 ActiveMqConfig 中来创建,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* activemq的配置
* @author shengwu ni
*/
@Configuration
public class ActiveMqConfig {
  /**
  * 发布/订阅模式队列名称
  */
  public static final String TOPIC_NAME = "activemq.topic";
  /**
  * 点对点模式队列名称
  */
  public static final String QUEUE_NAME = "activemq.queue";
  @Bean
  public Destination topic() {
    return new ActiveMQTopic(TOPIC_NAME);
  }
  @Bean
  public Destination queue() {
    return new ActiveMQQueue(QUEUE_NAME);
  }
}

可以看出创建 Queue 和 Topic 两种消息,分别使用 new ActiveMQQueue 和 new ActiveMQTopic 来创建,分别跟上对应消息的名称即可。这样在其他地方就可以直接将这两种消息作为组件注入进来了。

3.3 消息的发送接口

在 Spring Boot 中,我们只要注入 JmsMessagingTemplate 模板即可快速发送消息,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 消息发送者
* @author shengwu ni
*/
@Service
public class MsgProducer {
  @Resource
  private JmsMessagingTemplate jmsMessagingTemplate;

  public void sendMessage(Destination destination, String msg) {
    jmsMessagingTemplate.convertAndSend(destination, msg);
 }
}

convertAndSend 方法中第一个参数是消息发送的目的地,第二个参数是具体的消息内容。

3.4 点对点消息生产与消费

3.4.1 点对点消息的生产

消息的生产,我们放到 Controller 中来做,由于上面已经生成了 Queue 消息的组件,所以在Controller 中我们直接注入进来即可。然后调用上文的消息发送方法 sendMessage 即可成功生产一条消息。