@hainingwyx
2020-10-09T13:20:33.000000Z
字数 12470
阅读 1069
rabbitmq
示例代码:Direct交换器
import rabbitpywith rabbitpy.Connection() as connection:with connection.channel() as channel:exchange = rabbitpy.Exchange(channel, 'direct-example',exchange_type='direct')exchange.declare()
RPC worker消费图片实现面部识别,将结果发回给消息发布方。

注意:RabbitMQ最大帧大小为131072字节,,消息体超过这个大小,就需要在AMQP协议级别分块。预先分配占用7字节,因此,每个消息体帧只能承载131065字节图片数据。
示例代码:RPC Publisher
import osimport rabbitpyimport timefrom ch6 import utils# Open the channel and connectionconnection = rabbitpy.Connection()channel = connection.channel()exchange = rabbitpy.DirectExchange(channel, 'rpc-replies')exchange.declare()# Create the response queue that will automatically delete, is not durable and# is exclusive to this publisherqueue_name = 'response-queue-%s' % os.getpid()response_queue = rabbitpy.Queue(channel,queue_name,auto_delete=True,durable=False,exclusive=True)# Declare the response queueif response_queue.declare():print('Response queue declared')# Bind the response queueif response_queue.bind('rpc-replies', queue_name):print('Response queue bound')# Iterate through the images to send RPC requests forfor img_id, filename in enumerate(utils.get_images()):print('Sending request for image #%s: %s' % (img_id, filename))# Create the messagemessage = rabbitpy.Message(channel,utils.read_image(filename),{'content_type': utils.mime_type(filename),'correlation_id': str(img_id),'reply_to': queue_name},opinionated=True)# Pubish the messagemessage.publish('direct-rpc-requests', 'detect-faces')# Loop until there is a response messagemessage = Nonewhile not message:time.sleep(0.5)message = response_queue.get()# Ack the response messagemessage.ack()# Caculate how long it took from publish to responseduration = (time.time() -time.mktime(message.properties['headers']['first_publish']))print('Facial detection RPC call for image %s total duration: %s' %(message.properties['correlation_id'], duration))# Display the image in the IPython notebook interfaceutils.display_image(message.body, message.properties['content_type'])print('RPC requests processed')# Close the channel and connectionchannel.close()connection.close()
示例代码:RPC worker
import osimport rabbitpyimport timefrom ch6 import detectfrom ch6 import utils# Open the connection and the channelconnection = rabbitpy.Connection()channel = connection.channel()# Create the worker queuequeue_name = 'rpc-worker-%s' % os.getpid()queue = rabbitpy.Queue(channel, queue_name,auto_delete=True,durable=False,exclusive=True)# Declare the worker queueif queue.declare():print('Worker queue declared')# Bind the worker queueif queue.bind('direct-rpc-requests', 'detect-faces'):print('Worker queue bound')# Consume messages from RabbitMQfor message in queue.consume_messages():# Display how long it took for the message to get hereduration = time.time() - int(message.properties['timestamp'].strftime('%s'))print('Received RPC request published %.2f seconds ago' % duration)# Write out the message body to a temp file for facial detection processtemp_file = utils.write_temp_file(message.body,message.properties['content_type'])# Detect facesresult_file = detect.faces(temp_file)# Build response properties including the timestamp from the first publishproperties = {'app_id': 'Chapter 6 Listing 2 Consumer','content_type': message.properties['content_type'],'correlation_id': message.properties['correlation_id'],'headers': {'first_publish': message.properties['timestamp']}}# The result file could just be the original image if nothing detectedbody = utils.read_image(result_file)# Remove the temp fileos.unlink(temp_file)# Remove the result fileos.unlink(result_file)# Publish the response responseresponse = rabbitpy.Message(channel, body, properties, opinionated=True)response.publish('rpc-replies', message.properties['reply_to'])# Acknowledge the delivery of the RPC request messagemessage.ack()
示例代码
import rabbitpywith rabbitpy.Connection() as connection:with connection.channel() as channel:exchange = rabbitpy.Exchange(channel,'fanout-rpc-requests',exchange_type='fanout')exchange.declare()

