@zhou333666
2016-11-11T19:17:14.000000Z
字数 3124
阅读 1131
消息队列使用手册
消息队列使用和规范手册
消息队列,即MQ(Message Queue)。一端往消息队列中不断写入消息,而另一端则可以读取队列中的消息。而不是通过直接调用彼此来通信。这样能够达到解耦的作用。
比如用户下了订单后,会使用到付款的接口。万一付款接口出现问题,那生成订单就出问题了,页面就会抛异常。使用消息队列,就把生成订单接口,付款接口独立开来。当生成订单接口运行完后,发送消息通知一下付款接口,订单接口就运行结束了。无论付款接口是否可用,都跟生成订单接口无关。
消息队列有很多种,我们使用rabbitMQ。使用前需要安装Erlang OTP,Rabbit MQ环境。如果是window环境安装,可以向康文根,周建安拿安装包。网上下载安装包速度很慢。安装方法参考:http://jingyan.baidu.com/article/a17d5285173ce68098c8f2e5.html。再配一下环境变量即可。
规范1:发送消息调用公共接口---setMessage(String message,String exchangeName)message是下个接口所需要的参数,和(exchangeName)通道名字。(详情见下面代码)
规范2:需传递下个接口所需要的参数必须是json的格式。
规范3:接收者可以扩展,扩展代码见下面代码。
规范4:目前的收入做法,通道名字定义成income,传递的参数必须有服务员id,服务工单id.
一般一个消息发布者往往对应一个消息接受者。为了减少冗余。我们使用一个发布者对应多个接受者。
看下面代码:
发送者:
public class Send {
public static void main(String[] args) throws Exception {
//需要传递的参数
String message = "{ "firstName": "Brett","lastName":"McLaughlin", "email": "aaaa" }";
//发布通道
String exchangeName = "logs";
//String exchangeName = "logs2";
//发送信息
setMessage(message, exchangeName);
}
private static void setMessage(String message,String exchangeName) throws Exception{
if(Strings.isNullOrEmpty(exchangeName)){
return;
}
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
/**exchange类型
* direct(直接)、topic(主题)、headers(标题)和fanout
*/
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println("Sent["+message+"]");
channel.close();
conn.close();
}
}
接收者1:
public class RabbitMQRecv {
private static final String EXCHANGE_NAME = "income";
public static void main(String[] args) throws Exception {
init();
}
public static void init() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 获取队列名称
String queueName = channel.queueDeclare().getQueue();
// 绑定队列与exchange
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("ReceiveLogs wait for message .TO exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("Received [" + message + "]");
}
}
}
接收者2:
public class RabbitMQRecv {
private static final String EXCHANGE_NAME = "income2";
public static void main(String[] args) throws Exception {
init();
}
public static void init() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 获取队列名称
String queueName = channel.queueDeclare().getQueue();
// 绑定队列与exchange
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("ReceiveLogs wait for message .TO exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("Received [" + message + "]");
}
}
}