ActiveMQ扫盲系列(一):理论和实践

【引言】ActiveMQ作为一代经典的消息队列,虽然没怎么用过,但难免以后会用到,所谓未雨绸缪,也就先趁着有精力预研一番;起码在日后遇到了不会一无所知两眼茫然!


概念

MQ的定义

  MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。

MQ的特点

  MQ的消费-生产者模型的一个典型的代表,一端往消息队列中不断的写入消息,而另一端则可以读取或者订阅队列中的消息。MQ和JMS类似,但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。

JMS的定义

  JMS(Java Messaging Service)是Java平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发,翻译为Java消息服务;用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信

ActiveMQ

基本定义

  ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

ActiveMQ的优点

  是一个快速的开源消息组件(框架),支持集群,同等网络,自动检测,TCP,SSL,广播,持久化,XA,和J2EE1.4容器无缝结合,并且支持轻量级容器和大多数跨语言客户端上的Java虚拟机。消息异步接受,减少软件多系统集成的耦合度。消息可靠接收,确保消息在中间件可靠保存,多个消息也可以组成原子事务。

ActiveMQ的劣势

  ActiveMQ默认的配置性能偏低,需要优化配置,但是配置文件复杂,ActiveMQ本身不提供管理工具;示例代码少;主页上的文档看上去比较全面,但是缺乏一种有效的组织方式,文档只有片段,用户很难由浅入深进行了解,二、文档整体的专业性太强。在研究阶段可以通过查maillist、看Javadoc、分析源代码来了解。

通信模式

点对点(queue)

  • 一个消息只能被一个服务接收
  • 消息一旦被消费,就会消失
  • 如果没有被消费,就会一直等待,直到被消费
  • 多个服务监听同一个消费空间,先到先得

发布/订阅(topic)

  • 一个消息可以被多个服务接收
  • 订阅一个主题的消费者,只能消费自它订阅之后发布的消息。
  • 消费端如果在生产端发送消息之后启动,是接收不到消息的,除非生产端对消息进行了持久化(例如广播,只有当时听到的人能听到信息)
  • 消息是被推送和拉取的(消息生产端和消费端),而不是mq服务器去主动发送的(类似于kafka)

下载和安装

下载

  最新版本:http://activemq.apache.org/activemq-5155-release.html

安装

  So easy,开箱即用!解压完后,按照机器是32位还是64位自行选择启动bat脚本文件,双击运行就OK了,比如我的电脑安装后的启动脚本路径:

1
D:\ProgramsGreen\apache-activemq-5.15.5\bin\win64\activemq.bat

验证

  启动脚本后就是哗哗哗一通CMD窗口刷屏的节奏,等到出现下面一堆日志时,就可以打开 http://127.0.0.1:8161/admin/ 进入web管理平台了(默认用户名密码admin/admin):

1
2
3
4
5
jvm 1    |  INFO | ActiveMQ WebConsole available at http://0.0.0.0:8161/
jvm 1 | INFO | ActiveMQ Jolokia REST API available at http://0.0.0.0:8161/api/jolokia/
jvm 1 | INFO | Initializing Spring FrameworkServlet 'dispatcher'
jvm 1 | INFO | No Spring WebApplicationInitializer types detected on classpath
jvm 1 | INFO | jolokia-agent: Using policy access restrictor classpath:/jolokia-access.xml

传统实现

Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.example.demo.traditional;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;

