ActiveMQ扫盲系列(二):编码之外

【引言】经过前几章,对ActiveMQ基本已经入门了;但是作为消息队列,光能用起来还是远远不够的,所以在编码之外,还有很多需要掌握的知识:比如持久化策略、比如高可用等等,所以本章我们就来谈谈编码之外的知识。


JMS

絮语

  其实在本系列的第一篇就提到了JMS的概念,但仅作为初识阶段形成一个概念,所以这次既然专程开了一章对编码本身之外的内容做梳理的话,就从最基本的JMS的概念来细细说说吧!

MOM

  这里的MOM和我们平时见得可不是一个意思,它的全称是Message-oriented middleware;译作面向消息的中间件,它的出现主要就是为了降低各个应用程序之间的耦合性的。
  传统的通信方式,某个主机需要用到的终端的话都是直接通信;而MOM的理念则是,在消息发送者与消息的接收者之间有一个message mediator,这样就可以把消息的发送者和使用者分离开来。
  有了MOM之后,系统的就可以不再有那么强的依赖:

  • 可以做到不必要求发送者和接收者同时“在线”(actived)才能进行消息通信。发送者只管把消息发给MOM,然后可以“离开”,接收者可以在随后任何时间去取(取决于何种通信模式),即发送者不需要知道接收者的存在,且可进行异步通信。
  • MOM还可以提供一些额外的功能:比如,消息的持久化(message persistence)、消息的转化、消息之间的路由……而这些在未引入MOM系统中很难实现的。
  • MOM支持各种各样的连接协议,即Client可以通过如HTTP/TCP….协议连接MOM。
  • MOM提供了相应的API发送及接收消息,避免了开发者的重复工作。

Why JMS?

  前一节提到了MOM,它的好处不必多言了,但是也存在一个棘手的问题,不同的厂商提供的MOM都各具特色,每家都有一套自己的发送、接收消息的API(这也不难理解,难不成IBM开发个组件还去跟微软确认么?),根本谈不上兼容性和统一性。
  所以JMS的出现就是为了解决这个标准不统一的问题的,JMS客户端通过JMS 规定的API访问各种各样的基于JMS协议的消息中间件产品,JMS既承担了一个middleware的角色,也承担了一个标准制定者的角色,对调用端,它是一个有着统一调用标准的middleware,而对于服务端,不同的厂商的MOM都需要遵循这个标准(当然你也可以不遵循,但是调用端为了编码的便捷和统一性,自然会优先选择统一标准的中间件使用)。

JMS的结构

  • Headers:包含该消息的属性;
    • 消息的发送目的地,由JMSDestination表示
    • 消息的传递模式,由JMSDeliveryMode表示;传输模式有两种:Persistent和Non-Persistent,应该很好理解吧
  • Properties:与Headers有点类似
  • Payload:存储JMS实际消息的地方。可以以text形式、二进制形式存储消息。

JMS Selector

  Message Selector允许用户只接收自己感兴趣的消息;实际应用时,也就是控制订阅哪个消息主题(Topic)

JMS Domain

点对点

  • The point-to-point (PTP) messaging domain uses destinations known as queues.(点对点模型传输的目的地是队列。)
  • Each message received on the queue is delivered to once-and-only-once consumer.(消息只能发送给唯一的一个consumer。)
  • Consumers receive messages from the queue either synchronously using the MessageConsumer.receive() method or asynchronously by registering a MessageListener implementation using the MessageConsumer.setMessageListener() method.(它支持同步通信和异步通信,同步通信使用MessageConsumer.receive()来接收消息。异步通信需要MessageListener监听器的支持。)