示例程序:Publisher
import osimport rabbitpyimport timefrom ch6 import utils# Open the channel and connectionconnection = rabbitpy.Connection()channel = connection.channel()# Create the response queue that will automatically delete, is not durable and# is exclusive to this publisherqueue_name = 'response-queue-%s' % os.getpid()response_queue = rabbitpy.Queue(channel,queue_name,auto_delete=True,durable=False,exclusive=True)# Declare the response queueif response_queue.declare():print('Response queue declared')# Bind the response queueif response_queue.bind('rpc-replies', queue_name):print('Response queue bound')# Iterate through the images to send RPC requests forfor img_id, filename in enumerate(utils.get_images()):print 'Sending request for image #%s: %s' % (img_id, filename)# Create the messagemessage = rabbitpy.Message(channel,utils.read_image(filename),{'content_type': utils.mime_type(filename),'correlation_id': str(img_id),'reply_to': queue_name},opinionated=True)# Pubish the messagemessage.publish('fanout-rpc-requests')# Loop until there is a response messagemessage = Nonewhile not message:time.sleep(0.5)message = response_queue.get()# Ack the response messagemessage.ack()# Caculate how long it took from publish to responseduration = (time.time() -time.mktime(message.properties['headers']['first_publish']))print('Facial detection RPC call for image %s total duration: %s' %(message.properties['correlation_id'], duration))# Display the image in the IPython notebook interfaceutils.display_image(message.body, message.properties['content_type'])print 'RPC requests processed'# Close the channel and connectionchannel.close()connection.close()
示例程序:detect worker
import osimport rabbitpyimport timefrom ch6 import detectfrom ch6 import utils# Open the connection and the channelconnection = rabbitpy.Connection()channel = connection.channel()# Create the worker queuequeue_name = 'rpc-worker-%s' % os.getpid()queue = rabbitpy.Queue(channel, queue_name,auto_delete=True,durable=False,exclusive=True)# Declare the worker queueif queue.declare():print('Worker queue declared')# Bind the worker queueif queue.bind('fanout-rpc-requests'):print('Worker queue bound')# Consume messages from RabbitMQfor message in queue.consume_messages():# Display how long it took for the message to get hereduration = time.time() - int(message.properties['timestamp'].strftime('%s'))print('Received RPC request published %.2f seconds ago' % duration)# Write out the message body to a temp file for facial detection processtemp_file = utils.write_temp_file(message.body,message.properties['content_type'])# Detect facesresult_file = detect.faces(temp_file)# Build response properties including the timestamp from the first publishproperties = {'app_id': 'Chapter 6 Listing 2 Consumer','content_type': message.properties['content_type'],'correlation_id': message.properties['correlation_id'],'headers': {'first_publish': message.properties['timestamp']}}# The result file could just be the original image if nothing detectedbody = utils.read_image(result_file)# Remove the temp fileos.unlink(temp_file)# Remove the result fileos.unlink(result_file)# Publish the response responseresponse = rabbitpy.Message(channel, body, properties)response.publish('rpc-replies', message.properties['reply_to'])# Acknowledge the delivery of the RPC request messagemessage.ack()
示例程序:Hash Consumer
import osimport hashlibimport rabbitpy# Open the connection and the channelconnection = rabbitpy.Connection()channel = connection.channel()# Create the worker queuequeue_name = 'hashing-worker-%s' % os.getpid()queue = rabbitpy.Queue(channel, queue_name,auto_delete=True,durable=False,exclusive=True)# Declare the worker queueif queue.declare():print('Worker queue declared')# Bind the worker queueif queue.bind('fanout-rpc-requests'):print('Worker queue bound')# Consume messages from RabbitMQfor message in queue.consume_messages():# Create the hashing objecthash_obj = hashlib.md5(message.body)# Print out the info, this might go into a database or log fileprint('Image with correlation-id of %s has a hash of %s' %(message.properties['correlation_id'],hash_obj.hexdigest()))# Acknowledge the delivery of the RPC request messagemessage.ack()

