[关闭]
@hainingwyx 2020-10-13T21:48:30.000000Z 字数 3924 阅读 758

消息发布

rabbitmq



可靠投递


mandatory

当交换器无法路由消息,RabbitMQ将回发Basic.Return消息到发布者,同时回发完整消息。
Basic.Return是异步的,在消息发布后的任何时候都可能发生。
在rabbitpy库中,客户端自动接收Basic.Return,并触发异常

示例程序:发布失败

  1. import datetime
  2. import rabbitpy
  3. # Connect to the default URL of amqp://guest:guest@localhost:15672/%2F
  4. with rabbitpy.Connection() as connection:
  5. with connection.channel() as channel:
  6. # Create the message to send
  7. body = 'server.cpu.utilization 25.5 1350884514'
  8. message = rabbitpy.Message(channel,
  9. body,
  10. {'content_type': 'text/plain',
  11. 'timestamp': datetime.datetime.now(),
  12. 'message_type': 'graphite metric'})
  13. # Publish the message to the exchange with the routing key
  14. # "server-metrics" and make sure it is routed to the exchange
  15. message.publish('chapter2-example', 'server-metrics', mandatory=True)

示例程序:异常捕获

  1. import datetime
  2. import rabbitpy
  3. # Connect to the default URL of amqp://guest:guest@localhost:15672/%2F
  4. connection = rabbitpy.Connection()
  5. try:
  6. with connection.channel() as channel:
  7. properties = {'content_type': 'text/plain',
  8. 'timestamp': datetime.datetime.now(),
  9. 'message_type': 'graphite metric'}
  10. body = 'server.cpu.utilization 25.5 1350884514'
  11. message = rabbitpy.Message(channel, body, properties)
  12. message.publish('chapter2-example',
  13. 'server-metrics',
  14. mandatory=True)
  15. except rabbitpy.exceptions.MessageReturnedException as error:
  16. print('Publish failure: %s' % error)

发布者确认

发布者发送给RabbitMQ的每条消息,服务器发送一个确认(Basic.Ack)或者否认相应(Basic.Nack)。

示例程序:发布者确认

  1. import rabbitpy
  2. with rabbitpy.Connection() as connection:
  3. with connection.channel() as channel:
  4. exchange = rabbitpy.Exchange(channel, 'chapter4-example')
  5. exchange.declare()
  6. channel.enable_publisher_confirms()
  7. message = rabbitpy.Message(channel,
  8. 'This is an important message',
  9. {'content_type': 'text/plain',
  10. 'message_type': 'very important'})
  11. if message.publish('chapter4-example', 'important.message'):
  12. print('The message was confirmed')

rabbitpy中没有使用回调的方法,而是在确认收到之前会一直阻塞。

备用交换器

处理无法路由的消息。当不可路由的消息发布到已经定义了备用交换器的交换器中时,它将被路由到备用交换器。

示例程序:备用交换器

  1. import rabbitpy
  2. with rabbitpy.Connection() as connection:
  3. with connection.channel() as channel:
  4. my_ae = rabbitpy.Exchange(channel, 'my-ae',
  5. exchange_type='fanout')
  6. my_ae.declare()
  7. args = {'alternate-exchange': my_ae.name}
  8. exchange = rabbitpy.Exchange(channel,
  9. 'graphite',
  10. exchange_type='topic',
  11. arguments=args)
  12. exchange.declare()
  13. queue = rabbitpy.Queue(channel, 'unroutable-messages')
  14. queue.declare()
  15. if queue.bind(my_ae, '#'):
  16. print('Queue bound to alternate-exchange')

基于事务的批量处理

确保消息投递成功。

  1. import rabbitpy
  2. with rabbitpy.Connection() as connection:
  3. with connection.channel() as channel:
  4. tx = rabbitpy.Tx(channel)
  5. tx.select()
  6. message = rabbitpy.Message(channel,
  7. 'This is an important message',
  8. {'content_type': 'text/plain',
  9. 'delivery_mode': 2,
  10. 'message_type': 'important'})
  11. message.publish('chapter4-example', 'important.message')
  12. try:
  13. if tx.commit():
  14. print('Transaction committed')
  15. except rabbitpy.exceptions.NoActiveTransactionError:
  16. print('Tried to commit without active transaction')

由于错误无法路由消息时,将及时返回Basic.Return。发布者应发送TX.RollbackRPC请求并等待来自代理服务器的TX.Rollback相应,然后继续后续的工作。

HA队列

允许队列在多个服务器拥有冗余副本。
发布者发送消息到集群中的任何节点。RabbitMQ节点同步队列中消息的状态。发布的消息被放入队列,并存储在每台服务器上。

示例代码:HA队列声明

  1. import rabbitpy
  2. connection = rabbitpy.Connection()
  3. try:
  4. with connection.channel() as channel:
  5. queue = rabbitpy.Queue(channel,
  6. 'my-ha-queue',
  7. arguments={'x-ha-policy': 'all'})
  8. if queue.declare():
  9. print('Queue declared')
  10. except rabbitpy.exceptions.RemoteClosedChannelException as error:
  11. print('Queue declare failed: %s' % error)

示例代码:HA队列指定节点

  1. import rabbitpy
  2. connection = rabbitpy.Connection()
  3. try:
  4. with connection.channel() as channel:
  5. arguments = {'x-ha-policy': 'nodes',
  6. 'x-ha-nodes': ['rabbit@node1',
  7. 'rabbit@node2',
  8. 'rabbit@node3']}
  9. queue = rabbitpy.Queue(channel,
  10. 'my-2nd-ha-queue',
  11. arguments=arguments)
  12. if queue.declare():
  13. print('Queue declared')
  14. except rabbitpy.exceptions.RemoteClosedChannelException as error:
  15. print('Queue declare failed: %s' % error)

消息持久化

delivery-mode=1,保存到内存;delivery-mode=2,保存到磁盘。服务器重启后,消息仍在队列中(队列必须声明为durable)。

如果rabbitmq经常等待操作系统响应读写请求,消息吞吐量将大大降低。

rabbitmq回推

发布者发布消息太快,RabbitMQ发送Channel.Flow RPC方法,阻塞发布者。发布者不能发送消息直到收到另一条Channel.Flow命令。

示例代码:检测连接状态

  1. import rabbitpy
  2. connection = rabbitpy.Connection()
  3. print('Channel is Blocked? %s' % connection.blocked)
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注