@Scrazy
2017-03-26T04:00:24.000000Z
python
rabbitmq
RabbitMQ is a message broker. The principal idea is pretty simple: it accepts and forwards messages. You can think about it as a post office: when you send mail to the post box you're pretty sure that Mr. Postman will eventually deliver the mail to your recipient. Using this metaphor RabbitMQ is a post box, a post office and a postman.
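Simple request/reply (Request/Reply)
Implemented with a direct exchange.
Broadcast
No specific receiver is targeted; implemented with a fanout exchange.
Publish/Subscribe
Producers publish messages and consumers subscribe to the content they are interested in. If no consumer has subscribed, the message is discarded.
The Hello World flow is easy to follow.
First, create a directory: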
> mkdir rabbitmq
Next comes the code for our producer P, send.py
# coding=utf-8
import sys
import pika  # pure-Python client for the AMQP 0-9-1 protocol: pip install pika

# Connect to the RabbitMQ server
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')  # declare a queue named hello

if len(sys.argv) != 1:
    body = sys.argv[1]
else:
    body = "Hello World!"

# Publish the message.
# With exchange left empty, the default exchange delivers the message straight
# to the queue named by routing_key. This is not recommended; adding an
# exchange is covered later.
channel.basic_publish(exchange='',
                      routing_key='hello',  # the routing key names the target queue
                      body=body)
print("[x] Sent {}.".format(body))
connection.close()  # remember to close the connection
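Next, the consumer code, receive.py
# coding=utf-8
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host="localhost"))
channel = connection.channel()
# The consumer declares the queue too; it is created if it does not exist,
# so it does not matter whether the consumer or the producer starts first.
channel.queue_declare(queue='hello')

# Callback that handles each received message
def callback(ch, method, properties, body):
    print("[x] Receive {}".format(body))

# pika 0.x signature; pika >= 1.0 uses
# basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)  # explicitly disable acknowledgements; see below
print('[*] Waiting for messages. To exit press Ctrl+C')
try:
    channel.start_consuming()  # start consuming
except KeyboardInterrupt:
    channel.stop_consuming()  # stop consuming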
Now let's test the code. First, start RabbitMQ:
> sudo rabbitmq-server
Run send.py in one terminal:
~/rabbitmq ▶ ︎python send.py
~/rabbitmq ▶︎︎ python send.py 'Hello, lambda'
[x] Sent Hello, lambda.
~/rabbitmq ▶︎︎ python send.py
[x] Sent Hello World!.
Run receive.py in another terminal:
~/rabbitmq ▶︎︎ python receive.py
[*] Waiting for messages. To exit press Ctrl+C
[x] Receive b'Hello, lambda'
[x] Receive b'Hello World!'
Press Ctrl+C to stop the program.
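We have just walked through a simple hello world. Next we build a Work Queue that distributes time-consuming operations among multiple consumers. The idea behind it is to avoid running a time-consuming operation immediately; instead, the operation is added to a queue and consumers running in the background execute the queued tasks.
Below, time.sleep() simulates the time-consuming work: the number of '.' characters in the message gives the number of seconds to sleep. First modify the send.py script so that it accepts arbitrary command-line arguments, and save it as a new file, new_task.py
import sys
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,
                      ))
print("[x] Sent %r." % message)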
Likewise, modify the receive.py script and create worker.py based on it
import time
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print("[x] Done")
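To make the dot convention concrete, here is a minimal sketch of the same calculation outside of any callback. The helper names work_seconds and handle are hypothetical, and the sleep function is injectable only so the example can be exercised without actually waiting.

```python
import time


def work_seconds(body: bytes) -> int:
    """Return how long a task should 'take': one second per dot in the body."""
    return body.count(b".")


def handle(body: bytes, sleep=time.sleep) -> int:
    """Simulate processing a task by sleeping for work_seconds(body)."""
    seconds = work_seconds(body)
    sleep(seconds)
    return seconds


# b"Hello..." contains three dots, so the task would take three seconds.
print(work_seconds(b"Hello..."))
```

Note that the body arrives as bytes, which is why the count uses b'.' rather than '.'.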
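One advantage of a task queue is that work is easy to parallelize. By default RabbitMQ hands tasks out to the workers evenly, one after another. We will test this shortly.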
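The default distribution can be pictured with a small simulation. This is only a sketch of round-robin assignment in plain Python, not RabbitMQ code; the function name round_robin and the worker names are made up for illustration.

```python
from itertools import cycle


def round_robin(tasks, workers):
    """Assign tasks to workers in turn, ignoring how long each task takes."""
    assignment = {worker: [] for worker in workers}
    for task, worker in zip(tasks, cycle(workers)):
        assignment[worker].append(task)
    return assignment


result = round_robin(["t1", "t2", "t3", "t4", "t5"], ["worker_a", "worker_b"])
print(result)
```

With two workers, the odd-numbered tasks land on the first worker and the even-numbered ones on the second, regardless of their cost.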
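Message acknowledgements let us know whether a consumer completed a task successfully. If a worker process dies unexpectedly, RabbitMQ hands the task to another worker (if there is one). Acknowledgements are on by default; the earlier example turned them off explicitly. So let's change that:
def callback(ch, method, properties, body):
    print(" [x] Received %r" % (body,))
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)  # confirm the task is finished
channel.basic_consume(callback,
                      queue='hello')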
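The bookkeeping behind acknowledgements can be sketched with an in-memory stand-in. ToyQueue and its methods are hypothetical names, and the sketch simplifies real broker behavior (for instance, it requeues at the back of the queue); it only shows why an unacknowledged message is not lost when its consumer dies.

```python
from collections import deque


class ToyQueue:
    """In-memory stand-in for a queue with manual acknowledgements."""

    def __init__(self):
        self.ready = deque()   # messages waiting for delivery
        self.unacked = {}      # delivery tag -> (consumer, body)
        self._next_tag = 1

    def publish(self, body):
        self.ready.append(body)

    def deliver(self, consumer):
        """Hand the next ready message to consumer; it stays unacked for now."""
        body = self.ready.popleft()
        tag = self._next_tag
        self._next_tag += 1
        self.unacked[tag] = (consumer, body)
        return tag, body

    def ack(self, tag):
        """The consumer finished the task, so the message can be forgotten."""
        del self.unacked[tag]

    def consumer_died(self, consumer):
        """Put everything the dead consumer had not acknowledged back in the queue."""
        for tag, (owner, body) in sorted(self.unacked.items()):
            if owner == consumer:
                del self.unacked[tag]
                self.ready.append(body)


q = ToyQueue()
q.publish("task-1")
q.publish("task-2")
tag1, _ = q.deliver("worker_a")
tag2, _ = q.deliver("worker_b")
q.ack(tag2)                   # worker_b finishes and acknowledges
q.consumer_died("worker_a")   # worker_a crashes before acknowledging
print(list(q.ready), q.unacked)
```

After the crash, task-1 is ready for delivery again while task-2 is gone for good because it was acknowledged.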
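One thing to watch out for is the **forgotten acknowledgement**. Missing the basic_ack call is a common mistake. When it happens, messages are redelivered once the client quits, and RabbitMQ uses more and more memory because it cannot release unacknowledged messages. To debug this, use rabbitmqctl to print the messages_unacknowledged field.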
~/rabbitmq ▶︎︎ sudo rabbitmqctl list_queues messages_ready messages_unacknowledged
[sudo] password for mouse:
Listing queues ...
0 0
0 0
0 0
1 0
0 0
0 0
0 0
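We now know how tasks are handled correctly when a client dies, but what happens if RabbitMQ itself goes down?
The answer is durability (durable). Because RabbitMQ does not allow an existing queue to be redeclared with different parameters, we declare a new queue.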
channel.queue_declare(queue='task_queue', durable=True)
Besides declaring the task queue (task_queue) as durable, the messages themselves must also be made persistent.
channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
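The two conditions (a durable queue and persistent messages) can be illustrated with a toy model of a broker restart. ToyBroker is a hypothetical in-memory class, not RabbitMQ's implementation, and real persistence guarantees are weaker than this sketch suggests; it only shows which combinations survive.

```python
class ToyBroker:
    """Toy broker: only durable queues and persistent messages survive restart()."""

    def __init__(self):
        # queue name -> {"durable": bool, "messages": [(body, persistent), ...]}
        self.queues = {}

    def queue_declare(self, name, durable=False):
        existing = self.queues.get(name)
        if existing is None:
            self.queues[name] = {"durable": durable, "messages": []}
        elif existing["durable"] != durable:
            # Mirrors the broker refusing to redeclare a queue with other parameters.
            raise ValueError("queue %r already exists with durable=%r"
                             % (name, existing["durable"]))

    def publish(self, queue, body, delivery_mode=1):
        # delivery_mode=2 marks the message as persistent.
        self.queues[queue]["messages"].append((body, delivery_mode == 2))

    def restart(self):
        survivors = {}
        for name, queue in self.queues.items():
            if queue["durable"]:
                kept = [m for m in queue["messages"] if m[1]]
                survivors[name] = {"durable": True, "messages": kept}
        self.queues = survivors


broker = ToyBroker()
broker.queue_declare("hello")                       # not durable
broker.queue_declare("task_queue", durable=True)
broker.publish("hello", "lost with the queue")
broker.publish("task_queue", "transient", delivery_mode=1)
broker.publish("task_queue", "persistent", delivery_mode=2)
broker.restart()
print(broker.queues)
```

Only the persistent message in the durable queue is still there after the simulated restart.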
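Fair dispatch does not mean giving every worker the same number of tasks; it means keeping every worker busy for roughly the same amount of time. With two workers A and B, if A is tied up with heavy, time-consuming tasks while B is nearly idle, the tasks waiting in the queue should go to B first.
The basic_qos() method achieves this: with prefetch_count=1, RabbitMQ does not send a worker a new message until the worker has acknowledged the previous one. Add the following line: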
channel.basic_qos(prefetch_count=1)
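The effect of a prefetch limit of one can be compared with plain round-robin in a small simulation. Both functions below are hypothetical helpers that model task durations as plain numbers; they are a sketch of the scheduling idea, not of RabbitMQ internals.

```python
import heapq


def round_robin_dispatch(durations, workers):
    """Hand out tasks in turn, regardless of how busy each worker is."""
    busy = {name: 0 for name in workers}
    for i, duration in enumerate(durations):
        busy[workers[i % len(workers)]] += duration
    return busy


def fair_dispatch(durations, workers):
    """Give each task to whichever worker becomes free first (prefetch of one)."""
    free_at = [(0, name) for name in workers]   # (time the worker is free, name)
    heapq.heapify(free_at)
    busy = {name: 0 for name in workers}
    for duration in durations:
        t, name = heapq.heappop(free_at)
        busy[name] += duration
        heapq.heappush(free_at, (t + duration, name))
    return busy


durations = [5, 1, 5, 1, 5, 1]   # heavy and light tasks alternate
workers = ["worker_a", "worker_b"]
print(round_robin_dispatch(durations, workers))
print(fair_dispatch(durations, workers))
```

With alternating heavy and light tasks, round-robin gives one worker all the heavy ones (15 versus 3 time units), while the fair variant ends up at 11 versus 7.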
Put together, the scripts look like this.
new_task.py
# -*- coding: utf-8 -*-
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# durable: the queue survives even if RabbitMQ goes down
channel.queue_declare(queue='task_queue', durable=True)
messages = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=messages,
                      properties=pika.BasicProperties(
                          delivery_mode=2))  # make the message persistent
print('[x] Sent {}'.format(messages))
connection.close()
worker.py
# coding=utf-8
import pika
import time

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print("[*] Waiting for messages. To exit press Ctrl+C")

def callback(ch, method, properties, body):
    print("[x] Received {}".format(body))
    time.sleep(body.count(b'.'))  # simulate a time-consuming operation
    print("[x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)  # fair dispatch
channel.basic_consume(callback, queue='task_queue')
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()
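Now run the scripts in a terminal. This time, test them as follows:
In the earlier examples each task was delivered to exactly one worker; this time the same messages are delivered to several consumers at once. This pattern is called publish/subscribe.
Sending messages straight to a queue, as we did before, is not the complete picture. Below we use an exchange to build a simple logging system. RabbitMQ has four exchange types: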
~/rabbitmq ▶︎︎ sudo rabbitmqctl list_exchanges
[sudo] password for mouse:
Listing exchanges ...
amq.topic topic
amq.fanout fanout
amq.headers headers
amq.direct direct
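fanout: routes a message to every queue bound to it and ignores the routing key (routing_key) of the binding. Different consumers can process the same message in different ways simply by changing consumer code, which is especially useful when business requirements change, and it decouples publishers from consumers effectively.
direct: delivers a message to specific queues according to the routing key it carries. A queue is bound to the exchange with a routing key (K); when a message carrying that routing key arrives at the exchange, the exchange routes it to the queues bound with the same key (K). Direct exchanges handle unicast routing of messages.
topic: delivers a message to one or more queues by matching the routing key of the message against the pattern each queue used when binding to the exchange. Topic exchanges are typically used for multicast routing.
headers: matches on AMQP message headers instead of the routing key. It is used much like a direct exchange, but performs considerably worse and is rarely needed.
Below, a fanout exchange is used to build the simple logging system.
First, declare an exchange of type fanout: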
channel.exchange_declare(exchange='logs', type='fanout')
When we need a fresh, empty queue each time we connect to RabbitMQ, we can create a temporary queue. Leave out the queue name and RabbitMQ generates a random one, like this:
result = channel.queue_declare()
The queue should also be deleted once the connection is closed; RabbitMQ provides the exclusive flag for this:
result = channel.queue_declare(exclusive=True)
Next, bind the queue to the exchange:
channel.queue_bind(exchange='logs', queue=result.method.queue)
The logs exchange will now add messages to the queue.
And with that, our small scripts are ready:
Producer emit_logs.py
# coding: utf-8
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# pika 0.x keyword; pika >= 1.0 names it exchange_type
channel.exchange_declare(exchange='logs', type='fanout')
messages = ' '.join(sys.argv[1:]) or 'info: Hello World!'
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=messages)
print("[x] Send {}".format(messages))
connection.close()
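Note that emit_logs.py does not create a queue. If no consumer is running, messages that reach the exchange are simply discarded, which is fine because consumers only want new log messages.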
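The fanout behavior described so far, including the dropped messages, can be sketched in a few lines. FanoutExchange here is a hypothetical in-memory class used only for illustration; the queue names are invented.

```python
class FanoutExchange:
    """Toy fanout exchange: every bound queue gets a copy of each message."""

    def __init__(self):
        self.queues = {}   # queue name -> list of delivered messages

    def bind(self, queue_name):
        self.queues.setdefault(queue_name, [])

    def publish(self, body, routing_key=""):
        """Copy body to all bound queues; the routing key is ignored.

        Returns how many queues received the message (0 means it was dropped).
        """
        for messages in self.queues.values():
            messages.append(body)
        return len(self.queues)


logs = FanoutExchange()
dropped = logs.publish("info: nobody is listening yet")   # no queue bound
logs.bind("screen_queue")
logs.bind("file_queue")
delivered = logs.publish("info: Hello World!")
print(dropped, delivered, logs.queues)
```

The first message reaches no queue and is lost; the second one is copied into both queues.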
Consumer receive_logs.py
# coding: utf-8
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
                         type='fanout')
# exclusive=True: the queue is deleted once the consumer disconnects
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',
                   queue=queue_name)
print("[*] Waiting for logs. To exit press Ctrl+C")

def callback(ch, method, properties, body):
    print("[x] {}".format(body))

channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
Run the programs:
Save the logs to the file receive_logs_rabbit.log:
~/rabbitmq ▶︎︎ python receive_logs.py > receive_logs_rabbit.log
Start the producer:
~/rabbitmq ▶︎︎ python emit_logs.py
To watch the log output on screen as well, open another terminal:
~/rabbitmq ▶︎︎ python receive_logs.py
Now look at the bindings and queues created by the two consumers.
~/rabbitmq ▶︎︎ sudo rabbitmqctl list_bindings
[sudo] password for mouse:
Listing bindings ...
logs exchange amq.gen-A5Xw2ADUsYGNGB3zvHNOAQ queue amq.gen-A5Xw2ADUsYGNGB3zvHNOAQ []
logs exchange amq.gen-KEkWqJyvQMCbgmA0QWibPA queue amq.gen-KEkWqJyvQMCbgmA0QWibPA []
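In the previous example we subscribed to all logs. Next comes an example that subscribes to only some of them, selected through the command-line arguments of the consumer and producer scripts. The previous example used a fanout exchange; this one uses a direct exchange, which works as follows:
Queue Q1 is bound with the routing key orange. Queue Q2 is bound with the routing keys black and green. A message carrying a matching routing key is routed to Q1 or Q2; messages that match nothing are discarded.
A direct exchange delivers messages to different queues through the different routing_key values used in the bindings. Binding several queues with the same routing_key is also allowed, but for that key direct then behaves no differently from fanout.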
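The Q1/Q2 setup can be written down as a small lookup table. DirectExchange below is a hypothetical in-memory sketch of exact-match routing, and the routing key all is invented to show the case where several queues share one key.

```python
from collections import defaultdict


class DirectExchange:
    """Toy direct exchange: a message goes to queues bound with the same key."""

    def __init__(self):
        self.bindings = defaultdict(set)   # routing key -> set of queue names

    def bind(self, queue, routing_key):
        self.bindings[routing_key].add(queue)

    def route(self, routing_key):
        """Return the queues whose binding key equals routing_key exactly."""
        return sorted(self.bindings.get(routing_key, ()))


exchange = DirectExchange()
exchange.bind("Q1", "orange")
exchange.bind("Q2", "black")
exchange.bind("Q2", "green")
print(exchange.route("orange"))   # only Q1
print(exchange.route("green"))    # only Q2
print(exchange.route("purple"))   # no binding matches, message discarded

# Binding both queues with the same key makes that key behave like fanout.
exchange.bind("Q1", "all")
exchange.bind("Q2", "all")
print(exchange.route("all"))
```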
Producer emit_logs_direct.py:
# coding=utf-8
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
                         type='direct')  # exchange type
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'  # defaults to info
# Note: ''.join drops the spaces between the arguments, as the output
# further down shows; use ' '.join to keep them.
messages = ''.join(sys.argv[2:]) or "Hello World!"
channel.basic_publish(exchange="direct_logs",
                      routing_key=severity,
                      body=messages)
print('[x] Send {}:{}'.format(severity, messages))
connection.close()
Consumer receive_logs_direct.py:
# -*- coding: utf-8 -*-
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
                         type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
    sys.stderr.write(
        "Usage: {} [info] [warning] [error]\n".format(sys.argv[0]))
    sys.exit(1)
# one binding per severity given on the command line
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)
print("[*] Waiting for logs. To exit press Ctrl+C")

def callback(ch, method, properties, body):
    print("[x] {}:{}".format(method.routing_key, body))

channel.basic_consume(callback, queue=queue_name, no_ack=True)
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()
To save only the error log messages, enter the following in a shell:
~/rabbitmq ▶︎︎ python receive_logs_direct.py error > receive_logs_rabbit.log
We can open another shell, B, to check that this works:
~/rabbitmq ▶︎︎ python receive_logs_direct.py info warning error
Run the producer in a new shell:
~/rabbitmq ▶︎︎ python emit_logs_direct.py error it can be save
[x] Send error:itcanbesave
~/rabbitmq ▶︎︎ python emit_logs_direct.py info it can not be save
[x] Send info:itcannotbesave
~/rabbitmq ▶︎︎ python emit_logs_direct.py warning it can not be save too
[x] Send warning:itcannotbesavetoo
Three log messages were sent and shell B printed all three, but receive_logs_rabbit.log contains only the one we expected.
~/rabbitmq ▶︎︎ python receive_logs_direct.py info warning error
[*] Waiting for logs. To exit press Ctrl+C
[x] error:b'itcanbesave'
[x] info:b'itcannotbesave'
[x] warning:b'itcannotbesavetoo'
~/rabbitmq ▶︎︎ cat receive_logs_rabbit.log
[*] Waiting for logs. To exit press Ctrl+C
[x] error:b'itcanbesave'
Take a look at our bindings:
~/rabbitmq ▶︎︎ sudo rabbitmqctl list_bindings
Listing bindings ...
direct_logs exchange amq.gen-ICTgTwm3xY3JGP5BKm2xOg queue error []
direct_logs exchange amq.gen-ICTgTwm3xY3JGP5BKm2xOg queue info []
direct_logs exchange amq.gen-ICTgTwm3xY3JGP5BKm2xOg queue warning []
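A topic exchange matches routing keys against binding patterns, somewhat like regular expressions, which makes message routing more flexible. A routing key is a list of words separated by dots; in a binding key, * stands for exactly one word and # for zero or more words.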
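The matching rule can be sketched as a short recursive function. topic_matches is a hypothetical helper written for illustration, not the broker's actual algorithm; it assumes dot-separated words with * matching exactly one word and # matching zero or more.

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if routing_key matches the topic pattern binding_key."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p: int, w: int) -> bool:
        if p == len(pattern):
            return w == len(words)          # pattern used up: words must be too
        if pattern[p] == "#":
            # '#' may absorb zero or more words
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w < len(words) and pattern[p] in ("*", words[w]):
            return match(p + 1, w + 1)      # '*' or a literal word consumes one word
        return False

    return match(0, 0)


for binding in ("#", "kern.*", "*.critical", "kern.#"):
    print(binding, topic_matches(binding, "kern.critical"))
```

All four patterns accept kern.critical, while kern.* rejects a three-word key such as kern.critical.disk because * covers exactly one word.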
Now let's rewrite the example above with a topic exchange.
Producer emit_logs_topic.py
# coding=utf-8
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
                         type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
messages = ' '.join(sys.argv[2:]) or "Hello World!"
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=messages)
print("[x] Send {}:{}".format(routing_key, messages))
connection.close()
Consumer receive_logs_topic.py
# coding=utf-8
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
                         type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: {} [binding_keys]...\n".format(sys.argv[0]))
    sys.exit(1)
# one binding per pattern given on the command line
for binding_key in binding_keys:
    channel.queue_bind(queue=queue_name,
                       exchange='topic_logs',
                       routing_key=binding_key)
print("[*] Waiting for logs. To exit press Ctrl+C")

def callback(ch, method, properties, body):
    print("[x] {}:{}".format(method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()
Receive all logs:
python receive_logs_topic.py "#"
Receive all logs from kern:
python receive_logs_topic.py "kern.*"
Receive only logs related to critical:
python receive_logs_topic.py "*.critical"
Create multiple bindings:
python receive_logs_topic.py "kern.*" "*.critical"
Emit a log with the routing key kern.critical:
python emit_logs_topic.py "kern.critical" "A critical kernel error"
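Kombu is a messaging library for Python. It aims to give Python a high-level interface to AMQP. It is also the library Celery ships with for sending and receiving messages, and it offers an API that follows Python idioms.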
AMQP is the Advanced Message Queuing Protocol, an open standard protocol for message orientation, queuing, routing, reliability and security, for which the RabbitMQ messaging server is the most popular implementation.
Kombu has many advantages:
It supports a great many message brokers.
It supports automatic encoding, serialization, and compression of payloads.
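To give a feel for what serializing and compressing a payload involves, here is a standard-library sketch. It does not use kombu and does not reproduce its wire format; encode_payload, decode_payload, and the header values are hypothetical and chosen only for illustration.

```python
import json
import zlib


def encode_payload(data, compress=False):
    """Serialize data to JSON bytes, optionally compressing them with zlib."""
    body = json.dumps(data).encode("utf-8")
    headers = {"content_type": "application/json", "content_encoding": "utf-8"}
    if compress:
        body = zlib.compress(body)
        headers["compression"] = "zlib"   # made-up label for this sketch
    return body, headers


def decode_payload(body, headers):
    """Reverse encode_payload using the information stored in headers."""
    if headers.get("compression") == "zlib":
        body = zlib.decompress(body)
    return json.loads(body.decode(headers["content_encoding"]))


original = {"level": "info", "text": "Hello World!"}
body, headers = encode_payload(original, compress=True)
print(decode_payload(body, headers) == original)
```

A messaging library that handles this automatically lets the producer pass plain Python objects and the consumer receive them already decoded.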
Next, let's rewrite the topic exchange example with kombu.
First the producer, kombu_emit_logs_topic.py
# coding=utf-8
import sys
from kombu import Connection, Producer, Exchange

logs_exchange = Exchange('logs', 'topic', durable=True)
URL = 'amqp://localhost'
kombu_learn = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
messages = ' '.join(sys.argv[2:]) or "Hello World!"
with Connection(URL) as conn:
    producer = Producer(conn)
    producer.publish(messages, exchange=logs_exchange,
                     routing_key=kombu_learn,
                     declare=[logs_exchange],  # make sure the exchange exists
                     serializer='json')
Consumer kombu_receive_logs_topic.py
# coding=utf-8
import sys
from kombu import Exchange, Queue, Connection, Consumer
# kombu >= 4.2; older releases named this module kombu.async,
# which is a syntax error on Python 3.7+
from kombu.asynchronous import Hub

logs_exchange = Exchange(name='logs', type="topic", durable=True)
URL = 'amqp://localhost'
hub = Hub()
binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: {} [binding_keys]...\n".format(sys.argv[0]))
    sys.exit(1)
# one exclusive queue per binding key, named after the key
tasks_queues = [Queue(binding_key,
                      logs_exchange,
                      exclusive=True,
                      routing_key=binding_key)
                for binding_key in binding_keys]
print("[*] Waiting for logs. To exit press Ctrl+C")

def on_messages(body, messages):
    print("""
    Body: {0}
    Properties: {1}
    DeliveryInfo: {2}
    """.format(body, messages.properties, messages.delivery_info)
    )
    messages.ack()  # acknowledge so the broker can release the message

with Connection(URL) as conn:
    conn.register_with_event_loop(hub)
    with Consumer(conn, tasks_queues, callbacks=[on_messages]):
        try:
            hub.run_forever()
        except KeyboardInterrupt:
            sys.exit()
Alternatively, rewrite the consumer with ConsumerMixin and save it as kombu_receive_logs_topic_2.py:
# coding=utf-8
import sys
from kombu import Exchange, Queue, Connection
from kombu.mixins import ConsumerMixin

class Worker(ConsumerMixin):
    logs_exchange = Exchange(name='logs', type="topic", durable=True)

    def __init__(self, connection):
        self.connection = connection
        # keep the binding keys on the instance so get_consumers can use them
        self.binding_keys = sys.argv[1:]
        if not self.binding_keys:
            sys.stderr.write('Usage: {} [binding_keys] ...\n'.format(sys.argv[0]))
            sys.exit(1)

    def get_consumers(self, Consumer, channel):
        return [Consumer([Queue(binding_key,
                                self.logs_exchange,
                                exclusive=True,
                                routing_key=binding_key)
                          for binding_key in self.binding_keys],
                         callbacks=[self.on_messages])]

    def on_messages(self, body, messages):
        print('Body: {}'.format(body))
        messages.ack()  # acknowledge so the broker can release the message

URL = 'amqp://localhost'
with Connection(URL) as connection:
    Worker(connection).run()
References:
[1] RabbitMQ Tutorial
[2] Kombu official documentation
[3] Dong Weiming. Python Web 开发实战. 1st ed. Beijing: Publishing House of Electronics Industry, 2016