[关闭]
@hainingwyx 2020-10-09T21:20:33.000000Z 字数 12470 阅读 813

消息路由

rabbitmq



direct交换器


特点

示例代码:Direct交换器

  1. import rabbitpy
  2. with rabbitpy.Connection() as connection:
  3. with connection.channel() as channel:
  4. exchange = rabbitpy.Exchange(channel, 'direct-example',
  5. exchange_type='direct')
  6. exchange.declare()

示例场景

RPC worker消费图片实现面部识别,将结果发回给消息发布方。

此处输入图片的描述

注意:RabbitMQ最大帧大小为131072字节,,消息体超过这个大小,就需要在AMQP协议级别分块。预先分配占用7字节,因此,每个消息体帧只能承载131065字节图片数据。

示例代码:RPC Publisher

  1. import os
  2. import rabbitpy
  3. import time
  4. from ch6 import utils
  5. # Open the channel and connection
  6. connection = rabbitpy.Connection()
  7. channel = connection.channel()
  8. exchange = rabbitpy.DirectExchange(channel, 'rpc-replies')
  9. exchange.declare()
  10. # Create the response queue that will automatically delete, is not durable and
  11. # is exclusive to this publisher
  12. queue_name = 'response-queue-%s' % os.getpid()
  13. response_queue = rabbitpy.Queue(channel,
  14. queue_name,
  15. auto_delete=True,
  16. durable=False,
  17. exclusive=True)
  18. # Declare the response queue
  19. if response_queue.declare():
  20. print('Response queue declared')
  21. # Bind the response queue
  22. if response_queue.bind('rpc-replies', queue_name):
  23. print('Response queue bound')
  24. # Iterate through the images to send RPC requests for
  25. for img_id, filename in enumerate(utils.get_images()):
  26. print('Sending request for image #%s: %s' % (img_id, filename))
  27. # Create the message
  28. message = rabbitpy.Message(channel,
  29. utils.read_image(filename),
  30. {'content_type': utils.mime_type(filename),
  31. 'correlation_id': str(img_id),
  32. 'reply_to': queue_name},
  33. opinionated=True)
  34. # Pubish the message
  35. message.publish('direct-rpc-requests', 'detect-faces')
  36. # Loop until there is a response message
  37. message = None
  38. while not message:
  39. time.sleep(0.5)
  40. message = response_queue.get()
  41. # Ack the response message
  42. message.ack()
  43. # Caculate how long it took from publish to response
  44. duration = (time.time() -
  45. time.mktime(message.properties['headers']['first_publish']))
  46. print('Facial detection RPC call for image %s total duration: %s' %
  47. (message.properties['correlation_id'], duration))
  48. # Display the image in the IPython notebook interface
  49. utils.display_image(message.body, message.properties['content_type'])
  50. print('RPC requests processed')
  51. # Close the channel and connection
  52. channel.close()
  53. connection.close()

示例代码:RPC worker

  1. import os
  2. import rabbitpy
  3. import time
  4. from ch6 import detect
  5. from ch6 import utils
  6. # Open the connection and the channel
  7. connection = rabbitpy.Connection()
  8. channel = connection.channel()
  9. # Create the worker queue
  10. queue_name = 'rpc-worker-%s' % os.getpid()
  11. queue = rabbitpy.Queue(channel, queue_name,
  12. auto_delete=True,
  13. durable=False,
  14. exclusive=True)
  15. # Declare the worker queue
  16. if queue.declare():
  17. print('Worker queue declared')
  18. # Bind the worker queue
  19. if queue.bind('direct-rpc-requests', 'detect-faces'):
  20. print('Worker queue bound')
  21. # Consume messages from RabbitMQ
  22. for message in queue.consume_messages():
  23. # Display how long it took for the message to get here
  24. duration = time.time() - int(message.properties['timestamp'].strftime('%s'))
  25. print('Received RPC request published %.2f seconds ago' % duration)
  26. # Write out the message body to a temp file for facial detection process
  27. temp_file = utils.write_temp_file(message.body,
  28. message.properties['content_type'])
  29. # Detect faces
  30. result_file = detect.faces(temp_file)
  31. # Build response properties including the timestamp from the first publish
  32. properties = {'app_id': 'Chapter 6 Listing 2 Consumer',
  33. 'content_type': message.properties['content_type'],
  34. 'correlation_id': message.properties['correlation_id'],
  35. 'headers': {
  36. 'first_publish': message.properties['timestamp']}}
  37. # The result file could just be the original image if nothing detected
  38. body = utils.read_image(result_file)
  39. # Remove the temp file
  40. os.unlink(temp_file)
  41. # Remove the result file
  42. os.unlink(result_file)
  43. # Publish the response response
  44. response = rabbitpy.Message(channel, body, properties, opinionated=True)
  45. response.publish('rpc-replies', message.properties['reply_to'])
  46. # Acknowledge the delivery of the RPC request message
  47. message.ack()

