[关闭]
@xtccc 2017-04-09T10:19:40.000000Z 字数 11882 阅读 2541

消息处理

给我写信
GitHub

此处输入图片的描述

RabbitMQ


目录:


1. Producing Messages


发送消息时,不需要对producer进行流量控制,他可以消息全部发到exchange中。

  1. package cn.gridx.rabbitmq.tutorials
  2. import com.rabbitmq.client.{BuiltinExchangeType, MessageProperties, ConnectionFactory}
  3. import org.slf4j.LoggerFactory
  4. object Sender {
  5. def HOST = "xx"
  6. def USER = "yyy"
  7. def VHOST = "vhost_tao"
  8. def PASSWD = "zzzz"
  9. def EXCHANGE = "Hub"
  10. def ROUTING_KEY = "G40"
  11. def Q = "Q"
  12. val logger = LoggerFactory.getLogger(getClass)
  13. def main(args: Array[String]) = {
  14. val factory = new ConnectionFactory()
  15. factory.setHost(HOST)
  16. factory.setUsername(USER)
  17. factory.setVirtualHost(VHOST)
  18. factory.setPassword(PASSWD)
  19. logger.info("sender已创建")
  20. val conn = factory.newConnection()
  21. val channel = conn.createChannel()
  22. // 声明1个topic类型的exchange
  23. channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC, true)
  24. logger.info(s"已创建exchange: $EXCHANGE")
  25. // 声明1个名为"Q"的queue
  26. channel.queueDeclare(Q, true/*durable*/, false/*exclusive*/, false/*autoDelete*/, null)
  27. logger.info(s"已声明Queue: $Q")
  28. // 创建从exchange到queue的binding,为这个binding指定routing_key
  29. channel.queueBind(Q, EXCHANGE, ROUTING_KEY)
  30. logger.info(s"exchange ($EXCHANGE) 已通过 routing key ($ROUTING_KEY) 绑定到 queue ($Q)")
  31. var i = 1
  32. logger.info("开始发送")
  33. while (i <= 10) {
  34. val msg = s"#$i"
  35. channel.basicPublish(EXCHANGE, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes)
  36. logger.info(s"#$i => ($EXCHANGE, $ROUTING_KEY)")
  37. i += 1
  38. }
  39. channel.close()
  40. conn.close()
  41. logger.info("发送结束")
  42. }
  43. }

上面的代码向exchange("Hub")发送了10条消息,这些消息全部被发送到名为"Q"的queue。



2. Consuming Messages


2.1 获取消息的两种途径

Queues are like named mailboxes. They’re where messages end up and wait to be consumed. Consumers receive messages from a particular queue in one of two ways:



下面是subscribe模式的例子。

  1. package cn.gridx.rabbitmq.tutorials
  2. import com.rabbitmq.client._
  3. import org.slf4j.LoggerFactory
  4. object Receiver {
  5. def HOST = "xx"
  6. def VHOST = "vhost_tao"
  7. def USER = "yyy"
  8. def PASSWD = "zzzz"
  9. def EXCHANGE = "Hub"
  10. def ROUTING_KEY = "G40"
  11. def Q = "Q"
  12. val logger = LoggerFactory.getLogger(getClass)
  13. def main(args: Array[String]): Unit = {
  14. val factory = new ConnectionFactory()
  15. factory.setHost(HOST)
  16. factory.setUsername(USER)
  17. factory.setVirtualHost(VHOST)
  18. factory.setPassword(PASSWD)
  19. val conn = factory.newConnection()
  20. val channel = conn.createChannel()
  21. logger.info("channel已创建")
  22. // 在channel中声明1个queue
  23. channel.queueDeclare(Q, true, false, false, null)
  24. logger.info(s"queue已声明: $Q")
  25. // 创建3个consumers, 它们会试图消费同一个channel中的消息
  26. /** By default, RabbitMQ will send each message to the next consumer, in sequence (round-robin fashion) */
  27. val consumer1 = new TheConsumer("A", Q, channel)
  28. val consumer2 = new TheConsumer("B", Q, channel)
  29. val consumer3 = new TheConsumer("C", Q, channel)
  30. logger.info("3个Consumers (A/B/C) 已就绪 ... ")
  31. /**
  32. * RabbitMQ just dispatches a message when the message enters the queue.
  33. * It doesn't look at the number of unacknowledged messages for a consumer
  34. * */
  35. consumer1.start()
  36. consumer2.start()
  37. consumer3.start()
  38. logger.info(s"正在主程序中 ....")
  39. }
  40. }
  41. /** Consumer Class */
  42. class TheConsumer(consumerName: String, queue: String, channel: Channel) extends DefaultConsumer(channel) {
  43. val logger = LoggerFactory.getLogger(getClass)
  44. def start(): Unit = {
  45. // 调用一次即可
  46. channel.basicConsume(queue, false, this)
  47. }
  48. /** 收到消息后的回调方法 */
  49. override
  50. def handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: Array[Byte]): Unit = {
  51. val msg = new String(body, "UTF-8")
  52. val deliveryTag = envelope.getDeliveryTag.toString
  53. val exchange = envelope.getExchange
  54. val routingKey = envelope.getRoutingKey
  55. val redeliver = if (envelope.isRedeliver) "Y" else "N"
  56. val content = String.format("consumer name: %-4s | redeliver: %-2s | msg = %-6s | deliverTag = %-4s | exchange = %-10s | routingKey = %-10s ",
  57. consumerName, redeliver, msg, deliveryTag, exchange, routingKey)
  58. logger.info(content)
  59. Thread.sleep(1000) // 模拟处理一条消息需要耗时1s
  60. channel.basicAck(envelope.getDeliveryTag, false) // 处理完毕后,回送ack
  61. // 不要再次调用basicConsume
  62. }
  63. }

