@boothsun
2017-09-09T10:25:31.000000Z
字数 4540
阅读 1958
消息队列
说明:官方文档
参考好的博文:ActiveMQ 分析系列(三)
broker将消息发送给客户端默认是异步的,异步就意味着多线程和上下文切换的消耗。但异步能将资源最大利用,因为不同的客户端访问的环境可能不同,速度有快有慢。如果认为客户端都能足够快,且不想将资源消耗在上下文切换上,可以设置为同步。如下:
//1. connect级别
((ActiveMQConnectionFactory)connectionFactory).setDispatchAsync(false);
//2. connectFactory级别
((ActiveMQConnection)connection).setDispatchAsync(false);
//3. 消息级别
queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false");
consumer = session.createConsumer(queue);
//4. broker级别(直接修改activemq.xml中的配置)
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616" disableAsyncDispatch="true"/>
消息的发送是可以有权重级别的,同样消息的消费也是可以有优先级的;消息的消费端可以在创建的时候设置级别:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.priority=10");
consumer = session.createConsumer(queue);
优先级值的范围是:0~127,高级别能接受低级别的消息。如果在生产者端设置的级别是4,那么3级别的消费者不能马上收到该消息,而在5和6之间,broker会把消息优先发送给6。只有在消息堆积超过broker队列或缓存极限时,才会把消息发给低级别的消费者。
一般来说,消息队列都会保证queue当中的消息消费顺序。然而如果有多个consumer(比如集群环境下多台机器)同时消费同一个queue,那么这时就不能保证消息的消费顺序了。
有时候,消息的消费顺序是非常重要的,比如:退款消息肯定要在支付成功消息之后被消费。为了能顺序的消费消息,我们只能启动一个consumer来消费这个queue,但是这样就存在单点故障的隐患。
那么如何提高consumer的可用性呢?通常的做法是启动多个consumer,让其中一个consumer成为master,其他的为slave,成为master的consumer才让消费queue中的消息(通常的做法是使用zookeeper进行选主)。
但是这仍然没有解决全部问题。假设master在要消费一条消息之前,发了一次很长的gc,zookeeper认为master挂了,重新选主,一个slave被提升为master,并且开始消费这个queue的消息,这时原来的master从gc中恢复,在一定的时间范围内,原master还没有收到zookeeper通知它已经不是主了,这时它会继续消费queue中的消息。也就是说,在这段时刻,我们有2个认为自己是master的consumer。这种现象是分布式系统中所说的脑裂。zookeeper可以用来选主,zookeeper会保证只选出一个主,但是zookee并不能保证在某一时间系统中只有一个主,也就是说用了zookeeper并不能保证脑裂不会发生。zookeeper并不是用来防止脑裂的。
那么如何解决这问题那?很多mq都有exclusive consumer的概念,我们可以用它来解决这个问题。ActiveMQ也不例外。
ActiveMQ能够保证在集群环境下,同一个queue的消息消费行为只绑定到单一的客户端上;但如果被绑定的客户端发生故障宕机,则会自动转移到其他客户端。这其实和zookeeper选主一样,只不过ActiveMQ保证了同一时刻只能有一个Master。ActiveMQ 使用exclusive consumer的方式如下:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
consumer = session.createConsumer(queue);
但是,如果使用独家模式,我们就会发现消费者处理消息的能力将会严重下降。即使集群中有多台机器,使用独占模式之后,也就只能有一台机器进行消息处理。
消息的持久化,保证了消息者离线后,再次进入系统,不会错过消息,但是这样也很耗费系统资源。而且一些应用程序发送的消息是具有时效性的,一旦客户端长时间不消费此消息,我们就需要将这些过期的消息进行删除,防止消息堆积。另外,为了更好的系统性能,我们还需要断开与那些不活跃消费者的连接。
过期消息检测周期配置:
<policyEntry topic=">" expireMessagesPeriod="300000"/>
上面的配置表示每隔5分钟检测删除过期消息。
删除长期不活跃的消费者:
属性 | 默认 | 描述 |
---|---|---|
offlineDurableSubscriberTimeout | -1 | 在此之后,我们删除不活动的持久子站的时间(以毫秒为单位)。默认值-1,表示不删除它们 |
offlineDurableSubscriberTaskSchedule | 300000 | 我们多久检查一次(毫秒) |
示例配置如下:
<broker name="localhost" offlineDurableSubscriberTimeout="86400000" offlineDurableSubscriberTaskSchedule="3600000">
上面的配置意味着我们每隔一小时检查一次,并删除已经离线一天的用户。
Retroactive Consumer属于非持久化订阅者,但它是消费 非持久化消息的订阅者。(其他非持久化订阅者 可以消费持久化消息)
那非持久化订阅者如何成为Retroactive Consumer呢?最简单的方式是在创建Topic的时候指定consumer为retroactive。
topic = new ActiveMQTopic("TEST.Topic?consumer.retroactive=true");
consumer = session.createConsumer(topic);
JMS Selectors is a filter. JMS选择器可以支持对消息的筛选。消息大多数情况都是发送到broker的,在知道Destination的情况下,都可以消费,因此有些情况下需要我们将消息分组、隔离,或者指定A消息,只能由A消费者消费等情况。下面我们举个例子,限制消息生产者A生产的消息只能让A消费者消费,生产者B生产的消息只能让B消费者消费。
发送者代码:
Destination send_destination = session.createQueue("order_queue");
MessageProducer producer = session.createProducer(send_destination);
for(int i =0;i<300;i++){
// 创建一个文本消息
TextMessage message = session.createTextMessage("A-张三-"+i);
// 这里我们分别设置对应的消息信息,当成是一组消息
message.setStringProperty("JMSXGroupID","A");
producer.send(message);
TextMessage message1 = session.createTextMessage("B-李四-"+i);
message1.setStringProperty("JMSXGroupID","B");
producer.send(message1);
}
消费者代码:
Destination destination = session.createQueue("order_queue");
// 创建消费者
MessageConsumer consumer = session.createConsumer(destination,"JMSXGroupID='A'");
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("A:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
消费者B代码:
// 指定接收消息的地方
Destination destination = session.createQueue("order_queue");
// 创建消费者
MessageConsumer consumer = session.createConsumer(destination,"JMSXGroupID='B'");
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("B:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
然后开启A、B消费者监听,启动发送者,那么就能看到消息分别消费了。同时Selector支持一些表达式的过滤,比如可以写成:JMSXGroupID = 'A' or JMSXGroupID = 'B'。