发布订阅

  • The publish/subscribe (pub/sub) messaging domain uses destinations known as topics.(发布-订阅模型传输的目的地是Topics。)
  • Any messages sent to the topic are delivered to all subscribers via a push model, as messages are automatically delivered to the subscriber.(JMS采用Push方式,即主动地把消息推送给订阅者。)
  • 发布-订阅模型中有两种订阅方式
    • 持久订阅(Durable subscriptions):Using a durable subscription, when a subscriber disconnects from the JMS provider, it is the responsibility of the JMS provider to store messages for the subscriber. 也就是消费端不在线的情况下,JMS Provider会存储这个消息。当持久订阅者处于 inactive 状态时,Broker需要为持久订阅者保存消息,如果持久订阅者订阅的消息太多则会溢出。
    • 非持久订阅:只有当Client与JMS Provider(如,ActiveMQ)保持连接状态才能收到发送到某个Topic的消息。若Client处于离线,这个时间段发送到Topic的消息会丢失。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      public interface DeliveryMode {

      /** This is the lowest-overhead delivery mode because it does not require
      * that the message be logged to stable storage. The level of JMS provider
      * failure that causes a {@code NON_PERSISTENT} message to be lost is
      * not defined.
      */

      static final int NON_PERSISTENT = 1;

      /** This delivery mode instructs the JMS provider to log the message to stable
      * storage as part of the client's send operation. Only a hard media
      * failure should cause a {@code PERSISTENT} message to be lost.
      */

      static final int PERSISTENT = 2;
      }

持久订阅和持久化

  • Message Durability针对 Pub/Sub Domain而言的,它是指接收者以何种方式去接收消息,控制的是消费端不在线时的消息处理机制。
  • Message Persistence与Domain无关,它是针对消息服务器而言的,描述的是消息的可靠性,即当消息服务器宕机后,消息是否丢失。

ActiveMQ Persistence

存储介质

  • file-based(存储在文件中)
  • in-memory(存储在内存中)
  • relational databases(存储在关系数据库中)

Persistence Message的作用

  Persistent messages are ideal if you want messages to always be available to a message consumer after they have been delivered to a message broker, or you need messages to survive even if there has been a system failure。
  很显而易见的两个特性:①消息对消费者总是可用。②系统宕机后,消息不会丢失。

关于Destination

  • 主题(Topic):对应的Domain是发布-订阅模型(Pub/Sub);它内部又分持久化订阅和非持久化订阅两种模式。持久订阅者维护者一个消息指针,指针总是指向该订阅者需要的下一个消息,所以即使订阅者当前不在线,也不会丢失消息。

    1
    MessageProducer producer = session.createProducer(session.createTopic(TOPIC_NAME));
  • 队列(Queue):对应的Domain是点对点模型(P2P);按FIFO的顺序存储消息,所有消费者中只有其中一个消费者可以接收该消息;也只有当消费者消费了之后,该消息才能删除。

    1
    MessageProducer producer = session.createProducer(session.createQueue(TOPIC_NAME));

存储方式

ActiveMQ provides a pluggable API for message stores as well as multiple message store implementations including:

  • AMQ Message Store
  • KahaDB Message Store: default
  • JDBC Message Store
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    From activemq.xml
    <!--
    Configure message persistence for the broker. The default persistence
    mechanism is the KahaDB store (identified by the kahaDB tag).
    For more information, see:
    http://activemq.apache.org/persistence.html
    -->
    <persistenceAdapter>
    <kahaDB directory="${activemq.data}/kahadb"/>
    </persistenceAdapter>

ActiveMQ存储方式细说

AMQ

  AMQ消息存储,就像KahaDB,综合了可靠的持久化(系统崩溃时仍可存活)的业务日志和高性能索引的特性,对那些以消息产能为主要需求的应用来说,该存储成为的最好选择!但由于它为每个索引使用两个独立的文件,每种目标(destination)使用一个索引,因此,若你打算在每个服务端使用数以千计的队列时,就不要使用AMQ消息存储了。如果服务端未正常关闭,再次启动时恢复将非常缓慢。这是因为所有的索引都需要重建,这需要服务端再次将所有的数据日志重新建立索引。

内部实现

  • 数据记录:作为消息日志
  • 缓存:在数据写入数据记录以后,为快速从内存中查询而保留消息。
  • 引用库:保持了由消息ID索引的日志中的消息的引用