/**
* Created by chenglin on 2018/8/21.
*/
public class ActiveMQTProducer {

public static final String TOPIC_NAME = "queue.traditional";

/**
* Have a test
* @param args
*/
public static void main(String[] args) {
Connection connection = null;
try {
// 总体过程类似于jdbc操作,先创建连接,然后启动,然后创建目的队列,随后发送消息,然后再提交
connection = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
ActiveMQConnection.DEFAULT_BROKER_URL).createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
MessageProducer messageProducer = session.createProducer(session.createQueue(TOPIC_NAME));
sendMessage(session, messageProducer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
}finally{
if(connection!=null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}

/**
* 发送消息
* @param session
* @param messageProducer
* @throws JMSException
*/
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException{
for(int i = 0; i< 5; i++){
messageProducer.send(session.createTextMessage("Traditional activemq send : msg-" + i));
}
}

}

Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.example.demo.traditional;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
* Created by chenglin on 2018/8/21.
*/
public class ActiveMQTConsumer {

public static void main(String[] args) {
Connection connection = null;
try {
connection = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
ActiveMQConnection.DEFAULT_BROKER_URL).createConnection();
connection.start();
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = session.createConsumer(session.createQueue(ActiveMQTProducer.TOPIC_NAME));
while (true) {
TextMessage message = (TextMessage) messageConsumer.receive(100000);
if (message != null) {
System.out.println("Traditional activemq receive :" + message.getText());
} else {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

Result Log

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Consumer是阻塞等待的,下面的日志时因为运行了两遍Producer才出现的;也试过先启动Producer,此时消息会在服务端缓存,待Consumer上线之后再做消费
......
Traditional activemq receive :Traditional activemq send : msg-0
Traditional activemq receive :Traditional activemq send : msg-1
Traditional activemq receive :Traditional activemq send : msg-2
Traditional activemq receive :Traditional activemq send : msg-3
Traditional activemq receive :Traditional activemq send : msg-4
17:27:27.887 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
17:27:27.888 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
Traditional activemq receive :Traditional activemq send : msg-0
Traditional activemq receive :Traditional activemq send : msg-1
Traditional activemq receive :Traditional activemq send : msg-2
Traditional activemq receive :Traditional activemq send : msg-3
Traditional activemq receive :Traditional activemq send : msg-4
......

Result Log Plus

1
2
3
4
5
6
7
8
9
10
11
# Consumer是阻塞等待的,如果我们测试时,启动两个Consumer,然后再启动Producer来发消息,实际上消息是被两个Consumer分享的
......
# Consumer process 1
Traditional activemq receive :Traditional activemq send : msg-0
Traditional activemq receive :Traditional activemq send : msg-2
Traditional activemq receive :Traditional activemq send : msg-4

# Consumer process 2
Traditional activemq receive :Traditional activemq send : msg-1
Traditional activemq receive :Traditional activemq send : msg-3
......

Springboot加持

Maven引入

1
2
3
4
5
6
7
8
9
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

配置项

1
2
3
4
5
6
7
8
9
10
11
12
13
# URL of the ActiveMQ broker. Auto-generated by default.
# failover:(tcp://localhost:61616,tcp://localhost:61617)
# tcp://localhost:61616
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.in-memory=true

# 如果此处设置为true,需要加如下的依赖包,否则会自动配置失败,报JmsMessagingTemplate注入失败
spring.activemq.pool.enabled=false
# 特殊依赖
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>

FAQ

  在使用Springboot+junit测试时,遇到两个不大不小的问题

报错

  • java.lang.IllegalStateException: Unable to find a @SpringBootConfiguration, you need to use @ContextConfiguration or @SpringBootTest(classes=…) with your test
  • Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type ‘org.springframework.jms.core.JmsMessagingTemplate’ available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {@org.springframework.beans.factory.annotation.Autowired(required=true)}

解决方法

  • 保证测试类的包分支要和主包内的分支在同一条上(比如:主包的待测类是com.xxx.service,那么测试类也必须在com.xxx里面);同时发现测试类的命名也有讲究,这个点没有继续深究可能理解不是很准确(也就是测试类名需要为启动类名+Tests这样)
  • application.properties里面的配置项最后千万别带空格(切记)

单向消息传递

Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.example.demo.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import javax.jms.Destination;

/**
* Created by chenglin on 2018/8/21.
*/
@Service("ActiveMQProducer")
public class ActiveMQProducer {

// 这里也可以注入JmsTemplate,JmsMessagingTemplate是对JmsTemplate进行了封装的
@Autowired
private JmsMessagingTemplate jmsTemplate;

/**
* 发送消息
* @param destination 消息目的队列
* @param message 待发送消息内容
*/
public void sendMessage(Destination destination, final String message){
jmsTemplate.convertAndSend(destination, message);
}
}

Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.example.demo.service;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

/**
* Created by chenglin on 2018/8/21.
*/
@Component
public class ActiveMQConsumer {

/**
* JmsListener注解用来配置消费者监听的队列(Destination)
* @param text 接收到的消息
*/
@JmsListener(destination = "queue.demo")
public void receiveQueue(String text) {
System.out.println("This is consumer receiving : " + text);
}
}

Tester

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.example.demo;

import com.example.demo.service.ActiveMQProducer;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.jms.Destination;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootdemoApplicationTests {

@Autowired
private ActiveMQProducer producer;

@Test
public void contextLoads() throws InterruptedException {
Destination destination = new ActiveMQQueue("queue.demo");

for(int i=0; i < 5; i++){
System.out.println("************************************************");
String msg = "This is chenglin speaking " + i;
System.out.println("This is producer sending : " + msg);
producer.sendMessage(destination, msg);
}
}

}

Result Log

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
......
************************************************
This is producer sending : This is chenglin speaking 0
This is consumer receiving : This is chenglin speaking 0
************************************************
This is producer sending : This is chenglin speaking 1
This is consumer receiving : This is chenglin speaking 1
************************************************
This is producer sending : This is chenglin speaking 2
This is consumer receiving : This is chenglin speaking 2
************************************************
This is producer sending : This is chenglin speaking 3
This is consumer receiving : This is chenglin speaking 3
************************************************
This is producer sending : This is chenglin speaking 4
This is consumer receiving : This is chenglin speaking 4
......

双向消息传递

Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.example.demo.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import javax.jms.Destination;

/**
* Created by chenglin on 2018/8/21.
*/
@Service("ActiveMQProducer")
public class ActiveMQProducer {

// 这里也可以注入JmsTemplate,JmsMessagingTemplate是对JmsTemplate进行了封装的
@Autowired
private JmsMessagingTemplate jmsTemplate;

/**
* 发送消息
* @param destination 消息目的队列
* @param message 待发送消息内容
*/
public void sendMessage(Destination destination, final String message){
jmsTemplate.convertAndSend(destination, message);
}

/**
* 接收返回消息
* @param text 返回的消息内容
*/
@JmsListener(destination="queue.reply")
public void consumerMessage(String text){
System.out.println("This is producer receiving msg from consumer reply : " + text);
}

}

Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.example.demo.service;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.Random;

/**
* Created by chenglin on 2018/8/21.
*/
@Component
public class ActiveMQConsumer {

/**
* JmsListener注解用来配置消费者监听的队列(Destination)
* @param text 接收到的消息
*/
@JmsListener(destination = "queue.demo")
@SendTo("queue.reply")
public String receiveQueue(String text) {
System.out.println("This is consumer receiving : " + text);
return "Reply from consumer : " + text;
}
}

Tester

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.example.demo;

import com.example.demo.service.ActiveMQProducer;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.jms.Destination;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootdemoApplicationTests {

@Autowired
private ActiveMQProducer producer;

@Test
public void contextLoads() throws InterruptedException {
Destination destination = new ActiveMQQueue("queue.demo");

for(int i=0; i < 5; i++){
System.out.println("************************************************");
String msg = "This is chenglin speaking " + i;
System.out.println("This is producer sending : " + msg);
producer.sendMessage(destination, msg);
}

// 如果这里不做一个sleep的话,返回消息的最后一条会缓存到服务端,下一次启动才会消费
System.out.println("**********************Sleep************************");
Thread.sleep(1000);
}

}

Result Log

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
......
# 如果上面测试类的最后不做一个sleep的话,这里就会先打印一条上一轮遗留未消费掉的最后一条消息
************************************************
This is producer sending : This is chenglin speaking 0
This is consumer receiving : This is chenglin speaking 0
************************************************
This is producer sending : This is chenglin speaking 1
This is producer receiving msg from consumer reply : Reply from consumer : This is chenglin speaking 0
This is consumer receiving : This is chenglin speaking 1
************************************************
This is producer sending : This is chenglin speaking 2
This is consumer receiving : This is chenglin speaking 2
This is producer receiving msg from consumer reply : Reply from consumer : This is chenglin speaking 1
************************************************
This is producer sending : This is chenglin speaking 3
This is consumer receiving : This is chenglin speaking 3
This is producer receiving msg from consumer reply : Reply from consumer : This is chenglin speaking 2
************************************************
This is producer sending : This is chenglin speaking 4
This is consumer receiving : This is chenglin speaking 4
This is producer receiving msg from consumer reply : Reply from consumer : This is chenglin speaking 3
**********************Sleep************************
This is producer receiving msg from consumer reply : Reply from consumer : This is chenglin speaking 4
......

Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package com.example.demo.pubsub;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
* Created by chenglin on 2018/8/22.
* @author chenglin
*/
public class PubTopicProducer {

public static final String TOPIC_NAME = "queue.pub.sub";

/**
* 发送消息
* @param msg
*/
public void send(String msg) {
Connection connection = null;
try {
// 创建一个connection,并打开连接
connection = new ActiveMQConnectionFactory().createConnection();
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createTopic(TOPIC_NAME));

// 消息是否持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.send(session.createTextMessage(msg));
} catch (JMSException e) {
e.printStackTrace();
} finally{
if(connection!=null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}

/**
* Have a test
* @param args
*/
public static void main(String[] args) {
PubTopicProducer producer = new PubTopicProducer();
for (int i = 0; i < 5; i++) {
producer.send("This is pub sending msg : msg-" + i);
}
}

}

Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.example.demo.pubsub;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
* Created by chenglin on 2018/8/22.
* @author chenglin
*/
public class SubTopicConsumer implements MessageListener {

@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
try {
System.out.println("This is sub receiving msg : " + txtMsg.getText());
} catch (JMSException e) {
e.printStackTrace();
}

}
}

/**
* 消息接收流程
*/
public void receive() {
Connection connection = null;

try {
connection = new ActiveMQConnectionFactory().createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createTopic(PubTopicProducer.TOPIC_NAME));
consumer.setMessageListener(new SubTopicConsumer());
} catch (JMSException e) {
e.printStackTrace();
}/* 这里为什么不能关闭connection?一旦关闭了,监听就会断了,就无法接收消息了
finally{
if(connection!=null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}*/
}

/**
* Have a test
* @param args
*/
public static void main(String[] args) {
new SubTopicConsumer().receive();
}

}

Result Log

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Producer发送了5条消息

# Consumer Process 1
This is sub receiving msg : This is pub sending msg : msg-0
This is sub receiving msg : This is pub sending msg : msg-1
This is sub receiving msg : This is pub sending msg : msg-2
This is sub receiving msg : This is pub sending msg : msg-3
This is sub receiving msg : This is pub sending msg : msg-4

# Consumer Process 2
This is sub receiving msg : This is pub sending msg : msg-0
This is sub receiving msg : This is pub sending msg : msg-1
This is sub receiving msg : This is pub sending msg : msg-2
This is sub receiving msg : This is pub sending msg : msg-3
This is sub receiving msg : This is pub sending msg : msg-4

消息流程总结

  下图是从网上看到的一张流程总结,先致谢一下!图画的很清晰,一目了然;整个流程围绕创建连接-创建会话-发送/接收消息-释放连接的模式展开,非常类似于jdbc操作的流程。

------2019 Lin.C ------