如果我们先运行Consumer程序,然后再运行Producer程序,则会看到Consumer程序的输出如下:
此处输入图片的描述


2.2 Round-robin Dispatching

By default, RabbitMQ will send each message to the next consumer, in sequence (round-robin fashion)

多个consumer公用一个channel

可以看到,3个consumer都在试图消费同一个queue中的消息(它们共享同一个channel),因此RabbitMQ轮流将消息message发送给它们,即:第一条送给A,等A处理完后,再将下一条送给B,等B处理完后,再将下一条送给C,以此类推
这里处理完的意思是:consumer回送一个ACK(我们在调用basicConsume时将autoAck设置为了false)。

可见,在某一个channel中,如果一条已派发出去的消息没有被consumer回应(acknowledged),则下一条消息不会被派发到该channel中

所以,虽然这里有3个consumer存在,但是总是有2个consumer处于空闲的状态,怎么让3个consumer都忙起来呢? 让每个consumer拥有自己的channel

每个consumer拥有自己的channel

代码稍作修改:

  1. object Receiver {
  2. ....
  3. def main(args: Array[String]) {
  4. ....
  5. // 创建3个consumer, 它们会在各自的channel中共同消费同一个Queue中的message
  6. /** By default, RabbitMQ will send each message to the next consumer, in sequence (round-robin fashion) */
  7. val consumer1 = {
  8. val channel = conn.createChannel()
  9. channel.queueDeclare(Q, true, false, false, null)
  10. logger.info("A的channel已创建")
  11. new TheConsumer("A", Q, channel)
  12. }
  13. val consumer2 = {
  14. val channel = conn.createChannel()
  15. channel.queueDeclare(Q, true, false, false, null)
  16. logger.info("B的channel已创建")
  17. new TheConsumer("B", Q, channel)
  18. }
  19. val consumer3 = {
  20. val channel = conn.createChannel()
  21. channel.queueDeclare(Q, true, false, false, null)
  22. logger.info("C的channel已创建")
  23. new TheConsumer("C", Q, channel)
  24. }
  25. logger.info("3个Consumers (A/B/C) 已就绪 ... ")
  26. }
  27. ...
  28. }

再次运行Receiver程序,可以看到:
此处输入图片的描述

Acknowledgement

如果一个consumer收到了一条消息后,始终不回送ack,那么会怎么样?
它就再也不会收到新的消息,已收到的但是还未回应ACK的那条消息也不会再次被发送到其他consumer。但是如果它挂了(channel关闭,连接断开,或者或者应用崩溃),那条未被回应ACK的消息则会被重新发送到下一个可用的consumer。


在多个consumers之间分发消息

  1. 一个consumer收到一条消息后,如果它没有回应ack,那么即使queue中还有消息,也不会再向该consumer发送消息。但是,queue的下一条消息会继续发送给另一个consumer。如果所有consumer都没有回送ack,那么queue中的其他消息就不会再被发送给任何一个consumer了,除非某个consumer回送了ack,或者又有新的consumer订阅了该queue。
  2. 如果多个consumer订阅了同一个queue,每个consumer有自己的channel,那么当1条消息发送给了消费者A之后,下一条消息会发送消费者B,以此类推。


