Celery系列二:Celery高级属性


声明:本文转载自https://my.oschina.net/zhangyangyang/blog/1576615,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

一、指定队列

Celery非常容易设置和运行,通常它会使用默认的名为celery的队列(可以通过CELERY_DEFAULT_QUEUE修改)用来存放任务。可以使用优先级不同来确保高优先级的任务不需要等待就得到响应。

基于Celery系列一的project代码,创建一个projectq目录,并修改celeryconfig.py,修改后如下:

# -*- coding:utf-8 -*- BROKER_URL = 'amqp://guest:guest@localhost:5672//'  # 使用RabbitMQ作为消息代理 CELERY_TASK_PROTOCOL = 1  # 现在celery升级到了4.0,是老版本的librabbitmq与最新的celery4.0 Message Protocol协议不兼容,celery4.0默认使用Task messages Version 2 ,而librabbitmq使用Task messages Version 1 CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 把结果存在Redis CELERY_TASK_SERIALIZER = 'msgpack'  # 任务序列化肯反序列化使用msgpack方案 CELERY_RESULT_SERIALIZER = 'json'   # 读取任务结果一般性能要求不高,所以使用可读性更好的json CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24  # 任务过期时间 CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接收的内容类型  # 新加入的配置参数 from kombu import Queue  # 定义任务队列 CELERY_QUEUES = (     QUEUE('default', routing_key='task.#'),  # 路由键以‘task.’开头的都进default队列     QUEUE('web_tasks', routing_key='web.#'),  # 路由键以‘web.’开头的都进web_tasks队列 )  CELERY_DEFAULT_EXCHANGE = 'tasks'  # 默认的交换机名字为tasks  CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'  # 默认的交换类型是topic  CELERY_DEFAULT_ROUTING_KEY = 'task.default' # 默认的路由键是task.default,这个路由键符合上面的default队列   CELERY_ROUTES = {   # tasks.add的消息会进入web_tasks队列     'projectq.tasks.add': {         'queue': 'web_tasks',         'routing_key': 'web.add',     } }   

用指定队列的方式启动消费者进程:

yangyang@yangyang-VirtualBox:~$ celery -A projectq -Q web_tasks  worker -l info    -------------- celery@yangyang-VirtualBox v4.1.0 (latentcall) ---- **** -----  --- * ***  * -- Linux-4.4.0-92-generic-x86_64-with-Ubuntu-16.04-xenial 2017-11-21 10:25:16 -- * - **** ---  - ** ---------- [config] - ** ---------- .> app:         project:0x7f6e064e1110 - ** ---------- .> transport:   amqp://guest:**@localhost:5672// - ** ---------- .> results:     redis://localhost:6379/0 - *** --- * --- .> concurrency: 2 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** -----   -------------- [queues]                 .> web_tasks        exchange=web_tasks(direct) key=web_tasks                   [tasks]   . project.tasks.add  [2017-11-21 10:25:16,432: INFO/MainProcess] Connected to amqp://guest:**@localhost:5672// [2017-11-21 10:25:16,463: INFO/MainProcess] mingle: searching for neighbors [2017-11-21 10:25:17,649: INFO/MainProcess] mingle: all alone [2017-11-21 10:25:18,006: INFO/MainProcess] celery@yangyang-VirtualBox ready.   

上述worker只会执行web_tasks中的任务

二、任务调度

上述例子都是由发布者触发的,可以使用Celery的Beat进程自动生成任务。基于系列一project目录下的源码,创建projectb目录,修改projectb/celeryconfig.py添加如下配置:

 from datetime import timedelta  # CELERYBEAT_SCHEDULE中指定了tasks.add这个任务每10秒跑一次,执行的时候参数是16和16 CELERYBEAT_SCHEDULE = {     'add':{         'task': 'projectb.tasks.add',         'schedule': timedelta(seconds=10),         'args': (16, 16)     } }  

启动Beat程序:

yangyang@yangyang-VirtualBox:~$ celery beat -A projectb celery beat v4.1.0 (latentcall) is starting. __    -    ... __   -        _ LocalTime -> 2017-11-21 11:23:08 Configuration ->     . broker -> amqp://guest:**@localhost:5672//     . loader -> celery.loaders.app.AppLoader     . scheduler -> celery.beat.PersistentScheduler     . db -> celerybeat-schedule     . logfile -> [stderr]@%WARNING     . maxinterval -> 5.00 minutes (300s)   