存储结构

  • lock文件:确保在任一时刻,只有一个服务端可以访问这个数据。在同一个系统中多个名称相同的服务端,通常出于热备目的而使用这个lock文件。
  • 临时存储目录:用于存储非持久化消息,这些消息可能不再存储在服务端内存中。这些消息通常是正等待发送到一些运行缓慢的消费者的客户端上。
  • kr库:部分AMQ消息存储,引用(索引)的目录结构。默认情况下使用Kaha引用库(Kaha是ActiveMQ核心库的一部分)去索引和存储数据日志中消息的引用。kr库中有两个独立的部分:
  • 数据目录:包含了索引和指向数据记录中保存的信息引用的集合。如果服务端没有完全关闭时,这个数据目录是恢复时删除并重建的一部分。你可以在启动服务端之前通过手工删除这个目录强制恢复。
    • 状态目录:保存了持久topic消费者的信息。日志本身不包含消费者信息,所以当它需要恢复时,它首先要查询持久订阅者的信息才能准确地重建它的数据库。
  • 日志目录:包含了数据记录的数据文件,数据控制文件包含了一些元数据。数据文件有引用计数,所以当所有的消息都已传递完成时,该数据文件可能被删除或归档
  • 归档目录:只有当归档开关启用时才存在.它的默认位置可以在下一个日志中看到。使用一个独立的分区或磁盘是有道理的。归档用于存储来自于日志目录中数据,这些数据移动到这里而不再是被直接删除掉。使日后从归档中重提取消息成为可能。重提取消息时,会移动归档数据记录(或一个子集)到一个新的日志目录当中,然后启动一个新的服务端指向这个新目录。它会自动地从这个日志目录中读取消息。

配置说明

属性名 默认值 描述
directory activemq-data AMQ消除存储使用的目录路径
useNIO true NIO提供对磁盘的更快的直接写入
syncOnWrite false 每次写入磁盘都同步
syncOnTransaction true 每次事务都同步
maxFileLength 32mb 新文件使用前消息日志数据文件的最大大小
persistentIndex true 使用持久化索引。如果设置为false, 会使用内存中的HashMap
maxCheckpointMessageAddSize 4kb 写入磁盘前事务使用的最大内存
cleanupInterval 3000(ms) 检测那个日志数据文件仍旧需要前的时间
checkpointInterval 20000(ms) 移动缓存消息ID到引用库索引前的时间
indexBinSize 1024 索引使用的哈希表初始化桶数量
indexMaxBinSize 16384 使用的哈希表桶的最大值
directoryArchive archive AMQ消息存储放置归档日志文件的目录路径
archiveDataLogs false 如果为true,日志文件会被归档而不会被删除
recoverReferenceStore true 如果服务端没有完全关闭完成,恢复引用库,另外这种错误要格外消息
forceRecoverReferenceStore false 强制引用库的恢复

配置样例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<?xml version="1.0" encoding="UTF-8"?>
<beans>
<broker xmlns="http://activemq.apache.org/schema/core">
<persistenceAdapter>
<amqPersistenceAdapter
directory="target/Broker2-data/activemq-data"
syncOnWrite="true"
indexPageSize="16kb"
indexMaxBinSize="100"
maxFileLength="10mb" >
</amqPersistenceAdapter>
</persistenceAdapter>
</broker>
</beans>

KahaDB

  自从ActiveMQ5.3以后,推荐使用的通用消息存储就是Kaha DB。这是一种基于文件的消息存储,包含事务型的日志,支持消息的存储与恢复,具有良好的性能和可扩展性。
  KahaDB是一个基于文件的,事务型存储,可以微调,并设计为快速的消息存储。KahaDB的目标是易用,且尽可能地快。使用文件数据库,就意味着没有第三方数据库这个先决条件。这个存储使得ActiveMQ从下载到运行只需几分钟。另外,KahaDB的文件目录为了消息服务端的需要,设计的非常精简。
  KahaDB消息存储使用事务日志存储索引,并只用一个索引文件应对所有的目标消息。它已经用于10000个连接的生产环境当中,每个连接都有自己的queue。可配置的KahaDB,意味着可以微调以用于更多的场景,从高吞吐的应用(如交易平台),到存储海量消息(如GPS定位)。

内部实现

  • 数据日志(data log) : 扮演了消息日志的角色,它包含了滚动的消息日志和存储在特定长度的文件中的命令(如事务边界和消息删除)。当当前使用的数据文件达到最大长度时,一个新的数据文件会随之创建。所有数据文件中的消息上都有引用计数,因此一旦文件中的消息不再有需要了,这个文件就会被归档或删除。在数据文件中,消息以追加的方式添加到当前文件末尾,因此存储非常之快。
  • 如果存在接收消息的活跃消费者,缓存会将消息临时保存。如果这些活跃的消费者,消息被分发的同时会被安排存储起来。如果消息及时被确认了,就无需写入磁盘。
  • BTree索引通过对消息ID的索引保留了对在数据文件中消息的引用。索引为queue维护了先进先出的数据结构和持久订阅者指向它们的topic消息。redo日志仅在ActiveMQ服务端没有完整地关闭时使用,通常是要确保维护完整的BTree索引。