同时接收处理多条消息

  1. class TheConsumer(consumerName: String, queue: String, conn: Connection, workload: Int) {
  2. val logger = LoggerFactory.getLogger(getClass)
  3. def start(): Unit = {
  4. // 如果需要同同时最多消费3个message(channel不设置autoAck, 并且这个三个message都没有返回ack)
  5. // 那么必须创建3个channel
  6. // 对于一个channel, 如果它的handler没有调用basci ack,是无论如何也无法收到下一条消息的
  7. for (i <- 0 until 3) {
  8. val channel = conn.createChannel()
  9. channel.queueDeclare(queue, true, false, false, null)
  10. channel.basicQos(1)
  11. logger.info(s"${consumerName}的channel${i}已创建")
  12. channel.basicConsume(queue, false /* autoack */ , new MsgHandler(i.toString, channel))
  13. }
  14. }
  15. class MsgHandler(name: String, channel: Channel) extends DefaultConsumer(channel) {
  16. logger.info(s"MsgHandler $name is instantiated ...")
  17. override
  18. def handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: Array[Byte]): Unit = {
  19. val msg = new String(body, "UTF-8")
  20. val deliveryTag = envelope.getDeliveryTag.toString
  21. val exchange = envelope.getExchange
  22. val routingKey = envelope.getRoutingKey
  23. val redeliver = if (envelope.isRedeliver) "Y" else "N"
  24. val headers = properties.getHeaders
  25. val jobType = headers.get("job-type")
  26. val content = String.format("consumer name: %-4s | redeliver: %-2s | jobType = %-10s | msg = %-6s | deliverTag = %-4s | exchange = %-10s | routingKey = %-10s ", consumerName, redeliver, jobType, msg, deliveryTag, exchange, routingKey)
  27. logger.info(content)
  28. Thread.sleep(1000 * workload) // 模拟处理一条消息需要耗时n秒
  29. channel.basicAck(envelope.getDeliveryTag, false) // 处理完毕后,回送ack
  30. // 不需要再次调用 channel.basicConsume(queue, false, this)
  31. }
  32. }
  33. }


2.3 Dead letters


2.4 QoS(Channel Prefetch Settring)

消息首先被发送给某个channel,然后channel再发送给consumer。在任意时刻,一个channel里都可能有多条消息(unacknowledged messages)等待被送至consumer,但是如果consumer还没ack上一条消息,那么channel中的消息就不会被送至consumer。

通过对consumer的channel设置basic.qos,可以限制channle中unacknowledged messages的最大数量。如果达到了这个数量,则RabbitQos就不会再向该channel发送消息,直到该channle中的unacknowledged messages的数量小于预设的值。QoS也被称为prefetch count




3. Order & Priority


参考


3.1 消息delivery的顺序

同一条通路里面的消息,是满足FIFO的顺序的。
这里,同一条通路,指的是producer -> channel -> route -> channel -> consumer。
如果多个consumer订阅了同一个queue,则这些consumers之间的消息顺序是无法保证的。

3.2 Priority Queue

如果一个queue在声明时,为其加入属性x-max-priority(该queue中的消息所允许指定的最高优先级),则它就是一个priority queue,里面的message都是可以指定优先级的(通过basic.propertiespriority属性)。数字越大,优先级越高。、

消息的prioroty属性实际上以一个无符号字节来记录,所以消息的优先级的范围是0~255。

未定义priority的消息的优先级事实上为0。

