@yexiaoqi
2024-11-10T04:33:27.000000Z
字数 9031
阅读 1102
技术 RocketMQ 转载
RocketMQ 是阿里开源的一个消息中间件,在 Springboot 中使用主要有两种方式,第一种是基于 RocketMQ 原生的 API,第二种是采用 Springboot 对 RocketMQ 封装后的写法,下面分别来介绍这两种基本方法。
pom依赖:
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.8.0</version></dependency>
配置文件:
# NameServer地址apache.rocketmq.namesrvAddr=192.168.56.129:9876# 生产者的组名apache.rocketmq.producer.producerGroup=test_Producer
生产者初始化:
import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.PropertySource;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import javax.annotation.PreDestroy;@Componentpublic class MsgProducer {@Value("${apache.rocketmq.producer.producerGroup}")private String producerGroup;@Value("${apache.rocketmq.namesrvAddr}")private String namesrvAddr;private DefaultMQProducer mqProducer;//调用方注入MsgProducer后,通过这个方法获取配置好的DefaultMQProducerpublic DefaultMQProducer getMqProducer(){return mqProducer;}@PostConstructpublic void initMQ(){mqProducer=new DefaultMQProducer(producerGroup);mqProducer.setNamesrvAddr(namesrvAddr);mqProducer.setVipChannelEnabled(false);try {mqProducer.start();} catch (MQClientException e) {e.printStackTrace();}}@PreDestroypublic void destory(){mqProducer.shutdown();}}
生产者发送消息【同步、非顺序】
@Autowiredprivate MsgProducer msgProducer;public void sendMsg() throws Exception {//消息体String msg = "hello, RocketMQ";Message message = new Message("test_topic_2", "test", msg.getBytes(RemotingHelper.DEFAULT_CHARSET));//调用配置好的DefaultMQProducer发送消息,拿到返回结果SendResult result = msgProducer.getMqProducer().send(message);}
生产者发送【同步、顺序消息】
@Autowiredprivate MsgProducer msgProducer;public void sendMsg() throws Exception {//发100条消息测试for(int i = 1; i <= 100; i++) {//默认自动创建的topic有四个队列,如果按照队列读取,那么同一个队列id下的value一定按照接受的顺序从小到大String msg = "id:" + i%4 + " value:" + i;Message message = new Message("test_topic_2", "test", msg.getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult result = msgProducer.getMqProducer().send(message, new MessageQueueSelector(){@Override//arg一般是唯一id,这里是ipublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {int queueNum = Integer.valueOf(String.valueOf(arg)) % 4;System.out.println("队列id:" + queueNum + " 消息:" + new String(msg.getBody()));return mqs.get(queueNum);}}, i);}}
生产者发送【异步、非顺序消息】
public void asyncProducer() throws Exception {// Instantiate with a producer group name.DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");// Launch the instance.producer.start();producer.setRetryTimesWhenSendAsyncFailed(0);for (int i = 0; i < 100; i++) {final int index = i;//创建一个消息示例, 指定topic, tag 和 message体.Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));//发送消息并设置回调函数producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});}}
消息先不发到 broker 的目的队列,而是包装一层放到中间队列,待提交之后再放到目的队列。配置类如下:
@Componentpublic class TxMsgProducer {@Value("${apache.rocketmq.producer.producerGroup}")private String producerGroup;@Value("${apache.rocketmq.namesrvAddr}")private String namesrvAddr;private TransactionMQProducer mqProducer;public DefaultMQProducer getMqProducer(){return mqProducer;}@PostConstructpublic void initMQ(){mqProducer=new TransactionMQProducer(producerGroup);mqProducer.setNamesrvAddr(namesrvAddr);mqProducer.setVipChannelEnabled(false);//下面两个是新增的mqProducer.setExecutorService(getExecutorService());mqProducer.setTransactionListener(new TransactionListenerImpl());try {mqProducer.start();} catch (MQClientException e) {e.printStackTrace();}}@PreDestroy//在程序运行结束时执行public void destory(){mqProducer.shutdown();}/*** 事务监听*/class TransactionListenerImpl implements TransactionListener {//第一次判断是否提交或回滚@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object arg){//message就是那个半发送的消息 arg是在transcationProducter.send(Message,Object)时的另一个携带参数)//执行本地事务或调用其他为服务if(true) return LocalTransactionState.COMMIT_MESSAGE;if(true) return LocalTransactionState.ROLLBACK_MESSAGE;//如果在检查事务时数据库出现宕机可以让broker过一段时间回查 和return null 效果相同return LocalTransactionState.UNKNOW;}//返回UNKOWN时回查!@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt messageExt) {//只去返回commit或者rollbackreturn LocalTransactionState.COMMIT_MESSAGE;}}//定义一个线程池 让broker用来执行回调和回查public ExecutorService getExecutorService(){return new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000));}}
第二步,发送消息
@Autowiredprivate TxMsgProducer msgProducer;public void sendMsg() throws Exception {String msg="hello";Message message=new Message("test_topic_2","test",msg.getBytes());//调用配置好的TxMsgProducer 发送消息SendResult result=msgProducer.getMqProducer().send(message);}
pom依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.8.0</version></dependency>
配置文件:
# NameServer地址apache.rocketmq.namesrvAddr=192.168.56.129:9876# 消费者的组名apache.rocketmq.consumer.PushConsumer=test_Consumer
@Componentpublic class MsgConsumer {@Value("${apache.rocketmq.consumer.PushConsumer}")private String consumerGroup;@Value("${apache.rocketmq.namesrvAddr}")private String namesrvAddr;private DefaultMQPushConsumer consumer;@PostConstructpublic void init() throws MQClientException {consumer=new DefaultMQPushConsumer(consumerGroup);consumer.setNamesrvAddr(namesrvAddr);//设置consumer所订阅的Topic和Tag,*代表全部的Tagconsumer.subscribe("test_topic_2", "*");/*** CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,跳过历史消息* CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {try{System.out.println("接受:"+new String(list.get(0).getBody()));}catch (Exception e){//ACK机制,消费失败,触发RocketMQ 重发消息return ConsumeConcurrentlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}//ACK机制,消费成功return ConsumeConcurrentlyStatus.SUCCESS;}});consumer.start();}@PreDestroypublic void destory(){consumer.shutdown();}}
其他消费模式
consumer.registerMessageListener(new MessageListenerConcurrently(){});
改为
consumer.registerMessageListener(new MessageListenerOrderly(){});
不用改变消费者,如果事务的监听 rollback 了,消费者的消费结果会自动回滚
consumer.setMessageMode(MessageMode.BROADCASTING);consumer.setOffsetStore(OffsetStore.LocalFileOffsetStore);
pom依赖:
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version></dependency>
配置文件设置:
rocketmq:name-server: localhost:9876producer:group: my-groupserver:port: 8081
发送消息:
直接实现 CommandLineRunner 这个接口,复写 run 方法即可,然后注册 RocketMQTemplate,就可以生产消息了
@SpringBootApplicationpublic class SpringBootRocketmqProducerApplication implements CommandLineRunner {//引入依赖模板@Resourceprivate RocketMQTemplate rocketMQTemplate;@Overridepublic void run(String... args) throws Exception {//发送消息rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");rocketMQTemplate.convertAndSend("test-topic-2",new OrderPaidEvent("orderId-0001", 88));}}@Data@AllArgsConstructorclass OrderPaidEvent implements Serializable {private String orderId;private Integer paidMoney;}
pom依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version></dependency>
配置文件设置:
rocketmq:name-server: localhost:9876server:port: 8082
消费消息:onMessage() 封装了ACK 机制,消费者往外抛异常时,RocketMQ 认为消费失败,重新发送该条消息,否则默认消费成功。
@Slf4j@Service@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")class MyConsumer1 implements RocketMQListener<String> {/***需要注意的是,onMessage()封装了ACK机制,消费者往外抛异常时,RocketMQ认为消费失败,重新发送该条消息,否则默认消费成功*/@SneakyThrows@Overridepublic void onMessage(Message message) {log.info("receivie message:topic={},body={}", message.getTopic(), new String(message.getBody()));if(消费成功){//TODO}else if(消费失败){throw new Exception;}}}@Data@AllArgsConstructorclass OrderPaidEvent implements Serializable {private String orderId;private Integer paidMoney;}
现象:消息投递到消费者监听到,有 2~10 秒的延迟,但并未使用延迟消息
一般来说消费延迟可能有以下几个原因
经过判断1、3均排除,最有可能的是消费者的 consumer group 乱用,搜索代码中的所有消费者,修改消费组后,消费正常。
是否是多网卡环境,查看 broker 的配置,brokerIP1=xxx.xxx.xxx.xxx 需配置。
广播模式会把 offset 保存到本地,默认是在用户根目录/.rocketmq_offsets/xxx.xxx.xxx.xxx@DEFAULT下,当同一个机子上启动有多个相同的消费者时,会对这个本地的偏移量造成覆盖。
集群模式下的消费者的 instanceName 会用 PID,没有 instanceName 影响不大,广播模式的 instanceName 是DEFAULT,不同实例之间会重复。