存储结构

  • 数据日志文件(db log files):KahaDB存储消息到预定义大小的数据记录文件中,文件命名为db-.log。当数据文件已满时,一个新的文件会随之创建,number数值也会随之递增。当不再有引用到数据文件中的任何消息时,文件会被删除或归档。
  • 归档目录(archive directory):只有当归档启用时才会存在。归档用于存储那些KahaDB不再需要的数据记录,使日后再想恢复消息成为可能。如果归档没有被启用(默认情况),那么不再使用的数据会被直接从文件系统中删除。
  • db.data:该文件包含了持久化的BTree索引,索引了消息数据记录中的消息。
  • db.redo:这是redo文件,如果KahaDB消息存储在强制退出后启动,用于恢复BTree索引。

配置说明

属性名 默认值 描述
directory activemq-data KahaDB使用的目录路径
indexWriteBatchSize 1000 批量写入磁盘的索引页数
indexCacheSize 1000 内存中索引页数的缓存大小
enableIndexWriteAsync false 如果设置了,会异步写入索引
journalMaxFileLength false 设置每个消息数据记录文件的最大空间
enableJournalDiskSyncs true 确保每个非事务记录跟随磁盘sync写入(JMS 耐久性要求)
cleanupInterval 30000 检查需要抛弃或移除的不再使用的消息数据记录的时间间隔,单位毫秒
checkpointInterval 30000 Time (ms) before checkpointing the journal ignore
MissingJournalfiles false 如果启用, 将忽略丢失的消息日志文件
checkForCorruptJournalFiles false 如果启用, 启动时将校验消息数据记录是否被损坏
checksumJournalFiles false 如果启用,将为每个消息数据记录提供校验和(checksum)
archiveDataLogs false 如果启用, 将会把消息数据记录移动到归档目录而不是删除它们
directoryArchive null 定义了当消息数据记录中包含的所有消息都被消费过后,将数据记录的移动过程路径
databaseLockedWaitDelay 10000 定义了试图获取数据锁的等待时间,单位毫秒(用于主从共享数据时)
maxAsyncJobs 10000 排队等待存储的异步消息最大数目(最好与并发消息生产者相同)
concurrentStoreAndDispatchTransactions true 允许消息分发与事务存储并发地进行
concurrentStoreAndDispatchTopics true 允许topic消息分发与事务存储并发地进行
concurrentStoreAndDispatchQueues true 允许queue消息分发与事务存储并发地进行

配置样例

1
2
3
4
5
6
7
<broker brokerName="broker" persistent="true" useShutdownHook="false">
...
<persistenceAdapter>
<kahaDB directory="activemq-data" journalMaxFileLength="16mb"></kahaDB>
</persistenceAdapter>
...
</broker>

JDBC

  灵活的ActiveMQ插件式消息存储API允许你选择不同的实现。最原始和通用的存储实现是以JDBC来进行消息存储。
  这么多组织选择使用JDBC消息存储的原因是它们对关系型数据库有很多使用经验。JDBC持久化很明显没有上面提到的消息存储在性能方面的优势。事实上真相是很多企业已经在关系型数据库上花费了大量金钱,因此他们想尽其所能充分使用它们。
  但是使用共享数据库是对于由多个服务端组成的master/slave(主从)拓扑结构非常之有用。当一个ActiveMQ服务端组配置使用一个共享数据库,它们都会尝试连接并获取一个表级锁的锁,但只有一个能成功并成功master。其余的服务端将变成slaves,并将进入等待状态,不再接收客户端连接直到master失败。
  当使用JDBC消息存储,ActiveMQ使用的默认的JDBC驱动是Apache Derby。但其他的关系型数据也受支持

支持度

  • Apache Derby
  • MySQL
  • Postgre SQL
  • Oracle
  • SQL Server
  • Sybase
  • Informix
  • MaxDB

消息存储模式

  JDBC消息存储使用了包含三个table的模式。两个表用来存储消息,第三个用来作为一个锁表,确保同一时刻只有一个ActiveMQ服务端可以访问数据库。

ACTIVEMQ_MSGS表

  queue和topic消息都会被分解并保存到ACTIVEMQ_MSGS表中。