注意:如果producer在声明一个queue时将其指定为priority queue,那么consumer在订阅该queue时,也必须将其指定为priority queue,否则会有异常。

  1. import java.util
  2. import com.rabbitmq.client._
  3. import org.slf4j.LoggerFactory
  4. object PriorityQueue {
  5. val logger = LoggerFactory.getLogger(getClass)
  6. def HOST = "xx"
  7. def USER = "yyyy"
  8. def VHOST = "vhost_tao"
  9. def PASSWD = "zzzzz"
  10. def EXCHANGE = "Hub"
  11. def ROUTING_KEY = "G25"
  12. def Q25 = "Q25"
  13. val PersistentMode = 2
  14. val TextMsgType = "text/plain"
  15. def main(args: Array[String]): Unit = {
  16. val factory = new ConnectionFactory()
  17. factory.setHost(HOST)
  18. factory.setUsername(USER)
  19. factory.setVirtualHost(VHOST)
  20. factory.setPassword(PASSWD)
  21. logger.info("ConnectionFactory已创建")
  22. sendMsg(factory)
  23. receiveMsg(factory)
  24. }
  25. /**
  26. * 创建一个priority queue, 并向其发送消息
  27. * */
  28. def sendMsg(factory: ConnectionFactory): Unit = {
  29. val conn = factory.newConnection()
  30. val channel = conn.createChannel()
  31. channel.confirmSelect()
  32. logger.info("Channel已创建")
  33. channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC, true)
  34. logger.info(s"已声明exchange: $EXCHANGE")
  35. val queueArgs: util.HashMap[String, Object] = new java.util.HashMap[String, Object]()
  36. queueArgs.put("x-max-priority", new Integer(5))
  37. channel.queueDeclare(Q25, true, false, false, queueArgs)
  38. logger.info(s"声明了一个优先级最高为5的queue: ${Q25}")
  39. channel.queueBind(Q25, EXCHANGE, ROUTING_KEY)
  40. logger.info(s"exchange ($EXCHANGE) 已通过 routing key ($ROUTING_KEY) 绑定到 queue ($Q25)")
  41. logger.info("开始发送30条消息,其中前20条消息的优先级为1, 后10条的优先级为3")
  42. val propertiesBuilder = new AMQP.BasicProperties().builder()
  43. propertiesBuilder.contentType(TextMsgType).deliveryMode(PersistentMode).priority(1)
  44. for (i <- 1 to 20) {
  45. val msg = s"Msg-#$i"
  46. channel.basicPublish(EXCHANGE, ROUTING_KEY, propertiesBuilder.build(), msg.getBytes())
  47. }
  48. propertiesBuilder.contentType("text/plain").deliveryMode(2).priority(5)
  49. for (i <- 21 to 30) {
  50. val msg = s"Msg-#$i"
  51. channel.basicPublish(EXCHANGE, ROUTING_KEY, propertiesBuilder.build(), msg.getBytes())
  52. }
  53. channel.waitForConfirmsOrDie()
  54. logger.info("发送结束")
  55. }
  56. /**
  57. * 从priority queue中接收消息
  58. * */
  59. def receiveMsg(factory: ConnectionFactory) = {
  60. val conn = factory.newConnection()
  61. val channel = conn.createChannel()
  62. logger.info("Channel 已创建")
  63. // sender已经在声明queue时加入了优先级的属性
  64. // 这里的receiver在声明queue时仍然必须加入优先级的属性
  65. val queueArgs: util.HashMap[String, Object] = new java.util.HashMap[String, Object]()
  66. queueArgs.put("x-max-priority", new Integer(5))
  67. channel.queueDeclare(Q25, true, false, false, queueArgs)
  68. channel.basicQos(1)
  69. logger.info("queue 已声明一个最高优先级为5的queue")
  70. val consumer = new TheConsumer("my-consumer", Q25, channel, 1)
  71. consumer.start()
  72. logger.info("consumer已启动")
  73. }
  74. }
  75. class TheConsumer(consumerName: String, queue: String, channel: Channel, workload: Int)
  76. extends DefaultConsumer(channel) {
  77. val logger = LoggerFactory.getLogger(getClass)
  78. def start(): Unit = {
  79. channel.basicQos(1)
  80. channel.basicConsume(queue, false /* autoack */, this)
  81. }
  82. override
  83. def handleDelivery(consumerTag: String, envelope: Envelope,
  84. properties: AMQP.BasicProperties, body: Array[Byte]): Unit = {
  85. val msg = new String(body, "UTF-8")
  86. val deliveryTag = envelope.getDeliveryTag.toString
  87. val exchange = envelope.getExchange
  88. val routingKey = envelope.getRoutingKey
  89. val priority = properties.getPriority.toString
  90. val redeliver = if (envelope.isRedeliver) "Y" else "N"
  91. val content = String.format("redeliver: %-2s | msg = %-6s | priority = %-2s | deliverTag = %-4s | exchange = %-10s | routingKey = %-10s ", redeliver, msg, priority, deliveryTag, exchange, routingKey)
  92. logger.info(content)
  93. Thread.sleep(1000 * workload) // 模拟处理一条消息需要耗时n秒
  94. channel.basicAck(envelope.getDeliveryTag, false) // 处理完毕后,回送ack
  95. }
  96. }


添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注