[关闭]
@Scrazy 2017-03-26T06:33:19.000000Z 字数 6656 阅读 3871

Celery 的简单使用

python celery


代码在这里

Celery 是一个简单、灵活并且可靠的处理大量消息的分发系统。并且是自带电池的,本身提供了维护和操作这个系统的工具。

Celery 专注于实时处理的任务队列,并且支持任务调度。
优点:
1. 简单
2. 高可用
3. 快速
4. 灵活

Celery 架构

Celery 序列化

一个简单的简单例子

项目目录为

  1. celeries/proj/
  2. ├── celeryconfig.py
  3. ├── celery.py
  4. ├── __init__.py
  5. └── tasks.py

主程序 celery.py

  1. from __future__ import absolute_import
  2. from celery import Celery
  3. app = Celery('proj', include=['proj.tasks'],
  4. app.config_from_object('proj.celeryconfig')
  5. if __name__ == "main":
  6. app.start()

任务函数 tasks.py

  1. # coding=utf-8
  2. from __future__ import absolute_import
  3. from .celery import app
  4. @app.task
  5. def add(x, y):
  6. return x + y
  7. @app.task
  8. def mul(x, y):
  9. return x * y

接下来是 配置文件 celeryconfig.py

  1. # coding=utf-8
  2. BROKER_URL = 'amqp://localhost' # RabbitMQ 作为消息代理
  3. CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # Redis 作为结果存储
  4. CELERY_TASK_SERIALIZER = 'msgpack'
  5. # 任务序列化和反序列化格式为 msgpack, 别忘了安装 msgpack-python
  6. CELERY_RESULT_SERIALIZER = 'json' # 结果存储序列化格式为 json
  7. CELERY_ACCEPT_CONTENT = ['msgpack', 'json'] # 任务接受格式类型

因为没有任务调度,所以直接启动消费者就行了。在启动之前,要先去安装 RabbitMQ 和 Redis, 并启动。

现在启动我们的消费者函数, 命令行直接启动:

> cd celeries
> celery -A celeries worker -l info

看到下面的提示信息,表示成功启动

  1. -------------- celery@mouse-pc v4.0.2 (latentcall)
  2. ---- **** -----
  3. --- * *** * -- Linux-4.9.15-1-MANJARO-x86_64-with-glibc2.2.5 2017-03-22 21:53:05
  4. -- * - **** ---
  5. - ** ---------- [config]
  6. - ** ---------- .> app: celeries:0x7f9737da7a58
  7. - ** ---------- .> transport: amqp://guest:**@localhost:5672//
  8. - ** ---------- .> results: redis://localhost/
  9. - *** --- * --- .> concurrency: 2 (prefork)
  10. -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
  11. --- ***** -----
  12. -------------- [queues]
  13. .> celery exchange=celery(direct) key=celery
  14. [tasks]
  15. . celeries.tasks.add
  16. . celeries.tasks.mul
  17. . celeries.tasks.xsum
  18. [2017-03-22 21:53:06,011: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
  19. [2017-03-22 21:53:06,034: INFO/MainProcess] mingle: searching for neighbors
  20. [2017-03-22 21:53:07,088: INFO/MainProcess] mingle: all alone
  21. [2017-03-22 21:53:07,115: INFO/MainProcess] celery@mouse-pc ready.

打开 IPython 测试一下我们的几个函数。

  1. ~ ▶︎︎ ipython
  2. Python 3.6.0 |Anaconda 4.3.1 (64-bit)| (default, Dec 23 2016, 12:22:00)
  3. Type "copyright", "credits" or "license" for more information.
  4. In [1]: from celeries.tasks import add, mul, xsum
  5. In [2]: add.delay(1, 9)
  6. Out[2]: <AsyncResult: 38022eec-2d3d-4ee0-8c7e-367ef92b5f1f>
  7. In [3]: r = mul.delay(2, 4)
  8. In [4]: r.status
  9. Out[4]: 'SUCCESS'
  10. In [5]: r.result
  11. Out[5]: 8
  12. In [6]: r.successful
  13. Out[6]: <bound method AsyncResult.successful of <AsyncResult: 17af4e48-736d-44c9-a8be-a50a35bbc435>>
  14. In [7]: r.backend
  15. Out[7]: <celery.backends.redis.RedisBackend at 0x7f5aebbbcba8> # 结果存储在 redis 里

delay() 是 apply_async() 的快捷方式。你也直接调用 apply_async() :

  1. In [24]: r = mul.apply_async((2, 4))
  2. In [25]: r.result
  3. Out[25]: 8

delay() & apply_async 返回的都是 AsyncResult 实例,可用于查看任务的执行状态,但首先你要配置好 result backend.
此时,在worker终端上可以看到,任务信息和结果

  1. [2017-03-22 22:05:13,689: INFO/MainProcess] Received task: celeries.tasks.add[38022eec-2d3d-4ee0-8c7e-367ef92b5f1f]
  2. [2017-03-22 22:05:14,765: INFO/PoolWorker-2] Task celeries.tasks.add[38022eec-2d3d-4ee0-8c7e-367ef92b5f1f] succeeded in 0.007736653999018017s: 10
  3. [2017-03-22 22:08:36,378: INFO/MainProcess] Received task: celeries.tasks.mul[17af4e48-736d-44c9-a8be-a50a35bbc435]
  4. [2017-03-22 22:08:37,010: INFO/PoolWorker-2] Task celeries.tasks.mul[17af4e48-736d-44c9-a8be-a50a35bbc435] succeeded in 0.011531784999533556s: 8

仔细看,每个任务都有一个 task_id。我们可以通过 task_id 获得任务的结果。

取 add 任务的 id:

  1. task_id = '38022eec-2d3d-4ee0-8c7e-367ef92b5f1f'
  2. In [8]: task_id = '38022eec-2d3d-4ee0-8c7e-367ef92b5f1f'
  3. In [9]: add.AsyncResult(task_id).get()
  4. Out[9]: 10

关联任务

In [2]: m = mul.apply_async((2, 2), link=mul.s(3))

在 Worker 终端里会看到两个值,关联之前和之后的。

  1. [2017-03-23 13:27:13,045: INFO/MainProcess] Received task: proj.tasks.mul[40492357-44bb-41e4-979f-6eb197107a5b]
  2. [2017-03-23 13:27:13,731: INFO/PoolWorker-2] Task proj.tasks.mul[40492357-44bb-41e4-979f-6eb197107a5b] succeeded in 0.0023383530005958164s: 4
  3. [2017-03-23 13:27:13,732: INFO/MainProcess] Received task: proj.tasks.mul[b01be1b8-f957-48b2-9d72-8187af6ac161]
  4. [2017-03-23 13:27:13,734: INFO/PoolWorker-2] Task proj.tasks.mul[b01be1b8-f957-48b2-9d72-8187af6ac161] succeeded in 0.0006868359996587969s: 12

指定队列

在 celeries 目录下新建一个目录 projb, 代码使用 proj 中的。

  1. celeries/projb
  2. ├── celeryconfig.py
  3. ├── celery.py
  4. ├── __init__.py
  5. └── tasks.py

在 celeryconfig.py 添加些配置:

  1. # coding=utf-8
  2. from kombu import Queue
  3. BROKER_URL = 'amqp://localhost' # RabbitMQ 作为消息代理
  4. CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # Redis 作为结果存储
  5. CELERY_TASK_SERIALIZER = 'msgpack'
  6. # 任务序列化和反序列化格式为 msgpack, 别忘了安装 msgpack-python
  7. CELERY_RESULT_SERIALIZER = 'json' # 结果存储序列化格式为 json
  8. CELERY_ACCEPT_CONTENT = ['msgpack', 'json'] # 任务接受格式类型
  9. CELERY_QUEUES = {
  10. Queue('foo', routing_key='task.#'), # 路由键以 task. 开头的消息进入此队列
  11. Queue('feed_task', routing_key='*.feed'), # 路由键以 .feed 结尾的消息进入此队列
  12. }
  13. CELERY_DEFAULT_QUEUE = 'foo' # 默认队列
  14. CELERY_DEFAULT_EXCHANGE = 'tasks' # 默认交换机
  15. CELERY_DEFAULT_EXCHANGE_TYPE = 'topic' # 默认交换机类型 topic
  16. CELERY_DEFAULT_ROUTING_KEY = 'task.foooooooo' # 默认交换机路由键, task. 后的值不影响
  17. CELERY_ROUTES = {
  18. 'projb.tasks.mul': {
  19. 'queue': 'feed_task', # 消息全都进入 feed_task 队列
  20. 'routing_key': 'mul.feed',
  21. },
  22. }

然后,我们以指定队列的方式启动:

>  celery -A projb worker -Q foo,feed_task -l info

tasks.py 中的 mul 函数只会通过队列 feed_task 被执行。add 函数通过默认队列 foo 执行。
```python
In [84]: from projb.tasks import mul, add

In [85]: r = add.delay(3, 3)

In [86]: r.result
Out[86]: 6

In [87]: res = mul.delay(3, 3)

In [88]: res.result
Out[88]: 9

  1. 不过,我们可以使用 apply_async() 函数来指定队列。
  2. ```python
  3. In [90]: r = add.apply_async((3, 3), queue='feed_task', routing_key='mul.feed')
  4. In [91]: r.result
  5. Out[91]: 6
  6. In [92]: res = mul.apply_async((3, 3), queue='foo', routing_key='task.foooooo')
  7. In [93]: res.result
  8. Out[93]: 9
  9. <div class="md-section-divider"></div>

任务调度

依法炮制,基于 projb 的代码,创建目录 projc,在 proc/celeryconfig.py 中添加如下配置。

  1. CELERYBEAT_SCHEDULE = {
  2. 'mul-every-30-seconds': {
  3. 'task': 'projc.tasks.mul',
  4. 'schedule': 30.0,
  5. 'args': (2, 2),
  6. }
  7. }
  8. <div class="md-section-divider"></div>

执行

> celery -B -A projc worker -l info

就可以在终端看到每 30s 执行一次任务。

  1. [2017-03-23 12:23:13,920: INFO/Beat] Scheduler: Sending due task mul-every-30-seconds (projc.tasks.mul)
  2. [2017-03-23 12:23:13,923: INFO/MainProcess] Received task: projc.tasks.mul[9c414257-d627-4c36-a9d8-9daed7e295c0]
  3. [2017-03-23 12:23:15,177: INFO/PoolWorker-3] Task projc.tasks.mul[9c414257-d627-4c36-a9d8-9daed7e295c0] succeeded in 0.0010301589991286164s: 4
  4. <div class="md-section-divider"></div>

任务绑定、日志记录和错误重试

任务绑定、记录日志和重试是 Celery 3 个常用的高级功能。接下来,修改 proj 的 tasks.py 文件。添加一个 div 函数。

  1. @app.task(bind=True)
  2. def div(self, x, y):
  3. logger.info(
  4. '''
  5. Executing task : {0.id}
  6. task.args : {0.args!r}
  7. task.kwargs : {0.kwargs!r}
  8. '''.format(self.request)
  9. )
  10. try:
  11. res = x / y
  12. except ZeroDivisionError as e:
  13. raise self.retry(exc=e, countdown=3, max_retries=3)
  14. else:
  15. return res
  16. <div class="md-section-divider"></div>

在 Ipython 调用:

In [3]: d = div.delay(2, 1)

在 worker 中可以看到

  1. [2017-03-23 14:57:17,361: INFO/PoolWorker-2] proj.tasks.div[68ef1584-16ac-4236-9858-b00842891bbc]:
  2. Executing task : 68ef1584-16ac-4236-9858-b00842891bbc
  3. task.args : [2, 1]
  4. task.kwargs : {}
  5. [2017-03-23 14:57:17,369: INFO/PoolWorker-2] Task proj.tasks.div[68ef1584-16ac-4236-9858-b00842891bbc] succeeded in 0.007741746998362942s: 2.0

换成可以引起异常的参数:

In [4]: d = div.delay(2, 0)

可以看到,在 worker 中每 3s 重试一次,总共重复三次(执行了 4 次),然后抛出异常!

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注