启动worker进程

yangyang@yangyang-VirtualBox:~$ celery -A projectb worker -l info    -------------- celery@yangyang-VirtualBox v4.1.0 (latentcall) ---- **** -----  --- * ***  * -- Linux-4.4.0-92-generic-x86_64-with-Ubuntu-16.04-xenial 2017-11-21 11:23:33 -- * - **** ---  - ** ---------- [config] - ** ---------- .> app:         projectb:0x7f7259529190 - ** ---------- .> transport:   amqp://guest:**@localhost:5672// - ** ---------- .> results:     redis://localhost:6379/0 - *** --- * --- .> concurrency: 2 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** -----   -------------- [queues]                 .> celery           exchange=celery(direct) key=celery                   [tasks]   . projectb.tasks.add  # 可以看到每10秒会自动执行一次tasks.add [2017-11-21 11:23:33,327: INFO/MainProcess] Connected to amqp://guest:**@localhost:5672// [2017-11-21 11:23:33,355: INFO/MainProcess] mingle: searching for neighbors [2017-11-21 11:23:34,498: INFO/MainProcess] mingle: all alone [2017-11-21 11:23:34,553: INFO/MainProcess] celery@yangyang-VirtualBox ready. [2017-11-21 11:23:36,527: INFO/MainProcess] Received task: projectb.tasks.add[fffffae8-01cf-43af-8559-21cd67113cdb]   [2017-11-21 11:23:36,547: INFO/ForkPoolWorker-2] Task projectb.tasks.add[fffffae8-01cf-43af-8559-21cd67113cdb] succeeded in 0.0164396990003s: 32 [2017-11-21 11:23:39,057: INFO/MainProcess] Received task: projectb.tasks.add[4baaf24c-6050-4fef-a27d-ca04790c17f2]   [2017-11-21 11:23:39,092: INFO/ForkPoolWorker-2] Task projectb.tasks.add[4baaf24c-6050-4fef-a27d-ca04790c17f2] succeeded in 0.0325541210004s: 32 [2017-11-21 11:23:49,052: INFO/MainProcess] Received task: projectb.tasks.add[47b6358a-ca5e-4ab3-9fa0-c4fef101c135]   [2017-11-21 11:23:49,057: INFO/ForkPoolWorker-2] Task projectb.tasks.add[47b6358a-ca5e-4ab3-9fa0-c4fef101c135] succeeded in 0.00214087700078s: 32  

Beat 和 Worker 进程可以一并启动,但是Celery会有多个进程

celery -B -A -projectb worker -l info

三、任务绑定、记录日志和重试

任务绑定、记录日志、和重试是Celery常用的3个高级属性,现在修改project/tasks.py文件,添加div函数用于演示:

from celery.utils.log import get_task_logger  logger = get_task_logger(__name__)  @app.task(bind=True) def div(self, x, y):     logger.info(('Executing task id {0.id}, args: {0.args!r}'                  'kwargs: {0.kwargs!r}').format(self.request))     try:         result = x / y     except ZeroDivisionError as e:         raise self.retry(exc=e, countdown=5, max_retries=3)     return result  

当使用bind=True后,函数的参数发生变化,多出了参数self,相当于把div变成了一个已绑定的方法,通过self可以获得任务的上下文。

启动project项目:

yangyang@yangyang-VirtualBox:~$ celery -A project worker -l info    -------------- celery@yangyang-VirtualBox v4.1.0 (latentcall) ---- **** -----  --- * ***  * -- Linux-4.4.0-92-generic-x86_64-with-Ubuntu-16.04-xenial 2017-11-21 13:59:09 -- * - **** ---  - ** ---------- [config] - ** ---------- .> app:         project:0x7f04cb5d2190 - ** ---------- .> transport:   amqp://guest:**@localhost:5672// - ** ---------- .> results:     redis://localhost:6379/0 - *** --- * --- .> concurrency: 2 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** -----   -------------- [queues]                 .> celery           exchange=celery(direct) key=celery                   [tasks]   . project.tasks.add   . project.tasks.div  [2017-11-21 13:59:11,692: INFO/MainProcess] Connected to amqp://guest:**@localhost:5672// [2017-11-21 13:59:11,730: INFO/MainProcess] mingle: searching for neighbors [2017-11-21 13:59:13,353: INFO/MainProcess] mingle: all alone [2017-11-21 13:59:13,476: INFO/MainProcess] celery@yangyang-VirtualBox ready. [2017-11-21 14:00:18,978: INFO/MainProcess] Received task: project.tasks.div[0d2502ba-cf48-4973-98de-09e73050d4df]   [2017-11-21 14:00:18,982: INFO/ForkPoolWorker-2] project.tasks.div[0d2502ba-cf48-4973-98de-09e73050d4df]: Executing task id 0d2502ba-cf48-4973-98de-09e73050d4df, args: [2, 1]kwargs: {} [2017-11-21 14:00:19,053: INFO/ForkPoolWorker-2] Task project.tasks.div[0d2502ba-cf48-4973-98de-09e73050d4df] succeeded in 0.0714741849988s: 2 [2017-11-21 14:01:17,100: INFO/MainProcess] Received task: project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca]   [2017-11-21 14:01:17,103: INFO/ForkPoolWorker-2] project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca]: Executing task id 83088c09-2fa8-4dd7-a7b3-30de18f436ca, args: [2, 0]kwargs: {} [2017-11-21 14:01:17,346: INFO/MainProcess] Received task: project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca]  ETA:[2017-11-21 06:01:22.110529+00:00]  [2017-11-21 14:01:17,544: INFO/ForkPoolWorker-2] Task project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca] retry: Retry in 5s: ZeroDivisionError('integer division or modulo by zero',) [2017-11-21 14:01:23,402: INFO/ForkPoolWorker-2] project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca]: Executing task id 83088c09-2fa8-4dd7-a7b3-30de18f436ca, args: [2, 0]kwargs: {} [2017-11-21 14:01:23,415: INFO/MainProcess] Received task: project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca]  ETA:[2017-11-21 06:01:28.404381+00:00]  [2017-11-21 14:01:23,420: INFO/ForkPoolWorker-2] Task project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca] retry: Retry in 5s: ZeroDivisionError('integer division or modulo by zero',) [2017-11-21 14:01:28,490: INFO/ForkPoolWorker-2] project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca]: Executing task id 83088c09-2fa8-4dd7-a7b3-30de18f436ca, args: [2, 0]kwargs: {} [2017-11-21 14:01:28,500: INFO/MainProcess] Received task: project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca]  ETA:[2017-11-21 06:01:33.494378+00:00]  [2017-11-21 14:01:28,501: INFO/ForkPoolWorker-2] Task project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca] retry: Retry in 5s: ZeroDivisionError('integer division or modulo by zero',) [2017-11-21 14:01:33,655: INFO/ForkPoolWorker-2] project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca]: Executing task id 83088c09-2fa8-4dd7-a7b3-30de18f436ca, args: [2, 0]kwargs: {} [2017-11-21 14:01:33,660: ERROR/ForkPoolWorker-2] Task project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca] raised unexpected: ZeroDivisionError('integer division or modulo by zero',) Traceback (most recent call last):   File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 374, in trace_task     R = retval = fun(*args, **kwargs)   File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 629, in __protected_call__     return self.run(*args, **kwargs)   File "/home/yangyang/project/tasks.py", line 24, in div     raise  self.retry(exc=e, countdown=5, max_retries=3)   File "/usr/local/lib/python2.7/dist-packages/celery/app/task.py", line 669, in retry     raise_with_context(exc)   File "/home/yangyang/project/tasks.py", line 22, in div     result = x / y ZeroDivisionError: integer division or modulo by zero  

在IPython中调用div:

In [1]: from project.tasks import div  In [2]: r = div.delay(2,1)  # 执行能造成异常的参数,发现每五秒就会重试一次,默认重复三次,然后抛出异常 In [3]: r = div.delay(2,0)  

本文发表于2017年11月21日 16:33
(c)注:本文转载自https://my.oschina.net/zhangyangyang/blog/1576615,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 2399 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1