fanout交换器

特点

示例代码

  1. import rabbitpy
  2. with rabbitpy.Connection() as connection:
  3. with connection.channel() as channel:
  4. exchange = rabbitpy.Exchange(channel,
  5. 'fanout-rpc-requests',
  6. exchange_type='fanout')
  7. exchange.declare()

示例场景

此处输入图片的描述

示例程序:Publisher

  1. import os
  2. import rabbitpy
  3. import time
  4. from ch6 import utils
  5. # Open the channel and connection
  6. connection = rabbitpy.Connection()
  7. channel = connection.channel()
  8. # Create the response queue that will automatically delete, is not durable and
  9. # is exclusive to this publisher
  10. queue_name = 'response-queue-%s' % os.getpid()
  11. response_queue = rabbitpy.Queue(channel,
  12. queue_name,
  13. auto_delete=True,
  14. durable=False,
  15. exclusive=True)
  16. # Declare the response queue
  17. if response_queue.declare():
  18. print('Response queue declared')
  19. # Bind the response queue
  20. if response_queue.bind('rpc-replies', queue_name):
  21. print('Response queue bound')
  22. # Iterate through the images to send RPC requests for
  23. for img_id, filename in enumerate(utils.get_images()):
  24. print 'Sending request for image #%s: %s' % (img_id, filename)
  25. # Create the message
  26. message = rabbitpy.Message(channel,
  27. utils.read_image(filename),
  28. {'content_type': utils.mime_type(filename),
  29. 'correlation_id': str(img_id),
  30. 'reply_to': queue_name},
  31. opinionated=True)
  32. # Pubish the message
  33. message.publish('fanout-rpc-requests')
  34. # Loop until there is a response message
  35. message = None
  36. while not message:
  37. time.sleep(0.5)
  38. message = response_queue.get()
  39. # Ack the response message
  40. message.ack()
  41. # Caculate how long it took from publish to response
  42. duration = (time.time() -
  43. time.mktime(message.properties['headers']['first_publish']))
  44. print('Facial detection RPC call for image %s total duration: %s' %
  45. (message.properties['correlation_id'], duration))
  46. # Display the image in the IPython notebook interface
  47. utils.display_image(message.body, message.properties['content_type'])
  48. print 'RPC requests processed'
  49. # Close the channel and connection
  50. channel.close()
  51. connection.close()

示例程序:detect worker

  1. import os
  2. import rabbitpy
  3. import time
  4. from ch6 import detect
  5. from ch6 import utils
  6. # Open the connection and the channel
  7. connection = rabbitpy.Connection()
  8. channel = connection.channel()
  9. # Create the worker queue
  10. queue_name = 'rpc-worker-%s' % os.getpid()
  11. queue = rabbitpy.Queue(channel, queue_name,
  12. auto_delete=True,
  13. durable=False,
  14. exclusive=True)
  15. # Declare the worker queue
  16. if queue.declare():
  17. print('Worker queue declared')
  18. # Bind the worker queue
  19. if queue.bind('fanout-rpc-requests'):
  20. print('Worker queue bound')
  21. # Consume messages from RabbitMQ
  22. for message in queue.consume_messages():
  23. # Display how long it took for the message to get here
  24. duration = time.time() - int(message.properties['timestamp'].strftime('%s'))
  25. print('Received RPC request published %.2f seconds ago' % duration)
  26. # Write out the message body to a temp file for facial detection process
  27. temp_file = utils.write_temp_file(message.body,
  28. message.properties['content_type'])
  29. # Detect faces
  30. result_file = detect.faces(temp_file)
  31. # Build response properties including the timestamp from the first publish
  32. properties = {'app_id': 'Chapter 6 Listing 2 Consumer',
  33. 'content_type': message.properties['content_type'],
  34. 'correlation_id': message.properties['correlation_id'],
  35. 'headers': {
  36. 'first_publish': message.properties['timestamp']}}
  37. # The result file could just be the original image if nothing detected
  38. body = utils.read_image(result_file)
  39. # Remove the temp file
  40. os.unlink(temp_file)
  41. # Remove the result file
  42. os.unlink(result_file)
  43. # Publish the response response
  44. response = rabbitpy.Message(channel, body, properties)
  45. response.publish('rpc-replies', message.properties['reply_to'])
  46. # Acknowledge the delivery of the RPC request message
  47. message.ack()

