[关闭]
@boothsun 2017-09-24T15:06:33.000000Z 字数 6461 阅读 1892

ActiveMQ 整合Spring 消息监听器

消息队列


  1. 原文地址:Spring整合JMS(二)——三种消息监听器

1. MessageListener

MessageListener是最原始的消息监听器,它是JMS规范中定义的一个接口。其中定义了一个用于处理接收到的消息的onMessage方法,该方法只接收一个Message参数。我们前面在讲配置消息费者的时候用的消息监听器就是MessageListener,代码如下:

  1. @Service
  2. public class ConsumerMessageListener implements MessageListener {
  3. public void onMessage(Message message) {
  4. //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换
  5. TextMessage textMsg = (TextMessage) message;
  6. System.out.println("接收到一个纯文本消息。");
  7. try {
  8. System.out.println("消息内容是:" + textMsg.getText());
  9. } catch (JMSException e) {
  10. e.printStackTrace();
  11. }
  12. }
  1. <!-- 消息监听器 -->
  2. <bean id="consumerMessageListener" class="com.boothsun.mq.impl.ConsumerMessageListener"/>
  3. <!-- 消息监听容器 queue-->
  4. <bean id="salaryContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  5. <property name="connectionFactory" ref="connectionFactory" />
  6. <property name="destination" ref="salaryDestination" />
  7. <property name="messageListener" ref="consumerMessageListener" />
  8. </bean>

2. SessionAwareMessageListener

SessionAwareMessageListener是Spring为我们提供的,它不是标准的JMS MessageListner。MessageListener的设计只是纯粹用来接收消息的,假设我们在使用MessgaeListener处理接收到的消息时我们需要发送一个消息通知对方我们已收到这个消息了,那么这个时候我们就需要在代码里面去重新获取一个Connection或Session。SessionAwareMessageListener的设计就是为了方便我们在接收到消息后发送一个回复的消息,它同样为我们提供了一个处理接收到的消息的OnMessage方法,但是这个方法可以同时接收两个参数,一个是表示当前接收到的消息Message,另一个就是可以用来发送消息的Session对象。

  1. @Service
  2. public class ConsumerSessionAwareMessageListener implements SessionAwareMessageListener<TextMessage> {
  3. @Autowired
  4. @Qualifier("topicDestination")
  5. Destination topicDestination ;
  6. @Override
  7. public void onMessage(TextMessage message, Session session) throws JMSException {
  8. System.out.println(Thread.currentThread().getName() + "-收到消息,消息内容:" + message.getText());
  9. MessageProducer producer = session.createProducer(topicDestination);
  10. TextMessage rtnMessage = session.createTextMessage("我是消费后的回复");
  11. producer.send(rtnMessage);
  12. session.commit();
  13. }
  14. }
  1. <!--queue -->
  2. <bean id="salaryDestination" class="org.apache.activemq.command.ActiveMQQueue">
  3. <constructor-arg>
  4. <value>zfpt.salary</value>
  5. </constructor-arg>
  6. </bean>
  7. <!-- topic -->
  8. <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
  9. <constructor-arg value="topic"/>
  10. </bean>
  11. <!-- 消息监听器 -->
  12. <bean id="consumerMessageListener" class="com.boothsun.mq.impl.ConsumerMessageListener"/>
  13. <bean id="consumerSessionAwareMessageListener" class="com.boothsun.mq.impl.ConsumerSessionAwareMessageListener"/>
  14. <bean id="salaryContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  15. <property name="connectionFactory" ref="connectionFactory" />
  16. <property name="destination" ref="salaryDestination" />
  17. <property name="messageListener" ref="consumerSessionAwareMessageListener" />
  18. </bean>
  19. <!-- 消息监听容器 topic-->
  20. <bean id="topicContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  21. <property name="connectionFactory" ref="connectionFactory" />
  22. <property name="destination" ref="topicDestination" />
  23. <property name="messageListener" ref="consumerMessageListener" />
  24. </bean>

上面代码在接收到一条消息后,在通过广播的形式将消费成功的消息广播出去,运行截图如下:

MessageListenerAdapter

MessageListenerAdapter类实现了MessageListener接口和SessionAwareMessageListener接口,它的主要作用是将接收到的消息进行类型转换,然后通过反射的形式把它交给一个普通的Java类进行处理。
MessageListenerAdapter会把接收到的消息做如下转换:

  1. TextMessage转换成String对象。
  2. ByteMessage转换成Byte数组。
  3. MapMessage转换为Map对象。
  4. ObjectMessage转换为对于的Serializable对象。

既然前面说了MessageListenerAdapter会把接收到的消息做一个类型转换,然后利用反射把它交给真正的目标处理器 —— 一个普通的Java类进行处理(如果真正的目标处理器是一个MessageListener或者一个SessionAwareMessageListener,那么Spring将直接使用接收到的Message对象作为参数调用它们的onMessage方法,而不会再利用发射去进行调用),那么我们在定义一个MessageListenerAdapter的时候就需要为它指定这样一个目标类。这个目标类,我们可以通过MessageListenerAdapter的构造方法参数指定,如:

  1. <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
  2. <constructor-arg>
  3. <bean class="com.boothsun.mq.impl.ConsumerMessageListener"/>
  4. </constructor-arg>
  5. <property name="defaultListenerMethod" value="receiveMessage"/>
  6. </bean>

也可以通过它的delegate属性来指定,如:

  1. <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
  2. <property name="delegate">
  3. <bean class="com.boothsun.mq.impl.ConsumerMessageListener"/>
  4. </property>
  5. <property name="defaultListenerMethod" value="receiveMessage"/>
  6. </bean>

前面说了如果我们指定的这个目标处理器是一个MessageListener或者是一个SessionAwareMessageListener的时候Spring直接利用接收到的Message对象作为方法参数调用它们的onMessage方法。但是如果指定的目标处理器是一个普通的Java类时Spring将利用Message进行了类型转换之后的对象作为参数通过反射去调用真正的目标处理器的处理方法,那么Spring是如何知道该调用哪个方法呢?这是通过MessageListenerAdapter的defaultListenerMethod属性来决定的,当我们没有指定该属性时,Spring会默认调用目标处理器的handleMessage方法。

实例代码如下:

  1. <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
  2. <constructor-arg>
  3. <bean class="com.boothsun.mq.impl.ConsumerMessageListener"/>
  4. </constructor-arg>
  5. <property name="defaultListenerMethod" value="receiveMessage"/>
  6. </bean>
  1. public class ConsumerMessageListener {
  2. public void handleMessage(Serializable message) {
  3. System.out.println("ConsumerListener通过handleMessage接收到一个纯文本消息,消息内容是:" + message);
  4. }
  5. public void receiveMessage(Serializable message) {
  6. System.out.println("ConsumerListener通过receiveMessage接收到一个纯文本消息,消息内容是:" + message);
  7. }
  8. }
  1. @Component
  2. public class ProducerServiceImpl implements ProducerService {
  3. @Resource
  4. JmsTemplate jmsTemplate;
  5. public void sendMessage(Destination destination, final String message) {
  6. System.out.println("---------------生产者发了一个消息:" + message);
  7. User user = new User();
  8. user.setAge(1);
  9. user.setId(1);
  10. user.setPassword("11");
  11. user.setUserName("111");
  12. jmsTemplate.send(destination, session -> session.createObjectMessage(user));
  13. }
  14. }

运行结果:

MessageListenerAdapter除了会自动的把一个普通Java类当作MessageListener来处理接收到的消息之外,其另外一个主要的功能是可以自动的发送返回消息。
当我们用于处理接收到的消息的方法的返回值不为空的时候,Spring会自动将它封装为一个JMS Message,然后自动进行回复。那么这个时候这个回复消息将发送到哪里呢?这主要有两种方式可以进行指定。

第一种是可以通过发送的Message的setJMSReplyTo方法指定该消息对应的回复消息的目的地。

  1. textMessage.setJMSReplyTo(responseDestination);

第二种是通过MessageListenerAdapter的defaultResponseDestination属性来指定,该属性指定的是其默认的回复地址。 具体配置如下:

  1. <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
  2. <property name="delegate">
  3. <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>
  4. </property>
  5. <property name="defaultListenerMethod" value="receiveMessage"/>
  6. <property name="defaultResponseDestination" ref="defaultResponseQueue"/>
  7. </bean>

总结

1.MessageListener

    JMS规范订单的消息监听器,其通过onMessage方法对消息进行处理。

2.SessionAwareMessageListener

    Spring提供的用于处理消息的监听器,该监听器定义的onMessage方法带有两个参数,一个是表示当前接收到的消息Message,另一个就是可以用来发送消息的Session对象。使用场景是:当我们接收到消息后,需要再发送消息通知对方我们已经收到这个消息了或者发送消息做其他处理,此时我们就可以试用这个session对象了。

  1. MessageListenerAdapter
    (1)通过适配,可以把普通Java类转换成消息处理器。
    (2)将接收到的消息进行类型转换。
        TextMessage转换为String对象。
        ByteMessage转换为byte数组。
        MapMessage转换为Map数组。
        ObjectMessage转换为对应的Serializable对象。
    (3)自动将消息处理方法的返回值封装成返回消息进行返回。
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注