调用任务:Calling Tasks

基础入门

本文档介绍了任务实例和 canvas 对 Celery 统一的调用接口。

这些 API 定义了标准的执行选项集,也就是下面这三个方法:

  • apply_async(args[, kwargs[, ...]])

    发送一个任务消息。

  • delay(*args, **kwargs)

    直接发送一个任务消息,但是不支持运行参数。

  • calling(__call__)

    应用一个支持调用接口(例如,add(2,2))的对象,意味着任务不会被一个 worker 执行,但是会在当前线程中执行(但是消息不会被发送)。

速查表

  • T.delay(arg, kwarg=value)

    调用 apply_async 的快捷方式(.delay(_args, *_kwargs)等价于调用 .apply_async(args, kwargs))。

  • T.apply_async((arg,), {'kwarg': value})

  • T.apply_async(countdown=10)

    从现在起, 十秒内执行。

  • T.apply_async(eta=now + timedelta(seconds=10))

    从现在起十秒内执行,指明使用eta。

  • T.apply_async(countdown=60, expires=120)

    从现在起一分钟执行,但在两分钟后过期。

  • T.apply_async(expires=now + timedelta(days=2))

    两天内过期,使用datetime对象。

例子

delay() 方法就像一个很规则的函数,很方便去调用它:

task.delay(arg1, arg2, kwarg1='x', kwarg2='y')

apply_async() 替代你写的:

task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})

尽管运行十分方便,但是如果像设置额外的行参数,你必须用 apply_async

小技巧

如果任务在当前线程没有注册,你可以通过名字替代的方法使用 send_task() 去调用这个任务。

接下来我们着重的任务参数详情(task excution options),所有的例子都基于一个任务add,返回两个参数之和:

@app.task
def add(x, y):
    return x + y

还有其他的方式......

你也多了解下一章将会讲到的Canvas,签名的对象用来传递任务的签名(例如,通过网络发送),它们还支持API调用:

task.s(arg1, arg2, kwarg1='x', kwargs2='y').apply_async()

Linking(callbacks/errbacks)

Celery支持将任务链,一个任务在另一个任务之后。回调任务将用父任务的结果作为一部分参数:

res = add.apply_async((2, 2), link=add.s(16))

# 译者注
# res.get() --> 4
# res.children[0].get() --> 20

第一个任务的结果(4)会被发送下一个新的任务的参数去加上16,可以这样表达 (2+2)+16=20

如果task引发异常(errback),您还可以使异常的回调,但这与常规回调的行为不同,因为它将被传递父任务的ID,而不是结果。这是因为抛出序列化引发的异常,因此错误回调需要启用backend,并且任务必须检索任务的结果。

这是一个错误回调的例子:

@app.task
def error_handler(uuid):
    result = AsyncResult(uuid)
    exc = result.get(propagate=False)
    print('Task {0} raised exception: {1!r}\n{2!r}'.format(
          uuid, exc, result.traceback))

可以使用 link_error 执行选项将其添加到任务中:

add.apply_async((2, 2), link_error=error_handler.s())

此外,linklink_error 选项都可以是list:

add.apply_async((2, 2), link=[add.s(16), other_task.s()])

然后将依次调用回调/错误返回,并且将使用父任务的返回值作为部分参数来调用所有回调。

On Message

Celery 可以通过消息回调获取所有状态的改变。例如对于长时任务发送人任务进程,你可以这样做:

@app.task(bind=True)
def hello(self, a, b):
    time.sleep(1)
    self.update_state(state="PROGRESS", meta={'progress': 50})
    time.sleep(1)
    self.update_state(state="PROGRESS", meta={'progress': 90})
    time.sleep(1)
    return 'hello world: %i' % (a+b)
def on_raw_message(body):
    print(body)

r = hello.apply_async(4, 6)
print(r.get(on_message=on_raw_message, propagate=False))

将生成如下输出:

