@boothsun
2017-09-21T10:28:03.000000Z
字数 9312
阅读 2733
消息队列
当你想把同一个消息一次发送到多个消息队列,那么可以在客户端使用组合队列(将多个消息destination采用英文逗号分隔)。
Queue queue = new ActiveMQQueue("FOO.A,FOO.B,FOO.C");
producer.send(queue, someMessage);
如果你想发送到不同类型的destination,那么需要加上前缀queue://或topic://
Queue queue = new ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");
还有一种更透明的方式是在broker端配置重订向的复合目的地,使客户端发送到单个目的地的消息将被透明地复制到多个物理目的地。
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<compositeQueue name="MY.QUEUE">
<forwardTo>
<queue physicalName="FOO" />
<topic physicalName="BAR" />
</forwardTo>
</compositeQueue>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
你还可以在转发前,先通过JMS Selector判断一个消息是否需要被转发,例如:
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<compositeQueue name="MY.QUEUE">
<forwardTo>
<filteredDestination selector="odd = 'yes'" queue="FOO"/>
<filteredDestination selector="i = 5" topic="BAR"/>
</forwardTo>
</compositeQueue>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
虽然ActiveMQ可以在使用的时候自动创建队列,但有些情况下配置启动时创建还是很有必要的:例如配置了安全设置以后,使用队列的用户没有创建的权限。
此时只需要在配置文件的broker节点添加需要启动时创建的队列即可:
<broker xmlns="http://activemq.apache.org/schema/core">
<destinations>
<queue physicalName="FOO.BAR" />
<topic physicalName="SOME.TOPIC" />
</destinations>
</broker>
在ActiveMQ的destionations不再使用之后,可以通过web控制台或者JMX方式来删除掉;当然,也可以通过配置,使得broker可以自动探测到无用的队列并删除掉,回收响应的资源。
example:
<broker xmlns="http://activemq.apache.org/schema/core" schedulePeriodForDestinationPurge="10000">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="30000"/>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
运行log:
destination选项是给consumer在JMS规范之后添加的功能特性,通过在队列名称后面使用类似URL的语法添加多个选项。
下面是我个人的翻译:
从ActiveMQ 4.x起,Broker可以在Connection URI、Connection And ConnectionFactory级别上配置同步或者异步分发消息给消费者,而之前只能在transport Server上配置。这样的好处是对每个消费者变得更可定制化。
在Consumer级别上配置消息异步分发,可以让在异步发送消息给慢消费者的同时,再同步发送消息给消费能力快的消费者(这样做的好处是可以降低SEDA队列的同步和上下文切换成本; SEDA: 详见Wiki 、Staged Event-Driven Architecture) 使用同步消息传输的缺点是如果消费者存在慢消费者,生产者可以出现消息堆积造成的阻塞。
默认配置(dispatchAsync=true)已经是最佳性能的了。如果你想更好的处理慢消费者问题,你可以使用Consumer级别上的此项配置。如果你想要更好的吞吐量并且出现慢消费的可能性比较低,你就可以改成dispatchAsync=false 。
在ConnectionFactory级别上
((ActiveMQConnectionFactory)connectionFactory).setDispatchAsync(false);
在Connection级别上配置消费异步分发
在Connection级别上的配置将会覆盖ConnectionFactory级别上的配置。
在Destination URI级别上的配置
此处的配置将会覆盖Connection和ConnectionFactory级别上的配置。
queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false");
consumer = session.createConsumer(queue);
在Transport Connector级别上禁用异步分发消息
如果此处设置禁用异步分发消息,则单个Clients的开启设置都将会失效。
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616" disableAsyncDispatch="true"/>
预获取消息数量,重要的调优参数之一,当Consumer活跃时,broker将会批量发送prefetchSize条消息给Consumer,consumer也可以配合optimizeAcknowledge来批量确认它们;批量传送,极大的提高了网络传输效率,此值默认为1000。通过上述,我们对broker消息转发机制的了解,可以知道,broker端将会根据consumer指定的prefetchSize来决定pendingBuffer的大小,prefetchSize越大,broker批量发送的消息就回越多,如果消费者消费速度较快,再配合optimizeAck,这将是相对完美的消息传送方案。
不过,prefetchSize也会带来一定的问题,在Queue中(Topic中没有效果),broker将使用“轮询”的方式来平衡多个消费者之间的消息传送数量。如果消费者消费速度较慢,而且prefetchSize较大,这将不利于消息量在多个消费者之间平衡。通常情况下,如果consumer数量较多,或者消费速度较慢,或者消息量较少时,我们设定prefetchSize为较小的值。
//在destinationUri中指定,默认为1000
//也可以在Topic中使用,可以优化ACK策略
orderQueue?customer.prefetchSize=100
对于Consumer而言,支持同步消费(receive)和异步侦听(messageListener),这两种方式都经常用。无论何种情况,prefetchSize都可以提供有效的优化。需要注意,如果开发者使用messageListener方式异步侦听消息,将不能设定prefetchSize <= 0的任何值。如果使用receive方式,且prefetchSize = 0时,将触发Client端使用Pull机制,即每次receive调用,都会向Broker端发送Pull指令,如果broker端有消息才会转发,在这种情况下,Broker不会主动Push消息给client。
消费者权重,这个权重和消息的权重不同,它用来标记Consumer消费消息的优先级,broker端将会对优先级较高的consumer,优先转发消息(优先填充pending buffer),比如Consumer1的权重为10,Consumer2的权重为5,它们的prefetch(预获取消息的buffer尺寸)都是10,那么当Broker端有12条消息,将会优先将Consumer1的buffer填充完毕(获取10条消息),Consumer2将会获得2条消息。我们通常可以对网络良好、业务简单(比如selector更加简单)的Consumer设定较高的权重。参见ConsumerInfo类。
//在destinationUril中设定,默认所有的consumer权重都一样,为0
orderQueue?consumer.priority=10
不过对于Topic而言,当指定了consumer的priority之后,还有一个可选的转发策略,用来优化消息传送优先级,理论上所有的Subscripter(订阅者)都会收到相同的消息,但是我们在broker端的转发时机上,让优先级较高的订阅者先得到消息,尽管这看起来似乎没有什么意义:
//对Queue无效
<policyEntry topic=">">
<dispatchPolicy>
<priorityDispatchPolicy />
</dispatchPolicy>
</policyEntry>
"可追溯"消费者,只对Topic有效,如果consumer是可追溯的,那么它可以获取实例创建之前的消息。通常而言,订阅者不可能获取实例创建之前的消息,因为broker根本不知道它的存在。对于broker而言,如果一个Topic通道创建,且有发布者发布消息(Publisher),那么broker将会在内存中(非持久化)或者磁盘中(持久化)保存已经发布的消息,直到所有的订阅者都消费者,才会清除原始消息内容。那么retroactive类型的订阅者,就可以获取这些原本不属于自己但broker上还保存的旧消息,就像我们订阅一种Feed,可以立即获取旧的内容列表一样。如果此订阅者不是durable(耐久的),它可以获取最近发布的一些消息;如果是durable,它可以获取存储器中尚未删除的所有的旧消息。
//在destinationUrl中设置,默认为false
feedTopic?consumer.retroactive=true
在broker端,可以配置当前Topic默认为“可追溯的”,不过Topic并不会在此种情况下额外的保存消息,只不过表示订阅者默认都是可追溯的而已。
<!-- 只对topic有效,默认为false -->
<policyEntry topic="feedTopic" alwaysRetroactive="true" />
在Consumer中使用选择器可以帮助broker过滤消息,就像我们使用在RDBMS中使用where字句过滤数据一样,具体selector的写法,此处不做详细介绍。当Consumer创建之后,将会把selector信息传递给broker,此后在consumer的整个生命周期中,都将有效,不过一旦指定selector,重复设定selector将不会有效,除非关闭consumer并重建实例。
在Topic和Queue使用selector时机,有所不同。
对于Queue而言,broker在dispatch每条消息时,都会遍历整个消费者列表,并匹配selector表达式,一旦匹配成功则将消息发送给Consumer,如果所有的selector都无法匹配,消息将沉积。注意,这些沉积的消息将会在每次pageIn时都会被加载而且也会在内存中不断叠加(直到过期),将会对Queue的转发效率带来很大的危险,如果你发现Queue的消息大量积压(undeque),你应该检测是否与selector有关。如果你使用了selector,你一定要让全局中所有的selector覆盖所有的消息,或者至少有一个没有selector的consumer。
对于Topic而言,似乎有些不同,主要是在selector应用的时机上;如果订阅者是durable(持久化的),那么订阅者的ClientId + selector信息都会被持久化保存(参见TopicMessageStore),以后即使订阅者离线,符合selector的消息,仍然会为它创建消息副本(为当前订阅者ID与messageId的列表关系,但实际消息只有一条),如果selector不匹配,存储器将不会为此订阅者创建消息副本,也意味者当此订阅者上线将不会感知到任何事情,就像那些消息从来都没与出现过一样;不过在broker转发消息给订阅者之前,仍然会使用selector匹配;因为订阅者可以修改selector,为了避免现有的消息副本不适合新的selector,将会在消息发送发送给订阅者也使用selector匹配,对于不匹配selector的消息,此时将会按照“已消费”来处理,它们也不会发送给订阅者。此处需要提醒,创建持久化类型的订阅者时,需要在connection2上指定ClientId,全局中同一个Topic上所有的ClientId都不能相同,且同一个Connection上,不能创建多个持久化订阅者。
如果订阅者是非持久化(即为temporary),也就不存在创建消息副本的情况,那么将会在消息转发给订阅者之前检测即可。
(源码参见:Topic,DurableTopicSubscription,TopicMesssageStore,RoundRobinDispatchPolicy等)
queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false&consumer.prefetchSize=10");
consumer = session.createConsumer(queue);
每个queue中的消息只能被一个consumer消费。然而,有时候你可能希望能够监视生产者和消费者之间的消息流。你可以通过使用Virtual Destinations来建立一个virtual queue来把消息转发到多个queues中。但是,为系统中每个queue都进行如此的配置可能会很麻烦。
ActiveMQ支持Mirrored Queues。Broker会把发送到某个queue的所有消息转发到一个名称类似的topic,因此监控程序可以订阅这个mirrored queue topic。为了启用Mirrored Queues,首先要将BrokerService的useMirroredQueues属性设置成true,然后可以通过destinationInterceptors设置其它属性,如mirror topic的前缀,缺省是"VirtualTopic.Mirror."。以下是ActiveMQ配置文件的一个例子:
ActiveMQ支持Mirrored Queues。Broker会把发送到某个queue的所有消息转发到一个名称类似的topic,因此监控程序可以订阅这个mirrored queue topic。为了启用Mirrored Queues,首先要将BrokerService的useMirroredQueues属性设置成true,然后可以通过destinationInterceptors设置其它属性,如mirror topic的前缀,缺省是"VirtualTopic.Mirror."。以下是ActiveMQ配置文件的一个例子:
Xml代码:
<broker xmlns="http://activemq.org/config/1.0" brokerName="MirroredQueuesBroker1" useMirroredQueues="true">
<transportConnectors>
<transportConnector uri="tcp://localhost:61616"/>
</transportConnectors>
<destinationInterceptors>
<mirroredQueue copyMessage = "true" prefix="Mirror.Topic"/>
</destinationInterceptors>
</broker>
假如某个producer向名为Foo.Bar的queue中发送消息,那么你可以通过订阅名为Mirror.Topic.Foo.Bar的topic来获得发送到Foo.Bar中的所有消息。
ActiveMQ中,topic只有在持久化订阅下是持久化的。存在持久订阅时。每个持久订阅者,都相当于一个持久化的queue的客户端,它会收取所有消息。这种情况下存在两个问题:
1. 同一个应用内consumer端负载均衡的问题:同一个应用上的一个持久订阅者不能使用多个consumer来共同承担消息处理功能。因为每个都会获取所有消息。queue模式可以解决这个问题,broker端又不能将消息发送到多个应用端。所以,既要发布订阅,又要消费者分组,这个功能jms规范本身是没有的。
2. 同一个应用内consumer端failover的问题:由于只能使用单个的持久化订阅者,如果这个订阅者出错,则应用就无法处理消息了,系统的健壮性不高。
个人理解:
1. 集群环境下,对于同一个Topic,一个集群多台机器也相当于多个订阅者,每台机器都会全量获取到广播出来的消息,多台机器之间可能会收到完全重复的消息。
2. 如果订阅者是单点的话,存在单点故障的问题。
为了解决这个问题,ActiveMQ中实现了虚拟Topic的功能。使用起来非常简单。
对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。例如VirtualTopic.TEST。
对于消息接收端来说,是个队列,不同应用(或者说不同集群 不同组)里使用不同的前缀作为队列的名称,即可表明自己的身份也可实现消费端应用分组。例如Consumer.A.VirtualTopic.TEST来说明他是A组的消费者。可以在同一个组内使用多个consumer来消费此组消息。每组相当于一个持久化订阅者,组内可以有多个消费者共同消费消息。
实际开发中,我们经常会使用层次结构的方式来组织队列,比如A.B.C.D,这样便于归类和管理的方式。比如我们公司通常就是:三级部门缩写.集群名.业务描述
使用通配符的方式,可以让我们同时订阅多个主题或者发布到多个消息目的地。
通常通配符有三个:
示例:
订阅 | 含义 |
---|---|
PRICE.> | 匹配任何以PIRCE开头的消息目的地。示例都会被匹配到:PRICE.ZFPT,PRICE.ZFPT.TRADE |
PRICE.TRADE.> | 匹配任何以PRICE.TRADE开头的消息目的地。能匹配:PRICE.ZFPT.TRADE,PRICE.ZFPT.TRADE.SALARY |
PRICE.>.TRADE | 匹配任何以PRICE开头的消息目的地。匹配的:PRICE.ZFPT.TRADE 、PRICE.ZFPT.SALARY |
PRICE.* | 匹配任何以PRICE开头的消息目的地。匹配:PRICE.ZFPT.TRADE,PRICE.ZFPT.SALARY |
PRICE.*.TRADE | 匹配任何以PRICE开头,以TRADE结尾的消息目的地。匹配:PRICE.ZFPT.TRADE |
从版本5.5以后,可以自定义路径分隔符了:
<plugins>
.....
<destinationPathSeparatorPlugin/>
</plugins>
此时FOO.BAR.* 可以等同于 FOO/BAR/*;也可以通过pathSeparator属性定义其他符号路径分隔符。