1. Specifying Queues
Celery is easy to set up and run. By default it places all tasks on a single queue named celery (configurable via CELERY_DEFAULT_QUEUE). By routing tasks to different queues, you can ensure that high-priority tasks are handled without waiting behind everything else.
Building on the project code from Part 1 of this series, create a projectq directory and modify celeryconfig.py as follows:
```python
# -*- coding:utf-8 -*-
BROKER_URL = 'amqp://guest:guest@localhost:5672//'  # use RabbitMQ as the message broker
CELERY_TASK_PROTOCOL = 1  # Celery 4.0 defaults to task message protocol v2, but the old librabbitmq only speaks v1, so force v1 for compatibility
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'  # store results in Redis
CELERY_TASK_SERIALIZER = 'msgpack'  # serialize/deserialize task messages with msgpack
CELERY_RESULT_SERIALIZER = 'json'   # reading results is rarely performance-critical, so use the more readable json
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24  # task results expire after one day
CELERY_ACCEPT_CONTENT = ['json', 'msgpack']  # accepted content types

# newly added settings
from kombu import Queue

# define the task queues
CELERY_QUEUES = (
    Queue('default', routing_key='task.#'),    # routing keys starting with 'task.' go to the default queue
    Queue('web_tasks', routing_key='web.#'),   # routing keys starting with 'web.' go to the web_tasks queue
)
CELERY_DEFAULT_EXCHANGE = 'tasks'            # default exchange name
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'       # default exchange type
CELERY_DEFAULT_ROUTING_KEY = 'task.default'  # default routing key; it matches the default queue above
CELERY_ROUTES = {
    # messages for tasks.add go to the web_tasks queue
    'projectq.tasks.add': {
        'queue': 'web_tasks',
        'routing_key': 'web.add',
    }
}
```
Start a worker (consumer) process bound to the specified queue:
```
yangyang@yangyang-VirtualBox:~$ celery -A projectq -Q web_tasks worker -l info

 -------------- celery@yangyang-VirtualBox v4.1.0 (latentcall)
---- **** -----
--- * ***  * -- Linux-4.4.0-92-generic-x86_64-with-Ubuntu-16.04-xenial 2017-11-21 10:25:16
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         project:0x7f6e064e1110
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     redis://localhost:6379/0
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> web_tasks        exchange=web_tasks(direct) key=web_tasks

[tasks]
  . project.tasks.add

[2017-11-21 10:25:16,432: INFO/MainProcess] Connected to amqp://guest:**@localhost:5672//
[2017-11-21 10:25:16,463: INFO/MainProcess] mingle: searching for neighbors
[2017-11-21 10:25:17,649: INFO/MainProcess] mingle: all alone
[2017-11-21 10:25:18,006: INFO/MainProcess] celery@yangyang-VirtualBox ready.
```
The worker above will only execute tasks from the web_tasks queue.
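To see why 'web.add' lands in web_tasks while everything else falls through to the default queue, here is a minimal pure-Python sketch (not kombu/Celery code) of how a topic exchange matches a message's routing key against binding patterns such as 'task.#' and 'web.#':

```python
import re

def topic_match(pattern, routing_key):
    """Simplified AMQP topic matching: '*' matches exactly one
    dot-separated word, '#' matches any remaining words.
    (Real AMQP also lets 'task.#' match the bare key 'task'.)"""
    parts = []
    for word in pattern.split('.'):
        if word == '#':
            parts.append('.*')        # '#' -> any words
        elif word == '*':
            parts.append('[^.]+')     # '*' -> exactly one word
        else:
            parts.append(re.escape(word))
    return re.match('^' + r'\.'.join(parts) + '$', routing_key) is not None

# the bindings declared in CELERY_QUEUES above
bindings = {'default': 'task.#', 'web_tasks': 'web.#'}

def queues_for(routing_key):
    """Which queues would receive a message with this routing key."""
    return [q for q, p in bindings.items() if topic_match(p, routing_key)]

# 'web.add' (the CELERY_ROUTES key for tasks.add) goes only to web_tasks;
# the default routing key 'task.default' goes only to the default queue.
```

This is only a model of the broker's behavior: in a real deployment RabbitMQ performs this matching inside the tasks topic exchange.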
2. Task Scheduling
The examples so far have all been triggered by a publisher. With Celery's Beat process, tasks can be generated automatically on a schedule. Based on the project source from Part 1, create a projectb directory and add the following to projectb/celeryconfig.py:
```python
from datetime import timedelta

# CELERYBEAT_SCHEDULE runs tasks.add every 10 seconds with arguments 16 and 16
CELERYBEAT_SCHEDULE = {
    'add': {
        'task': 'projectb.tasks.add',
        'schedule': timedelta(seconds=10),
        'args': (16, 16)
    }
}
```
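timedelta gives fixed-interval scheduling; Celery's schedules module also supports crontab-style entries. As a sketch of an alternative schedule, assuming the same projectb layout (the entry name 'add-every-morning' is made up for illustration):

```python
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'add-every-morning': {
        'task': 'projectb.tasks.add',
        'schedule': crontab(hour=7, minute=30),  # every day at 07:30
        'args': (16, 16),
    }
}
```

crontab accepts the familiar minute/hour/day fields, so the same mechanism covers both "every N seconds" and calendar-based schedules.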
Start the Beat process:
```
yangyang@yangyang-VirtualBox:~$ celery beat -A projectb
celery beat v4.1.0 (latentcall) is starting.
__    -    ... __   -        _
LocalTime -> 2017-11-21 11:23:08
Configuration ->
    . broker -> amqp://guest:**@localhost:5672//
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%WARNING
    . maxinterval -> 5.00 minutes (300s)
```
Start the worker process:
```
yangyang@yangyang-VirtualBox:~$ celery -A projectb worker -l info

 -------------- celery@yangyang-VirtualBox v4.1.0 (latentcall)
---- **** -----
--- * ***  * -- Linux-4.4.0-92-generic-x86_64-with-Ubuntu-16.04-xenial 2017-11-21 11:23:33
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         projectb:0x7f7259529190
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     redis://localhost:6379/0
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . projectb.tasks.add

# tasks.add is executed automatically every 10 seconds:
[2017-11-21 11:23:33,327: INFO/MainProcess] Connected to amqp://guest:**@localhost:5672//
[2017-11-21 11:23:33,355: INFO/MainProcess] mingle: searching for neighbors
[2017-11-21 11:23:34,498: INFO/MainProcess] mingle: all alone
[2017-11-21 11:23:34,553: INFO/MainProcess] celery@yangyang-VirtualBox ready.
[2017-11-21 11:23:36,527: INFO/MainProcess] Received task: projectb.tasks.add[fffffae8-01cf-43af-8559-21cd67113cdb]
[2017-11-21 11:23:36,547: INFO/ForkPoolWorker-2] Task projectb.tasks.add[fffffae8-01cf-43af-8559-21cd67113cdb] succeeded in 0.0164396990003s: 32
[2017-11-21 11:23:39,057: INFO/MainProcess] Received task: projectb.tasks.add[4baaf24c-6050-4fef-a27d-ca04790c17f2]
[2017-11-21 11:23:39,092: INFO/ForkPoolWorker-2] Task projectb.tasks.add[4baaf24c-6050-4fef-a27d-ca04790c17f2] succeeded in 0.0325541210004s: 32
[2017-11-21 11:23:49,052: INFO/MainProcess] Received task: projectb.tasks.add[47b6358a-ca5e-4ab3-9fa0-c4fef101c135]
[2017-11-21 11:23:49,057: INFO/ForkPoolWorker-2] Task projectb.tasks.add[47b6358a-ca5e-4ab3-9fa0-c4fef101c135] succeeded in 0.00214087700078s: 32
```
Beat and the worker can be started with a single command, although Celery will then run multiple processes:

```
celery -B -A projectb worker -l info
```
3. Task Binding, Logging and Retries
Task binding, logging, and retrying are three commonly used advanced Celery features. Modify project/tasks.py and add a div function to demonstrate them:
```python
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task(bind=True)
def div(self, x, y):
    logger.info(('Executing task id {0.id}, args: {0.args!r}'
                 'kwargs: {0.kwargs!r}').format(self.request))
    try:
        result = x / y
    except ZeroDivisionError as e:
        raise self.retry(exc=e, countdown=5, max_retries=3)
    return result
```
With bind=True, the function signature changes: it gains a self parameter, which effectively turns div into a bound method. Through self you can access the task's execution context (for example self.request, which carries the task id, arguments, and retry count).
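To make the retry semantics concrete, here is a plain-Python sketch (no Celery involved; the function name is made up) of the policy self.retry implements for div: re-run on ZeroDivisionError, give up after max_retries attempts, then re-raise the original exception:

```python
def div_with_retries(x, y, max_retries=3):
    """Plain-Python model of Celery's retry policy for div:
    retry on ZeroDivisionError up to max_retries times, then re-raise.
    (A real worker additionally waits `countdown` seconds between
    attempts and re-delivers the task through the broker.)"""
    attempts = 0
    while True:
        try:
            return x / y
        except ZeroDivisionError:
            attempts += 1
            if attempts > max_retries:
                raise  # retries exhausted: propagate the error
            # a worker would reschedule itself `countdown` seconds out here
```

This mirrors the worker log below: three "Retry in 5s" lines, then the ZeroDivisionError is raised for good.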
Start the project worker:
```
yangyang@yangyang-VirtualBox:~$ celery -A project worker -l info

 -------------- celery@yangyang-VirtualBox v4.1.0 (latentcall)
---- **** -----
--- * ***  * -- Linux-4.4.0-92-generic-x86_64-with-Ubuntu-16.04-xenial 2017-11-21 13:59:09
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         project:0x7f04cb5d2190
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     redis://localhost:6379/0
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . project.tasks.add
  . project.tasks.div

[2017-11-21 13:59:11,692: INFO/MainProcess] Connected to amqp://guest:**@localhost:5672//
[2017-11-21 13:59:11,730: INFO/MainProcess] mingle: searching for neighbors
[2017-11-21 13:59:13,353: INFO/MainProcess] mingle: all alone
[2017-11-21 13:59:13,476: INFO/MainProcess] celery@yangyang-VirtualBox ready.
[2017-11-21 14:00:18,978: INFO/MainProcess] Received task: project.tasks.div[0d2502ba-cf48-4973-98de-09e73050d4df]
[2017-11-21 14:00:18,982: INFO/ForkPoolWorker-2] project.tasks.div[0d2502ba-cf48-4973-98de-09e73050d4df]: Executing task id 0d2502ba-cf48-4973-98de-09e73050d4df, args: [2, 1]kwargs: {}
[2017-11-21 14:00:19,053: INFO/ForkPoolWorker-2] Task project.tasks.div[0d2502ba-cf48-4973-98de-09e73050d4df] succeeded in 0.0714741849988s: 2
[2017-11-21 14:01:17,100: INFO/MainProcess] Received task: project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca]
[2017-11-21 14:01:17,103: INFO/ForkPoolWorker-2] project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca]: Executing task id 83088c09-2fa8-4dd7-a7b3-30de18f436ca, args: [2, 0]kwargs: {}
[2017-11-21 14:01:17,346: INFO/MainProcess] Received task: project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca]  ETA:[2017-11-21 06:01:22.110529+00:00]
[2017-11-21 14:01:17,544: INFO/ForkPoolWorker-2] Task project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca] retry: Retry in 5s: ZeroDivisionError('integer division or modulo by zero',)
[2017-11-21 14:01:23,402: INFO/ForkPoolWorker-2] project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca]: Executing task id 83088c09-2fa8-4dd7-a7b3-30de18f436ca, args: [2, 0]kwargs: {}
[2017-11-21 14:01:23,415: INFO/MainProcess] Received task: project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca]  ETA:[2017-11-21 06:01:28.404381+00:00]
[2017-11-21 14:01:23,420: INFO/ForkPoolWorker-2] Task project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca] retry: Retry in 5s: ZeroDivisionError('integer division or modulo by zero',)
[2017-11-21 14:01:28,490: INFO/ForkPoolWorker-2] project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca]: Executing task id 83088c09-2fa8-4dd7-a7b3-30de18f436ca, args: [2, 0]kwargs: {}
[2017-11-21 14:01:28,500: INFO/MainProcess] Received task: project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca]  ETA:[2017-11-21 06:01:33.494378+00:00]
[2017-11-21 14:01:28,501: INFO/ForkPoolWorker-2] Task project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca] retry: Retry in 5s: ZeroDivisionError('integer division or modulo by zero',)
[2017-11-21 14:01:33,655: INFO/ForkPoolWorker-2] project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca]: Executing task id 83088c09-2fa8-4dd7-a7b3-30de18f436ca, args: [2, 0]kwargs: {}
[2017-11-21 14:01:33,660: ERROR/ForkPoolWorker-2] Task project.tasks.div[83088c09-2fa8-4dd7-a7b3-30de18f436ca] raised unexpected: ZeroDivisionError('integer division or modulo by zero',)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 374, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 629, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/yangyang/project/tasks.py", line 24, in div
    raise self.retry(exc=e, countdown=5, max_retries=3)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/task.py", line 669, in retry
    raise_with_context(exc)
  File "/home/yangyang/project/tasks.py", line 22, in div
    result = x / y
ZeroDivisionError: integer division or modulo by zero
```
Call div from IPython:
```
In [1]: from project.tasks import div

In [2]: r = div.delay(2, 1)

# with arguments that raise an exception, the task retries every five
# seconds, three times by default, and then raises the exception
In [3]: r = div.delay(2, 0)
```