ActiveMQ扫盲系列(三):核心配置和应用

【引言】无意间接触到一种消息积压处理的场景,想着是否可以监控MQ的队列参数进行发送端的策略调整,于是有了这一篇文章;当然本次也参考了网络上很多的分享,但无奈的发现天下文章一大抄,很难找到一篇写的完整的。


细说核心配置文件


参考版本5.15.8

activemq.xml

  文件路径一般为~\apache-activemq-5.15.8\conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
<!-- 【Lin.C】 首节是关于License等的说明 -->
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- START SNIPPET: example -->

<!-- 【Lin.C】 xml namespaces和xml schema instance定义 -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

<!-- 【Lin.C】 PropertyPlaceholderConfigurer是Spring相关的类,具体的关于此类的说明参考后面的章节;简单理解这个配置文件也是用来实现配置注入的 -->
<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>

<!-- 【Lin.C】网上找了一大圈,也没有发现关于这个配置项的解释,猜测是关于日志监控相关的,存疑【???】 -->
<!-- Allows accessing the server log -->
<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
</bean>

<!-- 【Lin.C】broker节点是activemq的核心配置节点,首先在根节点中配置了namespace、broker的名字(保证网络环境内的唯一性)、数据存储目录 -->
<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

<!-- 【Lin.C】为慢消费者制定的消息保留策略 -->
<destinationPolicy>
<policyMap>
<policyEntries>

<!-- 【Lin.C】 1. 仅针对topic生效,且只对nondurable的consumer有效,用于控制有大量消息在通道中积压时,broker可以保留的消息量
2. 针对Topic,某条消息只有所有订阅者都消费了才会被删除 -->
<policyEntry topic=">" >
<!-- The constantPendingMessageLimitStrategy is used to prevent
slow topic consumers to block producers and affect other consumers
by limiting the number of messages that are retained
For more information, see:
http://activemq.apache.org/slow-consumer-handling.html
-->

<!-- 【Lin.C】 这里的pendingMessageLimitStrategy是控制消息保存的策略的,会有两种配置方式 -->
<!-- 【Lin.C】 1. ConstantPendingMessageLimitStrategy,保留固定数量,超过limit则会使用MessageEvictionStrategy策略清除消息
2. PrefetchRatePendingMessageLimitStrategy,保留prefetchSize的N倍条消息 -->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
<!-- <prefetchRatePendingMessageLimitStrategy multiplier="10"/> -->
</pendingMessageLimitStrategy>

<!-- 【Lin.C】 1. 这个配置项在默认的配置文件里面是没有的.
2. 这个配置项也是针对慢消费者的,仅针对topic生效,且只对nondurable的consumer有效 -->
<!-- 【Lin.C】 1. OldestMessageEvictionStrategy: 移除最旧的消息,默认策略
2. OldestMessageWithLowestPriorityEvictionStrategy: 移除旧消息中权重较低的消息
3. UniquePropertyMessageEvictionStrategy: 移除具有指定property的旧消息。
property可由开发人员指定,从此属性值相同的消息列表中移除最旧的(也就是消息创建时间最早的) -->
<messageEvictionStrategy>
<OldestMessageWithLowestPriorityEvictionStrategy />
</messageEvictionStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>

<!-- 【Lin.C】 慢消费者策略:Broker根据此配置决定如何处理慢消费者。Broker会启动一个后台线程用来监测所有的慢速消费者并按策略处理。 -->
<!-- 【Lin.C】 1. AbortSlowConsumerStrategy: 慢消费将会被关闭;但abortConnection可控制是否关闭连接
2. AbortSlowConsumerStrategy: 如果慢消费者最后一个ACK距离现在的时间间隔超过maxTimeSinceLastAck,则中慢速消费者。 -->
<slowConsumerStrategy>
<abortSlowConsumerStrategy abortConnection="false"/>
<!-- <abortSlowConsumerStrategy maxTimeSinceLastAck="3000"/> -->
</slowConsumerStrategy>