{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
 'result': {'progress': 50},
 'children': [],
 'status': 'PROGRESS',
 'traceback': None}
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
 'result': {'progress': 90},
 'children': [],
 'status': 'PROGRESS',
 'traceback': None}
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
 'result': 'hello world: 10',
 'children': [],
 'status': 'SUCCESS',
 'traceback': None}
hello world: 10

ETA and Countdown

ETA(estimated time of arrival, 预计到底时间)让你设置一个日期和时间,在这个时间之前任务将被执行。countdown 是一种以秒为单位设置ETA的快捷方式。

>>> result = add.apply_async((2, 2), countdown=3)
>>> result.get()    # this takes at least 3 seconds to return
20

确保任务在指定的日期和时间之后的某个时间执行,但不一定在该时间执行。可能原因可能包括许多项目在队列中等待,或者严重的网络延迟。为了确保您的任务及时执行,你应该监视队列中的拥塞情况。使用Munin或类似工具来接收警报,因此可以采取适当的措施来减轻负载。点击查看Munin

尽管 countdown 是整数,但eta必须是一个 datetime 对象,并指定确切的日期和时间(包括毫秒精度和时区信息):

>>> from datetime import datetime, timedelta

>>> tomorrow = datetime.utcnow() + timedelta(days=1)
>>> add.apply_async((2, 2), eta=tomorrow)

Expiration

expries 参数定义了一个可选的到期时间,既可以作为任务之后秒发布,或在特定日期和时间使用 datetime

>>> # Task expires after one minute from now.
>>> add.apply_async((10, 10), expires=60)

