【引言】ActiveMQ作为一代经典的消息队列,虽然没怎么用过,但难免以后会用到,所谓未雨绸缪,也就先趁着有精力预研一番;起码在日后遇到了不会一无所知两眼茫然!
概念
MQ的定义
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。
MQ的特点
MQ的消费-生产者模型的一个典型的代表,一端往消息队列中不断的写入消息,而另一端则可以读取或者订阅队列中的消息。MQ和JMS类似,但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。
JMS的定义
JMS(Java Messaging Service)是Java平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发,翻译为Java消息服务;用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信
ActiveMQ
基本定义
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
ActiveMQ的优点
是一个快速的开源消息组件(框架),支持集群,同等网络,自动检测,TCP,SSL,广播,持久化,XA,和J2EE1.4容器无缝结合,并且支持轻量级容器和大多数跨语言客户端上的Java虚拟机。消息异步接受,减少软件多系统集成的耦合度。消息可靠接收,确保消息在中间件可靠保存,多个消息也可以组成原子事务。
ActiveMQ的劣势
ActiveMQ默认的配置性能偏低,需要优化配置,但是配置文件复杂,ActiveMQ本身不提供管理工具;示例代码少;主页上的文档看上去比较全面,但是缺乏一种有效的组织方式,文档只有片段,用户很难由浅入深进行了解,二、文档整体的专业性太强。在研究阶段可以通过查maillist、看Javadoc、分析源代码来了解。
通信模式
点对点(queue)
- 一个消息只能被一个服务接收
- 消息一旦被消费,就会消失
- 如果没有被消费,就会一直等待,直到被消费
- 多个服务监听同一个消费空间,先到先得
发布/订阅(topic)
- 一个消息可以被多个服务接收
- 订阅一个主题的消费者,只能消费自它订阅之后发布的消息。
- 消费端如果在生产端发送消息之后启动,是接收不到消息的,除非生产端对消息进行了持久化(例如广播,只有当时听到的人能听到信息)
- 消息是被推送和拉取的(消息生产端和消费端),而不是mq服务器去主动发送的(类似于kafka)
下载和安装
下载
最新版本:http://activemq.apache.org/activemq-5155-release.html
安装
So easy,开箱即用!解压完后,按照机器是32位还是64位自行选择启动bat脚本文件,双击运行就OK了,比如我的电脑安装后的启动脚本路径:1
D:\ProgramsGreen\apache-activemq-5.15.5\bin\win64\activemq.bat
验证
启动脚本后就是哗哗哗一通CMD窗口刷屏的节奏,等到出现下面一堆日志时,就可以打开 http://127.0.0.1:8161/admin/ 进入web管理平台了(默认用户名密码admin/admin):1
2
3
4
5jvm 1 | INFO | ActiveMQ WebConsole available at http://0.0.0.0:8161/
jvm 1 | INFO | ActiveMQ Jolokia REST API available at http://0.0.0.0:8161/api/jolokia/
jvm 1 | INFO | Initializing Spring FrameworkServlet 'dispatcher'
jvm 1 | INFO | No Spring WebApplicationInitializer types detected on classpath
jvm 1 | INFO | jolokia-agent: Using policy access restrictor classpath:/jolokia-access.xml
传统实现
Producer
1 | package com.example.demo.traditional; |
Consumer
1 | package com.example.demo.traditional; |
Result Log
1 | # Consumer是阻塞等待的,下面的日志时因为运行了两遍Producer才出现的;也试过先启动Producer,此时消息会在服务端缓存,待Consumer上线之后再做消费 |
Result Log Plus
1 | # Consumer是阻塞等待的,如果我们测试时,启动两个Consumer,然后再启动Producer来发消息,实际上消息是被两个Consumer分享的 |
Springboot加持
Maven引入
1 | <dependency> |
配置项
1 | # URL of the ActiveMQ broker. Auto-generated by default. |
FAQ
在使用Springboot+junit测试时,遇到两个不大不小的问题
报错
- java.lang.IllegalStateException: Unable to find a @SpringBootConfiguration, you need to use @ContextConfiguration or @SpringBootTest(classes=…) with your test
- Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type ‘org.springframework.jms.core.JmsMessagingTemplate’ available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {@org.springframework.beans.factory.annotation.Autowired(required=true)}
解决方法
- 保证测试类的包分支要和主包内的分支在同一条上(比如:主包的待测类是com.xxx.service,那么测试类也必须在com.xxx里面);同时发现测试类的命名也有讲究,这个点没有继续深究可能理解不是很准确(也就是测试类名需要为启动类名+Tests这样)
- application.properties里面的配置项最后千万别带空格(切记)
单向消息传递
Producer
1 | package com.example.demo.service; |
Consumer
1 | package com.example.demo.service; |
Tester
1 | package com.example.demo; |
Result Log
1 | ...... |
双向消息传递
Producer
1 | package com.example.demo.service; |
Consumer
1 | package com.example.demo.service; |
Tester
1 | package com.example.demo; |
Result Log
1 | ...... |
Producer
1 | package com.example.demo.pubsub; |
Consumer
1 | package com.example.demo.pubsub; |
Result Log
1 | # Producer发送了5条消息 |
消息流程总结
下图是从网上看到的一张流程总结,先致谢一下!图画的很清晰,一目了然;整个流程围绕创建连接-创建会话-发送/接收消息-释放连接的模式展开,非常类似于jdbc操作的流程。