<!-- 【Lin.C】 转发策略:用于控制将消息转发给消费者的方式 -->
<!-- 【Lin.C】 1. RoundRobinDispatchPolicy: 轮询策略,消息将按顺序依次发给每个“订阅者”。“订阅者”列表默认按照订阅的先后顺序排列,在转发消息时,
对于匹配消息的第一个订阅者,将会被移动到“订阅者”列表的尾部,这也意味着“下一条”消息,将会较晚的转发给它。
2. StrictOrderDispatchPolicy: 严格有序,消息依次发送给每个订阅者,按照“订阅者”订阅的时间先后。它和RoundRobin最大的区别是,
没有移动“订阅者”顺序的操作。
3. PriorityDispatchPolicy: 基于权重对“订阅者”排序。它要求开发者首先需要对每个订阅者指定priority,默认每个consumer的权重都一样。
4. SimpleDispatchPolicy: 默认值,按照当前“订阅者”列表的顺序。PriorityDispatchPolicy是其子类。 -->
<dispatchPolicy>
<strictOrderDispatchPolicy/>
</dispatchPolicy>

<!-- 【Lin.C】 恢复策略:用于控制ActiveMQ服务重启如何进行数据恢复-->
<!-- 【Lin.C】 1. FixedSizedSubscriptionRecoveryPolicy: 保存一定size的消息,broker将为此Topic开辟定额的RAM用来保存最新的消息,使用maximumSize属性指定大小(单位:Byte)。
2. FixedCountSubscriptionRecoveryPolicy: 保存一定条数的消息。 使用maximumSize属性指定保存的消息条数
3. LastImageSubscriptionRecoveryPolicy: 只保留最新的一条数据
4. QueryBasedSubscriptionRecoveryPolicy: 符合置顶selector的消息都将被保存,具体能够恢复多少消息,由底层存储机制决定;比如对于非持久化消息,只要内存中还
存在,则都可以恢复。
5. TimedSubscriptionRecoveryPolicy: 保留最近一段时间的消息。使用recoverDuration属性指定保存时间(单位毫秒)
6. NoSubscriptionRecoveryPolicy: 关闭“恢复机制”,默认值。 -->
<subscriptionRecoveryPolicy>
<!-- eg: 恢复最近半小时内的信息-->
<timedSubscriptionRecoveryPolicy recoverDuration="1800000"/>
<!-- <fixedSizedSubscriptionRecoveryPolicy maximumSize="1024"/>
<fixedCountSubscriptionRecoveryPolicy maximumSize="100"/> -->
</subscriptionRecoveryPolicy>

<!--
The managementContext is used to configure how ActiveMQ is exposed in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:
http://activemq.apache.org/jmx.html
-->
<managementContext>
<managementContext createConnector="true" connectorPort="11099" />
</managementContext>

<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:

http://activemq.apache.org/persistence.html
-->
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>


<!--
The systemUsage controls the maximum amount of space the broker will
use before disabling caching and/or slowing down producers. For more information, see:
http://activemq.apache.org/producer-flow-control.html
-->
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>

<!--
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:

http://activemq.apache.org/configuring-transports.html
-->
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

<!-- destroy the spring context on shutdown to stop jetty -->
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>

</broker>

<!--
Enable web consoles, REST and Ajax APIs and demos
The web consoles requires by default login, you can disable this in the jetty.xml file

Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
-->
<import resource="jetty.xml"/>

</beans>
<!-- END SNIPPET: example -->

