ActiveMQ理解

中国人最喜欢访问的网站
只要注册ofo就送你10块钱,还等什么,快来注册吧

Running Broker

直接运行bin/activemq脚本可以启动一个broker。

此外也可以通过Broker Configuration URI或Broker XBean URI对broker进行配置,以下是一些命令行参数的例子:

  • activemq Runs a broker using the default ‘xbean:activemq.xml‘ as the broker configuration file.
  • activemq xbean:myconfig.xml Runs a broker using the file myconfig.xml as the broker configuration file that is located in the classpath.
  • activemq xbean:file:./conf/broker1.xml Runs a broker using the file broker1.xml as the broker configuration file that is located in the relative file path ./conf/broker1.xml
  • activemq xbean:file:C:/ActiveMQ/conf/broker2.xml Runs a broker using the file broker2.xml as the broker configuration file that is located in the absolute file path C:/ActiveMQ/conf/broker2.xml
  • activemq broker:(tcp://localhost:61616, tcp://localhost:5000)?useJmx=true Runs a broker with two transport connectors and JMX enabled.
  • activemq broker:(tcp://localhost:61616, network:tcp://localhost:5000)?persistent=false Runs a broker with 1 transport connector and 1 network connector with persistence disabled.

Embedded Broker

可以通过在应用程序中以编码的方式启动broker,例如:

1
2
3
BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:61616");
broker.start();

如果需要启动多个broker,那么需要为broker设置一个名字。例如:

1
2
3
4
BrokerService broker = new BrokerService();
broker.setName("fred");
broker.addConnector("tcp://localhost:61616");
broker.start();

如果希望在同一个JVM内访问这个broker,那么可以使用VM Transport,URI是:vm://brokerName

可以通过BrokerFactory来创建broker,例如:

1
BrokerService broker = BrokerFactory.createBroker(new URI(someURI));

someURI的可选值如下:

类型 示例 描述
xbean: xbean:activemq.xml Searches the classpath for an XML document with the given URI (activemq.xml in this case) which will then be used as the Xml Configuration
file: file:foo/bar/activemq.xml Loads the given file (in this example foo/bar/activemq.xml) as the Xml Configuration
broker: broker:tcp://localhost:61616 Uses the Broker Configuration URI to configure the broker

当使用XBean的配置方式的时候,需要指定一个xml配置文件,例如:

1
BrokerService broker = BrokerFactory.createBroker(new URI("xbean:com/test/activemq.xml"));

使用Spring的配置方式如下:

1
2
3
4
<bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
<property name="config" value="classpath:org/apache/activemq/xbean/activemq.xml" />
<property name="start" value="true" />
</bean>

Monitoring Broker

JMX

在使用JMX监控broker之前,首先要启用broker的JMX监控功能,例如在配置文件中设置useJmx="true",如下:

1
2
3
4
5
6
<broker useJmx="true" brokerName="broker1>
<managementContext>
<managementContext createConnector="true"/>
</managementContext>
...
</broker>

接下来运行JDK自带的jconsole。在运行了jconsole后,它会弹出对话框来选择需要连接到的agent。如果是在启动broker的主机上运行jconsole,那么ActiveMQ broker会出现在jconsole的Local标签中。如果要连接到远程的broker,那么可以在Advanced标签中指定JMX URL,以下是一个连接到本机的JMX URL:
service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
在jconsole的MBeans标签中,可以查看详细信息,也可以执行相应的operation。需要注意的是,在jconsole连接到broker的时候,并不需要输入用户名和密码,如果这存在潜在的安全问题,那么就需要为JMX Connector配置密码保护(需要使用1.5以上版本的JDK)。

首先要禁止ActiveMQ创建自己的connector,例如:

1
2
3
4
5
<broker xmlns="http://activemq.org/config/1.0" brokerName="localhost"useJmx="true">
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
</broker>

然后在ActiveMQ的conf目录下创建一个访问控制文件和密码文件,如下:

1
2
3
4
5
conf/jmx.access:
# The "monitorRole" role has readonly access.
# The "controlRole" role has readwrite access.
monitorRole readonly
controlRole readwrite
1
2
3
4
5
conf/jmx.password:
# The "monitorRole" role has password "abc123".
# The "controlRole" role has password "abcd1234".
monitorRole abc123
controlRole abcd1234

然后修改ActiveMQ的bin目录下activemq的启动脚本,查找包含”SUNJMX=“的一行如下:

1
REM set SUNJMX=-Dcom.sun.management.jmxremote.port=1616 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false

把它替换成

1
set SUNJMX=-Dcom.sun.management.jmxremote.port=1616 -Dcom.sun.management.jmxremote.authenticate=true -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.password.file=%ACTIVEMQ_BASE%/conf/jmx.password -Dcom.sun.management.jmxremote.access.file=%ACTIVEMQ_BASE%/conf/jmx.access

最后重启ActiveMQ和jconsole,这时候需要强制login。如果在启动activemq的过程中出现以下错误,那么需要为这个文件增加访问控制。

1
Error: Password file read access must be restricted: D:\apache-activemq-5.0.0\bin\../conf/jmx.password

Web Console

Web Console被集成到了ActiveMQ的二进制发布包中,因此缺省访问http://localhost:8161/admin即可访问Web Console。
在配置文件中,可以通过修改nioConnectorport属性来修改Web console的缺省端口:

1
2
3
4
5
6
<jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
<connectors>
<nioConnector port="8161" />
</connectors>
...
</jetty>

出于安全性或者可靠性的考虑,Web Console可以被部署到不同于ActiveMQ的进程中。例如把activemq-web-console.war部署到一个单独的web容器中(Tomcat,Jetty等)。在ActiveMQ5.0的二进制发布包中不包含activemq-web-console.war,因此需要下载 ActiveMQ的源码,然后进入到${activemq.base}/src/activemq-web-console目录中执行mvn instanll。如果一切正常,那么缺省会在${activemq.base}/src/activemq-web-console/target目录中生成activemq-web-console-5.0.0.war。然后将activemq-web-console-5.0.0.war拷贝到Tomcat的webapps目录中,并重命名成activemq-web-console.war
需要注意的是,要将activemq-all-5.0.0.jar拷贝到WEB-INF\lib目录中(可能还需要拷贝jms.jar)。还要为Tomcat设置以下五个系统属性(修改catalina.bat文件):

1
2
3
4
5
set JAVA_OPTS=%JAVA_OPTS% -Dwebconsole.type="properties"
set JAVA_OPTS=%JAVA_OPTS% -Dwebconsole.jms.url="tcp://localhost:61616"
set JAVA_OPTS=%JAVA_OPTS% -Dwebconsole.jmx.url="service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi"
set JAVA_OPTS=%JAVA_OPTS% -Dwebconsole.jmx.role=""
set JAVA_OPTS=%JAVA_OPTS% -Dwebconsole.jmx.password=""

如果JMX没有配置密码保护,那么webconsole.jmx.rolewebconsole.jmx.password设置成""即可。如果broker被配置成了Master/Slave模式,那么可以配置成使用failover transport,例如:

-Dwebconsole.jms.url=failover:(tcp://serverA:61616,tcp://serverB:61616)

顺便说一下,由于webconsole.type属性是properties,因此实际上起作用的Web Console的配置文件是WEB-INF/webconsole-properties.xml。最后启动被监控的ActiveMQ,访问http://localhost:8080 /activemq-web-console/,查看显示是否正常。

Advisory Message

ActiveMQ支持Advisory Messages,它允许你通过标准的JMS消息来监控系统。目前的Advisory Messages支持:
• consumers, producers and connections starting and stopping
• temporary destinations being created and destroyed
• messages expiring on topics and queues
• brokers sending messages to destinations with no consumers.
• connections starting and stopping

Advisory Messages可以被想象成某种的管理通道,通过它你可以得到关于JMS provider、producers、consumers和destinations的信息。Advisory topics都使用ActiveMQ.Advisory.这个前缀,以下是目前支持的topics:

Client based advisories
Advisory Topics Description
ActiveMQ.Advisory.Connection Connection start & stop messages
ActiveMQ.Advisory.Producer.Queue Producer start & stop messages on a Queue
ActiveMQ.Advisory.Producer.Topic Producer start & stop messages on a Topic
ActiveMQ.Advisory.Consumer.Queue Consumer start & stop messages on a Queue
ActiveMQ.Advisory.Consumer.Topic Consumer start & stop messages on a Topic

在消费者启动/停止的Advisory Messages的消息头中有个consumerCount属性,它用来指明目前desination上活跃的consumer的数量。
Destination and Message based advisories
Advisory Topics Description
ActiveMQ.Advisory.Queue Queue create & destroy
ActiveMQ.Advisory.Topic Topic create & destroy
ActiveMQ.Advisory.TempQueue Temporary Queue create & destroy
ActiveMQ.Advisory.TempTopic Temporary Topic create & destroy
ActiveMQ.Advisory.Expired.Queue Expired messages on a Queue
ActiveMQ.Advisory.Expired.Topic Expired messages on a Topic
ActiveMQ.Advisory.NoConsumer.Queue No consumer is available to process messages being sent on a Queue
ActiveMQ.Advisory.NoConsumer.Topic No consumer is available to process messages being sent on a Topic

以上的这些destnations都可以用来作为前缀,在其后面追加其它的重要信息,例如topic、queue、clientID、 producderID和consumerID等。这令你可以利用Wildcards和Selectors来过滤Advisory Messages。
例如,如果你希望订阅FOO.BAR这个queue上Consumer的start/stop的消息,那么可以订阅ActiveMQ.Advisory.Consumer.Queue.FOO.BAR;如果希望订阅所有queue上的start/stop消息,那么可以订阅ActiveMQ.Advisory.Consumer.Queue.;如果希望订阅所有queue或者topic上的start/stop消息,那么可以订阅ActiveMQ.Advisory.Consumer.
org.apache.activemq.advisory.AdvisorySupport类上有如下的helper methods,用来在程序中得到advisory destination objects。

1
2
3
4
5
6
7
AdvisorySupport.getConsumerAdvisoryTopic()
AdvisorySupport.getProducerAdvisoryTopic()
AdvisorySupport.getDestinationAdvisoryTopic()
AdvisorySupport.getExpiredTopicMessageAdvisoryTopic()
AdvisorySupport.getExpiredQueueMessageAdvisoryTopic()
AdvisorySupport.getNoTopicConsumersAdvisoryTopic()
AdvisorySupport.getNoQueueConsumersAdvisoryTopic()

以下是段使用Advisory Messages的程序代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Destination advisoryDestination = AdvisorySupport.getProducerAdvisoryTopic(destination)
MessageConsumer consumer = session.createConsumer(advisoryDestination);
consumer.setMessageListener(this);
...
public void onMessage(Message msg){
if (msg instanceof ActiveMQMessage){
try {
ActiveMQMessage aMsg = (ActiveMQMessage)msg;
ProducerInfo prod = (ProducerInfo) aMsg.getDataStructure();
} catch (JMSException e) {
log.error("Failed to process message: " + msg);
}
}
}

Command Agent

在介绍Command Agent前首先简要介绍一下XMPP(Jabber)协议,XMPP是一种基于XML的即时通信协议,它由Jabber软件基金会开发。在配置文件中通过增加transportConnector来支持XMPP协议:

1
2
3
4
5
6
<broker xmlns="http://activemq.org/config/1.0">
<transportConnectors>
...
<transportConnector name="xmpp" uri="xmpp://localhost:61222"/>
</transportConnectors>
</broker>

ActiveMQ提供了ActiveMQ messages和XMPP之间的双向桥接:
• 如果客户加入了一个聊天室,那么这个聊天室的名字会被映射到一个JMS topic。
• 尝试在聊天室内发送消息会导致一个JMS消息被发送到这个topic。
• 呆在一个聊天室中意味着这将保持一个对相应JMS topic的订阅。因此发送到这个topic的JMS消息也会被发送到聊天室。

从4.2版本起,ActiveMQ支持Command Agent。在配置文件中,通过设置commandAgent来启用Command Agent:

1
2
3
4
5
6
<beans>
<broker useJmx="true" xmlns="http://activemq.org/config/1.0">
...
</broker>
<commandAgent xmlns="http://activemq.org/config/1.0"/>
</beans>

启用了Command Agent的broker上会有一个来自Command Agent的连接,它同时订阅topic:ActiveMQ.Agent。在你启动XMPP客户端,加入到ActiveMQ.Agent聊天室后,就可以同broker进行交谈了。通过在XMPP客户端中键入help,可以得到帮助信息。
需要注意的是,ActiveMQ5.0版本有个小bug,如果broker没有采用缺省的用户名和密码,那么Command Agent便无法正常启动。Apache官方文档说,此bug已经被修正,预定在5.2.0版本上体现。修改方式如下:

1
<commandAgent xmlns="http://activemq.org/config/1.0" brokerUser="user" brokerPassword="passward"/>

Visualization plugin

ActiveMQ支持以broker插件的形式生成DOT文件(可以用agrviewer来查看),以图表的方式描述connections、sessions、producers、consumers、destinations等信息。配置方式如下:

1
2
3
4
5
6
7
<broker xmlns="http://activemq.org/config/1.0" brokerName="localhost" useJmx="true">
...
<plugins>
<connectionDotFilePlugin file="connection.dot"/>
<destinationDotFilePlugin file="destination.dot"/>
</plugins>
</broker>

需要注意的是,笔者认为ActiveMQ5.0版本的Visualization Plugin尚不稳定,存在诸多问题。例如:如果使用connectionDotFilePlugin,那么brokerName必须是localhost;如果使用destinationDotFilePlugin可能会导致ArrayStoreException

Transport

ActiveMQ目前支持的transport有:VM Transport、TCP Transport、SSL Transport、Peer Transport、UDP Transport、Multicast Transport、HTTP and HTTPS Transport、Failover Transport、Fanout Transport、Discovery Transport、ZeroConf Transport等。

VM Transport

VM transport允许在VM内部通信,从而避免了网络传输的开销。这时候采用的连接不是socket连接,而是直接地方法调用。 第一个创建VM连接的客户会启动一个embed VM broker,接下来所有使用相同的broker name的VM连接都会使用这个broker。当这个broker上所有的连接都关闭的时候,这个broker也会自动关闭。

1
vm://brokerName?transportOptions

例如:vm://broker1?marshal=false&broker.persistent=false
Transport Options的可选值如下:

Option Name Default Value Description
Marshal false If true, forces each command sent over the transport to be marshlled and unmarshlled using a WireFormat
wireFormat default The name of the WireFormat to use

wireFormat. All the properties with this prefix are used to configure the wireFormat |
| create | true | If the broker should be created on demand if it does not allready exist. Only supported in ActiveMQ 4.1 |
| broker.
| All | the properties with this prefix are used to configure the broker. See Configuring Wire Formats for more information |

以下是高级配置语法:

1
2
vm:(broker:(tcp://localhost)?brokerOptions)?transportOptions
vm:broker:(tcp://localhost)?brokerOptions

例如:vm:(broker:(tcp://localhost:6000)?persistent=false)?marshal=false

使用配置文件的配置语法:
vm://localhost?brokerConfig=xbean:activemq.xml
例如:vm:// localhost?brokerConfig=xbean:com/test/activemq.xml

使用Spring的配置:

1
2
3
4
5
6
7
8
<bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
<property name="config" value="classpath:org/apache/activemq/xbean/activemq.xml" />
<property name="start" value="true" />
</bean>
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" depends-on="broker">
<property name="brokerURL" value="vm://localhost"/>
</bean>

如果persistenttrue,那么ActiveMQ会在当前目录下创建一个缺省值是activemq-data的目录用于持久化保存数据。需要注意的是,如果程序中启动了多个不同名字的VM broker,那么可能会有如下警告:Failed to start jmx connector: Cannot bind to URL [rmi://localhost:1099/jmxrmi]:javax.naming.NameAlreadyBoundException…可以通过在transportOptions中追加broker.useJmx=false来禁用JMX来避免这个警告。

TCP Transport

TCP transport 允许客户端通过TCP socket连接到远程的broker。以下是配置语法:

1
tcp://hostname:port?transportOptions

Transport Options的可选值如下:

Option Name Default Value Description
minmumWireFormatVersion 0 The minimum version wireformat that is allowed
trace false Causes all commands that are sent over the transport to be logged
useLocalHost true When true, it causes the local machines name to resolve to “localhost”.
socketBufferSize 64 * 1024 Sets the socket buffer size in bytes

soTimeout 0 sets the socket timeout in milliseconds |
| connectionTimeout | 30000 | A non-zero value specifies the connection timeout in milliseconds. A zero value means wait forever for the connection to be established. Negative values are ignored. |
| wireFormat | default | The name of the WireFormat to use wireFormat.* All the properties with this prefix are used to configure the wireFormat. See Configuring Wire Formats for more information |

例如:tcp://localhost:61616?trace=false

Failover Transport

Failover Transport是一种重新连接的机制,它工作于其它transport的上层,用于建立可靠的传输。它的配置语法允许制定任意多个复合的URI。Failover transport会自动选择其中的一个URI来尝试建立连接。如果没有成功,那么会选择一个其它的URI来建立一个新的连接。以下是配置语法:

1
2
failover:(uri1,...,uriN)?transportOptions
failover:uri1,...,uriN

Transport Options的可选值如下:

Option Name Default Value Description
initialReconnectDelay 10 How long to wait before the first reconnect attempt (in ms)
maxReconnectDelay 30000 The maximum amount of time we ever wait between reconnect attempts (in ms)
useExponentialBackOff true Should an exponential backoff be used between reconnect attempts
backOffMultiplier 2 The exponent used in the exponential backoff attempts
maxReconnectAttempts 0 If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client
randomize true use a random algorithm to choose the URI to use for reconnect from the list provided
backup false initialize and hold a second transport connection - to enable fast failover

例如:failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100

Discovery transport

Discovery transport是可靠的tranport。它使用Discovery transport来定位用来连接的URI列表。以下是配置语法:

1
2
discovery:(discoveryAgentURI)?transportOptions
discovery:discoveryAgentURI

Transport Options的可选值如下:

Option Name Default Value Description
initialReconnectDelay 10 How long to wait before the first reconnect attempt
maxReconnectDelay 30000 The maximum amount of time we ever wait between reconnect attempts
useExponentialBackOff true Should an exponential backoff be used btween reconnect attempts
backOffMultiplier 2 The exponent used in the exponential backoff attempts
maxReconnectAttempts 0 If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client

例如:discovery:(multicast://default)?initialReconnectDelay=100
为了使用Discovery来发现broker,需要为broker启用discovery agent。 以下是XML配置文件中的一个例子:

1
2
3
4
5
6
<broker name="foo">
<transportConnectors>
<transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>
</transportConnectors>
...
</broker>

在使用Failover Transport或Discovery transport等能够自动重连的transport的时候,需要注意的是:设想有两个broker,它们都启用AMQ Message Store作为持久化存储,有一个producer和一个consumer连接到某个queue。当因其中一个broker失效时而切换到另一个 broker的时候,如果失效的broker的queue中还有未被consumer消费的消息,那么这个queue里的消息仍然滞留在失效broker 的中,直到失效的broker被修复并重新切换回这个被修复的broker后,之前被保留的消息才会被consumer消费掉。如果被处理的消息有时序限制,那么应用程序就需要处理这个问题。另外也可以通过ActiveMQ集群来解决这个问题。
在transport重连的时候,可以在connection上注册TransportListener来获得回调,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
(ActiveMQConnection)connection).addTransportListener(new TransportListener() {
public void onCommand(Object cmd) {
}
public void onException(IOException exp) {
}
public void transportInterupted() {
// The transport has suffered an interruption from which it hopes to recover.
}
public void transportResumed() {
// The transport has resumed after an interruption.
}
});

Persistence

AMQ Message Store

AMQ Message Store是ActiveMQ5.0缺省的持久化存储。Message commands被保存到transactional journal(由rolling data logs组成)。Messages被保存到data logs中,同时被reference store进行索引以提高存取速度。Date logs由一些单独的data log文件组成,缺省的文件大小是32M,如果某个消息的大小超过了data log文件的大小,那么可以修改配置以增加data log文件的大小。如果某个data log文件中所有的消息都被成功消费了,那么这个data log文件将会被标记,以便在下一轮的清理中被删除或者归档。以下是其配置的一个例子:

1
2
3
4
5
<broker brokerName="broker" persistent="true" useShutdownHook="false">
<persistenceAdapter>
<amqPersistenceAdapter directory="${activemq.base}/data" maxFileLength="32mb"/>
</persistenceAdapter>
</broker>

Property name Default value Comments
directory activemq-data the path to the directory to use to store the message store data and log files
useNIO true use NIO to write messages to the data logs
syncOnWrite false sync every write to disk
maxFileLength 32mb a hint to set the maximum size of the message data logs
persistentIndex true use a persistent index for the message logs. If this is false, an in-memory structure is maintained
maxCheckpointMessageAddSize 4kb the maximum number of messages to keep in a transaction before automatically committing
cleanupInterval 30000 time (ms) before checking for a discarding/moving message data logs that are no longer used
indexBinSize 1024 default number of bins used by the index. The bigger the bin size - the better the relative performance of the index
indexKeySize 96 the size of the index key - the key is the message id

indexPageSize 16kb the size of the index page - the bigger the page - the better the write performance of the index |
| directoryArchive | archive | the path to the directory to use to store discarded data logs |
| archiveDataLogs | false | if true data logs are moved to the archive directory instead of being deleted |

Kaha Persistence

Kaha Persistence 是一个专门针对消息持久化的解决方案。它对典型的消息使用模式进行了优化。在Kaha中,数据被追加到data logs中。当不再需要log文件中的数据的时候,log文件会被丢弃。以下是其配置的一个例子:

1
2
3
4
5
<broker brokerName="broker" persistent="true" useShutdownHook="false">
<persistenceAdapter>
<kahaPersistenceAdapter directory="activemq-data" maxDataFileLength="33554432"/>
</persistenceAdapter>
</broker>

JDBC Persistence

目前支持的数据库有Apache Derby, Axion, DB2, HSQL, Informix, MaxDB, MySQL, Oracle, Postgresql, SQLServer, Sybase。
如果你使用的数据库不被支持,那么可以调整StatementProvider来保证使用正确的SQL方言(flavour of SQL)。通常绝大多数数据库支持以下adaptor:
• org.activemq.store.jdbc.adapter.BlobJDBCAdapter
• org.activemq.store.jdbc.adapter.BytesJDBCAdapter
• org.activemq.store.jdbc.adapter.DefaultJDBCAdapter
• org.activemq.store.jdbc.adapter.ImageJDBCAdapter

也可以在配置文件中直接指定JDBC adaptor,例如:

1
<jdbcPersistenceAdapter adapterClass="org.apache.activemq.store.jdbc.adapter.ImageBasedJDBCAdaptor"/>

以下是其配置的一个例子:

<persistence> 
     <jdbcPersistence dataSourceRef=" mysql-ds"/> 
</persistence> 

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> 
     <property name="driverClassName" value="com.mysql.jdbc.Driver"/> 
     <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> 
     <property name="username" value="activemq"/> 
     <property name="password" value="activemq"/> 
     <property name="poolPreparedStatements" value="true"/> 
</bean> 

需要注意的是,如果使用MySQL,那么需要设置relaxAutoCommit标志为true

Disable Persistence

以下是其配置的一个例子:

1
<broker persistent="false"></broker>

Security

ActiveMQ支持可插拔的安全机制,用以在不同的provider之间切换。

Simple Authentication Plugin

Simple Authentication Plugin适用于简单的认证需求,或者用于建立测试环境。它允许在XML配置文件中指定用户、用户组和密码等信息。以下是ActiveMQ配置的一个例子:

1
2
3
4
5
6
7
8
9
10
<plugins>
...
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="system" password="manager" groups="users,admins"/>
<authenticationUser username="user" password="password" groups="users"/>
<authenticationUser username="guest" password="password" groups="guests"/>
</users>
</simpleAuthenticationPlugin>
</plugins>

JAAS Authentication Plugin

JAAS Authentication Plugin依赖标准的JAAS机制来实现认证。通常情况下,你需要通过设置java.security.auth.login.config系统属性来配置login modules的配置文件。如果没有指定这个系统属性,那么JAAS Authentication Plugin会缺省使用login.config作为文件名。以下是一个login.config文件的例子:

1
2
3
activemq-domain {
org.apache.activemq.jaas.PropertiesLoginModule required debug=true org.apache.activemq.jaas.properties.user="users.properties" org.apache.activemq.jaas.properties.group="groups.properties";
};

这个login.config文件中设置了两个属性:org.apache.activemq.jaas.properties.userorg.apache.activemq.jaas.properties.group分别用来指向user.propertiesgroup.properties文件。需要注意的是,PropertiesLoginModule使用本地文件的查找方式,而且查找时采用的base directory是login.config文件所在的目录。因此这个login.config说明user.propertiesgroup.properties文件存放在跟login.config文件相同的目录里。
以下是ActiveMQ配置的一个例子:

<plugins> 
        ... 
        <jaasAuthenticationPlugin configuration="activemq-domain" /> 
</plugins> 

基于以上的配置,在JAAS的LoginContext中会使用activemq-domain中配置的PropertiesLoginModule来进行登陆。
ActiveMQ JAAS还支持LDAPLoginModuleCertificateLoginModuleTextFileCertificateLoginModule等login module。

Custom Authentication Implementation

可以通过编码的方式为ActiveMQ增加认证功能。例如编写一个类继承自XBeanBrokerService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.yourpackage;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.security.SimpleAuthenticationBroker;
import org.apache.activemq.xbean.XBeanBrokerService;
public class SimpleAuthBroker extends XBeanBrokerService {
//
private String user;
private String password;
@SuppressWarnings("unchecked")
protected Broker addInterceptors(Broker broker) throws Exception {
broker = super.addInterceptors(broker);
Map passwords = new HashMap();
passwords.put(getUser(), getPassword());
broker = new SimpleAuthenticationBroker(broker, passwords, new HashMap());
return broker;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
}

以下是ActiveMQ配置文件的一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
<beans>
<auth:SimpleAuthBroker
xmlns:auth="java://com.yourpackage"
xmlns="http://activemq.org/config/1.0" brokerName="SimpleAuthBroker1" user="user" password="password" useJmx="true">
<transportConnectors>
<transportConnector uri="tcp://localhost:61616"/>
</transportConnectors>
</auth:SimpleAuthBroker>
</beans>

在这个配置文件中增加了一个namespace auth,用于指向之前编写的哪个类。同时为SimpleAuthBroker注入了两个属性值userpassword,因此在被SimpleAuthBroker改写的addInterceptors方法里,可以使用这两个属性进行认证了。ActiveMQ提供的SimpleAuthenticationBroker类继承自BrokerFilter可以简单的看成是Broker的Adaptor),它的构造函数中的两个Map分别是userPasswordsuserGroups
SimpleAuthenticationBrokeraddConnection方法中使用userPasswords进行认证,同时会把userGroups的信息保存到ConnectionContext中。

Authorization Plugin

可以通过Authorization Plugin为认证后的用户授权,以下ActiveMQ配置文件的一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<plugins>
<jaasAuthenticationPlugin configuration="activemq-domain"/>
<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
<authorizationEntry queue="USERS.>" read="users" write="users" admin="users" />
<authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
<authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
<authorizationEntry topic="USERS.>" read="users" write="users" admin="users" />
<authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
<authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users"/>
</authorizationEntries>
</authorizationMap>
</map>
</authorizationPlugin>
</plugins>

Clustering

ActiveMQ从多种不同的方面提供了集群的支持。

Queue consumer clusters

ActiveMQ支持订阅同一个queue的consumers上的集群。如果一个consumer失效,那么所有未被确认 (unacknowledged)的消息都会被发送到这个queue上其它的consumers。如果某个consumer的处理速度比其它consumers更快,那么这个consumer就会消费更多的消息。
需要注意的是,笔者发现AcitveMQ5.0版本的Queue consumer clusters存在一个bug:采用AMQ Message Store,运行一个producer,两个consumer,并采用如下的配置文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
<beans>
<broker xmlns="http://activemq.org/config/1.0" brokerName="BugBroker1" useJmx="true">
<transportConnectors>
<transportConnector uri="tcp://localhost:61616"/>
</transportConnectors>
<persistenceAdapter>
<amqPersistenceAdapter directory="activemq-data/BugBroker1" maxFileLength="32mb"/>
</persistenceAdapter>
</broker>
</beans>

那么经过一段时间后可能会报出如下错误:

1
ERROR [ActiveMQ Transport: tcp:///127.0.0.1:1843 - RecoveryListenerAdapter.java:58 - RecoveryListenerAdapter] Message id ID:versus-1837-1203915536609-0:2:1:1:419 could not be recovered from the data store!

Apache官方文档说,此bug已经被修正,预定在5.1.0版本上体现。

Broker clusters

一个常见的场景是有多个JMS broker,一个客户连接到其中一个broker。如果这个broker失效,那么客户会自动重新连接到其它的broker。在ActiveMQ中使用failover:// 协议来实现这个功能。ActiveMQ3.x版本的reliable://协议已经变更为failover://
如果某个网络上有多个brokers而且客户使用静态发现(使用Static Transport或Failover Transport)或动态发现(使用Discovery Transport),那么客户可以容易地在某个broker失效的情况下切换到其它的brokers。然而,stand alone brokers并不了解其它brokers上的consumers,也就是说如果某个broker上没有consumers,那么这个broker上的消息可能会因得不到处理而积压起来。目前的解决方案是使用Network of brokers,以便在broker之间存储转发消息。ActiveMQ在未来会有更好的特性,用来在客户端处理这个问题。
从ActiveMQ1.1版本起,ActiveMQ支持networks of brokers。它支持分布式的queues和topics。一个broker会相同对待所有的订阅(subscription):不管他们是来自本地的客户连接,还是来自远程broker,它都会递送有关的消息拷贝到每个订阅。远程broker得到这个消息拷贝后,会依次把它递送到其内部的本地连接上。有两种方式配置Network of brokers,一种是使用static transport:

1
2
3
4
5
6
7
8
9
10
<broker brokerName="receiver" persistent="false" useJmx="false">
<transportConnectors>
<transportConnector uri="tcp://localhost:62002"/>
</transportConnectors>
<networkConnectors>
<networkConnector uri="static:( tcp://localhost:61616,tcp://remotehost:61616)"/>
</networkConnectors>
</broker>

另外一种是使用multicast discovery,如下:

1
2
3
4
5
6
7
8
9
<broker name="sender" persistent="false" useJmx="false">
<transportConnectors>
<transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>
</transportConnectors>
<networkConnectors>
<networkConnector uri="multicast://default"/>
</networkConnectors>
...
</broker>

Network Connector有以下属性:

Property Default Value Description
name bridge name of the network - for more than one network connector between the same two brokers - use different names
dynamicOnly false if true, only forward messages if a consumer is active on the connected broker
decreaseNetworkConsumerPriority false decrease the priority for dispatching to a Queue consumer the further away it is (in network hops) from the producer
networkTTL 1 the number of brokers in the network that messages and subscriptions can pass through
conduitSubscriptions true multiple consumers subscribing to the same destination are treated as one consumer by the network
excludedDestinations empty destinations matching this list won’t be forwarded across the network
dynamicallyIncludedDestinations empty destinations that match this list will be forwarded across the network n.b. an empty list means all destinations not in the excluded list will be forwarded
staticallyIncludedDestinations empty destinations that match will always be passed across the network - even if no consumers have ever registered an interest
duplex false if true, a network connection will be used to both produce AND Consume messages. This is useful for hub and spoke scenarios when the hub is behind a firewall etc.

关于conduitSubscriptions属性,这里稍稍说明一下。设想有两个brokers,分别是brokerA和brokerB,它们之间用 forwarding bridge连接。有一个consumer连接到brokerA并订阅queue:Q.TEST。有两个consumers连接到brokerB,也是订 阅queue:Q.TEST。这三个consumers有相同的优先级。然后启动一个producer。

快下载安装吧,今天头条送你钱啦!!!!
中国人都在使用的地球上最好玩的游戏
中国人都在使用的地球上最好玩的游戏
中国人都在使用的地球上最快的浏览器
中国人都在使用的地球上最厉害的安全软件
中国人都在使用的地球上最好的看图王
中国人都在使用的地球上最快速的视频软件
中国人都在使用的地球上最全的视频软件
中国人都在使用的地球上最好最全的压缩软件
中国人都在使用的地球上最好的音乐播放器
中国人都在使用的地球上最安全的杀毒软件
中国人都在使用的地球上最全的影视大全