只要注册ofo就送你10块钱,还等什么,快来注册吧
Running Broker
直接运行bin/activemq脚本可以启动一个broker。
此外也可以通过Broker Configuration URI或Broker XBean URI对broker进行配置,以下是一些命令行参数的例子:
activemq
Runs a broker using the default ‘xbean:activemq.xml
‘ as the broker configuration file.activemq xbean:myconfig.xml
Runs a broker using the filemyconfig.xml
as the broker configuration file that is located in the classpath.activemq xbean:file:./conf/broker1.xml
Runs a broker using the filebroker1.xml
as the broker configuration file that is located in the relative file path./conf/broker1.xml
activemq xbean:file:C:/ActiveMQ/conf/broker2.xml
Runs a broker using the filebroker2.xml
as the broker configuration file that is located in the absolute file pathC:/ActiveMQ/conf/broker2.xml
activemq broker:(tcp://localhost:61616, tcp://localhost:5000)?useJmx=true
Runs a broker with two transport connectors and JMX enabled.activemq broker:(tcp://localhost:61616, network:tcp://localhost:5000)?persistent=false
Runs a broker with 1 transport connector and 1 network connector with persistence disabled.
Embedded Broker
可以通过在应用程序中以编码的方式启动broker,例如:
|
|
如果需要启动多个broker,那么需要为broker设置一个名字。例如:
|
|
如果希望在同一个JVM内访问这个broker,那么可以使用VM Transport,URI是:vm://brokerName
。
可以通过BrokerFactory
来创建broker,例如:
|
|
someURI
的可选值如下:
类型 | 示例 | 描述 |
---|---|---|
xbean: |
xbean:activemq.xml |
Searches the classpath for an XML document with the given URI (activemq.xml in this case) which will then be used as the Xml Configuration |
file: |
file:foo/bar/activemq.xml |
Loads the given file (in this example foo/bar/activemq.xml ) as the Xml Configuration |
broker: |
broker:tcp://localhost:61616 |
Uses the Broker Configuration URI to configure the broker |
当使用XBean
的配置方式的时候,需要指定一个xml配置文件,例如:
使用Spring的配置方式如下:
Monitoring Broker
JMX
在使用JMX监控broker之前,首先要启用broker的JMX监控功能,例如在配置文件中设置useJmx="true"
,如下:
|
|
接下来运行JDK自带的jconsole。在运行了jconsole后,它会弹出对话框来选择需要连接到的agent
。如果是在启动broker的主机上运行jconsole,那么ActiveMQ broker会出现在jconsole的Local标签中。如果要连接到远程的broker,那么可以在Advanced标签中指定JMX URL,以下是一个连接到本机的JMX URL:service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
在jconsole的MBeans标签中,可以查看详细信息,也可以执行相应的operation。需要注意的是,在jconsole连接到broker的时候,并不需要输入用户名和密码,如果这存在潜在的安全问题,那么就需要为JMX Connector配置密码保护(需要使用1.5以上版本的JDK)。
首先要禁止ActiveMQ创建自己的connector,例如:
|
|
然后在ActiveMQ的conf
目录下创建一个访问控制文件和密码文件,如下:
|
|
|
|
然后修改ActiveMQ的bin目录下activemq的启动脚本,查找包含”SUNJMX=
“的一行如下:
|
|
把它替换成
|
|
最后重启ActiveMQ和jconsole,这时候需要强制login。如果在启动activemq的过程中出现以下错误,那么需要为这个文件增加访问控制。
Web Console
Web Console被集成到了ActiveMQ的二进制发布包中,因此缺省访问http://localhost:8161/admin
即可访问Web Console。
在配置文件中,可以通过修改nioConnector
的port
属性来修改Web console的缺省端口:
|
|
出于安全性或者可靠性的考虑,Web Console可以被部署到不同于ActiveMQ的进程中。例如把activemq-web-console.war
部署到一个单独的web容器中(Tomcat,Jetty等)。在ActiveMQ5.0的二进制发布包中不包含activemq-web-console.war
,因此需要下载 ActiveMQ的源码,然后进入到${activemq.base}/src/activemq-web-console
目录中执行mvn instanll
。如果一切正常,那么缺省会在${activemq.base}/src/activemq-web-console/target
目录中生成activemq-web-console-5.0.0.war
。然后将activemq-web-console-5.0.0.war
拷贝到Tomcat的webapps
目录中,并重命名成activemq-web-console.war
。
需要注意的是,要将activemq-all-5.0.0.jar
拷贝到WEB-INF\lib
目录中(可能还需要拷贝jms.jar
)。还要为Tomcat设置以下五个系统属性(修改catalina.bat
文件):
如果JMX没有配置密码保护,那么webconsole.jmx.role
和webconsole.jmx.password
设置成""
即可。如果broker被配置成了Master/Slave模式,那么可以配置成使用failover transport,例如:
-Dwebconsole.jms.url=failover:(tcp://serverA:61616,tcp://serverB:61616)
顺便说一下,由于webconsole.type
属性是properties
,因此实际上起作用的Web Console的配置文件是WEB-INF/webconsole-properties.xml
。最后启动被监控的ActiveMQ,访问http://localhost:8080 /activemq-web-console/
,查看显示是否正常。
Advisory Message
ActiveMQ支持Advisory Messages,它允许你通过标准的JMS消息来监控系统。目前的Advisory Messages支持:
• consumers, producers and connections starting and stopping
• temporary destinations being created and destroyed
• messages expiring on topics and queues
• brokers sending messages to destinations with no consumers.
• connections starting and stopping
Advisory Messages可以被想象成某种的管理通道,通过它你可以得到关于JMS provider、producers、consumers和destinations的信息。Advisory topics都使用ActiveMQ.Advisory.
这个前缀,以下是目前支持的topics:
Client based advisories
Advisory Topics Description
ActiveMQ.Advisory.Connection Connection start & stop messages
ActiveMQ.Advisory.Producer.Queue Producer start & stop messages on a Queue
ActiveMQ.Advisory.Producer.Topic Producer start & stop messages on a Topic
ActiveMQ.Advisory.Consumer.Queue Consumer start & stop messages on a Queue
ActiveMQ.Advisory.Consumer.Topic Consumer start & stop messages on a Topic
在消费者启动/停止的Advisory Messages的消息头中有个consumerCount
属性,它用来指明目前desination上活跃的consumer的数量。
Destination and Message based advisories
Advisory Topics Description
ActiveMQ.Advisory.Queue Queue create & destroy
ActiveMQ.Advisory.Topic Topic create & destroy
ActiveMQ.Advisory.TempQueue Temporary Queue create & destroy
ActiveMQ.Advisory.TempTopic Temporary Topic create & destroy
ActiveMQ.Advisory.Expired.Queue Expired messages on a Queue
ActiveMQ.Advisory.Expired.Topic Expired messages on a Topic
ActiveMQ.Advisory.NoConsumer.Queue No consumer is available to process messages being sent on a Queue
ActiveMQ.Advisory.NoConsumer.Topic No consumer is available to process messages being sent on a Topic
以上的这些destnations都可以用来作为前缀,在其后面追加其它的重要信息,例如topic、queue、clientID、 producderID和consumerID等。这令你可以利用Wildcards和Selectors来过滤Advisory Messages。
例如,如果你希望订阅FOO.BAR
这个queue上Consumer的start/stop的消息,那么可以订阅ActiveMQ.Advisory.Consumer.Queue.FOO.BAR
;如果希望订阅所有queue上的start/stop消息,那么可以订阅ActiveMQ.Advisory.Consumer.Queue.
;如果希望订阅所有queue或者topic上的start/stop消息,那么可以订阅ActiveMQ.Advisory.Consumer.
。org.apache.activemq.advisory.AdvisorySupport
类上有如下的helper methods,用来在程序中得到advisory destination objects。
以下是段使用Advisory Messages的程序代码:
Command Agent
在介绍Command Agent前首先简要介绍一下XMPP(Jabber)协议,XMPP是一种基于XML的即时通信协议,它由Jabber软件基金会开发。在配置文件中通过增加transportConnector
来支持XMPP协议:
ActiveMQ提供了ActiveMQ messages和XMPP之间的双向桥接:
• 如果客户加入了一个聊天室,那么这个聊天室的名字会被映射到一个JMS topic。
• 尝试在聊天室内发送消息会导致一个JMS消息被发送到这个topic。
• 呆在一个聊天室中意味着这将保持一个对相应JMS topic的订阅。因此发送到这个topic的JMS消息也会被发送到聊天室。
从4.2版本起,ActiveMQ支持Command Agent
。在配置文件中,通过设置commandAgent
来启用Command Agent:
|
|
启用了Command Agent的broker上会有一个来自Command Agent的连接,它同时订阅topic:ActiveMQ.Agent
。在你启动XMPP客户端,加入到ActiveMQ.Agent
聊天室后,就可以同broker进行交谈了。通过在XMPP客户端中键入help,可以得到帮助信息。
需要注意的是,ActiveMQ5.0版本有个小bug,如果broker没有采用缺省的用户名和密码,那么Command Agent便无法正常启动。Apache官方文档说,此bug已经被修正,预定在5.2.0版本上体现。修改方式如下:
Visualization plugin
ActiveMQ支持以broker插件的形式生成DOT文件(可以用agrviewer来查看),以图表的方式描述connections、sessions、producers、consumers、destinations等信息。配置方式如下:
需要注意的是,笔者认为ActiveMQ5.0版本的Visualization Plugin尚不稳定,存在诸多问题。例如:如果使用connectionDotFilePlugin
,那么brokerName
必须是localhost
;如果使用destinationDotFilePlugin
可能会导致ArrayStoreException
。
Transport
ActiveMQ目前支持的transport有:VM Transport、TCP Transport、SSL Transport、Peer Transport、UDP Transport、Multicast Transport、HTTP and HTTPS Transport、Failover Transport、Fanout Transport、Discovery Transport、ZeroConf Transport等。
VM Transport
VM transport允许在VM内部通信,从而避免了网络传输的开销。这时候采用的连接不是socket连接,而是直接地方法调用。 第一个创建VM连接的客户会启动一个embed VM broker,接下来所有使用相同的broker name的VM连接都会使用这个broker。当这个broker上所有的连接都关闭的时候,这个broker也会自动关闭。
|
|
例如:vm://broker1?marshal=false&broker.persistent=false
Transport Options的可选值如下:
Option Name | Default Value | Description |
---|---|---|
Marshal | false | If true, forces each command sent over the transport to be marshlled and unmarshlled using a WireFormat |
wireFormat | default | The name of the WireFormat to use |
wireFormat. All the properties with this prefix are used to configure the wireFormat |
| create | true | If the broker should be created on demand if it does not allready exist. Only supported in ActiveMQ 4.1 |
| broker. | All | the properties with this prefix are used to configure the broker. See Configuring Wire Formats for more information |
以下是高级配置语法:
例如:vm:(broker:(tcp://localhost:6000)?persistent=false)?marshal=false
使用配置文件的配置语法:vm://localhost?brokerConfig=xbean:activemq.xml
例如:vm:// localhost?brokerConfig=xbean:com/test/activemq.xml
使用Spring的配置:
如果persistent
是true
,那么ActiveMQ会在当前目录下创建一个缺省值是activemq-data
的目录用于持久化保存数据。需要注意的是,如果程序中启动了多个不同名字的VM broker,那么可能会有如下警告:Failed to start jmx connector: Cannot bind to URL [rmi://localhost:1099/jmxrmi]:javax.naming.NameAlreadyBoundException…
可以通过在transportOptions
中追加broker.useJmx=false
来禁用JMX来避免这个警告。
TCP Transport
TCP transport 允许客户端通过TCP socket连接到远程的broker。以下是配置语法:
Transport Options的可选值如下:
Option Name | Default Value | Description |
---|---|---|
minmumWireFormatVersion | 0 | The minimum version wireformat that is allowed |
trace | false | Causes all commands that are sent over the transport to be logged |
useLocalHost | true | When true, it causes the local machines name to resolve to “localhost”. |
socketBufferSize | 64 * 1024 | Sets the socket buffer size in bytes |
soTimeout 0 sets the socket timeout in milliseconds |
| connectionTimeout | 30000 | A non-zero value specifies the connection timeout in milliseconds. A zero value means wait forever for the connection to be established. Negative values are ignored. |
| wireFormat | default | The name of the WireFormat to use wireFormat.* All the properties with this prefix are used to configure the wireFormat. See Configuring Wire Formats for more information |
例如:tcp://localhost:61616?trace=false
Failover Transport
Failover Transport是一种重新连接的机制,它工作于其它transport的上层,用于建立可靠的传输。它的配置语法允许制定任意多个复合的URI。Failover transport会自动选择其中的一个URI来尝试建立连接。如果没有成功,那么会选择一个其它的URI来建立一个新的连接。以下是配置语法:
Transport Options的可选值如下:
Option Name | Default Value | Description |
---|---|---|
initialReconnectDelay | 10 | How long to wait before the first reconnect attempt (in ms) |
maxReconnectDelay | 30000 | The maximum amount of time we ever wait between reconnect attempts (in ms) |
useExponentialBackOff | true | Should an exponential backoff be used between reconnect attempts |
backOffMultiplier | 2 | The exponent used in the exponential backoff attempts |
maxReconnectAttempts | 0 | If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client |
randomize | true | use a random algorithm to choose the URI to use for reconnect from the list provided |
backup | false | initialize and hold a second transport connection - to enable fast failover |
例如:failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100
Discovery transport
Discovery transport是可靠的tranport。它使用Discovery transport来定位用来连接的URI列表。以下是配置语法:
Transport Options的可选值如下:
Option Name | Default Value | Description |
---|---|---|
initialReconnectDelay | 10 | How long to wait before the first reconnect attempt |
maxReconnectDelay | 30000 | The maximum amount of time we ever wait between reconnect attempts |
useExponentialBackOff | true | Should an exponential backoff be used btween reconnect attempts |
backOffMultiplier | 2 | The exponent used in the exponential backoff attempts |
maxReconnectAttempts | 0 | If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client |
例如:discovery:(multicast://default)?initialReconnectDelay=100
为了使用Discovery来发现broker,需要为broker启用discovery agent。 以下是XML配置文件中的一个例子:
在使用Failover Transport或Discovery transport等能够自动重连的transport的时候,需要注意的是:设想有两个broker,它们都启用AMQ Message Store作为持久化存储,有一个producer和一个consumer连接到某个queue。当因其中一个broker失效时而切换到另一个 broker的时候,如果失效的broker的queue中还有未被consumer消费的消息,那么这个queue里的消息仍然滞留在失效broker 的中,直到失效的broker被修复并重新切换回这个被修复的broker后,之前被保留的消息才会被consumer消费掉。如果被处理的消息有时序限制,那么应用程序就需要处理这个问题。另外也可以通过ActiveMQ集群来解决这个问题。
在transport重连的时候,可以在connection上注册TransportListener
来获得回调,例如:
Persistence
AMQ Message Store
AMQ Message Store是ActiveMQ5.0缺省的持久化存储。Message commands被保存到transactional journal(由rolling data logs组成)。Messages被保存到data logs中,同时被reference store进行索引以提高存取速度。Date logs由一些单独的data log文件组成,缺省的文件大小是32M,如果某个消息的大小超过了data log文件的大小,那么可以修改配置以增加data log文件的大小。如果某个data log文件中所有的消息都被成功消费了,那么这个data log文件将会被标记,以便在下一轮的清理中被删除或者归档。以下是其配置的一个例子:
Property name | Default value | Comments |
---|---|---|
directory | activemq-data | the path to the directory to use to store the message store data and log files |
useNIO | true | use NIO to write messages to the data logs |
syncOnWrite | false | sync every write to disk |
maxFileLength | 32mb | a hint to set the maximum size of the message data logs |
persistentIndex | true | use a persistent index for the message logs. If this is false, an in-memory structure is maintained |
maxCheckpointMessageAddSize | 4kb | the maximum number of messages to keep in a transaction before automatically committing |
cleanupInterval | 30000 | time (ms) before checking for a discarding/moving message data logs that are no longer used |
indexBinSize | 1024 | default number of bins used by the index. The bigger the bin size - the better the relative performance of the index |
indexKeySize | 96 | the size of the index key - the key is the message id |
indexPageSize 16kb the size of the index page - the bigger the page - the better the write performance of the index |
| directoryArchive | archive | the path to the directory to use to store discarded data logs |
| archiveDataLogs | false | if true data logs are moved to the archive directory instead of being deleted |
Kaha Persistence
Kaha Persistence 是一个专门针对消息持久化的解决方案。它对典型的消息使用模式进行了优化。在Kaha中,数据被追加到data logs中。当不再需要log文件中的数据的时候,log文件会被丢弃。以下是其配置的一个例子:
JDBC Persistence
目前支持的数据库有Apache Derby, Axion, DB2, HSQL, Informix, MaxDB, MySQL, Oracle, Postgresql, SQLServer, Sybase。
如果你使用的数据库不被支持,那么可以调整StatementProvider
来保证使用正确的SQL方言(flavour of SQL)。通常绝大多数数据库支持以下adaptor:
• org.activemq.store.jdbc.adapter.BlobJDBCAdapter
• org.activemq.store.jdbc.adapter.BytesJDBCAdapter
• org.activemq.store.jdbc.adapter.DefaultJDBCAdapter
• org.activemq.store.jdbc.adapter.ImageJDBCAdapter
也可以在配置文件中直接指定JDBC adaptor,例如:
以下是其配置的一个例子:
<persistence>
<jdbcPersistence dataSourceRef=" mysql-ds"/>
</persistence>
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
需要注意的是,如果使用MySQL,那么需要设置relaxAutoCommit
标志为true
。
Disable Persistence
以下是其配置的一个例子:
|
|
Security
ActiveMQ支持可插拔的安全机制,用以在不同的provider之间切换。
Simple Authentication Plugin
Simple Authentication Plugin适用于简单的认证需求,或者用于建立测试环境。它允许在XML配置文件中指定用户、用户组和密码等信息。以下是ActiveMQ配置的一个例子:
|
|
JAAS Authentication Plugin
JAAS Authentication Plugin依赖标准的JAAS机制来实现认证。通常情况下,你需要通过设置java.security.auth.login.config
系统属性来配置login modules的配置文件。如果没有指定这个系统属性,那么JAAS Authentication Plugin会缺省使用login.config
作为文件名。以下是一个login.config
文件的例子:
这个login.config
文件中设置了两个属性:org.apache.activemq.jaas.properties.user
和org.apache.activemq.jaas.properties.group
分别用来指向user.properties
和group.properties
文件。需要注意的是,PropertiesLoginModule
使用本地文件的查找方式,而且查找时采用的base directory是login.config
文件所在的目录。因此这个login.config
说明user.properties
和group.properties
文件存放在跟login.config
文件相同的目录里。
以下是ActiveMQ配置的一个例子:
<plugins>
...
<jaasAuthenticationPlugin configuration="activemq-domain" />
</plugins>
基于以上的配置,在JAAS的LoginContext
中会使用activemq-domain
中配置的PropertiesLoginModule
来进行登陆。
ActiveMQ JAAS还支持LDAPLoginModule
、CertificateLoginModule
、TextFileCertificateLoginModule
等login module。
Custom Authentication Implementation
可以通过编码的方式为ActiveMQ增加认证功能。例如编写一个类继承自XBeanBrokerService
。
|
|
以下是ActiveMQ配置文件的一个例子:
|
|
在这个配置文件中增加了一个namespace auth,用于指向之前编写的哪个类。同时为SimpleAuthBroker
注入了两个属性值user
和password
,因此在被SimpleAuthBroker
改写的addInterceptors
方法里,可以使用这两个属性进行认证了。ActiveMQ提供的SimpleAuthenticationBroker
类继承自BrokerFilter
可以简单的看成是Broker的Adaptor),它的构造函数中的两个Map
分别是userPasswords
和userGroups
。SimpleAuthenticationBroker
在addConnection
方法中使用userPasswords
进行认证,同时会把userGroups
的信息保存到ConnectionContext
中。
Authorization Plugin
可以通过Authorization Plugin为认证后的用户授权,以下ActiveMQ配置文件的一个例子:
Clustering
ActiveMQ从多种不同的方面提供了集群的支持。
Queue consumer clusters
ActiveMQ支持订阅同一个queue的consumers上的集群。如果一个consumer失效,那么所有未被确认 (unacknowledged)的消息都会被发送到这个queue上其它的consumers。如果某个consumer的处理速度比其它consumers更快,那么这个consumer就会消费更多的消息。
需要注意的是,笔者发现AcitveMQ5.0版本的Queue consumer clusters存在一个bug:采用AMQ Message Store,运行一个producer,两个consumer,并采用如下的配置文件:
那么经过一段时间后可能会报出如下错误:
Apache官方文档说,此bug已经被修正,预定在5.1.0版本上体现。
Broker clusters
一个常见的场景是有多个JMS broker,一个客户连接到其中一个broker。如果这个broker失效,那么客户会自动重新连接到其它的broker。在ActiveMQ中使用failover://
协议来实现这个功能。ActiveMQ3.x版本的reliable://
协议已经变更为failover://
。
如果某个网络上有多个brokers而且客户使用静态发现(使用Static Transport或Failover Transport)或动态发现(使用Discovery Transport),那么客户可以容易地在某个broker失效的情况下切换到其它的brokers。然而,stand alone brokers并不了解其它brokers上的consumers,也就是说如果某个broker上没有consumers,那么这个broker上的消息可能会因得不到处理而积压起来。目前的解决方案是使用Network of brokers,以便在broker之间存储转发消息。ActiveMQ在未来会有更好的特性,用来在客户端处理这个问题。
从ActiveMQ1.1版本起,ActiveMQ支持networks of brokers。它支持分布式的queues和topics。一个broker会相同对待所有的订阅(subscription):不管他们是来自本地的客户连接,还是来自远程broker,它都会递送有关的消息拷贝到每个订阅。远程broker得到这个消息拷贝后,会依次把它递送到其内部的本地连接上。有两种方式配置Network of brokers,一种是使用static transport:
另外一种是使用multicast discovery,如下:
Network Connector有以下属性:
Property | Default Value | Description |
---|---|---|
name | bridge | name of the network - for more than one network connector between the same two brokers - use different names |
dynamicOnly | false | if true, only forward messages if a consumer is active on the connected broker |
decreaseNetworkConsumerPriority | false | decrease the priority for dispatching to a Queue consumer the further away it is (in network hops) from the producer |
networkTTL | 1 | the number of brokers in the network that messages and subscriptions can pass through |
conduitSubscriptions | true | multiple consumers subscribing to the same destination are treated as one consumer by the network |
excludedDestinations | empty | destinations matching this list won’t be forwarded across the network |
dynamicallyIncludedDestinations | empty | destinations that match this list will be forwarded across the network n.b. an empty list means all destinations not in the excluded list will be forwarded |
staticallyIncludedDestinations | empty | destinations that match will always be passed across the network - even if no consumers have ever registered an interest |
duplex | false | if true, a network connection will be used to both produce AND Consume messages. This is useful for hub and spoke scenarios when the hub is behind a firewall etc. |
关于conduitSubscriptions属性,这里稍稍说明一下。设想有两个brokers,分别是brokerA和brokerB,它们之间用 forwarding bridge连接。有一个consumer连接到brokerA并订阅queue:Q.TEST
。有两个consumers连接到brokerB,也是订 阅queue:Q.TEST
。这三个consumers有相同的优先级。然后启动一个producer。
中国人都在使用的地球上最好玩的游戏
中国人都在使用的地球上最好玩的游戏
中国人都在使用的地球上最快的浏览器
中国人都在使用的地球上最厉害的安全软件
中国人都在使用的地球上最好的看图王
中国人都在使用的地球上最快速的视频软件
中国人都在使用的地球上最全的视频软件
中国人都在使用的地球上最好最全的压缩软件
中国人都在使用的地球上最好的音乐播放器
中国人都在使用的地球上最安全的杀毒软件
中国人都在使用的地球上最全的影视大全