@File
2019-10-24T07:06:53.000000Z
字数 3983
阅读 135
java
web
1.1 下载地址:http://rocketmq.apache.org/release_notes/release-notes-4.4.0/
1.2 把 /bin 目录,配置到环境变量
1.3 cmd 执行 mqbroker.cmd -n localhost:9876
1.4 cmd 执行 mqnamesrv.cmd
1.1 下载地址:https://github.com/apache/rocketmq-externals
1.2 进入 /rocketmq-console 目录
1.3 cmd 执行 mvn clean package -Dmaven.test.skip=true
1.4 cmd 再执行 java -jar target/rocketmq-console-ng-1.0.1.jar
1.5 修改配置文件 rocketmq-externals\rocketmq-console\src\main\resources\application.properties 中的 rocketmq.config.namesrvAddr=localhost:9876(注意:该文件会被打进 jar 包,需在 1.3 打包之前修改;或者在 1.4 启动时追加参数 --rocketmq.config.namesrvAddr=localhost:9876 覆盖)
<!-- RocketMQ Java client -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>
server:
  # The sending service and the receiving service must not use the same port
  port: 8800
# Optional: these keys are read by the configuration-mapping class (section 3).
# They must be nested under "rocketmq" to match @ConfigurationProperties("rocketmq").
rocketmq:
  nameServerAddr: localhost:9876
  topicName: send-email-topic
  topicTag: tag1
  producerGroupName: sendMailProducer
  consumerGroupName: sendMailConsumer
/**
 * Holder for the RocketMQ settings used by the producer and consumer beans.
 *
 * <p>Bound from configuration properties under the {@code rocketmq} prefix
 * (see the {@code @ConfigurationProperties} annotation below). Accessors are
 * not written by hand; presumably Lombok's {@code @Data} generates them.
 */
@Data
@Configuration
@ConfigurationProperties("rocketmq")
public class RocketMqPojo implements Serializable {
// Name server address, e.g. "localhost:9876"
private String nameServerAddr;
// Topic that messages are published to and subscribed from
private String topicName;
// Tag set on outgoing messages and used as the subscription expression
private String topicTag;
// Producer group name
private String producerGroupName;
// Consumer group name
private String consumerGroupName;
}
rocketMqPojo 中的配置项可根据实际需求取用,也可以在业务过程中再设置(如:五、发送信息)
/**
 * Spring configuration that creates and starts the RocketMQ producer bean.
 */
@Configuration
// Pull in the configuration-mapping class
@Import(com.lidaye.config.RocketMqPojo.class)
public class SendMailComponent {

    @Resource
    private RocketMqPojo rocketMqPojo;

    /**
     * Builds a started {@link DefaultMQProducer} for the configured producer group.
     *
     * @return the started producer
     * @throws MQClientException if the producer fails to start
     */
    @Bean
    public DefaultMQProducer defaultMQProducer() throws MQClientException {
        // Create the producer with an explicit producer group. The no-arg
        // constructor keeps the client's default group name, which the client
        // refuses to start with.
        DefaultMQProducer producer = new DefaultMQProducer(rocketMqPojo.getProducerGroupName());
        // Point the producer at the name server
        producer.setNamesrvAddr(rocketMqPojo.getNameServerAddr());
        // Start it so that injected users can send immediately
        producer.start();
        return producer;
    }
}
/**
 * Spring configuration that creates and starts the RocketMQ push consumer bean.
 */
@Configuration
// Pull in the configuration-mapping class
@Import(com.lidaye.config.RocketMqPojo.class)
public class MessageConsumerCompoment {

    @Resource
    private RocketMqPojo rocketMqPojo;