Column Type Description
ID INTEGER 用于查询消息的序列化ID
CONTAINER VARCHAR(250) 消息的目的地
MSGID_PROD VARCHAR(250) 消息生产者目的地
MSGID_SEQ INTEGER 生产者的消息序列号. 这个字段与MSGID_PROD一起等价于JMSMessageID.
EXPIRATION BIGINT 消息过期的毫秒时间
MSG BLOB 消息自身的序列化

ACTIVEMQ_ACKS表

  本表用来保存持久订阅者消息,表中的LAST_ACKED_ID序列是用于简单外键指向ACTIVEMQ_MSGS,并使持久订阅者的消息易于从ACTIVEMQ_MSGS查询到。

Column Type Description
CONTAINER VARCHAR(250) 消息的目的地
SUB_DEST VARCHAR(250) 持久订阅者的目的地(can be different from the container if using wildcards)
CLIENT_ID VARCHAR(250) 持久订阅者的client ID
SUB_NAME VARCHAR(250) 持久订阅者的订阅名称
SELECTOR VARCHAR(250) 持久订阅者的选择器
LAST_ACKED_ID Integer 订阅者的接收到最新消息的序列ID

ACTIVEMQ_LOCK表

  本表用于确保同一时间只有一个ActiveMQ服务端实例可以访问数据库。如果ActiveMQ服务端不能获取到数据库锁,服务端将不能完全初始化,并一直等待锁被释放,或者自己关闭。

Column Type Description
ID INTEGER 锁的唯一ID
Broker Name VARCHAR(250) 持有锁的ActiveMQ服务端名称

配置样例

Apache Derby

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<?xml version="1.0" encoding="UTF-8"?>
<beans>
<broker brokerName="test-broker" xmlns="http://activemq.apache.org/schema/core">
<persistenceFactory>
<journalPersistenceAdapterFactory
journalLogFiles="4"
journalLogFileSize="32768"
useJournal="true"
useQuickJournal="true"
dataSource="#derby-ds"
dataDirectory="activemq-data" ></journalPersistenceAdapterFactory>
</persistenceFactory>
</broker>
<bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
<property name="databaseName" value="derbydb" ></property>
<property name="createDatabase" value="create" ></property>
</bean>
</beans>

JDBC

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<?xml version="1.0" encoding="UTF-8"?>
<beans>
<broker brokerName="test-broker" persistent="true"
xmlns="http://activemq.apache.org/schema/core">
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" ></jdbcPersistenceAdapter>
</persistenceAdapter>
</broker>
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource"
destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver" ></property>
<property name="url"
value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true" ></property>
<property name="username" value="activemq" ></property>
<property name="password" value="activemq" ></property>
<property name="maxActive" value="200" ></property>
<property name="poolPreparedStatements" value="true" ></property>
</bean>
</beans>

Apache Commons DBCP

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<?xml version="1.0" encoding="UTF-8"?>
<beans>
<broker brokerName="test-broker" persistent=true
xmlns="http://activemq.apache.org/schema/core">
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#oracle-ds" ></jdbcPersistenceAdapter>
</persistenceAdapter>
</broker>
<bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource"
destroy-method="close">
<property name="driverClassName" value="oracle.jdbc.driver.OracleDriver" ></property>
<property name="url" value="jdbc:oracle:thin:@localhost:1521:AMQDB" ></property>
<property name="username" value="scott" ></property>
<property name="password" value="tiger" ></property>
<property name="maxActive" value="200" ></property>
<property name="poolPreparedStatements" value="true" ></property>
</bean>
</beans>

内存消息存储

  内存消息存储把所有的持久消息都保存到内存当中。不涉及主动缓存(No active caching is involved),所以你必须要留意JVM和ActiveMQ服务端的内存设置,使之足以同时容纳所有存在的消息。
  如果能确定服务端将只存储有限数量的消息,而且通常能很快被消费掉,那么用内存存储就很好。真正使用它一般是一些小的测试用例,这种场景下,你可能想要验证与服务端的互通,但在开始时又不想增加消息的存储成本,或者想在测试完成之后更易容地清理掉消息。

配置内存存储

1
2
3
4
5
6
7
8
9
<?xml version="1.0" encoding="UTF-8"?>
<beans>
<broker brokerName="test-broker" persistent="false"
xmlns="http://activemq.apache.org/schema/core">
<transportConnectors>
<transportConnector uri="tcp://localhost:61635" ></transportConnector>
</transportConnectors>
</broker>
</beans>