>>> # Also supports datetime
>>> from datetime import datetime, timedelta
>>> add.apply_async((10, 10), kwargs,
...                 expires=datetime.now() + timedelta(days=1)

worker 收到过期的任务时,它将任务标记为REVOKED(TaskRevokedError)。

消息重发 (Message Sending Retry)

当连接失败时,Celery 会自动重试发送消息,并且可以配置重试行为(例如重试频率或最大重试次数)或全部禁用。

add.apply_async((2, 2), retry=False)

相关设定

重试策略 (Retry Plicy )

重试策略是一种控制重试行为的映射,可以包含以下键:

  • max_retries

    最大重试次数,在这种情况下,将抛出重试失败的异常。

    值为None意味着它将永远重试。

    默认值为重试3次。

  • interval_start

    定义两次重试之间要等待的秒数(浮点数或整数)。默认值为0(第一次重试是瞬时的)。

  • interval_step

    在每次连续重试时,此数字将被添加到重试延迟中(浮点数或整数)。默认值为0.2。

  • interval_max

    重试之间等待的最大秒数(浮点数或整数)。默认值为0.2。

    例如,默认策略与以下内容相关:

add.apply_async((2, 2), retry=True, retry_policy={
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
})

重试的最长时间为0.4秒。默认情况下将其设置为相对较短,因为如果代理连接断开,连接失败可能导致重试堆效应–例如,许多 Web 服务器进程正在等待重试,从而阻止了其他传入请求。

连接错误处理(Connection Error Handling)

当您发送任务并且传输连接丢失或无法启动连接时,将引发 OperationalError 错误:

>>> from proj.tasks import add
>>> add.delay(2, 2)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "celery/app/task.py", line 388, in delay
        return self.apply_async(args, kwargs)
  File "celery/app/task.py", line 503, in apply_async
    **options
  File "celery/app/base.py", line 662, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "celery/backends/rpc.py", line 275, in on_task_call
    maybe_declare(self.binding(producer.channel), retry=True)
  File "/opt/celery/kombu/kombu/messaging.py", line 204, in _get_channel
    channel = self._channel = channel()
  File "/opt/celery/py-amqp/amqp/connection.py", line 272, in connect
    self.transport.connect()
  File "/opt/celery/py-amqp/amqp/transport.py", line 100, in connect
    self._connect(self.host, self.port, self.connect_timeout)
  File "/opt/celery/py-amqp/amqp/transport.py", line 141, in _connect
    self.sock.connect(sa)
  kombu.exceptions.OperationalError: [Errno 61] Connection refused

如果启用了重试,则只有在重试用尽后或立即禁用后才发生。

您也可以处理此错误:

>>> from celery.utils.log import get_logger
>>> logger = get_logger(__name__)

>>> try:
...     add.delay(2, 2)
... except add.OperationalError as exc:
...     logger.exception('Sending task raised: %r', exc)

序列化 (Serializers)

在客户端和工作人员之间传输的数据需要进行序列化,因此 Celery 中的每条消息都有一个 content_type 标头,该标头描述了用于对其进行编码的序列化方法。

默认的序列化器是JSON,但是您可以使用 task_serializer 设置更改此设置,或者针对每个任务,甚至针对每条消息进行更改。

有内置的支持JSON,pickle,YAML 和msgpack,你也可以通过他们登记到 Kombu 注册表中添加自己的自定义序列化

安全

pickle 模块允许执行任意功能,请参阅安全指南。

Celery 还带有一个特殊的序列化程序,该序列化程序使用加密技术对您的消息进行签名。

也可以看看

Kombu 中的消息序列化的用户指南。

每个序列化器都有其优点和缺点:

  • json - JSON 被大多数的编程语言支持,并且现在是 Python 标准的一部分(自2.6开始),并且现代 Python 库(例如 simplejson)具有非常快的 json 解析速度。

    JSON 的缺点是会限制你使用如下的数据类型:字符串、Unicode、字典和列表。小数和日期明显缺失。

    二进制数据使用 Base64 编码进行传输,这与支持纯二进制传输的相比,数据传输量增长了34%。

    但如果你的数据满足上述限制,并且你需要跨语言支持,则 JSON 可能是你的最佳选择。

    有关更多信息,请参见 http://json.org

  • pickle - 如果你除了 Python 外,并不需要支持其他语言,那么使用 pickle 编码将让你获得对所有 Python 内建类型的支持(类实例除外)。相比 JSON,使用 pickle 序列化的二进制文件更小,传输速度更快。

    请参阅 pickle 获得更多信息。

  • yaml - YAML 和 JSON 有许多相似的特征,yaml 支持包括日期、递归引用在内的更多数据类型。然而,Python 的 YMAL 库相比 JSON 库 要慢很多。

    如果你需要更具表现能力的数据集合,则 YMAL 比上面的序列化方式更适合。

    有关更多信息,请参见 http://yaml.org/

  • msgpack - msgpack 是一种接近 JSON 的二进制序列化格式。但是,它还很年轻,因此此时应该将支持视为实验性的

    有关更多信息,请参见 http://msgpack.org/

编码类型可以用作消息头,因此 workers 知道如何反序列化所有的任务。如果你使用自定义序列方案,则该序列化必须被 workers 支持。

发送任务时的序列化配置优先级如下(从高到低):

为单个任务调用设置序列化方式:

>>> add.apply_async((10, 10), serializer='json')

压缩 (Compression)

Celery 可以使用以下内建方案压缩消息。

  • brotli

    brotli 针对 web 进行了优化,尤其是小型文档。该压缩对诸如字体、html页面等静态内容最有效。

    要使用 brotli,请用以下命令进行安装。

    $ pip install celery[brotli]
  • bzip2

    bzip2 创建的文件比 gzip 小,但是压缩和解压的速度明显慢于 gzip。

    要使用 bzip2,请确保 bzip2 已经编译到你的 Python 可执行文件中。

    如果你得到以下错误 ImportError

    >>> import bz2
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ImportError: No module named 'bz2'

    这意味着你应该重新编译支持 bzip2 的 Python 版本。

  • gzip

    gzip 适用于内存占用较小的系统,因此 gzip 非常适合内存有限的系统。该压缩常用语生成带有 “.tar.gz” 后缀的文件。

    要使用 gzip,请确保 gzip 已经编译到你的 Python 可执行文件中。
    
    如果你得到以下错误[ImportError](https://docs.python.org/dev/library/exceptions.html#ImportError)
    >>> import gzip
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ImportError: No module named 'gzip'

    这意味着你应该重新编译支持 gzip 的 Python 版本。

  • lzma

    lzma 具有较好的压缩效率以及压缩解压速度,但内存消耗更大。

    要使用 lzma,请确保 gzip 已经编译到你的 Python 可执行文件中,并且你的 Python 版本为3.3或更高版本。

    如果你得到以下错误 ImportError

    >>> import lzma
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ImportError: No module named 'lzma'

    这意味着你应该重新编译支持 lzam 的 Python 版本。

    也可以通过以下的方式进行安装:

    $ pip install celery[lzma]
  • zlib

    zlib 是 Deflate 算法的抽象,它的 API 支持包括 gzip 格式和轻量级流格式文件的支持。zlib 是许多软件系统的重要组成部分,例如 Linux 内核以及 Git VCS。

    要使用 zlib,请确保 zlib 已经编译到你的 Python 可执行文件中。

    如果你得到以下错误 ImportError

    >>> import zlib
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ImportError: No module named 'zlib'

    这意味着你应该重新编译支持 zlib 的 Python 版本。

  • zstd

    zstd是一个针对 zlib 的实时压缩方案,且有着更好的压缩效率。zstd 由 Huff0 和 FSE 库提供快速算法。

    要使用zstd,请用以下命令进行安装。

    $ pip install celery[zstd]

你还可以创建自己的压缩方式,并在kumbo压缩注册中注册它们。

发送任务时的压缩方案配置优先级如下(从高到低):

  • 1.compression 执行选项。

  • 2.Task.compression 属性。

  • 3.task_compression 属性。

任务调用时指定压缩方法的示例:

>>> add.apply_async((2, 2), compression='zlib')

连接(Connections)

自动池支持

  • 从2.3版开始,支持自动连接池,因此您不必手动处理连接和发布者即可重用连接。

  • 从2.5版开始,默认情况下启用连接池。

  • 有关 broker_pool_limit 更多信息,请参见设置。

您可以通过创建发布者来手动处理连接:

results = []
with add.app.pool.acquire(block=True) as connection:
    with add.get_publisher(connection) as publisher:
        try:
            for args in numbers:
                res = add.apply_async((2, 2), publisher=publisher)
                results.append(res)
print([res.get() for res in results])

尽管这是个特定示例,但是可以更好的展现一组:

>>> from celery import group

>>> numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
>>> res = group(add.s(i, j) for i, j in numbers).apply_async()

>>> res.get()
[4, 8, 16, 32]

路由选择 (Routing options)

Celery 可以将任务路由到不同的队列。

使用以下 queue 可以完成简单的路由(name <-> name):

add.apply_async(queue='priority.high')

然后,您可以指派 workers 给 priority.high 的队列,使用 worker -Q 参数将分配给队列:

$ celery -A proj worker -l info -Q celery,priority.high

也可以看看

不建议用代码对队列名称进行硬编码,最佳做法是使用配置路由器(task_routes)。

要了解有关路由的更多信息,请参阅“路由任务(Routing Tasks)”。

高级选项

这些选项适用于想要使用AMQP完整路由功能的高级用户。有兴趣的人士可以阅读路由指南

  • 交换(exchange)

    发送信息的 exchange(或者 kombu.entity.Exchange) 的名称。

  • routing_key

    用于确定路由的密钥。

  • 优先(priority)

    0~255 之间的数字,其中255是最高优先级。

    支持:RabbitMQ,Redis(优先级颠倒,最高为0)。

最后更新于