[关闭]
@File 2019-10-24T07:06:53.000000Z 字数 3983 阅读 135

rocket-mq 中间件

java web


一、安装运行服务

1. 服务器

1.1 下载地址:http://rocketmq.apache.org/release_notes/release-notes-4.4.0/
1.2 把 /bin 目录,配置到环境变量
1.3 cmd 执行 mqbroker.cmd -n localhost:9876
1.4 cmd 执行 mqnamesrv.cmd

2. 可视化控制台

1.1 下载地址:https://github.com/apache/rocketmq-externals
1.2 进入 /rocketmq-console 目录
1.3 cmd 执行 mvn clean package -Dmaven.test.skip=true
1.4 cmd 在执行 java -jar target/rocketmq-console-ng-1.0.1.jar
1.5 修改配置文件 rocketmq-externals\rocketmq-console\src\main\resourcesapplication.properties 中的 rocketmq.config.namesrvAddr=localhost:9876

二、依赖(接收方和发送方都要有)

  1. <!-- rocketmq -->
  2. <dependency>
  3. <groupId>org.apache.rocketmq</groupId>
  4. <artifactId>rocketmq-client</artifactId>
  5. <version>4.4.0</version>
  6. </dependency>

三、yml 配置(接收方和发送方都要有)

  1. server:
  2. # 发送和接收的服务端口不要重复
  3. port: 8800
  4. # 可选,用于 三、创建配置映射类
  5. rocketmq:
  6. nameServerAddr: localhost:9876
  7. topicName: send-email-topic
  8. topicTag: tag1
  9. producerGroupName: sendMailProducer
  10. consumerGroupName: sendMailConsumer

四、创建配置映射类

  1. @Data
  2. @Configuration
  3. @ConfigurationProperties("rocketmq")
  4. public class RocketMqPojo implements Serializable {
  5. private String nameServerAddr;
  6. private String topicName;
  7. private String topicTag;
  8. private String producerGroupName;
  9. private String consumerGroupName;
  10. }

五、配置 rocket

调用 rocketMqPojo 的配置可根据实际需求,可在业务过程中再配置(如:五、发送信息)

1. 发送方

  1. @Configuration
  2. // 引入配置映射类
  3. @Import(com.lidaye.config.RocketMqPojo.class)
  4. public class SendMailComponent {
  5. @Resource
  6. private RocketMqPojo rocketMqPojo;
  7. @Bean
  8. public DefaultMQProducer defaultMQProducer() throws MQClientException {
  9. // 创建生产者,制定其生产者组
  10. DefaultMQProducer producer = new DefaultMQProducer();
  11. //设置服务器地址
  12. producer.setNamesrvAddr(rocketMqPojo.getNameServerAddr());
  13. // 启动
  14. producer.start();
  15. return producer;
  16. }
  17. }

2. 接收方

  1. @Configuration
  2. // 引入配置映射类
  3. @Import(com.lidaye.config.RocketMqPojo.class)
  4. public class MessageConsumerCompoment {
  5. @Resource
  6. private RocketMqPojo rocketMqPojo;
  7. @Bean
  8. public DefaultMQPushConsumer defaultMQPushConsumer() throws MQClientException {
  9. // 创建消费者,制定其消费者组
  10. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(rocketMqPojo.getConsumerGroupName());
  11. // 配置服务器地址
  12. consumer.setNamesrvAddr(rocketMqPojo.getNameServerAddr());
  13. // 设置标题
  14. consumer.subscribe(rocketMqPojo.getTopicName(), rocketMqPojo.getTopicTag());
  15. // 设置消费者的信息偏移量
  16. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  17. // 订阅
  18. consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
  19. // 收信息时的逻辑
  20. msgs.forEach(mt -> {
  21. System.out.println(new String(mt.getBody()));
  22. });
  23. // 返回状态
  24. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  25. });
  26. // 启动
  27. consumer.start();
  28. return consumer;
  29. }
  30. }

六、发送信息

  1. @RestController
  2. public class RockerController {
  3. @Resource
  4. private DefaultMQProducer defaultMQProducer;
  5. @Resource
  6. private RocketMqPojo rocketMqPojo;
  7. @GetMapping("/SendMessage")
  8. public String sendMessage() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
  9. // 实例化信息类
  10. Message message = new Message();
  11. // 配置信息内容
  12. message.setBody("信息内容".getBytes());
  13. // 设置标题
  14. message.setTopic(rocketMqPojo.getTopicName());
  15. message.setTags(rocketMqPojo.getTopicTag());
  16. // 发送信息
  17. defaultMQProducer.send(message);
  18. return "success";
  19. }
  20. }

七、事务

参考1:https://www.jianshu.com/p/cc5c10221aa1
参考2:https://www.jianshu.com/p/694d6d2676ff
参考3:https://juejin.im/post/5d3bef91f265da1b725c4b3d
参考4:https://blog.csdn.net/lihongtai/article/details/84642817

1. TransactionListener 事务回查监听器

  1. @Component
  2. public class PointTransactionListener implements TransactionListener {
  3. /**
  4. * 根据消息发送的结果 判断是否执行本地事务
  5. * @param msg
  6. * @param arg
  7. * @return
  8. */
  9. @Override
  10. public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
  11. return LocalTransactionState.UNKNOW;
  12. }
  13. /**
  14. * RocketMQ 回调 根据本地事务是否执行成功 告诉broker 此消息是否投递成功
  15. * @param msg
  16. * @return
  17. */
  18. @Override
  19. public LocalTransactionState checkLocalTransaction(MessageExt msg) {
  20. return LocalTransactionState.COMMIT_MESSAGE;
  21. }
  22. }

2. config 中注册(在 五、1 基础上)

  1. /**
  2. * 事务回查监听器
  3. */
  4. @Resource
  5. private PointTransactionListener pointTransactionListener;
  6. @Bean
  7. public TransactionMQProducer transactionMQProducer() throws MQClientException {
  8. // 创建事务生产者,制定其生产者组
  9. TransactionMQProducer producer = new TransactionMQProducer("T"+rocketMqPojo.getProducerGroupName());
  10. //设置服务器地址
  11. producer.setNamesrvAddr(rocketMqPojo.getNameServerAddr());
  12. // 设置事务决断处理类
  13. producer.setTransactionListener(pointTransactionListener);
  14. // 启动
  15. producer.start();
  16. return producer;
  17. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注