非持久订阅可靠性提升

关于非持久订阅

  对于持久订阅者而言,只要订阅了某个Topic,就不用担心自己“离线”(inactive)后,错过某些消息,因为所有消息都做了缓存了;而对于非持久订阅者则会存在如下状况:

  • 生产者发送了若干个消息到Topic后,非持久订阅者才去订阅该Topic,则它会错过(收不到)在它订阅之前发送的消息。
  • 生产者向Topic发送了若干个消息,而此时因网络中断原因或者非持久订阅者宕机时,非持久订阅者刚好不在线(inactive),就会错过(收不到)生产者发送的消息。
  • 从消息的角度而言,有些消息是实时消息(如,实时股票价格),需要快速地消费掉,对消息进行持久化就没有太大的意义,而且会因为存储消息而造成一定的开销。
      因此,为了提高非持久订阅者的可靠性,以及实时的消费消息,就需要:
  • 消息不进行持久化并缓存消息(Caching message for nondurable consumers);
  • 对缓存的消息的消费策略

Retroactive Consumer

  Retroactive Consumer属于非持久订阅者,但它是消费非持久化消息的订阅者(其他非持久订阅者 可以消费持久化消息);也就是说,通过这个设置,非持久订阅者也可以实现订阅了某topic的客户端就可以追溯之前已发送或错过的消息(ActiveMQ Broker可以为各种Topic缓存消息,但也有两种例外)。

1
2
3
4
5
6
7
8
The ActiveMQ message broker caches messages in memory for every topic that is used.
The only types of topics that are not supported are temporary topics and ActiveMQ advisory topics.
Messages that are cached by the broker are only dispatched to a topic consumer if it is retroactive;
and never to durable topic subscribers.

// 实现方式如下
topic = new ActiveMQTopic("TEST.Topic?consumer.retroactive=true");
consumer = session.createConsumer(topic);

订阅恢复策略

  因为 retroactive consumer 消费的是非持久化的消息(消息保存在内存中),所以就会出现第一节中提到的这两个问题:比生产者后上线,或网络问题或者宕机;所以就需要订阅恢复策略,订阅恢复策略的目的就是让retroactive consumer能够回到过去某个时间点消费它错过了的消息。恢复订阅策略针对的是非持久化的retroactive consumer订阅者而言的,它提高了非持久化消息的可靠性。
  Summary of Available Recovery Policies

Policy Name Type Description
FixedSizedSubscriptionRecoveryPolicy fixedSizedSubscriptionRecoveryPolicy maximumSize=”1024” Keep a fixed amount of memory in RAM for message history which is evicted in time order.
FixedCountSubscriptionRecoveryPolicy fixedCountSubscriptionRecoveryPolicy maximumSize=”100” Keep a fixed count of last messages.
LastImageSubscriptionRecoveryPolicy lastImageSubscriptionRecoveryPolicy Keep only the last message.
NoSubscriptionRecoveryPolicy noSubscriptionRecoveryPolicy Disables message recovery.
QueryBasedSubscriptionRecoveryPolicy queryBasedSubscriptionRecoveryPolicy query=”JMSType = ‘car’ AND color = ‘blue’” Perform a user specific query mechanism to load any message they may have missed. Details on message selectors are available here: http://java.sun.com/j2ee/1.4/docs/api/javax/jms/Message.html
TimedSubscriptionRecoveryPolicy timedSubscriptionRecoveryPolicy recoverDuration=”60000” Keep a timed buffer of messages around in memory and use that to recover new subscriptions. Recovery time is in milliseconds.
RetainedMessageSubscriptionRecoveryPolicy retainedMessageSubscriptionRecoveryPolicy Keep the last message with ActiveMQ.Retain property set to true

高可用架构

  HA(高可用性)几乎在所有的架构中都需要有一定的保证;ActiveMQ的高可用性架构是基于Master/Slave 模型的。ActiveMQ总共提供了四种配置方案来配置HA,其中Shared Nothing Master/Slave 在5.8版本之后不再使用了,并在ActiveMQ5.9版本中引入了基于Zookeeper的Replicated LevelDB Store HA方案。

Shared Nothing Master/Slave