示例代码
import rabbitpywith rabbitpy.Connection() as connection:with connection.channel() as channel:exchange = rabbitpy.Exchange(channel,'headers-rpc-requests',exchange_type='headers')exchange.declare()
示例程序:publisher
import osimport rabbitpyimport timefrom ch6 import utils# Open the channel and connectionconnection = rabbitpy.Connection()channel = connection.channel()# Create the response queue that will automatically delete, is not durable and# is exclusive to this publisherqueue_name = 'response-queue-%s' % os.getpid()response_queue = rabbitpy.Queue(channel,queue_name,auto_delete=True,durable=False,exclusive=True)# Declare the response queueif response_queue.declare():print('Response queue declared')# Bind the response queueif response_queue.bind('rpc-replies', queue_name):print('Response queue bound')# Iterate through the images to send RPC requests forfor img_id, filename in enumerate(utils.get_images()):print('Sending request for image #%s: %s' % (img_id, filename))# Create the messagemessage = rabbitpy.Message(channel,utils.read_image(filename),{'content_type': utils.mime_type(filename),'correlation_id': str(img_id),'headers': {'source': 'profile','object': 'image','action': 'new'},'reply_to': queue_name},opinionated=True)# Pubish the messagemessage.publish('headers-rpc-requests')# Loop until there is a response messagemessage = Nonewhile not message:time.sleep(0.5)message = response_queue.get()# Ack the response messagemessage.ack()# Caculate how long it took from publish to responseduration = (time.time() -time.mktime(message.properties['headers']['first_publish']))print('Facial detection RPC call for image %s total duration: %s' %(message.properties['correlation_id'], duration))# Display the image in the IPython notebook interfaceutils.display_image(message.body, message.properties['content_type'])print('RPC requests processed')# Close the channel and connectionchannel.close()connection.close()
示例程序:worker
import osimport rabbitpyimport timefrom ch6 import detectfrom ch6 import utils# Open the connection and the channelconnection = rabbitpy.Connection()channel = connection.channel()# Create the worker queuequeue_name = 'rpc-worker-%s' % os.getpid()queue = rabbitpy.Queue(channel, queue_name,auto_delete=True,durable=False,exclusive=True)# Declare the worker queueif queue.declare():print('Worker queue declared')# Bind the worker queueif queue.bind('headers-rpc-requests',arguments={'x-match': 'all','source': 'profile','object': 'image','action': 'new'}):print('Worker queue bound')# Consume messages from RabbitMQfor message in queue.consume_messages():# Display how long it took for the message to get hereduration = time.time() - int(message.properties['timestamp'].strftime('%s'))print('Received RPC request published %.2f seconds ago' % duration)# Write out the message body to a temp file for facial detection processtemp_file = utils.write_temp_file(message.body,message.properties['content_type'])# Detect facesresult_file = detect.faces(temp_file)# Build response properties including the timestamp from the first publishproperties = {'app_id': 'Chapter 6 Listing 2 Consumer','content_type': message.properties['content_type'],'correlation_id': message.properties['correlation_id'],'headers': {'first_publish': message.properties['timestamp']}}# The result file could just be the original image if nothing detectedbody = utils.read_image(result_file)# Remove the temp fileos.unlink(temp_file)# Remove the result fileos.unlink(result_file)# Publish the response responseresponse = rabbitpy.Message(channel, body, properties, opinionated=True)response.publish('rpc-replies', message.properties['reply_to'])# Acknowledge the delivery of the RPC request messagemessage.ack()
交换器间绑定,使用RPC方法Exchange.Bind。

示例代码:
import rabbitpywith rabbitpy.Connection() as connection:with connection.channel() as channel:tpc = rabbitpy.Exchange(channel, 'events',exchange_type='topic')tpc.declare()xch = rabbitpy.Exchange(channel, 'distributed-events',exchange_type='x-consistent-hash')xch.declare()xch.bind(foo, '#')
用于消息队列的负载均衡,可以提升吞吐量
示例代码:采用路由键的哈希值来分发消息
import rabbitpywith rabbitpy.Connection() as connection:with connection.channel() as channel:exchange = rabbitpy.Exchange(channel, 'image-storage',exchange_type='x-consistent-hash')exchange.declare()
示例代码:header中的属性值作为哈希值
import rabbitpywith rabbitpy.Connection() as connection:with connection.channel() as channel:exchange = rabbitpy.Exchange(channel, 'image-storage',exchange_type='x-consistent-hash',arguments={'hash-header': 'image-hash'})exchange.declare()
示例代码:队列的创建与绑定
import rabbitpywith rabbitpy.Connection() as connection:with connection.channel() as channel:for queue_num in range(4):queue = rabbitpy.Queue (channel, 'server%s' % queue_num)queue.declare()queue.bind('image-storage', '10')