@xtccc
2017-04-09T10:19:40.000000Z
字数 11882
阅读 2567
RabbitMQ
目录:
发送消息时,不需要对producer进行流量控制,他可以消息全部发到exchange中。
package cn.gridx.rabbitmq.tutorials
import com.rabbitmq.client.{BuiltinExchangeType, MessageProperties, ConnectionFactory}
import org.slf4j.LoggerFactory
object Sender {
def HOST = "xx"
def USER = "yyy"
def VHOST = "vhost_tao"
def PASSWD = "zzzz"
def EXCHANGE = "Hub"
def ROUTING_KEY = "G40"
def Q = "Q"
val logger = LoggerFactory.getLogger(getClass)
def main(args: Array[String]) = {
val factory = new ConnectionFactory()
factory.setHost(HOST)
factory.setUsername(USER)
factory.setVirtualHost(VHOST)
factory.setPassword(PASSWD)
logger.info("sender已创建")
val conn = factory.newConnection()
val channel = conn.createChannel()
// 声明1个topic类型的exchange
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC, true)
logger.info(s"已创建exchange: $EXCHANGE")
// 声明1个名为"Q"的queue
channel.queueDeclare(Q, true/*durable*/, false/*exclusive*/, false/*autoDelete*/, null)
logger.info(s"已声明Queue: $Q")
// 创建从exchange到queue的binding,为这个binding指定routing_key
channel.queueBind(Q, EXCHANGE, ROUTING_KEY)
logger.info(s"exchange ($EXCHANGE) 已通过 routing key ($ROUTING_KEY) 绑定到 queue ($Q)")
var i = 1
logger.info("开始发送")
while (i <= 10) {
val msg = s"#$i"
channel.basicPublish(EXCHANGE, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes)
logger.info(s"#$i => ($EXCHANGE, $ROUTING_KEY)")
i += 1
}
channel.close()
conn.close()
logger.info("发送结束")
}
}
上面的代码向exchange("Hub")发送了10条消息,这些消息全部被发送到名为"Q"的queue。
Queues are like named mailboxes. They’re where messages end up and wait to be consumed. Consumers receive messages from a particular queue in one of two ways:
By subscribing to it via the basic.consume
AMQP command. This will place the channel being used into a receive mode until unsubscribed from the queue. While subscribed, your consumer will automatically receive another message from the queue (as available) after consuming (or rejecting) the last received message. You should use basic.consume if your consumer is processing many messages out of a queue and/or needs to automatically receive messages from a queue as soon as they arrive.
Sometimes, you just want a single message from a queue and don’t need to be persistently subscribed. Requesting a single message from the queue is done by using the basic.get
AMQP command. This will cause the consumer to receive the next message in the queue and then not receive further messages until the next basic.get. You shouldn’t use basic.get in a loop as an alternative to basic.consume, because it’s much more intensive on Rabbit. basic.get essen- tially subscribes to the queue, retrieves a single message, and then unsubscribes every time you issue the command. High-throughput consumers should always use basic.consume.
下面是subscribe模式的例子。
package cn.gridx.rabbitmq.tutorials
import com.rabbitmq.client._
import org.slf4j.LoggerFactory
object Receiver {
def HOST = "xx"
def VHOST = "vhost_tao"
def USER = "yyy"
def PASSWD = "zzzz"
def EXCHANGE = "Hub"
def ROUTING_KEY = "G40"
def Q = "Q"
val logger = LoggerFactory.getLogger(getClass)
def main(args: Array[String]): Unit = {
val factory = new ConnectionFactory()
factory.setHost(HOST)
factory.setUsername(USER)
factory.setVirtualHost(VHOST)
factory.setPassword(PASSWD)
val conn = factory.newConnection()
val channel = conn.createChannel()
logger.info("channel已创建")
// 在channel中声明1个queue
channel.queueDeclare(Q, true, false, false, null)
logger.info(s"queue已声明: $Q")
// 创建3个consumers, 它们会试图消费同一个channel中的消息
/** By default, RabbitMQ will send each message to the next consumer, in sequence (round-robin fashion) */
val consumer1 = new TheConsumer("A", Q, channel)
val consumer2 = new TheConsumer("B", Q, channel)
val consumer3 = new TheConsumer("C", Q, channel)
logger.info("3个Consumers (A/B/C) 已就绪 ... ")
/**
* RabbitMQ just dispatches a message when the message enters the queue.
* It doesn't look at the number of unacknowledged messages for a consumer
* */
consumer1.start()
consumer2.start()
consumer3.start()
logger.info(s"正在主程序中 ....")
}
}
/** Consumer Class */
class TheConsumer(consumerName: String, queue: String, channel: Channel) extends DefaultConsumer(channel) {
val logger = LoggerFactory.getLogger(getClass)
def start(): Unit = {
// 调用一次即可
channel.basicConsume(queue, false, this)
}
/** 收到消息后的回调方法 */
override
def handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: Array[Byte]): Unit = {
val msg = new String(body, "UTF-8")
val deliveryTag = envelope.getDeliveryTag.toString
val exchange = envelope.getExchange
val routingKey = envelope.getRoutingKey
val redeliver = if (envelope.isRedeliver) "Y" else "N"
val content = String.format("consumer name: %-4s | redeliver: %-2s | msg = %-6s | deliverTag = %-4s | exchange = %-10s | routingKey = %-10s ",
consumerName, redeliver, msg, deliveryTag, exchange, routingKey)
logger.info(content)
Thread.sleep(1000) // 模拟处理一条消息需要耗时1s
channel.basicAck(envelope.getDeliveryTag, false) // 处理完毕后,回送ack
// 不要再次调用basicConsume
}
}
如果我们先运行Consumer程序,然后再运行Producer程序,则会看到Consumer程序的输出如下:
By default, RabbitMQ will send each message to the next consumer, in sequence (round-robin fashion)
可以看到,3个consumer都在试图消费同一个queue中的消息(它们共享同一个channel),因此RabbitMQ轮流将消息message发送给它们,即:第一条送给A,等A处理完后,再将下一条送给B,等B处理完后,再将下一条送给C,以此类推。
这里处理完的意思是:consumer回送一个ACK(我们在调用basicConsume时将autoAck设置为了false)。
可见,在某一个channel中,如果一条已派发出去的消息没有被consumer回应(acknowledged),则下一条消息不会被派发到该channel中。
所以,虽然这里有3个consumer存在,但是总是有2个consumer处于空闲的状态,怎么让3个consumer都忙起来呢? 让每个consumer拥有自己的channel。
代码稍作修改:
object Receiver {
....
def main(args: Array[String]) {
....
// 创建3个consumer, 它们会在各自的channel中共同消费同一个Queue中的message
/** By default, RabbitMQ will send each message to the next consumer, in sequence (round-robin fashion) */
val consumer1 = {
val channel = conn.createChannel()
channel.queueDeclare(Q, true, false, false, null)
logger.info("A的channel已创建")
new TheConsumer("A", Q, channel)
}
val consumer2 = {
val channel = conn.createChannel()
channel.queueDeclare(Q, true, false, false, null)
logger.info("B的channel已创建")
new TheConsumer("B", Q, channel)
}
val consumer3 = {
val channel = conn.createChannel()
channel.queueDeclare(Q, true, false, false, null)
logger.info("C的channel已创建")
new TheConsumer("C", Q, channel)
}
logger.info("3个Consumers (A/B/C) 已就绪 ... ")
}
...
}
再次运行Receiver程序,可以看到:
如果一个consumer收到了一条消息后,始终不回送ack,那么会怎么样?
它就再也不会收到新的消息,已收到的但是还未回应ACK的那条消息也不会再次被发送到其他consumer。但是如果它挂了(channel关闭,连接断开,或者或者应用崩溃),那条未被回应ACK的消息则会被重新发送到下一个可用的consumer。
class TheConsumer(consumerName: String, queue: String, conn: Connection, workload: Int) {
val logger = LoggerFactory.getLogger(getClass)
def start(): Unit = {
// 如果需要同同时最多消费3个message(channel不设置autoAck, 并且这个三个message都没有返回ack)
// 那么必须创建3个channel
// 对于一个channel, 如果它的handler没有调用basci ack,是无论如何也无法收到下一条消息的
for (i <- 0 until 3) {
val channel = conn.createChannel()
channel.queueDeclare(queue, true, false, false, null)
channel.basicQos(1)
logger.info(s"${consumerName}的channel${i}已创建")
channel.basicConsume(queue, false /* autoack */ , new MsgHandler(i.toString, channel))
}
}
class MsgHandler(name: String, channel: Channel) extends DefaultConsumer(channel) {
logger.info(s"MsgHandler $name is instantiated ...")
override
def handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: Array[Byte]): Unit = {
val msg = new String(body, "UTF-8")
val deliveryTag = envelope.getDeliveryTag.toString
val exchange = envelope.getExchange
val routingKey = envelope.getRoutingKey
val redeliver = if (envelope.isRedeliver) "Y" else "N"
val headers = properties.getHeaders
val jobType = headers.get("job-type")
val content = String.format("consumer name: %-4s | redeliver: %-2s | jobType = %-10s | msg = %-6s | deliverTag = %-4s | exchange = %-10s | routingKey = %-10s ", consumerName, redeliver, jobType, msg, deliveryTag, exchange, routingKey)
logger.info(content)
Thread.sleep(1000 * workload) // 模拟处理一条消息需要耗时n秒
channel.basicAck(envelope.getDeliveryTag, false) // 处理完毕后,回送ack
// 不需要再次调用 channel.basicConsume(queue, false, this)
}
}
}
消息首先被发送给某个channel,然后channel再发送给consumer。在任意时刻,一个channel里都可能有多条消息(unacknowledged messages)等待被送至consumer,但是如果consumer还没ack上一条消息,那么channel中的消息就不会被送至consumer。
通过对consumer的channel设置basic.qos
,可以限制channle中unacknowledged messages的最大数量。如果达到了这个数量,则RabbitQos就不会再向该channel发送消息,直到该channle中的unacknowledged messages的数量小于预设的值。QoS也被称为prefetch count。
参考
同一条通路里面的消息,是满足FIFO的顺序的。
这里,同一条通路,指的是producer -> channel -> route -> channel -> consumer。
如果多个consumer订阅了同一个queue,则这些consumers之间的消息顺序是无法保证的。
如果一个queue在声明时,为其加入属性x-max-priority
(该queue中的消息所允许指定的最高优先级),则它就是一个priority queue,里面的message都是可以指定优先级的(通过basic.properties
的priority
属性)。数字越大,优先级越高。、
消息的prioroty
属性实际上以一个无符号字节来记录,所以消息的优先级的范围是0~255。
未定义priority的消息的优先级事实上为0。
注意:如果producer在声明一个queue时将其指定为priority queue,那么consumer在订阅该queue时,也必须将其指定为priority queue,否则会有异常。
import java.util
import com.rabbitmq.client._
import org.slf4j.LoggerFactory
object PriorityQueue {
val logger = LoggerFactory.getLogger(getClass)
def HOST = "xx"
def USER = "yyyy"
def VHOST = "vhost_tao"
def PASSWD = "zzzzz"
def EXCHANGE = "Hub"
def ROUTING_KEY = "G25"
def Q25 = "Q25"
val PersistentMode = 2
val TextMsgType = "text/plain"
def main(args: Array[String]): Unit = {
val factory = new ConnectionFactory()
factory.setHost(HOST)
factory.setUsername(USER)
factory.setVirtualHost(VHOST)
factory.setPassword(PASSWD)
logger.info("ConnectionFactory已创建")
sendMsg(factory)
receiveMsg(factory)
}
/**
* 创建一个priority queue, 并向其发送消息
* */
def sendMsg(factory: ConnectionFactory): Unit = {
val conn = factory.newConnection()
val channel = conn.createChannel()
channel.confirmSelect()
logger.info("Channel已创建")
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC, true)
logger.info(s"已声明exchange: $EXCHANGE")
val queueArgs: util.HashMap[String, Object] = new java.util.HashMap[String, Object]()
queueArgs.put("x-max-priority", new Integer(5))
channel.queueDeclare(Q25, true, false, false, queueArgs)
logger.info(s"声明了一个优先级最高为5的queue: ${Q25}")
channel.queueBind(Q25, EXCHANGE, ROUTING_KEY)
logger.info(s"exchange ($EXCHANGE) 已通过 routing key ($ROUTING_KEY) 绑定到 queue ($Q25)")
logger.info("开始发送30条消息,其中前20条消息的优先级为1, 后10条的优先级为3")
val propertiesBuilder = new AMQP.BasicProperties().builder()
propertiesBuilder.contentType(TextMsgType).deliveryMode(PersistentMode).priority(1)
for (i <- 1 to 20) {
val msg = s"Msg-#$i"
channel.basicPublish(EXCHANGE, ROUTING_KEY, propertiesBuilder.build(), msg.getBytes())
}
propertiesBuilder.contentType("text/plain").deliveryMode(2).priority(5)
for (i <- 21 to 30) {
val msg = s"Msg-#$i"
channel.basicPublish(EXCHANGE, ROUTING_KEY, propertiesBuilder.build(), msg.getBytes())
}
channel.waitForConfirmsOrDie()
logger.info("发送结束")
}
/**
* 从priority queue中接收消息
* */
def receiveMsg(factory: ConnectionFactory) = {
val conn = factory.newConnection()
val channel = conn.createChannel()
logger.info("Channel 已创建")
// sender已经在声明queue时加入了优先级的属性
// 这里的receiver在声明queue时仍然必须加入优先级的属性
val queueArgs: util.HashMap[String, Object] = new java.util.HashMap[String, Object]()
queueArgs.put("x-max-priority", new Integer(5))
channel.queueDeclare(Q25, true, false, false, queueArgs)
channel.basicQos(1)
logger.info("queue 已声明一个最高优先级为5的queue")
val consumer = new TheConsumer("my-consumer", Q25, channel, 1)
consumer.start()
logger.info("consumer已启动")
}
}
class TheConsumer(consumerName: String, queue: String, channel: Channel, workload: Int)
extends DefaultConsumer(channel) {
val logger = LoggerFactory.getLogger(getClass)
def start(): Unit = {
channel.basicQos(1)
channel.basicConsume(queue, false /* autoack */, this)
}
override
def handleDelivery(consumerTag: String, envelope: Envelope,
properties: AMQP.BasicProperties, body: Array[Byte]): Unit = {
val msg = new String(body, "UTF-8")
val deliveryTag = envelope.getDeliveryTag.toString
val exchange = envelope.getExchange
val routingKey = envelope.getRoutingKey
val priority = properties.getPriority.toString
val redeliver = if (envelope.isRedeliver) "Y" else "N"
val content = String.format("redeliver: %-2s | msg = %-6s | priority = %-2s | deliverTag = %-4s | exchange = %-10s | routingKey = %-10s ", redeliver, msg, priority, deliveryTag, exchange, routingKey)
logger.info(content)
Thread.sleep(1000 * workload) // 模拟处理一条消息需要耗时n秒
channel.basicAck(envelope.getDeliveryTag, false) // 处理完毕后,回送ack
}
}