[关闭]
@boothsun 2017-09-09T10:26:56.000000Z 字数 2805 阅读 1211

ActiveMq入门实例

消息队列


pom.xml配置

  1. <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-core -->
  2. <dependency>
  3. <groupId>org.apache.activemq</groupId>
  4. <artifactId>activemq-core</artifactId>
  5. <version>5.5.1</version>
  6. </dependency>

消息生产者

  1. public class JMSProducer {
  2. //默认连接用户名
  3. private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
  4. //默认连接密码
  5. private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
  6. //默认连接地址
  7. private static final String BROKEURL = "tcp://xxx.xx.xx.xx:61616";
  8. //发送的消息数量
  9. private static final int SENDNUM = 10;
  10. public static void main(String[] args) {
  11. //连接工厂
  12. ConnectionFactory connectionFactory;
  13. //连接
  14. Connection connection = null;
  15. //会话 接受或者发送消息的线程
  16. Session session;
  17. //消息的目的地
  18. Destination destination;
  19. //消息生产者
  20. MessageProducer messageProducer;
  21. //实例化连接工厂
  22. connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);
  23. try {
  24. //通过连接工厂获取连接
  25. connection = connectionFactory.createConnection();
  26. //启动连接
  27. connection.start();
  28. //创建session
  29. session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
  30. //创建一个名称为HelloWorld的消息队列
  31. destination = session.createQueue("HelloWorld");
  32. //创建消息生产者
  33. messageProducer = session.createProducer(destination);
  34. //发送消息
  35. sendMessage(session, messageProducer);
  36. session.commit();
  37. } catch (Exception e) {
  38. e.printStackTrace();
  39. }finally{
  40. if(connection != null){
  41. try {
  42. connection.close();
  43. } catch (JMSException e) {
  44. e.printStackTrace();
  45. }
  46. }
  47. }
  48. }
  49. /**
  50. * 发送消息
  51. * @param session
  52. * @param messageProducer 消息生产者
  53. * @throws Exception
  54. */
  55. public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
  56. for (int i = 0; i < JMSProducer.SENDNUM; i++) {
  57. //创建一条文本消息
  58. TextMessage message = session.createTextMessage("ActiveMQ 发送消息" +i);
  59. System.out.println("发送消息boothsun:Activemq 发送消息" + i);
  60. //通过消息生产者发出消息
  61. messageProducer.send(message);
  62. }
  63. }
  64. }

运行截图:

消息管理后台页面:

消息消费者

  1. public class JMSConsumer {
  2. private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
  3. private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
  4. private static final String BROKEURL = "tcp://xxx.xxx.xxx.xxx:61616";//连接地址
  5. public static void main(String[] args) {
  6. ConnectionFactory connectionFactory;//连接工厂
  7. Connection connection = null;//连接
  8. Session session;//会话 接受或者发送消息的线程
  9. Destination destination;//消息的目的地
  10. MessageConsumer messageConsumer;//消息的消费者
  11. //实例化连接工厂
  12. connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
  13. try {
  14. //通过连接工厂获取连接
  15. connection = connectionFactory.createConnection();
  16. //启动连接
  17. connection.start();
  18. //创建session
  19. session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  20. //创建一个连接HelloWorld的消息队列
  21. destination = session.createQueue("HelloWorld");
  22. //创建消息消费者
  23. messageConsumer = session.createConsumer(destination);
  24. while (true) {
  25. TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
  26. if(textMessage != null){
  27. System.out.println("收到的消息:" + textMessage.getText());
  28. }else {
  29. break;
  30. }
  31. }
  32. } catch (JMSException e) {
  33. e.printStackTrace();
  34. }
  35. }
  36. }

运行截图:

消息管理后台页面:

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注