示例程序:Hash Consumer

  1. import os
  2. import hashlib
  3. import rabbitpy
  4. # Open the connection and the channel
  5. connection = rabbitpy.Connection()
  6. channel = connection.channel()
  7. # Create the worker queue
  8. queue_name = 'hashing-worker-%s' % os.getpid()
  9. queue = rabbitpy.Queue(channel, queue_name,
  10. auto_delete=True,
  11. durable=False,
  12. exclusive=True)
  13. # Declare the worker queue
  14. if queue.declare():
  15. print('Worker queue declared')
  16. # Bind the worker queue
  17. if queue.bind('fanout-rpc-requests'):
  18. print('Worker queue bound')
  19. # Consume messages from RabbitMQ
  20. for message in queue.consume_messages():
  21. # Create the hashing object
  22. hash_obj = hashlib.md5(message.body)
  23. # Print out the info, this might go into a database or log file
  24. print('Image with correlation-id of %s has a hash of %s' %
  25. (message.properties['correlation_id'],
  26. hash_obj.hexdigest()))
  27. # Acknowledge the delivery of the RPC request message
  28. message.ack()

topic交换器

特点

示例场景

此处输入图片的描述

headers交换器

特点

示例代码

  1. import rabbitpy
  2. with rabbitpy.Connection() as connection:
  3. with connection.channel() as channel:
  4. exchange = rabbitpy.Exchange(channel,
  5. 'headers-rpc-requests',
  6. exchange_type='headers')
  7. exchange.declare()

示例程序:publisher

  1. import os
  2. import rabbitpy
  3. import time
  4. from ch6 import utils
  5. # Open the channel and connection
  6. connection = rabbitpy.Connection()
  7. channel = connection.channel()
  8. # Create the response queue that will automatically delete, is not durable and
  9. # is exclusive to this publisher
  10. queue_name = 'response-queue-%s' % os.getpid()
  11. response_queue = rabbitpy.Queue(channel,
  12. queue_name,
  13. auto_delete=True,
  14. durable=False,
  15. exclusive=True)
  16. # Declare the response queue
  17. if response_queue.declare():
  18. print('Response queue declared')
  19. # Bind the response queue
  20. if response_queue.bind('rpc-replies', queue_name):
  21. print('Response queue bound')
  22. # Iterate through the images to send RPC requests for
  23. for img_id, filename in enumerate(utils.get_images()):
  24. print('Sending request for image #%s: %s' % (img_id, filename))
  25. # Create the message
  26. message = rabbitpy.Message(channel,
  27. utils.read_image(filename),
  28. {'content_type': utils.mime_type(filename),
  29. 'correlation_id': str(img_id),
  30. 'headers': {'source': 'profile',
  31. 'object': 'image',
  32. 'action': 'new'},
  33. 'reply_to': queue_name},
  34. opinionated=True)
  35. # Pubish the message
  36. message.publish('headers-rpc-requests')
  37. # Loop until there is a response message
  38. message = None
  39. while not message:
  40. time.sleep(0.5)
  41. message = response_queue.get()
  42. # Ack the response message
  43. message.ack()
  44. # Caculate how long it took from publish to response
  45. duration = (time.time() -
  46. time.mktime(message.properties['headers']['first_publish']))
  47. print('Facial detection RPC call for image %s total duration: %s' %
  48. (message.properties['correlation_id'], duration))
  49. # Display the image in the IPython notebook interface
  50. utils.display_image(message.body, message.properties['content_type'])
  51. print('RPC requests processed')
  52. # Close the channel and connection
  53. channel.close()
  54. connection.close()

