调用任务:Calling Tasks
这些 API 定义了标准的执行选项集,也就是下面这三个方法:
- apply_async(args[, kwargs[, ...]])发送一个任务消息。
- delay(*args, **kwargs)直接发送一个任务消息,但是不支持运行参数。
- calling(__call__)应用一个支持调用接口(例如,add(2,2))的对象,意味着任务不会被一个 worker 执行,但是会在当前线程中执行(但是消息不会被发送)。
T.delay(arg, kwarg=value)
调用 apply_async 的快捷方式(.delay(_args, *_kwargs)等价于调用 .apply_async(args, kwargs))。T.apply_async((arg,), {'kwarg': value})
T.apply_async(countdown=10)
从现在起, 十秒内执行。T.apply_async(eta=now + timedelta(seconds=10))
从现在起十秒内执行,指明使用eta。T.apply_async(countdown=60, expires=120)
从现在起一分钟执行,但在两分钟后过期。T.apply_async(expires=now + timedelta(days=2))
两天内过期,使用datetime对象。
delay()
方法就像一个很规则的函数,很方便去调用它:task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
用
apply_async()
替代你写的:task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})
尽管运行十分方便,但是如果像设置额外的行参数,你必须用
apply_async
。如果任务在当前线程没有注册,你可以通过名字替代的方法使用 send_task() 去调用这个任务。
接下来我们着重的任务参数详情(task excution options),所有的例子都基于一个任务add,返回两个参数之和:
@app.task
def add(x, y):
return x + y
你也多了解下一章将会讲到的Canvas,签名的对象用来传递任务的签名(例如,通过网络发送),它们还支持API调用:
task.s(arg1, arg2, kwarg1='x', kwargs2='y').apply_async()
Celery支持将任务链,一个任务在另一个任务之后。回调任务将用父任务的结果作为一部分参数:
res = add.apply_async((2, 2), link=add.s(16))
# 译者注
# res.get() --> 4
# res.children[0].get() --> 20
第一个 任务的结果(4)会被发送下一个新的任务的参数去加上16,可以这样表达
(2+2)+16=20
。如果task引发异常(errback),您还可以使异常的回调,但这与常规回调的行为不同,因为它将被传递父任务的ID,而不是结果。这是因为抛出序列化引发的异常,因此错误回调需要启用backend,并且任务必须检索任务的结果。
这是一个错误回调的例子:
@app.task
def error_handler(uuid):
result = AsyncResult(uuid)
exc = result.get(propagate=False)
print('Task {0} raised exception: {1!r}\n{2!r}'.format(
uuid, exc, result.traceback))
可以使用
link_error
执行选项将其添加到任务中:add.apply_async((2, 2), link_error=error_handler.s())
此外,
link
和 link_error
选项都可以是list:add.apply_async((2, 2), link=[add.s(16), other_task.s()])
然后将依次调用回调/错误返回,并且将使用父任务的返回值作为部分参数来调用所有回调。
Celery 可以通过消息回调获取所有状态的改变。例如对于长时任务发送人任务进程,你可以这样做:
@app.task(bind=True)
def hello(self, a, b):
time.sleep(1)
self.update_state(state="PROGRESS", meta={'progress': 50})
time.sleep(1)
self.update_state(state="PROGRESS", meta={'progress': 90})
time.sleep(1)
return 'hello world: %i' % (a+b)
def on_raw_message(body):
print(body)
r = hello.apply_async(4, 6)
print(r.get(on_message=on_raw_message, propagate=False))
将生成如下输出:
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
'result': {'progress': 50},
'children': [],
'status': 'PROGRESS',
'traceback': None}
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
'result': {'progress': 90},
'children': [],
'status': 'PROGRESS',
'traceback': None}
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
'result': 'hello world: 10',
'children': [],
'status': 'SUCCESS',
'traceback': None}
hello world: 10
ETA(estimated time of arrival, 预计到底时间)让你设置一个日期和时间,在这个时间之前任务将被执行。countdown 是一种以秒为单位设置ETA的快捷方式。
>>> result = add.apply_async((2, 2), countdown=3)
>>> result.get() # this takes at least 3 seconds to return
20
确保任务在指定的日期和时间之后的某个时间执行,但不一定在该时间执行。可能原因可能包括许多项目在队列中等待,或者严重的网络延迟。为了确保您的任务及时执行,你应该监视队列中的拥塞情况。使用Munin或类似工具来接收警报,因此可以采取适当的措施来减轻负载。点击查看Munin。
尽管
countdown
是整数,但eta必须是一个 datetime
对象,并指定确切的日期和时间(包括毫秒精度和时区信息):>>> from datetime import datetime, timedelta
>>> tomorrow = datetime.utcnow() + timedelta(days=1)
>>> add.apply_async((2, 2), eta=tomorrow)
expries
参数定义了一个可选的到期时间,既可以作为任务之后秒发布,或在特定日期和时间使用 datetime
:>>> # Task expires after one minute from now.
>>> add.apply_async((10, 10), expires=60)
>>> # Also supports datetime
>>> from datetime import datetime, timedelta
>>> add.apply_async((10, 10), kwargs,
... expires=datetime.now() + timedelta(days=1)
当连接失败时,Celery 会自动重试发送消息,并且可以配置重试行为(例如重试频率或最大重试次数)或全部禁用。
add.apply_async((2, 2), retry=False)
相关设定
重试策略是一种控制重试行为的映射,可以包含以下键:
- max_retries最大重试次数,在这种情况下,将抛出重试失败的异常。值为None意味着它将永远重试。默认值为重试3次。
- interval_start定义两次重试之间要等待的秒数(浮点数或整数)。默认值为0(第一次重试是瞬时的)。
- interval_step在每次连续重试时,此数字将被添加到重试延迟中(浮点数或整数)。默认值为0.2。
- interval_max重试之间等待的最大秒数(浮点数或整数)。默认值为0.2。例如,默认策略与以下内容相关:
add.apply_async((2, 2), retry=True, retry_policy={
'max_retries': 3,
'interval_start': 0,
'interval_step': 0.2,
'interval_max': 0.2,
})
重试的最长时间为0.4秒。默认情况下将其设置为相对较短,因为如果代理连接断开,连接失败可能导致重试堆效应–例如,许多 Web 服务器进程正在等待重试,从而阻止了其他传入请求。
当您发送任务并且传输连接丢失或无法启动连接时,将引发
OperationalError
错误:>>> from proj.tasks import add
>>> add.delay(2, 2)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "celery/app/task.py", line 388, in delay
return self.apply_async(args, kwargs)
File "celery/app/task.py", line 503, in apply_async
**options
File "celery/app/base.py", line 662, in send_task
amqp.send_task_message(P, name, message, **options)
File "celery/backends/rpc.py", line 275, in on_task_call
maybe_declare(self.binding(producer.channel), retry=True)
File "/opt/celery/kombu/kombu/messaging.py", line 204, in _get_channel
channel = self._channel = channel()
File "/opt/celery/py-amqp/amqp/connection.py", line 272, in connect
self.transport.connect()
File "/opt/celery/py-amqp/amqp/transport.py", line 100, in connect
self._connect(self.host, self.port, self.connect_timeout)
File "/opt/celery/py-amqp/amqp/transport.py", line 141, in _connect
self.sock.connect(sa)
kombu.exceptions.OperationalError: [Errno 61] Connection refused
您也可以处理此错误:
>>> from celery.utils.log import get_logger
>>> logger = get_logger(__name__)
>>> try:
... add.delay(2, 2)
... except add.OperationalError as exc:
... logger.exception('Sending task raised: %r', exc)
在客户端和工作人员之间传输的数据需要进行序列化,因此 Celery 中的每条消息都有一个 content_type 标头,该标头描述了用于对其进行编码的序列化方法。
pickle 模块允许执行任意功能,请参阅安全指南。
Celery 还带有一个特殊的序列化程序,该序列化程序使用加密技术对您的消息进行签名。
每个序列化器都有其优点和缺点:
- JSON 的缺点是会限制你使用如下的数据类型:字符串、Unicode、字典和列表。小数和日期明显缺失。二进制数据使用 Base64 编码进行传输,这与支持纯二进制传输的相比,数据传输量增长了34%。但如果你的数据满足上述限制,并且你需要跨语言支持,则 JSON 可能是你的最佳选择。
- pickle - 如果你除了 Python 外,并不需要支持其他语言,那么使用 pickle 编码将让你获得对所有 Python 内建类型的支持(类实例除外)。相比 JSON,使用 pickle 序列化的二进制文件更小,传输速度更快。
- yaml - YAML 和 JSON 有许多相似的特征,yaml 支持包括日期、递归引用在内的更多数据类型。然而,Python 的 YMAL 库相比 JSON 库 要慢很多。如果你需要更具表现能力的数据集合,则 YMAL 比上面的序列化方式更适合。
- msgpack - msgpack 是一种接近 JSON 的二进制序列化格式。但是,它还很年轻,因此此时应该将支持视为实验性的
编码类型可以用作消息头,因此 workers 知道如何反序列化所有的任务。如果你使用自定义序列方案,则该序列化必须被 workers 支持。
发送任务时的序列化配置优先级如下(从高到低):
- 1.
serializer
执行选项。
为单个任务调用设置序列化方式:
>>> add.apply_async((10, 10), serializer='json')
Celery 可以使用以下内建方案压缩消息。
- brotlibrotli 针对 web 进行了优化,尤其是小型文档。该压缩对诸如字体、html页面等静态内容最有效。要使用 brotli,请用以下命令进行安装。$ pip install celery[brotli]
- bzip2bzip2 创建的文件比 gzip 小,但是压缩和解压的速度明显慢于 gzip。要使用 bzip2,请确保 bzip2 已经编译到你的 Python 可执行文件中。>>> import bz2Traceback (most recent call last):File "<stdin>", line 1, in <module>ImportError: No module named 'bz2'这意味着你应该重新编译支持 bzip2 的 Python 版本。
- gzipgzip 适用于内存占用较小的系统,因此 gzip 非常适合内存有限的系统。该压缩常用语生成带有 “.tar.gz” 后缀的文件。要使用 gzip,请确保 gzip 已经编译到你的 Python 可执行文件中。如果你得到以下错误[ImportError](https://docs.python.org/dev/library/exceptions.html#ImportError)>>> import gzipTraceback (most recent call last):File "<stdin>", line 1, in <module>ImportError: No module named 'gzip'这意味着你应该重新编译支持 gzip 的 Python 版本。
- lzmalzma 具有较好的压缩效率以及压缩解压速度,但内存消耗更大。要使用 lzma,请确保 gzip 已经编译到你的 Python 可执行文件中,并且你的 Python 版本为3.3或更高版本。>>> import lzmaTraceback (most recent call last):File "<stdin>", line 1, in <module>ImportError: No module named 'lzma'这意味着你应该重新编译支持 lzam 的 Python 版本。也可以通过以下的方式进行安装:$ pip install celery[lzma]
- zlibzlib 是 Deflate 算法的抽象,它的 API 支持包括 gzip 格式和轻量级流格式文件的支持。zlib 是许多软件系统的重要组成部分,例如 Linux 内核以及 Git VCS。要使用 zlib,请确保 zlib 已经编译到你的 Python 可执行文件中。>>> import zlibTraceback (most recent call last):File "<stdin>", line 1, in <module>ImportError: No module named 'zlib'这意味着你应该重新编译支持 zlib 的 Python 版本。
- zstdzstd是一个针对 zlib 的实时压缩方案,且有着更好的压缩效率。zstd 由 Huff0 和 FSE 库提供快速算法。要使用zstd,请用以下命令进行安装。$ pip install celery[zstd]
发送任务时的压缩方案配置优先级如下(从高到低):
- 1.
compression
执行选项。 - 2.Task.compression 属性。
- 3.task_compression 属性。
任务调用时指定压缩方法的示例:
>>> add.apply_async((2, 2), compression='zlib')
- 从2.3版开始,支持自动连接池,因此您不必手动处理连接和发布者即可重用连接。
- 从2.5版开始,默认情况下启用连接池。
您可以通过创建发布者来手动处理连接:
results = []
with add.app.pool.acquire(block=True) as connection:
with add.get_publisher(connection) as publisher:
try:
for args in numbers:
res = add.apply_async((2, 2), publisher=publisher)
results.append(res)
print([res.get() for res in results])
尽管这是个特定示例,但是可以更好的展现一组:
>>> from celery import group
>>> numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
>>> res = group(add.s(i, j) for i, j in numbers).apply_async()
>>> res.get()
[4, 8, 16, 32]
Celery 可以将任务路由到不同的队列。
使用以下 queue 可以完成简单的路由(name <-> name):
add.apply_async(queue='priority.high')
然后,您可以指派 workers 给 priority.high 的队列,使用 worker -Q 参数将分配给队列:
$ celery -A proj worker -l info -Q celery,priority.high
- 交换(exchange)发送信息的 exchange(或者 kombu.entity.Exchange) 的名称。
- routing_key用于确定路由的密钥。
- 优先(priority)0~255 之间的数字,其中255是最高优先级。支持:RabbitMQ,Redis(优先级颠倒,最高为0)。
Last modified 3yr ago