@delight 2015-01-10T14:27:15.000000Z Word count 4065, views 3188

Using the Celery component in Python

python celery redis


Prepare

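Install Celery:

    pip install celery

Choose a broker and install it; this post assumes Redis:

    apt-get install redis-server

Celery's Redis transport also needs the redis Python client (pip install redis).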
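The celery.py example later in this post imports CELERY_BROKER from a settings module that is never shown. A minimal sketch of what that module might contain, assuming a local Redis on the default port and database 0 (the REDIS_* names are hypothetical and not from the original post):

```python
# settings.py (hypothetical contents; only CELERY_BROKER is referenced by the post)
REDIS_HOST = "localhost"   # assumption: Redis runs on the same machine
REDIS_PORT = 6379          # Redis default port
REDIS_DB = 0               # assumption: use database 0 for the broker

# Celery accepts Redis broker URLs of the form redis://host:port/db_number
CELERY_BROKER = "redis://{0}:{1}/{2}".format(REDIS_HOST, REDIS_PORT, REDIS_DB)

print(CELERY_BROKER)
```

Keeping the URL in one place means the host or database can change without touching celery.py.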

Configure

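First, read the Getting Started section of the official Celery documentation carefully; if you have time, it is best to read the whole thing.

Then read through other people's best practices, and you are basically ready to get to work.

Key points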

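  1. In task-related files, use absolute imports throughout; otherwise, set an explicit name on each task function, so that the worker and the caller agree on the task name;
  2. If the worker has to run with root privileges, add platforms.C_FORCE_ROOT=True to the relevant file, but it is better not to run as root;
  3. If needed, the pickle warning can be silenced by setting CELERY_ACCEPT_CONTENT=['pickle',];
  4. Heartbeats are not sent by default; add BROKER_HEARTBEAT=10 to get rid of the heartbeat-related warnings.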
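The first point is easier to see with a small illustration. When no name is given, Celery derives a task's name from the module it was imported as plus the function name, so the same function can end up with two different names depending on the import path. The sketch below only mimics that rule in plain Python; auto_task_name is a hypothetical helper, not a Celery function, and the module paths are made up for the example:

```python
def auto_task_name(func):
    """Roughly mimic Celery's default naming: '<module>.<function>'."""
    return "{0}.{1}".format(func.__module__, func.__name__)


def save(record):
    return record


# Hypothetical: the same function reached through two import paths.
save.__module__ = "async.writer"   # as if: from async.writer import save
name_on_worker = auto_task_name(save)

save.__module__ = "writer"         # as if: from writer import save
name_on_caller = auto_task_name(save)

print(name_on_worker)  # async.writer.save
print(name_on_caller)  # writer.save
```

If the worker registers the first name and the caller sends the second, the worker rejects the message as an unregistered task. Absolute imports, or an explicit name on the task, keep both sides in agreement.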

Router

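The built-in router does not support wildcards (at least at the time of writing); if you need them, you can write your own custom Router class. Below is an example celery.py that routes by task-name prefix: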

    from __future__ import absolute_import
    from celery import Celery, platforms
    from settings import CELERY_BROKER
    from kombu import Queue, Exchange


    class MyRouter(object):
        '''router for tasks using wildcard'''

        def route_for_task(self, task, *args, **kwargs):
            # Match on the task-name prefix and fall back to the default queue.
            if task.startswith('writer'):
                return {'queue': 'async_writer', 'routing_key': 'async_writer'}
            elif task.startswith('caller'):
                return {'queue': 'async_caller', 'routing_key': 'async_caller'}
            else:
                return {'queue': 'default', 'routing_key': 'default'}


    QUEUES = (
        Queue('default', Exchange('default'), routing_key='default'),
        Queue('async_writer', Exchange('async_writer'),
              routing_key='async_writer'),
        Queue('async_caller', Exchange('async_caller'),
              routing_key='async_caller'),
    )

    # Allow the worker to run as root (better avoided, see the key points).
    platforms.C_FORCE_ROOT = True

    app = Celery('async',
                 broker=CELERY_BROKER,
                 include=['async.writer', 'async.caller', 'async.checker', ])

    app.conf.update(CELERY_ACCEPT_CONTENT=['pickle', ],
                    CELERY_IGNORE_RESULT=True,
                    CELERY_DISABLE_RATE_LIMITS=True,
                    CELERY_DEFAULT_EXCHANGE='default',
                    CELERY_DEFAULT_QUEUE='default',
                    CELERY_DEFAULT_ROUTING_KEY='default',
                    CELERY_DEFAULT_EXCHANGE_TYPE='topic',
                    CELERY_TASK_SERIALIZER='pickle',
                    CELERY_RESULT_SERIALIZER='pickle',
                    BROKER_HEARTBEAT=10,
                    CELERY_QUEUES=QUEUES,
                    CELERY_ROUTES=(MyRouter(),),
                    )

    if __name__ == "__main__":
        app.start()
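The router above hard-codes its prefixes. If real wildcard patterns are wanted, one possible variant is sketched below using the standard library's fnmatch module. GlobRouter and its rules argument are hypothetical names introduced for this sketch, not part of Celery; only the route_for_task method name and the shape of the returned dict follow the example above.

```python
from fnmatch import fnmatchcase


class GlobRouter(object):
    """Route tasks by shell-style patterns; the first matching rule wins."""

    def __init__(self, rules, default="default"):
        # rules: ordered list of (pattern, queue_name) pairs
        self.rules = list(rules)
        self.default = default

    def route_for_task(self, task, args=None, kwargs=None):
        for pattern, queue in self.rules:
            # fnmatchcase is case-sensitive on every platform
            if fnmatchcase(task, pattern):
                return {"queue": queue, "routing_key": queue}
        return {"queue": self.default, "routing_key": self.default}


router = GlobRouter([
    ("writer*", "async_writer"),
    ("caller*", "async_caller"),
])

print(router.route_for_task("writer.save"))
print(router.route_for_task("checker.ping"))
```

An instance could then be passed in place of MyRouter() in CELERY_ROUTES. Keeping the rules in an ordered list makes precedence explicit when two patterns overlap.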

Start

The official init.d script is not very pleasant to use; below is one I wrote myself, for reference:

    #!/bin/bash
    #
    # PushserverCore uWSGI Web Server init script
    #
    ### BEGIN INIT INFO
    # Provides: PushserverCore
    # Required-Start: $remote_fs $network $syslog
    # Required-Stop: $remote_fs $network $syslog
    # Default-Start: 2 3 4 5
    # Default-Stop: 0 1 6
    # Short-Description: Start PushserverCore Service for generic init daemon
    # Description: PushserverCore Service thrift Server backend.
    ### END INIT INFO

    NAME="Core Thrift Server"
    PROJECT=PushserverCore
    PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/var/app/enabled/$PROJECT
    DESC="PushserverCore"
    APP_DIR=/var/app/enabled/$PROJECT/Core
    APP_PATH=$APP_DIR/CoreServer.py
    CELERY_LOG_PATH=/var/app/log/PushserverCore/celery.log

    print_succ()
    {
        echo "$(tput setaf 2)$(tput bold)DONE$(tput sgr0)"
    }

    print_fail()
    {
        echo "$(tput setaf 1)$(tput bold)FAILED$(tput sgr0)"
    }

    stop_service()
    {
        echo "stopping $NAME..."
        if pgrep -f "$APP_PATH" > /dev/null 2>&1; then
            pkill -f "$APP_PATH"
        fi
        print_succ
    }

    start_service()
    {
        if pgrep -f "$APP_PATH" > /dev/null 2>&1; then
            echo "$NAME service is already running."
            return
        else
            echo "starting $NAME service..."
            nohup python "$APP_PATH" >/dev/null 2>&1 &
        fi
        # Give the process a moment to come up, then verify it is alive.
        sleep 3
        if pgrep -f "$APP_PATH" > /dev/null 2>&1; then
            print_succ
        else
            print_fail
        fi
    }

    stop_worker()
    {
        echo "stopping celery worker..."
        if pgrep -f celery > /dev/null 2>&1; then
            pkill -f celery
        fi
        print_succ
    }

    start_worker()
    {
        if pgrep -f celery > /dev/null 2>&1; then
            echo "celery is already running"
            return
        else
            echo "starting celery worker..."
            # Three named workers, each bound to its own queue, 7 processes each.
            celery -A async multi start writer caller default -Q:writer async_writer -Q:caller async_caller -Q:default default \
                -c 7 -l INFO --workdir="$APP_DIR" --logfile="$CELERY_LOG_PATH"
        fi
        sleep 3
        if pgrep -f celery > /dev/null 2>&1; then
            print_succ
        else
            print_fail
        fi
    }

    check_status()
    {
        if pgrep -f "$APP_PATH" > /dev/null 2>&1; then
            echo "$NAME is running"
        else
            echo "$NAME is not running"
        fi
        if pgrep -f celery > /dev/null 2>&1; then
            echo "celery worker is running"
        else
            echo "celery worker is not running"
        fi
    }

    set -e
    . /lib/lsb/init-functions

    case "$1" in
        start)
            echo "Starting $DESC..."
            start_service
            start_worker
            ;;
        stop)
            echo "Stopping $DESC..."
            stop_service
            stop_worker
            ;;
        restart)
            echo "Restarting $DESC..."
            stop_service
            stop_worker
            sleep 3
            start_service
            start_worker
            echo "Checking..."
            check_status
            ;;
        status)
            check_status
            ;;
        *)
            echo "Usage: $NAME {start|stop|restart|status}" >&2
            exit 1
            ;;
    esac
    exit 0

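The part that deserves the most attention is the usage of celery multi start. What follows start is the list of worker names (the workers that fetch messages); you can also simply write 3 there. Next comes -Q:worker_name queue_name, which assigns a queue to a worker, and finally -c sets the number of actual worker processes (the ones that do the work). The command in the example starts 3 workers named writer, caller and default, binds them to the 3 queues async_writer, async_caller and default respectively, and gives each worker 7 processes to do the work.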
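To make the structure of that command line explicit, here is a small sketch that assembles the same arguments from a worker-to-queue mapping. multi_start_argv is a hypothetical helper written for this illustration; it only builds the argument list and does not run anything. The flags it emits are the ones already used in the init script above.

```python
def multi_start_argv(app, worker_queues, concurrency, extra=()):
    """Build the argument list for `celery multi start`.

    worker_queues is an ordered list of (worker_name, queue_name) pairs.
    """
    names = [name for name, _ in worker_queues]
    argv = ["celery", "-A", app, "multi", "start"] + names
    for name, queue in worker_queues:
        # -Q:<worker> <queue> binds one queue to one named worker
        argv += ["-Q:" + name, queue]
    # -c is the number of processes per worker
    argv += ["-c", str(concurrency)]
    argv += list(extra)
    return argv


argv = multi_start_argv(
    "async",
    [("writer", "async_writer"),
     ("caller", "async_caller"),
     ("default", "default")],
    concurrency=7,
    extra=["-l", "INFO"],
)

print(" ".join(argv))
# celery -A async multi start writer caller default -Q:writer async_writer -Q:caller async_caller -Q:default default -c 7 -l INFO
```

Reading the output left to right gives the same three parts described above: the worker names, one -Q pair per worker, and the shared -c concurrency.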