概述


  这个架构实际是已经被废弃的架构了,将在5.8+之后版本中移除;但是它的存在必然有存在的价值,所以了解一点也无妨。顾名思义,这个架构的一大特点是没有共享,也就是Master拥有自己的一份数据,Slave也有一份自己的数据;既然曾经存在过现在又被抛弃,那么这个架构想必是有着明显的优缺点的。

数据局限性

  对非持久化消息,Master并不会同步给Slave;因此,一旦Master宕机非持久化消息会丢失。
  Master和Slave各自单独存储一份持久化消息;Slave只是单纯的作为一个备份存在,在Master不能提供服务时能有一定的抗风险能力支撑。但是由于Master只能有一个Slave(而并不是一群Slave),虽然在一定程度上有可靠性保证,但是在抗风险性上跟集群模式相比还是有很大的差距。
  Slave只能同步它连接到Master之后的消息,在Slave连接到Master之前Producer向Master发送的消息将不会同步给Slave;虽然这可以通过配置(waitForSlave)参数,只有当Slave也启动之后,Master才开始初始化TransportConnector接受Client的请求(Producer的请求),但是还是不够灵活。
  如果Master或者Slave其中之一宕机,它们之间不同步的消息无法自动进行同步,此时只能手动恢复不同步的消息了。也就是说:“ActiveMQ没有提供任何有效的手段,能够让master与slave在故障恢复期间,自动进行数据同步”。

响应局限性

  Master收到持久化消息时,需要先同步(sync)给Slave之后,才向Producer发送ACK确认;这个特点会导致对Producer的响应有一定的延时;但若是先响应后同步给Slave的话,又可能会出现数据不一致的问题,这也是一个矛盾点。
  只有Master负责Client的请求,Slave不接收Client请求。Slave连接到Master,负责备份消息;简单的说Slave对外界来说是无感知的,它只负责做Master的备胎。
  Master一旦出现故障,Slave可以有以下两种处理方式:❶自己成为Master;❷关闭自己(停止服务);具体怎么做根据配置而定。

Split Brain现象

  Master 与 Slave之间可能会出现“Split Brain”现象。比如:Master本身是正常的,但是Master与Slave之间的网络出现故障,网络故障导致Slave认为Master已经宕机,因为它自己会成为Master(根据配置:shutdownOnMasterFailure)。此时,对Client而言,就会存在两个Master。

Shared Database Master/Slave

概述


  事实上ActiveMQ官方是推荐“Shared storage”模式作为首选方案,并提供了多个优秀的存储策略,其中kahadb、levedbDB、JDBC Store等都可以便捷的接入。通过命名我们也能发现,这个架构中是有数据库完成数据共享的;所以相较于无共享的模式,它的优势还是很明显的。

不存在单Slave的问题

  这种模式下,对Slave的个数是没有限制的,谁能够获取到排它锁,谁就是Master,这种模式可以很好的允许集群中有任意多个节点的存在(但是节点也不是越多越好,节点过多,数据同步的消耗就越大)。

不存在数据同步问题

  因为存储数据在salve与master之间共享(物理共享),所以当master失效后,slave自动接管服务,而无需手动进行数据的Copy与同步,也无需master与slave之间进行任何额外的数据交互,因为master存储数据之后,它们在任何时候对slave都是可见的。

不同的锁机制

  master与slave之间,通过database的表级排他锁、共享文件的“排他锁”或者分布式排他锁(zookeeper)来决定broker的状态与角色,获取锁权限的broker作为master,如果master失效,它必将失去锁权限,那么slaves将通过锁竞争来选举新master,没有获取锁权限的broker作为slave,并等待锁的释放(间歇性尝试获取锁)。当然slaves仍然不能为Client服务, 它只为故障转移做准备。

依然存在的问题

  对于非持久化消息的同步默认还是不支持的,所有发给Master的非持久化消息不会在slaves中备份,如果Master失效,即使Slave接管了服务,此前保存在Master上的非持久化消息也将会丢失。好在有额外的插件可以支持非持久化消息的同步:

1
2
3
4
5
6
<broker>    
<plugins>
<!-- 将所有消息的传输模式,修改为"PERSISTENT" -->
<forcePersistencyModeBrokerPlugin persistenceFlag="true"/>
</plugins>
</broker>