    /**
     * Builds a started push consumer that subscribes to the configured topic/tag
     * and prints every received message body to standard output.
     *
     * @return the started consumer
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    @Bean
    public DefaultMQPushConsumer defaultMQPushConsumer() throws MQClientException {
        // Consumer bound to the configured consumer group
        DefaultMQPushConsumer pushConsumer =
                new DefaultMQPushConsumer(rocketMqPojo.getConsumerGroupName());
        // Name server address
        pushConsumer.setNamesrvAddr(rocketMqPojo.getNameServerAddr());
        // Topic and tag filter to subscribe to
        pushConsumer.subscribe(rocketMqPojo.getTopicName(), rocketMqPojo.getTopicTag());
        // Where to begin consuming from
        pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // Listener: print each body, then acknowledge the whole batch
        MessageListenerConcurrently printingListener = (batch, ctx) -> {
            batch.forEach(received -> System.out.println(new String(received.getBody())));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        };
        pushConsumer.registerMessageListener(printingListener);

        // Start consuming
        pushConsumer.start();
        return pushConsumer;
    }
}
/**
 * REST endpoint that publishes a fixed test message through the shared producer.
 */
@RestController
public class RockerController {

    @Resource
    private DefaultMQProducer defaultMQProducer;

    @Resource
    private RocketMqPojo rocketMqPojo;

    /**
     * Sends one message to the configured topic and tag.
     *
     * @return the literal string {@code "success"} once the send call returns
     * @throws InterruptedException if the sending thread is interrupted
     * @throws RemotingException    propagated from the producer's send call
     * @throws MQClientException    propagated from the producer's send call
     * @throws MQBrokerException    propagated from the producer's send call
     */
    @GetMapping("/SendMessage")
    public String sendMessage() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        Message outgoing = new Message();
        // Destination: topic and tag come from configuration
        outgoing.setTopic(rocketMqPojo.getTopicName());
        outgoing.setTags(rocketMqPojo.getTopicTag());
        // Payload
        byte[] payload = "信息内容".getBytes();
        outgoing.setBody(payload);
        // Hand the message to the producer
        defaultMQProducer.send(outgoing);
        return "success";
    }
}
参考1:https://www.jianshu.com/p/cc5c10221aa1
参考2:https://www.jianshu.com/p/694d6d2676ff
参考3:https://juejin.im/post/5d3bef91f265da1b725c4b3d
参考4:https://blog.csdn.net/lihongtai/article/details/84642817
TransactionListener
事务回查监听器
/**
 * Transaction listener registered on the transactional producer.
 *
 * <p>This is a stub: both callbacks return a fixed state and do not inspect
 * the message.
 */
@Component
public class PointTransactionListener implements TransactionListener {
/**
 * Decides, based on the result of sending the message, whether to run the
 * local transaction.
 *
 * <p>Currently always returns {@link LocalTransactionState#UNKNOW} without
 * doing any work — presumably so the broker later asks
 * {@link #checkLocalTransaction(MessageExt)} for the outcome; confirm against
 * the RocketMQ documentation.
 *
 * @param msg the message being sent (unused here)
 * @param arg the caller-supplied argument (unused here)
 * @return always {@code LocalTransactionState.UNKNOW}
 */
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
return LocalTransactionState.UNKNOW;
}
/**
 * RocketMQ callback: reports to the broker, based on whether the local
 * transaction succeeded, whether this message should be delivered.
 *
 * <p>Currently always returns {@link LocalTransactionState#COMMIT_MESSAGE}
 * without checking any local state.
 *
 * @param msg the message being checked (unused here)
 * @return always {@code LocalTransactionState.COMMIT_MESSAGE}
 */
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
return LocalTransactionState.COMMIT_MESSAGE;
}
}
/**
 * Transaction listener handed to the transactional producer.
 */
@Resource
private PointTransactionListener pointTransactionListener;

/**
 * Builds a started {@link TransactionMQProducer}.
 *
 * <p>The producer group is the configured producer group name prefixed with
 * {@code "T"}.
 *
 * @return the started transactional producer
 * @throws MQClientException if the producer fails to start
 */
@Bean
public TransactionMQProducer transactionMQProducer() throws MQClientException {
    // Group name for the transactional producer
    String txGroupName = "T" + rocketMqPojo.getProducerGroupName();
    TransactionMQProducer txProducer = new TransactionMQProducer(txGroupName);
    // Name server address
    txProducer.setNamesrvAddr(rocketMqPojo.getNameServerAddr());
    // Listener that decides and checks the local transaction state
    txProducer.setTransactionListener(pointTransactionListener);
    // Start the producer before exposing it as a bean
    txProducer.start();
    return txProducer;
}