一、指定队列
 Celery非常容易设置和运行,通常它会使用默认的名为celery的队列(可以通过CELERY_DEFAULT_QUEUE修改)用来存放任务。可以使用优先级不同来确保高优先级的任务不需要等待就得到响应。
 基于Celery系列一的project代码,创建一个projectq目录,并修改celeryconfig.py,修改后如下:
 # -*- coding:utf-8 -*- BROKER_URL = 'amqp://guest:guest@localhost:5672//'  # 使用RabbitMQ作为消息代理 CELERY_TASK_PROTOCOL = 1  # 现在celery升级到了4.0,是老版本的librabbitmq与最新的celery4.0 Message Protocol协议不兼容,celery4.0默认使用Task messages Version 2 ,而librabbitmq使用Task messages Version 1 CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 把结果存在Redis CELERY_TASK_SERIALIZER = 'msgpack'  # 任务序列化肯反序列化使用msgpack方案 CELERY_RESULT_SERIALIZER = 'json'   # 读取任务结果一般性能要求不高,所以使用可读性更好的json CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24  # 任务过期时间 CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接收的内容类型  # 新加入的配置参数 from kombu import Queue  # 定义任务队列 CELERY_QUEUES = (     QUEUE('default', routing_key='task.#'),  # 路由键以‘task.’开头的都进default队列     QUEUE('web_tasks', routing_key='web.#'),  # 路由键以‘web.’开头的都进web_tasks队列 )  CELERY_DEFAULT_EXCHANGE = 'tasks'  # 默认的交换机名字为tasks  CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'  # 默认的交换类型是topic  CELERY_DEFAULT_ROUTING_KEY = 'task.default' # 默认的路由键是task.default,这个路由键符合上面的default队列   CELERY_ROUTES = {   # tasks.add的消息会进入web_tasks队列     'projectq.tasks.add': {         'queue': 'web_tasks',         'routing_key': 'web.add',     } }   
 用指定队列的方式启动消费者进程:
 yangyang@yangyang-VirtualBox:~$ celery -A projectq -Q web_tasks  worker -l info    -------------- celery@yangyang-VirtualBox v4.1.0 (latentcall) ---- **** -----  --- * ***  * -- Linux-4.4.0-92-generic-x86_64-with-Ubuntu-16.04-xenial 2017-11-21 10:25:16 -- * - **** ---  - ** ---------- [config] - ** ---------- .> app:         project:0x7f6e064e1110 - ** ---------- .> transport:   amqp://guest:**@localhost:5672// - ** ---------- .> results:     redis://localhost:6379/0 - *** --- * --- .> concurrency: 2 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** -----   -------------- [queues]                 .> web_tasks        exchange=web_tasks(direct) key=web_tasks                   [tasks]   . project.tasks.add  [2017-11-21 10:25:16,432: INFO/MainProcess] Connected to amqp://guest:**@localhost:5672// [2017-11-21 10:25:16,463: INFO/MainProcess] mingle: searching for neighbors [2017-11-21 10:25:17,649: INFO/MainProcess] mingle: all alone [2017-11-21 10:25:18,006: INFO/MainProcess] celery@yangyang-VirtualBox ready.   
 上述worker只会执行web_tasks中的任务
 二、任务调度
 上述例子都是由发布者触发的,可以使用Celery的Beat进程自动生成任务。基于系列一project目录下的源码,创建projectb目录,修改projectb/celeryconfig.py添加如下配置:
  from datetime import timedelta  # CELERYBEAT_SCHEDULE中指定了tasks.add这个任务每10秒跑一次,执行的时候参数是16和16 CELERYBEAT_SCHEDULE = {     'add':{         'task': 'projectb.tasks.add',         'schedule': timedelta(seconds=10),         'args': (16, 16)     } }  
 启动Beat程序:
 yangyang@yangyang-VirtualBox:~$ celery beat -A projectb celery beat v4.1.0 (latentcall) is starting. __    -    ... __   -        _ LocalTime -> 2017-11-21 11:23:08 Configuration ->     . broker -> amqp://guest:**@localhost:5672//     . loader -> celery.loaders.app.AppLoader     . scheduler -> celery.beat.PersistentScheduler     . db -> celerybeat-schedule     . logfile -> [stderr]@%WARNING     . maxinterval -> 5.00 minutes (300s)   
 启动worker进程
 yangyang@yangyang-VirtualBox:~$ celery -A projectb worker -l info    -------------- celery@yangyang-VirtualBox v4.1.0 (latentcall) ---- **** -----  --- * ***  * -- Linux-4.4.0-92-generic-x86_64-with-Ubuntu-16.04-xenial 2017-11-21 11:23:33 -- * - **** ---  - ** ---------- [config] - ** ---------- .> app:         projectb:0x7f7259529190 - ** ---------- .> transport:   amqp://guest:**@localhost:5672// - ** ---------- .> results:     redis://localhost:6379/0 - *** --- * --- .> concurrency: 2 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** -----   -------------- [queues]                 .> celery           exchange=celery(direct) key=celery                   [tasks]   . projectb.tasks.add  # 可以看到每10秒会自动执行一次tasks.add [2017-11-21 11:23:33,327: INFO/MainProcess] Connected to amqp://guest:**@localhost:5672// [2017-11-21 11:23:33,355: INFO/MainProcess] mingle: searching for neighbors [2017-11-21 11:23:34,498: INFO/MainProcess] mingle: all alone [2017-11-21 11:23:34,553: INFO/MainProcess] celery@yangyang-VirtualBox ready. [2017-11-21 11:23:36,527: INFO/MainProcess] Received task: projectb.tasks.add[fffffae8-01cf-43af-8559-21cd67113cdb]   [2017-11-21 11:23:36,547: INFO/ForkPoolWorker-2] Task projectb.tasks.add[fffffae8-01cf-43af-8559-21cd67113cdb] succeeded in 0.0164396990003s: 32 [2017-11-21 11:23:39,057: INFO/MainProcess] Received task: projectb.tasks.add[4baaf24c-6050-4fef-a27d-ca04790c17f2]   [2017-11-21 11:23:39,092: INFO/ForkPoolWorker-2] Task projectb.tasks.add[4baaf24c-6050-4fef-a27d-ca04790c17f2] succeeded in 0.0325541210004s: 32 [2017-11-21 11:23:49,052: INFO/MainProcess] Received task: projectb.tasks.add[47b6358a-ca5e-4ab3-9fa0-c4fef101c135]   [2017-11-21 11:23:49,057: INFO/ForkPoolWorker-2] Task projectb.tasks.add[47b6358a-ca5e-4ab3-9fa0-c4fef101c135] succeeded in 0.00214087700078s: 32  
 Beat 和 Worker 进程可以一并启动,但是Celery会有多个进程
  celery -B -A -projectb worker -l info
 
 三、任务绑定、记录日志和重试
 任务绑定、记录日志、和重试是Celery常用的3个高级属性,现在修改project/tasks.py文件,添加div函数用于演示:
 from celery.utils.log import get_task_logger  logger = get_task_logger(__name__)  @app.task(bind=True) def div(self, x, y):     logger.info(('Executing task id {0.id}, args: {0.args!r}'                  'kwargs: {0.kwargs!r}').format(self.request))     try:         result = x / y     except ZeroDivisionError as e:         raise self.retry(exc=e, countdown=5, max_retries=3)     return result  
 当使用bind=True后,函数的参数发生变化,多出了参数self,相当于把div变成了一个已绑定的方法,通过self可以获得任务的上下文。
 启动project项目:
 yangyang@yangyang-VirtualBox:~$ celery -A project worker -l info    -------------- celery@yangyang-VirtualBox v4.1.0 (latentcall) ---- **** -----  --- * ***  * -- Linux-4.4.0-92-generic-x86_64-with-Ubuntu-16.04-xenial 2017-11-21 13:59:09 -- * - **** ---  - ** ---------- [config] - ** ---------- .> app:         project:0x7f04cb5d2190 - ** ---------- .> transport:   amqp://guest:**@localhost:5672// - ** ---------- .> results:     redis://localhost:6379/0 - *** --- * --- .> concurrency: 2 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** -----   -------------- [queues]                 .> celery           exchange=celery(direct) key=celery                   [tasks]   . project.tasks.add   . project.tasks.div  [2017-11-21 13:59:11,692: INFO/MainProcess] Connected to amqp://guest:**@localhost:5672// [2017-11-21 13:59:11,730: INFO/MainProcess] mingle: searching for neighbors [2017-11-21 13:59:13,353: INFO/MainProcess] mingle: all alone [2017-11-21 13:59:13,476: INFO/MainProcess] celery@yangyang-VirtualBox ready. [2017-11-21 14:00:18,978: INFO/MainProcess] Received task: project.tasks.div[0d2502ba-cf48-4973-98de-09e73050d4df]   [2017-11-21 14:00:18,982: INFO/ForkPoolWorker-2] project.tasks.div[0d2502ba-cf48-4973-98de-09e73050d4df]: Executing task id 0d2502ba-cf48-4973-98de-09e73050d4df, args: [2, 1]kwargs: {} [2017-11-21 14:00:19,053: INFO/ForkPoolWorker-2] Task project.tasks.div[0d2502ba-cf48-4973-98de-09e73050d4df] succeeded in 0.0714741849988s: 2 [2017-11-21 14:01:17,100: INFO/MainProcess] Received task: project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca]   [2017-11-21 14:01:17,103: INFO/ForkPoolWorker-2] project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca]: Executing task id 83088c09-2fa8-4dd7-a7b3-30de18f436ca, args: [2, 0]kwargs: {} [2017-11-21 14:01:17,346: INFO/MainProcess] Received task: project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca]  ETA:[2017-11-21 06:01:22.110529+00:00]  [2017-11-21 14:01:17,544: INFO/ForkPoolWorker-2] Task project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca] retry: Retry in 5s: ZeroDivisionError('integer division or modulo by zero',) [2017-11-21 14:01:23,402: INFO/ForkPoolWorker-2] project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca]: Executing task id 83088c09-2fa8-4dd7-a7b3-30de18f436ca, args: [2, 0]kwargs: {} [2017-11-21 14:01:23,415: INFO/MainProcess] Received task: project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca]  ETA:[2017-11-21 06:01:28.404381+00:00]  [2017-11-21 14:01:23,420: INFO/ForkPoolWorker-2] Task project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca] retry: Retry in 5s: ZeroDivisionError('integer division or modulo by zero',) [2017-11-21 14:01:28,490: INFO/ForkPoolWorker-2] project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca]: Executing task id 83088c09-2fa8-4dd7-a7b3-30de18f436ca, args: [2, 0]kwargs: {} [2017-11-21 14:01:28,500: INFO/MainProcess] Received task: project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca]  ETA:[2017-11-21 06:01:33.494378+00:00]  [2017-11-21 14:01:28,501: INFO/ForkPoolWorker-2] Task project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca] retry: Retry in 5s: ZeroDivisionError('integer division or modulo by zero',) [2017-11-21 14:01:33,655: INFO/ForkPoolWorker-2] project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca]: Executing task id 83088c09-2fa8-4dd7-a7b3-30de18f436ca, args: [2, 0]kwargs: {} [2017-11-21 14:01:33,660: ERROR/ForkPoolWorker-2] Task project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca] raised unexpected: ZeroDivisionError('integer division or modulo by zero',) Traceback (most recent call last):   File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 374, in trace_task     R = retval = fun(*args, **kwargs)   File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 629, in __protected_call__     return self.run(*args, **kwargs)   File "/home/yangyang/project/tasks.py", line 24, in div     raise  self.retry(exc=e, countdown=5, max_retries=3)   File "/usr/local/lib/python2.7/dist-packages/celery/app/task.py", line 669, in retry     raise_with_context(exc)   File "/home/yangyang/project/tasks.py", line 22, in div     result = x / y ZeroDivisionError: integer division or modulo by zero  
 在IPython中调用div:
 In [1]: from project.tasks import div  In [2]: r = div.delay(2,1)  # 执行能造成异常的参数,发现每五秒就会重试一次,默认重复三次,然后抛出异常 In [3]: r = div.delay(2,0)