两种细分类别

  • Shared File System master/slaves:基于共享文件系统的master/slaves模式,只能是基于POSIX接口可以访问的文件系统,比如本地文件系统或者SAN分布式共享文件系统。
  • JDBC Store master/slaves:这种模式下,数据存储引擎为database,activeMQ通过JDBC的方式与database交互,排他锁使用database的表级排他锁,其他原理基本上和文件系统是一致的。

  JDBC Store相对于日志文件而言,通常认为是低效的,尽管数据的可见性较好,但是database的扩容能力非常的弱,无法良好的适应在高并发、大数据情况下(严格来说,单组M-S架构是无法支持大数据的),况且ActiveMQ的消息通常存储时间较短,频繁的写入,频繁的删除,都是性能的影响点。
  JJDBC Store有个小小的问题,就是临时文件无法保存在database中,我们不能在<tmpDataStore>使用JDBC Store;所以临时文件还是只能保存在broker本地。(即非持久消息不会保存在database中,只会保存在master上)

Replicated LevelDB Store

概述


  这里提到的模式,是基于复制的LevelDB Store,这是ActiveMQ全力打造的HA存储引擎,也是目前比较符合“Master-slave”架构模型的存储方案,此特性在5.9+版本中支持。“Replicated LevelDB”也同样允许有多个Slaves,而且Slaves的个数有了约束性的限制,这归结于其使用zookeeper作为Broker master选举。

一些特性

  多数派规则: 当Broker启动时,它首先向zookeeper注册自己的信息(brokerName,消息日志的版本戳等),如果此时group中没有其他broker实例,并阻塞初始化过程,等到足够多的broker加入group;当brokers的数量达到“replicas的多数派”时,开始选举,选举将会根据“消息日志的版本戳”、“权重”的大小决定,即“版本戳”越大(数据最新)、权重越高的broker优先成为master,其他broker作为slave并跟随master。当一个broker成为master时,它会向zookeer注册自己的sync地址信息;此后slaves首先根据sync地址与master建立链接,并同步消息文件(download)。当足够多的slave数据同步结束后,master将初始化transportConnector,此后Client将可以与master进行数据交互。
  Master-slaves集群中,所有的broker必须具有相同的brokerName,它作为group域来限定集群的成员,brokerId可以不同,它仅作为描述信息。“replicas”参数非常重要,默认为3,表示消息最多可以备份在几个broker实例上,同是只有当“replicas/2 + 1”个broker存活时(包含master),集群才有效,才会选举master和备份消息,此值必须>=2。Client发送给Master的持久化消息(包括ACK和事务),master首先在本地保存,然后立即同步(sync)给选定的(replicas/2)个slaves,只有当这些节点也同步成功后,此消息的交互才算结束;对于剩下的replicas个节点,master采用异步的方式(async)转发。这种设计要求,可以保证集群中消息的可靠性,只有当(replicas/2 + 1)个节点物理故障,才会有丢失消息的风险。通常replicas为3,这要求开发者需要至少部署3个broker实例。如果replicas过大,会严重影响master的吞吐能力,因为它在sync消息的过程中会消耗太多的时间。

failover协议

为什么要说它?

  在Activemq的高可用架构中,都需要用到主备的可用性检查和平滑切换,实际应用时我们都会用到下面的配置,那么自然而然我就想了解了解这个failover到底是个什么。

1
2
activemq:
broker-url: failover:(tcp://192.168.1.151:61616,tcp://192.168.1.152:61616)?randomize=false

概念剖析

  The Failover transport layers reconnect logic on top of any of the other transports. The configuration syntax allows you to specify any number of composite URIs. The Failover transport randomly chooses one of the composite URIs and attempts to establish a connection to it. If it does not succeed, or if it subsequently fails, a new connection is established choosing one of the other URIs randomly from the list.
  上面这段定义摘自Apache的 http://activemq.apache.org/failover-transport-reference.html ;大致理解呢就是failover是凌驾在普通协议之上的一层逻辑协议,可以支持N个不同的URI每次随机取一个访问,一旦失败可以切换到其他的URI。
  但是通过上面的定义我们又发现一个问题,在Activemq使用时,我们是默认要用Master的,而不是随机的,那怎么办呢?想必大家已经看到了url最后有这么个参数:randomize=false,基本上也能猜个八九不离十这个参数是来控制顺序调度还是随机调度的,事实上也确实是通过这个来控制的。关于failover这个协议的一些其他参数项的具体说明,可以通过前面的链接查看,这里就不一一列举了。

------2019 Lin.C ------