XML Namespace

  XML Namespaces一般简写为xmlns,在xml文件头上基本都会见到,它是XML(eXtensible Markup Language)的命名空间。作用是赋予命名空间一个唯一的名称。下面是从Spring的beans命名空间文件中节选的部分(参考地址: http://www.springframework.org/schema/beans/spring-beans.xsd )。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
<?xml version="1.0" encoding="UTF-8" standalone="no"?>

<xsd:schema xmlns="http://www.springframework.org/schema/beans"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
targetNamespace="http://www.springframework.org/schema/beans">

<xsd:import namespace="http://www.w3.org/XML/1998/namespace"/>

<!-- Top-level <beans> tag -->
<xsd:element name="beans">
<xsd:annotation>
<xsd:documentation><![CDATA[
Container for <bean> and other elements, typically the root element in the document.
Allows the definition of default values for all nested bean definitions. May itself
be nested for the purpose of defining a subset of beans with certain default values or
to be registered only when certain profile(s) are active. Any such nested <beans> element
must be declared as the last element in the document.
]]></xsd:documentation>
</xsd:annotation>
<xsd:complexType>
<xsd:sequence>
<xsd:element ref="description" minOccurs="0"/>
<xsd:choice minOccurs="0" maxOccurs="unbounded">
<xsd:element ref="import"/>
<xsd:element ref="alias"/>
<xsd:element ref="bean"/>
<xsd:any namespace="##other" processContents="strict" minOccurs="0" maxOccurs="unbounded"/>
</xsd:choice>
<xsd:element ref="beans" minOccurs="0" maxOccurs="unbounded"/>
</xsd:sequence>
<xsd:attribute name="profile" use="optional" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[...]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<!-- 省略若干行...... -->
<xsd:anyAttribute namespace="##other" processContents="lax"/>
</xsd:complexType>
</xsd:element>

<!-- 省略若干行...... -->

<!-- simple internal types -->
<xsd:simpleType name="defaultable-boolean">
<xsd:restriction base="xsd:NMTOKEN">
<xsd:enumeration value="default"/>
<xsd:enumeration value="true"/>
<xsd:enumeration value="false"/>
</xsd:restriction>
</xsd:simpleType>

</xsd:schema>

PropertyPlaceholderConfigurer

  PropertyPlaceholderConfigurer这个类是一个bean工厂后置处理器的实现类,是实现了BeanFactoryPostProcessor接口的,

1
2
3
4
5
6
7
8
9
10
11
PropertyPlaceholderConfigurer
PlaceholderResolvingStringValueResolver
PropertyPlaceholderConfigurerResolver
resolvePlaceholder
resolvePlaceholder
resolveSystemProperty
processProperties
SYSTEM_PROPERTIES_MODE_NEVER
SYSTEM_PROPERTIES_MODE_FALLBACK
SYSTEM_PROPERTIES_MODE_OVERRIDE
constants


一次基于JMX的服务监控应用


这次研究这个主题的起因是因为无意间接触到一种消息积压处理的场景,想着是否可以监控MQ的队列参数进行发送端的策略调整,于是有了这一篇文章。下面就这个Demo实现的过程逐一说明。

安装Activemq

  鉴于本机并没有AMQ的环境,于是临时下载了一个Release版本安装了一下(其实也谈不上安装那么复杂,解压即用而已),AMQ的目录结构大致是下面这个样子(有点类似于tomcat的目录结构),本次重点涉及的几个文件都做了标注,以备参考:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
PS D:\05-Pracs\apache-activemq-5.15.8> tree /f
文件夹 PATH 列表
卷序列号为 DA18-EBFA
D:.

├─bin
│ │ activemq # 启动配置文件
│ │ activemq.bat # 启动文件(实际使用时可能用到bin下面的32和64两个子目录下的activemq.bat直接启动)
│ # 使用这里的启动文件,需要全命令模式(.\activemq.bat start)
├─conf
│ activemq.xml # activemq自身的配置文件
│ ...
│ jmx.access # 用户管理
│ jmx.password # 用户密码管理
│ ...

├─data
│ ├─...

├─docs
│ ├─...

├─examples
│ ├─...

├─lib
│ ├─...

├─webapps
│ ├─...

├─webapps-demo
│ ├─...

PS D:\05-Pracs\apache-activemq-5.15.8>

修改配置项

  配置项的修改其实比较简单,也就是在managementContext节点开启一个端口(本项修改涉及的文件为conf目录下的activemq.xml):

1
2
3
4
5
6
7
8
9
<!-- 修改前 -->
<managementContext>
<managementContext/>
</managementContext>

<!-- 修改后 -->
<managementContext>
<managementContext createConnector="true" connectorPort="11099" />
</managementContext>

添加新用户

  这里的配置主要是控制登录用户的连接后操作权限的(本项修改涉及的文件为conf目录下的jmx.access和jmx.password两个文件):

1
2
3
4
5
6
7
# jmx.access文件,定义用户的权限
admin readwrite
monitor readonly

# jmx.password文件,定义用户登录密码
admin activemq
monitor smart

修改启动项

  这里添加了一些启动参数,其实也是基于前面的几项操作对应添加的,比如端口号、用户文件、密码文件等等(本项修改涉及的文件为bin目录下的activemq);修改位置在invoke_start方法之前:

1
2
3
4
5
6
7
8
9
# - $ACTIVEMQ_SSL_OPTS     : options for SSL encryption

ACTIVEMQ_CONF="D:/05-Pracs/apache-activemq-5.15.8/conf"
ACTIVEMQ_SUNJMX_START="-Dcom.sun.management.jmxremote.port=11099 "
ACTIVEMQ_SUNJMX_START="$ACTIVEMQ_SUNJMX_START -Dcom.sun.management.jmxremote.password.file=${ACTIVEMQ_CONF}/jmx.password"
ACTIVEMQ_SUNJMX_START="$ACTIVEMQ_SUNJMX_START -Dcom.sun.management.jmxremote.access.file=${ACTIVEMQ_CONF}/jmx.access"
ACTIVEMQ_SUNJMX_START="$ACTIVEMQ_SUNJMX_START -Dcom.sun.management.jmxremote.ssl=false"

invoke_start(){

启动Activemq

  通过bat脚本启动AMQ服务,通过观察日志可以发现很多有用信息(比如:JMX的启动信息包括启动端口号,不同的连接服务信息比如TCP、Stomp等等):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
PS D:\05-Pracs\apache-activemq-5.15.8\bin> .\activemq.bat start
Java Runtime: Oracle Corporation 1.8.0_181 C:\Program Files\Java\jdk1.8.0_181\jre
Heap sizes: current=1005056k free=989327k max=1005056k
JVM args: -Dcom.sun.management.jmxremote -Xms1G -Xmx1G -Djava.util.logging.config.file=logging.properties ...
Extensions classpath:
[D:\05-Pracs\apache-activemq-5.15.8\bin\..\lib,...]
ACTIVEMQ_HOME: D:\05-Pracs\apache-activemq-5.15.8\bin\..
ACTIVEMQ_BASE: D:\05-Pracs\apache-activemq-5.15.8\bin\..
ACTIVEMQ_CONF: D:\05-Pracs\apache-activemq-5.15.8\bin\..\conf
ACTIVEMQ_DATA: D:\05-Pracs\apache-activemq-5.15.8\bin\..\data
Loading message broker from: xbean:activemq.xml
INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1@5f2108b5: startup date [Tue Dec 18 15:13:33 CST 2018];...
INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[D:\05-Pracs\apache-activemq-5.15.8\bin\..\data\kahadb]
INFO | JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:11099/jmxrmi
INFO | KahaDB is version 6
INFO | PListStore:[D:\05-Pracs\apache-activemq-5.15.8\bin\..\data\localhost\tmp_storage] started
INFO | Apache ActiveMQ 5.15.8 (localhost, ID:IVF21BBAA9XWLKD-65387-1545117214542-0:1) is starting
INFO | Listening for connections at:
tcp://IVF21BBAA9XWLKD:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600
INFO | Connector openwire started
INFO | Listening for connections at:
amqp://IVF21BBAA9XWLKD:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600
INFO | Connector amqp started
INFO | Listening for connections at:
stomp://IVF21BBAA9XWLKD:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600
INFO | Connector stomp started
INFO | Listening for connections at:
mqtt://IVF21BBAA9XWLKD:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600
INFO | Connector mqtt started
INFO | Starting Jetty server
INFO | Creating Jetty connector
WARN | ServletContext@o.e.j.s.ServletContextHandler@38604b81{/,null,STARTING} has uncovered http methods for path: /
INFO | Listening for connections at
ws://IVF21BBAA9XWLKD:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600
INFO | Connector ws started
INFO | Apache ActiveMQ 5.15.8 (localhost, ID:IVF21BBAA9XWLKD-65387-1545117214542-0:1) started
INFO | For help or more information please see: http://activemq.apache.org
INFO | No Spring WebApplicationInitializer types detected on classpath
INFO | ActiveMQ WebConsole available at http://0.0.0.0:8161/
INFO | ActiveMQ Jolokia REST API available at http://0.0.0.0:8161/api/jolokia/
INFO | Initializing Spring FrameworkServlet 'dispatcher'
INFO | No Spring WebApplicationInitializer types detected on classpath
INFO | jolokia-agent: Using policy access restrictor classpath:/jolokia-access.xml
WARN | Transport Connection to: tcp://127.0.0.1:60850 failed: java.net.SocketException: Connection reset
WARN | Transport Connection to: tcp://127.0.0.1:57564 failed: java.net.SocketException: Connection reset

编码之Maven引入

  本次编码涉及到的关于activemq-web的一些类,需要通过Maven引入:

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-web -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-web</artifactId>
<version>5.15.8</version>
</dependency>

编码之核心代码

  这里的Demo也没有过多的操作,只是针对AMQ的一些基础信息做了常规的打印,有扩展需求的话可以根据这里的打印信息参考扩展:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package com.ttfisher;

import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.activemq.web.RemoteJMXBrokerFacade;
import org.apache.activemq.web.config.SystemPropertiesConfiguration;

/**
* 基于JMX的Activemq服务状态监控
*
* @author: chenglin
* @date: 2018/12/18 18:59
*/
public class JMXMonitor {

private static String jmxUrl = "service:jmx:rmi:///jndi/rmi://127.0.0.1:11099/jmxrmi";
private static String jmxUser = "monitor";
private static String jmxPwd = "smart";

/**
* 监控服务
*/
private static void doMonitor() {
try {
// 创建连接
RemoteJMXBrokerFacade createConnector = new RemoteJMXBrokerFacade();
System.setProperty("webconsole.jmx.url", jmxUrl);
System.setProperty("webconsole.jmx.user", jmxUser);
System.setProperty("webconsole.jmx.password", jmxPwd);
createConnector.setConfiguration(new SystemPropertiesConfiguration());

// 监控Broker
BrokerViewMBean brokerAdmin = createConnector.getBrokerAdmin();
System.out.println("============= Broker Info ================");
System.out.println("BrokerName: " + brokerAdmin.getBrokerName());
System.out.println("TotalMessageCount: " + brokerAdmin.getTotalMessageCount());
System.out.println("TotalConsumerCount: " + brokerAdmin.getTotalConsumerCount());
System.out.println("TotalDequeueCount: " + brokerAdmin.getTotalDequeueCount());
System.out.println("TotalEnqueueCount: " + brokerAdmin.getTotalEnqueueCount());

// 监控所有的Topic
System.out.println("============= Topic List Info ================");
for (TopicViewMBean topicViewMBean : createConnector.getTopics()) {
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
System.out.println("Topic Name: " + topicViewMBean.getName());
System.out.println("ConsumerCount: " + topicViewMBean.getConsumerCount());
System.out.println("DequeueCount: " + topicViewMBean.getDequeueCount());
System.out.println("EnqueueCount: " + topicViewMBean.getEnqueueCount());
System.out.println("DispatchCount: " + topicViewMBean.getDispatchCount());
System.out.println("ExpiredCount: " + topicViewMBean.getExpiredCount());
System.out.println("MaxEnqueueTime: " + topicViewMBean.getMaxEnqueueTime());
System.out.println("ProducerCount: " + topicViewMBean.getProducerCount());
System.out.println("MemoryPercentUsage: " + topicViewMBean.getMemoryPercentUsage());
System.out.println("MemoryLimit: " + topicViewMBean.getMemoryLimit());
}

// 监控所有的Queue
System.out.println("============ Queue List Info ==================");
for (QueueViewMBean queueViewMBean : createConnector.getQueues()) {
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
System.out.println("Queue Name: " + queueViewMBean.getName());
System.out.println("ConsumerCount: " + queueViewMBean.getConsumerCount());
System.out.println("DequeueCount: " + queueViewMBean.getDequeueCount());
System.out.println("EnqueueCount: " + queueViewMBean.getEnqueueCount());
System.out.println("DispatchCount: " + queueViewMBean.getDispatchCount());
System.out.println("ExpiredCount: " + queueViewMBean.getExpiredCount());
System.out.println("MaxEnqueueTime: " + queueViewMBean.getMaxEnqueueTime());
System.out.println("ProducerCount: " + queueViewMBean.getProducerCount());
System.out.println("MemoryPercentUsage: " + queueViewMBean.getMemoryPercentUsage());
System.out.println("MemoryLimit: " + queueViewMBean.getMemoryLimit());
}
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* Have a test
*
* @param args
*/
public static void main(String[] args) {
doMonitor();
}
}

编码之测试结果

  通过测试日志能看到AMQ的基本信息和实时的数据状态,比如Broker的信息,不同Topic的信息(包括出入队的数据量等):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
08:41:46.963 [main] DEBUG org.apache.activemq.web.RemoteJMXBrokerFacade - Creating a new JMX-Connection to the broker
08:41:47.219 [main] INFO org.apache.activemq.web.RemoteJMXBrokerFacade - Connected via JMX to the broker at service:jmx:rmi:///jndi/rmi://127.0.0.1:11099/jmxrmi
============= Broker Info ================
BrokerName: localhost
TotalMessageCount: 0
TotalConsumerCount: 0
TotalDequeueCount: 55
TotalEnqueueCount: 610
============= Topic List Info ================
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
Topic Name: queue.pub.sub
ConsumerCount: 0
DequeueCount: 55
EnqueueCount: 120
DispatchCount: 55
ExpiredCount: 0
MaxEnqueueTime: 6
ProducerCount: 0
MemoryPercentUsage: 0
MemoryLimit: 720424141
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
Topic Name: ActiveMQ.Advisory.MasterBroker
ConsumerCount: 0
DequeueCount: 0
EnqueueCount: 1
DispatchCount: 0
ExpiredCount: 0
MaxEnqueueTime: 0
ProducerCount: 0
MemoryPercentUsage: 0
MemoryLimit: 720424141
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
Topic Name: ActiveMQ.Advisory.Connection
ConsumerCount: 0
DequeueCount: 0
EnqueueCount: 244
DispatchCount: 0
ExpiredCount: 0
MaxEnqueueTime: 0
ProducerCount: 0
MemoryPercentUsage: 0
MemoryLimit: 720424141
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
Topic Name: ActiveMQ.Advisory.Consumer.Topic.queue.pub.sub
ConsumerCount: 0
DequeueCount: 0
EnqueueCount: 4
DispatchCount: 0
ExpiredCount: 0
MaxEnqueueTime: 0
ProducerCount: 0
MemoryPercentUsage: 0
MemoryLimit: 720424141
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
Topic Name: ActiveMQ.Advisory.Topic
ConsumerCount: 0
DequeueCount: 0
EnqueueCount: 1
DispatchCount: 0
ExpiredCount: 0
MaxEnqueueTime: 0
ProducerCount: 0
MemoryPercentUsage: 0
MemoryLimit: 720424141
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
Topic Name: ActiveMQ.Advisory.Producer.Topic.queue.pub.sub
ConsumerCount: 0
DequeueCount: 0
EnqueueCount: 240
DispatchCount: 0
ExpiredCount: 0
MaxEnqueueTime: 0
ProducerCount: 0
MemoryPercentUsage: 0
MemoryLimit: 720424141
============ Queue List Info ==================

Process finished with exit code 0

------2019 Lin.C ------