示例程序:worker

  1. import os
  2. import rabbitpy
  3. import time
  4. from ch6 import detect
  5. from ch6 import utils
  6. # Open the connection and the channel
  7. connection = rabbitpy.Connection()
  8. channel = connection.channel()
  9. # Create the worker queue
  10. queue_name = 'rpc-worker-%s' % os.getpid()
  11. queue = rabbitpy.Queue(channel, queue_name,
  12. auto_delete=True,
  13. durable=False,
  14. exclusive=True)
  15. # Declare the worker queue
  16. if queue.declare():
  17. print('Worker queue declared')
  18. # Bind the worker queue
  19. if queue.bind('headers-rpc-requests',
  20. arguments={'x-match': 'all',
  21. 'source': 'profile',
  22. 'object': 'image',
  23. 'action': 'new'}):
  24. print('Worker queue bound')
  25. # Consume messages from RabbitMQ
  26. for message in queue.consume_messages():
  27. # Display how long it took for the message to get here
  28. duration = time.time() - int(message.properties['timestamp'].strftime('%s'))
  29. print('Received RPC request published %.2f seconds ago' % duration)
  30. # Write out the message body to a temp file for facial detection process
  31. temp_file = utils.write_temp_file(message.body,
  32. message.properties['content_type'])
  33. # Detect faces
  34. result_file = detect.faces(temp_file)
  35. # Build response properties including the timestamp from the first publish
  36. properties = {'app_id': 'Chapter 6 Listing 2 Consumer',
  37. 'content_type': message.properties['content_type'],
  38. 'correlation_id': message.properties['correlation_id'],
  39. 'headers': {
  40. 'first_publish': message.properties['timestamp']}}
  41. # The result file could just be the original image if nothing detected
  42. body = utils.read_image(result_file)
  43. # Remove the temp file
  44. os.unlink(temp_file)
  45. # Remove the result file
  46. os.unlink(result_file)
  47. # Publish the response response
  48. response = rabbitpy.Message(channel, body, properties, opinionated=True)
  49. response.publish('rpc-replies', message.properties['reply_to'])
  50. # Acknowledge the delivery of the RPC request message
  51. message.ack()

交换器路由

交换器间绑定,使用RPC方法Exchange.Bind。

示例场景

test

示例代码:

  1. import rabbitpy
  2. with rabbitpy.Connection() as connection:
  3. with connection.channel() as channel:
  4. tpc = rabbitpy.Exchange(channel, 'events',
  5. exchange_type='topic')
  6. tpc.declare()
  7. xch = rabbitpy.Exchange(channel, 'distributed-events',
  8. exchange_type='x-consistent-hash')
  9. xch.declare()
  10. xch.bind(foo, '#')

一致性哈希交换器

用于消息队列的负载均衡,可以提升吞吐量

示例场景

示例代码:采用路由键的哈希值来分发消息

  1. import rabbitpy
  2. with rabbitpy.Connection() as connection:
  3. with connection.channel() as channel:
  4. exchange = rabbitpy.Exchange(channel, 'image-storage',
  5. exchange_type='x-consistent-hash')
  6. exchange.declare()

示例代码:header中的属性值作为哈希值

  1. import rabbitpy
  2. with rabbitpy.Connection() as connection:
  3. with connection.channel() as channel:
  4. exchange = rabbitpy.Exchange(channel, 'image-storage',
  5. exchange_type='x-consistent-hash',
  6. arguments={'hash-header': 'image-hash'})
  7. exchange.declare()

示例代码:队列的创建与绑定

  1. import rabbitpy
  2. with rabbitpy.Connection() as connection:
  3. with connection.channel() as channel:
  4. for queue_num in range(4):
  5. queue = rabbitpy.Queue (channel, 'server%s' % queue_num)
  6. queue.declare()
  7. queue.bind('image-storage', '10')
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注